using Microsoft.Research.DryadLinq;
using Microsoft.Research.Peloponnese.Storage;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;

namespace DryadLinqTests
{
    /// <summary>
    /// Tests for GroupBy + reduce queries: built-in aggregates, user functions marked
    /// [Decomposable], result selectors of several shapes, and misuse of the decomposer API.
    /// Every test returns true on success and false on failure; a DryadLinqException raised
    /// while a test body runs is reported as a failure.
    /// </summary>
    /// <remarks>
    /// NOTE(review): generic type arguments in this file were reconstructed from usage
    /// (Seed/Accumulate/FinalReduce signatures, result array types, arithmetic on elements).
    /// The record type passed to FromStore (LineRecord) is an assumption - confirm it against
    /// the DryadLINQ version this project builds with.
    /// </remarks>
    public static class GroupByReduceTests
    {
        #region Shared helpers

        // Blob path, relative to the configured container, of the input file every test opens.
        private const string SimpleFilePath = "unittest/inputdata/SimpleFile.txt";

        /// <summary>Creates a context for Config.cluster with local execution switched off.</summary>
        private static DryadLinqContext CreateContext()
        {
            var context = new DryadLinqContext(Config.cluster);
            context.LocalExecution = false;
            return context;
        }

        /// <summary>
        /// Runs one test body against a fresh context.
        /// The context is created outside the try block, so a failure to construct it propagates
        /// to the caller; a DryadLinqException thrown by the body makes the test fail.
        /// </summary>
        /// <param name="body">Test logic; returns true when all of its checks hold.</param>
        /// <returns>The body's result, or false if the body threw DryadLinqException.</returns>
        private static bool RunTest(Func<DryadLinqContext, bool> body)
        {
            DryadLinqContext context = CreateContext();
            try
            {
                return body(context);
            }
            catch (DryadLinqException)
            {
                return false;
            }
        }

        /// <summary>Opens the shared input file from the configured Azure account/container.</summary>
        private static IQueryable<LineRecord> OpenSimpleFile(DryadLinqContext context)
        {
            // NOTE(review): LineRecord is assumed; the element type is never inspected because
            // the Apply() calls below ignore their input.
            return context.FromStore<LineRecord>(
                AzureUtils.ToAzureUri(Config.accountName, Config.containerName, SimpleFilePath));
        }

        /// <summary>
        /// Builds the integer query produced by DataGenerator.CreateSimpleFileSets().
        /// The generator is defined outside this file; the expectations of the tests that use it
        /// (parity-group sums 36 and 42, six elements per group) correspond to the integers 1..12.
        /// </summary>
        private static IQueryable<int> LoadSimpleFileSets(DryadLinqContext context)
        {
            var simple = OpenSimpleFile(context).Apply(x => DataGenerator.CreateSimpleFileSets());
            return simple.Select(x => x.First());
        }

        /// <summary>
        /// Builds the integer query produced by DataGenerator.CreateGroupByReduceDataSet().
        /// The generator is defined outside this file; the expectations of the tests that use it
        /// (sum of Enumerable.Range(1, 200), First() being 1 or 101) correspond to 1..200.
        /// </summary>
        private static IQueryable<int> LoadGroupByReduceDataSet(DryadLinqContext context)
        {
            var simple = OpenSimpleFile(context).Apply(x => DataGenerator.CreateGroupByReduceDataSet());
            return simple.Select(x => x.First());
        }

        /// <summary>
        /// Compares two sequences ignoring order. The order of query results is not guaranteed,
        /// so both sides are sorted before the element-wise comparison.
        /// </summary>
        private static bool SameElements<T>(IEnumerable<T> actual, IEnumerable<T> expected)
        {
            return actual.OrderBy(x => x).SequenceEqual(expected.OrderBy(x => x));
        }

        /// <summary>Returns true when <paramref name="action"/> throws DryadLinqException.</summary>
        private static bool ThrowsDryadLinqException(Action action)
        {
            try
            {
                action();
                return false; // "exception should be thrown"
            }
            catch (DryadLinqException)
            {
                return true;
            }
        }

        #endregion Shared helpers

