From 62298cdf351d7ed10b54417ae7a702d07b4b4765 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kat=20March=C3=A1n?= Date: Sun, 20 Oct 2019 23:43:17 -0400 Subject: [PATCH] feat(errors): improved errors messaging and context (#20) Fixes: #19 BREAKING CHANGE: Error values have changed. If you were inspecting or matching against actual return values, you'll need to change your code to use the new enums. --- Cargo.lock | 6 +- Cargo.toml | 2 +- README.md | 1 + src/content/read.rs | 37 ++++-------- src/errors.rs | 17 +++--- src/get.rs | 44 +++++++++++---- src/index.rs | 54 +++++++++++++----- src/put.rs | 134 ++++++++++++++++++++++++++++++++------------ src/rm.rs | 36 ++++++++++-- 9 files changed, 228 insertions(+), 103 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6eeb696..e8cb35d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -173,7 +173,7 @@ dependencies = [ "serde_json 1.0.41 (registry+https://github.com/rust-lang/crates.io-index)", "sha-1 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", "sha2 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", - "ssri 4.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "ssri 5.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "thiserror 1.0.3 (registry+https://github.com/rust-lang/crates.io-index)", "walkdir 2.2.9 (registry+https://github.com/rust-lang/crates.io-index)", @@ -897,7 +897,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [[package]] name = "ssri" -version = "4.1.0" +version = "5.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ "base64 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -1153,7 +1153,7 @@ dependencies = [ "checksum sha2 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7b4d8bfd0e469f417657573d8451fb33d16cfe0989359b93baf3a1ffc639543d" "checksum slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" "checksum smallvec 0.6.10 (registry+https://github.com/rust-lang/crates.io-index)" = "ab606a9c5e214920bb66c458cd7be8ef094f813f20fe77a54cc7dbfff220d4b7" -"checksum ssri 4.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "c848bd203b24e8fa9512198fb91cafad085e4b43917c0d39c53bd59178d9f17b" +"checksum ssri 5.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e875e58f12c33f832d09cff94b079aee84a48d8188d7a7132b3434358afb70c9" "checksum syn 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "66850e97125af79138385e9b88339cbcd037e3f28ceab8c5ad98e64f0f1f80bf" "checksum tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6e24d9338a0a5be79593e2fa15a648add6138caa803e2d5bc782c371732ca9" "checksum textwrap 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" diff --git a/Cargo.toml b/Cargo.toml index f27a649..65d92ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ categories = [ maintenance = { status = "actively-developed" } [dependencies] -ssri = "4.1.0" +ssri = "5.0.0" hex = "0.3.2" tempfile = "3.0.8" sha-1 = "0.8.1" diff --git a/README.md b/README.md index 33a100a..eead088 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,7 @@ Using [`cargo-edit`](https://crates.io/crates/cargo-edit) - Fault tolerance (immune to corruption, partial writes, process races, etc) - Consistency guarantees on read and write (full data verification) - Lockless, high-concurrency cache access +- Really helpful, contextual error messages - Large file support - Pretty darn fast - Arbitrary metadata storage diff --git a/src/content/read.rs b/src/content/read.rs index 14bf7c6..304d243 100644 --- a/src/content/read.rs +++ b/src/content/read.rs @@ -9,7 +9,6 @@ use futures::prelude::*; use ssri::{Algorithm, Integrity, IntegrityChecker}; use crate::content::path; -use crate::errors::Error; pub struct Reader { fd: File, @@ -26,9 +25,7 @@ impl std::io::Read for Reader { impl Reader { pub fn check(self) -> Result { - self.checker - .result() - .ok_or(anyhow::Error::new(Error::IntegrityError)) + Ok(self.checker.result()?) } } @@ -51,9 +48,7 @@ impl AsyncRead for AsyncReader { impl AsyncReader { pub fn check(self) -> Result { - self.checker - .result() - .ok_or(anyhow::Error::new(Error::IntegrityError)) + Ok(self.checker.result()?) } } @@ -76,43 +71,31 @@ pub async fn open_async(cache: &Path, sri: Integrity) -> Result { pub fn read(cache: &Path, sri: &Integrity) -> Result> { let cpath = path::content_path(&cache, &sri); let ret = fs::read(&cpath)?; - if sri.check(&ret).is_some() { - Ok(ret) - } else { - Err(Error::IntegrityError)? - } + sri.check(&ret)?; + Ok(ret) } pub async fn read_async<'a>(cache: &'a Path, sri: &'a Integrity) -> Result> { let cpath = path::content_path(&cache, &sri); let ret = async_std::fs::read(&cpath).await?; - if sri.check(&ret).is_some() { - Ok(ret) - } else { - Err(Error::IntegrityError)? - } + sri.check(&ret)?; + Ok(ret) } pub fn copy(cache: &Path, sri: &Integrity, to: &Path) -> Result { let cpath = path::content_path(&cache, &sri); let ret = fs::copy(&cpath, to)?; let data = fs::read(cpath)?; - if sri.check(data).is_some() { - Ok(ret) - } else { - Err(Error::IntegrityError)? - } + sri.check(data)?; + Ok(ret) } pub async fn copy_async<'a>(cache: &'a Path, sri: &'a Integrity, to: &'a Path) -> Result { let cpath = path::content_path(&cache, &sri); let ret = async_std::fs::copy(&cpath, to).await?; let data = async_std::fs::read(cpath).await?; - if sri.check(data).is_some() { - Ok(ret) - } else { - Err(Error::IntegrityError)? - } + sri.check(data)?; + Ok(ret) } pub fn has_content(cache: &Path, sri: &Integrity) -> Option { diff --git a/src/errors.rs b/src/errors.rs index 79c9c2a..c2759dc 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,16 +1,19 @@ +use std::path::PathBuf; + +use ssri::Integrity; use thiserror::Error; /// Error type returned by all API calls. #[derive(Error, Debug)] pub enum Error { - /// Returned when an index or content entry could not be found during + /// Returned when an index entry could not be found during /// lookup. - #[error("not found")] - NotFound, + #[error("Entry not found for key {1:?} in cache {0:?}")] + EntryNotFound(PathBuf, String), /// Returned when an integrity check has failed. - #[error("integrity check failed")] - IntegrityError, + #[error("Integrity check failed.\n\tWanted: {0}\n\tActual: {1}")] + IntegrityError(Integrity, Integrity), /// Returned when a size check has failed. - #[error("size check failed")] - SizeError, + #[error("Size check failed.\n\tWanted: {0}\n\tActual: {1}")] + SizeError(usize, usize), } diff --git a/src/get.rs b/src/get.rs index 452ed0e..21c0599 100644 --- a/src/get.rs +++ b/src/get.rs @@ -1,11 +1,11 @@ //! Functions for reading from cache. use std::path::Path; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{Context as TaskContext, Poll}; use futures::prelude::*; -use anyhow::Result; +use anyhow::{Context, Result}; use ssri::{Algorithm, Integrity}; use crate::content::read::{self, AsyncReader, Reader}; @@ -27,7 +27,7 @@ pub struct AsyncGet { impl AsyncRead for AsyncGet { fn poll_read( mut self: Pin<&mut Self>, - cx: &mut Context<'_>, + cx: &mut TaskContext<'_>, buf: &mut [u8], ) -> Poll> { Pin::new(&mut self.reader).poll_read(cx, buf) @@ -61,7 +61,9 @@ impl AsyncGet { /// # } /// ``` pub fn check(self) -> Result { - self.reader.check() + self.reader + .check() + .context("Cache read data verification check failed.") } } @@ -97,7 +99,10 @@ where if let Some(entry) = index::find_async(cache.as_ref(), key.as_ref()).await? { open_hash(cache, entry.integrity).await } else { - Err(Error::NotFound)? + Err(Error::EntryNotFound( + cache.as_ref().to_path_buf(), + key.as_ref().into(), + ))? } } @@ -162,7 +167,10 @@ where if let Some(entry) = index::find_async(cache.as_ref(), key.as_ref()).await? { data_hash(cache, &entry.integrity).await } else { - Err(Error::NotFound)? + Err(Error::EntryNotFound( + cache.as_ref().to_path_buf(), + key.as_ref().into(), + ))? } } @@ -222,7 +230,10 @@ where if let Some(entry) = index::find_async(cache.as_ref(), key.as_ref()).await? { copy_hash(cache, &entry.integrity, to).await } else { - Err(Error::NotFound)? + Err(Error::EntryNotFound( + cache.as_ref().to_path_buf(), + key.as_ref().into(), + ))? } } @@ -308,7 +319,9 @@ impl SyncGet { /// # } /// ``` pub fn check(self) -> Result { - self.reader.check() + self.reader + .check() + .context("Cache read data verification check failed.") } } @@ -336,7 +349,10 @@ where if let Some(entry) = index::find(cache.as_ref(), key.as_ref())? { open_hash_sync(cache, entry.integrity) } else { - Err(Error::NotFound)? + Err(Error::EntryNotFound( + cache.as_ref().to_path_buf(), + key.as_ref().into(), + ))? } } @@ -385,7 +401,10 @@ where if let Some(entry) = index::find(cache.as_ref(), key.as_ref())? { data_hash_sync(cache, &entry.integrity) } else { - Err(Error::NotFound)? + Err(Error::EntryNotFound( + cache.as_ref().to_path_buf(), + key.as_ref().into(), + ))? } } @@ -429,7 +448,10 @@ where if let Some(entry) = index::find(cache.as_ref(), key.as_ref())? { copy_hash_sync(cache, &entry.integrity, to) } else { - Err(Error::NotFound)? + Err(Error::EntryNotFound( + cache.as_ref().to_path_buf(), + key.as_ref().into(), + ))? } } diff --git a/src/index.rs b/src/index.rs index b1b5bd9..e5f2f34 100644 --- a/src/index.rs +++ b/src/index.rs @@ -67,27 +67,45 @@ pub fn insert(cache: &Path, key: &str, opts: PutOpts) -> Result { let bucket = bucket_path(&cache, &key); #[cfg(unix)] { - if let Some(path) = mkdirp::mkdirp(bucket.parent().unwrap())? { - chownr::chownr(&path, opts.uid, opts.gid)?; + if let Some(path) = mkdirp::mkdirp(bucket.parent().unwrap()).with_context(|| { + format!( + "Failed to create index bucket directory: {:?}", + bucket.parent().unwrap() + ) + })? { + chownr::chownr(&path, opts.uid, opts.gid) + .with_context(|| format!("Failed to chown new index directories: {:?}", path))?; } } #[cfg(windows)] - mkdirp::mkdirp(bucket.parent().unwrap())?; + mkdirp::mkdirp(bucket.parent().unwrap()).with_context(|| { + format!( + "Failed to create index bucket directory: {:?}", + bucket.parent().unwrap() + ) + })?; let stringified = serde_json::to_string(&SerializableEntry { key: key.to_owned(), integrity: opts.sri.clone().map(|x| x.to_string()), time: opts.time.unwrap_or_else(now), size: opts.size.unwrap_or(0), metadata: opts.metadata.unwrap_or_else(|| json!(null)), - })?; + }) + .with_context(|| format!("Failed to serialize entry with key `{}`", key))?; - let mut buck = OpenOptions::new().create(true).append(true).open(&bucket)?; + let mut buck = OpenOptions::new() + .create(true) + .append(true) + .open(&bucket) + .with_context(|| format!("Failed to create or open index bucket at {:?}", bucket))?; let out = format!("\n{}\t{}", hash_entry(&stringified), stringified); - buck.write_all(out.as_bytes())?; + buck.write_all(out.as_bytes()) + .with_context(|| format!("Failed to write to index bucket at {:?}", bucket))?; buck.flush()?; #[cfg(unix)] - chownr::chownr(&bucket, opts.uid, opts.gid)?; + chownr::chownr(&bucket, opts.uid, opts.gid) + .with_context(|| format!("Failed to chown index bucket at {:?}", bucket))?; Ok(opts .sri .or_else(|| "sha1-deadbeef".parse::().ok()) @@ -115,7 +133,8 @@ pub async fn insert_async<'a>(cache: &'a Path, key: &'a str, opts: PutOpts) -> R } } #[cfg(windows)] - mkdirp::mkdirp(parent)?; + mkdirp::mkdirp(parent) + .with_context(|| format!("failed to create index bucket parent dir: {:?}", parent))?; Ok::<(), anyhow::Error>(()) }) .await?; @@ -125,19 +144,24 @@ pub async fn insert_async<'a>(cache: &'a Path, key: &'a str, opts: PutOpts) -> R time: opts.time.unwrap_or_else(now), size: opts.size.unwrap_or(0), metadata: opts.metadata.unwrap_or_else(|| json!(null)), - })?; + }) + .with_context(|| format!("Failed to serialize entry with key `{}`", key))?; let mut buck = async_std::fs::OpenOptions::new() .create(true) .append(true) .open(&bucket) - .await?; + .await + .with_context(|| format!("Failed to create or open index bucket at {:?}", bucket))?; let out = format!("\n{}\t{}", hash_entry(&stringified), stringified); - buck.write_all(out.as_bytes()).await?; + buck.write_all(out.as_bytes()) + .await + .with_context(|| format!("Failed to write to index bucket at {:?}", bucket))?; buck.flush().await?; #[cfg(unix)] - chownr::chownr(&bucket, opts.uid, opts.gid)?; + chownr::chownr(&bucket, opts.uid, opts.gid) + .with_context(|| format!("Failed to chown index bucket at {:?}", bucket))?; Ok(opts .sri .or_else(|| "sha1-deadbeef".parse::().ok()) @@ -146,7 +170,8 @@ pub async fn insert_async<'a>(cache: &'a Path, key: &'a str, opts: PutOpts) -> R pub fn find(cache: &Path, key: &str) -> Result> { let bucket = bucket_path(cache, &key); - Ok(bucket_entries(&bucket)? + Ok(bucket_entries(&bucket) + .with_context(|| format!("Failed to read index bucket entries from {:?}", bucket))? .into_iter() .fold(None, |acc, entry| { if entry.key == key { @@ -174,7 +199,8 @@ pub fn find(cache: &Path, key: &str) -> Result> { pub async fn find_async(cache: &Path, key: &str) -> Result> { let bucket = bucket_path(cache, &key); Ok(bucket_entries_async(&bucket) - .await? + .await + .with_context(|| format!("Failed to read index bucket entries from {:?}", bucket))? .into_iter() .fold(None, |acc, entry| { if entry.key == key { diff --git a/src/put.rs b/src/put.rs index 87be7b8..0712489 100644 --- a/src/put.rs +++ b/src/put.rs @@ -5,7 +5,7 @@ use std::pin::Pin; use futures::prelude::*; -use anyhow::Result; +use anyhow::{Context, Result}; #[cfg(unix)] use nix::unistd::{Gid, Uid}; use serde_json::Value; @@ -15,7 +15,7 @@ use crate::content::write; use crate::errors::Error; use crate::index; -use std::task::{Context, Poll}; +use std::task::{Context as TaskContext, Poll}; /// Writes `data` to the `cache`, indexing it under `key`. /// @@ -45,9 +45,28 @@ where let mut writer = PutOpts::new() .algorithm(Algorithm::Sha256) .open(cache.as_ref(), key.as_ref()) - .await?; - writer.write_all(data.as_ref()).await?; - writer.commit().await + .await + .with_context(|| { + format!( + "Failed to open a write handle for key {} for cache at {:?}", + key.as_ref(), + cache.as_ref() + ) + })?; + writer.write_all(data.as_ref()).await.with_context(|| { + format!( + "Failed to write to cache data for key {} for cache at {:?}", + key.as_ref(), + cache.as_ref() + ) + })?; + writer.commit().await.with_context(|| { + format!( + "Failed to write to commit data for key {} for cache at {:?}", + key.as_ref(), + cache.as_ref() + ) + }) } /// A reference to an open file writing to the cache. @@ -62,17 +81,17 @@ pub struct AsyncPut { impl AsyncWrite for AsyncPut { fn poll_write( mut self: Pin<&mut Self>, - cx: &mut Context<'_>, + cx: &mut TaskContext<'_>, buf: &[u8], ) -> Poll> { Pin::new(&mut self.writer).poll_write(cx, buf) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll> { Pin::new(&mut self.writer).poll_flush(cx) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_close(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll> { Pin::new(&mut self.writer).poll_close(cx) } } @@ -83,27 +102,41 @@ impl AsyncPut { /// Must be called manually in order to complete the writing process, /// otherwise everything will be thrown out. pub async fn commit(mut self) -> Result { - let writer_sri = self.writer.close().await?; + let key = self.key; + let cache = self.cache; + let writer_sri = self.writer.close().await.with_context(|| { + format!( + "Failed to properly close save file data for key {} in cache at {:?}", + key, cache + ) + })?; if let Some(sri) = &self.opts.sri { - // TODO - ssri should have a .matches method - let algo = sri.pick_algorithm(); - let matched = sri - .hashes - .iter() - .take_while(|h| h.algorithm == algo) - .find(|&h| *h == writer_sri.hashes[0]); - if matched.is_none() { - return Err(Error::IntegrityError)?; + if sri.matches(&writer_sri).is_none() { + return Err(Error::IntegrityError(sri.clone(), writer_sri)).with_context(|| { + format!( + "Failed to verify data integrity while inserting {} into cache at {:?}", + key, cache + ) + })?; } } else { self.opts.sri = Some(writer_sri); } if let Some(size) = self.opts.size { if size != self.written { - return Err(Error::SizeError)?; + return Err(Error::SizeError(size, self.written)).with_context(|| { + format!("A size was passed in but the value inserted into {} could not be verified for cache at {:?}", key, cache) + })?; } } - index::insert_async(&self.cache, &self.key, self.opts).await + index::insert_async(&cache, &key, self.opts) + .await + .with_context(|| { + format!( + "Failed to write index entry for {} in cache at {:?}", + key, cache + ) + }) } } @@ -126,10 +159,28 @@ where { let mut writer = PutOpts::new() .algorithm(Algorithm::Sha256) - .open_sync(cache.as_ref(), key.as_ref())?; - writer.write_all(data.as_ref())?; - writer.flush()?; - writer.commit() + .open_sync(cache.as_ref(), key.as_ref()) + .with_context(|| { + format!( + "Failed to open a write handle for key {} for cache at {:?}", + key.as_ref(), + cache.as_ref() + ) + })?; + writer.write_all(data.as_ref()).with_context(|| { + format!( + "Failed to write to cache data for key {} for cache at {:?}", + key.as_ref(), + cache.as_ref() + ) + })?; + writer.commit().with_context(|| { + format!( + "Failed to write to commit data for key {} for cache at {:?}", + key.as_ref(), + cache.as_ref() + ) + }) } /// Builder for options and flags for opening a new cache file to write data into. @@ -258,27 +309,40 @@ impl SyncPut { /// Must be called manually in order to complete the writing process, /// otherwise everything will be thrown out. pub fn commit(mut self) -> Result { - let writer_sri = self.writer.close()?; + let key = self.key; + let cache = self.cache; + let writer_sri = self.writer.close().with_context(|| { + format!( + "Failed to properly close save file data for key {} in cache at {:?}", + key, cache + ) + })?; if let Some(sri) = &self.opts.sri { // TODO - ssri should have a .matches method - let algo = sri.pick_algorithm(); - let matched = sri - .hashes - .iter() - .take_while(|h| h.algorithm == algo) - .find(|&h| *h == writer_sri.hashes[0]); - if matched.is_none() { - return Err(Error::IntegrityError)?; + if sri.matches(&writer_sri).is_none() { + return Err(Error::IntegrityError(sri.clone(), writer_sri)).with_context(|| { + format!( + "Failed to verify data integrity while inserting {} into cache at {:?}", + key, cache + ) + })?; } } else { self.opts.sri = Some(writer_sri); } if let Some(size) = self.opts.size { if size != self.written { - return Err(Error::SizeError)?; + return Err(Error::SizeError(size, self.written)).with_context(|| { + format!("A size was passed in but the value inserted into {} could not be verified for cache at {:?}", key, cache) + })?; } } - Ok(index::insert(&self.cache, &self.key, self.opts)?) + index::insert(&cache, &key, self.opts).with_context(|| { + format!( + "Failed to write index entry for {} in cache at {:?}", + key, cache + ) + }) } } diff --git a/src/rm.rs b/src/rm.rs index 3b435b6..899c2c4 100644 --- a/src/rm.rs +++ b/src/rm.rs @@ -4,7 +4,7 @@ use std::path::Path; use async_std::fs as afs; -use anyhow::Result; +use anyhow::{Context, Result}; use ssri::Integrity; use crate::content::rm; @@ -43,7 +43,15 @@ where P: AsRef, K: AsRef, { - index::delete_async(cache.as_ref(), key.as_ref()).await + index::delete_async(cache.as_ref(), key.as_ref()) + .await + .with_context(|| { + format!( + "Failed to delete cache entry for {} in cache at {:?}", + key.as_ref(), + cache.as_ref() + ) + }) } /// Removes an individual content entry. Any index entries pointing to this @@ -76,7 +84,13 @@ where /// # } /// ``` pub async fn content>(cache: P, sri: &Integrity) -> Result<()> { - rm::rm_async(cache.as_ref(), &sri).await + rm::rm_async(cache.as_ref(), &sri).await.with_context(|| { + format!( + "Failed to remove content under {} in cache at {:?}", + sri.to_string(), + cache.as_ref() + ) + }) } /// Removes entire contents of the cache, including temporary files, the entry @@ -140,7 +154,13 @@ where P: AsRef, K: AsRef, { - index::delete(cache.as_ref(), key.as_ref()) + index::delete(cache.as_ref(), key.as_ref()).with_context(|| { + format!( + "Failed to delete cache entry for {} in cache at {:?}", + key.as_ref(), + cache.as_ref() + ) + }) } /// Removes an individual content entry synchronously. Any index entries @@ -165,7 +185,13 @@ where /// # } /// ``` pub fn content_sync>(cache: P, sri: &Integrity) -> Result<()> { - rm::rm(cache.as_ref(), &sri) + rm::rm(cache.as_ref(), &sri).with_context(|| { + format!( + "Failed to remove content under {} in cache at {:?}", + sri.to_string(), + cache.as_ref() + ) + }) } /// Removes entire contents of the cache synchronously, including temporary