/*
Copyright (c) Microsoft Corporation
All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
compliance with the License. You may obtain a copy of the License
at http://www.apache.org/licenses/LICENSE-2.0
THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER
EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR CONDITIONS OF
TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR NON-INFRINGEMENT.
See the Apache Version 2.0 License for specific language governing permissions and
limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using Microsoft.Research.Peloponnese.Storage;
using Microsoft.Research.Tools;
namespace Microsoft.Research.JobObjectModel
{
/// <summary>
/// Dynamic information of all the jobs and machines in a cluster.
/// </summary>
public abstract class ClusterStatus
{
    /// <summary>
    /// Most recently created status for each cluster, indexed by cluster name.
    /// </summary>
    private static readonly Dictionary<string, ClusterStatus> ClusterStatuses = new Dictionary<string, ClusterStatus>();

    /// <summary>
    /// The static configuration of the cluster.
    /// </summary>
    public ClusterConfiguration Config { get; protected set; }

    /// <summary>
    /// Cached collection of cluster jobs; index is the cluster job id.
    /// Stays null until a subclass fills it in <see cref="RecomputeClusterJobList"/>.
    /// </summary>
    protected Dictionary<string, ClusterJobInformation> clusterJobs;

    /// <summary>
    /// For each machine the pod it belongs to. Loaded on first use.
    /// </summary>
    protected Dictionary<string, string> pods;

    /// <summary>
    /// Create a cluster status and register it as the cached status for the cluster name,
    /// replacing any status previously registered under that name.
    /// </summary>
    /// <param name="config">Cluster configuration.</param>
    /// <exception cref="ArgumentNullException">If <paramref name="config"/> is null.</exception>
    protected ClusterStatus(ClusterConfiguration config)
    {
        if (config == null)
        {
            throw new ArgumentNullException("config");
        }

        this.Config = config;
        this.Initialize();

        // The indexer both inserts and overwrites, so no ContainsKey test is needed.
        ClusterStatuses[config.Name] = this;
    }

    /// <summary>
    /// See if a status is already cached.
    /// </summary>
    /// <param name="clusterName">Name of cluster.</param>
    /// <returns>The cached status, or null if none was created for that name.</returns>
    public static ClusterStatus LookupStatus(string clusterName)
    {
        ClusterStatus cached;
        return ClusterStatuses.TryGetValue(clusterName, out cached) ? cached : null;
    }

    /// <summary>
    /// Initialize the cluster status: the topology is unknown until first queried.
    /// </summary>
    private void Initialize()
    {
        this.pods = null;
    }

    /// <summary>
    /// Return the cluster topology, querying it the first time it is needed.
    /// </summary>
    /// <returns>The machine-to-pod map.</returns>
    private Dictionary<string, string> EnsureTopology()
    {
        if (this.pods == null)
        {
            this.pods = this.QueryClusterTopology();
        }

        return this.pods;
    }

    /// <summary>
    /// The cluster map: for each machine the pod it resides in.
    /// </summary>
    /// <returns>An iterator over the cluster map.</returns>
    public IEnumerable<KeyValuePair<string, string>> Pods()
    {
        return this.EnsureTopology().AsEnumerable();
    }

    /// <summary>
    /// Discover the way machines are organized into racks.
    /// </summary>
    /// <returns>A map from machine name to the pod containing it.</returns>
    protected abstract Dictionary<string, string> QueryClusterTopology();

    /// <summary>
    /// Generate the list of machines which may be in the cluster.
    /// </summary>
    /// <returns>An iterator returning all machines in the cluster.</returns>
    public IEnumerable<string> GetClusterMachines()
    {
        return this.EnsureTopology().Select(m => m.Key);
    }

    /// <summary>
    /// The list of jobs on the cluster; the list is recomputed on every call.
    /// </summary>
    /// <param name="virtualCluster">Virtual cluster selected; defined only for Scope clusters.</param>
    /// <param name="manager">Communication manager.</param>
    /// <returns>A snapshot of the jobs known after the recomputation.</returns>
    public IEnumerable<ClusterJobInformation> GetClusterJobList(string virtualCluster, CommManager manager)
    {
        this.RecomputeClusterJobList(virtualCluster, manager);
        return this.clusterJobs.Values.ToList();
    }

    /// <summary>
    /// Force the recomputation of the cluster job list.
    /// </summary>
    /// <param name="virtualCluster">Virtual cluster to use (defined only for some cluster types).</param>
    /// <param name="manager">Communication manager.</param>
    // ReSharper disable once UnusedParameter.Global
    protected abstract void RecomputeClusterJobList(string virtualCluster, CommManager manager);

    /// <summary>
    /// Discover the (unique) dryadlinq job corresponding to a cluster job.
    /// </summary>
    /// <param name="clusterJob">Cluster Job.</param>
    /// <returns>The job description.</returns>
    /// <param name="reporter">Delegate used to report errors.</param>
    public abstract DryadLinqJobSummary DiscoverDryadLinqJobFromClusterJob(ClusterJobInformation clusterJob, StatusReporter reporter);

    /// <summary>
    /// Discover the dryadlinq job given a url from the cluster scheduler.
    /// (Does not make sense for some cluster architectures, e.g., HPC.)
    /// </summary>
    /// <param name="url">URL pointing to the job.</param>
    /// <returns>The dryadlinq job summary.</returns>
    /// <param name="reporter">Delegate used to report errors.</param>
    // ReSharper disable UnusedParameter.Global
    public abstract DryadLinqJobSummary DiscoverDryadLinqJobFromURL(string url, StatusReporter reporter);
    // ReSharper restore UnusedParameter.Global

    /// <summary>
    /// Discover a cluster job given its id.
    /// </summary>
    /// <param name="job">Job to discover.</param>
    /// <returns>The cluster job, or null if not found.</returns>
    /// <param name="manager">Communication manager.</param>
    public virtual ClusterJobInformation DiscoverClusterJob(DryadLinqJobSummary job, CommManager manager)
    {
        if (this.clusterJobs == null)
        {
            this.RecomputeClusterJobList(job.VirtualCluster, manager);
        }

        // TryGetValue rather than the indexer: a missing job must yield null
        // (callers such as RefreshStatus test for it), not a KeyNotFoundException.
        ClusterJobInformation info;
        return this.clusterJobs.TryGetValue(job.ClusterJobId, out info) ? info : null;
    }

    /// <summary>
    /// Refresh the job summary status. The status becomes Unknown when the job
    /// is not found on the cluster.
    /// </summary>
    /// <param name="summary">Summary to refresh.</param>
    /// <param name="manager">Communication manager.</param>
    public virtual void RefreshStatus(DryadLinqJobSummary summary, CommManager manager)
    {
        // refresh the whole list, then look the job up in the fresh list
        this.RecomputeClusterJobList(summary.VirtualCluster, manager);
        ClusterJobInformation info = this.DiscoverClusterJob(summary, manager);
        summary.Status = info == null
            ? ClusterJobInformation.ClusterJobStatus.Unknown
            : info.Status;
    }

    /// <summary>
    /// Cancel the specified job.
    /// </summary>
    /// <param name="job">Job whose execution is cancelled.</param>
    /// <returns>True if the cancellation succeeded.</returns>
    public abstract bool CancelJob(DryadLinqJobSummary job);
}
/// <summary>
/// A fake cluster keeps some information about past jobs on a local filesystem, to allow post-mortem debugging.
/// </summary>
public class CacheClusterStatus : ClusterStatus
{
    /// <summary>
    /// Create a fake cluster status.
    /// </summary>
    /// <param name="config">Configuration to use for this cluster; must be a CacheClusterConfiguration.</param>
    /// <exception cref="ArgumentException">If the configuration is not for a cache cluster.</exception>
    public CacheClusterStatus(ClusterConfiguration config)
        : base(RequireCacheConfiguration(config))
    {
    }

    /// <summary>
    /// Validate the configuration type before the base constructor runs, so that an
    /// instance with the wrong configuration is never registered in the status cache.
    /// </summary>
    /// <param name="config">Configuration to check.</param>
    /// <returns>The same configuration, when it is valid.</returns>
    private static ClusterConfiguration RequireCacheConfiguration(ClusterConfiguration config)
    {
        if (!(config is CacheClusterConfiguration))
        {
            throw new ArgumentException("Expected configuration to be for a cache cluster");
        }

        return config;
    }

    /// <summary>
    /// Create the status of the cluster that the cached job really belongs to.
    /// </summary>
    /// <param name="job">Cached job.</param>
    /// <returns>A status object for the job's actual cluster.</returns>
    private ClusterStatus CreateActualStatus(DryadLinqJobSummary job)
    {
        // The constructor guarantees the configuration type, so a direct cast is safe.
        ClusterConfiguration actual = ((CacheClusterConfiguration)this.Config).ActualConfig(job);
        return actual.CreateClusterStatus();
    }

    /// <summary>
    /// Not implemented; return an empty topology.
    /// </summary>
    /// <returns>An empty machine-to-pod map.</returns>
    protected override Dictionary<string, string> QueryClusterTopology()
    {
        return new Dictionary<string, string>();
    }

    /// <summary>
    /// Recompute the list of jobs on the cluster and add them to the clusterJobs field.
    /// Jobs are read from the "*.xml" files in the "jobs" subfolder of the cache directory;
    /// the list is left empty when no cache directory is configured.
    /// </summary>
    /// <param name="virtualCluster">Unused.</param>
    /// <param name="manager">Communication manager.</param>
    protected override void RecomputeClusterJobList(string virtualCluster, CommManager manager)
    {
        this.clusterJobs = new Dictionary<string, ClusterJobInformation>();
        if (string.IsNullOrEmpty(CachedClusterResidentObject.CacheDirectory))
        {
            return;
        }

        string joblist = Path.Combine(CachedClusterResidentObject.CacheDirectory, "jobs");
        // CreateDirectory does nothing when the folder already exists.
        Directory.CreateDirectory(joblist);

        foreach (string file in Directory.GetFiles(joblist, "*.xml"))
        {
            manager.Token.ThrowIfCancellationRequested();
            DryadLinqJobSummary job = Utilities.LoadXml<DryadLinqJobSummary>(file);

            // there may be two jobs with same id from different clusters
            string cjid = job.Cluster + "-" + job.ClusterJobId;
            if (this.clusterJobs.ContainsKey(cjid))
            {
                manager.Status("Duplicate job id, cannot insert in cache " + job.AsIdentifyingString(), StatusKind.Error);
                continue;
            }

            ClusterJobInformation ci = new ClusterJobInformation(this.Config.Name, job.Cluster, cjid, job.Name, job.User, job.Date, job.EndTime - job.Date, job.Status);
            ci.SetAssociatedSummary(job);
            this.clusterJobs.Add(cjid, ci);
        }

        manager.Progress(100);
    }

    /// <summary>
    /// Refresh the job summary status by asking the cluster the job actually ran on.
    /// The status becomes Unknown when that cluster does not know the job.
    /// </summary>
    /// <param name="job">Summary to refresh.</param>
    /// <param name="manager">Communication manager.</param>
    public override void RefreshStatus(DryadLinqJobSummary job, CommManager manager)
    {
        ClusterStatus actualStatus = this.CreateActualStatus(job);
        actualStatus.RefreshStatus(job, manager);
        ClusterJobInformation info = actualStatus.DiscoverClusterJob(job, manager);
        job.Status = info == null
            ? ClusterJobInformation.ClusterJobStatus.Unknown
            : info.Status;
    }

    /// <summary>
    /// Not needed, all summaries are already known.
    /// </summary>
    /// <param name="clusterJob">Cluster job information.</param>
    /// <param name="reporter">Delegate used to report errors.</param>
    /// <returns>Throws an exception.</returns>
    public override DryadLinqJobSummary DiscoverDryadLinqJobFromClusterJob(ClusterJobInformation clusterJob, StatusReporter reporter)
    {
        throw new InvalidOperationException();
    }

    /// <summary>
    /// This functionality is not available for cached jobs.
    /// </summary>
    /// <param name="url">Job url.</param>
    /// <param name="reporter">Delegate used to report errors.</param>
    /// <returns>Throws an exception.</returns>
    public override DryadLinqJobSummary DiscoverDryadLinqJobFromURL(string url, StatusReporter reporter)
    {
        throw new InvalidOperationException();
    }

    /// <summary>
    /// Discover the cluster job by delegating to the status of the job's actual cluster.
    /// </summary>
    /// <param name="job">Cluster job.</param>
    /// <returns>Whatever the actual cluster's status reports for this job.</returns>
    /// <param name="manager">Communication manager.</param>
    public override ClusterJobInformation DiscoverClusterJob(DryadLinqJobSummary job, CommManager manager)
    {
        return this.CreateActualStatus(job).DiscoverClusterJob(job, manager);
    }

    /// <summary>
    /// Not needed, these jobs are no longer running.
    /// </summary>
    /// <param name="job">Job to cancel.</param>
    /// <returns>Throws an exception.</returns>
    public override bool CancelJob(DryadLinqJobSummary job)
    {
        throw new InvalidOperationException();
    }
}
#region IDisposable Members
#endregion
/// <summary>
/// Status of a cluster comprised only of the local machine.
/// </summary>
public class YarnEmulatedClusterStatus : ClusterStatus
{
    /// <summary>
    /// The cluster configuration, typed as the local emulator configuration.
    /// </summary>
    private readonly LocalEmulator config;

    /// <summary>
    /// Create a cluster containing just the local machine.
    /// </summary>
    /// <param name="config">Configuration for the local machine; must be a LocalEmulator.</param>
    /// <exception cref="ArgumentNullException">If <paramref name="config"/> is null.</exception>
    /// <exception cref="ArgumentException">If the configuration is not a LocalEmulator.</exception>
    public YarnEmulatedClusterStatus(ClusterConfiguration config)
        : base(RequireLocalEmulator(config))
    {
        this.config = (LocalEmulator)config;
    }

    /// <summary>
    /// Validate the configuration type before the base constructor runs, so that an
    /// instance with the wrong configuration is never registered in the status cache.
    /// </summary>
    /// <param name="config">Configuration to check.</param>
    /// <returns>The same configuration, when it is valid.</returns>
    private static ClusterConfiguration RequireLocalEmulator(ClusterConfiguration config)
    {
        if (config == null)
        {
            throw new ArgumentNullException("config");
        }

        if (!(config is LocalEmulator))
        {
            throw new ArgumentException("Expected a LocalEmulator configuration, got a " + config.GetType());
        }

        return config;
    }

    /// <summary>
    /// Discover the way machines are organized into racks.
    /// The emulated cluster has the single machine "localhost" in the pod "localhost".
    /// </summary>
    /// <returns>A map with the single entry localhost -> localhost.</returns>
    protected override Dictionary<string, string> QueryClusterTopology()
    {
        var result = new Dictionary<string, string>();
        result.Add("localhost", "localhost");
        return result;
    }

    /// <summary>
    /// Force the recomputation of the cluster job list.
    /// Each subfolder of the configured jobs folder is a candidate job; its name is the job id.
    /// </summary>
    /// <param name="virtualCluster">Virtual cluster to use (defined only for some cluster types).</param>
    /// <param name="manager">Communication manager.</param>
    protected override void RecomputeClusterJobList(string virtualCluster, CommManager manager)
    {
        this.clusterJobs = new Dictionary<string, ClusterJobInformation>();
        if (!Directory.Exists(this.config.JobsFolder))
        {
            return;
        }

        string[] subfolders = Directory.GetDirectories(this.config.JobsFolder);
        int done = 0;
        foreach (string job in subfolders)
        {
            manager.Token.ThrowIfCancellationRequested();
            string jobId = Path.GetFileName(job);
            ClusterJobInformation info = this.GetJobInfo(job, jobId);
            if (info != null)
            {
                // ReSharper disable once AssignNullToNotNullAttribute
                this.clusterJobs.Add(jobId, info);
            }

            manager.Progress(done++ * 100 / subfolders.Length);
        }

        manager.Progress(100);
    }

    /// <summary>
    /// Discover the (unique) dryadlinq job corresponding to a cluster job.
    /// </summary>
    /// <param name="clusterJob">Cluster Job.</param>
    /// <returns>The job description.</returns>
    /// <param name="reporter">Delegate used to report errors.</param>
    public override DryadLinqJobSummary DiscoverDryadLinqJobFromClusterJob(ClusterJobInformation clusterJob, StatusReporter reporter)
    {
        return new DryadLinqJobSummary(
            clusterJob.Cluster,
            this.config.TypeOfCluster,
            "", // virtual cluster
            "", // machine
            clusterJob.ClusterJobID, // jobId
            clusterJob.ClusterJobID, // clusterJobId
            new DryadProcessIdentifier("jm"), // jmProcessGuid
            clusterJob.Name,
            clusterJob.User,
            clusterJob.Date,
            clusterJob.Date + clusterJob.EstimatedRunningTime,
            clusterJob.Status);
    }

    /// <summary>
    /// Discover the dryadlinq job given a url from the cluster scheduler.
    /// Not available for this cluster type.
    /// </summary>
    /// <param name="url">URL pointing to the job.</param>
    /// <returns>Throws an exception.</returns>
    /// <param name="reporter">Delegate used to report errors.</param>
    public override DryadLinqJobSummary DiscoverDryadLinqJobFromURL(string url, StatusReporter reporter)
    {
        throw new InvalidOperationException();
    }

    /// <summary>
    /// Extract the job information from a folder with logs on the local machine.
    /// A folder is considered a job only if it has a "jm" subfolder; the creation time
    /// of that subfolder is used as the job date, and the job status is reported as Unknown.
    /// </summary>
    /// <param name="jobRootFolder">Folder with logs for the specified job.</param>
    /// <returns>The job information, or null if not found.</returns>
    /// <param name="jobId">Job id.</param>
    private ClusterJobInformation GetJobInfo(string jobRootFolder, string jobId)
    {
        string jmFolder = Path.Combine(jobRootFolder, "jm");
        if (!Directory.Exists(jmFolder))
        {
            return null;
        }

        // jmFolder is a directory, so query it through the Directory API.
        DateTime date = Directory.GetCreationTime(jmFolder);
        return new ClusterJobInformation(this.config.Name, "", jobId, jobId, Environment.UserName, date, TimeSpan.Zero, ClusterJobInformation.ClusterJobStatus.Unknown);
    }

    /// <summary>
    /// Cancel the specified job. Not available for this cluster type.
    /// </summary>
    /// <param name="job">Job whose execution is cancelled.</param>
    /// <returns>Throws an exception.</returns>
    public override bool CancelJob(DryadLinqJobSummary job)
    {
        throw new InvalidOperationException();
    }
}
/// <summary>
/// Status of an Azure DFS cluster.
/// </summary>
public class AzureDfsClusterStatus : ClusterStatus
{
    /// <summary>
    /// A job reported as running whose last heartbeat is older than this is treated as failed.
    /// </summary>
    private static readonly TimeSpan HeartbeatTimeout = TimeSpan.FromSeconds(40);

    /// <summary>
    /// The cluster configuration, typed as the Azure DFS configuration.
    /// </summary>
    private readonly AzureDfsClusterConfiguration config;

    /// <summary>
    /// Create the status of an Azure DFS cluster.
    /// </summary>
    /// <param name="config">Configuration of the cluster; must be an AzureDfsClusterConfiguration.</param>
    /// <exception cref="ArgumentNullException">If <paramref name="config"/> is null.</exception>
    /// <exception cref="ArgumentException">If the configuration is not an AzureDfsClusterConfiguration.</exception>
    public AzureDfsClusterStatus(ClusterConfiguration config)
        : base(RequireAzureDfsConfiguration(config))
    {
        this.config = (AzureDfsClusterConfiguration)config;
    }

    /// <summary>
    /// Validate the configuration type before the base constructor runs, so that an
    /// instance with the wrong configuration is never registered in the status cache.
    /// </summary>
    /// <param name="config">Configuration to check.</param>
    /// <returns>The same configuration, when it is valid.</returns>
    private static ClusterConfiguration RequireAzureDfsConfiguration(ClusterConfiguration config)
    {
        if (config == null)
        {
            throw new ArgumentNullException("config");
        }

        if (!(config is AzureDfsClusterConfiguration))
        {
            throw new ArgumentException("Expected an AzureDfsClusterConfiguration, got a " + config.GetType());
        }

        return config;
    }

    /// <summary>
    /// Discover the way machines are organized into racks.
    /// No real topology is queried: the single entry localhost -> localhost is returned.
    /// </summary>
    /// <returns>A map with the single entry localhost -> localhost.</returns>
    protected override Dictionary<string, string> QueryClusterTopology()
    {
        var result = new Dictionary<string, string>();
        result.Add("localhost", "localhost");
        return result;
    }

    /// <summary>
    /// Force the recomputation of the cluster job list.
    /// Every entry of the container's root directory is a candidate job.
    /// </summary>
    /// <param name="virtualCluster">Virtual cluster to use (defined only for some cluster types).</param>
    /// <param name="manager">Communication manager.</param>
    protected override void RecomputeClusterJobList(string virtualCluster, CommManager manager)
    {
        this.clusterJobs = new Dictionary<string, ClusterJobInformation>();
        var jobs = this.config.AzureClient.EnumerateDirectory("").ToList();
        int done = 0;
        foreach (var job in jobs)
        {
            manager.Token.ThrowIfCancellationRequested();
            ClusterJobInformation info = this.GetJobInfo(job);
            if (info != null)
            {
                // ReSharper disable once AssignNullToNotNullAttribute
                this.clusterJobs.Add(job, info);
            }

            manager.Progress(100 * done++ / jobs.Count);
        }

        manager.Progress(100);
    }

    /// <summary>
    /// Discover the (unique) dryadlinq job corresponding to a cluster job.
    /// </summary>
    /// <param name="clusterJob">Cluster Job.</param>
    /// <returns>The job description.</returns>
    /// <param name="reporter">Delegate used to report errors.</param>
    public override DryadLinqJobSummary DiscoverDryadLinqJobFromClusterJob(ClusterJobInformation clusterJob, StatusReporter reporter)
    {
        return new DryadLinqJobSummary(
            clusterJob.Cluster,
            this.config.TypeOfCluster,
            "", // virtual cluster
            "", // machine
            clusterJob.ClusterJobID, // jobId
            clusterJob.ClusterJobID, // clusterJobId
            new DryadProcessIdentifier("jm"), // jmProcessGuid
            clusterJob.Name,
            clusterJob.User,
            clusterJob.Date,
            clusterJob.Date + clusterJob.EstimatedRunningTime,
            clusterJob.Status);
    }

    /// <summary>
    /// Discover the dryadlinq job given a url from the cluster scheduler.
    /// Not available for this cluster type.
    /// </summary>
    /// <param name="url">URL pointing to the job.</param>
    /// <returns>Throws an exception.</returns>
    /// <param name="reporter">Delegate used to report errors.</param>
    public override DryadLinqJobSummary DiscoverDryadLinqJobFromURL(string url, StatusReporter reporter)
    {
        throw new InvalidOperationException();
    }

    /// <summary>
    /// Translate the "status" metadata value of a heartbeat blob into a job status.
    /// </summary>
    /// <param name="text">Status string read from the blob metadata.</param>
    /// <param name="fallback">Status to keep when the string is not recognized.</param>
    /// <returns>The parsed status, or <paramref name="fallback"/> for an unknown string.</returns>
    private static ClusterJobInformation.ClusterJobStatus ParseStatus(string text, ClusterJobInformation.ClusterJobStatus fallback)
    {
        switch (text)
        {
            case "failure":
                return ClusterJobInformation.ClusterJobStatus.Failed;
            case "success":
                return ClusterJobInformation.ClusterJobStatus.Succeeded;
            case "running":
                return ClusterJobInformation.ClusterJobStatus.Running;
            case "killed":
                return ClusterJobInformation.ClusterJobStatus.Cancelled;
            default:
                Console.WriteLine("Unknown status " + text);
                return fallback;
        }
    }

    /// <summary>
    /// Extract the job information from the blobs stored under the job's folder.
    /// Status, last heartbeat, job name and start time are read from the metadata of the
    /// blob whose name ends in "heartbeat". When the heartbeat carries no usable start time,
    /// the last-modified time of the first blob whose name contains "DryadLinqProgram__"
    /// is used instead.
    /// </summary>
    /// <param name="jobRootFolder">Folder with logs for the specified job.</param>
    /// <returns>The job information, or null if no heartbeat blob was found.</returns>
    private ClusterJobInformation GetJobInfo(string jobRootFolder)
    {
        DateTime date = DateTime.MinValue;
        DateTime lastHeartBeat = DateTime.MinValue;
        ClusterJobInformation.ClusterJobStatus status = ClusterJobInformation.ClusterJobStatus.Unknown;
        bool found = false;
        string jobName = jobRootFolder;
        string programBlobName = null;

        var jobFiles = this.config.AzureClient.EnumerateDirectory(jobRootFolder).ToList();
        foreach (var file in jobFiles)
        {
            if (file.EndsWith("heartbeat", StringComparison.Ordinal))
            {
                var blob = this.config.AzureClient.Container.GetPageBlobReference(file);
                blob.FetchAttributes();
                var props = blob.Metadata;

                string value;
                if (props.TryGetValue("status", out value))
                {
                    status = ParseStatus(value, status);
                }

                if (props.TryGetValue("heartbeat", out value) &&
                    DateTime.TryParse(value, out lastHeartBeat))
                {
                    lastHeartBeat = lastHeartBeat.ToLocalTime();
                    if (status == ClusterJobInformation.ClusterJobStatus.Running &&
                        DateTime.Now - lastHeartBeat > HeartbeatTimeout)
                    {
                        // job has in fact crashed
                        status = ClusterJobInformation.ClusterJobStatus.Failed;
                    }
                }

                if (props.TryGetValue("jobname", out value))
                {
                    jobName = value;
                }

                // newer heartbeats contain the date
                if (props.TryGetValue("starttime", out value) &&
                    DateTime.TryParse(value, out date))
                {
                    date = date.ToLocalTime();
                }

                found = true;
            }
            else if (programBlobName == null && file.Contains("DryadLinqProgram__"))
            {
                // Remember the blob; it is only consulted below if no start time is found.
                programBlobName = file;
            }
        }

        if (!found)
        {
            return null;
        }

        // Fall back to the program blob only when the heartbeat gave no start time.
        // Doing this after the scan makes the result independent of listing order and
        // never overwrites a start time that the heartbeat did supply.
        if (date == DateTime.MinValue && programBlobName != null)
        {
            var programBlob = this.config.AzureClient.Container.GetBlockBlobReference(programBlobName);
            programBlob.FetchAttributes();
            var blobProperties = programBlob.Properties;
            if (blobProperties.LastModified.HasValue)
            {
                date = blobProperties.LastModified.Value.DateTime.ToLocalTime();
            }
        }

        TimeSpan running = TimeSpan.Zero;
        if (date != DateTime.MinValue && lastHeartBeat != DateTime.MinValue)
        {
            running = lastHeartBeat - date;
        }

        return new ClusterJobInformation(this.config.Name, "", jobRootFolder, jobName, Environment.UserName, date, running, status);
    }

    /// <summary>
    /// Refresh the job summary status. Only the blobs of this one job are queried;
    /// the status becomes Unknown when the job has no heartbeat blob.
    /// </summary>
    /// <param name="summary">Summary to refresh.</param>
    /// <param name="manager">Communication manager.</param>
    public override void RefreshStatus(DryadLinqJobSummary summary, CommManager manager)
    {
        ClusterJobInformation info = this.GetJobInfo(summary.JobID);
        summary.Status = info == null
            ? ClusterJobInformation.ClusterJobStatus.Unknown
            : info.Status;
    }

    /// <summary>
    /// Cancel the specified job.
    /// </summary>
    /// <param name="job">Job whose execution is cancelled.</param>
    /// <returns>Always true once the kill request has been issued without throwing.</returns>
    public override bool CancelJob(DryadLinqJobSummary job)
    {
        AzureUtils.KillJob(this.config.AccountName, this.config.AccountKey, this.config.Container, job.ClusterJobId);
        return true;
    }
}
}