mmap file writes and add unindexed writing to cache

This commit is contained in:
Kat Marchán 2020-07-18 10:14:50 -07:00
parent d41bb21a69
commit 39f8ecd055
No known key found for this signature in database
GPG Key ID: AEB529C08A3C7E9E
4 changed files with 172 additions and 45 deletions

View File

@ -31,6 +31,7 @@ either = "1.5.3"
async-std = { version = "1.0.1", features = ["unstable"] }
thiserror = "1.0.5"
futures = "0.3.1"
memmap = "0.7.0"
[dev-dependencies]
async-attributes = "1.1.1"

View File

@ -194,18 +194,18 @@ fn write_hash_async(c: &mut Criterion) {
// Benchmark group `benches`: each uncommented function name listed below is
// registered with Criterion, and `criterion_main!` generates the entry point
// that runs the group.
criterion_group!(
benches,
// baseline_read_sync,
// baseline_read_many_sync,
// baseline_read_async,
// baseline_read_many_async,
// read_hash_async,
// read_hash_many_async,
// read_async,
baseline_read_sync,
baseline_read_many_sync,
baseline_read_async,
baseline_read_many_async,
read_hash_async,
read_hash_many_async,
read_async,
write_hash_async,
// read_hash_sync,
// read_hash_many_sync,
// read_sync,
// read_hash_async_big_data,
// read_hash_sync_big_data
read_hash_sync,
read_hash_many_sync,
read_sync,
read_hash_async_big_data,
read_hash_sync_big_data
);
criterion_main!(benches);

View File

@ -9,20 +9,24 @@ use async_std::future::Future;
use async_std::task::{self, Context, JoinHandle, Poll};
use futures::io::AsyncWrite;
use futures::prelude::*;
use memmap::MmapMut;
use ssri::{Algorithm, Integrity, IntegrityOpts};
use tempfile::NamedTempFile;
use crate::content::path;
use crate::errors::{Internal, Result};
/// Largest declared write size, in bytes (1 MiB), for which the writers try to
/// memory-map the temporary file instead of using regular file writes.
pub const MAX_MMAP_SIZE: usize = 1024 * 1024;
/// Synchronous content writer: data is hashed with `builder` as it is written
/// and stored in a temporary file that is later persisted into the cache.
pub struct Writer {
/// Root directory of the cache; the temporary file lives in its `tmp` subdirectory.
cache: PathBuf,
/// Integrity builder that is fed every buffer passed to `write`.
builder: IntegrityOpts,
/// Mutable memory map of `tmpfile`. Only set when a size of at most
/// `MAX_MMAP_SIZE` was declared and creating the mapping succeeded.
mmap: Option<MmapMut>,
/// Temporary file that receives the data until it is persisted.
tmpfile: NamedTempFile,
}
impl Writer {
pub fn new(cache: &Path, algo: Algorithm) -> Result<Writer> {
pub fn new(cache: &Path, algo: Algorithm, size: Option<usize>) -> Result<Writer> {
let cache_path = cache.to_path_buf();
let mut tmp_path = cache_path.clone();
tmp_path.push("tmp");
@ -30,10 +34,21 @@ impl Writer {
.recursive(true)
.create(&tmp_path)
.to_internal()?;
let tmpfile = NamedTempFile::new_in(tmp_path).to_internal()?;
let mmap = if let Some(size) = size {
if size <= MAX_MMAP_SIZE {
unsafe { MmapMut::map_mut(tmpfile.as_file()).ok() }
} else {
None
}
} else {
None
};
Ok(Writer {
cache: cache_path,
builder: IntegrityOpts::new().algorithm(algo),
tmpfile: NamedTempFile::new_in(tmp_path).to_internal()?,
tmpfile,
mmap,
})
}
@ -45,7 +60,13 @@ impl Writer {
// Safe unwrap. cpath always has multiple segments
.create(cpath.parent().unwrap())
.to_internal()?;
self.tmpfile.persist(cpath).to_internal()?;
let res = self.tmpfile.persist(&cpath).to_internal();
if res.is_err() {
// We might run into conflicts sometimes when persisting files.
// This is ok. We can deal. Let's just make sure the destination
// file actually exists, and we can move on.
std::fs::metadata(cpath).to_internal()?;
}
Ok(sri)
}
}
@ -53,7 +74,12 @@ impl Writer {
impl Write for Writer {
// Feeds `buf` to the integrity builder, then stores it: through the memory
// map when one is present, otherwise with a regular write to the temp file.
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.builder.input(&buf);
self.tmpfile.write(&buf)
if let Some(mmap) = &mut self.mmap {
// NOTE(review): `copy_from_slice` panics unless `buf.len()` equals the
// length of the mapping, and each call overwrites the whole mapping
// rather than appending. Confirm callers only ever issue one write of
// exactly the mapped size, or track a write offset.
mmap.copy_from_slice(&buf);
Ok(buf.len())
} else {
self.tmpfile.write(&buf)
}
}
fn flush(&mut self) -> std::io::Result<()> {
@ -72,6 +98,7 @@ struct Inner {
cache: PathBuf,
builder: IntegrityOpts,
tmpfile: NamedTempFile,
mmap: Option<MmapMut>,
buf: Vec<u8>,
last_op: Option<Operation>,
}
@ -84,7 +111,7 @@ enum Operation {
impl AsyncWriter {
#[allow(clippy::new_ret_no_self)]
#[allow(clippy::needless_lifetimes)]
pub async fn new(cache: &Path, algo: Algorithm) -> Result<AsyncWriter> {
pub async fn new(cache: &Path, algo: Algorithm, size: Option<usize>) -> Result<AsyncWriter> {
let cache_path = cache.to_path_buf();
let mut tmp_path = cache_path.clone();
tmp_path.push("tmp");
@ -93,12 +120,23 @@ impl AsyncWriter {
.create(&tmp_path)
.await
.to_internal()?;
let tmpfile = task::spawn_blocking(|| NamedTempFile::new_in(tmp_path))
.await
.to_internal()?;
let mmap = if let Some(size) = size {
if size <= MAX_MMAP_SIZE {
unsafe { MmapMut::map_mut(tmpfile.as_file()).ok() }
} else {
None
}
} else {
None
};
Ok(AsyncWriter(Mutex::new(State::Idle(Some(Inner {
cache: cache_path,
builder: IntegrityOpts::new().algorithm(algo),
tmpfile: task::spawn_blocking(|| NamedTempFile::new_in(tmp_path))
.await
.to_internal()?,
mmap,
tmpfile,
buf: vec![],
last_op: None,
})))))
@ -122,12 +160,11 @@ impl AsyncWriter {
let cpath = path::content_path(&inner.cache, &sri);
// Start the operation asynchronously.
*state = State::Busy(task::spawn(async move {
let res = afs::DirBuilder::new()
*state = State::Busy(task::spawn_blocking(|| {
let res = std::fs::DirBuilder::new()
.recursive(true)
// Safe unwrap. cpath always has multiple segments
.create(cpath.parent().unwrap())
.await
.with_context(|| {
format!(
"building directory {} failed",
@ -137,10 +174,26 @@ impl AsyncWriter {
if res.is_err() {
let _ = s.send(res.map(|_| sri));
} else {
let res = tmpfile.persist(cpath).with_context(|| {
let res = tmpfile.persist(&cpath).with_context(|| {
String::from("persisting tempfile failed")
});
let _ = s.send(res.map(|_| sri));
if res.is_err() {
// We might run into conflicts
// sometimes when persisting files.
// This is ok. We can deal. Let's just
// make sure the destination file
// actually exists, and we can move
// on.
let _ = s.send(
std::fs::metadata(cpath)
.with_context(|| {
String::from("File still doesn't exist")
})
.map(|_| sri),
);
} else {
let _ = s.send(res.map(|_| sri));
}
}
State::Idle(None)
}));
@ -202,9 +255,15 @@ impl AsyncWrite for AsyncWriter {
// Start the operation asynchronously.
*state = State::Busy(task::spawn_blocking(|| {
inner.builder.input(&inner.buf);
let res = inner.tmpfile.write(&inner.buf);
inner.last_op = Some(Operation::Write(res));
State::Idle(Some(inner))
if let Some(mmap) = &mut inner.mmap {
mmap.copy_from_slice(&inner.buf);
inner.last_op = Some(Operation::Write(Ok(inner.buf.len())));
State::Idle(Some(inner))
} else {
let res = inner.tmpfile.write(&inner.buf);
inner.last_op = Some(Operation::Write(res));
State::Idle(Some(inner))
}
}));
}
}
@ -233,6 +292,13 @@ impl AsyncWrite for AsyncWriter {
} else {
let mut inner = opt.take().unwrap();
if let Some(mmap) = &inner.mmap {
match mmap.flush_async() {
Ok(_) => (),
Err(e) => return Poll::Ready(Err(e)),
};
}
// Start the operation asynchronously.
*state = State::Busy(task::spawn_blocking(|| {
let res = inner.tmpfile.flush();
@ -286,7 +352,7 @@ mod tests {
fn basic_write() {
let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned();
let mut writer = Writer::new(&dir, Algorithm::Sha256).unwrap();
let mut writer = Writer::new(&dir, Algorithm::Sha256, None).unwrap();
writer.write_all(b"hello world").unwrap();
let sri = writer.close().unwrap();
assert_eq!(sri.to_string(), Integrity::from(b"hello world").to_string());
@ -301,7 +367,9 @@ mod tests {
let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned();
task::block_on(async {
let mut writer = AsyncWriter::new(&dir, Algorithm::Sha256).await.unwrap();
let mut writer = AsyncWriter::new(&dir, Algorithm::Sha256, None)
.await
.unwrap();
writer.write_all(b"hello world").await.unwrap();
let sri = writer.close().await.unwrap();
assert_eq!(sri.to_string(), Integrity::from(b"hello world").to_string());

View File

@ -32,7 +32,11 @@ where
D: AsRef<[u8]>,
K: AsRef<str>,
{
let mut writer = Writer::create(cache.as_ref(), key.as_ref()).await?;
let mut writer = WriteOpts::new()
.algorithm(Algorithm::Sha256)
.size(data.as_ref().len())
.open(cache.as_ref(), key.as_ref())
.await?;
writer.write_all(data.as_ref()).await.with_context(|| {
format!(
"Failed to write to cache data for key {} for cache at {:?}",
@ -62,6 +66,7 @@ where
{
let mut writer = WriteOpts::new()
.algorithm(Algorithm::Sha256)
.size(data.as_ref().len())
.open_hash(cache.as_ref())
.await?;
writer.write_all(data.as_ref()).await.with_context(|| {
@ -88,7 +93,9 @@ impl AsyncWrite for Writer {
cx: &mut TaskContext<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
Pin::new(&mut self.writer).poll_write(cx, buf)
let amt = futures::ready!(Pin::new(&mut self.writer).poll_write(cx, buf))?;
self.written += amt;
Poll::Ready(Ok(amt))
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<std::io::Result<()>> {
@ -180,6 +187,37 @@ where
cache.as_ref()
)
})?;
writer.written = data.as_ref().len();
writer.commit()
}
/// Synchronously stores `data` in the cache located at `cache` without
/// associating a key with it, and returns the integrity of the written content.
///
/// The content is hashed with SHA-256 and the length of `data` is declared as
/// the expected size of the write.
///
/// ## Example
/// ```no_run
/// fn main() -> cacache::Result<()> {
///     let sri = cacache::write_hash_sync("./my-cache", b"hello")?;
///     Ok(())
/// }
/// ```
pub fn write_hash_sync<P, D>(cache: P, data: D) -> Result<Integrity>
where
    P: AsRef<Path>,
    D: AsRef<[u8]>,
{
    let cache_dir = cache.as_ref();
    let payload = data.as_ref();
    let total = payload.len();

    // Configure the write first, then open a key-less synchronous writer.
    let opts = WriteOpts::new().algorithm(Algorithm::Sha256).size(total);
    let mut writer = opts.open_hash_sync(cache_dir)?;

    let outcome = writer.write_all(payload);
    outcome.with_context(|| {
        format!(
            "Failed to write to cache data for cache at {:?}",
            cache_dir
        )
    })?;

    // Record the byte count that `commit` compares against the declared size.
    writer.written = total;
    writer.commit()
}
@ -212,6 +250,7 @@ impl WriteOpts {
writer: write::AsyncWriter::new(
cache.as_ref(),
*self.algorithm.as_ref().unwrap_or(&Algorithm::Sha256),
None,
)
.await?,
opts: self,
@ -230,6 +269,7 @@ impl WriteOpts {
writer: write::AsyncWriter::new(
cache.as_ref(),
*self.algorithm.as_ref().unwrap_or(&Algorithm::Sha256),
self.size,
)
.await?,
opts: self,
@ -244,11 +284,30 @@ impl WriteOpts {
{
Ok(SyncWriter {
cache: cache.as_ref().to_path_buf(),
key: String::from(key.as_ref()),
key: Some(String::from(key.as_ref())),
written: 0,
writer: write::Writer::new(
cache.as_ref(),
*self.algorithm.as_ref().unwrap_or(&Algorithm::Sha256),
self.size,
)?,
opts: self,
})
}
/// Opens the file handle for writing without a key, returning a SyncWriter instance.
pub fn open_hash_sync<P>(self, cache: P) -> Result<SyncWriter>
where
P: AsRef<Path>,
{
Ok(SyncWriter {
cache: cache.as_ref().to_path_buf(),
key: None,
written: 0,
writer: write::Writer::new(
cache.as_ref(),
*self.algorithm.as_ref().unwrap_or(&Algorithm::Sha256),
self.size,
)?,
opts: self,
})
@ -293,7 +352,7 @@ impl WriteOpts {
/// A reference to an open file writing to the cache.
pub struct SyncWriter {
cache: PathBuf,
key: String,
key: Option<String>,
written: usize,
pub(crate) writer: write::Writer,
opts: WriteOpts,
@ -301,7 +360,9 @@ pub struct SyncWriter {
impl Write for SyncWriter {
// Forwards `buf` to the inner writer and adds the number of bytes it reports
// as written to the running `written` total, which `commit` compares against
// the declared size.
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.writer.write(buf)
let written = self.writer.write(buf)?;
self.written += written;
Ok(written)
}
fn flush(&mut self) -> std::io::Result<()> {
self.writer.flush()
@ -338,28 +399,25 @@ impl SyncWriter {
/// Must be called manually in order to complete the writing process,
/// otherwise everything will be thrown out.
pub fn commit(mut self) -> Result<Integrity> {
let key = self.key;
let cache = self.cache;
let writer_sri = self.writer.close().with_context(|| {
format!(
"Failed to properly close save file data for key {} in cache at {:?}",
key, cache
)
})?;
// Closing the inner writer yields the integrity computed for the written data.
let writer_sri = self.writer.close()?;
if let Some(sri) = &self.opts.sri {
// An expected integrity was supplied: fail when the written data does not match it.
if sri.matches(&writer_sri).is_none() {
return Err(ssri::Error::IntegrityCheckError(sri.clone(), writer_sri))?;
}
} else {
self.opts.sri = Some(writer_sri);
// No expected integrity was supplied: store the computed one in the options.
self.opts.sri = Some(writer_sri.clone());
}
// When a size was declared, it must equal the number of bytes counted as written.
if let Some(size) = self.opts.size {
if size != self.written {
return Err(Error::SizeError(size, self.written))?;
}
}
index::insert(&cache, &key, self.opts)
// With a key, insert an index entry; without one, return the computed integrity directly.
if let Some(key) = self.key {
index::insert(&cache, &key, self.opts)
} else {
Ok(writer_sri)
}
}
}