        /// <summary>Average() over a plain GroupBy, selected from each group.</summary>
        public static bool Decomposition_Average()
        {
            return RunTest(context =>
            {
                IQueryable<int> pt1 = LoadSimpleFileSets(context);

                double[] aggregates = pt1.GroupBy(x => x % 2).Select(g => g.Average()).ToArray();

                // The check used to be commented out and listed the group sums; these are the
                // group averages for the same odd/even groups the sibling tests expect.
                double[] expected = new[]
                {
                    (1 + 3 + 5 + 7 + 9 + 11) / 6.0,
                    (2 + 4 + 6 + 8 + 10 + 12) / 6.0
                };

                return SameElements(aggregates, expected);
            });
        }

        /// <summary>Sum() used directly as the GroupBy result selector.</summary>
        public static bool DistributiveResultSelector_1()
        {
            return RunTest(context =>
            {
                IQueryable<int> pt1 = LoadSimpleFileSets(context);

                // this result selector satisfies "DistributiveOverConcat"
                int[] aggregates = pt1.GroupBy(x => x % 2, (key, seq) => seq.Sum()).ToArray();
                int[] expected = new[] { 1 + 3 + 5 + 7 + 9 + 11, 2 + 4 + 6 + 8 + 10 + 12 };

                return SameElements(aggregates, expected);
            });
        }

        /// <summary>Sum() applied in a Select that follows a plain GroupBy.</summary>
        public static bool DistributiveSelect_1()
        {
            return RunTest(context =>
            {
                IQueryable<int> pt1 = LoadSimpleFileSets(context);

                // this result selector satisfies "DistributiveOverConcat"
                int[] aggregates = pt1.GroupBy(x => x % 2).Select(group => group.Sum()).ToArray();
                int[] expected = new[] { 1 + 3 + 5 + 7 + 9 + 11, 2 + 4 + 6 + 8 + 10 + 12 };

                return SameElements(aggregates, expected);
            });
        }

        /// <summary>Count() used as the GroupBy result selector.</summary>
        public static bool BuiltInCountIsDistributable()
        {
            return RunTest(context =>
            {
                IQueryable<int> pt1 = LoadSimpleFileSets(context);

                // Built in Count is Distributable as built-in logic knows to use Sum() as the combiner function.
                // Count(a,b,c,d) = Sum(Count(a,b), Count(c,d))
                int[] aggregates = pt1.GroupBy(x => x % 2, (key, seq) => seq.Count()).ToArray();
                int[] expected = new[] { 6, 6 }; // six elements in each full group.

                return SameElements(aggregates, expected);
            });
        }

        /// <summary>Three-argument Aggregate (seed, accumulator, result selector) as the reducer.</summary>
        public static bool Bug12078_GroupByReduceWithResultSelectingAggregate()
        {
            return RunTest(context =>
            {
                IQueryable<int> data = LoadGroupByReduceDataSet(context);

                double[] aggregates = data
                    .Select(x => (double)x)
                    .GroupBy(x => 0, (key, seq) => seq.Aggregate((double)0, (acc, item) => acc + item, val => val / 100))
                    .ToArray();
                double[] expected = new[] { Enumerable.Range(1, 200).Sum() / 100.0 };

                return SameElements(aggregates, expected);
            });
        }

        #region GroupByReduceWithCustomDecomposableFunction_DistributableCombiner

        /// <summary>Sum of the sequence divided by 100; decomposed by <see cref="Decomposer_1"/>.</summary>
        [Decomposable(typeof(Decomposer_1))]
        public static double DecomposableFunc(IEnumerable<double> seq)
        {
            // hard to test with context system..
            // NOTE(review): the line-collapsed source left it ambiguous whether the assertion below
            // was live. It is kept disabled to match DecomposableFunc2..4, and because it reads the
            // static HpcLinq.LocalDebug while the rest of this file uses context.LocalDebug.
            //TestUtils.Assert(HpcLinq.LocalDebug, "This method should only be called during LocalDebug");
            return seq.Aggregate((double)0, (acc, item) => acc + item, val => val / 100);
        }

        /// <summary>Decomposition of <see cref="DecomposableFunc"/>: running sum, divided by 100 at the end.</summary>
        public class Decomposer_1 : IDecomposable<double, double, double>
        {
            public void Initialize(object state) { }
            public double Seed(double source) { return source; }
            public double Accumulate(double a, double x) { return a + x; }
            public double RecursiveAccumulate(double a, double x) { return a + x; }
            public double FinalReduce(double a) { return a / 100; }
        }

