/*
Copyright (c) Microsoft Corporation

All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0

THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER
EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR CONDITIONS OF TITLE,
FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR NON-INFRINGEMENT.

See the Apache Version 2.0 License for specific language governing permissions and
limitations under the License.
*/

using System;
using System.Collections;
using System.Collections.Generic;
using System.IO;
using System.Globalization;
using System.Reflection;
using System.Linq.Expressions;
using System.Linq;
using Microsoft.Research.DryadLinq.Internal;

namespace Microsoft.Research.DryadLinq
{
    /// <summary>
    /// Represents a key/value pair. Very similar to KeyValuePair, but adds a few more methods.
    /// </summary>
    /// <typeparam name="T1">The type of the key</typeparam>
    /// <typeparam name="T2">The type of the value</typeparam>
    [Serializable]
    public struct Pair<T1, T2> : IEquatable<Pair<T1, T2>>
    {
        private T1 m_key;
        private T2 m_value;

        /// <summary>
        /// Initializes an instance of this key-value Pair structure.
        /// </summary>
        /// <param name="x">The key of the pair.</param>
        /// <param name="y">The value of the pair.</param>
        [FieldMapping("x", "Key")]
        [FieldMapping("y", "Value")]
        public Pair(T1 x, T2 y)
        {
            this.m_key = x;
            this.m_value = y;
        }

        /// <summary>
        /// Gets the key in the key-value pair.
        /// </summary>
        public T1 Key
        {
            get { return this.m_key; }
        }

        /// <summary>
        /// Gets the value in the key-value pair.
        /// </summary>
        public T2 Value
        {
            get { return this.m_value; }
        }

        /// <summary>
        /// Indicates whether this instance and a specified object are equal.
        /// </summary>
        /// <param name="obj">The object to compare with</param>
        /// <returns>true iff this instance is equal to a specified object</returns>
        public override bool Equals(Object obj)
        {
            if (!(obj is Pair<T1, T2>))
            {
                return false;
            }
            return this.Equals((Pair<T1, T2>)obj);
        }

        /// <summary>
        /// Determines whether this instance and another Pair are equal.
        /// Two null components are considered equal to each other.
        /// </summary>
        /// <param name="val">The other Pair to compare with</param>
        /// <returns>true iff this instance and the specified Pair are equal</returns>
        public bool Equals(Pair<T1, T2> val)
        {
            return ComponentEquals(this.m_key, val.m_key)
                && ComponentEquals(this.m_value, val.m_value);
        }

        /// <summary>
        /// Determines whether two specified Pairs are equal.
        /// </summary>
        /// <param name="a">The first Pair</param>
        /// <param name="b">The second Pair</param>
        /// <returns>true iff two Pairs are equal</returns>
        public static bool Equals(Pair<T1, T2> a, Pair<T1, T2> b)
        {
            return a.Equals(b);
        }

        /// <summary>
        /// Determines whether two specified Pairs are equal.
        /// </summary>
        /// <param name="a">The left Pair</param>
        /// <param name="b">The right Pair</param>
        /// <returns>true iff two Pairs are equal</returns>
        public static bool operator ==(Pair<T1, T2> a, Pair<T1, T2> b)
        {
            return a.Equals(b);
        }

        /// <summary>
        /// Determines whether two specified Pairs are not equal.
        /// </summary>
        /// <param name="a">The left Pair</param>
        /// <param name="b">The right Pair</param>
        /// <returns>true iff two Pairs are not equal</returns>
        public static bool operator !=(Pair<T1, T2> a, Pair<T1, T2> b)
        {
            return !a.Equals(b);
        }

        /// <summary>
        /// Returns the hash code of the current Pair. A null component contributes 0.
        /// </summary>
        /// <returns>A 32-bit signed integer.</returns>
        public override int GetHashCode()
        {
            // The multiplication is meant to wrap around; 'unchecked' keeps it from
            // throwing OverflowException when the assembly is compiled with /checked.
            unchecked
            {
                return (-1521134295 * ComponentHash(this.m_key)) + ComponentHash(this.m_value);
            }
        }

        /// <summary>
        /// Returns a string that represents the current Pair.
        /// </summary>
        /// <returns>A string that represents the current Pair.</returns>
        public override string ToString()
        {
            return "<" + this.Key + ", " + this.Value + ">";
        }

        // Compares two components. Non-null components are compared with the same
        // Object.Equals call as before; the null check only prevents a
        // NullReferenceException when the left component is a null reference.
        private static bool ComponentEquals<T>(T left, T right)
        {
            if (left == null)
            {
                return right == null;
            }
            return left.Equals(right);
        }

        // Hashes one component. Non-null components hash exactly as before, so the
        // hash code of a Pair without null components is unchanged.
        private static int ComponentHash<T>(T item)
        {
            return (item == null) ? 0 : item.GetHashCode();
        }
    }

    /// <summary>
    /// Defines some useful operators that are commonly used in applications. The
    /// operators are defined using the basic DryadLINQ operators. This class
    /// also shows how a user library can be defined.
    /// </summary>
    public static class DryadLinqExtension
    {
        /// <summary>
        /// The standard MapReduce.
        /// </summary>
        /// <typeparam name="TSource">The type of the records of input dataset</typeparam>
        /// <typeparam name="TMap">The type of the resulting records of mapper</typeparam>
        /// <typeparam name="TKey">The type of the keys for hash exchange</typeparam>
        /// <typeparam name="TResult">The type of the resulting records of reducer</typeparam>
        /// <param name="source">The input dataset</param>
        /// <param name="mapper">The map function</param>
        /// <param name="keySelector">The key extraction function</param>
        /// <param name="reducer">The reduce function</param>
        /// <returns>The result dataset of MapReduce</returns>
        public static IQueryable<TResult>
            MapReduce<TSource, TMap, TKey, TResult>(
                this IQueryable<TSource> source,
                Expression<Func<TSource, IEnumerable<TMap>>> mapper,
                Expression<Func<TMap, TKey>> keySelector,
                Expression<Func<TKey, IEnumerable<TMap>, TResult>> reducer)
        {
            return source.SelectMany(mapper).GroupBy(keySelector, reducer);
        }

        /// <summary>
        /// Compute the cross product of two datasets. The function procFunc is applied to each
        /// pair of the cross product to form the output dataset.
        /// </summary>
        /// <typeparam name="T1">The type of the records of dataset source1</typeparam>
        /// <typeparam name="T2">The type of the records of dataset source2</typeparam>
        /// <typeparam name="T3">The type of the records of the result dataset</typeparam>
        /// <param name="source1">The first input dataset</param>
        /// <param name="source2">The second input dataset</param>
        /// <param name="procFunc">The function to apply to each pair of the cross product</param>
        /// <returns>The output dataset</returns>
        public static IQueryable<T3>
            CrossProduct<T1, T2, T3>(this IQueryable<T1> source1,
                                     IQueryable<T2> source2,
                                     Expression<Func<T1, T2, T3>> procFunc)
        {
            return source1.ApplyPerPartition(
                       source2,
                       (x_1, y_1) => DryadLinqHelper.Cross(x_1, y_1, procFunc),
                       true);
        }

        /// <summary>
        /// Conditional DoWhile loop. Each round applies <paramref name="body"/>
        /// <paramref name="count"/> times before the termination condition is evaluated.
        /// </summary>
        /// <typeparam name="T">The type of the input records</typeparam>
        /// <param name="source">The input dataset</param>
        /// <param name="body">The code body of the DoWhile loop</param>
        /// <param name="cond">The termination condition of the DoWhile loop</param>
        /// <param name="count">The loop unrolling count</param>
        /// <returns>The output dataset; <paramref name="source"/> itself when
        /// <paramref name="count"/> is 0</returns>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="count"/> is negative</exception>
        public static IQueryable<T>
            DoWhile<T>(this IQueryable<T> source,
                       Func<IQueryable<T>, IQueryable<T>> body,
                       Func<IQueryable<T>, IQueryable<T>, IQueryable<bool>> cond,
                       Int32 count)
        {
            if (count < 0)
            {
                throw new ArgumentOutOfRangeException("count");
            }
            if (count == 0)
            {
                return source;
            }

            IQueryable<T> before = source;
            while (true)
            {
                // Unroll the body 'count' times before checking the condition.
                IQueryable<T> after = before;
                for (int i = 0; i < count; i++)
                {
                    after = body(after);
                }

                var more = cond(before, after);
                DryadLinqQueryable.SubmitAndWait(after, more);
                if (!more.Single())
                {
                    return after;
                }
                before = after;
            }
        }

