Dryad/LinqToDryad/DryadLinqQueryExplain.cs

786 lines
30 KiB
C#

/*
Copyright (c) Microsoft Corporation
All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
compliance with the License. You may obtain a copy of the License
at http://www.apache.org/licenses/LICENSE-2.0
THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER
EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR CONDITIONS OF
TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR NON-INFRINGEMENT.
See the Apache Version 2.0 License for specific language governing permissions and
limitations under the License.
*/
using System;
using System.Collections;
using System.Collections.Generic;
using System.Text;
using System.IO;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;
using System.Xml;
using System.Diagnostics;
using System.Xml.Linq;
using System.Drawing;
using System.Drawing.Drawing2D;
using Microsoft.Research.DryadLinq.Internal;
namespace Microsoft.Research.DryadLinq
{
/// <summary>
/// This class explains in detail the generated plan.
/// </summary>
internal sealed class DryadLinqQueryExplain
{
/// <summary>
/// Visit the set of nodes in the query plan and build an explanation of the plan.
/// </summary>
/// <param name="plan">Return plan description here.</param>
/// <param name="nodes">Nodes to explain.</param>
internal void CodeShowVisit(StringBuilder plan, DLinqQueryNode[] nodes)
{
HashSet<DLinqQueryNode> visited = new HashSet<DLinqQueryNode>();
foreach (DLinqQueryNode n in nodes)
{
CodeShowVisit(plan, n, visited);
}
}
/// <summary>
/// Helper for CodeShowVisit: do not revisit a node twice.
/// </summary>
/// <param name="plan">Return plan here.</param>
/// <param name="n">Node to explain.</param>
/// <param name="visited">Set of nodes already visited.</param>
private void CodeShowVisit(StringBuilder plan, DLinqQueryNode n, HashSet<DLinqQueryNode> visited)
{
if (visited.Contains(n)) return;
visited.Add(n);
foreach (DLinqQueryNode c in n.Children)
{
CodeShowVisit(plan, c, visited);
}
ExplainNode(plan, n);
}
/// <summary>
/// Explain one query node.
/// </summary>
/// <param name="plan">Return plan here.</param>
/// <param name="n">Node to explain.</param>
internal static void ExplainNode(StringBuilder plan, DLinqQueryNode n)
{
if (n is DLinqTeeNode || n is DLinqOutputNode ||
n is DLinqDoWhileNode || n is DLinqDummyNode)
{
return;
}
else if (n is DLinqInputNode)
{
plan.AppendLine("Input:");
plan.Append("\t");
n.BuildString(plan);
plan.AppendLine();
return;
}
plan.Append(n.m_vertexEntryMethod);
plan.AppendLine(":");
HashSet<DLinqQueryNode> allchildren = new HashSet<DLinqQueryNode>();
if (n is DLinqSuperNode)
{
DLinqSuperNode sn = n as DLinqSuperNode;
List<DLinqQueryNode> tovisit = new List<DLinqQueryNode>();
tovisit.Add(sn.RootNode);
while (tovisit.Count > 0)
{
DLinqQueryNode t = tovisit[0];
tovisit.RemoveAt(0);
if (!(t is DLinqSuperNode))
allchildren.Add(t);
foreach (DLinqQueryNode tc in t.Children)
{
if (!allchildren.Contains(tc) && sn.Contains(tc))
tovisit.Add(tc);
}
}
}
else
{
allchildren.Add(n);
}
foreach (DLinqQueryNode nc in allchildren.Reverse())
{
Expression expression = null; // expression to print
List<string> additional = new List<string>(); // additional arguments to print
int argsToSkip = 0;
string methodname = nc.OpName;
plan.Append("\t");
if (nc is DLinqMergeNode)
{
expression = ((DLinqMergeNode)nc).ComparerExpression;
}
else if (nc is DLinqHashPartitionNode)
{
DLinqHashPartitionNode hp = (DLinqHashPartitionNode)nc;
expression = hp.KeySelectExpression;
additional.Add(hp.NumberOfPartitions.ToString());
}
else if (nc is DLinqGroupByNode)
{
DLinqGroupByNode gb = (DLinqGroupByNode)nc;
expression = gb.KeySelectExpression;
if (gb.ElemSelectExpression != null)
additional.Add(DryadLinqExpression.Summarize(gb.ElemSelectExpression));
if (gb.ResSelectExpression != null)
additional.Add(DryadLinqExpression.Summarize(gb.ResSelectExpression));
if (gb.ComparerExpression != null)
additional.Add(DryadLinqExpression.Summarize(gb.ComparerExpression));
if (gb.SeedExpression != null)
additional.Add(DryadLinqExpression.Summarize(gb.SeedExpression));
if (gb.AccumulatorExpression != null)
additional.Add(DryadLinqExpression.Summarize(gb.AccumulatorExpression));
}
else if (nc is DLinqOrderByNode)
{
DLinqOrderByNode ob = (DLinqOrderByNode)nc;
expression = ob.KeySelectExpression;
if (ob.ComparerExpression != null)
additional.Add(DryadLinqExpression.Summarize(ob.ComparerExpression));
}
else if (nc is DLinqWhereNode)
{
expression = ((DLinqWhereNode)nc).WhereExpression;
}
else if (nc is DLinqSelectNode)
{
DLinqSelectNode s = (DLinqSelectNode)nc;
expression = s.SelectExpression;
if (s.ResultSelectExpression != null)
additional.Add(DryadLinqExpression.Summarize(s.ResultSelectExpression));
}
else if (nc is DLinqAggregateNode)
{
DLinqAggregateNode a = (DLinqAggregateNode)nc;
expression = a.FuncLambda;
if (a.SeedExpression != null)
additional.Add(DryadLinqExpression.Summarize(a.SeedExpression));
if (a.ResultLambda != null)
additional.Add(DryadLinqExpression.Summarize(a.ResultLambda));
}
else if (nc is DLinqPartitionOpNode)
{
expression = ((DLinqPartitionOpNode)nc).ControlExpression;
}
else if (nc is DLinqJoinNode)
{
DLinqJoinNode j = (DLinqJoinNode)nc;
expression = j.OuterKeySelectorExpression;
additional.Add(DryadLinqExpression.Summarize(j.InnerKeySelectorExpression));
additional.Add(DryadLinqExpression.Summarize(j.ResultSelectorExpression));
if (j.ComparerExpression != null)
additional.Add(DryadLinqExpression.Summarize(j.ComparerExpression));
}
else if (nc is DLinqDistinctNode)
{
expression = ((DLinqDistinctNode)nc).ComparerExpression;
}
else if (nc is DLinqContainsNode)
{
DLinqContainsNode c = (DLinqContainsNode)nc;
expression = c.ValueExpression;
if (c.ComparerExpression != null)
additional.Add(DryadLinqExpression.Summarize(c.ComparerExpression));
}
else if (nc is DLinqBasicAggregateNode)
{
expression = ((DLinqBasicAggregateNode)nc).SelectExpression;
}
else if (nc is DLinqConcatNode)
// nothing to do
{
}
else if (nc is DLinqSetOperationNode)
{
expression = ((DLinqSetOperationNode)nc).ComparerExpression;
}
else if (nc is DLinqRangePartitionNode)
{
DLinqRangePartitionNode r = (DLinqRangePartitionNode)nc;
expression = r.CountExpression;
// TODO: there's some other possible interesting info
}
else if (nc is DLinqApplyNode)
{
expression = ((DLinqApplyNode)nc).LambdaExpression;
}
else if (nc is DLinqForkNode)
{
expression = ((DLinqForkNode)nc).ForkLambda;
}
else if (nc is DLinqTeeNode)
{
// nothing
}
else if (nc is DLinqDynamicNode)
{
// nothing
}
else
{
expression = nc.QueryExpression;
}
if (expression is MethodCallExpression)
{
MethodCallExpression mc = (MethodCallExpression)expression;
methodname = mc.Method.Name; // overwrite methodname
// determine which arguments to skip
#region LINQMETHODS
switch (mc.Method.Name)
{
case "Aggregate":
case "AggregateAsQuery":
case "Select":
case "LongSelect":
case "SelectMany":
case "LongSelectMany":
case "OfType":
case "Where":
case "LongWhere":
case "First":
case "FirstOrDefault":
case "FirstAsQuery":
case "Single":
case "SingleOrDefault":
case "SingleAsQuery":
case "Last":
case "LastOrDefault":
case "LastAsQuery":
case "Distinct":
case "Any":
case "AnyAsQuery":
case "All":
case "AllAsQuery":
case "Count":
case "CountAsQuery":
case "LongCount":
case "LongCountAsQuery":
case "Sum":
case "SumAsQuery":
case "Min":
case "MinAsQuery":
case "Max":
case "MaxAsQuery":
case "Average":
case "AverageAsQuery":
case "GroupBy":
case "OrderBy":
case "OrderByDescending":
case "ThenBy":
case "ThenByDescending":
case "Take":
case "TakeWhile":
case "LongTakeWhile":
case "Skip":
case "SkipWhile":
case "LongSkipWhile":
case "Contains":
case "ContainsAsQuery":
case "Reverse":
case "Merge":
case "HashPartition":
case "RangePartition":
case "Fork":
case "ForkChoose":
case "AssumeHashPartition":
case "AssumeRangePartition":
case "AssumeOrderBy":
case "ToPartitionedTableLazy":
case "AddCacheEntry":
case "SlidingWindow":
case "ApplyWithPartitionIndex":
case "DoWhile":
argsToSkip = 1;
break;
case "Join":
case "GroupJoin":
case "Concat":
case "MultiConcat":
case "Union":
case "Intersect":
case "Except":
case "SequenceEqual":
case "SequenceEqualAsQuery":
case "Zip":
argsToSkip = 2;
break;
case "Apply":
case "ApplyPerPartition":
if (mc.Arguments.Count < 3)
argsToSkip = 1;
else
argsToSkip = 2;
break;
default:
throw DryadLinqException.Create(DryadLinqErrorCode.OperatorNotSupported,
String.Format(SR.OperatorNotSupported, mc.Method.Name),
expression);
}
#endregion
plan.Append(methodname);
plan.Append("(");
int argno = 0;
foreach (var arg in mc.Arguments)
{
argno++;
if (argno <= argsToSkip) continue;
if (argno > argsToSkip + 1)
{
plan.Append(",");
}
plan.Append(DryadLinqExpression.Summarize(arg));
}
plan.AppendLine(")");
}
else
{
// expression is not methodcall
plan.Append(methodname);
plan.Append("(");
if (expression != null)
{
plan.Append(DryadLinqExpression.Summarize(expression));
}
foreach (string e in additional)
{
plan.Append(",");
plan.Append(e);
}
plan.AppendLine(")");
}
}
}
/// <summary>
/// Explain a query plan in terms of elementary operations.
/// </summary>
/// <param name="gen">Query generator.</param>
/// <returns>A string explaining the plan.</returns>
internal string Explain(DryadLinqQueryGen gen)
{
StringBuilder plan = new StringBuilder();
gen.CodeGenVisit();
this.CodeShowVisit(plan, gen.QueryPlan());
return plan.ToString();
}
}
/// <summary>
/// Summary information about a job query plan.
/// </summary>
internal class DryadLinqJobStaticPlan
{
/// <summary>
/// Connection between two stages.
/// </summary>
public class Connection
{
/// <summary>
/// Arity of connection.
/// </summary>
public enum ConnectionType
{
/// <summary>
/// Point-to-point connection between two stages.
/// </summary>
PointToPoint,
/// <summary>
/// Cross-product connection between two stages.
/// </summary>
AllToAll
};
/// <summary>
/// Type of channel backing the connection.
/// </summary>
public enum ChannelType
{
/// <summary>
/// Persistent file.
/// </summary>
DiskFile,
/// <summary>
/// In-memory fifo.
/// </summary>
Fifo,
/// <summary>
/// TCP pipe.
/// </summary>
TCP
}
/// <summary>
/// Stage originating the connection.
/// </summary>
public Stage From { internal set; get; }
/// <summary>
/// Stage terminating the connection.
/// </summary>
public Stage To { internal set; get; }
/// <summary>
/// Type of connection.
/// </summary>
public ConnectionType Arity { get; internal set; }
/// <summary>
/// Type of channel backing the connection.
/// </summary>
public ChannelType ChannelKind { get; internal set; }
/// <summary>
/// Dynamic manager associated with the connection.
/// </summary>
public string ConnectionManager { get; internal set; }
/// <summary>
/// Color used to represent the connection.
/// </summary>
/// <returns>A string describing the color.</returns>
public string Color()
{
switch (this.ChannelKind)
{
case ChannelType.DiskFile:
return "black";
case ChannelType.Fifo:
return "red";
case ChannelType.TCP:
return "yellow";
default:
throw new Exception(String.Format(SR.UnknownChannelType, this.ChannelKind.ToString()));
}
}
}
/// <summary>
/// Per-node connection information (should be per-edge...)
/// </summary>
struct ConnectionInformation
{
/// <summary>
/// Type of connection.
/// </summary>
public Connection.ConnectionType Arity { get; internal set; }
/// <summary>
/// Type of channel backing the connection.
/// </summary>
public Connection.ChannelType ChannelKind { get; internal set; }
/// <summary>
/// Dynamic manager associated with the connection.
/// </summary>
public string ConnectionManager { get; internal set; }
}
/// <summary>
/// Information about a stage.
/// </summary>
public class Stage
{
/// <summary>
/// Stage name.
/// </summary>
public string Name { get; internal set; }
/// <summary>
/// Code executed in the stage.
/// </summary>
public string[] Code { get; internal set; }
/// <summary>
/// DryadLINQ operator implemented by the stage.
/// </summary>
public string Operator { get; internal set; }
/// <summary>
/// Number of vertices in stage.
/// </summary>
public int Replication { get; internal set; }
/// <summary>
/// Unique identifier.
/// </summary>
public int Id { get; set; }
/// <summary>
/// True if the stage is an input.
/// </summary>
public bool IsInput { get; internal set; }
/// <summary>
/// True if the stage is an output.
/// </summary>
public bool IsOutput { get; internal set; }
/// <summary>
/// True if the stage is a tee.
/// </summary>
public bool IsTee { get; internal set; }
/// <summary>
/// True if the stage is a concatenation.
/// </summary>
public bool IsConcat { get; internal set; }
/// <summary>
/// True if the stage is virtual (no real vertices synthesized).
/// </summary>
public bool IsVirtual { get { return this.IsInput || this.IsOutput || this.IsTee || this.IsConcat; } }
/// <summary>
/// Only defined for tables.
/// </summary>
public string Uri { get; internal set; }
/// <summary>
/// Only defined for tables.
/// </summary>
public string UriType { get; internal set; }
}
/// <summary>
/// File containing the plan.
/// </summary>
string xmlPlanFile;
/// <summary>
/// Map from stage id to stage.
/// </summary>
Dictionary<int, Stage> stages;
/// <summary>
/// List of inter-stage connections in the plan.
/// </summary>
List<Connection> connections;
/// <summary>
/// Store here per-node connection information (map from node id).
/// </summary>
Dictionary<int, ConnectionInformation> perNodeConnectionInfo;
/// <summary>
/// Create a dryadlinq job plan starting from an xml plan file.
/// </summary>
/// <param name="xmlPlanFile">Plan file to parse.</param>
public DryadLinqJobStaticPlan(string xmlPlanFile)
{
this.stages = new Dictionary<int, Stage>();
this.connections = new List<Connection>();
this.perNodeConnectionInfo = new Dictionary<int, ConnectionInformation>();
this.xmlPlanFile = xmlPlanFile;
this.ParseQueryPlan();
}
/// <summary>
/// Parse an XML query plan and represent that information.
/// </summary>
private void ParseQueryPlan()
{
if (!File.Exists(this.xmlPlanFile))
throw new Exception(String.Format( SR.CannotReadQueryPlan , this.xmlPlanFile));
XDocument plan = XDocument.Load(this.xmlPlanFile);
XElement query = plan.Root.Elements().Where(e => e.Name == "QueryPlan").First();
IEnumerable<XElement> vertices = query.Elements().Where(e => e.Name == "Vertex");
foreach (XElement v in vertices)
{
Stage stage = new Stage();
stage.Id = int.Parse(v.Element("UniqueId").Value);
stage.Replication = int.Parse(v.Element("Partitions").Value);
stage.Operator = v.Element("Type").Value;
stage.Name = v.Element("Name").Value;
{
string code = v.Element("Explain").Value;
stage.Code = code.Split('\n').
Skip(1). // drop stage name
Select(l => l.Trim()). // remove leading tab
ToArray();
}
this.stages.Add(stage.Id, stage);
{
// These should be connection attributes, not stage attributes.
string cht = v.Element("ChannelType").Value;
string connectionManager = v.Element("DynamicManager").Element("Type").Value;
string connection = v.Element("ConnectionOperator").Value;
ConnectionInformation info = new ConnectionInformation();
info.ConnectionManager = connectionManager;
switch (connection)
{
case "Pointwise":
info.Arity = Connection.ConnectionType.PointToPoint;
break;
case "CrossProduct":
info.Arity = Connection.ConnectionType.AllToAll;
break;
default:
throw new Exception(String.Format( SR.UnknownConnectionType , connection));
}
switch (cht)
{
case "DiskFile":
info.ChannelKind = Connection.ChannelType.DiskFile;
break;
case "TCPPipe":
info.ChannelKind = Connection.ChannelType.TCP;
break;
case "MemoryFIFO":
info.ChannelKind = Connection.ChannelType.Fifo;
break;
default:
throw new Exception(String.Format( SR.UnknownChannelType2 , cht));
}
this.perNodeConnectionInfo.Add(stage.Id, info);
}
switch (stage.Operator)
{
case "InputTable":
stage.IsInput = true;
stage.UriType = v.Element("StorageSet").Element("Type").Value;
stage.Uri = v.Element("StorageSet").Element("SourceURI").Value;
break;
case "OutputTable":
stage.IsOutput = true;
stage.UriType = v.Element("StorageSet").Element("Type").Value;
stage.Uri = v.Element("StorageSet").Element("SinkURI").Value;
break;
case "Tee":
stage.IsTee = true;
break;
case "Concat":
stage.IsConcat = true;
break;
default:
break;
}
if (v.Elements("Children").Count() == 0)
continue;
bool first = true;
IEnumerable<XElement> children = v.Element("Children").Elements().Where(e => e.Name == "Child");
foreach (XElement child in children)
{
// This code parallels the graphbuilder.cpp for XmlExecHost
Connection conn = new Connection();
int fromid = int.Parse(child.Element("UniqueId").Value);
ConnectionInformation fromConnectionInformation = this.perNodeConnectionInfo[fromid];
Stage from = this.stages[fromid];
conn.From = from;
conn.To = stage;
conn.ChannelKind = fromConnectionInformation.ChannelKind;
switch (fromConnectionInformation.ConnectionManager)
{
case "FullAggregator":
case "HashDistributor":
case "RangeDistributor":
// Ignore except first child
if (first)
{
first = false;
conn.ConnectionManager = fromConnectionInformation.ConnectionManager;
}
else
{
conn.ConnectionManager = "";
}
break;
case "PartialAggregator":
case "Broadcast":
// All children have the same connection manager
conn.ConnectionManager = fromConnectionInformation.ConnectionManager;
break;
case "Splitter":
// The connection manager depends on the number of children
if (first)
{
first = false;
if (children.Count() == 1)
conn.ConnectionManager = fromConnectionInformation.ConnectionManager;
else
conn.ConnectionManager = "SemiSplitter";
}
else
{
conn.ConnectionManager = "";
}
break;
case "None":
case "":
break;
}
conn.Arity = fromConnectionInformation.Arity;
this.connections.Add(conn);
}
}
}
/// <summary>
/// Find the stage given the stage id as a string.
/// </summary>
/// <param name="stageId">Stage id.</param>
/// <returns>A handle to the stage with the specified static Id.</returns>
public Stage GetStageByStaticId(string stageId)
{
int id = int.Parse(stageId);
return this.stages[id];
}
/// <summary>
/// Find the stage given the stage name.
/// </summary>
/// <param name="name">Name of stage to return.</param>
/// <returns>The stage with the given name or null.</returns>
public Stage GetStageByName(string name)
{
foreach (Stage s in this.stages.Values)
{
if (s.Name.Equals(name))
return s;
}
return null;
}
/// <summary>
/// The list of all stages in the plan.
/// </summary>
/// <returns>An iterator over the list of stages.</returns>
public IEnumerable<Stage> GetAllStages()
{
return this.stages.Values;
}
/// <summary>
/// The list of all connections in the plan.
/// </summary>
/// <returns>An iterator over a list of connections.</returns>
public IEnumerable<Connection> GetAllConnections()
{
return this.connections;
}
}
}