/* Copyright (c) Microsoft Corporation All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR NON-INFRINGEMENT. See the Apache Version 2.0 License for specific language governing permissions and limitations under the License. */

using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Globalization;
using System.Text;
using System.IO;
using System.Linq;
using System.Linq.Expressions;
using System.Xml;
using System.Data.Linq;
using System.Data.Linq.Mapping;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Diagnostics;
using Microsoft.Research.DryadLinq.Internal;
using System.IO.Compression;
using Microsoft.Research.Peloponnese.Storage;

namespace Microsoft.Research.DryadLinq
{
    /// <summary>
    /// Specifies whether to compress the output of a DryadLINQ vertex.
    /// </summary>
    public enum CompressionScheme
    {
        /// <summary>
        /// No compression.
        /// </summary>
        None,

        /// <summary>
        /// Compression using gzip.
        /// </summary>
        Gzip
    }

    /// <summary>
    /// DataProvider provides an abstraction for different data backends. New data storage
    /// backends can be added by subclassing this class and calling <see cref="Register"/>.
    /// </summary>
    public abstract class DataProvider
    {
        // Registry of providers keyed by URI scheme. The static constructor installs the
        // built-in providers; Register adds more.
        private static readonly Dictionary<string, DataProvider> s_providers;

        static DataProvider()
        {
            s_providers = new Dictionary<string, DataProvider>();
            s_providers.Add(DataPath.HDFS_URI_SCHEME, new HdfsDataProvider());
            s_providers.Add(DataPath.PARTFILE_URI_SCHEME, new PartitionedFileDataProvider());
            s_providers.Add(DataPath.WASB_URI_SCHEME, new WasbDataProvider());
            s_providers.Add(DataPath.AZUREBLOB_URI_SCHEME, new AzureBlobDataProvider());
        }

        /// <summary>
        /// The scheme of this data provider. This is the key it is registered under.
        /// </summary>
        public abstract string Scheme { get; }

        /// <summary>
        /// Gets the metadata of a specified dataset.
        /// </summary>
        /// <param name="context">A DryadLinqContext object.</param>
        /// <param name="dataSetUri">The URI of the dataset.</param>
        /// <returns>The metadata. Returns null if metadata is not present.</returns>
        public abstract DryadLinqMetaData GetMetaData(DryadLinqContext context, Uri dataSetUri);

        /// <summary>
        /// Gets information of a specified dataset.
        /// </summary>
        /// <param name="context">A DryadLinqContext object.</param>
        /// <param name="dataSetUri">The URI of the dataset.</param>
        /// <returns>Information about a dataset.</returns>
        public abstract DryadLinqStreamInfo GetStreamInfo(DryadLinqContext context, Uri dataSetUri);

        /// <summary>
        /// Gets a URI to store a temporary dataset.
        /// </summary>
        /// <param name="context">A DryadLinqContext object.</param>
        /// <param name="path">A local path.</param>
        /// <returns>The URI of a temporary directory.</returns>
        public abstract Uri GetTemporaryStreamUri(DryadLinqContext context, string path);

        /// <summary>
        /// Rewrites the URI of a dataset. Allows DataProvider specific rewriting.
        /// The base implementation returns the URI unchanged.
        /// </summary>
        /// <typeparam name="T">The element type of the specified dataset.</typeparam>
        /// <param name="context">The current DryadLinqContext.</param>
        /// <param name="dataSetUri">The URI of the dataset.</param>
        /// <param name="access">The intended access to the dataset.</param>
        /// <returns>The rewritten URI of the dataset.</returns>
        public virtual Uri RewriteUri<T>(DryadLinqContext context, Uri dataSetUri, FileAccess access = FileAccess.Read)
        {
            return dataSetUri;
        }

