/* Copyright (c) Microsoft Corporation All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR NON-INFRINGEMENT. See the Apache Version 2.0 License for specific language governing permissions and limitations under the License. */

using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Text;
using System.IO;
using System.Linq;
using System.Linq.Expressions;
using System.Xml;
using System.Data.Linq;
using System.Data.Linq.Mapping;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Diagnostics;
using Microsoft.Research.DryadLinq.Internal;
using System.IO.Compression;
using Microsoft.Research.Peloponnese.Azure;

namespace Microsoft.Research.DryadLinq
{
    /// <summary>
    /// Specifies whether to compress the output of a DryadLINQ vertex.
    /// </summary>
    public enum CompressionScheme
    {
        /// <summary>
        /// No compression.
        /// </summary>
        None,

        /// <summary>
        /// Compression using gzip.
        /// </summary>
        Gzip
    }

    /// <summary>
    /// DataProvider provides an abstraction for different data backends. New data storage backends
    /// could be added by subclassing this class.
    /// </summary>
    /// <remarks>
    /// Providers are looked up by URI scheme. The static constructor registers the HDFS,
    /// partitioned-file, WASB and Azure blob providers; more can be added with
    /// <see cref="Register"/>.
    /// </remarks>
    public abstract class DataProvider
    {
        // Maps a URI scheme to the provider that handles datasets with that scheme.
        private static readonly Dictionary<string, DataProvider> s_providers;

        static DataProvider()
        {
            s_providers = new Dictionary<string, DataProvider>();
            s_providers.Add(DataPath.HDFS_URI_SCHEME, new HdfsDataProvider());
            s_providers.Add(DataPath.PARTFILE_URI_SCHEME, new PartitionedFileDataProvider());
            s_providers.Add(DataPath.WASB_URI_SCHEME, new WasbDataProvider());
            s_providers.Add(DataPath.AZUREBLOB_URI_SCHEME, new AzureBlobDataProvider());
        }

        /// <summary>
        /// The scheme of this data provider.
        /// </summary>
        public abstract string Scheme { get; }

        /// <summary>
        /// Gets the metadata of a specified dataset.
        /// </summary>
        /// <param name="context">A DryadLinqContext object.</param>
        /// <param name="dataSetUri">The URI of the dataset.</param>
        /// <returns>The metadata. Returns null if metadata is not present.</returns>
        public abstract DryadLinqMetaData GetMetaData(DryadLinqContext context, Uri dataSetUri);

        /// <summary>
        /// Gets information of a specified dataset.
        /// </summary>
        /// <param name="context">A DryadLinqContext object.</param>
        /// <param name="dataSetUri">The URI of the dataset.</param>
        /// <returns>Information about a dataset.</returns>
        public abstract DryadLinqStreamInfo GetStreamInfo(DryadLinqContext context, Uri dataSetUri);

        /// <summary>
        /// Gets a URI to store a temporary dataset.
        /// </summary>
        /// <param name="context">A DryadLinqContext object.</param>
        /// <param name="path">A local path.</param>
        /// <returns>The URI of a temporary directory.</returns>
        public abstract Uri GetTemporaryStreamUri(DryadLinqContext context, string path);

        /// <summary>
        /// Rewrites the URI of a dataset. Allows DataProvider specific rewriting.
        /// The default implementation returns <paramref name="dataSetUri"/> unchanged.
        /// </summary>
        /// <typeparam name="T">The element type of the specified dataset</typeparam>
        /// <param name="context">The current DryadLinqContext.</param>
        /// <param name="dataSetUri">The URI of the dataset.</param>
        /// <param name="access">The intended access to the dataset.</param>
        /// <returns>The rewritten URI of the dataset.</returns>
        public virtual Uri RewriteUri<T>(DryadLinqContext context, Uri dataSetUri, FileAccess access = FileAccess.Read)
        {
            return dataSetUri;
        }

