feature(async): Add tokio as an executor option

This PR is based on the work @alexschrod did in PR #29. All
I did was carry it over the finish line.

This PR adds a feature to the crate named `tokio-runtime`. If
you disable default features and enable this new one, cacache
uses tokio as its async executor. This makes integrating cacache
with tokio-using projects easier, because the file types leak out
if you use anything more than the top-level convenience functions.

The PR implements the feature using shims in a new submodule named
`async_lib`. This module conditionally uses either async-std
or tokio based on feature selection, and hides some differences with
convenience functions.

This change should not be a breaking change, because the default is
still async-std.

There are a few other small changes in this PR worth noting.

- The README shows how to switch runtimes.
- There's a justfile to run common tasks, including those in makefile.toml.
  The default shell is `sh`, so this might not work out of the box for
  Windows users.
- The tests can now run under either runtime. The justfile has a recipe
  that runs them both.
- The benchmarks can also run under either runtime. The justfile has two
  recipe for this, one using bench and the other using criterion's runner.
- The dependencies now pull in async-attributes by default along with
  async-std. This made it easier to swap runtimes in the tests.
- All dependency versions have been bumped.

Co-authored-by: @alexschrod
This commit is contained in:
C J Silverio 2023-01-23 11:33:18 -08:00
parent 54935bb1e8
commit 15536afe8d
No known key found for this signature in database
9 changed files with 161 additions and 89 deletions

View File

