feat(write): Use mmap for small writes (#26)

BREAKING CHANGE: This bumps the minimum Rust version from 1.39 to 1.43. Thanks, remove_dir_all! >:(
This commit is contained in:
Kat Marchán 2020-07-18 10:43:38 -07:00 committed by GitHub
parent addf858ab7
commit 803d0c3ede
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 235 additions and 40 deletions

View File

@ -28,7 +28,7 @@ jobs:
runs-on: ${{ matrix.os }}
strategy:
matrix:
rust: [1.39.0, stable]
rust: [1.43.0, stable]
os: [ubuntu-latest, macOS-latest, windows-latest]
steps:

View File

@ -31,6 +31,7 @@ either = "1.5.3"
async-std = { version = "1.0.1", features = ["unstable"] }
thiserror = "1.0.5"
futures = "0.3.1"
memmap = "0.7.0"
[dev-dependencies]
async-attributes = "1.1.1"

View File

@ -30,6 +30,8 @@ Using [`cargo-edit`](https://crates.io/crates/cargo-edit)
`$ cargo add cacache`
Minimum supported Rust version is `1.43.0`.
## Documentation
- [API Docs](https://docs.rs/cacache)

View File

@ -174,6 +174,20 @@ fn read_hash_async_big_data(c: &mut Criterion) {
});
}
fn write_hash_async(c: &mut Criterion) {
let tmp = tempfile::tempdir().unwrap();
let cache = tmp.path().to_owned();
c.bench_function("put::data", move |b| {
b.iter_custom(|iters| {
let start = std::time::Instant::now();
for i in 0..iters {
task::block_on(cacache::write_hash(&cache, format!("hello world{}", i))).unwrap();
}
start.elapsed()
})
});
}
criterion_group!(
benches,
baseline_read_sync,
@ -182,9 +196,10 @@ criterion_group!(
baseline_read_many_async,
read_hash_async,
read_hash_many_async,
read_async,
write_hash_async,
read_hash_sync,
read_hash_many_sync,
read_async,
read_sync,
read_hash_async_big_data,
read_hash_sync_big_data

View File

@ -9,20 +9,24 @@ use async_std::future::Future;
use async_std::task::{self, Context, JoinHandle, Poll};
use futures::io::AsyncWrite;
use futures::prelude::*;
use memmap::MmapMut;
use ssri::{Algorithm, Integrity, IntegrityOpts};
use tempfile::NamedTempFile;
use crate::content::path;
use crate::errors::{Internal, Result};
pub const MAX_MMAP_SIZE: usize = 1 * 1024 * 1024;
pub struct Writer {
cache: PathBuf,
builder: IntegrityOpts,
mmap: Option<MmapMut>,
tmpfile: NamedTempFile,
}
impl Writer {
pub fn new(cache: &Path, algo: Algorithm) -> Result<Writer> {
pub fn new(cache: &Path, algo: Algorithm, size: Option<usize>) -> Result<Writer> {
let cache_path = cache.to_path_buf();
let mut tmp_path = cache_path.clone();
tmp_path.push("tmp");
@ -30,10 +34,21 @@ impl Writer {
.recursive(true)
.create(&tmp_path)
.to_internal()?;
let tmpfile = NamedTempFile::new_in(tmp_path).to_internal()?;
let mmap = if let Some(size) = size {
if size <= MAX_MMAP_SIZE {
unsafe { MmapMut::map_mut(tmpfile.as_file()).ok() }
} else {
None
}
} else {
None
};
Ok(Writer {
cache: cache_path,
builder: IntegrityOpts::new().algorithm(algo),
tmpfile: NamedTempFile::new_in(tmp_path).to_internal()?,
tmpfile,
mmap,
})
}
@ -45,7 +60,13 @@ impl Writer {
// Safe unwrap. cpath always has multiple segments
.create(cpath.parent().unwrap())
.to_internal()?;
self.tmpfile.persist(cpath).to_internal()?;
let res = self.tmpfile.persist(&cpath).to_internal();
if res.is_err() {
// We might run into conflicts sometimes when persisting files.
// This is ok. We can deal. Let's just make sure the destination
// file actually exists, and we can move on.
std::fs::metadata(cpath).to_internal()?;
}
Ok(sri)
}
}
@ -53,7 +74,12 @@ impl Writer {
impl Write for Writer {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.builder.input(&buf);
self.tmpfile.write(&buf)
if let Some(mmap) = &mut self.mmap {
mmap.copy_from_slice(&buf);
Ok(buf.len())
} else {
self.tmpfile.write(&buf)
}
}
fn flush(&mut self) -> std::io::Result<()> {
@ -72,6 +98,7 @@ struct Inner {
cache: PathBuf,
builder: IntegrityOpts,
tmpfile: NamedTempFile,
mmap: Option<MmapMut>,
buf: Vec<u8>,
last_op: Option<Operation>,
}
@ -84,7 +111,7 @@ enum Operation {
impl AsyncWriter {
#[allow(clippy::new_ret_no_self)]
#[allow(clippy::needless_lifetimes)]
pub async fn new(cache: &Path, algo: Algorithm) -> Result<AsyncWriter> {
pub async fn new(cache: &Path, algo: Algorithm, size: Option<usize>) -> Result<AsyncWriter> {
let cache_path = cache.to_path_buf();
let mut tmp_path = cache_path.clone();
tmp_path.push("tmp");
@ -93,12 +120,23 @@ impl AsyncWriter {
.create(&tmp_path)
.await
.to_internal()?;
let tmpfile = task::spawn_blocking(|| NamedTempFile::new_in(tmp_path))
.await
.to_internal()?;
let mmap = if let Some(size) = size {
if size <= MAX_MMAP_SIZE {
unsafe { MmapMut::map_mut(tmpfile.as_file()).ok() }
} else {
None
}
} else {
None
};
Ok(AsyncWriter(Mutex::new(State::Idle(Some(Inner {
cache: cache_path,
builder: IntegrityOpts::new().algorithm(algo),
tmpfile: task::spawn_blocking(|| NamedTempFile::new_in(tmp_path))
.await
.to_internal()?,
mmap,
tmpfile,
buf: vec![],
last_op: None,
})))))
@ -122,12 +160,11 @@ impl AsyncWriter {
let cpath = path::content_path(&inner.cache, &sri);
// Start the operation asynchronously.
*state = State::Busy(task::spawn(async move {
let res = afs::DirBuilder::new()
*state = State::Busy(task::spawn_blocking(|| {
let res = std::fs::DirBuilder::new()
.recursive(true)
// Safe unwrap. cpath always has multiple segments
.create(cpath.parent().unwrap())
.await
.with_context(|| {
format!(
"building directory {} failed",
@ -137,10 +174,26 @@ impl AsyncWriter {
if res.is_err() {
let _ = s.send(res.map(|_| sri));
} else {
let res = tmpfile.persist(cpath).with_context(|| {
let res = tmpfile.persist(&cpath).with_context(|| {
String::from("persisting tempfile failed")
});
let _ = s.send(res.map(|_| sri));
if res.is_err() {
// We might run into conflicts
// sometimes when persisting files.
// This is ok. We can deal. Let's just
// make sure the destination file
// actually exists, and we can move
// on.
let _ = s.send(
std::fs::metadata(cpath)
.with_context(|| {
String::from("File still doesn't exist")
})
.map(|_| sri),
);
} else {
let _ = s.send(res.map(|_| sri));
}
}
State::Idle(None)
}));
@ -202,9 +255,15 @@ impl AsyncWrite for AsyncWriter {
// Start the operation asynchronously.
*state = State::Busy(task::spawn_blocking(|| {
inner.builder.input(&inner.buf);
let res = inner.tmpfile.write(&inner.buf);
inner.last_op = Some(Operation::Write(res));
State::Idle(Some(inner))
if let Some(mmap) = &mut inner.mmap {
mmap.copy_from_slice(&inner.buf);
inner.last_op = Some(Operation::Write(Ok(inner.buf.len())));
State::Idle(Some(inner))
} else {
let res = inner.tmpfile.write(&inner.buf);
inner.last_op = Some(Operation::Write(res));
State::Idle(Some(inner))
}
}));
}
}
@ -233,6 +292,13 @@ impl AsyncWrite for AsyncWriter {
} else {
let mut inner = opt.take().unwrap();
if let Some(mmap) = &inner.mmap {
match mmap.flush_async() {
Ok(_) => (),
Err(e) => return Poll::Ready(Err(e)),
};
}
// Start the operation asynchronously.
*state = State::Busy(task::spawn_blocking(|| {
let res = inner.tmpfile.flush();
@ -286,7 +352,7 @@ mod tests {
fn basic_write() {
let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned();
let mut writer = Writer::new(&dir, Algorithm::Sha256).unwrap();
let mut writer = Writer::new(&dir, Algorithm::Sha256, None).unwrap();
writer.write_all(b"hello world").unwrap();
let sri = writer.close().unwrap();
assert_eq!(sri.to_string(), Integrity::from(b"hello world").to_string());
@ -301,7 +367,9 @@ mod tests {
let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned();
task::block_on(async {
let mut writer = AsyncWriter::new(&dir, Algorithm::Sha256).await.unwrap();
let mut writer = AsyncWriter::new(&dir, Algorithm::Sha256, None)
.await
.unwrap();
writer.write_all(b"hello world").await.unwrap();
let sri = writer.close().await.unwrap();
assert_eq!(sri.to_string(), Integrity::from(b"hello world").to_string());

View File

@ -32,7 +32,11 @@ where
D: AsRef<[u8]>,
K: AsRef<str>,
{
let mut writer = Writer::create(cache.as_ref(), key.as_ref()).await?;
let mut writer = WriteOpts::new()
.algorithm(Algorithm::Sha256)
.size(data.as_ref().len())
.open(cache.as_ref(), key.as_ref())
.await?;
writer.write_all(data.as_ref()).await.with_context(|| {
format!(
"Failed to write to cache data for key {} for cache at {:?}",
@ -43,10 +47,41 @@ where
writer.commit().await
}
/// Writes `data` to the `cache`, skipping associating an index key with it.
///
/// ## Example
/// ```no_run
/// use async_attributes;
///
/// #[async_attributes::main]
/// async fn main() -> cacache::Result<()> {
/// cacache::write_hash("./my-cache", b"hello").await?;
/// Ok(())
/// }
/// ```
pub async fn write_hash<P, D>(cache: P, data: D) -> Result<Integrity>
where
P: AsRef<Path>,
D: AsRef<[u8]>,
{
let mut writer = WriteOpts::new()
.algorithm(Algorithm::Sha256)
.size(data.as_ref().len())
.open_hash(cache.as_ref())
.await?;
writer.write_all(data.as_ref()).await.with_context(|| {
format!(
"Failed to write to cache data for cache at {:?}",
cache.as_ref()
)
})?;
writer.commit().await
}
/// A reference to an open file writing to the cache.
pub struct Writer {
cache: PathBuf,
key: String,
key: Option<String>,
written: usize,
pub(crate) writer: write::AsyncWriter,
opts: WriteOpts,
@ -58,7 +93,9 @@ impl AsyncWrite for Writer {
cx: &mut TaskContext<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
Pin::new(&mut self.writer).poll_write(cx, buf)
let amt = futures::ready!(Pin::new(&mut self.writer).poll_write(cx, buf))?;
self.written += amt;
Poll::Ready(Ok(amt))
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<std::io::Result<()>> {
@ -103,7 +140,6 @@ impl Writer {
/// Must be called manually in order to complete the writing process,
/// otherwise everything will be thrown out.
pub async fn commit(mut self) -> Result<Integrity> {
let key = self.key;
let cache = self.cache;
let writer_sri = self.writer.close().await?;
if let Some(sri) = &self.opts.sri {
@ -111,14 +147,18 @@ impl Writer {
return Err(ssri::Error::IntegrityCheckError(sri.clone(), writer_sri))?;
}
} else {
self.opts.sri = Some(writer_sri);
self.opts.sri = Some(writer_sri.clone());
}
if let Some(size) = self.opts.size {
if size != self.written {
return Err(Error::SizeError(size, self.written));
}
}
index::insert_async(&cache, &key, self.opts).await
if let Some(key) = self.key {
index::insert_async(&cache, &key, self.opts).await
} else {
Ok(writer_sri)
}
}
}
@ -147,6 +187,37 @@ where
cache.as_ref()
)
})?;
writer.written = data.as_ref().len();
writer.commit()
}
/// Writes `data` to the `cache` synchronously, skipping associating a key with it.
///
/// ## Example
/// ```no_run
/// use std::io::Read;
///
/// fn main() -> cacache::Result<()> {
/// let data = cacache::write_hash_sync("./my-cache", b"hello")?;
/// Ok(())
/// }
/// ```
pub fn write_hash_sync<P, D>(cache: P, data: D) -> Result<Integrity>
where
P: AsRef<Path>,
D: AsRef<[u8]>,
{
let mut writer = WriteOpts::new()
.algorithm(Algorithm::Sha256)
.size(data.as_ref().len())
.open_hash_sync(cache.as_ref())?;
writer.write_all(data.as_ref()).with_context(|| {
format!(
"Failed to write to cache data for cache at {:?}",
cache.as_ref()
)
})?;
writer.written = data.as_ref().len();
writer.commit()
}
@ -174,11 +245,31 @@ impl WriteOpts {
{
Ok(Writer {
cache: cache.as_ref().to_path_buf(),
key: String::from(key.as_ref()),
key: Some(String::from(key.as_ref())),
written: 0,
writer: write::AsyncWriter::new(
cache.as_ref(),
*self.algorithm.as_ref().unwrap_or(&Algorithm::Sha256),
None,
)
.await?,
opts: self,
})
}
/// Opens the file handle for writing, without a key returning an Writer instance.
pub async fn open_hash<P>(self, cache: P) -> Result<Writer>
where
P: AsRef<Path>,
{
Ok(Writer {
cache: cache.as_ref().to_path_buf(),
key: None,
written: 0,
writer: write::AsyncWriter::new(
cache.as_ref(),
*self.algorithm.as_ref().unwrap_or(&Algorithm::Sha256),
self.size,
)
.await?,
opts: self,
@ -193,11 +284,30 @@ impl WriteOpts {
{
Ok(SyncWriter {
cache: cache.as_ref().to_path_buf(),
key: String::from(key.as_ref()),
key: Some(String::from(key.as_ref())),
written: 0,
writer: write::Writer::new(
cache.as_ref(),
*self.algorithm.as_ref().unwrap_or(&Algorithm::Sha256),
self.size,
)?,
opts: self,
})
}
/// Opens the file handle for writing, without a key returning an SyncWriter instance.
pub fn open_hash_sync<P>(self, cache: P) -> Result<SyncWriter>
where
P: AsRef<Path>,
{
Ok(SyncWriter {
cache: cache.as_ref().to_path_buf(),
key: None,
written: 0,
writer: write::Writer::new(
cache.as_ref(),
*self.algorithm.as_ref().unwrap_or(&Algorithm::Sha256),
self.size,
)?,
opts: self,
})
@ -242,7 +352,7 @@ impl WriteOpts {
/// A reference to an open file writing to the cache.
pub struct SyncWriter {
cache: PathBuf,
key: String,
key: Option<String>,
written: usize,
pub(crate) writer: write::Writer,
opts: WriteOpts,
@ -250,7 +360,9 @@ pub struct SyncWriter {
impl Write for SyncWriter {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.writer.write(buf)
let written = self.writer.write(buf)?;
self.written += written;
Ok(written)
}
fn flush(&mut self) -> std::io::Result<()> {
self.writer.flush()
@ -287,28 +399,25 @@ impl SyncWriter {
/// Must be called manually in order to complete the writing process,
/// otherwise everything will be thrown out.
pub fn commit(mut self) -> Result<Integrity> {
let key = self.key;
let cache = self.cache;
let writer_sri = self.writer.close().with_context(|| {
format!(
"Failed to properly close save file data for key {} in cache at {:?}",
key, cache
)
})?;
let writer_sri = self.writer.close()?;
if let Some(sri) = &self.opts.sri {
// TODO - ssri should have a .matches method
if sri.matches(&writer_sri).is_none() {
return Err(ssri::Error::IntegrityCheckError(sri.clone(), writer_sri))?;
}
} else {
self.opts.sri = Some(writer_sri);
self.opts.sri = Some(writer_sri.clone());
}
if let Some(size) = self.opts.size {
if size != self.written {
return Err(Error::SizeError(size, self.written))?;
}
}
index::insert(&cache, &key, self.opts)
if let Some(key) = self.key {
index::insert(&cache, &key, self.opts)
} else {
Ok(writer_sri)
}
}
}