        /// <summary>
        /// Ingress a .NET collection to a specified store location.
        /// </summary>
        /// <typeparam name="T">The record type of the collection.</typeparam>
        /// <param name="context">An instance of DryadLinqContext.</param>
        /// <param name="source">The collection to be ingressed.</param>
        /// <param name="dataSetUri">The URI to store the collection.</param>
        /// <param name="metaData">The metadata for the collection.</param>
        /// <param name="outputScheme">The compression scheme used to store the collection.</param>
        /// <param name="isTemp">true to only store the collection temporarily with a time lease.</param>
        /// <param name="serializer">
        /// A stream-based serializer. The built-in providers use the DryadLINQ record
        /// writer when this is null.
        /// </param>
        public abstract void Ingress<T>(DryadLinqContext context,
                                        IEnumerable<T> source,
                                        Uri dataSetUri,
                                        DryadLinqMetaData metaData,
                                        CompressionScheme outputScheme,
                                        bool isTemp,
                                        Expression<Action<IEnumerable<T>, Stream>> serializer);

        /// <summary>
        /// Creates an instance of Stream for a dataset at a specified location. This is
        /// used by DryadLINQ to read a .NET collection from a store.
        /// </summary>
        /// <param name="context">An instance of DryadLinqContext.</param>
        /// <param name="dataSetUri">The URI of a dataset.</param>
        /// <returns>An instance of Stream.</returns>
        public abstract Stream Egress(DryadLinqContext context, Uri dataSetUri);

        /// <summary>
        /// Checks the existence of a specified dataset. The built-in providers throw a
        /// <see cref="DryadLinqException"/> when the dataset exists and
        /// <paramref name="deleteIfExists"/> is false, and delete it otherwise.
        /// </summary>
        /// <param name="context">The current DryadLinqContext.</param>
        /// <param name="dataSetUri">The URI of the dataset.</param>
        /// <param name="deleteIfExists">True to delete if the dataset exists.</param>
        public abstract void CheckExistence(DryadLinqContext context, Uri dataSetUri, bool deleteIfExists);

        /// <summary>
        /// The path separator of this data provider.
        /// </summary>
        public virtual char PathSeparator
        {
            get { return '/'; }
        }

        /// <summary>
        /// Registers a new <see cref="DataProvider"/>. This can be used to extend DryadLINQ to
        /// interact with a new kind of data store.
        /// </summary>
        /// <param name="provider">A new DataProvider</param>
        /// <exception cref="ArgumentNullException"><paramref name="provider"/> is null.</exception>
        /// <exception cref="DryadLinqException">A provider for the same scheme is already registered.</exception>
        public static void Register(DataProvider provider)
        {
            if (provider == null)
            {
                throw new ArgumentNullException("provider");
            }

            string scheme = provider.Scheme;
            if (s_providers.ContainsKey(scheme))
            {
                throw new DryadLinqException("Data provider for " + scheme + " has already existed.");
            }
            s_providers[scheme] = provider;
        }

        /// <summary>
        /// Get the data provider associated with a prefix.
        /// </summary>
        /// <param name="scheme">The data provider scheme</param>
        /// <returns>The data provider</returns>
        /// <exception cref="DryadLinqException">No provider is registered for <paramref name="scheme"/>.</exception>
        internal static DataProvider GetDataProvider(string scheme)
        {
            DataProvider provider;
            if (!s_providers.TryGetValue(scheme, out provider))
            {
                throw new DryadLinqException(DryadLinqErrorCode.Internal,
                                             String.Format(SR.UnknownProvider, scheme));
            }
            return provider;
        }

        /// <summary>
        /// Get the dataset specified by a URI.
        /// </summary>
        /// <typeparam name="T">The record type of the dataset.</typeparam>
        /// <param name="context">An instance of <see cref="DryadLinqContext"/></param>
        /// <param name="dataSetUri">The URI of the dataset</param>
        /// <param name="deserializer">A stream-based deserializer</param>
        /// <returns>A query object representing the specified dataset.</returns>
        internal static DryadLinqQuery<T>
            GetPartitionedTable<T>(DryadLinqContext context,
                                   Uri dataSetUri,
                                   Expression<Func<Stream, IEnumerable<T>>> deserializer)
        {
            string scheme = DataPath.GetScheme(dataSetUri);
            DataProvider dataProvider = DataProvider.GetDataProvider(scheme);
            dataSetUri = dataProvider.RewriteUri<T>(context, dataSetUri);
            return new DryadLinqQuery<T>(context, dataProvider, dataSetUri, deserializer);
        }

