diff --git a/Cargo.toml b/Cargo.toml index 42dd6e7..4a6e102 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,13 +27,28 @@ walkdir = "2.3.2" either = "1.6.1" async-std = { version = "1.10.0", features = ["unstable"] } thiserror = "1.0.29" +tokio = { version = "1.12.0", features = [ + "fs", + "io-util", + "macros", + "rt", + "rt-multi-thread", +], optional = true } +tokio-stream = { version = "0.1.7", features = ["io-util"], optional = true } + +[features] futures = "0.3.17" memmap2 = "0.5" [dev-dependencies] +async-std = { version = "1.10.0", features = ["unstable"] } async-attributes = "1.1.2" criterion = "0.3.5" [[bench]] name = "benchmarks" harness = false + +[features] +default = ["async-std"] +tokio-runtime = ["tokio", "tokio-stream"] diff --git a/README.md b/README.md index 915aafa..790d2d8 100644 --- a/README.md +++ b/README.md @@ -38,7 +38,7 @@ Minimum supported Rust version is `1.43.0`. ## Features -- First-class async support, using [`async-std`](https://crates.io/crates/async-std) as its runtime. Sync APIs are available but secondary +- First-class async support, using either [`async-std`](https://crates.io/crates/async-std) or [`tokio`](https://crates.io/crates/tokio) as its runtime. Sync APIs are available but secondary - `std::fs`-style API - Extraction by key or by content address (shasum, etc) - [Subresource Integrity](#integrity) web standard support diff --git a/src/async_lib.rs b/src/async_lib.rs new file mode 100644 index 0000000..97bd06e --- /dev/null +++ b/src/async_lib.rs @@ -0,0 +1,126 @@ +#[cfg(feature = "async-std")] +pub use async_std::fs::File; +#[cfg(feature = "tokio")] +pub use tokio::fs::File; + +#[cfg(feature = "async-std")] +pub use futures::io::AsyncRead; +#[cfg(feature = "tokio")] +pub use tokio::io::AsyncRead; + +#[cfg(feature = "async-std")] +pub use futures::io::AsyncReadExt; +#[cfg(feature = "tokio")] +pub use tokio::io::AsyncReadExt; + +#[cfg(feature = "async-std")] +pub use futures::io::AsyncBufReadExt; +#[cfg(feature = "tokio")] +pub use tokio::io::AsyncBufReadExt; + +#[cfg(feature = "async-std")] +pub use futures::io::AsyncWrite; +#[cfg(feature = "tokio")] +pub use tokio::io::AsyncWrite; + +#[cfg(feature = "async-std")] +pub use futures::io::AsyncWriteExt; +#[cfg(feature = "tokio")] +pub use tokio::io::AsyncWriteExt; + +#[cfg(feature = "async-std")] +pub use async_std::fs::read; +#[cfg(feature = "tokio")] +pub use tokio::fs::read; + +#[cfg(feature = "async-std")] +pub use async_std::fs::copy; +#[cfg(feature = "tokio")] +pub use tokio::fs::copy; + +#[cfg(feature = "async-std")] +pub use async_std::fs::metadata; +#[cfg(feature = "tokio")] +pub use tokio::fs::metadata; + +#[cfg(feature = "async-std")] +pub use async_std::fs::remove_file; +#[cfg(feature = "tokio")] +pub use tokio::fs::remove_file; + +#[cfg(feature = "async-std")] +pub use async_std::fs::create_dir_all; +#[cfg(feature = "tokio")] +pub use tokio::fs::create_dir_all; + +#[cfg(feature = "async-std")] +pub use async_std::fs::remove_dir_all; +#[cfg(feature = "tokio")] +pub use tokio::fs::remove_dir_all; + +#[cfg(feature = "async-std")] +pub use async_std::fs::DirBuilder; +#[cfg(feature = "tokio")] +pub use tokio::fs::DirBuilder; + +#[cfg(feature = "async-std")] +pub use async_std::fs::OpenOptions; +#[cfg(feature = "tokio")] +pub use tokio::fs::OpenOptions; + +#[cfg(feature = "async-std")] +pub use async_std::io::BufReader; +#[cfg(feature = "tokio")] +pub use tokio::io::BufReader; + +#[cfg(feature = "async-std")] +#[inline] +pub fn lines_to_stream(lines: futures::io::Lines) -> futures::io::Lines { + lines +} +#[cfg(feature = "tokio")] +#[inline] +pub fn lines_to_stream(lines: tokio::io::Lines) -> tokio_stream::wrappers::LinesStream { + tokio_stream::wrappers::LinesStream::new(lines) +} + +#[cfg(feature = "async-std")] +pub use async_std::task::spawn_blocking; +#[cfg(feature = "tokio")] +pub use tokio::task::spawn_blocking; + +#[cfg(all(test, feature = "async-std"))] +pub use async_std::task::block_on; +#[cfg(all(test, feature = "tokio"))] +#[inline] +pub fn block_on(future: F) -> T +where + F: std::future::Future, +{ + tokio::runtime::Runtime::new().unwrap().block_on(future) +} + +#[cfg(feature = "async-std")] +pub use async_std::task::JoinHandle; +#[cfg(feature = "async-std")] +#[inline] +pub async fn unwrap_joinhandle(handle: async_std::task::JoinHandle) -> R { + handle.await +} +#[cfg(feature = "async-std")] +#[inline] +pub fn unwrap_joinhandle_value(value: T) -> T { + value +} +#[cfg(feature = "tokio")] +pub use tokio::task::JoinHandle; +#[cfg(feature = "tokio")] +#[inline] +pub async fn unwrap_joinhandle(handle: tokio::task::JoinHandle) -> R { + handle.await.unwrap() +} +#[cfg(feature = "tokio")] +#[inline] +pub fn unwrap_joinhandle_value(value: Result) -> T { + value.unwrap() +} diff --git a/src/content/read.rs b/src/content/read.rs index 816d035..dd34643 100644 --- a/src/content/read.rs +++ b/src/content/read.rs @@ -3,10 +3,9 @@ use std::path::Path; use std::pin::Pin; use std::task::{Context, Poll}; -use async_std; -use futures::prelude::*; use ssri::{Algorithm, Integrity, IntegrityChecker}; +use crate::async_lib::AsyncRead; use crate::content::path; use crate::errors::{Internal, Result}; @@ -30,11 +29,12 @@ impl Reader { } pub struct AsyncReader { - fd: async_std::fs::File, + fd: crate::async_lib::File, checker: IntegrityChecker, } impl AsyncRead for AsyncReader { + #[cfg(feature = "async-std")] fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -44,6 +44,22 @@ impl AsyncRead for AsyncReader { self.checker.input(&buf[..amt]); Poll::Ready(Ok(amt)) } + + #[cfg(feature = "tokio")] + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + let pre_len = buf.filled().len(); + futures::ready!(Pin::new(&mut self.fd).poll_read(cx, buf))?; + let post_len = buf.filled().len(); + if post_len - pre_len == 0 { + return Poll::Ready(Ok(())); + } + self.checker.input(&buf.filled()[pre_len..]); + Poll::Ready(Ok(())) + } } impl AsyncReader { @@ -63,7 +79,7 @@ pub fn open(cache: &Path, sri: Integrity) -> Result { pub async fn open_async(cache: &Path, sri: Integrity) -> Result { let cpath = path::content_path(cache, &sri); Ok(AsyncReader { - fd: async_std::fs::File::open(cpath).await.to_internal()?, + fd: crate::async_lib::File::open(cpath).await.to_internal()?, checker: IntegrityChecker::new(sri), }) } @@ -77,7 +93,7 @@ pub fn read(cache: &Path, sri: &Integrity) -> Result> { pub async fn read_async<'a>(cache: &'a Path, sri: &'a Integrity) -> Result> { let cpath = path::content_path(cache, sri); - let ret = async_std::fs::read(&cpath).await.to_internal()?; + let ret = crate::async_lib::read(&cpath).await.to_internal()?; sri.check(&ret)?; Ok(ret) } @@ -92,8 +108,8 @@ pub fn copy(cache: &Path, sri: &Integrity, to: &Path) -> Result { pub async fn copy_async<'a>(cache: &'a Path, sri: &'a Integrity, to: &'a Path) -> Result { let cpath = path::content_path(cache, sri); - let ret = async_std::fs::copy(&cpath, to).await.to_internal()?; - let data = async_std::fs::read(cpath).await.to_internal()?; + let ret = crate::async_lib::copy(&cpath, to).await.to_internal()?; + let data = crate::async_lib::read(cpath).await.to_internal()?; sri.check(data)?; Ok(ret) } @@ -107,7 +123,7 @@ pub fn has_content(cache: &Path, sri: &Integrity) -> Option { } pub async fn has_content_async(cache: &Path, sri: &Integrity) -> Option { - if async_std::fs::metadata(path::content_path(cache, sri)) + if crate::async_lib::metadata(path::content_path(cache, sri)) .await .is_ok() { diff --git a/src/content/rm.rs b/src/content/rm.rs index 7a60869..3a0509d 100644 --- a/src/content/rm.rs +++ b/src/content/rm.rs @@ -1,7 +1,6 @@ use std::fs; use std::path::Path; -use async_std::fs as afs; use ssri::Integrity; use crate::content::path; @@ -13,7 +12,7 @@ pub fn rm(cache: &Path, sri: &Integrity) -> Result<()> { } pub async fn rm_async(cache: &Path, sri: &Integrity) -> Result<()> { - afs::remove_file(path::content_path(cache, sri)) + crate::async_lib::remove_file(path::content_path(cache, sri)) .await .to_internal()?; Ok(()) diff --git a/src/content/write.rs b/src/content/write.rs index f446f95..0a6e236 100644 --- a/src/content/write.rs +++ b/src/content/write.rs @@ -3,11 +3,9 @@ use std::io::prelude::*; use std::path::{Path, PathBuf}; use std::pin::Pin; use std::sync::Mutex; +use std::task::{Context, Poll}; -use async_std::fs as afs; -use async_std::future::Future; -use async_std::task::{self, Context, JoinHandle, Poll}; -use futures::io::AsyncWrite; +use crate::async_lib::{AsyncWrite, JoinHandle}; use futures::prelude::*; use memmap2::MmapMut; use ssri::{Algorithm, Integrity, IntegrityOpts}; @@ -116,11 +114,12 @@ impl AsyncWriter { let cache_path = cache.to_path_buf(); let mut tmp_path = cache_path.clone(); tmp_path.push("tmp"); - afs::DirBuilder::new() + crate::async_lib::DirBuilder::new() .recursive(true) .create(&tmp_path) .await .to_internal()?; + let tmpfile = crate::async_lib::unwrap_joinhandle(crate::async_lib::spawn_blocking(|| { let mut tmpfile = task::spawn_blocking(|| NamedTempFile::new_in(tmp_path)) .await .to_internal()?; @@ -162,7 +161,7 @@ impl AsyncWriter { let cpath = path::content_path(&inner.cache, &sri); // Start the operation asynchronously. - *state = State::Busy(task::spawn_blocking(|| { + *state = State::Busy(crate::async_lib::spawn_blocking(|| { let res = std::fs::DirBuilder::new() .recursive(true) // Safe unwrap. cpath always has multiple segments @@ -204,7 +203,11 @@ impl AsyncWriter { } }, // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), + State::Busy(task) => { + *state = crate::async_lib::unwrap_joinhandle_value(futures::ready!( + Pin::new(task).poll(cx) + )) + } } } }) @@ -255,7 +258,7 @@ impl AsyncWrite for AsyncWriter { inner.buf[..buf.len()].copy_from_slice(buf); // Start the operation asynchronously. - *state = State::Busy(task::spawn_blocking(|| { + *state = State::Busy(crate::async_lib::spawn_blocking(|| { inner.builder.input(&inner.buf); if let Some(mmap) = &mut inner.mmap { mmap.copy_from_slice(&inner.buf); @@ -270,7 +273,12 @@ impl AsyncWrite for AsyncWriter { } } // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), + State::Busy(task) => { + *state = crate::async_lib::unwrap_joinhandle_value(futures::ready!(Pin::new( + task + ) + .poll(cx))) + } } } } @@ -302,7 +310,7 @@ impl AsyncWrite for AsyncWriter { } // Start the operation asynchronously. - *state = State::Busy(task::spawn_blocking(|| { + *state = State::Busy(crate::async_lib::spawn_blocking(|| { let res = inner.tmpfile.flush(); inner.last_op = Some(Operation::Flush(res)); State::Idle(Some(inner)) @@ -310,12 +318,33 @@ impl AsyncWrite for AsyncWriter { } } // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), + State::Busy(task) => { + *state = crate::async_lib::unwrap_joinhandle_value(futures::ready!(Pin::new( + task + ) + .poll(cx))) + } } } } + #[cfg(feature = "async-std")] fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_close_impl(cx) + } + + #[cfg(feature = "tokio")] + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.poll_close_impl(cx) + } +} + +impl AsyncWriter { + #[inline] + fn poll_close_impl( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { let state = &mut *self.0.lock().unwrap(); loop { @@ -329,13 +358,18 @@ impl AsyncWrite for AsyncWriter { }; // Start the operation asynchronously. - *state = State::Busy(task::spawn_blocking(|| { + *state = State::Busy(crate::async_lib::spawn_blocking(|| { drop(inner); State::Idle(None) })); } // Poll the asynchronous operation the file is currently blocked on. - State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)), + State::Busy(task) => { + *state = crate::async_lib::unwrap_joinhandle_value(futures::ready!(Pin::new( + task + ) + .poll(cx))) + } } } } @@ -348,7 +382,7 @@ fn io_error(err: impl Into>) -> std::io #[cfg(test)] mod tests { use super::*; - use async_std::task; + use crate::async_lib::AsyncWriteExt; use tempfile; #[test] fn basic_write() { @@ -368,7 +402,7 @@ mod tests { fn basic_async_write() { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path().to_owned(); - task::block_on(async { + crate::async_lib::block_on(async { let mut writer = AsyncWriter::new(&dir, Algorithm::Sha256, None) .await .unwrap(); diff --git a/src/get.rs b/src/get.rs index 55432d1..b29cae3 100644 --- a/src/get.rs +++ b/src/get.rs @@ -3,10 +3,9 @@ use std::path::Path; use std::pin::Pin; use std::task::{Context as TaskContext, Poll}; -use futures::prelude::*; - use ssri::{Algorithm, Integrity}; +use crate::async_lib::AsyncRead; use crate::content::read; use crate::errors::{Error, Result}; use crate::index::{self, Metadata}; @@ -24,6 +23,7 @@ pub struct Reader { } impl AsyncRead for Reader { + #[cfg(feature = "async-std")] fn poll_read( mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>, @@ -31,6 +31,15 @@ impl AsyncRead for Reader { ) -> Poll> { Pin::new(&mut self.reader).poll_read(cx, buf) } + + #[cfg(feature = "tokio")] + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut TaskContext<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut self.reader).poll_read(cx, buf) + } } impl Reader { @@ -457,11 +466,15 @@ pub fn exists_sync>(cache: P, sri: &Integrity) -> bool { #[cfg(test)] mod tests { - use async_std::fs as afs; - use async_std::prelude::*; + use crate::async_lib::AsyncReadExt; use std::fs; - #[async_attributes::test] + #[cfg(feature = "async-std")] + use async_attributes::test as async_test; + #[cfg(feature = "tokio")] + use tokio::test as async_test; + + #[async_test] async fn test_open() { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path().to_owned(); @@ -474,7 +487,7 @@ mod tests { assert_eq!(str, String::from("hello world")); } - #[async_attributes::test] + #[async_test] async fn test_open_hash() { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path().to_owned(); @@ -515,7 +528,7 @@ mod tests { assert_eq!(str, String::from("hello world")); } - #[async_attributes::test] + #[async_test] async fn test_read() { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path().to_owned(); @@ -525,7 +538,7 @@ mod tests { assert_eq!(data, b"hello world"); } - #[async_attributes::test] + #[async_test] async fn test_read_hash() { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path().to_owned(); @@ -555,7 +568,7 @@ mod tests { assert_eq!(data, b"hello world"); } - #[async_attributes::test] + #[async_test] async fn test_copy() { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path(); @@ -563,11 +576,11 @@ mod tests { crate::write(&dir, "my-key", b"hello world").await.unwrap(); crate::copy(&dir, "my-key", &dest).await.unwrap(); - let data = afs::read(&dest).await.unwrap(); + let data = crate::async_lib::read(&dest).await.unwrap(); assert_eq!(data, b"hello world"); } - #[async_attributes::test] + #[async_test] async fn test_copy_hash() { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path(); @@ -575,7 +588,7 @@ mod tests { let sri = crate::write(&dir, "my-key", b"hello world").await.unwrap(); crate::copy_hash(&dir, &sri, &dest).await.unwrap(); - let data = afs::read(&dest).await.unwrap(); + let data = crate::async_lib::read(&dest).await.unwrap(); assert_eq!(data, b"hello world"); } diff --git a/src/index.rs b/src/index.rs index 274c679..4fdcb37 100644 --- a/src/index.rs +++ b/src/index.rs @@ -5,11 +5,8 @@ use std::io::{ErrorKind, Write}; use std::path::{Path, PathBuf}; use std::time::{SystemTime, UNIX_EPOCH}; -use async_std::fs as afs; -use async_std::io::BufReader; use digest::Digest; use either::{Left, Right}; -use futures::io::{AsyncBufReadExt, AsyncWriteExt}; use futures::stream::StreamExt; use serde_derive::{Deserialize, Serialize}; use serde_json::Value; @@ -18,6 +15,7 @@ use sha2::Sha256; use ssri::Integrity; use walkdir::WalkDir; +use crate::async_lib::{AsyncBufReadExt, AsyncWriteExt}; use crate::errors::{Internal, InternalResult, Result}; use crate::put::WriteOpts; @@ -97,7 +95,7 @@ pub fn insert(cache: &Path, key: &str, opts: WriteOpts) -> Result { pub async fn insert_async<'a>(cache: &'a Path, key: &'a str, opts: WriteOpts) -> Result { let bucket = bucket_path(cache, key); - afs::create_dir_all(bucket.parent().unwrap()) + crate::async_lib::create_dir_all(bucket.parent().unwrap()) .await .with_context(|| { format!( @@ -114,7 +112,7 @@ pub async fn insert_async<'a>(cache: &'a Path, key: &'a str, opts: WriteOpts) -> }) .with_context(|| format!("Failed to serialize entry with key `{}`", key))?; - let mut buck = async_std::fs::OpenOptions::new() + let mut buck = crate::async_lib::OpenOptions::new() .create(true) .append(true) .open(&bucket) @@ -311,7 +309,7 @@ fn bucket_entries(bucket: &Path) -> InternalResult> { } async fn bucket_entries_async(bucket: &Path) -> InternalResult> { - let file_result = afs::File::open(bucket).await; + let file_result = crate::async_lib::File::open(bucket).await; let file = if let Err(err) = file_result { if err.kind() == ErrorKind::NotFound { return Ok(Vec::new()); @@ -321,7 +319,8 @@ async fn bucket_entries_async(bucket: &Path) -> InternalResult>()[..] { @@ -340,7 +339,6 @@ async fn bucket_entries_async(bucket: &Path) -> InternalResult, cx: &mut TaskContext<'_>) -> Poll> { Pin::new(&mut self.writer).poll_close(cx) } + + #[cfg(feature = "tokio")] + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut TaskContext<'_>, + ) -> Poll> { + Pin::new(&mut self.writer).poll_shutdown(cx) + } } impl Writer { @@ -423,7 +431,12 @@ impl SyncWriter { #[cfg(test)] mod tests { - #[async_attributes::test] + #[cfg(feature = "async-std")] + use async_attributes::test as async_test; + #[cfg(feature = "tokio")] + use tokio::test as async_test; + + #[async_test] async fn round_trip() { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path().to_owned(); diff --git a/src/rm.rs b/src/rm.rs index c185e3d..f0992c2 100644 --- a/src/rm.rs +++ b/src/rm.rs @@ -2,8 +2,6 @@ use std::fs; use std::path::Path; -use async_std::fs as afs; - use ssri::Integrity; use crate::content::rm; @@ -93,7 +91,9 @@ pub async fn remove_hash>(cache: P, sri: &Integrity) -> Result<() /// ``` pub async fn clear>(cache: P) -> Result<()> { for entry in (cache.as_ref().read_dir().to_internal()?).flatten() { - afs::remove_dir_all(entry.path()).await.to_internal()?; + crate::async_lib::remove_dir_all(entry.path()) + .await + .to_internal()?; } Ok(()) } @@ -182,11 +182,9 @@ pub fn clear_sync>(cache: P) -> Result<()> { #[cfg(test)] mod tests { - use async_std::task; - #[test] fn test_remove() { - task::block_on(async { + crate::async_lib::block_on(async { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path().to_owned(); let sri = crate::write(&dir, "key", b"my-data").await.unwrap(); @@ -203,7 +201,7 @@ mod tests { #[test] fn test_remove_data() { - task::block_on(async { + crate::async_lib::block_on(async { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path().to_owned(); let sri = crate::write(&dir, "key", b"my-data").await.unwrap(); @@ -220,7 +218,7 @@ mod tests { #[test] fn test_clear() { - task::block_on(async { + crate::async_lib::block_on(async { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path().to_owned(); let sri = crate::write(&dir, "key", b"my-data").await.unwrap();