        /// <summary>
        /// Conditional DoWhile loop.
        /// </summary>
        /// <typeparam name="T">The type of the input records</typeparam>
        /// <param name="source">The input dataset</param>
        /// <param name="body">The code body of the DoWhile loop</param>
        /// <param name="cond">The termination condition of the DoWhile loop</param>
        /// <returns>The output dataset</returns>
        public static IQueryable<T>
            DoWhile<T>(this IQueryable<T> source,
                       Func<IQueryable<T>, IQueryable<T>> body,
                       Func<IQueryable<T>, IQueryable<T>, IQueryable<bool>> cond)
        {
            IQueryable<T> before = source;
            while (true)
            {
                IQueryable<T> after = body(before);
                var more = cond(before, after);
                DryadLinqQueryable.SubmitAndWait(after, more);
                if (!more.Single())
                {
                    return after;
                }
                before = after;
            }
        }

        /// <summary>
        /// Broadcast a dataset to multiple partitions
        /// </summary>
        /// <typeparam name="T">The record type of the source</typeparam>
        /// <typeparam name="T1">The record type of the destination</typeparam>
        /// <param name="source">The source dataset to broadcast</param>
        /// <param name="destination">The destination dataset to receive</param>
        /// <returns>The output dataset, which consists of multiple copies of source. The number
        /// of copies is the number of partitions of destination.</returns>
        public static IQueryable<T>
            BroadCast<T, T1>(this IQueryable<T> source, IQueryable<T1> destination)
        {
            return destination.ApplyPerPartition(source, (x, y) => y, true);
        }

        /// <summary>
        /// Broadcast a dataset to n partitions.
        /// </summary>
        /// <typeparam name="T">The record type of the source</typeparam>
        /// <param name="source">The source dataset to broadcast</param>
        /// <param name="bcnt">The number of copies to broadcast</param>
        /// <returns>The output dataset, each partition of which is a copy of source</returns>
        public static IQueryable<T> BroadCast<T>(this IQueryable<T> source, int bcnt)
        {
            // Build a placeholder dataset hash-partitioned bcnt ways; it only serves
            // as the partitioned left-hand side that 'source' is paired with below.
            var dummy = source.ApplyPerPartition(x => DryadLinqHelper.ValueZero(x))
                              .HashPartition(x => x, bcnt);
            return dummy.ApplyPerPartition(source, (x, y) => y, true);
        }

        /// <summary>
        /// Check if each partition of the input dataset is ordered.
        /// </summary>
        /// <typeparam name="TSource">The type of the records of the input dataset</typeparam>
        /// <typeparam name="TKey">The type of the keys on which ordering is based</typeparam>
        /// <param name="source">The input dataset</param>
        /// <param name="keySelector">The key extraction function</param>
        /// <param name="comparer">A Comparer on TKey to compare records</param>
        /// <param name="isDescending">True if the check is for descending</param>
        /// <returns>The same dataset as the input</returns>
        public static IQueryable<TSource>
            CheckOrderBy<TSource, TKey>(this IQueryable<TSource> source,
                                        Expression<Func<TSource, TKey>> keySelector,
                                        IComparer<TKey> comparer,
                                        bool isDescending)
        {
            return source.ApplyPerPartition(
                       x_1 => DryadLinqHelper.CheckSort(x_1, keySelector, comparer, isDescending));
        }
    }
}