/*
Copyright (c) Microsoft Corporation

All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
compliance with the License. You may obtain a copy of the License
at http://www.apache.org/licenses/LICENSE-2.0

THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR CONDITIONS OF
TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR NON-INFRINGEMENT.

See the Apache Version 2.0 License for specific language governing permissions and
limitations under the License.
*/

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using Microsoft.Research.Peloponnese.Storage;
using Microsoft.Research.Tools;

namespace Microsoft.Research.JobObjectModel
{
    /// <summary>
    /// Dynamic information of all the jobs and machines in a cluster.
    /// </summary>
    public abstract class ClusterStatus
    {
        /// <summary>
        /// For each cluster name, the most recently constructed status object.
        /// </summary>
        private static readonly Dictionary<string, ClusterStatus> ClusterStatuses = new Dictionary<string, ClusterStatus>();

        /// <summary>
        /// The static configuration of the cluster.
        /// </summary>
        public ClusterConfiguration Config { get; protected set; }

        /// <summary>
        /// Cached collection of the jobs on the cluster, indexed by a cluster job identifier.
        /// Remains null until the job list has been computed once.
        /// </summary>
        protected Dictionary<string, ClusterJobInformation> clusterJobs;

        /// <summary>
        /// For each machine the pod it belongs to. Null until the topology has been queried.
        /// </summary>
        protected Dictionary<string, string> pods;

        /// <summary>
        /// Create a cluster status and register it in the per-cluster-name cache,
        /// replacing any status previously registered under the same name.
        /// </summary>
        /// <param name="config">Cluster configuration.</param>
        protected ClusterStatus(ClusterConfiguration config)
        {
            this.Config = config;
            this.Initialize();

            // The indexer inserts or overwrites, so no separate ContainsKey test is needed.
            ClusterStatuses[config.Name] = this;
        }

        /// <summary>
        /// See if a status is already cached.
        /// </summary>
        /// <param name="clusterName">Name of cluster.</param>
        /// <returns>The cached status, or null if none is registered under this name.</returns>
        public static ClusterStatus LookupStatus(string clusterName)
        {
            ClusterStatus cached;
            return ClusterStatuses.TryGetValue(clusterName, out cached) ? cached : null;
        }

        /// <summary>
        /// Initialize the cluster status: the topology is discarded so it is queried on first use.
        /// </summary>
        private void Initialize()
        {
            this.pods = null;
        }

        /// <summary>
        /// Return the cluster topology, querying it the first time it is needed.
        /// </summary>
        /// <returns>The machine-to-pod map.</returns>
        private Dictionary<string, string> EnsureTopology()
        {
            if (this.pods == null)
            {
                this.pods = this.QueryClusterTopology();
            }

            return this.pods;
        }

        /// <summary>
        /// The cluster map: for each machine the pod it resides in.
        /// </summary>
        /// <returns>An iterator over the cluster map.</returns>
        public IEnumerable<KeyValuePair<string, string>> Pods()
        {
            return this.EnsureTopology().AsEnumerable();
        }

        /// <summary>
        /// Discover the way machines are organized into racks.
        /// </summary>
        /// <returns>For each machine the pod it belongs to.</returns>
        protected abstract Dictionary<string, string> QueryClusterTopology();

        /// <summary>
        /// Generate the list of machines which may be in the cluster.
        /// </summary>
        /// <returns>An iterator returning all machines in the cluster.</returns>
        public IEnumerable<string> GetClusterMachines()
        {
            return this.EnsureTopology().Select(m => m.Key);
        }

        /// <summary>
        /// The list of jobs on the cluster; the list is recomputed on every call.
        /// </summary>
        /// <param name="virtualCluster">Virtual cluster selected; defined only for Scope clusters.</param>
        /// <param name="manager">Communication manager.</param>
        /// <returns>A snapshot of the jobs found.</returns>
        public IEnumerable<ClusterJobInformation> GetClusterJobList(string virtualCluster, CommManager manager)
        {
            this.RecomputeClusterJobList(virtualCluster, manager);
            return this.clusterJobs.Values.ToList();
        }

