feat(async): add extra async versions of APIs (#6)

This commit is contained in:
Kat Marchán 2019-10-13 20:06:20 -04:00 committed by GitHub
parent 642e40b47c
commit 18190bfc35
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1543 additions and 235 deletions

View File

@ -1,26 +1,29 @@
name: Rust
on: [push]
on: [push, pull_request]
jobs:
build:
runs-on: ${{ matrix.os }}
strategy:
matrix:
rust: [stable, beta, nightly]
rust: [beta, nightly]
os: [ubuntu-latest, windows-latest]
steps:
- uses: hecrj/setup-rust-action@master
with:
rust-version: ${{ matrix.rust }}
- uses: actions/checkout@v1
- name: Components
run: rustup component add clippy
- name: Build
run: cargo build --verbose
- name: Clippy
run: cargo clippy -- -D warnings
- name: Run tests
run: cargo test --verbose
- uses: hecrj/setup-rust-action@master
with:
rust-version: ${{ matrix.rust }}
- uses: actions/checkout@v1
- name: Add Clippy
run: rustup component add clippy
- name: Add Rustfmt
run: rustup component add rustfmt
- name: Build
run: cargo build --verbose
- name: Rustfmt
run: cargo fmt --all -- --check
- name: Clippy
run: cargo clippy -- -D warnings
- name: Run tests
run: cargo test --verbose

808
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -31,6 +31,8 @@ failure = "0.1.5"
walkdir = "2.2.7"
either = "1.5.2"
mkdirp = "1.0.0"
futures-preview = "0.3.0-alpha.18"
async-std = { version = "0.99.9", features = ["unstable"]}
[target.'cfg(unix)'.dependencies]
chownr = "2.0.0"

View File

@ -2,19 +2,97 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
use async_std::task;
use cacache;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use tempfile;
fn get(c: &mut Criterion) {
fn read_hash(c: &mut Criterion) {
let tmp = tempfile::tempdir().unwrap();
let cache = tmp.path().to_owned();
let data = b"hello world".to_vec();
let sri = cacache::put::data(&cache, "hello", data).unwrap();
c.bench_function("read_hash", move |b| {
b.iter(|| cacache::get::read_hash(black_box(&cache), black_box(&sri)))
b.iter(|| cacache::get::read_hash(black_box(&cache), black_box(&sri)).unwrap())
});
}
criterion_group!(benches, get);
/// Benchmarks a synchronous, key-based read of a small cache entry.
fn read(c: &mut Criterion) {
    let tmp = tempfile::tempdir().unwrap();
    let cache = tmp.path().to_owned();
    let data = b"hello world".to_vec();
    cacache::put::data(&cache, "hello", data).unwrap();
    // Make sure the entry is readable before any timing starts.
    cacache::get::read(&cache, "hello").unwrap();
    c.bench_function("read", move |b| {
        // Pass a borrowed key (as `async_read` does) so the timed loop measures
        // the cache lookup only, not a fresh `String` allocation per iteration.
        b.iter(|| cacache::get::read(black_box(&cache), black_box("hello")).unwrap())
    });
}
/// Benchmarks a synchronous read by content address of a 5 MiB cache entry.
fn read_hash_big_data(c: &mut Criterion) {
    let tmp = tempfile::tempdir().unwrap();
    let cache_dir = tmp.path().to_owned();
    let payload = vec![1; 1024 * 1024 * 5];
    let sri = cacache::put::data(&cache_dir, "hello", payload).unwrap();
    c.bench_function("read_hash_big_data", move |bencher| {
        bencher.iter(|| {
            let outcome = cacache::get::read_hash(black_box(&cache_dir), black_box(&sri));
            outcome.unwrap()
        })
    });
}
/// Benchmarks an asynchronous read by content address of a small cache
/// entry, driving the future to completion with `task::block_on`.
fn async_read_hash(c: &mut Criterion) {
    let tmp = tempfile::tempdir().unwrap();
    let cache_dir = tmp.path().to_owned();
    let payload = b"hello world".to_vec();
    let sri = cacache::put::data(&cache_dir, "hello", payload).unwrap();
    c.bench_function("async_read_hash", move |bencher| {
        bencher.iter(|| {
            let fut = cacache::async_get::read_hash(black_box(&cache_dir), black_box(&sri));
            task::block_on(fut).unwrap()
        })
    });
}
/// Benchmarks an asynchronous, key-based read of a small cache entry,
/// driving the future to completion with `task::block_on`.
fn async_read(c: &mut Criterion) {
    let tmp = tempfile::tempdir().unwrap();
    let cache_dir = tmp.path().to_owned();
    let payload = b"hello world".to_vec();
    cacache::put::data(&cache_dir, "hello", payload).unwrap();
    c.bench_function("async_read", move |bencher| {
        bencher.iter(|| {
            let fut = cacache::async_get::read(black_box(&cache_dir), black_box("hello"));
            task::block_on(fut).unwrap()
        })
    });
}
/// Benchmarks an asynchronous read by content address of a 5 MiB cache
/// entry, driving the future to completion with `task::block_on`.
fn async_read_hash_big_data(c: &mut Criterion) {
    let tmp = tempfile::tempdir().unwrap();
    let cache_dir = tmp.path().to_owned();
    let payload = vec![1; 1024 * 1024 * 5];
    let sri = cacache::put::data(&cache_dir, "hello", payload).unwrap();
    c.bench_function("async_read_hash_big_data", move |bencher| {
        bencher.iter(|| {
            let fut = cacache::async_get::read_hash(black_box(&cache_dir), black_box(&sri));
            task::block_on(fut).unwrap()
        })
    });
}
criterion_group!(
benches,
read_hash,
read,
async_read_hash,
async_read,
read_hash_big_data,
async_read_hash_big_data,
);
criterion_main!(benches);

134
src/async_get.rs Normal file
View File

@ -0,0 +1,134 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
//! Functions for reading asynchronously from cache.
//!
//! Asynchronous operations are able to trade off some linear performance in
//! exchange for potentially much higher performance on heavily-concurrent
//! loads.
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::prelude::*;
use ssri::{Algorithm, Integrity};
use crate::content::read::{self, AsyncReader};
use crate::errors::Error;
use crate::index::{self, Entry};
/// File handle for asynchronously reading from a content entry.
///
/// Implements `AsyncRead` by delegating to the wrapped content reader. Make
/// sure to call `.check()` once you are done reading to verify that the
/// extracted data passes integrity verification.
pub struct AsyncGet {
    // Underlying content reader; every read and the final `check()` are
    // forwarded to it.
    reader: AsyncReader,
}
impl AsyncRead for AsyncGet {
    /// Delegates the read straight to the wrapped content reader.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<std::io::Result<usize>> {
        let inner = Pin::new(&mut self.reader);
        inner.poll_read(cx, buf)
    }
}
impl AsyncGet {
    /// Checks that data read from disk passes integrity checks. Returns the
    /// algorithm that was used to verify the data. Should be called only
    /// after all data has been read from disk.
    ///
    /// Consumes the handle; the verdict comes from the wrapped reader's
    /// `check()`.
    pub fn check(self) -> Result<Algorithm, Error> {
        self.reader.check()
    }
}
/// Opens a new file handle into the cache, looking it up in the index using
/// `key`.
pub async fn open<P, K>(cache: P, key: K) -> Result<AsyncGet, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
{
if let Some(entry) = index::find_async(cache.as_ref(), key.as_ref()).await? {
open_hash(cache, entry.integrity).await
} else {
Err(Error::NotFound)
}
}
/// Opens a new file handle into the cache, addressed directly by its
/// integrity value.
pub async fn open_hash<P>(cache: P, sri: Integrity) -> Result<AsyncGet, Error>
where
    P: AsRef<Path>,
{
    let reader = read::open_async(cache.as_ref(), sri).await?;
    Ok(AsyncGet { reader })
}
/// Reads the entire contents of a cache file into a bytes vector, looking the
/// data up by key.
pub async fn read<P, K>(cache: P, key: K) -> Result<Vec<u8>, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
{
if let Some(entry) = index::find_async(cache.as_ref(), key.as_ref()).await? {
read_hash(cache, &entry.integrity).await
} else {
Err(Error::NotFound)
}
}
/// Reads the entire contents of a cache file into a byte vector, looking the
/// data up by its content address.
#[allow(clippy::needless_lifetimes)]
pub async fn read_hash<P>(cache: P, sri: &Integrity) -> Result<Vec<u8>, Error>
where
    P: AsRef<Path>,
{
    let bytes = read::read_async(cache.as_ref(), sri).await?;
    Ok(bytes)
}
/// Copies a cache entry by key to a specified location.
pub async fn copy<P, K, Q>(cache: P, key: K, to: Q) -> Result<u64, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
Q: AsRef<Path>,
{
if let Some(entry) = index::find_async(cache.as_ref(), key.as_ref()).await? {
copy_hash(cache, &entry.integrity, to).await
} else {
Err(Error::NotFound)
}
}
/// Copies the cache entry with the given integrity address to the location
/// `to`.
#[allow(clippy::needless_lifetimes)]
pub async fn copy_hash<P, Q>(cache: P, sri: &Integrity, to: Q) -> Result<u64, Error>
where
    P: AsRef<Path>,
    Q: AsRef<Path>,
{
    let cache_dir = cache.as_ref();
    let destination = to.as_ref();
    read::copy_async(cache_dir, sri, destination).await
}
/// Looks up the index entry (information and metadata) stored under `key`.
///
/// Yields `Ok(None)` when the index lookup finds nothing for `key`.
pub async fn info<P, K>(cache: P, key: K) -> Result<Option<Entry>, Error>
where
    P: AsRef<Path>,
    K: AsRef<str>,
{
    let cache_dir = cache.as_ref();
    let wanted = key.as_ref();
    index::find_async(cache_dir, wanted).await
}
/// Returns true if content for the given hash exists in the cache.
pub async fn hash_exists<P: AsRef<Path>>(cache: P, sri: &Integrity) -> bool {
    let found = read::has_content_async(cache.as_ref(), &sri).await;
    found.is_some()
}

131
src/async_put.rs Normal file
View File

@ -0,0 +1,131 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
//! Functions for asynchronously writing to cache.
//!
//! Asynchronous operations are able to trade off some linear performance in
//! exchange for potentially much higher performance on heavily-concurrent
//! loads.
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::prelude::*;
use ssri::{Algorithm, Integrity};
use crate::content::write;
use crate::errors::Error;
use crate::index;
pub use crate::put::PutOpts;
/// Writes `data` to the `cache`, indexing it under `key`.
///
/// The content is hashed with SHA-256 and the resulting integrity value is
/// returned once the entry has been committed.
pub async fn data<P, D, K>(cache: P, key: K, data: D) -> Result<Integrity, Error>
where
    P: AsRef<Path>,
    D: AsRef<[u8]>,
    K: AsRef<str>,
{
    let opts = PutOpts::new().algorithm(Algorithm::Sha256);
    let mut handle = opts.open_async(cache.as_ref(), key.as_ref()).await?;
    handle.write_all(data.as_ref()).await?;
    handle.commit().await
}
impl PutOpts {
    /// Opens a handle for asynchronously writing to the cache, returning an
    /// `AsyncPut` instance.
    ///
    /// Falls back to SHA-256 when no algorithm was configured on the options.
    pub async fn open_async<P, K>(self, cache: P, key: K) -> Result<AsyncPut, Error>
    where
        P: AsRef<Path>,
        K: AsRef<str>,
    {
        let cache_dir = cache.as_ref();
        let algo = *self.algorithm.as_ref().unwrap_or(&Algorithm::Sha256);
        let writer = write::AsyncWriter::new(cache_dir, algo).await?;
        Ok(AsyncPut {
            cache: cache_dir.to_path_buf(),
            key: String::from(key.as_ref()),
            written: 0,
            writer,
            opts: self,
        })
    }
}
/// A reference to an open file writing to the cache.
///
/// Created by `PutOpts::open_async`. Call `commit()` to finish the write.
pub struct AsyncPut {
    // Root directory of the cache being written to.
    cache: PathBuf,
    // Index key the content will be registered under on commit.
    key: String,
    // Byte counter compared against the `size` option during `commit()`.
    written: usize,
    // Content writer that all `AsyncWrite` calls are forwarded to.
    pub(crate) writer: write::AsyncWriter,
    // Options this handle was opened with; handed to the index on commit.
    opts: PutOpts,
}
impl AsyncWrite for AsyncPut {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
Pin::new(&mut self.writer).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.writer).poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.writer).poll_close(cx)
}
}
impl AsyncPut {
/// Closes the Put handle and writes content and index entries. Also
/// verifies data against `size` and `integrity` options, if provided.
/// Must be called manually in order to complete the writing process,
/// otherwise everything will be thrown out.
pub async fn commit(mut self) -> Result<Integrity, Error> {
let writer_sri = self.writer.close().await?;
if let Some(sri) = &self.opts.sri {
// TODO - ssri should have a .matches method
let algo = sri.pick_algorithm();
let matched = sri
.hashes
.iter()
.take_while(|h| h.algorithm == algo)
.find(|&h| *h == writer_sri.hashes[0]);
if matched.is_none() {
return Err(Error::IntegrityError);
}
} else {
self.opts.sri = Some(writer_sri);
}
if let Some(size) = self.opts.size {
if size != self.written {
return Err(Error::SizeError);
}
}
index::insert_async(&self.cache, &self.key, self.opts).await
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::async_get;
    use async_std::task;

    /// Data written through `data` can be read back by key.
    #[test]
    fn round_trip() {
        let tmp = tempfile::tempdir().unwrap();
        let dir = tmp.path().to_owned();
        task::block_on(async {
            data(&dir, "hello", b"hello").await.unwrap();
        });
        let read_back = task::block_on(async_get::read(&dir, "hello"));
        let data = read_back.unwrap();
        assert_eq!(data, b"hello");
    }
}

40
src/async_rm.rs Normal file
View File

@ -0,0 +1,40 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
//! Functions for asynchronously removing things from the cache.
//!
//! Asynchronous operations are able to trade off some linear performance in
//! exchange for potentially much higher performance on heavily-concurrent
//! loads.
use std::path::Path;
use async_std::fs as afs;
use ssri::Integrity;
use crate::content::rm;
use crate::errors::Error;
use crate::index;
/// Removes an individual index entry. The associated content will be left
/// intact.
pub async fn entry<P: AsRef<Path>>(cache: P, key: &str) -> Result<(), Error> {
    let cache_dir = cache.as_ref();
    index::delete_async(cache_dir, &key).await
}
/// Removes an individual content entry. Any index entries pointing to this
/// content will become invalidated.
pub async fn content<P: AsRef<Path>>(cache: P, sri: &Integrity) -> Result<(), Error> {
    let cache_dir = cache.as_ref();
    rm::rm_async(cache_dir, &sri).await
}
/// Removes entire contents of the cache, including temporary files, the entry
/// index, and all content data.
///
/// Directory entries that cannot be read are skipped; a failed removal
/// aborts with that error.
pub async fn all<P: AsRef<Path>>(cache: P) -> Result<(), Error> {
    // `flatten` keeps only the `Ok` entries yielded by the directory listing.
    for child in cache.as_ref().read_dir()?.flatten() {
        afs::remove_dir_all(child.path()).await?;
    }
    Ok(())
}

View File

@ -41,9 +41,6 @@ mod tests {
wanted.push("b9");
wanted.push("4d");
wanted.push("27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9");
assert_eq!(
cpath.to_str().unwrap(),
wanted.to_str().unwrap()
);
assert_eq!(cpath.to_str().unwrap(), wanted.to_str().unwrap());
}
}

