/*
Copyright (c) Microsoft Corporation

All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
compliance with the License. You may obtain a copy of the License
at http://www.apache.org/licenses/LICENSE-2.0

THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER
EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR CONDITIONS OF TITLE,
FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR NON-INFRINGEMENT.

See the Apache Version 2.0 License for specific language governing permissions and
limitations under the License.
*/

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Linq.Expressions;
using System.Reflection;
using System.IO;
using System.Security.Cryptography.X509Certificates;
using System.Threading.Tasks;
using Microsoft.Research.DryadLinq.Internal;
using Microsoft.Research.Peloponnese.ClusterUtils;
using Microsoft.Research.Peloponnese.Storage;

namespace Microsoft.Research.DryadLinq
{
    /// <summary>
    /// The executor to run DryadLINQ jobs. The current release only supports Dryad.
    /// </summary>
    public enum ExecutorKind
    {
        /// <summary>
        /// Run DryadLINQ using Dryad.
        /// </summary>
        DRYAD
    }

    /// <summary>
    /// The service platforms where you can run DryadLINQ.
    /// </summary>
    public enum PlatformKind
    {
        /// <summary>
        /// Run directly on a YARN cluster.
        /// </summary>
        YARN_NATIVE,

        /// <summary>
        /// Run on a YARN cluster in Azure HDInsight.
        /// </summary>
        YARN_AZURE,

        /// <summary>
        /// Run locally at client side.
        /// </summary>
        LOCAL
    }

    /// <summary>
    /// Base interface for cluster types that the DryadLinqContext constructor can accept.
    /// </summary>
    public interface DryadLinqCluster
    {
        /// <summary>
        /// Gets the service platform of this cluster.
        /// </summary>
        PlatformKind Kind { get; }

        /// <summary>
        /// Gets the hostname of the head node of the cluster.
        /// </summary>
        string HeadNode { get; }

        /// <summary>
        /// Gets the client DFS interface.
        /// </summary>
        IDfsClient DfsClient { get; }

        /// <summary>
        /// Gets the client cluster interface.
        /// </summary>
        /// <param name="context">An instance of DryadLinqContext</param>
        /// <returns>The client interface to the cluster</returns>
        ClusterClient Client(DryadLinqContext context);

        /// <summary>
        /// Makes a new unique URI for storing a dataset in the DFS.
        /// </summary>
        /// <param name="path">A user provided local path</param>
        /// <returns>A new unique URI that can be used to store a dataset</returns>
        Uri MakeDefaultUri(string path);
    }

    /// <summary>
    /// The interface for a YARN native cluster.
    /// </summary>
    /// <remarks>
    /// The DFS client and the cluster client are both created on first use and then cached.
    /// NOTE(review): <see cref="NameNode"/> is assigned in the constructor but is not read
    /// anywhere in this class; both clients are built from <see cref="HeadNode"/>. Confirm
    /// whether the DFS client was meant to target <see cref="NameNode"/> instead.
    /// </remarks>
    internal class DryadLinqYarnCluster : DryadLinqCluster
    {
        /// <summary>
        /// The hostname of the computer where the YarnLauncher program is running
        /// </summary>
        public string HeadNode { get; set; }

        /// <summary>
        /// The port where the YarnLauncher program is listening
        /// </summary>
        public int LauncherPort;

        /// <summary>
        /// The hostname of the computer where the default HDFS instance is running
        /// </summary>
        public string NameNode;

        /// <summary>
        /// The port that the Hdfs protocol is listening on
        /// </summary>
        public int HdfsPort;

        /// <summary>
        /// The port that the WebHdfs protocol is listening on
        /// </summary>
        public int WebHdfsPort;

        // Both clients start out null and are created on first use.
        private WebHdfsClient _dfsClient;
        private NativeYarnClient _clusterClient;

        /// <summary>
        /// Make a new cluster object representing a YARN cluster with default ports
        /// (launcher 8471, HDFS 9000, WebHDFS 50070). The name node defaults to the head node.
        /// </summary>
        /// <param name="headNode">The computer where the YarnLauncher is running</param>
        public DryadLinqYarnCluster(string headNode)
        {
            HeadNode = headNode;
            LauncherPort = 8471;
            NameNode = headNode;
            HdfsPort = 9000;
            WebHdfsPort = 50070;
        }

        /// <summary>
        /// Gets the service platform of this cluster, which is always YARN_NATIVE.
        /// </summary>
        public PlatformKind Kind
        {
            get { return PlatformKind.YARN_NATIVE; }
        }

        /// <summary>
        /// Gets the WebHDFS client for this cluster, creating it on first access.
        /// </summary>
        public IDfsClient DfsClient
        {
            get { return EnsureDfsClient(); }
        }

        /// <summary>
        /// Gets the native YARN client for this cluster, creating it on first call.
        /// </summary>
        /// <param name="context">An instance of DryadLinqContext (not used by this implementation)</param>
        /// <returns>The client interface to the cluster</returns>
        public ClusterClient Client(DryadLinqContext context)
        {
            if (_clusterClient == null)
            {
                _clusterClient = new NativeYarnClient(HeadNode, HdfsPort, LauncherPort);
            }
            return _clusterClient;
        }