        /// <summary>
        /// Reads the dataset specified by a URI.
        /// </summary>
        /// <typeparam name="T">The record type of the dataset</typeparam>
        /// <param name="context">An instance of <see cref="DryadLinqContext"/></param>
        /// <param name="dataSetUri">The URI of the dataset</param>
        /// <param name="deserializer">A stream-based deserializer</param>
        /// <returns>A sequence of records as IEnumerable{T}</returns>
        internal static IEnumerable<T> ReadData<T>(DryadLinqContext context,
                                                   Uri dataSetUri,
                                                   Expression<Func<Stream, IEnumerable<T>>> deserializer)
        {
            string scheme = DataPath.GetScheme(dataSetUri);
            DataProvider dataProvider = DataProvider.GetDataProvider(scheme);
            dataSetUri = dataProvider.RewriteUri<T>(context, dataSetUri);
            return new DryadLinqQueryEnumerable<T>(context, dataProvider, dataSetUri, deserializer);
        }

        /// <summary>
        /// Stores an IEnumerable{T} at a specified location.
        /// </summary>
        /// <typeparam name="T">The record type of the data.</typeparam>
        /// <param name="context">An instance of <see cref="DryadLinqContext"/></param>
        /// <param name="source">The data to store.</param>
        /// <param name="dataSetUri">The URI of the store location.</param>
        /// <param name="metaData">The metadata of the data.</param>
        /// <param name="outputScheme">The compression scheme.</param>
        /// <param name="isTemp">true if the data is only stored temporarily.</param>
        /// <param name="serializer">A stream-based serializer</param>
        /// <param name="deserializer">A stream-based deserializer</param>
        /// <returns>An instance of IQueryable{T} for the data.</returns>
        internal static DryadLinqQuery<T> StoreData<T>(DryadLinqContext context,
                                                       IEnumerable<T> source,
                                                       Uri dataSetUri,
                                                       DryadLinqMetaData metaData,
                                                       CompressionScheme outputScheme,
                                                       bool isTemp,
                                                       Expression<Action<IEnumerable<T>, Stream>> serializer,
                                                       Expression<Func<Stream, IEnumerable<T>>> deserializer)
        {
            string scheme = DataPath.GetScheme(dataSetUri);
            DataProvider dataProvider = DataProvider.GetDataProvider(scheme);
            dataSetUri = dataProvider.RewriteUri<T>(context, dataSetUri);
            dataProvider.Ingress(context, source, dataSetUri, metaData, outputScheme, isTemp, serializer);
            DryadLinqQuery<T> res = DataProvider.GetPartitionedTable<T>(context, dataSetUri, deserializer);
            res.CheckAndInitialize();    // must initialize
            return res;
        }
    }

    /// <summary>
    /// Basic information of a dataset.
    /// </summary>
    public class DryadLinqStreamInfo
    {
        /// <summary>
        /// The number of partitions of the dataset. Returns -1 if unknown.
        /// </summary>
        public Int32 PartitionCount { get; private set; }

        /// <summary>
        /// The size in bytes of the dataset. Returns -1 if unknown.
        /// </summary>
        public Int64 DataSize { get; private set; }

        /// <summary>
        /// Initializes an instance of DryadLinqStreamInfo.
        /// </summary>
        /// <param name="parCnt">The number of partitions.</param>
        /// <param name="size">The size in bytes.</param>
        public DryadLinqStreamInfo(Int32 parCnt, Int64 size)
        {
            this.PartitionCount = parCnt;
            this.DataSize = size;
        }
    }

    /// <summary>
    /// Data provider for datasets stored in HDFS, accessed through the context's HDFS client.
    /// </summary>
    internal class HdfsDataProvider : DataProvider
    {
        public override string Scheme
        {
            get { return DataPath.HDFS_URI_SCHEME; }
        }

        public override Uri GetTemporaryStreamUri(DryadLinqContext context, string path)
        {
            return context.Cluster.MakeInternalClusterUri("tmp", DataPath.TEMPORARY_STREAM_NAME_PREFIX, path);
        }

        public override DryadLinqMetaData GetMetaData(DryadLinqContext context, Uri dataSetUri)
        {
            // TBD
            return null;
        }

        /// <summary>
        /// For non-write access to a LineRecord dataset, adds a "seekBoundaries" query
        /// parameter naming the LineRecord type. GetStreamInfo uses this parameter to
        /// decide whether to expand blocks.
        /// </summary>
        public override Uri RewriteUri<T>(DryadLinqContext context, Uri dataSetUri, FileAccess access)
        {
            UriBuilder builder = new UriBuilder(dataSetUri);
            NameValueCollection query = System.Web.HttpUtility.ParseQueryString(builder.Query);

            if (access != FileAccess.Write && typeof(T) == typeof(Microsoft.Research.DryadLinq.LineRecord))
            {
                query["seekBoundaries"] = "Microsoft.Research.DryadLinq.LineRecord";
            }

            builder.Query = query.ToString();
            return builder.Uri;
        }