        /// <summary>
        /// Ingress a .NET collection to a specified store location.
        /// </summary>
        /// <typeparam name="T">The record type of the collection.</typeparam>
        /// <param name="context">An instance of <see cref="DryadLinqContext"/>.</param>
        /// <param name="source">The collection to be ingressed.</param>
        /// <param name="dataSetUri">The URI to store the collection.</param>
        /// <param name="metaData">The metadata for the collection.</param>
        /// <param name="outputScheme">The compression scheme used to store the collection.</param>
        /// <param name="isTemp">true to only store the collection temporarily with a time lease.</param>
        public abstract void Ingress<T>(DryadLinqContext context,
                                        IEnumerable<T> source,
                                        Uri dataSetUri,
                                        DryadLinqMetaData metaData,
                                        CompressionScheme outputScheme,
                                        bool isTemp = false);

        /// <summary>
        /// Creates an instance of Stream for a dataset at a specified location. This is
        /// used by DryadLINQ to read a .NET collection from a store.
        /// </summary>
        /// <param name="context">An instance of <see cref="DryadLinqContext"/>.</param>
        /// <param name="dataSetUri">The URI of a dataset.</param>
        /// <returns>An instance of Stream.</returns>
        public abstract Stream Egress(DryadLinqContext context, Uri dataSetUri);

        /// <summary>
        /// Checks the existence of a specified dataset.
        /// </summary>
        /// <param name="context">The current DryadLinqContext.</param>
        /// <param name="dataSetUri">The URI of the dataset.</param>
        /// <param name="deleteIfExists">True to delete if the dataset exists.</param>
        public abstract void CheckExistence(DryadLinqContext context, Uri dataSetUri, bool deleteIfExists);

        /// <summary>
        /// The path separator of this data provider. Defaults to '/'.
        /// </summary>
        public virtual char PathSeparator
        {
            get { return '/'; }
        }

        /// <summary>
        /// Registers a new <see cref="DataProvider"/>. This can be used to extend DryadLINQ to
        /// interact with a new kind of data store.
        /// </summary>
        /// <param name="provider">A new DataProvider.</param>
        /// <exception cref="ArgumentNullException"><paramref name="provider"/> is null.</exception>
        /// <exception cref="DryadLinqException">A provider for the same scheme is already registered.</exception>
        public static void Register(DataProvider provider)
        {
            if (provider == null)
            {
                throw new ArgumentNullException("provider");
            }

            string scheme = provider.Scheme;
            if (s_providers.ContainsKey(scheme))
            {
                throw new DryadLinqException("Data provider for " + scheme + " has already existed.");
            }
            s_providers[scheme] = provider;
        }

        /// <summary>
        /// Get the data provider associated with a scheme.
        /// </summary>
        /// <param name="scheme">The data provider scheme.</param>
        /// <returns>The data provider.</returns>
        /// <exception cref="DryadLinqException">No provider is registered for <paramref name="scheme"/>.</exception>
        internal static DataProvider GetDataProvider(string scheme)
        {
            DataProvider provider;
            if (!s_providers.TryGetValue(scheme, out provider))
            {
                throw new DryadLinqException(DryadLinqErrorCode.Internal,
                                             String.Format(SR.UnknownProvider, scheme));
            }
            return provider;
        }

        /// <summary>
        /// Get the dataset specified by a URI.
        /// </summary>
        /// <typeparam name="T">The record type of the dataset.</typeparam>
        /// <param name="context">An instance of <see cref="DryadLinqContext"/>.</param>
        /// <param name="dataSetUri">The URI of the dataset.</param>
        /// <returns>A query object representing the specified dataset.</returns>
        internal static DryadLinqQuery<T> GetPartitionedTable<T>(DryadLinqContext context, Uri dataSetUri)
        {
            string scheme = DataPath.GetScheme(dataSetUri);
            DataProvider dataProvider = DataProvider.GetDataProvider(scheme);
            dataSetUri = dataProvider.RewriteUri<T>(context, dataSetUri);
            return new DryadLinqQuery<T>(context, dataProvider, dataSetUri);
        }

        /// <summary>
        /// Reads a specified dataset.
        /// </summary>
        /// <typeparam name="T">The record type of the dataset.</typeparam>
        /// <param name="context">An instance of <see cref="DryadLinqContext"/>.</param>
        /// <param name="dataSetUri">The URI of the dataset.</param>
        /// <returns>A sequence of records as IEnumerable{T}.</returns>
        public static IEnumerable<T> ReadData<T>(DryadLinqContext context, Uri dataSetUri)
        {
            string scheme = DataPath.GetScheme(dataSetUri);
            DataProvider dataProvider = DataProvider.GetDataProvider(scheme);
            dataSetUri = dataProvider.RewriteUri<T>(context, dataSetUri);
            return new DryadLinqQueryEnumerable<T>(context, dataProvider, dataSetUri);
        }