        /// <summary>
        /// Makes a new URI for storing a dataset by delegating to the WebHDFS client's MakeDfsUri.
        /// </summary>
        /// <param name="path">A user provided local path</param>
        /// <returns>The URI produced by the DFS client for <paramref name="path"/></returns>
        public Uri MakeDefaultUri(string path)
        {
            // Go through the lazy initializer: reading the field directly throws a
            // NullReferenceException when this is called before DfsClient was ever accessed.
            return EnsureDfsClient().MakeDfsUri(path);
        }

        /// <summary>
        /// Returns the cached WebHDFS client, creating it from the current host and port
        /// settings the first time it is needed.
        /// </summary>
        private WebHdfsClient EnsureDfsClient()
        {
            if (_dfsClient == null)
            {
                _dfsClient = new WebHdfsClient(HeadNode, HdfsPort, WebHdfsPort);
            }
            return _dfsClient;
        }
    }

    ///
    /// The interface for a YARN Azure cluster.
///
    internal class DryadLinqAzureCluster : DryadLinqCluster
    {
        // Container used for staging job resources when the caller does not name one.
        private const string DefaultStagingContainer = "staging";

        private readonly AzureSubscriptions _azureSubscriptions;
        private readonly Task<AzureCluster> _cluster;
        private readonly Task<AzureDfsClient> _dfsClient;
        private Task<AzureYarnClient> _clusterClient;

        /// <summary>
        /// Make a new cluster object representing an Azure HDInsight cluster, reading the details
        /// from a subscription stored in the Powershell defaults.
        /// </summary>
        /// <param name="clusterName">The name of the HDInsight cluster</param>
        public DryadLinqAzureCluster(string clusterName)
        {
            // Kick off the asynchronous lookup of subscriptions, available clusters, etc.
            _azureSubscriptions = new AzureSubscriptions();
            _cluster = _azureSubscriptions.GetClusterAsync(clusterName);
            _dfsClient = StartDfsClient(_cluster, DefaultStagingContainer);
        }

        /// <summary>
        /// Make a new cluster object representing an Azure HDInsight cluster, specifying the
        /// staging storage manually.
        /// </summary>
        /// <param name="clusterName">The name of the HDInsight cluster</param>
        /// <param name="storageAccount">The storage account to use for staging job resources</param>
        /// <param name="storageContainer">The storage account container to use for staging job resources</param>
        /// <param name="storageKey">The storage account key, which will be looked up in the subscription if null</param>
        public DryadLinqAzureCluster(string clusterName, string storageAccount,
                                     string storageContainer, string storageKey = null)
        {
            // Kick off the asynchronous lookup of subscriptions, available clusters, etc.
            _azureSubscriptions = new AzureSubscriptions();
            if (storageKey != null)
            {
                _azureSubscriptions.AddAccount(storageAccount, storageKey);
            }

            // Once the cluster is known, point it at the requested storage account.
            _cluster = _azureSubscriptions.GetClusterAsync(clusterName).ContinueWith(
                found =>
                {
                    found.Result.SetStorageAccount(storageAccount, storageKey);
                    return found.Result;
                });
            _dfsClient = StartDfsClient(_cluster, storageContainer);
        }

        /// <summary>
        /// Make a new cluster object representing an Azure HDInsight cluster, specifying the
        /// subscription manually by certificate thumbprint.
        /// </summary>
        /// <param name="clusterName">The name of the HDInsight cluster</param>
        /// <param name="subscriptionId">The ID of the subscription to fetch cluster details from</param>
        /// <param name="certificateThumbprint">The thumbprint of the certificate associated with the subscription</param>
        public DryadLinqAzureCluster(string clusterName, string subscriptionId, string certificateThumbprint)
        {
            // Kick off the asynchronous lookup of subscriptions, available clusters, etc.
            _azureSubscriptions = new AzureSubscriptions();
            _azureSubscriptions.AddSubscription(subscriptionId, certificateThumbprint);
            _cluster = _azureSubscriptions.GetClusterAsync(clusterName);
            _dfsClient = StartDfsClient(_cluster, DefaultStagingContainer);
        }

        /// <summary>
        /// Make a new cluster object representing an Azure HDInsight cluster, specifying the
        /// subscription manually by certificate.
        /// </summary>
        /// <param name="clusterName">The name of the HDInsight cluster</param>
        /// <param name="subscriptionId">The ID of the subscription to fetch cluster details from</param>
        /// <param name="certificate">The certificate associated with the subscription</param>
        public DryadLinqAzureCluster(string clusterName, string subscriptionId, X509Certificate2 certificate)
        {
            // Kick off the asynchronous lookup of subscriptions, available clusters, etc.
            _azureSubscriptions = new AzureSubscriptions();
            _azureSubscriptions.AddSubscription(subscriptionId, certificate);
            _cluster = _azureSubscriptions.GetClusterAsync(clusterName);
            _dfsClient = StartDfsClient(_cluster, DefaultStagingContainer);
        }

