diff --git a/Cargo.lock b/Cargo.lock index 3572667..c87803f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,16 +8,6 @@ dependencies = [ "nodrop 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "atomicwrites" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "nix 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)", - "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", - "winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "atty" version = "0.2.11" @@ -100,7 +90,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" name = "cacache" version = "0.2.0" dependencies = [ - "atomicwrites 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)", "chownr 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)", "criterion 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)", "digest 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", @@ -411,18 +400,6 @@ dependencies = [ "proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "rand" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "fuchsia-cprng 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)", - "libc 0.2.58 (registry+https://github.com/rust-lang/crates.io-index)", - "rand_core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", - "rdrand 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", - "winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "rand" version = "0.6.5" @@ -685,15 +662,6 @@ dependencies = [ "unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] -[[package]] -name = "tempdir" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -dependencies = [ - "rand 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", - "remove_dir_all 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)", -] - [[package]] name = "tempfile" version = "3.0.8" @@ -794,7 +762,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" [metadata] "checksum arrayvec 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)" = "92c7fb76bc8826a8b33b4ee5bb07a247a81e76764ab4d55e8f73e3a4d8808c71" -"checksum atomicwrites 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c4e1aa99513c90202b4b04cfbe3c9d51dd914f2e26215a4caa76574b00bb6393" "checksum atty 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "9a7d5b8723950951411ee34d271d99dddcc2035a16ab25310ea2c8cfd4369652" "checksum autocfg 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "0e49efa51329a5fd37e7c79db4621af617cd4e3e5bc224939808d076077077bf" "checksum backtrace 0.3.31 (registry+https://github.com/rust-lang/crates.io-index)" = "e0f77aa27f55a4beb477ff6bc4d9bf72f90eb422b19c1d8e5a644b8aeb674d66" @@ -842,7 +809,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum opaque-debug 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "93f5bb2e8e8dec81642920ccff6b61f1eb94fa3020c5a325c9851ff604152409" "checksum proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)" = "cf3d2011ab5c909338f7887f4fc896d35932e29146c12c8d01da6b22a80ba759" "checksum quote 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)" = "faf4799c5d274f3868a4aae320a0a182cbd2baee377b378f080e16a23e9d80db" -"checksum rand 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293" "checksum rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)" = "6d71dacdc3c88c1fde3885a3be3fbab9f35724e6ce99467f7d9c5026132184ca" "checksum rand_chacha 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "556d3a1ca6600bfcbab7c7c91ccb085ac7fbbcd70e008a98742e7847f4f7bcef" "checksum rand_core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b" @@ -872,7 +838,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" "checksum ssri 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "251b1e2911708b842bcfcf544f8aa77f96fdefa40dcbafca591eb2acb5b817bb" "checksum syn 0.15.38 (registry+https://github.com/rust-lang/crates.io-index)" = "37ea458a750f59ab679b47fef9b6722c586c5742f4cfe18a120bbc807e5e01fd" "checksum synstructure 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)" = "02353edf96d6e4dc81aea2d8490a7e9db177bf8acb0e951c24940bf866cb313f" -"checksum tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8" "checksum tempfile 3.0.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7dc4738f2e68ed2855de5ac9cdbe05c9216773ecde4739b2f095002ab03a13ef" "checksum termion 1.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "6a8fb22f7cde82c8220e5aeacb3258ed7ce996142c77cba193f203515e26c330" "checksum textwrap 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" diff --git a/Cargo.toml b/Cargo.toml index a44ad3c..418e59e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,6 @@ homepage = "https://github.com/zkat/cacache-rs" [dependencies] ssri = "2.0.0" hex = "0.3.2" -atomicwrites = "0.2.3" tempfile = "3.0.8" sha-1 = "0.8.1" sha2 = "0.8.0" diff --git a/src/content/write.rs b/src/content/write.rs index 1d40a8b..50c1148 100644 --- a/src/content/write.rs +++ b/src/content/write.rs @@ -1,20 +1,52 @@ use std::fs::DirBuilder; use std::io::prelude::*; -use std::path::Path; +use std::path::{Path, PathBuf}; -use atomicwrites::{AtomicFile, AllowOverwrite}; -use ssri::Integrity; +use ssri::{Algorithm, Integrity, IntegrityOpts}; +use tempfile::NamedTempFile; use crate::content::path; use crate::errors::Error; -pub fn write(cache: &Path, data: &[u8]) -> Result { - let sri = Integrity::from(&data); - let cpath = path::content_path(&cache, &sri); - DirBuilder::new().recursive(true).create(cpath.parent().unwrap())?; - let file = AtomicFile::new(&cpath, AllowOverwrite); - file.write(|f| f.write_all(&data))?; - Ok(sri) +pub struct Writer { + cache: PathBuf, + builder: IntegrityOpts, + tmpfile: NamedTempFile, +} + +impl Writer { + pub fn new(cache: &Path, algo: Algorithm) -> Result { + let cache_path = cache.to_path_buf(); + let mut tmp_path = cache_path.clone(); + tmp_path.push("tmp"); + DirBuilder::new().recursive(true).create(&tmp_path)?; + Ok(Writer { + cache: cache_path, + builder: IntegrityOpts::new().algorithm(algo), + tmpfile: NamedTempFile::new_in(tmp_path)?, + }) + } + + pub fn close(self) -> Result { + let sri = self.builder.result(); + let cpath = path::content_path(&self.cache, &sri); + DirBuilder::new() + .recursive(true) + .create(cpath.parent().unwrap())?; + self.tmpfile.persist(cpath)?; + Ok(sri) + } +} + +impl Write for Writer { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.builder.input(&buf); + self.tmpfile.write(&buf) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.tmpfile.flush() + } } #[cfg(test)] @@ -25,11 +57,10 @@ mod tests { fn basic_write() { let tmp = tempfile::tempdir().unwrap(); let dir = tmp.path().to_owned(); - let sri = write(&dir, b"hello world").unwrap(); - assert_eq!( - sri.to_string(), - Integrity::from(b"hello world").to_string() - ); + let mut writer = Writer::new(&dir, Algorithm::Sha256).unwrap(); + writer.write_all(b"hello world").unwrap(); + let sri = writer.close().unwrap(); + assert_eq!(sri.to_string(), Integrity::from(b"hello world").to_string()); assert_eq!( std::fs::read(path::content_path(&dir, &sri)).unwrap(), b"hello world" diff --git a/src/errors.rs b/src/errors.rs index ca8b25d..9c9cd5a 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,9 +1,9 @@ use std::io; -use atomicwrites; use chownr; use failure::Fail; use serde_json; +use tempfile; use walkdir; /// Error type returned by all API calls. @@ -22,7 +22,7 @@ pub enum Error { #[fail(display = "{}", _0)] SerdeJson(#[fail(cause)] serde_json::error::Error), #[fail(display = "{}", _0)] - AtomicWrite(#[fail(cause)] atomicwrites::Error), + PersistError(#[fail(cause)] tempfile::PersistError), #[fail(display = "{}", _0)] WalkDir(#[fail(cause)] walkdir::Error), } @@ -45,9 +45,9 @@ impl From for Error { } } -impl From> for Error { - fn from(error: atomicwrites::Error) -> Self { - Error::AtomicWrite(error) +impl From for Error { + fn from(error: tempfile::PersistError) -> Self { + Error::PersistError(error) } } diff --git a/src/index.rs b/src/index.rs index 3c162fd..b5ed3f2 100644 --- a/src/index.rs +++ b/src/index.rs @@ -16,8 +16,8 @@ use sha2::Sha256; use ssri::Integrity; use walkdir::WalkDir; -use crate::put::Writer; use crate::errors::Error; +use crate::put::PutOpts; const INDEX_VERSION: &str = "5"; @@ -40,17 +40,17 @@ struct SerializableEntry { metadata: Value, } -pub fn insert(inserter: Writer) -> Result { - let bucket = bucket_path(&inserter.cache, &inserter.key); +pub fn insert(cache: &Path, key: &str, opts: PutOpts) -> Result { + let bucket = bucket_path(&cache, &key); if let Some(path) = mkdirp::mkdirp(bucket.parent().unwrap())? { - chownr::chownr(&path, inserter.uid, inserter.gid)?; + chownr::chownr(&path, opts.uid, opts.gid)?; } let stringified = serde_json::to_string(&SerializableEntry { - key: inserter.key.to_owned(), - integrity: inserter.sri.clone().map(|x| x.to_string()), - time: inserter.time.unwrap_or_else(now), - size: inserter.size.unwrap_or(0), - metadata: inserter.metadata.unwrap_or_else(|| json!(null)), + key: key.to_owned(), + integrity: opts.sri.clone().map(|x| x.to_string()), + time: opts.time.unwrap_or_else(now), + size: opts.size.unwrap_or(0), + metadata: opts.metadata.unwrap_or_else(|| json!(null)), })?; let str = format!("\n{}\t{}", hash_entry(&stringified), stringified); OpenOptions::new() @@ -58,47 +58,50 @@ pub fn insert(inserter: Writer) -> Result { .append(true) .open(&bucket)? .write_all(&str.into_bytes())?; - chownr::chownr(&bucket, inserter.uid, inserter.gid)?; - Ok(inserter.sri.unwrap_or_else(|| "sha1-deadbeef".parse::().unwrap())) + chownr::chownr(&bucket, opts.uid, opts.gid)?; + Ok(opts + .sri + .unwrap_or_else(|| "sha1-deadbeef".parse::().unwrap())) } pub fn find(cache: &Path, key: &str) -> Result, Error> { let bucket = bucket_path(cache, &key); - Ok(bucket_entries(&bucket)?.into_iter().fold(None, |acc, entry| { - if entry.key == key { - if let Some(integrity) = entry.integrity { - let integrity: Integrity = match integrity.parse() { - Ok(sri) => sri, - _ => return acc - }; - Some(Entry { - key: entry.key, - integrity, - size: entry.size, - time: entry.time, - metadata: entry.metadata - }) + Ok(bucket_entries(&bucket)? + .into_iter() + .fold(None, |acc, entry| { + if entry.key == key { + if let Some(integrity) = entry.integrity { + let integrity: Integrity = match integrity.parse() { + Ok(sri) => sri, + _ => return acc, + }; + Some(Entry { + key: entry.key, + integrity, + size: entry.size, + time: entry.time, + metadata: entry.metadata, + }) + } else { + None + } } else { - None + acc } - } else { - acc - } - })) + })) } pub fn delete(cache: &Path, key: &str) -> Result<(), Error> { - let inserter = Writer { - cache: cache.to_path_buf(), - key: String::from(key), - size: None, - sri: None, - time: None, - metadata: None, - uid: None, - gid: None, - }; - insert(inserter)?; + insert(cache, key, PutOpts { + algorithm: None, + size: None, + sri: None, + time: None, + metadata: None, + uid: None, + gid: None, + } + )?; Ok(()) } @@ -106,37 +109,37 @@ pub fn ls(cache: &Path) -> impl Iterator> { let mut path = PathBuf::new(); path.push(cache); path.push(format!("index-v{}", INDEX_VERSION)); - WalkDir::new(path).into_iter().map(|bucket| { - let bucket = bucket?; - if bucket.file_type().is_dir() { - return Ok(core::iter::empty().collect::>()) - } - let entries = bucket_entries(bucket.path())?; - let mut dedupe: HashMap = HashMap::new(); - for entry in entries { - dedupe.insert(entry.key.clone(), entry); - } - let iter = dedupe - .into_iter() - .filter(|se| se.1.integrity.is_some()) - .map(|se| { - let se = se.1; - Entry { - key: se.key, - integrity: se.integrity.unwrap().parse().unwrap(), - time: se.time, - size: se.size, - metadata: se.metadata, - } - }); - Ok(iter.collect::>()) - }) - .flat_map(|res| { - match res { + WalkDir::new(path) + .into_iter() + .map(|bucket| { + let bucket = bucket?; + if bucket.file_type().is_dir() { + return Ok(core::iter::empty().collect::>()); + } + let entries = bucket_entries(bucket.path())?; + let mut dedupe: HashMap = HashMap::new(); + for entry in entries { + dedupe.insert(entry.key.clone(), entry); + } + let iter = dedupe + .into_iter() + .filter(|se| se.1.integrity.is_some()) + .map(|se| { + let se = se.1; + Entry { + key: se.key, + integrity: se.integrity.unwrap().parse().unwrap(), + time: se.time, + size: se.size, + metadata: se.metadata, + } + }); + Ok(iter.collect::>()) + }) + .flat_map(|res| match res { Ok(it) => Left(it.into_iter().map(Ok)), - Err(err) => Right(std::iter::once(Err(err))) - } - }) + Err(err) => Right(std::iter::once(Err(err))), + }) } fn bucket_path(cache: &Path, key: &str) -> PathBuf { @@ -176,17 +179,19 @@ fn bucket_entries(bucket: &Path) -> Result, Error> { err => err, }?; Ok(lines.split('\n').fold(vec![], |mut acc, entry: &str| { - if entry.is_empty() { return acc } + if entry.is_empty() { + return acc; + } let entry_str = match entry.split('\t').collect::>()[..] { [hash, entry_str] => { if hash_entry(entry_str) != hash { // Hash is no good! Corruption or malice? Doesn't matter! // EJECT EJECT - return acc + return acc; } else { entry_str } - }, + } // Something's wrong with the entry. Abort. _ => return acc, }; @@ -210,10 +215,8 @@ mod tests { let dir = tmp.path().to_owned(); let sri: Integrity = "sha1-deadbeef".parse().unwrap(); let time = 1_234_567; - let writer = Writer::new(&dir, "hello") - .integrity(sri) - .time(time); - insert(writer).unwrap(); + let opts = PutOpts::new().integrity(sri).time(time); + insert(&dir, "hello", opts).unwrap(); let entry = std::fs::read_to_string(bucket_path(&dir, "hello")).unwrap(); assert_eq!(entry, MOCK_ENTRY); } @@ -253,10 +256,8 @@ mod tests { let dir = tmp.path().to_owned(); let sri: Integrity = "sha1-deadbeef".parse().unwrap(); let time = 1_234_567; - let writer = Writer::new(&dir, "hello") - .integrity(sri) - .time(time); - insert(writer).unwrap(); + let opts = PutOpts::new().integrity(sri).time(time); + insert(&dir, "hello", opts).unwrap(); delete(&dir, "hello").unwrap(); assert_eq!(find(&dir, "hello").unwrap(), None); } @@ -267,14 +268,10 @@ mod tests { let dir = tmp.path().to_owned(); let sri: Integrity = "sha1-deadbeef".parse().unwrap(); let time = 1_234_567; - let writer = Writer::new(&dir, "hello") - .integrity(sri.clone()) - .time(time); - insert(writer).unwrap(); - let writer = Writer::new(&dir, "world") - .integrity(sri.clone()) - .time(time); - insert(writer).unwrap(); + let opts = PutOpts::new().integrity(sri.clone()).time(time); + insert(&dir, "hello", opts).unwrap(); + let opts = PutOpts::new().integrity(sri).time(time); + insert(&dir, "world", opts).unwrap(); let mut entries = ls(&dir) .map(|x| Ok(x?.key)) diff --git a/src/put.rs b/src/put.rs index 113b413..80aef07 100644 --- a/src/put.rs +++ b/src/put.rs @@ -1,22 +1,31 @@ //! Functions for writing to cache. +use std::io::prelude::*; use std::path::{Path, PathBuf}; -use nix::unistd::{Uid, Gid}; +use nix::unistd::{Gid, Uid}; use serde_json::Value; -use ssri::Integrity; +use ssri::{Algorithm, Integrity}; use crate::content::write; -use crate::index; use crate::errors::Error; +use crate::index; -pub fn data, D: AsRef<[u8]>, K: AsRef>(cache: P, key: K, data: D) -> Result { - let sri = write::write(cache.as_ref(), data.as_ref())?; - Writer::new(cache.as_ref(), key.as_ref()).integrity(sri).commit(data) +pub fn data(cache: P, key: K, data: D) -> Result +where + P: AsRef, + D: AsRef<[u8]>, + K: AsRef, +{ + let mut writer = PutOpts::new() + .algorithm(Algorithm::Sha256) + .open(cache.as_ref(), key.as_ref())?; + writer.write_all(data.as_ref())?; + writer.commit() } -pub struct Writer { - pub cache: PathBuf, - pub key: String, +#[derive(Default)] +pub struct PutOpts { + pub algorithm: Option, pub sri: Option, pub size: Option, pub time: Option, @@ -25,18 +34,31 @@ pub struct Writer { pub gid: Option, } -impl Writer { - pub fn new, K: AsRef>(cache: P, key: K) -> Writer { - Writer { +impl PutOpts { + pub fn new() -> PutOpts { + Default::default() + } + + pub fn open(self, cache: P, key: K) -> Result + where + P: AsRef, + K: AsRef, + { + Ok(Put { cache: cache.as_ref().to_path_buf(), key: String::from(key.as_ref()), - sri: None, - size: None, - time: None, - metadata: None, - uid: None, - gid: None - } + written: 0, + writer: write::Writer::new( + cache.as_ref(), + self.algorithm.as_ref().unwrap_or(&Algorithm::Sha256).clone() + )?, + opts: self, + }) + } + + pub fn algorithm(mut self, algo: Algorithm) -> Self { + self.algorithm = Some(algo); + self } pub fn size(mut self, size: usize) -> Self { @@ -64,20 +86,46 @@ impl Writer { self.gid = gid; self } +} - pub fn commit>(self, data: D) -> Result { - if let Some(sri) = &self.sri { - if sri.check(&data).is_none() { +pub struct Put { + pub cache: PathBuf, + pub key: String, + pub written: usize, + pub(crate) writer: write::Writer, + pub opts: PutOpts, +} + +impl Write for Put { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.writer.write(buf) + } + fn flush(&mut self) -> std::io::Result<()> { + self.writer.flush() + } +} + +impl Put { + pub fn commit(self) -> Result { + let writer_sri = self.writer.close()?; + if let Some(sri) = &self.opts.sri { + // TODO - ssri should have a .matches method + let algo = sri.pick_algorithm(); + let matched = sri + .hashes + .iter() + .take_while(|h| h.algorithm == algo) + .find(|&h| *h == writer_sri.hashes[0]); + if matched.is_none() { return Err(Error::IntegrityError); } } - if let Some(size) = self.size { - if size != data.as_ref().len() { + if let Some(size) = self.opts.size { + if size != self.written { return Err(Error::SizeError); } } - let sri = write::write(&self.cache, data.as_ref())?; - index::insert(self)?; - Ok(sri) + index::insert(&self.cache, &self.key, self.opts)?; + Ok(writer_sri) } }