        /// <summary>
        /// Stores an IEnumerable{T} at a specified location.
        /// </summary>
        /// <typeparam name="T">The record type of the data.</typeparam>
        /// <param name="context">An instance of <see cref="DryadLinqContext"/>.</param>
        /// <param name="source">The data to store.</param>
        /// <param name="dataSetUri">The URI of the store location.</param>
        /// <param name="metaData">The metadata of the data.</param>
        /// <param name="outputScheme">The compression scheme.</param>
        /// <param name="isTemp">true if the data is only stored temporarily.</param>
        /// <returns>An instance of IQueryable{T} for the data.</returns>
        internal static DryadLinqQuery<T> StoreData<T>(DryadLinqContext context,
                                                       IEnumerable<T> source,
                                                       Uri dataSetUri,
                                                       DryadLinqMetaData metaData,
                                                       CompressionScheme outputScheme,
                                                       bool isTemp = false)
        {
            string scheme = DataPath.GetScheme(dataSetUri);
            DataProvider dataProvider = DataProvider.GetDataProvider(scheme);
            // NOTE(review): RewriteUri is called with its default (Read) access even though the
            // data is about to be written -- confirm this is intended.
            dataSetUri = dataProvider.RewriteUri<T>(context, dataSetUri);
            dataProvider.Ingress(context, source, dataSetUri, metaData, outputScheme, isTemp);
            DryadLinqQuery<T> res = DataProvider.GetPartitionedTable<T>(context, dataSetUri);
            res.CheckAndInitialize();    // must initialize
            return res;
        }
    }

    /// <summary>
    /// Basic information of a dataset.
    /// </summary>
    public class DryadLinqStreamInfo
    {
        /// <summary>
        /// The number of partitions of the dataset. Returns -1 if unknown.
        /// </summary>
        public Int32 PartitionCount { get; private set; }

        /// <summary>
        /// The size in bytes of the dataset. Returns -1 if unknown.
        /// </summary>
        public Int64 DataSize { get; private set; }

        /// <summary>
        /// Initializes an instance of DryadLinqStreamInfo.
        /// </summary>
        /// <param name="parCnt">The number of partitions.</param>
        /// <param name="size">The size in bytes.</param>
        public DryadLinqStreamInfo(Int32 parCnt, Int64 size)
        {
            this.PartitionCount = parCnt;
            this.DataSize = size;
        }
    }

    /// <summary>
    /// Data provider for HDFS datasets. Ingress and Egress are not implemented yet.
    /// </summary>
    internal class HdfsDataProvider : DataProvider
    {
        public override string Scheme
        {
            get { return DataPath.HDFS_URI_SCHEME; }
        }

        /// <summary>
        /// Builds a temporary-stream URI through the cluster's default URI scheme.
        /// </summary>
        public override Uri GetTemporaryStreamUri(DryadLinqContext context, string path)
        {
            return context.Cluster.MakeDefaultUri(DataPath.TEMPORARY_STREAM_NAME_PREFIX + path);
        }

        /// <summary>
        /// Not implemented yet; always returns null.
        /// </summary>
        public override DryadLinqMetaData GetMetaData(DryadLinqContext context, Uri dataSetUri)
        {
            // TBD
            return null;
        }

        /// <summary>
        /// Queries the cluster's DFS client for the dataset's size and partition count.
        /// </summary>
        /// <exception cref="DryadLinqException">The reported partition count is 0.</exception>
        public override DryadLinqStreamInfo GetStreamInfo(DryadLinqContext context, Uri dataSetUri)
        {
            Int32 parCnt = 0;
            Int64 size = -1;
            context.Cluster.DfsClient.GetContentSummary(dataSetUri.AbsolutePath, ref size, ref parCnt);
            if (parCnt == 0)
            {
                throw new DryadLinqException("Got 0 partition count for " + dataSetUri.AbsoluteUri);
            }
            return new DryadLinqStreamInfo(parCnt, size);
        }

