feat(link): Add support for linking existing files into the cache

This commit is contained in:
Scott Haug 2023-02-27 09:05:22 -08:00
parent 4d85f3ca84
commit 3eaeabebfb
8 changed files with 1000 additions and 5 deletions

View File

@ -43,10 +43,10 @@ jobs:
- name: Clippy
run: cargo clippy -- -D warnings
- name: Check (async-std)
run: cargo check
run: cargo check --features link
- name: Run tests (async-std)
run: cargo test --verbose
run: cargo test --verbose --features link
- name: Check (Tokio)
run: cargo check --no-default-features --features tokio-runtime
run: cargo check --no-default-features --features tokio-runtime,link
- name: Run unit tests (Tokio)
run: cargo test --verbose --no-default-features --features tokio-runtime --lib
run: cargo test --verbose --no-default-features --features tokio-runtime,link --lib

View File

@ -1,6 +1,6 @@
[package]
name = "cacache"
version = "11.1.1-alpha.0"
version = "11.2.0-alpha.0"
authors = ["Kat Marchán <kzm@zkat.tech>"]
edition = "2021"
description = "Content-addressable, key-value, high-performance, on-disk cache."
@ -53,4 +53,5 @@ harness = false
[features]
default = ["async-std", "async-attributes"]
link = []
tokio-runtime = ["tokio", "tokio-stream"]

View File