        /// <summary>GroupBy whose reducer is the custom decomposable <see cref="DecomposableFunc"/>.</summary>
        public static bool GroupByReduceWithCustomDecomposableFunction_DistributableCombiner()
        {
            return RunTest(context =>
            {
                IQueryable<int> data = LoadGroupByReduceDataSet(context);

                double[] aggregates = data
                    .Select(x => (double)x)
                    .GroupBy(x => 0, (k, g) => DecomposableFunc(g))
                    .ToArray();
                double[] expected = new[] { Enumerable.Range(1, 200).Sum() / 100.0 };

                return SameElements(aggregates, expected);
            });
        }

        #endregion GroupByReduceWithCustomDecomposableFunction_DistributableCombiner

        #region GroupByReduceWithCustomDecomposableFunction_DistributableCombiner_DifferingTypes

        // Tests a fully decomposed function whose reducer changes types.

        /// <summary>"hello:" followed by the sum of the sequence; decomposed by <see cref="Decomposer_2"/>.</summary>
        [Decomposable(typeof(Decomposer_2))]
        public static string DecomposableFunc2(IEnumerable<double> seq)
        {
            //TestUtils.Assert(HpcLinq.LocalDebug, "This method should only be called during LocalDebug");
            return seq.Aggregate((double)0, (acc, item) => acc + item, val => ("hello:" + val.ToString()));
        }

        /// <summary>Decomposition of <see cref="DecomposableFunc2"/>: double accumulator, string result.</summary>
        public class Decomposer_2 : IDecomposable<double, double, string>
        {
            public void Initialize(object state) { }
            public double Seed(double source) { return source; }
            public double Accumulate(double a, double x) { return a + x; }
            public double RecursiveAccumulate(double a, double x) { return a + x; }
            public string FinalReduce(double a) { return ("hello:" + a.ToString()); }
        }

        /// <summary>GroupBy whose reducer is <see cref="DecomposableFunc2"/> (result type differs from the input type).</summary>
        public static bool GroupByReduceWithCustomDecomposableFunction_DistributableCombiner_DifferingTypes()
        {
            return RunTest(context =>
            {
                IQueryable<int> data = LoadGroupByReduceDataSet(context);

                string[] aggregates = data
                    .Select(x => (double)x)
                    .GroupBy(x => 0, (key, seq) => DecomposableFunc2(seq))
                    .ToArray();
                string[] expected = new[] { "hello:" + Enumerable.Range(1, 200).Sum() };

                return SameElements(aggregates, expected);
            });
        }

        #endregion GroupByReduceWithCustomDecomposableFunction_DistributableCombiner_DifferingTypes

        #region GroupByReduceWithCustomDecomposableFunction_DistributableCombiner_NoFinalizer

        // Tests a decomposed function with no need for a particular reduce.
        // The combiner changes type, and the recursive-combiner operates on the altered type.
        // The reducer just calls combiner again.

        /// <summary>Sum of the sequence carried as a string; decomposed by <see cref="Decomposer_3"/>.</summary>
        [Decomposable(typeof(Decomposer_3))]
        public static string DecomposableFunc3(IEnumerable<double> seq)
        {
            // TestUtils.Assert(HpcLinq.LocalDebug, "This method should only be called during LocalDebug");
            return seq.Aggregate("0", (acc, item) => (double.Parse(acc) + item).ToString());
        }

        /// <summary>Decomposition of <see cref="DecomposableFunc3"/>: string accumulator, identity final reduce.</summary>
        public class Decomposer_3 : IDecomposable<double, string, string>
        {
            public void Initialize(object state) { }
            public string Seed(double source) { return source.ToString(); }
            public string Accumulate(string a, double x) { return (double.Parse(a) + x).ToString(); }
            public string RecursiveAccumulate(string a, string x) { return (double.Parse(a) + double.Parse(x)).ToString(); }
            public string FinalReduce(string a) { return a; }
        }

        /// <summary>GroupBy whose reducer is <see cref="DecomposableFunc3"/>.</summary>
        public static bool GroupByReduceWithCustomDecomposableFunction_DistributableCombiner_NoFinalizer()
        {
            return RunTest(context =>
            {
                IQueryable<int> data = LoadGroupByReduceDataSet(context);

                string[] aggregates = data
                    .Select(x => (double)x)
                    .GroupBy(x => 0, (key, seq) => DecomposableFunc3(seq))
                    .ToArray();
                string[] expected = new[] { Enumerable.Range(1, 200).Sum().ToString() };

                return SameElements(aggregates, expected);
            });
        }