        /// <summary>
        /// Make a new cluster object representing an Azure HDInsight cluster, specifying both the
        /// subscription (by certificate thumbprint) and the staging storage manually.
        /// </summary>
        /// <param name="clusterName">The name of the HDInsight cluster</param>
        /// <param name="subscriptionId">The ID of the subscription to fetch cluster details from</param>
        /// <param name="certificateThumbprint">The thumbprint of the certificate associated with the subscription</param>
        /// <param name="storageAccount">The storage account to use for staging job resources</param>
        /// <param name="storageContainer">The storage account container to use for staging job resources</param>
        /// <param name="storageKey">The storage account key, which will be looked up in the subscription if null</param>
        public DryadLinqAzureCluster(string clusterName, string subscriptionId, string certificateThumbprint,
                                     string storageAccount, string storageContainer, string storageKey = null)
        {
            // Kick off the asynchronous lookup of subscriptions, available clusters, etc.
            _azureSubscriptions = new AzureSubscriptions();
            if (storageKey != null)
            {
                _azureSubscriptions.AddAccount(storageAccount, storageKey);
            }
            _azureSubscriptions.AddCluster(
                clusterName, storageAccount, storageKey, subscriptionId, certificateThumbprint);
            _cluster = _azureSubscriptions.GetClusterAsync(clusterName);
            _dfsClient = StartDfsClient(_cluster, storageContainer);
        }

        /// <summary>
        /// Make a new cluster object representing an Azure HDInsight cluster, specifying both the
        /// subscription (by certificate) and the staging storage manually.
        /// </summary>
        /// <param name="clusterName">The name of the HDInsight cluster</param>
        /// <param name="subscriptionId">The ID of the subscription to fetch cluster details from</param>
        /// <param name="certificate">The certificate associated with the subscription</param>
        /// <param name="storageAccount">The storage account to use for staging job resources</param>
        /// <param name="storageContainer">The storage account container to use for staging job resources</param>
        /// <param name="storageKey">The storage account key, which will be looked up in the subscription if null</param>
        public DryadLinqAzureCluster(string clusterName, string subscriptionId, X509Certificate2 certificate,
                                     string storageAccount, string storageContainer, string storageKey = null)
        {
            // Kick off the asynchronous lookup of subscriptions, available clusters, etc.
            _azureSubscriptions = new AzureSubscriptions();
            if (storageKey != null)
            {
                _azureSubscriptions.AddAccount(storageAccount, storageKey);
            }
            _azureSubscriptions.AddCluster(
                clusterName, storageAccount, storageKey, subscriptionId, certificate);
            _cluster = _azureSubscriptions.GetClusterAsync(clusterName);
            _dfsClient = StartDfsClient(_cluster, storageContainer);
        }

        /// <summary>
        /// The name of the HDInsight cluster. Reading it waits for the cluster lookup to finish.
        /// </summary>
        public string HeadNode
        {
            get { return _cluster.Result.Name; }
        }

        /// <summary>
        /// Gets the service platform of this cluster, which is always YARN_AZURE.
        /// </summary>
        public PlatformKind Kind
        {
            get { return PlatformKind.YARN_AZURE; }
        }

        /// <summary>
        /// Gets the subscriptions object created by the constructor.
        /// </summary>
        internal AzureSubscriptions Subscriptions
        {
            get { return _azureSubscriptions; }
        }

        /// <summary>
        /// Gets the resolved cluster details, waiting for the lookup to finish.
        /// </summary>
        public AzureCluster Cluster
        {
            get { return _cluster.Result; }
        }

        /// <summary>
        /// Gets the Azure DFS client, waiting for it to be constructed.
        /// </summary>
        public IDfsClient DfsClient
        {
            get { return _dfsClient.Result; }
        }

        /// <summary>
        /// Gets the Azure YARN client, scheduling its creation on the first call and waiting
        /// for the result.
        /// </summary>
        /// <param name="context">The context whose PeloponneseHomeDirectory is passed to the client</param>
        /// <returns>The client interface to the cluster</returns>
        public ClusterClient Client(DryadLinqContext context)
        {
            if (_clusterClient != null)
            {
                return _clusterClient.Result;
            }

            _clusterClient = _dfsClient.ContinueWith(
                dfs => new AzureYarnClient(
                    _azureSubscriptions, dfs.Result, context.PeloponneseHomeDirectory, Cluster.Name));
            return _clusterClient.Result;
        }

        /// <summary>
        /// Makes an Azure URI for <paramref name="path"/> from the DFS client's account name,
        /// container name and account key.
        /// </summary>
        /// <param name="path">A user provided local path</param>
        /// <returns>The URI built by AzureUtils.ToAzureUri</returns>
        public Uri MakeDefaultUri(string path)
        {
            AzureDfsClient dfs = _dfsClient.Result;
            return AzureUtils.ToAzureUri(dfs.AccountName, dfs.ContainerName, path, null, dfs.AccountKey);
        }

        /// <summary>
        /// Chains the creation of the DFS client onto the pending cluster lookup.
        /// </summary>
        private static Task<AzureDfsClient> StartDfsClient(Task<AzureCluster> cluster, string container)
        {
            return cluster.ContinueWith(
                found => new AzureDfsClient(found.Result.StorageAccount, found.Result.StorageKey, container));
        }
    }

    ///
    /// Represents the context necessary to prepare and execute a DryadLinq Query,
    ///
    /// DryadLinqContext is the main entry point for the DryadLINQ framework.
    /// The context that is maintained by a DryadLinqContext instance includes
    /// configuration information.
    ///
    /// A DryadLinqContext may be reused by multiple queries and query executions.
    ///
    /// A DryadLinqContext may hold open connections to cluster services.
    /// To release these connections, call DryadLinqContext.Dispose().
///
    ///
    public class DryadLinqContext : IDisposable, IEquatable<DryadLinqContext>
    {
        // Name of the job environment variable that backs the DebugBreak property.
        private const string DebugVertexVariable = "DLINQ_DEBUGVERTEX";

