/*
Copyright (c) Microsoft Corporation
All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
compliance with the License. You may obtain a copy of the License
at http://www.apache.org/licenses/LICENSE-2.0
THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER
EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR CONDITIONS OF
TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR NON-INFRINGEMENT.
See the Apache Version 2.0 License for specific language governing permissions and
limitations under the License.
*/
using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.Text;
using System.IO;
using System.Linq;
using System.Linq.Expressions;
using System.Xml;
using System.Data.Linq;
using System.Data.Linq.Mapping;
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Diagnostics;
using Microsoft.Research.DryadLinq.Internal;
using System.IO.Compression;
using Microsoft.Research.Peloponnese.Storage;
namespace Microsoft.Research.DryadLinq
{
///
/// Selects the compression applied to the output written by a DryadLINQ vertex.
///
public enum CompressionScheme
{
    ///
    /// Output is written uncompressed.
    ///
    None = 0,

    ///
    /// Output is compressed with gzip.
    ///
    Gzip = 1
}
///
/// DataProvider provides an abstraction over the data backends DryadLINQ reads from and
/// writes to. A new storage backend is added by subclassing this class and registering an
/// instance with Register.
///
public abstract class DataProvider
{
    // Registered providers, keyed by URI scheme. The dictionary instance is created once in
    // the static constructor (hence readonly); Register adds entries to it afterwards.
    private static readonly Dictionary s_providers;

    // Registers the built-in providers: HDFS, partitioned file, WASB and Azure blob.
    static DataProvider()
    {
        s_providers = new Dictionary();
        s_providers.Add(DataPath.HDFS_URI_SCHEME, new HdfsDataProvider());
        s_providers.Add(DataPath.PARTFILE_URI_SCHEME, new PartitionedFileDataProvider());
        s_providers.Add(DataPath.WASB_URI_SCHEME, new WasbDataProvider());
        s_providers.Add(DataPath.AZUREBLOB_URI_SCHEME, new AzureBlobDataProvider());
    }

    ///
    /// The URI scheme handled by this data provider; it is the key used by Register.
    ///
    public abstract string Scheme { get; }

    ///
    /// Gets the metadata of a specified dataset.
    ///
    /// context: the current DryadLinqContext.
    /// dataSetUri: the URI of the dataset.
    /// Returns the metadata, or null if no metadata is present.
    ///
    public abstract DryadLinqMetaData GetMetaData(DryadLinqContext context, Uri dataSetUri);

    ///
    /// Gets basic information (partition count and size) about a specified dataset.
    ///
    /// context: the current DryadLinqContext.
    /// dataSetUri: the URI of the dataset.
    /// Returns the dataset's DryadLinqStreamInfo.
    ///
    public abstract DryadLinqStreamInfo GetStreamInfo(DryadLinqContext context, Uri dataSetUri);

    ///
    /// Gets a URI at which a temporary dataset can be stored.
    ///
    /// context: the current DryadLinqContext.
    /// path: a local path naming the temporary dataset.
    /// Returns the URI for the temporary dataset.
    ///
    public abstract Uri GetTemporaryStreamUri(DryadLinqContext context, string path);

    ///
    /// Rewrites the URI of a dataset, allowing provider-specific adjustments such as adding
    /// credentials or access hints. The default implementation returns the URI unchanged.
    ///
    /// T: the element type of the dataset.
    /// context: the current DryadLinqContext.
    /// dataSetUri: the URI of the dataset.
    /// access: the intended access to the dataset (read by default).
    /// Returns the rewritten URI.
    ///
    public virtual Uri RewriteUri(DryadLinqContext context,
                                  Uri dataSetUri,
                                  FileAccess access = FileAccess.Read)
    {
        return dataSetUri;
    }