        #endregion GroupByReduceWithCustomDecomposableFunction_DistributableCombiner_NoFinalizer

        #region GroupByReduceWithCustomDecomposableFunction_NonDistributableCombiner

        // Tests simplified pattern where the Combiner is not recursively applied.
        // Note: Func4 can be represented as a decomposable with distributive-combiner and a finalizer.. but here we choose not to.
        // Because of the form of the Combiner, it is critical that it not be used recursively.

        /// <summary>Sum of the sequence divided by 100; decomposed by <see cref="Decomposer_4"/>.</summary>
        [Decomposable(typeof(Decomposer_4))]
        public static double DecomposableFunc4(IEnumerable<double> seq)
        {
            // TestUtils.Assert(HpcLinq.LocalDebug, "This method should only be called during LocalDebug");
            return seq.Aggregate(0.0, (acc, item) => acc + item, acc => acc / 100);
        }

        /// <summary>Decomposition of <see cref="DecomposableFunc4"/>.</summary>
        public class Decomposer_4 : IDecomposable<double, double, double>
        {
            public void Initialize(object state) { }
            public double Seed(double source) { return source; }
            public double Accumulate(double a, double x) { return a + x; }
            public double RecursiveAccumulate(double a, double x) { return a + x; }
            public double FinalReduce(double a) { return a / 100; }
        }

        /// <summary>GroupBy whose reducer is <see cref="DecomposableFunc4"/>.</summary>
        public static bool GroupByReduceWithCustomDecomposableFunction_NonDistributableCombiner()
        {
            return RunTest(context =>
            {
                IQueryable<int> data = LoadGroupByReduceDataSet(context);

                double[] aggregates = data
                    .Select(x => (double)x)
                    .GroupBy(x => 0, (key, seq) => DecomposableFunc4(seq))
                    .ToArray();
                double[] expected = new[] { Enumerable.Range(1, 200).Sum() / 100.0 };

                return SameElements(aggregates, expected);
            });
        }

        #endregion GroupByReduceWithCustomDecomposableFunction_NonDistributableCombiner

        /// <summary>First() used as the GroupBy result selector.</summary>
        public static bool GroupByReduce_BuiltIn_First()
        {
            return RunTest(context =>
            {
                IQueryable<int> data = LoadGroupByReduceDataSet(context);

                int[] aggregates = data
                    .GroupBy(x => 0, (key, seq) => seq.First())
                    .ToArray();

                // the output of First can be the first item of either partition.
                return aggregates.SequenceEqual(new[] { 1 }) || aggregates.SequenceEqual(new[] { 101 });
            });
        }

        /// <summary>Result selector that nests two aggregate calls inside constructor expressions.</summary>
        public static bool GroupByReduce_ResultSelector_ComplexNewExpression()
        {
            return RunTest(context =>
            {
                IQueryable<int> data = LoadGroupByReduceDataSet(context);

                var aggregates = data
                    .GroupBy(x => 0, (key, seq) => new KeyValuePair<int, KeyValuePair<double, double>>(
                        key,
                        new KeyValuePair<double, double>(seq.Average(), seq.Average())))
                    .ToArray();

                var expected = new KeyValuePair<int, KeyValuePair<double, double>>[]
                {
                    new KeyValuePair<int, KeyValuePair<double, double>>(0, new KeyValuePair<double, double>(100.5, 100.5))
                };

                return aggregates.SequenceEqual(expected);
            });
        }

        #region GroupByReduce_ProgrammingManualExample