@ -63,6 +63,9 @@ Minimum supported Rust version is `1.43.0`.
cacache = { version = "*", default-features = false, features = ["tokio-runtime"] }
```
Experimental support for tracking existing files through symlinks is provided
via the "link" feature.
## Contributing
The cacache team enthusiastically welcomes contributions and project participation! There's a bunch of things you can do if you want to contribute! The [Contributor Guide](CONTRIBUTING.md) has all the information you need for everything from reporting bugs to contributing entire new features. Please don't hesitate to jump in if you'd like to, or even ask us questions if something isn't clear.

View File

@ -22,6 +22,7 @@ where
use std::fs::{self, File};
use std::io::prelude::*;
use std::path::PathBuf;
use criterion::{black_box, criterion_group, criterion_main, Criterion};
@ -207,6 +208,76 @@ fn write_hash_async(c: &mut Criterion) {
});
}
/// Writes `buf` to a file named `target-file` inside the temporary directory
/// `tmp` and returns the path of that file.
///
/// Panics if the directory or file cannot be created or written; this helper
/// is only used for benchmark setup.
fn create_tmpfile(tmp: &tempfile::TempDir, buf: &[u8]) -> PathBuf {
    let target = tmp.path().join("target-file");
    // `target` was just joined onto the temp dir, so it always has a parent.
    // Both calls below accept any `AsRef<Path>`, so borrows are enough and no
    // path needs to be cloned.
    std::fs::create_dir_all(target.parent().unwrap()).unwrap();
    let mut file = File::create(&target).unwrap();
    file.write_all(buf).unwrap();
    file.flush().unwrap();
    target
}
/// Benchmarks `cacache::link`, linking the same target file under a new key
/// on every iteration.
#[cfg(feature = "link")]
fn link_async(c: &mut Criterion) {
    let target_dir = tempfile::tempdir().unwrap();
    let target = create_tmpfile(&target_dir, b"hello world");
    let cache_dir = tempfile::tempdir().unwrap();
    let cache = cache_dir.path().to_owned();
    c.bench_function("link::file", move |bencher| {
        bencher.iter_custom(|iters| {
            // `iter_custom` hands over the iteration count, which doubles as
            // the suffix that makes each key distinct.
            let started = std::time::Instant::now();
            (0..iters).for_each(|n| {
                let key = format!("key{}", n);
                block_on(cacache::link(&cache, key, target.clone())).unwrap();
            });
            started.elapsed()
        })
    });
}
/// Benchmarks `cacache::link_hash`, which links the target file into the
/// cache without associating an index key.
#[cfg(feature = "link")]
fn link_hash_async(c: &mut Criterion) {
    let target_dir = tempfile::tempdir().unwrap();
    let target = create_tmpfile(&target_dir, b"hello world");
    let cache_dir = tempfile::tempdir().unwrap();
    let cache = cache_dir.path().to_owned();
    c.bench_function("link::file_hash", move |bencher| {
        bencher.iter(|| {
            let linked = block_on(cacache::link_hash(&cache, target.clone()));
            linked.unwrap()
        })
    });
}
/// Benchmarks `cacache::link_sync`, linking the same target file under a new
/// key on every iteration.
#[cfg(feature = "link")]
fn link_sync(c: &mut Criterion) {
    let target_dir = tempfile::tempdir().unwrap();
    let target = create_tmpfile(&target_dir, b"hello world");
    let cache_dir = tempfile::tempdir().unwrap();
    let cache = cache_dir.path().to_owned();
    c.bench_function("link::file_sync", move |bencher| {
        bencher.iter_custom(|iters| {
            // `iter_custom` hands over the iteration count, which doubles as
            // the suffix that makes each key distinct.
            let started = std::time::Instant::now();
            (0..iters).for_each(|n| {
                let key = format!("key{}", n);
                cacache::link_sync(&cache, key, target.clone()).unwrap();
            });
            started.elapsed()
        })
    });
}
/// Benchmarks `cacache::link_hash_sync`, which links the target file into the
/// cache without associating an index key.
#[cfg(feature = "link")]
fn link_hash_sync(c: &mut Criterion) {
    let target_dir = tempfile::tempdir().unwrap();
    let target = create_tmpfile(&target_dir, b"hello world");
    let cache_dir = tempfile::tempdir().unwrap();
    let cache = cache_dir.path().to_owned();
    c.bench_function("link::file_hash_sync", move |bencher| {
        bencher.iter(|| {
            let linked = cacache::link_hash_sync(&cache, target.clone());
            linked.unwrap()
        })
    });
}
criterion_group!(
benches,
baseline_read_sync,
@ -223,4 +294,17 @@ criterion_group!(
read_hash_async_big_data,
read_hash_sync_big_data
);
// Benchmark group for the symlink-based `link` APIs. It only exists when the
// `link` feature is enabled, matching the cfg on the functions it lists.
#[cfg(feature = "link")]
criterion_group!(
link_benches,
link_async,
link_hash_async,
link_sync,
link_hash_sync
);
// With the `link` feature on, run the base group and the link group.
#[cfg(feature = "link")]
criterion_main!(benches, link_benches);
// Without the `link` feature, only the base group is registered.
#[cfg(not(feature = "link"))]
criterion_main!(benches);

View File

@ -2,3 +2,6 @@ pub mod path;
pub mod read;
pub mod rm;
pub mod write;
#[cfg(feature = "link")]
pub mod symlink;

254
src/content/symlink.rs Normal file
View File

@ -0,0 +1,254 @@
use ssri::{Algorithm, Integrity, IntegrityOpts};
use std::fs::DirBuilder;
use std::fs::File;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::async_lib::AsyncRead;
use crate::content::path;
use crate::errors::{IoErrorExt, Result};
/// Creates a file symlink at `dst` that points to `src`, using the
/// platform-specific standard library call.
///
/// On platforms that are neither Unix nor Windows this always returns an
/// `ErrorKind::Other` error.
fn symlink_file<P, Q>(src: P, dst: Q) -> std::io::Result<()>
where
    P: AsRef<Path>,
    Q: AsRef<Path>,
{
    // Exactly one of the following statements survives conditional
    // compilation, so each can return directly.
    #[cfg(unix)]
    return std::os::unix::fs::symlink(src, dst);

    #[cfg(windows)]
    return std::os::windows::fs::symlink_file(src, dst);

    #[cfg(not(any(unix, windows)))]
    return Err(std::io::Error::new(
        std::io::ErrorKind::Other,
        "symlinking is not supported on this platform",
    ));
}
fn create_symlink(sri: Integrity, cache: &PathBuf, target: &PathBuf) -> Result<Integrity> {
let cpath = path::content_path(cache.as_ref(), &sri);
DirBuilder::new()
.recursive(true)
// Safe unwrap. cpath always has multiple segments
.create(cpath.parent().unwrap())
.with_context(|| {
format!(
"Failed to create destination directory for linked cache file, at {}",
cpath.parent().unwrap().display()
)
})?;
if let Err(e) = symlink_file(target, cpath.clone()) {
// If symlinking fails because there's *already* a file at the desired
// destination, that is ok -- all the cache should care about is that
// there is **some** valid file associated with the computed integrity.
if !cpath.exists() {
return Err(e).with_context(|| {
format!(
"Failed to create cache symlink for {} at {}",
target.display(),
cpath.display()
)
});
}
}
Ok(sri)
}
/// A `Read`-like type that calculates the integrity of a file as it is read.
/// When the linker is committed, the file is symlinked into the cache using the
/// integrity computed from the target file's contents.
///
/// Only the bytes returned by `read` are fed to the integrity builder, so the
/// file must be read to the end before committing for the integrity to cover
/// the whole file.
pub struct Linker {
/// The path to the target file that will be symlinked from the cache.
target: PathBuf,
/// The path to the root of the cache directory.
cache: PathBuf,
/// The file descriptor to the target file.
fd: File,
/// The integrity builder for calculating the target file's integrity.
builder: IntegrityOpts,
}
impl Linker {
    /// Opens `target` for reading and sets up an integrity builder that uses
    /// `algo`. Nothing is written to `cache` until `commit` is called.
    ///
    /// Returns an error if `target` cannot be opened.
    pub fn new(cache: &Path, algo: Algorithm, target: &Path) -> Result<Self> {
        let fd = File::open(target)
            .with_context(|| format!("Failed to open reader to {}", target.display()))?;
        let builder = IntegrityOpts::new().algorithm(algo);
        Ok(Self {
            target: target.to_path_buf(),
            cache: cache.to_path_buf(),
            fd,
            builder,
        })
    }

    /// Consumes the linker and symlinks the target into the cache under the
    /// integrity computed from the bytes read so far.
    pub fn commit(self) -> Result<Integrity> {
        let Self {
            target,
            cache,
            builder,
            ..
        } = self;
        create_symlink(builder.result(), &cache, &target)
    }
}
impl std::io::Read for Linker {
    /// Reads from the target file and feeds every byte that was read into the
    /// integrity builder before handing it back to the caller.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let count = self.fd.read(buf)?;
        // A zero-length read carries no data to hash.
        if count != 0 {
            let fresh = &buf[..count];
            self.builder.input(fresh);
        }
        Ok(count)
    }
}
/// An `AsyncRead`-like type that calculates the integrity of a file as it is
/// read. When the linker is committed, the file is symlinked into the cache
/// using the integrity computed from the target file's contents.
///
/// Only the bytes produced by `poll_read` are fed to the integrity builder, so
/// the file must be read to the end before committing for the integrity to
/// cover the whole file.
pub struct AsyncLinker {
/// The path to the target file that will be symlinked from the cache.
target: PathBuf,
/// The path to the root of the cache directory.
cache: PathBuf,
/// The async-enabled file descriptor to the target file.
fd: crate::async_lib::File,
/// The integrity builder for calculating the target file's integrity.
builder: IntegrityOpts,
}
impl AsyncRead for AsyncLinker {
    /// Polls the target file for data and feeds whatever was read into the
    /// integrity builder (futures-style `AsyncRead`).
    #[cfg(feature = "async-std")]
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<std::io::Result<usize>> {
        let count = match Pin::new(&mut self.fd).poll_read(cx, buf) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(outcome) => outcome?,
        };
        if count != 0 {
            self.builder.input(&buf[..count]);
        }
        Poll::Ready(Ok(count))
    }

    /// Polls the target file for data and feeds the newly filled part of
    /// `buf` into the integrity builder (tokio-style `AsyncRead`).
    #[cfg(feature = "tokio")]
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<tokio::io::Result<()>> {
        // Remember how much was already filled so that only the bytes added
        // by this poll are hashed.
        let before = buf.filled().len();
        match Pin::new(&mut self.fd).poll_read(cx, buf) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(outcome) => outcome?,
        }
        if buf.filled().len() > before {
            self.builder.input(&buf.filled()[before..]);
        }
        Poll::Ready(Ok(()))
    }
}
impl AsyncLinker {
    /// Asynchronously opens `target` for reading and sets up an integrity
    /// builder that uses `algo`. Nothing is written to `cache` until `commit`
    /// is called.
    ///
    /// Returns an error if `target` cannot be opened.
    pub async fn new(cache: &Path, algo: Algorithm, target: &Path) -> Result<Self> {
        let fd = crate::async_lib::File::open(target)
            .await
            .with_context(|| format!("Failed to open reader to {}", target.display()))?;
        let builder = IntegrityOpts::new().algorithm(algo);
        Ok(Self {
            target: target.to_path_buf(),
            cache: cache.to_path_buf(),
            fd,
            builder,
        })
    }

    /// Consumes the linker and symlinks the target into the cache under the
    /// integrity computed from the bytes read so far.
    pub async fn commit(self) -> Result<Integrity> {
        let Self {
            target,
            cache,
            builder,
            ..
        } = self;
        create_symlink(builder.result(), &cache, &target)
    }
}
#[cfg(test)]
mod tests {
    use std::io::{Read, Write};

    use super::*;

    #[cfg(feature = "async-std")]
    use async_attributes::test as async_test;
    #[cfg(feature = "tokio")]
    use tokio::test as async_test;

    #[cfg(feature = "async-std")]
    use futures::io::AsyncReadExt;
    #[cfg(feature = "tokio")]
    use tokio::io::AsyncReadExt;

    /// Writes `buf` to `target-file` inside `tmp` and returns the file's path.
    fn create_tmpfile(tmp: &tempfile::TempDir, buf: &[u8]) -> PathBuf {
        let target = tmp.path().join("target-file");
        std::fs::create_dir_all(target.parent().unwrap()).unwrap();
        let mut file = File::create(&target).unwrap();
        file.write_all(buf).unwrap();
        file.flush().unwrap();
        target
    }

    /// Asserts that the content path for `sri` inside `cache` is a symlink
    /// whose contents read back as `expected`.
    fn assert_linked(cache: &Path, sri: &Integrity, expected: &[u8]) {
        let cpath = path::content_path(cache, sri);
        assert!(cpath.exists());
        let metadata = std::fs::symlink_metadata(&cpath).unwrap();
        assert!(metadata.file_type().is_symlink());
        assert_eq!(std::fs::read(cpath).unwrap(), expected);
    }

    #[test]
    fn basic_link() {
        let target_dir = tempfile::tempdir().unwrap();
        let target = create_tmpfile(&target_dir, b"hello world");

        let cache_dir = tempfile::tempdir().unwrap();
        let cache = cache_dir.path().to_owned();

        let mut linker = Linker::new(&cache, Algorithm::Sha256, &target).unwrap();

        // Reading everything through the linker is what computes the
        // integrity hash.
        let mut contents = Vec::new();
        linker.read_to_end(&mut contents).unwrap();
        assert_eq!(contents, b"hello world");

        // Committing creates the symlink in the cache and yields the hash.
        let sri = linker.commit().unwrap();
        assert_eq!(sri.to_string(), Integrity::from(b"hello world").to_string());

        assert_linked(&cache, &sri, b"hello world");
    }

    #[async_test]
    async fn basic_async_link() {
        let target_dir = tempfile::tempdir().unwrap();
        let target = create_tmpfile(&target_dir, b"hello world");

        let cache_dir = tempfile::tempdir().unwrap();
        let cache = cache_dir.path().to_owned();

        let mut linker = AsyncLinker::new(&cache, Algorithm::Sha256, &target)
            .await
            .unwrap();

        // Reading everything through the linker is what computes the
        // integrity hash.
        let mut contents: Vec<u8> = Vec::new();
        AsyncReadExt::read_to_end(&mut linker, &mut contents)
            .await
            .unwrap();
        assert_eq!(contents, b"hello world");

        // Committing creates the symlink in the cache and yields the hash.
        let sri = linker.commit().await.unwrap();
        assert_eq!(sri.to_string(), Integrity::from(b"hello world").to_string());

        assert_linked(&cache, &sri, b"hello world");
    }
}

View File

@ -121,6 +121,27 @@
//! Ok(())
//! }
//! ```
//!
//! ### Linking existing files
//!
//! The `link` feature enables an additional set of APIs for adding existing
//! files into the cache via symlinks, without having to duplicate the data.
//! Once linked into the cache, these files can be accessed by key just like
//! other cached data, with the same integrity checking.
//!
//! The `link` methods are available in both async and sync variants, using the
//! same suffixes as the other APIs.
//!
//! ```no_run
//! #[async_attributes::main]
//! async fn main() -> cacache::Result<()> {
//! #[cfg(feature = "link")]
//! cacache::link("./my-cache", "key", "/path/to/my-other-file.txt").await?;
//! let data = cacache::read("./my-cache", "key").await?;
//! assert_eq!(data, b"my-data");
//! Ok(())
//! }
//! ```
#![warn(missing_docs)]
#[cfg(not(any(feature = "async-std", feature = "tokio-runtime")))]
@ -139,6 +160,8 @@ mod errors;
pub mod index;
mod get;
#[cfg(feature = "link")]
mod link;
mod ls;
mod put;
mod rm;
@ -147,6 +170,8 @@ pub use errors::{Error, Result};
pub use index::Metadata;
pub use get::*;
#[cfg(feature = "link")]
pub use link::*;
pub use ls::*;
pub use put::*;
pub use rm::*;