        private ExecutorKind _executorKind = ExecutorKind.DRYAD;
        private PlatformKind _platformKind = PlatformKind.LOCAL;
        private string _headNode;
        private DryadLinqCluster _clusterDetails;
        private AzureSubscriptions _azureSubscriptions;
        private Version _clientVersion;

        private CompressionScheme _intermediateDataCompressionScheme = CompressionScheme.None;
        private CompressionScheme _outputCompressionScheme = CompressionScheme.None;

        private bool _compileForVertexDebugging = false;  // Ship PDBs + No optimization
        private string _jobFriendlyName;
        private int? _jobMinNodes;
        private int? _jobMaxNodes;
        private string _nodeGroup;
        private int? _jobRuntimeLimit;
        private bool _localDebug = false;
        private bool _localExecution = false;
        private string _jobUsername = null;
        private string _jobPassword = null;
        private QueryTraceLevel _runtimeTraceLevel = QueryTraceLevel.Error;
        private string _graphManagerNode;
        private bool _enableSpeculativeDuplication = true;
        private bool _selectOrderPreserving = false;
        private bool _matchClientNetFrameworkVersion = true;
        private bool _multiThreading = true;
        private string _partitionUncPath = null;
        private string _storageSetScheme = null;
        private DryadLinqStringDictionary _jobEnvironmentVariables = new DryadLinqStringDictionary();
        private DryadLinqStringList _resourcesToAdd = new DryadLinqStringList();
        private DryadLinqStringList _resourcesToRemove = new DryadLinqStringList();
        private bool _forceGC = false;
        private bool _isDisposed = false;
        private string _dryadHome;
        private string _peloponneseHome;

        /// <summary>
        /// Creates the cluster object matching <paramref name="kind"/> for the named cluster.
        /// </summary>
        /// <param name="clusterName">The cluster (head node) name</param>
        /// <param name="kind">The platform kind; LOCAL and unknown values are rejected</param>
        /// <returns>A YARN native or YARN Azure cluster object</returns>
        private static DryadLinqCluster MakeCluster(string clusterName, PlatformKind kind)
        {
            switch (kind)
            {
                case PlatformKind.LOCAL:
                    throw new DryadLinqException("Can't make a cluster of kind LOCAL");
                case PlatformKind.YARN_NATIVE:
                    return new DryadLinqYarnCluster(clusterName);
                case PlatformKind.YARN_AZURE:
                    return new DryadLinqAzureCluster(clusterName);
                default:
                    throw new DryadLinqException("Unknown cluster kind " + kind);
            }
        }

        /// <summary>
        /// Initializes a new instance of the DryadLinqContext class for local execution.
        /// </summary>
        /// <param name="numProcesses">The number of local worker processes that should be started.</param>
        /// <param name="storageSetScheme">The default scheme for storage. Defaults to partitioned file</param>
        public DryadLinqContext(int numProcesses, string storageSetScheme = null)
        {
            this.CommonInit();
            this._platformKind = PlatformKind.LOCAL;
            this._localExecution = true;
            this._headNode = "LocalExecution";
            this._storageSetScheme = storageSetScheme;
            if (String.IsNullOrEmpty(this._storageSetScheme))
            {
                this._storageSetScheme = DataPath.PARTFILE_URI_SCHEME;
            }
            this._jobMinNodes = numProcesses;

            // make an Azure subscriptions object just in case we want to access azure streams
            // from local execution
            this._azureSubscriptions = new AzureSubscriptions();
        }

        /// <summary>
        /// Initializes a new instance of the DryadLinqContext class for a YARN cluster.
        /// </summary>
        /// <param name="clusterName">The head node of the cluster and DFS</param>
        /// <param name="platform">The service platform to run DryadLINQ jobs. Defaults to YARN Azure</param>
        public DryadLinqContext(string clusterName, PlatformKind platform = PlatformKind.YARN_AZURE)
            : this(MakeCluster(clusterName, platform))
        {
        }

        /// <summary>
        /// Initializes a new instance of the DryadLinqContext class for a specified cluster.
        /// </summary>
        /// <param name="cluster">The cluster to run DryadLINQ jobs</param>
        /// <exception cref="ArgumentNullException"><paramref name="cluster"/> is null.</exception>
        /// <exception cref="DryadLinqException">The cluster does not report a head node.</exception>
        public DryadLinqContext(DryadLinqCluster cluster)
        {
            if (cluster == null)
            {
                throw new ArgumentNullException("cluster");
            }

            // Verify that the head node is set
            if (String.IsNullOrEmpty(cluster.HeadNode))
            {
                throw new DryadLinqException(DryadLinqErrorCode.ClusterNameMustBeSpecified,
                                             SR.ClusterNameMustBeSpecified);
            }

            this.CommonInit();
            this._platformKind = cluster.Kind;
            this._headNode = cluster.HeadNode;
            this._clusterDetails = cluster;

            if (cluster.Kind == DryadLinq.PlatformKind.YARN_NATIVE)
            {
                this._storageSetScheme = DataPath.HDFS_URI_SCHEME;
            }
            else if (cluster.Kind == DryadLinq.PlatformKind.YARN_AZURE)
            {
                this._storageSetScheme = DataPath.AZUREBLOB_URI_SCHEME;

                // DryadLinqCluster is a public interface, so a YARN_AZURE cluster is not
                // necessarily our internal DryadLinqAzureCluster; only reuse its subscriptions
                // when the cast succeeds.
                DryadLinqAzureCluster azureCluster = cluster as DryadLinqAzureCluster;
                if (azureCluster != null)
                {
                    this._azureSubscriptions = azureCluster.Subscriptions;
                }
            }

            // In every other case make an Azure subscriptions object just in case we want to
            // access azure streams; this also keeps RegisterAzureAccount/AzureAccountKey from
            // dereferencing null.
            if (this._azureSubscriptions == null)
            {
                this._azureSubscriptions = new AzureSubscriptions();
            }
        }