        /// <summary>
        /// Force the recomputation of the cluster job list.
        /// </summary>
        /// <param name="virtualCluster">Virtual cluster to use (defined only for some cluster types).</param>
        /// <param name="manager">Communication manager.</param>
        // ReSharper disable once UnusedParameter.Global
        protected abstract void RecomputeClusterJobList(string virtualCluster, CommManager manager);

        /// <summary>
        /// Discover the (unique) dryadlinq job corresponding to a cluster job.
        /// </summary>
        /// <param name="clusterJob">Cluster Job.</param>
        /// <param name="reporter">Delegate used to report errors.</param>
        /// <returns>The job description.</returns>
        public abstract DryadLinqJobSummary DiscoverDryadLinqJobFromClusterJob(ClusterJobInformation clusterJob, StatusReporter reporter);

        /// <summary>
        /// Discover the dryadlinq job given a url from the cluster scheduler.
        /// (Does not make sense for some cluster architectures, e.g., HPC.)
        /// </summary>
        /// <param name="url">URL pointing to the job.</param>
        /// <param name="reporter">Delegate used to report errors.</param>
        /// <returns>The dryadlinq job summary.</returns>
        // ReSharper disable UnusedParameter.Global
        public abstract DryadLinqJobSummary DiscoverDryadLinqJobFromURL(string url, StatusReporter reporter);
        // ReSharper restore UnusedParameter.Global

        /// <summary>
        /// Discover a cluster job given its id.
        /// </summary>
        /// <param name="job">Job to discover.</param>
        /// <param name="manager">Communication manager.</param>
        /// <returns>The cluster job, or null if not found.</returns>
        public virtual ClusterJobInformation DiscoverClusterJob(DryadLinqJobSummary job, CommManager manager)
        {
            if (this.clusterJobs == null)
            {
                this.RecomputeClusterJobList(job.VirtualCluster, manager);
            }

            // TryGetValue honours the "null if not found" contract that RefreshStatus relies on;
            // the plain indexer would throw KeyNotFoundException for an unknown job.
            ClusterJobInformation info;
            return this.clusterJobs.TryGetValue(job.ClusterJobId, out info) ? info : null;
        }

        /// <summary>
        /// Refresh the job summary status.
        /// </summary>
        /// <param name="summary">Summary to refresh; its Status is overwritten.</param>
        /// <param name="manager">Communication manager.</param>
        public virtual void RefreshStatus(DryadLinqJobSummary summary, CommManager manager)
        {
            // refresh the whole list
            this.RecomputeClusterJobList(summary.VirtualCluster, manager);
            ClusterJobInformation info = this.DiscoverClusterJob(summary, manager);
            if (info == null)
            {
                summary.Status = ClusterJobInformation.ClusterJobStatus.Unknown;
                return;
            }

            summary.Status = info.Status;
        }

        /// <summary>
        /// Cancel the specified job.
        /// </summary>
        /// <param name="job">Job whose execution is cancelled.</param>
        /// <returns>True if the cancellation succeeded.</returns>
        public abstract bool CancelJob(DryadLinqJobSummary job);
    }

    /// <summary>
    /// A fake cluster keeps some information about past jobs on a local filesystem, to allow post-mortem debugging.
    /// </summary>
    public class CacheClusterStatus : ClusterStatus
    {
        /// <summary>
        /// Create a fake cluster status.
        /// </summary>
        /// <param name="config">Configuration to use for this cluster.</param>
        /// <exception cref="ArgumentException">The configuration is not a CacheClusterConfiguration.</exception>
        public CacheClusterStatus(ClusterConfiguration config)
            : base(config)
        {
            if (!(config is CacheClusterConfiguration))
            {
                throw new ArgumentException("Expected configuration to be for a cache cluster");
            }
        }

