/*
Copyright (c) Microsoft Corporation

All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0

THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT
LIMITATION ANY IMPLIED WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE,
MERCHANTABLITY OR NON-INFRINGEMENT.

See the Apache Version 2.0 License for specific language governing permissions and
limitations under the License.
*/

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Text.RegularExpressions;
using Microsoft.Research.Peloponnese.Azure;
using Microsoft.Research.Peloponnese.Hdfs;
using Microsoft.Research.Peloponnese.Shared;
using Microsoft.Research.Peloponnese.WebHdfs;
using Microsoft.Research.Tools;
using Microsoft.WindowsAzure.Storage.Blob;

namespace Microsoft.Research.JobObjectModel
{
    /// <summary>
    /// An object stored on a cluster: either a file or a folder.
    /// </summary>
    public interface IClusterResidentObject
    {
        /// <summary>
        /// Whether this object is a folder (true) or a file (false).
        /// </summary>
        bool RepresentsAFolder { get; }

        /// <summary>
        /// Opens the contents of the object for reading; meaningful only when the object is a file.
        /// </summary>
        /// <param name="keepNewline">When true the reader does not strip the newlines.</param>
        /// <returns>A reader over the object contents.</returns>
        ISharedStreamReader GetStream(bool keepNewline);

        /// <summary>
        /// Lists the objects contained in this object when it is a folder.
        /// </summary>
        /// <param name="match">Shell-style pattern, similar to the argument of Directory.GetFiles().</param>
        /// <returns>The contained objects that match <paramref name="match"/>.</returns>
        IEnumerable<IClusterResidentObject> GetFilesAndFolders(string match);
/// <summary>
        /// Size of the object in bytes when it is a file; -1 when the size is not known.
        /// </summary>
        long Size { get; }

        /// <summary>
        /// The exception recorded when the object could not be manipulated, if any.
        /// </summary>
        Exception Exception { get; }

        /// <summary>
        /// Short name of the object.
        /// </summary>
        string Name { get; }

        /// <summary>
        /// When the object was created.
        /// </summary>
        DateTime CreationTime { get; }

        /// <summary>
        /// For a folder, the contained file with the given name.
        /// </summary>
        /// <param name="filename">File name within the folder.</param>
        /// <returns>The file within the folder.</returns>
        IClusterResidentObject GetFile(string filename);

        /// <summary>
        /// For a folder, the contained subfolder with the given name.
        /// </summary>
        /// <param name="foldername">Folder name within the folder.</param>
        /// <returns>The subfolder within the folder.</returns>
        IClusterResidentObject GetFolder(string foldername);

        /// <summary>
        /// When false this object is not cached; for a folder, nothing reached through it is cached either.
        /// </summary>
        bool ShouldCacheLocally { get; set; }
    }

    /// <summary>
    /// Common base class for cluster-resident objects whose contents may be cached locally.
    /// </summary>
    [Serializable]
    public abstract class CachedClusterResidentObject : IClusterResidentObject
    {
        /// <summary>
        /// Cluster on which the object resides.
        /// </summary>
        public ClusterConfiguration Config { get; protected set; }

        /// <summary>
        /// Local directory in which cluster files are cached; when null, files are not cached.
        /// </summary>
        public static string CacheDirectory { get; set; }

        /// <summary>
        /// When false this object is not cached.
        /// </summary>
        public bool ShouldCacheLocally { get; set; }

        /// <summary>
        /// Path of the local cached copy of the object; null when the object is not cached.
        /// </summary>
        public string LocalCachePath { get; protected set; }

        /// <summary>
        /// Job that owns the files.
        /// </summary>
        public DryadLinqJobSummary Job { get; protected set; }

        /// <summary>
        /// For every job, the set of local cache files that belong to it.
        /// </summary>
        private static readonly Dictionary<DryadLinqJobSummary, HashSet<string>> perJobFiles =
            new Dictionary<DryadLinqJobSummary, HashSet<string>>();
/// <summary>
        /// Initialize an empty cached cluster resident object.
        /// </summary>
        /// <param name="config">Cluster where the file resides.</param>
        /// <param name="job">Job who owns these files.</param>
        protected CachedClusterResidentObject(ClusterConfiguration config, DryadLinqJobSummary job)
        {
            this.cacheWriter = null;
            this.tempFileName = null;
            this.Job = job;
            this.Config = config;
        }

        /// <summary>
        /// Record that the job owns this cached file.
        /// </summary>
        /// <param name="job">Job owning the cached file.</param>
        /// <param name="path">Cached file belonging to this job.</param>
        public static void RecordCachedFile(DryadLinqJobSummary job, string path)
        {
            HashSet<string> list;
            // One dictionary lookup instead of ContainsKey followed by the indexer.
            if (!perJobFiles.TryGetValue(job, out list))
            {
                list = new HashSet<string>();
                perJobFiles.Add(job, list);
            }
            list.Add(path);
        }

        /// <summary>
        /// Record the creation of this particular cluster resident object so it can be retrieved from the job summary.
        /// </summary>
        /// <param name="file">Object to record; ignored when it is not associated with a job.</param>
        /// <exception cref="ClusterException">The object has a job but no LocalCachePath.</exception>
        private static void Record(CachedClusterResidentObject file)
        {
            if (file.Job == null)
                return;
            if (string.IsNullOrEmpty(file.LocalCachePath))
                throw new ClusterException("Missing expected LocalCachePath");
            CachedClusterResidentObject.RecordCachedFile(file.Job, file.LocalCachePath);
        }

        /// <summary>
        /// Get all the cached files associated with a given job.
        /// </summary>
        /// <param name="job">Job with cached files.</param>
        /// <returns>An iterator over all cached files belonging to this job (empty if none were recorded).</returns>
        public static IEnumerable<string> CachedJobFiles(DryadLinqJobSummary job)
        {
            HashSet<string> files;
            if (perJobFiles.TryGetValue(job, out files))
                return files;
            return Enumerable.Empty<string>();
        }

        /// <summary>
        /// Stream used to write to cache.
        /// </summary>
        private StreamWriter cacheWriter;

        /// <summary>
        /// Cache to a temporary file, and then rename when the file is closed.
        /// </summary>
        private string tempFileName;

