mirror of https://github.com/zkat/cacache-rs.git
refactor(index): idiomatise various iterators (#1)
* chore(index): idiomatise variously * chore(index): exploit hashset to dedupe entries * chore(index): format
This commit is contained in:
parent
19929c5020
commit
ad74518a9d
137
src/index.rs
137
src/index.rs
|
|
@ -1,5 +1,6 @@
|
|||
use std::collections::hash_map::HashMap;
|
||||
use std::collections::HashSet;
|
||||
use std::fs::{self, OpenOptions};
|
||||
use std::hash::{Hash, Hasher};
|
||||
use std::io::{ErrorKind, Write};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
|
@ -36,7 +37,7 @@ pub struct Entry {
|
|||
pub metadata: Value,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, PartialEq, Debug)]
|
||||
#[derive(Deserialize, Serialize, Debug)]
|
||||
struct SerializableEntry {
|
||||
key: String,
|
||||
integrity: Option<String>,
|
||||
|
|
@ -45,6 +46,20 @@ struct SerializableEntry {
|
|||
metadata: Value,
|
||||
}
|
||||
|
||||
// Two `SerializableEntry` values compare equal when their `key` fields are
// equal; every other field is ignored by this comparison.
//
// NOTE(review): `ls` collects entries into a `HashSet<SerializableEntry>`.
// With key-only equality, inserting an element equal to one already present
// leaves the existing element in place, so the first entry read for a key is
// the one kept. The removed `HashMap::insert` loop kept the last one instead.
// Confirm which entry per key is meant to survive.
impl PartialEq for SerializableEntry {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.key == other.key
|
||||
}
|
||||
}
|
||||
|
||||
// Marker impl: `HashSet` elements must be `Eq`; equality itself is the
// key-only comparison defined by the `PartialEq` impl.
impl Eq for SerializableEntry {}
|
||||
|
||||
// Hashes only `key`, the same field the `PartialEq` impl compares, so entries
// that compare equal always produce the same hash.
impl Hash for SerializableEntry {
|
||||
fn hash<H: Hasher>(&self, state: &mut H) {
|
||||
self.key.hash(state);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert(cache: &Path, key: &str, opts: PutOpts) -> Result<Integrity, Error> {
|
||||
let bucket = bucket_path(&cache, &key);
|
||||
if let Some(path) = mkdirp::mkdirp(bucket.parent().unwrap())? {
|
||||
|
|
@ -57,16 +72,15 @@ pub fn insert(cache: &Path, key: &str, opts: PutOpts) -> Result<Integrity, Error
|
|||
size: opts.size.unwrap_or(0),
|
||||
metadata: opts.metadata.unwrap_or_else(|| json!(null)),
|
||||
})?;
|
||||
let str = format!("\n{}\t{}", hash_entry(&stringified), stringified);
|
||||
OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(&bucket)?
|
||||
.write_all(&str.into_bytes())?;
|
||||
|
||||
let mut buck = OpenOptions::new().create(true).append(true).open(&bucket)?;
|
||||
|
||||
write!(buck, "\n{}\t{}", hash_entry(&stringified), stringified)?;
|
||||
chownr::chownr(&bucket, opts.uid, opts.gid)?;
|
||||
Ok(opts
|
||||
.sri
|
||||
.unwrap_or_else(|| "sha1-deadbeef".parse::<Integrity>().unwrap()))
|
||||
.or_else(|| "sha1-deadbeef".parse::<Integrity>().ok())
|
||||
.unwrap())
|
||||
}
|
||||
|
||||
pub fn find(cache: &Path, key: &str) -> Result<Option<Entry>, Error> {
|
||||
|
|
@ -109,40 +123,37 @@ pub fn delete(cache: &Path, key: &str) -> Result<(), Error> {
|
|||
uid: None,
|
||||
gid: None,
|
||||
},
|
||||
)?;
|
||||
Ok(())
|
||||
)
|
||||
.map(|_| ())
|
||||
}
|
||||
|
||||
pub fn ls(cache: &Path) -> impl Iterator<Item = Result<Entry, Error>> {
|
||||
let mut path = PathBuf::new();
|
||||
path.push(cache);
|
||||
path.push(format!("index-v{}", INDEX_VERSION));
|
||||
WalkDir::new(path)
|
||||
WalkDir::new(cache.join(format!("index-v{}", INDEX_VERSION)))
|
||||
.into_iter()
|
||||
.map(|bucket| {
|
||||
let bucket = bucket?;
|
||||
if bucket.file_type().is_dir() {
|
||||
return Ok(core::iter::empty().collect::<Vec<Entry>>());
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
let entries = bucket_entries(bucket.path())?;
|
||||
let mut dedupe: HashMap<String, SerializableEntry> = HashMap::new();
|
||||
for entry in entries {
|
||||
dedupe.insert(entry.key.clone(), entry);
|
||||
}
|
||||
let iter = dedupe
|
||||
|
||||
Ok(bucket_entries(bucket.path())?
|
||||
.into_iter()
|
||||
.filter(|se| se.1.integrity.is_some())
|
||||
.map(|se| {
|
||||
let se = se.1;
|
||||
Entry {
|
||||
key: se.key,
|
||||
integrity: se.integrity.unwrap().parse().unwrap(),
|
||||
time: se.time,
|
||||
size: se.size,
|
||||
metadata: se.metadata,
|
||||
.collect::<HashSet<SerializableEntry>>()
|
||||
.into_iter()
|
||||
.filter_map(|se| {
|
||||
if let Some(i) = se.integrity {
|
||||
Some(Entry {
|
||||
key: se.key,
|
||||
integrity: i.parse().unwrap(),
|
||||
time: se.time,
|
||||
size: se.size,
|
||||
metadata: se.metadata,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
});
|
||||
Ok(iter.collect::<Vec<Entry>>())
|
||||
})
|
||||
.collect())
|
||||
})
|
||||
.flat_map(|res| match res {
|
||||
Ok(it) => Left(it.into_iter().map(Ok)),
|
||||
|
|
@ -152,13 +163,11 @@ pub fn ls(cache: &Path) -> impl Iterator<Item = Result<Entry, Error>> {
|
|||
|
||||
// Builds the path of the index bucket file for `key` under `cache`:
//   <cache>/index-v{INDEX_VERSION}/<hashed[0..2]>/<hashed[2..4]>/<hashed[4..]>
// where `hashed` is the string returned by `hash_key(key)`.
//
// NOTE(review): this span is a rendered diff, so the removed `PathBuf::push`
// sequence and the added `Path::join` chain both appear below; they assemble
// the same components in the same order.
fn bucket_path(cache: &Path, key: &str) -> PathBuf {
|
||||
let hashed = hash_key(&key);
|
||||
let mut path = PathBuf::new();
|
||||
path.push(cache);
|
||||
path.push(format!("index-v{}", INDEX_VERSION));
|
||||
path.push(&hashed[0..2]);
|
||||
path.push(&hashed[2..4]);
|
||||
path.push(&hashed[4..]);
|
||||
path
|
||||
cache
|
||||
.join(format!("index-v{}", INDEX_VERSION))
|
||||
.join(&hashed[0..2])
|
||||
.join(&hashed[2..4])
|
||||
.join(&hashed[4..])
|
||||
}
|
||||
|
||||
fn hash_key(key: &str) -> String {
|
||||
|
|
@ -181,33 +190,29 @@ fn now() -> u128 {
|
|||
}
|
||||
|
||||
// Reads the index bucket file at `bucket` and returns the entries it holds.
//
// Each line is split on a tab character. A line is kept only when it splits
// into exactly two fields (a hash and a JSON string), `hash_entry` of the JSON
// string equals the hash field, and the JSON deserializes into a
// `SerializableEntry`. Any other line is skipped without reporting an error.
//
// A missing file (`ErrorKind::NotFound`) produces an empty `Vec`; any other
// error from opening/reading the file is returned to the caller.
//
// NOTE(review): this span is a rendered diff, so the removed
// `fs::read_to_string` + `fold` body and the added `BufReader::lines` body are
// interleaved below. In the `BufReader` variant, `.filter_map(Result::ok)`
// discards per-line read errors, whereas the removed `read_to_string(..)?`
// propagated read failures — confirm that dropping them is intended.
fn bucket_entries(bucket: &Path) -> Result<Vec<SerializableEntry>, Error> {
|
||||
let lines = match fs::read_to_string(bucket) {
|
||||
Ok(data) => Ok(data),
|
||||
Err(ref e) if e.kind() == ErrorKind::NotFound => Ok(String::from("")),
|
||||
err => err,
|
||||
}?;
|
||||
Ok(lines.split('\n').fold(vec![], |mut acc, entry: &str| {
|
||||
if entry.is_empty() {
|
||||
return acc;
|
||||
}
|
||||
let entry_str = match entry.split('\t').collect::<Vec<&str>>()[..] {
|
||||
[hash, entry_str] => {
|
||||
if hash_entry(entry_str) != hash {
|
||||
// Hash is no good! Corruption or malice? Doesn't matter!
|
||||
// EJECT EJECT
|
||||
return acc;
|
||||
} else {
|
||||
entry_str
|
||||
}
|
||||
use std::io::{BufRead, BufReader};
|
||||
fs::File::open(bucket)
|
||||
.map(|file| {
|
||||
BufReader::new(file)
|
||||
.lines()
|
||||
.filter_map(Result::ok)
|
||||
.filter_map(|entry| {
|
||||
let entry_str = match entry.split('\t').collect::<Vec<&str>>()[..] {
|
||||
[hash, entry_str] if hash_entry(entry_str) == hash => entry_str,
|
||||
// Something's wrong with the entry. Abort.
|
||||
_ => return None,
|
||||
};
|
||||
serde_json::from_str::<SerializableEntry>(entry_str).ok()
|
||||
})
|
||||
.collect()
|
||||
})
|
||||
.or_else(|err| {
|
||||
if err.kind() == ErrorKind::NotFound {
|
||||
Ok(Vec::new())
|
||||
} else {
|
||||
Err(err.into())
|
||||
}
|
||||
// Something's wrong with the entry. Abort.
|
||||
_ => return acc,
|
||||
};
|
||||
if let Ok(entry) = serde_json::from_str::<SerializableEntry>(entry_str) {
|
||||
acc.push(entry)
|
||||
}
|
||||
acc
|
||||
}))
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
|
|||
|
|
@ -4,8 +4,8 @@
|
|||
|
||||
#![warn(missing_docs, missing_doc_code_examples)]
|
||||
|
||||
pub use ssri::Algorithm;
|
||||
pub use serde_json::Value;
|
||||
pub use ssri::Algorithm;
|
||||
|
||||
mod content;
|
||||
mod errors;
|
||||
|
|
|
|||
Loading…
Reference in New Issue