        public override DryadLinqStreamInfo GetStreamInfo(DryadLinqContext context, Uri dataSetUri)
        {
            Int32 parCnt = 0;
            Int64 size = -1;

            // Same key spelling as RewriteUri writes. ParseQueryString is case-insensitive,
            // so this matches the previous lookup as well.
            NameValueCollection query = System.Web.HttpUtility.ParseQueryString(dataSetUri.Query);
            bool expandBlocks = (query["seekBoundaries"] == "Microsoft.Research.DryadLinq.LineRecord");

            context.GetHdfsClient.GetDirectoryContentSummary(dataSetUri, expandBlocks, ref size, ref parCnt);
            if (parCnt == 0)
            {
                throw new DryadLinqException("Got 0 partition count for " + dataSetUri.AbsoluteUri);
            }
            return new DryadLinqStreamInfo(parCnt, size);
        }

        /// <summary>
        /// Writes <paramref name="source"/> to a single HDFS stream. It uses
        /// <paramref name="serializer"/> when one is supplied, and the DryadLINQ record
        /// writer otherwise. This matches the other built-in providers.
        /// </summary>
        /// <remarks>
        /// NOTE: <paramref name="outputScheme"/>, <paramref name="metaData"/> and
        /// <paramref name="isTemp"/> are not used by this provider.
        /// </remarks>
        public override void Ingress<T>(DryadLinqContext context,
                                        IEnumerable<T> source,
                                        Uri dataSetUri,
                                        DryadLinqMetaData metaData,
                                        CompressionScheme outputScheme,
                                        bool isTemp,
                                        Expression<Action<IEnumerable<T>, Stream>> serializer)
        {
            DryadLinqFactory<T> factory = (DryadLinqFactory<T>)DryadLinqCodeGen.GetFactory(context, typeof(T));
            using (Stream stream = context.GetHdfsClient.GetDfsStreamWriter(dataSetUri))
            {
                if (serializer == null)
                {
                    DryadLinqBlockStream nativeStream = new DryadLinqBlockStream(stream);
                    DryadLinqRecordWriter<T> writer = factory.MakeWriter(nativeStream);
                    foreach (T rec in source)
                    {
                        writer.WriteRecordSync(rec);
                    }
                    writer.Close();
                }
                else
                {
                    // Honor the custom serializer; otherwise a matching custom deserializer
                    // could not read the stored data back.
                    Action<IEnumerable<T>, Stream> serializerFunc = serializer.Compile();
                    serializerFunc(source, stream);
                }
            }
        }

        public override Stream Egress(DryadLinqContext context, Uri dataSetUri)
        {
            return context.GetHdfsClient.GetDfsDirectoryStreamReader(dataSetUri);
        }

        public override void CheckExistence(DryadLinqContext context, Uri dataSetUri, bool deleteIfExists)
        {
            if (context.GetHdfsClient.IsFileExists(dataSetUri))
            {
                if (!deleteIfExists)
                {
                    throw new DryadLinqException("Can't output to existing HDFS collection " + dataSetUri.AbsoluteUri);
                }
                context.GetHdfsClient.DeleteDfsFile(dataSetUri, true);
            }
        }
    }

    /// <summary>
    /// Data provider for "partitioned files". A partfile is a small text file that lists the
    /// physical partitions of a dataset.
    /// </summary>
    /// <remarks>
    /// As written by Ingress and read by GetPartitionPaths, the partfile has this layout:
    /// line 0 is the partition path prefix; line 1 is the partition count; each following
    /// line is "index,size[,location]". The optional location is either a machine name or
    /// "machine:path". A partition's file name is the prefix followed by "." and the
    /// partition's position as 8 hex digits.
    /// </remarks>
    internal class PartitionedFileDataProvider : DataProvider
    {
        public override string Scheme
        {
            get { return DataPath.PARTFILE_URI_SCHEME; }
        }