    ///
    /// Ingresses a .NET collection to a specified store location.
    ///
    /// T: the record type of the collection.
    /// context: the current DryadLinqContext.
    /// source: the collection to be ingressed.
    /// dataSetUri: the URI at which to store the collection.
    /// metaData: the metadata for the collection.
    /// outputScheme: the compression scheme used to store the collection.
    /// isTemp: true to store the collection only temporarily, with a time lease.
    ///
    public abstract void Ingress(DryadLinqContext context,
                                 IEnumerable source,
                                 Uri dataSetUri,
                                 DryadLinqMetaData metaData,
                                 CompressionScheme outputScheme,
                                 bool isTemp = false);

    ///
    /// Opens a Stream over the dataset at a specified location. DryadLINQ uses it to read a
    /// .NET collection back from a store.
    ///
    /// context: the current DryadLinqContext.
    /// dataSetUri: the URI of the dataset.
    /// Returns a Stream over the dataset.
    ///
    public abstract Stream Egress(DryadLinqContext context, Uri dataSetUri);

    ///
    /// Checks whether a specified dataset already exists, optionally deleting it.
    ///
    /// context: the current DryadLinqContext.
    /// dataSetUri: the URI of the dataset.
    /// deleteIfExists: true to delete the dataset if it exists.
    ///
    public abstract void CheckExistence(DryadLinqContext context,
                                        Uri dataSetUri,
                                        bool deleteIfExists);

    ///
    /// The path separator used by this data provider ('/' unless overridden).
    ///
    public virtual char PathSeparator
    {
        get { return '/'; }
    }

    ///
    /// Registers a new DataProvider. This extends DryadLINQ to interact with a new kind of
    /// data store.
    ///
    /// provider: the provider to register; it is keyed by its Scheme.
    /// Throws ArgumentNullException if provider is null, ArgumentException if its Scheme is
    /// null or empty, and DryadLinqException if a provider for that scheme is already registered.
    ///
    public static void Register(DataProvider provider)
    {
        if (provider == null)
        {
            throw new ArgumentNullException("provider");
        }
        string scheme = provider.Scheme;
        if (String.IsNullOrEmpty(scheme))
        {
            // A null scheme would otherwise fail later with an unhelpful dictionary-key error.
            throw new ArgumentException("The data provider's Scheme must not be null or empty.", "provider");
        }
        if (s_providers.ContainsKey(scheme))
        {
            throw new DryadLinqException("Data provider for " + scheme + " has already existed.");
        }
        s_providers[scheme] = provider;
    }

    ///
    /// Gets the data provider registered for a URI scheme.
    ///
    /// scheme: the data provider scheme.
    /// Returns the registered provider; throws DryadLinqException if none is registered.
    ///
    internal static DataProvider GetDataProvider(string scheme)
    {
        DataProvider provider;
        if (!s_providers.TryGetValue(scheme, out provider))
        {
            throw new DryadLinqException(DryadLinqErrorCode.Internal,
                                         String.Format(SR.UnknownProvider, scheme));
        }
        return provider;
    }

    ///
    /// Gets a query object over the dataset specified by a URI. The URI is rewritten by its
    /// provider for read access.
    ///
    /// T: the record type of the dataset.
    /// context: the current DryadLinqContext.
    /// dataSetUri: the URI of the dataset.
    /// Returns a query object representing the dataset.
    ///
    internal static DryadLinqQuery GetPartitionedTable(DryadLinqContext context, Uri dataSetUri)
    {
        string scheme = DataPath.GetScheme(dataSetUri);
        DataProvider dataProvider = DataProvider.GetDataProvider(scheme);
        dataSetUri = dataProvider.RewriteUri(context, dataSetUri);
        return new DryadLinqQuery(context, dataProvider, dataSetUri);
    }

    ///
    /// Reads a specified dataset. The URI is rewritten by its provider for read access.
    ///
    /// T: the record type of the dataset.
    /// context: the current DryadLinqContext.
    /// dataSetUri: the URI of the dataset.
    /// Returns the dataset's records as an enumerable sequence.
    ///
    public static IEnumerable ReadData(DryadLinqContext context, Uri dataSetUri)
    {
        string scheme = DataPath.GetScheme(dataSetUri);
        DataProvider dataProvider = DataProvider.GetDataProvider(scheme);
        dataSetUri = dataProvider.RewriteUri(context, dataSetUri);
        return new DryadLinqQueryEnumerable(context, dataProvider, dataSetUri);
    }