        /// <summary>
        /// Resolves the Peloponnese and Dryad home directories shared by all constructors.
        /// </summary>
        private void CommonInit()
        {
            this._peloponneseHome =
                Microsoft.Research.Peloponnese.ClusterUtils.ConfigHelpers.GetPPMHome(null);
            if (Microsoft.Research.Peloponnese.ClusterUtils.ConfigHelpers.RunningFromNugetPackage)
            {
                this._dryadHome =
                    Microsoft.Research.Peloponnese.ClusterUtils.ConfigHelpers.GetPPMHome(null);
            }
            else
            {
                this._dryadHome = Environment.GetEnvironmentVariable(StaticConfig.DryadHomeVar);
            }
        }

        /// <summary>
        /// Gets and sets the job executor. The current release only supports Dryad.
        /// </summary>
        public ExecutorKind ExecutorKind
        {
            get { return this._executorKind; }
            set { this._executorKind = value; }
        }

        /// <summary>
        /// Gets or sets the service platform
        /// </summary>
        public PlatformKind PlatformKind
        {
            get { return this._platformKind; }
            set { this._platformKind = value; }
        }

        /// <summary>
        /// Gets or sets the compression scheme for data passed between stages.
        /// </summary>
        /// <remarks>
        /// The default is <see cref="CompressionScheme.None"/>.
        /// </remarks>
        public CompressionScheme IntermediateDataCompressionScheme
        {
            get { return this._intermediateDataCompressionScheme; }
            set { this._intermediateDataCompressionScheme = value; }
        }

        /// <summary>
        /// Gets or sets the value specifying the compression scheme for output data.
        /// </summary>
        /// <remarks>
        /// The default is <see cref="CompressionScheme.None"/>.
        /// </remarks>
        public CompressionScheme OutputDataCompressionScheme
        {
            get { return this._outputCompressionScheme; }
            set { this._outputCompressionScheme = value; }
        }

        /// <summary>
        /// Gets or sets the value specifying whether to compile code with debugging support.
        /// </summary>
        /// <remarks>
        /// If true, vertex code will be compiled with no code-level optimizations and a PDB will be generated.
        /// Also, the query execution job will look for and include the PDB associated with every DLL resource
        /// that is part of the submitted job.
        /// The default is false.
        /// </remarks>
        public bool CompileForVertexDebugging
        {
            get { return this._compileForVertexDebugging; }
            set { this._compileForVertexDebugging = value; }
        }

        /// <summary>
        /// Gets or sets the bin directory for Dryad.
        /// </summary>
        public string DryadHomeDirectory
        {
            get { return this._dryadHome; }
            set { this._dryadHome = value; }
        }

        /// <summary>
        /// Gets or sets the bin directory for Peloponnese.
        /// </summary>
        public string PeloponneseHomeDirectory
        {
            get { return this._peloponneseHome; }
            set { this._peloponneseHome = value; }
        }

        /// <summary>
        /// Gets or sets the head node for executing a DryadLinq query.
        /// </summary>
        public string HeadNode
        {
            get { return this._headNode; }
            set { this._headNode = value; }
        }

        /// <summary>
        /// Gets or sets the partition UNC path used when constructing a partitioned table.
        /// </summary>
        public string PartitionUncPath
        {
            get { return this._partitionUncPath; }
            set { this._partitionUncPath = value; }
        }

        /// <summary>
        /// Gets the cluster object used to run the DryadLINQ query
        /// </summary>
        internal DryadLinqCluster Cluster
        {
            get { return this._clusterDetails; }
        }

        /// <summary>
        /// Gets the collection of environment variables associated with the DryadLINQ job.
        /// </summary>
        public IDictionary<string, string> JobEnvironmentVariables
        {
            get { return this._jobEnvironmentVariables; }
        }

        /// <summary>
        /// Gets or sets the descriptive name used to describe the DryadLINQ job.
        /// </summary>
        public string JobFriendlyName
        {
            get { return this._jobFriendlyName; }
            set { this._jobFriendlyName = value; }
        }

        /// <summary>
        /// Gets or sets the minimum number of cluster nodes for the DryadLINQ job.
        /// </summary>
        /// <remarks>
        /// The default is null (no lower limit), except that the local-execution constructor sets it
        /// to the requested number of processes. May be overridden by cluster settings such as node templates.
        /// </remarks>
        public int? JobMinNodes
        {
            get { return this._jobMinNodes; }
            set { this._jobMinNodes = value; }
        }

        /// <summary>
        /// Gets or sets the maximum number of cluster nodes for the DryadLINQ job.
        /// </summary>
        /// <remarks>
        /// The default is null (no upper limit). May be overridden by cluster settings such as node templates.
        /// </remarks>
        public int? JobMaxNodes
        {
            get { return this._jobMaxNodes; }
            set { this._jobMaxNodes = value; }
        }