        public override Uri GetTemporaryStreamUri(DryadLinqContext context, string path)
        {
            string wd = Directory.GetCurrentDirectory();
            path = Path.Combine(Path.GetPathRoot(wd), DataPath.TEMPORARY_STREAM_NAME_PREFIX, path);
            Directory.CreateDirectory(Path.GetDirectoryName(path));
            Uri uri = new Uri(this.Scheme + ":///" + path);
            return uri;
        }

        public override DryadLinqMetaData GetMetaData(DryadLinqContext context, Uri dataSetUri)
        {
            // TBD
            return null;
        }

        /// <summary>
        /// Computes the partition count (from line 1 of the partfile) and the total size
        /// (the sum of the size field of every partition line).
        /// </summary>
        public override DryadLinqStreamInfo GetStreamInfo(DryadLinqContext context, Uri dataSetUri)
        {
            string[] lines = ReadPartitionFile(dataSetUri);
            Int32 parCnt = int.Parse(lines[1].Trim());
            Int64 size = 0;
            for (int i = 2; i < lines.Length; i++)
            {
                string[] fields = lines[i].Split(',');
                if (fields.Length < 2)
                {
                    throw new DryadLinqException("The partition file " + dataSetUri + " is malformed.");
                }
                size += Int64.Parse(fields[1]);
            }
            return new DryadLinqStreamInfo(parCnt, size);
        }

        /// <summary>
        /// Writes <paramref name="source"/> as a single partition in a new, uniquely named
        /// directory next to the partfile, then writes the partfile that points at it.
        /// </summary>
        public override void Ingress<T>(DryadLinqContext context,
                                        IEnumerable<T> source,
                                        Uri dataSetUri,
                                        DryadLinqMetaData metaData,
                                        CompressionScheme compressionScheme,
                                        bool isTemp,
                                        Expression<Action<IEnumerable<T>, Stream>> serializer)
        {
            string fileName = GetPartitionFilePath(dataSetUri);

            // Write the partition:
            string partDir = Path.GetDirectoryName(fileName);
            partDir = Path.Combine(partDir, DryadLinqUtil.MakeUniqueName());
            Directory.CreateDirectory(partDir);
            string uncPath = Path.Combine(partDir, "Part");
            string partitionPath = uncPath + ".00000000";
            DryadLinqFactory<T> factory = (DryadLinqFactory<T>)DryadLinqCodeGen.GetFactory(context, typeof(T));
            using (FileStream fstream = new FileStream(partitionPath, FileMode.CreateNew, FileAccess.Write))
            {
                if (serializer == null)
                {
                    DryadLinqFileBlockStream nativeStream = new DryadLinqFileBlockStream(fstream, compressionScheme);
                    DryadLinqRecordWriter<T> writer = factory.MakeWriter(nativeStream);
                    foreach (T rec in source)
                    {
                        writer.WriteRecordSync(rec);
                    }
                    writer.Close();
                }
                else
                {
                    Action<IEnumerable<T>, Stream> serializerFunc = serializer.Compile();
                    serializerFunc(source, fstream);
                }
            }

            // Write the partfile:
            long partSize = new FileInfo(partitionPath).Length;
            using (StreamWriter writer = File.CreateText(fileName))
            {
                writer.WriteLine(uncPath);
                writer.WriteLine("1");
                writer.WriteLine("{0},{1}", 0, partSize);
            }
        }

        public override Stream Egress(DryadLinqContext context, Uri dataSetUri)
        {
            string[] lines = ReadPartitionFile(dataSetUri);
            string[] filePathArray = GetPartitionPaths(lines);
            return new DryadLinqMultiFileStream(filePathArray, CompressionScheme.None);
        }

        /// <summary>
        /// If the partfile exists, either throws (when <paramref name="deleteIfExists"/> is
        /// false) or deletes the partfile and, best effort, every partition it lists.
        /// </summary>
        public override void CheckExistence(DryadLinqContext context, Uri dataSetUri, bool deleteIfExists)
        {
            string fileName = GetPartitionFilePath(dataSetUri);
            if (!File.Exists(fileName))
            {
                return;
            }
            if (!deleteIfExists)
            {
                throw new DryadLinqException("Can't output to existing Partitioned File collection " + dataSetUri.AbsoluteUri);
            }

            // Note: We delete all the partitions! This is best effort. A malformed partfile,
            // or one partition that cannot be deleted, must not stop us from deleting the
            // remaining partitions or the partfile itself.
            string[] lines = File.ReadAllLines(fileName);
            string[] partitionPaths;
            try
            {
                partitionPaths = GetPartitionPaths(lines);
            }
            catch (Exception)
            {
                partitionPaths = new string[0];
            }

            foreach (string path in partitionPaths)
            {
                try
                {
                    if (File.Exists(path))
                    {
                        File.Delete(path);
                    }
                }
                catch (Exception)
                {
                    /* skip this partition and keep deleting the others */
                }
            }
            File.Delete(fileName);
        }