    ///
    /// Stores a sequence at a specified location and returns an initialized query over it.
    ///
    /// T: the record type of the data.
    /// context: the current DryadLinqContext.
    /// source: the data to store.
    /// dataSetUri: the URI of the store location.
    /// metaData: the metadata of the data.
    /// outputScheme: the compression scheme.
    /// isTemp: true if the data is only stored temporarily.
    /// Returns a query object for the stored data.
    ///
    internal static DryadLinqQuery StoreData(DryadLinqContext context,
                                             IEnumerable source,
                                             Uri dataSetUri,
                                             DryadLinqMetaData metaData,
                                             CompressionScheme outputScheme,
                                             bool isTemp = false)
    {
        string scheme = DataPath.GetScheme(dataSetUri);
        DataProvider dataProvider = DataProvider.GetDataProvider(scheme);
        // The URI is about to be written, so request write access. Read-only hints (e.g. the
        // Azure provider's seekBoundaries) must not be added to the write target.
        dataSetUri = dataProvider.RewriteUri(context, dataSetUri, FileAccess.Write);
        dataProvider.Ingress(context, source, dataSetUri, metaData, outputScheme, isTemp);
        // GetPartitionedTable re-rewrites the URI for read access.
        DryadLinqQuery res = DataProvider.GetPartitionedTable(context, dataSetUri);
        res.CheckAndInitialize();    // must initialize
        return res;
    }
}
///
/// Summary information about a dataset: how many partitions it has and how many bytes it holds.
///
public class DryadLinqStreamInfo
{
    ///
    /// Creates a DryadLinqStreamInfo from a partition count and a total size.
    ///
    /// parCnt: the number of partitions.
    /// size: the total size in bytes.
    ///
    public DryadLinqStreamInfo(int parCnt, long size)
    {
        PartitionCount = parCnt;
        DataSize = size;
    }

    ///
    /// The number of partitions of the dataset; -1 when unknown.
    ///
    public int PartitionCount { get; private set; }

    ///
    /// The total size of the dataset in bytes; -1 when unknown.
    ///
    public long DataSize { get; private set; }
}
///
/// DataProvider for datasets addressed by the HDFS URI scheme. Stream info comes from the
/// cluster's DFS client; ingress and egress are not implemented yet.
///
internal class HdfsDataProvider : DataProvider
{
    // Port arguments passed to WebHdfsClient by CheckExistence. NOTE(review): they are
    // hard-coded rather than taken from the URI or the cluster; their exact meaning is defined
    // by WebHdfsClient's constructor (presumably a service port and the WebHDFS HTTP port).
    // Confirm they match the target cluster.
    private const int WebHdfsClientPort = 8033;
    private const int WebHdfsClientWebPort = 50070;

    public override string Scheme
    {
        get { return DataPath.HDFS_URI_SCHEME; }
    }

    ///
    /// Builds a temporary-stream URI on the cluster's default file system, named with the
    /// temporary-stream prefix followed by path.
    ///
    public override Uri GetTemporaryStreamUri(DryadLinqContext context, string path)
    {
        string tempName = DataPath.TEMPORARY_STREAM_NAME_PREFIX + path;
        return context.Cluster.MakeDefaultUri(tempName);
    }

    ///
    /// Metadata is not yet supported for HDFS; always returns null (TBD).
    ///
    public override DryadLinqMetaData GetMetaData(DryadLinqContext context, Uri dataSetUri)
    {
        return null;
    }

