From 662aea9b5a829ca4ca9673f2d82917065d675c62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kat=20March=C3=A1n?= Date: Wed, 16 Oct 2019 18:32:31 -0400 Subject: [PATCH] feat(async): reorganize async APIs to be the primary APIs BREAKING CHANGE: the async_* namespaces are gone, and all the previously-sync APIs (get::data, put::data, etc), are all suffixed with _sync now. You'll need to adjust your usage accordingly. --- README.md | 27 +++++--- benches/benchmarks.rs | 74 ++++++++++------------ src/async_get.rs | 128 -------------------------------------- src/async_put.rs | 127 ------------------------------------- src/async_rm.rs | 36 ----------- src/get.rs | 141 ++++++++++++++++++++++++++++++++++++++---- src/lib.rs | 31 +++++----- src/put.rs | 129 ++++++++++++++++++++++++++++++++++---- src/rm.rs | 53 +++++++++++----- 9 files changed, 350 insertions(+), 396 deletions(-) delete mode 100644 src/async_get.rs delete mode 100644 src/async_put.rs delete mode 100644 src/async_rm.rs diff --git a/README.md b/README.md index 58e7e2e..8608c48 100644 --- a/README.md +++ b/README.md @@ -1,19 +1,27 @@ # cacache -A Rust port of [`cacache` for Node.js](https://npm.im/cacache). - -A high-performance, concurrent, content-addressable disk cache. +A high-performance, concurrent, content-addressable disk cache, optimized for async APIs. ## Example ```rust use cacache; -use tempfile; -let tmp = tempfile::tempdir().unwrap(); -let dir = tmp.path().to_owned(); -cacache::put::data(&dir, "key", b"my-data").unwrap(); -let data = cacache::get::read(&dir, "key").unwrap(); -assert_eq!(data, b"my-data"); +use async_attributes; + +#[async_attributes::main] +async fn main() -> Result<(), cacache::Error> { + let dir = String::from("./my-cache"); + + // Write some data! + cacache::put::data(&dir, "key", b"my-async-data").await?; + + // Get the data back! + let data = cacache::get::data(&dir, "key").await?; + assert_eq!(data, b"my-async-data"); + + // Clean up the data! + cacache::rm::all(&dir).await?; +} ``` ## Install @@ -28,6 +36,7 @@ Using [`cargo-edit`](https://crates.io/crates/cargo-edit) ## Features +- First-class async support, using [`async-std`](https://crates.io/crates/async-std) as its runtime. Sync APIs are available but secondary. - Extraction by key or by content address (shasum, etc) - [Subresource Integrity](#integrity) web standard support - Multi-hash support - safely host sha1, sha512, etc, in a single cache diff --git a/benches/benchmarks.rs b/benches/benchmarks.rs index 6d2345c..bb1709a 100644 --- a/benches/benchmarks.rs +++ b/benches/benchmarks.rs @@ -3,92 +3,82 @@ use cacache; use criterion::{black_box, criterion_group, criterion_main, Criterion}; use tempfile; -fn get_hash(c: &mut Criterion) { +fn get_data_hash_sync(c: &mut Criterion) { let tmp = tempfile::tempdir().unwrap(); let cache = tmp.path().to_owned(); let data = b"hello world".to_vec(); - let sri = cacache::put::data(&cache, "hello", data).unwrap(); - c.bench_function("get_hash", move |b| { - b.iter(|| cacache::get::data_hash(black_box(&cache), black_box(&sri)).unwrap()) + let sri = cacache::put::data_sync(&cache, "hello", data).unwrap(); + c.bench_function("get::data_hash_sync", move |b| { + b.iter(|| cacache::get::data_hash_sync(black_box(&cache), black_box(&sri)).unwrap()) }); } -fn get(c: &mut Criterion) { +fn get_data_sync(c: &mut Criterion) { let tmp = tempfile::tempdir().unwrap(); let cache = tmp.path().to_owned(); let data = b"hello world".to_vec(); - cacache::put::data(&cache, "hello", data).unwrap(); - cacache::get::data(&cache, "hello").unwrap(); - c.bench_function("get", move |b| { - b.iter(|| cacache::get::data(black_box(&cache), black_box(String::from("hello"))).unwrap()) + cacache::put::data_sync(&cache, "hello", data).unwrap(); + cacache::get::data_sync(&cache, "hello").unwrap(); + c.bench_function("get::data_sync", move |b| { + b.iter(|| { + cacache::get::data_sync(black_box(&cache), black_box(String::from("hello"))).unwrap() + }) }); } -fn get_hash_big_data(c: &mut Criterion) { +fn get_data_hash_sync_big_data(c: &mut Criterion) { let tmp = tempfile::tempdir().unwrap(); let cache = tmp.path().to_owned(); let data = vec![1; 1024 * 1024 * 5]; - let sri = cacache::put::data(&cache, "hello", data).unwrap(); + let sri = cacache::put::data_sync(&cache, "hello", data).unwrap(); c.bench_function("get_hash_big_data", move |b| { - b.iter(|| cacache::get::data_hash(black_box(&cache), black_box(&sri)).unwrap()) + b.iter(|| cacache::get::data_hash_sync(black_box(&cache), black_box(&sri)).unwrap()) }); } -fn async_get_hash(c: &mut Criterion) { +fn get_data_hash_async(c: &mut Criterion) { let tmp = tempfile::tempdir().unwrap(); let cache = tmp.path().to_owned(); let data = b"hello world".to_vec(); - let sri = cacache::put::data(&cache, "hello", data).unwrap(); - c.bench_function("async_get_hash", move |b| { + let sri = cacache::put::data_sync(&cache, "hello", data).unwrap(); + c.bench_function("get::data_hash", move |b| { b.iter(|| { - task::block_on(cacache::async_get::data_hash( - black_box(&cache), - black_box(&sri), - )) - .unwrap() + task::block_on(cacache::get::data_hash(black_box(&cache), black_box(&sri))).unwrap() }) }); } -fn async_get(c: &mut Criterion) { +fn get_data_async(c: &mut Criterion) { let tmp = tempfile::tempdir().unwrap(); let cache = tmp.path().to_owned(); let data = b"hello world".to_vec(); - cacache::put::data(&cache, "hello", data).unwrap(); - c.bench_function("async_get", move |b| { + cacache::put::data_sync(&cache, "hello", data).unwrap(); + c.bench_function("get::data", move |b| { b.iter(|| { - task::block_on(cacache::async_get::data( - black_box(&cache), - black_box("hello"), - )) - .unwrap() + task::block_on(cacache::get::data(black_box(&cache), black_box("hello"))).unwrap() }) }); } -fn async_get_hash_big_data(c: &mut Criterion) { +fn get_data_hash_async_big_data(c: &mut Criterion) { let tmp = tempfile::tempdir().unwrap(); let cache = tmp.path().to_owned(); let data = vec![1; 1024 * 1024 * 5]; - let sri = cacache::put::data(&cache, "hello", data).unwrap(); - c.bench_function("async_get_hash_big_data", move |b| { + let sri = cacache::put::data_sync(&cache, "hello", data).unwrap(); + c.bench_function("get::data_big_data", move |b| { b.iter(|| { - task::block_on(cacache::async_get::data_hash( - black_box(&cache), - black_box(&sri), - )) - .unwrap() + task::block_on(cacache::get::data_hash(black_box(&cache), black_box(&sri))).unwrap() }) }); } criterion_group!( benches, - get_hash, - get, - async_get_hash, - async_get, - get_hash_big_data, - async_get_hash_big_data, + get_data_hash_async, + get_data_hash_sync, + get_data_async, + get_data_sync, + get_data_hash_async_big_data, + get_data_hash_sync_big_data ); criterion_main!(benches); diff --git a/src/async_get.rs b/src/async_get.rs deleted file mode 100644 index 4bfe0c1..0000000 --- a/src/async_get.rs +++ /dev/null @@ -1,128 +0,0 @@ -//! Functions for reading asynchronously from cache. -//! -//! Asynchronous operations are able to trade off some linear performance in -//! exchange for potentially much higher performance on heavily-concurrent -//! loads. -use std::path::Path; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use futures::prelude::*; -use ssri::{Algorithm, Integrity}; - -use crate::content::read::{self, AsyncReader}; -use crate::errors::Error; -use crate::index::{self, Entry}; - -/// File handle for asynchronously reading from a content entry. -/// -/// Make sure to call `.check()` when done reading to verify that the -/// extracted data passes integrity verification. -pub struct AsyncGet { - reader: AsyncReader, -} - -impl AsyncRead for AsyncGet { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - Pin::new(&mut self.reader).poll_read(cx, buf) - } -} - -impl AsyncGet { - /// Checks that data read from disk passes integrity checks. Returns the - /// algorithm that was used verified the data. Should be called only after - /// all data has been read from disk. - pub fn check(self) -> Result { - self.reader.check() - } -} - -/// Opens a new file handle into the cache, looking it up in the index using -/// `key`. -pub async fn open(cache: P, key: K) -> Result -where - P: AsRef, - K: AsRef, -{ - if let Some(entry) = index::find_async(cache.as_ref(), key.as_ref()).await? { - open_hash(cache, entry.integrity).await - } else { - Err(Error::NotFound) - } -} - -/// Opens a new file handle into the cache, based on its integrity address. -pub async fn open_hash