        /// <summary>
        /// Create a temporary file-backed local stream and remember it (and its file name) for <c>OnClose</c>.
        /// </summary>
        /// <returns>The writer to the temp stream created.</returns>
        protected StreamWriter CreateTempStream()
        {
            this.tempFileName = Path.GetTempFileName();
            this.cacheWriter = new StreamWriter(this.tempFileName);
            return this.cacheWriter;
        }

        /// <summary>
        /// True if this is a folder.
        /// </summary>
        public virtual bool RepresentsAFolder { get; protected set; }
/// <summary>
        /// Opens the local cached copy of the object, when one exists on disk.
        /// </summary>
        /// <param name="keepNewline">If true keep newlines.</param>
        /// <returns>A reader over the cached copy, or null when the object is not cached.</returns>
        public virtual ISharedStreamReader GetStream(bool keepNewline)
        {
            string cached = this.LocalCachePath;
            if (cached == null || !File.Exists(cached))
                return null;

            // Reading from the cache counts as using a cached file of this job.
            CachedClusterResidentObject.Record(this);
            return new FileSharedStreamReader(cached, keepNewline);
        }

        /// <summary>
        /// Lists the contained objects whose names match a pattern.
        /// </summary>
        /// <param name="match">Expression matching children.</param>
        /// <returns>The matching children.</returns>
        public abstract IEnumerable<IClusterResidentObject> GetFilesAndFolders(string match);

        /// <summary>
        /// Size in bytes of the local cached copy; -1 when there is no cached copy on disk.
        /// </summary>
        public virtual long Size
        {
            get
            {
                string cached = this.LocalCachePath;
                bool present = cached != null && File.Exists(cached);
                return present ? new FileInfo(cached).Length : -1;
            }
        }

        /// <summary>
        /// The exception recorded when the object could not be manipulated, if any.
        /// </summary>
        public Exception Exception { get; protected set; }

        /// <summary>
        /// Short name of the object.
        /// </summary>
        public virtual string Name { get; protected set; }

        /// <summary>
        /// When the object was created.
        /// </summary>
        public virtual DateTime CreationTime { get; protected set; }

        /// <summary>
        /// For a folder, the contained file with the given name.
        /// </summary>
        /// <param name="filename">File to find.</param>
        /// <returns>An object corresponding to the specified file.</returns>
        public abstract IClusterResidentObject GetFile(string filename);

        /// <summary>
        /// For a folder, the contained subfolder with the given name.
        /// </summary>
        /// <param name="foldername">Folder name within the folder.</param>
        /// <returns>The subfolder within the folder.</returns>
        public abstract IClusterResidentObject GetFolder(string foldername);
/// <summary>
        /// Called when the cached file has been completely read: moves the temporary copy into
        /// its place under <see cref="LocalCachePath"/> and records it for the job.
        /// </summary>
        protected void OnClose()
        {
            if (this.cacheWriter == null)
                return;

            // this.cacheWriter.Close(); -- the callee should have done this already
            try
            {
                Utilities.EnsureDirectoryExistsForFile(this.LocalCachePath);
                Utilities.Move(this.tempFileName, this.LocalCachePath);
                Trace.TraceInformation("Writing to cache {0}", this.LocalCachePath);
                CachedClusterResidentObject.Record(this);
            }
            catch (Exception e)
            {
                Trace.TraceInformation("Exception {0} during move", e.Message);
                // Do not leave orphaned temporary files behind when the move fails;
                // cleanup is best-effort and must not turn into a new failure.
                try
                {
                    if (this.tempFileName != null && File.Exists(this.tempFileName))
                        File.Delete(this.tempFileName);
                }
                catch (Exception cleanup)
                {
                    Trace.TraceInformation("Exception {0} deleting temporary file {1}", cleanup.Message, this.tempFileName);
                }
            }

            this.tempFileName = null;
            this.cacheWriter = null;
        }
    }

    /// <summary>
    /// A cluster-resident object, accessed through its UNC pathname.
    /// </summary>
    [Serializable]
    public class UNCFile : CachedClusterResidentObject
    {
        /// <summary>
        /// Path to the object, if it is accessed through a path; could be null.
        /// </summary>
        public UNCPathname Pathname { get; protected set; }

        /// <summary>
        /// True if the object is a folder. An object that only carries an exception
        /// (its <see cref="Pathname"/> is null) is not a folder.
        /// </summary>
        public override bool RepresentsAFolder
        {
            get { return this.Pathname != null && this.Pathname.IsDirectory; }
        }

        /// <summary>
        /// Create a cluster resident object corresponding to a given pathname.
        /// </summary>
        /// <param name="config">Cluster where the file resides.</param>
        /// <param name="job">Job who owns this file.</param>
        /// <param name="path">Path to the cluster-resident object.</param>
        /// <param name="shouldCache">If true the file should be cached.</param>
        public UNCFile(ClusterConfiguration config, DryadLinqJobSummary job, UNCPathname path, bool shouldCache)
            : base(config, job)
        {
            this.Pathname = path;
            this.Exception = null;
            // Must be assigned before CachePath, which consults it.
            this.ShouldCacheLocally = shouldCache;
            this.LocalCachePath = this.CachePath(this.Pathname);
        }

        /// <summary>
        /// From the path extract a path to a filename in the local cache.
        /// </summary>
        /// <param name="path">Path that is to be cached.</param>
        /// <returns>A local pathname, or null if the file should not be cached.</returns>
        private string CachePath(UNCPathname path)
        {
            // NOTE(review): no local cache layout is implemented for UNC files; regardless of
            // CacheDirectory and ShouldCacheLocally no cache path is produced, so UNC files are
            // never cached locally.
            return null;
        }
/// <summary>
        /// Create a cluster-resident object that only contains an exception.
        /// </summary>
        /// <param name="ex">Exception that occurred when building the object.</param>
        public UNCFile(Exception ex)
            : base(null, null)
        {
            this.Exception = ex;
            this.Pathname = null;
            this.ShouldCacheLocally = false;
        }

