// Dryad/LinqToDryad/DryadLinqQuery.cs (C#; original listing: 693 lines, 26 KiB)

/*
Copyright (c) Microsoft Corporation
All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
compliance with the License. You may obtain a copy of the License
at http://www.apache.org/licenses/LICENSE-2.0
THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER
EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR CONDITIONS OF
TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR NON-INFRINGEMENT.
See the Apache Version 2.0 License for specific language governing permissions and
limitations under the License.
*/
using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Linq;
using System.Diagnostics;
using System.Linq.Expressions;
using System.Reflection;
using System.Globalization;
using Microsoft.Research.DryadLinq.Internal;
namespace Microsoft.Research.DryadLinq
{
// Common base class of the DryadLinq query providers.
// The IQueryable extension methods fetch queryable.Provider and then call
// provider.CreateQuery(expr), so every IQueryProvider member is left to the concrete provider.
internal abstract class DryadLinqProviderBase : IQueryProvider
{
    // context: the DryadLinqContext this provider belongs to. It is stored as given;
    // no null check is performed here.
    internal DryadLinqProviderBase(DryadLinqContext context)
    {
        this.Context = context;
    }

    // The context that was handed to the constructor.
    internal DryadLinqContext Context { get; private set; }

    // Query construction (untyped / typed).
    public abstract IQueryable CreateQuery(Expression expression);
    public abstract IQueryable<TElement> CreateQuery<TElement>(Expression expression);

    // Query execution (untyped / typed).
    public abstract object Execute(Expression expression);
    public abstract TResult Execute<TResult>(Expression expression);
}
// Provider for DryadLinq queries that are executed by the LocalDebug infrastructure.
// Typed calls are forwarded to the wrapped LINQ-to-objects provider; the untyped
// IQueryProvider members are not supported and always throw.
internal sealed class DryadLinqLocalProvider : DryadLinqProviderBase
{
    private readonly IQueryProvider m_linqToObjectsProvider;

    // linqToObjectsProvider: the provider that actually builds and runs the queries.
    // context: passed on to DryadLinqProviderBase.
    public DryadLinqLocalProvider(IQueryProvider linqToObjectsProvider, DryadLinqContext context)
        : base(context)
    {
        this.m_linqToObjectsProvider = linqToObjectsProvider;
    }

    // Lets the wrapped provider build the query, then wraps the result in a
    // DryadLinqLocalQuery<TElement> whose Provider is this object.
    public override IQueryable<TElement> CreateQuery<TElement>(Expression expression)
    {
        IQueryable<TElement> inner = this.m_linqToObjectsProvider.CreateQuery<TElement>(expression);
        return new DryadLinqLocalQuery<TElement>(this, inner);
    }

    // Executes the expression directly on the wrapped provider.
    public override TResult Execute<TResult>(Expression expression)
    {
        return this.m_linqToObjectsProvider.Execute<TResult>(expression);
    }

    // Untyped call: never returns normally.
    // Throws DryadLinqException with UntypedProviderMethodsNotSupported when the expression
    // is a method call, and with ExpressionMustBeMethodCall otherwise.
    public override IQueryable CreateQuery(Expression expression)
    {
        MethodCallExpression call = expression as MethodCallExpression;
        if (call != null)
        {
            string operatorName = call.Method.Name;
            throw new DryadLinqException(DryadLinqErrorCode.UntypedProviderMethodsNotSupported,
                                         String.Format(SR.UntypedProviderMethodsNotSupported, operatorName));
        }

        throw new DryadLinqException(DryadLinqErrorCode.ExpressionMustBeMethodCall,
                                     SR.ExpressionMustBeMethodCall);
    }

    // Untyped call: delegates to the untyped CreateQuery, which always throws.
    public override object Execute(Expression expression)
    {
        return this.CreateQuery(expression);
    }
}
// The IQueryable<T> handed out for LocalDebug queries.
// It is far simpler than DryadLinqQuery<T>: expression and enumeration come straight from
// the wrapped query, and only the Provider is replaced by the one given to the constructor.
internal sealed class DryadLinqLocalQuery<T> : IOrderedQueryable<T>, IEnumerable<T>, IOrderedQueryable
{
    private readonly IQueryable<T> m_localQuery;
    private readonly IQueryProvider m_queryProvider;