        /// <summary>
        /// Not implemented; return an empty topology.
        /// </summary>
        /// <returns>An empty map.</returns>
        protected override Dictionary<string, string> QueryClusterTopology()
        {
            return new Dictionary<string, string>();
        }

        /// <summary>
        /// Recompute the list of jobs on the cluster and add them to the clusterJobs field.
        /// The jobs are read from the "*.xml" files in the "jobs" folder of the cache directory.
        /// </summary>
        /// <param name="virtualCluster">Unused.</param>
        /// <param name="manager">Communication manager.</param>
        protected override void RecomputeClusterJobList(string virtualCluster, CommManager manager)
        {
            this.clusterJobs = new Dictionary<string, ClusterJobInformation>();
            if (string.IsNullOrEmpty(CachedClusterResidentObject.CacheDirectory))
            {
                return;
            }

            string joblist = Path.Combine(CachedClusterResidentObject.CacheDirectory, "jobs");
            if (!Directory.Exists(joblist))
            {
                Directory.CreateDirectory(joblist);
            }

            string[] files = Directory.GetFiles(joblist, "*.xml");
            foreach (var file in files)
            {
                manager.Token.ThrowIfCancellationRequested();

                // NOTE(review): the type argument was missing in the text under review and is
                // reconstructed from the declared type of the result - confirm against Utilities.LoadXml.
                DryadLinqJobSummary job = Utilities.LoadXml<DryadLinqJobSummary>(file);

                // there may be two jobs with same id from different clusters
                string cjid = job.Cluster + "-" + job.ClusterJobId;
                ClusterJobInformation ci = new ClusterJobInformation(
                    this.Config.Name,
                    job.Cluster,
                    cjid,
                    job.Name,
                    job.User,
                    job.Date,
                    job.EndTime - job.Date,
                    job.Status);
                ci.SetAssociatedSummary(job);

                if (this.clusterJobs.ContainsKey(cjid))
                {
                    manager.Status("Duplicate job id, cannot insert in cache " + job.AsIdentifyingString(), StatusKind.Error);
                    continue;
                }

                this.clusterJobs.Add(cjid, ci);
            }

            manager.Progress(100);
        }

        /// <summary>
        /// Create a status object for the cluster the cached job originally belongs to.
        /// </summary>
        /// <param name="job">Cached job.</param>
        /// <returns>A status for the configuration returned by ActualConfig.</returns>
        private ClusterStatus CreateActualStatus(DryadLinqJobSummary job)
        {
            // The constructor only accepts a CacheClusterConfiguration; a direct cast reports a
            // violation as InvalidCastException rather than as a NullReferenceException.
            ClusterConfiguration actual = ((CacheClusterConfiguration)this.Config).ActualConfig(job);
            return actual.CreateClusterStatus();
        }

        /// <summary>
        /// Refresh the job summary status by asking the cluster the job actually belongs to.
        /// </summary>
        /// <param name="job">Summary to refresh; its Status is overwritten.</param>
        /// <param name="manager">Communication manager.</param>
        public override void RefreshStatus(DryadLinqJobSummary job, CommManager manager)
        {
            ClusterStatus actualStatus = this.CreateActualStatus(job);
            actualStatus.RefreshStatus(job, manager);
            ClusterJobInformation info = actualStatus.DiscoverClusterJob(job, manager);
            if (info == null)
            {
                job.Status = ClusterJobInformation.ClusterJobStatus.Unknown;
                return;
            }

            job.Status = info.Status;
        }

        /// <summary>
        /// Not needed, all summaries are already known.
        /// </summary>
        /// <param name="clusterJob">Cluster job information.</param>
        /// <param name="reporter">Delegate used to report errors.</param>
        /// <returns>Throws an exception.</returns>
        /// <exception cref="InvalidOperationException">Always.</exception>
        public override DryadLinqJobSummary DiscoverDryadLinqJobFromClusterJob(ClusterJobInformation clusterJob, StatusReporter reporter)
        {
            throw new InvalidOperationException();
        }

