/*
Copyright (c) Microsoft Corporation

All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
compliance with the License. You may obtain a copy of the License
at http://www.apache.org/licenses/LICENSE-2.0

THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER
EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR CONDITIONS OF TITLE,
FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR NON-INFRINGEMENT.

See the Apache Version 2.0 License for specific language governing permissions and
limitations under the License.
*/

namespace Microsoft.Research.Tools
{
    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Text;

    /// <summary>
    /// Exceptions thrown during operations on partitioned files.
    /// </summary>
    public sealed class PartitionedFileException : Exception
    {
        /// <summary>
        /// Create an exception for processing partitioned files.
        /// </summary>
        /// <param name="message">Message to throw.</param>
        public PartitionedFileException(string message)
            : base(message)
        {
        }
    }

    /// <summary>
    /// Representation for the metadata of a partitioned file (provider file://).
    /// </summary>
    /// <remarks>
    /// The on-disk metadata file, as written by <see cref="CreateMetadataFile"/>, has this layout:
    /// line 1 is the common path prefix, line 2 is the partition count, and each following line
    /// describes one partition as <c>number,size,machine[,machine...]</c>.
    /// </remarks>
    public class PartitionedFileMetadata
    {
        private readonly List<Partition> partitions;

        /// <summary>
        /// How many partitions does the table have?
        /// </summary>
        public int NumberOfPartitions
        {
            get { return this.partitions.Count; }
        }

        /// <summary>
        /// The partitions that compose the file.
        /// </summary>
        public IEnumerable<Partition> Partitions
        {
            get { return this.partitions; }
        }

        /// <summary>
        /// One partition of a replicated partitioned file.
        /// </summary>
        public class Partition
        {
            private readonly int partitionNumber;
            private readonly long size;
            private readonly List<UNCPathname> replicas;

            /// <summary>
            /// Copy information from another partition.
            /// </summary>
            /// <param name="other">Partition to copy.</param>
            /// <exception cref="ArgumentNullException"><paramref name="other"/> is null.</exception>
            public Partition(Partition other)
            {
                if (other == null)
                {
                    throw new ArgumentNullException("other");
                }

                this.partitionNumber = other.partitionNumber;
                this.size = other.size;
                // The replica list is copied so the two partitions can be extended independently.
                this.replicas = new List<UNCPathname>(other.replicas);
            }

            /// <summary>
            /// Create a Partition object by parsing a PartitionedTable metadata file line.
            /// </summary>
            /// <remarks>
            /// The line is <c>number,size,replica[,replica...]</c>, where each replica is either
            /// <c>machine</c> (the path is then derived from <paramref name="defaultprefix"/> and
            /// the partition number) or <c>machine:path</c>.
            /// </remarks>
            /// <param name="metadataline">A line from a metadata for a partitionedTable.</param>
            /// <param name="defaultprefix">Prefix for partition from metadata.</param>
            /// <exception cref="ArgumentNullException"><paramref name="metadataline"/> is null.</exception>
            /// <exception cref="PartitionedFileException">
            /// The line has fewer than three comma-separated fields, or a replica entry contains
            /// more than one ':'.
            /// </exception>
            internal Partition(string metadataline, string defaultprefix)
            {
                if (metadataline == null)
                {
                    throw new ArgumentNullException("metadataline");
                }

                string[] parts = metadataline.Split(',');
                if (parts.Length < 3)
                {
                    throw new PartitionedFileException("Cannot parse metadata line " + metadataline);
                }

                this.partitionNumber = int.Parse(parts[0]);
                this.size = long.Parse(parts[1]);
                this.replicas = new List<UNCPathname>(parts.Length - 2);

                // Every field after the first two describes one replica.
                for (int i = 2; i < parts.Length; i++)
                {
                    string[] md = parts[i].Split(':');
                    if (md.Length > 2)
                    {
                        throw new PartitionedFileException("Cannot understand pathname to file partition in " + metadataline);
                    }

                    string machine = md[0];
                    string path = md.Length == 2
                        ? md[1]
                        : CanonicalPath(defaultprefix, this.partitionNumber);
                    this.AddReplica(new UNCPathname(machine, path));
                }
            }