View File

@ -4,7 +4,11 @@
use std::fs::{self, File};
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
use async_std;
use futures::prelude::*;
use ssri::{Algorithm, Integrity, IntegrityChecker};
use crate::content::path;
@ -29,6 +33,29 @@ impl Reader {
}
}
/// Asynchronous reader that feeds an integrity checker as data is read.
///
/// Call `check()` after reading to obtain the verification result.
pub struct AsyncReader {
    // Open handle the bytes are read from.
    fd: async_std::fs::File,
    // Checker that is given the data passed through `poll_read`.
    checker: IntegrityChecker,
}
impl AsyncRead for AsyncReader {
    /// Reads from the underlying file and feeds exactly the bytes that were
    /// read into the integrity checker.
    ///
    /// Only the first `amt` bytes of `buf` are valid after a read; the rest is
    /// whatever the caller's buffer held before. Hashing the whole buffer
    /// would mix that stale data into the digest and make `check()` fail (or
    /// pass) for the wrong reasons on any short read.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<std::io::Result<usize>> {
        let amt = futures::ready!(Pin::new(&mut self.fd).poll_read(cx, buf))?;
        self.checker.input(&buf[..amt]);
        Poll::Ready(Ok(amt))
    }
}
impl AsyncReader {
pub fn check(self) -> Result<Algorithm, Error> {
self.checker.result().ok_or(Error::IntegrityError)
}
}
pub fn open(cache: &Path, sri: Integrity) -> Result<Reader, Error> {
Ok(Reader {
fd: File::open(cache)?,
@ -36,6 +63,14 @@ pub fn open(cache: &Path, sri: Integrity) -> Result<Reader, Error> {
})
}
#[allow(clippy::needless_lifetimes)]
pub async fn open_async(cache: &Path, sri: Integrity) -> Result<AsyncReader, Error> {
Ok(AsyncReader {
fd: async_std::fs::File::open(cache).await?,
checker: IntegrityChecker::new(sri),
})
}
pub fn read(cache: &Path, sri: &Integrity) -> Result<Vec<u8>, Error> {
let cpath = path::content_path(&cache, &sri);
let ret = fs::read(&cpath)?;
@ -46,6 +81,16 @@ pub fn read(cache: &Path, sri: &Integrity) -> Result<Vec<u8>, Error> {
}
}
pub async fn read_async<'a>(cache: &'a Path, sri: &'a Integrity) -> Result<Vec<u8>, Error> {
let cpath = path::content_path(&cache, &sri);
let ret = async_std::fs::read(&cpath).await?;
if sri.check(&ret).is_some() {
Ok(ret)
} else {
Err(Error::IntegrityError)
}
}
pub fn copy(cache: &Path, sri: &Integrity, to: &Path) -> Result<u64, Error> {
let cpath = path::content_path(&cache, &sri);
let ret = fs::copy(&cpath, to)?;
@ -57,6 +102,21 @@ pub fn copy(cache: &Path, sri: &Integrity, to: &Path) -> Result<u64, Error> {
}
}
pub async fn copy_async<'a>(
cache: &'a Path,
sri: &'a Integrity,
to: &'a Path,
) -> Result<u64, Error> {
let cpath = path::content_path(&cache, &sri);
let ret = async_std::fs::copy(&cpath, to).await?;
let data = async_std::fs::read(cpath).await?;
if sri.check(data).is_some() {
Ok(ret)
} else {
Err(Error::IntegrityError)
}
}
pub fn has_content(cache: &Path, sri: &Integrity) -> Option<Integrity> {
if path::content_path(&cache, &sri).exists() {
Some(sri.clone())
@ -64,3 +124,14 @@ pub fn has_content(cache: &Path, sri: &Integrity) -> Option<Integrity> {
None
}
}
/// Returns a clone of `sri` when metadata for its content path can be
/// fetched, and `None` otherwise.
pub async fn has_content_async(cache: &Path, sri: &Integrity) -> Option<Integrity> {
    let cpath = path::content_path(&cache, &sri);
    match async_std::fs::metadata(cpath).await {
        Ok(_) => Some(sri.clone()),
        Err(_) => None,
    }
}

View File

@ -5,6 +5,7 @@
use std::fs;
use std::path::Path;
use async_std::fs as afs;
use ssri::Integrity;
use crate::content::path;
@ -14,3 +15,8 @@ pub fn rm(cache: &Path, sri: &Integrity) -> Result<(), Error> {
fs::remove_file(path::content_path(&cache, &sri))?;
Ok(())
}
/// Asynchronously removes the content file addressed by `sri` from `cache`.
pub async fn rm_async(cache: &Path, sri: &Integrity) -> Result<(), Error> {
    let target = path::content_path(&cache, &sri);
    afs::remove_file(target).await?;
    Ok(())
}

View File

@ -5,7 +5,14 @@
use std::fs::DirBuilder;
use std::io::prelude::*;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::Mutex;
use async_std::fs as afs;
use async_std::future::Future;
use async_std::task::{blocking, Context, JoinHandle, Poll};
use futures::io::AsyncWrite;
use futures::prelude::*;
use ssri::{Algorithm, Integrity, IntegrityOpts};
use tempfile::NamedTempFile;
@ -54,9 +61,216 @@ impl Write for Writer {
}
}
/// Asynchronous content writer.
///
/// Wraps the writer's `State` in a `Mutex`; every poll method locks it and
/// then drives the state machine.
pub struct AsyncWriter(Mutex<State>);
/// State machine behind `AsyncWriter`.
enum State {
    /// No operation in flight. Holds the writer internals, or `None` once
    /// the writer has been closed.
    Idle(Option<Inner>),
    /// An operation is running as a blocking task; the task resolves to the
    /// state to switch to when it completes.
    Busy(JoinHandle<State>),
}
/// Internals of an open `AsyncWriter`, moved into blocking tasks and back.
struct Inner {
    // Cache root; used to compute the final content path on close.
    cache: PathBuf,
    // Accumulates the integrity hash of everything written.
    builder: IntegrityOpts,
    // Temporary file receiving the data until it is persisted on close.
    tmpfile: NamedTempFile,
    // Owned copy of the caller's bytes for the write currently in flight.
    buf: Vec<u8>,
    // Result of the most recently completed operation, if not yet consumed.
    last_op: Option<Operation>,
}
/// Outcome of a completed blocking operation, stored in `Inner::last_op`
/// until the matching poll method picks it up.
enum Operation {
    /// Result of a write: the number of bytes written, or the I/O error.
    Write(std::io::Result<usize>),
    /// Result of a flush.
    Flush(std::io::Result<()>),
}
impl AsyncWriter {
    /// Creates a writer for `cache` that hashes its input with `algo`.
    ///
    /// Ensures `<cache>/tmp` exists (created recursively) and creates a named
    /// temporary file inside it; the temp file is created on a blocking task.
    #[allow(clippy::new_ret_no_self)]
    #[allow(clippy::needless_lifetimes)]
    pub async fn new(cache: &Path, algo: Algorithm) -> Result<AsyncWriter, Error> {
        let cache_path = cache.to_path_buf();
        let mut tmp_path = cache_path.clone();
        tmp_path.push("tmp");
        afs::DirBuilder::new()
            .recursive(true)
            .create(&tmp_path)
            .await?;
        Ok(AsyncWriter(Mutex::new(State::Idle(Some(Inner {
            cache: cache_path,
            builder: IntegrityOpts::new().algorithm(algo),
            tmpfile: blocking(async move { NamedTempFile::new_in(tmp_path) }).await?,
            buf: vec![],
            last_op: None,
        })))))
    }