    ///
    /// Queries the DFS client for the dataset's size and partition count. Throws
    /// DryadLinqException when the reported partition count is zero.
    ///
    public override DryadLinqStreamInfo GetStreamInfo(DryadLinqContext context, Uri dataSetUri)
    {
        Int64 totalBytes = -1;
        Int32 partitionCount = 0;
        context.Cluster.DfsClient.GetContentSummary(dataSetUri.AbsolutePath, ref totalBytes, ref partitionCount);
        if (partitionCount != 0)
        {
            return new DryadLinqStreamInfo(partitionCount, totalBytes);
        }
        throw new DryadLinqException("Got 0 partition count for " + dataSetUri.AbsoluteUri);
    }

    ///
    /// Not implemented for HDFS; always throws DryadLinqException.
    ///
    public override void Ingress(DryadLinqContext context,
                                 IEnumerable source,
                                 Uri dataSetUri,
                                 DryadLinqMetaData metaData,
                                 CompressionScheme outputScheme,
                                 bool isTemp = false)
    {
        throw new DryadLinqException("TBA");
    }

    ///
    /// Not implemented for HDFS; always throws DryadLinqException.
    ///
    public override Stream Egress(DryadLinqContext context, Uri dataSetUri)
    {
        throw new DryadLinqException("TBA");
    }

    ///
    /// If the dataset's path exists on the URI's host, deletes it when deleteIfExists is true.
    /// Otherwise it throws DryadLinqException. Does nothing when the path does not exist.
    ///
    public override void CheckExistence(DryadLinqContext context, Uri dataSetUri, bool deleteIfExists)
    {
        WebHdfsClient client = new WebHdfsClient(dataSetUri.Host, WebHdfsClientPort, WebHdfsClientWebPort);
        string hdfsPath = dataSetUri.AbsolutePath;
        if (!client.IsFileExists(hdfsPath))
        {
            return;
        }
        if (!deleteIfExists)
        {
            throw new DryadLinqException("Can't output to existing HDFS collection " + dataSetUri.AbsoluteUri);
        }
        client.DeleteDfsFile(hdfsPath);
    }
}
///
/// DataProvider for "partitioned file" datasets. A dataset is described by a partfile, a text
/// file laid out as follows:
///   line 0: the partition path prefix,
///   line 1: the partition count,
///   each later line: "partitionIndex,sizeInBytes,machine" (the machine field may instead be
///   "machine:path").
///
internal class PartitionedFileDataProvider : DataProvider
{
    public override string Scheme
    {
        get { return DataPath.PARTFILE_URI_SCHEME; }
    }

    ///
    /// Places temporary streams under the temporary-stream prefix directory at the root of the
    /// current working directory's path. Creates the parent directory and returns the
    /// location as a partfile-scheme URI.
    ///
    public override Uri GetTemporaryStreamUri(DryadLinqContext context, string path)
    {
        string wd = Directory.GetCurrentDirectory();
        path = Path.Combine(Path.GetPathRoot(wd), DataPath.TEMPORARY_STREAM_NAME_PREFIX, path);
        Directory.CreateDirectory(Path.GetDirectoryName(path));
        Uri uri = new Uri(this.Scheme + ":///" + path);
        return uri;
    }

    ///
    /// Metadata is not yet supported for partitioned files; always returns null (TBD).
    ///
    public override DryadLinqMetaData GetMetaData(DryadLinqContext context, Uri dataSetUri)
    {
        return null;
    }

    ///
    /// Reads the partfile. Returns the partition count from line 1 and the sum of the size
    /// fields of all partition lines.
    ///
    public override DryadLinqStreamInfo GetStreamInfo(DryadLinqContext context, Uri dataSetUri)
    {
        string[] lines = ReadPartitionFile(dataSetUri);
        Int32 parCnt = int.Parse(lines[1].Trim());
        Int64 size = 0;
        for (int i = 2; i < lines.Length; i++)
        {
            string[] fields = lines[i].Split(',');
            size += Int64.Parse(fields[1]);
        }
        return new DryadLinqStreamInfo(parCnt, size);
    }

