coven/storage/cloud/
cloudkit.rs

1//! CloudKit-backed `CloudHome` implementation.
2//!
3//! CloudKit's CKAsset has a 50MB limit, so large files are split into 10MB
4//! chunks stored as separate records: `key.part0`, `key.part1`, etc.
5//!
6//! The `CloudKitOps` trait defines synchronous record operations that are
7//! implemented in Swift via a UniFFI callback interface. `CloudKitCloudHome`
8//! wraps these ops, adds chunking logic, and implements `CloudHome`.
9
10use std::sync::Arc;
11
12use async_trait::async_trait;
13
14use super::{CloudHome, CloudHomeError, CloudHomeJoinInfo};
15
/// Largest payload stored in a single record (10 MiB); `write` splits anything
/// bigger into consecutive pieces of this size.
const CHUNK_SIZE: usize = 10 << 20;
17
18/// Synchronous interface for raw CloudKit record operations.
19/// Implemented in Swift via UniFFI callback interface.
20/// Methods block the calling thread while CloudKit async operations complete.
21pub trait CloudKitOps: Send + Sync {
22    fn write_record(&self, key: &str, data: Vec<u8>) -> Result<(), CloudHomeError>;
23    fn read_record(&self, key: &str) -> Result<Vec<u8>, CloudHomeError>;
24    fn list_records(&self, prefix: &str) -> Result<Vec<String>, CloudHomeError>;
25    fn delete_record(&self, key: &str) -> Result<(), CloudHomeError>;
26    fn record_exists(&self, key: &str) -> Result<bool, CloudHomeError>;
27    fn grant_access(&self, email: &str) -> Result<String, CloudHomeError>;
28    fn revoke_access(&self, user_record_id: &str) -> Result<(), CloudHomeError>;
29    fn accept_share(&self, share_url: &str) -> Result<(), CloudHomeError>;
30}
31
32/// CloudKit-backed cloud home with automatic chunking for large files.
33pub struct CloudKitCloudHome {
34    ops: Arc<dyn CloudKitOps>,
35}
36
37impl CloudKitCloudHome {
38    pub fn new(ops: Arc<dyn CloudKitOps>) -> Self {
39        Self { ops }
40    }
41}
42
/// Maps a chunk record key back to the key of the file it belongs to.
///
/// A trailing `.part` followed by one or more ASCII digits is removed; any
/// other key (including a bare `.part` with no digits) is returned unchanged.
/// Only the last `.part` occurrence in the key is considered.
fn strip_part_suffix(key: &str) -> &str {
    match key.rsplit_once(".part") {
        Some((base, index))
            if !index.is_empty() && index.bytes().all(|b| b.is_ascii_digit()) =>
        {
            base
        }
        _ => key,
    }
}
53
54/// Delete old single record and chunk records for a key (best-effort).
55fn delete_all_variants(ops: &dyn CloudKitOps, key: &str) -> Result<(), CloudHomeError> {
56    // Delete single record (ignore not-found)
57    match ops.delete_record(key) {
58        Ok(()) | Err(CloudHomeError::NotFound(_)) => {}
59        Err(e) => return Err(e),
60    }
61
62    // Delete all chunk records
63    let chunk_prefix = format!("{key}.part");
64    let chunks = ops.list_records(&chunk_prefix)?;
65    for chunk_key in chunks {
66        match ops.delete_record(&chunk_key) {
67            Ok(()) | Err(CloudHomeError::NotFound(_)) => {}
68            Err(e) => return Err(e),
69        }
70    }
71
72    Ok(())
73}
74
75#[async_trait]
76impl CloudHome for CloudKitCloudHome {
77    async fn write(&self, key: &str, data: Vec<u8>) -> Result<(), CloudHomeError> {
78        let ops = self.ops.clone();
79        let key = key.to_string();
80        tokio::task::spawn_blocking(move || {
81            // Clean up any existing single or chunked records first
82            delete_all_variants(&*ops, &key)?;
83
84            if data.len() <= CHUNK_SIZE {
85                ops.write_record(&key, data)
86            } else {
87                for (i, chunk) in data.chunks(CHUNK_SIZE).enumerate() {
88                    let chunk_key = format!("{key}.part{i}");
89                    ops.write_record(&chunk_key, chunk.to_vec())?;
90                }
91                Ok(())
92            }
93        })
94        .await
95        .map_err(|e| CloudHomeError::Storage(format!("spawn_blocking failed: {e}")))?
96    }
97
98    async fn read(&self, key: &str) -> Result<Vec<u8>, CloudHomeError> {
99        let ops = self.ops.clone();
100        let key = key.to_string();
101        tokio::task::spawn_blocking(move || {
102            // Try single record first
103            if ops.record_exists(&key)? {
104                return ops.read_record(&key);
105            }
106
107            // Try chunked records
108            let chunk_prefix = format!("{key}.part");
109            let chunk_keys = ops.list_records(&chunk_prefix)?;
110            if chunk_keys.is_empty() {
111                return Err(CloudHomeError::NotFound(key));
112            }
113
114            // Parse part numbers up front; a key without a valid `.part{N}` suffix
115            // would corrupt assembly order if treated as part 0.
116            let mut numbered: Vec<(usize, String)> = chunk_keys
117                .into_iter()
118                .map(|k| {
119                    let n = k
120                        .rsplit_once(".part")
121                        .and_then(|(_, suffix)| suffix.parse::<usize>().ok())
122                        .ok_or_else(|| {
123                            CloudHomeError::Storage(format!("chunk key {k:?} missing .part suffix"))
124                        })?;
125                    Ok::<_, CloudHomeError>((n, k))
126                })
127                .collect::<Result<_, _>>()?;
128            numbered.sort_by_key(|(n, _)| *n);
129
130            let mut result = Vec::new();
131            for (_, chunk_key) in &numbered {
132                let chunk = ops.read_record(chunk_key)?;
133                result.extend_from_slice(&chunk);
134            }
135            Ok(result)
136        })
137        .await
138        .map_err(|e| CloudHomeError::Storage(format!("spawn_blocking failed: {e}")))?
139    }
140
141    async fn read_range(&self, key: &str, start: u64, end: u64) -> Result<Vec<u8>, CloudHomeError> {
142        if end <= start {
143            return Ok(Vec::new());
144        }
145
146        let ops = self.ops.clone();
147        let key = key.to_string();
148        tokio::task::spawn_blocking(move || {
149            let start = start as usize;
150            let end = end as usize;
151
152            // Try single record first
153            if ops.record_exists(&key)? {
154                let data = ops.read_record(&key)?;
155                if end > data.len() {
156                    return Err(CloudHomeError::Storage(format!(
157                        "range {start}..{end} exceeds file size {}",
158                        data.len()
159                    )));
160                }
161                return Ok(data[start..end].to_vec());
162            }
163
164            // Chunked read: calculate which chunks overlap [start, end)
165            let first_chunk = start / CHUNK_SIZE;
166            let last_chunk = (end - 1) / CHUNK_SIZE;
167
168            let mut result = Vec::with_capacity(end - start);
169            for i in first_chunk..=last_chunk {
170                let chunk_key = format!("{key}.part{i}");
171                let chunk = ops.read_record(&chunk_key)?;
172
173                let chunk_start = i * CHUNK_SIZE;
174                let slice_start = if i == first_chunk {
175                    start - chunk_start
176                } else {
177                    0
178                };
179                let slice_end = if i == last_chunk {
180                    end - chunk_start
181                } else {
182                    chunk.len()
183                };
184                result.extend_from_slice(&chunk[slice_start..slice_end]);
185            }
186            Ok(result)
187        })
188        .await
189        .map_err(|e| CloudHomeError::Storage(format!("spawn_blocking failed: {e}")))?
190    }
191
192    async fn list(&self, prefix: &str) -> Result<Vec<String>, CloudHomeError> {
193        let ops = self.ops.clone();
194        let prefix = prefix.to_string();
195        tokio::task::spawn_blocking(move || {
196            let raw_keys = ops.list_records(&prefix)?;
197
198            // Strip .partN suffixes and deduplicate
199            let mut base_keys: Vec<String> = raw_keys
200                .iter()
201                .map(|k| strip_part_suffix(k).to_string())
202                .collect();
203            base_keys.sort();
204            base_keys.dedup();
205            Ok(base_keys)
206        })
207        .await
208        .map_err(|e| CloudHomeError::Storage(format!("spawn_blocking failed: {e}")))?
209    }
210
211    async fn delete(&self, key: &str) -> Result<(), CloudHomeError> {
212        let ops = self.ops.clone();
213        let key = key.to_string();
214        tokio::task::spawn_blocking(move || delete_all_variants(&*ops, &key))
215            .await
216            .map_err(|e| CloudHomeError::Storage(format!("spawn_blocking failed: {e}")))?
217    }
218
219    async fn exists(&self, key: &str) -> Result<bool, CloudHomeError> {
220        let ops = self.ops.clone();
221        let key = key.to_string();
222        tokio::task::spawn_blocking(move || {
223            if ops.record_exists(&key)? {
224                return Ok(true);
225            }
226            let chunk_prefix = format!("{key}.part");
227            let chunks = ops.list_records(&chunk_prefix)?;
228            Ok(!chunks.is_empty())
229        })
230        .await
231        .map_err(|e| CloudHomeError::Storage(format!("spawn_blocking failed: {e}")))?
232    }
233
234    async fn grant_access(&self, member_id: &str) -> Result<CloudHomeJoinInfo, CloudHomeError> {
235        let ops = self.ops.clone();
236        let member_id = member_id.to_string();
237        tokio::task::spawn_blocking(move || {
238            let share_url = ops.grant_access(&member_id)?;
239            Ok(CloudHomeJoinInfo::CloudKit { share_url })
240        })
241        .await
242        .map_err(|e| CloudHomeError::Storage(format!("spawn_blocking failed: {e}")))?
243    }
244
245    async fn revoke_access(&self, member_id: &str) -> Result<(), CloudHomeError> {
246        let ops = self.ops.clone();
247        let member_id = member_id.to_string();
248        tokio::task::spawn_blocking(move || ops.revoke_access(&member_id))
249            .await
250            .map_err(|e| CloudHomeError::Storage(format!("spawn_blocking failed: {e}")))?
251    }
252}
253
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::sync::Mutex;

    /// One mebibyte; used to size the multi-chunk fixtures.
    const MIB: usize = 1024 * 1024;

    /// In-memory `CloudKitOps` double: records live in a mutex-guarded map.
    struct MockCloudKitOps {
        store: Mutex<HashMap<String, Vec<u8>>>,
    }

    impl MockCloudKitOps {
        fn new() -> Self {
            let store = Mutex::new(HashMap::new());
            Self { store }
        }
    }

    impl CloudKitOps for MockCloudKitOps {
        fn write_record(&self, key: &str, data: Vec<u8>) -> Result<(), CloudHomeError> {
            let mut records = self.store.lock().unwrap();
            records.insert(key.to_string(), data);
            Ok(())
        }

        fn read_record(&self, key: &str) -> Result<Vec<u8>, CloudHomeError> {
            let records = self.store.lock().unwrap();
            match records.get(key) {
                Some(bytes) => Ok(bytes.clone()),
                None => Err(CloudHomeError::NotFound(key.to_string())),
            }
        }

        fn list_records(&self, prefix: &str) -> Result<Vec<String>, CloudHomeError> {
            let records = self.store.lock().unwrap();
            let mut matching = Vec::new();
            for key in records.keys() {
                if key.starts_with(prefix) {
                    matching.push(key.clone());
                }
            }
            matching.sort();
            Ok(matching)
        }

        fn delete_record(&self, key: &str) -> Result<(), CloudHomeError> {
            // Deleting a missing record is not an error for the mock.
            let mut records = self.store.lock().unwrap();
            records.remove(key);
            Ok(())
        }

        fn record_exists(&self, key: &str) -> Result<bool, CloudHomeError> {
            let records = self.store.lock().unwrap();
            Ok(records.contains_key(key))
        }

        fn grant_access(&self, email: &str) -> Result<String, CloudHomeError> {
            Ok(format!("https://www.icloud.com/share/{email}"))
        }

        fn revoke_access(&self, _user_record_id: &str) -> Result<(), CloudHomeError> {
            Ok(())
        }

        fn accept_share(&self, _share_url: &str) -> Result<(), CloudHomeError> {
            Ok(())
        }
    }

    /// Fresh cloud home over an empty mock store.
    fn make_cloud_home() -> CloudKitCloudHome {
        let ops: Arc<dyn CloudKitOps> = Arc::new(MockCloudKitOps::new());
        CloudKitCloudHome::new(ops)
    }

    /// `len` bytes cycling through 0..=255, so misplaced chunks are detectable.
    fn patterned_bytes(len: usize) -> Vec<u8> {
        (0..len).map(|i| (i % 256) as u8).collect()
    }

    #[tokio::test]
    async fn test_small_file_roundtrip() {
        let home = make_cloud_home();
        let payload = b"hello world".to_vec();

        home.write("small.bin", payload.clone()).await.unwrap();

        assert_eq!(home.read("small.bin").await.unwrap(), payload);
    }

    #[tokio::test]
    async fn test_large_file_roundtrip() {
        let home = make_cloud_home();
        // 25 MiB spans three chunks (10 + 10 + 5).
        let payload = patterned_bytes(25 * MIB);

        home.write("large.bin", payload.clone()).await.unwrap();
        let fetched = home.read("large.bin").await.unwrap();

        assert_eq!(fetched.len(), payload.len());
        assert_eq!(fetched, payload);
    }

    #[tokio::test]
    async fn test_read_range_single() {
        let home = make_cloud_home();
        home.write("range.bin", b"0123456789".to_vec()).await.unwrap();

        let slice = home.read_range("range.bin", 3, 7).await.unwrap();

        assert_eq!(slice, b"3456");
    }

    #[tokio::test]
    async fn test_read_range_chunked() {
        let home = make_cloud_home();
        // 15 MiB spans two chunks.
        let payload = patterned_bytes(15 * MIB);
        home.write("big.bin", payload.clone()).await.unwrap();

        // Straddle the boundary: last two bytes of chunk 0, first three of chunk 1.
        let start = (CHUNK_SIZE - 2) as u64;
        let end = (CHUNK_SIZE + 3) as u64;
        let slice = home.read_range("big.bin", start, end).await.unwrap();

        assert_eq!(slice.len(), 5);
        assert_eq!(slice, &payload[start as usize..end as usize]);
    }

    #[tokio::test]
    async fn test_list_deduplicates_chunks() {
        let home = make_cloud_home();
        // One chunked file and one single-record file under the same prefix.
        home.write("files/album.flac", vec![0u8; 25 * MIB]).await.unwrap();
        home.write("files/cover.jpg", b"img".to_vec()).await.unwrap();

        let keys = home.list("files/").await.unwrap();

        assert_eq!(keys.len(), 2);
        assert!(keys.contains(&"files/album.flac".to_string()));
        assert!(keys.contains(&"files/cover.jpg".to_string()));
    }

    #[tokio::test]
    async fn test_delete_removes_all_chunks() {
        let home = make_cloud_home();
        home.write("to-delete.bin", vec![0u8; 25 * MIB]).await.unwrap();
        assert!(home.exists("to-delete.bin").await.unwrap());

        home.delete("to-delete.bin").await.unwrap();

        assert!(!home.exists("to-delete.bin").await.unwrap());
        // Nothing related to the key may remain in the raw store.
        let leftovers = home.ops.list_records("to-delete.bin").unwrap();
        assert!(leftovers.is_empty());
    }

    #[tokio::test]
    async fn test_overwrite_chunked_with_single() {
        let home = make_cloud_home();
        // Start with the chunked form ...
        home.write("file.bin", vec![0u8; 25 * MIB]).await.unwrap();

        // ... then overwrite with something that fits in one record.
        let small = b"small".to_vec();
        home.write("file.bin", small.clone()).await.unwrap();

        assert_eq!(home.read("file.bin").await.unwrap(), small);
        // The old chunk records must be gone.
        let stale_chunks = home.ops.list_records("file.bin.part").unwrap();
        assert!(stale_chunks.is_empty());
    }

    #[tokio::test]
    async fn test_overwrite_single_with_chunked() {
        let home = make_cloud_home();
        // Start with the single-record form ...
        home.write("file.bin", b"small".to_vec()).await.unwrap();

        // ... then overwrite with a payload that must be chunked.
        let large = vec![1u8; 25 * MIB];
        home.write("file.bin", large.clone()).await.unwrap();

        assert_eq!(home.read("file.bin").await.unwrap(), large);
        // The base key must no longer exist as a single record.
        assert!(!home.ops.record_exists("file.bin").unwrap());
    }

    #[tokio::test]
    async fn test_exists() {
        let home = make_cloud_home();
        assert!(!home.exists("nope.bin").await.unwrap());

        home.write("yep.bin", b"data".to_vec()).await.unwrap();
        assert!(home.exists("yep.bin").await.unwrap());

        // A chunked file has no record under its base key but still exists.
        home.write("chunked.bin", vec![0u8; 15 * MIB]).await.unwrap();
        assert!(home.exists("chunked.bin").await.unwrap());
    }

    #[tokio::test]
    async fn test_read_range_empty_when_end_leq_start() {
        let home = make_cloud_home();
        home.write("range.bin", b"0123456789".to_vec()).await.unwrap();

        // end == start
        assert!(home.read_range("range.bin", 3, 3).await.unwrap().is_empty());
        // end < start
        assert!(home.read_range("range.bin", 5, 2).await.unwrap().is_empty());
        // end == 0, which would underflow `end - 1` if not short-circuited
        assert!(home.read_range("range.bin", 0, 0).await.unwrap().is_empty());
    }

    #[test]
    fn test_strip_part_suffix() {
        let cases = [
            ("file.bin.part0", "file.bin"),
            ("file.bin.part123", "file.bin"),
            ("file.bin", "file.bin"),
            ("file.partition", "file.partition"),
            // No digits after `.part`, so nothing is stripped.
            ("file.part", "file.part"),
        ];
        for (input, expected) in cases {
            assert_eq!(strip_part_suffix(input), expected);
        }
    }
}
479}