/*
Copyright (c) Microsoft Corporation
All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
compliance with the License. You may obtain a copy of the License
at http://www.apache.org/licenses/LICENSE-2.0
THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER
EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR CONDITIONS OF
TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR NON-INFRINGEMENT.
See the Apache Version 2.0 License for specific language governing permissions and
limitations under the License.
*/
using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Text;
using System.IO;
using System.Reflection;
using System.Threading;
using System.Linq;
using System.Linq.Expressions;
using System.CodeDom;
using System.Xml;
using System.Diagnostics;
using Microsoft.Research.DryadLinq.Internal;
using System.Globalization;
namespace Microsoft.Research.DryadLinq
{
///
/// This class handles code generation for multiple queries and execution invocation.
///
internal class DryadLinqQueryGen
{
// Initial optimization phase id. Plan walks treat a node as "not yet visited" while
// node.m_uniqueId equals m_currentPhaseId; the phase id is advanced after phase 2,
// phase 3 and code generation, and AssignUniqueId expects it to be -1 by then.
internal const int StartPhaseId = -4;
// File-name templates passed to DryadLinqCodeGen.GetPathForGeneratedFile.
private const string DryadLinqProgram = "DryadLinqProgram__.xml";
private const string QueryGraph = "QueryGraph__.txt";
// Vertex host executable name; an ".exe.config" app config is generated for it.
private const string VertexHostExe = "VertexHost.exe";
// Last program id handed out (via Interlocked.Increment) to make generated file paths unique.
private static int s_uniqueProgId = -1;
// Serializes Execute() across all generator instances.
private static object s_queryGenLock = new Object();
// Id of the current plan-walking pass; see StartPhaseId.
private int m_currentPhaseId = StartPhaseId;
// Next vertex id handed out by AssignUniqueId.
private int m_nextVertexId = 0;
// Set once CodeGenVisit() has generated vertex code.
private bool m_codeGenDone = false;
private DryadLinqCodeGen m_codeGen;
// One entry per query (for SubmitAndWait, the source argument of each ToStore call).
private Expression[] m_queryExprs;
// Cached plans after optimization phases 1, 2 and 3 (phase 3 reuses phase 2's array).
private DLinqQueryNode[] m_queryPlan1;
private DLinqQueryNode[] m_queryPlan2;
private DLinqQueryNode[] m_queryPlan3;
// Per-query output table URI and whether that output is temporary.
private Uri[] m_outputTableUris;
private bool[] m_isTempOutput;
// Copies of m_outputTableUris, validated in Initialize, used to build output nodes.
private Uri[] m_outputDatapaths;
// Element type of each query's output, filled in by phase 1.
private Type[] m_outputTypes;
// Per-query DummyQueryNodeInfo wrapping the root of that query's node-info graph.
private QueryNodeInfo[] m_queryNodeInfos;
// Tables returned by Execute(), one per output.
private DryadLinqQuery[] m_outputTables;
// Path of the generated XML Dryad program.
private string m_DryadLinqProgram;
// Path allocated for the query graph text file alongside m_DryadLinqProgram.
private string m_queryGraph;
// Memo: expression -> QueryNodeInfo, filled by BuildNodeInfoGraph.
private Dictionary m_exprNodeInfoMap;
// Queries referenced from operator arguments (presumably inside lambdas) -> DummyQueryNodeInfo.
private Dictionary m_referencedQueryMap;
// Lower-cased input/output URIs -> their query nodes; used to reject duplicate
// outputs and outputs that are also inputs.
private Dictionary m_inputUriMap;
private Dictionary m_outputUriMap;
private DryadLinqJobExecutor m_queryExecutor;
private DryadLinqContext m_context;
// Builds a generator for a single query that is being enumerated.
// The expression is assumed to terminate with a ToStore node; the caller supplies
// the target table URI and whether that table is a temporary output.
internal DryadLinqQueryGen(DryadLinqContext context,
                           VertexCodeGen vertexCodeGen,
                           Expression queryExpr,
                           Uri tableUri,
                           bool isTempOutput)
{
    m_context = context;
    m_queryExprs = new[] { queryExpr };
    m_outputTableUris = new[] { tableUri };
    m_isTempOutput = new[] { isTempOutput };
    Initialize(vertexCodeGen);
}
// Builds a generator for a batch of queries submitted together (SubmitAndWait).
// Every element of qlist must be a call to ToStore(source, uri, isTemp): the source
// argument becomes the query to compile, and the other two arguments are evaluated
// to obtain the output table URI and the temporary-output flag.
// Throws DryadLinqException if an element is not a ToStore call (including when it is
// not a method call at all, which previously surfaced as an InvalidCastException).
internal DryadLinqQueryGen(DryadLinqContext context,
                           VertexCodeGen vertexCodeGen,
                           Expression[] qlist)
{
    this.m_queryExprs = new Expression[qlist.Length];
    this.m_outputTableUris = new Uri[qlist.Length];
    this.m_isTempOutput = new bool[qlist.Length];
    this.m_context = context;
    for (int i = 0; i < qlist.Length; i++)
    {
        MethodCallExpression mcExpr = qlist[i] as MethodCallExpression;
        if (mcExpr == null || mcExpr.Method.Name != ReflectedNames.DLQ_ToStore)
        {
            throw new DryadLinqException("Internal error: The method must be " + ReflectedNames.DLQ_ToStore);
        }
        this.m_queryExprs[i] = mcExpr.Arguments[0];
        // Evaluate the URI argument first, then the temp-output flag (original order).
        this.m_outputTableUris[i] = new ExpressionSimplifier<Uri>().Eval(mcExpr.Arguments[1]);
        this.m_isTempOutput[i] = new ExpressionSimplifier<bool>().Eval(mcExpr.Arguments[2]);
    }
    this.Initialize(vertexCodeGen);
}
// Shared constructor logic.
// - Creates the code generator and the job executor.
// - Resets the cached plans and the program path.
// - Builds the QueryNodeInfo graph for every query.
// - Validates each output URI.
// Throws DryadLinqException (UnrecognizedDataSource) when DataPath does not
// recognize an output URI.
private void Initialize(VertexCodeGen vertexCodeGen)
{
    this.m_codeGen = new DryadLinqCodeGen(this.m_context, vertexCodeGen);

    // Plans are built lazily by GenerateQueryPlanPhase1/2/3; each is reset once.
    this.m_queryPlan1 = null;
    this.m_queryPlan2 = null;
    this.m_queryPlan3 = null;
    this.m_DryadLinqProgram = null;

    this.m_exprNodeInfoMap = new Dictionary();
    this.m_referencedQueryMap = new Dictionary();
    this.m_inputUriMap = new Dictionary();
    this.m_outputUriMap = new Dictionary();
    this.m_queryExecutor = new DryadLinqJobExecutor(this.m_context);

    // Initialize the data structures for the output tables.
    int queryCount = this.m_queryExprs.Length;
    this.m_outputTypes = new Type[queryCount];
    this.m_outputDatapaths = new Uri[queryCount];
    this.m_queryNodeInfos = new QueryNodeInfo[queryCount];
    for (int i = 0; i < queryCount; i++)
    {
        // Wrap each query's info graph in a root DummyQueryNodeInfo; phase 1 reads
        // the real root back out through Children[0].Child.
        QueryNodeInfo rootInfo = this.BuildNodeInfoGraph(this.m_queryExprs[i]);
        this.m_queryNodeInfos[i] = new DummyQueryNodeInfo(this.m_queryExprs[i], false, rootInfo);
        this.m_outputDatapaths[i] = this.m_outputTableUris[i];
        if (!DataPath.IsValidDataPath(this.m_outputDatapaths[i]))
        {
            throw new DryadLinqException(DryadLinqErrorCode.UnrecognizedDataSource,
                                         String.Format(SR.UnrecognizedDataSource,
                                                       this.m_outputTableUris[i].AbsoluteUri));
        }
    }
}
// The DryadLinqContext this generator compiles and submits queries for.
internal DryadLinqContext Context
{
    get
    {
        return m_context;
    }
}
// The code generator created in Initialize and used to emit vertex code.
internal DryadLinqCodeGen CodeGen
{
    get
    {
        return m_codeGen;
    }
}
// Queries referenced from operator arguments, keyed by their expression
// (populated by BuildReferencedQuery).
internal Dictionary ReferencedQueryMap
{
    get
    {
        return m_referencedQueryMap;
    }
}
/// <summary>
/// Probes the client's loaded non-system assemblies and throws if any of them is
/// marked as requiring 32-bit (x86). Returns silently if every managed assembly
/// probed is x64 or AnyCPU. Binaries that cannot be loaded for reflection
/// (e.g. native DLLs) are skipped.
/// </summary>
/// <exception cref="DryadLinqException">
/// Binaries32BitNotSupported, naming the first offending file.
/// </exception>
private void CheckAssemblyArchitectures()
{
    // The probe set is a stripped-down version of the resource discovery logic in
    // GenerateDryadProgram:
    //  i)  start from the currently loaded non-system binaries
    //      (client app + its dependencies + dynamically loaded assemblies);
    //  ii) drop user-specified resource exclusions. This enables a workaround for x86
    //      assemblies that must be loaded on the client side (think UI plugins) but
    //      are not needed by the vertex code.
    // Unlike GenerateDryadProgram, the vertex DLL and user resources are not probed.
    //
    // Paths are compared after lower-casing with the invariant culture; a HashSet
    // keeps each exclusion test O(1).
    HashSet<string> resourcesToExclude = new HashSet<string>(
        this.m_context.ResourcesToRemove.Select(x => x.ToLower(CultureInfo.InvariantCulture)));

    foreach (string path in TypeSystem.GetLoadedNonSystemAssemblyPaths())
    {
        if (resourcesToExclude.Contains(path.ToLower(CultureInfo.InvariantCulture)))
        {
            continue;
        }

        Assembly asm;
        try
        {
            // Load (and report) using the original path. Lower-casing is only for the
            // exclusion comparison and would break loads on case-sensitive file systems.
            asm = Assembly.ReflectionOnlyLoadFrom(path);
        }
        catch
        {
            // Deliberately best-effort: native or otherwise unloadable binaries are skipped.
            continue;
        }

        PortableExecutableKinds peKind;
        ImageFileMachine machine;
        asm.ManifestModule.GetPEKind(out peKind, out machine);
        // machine is reported as I386 for both true x86 and AnyCPU assemblies; only
        // peKind has the Required32Bit flag set exclusively for x86 binaries.
        if ((peKind & PortableExecutableKinds.Required32Bit) != 0)
        {
            string offendingAssemblyName = Path.GetFileName(path);
            throw new DryadLinqException(DryadLinqErrorCode.Binaries32BitNotSupported,
                                         String.Format(SR.Binaries32BitNotSupported, offendingAssemblyName));
        }
    }
}
/// <summary>
/// Generates the Dryad program, verifies that no x86-only assemblies are loaded,
/// starts asynchronous job execution, and returns one DryadLinqQuery per output
/// table, each bound to this generator's executor. Calls are serialized across all
/// instances by the static s_queryGenLock.
/// </summary>
internal DryadLinqQuery[] Execute()
{
    lock (s_queryGenLock)
    {
        this.GenerateDryadProgram();
        this.CheckAssemblyArchitectures();

        // Kick off the job in the background; results are exposed via the tables below.
        this.m_queryExecutor.ExecuteAsync(this.m_DryadLinqProgram);

        int outputCount = this.m_outputTableUris.Length;
        DryadLinqQuery[] tables = new DryadLinqQuery[outputCount];
        this.m_outputTables = tables;

        MethodInfo getTableMethod = typeof(Microsoft.Research.DryadLinq.DataProvider).GetMethod(
            ReflectedNames.DataProvider_GetPartitionedTable,
            BindingFlags.NonPublic | BindingFlags.Static);

        for (int idx = 0; idx < outputCount; idx++)
        {
            // GetPartitionedTable is generic in the element type of output idx.
            MethodInfo typedGetTable = getTableMethod.MakeGenericMethod(this.m_outputTypes[idx]);
            object[] invokeArgs = { this.m_context, this.m_outputTableUris[idx] };
            tables[idx] = (DryadLinqQuery)typedGetTable.Invoke(null, invokeArgs);
            tables[idx].QueryExecutor = this.m_queryExecutor;
        }
        return tables;
    }
}
// Phase 1 of the query optimization.
// - Applies simple rewrite rules to the node-info graph.
// - Translates every query, and every referenced query, into DLinqQueryNodes.
// - Attaches a DLinqOutputNode to each query.
// Returns immediately if m_queryPlan1 has already been built.
// Throws DryadLinqException when:
// - an output element type is anonymous; or
// - two outputs share a URI (compared after ToLower()).
internal void GenerateQueryPlanPhase1()
{
if (this.m_queryPlan1 != null) return;
// Apply some simple rewrite rules
SimpleRewriter rewriter = new SimpleRewriter(this.m_exprNodeInfoMap.Values.ToList());
rewriter.Rewrite();
// Generate the query plan of phase1
// Layout of m_queryPlan1: slots [0, m_queryExprs.Length) hold the user queries;
// the remaining slots hold the referenced queries, in map enumeration order.
var referencedNodes = this.m_referencedQueryMap.Values;
this.m_queryPlan1 = new DLinqQueryNode[this.m_queryExprs.Length + referencedNodes.Count];
for (int i = 0; i < this.m_queryExprs.Length; i++)
{
// m_queryNodeInfos[i] is a DummyQueryNodeInfo wrapper; its single child is the
// real root of query i.
this.m_queryPlan1[i] = this.Visit(this.m_queryNodeInfos[i].Children[0].Child);
}
int idx = this.m_queryExprs.Length;
foreach (DummyQueryNodeInfo nodeInfo in referencedNodes)
{
if (nodeInfo.NeedsMerge)
{
// Add a Tee'd Merge
this.m_queryPlan1[idx] = this.Visit(nodeInfo.Children[0].Child);
DLinqQueryNode mergeNode = new DLinqMergeNode(true,
nodeInfo.QueryExpression,
this.m_queryPlan1[idx]);
this.m_queryPlan1[idx] = new DLinqTeeNode(mergeNode.OutputTypes[0], true,
mergeNode.QueryExpression, mergeNode);
}
else
{
this.m_queryPlan1[idx] = this.Visit(nodeInfo.Children[0].Child);
}
// Record the generated node on the info so later code can find it.
nodeInfo.QueryNode = this.m_queryPlan1[idx];
idx++;
}
// Finally, add the output nodes.
// forkCounts[n] = n's existing parent count plus one per query whose root is n.
// A value > 1 means the root is shared by several consumers.
Dictionary forkCounts = new Dictionary();
for (int i = 0; i < this.m_queryExprs.Length; i++)
{
DLinqQueryNode queryNode = this.m_queryPlan1[i];
int cnt;
if (!forkCounts.TryGetValue(queryNode, out cnt))
{
cnt = queryNode.Parents.Count;
}
forkCounts[queryNode] = cnt + 1;
}
for (int i = 0; i < this.m_queryExprs.Length; i++)
{
DryadLinqClientLog.Add("Query " + i + " Output: " + this.m_outputDatapaths[i]);
DLinqQueryNode queryNode = this.m_queryPlan1[i];
if (TypeSystem.IsAnonymousType(queryNode.OutputTypes[0]))
{
throw new DryadLinqException(DryadLinqErrorCode.OutputTypeCannotBeAnonymous,
SR.OutputTypeCannotBeAnonymous);
}
// Add dummy Apply to make Dryad happy (it doesn't like to hook inputs straight to outputs)
// The identity Apply (x => x) is also inserted when the root is shared (forkCounts > 1).
if ((queryNode is DLinqInputNode) || (forkCounts[queryNode] > 1))
{
// Add a dummy Apply
Type paramType = typeof(IEnumerable<>).MakeGenericType(queryNode.OutputTypes[0]);
ParameterExpression param = Expression.Parameter(paramType, "x");
Type type = typeof(Func<,>).MakeGenericType(paramType, paramType);
LambdaExpression applyExpr = Expression.Lambda(type, param, param);
DLinqQueryNode applyNode = new DLinqApplyNode(applyExpr, this.m_queryExprs[i], queryNode);
applyNode.OutputDataSetInfo = queryNode.OutputDataSetInfo;
queryNode = applyNode;
}
if (queryNode is DLinqConcatNode)
{
// Again, we add dummy Apply in certain cases to make Dryad happy
((DLinqConcatNode)queryNode).FixInputs();
}
// Add the output node
CompressionScheme outputScheme = this.m_context.OutputDataCompressionScheme;
DLinqOutputNode outputNode = new DLinqOutputNode(this.m_context,
this.m_outputDatapaths[i],
this.m_isTempOutput[i],
outputScheme,
this.m_queryExprs[i],
queryNode);
this.m_queryPlan1[i] = outputNode;
// Reject two outputs that write the same (case-insensitively compared) URI.
if (this.m_outputUriMap.ContainsKey(this.m_outputDatapaths[i].AbsoluteUri.ToLower()))
{
throw new DryadLinqException(DryadLinqErrorCode.MultipleOutputsWithSameDscUri,
String.Format(SR.MultipleOutputsWithSameUri, this.m_outputDatapaths[i]));
}
this.m_outputUriMap.Add(this.m_outputDatapaths[i].AbsoluteUri.ToLower(), outputNode);
this.m_outputTypes[i] = this.m_queryPlan1[i].OutputTypes[0];
// Remove useless Tees to make Dryad happy
// A Tee feeding only this output is bypassed: its child is wired directly to the output node.
if ((queryNode is DLinqTeeNode) && (forkCounts[queryNode] == 1))
{
DLinqQueryNode teeChild = queryNode.Children[0];
teeChild.UpdateParent(queryNode, outputNode);
outputNode.UpdateChildren(queryNode, teeChild);
}
}
}
// Phase 2 of the query optimization.
// Ensures phase 1 has run, then rewrites each phase-1 root through VisitPhase2.
// That visitor does the pipeline reduction and Tee insertion.
// The result is cached; the phase id advances once the pass completes.
internal DLinqQueryNode[] GenerateQueryPlanPhase2()
{
    if (this.m_queryPlan2 != null)
    {
        return this.m_queryPlan2;
    }

    this.GenerateQueryPlanPhase1();
    DLinqQueryNode[] phase1 = this.m_queryPlan1;
    DLinqQueryNode[] phase2 = new DLinqQueryNode[phase1.Length];
    this.m_queryPlan2 = phase2;
    for (int idx = 0; idx < phase1.Length; idx++)
    {
        phase2[idx] = this.VisitPhase2(phase1[idx]);
    }
    this.m_currentPhaseId++;
    return phase2;
}
// Phase-2 rewrite of the plan below node; returns the node that should take its place.
// Only nodes with m_uniqueId == m_currentPhaseId (not yet visited in this pass) are
// processed; any other node is returned unchanged.
// For an unvisited node:
//  - a Fork whose Tee branch has no consumer raises BranchOfForkNotUsed;
//  - if node already has a SuperNode, that super node is returned as-is;
//  - otherwise the children are rewritten in place and node is pipeline-reduced.
//    The reduced node is then marked visited and DoWhile body/condition are rewritten.
//    Finally a Tee is inserted when the result's output node is forked (unless that
//    output node is itself a Fork or Tee).
private DLinqQueryNode VisitPhase2(DLinqQueryNode node)
{
DLinqQueryNode resNode = node;
if (node.m_uniqueId == this.m_currentPhaseId)
{
if (node is DLinqForkNode)
{
// For now, we require every branch of a fork be used:
DLinqForkNode forkNode = (DLinqForkNode)node;
for (int i = 0; i < forkNode.Parents.Count; i++)
{
if ((forkNode.Parents[i] is DLinqTeeNode) &&
(forkNode.Parents[i].Parents.Count == 0))
{
throw DryadLinqException.Create(DryadLinqErrorCode.BranchOfForkNotUsed,
string.Format(SR.BranchOfForkNotUsed, i),
node.QueryExpression);
}
}
}
resNode = node.SuperNode;
if (resNode == null)
{
// Rewrite children first; their replacements are stored back into node.Children.
for (int i = 0; i < node.Children.Length; i++)
{
node.Children[i] = this.VisitPhase2(node.Children[i]);
}
resNode = node.PipelineReduce();
// Mark the reduced node as visited for this pass.
resNode.m_uniqueId++;
// Special treatment for DoWhile
DLinqDoWhileNode doWhile = resNode as DLinqDoWhileNode;
if (doWhile != null)
{
doWhile.Body = this.VisitPhase2(doWhile.Body);
doWhile.Cond = this.VisitPhase2(doWhile.Cond);
}
// Insert a Tee node if needed:
DLinqQueryNode outputNode = resNode.OutputNode;
if (outputNode.IsForked &&
!(outputNode is DLinqForkNode) &&
!(outputNode is DLinqTeeNode))
{
resNode = resNode.InsertTee(true);
}
}
}
return resNode;
}
// Phase 3 of the query optimization.
// Ensures phase 2 has run, then cleans up the phase-2 plan in place via VisitPhase3.
// The phase-3 plan shares the phase-2 array. The result is cached; the phase id
// advances once the pass completes.
internal DLinqQueryNode[] GenerateQueryPlanPhase3()
{
    if (this.m_queryPlan3 != null)
    {
        return this.m_queryPlan3;
    }

    this.GenerateQueryPlanPhase2();
    DLinqQueryNode[] roots = this.m_queryPlan2;
    this.m_queryPlan3 = roots;
    foreach (DLinqQueryNode root in roots)
    {
        this.VisitPhase3(root);
    }
    this.m_currentPhaseId++;
    return roots;
}
// Phase-3 cleanup of the plan below node, performed in place. Each node is handled
// once per pass (m_uniqueId == m_currentPhaseId means "not yet visited").
// For each node:
//  - bypasses child Tees that are not forked;
//  - bypasses a Merge that is not forked, does not feed an output, and whose child
//    is static with a single partition;
//  - possibly attaches a Broadcast dynamic manager to Tee nodes;
//  - then recurses into children and, for DoWhile, into the body and condition.
private void VisitPhase3(DLinqQueryNode node)
{
if (node.m_uniqueId == this.m_currentPhaseId)
{
node.m_uniqueId++;
// Remove some useless Tee nodes
// The Tee's child is rewired to node in place of the Tee.
foreach (DLinqQueryNode child in node.Children)
{
if ((child is DLinqTeeNode) && !child.IsForked)
{
DLinqQueryNode teeChild = child.Children[0];
teeChild.UpdateParent(child, node);
node.UpdateChildren(child, teeChild);
}
}
// Remove some useless Merge nodes
// NOTE(review): assumes node.Parents is non-empty for a Merge node — confirm.
if ((node is DLinqMergeNode) &&
!node.IsForked &&
!(node.Parents[0] is DLinqOutputNode) &&
!node.Children[0].IsDynamic &&
node.Children[0].PartitionCount == 1)
{
node.Children[0].UpdateParent(node, node.Parents[0]);
node.Parents[0].UpdateChildren(node, node.Children[0]);
}
// Add dynamic managers for tee nodes.
// NOTE(review): the guard tests that the NoDynamicOpt bit IS set, which reads as
// the opposite of "add dynamic managers"; confirm this condition is not inverted.
if ((StaticConfig.DynamicOptLevel & StaticConfig.NoDynamicOpt) != 0 &&
node is DLinqTeeNode &&
node.DynamicManager.ManagerType == DynamicManagerType.None)
{
// insert a dynamic broadcast manager on Tee
node.DynamicManager = DynamicManager.Broadcast;
}
// Recurse on the children of node
foreach (DLinqQueryNode child in node.Children)
{
this.VisitPhase3(child);
}
if (node is DLinqDoWhileNode)
{
this.VisitPhase3(((DLinqDoWhileNode)node).Body);
this.VisitPhase3(((DLinqDoWhileNode)node).Cond);
}
}
}
// Returns the fully optimized (phase 3) query plan, building it on first use.
internal DLinqQueryNode[] QueryPlan()
{
    DLinqQueryNode[] optimizedPlan = this.GenerateQueryPlanPhase3();
    return optimizedPlan;
}
// Generates the vertex code for every node of the optimized plan (only once).
// Throws DryadLinqException (OutputUriAlsoQueryInput) when an output URI is also
// read as an input of the query. Both maps are keyed by lower-cased absolute URI.
internal void CodeGenVisit()
{
    if (this.m_codeGenDone)
    {
        return;
    }

    DLinqQueryNode[] optimizedPlan = this.QueryPlan();

    // Make sure none of the outputs share a URI with inputs.
    foreach (string outputPath in this.m_outputUriMap.Keys)
    {
        if (this.m_inputUriMap.ContainsKey(outputPath))
        {
            throw new DryadLinqException(DryadLinqErrorCode.OutputUriAlsoQueryInput,
                                         String.Format(SR.OutputUriAlsoQueryInput, outputPath));
        }
    }

    for (int idx = 0; idx < optimizedPlan.Length; idx++)
    {
        this.CodeGenVisit(optimizedPlan[idx]);
    }
    this.m_currentPhaseId++;
    this.m_codeGenDone = true;
}
// Emits vertex code for node and everything below it, visiting each node once per
// pass (m_uniqueId == m_currentPhaseId means "not yet visited"). Sets the node's
// m_vertexEntryMethod:
// - input/output tables, Tee, Concat, Dummy and DoWhile get a name but no code;
// - every other node gets a generated vertex method.
private void CodeGenVisit(DLinqQueryNode node)
{
    if (node.m_uniqueId != this.m_currentPhaseId)
    {
        return;
    }
    node.m_uniqueId++;

    // Types are processed first so that children also know about all the
    // proxies/mappings that should be used.
    node.CreateCodeAndMappingsForVertexTypes(false);

    foreach (DLinqQueryNode child in node.Children)
    {
        this.CodeGenVisit(child);
    }

    switch (node.NodeType)
    {
        case QueryNodeType.InputTable:
        {
            // Vertex with no code: named after the last path segment of the input.
            string inputPath = ((DLinqInputNode)node).Table.DataSourceUri.AbsolutePath;
            int sep = Math.Max(inputPath.LastIndexOf('/'), inputPath.LastIndexOf('\\'));
            node.m_vertexEntryMethod = inputPath.Substring(sep + 1);
            break;
        }
        case QueryNodeType.OutputTable:
        {
            // Vertex with no code: named after (at most 8 chars of) the last path segment.
            string outputPath = ((DLinqOutputNode)node).OutputUri.AbsolutePath;
            int sep = Math.Max(outputPath.LastIndexOf('/'), outputPath.LastIndexOf('\\'));
            int nameLength = Math.Min(8, outputPath.Length - sep - 1);
            node.m_vertexEntryMethod = outputPath.Substring(sep + 1, nameLength);
            break;
        }
        case QueryNodeType.Tee:
        {
            // Vertex with no code, but possibly with a broadcast manager to generate.
            node.m_vertexEntryMethod = DryadLinqCodeGen.MakeUniqueName("Tee");
            if (node.DynamicManager.ManagerType != DynamicManagerType.None)
            {
                node.DynamicManager.CreateVertexCode();
            }
            break;
        }
        case QueryNodeType.Concat:
        case QueryNodeType.Dummy:
        {
            node.m_vertexEntryMethod = DryadLinqCodeGen.MakeUniqueName(node.NodeType.ToString());
            break;
        }
        case QueryNodeType.DoWhile:
        {
            // Vertex with no code; its body and condition sub-plans are visited here.
            node.m_vertexEntryMethod = DryadLinqCodeGen.MakeUniqueName(node.NodeType.ToString());
            DLinqDoWhileNode loop = (DLinqDoWhileNode)node;
            this.CodeGenVisit(loop.Body);
            this.CodeGenVisit(loop.Cond);
            break;
        }
        default:
        {
            CodeMemberMethod vertexMethod = this.m_codeGen.AddVertexMethod(node);
            node.m_vertexEntryMethod = vertexMethod.Name;
            node.DynamicManager.CreateVertexCode();
            break;
        }
    }
}
// Assigns vertex ids (taken from m_nextVertexId) to every node of the optimized plan.
// The three earlier passes (phase 2, phase 3, code generation) must already have
// advanced the phase id to -1; otherwise an internal DryadLinqException is thrown.
private void AssignUniqueId()
{
    if (this.m_currentPhaseId == -1)
    {
        DLinqQueryNode[] roots = this.QueryPlan();
        for (int idx = 0; idx < roots.Length; idx++)
        {
            this.AssignUniqueId(roots[idx]);
        }
        return;
    }

    //@@TODO: this should not be reachable. could change to Assert/InvalidOpEx
    throw new DryadLinqException(DryadLinqErrorCode.Internal,
                                 "Internal error: Optimization phase should be -1, not " +
                                 this.m_currentPhaseId);
}
// Post-order assignment of vertex ids: referenced queries and children receive ids
// before node itself. A node still needs an id while its m_uniqueId equals
// m_currentPhaseId (-1 during this pass, as checked by the parameterless overload).
private void AssignUniqueId(DLinqQueryNode node)
{
if (node.m_uniqueId == this.m_currentPhaseId)
{
foreach (Pair refChild in node.GetReferencedQueries())
{
this.AssignUniqueId(refChild.Value);
}
foreach (DLinqQueryNode child in node.Children)
{
this.AssignUniqueId(child);
}
// Re-check: the recursion above may already have numbered this node.
// NOTE(review): presumably via back-references such as DoWhile/Fork — confirm.
if (node.m_uniqueId == this.m_currentPhaseId)
{
node.m_uniqueId = this.m_nextVertexId++;
// Special treatment for DoWhile
// The loop's body, condition and source sub-plans are numbered explicitly here
// (they are presumably not reachable through Children).
DLinqDoWhileNode doWhileNode = node as DLinqDoWhileNode;
if (doWhileNode != null)
{
this.AssignUniqueId(doWhileNode.Body);
this.AssignUniqueId(doWhileNode.Cond);
this.AssignUniqueId(doWhileNode.BodySource);
this.AssignUniqueId(doWhileNode.CondSource1);
this.AssignUniqueId(doWhileNode.CondSource2);
}
// Special treatment for Fork
// Unnumbered parents of a Fork output receive the next ids immediately, so
// they are numbered consecutively right after node.
if (node.OutputNode is DLinqForkNode)
{
foreach (DLinqQueryNode pnode in node.Parents)
{
if (pnode.m_uniqueId == this.m_currentPhaseId)
{
pnode.m_uniqueId = this.m_nextVertexId++;
}
}
}
}
}
}
// Assigns vertex ids, then asks every root of the optimized plan to append itself to
// the queryPlan XML element. The same 'seen' set is shared across all roots so
// AddToQueryPlan can track what has already been written.
private void CreateQueryPlan(XmlDocument queryDoc, XmlElement queryPlan)
{
    this.AssignUniqueId();
    HashSet seen = new HashSet();
    DLinqQueryNode[] roots = this.QueryPlan();
    for (int idx = 0; idx < roots.Length; idx++)
    {
        roots[idx].AddToQueryPlan(queryDoc, queryPlan, seen);
    }
}
/// <summary>
/// Finds the pdb file that sits next to <paramref name="filename"/> and shares its base name.
/// </summary>
/// <param name="filename">Path of a binary (e.g. a DLL) whose debugging information is wanted.</param>
/// <returns>The path of the existing .pdb file, or null if there is none.</returns>
internal static string FindPDB(string filename)
{
    // Path.ChangeExtension keeps the directory part exactly as given. The previous
    // directory + separator + name concatenation produced "\name.pdb" (rooted at the
    // drive root) whenever filename had no directory component.
    string pdbname = Path.ChangeExtension(filename, ".pdb");
    return File.Exists(pdbname) ? pdbname : null;
}
/// <summary>
/// Adds <paramref name="resource"/> to the plan's resource list. When the context
/// compiles for vertex debugging, the matching .pdb file is added too, if one exists
/// next to the resource. Entries listed in <paramref name="resourcesToExclude"/> are skipped.
/// </summary>
/// <param name="queryDoc">Document holding the XML plan.</param>
/// <param name="parent">Element the Resource entries are appended to.</param>
/// <param name="resource">Path of the resource to add.</param>
/// <param name="resourcesToExclude">Paths that must not be added (compared case-insensitively).</param>
private void AddResourceToPlan(XmlDocument queryDoc,
                               XmlElement parent,
                               string resource,
                               IEnumerable resourcesToExclude)
{
    AddResourceToPlan_Core(queryDoc, parent, resource, resourcesToExclude);
    if (!this.m_context.CompileForVertexDebugging)
    {
        return;
    }

    string symbols = FindPDB(resource);
    if (symbols == null)
    {
        return;
    }
    AddResourceToPlan_Core(queryDoc, parent, symbols, resourcesToExclude);
}
// Appends a Resource element for 'resource' under 'parent', unless 'resource' appears in
// resourcesToExclude (ordinal, case-insensitive comparison). The element text is the
// value returned by the job executor's AddResource.
private void AddResourceToPlan_Core(XmlDocument queryDoc,
                                    XmlElement parent,
                                    string resource,
                                    IEnumerable resourcesToExclude)
{
    bool excluded = resourcesToExclude.Contains(resource, StringComparer.OrdinalIgnoreCase);
    if (!excluded)
    {
        XmlElement resourceElem = queryDoc.CreateElement("Resource");
        resourceElem.InnerText = this.m_queryExecutor.AddResource(resource);
        parent.AppendChild(resourceElem);
    }
}
// Writes an app config XML file for the VertexHost process to appConfigPath.
private void GenerateAppConfigResource(string appConfigPath)
{
// Per the design notes, the generated config is meant to:
//
// 1) specify the server GC mode;
//
// 2) request a .NET runtime version equal to that of the submitting client. This
// applies only when the client runs a CLR newer than 2.x and the context's
// MatchClientNetFrameworkVersion flag is set. The v2.0.50727 runtime is kept as a
// fallback for cluster nodes that lack the client's newer framework; .NET 3.5 SP1 and
// SP2 both report this underlying CLR version;
//
// 3) disable Authenticode checks for the VertexHost. Some cluster nodes have no
// Internet connection, and there the Authenticode check causes a ~20 sec delay
// during process startup.
//
// NOTE: useLegacyV2RuntimeActivationPolicy="true" is needed because the VH binary is mixed mode.
//
// NOTE(review): in this copy of the file the String.Format template and the config
// body literals below contain only whitespace, so none of the XML described above is
// actually written. The format template also has no {0}/{1} placeholders for the
// version arguments. Confirm these literals still hold the intended XML.
string clientVersionString = "";
if (Environment.Version.Major > 2 && this.m_context.MatchClientNetFrameworkVersion)
{
clientVersionString = String.Format(CultureInfo.InvariantCulture,
@" ",
Environment.Version.Major,
Environment.Version.Minor);
// NOTE: We use the "v4.0" syntax instead of the longer "v4.0.30319" because as of .NET4
// the format of this app config tag has been simplified to "major.minor"
}
string appConfigBody =
@"
";
appConfigBody += clientVersionString; // add the client specific version string if we generated one.
appConfigBody +=
@"
";
File.WriteAllText(appConfigPath, appConfigBody);
}
/// <summary>
/// Builds the vertex assembly, then writes the XML Dryad program describing the job.
/// The program contains the version, cluster and node limits, compression and
/// speculation settings, XmlExecHost arguments, the resources to ship, and the query plan.
/// </summary>
/// <returns>The path of the written query plan XML file.</returns>
internal string GenerateDryadProgram()
{
    DryadLinqObjectStore.Clear();

    // Any resource that we try to add will be tested against this list.
    IEnumerable<string> resourcesToExclude = this.m_context.ResourcesToRemove;

    // BuildDryadLinqAssembly:
    // 1. Performs query optimizations
    // 2. Generates vertex code for all the nodes in the query plan
    this.m_codeGen.BuildDryadLinqAssembly(this);

    // Output paths are allocated once per generator, so regenerating overwrites the same files.
    if (this.m_DryadLinqProgram == null)
    {
        int progId = Interlocked.Increment(ref s_uniqueProgId);
        this.m_DryadLinqProgram = DryadLinqCodeGen.GetPathForGeneratedFile(DryadLinqProgram, progId);
        this.m_queryGraph = DryadLinqCodeGen.GetPathForGeneratedFile(QueryGraph, progId);
    }

    XmlDocument queryDoc = new XmlDocument();
    queryDoc.LoadXml("");
    XmlElement root = queryDoc.DocumentElement;

    // Assembly version information.
    AssemblyName asmName = Assembly.GetExecutingAssembly().GetName();
    AppendTextElement(queryDoc, root, "DryadLinqVersion", asmName.Version.ToString());

    AppendTextElement(queryDoc, root, "ClusterName", this.m_context.HeadNode);

    // Numbers are written with the invariant culture: the file is machine-read, and
    // some cultures format the "-1" (no maximum) default with a non-ASCII minus sign.
    int minComputeNodes = this.m_context.JobMinNodes.HasValue ? this.m_context.JobMinNodes.Value : 1;
    AppendTextElement(queryDoc, root, "MinimumComputeNodes",
                      minComputeNodes.ToString(CultureInfo.InvariantCulture));
    int maxComputeNodes = this.m_context.JobMaxNodes.HasValue ? this.m_context.JobMaxNodes.Value : -1;
    AppendTextElement(queryDoc, root, "MaximumComputeNodes",
                      maxComputeNodes.ToString(CultureInfo.InvariantCulture));

    AppendTextElement(queryDoc, root, "IntermediateDataCompression",
                      ((int)this.m_context.IntermediateDataCompressionScheme).ToString(CultureInfo.InvariantCulture));
    AppendTextElement(queryDoc, root, "EnableSpeculativeDuplication",
                      this.m_context.EnableSpeculativeDuplication.ToString());

    //@@TODO[p2]: remove this element from the queryXML.
    AppendTextElement(queryDoc, root, "Visualization", "none");

    string queryName = String.IsNullOrEmpty(this.m_context.JobFriendlyName)
                           ? asmName.Name
                           : this.m_context.JobFriendlyName;
    AppendTextElement(queryDoc, root, "QueryName", queryName);

    // XmlExecHost arguments, one Argument element each.
    XmlElement argsElem = queryDoc.CreateElement("XmlExecHostArgs");
    root.AppendChild(argsElem);
    string[] args = StaticConfig.XmlExecHostArgs.Split(new char[] { ' ' }, StringSplitOptions.RemoveEmptyEntries);
    List<string> xmlExecResources = new List<string>();
    for (int i = 0; i < args.Length; ++i)
    {
        string arg = args[i];
        AppendTextElement(queryDoc, argsElem, "Argument", arg);
        // The value following "-bw" or "-r" is also shipped as a resource. A trailing
        // flag with no value has nothing to ship; indexing past the end used to throw
        // IndexOutOfRangeException.
        if ((arg.Equals("-bw") || arg.Equals("-r")) && i + 1 < args.Length)
        {
            xmlExecResources.Add(args[i + 1]);
        }
    }

    // Resources to ship with the job.
    XmlElement resourcesElem = queryDoc.CreateElement("Resources");
    root.AppendChild(resourcesElem);

    // The generated LINQ (vertex) DLL.
    AddResourceToPlan(queryDoc, resourcesElem, this.m_codeGen.GetTargetLocation(), resourcesToExclude);

    // Every loaded DLL that isn't a system DLL.
    foreach (string assemblyPath in TypeSystem.GetLoadedNonSystemAssemblyPaths())
    {
        AddResourceToPlan(queryDoc, resourcesElem, assemblyPath, resourcesToExclude);
    }

    foreach (string resourcePath in xmlExecResources)
    {
        AddResourceToPlan(queryDoc, resourcesElem, resourcePath, resourcesToExclude);
    }

    foreach (string resourcePath in this.m_codeGen.VertexCodeGen.ResourcesToAdd())
    {
        AddResourceToPlan(queryDoc, resourcesElem, resourcePath, resourcesToExclude);
    }

    // App config file for the VertexHost process.
    string vertexHostAppConfigPath = DryadLinqCodeGen.GetPathForGeneratedFile(Path.ChangeExtension(VertexHostExe, "exe.config"), null);
    GenerateAppConfigResource(vertexHostAppConfigPath);
    AddResourceToPlan(queryDoc, resourcesElem, vertexHostAppConfigPath, resourcesToExclude);

    // Save the object store and ship it, if anything was stored.
    if (!DryadLinqObjectStore.IsEmpty)
    {
        DryadLinqObjectStore.Save();
        AddResourceToPlan(queryDoc, resourcesElem, DryadLinqObjectStore.GetClientSideObjectStorePath(), resourcesToExclude);
    }

    // User-added resources, de-duplicated case-insensitively.
    foreach (string userResource in this.m_context.ResourcesToAdd.Distinct(StringComparer.OrdinalIgnoreCase))
    {
        AddResourceToPlan(queryDoc, resourcesElem, userResource, resourcesToExclude);
    }

    // The query tree, as a sequence of nodes.
    XmlElement queryPlanElem = queryDoc.CreateElement("QueryPlan");
    root.AppendChild(queryPlanElem);
    this.CreateQueryPlan(queryDoc, queryPlanElem);

    // Finally, save the DryadQuery doc to a file.
    queryDoc.Save(this.m_DryadLinqProgram);
    return this.m_DryadLinqProgram;
}

// Creates an element called 'name' with the given inner text and appends it to 'parent'.
private static void AppendTextElement(XmlDocument doc, XmlElement parent, string name, string text)
{
    XmlElement elem = doc.CreateElement(name);
    elem.InnerText = text;
    parent.AppendChild(elem);
}
// Collects the query expressions that ExpressionQuerySet finds inside expr
// (presumably IQueryable values captured by lambdas). Each one gets its node-info
// graph and is registered in m_referencedQueryMap under a DummyQueryNodeInfo root.
private void BuildReferencedQuery(Expression expr)
{
    ExpressionQuerySet collector = new ExpressionQuerySet();
    collector.Visit(expr);
    foreach (Expression referenced in collector.QuerySet)
    {
        QueryNodeInfo graph = this.BuildNodeInfoGraph(referenced);
        this.m_referencedQueryMap[referenced] = new DummyQueryNodeInfo(referenced, true, graph);
    }
}
// Same as BuildReferencedQuery(Expression), but scans the arguments exprs[startIdx..]
// with a single collector before registering every referenced query found.
private void BuildReferencedQuery(int startIdx, ReadOnlyCollection exprs)
{
    ExpressionQuerySet collector = new ExpressionQuerySet();
    int pos = startIdx;
    while (pos < exprs.Count)
    {
        collector.Visit(exprs[pos]);
        pos++;
    }

    foreach (Expression referenced in collector.QuerySet)
    {
        QueryNodeInfo graph = this.BuildNodeInfoGraph(referenced);
        this.m_referencedQueryMap[referenced] = new DummyQueryNodeInfo(referenced, true, graph);
    }
}
// Builds the QueryNodeInfo graph for expression, or returns the memoized one.
// - A static query-operator call (TypeSystem.IsQueryOperatorCall) becomes a
//   QueryNodeInfo whose children are the infos of the operator's source argument(s).
//   For most operators, queries embedded in the remaining arguments are registered as
//   referenced queries via BuildReferencedQuery.
// - Any other expression (e.g. a constant table source) becomes a leaf QueryNodeInfo.
// Results are cached in m_exprNodeInfoMap, so a sub-expression shared by several
// queries maps to a single QueryNodeInfo.
// NOTE(review): the boolean passed to QueryNodeInfo is true for operator calls and
// false for leaves; its exact meaning is defined in QueryNodeInfo — confirm.
private QueryNodeInfo BuildNodeInfoGraph(Expression expression)
{
QueryNodeInfo resNodeInfo = null;
if (this.m_exprNodeInfoMap.TryGetValue(expression, out resNodeInfo))
{
return resNodeInfo;
}
MethodCallExpression mcExpr = expression as MethodCallExpression;
if (mcExpr != null && mcExpr.Method.IsStatic && TypeSystem.IsQueryOperatorCall(mcExpr))
{
switch (mcExpr.Method.Name)
{
// Binary operators: two source arguments; arguments from index 2 on are scanned
// for referenced queries.
case "Join":
case "GroupJoin":
case "Union":
case "Intersect":
case "Except":
case "Zip":
case "SequenceEqual":
case "SequenceEqualAsQuery":
{
QueryNodeInfo child1 = this.BuildNodeInfoGraph(mcExpr.Arguments[0]);
QueryNodeInfo child2 = this.BuildNodeInfoGraph(mcExpr.Arguments[1]);
resNodeInfo = new QueryNodeInfo(mcExpr, true, child1, child2);
this.BuildReferencedQuery(2, mcExpr.Arguments);
break;
}
case "Concat":
{
// Either Concat(a, b) or Concat(a, new[] { b, c, ... }).
NewArrayExpression others = mcExpr.Arguments[1] as NewArrayExpression;
if (others == null)
{
QueryNodeInfo child1 = this.BuildNodeInfoGraph(mcExpr.Arguments[0]);
QueryNodeInfo child2 = this.BuildNodeInfoGraph(mcExpr.Arguments[1]);
resNodeInfo = new QueryNodeInfo(mcExpr, true, child1, child2);
}
else
{
QueryNodeInfo[] infos = new QueryNodeInfo[others.Expressions.Count + 1];
infos[0] = this.BuildNodeInfoGraph(mcExpr.Arguments[0]);
for (int i = 0; i < others.Expressions.Count; ++i)
{
infos[i + 1] = this.BuildNodeInfoGraph(others.Expressions[i]);
}
resNodeInfo = new QueryNodeInfo(mcExpr, true, infos);
}
break;
}
case "Apply":
case "ApplyPerPartition":
{
QueryNodeInfo child1 = this.BuildNodeInfoGraph(mcExpr.Arguments[0]);
if (mcExpr.Arguments.Count == 2)
{
// Apply with only one input source
resNodeInfo = new QueryNodeInfo(mcExpr, true, child1);
this.BuildReferencedQuery(mcExpr.Arguments[1]);
}
else
{
LambdaExpression lambda = DryadLinqExpression.GetLambda(mcExpr.Arguments[2]);
NewArrayExpression others = mcExpr.Arguments[1] as NewArrayExpression;
if (lambda.Parameters.Count == 2 && others == null)
{
// Apply with two input sources
QueryNodeInfo child2 = this.BuildNodeInfoGraph(mcExpr.Arguments[1]);
resNodeInfo = new QueryNodeInfo(mcExpr, true, child1, child2);
}
else
{
// Apply with multiple sources
// NOTE(review): if argument 1 is not a NewArrayExpression here (others == null)
// and the lambda has a parameter count other than 2, others.Expressions throws
// NullReferenceException — confirm callers can never produce that shape.
QueryNodeInfo[] infos = new QueryNodeInfo[others.Expressions.Count + 1];
infos[0] = child1;
for (int i = 0; i < others.Expressions.Count; ++i)
{
infos[i + 1] = this.BuildNodeInfoGraph(others.Expressions[i]);
}
resNodeInfo = new QueryNodeInfo(mcExpr, true, infos);
}
this.BuildReferencedQuery(mcExpr.Arguments[2]);
}
break;
}
case "RangePartition":
{
QueryNodeInfo child1 = this.BuildNodeInfoGraph(mcExpr.Arguments[0]);
if (mcExpr.Arguments.Count == 6)
{
// This is a key part of handling for RangePartition( ... , IQueryable<> keysQuery, ...)
// The keys expression is established as a child[1] nodeInfo for the rangePartition node.
QueryNodeInfo child2 = this.BuildNodeInfoGraph(mcExpr.Arguments[2]);
resNodeInfo = new QueryNodeInfo(mcExpr, true, child1, child2);
}
else
{
resNodeInfo = new QueryNodeInfo(mcExpr, true, child1);
}
this.BuildReferencedQuery(mcExpr.Arguments[1]);
break;
}
case "DoWhile":
{
// Arguments: source, body, condition, body source, condition sources 1 and 2.
QueryNodeInfo child1 = BuildNodeInfoGraph(mcExpr.Arguments[0]);
Expression bodyExpr = mcExpr.Arguments[1];
QueryNodeInfo bodyNodeInfo = this.BuildNodeInfoGraph(bodyExpr);
Expression condExpr = mcExpr.Arguments[2];
QueryNodeInfo condNodeInfo = this.BuildNodeInfoGraph(condExpr);
Expression bodySourceExpr = mcExpr.Arguments[3];
QueryNodeInfo bodySourceNodeInfo = this.BuildNodeInfoGraph(bodySourceExpr);
Expression condSourceExpr1 = mcExpr.Arguments[4];
QueryNodeInfo condSourceNodeInfo1 = this.BuildNodeInfoGraph(condSourceExpr1);
Expression condSourceExpr2 = mcExpr.Arguments[5];
QueryNodeInfo condSourceNodeInfo2 = this.BuildNodeInfoGraph(condSourceExpr2);
resNodeInfo = new DoWhileQueryNodeInfo(mcExpr, bodyNodeInfo,
condNodeInfo,
bodySourceNodeInfo,
condSourceNodeInfo1, condSourceNodeInfo2,
child1);
break;
}
case "Dummy":
{
// Just return a QueryNodeInfo with no child.
resNodeInfo = new QueryNodeInfo(mcExpr, true);
break;
}
default:
{
// Unary operators: argument 0 is the source; the rest are scanned for referenced queries.
QueryNodeInfo child1 = BuildNodeInfoGraph(mcExpr.Arguments[0]);
resNodeInfo = new QueryNodeInfo(mcExpr, true, child1);
this.BuildReferencedQuery(1, mcExpr.Arguments);
break;
}
}
}
if (resNodeInfo == null)
{
resNodeInfo = new QueryNodeInfo(expression, false);
}
this.m_exprNodeInfoMap.Add(expression, resNodeInfo);
return resNodeInfo;
}
/// <summary>
/// Reports whether a merge node is required after <paramref name="node"/>.
/// </summary>
/// <param name="node">The plan node whose output is inspected.</param>
/// <returns>
/// True when the node is flagged <c>IsDynamic</c> or reports more than one partition;
/// false otherwise.
/// </returns>
private static bool IsMergeNodeNeeded(DLinqQueryNode node)
{
    if (node.IsDynamic)
    {
        return true;
    }
    return node.PartitionCount > 1;
}
/// <summary>
/// Translates the expression wrapped by <paramref name="nodeInfo"/> into a query-plan node.
/// </summary>
/// <param name="nodeInfo">Node info whose <c>QueryExpression</c> is translated.</param>
/// <returns>
/// For a static query-operator call, the node produced by <c>VisitQueryOperatorCall</c>.
/// For a constant expression, a new <c>DLinqInputNode</c>, which is also registered in
/// <c>m_inputUriMap</c> under its lower-cased data-source URI if that key is not present yet.
/// </returns>
/// <exception cref="DryadLinqException">
/// Thrown with <c>OperatorNotSupported</c> for a method call that is not a static query
/// operator, and with <c>UnsupportedExpressionsType</c> for any other expression kind.
/// </exception>
internal DLinqQueryNode Visit(QueryNodeInfo nodeInfo)
{
    Expression expression = nodeInfo.QueryExpression;
    if (expression.NodeType == ExpressionType.Call)
    {
        MethodCallExpression mcExpr = (MethodCallExpression)expression;
        if (mcExpr.Method.IsStatic && TypeSystem.IsQueryOperatorCall(mcExpr))
        {
            return this.VisitQueryOperatorCall(nodeInfo);
        }
        throw DryadLinqException.Create(DryadLinqErrorCode.OperatorNotSupported,
                                        String.Format(SR.OperatorNotSupported, mcExpr.Method.Name),
                                        expression);
    }

    if (expression.NodeType == ExpressionType.Constant)
    {
        DLinqInputNode inputNode = new DLinqInputNode(this, (ConstantExpression)expression);
        // Normalize the URI once and use the same key for the lookup and the insert.
        // NOTE(review): ToLower() is culture-sensitive. It is kept so that any other code that
        // keys m_inputUriMap the same way still matches. Consider ToLowerInvariant everywhere.
        string uriKey = inputNode.Table.DataSourceUri.AbsoluteUri.ToLower();
        if (!this.m_inputUriMap.ContainsKey(uriKey))
        {
            this.m_inputUriMap.Add(uriKey, inputNode);
        }
        return inputNode;
    }

    throw DryadLinqException.Create(DryadLinqErrorCode.UnsupportedExpressionsType,
                                    String.Format(SR.UnsupportedExpressionsType, expression.NodeType),
                                    expression);
}
/// <summary>
/// Tests whether <paramref name="expr"/> yields a lambda with an exact parameter count.
/// </summary>
/// <param name="expr">Expression passed to <c>DryadLinqExpression.GetLambda</c>.</param>
/// <param name="n">Required number of lambda parameters.</param>
/// <returns>
/// True if <c>GetLambda</c> returns a non-null lambda with exactly <paramref name="n"/>
/// parameters; otherwise false.
/// </returns>
private static bool IsLambda(Expression expr, int n)
{
    LambdaExpression lambda = DryadLinqExpression.GetLambda(expr);
    if (lambda == null)
    {
        return false;
    }
    return lambda.Parameters.Count == n;
}
/// <summary>
/// Checks whether <paramref name="source"/> itself is a static GroupBy query-operator call
/// that has no result selector.
/// </summary>
/// <param name="source">The expression to inspect.</param>
/// <returns>
/// True for GroupBy calls with 2 arguments, or with 3 or 4 arguments where neither
/// optional argument position (index 2, and index 3 when present) holds a two-parameter
/// lambda. Such a lambda is treated as a result selector. Returns false for every other
/// expression, including GroupBy calls with other argument counts.
/// </returns>
private static bool IsGroupByWithoutResultSelector(Expression source)
{
    if (source.NodeType != ExpressionType.Call)
    {
        return false;
    }

    MethodCallExpression call = (MethodCallExpression)source;
    bool isGroupByOperator = call.Method.IsStatic &&
                             TypeSystem.IsQueryOperatorCall(call) &&
                             call.Method.Name == "GroupBy";
    if (!isGroupByOperator)
    {
        return false;
    }

    switch (call.Arguments.Count)
    {
        case 2:
            // GroupBy(source, keySelector)
            return true;
        case 3:
            // Argument 2 may be an element selector, a result selector, or a comparer.
            return !IsLambda(call.Arguments[2], 2);
        case 4:
            // Either trailing argument may be the result selector.
            return !IsLambda(call.Arguments[2], 2) && !IsLambda(call.Arguments[3], 2);
        default:
            return false;
    }
}
/// <summary>
/// Builds the sub-plan that feeds start-index information to the indexed ("WithStartIndex")
/// operators.
/// </summary>
/// <remarks>
/// Steps, in order:
/// 1. A LongCount aggregate over <paramref name="child"/>.
/// 2. A merge of those counts.
/// 3. An apply of <c>x => DryadLinqEnumerable.Offsets(x, isLong)</c>.
/// 4. A hash partition, keyed by the identity of <c>IndexedValue</c>, into as many partitions
///    as <paramref name="child"/> outputs.
/// 5. A final merge.
/// NOTE(review): the output presumably carries one starting offset per child partition;
/// confirm against DryadLinqEnumerable.Offsets.
/// </remarks>
/// <param name="isLong">Forwarded to <c>Offsets</c> as a constant argument.</param>
/// <param name="queryExpr">Query expression attached to every created node.</param>
/// <param name="child">The node whose partitions are counted.</param>
/// <returns>The final merge node of the offset sub-plan.</returns>
private DLinqQueryNode CreateOffset(bool isLong, Expression queryExpr, DLinqQueryNode child)
{
    DLinqQueryNode partitionCounts =
        new DLinqBasicAggregateNode(null, AggregateOpType.LongCount, true, false, queryExpr, child);

    // Lambda: x => DryadLinqEnumerable.Offsets(x, isLong), with x : IEnumerable<long>
    Type countsType = typeof(IEnumerable<long>);
    ParameterExpression countsParam = Expression.Parameter(countsType, "x");
    MethodInfo offsetsMethod = typeof(DryadLinqEnumerable).GetMethod("Offsets");
    Expression offsetsCall = Expression.Call(offsetsMethod,
                                             countsParam,
                                             Expression.Constant(isLong, typeof(bool)));
    Type funcType = typeof(Func<,>).MakeGenericType(countsType, offsetsCall.Type);
    LambdaExpression offsetsLambda = Expression.Lambda(funcType, offsetsCall, countsParam);

    DLinqQueryNode mergedCounts = new DLinqMergeNode(true, queryExpr, partitionCounts);
    DLinqQueryNode offsets = new DLinqApplyNode(offsetsLambda, queryExpr, mergedCounts);

    LambdaExpression identityKey = IdentityFunction.Instance(typeof(IndexedValue));
    DLinqQueryNode hashPartitioned = new DLinqHashPartitionNode(identityKey, null, null,
                                                                child.OutputPartition.Count,
                                                                false, queryExpr, offsets);
    return new DLinqMergeNode(false, queryExpr, hashPartitioned);
}
/// <summary>
/// Applies a node-building function to <paramref name="sourceNode"/>. When that node is a
/// concat and its source is not forked, the function is pushed below the concat instead.
/// </summary>
/// <remarks>
/// When <paramref name="sourceNode"/> is a <c>DLinqConcatNode</c> and
/// <paramref name="source"/> is not forked, the following happens:
/// 1. The old concat is removed from each input's <c>Parents</c> list.
/// 2. <paramref name="func"/> is applied to each input.
/// 3. A new concat is built over the results.
/// Otherwise <paramref name="func"/> is applied to <paramref name="sourceNode"/> directly.
/// </remarks>
/// <param name="source">Node info of the operator's input; its expression labels a new concat.</param>
/// <param name="sourceNode">The already-visited input plan node.</param>
/// <param name="func">Builds the operator node on top of a given input node.</param>
/// <returns>The operator node, or a new concat of per-input operator nodes.</returns>
private DLinqQueryNode PromoteConcat(QueryNodeInfo source,
                                     DLinqQueryNode sourceNode,
                                     Func<DLinqQueryNode, DLinqQueryNode> func)
{
    if (!(sourceNode is DLinqConcatNode) || source.IsForked)
    {
        return func(sourceNode);
    }

    DLinqQueryNode[] children = sourceNode.Children;
    DLinqQueryNode[] newChildren = new DLinqQueryNode[children.Length];
    for (int i = 0; i < children.Length; i++)
    {
        // Detach the old concat so it is no longer recorded as a parent of this input.
        children[i].Parents.Remove(sourceNode);
        newChildren[i] = func(children[i]);
    }
    return new DLinqConcatNode(source.QueryExpression, newChildren);
}
/// <summary>
/// Builds the query-plan node for a Select or SelectMany call.
/// </summary>
/// <remarks>
/// Non-indexed selector (its delegate type has two generic arguments):
/// - If the source is an unforked GroupBy without a result selector and the selector is
///   decomposable, the selector is folded into the GroupBy as its result selector.
/// - Otherwise the visited child is wrapped in a DLinqSelectNode via PromoteConcat.
/// Indexed selector (any other generic-argument count):
/// - If the child is non-dynamic with a single partition, it is wrapped in a
///   DLinqSelectNode directly.
/// - Otherwise an apply node runs DryadLinqEnumerable.*WithStartIndex over the child
///   together with the offsets produced by CreateOffset.
/// </remarks>
/// <param name="source">Node info of the operator's input.</param>
/// <param name="nodeType">Operator kind; QueryNodeType.SelectMany is handled specially.</param>
/// <param name="selector">The selector lambda.</param>
/// <param name="resultSelector">Optional SelectMany result selector; may be null.</param>
/// <param name="queryExpr">The original call expression, attached to every created node.</param>
/// <returns>The plan node producing this operator's output.</returns>
private DLinqQueryNode VisitSelect(QueryNodeInfo source,
QueryNodeType nodeType,
LambdaExpression selector,
LambdaExpression resultSelector,
MethodCallExpression queryExpr)
{
DLinqQueryNode selectNode;
// Two generic arguments: the non-indexed selector form.
if (selector.Type.GetGenericArguments().Length == 2)
{
// If this select's child is a groupby node, push this select into its child, if
// 1. The groupby node is not tee'd, and
// 2. The groupby node has no result selector, and
// 3. The selector is decomposable
if (!source.IsForked &&
IsGroupByWithoutResultSelector(source.QueryExpression) &&
Decomposition.GetDecompositionInfoList(selector, m_codeGen) != null)
{
// source.QueryExpression is the GroupBy call; Arguments[1] is its key selector.
MethodCallExpression expr = (MethodCallExpression)source.QueryExpression;
LambdaExpression keySelectExpr = DryadLinqExpression.GetLambda(expr.Arguments[1]);
// Figure out elemSelectExpr and comparerExpr
// With 3 arguments, Arguments[2] is an element selector if it is a lambda, else a comparer.
LambdaExpression elemSelectExpr = null;
Expression comparerExpr = null;
if (expr.Arguments.Count == 3)
{
elemSelectExpr = DryadLinqExpression.GetLambda(expr.Arguments[2]);
if (elemSelectExpr == null)
{
comparerExpr = expr.Arguments[2];
}
}
else if (expr.Arguments.Count == 4)
{
elemSelectExpr = DryadLinqExpression.GetLambda(expr.Arguments[2]);
comparerExpr = expr.Arguments[3];
}
// Construct new query expression by building result selector expression
// and pushing it to groupby node.
// The GroupBy's own input (source.Children[0].Child) is visited by VisitGroupBy.
selectNode = VisitGroupBy(source.Children[0].Child, keySelectExpr,
elemSelectExpr, selector, comparerExpr, queryExpr);
if (nodeType == QueryNodeType.SelectMany)
{
// The folded selector yields one collection per group; an identity SelectMany flattens it.
// NOTE(review): when resultSelector != null, its first parameter was typed for the
// GroupBy element, but this node's input is the selector's collection output.
// Confirm this combination is unreachable or type-correct.
Type selectorRetType = selector.Type.GetGenericArguments()[1];
LambdaExpression id = IdentityFunction.Instance(selectorRetType);
selectNode = new DLinqSelectNode(nodeType, id, resultSelector, queryExpr, selectNode);
}
}
else
{
// Ordinary case: select over the visited child, pushed below an unforked concat if present.
DLinqQueryNode child = this.Visit(source);
selectNode = this.PromoteConcat(
source, child,
x => new DLinqSelectNode(nodeType, selector, resultSelector, queryExpr, x));
}
}
else
{
// The "indexed" version
DLinqQueryNode child = this.Visit(source);
// A single non-dynamic partition: the indexed selector is applied by a plain select node.
if (!child.IsDynamic && child.OutputPartition.Count == 1)
{
selectNode = this.PromoteConcat(
source, child,
x => new DLinqSelectNode(nodeType, selector, resultSelector, queryExpr, x));
}
else
{
// The child is consumed twice below (by CreateOffset and by the apply node), so mark it forked.
child.IsForked = true;
// Create (x, y) => Select(x, y, selector)
Type ptype1 = typeof(IEnumerable<>).MakeGenericType(child.OutputTypes[0]);
Type ptype2 = typeof(IEnumerable<>).MakeGenericType(typeof(IndexedValue));
ParameterExpression param1 = Expression.Parameter(ptype1, DryadLinqCodeGen.MakeUniqueName("x"));
ParameterExpression param2 = Expression.Parameter(ptype2, DryadLinqCodeGen.MakeUniqueName("y"));
string methodName = queryExpr.Method.Name;
Type[] selectorTypeArgs = selector.Type.GetGenericArguments();
// The last generic argument of the selector delegate is its return type.
Type typeArg2 = selectorTypeArgs[selectorTypeArgs.Length - 1];
if (nodeType == QueryNodeType.SelectMany)
{
// SelectMany with a result selector targets the "...Result" method variant.
if (resultSelector != null)
{
methodName += "Result";
}
// Use the element type of the selector's generic collection return type.
typeArg2 = typeArg2.GetGenericArguments()[0];
}
// Resolve DryadLinqEnumerable.<methodName>WithStartIndex by name.
string targetMethodName = methodName + "WithStartIndex";
MethodInfo minfo = typeof(DryadLinqEnumerable).GetMethod(targetMethodName);
Expression body;
if (resultSelector == null)
{
minfo = minfo.MakeGenericMethod(child.OutputTypes[0], typeArg2);
body = Expression.Call(minfo, param1, param2, selector);
}
else
{
minfo = minfo.MakeGenericMethod(child.OutputTypes[0], typeArg2, resultSelector.Body.Type);
body = Expression.Call(minfo, param1, param2, selector, resultSelector);
}
Type type = typeof(Func<,,>).MakeGenericType(ptype1, ptype2, body.Type);
LambdaExpression procFunc = Expression.Lambda(type, body, param1, param2);
// Operator names starting with "Long" are passed to CreateOffset as isLong = true.
bool isLong = methodName.StartsWith("Long", StringComparison.Ordinal);
DLinqQueryNode offsetNode = this.CreateOffset(isLong, queryExpr, child);
selectNode = new DLinqApplyNode(procFunc, queryExpr, child, offsetNode);
}
}
return selectNode;
}
/// <summary>
/// Builds the query-plan node for a Where call over <paramref name="source"/>.
/// </summary>
/// <remarks>
/// A non-indexed predicate (delegate with two generic arguments), or any predicate over a
/// non-dynamic single-partition child, becomes a DLinqWhereNode placed via PromoteConcat.
/// Otherwise an apply node runs DryadLinqEnumerable.&lt;Method&gt;WithStartIndex over the
/// child together with the offsets built by CreateOffset.
/// </remarks>
/// <param name="source">Node info of the operator's input.</param>
/// <param name="predicate">The filter lambda.</param>
/// <param name="queryExpr">The original call expression, attached to created nodes.</param>
/// <returns>The plan node producing the filtered output.</returns>
private DLinqQueryNode VisitWhere(QueryNodeInfo source,
                                  LambdaExpression predicate,
                                  MethodCallExpression queryExpr)
{
    DLinqQueryNode child = this.Visit(source);

    bool filterInPlace = predicate.Type.GetGenericArguments().Length == 2 ||
                         (!child.IsDynamic && child.OutputPartition.Count == 1);
    if (filterInPlace)
    {
        return this.PromoteConcat(source, child,
                                  node => new DLinqWhereNode(predicate, queryExpr, node));
    }

    // Indexed predicate across partitions:
    // (x, y) => DryadLinqEnumerable.<Method>WithStartIndex(x, y, predicate)
    Type elementType = child.OutputTypes[0];
    Type dataType = typeof(IEnumerable<>).MakeGenericType(elementType);
    Type offsetType = typeof(IEnumerable<>).MakeGenericType(typeof(IndexedValue));
    ParameterExpression dataParam = Expression.Parameter(dataType, DryadLinqCodeGen.MakeUniqueName("x"));
    ParameterExpression offsetParam = Expression.Parameter(offsetType, DryadLinqCodeGen.MakeUniqueName("y"));

    MethodInfo indexedWhere = typeof(DryadLinqEnumerable)
        .GetMethod(queryExpr.Method.Name + "WithStartIndex")
        .MakeGenericMethod(elementType);
    Expression call = Expression.Call(indexedWhere, dataParam, offsetParam, predicate);
    Type lambdaType = typeof(Func<,,>).MakeGenericType(dataType, offsetType, call.Type);
    LambdaExpression procFunc = Expression.Lambda(lambdaType, call, dataParam, offsetParam);

    // The child feeds both the offset computation and the apply node.
    child.IsForked = true;
    bool isLong = queryExpr.Method.Name == "LongWhere";
    DLinqQueryNode offsetNode = this.CreateOffset(isLong, queryExpr, child);
    return new DLinqApplyNode(procFunc, queryExpr, child, offsetNode);
}
private DLinqQueryNode VisitJoin(QueryNodeInfo outerSource,
QueryNodeInfo innerSource,
QueryNodeType nodeType,
LambdaExpression outerKeySelector,
LambdaExpression innerKeySelector,
LambdaExpression resultSelector,
Expression comparerExpr,
Expression queryExpr)
{
DLinqQueryNode outerChild = this.Visit(outerSource);
DLinqQueryNode innerChild = this.Visit(innerSource);
DLinqQueryNode joinNode = null;
Type keyType = outerKeySelector.Type.GetGenericArguments()[1];
if (comparerExpr == null && !TypeSystem.HasDefaultEqualityComparer(keyType))
{
throw DryadLinqException.Create(DryadLinqErrorCode.ComparerMustBeSpecifiedOrKeyTypeMustBeIEquatable,
string.Format(SR.ComparerMustBeSpecifiedOrKeyTypeMustBeIEquatable, keyType),
queryExpr);
}
// The comparer object:
object comparer = null;
if (comparerExpr != null)
{
ExpressionSimplifier