            /// <summary>
            /// Create a partition from basic information.
            /// </summary>
            /// <param name="partitionNo">Partition number.</param>
            /// <param name="partitionSize">Size of partition.</param>
            /// <param name="machine">Machine storing the partition.</param>
            /// <param name="prefix">Prefix of the partition (global per partitioned file)</param>
            public Partition(int partitionNo, long partitionSize, string machine, string prefix)
            {
                this.partitionNumber = partitionNo;
                this.size = partitionSize;
                this.replicas = new List<UNCPathname>();
                this.replicas.Add(new UNCPathname(machine, CanonicalPath(prefix, partitionNo)));
            }

            /// <summary>
            /// Build the canonical path of a partition: the prefix, a dot, and the partition
            /// number as eight upper-case hexadecimal digits.
            /// </summary>
            /// <param name="prefix">Common prefix of the partitioned file.</param>
            /// <param name="partitionNo">Partition number.</param>
            /// <returns>The canonical path of the partition.</returns>
            private static string CanonicalPath(string prefix, int partitionNo)
            {
                return string.Format("{0}.{1:X8}", prefix, partitionNo);
            }

            /// <summary>
            /// The unique machine for a single-replica partition.
            /// </summary>
            /// <exception cref="ArgumentException">The partition has more than one replica.</exception>
            public string Machine
            {
                get
                {
                    if (this.replicas.Count > 1)
                    {
                        throw new ArgumentException("query for machine in replicated partition");
                    }

                    return this.replicas[0].Machine;
                }
            }

            /// <summary>
            /// Add a new replica to a partition.
            /// </summary>
            /// <param name="path">Pathname to the replica to add.</param>
            public void AddReplica(UNCPathname path)
            {
                this.replicas.Add(path);
            }

            /// <summary>
            /// Representation of the partition as used in a metadata file.
            /// </summary>
            /// <returns>A string suitable for insertion in a metadata file.</returns>
            public override string ToString()
            {
                StringBuilder builder = new StringBuilder();
                builder.AppendFormat("{0},{1}", this.partitionNumber, this.size);
                foreach (UNCPathname p in this.replicas)
                {
                    builder.AppendFormat(",{0}:{1}", p.Machine, p.DirectoryAndFilename);
                }

                return builder.ToString();
            }

            /// <summary>
            /// The prefix of the partition; this only makes sense if the partition is in the canonical form.
            /// </summary>
            /// <exception cref="PartitionedFileException">
            /// The path of the first replica contains no '.', so there is no extension to strip.
            /// </exception>
            public string Prefix
            {
                get
                {
                    string filename = this.replicas[0].DirectoryAndFilename;

                    // Remove the file extension (everything from the last '.').
                    int index = filename.LastIndexOf('.');
                    if (index < 0)
                    {
                        // Substring(0, -1) would fail with an unhelpful range error.
                        throw new PartitionedFileException("Partition path has no extension, cannot compute prefix: " + filename);
                    }

                    return filename.Substring(0, index);
                }
            }

            /// <summary>
            /// This form is used when all partitions have the same prefix.
            /// </summary>
            /// <returns>A short description of the list of replicas.</returns>
            public string ToShortString()
            {
                StringBuilder builder = new StringBuilder();
                builder.AppendFormat("{0},{1}", this.partitionNumber, this.size);
                foreach (UNCPathname p in this.replicas)
                {
                    builder.AppendFormat(",{0}", p.Machine);
                }

                return builder.ToString();
            }

            /// <summary>
            /// One of the replicas.
            /// </summary>
            /// <param name="index">The index of the replica.</param>
            /// <returns>The index-th replica.</returns>
            public UNCPathname Replica(int index)
            {
                return this.replicas[index];
            }

            /// <summary>
            /// Partition number.
            /// </summary>
            public int Number
            {
                get { return this.partitionNumber; }
            }

            /// <summary>
            /// Partition size.
            /// </summary>
            public long Size
            {
                get { return this.size; }
            }