        /// <summary>
        /// Gets or sets the name of the compute node group when running on the cluster.
        /// </summary>
        /// <remarks>
        /// <para>
        /// Creation and management of nodes groups is performed using the Cluster Manager.
        /// </para>
        /// <para>
        /// The default is null (no node group restriction).
        /// </para>
        /// </remarks>
        public string NodeGroup
        {
            get { return this._nodeGroup; }
            set { this._nodeGroup = value; }
        }

        /// <summary>
        /// Gets or sets the maximum execution time for the DryadLINQ job, in seconds.
        /// </summary>
        /// <remarks>
        /// The default is null (no runtime limit).
        /// </remarks>
        public int? JobRuntimeLimit
        {
            get { return this._jobRuntimeLimit; }
            set { this._jobRuntimeLimit = value; }
        }

        /// <summary>
        /// Enables or disables speculative duplication of vertices based on runtime performance analysis.
        /// </summary>
        /// <remarks>
        /// The default is true.
        /// </remarks>
        public bool EnableSpeculativeDuplication
        {
            get { return this._enableSpeculativeDuplication; }
            set { this._enableSpeculativeDuplication = value; }
        }

        /// <summary>
        /// Gets or sets the value specifying whether to use Local debugging mode.
        /// </summary>
        /// <remarks>
        /// <para>
        /// If true, the DryadLINQ query will execute in the current CLR via LINQ-to-Objects.
        /// This mode is particularly useful for debugging user-functions before attempting cluster execution.
        /// LocalDebug mode accesses input and output data as usual.
        /// </para>
        /// <para>
        /// LocalDebug mode does not perform vertex-code compilation.
        /// </para>
        /// <para>The default is false.</para>
        /// </remarks>
        public bool LocalDebug
        {
            get { return this._localDebug; }
            set { this._localDebug = value; }
        }

        /// <summary>
        /// Gets or sets the value specifying whether to use Local execution mode.
        /// </summary>
        /// <remarks>
        /// <para>
        /// If true, the DryadLINQ Query will execute by forking processes on the local
        /// computer instead of using a cluster. LocalExecution mode accesses HDFS as usual for
        /// input and output data.
        /// </para>
        /// <para>
        /// The default is false, except for contexts created with the local-execution
        /// constructor, which sets it to true.
        /// </para>
        /// </remarks>
        public bool LocalExecution
        {
            get { return this._localExecution; }
            set { this._localExecution = value; }
        }

        /// <summary>
        /// Gets and sets the value specifying whether a vertex should break into the debugger
        /// </summary>
        /// <remarks>
        /// Backed by the DLINQ_DEBUGVERTEX job environment variable: setting true stores the value
        /// "BREAK", setting false removes the variable, and the getter reports whether it is present.
        /// </remarks>
        public bool DebugBreak
        {
            get
            {
                return this.JobEnvironmentVariables.ContainsKey(DebugVertexVariable);
            }
            set
            {
                // Honor the assigned value: previously any assignment, including false,
                // turned the break on and there was no way to turn it off again.
                if (value)
                {
                    this.JobEnvironmentVariables[DebugVertexVariable] = "BREAK";
                }
                else
                {
                    this.JobEnvironmentVariables.Remove(DebugVertexVariable);
                }
            }
        }

        /// <summary>
        /// Get the list of resources to add to the DryadLINQ job.
        /// </summary>
        /// <remarks>
        /// <para>
        /// During query submission, some resources will be detected and added automatically.
        /// It is only necessary to add resources that are not detected automatically.
        /// </para>
        /// <para>
        /// Each resource should be a complete path to a file-based resource accessible
        /// from the local machine.
        /// </para>
        /// </remarks>
        public IList<string> ResourcesToAdd
        {
            get { return this._resourcesToAdd; }
        }

        /// <summary>
        /// Get the list of resources to be excluded from the DryadLINQ job.
        /// </summary>
        /// <remarks>
        /// <para>
        /// During query submission, some resources will be detected and added automatically.
        /// Remove resources that are detected automatically but that are not required for job execution.
        /// </para>
        /// <para>
        /// Each resource should be a complete path to a file-based resource accessible from the local machine.
        /// </para>
        /// </remarks>
        public IList<string> ResourcesToRemove
        {
            get { return this._resourcesToRemove; }
        }

        /// <summary>
        /// Gets or sets the RunAs user name for jobs submitted to the cluster.
        /// </summary>
        /// <remarks>
        /// The default is null (use the credentials of the current Thread)
        /// </remarks>
        public string JobUsername
        {
            get { return this._jobUsername; }
            set { this._jobUsername = value; }
        }

        /// <summary>
        /// Gets or sets the RunAs password for jobs submitted to the cluster.
        /// </summary>
        /// <remarks>
        /// The default is null (use the credentials of the current Thread)
        /// </remarks>
        public string JobPassword
        {
            get { return this._jobPassword; }
            set { this._jobPassword = value; }
        }

        /// <summary>
        /// Gets or sets the trace level to use for DryadLINQ Query jobs.
        /// </summary>
        /// <remarks>
        /// The RuntimeTraceLevel affects the logs produced by all components associated with the execution
        /// of a DryadLINQ Query job.
        /// <para>The default is QueryTraceLevel.Error</para>
        /// </remarks>
        public QueryTraceLevel RuntimeTraceLevel
        {
            get { return this._runtimeTraceLevel; }
            set { this._runtimeTraceLevel = value; }
        }

