feature(async): Add tokio as an executor option (#36)

This PR is based on the work @alexschrod did in PR #29. All
I did was carry it over the finish line.

This PR adds a feature to the crate named `tokio-runtime`. If
you disable default features and enable this new one, cacache
uses tokio as its async executor. This makes integrating cacache
with tokio-using projects easier, because the file types leak out
if you use anything more than the top-level convenience functions.

The PR implements the feature using shims in a new submodule named
`async_lib`. This module conditionally uses either async-std
or tokio based on feature selection, and hides some differences with
convenience functions.

This change should not be a breaking change, because the default is
still async-std.

There are a few other small changes in this PR worth noting.

- The README shows how to switch runtimes.
- There's a justfile to run common tasks, including those in makefile.toml.
  The default shell is `sh`, so this might not work out of the box for
  Windows users.
- The tests can now run under either runtime. The justfile has a recipe
  that runs them both.
- The benchmarks can also run under either runtime. The justfile has two
  recipe for this, one using bench and the other using criterion's runner.
- The dependencies now pull in async-attributes by default along with
  async-std. This made it easier to swap runtimes in the tests.
- All dependency versions have been bumped.

Co-authored-by: @alexschrod
This commit is contained in:
C J Silverio 2023-01-28 13:01:59 -08:00 committed by GitHub
parent 6d84ff0aed
commit e34dcfdc25
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 407 additions and 114 deletions

View File

@ -8,32 +8,41 @@ license = "Apache-2.0"
repository = "https://github.com/zkat/cacache-rs" repository = "https://github.com/zkat/cacache-rs"
homepage = "https://github.com/zkat/cacache-rs" homepage = "https://github.com/zkat/cacache-rs"
readme = "README.md" readme = "README.md"
categories = [ categories = ["caching", "filesystem"]
"caching",
"filesystem"
]
[dependencies] [dependencies]
ssri = "7.0.0" async-attributes = { version = "1.1.2", optional = true }
async-std = { version = "1.10.0", features = ["unstable"], optional = true }
digest = "0.10.6"
either = "1.6.1"
futures = "0.3.17"
hex = "0.4.3" hex = "0.4.3"
tempfile = "3.2.0" memmap2 = "0.5.8"
sha-1 = "0.9.8"
sha2 = "0.9.8"
digest = "0.9.0"
serde_json = "1.0.68"
serde = "1.0.130" serde = "1.0.130"
serde_derive = "1.0.130" serde_derive = "1.0.130"
walkdir = "2.3.2" serde_json = "1.0.68"
either = "1.6.1" sha1 = "0.10.5"
async-std = { version = "1.10.0", features = ["unstable"] } sha2 = "0.10.6"
ssri = "7.0.0"
tempfile = "3.2.0"
thiserror = "1.0.29" thiserror = "1.0.29"
futures = "0.3.17" tokio = { version = "1.12.0", features = [
memmap2 = "0.5" "fs",
"io-util",
"macros",
"rt",
"rt-multi-thread",
], optional = true }
tokio-stream = { version = "0.1.7", features = ["io-util"], optional = true }
walkdir = "2.3.2"
[dev-dependencies] [dev-dependencies]
async-attributes = "1.1.2" criterion = "0.4.0"
criterion = "0.3.5"
[[bench]] [[bench]]
name = "benchmarks" name = "benchmarks"
harness = false harness = false
[features]
default = ["async-std", "async-attributes"]
tokio-runtime = ["tokio", "tokio-stream"]

View File

@ -38,7 +38,7 @@ Minimum supported Rust version is `1.43.0`.
## Features ## Features
- First-class async support, using [`async-std`](https://crates.io/crates/async-std) as its runtime. Sync APIs are available but secondary - First-class async support, using either [`async-std`](https://crates.io/crates/async-std) or [`tokio`](https://crates.io/crates/tokio) as its runtime. Sync APIs are available but secondary
- `std::fs`-style API - `std::fs`-style API
- Extraction by key or by content address (shasum, etc) - Extraction by key or by content address (shasum, etc)
- [Subresource Integrity](#integrity) web standard support - [Subresource Integrity](#integrity) web standard support
@ -55,6 +55,13 @@ Minimum supported Rust version is `1.43.0`.
- Cross-platform: Windows and case-(in)sensitive filesystem support - Cross-platform: Windows and case-(in)sensitive filesystem support
- Punches nazis - Punches nazis
`async-std` is the default async runtime. To use `tokio` instead, turn off default features and enable the `tokio-runtime` feature, like this:
```toml
[dependencies]
cacache = { version = "*", default-features = false, features = ["tokio-runtime"] }
```
## Contributing ## Contributing
The cacache team enthusiastically welcomes contributions and project participation! There's a bunch of things you can do if you want to contribute! The [Contributor Guide](CONTRIBUTING.md) has all the information you need for everything from reporting bugs to contributing entire new features. Please don't hesitate to jump in if you'd like to, or even ask us questions if something isn't clear. The cacache team enthusiastically welcomes contributions and project participation! There's a bunch of things you can do if you want to contribute! The [Contributor Guide](CONTRIBUTING.md) has all the information you need for everything from reporting bugs to contributing entire new features. Please don't hesitate to jump in if you'd like to, or even ask us questions if something isn't clear.

View File

@ -1,4 +1,19 @@
use async_std::{fs as afs, task}; #[cfg(feature = "async-std")]
use async_std::fs as afs;
#[cfg(all(test, feature = "tokio"))]
use tokio::fs as afs;
#[cfg(all(test, feature = "async-std"))]
pub use async_std::task::block_on;
#[cfg(all(test, feature = "tokio"))]
#[inline]
pub fn block_on<F, T>(future: F) -> T
where
F: std::future::Future<Output = T>,
{
tokio::runtime::Runtime::new().unwrap().block_on(future)
}
use std::fs::{self, File}; use std::fs::{self, File};
use std::io::prelude::*; use std::io::prelude::*;
@ -47,7 +62,7 @@ fn baseline_read_async(c: &mut Criterion) {
fd.write_all(data).unwrap(); fd.write_all(data).unwrap();
drop(fd); drop(fd);
c.bench_function("baseline_read_async", move |b| { c.bench_function("baseline_read_async", move |b| {
b.iter(|| task::block_on(afs::read(&path))) b.iter(|| block_on(afs::read(&path)))
}); });
} }
@ -66,7 +81,7 @@ fn baseline_read_many_async(c: &mut Criterion) {
c.bench_function("baseline_read_many_async", move |b| { c.bench_function("baseline_read_many_async", move |b| {
b.iter(|| { b.iter(|| {
let tasks = paths.iter().map(|path| afs::read(black_box(path))); let tasks = paths.iter().map(|path| afs::read(black_box(path)));
task::block_on(futures::future::join_all(tasks)); block_on(futures::future::join_all(tasks));
}) })
}); });
} }
@ -137,7 +152,7 @@ fn read_hash_many_async(c: &mut Criterion) {
let tasks = sris let tasks = sris
.iter() .iter()
.map(|sri| cacache::read_hash(black_box(&cache), black_box(sri))); .map(|sri| cacache::read_hash(black_box(&cache), black_box(sri)));
task::block_on(futures::future::join_all(tasks)); block_on(futures::future::join_all(tasks));
}) })
}); });
} }
@ -148,7 +163,7 @@ fn read_hash_async(c: &mut Criterion) {
let data = b"hello world".to_vec(); let data = b"hello world".to_vec();
let sri = cacache::write_sync(&cache, "hello", data).unwrap(); let sri = cacache::write_sync(&cache, "hello", data).unwrap();
c.bench_function("get::data_hash", move |b| { c.bench_function("get::data_hash", move |b| {
b.iter(|| task::block_on(cacache::read_hash(black_box(&cache), black_box(&sri))).unwrap()) b.iter(|| block_on(cacache::read_hash(black_box(&cache), black_box(&sri))).unwrap())
}); });
} }
@ -158,7 +173,7 @@ fn read_async(c: &mut Criterion) {
let data = b"hello world".to_vec(); let data = b"hello world".to_vec();
cacache::write_sync(&cache, "hello", data).unwrap(); cacache::write_sync(&cache, "hello", data).unwrap();
c.bench_function("get::data", move |b| { c.bench_function("get::data", move |b| {
b.iter(|| task::block_on(cacache::read(black_box(&cache), black_box("hello"))).unwrap()) b.iter(|| block_on(cacache::read(black_box(&cache), black_box("hello"))).unwrap())
}); });
} }
@ -168,7 +183,7 @@ fn read_hash_async_big_data(c: &mut Criterion) {
let data = vec![1; 1024 * 1024 * 5]; let data = vec![1; 1024 * 1024 * 5];
let sri = cacache::write_sync(&cache, "hello", data).unwrap(); let sri = cacache::write_sync(&cache, "hello", data).unwrap();
c.bench_function("get::data_big_data", move |b| { c.bench_function("get::data_big_data", move |b| {
b.iter(|| task::block_on(cacache::read_hash(black_box(&cache), black_box(&sri))).unwrap()) b.iter(|| block_on(cacache::read_hash(black_box(&cache), black_box(&sri))).unwrap())
}); });
} }
@ -179,7 +194,7 @@ fn write_hash_async(c: &mut Criterion) {
b.iter_custom(|iters| { b.iter_custom(|iters| {
let start = std::time::Instant::now(); let start = std::time::Instant::now();
for i in 0..iters { for i in 0..iters {
task::block_on(cacache::write_hash(&cache, format!("hello world{}", i))).unwrap(); block_on(cacache::write_hash(&cache, format!("hello world{}", i))).unwrap();
} }
start.elapsed() start.elapsed()
}) })

44
justfile Normal file
View File

@ -0,0 +1,44 @@
# List available just recipes
@help:
just -l
# Run tests on both runtimes with cargo nextest
@test:
echo "----------\nasync-std:\n"
cargo nextest run
echo "\n----------\ntokio:\n"
cargo nextest run --no-default-features --features tokio-runtime
# Run benchmarks with `cargo bench`
@bench:
echo "----------\nasync-std:\n"
cargo bench
echo "\n----------\ntokio:\n"
cargo bench --no-default-features --features tokio-runtime
# Run benchmarks with `cargo criterion`
@criterion:
echo "----------\nasync-std:\n"
cargo criterion
echo "\n----------\ntokio:\n"
cargo criterion --no-default-features --features tokio-runtime
# Generate a changelog with git-cliff
changelog TAG:
git-cliff --prepend CHANGELOG.md -u --tag {{TAG}}
# Prepare a release
release *args:
cargo release --workspace {{args}}
# Install workspace tools
@install-tools:
cargo install cargo-nextest
cargo install cargo-release
cargo install git-cliff
cargo install cargo-criterion
# Lint and automatically fix what we can fix
@lint:
cargo clippy --fix --allow-dirty --allow-staged
cargo fmt

125
src/async_lib.rs Normal file
View File

@ -0,0 +1,125 @@
#[cfg(feature = "async-std")]
pub use async_std::fs::File;
#[cfg(feature = "tokio")]
pub use tokio::fs::File;
#[cfg(feature = "async-std")]
pub use futures::io::AsyncRead;
#[cfg(feature = "tokio")]
pub use tokio::io::AsyncRead;
#[cfg(feature = "async-std")]
pub use futures::io::AsyncReadExt;
#[cfg(feature = "tokio")]
pub use tokio::io::AsyncReadExt;
#[cfg(feature = "async-std")]
pub use futures::io::AsyncBufReadExt;
#[cfg(feature = "tokio")]
pub use tokio::io::AsyncBufReadExt;
#[cfg(feature = "async-std")]
pub use futures::io::AsyncWrite;
#[cfg(feature = "tokio")]
pub use tokio::io::AsyncWrite;
#[cfg(feature = "async-std")]
pub use futures::io::AsyncWriteExt;
#[cfg(feature = "tokio")]
pub use tokio::io::AsyncWriteExt;
#[cfg(feature = "async-std")]
pub use async_std::fs::read;
#[cfg(feature = "tokio")]
pub use tokio::fs::read;
#[cfg(feature = "async-std")]
pub use async_std::fs::copy;
#[cfg(feature = "tokio")]
pub use tokio::fs::copy;
#[cfg(feature = "async-std")]
pub use async_std::fs::metadata;
#[cfg(feature = "tokio")]
pub use tokio::fs::metadata;
#[cfg(feature = "async-std")]
pub use async_std::fs::remove_file;
#[cfg(feature = "tokio")]
pub use tokio::fs::remove_file;
#[cfg(feature = "async-std")]
pub use async_std::fs::create_dir_all;
#[cfg(feature = "tokio")]
pub use tokio::fs::create_dir_all;
#[cfg(feature = "async-std")]
pub use async_std::fs::remove_dir_all;
#[cfg(feature = "tokio")]
pub use tokio::fs::remove_dir_all;
#[cfg(feature = "async-std")]
pub use async_std::fs::DirBuilder;
#[cfg(feature = "tokio")]
pub use tokio::fs::DirBuilder;
#[cfg(feature = "async-std")]
pub use async_std::fs::OpenOptions;
#[cfg(feature = "tokio")]
pub use tokio::fs::OpenOptions;
#[cfg(feature = "async-std")]
pub use async_std::io::BufReader;
#[cfg(feature = "tokio")]
pub use tokio::io::BufReader;
#[cfg(feature = "async-std")]
#[inline]
pub fn lines_to_stream<R>(lines: futures::io::Lines<R>) -> futures::io::Lines<R> {
lines
}
#[cfg(feature = "tokio")]
#[inline]
pub fn lines_to_stream<R>(lines: tokio::io::Lines<R>) -> tokio_stream::wrappers::LinesStream<R> {
tokio_stream::wrappers::LinesStream::new(lines)
}
#[cfg(feature = "async-std")]
pub use async_std::task::spawn_blocking;
#[cfg(feature = "tokio")]
pub use tokio::task::spawn_blocking;
#[cfg(feature = "async-std")]
pub use async_std::task::JoinHandle;
#[cfg(feature = "async-std")]
#[inline]
pub fn unwrap_joinhandle_value<T>(value: T) -> T {
value
}
#[cfg(feature = "tokio")]
pub use tokio::task::JoinHandle;
#[cfg(feature = "tokio")]
#[inline]
pub fn unwrap_joinhandle_value<T>(value: Result<T, tokio::task::JoinError>) -> T {
value.unwrap()
}
use crate::errors::{Internal, InternalResult};
use tempfile::NamedTempFile;
#[cfg(feature = "async-std")]
#[inline]
pub async fn create_named_tempfile(tmp_path: std::path::PathBuf) -> InternalResult<NamedTempFile> {
spawn_blocking(|| NamedTempFile::new_in(tmp_path))
.await
.to_internal()
}
#[cfg(feature = "tokio")]
#[inline]
pub async fn create_named_tempfile(tmp_path: std::path::PathBuf) -> InternalResult<NamedTempFile> {
let tmpfile = spawn_blocking(|| NamedTempFile::new_in(tmp_path))
.await
.to_internal()?;
tmpfile.to_internal()
}

View File

@ -3,10 +3,9 @@ use std::path::Path;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use async_std;
use futures::prelude::*;
use ssri::{Algorithm, Integrity, IntegrityChecker}; use ssri::{Algorithm, Integrity, IntegrityChecker};
use crate::async_lib::AsyncRead;
use crate::content::path; use crate::content::path;
use crate::errors::{Internal, Result}; use crate::errors::{Internal, Result};
@ -30,11 +29,12 @@ impl Reader {
} }
pub struct AsyncReader { pub struct AsyncReader {
fd: async_std::fs::File, fd: crate::async_lib::File,
checker: IntegrityChecker, checker: IntegrityChecker,
} }
impl AsyncRead for AsyncReader { impl AsyncRead for AsyncReader {
#[cfg(feature = "async-std")]
fn poll_read( fn poll_read(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut Context<'_>, cx: &mut Context<'_>,
@ -44,6 +44,22 @@ impl AsyncRead for AsyncReader {
self.checker.input(&buf[..amt]); self.checker.input(&buf[..amt]);
Poll::Ready(Ok(amt)) Poll::Ready(Ok(amt))
} }
#[cfg(feature = "tokio")]
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<tokio::io::Result<()>> {
let pre_len = buf.filled().len();
futures::ready!(Pin::new(&mut self.fd).poll_read(cx, buf))?;
let post_len = buf.filled().len();
if post_len - pre_len == 0 {
return Poll::Ready(Ok(()));
}
self.checker.input(&buf.filled()[pre_len..]);
Poll::Ready(Ok(()))
}
} }
impl AsyncReader { impl AsyncReader {
@ -63,7 +79,7 @@ pub fn open(cache: &Path, sri: Integrity) -> Result<Reader> {
pub async fn open_async(cache: &Path, sri: Integrity) -> Result<AsyncReader> { pub async fn open_async(cache: &Path, sri: Integrity) -> Result<AsyncReader> {
let cpath = path::content_path(cache, &sri); let cpath = path::content_path(cache, &sri);
Ok(AsyncReader { Ok(AsyncReader {
fd: async_std::fs::File::open(cpath).await.to_internal()?, fd: crate::async_lib::File::open(cpath).await.to_internal()?,
checker: IntegrityChecker::new(sri), checker: IntegrityChecker::new(sri),
}) })
} }
@ -77,7 +93,7 @@ pub fn read(cache: &Path, sri: &Integrity) -> Result<Vec<u8>> {
pub async fn read_async<'a>(cache: &'a Path, sri: &'a Integrity) -> Result<Vec<u8>> { pub async fn read_async<'a>(cache: &'a Path, sri: &'a Integrity) -> Result<Vec<u8>> {
let cpath = path::content_path(cache, sri); let cpath = path::content_path(cache, sri);
let ret = async_std::fs::read(&cpath).await.to_internal()?; let ret = crate::async_lib::read(&cpath).await.to_internal()?;
sri.check(&ret)?; sri.check(&ret)?;
Ok(ret) Ok(ret)
} }
@ -92,8 +108,8 @@ pub fn copy(cache: &Path, sri: &Integrity, to: &Path) -> Result<u64> {
pub async fn copy_async<'a>(cache: &'a Path, sri: &'a Integrity, to: &'a Path) -> Result<u64> { pub async fn copy_async<'a>(cache: &'a Path, sri: &'a Integrity, to: &'a Path) -> Result<u64> {
let cpath = path::content_path(cache, sri); let cpath = path::content_path(cache, sri);
let ret = async_std::fs::copy(&cpath, to).await.to_internal()?; let ret = crate::async_lib::copy(&cpath, to).await.to_internal()?;
let data = async_std::fs::read(cpath).await.to_internal()?; let data = crate::async_lib::read(cpath).await.to_internal()?;
sri.check(data)?; sri.check(data)?;
Ok(ret) Ok(ret)
} }
@ -107,7 +123,7 @@ pub fn has_content(cache: &Path, sri: &Integrity) -> Option<Integrity> {
} }
pub async fn has_content_async(cache: &Path, sri: &Integrity) -> Option<Integrity> { pub async fn has_content_async(cache: &Path, sri: &Integrity) -> Option<Integrity> {
if async_std::fs::metadata(path::content_path(cache, sri)) if crate::async_lib::metadata(path::content_path(cache, sri))
.await .await
.is_ok() .is_ok()
{ {

View File

@ -1,7 +1,6 @@
use std::fs; use std::fs;
use std::path::Path; use std::path::Path;
use async_std::fs as afs;
use ssri::Integrity; use ssri::Integrity;
use crate::content::path; use crate::content::path;
@ -13,7 +12,7 @@ pub fn rm(cache: &Path, sri: &Integrity) -> Result<()> {
} }
pub async fn rm_async(cache: &Path, sri: &Integrity) -> Result<()> { pub async fn rm_async(cache: &Path, sri: &Integrity) -> Result<()> {
afs::remove_file(path::content_path(cache, sri)) crate::async_lib::remove_file(path::content_path(cache, sri))
.await .await
.to_internal()?; .to_internal()?;
Ok(()) Ok(())

View File

@ -3,16 +3,14 @@ use std::io::prelude::*;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::pin::Pin; use std::pin::Pin;
use std::sync::Mutex; use std::sync::Mutex;
use std::task::{Context, Poll};
use async_std::fs as afs;
use async_std::future::Future;
use async_std::task::{self, Context, JoinHandle, Poll};
use futures::io::AsyncWrite;
use futures::prelude::*; use futures::prelude::*;
use memmap2::MmapMut; use memmap2::MmapMut;
use ssri::{Algorithm, Integrity, IntegrityOpts}; use ssri::{Algorithm, Integrity, IntegrityOpts};
use tempfile::NamedTempFile; use tempfile::NamedTempFile;
use crate::async_lib::{AsyncWrite, JoinHandle};
use crate::content::path; use crate::content::path;
use crate::errors::{Internal, Result}; use crate::errors::{Internal, Result};
@ -116,14 +114,12 @@ impl AsyncWriter {
let cache_path = cache.to_path_buf(); let cache_path = cache.to_path_buf();
let mut tmp_path = cache_path.clone(); let mut tmp_path = cache_path.clone();
tmp_path.push("tmp"); tmp_path.push("tmp");
afs::DirBuilder::new() crate::async_lib::DirBuilder::new()
.recursive(true) .recursive(true)
.create(&tmp_path) .create(&tmp_path)
.await .await
.to_internal()?; .to_internal()?;
let mut tmpfile = task::spawn_blocking(|| NamedTempFile::new_in(tmp_path)) let mut tmpfile = crate::async_lib::create_named_tempfile(tmp_path).await?;
.await
.to_internal()?;
let mmap = if let Some(size) = size { let mmap = if let Some(size) = size {
if size <= MAX_MMAP_SIZE { if size <= MAX_MMAP_SIZE {
tmpfile.as_file_mut().set_len(size as u64).to_internal()?; tmpfile.as_file_mut().set_len(size as u64).to_internal()?;
@ -162,7 +158,7 @@ impl AsyncWriter {
let cpath = path::content_path(&inner.cache, &sri); let cpath = path::content_path(&inner.cache, &sri);
// Start the operation asynchronously. // Start the operation asynchronously.
*state = State::Busy(task::spawn_blocking(|| { *state = State::Busy(crate::async_lib::spawn_blocking(|| {
let res = std::fs::DirBuilder::new() let res = std::fs::DirBuilder::new()
.recursive(true) .recursive(true)
// Safe unwrap. cpath always has multiple segments // Safe unwrap. cpath always has multiple segments
@ -204,7 +200,11 @@ impl AsyncWriter {
} }
}, },
// Poll the asynchronous operation the file is currently blocked on. // Poll the asynchronous operation the file is currently blocked on.
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), State::Busy(task) => {
*state = crate::async_lib::unwrap_joinhandle_value(futures::ready!(
Pin::new(task).poll(cx)
))
}
} }
} }
}) })
@ -255,7 +255,7 @@ impl AsyncWrite for AsyncWriter {
inner.buf[..buf.len()].copy_from_slice(buf); inner.buf[..buf.len()].copy_from_slice(buf);
// Start the operation asynchronously. // Start the operation asynchronously.
*state = State::Busy(task::spawn_blocking(|| { *state = State::Busy(crate::async_lib::spawn_blocking(|| {
inner.builder.input(&inner.buf); inner.builder.input(&inner.buf);
if let Some(mmap) = &mut inner.mmap { if let Some(mmap) = &mut inner.mmap {
mmap.copy_from_slice(&inner.buf); mmap.copy_from_slice(&inner.buf);
@ -270,7 +270,12 @@ impl AsyncWrite for AsyncWriter {
} }
} }
// Poll the asynchronous operation the file is currently blocked on. // Poll the asynchronous operation the file is currently blocked on.
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), State::Busy(task) => {
*state = crate::async_lib::unwrap_joinhandle_value(futures::ready!(Pin::new(
task
)
.poll(cx)))
}
} }
} }
} }
@ -302,7 +307,7 @@ impl AsyncWrite for AsyncWriter {
} }
// Start the operation asynchronously. // Start the operation asynchronously.
*state = State::Busy(task::spawn_blocking(|| { *state = State::Busy(crate::async_lib::spawn_blocking(|| {
let res = inner.tmpfile.flush(); let res = inner.tmpfile.flush();
inner.last_op = Some(Operation::Flush(res)); inner.last_op = Some(Operation::Flush(res));
State::Idle(Some(inner)) State::Idle(Some(inner))
@ -310,12 +315,33 @@ impl AsyncWrite for AsyncWriter {
} }
} }
// Poll the asynchronous operation the file is currently blocked on. // Poll the asynchronous operation the file is currently blocked on.
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), State::Busy(task) => {
*state = crate::async_lib::unwrap_joinhandle_value(futures::ready!(Pin::new(
task
)
.poll(cx)))
}
} }
} }
} }
#[cfg(feature = "async-std")]
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> { fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
self.poll_close_impl(cx)
}
#[cfg(feature = "tokio")]
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
self.poll_close_impl(cx)
}
}
impl AsyncWriter {
#[inline]
fn poll_close_impl(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<std::io::Result<()>> {
let state = &mut *self.0.lock().unwrap(); let state = &mut *self.0.lock().unwrap();
loop { loop {
@ -329,13 +355,18 @@ impl AsyncWrite for AsyncWriter {
}; };
// Start the operation asynchronously. // Start the operation asynchronously.
*state = State::Busy(task::spawn_blocking(|| { *state = State::Busy(crate::async_lib::spawn_blocking(|| {
drop(inner); drop(inner);
State::Idle(None) State::Idle(None)
})); }));
} }
// Poll the asynchronous operation the file is currently blocked on. // Poll the asynchronous operation the file is currently blocked on.
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), State::Busy(task) => {
*state = crate::async_lib::unwrap_joinhandle_value(futures::ready!(Pin::new(
task
)
.poll(cx)))
}
} }
} }
} }
@ -348,8 +379,14 @@ fn io_error(err: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> std::io
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use async_std::task; use crate::async_lib::AsyncWriteExt;
use tempfile; use tempfile;
#[cfg(feature = "async-std")]
use async_attributes::test as async_test;
#[cfg(feature = "tokio")]
use tokio::test as async_test;
#[test] #[test]
fn basic_write() { fn basic_write() {
let tmp = tempfile::tempdir().unwrap(); let tmp = tempfile::tempdir().unwrap();
@ -364,21 +401,19 @@ mod tests {
); );
} }
#[test] #[async_test]
fn basic_async_write() { async fn basic_async_write() {
let tmp = tempfile::tempdir().unwrap(); let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned(); let dir = tmp.path().to_owned();
task::block_on(async { let mut writer = AsyncWriter::new(&dir, Algorithm::Sha256, None)
let mut writer = AsyncWriter::new(&dir, Algorithm::Sha256, None) .await
.await .unwrap();
.unwrap(); writer.write_all(b"hello world").await.unwrap();
writer.write_all(b"hello world").await.unwrap(); let sri = writer.close().await.unwrap();
let sri = writer.close().await.unwrap(); assert_eq!(sri.to_string(), Integrity::from(b"hello world").to_string());
assert_eq!(sri.to_string(), Integrity::from(b"hello world").to_string()); assert_eq!(
assert_eq!( std::fs::read(path::content_path(&dir, &sri)).unwrap(),
std::fs::read(path::content_path(&dir, &sri)).unwrap(), b"hello world"
b"hello world" );
);
});
} }
} }

