/*
Copyright (c) Microsoft Corporation
All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
compliance with the License. You may obtain a copy of the License
at http://www.apache.org/licenses/LICENSE-2.0
THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER
EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR CONDITIONS OF
TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR NON-INFRINGEMENT.
See the Apache Version 2.0 License for specific language governing permissions and
limitations under the License.
*/
using Microsoft.Research.Calypso.Tools;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using System.Net;
using System.Diagnostics;
using Microsoft.Research.Peloponnese.Storage;
namespace Microsoft.Research.Calypso.JobObjectModel
{
/// <summary>
/// Error raised during the conversation with a cluster.
/// </summary>
public sealed class CalypsoClusterException : Exception
{
    /// <summary>
    /// Create an exception about handling a cluster, with a default message.
    /// </summary>
    public CalypsoClusterException() : base() { }

    /// <summary>
    /// Create an exception about handling a cluster.
    /// </summary>
    /// <param name="message">Exception message.</param>
    public CalypsoClusterException(string message) : base(message) { }

    /// <summary>
    /// Create an exception about handling a cluster which wraps the failure that caused it,
    /// so the original exception and its stack trace are not lost.
    /// </summary>
    /// <param name="message">Exception message.</param>
    /// <param name="innerException">The exception that caused this one.</param>
    public CalypsoClusterException(string message, Exception innerException) : base(message, innerException) { }
}
/// <summary>
/// Identifier for a Dryad process; for now we are using the globally-unique process GUID.
/// A Dryad job identifier is always the same as the job manager process GUID.
/// </summary>
[Serializable]
public class DryadProcessIdentifier : IEquatable<DryadProcessIdentifier>
{
    /// <summary>
    /// Process identifier; null when the identifier is not known.
    /// </summary>
    private string processIdentifier;

    /// <summary>
    /// Used just for XML serialization. The identifier stays unknown until
    /// <see cref="ProcessIdentifier"/> is set.
    /// </summary>
    public DryadProcessIdentifier()
    { }

    /// <summary>
    /// Create an identifier.
    /// </summary>
    /// <param name="pid">The id of the process (platform-dependent); null means unknown.</param>
    public DryadProcessIdentifier(string pid)
    {
        this.processIdentifier = pid;
    }

    /// <summary>
    /// Process identifier; public for serialization only.
    /// </summary>
    public string ProcessIdentifier
    {
        get { return this.processIdentifier; }
        set { this.processIdentifier = value; }
    }

    /// <summary>
    /// If true the process identifier is not known.
    /// </summary>
    public bool IsUnknown { get { return this.ProcessIdentifier == null; } }

    /// <summary>
    /// Human-readable description of the process identifier.
    /// </summary>
    /// <returns>The process identifier, or an empty string if it is not set.</returns>
    public override string ToString()
    {
        return this.ProcessIdentifier ?? "";
    }

    /// <summary>
    /// Equality test.
    /// </summary>
    /// <param name="obj">Object to compare to.</param>
    /// <returns>True if <paramref name="obj"/> is a process identifier for the same process.</returns>
    public override bool Equals(object obj)
    {
        // Anything that is not a DryadProcessIdentifier becomes null here and compares unequal.
        return this.Equals(obj as DryadProcessIdentifier);
    }

    #region IEquatable Members
    /// <summary>
    /// Equality test. Two unknown identifiers compare equal; an unknown identifier never
    /// equals a known one. Known identifiers are compared ordinally.
    /// </summary>
    /// <param name="other">Process id to compare to; may be null.</param>
    /// <returns>True if the ids represent the same process; false if <paramref name="other"/> is null.</returns>
    public bool Equals(DryadProcessIdentifier other)
    {
        if (object.ReferenceEquals(other, null))
            return false;
        // string.Equals treats (null, null) as equal and (null, x) as different,
        // which is exactly the "unknown" semantics.
        return string.Equals(this.ProcessIdentifier, other.ProcessIdentifier, StringComparison.Ordinal);
    }

    /// <summary>
    /// Hash code consistent with <see cref="Equals(DryadProcessIdentifier)"/>: equal identifiers
    /// (including two unknown ones) hash identically.
    /// Changing <see cref="ProcessIdentifier"/> while the object is a hash key breaks lookups.
    /// </summary>
    /// <returns>The hashcode of the process id, or 0 if it is unknown.</returns>
    public override int GetHashCode()
    {
        string pid = this.ProcessIdentifier;
        return pid == null ? 0 : StringComparer.Ordinal.GetHashCode(pid);
    }
    #endregion
}
/// <summary>
/// Brief summary of an executed DryadLINQ job.
/// </summary>
[Serializable]
public sealed class DryadLinqJobSummary : IEquatable<DryadLinqJobSummary>
{
    /// <summary>
    /// Empty constructor for XML serialization.
    /// </summary>
    public DryadLinqJobSummary()
    { }

    /// <summary>
    /// Initialize a job summary.
    /// </summary>
    /// <param name="cluster">Cluster where the job ran.</param>
    /// <param name="clusterType">The type of ClusterConfiguration of that cluster.</param>
    /// <param name="virtualcluster">Virtual cluster where the job ran.</param>
    /// <param name="machine">Machine where the job manager ran.</param>
    /// <param name="jobId">Id of the job.</param>
    /// <param name="clusterJobId">Id of the job on the cluster.</param>
    /// <param name="jmProcessGuid">Guid of the job manager process.</param>
    /// <param name="friendlyname">Friendly name used.</param>
    /// <param name="username">Who ran the job.</param>
    /// <param name="date">Start date (not completion date).</param>
    /// <param name="endTime">Estimated end time.</param>
    /// <param name="status">Job status.</param>
    public DryadLinqJobSummary(
        string cluster,
        ClusterConfiguration.ClusterType clusterType,
        string virtualcluster,
        string machine,
        string jobId,
        string clusterJobId,
        DryadProcessIdentifier jmProcessGuid,
        string friendlyname,
        string username,
        DateTime date,
        DateTime endTime,
        ClusterJobInformation.ClusterJobStatus status)
    {
        this.VirtualCluster = virtualcluster;
        this.Cluster = cluster;
        this.ClusterType = clusterType;
        this.Machine = machine;
        this.Name = friendlyname;
        this.User = username;
        this.Date = date;
        this.EndTime = endTime;
        this.Status = status;
        this.ManagerProcessGuid = jmProcessGuid;
        this.JobID = jobId;
        this.ClusterJobId = clusterJobId;
    }

    // NOTE: property declaration order is kept unchanged because XML serialization emits members in that order.

    /// <summary>
    /// Cluster where the job ran.
    /// </summary>
    public string Cluster { get; /*private*/ set; }
    /// <summary>
    /// Id of the cluster job that originated this DryadLinq job (can be used to find the cluster job from the DryadLinq job).
    /// </summary>
    public string ClusterJobId { get; /*private*/ set; }
    /// <summary>
    /// Estimated end time of the job.
    /// </summary>
    public DateTime EndTime { get; /*private*/ set; }
    /// <summary>
    /// Type of the cluster where the job ran.
    /// </summary>
    public ClusterConfiguration.ClusterType ClusterType { get; /*private*/ set; }
    /// <summary>
    /// Virtual cluster where the job ran.
    /// </summary>
    public string VirtualCluster { get; /*private*/ set; }
    /// <summary>
    /// (Friendly) name of the job.
    /// </summary>
    public string Name { get; /*private*/ set; }
    /// <summary>
    /// User who submitted the job.
    /// </summary>
    public string User { get; /*private*/ set; }
    /// <summary>
    /// ID of the job on the cluster.
    /// </summary>
    public string JobID { get; /*private*/ set; }
    /// <summary>
    /// The Guid of the job manager process.
    /// </summary>
    public DryadProcessIdentifier ManagerProcessGuid { set; get; }