        /// <summary>
        /// Gets or sets the node that should be used for running the Dryad Graph Manager task.
        /// </summary>
        /// <remarks>
        /// If null, the Graph Manager task will run on an arbitrary machine that is allocated to the DryadLINQ job.
        /// </remarks>
        public string GraphManagerNode
        {
            get { return this._graphManagerNode; }
            set { this._graphManagerNode = value; }
        }

        /// <summary>
        /// Gets or sets whether certain operators will preserve item ordering.
        /// When true, the Select, SelectMany and Where operators will preserve item ordering;
        /// otherwise, they may shuffle the input items as they are processed.
        /// </summary>
        public bool SelectOrderPreserving
        {
            get { return this._selectOrderPreserving; }
            set { this._selectOrderPreserving = value; }
        }

        /// <summary>
        /// Configures query jobs to be launched on the cluster nodes against a .NET framework version
        /// matching that of the client process. This should only be set if all cluster nodes are known to have
        /// the same .NET version as the client.
        /// When set to false, the vertex code will be compiled and run against .NET Framework 3.5.
        /// </summary>
        /// <remarks>
        /// The default is true.
        /// </remarks>
        public bool MatchClientNetFrameworkVersion
        {
            get { return this._matchClientNetFrameworkVersion; }
            set { this._matchClientNetFrameworkVersion = value; }
        }

        /// <summary>
        /// Gets or sets whether user-defined methods and custom serializers may be called on
        /// multiple threads of a single process.
        /// </summary>
        /// <remarks>
        /// This option affects the internal behavior of individual queries and applies to both the
        /// client process (for serialization and local-debug mode) and to vertex processes.
        /// This option does not have any serializing effect for queries that are submitted
        /// concurrently by one or more client processes.
        /// <para>If true, user-defined methods may be called concurrently.</para>
        /// <para>If false, user-defined methods will be called without concurrency.</para>
        /// </remarks>
        public bool EnableMultiThreadingInVertex
        {
            get { return this._multiThreading; }
            set { this._multiThreading = value; }
        }

        /// <summary>
        /// Gets or sets whether to run GC after Moxie runs each task.
        /// </summary>
        /// <remarks>
        /// This only works with Moxie (for now at least).
        /// </remarks>
        public bool ForceGC
        {
            get { return this._forceGC; }
            set { this._forceGC = value; }
        }

        /// <summary>
        /// Version of the DryadLinq client components
        /// </summary>
        /// <returns>The version of the DryadLINQ DLL</returns>
        /// <exception cref="DryadLinqException">
        /// The context is disposed, or the file version of the executing assembly could not be read.
        /// </exception>
        public Version ClientVersion()
        {
            ThrowIfDisposed();
            if (this._clientVersion == null)
            {
                try
                {
                    Assembly asm = Assembly.GetExecutingAssembly();
                    this._clientVersion =
                        new Version(FileVersionInfo.GetVersionInfo(asm.Location).FileVersion);
                }
                catch (Exception ex)
                {
                    throw new DryadLinqException(DryadLinqErrorCode.CouldNotGetClientVersion,
                                                 SR.CouldNotGetClientVersion, ex);
                }
            }
            return this._clientVersion;
        }

        /// <summary>
        /// Creates the job executor for the configured <see cref="ExecutorKind"/>.
        /// </summary>
        internal DryadLinqJobExecutor MakeJobExecutor()
        {
            switch (this.ExecutorKind)
            {
                case ExecutorKind.DRYAD:
                {
                    return new DryadLinqJobExecutor(this);
                }
                default:
                {
                    // Use the library's exception type rather than a bare System.Exception.
                    throw new DryadLinqException(
                        "No implementation for scheduler: " + this.ExecutorKind.ToString());
                }
            }
        }

        /// <summary>
        /// Asks the data provider for the configured storage scheme for a fresh temporary stream URI.
        /// </summary>
        internal Uri MakeTemporaryStreamUri()
        {
            if (this._storageSetScheme == null)
            {
                throw new DryadLinqException("The storage scheme for temporary streams must be specified.");
            }
            DataProvider dataProvider = DataProvider.GetDataProvider(this._storageSetScheme);
            return dataProvider.GetTemporaryStreamUri(this, DryadLinqUtil.MakeUniqueName());
        }

        /// <summary>
        /// Open a dataset as a DryadLinq specialized IQueryable{T}.
        /// </summary>
        /// <typeparam name="T">The type of the records in the table.</typeparam>
        /// <param name="dataSetUri">The name of the dataset.</param>
        /// <returns>An IQueryable{T} representing the data.</returns>
        public IQueryable<T> FromStore<T>(string dataSetUri)
        {
            return this.FromStore<T>(new Uri(dataSetUri));
        }

        /// <summary>
        /// Open a dataset as a DryadLinq specialized IQueryable{T}.
        /// </summary>
        /// <typeparam name="T">The type of the records in the table.</typeparam>
        /// <param name="dataSetUri">The name of the dataset.</param>
        /// <returns>An IQueryable{T} representing the data.</returns>
        public IQueryable<T> FromStore<T>(Uri dataSetUri)
        {
            ThrowIfDisposed();
            DryadLinqQuery<T> q = DataProvider.GetPartitionedTable<T>(this, dataSetUri);
            q.CheckAndInitialize();   // force the data-info checks.
            return q;
        }