        /// <summary>
        /// This functionality is not available for cached jobs.
        /// </summary>
        /// <param name="url">Job url.</param>
        /// <param name="reporter">Delegate used to report errors.</param>
        /// <returns>Throws an exception.</returns>
        /// <exception cref="InvalidOperationException">Always.</exception>
        public override DryadLinqJobSummary DiscoverDryadLinqJobFromURL(string url, StatusReporter reporter)
        {
            throw new InvalidOperationException();
        }

        /// <summary>
        /// Discover a cluster job by delegating to the status of the cluster the job actually belongs to.
        /// </summary>
        /// <param name="job">Cluster job.</param>
        /// <param name="manager">Communication manager.</param>
        /// <returns>Whatever the actual cluster status reports for this job.</returns>
        public override ClusterJobInformation DiscoverClusterJob(DryadLinqJobSummary job, CommManager manager)
        {
            return this.CreateActualStatus(job).DiscoverClusterJob(job, manager);
        }

        /// <summary>
        /// Not needed, these jobs are no longer running.
        /// </summary>
        /// <param name="job">Job to cancel.</param>
        /// <returns>Throws an exception.</returns>
        /// <exception cref="InvalidOperationException">Always.</exception>
        public override bool CancelJob(DryadLinqJobSummary job)
        {
            throw new InvalidOperationException();
        }
    }

    #region IDisposable Members
    #endregion

    /// <summary>
    /// Status of a cluster comprised only of the local machine.
    /// </summary>
    public class YarnEmulatedClusterStatus : ClusterStatus
    {
        /// <summary>
        /// The configuration, typed as the local emulator.
        /// </summary>
        private readonly LocalEmulator config;

        /// <summary>
        /// Create a cluster containing just the local machine.
        /// </summary>
        /// <param name="config">Configuration for the local machine.</param>
        /// <exception cref="ArgumentException">The configuration is not a LocalEmulator.</exception>
        public YarnEmulatedClusterStatus(ClusterConfiguration config)
            : base(config)
        {
            LocalEmulator emulator = config as LocalEmulator;
            if (emulator == null)
            {
                // The message names the type that is actually checked.
                throw new ArgumentException("Expected a LocalEmulator, got a " + config.GetType());
            }

            this.config = emulator;
        }

        /// <summary>
        /// Discover the way machines are organized into racks.
        /// </summary>
        /// <returns>A map with the single entry "localhost" in pod "localhost".</returns>
        protected override Dictionary<string, string> QueryClusterTopology()
        {
            var result = new Dictionary<string, string>();
            result.Add("localhost", "localhost");
            return result;
        }

        /// <summary>
        /// Force the recomputation of the cluster job list: one job per subfolder of the jobs folder.
        /// </summary>
        /// <param name="virtualCluster">Virtual cluster to use (defined only for some cluster types).</param>
        /// <param name="manager">Communication manager.</param>
        protected override void RecomputeClusterJobList(string virtualCluster, CommManager manager)
        {
            this.clusterJobs = new Dictionary<string, ClusterJobInformation>();
            if (!Directory.Exists(this.config.JobsFolder))
            {
                return;
            }

            string[] subfolders = Directory.GetDirectories(this.config.JobsFolder);
            int done = 0;
            foreach (var job in subfolders)
            {
                manager.Token.ThrowIfCancellationRequested();
                string jobId = Path.GetFileName(job);
                ClusterJobInformation info = this.GetJobInfo(job, jobId);
                if (info != null)
                {
                    // ReSharper disable once AssignNullToNotNullAttribute
                    this.clusterJobs.Add(jobId, info);
                }

                // Pre-increment: this folder has been handled, so it counts towards the progress.
                manager.Progress(++done * 100 / subfolders.Length);
            }

            manager.Progress(100);
        }