        /// <summary>
        /// Skeleton of the programming-manual example. The ingress step and the GroupBy checks are
        /// commented out, so the test only deletes the file set, opens it and enumerates it; it
        /// passes whenever no DryadLinqException is thrown.
        /// </summary>
        public static bool GroupByReduce_ProgrammingManualExample()
        {
            return RunTest(context =>
            {
                string filesetName = "DevUnitTest/0to999integers";
                Utils.DeleteFile(Config.accountName, Config.storageKey, Config.containerName, filesetName, true);

                IEnumerable<IEnumerable<int>> rawdata = new[]
                {
                    Enumerable.Range(0, 334),
                    Enumerable.Range(334, 333),
                    Enumerable.Range(667, 333)
                };
                // ??? DscIngressHelpers.AsDryadQueryPartitions(context, rawdata, filesetName, DscCompressionScheme.None);

                var data = context.FromStore<int>(filesetName);

                // The values are unused, but each line enumerates the stored data.
                var count = data.AsEnumerable().Count();
                var sum = data.AsEnumerable().Sum();
                var min = data.AsEnumerable().Min();
                var max = data.AsEnumerable().Max();
                var uniques = data.AsEnumerable().Distinct().Count();
                //Console.WriteLine("DATA:: count:{0} uniques:{1} sum:{2}, min:{3}, max:{4}", count, uniques, sum, min, max);

                // ???
                //var results = data
                //    .GroupBy(x => x % 10, (key, seq) => new KeyValuePair<int, double>(key, seq.MyAverage()))
                //    .OrderBy(y => y.Key)
                //    .ToArray();
                ////foreach (var result in results)
                ////    Console.WriteLine("For group {0} the average is {1}", result.Key, result.Value);
                //passed &= (results.Count() == 10);
                //passed &= (results[0].Key == 0);     // "first element should be key=0");
                //passed &= (results[0].Value == 495); // "first element should be value=495 ie avg(0,10,20,..,990)");

                return true;
            });
        }

        /// <summary>Arithmetic mean of an int sequence; decomposed by <see cref="Decomposer_5"/>.</summary>
        /// <exception cref="Exception">The sequence is empty.</exception>
        [Decomposable(typeof(Decomposer_5))]
        public static double MyAverage(this IEnumerable<int> recordSequence)
        {
            int count = 0, sum = 0;
            foreach (var r in recordSequence)
            {
                sum += r;
                count++;
            }
            if (count == 0)
            {
                throw new Exception("Can't average empty sequence");
            }
            return (double)sum / (double)count;
        }

        /// <summary>Partial aggregate for <see cref="MyAverage"/>: running sum and element count.</summary>
        [Serializable]
        public struct Partial
        {
            public int PartialSum;
            public int PartialCount;
        }

        /// <summary>Decomposition of <see cref="MyAverage"/> over <see cref="Partial"/> accumulators.</summary>
        public class Decomposer_5 : IDecomposable<int, Partial, double>
        {
            public void Initialize(object state) { }

            public Partial Seed(int x)
            {
                Partial p = new Partial();
                p.PartialSum = x;
                p.PartialCount = 1;
                return p;
            }

            public Partial Accumulate(Partial a, int x)
            {
                Partial p = new Partial();
                p.PartialSum = a.PartialSum + x;
                p.PartialCount = a.PartialCount + 1;
                return p;
            }

            public Partial RecursiveAccumulate(Partial a, Partial x)
            {
                Partial p = new Partial();
                p.PartialSum = a.PartialSum + x.PartialSum;
                p.PartialCount = a.PartialCount + x.PartialCount;
                return p;
            }

            public double FinalReduce(Partial a)
            {
                if (a.PartialCount == 0)
                {
                    throw new Exception("Can't average empty sequence");
                }
                return (double)a.PartialSum / (double)a.PartialCount;
            }
        }

        #endregion GroupByReduce_ProgrammingManualExample

        #region GroupByReduce_SameDecomposableUsedTwice

        /// <summary>Result selector that calls the same decomposable function twice plus a built-in Average().</summary>
        public static bool GroupByReduce_SameDecomposableUsedTwice()
        {
            return RunTest(context =>
            {
                // The expectations below (six elements per group, averages 7 and 6) are those of
                // the simple file sets, so that generator is used here.
                IQueryable<int> pt1 = LoadSimpleFileSets(context);

                var results = pt1
                    .GroupBy(x => x % 2, (k, g) => MyFunc(k, DecomposableFunc5(g), DecomposableFunc5(g), g.Average()))
                    .ToArray();

                //key0: count = 6, av = av(2,4,6,8,10,12) = 7
                //key1: count = 6, av = av(1,3,5,7,9,11) = 6

                //local sort.. so that keys are in order.
                var results_sorted = results.OrderBy(x => x.Key).ToArray();

                // Fail instead of indexing past the end when the group count is wrong.
                if (results_sorted.Length != 2)
                {
                    return false; // "wrong results"
                }

                bool passed = true;
                passed &= (results_sorted[0].Key == 0);  // "wrong results"
                passed &= (results_sorted[0].A == 6);    // "wrong results"
                passed &= (results_sorted[0].B == 6);    // "wrong results"
                passed &= (results_sorted[0].Av == 7.0); // "wrong results"

                passed &= (results_sorted[1].Key == 1);  // "wrong results"
                passed &= (results_sorted[1].A == 6);    // "wrong results"
                passed &= (results_sorted[1].B == 6);    // "wrong results"
                passed &= (results_sorted[1].Av == 6.0); // "wrong results"
                return passed;
            });
        }