625
src/link.rs Normal file
View File

@ -0,0 +1,625 @@
use crate::async_lib::AsyncRead;
use crate::content::symlink;
use crate::errors::{Error, IoErrorExt, Result};
use crate::{index, WriteOpts};
use ssri::{Algorithm, Integrity};
use std::io::Read;
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::task::{Context as TaskContext, Poll};
#[cfg(feature = "async-std")]
use futures::io::AsyncReadExt;
#[cfg(feature = "tokio")]
use tokio::io::AsyncReadExt;
const BUF_SIZE: usize = 16 * 1024;
const PROBE_SIZE: usize = 8;
/// Asynchronously links `target` into `cache` through a symlink and records
/// it in the index under `key`.
///
/// ## Example
/// ```no_run
/// #[async_attributes::main]
/// async fn main() -> cacache::Result<()> {
///     cacache::link("./my-cache", "my-key", "../my-other-files/my-file.tgz").await?;
///     Ok(())
/// }
/// ```
pub async fn link<P, K, T>(cache: P, key: K, target: T) -> Result<Integrity>
where
    P: AsRef<Path>,
    K: AsRef<str>,
    T: AsRef<Path>,
{
    let linker = Linker::open(cache, key, target).await?;
    linker.commit().await
}
/// Asynchronously links `target` into `cache` through a symlink, without
/// associating an index key with it.
///
/// ## Example
/// ```no_run
/// #[async_attributes::main]
/// async fn main() -> cacache::Result<()> {
///     cacache::link_hash("./my-cache", "../my-other-files/my-file.tgz").await?;
///     Ok(())
/// }
/// ```
pub async fn link_hash<P, T>(cache: P, target: T) -> Result<Integrity>
where
    P: AsRef<Path>,
    T: AsRef<Path>,
{
    let linker = Linker::open_hash(cache, target).await?;
    linker.commit().await
}
/// Synchronously links `target` into `cache` through a symlink and records it
/// in the index under `key`.
///
/// ## Example
/// ```no_run
/// fn main() -> cacache::Result<()> {
///     cacache::link_sync("./my-cache", "my-key", "../my-other-files/my-file.tgz")?;
///     Ok(())
/// }
/// ```
pub fn link_sync<P, K, T>(cache: P, key: K, target: T) -> Result<Integrity>
where
    P: AsRef<Path>,
    K: AsRef<str>,
    T: AsRef<Path>,
{
    let linker = SyncLinker::open(cache, key, target)?;
    linker.commit()
}
/// Synchronously links `target` into `cache` through a symlink, without
/// associating an index key with it.
///
/// ## Example
/// ```no_run
/// fn main() -> cacache::Result<()> {
///     cacache::link_hash_sync("./my-cache", "../foo/bar.tgz")?;
///     Ok(())
/// }
/// ```
pub fn link_hash_sync<P, T>(cache: P, target: T) -> Result<Integrity>
where
    P: AsRef<Path>,
    T: AsRef<Path>,
{
    let linker = SyncLinker::open_hash(cache, target)?;
    linker.commit()
}
/// Factory methods on `WriteOpts` that build `Linker` and `SyncLinker`
/// handles for linking an existing file into the cache.
///
/// Every factory falls back to `Algorithm::Sha256` when no algorithm was set
/// on the options.
impl WriteOpts {
    /// Asynchronously opens `target` for reading and returns a `Linker` that
    /// will index the file under `key` when committed.
    pub async fn link<P, K, T>(self, cache: P, key: K, target: T) -> Result<Linker>
    where
        P: AsRef<Path>,
        K: AsRef<str>,
        T: AsRef<Path>,
    {
        // Non-generic body, so it is compiled once regardless of the
        // argument types callers pass in.
        async fn inner(opts: WriteOpts, cache: &Path, key: &str, target: &Path) -> Result<Linker> {
            let algo = opts.algorithm.unwrap_or(Algorithm::Sha256);
            let linker = symlink::AsyncLinker::new(cache, algo, target).await?;
            Ok(Linker {
                cache: cache.to_path_buf(),
                key: Some(key.to_owned()),
                read: 0,
                linker,
                opts,
            })
        }
        inner(self, cache.as_ref(), key.as_ref(), target.as_ref()).await
    }