(cache: P, sri: Integrity) -> Result -where - P: AsRef, -{ - Ok(AsyncGet { - reader: read::open_async(cache.as_ref(), sri).await?, - }) -} - -/// Reads the entire contents of a cache file into a bytes vector, looking the -/// data up by key. -pub async fn data(cache: P, key: K) -> Result, Error> -where - P: AsRef, - K: AsRef, -{ - if let Some(entry) = index::find_async(cache.as_ref(), key.as_ref()).await? { - data_hash(cache, &entry.integrity).await - } else { - Err(Error::NotFound) - } -} - -/// Reads the entire contents of a cache file into a bytes vector, looking the -/// data up by its content address. -pub async fn data_hash

(cache: P, sri: &Integrity) -> Result, Error> -where - P: AsRef, -{ - Ok(read::read_async(cache.as_ref(), sri).await?) -} - -/// Copies a cache entry by key to a specified location. -pub async fn copy(cache: P, key: K, to: Q) -> Result -where - P: AsRef, - K: AsRef, - Q: AsRef, -{ - if let Some(entry) = index::find_async(cache.as_ref(), key.as_ref()).await? { - copy_hash(cache, &entry.integrity, to).await - } else { - Err(Error::NotFound) - } -} - -/// Copies a cache entry by integrity address to a specified location. -pub async fn copy_hash(cache: P, sri: &Integrity, to: Q) -> Result -where - P: AsRef, - Q: AsRef, -{ - read::copy_async(cache.as_ref(), sri, to.as_ref()).await -} - -/// Gets entry information and metadata for a certain key. -pub async fn info(cache: P, key: K) -> Result, Error> -where - P: AsRef, - K: AsRef, -{ - index::find_async(cache.as_ref(), key.as_ref()).await -} - -/// Returns true if the given hash exists in the cache. -pub async fn hash_exists>(cache: P, sri: &Integrity) -> bool { - read::has_content_async(cache.as_ref(), &sri) - .await - .is_some() -} diff --git a/src/async_put.rs b/src/async_put.rs deleted file mode 100644 index 1ef42f1..0000000 --- a/src/async_put.rs +++ /dev/null @@ -1,127 +0,0 @@ -//! Functions for asynchronously writing to cache. -//! -//! Asynchronous operations are able to trade off some linear performance in -//! exchange for potentially much higher performance on heavily-concurrent -//! loads. -use std::path::{Path, PathBuf}; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use futures::prelude::*; -use ssri::{Algorithm, Integrity}; - -use crate::content::write; -use crate::errors::Error; -use crate::index; -pub use crate::put::PutOpts; - -/// Writes `data` to the `cache`, indexing it under `key`. -pub async fn data(cache: P, key: K, data: D) -> Result -where - P: AsRef, - D: AsRef<[u8]>, - K: AsRef, -{ - let mut writer = PutOpts::new() - .algorithm(Algorithm::Sha256) - .open_async(cache.as_ref(), key.as_ref()) - .await?; - writer.write_all(data.as_ref()).await?; - writer.commit().await -} - -impl PutOpts { - /// Opens the file handle for writing, returning a Put instance. - pub async fn open_async(self, cache: P, key: K) -> Result - where - P: AsRef, - K: AsRef, - { - Ok(AsyncPut { - cache: cache.as_ref().to_path_buf(), - key: String::from(key.as_ref()), - written: 0, - writer: write::AsyncWriter::new( - cache.as_ref(), - *self.algorithm.as_ref().unwrap_or(&Algorithm::Sha256), - ) - .await?, - opts: self, - }) - } -} - -/// A reference to an open file writing to the cache. -pub struct AsyncPut { - cache: PathBuf, - key: String, - written: usize, - pub(crate) writer: write::AsyncWriter, - opts: PutOpts, -} - -impl AsyncWrite for AsyncPut { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut self.writer).poll_write(cx, buf) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.writer).poll_flush(cx) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.writer).poll_close(cx) - } -} - -impl AsyncPut { - /// Closes the Put handle and writes content and index entries. Also - /// verifies data against `size` and `integrity` options, if provided. - /// Must be called manually in order to complete the writing process, - /// otherwise everything will be thrown out. - pub async fn commit(mut self) -> Result { - let writer_sri = self.writer.close().await?; - if let Some(sri) = &self.opts.sri { - // TODO - ssri should have a .matches method - let algo = sri.pick_algorithm(); - let matched = sri - .hashes - .iter() - .take_while(|h| h.algorithm == algo) - .find(|&h| *h == writer_sri.hashes[0]); - if matched.is_none() { - return Err(Error::IntegrityError); - } - } else { - self.opts.sri = Some(writer_sri); - } - if let Some(size) = self.opts.size { - if size != self.written { - return Err(Error::SizeError); - } - } - index::insert_async(&self.cache, &self.key, self.opts).await - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::async_get; - use async_std::task; - - #[test] - fn round_trip() { - let tmp = tempfile::tempdir().unwrap(); - let dir = tmp.path().to_owned(); - task::block_on(async { - data(&dir, "hello", b"hello").await.unwrap(); - }); - let data = task::block_on(async { async_get::data(&dir, "hello").await.unwrap() }); - assert_eq!(data, b"hello"); - } -} diff --git a/src/async_rm.rs b/src/async_rm.rs deleted file mode 100644 index a79a951..0000000 --- a/src/async_rm.rs +++ /dev/null @@ -1,36 +0,0 @@ -//! Functions for asynchronously removing things from the cache. -//! -//! Asynchronous operations are able to trade off some linear performance in -//! exchange for potentially much higher performance on heavily-concurrent -//! loads. -use std::path::Path; - -use async_std::fs as afs; -use ssri::Integrity; - -use crate::content::rm; -use crate::errors::Error; -use crate::index; - -/// Removes an individual index entry. The associated content will be left -/// intact. -pub async fn entry>(cache: P, key: &str) -> Result<(), Error> { - index::delete_async(cache.as_ref(), &key).await -} - -/// Removes an individual content entry. Any index entries pointing to this -/// content will become invalidated. -pub async fn content>(cache: P, sri: &Integrity) -> Result<(), Error> { - rm::rm_async(cache.as_ref(), &sri).await -} - -/// Removes entire contents of the cache, including temporary files, the entry -/// index, and all content data. -pub async fn all>(cache: P) -> Result<(), Error> { - for entry in cache.as_ref().read_dir()? { - if let Ok(entry) = entry { - afs::remove_dir_all(entry.path()).await?; - } - } - Ok(()) -} diff --git a/src/get.rs b/src/get.rs index 2ff6348..a183f9e 100644 --- a/src/get.rs +++ b/src/get.rs @@ -1,12 +1,129 @@ //! Functions for reading from cache. use std::path::Path; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures::prelude::*; use ssri::{Algorithm, Integrity}; -use crate::content::read::{self, Reader}; +use crate::content::read::{self, AsyncReader, Reader}; use crate::errors::Error; use crate::index::{self, Entry}; +/// File handle for asynchronously reading from a content entry. +/// +/// Make sure to call `.check()` when done reading to verify that the +/// extracted data passes integrity verification. +pub struct AsyncGet { + reader: AsyncReader, +} + +impl AsyncRead for AsyncGet { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + Pin::new(&mut self.reader).poll_read(cx, buf) + } +} + +impl AsyncGet { + /// Checks that data read from disk passes integrity checks. Returns the + /// algorithm that was used verified the data. Should be called only after + /// all data has been read from disk. + pub fn check(self) -> Result { + self.reader.check() + } +} + +/// Opens a new file handle into the cache, looking it up in the index using +/// `key`. +pub async fn open(cache: P, key: K) -> Result +where + P: AsRef, + K: AsRef, +{ + if let Some(entry) = index::find_async(cache.as_ref(), key.as_ref()).await? { + open_hash(cache, entry.integrity).await + } else { + Err(Error::NotFound) + } +} + +/// Opens a new file handle into the cache, based on its integrity address. +pub async fn open_hash