        /// <summary>
        /// Converts an IEnumerable{T} to a DryadLinq specialized IQueryable{T}.
        /// </summary>
        /// <typeparam name="T">The type of the records in the table.</typeparam>
        /// <param name="data">The source data.</param>
        /// <returns>An IQueryable{T} representing the data with DryadLinq query provider.</returns>
        /// <remarks>
        /// The source data will be serialized to a temp stream.
        /// The resulting fileset has an auto-generated name and a temporary lease.
        /// </remarks>
        public IQueryable<T> FromEnumerable<T>(IEnumerable<T> data)
        {
            // Same disposed check that FromStore performs before touching storage.
            ThrowIfDisposed();

            Uri dataSetName = this.MakeTemporaryStreamUri();
            CompressionScheme compressionScheme = this.OutputDataCompressionScheme;
            DryadLinqMetaData metadata =
                new DryadLinqMetaData(this, typeof(T), dataSetName, compressionScheme);
            return DataProvider.StoreData(this, data, dataSetName, metadata, compressionScheme, true);
        }

        /// <summary>
        /// Register a named account with the specified storage key, so that key won't need to be
        /// specified in Azure blob URIs
        /// </summary>
        /// <param name="storageAccountName">The name of the storage account</param>
        /// <param name="storageAccountKey">The account's key</param>
        public void RegisterAzureAccount(string storageAccountName, string storageAccountKey)
        {
            this._azureSubscriptions.AddAccount(storageAccountName, storageAccountKey);
        }

        /// <summary>
        /// Get the key associated with a named account, or null if it is not registered or auto-detected from
        /// the subscriptions
        /// </summary>
        /// <param name="storageAccountName">The name of the storage account</param>
        /// <returns>The storage key, or null</returns>
        public string AzureAccountKey(string storageAccountName)
        {
            // Blocks until the asynchronous lookup completes.
            return this._azureSubscriptions.GetAccountKeyAsync(storageAccountName).Result;
        }

        /// <summary>
        /// Extracts the live context from a DryadLINQ query provider.
        /// </summary>
        /// <exception cref="DryadLinqException">
        /// The provider is not a DryadLINQ provider, or its context has been disposed.
        /// </exception>
        internal static DryadLinqContext GetContext(IQueryProvider provider)
        {
            DryadLinqProviderBase baseProvider = provider as DryadLinqProviderBase;
            if (baseProvider == null)
            {
                throw new DryadLinqException("Must be DryadLINQ query provider.");
            }
            DryadLinqContext context = baseProvider.Context;
            context.ThrowIfDisposed();
            return context;
        }

        /// <summary>
        /// Releases all resources used by the DryadLinqContext.
        /// </summary>
        /// <remarks>
        /// This implementation only marks the context as disposed; it is safe to call repeatedly.
        /// </remarks>
        public void Dispose()
        {
            this._isDisposed = true;
        }

        /// <summary>
        /// Throws a DryadLinqException with the ContextDisposed error code once Dispose has been called.
        /// </summary>
        internal void ThrowIfDisposed()
        {
            if (this._isDisposed)
            {
                throw new DryadLinqException(DryadLinqErrorCode.ContextDisposed, SR.ContextDisposed);
            }
        }

        /// <summary>
        /// Determines whether this instance of DryadLinqContext is equal to another instance
        /// of <see cref="DryadLinqContext"/>.
        /// </summary>
        /// <param name="context">The other DryadLinqContext instance</param>
        /// <returns>
        /// true if the two instances are equal; false if they differ or <paramref name="context"/> is null
        /// </returns>
        public virtual bool Equals(DryadLinqContext context)
        {
            // A null argument is simply "not equal" instead of a NullReferenceException.
            if (Object.ReferenceEquals(context, null))
            {
                return false;
            }
            if (Object.ReferenceEquals(this, context))
            {
                return true;
            }

            return (this.IntermediateDataCompressionScheme == context.IntermediateDataCompressionScheme &&
                    this.OutputDataCompressionScheme == context.OutputDataCompressionScheme &&
                    this.CompileForVertexDebugging == context.CompileForVertexDebugging &&
                    this.DryadHomeDirectory == context.DryadHomeDirectory &&
                    this.PeloponneseHomeDirectory == context.PeloponneseHomeDirectory &&
                    this.HeadNode == context.HeadNode &&
                    this.Cluster == context.Cluster &&
                    this.PartitionUncPath == context.PartitionUncPath &&
                    this.JobMinNodes == context.JobMinNodes &&
                    this.JobMaxNodes == context.JobMaxNodes &&
                    this.NodeGroup == context.NodeGroup &&
                    this.JobRuntimeLimit == context.JobRuntimeLimit &&
                    this.EnableSpeculativeDuplication == context.EnableSpeculativeDuplication &&
                    this.LocalDebug == context.LocalDebug &&
                    this.LocalExecution == context.LocalExecution &&
                    this.PlatformKind == context.PlatformKind &&
                    this.JobUsername == context.JobUsername &&
                    this.JobPassword == context.JobPassword &&
                    this.RuntimeTraceLevel == context.RuntimeTraceLevel &&
                    this.GraphManagerNode == context.GraphManagerNode &&
                    this.SelectOrderPreserving == context.SelectOrderPreserving &&
                    this.MatchClientNetFrameworkVersion == context.MatchClientNetFrameworkVersion &&
                    this.EnableMultiThreadingInVertex == context.EnableMultiThreadingInVertex &&
                    this.ForceGC == context.ForceGC);
        }
    }
}