    /// Asynchronously opens `target` for reading and returns a `Linker` that
    /// has no index key.
    pub async fn link_hash<P, T>(self, cache: P, target: T) -> Result<Linker>
    where
        P: AsRef<Path>,
        T: AsRef<Path>,
    {
        async fn inner(opts: WriteOpts, cache: &Path, target: &Path) -> Result<Linker> {
            let algo = opts.algorithm.unwrap_or(Algorithm::Sha256);
            let linker = symlink::AsyncLinker::new(cache, algo, target).await?;
            Ok(Linker {
                cache: cache.to_path_buf(),
                key: None,
                read: 0,
                linker,
                opts,
            })
        }
        inner(self, cache.as_ref(), target.as_ref()).await
    }

    /// Synchronously opens `target` for reading and returns a `SyncLinker`
    /// that will index the file under `key` when committed.
    pub fn link_sync<P, K, T>(self, cache: P, key: K, target: T) -> Result<SyncLinker>
    where
        P: AsRef<Path>,
        K: AsRef<str>,
        T: AsRef<Path>,
    {
        fn inner(opts: WriteOpts, cache: &Path, key: &str, target: &Path) -> Result<SyncLinker> {
            let algo = opts.algorithm.unwrap_or(Algorithm::Sha256);
            let linker = symlink::Linker::new(cache, algo, target)?;
            Ok(SyncLinker {
                cache: cache.to_path_buf(),
                key: Some(key.to_owned()),
                read: 0,
                linker,
                opts,
            })
        }
        inner(self, cache.as_ref(), key.as_ref(), target.as_ref())
    }

