/* Copyright (c) Microsoft Corporation All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR NON-INFRINGEMENT. See the Apache Version 2.0 License for specific language governing permissions and limitations under the License. */ using System; using System.Collections; using System.Collections.Generic; using System.IO; using System.Globalization; using System.Reflection; using System.Linq.Expressions; using System.Linq; using System.Diagnostics; using Microsoft.Research.DryadLinq.Internal; namespace Microsoft.Research.DryadLinq { /// /// This class extends LINQ with a set of new operators that are specific to DryadLINQ. /// The new operators includes partitioning operators (HashPartition and RangePartition) /// and the Apply operator that enables stateful transformations on datasets. /// public static class DryadLinqQueryable { internal static bool IsLocalDebugSource(IQueryable source) { return !(source.Provider is DryadLinqProvider); } /// /// Filters a sequence of values based on a predicate. Each element's index is used in /// the logic of the predicate function. /// /// The type of the elements of source /// The sequence of elements to filter /// The filter predicate. /// The elements in the input that satisfy the predicate public static IQueryable LongWhere(this IQueryable source, Expression> predicate) { if (source == null) { throw new ArgumentNullException("source"); } if (predicate == null) { throw new ArgumentNullException("predicate"); } if (IsLocalDebugSource(source)) { var result = DryadLinqEnumerable.LongWhere(source, predicate.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, result); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(predicate) } )); } /// /// Transforms each element of a sequence into a new form by applying a function of /// the element and its index. /// /// The type of the elements of source /// The type of the elements of the result /// The sequence of input elements /// A transform function to apply to each source element; /// the second parameter of the function represents the index of the source element. /// The sequence resulting from applying the transformation function on each input element public static IQueryable LongSelect(this IQueryable source, Expression> selector) { if (source == null) { throw new ArgumentNullException("source"); } if (selector == null) { throw new ArgumentNullException("selector"); } if (IsLocalDebugSource(source)) { var result = DryadLinqEnumerable.LongSelect(source, selector.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, result); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)), new Expression[] { source.Expression, Expression.Quote(selector) } )); } /// /// Transforms each element of a sequence into an IEnumerable{T} by applying a function to the /// element and its index, and then flattens the resulting sequences into one sequence. /// /// The type of the elements of source /// The type of the elements of the result /// The sequence of input elements /// A transform function to apply to each source element; /// the second parameter of the function represents the index of the source element. /// The sequence resulting from applying the function on each input element and /// flattening the results public static IQueryable LongSelectMany(this IQueryable source, Expression>> selector) { if (source == null) { throw new ArgumentNullException("source"); } if (selector == null) { throw new ArgumentNullException("selector"); } if (IsLocalDebugSource(source)) { var result = DryadLinqEnumerable.LongSelectMany(source, selector.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, result); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)), new Expression[] { source.Expression, Expression.Quote(selector) } )); } /// /// Transforms each element of a sequence into an IEnumerable{T} by applying a function to the /// element and its index, and then flattens the resulting sequences into one sequence. /// /// The type of the elements of source /// The type of the element in the intermediate IEnumerable sequences /// The type of the elements of the result /// The sequence of input elements /// A transform function to apply to each source element; /// the second parameter of the function represents the index of the source element. /// A transformation function to apply to each intermediate element /// The sequence resulting from applying selector to each input element and /// flattening and transforming the elements in the intermediate sequences public static IQueryable LongSelectMany(this IQueryable source, Expression>> selector, Expression> resultSelector) { if (source == null) { throw new ArgumentNullException("source"); } if (selector == null) { throw new ArgumentNullException("selector"); } if (resultSelector == null) { throw new ArgumentNullException("resultSelector"); } if (IsLocalDebugSource(source)) { var result = DryadLinqEnumerable.LongSelectMany(source, selector.Compile(), resultSelector.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, result); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TCollection), typeof(TResult)), new Expression[] { source.Expression, Expression.Quote(selector), Expression.Quote(resultSelector) } )); } /// /// Returns the largest prefix of a sequence such that the elements satisfy a specified predicate. /// /// The element type of the input sequence /// The input sequence /// A predicate to test each element for a condition /// The largest prefix satisfying the predicate public static IQueryable LongTakeWhile(this IQueryable source, Expression> predicate) { if (source == null) { throw new ArgumentNullException("source"); } if (predicate == null) { throw new ArgumentNullException("predicate"); } if (IsLocalDebugSource(source)) { var result = DryadLinqEnumerable.LongTakeWhile(source, predicate.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, result); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(predicate) } )); } /// /// Skips elements in a sequence as long as a specified condition is true and then returns the /// remaining elements. The predicate is a function of an element and its index. /// /// The element type of the input sequence /// The input sequence /// A predicate to test each element for a condition /// The remaining sequence by skipping the elements in the head that satisfy the predicate public static IQueryable LongSkipWhile(this IQueryable source, Expression> predicate) { if (source == null) { throw new ArgumentNullException("source"); } if (predicate == null) { throw new ArgumentNullException("predicate"); } if (IsLocalDebugSource(source)) { var result = DryadLinqEnumerable.LongSkipWhile(source, predicate.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, result); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(predicate) } )); } /// /// Hash partition a dataset. /// /// The type of the records in the dataset /// The type of the key on which the partition is based /// The dataset to be partitioned /// The function to extract the key from a record /// An EqualityComparer on TKey to compare keys /// The number of partitions to create /// An IQueryable hash-partitioned according to a key public static IQueryable HashPartition(this IQueryable source, Expression> keySelector, IEqualityComparer comparer, int partitionCount) { if (source == null) { throw new ArgumentNullException("source"); } if (keySelector == null) { throw new ArgumentNullException("keySelector"); } if (partitionCount <= 0) { throw new ArgumentOutOfRangeException("partitionCount"); } if (IsLocalDebugSource(source)) { return source; } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)), new Expression[] { source.Expression, Expression.Quote(keySelector), Expression.Constant(comparer, typeof(IEqualityComparer)), Expression.Constant(partitionCount, typeof(int)) } )); } /// /// Hash partition a dataset. /// /// The type of the records in the dataset /// The type of the key on which the partition is based /// the dataset to be partitioned /// The funtion to extract the key from a record /// The number of partitioned to create /// An IQueryable hash-partitioned according to a key public static IQueryable HashPartition(this IQueryable source, Expression> keySelector, int partitionCount) { if (source == null) { throw new ArgumentNullException("source"); } if (keySelector == null) { throw new ArgumentNullException("keySelector"); } if (partitionCount <= 0) { throw new ArgumentOutOfRangeException("partitionCount"); } if (IsLocalDebugSource(source)) { return source; } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)), new Expression[] { source.Expression, Expression.Quote(keySelector), Expression.Constant(partitionCount, typeof(int)) } )); } /// /// Hash partition a dataset. The number of resulting partitions is dynamically determined /// at the runtime. /// /// The type of the records in the dataset /// The type of the key on which the partition is based /// the dataset to be partitioned /// The function to extract the key from a record /// An IQueryable hash-partitioned according to a key public static IQueryable HashPartition(this IQueryable source, Expression> keySelector) { if (source == null) { throw new ArgumentNullException("source"); } if (keySelector == null) { throw new ArgumentNullException("keySelector"); } if (IsLocalDebugSource(source)) { return source; } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)), new Expression[] { source.Expression, Expression.Quote(keySelector) } )); } /// /// Hash partition a dataset. The number of resulting partitions is dynamically determined /// at the runtime. /// /// The type of the records in the dataset /// The type of the key on which the partition is based /// The dataset to be partitioned /// The function to extract the key from a record /// An IComparer on TKey to compare keys /// An IQueryable hash-partitioned according to a key public static IQueryable HashPartition(this IQueryable source, Expression> keySelector, IEqualityComparer comparer) { if (source == null) { throw new ArgumentNullException("source"); } if (keySelector == null) { throw new ArgumentNullException("keySelector"); } if (IsLocalDebugSource(source)) { return source; } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)), new Expression[] { source.Expression, Expression.Quote(keySelector), Expression.Constant(comparer, typeof(IEqualityComparer)) } )); } /// /// Hash partition a dataset. The number of resulting partitions is dynamically determined /// at the runtime. /// /// The type of the records in the dataset /// The type of the key on which the partition is based /// The type of the records in the result dataset /// The dataset to be partitioned /// The function to extract the key from a record /// The function to compute output record /// An IQueryable hash-partitioned according to a key public static IQueryable HashPartition(this IQueryable source, Expression> keySelector, Expression> resultSelector) { if (source == null) { throw new ArgumentNullException("source"); } if (keySelector == null) { throw new ArgumentNullException("keySelector"); } if (resultSelector == null) { throw new ArgumentNullException("resultSelector"); } if (IsLocalDebugSource(source)) { Func resultSelect = resultSelector.Compile(); return source.Select(resultSelect).AsQueryable(); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey), typeof(TResult)), new Expression[] { source.Expression, Expression.Quote(keySelector), Expression.Quote(resultSelector) } )); } /// /// Hash partition a dataset. The number of resulting partitions is dynamically determined /// at the runtime. /// /// The type of the records in the dataset /// The type of the key on which the partition is based /// The type of the records in the result dataset /// The dataset to be partitioned /// The function to extract the key from a record /// An IComparer on TKey to compare keys /// The function to compute output record /// An IQueryable hash-partitioned according to a key public static IQueryable HashPartition(this IQueryable source, Expression> keySelector, IEqualityComparer comparer, Expression> resultSelector) { if (source == null) { throw new ArgumentNullException("source"); } if (keySelector == null) { throw new ArgumentNullException("keySelector"); } if (resultSelector == null) { throw new ArgumentNullException("resultSelector"); } if (IsLocalDebugSource(source)) { Func resultSelect = resultSelector.Compile(); return source.Select(resultSelect).AsQueryable(); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey), typeof(TResult)), new Expression[] { source.Expression, Expression.Quote(keySelector), Expression.Constant(comparer, typeof(IEqualityComparer)), Expression.Quote(resultSelector) } )); } /// /// Range partition a dataset. The list of range keys are determined dynamically at /// runtime. /// /// The type of the records in the dataset /// The type of the key on which the partition is based /// The dataset to be partitioned /// The function to extract the key from a record /// An IQueryable hash-partitioned according to a key public static IQueryable RangePartition(this IQueryable source, Expression> keySelector) { if (source == null) { throw new ArgumentNullException("source"); } if (keySelector == null) { throw new ArgumentNullException("keySelector"); } if (IsLocalDebugSource(source)) { return source; } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)), new Expression[] { source.Expression, Expression.Quote(keySelector) } )); } /// /// Range partition a dataset. The list of range keys are determined dynamically at /// runtime. /// /// The type of the records in the dataset /// The type of the key on which the partition is based /// The dataset to be partitioned /// The function to extract the key from a record /// The number of partitions in the output dataset /// An IQueryable partitioned according to a list of range keys determined at runtime public static IQueryable RangePartition(this IQueryable source, Expression> keySelector, int partitionCount) { if (source == null) { throw new ArgumentNullException("source"); } if (keySelector == null) { throw new ArgumentNullException("keySelector"); } if (partitionCount <= 0) { throw new ArgumentOutOfRangeException("partitionCount"); } if (IsLocalDebugSource(source)) { return source; } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)), new Expression[] { source.Expression, Expression.Quote(keySelector), Expression.Constant(partitionCount, typeof(int)) } )); } /// /// Range partition a dataset. The list of range keys are determined dynamically at /// runtime. /// /// The type of the records in the dataset /// The type of the key on which the partition is based /// The dataset to be partitioned /// The funtion to extract the key from a record /// true if the partition keys are descending /// An IQueryable partitioned according to a list of range keys determined at runtime public static IQueryable RangePartition(this IQueryable source, Expression> keySelector, bool isDescending) { if (source == null) { throw new ArgumentNullException("source"); } if (keySelector == null) { throw new ArgumentNullException("keySelector"); } if (IsLocalDebugSource(source)) { return source; } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)), new Expression[] { source.Expression, Expression.Quote(keySelector), Expression.Constant(isDescending, typeof(bool)) } )); } /// /// Range partition a dataset using an array of partition keys. /// /// The type of the records in the dataset /// The type of the key on which the partition is based /// The dataset to be partitioned /// The funtion to extract the key from a record /// An array of partition keys, either in ascending or descending order /// An IQueryable partitioned according to the specified range keys public static IQueryable RangePartition(this IQueryable source, Expression> keySelector, TKey[] rangeSeparators) { if (source == null) { throw new ArgumentNullException("source"); } if (keySelector == null) { throw new ArgumentNullException("keySelector"); } if (rangeSeparators == null) throw new ArgumentNullException("rangeSeparators"); // check that the range-keys are consistent. bool? dummy; if (!DryadLinqUtil.ComputeIsDescending(rangeSeparators, Comparer.Default, out dummy)) { throw new ArgumentException(SR.PartitionKeysAreNotConsistentlyOrdered, "rangeSeparators"); } if (IsLocalDebugSource(source)) { return source; } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)), new Expression[] { source.Expression, Expression.Quote(keySelector), Expression.Constant(rangeSeparators, typeof(TKey[])) } )); } /// /// Range partition a dataset. The list of range keys are determined dynamically at /// runtime. /// /// The type of the records in the dataset /// The type of the key on which the partition is based /// The dataset to be partitioned /// The funtion to extract the key from a record /// true if the partition keys are descending /// Number of partitions in the output dataset /// An IQueryable partitioned according to a list of keys determined at runtime public static IQueryable RangePartition(this IQueryable source, Expression> keySelector, bool isDescending, int partitionCount ) { if (source == null) { throw new ArgumentNullException("source"); } if (keySelector == null) { throw new ArgumentNullException("keySelector"); } if (partitionCount <= 0) { throw new ArgumentOutOfRangeException("partitionCount"); } if (IsLocalDebugSource(source)) { return source; } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)), new Expression[] { source.Expression, Expression.Quote(keySelector), Expression.Constant(isDescending, typeof(bool)), Expression.Constant(partitionCount, typeof(int)) } )); } /// /// Range partition a dataset. The list of range keys are determined dynamically at runtime. /// /// The type of the records in the input dataset /// The type of the key on which the partition is based /// The input dataset to be partitioned /// The function to extract the key from a record /// An IComparer on TKey to compare keys /// true if the generated keys must be descending; otherwise ascending /// An IQueryable partitioned according to a list of keys determined at runtime public static IQueryable RangePartition(this IQueryable source, Expression> keySelector, IComparer comparer, bool isDescending) { if (source == null) { throw new ArgumentNullException("source"); } if (keySelector == null) { throw new ArgumentNullException("keySelector"); } if (IsLocalDebugSource(source)) { return source; } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)), new Expression[] { source.Expression, Expression.Quote(keySelector), Expression.Constant(comparer, typeof(IComparer)), Expression.Constant(isDescending, typeof(bool)) } )); } /// /// Range partition a dataset using a specified list of keys. /// /// The type of the records in the input dataset /// The type of the key on which the partition is based /// The input dataset to be partitioned /// The function to extract the key from a record /// The list of range keys /// An IComparer on TKey to compare keys /// An IQueryable partitioned according to a specified list of keys. public static IQueryable RangePartition(this IQueryable source, Expression> keySelector, TKey[] rangeSeparators, IComparer comparer) { if (source == null) { throw new ArgumentNullException("source"); } if (keySelector == null) { throw new ArgumentNullException("keySelector"); } if (rangeSeparators == null) throw new ArgumentNullException("rangeSeparators"); if (comparer == null && !TypeSystem.HasDefaultComparer(typeof(TKey))) { throw new DryadLinqException(DryadLinqErrorCode.ComparerMustBeSpecifiedOrKeyTypeMustBeIComparable, string.Format(SR.ComparerMustBeSpecifiedOrKeyTypeMustBeIComparable, typeof(TKey))); } comparer = TypeSystem.GetComparer(comparer); // check that the range-keys are consistent. bool? dummy; if (!DryadLinqUtil.ComputeIsDescending(rangeSeparators, comparer, out dummy)) { throw new ArgumentException(SR.PartitionKeysAreNotConsistentlyOrdered, "rangeSeparators"); } if (IsLocalDebugSource(source)) { return source; } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)), new Expression[] { source.Expression, Expression.Quote(keySelector), Expression.Constant(rangeSeparators, typeof(TKey[])), Expression.Constant(comparer, typeof(IComparer)) } )); } /// /// Range partition a dataset. The list of range keys are determined dynamically at runtime. /// /// The type of the records in the input dataset /// The type of the key on which the partition is based /// The input dataset to be partitioned /// The function to extract the key from a record /// An IComparer on TKey to compare keys /// true if the generated keys must be descending; otherwise ascending /// The number of partitions in the output dataset /// An IQueryable partitioned according to a list of keys determined at runtime public static IQueryable RangePartition(this IQueryable source, Expression> keySelector, IComparer comparer, bool isDescending, int partitionCount) { if (source == null) { throw new ArgumentNullException("source"); } if (keySelector == null) { throw new ArgumentNullException("keySelector"); } if (partitionCount <= 0) { throw new ArgumentOutOfRangeException("partitionCount"); } if (IsLocalDebugSource(source)) { return source; } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)), new Expression[] { source.Expression, Expression.Quote(keySelector), Expression.Constant(comparer, typeof(IComparer)), Expression.Constant(isDescending, typeof(bool)), Expression.Constant(partitionCount, typeof(int))} )); } /// /// Range partition a dataset using a specified list of keys. /// /// The type of the records in the input dataset /// The type of the key on which the partition is based /// The input dataset to be partitioned /// The function to extract the key from a record /// The list of range keys /// An IComparer on TKey to compare keys /// true if the keys must be in descending order; otherwise false /// An IQueryable partitioned according to a specified list of keys public static IQueryable RangePartition(this IQueryable source, Expression> keySelector, TKey[] rangeSeparators, IComparer comparer, bool isDescending) { if (source == null) { throw new ArgumentNullException("source"); } if (keySelector == null) { throw new ArgumentNullException("keySelector"); } if (rangeSeparators == null) throw new ArgumentNullException("rangeSeparators"); if (comparer == null && !TypeSystem.HasDefaultComparer(typeof(TKey))) { throw new DryadLinqException(DryadLinqErrorCode.ComparerMustBeSpecifiedOrKeyTypeMustBeIComparable, string.Format(SR.ComparerMustBeSpecifiedOrKeyTypeMustBeIComparable, typeof(TKey))); } comparer = TypeSystem.GetComparer(comparer); // check that the range-keys are consistent. bool? detectedDescending; bool keysAreConsistent = DryadLinqUtil.ComputeIsDescending(rangeSeparators, comparer, out detectedDescending); // Note: detectedDescending==null implies that we couldn't precisely tell (single element, repeated elements, etc). if (!keysAreConsistent) { throw new ArgumentException(SR.PartitionKeysAreNotConsistentlyOrdered, "rangeSeparators"); } // and check that the actual direction of keys matches what the user said they wanted. if (detectedDescending != null && detectedDescending != isDescending) { throw new ArgumentException(SR.IsDescendingIsInconsistent); } if (IsLocalDebugSource(source)) { return source; } return source.Provider.CreateQuery( Expression.Call( ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)), new Expression[] { source.Expression, Expression.Quote(keySelector), Expression.Constant(rangeSeparators, typeof(TKey[])), Expression.Constant(comparer, typeof(IComparer)), Expression.Constant(isDescending, typeof(bool)) } )); } /// /// Compute applyFunc (source) /// /// The type of the records of the input dataset /// The type of the records of the output dataset /// The input dataset /// The function to be applied to the input dataset /// The result of computing applyFunc(source) public static IQueryable Apply(this IQueryable source, Expression, IEnumerable>> applyFunc) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = applyFunc.Compile()(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(T1), typeof(T2)), new Expression[] { source.Expression, Expression.Quote(applyFunc) } )); } /// /// Compute applyFunc(source1, source2) /// /// The type of the records of the first input dataset /// The type of the records of the second input dataset /// he type of the records of the output dataset /// The first input dataset /// The second input dataset /// The function to be applied to the input datasets /// The result of computing applyFunc(source1, source2) public static IQueryable Apply(this IQueryable source1, IQueryable source2, Expression, IEnumerable, IEnumerable>> applyFunc) { if (source1 == null) { throw new ArgumentNullException("source1"); } if (source2 == null) { throw new ArgumentNullException("source2"); } if (IsLocalDebugSource(source1)) { var q = applyFunc.Compile()(source1, source2).AsQueryable(); return new DryadLinqLocalQuery(source1.Provider, q); } return source1.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(T1), typeof(T2), typeof(T3)), new Expression[] { source1.Expression, source2.Expression, Expression.Quote(applyFunc) } )); } /// /// Compute applyFunc on multiple sources /// /// The type of the records of input /// The type of the records of output /// The first input dataset /// Other input datasets /// The function to be applied to the input datasets /// The result of computing applyFunc(source,pieces) public static IQueryable Apply(this IQueryable source, IQueryable[] otherSources, Expression[], IEnumerable>> applyFunc) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { IQueryable[] allSources = new IQueryable[otherSources.Length + 1]; allSources[0] = source; for (int i = 0; i < otherSources.Length; ++i) { allSources[i + 1] = otherSources[i]; } var q = applyFunc.Compile()(allSources).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } Expression[] others = new Expression[otherSources.Length]; for (int i = 0; i < otherSources.Length; i++) { others[i] = otherSources[i].Expression; } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(T1), typeof(T2)), new Expression[] { source.Expression, Expression.NewArrayInit(typeof(IQueryable), others), Expression.Quote(applyFunc) } )); } /// /// Compute applyFunc on multiple sources /// /// The type of the records of input /// The type of the records of output /// The first input dataset /// Other input datasets /// The function to be applied to the input datasets /// The result of computing applyFunc(source,pieces) public static IQueryable Apply(this IQueryable source, IQueryable[] otherSources, Expression, IEnumerable[], IEnumerable>> applyFunc) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = applyFunc.Compile()(source, otherSources).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } Expression[] others = new Expression[otherSources.Length]; for (int i = 0; i < otherSources.Length; i++) { others[i] = otherSources[i].Expression; } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(T1), typeof(T2)), new Expression[] { source.Expression, Expression.NewArrayInit(typeof(IQueryable), others), Expression.Quote(applyFunc) } )); } /// /// Compute applyFunc(source) /// /// The type of the records of the input dataset /// The type of the records of the output dataset /// The input dataset /// The function to be applied to the input dataset /// The result of computing applyFunc(source) public static IQueryable ApplyPerPartition( this IQueryable source, Expression, IEnumerable>> applyFunc) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = applyFunc.Compile()(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(T1), typeof(T2)), new Expression[] { source.Expression, Expression.Quote(applyFunc) } )); } /// /// Compute applyFunc(source1, source2) /// /// The type of the records of the first input dataset /// The type of the records of the second input dataset /// he type of the records of the output dataset /// The first input dataset /// The second input dataset /// The function to be applied to the input datasets /// True if only distributive over the first dataset /// The result of computing applyFunc(source1, source2) public static IQueryable ApplyPerPartition( this IQueryable source1, IQueryable source2, Expression, IEnumerable, IEnumerable>> applyFunc, bool isFirstOnly = false) { if (source1 == null) { throw new ArgumentNullException("source1"); } if (source2 == null) { throw new ArgumentNullException("source2"); } if (IsLocalDebugSource(source1)) { var q = applyFunc.Compile()(source1, source2).AsQueryable(); return new DryadLinqLocalQuery(source1.Provider, q); } return source1.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(T1), typeof(T2), typeof(T3)), new Expression[] { source1.Expression, source2.Expression, Expression.Quote(applyFunc), Expression.Constant(isFirstOnly) } )); } /// /// Compute applyFunc on multiple sources /// /// The type of the records of input /// The type of the records of output /// The first input dataset /// Other input datasets /// The function to be applied to the input datasets /// True if only distributive over the first dataset /// The result of computing applyFunc(source, otherSources) public static IQueryable ApplyPerPartition( this IQueryable source, IQueryable[] otherSources, Expression[], IEnumerable>> applyFunc, bool isFirstOnly = false) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { IQueryable[] allSources = new IQueryable[otherSources.Length + 1]; allSources[0] = source; for (int i = 0; i < otherSources.Length; ++i) { allSources[i + 1] = otherSources[i]; } var q = applyFunc.Compile()(allSources).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } Expression[] others = new Expression[otherSources.Length]; for (int i = 0; i < otherSources.Length; i++) { others[i] = otherSources[i].Expression; } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(T1), typeof(T2)), new Expression[] { source.Expression, Expression.NewArrayInit(typeof(IQueryable), others), Expression.Quote(applyFunc), Expression.Constant(isFirstOnly) } )); } /// /// Compute applyFunc on multiple sources /// /// The type of the records of input /// The type of the records of output /// The first input dataset /// Other input datasets /// The function to be applied to the input datasets /// True if only distributive over the first dataset /// The result of computing applyFunc(source,pieces) public static IQueryable ApplyPerPartition( this IQueryable source, IQueryable[] otherSources, Expression, IEnumerable[], IEnumerable>> applyFunc, bool isFirstOnly = false) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = applyFunc.Compile()(source, otherSources).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } Expression[] others = new Expression[otherSources.Length]; for (int i = 0; i < otherSources.Length; i++) { others[i] = otherSources[i].Expression; } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(T1), typeof(T2)), new Expression[] { source.Expression, Expression.NewArrayInit(typeof(IQueryable), others), Expression.Quote(applyFunc), Expression.Constant(isFirstOnly) } )); } private static IQueryable Dummy(IQueryable source) { return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(T)), new Expression[] { source.Expression } )); } private static IQueryable DoWhile(IQueryable source, IQueryable body, IQueryable cond, IQueryable bodySource, IQueryable condSource1, IQueryable condSource2) { return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(T)), new Expression[] { source.Expression, body.Expression, cond.Expression, bodySource.Expression, condSource1.Expression, condSource2.Expression } )); } /// /// Conditional DoWhile loop. /// /// The type of the input records /// The input dataset /// The code body of the DoWhile loop /// The termination condition of the DoWhile loop /// The output dataset when the loop terminates public static IQueryable DoWhile(this IQueryable source, Func, IQueryable> body, Func, IQueryable, IQueryable> cond) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.DoWhile(source, body, cond); return new DryadLinqLocalQuery(source.Provider, q.AsQueryable()); } DryadLinqContext context = DryadLinqContext.GetContext(source.Provider); IQueryable before = source; while (true) { IQueryable after = before; after = body(after); var more = cond(before, after); DryadLinqQueryable.SubmitAndWait(after, more); if (!more.Single()) return after; before = after; } } /// /// Apply a function on every sliding window on the input sequence of records. /// /// The type of the input records /// The type of the output records /// The input dataset /// The function to apply to every sliding window /// The size of the window /// An IQueryable formed by the results for each sliding window public static IQueryable SlidingWindow(this IQueryable source, Expression, T2>> procFunc, Int32 windowSize) { if (source == null) { throw new ArgumentNullException("source"); } if (windowSize < 2) { throw new DryadLinqException(SR.WindowSizeMustyBeGTOne); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.SlidingWindow(source, procFunc.Compile(), windowSize); return new DryadLinqLocalQuery(source.Provider, q.AsQueryable()); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(T1), typeof(T2)), new Expression[] { source.Expression, Expression.Quote(procFunc), Expression.Constant(windowSize, typeof(int))} )); } /// /// Computes a user-defined function on each partition of the input. The function takes a /// partition and its partition index as arguments. /// /// The type of the input records /// The type of the output records /// The input dataset /// The function to apply to each partition /// An IQueryable formed by concatenating the results of applying the function /// to each partition public static IQueryable ApplyWithPartitionIndex(this IQueryable source, Expression, int, IEnumerable>> procFunc) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = procFunc.Compile()(source, 0); return new DryadLinqLocalQuery(source.Provider, q.AsQueryable()); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(T1), typeof(T2)), new Expression[] { source.Expression, Expression.Quote(procFunc) } )); } /// /// Same as , but returns an <> /// containing a single element. /// /// The type of the elements of source. /// The input sequence /// true iff the input sequence contains at least one element public static IQueryable AnyAsQuery(this IQueryable source) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.AnyAsQuery(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression } )); } /// /// Same as , but returns an <> /// containing a single element. /// /// The type of the elements of source. /// The input sequence /// A predicate to test each element for a condition. /// true iff the input sequence contains at least one element that satisfies /// the predicate public static IQueryable AnyAsQuery(this IQueryable source, Expression> predicate) { if (source == null) { throw new ArgumentNullException("source"); } if (predicate == null) { throw new ArgumentNullException("predicate"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.AnyAsQuery(source, predicate.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(predicate) } )); } /// /// Same as , but returns an <> /// containing a single element. /// /// The type of the elements of source. /// The input sequence /// A predicate to test each element for a condition /// true iff every element in the input sequence satisfies the predicate public static IQueryable AllAsQuery(this IQueryable source, Expression> predicate) { if (source == null) { throw new ArgumentNullException("source"); } if (predicate == null) { throw new ArgumentNullException("predicate"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.AllAsQuery(source, predicate.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(predicate) } )); } /// /// Same as , but returns an <> /// containing a single element. /// /// The type of the elements of source /// The input sequence /// The number of elements in the input sequence public static IQueryable CountAsQuery(this IQueryable source) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.CountAsQuery(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression } )); } /// /// Same as , but returns an <> /// containing a single element. /// /// The type of the elements of source /// The input sequence /// A predicate to test each element for a condition /// The number of elements in the input sequence satisfying the predicate public static IQueryable CountAsQuery(this IQueryable source, Expression> predicate) { if (source == null) { throw new ArgumentNullException("source"); } if (predicate == null) { throw new ArgumentNullException("predicate"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.CountAsQuery(source, predicate.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(predicate) } )); } /// /// Same as , but returns an <> /// containing a single element. /// /// The type of the elements of source /// The input sequence /// The number of elements in the input sequence public static IQueryable LongCountAsQuery(this IQueryable source) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.LongCountAsQuery(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression } )); } /// /// Same as , but returns an <> /// containing a single element. /// /// The type of the elements of source /// The input sequence /// A predicate to test each element for a condition /// The number of elements in the input sequence satisfying the predicate public static IQueryable LongCountAsQuery(this IQueryable source, Expression> predicate) { if (source == null) { throw new ArgumentNullException("source"); } if (predicate == null) { throw new ArgumentNullException("predicate"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.LongCountAsQuery(source, predicate.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(predicate) } )); } /// /// Same as , but returns an <> /// containing a single element. /// /// The type of the elements of source /// The input sequence /// The value to locate in the sequence /// true iff the source sequence contains an element of the specified value public static IQueryable ContainsAsQuery(this IQueryable source, TSource item) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.ContainsAsQuery(source, item).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Constant(item, typeof(TSource)) } )); } /// /// Same as , but returns an <> /// containing a single element. /// /// The type of the elements of source /// The input sequence /// The value to locate in the sequence /// The equality comparer to use /// true iff the source sequence contains an element of the specified value public static IQueryable ContainsAsQuery(this IQueryable source, TSource item, IEqualityComparer comparer) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.ContainsAsQuery(source, item, comparer).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Constant(item, typeof(TSource)), Expression.Constant(comparer, typeof(IEqualityComparer)) } )); } private static Expression GetSourceExpression(IEnumerable source) { IQueryable q = source as IQueryable; if (q != null) return q.Expression; return Expression.Constant(source.ToArray(), typeof(TSource[])); } /// /// Same as , but returns an <> /// containing a single element. /// /// The type of the elements of source /// The first input sequence /// The second input sequence /// true iff the two input sequences are equal public static IQueryable SequenceEqualAsQuery(this IQueryable source1, IEnumerable source2) { if (source1 == null) { throw new ArgumentNullException("source1"); } if (source2 == null) { throw new ArgumentNullException("source2"); } if (IsLocalDebugSource(source1)) { var q = DryadLinqEnumerable.SequenceEqualAsQuery(source1, source2).AsQueryable(); return new DryadLinqLocalQuery(source1.Provider, q); } return source1.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source1.Expression, GetSourceExpression(source2) } )); } /// /// Same as , but returns an <> /// containing a single element. /// /// The type of the elements of source /// The first input sequence /// The second input sequence /// The equality comparer to use /// true iff the two input sequences are equal public static IQueryable SequenceEqualAsQuery(this IQueryable source1, IEnumerable source2, IEqualityComparer comparer) { if (source1 == null) { throw new ArgumentNullException("source1"); } if (source2 == null) { throw new ArgumentNullException("source2"); } if (IsLocalDebugSource(source1)) { var q = DryadLinqEnumerable.SequenceEqualAsQuery(source1, source2, comparer).AsQueryable(); return new DryadLinqLocalQuery(source1.Provider, q); } return source1.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source1.Expression, GetSourceExpression(source2), Expression.Constant(comparer, typeof(IEqualityComparer)) } )); } /// /// Same as , but returns an <> /// containing a single element. /// /// The type of the elements of source /// The input sequence /// The first element in the input sequence /// The input sequence is empty public static IQueryable FirstAsQuery(this IQueryable source) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.FirstAsQuery(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression } )); } /// /// Same as , but returns an <> /// containing a single element. /// /// The type of the elements of source /// The input sequence /// A predicate to test each element for a condition /// The first element in the input sequence that satisfies the predicate /// No element in the input sequence satisfies the predicate public static IQueryable FirstAsQuery(this IQueryable source, Expression> predicate) { if (source == null) { throw new ArgumentNullException("source"); } if (predicate == null) { throw new ArgumentNullException("predicate"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.FirstAsQuery(source, predicate.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(predicate) } )); } /// /// Same as , but returns an <> /// containing a single element. /// /// The type of the elements of source /// The input sequence /// The last element in the input sequence public static IQueryable LastAsQuery(this IQueryable source) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.LastAsQuery(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression } )); } /// /// Same as , but returns an <> /// containing a single element. /// /// The type of the elements of source /// The input sequence /// A predicate to test each element for a condition /// The last element in the input sequence that satisfies the predicate public static IQueryable LastAsQuery(this IQueryable source, Expression> predicate) { if (source == null) { throw new ArgumentNullException("source"); } if (predicate == null) { throw new ArgumentNullException("predicate"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.LastAsQuery(source, predicate.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(predicate) } )); } /// /// Same as , but returns an <> /// containing a single element. /// /// The type of the elements of source /// The input sequence /// The single element of the input sequence public static IQueryable SingleAsQuery(this IQueryable source) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.SingleAsQuery(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression } )); } /// /// Same as , but returns an <> /// containing a single element. /// /// The type of the elements of source /// The input sequence /// A predicate to test each element for a condition /// The single element of the input sequence that satisfies the predicate public static IQueryable SingleAsQuery(this IQueryable source, Expression> predicate) { if (source == null) { throw new ArgumentNullException("source"); } if (predicate == null) { throw new ArgumentNullException("predicate"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.SingleAsQuery(source, predicate.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(predicate) } )); } /// /// Same as , but returns an <> /// containing a single element. /// /// The type of the elements of source /// The input dataset /// The minimum value in the input dataset public static IQueryable MinAsQuery(this IQueryable source) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.MinAsQuery(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression } )); } /// /// Same as , but returns an <> /// containing a single element. /// /// The type of the elements of source /// The type of the result value /// The input dataset /// A transformation function to apply to each element /// The minimum value in the transformed values public static IQueryable MinAsQuery(this IQueryable source, Expression> selector) { if (source == null) { throw new ArgumentNullException("source"); } if (selector == null) { throw new ArgumentNullException("selector"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.MinAsQuery(source, selector.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)), new Expression[] { source.Expression, Expression.Quote(selector) } )); } /// /// Same as , but returns an <> /// containing a single element. /// /// The type of the elements of source /// The input dataset /// The maximum value in the input dataset public static IQueryable MaxAsQuery(this IQueryable source) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.MaxAsQuery(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression } )); } /// /// Same as , but returns an <> /// containing a single element. /// /// The type of the elements of source /// The type of the result value /// The input dataset /// A transformation function to apply to each element /// The maximum value in the transformed values public static IQueryable MaxAsQuery(this IQueryable source, Expression> selector) { if (source == null) { throw new ArgumentNullException("source"); } if (selector == null) { throw new ArgumentNullException("selector"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.MaxAsQuery(source, selector.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)), new Expression[] { source.Expression, Expression.Quote(selector) } )); } /// /// Same as , but returns an <> /// containing a single element. Computes the sum of a set of Int32 values. /// /// A dataset of Int32 values to calculate the sum of /// The sum of the values in the dataset public static IQueryable SumAsQuery(this IQueryable source) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.SumAsQuery(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()), new Expression[] { source.Expression } )); } /// /// Same as , but returns an <<>> /// containing a single element. Computes the sum of a set of nullable Int32 values. /// /// A dataset of nullable Int32 values to calculate the sum of /// The sum of the values in the dataset public static IQueryable SumAsQuery(this IQueryable source) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.SumAsQuery(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()), new Expression[] { source.Expression } )); } /// /// Same as , but returns an <> /// containing a single element. Computes the sum of a set of Int64 values. /// /// A dataset of Int64 values to calculate the sum of /// The sum of the values in the dataset public static IQueryable SumAsQuery(this IQueryable source) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.SumAsQuery(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()), new Expression[] { source.Expression } )); } /// /// Same as , but returns an <<>> /// containing a single element. Computes the sum of a set of nullable Int64 values. /// /// A dataset of nullable Int64 values to calculate the sum of /// The sum of the values in the dataset public static IQueryable SumAsQuery(this IQueryable source) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.SumAsQuery(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()), new Expression[] { source.Expression } )); } /// /// Same as , but returns an <> /// containing a single element. Computes the sum of a set of float values. /// /// A dataset of float values to calculate the sum of /// The sum of the values in the dataset public static IQueryable SumAsQuery(this IQueryable source) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.SumAsQuery(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()), new Expression[] { source.Expression } )); } /// /// Same as , but returns an <<>> /// containing a single element. Computes the sum of a set of nullable float values. /// /// A dataset of nullable float values to calculate the sum of /// The sum of the values in the dataset public static IQueryable SumAsQuery(this IQueryable source) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.SumAsQuery(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()), new Expression[] { source.Expression } )); } /// /// Same as , but returns an <> /// containing a single element. Computes the sum of a set of double values. /// /// A dataset of double values to calculate the sum of /// The sum of the values in the dataset public static IQueryable SumAsQuery(this IQueryable source) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.SumAsQuery(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()), new Expression[] { source.Expression } )); } /// /// Same as , but returns an <<>> /// containing a single element. Computes the sum of a set of nullable double values. /// /// A dataset of nullable double values to calculate the sum of /// The sum of the values in the dataset public static IQueryable SumAsQuery(this IQueryable source) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.SumAsQuery(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()), new Expression[] { source.Expression } )); } /// /// Same as , but returns an <> /// containing a single element. Computes the sum of a set of decimal values. /// /// A dataset of decimal values to calculate the sum of /// The sum of the values in the dataset public static IQueryable SumAsQuery(this IQueryable source) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.SumAsQuery(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()), new Expression[] { source.Expression } )); } /// /// Same as , but returns an <<>> /// containing a single element. Computes the sum of a set of nullable decimal values. /// /// A dataset of nullable decimal values to calculate the sum of /// The sum of the values in the dataset public static IQueryable SumAsQuery(this IQueryable source) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.SumAsQuery(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()), new Expression[] { source.Expression } )); } /// /// Same as , but returns an <> /// containing a single element. Computes the sum of a set of Int32 values that /// is obtained by applying a function to each element of the input dataset. /// /// The type of the elements of source /// The input dataset /// A transformation function to apply to each element /// The sum of the values after applying the transformation function public static IQueryable SumAsQuery(this IQueryable source, Expression> selector) { if (source == null) { throw new ArgumentNullException("source"); } if (selector == null) { throw new ArgumentNullException("selector"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.SumAsQuery(source, selector.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(selector) } )); } /// /// Same as , but returns an <<>> /// containing a single element. Computes the sum of a set of nullable Int32 values that /// is obtained by applying a function to each element of the input dataset. /// /// The type of the elements of source /// The input dataset /// A transformation function to apply to each element /// The sum of the values after applying the transformation function public static IQueryable SumAsQuery(this IQueryable source, Expression> selector) { if (source == null) { throw new ArgumentNullException("source"); } if (selector == null) { throw new ArgumentNullException("selector"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.SumAsQuery(source, selector.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(selector) } )); } /// /// Same as , but returns an <> /// containing a single element. Computes the sum of a set of Int64 values that /// is obtained by applying a function to each element of the input dataset. /// /// The type of the elements of source /// The input dataset /// A transformation function to apply to each element /// The sum of the values after applying the transformation function public static IQueryable SumAsQuery(this IQueryable source, Expression> selector) { if (source == null) { throw new ArgumentNullException("source"); } if (selector == null) { throw new ArgumentNullException("selector"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.SumAsQuery(source, selector.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(selector) } )); } /// /// Same as , but returns an <> /// containing a single element. Computes the sum of a set of nullable Int64 values that /// is obtained by applying a function to each element of the input dataset. /// /// The type of the elements of source /// The input dataset /// A transformation function to apply to each element /// The sum of the values after applying the transformation function public static IQueryable SumAsQuery(this IQueryable source, Expression> selector) { if (source == null) { throw new ArgumentNullException("source"); } if (selector == null) { throw new ArgumentNullException("selector"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.SumAsQuery(source, selector.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(selector) } )); } /// /// Same as , but returns an <> /// containing a single element. Computes the sum of a set of float values that /// is obtained by applying a function to each element of the input dataset. /// /// The type of the elements of source /// The input dataset /// A transformation function to apply to each element /// The sum of the values after applying the transformation function public static IQueryable SumAsQuery(this IQueryable source, Expression> selector) { if (source == null) { throw new ArgumentNullException("source"); } if (selector == null) { throw new ArgumentNullException("selector"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.SumAsQuery(source, selector.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(selector) } )); } /// /// Same as , but returns an <> /// containing a single element. Computes the sum of a set of nullable float values that /// is obtained by applying a function to each element of the input dataset. /// /// The type of the elements of source /// The input dataset /// A transformation function to apply to each element /// The sum of the values after applying the transformation function public static IQueryable SumAsQuery(this IQueryable source, Expression> selector) { if (source == null) { throw new ArgumentNullException("source"); } if (selector == null) { throw new ArgumentNullException("selector"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.SumAsQuery(source, selector.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(selector) } )); } /// /// Same as , but returns an <> /// containing a single element. Computes the sum of a set of double values that /// is obtained by applying a function to each element of the input dataset. /// /// The type of the elements of source /// The input dataset /// A transformation function to apply to each element /// The sum of the values after applying the transformation function public static IQueryable SumAsQuery(this IQueryable source, Expression> selector) { if (source == null) { throw new ArgumentNullException("source"); } if (selector == null) { throw new ArgumentNullException("selector"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.SumAsQuery(source, selector.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(selector) } )); } /// /// Same as , but returns an <> /// containing a single element. Computes the sum of a set of nullable double values that /// is obtained by applying a function to each element of the input dataset. /// /// The type of the elements of source /// The input dataset /// A transformation function to apply to each element /// The sum of the values after applying the transformation function public static IQueryable SumAsQuery(this IQueryable source, Expression> selector) { if (source == null) { throw new ArgumentNullException("source"); } if (selector == null) { throw new ArgumentNullException("selector"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.SumAsQuery(source, selector.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(selector) } )); } /// /// Same as , but returns an <> /// containing a single element. Computes the sum of a set of decimal values that /// is obtained by applying a function to each element of the input dataset. /// /// The type of the elements of source /// The input dataset /// A transformation function to apply to each element /// The sum of the values after applying the transformation function public static IQueryable SumAsQuery(this IQueryable source, Expression> selector) { if (source == null) { throw new ArgumentNullException("source"); } if (selector == null) { throw new ArgumentNullException("selector"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.SumAsQuery(source, selector.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(selector) } )); } /// /// Same as , but returns an <> /// containing a single element. Computes the sum of a set of nullable decimal values that /// is obtained by applying a function to each element of the input dataset. /// /// The type of the elements of source /// The input dataset /// A transformation function to apply to each element /// The sum of the values after applying the transformation function public static IQueryable SumAsQuery(this IQueryable source, Expression> selector) { if (source == null) { throw new ArgumentNullException("source"); } if (selector == null) { throw new ArgumentNullException("selector"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.SumAsQuery(source, selector.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(selector) } )); } /// /// Same as , but returns an <> /// containing a single element. Computes the average of a set of Int32 values in the input /// dataset. /// /// A set of Int32 values to calculate the average of /// The average of the values in the input dataset public static IQueryable AverageAsQuery(this IQueryable source) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.AverageAsQuery(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()), new Expression[] { source.Expression } )); } /// /// Same as , but returns an <<>> /// containing a single element. Computes the average of a set of nullable Int32 values in the input /// dataset. /// /// A set of nullable Int32 values to calculate the average of /// The average of the values in the input dataset public static IQueryable AverageAsQuery(this IQueryable source) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.AverageAsQuery(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()), new Expression[] { source.Expression } )); } /// /// Same as , but returns an <> /// containing a single element. Computes the average of a set of Int64 values in the input /// dataset. /// /// A set of Int64 values to calculate the average of /// The average of the values in the input dataset public static IQueryable AverageAsQuery(this IQueryable source) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.AverageAsQuery(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()), new Expression[] { source.Expression } )); } /// /// Same as , but returns an <<>> /// containing a single element. Computes the average of a set of nullable Int64 values in the input /// dataset. /// /// A set of nullable Int64 values to calculate the average of /// The average of the values in the input dataset public static IQueryable AverageAsQuery(this IQueryable source) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.AverageAsQuery(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()), new Expression[] { source.Expression } )); } /// /// Same as , but returns an <> /// containing a single element. Computes the average of a set of float values in the input /// dataset. /// /// A set of float values to calculate the average of /// The average of the values in the input dataset public static IQueryable AverageAsQuery(this IQueryable source) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.AverageAsQuery(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()), new Expression[] { source.Expression } )); } /// /// Same as , but returns an <<>> /// containing a single element. Computes the average of a set of nullable float values in the input /// dataset. /// /// A set of nullable float values to calculate the average of /// The average of the values in the input dataset public static IQueryable AverageAsQuery(this IQueryable source) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.AverageAsQuery(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()), new Expression[] { source.Expression } )); } /// /// Same as , but returns an <> /// containing a single element. Computes the average of a set of double values in the input /// dataset. /// /// A set of double values to calculate the average of /// The average of the values in the input dataset public static IQueryable AverageAsQuery(this IQueryable source) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.AverageAsQuery(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()), new Expression[] { source.Expression } )); } /// /// Same as , but returns an <<>> /// containing a single element. Computes the average of a set of nullable double values in the input /// dataset. /// /// A set of nullable double values to calculate the average of /// The average of the values in the input dataset public static IQueryable AverageAsQuery(this IQueryable source) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.AverageAsQuery(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()), new Expression[] { source.Expression } )); } /// /// Same as , but returns an <> /// containing a single element. Computes the average of a set of decimal values in the input /// dataset. /// /// A set of decimal values to calculate the average of /// The average of the values in the input dataset public static IQueryable AverageAsQuery(this IQueryable source) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.AverageAsQuery(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()), new Expression[] { source.Expression } )); } /// /// Same as , but returns an <<>> /// containing a single element. Computes the average of a set of nullable decimal values in the input /// dataset. /// /// A set of nullable decimal values to calculate the average of /// The average of the values in the input dataset public static IQueryable AverageAsQuery(this IQueryable source) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.AverageAsQuery(source).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()), new Expression[] { source.Expression } )); } /// /// Same as , but returns an <> /// containing a single element. Computes the average of a set of Int32 values that /// is obtained by applying a function to each element of the input dataset. /// /// The type of the elements of source /// The input dataset /// A transformation function to apply to each element /// The average of the values after applying the transformation function public static IQueryable AverageAsQuery(this IQueryable source, Expression> selector) { if (source == null) { throw new ArgumentNullException("source"); } if (selector == null) { throw new ArgumentNullException("selector"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.AverageAsQuery(source, selector.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(selector) } )); } /// /// Same as , but returns an <<>> /// containing a single element. Computes the average of a set of nullable Int32 values that /// is obtained by applying a function to each element of the input dataset. /// /// The type of the elements of source /// The input dataset /// A transformation function to apply to each element /// The average of the values after applying the transformation function public static IQueryable AverageAsQuery(this IQueryable source, Expression> selector) { if (source == null) { throw new ArgumentNullException("source"); } if (selector == null) { throw new ArgumentNullException("selector"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.AverageAsQuery(source, selector.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(selector) } )); } /// /// Same as , but returns an <> /// containing a single element. Computes the average of a set of float values that /// is obtained by applying a function to each element of the input dataset. /// /// The type of the elements of source /// The input dataset /// A transformation function to apply to each element /// The average of the values after applying the transformation function public static IQueryable AverageAsQuery(this IQueryable source, Expression> selector) { if (source == null) { throw new ArgumentNullException("source"); } if (selector == null) { throw new ArgumentNullException("selector"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.AverageAsQuery(source, selector.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(selector) } )); } /// /// Same as , but returns an <<>> /// containing a single element. Computes the average of a set of nullable float values that /// is obtained by applying a function to each element of the input dataset. /// /// The type of the elements of source /// The input dataset /// A transformation function to apply to each element /// The average of the values after applying the transformation function public static IQueryable AverageAsQuery(this IQueryable source, Expression> selector) { if (source == null) { throw new ArgumentNullException("source"); } if (selector == null) { throw new ArgumentNullException("selector"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.AverageAsQuery(source, selector.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(selector) } )); } /// /// Same as , but returns an <> /// containing a single element. Computes the average of a set of Int64 values that /// is obtained by applying a function to each element of the input dataset. /// /// The type of the elements of source /// The input dataset /// A transformation function to apply to each element /// The average of the values after applying the transformation function public static IQueryable AverageAsQuery(this IQueryable source, Expression> selector) { if (source == null) { throw new ArgumentNullException("source"); } if (selector == null) { throw new ArgumentNullException("selector"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.AverageAsQuery(source, selector.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(selector) } )); } /// /// Same as , but returns an <<>> /// containing a single element. Computes the average of a set of nullable Int64 values that /// is obtained by applying a function to each element of the input dataset. /// /// The type of the elements of source /// The input dataset /// A transformation function to apply to each element /// The average of the values after applying the transformation function public static IQueryable AverageAsQuery(this IQueryable source, Expression> selector) { if (source == null) { throw new ArgumentNullException("source"); } if (selector == null) { throw new ArgumentNullException("selector"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.AverageAsQuery(source, selector.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(selector) } )); } /// /// Same as , but returns an <> /// containing a single element. Computes the average of a set of double values that /// is obtained by applying a function to each element of the input dataset. /// /// The type of the elements of source /// The input dataset /// A transformation function to apply to each element /// The average of the values after applying the transformation function public static IQueryable AverageAsQuery(this IQueryable source, Expression> selector) { if (source == null) { throw new ArgumentNullException("source"); } if (selector == null) { throw new ArgumentNullException("selector"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.AverageAsQuery(source, selector.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(selector) } )); } /// /// Same as , but returns an <<>> /// containing a single element. Computes the average of a set of nullable double values that /// is obtained by applying a function to each element of the input dataset. /// /// The type of the elements of source /// The input dataset /// A transformation function to apply to each element /// The average of the values after applying the transformation function public static IQueryable AverageAsQuery(this IQueryable source, Expression> selector) { if (source == null) { throw new ArgumentNullException("source"); } if (selector == null) { throw new ArgumentNullException("selector"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.AverageAsQuery(source, selector.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(selector) } )); } /// /// Same as , but returns an <> /// containing a single element. Computes the average of a set of decimal values that /// is obtained by applying a function to each element of the input dataset. /// /// The type of the elements of source /// The input dataset /// A transformation function to apply to each element /// The average of the values after applying the transformation function public static IQueryable AverageAsQuery(this IQueryable source, Expression> selector) { if (source == null) { throw new ArgumentNullException("source"); } if (selector == null) { throw new ArgumentNullException("selector"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.AverageAsQuery(source, selector.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(selector) } )); } /// /// Same as , but returns an <<>> /// containing a single element. Computes the average of a set of nullable decimal values that /// is obtained by applying a function to each element of the input dataset. /// /// The type of the elements of source /// The input dataset /// A transformation function to apply to each element /// The average of the values after applying the transformation function public static IQueryable AverageAsQuery(this IQueryable source, Expression> selector) { if (source == null) { throw new ArgumentNullException("source"); } if (selector == null) { throw new ArgumentNullException("selector"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.AverageAsQuery(source, selector.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(selector) } )); } /// /// Applies an aggregator function over a sequence. /// /// The type of the elements of source /// The type of the accumulator value /// The input sequence /// The function that creates the initial accumulator value /// An accumualator function to apply to each element /// The final accumulator value public static TAccumulate Aggregate(this IQueryable source, Expression> seedFunc, Expression> func) { if (source == null) { throw new ArgumentNullException("source"); } if (func == null) { throw new ArgumentNullException("func"); } if (IsLocalDebugSource(source)) { return DryadLinqEnumerable.Aggregate(source, seedFunc.Compile(), func.Compile()); } return source.Provider.Execute( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TAccumulate)), new Expression[] { source.Expression, Expression.Quote(seedFunc), Expression.Quote(func) } )); } /// /// Applies an aggregator function over a sequence. /// /// The type of the elements of source /// The type of the accumulator value /// The type of final result /// The input sequence /// The function that creates the initial accumulator value /// An accumualator function to apply to each element /// A function to transform the final accumulator value into the result value /// The result of applying selector to the accumulator value public static TResult Aggregate( this IQueryable source, Expression> seedFunc, Expression> func, Expression> selector) { if (source == null) { throw new ArgumentNullException("source"); } if (func == null) { throw new ArgumentNullException("func"); } if (selector == null) { throw new ArgumentNullException("selector"); } if (IsLocalDebugSource(source)) { return DryadLinqEnumerable.Aggregate(source, seedFunc.Compile(), func.Compile(), selector.Compile()); } return source.Provider.Execute( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TAccumulate), typeof(TResult)), new Expression[] { source.Expression, Expression.Quote(seedFunc), Expression.Quote(func), Expression.Quote(selector) } )); } /// /// Same as , but returns an <> /// containing a single element. /// /// The type of the elements of source /// The input sequence /// An accumualator function to apply to each element /// The final accumulator value public static IQueryable AggregateAsQuery(this IQueryable source, Expression> func) { if (source == null) { throw new ArgumentNullException("source"); } if (func == null) { throw new ArgumentNullException("func"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.AggregateAsQuery(source, func.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), new Expression[] { source.Expression, Expression.Quote(func) } )); } /// /// Same as , but returns an <> /// containing a single element. /// /// The type of the elements of source /// The type of the accumulator value /// The input sequence /// The initial accumulator value /// An accumualator function to apply to each element /// The final accumulator value public static IQueryable AggregateAsQuery(this IQueryable source, TAccumulate seed, Expression> func) { if (source == null) { throw new ArgumentNullException("source"); } if (func == null) { throw new ArgumentNullException("func"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.AggregateAsQuery(source, seed, func.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TAccumulate)), new Expression[] { source.Expression, Expression.Constant(seed), Expression.Quote(func) } )); } /// /// Same as , but returns an <> /// containing a single element. /// /// The type of the elements of source /// The type of the accumulator value /// The type of the final result /// The input sequence /// The initial accumulator value /// An accumualator function to apply to each element /// A function to transform the final accumulator value into the result value /// The result of applying selector to the accumulator value public static IQueryable AggregateAsQuery(this IQueryable source, TAccumulate seed, Expression> func, Expression> selector) { if (source == null) { throw new ArgumentNullException("source"); } if (func == null) { throw new ArgumentNullException("func"); } if (selector == null) { throw new ArgumentNullException("selector"); } if (IsLocalDebugSource(source)) { var q = DryadLinqEnumerable.AggregateAsQuery(source, seed, func.Compile(), selector.Compile()).AsQueryable(); return new DryadLinqLocalQuery(source.Provider, q); } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TAccumulate), typeof(TResult)), new Expression[] { source.Expression, Expression.Constant(seed), Expression.Quote(func), Expression.Quote(selector) } )); } /// /// Instruct DryadLINQ to assume that the dataset is hash partitioned. /// /// The type of the records of the dataset /// The type of the keys on which the partition is based /// The input dataset /// The function to extract the key from a record /// The same dataset as input public static IQueryable AssumeHashPartition(this IQueryable source, Expression> keySelector) { if (source == null) { throw new ArgumentNullException("source"); } if (keySelector == null) { throw new ArgumentNullException("keySelector"); } if (IsLocalDebugSource(source)) { return source; } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)), new Expression[] { source.Expression, Expression.Quote(keySelector) } )); } /// /// Instructs DryadLINQ to assume that the dataset is hash partitioned. /// /// The type of the records of the dataset /// The type of the key on which partition is based /// The input dataset /// The function to extract the key from a record /// An equality comparer to compute the hash code of a key /// The same dataset as input public static IQueryable AssumeHashPartition(this IQueryable source, Expression> keySelector, IEqualityComparer comparer) { if (source == null) { throw new ArgumentNullException("source"); } if (keySelector == null) { throw new ArgumentNullException("keySelector"); } if (IsLocalDebugSource(source)) { return source; } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)), new Expression[] { source.Expression, Expression.Quote(keySelector), Expression.Constant(comparer, typeof(IEqualityComparer)) } )); } /// /// Instructs DryadLINQ to assume that the dataset is range partitioned. /// /// The type of the records of the dataset /// The type of the key on which partition is based /// The input dataset /// The function to extract the key from a record /// true to assume the partition keys are ordered descendingly /// The same dataset as input public static IQueryable AssumeRangePartition(this IQueryable source, Expression> keySelector, bool isDescending) { if (source == null) { throw new ArgumentNullException("source"); } if (keySelector == null) { throw new ArgumentNullException("keySelector"); } if (IsLocalDebugSource(source)) { return source; } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)), new Expression[] { source.Expression, Expression.Quote(keySelector), Expression.Constant(isDescending) } )); } /// /// Instructs DryadLINQ to assume that the dataset is range partitioned. /// /// The type of the records of the dataset /// The type of the key on which partition is based /// The input dataset /// The function to extract the key from a record /// An IComparer on TKey to compare keys /// true to assume that the partition keys are descending /// The same dataset as input public static IQueryable AssumeRangePartition(this IQueryable source, Expression> keySelector, IComparer comparer, bool isDescending) { if (source == null) { throw new ArgumentNullException("source"); } if (keySelector == null) { throw new ArgumentNullException("keySelector"); } if (IsLocalDebugSource(source)) { return source; } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)), new Expression[] { source.Expression, Expression.Quote(keySelector), Expression.Constant(comparer, typeof(IComparer)), Expression.Constant(isDescending) } )); } /// /// Instructs DryadLINQ to assume that the dataset is range partitioned by a specified list of keys. /// /// The type of the records of the dataset /// The type of the key on which partition is based /// The input dataset /// The function to extract the key from a record /// A list of partition keys /// The same dataset as input public static IQueryable AssumeRangePartition(this IQueryable source, Expression> keySelector, TKey[] rangeSeparators) { if (source == null) { throw new ArgumentNullException("source"); } if (keySelector == null) { throw new ArgumentNullException("keySelector"); } if (rangeSeparators == null) { throw new ArgumentNullException("rangeSeparators"); } if (IsLocalDebugSource(source)) { return source; } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)), new Expression[] { source.Expression, Expression.Quote(keySelector), Expression.Constant(rangeSeparators) } )); } /// /// Instructs DryadLINQ to assume that the dataset is range partitioned by a specified list of keys. /// /// The type of the records of the dataset /// The type of the key on which partition is based /// The input dataset /// The function to extract the key from a record /// An IComparer on TKey to compare keys /// A list of partition keys /// The same dataset as input public static IQueryable AssumeRangePartition(this IQueryable source, Expression> keySelector, TKey[] rangeSeparators, IComparer comparer) { if (source == null) { throw new ArgumentNullException("source"); } if (keySelector == null) { throw new ArgumentNullException("keySelector"); } if (rangeSeparators == null) { throw new ArgumentNullException("rangeSeparators"); } if (IsLocalDebugSource(source)) { return source; } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)), new Expression[] { source.Expression, Expression.Quote(keySelector), Expression.Constant(rangeSeparators), Expression.Constant(comparer, typeof(IComparer)) } )); } /// /// Instructs DryadLINQ to assume that each partition of the dataset is ordered. A dataset /// is ordered if it is range partitioned and each partition of it is ordered on the same /// key. /// /// The type of the recrods of the dataset /// The type of the key on which partition is based /// The dataset /// The function to extract the key from a record /// true to assume the order is descending /// The same dataset as input public static IQueryable AssumeOrderBy(this IQueryable source, Expression> keySelector, bool isDescending) { if (source == null) { throw new ArgumentNullException("source"); } if (keySelector == null) { throw new ArgumentNullException("keySelector"); } if (IsLocalDebugSource(source)) { return source; } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)), new Expression[] { source.Expression, Expression.Quote(keySelector), Expression.Constant(isDescending) } )); } /// /// Instructs DryadLINQ to assume that each partition of the dataset is ordered. A dataset /// is ordered if it is range partitioned and each partition of it is ordered on the same /// key. /// /// The type of the recrods of the dataset /// The type of the key on which partition is based /// The input dataset /// The function to extract the key from a record /// An IComparer on TKey to compare keys /// true to assume the order is descending /// The same dataset as input public static IQueryable AssumeOrderBy(this IQueryable source, Expression> keySelector, IComparer comparer, bool isDescending) { if (source == null) { throw new ArgumentNullException("source"); } if (keySelector == null) { throw new ArgumentNullException("keySelector"); } if (IsLocalDebugSource(source)) { return source; } return source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)), new Expression[] { source.Expression, Expression.Quote(keySelector), Expression.Constant(comparer, typeof(IComparer)), Expression.Constant(isDescending) } )); } /// /// Forks a specified input dataset into two datasets. A specified user-defined function is /// applied to each partition of the input dataset to produce a sequence of ForkTuples. /// /// The type of the elements of source /// The element type of the first output dataset /// The element type of the second output dataset /// The input dataset /// The function to apply to each partition of the input dataset /// An IMultiQueryable exposing two output datasets public static IMultiQueryable Fork(this IQueryable source, Expression, IEnumerable>>> mapper) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { IMultiEnumerable enumerables = DryadLinqEnumerable.Fork(source, mapper.Compile()); return new MultiQueryable(source, enumerables); } Expression expr = Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(T), typeof(R1), typeof(R2)), new Expression[] { source.Expression, Expression.Quote(mapper) } ); return new MultiQueryable(source, expr); } /// /// Forks a specified input dataset into three datasets. A specified user-defined function is /// applied to each partition of the input dataset to produce a sequence of ForkTuples. /// /// The type of the elements of source /// The element type of the first output dataset /// The element type of the second output dataset /// The element type of the third output dataset /// The input dataset /// The function to apply to each partition of the input dataset /// An IMultiQueryable exposing three output datasets public static IMultiQueryable Fork(this IQueryable source, Expression, IEnumerable>>> mapper) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { IMultiEnumerable enumerables = DryadLinqEnumerable.Fork(source, mapper.Compile()); return new MultiQueryable(source, enumerables); } Expression expr = Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(T), typeof(R1), typeof(R2), typeof(R3)), new Expression[] { source.Expression, Expression.Quote(mapper) } ); return new MultiQueryable(source, expr); } /// /// Compute two output datasets from one input dataset. A specified user-defined function is /// applied to each input element to produce zero or one element for each output dataset. /// /// The type of records of input dataset /// The type of records of first output dataset /// The type of records of second output dataset /// The input dataset /// The function applied to each record of the input /// An IMultiQueryable for the two output datasets public static IMultiQueryable Fork(this IQueryable source, Expression>> mapper) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { IMultiEnumerable enumerables = DryadLinqEnumerable.Fork(source, mapper.Compile()); return new MultiQueryable(source, enumerables); } Expression expr = Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(T), typeof(R1), typeof(R2)), new Expression[] { source.Expression, Expression.Quote(mapper) } ); return new MultiQueryable(source, expr); } /// /// Forks one input dataset into three output datasets. A specified user-defined function is /// applied to each input element to produce zero or one element for each output dataset. /// /// The type of records of input dataset /// The type of records of the first output dataset /// The type of records of the second output dataset /// The type of records of the third output dataset /// The input dataset /// The function applied to each record of the input /// An IMultiQueryable for the three output datasets public static IMultiQueryable Fork(this IQueryable source, Expression>> mapper) { if (source == null) { throw new ArgumentNullException("source"); } if (IsLocalDebugSource(source)) { IMultiEnumerable enumerables = DryadLinqEnumerable.Fork(source, mapper.Compile()); return new MultiQueryable(source, enumerables); } Expression expr = Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(T), typeof(R1), typeof(R2), typeof(R3)), new Expression[] { source.Expression, Expression.Quote(mapper) } ); return new MultiQueryable(source, expr); } /// /// Divides the input dataset into a collection of datasets based on the keys of the records. /// The method produces one output dataset for each key in the specified key array. Input /// records that don't match any of the keys are dropped. /// /// The type of records of input dataset /// The type of the keys of the input records /// The input dataset /// The function to extract the key from a record /// A list of the partition keys /// An IKeyedMultiQueryable for the output datasets. public static IKeyedMultiQueryable Fork(this IQueryable source, Expression> keySelector, TKey[] keys) { if (source == null) { throw new ArgumentNullException("source"); } if (keySelector == null) { throw new ArgumentNullException("keySelector"); } if (keys == null) { throw new ArgumentNullException("keys"); } if (IsLocalDebugSource(source)) { IMultiEnumerable enumerables = DryadLinqEnumerable.Fork(source, keySelector.Compile(), keys); return new MultiQueryable(source, keys, enumerables); } Expression expr = Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)), new Expression[] { source.Expression, Expression.Quote(keySelector), Expression.Constant(keys, typeof(TKey[])) } ); return new MultiQueryable(source, keys, expr); } internal static IQueryable ForkChoose(this IMultiQueryable source, int index) { if (source == null) { throw new ArgumentNullException("source"); } return source.Provider.CreateQuery( Expression.Call(null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(T)), new Expression[] { source.Expression, Expression.Constant(index) })); } /// /// Specifies a stream URI to be populated with the result of a specified DryadLINQ query. /// /// The type of the records of the table /// The data source /// A stream name /// If this flag is true, delete the output stream /// if it already exisit before execution /// An optional stream-based serializer /// An optional stream-based deserializer /// A query representing the output data. public static IQueryable ToStore(this IQueryable source, string dataSetUri, bool deleteIfExists = false, Expression, Stream>> serializer = null, Expression>> deserializer = null) { return ToStore(source, new Uri(dataSetUri), deleteIfExists, serializer, deserializer); } /// /// Specifies a stream URI to be populated with the result of a specified DryadLINQ query. /// /// The type of the records of the table /// The data source /// The stream name to store the result /// If this flag is true, delete the output stream /// if it already exisit before execution /// An optional stream-based serializer /// An optional stream-based deserializer /// A query representing the output data. public static IQueryable ToStore(this IQueryable source, Uri dataSetUri, bool deleteIfExists = false, Expression, Stream>> serializer = null, Expression>> deserializer = null) { if ((serializer == null) ^ (deserializer == null)) { throw new DryadLinqException("Must provide both serializer and deserializer."); } DryadLinqContext context = DryadLinqContext.GetContext(source.Provider); DataProvider dataProvider = DataProvider.GetDataProvider(dataSetUri.Scheme); dataSetUri = dataProvider.RewriteUri(context, dataSetUri, FileAccess.Write); dataProvider.CheckExistence(context, dataSetUri, deleteIfExists); return ToStoreInternal(source, dataSetUri, false, serializer, deserializer); } private static IQueryable ToStoreInternal(this IQueryable source, Uri dataSetUri, bool isTemp, Expression, Stream>> serializer, Expression>> deserializer) { if (source == null) { throw new ArgumentNullException("source"); } if (dataSetUri == null) { throw new ArgumentNullException("streamName"); } if (!(source.Provider is DryadLinqProviderBase)) { throw new ArgumentException(String.Format(SR.NotADryadLinqQuery, 0), "source"); } DryadLinqContext context = DryadLinqContext.GetContext(source.Provider); IQueryable result; if (IsLocalDebugSource(source)) { CompressionScheme compressionScheme = context.OutputDataCompressionScheme; DryadLinqMetaData metadata = new DryadLinqMetaData(context, typeof(TSource), dataSetUri, compressionScheme); result = DataProvider.StoreData(context, source, dataSetUri, metadata, compressionScheme, isTemp, serializer, deserializer); } else { // Strip out ToStore if source.Expression has one. Expression expr = source.Expression; MethodCallExpression mcExpr = expr as MethodCallExpression; if (mcExpr != null && (mcExpr.Method.Name == ReflectedNames.DLQ_ToStoreInternal || mcExpr.Method.Name == ReflectedNames.DLQ_ToStoreInternalAux)) { expr = mcExpr.Arguments[0]; } if (serializer == null) { MethodInfo minfo = typeof(DryadLinqQueryable) .GetMethod(ReflectedNames.DLQ_ToStoreInternalAux, BindingFlags.Static | BindingFlags.NonPublic); result = source.Provider.CreateQuery( Expression.Call(minfo.MakeGenericMethod(typeof(TSource)), expr, Expression.Constant(dataSetUri, typeof(Uri)), Expression.Constant(isTemp, typeof(bool)))); } else { result = source.Provider.CreateQuery( Expression.Call( null, ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), expr, Expression.Constant(dataSetUri, typeof(Uri)), Expression.Constant(isTemp, typeof(bool)), Expression.Quote(serializer), Expression.Quote(deserializer))); } ((DryadLinqQuery)source).BackingData = (DryadLinqQuery)result; } return result; } private static IQueryable ToStoreInternalAux(this IQueryable source, Uri dataSetUri, bool isTemp) { return ToStoreInternal(source, dataSetUri, isTemp, null, null); } /// /// Submits a specified query for asynchronous execution. /// /// The type of the records of the table /// The data source /// Information about the execution job. public static DryadLinqJobInfo Submit(this IQueryable source) { if (source == null) { throw new ArgumentNullException("source"); } // Extract the context. if (!(source.Provider is DryadLinqProviderBase)) { throw new ArgumentException(String.Format(SR.NotADryadLinqQuery, 0), "source"); } DryadLinqContext context = DryadLinqContext.GetContext(source.Provider); if (IsLocalDebugSource(source)) { // Noop for LocalDebug return new DryadLinqJobInfo(DryadLinqJobInfo.JOBID_LOCALDEBUG, null, null); } // Now we are not LocalDebug mode: DryadLinqQuery sourceQuery = source as DryadLinqQuery; if (sourceQuery == null) { throw new ArgumentException(String.Format(SR.NotADryadLinqQuery, 0), "source"); } // Handle repeat submissions. if (sourceQuery.IsPlainData) { string jobId = DryadLinqJobInfo.JOBID_NOJOB; if (sourceQuery.QueryExecutor != null) { jobId = sourceQuery.QueryExecutor.GetJobId(); } return new DryadLinqJobInfo(jobId, context.HeadNode, sourceQuery.QueryExecutor); } else if (sourceQuery.IsDataBacked) { // This query has already been submitted. DryadLinqQuery backingData = sourceQuery.BackingData; while (backingData.IsDataBacked) { backingData = backingData.BackingData; } string jobId = backingData.QueryExecutor.GetJobId(); return new DryadLinqJobInfo(jobId, context.HeadNode, backingData.QueryExecutor); } else { // Sanity check that we have a DryadLinqQuery MethodCallExpression mcExpr = source.Expression as MethodCallExpression; if (mcExpr == null) { throw new ArgumentException(String.Format(SR.AtLeastOneOperatorRequired, 0), "source"); } if (!mcExpr.Method.IsStatic || !TypeSystem.IsQueryOperatorCall(mcExpr)) { throw new ArgumentException(String.Format(SR.NotADryadLinqQuery, 0), "source"); } bool isTemp = false; if (mcExpr.Method.Name != ReflectedNames.DLQ_ToStoreInternal && mcExpr.Method.Name != ReflectedNames.DLQ_ToStoreInternalAux) { // Support for non-ToStoreQuery.Submit() isTemp = true; Uri tableUri = context.MakeTemporaryStreamUri(); mcExpr = Expression.Call( null, typeof(DryadLinqQueryable).GetMethod(ReflectedNames.DLQ_ToStoreInternalAux, BindingFlags.Static | BindingFlags.NonPublic) .MakeGenericMethod(typeof(TSource)), source.Expression, Expression.Constant(tableUri, typeof(Uri)), Expression.Constant(true, typeof(bool))); } // Execute the queries DryadLinqQueryGen dryadGen = new DryadLinqQueryGen(context, ((DryadLinqQuery)source).GetVertexCodeGen(), new Expression[] { mcExpr }); DryadLinqQuery[] tables = dryadGen.Execute(); tables[0].IsTemp = isTemp; ((DryadLinqQuery)source).BackingData = tables[0]; string jobId = tables[0].QueryExecutor.GetJobId(); return new DryadLinqJobInfo(jobId, context.HeadNode, tables[0].QueryExecutor); } } /// /// Submits a specified query and then waits for the job to complete /// /// If the job completes in error or is cancelled. /// If repeated errors occur while polling for status. /// The type of the records of the table /// The data source /// Information about the execution job. public static DryadLinqJobInfo SubmitAndWait(this IQueryable source) { DryadLinqJobInfo info = source.Submit(); info.Wait(); return info; } /// /// Submits a list of DryadLINQ queries for asynchronous execution. /// /// Queries to execute. /// Job information for tracking the execution. public static DryadLinqJobInfo Submit(params IQueryable[] sources) { if (sources == null) { throw new ArgumentNullException("sources"); } if (sources.Length == 0) { throw new ArgumentException("sources is empty", "sources"); } DryadLinqContext context = CheckSourcesAndGetCommonContext(sources); if (IsLocalDebugSource(sources[0])) { // Noop for LocalDebug return new DryadLinqJobInfo(DryadLinqJobInfo.JOBID_LOCALDEBUG, null, null); } // Not LocalDebug mode: List qList = new List(); List isTempList = new List(); string[] jobIds = new string[sources.Length]; string[] headNodes = new string[sources.Length]; DryadLinqJobExecutor[] jobExecutors = new DryadLinqJobExecutor[sources.Length]; for (int i = 0; i < sources.Length; i++) { DryadLinqQuery sourceQuery = sources[i] as DryadLinqQuery; if (sourceQuery == null) { throw new ArgumentException(String.Format(SR.NotADryadLinqQuery, i), string.Format("sources[{0}]", i)); } if (sourceQuery.IsPlainData) { jobIds[i] = DryadLinqJobInfo.JOBID_NOJOB; if (sourceQuery.QueryExecutor != null) { jobIds[i] = sourceQuery.QueryExecutor.GetJobId(); } headNodes[i] = context.HeadNode; jobExecutors[i] = sourceQuery.QueryExecutor; } else if (sourceQuery.IsDataBacked) { // This query has already been submitted. sourceQuery = sourceQuery.BackingData; while (sourceQuery.IsDataBacked) { sourceQuery = sourceQuery.BackingData; } jobIds[i] = sourceQuery.QueryExecutor.GetJobId(); headNodes[i] = context.HeadNode; jobExecutors[i] = sourceQuery.QueryExecutor; } else { // Sanity check that we have a normal DryadLinqQuery MethodCallExpression mcExpr = sources[i].Expression as MethodCallExpression; if (mcExpr == null) { throw new ArgumentException(String.Format(SR.AtLeastOneOperatorRequired, 0), "source"); } if (!mcExpr.Method.IsStatic || !TypeSystem.IsQueryOperatorCall(mcExpr)) { throw new ArgumentException(String.Format(SR.NotADryadLinqQuery, i), string.Format("sources[{0}]", i)); } bool isTemp = false; ; if (mcExpr.Method.Name != ReflectedNames.DLQ_ToStoreInternal && mcExpr.Method.Name != ReflectedNames.DLQ_ToStoreInternalAux) { isTemp = true; Uri tableUri = context.MakeTemporaryStreamUri(); MethodInfo minfo = typeof(DryadLinqQueryable) .GetMethod(ReflectedNames.DLQ_ToStoreInternalAux, BindingFlags.Static | BindingFlags.NonPublic); Type elemType = mcExpr.Type.GetGenericArguments()[0]; minfo = minfo.MakeGenericMethod(elemType); mcExpr = Expression.Call(minfo, mcExpr, Expression.Constant(tableUri, typeof(Uri)), Expression.Constant(isTemp, typeof(bool))); } qList.Add(mcExpr); isTempList.Add(isTemp); } } // Execute the queries on the cluster: VertexCodeGen vertexCodeGen = ((DryadLinqQuery)sources[0]).GetVertexCodeGen(); DryadLinqQueryGen queryGen = new DryadLinqQueryGen(context, vertexCodeGen, qList.ToArray()); DryadLinqQuery[] tables = queryGen.Execute(); // Store the results in the queries int idx = 0; for (int i = 0; i < tables.Length; i++) { tables[i].IsTemp = isTempList[i]; while (headNodes[idx] != null) idx++; ((DryadLinqQuery)sources[idx]).BackingData = tables[i]; headNodes[idx] = context.HeadNode; jobExecutors[idx] = tables[0].QueryExecutor; } return new DryadLinqJobInfo(jobIds, headNodes, jobExecutors); } /// /// Submits a list of DryadLinq queries for execution and waits for the job to complete /// /// If the job completes in error or is cancelled. /// If repeated errors occur while polling for status. /// A set of DryadLINQ queries to execute /// Information about the job being submitted for execution. /// /// Every item in sources must be an DryadLinq IQueryable object that terminates with ToStore() /// Only one job will be executed, but the job will produce the output associated with each item in sources. /// public static DryadLinqJobInfo SubmitAndWait(params IQueryable[] sources) { DryadLinqJobInfo info = DryadLinqQueryable.Submit(sources); info.Wait(); return info; } private static DryadLinqContext CheckSourcesAndGetCommonContext(IQueryable[] sources) { if (sources == null) { throw new ArgumentNullException("sources"); } if (sources.Length == 0) { throw new ArgumentException("sources is empty", "sources"); } for (int i = 0; i < sources.Length; i++) { if (sources[i] == null) { throw new ArgumentException(string.Format("sources[{0}] was null.", i), "sources"); } // Sanity check that we have normal DryadLinqQuery objects if (!(sources[i].Provider is DryadLinqProviderBase)) { throw new ArgumentException(String.Format(SR.NotADryadLinqQuery, i), "sources"); } } //check for duplicate query objects HashSet repeatedQueryDetector = new HashSet(); for (int i = 0; i < sources.Length; i++) { var q = sources[i]; if (repeatedQueryDetector.Contains(q)) { throw new ArgumentException(string.Format(SR.SameQuerySubmittedMultipleTimesInMaterialize), string.Format("sources[{0}]", i)); } repeatedQueryDetector.Add(q); } // Check the queries all use the same context DryadLinqContext commonContext = DryadLinqContext.GetContext(sources[0].Provider); for (int i = 1; i < sources.Length; i++) { if (commonContext != DryadLinqContext.GetContext(sources[i].Provider)) { throw new DryadLinqException("The queries submitted together must be created using the same DryadLinqContext."); } } return commonContext; } } }