        /// <summary>
        /// Discover the (unique) dryadlinq job corresponding to a cluster job.
        /// </summary>
        /// <param name="clusterJob">Cluster Job.</param>
        /// <param name="reporter">Delegate used to report errors; not used here.</param>
        /// <returns>The job description.</returns>
        public override DryadLinqJobSummary DiscoverDryadLinqJobFromClusterJob(ClusterJobInformation clusterJob, StatusReporter reporter)
        {
            return new DryadLinqJobSummary(
                clusterJob.Cluster,
                this.config.TypeOfCluster,
                "", // virtual cluster
                "", // machine
                clusterJob.ClusterJobID, // jobId
                clusterJob.ClusterJobID, // clusterJobId
                new DryadProcessIdentifier("jm"), // jmProcessGuid
                clusterJob.Name,
                clusterJob.User,
                clusterJob.Date,
                clusterJob.Date + clusterJob.EstimatedRunningTime,
                clusterJob.Status);
        }

        /// <summary>
        /// Discover the dryadlinq job given a url from the cluster scheduler.
        /// (Does not make sense for some cluster architectures, e.g., HPC.)
        /// </summary>
        /// <param name="url">URL pointing to the job.</param>
        /// <param name="reporter">Delegate used to report errors.</param>
        /// <returns>Throws an exception.</returns>
        /// <exception cref="InvalidOperationException">Always.</exception>
        public override DryadLinqJobSummary DiscoverDryadLinqJobFromURL(string url, StatusReporter reporter)
        {
            throw new InvalidOperationException();
        }

        /// <summary>
        /// Extract the job information from a folder with logs on the local machine.
        /// </summary>
        /// <param name="jobRootFolder">Folder with logs for the specified job.</param>
        /// <param name="jobId">Job id; also used as the job name.</param>
        /// <returns>The job information, or null if the folder has no "jm" subfolder.</returns>
        private ClusterJobInformation GetJobInfo(string jobRootFolder, string jobId)
        {
            string jmFolder = Path.Combine(jobRootFolder, "jm");
            if (!Directory.Exists(jmFolder))
            {
                return null;
            }

            // The creation time of the "jm" folder stands in for the job start date.
            DateTime date = File.GetCreationTime(jmFolder);
            return new ClusterJobInformation(
                this.config.Name,
                "",
                jobId,
                jobId,
                Environment.UserName,
                date,
                TimeSpan.Zero,
                ClusterJobInformation.ClusterJobStatus.Unknown);
        }

        /// <summary>
        /// Cancel the specified job. Not supported by this cluster type.
        /// </summary>
        /// <param name="job">Job whose execution is cancelled.</param>
        /// <returns>Throws an exception.</returns>
        /// <exception cref="InvalidOperationException">Always.</exception>
        public override bool CancelJob(DryadLinqJobSummary job)
        {
            throw new InvalidOperationException();
        }
    }

    /// <summary>
    /// Status of an Azure DFS cluster.
    /// </summary>
    public class AzureDfsClusterStatus : ClusterStatus
    {
        /// <summary>
        /// A job whose metadata says "running" but whose last heartbeat is older than this is reported as failed.
        /// </summary>
        private static readonly TimeSpan HeartbeatTimeout = TimeSpan.FromSeconds(40);

        /// <summary>
        /// The configuration, typed as an Azure DFS configuration.
        /// </summary>
        private readonly AzureDfsClusterConfiguration config;

        /// <summary>
        /// Create the status of an Azure DFS cluster.
        /// </summary>
        /// <param name="config">Configuration of the Azure DFS cluster.</param>
        /// <exception cref="ArgumentException">The configuration is not an AzureDfsClusterConfiguration.</exception>
        public AzureDfsClusterStatus(ClusterConfiguration config)
            : base(config)
        {
            AzureDfsClusterConfiguration azureConfig = config as AzureDfsClusterConfiguration;
            if (azureConfig == null)
            {
                // The message names the type that is actually checked.
                throw new ArgumentException("Expected an AzureDfsClusterConfiguration, got a " + config.GetType());
            }

            this.config = azureConfig;
        }