    /// Synchronously opens `target` for reading and returns a `SyncLinker`
    /// that has no index key.
    pub fn link_hash_sync<P, T>(self, cache: P, target: T) -> Result<SyncLinker>
    where
        P: AsRef<Path>,
        T: AsRef<Path>,
    {
        fn inner(opts: WriteOpts, cache: &Path, target: &Path) -> Result<SyncLinker> {
            let algo = opts.algorithm.unwrap_or(Algorithm::Sha256);
            let linker = symlink::Linker::new(cache, algo, target)?;
            Ok(SyncLinker {
                cache: cache.to_path_buf(),
                key: None,
                read: 0,
                linker,
                opts,
            })
        }
        inner(self, cache.as_ref(), target.as_ref())
    }
}
/// A file handle for asynchronously reading data from a file to be added to the
/// cache via a symlink.
///
/// Make sure to call `.commit()` when done reading to actually add the file to
/// the cache.
pub struct Linker {
// Root of the cache; passed to `index::insert` on commit.
cache: PathBuf,
// Index key for the linked file; `None` when opened without a key.
key: Option<String>,
// Running count of bytes read through this handle, compared against the
// expected size on commit.
read: usize,
// Underlying linker that hashes the data and creates the symlink.
pub(crate) linker: symlink::AsyncLinker,
// Write options; `size` and `sri` are checked on commit.
opts: WriteOpts,
}
impl AsyncRead for Linker {
    /// Forwards the read to the underlying linker and adds the number of
    /// bytes read to the running total (futures-style `AsyncRead`).
    #[cfg(feature = "async-std")]
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut TaskContext<'_>,
        buf: &mut [u8],
    ) -> Poll<std::io::Result<usize>> {
        let count = match Pin::new(&mut self.linker).poll_read(cx, buf) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(outcome) => outcome?,
        };
        self.read += count;
        Poll::Ready(Ok(count))
    }

    /// Forwards the read to the underlying linker and adds the number of
    /// newly filled bytes to the running total (tokio-style `AsyncRead`).
    #[cfg(feature = "tokio")]
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut TaskContext<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<tokio::io::Result<()>> {
        // Only the bytes added by this poll count toward the total.
        let before = buf.filled().len();
        match Pin::new(&mut self.linker).poll_read(cx, buf) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(outcome) => outcome?,
        }
        self.read += buf.filled().len() - before;
        Poll::Ready(Ok(()))
    }
}
impl Linker {
/// Creates a new asynchronous readable file handle into the cache.
pub async fn open<P, K, T>(cache: P, key: K, target: T) -> Result<Self>
where
P: AsRef<Path>,
K: AsRef<str>,
T: AsRef<Path>,
{
async fn inner(cache: &Path, key: &str, target: &Path) -> Result<Linker> {
let size = target
.metadata()
.with_context(|| format!("Failed to get metadata of {}", target.display()))?
.len() as usize;
WriteOpts::new()
.algorithm(Algorithm::Sha256)
.size(size)
.link(cache, key, target)
.await
}
inner(cache.as_ref(), key.as_ref(), target.as_ref()).await
}
/// Creates a new asynchronous readable file handle into the cache.
pub async fn open_hash<P, T>(cache: P, target: T) -> Result<Self>
where
P: AsRef<Path>,
T: AsRef<Path>,
{
async fn inner(cache: &Path, target: &Path) -> Result<Linker> {
let size = target
.metadata()
.with_context(|| format!("Failed to get metadata of {}", target.display()))?
.len() as usize;
WriteOpts::new()
.algorithm(Algorithm::Sha256)
.size(size)
.link_hash(cache, target)
.await
}
inner(cache.as_ref(), target.as_ref()).await
}
/// Consumes the rest of the file handle, creates a symlink into the cache,
/// and creates index entries for the linked file. Also verifies data
/// against `size` and `integrity` options, if provided. Must be called
/// manually in order to complete the writing process, otherwise everything
/// will be thrown out.
pub async fn commit(mut self) -> Result<Integrity> {
self.consume().await?;
let linker_sri = self.linker.commit().await?;
if let Some(sri) = &self.opts.sri {
if sri.matches(&linker_sri).is_none() {
return Err(ssri::Error::IntegrityCheckError(sri.clone(), linker_sri).into());
}
} else {
self.opts.sri = Some(linker_sri.clone());
}
if let Some(size) = self.opts.size {
if size != self.read {
return Err(Error::SizeMismatch(size, self.read));
}
}
if let Some(key) = self.key {
index::insert(&self.cache, &key, self.opts)
} else {
Ok(linker_sri)
}
}
// "Consume" the remainder of the reader, so that the integrity is properly
// calculated.
async fn consume(&mut self) -> Result<()> {
// Do a small 'test' read to avoid allocating a larger buffer if it
// isn't necessary.
let mut probe = [0; PROBE_SIZE];
if self.context_read(&mut probe).await? > 0 {
// Make sure all the bytes are read so that the integrity is
// properly calculated.
let mut buf = [0; BUF_SIZE];
while self.context_read(&mut buf).await? > 0 {}
}
Ok(())
}
async fn context_read(&mut self, buf: &mut [u8]) -> Result<usize> {
AsyncReadExt::read(self, buf).await.with_context(|| {
"Failed to read target file contents while calculating integrity".into()
})
}
}
/// A file handle for synchronously reading data from a file to be added to the
/// cache via a symlink.
///
/// Make sure to call `.commit()` when done reading to actually add the file
/// to the cache.
pub struct SyncLinker {
// Root of the cache; passed to `index::insert` on commit.
cache: PathBuf,
// Index key for the linked file; `None` when opened without a key.
key: Option<String>,
// Running count of bytes read through this handle, compared against the
// expected size on commit.
read: usize,
// Underlying linker that hashes the data and creates the symlink.
pub(crate) linker: symlink::Linker,
// Write options; `size` and `sri` are checked on commit.
opts: WriteOpts,
}
impl std::io::Read for SyncLinker {
    /// Forwards the read to the underlying linker and adds the number of
    /// bytes read to the running total.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let count = self.linker.read(buf)?;
        self.read += count;
        Ok(count)
    }
}
impl SyncLinker {
/// Creates a new readable file handle to a file that will be linked into
/// the cache, indexed at the provided key, on commit.
///
/// It is not necessary to read any of the file before calling `.commit()`.
///
/// ## Example
/// ```no_run
/// use std::io::prelude::*;
///
/// fn main() -> cacache::Result<()> {
/// let path = "../my-other-files/my-file.tgz";
/// let mut fd = cacache::SyncLinker::open("./my-cache", "my-key", path)?;
/// let mut str = String::new();
/// fd.read_to_string(&mut str).expect("Failed to read to string");
/// // The file is not linked into the cache until you commit it.
/// fd.commit()?;
/// Ok(())
/// }
/// ```
pub fn open<P, K, T>(cache: P, key: K, target: T) -> Result<Self>
where
P: AsRef<Path>,
K: AsRef<str>,
T: AsRef<Path>,
{
fn inner(cache: &Path, key: &str, target: &Path) -> Result<SyncLinker> {
let size = target
.metadata()
.with_context(|| format!("Failed to get metadata of {}", target.display()))?
.len() as usize;
WriteOpts::new()
.algorithm(Algorithm::Sha256)
.size(size)
.link_sync(cache, key, target)
}
inner(cache.as_ref(), key.as_ref(), target.as_ref())
}
/// Creates a new readable file handle to a file that will be linked into
/// the cache, without an indexe key, on commit.
///
/// It is not necessary to read any of the file before calling `.commit()`.
///
/// ## Example
/// ```no_run
/// use std::io::prelude::*;
///
/// fn main() -> cacache::Result<()> {
/// let path = "../my-other-files/my-file.tgz";
/// let mut fd = cacache::SyncLinker::open_hash("./my-cache", path)?;
/// let mut str = String::new();
/// fd.read_to_string(&mut str).expect("Failed to read to string");
/// // The file is not linked into the cache until you commit it.
/// fd.commit()?;
/// Ok(())
/// }
/// ```
pub fn open_hash<P, T>(cache: P, target: T) -> Result<Self>
where
P: AsRef<Path>,
T: AsRef<Path>,
{
fn inner(cache: &Path, target: &Path) -> Result<SyncLinker> {
let size = target
.metadata()
.with_context(|| format!("Failed to get metadata of {}", target.display()))?
.len() as usize;
WriteOpts::new()
.algorithm(Algorithm::Sha256)
.size(size)
.link_hash_sync(cache, target)
}
inner(cache.as_ref(), target.as_ref())
}
/// Consumes the rest of the file handle, creates a symlink into the cache,
/// and creates index entries for the linked file. Also verifies data
/// against `size` and `integrity` options, if provided. Must be called
/// manually in order to complete the writing process, otherwise everything
/// will be thrown out.
pub fn commit(mut self) -> Result<Integrity> {
self.consume()?;
let cache = self.cache;
let linker_sri = self.linker.commit()?;
if let Some(sri) = &self.opts.sri {
if sri.matches(&linker_sri).is_none() {
return Err(ssri::Error::IntegrityCheckError(sri.clone(), linker_sri).into());
}
} else {
self.opts.sri = Some(linker_sri.clone());
}
if let Some(size) = self.opts.size {
if size != self.read {
return Err(Error::SizeMismatch(size, self.read));
}
}
if let Some(key) = self.key {
index::insert(&cache, &key, self.opts)
} else {
Ok(linker_sri)
}
}
fn consume(&mut self) -> Result<()> {
// Do a small 'test' read to avoid allocating a larger buffer if it
// isn't necessary.
let mut probe = [0; PROBE_SIZE];
if self.context_read(&mut probe)? > 0 {
// Make sure all the bytes are read so that the integrity is
// properly calculated.
let mut buf = [0; BUF_SIZE];
while self.context_read(&mut buf)? > 0 {}
}
Ok(())
}
fn context_read(&mut self, buf: &mut [u8]) -> Result<usize> {
self.read(buf).with_context(|| {
"Failed to read target file contents while calculating integrity".into()
})
}
}
#[cfg(test)]
mod tests {
    use std::fs::File;
    use std::io::Write;

