1use std::collections::HashMap;
11use std::ffi::CString;
12use std::path::Path;
13
14use libsqlite3_sys as ffi;
15use serde::{Deserialize, Serialize};
16use tracing::{info, warn};
17
18use super::storage::{StorageError, SyncStorage};
19use crate::encryption::EncryptionService;
20
21const SNAPSHOT_CHANGESET_THRESHOLD: u64 = 100;
23
24const SNAPSHOT_HOURS_THRESHOLD: u64 = 24;
26
27#[derive(Debug, thiserror::Error)]
29pub enum SnapshotError {
30 #[error("VACUUM INTO failed: {0}")]
31 VacuumFailed(String),
32 #[error("IO error: {0}")]
33 Io(#[from] std::io::Error),
34 #[error("storage error: {0}")]
35 Bucket(#[from] StorageError),
36 #[error("decryption failed: {0}")]
37 Decryption(String),
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct SnapshotMeta {
43 pub cursors: HashMap<String, u64>,
46 pub created_at: String,
48}
49
50#[derive(Debug)]
52pub struct BootstrapResult {
53 pub cursors: HashMap<String, u64>,
56}
57
58pub unsafe fn create_snapshot(
66 db: *mut ffi::sqlite3,
67 temp_dir: &Path,
68 encryption: &EncryptionService,
69) -> Result<Vec<u8>, SnapshotError> {
70 let snapshot_path = temp_dir.join("snapshot.db");
71 let path_str = snapshot_path
72 .to_str()
73 .expect("temp path should be valid UTF-8");
74
75 let _ = std::fs::remove_file(&snapshot_path);
77
78 let sql = format!("VACUUM INTO '{}'", path_str.replace('\'', "''"));
80 let c_sql = CString::new(sql).expect("SQL should not contain null bytes");
81 let rc = ffi::sqlite3_exec(
82 db,
83 c_sql.as_ptr(),
84 None,
85 std::ptr::null_mut(),
86 std::ptr::null_mut(),
87 );
88 if rc != ffi::SQLITE_OK {
89 let err = ffi::sqlite3_errmsg(db);
90 let msg = if err.is_null() {
91 format!("sqlite3 error code {rc}")
92 } else {
93 std::ffi::CStr::from_ptr(err).to_string_lossy().into_owned()
94 };
95 let _ = std::fs::remove_file(&snapshot_path);
96 return Err(SnapshotError::VacuumFailed(msg));
97 }
98
99 let plaintext = std::fs::read(&snapshot_path)?;
101 let _ = std::fs::remove_file(&snapshot_path);
102
103 let encrypted = encryption.encrypt(&plaintext);
104
105 info!(
106 plaintext_size = plaintext.len(),
107 encrypted_size = encrypted.len(),
108 "created snapshot"
109 );
110
111 Ok(encrypted)
112}
113
114pub async fn push_snapshot(
120 storage: &dyn SyncStorage,
121 encrypted_snapshot: Vec<u8>,
122 device_id: &str,
123 current_seq: u64,
124 clock: &dyn crate::clock::Clock,
125) -> Result<(), SnapshotError> {
126 let size = encrypted_snapshot.len();
127 let timestamp = clock.now().to_rfc3339();
128
129 storage.put_snapshot(encrypted_snapshot).await?;
131
132 let heads = storage.list_heads().await?;
134 let mut cursors: HashMap<String, u64> =
135 heads.iter().map(|h| (h.device_id.clone(), h.seq)).collect();
136 cursors.insert(device_id.to_string(), current_seq);
138
139 let meta = SnapshotMeta {
140 cursors,
141 created_at: timestamp.clone(),
142 };
143 let meta_json =
144 serde_json::to_vec(&meta).map_err(|e| SnapshotError::Io(std::io::Error::other(e)))?;
145
146 storage.put_snapshot_meta(meta_json).await?;
147
148 storage
150 .put_head(device_id, current_seq, Some(current_seq), ×tamp)
151 .await?;
152
153 info!(
154 device_id,
155 snapshot_seq = current_seq,
156 size,
157 "pushed snapshot to sync storage"
158 );
159
160 Ok(())
161}
162
163pub fn should_create_snapshot(
171 local_seq: u64,
172 last_snapshot_seq: Option<u64>,
173 hours_since_snapshot: Option<u64>,
174) -> bool {
175 let Some(snap_seq) = last_snapshot_seq else {
177 return local_seq > 0;
178 };
179
180 let changesets_since = local_seq.saturating_sub(snap_seq);
181 if changesets_since >= SNAPSHOT_CHANGESET_THRESHOLD {
182 return true;
183 }
184
185 if let Some(hours) = hours_since_snapshot {
186 if hours >= SNAPSHOT_HOURS_THRESHOLD && changesets_since > 0 {
187 return true;
188 }
189 }
190
191 false
192}
193
194pub async fn garbage_collect(storage: &dyn SyncStorage) -> Result<GcResult, SnapshotError> {
204 let meta_json = match storage.get_snapshot_meta().await {
206 Ok(data) => data,
207 Err(StorageError::NotFound(_)) => {
208 info!("no snapshot metadata found, skipping GC");
210 return Ok(GcResult {
211 deleted: 0,
212 errors: 0,
213 });
214 }
215 Err(e) => return Err(SnapshotError::Bucket(e)),
216 };
217
218 let meta: SnapshotMeta = serde_json::from_slice(&meta_json)
219 .map_err(|e| SnapshotError::Io(std::io::Error::other(e)))?;
220
221 let heads = storage.list_heads().await?;
222 let mut deleted = 0u64;
223 let mut errors = 0u64;
224
225 for head in &heads {
226 let safe_seq = match meta.cursors.get(&head.device_id) {
228 Some(&seq) => seq,
229 None => continue, };
231
232 let seqs = match storage.list_changesets(&head.device_id).await {
233 Ok(s) => s,
234 Err(e) => {
235 warn!(
236 device_id = %head.device_id,
237 error = %e,
238 "failed to list changesets for GC, skipping device"
239 );
240 errors += 1;
241 continue;
242 }
243 };
244
245 for seq in seqs {
246 if seq > safe_seq {
247 continue;
248 }
249
250 match storage.delete_changeset(&head.device_id, seq).await {
251 Ok(()) => deleted += 1,
252 Err(e) => {
253 warn!(
254 device_id = %head.device_id,
255 seq,
256 error = %e,
257 "failed to delete changeset during GC"
258 );
259 errors += 1;
260 }
261 }
262 }
263 }
264
265 info!(deleted, errors, "garbage collection complete");
266
267 Ok(GcResult { deleted, errors })
268}
269
270#[derive(Debug, PartialEq, Eq)]
272pub struct GcResult {
273 pub deleted: u64,
275 pub errors: u64,
277}
278
279pub async fn bootstrap_from_snapshot(
288 storage: &dyn SyncStorage,
289 encryption: &EncryptionService,
290 target_path: &Path,
291) -> Result<BootstrapResult, SnapshotError> {
292 let meta_json = storage
299 .get_snapshot_meta()
300 .await
301 .map_err(SnapshotError::Bucket)?;
302 let meta: SnapshotMeta = serde_json::from_slice(&meta_json)
303 .map_err(|e| SnapshotError::Io(std::io::Error::other(e)))?;
304 let cursors = meta.cursors;
305
306 let encrypted = storage.get_snapshot().await?;
307 let plaintext = encryption
308 .decrypt(&encrypted)
309 .map_err(|e| SnapshotError::Decryption(e.to_string()))?;
310
311 if let Some(parent) = target_path.parent() {
312 std::fs::create_dir_all(parent)?;
313 }
314 std::fs::write(target_path, &plaintext)?;
315
316 info!(
317 num_devices = cursors.len(),
318 db_size = plaintext.len(),
319 path = %target_path.display(),
320 "bootstrapped from snapshot"
321 );
322
323 Ok(BootstrapResult { cursors })
324}
325
326#[cfg(test)]
327mod tests {
328 use super::*;
329 use crate::sync::session::SyncSession;
330 use crate::sync::storage::DeviceHead;
331 use crate::sync::test_helpers::*;
332 use async_trait::async_trait;
333 use std::collections::HashMap;
334 use std::sync::Mutex;
335
336 struct MockSyncStorage {
338 changesets: Mutex<HashMap<String, Vec<u8>>>,
339 heads: Mutex<HashMap<String, (u64, Option<u64>)>>,
340 snapshot: Mutex<Option<Vec<u8>>>,
341 snapshot_meta: Mutex<Option<Vec<u8>>>,
342 min_schema_version: Mutex<Option<u32>>,
343 }
344
345 impl MockSyncStorage {
346 fn new() -> Self {
347 MockSyncStorage {
348 changesets: Mutex::new(HashMap::new()),
349 heads: Mutex::new(HashMap::new()),
350 snapshot: Mutex::new(None),
351 snapshot_meta: Mutex::new(None),
352 min_schema_version: Mutex::new(None),
353 }
354 }
355
356 fn add_changeset(&self, device_id: &str, seq: u64, data: Vec<u8>) {
358 let key = format!("{device_id}/{seq}");
359 self.changesets.lock().unwrap().insert(key, data);
360
361 let mut heads = self.heads.lock().unwrap();
362 let entry = heads.entry(device_id.to_string()).or_insert((0, None));
363 if seq > entry.0 {
364 entry.0 = seq;
365 }
366 }
367
368 fn changeset_count(&self) -> usize {
370 self.changesets.lock().unwrap().len()
371 }
372
373 fn get_stored_snapshot(&self) -> Option<Vec<u8>> {
375 self.snapshot.lock().unwrap().clone()
376 }
377
378 fn get_stored_snapshot_meta(&self) -> Option<Vec<u8>> {
380 self.snapshot_meta.lock().unwrap().clone()
381 }
382 }
383
384 #[async_trait]
385 impl SyncStorage for MockSyncStorage {
386 async fn list_heads(&self) -> Result<Vec<DeviceHead>, StorageError> {
387 let heads = self.heads.lock().unwrap();
388 Ok(heads
389 .iter()
390 .map(|(id, (seq, snap))| DeviceHead {
391 device_id: id.clone(),
392 seq: *seq,
393 snapshot_seq: *snap,
394 last_sync: None,
395 })
396 .collect())
397 }
398
399 async fn get_changeset(&self, device_id: &str, seq: u64) -> Result<Vec<u8>, StorageError> {
400 let key = format!("{device_id}/{seq}");
401 let cs = self.changesets.lock().unwrap();
402 cs.get(&key).cloned().ok_or(StorageError::NotFound(key))
403 }
404
405 async fn put_changeset(
406 &self,
407 device_id: &str,
408 seq: u64,
409 data: Vec<u8>,
410 ) -> Result<(), StorageError> {
411 let key = format!("{device_id}/{seq}");
412 self.changesets.lock().unwrap().insert(key, data);
413 Ok(())
414 }
415
416 async fn put_head(
417 &self,
418 device_id: &str,
419 seq: u64,
420 snapshot_seq: Option<u64>,
421 _timestamp: &str,
422 ) -> Result<(), StorageError> {
423 let mut heads = self.heads.lock().unwrap();
424 let entry = heads.entry(device_id.to_string()).or_insert((0, None));
425 entry.0 = seq;
426 if snapshot_seq.is_some() {
427 entry.1 = snapshot_seq;
428 }
429 Ok(())
430 }
431
432 async fn put_blob(
433 &self,
434 _namespace: &str,
435 _id: &str,
436 _scope: crate::blob::BlobScope,
437 _data: Vec<u8>,
438 ) -> Result<(), StorageError> {
439 Ok(())
440 }
441
442 async fn get_blob(
443 &self,
444 namespace: &str,
445 id: &str,
446 _scope: crate::blob::BlobScope,
447 ) -> Result<Vec<u8>, StorageError> {
448 Err(StorageError::NotFound(format!("{namespace}/{id}")))
449 }
450
451 async fn put_snapshot(&self, data: Vec<u8>) -> Result<(), StorageError> {
452 *self.snapshot.lock().unwrap() = Some(data);
453 Ok(())
454 }
455
456 async fn get_snapshot(&self) -> Result<Vec<u8>, StorageError> {
457 self.snapshot
458 .lock()
459 .unwrap()
460 .clone()
461 .ok_or(StorageError::NotFound("snapshot.db.enc".into()))
462 }
463
464 async fn delete_changeset(&self, device_id: &str, seq: u64) -> Result<(), StorageError> {
465 let key = format!("{device_id}/{seq}");
466 self.changesets.lock().unwrap().remove(&key);
467 Ok(())
468 }
469
470 async fn list_changesets(&self, device_id: &str) -> Result<Vec<u64>, StorageError> {
471 let prefix = format!("{device_id}/");
472 let cs = self.changesets.lock().unwrap();
473 let mut seqs: Vec<u64> = cs
474 .keys()
475 .filter_map(|k| k.strip_prefix(&prefix).and_then(|s| s.parse().ok()))
476 .collect();
477 seqs.sort();
478 Ok(seqs)
479 }
480
481 async fn get_min_schema_version(&self) -> Result<Option<u32>, StorageError> {
482 Ok(*self.min_schema_version.lock().unwrap())
483 }
484
485 async fn set_min_schema_version(&self, version: u32) -> Result<(), StorageError> {
486 *self.min_schema_version.lock().unwrap() = Some(version);
487 Ok(())
488 }
489
490 async fn put_membership_entry(
491 &self,
492 _author_pubkey: &str,
493 _seq: u64,
494 _data: Vec<u8>,
495 ) -> Result<(), StorageError> {
496 Ok(())
497 }
498
499 async fn get_membership_entry(
500 &self,
501 author_pubkey: &str,
502 seq: u64,
503 ) -> Result<Vec<u8>, StorageError> {
504 Err(StorageError::NotFound(format!(
505 "membership/{author_pubkey}/{seq}"
506 )))
507 }
508
509 async fn list_membership_entries(&self) -> Result<Vec<(String, u64)>, StorageError> {
510 Ok(vec![])
511 }
512
513 async fn put_wrapped_key(
514 &self,
515 _user_pubkey: &str,
516 _data: Vec<u8>,
517 ) -> Result<(), StorageError> {
518 Ok(())
519 }
520
521 async fn get_wrapped_key(&self, user_pubkey: &str) -> Result<Vec<u8>, StorageError> {
522 Err(StorageError::NotFound(format!("keys/{user_pubkey}")))
523 }
524
525 async fn delete_wrapped_key(&self, _user_pubkey: &str) -> Result<(), StorageError> {
526 Ok(())
527 }
528
529 async fn put_snapshot_meta(&self, data: Vec<u8>) -> Result<(), StorageError> {
530 *self.snapshot_meta.lock().unwrap() = Some(data);
531 Ok(())
532 }
533
534 async fn get_snapshot_meta(&self) -> Result<Vec<u8>, StorageError> {
535 self.snapshot_meta
536 .lock()
537 .unwrap()
538 .clone()
539 .ok_or(StorageError::NotFound("snapshot_meta.json.enc".into()))
540 }
541 }
542
543 fn test_encryption() -> EncryptionService {
544 EncryptionService::new_with_key(&[0x42u8; 32])
545 }
546
547 #[test]
550 fn snapshot_policy_no_previous_snapshot_with_changes() {
551 assert!(should_create_snapshot(1, None, None));
552 assert!(should_create_snapshot(50, None, None));
553 }
554
555 #[test]
556 fn snapshot_policy_no_previous_snapshot_no_changes() {
557 assert!(!should_create_snapshot(0, None, None));
558 }
559
560 #[test]
561 fn snapshot_policy_below_threshold() {
562 assert!(!should_create_snapshot(60, Some(50), Some(1)));
564 }
565
566 #[test]
567 fn snapshot_policy_changeset_threshold_reached() {
568 assert!(should_create_snapshot(150, Some(50), Some(1)));
570 assert!(should_create_snapshot(200, Some(50), Some(1)));
572 }
573
574 #[test]
575 fn snapshot_policy_time_threshold_reached() {
576 assert!(should_create_snapshot(60, Some(50), Some(24)));
578 assert!(should_create_snapshot(60, Some(50), Some(48)));
579 }
580
581 #[test]
582 fn snapshot_policy_time_threshold_no_new_changes() {
583 assert!(!should_create_snapshot(50, Some(50), Some(24)));
585 }
586
587 #[test]
590 fn create_snapshot_produces_encrypted_db() {
591 unsafe {
592 let db = open_memory_db();
593 create_synced_schema(db);
594
595 exec(
596 db,
597 "INSERT INTO notes (id, title, body, _updated_at, created_at) \
598 VALUES ('n1', 'Note One', NULL, '0000000001000-0000-dev1', '2026-01-01')",
599 );
600
601 let temp = tempfile::tempdir().unwrap();
602 let enc = test_encryption();
603
604 let encrypted =
605 create_snapshot(db, temp.path(), &enc).expect("create_snapshot should succeed");
606
607 assert!(!encrypted.is_empty());
609
610 let plaintext = enc.decrypt(&encrypted).expect("decrypt should succeed");
612 assert!(!plaintext.is_empty());
613
614 assert!(
616 plaintext.starts_with(b"SQLite format 3\0"),
617 "snapshot should be a valid SQLite database"
618 );
619
620 ffi::sqlite3_close(db);
621 }
622 }
623
624 #[test]
625 fn create_snapshot_contains_data() {
626 unsafe {
627 let db = open_memory_db();
628 create_synced_schema(db);
629
630 exec(
631 db,
632 "INSERT INTO notes (id, title, _updated_at, created_at) \
633 VALUES ('a1', 'Artist One', '0000000001000-0000-dev1', '2026-01-01')",
634 );
635 exec(
636 db,
637 "INSERT INTO note_tags (id, tag, note_id, _updated_at, created_at) \
638 VALUES ('al1', 'Album One', 'a1', '0000000001000-0000-dev1', '2026-01-01')",
639 );
640
641 let temp = tempfile::tempdir().unwrap();
642 let enc = test_encryption();
643
644 let encrypted = create_snapshot(db, temp.path(), &enc).expect("snapshot");
645 let plaintext = enc.decrypt(&encrypted).expect("decrypt");
646
647 let db_path = temp.path().join("verify.db");
649 std::fs::write(&db_path, &plaintext).unwrap();
650
651 let db2 = {
652 let c_path = CString::new(db_path.to_str().unwrap()).unwrap();
653 let mut ptr: *mut ffi::sqlite3 = std::ptr::null_mut();
654 let rc = ffi::sqlite3_open(c_path.as_ptr(), &mut ptr);
655 assert_eq!(rc, ffi::SQLITE_OK);
656 ptr
657 };
658
659 let name = query_text(db2, "SELECT title FROM notes WHERE id = 'a1'");
660 assert_eq!(name, "Artist One");
661
662 let title = query_text(db2, "SELECT tag FROM note_tags WHERE id = 'al1'");
663 assert_eq!(title, "Album One");
664
665 ffi::sqlite3_close(db2);
666 ffi::sqlite3_close(db);
667 }
668 }
669
670 #[tokio::test]
673 async fn push_snapshot_uploads_and_updates_head() {
674 let storage = MockSyncStorage::new();
675 storage
677 .put_head("dev-2", 15, None, "2026-02-10T00:00:00Z")
678 .await
679 .unwrap();
680 let data = vec![1, 2, 3, 4, 5];
681
682 push_snapshot(
683 &storage,
684 data.clone(),
685 "dev-1",
686 42,
687 &crate::clock::SystemClock,
688 )
689 .await
690 .expect("push_snapshot should succeed");
691
692 assert_eq!(storage.get_stored_snapshot(), Some(data));
694
695 let heads = storage.list_heads().await.unwrap();
697 let dev1_head = heads.iter().find(|h| h.device_id == "dev-1").unwrap();
698 assert_eq!(dev1_head.seq, 42);
699 assert_eq!(dev1_head.snapshot_seq, Some(42));
700
701 let meta_json = storage
703 .get_stored_snapshot_meta()
704 .expect("metadata should be written");
705 let meta: SnapshotMeta = serde_json::from_slice(&meta_json).unwrap();
706 assert_eq!(meta.cursors.get("dev-1"), Some(&42));
707 assert_eq!(meta.cursors.get("dev-2"), Some(&15));
708 assert_eq!(meta.cursors.len(), 2);
709 }
710
711 #[tokio::test]
714 async fn gc_deletes_changesets_per_device_cursors() {
715 let storage = MockSyncStorage::new();
716
717 for seq in 1..=5 {
719 storage.add_changeset("dev-a", seq, vec![seq as u8]);
720 }
721 for seq in 1..=3 {
723 storage.add_changeset("dev-b", seq, vec![seq as u8]);
724 }
725
726 assert_eq!(storage.changeset_count(), 8);
727
728 let meta = SnapshotMeta {
730 cursors: HashMap::from([("dev-a".to_string(), 3), ("dev-b".to_string(), 2)]),
731 created_at: "2026-02-10T00:00:00Z".to_string(),
732 };
733 storage
734 .put_snapshot_meta(serde_json::to_vec(&meta).unwrap())
735 .await
736 .unwrap();
737
738 let result = garbage_collect(&storage).await.expect("gc");
739
740 assert_eq!(result.deleted, 5);
742 assert_eq!(result.errors, 0);
743 assert_eq!(storage.changeset_count(), 3); let remaining_a = storage.list_changesets("dev-a").await.unwrap();
747 assert_eq!(remaining_a, vec![4, 5]);
748
749 let remaining_b = storage.list_changesets("dev-b").await.unwrap();
750 assert_eq!(remaining_b, vec![3]);
751 }
752
753 #[tokio::test]
754 async fn gc_with_no_changesets_to_delete() {
755 let storage = MockSyncStorage::new();
756 storage.add_changeset("dev-a", 10, vec![10]);
757
758 let meta = SnapshotMeta {
760 cursors: HashMap::from([("dev-a".to_string(), 5)]),
761 created_at: "2026-02-10T00:00:00Z".to_string(),
762 };
763 storage
764 .put_snapshot_meta(serde_json::to_vec(&meta).unwrap())
765 .await
766 .unwrap();
767
768 let result = garbage_collect(&storage).await.expect("gc");
769
770 assert_eq!(result.deleted, 0);
771 assert_eq!(storage.changeset_count(), 1);
772 }
773
774 #[tokio::test]
775 async fn gc_with_empty_bucket() {
776 let storage = MockSyncStorage::new();
777 let result = garbage_collect(&storage).await.expect("gc");
780
781 assert_eq!(result.deleted, 0);
782 assert_eq!(result.errors, 0);
783 }
784
785 #[tokio::test]
788 async fn bootstrap_downloads_decrypts_and_writes_db() {
789 unsafe {
790 let db = open_memory_db();
792 create_synced_schema(db);
793
794 exec(
795 db,
796 "INSERT INTO notes (id, title, _updated_at, created_at) \
797 VALUES ('a1', 'Artist One', '0000000001000-0000-dev1', '2026-01-01')",
798 );
799
800 let temp = tempfile::tempdir().unwrap();
801 let enc = test_encryption();
802
803 let encrypted = create_snapshot(db, temp.path(), &enc).expect("snapshot");
804 ffi::sqlite3_close(db);
805
806 let storage = MockSyncStorage::new();
808 storage.put_snapshot(encrypted).await.unwrap();
809
810 let meta = SnapshotMeta {
811 cursors: HashMap::from([("dev-1".to_string(), 10), ("dev-2".to_string(), 7)]),
812 created_at: "2026-02-10T00:00:00Z".to_string(),
813 };
814 storage
815 .put_snapshot_meta(serde_json::to_vec(&meta).unwrap())
816 .await
817 .unwrap();
818
819 let target = temp.path().join("bootstrapped.db");
821 let result = bootstrap_from_snapshot(&storage, &enc, &target)
822 .await
823 .expect("bootstrap");
824
825 assert_eq!(result.cursors.get("dev-1"), Some(&10));
827 assert_eq!(result.cursors.get("dev-2"), Some(&7));
828 assert_eq!(result.cursors.len(), 2);
829 assert!(target.exists());
830
831 let c_path = CString::new(target.to_str().unwrap()).unwrap();
833 let mut db2: *mut ffi::sqlite3 = std::ptr::null_mut();
834 let rc = ffi::sqlite3_open(c_path.as_ptr(), &mut db2);
835 assert_eq!(rc, ffi::SQLITE_OK);
836
837 let name = query_text(db2, "SELECT title FROM notes WHERE id = 'a1'");
838 assert_eq!(name, "Artist One");
839
840 ffi::sqlite3_close(db2);
841 }
842 }
843
844 #[tokio::test]
845 async fn bootstrap_fails_when_no_snapshot_exists() {
846 let storage = MockSyncStorage::new();
847 let enc = test_encryption();
848 let temp = tempfile::tempdir().unwrap();
849 let target = temp.path().join("nope.db");
850
851 let result = bootstrap_from_snapshot(&storage, &enc, &target).await;
852
853 assert!(result.is_err());
854 assert!(!target.exists());
855 }
856
857 #[tokio::test]
860 async fn full_snapshot_round_trip() {
861 unsafe {
862 let db = open_memory_db();
864 create_synced_schema(db);
865
866 exec(
867 db,
868 "INSERT INTO notes (id, title, _updated_at, created_at) \
869 VALUES ('a1', 'Artist One', '0000000001000-0000-dev1', '2026-01-01')",
870 );
871 exec(
872 db,
873 "INSERT INTO note_tags (id, tag, note_id, _updated_at, created_at) \
874 VALUES ('al1', 'Album One', 'a1', '0000000001000-0000-dev1', '2026-01-01')",
875 );
876
877 let temp = tempfile::tempdir().unwrap();
878 let enc = test_encryption();
879 let storage = MockSyncStorage::new();
880
881 let encrypted = create_snapshot(db, temp.path(), &enc).expect("snapshot");
883 push_snapshot(&storage, encrypted, "dev-1", 5, &crate::clock::SystemClock)
884 .await
885 .expect("push");
886
887 ffi::sqlite3_close(db);
888
889 let target = temp.path().join("device2.db");
891 let result = bootstrap_from_snapshot(&storage, &enc, &target)
892 .await
893 .expect("bootstrap");
894
895 assert_eq!(result.cursors.get("dev-1"), Some(&5));
896
897 let c_path = CString::new(target.to_str().unwrap()).unwrap();
899 let mut db2: *mut ffi::sqlite3 = std::ptr::null_mut();
900 let rc = ffi::sqlite3_open(c_path.as_ptr(), &mut db2);
901 assert_eq!(rc, ffi::SQLITE_OK);
902
903 let name = query_text(db2, "SELECT title FROM notes WHERE id = 'a1'");
904 assert_eq!(name, "Artist One");
905
906 let title = query_text(db2, "SELECT tag FROM note_tags WHERE id = 'al1'");
907 assert_eq!(title, "Album One");
908
909 ffi::sqlite3_close(db2);
913 }
914 }
915
916 #[tokio::test]
919 async fn snapshot_plus_changesets_equals_full_replay() {
920 unsafe {
921 let enc = test_encryption();
922 let temp = tempfile::tempdir().unwrap();
923
924 let db_source = open_memory_db();
927 create_synced_schema(db_source);
928
929 let session1 = SyncSession::start(db_source).expect("session");
931 exec(
932 db_source,
933 "INSERT INTO notes (id, title, _updated_at, created_at) \
934 VALUES ('a1', 'Artist One', '0000000001000-0000-dev1', '2026-01-01')",
935 );
936 exec(
937 db_source,
938 "INSERT INTO notes (id, title, _updated_at, created_at) \
939 VALUES ('a2', 'Artist Two', '0000000002000-0000-dev1', '2026-01-01')",
940 );
941 let cs1 = session1.changeset().unwrap().unwrap();
942 let cs1_bytes = cs1.as_bytes().to_vec();
943 drop(session1);
944
945 let snapshot_encrypted =
947 create_snapshot(db_source, temp.path(), &enc).expect("snapshot");
948
949 let session2 = SyncSession::start(db_source).expect("session2");
951 exec(
952 db_source,
953 "INSERT INTO notes (id, title, _updated_at, created_at) \
954 VALUES ('a3', 'Artist Three', '0000000003000-0000-dev1', '2026-01-01')",
955 );
956 exec(
957 db_source,
958 "UPDATE notes SET title = 'Artist One Updated' \
959 WHERE id = 'a1'",
960 );
961 let cs2 = session2.changeset().unwrap().unwrap();
962 let cs2_bytes = cs2.as_bytes().to_vec();
963 drop(session2);
964
965 ffi::sqlite3_close(db_source);
966
967 let snapshot_plain = enc.decrypt(&snapshot_encrypted).unwrap();
970 let path_a = temp.path().join("path_a.db");
971 std::fs::write(&path_a, &snapshot_plain).unwrap();
972
973 let db_a = {
974 let c = CString::new(path_a.to_str().unwrap()).unwrap();
975 let mut p: *mut ffi::sqlite3 = std::ptr::null_mut();
976 ffi::sqlite3_open(c.as_ptr(), &mut p);
977 p
978 };
979
980 let cs2_obj = crate::sync::session_ext::Changeset::from_bytes(&cs2_bytes);
981 crate::sync::apply::apply_changeset_lww(db_a, &cs2_obj).expect("apply cs2");
982
983 let db_b = open_memory_db();
986 create_synced_schema(db_b);
987
988 let cs1_obj = crate::sync::session_ext::Changeset::from_bytes(&cs1_bytes);
989 crate::sync::apply::apply_changeset_lww(db_b, &cs1_obj).expect("apply cs1");
990
991 let cs2_obj2 = crate::sync::session_ext::Changeset::from_bytes(&cs2_bytes);
992 crate::sync::apply::apply_changeset_lww(db_b, &cs2_obj2).expect("apply cs2");
993
994 let count_a = query_int(db_a, "SELECT COUNT(*) FROM notes");
997 let count_b = query_int(db_b, "SELECT COUNT(*) FROM notes");
998 assert_eq!(count_a, count_b, "artist count should match");
999 assert_eq!(count_a, 3);
1000
1001 let name_a = query_text(db_a, "SELECT title FROM notes WHERE id = 'a1'");
1002 let name_b = query_text(db_b, "SELECT title FROM notes WHERE id = 'a1'");
1003 assert_eq!(name_a, name_b);
1004 assert_eq!(name_a, "Artist One Updated");
1005
1006 let name_a3 = query_text(db_a, "SELECT title FROM notes WHERE id = 'a3'");
1007 let name_b3 = query_text(db_b, "SELECT title FROM notes WHERE id = 'a3'");
1008 assert_eq!(name_a3, name_b3);
1009 assert_eq!(name_a3, "Artist Three");
1010
1011 ffi::sqlite3_close(db_a);
1012 ffi::sqlite3_close(db_b);
1013 }
1014 }
1015
1016 #[tokio::test]
1021 async fn gc_does_not_delete_post_snapshot_changesets() {
1022 let storage = MockSyncStorage::new();
1023
1024 for seq in 1..=50 {
1026 storage.add_changeset("dev-a", seq, vec![seq as u8]);
1027 }
1028 for seq in 1..=35 {
1029 storage.add_changeset("dev-b", seq, vec![seq as u8]);
1030 }
1031
1032 let meta = SnapshotMeta {
1035 cursors: HashMap::from([("dev-a".to_string(), 50), ("dev-b".to_string(), 30)]),
1036 created_at: "2026-02-10T00:00:00Z".to_string(),
1037 };
1038 storage
1039 .put_snapshot_meta(serde_json::to_vec(&meta).unwrap())
1040 .await
1041 .unwrap();
1042
1043 let result = garbage_collect(&storage).await.expect("gc");
1044
1045 assert_eq!(result.deleted, 80);
1047 assert_eq!(result.errors, 0);
1048
1049 let remaining_b = storage.list_changesets("dev-b").await.unwrap();
1051 assert_eq!(remaining_b, vec![31, 32, 33, 34, 35]);
1052
1053 let remaining_a = storage.list_changesets("dev-a").await.unwrap();
1055 assert!(remaining_a.is_empty());
1056 }
1057
1058 #[tokio::test]
1061 async fn gc_ignores_device_not_in_snapshot_meta() {
1062 let storage = MockSyncStorage::new();
1063
1064 for seq in 1..=5 {
1066 storage.add_changeset("dev-a", seq, vec![seq as u8]);
1067 }
1068 for seq in 1..=3 {
1070 storage.add_changeset("dev-c", seq, vec![seq as u8]);
1071 }
1072
1073 let meta = SnapshotMeta {
1075 cursors: HashMap::from([("dev-a".to_string(), 5)]),
1076 created_at: "2026-02-10T00:00:00Z".to_string(),
1077 };
1078 storage
1079 .put_snapshot_meta(serde_json::to_vec(&meta).unwrap())
1080 .await
1081 .unwrap();
1082
1083 let result = garbage_collect(&storage).await.expect("gc");
1084
1085 assert_eq!(result.deleted, 5);
1087 assert_eq!(result.errors, 0);
1088
1089 let remaining_c = storage.list_changesets("dev-c").await.unwrap();
1091 assert_eq!(remaining_c, vec![1, 2, 3]);
1092 }
1093
1094 #[tokio::test]
1099 async fn bootstrap_fails_when_snapshot_meta_missing() {
1100 unsafe {
1101 let db = open_memory_db();
1102 create_synced_schema(db);
1103
1104 exec(
1105 db,
1106 "INSERT INTO notes (id, title, _updated_at, created_at) \
1107 VALUES ('a1', 'Artist One', '0000000001000-0000-dev1', '2026-01-01')",
1108 );
1109
1110 let temp = tempfile::tempdir().unwrap();
1111 let enc = test_encryption();
1112
1113 let encrypted = create_snapshot(db, temp.path(), &enc).expect("snapshot");
1114 ffi::sqlite3_close(db);
1115
1116 let storage = MockSyncStorage::new();
1118 storage.put_snapshot(encrypted).await.unwrap();
1119 storage
1120 .put_head("dev-1", 20, Some(15), "2026-02-10T00:00:00Z")
1121 .await
1122 .unwrap();
1123
1124 let target = temp.path().join("torn.db");
1125 let err = bootstrap_from_snapshot(&storage, &enc, &target)
1126 .await
1127 .expect_err("bootstrap must refuse torn bucket");
1128 assert!(
1129 matches!(err, SnapshotError::Bucket(StorageError::NotFound(_))),
1130 "expected Bucket(NotFound), got {err:?}",
1131 );
1132 assert!(
1133 !target.exists(),
1134 "no DB should be written when metadata is missing",
1135 );
1136 }
1137 }
1138}