    // queryProvider: returned from IQueryable.Provider.
    // localQuery: the query that supplies Expression and the elements.
    public DryadLinqLocalQuery(IQueryProvider queryProvider, IQueryable<T> localQuery)
    {
        this.m_localQuery = localQuery;
        this.m_queryProvider = queryProvider;
    }

    // The expression tree of the wrapped query.
    public Expression Expression
    {
        get
        {
            Expression wrappedExpression = this.m_localQuery.Expression;
            return wrappedExpression;
        }
    }

    IQueryProvider IQueryable.Provider
    {
        get { return this.m_queryProvider; }
    }

    Type IQueryable.ElementType
    {
        get { return typeof(T); }
    }

    // Enumerates the wrapped query.
    public IEnumerator<T> GetEnumerator()
    {
        IEnumerator<T> cursor = this.m_localQuery.GetEnumerator();
        return cursor;
    }

    // Non-generic enumeration goes through the generic enumerator.
    IEnumerator IEnumerable.GetEnumerator()
    {
        return this.GetEnumerator();
    }
}
// Provider for DryadLinq queries that are executed by the cluster infrastructure.
internal class DryadLinqProvider : DryadLinqProviderBase
{
    internal DryadLinqProvider(DryadLinqContext context)
        : base(context)
    {
    }

    // Typed query creation: wraps the expression in a DryadLinqQuery<TElement> bound to this provider.
    public override IQueryable<TElement> CreateQuery<TElement>(Expression expression)
    {
        return new DryadLinqQuery<TElement>(this, expression);
    }

    // Untyped query creation is not supported; this method never returns normally.
    // Throws DryadLinqException with UntypedProviderMethodsNotSupported when the expression
    // is a method call, and with ExpressionMustBeMethodCall otherwise.
    public override IQueryable CreateQuery(Expression expression)
    {
        MethodCallExpression call = expression as MethodCallExpression;
        if (call != null)
        {
            string operatorName = call.Method.Name;
            throw new DryadLinqException(DryadLinqErrorCode.UntypedProviderMethodsNotSupported,
                                         String.Format(SR.UntypedProviderMethodsNotSupported, operatorName));
        }

        throw new DryadLinqException(DryadLinqErrorCode.ExpressionMustBeMethodCall,
                                     SR.ExpressionMustBeMethodCall);
    }

    // The untyped IQueryProvider.Execute(): forwards to the untyped CreateQuery, which throws.
    public override object Execute(Expression expression)
    {
        return this.CreateQuery(expression);
    }

    // The IQueryProvider.Execute() used when a query produces a single value rather than
    // an enumerable.
    //
    // The expression must be a method call; otherwise an ArgumentException is thrown.
    // For FirstOrDefault / SingleOrDefault / LastOrDefault the query is created over
    // AggregateValue<TResult>, so that a result with Count == 0 can be reported as
    // default(TResult). For every other operator the query is created over the expression's
    // own type and its single element is returned.
    public override TResult Execute<TResult>(Expression expression)
    {
        MethodCallExpression call = expression as MethodCallExpression;
        if (call == null)
        {
            string message = String.Format(SR.ExpressionMustBeMethodCall,
                                           DryadLinqExpression.Summarize(expression));
            throw new ArgumentException(message, "expression");
        }

        switch (call.Method.Name)
        {
            case "FirstOrDefault":
            case "SingleOrDefault":
            case "LastOrDefault":
            {
                Type wrappedType = typeof(AggregateValue<>).MakeGenericType(expression.Type);
                IEnumerable<AggregateValue<TResult>> wrappedResults =
                    (IEnumerable<AggregateValue<TResult>>)this.InstantiateQuery(wrappedType, expression);
                AggregateValue<TResult> aggregate = wrappedResults.Single();
                return (aggregate.Count == 0) ? default(TResult) : aggregate.Value;
            }
            default:
            {
                IEnumerable<TResult> results =
                    (IEnumerable<TResult>)this.InstantiateQuery(expression.Type, expression);
                return results.Single();
            }
        }
    }