        public override void Ingress<T>(DryadLinqContext context,
                                        IEnumerable<T> source,
                                        Uri dataSetUri,
                                        DryadLinqMetaData metaData,
                                        CompressionScheme outputScheme,
                                        bool isTemp = false)
        {
            throw new DryadLinqException("TBA");
        }

        public override Stream Egress(DryadLinqContext context, Uri dataSetUri)
        {
            throw new DryadLinqException("TBA");
        }

        /// <summary>
        /// Checks for the dataset via WebHDFS and optionally deletes it.
        /// </summary>
        /// <exception cref="DryadLinqException">The dataset exists and <paramref name="deleteIfExists"/> is false.</exception>
        public override void CheckExistence(DryadLinqContext context, Uri dataSetUri, bool deleteIfExists)
        {
            // NOTE(review): ports are hard-coded; presumably the cluster's default WebHDFS
            // ports -- confirm and consider making them configurable.
            WebHdfsClient client = new WebHdfsClient(dataSetUri.Host, 8033, 50070);
            if (client.IsFileExists(dataSetUri.AbsolutePath))
            {
                if (!deleteIfExists)
                {
                    throw new DryadLinqException("Can't output to existing HDFS collection " + dataSetUri.AbsoluteUri);
                }
                client.DeleteDfsFile(dataSetUri.AbsolutePath);
            }
        }
    }

    /// <summary>
    /// Data provider for "partitioned files": a small text file that lists the partitions of
    /// a dataset stored as ordinary files. The layout, as written by Ingress and read by
    /// GetStreamInfo and Egress, is:
    ///   line 0:  base path of the partition files (partition i is "base.XXXXXXXX", where
    ///            XXXXXXXX is i as 8 hex digits);
    ///   line 1:  the number of partitions;
    ///   line 2+: one line per partition, "index,sizeInBytes,machine". The machine field may
    ///            instead be "host:path", naming the partition's location directly.
    /// </summary>
    internal class PartitionedFileDataProvider : DataProvider
    {
        public override string Scheme
        {
            get { return DataPath.PARTFILE_URI_SCHEME; }
        }

        /// <summary>
        /// Builds a temporary path under the root of the current working directory's drive,
        /// creates its parent directory, and returns it as a URI of this provider's scheme.
        /// </summary>
        public override Uri GetTemporaryStreamUri(DryadLinqContext context, string path)
        {
            string wd = Directory.GetCurrentDirectory();
            path = Path.Combine(Path.GetPathRoot(wd), DataPath.TEMPORARY_STREAM_NAME_PREFIX, path);
            Directory.CreateDirectory(Path.GetDirectoryName(path));
            Uri uri = new Uri(this.Scheme + ":///" + path);
            return uri;
        }

        /// <summary>
        /// Not implemented yet; always returns null.
        /// </summary>
        public override DryadLinqMetaData GetMetaData(DryadLinqContext context, Uri dataSetUri)
        {
            // TBD
            return null;
        }

        /// <summary>
        /// Reads the partition count from line 1 of the partition file and sums the per-partition
        /// sizes from lines 2 onward.
        /// </summary>
        /// <exception cref="DryadLinqException">The partition file is malformed.</exception>
        public override DryadLinqStreamInfo GetStreamInfo(DryadLinqContext context, Uri dataSetUri)
        {
            string[] lines = ReadPartitionFile(dataSetUri);

            // The file is machine-written, so parse culture-invariantly.
            Int32 parCnt;
            if (!Int32.TryParse(lines[1].Trim(), NumberStyles.Integer, CultureInfo.InvariantCulture, out parCnt))
            {
                throw MalformedPartitionFile(dataSetUri);
            }

            Int64 size = 0;
            for (int i = 2; i < lines.Length; i++)
            {
                string[] fields = lines[i].Split(',');
                Int64 partSize;
                if (fields.Length < 2 ||
                    !Int64.TryParse(fields[1], NumberStyles.Integer, CultureInfo.InvariantCulture, out partSize))
                {
                    throw MalformedPartitionFile(dataSetUri);
                }
                size += partSize;
            }
            return new DryadLinqStreamInfo(parCnt, size);
        }

