coven/storage/local/
traits.rs1use crate::storage::local::storage_path;
3use thiserror::Error;
4
5#[derive(Error, Debug)]
6pub enum StorageError {
7 #[error("IO error: {0}")]
8 Io(#[from] std::io::Error),
9 #[error("Storage not configured")]
10 NotConfigured,
11 #[error("Cloud storage error: {0}")]
12 Cloud(String),
13 #[error("Database error: {0}")]
14 Database(String),
15}
16
17pub type ProgressCallback = Box<dyn Fn(usize, usize) + Send + Sync>;
19
20#[derive(Clone)]
26pub struct BlobStore {
27 library_dir: crate::library_dir::LibraryDir,
28}
29
30impl BlobStore {
31 pub fn new_local(library_dir: crate::library_dir::LibraryDir) -> Self {
33 Self { library_dir }
34 }
35
36 pub async fn store_bytes(
40 &self,
41 file_id: &str,
42 data: &[u8],
43 on_progress: ProgressCallback,
44 ) -> Result<(), StorageError> {
45 use tokio::io::AsyncWriteExt;
46
47 let total_bytes = data.len();
48 on_progress(0, total_bytes);
49
50 let rel_path = storage_path(file_id);
51 let path = self.library_dir.join(&rel_path);
52
53 if let Some(parent) = path.parent() {
54 tokio::fs::create_dir_all(parent).await?;
55 }
56
57 let batch_size = 1_048_576;
58 let file = tokio::fs::File::create(&path).await?;
59 let mut writer = tokio::io::BufWriter::new(file);
60 let mut bytes_written = 0usize;
61
62 for chunk in data.chunks(batch_size) {
63 writer.write_all(chunk).await?;
64 bytes_written += chunk.len();
65 on_progress(bytes_written.min(total_bytes), total_bytes);
66 }
67
68 writer.flush().await?;
69
70 Ok(())
71 }
72
73 pub async fn store_from_path(
77 &self,
78 file_id: &str,
79 source: &std::path::Path,
80 on_progress: ProgressCallback,
81 ) -> Result<(), StorageError> {
82 use tokio::io::{AsyncReadExt, AsyncWriteExt};
83
84 let total_bytes = tokio::fs::metadata(source).await?.len() as usize;
85 on_progress(0, total_bytes);
86
87 let rel_path = storage_path(file_id);
88 let path = self.library_dir.join(&rel_path);
89
90 if let Some(parent) = path.parent() {
91 tokio::fs::create_dir_all(parent).await?;
92 }
93
94 let batch_size = 1_048_576;
95 let mut reader = tokio::fs::File::open(source).await?;
96 let dest = tokio::fs::File::create(&path).await?;
97 let mut writer = tokio::io::BufWriter::new(dest);
98 let mut buf = vec![0u8; batch_size];
99 let mut bytes_written = 0usize;
100
101 loop {
106 let mut filled = 0usize;
107 while filled < batch_size {
108 let n = reader.read(&mut buf[filled..]).await?;
109 if n == 0 {
110 break;
111 }
112 filled += n;
113 }
114 if filled == 0 {
115 break;
116 }
117 writer.write_all(&buf[..filled]).await?;
118 bytes_written += filled;
119 on_progress(bytes_written.min(total_bytes), total_bytes);
120 }
121
122 writer.flush().await?;
123
124 Ok(())
125 }
126}
127
128#[cfg(test)]
129mod tests {
130 use super::*;
131 use std::sync::{Arc, Mutex};
132 use tempfile::TempDir;
133
134 #[tokio::test]
135 async fn store_from_path_copies_bytes_and_reports_1mib_cadence() {
136 let temp = TempDir::new().unwrap();
137 let library_dir = crate::library_dir::LibraryDir::new(temp.path());
138 let storage = BlobStore::new_local(library_dir);
139
140 let total = 2_621_440usize;
142 let source_bytes: Vec<u8> = (0..total).map(|i| (i % 256) as u8).collect();
143 let source_path = temp.path().join("source.bin");
144 tokio::fs::write(&source_path, &source_bytes).await.unwrap();
145
146 let calls: Arc<Mutex<Vec<(usize, usize)>>> = Arc::new(Mutex::new(Vec::new()));
147 let calls_clone = calls.clone();
148 let file_id = "abcdef1234567890";
149 storage
150 .store_from_path(
151 file_id,
152 &source_path,
153 Box::new(move |written, total| {
154 calls_clone.lock().unwrap().push((written, total));
155 }),
156 )
157 .await
158 .unwrap();
159
160 let dest_path = temp.path().join(storage_path(file_id));
161 let dest_bytes = tokio::fs::read(&dest_path).await.unwrap();
162 assert_eq!(dest_bytes, source_bytes, "destination equals source");
163
164 let calls = calls.lock().unwrap();
165 assert_eq!(
166 &*calls,
167 &[
168 (0, total),
169 (1_048_576, total),
170 (2_097_152, total),
171 (total, total),
172 ],
173 "progress fires once per 1 MiB batch (plus initial and final partial)",
174 );
175 }
176}