From 39f8ecd055da4d8216b1999c2a0ab1b972f7d6e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kat=20March=C3=A1n?= Date: Sat, 18 Jul 2020 10:14:50 -0700 Subject: [PATCH] mmap file writes and add unindexed writing to cache --- Cargo.toml | 1 + benches/benchmarks.rs | 24 +++++----- src/content/write.rs | 104 ++++++++++++++++++++++++++++++++++-------- src/put.rs | 88 +++++++++++++++++++++++++++++------ 4 files changed, 172 insertions(+), 45 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 412ffe6..0bde7f5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ either = "1.5.3" async-std = { version = "1.0.1", features = ["unstable"] } thiserror = "1.0.5" futures = "0.3.1" +memmap = "0.7.0" [dev-dependencies] async-attributes = "1.1.1" diff --git a/benches/benchmarks.rs b/benches/benchmarks.rs index 6defd34..3084833 100644 --- a/benches/benchmarks.rs +++ b/benches/benchmarks.rs @@ -194,18 +194,18 @@ fn write_hash_async(c: &mut Criterion) { criterion_group!( benches, - // baseline_read_sync, - // baseline_read_many_sync, - // baseline_read_async, - // baseline_read_many_async, - // read_hash_async, - // read_hash_many_async, - // read_async, + baseline_read_sync, + baseline_read_many_sync, + baseline_read_async, + baseline_read_many_async, + read_hash_async, + read_hash_many_async, + read_async, write_hash_async, - // read_hash_sync, - // read_hash_many_sync, - // read_sync, - // read_hash_async_big_data, - // read_hash_sync_big_data + read_hash_sync, + read_hash_many_sync, + read_sync, + read_hash_async_big_data, + read_hash_sync_big_data ); criterion_main!(benches); diff --git a/src/content/write.rs b/src/content/write.rs index 80825b8..a6079d8 100644 --- a/src/content/write.rs +++ b/src/content/write.rs @@ -9,20 +9,24 @@ use async_std::future::Future; use async_std::task::{self, Context, JoinHandle, Poll}; use futures::io::AsyncWrite; use futures::prelude::*; +use memmap::MmapMut; use ssri::{Algorithm, Integrity, IntegrityOpts}; use tempfile::NamedTempFile; use crate::content::path; use crate::errors::{Internal, Result}; +pub const MAX_MMAP_SIZE: usize = 1 * 1024 * 1024; + pub struct Writer { cache: PathBuf, builder: IntegrityOpts, + mmap: Option, tmpfile: NamedTempFile, } impl Writer { - pub fn new(cache: &Path, algo: Algorithm) -> Result { + pub fn new(cache: &Path, algo: Algorithm, size: Option) -> Result { let cache_path = cache.to_path_buf(); let mut tmp_path = cache_path.clone(); tmp_path.push("tmp"); @@ -30,10 +34,21 @@ impl Writer { .recursive(true) .create(&tmp_path) .to_internal()?; + let tmpfile = NamedTempFile::new_in(tmp_path).to_internal()?; + let mmap = if let Some(size) = size { + if size <= MAX_MMAP_SIZE { + unsafe { MmapMut::map_mut(tmpfile.as_file()).ok() } + } else { + None + } + } else { + None + }; Ok(Writer { cache: cache_path, builder: IntegrityOpts::new().algorithm(algo), - tmpfile: NamedTempFile::new_in(tmp_path).to_internal()?, + tmpfile, + mmap, }) } @@ -45,7 +60,13 @@ impl Writer { // Safe unwrap. cpath always has multiple segments .create(cpath.parent().unwrap()) .to_internal()?; - self.tmpfile.persist(cpath).to_internal()?; + let res = self.tmpfile.persist(&cpath).to_internal(); + if res.is_err() { + // We might run into conflicts sometimes when persisting files. + // This is ok. We can deal. Let's just make sure the destination + // file actually exists, and we can move on. + std::fs::metadata(cpath).to_internal()?; + } Ok(sri) } } @@ -53,7 +74,12 @@ impl Writer { impl Write for Writer { fn write(&mut self, buf: &[u8]) -> std::io::Result { self.builder.input(&buf); - self.tmpfile.write(&buf) + if let Some(mmap) = &mut self.mmap { + mmap.copy_from_slice(&buf); + Ok(buf.len()) + } else { + self.tmpfile.write(&buf) + } } fn flush(&mut self) -> std::io::Result<()> { @@ -72,6 +98,7 @@ struct Inner { cache: PathBuf, builder: IntegrityOpts, tmpfile: NamedTempFile, + mmap: Option, buf: Vec, last_op: Option, } @@ -84,7 +111,7 @@ enum Operation { impl AsyncWriter { #[allow(clippy::new_ret_no_self)] #[allow(clippy::needless_lifetimes)] - pub async fn new(cache: &Path, algo: Algorithm) -> Result { + pub async fn new(cache: &Path, algo: Algorithm, size: Option) -> Result { let cache_path = cache.to_path_buf(); let mut tmp_path = cache_path.clone(); tmp_path.push("tmp"); @@ -93,12 +120,23 @@ impl AsyncWriter { .create(&tmp_path) .await .to_internal()?; + let tmpfile = task::spawn_blocking(|| NamedTempFile::new_in(tmp_path)) + .await + .to_internal()?; + let mmap = if let Some(size) = size { + if size <= MAX_MMAP_SIZE { + unsafe { MmapMut::map_mut(tmpfile.as_file()).ok() } + } else { + None + } + } else { + None + }; Ok(AsyncWriter(Mutex::new(State::Idle(Some(Inner { cache: cache_path, builder: IntegrityOpts::new().algorithm(algo), - tmpfile: task::spawn_blocking(|| NamedTempFile::new_in(tmp_path)) - .await - .to_internal()?, + mmap, + tmpfile, buf: vec![], last_op: None, }))))) @@ -122,12 +160,11 @@ impl AsyncWriter { let cpath = path::content_path(&inner.cache, &sri); // Start the operation asynchronously. - *state = State::Busy(task::spawn(async move { - let res = afs::DirBuilder::new() + *state = State::Busy(task::spawn_blocking(|| { + let res = std::fs::DirBuilder::new() .recursive(true) // Safe unwrap. cpath always has multiple segments .create(cpath.parent().unwrap()) - .await .with_context(|| { format!( "building directory {} failed", @@ -137,10 +174,26 @@ impl AsyncWriter { if res.is_err() { let _ = s.send(res.map(|_| sri)); } else { - let res = tmpfile.persist(cpath).with_context(|| { + let res = tmpfile.persist(&cpath).with_context(|| { String::from("persisting tempfile failed") }); - let _ = s.send(res.map(|_| sri)); + if res.is_err() { + // We might run into conflicts + // sometimes when persisting files. + // This is ok. We can deal. Let's just + // make sure the destination file + // actually exists, and we can move + // on. + let _ = s.send( + std::fs::metadata(cpath) + .with_context(|| { + String::from("File still doesn't exist") + }) + .map(|_| sri), + ); + } else { + let _ = s.send(res.map(|_| sri)); + } } State::Idle(None) })); @@ -202,9 +255,15 @@ impl AsyncWrite for AsyncWriter { // Start the operation asynchronously. *state = State::Busy(task::spawn_blocking(|| { inner.builder.input(&inner.buf); - let res = inner.tmpfile.write(&inner.buf); - inner.last_op = Some(Operation::Write(res)); - State::Idle(Some(inner)) + if let Some(mmap) = &mut inner.mmap { + mmap.copy_from_slice(&inner.buf); + inner.last_op = Some(Operation::Write(Ok(inner.buf.len()))); + State::Idle(Some(inner)) + } else { + let res = inner.tmpfile.write(&inner.buf); + inner.last_op = Some(Operation::Write(res)); + State::Idle(Some(inner)) + } })); } } @@ -233,6 +292,13 @@ impl AsyncWrite for AsyncWriter { } else { let mut inner = opt.take().unwrap(); + if let Some(mmap) = &inner.mmap { + match mmap.flush_async() { + Ok(_) => (), + Err(e) => return Poll::Ready(Err(e)), + }; + } + // Start the operation asynchronously. *state = State::Busy(task::spawn_blocking(|| { let res = inner.tmpfile.flush(); @@ -286,7 +352,7 @@ mod tests { fn basic_write() { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path().to_owned(); - let mut writer = Writer::new(&dir, Algorithm::Sha256).unwrap(); + let mut writer = Writer::new(&dir, Algorithm::Sha256, None).unwrap(); writer.write_all(b"hello world").unwrap(); let sri = writer.close().unwrap(); assert_eq!(sri.to_string(), Integrity::from(b"hello world").to_string()); @@ -301,7 +367,9 @@ mod tests { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path().to_owned(); task::block_on(async { - let mut writer = AsyncWriter::new(&dir, Algorithm::Sha256).await.unwrap(); + let mut writer = AsyncWriter::new(&dir, Algorithm::Sha256, None) + .await + .unwrap(); writer.write_all(b"hello world").await.unwrap(); let sri = writer.close().await.unwrap(); assert_eq!(sri.to_string(), Integrity::from(b"hello world").to_string()); diff --git a/src/put.rs b/src/put.rs index e25ce62..cfaf962 100644 --- a/src/put.rs +++ b/src/put.rs @@ -32,7 +32,11 @@ where D: AsRef<[u8]>, K: AsRef, { - let mut writer = Writer::create(cache.as_ref(), key.as_ref()).await?; + let mut writer = WriteOpts::new() + .algorithm(Algorithm::Sha256) + .size(data.as_ref().len()) + .open(cache.as_ref(), key.as_ref()) + .await?; writer.write_all(data.as_ref()).await.with_context(|| { format!( "Failed to write to cache data for key {} for cache at {:?}", @@ -62,6 +66,7 @@ where { let mut writer = WriteOpts::new() .algorithm(Algorithm::Sha256) + .size(data.as_ref().len()) .open_hash(cache.as_ref()) .await?; writer.write_all(data.as_ref()).await.with_context(|| { @@ -88,7 +93,9 @@ impl AsyncWrite for Writer { cx: &mut TaskContext<'_>, buf: &[u8], ) -> Poll> { - Pin::new(&mut self.writer).poll_write(cx, buf) + let amt = futures::ready!(Pin::new(&mut self.writer).poll_write(cx, buf))?; + self.written += amt; + Poll::Ready(Ok(amt)) } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll> { @@ -180,6 +187,37 @@ where cache.as_ref() ) })?; + writer.written = data.as_ref().len(); + writer.commit() +} + +/// Writes `data` to the `cache` synchronously, skipping associating a key with it. +/// +/// ## Example +/// ```no_run +/// use std::io::Read; +/// +/// fn main() -> cacache::Result<()> { +/// let data = cacache::write_hash_sync("./my-cache", b"hello")?; +/// Ok(()) +/// } +/// ``` +pub fn write_hash_sync(cache: P, data: D) -> Result +where + P: AsRef, + D: AsRef<[u8]>, +{ + let mut writer = WriteOpts::new() + .algorithm(Algorithm::Sha256) + .size(data.as_ref().len()) + .open_hash_sync(cache.as_ref())?; + writer.write_all(data.as_ref()).with_context(|| { + format!( + "Failed to write to cache data for cache at {:?}", + cache.as_ref() + ) + })?; + writer.written = data.as_ref().len(); writer.commit() } @@ -212,6 +250,7 @@ impl WriteOpts { writer: write::AsyncWriter::new( cache.as_ref(), *self.algorithm.as_ref().unwrap_or(&Algorithm::Sha256), + None, ) .await?, opts: self, @@ -230,6 +269,7 @@ impl WriteOpts { writer: write::AsyncWriter::new( cache.as_ref(), *self.algorithm.as_ref().unwrap_or(&Algorithm::Sha256), + self.size, ) .await?, opts: self, @@ -244,11 +284,30 @@ impl WriteOpts { { Ok(SyncWriter { cache: cache.as_ref().to_path_buf(), - key: String::from(key.as_ref()), + key: Some(String::from(key.as_ref())), written: 0, writer: write::Writer::new( cache.as_ref(), *self.algorithm.as_ref().unwrap_or(&Algorithm::Sha256), + self.size, + )?, + opts: self, + }) + } + + /// Opens the file handle for writing, without a key returning an SyncWriter instance. + pub fn open_hash_sync

(self, cache: P) -> Result + where + P: AsRef, + { + Ok(SyncWriter { + cache: cache.as_ref().to_path_buf(), + key: None, + written: 0, + writer: write::Writer::new( + cache.as_ref(), + *self.algorithm.as_ref().unwrap_or(&Algorithm::Sha256), + self.size, )?, opts: self, }) @@ -293,7 +352,7 @@ impl WriteOpts { /// A reference to an open file writing to the cache. pub struct SyncWriter { cache: PathBuf, - key: String, + key: Option, written: usize, pub(crate) writer: write::Writer, opts: WriteOpts, @@ -301,7 +360,9 @@ pub struct SyncWriter { impl Write for SyncWriter { fn write(&mut self, buf: &[u8]) -> std::io::Result { - self.writer.write(buf) + let written = self.writer.write(buf)?; + self.written += written; + Ok(written) } fn flush(&mut self) -> std::io::Result<()> { self.writer.flush() @@ -338,28 +399,25 @@ impl SyncWriter { /// Must be called manually in order to complete the writing process, /// otherwise everything will be thrown out. pub fn commit(mut self) -> Result { - let key = self.key; let cache = self.cache; - let writer_sri = self.writer.close().with_context(|| { - format!( - "Failed to properly close save file data for key {} in cache at {:?}", - key, cache - ) - })?; + let writer_sri = self.writer.close()?; if let Some(sri) = &self.opts.sri { - // TODO - ssri should have a .matches method if sri.matches(&writer_sri).is_none() { return Err(ssri::Error::IntegrityCheckError(sri.clone(), writer_sri))?; } } else { - self.opts.sri = Some(writer_sri); + self.opts.sri = Some(writer_sri.clone()); } if let Some(size) = self.opts.size { if size != self.written { return Err(Error::SizeError(size, self.written))?; } } - index::insert(&cache, &key, self.opts) + if let Some(key) = self.key { + index::insert(&cache, &key, self.opts) + } else { + Ok(writer_sri) + } } }