(cache: P, sri: Integrity) -> Result +where + P: AsRef, +{ + Ok(AsyncGet { + reader: read::open_async(cache.as_ref(), sri).await?, + }) +} + +/// Reads the entire contents of a cache file into a bytes vector, looking the +/// data up by key. +pub async fn data(cache: P, key: K) -> Result, Error> +where + P: AsRef, + K: AsRef, +{ + if let Some(entry) = index::find_async(cache.as_ref(), key.as_ref()).await? { + data_hash(cache, &entry.integrity).await + } else { + Err(Error::NotFound) + } +} + +/// Reads the entire contents of a cache file into a bytes vector, looking the +/// data up by its content address. +pub async fn data_hash

(cache: P, sri: &Integrity) -> Result, Error> +where + P: AsRef, +{ + Ok(read::read_async(cache.as_ref(), sri).await?) +} + +/// Copies a cache entry by key to a specified location. +pub async fn copy(cache: P, key: K, to: Q) -> Result +where + P: AsRef, + K: AsRef, + Q: AsRef, +{ + if let Some(entry) = index::find_async(cache.as_ref(), key.as_ref()).await? { + copy_hash(cache, &entry.integrity, to).await + } else { + Err(Error::NotFound) + } +} + +/// Copies a cache entry by integrity address to a specified location. +pub async fn copy_hash(cache: P, sri: &Integrity, to: Q) -> Result +where + P: AsRef, + Q: AsRef, +{ + read::copy_async(cache.as_ref(), sri, to.as_ref()).await +} + +/// Gets entry information and metadata for a certain key. +pub async fn info(cache: P, key: K) -> Result, Error> +where + P: AsRef, + K: AsRef, +{ + index::find_async(cache.as_ref(), key.as_ref()).await +} + +/// Returns true if the given hash exists in the cache. +pub async fn hash_exists>(cache: P, sri: &Integrity) -> bool { + read::has_content_async(cache.as_ref(), &sri) + .await + .is_some() +} + /// File handle for reading from a content entry. /// /// Make sure to call `get.check()` when done reading @@ -33,20 +150,20 @@ impl Get { /// Opens a new file handle into the cache, looking it up in the index using /// `key`. -pub fn open(cache: P, key: K) -> Result +pub fn open_sync(cache: P, key: K) -> Result where P: AsRef, K: AsRef, { if let Some(entry) = index::find(cache.as_ref(), key.as_ref())? { - open_hash(cache, entry.integrity) + open_hash_sync(cache, entry.integrity) } else { Err(Error::NotFound) } } /// Opens a new file handle into the cache, based on its integrity address. -pub fn open_hash