    use super::*;

    #[cfg(feature = "async-std")]
    use async_attributes::test as async_test;
    #[cfg(feature = "tokio")]
    use tokio::test as async_test;

    /// Writes `buf` to `target-file` inside `tmp` and returns the file's path.
    fn create_tmpfile(tmp: &tempfile::TempDir, buf: &[u8]) -> PathBuf {
        let target = tmp.path().join("target-file");
        std::fs::create_dir_all(target.parent().unwrap()).unwrap();
        let mut file = File::create(&target).unwrap();
        file.write_all(buf).unwrap();
        file.flush().unwrap();
        target
    }

    #[async_test]
    async fn test_link() {
        let target_dir = tempfile::tempdir().unwrap();
        let target = create_tmpfile(&target_dir, b"hello world");

        let cache_dir = tempfile::tempdir().unwrap();
        let cache = cache_dir.path().to_owned();

        crate::link(&cache, "my-key", target).await.unwrap();

        let contents = crate::read(&cache, "my-key").await.unwrap();
        assert_eq!(contents, b"hello world");
    }

    #[async_test]
    async fn test_link_hash() {
        let target_dir = tempfile::tempdir().unwrap();
        let target = create_tmpfile(&target_dir, b"hello world");

        let cache_dir = tempfile::tempdir().unwrap();
        let cache = cache_dir.path().to_owned();

        let sri = crate::link_hash(&cache, target).await.unwrap();

        let contents = crate::read_hash(&cache, &sri).await.unwrap();
        assert_eq!(contents, b"hello world");
    }