    ///
    /// Writes all of source into a single partition file ("Part.00000000") inside a new,
    /// uniquely named directory. It then writes a partfile at dataSetUri that describes that
    /// one partition. The directory is context.PartitionUncPath if set, otherwise the
    /// partfile's directory. metaData and isTemp are not used by this provider.
    ///
    public override void Ingress(DryadLinqContext context,
                                 IEnumerable source,
                                 Uri dataSetUri,
                                 DryadLinqMetaData metaData,
                                 CompressionScheme compressionScheme,
                                 bool isTemp = false)
    {
        // Write the partition:
        string partDir = context.PartitionUncPath;
        if (partDir == null)
        {
            partDir = Path.GetDirectoryName(dataSetUri.LocalPath);
        }
        if (!Path.IsPathRooted(partDir))
        {
            // Make a relative directory rooted by prefixing '/'.
            partDir = Path.Combine("/", partDir);
        }
        partDir = Path.Combine(partDir, DryadLinqUtil.MakeUniqueName());
        Directory.CreateDirectory(partDir);
        string partPath = Path.Combine(partDir, "Part");
        string partFilePath = partPath + ".00000000";
        DryadLinqFactory factory = (DryadLinqFactory)DryadLinqCodeGen.GetFactory(context, typeof(T));
        using (FileStream fstream = new FileStream(partFilePath, FileMode.CreateNew, FileAccess.Write))
        {
            DryadLinqFileBlockStream nativeStream = new DryadLinqFileBlockStream(fstream, compressionScheme);
            DryadLinqRecordWriter writer = factory.MakeWriter(nativeStream);
            foreach (T rec in source)
            {
                writer.WriteRecordSync(rec);
            }
            writer.Close();
        }

        // Write the partfile: prefix, partition count (always 1 here), then "index,size,machine".
        FileInfo finfo = new FileInfo(partFilePath);
        using (StreamWriter writer = File.CreateText(dataSetUri.LocalPath))
        {
            writer.WriteLine(partPath);
            writer.WriteLine("1");
            writer.WriteLine("{0},{1},{2}", 0, finfo.Length, Environment.MachineName);
        }
    }

    ///
    /// Opens a stream that reads every partition listed in the partfile, in order. Partitions
    /// are read with CompressionScheme.None.
    ///
    public override Stream Egress(DryadLinqContext context, Uri dataSetUri)
    {
        string[] lines = ReadPartitionFile(dataSetUri);
        string[] filePathArray = this.GetPartitionPaths(lines);
        return new DryadLinqMultiFileStream(filePathArray, CompressionScheme.None);
    }

    ///
    /// If the partfile exists and deleteIfExists is false, throws DryadLinqException. If it
    /// exists and deleteIfExists is true, deletes every partition it lists (best effort) and
    /// then the partfile itself.
    ///
    public override void CheckExistence(DryadLinqContext context, Uri dataSetUri, bool deleteIfExists)
    {
        string fileName = dataSetUri.LocalPath;
        if (!File.Exists(fileName))
        {
            return;
        }
        if (!deleteIfExists)
        {
            throw new DryadLinqException("Can't output to existing Partitioned File collection " + dataSetUri.AbsoluteUri);
        }

        // Note: We delete all the partitions!
        string[] lines = File.ReadAllLines(fileName);
        foreach (string path in this.TryGetPartitionPaths(lines))
        {
            try
            {
                if (File.Exists(path))
                {
                    File.Delete(path);
                }
            }
            catch (Exception)
            {
                // Best effort: a partition that cannot be deleted (locked file, unreachable
                // share, bad path, ...) must not stop the remaining partitions from being
                // removed. Previously one failure silently aborted the whole loop.
            }
        }
        File.Delete(fileName);
    }

    // Reads the partfile at dataSetUri and checks that it has the two header lines and at
    // least one partition line; throws DryadLinqException otherwise.
    private static string[] ReadPartitionFile(Uri dataSetUri)
    {
        string[] lines = File.ReadAllLines(dataSetUri.LocalPath);
        if (lines.Length < 3)
        {
            throw new DryadLinqException("The partition file " + dataSetUri + " is malformed.");
        }
        return lines;
    }