        /// <summary>Packs the four values into a <see cref="MyStruct3"/>.</summary>
        public static MyStruct3 MyFunc(int key, int a, int b, double av)
        {
            return new MyStruct3(key, a, b, av);
        }

        /// <summary>Element count of the group; decomposed by <see cref="Decomposer_6"/>.</summary>
        [Decomposable(typeof(Decomposer_6))]
        private static int DecomposableFunc5(IEnumerable<int> g)
        {
            return g.Count();
        }

        /// <summary>Decomposition of a count: each element contributes 1, partial counts are added.</summary>
        public class Decomposer_6 : IDecomposable<int, int, int>
        {
            public void Initialize(object state) { }
            public int Seed(int source) { return 1; }
            public int Accumulate(int a, int x) { return a + 1; }
            public int RecursiveAccumulate(int a, int x) { return a + x; }
            public int FinalReduce(int a) { return a; }
        }

        /// <summary>Result record for <see cref="GroupByReduce_SameDecomposableUsedTwice"/>.</summary>
        [Serializable]
        public struct MyStruct3
        {
            public int Key;
            public int A;
            public int B;
            public double Av;

            public MyStruct3(int key, int a, int b, double av)
            {
                Key = key;
                A = a;
                B = b;
                Av = av;
            }
        }

        #endregion GroupByReduce_SameDecomposableUsedTwice

        #region API_Misuse

        /// <summary>
        /// Each malformed decomposer below must make the query throw DryadLinqException.
        /// The specific error-code checks are still disabled (see the "???" comments).
        /// </summary>
        internal static bool GroupByReduce_APIMisuse()
        {
            return RunTest(context =>
            {
                if (context.LocalDebug)
                {
                    // "decomposition logic doesn't run in LocalDebug.. skipping";
                    return true;
                }

                IQueryable<int> pt1 = LoadGroupByReduceDataSet(context);
                bool passed = true;

                // internal-visibility decomposable type should fail.
                passed &= ThrowsDryadLinqException(() => pt1.GroupBy(x => x, (k, g) => BadDecomposable1(g)).ToArray());
                //??? passed &= (Ex.ErrorCode == DryadLinqErrorCode.DecomposerTypeMustBePublic); // "error code is wrong"

                // decomposable type doesn't implement IDecomposable or IDecomposableRecursive
                passed &= ThrowsDryadLinqException(() => pt1.GroupBy(x => x, (k, g) => BadDecomposable2(g)).ToArray());
                //??? passed &= (Ex.ErrorCode == DryadLinqErrorCode.DecomposerTypeDoesNotImplementInterface);

                // decomposable type implements more than one IDecomposable or IDecomposableRecursive
                passed &= ThrowsDryadLinqException(() => pt1.GroupBy(x => x, (k, g) => BadDecomposable3(g)).ToArray());
                //??? passed &= (Ex.ErrorCode == DryadLinqErrorCode.DecomposerTypeImplementsTooManyInterfaces);

                // decomposable type doesn't have public default ctor
                passed &= ThrowsDryadLinqException(() => pt1.GroupBy(x => x, (k, g) => BadDecomposable4(g)).ToArray());
                //??? passed &= (Ex.ErrorCode == DryadLinqErrorCode.DecomposerTypeDoesNotHavePublicDefaultCtor);

                // decomposable type input type doesn't match
                passed &= ThrowsDryadLinqException(() => pt1.GroupBy(x => x, (k, g) => BadDecomposable5(g)).ToArray());
                //??? passed &= (Ex.ErrorCode == DryadLinqErrorCode.DecomposerTypesDoNotMatch);

                // decomposable type output type doesn't match
                passed &= ThrowsDryadLinqException(() => pt1.GroupBy(x => x, (k, g) => BadDecomposable6(g)).ToArray());
                //??? passed &= (Ex.ErrorCode == DryadLinqErrorCode.DecomposerTypesDoNotMatch);

                return passed;
            });
        }

        [Decomposable(typeof(BadDecomposerType1))]
        private static int BadDecomposable1(IEnumerable<int> g)
        {
            throw new NotImplementedException();
        }

