/*
Copyright (c) Microsoft Corporation
All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
compliance with the License. You may obtain a copy of the License
at http://www.apache.org/licenses/LICENSE-2.0
THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER
EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR CONDITIONS OF
TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR NON-INFRINGEMENT.
See the Apache Version 2.0 License for specific language governing permissions and
limitations under the License.
*/
using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Globalization;
using System.Reflection;
using System.Linq.Expressions;
using System.Linq;
using System.Diagnostics;
using Microsoft.Research.DryadLinq.Internal;
namespace Microsoft.Research.DryadLinq
{
/// <summary>
/// This class extends LINQ with a set of new operators that are specific to DryadLINQ.
/// The new operators include partitioning operators (HashPartition and RangePartition)
/// and the Apply operator that enables stateful transformations on datasets.
/// </summary>
public static class DryadLinqQueryable
{
/// <summary>
/// Reports whether a query source should take the local-debug path, i.e. whether its
/// query provider is anything other than a DryadLinqProvider.
/// </summary>
/// <param name="source">The query source to inspect; not checked for null here</param>
/// <returns>true if the provider of source is not a DryadLinqProvider; otherwise false</returns>
internal static bool IsLocalDebugSource(IQueryable source)
{
    bool hasDryadLinqProvider = source.Provider is DryadLinqProvider;
    return !hasDryadLinqProvider;
}
/// <summary>
/// Filters a sequence of values based on a predicate. Each element's index is used in
/// the logic of the predicate function.
/// </summary>
/// <typeparam name="TSource">The type of the elements of source</typeparam>
/// <param name="source">The sequence of elements to filter</param>
/// <param name="predicate">The filter predicate.</param>
/// <returns>The elements in the input that satisfy the predicate</returns>
/// <exception cref="ArgumentNullException">source or predicate is null</exception>
public static IQueryable
    LongWhere(this IQueryable source,
              Expression> predicate)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (predicate == null)
    {
        throw new ArgumentNullException("predicate");
    }