View File

@ -3,10 +3,9 @@ use std::path::Path;
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context as TaskContext, Poll}; use std::task::{Context as TaskContext, Poll};
use futures::prelude::*;
use ssri::{Algorithm, Integrity}; use ssri::{Algorithm, Integrity};
use crate::async_lib::AsyncRead;
use crate::content::read; use crate::content::read;
use crate::errors::{Error, Result}; use crate::errors::{Error, Result};
use crate::index::{self, Metadata}; use crate::index::{self, Metadata};
@ -24,6 +23,7 @@ pub struct Reader {
} }
impl AsyncRead for Reader { impl AsyncRead for Reader {
#[cfg(feature = "async-std")]
fn poll_read( fn poll_read(
mut self: Pin<&mut Self>, mut self: Pin<&mut Self>,
cx: &mut TaskContext<'_>, cx: &mut TaskContext<'_>,
@ -31,6 +31,15 @@ impl AsyncRead for Reader {
) -> Poll<std::io::Result<usize>> { ) -> Poll<std::io::Result<usize>> {
Pin::new(&mut self.reader).poll_read(cx, buf) Pin::new(&mut self.reader).poll_read(cx, buf)
} }
#[cfg(feature = "tokio")]
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut TaskContext<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> Poll<tokio::io::Result<()>> {
Pin::new(&mut self.reader).poll_read(cx, buf)
}
} }
impl Reader { impl Reader {
@ -457,11 +466,15 @@ pub fn exists_sync<P: AsRef<Path>>(cache: P, sri: &Integrity) -> bool {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use async_std::fs as afs; use crate::async_lib::AsyncReadExt;
use async_std::prelude::*;
use std::fs; use std::fs;
#[async_attributes::test] #[cfg(feature = "async-std")]
use async_attributes::test as async_test;
#[cfg(feature = "tokio")]
use tokio::test as async_test;
#[async_test]
async fn test_open() { async fn test_open() {
let tmp = tempfile::tempdir().unwrap(); let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned(); let dir = tmp.path().to_owned();
@ -474,7 +487,7 @@ mod tests {
assert_eq!(str, String::from("hello world")); assert_eq!(str, String::from("hello world"));
} }
#[async_attributes::test] #[async_test]
async fn test_open_hash() { async fn test_open_hash() {
let tmp = tempfile::tempdir().unwrap(); let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned(); let dir = tmp.path().to_owned();
@ -515,7 +528,7 @@ mod tests {
assert_eq!(str, String::from("hello world")); assert_eq!(str, String::from("hello world"));
} }
#[async_attributes::test] #[async_test]
async fn test_read() { async fn test_read() {
let tmp = tempfile::tempdir().unwrap(); let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned(); let dir = tmp.path().to_owned();
@ -525,7 +538,7 @@ mod tests {
assert_eq!(data, b"hello world"); assert_eq!(data, b"hello world");
} }
#[async_attributes::test] #[async_test]
async fn test_read_hash() { async fn test_read_hash() {
let tmp = tempfile::tempdir().unwrap(); let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned(); let dir = tmp.path().to_owned();
@ -555,7 +568,7 @@ mod tests {
assert_eq!(data, b"hello world"); assert_eq!(data, b"hello world");
} }
#[async_attributes::test] #[async_test]
async fn test_copy() { async fn test_copy() {
let tmp = tempfile::tempdir().unwrap(); let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path(); let dir = tmp.path();
@ -563,11 +576,11 @@ mod tests {
crate::write(&dir, "my-key", b"hello world").await.unwrap(); crate::write(&dir, "my-key", b"hello world").await.unwrap();
crate::copy(&dir, "my-key", &dest).await.unwrap(); crate::copy(&dir, "my-key", &dest).await.unwrap();
let data = afs::read(&dest).await.unwrap(); let data = crate::async_lib::read(&dest).await.unwrap();
assert_eq!(data, b"hello world"); assert_eq!(data, b"hello world");
} }
#[async_attributes::test] #[async_test]
async fn test_copy_hash() { async fn test_copy_hash() {
let tmp = tempfile::tempdir().unwrap(); let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path(); let dir = tmp.path();
@ -575,7 +588,7 @@ mod tests {
let sri = crate::write(&dir, "my-key", b"hello world").await.unwrap(); let sri = crate::write(&dir, "my-key", b"hello world").await.unwrap();
crate::copy_hash(&dir, &sri, &dest).await.unwrap(); crate::copy_hash(&dir, &sri, &dest).await.unwrap();
let data = afs::read(&dest).await.unwrap(); let data = crate::async_lib::read(&dest).await.unwrap();
assert_eq!(data, b"hello world"); assert_eq!(data, b"hello world");
} }

View File

@ -5,11 +5,8 @@ use std::io::{ErrorKind, Write};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use async_std::fs as afs;
use async_std::io::BufReader;
use digest::Digest; use digest::Digest;
use either::{Left, Right}; use either::{Left, Right};
use futures::io::{AsyncBufReadExt, AsyncWriteExt};
use futures::stream::StreamExt; use futures::stream::StreamExt;
use serde_derive::{Deserialize, Serialize}; use serde_derive::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
@ -18,6 +15,7 @@ use sha2::Sha256;
use ssri::Integrity; use ssri::Integrity;
use walkdir::WalkDir; use walkdir::WalkDir;
use crate::async_lib::{AsyncBufReadExt, AsyncWriteExt};
use crate::errors::{Internal, InternalResult, Result}; use crate::errors::{Internal, InternalResult, Result};
use crate::put::WriteOpts; use crate::put::WriteOpts;
@ -97,7 +95,7 @@ pub fn insert(cache: &Path, key: &str, opts: WriteOpts) -> Result<Integrity> {
pub async fn insert_async<'a>(cache: &'a Path, key: &'a str, opts: WriteOpts) -> Result<Integrity> { pub async fn insert_async<'a>(cache: &'a Path, key: &'a str, opts: WriteOpts) -> Result<Integrity> {
let bucket = bucket_path(cache, key); let bucket = bucket_path(cache, key);
afs::create_dir_all(bucket.parent().unwrap()) crate::async_lib::create_dir_all(bucket.parent().unwrap())
.await .await
.with_context(|| { .with_context(|| {
format!( format!(
@ -114,7 +112,7 @@ pub async fn insert_async<'a>(cache: &'a Path, key: &'a str, opts: WriteOpts) ->
}) })
.with_context(|| format!("Failed to serialize entry with key `{}`", key))?; .with_context(|| format!("Failed to serialize entry with key `{}`", key))?;
let mut buck = async_std::fs::OpenOptions::new() let mut buck = crate::async_lib::OpenOptions::new()
.create(true) .create(true)
.append(true) .append(true)
.open(&bucket) .open(&bucket)
@ -311,7 +309,7 @@ fn bucket_entries(bucket: &Path) -> InternalResult<Vec<SerializableMetadata>> {
} }
async fn bucket_entries_async(bucket: &Path) -> InternalResult<Vec<SerializableMetadata>> { async fn bucket_entries_async(bucket: &Path) -> InternalResult<Vec<SerializableMetadata>> {
let file_result = afs::File::open(bucket).await; let file_result = crate::async_lib::File::open(bucket).await;
let file = if let Err(err) = file_result { let file = if let Err(err) = file_result {
if err.kind() == ErrorKind::NotFound { if err.kind() == ErrorKind::NotFound {
return Ok(Vec::new()); return Ok(Vec::new());
@ -321,7 +319,8 @@ async fn bucket_entries_async(bucket: &Path) -> InternalResult<Vec<SerializableM
file_result.unwrap() file_result.unwrap()
}; };
let mut vec = Vec::new(); let mut vec = Vec::new();
let mut lines = BufReader::new(file).lines(); let mut lines =
crate::async_lib::lines_to_stream(crate::async_lib::BufReader::new(file).lines());
while let Some(line) = lines.next().await { while let Some(line) = lines.next().await {
if let Ok(entry) = line { if let Ok(entry) = line {
let entry_str = match entry.split('\t').collect::<Vec<&str>>()[..] { let entry_str = match entry.split('\t').collect::<Vec<&str>>()[..] {
@ -340,9 +339,13 @@ async fn bucket_entries_async(bucket: &Path) -> InternalResult<Vec<SerializableM
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use async_std::task;
use serde_json::json; use serde_json::json;
#[cfg(feature = "async-std")]
use async_attributes::test as async_test;
#[cfg(feature = "tokio")]
use tokio::test as async_test;
const MOCK_ENTRY: &str = "\n251d18a2b33264ea8655695fd23c88bd874cdea2c3dc9d8f9b7596717ad30fec\t{\"key\":\"hello\",\"integrity\":\"sha1-deadbeef\",\"time\":1234567,\"size\":0,\"metadata\":null}"; const MOCK_ENTRY: &str = "\n251d18a2b33264ea8655695fd23c88bd874cdea2c3dc9d8f9b7596717ad30fec\t{\"key\":\"hello\",\"integrity\":\"sha1-deadbeef\",\"time\":1234567,\"size\":0,\"metadata\":null}";
#[test] #[test]
@ -357,14 +360,14 @@ mod tests {
assert_eq!(entry, MOCK_ENTRY); assert_eq!(entry, MOCK_ENTRY);
} }
#[test] #[async_test]
fn insert_async_basic() { async fn insert_async_basic() {
let tmp = tempfile::tempdir().unwrap(); let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned(); let dir = tmp.path().to_owned();
let sri: Integrity = "sha1-deadbeef".parse().unwrap(); let sri: Integrity = "sha1-deadbeef".parse().unwrap();
let time = 1_234_567; let time = 1_234_567;
let opts = WriteOpts::new().integrity(sri).time(time); let opts = WriteOpts::new().integrity(sri).time(time);
task::block_on(async { futures::executor::block_on(async {
insert_async(&dir, "hello", opts).await.unwrap(); insert_async(&dir, "hello", opts).await.unwrap();
}); });
let entry = std::fs::read_to_string(bucket_path(&dir, "hello")).unwrap(); let entry = std::fs::read_to_string(bucket_path(&dir, "hello")).unwrap();
@ -412,15 +415,15 @@ mod tests {
assert_eq!(find(&dir, "hello").unwrap(), None); assert_eq!(find(&dir, "hello").unwrap(), None);
} }
#[test] #[async_test]
fn delete_async_basic() { async fn delete_async_basic() {
let tmp = tempfile::tempdir().unwrap(); let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned(); let dir = tmp.path().to_owned();
let sri: Integrity = "sha1-deadbeef".parse().unwrap(); let sri: Integrity = "sha1-deadbeef".parse().unwrap();
let time = 1_234_567; let time = 1_234_567;
let opts = WriteOpts::new().integrity(sri).time(time); let opts = WriteOpts::new().integrity(sri).time(time);
insert(&dir, "hello", opts).unwrap(); insert(&dir, "hello", opts).unwrap();
task::block_on(async { futures::executor::block_on(async {
delete_async(&dir, "hello").await.unwrap(); delete_async(&dir, "hello").await.unwrap();
}); });
assert_eq!(find(&dir, "hello").unwrap(), None); assert_eq!(find(&dir, "hello").unwrap(), None);
@ -447,17 +450,19 @@ mod tests {
); );
} }
#[test] #[async_test]
fn round_trip_async() { async fn round_trip_async() {
let tmp = tempfile::tempdir().unwrap(); let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned(); let dir = tmp.path().to_owned();
let sri: Integrity = "sha1-deadbeef".parse().unwrap(); let sri: Integrity = "sha1-deadbeef".parse().unwrap();
let time = 1_234_567; let time = 1_234_567;
let opts = WriteOpts::new().integrity(sri.clone()).time(time); let opts = WriteOpts::new().integrity(sri.clone()).time(time);
task::block_on(async { futures::executor::block_on(async {
insert_async(&dir, "hello", opts).await.unwrap(); insert_async(&dir, "hello", opts).await.unwrap();
}); });
let entry = task::block_on(async { find_async(&dir, "hello").await.unwrap().unwrap() }); let entry = futures::executor::block_on(async {
find_async(&dir, "hello").await.unwrap().unwrap()
});
assert_eq!( assert_eq!(
entry, entry,
Metadata { Metadata {

View File

@ -123,9 +123,17 @@
//! ``` //! ```
#![warn(missing_docs, missing_doc_code_examples)] #![warn(missing_docs, missing_doc_code_examples)]
#[cfg(not(any(feature = "async-std", feature = "tokio-runtime")))]
compile_error!("Either feature \"async-std\" or \"tokio-runtime\" must be enabled for this crate.");
#[cfg(all(feature = "async-std", feature = "tokio-runtime"))]
compile_error!("Only either feature \"async-std\" or \"tokio-runtime\" must be enabled for this crate, not both.");
pub use serde_json::Value; pub use serde_json::Value;
pub use ssri::Algorithm; pub use ssri::Algorithm;
mod async_lib;
mod content; mod content;
mod errors; mod errors;
mod index; mod index;

View File

@ -3,11 +3,10 @@ use std::io::prelude::*;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::pin::Pin; use std::pin::Pin;
use futures::prelude::*;
use serde_json::Value; use serde_json::Value;
use ssri::{Algorithm, Integrity}; use ssri::{Algorithm, Integrity};
use crate::async_lib::{AsyncWrite, AsyncWriteExt};
use crate::content::write; use crate::content::write;
use crate::errors::{Error, Internal, Result}; use crate::errors::{Error, Internal, Result};
use crate::index; use crate::index;
@ -102,9 +101,18 @@ impl AsyncWrite for Writer {
Pin::new(&mut self.writer).poll_flush(cx) Pin::new(&mut self.writer).poll_flush(cx)
} }
#[cfg(feature = "async-std")]
fn poll_close(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<std::io::Result<()>> { fn poll_close(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.writer).poll_close(cx) Pin::new(&mut self.writer).poll_close(cx)
} }
#[cfg(feature = "tokio")]
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut TaskContext<'_>,
) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.writer).poll_shutdown(cx)
}
} }
impl Writer { impl Writer {
@ -423,7 +431,12 @@ impl SyncWriter {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
#[async_attributes::test] #[cfg(feature = "async-std")]
use async_attributes::test as async_test;
#[cfg(feature = "tokio")]
use tokio::test as async_test;
#[async_test]
async fn round_trip() { async fn round_trip() {
let tmp = tempfile::tempdir().unwrap(); let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned(); let dir = tmp.path().to_owned();
@ -455,7 +468,7 @@ mod tests {
assert_eq!(result, original, "we did not read back what we wrote"); assert_eq!(result, original, "we did not read back what we wrote");
} }
#[async_attributes::test] #[async_test]
async fn hash_write_async() { async fn hash_write_async() {
let tmp = tempfile::tempdir().unwrap(); let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned(); let dir = tmp.path().to_owned();

View File

@ -2,8 +2,6 @@
use std::fs; use std::fs;
use std::path::Path; use std::path::Path;
use async_std::fs as afs;
use ssri::Integrity; use ssri::Integrity;
use crate::content::rm; use crate::content::rm;
@ -93,7 +91,9 @@ pub async fn remove_hash<P: AsRef<Path>>(cache: P, sri: &Integrity) -> Result<()
/// ``` /// ```
pub async fn clear<P: AsRef<Path>>(cache: P) -> Result<()> { pub async fn clear<P: AsRef<Path>>(cache: P) -> Result<()> {
for entry in (cache.as_ref().read_dir().to_internal()?).flatten() { for entry in (cache.as_ref().read_dir().to_internal()?).flatten() {
afs::remove_dir_all(entry.path()).await.to_internal()?; crate::async_lib::remove_dir_all(entry.path())
.await
.to_internal()?;
} }
Ok(()) Ok(())
} }
@ -182,11 +182,15 @@ pub fn clear_sync<P: AsRef<Path>>(cache: P) -> Result<()> {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use async_std::task;
#[test] #[cfg(feature = "async-std")]
fn test_remove() { use async_attributes::test as async_test;
task::block_on(async { #[cfg(feature = "tokio")]
use tokio::test as async_test;
#[async_test]
async fn test_remove() {
futures::executor::block_on(async {
let tmp = tempfile::tempdir().unwrap(); let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned(); let dir = tmp.path().to_owned();
let sri = crate::write(&dir, "key", b"my-data").await.unwrap(); let sri = crate::write(&dir, "key", b"my-data").await.unwrap();
@ -201,9 +205,9 @@ mod tests {
}); });
} }
#[test] #[async_test]
fn test_remove_data() { async fn test_remove_data() {
task::block_on(async { futures::executor::block_on(async {
let tmp = tempfile::tempdir().unwrap(); let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned(); let dir = tmp.path().to_owned();
let sri = crate::write(&dir, "key", b"my-data").await.unwrap(); let sri = crate::write(&dir, "key", b"my-data").await.unwrap();
@ -218,9 +222,9 @@ mod tests {
}); });
} }
#[test] #[async_test]
fn test_clear() { async fn test_clear() {
task::block_on(async { futures::executor::block_on(async {
let tmp = tempfile::tempdir().unwrap(); let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned(); let dir = tmp.path().to_owned();
let sri = crate::write(&dir, "key", b"my-data").await.unwrap(); let sri = crate::write(&dir, "key", b"my-data").await.unwrap();