/*
Copyright (c) Microsoft Corporation
All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
compliance with the License. You may obtain a copy of the License
at http://www.apache.org/licenses/LICENSE-2.0
THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER
EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR CONDITIONS OF
TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR NON-INFRINGEMENT.
See the Apache Version 2.0 License for specific language governing permissions and
limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Text.RegularExpressions;
using System.Xml.Linq;
using System.Diagnostics;
using Microsoft.Research.Tools;
namespace Microsoft.Research.JobObjectModel
{
///
/// Exception thrown when we cannot understand the structure of a Dryad/DryadLINQ job.
///
public class DryadException : Exception
{
///
/// Create a new DryadException.
///
/// Message conveyed by the exception.
public DryadException(string message) : base(message) { }
}
///
/// Classes providing a parsing from string routine.
///
public interface IParse
{
///
/// Parse one line.
///
/// Line to parse.
void Parse(string line);
}
///
/// One stage (a set of vertices) in a DryadLINQ job.
///
public class DryadLinqJobStage
{
///
/// Number of vertices defined in the static plan; 0 if unknown.
/// This field must be set explicitly, it is not computed by the constructor, since the information is not available in the set of vertices.
///
public int StaticVertexCount { get; set; }
///
/// Stage name.
///
public string Name { get; protected set; }
///
/// List of vertices in the stage.
///
readonly IEnumerable vertices;
///
/// Number of executed vertices; does not include abandoned vertices.
///
public int TotalInitiatedVertices { get; protected set; }
///
/// Created but not yet started.
///
public int CreatedVertices { get; protected set; }
///
/// Vertices that have started (may still be running).
///
public int StartedVertices { get; protected set; }
///
/// Vertices that have completed and then have been cancelled.
///
public int InvalidatedVertices { get; protected set; }
///
/// Number of failed vertices.
///
public int FailedVertices { get; protected set; }
///
/// Vertices that have completed successfully.
///
public int SuccessfulVertices { get; protected set; }
///
/// Number of cancelled vertices.
///
public int CancelledVertices { get; protected set; }
///
/// Number of vertices abandoned before running.
///
public int AbandonedVertices { get; protected set; }
///
/// Number of vertices cancelled by the remote scheduler.
///
public int RevokedVertices { get; protected set; }
///
/// How long has this stage been running?
///
public TimeSpan RunningTime { get { return this.EndTime - this.StartTime; } }
///
/// Time when first vertex in stage started.
///
public DateTime StartTime { get; protected set; }
///
/// Time when last vertex in stage finished.
///
public DateTime EndTime { get; protected set; }
///
/// Amount of data read (-1 if unknown).
///
public long DataRead { get; protected set; }
///
/// Amount of data written (-1 if unknown).
///
public long DataWritten { get; protected set; }
///
/// Information about the vertices executed in this stage.
///
public IEnumerable Vertices { get { return this.vertices; } }
///
/// Create a DryadLinqJobStage from a set of vertices.
///
/// Name of stage.
/// Set of vertices contained in the stage.
public DryadLinqJobStage(string stagename, List vertices)
{
this.StaticVertexCount = 0; // not yet known
this.DataRead = vertices.Select(v => v.DataRead).Sum();
this.DataWritten = vertices.Select(v => v.DataWritten).Sum();
if (this.DataRead < 0)
this.DataRead = -1;
if (this.DataWritten < 0)
this.DataWritten = -1;
this.vertices = vertices;
this.Name = stagename;
this.AbandonedVertices =
this.FailedVertices =
this.CancelledVertices =
this.StartedVertices =
this.SuccessfulVertices =
this.CreatedVertices =
this.InvalidatedVertices =
this.RevokedVertices =
this.TotalInitiatedVertices = 0;
foreach (var vertex in this.vertices)
{
this.TotalInitiatedVertices++;
switch (vertex.State)
{
case ExecutedVertexInstance.VertexState.Revoked:
this.RevokedVertices++;
break;
case ExecutedVertexInstance.VertexState.Abandoned:
this.AbandonedVertices++;
break;
case ExecutedVertexInstance.VertexState.Invalidated:
this.InvalidatedVertices++;
break;
case ExecutedVertexInstance.VertexState.Cancelled:
this.CancelledVertices++;
break;
case ExecutedVertexInstance.VertexState.Failed:
this.FailedVertices++;
break;
case ExecutedVertexInstance.VertexState.Created:
this.CreatedVertices++;
break;
case ExecutedVertexInstance.VertexState.Started:
this.StartedVertices++;
break;
case ExecutedVertexInstance.VertexState.Successful:
this.SuccessfulVertices++;
break;
default:
throw new DryadException("Unexpected vertex state " + vertex.State);
}
}
this.TotalInitiatedVertices -= this.AbandonedVertices;
if (this.TotalInitiatedVertices == 0)
return;
List runVertices = this.vertices.Where(v => v.Start != DateTime.MinValue).ToList();
if (runVertices.Count > 0) this.StartTime = runVertices.Select(v => v.Start).Min();
this.EndTime = this.vertices.Select(v => v.End).Max();
}
}
///
/// This class contains all the information about a Dryad job.
///
public class DryadLinqJobInfo
{
///
/// A summary of the job.
///
DryadLinqJobSummary jobSummary;
///
/// The start time of the job manager.
///
public DateTime StartJMTime {
get
{
return this.ManagerVertex.Start;
}
}
///
/// When was the job state last updated?
///
public DateTime LastUpdatetime { get; private set; }
///
/// The end time of the job.
///
public DateTime EndTime {
get
{
return this.ManagerVertex.End;
}
}
///
/// How long has the job been running?
///
public TimeSpan RunningTime {
get
{
return this.ManagerVertex.RunningTime;
}
}
///
/// All the vertices for the job.
///
JobVertices jobVertices;
///
/// When parsing stdout save here last vertex with failure, to attach additional
/// error messages to it.
///
ExecutedVertexInstance lastFailedVertex;
///
/// The path to the stdout of the job manager
///
IClusterResidentObject stdoutpath;
///
/// The name of the Job
///
public string JobName { get { return this.Summary.Name; } }
///
/// Error code of the dryadlinq job.
///
public string ErrorCode { get; set; }
///
/// The job manager vertex for this job.
///
public ExecutedVertexInstance ManagerVertex { get; set; }
///
/// Is the standard output complete or truncated?
///
public bool ManagerStdoutIncomplete { get; protected set; }
///
/// True if the information to create this jobinfo is no longer available.
///
public bool JobInfoCannotBeCollected { get; protected set; }
///
/// Number of stages that have started execution.
///
public int ExecutedStageCount { get { return this.jobVertices.ExecutedStageCount; } }
///
/// Total data read by job.
///
public long TotalDataRead { get; protected set; }
///
/// Data read intra-pod.
///
public long IntraPodDataRead { get; protected set; }
///
/// Data read cross pod.
///
public long CrossPodDataRead { get; protected set; }
///
/// Data read from the same machine.
///
public long LocalReadData { get; protected set; }
///
/// Approximate timing information, used for vertices which have not terminated yet.
///
private DateTime lastTimestampSeen;
///
/// The vertices in this job
///
public IEnumerable Vertices
{
get { return this.jobVertices.AllVertices(); }
}
///
/// The summary of this job
///
public DryadLinqJobSummary Summary
{
get { return this.jobSummary; }
}
///
/// Message returned by job manager when job aborts.
///
public string AbortingMsg { get; private set; }
///
/// The cluster where the job information resides.
///
public ClusterConfiguration ClusterConfiguration { get; protected set; }
///
/// Original cluster configuration (the config can be just "cache").
///
// ReSharper disable once UnusedAutoPropertyAccessor.Local
private ClusterConfiguration OriginalClusterConfiguration { get; set; }
///
/// Regular expression for parsing a stdout line with vertex statistics.
///
private readonly static Regex vertexStartRegex, vertexCreatedRegex, processCreatingRegex, timingInfoRegex, terminationRegex, verticesCreatedRegex, ioRegex,
terminatedRegex, vertexAbandonedRegex, failedRegex, cancelRegex, datareadRegex, inputFailureRegex, setToFailedlRegex, revokedRegex;
///
/// Useful CPU time.
///
public TimeSpan UsefulCPUTime { get; protected set; }
///
/// Time spent in failed vertices.
///
public TimeSpan WastedCPUTime { get; protected set; }
///
/// Average degree of parallelism.
///
public double AverageParallelism {
get
{
// ReSharper disable CompareOfFloatsByEqualityOperator
if (this.RunningTime.TotalSeconds != 0)
// ReSharper restore CompareOfFloatsByEqualityOperator
return this.UsefulCPUTime.TotalSeconds / this.RunningTime.TotalSeconds;
else
return 0;
}
}
///
/// Number of executed vertices.
///
public int ExecutedVertexCount { get { return this.jobVertices.Count; } }
///
/// Compile a bunch of constant regular expressions.
///
static DryadLinqJobInfo()
{
// optional guid regular expression
const string optGuidRegex = @"GUID \{?([-a-fA-F0-9]+)\}";
// Abandoning duplicate scheduling/execution of vertex 83.1 (InputTable__26[5])
vertexAbandonedRegex = new Regex(@"Abandoning duplicate \w+ of vertex (\d+)\.(\d+) \((.+)\)", RegexOptions.Compiled);
// Created process execution record for vertex 33 (Super__0[0]) v.0 GUID {B0FC788F-1FFC-4D74-AFC4-3EDFF03AF11A}
vertexCreatedRegex = new Regex(@"Created process execution record for vertex (\d+) \((.*)\) v.(\d+) " + optGuidRegex,
RegexOptions.Compiled);
// Creating process for vertex 2945 (Merge__17[440]) v.0 GUID {DDC9BB35-25D9-48A9-98C6-9EC7753FFB3B} machine sherwood-022
processCreatingRegex = new Regex(@"Creating process for vertex (\d+) \((.*)\) v.(\d+) " + optGuidRegex + @" machine (\w+)", RegexOptions.Compiled);
// Created process execution record for vertices 192 (Merge__41[0]) 223 (Union__45[0]) v.0 GUID {0297A91C-FFEA-42EA-94AF-CD0163A04D45}
verticesCreatedRegex = new Regex(@"Created process execution record for vertices (.*) v.(\d+) " + optGuidRegex,
RegexOptions.Compiled);
// Process started for vertex 5 (Super__0[1]) v.0 GUID {73EA55E0-0326-43C4-AD61-CB0B8CF8FE49} machine sherwood-025
// Process started for vertices 23 (Merge__29) 24 (Apply__33) v.0 GUID {E945DC5D-9AF6-4732-8770-2A6BF7FA3041} machine sherwood-237
vertexStartRegex = new Regex(@"Process started for vert(\w+) (.*) v\.(.*) " + optGuidRegex + @" machine (\S+)",
RegexOptions.Compiled);
// Timing Information 5 1 Super__0[1] 128654556602334453 0.0000 0.0000 0.0000 0.0000 0.2969
timingInfoRegex = new Regex(@"Timing Information (\d+) (\d+) (.+) (\d+) ([-.0-9]+) ([-.0-9]+) ([-.0-9]+) ([-.0-9]+) ([-.0-9]+)",
RegexOptions.Compiled);
// Vertex 5.0 (Super__0[1]) machine sherwood-025 guid {73EA55E0-0326-43C4-AD61-CB0B8CF8FE49} status Vertex Has Completed,
terminationRegex = new Regex(@"Vertex (\d+)\.(\d+) \((.+)\) machine (\S+) guid \{?([-a-fA-F0-9]+)\}? status (.*)",
RegexOptions.Compiled);
// Process was terminated Vertex 11.0 (Select__6[1]) GUID {C1E35A88-F5AD-4A26-BE5F-46B6D515623F} machine sherwood-118 status The operation succeeded
terminatedRegex = new Regex(@"Process was terminated Vertex (\d+)\.(\d+) \((.+)\) " + optGuidRegex + @" machine (\S+) status (.*)",
RegexOptions.Compiled);
// Process has failed Vertex 11.0 (Select__6[1]) GUID {C1E35A88-F5AD-4A26-BE5F-46B6D515623F} machine sherwood-118 Exitcode status The operation succeeded
failedRegex = new Regex(@"Process has failed Vertex (\d+)\.(\d+) \((.+)\) " + optGuidRegex + @" machine (\S+) Exitcode (.*)",
RegexOptions.Compiled);
// Canceling vertex 1461.0 (Merge__13[258]) due to dependent failure
cancelRegex = new Regex(@"Canceling vertex (\d+)\.(\d+) \((.+)\) due to (.*)", RegexOptions.Compiled);
// Setting vertex 1461.0 (Merge__13[258]) to failed
setToFailedlRegex = new Regex(@"Setting vertex (\d+)\.(\d+) \((.+)\) to failed(.*)", RegexOptions.Compiled);
// total=951722563162 local=37817665237 intrapod=189765117248 crosspod=724139780677
datareadRegex = new Regex(@"total=(\d+) local=(\d+) intrapod=(\d+) crosspod=(\d+)", RegexOptions.Compiled);
// Input vertex %u (%s) had %u read failure%s\n
inputFailureRegex = new Regex(@"Input vertex (\d+) \(.*\) had (\d+) read failure", RegexOptions.Compiled);
// Io information 23 1 Super__4[5] read 7106 wrote 933
ioRegex = new Regex(@"Io information (\d+) (\d+) (.+) read (\d+) wrote (\d+)");
// Cancellations by Quincy
revokedRegex = new Regex(@"Process was revoked by remote scheduler Old " + optGuidRegex + @" New " + optGuidRegex);
}
///
/// Create information about a job run on the cluster.
///
/// Cluster configuration.
/// Summary description of the job.
/// The Dryad job description, or null.
/// If true, fill all the information, otherwise the user will have to call FillInformation on the result later.
/// Communication manager.
public static DryadLinqJobInfo CreateDryadLinqJobInfo(ClusterConfiguration cf, DryadLinqJobSummary summary, bool fill, CommManager manager)
{
try
{
DryadLinqJobInfo job = new DryadLinqJobInfo(cf, summary);
if (fill)
job.CollectEssentialInformation(manager);
return job;
}
catch (Exception e)
{
Trace.TraceInformation(e.ToString());
manager.Status("Could not collect job information for " + summary.Name + ": " + e.Message, StatusKind.Error);
return null;
}
}
///
/// Read the information about a job which ran the JM on the cluster
///
/// Configuration of the cluster.
/// Summary of the job.
protected DryadLinqJobInfo(ClusterConfiguration cf, DryadLinqJobSummary summary)
{
this.JobInfoCannotBeCollected = true;
this.ClusterConfiguration = cf;
if (cf is CacheClusterConfiguration)
this.OriginalClusterConfiguration = (cf as CacheClusterConfiguration).ActualConfig(summary);
else
this.OriginalClusterConfiguration = cf;
this.Initialize(summary);
}
///
/// Initialize a job info.
///
/// Job to summarize.
private void Initialize(DryadLinqJobSummary summary)
{
this.UsefulCPUTime = TimeSpan.Zero;
this.WastedCPUTime = TimeSpan.Zero;
this.LastUpdatetime = DateTime.Now;
this.ManagerStdoutIncomplete = true; // until we've seen the end
this.ManagerVertex = null;
this.jobSummary = summary;
this.ErrorCode = "";
this.AbortingMsg = "";
this.cachedStages = new Dictionary();
this.jobVertices = new JobVertices();
bool terminated = ClusterJobInformation.JobIsFinished(summary.Status);
IClusterResidentObject managerstdoutfile = this.ClusterConfiguration.ProcessStdoutFile(summary.ManagerProcessGuid, terminated, summary.Machine, summary);
if (this.ClusterConfiguration is CacheClusterConfiguration)
this.stdoutpath = managerstdoutfile;
else
{
IClusterResidentObject jmdir = this.ClusterConfiguration.ProcessDirectory(summary.ManagerProcessGuid, terminated, summary.Machine, summary);
if (this.stdoutpath == null)
{
string filename = managerstdoutfile.Name;
//this.stdoutpath = jmdir.GetFile("stdout.txt");
// do this by scanning the folder; this can give additional information about the file size on some platforms
IEnumerable files = jmdir.GetFilesAndFolders(filename);
foreach (var f in files)
{
if (f.Exception != null)
{
throw f.Exception;
}
if (f.RepresentsAFolder)
continue;
// there should be exactly one match
this.stdoutpath = f;
break;
}
if (this.stdoutpath == null)
{
throw new ClusterException("Could not locate JM standard output file in folder " + jmdir);
}
}
}
}
///
/// Refresh the job status.
///
/// Communication manager.
public void RefreshJobStatus(CommManager manager)
{
// skip if job is finished
if (this.Summary.Status == ClusterJobInformation.ClusterJobStatus.Failed ||
this.Summary.Status == ClusterJobInformation.ClusterJobStatus.Cancelled ||
this.Summary.Status == ClusterJobInformation.ClusterJobStatus.Succeeded)
return;
ClusterStatus status = this.ClusterConfiguration.CreateClusterStatus();
status.RefreshStatus(this.Summary, manager);
}
///
/// Fill the job info by parsing the stdout.txt.
/// The updated job.
/// Communication manager.
///
public bool CollectEssentialInformation(CommManager manager)
{
this.RefreshJobStatus(manager);
if (this.ManagerVertex == null)
{
this.ManagerVertex = new ExecutedVertexInstance(this, -1, 0, "JobManager", "", this.Summary.Date);
this.ManagerVertex.IsManager = true;
this.ManagerVertex.SetStartInformation(this, this.Summary.Machine, this.Summary.Date, this.Summary.ManagerProcessGuid, "");
this.ManagerVertex.StartCommandTime = this.ManagerVertex.CreationTime = this.ManagerVertex.VertexScheduleTime = this.Summary.Date;
ExecutedVertexInstance.VertexState jmstate = ExecutedVertexInstance.VertexState.Started;
switch (this.Summary.Status)
{
case ClusterJobInformation.ClusterJobStatus.Failed:
jmstate = ExecutedVertexInstance.VertexState.Failed;
break;
/*
case ClusterJobInformation.ClusterJobStatus.Succeeded:
jmstate = ExecutedVertexInstance.VertexState.Successful;
break;
*/
}
this.ManagerVertex.SetState(jmstate);
this.jobVertices.Add(this.ManagerVertex);
}
if (this.stdoutpath == null)
return false;
bool success = this.ParseStdout(this.stdoutpath, manager);
manager.Progress(100);
if (!success)
return false;
this.JobInfoCannotBeCollected = false;
manager.Status("Stdout parsed", StatusKind.OK);
this.LastUpdatetime = DateTime.Now;
if (this.Summary.Status == ClusterJobInformation.ClusterJobStatus.Running)
{
foreach (var vertex in this.Vertices.Where(v => v.State == ExecutedVertexInstance.VertexState.Started))
vertex.MarkVertexWasRunning(this.LastUpdatetime);
this.ManagerVertex.MarkVertexWasRunning(this.LastUpdatetime);
}
else if (this.jobSummary.Status == ClusterJobInformation.ClusterJobStatus.Failed)
{
if (this.ManagerVertex.State == ExecutedVertexInstance.VertexState.Started)
this.ManagerVertex.SetState(ExecutedVertexInstance.VertexState.Failed);
foreach (var vertex in this.Vertices.Where(v => v.State == ExecutedVertexInstance.VertexState.Started))
vertex.MarkVertexWasRunning(this.ManagerVertex.End);
}
return true;
}
///
/// Given a list of vertex (number \(name\))* pairs return the numbers.
///
/// A string of shape (number \(name\))*.
/// The list of name-number pairs.
private static IEnumerable> ParseVertices(string vertexlist)
{
Regex numberre = new Regex(@"(\d+) (.*)");
while (vertexlist.Length > 0)
{
Match m = numberre.Match(vertexlist);
if (!m.Success)
throw new DryadException("Could not find vertex number in " + vertexlist);
string number = m.Groups[1].Value;
// now scan a balanced number of parantheses
string rest = m.Groups[2].Value;
if (rest[0] != '(')
throw new DryadException("Expecting open parens after vertex number");
int opened = 0;
int i;
for (i = 0; i < rest.Length; i++)
{
if (rest[i] == '(')
opened++;
else if (rest[i] == ')')
{
opened--;
if (opened == 0)
{
i++;
break;
}
}
}
if (opened != 0 || i <= 2)
throw new DryadException("did not find matched parantheses in vertex name in " + vertexlist + ", can't parse");
string name = rest.Substring(1, i - 2); // skip first and last paranthesis
yield return new Tuple(name, int.Parse(number));
vertexlist = rest.Substring(i);
}
}
///
/// In new versions of L2H some lines start with a timestamp. Parse this timestamp.
///
/// Line that may start with [timestamp].
/// The timestamp at the beginning of the line, or DateTime.MinValue if none.
static DateTime ParseLineTimestamp(string line)
{
int square = line.IndexOf(']');
DateTime result = DateTime.MinValue;
if (line.StartsWith("[") && square >= 1)
{
string datetime = line.Substring(1, square-1);
DateTime.TryParse(datetime, out result);
}
return result;
}
///
/// Try to read a numeric value from a dictionary at a specific key.
///
/// Dictionary containing key-value pairs.
/// Key we are interested in.
/// The numeric value with that key, or 0 if some error occurs.
private long TryGetNumeric(Dictionary dict, string key)
{
if (!dict.ContainsKey(key)) return 0;
long result;
if (long.TryParse(dict[key], out result))
return result;
return 0;
}
///
/// Sometimes the StreamReader can read a partial line only.
/// We cache here the previously read line and if it looks like the current line is just a fragment, then we concatentate it
/// to the previous line. The line is a fragment if it does not start with 'logtimelocal'.
///
private string previousLine = "";
///
/// New JM stdout parsing code, for YARN-based DryadLINQ.
///
/// Line to parse.
/// False if the line terminated in a quoted string and has to be combined with the next line.
private bool ParseStdoutLineNew(string line)
{
if (string.IsNullOrWhiteSpace(line)) return true;
// The line should start with logtimelocal, otherwise it's probably a fragment of the previous line
this.previousLine += line;
if (!(line.EndsWith("\r") || line.EndsWith("\n")))
{
// line is incomplete, return now, parse later
return true;
}
line = this.previousLine.Trim();
this.previousLine = "";
Dictionary kvp = Utilities.ParseCSVKVP(line);
if (kvp == null) return false;
var strTs = kvp["logtimelocal"];
int cutOff = strTs.IndexOf("UTC");
if (cutOff >= 0)
{
strTs = strTs.Substring(0, cutOff);
}
DateTime timeStamp = DateTime.Parse(strTs, CultureInfo.InvariantCulture);
timeStamp = timeStamp.ToLocalTime();
this.lastTimestampSeen = timeStamp;
if (kvp.ContainsKey("job"))
{
string operation = kvp["job"];
switch (operation)
{
case "start":
this.ManagerVertex.SetStartInformation(this, this.Summary.Machine, timeStamp, this.Summary.ManagerProcessGuid, "");
this.ManagerVertex.StartCommandTime = this.ManagerVertex.CreationTime = this.ManagerVertex.VertexScheduleTime = timeStamp;
break;
case "stop":
this.ManagerVertex.End = timeStamp;
string exitcode;
if (kvp.TryGetValue("exitcode", out exitcode))
{
this.ErrorCode = exitcode;
int numCode = Convert.ToInt32(exitcode, 16);
if (numCode == 0)
{
this.ManagerVertex.SetState(ExecutedVertexInstance.VertexState.Successful);
}
else
{
this.ManagerVertex.SetState(ExecutedVertexInstance.VertexState.Failed);
}
}
string errorstring;
if (kvp.TryGetValue("errorstring", out errorstring))
{
this.ManagerVertex.AddErrorString(errorstring);
this.AbortingMsg = errorstring;
}
break;
}
}
else if (kvp.ContainsKey("vertex"))
{
string vertex = kvp["vertex"];
int number;
int version;
int dot = vertex.IndexOf('.');
if (dot < 0)
{
number = int.Parse(vertex);
version = int.Parse(kvp["version"]);
}
else
{
number = int.Parse(vertex.Substring(0, dot));
version = int.Parse(vertex.Substring(dot + 1));
}
if (kvp.ContainsKey("transition"))
{
string transition = kvp["transition"];
switch (transition)
{
case "created":
{
string name = kvp["name"];
ExecutedVertexInstance vi = new ExecutedVertexInstance(this, number, version, name, "", timeStamp);
this.jobVertices.Add(vi);
}
break;
case "starting":
{
// not doing anything
break;
}
case "running":
{
string process;
kvp.TryGetValue("id", out process);
if (process == null)
kvp.TryGetValue("process", out process);
string machine = kvp["computer"];
ExecutedVertexInstance vi = this.jobVertices.FindVertex(number, version);
this.jobVertices.Remap(vi, process);
string pid = this.ClusterConfiguration.ExtractPidFromGuid(process, this.Summary);
DryadProcessIdentifier identifier = new DryadProcessIdentifier(pid);
vi.SetStartInformation(this, machine, timeStamp, identifier, process);
}
break;
case "completed":
{
ExecutedVertexInstance vi = this.jobVertices.FindVertex(number, version);
vi.SetState(ExecutedVertexInstance.VertexState.Successful);
vi.End = timeStamp;
vi.ExitCode = "";
this.UsefulCPUTime += vi.RunningTime;
break;
}
case "failed":
{
ExecutedVertexInstance vi = this.jobVertices.FindVertex(number, version);
if (vi.State != ExecutedVertexInstance.VertexState.Started)
vi.SetState(ExecutedVertexInstance.VertexState.Cancelled);
else
{
vi.SetState(ExecutedVertexInstance.VertexState.Failed);
if (vi.RunningTime > TimeSpan.Zero)
this.WastedCPUTime += vi.RunningTime;
}
if (kvp.ContainsKey("errorstring"))
vi.AddErrorString(kvp["errorstring"]);
string exitcode;
if (kvp.TryGetValue("errorcode", out exitcode))
vi.ExitCode = exitcode;
vi.End = timeStamp;
break;
}
}
}
else if (kvp.ContainsKey("outputChannel"))
{
ExecutedVertexInstance vi = this.jobVertices.FindVertex(number, version);
if (kvp.ContainsKey("errorstring"))
vi.AddErrorString(kvp["errorstring"]);
}
else if (kvp.ContainsKey("inputChannel"))
{
ExecutedVertexInstance vi = this.jobVertices.FindVertex(number, version);
if (kvp.ContainsKey("errorstring"))
vi.AddErrorString(kvp["errorstring"]);
}
else if (kvp.ContainsKey("io"))
{
if (kvp["io"] == "starting")
{
ExecutedVertexInstance vi = this.jobVertices.FindVertex(number, version);
int numberOfInputs = (int) TryGetNumeric(kvp, "numberOfInputs");
int numberOfOutputs = (int)TryGetNumeric(kvp, "numberOfOutputs");
if (vi.InputChannels == null)
vi.InputChannels = new Dictionary();
for (int i = 0; i < numberOfInputs; i++)
{
string uri;
if (kvp.TryGetValue("uriIn." + i, out uri))
{
var ched = new ChannelEndpointDescription(false, i, uri, 0);
vi.InputChannels[i] = ched;
}
}
if (vi.OutputChannels == null)
vi.OutputChannels = new Dictionary();
for (int i = 0; i < numberOfOutputs; i++)
{
string uri;
if (kvp.TryGetValue("uriOut." + i, out uri))
{
var ched = new ChannelEndpointDescription(false, i, uri, 0);
vi.OutputChannels[i] = ched;
}
}
}
else if (kvp["io"] == "total")
{
ExecutedVertexInstance vi = this.jobVertices.FindVertex(number, version);
long totalRead = TryGetNumeric(kvp, "totalRead");
//long tempRead = TryGetNumeric(kvp, "tempRead");
long tempReadInRack = TryGetNumeric(kvp, "tempReadInRack");
long tempReadCrossRack = TryGetNumeric(kvp, "tempReadCrossRack");
long localRead = TryGetNumeric(kvp, "localRead");
long totalWritten = TryGetNumeric(kvp, "totalWritten");
vi.DataRead = totalRead;
vi.DataWritten = totalWritten;
if (vi.InputChannels != null)
{
foreach (int ch in vi.InputChannels.Keys)
{
long bytes = TryGetNumeric(kvp, "rb." + ch);
vi.InputChannels[ch].Size = bytes;
}
}
if (vi.OutputChannels != null)
{
foreach (int ch in vi.OutputChannels.Keys)
{
long bytes = TryGetNumeric(kvp, "wb." + ch);
vi.OutputChannels[ch].Size = bytes;
}
}
this.TotalDataRead += totalRead;
this.LocalReadData += localRead;
this.CrossPodDataRead += tempReadCrossRack;
this.IntraPodDataRead += tempReadInRack;
}
else if (kvp["io"] == "running")
{
ExecutedVertexInstance vi = this.jobVertices.FindVertex(number, version);
if (vi.InputChannels != null)
{
foreach (int ch in vi.InputChannels.Keys)
{
long bytes = TryGetNumeric(kvp, "rb." + ch);
vi.InputChannels[ch].Size = bytes;
bytes = TryGetNumeric(kvp, "tb." + ch);
vi.InputChannels[ch].TotalSize = bytes;
}
}
if (vi.InputChannels != null)
{
foreach (int ch in vi.OutputChannels.Keys)
{
long bytes = TryGetNumeric(kvp, "wb." + ch);
vi.OutputChannels[ch].Size = bytes;
}
}
long totalRead = TryGetNumeric(kvp, "totalRead");
long totalWritten = TryGetNumeric(kvp, "totalWritten");
vi.DataRead = totalRead;
vi.DataWritten = totalWritten;
}
}
}
return true;
}
///
/// Parse one line from the JM standard output.
///
/// The line to parse.
private void ParseStdoutLine(string line)
{
DateTime lineTimeStamp = DateTime.MinValue;
if (line.Contains("Created process execution record"))
{
Match m = vertexCreatedRegex.Match(line);
if (m.Success)
{
lineTimeStamp = ParseLineTimestamp(line);
// Created process execution record for vertex (\d+) \((.*)\) v.(\d+) GUID \{?([-A-F0-9]+)\}?
int number = Int32.Parse(m.Groups[1].Value);
string name = m.Groups[2].Value;
int version = Int32.Parse(m.Groups[3].Value);
string guid = m.Groups[4].Value; // on some platforms, e.g. HPC, this identifier is not yet assigned properly
// the vertex may be already there, sometimes numbers are reused...
ExecutedVertexInstance vi = this.jobVertices.FindVertex(number, version);
if (vi == null)
{
vi = new ExecutedVertexInstance(this, number, version, name, guid, lineTimeStamp);
this.jobVertices.Add(vi);
}
}
else
{
m = verticesCreatedRegex.Match(line);
if (m.Success)
{
lineTimeStamp = ParseLineTimestamp(line);
// Created process execution record for vertices (.*) v.(\d+) GUID \{?([-A-F0-9]+)\}?
// Created process execution record for vertices 192 (Merge__41[0]) 223 (Union__45[0]) v.0 GUID {0297A91C-FFEA-42EA-94AF-CD0163A04D45}
int version = Int32.Parse(m.Groups[2].Value);
string vertices = m.Groups[1].Value;
string guid = m.Groups[3].Value; // on some platforms, e.g. HPC, this identifier is not yet assigned properly
IEnumerable> vertexList = DryadLinqJobInfo.ParseVertices(vertices);
foreach (var p in vertexList)
{
int number = p.Item2;
ExecutedVertexInstance vi = this.jobVertices.FindVertex(number, version);
if (vi == null)
{
vi = new ExecutedVertexInstance(this, number, version, p.Item1, guid, lineTimeStamp);
this.jobVertices.Add(vi);
}
}
}
}
}
else if (line.StartsWith("Creating process"))
{
Match m = processCreatingRegex.Match(line);
if (m.Success)
{
lineTimeStamp = ParseLineTimestamp(line);
// Creating process for vertex (\d+) \((.*)\\) v.(\d+) GUID \{?([-A-F0-9]+)\}? machine (\w+)
int number = Int32.Parse(m.Groups[1].Value);
//string name = m.Groups[2].Value;
int version = Int32.Parse(m.Groups[3].Value);
string guid = m.Groups[4].Value;
ExecutedVertexInstance vi = this.jobVertices.FindVertex(number, version);
if (vi != null)
{
this.jobVertices.Remap(vi, guid);
}
}
}
else if (line.StartsWith("Process was revoked"))
{
Match m = revokedRegex.Match(line);
if (m.Success)
{
string oldGuid = m.Groups[1].Value;
ExecutedVertexInstance vi = this.jobVertices.FindVertexByGuid(oldGuid);
if (vi != null)
{
vi.SetState(ExecutedVertexInstance.VertexState.Revoked);
string newGuid = m.Groups[2].Value;
this.jobVertices.Remap(vi, newGuid);
}
else
{
Trace.TraceInformation("Could not find revoked vertex with guid " + oldGuid);
}
}
}
else if (line.StartsWith("---HiPriTime"))
{
// Scope-specific line which we use to get the i/o information
// ---HiPriTime D7D51A1F-6693-4378-95FD-FC778A67C632,F52CA694-0202-411E-85E9-0C883E770A0E,SV4_Extract_Split[0],Completed,ch1sch010331112,2011-05-03 15:26:01.681 PDT,2011-05-03 15:26:01.696 PDT,2011-05-03 15:26:02.118 PDT,2011-05-03 15:26:04.286 PDT,2011-05-03 15:26:07.656 PDT,2011-05-03 15:26:01.696 PDT,97390825,1498630
string info = line.Substring(13);
string[] parts = info.Split(',');
if (parts.Length >= 13)
{
long read = long.Parse(parts[11]);
long written = long.Parse(parts[12]);
string guid = parts[1];
ExecutedVertexInstance vi = this.jobVertices.FindVertexByGuid(guid);
if (vi != null)
{
vi.DataRead = read;
vi.DataWritten = written;
this.TotalDataRead += read;
}
}
}
else if (line.Contains("Io information"))
{
// HPC-specific line
Match m = ioRegex.Match(line);
if (m.Success)
{
int number = Int32.Parse(m.Groups[1].Value);
int version = Int32.Parse(m.Groups[2].Value);
ExecutedVertexInstance vi = this.jobVertices.FindVertex(number, version);
if (vi != null)
{
vi.DataRead = long.Parse(m.Groups[4].Value);
vi.DataWritten = long.Parse(m.Groups[5].Value);
this.TotalDataRead += vi.DataRead;
}
}
}
else if (line.Contains("Process started"))
{
//those vertices which are being canceled may not be here
Match m = vertexStartRegex.Match(line);
if (m.Success)
{
lineTimeStamp = ParseLineTimestamp(line);
string version = m.Groups[3].Value;
string guid = m.Groups[4].Value;
string pid = this.ClusterConfiguration.ExtractPidFromGuid(guid, this.Summary);
DryadProcessIdentifier identifier = new DryadProcessIdentifier(pid);
string machine = m.Groups[5].Value;
// Process started for vertex 4 (Super__0[0]) v.0 GUID {9DDD0B00-C93F-46D2-9073-1CFD27829300} machine sherwood-255
// Process started for vertices 23 (Merge__29) 24 (Apply__33) v.0 GUID {E945DC5D-9AF6-4732-8770-2A6BF7FA3041} machine sherwood-237
string vertices = m.Groups[2].Value;
// This is a list of (number \(name\))* pairs
// we will assume that the parantheses are matched, or we can't do much
bool onevertex;
if (m.Groups[1].Value == "ex") // one vertEX
onevertex = true;
else if (m.Groups[1].Value == "ices")
onevertex = false;
else
throw new DryadException("Can't figure out if one or many vertices");
IEnumerable> vertexList = DryadLinqJobInfo.ParseVertices(vertices);
int vertexcount = 0;
int iversion = int.Parse(version);
if (lineTimeStamp > this.lastTimestampSeen)
this.lastTimestampSeen = lineTimeStamp;
foreach (var p in vertexList)
{
int number = p.Item2;
ExecutedVertexInstance vi = this.jobVertices.FindVertex(number, iversion);
//new ExecutedVertexInstance(this, number, version, name, identifier, machine, this.lastTimestampSeen);
if (vi == null)
Trace.TraceInformation("Could not find information for vertex {0}.{1}", number, version);
else
vi.SetStartInformation(this, machine, this.lastTimestampSeen, identifier, guid);
vertexcount++;
}
if (vertexcount > 1 && onevertex)
throw new DryadException("Expected one vertex, found " + vertexcount);
}
else
{
Trace.TraceInformation("Unexpected parsing error on line {0}", line);
}
}
else if (line.Contains("Abandoning"))
{
Match m = vertexAbandonedRegex.Match(line);
if (m.Success)
{
int number = Int32.Parse(m.Groups[1].Value);
int version = Int32.Parse(m.Groups[2].Value);
ExecutedVertexInstance vi = this.jobVertices.FindVertex(number, version);
if (vi != null)
vi.SetState(ExecutedVertexInstance.VertexState.Abandoned);
}
}
else if (line.Contains("Setting"))
{
Match m = setToFailedlRegex.Match(line);
if (m.Success)
{
// Setting vertex 1461.0 (Merge__13[258]) to failed
// Setting vertex (\d+)\.(\d+) \((.+)\) to failed(.*)
int number = Int32.Parse(m.Groups[1].Value);
int version = Int32.Parse(m.Groups[2].Value);
ExecutedVertexInstance vi = this.jobVertices.FindVertex(number, version);
if (vi != null)
{
vi.SetState(ExecutedVertexInstance.VertexState.Failed);
//vi.ErrorString = m.Groups[4].Value;
}
}
}
else if (line.Contains("Process was terminated"))
{
// terminatedRegex = new Regex(@"Process was terminated Vertex (\d+)\.(\d+) \((.+)\) GUID \{?([-A-F0-9]+)\}? machine (\S+) status (.*)",
// Process was terminated Vertex 11.0 (Select__6[1]) GUID {C1E35A88-F5AD-4A26-BE5F-46B6D515623F} machine sherwood-118 status The operation succeeded
Match m = terminatedRegex.Match(line);
if (m.Success)
{
lineTimeStamp = ParseLineTimestamp(line);
int number = Int32.Parse(m.Groups[1].Value);
int version = Int32.Parse(m.Groups[2].Value);
ExecutedVertexInstance vi = this.jobVertices.FindVertex(number, version);
if (vi != null)
{
// sometimes successful processes are terminated, because they don't report quickly enough being done
if (vi.State != ExecutedVertexInstance.VertexState.Successful)
{
vi.SetState(ExecutedVertexInstance.VertexState.Cancelled);
}
vi.ErrorString = m.Groups[6].Value;
if (lineTimeStamp != DateTime.MinValue)
vi.End = lineTimeStamp;
}
}
}
else if (line.Contains("Timing Information Graph Start Time"))
{
// Cosmos-specific line
// Timing Information Graph Start Time 128654556581866096
Match m = Regex.Match(line, @"Timing Information Graph Start Time (\d+)");
DateTime createTime = Utilities.Convert64time(ClusterConfiguration.GetClusterTimeZone(this.Summary), m.Groups[1].Value);
this.ManagerVertex.SetStartInformation(this, this.Summary.Machine, createTime, this.Summary.ManagerProcessGuid, "");
this.ManagerVertex.StartCommandTime = this.ManagerVertex.CreationTime = this.ManagerVertex.VertexScheduleTime = createTime;
this.lastTimestampSeen = createTime;
}
else if (line.StartsWith("Start time: "))
{
// HPC L2H specific line
// Start time: 04/05/2011 17:25:42.223
DateTime createTime;
bool parse = DateTime.TryParse(line.Substring("Start time: ".Length), out createTime);
if (parse)
{
this.ManagerVertex.SetStartInformation(this, this.Summary.Machine, createTime, this.Summary.ManagerProcessGuid, "");
this.ManagerVertex.StartCommandTime = this.ManagerVertex.CreationTime = this.ManagerVertex.VertexScheduleTime = createTime;
this.lastTimestampSeen = createTime;
}
}
else if (line.Contains("JM Finish time:"))
{
// Cosmos-specific line
// JM Finish time: 129140295499437263 2010-03-25T22:25:49.943726Z
Match m = Regex.Match(line, @"JM Finish time: (\d+)");
DateTime time = Utilities.Convert64time(ClusterConfiguration.GetClusterTimeZone(this.Summary), m.Groups[1].Value);
this.lastTimestampSeen = time;
this.ManagerVertex.End = time;
}
else if (line.StartsWith("Stop time "))
{
// HPC L2H specific line
// Stop time (Exit code = 2148734208): 04/05/2011 17:25:46.614
Regex regex = new Regex(@"Stop time \(Exit code = (.*)\): (.*)");
Match m = regex.Match(line);
if (m.Success)
{
this.ManagerStdoutIncomplete = false;
DateTime time;
bool parse = DateTime.TryParse(m.Groups[2].Value, out time);
if (parse)
{
this.lastTimestampSeen = time;
this.ManagerVertex.End = time;
}
this.ErrorCode = m.Groups[1].Value;
if (this.ErrorCode == "0")
{
this.ManagerVertex.SetState(ExecutedVertexInstance.VertexState.Successful);
}
else
{
this.ManagerVertex.SetState(ExecutedVertexInstance.VertexState.Failed);
}
}
}
else if (line.Contains("Timing Information"))
{
// Timing Information 4 1 Super__0[0] 128654556603428182 0.0000 0.0000 0.0000 0.0000 0.2500
Match m = timingInfoRegex.Match(line);
if (m.Success)
{
int vertex = Int32.Parse(m.Groups[1].Value);
int version = Int32.Parse(m.Groups[2].Value);
DateTime createtime = Utilities.Convert64time(ClusterConfiguration.GetClusterTimeZone(this.Summary), m.Groups[4].Value);
ExecutedVertexInstance vi = jobVertices.FindVertex(vertex, version);
if (vi == null)
return; // we do not keep track of vertices with duplicate scheduling, so these won't show up here
if (vi.State == ExecutedVertexInstance.VertexState.Started)
{
Console.WriteLine("Timing information while vertex is still running " + vi);
//throw new ClusterException("Timing information for vertex still running: " + vi);
}
DateTime last = vi.SetTiming(createtime, m.Groups[5].Value, m.Groups[6].Value, m.Groups[7].Value, m.Groups[8].Value, m.Groups[9].Value);
if (last > this.lastTimestampSeen)
this.lastTimestampSeen = last;
this.ManagerVertex.MarkVertexWasRunning(last);
try
{
if (vi.State == ExecutedVertexInstance.VertexState.Successful)
this.UsefulCPUTime += vi.RunningTime;
else if (vi.RunningTime > TimeSpan.Zero)
this.WastedCPUTime += vi.RunningTime;
}
catch (Exception ex)
{
Console.WriteLine("Time value exception: " + ex.Message);
}
}
else
throw new DryadException("Unmatched timing information line " + line);
}
else if (line.Contains("Process has failed"))
{
// Process has failed Vertex 11.0 (Select__6[1]) GUID {C1E35A88-F5AD-4A26-BE5F-46B6D515623F} machine sherwood-118 Exitcode 0 status The operation succeeded
// failedRegex = new Regex(@"Process has failed Vertex (\d+)\.(\d+) \((.+)\) GUID \{?([-A-F0-9]+)\}? machine (\S+) Exitcode (.*)",
Match m = failedRegex.Match(line);
if (m.Success)
{
lineTimeStamp = ParseLineTimestamp(line);
int vertex = Int32.Parse(m.Groups[1].Value);
int version = Int32.Parse(m.Groups[2].Value);
string exitcode = m.Groups[6].Value;
//string status = m.Groups[7].Value;
ExecutedVertexInstance vi = jobVertices.FindVertex(vertex, version);
if (vi != null)
{
vi.SetState(ExecutedVertexInstance.VertexState.Failed);
vi.ExitCode = exitcode;
if (lineTimeStamp != DateTime.MinValue)
vi.End = lineTimeStamp;
//vi.ErrorString = status;
}
}
}
else if (line.Contains("ABORTING:"))
{
this.AbortingMsg = line.Substring(10);
this.ManagerVertex.SetState(ExecutedVertexInstance.VertexState.Failed);
}
else if (line.Contains("Accurate read data"))
{
Match m = datareadRegex.Match(line);
if (m.Success)
{
this.TotalDataRead = long.Parse(m.Groups[1].Value);
this.LocalReadData = long.Parse(m.Groups[2].Value);
this.IntraPodDataRead = long.Parse(m.Groups[3].Value);
this.CrossPodDataRead = long.Parse(m.Groups[4].Value);
}
}
else if (line.Contains(""))
{
//some errors contains "Error returned from managed runtime invocation"
//which shows the error is from application code
Match m = Regex.Match(line, @"\(.*)\");
if (m.Success && lastFailedVertex != null)
{
lastFailedVertex.AddErrorString(System.Web.HttpUtility.HtmlDecode(m.Groups[1].Value));
}
}
else if (line.Contains("Canceling"))
{
// Canceling vertex 1461.0 (Merge__13[258]) due to dependent failure
Match m = cancelRegex.Match(line);
if (m.Success)
{
lineTimeStamp = ParseLineTimestamp(line);
int vertex = Int32.Parse(m.Groups[1].Value);
int version = Int32.Parse(m.Groups[2].Value);
string name = m.Groups[3].Value;
ExecutedVertexInstance vi = jobVertices.FindVertex(vertex, version);
if (vi != null)
{
if (vi.State == ExecutedVertexInstance.VertexState.Successful)
vi.SetState(ExecutedVertexInstance.VertexState.Invalidated);
else
vi.SetState(ExecutedVertexInstance.VertexState.Cancelled);
if (lineTimeStamp != DateTime.MinValue)
vi.End = lineTimeStamp;
}
else
{
// TODO: this should not be needed, but this is a workaround for a bug in the HPC L2H software
vi = new ExecutedVertexInstance(this, vertex, version, name, "", lineTimeStamp);
vi.SetState(ExecutedVertexInstance.VertexState.Cancelled);
this.jobVertices.Add(vi);
}
// Process wasn't even started, so there is nothing to cancel
}
}
else if (line.Contains("Application"))
{
//the job ends successfully
Regex endSuccessRegex = new Regex(@"Application completed successfully.");
//the job failed
Regex endFailRegex = new Regex(@"Application failed with error code (.*)");
Match m1 = endFailRegex.Match(line);
if (m1.Success)
{
this.ErrorCode = m1.Groups[1].Value;
this.ManagerStdoutIncomplete = false;
this.ManagerVertex.SetState(ExecutedVertexInstance.VertexState.Failed);
}
else
{
Match m2 = endSuccessRegex.Match(line);
if (m2.Success)
{
this.ManagerVertex.SetState(ExecutedVertexInstance.VertexState.Successful);
this.ManagerStdoutIncomplete = false;
}
}
}
else if (line.StartsWith("Input"))
{
// Input vertex %u (%s) had %u read failure%s\n
Match m = inputFailureRegex.Match(line);
if (m.Success)
{
this.AbortingMsg = line;
}
}
else if (line.Contains("Vertex"))
{
// terminationRegex = new Regex(@"Vertex (\d+)\.(\d+) \((.+)\) machine (\S+) guid \{?([-0-9A-F]+)\}? status (.*)"
Match m = terminationRegex.Match(line);
if (m.Success)
{
lineTimeStamp = ParseLineTimestamp(line);
int vertex = Int32.Parse(m.Groups[1].Value);
int version = Int32.Parse(m.Groups[2].Value);
ExecutedVertexInstance vi = this.jobVertices.FindVertex(vertex, version);
if (vi == null)
{
Trace.TraceInformation("Could not find vertex {0}.{1} line {2}", vertex, version, line);
}
else
{
bool failed = vi.SetTermination(m.Groups[6].Value, lineTimeStamp);
if (failed)
this.lastFailedVertex = vi;
}
}
}
if (lineTimeStamp != DateTime.MinValue)
this.lastTimestampSeen = lineTimeStamp;
}
private Dictionary cachedStages;
///
/// Get information about a particular stage.
///
/// Name of stage sought.
/// A description of the stage in question, or null if there are no vertices in that stage.
public DryadLinqJobStage GetStage(string stagename)
{
if (this.cachedStages.ContainsKey(stagename))
return this.cachedStages[stagename];
List stageVertices = this.jobVertices.GetStageVertices(stagename);
if (stageVertices == null)
stageVertices = new List();
DryadLinqJobStage retval = new DryadLinqJobStage(stagename, stageVertices);
this.cachedStages.Add(stagename, retval);
return retval;
}
private ISharedStreamReader cachedStdoutReader;
///
/// Remember how many lines were parsed, and skip them on a second invocation.
///
private int stdoutLinesParsed;
///
/// Parse the stdout.txt file from the job manager.
///
/// File to parse.
/// Communication manager.
/// True if the parsing succeeds.
private bool ParseStdout(IClusterResidentObject file, CommManager manager)
{
int currentLine = 0;
if (this.stdoutLinesParsed == 0)
// don't lose it if we are only parsing the tail.
this.lastTimestampSeen = this.Summary.Date; // start from the job submission timestamp
// we are reusing the stream
this.stdoutLinesParsed = 0;
try
{
long filesize = file.Size;
long readbytes = 0;
string message = "Scanning JM stdout " + file;
if (filesize >= 0)
message += string.Format("({0:N0} bytes)", filesize);
manager.Status(message, StatusKind.LongOp);
if (this.cachedStdoutReader == null)
this.cachedStdoutReader = file.GetStream(true);
if (this.cachedStdoutReader.Exception != null)
{
manager.Status("Exception while opening stdout " + this.cachedStdoutReader.Exception.Message, StatusKind.Error);
return false;
}
while (!this.cachedStdoutReader.EndOfStream)
{
string line = this.cachedStdoutReader.ReadLine();
readbytes += line.Length;
if (currentLine >= this.stdoutLinesParsed)
{
while (true)
{
manager.Token.ThrowIfCancellationRequested();
bool completeLine = true;
try
{
if (this.ClusterConfiguration is DfsClusterConfiguration)
completeLine = this.ParseStdoutLineNew(line);
else
{
this.ParseStdoutLine(line);
}
}
catch (Exception ex)
{
manager.Status(string.Format("Line {0}: Exception {1}", currentLine, ex.Message), StatusKind.Error);
Console.WriteLine("Line {0}: Exception {1}", currentLine, ex);
}
if (!completeLine)
{
if (this.cachedStdoutReader.EndOfStream)
// no exception, the log may be truncated
break;
string extraline = this.cachedStdoutReader.ReadLine();
line += "\n" + extraline;
currentLine++;
}
else break;
}
}
currentLine++;
if (currentLine % 100 == 0 && filesize > 0)
{
manager.Progress(Math.Min(100, (int)(100 * readbytes / filesize)));
}
}
this.stdoutLinesParsed = currentLine;
if (this.ManagerVertex != null)
{
if (this.ManagerVertex.End == DateTime.MinValue)
// approximation
this.ManagerVertex.End = this.lastTimestampSeen;
// we are done with this stream
if (this.ManagerVertex.State == ExecutedVertexInstance.VertexState.Failed ||
this.ManagerVertex.State == ExecutedVertexInstance.VertexState.Successful)
{
this.cachedStdoutReader.Close();
this.cachedStdoutReader = null; // will force reopening if refreshed
}
}
return true;
}
catch (Exception e)
{
manager.Status("Exception while reading stdout " + e.Message, StatusKind.Error);
Trace.TraceInformation(e.ToString());
return false;
}
}
///
/// How many log files were successfuly parsed.
///
private int logFilesParsed;
///
/// Parse the logs generated by the Job Manager and learn more information from them.
/// This function should be called after parsing the stdout.
/// This function is extremely slow; it may be invoked on a background thread.
/// Delegate used to report errors.
/// True on success.
/// Delegate used to report progress.
///
public bool ParseJMLogs(StatusReporter statusReporter, Action updateProgress)
{
IClusterResidentObject dir = this.ClusterConfiguration.ProcessLogDirectory(this.Summary.ManagerProcessGuid, this.ManagerVertex.VertexIsCompleted, this.Summary.Machine, this.Summary);
if (dir.Exception != null)
{
statusReporter("Exception finding logs in " + dir, StatusKind.Error);
return false;
}
string pattern = this.ClusterConfiguration.JMLogFilesPattern(false, this.Summary);
List logfiles = dir.GetFilesAndFolders(pattern).ToList();
long totalWork = 0;
foreach (var logfile in logfiles)
{
if (logfile.Size >= 0 && totalWork > 0)
totalWork += logfile.Size;
else
totalWork = -1;
}
bool success = true;
statusReporter(string.Format("Parsing {0} log files", logfiles.Count - this.logFilesParsed), StatusKind.OK);
int currentFile = 0;
bool invalidateCache = false;
foreach (var logfile in logfiles)
{
if (currentFile >= this.logFilesParsed)
{
invalidateCache = true;
success = this.ParseJMLogFile(logfile, statusReporter);
}
if (!success)
// stop at first failure
break;
currentFile++;
updateProgress(100 * currentFile / logfiles.Count);
}
updateProgress(100);
if (invalidateCache)
this.InvalidateCaches();
// reparse the last one again
this.logFilesParsed = currentFile - 1;
return success;
}
///
/// Parse a log file of the job manager and extract useful information.
///
/// Log file to parse.
/// Delegate used to parse errors.
/// True if parsing succeeds.
internal bool ParseJMLogFile(IClusterResidentObject logfile, StatusReporter statusReporter)
{
bool success = true;
ISharedStreamReader sr = logfile.GetStream(false);
if (sr.Exception != null)
{
statusReporter("Exception while opening file " + logfile + ":" + sr.Exception.Message, StatusKind.Error);
return false;
}
while (!sr.EndOfStream)
{
string line = sr.ReadLine();
if (!line.Contains("DryadProfiler")) continue;
DryadLogEntry le = new DryadLogEntry(line);
if (le.Subsystem != "DryadProfiler") continue;
if (!le.Message.EndsWith("channel status")) continue;
Dictionary kvp = Utilities.ParseCommaSeparatedKeyValuePair(le.ExtraInfo);
string verver = kvp["Vertex"];
string[] numbers = verver.Split('.');
int vertex = int.Parse(numbers[0]);
int version = int.Parse(numbers[1]);
ExecutedVertexInstance vi = this.jobVertices.FindVertex(vertex, version);
if (vi == null)
{
// We have overshot the information about the vertices parsed from stdout; stop parsing here
success = false;
break;
}
if (le.Message == "Input channel status")
{
// Vertex=69.0, Name=Merge__446[3], MachPod=sherwood-005:pod1, TotalRead=1470802, TotalReadFromMach=1470802, TotalReadCrossMach=1470802, TotalReadCrossPod=0
long info = long.Parse(kvp["TotalRead"]);
vi.DataRead = info;
}
else if (le.Message == "Output channel status")
{
// Vertex=69.0, Name=Merge__446[3], MachPod=sherwood-005:pod1, TotalWrite=1213418
long info = long.Parse(kvp["TotalWrite"]);
vi.DataWritten = info;
}
}
sr.Close();
return success;
}
///
/// The list of all stage names in the job.
///
/// An iterator over the stage names.
public IEnumerable GetStageNames()
{
return this.jobVertices.GetAllStageNames();
}
///
/// The list of all stages in the job.
///
/// An iterator over all stages.
public IEnumerable AllStages()
{
return this.GetStageNames().Select(this.GetStage);
}
///
/// Generate a layout suitable for drawing the plan.
///
/// A graph layout.
/// Delegate used to report errors.
// ReSharper disable once UnusedParameter.Global
public GraphLayout ComputePlanLayout(StatusReporter statusReporter)
{
IEnumerable stages = this.AllStages().OrderBy(s => s.StartTime).ToList();
if (!stages.Any())
// no layout to compute
return null;
DateTime jobStartTime = this.StartJMTime;
DateTime lastTime = stages.Max(s => s.EndTime);
if (lastTime == jobStartTime)
// avoid the degenerate case
lastTime = jobStartTime + new TimeSpan(0, 0, 1);
GraphLayout result = new GraphLayout((lastTime - jobStartTime).TotalSeconds, stages.Count()*2);
int currentStage = 0;
foreach (DryadLinqJobStage s in stages)
{
// node represents the schedule: horizontal position is starttime - endtime
DateTime endTime = s.EndTime;
DateTime startTime = s.StartTime;
if (endTime <= jobStartTime) // unknown time?
endTime = lastTime; // assume still running
if (startTime <= jobStartTime)
startTime = jobStartTime;
GraphLayout.GraphNode node = new GraphLayout.GraphNode(
(startTime - jobStartTime).TotalSeconds, currentStage*2, (endTime - startTime).TotalSeconds, 1);
node.Shape = GraphLayout.GraphNode.NodeShape.Box;
node.Label = s.Name;
node.Stage = s.Name;
result.Add(node);
currentStage++;
}
return result;
}
///
/// Find a vertex having specified the process id.
///
/// Process id.
/// The vertex with the specified guid.
public ExecutedVertexInstance FindVertex(DryadProcessIdentifier id)
{
return this.jobVertices.FindVertexByGuid(id.ToString());
}
///
/// Invalidate the cached information.
///
public void InvalidateCaches()
{
this.cachedStages.Clear();
}
}
///
/// Summary information about a job plan.
///
public abstract class DryadJobStaticPlan
{
///
/// Connection between two stages.
///
public class Connection
{
///
/// Arity of connection.
///
public enum ConnectionType
{
///
/// Point-to-point connection between two stages.
///
PointToPoint,
///
/// Cross-product connection between two stages.
///
AllToAll
};
///
/// Type of channel backing the connection.
///
public enum ChannelType
{
///
/// Persistent file.
///
DiskFile,
///
/// In-memory fifo.
///
Fifo,
///
/// TCP pipe.
///
TCP
}
///
/// Stage originating the connection.
///
public Stage From { internal set; get; }
///
/// Stage terminating the connection.
///
public Stage To { internal set; get; }
///
/// Type of connection.
///
public ConnectionType Arity { get; internal set; }
///
/// Type of channel backing the connection.
///
public ChannelType ChannelKind { get; internal set; }
///
/// Dynamic manager associated with the connection.
///
public string ConnectionManager { get; internal set; }
///
/// Color used to represent the connection.
///
/// A string describing the color.
public string Color()
{
switch (this.ChannelKind)
{
case ChannelType.DiskFile:
return "black";
case ChannelType.Fifo:
return "red";
case ChannelType.TCP:
return "yellow";
default:
throw new Exception("Unknown channel kind " + this.ChannelKind);
}
}
}
///
/// Per-node connection information (should be per-edge...)
///
protected struct ConnectionInformation
{
///
/// Type of connection.
///
public Connection.ConnectionType Arity { get; internal set; }
///
/// Type of channel backing the connection.
///
public Connection.ChannelType ChannelKind { get; internal set; }
///
/// Dynamic manager associated with the connection.
///
public string ConnectionManager { get; internal set; }
}
///
/// Information about a stage.
///
public class Stage
{
///
/// Stage name.
///
public string Name { get; internal set; }
///
/// Code executed in the stage.
///
public string [] Code { get; internal set; }
///
/// DryadLINQ operator implemented by the stage.
///
public string Operator { get; internal set; }
///
/// Number of vertices in stage.
///
public int Replication { get; internal set; }
///
/// Unique identifier.
///
public int Id { get; set; }
///
/// True if the stage is an input.
///
public bool IsInput { get; internal set; }
///
/// True if the stage is an output.
///
public bool IsOutput { get; internal set; }
///
/// True if the stage is a tee.
///
public bool IsTee { get; internal set; }
///
/// True if the stage is a concatenation.
///
public bool IsConcat { get; internal set; }
///
/// True if the stage is virtual (no real vertices synthesized).
///
public bool IsVirtual { get { return this.IsInput || this.IsOutput || this.IsTee || this.IsConcat; } }
///
/// Only defined for tables.
///
public string Uri { get; internal set; }
///
/// Only defined for tables.
///
public string UriType { get; internal set; }
}
///
/// Map from stage id to stage.
///
protected readonly Dictionary stages;
///
/// List of inter-stage connections in the plan.
///
protected readonly List connections;
///
/// Store here per-node connection information (map from node id).
///
protected readonly Dictionary perNodeConnectionInfo;
///
/// Stream containing the plan.
///
protected readonly ISharedStreamReader planStream;
///
/// Create a dryadlinq job plan starting from an xml plan file.
///
/// Stream containing the plan.
protected DryadJobStaticPlan(ISharedStreamReader plan)
{
if (plan.Exception != null)
// don't do this
throw plan.Exception;
this.planStream = plan;
this.stages = new Dictionary();
this.connections = new List();
this.perNodeConnectionInfo = new Dictionary();
this.fictitiousStages = 0;
}
///
/// Parse the query plan: cluster-specific.
///
/// Communication manager.
protected abstract void ParseQueryPlan(CommManager manager);
int fictitiousStages;
///
/// Create a fictitious node for the job manager.
///
public void AddFictitiousStages()
{
this.fictitiousStages = 2;
Stage stage = new Stage();
stage.Id = -1;
stage.Replication = 1;
stage.Operator = "Job Manager";
stage.Name = "JobManager";
stage.Code = null;
this.stages.Add(stage.Id, stage);
stage = new Stage();
stage.Id = -2;
stage.Replication = 1;
stage.Operator = "All vertices";
stage.Name = "All vertices";
stage.Code = null;
this.stages.Add(stage.Id, stage);
}
///
/// Find the stage given the stage id as a string.
///
/// Stage id.
/// A handle to the stage with the specified static Id.
public Stage GetStageByStaticId(string stageId)
{
int id = int.Parse(stageId);
return this.stages[id];
}
///
/// Find the stage given the stage name.
///
/// Name of stage to return.
/// The stage with the given name or null.
public Stage GetStageByName(string name)
{
foreach (Stage s in this.stages.Values)
{
if (s.Name.Equals(name))
return s;
}
return null;
}
///
/// The list of all stages in the plan.
///
/// An iterator over the list of stages.
public IEnumerable GetAllStages()
{
return this.stages.Values;
}
///
/// The list of all connections in the plan.
///
/// An iterator over a list of connections.
public IEnumerable GetAllConnections()
{
return this.connections;
}
///
/// Number of stages in static plan.
///
public int StageCount
{
get
{
return this.stages.Count - this.fictitiousStages;
}
}
///
/// Get all connections adjacent to a stage. Warning: this method is inefficient.
///
/// If true return the stage inputs, else return the stage outputs.
/// Stage we are looking for.
/// A list of connections.
public IEnumerable GetStageConnections(Stage stage, bool inputs)
{
foreach (Connection c in this.GetAllConnections())
{
if (inputs && c.To == stage)
yield return c;
else if (!inputs && c.From == stage)
yield return c;
}
}
///
/// Factory: create the plan for a given job.
///
/// Job to create plan for.
/// The plan or null.
/// Communication manager.
public static DryadJobStaticPlan CreatePlan(DryadLinqJobInfo dryadLinqJobInfo, CommManager manager)
{
manager.Status("Trying to build static plan", StatusKind.LongOp);
ClusterConfiguration config = dryadLinqJobInfo.ClusterConfiguration;
IClusterResidentObject file = config.JobQueryPlan(dryadLinqJobInfo.Summary);
if (config is CacheClusterConfiguration)
config = (config as CacheClusterConfiguration).ActualConfig(dryadLinqJobInfo.Summary);
if (file.Exception == null)
{
DryadJobStaticPlan retval;
{
retval = new DryadLinqJobStaticPlan(config, file.GetStream(false));
}
retval.ParseQueryPlan(manager);
return retval;
}
else
{
manager.Status("Exception while looking for plan " + file.Exception.Message, StatusKind.Error);
return null;
}
}
}
///
/// DryadLINQ-specific job information.
///
public class DryadLinqJobStaticPlan : DryadJobStaticPlan
{
///
/// Create a DryadLinqJobStaticPlan.
///
/// Cluster configuration.
/// Stream containing the file.
// ReSharper disable once UnusedParameter.Local
public DryadLinqJobStaticPlan(ClusterConfiguration config, ISharedStreamReader planFile)
: base(planFile)
{
}
///
/// Parse an XML query plan and represent that information.
///
/// Communicaton manager.
protected override void ParseQueryPlan(CommManager manager)
{
string planString = this.planStream.ReadToEnd(manager.Token);
XDocument plan = XDocument.Parse(planString);
// ReSharper disable PossibleNullReferenceException
XElement query = plan.Root.Elements().First(e => e.Name == "QueryPlan");
IEnumerable vertices = query.Elements().Where(e => e.Name == "Vertex");
foreach (XElement v in vertices)
{
Stage stage = new Stage();
stage.Id = int.Parse(v.Element("UniqueId").Value);
stage.Replication = int.Parse(v.Element("Partitions").Value);
stage.Operator = v.Element("Type").Value;
stage.Name = v.Element("Name").Value;
{
string code = v.Element("Explain").Value;
string[] lines = code.Split('\n');
if (lines.Length > 1)
{
stage.Code = lines.Skip(1). // drop stage name
Select(l => l.Trim()). // remove leading tab
ToArray();
}
else
{
stage.Code = new string[] { };
}
}
this.stages.Add(stage.Id, stage);
{
// These should be connection attributes, not stage attributes.
string cht = v.Element("ChannelType").Value;
string connectionManager = v.Element("DynamicManager").Element("Type").Value;
string connection = v.Element("ConnectionOperator").Value;
ConnectionInformation info = new ConnectionInformation();
info.ConnectionManager = connectionManager;
switch (connection)
{
case "Pointwise":
info.Arity = Connection.ConnectionType.PointToPoint;
break;
case "CrossProduct":
info.Arity = Connection.ConnectionType.AllToAll;
break;
default:
throw new DryadException("Don't know about connection of type " + connection);
}
switch (cht)
{
case "DiskFile":
info.ChannelKind = Connection.ChannelType.DiskFile;
break;
case "TCPPipe":
info.ChannelKind = Connection.ChannelType.TCP;
break;
case "MemoryFIFO":
info.ChannelKind = Connection.ChannelType.Fifo;
break;
default:
throw new DryadException("Don't know about channel of type " + cht);
}
this.perNodeConnectionInfo.Add(stage.Id, info);
}
switch (stage.Operator)
{
case "InputTable":
stage.IsInput = true;
stage.UriType = v.Element("StorageSet").Element("Type").Value;
stage.Uri = v.Element("StorageSet").Element("SourceURI").Value;
break;
case "OutputTable":
stage.IsOutput = true;
stage.UriType = v.Element("StorageSet").Element("Type").Value;
stage.Uri = v.Element("StorageSet").Element("SinkURI").Value;
break;
case "Tee":
stage.IsTee = true;
break;
case "Concat":
stage.IsConcat = true;
break;
}
if (!v.Elements("Children").Any())
continue;
bool first = true;
var children = v.Element("Children").Elements().Where(e => e.Name == "Child").ToList();
foreach (XElement child in children)
{
// This code parallels the graphbuilder.cpp for XmlExecHost
Connection conn = new Connection();
int fromid = int.Parse(child.Element("UniqueId").Value);
ConnectionInformation fromConnectionInformation = this.perNodeConnectionInfo[fromid];
Stage from = this.stages[fromid];
conn.From = from;
conn.To = stage;
conn.ChannelKind = fromConnectionInformation.ChannelKind;
ConnectionInformation thisConnectionInformation = this.perNodeConnectionInfo[stage.Id];
switch (thisConnectionInformation.ConnectionManager)
{
case "FullAggregator":
case "HashDistributor":
case "RangeDistributor":
// Ignore except first child
if (first)
{
first = false;
conn.ConnectionManager = thisConnectionInformation.ConnectionManager;
}
else
{
conn.ConnectionManager = "";
}
break;
case "PartialAggregator":
case "Broadcast":
// All children have the same connection manager
conn.ConnectionManager = thisConnectionInformation.ConnectionManager;
break;
case "Splitter":
// The connection manager depends on the number of children
if (first)
{
first = false;
if (children.Count() == 1)
conn.ConnectionManager = thisConnectionInformation.ConnectionManager;
else
conn.ConnectionManager = "SemiSplitter";
}
else
{
conn.ConnectionManager = "";
}
break;
case "None":
case "":
break;
}
conn.Arity = fromConnectionInformation.Arity;
this.connections.Add(conn);
}
}
// ReSharper restore PossibleNullReferenceException
}
}
///
/// Scope-specific job information.
///
public class ScopeJobStaticPlan : DryadJobStaticPlan
{
private readonly ISharedStreamReader vertexDef;
///
/// Create a ScopeJobStaticPlan.
///
/// Cluster configuration.
/// Stream containing the file.
/// File containing the vertex definition (ScopeVertexDef.xml).
// ReSharper disable once UnusedParameter.Local
public ScopeJobStaticPlan(ClusterConfiguration config, ISharedStreamReader planFile, ISharedStreamReader vertexDef)
: base(planFile)
{
this.vertexDef = vertexDef;
}
///
/// Simplify the name of an IO.
///
/// Name to simplify.
/// The simplified name.
private static string NormalizeIOName(string ioname)
{
// drop everything between square braces
Regex re = new Regex(@"\[.*\]");
string result = re.Replace(ioname, "");
return result;
}
///
/// Parse the Algebra file.
///
/// Communication manager.
private void ParseAlgebra(CommManager manager)
{
// TODO: this parser is not really complete, as I don't understand the semantics of all xml elements.
Dictionary outToStage = new Dictionary(); // map an output to a stage name. Assume that ios have unique names.
Dictionary> inputs = new Dictionary>();
// ...
string planString = this.planStream.ReadToEnd(manager.Token);
XDocument plan = XDocument.Parse(planString);
// ReSharper disable PossibleNullReferenceException
XElement graph = plan.Root.Element("graph"); // graph node, children are stages
// add stages
int id = 0;
foreach (XElement child in graph.Elements())
{
if (child.Name == "process")
{
string stageName = child.Attribute("id").Value;
Stage stage = new Stage();
stage.Name = stageName;
stage.Replication = 1;
stage.Code = new string[] { child.Attribute("command").Value };
stage.Id = id++;
this.stages.Add(stage.Id, stage);
List stageInputs = new List();
inputs.Add(stageName, stageInputs);
foreach (var io in child.Elements())
{
if (io.Name != "input" && io.Name != "output")
continue;
string cosmosStream = io.Attribute("cosmosStream") != null ? io.Attribute("cosmosStream").Value : null;
string structuredStream = io.Attribute("structuredStream") != null ? io.Attribute("structuredStream").Value : null;
string ioid = NormalizeIOName(io.Attribute("id").Value);
string streamname = cosmosStream ?? structuredStream;
if (io.Name == "input")
{
stageInputs.Add(ioid);
if (streamname != null)
{
Stage alreadyDone = this.GetStageByName(streamname);
if (alreadyDone == null)
{
Stage input = new Stage();
input.Id = id++;
input.IsInput = true;
input.Name = streamname;
input.Code = new string[0];
input.Replication = 1;
input.UriType = "cosmos";
input.Uri = "cosmos://" + streamname;
this.stages.Add(input.Id, input);
}
outToStage.Add(ioid, streamname);
}
}
else if (io.Name == "output")
{
outToStage.Add(ioid, stageName);
if (streamname != null)
{
Stage alreadyDone = this.GetStageByName(streamname);
if (alreadyDone == null)
{
Stage output = new Stage();
output.IsOutput = true;
output.Replication = 1;
output.Name = streamname;
output.Code = new string[0];
output.Id = id++;
output.UriType = "cosmos";
output.Uri = "cosmos://" + streamname;
this.stages.Add(output.Id, output);
inputs.Add(streamname, new List { ioid });
}
}
}
}
}
else if (child.Name == "dataset")
{
string stageName = child.Attribute("id").Value;
stageName = NormalizeIOName(stageName);
Stage stage = new Stage();
stage.Name = stageName;
stage.Replication = 1;
stage.IsConcat = true;
stage.Code = new string[0];
stage.Id = id++;
this.stages.Add(stage.Id, stage);
List stageInputs = new List();
inputs.Add(stageName, stageInputs);
foreach (var io in child.Elements())
{
if (io.Name == "element")
{
string ioid = NormalizeIOName(io.Attribute("id").Value);
stageInputs.Add(ioid);
}
}
// implicit output with stage name
outToStage.Add(stage.Name, stage.Name);
}
else if (child.Name == "inputStreamList")
{
Stage input = new Stage();
input.Id = id++;
input.IsInput = true;
input.Name = child.Attribute("id").Value;
input.Replication = 1;
input.UriType = "cosmos";
input.Uri = "inputStreamList";
input.Code = child.Elements().Select(e => e.Attribute("cosmosPath").Value).ToArray();
this.stages.Add(input.Id, input);
outToStage.Add(input.Name, input.Name);
}
else if (child.Name == "outputStreamList")
{
Stage output = new Stage();
output.Id = id++;
output.IsOutput = true;
output.Name = child.Attribute("id").Value;
output.Replication = 1;
output.UriType = "cosmos";
output.Uri = "outputStreamList";
output.Code = child.Elements().Select(e => e.Attribute("cosmosPath").Value).ToArray();
this.stages.Add(output.Id, output);
inputs.Add(output.Name, new List { output.Name });
}
}
// scan the dictionaries and build the edges
foreach (string stage in inputs.Keys)
{
foreach (string inputName in inputs[stage])
{
string iName = inputName;
if (outToStage.ContainsKey(iName))
{
string sourceStage = outToStage[iName];
Connection conn = new Connection();
conn.From = this.GetStageByName(sourceStage);
conn.To = this.GetStageByName(stage);
conn.ChannelKind = Connection.ChannelType.DiskFile;
conn.ConnectionManager = "";
conn.Arity = Connection.ConnectionType.PointToPoint;
this.connections.Add(conn);
}
else
{
Trace.TraceInformation("Could not find stage for input {0}", iName);
}
}
}
// ReSharper restore PossibleNullReferenceException
}
///
/// Parse the vertex definition file.
///
private void ParseVertexDef(CommManager manager)
{
if (this.vertexDef.Exception != null)
return;
//
string planString = this.vertexDef.ReadToEnd(manager.Token);
XDocument vxDef = XDocument.Parse(planString);
XElement vertices = vxDef.Root;
// ReSharper disable PossibleNullReferenceException
foreach (XElement vertex in vertices.Elements())
{
List code = new List();
string id = vertex.Attribute("id").Value;
Stage stage = this.GetStageByName(id);
if (stage == null)
{
Trace.TraceInformation("Could not find stage {0}", id);
continue;
}
foreach (XElement op in vertex.Elements("operator"))
{
string className = op.Attribute("className").Value;
if (op.Attribute("args") != null)
className += " " + op.Attribute("args").Value;
code.Add(className);
foreach (XElement input in op.Elements("input"))
{
XAttribute indexatt = input.Attribute("inputIndex");
string index = indexatt != null ? indexatt.Value : " ";
string schema = input.Attribute("schema").Value;
code.Add("\tI" + index + ": " + schema);
}
foreach (XElement output in op.Elements("output"))
{
XAttribute indexatt = output.Attribute("outputIndex");
string index = indexatt != null ? indexatt.Value : " ";
string schema = output.Attribute("schema").Value;
code.Add("\tO" + index + ": " + schema);
}
}
stage.Code = code.ToArray();
// ReSharper restore PossibleNullReferenceException
}
}
///
/// Parse the query plan for a Scope job.
///
/// Communication manager.
protected override void ParseQueryPlan(CommManager manager)
{
this.ParseAlgebra(manager);
this.ParseVertexDef(manager);
}
}
///
/// A collection describing all the vertices encountered so far in a job.
///
internal class JobVertices
{
///
/// Vertices indexed by numeric vertex id. A vertex can have multiple executions.
///
private readonly Dictionary> vertices;
///
/// Total number of vertices in collection.
///
private int count;
private readonly Dictionary> jobStages;
private readonly Dictionary vertexByGuid;
///
/// Create a collection representing the job vertices.
///
public JobVertices()
{
this.count = 0;
this.vertexByGuid = new Dictionary();
this.vertices = new Dictionary>();
this.jobStages = new Dictionary>();
this.jobStages.Add("All vertices", new List()); // this list holds all vertices
}
///
/// Number of stages in job.
///
public int ExecutedStageCount { get { return this.jobStages.Count - 2; } } // subtract GM and all vertices
///
/// The list of vertices in this stage.
///
/// Name of stage to return.
/// The vertices in the stage, or null if the stage is not found (e.g., it is a table).
public List GetStageVertices(string stagename)
{
if (this.jobStages.ContainsKey(stagename))
return this.jobStages[stagename];
return null;
}
///
/// The list of all stage names.
///
/// An iterator over the stage names.
public IEnumerable GetAllStageNames()
{
return this.jobStages.Keys;
}
///
/// Number of vertices in job.
///
public int Count
{
get { return this.count; }
}
///
/// Add a new vertex to this job.
///
/// Vertex description to add.
/// Stage name that the vertex belongs to.
public void Add(ExecutedVertexInstance vi)
{
int id = vi.Number;
List l;
if (vertices.ContainsKey(id))
l = vertices[id];
else
{
l = new List();
vertices.Add(id, l);
}
l.Add(vi);
this.count++;
string stage = vi.StageName;
List members;
if (this.jobStages.ContainsKey(stage))
members = this.jobStages[stage];
else
{
members = new List();
this.jobStages.Add(stage, members);
}
members.Add(vi);
if (!this.vertexByGuid.ContainsKey(vi.UniqueID))
this.vertexByGuid.Add(vi.UniqueID, vi);
this.jobStages["All vertices"].Add(vi);
}
///
/// Find the information associated with a given vertex and version.
///
/// Vertex number in job.
/// Vertex version.
/// Matching VertexInfo or null if no vertex exists.
public ExecutedVertexInstance FindVertex(int id, int version)
{
if (!this.vertices.ContainsKey(id))
return null;
List l = this.vertices[id];
foreach (ExecutedVertexInstance i in l)
if (i.Version == version)
return i;
return null;
}
///
/// Find a vertex from its guid. Currently very slow.
///
/// Vertex guid.
/// The vertex with the correct guid, or null.
public ExecutedVertexInstance FindVertexByGuid(string guid)
{
if (this.vertexByGuid.ContainsKey(guid))
return this.vertexByGuid[guid];
return null;
}
///
/// The set of all vertices in the job.
///
/// An enumerator over VertexInfo objects.
public IEnumerable AllVertices()
{
foreach (int key in this.vertices.Keys)
{
List l = this.vertices[key];
foreach (ExecutedVertexInstance vi in l)
yield return vi;
}
}
///
/// A vertex has received a new guid.
///
/// Executed vertex instance.
/// New guid.
internal void Remap(ExecutedVertexInstance vi, string newGuid)
{
if (!this.vertexByGuid.ContainsKey(newGuid))
this.vertexByGuid.Add(newGuid, vi);
}
}
///
/// Brief description of a channel endpoint.
///
public class ChannelEndpointDescription
{
///
/// Is the endpoint of this channel an input?
///
public bool IsInput { get; protected set; }
///
/// The input/output number.
///
public int Number { get; protected set; }
///
/// Type of URI.
///
public string UriType { get; protected set; }
///
/// Part of URI without the type to the channel contents.
///
public string LocalPath { get; protected set; }
///
/// The actual data read/written so far (0 if it cannot be determined, e.g. FIFO, -1 if the channel data cannot be retrieved).
///
public long Size { get; set; }
///
/// How much of the channel was
///
public long TotalSize { get; set; }
///
/// String representation of the endpoint.
///
public override string ToString()
{
string uritype = this.UriType;
string localpath = this.LocalPath;
if (this.TotalSize == 0)
return string.Format("{0,4} {1,20:N0} {2}://{3}", this.Number, this.Size, uritype, localpath);
else
return string.Format("{0,4} {1,20:N0}/{2,20:N0} {3}://{4}", this.Number, this.Size, this.TotalSize, uritype, localpath);
}
///
/// Create a channel endpoint description
///
/// True if the channel endpoint is an input.
/// The input/output number.
/// URI to channel contents.
/// Relative uris will need this prefix appended.
/// If true the channel size is not computed (this is much faster).
/// Delegate used to report errors.
// ReSharper disable once UnusedParameter.Local
public ChannelEndpointDescription(bool isInput, int number, string uri, string uripathprefix, bool fast, StatusReporter reporter)
{
this.IsInput = isInput;
this.Number = number;
int sepindex = uri.IndexOf("://");
if (sepindex < 0)
throw new DryadException("Channel URI " + uri + " does not contain separator ://");
this.UriType = uri.Substring(0, sepindex);
// some HPC URIs use the compression scheme as an "option" (not really defined for file:// uris, but...)
// strip it here
int option = uri.IndexOf('?');
if (option >= 0)
{
uri = uri.Substring(0, option);
}
this.LocalPath = uri.Substring(sepindex + 3).Trim();
if (uripathprefix != null) {
// Unfortunately the uri is absolute, although it should be relative sometimes. We fix this here.
this.LocalPath = Path.Combine(uripathprefix, this.LocalPath);
}
if (fast)
this.Size = 0;
else
{
switch (this.UriType)
{
case "file":
if (File.Exists(this.LocalPath))
{
this.Size = new FileInfo(this.LocalPath).Length;
}
else
{
this.Size = -1;
}
break;
default:
this.Size = 0;
break;
}
}
}
///
/// Create a channel endpoint description from Scope information.
///
/// True if the channel endpoint is an input.
/// The input/output number.
/// URI to channel contents.
/// Size of channel if known.
public ChannelEndpointDescription(bool isInput, int number, string uri, long size)
{
this.IsInput = isInput;
this.Number = number;
int sepindex = uri.IndexOf("://");
if (sepindex < 0)
throw new ClusterException("Channel URI " + uri + " does not contain separator ://");
this.UriType = uri.Substring(0, sepindex);
this.LocalPath = uri.Substring(sepindex + 3);
this.TotalSize = size;
}
}
///
/// An instance of an executed vertex (each vertex may execute multiple times).
///
public class ExecutedVertexInstance
{
///
/// State that the vertex is in.
///
public enum VertexState
{
///
/// Scheduled but never executed (duplicate scheduling abandoned).
///
Abandoned,
///
/// Vertex has been created but not started.
///
Created,
///
/// Vertex has started running, but has not yet completed.
///
Started,
///
/// Vertex has been cancelled.
///
Cancelled,
///
/// Vertex has been cancelled after completing successfully.
///
Invalidated,
///
/// Vertex has completed successfully.
///
Successful,
///
/// Vertex has failed.
///
Failed,
///
/// Vertex has been cancelled by the scheduler.
///
Revoked,
///
/// Vertex state is not yet known.
///
Unknown,
};
///
/// State the vertex is in.
///
public VertexState State { get; protected set; }
///
/// The error message related to this vertex.
///
string error;
///
/// Directory where the vertex executed.
///
public IClusterResidentObject WorkDirectory { get; protected set; }
///
/// Amount of data read by vertex (may be unknown, then it's -1).
///
public long DataRead { get; internal set; }
///
/// Amount of data written by vertex (may be unknown, then it's -1).
///
public long DataWritten { get; internal set; }
///
/// On some platforms this is a guid, but not always. At least the identifier is unique per job.
///
public string UniqueID { get; protected set; }
///
/// String representation of the cluster configuration type.
///
private readonly string ClusterConfigType;
// 0x830a0017
static readonly Regex errorCodeRegex = new Regex(@"Vertex Had Errors, \(.*)\
if (m.Success)
this.ExitCode = m.Groups[1].Value;
else
this.ExitCode = status;
this.SetState(VertexState.Failed);
}
else if (status.ToLower().StartsWith("vertex received termination"))
{
this.SetState(VertexState.Cancelled);
}
else
{
this.SetState(VertexState.Failed);
}
if (time != DateTime.MinValue)
this.End = time;
return this.State == VertexState.Failed;
}
///
/// Additional information about the vertex error.
///
/// Information about error.
internal void AddErrorString(string s)
{
if (this.error == null)
this.error = "";
this.error += "\n" + s;
}
///
/// Exit code of vertex.
///
public string ExitCode { get; set; }
///
/// Return the error string.
///
/// The error string as reported to the JM.
public string ErrorString
{
get
{
return this.error;
}
set
{
if (string.IsNullOrEmpty(this.error))
this.error = value;
else
this.error += " " + value;
}
}
///
/// Parse a part of the 'originalInfo.txt' file to discover a set of channel endpoints.
///
/// Stream reader which contains the channel information.
/// The list of channels, or null on failure.
/// If the channel is an output, prefix the path with this; this is null for inputs.
/// If true, do not return anything (still useful to advance the stream reader).
/// If true the channel sizes are not discovered; this is much faster, since no remote machines are queried for files.
/// Communication manager.
private Dictionary DiscoverOriginalInfoChannels(ISharedStreamReader sr, string uriprefix, bool skip, bool fast, CommManager manager)
{
bool isInput = uriprefix == null;
string countline = sr.ReadLine();
if (countline == null)
return null;
int channelCount;
int spaceIndex = countline.IndexOf(' ');
if (spaceIndex > 0)
countline = countline.Substring(0, spaceIndex);
bool success = int.TryParse(countline, out channelCount);
if (!success)
return null;
var channels = new Dictionary(channelCount);
for (int i = 0; i < channelCount; i++)
{
string channel = sr.ReadLine();
if (channel == null)
{
manager.Progress(100);
return null;
}
if (!skip)
{
ChannelEndpointDescription desc = new ChannelEndpointDescription(isInput, i, channel, uriprefix, fast, manager.Status);
channels.Add(i, desc);
manager.Progress(i * 100 / channelCount);
}
}
manager.Progress(100);
if (skip)
return null;
return channels;
}
///
/// Discover the vertex channels in a vertex-*-rerun file.
///
/// True if the discovery was successful.
/// If true discover the inputs.
/// If true discover the outputs.
/// If true do not discover the channel sizes (much faster).
/// Communication manager.
public bool DiscoverOriginalInfoChannels(bool inputs, bool outputs, bool fast, CommManager manager)
{
string filename = string.Format("vertex-{0}-{1}-rerun-originalInfo.txt", this.Number, this.Version);
bool success = true;
// The format of this file is fixed.
if (this.InputChannels != null)
// skip discovery
inputs = false;
ISharedStreamReader sr = this.WorkDirectory.GetFile(filename).GetStream(false);
var channels = this.DiscoverOriginalInfoChannels(sr, null, !inputs, fast, manager);
if (channels == null)
{
if (inputs)
success = false;
}
else
this.InputChannels = channels;
if (this.OutputChannels != null)
// skip discovery
outputs = false;
channels = this.DiscoverOriginalInfoChannels(sr, this.WorkDirectory.ToString(), !outputs, fast, manager);
if (channels == null)
{
if (outputs)
success = false;
}
else
this.OutputChannels = channels;
sr.Close();
return success;
}
///
/// Discover the vertex channels in a Scope-generated vcmdStart*xml file.
///
/// True if the discovery was successful.
/// If true discover the inputs.
/// If true discover the outputs.
/// If true do not discover the channel sizes (much faster).
/// Communication manager.
// ReSharper disable UnusedParameter.Global
public bool DiscoverScopeChannels(bool inputs, bool outputs, bool fast, CommManager manager)
// ReSharper restore UnusedParameter.Global
{
// find the xml file
var files = this.WorkDirectory.GetFilesAndFolders("vcmdStart*.xml").ToList();
if (files.Count != 1)
{
manager.Status("Cannot locate vcmdStart*.xml file", StatusKind.Error);
return false;
}
ISharedStreamReader sr = files.First().GetStream(false);
if (sr.Exception != null)
{
manager.Status("Error reading vcmdStart*.xml file" + sr.Exception.Message, StatusKind.Error);
return false;
}
// ReSharper disable PossibleNullReferenceException
XDocument plan = XDocument.Parse(sr.ReadToEnd(manager.Token));
if (inputs && this.InputChannels == null)
{
var channels = new Dictionary();
IEnumerable inputsData = plan.Root.Element("inputs").Elements();
int chno = 0;
foreach (var e in inputsData)
{
string chpath = e.Attribute("path").Value;
long size = long.Parse(e.Attribute("length").Value);
ChannelEndpointDescription desc = new ChannelEndpointDescription(true, chno, chpath, size);
channels.Add(chno, desc);
chno++;
}
this.InputChannels = channels;
}
if (outputs && this.OutputChannels == null)
{
var channels = new Dictionary();
IEnumerable inputsData = plan.Root.Element("outputs").Elements();
int chno = 0;
foreach (var e in inputsData)
{
string chpath = e.Attribute("path").Value;
ChannelEndpointDescription desc = new ChannelEndpointDescription(true, chno, chpath, -1);
channels.Add(chno, desc);
chno ++;
}
this.OutputChannels = channels;
}
// ReSharper restore PossibleNullReferenceException
sr.Close();
return true;
}
///
/// Discover the input and output channels of the vertex. Populates the 'InputChannels' and 'OutputChannel' lists.
///
/// True if the discovery was successful.
/// If true discover the inputs.
/// If true discover the outputs.
/// If true do not discover the channel sizes (much faster).
/// Communication manager.
public bool DiscoverChannels(bool inputs, bool outputs, bool fast, CommManager manager)
{
// check if the result is already cached
if ((this.InputChannels != null || !inputs) &&
(this.OutputChannels != null || !outputs))
return true;
if (this.WorkDirectory == null || this.WorkDirectory.Exception != null)
return false;
// The format of this file is fixed.
if (!this.channelsAreFinal)
{
// invalidate cache
this.InputChannels = null;
this.OutputChannels = null;
}
bool result;
IClusterResidentObject wd = this.WorkDirectory;
if (wd is FolderInCachedCluster)
{
wd = (wd as FolderInCachedCluster).OriginalFolder;
}
if (wd is UNCFile)
{
result = this.DiscoverOriginalInfoChannels(inputs, outputs, fast, manager);
}
else
{
result = false;
}
if (this.VertexIsCompleted)
this.channelsAreFinal = true;
return result;
}
///
/// If true the vertex is no longer running; some of its information can be cached.
///
public bool VertexIsCompleted
{
get
{
switch (this.State)
{
case VertexState.Cancelled:
case VertexState.Abandoned:
case VertexState.Failed:
case VertexState.Successful:
case VertexState.Invalidated:
case VertexState.Revoked:
return true;
default:
return false;
}
}
}
///
/// String representation of the executed vertex instance.
///
/// A string briefly describing the executed vertex instance.
public override string ToString()
{
return string.Format("Vertex {0}.{1} ({2}) status {3}", this.Number, this.Version, this.Name, this.State);
}
///
/// Replace the information in an executed vertex instance when a new vertex is created.
/// This can only happen in some cases when cancelled vertex numbers are reused.
///
/// New vertex name.
/// New vertex guid.
// ReSharper disable once UnusedParameter.Global
internal void Update(string name, string guid)
{
if (this.State != VertexState.Cancelled && this.State != VertexState.Abandoned)
throw new DryadException("Updating a non-cancelled/abandoned vertex");
if (this.Name != name)
throw new DryadException("Vertex changed name");
this.UniqueID = guid;
this.SetState(VertexState.Created);
// the stdoutfile is expected to change, so I don't invalidate the cache
}
///
/// Set the vertex state.
///
/// New vertex state.
public void SetState(VertexState state)
{
this.State = state;
bool cache = this.VertexIsCompleted;
if (this.StdoutFile != null)
this.StdoutFile.ShouldCacheLocally = cache;
if (this.LogDirectory != null)
this.LogDirectory.ShouldCacheLocally = cache;
if (this.WorkDirectory != null)
this.WorkDirectory.ShouldCacheLocally = cache;
}
///
/// A CSV header matching the AsCSV data.
///
/// A string describing the CSV header for a vertex executed instance.
public static string CSV_Header()
{
return "Name,Stage,Start,End,Running time,State,Data Read,Data Written,Version,Machine,Process ID";
}
///
/// CSV representation of the information about an executed vertex instance.
///
/// The information in CSV format, matching the CSV_Header.
public string AsCSV()
{
string start = this.Start != DateTime.MinValue ? this.Start.ToString("s") : "";
string end = this.End != DateTime.MinValue ? this.End.ToString("s") : "";
string running = this.RunningTime > TimeSpan.Zero ? this.RunningTime.ToString() : "";
return string.Format("{0},{1},{2},{3},{4},{5},{6},{7},{8},{9},{10}",
this.Name, this.StageName, start, end, running, this.State, this.DataRead, this.DataWritten, this.Version, this.Machine, this.ProcessIdentifier);
}
}
}