        /// <summary>
        /// Writes <paramref name="source"/> as a single partition file named "Part.00000000" in a
        /// new uniquely named directory, then writes the partition file at
        /// <paramref name="dataSetUri"/> describing that one partition.
        /// </summary>
        public override void Ingress<T>(DryadLinqContext context,
                                        IEnumerable<T> source,
                                        Uri dataSetUri,
                                        DryadLinqMetaData metaData,
                                        CompressionScheme compressionScheme,
                                        bool isTemp = false)
        {
            // Write the partition. Use the context's PartitionUncPath if set, otherwise the
            // directory containing the partition file itself.
            string partDir = context.PartitionUncPath;
            if (partDir == null)
            {
                partDir = Path.GetDirectoryName(dataSetUri.LocalPath);
            }
            if (!Path.IsPathRooted(partDir))
            {
                partDir = Path.Combine("/", partDir);
            }
            partDir = Path.Combine(partDir, DryadLinqUtil.MakeUniqueName());
            Directory.CreateDirectory(partDir);

            // Partition i lives at partPath + "." + i as 8 hex digits; there is only partition 0.
            string partPath = Path.Combine(partDir, "Part");
            string partFilePath = partPath + ".00000000";
            DryadLinqFactory<T> factory = (DryadLinqFactory<T>)DryadLinqCodeGen.GetFactory(context, typeof(T));
            using (FileStream fstream = new FileStream(partFilePath, FileMode.CreateNew, FileAccess.Write))
            {
                DryadLinqFileBlockStream nativeStream = new DryadLinqFileBlockStream(fstream, compressionScheme);
                DryadLinqRecordWriter<T> writer = factory.MakeWriter(nativeStream);
                foreach (T rec in source)
                {
                    writer.WriteRecordSync(rec);
                }
                writer.Close();
            }

            // Write the partition file: base path, partition count, then "index,size,machine".
            FileInfo finfo = new FileInfo(partFilePath);
            using (StreamWriter writer = File.CreateText(dataSetUri.LocalPath))
            {
                writer.WriteLine(partPath);
                writer.WriteLine("1");
                writer.WriteLine("{0},{1},{2}", 0, finfo.Length, Environment.MachineName);
            }
        }

        /// <summary>
        /// Opens all partitions listed in the partition file as one concatenated stream.
        /// </summary>
        /// <exception cref="DryadLinqException">The partition file is malformed.</exception>
        public override Stream Egress(DryadLinqContext context, Uri dataSetUri)
        {
            string[] lines = ReadPartitionFile(dataSetUri);
            string[] filePathArray = this.GetPartitionPaths(lines);
            // NOTE(review): always reads as uncompressed, although Ingress may have written the
            // partition with a compression scheme -- the partition file does not record it.
            return new DryadLinqMultiFileStream(filePathArray, CompressionScheme.None);
        }

        /// <summary>
        /// If the partition file exists, either throws or deletes it along with the partition
        /// files it lists.
        /// </summary>
        /// <exception cref="DryadLinqException">The dataset exists and <paramref name="deleteIfExists"/> is false.</exception>
        public override void CheckExistence(DryadLinqContext context, Uri dataSetUri, bool deleteIfExists)
        {
            string fileName = dataSetUri.LocalPath;
            if (File.Exists(fileName))
            {
                if (!deleteIfExists)
                {
                    throw new DryadLinqException("Can't output to existing Partitioned File collection " +
                                                 dataSetUri.AbsoluteUri);
                }

                // Note: We delete all the partitions!
                var lines = File.ReadAllLines(fileName);
                try
                {
                    foreach (string path in this.GetPartitionPaths(lines))
                    {
                        if (File.Exists(path))
                        {
                            File.Delete(path);
                        }
                    }
                }
                catch (Exception)
                {
                    // Best effort: failing to remove a partition (or a malformed listing) must
                    // not prevent removing the partition file itself below.
                }
                File.Delete(fileName);
            }
        }