    // Like GetPartitionPaths, but returns an empty array when the partfile is too malformed to
    // expand. This lets a best-effort delete still remove the partfile itself.
    private string[] TryGetPartitionPaths(string[] lines)
    {
        try
        {
            return this.GetPartitionPaths(lines);
        }
        catch (Exception)
        {
            return new string[0];
        }
    }

    // Expands partfile lines into one path per partition line (lines[2..]). lines[0] is the
    // path prefix and counts as a local path when it contains ':'. For each partition, the
    // third field selects the form:
    //   "machine:path"    -> \\machine\path
    //   local prefix      -> prefix.XXXXXXXX  (partition index as 8-digit upper-case hex)
    //   otherwise         -> \\machine\prefix.XXXXXXXX
    private string[] GetPartitionPaths(string[] lines)
    {
        bool isLocalPath = lines[0].Contains(':');
        string[] filePathArray = new string[lines.Length - 2];
        for (int i = 2; i < lines.Length; i++)
        {
            int idx = i - 2;
            string[] fields = lines[i].Split(',');
            if (fields[2].Contains(':'))
            {
                string[] parts = fields[2].Split(':');
                filePathArray[idx] = String.Format(@"\\{0}\{1}", parts[0], parts[1]);
            }
            else if (isLocalPath)
            {
                filePathArray[idx] = String.Format("{0}.{1:X8}", lines[0], idx);
            }
            else
            {
                filePathArray[idx] = String.Format(@"\\{0}\{1}.{2:X8}", fields[2], lines[0], idx);
            }
        }
        return filePathArray;
    }
}
///
/// DataProvider for datasets addressed by the WASB URI scheme. Everything except the scheme
/// is inherited unchanged from HdfsDataProvider.
/// NOTE(review): CheckExistence is inherited, so it builds a WebHdfsClient from the URI host
/// with fixed ports. Confirm that is valid for WASB URIs.
///
internal class WasbDataProvider : HdfsDataProvider
{
    public override string Scheme { get { return DataPath.WASB_URI_SCHEME; } }
}
///
/// DataProvider for collections stored in Azure blob storage.
///
internal class AzureBlobDataProvider : DataProvider
{
    public override string Scheme
    {
        get { return DataPath.AZUREBLOB_URI_SCHEME; }
    }

    ///
    /// Builds a temporary-stream URI on the cluster's default store, named with the
    /// temporary-stream prefix followed by path.
    ///
    public override Uri GetTemporaryStreamUri(DryadLinqContext context, string path)
    {
        return context.Cluster.MakeDefaultUri(DataPath.TEMPORARY_STREAM_NAME_PREFIX + path);
    }

    ///
    /// Rewrites an Azure blob URI. It adds a "key" query parameter from the context when the
    /// URI carries no account key. For any access other than Write on LineRecord data, it also
    /// adds a "seekBoundaries" query parameter.
    /// access defaults to FileAccess.Read, matching the base declaration.
    ///
    public override Uri RewriteUri(DryadLinqContext context, Uri dataSetUri, FileAccess access = FileAccess.Read)
    {
        string account, key, container, blob;
        AzureUtils.FromAzureUri(dataSetUri, out account, out key, out container, out blob);
        UriBuilder builder = new UriBuilder(dataSetUri);
        NameValueCollection query = System.Web.HttpUtility.ParseQueryString(builder.Query);
        if (key == null)
        {
            query["key"] = context.AzureAccountKey(account);
        }
        if (access != FileAccess.Write &&
            typeof(T) == typeof(Microsoft.Research.DryadLinq.LineRecord))
        {
            query["seekBoundaries"] = "Microsoft.Research.DryadLinq.LineRecord";
        }
        builder.Query = query.ToString();
        return builder.Uri;
    }

    ///
    /// Metadata is not yet supported for Azure blobs; always returns null (TBD).
    ///
    public override DryadLinqMetaData GetMetaData(DryadLinqContext context, Uri dataSetUri)
    {
        return null;
    }