    // Creates a DryadLinqQuery<elementType> for the expression through its non-public
    // (provider, expression) constructor. Reflection is needed because elementType is only
    // known at run time.
    private object InstantiateQuery(Type elementType, Expression expression)
    {
        Type queryType = typeof(DryadLinqQuery<>).MakeGenericType(elementType);
        object[] ctorArgs = new object[] { this, expression };
        return Activator.CreateInstance(queryType,
                                        BindingFlags.NonPublic | BindingFlags.Instance,
                                        null,
                                        ctorArgs,
                                        CultureInfo.CurrentCulture);
    }
}
// Non-generic base of DryadLinqQuery<T>. It holds the state that does not depend on the
// element type (query provider, data provider, temp flag, job executor) and declares the
// members the generic class has to supply.
internal abstract class DryadLinqQuery : IQueryable
{
    // Protected so that the generic subclass can assign it after the base constructor ran.
    protected DryadLinqProviderBase m_queryProvider;
    private DataProvider m_dataProvider;
    private bool m_isTemp;

    // queryProvider / dataProvider: stored as given; either may be null at this point.
    internal DryadLinqQuery(DryadLinqProviderBase queryProvider,
                            DataProvider dataProvider)
    {
        this.m_dataProvider = dataProvider;
        this.m_queryProvider = queryProvider;
        this.m_isTemp = false;
        this.QueryExecutor = null;
    }

    // ---- members supplied by DryadLinqQuery<T> ----

    // If non-null, a data-backed query that should be used in place of this one.
    // Query execution sets it on the query nodes that were executed.
    internal abstract DryadLinqQuery BackingData { get; set; }
    public abstract Type ElementType { get; }
    public abstract Expression Expression { get; }
    internal abstract bool IsPlainData { get; }
    internal abstract Uri DataSourceUri { get; }
    internal abstract LambdaExpression Deserializer { get; }
    internal abstract bool IsDynamic { get; }
    internal abstract int PartitionCount { get; }
    internal abstract DataSetInfo DataSetInfo { get; }
    protected abstract IEnumerator IEnumGetEnumerator();

    // ---- state shared by all element types ----

    // True once BackingData has been set.
    internal bool IsDataBacked
    {
        get { return this.BackingData != null; }
    }

    // The setter casts to DryadLinqProviderBase, so assigning any other provider type
    // fails with an InvalidCastException.
    public IQueryProvider Provider
    {
        get { return this.m_queryProvider; }
        set { this.m_queryProvider = (DryadLinqProviderBase)value; }
    }

    // The context of the current query provider.
    public DryadLinqContext Context
    {
        get { return this.m_queryProvider.Context; }
    }

    internal DataProvider DataProvider
    {
        get { return this.m_dataProvider; }
    }

    // Write-only flag; within this class it is only copied by CloneBase.
    internal bool IsTemp
    {
        set { this.m_isTemp = value; }
    }

    // The executor of the job attached to this query, if any.
    internal DryadLinqJobExecutor QueryExecutor { get; set; }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return this.IEnumGetEnumerator();
    }

    // Copies the base-class state into otherQuery. The two providers are only filled in
    // when otherQuery does not have one yet; the temp flag and the executor are always
    // overwritten.
    protected void CloneBase(DryadLinqQuery otherQuery)
    {
        otherQuery.m_queryProvider = otherQuery.m_queryProvider ?? this.m_queryProvider;
        otherQuery.m_dataProvider = otherQuery.m_dataProvider ?? this.m_dataProvider;
        otherQuery.m_isTemp = this.m_isTemp;
        otherQuery.QueryExecutor = this.QueryExecutor;
    }

