coven/storage/local/
traits.rs

1//! Local managed blob storage: plaintext files at content-addressed paths.
2use crate::storage::local::storage_path;
3use thiserror::Error;
4
5#[derive(Error, Debug)]
6pub enum StorageError {
7    #[error("IO error: {0}")]
8    Io(#[from] std::io::Error),
9    #[error("Storage not configured")]
10    NotConfigured,
11    #[error("Cloud storage error: {0}")]
12    Cloud(String),
13    #[error("Database error: {0}")]
14    Database(String),
15}
16
/// Progress reporter for blob writes, called as `(bytes_written, total_bytes)`.
///
/// It is a boxed trait object that must be `Send + Sync`, so any thread-safe
/// closure can be supplied.
pub type ProgressCallback = Box<dyn Fn(usize, usize) + Sync + Send>;
19
20/// Storage implementation for managed local storage.
21///
22/// Writes files to `library_dir/storage/ab/cd/{file_id}` as plaintext.
23/// Local files are never encrypted -- encryption only happens when uploading
24/// to the cloud home.
25#[derive(Clone)]
26pub struct BlobStore {
27    library_dir: crate::library_dir::LibraryDir,
28}
29
30impl BlobStore {
31    /// Create storage for managed local blobs.
32    pub fn new_local(library_dir: crate::library_dir::LibraryDir) -> Self {
33        Self { library_dir }
34    }
35
36    /// Write bytes to local storage without creating a DB record.
37    ///
38    /// Uses the given `file_id` for the hash-based storage path.
39    pub async fn store_bytes(
40        &self,
41        file_id: &str,
42        data: &[u8],
43        on_progress: ProgressCallback,
44    ) -> Result<(), StorageError> {
45        use tokio::io::AsyncWriteExt;
46
47        let total_bytes = data.len();
48        on_progress(0, total_bytes);
49
50        let rel_path = storage_path(file_id);
51        let path = self.library_dir.join(&rel_path);
52
53        if let Some(parent) = path.parent() {
54            tokio::fs::create_dir_all(parent).await?;
55        }
56
57        let batch_size = 1_048_576;
58        let file = tokio::fs::File::create(&path).await?;
59        let mut writer = tokio::io::BufWriter::new(file);
60        let mut bytes_written = 0usize;
61
62        for chunk in data.chunks(batch_size) {
63            writer.write_all(chunk).await?;
64            bytes_written += chunk.len();
65            on_progress(bytes_written.min(total_bytes), total_bytes);
66        }
67
68        writer.flush().await?;
69
70        Ok(())
71    }
72
73    /// Stream a source file from disk into local storage without buffering the
74    /// whole thing in memory. Progress is reported in 1 MiB batches to match
75    /// the cadence of `store_bytes`.
76    pub async fn store_from_path(
77        &self,
78        file_id: &str,
79        source: &std::path::Path,
80        on_progress: ProgressCallback,
81    ) -> Result<(), StorageError> {
82        use tokio::io::{AsyncReadExt, AsyncWriteExt};
83
84        let total_bytes = tokio::fs::metadata(source).await?.len() as usize;
85        on_progress(0, total_bytes);
86
87        let rel_path = storage_path(file_id);
88        let path = self.library_dir.join(&rel_path);
89
90        if let Some(parent) = path.parent() {
91            tokio::fs::create_dir_all(parent).await?;
92        }
93
94        let batch_size = 1_048_576;
95        let mut reader = tokio::fs::File::open(source).await?;
96        let dest = tokio::fs::File::create(&path).await?;
97        let mut writer = tokio::io::BufWriter::new(dest);
98        let mut buf = vec![0u8; batch_size];
99        let mut bytes_written = 0usize;
100
101        // Fill `buf` up to `batch_size` per iteration so progress fires once per
102        // full batch. A single `read` on `tokio::fs::File` (even via `BufReader`)
103        // can return far less than the requested length, so without the inner
104        // fill loop we'd report progress on every short read.
105        loop {
106            let mut filled = 0usize;
107            while filled < batch_size {
108                let n = reader.read(&mut buf[filled..]).await?;
109                if n == 0 {
110                    break;
111                }
112                filled += n;
113            }
114            if filled == 0 {
115                break;
116            }
117            writer.write_all(&buf[..filled]).await?;
118            bytes_written += filled;
119            on_progress(bytes_written.min(total_bytes), total_bytes);
120        }
121
122        writer.flush().await?;
123
124        Ok(())
125    }
126}
127
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};
    use tempfile::TempDir;

    /// Progress batch size both store methods report at.
    const MIB: usize = 1_048_576;

    /// Shared log of `(written, total)` progress calls.
    type Calls = Arc<Mutex<Vec<(usize, usize)>>>;

    /// Build a progress callback that appends every call to the returned log.
    fn recording_callback() -> (Calls, ProgressCallback) {
        let calls: Calls = Arc::new(Mutex::new(Vec::new()));
        let sink = Arc::clone(&calls);
        let callback: ProgressCallback = Box::new(move |written, total| {
            sink.lock().unwrap().push((written, total));
        });
        (calls, callback)
    }

    /// Deterministic, non-constant payload of `len` bytes.
    fn patterned_bytes(len: usize) -> Vec<u8> {
        (0..len).map(|i| (i % 256) as u8).collect()
    }

    /// Assert that no `*.partial` staging file remains next to `blob`.
    fn assert_no_staging_leftovers(blob: &std::path::Path) {
        let dir = blob.parent().expect("blob path has a parent directory");
        let leftovers: Vec<_> = std::fs::read_dir(dir)
            .unwrap()
            .filter_map(|entry| entry.ok())
            .filter(|entry| entry.file_name().to_string_lossy().ends_with(".partial"))
            .collect();
        assert!(leftovers.is_empty(), "staging files left behind: {:?}", leftovers);
    }

    #[tokio::test]
    async fn store_from_path_copies_bytes_and_reports_1mib_cadence() {
        let temp = TempDir::new().unwrap();
        let library_dir = crate::library_dir::LibraryDir::new(temp.path());
        let storage = BlobStore::new_local(library_dir);

        // 2.5 MiB — two full batches plus a partial tail.
        let total = 2 * MIB + MIB / 2;
        let source_bytes = patterned_bytes(total);
        let source_path = temp.path().join("source.bin");
        tokio::fs::write(&source_path, &source_bytes).await.unwrap();

        let (calls, on_progress) = recording_callback();
        let file_id = "abcdef1234567890";
        storage
            .store_from_path(file_id, &source_path, on_progress)
            .await
            .unwrap();

        let dest_path = temp.path().join(storage_path(file_id));
        let dest_bytes = tokio::fs::read(&dest_path).await.unwrap();
        assert_eq!(dest_bytes, source_bytes, "destination equals source");
        assert_no_staging_leftovers(&dest_path);

        assert_eq!(
            *calls.lock().unwrap(),
            vec![(0, total), (MIB, total), (2 * MIB, total), (total, total)],
            "progress fires once per 1 MiB batch (plus initial and final partial)",
        );
    }

    #[tokio::test]
    async fn store_bytes_writes_data_and_reports_1mib_cadence() {
        let temp = TempDir::new().unwrap();
        let library_dir = crate::library_dir::LibraryDir::new(temp.path());
        let storage = BlobStore::new_local(library_dir);

        // Same 2.5 MiB shape as the streaming test, so the cadences can be compared.
        let total = 2 * MIB + MIB / 2;
        let data = patterned_bytes(total);

        let (calls, on_progress) = recording_callback();
        let file_id = "0123456789abcdef";
        storage.store_bytes(file_id, &data, on_progress).await.unwrap();

        let dest_path = temp.path().join(storage_path(file_id));
        let dest_bytes = tokio::fs::read(&dest_path).await.unwrap();
        assert_eq!(dest_bytes, data, "destination equals input");
        assert_no_staging_leftovers(&dest_path);

        assert_eq!(
            *calls.lock().unwrap(),
            vec![(0, total), (MIB, total), (2 * MIB, total), (total, total)],
            "store_bytes reports at the same cadence as store_from_path",
        );
    }

    #[tokio::test]
    async fn store_from_path_handles_empty_source() {
        let temp = TempDir::new().unwrap();
        let library_dir = crate::library_dir::LibraryDir::new(temp.path());
        let storage = BlobStore::new_local(library_dir);

        let source_path = temp.path().join("empty.bin");
        tokio::fs::write(&source_path, b"").await.unwrap();

        let (calls, on_progress) = recording_callback();
        let file_id = "fedcba0987654321";
        storage
            .store_from_path(file_id, &source_path, on_progress)
            .await
            .unwrap();

        let dest_path = temp.path().join(storage_path(file_id));
        let dest_bytes = tokio::fs::read(&dest_path).await.unwrap();
        assert!(dest_bytes.is_empty(), "empty source yields an empty blob");

        assert_eq!(
            *calls.lock().unwrap(),
            vec![(0, 0)],
            "only the initial progress call fires for an empty source",
        );
    }
}