/* Copyright (c) Microsoft Corporation All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR NON-INFRINGEMENT. See the Apache Version 2.0 License for specific language governing permissions and limitations under the License. */ using Microsoft.Research.DryadLinq; using Microsoft.Research.DryadLinq.Internal; using Microsoft.Research.Peloponnese.Storage; using Microsoft.WindowsAzure.Storage; using Microsoft.WindowsAzure.Storage.Blob; using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Reflection; using System.Threading.Tasks; namespace DryadLinqTests { public class Config { public static string accountName = @""; public static string storageKey = @""; public static string containerName = @""; public static string cluster = @""; public static string testLogPath = @""; public Config(string clusterName, string container, string logPath) { cluster = clusterName; AzureSubscriptions azs = new AzureSubscriptions(); Task clusterTask = azs.GetClusterAsync(clusterName); clusterTask.Wait(); accountName = clusterTask.Result.StorageAccount; storageKey = clusterTask.Result.StorageKey; containerName = container; testLogPath = logPath; } } public class DataGenerator { public DataGenerator() { } public static IQueryable CreateSimpleFileSetsEx() { int[] input = { 0, 1, 2 }; IEnumerable range = input.Apply(x => Enumerable.Range(0, 3)); // {0, 1, 2} IEnumerable partitions = range.HashPartition(x => x, 3); // create 3 partitions IEnumerable rangePartition = partitions.SelectMany(x => Enumerable.Range(x * 4, 4)); return rangePartition.AsQueryable(); } public static IEnumerable> CreateSimpleFileSets() { IEnumerable> data = new int[][] { new[] { 1, 2, 3, 4 }, new[] { 5, 6, 7, 8 }, new[] { 9, 10, 11, 12 }, }; return data; } public static IEnumerable> CreateGroupByReduceDataSet() { // we need quite a few elements to ensure the combiner will be activated in Stage#1 groupBy. // 33 elements per partition should suffice, but 100 per partition is safer. IEnumerable> data = new int[][] { Enumerable.Range(1,100).ToArray(), Enumerable.Range(101,100).ToArray(), }; return data; } public static IEnumerable> CreateRangePartitionDataSet() { // we need a lot of data to ensure sampler will get some data. // A few thousand should suffice. IEnumerable> data = new int[][] { Enumerable.Range(1,1000).ToArray(), Enumerable.Range(20000,2000).ToArray(), Enumerable.Range(40000,5000).ToArray(), }; return data; } public static IQueryable GetSimpleFileSets(DryadLinqContext context) { //IEnumerable> data = new int[][] // { // new[] { 0, 1, 2, 3 }, // new[] { 4, 5, 6, 7 }, // new[] { 8, 9, 10, 11}, // }; //IQueryable input = context.FromStore(AzureUtils.ToAzureUri(Config.accountName, Config.containerName, // "unittest/inputdata/SimpleFile.txt")); IQueryable input = context.FromEnumerable(new int[1]); IQueryable range = input.Apply(x => Enumerable.Range(0, 3)); // {0, 1, 2} IQueryable partitions = range.HashPartition(x => x, 3); // create 3 partitions IQueryable rangePartition = partitions.SelectMany(x => Enumerable.Range(x * 4, 4)); //IQueryable store = rangePartition.ToStore(@"unittest/inputdata/SimpleFile.txt"); return rangePartition; } public static IQueryable GetGroupByReduceDataSet(DryadLinqContext context) { //IEnumerable> data = new int[][] { // Enumerable.Range(1,100).ToArray(), // Enumerable.Range(101,100).ToArray(), // }; IQueryable input = context.FromEnumerable(new int[1]); IQueryable range = input.Apply(x => Enumerable.Range(0, 2)); // {0, 1} IQueryable partitions = range.HashPartition(x => x, 2); // create 2 partitions IQueryable rangePartition = partitions.SelectMany(x => Enumerable.Range(x * 100 + 1, 100)); return rangePartition; } public static IQueryable GetRangePartitionDataSet(DryadLinqContext context) { // we need a lot of data to ensure sampler will get some data. // A few thousand should suffice. //IEnumerable> data = new int[][] { // Enumerable.Range(1,1000).ToArray(), // Enumerable.Range(20000,2000).ToArray(), // Enumerable.Range(40000,5000).ToArray(), // }; IQueryable input = context.FromEnumerable(new int[1]); IQueryable range = input.Apply(x => Enumerable.Range(0, 3)); // {0, 1, 2} IQueryable partitions = range.HashPartition(x => x, 3); // create 3 partitions IQueryable rangePartition = partitions.SelectMany(x => Enumerable.Range(x * 20000 + 1, 1000)); return rangePartition; } } [Serializable] public class ReverseComparer : IComparer { IComparer _originalComparer; public ReverseComparer(IComparer originalComparer) { _originalComparer = originalComparer ?? Comparer.Default; } public int Compare(T x, T y) { return (_originalComparer.Compare(y, x)); //note reversed order of operands } public override bool Equals(object obj) { ReverseComparer objTyped = obj as ReverseComparer; return objTyped != null && _originalComparer.Equals(objTyped._originalComparer); } public override int GetHashCode() { // Modify the hash code so that it differs from the hash code for the underlying comparer. // It would also probably be good enough to just return _originalComparer.GetHashCode(). return unchecked((_originalComparer.GetHashCode() + 123457) * 10007); } } public class Utils { public static bool DeleteFile(string accountName, string accountKey, string containerName, string fileName, bool delSubDirs) { try { CloudStorageAccount storageAccount = CloudStorageAccount.Parse("DefaultEndpointsProtocol=http;AccountName=" + accountName + ";AccountKey=" + accountKey); CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient(); CloudBlobContainer container = blobClient.GetContainerReference(containerName); container.CreateIfNotExists(); BlobContainerPermissions containerPermissions = new BlobContainerPermissions(); containerPermissions.PublicAccess = BlobContainerPublicAccessType.Blob; container.SetPermissions(containerPermissions); if (false == delSubDirs) { CloudBlockBlob remoteFile = container.GetBlockBlobReference(fileName); remoteFile.DeleteIfExists(); } if (true == delSubDirs) { foreach (IListBlobItem item in container.ListBlobs(fileName, true)) { CloudBlockBlob blob = (CloudBlockBlob)item; blob.DeleteIfExists(); } } } catch (Exception) { return false; } return true; } public static bool FileExists(string accountName, string accountKey, string containerName, string fileName) { try { CloudStorageAccount storageAccount = CloudStorageAccount.Parse("DefaultEndpointsProtocol=http;AccountName=" + accountName + ";AccountKey=" + accountKey); CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient(); CloudBlobContainer container = blobClient.GetContainerReference(containerName); container.CreateIfNotExists(); BlobContainerPermissions containerPermissions = new BlobContainerPermissions(); containerPermissions.PublicAccess = BlobContainerPublicAccessType.Blob; container.SetPermissions(containerPermissions); IEnumerable files = container.ListBlobs(fileName, true); return (files.Count() > 0); } catch (Exception) { return false; } } internal static DryadLinqContext MakeBasicConfig(string cluster) //??? { var context = new DryadLinqContext(cluster); try { context.JobFriendlyName = "DryadLinq_DevUnitTests"; context.CompileForVertexDebugging = true; context.JobEnvironmentVariables.Add("DummyEnvVar", "hello"); //note: this is consumed by a unit-test. if (File.Exists("Microsoft.Hpc.Linq.pdb")) // TODO: fix references { context.ResourcesToAdd.Add("Microsoft.Hpc.Linq.pdb"); } if (File.Exists("Microsoft.Hpc.Dsc.Client.pdb")) // TODO: fix references { context.ResourcesToAdd.Add("Microsoft.Hpc.Dsc.Client.pdb"); } // To prevent job from running forever, and blocking other test context.JobRuntimeLimit = (int)TimeSpan.FromMinutes(30).TotalSeconds; //config.AllowConcurrentUserDelegatesInSingleProcess = false; // If we are on Azure, we have to set the nodeGroup to "NodeRole" so that the default of "ComputeNodes" is not used // This fixes "FromEnumerableTests" on Azure which queries the active node-group. // Note also, the headnode for an azure deployment defaults to "HPCCluster" (at least from James' script) int onAzureInt = 0; string onAzureString = Environment.GetEnvironmentVariable("CCP_SCHEDULERONAZURE"); if (onAzureString != null) { int.TryParse(onAzureString, out onAzureInt); } if (onAzureInt == 1) { context.NodeGroup = "NodeRole"; } } catch (DryadLinqException) { } return context; } internal static DryadLinqRecordReader MakeDryadRecordReader(DryadLinqContext context, string readPath) { DryadLinqFactory factory = (DryadLinqFactory)DryadLinqCodeGen.GetFactory(context, typeof(TRecord)); NativeBlockStream nativeStream = ReflectionHelper.CreateDryadLinqFileStream(readPath, FileMode.Open, FileAccess.Read); // ??? NativeBlockStream nativeStream = ReflectionHelper.CreateDryadLinqFileStream(readPath, FileMode.Open, FileAccess.Read, DscCompressionScheme.None); DryadLinqRecordReader reader = factory.MakeReader(nativeStream); return reader; } } public class Validate { public static void Check( IEnumerable[] ss, IComparer comparer = null, bool sort = true, bool verbose = false, IComparer sortcomparer = null ) { if (ss.Length == 0) return; if (comparer == null) { comparer = Comparer.Default; if (comparer == null) { throw new ArgumentNullException("Can't not be null."); } } if (sortcomparer == null) sortcomparer = comparer; T[][] aa = new T[ss.Length][]; for (int i = 0; i < aa.Length; i++) { aa[i] = ss[i].ToArray(); if (sort) Array.Sort(aa[i], sortcomparer); } int len = aa[0].Length; for (int i = 1; i < aa.Length; i++) { if (aa[i].Length != len) { throw new Exception("Wrong number of elements."); } } for (int i = 0; i < len; i++) { T elem = aa[0][i]; for (int j = 1; j < aa.Length; j++) { if (verbose) { //TestOutput.WriteLine("Comparing {0} to {1}", elem.ToString(), aa[j][i].ToString()); } if (comparer.Compare(elem, aa[j][i]) != 0) { throw new Exception("Elements failed to match: " + elem + " != " + aa[j][i]); } } } } internal static bool outFileExists(string outFile) { try { return Utils.FileExists(Config.accountName, Config.storageKey, Config.containerName, outFile); } catch (Exception) { return false; } } } public static class ReflectionHelper { /// /// Create DryadLinqFileStream object via reflection /// /// /// public static NativeBlockStream CreateDryadLinqFileStream(params object[] parameters) { return Assembly.LoadWithPartialName("Microsoft.Hpc.Linq").GetType("Microsoft.Hpc.Linq.Internal.HpcLinqFileStream") //??? .GetConstructor(BindingFlags.NonPublic | BindingFlags.Instance, null, parameters.Select(p => p.GetType()).ToArray(), null) .Invoke(parameters) as NativeBlockStream; } private static Type s_errorCodeType = null; public static int GetDryadLinqErrorCode(string name) { if (s_errorCodeType == null) { Assembly asm = Assembly.Load("Microsoft.Research.DryadLinq"); Type[] types = asm.GetTypes(); foreach (var t in types) { if (t.Name == "DryadLinqErrorCode") { s_errorCodeType = t; break; } } } var finfo = s_errorCodeType.GetField(name); return (int)finfo.GetValue(null); } } }