/* Copyright (c) Microsoft Corporation All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR NON-INFRINGEMENT. See the Apache Version 2.0 License for specific language governing permissions and limitations under the License. */ using System; using System.Collections.Generic; using System.IO; using System.IO.Compression; using System.Linq; using System.Text; using System.Diagnostics; namespace Microsoft.Research.DryadLinq { internal class DryadLinqMultiReaderStream : Stream { private DryadLinqBinaryReader[] m_readerArray; private DryadLinqBinaryReader m_curReader; private Int32 m_nextStreamIdx; public DryadLinqMultiReaderStream(DryadLinqBinaryReader[] streamArray) { this.m_readerArray = streamArray; this.m_curReader = streamArray[0]; this.m_nextStreamIdx = 1; } public override bool CanRead { get { return true; } } public override bool CanWrite { get { return false; } } public override bool CanSeek { get { return false; } } public override long Length { get { long len = 0; for (int i = 0; i < this.m_readerArray.Length; i++) { len += this.m_readerArray[i].GetTotalLength(); } return len; } } public override long Position { get { throw new DryadLinqException(DryadLinqErrorCode.PositionNotSupported, SR.PositionNotSupported); } set { throw new DryadLinqException(DryadLinqErrorCode.PositionNotSupported, SR.PositionNotSupported); } } protected override void Dispose(bool disposing) { try { foreach (DryadLinqBinaryReader s in this.m_readerArray) { s.Close(); } } finally { base.Dispose(disposing); } } public override void Flush() { } public override int Read(byte[] buffer, int offset, int count) { while (true) { int n = this.m_curReader.ReadBytes(buffer, offset, count); if (n != 0) return n; if (this.m_nextStreamIdx == this.m_readerArray.Length) return 0; this.m_curReader = this.m_readerArray[this.m_nextStreamIdx++]; } } public override int ReadByte() { while (true) { int b = this.m_curReader.ReadUByte(); if (b != -1) return b; if (this.m_nextStreamIdx == this.m_readerArray.Length) return -1; this.m_curReader = this.m_readerArray[this.m_nextStreamIdx++]; } } public override long Seek(long offset, SeekOrigin origin) { throw new DryadLinqException(DryadLinqErrorCode.SeekNotSupported, SR.SeekNotSupported); } public override void SetLength(long value) { throw new DryadLinqException(DryadLinqErrorCode.SetLengthNotSupported, SR.SetLengthNotSupported); } public override void Write(byte[] buffer, int offset, int count) { throw new DryadLinqException(DryadLinqErrorCode.WriteNotSupported, SR.WriteNotSupported); } public override void WriteByte(byte value) { throw new DryadLinqException(DryadLinqErrorCode.WriteByteNotSupported, SR.WriteByteNotSupported); } } internal class DryadLinqMultiFileStream : Stream { private const int DefaultBuffSize = 8192 * 128; private string[] m_filePathArray; private CompressionScheme m_compressionScheme; private Stream m_curStream; private int m_nextIndex; internal DryadLinqMultiFileStream(string[] filePathArray, CompressionScheme scheme) { this.m_filePathArray = filePathArray; this.m_compressionScheme = scheme; this.m_nextIndex = 0; this.InitNextStream(); } private void InitNextStream() { this.m_curStream = null; if (this.m_nextIndex < this.m_filePathArray.Length) { FileOptions options = FileOptions.SequentialScan; this.m_curStream = new FileStream(this.m_filePathArray[this.m_nextIndex], FileMode.Open, FileAccess.Read, FileShare.Read, DefaultBuffSize, options); if (this.m_compressionScheme != CompressionScheme.None) { if (this.m_compressionScheme == CompressionScheme.Gzip) { this.m_curStream = new GZipStream(this.m_curStream, CompressionMode.Decompress); } else { throw new DryadLinqException(DryadLinqErrorCode.UnknownCompressionScheme, SR.UnknownCompressionScheme); } } this.m_nextIndex++; } } public override bool CanRead { get { return true; } } public override bool CanWrite { get { return false; } } public override bool CanSeek { get { return false; } } public override long Length { get { long len = 0; for (int i = 0; i < this.m_filePathArray.Length; i++) { len += new FileInfo(this.m_filePathArray[i]).Length; } return len; } } public override long Position { get { throw new DryadLinqException(DryadLinqErrorCode.PositionNotSupported, SR.PositionNotSupported); } set { throw new DryadLinqException(DryadLinqErrorCode.PositionNotSupported, SR.PositionNotSupported); } } public override void Flush() { } public override int Read(byte[] buffer, int offset, int count) { while (true) { int n = this.m_curStream.Read(buffer, offset, count); if (n != 0) return n; this.InitNextStream(); if (this.m_curStream == null) return 0; } } public override int ReadByte() { while (true) { int b = this.m_curStream.ReadByte(); if (b != -1) return b; this.InitNextStream(); if (this.m_curStream == null) return 0; } } public override long Seek(long offset, SeekOrigin origin) { throw new DryadLinqException(DryadLinqErrorCode.SeekNotSupported, SR.SeekNotSupported); } public override void SetLength(long value) { throw new DryadLinqException(DryadLinqErrorCode.SetLengthNotSupported, SR.SetLengthNotSupported); } public override void Write(byte[] buffer, int offset, int count) { throw new DryadLinqException(DryadLinqErrorCode.WriteNotSupported, SR.WriteNotSupported); } public override void WriteByte(byte value) { throw new DryadLinqException(DryadLinqErrorCode.WriteByteNotSupported, SR.WriteByteNotSupported); } } }