/*
Copyright (c) Microsoft Corporation
All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
compliance with the License. You may obtain a copy of the License
at http://www.apache.org/licenses/LICENSE-2.0
THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER
EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR CONDITIONS OF
TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR NON-INFRINGEMENT.
See the Apache Version 2.0 License for specific language governing permissions and
limitations under the License.
*/
using System;
using System.Collections;
using System.Collections.Generic;
using System.Text;
using System.IO;
using System.Reflection;
using System.Linq;
using System.Linq.Expressions;
using System.Data.Linq;
using System.Xml;
using System.Data.Linq.Mapping;
using System.Diagnostics;
using System.Threading;
using System.Data;
using System.Data.SqlClient;
using System.Collections.ObjectModel;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using Microsoft.Research.DryadLinq;
#pragma warning disable 1591
namespace Microsoft.Research.DryadLinq.Internal
{
/// <summary>
/// This class provides the generic vertex runtime for the query operators
/// supported by DryadLINQ. The auto-generated vertex code uses the methods
/// in this class extensively.
/// </summary>
/// <remarks>A DryadLINQ user should not need to use DryadLinqVertex directly.</remarks>
public static class DryadLinqVertex
{
/// <summary>
/// Selects the execution strategy of the operators in this class that consult it:
/// when true they build multi-threaded (parallel pipeline / parallel sort / parallel
/// group-by) implementations, when false they fall back to sequential LINQ-to-Objects
/// style code. Defaults to true.
/// </summary>
public static bool s_multiThreading = true; //vertex code will set this at runtime.
/// <summary>
/// Appends <paramref name="func"/> as another stage of a parallel pipeline over
/// <paramref name="source"/>.
/// </summary>
/// <param name="source">Input sequence; may already be a parallel pipeline.</param>
/// <param name="func">Sequence-to-sequence transformation to add to the pipeline.</param>
/// <param name="orderPreserving">Forwarded unchanged to Extend / ParallelApply.</param>
/// <returns>
/// The existing pipeline extended with <paramref name="func"/>, or a new ParallelApply
/// wrapping <paramref name="source"/> when it is not a pipeline yet.
/// </returns>
/// <remarks>
/// NOTE(review): generic type arguments are missing from this listing; the signature
/// text is intentionally left untouched.
/// </remarks>
internal static IParallelPipeline
ExtendParallelPipeline(this IEnumerable source,
Func, IEnumerable> func,
bool orderPreserving)
{
    IParallelPipeline existing = source as IParallelPipeline;
    if (existing != null)
    {
        // Grow the existing pipeline instead of wrapping it in another ParallelApply.
        return existing.Extend(func, orderPreserving);
    }
    return new ParallelApply(source, func, orderPreserving);
}
// Operator: Where
/// <summary>
/// Filters <paramref name="source"/> with <paramref name="predicate"/>.
/// </summary>
/// <remarks>
/// In multi-threaded mode the filter becomes a stage of a parallel pipeline honoring
/// <paramref name="orderPreserving"/>; otherwise System.Linq.Enumerable.Where is used
/// and <paramref name="orderPreserving"/> is ignored.
/// </remarks>
public static IEnumerable Where(IEnumerable source,
Func predicate,
bool orderPreserving)
{
    if (!s_multiThreading)
    {
        return System.Linq.Enumerable.Where(source, predicate);
    }
    return source.ExtendParallelPipeline(s => s.Where(predicate), orderPreserving);
}
/// <summary>
/// Sequential-only Where overload: always delegates to System.Linq.Enumerable.Where,
/// regardless of s_multiThreading.
/// </summary>
/// <remarks>
/// NOTE(review): the generic type arguments are missing from this listing; this is
/// presumably the overload whose predicate also receives the element index — confirm.
/// <paramref name="orderPreserving"/> is not used.
/// </remarks>
public static IEnumerable Where(IEnumerable source,
Func predicate,
bool orderPreserving)
{
return System.Linq.Enumerable.Where(source, predicate);
}
/// <summary>
/// Where variant delegating to DryadLinqEnumerable.LongWhere; always sequential.
/// </summary>
/// <remarks>
/// Presumably the predicate receives a 64-bit element index (per the "Long" naming) —
/// confirm against DryadLinqEnumerable. <paramref name="orderPreserving"/> is not used.
/// </remarks>
public static IEnumerable LongWhere(IEnumerable source,
Func predicate,
bool orderPreserving)
{
return DryadLinqEnumerable.LongWhere(source, predicate);
}
// Operator: Select
/// <summary>
/// Projects every element of <paramref name="source"/> through <paramref name="selector"/>.
/// </summary>
/// <remarks>
/// Multi-threaded mode appends the projection to a parallel pipeline; sequential mode
/// uses System.Linq.Enumerable.Select and ignores <paramref name="orderPreserving"/>.
/// </remarks>
public static IEnumerable
Select(IEnumerable source,
Func selector,
bool orderPreserving)
{
    if (!s_multiThreading)
    {
        return System.Linq.Enumerable.Select(source, selector);
    }
    return source.ExtendParallelPipeline(s => s.Select(selector), orderPreserving);
}
/// <summary>
/// Sequential-only Select overload: always delegates to System.Linq.Enumerable.Select.
/// </summary>
/// <remarks>
/// NOTE(review): generic arguments are missing from this listing; presumably the
/// overload whose selector also receives the element index — confirm.
/// <paramref name="orderPreserving"/> is not used.
/// </remarks>
public static IEnumerable
Select(IEnumerable source,
Func selector,
bool orderPreserving)
{
return System.Linq.Enumerable.Select(source, selector);
}
/// <summary>
/// Select variant delegating to DryadLinqEnumerable.LongSelect; always sequential.
/// </summary>
/// <remarks>
/// Presumably the selector receives a 64-bit index — confirm against
/// DryadLinqEnumerable. <paramref name="orderPreserving"/> is not used.
/// </remarks>
public static IEnumerable
LongSelect(IEnumerable source,
Func selector,
bool orderPreserving)
{
return DryadLinqEnumerable.LongSelect(source, selector);
}
// Operator: SelectMany
/// <summary>
/// Maps each element of <paramref name="source"/> to a sequence and flattens the results.
/// </summary>
/// <remarks>
/// Parallel pipeline stage in multi-threaded mode; otherwise
/// System.Linq.Enumerable.SelectMany, with <paramref name="orderPreserving"/> ignored.
/// </remarks>
public static IEnumerable
SelectMany(IEnumerable source,
Func> selector,
bool orderPreserving)
{
    if (!s_multiThreading)
    {
        return System.Linq.Enumerable.SelectMany(source, selector);
    }
    return source.ExtendParallelPipeline(s => s.SelectMany(selector), orderPreserving);
}
/// <summary>
/// Sequential-only SelectMany overload: always delegates to
/// System.Linq.Enumerable.SelectMany.
/// </summary>
/// <remarks>
/// NOTE(review): generic arguments are missing from this listing; presumably the
/// index-aware overload — confirm. <paramref name="orderPreserving"/> is not used.
/// </remarks>
public static IEnumerable
SelectMany(IEnumerable source,
Func> selector,
bool orderPreserving)
{
return System.Linq.Enumerable.SelectMany(source, selector);
}
/// <summary>
/// Flattens <paramref name="collectionSelector"/>'s per-element sequences and combines
/// each inner item with its source element via <paramref name="resultSelector"/>.
/// </summary>
/// <remarks>
/// Parallel pipeline stage in multi-threaded mode; otherwise
/// System.Linq.Enumerable.SelectMany, with <paramref name="orderPreserving"/> ignored.
/// </remarks>
public static IEnumerable
SelectMany(IEnumerable source,
Func> collectionSelector,
Func resultSelector,
bool orderPreserving)
{
    if (!s_multiThreading)
    {
        return System.Linq.Enumerable.SelectMany(source, collectionSelector, resultSelector);
    }
    return source.ExtendParallelPipeline(
        s => s.SelectMany(collectionSelector, resultSelector), orderPreserving);
}
/// <summary>
/// Sequential-only SelectMany overload with a result selector: always delegates to
/// System.Linq.Enumerable.SelectMany.
/// </summary>
/// <remarks>
/// NOTE(review): generic arguments are missing from this listing; presumably the
/// index-aware overload — confirm. <paramref name="orderPreserving"/> is not used.
/// </remarks>
public static IEnumerable
SelectMany(IEnumerable source,
Func> collectionSelector,
Func resultSelector,
bool orderPreserving)
{
return System.Linq.Enumerable.SelectMany(source, collectionSelector, resultSelector);
}
/// <summary>
/// SelectMany variant delegating to DryadLinqEnumerable.LongSelectMany; always sequential.
/// </summary>
/// <remarks>
/// Presumably the selector receives a 64-bit index — confirm.
/// <paramref name="orderPreserving"/> is not used.
/// </remarks>
public static IEnumerable
LongSelectMany(IEnumerable source,
Func> collectionSelector,
bool orderPreserving)
{
return DryadLinqEnumerable.LongSelectMany(source, collectionSelector);
}
/// <summary>
/// SelectMany variant with a result selector, delegating to
/// DryadLinqEnumerable.LongSelectMany; always sequential.
/// </summary>
/// <remarks>
/// Presumably the collection selector receives a 64-bit index — confirm.
/// <paramref name="orderPreserving"/> is not used.
/// </remarks>
public static IEnumerable
LongSelectMany(IEnumerable source,
Func> collectionSelector,
Func resultSelector,
bool orderPreserving)
{
return DryadLinqEnumerable.LongSelectMany(source, collectionSelector, resultSelector);
}
// Operator: Zip
/// <summary>
/// Lazily pairs the elements of <paramref name="s1"/> and <paramref name="s2"/> by
/// position, stopping as soon as either sequence runs out.
/// </summary>
/// <param name="s1">First input; supplies each pair's Key.</param>
/// <param name="s2">Second input; supplies each pair's Value.</param>
/// <returns>One Pair per position present in both inputs.</returns>
/// <remarks>
/// Both enumerators are disposed when the iteration completes, when the consumer
/// abandons it early, or when an enumerator throws. (The generic signature is written
/// out here because `using` requires the disposable IEnumerator&lt;T&gt; type.)
/// </remarks>
private static IEnumerable<Pair<T1, T2>> ZipToPairs<T1, T2>(IEnumerable<T1> s1,
                                                            IEnumerable<T2> s2)
{
    using (IEnumerator<T1> elems1 = s1.GetEnumerator())
    using (IEnumerator<T2> elems2 = s2.GetEnumerator())
    {
        // Short-circuit: s2 is only advanced when s1 still has an element.
        while (elems1.MoveNext() && elems2.MoveNext())
        {
            yield return new Pair<T1, T2>(elems1.Current, elems2.Current);
        }
    }
}
/// <summary>
/// Combines <paramref name="s1"/> and <paramref name="s2"/> position by position using
/// <paramref name="zipper"/>; the result is as long as the shorter input.
/// </summary>
/// <param name="orderPreserving">Forwarded to the Select operator.</param>
public static IEnumerable Zip(IEnumerable s1,
IEnumerable s2,
Func zipper,
bool orderPreserving)
{
    // Pair up the inputs positionally, then project each pair through Select, which may
    // run as a parallel pipeline depending on s_multiThreading.
    return Select(ZipToPairs(s1, s2), x => zipper(x.Key, x.Value), orderPreserving);
}
// Operator: Take
/// <summary>
/// Returns the first <paramref name="count"/> elements of <paramref name="source"/>
/// via the LINQ Take extension method; always sequential.
/// </summary>
public static IEnumerable
Take(IEnumerable source, int count)
{
return source.Take(count);
}
// Operator: TakeWhile
/// <summary>
/// Final stage of TakeWhile: consumes a sequence of (elements, continue) pairs, yielding
/// every element of each pair's Key and stopping right after the first pair whose
/// Value is false.
/// </summary>
/// <remarks>
/// Presumably each pair is the output of an earlier per-partition TakeWhile, where Value
/// records whether that partition was taken completely — confirm against the caller.
/// </remarks>
public static IEnumerable
TakeWhile(IEnumerable, bool>> source)
{
foreach (Pair, bool> group in source)
{
foreach (TSource elem in group.Key)
{
yield return elem;
}
// A false flag means the predicate failed inside this group: nothing after it is taken.
if (!group.Value) yield break;
}
}
// Operator: Skip
/// <summary>
/// Skips the first <paramref name="count"/> elements of <paramref name="source"/> via
/// the LINQ Skip extension method; always sequential.
/// </summary>
public static IEnumerable
Skip(IEnumerable source, int count)
{
return source.Skip(count);
}
// Operator: SkipWhile
/// <summary>
/// Skips leading elements while <paramref name="predicate"/> holds, via the LINQ
/// SkipWhile extension method; always sequential.
/// </summary>
public static IEnumerable
SkipWhile(IEnumerable source, Func predicate)
{
return source.SkipWhile(predicate);
}
/// <summary>
/// Second SkipWhile overload, also delegating to the LINQ SkipWhile extension method.
/// </summary>
/// <remarks>
/// NOTE(review): generic arguments are missing from this listing; presumably the
/// overload whose predicate also receives the element index — confirm.
/// </remarks>
public static IEnumerable
SkipWhile(IEnumerable source, Func predicate)
{
return source.SkipWhile(predicate);
}
/// <summary>
/// Skips leading elements while predicate(element, index) is true, where index is a
/// 64-bit position counter; then yields the first element that failed the predicate and
/// every element after it.
/// </summary>
/// <remarks>
/// The index increment is checked, so overflowing the long counter throws
/// OverflowException. The predicate is not evaluated once yielding starts.
/// </remarks>
public static IEnumerable
LongSkipWhile(IEnumerable source,
Func predicate)
{
long index = -1;
bool yielding = false;
using (IEnumerator sourceEnum = source.GetEnumerator())
{
// Phase 1: advance until the predicate fails (or the source is exhausted).
while (sourceEnum.MoveNext())
{
checked { index++; }
if (!predicate(sourceEnum.Current, index))
{
yielding = true;
break;
}
}
// Phase 2: Current is the first failing element; emit it and the remainder.
if (yielding)
{
do
{
yield return sourceEnum.Current;
}
while (sourceEnum.MoveNext());
}
}
}
// Operator: OrderBy
/// <summary>
/// Sorts <paramref name="source"/> by the key produced by <paramref name="keySelector"/>.
/// </summary>
/// <param name="comparer">Key comparer.</param>
/// <param name="isDescending">True for descending order.</param>
/// <param name="isIdKeySelector">Forwarded to ParallelSort only.</param>
/// <param name="factory">Forwarded to ParallelSort only.</param>
/// <remarks>
/// Multi-threaded mode returns a ParallelSort; sequential mode uses
/// Enumerable.OrderBy / OrderByDescending.
/// </remarks>
public static IEnumerable
Sort(IEnumerable source,
Func keySelector,
IComparer comparer,
bool isDescending,
bool isIdKeySelector,
DryadLinqFactory factory)
{
    if (s_multiThreading)
    {
        return new ParallelSort(
            source, keySelector, comparer, isDescending, isIdKeySelector, factory);
    }
    return isDescending
        ? Enumerable.OrderByDescending(source, keySelector, comparer)
        : Enumerable.OrderBy(source, keySelector, comparer);
}
/// <summary>
/// Merges the (individually sorted) inputs of a multi-input source into one ordered
/// sequence.
/// </summary>
/// <param name="source">Must be an IMultiEnumerable.</param>
/// <exception cref="DryadLinqException">
/// <paramref name="source"/> is not an IMultiEnumerable.
/// </exception>
/// <remarks>
/// A single-input source is returned unchanged. Otherwise a ParallelMergeSort is used in
/// multi-threaded mode and SequentialMergeSort in sequential mode.
/// </remarks>
public static IEnumerable
MergeSort(this IEnumerable source,
Func keySelector,
IComparer comparer,
bool isDescending)
{
    IMultiEnumerable inputs = source as IMultiEnumerable;
    if (inputs == null)
    {
        throw new DryadLinqException(DryadLinqErrorCode.SourceOfMergesortMustBeMultiEnumerable,
                                     SR.SourceOfMergesortMustBeMultiEnumerable);
    }
    // Nothing to merge with only one input.
    if (inputs.NumberOfInputs == 1)
    {
        return source;
    }
    if (!s_multiThreading)
    {
        return SequentialMergeSort(inputs, keySelector, comparer, isDescending);
    }
    return new ParallelMergeSort(inputs, keySelector, comparer, isDescending);
}
/// <summary>
/// Single-threaded k-way merge of the inputs of <paramref name="source"/>.
/// </summary>
/// <param name="source">
/// Multi-input source; each input source[i] is assumed to already be sorted by
/// <paramref name="keySelector"/> in the requested direction.
/// </param>
/// <param name="keySelector">Extracts the sort key from an element.</param>
/// <param name="comparer">
/// Key comparer, normalized through TypeSystem.GetComparer (presumably substituting a
/// default comparer for null — confirm).
/// </param>
/// <param name="isDescending">True to merge in descending key order.</param>
/// <returns>The merged sequence, produced lazily.</returns>
/// <remarks>
/// Each output element costs one scan over the current head of every live input.
/// Every opened enumerator is disposed, even if the consumer stops early or a selector,
/// comparer or input throws.
/// </remarks>
private static IEnumerable<TSource>
SequentialMergeSort<TSource, TKey>(IMultiEnumerable<TSource> source,
                                   Func<TSource, TKey> keySelector,
                                   IComparer<TKey> comparer,
                                   bool isDescending)
{
    comparer = TypeSystem.GetComparer(comparer);

    // Invariant: readers[0 .. liveCount-1] are open enumerators owned by this method.
    // Slots outside that range were never opened or have already been disposed.
    IEnumerator<TSource>[] readers = new IEnumerator<TSource>[source.NumberOfInputs];
    int liveCount = 0;
    try
    {
        for (int i = 0; i < readers.Length; i++)
        {
            readers[i] = source[i].GetEnumerator();
            // Counted only after GetEnumerator succeeds, so finally never sees a null slot.
            liveCount = i + 1;
        }
        DryadLinqLog.AddInfo("Sequential MergeSort started reading at {0}",
                             DateTime.Now.ToString("MM/dd/yyyy HH:mm:ss.fff"));

        // Prime each reader with its first element; empty inputs are dropped by moving
        // the last live reader into their slot.
        TSource[] elems = new TSource[readers.Length];
        TKey[] keys = new TKey[readers.Length];
        int primed = 0;
        while (primed < liveCount)
        {
            if (readers[primed].MoveNext())
            {
                elems[primed] = readers[primed].Current;
                keys[primed] = keySelector(elems[primed]);
                primed++;
            }
            else
            {
                IEnumerator<TSource> exhausted = readers[primed];
                liveCount--;
                readers[primed] = readers[liveCount];
                exhausted.Dispose();
            }
        }

        // Merge: repeatedly emit the smallest (largest when descending) head element.
        while (liveCount > 0)
        {
            int best = 0;
            for (int i = 1; i < liveCount; i++)
            {
                // Test the sign instead of negating it: IComparer may legally return
                // int.MinValue, and -int.MinValue overflows back to int.MinValue.
                int cmp = comparer.Compare(keys[best], keys[i]);
                if (isDescending ? cmp < 0 : cmp > 0)
                {
                    best = i;
                }
            }
            yield return elems[best];

            if (readers[best].MoveNext())
            {
                elems[best] = readers[best].Current;
                keys[best] = keySelector(elems[best]);
            }
            else
            {
                // Fill the vacated slot with the last live reader, then dispose.
                IEnumerator<TSource> exhausted = readers[best];
                liveCount--;
                readers[best] = readers[liveCount];
                elems[best] = elems[liveCount];
                keys[best] = keys[liveCount];
                exhausted.Dispose();
            }
        }
        DryadLinqLog.AddInfo("Sequential MergeSort ended reading at {0}",
                             DateTime.Now.ToString("MM/dd/yyyy HH:mm:ss.fff"));
    }
    finally
    {
        // Runs on normal completion, on exceptions, and when the consumer disposes the
        // iterator early; only still-open readers are disposed.
        for (int i = 0; i < liveCount; i++)
        {
            readers[i].Dispose();
        }
    }
}
// Operator: ThenBy
/// <summary>
/// ThenBy is not supported by the vertex runtime.
/// </summary>
/// <exception cref="DryadLinqException">Always thrown (ThenByNotSupported).</exception>
public static IEnumerable
ThenBy(IEnumerable source,
Func keySelector,
IComparer comparer,
bool isDescending)
{
throw new DryadLinqException(DryadLinqErrorCode.ThenByNotSupported, SR.ThenByNotSupported);
}
// Operator: GroupBy
/// <summary>
/// Accumulating hash GroupBy with no element selector: forwards to the full accumulating
/// overload using the identity function as the element selector.
/// </summary>
public static IEnumerable>
GroupBy(
IEnumerable source,
Func keySelector,
Func seed,
Func accumulator,
IEqualityComparer comparer,
bool isPartial)
{
return GroupBy(source, keySelector, x => x, seed, accumulator, comparer, isPartial);
}
/// <summary>
/// Hash GroupBy that folds each key's elements into one accumulated value
/// (<paramref name="seed"/> / <paramref name="accumulator"/>).
/// </summary>
/// <param name="isPartial">
/// True for a partial (per-partition) aggregation; only consulted in multi-threaded mode.
/// </param>
/// <remarks>
/// <para>Sequential mode: SequentialHashGroupBy.</para>
/// <para>Multi-threaded full aggregation: ParallelHashGroupByFullAccumulate.</para>
/// <para>
/// Multi-threaded partial aggregation: extends an IParallelApply source via
/// ExtendGroupBy, otherwise ParallelHashGroupByPartialAccumulate.
/// </para>
/// </remarks>
public static IEnumerable>
GroupBy(
IEnumerable source,
Func keySelector,
Func elementSelector,
Func seed,
Func accumulator,
IEqualityComparer comparer,
bool isPartial)
{
    if (!s_multiThreading)
    {
        return SequentialHashGroupBy(source, keySelector, elementSelector, seed, accumulator, comparer);
    }
    if (!isPartial)
    {
        return new ParallelHashGroupByFullAccumulate>(
            source, keySelector, elementSelector, seed, accumulator, comparer, null);
    }
    IParallelApply upstream = source as IParallelApply;
    if (upstream != null)
    {
        // Attach the partial group-by to the existing parallel apply.
        return upstream.ExtendGroupBy(keySelector, elementSelector, seed, accumulator, comparer);
    }
    return new ParallelHashGroupByPartialAccumulate(
        source, null, keySelector, elementSelector, seed, accumulator, comparer);
}
/// <summary>
/// Single-threaded accumulating hash GroupBy: adds every (key, element) of
/// <paramref name="source"/> to an AccumulateDictionary configured with
/// <paramref name="seed"/> and <paramref name="accumulator"/>, and returns that dictionary.
/// </summary>
/// <remarks>
/// Eager: not an iterator, so the whole source is consumed before this method returns.
/// 16411 is passed to AccumulateDictionary — presumably an initial capacity hint; confirm.
/// How a null <paramref name="comparer"/> is handled depends on AccumulateDictionary
/// (not visible here).
/// </remarks>
private static IEnumerable>
SequentialHashGroupBy(
IEnumerable source,
Func keySelector,
Func elementSelector,
Func seed,
Func accumulator,
IEqualityComparer comparer)
{
DryadLinqLog.AddInfo("Sequential HashGroupBy (Acc) started reading at {0}",
DateTime.Now.ToString("MM/dd/yyyy HH:mm:ss.fff"));
AccumulateDictionary
groups = new AccumulateDictionary(comparer, 16411, seed, accumulator);
foreach (TSource item in source)
{
groups.Add(keySelector(item), elementSelector(item));
}
DryadLinqLog.AddInfo("Sequential HashGroupBy (Acc) ended reading at {0}",
DateTime.Now.ToString("MM/dd/yyyy HH:mm:ss.fff"));
return groups;
}
/// <summary>
/// Eagerly builds a hash-based grouping of <paramref name="source"/> keyed by
/// <paramref name="keySelector"/>; each group holds the original elements.
/// </summary>
/// <param name="comparer">Key equality comparer; null selects EqualityComparer.Default.</param>
/// <returns>The populated GroupingHashSet.</returns>
public static IEnumerable>
GroupBy(IEnumerable source,
Func keySelector,
IEqualityComparer comparer)
{
    comparer = comparer ?? EqualityComparer.Default;
    GroupingHashSet table = new GroupingHashSet(comparer, 16411);
    foreach (TSource element in source)
    {
        table.AddItem(keySelector(element), element);
    }
    return table;
}
/// <summary>
/// Hash GroupBy followed by <paramref name="resultSelector"/>(key, group) for each group.
/// </summary>
/// <remarks>
/// Groups are built eagerly by the key-only GroupBy overload. The projection runs in a
/// non-order-preserving ParallelApply in multi-threaded mode, or via Apply otherwise.
/// </remarks>
public static IEnumerable
GroupBy(IEnumerable source,
Func keySelector,
Func, TResult> resultSelector,
IEqualityComparer comparer)
{
    var groupings = GroupBy(source, keySelector, comparer);
    if (!s_multiThreading)
    {
        return Apply(groupings, s => s.Select(g => resultSelector(g.Key, g)));
    }
    return new ParallelApply, TResult>(
        groupings, s => s.Select(g => resultSelector(g.Key, g)), false);
}
/// <summary>
/// Eagerly builds a hash-based grouping of <paramref name="source"/>; each group holds
/// <paramref name="elementSelector"/>(element) for the elements sharing a key.
/// </summary>
/// <param name="comparer">Key equality comparer; null selects EqualityComparer.Default.</param>
/// <returns>The populated GroupingHashSet.</returns>
public static IEnumerable>
GroupBy(IEnumerable source,
Func keySelector,
Func elementSelector,
IEqualityComparer comparer)
{
    comparer = comparer ?? EqualityComparer.Default;
    GroupingHashSet table = new GroupingHashSet(comparer, 16411);
    foreach (TSource element in source)
    {
        table.AddItem(keySelector(element), elementSelector(element));
    }
    return table;
}
/// <summary>
/// Hash GroupBy with an element selector, followed by
/// <paramref name="resultSelector"/>(key, group) for each group.
/// </summary>
/// <remarks>
/// The projection runs in a non-order-preserving ParallelApply in multi-threaded mode,
/// or via Apply otherwise.
/// </remarks>
public static IEnumerable
GroupBy(IEnumerable source,
Func keySelector,
Func elementSelector,
Func, TResult> resultSelector,
IEqualityComparer comparer)
{
    var groupings = GroupBy(source, keySelector, elementSelector, comparer);
    if (!s_multiThreading)
    {
        return Apply(groupings, s => s.Select(g => resultSelector(g.Key, g)));
    }
    return new ParallelApply, TResult>(
        groupings, s => s.Select(g => resultSelector(g.Key, g)), false);
}
// Operator: OrderedGroupBy
/// <summary>
/// Accumulating ordered GroupBy with no element selector: forwards to the full
/// accumulating overload using the identity function as the element selector.
/// </summary>
public static IEnumerable>
OrderedGroupBy(IEnumerable source,
Func keySelector,
Func seed,
Func accumulator,
IEqualityComparer comparer,
bool isPartial)
{
return OrderedGroupBy(source, keySelector, x => x, seed, accumulator, comparer, isPartial);
}
/// <summary>
/// Ordered (run-based) GroupBy that folds each run of equal keys into one accumulated
/// value.
/// </summary>
/// <param name="isPartial">Accepted for signature parity; not consulted by this method.</param>
/// <remarks>
/// Multi-threaded mode uses ParallelOrderedGroupByAccumulate; sequential mode uses
/// SequentialOrderedGroupBy.
/// </remarks>
public static IEnumerable>
OrderedGroupBy(IEnumerable source,
Func keySelector,
Func elementSelector,
Func seed,
Func accumulator,
IEqualityComparer comparer,
bool isPartial)
{
    if (!s_multiThreading)
    {
        return SequentialOrderedGroupBy(source, keySelector, elementSelector, seed, accumulator, comparer);
    }
    return new ParallelOrderedGroupByAccumulate>(
        source, keySelector, elementSelector, seed, accumulator, comparer, null);
}
/// <summary>
/// Single-threaded ordered GroupBy with accumulation: folds each maximal run of
/// consecutive elements with equal keys into one (key, value) pair.
/// </summary>
/// <param name="source">
/// Input; presumably already ordered/clustered by key — equal keys that are not adjacent
/// produce separate pairs.
/// </param>
/// <param name="keySelector">Extracts the grouping key; called once per element.</param>
/// <param name="elementSelector">Projects an element before it is seeded/accumulated.</param>
/// <param name="seed">Creates the initial value from a run's first element.</param>
/// <param name="accumulator">Folds each further element of the run into the value.</param>
/// <param name="comparer">
/// Key equality comparer; null selects EqualityComparer&lt;TKey&gt;.Default, matching the
/// other OrderedGroupBy/GroupBy overloads.
/// </param>
/// <returns>One pair per run, streamed lazily in input order.</returns>
private static IEnumerable<Pair<TKey, TResult>>
SequentialOrderedGroupBy<TSource, TKey, TElement, TResult>(
    IEnumerable<TSource> source,
    Func<TSource, TKey> keySelector,
    Func<TSource, TElement> elementSelector,
    Func<TElement, TResult> seed,
    Func<TResult, TElement, TResult> accumulator,
    IEqualityComparer<TKey> comparer)
{
    if (comparer == null)
    {
        comparer = EqualityComparer<TKey>.Default;
    }
    DryadLinqLog.AddInfo("Sequential OrderedGroupBy (Acc) started reading at {0}",
                         DateTime.Now.ToString("MM/dd/yyyy HH:mm:ss.fff"));
    using (IEnumerator<TSource> elems = source.GetEnumerator())
    {
        if (elems.MoveNext())
        {
            TSource item = elems.Current;
            TKey curKey = keySelector(item);
            TResult curValue = seed(elementSelector(item));
            while (elems.MoveNext())
            {
                item = elems.Current;
                // Evaluate the key once and reuse it if this element starts a new run.
                TKey key = keySelector(item);
                if (comparer.Equals(curKey, key))
                {
                    curValue = accumulator(curValue, elementSelector(item));
                }
                else
                {
                    yield return new Pair<TKey, TResult>(curKey, curValue);
                    curKey = key;
                    curValue = seed(elementSelector(item));
                }
            }
            // Flush the final run.
            yield return new Pair<TKey, TResult>(curKey, curValue);
        }
    }
    DryadLinqLog.AddInfo("Sequential OrderedGroupBy (Acc) ended reading at {0}",
                         DateTime.Now.ToString("MM/dd/yyyy HH:mm:ss.fff"));
}
/// <summary>
/// Streaming ordered GroupBy: yields one Grouping per maximal run of consecutive
/// elements whose keys are equal under <paramref name="comparer"/>.
/// </summary>
/// <param name="comparer">Key equality comparer; null selects EqualityComparer.Default.</param>
/// <remarks>
/// Presumably the input is already ordered/clustered by key; non-adjacent equal keys
/// produce separate groupings. keySelector is evaluated twice for an element that starts
/// a new run.
/// </remarks>
public static IEnumerable>
OrderedGroupBy(IEnumerable source,
Func keySelector,
IEqualityComparer comparer)
{
if (comparer == null)
{
comparer = EqualityComparer.Default;
}
DryadLinqLog.AddInfo("Sequential OrderedGroupBy started reading at {0}",
DateTime.Now.ToString("MM/dd/yyyy HH:mm:ss.fff"));
using (IEnumerator elems = source.GetEnumerator())
{
Grouping curGroup;
if (elems.MoveNext())
{
curGroup = new Grouping(keySelector(elems.Current));
curGroup.AddItem(elems.Current);
while (elems.MoveNext())
{
// Key change: emit the finished run and start a new grouping.
if (!comparer.Equals(curGroup.Key, keySelector(elems.Current)))
{
yield return curGroup;
curGroup = new Grouping(keySelector(elems.Current));
}
curGroup.AddItem(elems.Current);
}
// Flush the final run.
yield return curGroup;
}
}
DryadLinqLog.AddInfo("Sequential OrderedGroupBy ended reading at {0}",
DateTime.Now.ToString("MM/dd/yyyy HH:mm:ss.fff"));
}
/// <summary>
/// Streaming ordered GroupBy with an element selector: yields one Grouping of
/// <paramref name="elementSelector"/>(element) values per maximal run of consecutive
/// elements with equal keys.
/// </summary>
/// <param name="comparer">Key equality comparer; null selects EqualityComparer.Default.</param>
/// <remarks>
/// Presumably the input is already ordered/clustered by key. keySelector is evaluated
/// twice for an element that starts a new run.
/// </remarks>
public static IEnumerable>
OrderedGroupBy(IEnumerable source,
Func keySelector,
Func elementSelector,
IEqualityComparer comparer)
{
if (comparer == null)
{
comparer = EqualityComparer.Default;
}
DryadLinqLog.AddInfo("Sequential OrderedGroupBy started reading at {0}",
DateTime.Now.ToString("MM/dd/yyyy HH:mm:ss.fff"));
using (IEnumerator elems = source.GetEnumerator())
{
Grouping curGroup;
if (elems.MoveNext())
{
curGroup = new Grouping(keySelector(elems.Current));
curGroup.AddItem(elementSelector(elems.Current));
while (elems.MoveNext())
{
// Key change: emit the finished run and start a new grouping.
if (!comparer.Equals(curGroup.Key, keySelector(elems.Current)))
{
yield return curGroup;
curGroup = new Grouping(keySelector(elems.Current));
}
curGroup.AddItem(elementSelector(elems.Current));
}
// Flush the final run.
yield return curGroup;
}
}
DryadLinqLog.AddInfo("Sequential OrderedGroupBy ended reading at {0}",
DateTime.Now.ToString("MM/dd/yyyy HH:mm:ss.fff"));
}
/// <summary>
/// Ordered GroupBy followed by <paramref name="resultSelector"/>(key, group) per run.
/// </summary>
/// <remarks>
/// Unlike the hash GroupBy, the multi-threaded projection uses an order-preserving
/// ParallelApply, so results keep the run order; sequential mode uses Apply.
/// </remarks>
public static IEnumerable
OrderedGroupBy(IEnumerable source,
Func keySelector,
Func, TResult> resultSelector,
IEqualityComparer comparer)
{
    var groupings = OrderedGroupBy(source, keySelector, comparer);
    if (!s_multiThreading)
    {
        return Apply(groupings, s => s.Select(g => resultSelector(g.Key, g)));
    }
    return new ParallelApply, TResult>(
        groupings, s => s.Select(g => resultSelector(g.Key, g)), true);
}
/// <summary>
/// Ordered GroupBy with an element selector, followed by
/// <paramref name="resultSelector"/>(key, group) per run.
/// </summary>
/// <remarks>
/// The multi-threaded projection uses an order-preserving ParallelApply; sequential
/// mode uses Apply.
/// </remarks>
public static IEnumerable
OrderedGroupBy(
IEnumerable source,
Func keySelector,
Func elementSelector,
Func, TResult> resultSelector,
IEqualityComparer comparer)
{
    var groupings = OrderedGroupBy(source, keySelector, elementSelector, comparer);
    if (!s_multiThreading)
    {
        return Apply(groupings, s => s.Select(g => resultSelector(g.Key, g)));
    }
    return new ParallelApply, TResult>(
        groupings, s => s.Select(g => resultSelector(g.Key, g)), true);
}
// Operator: Join
internal static IEnumerable
SequentialHashJoin(
IEnumerable outer,
IEnumerable inner,
Func outerKeySelector,
Func innerKeySelector,
Func