    /// <summary>
    /// The user name without any leading "DOMAIN\" prefix.
    /// </summary>
    /// <returns>The part of <see cref="User"/> after the first backslash, or all of it if there is none.</returns>
    public string GetAlias()
    {
        // Ordinal character search; a backslash needs no culture-sensitive matching.
        int pos = this.User.IndexOf('\\');
        return this.User.Substring(pos + 1);
    }

    /// <summary>
    /// Date when the job was submitted.
    /// </summary>
    public DateTime Date { get; /*private*/ set; }
    /// <summary>
    /// Status of the job.
    /// </summary>
    public ClusterJobInformation.ClusterJobStatus Status { get; /*internal*/ set; }
    /// <summary>
    /// Machine where the job manager ran.
    /// </summary>
    public string Machine { get; /*private*/ set; }

    /// <summary>
    /// Get a short name for this job summary.
    /// </summary>
    /// <returns>The start date in sortable ("s") format, a dash, and the job name.</returns>
    public string ShortName()
    {
        // we use the starting time to uniquify the job name
        return this.Date.ToString("s") + "-" + this.Name;
    }

    /// <summary>
    /// True if these two summaries describe the same job. The status and end time do not matter, since the job may still be running.
    /// </summary>
    /// <param name="other">Summary to compare against; may be null.</param>
    /// <returns>True if they are equal; false if <paramref name="other"/> is null.</returns>
    public bool Equals(DryadLinqJobSummary other)
    {
        if (object.ReferenceEquals(other, null))
            return false;
        return this.Cluster == other.Cluster &&
            this.ClusterJobId == other.ClusterJobId &&
            this.Date == other.Date &&
            this.Machine == other.Machine &&
            this.Name == other.Name &&
            this.User == other.User;
    }

    /// <summary>
    /// Equality test against an arbitrary object, consistent with <see cref="Equals(DryadLinqJobSummary)"/>.
    /// </summary>
    /// <param name="obj">Object to compare against.</param>
    /// <returns>True if <paramref name="obj"/> is a summary of the same job.</returns>
    public override bool Equals(object obj)
    {
        return this.Equals(obj as DryadLinqJobSummary);
    }

    /// <summary>
    /// Hashcode consistent with the equality test: it combines exactly the fields that Equals compares,
    /// and tolerates null fields (e.g. after XML deserialization).
    /// </summary>
    /// <returns>The object hashcode.</returns>
    public override int GetHashCode()
    {
        unchecked
        {
            int hash = 17;
            hash = hash * 31 + HashOf(this.Cluster);
            hash = hash * 31 + HashOf(this.ClusterJobId);
            hash = hash * 31 + this.Date.GetHashCode();
            hash = hash * 31 + HashOf(this.Machine);
            hash = hash * 31 + HashOf(this.Name);
            hash = hash * 31 + HashOf(this.User);
            return hash;
        }
    }

    /// <summary>
    /// Null-tolerant string hash.
    /// </summary>
    /// <param name="value">String to hash; may be null.</param>
    /// <returns>0 for null, otherwise the string's hash code.</returns>
    private static int HashOf(string value)
    {
        return value == null ? 0 : value.GetHashCode();
    }

    /// <summary>
    /// A string describing the unique identifying part of the summary.
    /// Two different summaries may represent the same job at different times.
    /// </summary>
    /// <returns>The part common to all summaries of the same job.</returns>
    public string AsIdentifyingString()
    {
        return string.Format("Cluster={0} ClusterJobID={1} Date={2} Machine={3} Name={4} User={5}",
            this.Cluster, this.ClusterJobId, this.Date, this.Machine, this.Name, this.User);
    }
}
/// <summary>
/// This class is an abstraction of a cluster-level job, as opposed to a DryadLINQ job.
/// In Cosmos that's called a task, in HPC that's called a job.
/// (In Cosmos a task is a recurring job. In DryadLINQ running on top of Cosmos, a task always contains exactly one job.)
/// </summary>
public class ClusterJobInformation : IEquatable<ClusterJobInformation>
{
    /// <summary>
    /// Status of a cluster job.
    /// </summary>
    public enum ClusterJobStatus
    {
        /// <summary>
        /// Job is still running.
        /// </summary>
        Running,
        /// <summary>
        /// Job has finished successfully.
        /// </summary>
        Succeeded,
        /// <summary>
        /// Job has finished and has failed.
        /// </summary>
        Failed,
        /// <summary>
        /// Job has been cancelled. Not precise on Cosmos clusters.
        /// </summary>
        Cancelled,
        /// <summary>
        /// Could not determine job status.
        /// </summary>
        Unknown,
    }

    /// <summary>
    /// True if the job is finished; false if it is running or its status is unknown.
    /// </summary>
    /// <param name="status">Job status.</param>
    /// <returns>True if the job is no longer running.</returns>
    /// <exception cref="InvalidDataException">The status is not a defined enum value.</exception>
    public static bool JobIsFinished(ClusterJobStatus status)
    {
        switch (status)
        {
            case ClusterJobInformation.ClusterJobStatus.Failed:
            case ClusterJobInformation.ClusterJobStatus.Succeeded:
            case ClusterJobInformation.ClusterJobStatus.Cancelled:
                return true;
            case ClusterJobInformation.ClusterJobStatus.Running:
            case ClusterJobInformation.ClusterJobStatus.Unknown:
                return false;
            default:
                throw new InvalidDataException("Invalid job status " + status);
        }
    }

    /// <summary>
    /// Create a cluster job structure from a bunch of information.
    /// </summary>
    /// <param name="cluster">Cluster where the job is running.</param>
    /// <param name="virtualCluster">Virtual cluster where the job has run.</param>
    /// <param name="clusterJobGuid">Cluster job guid.</param>
    /// <param name="jobName">Name of the cluster job.</param>
    /// <param name="username">User who submitted the cluster job.</param>
    /// <param name="date">Last execution of the cluster job.</param>
    /// <param name="runningTime">Time the job ran.</param>
    /// <param name="status">Execution status.</param>
    public ClusterJobInformation(
        string cluster,
        string virtualCluster,
        string clusterJobGuid,
        string jobName,
        string username,
        DateTime date,
        TimeSpan runningTime,
        ClusterJobStatus status)
    {
        this.VirtualCluster = virtualCluster;
        this.Cluster = cluster;
        this.ClusterJobID = clusterJobGuid;
        this.Name = jobName;
        this.User = username;
        this.Date = date;
        this.EstimatedRunningTime = runningTime;
        this.Status = status;
        this.JobSummary = null;
    }

    /// <summary>
    /// Name of the cluster job.
    /// </summary>
    public string Name { get; set; }
    /// <summary>
    /// User who submitted the cluster job.
    /// </summary>
    public string User { get; set; }
    /// <summary>
    /// Date when the job was submitted.
    /// </summary>
    public DateTime Date { get; set; }
    /// <summary>
    /// ID of the job on the cluster.
    /// </summary>
    public string ClusterJobID { get; set; }
    /// <summary>
    /// Status of the execution.
    /// </summary>
    public ClusterJobStatus Status { get; set; }
    /// <summary>
    /// Cluster where the job ran.
    /// </summary>
    public string Cluster { get; set; }
    /// <summary>
    /// In some installations a cluster is composed of multiple virtual clusters.
    /// </summary>
    public string VirtualCluster { get; set; }
    /// <summary>
    /// Set when the associated DryadLinq job could not be discovered; see <see cref="DiscoverDryadLinqJob"/>.
    /// </summary>
    public bool IsUnavailable { get; set; }
    /// <summary>
    /// Cache here the associated job, if available. Null if not cached.
    /// </summary>
    private DryadLinqJobSummary JobSummary { get; set; }
    /// <summary>
    /// Estimated time the job ran.
    /// </summary>
    public TimeSpan EstimatedRunningTime { get; set; }

