diff --git a/Cargo.toml b/Cargo.toml index 42dd6e7..17043d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,32 +8,41 @@ license = "Apache-2.0" repository = "https://github.com/zkat/cacache-rs" homepage = "https://github.com/zkat/cacache-rs" readme = "README.md" -categories = [ - "caching", - "filesystem" -] +categories = ["caching", "filesystem"] [dependencies] -ssri = "7.0.0" +async-attributes = { version = "1.1.2", optional = true } +async-std = { version = "1.10.0", features = ["unstable"], optional = true } +digest = "0.10.6" +either = "1.6.1" +futures = "0.3.17" hex = "0.4.3" -tempfile = "3.2.0" -sha-1 = "0.9.8" -sha2 = "0.9.8" -digest = "0.9.0" -serde_json = "1.0.68" +memmap2 = "0.5.8" serde = "1.0.130" serde_derive = "1.0.130" -walkdir = "2.3.2" -either = "1.6.1" -async-std = { version = "1.10.0", features = ["unstable"] } +serde_json = "1.0.68" +sha1 = "0.10.5" +sha2 = "0.10.6" +ssri = "7.0.0" +tempfile = "3.2.0" thiserror = "1.0.29" -futures = "0.3.17" -memmap2 = "0.5" +tokio = { version = "1.12.0", features = [ + "fs", + "io-util", + "macros", + "rt", + "rt-multi-thread", +], optional = true } +tokio-stream = { version = "0.1.7", features = ["io-util"], optional = true } +walkdir = "2.3.2" [dev-dependencies] -async-attributes = "1.1.2" -criterion = "0.3.5" +criterion = "0.4.0" [[bench]] name = "benchmarks" harness = false + +[features] +default = ["async-std", "async-attributes"] +tokio-runtime = ["tokio", "tokio-stream"] diff --git a/README.md b/README.md index 915aafa..1302cfd 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,7 @@ Minimum supported Rust version is `1.43.0`. ## Features -- First-class async support, using [`async-std`](https://crates.io/crates/async-std) as its runtime. Sync APIs are available but secondary +- First-class async support, using either [`async-std`](https://crates.io/crates/async-std) or [`tokio`](https://crates.io/crates/tokio) as its runtime. Sync APIs are available but secondary - `std::fs`-style API - Extraction by key or by content address (shasum, etc) - [Subresource Integrity](#integrity) web standard support @@ -55,6 +55,13 @@ Minimum supported Rust version is `1.43.0`. - Cross-platform: Windows and case-(in)sensitive filesystem support - Punches nazis +`async-std` is the default async runtime. To use `tokio` instead, turn off default features and enable the `tokio-runtime` feature, like this: + +```toml +[dependencies] +cacache = { version = "*", default-features = false, features = ["tokio-runtime"] } +``` + ## Contributing The cacache team enthusiastically welcomes contributions and project participation! There's a bunch of things you can do if you want to contribute! The [Contributor Guide](CONTRIBUTING.md) has all the information you need for everything from reporting bugs to contributing entire new features. Please don't hesitate to jump in if you'd like to, or even ask us questions if something isn't clear. diff --git a/benches/benchmarks.rs b/benches/benchmarks.rs index 99c974c..4de127e 100644 --- a/benches/benchmarks.rs +++ b/benches/benchmarks.rs @@ -1,4 +1,19 @@ -use async_std::{fs as afs, task}; +#[cfg(feature = "async-std")] +use async_std::fs as afs; +#[cfg(all(test, feature = "tokio"))] +use tokio::fs as afs; + +#[cfg(all(test, feature = "async-std"))] +pub use async_std::task::block_on; +#[cfg(all(test, feature = "tokio"))] +#[inline] +pub fn block_on(future: F) -> T +where + F: std::future::Future, +{ + tokio::runtime::Runtime::new().unwrap().block_on(future) +} + use std::fs::{self, File}; use std::io::prelude::*; @@ -47,7 +62,7 @@ fn baseline_read_async(c: &mut Criterion) { fd.write_all(data).unwrap(); drop(fd); c.bench_function("baseline_read_async", move |b| { - b.iter(|| task::block_on(afs::read(&path))) + b.iter(|| block_on(afs::read(&path))) }); } @@ -66,7 +81,7 @@ fn baseline_read_many_async(c: &mut Criterion) { c.bench_function("baseline_read_many_async", move |b| { b.iter(|| { let tasks = paths.iter().map(|path| afs::read(black_box(path))); - task::block_on(futures::future::join_all(tasks)); + block_on(futures::future::join_all(tasks)); }) }); } @@ -137,7 +152,7 @@ fn read_hash_many_async(c: &mut Criterion) { let tasks = sris .iter() .map(|sri| cacache::read_hash(black_box(&cache), black_box(sri))); - task::block_on(futures::future::join_all(tasks)); + block_on(futures::future::join_all(tasks)); }) }); } @@ -148,7 +163,7 @@ fn read_hash_async(c: &mut Criterion) { let data = b"hello world".to_vec(); let sri = cacache::write_sync(&cache, "hello", data).unwrap(); c.bench_function("get::data_hash", move |b| { - b.iter(|| task::block_on(cacache::read_hash(black_box(&cache), black_box(&sri))).unwrap()) + b.iter(|| block_on(cacache::read_hash(black_box(&cache), black_box(&sri))).unwrap()) }); } @@ -158,7 +173,7 @@ fn read_async(c: &mut Criterion) { let data = b"hello world".to_vec(); cacache::write_sync(&cache, "hello", data).unwrap(); c.bench_function("get::data", move |b| { - b.iter(|| task::block_on(cacache::read(black_box(&cache), black_box("hello"))).unwrap()) + b.iter(|| block_on(cacache::read(black_box(&cache), black_box("hello"))).unwrap()) }); } @@ -168,7 +183,7 @@ fn read_hash_async_big_data(c: &mut Criterion) { let data = vec![1; 1024 * 1024 * 5]; let sri = cacache::write_sync(&cache, "hello", data).unwrap(); c.bench_function("get::data_big_data", move |b| { - b.iter(|| task::block_on(cacache::read_hash(black_box(&cache), black_box(&sri))).unwrap()) + b.iter(|| block_on(cacache::read_hash(black_box(&cache), black_box(&sri))).unwrap()) }); } @@ -179,7 +194,7 @@ fn write_hash_async(c: &mut Criterion) { b.iter_custom(|iters| { let start = std::time::Instant::now(); for i in 0..iters { - task::block_on(cacache::write_hash(&cache, format!("hello world{}", i))).unwrap(); + block_on(cacache::write_hash(&cache, format!("hello world{}", i))).unwrap(); } start.elapsed() }) diff --git a/justfile b/justfile new file mode 100644 index 0000000..fb183c1 --- /dev/null +++ b/justfile @@ -0,0 +1,44 @@ +# List available just recipes +@help: + just -l + +# Run tests on both runtimes with cargo nextest +@test: + echo "----------\nasync-std:\n" + cargo nextest run + echo "\n----------\ntokio:\n" + cargo nextest run --no-default-features --features tokio-runtime + +# Run benchmarks with `cargo bench` +@bench: + echo "----------\nasync-std:\n" + cargo bench + echo "\n----------\ntokio:\n" + cargo bench --no-default-features --features tokio-runtime + +# Run benchmarks with `cargo criterion` +@criterion: + echo "----------\nasync-std:\n" + cargo criterion + echo "\n----------\ntokio:\n" + cargo criterion --no-default-features --features tokio-runtime + +# Generate a changelog with git-cliff +changelog TAG: + git-cliff --prepend CHANGELOG.md -u --tag {{TAG}} + +# Prepare a release +release *args: + cargo release --workspace {{args}} + +# Install workspace tools +@install-tools: + cargo install cargo-nextest + cargo install cargo-release + cargo install git-cliff + cargo install cargo-criterion + +# Lint and automatically fix what we can fix +@lint: + cargo clippy --fix --allow-dirty --allow-staged + cargo fmt diff --git a/src/async_lib.rs b/src/async_lib.rs new file mode 100644 index 0000000..b138ddc --- /dev/null +++ b/src/async_lib.rs @@ -0,0 +1,125 @@ +#[cfg(feature = "async-std")] +pub use async_std::fs::File; +#[cfg(feature = "tokio")] +pub use tokio::fs::File; + +#[cfg(feature = "async-std")] +pub use futures::io::AsyncRead; +#[cfg(feature = "tokio")] +pub use tokio::io::AsyncRead; + +#[cfg(feature = "async-std")] +pub use futures::io::AsyncReadExt; +#[cfg(feature = "tokio")] +pub use tokio::io::AsyncReadExt; + +#[cfg(feature = "async-std")] +pub use futures::io::AsyncBufReadExt; +#[cfg(feature = "tokio")] +pub use tokio::io::AsyncBufReadExt; + +#[cfg(feature = "async-std")] +pub use futures::io::AsyncWrite; +#[cfg(feature = "tokio")] +pub use tokio::io::AsyncWrite; + +#[cfg(feature = "async-std")] +pub use futures::io::AsyncWriteExt; +#[cfg(feature = "tokio")] +pub use tokio::io::AsyncWriteExt; + +#[cfg(feature = "async-std")] +pub use async_std::fs::read; +#[cfg(feature = "tokio")] +pub use tokio::fs::read; + +#[cfg(feature = "async-std")] +pub use async_std::fs::copy; +#[cfg(feature = "tokio")] +pub use tokio::fs::copy; + +#[cfg(feature = "async-std")] +pub use async_std::fs::metadata; +#[cfg(feature = "tokio")] +pub use tokio::fs::metadata; + +#[cfg(feature = "async-std")] +pub use async_std::fs::remove_file; +#[cfg(feature = "tokio")] +pub use tokio::fs::remove_file; + +#[cfg(feature = "async-std")] +pub use async_std::fs::create_dir_all; +#[cfg(feature = "tokio")] +pub use tokio::fs::create_dir_all; + +#[cfg(feature = "async-std")] +pub use async_std::fs::remove_dir_all; +#[cfg(feature = "tokio")] +pub use tokio::fs::remove_dir_all; + +#[cfg(feature = "async-std")] +pub use async_std::fs::DirBuilder; +#[cfg(feature = "tokio")] +pub use tokio::fs::DirBuilder; + +#[cfg(feature = "async-std")] +pub use async_std::fs::OpenOptions; +#[cfg(feature = "tokio")] +pub use tokio::fs::OpenOptions; + +#[cfg(feature = "async-std")] +pub use async_std::io::BufReader; +#[cfg(feature = "tokio")] +pub use tokio::io::BufReader; + +#[cfg(feature = "async-std")] +#[inline] +pub fn lines_to_stream(lines: futures::io::Lines) -> futures::io::Lines { + lines +} +#[cfg(feature = "tokio")] +#[inline] +pub fn lines_to_stream(lines: tokio::io::Lines) -> tokio_stream::wrappers::LinesStream { + tokio_stream::wrappers::LinesStream::new(lines) +} + +#[cfg(feature = "async-std")] +pub use async_std::task::spawn_blocking; +#[cfg(feature = "tokio")] +pub use tokio::task::spawn_blocking; + +#[cfg(feature = "async-std")] +pub use async_std::task::JoinHandle; +#[cfg(feature = "async-std")] +#[inline] +pub fn unwrap_joinhandle_value(value: T) -> T { + value +} +#[cfg(feature = "tokio")] +pub use tokio::task::JoinHandle; +#[cfg(feature = "tokio")] +#[inline] +pub fn unwrap_joinhandle_value(value: Result) -> T { + value.unwrap() +} + +use crate::errors::{Internal, InternalResult}; +use tempfile::NamedTempFile; + +#[cfg(feature = "async-std")] +#[inline] +pub async fn create_named_tempfile(tmp_path: std::path::PathBuf) -> InternalResult { + spawn_blocking(|| NamedTempFile::new_in(tmp_path)) + .await + .to_internal() +} + +#[cfg(feature = "tokio")] +#[inline] +pub async fn create_named_tempfile(tmp_path: std::path::PathBuf) -> InternalResult { + let tmpfile = spawn_blocking(|| NamedTempFile::new_in(tmp_path)) + .await + .to_internal()?; + tmpfile.to_internal() +} diff --git a/src/content/read.rs b/src/content/read.rs index 816d035..dd34643 100644 --- a/src/content/read.rs +++ b/src/content/read.rs @@ -3,10 +3,9 @@ use std::path::Path; use std::pin::Pin; use std::task::{Context, Poll}; -use async_std; -use futures::prelude::*; use ssri::{Algorithm, Integrity, IntegrityChecker}; +use crate::async_lib::AsyncRead; use crate::content::path; use crate::errors::{Internal, Result}; @@ -30,11 +29,12 @@ impl Reader { } pub struct AsyncReader { - fd: async_std::fs::File, + fd: crate::async_lib::File, checker: IntegrityChecker, } impl AsyncRead for AsyncReader { + #[cfg(feature = "async-std")] fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -44,6 +44,22 @@ impl AsyncRead for AsyncReader { self.checker.input(&buf[..amt]); Poll::Ready(Ok(amt)) } + + #[cfg(feature = "tokio")] + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + let pre_len = buf.filled().len(); + futures::ready!(Pin::new(&mut self.fd).poll_read(cx, buf))?; + let post_len = buf.filled().len(); + if post_len - pre_len == 0 { + return Poll::Ready(Ok(())); + } + self.checker.input(&buf.filled()[pre_len..]); + Poll::Ready(Ok(())) + } } impl AsyncReader { @@ -63,7 +79,7 @@ pub fn open(cache: &Path, sri: Integrity) -> Result { pub async fn open_async(cache: &Path, sri: Integrity) -> Result { let cpath = path::content_path(cache, &sri); Ok(AsyncReader { - fd: async_std::fs::File::open(cpath).await.to_internal()?, + fd: crate::async_lib::File::open(cpath).await.to_internal()?, checker: IntegrityChecker::new(sri), }) } @@ -77,7 +93,7 @@ pub fn read(cache: &Path, sri: &Integrity) -> Result> { pub async fn read_async<'a>(cache: &'a Path, sri: &'a Integrity) -> Result> { let cpath = path::content_path(cache, sri); - let ret = async_std::fs::read(&cpath).await.to_internal()?; + let ret = crate::async_lib::read(&cpath).await.to_internal()?; sri.check(&ret)?; Ok(ret) } @@ -92,8 +108,8 @@ pub fn copy(cache: &Path, sri: &Integrity, to: &Path) -> Result { pub async fn copy_async<'a>(cache: &'a Path, sri: &'a Integrity, to: &'a Path) -> Result { let cpath = path::content_path(cache, sri); - let ret = async_std::fs::copy(&cpath, to).await.to_internal()?; - let data = async_std::fs::read(cpath).await.to_internal()?; + let ret = crate::async_lib::copy(&cpath, to).await.to_internal()?; + let data = crate::async_lib::read(cpath).await.to_internal()?; sri.check(data)?; Ok(ret) } @@ -107,7 +123,7 @@ pub fn has_content(cache: &Path, sri: &Integrity) -> Option { } pub async fn has_content_async(cache: &Path, sri: &Integrity) -> Option { - if async_std::fs::metadata(path::content_path(cache, sri)) + if crate::async_lib::metadata(path::content_path(cache, sri)) .await .is_ok() { diff --git a/src/content/rm.rs b/src/content/rm.rs index 7a60869..3a0509d 100644 --- a/src/content/rm.rs +++ b/src/content/rm.rs @@ -1,7 +1,6 @@ use std::fs; use std::path::Path; -use async_std::fs as afs; use ssri::Integrity; use crate::content::path; @@ -13,7 +12,7 @@ pub fn rm(cache: &Path, sri: &Integrity) -> Result<()> { } pub async fn rm_async(cache: &Path, sri: &Integrity) -> Result<()> { - afs::remove_file(path::content_path(cache, sri)) + crate::async_lib::remove_file(path::content_path(cache, sri)) .await .to_internal()?; Ok(()) diff --git a/src/content/write.rs b/src/content/write.rs index f446f95..c542c3f 100644 --- a/src/content/write.rs +++ b/src/content/write.rs @@ -3,16 +3,14 @@ use std::io::prelude::*; use std::path::{Path, PathBuf}; use std::pin::Pin; use std::sync::Mutex; +use std::task::{Context, Poll}; -use async_std::fs as afs; -use async_std::future::Future; -use async_std::task::{self, Context, JoinHandle, Poll}; -use futures::io::AsyncWrite; use futures::prelude::*; use memmap2::MmapMut; use ssri::{Algorithm, Integrity, IntegrityOpts}; use tempfile::NamedTempFile; +use crate::async_lib::{AsyncWrite, JoinHandle}; use crate::content::path; use crate::errors::{Internal, Result}; @@ -116,14 +114,12 @@ impl AsyncWriter { let cache_path = cache.to_path_buf(); let mut tmp_path = cache_path.clone(); tmp_path.push("tmp"); - afs::DirBuilder::new() + crate::async_lib::DirBuilder::new() .recursive(true) .create(&tmp_path) .await .to_internal()?; - let mut tmpfile = task::spawn_blocking(|| NamedTempFile::new_in(tmp_path)) - .await - .to_internal()?; + let mut tmpfile = crate::async_lib::create_named_tempfile(tmp_path).await?; let mmap = if let Some(size) = size { if size <= MAX_MMAP_SIZE { tmpfile.as_file_mut().set_len(size as u64).to_internal()?; @@ -162,7 +158,7 @@ impl AsyncWriter { let cpath = path::content_path(&inner.cache, &sri); // Start the operation asynchronously. - *state = State::Busy(task::spawn_blocking(|| { + *state = State::Busy(crate::async_lib::spawn_blocking(|| { let res = std::fs::DirBuilder::new() .recursive(true) // Safe unwrap. cpath always has multiple segments @@ -204,7 +200,11 @@ impl AsyncWriter { } }, // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), + State::Busy(task) => { + *state = crate::async_lib::unwrap_joinhandle_value(futures::ready!( + Pin::new(task).poll(cx) + )) + } } } }) @@ -255,7 +255,7 @@ impl AsyncWrite for AsyncWriter { inner.buf[..buf.len()].copy_from_slice(buf); // Start the operation asynchronously. - *state = State::Busy(task::spawn_blocking(|| { + *state = State::Busy(crate::async_lib::spawn_blocking(|| { inner.builder.input(&inner.buf); if let Some(mmap) = &mut inner.mmap { mmap.copy_from_slice(&inner.buf); @@ -270,7 +270,12 @@ impl AsyncWrite for AsyncWriter { } } // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), + State::Busy(task) => { + *state = crate::async_lib::unwrap_joinhandle_value(futures::ready!(Pin::new( + task + ) + .poll(cx))) + } } } } @@ -302,7 +307,7 @@ impl AsyncWrite for AsyncWriter { } // Start the operation asynchronously. - *state = State::Busy(task::spawn_blocking(|| { + *state = State::Busy(crate::async_lib::spawn_blocking(|| { let res = inner.tmpfile.flush(); inner.last_op = Some(Operation::Flush(res)); State::Idle(Some(inner)) @@ -310,12 +315,33 @@ impl AsyncWrite for AsyncWriter { } } // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), + State::Busy(task) => { + *state = crate::async_lib::unwrap_joinhandle_value(futures::ready!(Pin::new( + task + ) + .poll(cx))) + } } } } + #[cfg(feature = "async-std")] fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_close_impl(cx) + } + + #[cfg(feature = "tokio")] + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_close_impl(cx) + } +} + +impl AsyncWriter { + #[inline] + fn poll_close_impl( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { let state = &mut *self.0.lock().unwrap(); loop { @@ -329,13 +355,18 @@ impl AsyncWrite for AsyncWriter { }; // Start the operation asynchronously. - *state = State::Busy(task::spawn_blocking(|| { + *state = State::Busy(crate::async_lib::spawn_blocking(|| { drop(inner); State::Idle(None) })); } // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), + State::Busy(task) => { + *state = crate::async_lib::unwrap_joinhandle_value(futures::ready!(Pin::new( + task + ) + .poll(cx))) + } } } } @@ -348,8 +379,14 @@ fn io_error(err: impl Into>) -> std::io #[cfg(test)] mod tests { use super::*; - use async_std::task; + use crate::async_lib::AsyncWriteExt; use tempfile; + + #[cfg(feature = "async-std")] + use async_attributes::test as async_test; + #[cfg(feature = "tokio")] + use tokio::test as async_test; + #[test] fn basic_write() { let tmp = tempfile::tempdir().unwrap(); @@ -364,21 +401,19 @@ mod tests { ); } - #[test] - fn basic_async_write() { + #[async_test] + async fn basic_async_write() { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path().to_owned(); - task::block_on(async { - let mut writer = AsyncWriter::new(&dir, Algorithm::Sha256, None) - .await - .unwrap(); - writer.write_all(b"hello world").await.unwrap(); - let sri = writer.close().await.unwrap(); - assert_eq!(sri.to_string(), Integrity::from(b"hello world").to_string()); - assert_eq!( - std::fs::read(path::content_path(&dir, &sri)).unwrap(), - b"hello world" - ); - }); + let mut writer = AsyncWriter::new(&dir, Algorithm::Sha256, None) + .await + .unwrap(); + writer.write_all(b"hello world").await.unwrap(); + let sri = writer.close().await.unwrap(); + assert_eq!(sri.to_string(), Integrity::from(b"hello world").to_string()); + assert_eq!( + std::fs::read(path::content_path(&dir, &sri)).unwrap(), + b"hello world" + ); } } diff --git a/src/get.rs b/src/get.rs index 55432d1..b29cae3 100644 --- a/src/get.rs +++ b/src/get.rs @@ -3,10 +3,9 @@ use std::path::Path; use std::pin::Pin; use std::task::{Context as TaskContext, Poll}; -use futures::prelude::*; - use ssri::{Algorithm, Integrity}; +use crate::async_lib::AsyncRead; use crate::content::read; use crate::errors::{Error, Result}; use crate::index::{self, Metadata}; @@ -24,6 +23,7 @@ pub struct Reader { } impl AsyncRead for Reader { + #[cfg(feature = "async-std")] fn poll_read( mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>, @@ -31,6 +31,15 @@ impl AsyncRead for Reader { ) -> Poll> { Pin::new(&mut self.reader).poll_read(cx, buf) } + + #[cfg(feature = "tokio")] + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut TaskContext<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut self.reader).poll_read(cx, buf) + } } impl Reader { @@ -457,11 +466,15 @@ pub fn exists_sync>(cache: P, sri: &Integrity) -> bool { #[cfg(test)] mod tests { - use async_std::fs as afs; - use async_std::prelude::*; + use crate::async_lib::AsyncReadExt; use std::fs; - #[async_attributes::test] + #[cfg(feature = "async-std")] + use async_attributes::test as async_test; + #[cfg(feature = "tokio")] + use tokio::test as async_test; + + #[async_test] async fn test_open() { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path().to_owned(); @@ -474,7 +487,7 @@ mod tests { assert_eq!(str, String::from("hello world")); } - #[async_attributes::test] + #[async_test] async fn test_open_hash() { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path().to_owned(); @@ -515,7 +528,7 @@ mod tests { assert_eq!(str, String::from("hello world")); } - #[async_attributes::test] + #[async_test] async fn test_read() { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path().to_owned(); @@ -525,7 +538,7 @@ mod tests { assert_eq!(data, b"hello world"); } - #[async_attributes::test] + #[async_test] async fn test_read_hash() { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path().to_owned(); @@ -555,7 +568,7 @@ mod tests { assert_eq!(data, b"hello world"); } - #[async_attributes::test] + #[async_test] async fn test_copy() { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path(); @@ -563,11 +576,11 @@ mod tests { crate::write(&dir, "my-key", b"hello world").await.unwrap(); crate::copy(&dir, "my-key", &dest).await.unwrap(); - let data = afs::read(&dest).await.unwrap(); + let data = crate::async_lib::read(&dest).await.unwrap(); assert_eq!(data, b"hello world"); } - #[async_attributes::test] + #[async_test] async fn test_copy_hash() { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path(); @@ -575,7 +588,7 @@ mod tests { let sri = crate::write(&dir, "my-key", b"hello world").await.unwrap(); crate::copy_hash(&dir, &sri, &dest).await.unwrap(); - let data = afs::read(&dest).await.unwrap(); + let data = crate::async_lib::read(&dest).await.unwrap(); assert_eq!(data, b"hello world"); } diff --git a/src/index.rs b/src/index.rs index 274c679..2f1a0b1 100644 --- a/src/index.rs +++ b/src/index.rs @@ -5,11 +5,8 @@ use std::io::{ErrorKind, Write}; use std::path::{Path, PathBuf}; use std::time::{SystemTime, UNIX_EPOCH}; -use async_std::fs as afs; -use async_std::io::BufReader; use digest::Digest; use either::{Left, Right}; -use futures::io::{AsyncBufReadExt, AsyncWriteExt}; use futures::stream::StreamExt; use serde_derive::{Deserialize, Serialize}; use serde_json::Value; @@ -18,6 +15,7 @@ use sha2::Sha256; use ssri::Integrity; use walkdir::WalkDir; +use crate::async_lib::{AsyncBufReadExt, AsyncWriteExt}; use crate::errors::{Internal, InternalResult, Result}; use crate::put::WriteOpts; @@ -97,7 +95,7 @@ pub fn insert(cache: &Path, key: &str, opts: WriteOpts) -> Result { pub async fn insert_async<'a>(cache: &'a Path, key: &'a str, opts: WriteOpts) -> Result { let bucket = bucket_path(cache, key); - afs::create_dir_all(bucket.parent().unwrap()) + crate::async_lib::create_dir_all(bucket.parent().unwrap()) .await .with_context(|| { format!( @@ -114,7 +112,7 @@ pub async fn insert_async<'a>(cache: &'a Path, key: &'a str, opts: WriteOpts) -> }) .with_context(|| format!("Failed to serialize entry with key `{}`", key))?; - let mut buck = async_std::fs::OpenOptions::new() + let mut buck = crate::async_lib::OpenOptions::new() .create(true) .append(true) .open(&bucket) @@ -311,7 +309,7 @@ fn bucket_entries(bucket: &Path) -> InternalResult> { } async fn bucket_entries_async(bucket: &Path) -> InternalResult> { - let file_result = afs::File::open(bucket).await; + let file_result = crate::async_lib::File::open(bucket).await; let file = if let Err(err) = file_result { if err.kind() == ErrorKind::NotFound { return Ok(Vec::new()); @@ -321,7 +319,8 @@ async fn bucket_entries_async(bucket: &Path) -> InternalResult>()[..] { @@ -340,9 +339,13 @@ async fn bucket_entries_async(bucket: &Path) -> InternalResult, cx: &mut TaskContext<'_>) -> Poll> { Pin::new(&mut self.writer).poll_close(cx) } + + #[cfg(feature = "tokio")] + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut TaskContext<'_>, + ) -> Poll> { + Pin::new(&mut self.writer).poll_shutdown(cx) + } } impl Writer { @@ -423,7 +431,12 @@ impl SyncWriter { #[cfg(test)] mod tests { - #[async_attributes::test] + #[cfg(feature = "async-std")] + use async_attributes::test as async_test; + #[cfg(feature = "tokio")] + use tokio::test as async_test; + + #[async_test] async fn round_trip() { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path().to_owned(); @@ -455,7 +468,7 @@ mod tests { assert_eq!(result, original, "we did not read back what we wrote"); } - #[async_attributes::test] + #[async_test] async fn hash_write_async() { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path().to_owned(); diff --git a/src/rm.rs b/src/rm.rs index c185e3d..48b76aa 100644 --- a/src/rm.rs +++ b/src/rm.rs @@ -2,8 +2,6 @@ use std::fs; use std::path::Path; -use async_std::fs as afs; - use ssri::Integrity; use crate::content::rm; @@ -93,7 +91,9 @@ pub async fn remove_hash>(cache: P, sri: &Integrity) -> Result<() /// ``` pub async fn clear>(cache: P) -> Result<()> { for entry in (cache.as_ref().read_dir().to_internal()?).flatten() { - afs::remove_dir_all(entry.path()).await.to_internal()?; + crate::async_lib::remove_dir_all(entry.path()) + .await + .to_internal()?; } Ok(()) } @@ -182,11 +182,15 @@ pub fn clear_sync>(cache: P) -> Result<()> { #[cfg(test)] mod tests { - use async_std::task; - #[test] - fn test_remove() { - task::block_on(async { + #[cfg(feature = "async-std")] + use async_attributes::test as async_test; + #[cfg(feature = "tokio")] + use tokio::test as async_test; + + #[async_test] + async fn test_remove() { + futures::executor::block_on(async { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path().to_owned(); let sri = crate::write(&dir, "key", b"my-data").await.unwrap(); @@ -201,9 +205,9 @@ mod tests { }); } - #[test] - fn test_remove_data() { - task::block_on(async { + #[async_test] + async fn test_remove_data() { + futures::executor::block_on(async { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path().to_owned(); let sri = crate::write(&dir, "key", b"my-data").await.unwrap(); @@ -218,9 +222,9 @@ mod tests { }); } - #[test] - fn test_clear() { - task::block_on(async { + #[async_test] + async fn test_clear() { + futures::executor::block_on(async { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path().to_owned(); let sri = crate::write(&dir, "key", b"my-data").await.unwrap();