        // Misuse case: the decomposer type is not public.
        internal class BadDecomposerType1 : IDecomposable<int, int, int>
        {
            public void Initialize(object state) { }
            public int Seed(int x) { return x; }
            public int Accumulate(int a, int x) { throw new NotImplementedException(); }
            public int RecursiveAccumulate(int a, int x) { throw new NotImplementedException(); }
            public int FinalReduce(int a) { throw new NotImplementedException(); }
        }

        [Decomposable(typeof(BadDecomposerType2))]
        private static int BadDecomposable2(IEnumerable<int> g)
        {
            throw new NotImplementedException();
        }

        // Misuse case: the decomposer type implements no decomposer interface.
        public class BadDecomposerType2
        {
        }

        [Decomposable(typeof(BadDecomposerType3))]
        private static int BadDecomposable3(IEnumerable<int> g)
        {
            throw new NotImplementedException();
        }

        // NOTE(review): the test expects this type to implement more than one decomposer
        // interface, but only a single interface and a single member set are visible in the
        // source - confirm the intended interface list.
        public class BadDecomposerType3 : IDecomposable<int, int, int>
        {
            public void Initialize(object state) { }
            public int Seed(int x) { return x; }
            public int Accumulate(int a, int x) { throw new NotImplementedException(); }
            public int RecursiveAccumulate(int a, int x) { throw new NotImplementedException(); }
            public int FinalReduce(int a) { throw new NotImplementedException(); }
        }

        [Decomposable(typeof(BadDecomposerType4))]
        private static int BadDecomposable4(IEnumerable<int> g)
        {
            throw new NotImplementedException();
        }

        // Misuse case: the parameterless constructor is internal rather than public.
        public class BadDecomposerType4 : IDecomposable<int, int, int>
        {
            internal BadDecomposerType4() { }
            public BadDecomposerType4(int x) { }
            public void Initialize(object state) { }
            public int Seed(int x) { return x; }
            public int Accumulate(int a, int x) { throw new NotImplementedException(); }
            public int RecursiveAccumulate(int a, int x) { throw new NotImplementedException(); }
            public int FinalReduce(int a) { throw new NotImplementedException(); }
        }

        [Decomposable(typeof(BadDecomposerType5))]
        private static int BadDecomposable5(IEnumerable<int> g)
        {
            throw new NotImplementedException();
        }

        // Misuse case: the decomposer consumes double while the decorated function takes int.
        public class BadDecomposerType5 : IDecomposable<double, int, int>
        {
            public void Initialize(object state) { }
            public int Seed(double s) { throw new NotImplementedException(); }
            public int Accumulate(int a, double x) { throw new NotImplementedException(); }
            public int RecursiveAccumulate(int a, int x) { throw new NotImplementedException(); }
            public int FinalReduce(int a) { throw new NotImplementedException(); }
        }

        [Decomposable(typeof(BadDecomposerType6))]
        private static int BadDecomposable6(IEnumerable<int> g)
        {
            throw new NotImplementedException();
        }

        // Misuse case: the decomposer produces double while the decorated function returns int.
        public class BadDecomposerType6 : IDecomposable<int, int, double>
        {
            public void Initialize(object state) { }
            public int Seed(int s) { throw new NotImplementedException(); }
            public int Accumulate(int a, int x) { throw new NotImplementedException(); }
            public int RecursiveAccumulate(int a, int x) { throw new NotImplementedException(); }
            public double FinalReduce(int a) { throw new NotImplementedException(); }
        }

        #endregion API_Misuse

        /// <summary>Result selector written as a List collection initializer over two aggregates.</summary>
        public static bool GroupByReduce_ListInitializerReducer()
        {
            return RunTest(context =>
            {
                // Expectations (count 6, sums 42 / 36) are those of the simple file sets.
                IQueryable<int> pt1 = LoadSimpleFileSets(context);

                var results = pt1
                    .GroupBy(x => x % 2, (k, g) => new List<int>() { k, g.Count(), g.Sum() })
                    .ToArray();

                //local sort.. so that keys are in order.
                var resultsSorted = results.OrderBy(list => list[0]).ToArray();

                // Fail instead of indexing past the end when the group count is wrong.
                if (resultsSorted.Length != 2)
                {
                    return false;
                }

                //key0: count = 6, sum = 42
                //key1: count = 6, sum = 36
                bool passed = true;
                passed &= (resultsSorted[0][0] == 0);  // "incorrect results.1"
                passed &= (resultsSorted[0][1] == 6);  // "incorrect results.2"
                passed &= (resultsSorted[0][2] == 42); // "incorrect results.3"

                passed &= (resultsSorted[1][0] == 1);  // "incorrect results.4"
                passed &= (resultsSorted[1][1] == 6);  // "incorrect results.5"
                passed &= (resultsSorted[1][2] == 36); // "incorrect results.6"
                return passed;
            });
        }