        /// <summary>
        /// Maps a partfile URI to a file-system path. A URI with a host is turned into a UNC
        /// path (\\host\path); otherwise the URI's local path is used.
        /// </summary>
        private static string GetPartitionFilePath(Uri dataSetUri)
        {
            string fileName = dataSetUri.LocalPath;
            if (!String.IsNullOrEmpty(dataSetUri.Host))
            {
                fileName = @"\\" + dataSetUri.Host + fileName;
            }
            return fileName;
        }

        /// <summary>
        /// Reads all lines of the partfile for <paramref name="dataSetUri"/>. Throws a
        /// <see cref="DryadLinqException"/> when the file has fewer than the three lines
        /// required (prefix, count, at least one partition).
        /// </summary>
        private static string[] ReadPartitionFile(Uri dataSetUri)
        {
            string[] lines = File.ReadAllLines(GetPartitionFilePath(dataSetUri));
            if (lines.Length < 3)
            {
                throw new DryadLinqException("The partition file " + dataSetUri + " is malformed.");
            }
            return lines;
        }

        /// <summary>
        /// Resolves the physical path of every partition listed in a partfile's lines.
        /// A "machine:path" location gives \\machine\path. A rooted prefix gives
        /// prefix.XXXXXXXX. Otherwise the location is used as a machine name:
        /// \\machine\prefix.XXXXXXXX.
        /// </summary>
        private static string[] GetPartitionPaths(string[] lines)
        {
            string[] filePathArray = new string[lines.Length - 2];
            for (int i = 2; i < lines.Length; i++)
            {
                int idx = i - 2;
                string[] fields = lines[i].Split(',');
                if (fields.Length > 2 && fields[2].Contains(':'))
                {
                    string[] parts = fields[2].Split(':');
                    filePathArray[idx] = String.Format(@"\\{0}\{1}", parts[0], parts[1]);
                }
                else if (Path.IsPathRooted(lines[0]))
                {
                    filePathArray[idx] = String.Format("{0}.{1:X8}", lines[0], idx);
                }
                else if (fields.Length > 2)
                {
                    filePathArray[idx] = String.Format(@"\\{0}\{1}.{2:X8}", fields[2], lines[0], idx);
                }
                else
                {
                    // A relative prefix needs a machine name to resolve against.
                    throw new DryadLinqException("The partition file entry \"" + lines[i] + "\" is malformed.");
                }
            }
            return filePathArray;
        }
    }

    /// <summary>
    /// Data provider for the WASB scheme. It reuses the HDFS implementation.
    /// </summary>
    internal class WasbDataProvider : HdfsDataProvider
    {
        public override string Scheme
        {
            get { return DataPath.WASB_URI_SCHEME; }
        }
    }

    /// <summary>
    /// Data provider for datasets stored as Azure blob collections.
    /// </summary>
    internal class AzureBlobDataProvider : DataProvider
    {
        public override string Scheme
        {
            get { return DataPath.AZUREBLOB_URI_SCHEME; }
        }

        public override Uri GetTemporaryStreamUri(DryadLinqContext context, string path)
        {
            return context.Cluster.MakeInternalClusterUri(DataPath.TEMPORARY_STREAM_NAME_PREFIX, path);
        }

        /// <summary>
        /// Makes these changes to the URI's query:
        /// - adds the account key from the context when the URI carries none;
        /// - for non-write access to a LineRecord dataset, adds the "seekBoundaries" parameter.
        /// </summary>
        public override Uri RewriteUri<T>(DryadLinqContext context, Uri dataSetUri, FileAccess access)
        {
            string account, key, container, blob;
            Microsoft.Research.Peloponnese.Azure.Utils.FromAzureUri(dataSetUri, out account, out key, out container, out blob);

            UriBuilder builder = new UriBuilder(dataSetUri);
            NameValueCollection query = System.Web.HttpUtility.ParseQueryString(builder.Query);

            if (key == null)
            {
                query["key"] = context.AzureAccountKey(account);
            }

            if (access != FileAccess.Write && typeof(T) == typeof(Microsoft.Research.DryadLinq.LineRecord))
            {
                query["seekBoundaries"] = "Microsoft.Research.DryadLinq.LineRecord";
            }

            builder.Query = query.ToString();
            return builder.Uri;
        }