            /// <summary>
            /// Number of replicas of this partition.
            /// </summary>
            public int NumberOfReplicas
            {
                get { return this.replicas.Count; }
            }
        }

        /// <summary>
        /// Create a partitioned table to represent a list of partitions.
        /// </summary>
        /// <param name="partitions">List of partitions to represent.</param>
        public PartitionedFileMetadata(IEnumerable<Partition> partitions)
        {
            this.partitions = partitions.ToList();
        }

        /// <summary>
        /// Create an empty metadata.
        /// </summary>
        public PartitionedFileMetadata()
        {
            this.partitions = new List<Partition>();
        }

        /// <summary>
        /// Add a new partition to the file.
        /// </summary>
        /// <param name="partition">Partition to add.</param>
        public void Add(Partition partition)
        {
            this.partitions.Add(partition);
        }

        /// <summary>
        /// Create a PartitionedTable metadata file from a list of partitions.
        /// </summary>
        /// <remarks>
        /// The prefix written to the file is taken from the first partition only; the other
        /// partitions are written in their short form (number, size and machines).
        /// </remarks>
        /// <param name="metadataFile">Pathname for file containing the metadata.</param>
        /// <returns>The URI to use to read this metadata file.</returns>
        /// <exception cref="InvalidOperationException">There are no partitions to describe.</exception>
        public string CreateMetadataFile(UNCPathname metadataFile)
        {
            if (this.partitions.Count == 0)
            {
                throw new InvalidOperationException("Cannot create a metadata file for a partitioned file without partitions");
            }

            // compute the prefix of the first partition
            string prefix = this.partitions[0].Prefix;

            string path = metadataFile.ToString();
            Utilities.EnsureDirectoryExistsForFile(path);

            // The using block closes the file even when writing fails part-way.
            using (StreamWriter sw = new StreamWriter(path))
            {
                sw.WriteLine(prefix);
                sw.WriteLine(this.partitions.Count);
                foreach (Partition p in this.partitions)
                {
                    sw.WriteLine(p.ToShortString());
                }
            }

            return Uri.UriSchemeFile + @"://" + metadataFile;
        }

        /// <summary>
        /// Read the metadata from a partitioned file.
        /// </summary>
        /// <param name="metadataFile">Metadata file to read.</param>
        /// <exception cref="InvalidDataException">
        /// The partition count is missing or negative, or the file ends before the announced
        /// number of partition lines has been read.
        /// </exception>
        public PartitionedFileMetadata(UNCPathname metadataFile)
        {
            // The using block releases the file handle on success and on parse errors alike.
            using (StreamReader sr = new StreamReader(metadataFile.ToString()))
            {
                string defaultPrefix = sr.ReadLine();
                string str = sr.ReadLine();
                if (string.IsNullOrEmpty(str))
                {
                    throw new InvalidDataException("Expected a partitioned count, found none");
                }

                int partitionCount = int.Parse(str);
                if (partitionCount < 0)
                {
                    throw new InvalidDataException("Invalid partition count " + partitionCount);
                }

                this.partitions = new List<Partition>(partitionCount);
                for (int i = 0; i < partitionCount; i++)
                {
                    // ReadLine returns null at end of file, i.e. the file is truncated.
                    string line = sr.ReadLine();
                    if (line == null)
                    {
                        throw new InvalidDataException(
                            string.Format("Expected {0} partitions, found only {1}", partitionCount, i));
                    }

                    this.Add(new Partition(line, defaultPrefix));
                }
            }
        }

        /// <summary>
        /// All the machines referred in the partitioned file.
        /// </summary>
        /// <remarks>
        /// Machines of every replica are included, so replicated partitions are supported.
        /// </remarks>
        /// <returns>The distinct machine names, in sorted order.</returns>
        public IEnumerable<string> AllMachines()
        {
            return this.partitions
                .SelectMany(part => Enumerable.Range(0, part.NumberOfReplicas)
                                              .Select(i => part.Replica(i).Machine))
                .Distinct()
                .OrderBy(machine => machine);
        }
    }
}