        /// <summary>
        /// The stream with the file contents: the local cached copy if it exists, otherwise the
        /// file itself (copied into a temporary cache file while reading when caching applies).
        /// </summary>
        /// <param name="keepNewlines">If true keep the newlines.</param>
        /// <returns>A stream reader; failures are reported through a reader built from the exception.</returns>
        public override ISharedStreamReader GetStream(bool keepNewlines)
        {
            // An object that only carries an exception has no path to open: report the
            // original failure rather than a NullReferenceException from the missing path.
            if (this.Exception != null)
                return new FileSharedStreamReader(this.Exception);

            try
            {
                if (!this.RepresentsAFolder)
                {
                    ISharedStreamReader baseStream = base.GetStream(keepNewlines);
                    if (baseStream != null)
                    {
                        // file is cached
                        Trace.TraceInformation("Reading from local cache {0}", this.LocalCachePath);
                        return baseStream;
                    }
                }

                if (this.LocalCachePath != null && this.ShouldCacheLocally)
                {
                    // cache it
                    if (this.RepresentsAFolder)
                        throw new ClusterException("Cannot cache folders");
                    StreamWriter writer = this.CreateTempStream();
                    return new FileSharedStreamReader(this.Pathname.ToString(), writer, keepNewlines, this.OnClose);
                }

                // dont cache it
                return new FileSharedStreamReader(this.Pathname.ToString(), keepNewlines);
            }
            catch (Exception ex)
            {
                return new FileSharedStreamReader(ex);
            }
        }
/// <summary>
        /// Enumerates the matching subfolders of this folder, followed by the matching files.
        /// A listing failure is reported as a single exception-carrying object.
        /// </summary>
        /// <param name="match">Pattern to match.</param>
        /// <returns>The matching objects; nothing when this object is not a folder.</returns>
        public override IEnumerable<IClusterResidentObject> GetFilesAndFolders(string match)
        {
            if (!this.RepresentsAFolder)
                yield break;

            // An iterator cannot yield inside a try block that has a catch clause, so a
            // listing failure is captured here and reported after the try.
            Exception failure = null;
            string[] subfolders = null;
            try
            {
                subfolders = Directory.GetDirectories(this.Pathname.ToString(), match);
            }
            catch (Exception ex)
            {
                failure = ex;
            }

            if (failure != null)
            {
                yield return new UNCFile(failure);
                yield break;
            }

            foreach (string subfolder in subfolders)
            {
                var childPath = new UNCPathname(this.Pathname);
                // ReSharper disable once AssignNullToNotNullAttribute
                childPath.Directory = Path.Combine(childPath.Directory, Path.GetFileName(subfolder));
                yield return new UNCFile(this.Config, this.Job, childPath, this.ShouldCacheLocally);
            }

            string[] children = null;
            try
            {
                children = Directory.GetFiles(this.Pathname.ToString(), match);
            }
            catch (Exception ex)
            {
                failure = ex;
            }

            if (failure != null)
            {
                yield return new UNCFile(failure);
                yield break;
            }

            foreach (string child in children)
            {
                var childPath = new UNCPathname(this.Pathname) { Filename = Path.GetFileName(child) };
                yield return new UNCFile(this.Config, this.Job, childPath, this.ShouldCacheLocally);
            }
        }

        /// <summary>
        /// For a folder, the contained file with the given name.
        /// </summary>
        /// <param name="filename">File to find; only its last path component is used.</param>
        /// <returns>An object corresponding to the specified file.</returns>
        /// <exception cref="InvalidOperationException">This object is not a folder.</exception>
        public override IClusterResidentObject GetFile(string filename)
        {
            if (!this.RepresentsAFolder)
                throw new InvalidOperationException("Cannot find file within non-folder");

            var filePath = new UNCPathname(this.Pathname) { Filename = Path.GetFileName(filename) };
            return new UNCFile(this.Config, this.Job, filePath, this.ShouldCacheLocally);
        }

        /// <summary>
        /// Creation time of the file or folder; DateTime.MinValue for an exception-carrying object.
        /// </summary>
        public override DateTime CreationTime
        {
            get
            {
                if (this.Exception != null)
                    return DateTime.MinValue;

                bool isFolder = this.RepresentsAFolder;
                string location = this.Pathname.ToString();
                return isFolder ? Directory.GetCreationTime(location) : File.GetCreationTime(location);
            }
        }
/// <summary>
        /// For a folder object, returns the contained folder with the specified name.
        /// </summary>
        /// <param name="foldername">Folder name within the folder.</param>
        /// <returns>The subfolder within the folder.</returns>
        /// <exception cref="InvalidOperationException">This object is not a folder.</exception>
        public override IClusterResidentObject GetFolder(string foldername)
        {
            if (!this.RepresentsAFolder)
                throw new InvalidOperationException("Cannot find folder within non-folder");
            UNCPathname dirpath = new UNCPathname(this.Pathname);
            dirpath.Directory = Path.Combine(dirpath.Directory, foldername);
            return new UNCFile(this.Config, this.Job, dirpath, this.ShouldCacheLocally);
        }

        /// <summary>
        /// Size of the file: the cached copy's size if present, else the file's size, else -1.
        /// </summary>
        /// <exception cref="ClusterException">This object is a folder.</exception>
        public override long Size
        {
            get
            {
                // An object that only carries an exception has no file to measure.
                if (this.Pathname == null)
                    return -1;
                if (this.RepresentsAFolder)
                    throw new ClusterException("Cannot get size of a folder");
                if (this.LocalCachePath != null && File.Exists(this.LocalCachePath))
                {
                    FileInfo info = new FileInfo(this.LocalCachePath);
                    return info.Length;
                }
                if (File.Exists(this.Pathname.ToString()))
                {
                    FileInfo info = new FileInfo(this.Pathname.ToString());
                    return info.Length;
                }
                return -1;
            }
        }

        /// <summary>
        /// String representation of the file.
        /// </summary>
        /// <returns>A string describing the file, or the exception message for an exception-carrying object.</returns>
        public override string ToString()
        {
            if (this.Exception != null)
                return "Exception: " + this.Exception.Message;
            return this.Pathname.ToString();
        }

        /// <summary>
        /// Short name of the object: the last directory component for a folder, the file name otherwise.
        /// </summary>
        public override string Name
        {
            get
            {
                if (this.Exception != null)
                    return "Exception";
                else if (this.Pathname.IsDirectory)
                    return Path.GetFileName(this.Pathname.Directory);
                else
                    return this.Pathname.Filename;
            }
        }
    }