    ///
    /// Returns the collection's partition count and total length. Throws DryadLinqException if
    /// the collection does not exist or if the Azure query fails.
    ///
    public override DryadLinqStreamInfo GetStreamInfo(DryadLinqContext context, Uri dataSetUri)
    {
        Int32 parCnt = 1;
        Int64 size = -1;
        try
        {
            AzureCollectionPartition partition = new AzureCollectionPartition(dataSetUri);
            if (!partition.IsCollectionExists())
            {
                throw new DryadLinqException("Input collection " + dataSetUri + " does not exist");
            }
            parCnt = partition.GetPartition().Count();
            size = partition.TotalLength;
        }
        catch (DryadLinqException)
        {
            // Already descriptive (e.g. "does not exist"); don't bury it in a second wrapper.
            throw;
        }
        catch (Exception e)
        {
            throw new DryadLinqException("Can't get Azure stream info for " + dataSetUri, e);
        }
        return new DryadLinqStreamInfo(parCnt, size);
    }

    ///
    /// Writes source to the blob named by dataSetUri. Compression is not supported; a
    /// scheme other than None throws DryadLinqException. metaData and isTemp are not used.
    ///
    public override void Ingress(DryadLinqContext context,
                                 IEnumerable source,
                                 Uri dataSetUri,
                                 DryadLinqMetaData metaData,
                                 CompressionScheme compressionScheme,
                                 bool isTemp = false)
    {
        string account, key, container, blob;
        AzureUtils.FromAzureUri(dataSetUri, out account, out key, out container, out blob);
        if (compressionScheme != CompressionScheme.None)
        {
            throw new DryadLinqException("Not implemented: writing to Azure temporary storage with compression enabled");
        }
        AzureDfsClient client = new AzureDfsClient(account, key, container);
        DryadLinqFactory factory = (DryadLinqFactory)DryadLinqCodeGen.GetFactory(context, typeof(T));
        // Ingress is synchronous by contract, so the async open must be waited on here.
        // GetAwaiter().GetResult() surfaces the original exception instead of an AggregateException.
        using (Stream stream = client.GetFileStreamWriterAsync(blob).GetAwaiter().GetResult())
        {
            DryadLinqBlockStream nativeStream = new DryadLinqBlockStream(stream);
            DryadLinqRecordWriter writer = factory.MakeWriter(nativeStream);
            foreach (T rec in source)
            {
                writer.WriteRecordSync(rec);
            }
            writer.Close();
        }
    }

    ///
    /// Opens a read stream over the collection via AzureCollectionPartition.GetReadStream.
    /// Throws DryadLinqException if the collection does not exist or cannot be opened.
    ///
    public override Stream Egress(DryadLinqContext context, Uri dataSetUri)
    {
        try
        {
            AzureCollectionPartition partition = new AzureCollectionPartition(dataSetUri);
            if (!partition.IsCollectionExists())
            {
                throw new DryadLinqException("Input collection " + dataSetUri + " does not exist");
            }
            return partition.GetReadStream();
        }
        catch (DryadLinqException)
        {
            // Already descriptive (e.g. "does not exist"); don't bury it in a second wrapper.
            throw;
        }
        catch (Exception e)
        {
            // Previously reused GetStreamInfo's "Can't get Azure stream info" message.
            throw new DryadLinqException("Can't read Azure collection " + dataSetUri, e);
        }
    }

    ///
    /// If the collection exists, deletes it when deleteIfExists is true and otherwise throws
    /// DryadLinqException.
    ///
    public override void CheckExistence(DryadLinqContext context, Uri dataSetUri, bool deleteIfExists)
    {
        AzureCollectionPartition partition = new AzureCollectionPartition(dataSetUri);
        if (partition.IsCollectionExists())
        {
            if (!deleteIfExists)
            {
                throw new DryadLinqException("Can't output to existing Azure Blob collection " + dataSetUri.AbsoluteUri);
            }
            partition.DeleteCollection();
        }
    }
}
}