diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 559c55c..f6dda61 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -43,10 +43,10 @@ jobs: - name: Clippy run: cargo clippy -- -D warnings - name: Check (async-std) - run: cargo check + run: cargo check --features link_to - name: Run tests (async-std) - run: cargo test --verbose + run: cargo test --verbose --features link_to - name: Check (Tokio) - run: cargo check --no-default-features --features tokio-runtime + run: cargo check --no-default-features --features tokio-runtime,link_to - name: Run unit tests (Tokio) - run: cargo test --verbose --no-default-features --features tokio-runtime --lib + run: cargo test --verbose --no-default-features --features tokio-runtime,link_to --lib diff --git a/Cargo.toml b/Cargo.toml index 2489400..4ae3910 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,4 +53,5 @@ harness = false [features] default = ["async-std", "async-attributes"] +link_to = [] tokio-runtime = ["tokio", "tokio-stream"] diff --git a/README.md b/README.md index fd89578..a4d39d3 100644 --- a/README.md +++ b/README.md @@ -63,6 +63,9 @@ Minimum supported Rust version is `1.43.0`. cacache = { version = "*", default-features = false, features = ["tokio-runtime"] } ``` +Experimental support for symlinking to existing files is provided via the +"link_to" feature. + ## Contributing The cacache team enthusiastically welcomes contributions and project participation! There's a bunch of things you can do if you want to contribute! The [Contributor Guide](CONTRIBUTING.md) has all the information you need for everything from reporting bugs to contributing entire new features. Please don't hesitate to jump in if you'd like to, or even ask us questions if something isn't clear. diff --git a/benches/benchmarks.rs b/benches/benchmarks.rs index 123c04a..01ab9e3 100644 --- a/benches/benchmarks.rs +++ b/benches/benchmarks.rs @@ -22,6 +22,7 @@ where use std::fs::{self, File}; use std::io::prelude::*; +use std::path::PathBuf; use criterion::{black_box, criterion_group, criterion_main, Criterion}; @@ -207,6 +208,82 @@ fn write_hash_async(c: &mut Criterion) { }); } +#[cfg(feature = "link_to")] +fn create_tmpfile(tmp: &tempfile::TempDir, buf: &[u8]) -> PathBuf { + let dir = tmp.path().to_owned(); + let target = dir.join("target-file"); + std::fs::create_dir_all(target.parent().unwrap().clone()).unwrap(); + let mut file = File::create(target.clone()).unwrap(); + file.write_all(buf).unwrap(); + file.flush().unwrap(); + target +} + +#[cfg(feature = "link_to")] +fn link_to_async(c: &mut Criterion) { + let tmp = tempfile::tempdir().unwrap(); + let target = create_tmpfile(&tmp, b"hello world"); + + let tmp = tempfile::tempdir().unwrap(); + let cache = tmp.path().to_owned(); + c.bench_function("link_to::file", move |b| { + b.iter_custom(|iters| { + let start = std::time::Instant::now(); + for i in 0..iters { + block_on(cacache::link_to( + &cache, + format!("key{}", i), + target.clone(), + )) + .unwrap(); + } + start.elapsed() + }) + }); +} + +#[cfg(feature = "link_to")] +fn link_to_hash_async(c: &mut Criterion) { + let tmp = tempfile::tempdir().unwrap(); + let target = create_tmpfile(&tmp, b"hello world"); + + let tmp = tempfile::tempdir().unwrap(); + let cache = tmp.path().to_owned(); + c.bench_function("link_to::file_hash", move |b| { + b.iter(|| block_on(cacache::link_to_hash(&cache, target.clone())).unwrap()) + }); +} + +#[cfg(feature = "link_to")] +fn link_to_sync(c: &mut Criterion) { + let tmp = tempfile::tempdir().unwrap(); + let target = create_tmpfile(&tmp, b"hello world"); + + let tmp = tempfile::tempdir().unwrap(); + let cache = tmp.path().to_owned(); + c.bench_function("link_to::file_sync", move |b| { + b.iter_custom(|iters| { + let start = std::time::Instant::now(); + for i in 0..iters { + cacache::link_to_sync(&cache, format!("key{}", i), target.clone()).unwrap(); + } + start.elapsed() + }) + }); +} + +#[cfg(feature = "link_to")] +fn link_to_hash_sync(c: &mut Criterion) { + let tmp = tempfile::tempdir().unwrap(); + let target = create_tmpfile(&tmp, b"hello world"); + + let tmp = tempfile::tempdir().unwrap(); + let cache = tmp.path().to_owned(); + c.bench_function("link_to::file_hash_sync", move |b| { + b.iter(|| cacache::link_to_hash_sync(&cache, target.clone()).unwrap()) + }); +} + criterion_group!( benches, baseline_read_sync, @@ -223,4 +300,17 @@ criterion_group!( read_hash_async_big_data, read_hash_sync_big_data ); + +#[cfg(feature = "link_to")] +criterion_group!( + link_to_benches, + link_to_async, + link_to_hash_async, + link_to_sync, + link_to_hash_sync +); + +#[cfg(feature = "link_to")] +criterion_main!(benches, link_to_benches); +#[cfg(not(feature = "link_to"))] criterion_main!(benches); diff --git a/src/content/linkto.rs b/src/content/linkto.rs new file mode 100644 index 0000000..aafee45 --- /dev/null +++ b/src/content/linkto.rs @@ -0,0 +1,250 @@ +use ssri::{Algorithm, Integrity, IntegrityOpts}; +use std::fs::DirBuilder; +use std::fs::File; +use std::path::{Path, PathBuf}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::async_lib::AsyncRead; +use crate::content::path; +use crate::errors::{IoErrorExt, Result}; + +#[cfg(not(any(unix, windows)))] +compile_error!("Symlinking is not supported on this platform."); + +fn symlink_file(src: P, dst: Q) -> std::io::Result<()> +where + P: AsRef, + Q: AsRef, +{ + #[cfg(unix)] + { + use std::os::unix::fs::symlink; + symlink(src, dst) + } + #[cfg(windows)] + { + use std::os::windows::fs::symlink_file; + symlink_file(src, dst) + } +} + +fn create_symlink(sri: Integrity, cache: &PathBuf, target: &PathBuf) -> Result { + let cpath = path::content_path(cache.as_ref(), &sri); + DirBuilder::new() + .recursive(true) + // Safe unwrap. cpath always has multiple segments + .create(cpath.parent().unwrap()) + .with_context(|| { + format!( + "Failed to create destination directory for linked cache file, at {}", + cpath.parent().unwrap().display() + ) + })?; + if let Err(e) = symlink_file(target, cpath.clone()) { + // If symlinking fails because there's *already* a file at the desired + // destination, that is ok -- all the cache should care about is that + // there is **some** valid file associated with the computed integrity. + if !cpath.exists() { + return Err(e).with_context(|| { + format!( + "Failed to create cache symlink for {} at {}", + target.display(), + cpath.display() + ) + }); + } + } + Ok(sri) +} + +/// A `Read`-like type that calculates the integrity of a file as it is read. +/// When the linker is committed, a symlink is created from the cache to the +/// target file using the integrity computed from the file's contents. +pub struct ToLinker { + /// The path to the target file that will be symlinked from the cache. + target: PathBuf, + /// The path to the root of the cache directory. + cache: PathBuf, + /// The file descriptor to the target file. + fd: File, + /// The integrity builder for calculating the target file's integrity. + builder: IntegrityOpts, +} + +impl ToLinker { + pub fn new(cache: &Path, algo: Algorithm, target: &Path) -> Result { + let file = File::open(target) + .with_context(|| format!("Failed to open reader to {}", target.display()))?; + Ok(Self { + target: target.to_path_buf(), + cache: cache.to_path_buf(), + fd: file, + builder: IntegrityOpts::new().algorithm(algo), + }) + } + + /// Add the symlink to the target file from the cache. + pub fn commit(self) -> Result { + create_symlink(self.builder.result(), &self.cache, &self.target) + } +} + +impl std::io::Read for ToLinker { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + let amt = self.fd.read(buf)?; + if amt > 0 { + self.builder.input(&buf[..amt]); + } + Ok(amt) + } +} + +/// An `AsyncRead`-like type that calculates the integrity of a file as it is +/// read. When the linker is committed, a symlink is created from the cache to +/// the target file using the integrity computed from the file's contents. +pub struct AsyncToLinker { + /// The path to the target file that will be symlinked from the cache. + target: PathBuf, + /// The path to the root of the cache directory. + cache: PathBuf, + /// The async-enabled file descriptor to the target file. + fd: crate::async_lib::File, + /// The integrity builder for calculating the target file's integrity. + builder: IntegrityOpts, +} + +impl AsyncRead for AsyncToLinker { + #[cfg(feature = "async-std")] + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let amt = futures::ready!(Pin::new(&mut self.fd).poll_read(cx, buf))?; + if amt > 0 { + self.builder.input(&buf[..amt]); + } + Poll::Ready(Ok(amt)) + } + + #[cfg(feature = "tokio")] + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + let pre_len = buf.filled().len(); + futures::ready!(Pin::new(&mut self.fd).poll_read(cx, buf))?; + if buf.filled().len() > pre_len { + self.builder.input(&buf.filled()[pre_len..]); + } + Poll::Ready(Ok(())) + } +} + +impl AsyncToLinker { + pub async fn new(cache: &Path, algo: Algorithm, target: &Path) -> Result { + let file = crate::async_lib::File::open(target) + .await + .with_context(|| format!("Failed to open reader to {}", target.display()))?; + Ok(Self { + target: target.to_path_buf(), + cache: cache.to_path_buf(), + fd: file, + builder: IntegrityOpts::new().algorithm(algo), + }) + } + + /// Add the symlink to the target file from the cache. + pub async fn commit(self) -> Result { + create_symlink(self.builder.result(), &self.cache, &self.target) + } +} + +#[cfg(test)] +mod tests { + use std::io::{Read, Write}; + + use super::*; + + #[cfg(feature = "async-std")] + use async_attributes::test as async_test; + #[cfg(feature = "tokio")] + use tokio::test as async_test; + + #[cfg(feature = "async-std")] + use futures::io::AsyncReadExt; + #[cfg(feature = "tokio")] + use tokio::io::AsyncReadExt; + + fn create_tmpfile(tmp: &tempfile::TempDir, buf: &[u8]) -> PathBuf { + let dir = tmp.path().to_owned(); + let target = dir.join("target-file"); + std::fs::create_dir_all(target.parent().unwrap().clone()).unwrap(); + let mut file = File::create(target.clone()).unwrap(); + file.write_all(buf).unwrap(); + file.flush().unwrap(); + target + } + + #[test] + fn basic_link() { + let tmp = tempfile::tempdir().unwrap(); + let target = create_tmpfile(&tmp, b"hello world"); + + let tmp = tempfile::tempdir().unwrap(); + let dir = tmp.path().to_owned(); + let mut linker = ToLinker::new(&dir, Algorithm::Sha256, &target).unwrap(); + + // read all of the data from the linker, which will calculate the integrity + // hash. + let mut buf = Vec::new(); + linker.read_to_end(&mut buf).unwrap(); + assert_eq!(buf, b"hello world"); + + // commit the linker, creating a symlink in the cache and an integrity + // hash. + let sri = linker.commit().unwrap(); + assert_eq!(sri.to_string(), Integrity::from(b"hello world").to_string()); + + let cpath = path::content_path(&dir, &sri); + assert!(cpath.exists()); + let metadata = std::fs::symlink_metadata(cpath.clone()).unwrap(); + let file_type = metadata.file_type(); + assert!(file_type.is_symlink()); + assert_eq!(std::fs::read(cpath).unwrap(), b"hello world"); + } + + #[async_test] + async fn basic_async_link() { + let tmp = tempfile::tempdir().unwrap(); + let target = create_tmpfile(&tmp, b"hello world"); + + let tmp = tempfile::tempdir().unwrap(); + let dir = tmp.path().to_owned(); + let mut linker = AsyncToLinker::new(&dir, Algorithm::Sha256, &target) + .await + .unwrap(); + + // read all of the data from the linker, which will calculate the integrity + // hash. + let mut buf: Vec = Vec::new(); + AsyncReadExt::read_to_end(&mut linker, &mut buf) + .await + .unwrap(); + assert_eq!(buf, b"hello world"); + + // commit the linker, creating a symlink in the cache and an integrity + // hash. + let sri = linker.commit().await.unwrap(); + assert_eq!(sri.to_string(), Integrity::from(b"hello world").to_string()); + + let cpath = path::content_path(&dir, &sri); + assert!(cpath.exists()); + let metadata = std::fs::symlink_metadata(cpath.clone()).unwrap(); + let file_type = metadata.file_type(); + assert!(file_type.is_symlink()); + assert_eq!(std::fs::read(cpath).unwrap(), b"hello world"); + } +} diff --git a/src/content/mod.rs b/src/content/mod.rs index 0b58856..689aa20 100644 --- a/src/content/mod.rs +++ b/src/content/mod.rs @@ -2,3 +2,6 @@ pub mod path; pub mod read; pub mod rm; pub mod write; + +#[cfg(feature = "link_to")] +pub mod linkto; diff --git a/src/lib.rs b/src/lib.rs index 54a4ea7..eb299b4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -121,6 +121,27 @@ //! Ok(()) //! } //! ``` +//! +//! ### Linking to existing files +//! +//! The `link_to` feature enables an additional set of APIs for adding existing +//! files into the cache via symlinks, without having to duplicate their data. +//! Once the cache links to them, these files can be accessed by key just like +//! other cached data, with the same integrity checking. +//! +//! The `link_to` methods are available in both async and sync variants, using +//! the same suffixes as the other APIs. +//! +//! ```no_run +//! #[async_attributes::main] +//! async fn main() -> cacache::Result<()> { +//! #[cfg(feature = "link_to")] +//! cacache::link_to("./my-cache", "key", "/path/to/my-other-file.txt").await?; +//! let data = cacache::read("./my-cache", "key").await?; +//! assert_eq!(data, b"my-data"); +//! Ok(()) +//! } +//! ``` #![warn(missing_docs)] #[cfg(not(any(feature = "async-std", feature = "tokio-runtime")))] @@ -139,6 +160,8 @@ mod errors; pub mod index; mod get; +#[cfg(feature = "link_to")] +mod linkto; mod ls; mod put; mod rm; @@ -147,6 +170,8 @@ pub use errors::{Error, Result}; pub use index::Metadata; pub use get::*; +#[cfg(feature = "link_to")] +pub use linkto::*; pub use ls::*; pub use put::*; pub use rm::*; diff --git a/src/linkto.rs b/src/linkto.rs new file mode 100644 index 0000000..3726c2b --- /dev/null +++ b/src/linkto.rs @@ -0,0 +1,625 @@ +use crate::async_lib::AsyncRead; +use crate::content::linkto; +use crate::errors::{Error, IoErrorExt, Result}; +use crate::{index, WriteOpts}; +use ssri::{Algorithm, Integrity}; +use std::io::Read; +use std::path::{Path, PathBuf}; +use std::pin::Pin; +use std::task::{Context as TaskContext, Poll}; + +#[cfg(feature = "async-std")] +use futures::io::AsyncReadExt; +#[cfg(feature = "tokio")] +use tokio::io::AsyncReadExt; + +const BUF_SIZE: usize = 16 * 1024; +const PROBE_SIZE: usize = 8; + +/// Asynchronously adds `target` to the `cache` with a symlink, indexing it +/// under `key`. +/// +/// ## Example +/// ```no_run +/// use async_attributes; +/// use std::path::Path; +/// +/// #[async_attributes::main] +/// async fn main() -> cacache::Result<()> { +/// cacache::link_to("./my-cache", "my-key", "../my-other-files/my-file.tgz").await?; +/// Ok(()) +/// } +/// ``` +pub async fn link_to(cache: P, key: K, target: T) -> Result +where + P: AsRef, + K: AsRef, + T: AsRef, +{ + ToLinker::open(cache, key, target).await?.commit().await +} + +/// Asynchrounously adds `target` to the `cache` with a symlink, skipping +/// associating an index key with it. +/// +/// ## Example +/// ```no_run +/// use async_attributes; +/// use std::path::Path; +/// +/// #[async_attributes::main] +/// async fn main() -> cacache::Result<()> { +/// cacache::link_to_hash("./my-cache", "../my-other-files/my-file.tgz").await?; +/// Ok(()) +/// } +/// ``` +pub async fn link_to_hash(cache: P, target: T) -> Result +where + P: AsRef, + T: AsRef, +{ + ToLinker::open_hash(cache, target).await?.commit().await +} + +/// Synchronously creates a symlink in the `cache` to the `target`, indexing it +/// under `key`. +/// +/// ## Example +/// ```no_run +/// use std::io::Read; +/// use std::path::Path; +/// +/// fn main() -> cacache::Result<()> { +/// cacache::link_to_sync("./my-cache", "my-key", "../my-other-files/my-file.tgz")?; +/// Ok(()) +/// } +/// ``` +pub fn link_to_sync(cache: P, key: K, target: T) -> Result +where + P: AsRef, + K: AsRef, + T: AsRef, +{ + SyncToLinker::open(cache, key, target)?.commit() +} + +/// Synchronously creates a symlink in the `cache` to the `target`, skipping +/// associating an index key with it. +/// +/// ## Example +/// ```no_run +/// use std::io::Read; +/// use std::path::Path; +/// +/// fn main() -> cacache::Result<()> { +/// cacache::link_to_hash_sync("./my-cache", "../foo/bar.tgz")?; +/// Ok(()) +/// } +/// ``` +pub fn link_to_hash_sync(cache: P, target: T) -> Result +where + P: AsRef, + T: AsRef, +{ + SyncToLinker::open_hash(cache, target)?.commit() +} + +/// Extend the `WriteOpts` struct with factories for creating `ToLinker` and +/// `SyncToLinker` instances. +impl WriteOpts { + /// Opens the target file handle for reading, returning a ToLinker instance. + pub async fn link_to(self, cache: P, key: K, target: T) -> Result + where + P: AsRef, + K: AsRef, + T: AsRef, + { + async fn inner( + opts: WriteOpts, + cache: &Path, + key: &str, + target: &Path, + ) -> Result { + Ok(ToLinker { + cache: cache.to_path_buf(), + key: Some(String::from(key)), + read: 0, + linker: linkto::AsyncToLinker::new( + cache, + opts.algorithm.unwrap_or(Algorithm::Sha256), + target, + ) + .await?, + opts, + }) + } + inner(self, cache.as_ref(), key.as_ref(), target.as_ref()).await + } + + /// Opens the target file handle for reading, without a key, returning a + /// ToLinker instance. + pub async fn link_to_hash(self, cache: P, target: T) -> Result + where + P: AsRef, + T: AsRef, + { + async fn inner(opts: WriteOpts, cache: &Path, target: &Path) -> Result { + Ok(ToLinker { + cache: cache.to_path_buf(), + key: None, + read: 0, + linker: linkto::AsyncToLinker::new( + cache, + opts.algorithm.unwrap_or(Algorithm::Sha256), + target, + ) + .await?, + opts, + }) + } + inner(self, cache.as_ref(), target.as_ref()).await + } + + /// Opens the target file handle for reading synchronously, returning a + /// SyncToLinker instance. + pub fn link_to_sync(self, cache: P, key: K, target: T) -> Result + where + P: AsRef, + K: AsRef, + T: AsRef, + { + fn inner(opts: WriteOpts, cache: &Path, key: &str, target: &Path) -> Result { + Ok(SyncToLinker { + cache: cache.to_path_buf(), + key: Some(String::from(key)), + read: 0, + linker: linkto::ToLinker::new( + cache, + opts.algorithm.unwrap_or(Algorithm::Sha256), + target, + )?, + opts, + }) + } + inner(self, cache.as_ref(), key.as_ref(), target.as_ref()) + } + + /// Opens the target file handle for reading synchronously, without a key, + /// returning a SyncToLinker instance. + pub fn link_to_hash_sync(self, cache: P, target: T) -> Result + where + P: AsRef, + T: AsRef, + { + fn inner(opts: WriteOpts, cache: &Path, target: &Path) -> Result { + Ok(SyncToLinker { + cache: cache.to_path_buf(), + key: None, + read: 0, + linker: linkto::ToLinker::new( + cache, + opts.algorithm.unwrap_or(Algorithm::Sha256), + target, + )?, + opts, + }) + } + inner(self, cache.as_ref(), target.as_ref()) + } +} + +/// A file handle for asynchronously reading in data from a file to be added to +/// the cache via a symlink to the target file. +/// +/// Make sure to call `.commit()` when done reading to actually add the file to +/// the cache. +pub struct ToLinker { + cache: PathBuf, + key: Option, + read: usize, + pub(crate) linker: linkto::AsyncToLinker, + opts: WriteOpts, +} + +impl AsyncRead for ToLinker { + #[cfg(feature = "async-std")] + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut TaskContext<'_>, + buf: &mut [u8], + ) -> Poll> { + let amt = futures::ready!(Pin::new(&mut self.linker).poll_read(cx, buf))?; + self.read += amt; + Poll::Ready(Ok(amt)) + } + + #[cfg(feature = "tokio")] + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut TaskContext<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + let pre_len = buf.filled().len(); + futures::ready!(Pin::new(&mut self.linker).poll_read(cx, buf))?; + self.read += buf.filled().len() - pre_len; + Poll::Ready(Ok(())) + } +} + +fn filesize(target: &Path) -> Result { + Ok(target + .metadata() + .with_context(|| format!("Failed to get metadata of {}", target.display()))? + .len() as usize) +} + +impl ToLinker { + /// Creates a new asynchronous readable file handle into the cache. + pub async fn open(cache: P, key: K, target: T) -> Result + where + P: AsRef, + K: AsRef, + T: AsRef, + { + async fn inner(cache: &Path, key: &str, target: &Path) -> Result { + let size = filesize(target)?; + WriteOpts::new() + .algorithm(Algorithm::Sha256) + .size(size) + .link_to(cache, key, target) + .await + } + inner(cache.as_ref(), key.as_ref(), target.as_ref()).await + } + + /// Creates a new asynchronous readable file handle into the cache. + pub async fn open_hash(cache: P, target: T) -> Result + where + P: AsRef, + T: AsRef, + { + async fn inner(cache: &Path, target: &Path) -> Result { + let size = filesize(target)?; + WriteOpts::new() + .algorithm(Algorithm::Sha256) + .size(size) + .link_to_hash(cache, target) + .await + } + inner(cache.as_ref(), target.as_ref()).await + } + + /// Consumes the rest of the file handle, creates an symlink into + /// the cache, and creates index entries for the linked file. Also verifies + /// data against `size` and `integrity` options, if provided. Must be called + /// manually in order to complete the writing process, otherwise everything + /// will be thrown out. + pub async fn commit(mut self) -> Result { + self.consume().await?; + let linker_sri = self.linker.commit().await?; + if let Some(sri) = &self.opts.sri { + if sri.matches(&linker_sri).is_none() { + return Err(ssri::Error::IntegrityCheckError(sri.clone(), linker_sri).into()); + } + } else { + self.opts.sri = Some(linker_sri.clone()); + } + if let Some(size) = self.opts.size { + if size != self.read { + return Err(Error::SizeMismatch(size, self.read)); + } + } + if let Some(key) = self.key { + index::insert(&self.cache, &key, self.opts) + } else { + Ok(linker_sri) + } + } + + // "Consume" the remainder of the reader, so that the integrity is properly + // calculated. + async fn consume(&mut self) -> Result<()> { + // Do a small 'test' read to avoid allocating a larger buffer if it + // isn't necessary. + let mut probe = [0; PROBE_SIZE]; + if self.context_read(&mut probe).await? > 0 { + // Make sure all the bytes are read so that the integrity is + // properly calculated. + let mut buf = [0; BUF_SIZE]; + while self.context_read(&mut buf).await? > 0 {} + } + Ok(()) + } + + async fn context_read(&mut self, buf: &mut [u8]) -> Result { + AsyncReadExt::read(self, buf).await.with_context(|| { + "Failed to read target file contents while calculating integrity".into() + }) + } +} + +/// A file handle for synchronously reading data from a file to be added to the +/// cache via a symlink. +/// +/// Make sure to call `.commit()` when done reading to actually add the file +/// to the cache. +pub struct SyncToLinker { + cache: PathBuf, + key: Option, + read: usize, + pub(crate) linker: linkto::ToLinker, + opts: WriteOpts, +} + +impl std::io::Read for SyncToLinker { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + let amt = self.linker.read(buf)?; + self.read += amt; + Ok(amt) + } +} + +impl SyncToLinker { + /// Creates a new readable file handle to a file the cache will link to, + /// indexed at the provided key, on commit. + /// + /// It is not necessary to read any of the file before calling `.commit()`. + /// + /// ## Example + /// ```no_run + /// use std::io::prelude::*; + /// + /// fn main() -> cacache::Result<()> { + /// let path = "../my-other-files/my-file.tgz"; + /// let mut fd = cacache::SyncToLinker::open("./my-cache", "my-key", path)?; + /// let mut str = String::new(); + /// fd.read_to_string(&mut str).expect("Failed to read to string"); + /// // The file is not linked into the cache until you commit it. + /// fd.commit()?; + /// Ok(()) + /// } + /// ``` + pub fn open(cache: P, key: K, target: T) -> Result + where + P: AsRef, + K: AsRef, + T: AsRef, + { + fn inner(cache: &Path, key: &str, target: &Path) -> Result { + let size = filesize(target)?; + WriteOpts::new() + .algorithm(Algorithm::Sha256) + .size(size) + .link_to_sync(cache, key, target) + } + inner(cache.as_ref(), key.as_ref(), target.as_ref()) + } + + /// Creates a new readable file handle to a file that the cache will link + /// to, without an indexe key, on commit. + /// + /// It is not necessary to read any of the file before calling `.commit()`. + /// + /// ## Example + /// ```no_run + /// use std::io::prelude::*; + /// + /// fn main() -> cacache::Result<()> { + /// let path = "../my-other-files/my-file.tgz"; + /// let mut fd = cacache::SyncToLinker::open_hash("./my-cache", path)?; + /// let mut str = String::new(); + /// fd.read_to_string(&mut str).expect("Failed to read to string"); + /// // The file is not linked into the cache until you commit it. + /// fd.commit()?; + /// Ok(()) + /// } + /// ``` + pub fn open_hash(cache: P, target: T) -> Result + where + P: AsRef, + T: AsRef, + { + fn inner(cache: &Path, target: &Path) -> Result { + let size = filesize(target)?; + WriteOpts::new() + .algorithm(Algorithm::Sha256) + .size(size) + .link_to_hash_sync(cache, target) + } + inner(cache.as_ref(), target.as_ref()) + } + + /// Consumes the rest of the file handle, creates a symlink to the file, and + /// creates index entries for the linked file. Also verifies data against + /// `size` and `integrity` options, if provided. Must be called manually in + /// order to complete the writing process, otherwise everything will be + /// thrown out. + pub fn commit(mut self) -> Result { + self.consume()?; + let cache = self.cache; + let linker_sri = self.linker.commit()?; + if let Some(sri) = &self.opts.sri { + if sri.matches(&linker_sri).is_none() { + return Err(ssri::Error::IntegrityCheckError(sri.clone(), linker_sri).into()); + } + } else { + self.opts.sri = Some(linker_sri.clone()); + } + if let Some(size) = self.opts.size { + if size != self.read { + return Err(Error::SizeMismatch(size, self.read)); + } + } + if let Some(key) = self.key { + index::insert(&cache, &key, self.opts) + } else { + Ok(linker_sri) + } + } + + fn consume(&mut self) -> Result<()> { + // Do a small 'test' read to avoid allocating a larger buffer if it + // isn't necessary. + let mut probe = [0; PROBE_SIZE]; + if self.context_read(&mut probe)? > 0 { + // Make sure all the bytes are read so that the integrity is + // properly calculated. + let mut buf = [0; BUF_SIZE]; + while self.context_read(&mut buf)? > 0 {} + } + Ok(()) + } + + fn context_read(&mut self, buf: &mut [u8]) -> Result { + self.read(buf).with_context(|| { + "Failed to read target file contents while calculating integrity".into() + }) + } +} + +#[cfg(test)] +mod tests { + use std::fs::File; + use std::io::Write; + + use super::*; + + #[cfg(feature = "async-std")] + use async_attributes::test as async_test; + #[cfg(feature = "tokio")] + use tokio::test as async_test; + + fn create_tmpfile(tmp: &tempfile::TempDir, buf: &[u8]) -> PathBuf { + let dir = tmp.path().to_owned(); + let target = dir.join("target-file"); + std::fs::create_dir_all(target.parent().unwrap().clone()).unwrap(); + let mut file = File::create(target.clone()).unwrap(); + file.write_all(buf).unwrap(); + file.flush().unwrap(); + target + } + + #[async_test] + async fn test_link() { + let tmp = tempfile::tempdir().unwrap(); + let target = create_tmpfile(&tmp, b"hello world"); + + let tmp = tempfile::tempdir().unwrap(); + let dir = tmp.path().to_owned(); + crate::link_to(&dir, "my-key", target).await.unwrap(); + + let buf = crate::read(&dir, "my-key").await.unwrap(); + assert_eq!(buf, b"hello world"); + } + + #[async_test] + async fn test_link_to_hash() { + let tmp = tempfile::tempdir().unwrap(); + let target = create_tmpfile(&tmp, b"hello world"); + + let tmp = tempfile::tempdir().unwrap(); + let dir = tmp.path().to_owned(); + let sri = crate::link_to_hash(&dir, target).await.unwrap(); + + let buf = crate::read_hash(&dir, &sri).await.unwrap(); + assert_eq!(buf, b"hello world"); + } + + #[test] + fn test_link_to_sync() { + let tmp = tempfile::tempdir().unwrap(); + let target = create_tmpfile(&tmp, b"hello world"); + + let tmp = tempfile::tempdir().unwrap(); + let dir = tmp.path().to_owned(); + crate::link_to_sync(&dir, "my-key", target).unwrap(); + + let buf = crate::read_sync(&dir, "my-key").unwrap(); + assert_eq!(buf, b"hello world"); + } + + #[test] + fn test_link_to_hash_sync() { + let tmp = tempfile::tempdir().unwrap(); + let target = create_tmpfile(&tmp, b"hello world"); + + let tmp = tempfile::tempdir().unwrap(); + let dir = tmp.path().to_owned(); + let sri = crate::link_to_hash_sync(&dir, target).unwrap(); + + let buf = crate::read_hash_sync(&dir, &sri).unwrap(); + assert_eq!(buf, b"hello world"); + } + + #[async_test] + async fn test_open() { + let tmp = tempfile::tempdir().unwrap(); + let target = create_tmpfile(&tmp, b"hello world"); + + let tmp = tempfile::tempdir().unwrap(); + let dir = tmp.path().to_owned(); + let mut handle = crate::ToLinker::open(&dir, "my-key", target).await.unwrap(); + + let mut buf = Vec::new(); + handle.read_to_end(&mut buf).await.unwrap(); + handle.commit().await.unwrap(); + assert_eq!(buf, b"hello world"); + + let buf = crate::read_sync(&dir, "my-key").unwrap(); + assert_eq!(buf, b"hello world"); + } + + #[async_test] + async fn test_open_hash() { + let tmp = tempfile::tempdir().unwrap(); + let target = create_tmpfile(&tmp, b"hello world"); + + let tmp = tempfile::tempdir().unwrap(); + let dir = tmp.path().to_owned(); + let mut handle = crate::ToLinker::open_hash(&dir, target).await.unwrap(); + + let mut buf = Vec::new(); + handle.read_to_end(&mut buf).await.unwrap(); + let sri = handle.commit().await.unwrap(); + assert_eq!(buf, b"hello world"); + + let buf = crate::read_hash_sync(&dir, &sri).unwrap(); + assert_eq!(buf, b"hello world"); + } + + #[test] + fn test_open_sync() { + let tmp = tempfile::tempdir().unwrap(); + let target = create_tmpfile(&tmp, b"hello world"); + + let tmp = tempfile::tempdir().unwrap(); + let dir = tmp.path().to_owned(); + let mut handle = crate::SyncToLinker::open(&dir, "my-key", target).unwrap(); + + let mut buf = Vec::new(); + handle.read_to_end(&mut buf).unwrap(); + handle.commit().unwrap(); + assert_eq!(buf, b"hello world"); + + let buf = crate::read_sync(&dir, "my-key").unwrap(); + assert_eq!(buf, b"hello world"); + } + + #[test] + fn test_open_hash_sync() { + let tmp = tempfile::tempdir().unwrap(); + let target = create_tmpfile(&tmp, b"hello world"); + + let tmp = tempfile::tempdir().unwrap(); + let dir = tmp.path().to_owned(); + let mut handle = crate::SyncToLinker::open_hash(&dir, target).unwrap(); + + let mut buf = Vec::new(); + handle.read_to_end(&mut buf).unwrap(); + let sri = handle.commit().unwrap(); + assert_eq!(buf, b"hello world"); + + let buf = crate::read_hash_sync(&dir, &sri).unwrap(); + assert_eq!(buf, b"hello world"); + } +}