        /// <summary>
        /// Reads the partition file and checks that it has the two header lines plus at least
        /// one partition line.
        /// </summary>
        private static string[] ReadPartitionFile(Uri dataSetUri)
        {
            string[] lines = File.ReadAllLines(dataSetUri.LocalPath);
            if (lines.Length < 3)
            {
                throw MalformedPartitionFile(dataSetUri);
            }
            return lines;
        }

        /// <summary>
        /// Creates the exception reported for an unreadable partition file.
        /// </summary>
        private static DryadLinqException MalformedPartitionFile(Uri dataSetUri)
        {
            return new DryadLinqException("The partition file " + dataSetUri + " is malformed.");
        }

        /// <summary>
        /// Maps each partition line (line 2 onward) to the path of its data file.
        /// </summary>
        private string[] GetPartitionPaths(string[] lines)
        {
            // A ':' in the base path presumably marks a local drive path (e.g. "C:\...").
            bool isLocalPath = lines[0].Contains(':');
            string[] filePathArray = new string[lines.Length - 2];
            for (int i = 2; i < lines.Length; i++)
            {
                int idx = i - 2;
                string[] fields = lines[i].Split(',');
                if (fields[2].Contains(':'))
                {
                    // "host:path" gives the partition's location directly, as a UNC path.
                    string[] parts = fields[2].Split(':');
                    filePathArray[idx] = String.Format(@"\\{0}\{1}", parts[0], parts[1]);
                }
                else if (isLocalPath)
                {
                    filePathArray[idx] = String.Format("{0}.{1:X8}", lines[0], idx);
                }
                else
                {
                    // Otherwise the base path is reached on the named machine via UNC.
                    filePathArray[idx] = String.Format(@"\\{0}\{1}.{2:X8}", fields[2], lines[0], idx);
                }
            }
            return filePathArray;
        }
    }

    /// <summary>
    /// Data provider for WASB datasets; identical to <see cref="HdfsDataProvider"/> apart from
    /// its scheme.
    /// </summary>
    internal class WasbDataProvider : HdfsDataProvider
    {
        public override string Scheme
        {
            get { return DataPath.WASB_URI_SCHEME; }
        }
    }

    /// <summary>
    /// Data provider for datasets stored as Azure blobs.
    /// </summary>
    internal class AzureBlobDataProvider : DataProvider
    {
        public override string Scheme
        {
            get { return DataPath.AZUREBLOB_URI_SCHEME; }
        }

        /// <summary>
        /// Builds a temporary-stream URI through the cluster's default URI scheme.
        /// </summary>
        public override Uri GetTemporaryStreamUri(DryadLinqContext context, string path)
        {
            return context.Cluster.MakeDefaultUri(DataPath.TEMPORARY_STREAM_NAME_PREFIX + path);
        }

        /// <summary>
        /// Adds a "key" query parameter from the context when the URI carries no account key.
        /// For non-write access to LineRecord datasets, also adds a "seekBoundaries" parameter.
        /// </summary>
        public override Uri RewriteUri<T>(DryadLinqContext context, Uri dataSetUri, FileAccess access = FileAccess.Read)
        {
            string account, key, container, blob;
            AzureUtils.FromAzureUri(dataSetUri, out account, out key, out container, out blob);

            UriBuilder builder = new UriBuilder(dataSetUri);
            NameValueCollection query = System.Web.HttpUtility.ParseQueryString(builder.Query);
            if (key == null)
            {
                query["key"] = context.AzureAccountKey(account);
            }
            if (access != FileAccess.Write && typeof(T) == typeof(Microsoft.Research.DryadLinq.LineRecord))
            {
                query["seekBoundaries"] = "Microsoft.Research.DryadLinq.LineRecord";
            }
            builder.Query = query.ToString();
            return builder.Uri;
        }

        /// <summary>
        /// Not implemented yet; always returns null.
        /// </summary>
        public override DryadLinqMetaData GetMetaData(DryadLinqContext context, Uri dataSetUri)
        {
            // TBD
            return null;
        }