        /// <summary>Result selector written as a collection initializer with a multi-parameter Add().</summary>
        public static bool GroupByReduce_CustomListInitializerReducer()
        {
            return RunTest(context =>
            {
                // Expectations (count 6, sums 42 / 36) are those of the simple file sets.
                IQueryable<int> pt1 = LoadSimpleFileSets(context);

                var results = pt1
                    .GroupBy(x => x % 2, (k, g) => new MultiParamInitializerClass()
                    {
                        { k, g.Count(), g.Sum() }, // one item, comprising three components
                    })
                    .ToArray();

                //local sort.. so that keys are in order.
                var resultsSorted = results.OrderBy(list => list.Key).ToArray();

                // Fail instead of indexing past the end when the group count is wrong.
                if (resultsSorted.Length != 2)
                {
                    return false;
                }

                // Read the Count and Sum FIELDS. Written as Count() / Sum() the calls bind to the
                // LINQ extension methods (an int field is not invocable), which enumerate
                // { Key, Count, Sum } and therefore return 3 and Key + Count + Sum.
                //key0: count = 6, sum = 42
                //key1: count = 6, sum = 36
                bool passed = true;
                passed &= (resultsSorted[0].Key == 0);   // "incorrect results.1"
                passed &= (resultsSorted[0].Count == 6); // "incorrect results.2"
                passed &= (resultsSorted[0].Sum == 42);  // "incorrect results.3"

                passed &= (resultsSorted[1].Key == 1);   // "incorrect results.4"
                passed &= (resultsSorted[1].Count == 6); // "incorrect results.5"
                passed &= (resultsSorted[1].Sum == 36);  // "incorrect results.6"
                return passed;
            });
        }

        // note: must be IEnumerable<> to be allowed to participate in list-initializer syntax.
        // we are cheating here and only supporting one "add" call, just as an example.
        [Serializable]
        public class MultiParamInitializerClass : IEnumerable<int>
        {
            public int Key;
            public int Sum;
            public int Count;

            /// <summary>Stores the three components; a later call overwrites the earlier one.</summary>
            public void Add(int key, int count, int sum)
            {
                Key = key;
                Count = count;
                Sum = sum;
            }

            /// <summary>Yields Key, Count and Sum, in that order.</summary>
            public IEnumerator<int> GetEnumerator()
            {
                yield return Key;
                yield return Count;
                yield return Sum;
            }

            System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
            {
                return GetEnumerator();
            }
        }

        /// <summary>Result selector that applies the bitwise-complement operator to an aggregate.</summary>
        public static bool GroupByReduce_BitwiseNegationOperator()
        {
            return RunTest(context =>
            {
                // Expectations (sums 42 / 36) are those of the simple file sets.
                IQueryable<int> pt1 = LoadSimpleFileSets(context);

                var results = pt1
                    .GroupBy(x => x % 2, (k, g) => new KeyValuePair<int, int>(k, ~g.Sum()))
                    .ToArray();

                //local sort.. so that keys are in order.
                var resultsSorted = results.OrderBy(list => list.Key).ToArray();

                // Fail instead of indexing past the end when the group count is wrong.
                if (resultsSorted.Length != 2)
                {
                    return false;
                }

                //key0: count = 6, sum = 42
                //key1: count = 6, sum = 36
                bool passed = true;
                passed &= (resultsSorted[0].Key == 0);     // "incorrect results.1"
                passed &= (resultsSorted[0].Value == ~42); // "incorrect results.2"

                passed &= (resultsSorted[1].Key == 1);     // "incorrect results.3"
                passed &= (resultsSorted[1].Value == ~36); // "incorrect results.4"
                return passed;
            });
        }

        /// <summary>Copy/paste starting point for new tests; builds the input query and checks nothing.</summary>
        public static bool template()
        {
            return RunTest(context =>
            {
                IQueryable<int> data = LoadGroupByReduceDataSet(context);

                //return SameElements(aggregates, expected);
                return true;
            });
        }
    }
}