feat(errors): improved errors messaging and context (#20)

Fixes: #19

BREAKING CHANGE: Error values have changed. If you were inspecting or
matching against actual return values, you'll need to change your code
to use the new enums.
This commit is contained in:
Kat Marchán 2019-10-20 23:43:17 -04:00
parent 649398512f
commit 62298cdf35
No known key found for this signature in database
GPG Key ID: AEB529C08A3C7E9E
9 changed files with 228 additions and 103 deletions

6
Cargo.lock generated
View File

@ -173,7 +173,7 @@ dependencies = [
"serde_json 1.0.41 (registry+https://github.com/rust-lang/crates.io-index)",
"sha-1 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
"sha2 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
"ssri 4.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"ssri 5.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"thiserror 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)",
"walkdir 2.2.9 (registry+https://github.com/rust-lang/crates.io-index)",
@ -897,7 +897,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "ssri"
version = "4.1.0"
version = "5.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"base64 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
@ -1153,7 +1153,7 @@ dependencies = [
"checksum sha2 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7b4d8bfd0e469f417657573d8451fb33d16cfe0989359b93baf3a1ffc639543d"
"checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
"checksum smallvec 0.6.10 (registry+https://github.com/rust-lang/crates.io-index)" = "ab606a9c5e214920bb66c458cd7be8ef094f813f20fe77a54cc7dbfff220d4b7"
"checksum ssri 4.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c848bd203b24e8fa9512198fb91cafad085e4b43917c0d39c53bd59178d9f17b"
"checksum ssri 5.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e875e58f12c33f832d09cff94b079aee84a48d8188d7a7132b3434358afb70c9"
"checksum syn 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "66850e97125af79138385e9b88339cbcd037e3f28ceab8c5ad98e64f0f1f80bf"
"checksum tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9"
"checksum textwrap 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060"

View File

@ -17,7 +17,7 @@ categories = [
maintenance = { status = "actively-developed" }
[dependencies]
ssri = "4.1.0"
ssri = "5.0.0"
hex = "0.3.2"
tempfile = "3.0.8"
sha-1 = "0.8.1"

View File

@ -44,6 +44,7 @@ Using [`cargo-edit`](https://crates.io/crates/cargo-edit)
- Fault tolerance (immune to corruption, partial writes, process races, etc)
- Consistency guarantees on read and write (full data verification)
- Lockless, high-concurrency cache access
- Really helpful, contextual error messages
- Large file support
- Pretty darn fast
- Arbitrary metadata storage

View File

@ -9,7 +9,6 @@ use futures::prelude::*;
use ssri::{Algorithm, Integrity, IntegrityChecker};
use crate::content::path;
use crate::errors::Error;
pub struct Reader {
fd: File,
@ -26,9 +25,7 @@ impl std::io::Read for Reader {
impl Reader {
pub fn check(self) -> Result<Algorithm> {
self.checker
.result()
.ok_or(anyhow::Error::new(Error::IntegrityError))
Ok(self.checker.result()?)
}
}
@ -51,9 +48,7 @@ impl AsyncRead for AsyncReader {
impl AsyncReader {
pub fn check(self) -> Result<Algorithm> {
self.checker
.result()
.ok_or(anyhow::Error::new(Error::IntegrityError))
Ok(self.checker.result()?)
}
}
@ -76,43 +71,31 @@ pub async fn open_async(cache: &Path, sri: Integrity) -> Result<AsyncReader> {
pub fn read(cache: &Path, sri: &Integrity) -> Result<Vec<u8>> {
let cpath = path::content_path(&cache, &sri);
let ret = fs::read(&cpath)?;
if sri.check(&ret).is_some() {
Ok(ret)
} else {
Err(Error::IntegrityError)?
}
sri.check(&ret)?;
Ok(ret)
}
pub async fn read_async<'a>(cache: &'a Path, sri: &'a Integrity) -> Result<Vec<u8>> {
let cpath = path::content_path(&cache, &sri);
let ret = async_std::fs::read(&cpath).await?;
if sri.check(&ret).is_some() {
Ok(ret)
} else {
Err(Error::IntegrityError)?
}
sri.check(&ret)?;
Ok(ret)
}
pub fn copy(cache: &Path, sri: &Integrity, to: &Path) -> Result<u64> {
let cpath = path::content_path(&cache, &sri);
let ret = fs::copy(&cpath, to)?;
let data = fs::read(cpath)?;
if sri.check(data).is_some() {
Ok(ret)
} else {
Err(Error::IntegrityError)?
}
sri.check(data)?;
Ok(ret)
}
pub async fn copy_async<'a>(cache: &'a Path, sri: &'a Integrity, to: &'a Path) -> Result<u64> {
let cpath = path::content_path(&cache, &sri);
let ret = async_std::fs::copy(&cpath, to).await?;
let data = async_std::fs::read(cpath).await?;
if sri.check(data).is_some() {
Ok(ret)
} else {
Err(Error::IntegrityError)?
}
sri.check(data)?;
Ok(ret)
}
pub fn has_content(cache: &Path, sri: &Integrity) -> Option<Integrity> {

View File

@ -1,16 +1,19 @@
use std::path::PathBuf;
use ssri::Integrity;
use thiserror::Error;
/// Error type returned by all API calls.
#[derive(Error, Debug)]
pub enum Error {
/// Returned when an index or content entry could not be found during
/// Returned when an index entry could not be found during
/// lookup.
#[error("not found")]
NotFound,
#[error("Entry not found for key {1:?} in cache {0:?}")]
EntryNotFound(PathBuf, String),
/// Returned when an integrity check has failed.
#[error("integrity check failed")]
IntegrityError,
#[error("Integrity check failed.\n\tWanted: {0}\n\tActual: {1}")]
IntegrityError(Integrity, Integrity),
/// Returned when a size check has failed.
#[error("size check failed")]
SizeError,
#[error("Size check failed.\n\tWanted: {0}\n\tActual: {1}")]
SizeError(usize, usize),
}

View File

@ -1,11 +1,11 @@
//! Functions for reading from cache.
use std::path::Path;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::task::{Context as TaskContext, Poll};
use futures::prelude::*;
use anyhow::Result;
use anyhow::{Context, Result};
use ssri::{Algorithm, Integrity};
use crate::content::read::{self, AsyncReader, Reader};
@ -27,7 +27,7 @@ pub struct AsyncGet {
impl AsyncRead for AsyncGet {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
cx: &mut TaskContext<'_>,
buf: &mut [u8],
) -> Poll<std::io::Result<usize>> {
Pin::new(&mut self.reader).poll_read(cx, buf)
@ -61,7 +61,9 @@ impl AsyncGet {
/// # }
/// ```
pub fn check(self) -> Result<Algorithm> {
self.reader.check()
self.reader
.check()
.context("Cache read data verification check failed.")
}
}
@ -97,7 +99,10 @@ where
if let Some(entry) = index::find_async(cache.as_ref(), key.as_ref()).await? {
open_hash(cache, entry.integrity).await
} else {
Err(Error::NotFound)?
Err(Error::EntryNotFound(
cache.as_ref().to_path_buf(),
key.as_ref().into(),
))?
}
}
@ -162,7 +167,10 @@ where
if let Some(entry) = index::find_async(cache.as_ref(), key.as_ref()).await? {
data_hash(cache, &entry.integrity).await
} else {
Err(Error::NotFound)?
Err(Error::EntryNotFound(
cache.as_ref().to_path_buf(),
key.as_ref().into(),
))?
}
}
@ -222,7 +230,10 @@ where
if let Some(entry) = index::find_async(cache.as_ref(), key.as_ref()).await? {
copy_hash(cache, &entry.integrity, to).await
} else {
Err(Error::NotFound)?
Err(Error::EntryNotFound(
cache.as_ref().to_path_buf(),
key.as_ref().into(),
))?
}
}
@ -308,7 +319,9 @@ impl SyncGet {
/// # }
/// ```
pub fn check(self) -> Result<Algorithm> {
self.reader.check()
self.reader
.check()
.context("Cache read data verification check failed.")
}
}
@ -336,7 +349,10 @@ where
if let Some(entry) = index::find(cache.as_ref(), key.as_ref())? {
open_hash_sync(cache, entry.integrity)
} else {
Err(Error::NotFound)?
Err(Error::EntryNotFound(
cache.as_ref().to_path_buf(),
key.as_ref().into(),
))?
}
}
@ -385,7 +401,10 @@ where
if let Some(entry) = index::find(cache.as_ref(), key.as_ref())? {
data_hash_sync(cache, &entry.integrity)
} else {
Err(Error::NotFound)?
Err(Error::EntryNotFound(
cache.as_ref().to_path_buf(),
key.as_ref().into(),
))?
}
}
@ -429,7 +448,10 @@ where
if let Some(entry) = index::find(cache.as_ref(), key.as_ref())? {
copy_hash_sync(cache, &entry.integrity, to)
} else {
Err(Error::NotFound)?
Err(Error::EntryNotFound(
cache.as_ref().to_path_buf(),
key.as_ref().into(),
))?
}
}

View File

@ -67,27 +67,45 @@ pub fn insert(cache: &Path, key: &str, opts: PutOpts) -> Result<Integrity> {
let bucket = bucket_path(&cache, &key);
#[cfg(unix)]
{
if let Some(path) = mkdirp::mkdirp(bucket.parent().unwrap())? {
chownr::chownr(&path, opts.uid, opts.gid)?;
if let Some(path) = mkdirp::mkdirp(bucket.parent().unwrap()).with_context(|| {
format!(
"Failed to create index bucket directory: {:?}",
bucket.parent().unwrap()
)
})? {
chownr::chownr(&path, opts.uid, opts.gid)
.with_context(|| format!("Failed to chown new index directories: {:?}", path))?;
}
}
#[cfg(windows)]
mkdirp::mkdirp(bucket.parent().unwrap())?;
mkdirp::mkdirp(bucket.parent().unwrap()).with_context(|| {
format!(
"Failed to create index bucket directory: {:?}",
bucket.parent().unwrap()
)
})?;
let stringified = serde_json::to_string(&SerializableEntry {
key: key.to_owned(),
integrity: opts.sri.clone().map(|x| x.to_string()),
time: opts.time.unwrap_or_else(now),
size: opts.size.unwrap_or(0),
metadata: opts.metadata.unwrap_or_else(|| json!(null)),
})?;
})
.with_context(|| format!("Failed to serialize entry with key `{}`", key))?;
let mut buck = OpenOptions::new().create(true).append(true).open(&bucket)?;
let mut buck = OpenOptions::new()
.create(true)
.append(true)
.open(&bucket)
.with_context(|| format!("Failed to create or open index bucket at {:?}", bucket))?;
let out = format!("\n{}\t{}", hash_entry(&stringified), stringified);
buck.write_all(out.as_bytes())?;
buck.write_all(out.as_bytes())
.with_context(|| format!("Failed to write to index bucket at {:?}", bucket))?;
buck.flush()?;
#[cfg(unix)]
chownr::chownr(&bucket, opts.uid, opts.gid)?;
chownr::chownr(&bucket, opts.uid, opts.gid)
.with_context(|| format!("Failed to chown index bucket at {:?}", bucket))?;
Ok(opts
.sri
.or_else(|| "sha1-deadbeef".parse::<Integrity>().ok())
@ -115,7 +133,8 @@ pub async fn insert_async<'a>(cache: &'a Path, key: &'a str, opts: PutOpts) -> R
}
}
#[cfg(windows)]
mkdirp::mkdirp(parent)?;
mkdirp::mkdirp(parent)
.with_context(|| format!("failed to create index bucket parent dir: {:?}", parent))?;
Ok::<(), anyhow::Error>(())
})
.await?;
@ -125,19 +144,24 @@ pub async fn insert_async<'a>(cache: &'a Path, key: &'a str, opts: PutOpts) -> R
time: opts.time.unwrap_or_else(now),
size: opts.size.unwrap_or(0),
metadata: opts.metadata.unwrap_or_else(|| json!(null)),
})?;
})
.with_context(|| format!("Failed to serialize entry with key `{}`", key))?;
let mut buck = async_std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(&bucket)
.await?;
.await
.with_context(|| format!("Failed to create or open index bucket at {:?}", bucket))?;
let out = format!("\n{}\t{}", hash_entry(&stringified), stringified);
buck.write_all(out.as_bytes()).await?;
buck.write_all(out.as_bytes())
.await
.with_context(|| format!("Failed to write to index bucket at {:?}", bucket))?;
buck.flush().await?;
#[cfg(unix)]
chownr::chownr(&bucket, opts.uid, opts.gid)?;
chownr::chownr(&bucket, opts.uid, opts.gid)
.with_context(|| format!("Failed to chown index bucket at {:?}", bucket))?;
Ok(opts
.sri
.or_else(|| "sha1-deadbeef".parse::<Integrity>().ok())
@ -146,7 +170,8 @@ pub async fn insert_async<'a>(cache: &'a Path, key: &'a str, opts: PutOpts) -> R
pub fn find(cache: &Path, key: &str) -> Result<Option<Entry>> {
let bucket = bucket_path(cache, &key);
Ok(bucket_entries(&bucket)?
Ok(bucket_entries(&bucket)
.with_context(|| format!("Failed to read index bucket entries from {:?}", bucket))?
.into_iter()
.fold(None, |acc, entry| {
if entry.key == key {
@ -174,7 +199,8 @@ pub fn find(cache: &Path, key: &str) -> Result<Option<Entry>> {
pub async fn find_async(cache: &Path, key: &str) -> Result<Option<Entry>> {
let bucket = bucket_path(cache, &key);
Ok(bucket_entries_async(&bucket)
.await?
.await
.with_context(|| format!("Failed to read index bucket entries from {:?}", bucket))?
.into_iter()
.fold(None, |acc, entry| {
if entry.key == key {

View File

@ -5,7 +5,7 @@ use std::pin::Pin;
use futures::prelude::*;
use anyhow::Result;
use anyhow::{Context, Result};
#[cfg(unix)]
use nix::unistd::{Gid, Uid};
use serde_json::Value;
@ -15,7 +15,7 @@ use crate::content::write;
use crate::errors::Error;
use crate::index;
use std::task::{Context, Poll};
use std::task::{Context as TaskContext, Poll};
/// Writes `data` to the `cache`, indexing it under `key`.
///
@ -45,9 +45,28 @@ where
let mut writer = PutOpts::new()
.algorithm(Algorithm::Sha256)
.open(cache.as_ref(), key.as_ref())
.await?;
writer.write_all(data.as_ref()).await?;
writer.commit().await
.await
.with_context(|| {
format!(
"Failed to open a write handle for key {} for cache at {:?}",
key.as_ref(),
cache.as_ref()
)
})?;
writer.write_all(data.as_ref()).await.with_context(|| {
format!(
"Failed to write to cache data for key {} for cache at {:?}",
key.as_ref(),
cache.as_ref()
)
})?;
writer.commit().await.with_context(|| {
format!(
"Failed to write to commit data for key {} for cache at {:?}",
key.as_ref(),
cache.as_ref()
)
})
}
/// A reference to an open file writing to the cache.
@ -62,17 +81,17 @@ pub struct AsyncPut {
impl AsyncWrite for AsyncPut {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
cx: &mut TaskContext<'_>,
buf: &[u8],
) -> Poll<std::io::Result<usize>> {
Pin::new(&mut self.writer).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.writer).poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
fn poll_close(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<std::io::Result<()>> {
Pin::new(&mut self.writer).poll_close(cx)
}
}
@ -83,27 +102,41 @@ impl AsyncPut {
/// Must be called manually in order to complete the writing process,
/// otherwise everything will be thrown out.
pub async fn commit(mut self) -> Result<Integrity> {
let writer_sri = self.writer.close().await?;
let key = self.key;
let cache = self.cache;
let writer_sri = self.writer.close().await.with_context(|| {
format!(
"Failed to properly close save file data for key {} in cache at {:?}",
key, cache
)
})?;
if let Some(sri) = &self.opts.sri {
// TODO - ssri should have a .matches method
let algo = sri.pick_algorithm();
let matched = sri
.hashes
.iter()
.take_while(|h| h.algorithm == algo)
.find(|&h| *h == writer_sri.hashes[0]);
if matched.is_none() {
return Err(Error::IntegrityError)?;
if sri.matches(&writer_sri).is_none() {
return Err(Error::IntegrityError(sri.clone(), writer_sri)).with_context(|| {
format!(
"Failed to verify data integrity while inserting {} into cache at {:?}",
key, cache
)
})?;
}
} else {
self.opts.sri = Some(writer_sri);
}
if let Some(size) = self.opts.size {
if size != self.written {
return Err(Error::SizeError)?;
return Err(Error::SizeError(size, self.written)).with_context(|| {
format!("A size was passed in but the value inserted into {} could not be verified for cache at {:?}", key, cache)
})?;
}
}
index::insert_async(&self.cache, &self.key, self.opts).await
index::insert_async(&cache, &key, self.opts)
.await
.with_context(|| {
format!(
"Failed to write index entry for {} in cache at {:?}",
key, cache
)
})
}
}
@ -126,10 +159,28 @@ where
{
let mut writer = PutOpts::new()
.algorithm(Algorithm::Sha256)
.open_sync(cache.as_ref(), key.as_ref())?;
writer.write_all(data.as_ref())?;
writer.flush()?;
writer.commit()
.open_sync(cache.as_ref(), key.as_ref())
.with_context(|| {
format!(
"Failed to open a write handle for key {} for cache at {:?}",
key.as_ref(),
cache.as_ref()
)
})?;
writer.write_all(data.as_ref()).with_context(|| {
format!(
"Failed to write to cache data for key {} for cache at {:?}",
key.as_ref(),
cache.as_ref()
)
})?;
writer.commit().with_context(|| {
format!(
"Failed to write to commit data for key {} for cache at {:?}",
key.as_ref(),
cache.as_ref()
)
})
}
/// Builder for options and flags for opening a new cache file to write data into.
@ -258,27 +309,40 @@ impl SyncPut {
/// Must be called manually in order to complete the writing process,
/// otherwise everything will be thrown out.
pub fn commit(mut self) -> Result<Integrity> {
let writer_sri = self.writer.close()?;
let key = self.key;
let cache = self.cache;
let writer_sri = self.writer.close().with_context(|| {
format!(
"Failed to properly close save file data for key {} in cache at {:?}",
key, cache
)
})?;
if let Some(sri) = &self.opts.sri {
// TODO - ssri should have a .matches method
let algo = sri.pick_algorithm();
let matched = sri
.hashes
.iter()
.take_while(|h| h.algorithm == algo)
.find(|&h| *h == writer_sri.hashes[0]);
if matched.is_none() {
return Err(Error::IntegrityError)?;
if sri.matches(&writer_sri).is_none() {
return Err(Error::IntegrityError(sri.clone(), writer_sri)).with_context(|| {
format!(
"Failed to verify data integrity while inserting {} into cache at {:?}",
key, cache
)
})?;
}
} else {
self.opts.sri = Some(writer_sri);
}
if let Some(size) = self.opts.size {
if size != self.written {
return Err(Error::SizeError)?;
return Err(Error::SizeError(size, self.written)).with_context(|| {
format!("A size was passed in but the value inserted into {} could not be verified for cache at {:?}", key, cache)
})?;
}
}
Ok(index::insert(&self.cache, &self.key, self.opts)?)
index::insert(&cache, &key, self.opts).with_context(|| {
format!(
"Failed to write index entry for {} in cache at {:?}",
key, cache
)
})
}
}

View File

@ -4,7 +4,7 @@ use std::path::Path;
use async_std::fs as afs;
use anyhow::Result;
use anyhow::{Context, Result};
use ssri::Integrity;
use crate::content::rm;
@ -43,7 +43,15 @@ where
P: AsRef<Path>,
K: AsRef<str>,
{
index::delete_async(cache.as_ref(), key.as_ref()).await
index::delete_async(cache.as_ref(), key.as_ref())
.await
.with_context(|| {
format!(
"Failed to delete cache entry for {} in cache at {:?}",
key.as_ref(),
cache.as_ref()
)
})
}
/// Removes an individual content entry. Any index entries pointing to this
@ -76,7 +84,13 @@ where
/// # }
/// ```
pub async fn content<P: AsRef<Path>>(cache: P, sri: &Integrity) -> Result<()> {
rm::rm_async(cache.as_ref(), &sri).await
rm::rm_async(cache.as_ref(), &sri).await.with_context(|| {
format!(
"Failed to remove content under {} in cache at {:?}",
sri.to_string(),
cache.as_ref()
)
})
}
/// Removes entire contents of the cache, including temporary files, the entry
@ -140,7 +154,13 @@ where
P: AsRef<Path>,
K: AsRef<str>,
{
index::delete(cache.as_ref(), key.as_ref())
index::delete(cache.as_ref(), key.as_ref()).with_context(|| {
format!(
"Failed to delete cache entry for {} in cache at {:?}",
key.as_ref(),
cache.as_ref()
)
})
}
/// Removes an individual content entry synchronously. Any index entries
@ -165,7 +185,13 @@ where
/// # }
/// ```
pub fn content_sync<P: AsRef<Path>>(cache: P, sri: &Integrity) -> Result<()> {
rm::rm(cache.as_ref(), &sri)
rm::rm(cache.as_ref(), &sri).with_context(|| {
format!(
"Failed to remove content under {} in cache at {:?}",
sri.to_string(),
cache.as_ref()
)
})
}
/// Removes entire contents of the cache synchronously, including temporary