    /// <summary>
    /// If known, set the associated job summary.
    /// </summary>
    /// <param name="summary">Job summary for this cluster job.</param>
    public void SetAssociatedSummary(DryadLinqJobSummary summary)
    {
        this.JobSummary = summary;
    }

    /// <summary>
    /// Discover the DryadLinq job associated with a cluster job. The result is cached; a failed
    /// discovery marks this job <see cref="IsUnavailable"/> so it is not attempted again.
    /// </summary>
    /// <param name="status">Status object of the cluster, used to perform the discovery.</param>
    /// <param name="reporter">Delegate used to report errors.</param>
    /// <returns>The job, or null if it cannot be found.</returns>
    public DryadLinqJobSummary DiscoverDryadLinqJob(ClusterStatus status, StatusReporter reporter)
    {
        if (this.IsUnavailable)
            return null;
        if (this.JobSummary != null)
            return this.JobSummary;
        DryadLinqJobSummary j = status.DiscoverDryadLinqJobFromClusterJob(this, reporter);
        if (j == null)
        {
            this.IsUnavailable = true;
        }
        return this.JobSummary = j;
    }

    /// <summary>
    /// Copy the content of a cluster job. Every property except <see cref="Cluster"/> is copied.
    /// </summary>
    /// <param name="refresh">The value to copy from.</param>
    internal void Copy(ClusterJobInformation refresh)
    {
        this.Name = refresh.Name;
        this.Status = refresh.Status;
        this.User = refresh.User;
        this.JobSummary = refresh.JobSummary;
        this.ClusterJobID = refresh.ClusterJobID;
        this.Date = refresh.Date;
        this.IsUnavailable = refresh.IsUnavailable;
        this.VirtualCluster = refresh.VirtualCluster;
        this.EstimatedRunningTime = refresh.EstimatedRunningTime;
    }

    /// <summary>
    /// True if these two records represent the same job (status, availability and running time are ignored).
    /// </summary>
    /// <param name="other">Record to compare against; may be null.</param>
    /// <returns>True if both records describe the same job; false if <paramref name="other"/> is null.</returns>
    public bool Equals(ClusterJobInformation other)
    {
        if (object.ReferenceEquals(other, null))
            return false;
        return
            this.Cluster == other.Cluster &&
            this.VirtualCluster == other.VirtualCluster &&
            this.Name == other.Name &&
            this.User == other.User &&
            this.ClusterJobID == other.ClusterJobID &&
            this.Date == other.Date;
    }

    /// <summary>
    /// Equality test against an arbitrary object, consistent with <see cref="Equals(ClusterJobInformation)"/>.
    /// </summary>
    /// <param name="obj">Object to compare against.</param>
    /// <returns>True if <paramref name="obj"/> is a record of the same job.</returns>
    public override bool Equals(object obj)
    {
        return this.Equals(obj as ClusterJobInformation);
    }

    /// <summary>
    /// Hash code consistent with the equality test; combines exactly the fields that Equals compares.
    /// These properties are mutable (e.g. via <see cref="Copy"/>); mutating them while the object is a hash key breaks lookups.
    /// </summary>
    /// <returns>The object hashcode.</returns>
    public override int GetHashCode()
    {
        unchecked
        {
            int hash = 17;
            hash = hash * 31 + HashOf(this.Cluster);
            hash = hash * 31 + HashOf(this.VirtualCluster);
            hash = hash * 31 + HashOf(this.Name);
            hash = hash * 31 + HashOf(this.User);
            hash = hash * 31 + HashOf(this.ClusterJobID);
            hash = hash * 31 + this.Date.GetHashCode();
            return hash;
        }
    }

    /// <summary>
    /// Null-tolerant string hash.
    /// </summary>
    /// <param name="value">String to hash; may be null.</param>
    /// <returns>0 for null, otherwise the string's hash code.</returns>
    private static int HashOf(string value)
    {
        return value == null ? 0 : value.GetHashCode();
    }
}
/// <summary>
/// A serializable (property name, value) pair, used to persist one setting of a cluster configuration.
/// </summary>
[Serializable]
public class PropertySetting
{
    /// <summary>
    /// Parameterless constructor required by serializers; leaves both fields null.
    /// </summary>
    public PropertySetting()
    {
    }

    /// <summary>
    /// Build a setting from its name and value.
    /// </summary>
    /// <param name="prop">Name of the configuration property.</param>
    /// <param name="value">Value of the property, as a string; may be null.</param>
    public PropertySetting(string prop, string value)
    {
        Value = value;
        Property = prop;
    }

    /// <summary>
    /// Name of a configuration property.
    /// </summary>
    public string Property { get; set; }

    /// <summary>
    /// Value stored for <see cref="Property"/>.
    /// </summary>
    public string Value { get; set; }
}
/// <summary>
/// The serializable data part of a <see cref="ClusterConfiguration"/>.
/// </summary>
[Serializable]
public class ClusterConfigurationSerialization
{
    /// <summary>
    /// Cluster name.
    /// </summary>
    public string Name { get; set; }
    /// <summary>
    /// Cluster type.
    /// </summary>
    public ClusterConfiguration.ClusterType Type { get; set; }
    /// <summary>
    /// The other properties, as (property name, string value) pairs; may be null after deserialization.
    /// </summary>
    public List<PropertySetting> Properties { get; set; }

    /// <summary>
    /// Create a ClusterConfiguration from its serialization.
    /// Settings whose property name is missing, or which name a property that the configuration type
    /// does not have or cannot write, are skipped. This lets a configuration saved by a different
    /// version of the tool still load.
    /// </summary>
    /// <returns>The corresponding cluster configuration.</returns>
    public ClusterConfiguration Create()
    {
        var config = ClusterConfiguration.CreateConfiguration(this.Type);
        config.Name = this.Name;
        if (this.Properties == null)
            return config;

        var configType = config.GetType();
        foreach (PropertySetting setting in this.Properties)
        {
            if (setting == null || setting.Property == null)
                continue;
            var property = configType.GetProperty(setting.Property);
            if (property == null || !property.CanWrite)
                continue; // stale or read-only property: nothing to restore
            property.SetValue(config, setting.Value);
        }
        return config;
    }
}
/// <summary>
/// All configuration parameters describing the cluster setup should be here.
/// The class also keeps a static registry of known cluster configurations, indexed by cluster name.
/// </summary>
public abstract class ClusterConfiguration
{
    /// <summary>
    /// The type of runtime for the cluster.
    /// </summary>
    public enum ClusterType
    {
        /// <summary>
        /// Could not detect cluster version.
        /// </summary>
        Unknown,
        /// <summary>
        /// Fake cluster, used for post-mortem debugging; keeps some information about jobs in a local folder.
        /// </summary>
        Cache,
        /// <summary>
        /// Cluster emulated on the local machine.
        /// </summary>
        LocalEmulator,
        /// <summary>
        /// Azure DFS client.
        /// </summary>
        AzureDfs,
        /// <summary>
        /// Max type, unused; for enumerating.
        /// </summary>
        MaxUnused,
    }

    /// <summary>
    /// Names of the properties that can be edited; these are also the properties saved by <see cref="ExtractData"/>.
    /// </summary>
    /// <returns>List of properties that can be edited.</returns>
    public abstract List<string> GetPropertiesToEdit();

    /// <summary>
    /// Must be called after setting all properties.
    /// </summary>
    /// <returns>Null if initialization succeeds, an error message otherwise.</returns>
    public abstract string Initialize();

    /// <summary>
    /// Create the serialization data structure for this configuration.
    /// </summary>
    /// <returns>The corresponding serialization; each editable property is saved as its ToString() value, or null.</returns>
    public ClusterConfigurationSerialization ExtractData()
    {
        var result = new ClusterConfigurationSerialization
        {
            Type = this.TypeOfCluster,
            Name = this.Name,
            Properties = new List<PropertySetting>()
        };
        var configType = this.GetType();
        foreach (string prop in this.GetPropertiesToEdit())
        {
            object value = configType.GetProperty(prop).GetValue(this);
            result.Properties.Add(new PropertySetting(prop, value != null ? value.ToString() : null));
        }
        return result;
    }

    /// <summary>
    /// The serialization of all known clusters.
    /// </summary>
    /// <returns>The serializations of all known clusters in a list.</returns>
    public static List<ClusterConfigurationSerialization> KnownClustersSerialization()
    {
        return KnownClusters.Values.Select(config => config.ExtractData()).ToList();
    }

    /// <summary>
    /// Reconstruct the known clusters from a saved serialization.
    /// Entries for which Create returns null, or whose <see cref="Initialize"/> reports an error, are skipped.
    /// </summary>
    /// <param name="sers">Serializations for each cluster; null is ignored.</param>
    public static void ReconstructKnownCluster(List<ClusterConfigurationSerialization> sers)
    {
        if (sers == null) return;
        foreach (var cs in sers)
        {
            var clus = cs.Create();
            if (clus == null) continue;
            string error = clus.Initialize();
            if (error != null) continue;
            AddKnownCluster(clus);
        }
    }

    /// <summary>
    /// Create a cluster configuration of the specified type.
    /// </summary>
    /// <param name="type">Type of cluster.</param>
    protected ClusterConfiguration(ClusterConfiguration.ClusterType type)
    {
        this.TypeOfCluster = type;
    }

    /// <summary>
    /// Credentials to use for authentication.
    /// </summary>
    [NonSerialized]
    // ReSharper disable once NotAccessedField.Global
    protected NetworkCredential credentials;

    /// <summary>
    /// Set the credentials for connecting to this cluster.
    /// </summary>
    /// <param name="credential">Credentials to use.</param>
    public void SetCredential(NetworkCredential credential)
    {
        this.credentials = credential;
    }

    /// <summary>
    /// The name of this cluster; also its key in the registry of known clusters.
    /// </summary>
    public string Name { get; set; }
    /// <summary>
    /// The type of the cluster.
    /// </summary>
    public ClusterType TypeOfCluster { get; protected set; }
    /// <summary>
    /// Base directory on all cluster machines.
    /// </summary>
    public string DryadInstallDirectory { get; protected set; }
    /// <summary>
    /// Default domain for the user account used to connect to the cluster; empty to use the current domain.
    /// </summary>
    public string UserDomain { get; protected set; }
    /// <summary>
    /// The machine where the metadata for the copied jobs is stored.
    /// </summary>
    public string MetaDataMachine { get; protected set; }

    /// <summary>
    /// Time zone of the analyzed cluster. By default we assume that the cluster is in the local time zone.
    /// </summary>
    /// <param name="job">Job we are interested in.</param>
    /// <returns>Time zone information of the cluster.</returns>
    public virtual TimeZoneInfo GetClusterTimeZone(DryadLinqJobSummary job)
    {
        return TimeZoneInfo.Local;
    }

    /// <summary>
    /// The directory where a specific process is created on the cluster.
    /// </summary>
    /// <param name="identifier">Process identifier.</param>
    /// <param name="terminated">True if the vertex is terminated.</param>
    /// <param name="machine">Machine where the process ran.</param>
    /// <param name="job">Job where the process belongs.</param>
    /// <returns>Home directory containing the process information (not the working directory of the vertex).</returns>
    public abstract IClusterResidentObject ProcessDirectory(DryadProcessIdentifier identifier, bool terminated, string machine, DryadLinqJobSummary job);

    /// <summary>
    /// Work directory of a process vertex.
    /// </summary>
    /// <param name="identifier">Vertex guid.</param>
    /// <param name="terminated">True if the vertex is terminated.</param>
    /// <param name="machine">Machine where the process ran.</param>
    /// <param name="job">Job where the process belongs.</param>
    /// <returns>The path to the work directory of the vertex.</returns>
    public abstract IClusterResidentObject ProcessWorkDirectory(DryadProcessIdentifier identifier, bool terminated, string machine, DryadLinqJobSummary job);

    /// <summary>
    /// Given an input file identify the process that produced it.
    /// </summary>
    /// <param name="input">Input file of a process.</param>
    /// <param name="job">Job that contained the process.</param>
    /// <returns>The identity of the process that produced the file.</returns>
    // ReSharper disable UnusedParameter.Global
    public abstract DryadProcessIdentifier ProcessFromInputFile(IClusterResidentObject input, DryadLinqJobSummary job);
    // ReSharper restore UnusedParameter.Global

    /// <summary>
    /// File containing the standard output of a process: "stdout.txt" in the process directory.
    /// </summary>
    /// <param name="identifier">Process identifier.</param>
    /// <param name="terminated">True if the vertex is terminated.</param>
    /// <param name="machine">Machine where the process ran.</param>
    /// <param name="job">Job containing the process.</param>
    /// <returns>The pathname to the standard output.</returns>
    public virtual IClusterResidentObject ProcessStdoutFile(DryadProcessIdentifier identifier, bool terminated, string machine, DryadLinqJobSummary job)
    {
        IClusterResidentObject processdir = this.ProcessDirectory(identifier, terminated, machine, job);
        IClusterResidentObject file = processdir.GetFile("stdout.txt");
        return file;
    }

    /// <summary>
    /// Create a cluster status for this cluster.
    /// </summary>
    /// <returns>The proper cluster status.</returns>
    public abstract ClusterStatus CreateClusterStatus();

    /// <summary>
    /// Registry of known cluster configurations, indexed by cluster name.
    /// No cluster is registered by default; entries come from <see cref="AddKnownCluster"/>
    /// and <see cref="ReconstructKnownCluster"/>.
    /// </summary>
    private static readonly Dictionary<string, ClusterConfiguration> KnownClusters = new Dictionary<string, ClusterConfiguration>();

    /// <summary>
    /// A known cluster configuration by name.
    /// </summary>
    /// <param name="name">Name of the cluster.</param>
    /// <returns>The cluster configuration for that cluster, or null if none is registered.</returns>
    public static ClusterConfiguration KnownClusterByName(string name)
    {
        ClusterConfiguration config;
        return KnownClusters.TryGetValue(name, out config) ? config : null;
    }

    /// <summary>
    /// Add a new cluster to the list of known clusters, replacing any cluster with the same name.
    /// </summary>
    /// <param name="config">New config to add.</param>
    public static void AddKnownCluster(ClusterConfiguration config)
    {
        // The indexer setter adds a new entry or overwrites an existing one.
        KnownClusters[config.Name] = config;
    }

    /// <summary>
    /// Remove a cluster from the list of known clusters.
    /// </summary>
    /// <param name="name">Name of the cluster to remove.</param>
    /// <returns>The removed configuration, or null if no cluster had that name.</returns>
    public static ClusterConfiguration RemoveKnownCluster(string name)
    {
        ClusterConfiguration config;
        if (!KnownClusters.TryGetValue(name, out config))
            return null;
        KnownClusters.Remove(name);
        return config;
    }

    /// <summary>
    /// Get the names of the known clusters.
    /// </summary>
    /// <returns>A collection of cluster names.</returns>
    public static IEnumerable<string> GetKnownClusterNames()
    {
        return KnownClusters.Keys;
    }

    /// <summary>
    /// File containing the standard error of a process. Not available on all architectures;
    /// by default this returns the standard output file.
    /// </summary>
    /// <param name="identifier">Process identifier.</param>
    /// <param name="terminated">True if the vertex is terminated.</param>
    /// <param name="machine">Machine where the process ran.</param>
    /// <param name="job">Job containing the process.</param>
    /// <returns>A reference to the standard error (by default, the standard output).</returns>
    public virtual IClusterResidentObject ProcessStderrFile(DryadProcessIdentifier identifier, bool terminated, string machine, DryadLinqJobSummary job)
    {
        return this.ProcessStdoutFile(identifier, terminated, machine, job);
    }

    /// <summary>
    /// Log directory of a process vertex.
    /// </summary>
    /// <param name="identifier">Vertex guid.</param>
    /// <param name="terminated">True if the vertex is terminated.</param>
    /// <param name="machine">Machine where the process ran.</param>
    /// <param name="job">Job where the process belongs.</param>
    /// <returns>The path to the log directory of the vertex.</returns>
    public abstract IClusterResidentObject ProcessLogDirectory(DryadProcessIdentifier identifier, bool terminated, string machine, DryadLinqJobSummary job);

    /// <summary>
    /// A shell pattern matching (just the) log files produced by a job manager process.
    /// </summary>
    /// <param name="error">If true, return only the error logs.</param>
    /// <param name="job">Job where the JM process belongs.</param>
    /// <returns>Pattern matching the log files.</returns>
    // ReSharper disable once UnusedParameter.Global
    public abstract string JMLogFilesPattern(bool error, DryadLinqJobSummary job);

    /// <summary>
    /// A shell pattern matching (just the) log files produced by a vertex process.
    /// </summary>
    /// <param name="error">If true, return only the error logs.</param>
    /// <param name="job">Job containing this vertex.</param>
    /// <returns>Pattern matching the log files.</returns>
    public abstract string VertexLogFilesPattern(bool error, DryadLinqJobSummary job);

    /// <summary>
    /// Directory where the perfmon logs are being collected, relative to the local machine.
    /// </summary>
    /// <returns>The "Perfmon" sub-directory of <see cref="DryadInstallDirectory"/>.</returns>
    public virtual string PerfmonLogDirectory()
    {
        return Path.Combine(this.DryadInstallDirectory, "Perfmon");
    }

    /// <summary>
    /// Create an empty configuration of the specified type.
    /// </summary>
    /// <param name="type">Configuration type.</param>
    /// <returns>The created configuration.</returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="type"/> does not denote a concrete cluster type.</exception>
    public static ClusterConfiguration CreateConfiguration(ClusterType type)
    {
        switch (type)
        {
            case ClusterType.Cache:
                return new CacheClusterConfiguration();
            case ClusterType.LocalEmulator:
                return new LocalEmulator();
            case ClusterType.AzureDfs:
                return new AzureDfsClusterConfiguration();
            case ClusterType.Unknown:
            case ClusterType.MaxUnused:
            default:
                throw new ArgumentOutOfRangeException("type");
        }
    }

    /// <summary>
    /// Convert a GUID printed by the Dryad job manager into a process id, which is platform dependent.
    /// </summary>
    /// <param name="guid">Process guid.</param>
    /// <param name="job">Job the guid is from.</param>
    /// <returns>Process id.</returns>
    public abstract string ExtractPidFromGuid(string guid, DryadLinqJobSummary job);

    /// <summary>
    /// Navigate to a given url and return a stream reader with the corresponding web page.
    /// </summary>
    /// <param name="url">Url to navigate to.</param>
    /// <returns>The web page.</returns>
    internal virtual StreamReader Navigate(string url)
    {
        return Utilities.Navigate(url, null);
    }

    /// <summary>
    /// The file containing the job query plan: the single "DryadLinqProgram__*.xml" file in the
    /// job manager's work directory.
    /// </summary>
    /// <param name="job">Job whose plan is sought.</param>
    /// <returns>An object for the plan file. If the file cannot be located, the returned object wraps the exception.</returns>
    public virtual IClusterResidentObject JobQueryPlan(DryadLinqJobSummary job)
    {
        try
        {
            IClusterResidentObject dir = this.ProcessWorkDirectory(job.ManagerProcessGuid, true, job.Machine, job); // immutable
            var matchingFiles = dir.GetFilesAndFolders("DryadLinqProgram__*.xml").ToList();
            if (matchingFiles.Count != 1)
                throw new CalypsoClusterException("Could not find query plan file; got " + matchingFiles.Count + " possible matches");
            IClusterResidentObject result = matchingFiles[0];
            result.ShouldCacheLocally = true; // immutable
            return result;
        }
        catch (Exception e)
        {
            // Failures are reported through the returned object rather than thrown.
            return new UNCFile(e);
        }
    }
}
/// <summary>
/// A cached cluster is just a set of files on a filesystem which are saved from old jobs; allows post-mortem analysis.
/// Most queries are forwarded to the configuration of the cluster where the job actually ran.
/// </summary>
public sealed class CacheClusterConfiguration : ClusterConfiguration
{
    /// <summary>
    /// Each cached job may originate from a completely different cluster; the actual configurations
    /// are cached here, indexed by the name of the originating cluster.
    /// </summary>
    private readonly Dictionary<string, ClusterConfiguration> jobConfig;

    /// <summary>
    /// Folder where the information is saved.
    /// </summary>
    public string LocalInformationFolder { get; set; }

    /// <summary>
    /// Create a fake cluster which stores the information in <see cref="LocalInformationFolder"/> (initially unset).
    /// </summary>
    public CacheClusterConfiguration() : base(ClusterConfiguration.ClusterType.Cache)
    {
        this.LocalInformationFolder = null;
        this.Name = "Cache";
        this.jobConfig = new Dictionary<string, ClusterConfiguration>();
    }

    /// <summary>
    /// Make this cluster the active cache.
    /// </summary>
    public void StartCaching()
    {
        // An empty folder name is treated like "no folder".
        string folder = String.IsNullOrEmpty(this.LocalInformationFolder) ? null : this.LocalInformationFolder;
        CachedClusterResidentObject.CacheDirectory = folder; // enables caching
    }

    /// <summary>
    /// Disable caching in this cluster.
    /// </summary>
    public void StopCaching()
    {
        CachedClusterResidentObject.CacheDirectory = null; // disables caching
    }

    /// <summary>
    /// Names of the properties that can be edited for this cluster type.
    /// </summary>
    private static readonly List<string> props = new List<string>
    {
        "LocalInformationFolder"
    };

    /// <summary>
    /// Properties that can be edited.
    /// </summary>
    /// <returns>List of properties that can be edited.</returns>
    public override List<string> GetPropertiesToEdit()
    {
        return props;
    }

    /// <summary>
    /// Must be called after setting all properties; nothing to do for a cache.
    /// </summary>
    /// <returns>Always null (success).</returns>
    public override string Initialize()
    {
        return null;
    }

    /// <summary>
    /// Create a cluster status for this cluster, reusing the one already registered under this name if any.
    /// </summary>
    /// <returns>The proper cluster status.</returns>
    public override ClusterStatus CreateClusterStatus()
    {
        var stat = ClusterStatus.LookupStatus(this.Name);
        if (stat != null) return stat;
        return new CacheClusterStatus(this);
    }

    /// <summary>
    /// Find the actual cluster configuration corresponding to the specified job. The registered
    /// configuration for the job's cluster is preferred; otherwise a default configuration of the
    /// job's cluster type is created. The result is memoized per cluster name.
    /// </summary>
    /// <param name="job">Cached job.</param>
    /// <returns>A configuration that could have been used for the cluster running the job.</returns>
    public ClusterConfiguration ActualConfig(DryadLinqJobSummary job)
    {
        ClusterConfiguration config;
        if (!this.jobConfig.TryGetValue(job.Cluster, out config))
        {
            config = ClusterConfiguration.KnownClusterByName(job.Cluster)
                ?? ClusterConfiguration.CreateConfiguration(job.ClusterType);
            this.jobConfig.Add(job.Cluster, config);
        }
        return config;
    }

    /// <summary>
    /// Wrap a folder returned by the originating cluster's configuration so it is accessed through the cache.
    /// </summary>
    /// <param name="folder">Folder to wrap; may be null.</param>
    /// <returns>Null if <paramref name="folder"/> is null, otherwise the cached view of it.</returns>
    private static IClusterResidentObject WrapInCache(IClusterResidentObject folder)
    {
        if (folder == null) return null;
        // NOTE(review): `as` passes null to FolderInCachedCluster when the folder is not a
        // CachedClusterResidentObject; kept as-is — confirm FolderInCachedCluster handles that.
        return new FolderInCachedCluster(folder as CachedClusterResidentObject);
    }

    /// <summary>
    /// Object that can be used to access the process directory.
    /// </summary>
    /// <param name="identifier">Process identifier.</param>
    /// <param name="terminated">True if process is terminated.</param>
    /// <param name="machine">Machine where the process ran.</param>
    /// <param name="job">Job that contained the process.</param>
    /// <returns>An object which can be used to access the process home directory, or null.</returns>
    public override IClusterResidentObject ProcessDirectory(DryadProcessIdentifier identifier, bool terminated, string machine, DryadLinqJobSummary job)
    {
        return WrapInCache(this.ActualConfig(job).ProcessDirectory(identifier, terminated, machine, job));
    }

    /// <summary>
    /// Object that can be used to access the process work directory.
    /// </summary>
    /// <param name="identifier">Process identifier.</param>
    /// <param name="terminated">True if process is terminated.</param>
    /// <param name="machine">Machine where the process ran.</param>
    /// <param name="job">Job that contained the process.</param>
    /// <returns>An object which can be used to access the process work directory, or null.</returns>
    public override IClusterResidentObject ProcessWorkDirectory(DryadProcessIdentifier identifier, bool terminated, string machine, DryadLinqJobSummary job)
    {
        return WrapInCache(this.ActualConfig(job).ProcessWorkDirectory(identifier, terminated, machine, job));
    }

    /// <summary>
    /// Time zone of the cluster where the job actually ran.
    /// </summary>
    /// <param name="job">Job we are interested in.</param>
    /// <returns>The time zone of the cluster.</returns>
    public override TimeZoneInfo GetClusterTimeZone(DryadLinqJobSummary job)
    {
        return this.ActualConfig(job).GetClusterTimeZone(job);
    }

    /// <summary>
    /// Given an input file identify the process that produced it. Not supported for cached clusters.
    /// </summary>
    /// <param name="input">Input file of a process.</param>
    /// <param name="job">Job that contained the process.</param>
    /// <returns>Never returns.</returns>
    /// <exception cref="InvalidOperationException">Always thrown.</exception>
    public override DryadProcessIdentifier ProcessFromInputFile(IClusterResidentObject input, DryadLinqJobSummary job)
    {
        throw new InvalidOperationException();
    }

    /// <summary>
    /// Object that can be used to access the process log directory.
    /// </summary>
    /// <param name="identifier">Process identifier.</param>
    /// <param name="terminated">True if process is terminated.</param>
    /// <param name="machine">Machine where the process ran.</param>
    /// <param name="job">Job that contained the process.</param>
    /// <returns>An object which can be used to access the process log directory, or null.</returns>
    public override IClusterResidentObject ProcessLogDirectory(DryadProcessIdentifier identifier, bool terminated, string machine, DryadLinqJobSummary job)
    {
        return WrapInCache(this.ActualConfig(job).ProcessLogDirectory(identifier, terminated, machine, job));
    }

    /// <summary>
    /// Pattern which matches the job manager log files, as defined by the job's actual cluster.
    /// </summary>
    /// <param name="error">If true return only the log files containing errors.</param>
    /// <param name="job">Job where the process belongs.</param>
    /// <returns>A string that can be used to match only log files.</returns>
    public override string JMLogFilesPattern(bool error, DryadLinqJobSummary job)
    {
        return this.ActualConfig(job).JMLogFilesPattern(error, job);
    }

    /// <summary>
    /// Pattern which matches the vertex log files, as defined by the job's actual cluster.
    /// </summary>
    /// <param name="error">If true return only the log files containing errors.</param>
    /// <param name="job">Job containing the vertex.</param>
    /// <returns>A string that can be used to match only log files.</returns>
    public override string VertexLogFilesPattern(bool error, DryadLinqJobSummary job)
    {
        return this.ActualConfig(job).VertexLogFilesPattern(error, job);
    }

    /// <summary>
    /// File containing the standard output of a process, as located by the job's actual cluster.
    /// </summary>
    /// <param name="identifier">Process identifier.</param>
    /// <param name="terminated">True if vertex is terminated.</param>
    /// <param name="machine">Machine where process ran.</param>
    /// <param name="job">Job containing process.</param>
    /// <returns>The pathname to the standard output.</returns>
    public override IClusterResidentObject ProcessStdoutFile(DryadProcessIdentifier identifier, bool terminated, string machine, DryadLinqJobSummary job)
    {
        return this.ActualConfig(job).ProcessStdoutFile(identifier, terminated, machine, job);
    }

    /// <summary>
    /// File containing the standard error of a process, as located by the job's actual cluster.
    /// Not available on all architectures.
    /// </summary>
    /// <param name="identifier">Process identifier.</param>
    /// <param name="terminated">True if vertex is terminated.</param>
    /// <param name="machine">Machine where process ran.</param>
    /// <param name="job">Job containing process.</param>
    /// <returns>A reference to the standard error.</returns>
    public override IClusterResidentObject ProcessStderrFile(DryadProcessIdentifier identifier, bool terminated, string machine, DryadLinqJobSummary job)
    {
        return this.ActualConfig(job).ProcessStderrFile(identifier, terminated, machine, job);
    }

    /// <summary>
    /// Extract the process id from a guid, using the job's actual cluster.
    /// </summary>
    /// <param name="guid">Process guid.</param>
    /// <param name="job">Job containing process.</param>
    /// <returns>The process id.</returns>
    public override string ExtractPidFromGuid(string guid, DryadLinqJobSummary job)
    {
        return this.ActualConfig(job).ExtractPidFromGuid(guid, job);
    }

    /// <summary>
    /// The file containing the job query plan.
    /// </summary>
    /// <param name="job">Job whose plan is sought.</param>
    /// <returns>An object for the plan file; null if the work directory is unavailable; an object wrapping the exception on failure.</returns>
    public override IClusterResidentObject JobQueryPlan(DryadLinqJobSummary job)
    {
        try
        {
            // we have to handle the xml plan differently; the cached file is renamed to always be "0".
            IClusterResidentObject dir = this.ProcessWorkDirectory(job.ManagerProcessGuid, true, job.Machine, job);
            if (dir == null) return null;
            IClusterResidentObject result = dir.GetFile("DryadLinqProgram__0.xml");
            return result;
        }
        catch (Exception e)
        {
            return new UNCFile(e);
        }
    }
}
///
/// A local machine used to run an emulated Yarn cluster.
///
public sealed class LocalEmulator : ClusterConfiguration
{
/// <summary>
/// Folder holding the logs of the jobs run by the emulator: the "LocalJobs" sub-folder of DRYAD_HOME.
/// </summary>
public string JobsFolder { get; private set; }
/// <summary>
/// Create a configuration describing a cluster emulated on the local machine.
/// </summary>
/// <exception cref="InvalidOperationException">The DRYAD_HOME environment variable is unset or empty.</exception>
public LocalEmulator()
    : base(ClusterType.LocalEmulator)
{
    string home = Environment.GetEnvironmentVariable("DRYAD_HOME");
    if (String.IsNullOrEmpty(home))
    {
        throw new InvalidOperationException("Environment variable DRYAD_HOME is not set");
    }

    this.Name = "LocalEmulator";
    this.JobsFolder = Path.Combine(home, "LocalJobs");
}
/// <summary>
/// Editable properties of the local emulator: none.
/// </summary>
private static readonly List<string> props = new List<string>();
/// <summary>
/// Must be called after setting all properties; the local emulator needs no initialization.
/// </summary>
/// <returns>Always null (success).</returns>
public override string Initialize()
{
    return null;
}
/// <summary>
/// Properties that can be edited.
/// </summary>
/// <returns>An empty list: the local emulator has no editable properties.</returns>
public override List<string> GetPropertiesToEdit()
{
    return props;
}
/// <summary>
/// Create a cluster status for this cluster. A status already registered under this
/// cluster's name is reused; otherwise a new emulated-cluster status is built.
/// </summary>
/// <returns>The proper cluster status.</returns>
public override ClusterStatus CreateClusterStatus()
{
    ClusterStatus existing = ClusterStatus.LookupStatus(this.Name);
    return existing ?? new YarnEmulatedClusterStatus(this);
}
/// <summary>
/// The directory where a specific process is created on the emulated cluster.
/// Worker processes live under JobsFolder\ClusterJobId\Worker\pid; the job manager ("jm")
/// is laid out differently, under JobsFolder\ClusterJobId\pid\Process.000.001.
/// </summary>
/// <param name="identifier">Process identifier.</param>
/// <param name="terminated">True if the vertex is terminated.</param>
/// <param name="machine">Machine where the process ran.</param>
/// <param name="job">Job where the process belongs.</param>
/// <returns>Home directory containing the process information (not the working directory of the vertex).</returns>
public override IClusterResidentObject ProcessDirectory(DryadProcessIdentifier identifier, bool terminated, string machine, DryadLinqJobSummary job)
{
    bool isJobManager = identifier.ToString() == "jm";
    string pid = identifier.ProcessIdentifier;
    if (!isJobManager)
    {
        string workerRoot = Utilities.PathCombine(this.JobsFolder, job.ClusterJobId, "Worker");
        return new LocalFile(Path.Combine(workerRoot, pid));
    }

    // The job manager process is special.
    string jmFolder = Utilities.PathCombine(this.JobsFolder, job.ClusterJobId, pid, "Process.000.001");
    return new LocalFile(jmFolder);
}
///
/// Given an input file, identify the process that has produced it.
///
/// Input file.
/// Job containing the process.
/// The process that has produced the file.
public override DryadProcessIdentifier ProcessFromInputFile(IClusterResidentObject input, DryadLinqJobSummary job)
{
throw new InvalidOperationException();
}
///
/// File containing standard error of a process. Not available on all architectures.
///
/// Process identifier.
/// Machine where process ran.
/// Job containing process.
/// A reference to the standard output.
/// Vertex state.
public override IClusterResidentObject ProcessStderrFile(DryadProcessIdentifier identifier, bool terminated, string machine, DryadLinqJobSummary job)
{
IClusterResidentObject processdir = this.ProcessDirectory(identifier, terminated, machine, job);
IClusterResidentObject file = processdir.GetFile("stderr.txt");
return file;
}
///
/// The file containing the job query plan.
///
/// Job whose plan is sought.
/// An object containing the path, or null if it cannot be found.
public override IClusterResidentObject JobQueryPlan(DryadLinqJobSummary job)
{
try
{
IClusterResidentObject dir = this.ProcessWorkDirectory(job.ManagerProcessGuid, true, job.Machine, job); // this is missing at this point
//IClusterResidentObject dir = this.ProcessWorkDirectory(new DryadProcessIdentifier("Process.000.001"), true, job.Machine, job);
var matchingfiles = dir.GetFilesAndFolders("DryadLinqProgram__*.xml").ToList();
if (matchingfiles.Count() != 1)
throw new CalypsoClusterException("Could not find query plan file; got " + matchingfiles.Count() + " possible matches");
IClusterResidentObject result = matchingfiles.First();
result.ShouldCacheLocally = true; // immutable
return result;
}
catch (Exception e)
{
return new UNCFile(e);
}
}
///
/// Work directory of a process vertex.
///
/// Vertex guid.
/// Machine where process ran.
/// The path to the work directory of the vertex.
/// Job where the process belongs.
/// True if vertex is terminated.
public override IClusterResidentObject ProcessWorkDirectory(DryadProcessIdentifier identifier, bool terminated, string machine, DryadLinqJobSummary job)
{
return this.ProcessDirectory(identifier, terminated, machine, job);
}
///
/// Log directory of a process vertex.
///
/// Vertex guid.
/// The path to the work directory of the vertex.
/// Machine where process ran.
/// True if vertex is terminated.
/// Job where the process belongs.
public override IClusterResidentObject ProcessLogDirectory(DryadProcessIdentifier identifier, bool terminated, string machine, DryadLinqJobSummary job)
{
return this.ProcessDirectory(identifier, terminated, machine, job);
}
///
/// A shell pattern matching (just the) log files produced by a job manager process.
///
/// Pattern matching the log files.
/// If true, return only the error logs.
/// Job where process belongs.
public override string JMLogFilesPattern(bool error, DryadLinqJobSummary job)
{
return "stderr.txt";
}
///
/// A shell pattern matching (just the) log files produced by a vertex process.
///
/// Pattern matching the log files.
/// If true, return only the error logs.
/// Job where process belongs.
public override string VertexLogFilesPattern(bool error, DryadLinqJobSummary job)
{
return this.JMLogFilesPattern(error, job);
}
///
/// Convert a GUID printed by the Dryad job manager into a process-id, which is platform dependent.
///
/// Process guid.
/// Process id.
/// Job where process belongs.
public override string ExtractPidFromGuid(string guid, DryadLinqJobSummary job)
{
return guid;
}
}
/// <summary>
/// Configuration for an AzureDfs cluster.
/// </summary>
public sealed class AzureDfsClusterConfiguration : ClusterConfiguration
{
    /// <summary>
    /// Handle to client to enumerate logs; created by <c>Initialize</c>.
    /// </summary>
    public AzureDfsClient AzureClient;

    /// <summary>
    /// Create an AzureDfs cluster configuration. Set AccountName, AccountKey and
    /// Container, then call <c>Initialize</c> before use.
    /// </summary>
    public AzureDfsClusterConfiguration()
        : base(ClusterType.AzureDfs)
    {
    }

    /// <summary>
    /// Azure account name.
    /// </summary>
    public string AccountName { get; set; }

    /// <summary>
    /// Azure account key.
    /// </summary>
    public string AccountKey { get; set; }

    /// <summary>
    /// Azure container.
    /// </summary>
    public string Container { get; set; }

    /// <summary>
    /// Must be called after setting all properties; creates the Azure client.
    /// </summary>
    /// <returns>Null if initialization succeeds, otherwise the error message.</returns>
    public override string Initialize()
    {
        try
        {
            this.AzureClient = new AzureDfsClient(
                this.AccountName,
                this.AccountKey,
                this.Container);
            return null;
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
            return ex.Message;
        }
    }

    /// <summary>
    /// The directory where a specific process is created on the cluster.
    /// Only the job manager ("jm") is supported.
    /// </summary>
    /// <param name="identifier">Process identifier.</param>
    /// <param name="terminated">True if vertex is terminated.</param>
    /// <param name="machine">Machine where process ran (unused here).</param>
    /// <param name="job">Job where the process belongs.</param>
    /// <returns>Home directory containing the process information, or null for vertices.</returns>
    public override IClusterResidentObject ProcessDirectory(DryadProcessIdentifier identifier, bool terminated, string machine, DryadLinqJobSummary job)
    {
        if (identifier.ToString() == "jm")
        {
            // The job manager process is special
            var result = new AzureDfsFile(this, job, this.AzureClient, job.ClusterJobId, terminated, true);
            return result;
        }
        // vertices not supported
        return null;
    }

    /// <summary>
    /// Create a cluster status for this cluster, reusing an existing status registered under the same name.
    /// </summary>
    /// <returns>The proper cluster status.</returns>
    public override ClusterStatus CreateClusterStatus()
    {
        var stat = ClusterStatus.LookupStatus(this.Name);
        if (stat != null) return stat;
        return new AzureDfsClusterStatus(this);
    }

    /// <summary>
    /// Work directory of a process vertex; same as its process directory.
    /// </summary>
    /// <param name="identifier">Vertex guid.</param>
    /// <param name="terminated">True if vertex is terminated.</param>
    /// <param name="machine">Machine where process ran.</param>
    /// <param name="job">Job where the process belongs.</param>
    /// <returns>The path to the work directory of the vertex, or null for vertices.</returns>
    public override IClusterResidentObject ProcessWorkDirectory(DryadProcessIdentifier identifier, bool terminated, string machine, DryadLinqJobSummary job)
    {
        return this.ProcessDirectory(identifier, terminated, machine, job);
    }

    /// <summary>
    /// Given an input file identify the process that produced it. Not supported: always returns null.
    /// </summary>
    /// <param name="input">Input file of a process.</param>
    /// <param name="job">Job that contained the process.</param>
    /// <returns>Always null.</returns>
    // ReSharper disable UnusedParameter.Global
    public override DryadProcessIdentifier ProcessFromInputFile(IClusterResidentObject input, DryadLinqJobSummary job)
    {
        return null;
    }
    // ReSharper restore UnusedParameter.Global

    /// <summary>
    /// File containing standard output of a process ("calypso.log" of the job manager).
    /// </summary>
    /// <param name="identifier">Process identifier.</param>
    /// <param name="terminated">True if vertex is terminated.</param>
    /// <param name="machine">Machine where process ran.</param>
    /// <param name="job">Job containing process.</param>
    /// <returns>The standard output file, or null for vertices.</returns>
    public override IClusterResidentObject ProcessStdoutFile(DryadProcessIdentifier identifier, bool terminated, string machine, DryadLinqJobSummary job)
    {
        if (identifier.ToString() == "jm")
        {
            IClusterResidentObject processdir = this.ProcessDirectory(identifier, terminated, machine, job);
            IClusterResidentObject file = processdir.GetFile("calypso.log");
            return file;
        }
        // vertices not supported
        return null;
    }

    /// <summary>
    /// Log directory of a process vertex; same as its process directory.
    /// </summary>
    /// <param name="identifier">Vertex guid.</param>
    /// <param name="terminated">True if vertex is terminated.</param>
    /// <param name="machine">Machine where process ran.</param>
    /// <param name="job">Job where the process belongs.</param>
    /// <returns>The path to the log directory of the vertex, or null for vertices.</returns>
    public override IClusterResidentObject ProcessLogDirectory(DryadProcessIdentifier identifier, bool terminated, string machine, DryadLinqJobSummary job)
    {
        return this.ProcessDirectory(identifier, terminated, machine, job);
    }

    /// <summary>
    /// A shell pattern matching (just the) log files produced by a job manager process.
    /// </summary>
    /// <param name="error">If true, return only the error logs (the same pattern is returned either way).</param>
    /// <param name="job">Job where the JM process belongs.</param>
    /// <returns>Pattern matching the log files.</returns>
    // ReSharper disable once UnusedParameter.Global
    public override string JMLogFilesPattern(bool error, DryadLinqJobSummary job)
    {
        return "*.log";
    }

    /// <summary>
    /// A shell pattern matching (just the) log files produced by a vertex process.
    /// </summary>
    /// <param name="error">If true, return only the error logs (the same pattern is returned either way).</param>
    /// <param name="job">Job containing this vertex.</param>
    /// <returns>Pattern matching the log files.</returns>
    public override string VertexLogFilesPattern(bool error, DryadLinqJobSummary job)
    {
        return "*.log";
    }

    /// <summary>
    /// Convert a GUID printed by the Dryad job manager into a process-id; here they are the same.
    /// </summary>
    /// <param name="guid">Process guid.</param>
    /// <param name="job">Job where guid is from.</param>
    /// <returns>Process id.</returns>
    public override string ExtractPidFromGuid(string guid, DryadLinqJobSummary job)
    {
        return guid;
    }

    /// <summary>
    /// The file containing the job query plan.
    /// </summary>
    /// <param name="job">Job whose plan is sought.</param>
    /// <returns>
    /// The plan file, or a <c>UNCFile</c> wrapping the exception if it cannot be found.
    /// </returns>
    public override IClusterResidentObject JobQueryPlan(DryadLinqJobSummary job)
    {
        try
        {
            IClusterResidentObject dir = this.ProcessWorkDirectory(job.ManagerProcessGuid, true, job.Machine, job);
            // ProcessDirectory returns null for anything but the job manager; report that
            // explicitly instead of failing later with a NullReferenceException.
            if (dir == null)
                throw new CalypsoClusterException("Could not locate the job manager directory for job " + job.ClusterJobId);

            var matchingfiles = dir.GetFilesAndFolders("DryadLinqProgram__*.xml").ToList();
            if (matchingfiles.Count != 1)
                throw new CalypsoClusterException("Could not find query plan file; got " + matchingfiles.Count + " possible matches");

            IClusterResidentObject result = matchingfiles[0];
            // The plan must be read as a DFS stream, which is only possible on an AzureDfsFile.
            AzureDfsFile dfsFile = result as AzureDfsFile;
            if (dfsFile == null)
                throw new CalypsoClusterException("Query plan file is not an Azure DFS file");
            dfsFile.IsDfsStream = true;
            result.ShouldCacheLocally = true; // immutable
            return result;
        }
        catch (Exception e)
        {
            return new UNCFile(e);
        }
    }

    /// <summary>
    /// Editable properties of this cluster type; shared by all instances, hence readonly.
    /// </summary>
    private static readonly List props = new List
    {
        "AccountName",
        "AccountKey",
        "Container"
    };

    /// <summary>
    /// Properties that can be edited.
    /// </summary>
    /// <returns>List of properties that can be edited.</returns>
    public override List GetPropertiesToEdit()
    {
        return props;
    }
}
}