// coven/sync/snapshot.rs

//! Snapshots and garbage collection for the sync system.
//!
//! Periodically, a device creates a full snapshot of the database via
//! `VACUUM INTO`, encrypts it, and uploads it as `snapshot.db.enc`. This
//! allows new devices to bootstrap without replaying the entire changeset
//! history, and enables GC of old changesets.
//!
//! Snapshot creation policy (see `should_create_snapshot`): a new snapshot is
//! due once N changesets (default 100) have accumulated since the last one, or
//! once T hours (default 24) have passed and at least one new changeset exists.
10use std::collections::HashMap;
11use std::ffi::CString;
12use std::path::Path;
13
14use libsqlite3_sys as ffi;
15use serde::{Deserialize, Serialize};
16use tracing::{info, warn};
17
18use super::storage::{StorageError, SyncStorage};
19use crate::encryption::EncryptionService;
20
/// Changeset-count trigger: once at least this many changesets have been
/// pushed since the previous snapshot, a new snapshot is due.
const SNAPSHOT_CHANGESET_THRESHOLD: u64 = 100;

/// Time trigger, in hours since the previous snapshot. It only fires when at
/// least one changeset has been pushed since that snapshot
/// (see `should_create_snapshot`).
const SNAPSHOT_HOURS_THRESHOLD: u64 = 24;
26
27/// Error type for snapshot operations.
28#[derive(Debug, thiserror::Error)]
29pub enum SnapshotError {
30    #[error("VACUUM INTO failed: {0}")]
31    VacuumFailed(String),
32    #[error("IO error: {0}")]
33    Io(#[from] std::io::Error),
34    #[error("storage error: {0}")]
35    Bucket(#[from] StorageError),
36    #[error("decryption failed: {0}")]
37    Decryption(String),
38}
39
40/// Metadata stored alongside a snapshot in `snapshot_meta.json.enc`.
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct SnapshotMeta {
43    /// Per-device cursors at snapshot time: device_id -> head seq.
44    /// A bootstrapping device uses these as initial sync_cursors.
45    pub cursors: HashMap<String, u64>,
46    /// RFC 3339 timestamp when the snapshot was created.
47    pub created_at: String,
48}
49
/// Outcome of [`bootstrap_from_snapshot`].
#[derive(Debug)]
pub struct BootstrapResult {
    /// Maps each device ID to the changeset seq already reflected in the
    /// bootstrapped database. Values are copied from the snapshot metadata.
    /// Use them as the initial sync cursors and pull only changesets with a
    /// higher seq.
    pub cursors: HashMap<String, u64>,
}
57
58/// Create a snapshot of the database as encrypted bytes.
59///
60/// Uses `VACUUM INTO` to create a clean copy of the database at a temp path,
61/// reads the bytes, encrypts, and returns the encrypted blob.
62///
63/// # Safety
64/// `db` must be a valid, open sqlite3 connection pointer.
65pub unsafe fn create_snapshot(
66    db: *mut ffi::sqlite3,
67    temp_dir: &Path,
68    encryption: &EncryptionService,
69) -> Result<Vec<u8>, SnapshotError> {
70    let snapshot_path = temp_dir.join("snapshot.db");
71    let path_str = snapshot_path
72        .to_str()
73        .expect("temp path should be valid UTF-8");
74
75    // Remove any leftover snapshot file from a previous failed attempt.
76    let _ = std::fs::remove_file(&snapshot_path);
77
78    // VACUUM INTO creates a clean, defragmented copy of the database.
79    let sql = format!("VACUUM INTO '{}'", path_str.replace('\'', "''"));
80    let c_sql = CString::new(sql).expect("SQL should not contain null bytes");
81    let rc = ffi::sqlite3_exec(
82        db,
83        c_sql.as_ptr(),
84        None,
85        std::ptr::null_mut(),
86        std::ptr::null_mut(),
87    );
88    if rc != ffi::SQLITE_OK {
89        let err = ffi::sqlite3_errmsg(db);
90        let msg = if err.is_null() {
91            format!("sqlite3 error code {rc}")
92        } else {
93            std::ffi::CStr::from_ptr(err).to_string_lossy().into_owned()
94        };
95        let _ = std::fs::remove_file(&snapshot_path);
96        return Err(SnapshotError::VacuumFailed(msg));
97    }
98
99    // Read the snapshot file and encrypt.
100    let plaintext = std::fs::read(&snapshot_path)?;
101    let _ = std::fs::remove_file(&snapshot_path);
102
103    let encrypted = encryption.encrypt(&plaintext);
104
105    info!(
106        plaintext_size = plaintext.len(),
107        encrypted_size = encrypted.len(),
108        "created snapshot"
109    );
110
111    Ok(encrypted)
112}
113
114/// Upload a snapshot to the sync storage and update the device head.
115///
116/// Also uploads per-device cursor metadata (`snapshot_meta.json.enc`) so that
117/// bootstrapping devices know where each device was at snapshot time, and GC
118/// can safely delete only changesets covered by the snapshot.
119pub async fn push_snapshot(
120    storage: &dyn SyncStorage,
121    encrypted_snapshot: Vec<u8>,
122    device_id: &str,
123    current_seq: u64,
124    clock: &dyn crate::clock::Clock,
125) -> Result<(), SnapshotError> {
126    let size = encrypted_snapshot.len();
127    let timestamp = clock.now().to_rfc3339();
128
129    // Upload snapshot (overwrites previous).
130    storage.put_snapshot(encrypted_snapshot).await?;
131
132    // Read all heads and build per-device cursor map for snapshot metadata.
133    let heads = storage.list_heads().await?;
134    let mut cursors: HashMap<String, u64> =
135        heads.iter().map(|h| (h.device_id.clone(), h.seq)).collect();
136    // Ensure our own current_seq is included (our head hasn't been updated yet).
137    cursors.insert(device_id.to_string(), current_seq);
138
139    let meta = SnapshotMeta {
140        cursors,
141        created_at: timestamp.clone(),
142    };
143    let meta_json =
144        serde_json::to_vec(&meta).map_err(|e| SnapshotError::Io(std::io::Error::other(e)))?;
145
146    storage.put_snapshot_meta(meta_json).await?;
147
148    // Update head with snapshot_seq.
149    storage
150        .put_head(device_id, current_seq, Some(current_seq), &timestamp)
151        .await?;
152
153    info!(
154        device_id,
155        snapshot_seq = current_seq,
156        size,
157        "pushed snapshot to sync storage"
158    );
159
160    Ok(())
161}
162
163/// Check whether it's time to create a new snapshot.
164///
165/// Returns true if:
166/// - `changesets_since_snapshot` >= the changeset threshold (100), OR
167/// - `hours_since_snapshot` >= the time threshold (24h), OR
168/// - No snapshot has ever been created (`last_snapshot_seq` is None)
169///   AND at least one changeset has been pushed.
170pub fn should_create_snapshot(
171    local_seq: u64,
172    last_snapshot_seq: Option<u64>,
173    hours_since_snapshot: Option<u64>,
174) -> bool {
175    // Never created a snapshot, and we have at least one changeset.
176    let Some(snap_seq) = last_snapshot_seq else {
177        return local_seq > 0;
178    };
179
180    let changesets_since = local_seq.saturating_sub(snap_seq);
181    if changesets_since >= SNAPSHOT_CHANGESET_THRESHOLD {
182        return true;
183    }
184
185    if let Some(hours) = hours_since_snapshot {
186        if hours >= SNAPSHOT_HOURS_THRESHOLD && changesets_since > 0 {
187            return true;
188        }
189    }
190
191    false
192}
193
194/// Delete changesets that are superseded by a snapshot.
195///
196/// Reads snapshot metadata to get per-device cursors at snapshot time.
197/// For each device, only deletes changesets with seq <= the device's cursor
198/// in the snapshot. This ensures changesets pushed AFTER the snapshot are
199/// preserved, even if their seq is below another device's snapshot seq.
200///
201/// Devices that don't appear in the snapshot metadata are skipped entirely
202/// (they appeared after the snapshot was created).
203pub async fn garbage_collect(storage: &dyn SyncStorage) -> Result<GcResult, SnapshotError> {
204    // Read snapshot metadata.
205    let meta_json = match storage.get_snapshot_meta().await {
206        Ok(data) => data,
207        Err(StorageError::NotFound(_)) => {
208            // No snapshot metadata -- nothing to GC.
209            info!("no snapshot metadata found, skipping GC");
210            return Ok(GcResult {
211                deleted: 0,
212                errors: 0,
213            });
214        }
215        Err(e) => return Err(SnapshotError::Bucket(e)),
216    };
217
218    let meta: SnapshotMeta = serde_json::from_slice(&meta_json)
219        .map_err(|e| SnapshotError::Io(std::io::Error::other(e)))?;
220
221    let heads = storage.list_heads().await?;
222    let mut deleted = 0u64;
223    let mut errors = 0u64;
224
225    for head in &heads {
226        // Only GC changesets up to what the snapshot covers for THIS device.
227        let safe_seq = match meta.cursors.get(&head.device_id) {
228            Some(&seq) => seq,
229            None => continue, // Device appeared after snapshot -- don't touch.
230        };
231
232        let seqs = match storage.list_changesets(&head.device_id).await {
233            Ok(s) => s,
234            Err(e) => {
235                warn!(
236                    device_id = %head.device_id,
237                    error = %e,
238                    "failed to list changesets for GC, skipping device"
239                );
240                errors += 1;
241                continue;
242            }
243        };
244
245        for seq in seqs {
246            if seq > safe_seq {
247                continue;
248            }
249
250            match storage.delete_changeset(&head.device_id, seq).await {
251                Ok(()) => deleted += 1,
252                Err(e) => {
253                    warn!(
254                        device_id = %head.device_id,
255                        seq,
256                        error = %e,
257                        "failed to delete changeset during GC"
258                    );
259                    errors += 1;
260                }
261            }
262        }
263    }
264
265    info!(deleted, errors, "garbage collection complete");
266
267    Ok(GcResult { deleted, errors })
268}
269
/// Outcome of one [`garbage_collect`] run.
#[derive(Debug, PartialEq, Eq)]
pub struct GcResult {
    /// How many changesets were removed from storage.
    pub deleted: u64,
    /// How many list or delete operations failed. Each failure was logged and
    /// skipped rather than aborting the run.
    pub errors: u64,
}
278
279/// Bootstrap a new device from a snapshot.
280///
281/// Downloads `snapshot.db.enc`, decrypts, and writes the plaintext database
282/// to `target_path`. The caller should then open this as their local database
283/// and pull any changesets newer than the per-device cursors in the result.
284///
285/// Returns a `BootstrapResult` with per-device cursors so the caller knows
286/// where to start pulling changesets from each device.
287pub async fn bootstrap_from_snapshot(
288    storage: &dyn SyncStorage,
289    encryption: &EncryptionService,
290    target_path: &Path,
291) -> Result<BootstrapResult, SnapshotError> {
292    // Download both the snapshot blob and its per-device cursor metadata
293    // before touching disk. `push_snapshot` writes the metadata immediately
294    // after the snapshot blob, so its absence here means the bucket is in
295    // a torn state (e.g., a previous push failed between the two uploads).
296    // We refuse to bootstrap from incomplete data, and we fetch metadata
297    // first so we don't leave a half-written DB on the target path.
298    let meta_json = storage
299        .get_snapshot_meta()
300        .await
301        .map_err(SnapshotError::Bucket)?;
302    let meta: SnapshotMeta = serde_json::from_slice(&meta_json)
303        .map_err(|e| SnapshotError::Io(std::io::Error::other(e)))?;
304    let cursors = meta.cursors;
305
306    let encrypted = storage.get_snapshot().await?;
307    let plaintext = encryption
308        .decrypt(&encrypted)
309        .map_err(|e| SnapshotError::Decryption(e.to_string()))?;
310
311    if let Some(parent) = target_path.parent() {
312        std::fs::create_dir_all(parent)?;
313    }
314    std::fs::write(target_path, &plaintext)?;
315
316    info!(
317        num_devices = cursors.len(),
318        db_size = plaintext.len(),
319        path = %target_path.display(),
320        "bootstrapped from snapshot"
321    );
322
323    Ok(BootstrapResult { cursors })
324}
325
326#[cfg(test)]
327mod tests {
328    use super::*;
329    use crate::sync::session::SyncSession;
330    use crate::sync::storage::DeviceHead;
331    use crate::sync::test_helpers::*;
332    use async_trait::async_trait;
333    use std::collections::HashMap;
334    use std::sync::Mutex;
335
336    /// Full-featured mock storage for snapshot tests.
337    struct MockSyncStorage {
338        changesets: Mutex<HashMap<String, Vec<u8>>>,
339        heads: Mutex<HashMap<String, (u64, Option<u64>)>>,
340        snapshot: Mutex<Option<Vec<u8>>>,
341        snapshot_meta: Mutex<Option<Vec<u8>>>,
342        min_schema_version: Mutex<Option<u32>>,
343    }
344
345    impl MockSyncStorage {
346        fn new() -> Self {
347            MockSyncStorage {
348                changesets: Mutex::new(HashMap::new()),
349                heads: Mutex::new(HashMap::new()),
350                snapshot: Mutex::new(None),
351                snapshot_meta: Mutex::new(None),
352                min_schema_version: Mutex::new(None),
353            }
354        }
355
356        /// Helper to add a changeset directly.
357        fn add_changeset(&self, device_id: &str, seq: u64, data: Vec<u8>) {
358            let key = format!("{device_id}/{seq}");
359            self.changesets.lock().unwrap().insert(key, data);
360
361            let mut heads = self.heads.lock().unwrap();
362            let entry = heads.entry(device_id.to_string()).or_insert((0, None));
363            if seq > entry.0 {
364                entry.0 = seq;
365            }
366        }
367
368        /// Count remaining changesets.
369        fn changeset_count(&self) -> usize {
370            self.changesets.lock().unwrap().len()
371        }
372
373        /// Get stored snapshot data.
374        fn get_stored_snapshot(&self) -> Option<Vec<u8>> {
375            self.snapshot.lock().unwrap().clone()
376        }
377
378        /// Get stored snapshot metadata.
379        fn get_stored_snapshot_meta(&self) -> Option<Vec<u8>> {
380            self.snapshot_meta.lock().unwrap().clone()
381        }
382    }
383
384    #[async_trait]
385    impl SyncStorage for MockSyncStorage {
386        async fn list_heads(&self) -> Result<Vec<DeviceHead>, StorageError> {
387            let heads = self.heads.lock().unwrap();
388            Ok(heads
389                .iter()
390                .map(|(id, (seq, snap))| DeviceHead {
391                    device_id: id.clone(),
392                    seq: *seq,
393                    snapshot_seq: *snap,
394                    last_sync: None,
395                })
396                .collect())
397        }
398
399        async fn get_changeset(&self, device_id: &str, seq: u64) -> Result<Vec<u8>, StorageError> {
400            let key = format!("{device_id}/{seq}");
401            let cs = self.changesets.lock().unwrap();
402            cs.get(&key).cloned().ok_or(StorageError::NotFound(key))
403        }
404
405        async fn put_changeset(
406            &self,
407            device_id: &str,
408            seq: u64,
409            data: Vec<u8>,
410        ) -> Result<(), StorageError> {
411            let key = format!("{device_id}/{seq}");
412            self.changesets.lock().unwrap().insert(key, data);
413            Ok(())
414        }
415
416        async fn put_head(
417            &self,
418            device_id: &str,
419            seq: u64,
420            snapshot_seq: Option<u64>,
421            _timestamp: &str,
422        ) -> Result<(), StorageError> {
423            let mut heads = self.heads.lock().unwrap();
424            let entry = heads.entry(device_id.to_string()).or_insert((0, None));
425            entry.0 = seq;
426            if snapshot_seq.is_some() {
427                entry.1 = snapshot_seq;
428            }
429            Ok(())
430        }
431
432        async fn put_blob(
433            &self,
434            _namespace: &str,
435            _id: &str,
436            _scope: crate::blob::BlobScope,
437            _data: Vec<u8>,
438        ) -> Result<(), StorageError> {
439            Ok(())
440        }
441
442        async fn get_blob(
443            &self,
444            namespace: &str,
445            id: &str,
446            _scope: crate::blob::BlobScope,
447        ) -> Result<Vec<u8>, StorageError> {
448            Err(StorageError::NotFound(format!("{namespace}/{id}")))
449        }
450
451        async fn put_snapshot(&self, data: Vec<u8>) -> Result<(), StorageError> {
452            *self.snapshot.lock().unwrap() = Some(data);
453            Ok(())
454        }
455
456        async fn get_snapshot(&self) -> Result<Vec<u8>, StorageError> {
457            self.snapshot
458                .lock()
459                .unwrap()
460                .clone()
461                .ok_or(StorageError::NotFound("snapshot.db.enc".into()))
462        }
463
464        async fn delete_changeset(&self, device_id: &str, seq: u64) -> Result<(), StorageError> {
465            let key = format!("{device_id}/{seq}");
466            self.changesets.lock().unwrap().remove(&key);
467            Ok(())
468        }
469
470        async fn list_changesets(&self, device_id: &str) -> Result<Vec<u64>, StorageError> {
471            let prefix = format!("{device_id}/");
472            let cs = self.changesets.lock().unwrap();
473            let mut seqs: Vec<u64> = cs
474                .keys()
475                .filter_map(|k| k.strip_prefix(&prefix).and_then(|s| s.parse().ok()))
476                .collect();
477            seqs.sort();
478            Ok(seqs)
479        }
480
481        async fn get_min_schema_version(&self) -> Result<Option<u32>, StorageError> {
482            Ok(*self.min_schema_version.lock().unwrap())
483        }
484
485        async fn set_min_schema_version(&self, version: u32) -> Result<(), StorageError> {
486            *self.min_schema_version.lock().unwrap() = Some(version);
487            Ok(())
488        }
489
490        async fn put_membership_entry(
491            &self,
492            _author_pubkey: &str,
493            _seq: u64,
494            _data: Vec<u8>,
495        ) -> Result<(), StorageError> {
496            Ok(())
497        }
498
499        async fn get_membership_entry(
500            &self,
501            author_pubkey: &str,
502            seq: u64,
503        ) -> Result<Vec<u8>, StorageError> {
504            Err(StorageError::NotFound(format!(
505                "membership/{author_pubkey}/{seq}"
506            )))
507        }
508
509        async fn list_membership_entries(&self) -> Result<Vec<(String, u64)>, StorageError> {
510            Ok(vec![])
511        }
512
513        async fn put_wrapped_key(
514            &self,
515            _user_pubkey: &str,
516            _data: Vec<u8>,
517        ) -> Result<(), StorageError> {
518            Ok(())
519        }
520
521        async fn get_wrapped_key(&self, user_pubkey: &str) -> Result<Vec<u8>, StorageError> {
522            Err(StorageError::NotFound(format!("keys/{user_pubkey}")))
523        }
524
525        async fn delete_wrapped_key(&self, _user_pubkey: &str) -> Result<(), StorageError> {
526            Ok(())
527        }
528
529        async fn put_snapshot_meta(&self, data: Vec<u8>) -> Result<(), StorageError> {
530            *self.snapshot_meta.lock().unwrap() = Some(data);
531            Ok(())
532        }
533
534        async fn get_snapshot_meta(&self) -> Result<Vec<u8>, StorageError> {
535            self.snapshot_meta
536                .lock()
537                .unwrap()
538                .clone()
539                .ok_or(StorageError::NotFound("snapshot_meta.json.enc".into()))
540        }
541    }
542
543    fn test_encryption() -> EncryptionService {
544        EncryptionService::new_with_key(&[0x42u8; 32])
545    }
546
547    // ---- should_create_snapshot tests ----
548
549    #[test]
550    fn snapshot_policy_no_previous_snapshot_with_changes() {
551        assert!(should_create_snapshot(1, None, None));
552        assert!(should_create_snapshot(50, None, None));
553    }
554
555    #[test]
556    fn snapshot_policy_no_previous_snapshot_no_changes() {
557        assert!(!should_create_snapshot(0, None, None));
558    }
559
560    #[test]
561    fn snapshot_policy_below_threshold() {
562        // 10 changesets since last snapshot, only 1 hour elapsed.
563        assert!(!should_create_snapshot(60, Some(50), Some(1)));
564    }
565
566    #[test]
567    fn snapshot_policy_changeset_threshold_reached() {
568        // Exactly 100 changesets since snapshot.
569        assert!(should_create_snapshot(150, Some(50), Some(1)));
570        // Over 100.
571        assert!(should_create_snapshot(200, Some(50), Some(1)));
572    }
573
574    #[test]
575    fn snapshot_policy_time_threshold_reached() {
576        // Only 10 changesets but 24+ hours have passed.
577        assert!(should_create_snapshot(60, Some(50), Some(24)));
578        assert!(should_create_snapshot(60, Some(50), Some(48)));
579    }
580
581    #[test]
582    fn snapshot_policy_time_threshold_no_new_changes() {
583        // 24 hours but zero changesets since snapshot.
584        assert!(!should_create_snapshot(50, Some(50), Some(24)));
585    }
586
587    // ---- create_snapshot tests ----
588
589    #[test]
590    fn create_snapshot_produces_encrypted_db() {
591        unsafe {
592            let db = open_memory_db();
593            create_synced_schema(db);
594
595            exec(
596                db,
597                "INSERT INTO notes (id, title, body, _updated_at, created_at) \
598                 VALUES ('n1', 'Note One', NULL, '0000000001000-0000-dev1', '2026-01-01')",
599            );
600
601            let temp = tempfile::tempdir().unwrap();
602            let enc = test_encryption();
603
604            let encrypted =
605                create_snapshot(db, temp.path(), &enc).expect("create_snapshot should succeed");
606
607            // Should be non-empty encrypted bytes.
608            assert!(!encrypted.is_empty());
609
610            // Should be decryptable.
611            let plaintext = enc.decrypt(&encrypted).expect("decrypt should succeed");
612            assert!(!plaintext.is_empty());
613
614            // The plaintext should be a valid SQLite database (starts with "SQLite format 3\0").
615            assert!(
616                plaintext.starts_with(b"SQLite format 3\0"),
617                "snapshot should be a valid SQLite database"
618            );
619
620            ffi::sqlite3_close(db);
621        }
622    }
623
624    #[test]
625    fn create_snapshot_contains_data() {
626        unsafe {
627            let db = open_memory_db();
628            create_synced_schema(db);
629
630            exec(
631                db,
632                "INSERT INTO notes (id, title, _updated_at, created_at) \
633                 VALUES ('a1', 'Artist One', '0000000001000-0000-dev1', '2026-01-01')",
634            );
635            exec(
636                db,
637                "INSERT INTO note_tags (id, tag, note_id, _updated_at, created_at) \
638                 VALUES ('al1', 'Album One', 'a1', '0000000001000-0000-dev1', '2026-01-01')",
639            );
640
641            let temp = tempfile::tempdir().unwrap();
642            let enc = test_encryption();
643
644            let encrypted = create_snapshot(db, temp.path(), &enc).expect("snapshot");
645            let plaintext = enc.decrypt(&encrypted).expect("decrypt");
646
647            // Write to file and open to verify contents.
648            let db_path = temp.path().join("verify.db");
649            std::fs::write(&db_path, &plaintext).unwrap();
650
651            let db2 = {
652                let c_path = CString::new(db_path.to_str().unwrap()).unwrap();
653                let mut ptr: *mut ffi::sqlite3 = std::ptr::null_mut();
654                let rc = ffi::sqlite3_open(c_path.as_ptr(), &mut ptr);
655                assert_eq!(rc, ffi::SQLITE_OK);
656                ptr
657            };
658
659            let name = query_text(db2, "SELECT title FROM notes WHERE id = 'a1'");
660            assert_eq!(name, "Artist One");
661
662            let title = query_text(db2, "SELECT tag FROM note_tags WHERE id = 'al1'");
663            assert_eq!(title, "Album One");
664
665            ffi::sqlite3_close(db2);
666            ffi::sqlite3_close(db);
667        }
668    }
669
670    // ---- push_snapshot tests ----
671
672    #[tokio::test]
673    async fn push_snapshot_uploads_and_updates_head() {
674        let storage = MockSyncStorage::new();
675        // Simulate another device that already has a head.
676        storage
677            .put_head("dev-2", 15, None, "2026-02-10T00:00:00Z")
678            .await
679            .unwrap();
680        let data = vec![1, 2, 3, 4, 5];
681
682        push_snapshot(
683            &storage,
684            data.clone(),
685            "dev-1",
686            42,
687            &crate::clock::SystemClock,
688        )
689        .await
690        .expect("push_snapshot should succeed");
691
692        // Snapshot should be stored.
693        assert_eq!(storage.get_stored_snapshot(), Some(data));
694
695        // Head should be updated with snapshot_seq.
696        let heads = storage.list_heads().await.unwrap();
697        let dev1_head = heads.iter().find(|h| h.device_id == "dev-1").unwrap();
698        assert_eq!(dev1_head.seq, 42);
699        assert_eq!(dev1_head.snapshot_seq, Some(42));
700
701        // Snapshot metadata should contain cursors for both devices.
702        let meta_json = storage
703            .get_stored_snapshot_meta()
704            .expect("metadata should be written");
705        let meta: SnapshotMeta = serde_json::from_slice(&meta_json).unwrap();
706        assert_eq!(meta.cursors.get("dev-1"), Some(&42));
707        assert_eq!(meta.cursors.get("dev-2"), Some(&15));
708        assert_eq!(meta.cursors.len(), 2);
709    }
710
711    // ---- garbage_collect tests ----
712
713    #[tokio::test]
714    async fn gc_deletes_changesets_per_device_cursors() {
715        let storage = MockSyncStorage::new();
716
717        // Device A: changesets 1-5.
718        for seq in 1..=5 {
719            storage.add_changeset("dev-a", seq, vec![seq as u8]);
720        }
721        // Device B: changesets 1-3.
722        for seq in 1..=3 {
723            storage.add_changeset("dev-b", seq, vec![seq as u8]);
724        }
725
726        assert_eq!(storage.changeset_count(), 8);
727
728        // Snapshot metadata: dev-a was at seq 3, dev-b was at seq 2.
729        let meta = SnapshotMeta {
730            cursors: HashMap::from([("dev-a".to_string(), 3), ("dev-b".to_string(), 2)]),
731            created_at: "2026-02-10T00:00:00Z".to_string(),
732        };
733        storage
734            .put_snapshot_meta(serde_json::to_vec(&meta).unwrap())
735            .await
736            .unwrap();
737
738        let result = garbage_collect(&storage).await.expect("gc");
739
740        // dev-a: 1,2,3 deleted (<=3), dev-b: 1,2 deleted (<=2)
741        assert_eq!(result.deleted, 5);
742        assert_eq!(result.errors, 0);
743        assert_eq!(storage.changeset_count(), 3); // dev-a: 4,5 + dev-b: 3
744
745        // Verify remaining changesets.
746        let remaining_a = storage.list_changesets("dev-a").await.unwrap();
747        assert_eq!(remaining_a, vec![4, 5]);
748
749        let remaining_b = storage.list_changesets("dev-b").await.unwrap();
750        assert_eq!(remaining_b, vec![3]);
751    }
752
753    #[tokio::test]
754    async fn gc_with_no_changesets_to_delete() {
755        let storage = MockSyncStorage::new();
756        storage.add_changeset("dev-a", 10, vec![10]);
757
758        // Snapshot metadata says dev-a was at seq 5 -- changeset 10 is newer.
759        let meta = SnapshotMeta {
760            cursors: HashMap::from([("dev-a".to_string(), 5)]),
761            created_at: "2026-02-10T00:00:00Z".to_string(),
762        };
763        storage
764            .put_snapshot_meta(serde_json::to_vec(&meta).unwrap())
765            .await
766            .unwrap();
767
768        let result = garbage_collect(&storage).await.expect("gc");
769
770        assert_eq!(result.deleted, 0);
771        assert_eq!(storage.changeset_count(), 1);
772    }
773
774    #[tokio::test]
775    async fn gc_with_empty_bucket() {
776        let storage = MockSyncStorage::new();
777        // No snapshot metadata -- GC should be a no-op.
778
779        let result = garbage_collect(&storage).await.expect("gc");
780
781        assert_eq!(result.deleted, 0);
782        assert_eq!(result.errors, 0);
783    }
784
785    // ---- bootstrap_from_snapshot tests ----
786
787    #[tokio::test]
788    async fn bootstrap_downloads_decrypts_and_writes_db() {
789        unsafe {
790            // First create a snapshot from a real database.
791            let db = open_memory_db();
792            create_synced_schema(db);
793
794            exec(
795                db,
796                "INSERT INTO notes (id, title, _updated_at, created_at) \
797                 VALUES ('a1', 'Artist One', '0000000001000-0000-dev1', '2026-01-01')",
798            );
799
800            let temp = tempfile::tempdir().unwrap();
801            let enc = test_encryption();
802
803            let encrypted = create_snapshot(db, temp.path(), &enc).expect("snapshot");
804            ffi::sqlite3_close(db);
805
806            // Put snapshot in mock storage with metadata.
807            let storage = MockSyncStorage::new();
808            storage.put_snapshot(encrypted).await.unwrap();
809
810            let meta = SnapshotMeta {
811                cursors: HashMap::from([("dev-1".to_string(), 10), ("dev-2".to_string(), 7)]),
812                created_at: "2026-02-10T00:00:00Z".to_string(),
813            };
814            storage
815                .put_snapshot_meta(serde_json::to_vec(&meta).unwrap())
816                .await
817                .unwrap();
818
819            // Bootstrap a new database.
820            let target = temp.path().join("bootstrapped.db");
821            let result = bootstrap_from_snapshot(&storage, &enc, &target)
822                .await
823                .expect("bootstrap");
824
825            // Should have per-device cursors from metadata.
826            assert_eq!(result.cursors.get("dev-1"), Some(&10));
827            assert_eq!(result.cursors.get("dev-2"), Some(&7));
828            assert_eq!(result.cursors.len(), 2);
829            assert!(target.exists());
830
831            // Open the bootstrapped DB and verify data.
832            let c_path = CString::new(target.to_str().unwrap()).unwrap();
833            let mut db2: *mut ffi::sqlite3 = std::ptr::null_mut();
834            let rc = ffi::sqlite3_open(c_path.as_ptr(), &mut db2);
835            assert_eq!(rc, ffi::SQLITE_OK);
836
837            let name = query_text(db2, "SELECT title FROM notes WHERE id = 'a1'");
838            assert_eq!(name, "Artist One");
839
840            ffi::sqlite3_close(db2);
841        }
842    }
843
844    #[tokio::test]
845    async fn bootstrap_fails_when_no_snapshot_exists() {
846        let storage = MockSyncStorage::new();
847        let enc = test_encryption();
848        let temp = tempfile::tempdir().unwrap();
849        let target = temp.path().join("nope.db");
850
851        let result = bootstrap_from_snapshot(&storage, &enc, &target).await;
852
853        assert!(result.is_err());
854        assert!(!target.exists());
855    }
856
857    // ---- Integration: create, push, bootstrap, verify ----
858
859    #[tokio::test]
860    async fn full_snapshot_round_trip() {
861        unsafe {
862            // Device 1 creates some data.
863            let db = open_memory_db();
864            create_synced_schema(db);
865
866            exec(
867                db,
868                "INSERT INTO notes (id, title, _updated_at, created_at) \
869                 VALUES ('a1', 'Artist One', '0000000001000-0000-dev1', '2026-01-01')",
870            );
871            exec(
872                db,
873                "INSERT INTO note_tags (id, tag, note_id, _updated_at, created_at) \
874                 VALUES ('al1', 'Album One', 'a1', '0000000001000-0000-dev1', '2026-01-01')",
875            );
876
877            let temp = tempfile::tempdir().unwrap();
878            let enc = test_encryption();
879            let storage = MockSyncStorage::new();
880
881            // Create and push snapshot at seq 5.
882            let encrypted = create_snapshot(db, temp.path(), &enc).expect("snapshot");
883            push_snapshot(&storage, encrypted, "dev-1", 5, &crate::clock::SystemClock)
884                .await
885                .expect("push");
886
887            ffi::sqlite3_close(db);
888
889            // Device 2 bootstraps.
890            let target = temp.path().join("device2.db");
891            let result = bootstrap_from_snapshot(&storage, &enc, &target)
892                .await
893                .expect("bootstrap");
894
895            assert_eq!(result.cursors.get("dev-1"), Some(&5));
896
897            // Open and verify.
898            let c_path = CString::new(target.to_str().unwrap()).unwrap();
899            let mut db2: *mut ffi::sqlite3 = std::ptr::null_mut();
900            let rc = ffi::sqlite3_open(c_path.as_ptr(), &mut db2);
901            assert_eq!(rc, ffi::SQLITE_OK);
902
903            let name = query_text(db2, "SELECT title FROM notes WHERE id = 'a1'");
904            assert_eq!(name, "Artist One");
905
906            let title = query_text(db2, "SELECT tag FROM note_tags WHERE id = 'al1'");
907            assert_eq!(title, "Album One");
908
909            // Device 2 can now pull only changesets > per-device cursors.
910            // (Not tested here since pull is already tested in pull_tests.rs.)
911
912            ffi::sqlite3_close(db2);
913        }
914    }
915
916    /// Verify that a snapshot + subsequent changesets produces the same state
917    /// as applying all changesets from scratch.
918    #[tokio::test]
919    async fn snapshot_plus_changesets_equals_full_replay() {
920        unsafe {
921            let enc = test_encryption();
922            let temp = tempfile::tempdir().unwrap();
923
924            // --- Phase 1: create data, snapshot, then more data ---
925
926            let db_source = open_memory_db();
927            create_synced_schema(db_source);
928
929            // Initial data (before snapshot).
930            let session1 = SyncSession::start(db_source).expect("session");
931            exec(
932                db_source,
933                "INSERT INTO notes (id, title, _updated_at, created_at) \
934                 VALUES ('a1', 'Artist One', '0000000001000-0000-dev1', '2026-01-01')",
935            );
936            exec(
937                db_source,
938                "INSERT INTO notes (id, title, _updated_at, created_at) \
939                 VALUES ('a2', 'Artist Two', '0000000002000-0000-dev1', '2026-01-01')",
940            );
941            let cs1 = session1.changeset().unwrap().unwrap();
942            let cs1_bytes = cs1.as_bytes().to_vec();
943            drop(session1);
944
945            // Create snapshot after cs1.
946            let snapshot_encrypted =
947                create_snapshot(db_source, temp.path(), &enc).expect("snapshot");
948
949            // More data after snapshot.
950            let session2 = SyncSession::start(db_source).expect("session2");
951            exec(
952                db_source,
953                "INSERT INTO notes (id, title, _updated_at, created_at) \
954                 VALUES ('a3', 'Artist Three', '0000000003000-0000-dev1', '2026-01-01')",
955            );
956            exec(
957                db_source,
958                "UPDATE notes SET title = 'Artist One Updated' \
959                 WHERE id = 'a1'",
960            );
961            let cs2 = session2.changeset().unwrap().unwrap();
962            let cs2_bytes = cs2.as_bytes().to_vec();
963            drop(session2);
964
965            ffi::sqlite3_close(db_source);
966
967            // --- Path A: bootstrap from snapshot + apply cs2 ---
968
969            let snapshot_plain = enc.decrypt(&snapshot_encrypted).unwrap();
970            let path_a = temp.path().join("path_a.db");
971            std::fs::write(&path_a, &snapshot_plain).unwrap();
972
973            let db_a = {
974                let c = CString::new(path_a.to_str().unwrap()).unwrap();
975                let mut p: *mut ffi::sqlite3 = std::ptr::null_mut();
976                ffi::sqlite3_open(c.as_ptr(), &mut p);
977                p
978            };
979
980            let cs2_obj = crate::sync::session_ext::Changeset::from_bytes(&cs2_bytes);
981            crate::sync::apply::apply_changeset_lww(db_a, &cs2_obj).expect("apply cs2");
982
983            // --- Path B: fresh DB + apply cs1 + apply cs2 ---
984
985            let db_b = open_memory_db();
986            create_synced_schema(db_b);
987
988            let cs1_obj = crate::sync::session_ext::Changeset::from_bytes(&cs1_bytes);
989            crate::sync::apply::apply_changeset_lww(db_b, &cs1_obj).expect("apply cs1");
990
991            let cs2_obj2 = crate::sync::session_ext::Changeset::from_bytes(&cs2_bytes);
992            crate::sync::apply::apply_changeset_lww(db_b, &cs2_obj2).expect("apply cs2");
993
994            // --- Compare: both paths should have identical data ---
995
996            let count_a = query_int(db_a, "SELECT COUNT(*) FROM notes");
997            let count_b = query_int(db_b, "SELECT COUNT(*) FROM notes");
998            assert_eq!(count_a, count_b, "artist count should match");
999            assert_eq!(count_a, 3);
1000
1001            let name_a = query_text(db_a, "SELECT title FROM notes WHERE id = 'a1'");
1002            let name_b = query_text(db_b, "SELECT title FROM notes WHERE id = 'a1'");
1003            assert_eq!(name_a, name_b);
1004            assert_eq!(name_a, "Artist One Updated");
1005
1006            let name_a3 = query_text(db_a, "SELECT title FROM notes WHERE id = 'a3'");
1007            let name_b3 = query_text(db_b, "SELECT title FROM notes WHERE id = 'a3'");
1008            assert_eq!(name_a3, name_b3);
1009            assert_eq!(name_a3, "Artist Three");
1010
1011            ffi::sqlite3_close(db_a);
1012            ffi::sqlite3_close(db_b);
1013        }
1014    }
1015
    // ---- GC and bootstrap safety tests ----
1017
1018    /// Device A creates snapshot when Device B is at seq 30. Device B later
1019    /// pushes seq 31-35. GC must NOT delete Device B's 31-35.
1020    #[tokio::test]
1021    async fn gc_does_not_delete_post_snapshot_changesets() {
1022        let storage = MockSyncStorage::new();
1023
1024        // Device A: changesets 1-50. Device B: changesets 1-35.
1025        for seq in 1..=50 {
1026            storage.add_changeset("dev-a", seq, vec![seq as u8]);
1027        }
1028        for seq in 1..=35 {
1029            storage.add_changeset("dev-b", seq, vec![seq as u8]);
1030        }
1031
1032        // Snapshot taken when dev-a was at 50, dev-b was at 30.
1033        // (Dev-b pushed 31-35 after the snapshot.)
1034        let meta = SnapshotMeta {
1035            cursors: HashMap::from([("dev-a".to_string(), 50), ("dev-b".to_string(), 30)]),
1036            created_at: "2026-02-10T00:00:00Z".to_string(),
1037        };
1038        storage
1039            .put_snapshot_meta(serde_json::to_vec(&meta).unwrap())
1040            .await
1041            .unwrap();
1042
1043        let result = garbage_collect(&storage).await.expect("gc");
1044
1045        // dev-a: all 50 deleted (<=50), dev-b: 1-30 deleted (<=30)
1046        assert_eq!(result.deleted, 80);
1047        assert_eq!(result.errors, 0);
1048
1049        // dev-b's 31-35 must survive.
1050        let remaining_b = storage.list_changesets("dev-b").await.unwrap();
1051        assert_eq!(remaining_b, vec![31, 32, 33, 34, 35]);
1052
1053        // dev-a has nothing remaining.
1054        let remaining_a = storage.list_changesets("dev-a").await.unwrap();
1055        assert!(remaining_a.is_empty());
1056    }
1057
1058    /// Device C appears after snapshot was created. GC should not touch
1059    /// any of Device C's changesets.
1060    #[tokio::test]
1061    async fn gc_ignores_device_not_in_snapshot_meta() {
1062        let storage = MockSyncStorage::new();
1063
1064        // Device A: changesets 1-5 (present in snapshot).
1065        for seq in 1..=5 {
1066            storage.add_changeset("dev-a", seq, vec![seq as u8]);
1067        }
1068        // Device C: changesets 1-3 (NOT in snapshot metadata).
1069        for seq in 1..=3 {
1070            storage.add_changeset("dev-c", seq, vec![seq as u8]);
1071        }
1072
1073        // Snapshot only knows about dev-a.
1074        let meta = SnapshotMeta {
1075            cursors: HashMap::from([("dev-a".to_string(), 5)]),
1076            created_at: "2026-02-10T00:00:00Z".to_string(),
1077        };
1078        storage
1079            .put_snapshot_meta(serde_json::to_vec(&meta).unwrap())
1080            .await
1081            .unwrap();
1082
1083        let result = garbage_collect(&storage).await.expect("gc");
1084
1085        // Only dev-a's changesets should be deleted.
1086        assert_eq!(result.deleted, 5);
1087        assert_eq!(result.errors, 0);
1088
1089        // dev-c's changesets are untouched.
1090        let remaining_c = storage.list_changesets("dev-c").await.unwrap();
1091        assert_eq!(remaining_c, vec![1, 2, 3]);
1092    }
1093
1094    /// A snapshot blob without its accompanying `snapshot_meta.json.enc`
1095    /// is a torn bucket state (e.g., a previous push failed between the
1096    /// snapshot upload and the metadata upload). Bootstrap must refuse
1097    /// rather than seed cursors from a heuristic on `heads`.
1098    #[tokio::test]
1099    async fn bootstrap_fails_when_snapshot_meta_missing() {
1100        unsafe {
1101            let db = open_memory_db();
1102            create_synced_schema(db);
1103
1104            exec(
1105                db,
1106                "INSERT INTO notes (id, title, _updated_at, created_at) \
1107                 VALUES ('a1', 'Artist One', '0000000001000-0000-dev1', '2026-01-01')",
1108            );
1109
1110            let temp = tempfile::tempdir().unwrap();
1111            let enc = test_encryption();
1112
1113            let encrypted = create_snapshot(db, temp.path(), &enc).expect("snapshot");
1114            ffi::sqlite3_close(db);
1115
1116            // Put snapshot in storage WITHOUT metadata (torn-bucket simulation).
1117            let storage = MockSyncStorage::new();
1118            storage.put_snapshot(encrypted).await.unwrap();
1119            storage
1120                .put_head("dev-1", 20, Some(15), "2026-02-10T00:00:00Z")
1121                .await
1122                .unwrap();
1123
1124            let target = temp.path().join("torn.db");
1125            let err = bootstrap_from_snapshot(&storage, &enc, &target)
1126                .await
1127                .expect_err("bootstrap must refuse torn bucket");
1128            assert!(
1129                matches!(err, SnapshotError::Bucket(StorageError::NotFound(_))),
1130                "expected Bucket(NotFound), got {err:?}",
1131            );
1132            assert!(
1133                !target.exists(),
1134                "no DB should be written when metadata is missing",
1135            );
1136        }
1137    }
1138}