    if (!IsLocalDebugSource(source))
    {
        // Build a call to this same method, closed over the actual type argument,
        // and hand it to the provider of the source.
        MethodInfo thisMethod =
            ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource));
        Expression[] callArgs = new Expression[]
        {
            source.Expression,
            Expression.Quote(predicate)
        };
        return source.Provider.CreateQuery(Expression.Call(null, thisMethod, callArgs));
    }

    // Local-debug path: compile the predicate and evaluate through DryadLinqEnumerable.
    var compiledPredicate = predicate.Compile();
    var localResult = DryadLinqEnumerable.LongWhere(source, compiledPredicate).AsQueryable();
    return new DryadLinqLocalQuery(source.Provider, localResult);
}
/// <summary>
/// Transforms each element of a sequence into a new form by applying a function of
/// the element and its index.
/// </summary>
/// <typeparam name="TSource">The type of the elements of source</typeparam>
/// <typeparam name="TResult">The type of the elements of the result</typeparam>
/// <param name="source">The sequence of input elements</param>
/// <param name="selector">A transform function to apply to each source element;
/// the second parameter of the function represents the index of the source element.</param>
/// <returns>The sequence resulting from applying the transformation function on each input element</returns>
/// <exception cref="ArgumentNullException">source or selector is null</exception>
public static IQueryable
    LongSelect(this IQueryable source,
               Expression> selector)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (selector == null)
    {
        throw new ArgumentNullException("selector");
    }

    if (!IsLocalDebugSource(source))
    {
        // Build a call to this same method, closed over the actual type arguments,
        // and hand it to the provider of the source.
        MethodInfo thisMethod =
            ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult));
        Expression[] callArgs = new Expression[]
        {
            source.Expression,
            Expression.Quote(selector)
        };
        return source.Provider.CreateQuery(Expression.Call(null, thisMethod, callArgs));
    }

    // Local-debug path: compile the selector and evaluate through DryadLinqEnumerable.
    var compiledSelector = selector.Compile();
    var localResult = DryadLinqEnumerable.LongSelect(source, compiledSelector).AsQueryable();
    return new DryadLinqLocalQuery(source.Provider, localResult);
}
/// <summary>
/// Transforms each element of a sequence into an IEnumerable{T} by applying a function to the
/// element and its index, and then flattens the resulting sequences into one sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements of source</typeparam>
/// <typeparam name="TResult">The type of the elements of the result</typeparam>
/// <param name="source">The sequence of input elements</param>
/// <param name="selector">A transform function to apply to each source element;
/// the second parameter of the function represents the index of the source element.</param>
/// <returns>The sequence resulting from applying the function on each input element and
/// flattening the results</returns>
/// <exception cref="ArgumentNullException">source or selector is null</exception>
public static IQueryable
    LongSelectMany(this IQueryable source,
                   Expression>> selector)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (selector == null)
    {
        throw new ArgumentNullException("selector");
    }

    if (!IsLocalDebugSource(source))
    {
        // Build a call to this same method, closed over the actual type arguments,
        // and hand it to the provider of the source.
        MethodInfo thisMethod =
            ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult));
        Expression[] callArgs = new Expression[]
        {
            source.Expression,
            Expression.Quote(selector)
        };
        return source.Provider.CreateQuery(Expression.Call(null, thisMethod, callArgs));
    }

    // Local-debug path: compile the selector and evaluate through DryadLinqEnumerable.
    var compiledSelector = selector.Compile();
    var localResult = DryadLinqEnumerable.LongSelectMany(source, compiledSelector).AsQueryable();
    return new DryadLinqLocalQuery(source.Provider, localResult);
}
/// <summary>
/// Transforms each element of a sequence into an IEnumerable{T} by applying a function to the
/// element and its index, and then flattens the resulting sequences into one sequence.
/// </summary>
/// <typeparam name="TSource">The type of the elements of source</typeparam>
/// <typeparam name="TCollection">The type of the element in the intermediate IEnumerable sequences</typeparam>
/// <typeparam name="TResult">The type of the elements of the result</typeparam>
/// <param name="source">The sequence of input elements</param>
/// <param name="selector">A transform function to apply to each source element;
/// the second parameter of the function represents the index of the source element.</param>
/// <param name="resultSelector">A transformation function to apply to each intermediate element</param>
/// <returns>The sequence resulting from applying selector to each input element and
/// flattening and transforming the elements in the intermediate sequences</returns>
/// <exception cref="ArgumentNullException">source, selector or resultSelector is null</exception>
public static IQueryable
    LongSelectMany(this IQueryable source,
                   Expression>> selector,
                   Expression> resultSelector)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (selector == null)
    {
        throw new ArgumentNullException("selector");
    }
    if (resultSelector == null)
    {
        throw new ArgumentNullException("resultSelector");
    }

    if (!IsLocalDebugSource(source))
    {
        // Build a call to this same method, closed over the actual type arguments,
        // and hand it to the provider of the source.
        MethodInfo thisMethod =
            ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TCollection), typeof(TResult));
        Expression[] callArgs = new Expression[]
        {
            source.Expression,
            Expression.Quote(selector),
            Expression.Quote(resultSelector)
        };
        return source.Provider.CreateQuery(Expression.Call(null, thisMethod, callArgs));
    }

    // Local-debug path: compile both functions (selector first, as before) and
    // evaluate through DryadLinqEnumerable.
    var compiledSelector = selector.Compile();
    var compiledResultSelector = resultSelector.Compile();
    var localResult = DryadLinqEnumerable.LongSelectMany(source, compiledSelector, compiledResultSelector).AsQueryable();
    return new DryadLinqLocalQuery(source.Provider, localResult);
}
/// <summary>
/// Returns the largest prefix of a sequence such that the elements satisfy a specified predicate.
/// </summary>
/// <typeparam name="TSource">The element type of the input sequence</typeparam>
/// <param name="source">The input sequence</param>
/// <param name="predicate">A predicate to test each element for a condition</param>
/// <returns>The largest prefix satisfying the predicate</returns>
/// <exception cref="ArgumentNullException">source or predicate is null</exception>
public static IQueryable
    LongTakeWhile(this IQueryable source,
                  Expression> predicate)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (predicate == null)
    {
        throw new ArgumentNullException("predicate");
    }

    if (!IsLocalDebugSource(source))
    {
        // Build a call to this same method, closed over the actual type argument,
        // and hand it to the provider of the source.
        MethodInfo thisMethod =
            ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource));
        Expression[] callArgs = new Expression[]
        {
            source.Expression,
            Expression.Quote(predicate)
        };
        return source.Provider.CreateQuery(Expression.Call(null, thisMethod, callArgs));
    }

    // Local-debug path: compile the predicate and evaluate through DryadLinqEnumerable.
    var compiledPredicate = predicate.Compile();
    var localResult = DryadLinqEnumerable.LongTakeWhile(source, compiledPredicate).AsQueryable();
    return new DryadLinqLocalQuery(source.Provider, localResult);
}
/// <summary>
/// Skips elements in a sequence as long as a specified condition is true and then returns the
/// remaining elements. The predicate is a function of an element and its index.
/// </summary>
/// <typeparam name="TSource">The element type of the input sequence</typeparam>
/// <param name="source">The input sequence</param>
/// <param name="predicate">A predicate to test each element for a condition</param>
/// <returns>The remaining sequence after skipping the elements at the head that satisfy the predicate</returns>
/// <exception cref="ArgumentNullException">source or predicate is null</exception>
public static IQueryable
    LongSkipWhile(this IQueryable source,
                  Expression> predicate)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (predicate == null)
    {
        throw new ArgumentNullException("predicate");
    }

    if (!IsLocalDebugSource(source))
    {
        // Build a call to this same method, closed over the actual type argument,
        // and hand it to the provider of the source.
        MethodInfo thisMethod =
            ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource));
        Expression[] callArgs = new Expression[]
        {
            source.Expression,
            Expression.Quote(predicate)
        };
        return source.Provider.CreateQuery(Expression.Call(null, thisMethod, callArgs));
    }

    // Local-debug path: compile the predicate and evaluate through DryadLinqEnumerable.
    var compiledPredicate = predicate.Compile();
    var localResult = DryadLinqEnumerable.LongSkipWhile(source, compiledPredicate).AsQueryable();
    return new DryadLinqLocalQuery(source.Provider, localResult);
}
/// <summary>
/// Hash partition a dataset.
/// </summary>
/// <typeparam name="TSource">The type of the records in the dataset</typeparam>
/// <typeparam name="TKey">The type of the key on which the partition is based</typeparam>
/// <param name="source">The dataset to be partitioned</param>
/// <param name="keySelector">The function to extract the key from a record</param>
/// <param name="comparer">An EqualityComparer on TKey to compare keys; not validated here</param>
/// <param name="partitionCount">The number of partitions to create; must be positive</param>
/// <returns>An IQueryable hash-partitioned according to a key</returns>
/// <exception cref="ArgumentNullException">source or keySelector is null</exception>
/// <exception cref="ArgumentOutOfRangeException">partitionCount is zero or negative</exception>
public static IQueryable
    HashPartition(this IQueryable source,
                  Expression> keySelector,
                  IEqualityComparer comparer,
                  int partitionCount)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (keySelector == null)
    {
        throw new ArgumentNullException("keySelector");
    }
    if (partitionCount <= 0)
    {
        throw new ArgumentOutOfRangeException("partitionCount");
    }

    // In local-debug mode the source is returned unchanged; no partitioning is performed.
    if (IsLocalDebugSource(source))
    {
        return source;
    }

    // Build a call to this same method, closed over the actual type arguments.
    MethodInfo thisMethod =
        ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey));
    Expression[] callArgs = new Expression[]
    {
        source.Expression,
        Expression.Quote(keySelector),
        Expression.Constant(comparer, typeof(IEqualityComparer)),
        Expression.Constant(partitionCount, typeof(int))
    };
    return source.Provider.CreateQuery(Expression.Call(null, thisMethod, callArgs));
}
/// <summary>
/// Hash partition a dataset.
/// </summary>
/// <typeparam name="TSource">The type of the records in the dataset</typeparam>
/// <typeparam name="TKey">The type of the key on which the partition is based</typeparam>
/// <param name="source">The dataset to be partitioned</param>
/// <param name="keySelector">The function to extract the key from a record</param>
/// <param name="partitionCount">The number of partitions to create; must be positive</param>
/// <returns>An IQueryable hash-partitioned according to a key</returns>
/// <exception cref="ArgumentNullException">source or keySelector is null</exception>
/// <exception cref="ArgumentOutOfRangeException">partitionCount is zero or negative</exception>
public static IQueryable
    HashPartition(this IQueryable source,
                  Expression> keySelector,
                  int partitionCount)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (keySelector == null)
    {
        throw new ArgumentNullException("keySelector");
    }
    if (partitionCount <= 0)
    {
        throw new ArgumentOutOfRangeException("partitionCount");
    }

    // In local-debug mode the source is returned unchanged; no partitioning is performed.
    if (IsLocalDebugSource(source))
    {
        return source;
    }

    // Build a call to this same method, closed over the actual type arguments.
    MethodInfo thisMethod =
        ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey));
    Expression[] callArgs = new Expression[]
    {
        source.Expression,
        Expression.Quote(keySelector),
        Expression.Constant(partitionCount, typeof(int))
    };
    return source.Provider.CreateQuery(Expression.Call(null, thisMethod, callArgs));
}
/// <summary>
/// Hash partition a dataset. The number of resulting partitions is dynamically determined
/// at the runtime.
/// </summary>
/// <typeparam name="TSource">The type of the records in the dataset</typeparam>
/// <typeparam name="TKey">The type of the key on which the partition is based</typeparam>
/// <param name="source">The dataset to be partitioned</param>
/// <param name="keySelector">The function to extract the key from a record</param>
/// <returns>An IQueryable hash-partitioned according to a key</returns>
/// <exception cref="ArgumentNullException">source or keySelector is null</exception>
public static IQueryable
    HashPartition(this IQueryable source,
                  Expression> keySelector)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (keySelector == null)
    {
        throw new ArgumentNullException("keySelector");
    }

    // In local-debug mode the source is returned unchanged; no partitioning is performed.
    if (IsLocalDebugSource(source))
    {
        return source;
    }

    // Build a call to this same method, closed over the actual type arguments.
    MethodInfo thisMethod =
        ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey));
    Expression[] callArgs = new Expression[]
    {
        source.Expression,
        Expression.Quote(keySelector)
    };
    return source.Provider.CreateQuery(Expression.Call(null, thisMethod, callArgs));
}
/// <summary>
/// Hash partition a dataset. The number of resulting partitions is dynamically determined
/// at the runtime.
/// </summary>
/// <typeparam name="TSource">The type of the records in the dataset</typeparam>
/// <typeparam name="TKey">The type of the key on which the partition is based</typeparam>
/// <param name="source">The dataset to be partitioned</param>
/// <param name="keySelector">The function to extract the key from a record</param>
/// <param name="comparer">An IEqualityComparer on TKey to compare keys; not validated here</param>
/// <returns>An IQueryable hash-partitioned according to a key</returns>
/// <exception cref="ArgumentNullException">source or keySelector is null</exception>
public static IQueryable
    HashPartition(this IQueryable source,
                  Expression> keySelector,
                  IEqualityComparer comparer)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (keySelector == null)
    {
        throw new ArgumentNullException("keySelector");
    }

    // In local-debug mode the source is returned unchanged; no partitioning is performed.
    if (IsLocalDebugSource(source))
    {
        return source;
    }

    // Build a call to this same method, closed over the actual type arguments.
    MethodInfo thisMethod =
        ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey));
    Expression[] callArgs = new Expression[]
    {
        source.Expression,
        Expression.Quote(keySelector),
        Expression.Constant(comparer, typeof(IEqualityComparer))
    };
    return source.Provider.CreateQuery(Expression.Call(null, thisMethod, callArgs));
}
/// <summary>
/// Hash partition a dataset. The number of resulting partitions is dynamically determined
/// at the runtime.
/// </summary>
/// <typeparam name="TSource">The type of the records in the dataset</typeparam>
/// <typeparam name="TKey">The type of the key on which the partition is based</typeparam>
/// <typeparam name="TResult">The type of the records in the result dataset</typeparam>
/// <param name="source">The dataset to be partitioned</param>
/// <param name="keySelector">The function to extract the key from a record</param>
/// <param name="resultSelector">The function to compute output record</param>
/// <returns>An IQueryable hash-partitioned according to a key</returns>
/// <exception cref="ArgumentNullException">source, keySelector or resultSelector is null</exception>
public static IQueryable
    HashPartition(this IQueryable source,
                  Expression> keySelector,
                  Expression> resultSelector)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (keySelector == null)
    {
        throw new ArgumentNullException("keySelector");
    }
    if (resultSelector == null)
    {
        throw new ArgumentNullException("resultSelector");
    }

    if (!IsLocalDebugSource(source))
    {
        // Build a call to this same method, closed over the actual type arguments.
        MethodInfo thisMethod =
            ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey), typeof(TResult));
        Expression[] callArgs = new Expression[]
        {
            source.Expression,
            Expression.Quote(keySelector),
            Expression.Quote(resultSelector)
        };
        return source.Provider.CreateQuery(Expression.Call(null, thisMethod, callArgs));
    }

    // Local-debug path: only the projection is applied; keySelector is not used
    // and no partitioning is performed.
    return source.Select(resultSelector.Compile()).AsQueryable();
}
/// <summary>
/// Hash partition a dataset. The number of resulting partitions is dynamically determined
/// at the runtime.
/// </summary>
/// <typeparam name="TSource">The type of the records in the dataset</typeparam>
/// <typeparam name="TKey">The type of the key on which the partition is based</typeparam>
/// <typeparam name="TResult">The type of the records in the result dataset</typeparam>
/// <param name="source">The dataset to be partitioned</param>
/// <param name="keySelector">The function to extract the key from a record</param>
/// <param name="comparer">An IEqualityComparer on TKey to compare keys; not validated here</param>
/// <param name="resultSelector">The function to compute output record</param>
/// <returns>An IQueryable hash-partitioned according to a key</returns>
/// <exception cref="ArgumentNullException">source, keySelector or resultSelector is null</exception>
public static IQueryable
    HashPartition(this IQueryable source,
                  Expression> keySelector,
                  IEqualityComparer comparer,
                  Expression> resultSelector)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (keySelector == null)
    {
        throw new ArgumentNullException("keySelector");
    }
    if (resultSelector == null)
    {
        throw new ArgumentNullException("resultSelector");
    }

    if (!IsLocalDebugSource(source))
    {
        // Build a call to this same method, closed over the actual type arguments.
        MethodInfo thisMethod =
            ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey), typeof(TResult));
        Expression[] callArgs = new Expression[]
        {
            source.Expression,
            Expression.Quote(keySelector),
            Expression.Constant(comparer, typeof(IEqualityComparer)),
            Expression.Quote(resultSelector)
        };
        return source.Provider.CreateQuery(Expression.Call(null, thisMethod, callArgs));
    }

    // Local-debug path: only the projection is applied; keySelector and comparer
    // are not used and no partitioning is performed.
    return source.Select(resultSelector.Compile()).AsQueryable();
}
/// <summary>
/// Range partition a dataset. The list of range keys is determined dynamically at
/// runtime.
/// </summary>
/// <typeparam name="TSource">The type of the records in the dataset</typeparam>
/// <typeparam name="TKey">The type of the key on which the partition is based</typeparam>
/// <param name="source">The dataset to be partitioned</param>
/// <param name="keySelector">The function to extract the key from a record</param>
/// <returns>An IQueryable range-partitioned according to a key</returns>
/// <exception cref="ArgumentNullException">source or keySelector is null</exception>
public static IQueryable
    RangePartition(this IQueryable source,
                   Expression> keySelector)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (keySelector == null)
    {
        throw new ArgumentNullException("keySelector");
    }

    // In local-debug mode the source is returned unchanged; no partitioning is performed.
    if (IsLocalDebugSource(source))
    {
        return source;
    }

    // Build a call to this same method, closed over the actual type arguments.
    MethodInfo thisMethod =
        ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey));
    Expression[] callArgs = new Expression[]
    {
        source.Expression,
        Expression.Quote(keySelector)
    };
    return source.Provider.CreateQuery(Expression.Call(null, thisMethod, callArgs));
}
/// <summary>
/// Range partition a dataset. The list of range keys is determined dynamically at
/// runtime.
/// </summary>
/// <typeparam name="TSource">The type of the records in the dataset</typeparam>
/// <typeparam name="TKey">The type of the key on which the partition is based</typeparam>
/// <param name="source">The dataset to be partitioned</param>
/// <param name="keySelector">The function to extract the key from a record</param>
/// <param name="partitionCount">The number of partitions in the output dataset; must be positive</param>
/// <returns>An IQueryable partitioned according to a list of range keys determined at runtime</returns>
/// <exception cref="ArgumentNullException">source or keySelector is null</exception>
/// <exception cref="ArgumentOutOfRangeException">partitionCount is zero or negative</exception>
public static IQueryable
    RangePartition(this IQueryable source,
                   Expression> keySelector,
                   int partitionCount)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (keySelector == null)
    {
        throw new ArgumentNullException("keySelector");
    }
    if (partitionCount <= 0)
    {
        throw new ArgumentOutOfRangeException("partitionCount");
    }

    // In local-debug mode the source is returned unchanged; no partitioning is performed.
    if (IsLocalDebugSource(source))
    {
        return source;
    }

    // Build a call to this same method, closed over the actual type arguments.
    MethodInfo thisMethod =
        ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey));
    Expression[] callArgs = new Expression[]
    {
        source.Expression,
        Expression.Quote(keySelector),
        Expression.Constant(partitionCount, typeof(int))
    };
    return source.Provider.CreateQuery(Expression.Call(null, thisMethod, callArgs));
}
/// <summary>
/// Range partition a dataset. The list of range keys is determined dynamically at
/// runtime.
/// </summary>
/// <typeparam name="TSource">The type of the records in the dataset</typeparam>
/// <typeparam name="TKey">The type of the key on which the partition is based</typeparam>
/// <param name="source">The dataset to be partitioned</param>
/// <param name="keySelector">The function to extract the key from a record</param>
/// <param name="isDescending">true if the partition keys are descending</param>
/// <returns>An IQueryable partitioned according to a list of range keys determined at runtime</returns>
/// <exception cref="ArgumentNullException">source or keySelector is null</exception>
public static IQueryable
    RangePartition(this IQueryable source,
                   Expression> keySelector,
                   bool isDescending)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (keySelector == null)
    {
        throw new ArgumentNullException("keySelector");
    }

    // In local-debug mode the source is returned unchanged; no partitioning is performed.
    if (IsLocalDebugSource(source))
    {
        return source;
    }

    // Build a call to this same method, closed over the actual type arguments.
    MethodInfo thisMethod =
        ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey));
    Expression[] callArgs = new Expression[]
    {
        source.Expression,
        Expression.Quote(keySelector),
        Expression.Constant(isDescending, typeof(bool))
    };
    return source.Provider.CreateQuery(Expression.Call(null, thisMethod, callArgs));
}
/// <summary>
/// Range partition a dataset using an array of partition keys.
/// </summary>
/// <typeparam name="TSource">The type of the records in the dataset</typeparam>
/// <typeparam name="TKey">The type of the key on which the partition is based</typeparam>
/// <param name="source">The dataset to be partitioned</param>
/// <param name="keySelector">The function to extract the key from a record</param>
/// <param name="rangeSeparators">An array of partition keys, either in ascending or descending order</param>
/// <returns>An IQueryable partitioned according to the specified range keys</returns>
/// <exception cref="ArgumentNullException">source, keySelector or rangeSeparators is null</exception>
/// <exception cref="ArgumentException">DryadLinqUtil.ComputeIsDescending rejects rangeSeparators
/// as not consistently ordered</exception>
public static IQueryable
    RangePartition(this IQueryable source,
                   Expression> keySelector,
                   TKey[] rangeSeparators)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (keySelector == null)
    {
        throw new ArgumentNullException("keySelector");
    }
    if (rangeSeparators == null)
    {
        throw new ArgumentNullException("rangeSeparators");
    }

    // The keys are validated with the default comparer before the local-debug
    // shortcut, so bad separators are rejected in both modes. The detected
    // direction itself is not needed here.
    bool? unusedDirection;
    bool keysAreConsistent =
        DryadLinqUtil.ComputeIsDescending(rangeSeparators, Comparer.Default, out unusedDirection);
    if (!keysAreConsistent)
    {
        throw new ArgumentException(SR.PartitionKeysAreNotConsistentlyOrdered, "rangeSeparators");
    }

    // In local-debug mode the source is returned unchanged; no partitioning is performed.
    if (IsLocalDebugSource(source))
    {
        return source;
    }

    // Build a call to this same method, closed over the actual type arguments.
    MethodInfo thisMethod =
        ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey));
    Expression[] callArgs = new Expression[]
    {
        source.Expression,
        Expression.Quote(keySelector),
        Expression.Constant(rangeSeparators, typeof(TKey[]))
    };
    return source.Provider.CreateQuery(Expression.Call(null, thisMethod, callArgs));
}
/// <summary>
/// Range partition a dataset. The list of range keys is determined dynamically at
/// runtime.
/// </summary>
/// <typeparam name="TSource">The type of the records in the dataset</typeparam>
/// <typeparam name="TKey">The type of the key on which the partition is based</typeparam>
/// <param name="source">The dataset to be partitioned</param>
/// <param name="keySelector">The function to extract the key from a record</param>
/// <param name="isDescending">true if the partition keys are descending</param>
/// <param name="partitionCount">Number of partitions in the output dataset; must be positive</param>
/// <returns>An IQueryable partitioned according to a list of keys determined at runtime</returns>
/// <exception cref="ArgumentNullException">source or keySelector is null</exception>
/// <exception cref="ArgumentOutOfRangeException">partitionCount is zero or negative</exception>
public static IQueryable
    RangePartition(this IQueryable source,
                   Expression> keySelector,
                   bool isDescending,
                   int partitionCount)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (keySelector == null)
    {
        throw new ArgumentNullException("keySelector");
    }
    if (partitionCount <= 0)
    {
        throw new ArgumentOutOfRangeException("partitionCount");
    }

    // In local-debug mode the source is returned unchanged; no partitioning is performed.
    if (IsLocalDebugSource(source))
    {
        return source;
    }

    // Build a call to this same method, closed over the actual type arguments.
    MethodInfo thisMethod =
        ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey));
    Expression[] callArgs = new Expression[]
    {
        source.Expression,
        Expression.Quote(keySelector),
        Expression.Constant(isDescending, typeof(bool)),
        Expression.Constant(partitionCount, typeof(int))
    };
    return source.Provider.CreateQuery(Expression.Call(null, thisMethod, callArgs));
}
/// <summary>
/// Range partition a dataset. The list of range keys is determined dynamically at runtime.
/// </summary>
/// <typeparam name="TSource">The type of the records in the input dataset</typeparam>
/// <typeparam name="TKey">The type of the key on which the partition is based</typeparam>
/// <param name="source">The input dataset to be partitioned</param>
/// <param name="keySelector">The function to extract the key from a record</param>
/// <param name="comparer">An IComparer on TKey to compare keys; not validated here</param>
/// <param name="isDescending">true if the generated keys must be descending; otherwise ascending</param>
/// <returns>An IQueryable partitioned according to a list of keys determined at runtime</returns>
/// <exception cref="ArgumentNullException">source or keySelector is null</exception>
public static IQueryable
    RangePartition(this IQueryable source,
                   Expression> keySelector,
                   IComparer comparer,
                   bool isDescending)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (keySelector == null)
    {
        throw new ArgumentNullException("keySelector");
    }

    // In local-debug mode the source is returned unchanged; no partitioning is performed.
    if (IsLocalDebugSource(source))
    {
        return source;
    }

    // Build a call to this same method, closed over the actual type arguments.
    MethodInfo thisMethod =
        ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey));
    Expression[] callArgs = new Expression[]
    {
        source.Expression,
        Expression.Quote(keySelector),
        Expression.Constant(comparer, typeof(IComparer)),
        Expression.Constant(isDescending, typeof(bool))
    };
    return source.Provider.CreateQuery(Expression.Call(null, thisMethod, callArgs));
}
/// <summary>
/// Range partition a dataset using a specified list of keys.
/// </summary>
/// <typeparam name="TSource">The type of the records in the input dataset</typeparam>
/// <typeparam name="TKey">The type of the key on which the partition is based</typeparam>
/// <param name="source">The input dataset to be partitioned</param>
/// <param name="keySelector">The function to extract the key from a record</param>
/// <param name="rangeSeparators">The list of range keys</param>
/// <param name="comparer">An IComparer on TKey to compare keys; may be null only when
/// TypeSystem.HasDefaultComparer accepts TKey</param>
/// <returns>An IQueryable partitioned according to a specified list of keys.</returns>
/// <exception cref="ArgumentNullException">source, keySelector or rangeSeparators is null</exception>
/// <exception cref="DryadLinqException">comparer is null and TKey has no default comparer</exception>
/// <exception cref="ArgumentException">DryadLinqUtil.ComputeIsDescending rejects rangeSeparators
/// as not consistently ordered</exception>
public static IQueryable
    RangePartition(this IQueryable source,
                   Expression> keySelector,
                   TKey[] rangeSeparators,
                   IComparer comparer)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (keySelector == null)
    {
        throw new ArgumentNullException("keySelector");
    }
    if (rangeSeparators == null)
    {
        throw new ArgumentNullException("rangeSeparators");
    }

    bool canCompareKeys = comparer != null || TypeSystem.HasDefaultComparer(typeof(TKey));
    if (!canCompareKeys)
    {
        throw new DryadLinqException(DryadLinqErrorCode.ComparerMustBeSpecifiedOrKeyTypeMustBeIComparable,
                                     string.Format(SR.ComparerMustBeSpecifiedOrKeyTypeMustBeIComparable, typeof(TKey)));
    }

    // NOTE(review): GetComparer presumably substitutes a default comparer when
    // comparer is null -- confirm against TypeSystem.
    comparer = TypeSystem.GetComparer(comparer);

    // The keys are validated before the local-debug shortcut, so bad separators are
    // rejected in both modes. The detected direction itself is not needed here.
    bool? unusedDirection;
    bool keysAreConsistent =
        DryadLinqUtil.ComputeIsDescending(rangeSeparators, comparer, out unusedDirection);
    if (!keysAreConsistent)
    {
        throw new ArgumentException(SR.PartitionKeysAreNotConsistentlyOrdered, "rangeSeparators");
    }

    // In local-debug mode the source is returned unchanged; no partitioning is performed.
    if (IsLocalDebugSource(source))
    {
        return source;
    }

    // Build a call to this same method, closed over the actual type arguments.
    // The comparer recorded is the one returned by TypeSystem.GetComparer.
    MethodInfo thisMethod =
        ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey));
    Expression[] callArgs = new Expression[]
    {
        source.Expression,
        Expression.Quote(keySelector),
        Expression.Constant(rangeSeparators, typeof(TKey[])),
        Expression.Constant(comparer, typeof(IComparer))
    };
    return source.Provider.CreateQuery(Expression.Call(null, thisMethod, callArgs));
}
/// <summary>
/// Range partition a dataset. The list of range keys is determined dynamically at runtime.
/// </summary>
/// <typeparam name="TSource">The type of the records in the input dataset</typeparam>
/// <typeparam name="TKey">The type of the key on which the partition is based</typeparam>
/// <param name="source">The input dataset to be partitioned</param>
/// <param name="keySelector">The function to extract the key from a record</param>
/// <param name="comparer">An IComparer on TKey to compare keys; not validated here</param>
/// <param name="isDescending">true if the generated keys must be descending; otherwise ascending</param>
/// <param name="partitionCount">The number of partitions in the output dataset; must be positive</param>
/// <returns>An IQueryable partitioned according to a list of keys determined at runtime</returns>
/// <exception cref="ArgumentNullException">source or keySelector is null</exception>
/// <exception cref="ArgumentOutOfRangeException">partitionCount is zero or negative</exception>
public static IQueryable
    RangePartition(this IQueryable source,
                   Expression> keySelector,
                   IComparer comparer,
                   bool isDescending,
                   int partitionCount)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (keySelector == null)
    {
        throw new ArgumentNullException("keySelector");
    }
    if (partitionCount <= 0)
    {
        throw new ArgumentOutOfRangeException("partitionCount");
    }

    // In local-debug mode the source is returned unchanged; no partitioning is performed.
    if (IsLocalDebugSource(source))
    {
        return source;
    }

    // Build a call to this same method, closed over the actual type arguments.
    MethodInfo thisMethod =
        ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey));
    Expression[] callArgs = new Expression[]
    {
        source.Expression,
        Expression.Quote(keySelector),
        Expression.Constant(comparer, typeof(IComparer)),
        Expression.Constant(isDescending, typeof(bool)),
        Expression.Constant(partitionCount, typeof(int))
    };
    return source.Provider.CreateQuery(Expression.Call(null, thisMethod, callArgs));
}
/// <summary>
/// Range partition a dataset using a specified list of keys.
/// </summary>
/// <typeparam name="TSource">The type of the records in the input dataset</typeparam>
/// <typeparam name="TKey">The type of the key on which the partition is based</typeparam>
/// <param name="source">The input dataset to be partitioned</param>
/// <param name="keySelector">The function to extract the key from a record</param>
/// <param name="rangeSeparators">The list of range keys</param>
/// <param name="comparer">An IComparer on TKey to compare keys; may be null only when
/// TypeSystem.HasDefaultComparer accepts TKey</param>
/// <param name="isDescending">true if the keys must be in descending order; otherwise false</param>
/// <returns>An IQueryable partitioned according to a specified list of keys</returns>
/// <exception cref="ArgumentNullException">source, keySelector or rangeSeparators is null</exception>
/// <exception cref="DryadLinqException">comparer is null and TKey has no default comparer</exception>
/// <exception cref="ArgumentException">rangeSeparators is not consistently ordered, or its
/// detected direction contradicts isDescending</exception>
public static IQueryable
    RangePartition(this IQueryable source,
                   Expression> keySelector,
                   TKey[] rangeSeparators,
                   IComparer comparer,
                   bool isDescending)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (keySelector == null)
    {
        throw new ArgumentNullException("keySelector");
    }
    if (rangeSeparators == null)
    {
        throw new ArgumentNullException("rangeSeparators");
    }
    if (comparer == null && !TypeSystem.HasDefaultComparer(typeof(TKey)))
    {
        throw new DryadLinqException(DryadLinqErrorCode.ComparerMustBeSpecifiedOrKeyTypeMustBeIComparable,
                                     string.Format(SR.ComparerMustBeSpecifiedOrKeyTypeMustBeIComparable, typeof(TKey)));
    }

    // NOTE(review): GetComparer presumably substitutes a default comparer when
    // comparer is null -- confirm against TypeSystem.
    comparer = TypeSystem.GetComparer(comparer);

    // Check that the range keys are consistent.
    // Note: detectedDescending == null implies that the direction could not be
    // determined precisely (single element, repeated elements, etc).
    bool? detectedDescending;
    bool keysAreConsistent = DryadLinqUtil.ComputeIsDescending(rangeSeparators, comparer, out detectedDescending);
    if (!keysAreConsistent)
    {
        throw new ArgumentException(SR.PartitionKeysAreNotConsistentlyOrdered, "rangeSeparators");
    }

    // Check that the actual direction of the keys matches what the caller asked for.
    // The parameter name is now reported, matching the check above.
    if (detectedDescending.HasValue && detectedDescending.Value != isDescending)
    {
        throw new ArgumentException(SR.IsDescendingIsInconsistent, "isDescending");
    }

    // In local-debug mode the source is returned unchanged; no partitioning is performed.
    if (IsLocalDebugSource(source))
    {
        return source;
    }

    // Build a call to this same method, closed over the actual type arguments.
    // The explicit null instance argument selects the same static-call form that
    // every other operator in this class uses.
    return source.Provider.CreateQuery(
        Expression.Call(
            null,
            ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)),
            new Expression[] { source.Expression,
                               Expression.Quote(keySelector),
                               Expression.Constant(rangeSeparators, typeof(TKey[])),
                               Expression.Constant(comparer, typeof(IComparer)),
                               Expression.Constant(isDescending, typeof(bool)) }
            ));
}
/// <summary>
/// Compute applyFunc(source)
/// </summary>
/// <typeparam name="T1">The type of the records of the input dataset</typeparam>
/// <typeparam name="T2">The type of the records of the output dataset</typeparam>
/// <param name="source">The input dataset</param>
/// <param name="applyFunc">The function to be applied to the input dataset</param>
/// <returns>The result of computing applyFunc(source)</returns>
/// <exception cref="ArgumentNullException">source or applyFunc is null</exception>
public static IQueryable
    Apply(this IQueryable source,
          Expression, IEnumerable>> applyFunc)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    // Previously unchecked: a null applyFunc surfaced as a NullReferenceException
    // from Compile() in the local-debug path.
    if (applyFunc == null)
    {
        throw new ArgumentNullException("applyFunc");
    }
    if (IsLocalDebugSource(source))
    {
        // Local-debug path: compile the function and run it directly on the source.
        var q = applyFunc.Compile()(source).AsQueryable();
        return new DryadLinqLocalQuery(source.Provider, q);
    }
    // Build a call to this same method, closed over the actual type arguments.
    return source.Provider.CreateQuery(
        Expression.Call(
            null,
            ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(T1), typeof(T2)),
            new Expression[] { source.Expression,
                               Expression.Quote(applyFunc) }
            ));
}
/// <summary>
/// Compute applyFunc(source1, source2)
/// </summary>
/// <typeparam name="T1">The type of the records of the first input dataset</typeparam>
/// <typeparam name="T2">The type of the records of the second input dataset</typeparam>
/// <typeparam name="T3">The type of the records of the output dataset</typeparam>
/// <param name="source1">The first input dataset</param>
/// <param name="source2">The second input dataset</param>
/// <param name="applyFunc">The function to be applied to the input datasets</param>
/// <returns>The result of computing applyFunc(source1, source2)</returns>
/// <exception cref="ArgumentNullException">source1, source2 or applyFunc is null</exception>
public static IQueryable
    Apply(this IQueryable source1,
          IQueryable source2,
          Expression, IEnumerable, IEnumerable>> applyFunc)
{
    if (source1 == null)
    {
        throw new ArgumentNullException("source1");
    }
    if (source2 == null)
    {
        throw new ArgumentNullException("source2");
    }
    // Previously unchecked: a null applyFunc surfaced as a NullReferenceException
    // from Compile() in the local-debug path.
    if (applyFunc == null)
    {
        throw new ArgumentNullException("applyFunc");
    }
    // Only source1 decides between the local-debug path and the provider path.
    if (IsLocalDebugSource(source1))
    {
        var q = applyFunc.Compile()(source1, source2).AsQueryable();
        return new DryadLinqLocalQuery(source1.Provider, q);
    }
    // Build a call to this same method, closed over the actual type arguments.
    return source1.Provider.CreateQuery(
        Expression.Call(
            null,
            ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(T1), typeof(T2), typeof(T3)),
            new Expression[] { source1.Expression,
                               source2.Expression,
                               Expression.Quote(applyFunc) }
            ));
}
/// <summary>
/// Compute applyFunc on multiple sources
/// </summary>
/// <typeparam name="T1">The type of the records of input</typeparam>
/// <typeparam name="T2">The type of the records of output</typeparam>
/// <param name="source">The first input dataset</param>
/// <param name="otherSources">Other input datasets</param>
/// <param name="applyFunc">The function to be applied to the input datasets; it receives
/// a single array holding source followed by otherSources</param>
/// <returns>The result of computing applyFunc on source and otherSources</returns>
/// <exception cref="ArgumentNullException">source, otherSources or applyFunc is null</exception>
public static IQueryable
    Apply(this IQueryable source,
          IQueryable[] otherSources,
          Expression[], IEnumerable>> applyFunc)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    // Previously unchecked: a null array or function surfaced as a
    // NullReferenceException further down.
    if (otherSources == null)
    {
        throw new ArgumentNullException("otherSources");
    }
    if (applyFunc == null)
    {
        throw new ArgumentNullException("applyFunc");
    }
    if (IsLocalDebugSource(source))
    {
        // Local-debug path: applyFunc takes one array, so put source in slot 0
        // and the other sources after it, in order.
        IQueryable[] allSources = new IQueryable[otherSources.Length + 1];
        allSources[0] = source;
        for (int i = 0; i < otherSources.Length; ++i)
        {
            allSources[i + 1] = otherSources[i];
        }
        var q = applyFunc.Compile()(allSources).AsQueryable();
        return new DryadLinqLocalQuery(source.Provider, q);
    }
    // Provider path: the other sources are passed as an array-initialization
    // node built from their expression trees.
    Expression[] others = new Expression[otherSources.Length];
    for (int i = 0; i < otherSources.Length; i++)
    {
        others[i] = otherSources[i].Expression;
    }
    return source.Provider.CreateQuery(
        Expression.Call(
            null,
            ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(T1), typeof(T2)),
            new Expression[] { source.Expression,
                               Expression.NewArrayInit(typeof(IQueryable), others),
                               Expression.Quote(applyFunc) }
            ));
}
/// <summary>
/// Compute applyFunc on multiple sources
/// </summary>
/// <typeparam name="T1">The type of the records of input</typeparam>
/// <typeparam name="T2">The type of the records of output</typeparam>
/// <param name="source">The first input dataset</param>
/// <param name="otherSources">Other input datasets</param>
/// <param name="applyFunc">The function to be applied to the input datasets; it receives
/// source and the otherSources array as two separate arguments</param>
/// <returns>The result of computing applyFunc(source, otherSources)</returns>
/// <exception cref="ArgumentNullException">source, otherSources or applyFunc is null</exception>
public static IQueryable
    Apply(this IQueryable source,
          IQueryable[] otherSources,
          Expression, IEnumerable[], IEnumerable>> applyFunc)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    // Previously unchecked: a null array or function surfaced as a
    // NullReferenceException further down.
    if (otherSources == null)
    {
        throw new ArgumentNullException("otherSources");
    }
    if (applyFunc == null)
    {
        throw new ArgumentNullException("applyFunc");
    }
    if (IsLocalDebugSource(source))
    {
        // Local-debug path: compile the function and run it directly on the inputs.
        var q = applyFunc.Compile()(source, otherSources).AsQueryable();
        return new DryadLinqLocalQuery(source.Provider, q);
    }
    // Provider path: the other sources are passed as an array-initialization
    // node built from their expression trees.
    Expression[] others = new Expression[otherSources.Length];
    for (int i = 0; i < otherSources.Length; i++)
    {
        others[i] = otherSources[i].Expression;
    }
    return source.Provider.CreateQuery(
        Expression.Call(
            null,
            ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(T1), typeof(T2)),
            new Expression[] { source.Expression,
                               Expression.NewArrayInit(typeof(IQueryable), others),
                               Expression.Quote(applyFunc) }
            ));
}
/// <summary>
/// Compute applyFunc(source)
/// </summary>
/// <remarks>
/// In local-debug mode applyFunc is invoked once on the whole source.
/// </remarks>
/// <typeparam name="T1">The type of the records of the input dataset</typeparam>
/// <typeparam name="T2">The type of the records of the output dataset</typeparam>
/// <param name="source">The input dataset</param>
/// <param name="applyFunc">The function to be applied to the input dataset</param>
/// <returns>The result of computing applyFunc(source)</returns>
/// <exception cref="ArgumentNullException">source or applyFunc is null</exception>
public static IQueryable
    ApplyPerPartition(
        this IQueryable source,
        Expression, IEnumerable>> applyFunc)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    // Previously unchecked: a null applyFunc surfaced as a NullReferenceException
    // from Compile() in the local-debug path.
    if (applyFunc == null)
    {
        throw new ArgumentNullException("applyFunc");
    }
    if (IsLocalDebugSource(source))
    {
        var q = applyFunc.Compile()(source).AsQueryable();
        return new DryadLinqLocalQuery(source.Provider, q);
    }
    // Build a call to this same method, closed over the actual type arguments.
    return source.Provider.CreateQuery(
        Expression.Call(
            null,
            ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(T1), typeof(T2)),
            new Expression[] { source.Expression,
                               Expression.Quote(applyFunc) }
            ));
}
/// <summary>
/// Compute applyFunc(source1, source2)
/// </summary>
/// <remarks>
/// In local-debug mode applyFunc is invoked once on the whole inputs and
/// isFirstOnly is not consulted.
/// </remarks>
/// <typeparam name="T1">The type of the records of the first input dataset</typeparam>
/// <typeparam name="T2">The type of the records of the second input dataset</typeparam>
/// <typeparam name="T3">The type of the records of the output dataset</typeparam>
/// <param name="source1">The first input dataset</param>
/// <param name="source2">The second input dataset</param>
/// <param name="applyFunc">The function to be applied to the input datasets</param>
/// <param name="isFirstOnly">True if only distributive over the first dataset</param>
/// <returns>The result of computing applyFunc(source1, source2)</returns>
/// <exception cref="ArgumentNullException">source1, source2 or applyFunc is null</exception>
public static IQueryable
    ApplyPerPartition(
        this IQueryable source1,
        IQueryable source2,
        Expression, IEnumerable, IEnumerable>> applyFunc,
        bool isFirstOnly = false)
{
    if (source1 == null)
    {
        throw new ArgumentNullException("source1");
    }
    if (source2 == null)
    {
        throw new ArgumentNullException("source2");
    }
    // Previously unchecked: a null applyFunc surfaced as a NullReferenceException
    // from Compile() in the local-debug path.
    if (applyFunc == null)
    {
        throw new ArgumentNullException("applyFunc");
    }
    // Only source1 decides between the local-debug path and the provider path.
    if (IsLocalDebugSource(source1))
    {
        var q = applyFunc.Compile()(source1, source2).AsQueryable();
        return new DryadLinqLocalQuery(source1.Provider, q);
    }
    // Build a call to this same method, closed over the actual type arguments.
    // The bool constant is typed explicitly, as the other operators in this class do.
    return source1.Provider.CreateQuery(
        Expression.Call(
            null,
            ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(T1), typeof(T2), typeof(T3)),
            new Expression[] { source1.Expression,
                               source2.Expression,
                               Expression.Quote(applyFunc),
                               Expression.Constant(isFirstOnly, typeof(bool)) }
            ));
}
///
/// Compute applyFunc on multiple sources
///
/// The type of the records of input
/// The type of the records of output
/// The first input dataset
/// Other input datasets
/// The function to be applied to the input datasets
/// True if only distributive over the first dataset
/// The result of computing applyFunc(source, otherSources)
public static IQueryable
ApplyPerPartition(
    this IQueryable source,
    IQueryable[] otherSources,
    Expression[], IEnumerable>> applyFunc,
    bool isFirstOnly = false)
{
    // Validate every reference argument up front so that a bad call fails with an
    // ArgumentNullException naming the offending parameter.
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (otherSources == null)
    {
        // Without this check both paths below fail with a NullReferenceException
        // when reading otherSources.Length.
        throw new ArgumentNullException("otherSources");
    }
    if (applyFunc == null)
    {
        throw new ArgumentNullException("applyFunc");
    }

    if (IsLocalDebugSource(source))
    {
        // The source is not backed by a DryadLinqProvider: the function takes one
        // array holding all inputs, so place source at index 0 followed by
        // otherSources in their original order, then run the compiled function.
        IQueryable[] allSources = new IQueryable[otherSources.Length + 1];
        allSources[0] = source;
        Array.Copy(otherSources, 0, allSources, 1, otherSources.Length);
        var q = applyFunc.Compile()(allSources).AsQueryable();
        return new DryadLinqLocalQuery(source.Provider, q);
    }

    // Collect the expression of each additional source so they can be packed into
    // a single array-initializer node.
    Expression[] others = new Expression[otherSources.Length];
    for (int i = 0; i < otherSources.Length; i++)
    {
        others[i] = otherSources[i].Expression;
    }

    // Record a call to this very method (closed over T1 and T2) in the expression
    // tree; isFirstOnly travels along as a constant argument.
    return source.Provider.CreateQuery(
        Expression.Call(
            null,
            ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(T1), typeof(T2)),
            new Expression[] { source.Expression,
                               Expression.NewArrayInit(typeof(IQueryable), others),
                               Expression.Quote(applyFunc),
                               Expression.Constant(isFirstOnly) }
            ));
}
///
/// Compute applyFunc on multiple sources
///
/// The type of the records of input
/// The type of the records of output
/// The first input dataset
/// Other input datasets
/// The function to be applied to the input datasets
/// True if only distributive over the first dataset
/// The result of computing applyFunc(source, otherSources)
public static IQueryable
ApplyPerPartition(
    this IQueryable source,
    IQueryable[] otherSources,
    Expression, IEnumerable[], IEnumerable>> applyFunc,
    bool isFirstOnly = false)
{
    // Validate every reference argument up front so that a bad call fails with an
    // ArgumentNullException naming the offending parameter.
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (otherSources == null)
    {
        // Without this check the distributed path below fails with a
        // NullReferenceException when reading otherSources.Length.
        throw new ArgumentNullException("otherSources");
    }
    if (applyFunc == null)
    {
        // Without this check the local-debug path below fails with a
        // NullReferenceException at applyFunc.Compile().
        throw new ArgumentNullException("applyFunc");
    }

    if (IsLocalDebugSource(source))
    {
        // The source is not backed by a DryadLinqProvider: compile the function and
        // run it directly on the first source and the array of other sources.
        var q = applyFunc.Compile()(source, otherSources).AsQueryable();
        return new DryadLinqLocalQuery(source.Provider, q);
    }

    // Collect the expression of each additional source so they can be packed into
    // a single array-initializer node.
    Expression[] others = new Expression[otherSources.Length];
    for (int i = 0; i < otherSources.Length; i++)
    {
        others[i] = otherSources[i].Expression;
    }

    // Record a call to this very method (closed over T1 and T2) in the expression
    // tree; isFirstOnly travels along as a constant argument.
    return source.Provider.CreateQuery(
        Expression.Call(
            null,
            ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(T1), typeof(T2)),
            new Expression[] { source.Expression,
                               Expression.NewArrayInit(typeof(IQueryable), others),
                               Expression.Quote(applyFunc),
                               Expression.Constant(isFirstOnly) }
            ));
}
private static IQueryable Dummy(IQueryable source)
{
    // Wraps the source's expression in a call node that refers to this very
    // method (closed over T) and asks the source's provider to build a query
    // from it.
    IQueryProvider provider = source.Provider;
    MethodInfo self = ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(T));
    Expression[] operands = new Expression[] { source.Expression };
    return provider.CreateQuery(Expression.Call(null, self, operands));
}
private static IQueryable DoWhile(IQueryable source,
                                  IQueryable body,
                                  IQueryable cond,
                                  IQueryable bodySource,
                                  IQueryable condSource1,
                                  IQueryable condSource2)
{
    // Builds a single call node referring to this very method (closed over T)
    // whose arguments are the expressions of all six queries, in parameter
    // order, and asks the source's provider to build a query from it.
    IQueryProvider provider = source.Provider;
    MethodInfo self = ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(T));
    Expression[] operands = new Expression[]
    {
        source.Expression,
        body.Expression,
        cond.Expression,
        bodySource.Expression,
        condSource1.Expression,
        condSource2.Expression
    };
    return provider.CreateQuery(Expression.Call(null, self, operands));
}
///
/// Conditional DoWhile loop.
///
/// The type of the input records
/// The input dataset
/// The code body of the DoWhile loop
/// The termination condition of the DoWhile loop
/// The output dataset when the loop terminates
public static IQueryable
DoWhile(this IQueryable source,
        Func, IQueryable> body,
        Func, IQueryable, IQueryable> cond)
{
    // Validate every argument up front so that a bad call fails with an
    // ArgumentNullException naming the offending parameter, instead of failing
    // later when the delegate is first invoked.
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (body == null)
    {
        throw new ArgumentNullException("body");
    }
    if (cond == null)
    {
        throw new ArgumentNullException("cond");
    }

    if (IsLocalDebugSource(source))
    {
        // The source is not backed by a DryadLinqProvider: delegate the whole loop
        // to the enumerable implementation and wrap its result as a local query.
        var q = DryadLinqEnumerable.DoWhile(source, body, cond);
        return new DryadLinqLocalQuery(source.Provider, q.AsQueryable());
    }

    // NOTE(review): context is not used below. The call is kept because GetContext
    // may validate the provider or have other effects not visible here -- confirm
    // before removing.
    DryadLinqContext context = DryadLinqContext.GetContext(source.Provider);

    IQueryable before = source;
    while (true)
    {
        // One iteration: derive the next dataset from the current one, then build
        // the query that decides whether to continue.
        IQueryable after = body(before);
        var more = cond(before, after);

        // Both queries are submitted together and awaited before the condition's
        // single value is read.
        DryadLinqQueryable.SubmitAndWait(after, more);
        if (!more.Single())
        {
            return after;
        }
        before = after;
    }
}
///
/// Apply a function on every sliding window on the input sequence of records.
///
/// The type of the input records
/// The type of the output records
/// The input dataset
/// The function to apply to every sliding window
/// The size of the window
/// An IQueryable formed by the results for each sliding window
public static IQueryable
SlidingWindow(this IQueryable source,
              Expression, T2>> procFunc,
              Int32 windowSize)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    // A window must span at least two records.
    if (windowSize < 2)
    {
        throw new DryadLinqException(SR.WindowSizeMustyBeGTOne);
    }
    // Checked after windowSize so that calls which already failed with the
    // window-size error keep failing the same way. Without this check the
    // local-debug path below fails with a NullReferenceException at
    // procFunc.Compile().
    if (procFunc == null)
    {
        throw new ArgumentNullException("procFunc");
    }

    if (IsLocalDebugSource(source))
    {
        // The source is not backed by a DryadLinqProvider: delegate to the
        // enumerable implementation with the compiled function.
        var q = DryadLinqEnumerable.SlidingWindow(source, procFunc.Compile(), windowSize);
        return new DryadLinqLocalQuery(source.Provider, q.AsQueryable());
    }

    // Otherwise record a call to this very method (closed over T1 and T2) in the
    // expression tree; the window size travels along as an int constant.
    return source.Provider.CreateQuery(
        Expression.Call(
            null,
            ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(T1), typeof(T2)),
            new Expression[] { source.Expression,
                               Expression.Quote(procFunc),
                               Expression.Constant(windowSize, typeof(int))}
            ));
}
///
/// Computes a user-defined function on each partition of the input. The function takes a
/// partition and its partition index as arguments.
///
/// The type of the input records
/// The type of the output records
/// The input dataset
/// The function to apply to each partition
/// An IQueryable formed by concatenating the results of applying the function
/// to each partition
public static IQueryable
ApplyWithPartitionIndex(this IQueryable source,
                        Expression, int, IEnumerable>> procFunc)
{
    // Validate both arguments up front so that a bad call fails with an
    // ArgumentNullException naming the offending parameter.
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (procFunc == null)
    {
        // Without this check the local-debug path below fails with a
        // NullReferenceException at procFunc.Compile().
        throw new ArgumentNullException("procFunc");
    }

    if (IsLocalDebugSource(source))
    {
        // The source is not backed by a DryadLinqProvider: the compiled function
        // is invoked once on the whole source, with 0 passed as the partition index.
        var q = procFunc.Compile()(source, 0);
        return new DryadLinqLocalQuery(source.Provider, q.AsQueryable());
    }

    // Otherwise record a call to this very method (closed over T1 and T2) in the
    // expression tree and hand it to the provider.
    return source.Provider.CreateQuery(
        Expression.Call(
            null,
            ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(T1), typeof(T2)),
            new Expression[] { source.Expression,
                               Expression.Quote(procFunc) }
            ));
}
///
/// Same as the standard LINQ Any operator, but returns the answer as an IQueryable
/// containing a single element.
///
/// The type of the elements of source.
/// The input sequence
/// true iff the input sequence contains at least one element
public static IQueryable AnyAsQuery(this IQueryable source)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (!IsLocalDebugSource(source))
    {
        // Backed by a DryadLinqProvider: record a call to this very method
        // (closed over TSource) in the expression tree and let the provider
        // build the query.
        IQueryProvider provider = source.Provider;
        MethodInfo self = ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource));
        Expression[] operands = new Expression[] { source.Expression };
        return provider.CreateQuery(Expression.Call(null, self, operands));
    }

    // Local debugging: evaluate through the enumerable implementation and wrap
    // the result as a local query.
    var localAnswer = DryadLinqEnumerable.AnyAsQuery(source).AsQueryable();
    return new DryadLinqLocalQuery(source.Provider, localAnswer);
}
///
/// Same as the standard LINQ Any operator with a predicate, but returns the answer
/// as an IQueryable containing a single element.
///
/// The type of the elements of source.
/// The input sequence
/// A predicate to test each element for a condition.
/// true iff the input sequence contains at least one element that satisfies
/// the predicate
public static IQueryable AnyAsQuery(this IQueryable source,
                                    Expression> predicate)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }
    if (predicate == null)
    {
        throw new ArgumentNullException("predicate");
    }

    if (!IsLocalDebugSource(source))
    {
        // Backed by a DryadLinqProvider: record a call to this very method
        // (closed over TSource) in the expression tree, passing the predicate
        // as a quoted lambda, and let the provider build the query.
        IQueryProvider provider = source.Provider;
        MethodInfo self = ((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource));
        Expression[] operands = new Expression[] { source.Expression, Expression.Quote(predicate) };
        return provider.CreateQuery(Expression.Call(null, self, operands));
    }

    // Local debugging: compile the predicate, evaluate through the enumerable
    // implementation, and wrap the result as a local query.
    var compiledPredicate = predicate.Compile();
    var localAnswer = DryadLinqEnumerable.AnyAsQuery(source, compiledPredicate).AsQueryable();
    return new DryadLinqLocalQuery(source.Provider, localAnswer);
}
///
/// Same as the standard LINQ All operator, but returns the answer as an IQueryable
/// containing a single element.
///
/// The type of the elements of source.
/// The input sequence
/// A predicate to test each element for a condition
/// true iff every element in the input sequence satisfies the predicate
public static IQueryable AllAsQuery(this IQueryable