    /// <summary>
    /// A wrapper around a folder on a cached cluster. Since folders are not cached, we use this trick to find files in the cache.
    /// </summary>
    [Serializable]
    public class FolderInCachedCluster : CachedClusterResidentObject
    {
        /// <summary>
        /// Original folder which is cached.
        /// </summary>
        public CachedClusterResidentObject OriginalFolder { get; protected set; }
/// <summary>
        /// Create a wrapper around a folder in the cache.
        /// </summary>
        /// <param name="folder">Folder to represent.</param>
        /// <exception cref="ArgumentException"><paramref name="folder"/> is not a folder.</exception>
        public FolderInCachedCluster(CachedClusterResidentObject folder)
            : base(folder.Config, folder.Job)
        {
            if (!folder.RepresentsAFolder)
                throw new ArgumentException(folder + " is not a folder");
            this.OriginalFolder = folder;
            // ReSharper disable once DoNotCallOverridableMethodsInConstructor
            this.RepresentsAFolder = true;
        }

        /// <summary>
        /// The contents of the folder, discovered by listing the original folder's local cache directory.
        /// </summary>
        /// <param name="match">Pattern to match.</param>
        /// <returns>The matching objects: subfolders first, then files.</returns>
        public override IEnumerable<IClusterResidentObject> GetFilesAndFolders(string match)
        {
            if (!this.RepresentsAFolder)
                yield break;

            string[] dirs = null, files = null;
            Exception exception = null;
            try
            {
                dirs = Directory.GetDirectories(this.OriginalFolder.LocalCachePath, match);
            }
            catch (Exception ex)
            {
                exception = ex;
            }

            if (exception != null)
            {
                yield return new UNCFile(exception);
                yield break;
            }

            foreach (string dir in dirs)
            {
                // Directory.GetDirectories returns full local cache paths; the original folder
                // expects a name relative to itself, exactly as the file loop below passes.
                IClusterResidentObject folder = this.OriginalFolder.GetFolder(Path.GetFileName(dir));
                yield return new FolderInCachedCluster(folder as CachedClusterResidentObject);
            }

            try
            {
                files = Directory.GetFiles(this.OriginalFolder.LocalCachePath, match);
            }
            catch (Exception ex)
            {
                exception = ex;
            }

            if (exception != null)
            {
                yield return new UNCFile(exception);
                yield break;
            }

            foreach (string file in files)
            {
                IClusterResidentObject originalFile = this.OriginalFolder.GetFile(Path.GetFileName(file));
                yield return originalFile;
            }
        }

        /// <summary>
        /// The file with the specified name within the folder.
        /// </summary>
        /// <param name="filename">File to find.</param>
        /// <returns>An object corresponding to the specified file.</returns>
        public override IClusterResidentObject GetFile(string filename)
        {
            return this.OriginalFolder.GetFile(filename);
        }

        /// <summary>
        /// Returns the contained folder with the specified name, wrapped so that it is also read from the cache.
        /// </summary>
        /// <param name="foldername">Folder name within the folder.</param>
        /// <returns>The subfolder within the folder.</returns>
        public override IClusterResidentObject GetFolder(string foldername)
        {
            return new FolderInCachedCluster(this.OriginalFolder.GetFolder(foldername) as CachedClusterResidentObject);
        }
/// <summary>
        /// Delegates to the stream of the wrapped original folder.
        /// </summary>
        /// <param name="keepNewline">If true keep newlines.</param>
        /// <returns>Whatever the original folder returns.</returns>
        public override ISharedStreamReader GetStream(bool keepNewline)
        {
            CachedClusterResidentObject wrapped = this.OriginalFolder;
            return wrapped.GetStream(keepNewline);
        }

        /// <summary>
        /// Describes the folder by the description of the wrapped original folder.
        /// </summary>
        /// <returns>A string describing the folder.</returns>
        public override string ToString()
        {
            CachedClusterResidentObject wrapped = this.OriginalFolder;
            return wrapped.ToString();
        }
    }

    /// <summary>
    /// A constant string extracted from (part of) a file.
    /// </summary>
    [Serializable]
    public class ContentsOfFile : IClusterResidentObject
    {
        /// <summary>
        /// The strings making up the contents; null for an object that only carries an exception.
        /// </summary>
        IEnumerable<string> contents;

        /// <summary>
        /// Build an object from already-extracted contents.
        /// </summary>
        /// <param name="contents">Contents of the file.</param>
        /// <param name="creationTime">Time file was created.</param>
        /// <param name="name">File name.</param>
        public ContentsOfFile(IEnumerable<string> contents, DateTime creationTime, string name)
        {
            this.contents = contents;
            this.CreationTime = creationTime;
            this.Name = name;
            // -1 means "not computed yet"; Size fills it in on first use.
            this.size = -1;
            this.Exception = null;
        }

        /// <summary>
        /// These contents are never cached locally; assignments are ignored.
        /// </summary>
        public bool ShouldCacheLocally
        {
            get { return false; }
            set
            {
                // intentionally ignored
            }
        }

        /// <summary>
        /// Build an object recording that the file could not be obtained.
        /// </summary>
        /// <param name="ex">Exception that occurred.</param>
        public ContentsOfFile(Exception ex)
        {
            this.Exception = ex;
            this.contents = null;
            this.size = 0;
        }

        /// <summary>
        /// Always false: these contents are never a folder.
        /// </summary>
        public bool RepresentsAFolder
        {
            get { return false; }
        }

        /// <summary>
        /// A reader that iterates over the stored contents.
        /// </summary>
        /// <param name="keepNewlines">If true keep newlines.</param>
        /// <returns>The contents of this object.</returns>
        public ISharedStreamReader GetStream(bool keepNewlines)
        {
            IEnumerable<string> source = this.contents;
            return new StringIteratorStreamReader(source, keepNewlines);
        }

        /// <summary>
        /// Not supported: these contents are not a folder.
        /// </summary>
        /// <param name="match">Return only matching files.</param>
        /// <returns>Never returns.</returns>
        /// <exception cref="ClusterException">Always.</exception>
        public IEnumerable<IClusterResidentObject> GetFilesAndFolders(string match)
        {
            throw new ClusterException("Object is not a folder");
        }

        /// <summary>
        /// Cached total length of the contents; -1 until computed.
        /// </summary>
        private long size;

        /// <summary>
        /// Total length of all the content strings, computed on first use.
        /// </summary>
        public long Size
        {
            get
            {
                if (this.size != -1)
                    return this.size;

                this.size = 0;
                foreach (string chunk in this.contents)
                    this.size += chunk.Length;
                return this.size;
            }
        }