    // Creates the vertex code generator for this query's context. Virtual, so subclasses
    // may return a different generator.
    internal virtual VertexCodeGen GetVertexCodeGen()
    {
        DryadLinqContext context = this.m_queryProvider.Context;
        return new VertexCodeGen(context);
    }
}
// The IQueryable<T> that is used for cluster-execution queries.
// An instance is in one of three states:
//   * plain data  - it points directly at a data set (m_dataSourceUri != null);
//   * data-backed - it is a query that has been executed, and m_backingData is the
//                   plain-data query over its results;
//   * pending     - it only carries a query expression that has not been executed yet.
internal class DryadLinqQuery<T>
    : DryadLinqQuery, IOrderedQueryable<T>, IEnumerable<T>, IOrderedQueryable
{
    // If set, this is a normal query node that was executed and now has a
    // "PlainData" query available with the results.
    private DryadLinqQuery<T> m_backingData;
    private Expression m_queryExpression;
    private Uri m_dataSourceUri;
    private Expression<Func<Stream, IEnumerable<T>>> m_deserializer;
    private DataSetInfo m_dataSetInfo;
    private DryadLinqQueryEnumerable<T> m_tableEnumerable;
    private bool m_isDynamic;
    private bool m_initialized;

    // Used by IQueryProvider, e.g. IQueryable<>.Select() and IQueryable<>.ToStore().
    // Creates a pending query: only the provider and the expression are set.
    internal DryadLinqQuery(DryadLinqProviderBase provider, Expression expression)
        : base(provider, null)
    {
        this.m_queryExpression = expression;
        this.m_dataSourceUri = null;
        this.m_deserializer = null;
        this.m_dataSetInfo = null;
        this.m_tableEnumerable = null;
        this.m_isDynamic = false;
        this.m_initialized = false;
    }

    // Used by DryadLinqContext.FromStore(uri). Creates a plain-data query over dataSetUri.
    //
    // context:      supplies LocalDebug and is handed to the provider that gets created.
    // dataProvider: used to read the data set.
    // dataSetUri:   location of the data set; must pass DataPath.IsValidDataPath.
    // deserializer: optional custom deserializer; null selects the built-in record reader.
    //
    // Throws DryadLinqException (UnrecognizedDataSource) when dataSetUri is not a valid
    // data path.
    internal DryadLinqQuery(DryadLinqContext context,
                            DataProvider dataProvider,
                            Uri dataSetUri,
                            Expression<Func<Stream, IEnumerable<T>>> deserializer)
        : base(null, dataProvider)
    {
        if (!DataPath.IsValidDataPath(dataSetUri))
        {
            // Uri.AbsoluteUri throws InvalidOperationException for a relative Uri, which
            // would hide the real error, so the Uri is rendered defensively here.
            throw new DryadLinqException(DryadLinqErrorCode.UnrecognizedDataSource,
                                         String.Format(SR.UnrecognizedDataSource, DescribeUri(dataSetUri)));
        }

        this.m_dataSourceUri = dataSetUri;
        this.m_deserializer = deserializer;
        this.m_dataSetInfo = null;
        this.m_isDynamic = false;
        this.m_initialized = false;
        this.m_tableEnumerable
            = new DryadLinqQueryEnumerable<T>(context, this.DataProvider, this.m_dataSourceUri, this.m_deserializer);

        // Set the query expression and the provider consistently with each other.
        if (context.LocalDebug)
        {
            // One queryable wrapper serves both as the constant in the expression and as
            // the source of the LINQ-to-objects provider.
            IQueryable<T> localQueryable = this.m_tableEnumerable.AsQueryable();
            this.m_queryExpression = Expression.Constant(localQueryable);
            this.m_queryProvider = new DryadLinqLocalProvider(localQueryable.Provider, context);
        }
        else
        {
            this.m_queryExpression = Expression.Constant(this);
            this.m_queryProvider = new DryadLinqProvider(context);
        }
    }

    // Text used for a Uri in error messages. Unlike Uri.AbsoluteUri this never throws:
    // null and relative Uris are rendered as well.
    private static string DescribeUri(Uri uri)
    {
        if (uri == null)
        {
            return "(null)";
        }
        return uri.IsAbsoluteUri ? uri.AbsoluteUri : uri.OriginalString;
    }

    // Copies the complete state of this query into otherQuery (base-class state through
    // CloneBase, then every field of this class).
    internal void Clone(DryadLinqQuery<T> otherQuery)
    {
        this.CloneBase(otherQuery);
        otherQuery.m_backingData = this.m_backingData;
        otherQuery.m_queryExpression = this.m_queryExpression;
        otherQuery.m_dataSourceUri = this.m_dataSourceUri;
        otherQuery.m_deserializer = this.m_deserializer;
        otherQuery.m_dataSetInfo = this.m_dataSetInfo;
        otherQuery.m_tableEnumerable = this.m_tableEnumerable;
        otherQuery.m_isDynamic = this.m_isDynamic;
        otherQuery.m_initialized = this.m_initialized;
    }

    // True for queries that point directly at plain data.
    // Note: a plain-data query may also have an executor associated with it; the data
    // will not be available unless the executor completes successfully.
    internal override bool IsPlainData
    {
        get { return (this.m_dataSourceUri != null); }
    }

    // The setter casts to DryadLinqQuery<T>; a value of another element type fails the cast.
    internal override DryadLinqQuery BackingData
    {
        get { return this.m_backingData; }
        set { this.m_backingData = (DryadLinqQuery<T>)value; }
    }

    public override Type ElementType
    {
        get { return typeof(T); }
    }

    internal override LambdaExpression Deserializer
    {
        get { return this.m_deserializer; }
    }

    // Only valid for plain-data and data-backed queries; a pending query throws
    // DryadLinqException (OnlyAvailableForPhysicalData).
    internal override Uri DataSourceUri
    {
        get
        {
            if (this.IsPlainData)
            {
                this.CheckAndInitialize();
                return this.m_dataSourceUri;
            }
            if (this.IsDataBacked)
            {
                // The backing query is asked through its own property, which performs
                // whatever check/initialization applies to it.
                return this.m_backingData.DataSourceUri;
            }
            throw new DryadLinqException(DryadLinqErrorCode.OnlyAvailableForPhysicalData,
                                         SR.OnlyAvailableForPhysicalData);
        }
    }

    // Plain data: the expression created to represent the plain data.
    // Data-backed query: behaves as if the IQueryable were just the backing data.
    // Pending query: its own query expression.
    public override Expression Expression
    {
        get
        {
            if (!this.IsPlainData && this.IsDataBacked)
            {
                if (this.m_backingData.QueryExecutor != null)
                {
                    this.CheckAndInitialize();
                }
                return this.m_backingData.Expression;
            }

            // Plain data and pending queries both answer with their own expression.
            this.CheckAndInitialize();
            return this.m_queryExpression;
        }
    }

    // Only valid for plain-data and data-backed queries; a pending query throws
    // DryadLinqException (OnlyAvailableForPhysicalData).
    internal override int PartitionCount
    {
        get
        {
            if (this.IsPlainData)
            {
                this.CheckAndInitialize();
                return this.m_dataSetInfo.partitionInfo.Count;
            }
            if (this.IsDataBacked)
            {
                return this.m_backingData.PartitionCount;
            }
            throw new DryadLinqException(DryadLinqErrorCode.OnlyAvailableForPhysicalData,
                                         SR.OnlyAvailableForPhysicalData);
        }
    }

    internal override bool IsDynamic
    {
        get
        {
            this.CheckAndInitialize();
            return this.m_isDynamic;
        }
    }

    // Data-backed queries answer with the backing query's info. Everything else returns
    // its own m_dataSetInfo, which Initialize() only fills in for plain data.
    internal override DataSetInfo DataSetInfo
    {
        get
        {
            if (!this.IsPlainData && this.IsDataBacked)
            {
                return this.m_backingData.DataSetInfo;
            }
            this.CheckAndInitialize();
            return this.m_dataSetInfo;
        }
    }

    // If a job executor is attached, waits for it through WaitForCompletion() and turns a
    // failed or cancelled job into a DryadLinqException (JobToCreateTableFailed /
    // JobToCreateTableWasCanceled); a successful job is logged. Afterwards Initialize() runs.
    internal void CheckAndInitialize()
    {
        DryadLinqJobExecutor executor = this.QueryExecutor;
        if (executor != null)
        {
            JobStatus status = executor.WaitForCompletion();
            if (status == JobStatus.Failure)
            {
                throw new DryadLinqException(DryadLinqErrorCode.JobToCreateTableFailed,
                                             String.Format(SR.JobToCreateTableFailed,
                                                           this.QueryExecutor.ErrorMsg));
            }
            if (status == JobStatus.Cancelled)
            {
                throw new DryadLinqException(DryadLinqErrorCode.JobToCreateTableWasCanceled,
                                             SR.JobToCreateTableWasCanceled);
            }
            if (status == JobStatus.Success)
            {
                DryadLinqClientLog.Add("Dataset " + this.m_dataSourceUri + " was created successfully.");
            }
        }
        this.Initialize();
    }

    // One-time initialization of a plain-data query: reads the stream info and the stored
    // metadata and builds m_dataSetInfo. Does nothing for other queries or once initialized.
    internal void Initialize()
    {
        if (!this.IsPlainData || this.m_initialized)
        {
            return;
        }

        DryadLinqStreamInfo streamInfo = this.DataProvider.GetStreamInfo(this.Context, this.m_dataSourceUri);
        Int32 parCount = streamInfo.PartitionCount;
        // NOTE(review): the data size is read but not used below; kept in case the read matters.
        Int64 estSize = streamInfo.DataSize;

        // The stored metadata is still loaded, but both checks that used to inspect it are
        // currently disabled:
        //   * record type vs. typeof(T)  - disabled until the final API is determined;
        //   * serialization flags        - disabled as the flags are fixed; reconsider if
        //                                  they become user-settable.
        DryadLinqMetaData meta = DryadLinqMetaData.Get(this.Context, this.m_dataSourceUri);
        if (meta != null)
        {
            // Intentionally empty, see above.
        }

        // Initialize the DataInfo -- currently always the "nothing" data info.
        PartitionInfo pinfo = new RandomPartition(parCount);
        OrderByInfo oinfo = DataSetInfo.NoOrderBy;
        DistinctInfo dinfo = DataSetInfo.NoDistinct;
        this.m_dataSetInfo = new DataSetInfo(pinfo, oinfo, dinfo);
        this.m_initialized = true;
    }

    protected override IEnumerator IEnumGetEnumerator()
    {
        return this.GetEnumerator();
    }

    // Uses the table if present, else starts a query that generates an anonymous output table.
    //   1. plain data: enumerate the data;
    //   2. data-backed query: enumerate the backing data;
    //   3. otherwise: run the query into a temporary data set (which makes this query
    //      data-backed) and enumerate that.
    public IEnumerator<T> GetEnumerator()
    {
        if (this.IsPlainData)
        {
            this.CheckAndInitialize();
            return this.m_tableEnumerable.GetEnumerator();
        }

        if (!this.IsDataBacked)
        {
            this.ToTemporary();
        }
        return this.m_backingData.GetEnumerator();
    }

    // Executes this query, stores the result in a temporary location and records the
    // resulting query as BackingData.
    private void ToTemporary()
    {
        DryadLinqQuery[] tables = this.MakeTemporaryQueryGen().Execute();
        tables[0].IsTemp = true;
        this.BackingData = tables[0];
    }

    // Generates the query plan as an XML file and returns the query plan's path.
    internal string ToDryadLinqProgram()
    {
        return this.MakeTemporaryQueryGen().GenerateDryadProgram();
    }

    // Creates the query generator for this query's expression, targeting a fresh
    // temporary stream Uri obtained from the context.
    private DryadLinqQueryGen MakeTemporaryQueryGen()
    {
        Uri tableUri = this.Context.MakeTemporaryStreamUri();
        return new DryadLinqQueryGen(
            this.Context, this.GetVertexCodeGen(), this.m_queryExpression, tableUri, true);
    }
}
// Enumerable view over one stored data set. Every GetEnumerator() call opens a new stream
// through DataProvider.Egress and reads the records either with the custom deserializer
// (if one was supplied) or with the record reader produced by DryadLinqFactory<T>.
internal class DryadLinqQueryEnumerable<T> : IEnumerable<T>, IEnumerable
{
    private DryadLinqContext m_context;
    private DataProvider m_dataProvider;
    private Uri m_dataSetUri;
    private Expression<Func<Stream, IEnumerable<T>>> m_deserializer;

