feat(put): Add put::Put and put::PutOpts

This commit is contained in:
Kat Marchán 2019-06-26 21:32:12 -07:00
parent 248b631d77
commit 15f017fe21
No known key found for this signature in database
GPG Key ID: AEB529C08A3C7E9E
6 changed files with 213 additions and 173 deletions

35
Cargo.lock generated
View File

@ -8,16 +8,6 @@ dependencies = [
"nodrop 0.1.13 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "atomicwrites"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"nix 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "atty"
version = "0.2.11"
@ -100,7 +90,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
name = "cacache"
version = "0.2.0"
dependencies = [
"atomicwrites 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)",
"chownr 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
"criterion 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
"digest 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
@ -411,18 +400,6 @@ dependencies = [
"proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "rand"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"fuchsia-cprng 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.58 (registry+https://github.com/rust-lang/crates.io-index)",
"rand_core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
"rdrand 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"winapi 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "rand"
version = "0.6.5"
@ -685,15 +662,6 @@ dependencies = [
"unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tempdir"
version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"rand 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
"remove_dir_all 0.5.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "tempfile"
version = "3.0.8"
@ -794,7 +762,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[metadata]
"checksum arrayvec 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)" = "92c7fb76bc8826a8b33b4ee5bb07a247a81e76764ab4d55e8f73e3a4d8808c71"
"checksum atomicwrites 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c4e1aa99513c90202b4b04cfbe3c9d51dd914f2e26215a4caa76574b00bb6393"
"checksum atty 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)" = "9a7d5b8723950951411ee34d271d99dddcc2035a16ab25310ea2c8cfd4369652"
"checksum autocfg 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "0e49efa51329a5fd37e7c79db4621af617cd4e3e5bc224939808d076077077bf"
"checksum backtrace 0.3.31 (registry+https://github.com/rust-lang/crates.io-index)" = "e0f77aa27f55a4beb477ff6bc4d9bf72f90eb422b19c1d8e5a644b8aeb674d66"
@ -842,7 +809,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum opaque-debug 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "93f5bb2e8e8dec81642920ccff6b61f1eb94fa3020c5a325c9851ff604152409"
"checksum proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)" = "cf3d2011ab5c909338f7887f4fc896d35932e29146c12c8d01da6b22a80ba759"
"checksum quote 0.6.12 (registry+https://github.com/rust-lang/crates.io-index)" = "faf4799c5d274f3868a4aae320a0a182cbd2baee377b378f080e16a23e9d80db"
"checksum rand 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)" = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293"
"checksum rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)" = "6d71dacdc3c88c1fde3885a3be3fbab9f35724e6ce99467f7d9c5026132184ca"
"checksum rand_chacha 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "556d3a1ca6600bfcbab7c7c91ccb085ac7fbbcd70e008a98742e7847f4f7bcef"
"checksum rand_core 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)" = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b"
@ -872,7 +838,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum ssri 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "251b1e2911708b842bcfcf544f8aa77f96fdefa40dcbafca591eb2acb5b817bb"
"checksum syn 0.15.38 (registry+https://github.com/rust-lang/crates.io-index)" = "37ea458a750f59ab679b47fef9b6722c586c5742f4cfe18a120bbc807e5e01fd"
"checksum synstructure 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)" = "02353edf96d6e4dc81aea2d8490a7e9db177bf8acb0e951c24940bf866cb313f"
"checksum tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)" = "15f2b5fb00ccdf689e0149d1b1b3c03fead81c2b37735d812fa8bddbbf41b6d8"
"checksum tempfile 3.0.8 (registry+https://github.com/rust-lang/crates.io-index)" = "7dc4738f2e68ed2855de5ac9cdbe05c9216773ecde4739b2f095002ab03a13ef"
"checksum termion 1.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "6a8fb22f7cde82c8220e5aeacb3258ed7ce996142c77cba193f203515e26c330"
"checksum textwrap 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060"

View File

@ -11,7 +11,6 @@ homepage = "https://github.com/zkat/cacache-rs"
[dependencies]
ssri = "2.0.0"
hex = "0.3.2"
atomicwrites = "0.2.3"
tempfile = "3.0.8"
sha-1 = "0.8.1"
sha2 = "0.8.0"

View File

@ -1,20 +1,52 @@
use std::fs::DirBuilder;
use std::io::prelude::*;
use std::path::Path;
use std::path::{Path, PathBuf};
use atomicwrites::{AtomicFile, AllowOverwrite};
use ssri::Integrity;
use ssri::{Algorithm, Integrity, IntegrityOpts};
use tempfile::NamedTempFile;
use crate::content::path;
use crate::errors::Error;
pub fn write(cache: &Path, data: &[u8]) -> Result<Integrity, Error> {
let sri = Integrity::from(&data);
let cpath = path::content_path(&cache, &sri);
DirBuilder::new().recursive(true).create(cpath.parent().unwrap())?;
let file = AtomicFile::new(&cpath, AllowOverwrite);
file.write(|f| f.write_all(&data))?;
Ok(sri)
/// Streaming content writer for the cache.
///
/// Data written through the `Write` impl goes to a temporary file while
/// `builder` is fed the same bytes; `close` then moves the temporary file to
/// the content path derived from the resulting integrity value.
pub struct Writer {
// Root directory of the cache this writer stores content in.
cache: PathBuf,
// Accumulates the integrity hash of the data passed to `write`.
builder: IntegrityOpts,
// Temporary file, created under `<cache>/tmp`, that receives the data.
tmpfile: NamedTempFile,
}
impl Writer {
    /// Creates a writer for the cache rooted at `cache`, hashing with `algo`.
    ///
    /// The `<cache>/tmp` directory is created if it does not exist and a new
    /// named temporary file is opened inside it to receive the data.
    ///
    /// # Errors
    ///
    /// Fails if the directory or the temporary file cannot be created.
    pub fn new(cache: &Path, algo: Algorithm) -> Result<Writer, Error> {
        let tmp_dir = cache.join("tmp");
        DirBuilder::new().recursive(true).create(&tmp_dir)?;
        let tmpfile = NamedTempFile::new_in(tmp_dir)?;
        Ok(Writer {
            cache: cache.to_path_buf(),
            builder: IntegrityOpts::new().algorithm(algo),
            tmpfile,
        })
    }

    /// Finishes the write and returns the integrity value of the data.
    ///
    /// The hash is finalised, the parent directory of the matching content
    /// path is created, and the temporary file is persisted to that path.
    ///
    /// # Errors
    ///
    /// Fails if the directory cannot be created or the file cannot be
    /// persisted.
    pub fn close(self) -> Result<Integrity, Error> {
        let Writer {
            cache,
            builder,
            tmpfile,
        } = self;
        let sri = builder.result();
        let dest = path::content_path(&cache, &sri);
        let parent = dest.parent().unwrap();
        DirBuilder::new().recursive(true).create(parent)?;
        tmpfile.persist(dest)?;
        Ok(sri)
    }
}
/// Streams bytes into the temporary file while hashing exactly what was stored.
impl Write for Writer {
    /// Writes `buf` to the temporary file and feeds the accepted bytes to the
    /// integrity builder.
    ///
    /// Returns the number of bytes the temporary file accepted, which may be
    /// fewer than `buf.len()`.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let accepted = self.tmpfile.write(buf)?;
        // Hash only what actually reached the file. `Write::write` may accept
        // a prefix of `buf` (or fail with a retryable error); callers such as
        // `write_all` then resubmit the remainder, so hashing the whole buffer
        // up front would count those bytes twice and corrupt the digest.
        self.builder.input(&buf[..accepted]);
        Ok(accepted)
    }

    /// Flushes the underlying temporary file.
    fn flush(&mut self) -> std::io::Result<()> {
        self.tmpfile.flush()
    }
}
#[cfg(test)]
@ -25,11 +57,10 @@ mod tests {
fn basic_write() {
let tmp = tempfile::tempdir().unwrap();
let dir = tmp.path().to_owned();
let sri = write(&dir, b"hello world").unwrap();
assert_eq!(
sri.to_string(),
Integrity::from(b"hello world").to_string()
);
let mut writer = Writer::new(&dir, Algorithm::Sha256).unwrap();
writer.write_all(b"hello world").unwrap();
let sri = writer.close().unwrap();
assert_eq!(sri.to_string(), Integrity::from(b"hello world").to_string());
assert_eq!(
std::fs::read(path::content_path(&dir, &sri)).unwrap(),
b"hello world"

View File

@ -1,9 +1,9 @@
use std::io;
use atomicwrites;
use chownr;
use failure::Fail;
use serde_json;
use tempfile;
use walkdir;
/// Error type returned by all API calls.
@ -22,7 +22,7 @@ pub enum Error {
#[fail(display = "{}", _0)]
SerdeJson(#[fail(cause)] serde_json::error::Error),
#[fail(display = "{}", _0)]
AtomicWrite(#[fail(cause)] atomicwrites::Error<io::Error>),
PersistError(#[fail(cause)] tempfile::PersistError),
#[fail(display = "{}", _0)]
WalkDir(#[fail(cause)] walkdir::Error),
}
@ -45,9 +45,9 @@ impl From<serde_json::error::Error> for Error {
}
}
impl From<atomicwrites::Error<io::Error>> for Error {
fn from(error: atomicwrites::Error<io::Error>) -> Self {
Error::AtomicWrite(error)
// Conversion from `tempfile::PersistError` into the crate's `Error`, wrapping
// it in the `Error::PersistError` variant so `?` can be applied to results
// carrying that error type.
impl From<tempfile::PersistError> for Error {
fn from(error: tempfile::PersistError) -> Self {
Error::PersistError(error)
}
}

View File

@ -16,8 +16,8 @@ use sha2::Sha256;
use ssri::Integrity;
use walkdir::WalkDir;
use crate::put::Writer;
use crate::errors::Error;
use crate::put::PutOpts;
const INDEX_VERSION: &str = "5";
@ -40,17 +40,17 @@ struct SerializableEntry {
metadata: Value,
}
pub fn insert(inserter: Writer) -> Result<Integrity, Error> {
let bucket = bucket_path(&inserter.cache, &inserter.key);
pub fn insert(cache: &Path, key: &str, opts: PutOpts) -> Result<Integrity, Error> {
let bucket = bucket_path(&cache, &key);
if let Some(path) = mkdirp::mkdirp(bucket.parent().unwrap())? {
chownr::chownr(&path, inserter.uid, inserter.gid)?;
chownr::chownr(&path, opts.uid, opts.gid)?;
}
let stringified = serde_json::to_string(&SerializableEntry {
key: inserter.key.to_owned(),
integrity: inserter.sri.clone().map(|x| x.to_string()),
time: inserter.time.unwrap_or_else(now),
size: inserter.size.unwrap_or(0),
metadata: inserter.metadata.unwrap_or_else(|| json!(null)),
key: key.to_owned(),
integrity: opts.sri.clone().map(|x| x.to_string()),
time: opts.time.unwrap_or_else(now),
size: opts.size.unwrap_or(0),
metadata: opts.metadata.unwrap_or_else(|| json!(null)),
})?;
let str = format!("\n{}\t{}", hash_entry(&stringified), stringified);
OpenOptions::new()
@ -58,47 +58,50 @@ pub fn insert(inserter: Writer) -> Result<Integrity, Error> {
.append(true)
.open(&bucket)?
.write_all(&str.into_bytes())?;
chownr::chownr(&bucket, inserter.uid, inserter.gid)?;
Ok(inserter.sri.unwrap_or_else(|| "sha1-deadbeef".parse::<Integrity>().unwrap()))
chownr::chownr(&bucket, opts.uid, opts.gid)?;
Ok(opts
.sri
.unwrap_or_else(|| "sha1-deadbeef".parse::<Integrity>().unwrap()))
}
pub fn find(cache: &Path, key: &str) -> Result<Option<Entry>, Error> {
let bucket = bucket_path(cache, &key);
Ok(bucket_entries(&bucket)?.into_iter().fold(None, |acc, entry| {
if entry.key == key {
if let Some(integrity) = entry.integrity {
let integrity: Integrity = match integrity.parse() {
Ok(sri) => sri,
_ => return acc
};
Some(Entry {
key: entry.key,
integrity,
size: entry.size,
time: entry.time,
metadata: entry.metadata
})
Ok(bucket_entries(&bucket)?
.into_iter()
.fold(None, |acc, entry| {
if entry.key == key {
if let Some(integrity) = entry.integrity {
let integrity: Integrity = match integrity.parse() {
Ok(sri) => sri,
_ => return acc,
};
Some(Entry {
key: entry.key,
integrity,
size: entry.size,
time: entry.time,
metadata: entry.metadata,
})
} else {
None
}
} else {
None
acc
}
} else {
acc
}
}))
}))
}
pub fn delete(cache: &Path, key: &str) -> Result<(), Error> {
let inserter = Writer {
cache: cache.to_path_buf(),
key: String::from(key),
size: None,
sri: None,
time: None,
metadata: None,
uid: None,
gid: None,
};
insert(inserter)?;
insert(cache, key, PutOpts {
algorithm: None,
size: None,
sri: None,
time: None,
metadata: None,
uid: None,
gid: None,
}
)?;
Ok(())
}
@ -106,37 +109,37 @@ pub fn ls(cache: &Path) -> impl Iterator<Item = Result<Entry, Error>> {
let mut path = PathBuf::new();
path.push(cache);
path.push(format!("index-v{}", INDEX_VERSION));
WalkDir::new(path).into_iter().map(|bucket| {
let bucket = bucket?;
if bucket.file_type().is_dir() {
return Ok(core::iter::empty().collect::<Vec<Entry>>())
}
let entries = bucket_entries(bucket.path())?;
let mut dedupe: HashMap<String, SerializableEntry> = HashMap::new();
for entry in entries {
dedupe.insert(entry.key.clone(), entry);
}
let iter = dedupe
.into_iter()
.filter(|se| se.1.integrity.is_some())
.map(|se| {
let se = se.1;
Entry {
key: se.key,
integrity: se.integrity.unwrap().parse().unwrap(),
time: se.time,
size: se.size,
metadata: se.metadata,
}
});
Ok(iter.collect::<Vec<Entry>>())
})
.flat_map(|res| {
match res {
WalkDir::new(path)
.into_iter()
.map(|bucket| {
let bucket = bucket?;
if bucket.file_type().is_dir() {
return Ok(core::iter::empty().collect::<Vec<Entry>>());
}
let entries = bucket_entries(bucket.path())?;
let mut dedupe: HashMap<String, SerializableEntry> = HashMap::new();
for entry in entries {
dedupe.insert(entry.key.clone(), entry);
}
let iter = dedupe
.into_iter()
.filter(|se| se.1.integrity.is_some())
.map(|se| {
let se = se.1;
Entry {
key: se.key,
integrity: se.integrity.unwrap().parse().unwrap(),
time: se.time,
size: se.size,
metadata: se.metadata,
}
});
Ok(iter.collect::<Vec<Entry>>())
})
.flat_map(|res| match res {
Ok(it) => Left(it.into_iter().map(Ok)),
Err(err) => Right(std::iter::once(Err(err)))
}
})
Err(err) => Right(std::iter::once(Err(err))),
})
}
fn bucket_path(cache: &Path, key: &str) -> PathBuf {
@ -176,17 +179,19 @@ fn bucket_entries(bucket: &Path) -> Result<Vec<SerializableEntry>, Error> {
err => err,
}?;
Ok(lines.split('\n').fold(vec![], |mut acc, entry: &str| {
if entry.is_empty() { return acc }
if entry.is_empty() {
return acc;
}
let entry_str = match entry.split('\t').collect::<Vec<&str>>()[..] {
[hash, entry_str] => {
if hash_entry(entry_str) != hash {
// Hash is no good! Corruption or malice? Doesn't matter!
// EJECT EJECT
return acc
return acc;
} else {
entry_str
}
},
}
// Something's wrong with the entry. Abort.
_ => return acc,
};
@ -210,10 +215,8 @@ mod tests {
let dir = tmp.path().to_owned();
let sri: Integrity = "sha1-deadbeef".parse().unwrap();
let time = 1_234_567;
let writer = Writer::new(&dir, "hello")
.integrity(sri)
.time(time);
insert(writer).unwrap();
let opts = PutOpts::new().integrity(sri).time(time);
insert(&dir, "hello", opts).unwrap();
let entry = std::fs::read_to_string(bucket_path(&dir, "hello")).unwrap();
assert_eq!(entry, MOCK_ENTRY);
}
@ -253,10 +256,8 @@ mod tests {
let dir = tmp.path().to_owned();
let sri: Integrity = "sha1-deadbeef".parse().unwrap();
let time = 1_234_567;
let writer = Writer::new(&dir, "hello")
.integrity(sri)
.time(time);
insert(writer).unwrap();
let opts = PutOpts::new().integrity(sri).time(time);
insert(&dir, "hello", opts).unwrap();
delete(&dir, "hello").unwrap();
assert_eq!(find(&dir, "hello").unwrap(), None);
}
@ -267,14 +268,10 @@ mod tests {
let dir = tmp.path().to_owned();
let sri: Integrity = "sha1-deadbeef".parse().unwrap();
let time = 1_234_567;
let writer = Writer::new(&dir, "hello")
.integrity(sri.clone())
.time(time);
insert(writer).unwrap();
let writer = Writer::new(&dir, "world")
.integrity(sri.clone())
.time(time);
insert(writer).unwrap();
let opts = PutOpts::new().integrity(sri.clone()).time(time);
insert(&dir, "hello", opts).unwrap();
let opts = PutOpts::new().integrity(sri).time(time);
insert(&dir, "world", opts).unwrap();
let mut entries = ls(&dir)
.map(|x| Ok(x?.key))

View File

@ -1,22 +1,31 @@
//! Functions for writing to cache.
use std::io::prelude::*;
use std::path::{Path, PathBuf};
use nix::unistd::{Uid, Gid};
use nix::unistd::{Gid, Uid};
use serde_json::Value;
use ssri::Integrity;
use ssri::{Algorithm, Integrity};
use crate::content::write;
use crate::index;
use crate::errors::Error;
use crate::index;
pub fn data<P: AsRef<Path>, D: AsRef<[u8]>, K: AsRef<str>>(cache: P, key: K, data: D) -> Result<Integrity, Error> {
let sri = write::write(cache.as_ref(), data.as_ref())?;
Writer::new(cache.as_ref(), key.as_ref()).integrity(sri).commit(data)
/// Writes `data` into the cache at `cache` under `key` and returns the
/// integrity value of the stored content.
///
/// The content is hashed with SHA-256: a `Put` is opened with that algorithm,
/// all of `data` is written to it, and the write is committed.
///
/// # Errors
///
/// Returns an error if opening the writer, writing the bytes, or committing
/// fails.
pub fn data<P, D, K>(cache: P, key: K, data: D) -> Result<Integrity, Error>
where
    P: AsRef<Path>,
    D: AsRef<[u8]>,
    K: AsRef<str>,
{
    let opts = PutOpts::new().algorithm(Algorithm::Sha256);
    let mut put = opts.open(cache.as_ref(), key.as_ref())?;
    put.write_all(data.as_ref())?;
    put.commit()
}
pub struct Writer {
pub cache: PathBuf,
pub key: String,
#[derive(Default)]
pub struct PutOpts {
pub algorithm: Option<Algorithm>,
pub sri: Option<Integrity>,
pub size: Option<usize>,
pub time: Option<u128>,
@ -25,18 +34,31 @@ pub struct Writer {
pub gid: Option<Gid>,
}
impl Writer {
pub fn new<P: AsRef<Path>, K: AsRef<str>>(cache: P, key: K) -> Writer {
Writer {
impl PutOpts {
pub fn new() -> PutOpts {
Default::default()
}
pub fn open<P, K>(self, cache: P, key: K) -> Result<Put, Error>
where
P: AsRef<Path>,
K: AsRef<str>,
{
Ok(Put {
cache: cache.as_ref().to_path_buf(),
key: String::from(key.as_ref()),
sri: None,
size: None,
time: None,
metadata: None,
uid: None,
gid: None
}
written: 0,
writer: write::Writer::new(
cache.as_ref(),
self.algorithm.as_ref().unwrap_or(&Algorithm::Sha256).clone()
)?,
opts: self,
})
}
pub fn algorithm(mut self, algo: Algorithm) -> Self {
self.algorithm = Some(algo);
self
}
pub fn size(mut self, size: usize) -> Self {
@ -64,20 +86,46 @@ impl Writer {
self.gid = gid;
self
}
}
pub fn commit<D: AsRef<[u8]>>(self, data: D) -> Result<Integrity, Error> {
if let Some(sri) = &self.sri {
if sri.check(&data).is_none() {
/// An in-progress write to the cache, as returned by `PutOpts::open`.
///
/// Data is supplied through the `Write` impl; `commit` closes the content
/// writer, checks the expected integrity and size from `opts` when they are
/// set, and inserts the index entry for `key`.
pub struct Put {
/// Root directory of the cache being written to.
pub cache: PathBuf,
/// Index key the entry is inserted under on commit.
pub key: String,
/// Byte counter; starts at 0 and is compared against `opts.size` on commit.
pub written: usize,
// Content writer that receives the data passed to `write`.
pub(crate) writer: write::Writer,
/// Options the write was opened with.
pub opts: PutOpts,
}
/// Streams data into the cache entry being written.
impl Write for Put {
    /// Forwards `buf` to the content writer and records how many bytes it
    /// accepted.
    ///
    /// Returns the number of bytes written, which may be fewer than
    /// `buf.len()`.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let count = self.writer.write(buf)?;
        // `commit` compares `written` with `opts.size`. Without this tally the
        // counter would stay at its initial 0 and every non-zero expected size
        // would be rejected with `SizeError`.
        self.written += count;
        Ok(count)
    }

    /// Flushes the underlying content writer.
    fn flush(&mut self) -> std::io::Result<()> {
        self.writer.flush()
    }
}
impl Put {
pub fn commit(self) -> Result<Integrity, Error> {
let writer_sri = self.writer.close()?;
if let Some(sri) = &self.opts.sri {
// TODO - ssri should have a .matches method
let algo = sri.pick_algorithm();
let matched = sri
.hashes
.iter()
.take_while(|h| h.algorithm == algo)
.find(|&h| *h == writer_sri.hashes[0]);
if matched.is_none() {
return Err(Error::IntegrityError);
}
}
if let Some(size) = self.size {
if size != data.as_ref().len() {
if let Some(size) = self.opts.size {
if size != self.written {
return Err(Error::SizeError);
}
}
let sri = write::write(&self.cache, data.as_ref())?;
index::insert(self)?;
Ok(sri)
index::insert(&self.cache, &self.key, self.opts)?;
Ok(writer_sri)
}
}