        /// <summary>
        /// Exception recorded for this object, if any.
        /// </summary>
        public Exception Exception { get; protected set; }
/// <summary>
        /// Name of these contents.
        /// </summary>
        public string Name { get; protected set; }

        /// <summary>
        /// Creation time supplied at construction.
        /// </summary>
        public DateTime CreationTime { get; protected set; }

        /// <summary>
        /// Not supported: these contents are not a folder.
        /// </summary>
        /// <param name="filename">File with specified name.</param>
        /// <returns>Never returns.</returns>
        /// <exception cref="ClusterException">Always.</exception>
        public IClusterResidentObject GetFile(string filename)
        {
            throw new ClusterException("Object is not a folder");
        }

        /// <summary>
        /// Not supported: these contents are not a folder.
        /// </summary>
        /// <param name="foldername">Folder with specified name.</param>
        /// <returns>Never returns.</returns>
        /// <exception cref="ClusterException">Always.</exception>
        public IClusterResidentObject GetFolder(string foldername)
        {
            throw new ClusterException("Object is not a folder");
        }
    }

    /// <summary>
    /// A file or folder on the local machine.
    /// </summary>
    public class LocalFile : IClusterResidentObject
    {
        /// <summary>
        /// Local path of the file or folder.
        /// </summary>
        private readonly string path;

        /// <summary>
        /// File information, created on first use by Size.
        /// </summary>
        private FileInfo info;

        /// <summary>
        /// A local file reachable with the specified path.
        /// </summary>
        /// <param name="path">Path to file.</param>
        public LocalFile(string path)
        {
            this.path = path;
            this.info = null;
            this.Exception = null;
        }

        /// <summary>
        /// True when the path names an existing directory.
        /// </summary>
        public bool RepresentsAFolder
        {
            get { return Directory.Exists(this.path); }
        }

        /// <summary>
        /// Opens the file for reading.
        /// </summary>
        /// <param name="keepNewlines">If true keep the newlines.</param>
        /// <returns>A reader over the file contents.</returns>
        public ISharedStreamReader GetStream(bool keepNewlines)
        {
            string location = this.path;
            return new FileSharedStreamReader(location, keepNewlines);
        }

        /// <summary>
        /// Lists the matching files of this folder, followed by the matching subfolders.
        /// The directory is only read when the result is enumerated.
        /// </summary>
        /// <param name="match">A shell expression (similar to the argument of Directory.GetFiles()).</param>
        /// <returns>An iterator over all contained objects that match the specified string.</returns>
        public IEnumerable<IClusterResidentObject> GetFilesAndFolders(string match)
        {
            string[] filePaths = Directory.GetFiles(this.path, match);
            foreach (string filePath in filePaths)
                yield return new LocalFile(filePath);

            string[] folderPaths = Directory.GetDirectories(this.path, match);
            foreach (string folderPath in folderPaths)
                yield return new LocalFile(folderPath);
        }
/// <summary>
        /// Size of the file in bytes; -1 when the path is not an existing file (for instance a folder).
        /// </summary>
        public long Size
        {
            get
            {
                if (this.info == null)
                    this.info = new FileInfo(this.path);
                // FileInfo.Length throws for folders and missing files; the interface contract
                // reports an unknown size as -1 instead.
                return this.info.Exists ? this.info.Length : -1;
            }
        }

        /// <summary>
        /// An exception is stored here if the object could not be manipulated.
        /// </summary>
        public Exception Exception { get; private set; }

        /// <summary>
        /// Short name of the object: the last component of its path.
        /// </summary>
        public string Name
        {
            get { return Path.GetFileName(this.path); }
        }

        /// <summary>
        /// Date when object was created.
        /// </summary>
        public DateTime CreationTime
        {
            get { return File.GetCreationTime(this.path); }
        }

        /// <summary>
        /// For a folder object, returns the contained file with the specified name.
        /// </summary>
        /// <param name="filename">File name within the folder.</param>
        /// <returns>The file within the folder.</returns>
        public IClusterResidentObject GetFile(string filename)
        {
            return new LocalFile(Path.Combine(this.path, filename));
        }

        /// <summary>
        /// For a folder object, returns the contained folder with the specified name.
        /// </summary>
        /// <param name="foldername">Folder name within the folder.</param>
        /// <returns>The subfolder within the folder.</returns>
        public IClusterResidentObject GetFolder(string foldername)
        {
            return new LocalFile(Path.Combine(this.path, foldername));
        }

        /// <summary>
        /// Local files are never cached; assignments are ignored.
        /// </summary>
        public bool ShouldCacheLocally
        {
            get { return false; }
            set
            {
                // noop
            }
        }

        /// <summary>
        /// Returns the local path of the file.
        /// </summary>
        /// <returns>A string that represents the current object.</returns>
        public override string ToString()
        {
            return this.path;
        }
    }

    /// <summary>
    /// A file residing on AzureDfs.
    /// </summary>
    public class AzureDfsFile : CachedClusterResidentObject
    {
        private string path;
        private AzureDfsClient client;

        /// <summary>
        /// If true the file is a DFS stream, otherwise it's an azure log.
        /// </summary>
        public bool IsDfsStream;
/// <summary>
        /// A file with the specified path.
        /// </summary>
        /// <param name="config">Cluster configuration.</param>
        /// <param name="job">Job accessing this file.</param>
        /// <param name="client">Azure client.</param>
        /// <param name="path">Path to the file.</param>
        /// <param name="canCache">True if the file can be cached (it is immutable for sure).</param>
        /// <param name="isFolder">If true this must be a folder.</param>
        public AzureDfsFile(ClusterConfiguration config, DryadLinqJobSummary job, AzureDfsClient client, string path, bool canCache, bool isFolder)
            : base(config, job)
        {
            this.client = client;
            this.path = path;
            this.ShouldCacheLocally = canCache;
            this.RepresentsAFolder = isFolder;
            this.size = -1;
            if (!String.IsNullOrEmpty(CacheDirectory))
                this.LocalCachePath = Path.Combine(CacheDirectory, this.path);
        }

        /// <summary>
        /// True if the object is a folder.
        /// </summary>
        public override bool RepresentsAFolder { get; protected set; }

