mirror of https://github.com/zkat/cacache-rs.git
feat(errors): integrate miette and generally improve error reporting (#38)
BREAKING CHANGE: This bumps the MSRV to 1.67.0 and documents it in the README.
This commit is contained in:
parent
da259ae432
commit
c2d5390a84
|
|
@ -28,7 +28,7 @@ jobs:
|
|||
runs-on: ${{ matrix.os }}
|
||||
strategy:
|
||||
matrix:
|
||||
rust: [1.59.0, stable]
|
||||
rust: [1.67.0, stable]
|
||||
os: [ubuntu-latest, macOS-latest, windows-latest]
|
||||
|
||||
steps:
|
||||
|
|
@ -52,7 +52,7 @@ jobs:
|
|||
runs-on: ${{ matrix.os }}
|
||||
strategy:
|
||||
matrix:
|
||||
rust: [1.59.0, stable]
|
||||
rust: [1.67.0, stable]
|
||||
os: [ubuntu-latest, macOS-latest, windows-latest]
|
||||
|
||||
steps:
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ either = "1.6.1"
|
|||
futures = "0.3.17"
|
||||
hex = "0.4.3"
|
||||
memmap2 = "0.5.8"
|
||||
miette = "5.5.0"
|
||||
serde = "1.0.130"
|
||||
serde_derive = "1.0.130"
|
||||
serde_json = "1.0.68"
|
||||
|
|
|
|||
|
|
@ -53,6 +53,7 @@ Minimum supported Rust version is `1.43.0`.
|
|||
- Pretty darn fast
|
||||
- Arbitrary metadata storage
|
||||
- Cross-platform: Windows and case-(in)sensitive filesystem support
|
||||
- [`miette`](https://crates.io/crates/miette) integration for detailed, helpful error reporting.
|
||||
- Punches nazis
|
||||
|
||||
`async-std` is the default async runtime. To use `tokio` instead, turn off default features and enable the `tokio-runtime` feature, like this:
|
||||
|
|
@ -70,6 +71,11 @@ All participants and maintainers in this project are expected to follow [Code of
|
|||
|
||||
Happy hacking!
|
||||
|
||||
## MSRV
|
||||
|
||||
The Minimum Supported Rust Version for cacache is `1.67.0`. Any changes to the
|
||||
MSRV will be considered breaking changes.
|
||||
|
||||
## License
|
||||
|
||||
This project is licensed under [the Apache-2.0 License](LICENSE.md).
|
||||
|
|
|
|||
|
|
@ -104,22 +104,25 @@ pub fn unwrap_joinhandle_value<T>(value: Result<T, tokio::task::JoinError>) -> T
|
|||
value.unwrap()
|
||||
}
|
||||
|
||||
use crate::errors::{Internal, InternalResult};
|
||||
use tempfile::NamedTempFile;
|
||||
|
||||
use crate::errors::IoErrorExt;
|
||||
|
||||
#[cfg(feature = "async-std")]
|
||||
#[inline]
|
||||
pub async fn create_named_tempfile(tmp_path: std::path::PathBuf) -> InternalResult<NamedTempFile> {
|
||||
pub async fn create_named_tempfile(tmp_path: std::path::PathBuf) -> crate::Result<NamedTempFile> {
|
||||
let cloned = tmp_path.clone();
|
||||
spawn_blocking(|| NamedTempFile::new_in(tmp_path))
|
||||
.await
|
||||
.to_internal()
|
||||
.with_context(|| format!("Failed to create a temp file at {}", cloned.display()))
|
||||
}
|
||||
|
||||
#[cfg(feature = "tokio")]
|
||||
#[inline]
|
||||
pub async fn create_named_tempfile(tmp_path: std::path::PathBuf) -> InternalResult<NamedTempFile> {
|
||||
let tmpfile = spawn_blocking(|| NamedTempFile::new_in(tmp_path))
|
||||
pub async fn create_named_tempfile(tmp_path: std::path::PathBuf) -> crate::Result<NamedTempFile> {
|
||||
let cloned = tmp_path.clone();
|
||||
Ok(spawn_blocking(|| NamedTempFile::new_in(tmp_path))
|
||||
.await
|
||||
.to_internal()?;
|
||||
tmpfile.to_internal()
|
||||
.unwrap()
|
||||
.with_context(|| format!("Failed to create a temp file at {}", cloned.display()))?)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,13 +1,19 @@
|
|||
use std::fs::{self, File};
|
||||
use std::io::Read;
|
||||
use std::path::Path;
|
||||
use std::pin::Pin;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
#[cfg(feature = "async-std")]
|
||||
use futures::io::AsyncReadExt;
|
||||
#[cfg(feature = "tokio")]
|
||||
use tokio::io::AsyncReadExt;
|
||||
|
||||
use ssri::{Algorithm, Integrity, IntegrityChecker};
|
||||
|
||||
use crate::async_lib::AsyncRead;
|
||||
use crate::content::path;
|
||||
use crate::errors::{Internal, Result};
|
||||
use crate::errors::{IoErrorExt, Result};
|
||||
|
||||
pub struct Reader {
|
||||
fd: File,
|
||||
|
|
@ -71,7 +77,12 @@ impl AsyncReader {
|
|||
pub fn open(cache: &Path, sri: Integrity) -> Result<Reader> {
|
||||
let cpath = path::content_path(cache, &sri);
|
||||
Ok(Reader {
|
||||
fd: File::open(cpath).to_internal()?,
|
||||
fd: File::open(cpath).with_context(|| {
|
||||
format!(
|
||||
"Failed to open reader to {}",
|
||||
path::content_path(cache, &sri).display()
|
||||
)
|
||||
})?,
|
||||
checker: IntegrityChecker::new(sri),
|
||||
})
|
||||
}
|
||||
|
|
@ -79,38 +90,85 @@ pub fn open(cache: &Path, sri: Integrity) -> Result<Reader> {
|
|||
pub async fn open_async(cache: &Path, sri: Integrity) -> Result<AsyncReader> {
|
||||
let cpath = path::content_path(cache, &sri);
|
||||
Ok(AsyncReader {
|
||||
fd: crate::async_lib::File::open(cpath).await.to_internal()?,
|
||||
fd: crate::async_lib::File::open(cpath).await.with_context(|| {
|
||||
format!(
|
||||
"Failed to open reader to {}",
|
||||
path::content_path(cache, &sri).display()
|
||||
)
|
||||
})?,
|
||||
checker: IntegrityChecker::new(sri),
|
||||
})
|
||||
}
|
||||
|
||||
pub fn read(cache: &Path, sri: &Integrity) -> Result<Vec<u8>> {
|
||||
let cpath = path::content_path(cache, sri);
|
||||
let ret = fs::read(cpath).to_internal()?;
|
||||
let ret = fs::read(cpath).with_context(|| {
|
||||
format!(
|
||||
"Failed to read contents for file at {}",
|
||||
path::content_path(cache, sri).display()
|
||||
)
|
||||
})?;
|
||||
sri.check(&ret)?;
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub async fn read_async<'a>(cache: &'a Path, sri: &'a Integrity) -> Result<Vec<u8>> {
|
||||
let cpath = path::content_path(cache, sri);
|
||||
let ret = crate::async_lib::read(&cpath).await.to_internal()?;
|
||||
let ret = crate::async_lib::read(&cpath).await.with_context(|| {
|
||||
format!(
|
||||
"Failed to read contents for file at {}",
|
||||
path::content_path(cache, sri).display()
|
||||
)
|
||||
})?;
|
||||
sri.check(&ret)?;
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub fn copy(cache: &Path, sri: &Integrity, to: &Path) -> Result<u64> {
|
||||
let cpath = path::content_path(cache, sri);
|
||||
let ret = fs::copy(&cpath, to).to_internal()?;
|
||||
let data = fs::read(cpath).to_internal()?;
|
||||
sri.check(data)?;
|
||||
let ret = fs::copy(&cpath, to).with_context(|| {
|
||||
format!(
|
||||
"Failed to copy cache contents from {} to {}",
|
||||
path::content_path(cache, sri).display(),
|
||||
to.display()
|
||||
)
|
||||
})?;
|
||||
let mut reader = open(cache, sri.clone())?;
|
||||
let mut buf: [u8; 1024] = [0; 1024];
|
||||
while reader.read(&mut buf).with_context(|| {
|
||||
format!(
|
||||
"Failed to read cache contents while verifying integrity for {}",
|
||||
path::content_path(cache, sri).display()
|
||||
)
|
||||
})? > 0
|
||||
{}
|
||||
reader.check()?;
|
||||
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub async fn copy_async<'a>(cache: &'a Path, sri: &'a Integrity, to: &'a Path) -> Result<u64> {
|
||||
let cpath = path::content_path(cache, sri);
|
||||
let ret = crate::async_lib::copy(&cpath, to).await.to_internal()?;
|
||||
let data = crate::async_lib::read(cpath).await.to_internal()?;
|
||||
sri.check(data)?;
|
||||
let ret = crate::async_lib::copy(&cpath, to).await.with_context(|| {
|
||||
format!(
|
||||
"Failed to copy cache contents from {} to {}",
|
||||
path::content_path(cache, sri).display(),
|
||||
to.display()
|
||||
)
|
||||
})?;
|
||||
let mut reader = open_async(cache, sri.clone()).await?;
|
||||
let mut buf: [u8; 1024] = [0; 1024];
|
||||
while AsyncReadExt::read(&mut reader, &mut buf)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to read cache contents while verifying integrity for {}",
|
||||
path::content_path(cache, sri).display()
|
||||
)
|
||||
})?
|
||||
> 0
|
||||
{}
|
||||
reader.check()?;
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -4,16 +4,26 @@ use std::path::Path;
|
|||
use ssri::Integrity;
|
||||
|
||||
use crate::content::path;
|
||||
use crate::errors::{Internal, Result};
|
||||
use crate::errors::{IoErrorExt, Result};
|
||||
|
||||
pub fn rm(cache: &Path, sri: &Integrity) -> Result<()> {
|
||||
fs::remove_file(path::content_path(cache, sri)).to_internal()?;
|
||||
fs::remove_file(path::content_path(cache, sri)).with_context(|| {
|
||||
format!(
|
||||
"Failed to remove cache file {}",
|
||||
path::content_path(cache, sri).display()
|
||||
)
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn rm_async(cache: &Path, sri: &Integrity) -> Result<()> {
|
||||
crate::async_lib::remove_file(path::content_path(cache, sri))
|
||||
.await
|
||||
.to_internal()?;
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to remove cache file {}",
|
||||
path::content_path(cache, sri).display()
|
||||
)
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ use tempfile::NamedTempFile;
|
|||
|
||||
use crate::async_lib::{AsyncWrite, JoinHandle};
|
||||
use crate::content::path;
|
||||
use crate::errors::{Internal, Result};
|
||||
use crate::errors::{IoErrorExt, Result};
|
||||
|
||||
pub const MAX_MMAP_SIZE: usize = 1024 * 1024;
|
||||
|
||||
|
|
@ -31,11 +31,30 @@ impl Writer {
|
|||
DirBuilder::new()
|
||||
.recursive(true)
|
||||
.create(&tmp_path)
|
||||
.to_internal()?;
|
||||
let mut tmpfile = NamedTempFile::new_in(tmp_path).to_internal()?;
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to create cache directory for temporary files, at {}",
|
||||
tmp_path.display()
|
||||
)
|
||||
})?;
|
||||
let tmp_path_clone = tmp_path.clone();
|
||||
let mut tmpfile = NamedTempFile::new_in(tmp_path).with_context(|| {
|
||||
format!(
|
||||
"Failed to create temp file while initializing a writer, inside {}",
|
||||
tmp_path_clone.display()
|
||||
)
|
||||
})?;
|
||||
let mmap = if let Some(size) = size {
|
||||
if size <= MAX_MMAP_SIZE {
|
||||
tmpfile.as_file_mut().set_len(size as u64).to_internal()?;
|
||||
tmpfile
|
||||
.as_file_mut()
|
||||
.set_len(size as u64)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to configure file length for temp file at {}",
|
||||
tmpfile.path().display()
|
||||
)
|
||||
})?;
|
||||
unsafe { MmapMut::map_mut(tmpfile.as_file()).ok() }
|
||||
} else {
|
||||
None
|
||||
|
|
@ -58,13 +77,31 @@ impl Writer {
|
|||
.recursive(true)
|
||||
// Safe unwrap. cpath always has multiple segments
|
||||
.create(cpath.parent().unwrap())
|
||||
.to_internal()?;
|
||||
let res = self.tmpfile.persist(&cpath).to_internal();
|
||||
if res.is_err() {
|
||||
// We might run into conflicts sometimes when persisting files.
|
||||
// This is ok. We can deal. Let's just make sure the destination
|
||||
// file actually exists, and we can move on.
|
||||
std::fs::metadata(cpath).to_internal()?;
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to create destination directory for cache contents, at {}",
|
||||
path::content_path(&self.cache, &sri)
|
||||
.parent()
|
||||
.unwrap()
|
||||
.display()
|
||||
)
|
||||
})?;
|
||||
let res = self.tmpfile.persist(&cpath);
|
||||
match res {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
// We might run into conflicts sometimes when persisting files.
|
||||
// This is ok. We can deal. Let's just make sure the destination
|
||||
// file actually exists, and we can move on.
|
||||
if !cpath.exists() {
|
||||
return Err(e.error).with_context(|| {
|
||||
format!(
|
||||
"Failed to persist cache contents while closing writer, at {}",
|
||||
path::content_path(&self.cache, &sri).display()
|
||||
)
|
||||
})?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(sri)
|
||||
}
|
||||
|
|
@ -118,11 +155,24 @@ impl AsyncWriter {
|
|||
.recursive(true)
|
||||
.create(&tmp_path)
|
||||
.await
|
||||
.to_internal()?;
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to create cache directory for temporary files, at {}",
|
||||
tmp_path.display()
|
||||
)
|
||||
})?;
|
||||
let mut tmpfile = crate::async_lib::create_named_tempfile(tmp_path).await?;
|
||||
let mmap = if let Some(size) = size {
|
||||
if size <= MAX_MMAP_SIZE {
|
||||
tmpfile.as_file_mut().set_len(size as u64).to_internal()?;
|
||||
tmpfile
|
||||
.as_file_mut()
|
||||
.set_len(size as u64)
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to configure file length for temp file at {}",
|
||||
tmpfile.path().display()
|
||||
)
|
||||
})?;
|
||||
unsafe { MmapMut::map_mut(tmpfile.as_file()).ok() }
|
||||
} else {
|
||||
None
|
||||
|
|
@ -144,7 +194,7 @@ impl AsyncWriter {
|
|||
// NOTE: How do I even get access to `inner` safely???
|
||||
// let inner = ???;
|
||||
// Blocking, but should be a very fast op.
|
||||
Ok(futures::future::poll_fn(|cx| {
|
||||
futures::future::poll_fn(|cx| {
|
||||
let state = &mut *self.0.lock().unwrap();
|
||||
|
||||
loop {
|
||||
|
|
@ -172,9 +222,12 @@ impl AsyncWriter {
|
|||
if res.is_err() {
|
||||
let _ = s.send(res.map(|_| sri));
|
||||
} else {
|
||||
let res = tmpfile.persist(&cpath).with_context(|| {
|
||||
String::from("persisting tempfile failed")
|
||||
});
|
||||
let res = tmpfile
|
||||
.persist(&cpath)
|
||||
.map_err(|e| e.error)
|
||||
.with_context(|| {
|
||||
format!("persisting file {} failed", cpath.display())
|
||||
});
|
||||
if res.is_err() {
|
||||
// We might run into conflicts
|
||||
// sometimes when persisting files.
|
||||
|
|
@ -208,11 +261,12 @@ impl AsyncWriter {
|
|||
}
|
||||
}
|
||||
})
|
||||
.map(|opt| opt.ok_or_else(|| io_error("file closed")))
|
||||
.map(|opt| opt.ok_or_else(|| crate::errors::io_error("file closed")))
|
||||
.await
|
||||
.to_internal()?
|
||||
.with_context(|| "Error while closing cache contents".to_string())?
|
||||
.await
|
||||
.to_internal()??)
|
||||
.map_err(|_| crate::errors::io_error("Operation cancelled"))
|
||||
.with_context(|| "Error while closing cache contents".to_string())?
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -229,7 +283,9 @@ impl AsyncWrite for AsyncWriter {
|
|||
State::Idle(opt) => {
|
||||
// Grab a reference to the inner representation of the file or return an error
|
||||
// if the file is closed.
|
||||
let inner = opt.as_mut().ok_or_else(|| io_error("file closed"))?;
|
||||
let inner = opt
|
||||
.as_mut()
|
||||
.ok_or_else(|| crate::errors::io_error("file closed"))?;
|
||||
|
||||
// Check if the operation has completed.
|
||||
if let Some(Operation::Write(res)) = inner.last_op.take() {
|
||||
|
|
@ -372,10 +428,6 @@ impl AsyncWriter {
|
|||
}
|
||||
}
|
||||
|
||||
fn io_error(err: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> std::io::Error {
|
||||
std::io::Error::new(std::io::ErrorKind::Other, err)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
|
|
|||
|
|
@ -1,65 +1,63 @@
|
|||
use std::path::PathBuf;
|
||||
|
||||
use miette::Diagnostic;
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
#[error("{source}\n\n {}", context.join("\n "))]
|
||||
pub struct InternalError {
|
||||
source: Box<dyn std::error::Error + Send + Sync>,
|
||||
context: Vec<String>,
|
||||
}
|
||||
|
||||
pub trait Internal<T> {
|
||||
fn to_internal(self) -> InternalResult<T>;
|
||||
fn with_context<F: FnOnce() -> String>(self, f: F) -> InternalResult<T>;
|
||||
}
|
||||
|
||||
impl<T, E: 'static + std::error::Error + Send + Sync> Internal<T> for std::result::Result<T, E> {
|
||||
fn to_internal(self) -> InternalResult<T> {
|
||||
self.map_err(|e| InternalError {
|
||||
source: Box::new(e),
|
||||
context: Vec::new(),
|
||||
})
|
||||
}
|
||||
|
||||
fn with_context<F: FnOnce() -> String>(self, f: F) -> InternalResult<T> {
|
||||
self.map_err(|e| InternalError {
|
||||
source: Box::new(e),
|
||||
context: vec![f()],
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Error type returned by all API calls.
|
||||
#[derive(Error, Debug)]
|
||||
#[derive(Error, Debug, Diagnostic)]
|
||||
pub enum Error {
|
||||
/// Returned when an index entry could not be found during
|
||||
/// lookup.
|
||||
#[error("Entry not found for key {1:?} in cache {0:?}")]
|
||||
#[diagnostic(code(cacache::entry_not_found), url(docsrs))]
|
||||
EntryNotFound(PathBuf, String),
|
||||
|
||||
/// Returned when a size check has failed.
|
||||
#[error("Size check failed.\n\tWanted: {0}\n\tActual: {1}")]
|
||||
SizeError(usize, usize),
|
||||
#[diagnostic(code(cacache::size_mismatch), url(docsrs))]
|
||||
SizeMismatch(usize, usize),
|
||||
|
||||
/// Returned when a general IO error has occurred.
|
||||
#[error("{1}")]
|
||||
#[diagnostic(code(cacache::io_error), url(docsrs))]
|
||||
IoError(#[source] std::io::Error, String),
|
||||
|
||||
/// Returned when a general serde error has occurred.
|
||||
#[error("{1}")]
|
||||
#[diagnostic(code(cacache::serde_error), url(docsrs))]
|
||||
SerdeError(#[source] serde_json::Error, String),
|
||||
|
||||
/// Returned when an integrity check has failed.
|
||||
#[error(transparent)]
|
||||
IntegrityError {
|
||||
#[from]
|
||||
/// The underlying error
|
||||
source: ssri::Error,
|
||||
},
|
||||
|
||||
/// Returned if an internal (e.g. io) operation has failed.
|
||||
#[error(transparent)]
|
||||
InternalError {
|
||||
#[from]
|
||||
/// The underlying error
|
||||
source: InternalError,
|
||||
},
|
||||
#[diagnostic(code(cacache::integrity_error), url(docsrs))]
|
||||
IntegrityError(#[from] ssri::Error),
|
||||
}
|
||||
|
||||
/// The result type returned by calls to this library
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
|
||||
pub type InternalResult<T> = std::result::Result<T, InternalError>;
|
||||
pub trait IoErrorExt<T> {
|
||||
fn with_context<F: FnOnce() -> String>(self, f: F) -> Result<T>;
|
||||
}
|
||||
|
||||
impl<T> IoErrorExt<T> for std::result::Result<T, std::io::Error> {
|
||||
fn with_context<F: FnOnce() -> String>(self, f: F) -> Result<T> {
|
||||
match self {
|
||||
Ok(t) => Ok(t),
|
||||
Err(e) => Err(Error::IoError(e, f())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> IoErrorExt<T> for std::result::Result<T, serde_json::Error> {
|
||||
fn with_context<F: FnOnce() -> String>(self, f: F) -> Result<T> {
|
||||
match self {
|
||||
Ok(t) => Ok(t),
|
||||
Err(e) => Err(Error::SerdeError(e, f())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn io_error(err: impl Into<Box<dyn std::error::Error + Send + Sync>>) -> std::io::Error {
|
||||
std::io::Error::new(std::io::ErrorKind::Other, err)
|
||||
}
|
||||
|
|
|
|||
12
src/get.rs
12
src/get.rs
|
|
@ -97,7 +97,7 @@ impl Reader {
|
|||
if let Some(entry) = index::find_async(cache, key).await? {
|
||||
Reader::open_hash(cache, entry.integrity).await
|
||||
} else {
|
||||
return Err(Error::EntryNotFound(cache.to_path_buf(), key.into()));
|
||||
Err(Error::EntryNotFound(cache.to_path_buf(), key.into()))
|
||||
}
|
||||
}
|
||||
inner(cache.as_ref(), key.as_ref()).await
|
||||
|
|
@ -154,7 +154,7 @@ where
|
|||
if let Some(entry) = index::find_async(cache, key).await? {
|
||||
read_hash(cache, &entry.integrity).await
|
||||
} else {
|
||||
return Err(Error::EntryNotFound(cache.to_path_buf(), key.into()));
|
||||
Err(Error::EntryNotFound(cache.to_path_buf(), key.into()))
|
||||
}
|
||||
}
|
||||
inner(cache.as_ref(), key.as_ref()).await
|
||||
|
|
@ -206,7 +206,7 @@ where
|
|||
if let Some(entry) = index::find_async(cache, key).await? {
|
||||
copy_hash(cache, &entry.integrity, to).await
|
||||
} else {
|
||||
return Err(Error::EntryNotFound(cache.to_path_buf(), key.into()));
|
||||
Err(Error::EntryNotFound(cache.to_path_buf(), key.into()))
|
||||
}
|
||||
}
|
||||
inner(cache.as_ref(), key.as_ref(), to.as_ref()).await
|
||||
|
|
@ -319,7 +319,7 @@ impl SyncReader {
|
|||
if let Some(entry) = index::find(cache, key)? {
|
||||
SyncReader::open_hash(cache, entry.integrity)
|
||||
} else {
|
||||
return Err(Error::EntryNotFound(cache.to_path_buf(), key.into()));
|
||||
Err(Error::EntryNotFound(cache.to_path_buf(), key.into()))
|
||||
}
|
||||
}
|
||||
inner(cache.as_ref(), key.as_ref())
|
||||
|
|
@ -372,7 +372,7 @@ where
|
|||
if let Some(entry) = index::find(cache, key)? {
|
||||
read_hash_sync(cache, &entry.integrity)
|
||||
} else {
|
||||
return Err(Error::EntryNotFound(cache.to_path_buf(), key.into()));
|
||||
Err(Error::EntryNotFound(cache.to_path_buf(), key.into()))
|
||||
}
|
||||
}
|
||||
inner(cache.as_ref(), key.as_ref())
|
||||
|
|
@ -420,7 +420,7 @@ where
|
|||
if let Some(entry) = index::find(cache, key)? {
|
||||
copy_hash_sync(cache, &entry.integrity, to)
|
||||
} else {
|
||||
return Err(Error::EntryNotFound(cache.to_path_buf(), key.into()));
|
||||
Err(Error::EntryNotFound(cache.to_path_buf(), key.into()))
|
||||
}
|
||||
}
|
||||
inner(cache.as_ref(), key.as_ref(), to.as_ref())
|
||||
|
|
|
|||
34
src/index.rs
34
src/index.rs
|
|
@ -16,7 +16,7 @@ use ssri::Integrity;
|
|||
use walkdir::WalkDir;
|
||||
|
||||
use crate::async_lib::{AsyncBufReadExt, AsyncWriteExt};
|
||||
use crate::errors::{Internal, InternalResult, Result};
|
||||
use crate::errors::{IoErrorExt, Result};
|
||||
use crate::put::WriteOpts;
|
||||
|
||||
const INDEX_VERSION: &str = "5";
|
||||
|
|
@ -220,16 +220,32 @@ pub async fn delete_async(cache: &Path, key: &str) -> Result<()> {
|
|||
}
|
||||
|
||||
pub fn ls(cache: &Path) -> impl Iterator<Item = Result<Metadata>> {
|
||||
WalkDir::new(cache.join(format!("index-v{INDEX_VERSION}")))
|
||||
let cache_path = cache.join(format!("index-v{INDEX_VERSION}"));
|
||||
let cloned = cache_path.clone();
|
||||
WalkDir::new(&cache_path)
|
||||
.into_iter()
|
||||
.map(|bucket| {
|
||||
let bucket = bucket.to_internal()?;
|
||||
.map(move |bucket| {
|
||||
let bucket = bucket
|
||||
.map_err(|e| match e.io_error() {
|
||||
Some(io_err) => std::io::Error::new(io_err.kind(), io_err.kind().to_string()),
|
||||
None => crate::errors::io_error("Unexpected error"),
|
||||
})
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Error while walking cache index directory at {}",
|
||||
cloned.display()
|
||||
)
|
||||
})?;
|
||||
|
||||
if bucket.file_type().is_dir() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
Ok(bucket_entries(bucket.path())?
|
||||
let owned_path = bucket.path().to_owned();
|
||||
Ok(bucket_entries(bucket.path())
|
||||
.with_context(|| {
|
||||
format!("Error getting bucket entries from {}", owned_path.display())
|
||||
})?
|
||||
.into_iter()
|
||||
.collect::<HashSet<SerializableMetadata>>()
|
||||
.into_iter()
|
||||
|
|
@ -282,7 +298,7 @@ fn now() -> u128 {
|
|||
.as_millis()
|
||||
}
|
||||
|
||||
fn bucket_entries(bucket: &Path) -> InternalResult<Vec<SerializableMetadata>> {
|
||||
fn bucket_entries(bucket: &Path) -> std::io::Result<Vec<SerializableMetadata>> {
|
||||
use std::io::{BufRead, BufReader};
|
||||
fs::File::open(bucket)
|
||||
.map(|file| {
|
||||
|
|
@ -303,18 +319,18 @@ fn bucket_entries(bucket: &Path) -> InternalResult<Vec<SerializableMetadata>> {
|
|||
if err.kind() == ErrorKind::NotFound {
|
||||
Ok(Vec::new())
|
||||
} else {
|
||||
Err(err).to_internal()?
|
||||
Err(err)?
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
async fn bucket_entries_async(bucket: &Path) -> InternalResult<Vec<SerializableMetadata>> {
|
||||
async fn bucket_entries_async(bucket: &Path) -> std::io::Result<Vec<SerializableMetadata>> {
|
||||
let file_result = crate::async_lib::File::open(bucket).await;
|
||||
let file = if let Err(err) = file_result {
|
||||
if err.kind() == ErrorKind::NotFound {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
return Err(err).to_internal()?;
|
||||
return Err(err)?;
|
||||
} else {
|
||||
file_result.unwrap()
|
||||
};
|
||||
|
|
|
|||
56
src/ls.rs
56
src/ls.rs
|
|
@ -1,28 +1,28 @@
|
|||
//! Functions for iterating over the cache.
|
||||
use std::path::Path;
|
||||
|
||||
use crate::errors::Result;
|
||||
use crate::index;
|
||||
|
||||
/// Returns a synchronous iterator that lists all cache index entries.
|
||||
pub fn list_sync<P: AsRef<Path>>(cache: P) -> impl Iterator<Item = Result<index::Metadata>> {
|
||||
index::ls(cache.as_ref())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_list_sync() {
|
||||
// check that the public interface to list elements can actually use the
|
||||
// Iterator::Item
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let dir = tmp.path().to_owned();
|
||||
|
||||
assert!(list_sync(dir)
|
||||
.map(|x| Ok(x?.key))
|
||||
.collect::<Result<Vec<_>>>()
|
||||
.is_err())
|
||||
}
|
||||
}
|
||||
//! Functions for iterating over the cache.
|
||||
use std::path::Path;
|
||||
|
||||
use crate::errors::Result;
|
||||
use crate::index;
|
||||
|
||||
/// Returns a synchronous iterator that lists all cache index entries.
|
||||
pub fn list_sync<P: AsRef<Path>>(cache: P) -> impl Iterator<Item = Result<index::Metadata>> {
|
||||
index::ls(cache.as_ref())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_list_sync() {
|
||||
// check that the public interface to list elements can actually use the
|
||||
// Iterator::Item
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let dir = tmp.path().to_owned();
|
||||
|
||||
assert!(list_sync(dir)
|
||||
.map(|x| Ok(x?.key))
|
||||
.collect::<Result<Vec<_>>>()
|
||||
.is_err())
|
||||
}
|
||||
}
|
||||
|
|
|
|||
24
src/put.rs
24
src/put.rs
|
|
@ -8,7 +8,7 @@ use ssri::{Algorithm, Integrity};
|
|||
|
||||
use crate::async_lib::{AsyncWrite, AsyncWriteExt};
|
||||
use crate::content::write;
|
||||
use crate::errors::{Error, Internal, Result};
|
||||
use crate::errors::{Error, IoErrorExt, Result};
|
||||
use crate::index;
|
||||
|
||||
use std::task::{Context as TaskContext, Poll};
|
||||
|
|
@ -38,10 +38,7 @@ where
|
|||
.open(cache, key)
|
||||
.await?;
|
||||
writer.write_all(data).await.with_context(|| {
|
||||
format!(
|
||||
"Failed to write to cache data for key {} for cache at {:?}",
|
||||
key, cache
|
||||
)
|
||||
format!("Failed to write to cache data for key {key} for cache at {cache:?}")
|
||||
})?;
|
||||
writer.commit().await
|
||||
}
|
||||
|
|
@ -74,7 +71,7 @@ where
|
|||
writer
|
||||
.write_all(data)
|
||||
.await
|
||||
.with_context(|| format!("Failed to write to cache data for cache at {:?}", cache))?;
|
||||
.with_context(|| format!("Failed to write to cache data for cache at {cache:?}"))?;
|
||||
writer.commit().await
|
||||
}
|
||||
inner(cache.as_ref(), data.as_ref()).await
|
||||
|
|
@ -165,7 +162,7 @@ impl Writer {
|
|||
}
|
||||
if let Some(size) = self.opts.size {
|
||||
if size != self.written {
|
||||
return Err(Error::SizeError(size, self.written));
|
||||
return Err(Error::SizeMismatch(size, self.written));
|
||||
}
|
||||
}
|
||||
if let Some(key) = self.key {
|
||||
|
|
@ -196,10 +193,7 @@ where
|
|||
fn inner(cache: &Path, key: &str, data: &[u8]) -> Result<Integrity> {
|
||||
let mut writer = SyncWriter::create(cache, key)?;
|
||||
writer.write_all(data).with_context(|| {
|
||||
format!(
|
||||
"Failed to write to cache data for key {} for cache at {:?}",
|
||||
key, cache
|
||||
)
|
||||
format!("Failed to write to cache data for key {key} for cache at {cache:?}")
|
||||
})?;
|
||||
writer.written = data.as_ref().len();
|
||||
writer.commit()
|
||||
|
|
@ -230,7 +224,7 @@ where
|
|||
.open_hash_sync(cache)?;
|
||||
writer
|
||||
.write_all(data)
|
||||
.with_context(|| format!("Failed to write to cache data for cache at {:?}", cache))?;
|
||||
.with_context(|| format!("Failed to write to cache data for cache at {cache:?}"))?;
|
||||
writer.written = data.len();
|
||||
writer.commit()
|
||||
}
|
||||
|
|
@ -265,7 +259,7 @@ impl WriteOpts {
|
|||
key: Some(String::from(key)),
|
||||
written: 0,
|
||||
writer: write::AsyncWriter::new(
|
||||
cache.as_ref(),
|
||||
cache,
|
||||
me.algorithm.unwrap_or(Algorithm::Sha256),
|
||||
None,
|
||||
)
|
||||
|
|
@ -310,7 +304,7 @@ impl WriteOpts {
|
|||
key: Some(String::from(key)),
|
||||
written: 0,
|
||||
writer: write::Writer::new(
|
||||
cache.as_ref(),
|
||||
cache,
|
||||
me.algorithm.unwrap_or(Algorithm::Sha256),
|
||||
me.size,
|
||||
)?,
|
||||
|
|
@ -441,7 +435,7 @@ impl SyncWriter {
|
|||
}
|
||||
if let Some(size) = self.opts.size {
|
||||
if size != self.written {
|
||||
return Err(Error::SizeError(size, self.written));
|
||||
return Err(Error::SizeMismatch(size, self.written));
|
||||
}
|
||||
}
|
||||
if let Some(key) = self.key {
|
||||
|
|
|
|||
29
src/rm.rs
29
src/rm.rs
|
|
@ -5,7 +5,7 @@ use std::path::Path;
|
|||
use ssri::Integrity;
|
||||
|
||||
use crate::content::rm;
|
||||
use crate::errors::{Internal, Result};
|
||||
use crate::errors::{IoErrorExt, Result};
|
||||
use crate::index;
|
||||
|
||||
/// Removes an individual index metadata entry. The associated content will be
|
||||
|
|
@ -91,10 +91,19 @@ pub async fn remove_hash<P: AsRef<Path>>(cache: P, sri: &Integrity) -> Result<()
|
|||
/// ```
|
||||
pub async fn clear<P: AsRef<Path>>(cache: P) -> Result<()> {
|
||||
async fn inner(cache: &Path) -> Result<()> {
|
||||
for entry in cache.read_dir().to_internal()?.flatten() {
|
||||
for entry in cache
|
||||
.read_dir()
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to read directory contents while clearing cache, at {}",
|
||||
cache.display()
|
||||
)
|
||||
})?
|
||||
.flatten()
|
||||
{
|
||||
crate::async_lib::remove_dir_all(entry.path())
|
||||
.await
|
||||
.to_internal()?;
|
||||
.with_context(|| format!("Failed to clear cache at {}", cache.display()))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -178,8 +187,18 @@ pub fn remove_hash_sync<P: AsRef<Path>>(cache: P, sri: &Integrity) -> Result<()>
|
|||
/// ```
|
||||
pub fn clear_sync<P: AsRef<Path>>(cache: P) -> Result<()> {
|
||||
fn inner(cache: &Path) -> Result<()> {
|
||||
for entry in cache.read_dir().to_internal()?.flatten() {
|
||||
fs::remove_dir_all(entry.path()).to_internal()?;
|
||||
for entry in cache
|
||||
.read_dir()
|
||||
.with_context(|| {
|
||||
format!(
|
||||
"Failed to read directory contents while clearing cache, at {}",
|
||||
cache.display()
|
||||
)
|
||||
})?
|
||||
.flatten()
|
||||
{
|
||||
fs::remove_dir_all(entry.path())
|
||||
.with_context(|| format!("Failed to clear cache at {}", cache.display()))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue