diff --git a/ClusterInterface/ClusterInterface.csproj b/ClusterInterface/ClusterInterface.csproj
index 64a32ea..e2f16ff 100644
--- a/ClusterInterface/ClusterInterface.csproj
+++ b/ClusterInterface/ClusterInterface.csproj
@@ -1,6 +1,6 @@
-
+
-
+
Debug
@@ -51,15 +51,36 @@
False
- ..\packages\Microsoft.Research.Peloponnese.0.7.1-beta\lib\net45\Microsoft.Research.Peloponnese.HadoopBridge.dll
+ ..\packages\Microsoft.Research.Peloponnese.0.7.2-beta\lib\net45\Microsoft.Research.Peloponnese.HadoopBridge.dll
-
+
False
- ..\packages\Microsoft.Research.Peloponnese.0.7.1-beta\lib\net45\Microsoft.Research.Peloponnese.Utils.dll
+ ..\packages\Microsoft.Research.Peloponnese.0.7.2-beta\lib\net45\Microsoft.Research.Peloponnese.Utils.dll
+
+
+ False
+ ..\packages\Microsoft.Bcl.Async.1.0.166\lib\net40\Microsoft.Threading.Tasks.dll
+
+
+ ..\packages\Microsoft.Bcl.Async.1.0.166\lib\net40\Microsoft.Threading.Tasks.Extensions.dll
+
+
+ ..\packages\Microsoft.Bcl.Async.1.0.166\lib\net40\Microsoft.Threading.Tasks.Extensions.Desktop.dll
+
+
+ False
+ ..\packages\Microsoft.WindowsAzure.Common.1.0.1\lib\net45\Microsoft.WindowsAzure.Common.dll
+
+
+ False
+ ..\packages\Microsoft.WindowsAzure.Common.1.0.1\lib\net45\Microsoft.WindowsAzure.Common.NetFramework.dll
..\packages\Microsoft.WindowsAzure.ConfigurationManager.2.0.3\lib\net40\Microsoft.WindowsAzure.Configuration.dll
+
+ ..\packages\Microsoft.WindowsAzure.Management.1.0.0\lib\net40\Microsoft.WindowsAzure.Management.dll
+
False
..\packages\Microsoft.WindowsAzure.Management.HDInsight.1.1.0.7\lib\net40\Microsoft.WindowsAzure.Management.HDInsight.dll
@@ -72,17 +93,29 @@
False
..\packages\Microsoft.Hadoop.Client.1.1.0.7\lib\net40\Microsoft.WindowsAzure.Management.HDInsight.Framework.Core.dll
+
+ False
+ ..\packages\Microsoft.WindowsAzure.Management.Storage.1.0.0\lib\net40\Microsoft.WindowsAzure.Management.Storage.dll
+
False
..\packages\WindowsAzure.Storage.3.1.0.1\lib\net40\Microsoft.WindowsAzure.Storage.dll
-
- False
+
..\packages\Newtonsoft.Json.6.0.2\lib\net45\Newtonsoft.Json.dll
+
+
+
+ ..\packages\Microsoft.Net.Http.2.2.19\lib\net45\System.Net.Http.Extensions.dll
+
+
+ ..\packages\Microsoft.Net.Http.2.2.19\lib\net45\System.Net.Http.Primitives.dll
+
+
False
..\packages\System.Spatial.5.6.1\lib\net40\System.Spatial.dll
@@ -105,14 +138,19 @@
+
+
+
+
+
This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.
-
-
+
+
-
+
-
+
\ No newline at end of file
diff --git a/ClusterInterface/Properties/AssemblyInfo.cs b/ClusterInterface/Properties/AssemblyInfo.cs
index eab0852..ace9a43 100644
--- a/ClusterInterface/Properties/AssemblyInfo.cs
+++ b/ClusterInterface/Properties/AssemblyInfo.cs
@@ -51,5 +51,5 @@ using System.Runtime.InteropServices;
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
-[assembly: AssemblyVersion("0.1.0.0")]
-[assembly: AssemblyFileVersion("0.1.0.0")]
+[assembly: AssemblyVersion("0.1.2.0")]
+[assembly: AssemblyFileVersion("0.1.2.0")]
diff --git a/ClusterInterface/packages.config b/ClusterInterface/packages.config
index 6cc3081..bd29d67 100644
--- a/ClusterInterface/packages.config
+++ b/ClusterInterface/packages.config
@@ -1,13 +1,21 @@
-
+
+
+
+
-
+
+
+
+
+
+
-
+
\ No newline at end of file
diff --git a/DryadLinqGraphManager/DryadLinqGraphManager.csproj b/DryadLinqGraphManager/DryadLinqGraphManager.csproj
index f7e3948..2619ed3 100644
--- a/DryadLinqGraphManager/DryadLinqGraphManager.csproj
+++ b/DryadLinqGraphManager/DryadLinqGraphManager.csproj
@@ -1,6 +1,6 @@
-
+
-
+
Debug
AnyCPU
@@ -71,15 +71,39 @@
False
- ..\packages\Microsoft.Research.Peloponnese.0.7.1-beta\lib\net45\Microsoft.Research.Peloponnese.HadoopBridge.dll
+ ..\packages\Microsoft.Research.Peloponnese.0.7.2-beta\lib\net45\Microsoft.Research.Peloponnese.HadoopBridge.dll
-
+
False
- ..\packages\Microsoft.Research.Peloponnese.0.7.1-beta\lib\net45\Microsoft.Research.Peloponnese.Utils.dll
+ ..\packages\Microsoft.Research.Peloponnese.0.7.2-beta\lib\net45\Microsoft.Research.Peloponnese.Utils.dll
+
+
+ False
+ ..\packages\Microsoft.Bcl.Async.1.0.166\lib\net40\Microsoft.Threading.Tasks.dll
+
+
+ False
+ ..\packages\Microsoft.Bcl.Async.1.0.166\lib\net40\Microsoft.Threading.Tasks.Extensions.dll
+
+
+ False
+ ..\packages\Microsoft.Bcl.Async.1.0.166\lib\net40\Microsoft.Threading.Tasks.Extensions.Desktop.dll
+
+
+ False
+ ..\packages\Microsoft.WindowsAzure.Common.1.0.1\lib\net45\Microsoft.WindowsAzure.Common.dll
+
+
+ False
+ ..\packages\Microsoft.WindowsAzure.Common.1.0.1\lib\net45\Microsoft.WindowsAzure.Common.NetFramework.dll
..\packages\Microsoft.WindowsAzure.ConfigurationManager.2.0.3\lib\net40\Microsoft.WindowsAzure.Configuration.dll
+
+ False
+ ..\packages\Microsoft.WindowsAzure.Management.1.0.0\lib\net40\Microsoft.WindowsAzure.Management.dll
+
False
..\packages\Microsoft.WindowsAzure.Management.HDInsight.1.1.0.7\lib\net40\Microsoft.WindowsAzure.Management.HDInsight.dll
@@ -92,6 +116,10 @@
False
..\packages\Microsoft.Hadoop.Client.1.1.0.7\lib\net40\Microsoft.WindowsAzure.Management.HDInsight.Framework.Core.dll
+
+ False
+ ..\packages\Microsoft.WindowsAzure.Management.Storage.1.0.0\lib\net40\Microsoft.WindowsAzure.Management.Storage.dll
+
False
..\packages\WindowsAzure.Storage.3.1.0.1\lib\net40\Microsoft.WindowsAzure.Storage.dll
@@ -108,6 +136,17 @@
3.5
+
+
+
+ False
+ ..\packages\Microsoft.Net.Http.2.2.19\lib\net45\System.Net.Http.Extensions.dll
+
+
+ False
+ ..\packages\Microsoft.Net.Http.2.2.19\lib\net45\System.Net.Http.Primitives.dll
+
+
False
..\packages\System.Spatial.5.6.1\lib\net40\System.Spatial.dll
@@ -164,14 +203,19 @@
+
+
+
+
+
This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.
-
-
+
+
-
+
-
+
\ No newline at end of file
diff --git a/DryadLinqGraphManager/Properties/AssemblyInfo.cs b/DryadLinqGraphManager/Properties/AssemblyInfo.cs
index ea2eb9c..bc4e5c4 100644
--- a/DryadLinqGraphManager/Properties/AssemblyInfo.cs
+++ b/DryadLinqGraphManager/Properties/AssemblyInfo.cs
@@ -51,5 +51,5 @@ using System.Runtime.InteropServices;
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
-[assembly: AssemblyVersion("0.1.0.0")]
-[assembly: AssemblyFileVersion("0.1.0.0")]
+[assembly: AssemblyVersion("0.1.2.0")]
+[assembly: AssemblyFileVersion("0.1.2.0")]
diff --git a/DryadLinqGraphManager/packages.config b/DryadLinqGraphManager/packages.config
index 6cc3081..bd29d67 100644
--- a/DryadLinqGraphManager/packages.config
+++ b/DryadLinqGraphManager/packages.config
@@ -1,13 +1,21 @@
-
+
+
+
+
-
+
+
+
+
+
+
-
+
\ No newline at end of file
diff --git a/DryadLinqTests/ApplyAndForkTests.cs b/DryadLinqTests/ApplyAndForkTests.cs
new file mode 100644
index 0000000..b523841
--- /dev/null
+++ b/DryadLinqTests/ApplyAndForkTests.cs
@@ -0,0 +1,270 @@
+using Microsoft.Research.DryadLinq;
+using Microsoft.Research.Peloponnese.Storage;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+
+namespace DryadLinqTests
+{
+ public class ApplyAndForkTests
+ {
+ public static IEnumerable NonHomomorphic_Unary_Func(IEnumerable input) // Identity apply function: hands the input sequence back unchanged.
+ {
+ return input; // same reference is returned; nothing is enumerated here
+ }
+
+ // [DistributiveOverConcat] -- attribute is currently disabled (commented out)
+ public static IEnumerable Homomorphic_Unary_Func(IEnumerable input) // Identity apply function passed to ApplyPerPartition by HomomorphicUnaryApply.
+ {
+ return input; // same reference is returned; nothing is enumerated here
+ }
+
+ public static IEnumerable NonHomomorphic_Binary_Func(IEnumerable left, IEnumerable right) // Binary apply function that returns its left input.
+ {
+ return left; // right is accepted but never read
+ }
+
+ // [LeftDistributiveOverConcat] -- attribute is currently disabled (commented out)
+ public static IEnumerable LeftHomomorphic_Binary_Func(IEnumerable left, IEnumerable right) // Binary apply function that returns its left input.
+ {
+ return left; // right is accepted but never read
+ }
+
+ // Note: an apply function must consume each input enumerable at most once, and it must produce an enumerable.
+ // So even a simple pass-through function that does a little work has to do it in a single enumeration pass;
+ // otherwise we get the error: "An HpcLinq channel can't be read more than once."
+ // [DistributiveOverConcat] -- attribute is currently disabled (commented out)
+ public static IEnumerable FullHomomorphic_Binary_Func(IEnumerable left, IEnumerable right) // Iterator: yields every element of left, then every element of right, counting both.
+ {
+ long cLeft = 0; // number of elements seen on the left input
+ foreach (int x in left)
+ {
+ cLeft++;
+ yield return x;
+ }
+
+ long cRight = 0; // number of elements seen on the right input
+ foreach (int x in right)
+ {
+ cRight++;
+ yield return x;
+ }
+
+ if (cLeft == 0) // reached only after both inputs are drained, so all elements have already been yielded by then
+ throw new Exception("a node received empty left-data"); // NOTE(review): bare System.Exception; a more specific type would be preferable
+
+ if (cRight == 0) // only evaluated when the left input was non-empty
+ throw new Exception("a node received empty right-data");
+ }
+
+
+ public static bool NonHomomorphicUnaryApply() // Test: runs NonHomomorphic_Unary_Func via ApplyPerPartition; returns true when the output file check succeeds.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false; // presumably selects cluster rather than local execution -- TODO confirm
+ bool passed = true;
+ try
+ {
+ string outFile = "unittest/output/NonHomomorphicUnaryApply.out";
+
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x; the data comes from DataGenerator
+ IQueryable pt1 = simple.Select(x => x.First()); // keep the first element of each generated item
+
+ var q1 = pt1.ApplyPerPartition(x => NonHomomorphic_Unary_Func(x));
+ var jobInfo = q1.ToStore(outFile).Submit();
+ jobInfo.Wait(); // wait on the submitted job before checking the output
+
+ passed &= Utils.FileExists(Config.accountName, Config.storageKey, Config.containerName, outFile); // pass criterion
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // a DryadLinqException fails the test; other exception types are not caught here
+ }
+ return passed;
+ }
+
+ public static bool HomomorphicUnaryApply() // Test: runs Homomorphic_Unary_Func via ApplyPerPartition; returns true when the output file check succeeds.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false; // presumably selects cluster rather than local execution -- TODO confirm
+ bool passed = true;
+ try
+ {
+ string outFile = "unittest/output/HomomorphicUnaryApply.out";
+
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x; the data comes from DataGenerator
+ IQueryable pt1 = simple.Select(x => x.First()); // keep the first element of each generated item
+
+ var q1 = pt1.ApplyPerPartition(x => Homomorphic_Unary_Func(x));
+ var jobInfo = q1.ToStore(outFile).Submit();
+ jobInfo.Wait(); // wait on the submitted job before checking the output
+
+ passed &= Utils.FileExists(Config.accountName, Config.storageKey, Config.containerName, outFile); // pass criterion
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // a DryadLinqException fails the test; other exception types are not caught here
+ }
+ return passed;
+ }
+
+ public static bool NonHomomorphicBinaryApply() // Test: applies NonHomomorphic_Binary_Func to (pt1, pt1) with Apply; returns true when the output file check succeeds.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false; // presumably selects cluster rather than local execution -- TODO confirm
+ bool passed = true;
+ try
+ {
+ string outFile = "unittest/output/NonHomomorphicBinaryApply.out";
+
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x; the data comes from DataGenerator
+ IQueryable pt1 = simple.Select(x => x.First()); // keep the first element of each generated item
+
+ var q1 = pt1.Apply(pt1, (x, y) => NonHomomorphic_Binary_Func(x, y)); // the same query is used as both the left and the right input
+ var jobInfo = q1.ToStore(outFile).Submit();
+ jobInfo.Wait(); // wait on the submitted job before checking the output
+
+ passed &= Utils.FileExists(Config.accountName, Config.storageKey, Config.containerName, outFile); // pass criterion
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // a DryadLinqException fails the test; other exception types are not caught here
+ }
+ return passed;
+ }
+
+ public static bool LeftHomomorphicBinaryApply() // Test: applies LeftHomomorphic_Binary_Func to (pt1, pt1) with ApplyPerPartition; returns true when the output file check succeeds.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false; // presumably selects cluster rather than local execution -- TODO confirm
+ bool passed = true;
+ try
+ {
+ string outFile = "unittest/output/LeftHomomorphicBinaryApply.out";
+
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x; the data comes from DataGenerator
+ IQueryable pt1 = simple.Select(x => x.First()); // keep the first element of each generated item
+
+ var q1 = pt1.ApplyPerPartition(pt1, (x, y) => LeftHomomorphic_Binary_Func(x, y), true); // this test passes true as the last argument, the FullHomomorphic tests pass false; its meaning is not visible here -- TODO confirm
+ var jobInfo = q1.ToStore(outFile).Submit();
+ jobInfo.Wait(); // wait on the submitted job before checking the output
+
+ passed &= Utils.FileExists(Config.accountName, Config.storageKey, Config.containerName, outFile); // pass criterion
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // a DryadLinqException fails the test; other exception types are not caught here
+ }
+ return passed;
+ }
+
+ public static bool FullHomomorphicBinaryApply_DifferentDataSets() // Test: applies FullHomomorphic_Binary_Func to two separately built queries; returns true when the output file check succeeds.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false; // presumably selects cluster rather than local execution -- TODO confirm
+ bool passed = true;
+ try
+ {
+ string outFile = "unittest/output/FullHomomorphicBinaryApply_DifferentDataSets.out";
+
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x; the data comes from DataGenerator
+ IQueryable pt1 = simple.Select(x => x.First());
+ IQueryable pt2 = simple.Select(x => x.First()); // NOTE(review): built from the same expression as pt1, so only the query objects differ, not the data
+
+ var q1 = pt1.ApplyPerPartition(pt2, (x, y) => FullHomomorphic_Binary_Func(x, y), false);
+ var jobInfo = q1.ToStore(outFile).Submit();
+ jobInfo.Wait(); // wait on the submitted job before checking the output
+
+ passed &= Utils.FileExists(Config.accountName, Config.storageKey, Config.containerName, outFile); // pass criterion
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // a DryadLinqException fails the test; other exception types are not caught here
+ }
+ return passed;
+ }
+
+ public static bool FullHomomorphicBinaryApply_IdenticalDataSets() // Test: applies FullHomomorphic_Binary_Func with the same query on both sides; returns true when the output file check succeeds.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false; // presumably selects cluster rather than local execution -- TODO confirm
+ bool passed = true;
+ try
+ {
+ string outFile = "unittest/output/FullHomomorphicBinaryApply_2.out"; // note: file name does not follow the test-method name as the sibling tests do
+
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x; the data comes from DataGenerator
+ IQueryable pt1 = simple.Select(x => x.First()); // keep the first element of each generated item
+ var q1 = pt1.ApplyPerPartition(pt1, (x, y) => FullHomomorphic_Binary_Func(x, y), false); // pt1 is both the left and the right input
+ var jobInfo = q1.ToStore(outFile).Submit();
+ jobInfo.Wait(); // wait on the submitted job before checking the output
+
+ passed &= Utils.FileExists(Config.accountName, Config.storageKey, Config.containerName, outFile); // pass criterion
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // a DryadLinqException fails the test; other exception types are not caught here
+ }
+ return passed;
+ }
+
+ [Associative(typeof(AssociativeRecursive1))]
+ public static string IntToStringCSVAggregator(string agg, int next) // Accumulator step: appends a comma followed by next to agg.
+ {
+ return agg + "," + next.ToString(); // the comma is always prepended, so an empty agg yields a string that starts with ","
+ }
+ public class AssociativeRecursive1 : IAssociative // Combiner type named by the [Associative] attribute on IntToStringCSVAggregator.
+ {
+ public string Seed() // Initial accumulator value: the empty string.
+ {
+ return "";
+ }
+ public string RecursiveAccumulate(string first, string second) // Merges two partial results by plain concatenation (no separator is inserted).
+ {
+ return first + second;
+ }
+ }
+
+ public static bool Aggregate_WithCombiner() // Test: aggregates pt1 into a string with IntToStringCSVAggregator and checks only the length of the result.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false; // presumably selects cluster rather than local execution -- TODO confirm
+ bool passed = true;
+ try
+ {
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x; the data comes from DataGenerator
+ IQueryable pt1 = simple.Select(x => x.First()); // keep the first element of each generated item
+
+ string q1 = pt1.Aggregate("", (str, x) => IntToStringCSVAggregator(str, x)); // seed is "", so every number gets a leading comma
+
+ passed &= (q1.Length == 27); // string should have numbers 1..12, each preceded by a comma: 12 commas + 15 digits = 27 (content and order are not checked)
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // a DryadLinqException fails the test; other exception types are not caught here
+ }
+ return passed;
+ }
+ }
+}
diff --git a/DryadLinqTests/BasicAPITests.cs b/DryadLinqTests/BasicAPITests.cs
new file mode 100644
index 0000000..532a127
--- /dev/null
+++ b/DryadLinqTests/BasicAPITests.cs
@@ -0,0 +1,1087 @@
+using Microsoft.Research.DryadLinq;
+using Microsoft.Research.Peloponnese.Storage;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Linq.Expressions;
+
+namespace DryadLinqTests
+{
+ public class BasicAPITests
+ {
+ public static bool ToStoreThrowsForNonQuery() // Test: ToStore on a plain AsQueryable() source must throw ArgumentException; returns false if it does not.
+ {
+ bool passed = true;
+ try
+ {
+ int[] data = new[] { 1, 2, 3 };
+ var q1 = data.AsQueryable().Select(x => 100 + x).ToStore(AzureUtils.ToAzureUri(Config.accountName, Config.storageKey, Config.containerName,
+ "dummy")).ToArray();
+ //Should throw as we got into DryadLinq via AsQueryable() rather than via context.
+ passed &= false; // reached only when no exception was thrown above
+ }
+ catch (ArgumentException)
+ {
+ //expected: passed stays true; any other exception type is not caught here
+ }
+ return passed;
+ }
+
+ public static bool ToStoreGetEnumeratorThrows() // pass -- NOTE(review): despite the name, a DryadLinqException makes this test fail; confirm the intended expectation
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false; // presumably selects cluster rather than local execution -- TODO confirm
+ bool passed = true;
+ try
+ {
+ string outFile = "unittest/output/ToStoreGetEnumeratorThrows.txt";
+
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x; the data comes from DataGenerator
+
+ IQueryable pt1 = simple.Select(x => x.First()); // keep the first element of each generated item
+ IQueryable q1 = pt1.Select(x => 100 + x);
+
+ var output = q1.ToStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName, outFile), true);
+ output.GetEnumerator(); // enumerator is requested without an explicit Submit; the returned enumerator is discarded
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // a DryadLinqException fails the test; other exception types are not caught here
+ }
+ return passed;
+ }
+
+ public static bool GetEnumeratorNonToStoreTerminated() // Test: enumerates a query that has no ToStore terminator; returns false if a DryadLinqException is raised.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false; // presumably selects cluster rather than local execution -- TODO confirm
+ bool passed = true;
+ try
+ {
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.storageKey, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x; the data comes from DataGenerator
+ IQueryable pt1 = simple.Select(x => x.First()); // keep the first element of each generated item
+
+ IQueryable q1 = pt1.Select(x => 100 + x);
+ IQueryable q2 = q1.Where(x => true); // filter that keeps every element
+ foreach (int x in q2) // throws here (author's status note; with the catch below that means the test reports failure)
+ {
+ //Console.WriteLine(x);
+ }
+ //@TODO: perform a sequence-equals test.
+
+ //IQueryable format = q2.Select(x => new LineRecord(String.Format("{0}", x)));
+ //DryadLinqJobInfo output = format.ToStore(AzureUtils.ToAzureUri(Config.accountName, Config.storageKey, Config.containerName,
+ // "unittest/output/test2.txt")).SubmitAndWait();
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // a DryadLinqException fails the test; other exception types are not caught here
+ }
+ return passed;
+ }
+
+ public static bool ToStoreSubmitGetEnumerator() // pass -- Test: ToStore, then SubmitAndWait, then enumerate the stored query; fails only on DryadLinqException.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false; // presumably selects cluster rather than local execution -- TODO confirm
+ bool passed = true;
+ try
+ {
+ string outFile = "unittest/output/ToStoreSubmitGetEnumerator.txt";
+
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.storageKey, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x; the data comes from DataGenerator
+ IQueryable pt1 = simple.Select(x => x.First()); // keep the first element of each generated item
+ var q1 = pt1.Select(x => 100 + x).HashPartition(x => x);
+ var q2 = q1.Where(x => true); // filter that keeps every element
+ IQueryable output = q2.ToStore(AzureUtils.ToAzureUri(Config.accountName, Config.storageKey, Config.containerName, outFile), true);
+ DryadLinqJobInfo info = output.SubmitAndWait(); // info itself is not inspected afterwards
+
+ foreach (int x in output) // should not run a new dryad job.
+ {
+ //Console.WriteLine(x);
+ }
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // a DryadLinqException fails the test; other exception types are not caught here
+ }
+ return passed;
+ }
+
+ public static bool SubmitNonToStoreTerminated() // Test: calls SubmitAndWait on a query that has no ToStore terminator, then materializes it with ToList.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false; // presumably selects cluster rather than local execution -- TODO confirm
+ bool passed = true;
+ try
+ {
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.storageKey, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x; the data comes from DataGenerator
+ IQueryable pt1 = simple.Select(x => x.First()); // keep the first element of each generated item
+
+ var q1 = pt1.Select(x => 100 + x);
+ var q2 = q1.Where(x => true); // filter that keeps every element
+ q2.SubmitAndWait(); // throws here (author's status note; with the catch below that means the test reports failure)
+ var outPT = q2.ToList();
+ foreach (int x in outPT)
+ {
+ //Console.WriteLine(x);
+ }
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // a DryadLinqException fails the test; other exception types are not caught here
+ }
+ return passed;
+ }
+
+ public static bool MaterializeToStoreTerminated() // Test: submits two ToStore-terminated queries in one DryadLinqQueryable.Submit call and checks that both output files exist.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false; // presumably selects cluster rather than local execution -- TODO confirm
+ bool passed = true;
+ try
+ {
+ string outFile_a = "unittest/output/MaterializeToStoreTerminated_a.txt";
+ string outFile_b = "unittest/output/MaterializeToStoreTerminated_b.txt";
+
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.storageKey, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x; the data comes from DataGenerator
+ IQueryable pt1 = simple.Select(x => x.First()); // keep the first element of each generated item
+ IQueryable query = pt1.Select(x => 100 + x);
+
+ var q1 = query.ToStore(AzureUtils.ToAzureUri(Config.accountName, Config.storageKey, Config.containerName, outFile_a), true); //stream name w/o prefixed slash
+
+ var q2 = query.Where(x => true).ToStore(AzureUtils.ToAzureUri(Config.accountName, Config.storageKey, Config.containerName, outFile_b), true); //NOTE(review): original note said "w/ prefixed slash", but outFile_b has no leading slash either
+
+ DryadLinqQueryable.Submit(q1, q2); //materialize // throws (author's status note); the returned job info is discarded and never waited on
+
+ var __unused2 = q1.Select(x => x); // Legal call, but BLOCKS
+ foreach (int x in q2)
+ {
+ //Console.WriteLine(x);
+ }
+
+ passed &= Utils.FileExists(Config.accountName, Config.storageKey, Config.containerName, outFile_a);
+ passed &= Utils.FileExists(Config.accountName, Config.storageKey, Config.containerName, outFile_b);
+
+ //@TODO: assert that only one query execution occurred.
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // a DryadLinqException fails the test; other exception types are not caught here
+ }
+ return passed;
+ }
+
+ public static bool MaterializeNonToStoreTerminated() // Test: passes a query without a ToStore terminator to DryadLinqQueryable.Submit, then enumerates it.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false; // presumably selects cluster rather than local execution -- TODO confirm
+ bool passed = true;
+ try
+ {
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.storageKey, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x; the data comes from DataGenerator
+ IQueryable pt1 = simple.Select(x => x.First()); // keep the first element of each generated item
+ IQueryable query = pt1.Select(x => 100 + x);
+
+ DryadLinqQueryable.Submit(query); //materialize // throws (author's status note; with the catch below that means the test reports failure)
+
+ foreach (int x in query)
+ {
+ //Console.WriteLine(x);
+ }
+
+ //@TODO: assert that only one query execution occurred.
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // a DryadLinqException fails the test; other exception types are not caught here
+ }
+ return passed;
+ }
+
+ public static bool EnumeratePlainData() // Test: enumerates pt1 directly, with no further query operators and no ToStore.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false; // presumably selects cluster rather than local execution -- TODO confirm
+ bool passed = true;
+ try
+ {
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.storageKey, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x; the data comes from DataGenerator
+ IQueryable pt1 = simple.Select(x => x.First()); // keep the first element of each generated item
+ foreach (int x in pt1) // throws (author's status note; with the catch below that means the test reports failure)
+ {
+ //Console.WriteLine(x);
+ }
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // a DryadLinqException fails the test; other exception types are not caught here
+ }
+ return passed;
+ }
+
+ public static bool CopyPlainDataViaToStoreSubmit() // Test: stores pt1 unchanged via ToStore + Submit, enumerates the stored query, and checks the output file exists.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false; // presumably selects cluster rather than local execution -- TODO confirm
+ bool passed = true;
+ try
+ {
+ string outFile = "unittest/output/CopyPlainDataViaToStoreSubmit.txt";
+
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.storageKey, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x; the data comes from DataGenerator
+ IQueryable pt1 = simple.Select(x => x.First()); // keep the first element of each generated item
+
+ var q = pt1.ToStore(AzureUtils.ToAzureUri(Config.accountName, Config.storageKey, Config.containerName, outFile), true);
+ DryadLinqJobInfo info = q.Submit(); // instance-style Submit; the sibling "Materialize" test uses DryadLinqQueryable.Submit(q)
+ info.Wait();
+
+ foreach (int x in q) // enumerated after the job was waited on; elements are not checked
+ {
+ //Console.WriteLine(x);
+ }
+
+ passed &= Utils.FileExists(Config.accountName, Config.storageKey, Config.containerName, outFile); // pass criterion
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // a DryadLinqException fails the test; other exception types are not caught here
+ }
+ return passed;
+ }
+
+ public static bool CopyPlainDataViaToStoreMaterialize() // Test: stores pt1 unchanged via ToStore + DryadLinqQueryable.Submit, enumerates the stored query, and checks the output file exists.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false; // presumably selects cluster rather than local execution -- TODO confirm
+ bool passed = true;
+ try
+ {
+ string outFile = "unittest/output/CopyPlainDataViaToStoreMaterialize.txt";
+
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.storageKey, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x; the data comes from DataGenerator
+ IQueryable pt1 = simple.Select(x => x.First()); // keep the first element of each generated item
+
+ var q = pt1.ToStore(AzureUtils.ToAzureUri(Config.accountName, Config.storageKey, Config.containerName, outFile), true);
+ DryadLinqJobInfo info = DryadLinqQueryable.Submit(q); // static Submit; the sibling "Submit" test uses q.Submit()
+ info.Wait();
+
+ foreach (int x in q) // enumerated after the job was waited on; elements are not checked
+ {
+ //Console.WriteLine(x);
+ }
+
+ passed &= Utils.FileExists(Config.accountName, Config.storageKey, Config.containerName, outFile); // pass criterion
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // a DryadLinqException fails the test; other exception types are not caught here
+ }
+ return passed;
+ }
+ /*
+ public static bool PlainEnumerableAsDryadQueryToStoreSubmit()
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false;
+ bool passed = true;
+ try
+ {
+ string outFile = "unittest/output/PlainEnumerableAsDryadQueryToStoreSubmit.txt";
+
+ int[] plainData = { 5, 6, 7 };
+
+ var q = context.AsDryadQuery(plainData, CompressionScheme.None).ToStore(AzureUtils.ToAzureUri(Config.accountName, Config.storageKey, Config.containerName, outFile);
+ DryadLinqJobInfo info = q.Submit();
+ info.Wait();
+
+ foreach (int x in q)
+ {
+ //Console.WriteLine(x);
+ }
+
+ passed &= Utils.FileExists(Config.accountName, Config.storageKey, Config.containerName, outFile);
+ }
+ catch (DryadLinqException e)
+ {
+ passed &= false;
+ }
+ return passed;
+ }
+ */
+ public static bool RepeatSubmit() // Test: submits the same ToStore query twice; an ArgumentException on the repeat is accepted, a silent repeat is only accepted when context.LocalDebug is set.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false; // presumably selects cluster rather than local execution -- TODO confirm
+ bool passed = true;
+ try
+ {
+ string outFile = "unittest/output/RepeatSubmit.txt";
+
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.storageKey, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x; the data comes from DataGenerator
+ IQueryable pt1 = simple.Select(x => x.First()); // keep the first element of each generated item
+
+
+ var q = pt1.ToStore(AzureUtils.ToAzureUri(Config.accountName, Config.storageKey, Config.containerName, outFile), true);
+ DryadLinqJobInfo info1 = null; // stay null when the corresponding Submit throws
+ DryadLinqJobInfo info2 = null;
+ try
+ {
+ info1 = q.Submit();
+ info2 = q.Submit(); // does not throw
+
+ if (!context.LocalDebug) // reached only when neither Submit threw
+ {
+ passed &= false;
+ }
+ }
+ catch (ArgumentException)
+ {
+ passed &= true; // no-op on the flag: ArgumentException is an accepted outcome
+ }
+
+ //wait for any jobs to complete.
+ if (info1 != null)
+ {
+ info1.Wait();
+ }
+
+ if (info2 != null)
+ {
+ info2.Wait();
+ }
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // a DryadLinqException fails the test; other exception types are not caught here
+ }
+ return passed;
+ }
+
+ public static bool RepeatMaterialize() // Test: passes the same ToStore query to DryadLinqQueryable.Submit twice; an ArgumentException on the repeat is accepted, a silent repeat is only accepted when context.LocalDebug is set.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false; // presumably selects cluster rather than local execution -- TODO confirm
+ bool passed = true;
+ try
+ {
+ string outFile = "unittest/output/RepeatMaterialize.txt";
+
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.storageKey, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x; the data comes from DataGenerator
+ IQueryable pt1 = simple.Select(x => x.First()); // keep the first element of each generated item
+
+ var q = pt1.ToStore(AzureUtils.ToAzureUri(Config.accountName, Config.storageKey, Config.containerName, outFile), true);
+ DryadLinqJobInfo info1 = null; // stay null when the corresponding Submit throws
+ DryadLinqJobInfo info2 = null;
+ try
+ {
+ info1 = DryadLinqQueryable.Submit(new[] { q }); //materialize
+ info2 = DryadLinqQueryable.Submit(new[] { q }); //materialize // does not throw
+
+ if (!context.LocalDebug) // reached only when neither Submit threw
+ {
+ passed &= false;
+ }
+ }
+ catch (ArgumentException)
+ {
+ passed &= true; // no-op on the flag: ArgumentException is an accepted outcome
+ }
+
+ //wait for any jobs to complete.
+ if (info1 != null)
+ {
+ info1.Wait();
+ }
+
+ if (info2 != null)
+ {
+ info2.Wait();
+ }
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // a DryadLinqException fails the test; other exception types are not caught here
+ }
+ return passed;
+ }
+
+ public static bool MaterializeMentionsSameQueryTwice() // pass -- Test: DryadLinqQueryable.Submit(q, q) must throw ArgumentException; returns false if it does not.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false; // presumably selects cluster rather than local execution -- TODO confirm
+ bool passed = true;
+ try
+ {
+ string outFile = "unittest/output/MaterializeMentionsSameQueryTwice.txt";
+
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.storageKey, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x; the data comes from DataGenerator
+ IQueryable pt1 = simple.Select(x => x.First()); // keep the first element of each generated item
+
+ var q = pt1.ToStore(AzureUtils.ToAzureUri(Config.accountName, Config.storageKey, Config.containerName, outFile), true);
+ DryadLinqJobInfo info1 = null; // stays null when Submit throws
+ try
+ {
+ info1 = DryadLinqQueryable.Submit(q, q); //materialize // throws
+ passed &= false; // for Config.cluster execution, second materialize should throw;
+ }
+ catch (ArgumentException)
+ {
+ passed &= true; // no-op on the flag: ArgumentException is the expected outcome
+ }
+
+ //wait for any jobs to complete.
+ if (info1 != null)
+ {
+ info1.Wait();
+ }
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // a DryadLinqException fails the test; other exception types are not caught here
+ }
+ return passed;
+ }
+
+ public static bool QueryOnDataBackedDLQ() // Test: builds a new query on top of a submitted ToStore query, then checks the output file and enumerates the stored query.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false; // presumably selects cluster rather than local execution -- TODO confirm
+ bool passed = true;
+ try
+ {
+ string outFile = "unittest/output/QueryOnDataBackedDLQ.txt";
+
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x; the data comes from DataGenerator
+ IQueryable pt1 = simple.Select(x => x.First()); // keep the first element of each generated item
+ var q = pt1.Select(x => 100 + x);
+ var outPT = q.ToStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName, outFile), true);
+ outPT.Submit(); // job info is discarded: there is no explicit Wait() in this test
+
+ var outPT2_dummy_notUsed = outPT.Select(x => x); //BLOCKS HERE until the input is concrete
+ // source.Expression returns an expression for the backingDataDLQ
+ // CheckAndInitialize() on the backingData will block.
+
+ passed &= Utils.FileExists(Config.accountName, Config.storageKey, Config.containerName, outFile); // NOTE(review): relies on the blocking described above to have completed the job -- confirm
+
+ foreach (int x in outPT)
+ {
+ //Console.WriteLine(x);
+ }
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // a DryadLinqException fails the test; other exception types are not caught here
+ }
+ return passed;
+ }
+
+ public static bool Bug11781_CountandFirstOrDefault() // Regression test: calls Count(), CountAsQuery() and FirstOrDefault() on pt1; only the CountAsQuery output file is checked.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false; // presumably selects cluster rather than local execution -- TODO confirm
+ bool passed = true;
+ try
+ {
+ string outFile = "unittest/output/Bug11781.out";
+
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x; the data comes from DataGenerator
+ IQueryable pt1 = simple.Select(x => x.First()); // keep the first element of each generated item
+
+ //Test Count()
+ var c = pt1.Count(); // value is not checked; the call only has to complete without DryadLinqException
+
+ //Test CountAsQuery()
+ var q = pt1.CountAsQuery().ToStore(outFile);
+ DryadLinqJobInfo info = q.Submit();
+ info.Wait();
+
+ passed &= Utils.FileExists(Config.accountName, Config.storageKey, Config.containerName, outFile); // pass criterion
+
+ // Also test FirstOrDefault
+ // the affected code for dlq.Execute() also has a branch for FirstOrDefault() and friends.
+ int y = pt1.FirstOrDefault(); // value is not checked; the call only has to complete without DryadLinqException
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // a DryadLinqException fails the test; other exception types are not caught here
+ }
+ return passed;
+ }
+
+ public static bool Bug11782_Aggregate() // Regression test: calls Aggregate() and AggregateAsQuery() with a summing lambda; only the AggregateAsQuery output file is checked.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false; // presumably selects cluster rather than local execution -- TODO confirm
+ bool passed = true;
+ try
+ {
+ string outFile = "unittest/output/Bug11782_Aggregate.out";
+
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x; the data comes from DataGenerator
+ IQueryable pt1 = simple.Select(x => x.First()); // keep the first element of each generated item
+
+ //test Aggregate()
+ var c = pt1.Select(x => x).Aggregate((x, y) => x + y); // value is not checked; the call only has to complete without DryadLinqException
+
+ //test AggregateAsQuery()
+ var q = pt1.Select(x => x).AggregateAsQuery((x, y) => x + y).ToStore(outFile);
+ DryadLinqJobInfo info = DryadLinqQueryable.Submit(q);
+ info.Wait();
+
+ passed &= Utils.FileExists(Config.accountName, Config.storageKey, Config.containerName, outFile); // pass criterion
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // a DryadLinqException fails the test; other exception types are not caught here
+ }
+ return passed;
+ }
+
+ public static bool Bug11782_LowLevelQueryableManipulation() // Regression test: builds a Select call by hand and feeds it to pt1.Provider.CreateQuery; passes only if that raises DryadLinqException.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false; // presumably selects cluster rather than local execution -- TODO confirm
+ bool passed = true;
+ try
+ {
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x; the data comes from DataGenerator
+ IQueryable pt1 = simple.Select(x => x.First()); // keep the first element of each generated item
+
+ Expression lambda = Expression.Lambda>(
+ Expression.Constant(1), // lambda body: the constant 1, independent of the parameter
+ new[] { Expression.Parameter(typeof(int), "x") });
+ var z = pt1.Provider.CreateQuery(
+ Expression.Call(
+ typeof(Queryable), "Select",
+ new Type[] { pt1.ElementType, pt1.ElementType }, // type arguments for Select: source and result element type
+ pt1.Expression, Expression.Quote(lambda)));
+
+ passed &= false; // the use of non-generic Provider.CreateQuery() should have thrown
+ }
+ catch (DryadLinqException)
+ {
+ passed &= true; // no-op on the flag: DryadLinqException is the expected outcome here
+ }
+ return passed;
+ }
+
+ public static bool Bug11638_LongWhere() // Regression test for bug 11638: submits a query that uses LongWhere() and returns true when the output file exists and no DryadLinqException was thrown.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false;
+ bool passed = true;
+ try
+ {
+ string outFile = "unittest/output/BasicAPITests_LongWhere.out";
+
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x, so the data comes from DataGenerator rather than from the file contents
+ IQueryable pt1 = simple.Select(x => x.First());
+
+ var q = pt1.Select(x => 100 + x);
+ var outPT = q.LongWhere((x, i) => true).ToStore(outFile); // predicate accepts every (element, index) pair
+ var info = outPT.Submit();
+ info.Wait();
+
+ passed &= Utils.FileExists(Config.accountName, Config.storageKey, Config.containerName, outFile); // only existence is verified, not the contents
+
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // any DryadLinqException fails the test
+ }
+ return passed;
+ }
+
+ public static bool AssumeRangePartition() // Submits a query that applies AssumeRangePartition() before a Select and returns true when the output file exists and no DryadLinqException was thrown.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false;
+ bool passed = true;
+ try
+ {
+ string outFile = "unittest/output/BasicAPITests_AssumeRangePartition.out";
+
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x, so the data comes from DataGenerator rather than from the file contents
+ IQueryable pt1 = simple.Select(x => x.First());
+
+ var q =
+ pt1
+ .AssumeRangePartition(x => x, false) // key is the element itself; NOTE(review): the bool is presumably "isDescending" — confirm against the API
+ .Select(x => 100 + x).ToStore(outFile);
+ var info = q.Submit();
+ info.Wait();
+
+ passed &= Utils.FileExists(Config.accountName, Config.storageKey, Config.containerName, outFile); // only existence is verified, not the contents
+
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // any DryadLinqException fails the test
+ }
+ return passed;
+ }
+
+ public static bool Bug11638_LongMethods() // Regression test for bug 11638: chains the Long* operators (LongSelect, LongWhere, both LongSelectMany overloads, LongTakeWhile, LongSkipWhile) in one submitted query; returns true when the output file exists and no DryadLinqException was thrown.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false;
+ bool passed = true;
+ try
+ {
+ string outFile = "unittest/output/Bug11638_LongMethods.out";
+
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x, so the data comes from DataGenerator rather than from the file contents
+ IQueryable pt1 = simple.Select(x => x.First());
+ // Every lambda below is an identity / always-true / always-false function, so the chain only exercises the operators themselves.
+ var q =
+ pt1
+ .LongSelect((x, i) => x)
+ .LongWhere((x, i) => true)
+ .LongSelectMany((x, i) => new[] { x })
+ .LongSelectMany((x, i) => new[] { x }, (i, seq) => seq) //overload#2
+ .LongTakeWhile((x, i) => true)
+ .LongSkipWhile((x, i) => false)
+ .ToStore(outFile);
+ var info = q.Submit();
+ info.Wait();
+
+ passed &= Utils.FileExists(Config.accountName, Config.storageKey, Config.containerName, outFile); // only existence is verified, not the contents
+
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // any DryadLinqException fails the test
+ }
+ return passed;
+ }
+
+ public static bool ContextConfigIsReadOnly() // Checks which DryadLinqContext settings can be modified: JobFriendlyName must be writable, while the five other mutations below must each throw NotSupportedException. Returns true only when every expectation holds.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false;
+ bool passed = true;
+ // JobFriendlyName: set to a dummy value and then restored; neither assignment may throw.
+ try
+ {
+ string jobName = context.JobFriendlyName;
+ context.JobFriendlyName = "bob";
+ context.JobFriendlyName = jobName;
+ }
+ catch (NotSupportedException)
+ {
+ passed &= false; // "an exception should not be thrown"
+ }
+ // Each block below fails the test when the mutation succeeds, i.e. when control reaches the line after it.
+ try
+ {
+ context.JobMinNodes = 120;
+ passed &= false; // "an exception should have been thrown"
+ }
+ catch (NotSupportedException)
+ {
+ //expected
+ }
+
+ try
+ {
+ context.ResourcesToAdd.Add("blah");
+ passed &= false; // "an exception should have been thrown"
+ }
+ catch (NotSupportedException)
+ {
+ //expected
+ }
+
+ try
+ {
+ context.ResourcesToRemove.Add("blah");
+ passed &= false; // "an exception should have been thrown"
+ }
+ catch (NotSupportedException)
+ {
+ //expected
+ }
+
+ try
+ {
+ context.JobEnvironmentVariables.Add("bob", "bob");
+ passed &= false; // "an exception should have been thrown"
+ }
+ catch (NotSupportedException)
+ {
+ //expected
+ }
+
+ try
+ {
+ context.EnableSpeculativeDuplication = false;
+ passed &= false; // "an exception should have been thrown"
+ }
+ catch (NotSupportedException)
+ {
+ //expected
+ }
+
+ return passed;
+ }
+
+ public static bool ToggleSpeculativeDuplication() // Verifies that EnableSpeculativeDuplication is false on a context from Utils.MakeBasicConfig and can be switched on and off again without a DryadLinqException.
+ {
+ var context = Utils.MakeBasicConfig(Config.cluster);
+ context.LocalExecution = false;
+ bool passed = true;
+ try
+ {
+ passed &= !context.EnableSpeculativeDuplication; // "Speculative Duplication enabled by default"
+ context.EnableSpeculativeDuplication = true;
+ passed &= context.EnableSpeculativeDuplication; // "Failed to enable speculative duplication"
+ context.EnableSpeculativeDuplication = false;
+ passed &= !context.EnableSpeculativeDuplication; // "Failed to disable speculative duplication"
+ context.EnableSpeculativeDuplication = false; // redundant with the assignment two lines up; it only prepared the copy check that is disabled below
+ // ??? DryadLinqContext testContext = new DryadLinqContext(context);
+ // ??? passed &= !testContext.EnableSpeculativeDuplication; // "Speculative Duplication enabled after copy"
+
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // "Enabling and disabling speculative duplication should not throw"
+ }
+ return passed;
+ }
+
+ public static bool Bug15068_ConfigResourcesAPI() // Regression test for bug 15068: exercises IsReadOnly, Clear, Add, the indexer, Count and enumeration on context.ResourcesToAdd / ResourcesToRemove. The read-only-copy checks are currently disabled.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false;
+ bool passed = true;
+ try
+ {
+ context.HeadNode = "MIKELID7"; // ??? NOTE(review): hard-coded machine name — confirm it is still needed
+ passed &= (context.ResourcesToAdd.IsReadOnly == false); // "isReadOnly should be false"
+ passed &= (context.ResourcesToRemove.IsReadOnly == false); // "isReadOnly should be false"
+
+ //clear
+ context.ResourcesToAdd.Clear();
+ context.ResourcesToRemove.Clear();
+
+ //add
+ context.ResourcesToAdd.Add("abc");
+ context.ResourcesToRemove.Add("def");
+ context.ResourcesToRemove.Add("ghi");
+
+ //index, count, getEnumerator
+ passed &= (context.ResourcesToAdd[0] == "abc"); // "wrong value"
+ passed &= (context.ResourcesToAdd.Count == 1); // "wrong value"
+
+ passed &= (context.ResourcesToRemove[0] == "def"); // "wrong value"
+ passed &= (context.ResourcesToRemove.Where((x, i) => (i == 1)).First() == "ghi"); // "wrong value" (the indexed Where goes through the enumerator to reach the second item)
+ passed &= (context.ResourcesToRemove.Count == 2); // "wrong value"
+
+ // ??? Disabled: checks that a context copied from this one exposes read-only resource lists.
+ //// read-only.
+ //DryadLinqContext ctx = new DryadLinqContext(context);
+ //passed &= (ctx.ResourcesToAdd.IsReadOnly == true); // "isReadOnly should be true"
+ //passed &= (ctx.ResourcesToRemove.IsReadOnly == true); // "isReadOnly should be true"
+
+ // clone was taken. (With the copy checks disabled, the two Clear() calls below only confirm that Clear() does not throw.)
+ context.ResourcesToAdd.Clear();
+ context.ResourcesToRemove.Clear();
+ // ???
+ //passed &= (ctx.ResourcesToAdd.Count == 1); // "should be unaffected"
+ //passed &= (ctx.ResourcesToRemove.Count == 2); // "should be unaffected"
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // any DryadLinqException fails the test
+ }
+ return passed;
+ }
+
+ public static bool Bug14449_ContextShouldExposeVersionIDs() // Regression test for bug 14449. NOTE(review): every version check is commented out, so this currently only constructs a context and returns true.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false;
+ bool passed = true;
+ try
+ {
+ // ??? Disabled checks: client/server versions were expected to have Major >= 3.
+ //passed &= (context.Major >= 3); // "problem with HpcLinq client version"
+ //passed &= (context.Major >= 3); // "problem with HpcLinq server version"
+
+ //passed &= (context.ClientVersion.Major >= 3); // "problem with Dsc client version"
+ //passed &= (context.ServerVersion.Major >= 3); // "problem with Dsc server version"
+
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // unreachable in practice while the try block is empty
+ }
+ return passed;
+ }
+
+ public static bool Bug_16341_SubmitThrowsForDifferentContexts() // Regression test for bug 16341: Submit() and SubmitAndWait() given two queries built on different DryadLinqContext instances must throw DryadLinqException; returns true only when both do.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false;
+ var context2 = new DryadLinqContext(Config.cluster);
+ context2.LocalExecution = false;
+ bool passed = true;
+ try
+ {
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x, so the data comes from DataGenerator rather than from the file contents
+ IQueryable pt1 = simple.Select(x => x.First());
+
+ IQueryable input2 = context2.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple2 = input2.Apply(x => DataGenerator.CreateSimpleFileSets());
+ IQueryable pt2 = simple2.Select(x => x.First());
+
+ DryadLinqQueryable.Submit(pt1, pt2); // pt1 belongs to 'context', pt2 to 'context2'
+ passed &= false; // reaching here means Submit() did not throw
+ }
+ catch (DryadLinqException)
+ {
+ } // expected: mixing contexts must be rejected
+ // Same scenario for SubmitAndWait(). NOTE(review): these ToAzureUri calls pass Config.storageKey as an extra argument, unlike the first half — confirm that is intended.
+ try
+ {
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.storageKey, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets());
+ IQueryable pt1 = simple.Select(x => x.First());
+
+ IQueryable input2 = context2.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.storageKey, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple2 = input2.Apply(x => DataGenerator.CreateSimpleFileSets());
+ IQueryable pt2 = simple2.Select(x => x.First());
+
+ DryadLinqQueryable.SubmitAndWait(pt1, pt2);
+ passed &= false; // reaching here means SubmitAndWait() did not throw
+ }
+ catch (DryadLinqException)
+ {
+ } // expected: mixing contexts must be rejected
+
+ return passed;
+ }
+
+ public static bool Bug_16341_VariousTestsForSubmit() // Regression test for bug 16341: Submit() and SubmitAndWait() must reject null inputs and queries that are not DryadLINQ queries with ArgumentException / ArgumentNullException. Returns true only when every call throws the expected exception type.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false;
+ bool passed = true;
+ try
+ {
+ int[] data = new[] { 1, 2, 3 };
+ var badQ1 = data.AsQueryable().Select(x => 100 + x); // plain LINQ-to-objects query, not bound to a DryadLinqContext
+ var badQ2 = data.AsQueryable().Select(x => 100 + x); // currently unused
+
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x, so the data comes from DataGenerator rather than from the file contents
+ IQueryable goodQ1 = simple.Select(x => x.First());
+
+ IQueryable input_copy = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple_copy = input_copy.Apply(x => DataGenerator.CreateSimpleFileSets());
+ IQueryable goodQ2 = simple_copy.Select(x => x.First()); // currently unused
+
+
+ try
+ {
+ badQ1.Submit();
+ passed &= false; // "should throw as input isn't a L2H query"
+ }
+ catch (ArgumentException)
+ {
+ }
+
+ try
+ {
+ DryadLinqQueryable.Submit((IQueryable)null); //this-Query overload
+ passed &= false; // "should throw ArgNull as input is null"
+ }
+ catch (ArgumentException)
+ {
+ //although we pass null, it goes to params[] overload which creates an actual array[1] containing one null
+ //hence we throw ArgumentException rather than ArgumentNullException.
+ }
+
+ try
+ {
+ DryadLinqQueryable.Submit((IQueryable[])null); //multi-query overload
+ passed &= false; // "should throw ArgNull as input is null"
+ }
+ catch (ArgumentNullException)
+ {
+ }
+
+ try
+ {
+ DryadLinqQueryable.Submit(goodQ1, null); //multi-query overload
+ passed &= false; // "should throw ArgEx as one of the inputs is null"
+ }
+ catch (ArgumentException)
+ {
+ }
+
+ try
+ {
+ DryadLinqQueryable.Submit(goodQ1, badQ1); //multi-query overload
+ passed &= false; // "should throw ArgEx as one of the inputs is not a L2H"
+ }
+ catch (ArgumentException)
+ {
+ }
+
+ //----------
+ // same tests again for SubmitAndWait
+
+ try
+ {
+ badQ1.SubmitAndWait();
+ passed &= false; // "should throw as input isn't a L2H query"
+ }
+ catch (ArgumentException)
+ {
+ }
+
+ try
+ {
+ DryadLinqQueryable.SubmitAndWait((IQueryable)null); //this-Query overload
+ passed &= false; // "should throw ArgNull as input is null"
+ }
+ catch (ArgumentException)
+ {
+ //although we pass null, it goes to params[] overload which creates an actual array[1] containing one null
+ //hence we throw ArgumentException rather than ArgumentNullException.
+ }
+
+ try
+ {
+ DryadLinqQueryable.SubmitAndWait((IQueryable[])null); //multi-query overload
+ passed &= false; // "should throw ArgNull as input is null"
+ }
+ catch (ArgumentNullException)
+ {
+ }
+
+ try
+ {
+ DryadLinqQueryable.SubmitAndWait(goodQ1, null); //multi-query overload
+ passed &= false; // "should throw ArgEx as one of the inputs is null"
+ }
+ catch (ArgumentException)
+ {
+ }
+
+ try
+ {
+ DryadLinqQueryable.SubmitAndWait(goodQ1, badQ1); //multi-query overload
+ passed &= false; // "should throw ArgEx as one of the inputs is not a L2H"
+ }
+ catch (ArgumentException)
+ {
+ }
+
+ }
+ catch (DryadLinqException)
+ {
+ // A DryadLinqException is never the expected outcome here (every check expects an ArgumentException variant).
+ // It also skips all remaining checks, so it must fail the test instead of being silently swallowed.
+ passed &= false;
+ }
+ return passed;
+ }
+
+
+ public static bool template() // Skeleton to copy when adding a new test. NOTE(review): no query is ever written to outFile here, so the FileExists check depends on whether that file already exists.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false;
+ bool passed = true;
+ try
+ {
+ string outFile = "unittest/output/x.txt";
+
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x, so the data comes from DataGenerator rather than from the file contents
+ IQueryable pt1 = simple.Select(x => x.First());
+ // A real test would build a query on pt1, call ToStore(outFile), then submit and wait here.
+ passed &= Utils.FileExists(Config.accountName, Config.storageKey, Config.containerName, outFile);
+
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // any DryadLinqException fails the test
+ }
+ return passed;
+ }
+
+ }
+
+}
diff --git a/DryadLinqTests/DryadLinqTests.csproj b/DryadLinqTests/DryadLinqTests.csproj
index 74f321f..c88877b 100644
--- a/DryadLinqTests/DryadLinqTests.csproj
+++ b/DryadLinqTests/DryadLinqTests.csproj
@@ -1,6 +1,6 @@
-
+
-
+
Debug
@@ -63,37 +63,78 @@
False
..\packages\Microsoft.Data.Services.Client.5.6.1\lib\net40\Microsoft.Data.Services.Client.dll
-
+
+ False
..\packages\Microsoft.Hadoop.Client.1.1.0.7\lib\net40\Microsoft.Hadoop.Client.dll
-
- ..\packages\Microsoft.Research.Peloponnese.0.7.1-beta\lib\net45\Microsoft.Research.Peloponnese.HadoopBridge.dll
+
+ False
+ ..\packages\Microsoft.Research.Peloponnese.0.7.2-beta\lib\net45\Microsoft.Research.Peloponnese.HadoopBridge.dll
-
- ..\packages\Microsoft.Research.Peloponnese.0.7.1-beta\lib\net45\Microsoft.Research.Peloponnese.Utils.dll
+
+ False
+ ..\packages\Microsoft.Research.Peloponnese.0.7.2-beta\lib\net45\Microsoft.Research.Peloponnese.Utils.dll
+
+
+ False
+ ..\packages\Microsoft.Bcl.Async.1.0.166\lib\net40\Microsoft.Threading.Tasks.dll
+
+
+ ..\packages\Microsoft.Bcl.Async.1.0.166\lib\net40\Microsoft.Threading.Tasks.Extensions.dll
+
+
+ ..\packages\Microsoft.Bcl.Async.1.0.166\lib\net40\Microsoft.Threading.Tasks.Extensions.Desktop.dll
+
+
+ False
+ ..\packages\Microsoft.WindowsAzure.Common.1.0.1\lib\net45\Microsoft.WindowsAzure.Common.dll
+
+
+ False
+ ..\packages\Microsoft.WindowsAzure.Common.1.0.1\lib\net45\Microsoft.WindowsAzure.Common.NetFramework.dll
..\packages\Microsoft.WindowsAzure.ConfigurationManager.2.0.3\lib\net40\Microsoft.WindowsAzure.Configuration.dll
-
+
+ ..\packages\Microsoft.WindowsAzure.Management.1.0.0\lib\net40\Microsoft.WindowsAzure.Management.dll
+
+
+ False
..\packages\Microsoft.WindowsAzure.Management.HDInsight.1.1.0.7\lib\net40\Microsoft.WindowsAzure.Management.HDInsight.dll
-
+
+ False
..\packages\Microsoft.Hadoop.Client.1.1.0.7\lib\net40\Microsoft.WindowsAzure.Management.HDInsight.Framework.dll
-
+
+ False
..\packages\Microsoft.Hadoop.Client.1.1.0.7\lib\net40\Microsoft.WindowsAzure.Management.HDInsight.Framework.Core.dll
+
+ False
+ ..\packages\Microsoft.WindowsAzure.Management.Storage.1.0.0\lib\net40\Microsoft.WindowsAzure.Management.Storage.dll
+
False
..\packages\WindowsAzure.Storage.3.1.0.1\lib\net40\Microsoft.WindowsAzure.Storage.dll
-
+
+ False
..\packages\Newtonsoft.Json.6.0.2\lib\net45\Newtonsoft.Json.dll
+
+
+
+ ..\packages\Microsoft.Net.Http.2.2.19\lib\net45\System.Net.Http.Extensions.dll
+
+
+ ..\packages\Microsoft.Net.Http.2.2.19\lib\net45\System.Net.Http.Primitives.dll
+
+
False
..\packages\System.Spatial.5.6.1\lib\net40\System.Spatial.dll
@@ -106,8 +147,14 @@
+
+
+
+
+
+
@@ -137,14 +184,19 @@
+
+
+
+
+
This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.
-
-
+
+
-
+
-
+
\ No newline at end of file
diff --git a/DryadLinqTests/GroupByReduceTests.cs b/DryadLinqTests/GroupByReduceTests.cs
new file mode 100644
index 0000000..e7c7d69
--- /dev/null
+++ b/DryadLinqTests/GroupByReduceTests.cs
@@ -0,0 +1,1003 @@
+using Microsoft.Research.DryadLinq;
+using Microsoft.Research.Peloponnese.Storage;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Linq.Expressions;
+
+namespace DryadLinqTests
+{
+ public static class GroupByReduceTests
+ {
+ public static bool Decomposition_Average() // Runs GroupBy(x % 2) followed by Average() per group. NOTE(review): the result is never asserted, so this only checks that no DryadLinqException is thrown.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false;
+ bool passed = true;
+ try
+ {
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x, so the data comes from DataGenerator rather than from the file contents
+ IQueryable pt1 = simple.Select(x => x.First());
+
+ double[] aggregates = pt1.GroupBy(x => x % 2).Select(g => g.Average()).ToArray();
+ //int[] expected = new[] { 1 + 3 + 5 + 7 + 9 + 11, 2 + 4 + 6 + 8 + 10 + 12 };
+ // NOTE(review): the disabled check compares against per-group sums typed as int[]; re-enabling it needs per-group averages as double[].
+ ////note the order of the result elements is not guaranteed, so order them before testing
+ //int[] aggregatesOrdered = aggregates.OrderBy(x => x).ToArray();
+ //int[] expectedOrdered = expected.OrderBy(x => x).ToArray();
+
+ //passed &= aggregatesOrdered.SequenceEqual(expectedOrdered);
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // any DryadLinqException fails the test
+ }
+ return passed;
+ }
+
+ public static bool DistributiveResultSelector_1() // Groups by parity using the GroupBy overload with a result selector that sums each group, and checks the two sums against the expected odd/even totals of 1..12.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false;
+ bool passed = true;
+ try
+ {
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x, so the data comes from DataGenerator rather than from the file contents
+ IQueryable pt1 = simple.Select(x => x.First());
+
+ // this result selector satisfies "DistributiveOverConcat"
+ int[] aggregates = pt1.GroupBy(x => x % 2, (key, seq) => seq.Sum()).ToArray();
+ int[] expected = new[] { 1 + 3 + 5 + 7 + 9 + 11, 2 + 4 + 6 + 8 + 10 + 12 };
+
+ //note the order of the result elements is not guaranteed, so order them before testing
+ int[] aggregatesOrdered = aggregates.OrderBy(x => x).ToArray();
+ int[] expectedOrdered = expected.OrderBy(x => x).ToArray();
+
+ passed &= aggregatesOrdered.SequenceEqual(expectedOrdered);
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // any DryadLinqException fails the test
+ }
+ return passed;
+ }
+
+ public static bool DistributiveSelect_1() // Same check as the result-selector variant, but expressed as GroupBy(...) followed by Select(group => group.Sum()); compares against the odd/even totals of 1..12.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false;
+ bool passed = true;
+ try
+ {
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x, so the data comes from DataGenerator rather than from the file contents
+ IQueryable pt1 = simple.Select(x => x.First());
+
+ // this Select projection satisfies "DistributiveOverConcat"
+ int[] aggregates = pt1.GroupBy(x => x % 2).Select(group => group.Sum()).ToArray();
+ int[] expected = new[] { 1 + 3 + 5 + 7 + 9 + 11, 2 + 4 + 6 + 8 + 10 + 12 };
+
+ //note the order of the result elements is not guaranteed, so order them before testing
+ int[] aggregatesOrdered = aggregates.OrderBy(x => x).ToArray();
+ int[] expectedOrdered = expected.OrderBy(x => x).ToArray();
+
+ passed &= aggregatesOrdered.SequenceEqual(expectedOrdered);
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // any DryadLinqException fails the test
+ }
+ return passed;
+ }
+
+ public static bool BuiltInCountIsDistributable() // Groups by parity with a result selector that counts each group and expects both counts to be 6.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false;
+ bool passed = true;
+ try
+ {
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateSimpleFileSets()); // the lambda ignores x, so the data comes from DataGenerator rather than from the file contents
+ IQueryable pt1 = simple.Select(x => x.First());
+
+ // Built in Count is Distributable as built-in logic knows to use Sum() as the combiner function.
+ // Count(a,b,c,d) = Sum(Count(a,b), Count(c,d))
+ int[] aggregates = pt1.GroupBy(x => x % 2, (key, seq) => seq.Count()).ToArray();
+ int[] expected = new[] { 6, 6 }; // six elements in each full group.
+
+ //note the order of the result elements is not guaranteed, so order them before testing
+ int[] aggregatesOrdered = aggregates.OrderBy(x => x).ToArray();
+ int[] expectedOrdered = expected.OrderBy(x => x).ToArray();
+
+ passed &= aggregatesOrdered.SequenceEqual(expectedOrdered);
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // any DryadLinqException fails the test
+ }
+ return passed;
+ }
+
+ public static bool Bug12078_GroupByReduceWithResultSelectingAggregate() // Regression test for bug 12078: a GroupBy result selector that calls the three-argument Aggregate (seed, accumulator, result selector); expects sum(1..200) / 100 for the single group.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false;
+ bool passed = true;
+ try
+ {
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateGroupByReduceDataSet()); // the lambda ignores x, so the data comes from DataGenerator rather than from the file contents
+ IQueryable data = simple.Select(x => x.First());
+
+ double[] aggregates = data
+ .Select(x => (double)x)
+ .GroupBy(x => 0, (key, seq) => seq.Aggregate((double)0, (acc, item) => acc + item, val => val / 100)).ToArray(); // constant key 0 puts every element in one group
+ double[] expected = new[] { Enumerable.Range(1, 200).Sum() / 100.0 };
+
+ //note the order of the result elements is not guaranteed, so order them before testing
+ double[] aggregatesOrdered = aggregates.OrderBy(x => x).ToArray();
+ double[] expectedOrdered = expected.OrderBy(x => x).ToArray();
+
+ passed &= aggregatesOrdered.SequenceEqual(expectedOrdered); // exact double equality
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // any DryadLinqException fails the test
+ }
+ return passed;
+ }
+
+ #region GroupByReduceWithCustomDecomposableFunction_DistributableCombiner
+
+ [Decomposable(typeof(Decomposer_1))]
+ public static double DecomposableFunc(IEnumerable seq) // Sums the sequence and divides the total by 100; Decomposer_1 is registered as its decomposition.
+ {
+ // Disabled assertion (hard to express with the context-based API): TestUtils.Assert(HpcLinq.LocalDebug, "This method should only be called during LocalDebug");
+ return seq.Aggregate((double)0, (acc, item) => acc + item, val => val / 100);
+ }
+ public class Decomposer_1 : IDecomposable // Decomposition registered for DecomposableFunc: partial results are running sums and the final step divides by 100. The role of each method is inferred from its IDecomposable name — TODO confirm against the interface documentation.
+ {
+ public void Initialize(object state) { } // no state is used
+
+ public double Seed(double source) // returns the element unchanged
+ {
+ return source;
+ }
+
+ public double Accumulate(double a, double x) // adds x to a
+ {
+ return a + x;
+ }
+
+ public double RecursiveAccumulate(double a, double x) // adds x to a (same arithmetic as Accumulate)
+ {
+ return a + x;
+ }
+
+ public double FinalReduce(double a) // divides by 100, matching the result selector in DecomposableFunc
+ {
+ return a / 100;
+ }
+ }
+
+ public static bool GroupByReduceWithCustomDecomposableFunction_DistributableCombiner() // Uses the [Decomposable] DecomposableFunc as a GroupBy result selector and expects sum(1..200) / 100 for the single group.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false;
+ bool passed = true;
+ try
+ {
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateGroupByReduceDataSet()); // the lambda ignores x, so the data comes from DataGenerator rather than from the file contents
+ IQueryable data = simple.Select(x => x.First());
+
+ double[] aggregates = data
+ .Select(x => (double)x)
+ .GroupBy(x => 0, (k, g) => DecomposableFunc(g)) // constant key 0 puts every element in one group
+ .ToArray();
+ double[] expected = new[] { Enumerable.Range(1, 200).Sum() / 100.0 };
+
+ //note the order of the result elements is not guaranteed, so order them before testing
+ double[] aggregatesOrdered = aggregates.OrderBy(x => x).ToArray();
+ double[] expectedOrdered = expected.OrderBy(x => x).ToArray();
+
+ passed &= aggregatesOrdered.SequenceEqual(expectedOrdered); // exact double equality
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // any DryadLinqException fails the test
+ }
+ return passed;
+ }
+
+ #endregion GroupByReduceWithCustomDecomposableFunction_DistributableCombiner
+
+ #region GroupByReduceWithCustomDecomposableFunction_DistributableCombiner_DifferingTypes
+ // Tests a fully decomposed function whose reducer changes types.
+ [Decomposable(typeof(Decomposer_2))]
+ public static string DecomposableFunc2(IEnumerable seq) // Sums the sequence as a double and returns "hello:" followed by the total's ToString(); Decomposer_2 is registered as its decomposition.
+ {
+ // Disabled assertion: TestUtils.Assert(HpcLinq.LocalDebug, "This method should only be called during LocalDebug");
+ return seq.Aggregate((double)0, (acc, item) => acc + item, val => ("hello:" + val.ToString()));
+ }
+ public class Decomposer_2 : IDecomposable // Decomposition registered for DecomposableFunc2: partial results are double sums and the final step changes type to string. The role of each method is inferred from its IDecomposable name — TODO confirm against the interface documentation.
+ {
+ public void Initialize(object state) { } // no state is used
+
+ public double Seed(double source) // returns the element unchanged
+ {
+ return source;
+ }
+
+ public double Accumulate(double a, double x) // adds x to a
+ {
+ return a + x;
+ }
+
+ public double RecursiveAccumulate(double a, double x) // adds x to a (same arithmetic as Accumulate)
+ {
+ return a + x;
+ }
+
+ public string FinalReduce(double a) // formats the total exactly as DecomposableFunc2's result selector does
+ {
+ return ("hello:" + a.ToString());
+ }
+ }
+
+ public static bool GroupByReduceWithCustomDecomposableFunction_DistributableCombiner_DifferingTypes() // Uses DecomposableFunc2 (double input, string result) as a GroupBy result selector and expects "hello:" + sum(1..200) for the single group.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false;
+ bool passed = true;
+ try
+ {
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateGroupByReduceDataSet()); // the lambda ignores x, so the data comes from DataGenerator rather than from the file contents
+ IQueryable data = simple.Select(x => x.First());
+
+ string[] aggregates = data
+ .Select(x => (double)x)
+ .GroupBy(x => 0, (key, seq) => DecomposableFunc2(seq)).ToArray(); // constant key 0 puts every element in one group
+ string[] expected = new[] { "hello:" + Enumerable.Range(1, 200).Sum() }; // built from an int sum, while the query formats a double — the strings must still match
+
+ //note the order of the result elements is not guaranteed, so order them before testing
+ string[] aggregatesOrdered = aggregates.OrderBy(x => x).ToArray();
+ string[] expectedOrdered = expected.OrderBy(x => x).ToArray();
+
+ passed &= aggregatesOrdered.SequenceEqual(expectedOrdered);
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // any DryadLinqException fails the test
+ }
+ return passed;
+ }
+
+ #endregion GroupByReduceWithCustomDecomposableFunction_DistributableCombiner_DifferingTypes
+
+ #region GroupByReduceWithCustomDecomposableFunction_DistributableCombiner_NoFinalizer
+ // Tests a decomposed function with no need for a particular reduce.
+ // The combiner changes type, and the recursive-combiner operates on the altered type.
+ // The reducer just calls combiner again.
+ [Decomposable(typeof(Decomposer_3))]
+ public static string DecomposableFunc3(IEnumerable seq) // Sums the sequence while carrying the running total as a string (parsed and re-formatted at every step); Decomposer_3 is registered as its decomposition.
+ {
+ // Disabled assertion: TestUtils.Assert(HpcLinq.LocalDebug, "This method should only be called during LocalDebug");
+ return seq.Aggregate("0", (acc, item) => (double.Parse(acc) + item).ToString());
+ }
+ public class Decomposer_3 : IDecomposable // Decomposition registered for DecomposableFunc3: partial results are sums held as strings and the final step is the identity. The role of each method is inferred from its IDecomposable name — TODO confirm against the interface documentation.
+ {
+ public void Initialize(object state) { } // no state is used
+
+ public string Seed(double source) // converts the element to its string form
+ {
+ return source.ToString();
+ }
+
+ public string Accumulate(string a, double x) // parses the string total, adds x, formats it again
+ {
+ return (double.Parse(a) + x).ToString();
+ }
+
+ public string RecursiveAccumulate(string a, string x) // both operands are string totals here, so both are parsed
+ {
+ return (double.Parse(a) + double.Parse(x)).ToString();
+ }
+
+ public string FinalReduce(string a) // identity: the accumulated string is already the result
+ {
+ return a;
+ }
+ }
+
+ public static bool GroupByReduceWithCustomDecomposableFunction_DistributableCombiner_NoFinalizer() // Uses DecomposableFunc3 (string-typed accumulator, identity final step) as a GroupBy result selector and expects sum(1..200) as a string for the single group.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false;
+ bool passed = true;
+ try
+ {
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateGroupByReduceDataSet()); // the lambda ignores x, so the data comes from DataGenerator rather than from the file contents
+ IQueryable data = simple.Select(x => x.First());
+
+ string[] aggregates = data
+ .Select(x => (double)x)
+ .GroupBy(x => 0, (key, seq) => DecomposableFunc3(seq)).ToArray(); // constant key 0 puts every element in one group
+ string[] expected = new[] { Enumerable.Range(1, 200).Sum().ToString() }; // built from an int sum, while the query formats a double — the strings must still match
+
+ //note the order of the result elements is not guaranteed, so order them before testing
+ string[] aggregatesOrdered = aggregates.OrderBy(x => x).ToArray();
+ string[] expectedOrdered = expected.OrderBy(x => x).ToArray();
+
+ passed &= aggregatesOrdered.SequenceEqual(expectedOrdered);
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // any DryadLinqException fails the test
+ }
+ return passed;
+ }
+
+ #endregion GroupByReduceWithCustomDecomposableFunction_DistributableCombiner_NoFinalizer
+
+ #region GroupByReduceWithCustomDecomposableFunction_NonDistributableCombiner
+ // Tests simplified pattern where the Combiner is not recursively applied.
+ // Note: Func4 can be represented as a decomposable with distributive-combiner and a finalizer.. but here we choose not to.
+ // Because of the form of the Combiner, it is critical that it not be used recursively.
+ [Decomposable(typeof(Decomposer_4))]
+ public static double DecomposableFunc4(IEnumerable seq) // Sums the sequence and divides the total by 100; Decomposer_4 is registered as its decomposition.
+ {
+ // Disabled assertion: TestUtils.Assert(HpcLinq.LocalDebug, "This method should only be called during LocalDebug");
+ return seq.Aggregate(0.0, (acc, item) => acc + item, acc => acc / 100);
+ }
+ public class Decomposer_4 : IDecomposable // Decomposition registered for DecomposableFunc4. NOTE(review): the four method bodies are identical to Decomposer_1's, although the enclosing region is named for a non-distributable combiner — confirm this is intended.
+ {
+ public void Initialize(object state) { } // no state is used
+
+ public double Seed(double source) // returns the element unchanged
+ {
+ return source;
+ }
+
+ public double Accumulate(double a, double x) // adds x to a
+ {
+ return a + x;
+ }
+
+ public double RecursiveAccumulate(double a, double x) // adds x to a (same arithmetic as Accumulate)
+ {
+ return a + x;
+ }
+
+ public double FinalReduce(double a) // divides by 100, matching the result selector in DecomposableFunc4
+ {
+ return a / 100;
+ }
+ }
+
+ public static bool GroupByReduceWithCustomDecomposableFunction_NonDistributableCombiner() // Uses DecomposableFunc4 as a GroupBy result selector and expects sum(1..200) / 100 for the single group.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false;
+ bool passed = true;
+ try
+ {
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateGroupByReduceDataSet()); // the lambda ignores x, so the data comes from DataGenerator rather than from the file contents
+ IQueryable data = simple.Select(x => x.First());
+
+ double[] aggregates = data
+ .Select(x => (double)x)
+ .GroupBy(x => 0, (key, seq) => DecomposableFunc4(seq)).ToArray(); // constant key 0 puts every element in one group
+ double[] expected = new[] { Enumerable.Range(1, 200).Sum() / 100.0 };
+
+ //note the order of the result elements is not guaranteed, so order them before testing
+ double[] aggregatesOrdered = aggregates.OrderBy(x => x).ToArray();
+ double[] expectedOrdered = expected.OrderBy(x => x).ToArray();
+
+ passed &= aggregatesOrdered.SequenceEqual(expectedOrdered); // exact double equality
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // any DryadLinqException fails the test
+ }
+ return passed;
+ }
+
+ #endregion GroupByReduceWithCustomDecomposableFunction_NonDistributableCombiner
+
+ public static bool GroupByReduce_BuiltIn_First() // Applies First() as the GroupBy result selector over a single group and accepts either 1 or 101 as the answer.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false;
+ bool passed = true;
+ try
+ {
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateGroupByReduceDataSet()); // the lambda ignores x, so the data comes from DataGenerator rather than from the file contents
+ IQueryable data = simple.Select(x => x.First());
+
+ int[] aggregates = data
+ .GroupBy(x => 0, (key, seq) => seq.First()) // constant key 0 puts every element in one group
+ .ToArray();
+
+ // the output of First can be the first item of either partition.
+ passed &= aggregates.SequenceEqual(new[] { 1 }) || aggregates.SequenceEqual(new[] { 101 });
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // any DryadLinqException fails the test
+ }
+ return passed;
+ }
+
+ public static bool GroupByReduce_ResultSelector_ComplexNewExpression() // Uses a GroupBy result selector that constructs nested KeyValuePair objects containing two Average() calls, and expects key 0 paired with (100.5, 100.5).
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false;
+ bool passed = true;
+ try
+ {
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateGroupByReduceDataSet()); // the lambda ignores x, so the data comes from DataGenerator rather than from the file contents
+ IQueryable data = simple.Select(x => x.First());
+
+ var aggregates = data.GroupBy(x => 0, (key, seq) => new KeyValuePair>(key, new KeyValuePair(seq.Average(), seq.Average()))).ToArray(); // constant key 0 puts every element in one group
+ // 100.5 is the mean of 1..200 — presumably the contents of the generated data set; verify against DataGenerator.
+ var expected = new KeyValuePair>[] { new KeyValuePair>(0, new KeyValuePair(100.5, 100.5)) };
+
+ passed &= aggregates.SequenceEqual(expected);
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // any DryadLinqException fails the test
+ }
+ return passed;
+ }
+
+ #region GroupByReduce_ProgrammingManualExample
+
+ public static bool GroupByReduce_ProgrammingManualExample() // Programming-manual example: average of each "x % 10" group over the integers 0..999 using MyAverage. NOTE(review): both the data ingestion and the actual checks are commented out, so only the simple aggregates below are executed.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false;
+ bool passed = true;
+ try
+ {
+ string filesetName = "DevUnitTest/0to999integers";
+ Utils.DeleteFile(Config.accountName, Config.storageKey, Config.containerName, filesetName, true);
+
+ IEnumerable> rawdata = new[] { Enumerable.Range(0, 334), Enumerable.Range(334, 333), Enumerable.Range(667, 333) }; // three chunks covering 0..999; unused while ingestion is disabled
+ // ??? DscIngressHelpers.AsDryadQueryPartitions(context, rawdata, filesetName, DscCompressionScheme.None);
+ var data = context.FromStore(filesetName); // NOTE(review): the file set was deleted above and is never re-created here — confirm this read can succeed
+
+ var count = data.AsEnumerable().Count();
+ var sum = data.AsEnumerable().Sum();
+ var min = data.AsEnumerable().Min();
+ var max = data.AsEnumerable().Max();
+ var uniques = data.AsEnumerable().Distinct().Count();
+
+ //Console.WriteLine("DATA:: count:{0} uniques:{1} sum:{2}, min:{3}, max:{4}", count, uniques, sum, min, max);
+
+ // ??? Disabled: the grouped-average query and its assertions.
+ //var results = data
+ // .GroupBy(x => x % 10, (key, seq) => new KeyValuePair(key, seq.MyAverage()))
+ // .OrderBy(y => y.Key)
+ // .ToArray();
+
+ ////foreach (var result in results)
+ //// Console.WriteLine("For group {0} the average is {1}", result.Key, result.Value);
+
+ //passed &= (results.Count() == 10);
+ //passed &= (results[0].Key == 0); // "first element should be key=0");
+ //passed &= (results[0].Value == 495); // "first element should be value=495 ie avg(0,10,20,..,990)");
+
+ }
+ catch (DryadLinqException)
+ {
+ passed &= false; // any DryadLinqException fails the test
+ }
+ return passed;
+ }
+
+ [Decomposable(typeof(Decomposer_5))] // names Decomposer_5 as the decomposition of this aggregate
+ public static double MyAverage(this IEnumerable recordSequence) // Arithmetic mean of recordSequence; throws InvalidOperationException when the sequence is empty.
+ {
+ long count = 0, sum = 0; // 64-bit accumulators: an int sum silently wraps on large inputs and corrupts the average
+ foreach (var r in recordSequence)
+ {
+ sum += r;
+ count++;
+ }
+ if (count == 0) throw new InvalidOperationException("Can't average empty sequence"); // specific type instead of the reserved base Exception; still caught by catch (Exception)
+ return (double)sum / (double)count;
+ }
+
+ [Serializable]
+ public struct Partial // intermediate state used by Decomposer_5: a running sum plus the number of elements behind it
+ {
+ public int PartialSum; // sum of the elements folded in so far
+ public int PartialCount; // how many elements have been folded in so far
+ }
+
+ // Decomposition of MyAverage: builds Partial (sum, count) values, merges them,
+ // and divides at the end to obtain the average.
+ // NOTE(review): Partial.PartialSum is an int, so a very large group can overflow it;
+ // widening the field would change the public Partial struct, so it is left as is.
+ public class Decomposer_5 : IDecomposable
+ {
+ // No per-instance state is kept, so the supplied state object is ignored.
+ public void Initialize(object state) { }
+
+ // Starts a partial aggregate from a single element.
+ public Partial Seed(int x)
+ {
+ return new Partial { PartialSum = x, PartialCount = 1 };
+ }
+
+ // Folds one more element into an existing partial aggregate.
+ public Partial Accumulate(Partial a, int x)
+ {
+ return new Partial { PartialSum = a.PartialSum + x, PartialCount = a.PartialCount + 1 };
+ }
+
+ // Merges two partial aggregates by adding their sums and their counts.
+ public Partial RecursiveAccumulate(Partial a, Partial x)
+ {
+ return new Partial { PartialSum = a.PartialSum + x.PartialSum, PartialCount = a.PartialCount + x.PartialCount };
+ }
+
+ // Converts the final partial aggregate into the average; throws InvalidOperationException when it holds no elements.
+ public double FinalReduce(Partial a)
+ {
+ if (a.PartialCount == 0) throw new InvalidOperationException("Can't average empty sequence");
+ return (double)a.PartialSum / (double)a.PartialCount;
+ }
+ }
+
+ #endregion GroupByReduce_ProgrammingManualExample
+
+
+ #region GroupByReduce_SameDecomposableUsedTwice
+
+ public static bool GroupByReduce_SameDecomposableUsedTwice() // Test: one result selector calls the same decomposable function twice plus the built-in Average(); returns true when both parity groups match.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false; // explicitly disable LocalExecution for this run
+ bool passed = true;
+ try
+ {
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateGroupByReduceDataSet()); // the lambda ignores x: the records come from DataGenerator, not from the stored file
+ IQueryable pt1 = simple.Select(x => x.First());
+
+ var results = pt1.GroupBy(x => x % 2, (k, g) => MyFunc(k, DecomposableFunc5(g), DecomposableFunc5(g), g.Average())).ToArray(); // groups by parity; A and B both come from DecomposableFunc5(g)
+
+ // expected for key 0: count = 6, average = avg(2,4,6,8,10,12) = 7
+ // expected for key 1: count = 6, average = avg(1,3,5,7,9,11) = 6
+ // NOTE(review): GroupByReduce_ResultSelector_ComplexNewExpression expects an overall average of 100.5 from the same generator call; confirm what CreateGroupByReduceDataSet returns.
+ // Sort locally so that the keys are in ascending order.
+ var results_sorted = results.OrderBy(x => x.Key).ToArray();
+
+ passed &= (results_sorted.Length == 2); // "wrong results"
+
+ passed &= (results_sorted[0].Key == 0); // "wrong results"
+ passed &= (results_sorted[0].A == 6); // "wrong results"
+ passed &= (results_sorted[0].B == 6); // "wrong results"
+ passed &= (results_sorted[0].Av == 7.0); // "wrong results"
+
+ passed &= (results_sorted[1].Key == 1); // "wrong results"
+ passed &= (results_sorted[1].A == 6); // "wrong results"
+ passed &= (results_sorted[1].B == 6); // "wrong results"
+ passed &= (results_sorted[1].Av == 6.0); // "wrong results"
+ }
+ catch (DryadLinqException) // a DryadLinqException is reported as a failed test; other exception types propagate to the caller
+ {
+ passed &= false;
+ }
+ return passed;
+ }
+
+ public static MyStruct3 MyFunc(int key, int a, int b, double av) // Packs the group key and the three aggregate values into a MyStruct3.
+ {
+ return new MyStruct3(key, a, b, av);
+ }
+ [Decomposable(typeof(Decomposer_6))] // names Decomposer_6 as the decomposition of this aggregate
+ private static int DecomposableFunc5(IEnumerable g) // Returns the number of elements in g.
+ {
+ return g.Count();
+ }
+ public class Decomposer_6 : IDecomposable // decomposition named by DecomposableFunc5: keeps a running element count
+ {
+ public void Initialize(object state) { } // the supplied state object is ignored
+
+ public int Seed(int source) { return 1; } // a single element counts as 1, whatever its value
+
+ public int Accumulate(int a, int x)
+ {
+ return a + 1; // one more element; the value of x does not matter for a count
+ }
+
+ public int RecursiveAccumulate(int a, int x)
+ {
+ return a + x; // presumably x is another partial count here, so the two are added -- confirm against the IDecomposable contract
+ }
+
+ public int FinalReduce(int a)
+ {
+ return a; // the accumulated value is returned unchanged
+ }
+ }
+ [Serializable]
+ public struct MyStruct3 // result record built by MyFunc: a key, two int values and a double
+ {
+ public int Key; // group key
+ public int A; // first int aggregate (a DecomposableFunc5 result in GroupByReduce_SameDecomposableUsedTwice)
+ public int B; // second int aggregate (also a DecomposableFunc5 result there)
+ public double Av; // double aggregate (g.Average() there)
+
+ public MyStruct3(int key, int a, int b, double av) // assigns each argument to the field of the same name
+ {
+ Key = key; A = a; B = b; Av = av;
+ }
+ }
+
+ #endregion GroupByReduce_SameDecomposableUsedTwice
+
+ #region API_Misuse
+ internal static bool GroupByReduce_APIMisuse() // Test: each malformed decomposer below must make the query throw DryadLinqException; returns false if any query succeeds.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false; // explicitly disable LocalExecution for this run
+ bool passed = true;
+ try
+ {
+ if (context.LocalDebug)
+ {
+ // "decomposition logic doesn't run in LocalDebug.. skipping";
+ return true;
+ }
+
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateGroupByReduceDataSet()); // the lambda ignores x: the records come from DataGenerator, not from the stored file
+ IQueryable pt1 = simple.Select(x => x.First());
+
+ // internal-visibility decomposable type should fail.
+ try
+ {
+ pt1.GroupBy(x => x, (k, g) => BadDecomposable1(g)).ToArray();
+ passed &= false; // "exception should be thrown"
+ }
+ catch (DryadLinqException) // expected outcome; the specific error code is not verified (check below is disabled)
+ {
+ //??? passed &= (Ex.ErrorCode == DryadLinqErrorCode.DecomposerTypeMustBePublic); // "error code is wrong"
+ }
+
+ // decomposable type doesn't implement IDecomposable or IDecomposableRecursive
+ try
+ {
+ pt1.GroupBy(x => x, (k, g) => BadDecomposable2(g)).ToArray();
+ passed &= false; //"exception should be thrown");
+ }
+ catch (DryadLinqException) // expected outcome; the specific error code is not verified (check below is disabled)
+ {
+ //??? passed &= (Ex.ErrorCode == DryadLinqErrorCode.DecomposerTypeDoesNotImplementInterface);
+ }
+
+ // decomposable type implements more than one IDecomposable or IDecomposableRecursive
+ try
+ {
+ pt1.GroupBy(x => x, (k, g) => BadDecomposable3(g)).ToArray();
+ passed &= false; // reaching this line means no exception was thrown
+ }
+ catch (DryadLinqException) // expected outcome; the specific error code is not verified (check below is disabled)
+ {
+ //??? passed &= (Ex.ErrorCode == DryadLinqErrorCode.DecomposerTypeImplementsTooManyInterfaces);
+ }
+
+ // decomposable type doesn't have public default ctor
+ try
+ {
+ pt1.GroupBy(x => x, (k, g) => BadDecomposable4(g)).ToArray();
+ passed &= false; // reaching this line means no exception was thrown
+ }
+ catch (DryadLinqException) // expected outcome; the specific error code is not verified (check below is disabled)
+ {
+ //??? passed &= (Ex.ErrorCode == DryadLinqErrorCode.DecomposerTypeDoesNotHavePublicDefaultCtor);
+ }
+
+ // decomposable type input type doesn't match
+ try
+ {
+ pt1.GroupBy(x => x, (k, g) => BadDecomposable5(g)).ToArray();
+ passed &= false; // reaching this line means no exception was thrown
+ }
+ catch (DryadLinqException) // expected outcome; the specific error code is not verified (check below is disabled)
+ {
+ //??? passed &= (Ex.ErrorCode == DryadLinqErrorCode.DecomposerTypesDoNotMatch);
+ }
+
+ // decomposable type output type doesn't match
+ try
+ {
+ pt1.GroupBy(x => x, (k, g) => BadDecomposable6(g)).ToArray();
+ passed &= false; // reaching this line means no exception was thrown
+ }
+ catch (DryadLinqException) // expected outcome; the specific error code is not verified (check below is disabled)
+ {
+ //??? passed &= (Ex.ErrorCode == DryadLinqErrorCode.DecomposerTypesDoNotMatch);
+ }
+ }
+ catch (DryadLinqException) // a DryadLinqException outside the six probes (e.g. while building the input query) fails the test
+ {
+ passed &= false;
+ }
+ return passed;
+ }
+
+ [Decomposable(typeof(BadDecomposerType1))] // names a decomposer type that is internal rather than public
+ private static int BadDecomposable1(IEnumerable g) // Placeholder aggregate for GroupByReduce_APIMisuse; its body always throws NotImplementedException.
+ {
+ throw new NotImplementedException();
+ }
+ internal class BadDecomposerType1 : IDecomposable // deliberately internal: GroupByReduce_APIMisuse expects non-public decomposers to be rejected
+ {
+ public void Initialize(object state) { }
+ public int Seed(int x) { return x; }
+ public int Accumulate(int a, int x) { throw new NotImplementedException(); }
+ public int RecursiveAccumulate(int a, int x) { throw new NotImplementedException(); }
+ public int FinalReduce(int a) { throw new NotImplementedException(); }
+ }
+ [Decomposable(typeof(BadDecomposerType2))] // names a type that implements no decomposer interface
+ private static int BadDecomposable2(IEnumerable g) // Placeholder aggregate for GroupByReduce_APIMisuse; its body always throws NotImplementedException.
+ {
+ throw new NotImplementedException();
+ }
+ public class BadDecomposerType2 // deliberately empty: implements neither IDecomposable nor IDecomposableRecursive
+ {
+ }
+ [Decomposable(typeof(BadDecomposerType3))] // used by the "implements more than one IDecomposable" case of GroupByReduce_APIMisuse
+ private static int BadDecomposable3(IEnumerable g) // Placeholder aggregate for GroupByReduce_APIMisuse; its body always throws NotImplementedException.
+ {
+ throw new NotImplementedException();
+ }
+ public class BadDecomposerType3 : IDecomposable // NOTE(review): the test expects this type to implement more than one decomposer interface, but as shown it looks like a valid decomposer -- confirm the interface list
+ {
+ public void Initialize(object state) { }
+ public int Seed(int x) { return x; }
+ public int Accumulate(int a, int x) { throw new NotImplementedException(); }
+ public int RecursiveAccumulate(int a, int x) { throw new NotImplementedException(); }
+ public int FinalReduce(int a) { throw new NotImplementedException(); }
+ }
+ [Decomposable(typeof(BadDecomposerType4))] // names a decomposer type without a public parameterless constructor
+ private static int BadDecomposable4(IEnumerable g) // Placeholder aggregate for GroupByReduce_APIMisuse; its body always throws NotImplementedException.
+ {
+ throw new NotImplementedException();
+ }
+ public class BadDecomposerType4 : IDecomposable // deliberately lacks a public default constructor
+ {
+ internal BadDecomposerType4() { } // the parameterless constructor exists but is internal
+ public BadDecomposerType4(int x) { } // the only public constructor takes an argument
+ public void Initialize(object state) { }
+ public int Seed(int x) { return x; }
+ public int Accumulate(int a, int x) { throw new NotImplementedException(); }
+ public int RecursiveAccumulate(int a, int x) { throw new NotImplementedException(); }
+ public int FinalReduce(int a) { throw new NotImplementedException(); }
+ }
+ [Decomposable(typeof(BadDecomposerType5))] // used by the "input type doesn't match" case of GroupByReduce_APIMisuse
+ private static int BadDecomposable5(IEnumerable g) // Placeholder aggregate for GroupByReduce_APIMisuse; its body always throws NotImplementedException.
+ {
+ throw new NotImplementedException();
+ }
+ public class BadDecomposerType5 : IDecomposable // deliberate input-type mismatch: Seed and Accumulate take double elements
+ {
+ public void Initialize(object state) { }
+ public int Seed(double s) { throw new NotImplementedException(); }
+ public int Accumulate(int a, double x) { throw new NotImplementedException(); }
+ public int RecursiveAccumulate(int a, int x) { throw new NotImplementedException(); }
+ public int FinalReduce(int a) { throw new NotImplementedException(); }
+ }
+ [Decomposable(typeof(BadDecomposerType6))] // used by the "output type doesn't match" case of GroupByReduce_APIMisuse
+ private static int BadDecomposable6(IEnumerable g) // Placeholder aggregate returning int; its body always throws NotImplementedException.
+ {
+ throw new NotImplementedException();
+ }
+ public class BadDecomposerType6 : IDecomposable // deliberate output-type mismatch: FinalReduce returns double while BadDecomposable6 returns int
+ {
+ public void Initialize(object state) { }
+ public int Seed(int s) { throw new NotImplementedException(); }
+ public int Accumulate(int a, int x) { throw new NotImplementedException(); }
+ public int RecursiveAccumulate(int a, int x) { throw new NotImplementedException(); }
+ public double FinalReduce(int a) { throw new NotImplementedException(); }
+ }
+
+ #endregion API_Misuse
+
+ public static bool GroupByReduce_ListInitializerReducer() // Test: result selector that builds a List via collection-initializer syntax holding key, Count() and Sum(); returns true when both parity groups match.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false; // explicitly disable LocalExecution for this run
+ bool passed = true;
+ try
+ {
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateGroupByReduceDataSet()); // the lambda ignores x: the records come from DataGenerator, not from the stored file
+ IQueryable pt1 = simple.Select(x => x.First());
+
+ var results = pt1.GroupBy(x => x % 2, (k, g) => new List() { k, g.Count(), g.Sum() }).ToArray(); // each list is [key, count, sum]
+
+ // Sort locally on element 0 (the key) so that the groups are in key order.
+ var resultsSorted = results.OrderBy(list => list[0]).ToArray();
+
+ // expected for key 0: count = 6, sum = 42
+ // expected for key 1: count = 6, sum = 36
+ // NOTE(review): GroupByReduce_ResultSelector_ComplexNewExpression expects an average of 100.5 from the same generator call; confirm the data set.
+ passed &= (resultsSorted[0][0] == 0); // "incorrect results.1"
+ passed &= (resultsSorted[0][1] == 6); // "incorrect results.2"
+ passed &= (resultsSorted[0][2] == 42); // "incorrect results.3"
+
+ passed &= (resultsSorted[1][0] == 1); // "incorrect results.4"
+ passed &= (resultsSorted[1][1] == 6); // "incorrect results.5"
+ passed &= (resultsSorted[1][2] == 36); // "incorrect results.6"
+ }
+ catch (DryadLinqException) // a DryadLinqException is reported as a failed test; other exception types propagate to the caller
+ {
+ passed &= false;
+ }
+ return passed;
+ }
+
+ public static bool GroupByReduce_CustomListInitializerReducer() // Test: result selector that fills a MultiParamInitializerClass through a three-argument collection-initializer entry; returns true when both parity groups match.
+ {
+ var context = new DryadLinqContext(Config.cluster);
+ context.LocalExecution = false; // explicitly disable LocalExecution for this run
+ bool passed = true;
+ try
+ {
+ IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName,
+ "unittest/inputdata/SimpleFile.txt"));
+
+ IQueryable> simple = input.Apply(x => DataGenerator.CreateGroupByReduceDataSet()); // the lambda ignores x: the records come from DataGenerator, not from the stored file
+ IQueryable pt1 = simple.Select(x => x.First());
+
+ var results = pt1.GroupBy(x => x % 2, (k, g) => new MultiParamInitializerClass() {
+ {k, g.Count(), g.Sum()} , // one item, comprising three components
+ }).ToArray();
+ // Sort locally on Key so that the groups are in key order.
+ var resultsSorted = results.OrderBy(list => list.Key).ToArray();
+
+ // expected for key 0: count = 6, sum = 42
+ // expected for key 1: count = 6, sum = 36
+ // NOTE(review): GroupByReduce_ResultSelector_ComplexNewExpression expects an average of 100.5 from the same generator call; confirm the data set.
+ passed &= (resultsSorted[0].Key == 0); // "incorrect results.1"
+ passed &= (resultsSorted[0].Count() == 6); // "incorrect results.2"
+ passed &= (resultsSorted[0].Sum() == 42); // "incorrect results.3"
+
+ passed &= (resultsSorted[1].Key == 1); // "incorrect results.4"
+ passed &= (resultsSorted[1].Count() == 6); // "incorrect results.5"
+ passed &= (resultsSorted[1].Sum() == 36); // "incorrect results.6"
+ }
+ catch (DryadLinqException) // a DryadLinqException is reported as a failed test; other exception types propagate to the caller
+ {
+ passed &= false;
+ }
+ return passed;
+ }
+
+ // note: must be IEnumerable<> to be allowed to participate in list-initializer syntax.
+ // we are cheating here and only supporting one "add" call, just as an example.
+ [Serializable]
+ public class MultiParamInitializerClass : IEnumerable