        /// <summary>
        /// Create a File URI from a file path.
        /// </summary>
        /// <param name="config">Azure cluster configuration.</param>
        /// <param name="path">Path to file, relative to the container.</param>
        /// <returns>The File uri.</returns>
        /// <exception cref="InvalidOperationException">The path already starts with the container name.</exception>
        public static Uri UriFromPath(AzureDfsClusterConfiguration config, string path)
        {
            // Container names are identifiers, not linguistic text: compare ordinally.
            if (path.StartsWith(config.Container, StringComparison.Ordinal))
                throw new InvalidOperationException("Path contains container name");
            return Microsoft.Research.Peloponnese.Azure.Utils.ToAzureUri(config.AccountName, config.Container, path, null, config.AccountKey);
        }

        /// <summary>
        /// Create a path from a URI: its local path with slashes trimmed and a leading container name removed.
        /// </summary>
        /// <param name="config">Azure cluster configuration.</param>
        /// <param name="uri">URI of the file.</param>
        /// <returns>The path to the file.</returns>
        public static string PathFromUri(AzureDfsClusterConfiguration config, Uri uri)
        {
            string path = uri.LocalPath;
            path = path.Trim('/');
            if (path.StartsWith(config.Container, StringComparison.Ordinal))
                path = path.Substring(config.Container.Length);
            path = path.Trim('/');
            return path;
        }
/// <summary>
        /// Returns a stream over the contents of the object. The local cached copy is used when it
        /// exists; otherwise the data is read from Azure, and written to a temporary cache file while
        /// reading when caching is enabled.
        /// </summary>
        /// <param name="keepNewlines">If true keep the newlines.</param>
        /// <returns>A stream that can be used to access the object contents.</returns>
        public override ISharedStreamReader GetStream(bool keepNewlines)
        {
            ISharedStreamReader baseStream = base.GetStream(keepNewlines);
            if (baseStream != null)
            {
                // file is cached
                Trace.TraceInformation("Reading from local cache {0}", this.LocalCachePath);
                return baseStream;
            }

            Stream stream;
            if (this.IsDfsStream)
            {
                Uri uri = UriFromPath(this.Config as AzureDfsClusterConfiguration, this.path);
                stream = this.client.GetDfsStreamReader(uri);
            }
            else
            {
                string p = AzureDfsClusterStatus.GetBlobName(this.client.ContainerName, this.path);
                stream = new AzureLogReaderStream(
                    this.client.AccountName,
                    this.client.AccountKey,
                    this.client.ContainerName,
                    p);
            }

            // Buffer a tenth of the file, clamped to [1MB, 20MB]. Clamping is done on the long
            // value before narrowing, so very large files cannot overflow the int cast.
            const long MinBufferSize = 1024 * 1024;
            const long MaxBufferSize = 20 * 1024 * 1024;
            long sz = this.Size;
            int bufferSize = (int)MinBufferSize;
            if (sz >= 0)
                bufferSize = (int)Math.Min(MaxBufferSize, Math.Max(MinBufferSize, sz / 10));

            SimpleStreamReader reader = new SimpleStreamReader(stream, true, Encoding.UTF8, false, bufferSize);
            if (this.ShouldCacheLocally && this.LocalCachePath != null)
            {
                // cache it
                if (this.RepresentsAFolder)
                    throw new ClusterException("Cannot cache folders");
                StreamWriter writer = this.CreateTempStream();
                return new SharedStreamReader(reader, writer, keepNewlines, this.OnClose);
            }

            // dont cache it
            return new SharedStreamReader(reader, keepNewlines);
        }

        // Cache of the blobs listed inside this folder, filled lazily by PopulateCache;
        // each maps blob name to length.
        private Dictionary<string, long> blocks;
        private Dictionary<string, long> pages;

        /// <summary>
        /// Fill <see cref="blocks"/> and <see cref="pages"/> from the blob listing of this folder (only once).
        /// </summary>
        private void PopulateCache()
        {
            if (this.blocks != null)
                return;

            this.blocks = new Dictionary<string, long>();
            this.pages = new Dictionary<string, long>();
            // can happen when we are looking at cached results
            if (this.client == null)
                return;

            var cloudBlobdir = this.client.Container.GetDirectoryReference(this.path);
            var blobs = cloudBlobdir.ListBlobs();
            foreach (IListBlobItem item in blobs)
            {
                if (item is CloudBlockBlob)
                {
                    CloudBlockBlob blob = (CloudBlockBlob)item;
                    this.blocks.Add(blob.Name, blob.Properties.Length);
                }
                else if (item is CloudPageBlob)
                {
                    CloudPageBlob pageBlob = (CloudPageBlob)item;
                    // Properties.Length is not accurate for page blobs; the length is read from the
                    // "writePosition" metadata entry instead, and page blobs without it are skipped.
                    pageBlob.FetchAttributes();
                    var metadata = pageBlob.Metadata;
                    if (metadata.ContainsKey("writePosition"))
                    {
                        long sz;
                        if (Int64.TryParse(metadata["writePosition"], out sz))
                            this.pages.Add(pageBlob.Name, sz);
                    }
                }
                // CloudBlobDirectory items (subfolders) are not recorded.
            }
        }

        /// <summary>
        /// If the current object is a folder, it returns the contained objects.
        /// </summary>
        /// <param name="match">A shell expression (similar to the argument of Directory.GetFiles()).</param>
        /// <returns>An iterator over all contained objects that match the specified string.</returns>
        public override IEnumerable<IClusterResidentObject> GetFilesAndFolders(string match)
        {
            this.PopulateCache();
            AzureDfsClusterConfiguration azureConfig = this.Config as AzureDfsClusterConfiguration;
            Uri uri = UriFromPath(azureConfig, this.path);
            Console.WriteLine("AzureDfsFile.GetFileAndFolders({0}) -> {1}", this.path, uri);
            // The pattern does not depend on the child: build the regex once.
            Regex re = Utilities.RegexFromSearchPattern(match);
            foreach (var child in this.client.ExpandFileOrDirectory(uri))
            {
                string key = child.AbsolutePath;
                if (!re.IsMatch(key))
                    continue;

                // NOTE(review): the caches are keyed by blob name while the lookup uses the URI's
                // AbsolutePath; confirm the two forms agree, otherwise sizes stay -1.
                // Determined per child, so a child missing from both caches has unknown size
                // rather than inheriting the previous child's length.
                long length;
                bool isDfsStream;
                if (this.blocks.TryGetValue(key, out length))
                {
                    isDfsStream = true;
                }
                else
                {
                    isDfsStream = false;
                    if (!this.pages.TryGetValue(key, out length))
                        length = -1;
                }

                // NOTE(review): children are always created as files (isFolder == false).
                var file = new AzureDfsFile(this.Config, this.Job, this.client, PathFromUri(azureConfig, child), this.ShouldCacheLocally, false);
                file.IsDfsStream = isDfsStream;
                file.size = length;
                yield return file;
            }
        }