        /// <summary>
        /// Discover the way machines are organized into racks.
        /// </summary>
        /// <returns>A map with the single entry "localhost" in pod "localhost".</returns>
        protected override Dictionary<string, string> QueryClusterTopology()
        {
            var result = new Dictionary<string, string>();
            result.Add("localhost", "localhost");
            return result;
        }

        /// <summary>
        /// Force the recomputation of the cluster job list: one candidate job per entry
        /// returned by enumerating the root of the Azure container.
        /// </summary>
        /// <param name="virtualCluster">Virtual cluster to use (defined only for some cluster types).</param>
        /// <param name="manager">Communication manager.</param>
        protected override void RecomputeClusterJobList(string virtualCluster, CommManager manager)
        {
            this.clusterJobs = new Dictionary<string, ClusterJobInformation>();
            var jobs = this.config.AzureClient.EnumerateDirectory("").ToList();

            int done = 0;
            foreach (var job in jobs)
            {
                manager.Token.ThrowIfCancellationRequested();
                ClusterJobInformation info = this.GetJobInfo(job);
                if (info != null)
                {
                    // ReSharper disable once AssignNullToNotNullAttribute
                    this.clusterJobs.Add(job, info);
                }

                // Pre-increment: this entry has been handled, so it counts towards the progress.
                manager.Progress(100 * ++done / jobs.Count);
            }

            manager.Progress(100);
        }

        /// <summary>
        /// Discover the (unique) dryadlinq job corresponding to a cluster job.
        /// </summary>
        /// <param name="clusterJob">Cluster Job.</param>
        /// <param name="reporter">Delegate used to report errors; not used here.</param>
        /// <returns>The job description.</returns>
        public override DryadLinqJobSummary DiscoverDryadLinqJobFromClusterJob(ClusterJobInformation clusterJob, StatusReporter reporter)
        {
            return new DryadLinqJobSummary(
                clusterJob.Cluster,
                this.config.TypeOfCluster,
                "", // virtual cluster
                "", // machine
                clusterJob.ClusterJobID, // jobId
                clusterJob.ClusterJobID, // clusterJobId
                new DryadProcessIdentifier("jm"), // jmProcessGuid
                clusterJob.Name,
                clusterJob.User,
                clusterJob.Date,
                clusterJob.Date + clusterJob.EstimatedRunningTime,
                clusterJob.Status);
        }

        /// <summary>
        /// Discover the dryadlinq job given a url from the cluster scheduler.
        /// (Does not make sense for some cluster architectures, e.g., HPC.)
        /// </summary>
        /// <param name="url">URL pointing to the job.</param>
        /// <param name="reporter">Delegate used to report errors.</param>
        /// <returns>Throws an exception.</returns>
        /// <exception cref="InvalidOperationException">Always.</exception>
        public override DryadLinqJobSummary DiscoverDryadLinqJobFromURL(string url, StatusReporter reporter)
        {
            throw new InvalidOperationException();
        }