    // Compiled form of m_deserializer. Built on first use and reused afterwards, because
    // compiling an expression tree on every enumeration is needlessly expensive.
    private Func<Stream, IEnumerable<T>> m_compiledDeserializer;

    // context / dataProvider: used to open the data set.
    // dataSetUri:   location of the data set.
    // deserializer: optional custom deserializer; null selects the built-in record reader.
    public DryadLinqQueryEnumerable(DryadLinqContext context,
                                    DataProvider dataProvider,
                                    Uri dataSetUri,
                                    Expression<Func<Stream, IEnumerable<T>>> deserializer)
    {
        this.m_context = context;
        this.m_dataProvider = dataProvider;
        this.m_dataSetUri = dataSetUri;
        this.m_deserializer = deserializer;
        this.m_compiledDeserializer = null;
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return this.GetEnumerator();
    }

    // Opens the data set and returns an enumerator over its records.
    public IEnumerator<T> GetEnumerator()
    {
        if (this.m_deserializer == null)
        {
            return new TableEnumerator(this.m_context, this.m_dataProvider, this.m_dataSetUri);
        }

        // Compile before opening the stream, so a compile failure opens nothing.
        Func<Stream, IEnumerable<T>> deserializerFunc = this.m_compiledDeserializer;
        if (deserializerFunc == null)
        {
            deserializerFunc = this.m_deserializer.Compile();
            this.m_compiledDeserializer = deserializerFunc;
        }

        Stream stream = this.m_dataProvider.Egress(this.m_context, this.m_dataSetUri);
        try
        {
            IEnumerable<T> elems = deserializerFunc(stream);
            // NOTE(review): from here on the stream's lifetime is left to the deserializer's
            // enumerator -- confirm that it disposes the stream when enumeration ends.
            return elems.GetEnumerator();
        }
        catch
        {
            // No enumerator was handed out, so nobody else can release the stream.
            if (stream != null)
            {
                stream.Dispose();
            }
            throw;
        }
    }