        private long size;

        /// <summary>
        /// Size of the object in bytes (if not a folder). The size can be -1 when it is unknown.
        /// </summary>
        public override long Size
        {
            get { return this.size; }
        }

        /// <summary>
        /// Short name of the object: the last component of its path.
        /// </summary>
        public override string Name
        {
            get { return Path.GetFileName(this.path); }
        }
/// <summary>
        /// Creation time of the object; no stored time is available, so the current time is reported.
        /// </summary>
        public override DateTime CreationTime
        {
            get { return DateTime.Now; }
        }

        /// <summary>
        /// For a folder object, returns the contained file with the specified name.
        /// </summary>
        /// <param name="filename">File name within the folder.</param>
        /// <returns>The file within the folder; its size is -1 when it is not a known blob.</returns>
        public override IClusterResidentObject GetFile(string filename)
        {
            this.PopulateCache();
            string combined = Path.Combine(this.path, filename);
            // Kept for its validation: throws InvalidOperationException when the path contains the container name.
            UriFromPath(this.Config as AzureDfsClusterConfiguration, combined);

            // Look up with the same key that is tested; previously the presence test used
            // 'combined' but the indexer used the URI's AbsolutePath, which threw
            // KeyNotFoundException whenever the two differed.
            long sz;
            bool isDfsStream;
            if (this.blocks.TryGetValue(combined, out sz))
            {
                isDfsStream = true;
            }
            else
            {
                isDfsStream = false;
                if (!this.pages.TryGetValue(combined, out sz))
                    sz = -1;
            }

            var file = new AzureDfsFile(this.Config, this.Job, this.client, combined, this.ShouldCacheLocally, false);
            file.IsDfsStream = isDfsStream;
            file.size = sz;
            return file;
        }

        /// <summary>
        /// For a folder object, returns the contained folder with the specified name.
        /// </summary>
        /// <param name="foldername">Folder name within the folder.</param>
        /// <returns>The subfolder within the folder.</returns>
        /// <exception cref="InvalidOperationException">The object found is not a folder.</exception>
        public override IClusterResidentObject GetFolder(string foldername)
        {
            // NOTE(review): GetFile always builds non-folder objects, so as written this always
            // throws; a real folder lookup for Azure blob prefixes appears to be missing.
            var file = this.GetFile(foldername);
            if (!file.RepresentsAFolder)
                throw new InvalidOperationException(foldername + " is not a folder");
            return file;
        }

        /// <summary>
        /// Returns the path of the object.
        /// </summary>
        /// <returns>A string that represents the current object.</returns>
        public override string ToString()
        {
            return this.path;
        }
    }

    /// <summary>
    /// A file residing on HDFS (accessed either using HDFS or WebHdfs).
    /// </summary>
    public class DfsFile : CachedClusterResidentObject
    {
        private Uri baseUri;
        private Uri uri;
        private HdfsClientBase client;
/// <summary>
        /// A file with the specified path.
        /// </summary>
        /// <param name="config">Cluster configuration.</param>
        /// <param name="jobFolderUri">Uri to base folder.</param>
        /// <param name="job">Job accessing this file.</param>
        /// <param name="client">HDFS (or WebHdfs) client used to access the file.</param>
        /// <param name="path">Path to the file, appended to <paramref name="jobFolderUri"/>.</param>
        /// <param name="canCache">True if the file can be cached (it is immutable for sure).</param>
        /// <param name="isFolder">If true this must be a folder.</param>
        public DfsFile(ClusterConfiguration config, Uri jobFolderUri, DryadLinqJobSummary job, HdfsClientBase client, string path, bool canCache, bool isFolder)
            : base(config, job)
        {
            this.client = client;
            this.Exception = null;
            this.baseUri = jobFolderUri;
            this.uri = UriFromPath(jobFolderUri, path);
            this.ShouldCacheLocally = canCache;
            this.RepresentsAFolder = isFolder;
            this.size = -1;
            Console.WriteLine("DfsFile Uri={0}", this.uri);
            if (!string.IsNullOrEmpty(CachedClusterResidentObject.CacheDirectory))
                this.LocalCachePath = Path.Combine(CachedClusterResidentObject.CacheDirectory, PathFromUri(this.baseUri, this.uri));
        }

        /// <summary>
        /// Create a File URI by appending a path to a base URI (with exactly one '/' between them).
        /// </summary>
        /// <param name="baseUri">Base URI pointing to the jobs folder.</param>
        /// <param name="path">Path to file.</param>
        /// <returns>The File uri.</returns>
        public static Uri UriFromPath(Uri baseUri, string path)
        {
            UriBuilder builder = new UriBuilder(baseUri);
            builder.Path = builder.Path.TrimEnd('/') + "/" + path.TrimStart('/');
            return builder.Uri;
        }

        /// <summary>
        /// Create a path from a URI, relative to a base URI when the base is a prefix of it.
        /// </summary>
        /// <param name="baseUri">Base URI (should be a prefix of the other uri).</param>
        /// <param name="uri">URI of an HDFS file.</param>
        /// <returns>The path to the file, with leading and trailing '/' removed.</returns>
        public static string PathFromUri(Uri baseUri, Uri uri)
        {
            string path = uri.PathAndQuery;
            // URI paths are not linguistic text: compare ordinally.
            if (path.StartsWith(baseUri.PathAndQuery, StringComparison.Ordinal))
                path = path.Substring(baseUri.PathAndQuery.Length);
            path = path.Trim('/');
            return path;
        }

        /// <summary>
        /// Create an object that only carries an exception.
        /// </summary>
        /// <param name="config">Cluster configuration.</param>
        /// <param name="job">Job accessing this file.</param>
        /// <param name="ex">Exception describing the failure.</param>
        private DfsFile(ClusterConfiguration config, DryadLinqJobSummary job, Exception ex)
            : base(config, job)
        {
            this.Exception = ex;
            // Like the public constructor: the size of this object is unknown.
            this.size = -1;
        }