        /// <summary>
        /// Reports the number of partitions and total length of the Azure collection.
        /// </summary>
        /// <exception cref="DryadLinqException">The collection does not exist or cannot be queried.</exception>
        public override DryadLinqStreamInfo GetStreamInfo(DryadLinqContext context, Uri dataSetUri)
        {
            try
            {
                AzureCollectionPartition partition = new AzureCollectionPartition(dataSetUri);
                if (!partition.IsCollectionExists())
                {
                    throw new DryadLinqException("Input collection " + dataSetUri + " does not exist");
                }
                Int32 parCnt = partition.GetPartition().Count();
                Int64 size = partition.TotalLength;
                return new DryadLinqStreamInfo(parCnt, size);
            }
            catch (DryadLinqException)
            {
                // Already descriptive (e.g. the "does not exist" case above); don't re-wrap it.
                throw;
            }
            catch (Exception e)
            {
                throw new DryadLinqException("Can't get Azure stream info for " + dataSetUri, e);
            }
        }

        /// <summary>
        /// Writes <paramref name="source"/> to the blob named by <paramref name="dataSetUri"/>.
        /// Compression is not supported.
        /// </summary>
        /// <exception cref="DryadLinqException"><paramref name="compressionScheme"/> is not None.</exception>
        public override void Ingress<T>(DryadLinqContext context,
                                        IEnumerable<T> source,
                                        Uri dataSetUri,
                                        DryadLinqMetaData metaData,
                                        CompressionScheme compressionScheme,
                                        bool isTemp = false)
        {
            string account, key, container, blob;
            AzureUtils.FromAzureUri(dataSetUri, out account, out key, out container, out blob);
            if (compressionScheme != CompressionScheme.None)
            {
                throw new DryadLinqException("Not implemented: writing to Azure temporary storage with compression enabled");
            }

            AzureDfsClient client = new AzureDfsClient(account, key, container);
            DryadLinqFactory<T> factory = (DryadLinqFactory<T>)DryadLinqCodeGen.GetFactory(context, typeof(T));
            // NOTE(review): blocks on the async writer via .Result; failures surface wrapped in
            // an AggregateException.
            using (Stream stream = client.GetFileStreamWriterAsync(blob).Result)
            {
                DryadLinqBlockStream nativeStream = new DryadLinqBlockStream(stream);
                DryadLinqRecordWriter<T> writer = factory.MakeWriter(nativeStream);
                foreach (T rec in source)
                {
                    writer.WriteRecordSync(rec);
                }
                writer.Close();
            }
        }

        /// <summary>
        /// Opens a read stream over the Azure collection.
        /// </summary>
        /// <exception cref="DryadLinqException">The collection does not exist or cannot be opened.</exception>
        public override Stream Egress(DryadLinqContext context, Uri dataSetUri)
        {
            try
            {
                AzureCollectionPartition partition = new AzureCollectionPartition(dataSetUri);
                if (!partition.IsCollectionExists())
                {
                    throw new DryadLinqException("Input collection " + dataSetUri + " does not exist");
                }
                Stream dataSetStream = partition.GetReadStream();
                return dataSetStream;
            }
            catch (DryadLinqException)
            {
                // Already descriptive (e.g. the "does not exist" case above); don't re-wrap it.
                throw;
            }
            catch (Exception e)
            {
                throw new DryadLinqException("Can't read Azure collection " + dataSetUri, e);
            }
        }

        /// <summary>
        /// If the collection exists, either throws or deletes it.
        /// </summary>
        /// <exception cref="DryadLinqException">The collection exists and <paramref name="deleteIfExists"/> is false.</exception>
        public override void CheckExistence(DryadLinqContext context, Uri dataSetUri, bool deleteIfExists)
        {
            AzureCollectionPartition partition = new AzureCollectionPartition(dataSetUri);
            if (partition.IsCollectionExists())
            {
                if (!deleteIfExists)
                {
                    throw new DryadLinqException("Can't output to existing Azure Blob collection " + dataSetUri.AbsoluteUri);
                }
                partition.DeleteCollection();
            }
        }
    }
}