    // Enumerator that reads records through a DryadLinqRecordReader<T>.
    private class TableEnumerator : IEnumerator<T>
    {
        private T m_current;
        private DryadLinqFactory<T> m_factory;
        private DryadLinqRecordReader<T> m_reader;
        // Set by the first Dispose() so that the reader is closed only once.
        private bool m_closed;

        // Opens the data set, creates the reader on top of it and starts the reader's worker.
        internal TableEnumerator(DryadLinqContext context, DataProvider dataProvider, Uri dataSetUri)
        {
            this.m_current = default(T);
            this.m_closed = false;
            this.m_factory = (DryadLinqFactory<T>)DryadLinqCodeGen.GetFactory(context, typeof(T));
            Stream stream = dataProvider.Egress(context, dataSetUri);
            try
            {
                DryadLinqBlockStream nativeStream = new DryadLinqBlockStream(stream);
                this.m_reader = this.m_factory.MakeReader(nativeStream);
                this.m_reader.StartWorker();
            }
            catch
            {
                // The caller never receives this enumerator when construction fails, so
                // Dispose() cannot run; release the stream here instead of leaking it.
                if (stream != null)
                {
                    stream.Dispose();
                }
                throw;
            }
        }

        // Reads the next record into Current; the return value is the reader's.
        public bool MoveNext()
        {
            return this.m_reader.ReadRecordAsync(ref this.m_current);
        }

        object IEnumerator.Current
        {
            get { return this.m_current; }
        }

        public T Current
        {
            get { return this.m_current; }
        }

        // Not supported: always throws DryadLinqException.
        public void Reset()
        {
            throw new DryadLinqException("The stream doesn't support Reset");
        }

        // Closes the reader. Safe to call more than once; only the first call closes it.
        void IDisposable.Dispose()
        {
            if (this.m_reader != null && !this.m_closed)
            {
                this.m_closed = true;
                this.m_reader.Close();
            }
        }
    }
}
}