        /// <summary>
        /// True if the object is a folder.
        /// </summary>
        public override bool RepresentsAFolder { get; protected set; }
/// <summary>
        /// Returns a stream over the contents of the object. The local cached copy is used when it
        /// exists; otherwise the data is read through the HDFS client, and written to a temporary
        /// cache file while reading when caching is enabled.
        /// </summary>
        /// <param name="keepNewlines">If true keep the newlines.</param>
        /// <returns>A stream that can be used to access the object contents.</returns>
        public override ISharedStreamReader GetStream(bool keepNewlines)
        {
            ISharedStreamReader baseStream = base.GetStream(keepNewlines);
            if (baseStream != null)
            {
                // file is cached
                Trace.TraceInformation("Reading from local cache {0}", this.LocalCachePath);
                return baseStream;
            }

            Stream stream = this.client.GetDfsStreamReader(this.uri);

            // Buffer a tenth of the file, clamped to [1MB, 20MB]. Clamping is done on the long
            // value before narrowing, so very large files cannot overflow the int cast.
            const long MinBufferSize = 1024 * 1024;
            const long MaxBufferSize = 20 * 1024 * 1024;
            long sz = this.Size;
            int bufferSize = (int)MinBufferSize;
            if (sz >= 0)
                bufferSize = (int)Math.Min(MaxBufferSize, Math.Max(MinBufferSize, sz / 10));

            SimpleStreamReader reader = new SimpleStreamReader(stream, true, Encoding.UTF8, false, bufferSize);
            if (this.ShouldCacheLocally && this.LocalCachePath != null)
            {
                // cache it
                if (this.RepresentsAFolder)
                    throw new ClusterException("Cannot cache folders");
                StreamWriter writer = this.CreateTempStream();
                return new SharedStreamReader(reader, writer, keepNewlines, this.OnClose);
            }

            // dont cache it
            return new SharedStreamReader(reader, keepNewlines);
        }
/// <summary>
        /// If the current object is a folder, it returns the contained objects, with their size and
        /// time taken from the file system status.
        /// </summary>
        /// <param name="match">A shell expression (similar to the argument of Directory.GetFiles()), matched against the path relative to the base folder.</param>
        /// <returns>An iterator over all contained objects that match the specified string.</returns>
        public override IEnumerable<IClusterResidentObject> GetFilesAndFolders(string match)
        {
            HashSet<Uri> folders = new HashSet<Uri>();
            foreach (var child in this.client.EnumerateSubdirectories(this.uri))
                folders.Add(child);

            Regex re = Utilities.RegexFromSearchPattern(match);
            foreach (var child in this.client.ExpandFileOrDirectory(this.uri))
            {
                string relativePath = PathFromUri(this.baseUri, child);
                if (!re.IsMatch(relativePath))
                    continue;

                bool isFolder = folders.Contains(child);
                var file = new DfsFile(this.Config, this.baseUri, this.Job, this.client, relativePath, this.ShouldCacheLocally, isFolder);
                long length;
                long time;
                this.client.GetFileStatus(child, out time, out length);
                file.size = length;
                file.CreationTime = TimeFromLong(time);
                yield return file;
            }
        }

        private long size;

        /// <summary>
        /// Size of the object in bytes (if not a folder). The size can be -1 when it is unknown.
        /// </summary>
        public override long Size
        {
            get { return this.size; }
        }

        /// <summary>
        /// Short name of the object: the last component of its path; "Exception" for an object
        /// that only carries an exception.
        /// </summary>
        public override string Name
        {
            get
            {
                if (this.uri == null)
                    return "Exception";
                return Path.GetFileName(PathFromUri(this.baseUri, this.uri));
            }
        }

        private static readonly DateTime origin = new DateTime(1970, 1, 1);

        /// <summary>
        /// Convert file status time into a DateTime.
        /// </summary>
        /// <param name="fileTime">File time obtained from client, in milliseconds after 1970-01-01.</param>
        /// <returns>A DateTime object.</returns>
        /// <remarks>NOTE(review): the client time is presumably UTC, but the result has
        /// DateTimeKind.Unspecified — confirm before comparing with local times.</remarks>
        public static DateTime TimeFromLong(long fileTime)
        {
            return origin + TimeSpan.FromMilliseconds(fileTime);
        }

        /// <summary>
        /// Date when object was created: the time recorded from the file status by
        /// GetFilesAndFolders, or the current time when none was recorded.
        /// </summary>
        public override DateTime CreationTime
        {
            get
            {
                // GetFilesAndFolders stores the status time through the inherited setter; the
                // previous getter ignored it and always reported DateTime.Now.
                DateTime recorded = base.CreationTime;
                return recorded == default(DateTime) ? DateTime.Now : recorded;
            }
        }

        /// <summary>
        /// For a folder object, returns the contained file with the specified name.
        /// </summary>
        /// <param name="filename">File name within the folder.</param>
        /// <returns>The file within the folder, or an object carrying an exception when zero or several entries match.</returns>
        public override IClusterResidentObject GetFile(string filename)
        {
            var matchingFiles = this.GetFilesAndFolders(filename).ToList();
            if (matchingFiles.Count == 1)
                return matchingFiles[0];
            string problem = matchingFiles.Count == 0 ? "No file named " : "Ambiguous name ";
            return new DfsFile(this.Config, this.Job, new ClusterException(problem + filename + " in " + this));
        }
/// <summary>
        /// For a folder object, returns the contained folder with the specified name.
        /// </summary>
        /// <param name="foldername">Folder name within the folder.</param>
        /// <returns>The subfolder within the folder.</returns>
        /// <exception cref="InvalidOperationException">No single folder with that name was found.</exception>
        public override IClusterResidentObject GetFolder(string foldername)
        {
            var file = this.GetFile(foldername);
            if (!file.RepresentsAFolder)
                throw new InvalidOperationException(foldername + " is not a folder");
            return file;
        }

        /// <summary>
        /// Returns the URI of the file, or the exception message for an object that only carries an exception.
        /// </summary>
        /// <returns>A string that represents the current object.</returns>
        public override string ToString()
        {
            // Objects built by the exception constructor have no URI; mirror UNCFile.ToString.
            if (this.uri == null && this.Exception != null)
                return "Exception: " + this.Exception.Message;
            return this.uri.ToString();
        }
    }
}