    #[test]
    fn test_link_sync() {
        let target_dir = tempfile::tempdir().unwrap();
        let target = create_tmpfile(&target_dir, b"hello world");

        let cache_dir = tempfile::tempdir().unwrap();
        let cache = cache_dir.path().to_owned();

        crate::link_sync(&cache, "my-key", target).unwrap();

        let contents = crate::read_sync(&cache, "my-key").unwrap();
        assert_eq!(contents, b"hello world");
    }

    #[test]
    fn test_link_hash_sync() {
        let target_dir = tempfile::tempdir().unwrap();
        let target = create_tmpfile(&target_dir, b"hello world");

        let cache_dir = tempfile::tempdir().unwrap();
        let cache = cache_dir.path().to_owned();

        let sri = crate::link_hash_sync(&cache, target).unwrap();

        let contents = crate::read_hash_sync(&cache, &sri).unwrap();
        assert_eq!(contents, b"hello world");
    }

    #[async_test]
    async fn test_open() {
        let target_dir = tempfile::tempdir().unwrap();
        let target = create_tmpfile(&target_dir, b"hello world");

        let cache_dir = tempfile::tempdir().unwrap();
        let cache = cache_dir.path().to_owned();

        let mut handle = crate::Linker::open(&cache, "my-key", target).await.unwrap();

        // Read through the handle first, then commit to link and index it.
        let mut streamed = Vec::new();
        handle.read_to_end(&mut streamed).await.unwrap();
        handle.commit().await.unwrap();
        assert_eq!(streamed, b"hello world");

        let contents = crate::read_sync(&cache, "my-key").unwrap();
        assert_eq!(contents, b"hello world");
    }