@ -8,24 +8,23 @@ license = "Apache-2.0"
repository = "https://github.com/zkat/cacache-rs"
homepage = "https://github.com/zkat/cacache-rs"
readme = "README.md"
categories = [
"caching",
"filesystem"
]
categories = ["caching", "filesystem"]
[dependencies]
ssri = "7.0.0"
async-attributes = { version = "1.1.2", optional = true }
async-std = { version = "1.10.0", features = ["unstable"], optional = true }
digest = "0.10.6"
either = "1.6.1"
futures = "0.3.17"
hex = "0.4.3"
tempfile = "3.2.0"
sha-1 = "0.9.8"
sha2 = "0.9.8"
digest = "0.9.0"
serde_json = "1.0.68"
memmap2 = "0.5.8"
serde = "1.0.130"
serde_derive = "1.0.130"
walkdir = "2.3.2"
either = "1.6.1"
async-std = { version = "1.10.0", features = ["unstable"] }
serde_json = "1.0.68"
sha1 = "0.10.5"
sha2 = "0.10.6"
ssri = "7.0.0"
tempfile = "3.2.0"
thiserror = "1.0.29"
tokio = { version = "1.12.0", features = [
"fs",
@ -35,20 +34,15 @@ tokio = { version = "1.12.0", features = [
"rt-multi-thread",
], optional = true }
tokio-stream = { version = "0.1.7", features = ["io-util"], optional = true }
[features]
futures = "0.3.17"
memmap2 = "0.5"
walkdir = "2.3.2"
[dev-dependencies]
async-std = { version = "1.10.0", features = ["unstable"] }
async-attributes = "1.1.2"
criterion = "0.3.5"
criterion = "0.4.0"
[[bench]]
name = "benchmarks"
harness = false
[features]
default = ["async-std"]
default = ["async-std", "async-attributes"]
tokio-runtime = ["tokio", "tokio-stream"]

View File

@ -55,6 +55,13 @@ Minimum supported Rust version is `1.43.0`.
- Cross-platform: Windows and case-(in)sensitive filesystem support
- Punches nazis
`async-std` is the default async runtime. To use `tokio` instead, turn off default features and enable the `tokio-runtime` feature, like this:
```toml
[dependencies]
cacache = { version = "*", default-features = false, features = ["tokio-runtime"] }
```
## Contributing
The cacache team enthusiastically welcomes contributions and project participation! There's a bunch of things you can do if you want to contribute! The [Contributor Guide](CONTRIBUTING.md) has all the information you need for everything from reporting bugs to contributing entire new features. Please don't hesitate to jump in if you'd like to, or even ask us questions if something isn't clear.

View File

@ -1,4 +1,19 @@
use async_std::{fs as afs, task};
#[cfg(feature = "async-std")]
use async_std::fs as afs;
#[cfg(all(test, feature = "tokio"))]
use tokio::fs as afs;
#[cfg(all(test, feature = "async-std"))]
pub use async_std::task::block_on;
#[cfg(all(test, feature = "tokio"))]
#[inline]
pub fn block_on<F, T>(future: F) -> T
where
F: std::future::Future<Output = T>,
{
tokio::runtime::Runtime::new().unwrap().block_on(future)
}
use std::fs::{self, File};
use std::io::prelude::*;
@ -47,7 +62,7 @@ fn baseline_read_async(c: &mut Criterion) {
fd.write_all(data).unwrap();
drop(fd);
c.bench_function("baseline_read_async", move |b| {
b.iter(|| task::block_on(afs::read(&path)))
b.iter(|| block_on(afs::read(&path)))
});
}
@ -66,7 +81,7 @@ fn baseline_read_many_async(c: &mut Criterion) {
c.bench_function("baseline_read_many_async", move |b| {
b.iter(|| {
let tasks = paths.iter().map(|path| afs::read(black_box(path)));
task::block_on(futures::future::join_all(tasks));
block_on(futures::future::join_all(tasks));
})
});
}
@ -137,7 +152,7 @@ fn read_hash_many_async(c: &mut Criterion) {
let tasks = sris
.iter()
.map(|sri| cacache::read_hash(black_box(&cache), black_box(sri)));
task::block_on(futures::future::join_all(tasks));
block_on(futures::future::join_all(tasks));
})
});
}
@ -148,7 +163,7 @@ fn read_hash_async(c: &mut Criterion) {
let data = b"hello world".to_vec();
let sri = cacache::write_sync(&cache, "hello", data).unwrap();
c.bench_function("get::data_hash", move |b| {
b.iter(|| task::block_on(cacache::read_hash(black_box(&cache), black_box(&sri))).unwrap())
b.iter(|| block_on(cacache::read_hash(black_box(&cache), black_box(&sri))).unwrap())
});
}
@ -158,7 +173,7 @@ fn read_async(c: &mut Criterion) {
let data = b"hello world".to_vec();
cacache::write_sync(&cache, "hello", data).unwrap();
c.bench_function("get::data", move |b| {
b.iter(|| task::block_on(cacache::read(black_box(&cache), black_box("hello"))).unwrap())
b.iter(|| block_on(cacache::read(black_box(&cache), black_box("hello"))).unwrap())
});
}
@ -168,7 +183,7 @@ fn read_hash_async_big_data(c: &mut Criterion) {
let data = vec![1; 1024 * 1024 * 5];
let sri = cacache::write_sync(&cache, "hello", data).unwrap();
c.bench_function("get::data_big_data", move |b| {
b.iter(|| task::block_on(cacache::read_hash(black_box(&cache), black_box(&sri))).unwrap())
b.iter(|| block_on(cacache::read_hash(black_box(&cache), black_box(&sri))).unwrap())
});
}
@ -179,7 +194,7 @@ fn write_hash_async(c: &mut Criterion) {
b.iter_custom(|iters| {
let start = std::time::Instant::now();
for i in 0..iters {
task::block_on(cacache::write_hash(&cache, format!("hello world{}", i))).unwrap();
block_on(cacache::write_hash(&cache, format!("hello world{}", i))).unwrap();
}
start.elapsed()
})

44
justfile Normal file
View File

@ -0,0 +1,44 @@
# List available just recipes
@help:
just -l
# Run tests on both runtimes with cargo nextest
@test:
echo "----------\nasync-std:\n"
cargo nextest run
echo "\n----------\ntokio:\n"
cargo nextest run --no-default-features --features tokio-runtime
# Run benchmarks with `cargo bench`
@bench:
echo "----------\nasync-std:\n"
cargo bench
echo "\n----------\ntokio:\n"
cargo bench --no-default-features --features tokio-runtime
# Run benchmarks with `cargo criterion`
@criterion:
echo "----------\nasync-std:\n"
cargo criterion
echo "\n----------\ntokio:\n"
cargo criterion --no-default-features --features tokio-runtime
# Generate a changelog with git-cliff
changelog TAG:
git-cliff --prepend CHANGELOG.md -u --tag {{TAG}}
# Prepare a release
release *args:
cargo release --workspace {{args}}
# Install workspace tools
@install-tools:
cargo install cargo-nextest
cargo install cargo-release
cargo install git-cliff
cargo install cargo-criterion
# Lint and automatically fix what we can fix
@lint:
cargo clippy --fix --allow-dirty --allow-staged
cargo fmt

View File

@ -89,26 +89,10 @@ pub use async_std::task::spawn_blocking;
#[cfg(feature = "tokio")]
pub use tokio::task::spawn_blocking;
#[cfg(all(test, feature = "async-std"))]
pub use async_std::task::block_on;
#[cfg(all(test, feature = "tokio"))]
#[inline]
pub fn block_on<F, T>(future: F) -> T
where
F: std::future::Future<Output = T>,
{
tokio::runtime::Runtime::new().unwrap().block_on(future)
}
#[cfg(feature = "async-std")]
pub use async_std::task::JoinHandle;
#[cfg(feature = "async-std")]
#[inline]
pub async fn unwrap_joinhandle<R>(handle: async_std::task::JoinHandle<R>) -> R {
handle.await
}
#[cfg(feature = "async-std")]
#[inline]
pub fn unwrap_joinhandle_value<T>(value: T) -> T {
value
}
@ -116,11 +100,26 @@ pub fn unwrap_joinhandle_value<T>(value: T) -> T {
pub use tokio::task::JoinHandle;
#[cfg(feature = "tokio")]
#[inline]
pub async fn unwrap_joinhandle<R>(handle: tokio::task::JoinHandle<R>) -> R {
handle.await.unwrap()
}
#[cfg(feature = "tokio")]
#[inline]
pub fn unwrap_joinhandle_value<T>(value: Result<T, tokio::task::JoinError>) -> T {
value.unwrap()
}
use crate::errors::{Internal, InternalResult};
use tempfile::NamedTempFile;
#[cfg(feature = "async-std")]
#[inline]
pub async fn create_named_tempfile(tmp_path: std::path::PathBuf) -> InternalResult<NamedTempFile> {
spawn_blocking(|| NamedTempFile::new_in(tmp_path))
.await
.to_internal()
}
#[cfg(feature = "tokio")]
#[inline]
pub async fn create_named_tempfile(tmp_path: std::path::PathBuf) -> InternalResult<NamedTempFile> {
let tmpfile = spawn_blocking(|| NamedTempFile::new_in(tmp_path))
.await
.to_internal()?;
tmpfile.to_internal()
}

View File

@ -119,10 +119,7 @@ impl AsyncWriter {
.create(&tmp_path)
.await
.to_internal()?;
let tmpfile = crate::async_lib::unwrap_joinhandle(crate::async_lib::spawn_blocking(|| {
let mut tmpfile = task::spawn_blocking(|| NamedTempFile::new_in(tmp_path))
.await
.to_internal()?;
let mut tmpfile = crate::async_lib::create_named_tempfile(tmp_path).await?;
let mmap = if let Some(size) = size {
if size <= MAX_MMAP_SIZE {
tmpfile.as_file_mut().set_len(size as u64).to_internal()?;
@ -384,6 +381,12 @@ mod tests {
use super::*;
use crate::async_lib::AsyncWriteExt;
use tempfile;
#[cfg(feature = "async-std")]
use async_attributes::test as async_test;
#[cfg(feature = "tokio")]
use tokio::test as async_test;
#[test]
fn basic_write() {
let tmp = tempfile::tempdir().unwrap();
@ -398,21 +401,19 @@ mod tests {
);
}
#[test]
fn basic_async_write() {
#[async_test]
async fn basic_async_write() {
let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned();
crate::async_lib::block_on(async {
let mut writer = AsyncWriter::new(&dir, Algorithm::Sha256, None)
.await
.unwrap();
writer.write_all(b"hello world").await.unwrap();
let sri = writer.close().await.unwrap();
assert_eq!(sri.to_string(), Integrity::from(b"hello world").to_string());
assert_eq!(
std::fs::read(path::content_path(&dir, &sri)).unwrap(),
b"hello world"
);
});
let mut writer = AsyncWriter::new(&dir, Algorithm::Sha256, None)
.await
.unwrap();
writer.write_all(b"hello world").await.unwrap();
let sri = writer.close().await.unwrap();
assert_eq!(sri.to_string(), Integrity::from(b"hello world").to_string());
assert_eq!(
std::fs::read(path::content_path(&dir, &sri)).unwrap(),
b"hello world"
);
}
}

View File

@ -341,6 +341,11 @@ mod tests {
use super::*;
use serde_json::json;
#[cfg(feature = "async-std")]
use async_attributes::test as async_test;
#[cfg(feature = "tokio")]
use tokio::test as async_test;
const MOCK_ENTRY: &str = "\n251d18a2b33264ea8655695fd23c88bd874cdea2c3dc9d8f9b7596717ad30fec\t{\"key\":\"hello\",\"integrity\":\"sha1-deadbeef\",\"time\":1234567,\"size\":0,\"metadata\":null}";
#[test]
@ -355,14 +360,14 @@ mod tests {
assert_eq!(entry, MOCK_ENTRY);
}
#[test]
fn insert_async_basic() {
#[async_test]
async fn insert_async_basic() {
let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned();
let sri: Integrity = "sha1-deadbeef".parse().unwrap();
let time = 1_234_567;
let opts = WriteOpts::new().integrity(sri).time(time);
crate::async_lib::block_on(async {
futures::executor::block_on(async {
insert_async(&dir, "hello", opts).await.unwrap();
});
let entry = std::fs::read_to_string(bucket_path(&dir, "hello")).unwrap();
@ -410,15 +415,15 @@ mod tests {
assert_eq!(find(&dir, "hello").unwrap(), None);
}
#[test]
fn delete_async_basic() {
#[async_test]
async fn delete_async_basic() {
let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned();
let sri: Integrity = "sha1-deadbeef".parse().unwrap();
let time = 1_234_567;
let opts = WriteOpts::new().integrity(sri).time(time);
insert(&dir, "hello", opts).unwrap();
crate::async_lib::block_on(async {
futures::executor::block_on(async {
delete_async(&dir, "hello").await.unwrap();
});
assert_eq!(find(&dir, "hello").unwrap(), None);
@ -445,18 +450,19 @@ mod tests {
);
}
#[test]
fn round_trip_async() {
#[async_test]
async fn round_trip_async() {
let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned();
let sri: Integrity = "sha1-deadbeef".parse().unwrap();
let time = 1_234_567;
let opts = WriteOpts::new().integrity(sri.clone()).time(time);
crate::async_lib::block_on(async {
futures::executor::block_on(async {
insert_async(&dir, "hello", opts).await.unwrap();
});
let entry =
crate::async_lib::block_on(async { find_async(&dir, "hello").await.unwrap().unwrap() });
let entry = futures::executor::block_on(async {
find_async(&dir, "hello").await.unwrap().unwrap()
});
assert_eq!(
entry,
Metadata {

View File

@ -468,7 +468,7 @@ mod tests {
assert_eq!(result, original, "we did not read back what we wrote");
}
#[async_attributes::test]
#[async_test]
async fn hash_write_async() {
let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned();

View File

@ -182,9 +182,15 @@ pub fn clear_sync<P: AsRef<Path>>(cache: P) -> Result<()> {
#[cfg(test)]
mod tests {
#[test]
fn test_remove() {
crate::async_lib::block_on(async {
#[cfg(feature = "async-std")]
use async_attributes::test as async_test;
#[cfg(feature = "tokio")]
use tokio::test as async_test;
#[async_test]
async fn test_remove() {
futures::executor::block_on(async {
let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned();
let sri = crate::write(&dir, "key", b"my-data").await.unwrap();
@ -199,9 +205,9 @@ mod tests {
});
}
#[test]
fn test_remove_data() {
crate::async_lib::block_on(async {
#[async_test]
async fn test_remove_data() {
futures::executor::block_on(async {
let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned();
let sri = crate::write(&dir, "key", b"my-data").await.unwrap();
@ -216,9 +222,9 @@ mod tests {
});
}
#[test]
fn test_clear() {
crate::async_lib::block_on(async {
#[async_test]
async fn test_clear() {
futures::executor::block_on(async {
let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned();
let sri = crate::write(&dir, "key", b"my-data").await.unwrap();