        /// <summary>
        /// Extract the job information from the blobs stored under the job folder.
        /// Status, last heartbeat, job name and start time are read from the metadata of the
        /// blob whose name ends in "heartbeat".
        /// </summary>
        /// <param name="jobRootFolder">Folder with logs for the specified job; also used as the job id.</param>
        /// <returns>The job information, or null if no heartbeat blob was found.</returns>
        private ClusterJobInformation GetJobInfo(string jobRootFolder)
        {
            DateTime date = DateTime.MinValue;
            DateTime lastHeartBeat = DateTime.MinValue;
            ClusterJobInformation.ClusterJobStatus status = ClusterJobInformation.ClusterJobStatus.Unknown;
            bool found = false;
            string jobName = jobRootFolder;

            var jobFiles = this.config.AzureClient.EnumerateDirectory(jobRootFolder).ToList();
            foreach (var file in jobFiles)
            {
                // Blob names are identifiers, so compare ordinally rather than with the current culture.
                if (file.EndsWith("heartbeat", StringComparison.Ordinal))
                {
                    var blob = this.config.AzureClient.Container.GetPageBlobReference(file);
                    blob.FetchAttributes();
                    var props = blob.Metadata;

                    if (props.ContainsKey("status"))
                    {
                        var st = props["status"];
                        switch (st)
                        {
                            case "failure":
                                status = ClusterJobInformation.ClusterJobStatus.Failed;
                                break;
                            case "success":
                                status = ClusterJobInformation.ClusterJobStatus.Succeeded;
                                break;
                            case "running":
                                status = ClusterJobInformation.ClusterJobStatus.Running;
                                break;
                            case "killed":
                                status = ClusterJobInformation.ClusterJobStatus.Cancelled;
                                break;
                            default:
                                Console.WriteLine("Unknown status " + st);
                                break;
                        }
                    }

                    if (props.ContainsKey("heartbeat"))
                    {
                        // Parse into a temporary: TryParse resets its out argument on failure.
                        DateTime parsedHeartBeat;
                        if (DateTime.TryParse(props["heartbeat"], out parsedHeartBeat))
                        {
                            lastHeartBeat = parsedHeartBeat.ToLocalTime();
                            if (status == ClusterJobInformation.ClusterJobStatus.Running &&
                                DateTime.Now - lastHeartBeat > HeartbeatTimeout)
                            {
                                // job has in fact crashed
                                status = ClusterJobInformation.ClusterJobStatus.Failed;
                            }
                        }
                    }

                    if (props.ContainsKey("jobname"))
                    {
                        jobName = props["jobname"];
                    }

                    if (props.ContainsKey("starttime"))
                    {
                        // Parse into a temporary so that an unparsable value does not wipe out
                        // a date that was already obtained from the program blob.
                        DateTime parsedStart;
                        if (DateTime.TryParse(props["starttime"], out parsedStart))
                        {
                            date = parsedStart.ToLocalTime();
                        }
                    }

                    found = true;
                }
                else if (file.Contains("DryadLinqProgram__") &&
                         // newer heartbeats contain the date, so the program blob is only
                         // consulted while no start date is known yet
                         date == DateTime.MinValue)
                {
                    var blob = this.config.AzureClient.Container.GetBlockBlobReference(file);
                    blob.FetchAttributes();
                    var props = blob.Properties;
                    if (props.LastModified.HasValue)
                    {
                        date = props.LastModified.Value.DateTime;
                        date = date.ToLocalTime();
                    }
                }
            }

            if (!found)
            {
                return null;
            }

            // The running time can only be estimated when both ends of the interval are known.
            TimeSpan running = TimeSpan.Zero;
            if (date != DateTime.MinValue && lastHeartBeat != DateTime.MinValue)
            {
                running = lastHeartBeat - date;
            }

            return new ClusterJobInformation(
                this.config.Name,
                "",
                jobRootFolder,
                jobName,
                Environment.UserName,
                date,
                running,
                status);
        }

        /// <summary>
        /// Refresh the job summary status by re-reading the information of this one job.
        /// </summary>
        /// <param name="summary">Summary to refresh; its Status is overwritten.</param>
        /// <param name="manager">Communication manager; not used here.</param>
        public override void RefreshStatus(DryadLinqJobSummary summary, CommManager manager)
        {
            ClusterJobInformation info = this.GetJobInfo(summary.JobID);
            if (info == null)
            {
                summary.Status = ClusterJobInformation.ClusterJobStatus.Unknown;
                return;
            }

            summary.Status = info.Status;
        }

        /// <summary>
        /// Cancel the specified job.
        /// </summary>
        /// <param name="job">Job whose execution is cancelled.</param>
        /// <returns>Always true once the kill request has been issued without throwing.</returns>
        public override bool CancelJob(DryadLinqJobSummary job)
        {
            AzureUtils.KillJob(this.config.AccountName, this.config.AccountKey, this.config.Container, job.ClusterJobId);
            return true;
        }
    }
}