    #[async_test]
    async fn test_open_hash() {
        let target_dir = tempfile::tempdir().unwrap();
        let target = create_tmpfile(&target_dir, b"hello world");

        let cache_dir = tempfile::tempdir().unwrap();
        let cache = cache_dir.path().to_owned();

        let mut handle = crate::Linker::open_hash(&cache, target).await.unwrap();

        // Read through the handle first, then commit to link it.
        let mut streamed = Vec::new();
        handle.read_to_end(&mut streamed).await.unwrap();
        let sri = handle.commit().await.unwrap();
        assert_eq!(streamed, b"hello world");

        let contents = crate::read_hash_sync(&cache, &sri).unwrap();
        assert_eq!(contents, b"hello world");
    }

    #[test]
    fn test_open_sync() {
        let target_dir = tempfile::tempdir().unwrap();
        let target = create_tmpfile(&target_dir, b"hello world");

        let cache_dir = tempfile::tempdir().unwrap();
        let cache = cache_dir.path().to_owned();

        let mut handle = crate::SyncLinker::open(&cache, "my-key", target).unwrap();

        // Read through the handle first, then commit to link and index it.
        let mut streamed = Vec::new();
        handle.read_to_end(&mut streamed).unwrap();
        handle.commit().unwrap();
        assert_eq!(streamed, b"hello world");

        let contents = crate::read_sync(&cache, "my-key").unwrap();
        assert_eq!(contents, b"hello world");
    }

    #[test]
    fn test_open_hash_sync() {
        let target_dir = tempfile::tempdir().unwrap();
        let target = create_tmpfile(&target_dir, b"hello world");

        let cache_dir = tempfile::tempdir().unwrap();
        let cache = cache_dir.path().to_owned();

        let mut handle = crate::SyncLinker::open_hash(&cache, target).unwrap();

        // Read through the handle first, then commit to link it.
        let mut streamed = Vec::new();
        handle.read_to_end(&mut streamed).unwrap();
        let sri = handle.commit().unwrap();
        assert_eq!(streamed, b"hello world");

        let contents = crate::read_hash_sync(&cache, &sri).unwrap();
        assert_eq!(contents, b"hello world");
    }
}