/* Copyright (c) Microsoft Corporation All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR NON-INFRINGEMENT. See the Apache Version 2.0 License for specific language governing permissions and limitations under the License. */ using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Diagnostics; using System.Linq.Expressions; using System.Reflection; using System.IO; using System.Security.Cryptography.X509Certificates; using System.Threading.Tasks; using Microsoft.Research.DryadLinq.Internal; using Microsoft.Research.Peloponnese.ClusterUtils; using Microsoft.Research.Peloponnese.Shared; using Microsoft.Research.Peloponnese.Hdfs; using Microsoft.Research.Peloponnese.WebHdfs; using Microsoft.Research.Peloponnese.Azure; using Microsoft.Research.Peloponnese.Yarn; namespace Microsoft.Research.DryadLinq { /// /// The executor to run DryadLINQ jobs. The current release only supports Dryad. /// public enum ExecutorKind { /// /// Run DryadLINQ using Dryad. /// DRYAD } /// /// The service platforms where you can run DryadLINQ. /// public enum PlatformKind { /// /// run directly on a YARN cluster /// YARN_NATIVE, /// /// run on a YARN cluster in Azure HDInsight /// YARN_AZURE, /// /// run locally at client side /// LOCAL } /// /// Base interface for cluster types that the DryadLinqContext constructor can accept. /// public interface DryadLinqCluster { /// /// Gets the service platform of this cluster. /// PlatformKind Kind { get; } /// /// Gets the hostname of the head node of the cluster. /// string HeadNode { get; } /// /// Gets the client DFS interface. /// DfsClient DfsClient { get; } /// /// Gets the client cluster interface. /// /// An instnace of DryadLinqContext /// The client interface to the cluster ClusterClient Client(DryadLinqContext context); /// /// Makes a new URI for storing a dataset in the DFS. /// /// A user provided local path /// A new URI that can be used to store a dataset Uri MakeInternalClusterUri(params string[] path); /// /// The maximum number of nodes to use for a job if JobMaxNodes is not set in DryadLinqContext /// int DefaultMaxNodes { get; } } /// /// The interface for a YARN native cluster. /// public class DryadLinqYarnCluster : DryadLinqCluster { /// /// The username for the job on the YARN cluster /// public string User; /// /// The hostname of the computer where the YARN Resource Node is running /// public string HeadNode { get; set; } /// /// The port that the YARN Resource Node is listening on /// public int YarnPort; /// /// The queue that should be used on the cluster /// public string Queue { get; set; } /// /// The hostname of the computer where the HDFS instance for resource staging is running /// public string NameNode; /// /// The port that the Hdfs protocol is listening on /// public int HdfsPort; /// /// The port that the WebHdfs protocol is listening on, or -1 if Java/YARN are present on the client /// public int WebHdfsPort; /// /// The hostname of the computer where the DryadLINQ YARN launcher is running, or null if Java/YARN are /// present on the client /// public string LauncherNode; /// /// The port where the DryadLINQ Yarn launcher is listening, or -1 if Java/YARN are /// present on the client /// public int LauncherPort; /// /// The number of containers a job should ask for, if JobMaxNodes and JobMinNodes are not overwritten in /// the DryadLinqContext /// public int DefaultNumberOfContainers; /// /// The amount of memory to be requested for the Application Master container /// public int ApplicationMasterMbMemory; /// /// The amount of memory to be requested for each container other than the Application Master /// public int ContainerMbMemory; private DfsClient _dfsClient; private NativeYarnClient _clusterClient; /// /// Make a new cluster object representing a YARN cluster with default ports for a client with Java installed /// /// username for the cluster job /// Number of containers the job will ask for, if JobMaxNodes and /// JobMinNodes are not overwritten in the DryadLinqContext /// /// The amount of memory, in Megabytes, that should be /// requested for the Application Master container /// The amount of memory, in Megabytes, that should be /// requested for containers other than the Application Master /// The queue to submit the job to, or null for "default" /// computer where the YARN resource node is running /// port the YARN resource node is listening on /// HDFS namenode to use for resource staging /// HDFS namenode port, or -1 for filesystem default public DryadLinqYarnCluster( string user, int defaultNumberOfContainers, int appMasterMbMemory, int containerMbMemory, string queue, string resourceNode, int yarnPort, string nameNode, int hdfsPort) { User = user; DefaultNumberOfContainers = defaultNumberOfContainers; ApplicationMasterMbMemory = appMasterMbMemory; ContainerMbMemory = containerMbMemory; if (queue == null) { Queue = "default"; } else { Queue = queue; } HeadNode = resourceNode; YarnPort = yarnPort; NameNode = nameNode; HdfsPort = hdfsPort; WebHdfsPort = -1; LauncherNode = null; LauncherPort = -1; _dfsClient = null; _clusterClient = null; } /// /// Make a new cluster object representing a YARN cluster with default ports for a client without Java installed /// /// username for the cluster job /// Number of containers the job will ask for, if JobMaxNodes and /// JobMinNodes are not overwritten in the DryadLinqContext /// The amount of memory, in Megabytes, that should be /// requested for the Application Master container /// The amount of memory, in Megabytes, that should be /// requested for containers other than the Application Master /// The queue to submit the job to, or null for "default" /// computer where the YARN resource node is running /// port the YARN resource node is listening on /// HDFS namenode to use for resource staging /// HDFS namenode port, or -1 for filesystem default /// HDFS WebHdfs port /// computer where DryadLINQ Yarn launcher is running /// port that DryadLINQ Yarn launcher is listening on public DryadLinqYarnCluster( string user, int defaultNumberOfContainers, int appMasterMbMemory, int containerMbMemory, string queue, string resourceNode, int yarnPort, string nameNode, int hdfsPort, int webHdfsPort, string launcherNode, int launcherPort) { User = user; DefaultNumberOfContainers = defaultNumberOfContainers; ApplicationMasterMbMemory = appMasterMbMemory; ContainerMbMemory = containerMbMemory; if (queue == null) { Queue = "default"; } else { Queue = queue; } HeadNode = resourceNode; YarnPort = yarnPort; NameNode = nameNode; HdfsPort = hdfsPort; WebHdfsPort = webHdfsPort; LauncherNode = launcherNode; LauncherPort = launcherPort; _dfsClient = null; _clusterClient = null; } /// /// Gets the service platform of this cluster. /// public PlatformKind Kind { get { return PlatformKind.YARN_NATIVE; } } /// /// Gets the client DFS interface. /// public DfsClient DfsClient { get { if (_dfsClient == null) { if (WebHdfsPort > 0) { _dfsClient = new WebHdfsClient(User, WebHdfsPort); } else { _dfsClient = new HdfsClient(User); } } return _dfsClient; } } /// /// Gets the client cluster interface. /// /// An instnace of DryadLinqContext /// The client interface to the cluster public ClusterClient Client(DryadLinqContext context) { if (_clusterClient == null) { if (LauncherNode == null) { string yarnHomeDirectory = Environment.GetEnvironmentVariable("HADOOP_COMMON_HOME"); if (yarnHomeDirectory == null) { throw new ApplicationException("Yarn client needs HADOOP_COMMON_HOME environment variable set to YARN installation"); } _clusterClient = new NativeYarnClient( HeadNode, YarnPort, DfsClient, MakeInternalClusterUri("user", User), "Microsoft.Research.Peloponnese.YarnLauncher.jar", yarnHomeDirectory); } else { _clusterClient = new NativeYarnClient( HeadNode, YarnPort, DfsClient, MakeInternalClusterUri("user", User), LauncherNode, LauncherPort); } } return _clusterClient; } /// /// Makes a new URI for storing a dataset in the DFS. /// /// A user provided local path /// A new URI that can be used to store a dataset public Uri MakeInternalClusterUri(params string[] path) { UriBuilder builder = new UriBuilder(); builder.Scheme = "hdfs"; builder.Host = NameNode; builder.Port = HdfsPort; return DfsClient.Combine(builder.Uri, path); } /// /// The maximum number of nodes to use for a job if JobMinNodes is not set in DryadLinqContext /// public int DefaultMaxNodes { get { return DefaultNumberOfContainers; } } } /// /// The interface for a YARN Azure cluster. /// internal class DryadLinqAzureCluster : DryadLinqCluster { /// /// The name of the HDInsight cluster /// public string HeadNode { get { return _cluster.Result.Name; } } private readonly AzureSubscriptions _azureSubscriptions; private readonly Task _cluster; private readonly Task _dfsClient; private Task _clusterClient; /// /// Make a new cluster object representing an Azure HDInsight cluster, reading the details /// from a subscription stored in the Powershell defaults. /// /// The name of the HDInsight cluster public DryadLinqAzureCluster(string clusterName) { // start fetching details about the subscriptions, available clusters, etc. _azureSubscriptions = new AzureSubscriptions(); _cluster = _azureSubscriptions.GetClusterAsync(clusterName); _dfsClient = _cluster.ContinueWith( c => new AzureDfsClient(c.Result.StorageAccount, c.Result.StorageKey, "staging")); } /// /// Make a new cluster object representing an Azure HDInsight cluster, specifying the details /// manually. /// /// The name of the HDInsight cluster /// The storage account to use for staging job resources /// The storage account container to use for staging job resources /// The storage account key, which will be looked up in the subscription if null public DryadLinqAzureCluster(string clusterName, string storageAccount, string storageContainer, string storageKey = null) { // start fetching details about the subscriptions, available clusters, etc. _azureSubscriptions = new AzureSubscriptions(); if (storageKey != null) { _azureSubscriptions.AddAccount(storageAccount, storageKey); } _cluster = _azureSubscriptions.GetClusterAsync(clusterName) .ContinueWith(t => { t.Result.SetStorageAccount(storageAccount, storageKey); return t.Result; }); _dfsClient = _cluster.ContinueWith( c => new AzureDfsClient(c.Result.StorageAccount, c.Result.StorageKey, storageContainer)); } /// /// Make a new cluster object representing an Azure HDInsight cluster, specifying the details /// manually. /// /// The name of the HDInsight cluster /// The ID of the subscription to fetch cluster details from /// The thumbprint of the certificate associated with the subscription public DryadLinqAzureCluster(string clusterName, string subscriptionId, string certificateThumbprint) { // start fetching details about the subscriptions, available clusters, etc. _azureSubscriptions = new AzureSubscriptions(); _azureSubscriptions.AddSubscription(subscriptionId, certificateThumbprint); _cluster = _azureSubscriptions.GetClusterAsync(clusterName); _dfsClient = _cluster.ContinueWith(c => new AzureDfsClient(c.Result.StorageAccount, c.Result.StorageKey, "staging")); } /// /// Make a new cluster object representing an Azure HDInsight cluster, specifying the details /// manually. /// /// The name of the HDInsight cluster /// The ID of the subscription to fetch cluster details from /// The certificate associated with the subscription public DryadLinqAzureCluster(string clusterName, string subscriptionId, X509Certificate2 certificate) { // start fetching details about the subscriptions, available clusters, etc. _azureSubscriptions = new AzureSubscriptions(); _azureSubscriptions.AddSubscription(subscriptionId, certificate); _cluster = _azureSubscriptions.GetClusterAsync(clusterName); _dfsClient = _cluster.ContinueWith( c => new AzureDfsClient(c.Result.StorageAccount, c.Result.StorageKey, "staging")); } /// /// Make a new cluster object representing an Azure HDInsight cluster, specifying the details /// manually. /// /// The name of the HDInsight cluster /// The ID of the subscription to fetch cluster details from /// The thumbprint of the certificate associated with the subscription /// The storage account to use for staging job resources /// The storage account container to use for staging job resources /// The storage account key, which will be looked up in the subscription if null public DryadLinqAzureCluster(string clusterName, string subscriptionId, string certificateThumbprint, string storageAccount, string storageContainer, string storageKey = null) { // start fetching details about the subscriptions, available clusters, etc. _azureSubscriptions = new AzureSubscriptions(); if (storageKey != null) { _azureSubscriptions.AddAccount(storageAccount, storageKey); } _azureSubscriptions.AddCluster(clusterName, storageAccount, storageKey, subscriptionId, certificateThumbprint); _cluster = _azureSubscriptions.GetClusterAsync(clusterName); _dfsClient = _cluster.ContinueWith( c => new AzureDfsClient(c.Result.StorageAccount, c.Result.StorageKey, storageContainer)); } /// /// Make a new cluster object representing an Azure HDInsight cluster, specifying the details /// manually /// /// The name of the HDInsight cluster /// The ID of the subscription to fetch cluster details from /// The certificate associated with the subscription /// The storage account to use for staging job resources /// The storage account container to use for staging job resources /// The storage account key, which will be looked up in the subscription if null public DryadLinqAzureCluster(string clusterName, string subscriptionId, X509Certificate2 certificate, string storageAccount, string storageContainer, string storageKey = null) { // start fetching details about the subscriptions, available clusters, etc. _azureSubscriptions = new AzureSubscriptions(); if (storageKey != null) { _azureSubscriptions.AddAccount(storageAccount, storageKey); } _azureSubscriptions.AddCluster(clusterName, storageAccount, storageKey, subscriptionId, certificate); _cluster = _azureSubscriptions.GetClusterAsync(clusterName); _dfsClient = _cluster.ContinueWith( c => new AzureDfsClient(c.Result.StorageAccount, c.Result.StorageKey, storageContainer)); } public PlatformKind Kind { get { return PlatformKind.YARN_AZURE; } } internal AzureSubscriptions Subscriptions { get { return _azureSubscriptions; } } public AzureCluster Cluster { get { return _cluster.Result; } } public DfsClient DfsClient { get { return _dfsClient.Result; } } public ClusterClient Client(DryadLinqContext context) { if (_clusterClient == null) { _clusterClient = _dfsClient.ContinueWith( c => new AzureYarnClient(_azureSubscriptions, c.Result, MakeInternalClusterUri(""), context.PeloponneseHomeDirectory, Cluster.Name)); } return _clusterClient.Result; } public Uri MakeInternalClusterUri(params string[] path) { return DfsClient.Combine( Microsoft.Research.Peloponnese.Azure.Utils.ToAzureUri( _dfsClient.Result.AccountName, _dfsClient.Result.ContainerName, "", null, _dfsClient.Result.AccountKey), path); } /// /// The maximum number of nodes to use for a job if JobMinNodes is not set in DryadLinqContext /// public int DefaultMaxNodes { get { return -1; } } } /// /// Represents the context necessary to prepare and execute a DryadLinq Query, /// /// /// /// DryadLinqContext is the main entry point for the DryadLINQ framework. /// The context that is maintained by a DryadLinqContext instance includes /// configuration information. /// /// /// A DryadLinqContext may be reused by multiple queries and query executions. /// /// /// A DryadLinqContext may hold open connections to cluster services. /// To release these connections, call DryadLinqContext.Dispose(). /// /// public class DryadLinqContext : IDisposable, IEquatable { private ExecutorKind _executorKind = ExecutorKind.DRYAD; private PlatformKind _platformKind = PlatformKind.LOCAL; private string _headNode; private DryadLinqCluster _clusterDetails; private AzureSubscriptions _azureSubscriptions; private Version _clientVersion; private CompressionScheme _intermediateDataCompressionScheme = CompressionScheme.None; private CompressionScheme _outputCompressionScheme = CompressionScheme.None; private bool _compileForVertexDebugging = false; // Ship PDBs + No optimization private string _jobFriendlyName; private int? _jobMinNodes; private int? _jobMaxNodes; private int _threadsPerWorker; private int _amMemoryMb; private int _containerMemoryMb; private string _queue = "default"; private string _nodeGroup; private int? _jobRuntimeLimit; private bool _localDebug = false; private string _jobUsername = null; private string _jobPassword = null; private QueryLoggingLevel _runtimeLoggingLevel = QueryLoggingLevel.Error; private string _graphManagerNode; private bool _enableSpeculativeDuplication = true; private bool _selectOrderPreserving = false; private bool _matchClientNetFrameworkVersion = true; private string _partitionUncPath = null; private string _storageSetScheme = null; private DryadLinqStringDictionary _jobEnvironmentVariables = new DryadLinqStringDictionary(); private DryadLinqStringList _resourcesToAdd = new DryadLinqStringList(); private DryadLinqStringList _resourcesToRemove = new DryadLinqStringList(); private bool _forceGC = false; private bool _isDisposed = false; private string _dryadHome; private string _peloponneseHome; private static DryadLinqCluster MakeCluster(string clusterName, PlatformKind kind) { return MakeCluster(clusterName, clusterName, kind); } private static DryadLinqCluster MakeCluster(string headNode, string nameNode, PlatformKind kind) { if (kind == PlatformKind.LOCAL) { throw new DryadLinqException("Can't make a cluster of kind LOCAL"); } else if (kind == PlatformKind.YARN_NATIVE) { throw new DryadLinqException("Can't make a cluster of kind YARN_NATIVE: make a DryadLinqYarnCluster object"); } else if (kind == PlatformKind.YARN_AZURE) { return new DryadLinqAzureCluster(headNode); } else { throw new DryadLinqException("Unknown cluster kind " + kind); } } /// /// Initializes a new instance of the DryadLinqContext class for local execution. /// /// The number of local worker processes that should be started. /// The default scheme for storage. Defaults to partitioned file public DryadLinqContext(int numProcesses, string storageSetScheme = null) { this.CommonInit(); this._platformKind = PlatformKind.LOCAL; this._headNode = "LocalExecution"; this._storageSetScheme = storageSetScheme; if (String.IsNullOrEmpty(this._storageSetScheme)) { this._storageSetScheme = DataPath.PARTFILE_URI_SCHEME; } this._jobMaxNodes = numProcesses; this._threadsPerWorker = Environment.ProcessorCount; // make an Azure subscriptions object just in case we want to access azure streams from local execution this._azureSubscriptions = new AzureSubscriptions(); } /// /// Initializes a new instance of the DryadLinqContext class for a YARN cluster. /// /// The head node of the cluster and DFS /// The service platform to run DryadLINQ jobs. Defaults to YARN Azure public DryadLinqContext(string clusterName, PlatformKind platform = PlatformKind.YARN_AZURE) : this(MakeCluster(clusterName, platform)) { } /// /// Initializes a new instance of the DryadLinqContext class for a specified cluster. /// /// The cluster to run DryadLINQ jobs public DryadLinqContext(DryadLinqCluster cluster) { // Verify that the head node is set if (String.IsNullOrEmpty(cluster.HeadNode)) { throw new DryadLinqException(DryadLinqErrorCode.ClusterNameMustBeSpecified, SR.ClusterNameMustBeSpecified); } this.CommonInit(); this._platformKind = cluster.Kind; this._headNode = cluster.HeadNode; this._clusterDetails = cluster; this._jobMaxNodes = cluster.DefaultMaxNodes; this._threadsPerWorker = Environment.ProcessorCount; if (cluster.Kind == DryadLinq.PlatformKind.YARN_NATIVE) { this._storageSetScheme = DataPath.HDFS_URI_SCHEME; // make an Azure subscriptions object just in case we want to access azure streams from the native yarn cluster this._azureSubscriptions = new AzureSubscriptions(); DryadLinqYarnCluster yarnCluster = cluster as DryadLinqYarnCluster; if (yarnCluster == null) { throw new DryadLinqException("Expected DryadLinqYarnCluster cluster object when running on YARN"); } this._amMemoryMb = yarnCluster.ApplicationMasterMbMemory; this._containerMemoryMb = yarnCluster.ContainerMbMemory; this._queue = yarnCluster.Queue; } else if (cluster.Kind == DryadLinq.PlatformKind.YARN_AZURE) { this._storageSetScheme = DataPath.AZUREBLOB_URI_SCHEME; DryadLinqAzureCluster azureCluster = cluster as DryadLinqAzureCluster; this._azureSubscriptions = azureCluster.Subscriptions; } } private void CommonInit() { this._peloponneseHome = Peloponnese.ClusterUtils.ConfigHelpers.GetPPMHome(null); if (Microsoft.Research.Peloponnese.ClusterUtils.ConfigHelpers.RunningFromNugetPackage) { this._dryadHome = Microsoft.Research.Peloponnese.ClusterUtils.ConfigHelpers.GetPPMHome(null); } else { this._dryadHome = Environment.GetEnvironmentVariable(StaticConfig.DryadHomeVar); } this._amMemoryMb = -1; this._containerMemoryMb = -1; } /// /// Gets and sets the job executor. The current release only supports Dryad. /// public ExecutorKind ExecutorKind { get { return this._executorKind; } set { _executorKind = value; } } /// /// Gets or sets the service platform /// public PlatformKind PlatformKind { get { return _platformKind; } set { _platformKind = value; } } /// /// Gets or sets the value specifying whether data passed between stages will be compressed. /// /// /// The default is true. /// public CompressionScheme IntermediateDataCompressionScheme { get { return this._intermediateDataCompressionScheme; } set { this._intermediateDataCompressionScheme = value; } } /// /// Gets or sets the value specifying the compression scheme for output data. /// /// /// The default is . /// public CompressionScheme OutputDataCompressionScheme { get { return this._outputCompressionScheme; } set { this._outputCompressionScheme = value; } } /// /// Gets or sets the value specifying whether to compile code with debugging support. /// /// /// If true, vertex code will be compiled with no code-level optimizations and a PDB will be generated. /// Also, the query execution job look for and include the PDB associated with every DLL resource /// that is part of the submitted job. /// The default is false. /// public bool CompileForVertexDebugging { get { return _compileForVertexDebugging; } set { _compileForVertexDebugging = value; } } /// /// Gets or sets the bin directory for Dryad. /// public string DryadHomeDirectory { get { return _dryadHome; } set { _dryadHome = value; } } /// /// Gets or sets the bin directory for Peloponnese. /// public string PeloponneseHomeDirectory { get { return _peloponneseHome; } set { _peloponneseHome = value; } } /// /// Gets or sets the head node for executing a DryadLinq query. /// public string HeadNode { get { return _headNode; } set { _headNode = value; } } /// /// Gets or sets the partition UNC path used when constructing a partitioned table. /// public string PartitionUncPath { get { return _partitionUncPath; } set { _partitionUncPath = value; } } internal DfsClient GetHdfsClient { get { if (_clusterDetails != null && _clusterDetails is DryadLinqYarnCluster) { return _clusterDetails.DfsClient; } if (Environment.GetEnvironmentVariable("HADOOP_COMMON_HOME") == null || Environment.GetEnvironmentVariable("JAVA_HOME") == null) { return new WebHdfsClient(null, 50070); } else { return new HdfsClient(); } } } /// /// Gets the cluster object used to run the DryadLINQ query /// internal DryadLinqCluster Cluster { get { return _clusterDetails; } } /// /// Gets the collection of environment variables associated with the DryadLINQ job. /// public IDictionary JobEnvironmentVariables { get { return _jobEnvironmentVariables; } } /// /// Gets or sets the descriptive name used to describe the DryadLINQ job. /// public string JobFriendlyName { get { return _jobFriendlyName; } set { _jobFriendlyName = value; } } /// /// Gets or sets the minimum number of cluster nodes for the DryadLINQ job. /// /// /// The default is null (no lower limit). May be overriden by cluster settings such as node templates. /// public int? JobMinNodes { get { return _jobMinNodes; } set { _jobMinNodes = value; } } /// /// Gets or sets the maximum number of cluster nodes for the DryadLINQ job. /// /// /// The default is null (no upper limit). May be overriden by cluster settings such as node templates. /// public int? JobMaxNodes { get { return _jobMaxNodes; } set { _jobMaxNodes = value; } } /// /// Gets or sets the number of threads each DryadLINQ worker vertex will use /// /// /// The default is 1. /// public int ThreadsPerWorker { get { return _threadsPerWorker; } set { _threadsPerWorker = value; } } /// /// Gets or sets the amount of memory in Megabytes requested for the Application Master container. /// public int ApplicationMasterMbMemory { get { return _amMemoryMb; } set { _amMemoryMb = value; } } /// /// Gets or sets the amount of memory in Megabytes requested for the worker containers. /// public int ContainerMbMemory { get { return _containerMemoryMb; } set { _containerMemoryMb = value; } } /// /// The Queue that should be used for job submission. The default queue is the default value. /// public string Queue { get { return _queue; } set { _queue = value; } } /// /// Gets or sets the name of the compute node group when running on the cluster. /// /// /// Creation and management of nodes groups is performed using the Cluster Manager. /// /// /// The default is null (no node group restriction). /// public string NodeGroup { get { return _nodeGroup; } set { _nodeGroup = value; } } /// /// Gets or sets the maximum execution time for the DryadLINQ job, in seconds. /// /// /// The default is null (no runtime limit). /// public int? JobRuntimeLimit { get { return _jobRuntimeLimit; } set { _jobRuntimeLimit = value; } } /// /// Enables or disables speculative duplication of vertices based on runtime performance analysis. /// /// /// The default is true. /// public bool EnableSpeculativeDuplication { get { return _enableSpeculativeDuplication; } set { _enableSpeculativeDuplication = value; } } /// /// Gets or sets the value specifying whether to use Local debugging mode. /// /// /// /// If true, the DryadLINQ query will execute in the current CLR via LINQ-to-Objects. /// This mode is particularly useful for debugging user-functions before attempting cluster execution. /// LocalDebug mode accesses input and output data as usual. /// /// /// LocalDebug mode does not perform vertex-code compilation. /// /// The default is false. /// public bool LocalDebug { get { return _localDebug; } set { _localDebug = value; } } /// /// Gets and sets the value specifying whether a vertex should break into the debugger /// public bool DebugBreak { get { return this.JobEnvironmentVariables.ContainsKey("DLINQ_DEBUGVERTEX"); } set { this.JobEnvironmentVariables["DLINQ_DEBUGVERTEX"] = "BREAK"; } } /// /// Get the list of resources to add to the DryadLINQ job. /// /// /// /// During query submission, some resources will be detected and added automatically. /// It is only necessary to add resources that are not detected automatically. /// /// /// Each resource should be a complete path to a file-based resource accessible /// from the local machine. /// /// public IList ResourcesToAdd { get { return _resourcesToAdd; } } /// /// Get the list of resources to be excluded from the DryadLINQ job. /// /// /// /// During query submission, some resources will be detected and added automatically. /// Remove resources that are detected automatically but that are not required for job execution. /// /// /// Each resource should be a complete path to a file-based resource accessible from the local machine. /// /// public IList ResourcesToRemove { get { return _resourcesToRemove; } } /// /// Gets or sets the RunAs password for jobs submitted to the cluster. /// /// /// The default is null (use the credentials of the current Thread) /// public string JobUsername { get { return _jobUsername; } set { _jobUsername = value; } } /// /// Gets or sets the RunAs password for jobs submitted to the cluster. /// /// /// The default is null (use the credentials of the current Thread) /// public string JobPassword { get { return _jobPassword; } set { _jobPassword = value; } } /// /// Gets or sets the logging level to use for DryadLINQ Query jobs. /// /// /// The RuntimeLoggingLevel affects the logs produced by all components associated with the execution /// of a DryadLINQ Query job. /// /// The default is QueryLoggingLevel.Error /// public QueryLoggingLevel RuntimeLoggingLevel { get { return _runtimeLoggingLevel; } set { _runtimeLoggingLevel = value; } } /// /// Gets or sets the node that should be used for running the Dryad Graph Manager task. /// /// /// If null, the Graph Manager task will run on an arbitrary machine that is allocated to the DryadLINQ job. /// public string GraphManagerNode { get { return _graphManagerNode; } set { _graphManagerNode = value; } } /// /// Gets or sets whether certain operators will preserve item ordering. /// When true, the Select, SelectMany and Where operators will preserve item ordering; /// otherwise, they may shuffle the input items as they are processed. /// public bool SelectOrderPreserving { get { return _selectOrderPreserving; } set { _selectOrderPreserving = value; } } /// /// Configures query jobs to be launched on the cluster nodes against a .NET framework version /// matching that of the client process. This should only be set if all cluster nodes are known to have /// the same .NET version as the client. /// When set to false (default), the vertex code will be compiled and run against .NET Framework 3.5. /// public bool MatchClientNetFrameworkVersion { get { return _matchClientNetFrameworkVersion; } set { _matchClientNetFrameworkVersion = value; } } /// /// Gets or sets whether to run GC after Moxie runs each task. /// /// /// This only works with Moxie (for now at least). /// public bool ForceGC { get { return _forceGC; } set { _forceGC = value; } } /// /// Version of the DryadLinq client components /// /// The version of the DryadLINQ DLL public Version ClientVersion() { ThrowIfDisposed(); if (_clientVersion == null) { try { Assembly asm = Assembly.GetExecutingAssembly(); _clientVersion = new Version(FileVersionInfo.GetVersionInfo(asm.Location).FileVersion); } catch (Exception ex) { throw new DryadLinqException(DryadLinqErrorCode.CouldNotGetClientVersion, SR.CouldNotGetClientVersion, ex); } } return _clientVersion; } internal DryadLinqJobExecutor MakeJobExecutor() { switch (this.ExecutorKind) { case ExecutorKind.DRYAD: { return new DryadLinqJobExecutor(this); } default: { throw new Exception("No implementation for scheduler: " + this.ExecutorKind.ToString()); } } } internal Uri MakeTemporaryStreamUri() { if (this._storageSetScheme == null) { throw new DryadLinqException("The storage scheme for temporary streams must be specified."); } DataProvider dataProvider = DataProvider.GetDataProvider(this._storageSetScheme); return dataProvider.GetTemporaryStreamUri(this, DryadLinqUtil.MakeUniqueName()); } /// /// Open a dataset as a DryadLinq specialized IQueryable{T}. /// /// The type of the records in the table /// The name of the dataset /// The function to deserialize the input dataset /// An IQueryable{T} representing the data public IQueryable FromStore(string dataSetUri, Expression>> deserializer = null) { return this.FromStore(new Uri(dataSetUri), deserializer); } /// /// Open a dataset as a DryadLinq specialized IQueryable{T}. /// /// The type of the records in the table /// The name of the dataset /// The function to deserialize the input dataset /// An IQueryable{T} representing the data public IQueryable FromStore(Uri dataSetUri, Expression>> deserializer = null) { ThrowIfDisposed(); DryadLinqQuery q = DataProvider.GetPartitionedTable(this, dataSetUri, deserializer); q.CheckAndInitialize(); // Must initialize! return q; } /// /// Converts an IEnumerable{T} to a DryadLinq specialized IQueryable{T}. /// /// The type of the records in the table. /// The source data. /// An optional stream-based serializer /// An optional stream-based deserializer /// An IQueryable{T} representing the data with DryadLinq query provider. /// /// The source data will be serialized to a temp stream. /// The resulting fileset has an auto-generated name and a temporary lease. /// public IQueryable FromEnumerable(IEnumerable data, Expression, Stream>> serializer = null, Expression>> deserializer = null) { if ((serializer == null) ^ (deserializer == null)) { throw new DryadLinqException("Must provide both serializer and deserializer."); } Uri dataSetName = this.MakeTemporaryStreamUri(); CompressionScheme compressionScheme = this.OutputDataCompressionScheme; DryadLinqMetaData metadata = new DryadLinqMetaData(this, typeof(T), dataSetName, compressionScheme); return DataProvider.StoreData(this, data, dataSetName, metadata, compressionScheme, true, serializer, deserializer); } /// /// Register a named account with the specified storage key, so that key won't need to be specified in Azure blob URIs /// /// The name of the storage account /// The account's key public void RegisterAzureAccount(string storageAccountName, string storageAccountKey) { _azureSubscriptions.AddAccount(storageAccountName, storageAccountKey); } /// /// Get the key associated with a named account, or null if it is not registered or auto-detected from /// the subscriptions /// /// The name of the storage account /// The storage key, or null public string AzureAccountKey(string storageAccountName) { return _azureSubscriptions.GetAccountKeyAsync(storageAccountName).Result; } internal static DryadLinqContext GetContext(IQueryProvider provider) { DryadLinqProviderBase baseProvider = provider as DryadLinqProviderBase; if (baseProvider == null) { throw new DryadLinqException("Must be DryadLINQ query provider."); } DryadLinqContext context = baseProvider.Context; context.ThrowIfDisposed(); return context; } /// /// Releases all resources used by the DryadLinqContext. /// public void Dispose() { if (!_isDisposed) { _isDisposed = true; } } internal void ThrowIfDisposed() { if (this._isDisposed) { throw new DryadLinqException(DryadLinqErrorCode.ContextDisposed, SR.ContextDisposed); } } /// /// Determines whether this instance of DryadLinqContext is equal to another instance /// of . /// /// The other DryadLinqContext instance /// true if the two instances are equal public virtual bool Equals(DryadLinqContext context) { return (this.IntermediateDataCompressionScheme == context.IntermediateDataCompressionScheme && this.OutputDataCompressionScheme == context.OutputDataCompressionScheme && this.CompileForVertexDebugging == context.CompileForVertexDebugging && this.DryadHomeDirectory == context.DryadHomeDirectory && this.PeloponneseHomeDirectory == context.PeloponneseHomeDirectory && this.HeadNode == context.HeadNode && this.Cluster == context.Cluster && this.PartitionUncPath == context.PartitionUncPath && this.JobMinNodes == context.JobMinNodes && this.JobMaxNodes == context.JobMaxNodes && this.NodeGroup == context.NodeGroup && this.JobRuntimeLimit == context.JobRuntimeLimit && this.EnableSpeculativeDuplication == context.EnableSpeculativeDuplication && this.LocalDebug == context.LocalDebug && this.PlatformKind == context.PlatformKind && this.JobUsername == context.JobUsername && this.JobPassword == context.JobPassword && this.RuntimeLoggingLevel == context.RuntimeLoggingLevel && this.GraphManagerNode == context.GraphManagerNode && this.SelectOrderPreserving == context.SelectOrderPreserving && this.MatchClientNetFrameworkVersion == context.MatchClientNetFrameworkVersion && this.ThreadsPerWorker == context.ThreadsPerWorker && this.ForceGC == context.ForceGC); } } }