mirror of https://github.com/zkat/cacache-rs.git
Add support for choosing between async-std and tokio
This commit is contained in:
parent
6d84ff0aed
commit
54935bb1e8
15
Cargo.toml
15
Cargo.toml
|
|
@ -27,13 +27,28 @@ walkdir = "2.3.2"
|
|||
either = "1.6.1"
|
||||
async-std = { version = "1.10.0", features = ["unstable"] }
|
||||
thiserror = "1.0.29"
|
||||
tokio = { version = "1.12.0", features = [
|
||||
"fs",
|
||||
"io-util",
|
||||
"macros",
|
||||
"rt",
|
||||
"rt-multi-thread",
|
||||
], optional = true }
|
||||
tokio-stream = { version = "0.1.7", features = ["io-util"], optional = true }
|
||||
|
||||
[features]
|
||||
futures = "0.3.17"
|
||||
memmap2 = "0.5"
|
||||
|
||||
[dev-dependencies]
|
||||
async-std = { version = "1.10.0", features = ["unstable"] }
|
||||
async-attributes = "1.1.2"
|
||||
criterion = "0.3.5"
|
||||
|
||||
[[bench]]
|
||||
name = "benchmarks"
|
||||
harness = false
|
||||
|
||||
[features]
|
||||
default = ["async-std"]
|
||||
tokio-runtime = ["tokio", "tokio-stream"]
|
||||
|
|
|
|||
|
|
@ -38,7 +38,7 @@ Minimum supported Rust version is `1.43.0`.
|
|||
|
||||
## Features
|
||||
|
||||
- First-class async support, using [`async-std`](https://crates.io/crates/async-std) as its runtime. Sync APIs are available but secondary
|
||||
- First-class async support, using either [`async-std`](https://crates.io/crates/async-std) or [`tokio`](https://crates.io/crates/tokio) as its runtime. Sync APIs are available but secondary
|
||||
- `std::fs`-style API
|
||||
- Extraction by key or by content address (shasum, etc)
|
||||
- [Subresource Integrity](#integrity) web standard support
|
||||
|
|
|
|||
|
|
@ -0,0 +1,126 @@
|
|||
#[cfg(feature = "async-std")]
|
||||
pub use async_std::fs::File;
|
||||
#[cfg(feature = "tokio")]
|
||||
pub use tokio::fs::File;
|
||||
|
||||
#[cfg(feature = "async-std")]
|
||||
pub use futures::io::AsyncRead;
|
||||
#[cfg(feature = "tokio")]
|
||||
pub use tokio::io::AsyncRead;
|
||||
|
||||
#[cfg(feature = "async-std")]
|
||||
pub use futures::io::AsyncReadExt;
|
||||
#[cfg(feature = "tokio")]
|
||||
pub use tokio::io::AsyncReadExt;
|
||||
|
||||
#[cfg(feature = "async-std")]
|
||||
pub use futures::io::AsyncBufReadExt;
|
||||
#[cfg(feature = "tokio")]
|
||||
pub use tokio::io::AsyncBufReadExt;
|
||||
|
||||
#[cfg(feature = "async-std")]
|
||||
pub use futures::io::AsyncWrite;
|
||||
#[cfg(feature = "tokio")]
|
||||
pub use tokio::io::AsyncWrite;
|
||||
|
||||
#[cfg(feature = "async-std")]
|
||||
pub use futures::io::AsyncWriteExt;
|
||||
#[cfg(feature = "tokio")]
|
||||
pub use tokio::io::AsyncWriteExt;
|
||||
|
||||
#[cfg(feature = "async-std")]
|
||||
pub use async_std::fs::read;
|
||||
#[cfg(feature = "tokio")]
|
||||
pub use tokio::fs::read;
|
||||
|
||||
#[cfg(feature = "async-std")]
|
||||
pub use async_std::fs::copy;
|
||||
#[cfg(feature = "tokio")]
|
||||
pub use tokio::fs::copy;
|
||||
|
||||
#[cfg(feature = "async-std")]
|
||||
pub use async_std::fs::metadata;
|
||||
#[cfg(feature = "tokio")]
|
||||
pub use tokio::fs::metadata;
|
||||
|
||||
#[cfg(feature = "async-std")]
|
||||
pub use async_std::fs::remove_file;
|
||||
#[cfg(feature = "tokio")]
|
||||
pub use tokio::fs::remove_file;
|
||||
|
||||
#[cfg(feature = "async-std")]
|
||||
pub use async_std::fs::create_dir_all;
|
||||
#[cfg(feature = "tokio")]
|
||||
pub use tokio::fs::create_dir_all;
|
||||
|
||||
#[cfg(feature = "async-std")]
|
||||
pub use async_std::fs::remove_dir_all;
|
||||
#[cfg(feature = "tokio")]
|
||||
pub use tokio::fs::remove_dir_all;
|
||||
|
||||
#[cfg(feature = "async-std")]
|
||||
pub use async_std::fs::DirBuilder;
|
||||
#[cfg(feature = "tokio")]
|
||||
pub use tokio::fs::DirBuilder;
|
||||
|
||||
#[cfg(feature = "async-std")]
|
||||
pub use async_std::fs::OpenOptions;
|
||||
#[cfg(feature = "tokio")]
|
||||
pub use tokio::fs::OpenOptions;
|
||||
|
||||
#[cfg(feature = "async-std")]
|
||||
pub use async_std::io::BufReader;
|
||||
#[cfg(feature = "tokio")]
|
||||
pub use tokio::io::BufReader;
|
||||
|
||||
#[cfg(feature = "async-std")]
|
||||
#[inline]
|
||||
pub fn lines_to_stream<R>(lines: futures::io::Lines<R>) -> futures::io::Lines<R> {
|
||||
lines
|
||||
}
|
||||
#[cfg(feature = "tokio")]
|
||||
#[inline]
|
||||
pub fn lines_to_stream<R>(lines: tokio::io::Lines<R>) -> tokio_stream::wrappers::LinesStream<R> {
|
||||
tokio_stream::wrappers::LinesStream::new(lines)
|
||||
}
|
||||
|
||||
#[cfg(feature = "async-std")]
|
||||
pub use async_std::task::spawn_blocking;
|
||||
#[cfg(feature = "tokio")]
|
||||
pub use tokio::task::spawn_blocking;
|
||||
|
||||
#[cfg(all(test, feature = "async-std"))]
|
||||
pub use async_std::task::block_on;
|
||||
#[cfg(all(test, feature = "tokio"))]
|
||||
#[inline]
|
||||
pub fn block_on<F, T>(future: F) -> T
|
||||
where
|
||||
F: std::future::Future<Output = T>,
|
||||
{
|
||||
tokio::runtime::Runtime::new().unwrap().block_on(future)
|
||||
}
|
||||
|
||||
#[cfg(feature = "async-std")]
|
||||
pub use async_std::task::JoinHandle;
|
||||
#[cfg(feature = "async-std")]
|
||||
#[inline]
|
||||
pub async fn unwrap_joinhandle<R>(handle: async_std::task::JoinHandle<R>) -> R {
|
||||
handle.await
|
||||
}
|
||||
#[cfg(feature = "async-std")]
|
||||
#[inline]
|
||||
pub fn unwrap_joinhandle_value<T>(value: T) -> T {
|
||||
value
|
||||
}
|
||||
#[cfg(feature = "tokio")]
|
||||
pub use tokio::task::JoinHandle;
|
||||
#[cfg(feature = "tokio")]
|
||||
#[inline]
|
||||
pub async fn unwrap_joinhandle<R>(handle: tokio::task::JoinHandle<R>) -> R {
|
||||
handle.await.unwrap()
|
||||
}
|
||||
#[cfg(feature = "tokio")]
|
||||
#[inline]
|
||||
pub fn unwrap_joinhandle_value<T>(value: Result<T, tokio::task::JoinError>) -> T {
|
||||
value.unwrap()
|
||||
}
|
||||
|
|
@ -3,10 +3,9 @@ use std::path::Path;
|
|||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use async_std;
|
||||
use futures::prelude::*;
|
||||
use ssri::{Algorithm, Integrity, IntegrityChecker};
|
||||
|
||||
use crate::async_lib::AsyncRead;
|
||||
use crate::content::path;
|
||||
use crate::errors::{Internal, Result};
|
||||
|
||||
|
|
@ -30,11 +29,12 @@ impl Reader {
|
|||
}
|
||||
|
||||
pub struct AsyncReader {
|
||||
fd: async_std::fs::File,
|
||||
fd: crate::async_lib::File,
|
||||
checker: IntegrityChecker,
|
||||
}
|
||||
|
||||
impl AsyncRead for AsyncReader {
|
||||
#[cfg(feature = "async-std")]
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
|
|
@ -44,6 +44,22 @@ impl AsyncRead for AsyncReader {
|
|||
self.checker.input(&buf[..amt]);
|
||||
Poll::Ready(Ok(amt))
|
||||
}
|
||||
|
||||
#[cfg(feature = "tokio")]
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &mut tokio::io::ReadBuf<'_>,
|
||||
) -> Poll<tokio::io::Result<()>> {
|
||||
let pre_len = buf.filled().len();
|
||||
futures::ready!(Pin::new(&mut self.fd).poll_read(cx, buf))?;
|
||||
let post_len = buf.filled().len();
|
||||
if post_len - pre_len == 0 {
|
||||
return Poll::Ready(Ok(()));
|
||||
}
|
||||
self.checker.input(&buf.filled()[pre_len..]);
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncReader {
|
||||
|
|
@ -63,7 +79,7 @@ pub fn open(cache: &Path, sri: Integrity) -> Result<Reader> {
|
|||
pub async fn open_async(cache: &Path, sri: Integrity) -> Result<AsyncReader> {
|
||||
let cpath = path::content_path(cache, &sri);
|
||||
Ok(AsyncReader {
|
||||
fd: async_std::fs::File::open(cpath).await.to_internal()?,
|
||||
fd: crate::async_lib::File::open(cpath).await.to_internal()?,
|
||||
checker: IntegrityChecker::new(sri),
|
||||
})
|
||||
}
|
||||
|
|
@ -77,7 +93,7 @@ pub fn read(cache: &Path, sri: &Integrity) -> Result<Vec<u8>> {
|
|||
|
||||
pub async fn read_async<'a>(cache: &'a Path, sri: &'a Integrity) -> Result<Vec<u8>> {
|
||||
let cpath = path::content_path(cache, sri);
|
||||
let ret = async_std::fs::read(&cpath).await.to_internal()?;
|
||||
let ret = crate::async_lib::read(&cpath).await.to_internal()?;
|
||||
sri.check(&ret)?;
|
||||
Ok(ret)
|
||||
}
|
||||
|
|
@ -92,8 +108,8 @@ pub fn copy(cache: &Path, sri: &Integrity, to: &Path) -> Result<u64> {
|
|||
|
||||
pub async fn copy_async<'a>(cache: &'a Path, sri: &'a Integrity, to: &'a Path) -> Result<u64> {
|
||||
let cpath = path::content_path(cache, sri);
|
||||
let ret = async_std::fs::copy(&cpath, to).await.to_internal()?;
|
||||
let data = async_std::fs::read(cpath).await.to_internal()?;
|
||||
let ret = crate::async_lib::copy(&cpath, to).await.to_internal()?;
|
||||
let data = crate::async_lib::read(cpath).await.to_internal()?;
|
||||
sri.check(data)?;
|
||||
Ok(ret)
|
||||
}
|
||||
|
|
@ -107,7 +123,7 @@ pub fn has_content(cache: &Path, sri: &Integrity) -> Option<Integrity> {
|
|||
}
|
||||
|
||||
pub async fn has_content_async(cache: &Path, sri: &Integrity) -> Option<Integrity> {
|
||||
if async_std::fs::metadata(path::content_path(cache, sri))
|
||||
if crate::async_lib::metadata(path::content_path(cache, sri))
|
||||
.await
|
||||
.is_ok()
|
||||
{
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
use std::fs;
|
||||
use std::path::Path;
|
||||
|
||||
use async_std::fs as afs;
|
||||
use ssri::Integrity;
|
||||
|
||||
use crate::content::path;
|
||||
|
|
@ -13,7 +12,7 @@ pub fn rm(cache: &Path, sri: &Integrity) -> Result<()> {
|
|||
}
|
||||
|
||||
pub async fn rm_async(cache: &Path, sri: &Integrity) -> Result<()> {
|
||||
afs::remove_file(path::content_path(cache, sri))
|
||||
crate::async_lib::remove_file(path::content_path(cache, sri))
|
||||
.await
|
||||
.to_internal()?;
|
||||
Ok(())
|
||||
|
|
|
|||
|
|
@ -3,11 +3,9 @@ use std::io::prelude::*;
|
|||
use std::path::{Path, PathBuf};
|
||||
use std::pin::Pin;
|
||||
use std::sync::Mutex;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
use async_std::fs as afs;
|
||||
use async_std::future::Future;
|
||||
use async_std::task::{self, Context, JoinHandle, Poll};
|
||||
use futures::io::AsyncWrite;
|
||||
use crate::async_lib::{AsyncWrite, JoinHandle};
|
||||
use futures::prelude::*;
|
||||
use memmap2::MmapMut;
|
||||
use ssri::{Algorithm, Integrity, IntegrityOpts};
|
||||
|
|
@ -116,11 +114,12 @@ impl AsyncWriter {
|
|||
let cache_path = cache.to_path_buf();
|
||||
let mut tmp_path = cache_path.clone();
|
||||
tmp_path.push("tmp");
|
||||
afs::DirBuilder::new()
|
||||
crate::async_lib::DirBuilder::new()
|
||||
.recursive(true)
|
||||
.create(&tmp_path)
|
||||
.await
|
||||
.to_internal()?;
|
||||
let tmpfile = crate::async_lib::unwrap_joinhandle(crate::async_lib::spawn_blocking(|| {
|
||||
let mut tmpfile = task::spawn_blocking(|| NamedTempFile::new_in(tmp_path))
|
||||
.await
|
||||
.to_internal()?;
|
||||
|
|
@ -162,7 +161,7 @@ impl AsyncWriter {
|
|||
let cpath = path::content_path(&inner.cache, &sri);
|
||||
|
||||
// Start the operation asynchronously.
|
||||
*state = State::Busy(task::spawn_blocking(|| {
|
||||
*state = State::Busy(crate::async_lib::spawn_blocking(|| {
|
||||
let res = std::fs::DirBuilder::new()
|
||||
.recursive(true)
|
||||
// Safe unwrap. cpath always has multiple segments
|
||||
|
|
@ -204,7 +203,11 @@ impl AsyncWriter {
|
|||
}
|
||||
},
|
||||
// Poll the asynchronous operation the file is currently blocked on.
|
||||
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)),
|
||||
State::Busy(task) => {
|
||||
*state = crate::async_lib::unwrap_joinhandle_value(futures::ready!(
|
||||
Pin::new(task).poll(cx)
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
|
@ -255,7 +258,7 @@ impl AsyncWrite for AsyncWriter {
|
|||
inner.buf[..buf.len()].copy_from_slice(buf);
|
||||
|
||||
// Start the operation asynchronously.
|
||||
*state = State::Busy(task::spawn_blocking(|| {
|
||||
*state = State::Busy(crate::async_lib::spawn_blocking(|| {
|
||||
inner.builder.input(&inner.buf);
|
||||
if let Some(mmap) = &mut inner.mmap {
|
||||
mmap.copy_from_slice(&inner.buf);
|
||||
|
|
@ -270,7 +273,12 @@ impl AsyncWrite for AsyncWriter {
|
|||
}
|
||||
}
|
||||
// Poll the asynchronous operation the file is currently blocked on.
|
||||
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)),
|
||||
State::Busy(task) => {
|
||||
*state = crate::async_lib::unwrap_joinhandle_value(futures::ready!(Pin::new(
|
||||
task
|
||||
)
|
||||
.poll(cx)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -302,7 +310,7 @@ impl AsyncWrite for AsyncWriter {
|
|||
}
|
||||
|
||||
// Start the operation asynchronously.
|
||||
*state = State::Busy(task::spawn_blocking(|| {
|
||||
*state = State::Busy(crate::async_lib::spawn_blocking(|| {
|
||||
let res = inner.tmpfile.flush();
|
||||
inner.last_op = Some(Operation::Flush(res));
|
||||
State::Idle(Some(inner))
|
||||
|
|
@ -310,12 +318,33 @@ impl AsyncWrite for AsyncWriter {
|
|||
}
|
||||
}
|
||||
// Poll the asynchronous operation the file is currently blocked on.
|
||||
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)),
|
||||
State::Busy(task) => {
|
||||
*state = crate::async_lib::unwrap_joinhandle_value(futures::ready!(Pin::new(
|
||||
task
|
||||
)
|
||||
.poll(cx)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "async-std")]
|
||||
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
|
||||
self.poll_close_impl(cx)
|
||||
}
|
||||
|
||||
#[cfg(feature = "tokio")]
|
||||
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
|
||||
self.poll_close_impl(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncWriter {
|
||||
#[inline]
|
||||
fn poll_close_impl(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
) -> Poll<std::io::Result<()>> {
|
||||
let state = &mut *self.0.lock().unwrap();
|
||||
|
||||
loop {
|
||||
|
|
@ -329,13 +358,18 @@ impl AsyncWrite for AsyncWriter {
|
|||
};
|
||||
|
||||
// Start the operation asynchronously.
|
||||
*state = State::Busy(task::spawn_blocking(|| {
|
||||
*state = State::Busy(crate::async_lib::spawn_blocking(|| {
|
||||
drop(inner);
|
||||
State::Idle(None)
|
||||
}));
|
||||
}
|
||||
// Poll the asynchronous operation the file is currently blocked on.
|
||||
State::Busy(task) => *state = futures::ready!(Pin::new(task).poll(cx)),
|
||||
State::Busy(task) => {
|
||||
*state = crate::async_lib::unwrap_joinhandle_value(futures::ready!(Pin::new(
|
||||
task
|
||||
)
|
||||
.poll(cx)))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -348,7 +382,7 @@ fn io_error(err: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> std::io
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use async_std::task;
|
||||
use crate::async_lib::AsyncWriteExt;
|
||||
use tempfile;
|
||||
#[test]
|
||||
fn basic_write() {
|
||||
|
|
@ -368,7 +402,7 @@ mod tests {
|
|||
fn basic_async_write() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let dir = tmp.path().to_owned();
|
||||
task::block_on(async {
|
||||
crate::async_lib::block_on(async {
|
||||
let mut writer = AsyncWriter::new(&dir, Algorithm::Sha256, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
|
|
|||
37
src/get.rs
37
src/get.rs
|
|
@ -3,10 +3,9 @@ use std::path::Path;
|
|||
use std::pin::Pin;
|
||||
use std::task::{Context as TaskContext, Poll};
|
||||
|
||||
use futures::prelude::*;
|
||||
|
||||
use ssri::{Algorithm, Integrity};
|
||||
|
||||
use crate::async_lib::AsyncRead;
|
||||
use crate::content::read;
|
||||
use crate::errors::{Error, Result};
|
||||
use crate::index::{self, Metadata};
|
||||
|
|
@ -24,6 +23,7 @@ pub struct Reader {
|
|||
}
|
||||
|
||||
impl AsyncRead for Reader {
|
||||
#[cfg(feature = "async-std")]
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut TaskContext<'_>,
|
||||
|
|
@ -31,6 +31,15 @@ impl AsyncRead for Reader {
|
|||
) -> Poll<std::io::Result<usize>> {
|
||||
Pin::new(&mut self.reader).poll_read(cx, buf)
|
||||
}
|
||||
|
||||
#[cfg(feature = "tokio")]
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut TaskContext<'_>,
|
||||
buf: &mut tokio::io::ReadBuf<'_>,
|
||||
) -> Poll<tokio::io::Result<()>> {
|
||||
Pin::new(&mut self.reader).poll_read(cx, buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl Reader {
|
||||
|
|
@ -457,11 +466,15 @@ pub fn exists_sync<P: AsRef<Path>>(cache: P, sri: &Integrity) -> bool {
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use async_std::fs as afs;
|
||||
use async_std::prelude::*;
|
||||
use crate::async_lib::AsyncReadExt;
|
||||
use std::fs;
|
||||
|
||||
#[async_attributes::test]
|
||||
#[cfg(feature = "async-std")]
|
||||
use async_attributes::test as async_test;
|
||||
#[cfg(feature = "tokio")]
|
||||
use tokio::test as async_test;
|
||||
|
||||
#[async_test]
|
||||
async fn test_open() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let dir = tmp.path().to_owned();
|
||||
|
|
@ -474,7 +487,7 @@ mod tests {
|
|||
assert_eq!(str, String::from("hello world"));
|
||||
}
|
||||
|
||||
#[async_attributes::test]
|
||||
#[async_test]
|
||||
async fn test_open_hash() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let dir = tmp.path().to_owned();
|
||||
|
|
@ -515,7 +528,7 @@ mod tests {
|
|||
assert_eq!(str, String::from("hello world"));
|
||||
}
|
||||
|
||||
#[async_attributes::test]
|
||||
#[async_test]
|
||||
async fn test_read() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let dir = tmp.path().to_owned();
|
||||
|
|
@ -525,7 +538,7 @@ mod tests {
|
|||
assert_eq!(data, b"hello world");
|
||||
}
|
||||
|
||||
#[async_attributes::test]
|
||||
#[async_test]
|
||||
async fn test_read_hash() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let dir = tmp.path().to_owned();
|
||||
|
|
@ -555,7 +568,7 @@ mod tests {
|
|||
assert_eq!(data, b"hello world");
|
||||
}
|
||||
|
||||
#[async_attributes::test]
|
||||
#[async_test]
|
||||
async fn test_copy() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let dir = tmp.path();
|
||||
|
|
@ -563,11 +576,11 @@ mod tests {
|
|||
crate::write(&dir, "my-key", b"hello world").await.unwrap();
|
||||
|
||||
crate::copy(&dir, "my-key", &dest).await.unwrap();
|
||||
let data = afs::read(&dest).await.unwrap();
|
||||
let data = crate::async_lib::read(&dest).await.unwrap();
|
||||
assert_eq!(data, b"hello world");
|
||||
}
|
||||
|
||||
#[async_attributes::test]
|
||||
#[async_test]
|
||||
async fn test_copy_hash() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let dir = tmp.path();
|
||||
|
|
@ -575,7 +588,7 @@ mod tests {
|
|||
let sri = crate::write(&dir, "my-key", b"hello world").await.unwrap();
|
||||
|
||||
crate::copy_hash(&dir, &sri, &dest).await.unwrap();
|
||||
let data = afs::read(&dest).await.unwrap();
|
||||
let data = crate::async_lib::read(&dest).await.unwrap();
|
||||
assert_eq!(data, b"hello world");
|
||||
}
|
||||
|
||||
|
|
|
|||
23
src/index.rs
23
src/index.rs
|
|
@ -5,11 +5,8 @@ use std::io::{ErrorKind, Write};
|
|||
use std::path::{Path, PathBuf};
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
use async_std::fs as afs;
|
||||
use async_std::io::BufReader;
|
||||
use digest::Digest;
|
||||
use either::{Left, Right};
|
||||
use futures::io::{AsyncBufReadExt, AsyncWriteExt};
|
||||
use futures::stream::StreamExt;
|
||||
use serde_derive::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
|
|
@ -18,6 +15,7 @@ use sha2::Sha256;
|
|||
use ssri::Integrity;
|
||||
use walkdir::WalkDir;
|
||||
|
||||
use crate::async_lib::{AsyncBufReadExt, AsyncWriteExt};
|
||||
use crate::errors::{Internal, InternalResult, Result};
|
||||
use crate::put::WriteOpts;
|
||||
|
||||
|
|
@ -97,7 +95,7 @@ pub fn insert(cache: &Path, key: &str, opts: WriteOpts) -> Result<Integrity> {
|
|||
|
||||
pub async fn insert_async<'a>(cache: &'a Path, key: &'a str, opts: WriteOpts) -> Result<Integrity> {
|
||||
let bucket = bucket_path(cache, key);
|
||||
afs::create_dir_all(bucket.parent().unwrap())
|
||||
crate::async_lib::create_dir_all(bucket.parent().unwrap())
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
|
|
@ -114,7 +112,7 @@ pub async fn insert_async<'a>(cache: &'a Path, key: &'a str, opts: WriteOpts) ->
|
|||
})
|
||||
.with_context(|| format!("Failed to serialize entry with key `{}`", key))?;
|
||||
|
||||
let mut buck = async_std::fs::OpenOptions::new()
|
||||
let mut buck = crate::async_lib::OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(&bucket)
|
||||
|
|
@ -311,7 +309,7 @@ fn bucket_entries(bucket: &Path) -> InternalResult<Vec<SerializableMetadata>> {
|
|||
}
|
||||
|
||||
async fn bucket_entries_async(bucket: &Path) -> InternalResult<Vec<SerializableMetadata>> {
|
||||
let file_result = afs::File::open(bucket).await;
|
||||
let file_result = crate::async_lib::File::open(bucket).await;
|
||||
let file = if let Err(err) = file_result {
|
||||
if err.kind() == ErrorKind::NotFound {
|
||||
return Ok(Vec::new());
|
||||
|
|
@ -321,7 +319,8 @@ async fn bucket_entries_async(bucket: &Path) -> InternalResult<Vec<SerializableM
|
|||
file_result.unwrap()
|
||||
};
|
||||
let mut vec = Vec::new();
|
||||
let mut lines = BufReader::new(file).lines();
|
||||
let mut lines =
|
||||
crate::async_lib::lines_to_stream(crate::async_lib::BufReader::new(file).lines());
|
||||
while let Some(line) = lines.next().await {
|
||||
if let Ok(entry) = line {
|
||||
let entry_str = match entry.split('\t').collect::<Vec<&str>>()[..] {
|
||||
|
|
@ -340,7 +339,6 @@ async fn bucket_entries_async(bucket: &Path) -> InternalResult<Vec<SerializableM
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use async_std::task;
|
||||
use serde_json::json;
|
||||
|
||||
const MOCK_ENTRY: &str = "\n251d18a2b33264ea8655695fd23c88bd874cdea2c3dc9d8f9b7596717ad30fec\t{\"key\":\"hello\",\"integrity\":\"sha1-deadbeef\",\"time\":1234567,\"size\":0,\"metadata\":null}";
|
||||
|
|
@ -364,7 +362,7 @@ mod tests {
|
|||
let sri: Integrity = "sha1-deadbeef".parse().unwrap();
|
||||
let time = 1_234_567;
|
||||
let opts = WriteOpts::new().integrity(sri).time(time);
|
||||
task::block_on(async {
|
||||
crate::async_lib::block_on(async {
|
||||
insert_async(&dir, "hello", opts).await.unwrap();
|
||||
});
|
||||
let entry = std::fs::read_to_string(bucket_path(&dir, "hello")).unwrap();
|
||||
|
|
@ -420,7 +418,7 @@ mod tests {
|
|||
let time = 1_234_567;
|
||||
let opts = WriteOpts::new().integrity(sri).time(time);
|
||||
insert(&dir, "hello", opts).unwrap();
|
||||
task::block_on(async {
|
||||
crate::async_lib::block_on(async {
|
||||
delete_async(&dir, "hello").await.unwrap();
|
||||
});
|
||||
assert_eq!(find(&dir, "hello").unwrap(), None);
|
||||
|
|
@ -454,10 +452,11 @@ mod tests {
|
|||
let sri: Integrity = "sha1-deadbeef".parse().unwrap();
|
||||
let time = 1_234_567;
|
||||
let opts = WriteOpts::new().integrity(sri.clone()).time(time);
|
||||
task::block_on(async {
|
||||
crate::async_lib::block_on(async {
|
||||
insert_async(&dir, "hello", opts).await.unwrap();
|
||||
});
|
||||
let entry = task::block_on(async { find_async(&dir, "hello").await.unwrap().unwrap() });
|
||||
let entry =
|
||||
crate::async_lib::block_on(async { find_async(&dir, "hello").await.unwrap().unwrap() });
|
||||
assert_eq!(
|
||||
entry,
|
||||
Metadata {
|
||||
|
|
|
|||
|
|
@ -123,9 +123,17 @@
|
|||
//! ```
|
||||
#![warn(missing_docs, missing_doc_code_examples)]
|
||||
|
||||
#[cfg(not(any(feature = "async-std", feature = "tokio-runtime")))]
|
||||
compile_error!("Either feature \"async-std\" or \"tokio-runtime\" must be enabled for this crate.");
|
||||
|
||||
#[cfg(all(feature = "async-std", feature = "tokio-runtime"))]
|
||||
compile_error!("Only either feature \"async-std\" or \"tokio-runtime\" must be enabled for this crate, not both.");
|
||||
|
||||
pub use serde_json::Value;
|
||||
pub use ssri::Algorithm;
|
||||
|
||||
mod async_lib;
|
||||
|
||||
mod content;
|
||||
mod errors;
|
||||
mod index;
|
||||
|
|
|
|||
19
src/put.rs
19
src/put.rs
|
|
@ -3,11 +3,10 @@ use std::io::prelude::*;
|
|||
use std::path::{Path, PathBuf};
|
||||
use std::pin::Pin;
|
||||
|
||||
use futures::prelude::*;
|
||||
|
||||
use serde_json::Value;
|
||||
use ssri::{Algorithm, Integrity};
|
||||
|
||||
use crate::async_lib::{AsyncWrite, AsyncWriteExt};
|
||||
use crate::content::write;
|
||||
use crate::errors::{Error, Internal, Result};
|
||||
use crate::index;
|
||||
|
|
@ -102,9 +101,18 @@ impl AsyncWrite for Writer {
|
|||
Pin::new(&mut self.writer).poll_flush(cx)
|
||||
}
|
||||
|
||||
#[cfg(feature = "async-std")]
|
||||
fn poll_close(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<std::io::Result<()>> {
|
||||
Pin::new(&mut self.writer).poll_close(cx)
|
||||
}
|
||||
|
||||
#[cfg(feature = "tokio")]
|
||||
fn poll_shutdown(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut TaskContext<'_>,
|
||||
) -> Poll<std::io::Result<()>> {
|
||||
Pin::new(&mut self.writer).poll_shutdown(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl Writer {
|
||||
|
|
@ -423,7 +431,12 @@ impl SyncWriter {
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
#[async_attributes::test]
|
||||
#[cfg(feature = "async-std")]
|
||||
use async_attributes::test as async_test;
|
||||
#[cfg(feature = "tokio")]
|
||||
use tokio::test as async_test;
|
||||
|
||||
#[async_test]
|
||||
async fn round_trip() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let dir = tmp.path().to_owned();
|
||||
|
|
|
|||
14
src/rm.rs
14
src/rm.rs
|
|
@ -2,8 +2,6 @@
|
|||
use std::fs;
|
||||
use std::path::Path;
|
||||
|
||||
use async_std::fs as afs;
|
||||
|
||||
use ssri::Integrity;
|
||||
|
||||
use crate::content::rm;
|
||||
|
|
@ -93,7 +91,9 @@ pub async fn remove_hash<P: AsRef<Path>>(cache: P, sri: &Integrity) -> Result<()
|
|||
/// ```
|
||||
pub async fn clear<P: AsRef<Path>>(cache: P) -> Result<()> {
|
||||
for entry in (cache.as_ref().read_dir().to_internal()?).flatten() {
|
||||
afs::remove_dir_all(entry.path()).await.to_internal()?;
|
||||
crate::async_lib::remove_dir_all(entry.path())
|
||||
.await
|
||||
.to_internal()?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -182,11 +182,9 @@ pub fn clear_sync<P: AsRef<Path>>(cache: P) -> Result<()> {
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use async_std::task;
|
||||
|
||||
#[test]
|
||||
fn test_remove() {
|
||||
task::block_on(async {
|
||||
crate::async_lib::block_on(async {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let dir = tmp.path().to_owned();
|
||||
let sri = crate::write(&dir, "key", b"my-data").await.unwrap();
|
||||
|
|
@ -203,7 +201,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_remove_data() {
|
||||
task::block_on(async {
|
||||
crate::async_lib::block_on(async {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let dir = tmp.path().to_owned();
|
||||
let sri = crate::write(&dir, "key", b"my-data").await.unwrap();
|
||||
|
|
@ -220,7 +218,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_clear() {
|
||||
task::block_on(async {
|
||||
crate::async_lib::block_on(async {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let dir = tmp.path().to_owned();
|
||||
let sri = crate::write(&dir, "key", b"my-data").await.unwrap();
|
||||
|
|
|
|||
Loading…
Reference in New Issue