        public override DryadLinqMetaData GetMetaData(DryadLinqContext context, Uri dataSetUri)
        {
            // TBD
            return null;
        }

        public override DryadLinqStreamInfo GetStreamInfo(DryadLinqContext context, Uri dataSetUri)
        {
            Int32 parCnt = 1;
            Int64 size = -1;
            try
            {
                AzureCollectionPartition partition = new AzureCollectionPartition(dataSetUri);
                if (!partition.IsCollectionExists())
                {
                    throw new DryadLinqException("Input collection " + dataSetUri + " does not exist");
                }
                parCnt = partition.GetPartition().Count();
                size = partition.TotalLength;
            }
            catch (DryadLinqException)
            {
                // Already descriptive (e.g. "does not exist"); don't bury it under a generic wrapper.
                throw;
            }
            catch (Exception e)
            {
                throw new DryadLinqException("Can't get Azure stream info for " + dataSetUri, e);
            }
            return new DryadLinqStreamInfo(parCnt, size);
        }

        /// <summary>
        /// Writes <paramref name="source"/> to a single Azure blob stream. It uses
        /// <paramref name="serializer"/> when one is supplied, and the DryadLINQ record
        /// writer otherwise. Compression is not supported and throws a DryadLinqException.
        /// </summary>
        public override void Ingress<T>(DryadLinqContext context,
                                        IEnumerable<T> source,
                                        Uri dataSetUri,
                                        DryadLinqMetaData metaData,
                                        CompressionScheme compressionScheme,
                                        bool isTemp,
                                        Expression<Action<IEnumerable<T>, Stream>> serializer)
        {
            string account, key, container, blob;
            Utils.FromAzureUri(dataSetUri, out account, out key, out container, out blob);
            if (compressionScheme != CompressionScheme.None)
            {
                throw new DryadLinqException("Not implemented: writing to Azure temporary storage with compression enabled");
            }
            AzureDfsClient client = new AzureDfsClient(account, key, container);
            DryadLinqFactory<T> factory = (DryadLinqFactory<T>)DryadLinqCodeGen.GetFactory(context, typeof(T));
            // NOTE(review): this blocks on an async call because Ingress is synchronous.
            using (Stream stream = client.GetDfsStreamWriterAsync(dataSetUri).Result)
            {
                if (serializer == null)
                {
                    DryadLinqBlockStream nativeStream = new DryadLinqBlockStream(stream);
                    DryadLinqRecordWriter<T> writer = factory.MakeWriter(nativeStream);
                    foreach (T rec in source)
                    {
                        writer.WriteRecordSync(rec);
                    }
                    writer.Close();
                }
                else
                {
                    Action<IEnumerable<T>, Stream> serializerFunc = serializer.Compile();
                    serializerFunc(source, stream);
                }
            }
        }

        public override Stream Egress(DryadLinqContext context, Uri dataSetUri)
        {
            try
            {
                AzureCollectionPartition partition = new AzureCollectionPartition(dataSetUri);
                if (!partition.IsCollectionExists())
                {
                    throw new DryadLinqException("Input collection " + dataSetUri + " does not exist");
                }
                Stream dataSetStream = partition.GetReadStream();
                return dataSetStream;
            }
            catch (DryadLinqException)
            {
                // Already descriptive (e.g. "does not exist"); don't bury it under a generic wrapper.
                throw;
            }
            catch (Exception e)
            {
                throw new DryadLinqException("Can't get Azure stream info for " + dataSetUri, e);
            }
        }

        public override void CheckExistence(DryadLinqContext context, Uri dataSetUri, bool deleteIfExists)
        {
            AzureCollectionPartition partition = new AzureCollectionPartition(dataSetUri);
            if (partition.IsCollectionExists())
            {
                if (!deleteIfExists)
                {
                    throw new DryadLinqException("Can't output to existing Azure Blob collection " + dataSetUri.AbsoluteUri);
                }
                partition.DeleteCollection();
            }
        }
    }
}