feat(async): reorganize async APIs to be the primary APIs

BREAKING CHANGE: the async_* namespaces are gone, and all the previously-sync APIs (get::data, put::data, etc), are all suffixed with _sync now. You'll need to adjust your usage accordingly.
This commit is contained in:
Kat Marchán 2019-10-16 18:32:31 -04:00
parent b02f41e07f
commit 662aea9b5a
9 changed files with 350 additions and 396 deletions

View File

@ -1,19 +1,27 @@
# cacache
A Rust port of [`cacache` for Node.js](https://npm.im/cacache).
A high-performance, concurrent, content-addressable disk cache.
A high-performance, concurrent, content-addressable disk cache, optimized for async APIs.
## Example
```rust
use cacache;
use tempfile;
let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned();
cacache::put::data(&dir, "key", b"my-data").unwrap();
let data = cacache::get::read(&dir, "key").unwrap();
assert_eq!(data, b"my-data");
use async_attributes;
#[async_attributes::main]
async fn main() -> Result<(), cacache::Error> {
let dir = String::from("./my-cache");
// Write some data!
cacache::put::data(&dir, "key", b"my-async-data").await?;
// Get the data back!
let data = cacache::get::data(&dir, "key").await?;
assert_eq!(data, b"my-async-data");
// Clean up the data!
cacache::rm::all(&dir).await?;
}
```
## Install
@ -28,6 +36,7 @@ Using [`cargo-edit`](https://crates.io/crates/cargo-edit)
## Features
- First-class async support, using [`async-std`](https://crates.io/crates/async-std) as its runtime. Sync APIs are available but secondary.
- Extraction by key or by content address (shasum, etc)
- [Subresource Integrity](#integrity) web standard support
- Multi-hash support - safely host sha1, sha512, etc, in a single cache

View File

@ -3,92 +3,82 @@ use cacache;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use tempfile;
fn get_hash(c: &mut Criterion) {
fn get_data_hash_sync(c: &mut Criterion) {
let tmp = tempfile::tempdir().unwrap();
let cache = tmp.path().to_owned();
let data = b"hello world".to_vec();
let sri = cacache::put::data(&cache, "hello", data).unwrap();
c.bench_function("get_hash", move |b| {
b.iter(|| cacache::get::data_hash(black_box(&cache), black_box(&sri)).unwrap())
let sri = cacache::put::data_sync(&cache, "hello", data).unwrap();
c.bench_function("get::data_hash_sync", move |b| {
b.iter(|| cacache::get::data_hash_sync(black_box(&cache), black_box(&sri)).unwrap())
});
}
fn get(c: &mut Criterion) {
fn get_data_sync(c: &mut Criterion) {
let tmp = tempfile::tempdir().unwrap();
let cache = tmp.path().to_owned();
let data = b"hello world".to_vec();
cacache::put::data(&cache, "hello", data).unwrap();
cacache::get::data(&cache, "hello").unwrap();
c.bench_function("get", move |b| {
b.iter(|| cacache::get::data(black_box(&cache), black_box(String::from("hello"))).unwrap())
cacache::put::data_sync(&cache, "hello", data).unwrap();
cacache::get::data_sync(&cache, "hello").unwrap();
c.bench_function("get::data_sync", move |b| {
b.iter(|| {
cacache::get::data_sync(black_box(&cache), black_box(String::from("hello"))).unwrap()
})
});
}
fn get_hash_big_data(c: &mut Criterion) {
fn get_data_hash_sync_big_data(c: &mut Criterion) {
let tmp = tempfile::tempdir().unwrap();
let cache = tmp.path().to_owned();
let data = vec![1; 1024 * 1024 * 5];
let sri = cacache::put::data(&cache, "hello", data).unwrap();
let sri = cacache::put::data_sync(&cache, "hello", data).unwrap();
c.bench_function("get_hash_big_data", move |b| {
b.iter(|| cacache::get::data_hash(black_box(&cache), black_box(&sri)).unwrap())
b.iter(|| cacache::get::data_hash_sync(black_box(&cache), black_box(&sri)).unwrap())
});
}
fn async_get_hash(c: &mut Criterion) {
fn get_data_hash_async(c: &mut Criterion) {
let tmp = tempfile::tempdir().unwrap();
let cache = tmp.path().to_owned();
let data = b"hello world".to_vec();
let sri = cacache::put::data(&cache, "hello", data).unwrap();
c.bench_function("async_get_hash", move |b| {
let sri = cacache::put::data_sync(&cache, "hello", data).unwrap();
c.bench_function("get::data_hash", move |b| {
b.iter(|| {
task::block_on(cacache::async_get::data_hash(
black_box(&cache),
black_box(&sri),
))
.unwrap()
task::block_on(cacache::get::data_hash(black_box(&cache), black_box(&sri))).unwrap()
})
});
}
fn async_get(c: &mut Criterion) {
fn get_data_async(c: &mut Criterion) {
let tmp = tempfile::tempdir().unwrap();
let cache = tmp.path().to_owned();
let data = b"hello world".to_vec();
cacache::put::data(&cache, "hello", data).unwrap();
c.bench_function("async_get", move |b| {
cacache::put::data_sync(&cache, "hello", data).unwrap();
c.bench_function("get::data", move |b| {
b.iter(|| {
task::block_on(cacache::async_get::data(
black_box(&cache),
black_box("hello"),
))
.unwrap()
task::block_on(cacache::get::data(black_box(&cache), black_box("hello"))).unwrap()
})
});
}
fn async_get_hash_big_data(c: &mut Criterion) {
fn get_data_hash_async_big_data(c: &mut Criterion) {
let tmp = tempfile::tempdir().unwrap();
let cache = tmp.path().to_owned();
let data = vec![1; 1024 * 1024 * 5];
let sri = cacache::put::data(&cache, "hello", data).unwrap();
c.bench_function("async_get_hash_big_data", move |b| {
let sri = cacache::put::data_sync(&cache, "hello", data).unwrap();
c.bench_function("get::data_big_data", move |b| {
b.iter(|| {
task::block_on(cacache::async_get::data_hash(
black_box(&cache),
black_box(&sri),
))
.unwrap()
task::block_on(cacache::get::data_hash(black_box(&cache), black_box(&sri))).unwrap()
})
});
}
criterion_group!(
benches,
get_hash,
get,
async_get_hash,
async_get,
get_hash_big_data,
async_get_hash_big_data,
get_data_hash_async,
get_data_hash_sync,
get_data_async,
get_data_sync,
get_data_hash_async_big_data,
get_data_hash_sync_big_data
);
criterion_main!(benches);

View File

@ -1,128 +0,0 @@
//! Functions for reading asynchronously from cache.
//!
//! Asynchronous operations are able to trade off some linear performance in
//! exchange for potentially much higher performance on heavily-concurrent
//! loads.
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::prelude::*;
use ssri::{Algorithm, Integrity};
use crate::content::read::{self, AsyncReader};
use crate::errors::Error;
use crate::index::{self, Entry};
/// File handle for asynchronously reading from a content entry.
///
/// Make sure to call `.check()` when done reading to verify that the
/// extracted data passes integrity verification.
pub struct AsyncGet {
reader: AsyncReader,
}
impl AsyncRead for AsyncGet {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<std::io::Result<usize>> {
Pin::new(&mut self.reader).poll_read(cx, buf)
}
}
impl AsyncGet {
/// Checks that data read from disk passes integrity checks. Returns the
/// algorithm that was used verified the data. Should be called only after
/// all data has been read from disk.
pub fn check(self) -> Result<Algorithm, Error> {
self.reader.check()
}
}
/// Opens a new file handle into the cache, looking it up in the index using
/// `key`.
pub async fn open<P, K>(cache: P, key: K) -> Result<AsyncGet, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
{
if let Some(entry) = index::find_async(cache.as_ref(), key.as_ref()).await? {
open_hash(cache, entry.integrity).await
} else {
Err(Error::NotFound)
}
}
/// Opens a new file handle into the cache, based on its integrity address.
pub async fn open_hash<P>(cache: P, sri: Integrity) -> Result<AsyncGet, Error>
where
P: AsRef<Path>,
{
Ok(AsyncGet {
reader: read::open_async(cache.as_ref(), sri).await?,
})
}
/// Reads the entire contents of a cache file into a bytes vector, looking the
/// data up by key.
pub async fn data<P, K>(cache: P, key: K) -> Result<Vec<u8>, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
{
if let Some(entry) = index::find_async(cache.as_ref(), key.as_ref()).await? {
data_hash(cache, &entry.integrity).await
} else {
Err(Error::NotFound)
}
}
/// Reads the entire contents of a cache file into a bytes vector, looking the
/// data up by its content address.
pub async fn data_hash<P>(cache: P, sri: &Integrity) -> Result<Vec<u8>, Error>
where
P: AsRef<Path>,
{
Ok(read::read_async(cache.as_ref(), sri).await?)
}
/// Copies a cache entry by key to a specified location.
pub async fn copy<P, K, Q>(cache: P, key: K, to: Q) -> Result<u64, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
Q: AsRef<Path>,
{
if let Some(entry) = index::find_async(cache.as_ref(), key.as_ref()).await? {
copy_hash(cache, &entry.integrity, to).await
} else {
Err(Error::NotFound)
}
}
/// Copies a cache entry by integrity address to a specified location.
pub async fn copy_hash<P, Q>(cache: P, sri: &Integrity, to: Q) -> Result<u64, Error>
where
P: AsRef<Path>,
Q: AsRef<Path>,
{
read::copy_async(cache.as_ref(), sri, to.as_ref()).await
}
/// Gets entry information and metadata for a certain key.
pub async fn info<P, K>(cache: P, key: K) -> Result<Option<Entry>, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
{
index::find_async(cache.as_ref(), key.as_ref()).await
}
/// Returns true if the given hash exists in the cache.
pub async fn hash_exists<P: AsRef<Path>>(cache: P, sri: &Integrity) -> bool {
read::has_content_async(cache.as_ref(), &sri)
.await
.is_some()
}

View File

@ -1,127 +0,0 @@
//! Functions for asynchronously writing to cache.
//!
//! Asynchronous operations are able to trade off some linear performance in
//! exchange for potentially much higher performance on heavily-concurrent
//! loads.
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::prelude::*;
use ssri::{Algorithm, Integrity};
use crate::content::write;
use crate::errors::Error;
use crate::index;
pub use crate::put::PutOpts;
/// Writes `data` to the `cache`, indexing it under `key`.
pub async fn data<P, D, K>(cache: P, key: K, data: D) -> Result<Integrity, Error>
where
P: AsRef<Path>,
D: AsRef<[u8]>,
K: AsRef<str>,
{
let mut writer = PutOpts::new()
.algorithm(Algorithm::Sha256)
.open_async(cache.as_ref(), key.as_ref())
.await?;
writer.write_all(data.as_ref()).await?;
writer.commit().await
}
impl PutOpts {
/// Opens the file handle for writing, returning a Put instance.
pub async fn open_async<P, K>(self, cache: P, key: K) -> Result<AsyncPut, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
{
Ok(AsyncPut {
cache: cache.as_ref().to_path_buf(),
key: String::from(key.as_ref()),
written: 0,
writer: write::AsyncWriter::new(
cache.as_ref(),
*self.algorithm.as_ref().unwrap_or(&Algorithm::Sha256),
)
.await?,
opts: self,
})
}
}
/// A reference to an open file writing to the cache.
pub struct AsyncPut {
cache: PathBuf,
key: String,
written: usize,
pub(crate) writer: write::AsyncWriter,
opts: PutOpts,
}
impl AsyncWrite for AsyncPut {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
Pin::new(&mut self.writer).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.writer).poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.writer).poll_close(cx)
}
}
impl AsyncPut {
/// Closes the Put handle and writes content and index entries. Also
/// verifies data against `size` and `integrity` options, if provided.
/// Must be called manually in order to complete the writing process,
/// otherwise everything will be thrown out.
pub async fn commit(mut self) -> Result<Integrity, Error> {
let writer_sri = self.writer.close().await?;
if let Some(sri) = &self.opts.sri {
// TODO - ssri should have a .matches method
let algo = sri.pick_algorithm();
let matched = sri
.hashes
.iter()
.take_while(|h| h.algorithm == algo)
.find(|&h| *h == writer_sri.hashes[0]);
if matched.is_none() {
return Err(Error::IntegrityError);
}
} else {
self.opts.sri = Some(writer_sri);
}
if let Some(size) = self.opts.size {
if size != self.written {
return Err(Error::SizeError);
}
}
index::insert_async(&self.cache, &self.key, self.opts).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::async_get;
use async_std::task;
#[test]
fn round_trip() {
let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned();
task::block_on(async {
data(&dir, "hello", b"hello").await.unwrap();
});
let data = task::block_on(async { async_get::data(&dir, "hello").await.unwrap() });
assert_eq!(data, b"hello");
}
}

View File

@ -1,36 +0,0 @@
//! Functions for asynchronously removing things from the cache.
//!
//! Asynchronous operations are able to trade off some linear performance in
//! exchange for potentially much higher performance on heavily-concurrent
//! loads.
use std::path::Path;
use async_std::fs as afs;
use ssri::Integrity;
use crate::content::rm;
use crate::errors::Error;
use crate::index;
/// Removes an individual index entry. The associated content will be left
/// intact.
pub async fn entry<P: AsRef<Path>>(cache: P, key: &str) -> Result<(), Error> {
index::delete_async(cache.as_ref(), &key).await
}
/// Removes an individual content entry. Any index entries pointing to this
/// content will become invalidated.
pub async fn content<P: AsRef<Path>>(cache: P, sri: &Integrity) -> Result<(), Error> {
rm::rm_async(cache.as_ref(), &sri).await
}
/// Removes entire contents of the cache, including temporary files, the entry
/// index, and all content data.
pub async fn all<P: AsRef<Path>>(cache: P) -> Result<(), Error> {
for entry in cache.as_ref().read_dir()? {
if let Ok(entry) = entry {
afs::remove_dir_all(entry.path()).await?;
}
}
Ok(())
}

View File

@ -1,12 +1,129 @@
//! Functions for reading from cache.
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::prelude::*;
use ssri::{Algorithm, Integrity};
use crate::content::read::{self, Reader};
use crate::content::read::{self, AsyncReader, Reader};
use crate::errors::Error;
use crate::index::{self, Entry};
/// File handle for asynchronously reading from a content entry.
///
/// Make sure to call `.check()` when done reading to verify that the
/// extracted data passes integrity verification.
pub struct AsyncGet {
reader: AsyncReader,
}
impl AsyncRead for AsyncGet {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<std::io::Result<usize>> {
Pin::new(&mut self.reader).poll_read(cx, buf)
}
}
impl AsyncGet {
/// Checks that data read from disk passes integrity checks. Returns the
/// algorithm that was used verified the data. Should be called only after
/// all data has been read from disk.
pub fn check(self) -> Result<Algorithm, Error> {
self.reader.check()
}
}
/// Opens a new file handle into the cache, looking it up in the index using
/// `key`.
pub async fn open<P, K>(cache: P, key: K) -> Result<AsyncGet, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
{
if let Some(entry) = index::find_async(cache.as_ref(), key.as_ref()).await? {
open_hash(cache, entry.integrity).await
} else {
Err(Error::NotFound)
}
}
/// Opens a new file handle into the cache, based on its integrity address.
pub async fn open_hash<P>(cache: P, sri: Integrity) -> Result<AsyncGet, Error>
where
P: AsRef<Path>,
{
Ok(AsyncGet {
reader: read::open_async(cache.as_ref(), sri).await?,
})
}
/// Reads the entire contents of a cache file into a bytes vector, looking the
/// data up by key.
pub async fn data<P, K>(cache: P, key: K) -> Result<Vec<u8>, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
{
if let Some(entry) = index::find_async(cache.as_ref(), key.as_ref()).await? {
data_hash(cache, &entry.integrity).await
} else {
Err(Error::NotFound)
}
}
/// Reads the entire contents of a cache file into a bytes vector, looking the
/// data up by its content address.
pub async fn data_hash<P>(cache: P, sri: &Integrity) -> Result<Vec<u8>, Error>
where
P: AsRef<Path>,
{
Ok(read::read_async(cache.as_ref(), sri).await?)
}
/// Copies a cache entry by key to a specified location.
pub async fn copy<P, K, Q>(cache: P, key: K, to: Q) -> Result<u64, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
Q: AsRef<Path>,
{
if let Some(entry) = index::find_async(cache.as_ref(), key.as_ref()).await? {
copy_hash(cache, &entry.integrity, to).await
} else {
Err(Error::NotFound)
}
}
/// Copies a cache entry by integrity address to a specified location.
pub async fn copy_hash<P, Q>(cache: P, sri: &Integrity, to: Q) -> Result<u64, Error>
where
P: AsRef<Path>,
Q: AsRef<Path>,
{
read::copy_async(cache.as_ref(), sri, to.as_ref()).await
}
/// Gets entry information and metadata for a certain key.
pub async fn info<P, K>(cache: P, key: K) -> Result<Option<Entry>, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
{
index::find_async(cache.as_ref(), key.as_ref()).await
}
/// Returns true if the given hash exists in the cache.
pub async fn hash_exists<P: AsRef<Path>>(cache: P, sri: &Integrity) -> bool {
read::has_content_async(cache.as_ref(), &sri)
.await
.is_some()
}
/// File handle for reading from a content entry.
///
/// Make sure to call `get.check()` when done reading
@ -33,20 +150,20 @@ impl Get {
/// Opens a new file handle into the cache, looking it up in the index using
/// `key`.
pub fn open<P, K>(cache: P, key: K) -> Result<Get, Error>
pub fn open_sync<P, K>(cache: P, key: K) -> Result<Get, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
{
if let Some(entry) = index::find(cache.as_ref(), key.as_ref())? {
open_hash(cache, entry.integrity)
open_hash_sync(cache, entry.integrity)
} else {
Err(Error::NotFound)
}
}
/// Opens a new file handle into the cache, based on its integrity address.
pub fn open_hash<P>(cache: P, sri: Integrity) -> Result<Get, Error>
pub fn open_hash_sync<P>(cache: P, sri: Integrity) -> Result<Get, Error>
where
P: AsRef<Path>,
{
@ -57,13 +174,13 @@ where
/// Reads the entire contents of a cache file into a bytes vector, looking the
/// data up by key.
pub fn data<P, K>(cache: P, key: K) -> Result<Vec<u8>, Error>
pub fn data_sync<P, K>(cache: P, key: K) -> Result<Vec<u8>, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
{
if let Some(entry) = index::find(cache.as_ref(), key.as_ref())? {
data_hash(cache, &entry.integrity)
data_hash_sync(cache, &entry.integrity)
} else {
Err(Error::NotFound)
}
@ -71,7 +188,7 @@ where
/// Reads the entire contents of a cache file into a bytes vector, looking the
/// data up by its content address.
pub fn data_hash<P>(cache: P, sri: &Integrity) -> Result<Vec<u8>, Error>
pub fn data_hash_sync<P>(cache: P, sri: &Integrity) -> Result<Vec<u8>, Error>
where
P: AsRef<Path>,
{
@ -79,21 +196,21 @@ where
}
/// Copies a cache entry by key to a specified location.
pub fn copy<P, K, Q>(cache: P, key: K, to: Q) -> Result<u64, Error>
pub fn copy_sync<P, K, Q>(cache: P, key: K, to: Q) -> Result<u64, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
Q: AsRef<Path>,
{
if let Some(entry) = index::find(cache.as_ref(), key.as_ref())? {
copy_hash(cache, &entry.integrity, to)
copy_hash_sync(cache, &entry.integrity, to)
} else {
Err(Error::NotFound)
}
}
/// Copies a cache entry by integrity address to a specified location.
pub fn copy_hash<P, Q>(cache: P, sri: &Integrity, to: Q) -> Result<u64, Error>
pub fn copy_hash_sync<P, Q>(cache: P, sri: &Integrity, to: Q) -> Result<u64, Error>
where
P: AsRef<Path>,
Q: AsRef<Path>,
@ -102,7 +219,7 @@ where
}
/// Gets entry information and metadata for a certain key.
pub fn info<P, K>(cache: P, key: K) -> Result<Option<Entry>, Error>
pub fn info_sync<P, K>(cache: P, key: K) -> Result<Option<Entry>, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
@ -111,6 +228,6 @@ where
}
/// Returns true if the given hash exists in the cache.
pub fn hash_exists<P: AsRef<Path>>(cache: P, sri: &Integrity) -> bool {
pub fn hash_exists_sync<P: AsRef<Path>>(cache: P, sri: &Integrity) -> bool {
read::has_content(cache.as_ref(), &sri).is_some()
}

View File

@ -4,27 +4,28 @@
//!
//! ## Examples
//! ```
//! use tempfile;
//! use cacache;
//! # use tempfile;
//! # let tmp = tempfile::tempdir().unwrap();
//! use async_std::task;
//!
//! let tmp = tempfile::tempdir().unwrap();
//! let dir = tmp.path().to_owned();
//! cacache::put::data(&dir, "key", b"my-data").unwrap();
//! let data = cacache::get::read(&dir, "key").unwrap();
//! assert_eq!(data, b"my-data");
//! task::block_on(async {
//! cacache::put::data(&dir, "key", b"my-async-data").await.unwrap();
//! let data = cacache::get::data(&dir, "key").await.unwrap();
//! assert_eq!(data, b"my-async-data");
//! })
//! ```
//!
//! You can also use the equivalent async APIs using async/await!
//! There are also sync APIs available if you don't want to use async/await:
//! ```
//! # use tempfile;
//! use cacache;
//! # use async_std::task;
//! # use tempfile;
//! # let tmp = tempfile::tempdir().unwrap();
//! let dir = tmp.path().to_owned();
//! # task::block_on(async {
//! cacache::async_put::data(&dir, "key", b"my-async-data").await.unwrap();
//! let data = cacache::async_get::read(&dir, "key").await.unwrap();
//! assert_eq!(data, b"my-async-data");
//! # })
//! cacache::put::data_sync(&dir, "key", b"my-data").unwrap();
//! let data = cacache::get::data_sync(&dir, "key").unwrap();
//! assert_eq!(data, b"my-data");
//! ```
#![warn(missing_docs, missing_doc_code_examples)]
@ -40,9 +41,5 @@ pub mod ls;
pub mod put;
pub mod rm;
pub mod async_get;
pub mod async_put;
pub mod async_rm;
pub use errors::Error;
pub use index::Entry;

View File

@ -1,6 +1,9 @@
//! Functions for writing to cache.
use std::io::prelude::*;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use futures::prelude::*;
#[cfg(unix)]
use nix::unistd::{Gid, Uid};
@ -11,8 +14,10 @@ use crate::content::write;
use crate::errors::Error;
use crate::index;
use std::task::{Context, Poll};
/// Writes `data` to the `cache`, indexing it under `key`.
pub fn data<P, D, K>(cache: P, key: K, data: D) -> Result<Integrity, Error>
pub async fn data<P, D, K>(cache: P, key: K, data: D) -> Result<Integrity, Error>
where
P: AsRef<Path>,
D: AsRef<[u8]>,
@ -20,13 +25,85 @@ where
{
let mut writer = PutOpts::new()
.algorithm(Algorithm::Sha256)
.open(cache.as_ref(), key.as_ref())?;
.open(cache.as_ref(), key.as_ref())
.await?;
writer.write_all(data.as_ref()).await?;
writer.commit().await
}
/// A reference to an open file writing to the cache.
pub struct AsyncPut {
cache: PathBuf,
key: String,
written: usize,
pub(crate) writer: write::AsyncWriter,
opts: PutOpts,
}
impl AsyncWrite for AsyncPut {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
Pin::new(&mut self.writer).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.writer).poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.writer).poll_close(cx)
}
}
impl AsyncPut {
/// Closes the AsyncPut handle and writes content and index entries. Also
/// verifies data against `size` and `integrity` options, if provided.
/// Must be called manually in order to complete the writing process,
/// otherwise everything will be thrown out.
pub async fn commit(mut self) -> Result<Integrity, Error> {
let writer_sri = self.writer.close().await?;
if let Some(sri) = &self.opts.sri {
// TODO - ssri should have a .matches method
let algo = sri.pick_algorithm();
let matched = sri
.hashes
.iter()
.take_while(|h| h.algorithm == algo)
.find(|&h| *h == writer_sri.hashes[0]);
if matched.is_none() {
return Err(Error::IntegrityError);
}
} else {
self.opts.sri = Some(writer_sri);
}
if let Some(size) = self.opts.size {
if size != self.written {
return Err(Error::SizeError);
}
}
index::insert_async(&self.cache, &self.key, self.opts).await
}
}
/// Writes `data` to the `cache` synchronously, indexing it under `key`.
pub fn data_sync<P, D, K>(cache: P, key: K, data: D) -> Result<Integrity, Error>
where
P: AsRef<Path>,
D: AsRef<[u8]>,
K: AsRef<str>,
{
let mut writer = PutOpts::new()
.algorithm(Algorithm::Sha256)
.open_sync(cache.as_ref(), key.as_ref())?;
writer.write_all(data.as_ref())?;
writer.flush()?;
writer.commit()
}
/// Builder for pptions and flags for opening a new cache file to write data into.
/// Builder for options and flags for opening a new cache file to write data into.
#[derive(Clone, Default)]
pub struct PutOpts {
pub(crate) algorithm: Option<Algorithm>,
@ -47,12 +124,31 @@ impl PutOpts {
}
/// Opens the file handle for writing, returning a Put instance.
pub fn open<P, K>(self, cache: P, key: K) -> Result<Put, Error>
pub async fn open<P, K>(self, cache: P, key: K) -> Result<AsyncPut, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
{
Ok(Put {
Ok(AsyncPut {
cache: cache.as_ref().to_path_buf(),
key: String::from(key.as_ref()),
written: 0,
writer: write::AsyncWriter::new(
cache.as_ref(),
*self.algorithm.as_ref().unwrap_or(&Algorithm::Sha256),
)
.await?,
opts: self,
})
}
/// Opens the file handle for writing synchronously, returning a Put instance.
pub fn open_sync<P, K>(self, cache: P, key: K) -> Result<SyncPut, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
{
Ok(SyncPut {
cache: cache.as_ref().to_path_buf(),
key: String::from(key.as_ref()),
written: 0,
@ -110,7 +206,7 @@ impl PutOpts {
}
/// A reference to an open file writing to the cache.
pub struct Put {
pub struct SyncPut {
cache: PathBuf,
key: String,
written: usize,
@ -118,7 +214,7 @@ pub struct Put {
opts: PutOpts,
}
impl Write for Put {
impl Write for SyncPut {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.writer.write(buf)
}
@ -127,7 +223,7 @@ impl Write for Put {
}
}
impl Put {
impl SyncPut {
/// Closes the Put handle and writes content and index entries. Also
/// verifies data against `size` and `integrity` options, if provided.
/// Must be called manually in order to complete the writing process,
@ -159,14 +255,25 @@ impl Put {
#[cfg(test)]
mod tests {
use super::*;
use async_std::task;
#[test]
fn round_trip() {
let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned();
data(&dir, "hello", b"hello").unwrap();
let data = crate::get::data(&dir, "hello").unwrap();
task::block_on(async {
crate::put::data(&dir, "hello", b"hello").await.unwrap();
});
let data = task::block_on(async { crate::get::data(&dir, "hello").await.unwrap() });
assert_eq!(data, b"hello");
}
#[test]
fn round_trip_sync() {
let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned();
crate::put::data_sync(&dir, "hello", b"hello").unwrap();
let data = crate::get::data_sync(&dir, "hello").unwrap();
assert_eq!(data, b"hello");
}
}

View File

@ -2,6 +2,8 @@
use std::fs;
use std::path::Path;
use async_std::fs as afs;
use ssri::Integrity;
use crate::content::rm;
@ -10,19 +12,42 @@ use crate::index;
/// Removes an individual index entry. The associated content will be left
/// intact.
pub fn entry<P: AsRef<Path>>(cache: P, key: &str) -> Result<(), Error> {
index::delete(cache.as_ref(), &key)
pub async fn entry<P: AsRef<Path>>(cache: P, key: &str) -> Result<(), Error> {
index::delete_async(cache.as_ref(), &key).await
}
/// Removes an individual content entry. Any index entries pointing to this
/// content will become invalidated.
pub fn content<P: AsRef<Path>>(cache: P, sri: &Integrity) -> Result<(), Error> {
rm::rm(cache.as_ref(), &sri)
pub async fn content<P: AsRef<Path>>(cache: P, sri: &Integrity) -> Result<(), Error> {
rm::rm_async(cache.as_ref(), &sri).await
}
/// Removes entire contents of the cache, including temporary files, the entry
/// index, and all content data.
pub fn all<P: AsRef<Path>>(cache: P) -> Result<(), Error> {
pub async fn all<P: AsRef<Path>>(cache: P) -> Result<(), Error> {
for entry in cache.as_ref().read_dir()? {
if let Ok(entry) = entry {
afs::remove_dir_all(entry.path()).await?;
}
}
Ok(())
}
/// Removes an individual index entry synchronously. The associated content
/// will be left intact.
pub fn entry_sync<P: AsRef<Path>>(cache: P, key: &str) -> Result<(), Error> {
index::delete(cache.as_ref(), &key)
}
/// Removes an individual content entry synchronously. Any index entries
/// pointing to this content will become invalidated.
pub fn content_sync<P: AsRef<Path>>(cache: P, sri: &Integrity) -> Result<(), Error> {
rm::rm(cache.as_ref(), &sri)
}
/// Removes entire contents of the cache synchronously, including temporary files, the entry
/// index, and all content data.
pub fn all_sync<P: AsRef<Path>>(cache: P) -> Result<(), Error> {
for entry in cache.as_ref().read_dir()? {
if let Ok(entry) = entry {
fs::remove_dir_all(entry.path())?;
@ -34,17 +59,17 @@ pub fn all<P: AsRef<Path>>(cache: P) -> Result<(), Error> {
#[cfg(test)]
mod tests {
#[test]
fn all() {
fn all_sync() {
let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned();
let sri = crate::put::data(&dir, "key", b"my-data").unwrap();
let sri = crate::put::data_sync(&dir, "key", b"my-data").unwrap();
crate::rm::all(&dir).unwrap();
crate::rm::all_sync(&dir).unwrap();
let new_entry = crate::get::info(&dir, "key").unwrap();
let new_entry = crate::get::info_sync(&dir, "key").unwrap();
assert_eq!(new_entry, None);
let data_exists = crate::get::hash_exists(&dir, &sri);
let data_exists = crate::get::hash_exists_sync(&dir, &sri);
assert_eq!(data_exists, false);
}
@ -52,14 +77,14 @@ mod tests {
fn entry() {
let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned();
let sri = crate::put::data(&dir, "key", b"my-data").unwrap();
let sri = crate::put::data_sync(&dir, "key", b"my-data").unwrap();
crate::rm::entry(&dir, "key").unwrap();
crate::rm::entry_sync(&dir, "key").unwrap();
let new_entry = crate::get::info(&dir, "key").unwrap();
let new_entry = crate::get::info_sync(&dir, "key").unwrap();
assert_eq!(new_entry, None);
let data_exists = crate::get::hash_exists(&dir, &sri);
let data_exists = crate::get::hash_exists_sync(&dir, &sri);
assert_eq!(data_exists, true);
}
}