    /// Finishes the write: computes the integrity of everything written,
    /// moves the temporary file to its content path and returns the
    /// integrity value.
    ///
    /// Fails with an I/O error carrying "file closed" if the writer was
    /// already closed, and with "blocking task failed" if the result of the
    /// background task could not be received.
    pub async fn close(self) -> Result<Integrity, Error> {
        // The mutex guard is only held for the duration of a single poll.
        futures::future::poll_fn(|cx| {
            let state = &mut *self.0.lock().unwrap();
            loop {
                match state {
                    // Taking the internals leaves `Idle(None)`, i.e. closed.
                    State::Idle(opt) => match opt.take() {
                        None => return Poll::Ready(None),
                        Some(inner) => {
                            // One-shot channel carrying the task's result back to us.
                            let (s, r) = futures::channel::oneshot::channel();
                            let tmpfile = inner.tmpfile;
                            let sri = inner.builder.result();
                            let cpath = path::content_path(&inner.cache, &sri);
                            // Start the operation asynchronously.
                            *state = State::Busy(blocking(async move {
                                let res = afs::DirBuilder::new()
                                    .recursive(true)
                                    // Safe unwrap. cpath always has multiple segments
                                    .create(cpath.parent().unwrap())
                                    .await
                                    .map_err(Error::Io);
                                if res.is_err() {
                                    let _ = s.send(res.map(|_| sri));
                                } else {
                                    // Move the temp file into its final content location.
                                    let res = tmpfile.persist(cpath);
                                    let res = res.map_err(Error::PersistError);
                                    let _ = s.send(res.map(|_| sri));
                                }
                                State::Idle(None)
                            }));
                            // Hand the receiver out; the result is awaited below.
                            return Poll::Ready(Some(r));
                        }
                    },
                    // Poll the asynchronous operation the file is currently blocked on.
                    State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)),
                }
            }
        })
        // `None` means the writer had already been closed.
        .map(|opt| opt.ok_or_else(|| io_error("file closed")))
        .await?
        // A receive error means the sender was dropped without sending.
        .map_err(|_| Error::from(io_error("blocking task failed")))
        .await?
    }
}
impl AsyncWrite for AsyncWriter {
    /// Writes `buf` to the temporary file on a blocking task, feeding the
    /// same bytes to the integrity builder.
    ///
    /// The first poll copies `buf` into an owned buffer and starts the task;
    /// a later poll picks up the stored result and returns the byte count.
    /// Fails with an I/O error carrying "file closed" once the writer has
    /// been closed.
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::io::Result<usize>> {
        let state = &mut *self.0.lock().unwrap();
        loop {
            match state {
                State::Idle(opt) => {
                    // Grab a reference to the inner representation of the file or return an error
                    // if the file is closed.
                    let inner = opt.as_mut().ok_or_else(|| io_error("file closed"))?;
                    // Check if the operation has completed.
                    if let Some(Operation::Write(res)) = inner.last_op.take() {
                        let n = res?;
                        // If more data was written than is available in the buffer, let's retry
                        // the write operation.
                        if n <= buf.len() {
                            return Poll::Ready(Ok(n));
                        }
                    } else {
                        let mut inner = opt.take().unwrap();
                        // Replace the inner buffer's contents with a copy of the caller's data.
                        // `clear` + `extend_from_slice` leaves the buffer holding exactly `buf`
                        // without exposing uninitialised capacity through `set_len`.
                        inner.buf.clear();
                        inner.buf.extend_from_slice(buf);
                        // Start the operation asynchronously.
                        *state = State::Busy(blocking(async move {
                            inner.builder.input(&inner.buf);
                            let res = inner.tmpfile.write(&inner.buf);
                            inner.last_op = Some(Operation::Write(res));
                            State::Idle(Some(inner))
                        }));
                    }
                }
                // Poll the asynchronous operation the file is currently blocked on.
                State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)),
            }
        }
    }

    /// Flushes the temporary file on a blocking task. A closed writer
    /// reports success immediately.
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        let state = &mut *self.0.lock().unwrap();
        loop {
            match state {
                State::Idle(opt) => {
                    // Grab a reference to the inner representation of the file or return if the
                    // file is closed.
                    let inner = match opt.as_mut() {
                        None => return Poll::Ready(Ok(())),
                        Some(s) => s,
                    };
                    // Check if the operation has completed.
                    if let Some(Operation::Flush(res)) = inner.last_op.take() {
                        return Poll::Ready(res);
                    } else {
                        let mut inner = opt.take().unwrap();
                        // Start the operation asynchronously.
                        *state = State::Busy(blocking(async move {
                            let res = inner.tmpfile.flush();
                            inner.last_op = Some(Operation::Flush(res));
                            State::Idle(Some(inner))
                        }));
                    }
                }
                // Poll the asynchronous operation the file is currently blocked on.
                State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)),
            }
        }
    }

    /// Closes the writer by dropping its internals on a blocking task.
    /// Nothing is persisted here; use `close()` to commit the content.
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
        let state = &mut *self.0.lock().unwrap();
        loop {
            match state {
                State::Idle(opt) => {
                    // Grab a reference to the inner representation of the file or return if the
                    // file is closed.
                    let inner = match opt.take() {
                        None => return Poll::Ready(Ok(())),
                        Some(s) => s,
                    };
                    // Start the operation asynchronously.
                    *state = State::Busy(blocking(async move {
                        drop(inner);
                        State::Idle(None)
                    }));
                }
                // Poll the asynchronous operation the file is currently blocked on.
                State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)),
            }
        }
    }
}
/// Builds an `std::io::Error` of kind `Other` wrapping `err`.
fn io_error(err: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> std::io::Error {
    use std::io::{Error, ErrorKind};
    Error::new(ErrorKind::Other, err)
}
#[cfg(test)]
mod tests {
use super::*;
use async_std::task;
use tempfile;
#[test]
fn basic_write() {
@ -71,4 +285,20 @@ mod tests {
b"hello world"
);
}
/// Writing through `AsyncWriter` and closing it yields the expected
/// integrity and leaves the data at the content path.
#[test]
fn basic_async_write() {
    let tmp = tempfile::tempdir().unwrap();
    let dir = tmp.path().to_owned();
    task::block_on(async {
        let mut writer = AsyncWriter::new(&dir, Algorithm::Sha256).await.unwrap();
        writer.write_all(b"hello world").await.unwrap();
        let sri = writer.close().await.unwrap();
        let expected = Integrity::from(b"hello world");
        assert_eq!(sri.to_string(), expected.to_string());
        let stored = std::fs::read(path::content_path(&dir, &sri)).unwrap();
        assert_eq!(stored, b"hello world");
    });
}
}

View File

@ -9,10 +9,13 @@ use std::io::{ErrorKind, Write};
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};
use async_std::fs as afs;
use async_std::task::blocking;
#[cfg(unix)]
use chownr;
use digest::Digest;
use either::{Left, Right};
use futures::io::AsyncWriteExt;
use hex;
use mkdirp;
use serde_derive::{Deserialize, Serialize};
@ -85,7 +88,56 @@ pub fn insert(cache: &Path, key: &str, opts: PutOpts) -> Result<Integrity, Error
let mut buck = OpenOptions::new().create(true).append(true).open(&bucket)?;
write!(buck, "\n{}\t{}", hash_entry(&stringified), stringified)?;
let out = format!("\n{}\t{}", hash_entry(&stringified), stringified);
buck.write_all(out.as_bytes())?;
buck.flush()?;
#[cfg(unix)]
chownr::chownr(&bucket, opts.uid, opts.gid)?;
Ok(opts
.sri
.or_else(|| "sha1-deadbeef".parse::<Integrity>().ok())
.unwrap())
}
pub async fn insert_async<'a>(
cache: &'a Path,
key: &'a str,
opts: PutOpts,
) -> Result<Integrity, Error> {
let bucket = bucket_path(&cache, &key);
let tmpbucket = bucket.clone();
#[cfg(unix)]
let PutOpts { uid, gid, .. } = opts;
blocking(async move {
let parent = tmpbucket.parent().unwrap();
#[cfg(unix)]
{
if let Some(path) = mkdirp::mkdirp(parent)? {
chownr::chownr(&path, uid, gid)?;
}
}
#[cfg(windows)]
mkdirp::mkdirp(parent)?;
Ok::<(), Error>(())
})
.await?;
let stringified = serde_json::to_string(&SerializableEntry {
key: key.to_owned(),
integrity: opts.sri.clone().map(|x| x.to_string()),
time: opts.time.unwrap_or_else(now),
size: opts.size.unwrap_or(0),
metadata: opts.metadata.unwrap_or_else(|| json!(null)),
})?;
let mut buck = async_std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&bucket)
.await?;
let out = format!("\n{}\t{}", hash_entry(&stringified), stringified);
buck.write_all(out.as_bytes()).await?;
buck.flush().await?;
#[cfg(unix)]
chownr::chownr(&bucket, opts.uid, opts.gid)?;
Ok(opts
@ -121,6 +173,34 @@ pub fn find(cache: &Path, key: &str) -> Result<Option<Entry>, Error> {
}))
}
pub async fn find_async(cache: &Path, key: &str) -> Result<Option<Entry>, Error> {
let bucket = bucket_path(cache, &key);
Ok(bucket_entries_async(&bucket)
.await?
.into_iter()
.fold(None, |acc, entry| {
if entry.key == key {
if let Some(integrity) = entry.integrity {
let integrity: Integrity = match integrity.parse() {
Ok(sri) => sri,
_ => return acc,
};
Some(Entry {
key: entry.key,
integrity,
size: entry.size,
time: entry.time,
metadata: entry.metadata,
})
} else {
None
}
} else {
acc
}
}))
}
pub fn delete(cache: &Path, key: &str) -> Result<(), Error> {
insert(
cache,
@ -140,6 +220,25 @@ pub fn delete(cache: &Path, key: &str) -> Result<(), Error> {
.map(|_| ())
}
pub async fn delete_async(cache: &Path, key: &str) -> Result<(), Error> {
insert(
cache,
key,
PutOpts {
algorithm: None,
size: None,
sri: None,
time: None,
metadata: None,
#[cfg(unix)]
uid: None,
#[cfg(unix)]
gid: None,
},
)
.map(|_| ())
}
pub fn ls(cache: &Path) -> impl Iterator<Item = Result<Entry, Error>> {
WalkDir::new(cache.join(format!("index-v{}", INDEX_VERSION)))
.into_iter()
@ -228,9 +327,41 @@ fn bucket_entries(bucket: &Path) -> Result<Vec<SerializableEntry>, Error> {
})
}
/// Asynchronously reads every valid entry from the bucket file at `bucket`.
///
/// A missing bucket file yields an empty list. Lines that cannot be read,
/// are not a `hash<TAB>entry` pair, whose hash does not match the entry, or
/// whose entry fails to deserialize are skipped.
async fn bucket_entries_async(bucket: &Path) -> Result<Vec<SerializableEntry>, Error> {
    use async_std::io::BufReader;
    use futures::io::AsyncBufReadExt;
    use futures::stream::StreamExt;
    let file = match afs::File::open(bucket).await {
        Ok(file) => file,
        Err(err) if err.kind() == ErrorKind::NotFound => return Ok(Vec::new()),
        Err(err) => return Err(err.into()),
    };
    let mut entries = Vec::new();
    let mut lines = BufReader::new(file).lines();
    while let Some(line) = lines.next().await {
        let line = match line {
            Ok(text) => text,
            Err(_) => continue,
        };
        let fields = line.split('\t').collect::<Vec<&str>>();
        let payload = match fields[..] {
            [hash, payload] if hash_entry(payload) == hash => payload,
            // Something's wrong with the entry. Skip this line.
            _ => continue,
        };
        if let Ok(parsed) = serde_json::from_str::<SerializableEntry>(payload) {
            entries.push(parsed);
        }
    }
    Ok(entries)
}
#[cfg(test)]
mod tests {
use super::*;
use async_std::task;
use tempfile;
const MOCK_ENTRY: &str = "\n251d18a2b33264ea8655695fd23c88bd874cdea2c3dc9d8f9b7596717ad30fec\t{\"key\":\"hello\",\"integrity\":\"sha1-deadbeef\",\"time\":1234567,\"size\":0,\"metadata\":null}";
@ -247,6 +378,20 @@ mod tests {
assert_eq!(entry, MOCK_ENTRY);
}
/// `insert_async` writes the expected line to the bucket file.
#[test]
fn insert_async_basic() {
    let tmp = tempfile::tempdir().unwrap();
    let dir = tmp.path().to_owned();
    let sri: Integrity = "sha1-deadbeef".parse().unwrap();
    let time = 1_234_567;
    let opts = PutOpts::new().integrity(sri).time(time);
    task::block_on(insert_async(&dir, "hello", opts)).unwrap();
    let bucket = bucket_path(&dir, "hello");
    let entry = std::fs::read_to_string(bucket).unwrap();
    assert_eq!(entry, MOCK_ENTRY);
}
#[test]
fn find_basic() {
let tmp = tempfile::tempdir().unwrap();
@ -288,6 +433,64 @@ mod tests {
assert_eq!(find(&dir, "hello").unwrap(), None);
}
/// After `delete_async`, a lookup for the key finds nothing.
#[test]
fn delete_async_basic() {
    let tmp = tempfile::tempdir().unwrap();
    let dir = tmp.path().to_owned();
    let sri: Integrity = "sha1-deadbeef".parse().unwrap();
    let time = 1_234_567;
    let opts = PutOpts::new().integrity(sri).time(time);
    insert(&dir, "hello", opts).unwrap();
    task::block_on(delete_async(&dir, "hello")).unwrap();
    let lookup = find(&dir, "hello").unwrap();
    assert_eq!(lookup, None);
}
/// An entry written with `insert` is returned unchanged by `find`.
#[test]
fn round_trip() {
    let tmp = tempfile::tempdir().unwrap();
    let dir = tmp.path().to_owned();
    let sri: Integrity = "sha1-deadbeef".parse().unwrap();
    let time = 1_234_567;
    let opts = PutOpts::new().integrity(sri.clone()).time(time);
    insert(&dir, "hello", opts).unwrap();
    let expected = Entry {
        key: String::from("hello"),
        integrity: sri,
        time,
        size: 0,
        metadata: json!(null),
    };
    let entry = find(&dir, "hello").unwrap().unwrap();
    assert_eq!(entry, expected);
}
/// An entry written with `insert_async` is returned unchanged by
/// `find_async`.
#[test]
fn round_trip_async() {
    let tmp = tempfile::tempdir().unwrap();
    let dir = tmp.path().to_owned();
    let sri: Integrity = "sha1-deadbeef".parse().unwrap();
    let time = 1_234_567;
    let opts = PutOpts::new().integrity(sri.clone()).time(time);
    task::block_on(insert_async(&dir, "hello", opts)).unwrap();
    let expected = Entry {
        key: String::from("hello"),
        integrity: sri,
        time,
        size: 0,
        metadata: json!(null),
    };
    let entry = task::block_on(find_async(&dir, "hello"))
        .unwrap()
        .unwrap();
    assert_eq!(entry, expected);
}
#[test]
fn ls_basic() {
let tmp = tempfile::tempdir().unwrap();

View File

@ -20,5 +20,9 @@ pub mod ls;
pub mod put;
pub mod rm;
pub mod async_get;
pub mod async_put;
pub mod async_rm;
pub use errors::Error;
pub use index::Entry;

View File

@ -26,10 +26,11 @@ where
.algorithm(Algorithm::Sha256)
.open(cache.as_ref(), key.as_ref())?;
writer.write_all(data.as_ref())?;
writer.flush()?;
writer.commit()
}
/// Options and flags for opening a new cache file to write data into.
/// Builder for options and flags for opening a new cache file to write data into.
#[derive(Clone, Default)]
pub struct PutOpts {
pub(crate) algorithm: Option<Algorithm>,
@ -135,7 +136,7 @@ impl Put {
/// verifies data against `size` and `integrity` options, if provided.
/// Must be called manually in order to complete the writing process,
/// otherwise everything will be thrown out.
pub fn commit(self) -> Result<Integrity, Error> {
pub fn commit(mut self) -> Result<Integrity, Error> {
let writer_sri = self.writer.close()?;
if let Some(sri) = &self.opts.sri {
// TODO - ssri should have a .matches method
@ -148,13 +149,29 @@ impl Put {
if matched.is_none() {
return Err(Error::IntegrityError);
}
} else {
self.opts.sri = Some(writer_sri);
}
if let Some(size) = self.opts.size {
if size != self.written {
return Err(Error::SizeError);
}
}
index::insert(&self.cache, &self.key, self.opts)?;
Ok(writer_sri)
index::insert(&self.cache, &self.key, self.opts)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::get;

    /// Data written through `data` can be read back by key.
    #[test]
    fn round_trip() {
        let tmp = tempfile::tempdir().unwrap();
        let dir = tmp.path().to_owned();
        data(&dir, "hello", b"hello").unwrap();
        let read_back = get::read(&dir, "hello");
        let data = read_back.unwrap();
        assert_eq!(data, b"hello");
    }
}