(cache: P, sri: Integrity) -> Result +pub fn open_hash_sync

(cache: P, sri: Integrity) -> Result where P: AsRef, { @@ -57,13 +174,13 @@ where /// Reads the entire contents of a cache file into a bytes vector, looking the /// data up by key. -pub fn data(cache: P, key: K) -> Result, Error> +pub fn data_sync(cache: P, key: K) -> Result, Error> where P: AsRef, K: AsRef, { if let Some(entry) = index::find(cache.as_ref(), key.as_ref())? { - data_hash(cache, &entry.integrity) + data_hash_sync(cache, &entry.integrity) } else { Err(Error::NotFound) } @@ -71,7 +188,7 @@ where /// Reads the entire contents of a cache file into a bytes vector, looking the /// data up by its content address. -pub fn data_hash

(cache: P, sri: &Integrity) -> Result, Error> +pub fn data_hash_sync

(cache: P, sri: &Integrity) -> Result, Error> where P: AsRef, { @@ -79,21 +196,21 @@ where } /// Copies a cache entry by key to a specified location. -pub fn copy(cache: P, key: K, to: Q) -> Result +pub fn copy_sync(cache: P, key: K, to: Q) -> Result where P: AsRef, K: AsRef, Q: AsRef, { if let Some(entry) = index::find(cache.as_ref(), key.as_ref())? { - copy_hash(cache, &entry.integrity, to) + copy_hash_sync(cache, &entry.integrity, to) } else { Err(Error::NotFound) } } /// Copies a cache entry by integrity address to a specified location. -pub fn copy_hash(cache: P, sri: &Integrity, to: Q) -> Result +pub fn copy_hash_sync(cache: P, sri: &Integrity, to: Q) -> Result where P: AsRef, Q: AsRef, @@ -102,7 +219,7 @@ where } /// Gets entry information and metadata for a certain key. -pub fn info(cache: P, key: K) -> Result, Error> +pub fn info_sync(cache: P, key: K) -> Result, Error> where P: AsRef, K: AsRef, @@ -111,6 +228,6 @@ where } /// Returns true if the given hash exists in the cache. -pub fn hash_exists>(cache: P, sri: &Integrity) -> bool { +pub fn hash_exists_sync>(cache: P, sri: &Integrity) -> bool { read::has_content(cache.as_ref(), &sri).is_some() } diff --git a/src/lib.rs b/src/lib.rs index 6ef43da..f924cd7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,27 +4,28 @@ //! //! ## Examples //! ``` +//! use tempfile; //! use cacache; -//! # use tempfile; -//! # let tmp = tempfile::tempdir().unwrap(); +//! use async_std::task; +//! +//! let tmp = tempfile::tempdir().unwrap(); //! let dir = tmp.path().to_owned(); -//! cacache::put::data(&dir, "key", b"my-data").unwrap(); -//! let data = cacache::get::read(&dir, "key").unwrap(); -//! assert_eq!(data, b"my-data"); +//! task::block_on(async { +//! cacache::put::data(&dir, "key", b"my-async-data").await.unwrap(); +//! let data = cacache::get::data(&dir, "key").await.unwrap(); +//! assert_eq!(data, b"my-async-data"); +//! }) //! ``` //! -//! You can also use the equivalent async APIs using async/await! +//! There are also sync APIs available if you don't want to use async/await: //! ``` -//! # use tempfile; //! use cacache; -//! # use async_std::task; +//! # use tempfile; //! # let tmp = tempfile::tempdir().unwrap(); //! let dir = tmp.path().to_owned(); -//! # task::block_on(async { -//! cacache::async_put::data(&dir, "key", b"my-async-data").await.unwrap(); -//! let data = cacache::async_get::read(&dir, "key").await.unwrap(); -//! assert_eq!(data, b"my-async-data"); -//! # }) +//! cacache::put::data_sync(&dir, "key", b"my-data").unwrap(); +//! let data = cacache::get::data_sync(&dir, "key").unwrap(); +//! assert_eq!(data, b"my-data"); //! ``` #![warn(missing_docs, missing_doc_code_examples)] @@ -40,9 +41,5 @@ pub mod ls; pub mod put; pub mod rm; -pub mod async_get; -pub mod async_put; -pub mod async_rm; - pub use errors::Error; pub use index::Entry; diff --git a/src/put.rs b/src/put.rs index c613f8f..78a2bca 100644 --- a/src/put.rs +++ b/src/put.rs @@ -1,6 +1,9 @@ //! Functions for writing to cache. use std::io::prelude::*; use std::path::{Path, PathBuf}; +use std::pin::Pin; + +use futures::prelude::*; #[cfg(unix)] use nix::unistd::{Gid, Uid}; @@ -11,8 +14,10 @@ use crate::content::write; use crate::errors::Error; use crate::index; +use std::task::{Context, Poll}; + /// Writes `data` to the `cache`, indexing it under `key`. -pub fn data(cache: P, key: K, data: D) -> Result +pub async fn data(cache: P, key: K, data: D) -> Result where P: AsRef, D: AsRef<[u8]>, @@ -20,13 +25,85 @@ where { let mut writer = PutOpts::new() .algorithm(Algorithm::Sha256) - .open(cache.as_ref(), key.as_ref())?; + .open(cache.as_ref(), key.as_ref()) + .await?; + writer.write_all(data.as_ref()).await?; + writer.commit().await +} + +/// A reference to an open file writing to the cache. +pub struct AsyncPut { + cache: PathBuf, + key: String, + written: usize, + pub(crate) writer: write::AsyncWriter, + opts: PutOpts, +} + +impl AsyncWrite for AsyncPut { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.writer).poll_write(cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.writer).poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.writer).poll_close(cx) + } +} + +impl AsyncPut { + /// Closes the AsyncPut handle and writes content and index entries. Also + /// verifies data against `size` and `integrity` options, if provided. + /// Must be called manually in order to complete the writing process, + /// otherwise everything will be thrown out. + pub async fn commit(mut self) -> Result { + let writer_sri = self.writer.close().await?; + if let Some(sri) = &self.opts.sri { + // TODO - ssri should have a .matches method + let algo = sri.pick_algorithm(); + let matched = sri + .hashes + .iter() + .take_while(|h| h.algorithm == algo) + .find(|&h| *h == writer_sri.hashes[0]); + if matched.is_none() { + return Err(Error::IntegrityError); + } + } else { + self.opts.sri = Some(writer_sri); + } + if let Some(size) = self.opts.size { + if size != self.written { + return Err(Error::SizeError); + } + } + index::insert_async(&self.cache, &self.key, self.opts).await + } +} + +/// Writes `data` to the `cache` synchronously, indexing it under `key`. +pub fn data_sync(cache: P, key: K, data: D) -> Result +where + P: AsRef, + D: AsRef<[u8]>, + K: AsRef, +{ + let mut writer = PutOpts::new() + .algorithm(Algorithm::Sha256) + .open_sync(cache.as_ref(), key.as_ref())?; writer.write_all(data.as_ref())?; writer.flush()?; writer.commit() } -/// Builder for pptions and flags for opening a new cache file to write data into. +/// Builder for options and flags for opening a new cache file to write data into. #[derive(Clone, Default)] pub struct PutOpts { pub(crate) algorithm: Option, @@ -47,12 +124,31 @@ impl PutOpts { } /// Opens the file handle for writing, returning a Put instance. - pub fn open(self, cache: P, key: K) -> Result + pub async fn open(self, cache: P, key: K) -> Result where P: AsRef, K: AsRef, { - Ok(Put { + Ok(AsyncPut { + cache: cache.as_ref().to_path_buf(), + key: String::from(key.as_ref()), + written: 0, + writer: write::AsyncWriter::new( + cache.as_ref(), + *self.algorithm.as_ref().unwrap_or(&Algorithm::Sha256), + ) + .await?, + opts: self, + }) + } + + /// Opens the file handle for writing synchronously, returning a Put instance. + pub fn open_sync(self, cache: P, key: K) -> Result + where + P: AsRef, + K: AsRef, + { + Ok(SyncPut { cache: cache.as_ref().to_path_buf(), key: String::from(key.as_ref()), written: 0, @@ -110,7 +206,7 @@ impl PutOpts { } /// A reference to an open file writing to the cache. -pub struct Put { +pub struct SyncPut { cache: PathBuf, key: String, written: usize, @@ -118,7 +214,7 @@ pub struct Put { opts: PutOpts, } -impl Write for Put { +impl Write for SyncPut { fn write(&mut self, buf: &[u8]) -> std::io::Result { self.writer.write(buf) } @@ -127,7 +223,7 @@ impl Write for Put { } } -impl Put { +impl SyncPut { /// Closes the Put handle and writes content and index entries. Also /// verifies data against `size` and `integrity` options, if provided. /// Must be called manually in order to complete the writing process, @@ -159,14 +255,25 @@ impl Put { #[cfg(test)] mod tests { - use super::*; + use async_std::task; #[test] fn round_trip() { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path().to_owned(); - data(&dir, "hello", b"hello").unwrap(); - let data = crate::get::data(&dir, "hello").unwrap(); + task::block_on(async { + crate::put::data(&dir, "hello", b"hello").await.unwrap(); + }); + let data = task::block_on(async { crate::get::data(&dir, "hello").await.unwrap() }); + assert_eq!(data, b"hello"); + } + + #[test] + fn round_trip_sync() { + let tmp = tempfile::tempdir().unwrap(); + let dir = tmp.path().to_owned(); + crate::put::data_sync(&dir, "hello", b"hello").unwrap(); + let data = crate::get::data_sync(&dir, "hello").unwrap(); assert_eq!(data, b"hello"); } } diff --git a/src/rm.rs b/src/rm.rs index a07ae57..6a7e99d 100644 --- a/src/rm.rs +++ b/src/rm.rs @@ -2,6 +2,8 @@ use std::fs; use std::path::Path; +use async_std::fs as afs; + use ssri::Integrity; use crate::content::rm; @@ -10,19 +12,42 @@ use crate::index; /// Removes an individual index entry. The associated content will be left /// intact. -pub fn entry>(cache: P, key: &str) -> Result<(), Error> { - index::delete(cache.as_ref(), &key) +pub async fn entry>(cache: P, key: &str) -> Result<(), Error> { + index::delete_async(cache.as_ref(), &key).await } /// Removes an individual content entry. Any index entries pointing to this /// content will become invalidated. -pub fn content>(cache: P, sri: &Integrity) -> Result<(), Error> { - rm::rm(cache.as_ref(), &sri) +pub async fn content>(cache: P, sri: &Integrity) -> Result<(), Error> { + rm::rm_async(cache.as_ref(), &sri).await } /// Removes entire contents of the cache, including temporary files, the entry /// index, and all content data. -pub fn all>(cache: P) -> Result<(), Error> { +pub async fn all>(cache: P) -> Result<(), Error> { + for entry in cache.as_ref().read_dir()? { + if let Ok(entry) = entry { + afs::remove_dir_all(entry.path()).await?; + } + } + Ok(()) +} + +/// Removes an individual index entry synchronously. The associated content +/// will be left intact. +pub fn entry_sync>(cache: P, key: &str) -> Result<(), Error> { + index::delete(cache.as_ref(), &key) +} + +/// Removes an individual content entry synchronously. Any index entries +/// pointing to this content will become invalidated. +pub fn content_sync>(cache: P, sri: &Integrity) -> Result<(), Error> { + rm::rm(cache.as_ref(), &sri) +} + +/// Removes entire contents of the cache synchronously, including temporary files, the entry +/// index, and all content data. +pub fn all_sync>(cache: P) -> Result<(), Error> { for entry in cache.as_ref().read_dir()? { if let Ok(entry) = entry { fs::remove_dir_all(entry.path())?; @@ -34,17 +59,17 @@ pub fn all>(cache: P) -> Result<(), Error> { #[cfg(test)] mod tests { #[test] - fn all() { + fn all_sync() { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path().to_owned(); - let sri = crate::put::data(&dir, "key", b"my-data").unwrap(); + let sri = crate::put::data_sync(&dir, "key", b"my-data").unwrap(); - crate::rm::all(&dir).unwrap(); + crate::rm::all_sync(&dir).unwrap(); - let new_entry = crate::get::info(&dir, "key").unwrap(); + let new_entry = crate::get::info_sync(&dir, "key").unwrap(); assert_eq!(new_entry, None); - let data_exists = crate::get::hash_exists(&dir, &sri); + let data_exists = crate::get::hash_exists_sync(&dir, &sri); assert_eq!(data_exists, false); } @@ -52,14 +77,14 @@ mod tests { fn entry() { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path().to_owned(); - let sri = crate::put::data(&dir, "key", b"my-data").unwrap(); + let sri = crate::put::data_sync(&dir, "key", b"my-data").unwrap(); - crate::rm::entry(&dir, "key").unwrap(); + crate::rm::entry_sync(&dir, "key").unwrap(); - let new_entry = crate::get::info(&dir, "key").unwrap(); + let new_entry = crate::get::info_sync(&dir, "key").unwrap(); assert_eq!(new_entry, None); - let data_exists = crate::get::hash_exists(&dir, &sri); + let data_exists = crate::get::hash_exists_sync(&dir, &sri); assert_eq!(data_exists, true); } }