coven/sync/storage.rs
1/// Sync storage: reads/writes to the layout used for changeset sync.
2///
3/// Layout:
4/// ```text
5/// changes/{device_id}/{seq}.enc -- encrypted changeset envelopes
6/// heads/{device_id}.json.enc -- encrypted head pointers
7/// images/{ab}/{cd}/{id} -- encrypted library images
8/// snapshot.db.enc -- full DB snapshot for bootstrapping
9/// snapshot_meta.json.enc -- per-device cursors at snapshot time
10/// membership/{author_pubkey}/{seq}.enc -- encrypted membership entries
11/// keys/{user_pubkey}.enc -- wrapped library keys per member
12/// ```
13///
14/// All data is encrypted before upload and decrypted after download.
15/// The trait is async and mockable for testing.
16use async_trait::async_trait;
17
18/// Per-device head: the latest sequence number for a device.
19#[derive(Debug, Clone, PartialEq, Eq)]
20pub struct DeviceHead {
21 pub device_id: String,
22 pub seq: u64,
23 /// The seq up to which the latest snapshot covers. None if no snapshot
24 /// has been created by this device.
25 pub snapshot_seq: Option<u64>,
26 /// RFC 3339 timestamp of when this head was last updated (i.e., when
27 /// the device last synced). None for heads written before this field
28 /// was added.
29 pub last_sync: Option<String>,
30}
31
32/// Error type for storage operations.
33#[derive(Debug, thiserror::Error)]
34pub enum StorageError {
35 #[error("S3 operation failed: {0}")]
36 S3(String),
37 #[error("object not found: {0}")]
38 NotFound(String),
39 #[error("decryption failed: {0}")]
40 Decryption(String),
41}
42
43impl From<crate::storage::cloud::CloudHomeError> for StorageError {
44 fn from(e: crate::storage::cloud::CloudHomeError) -> Self {
45 match e {
46 crate::storage::cloud::CloudHomeError::NotFound(key) => StorageError::NotFound(key),
47 crate::storage::cloud::CloudHomeError::Storage(msg) => StorageError::S3(msg),
48 crate::storage::cloud::CloudHomeError::Io(io_err) => {
49 StorageError::S3(format!("I/O error: {io_err}"))
50 }
51 }
52 }
53}
54
55#[async_trait]
56pub trait SyncStorage: Send + Sync {
57 /// List all device heads (one LIST call to `heads/`).
58 async fn list_heads(&self) -> Result<Vec<DeviceHead>, StorageError>;
59
60 /// Fetch a single changeset by device_id and seq.
61 ///
62 /// Returns the **decrypted** envelope bytes from `changes/{device_id}/{seq}.enc`.
63 /// Implementations must handle downloading the encrypted blob and decrypting
64 /// it before returning. Callers receive plaintext ready for `envelope::unpack()`.
65 async fn get_changeset(&self, device_id: &str, seq: u64) -> Result<Vec<u8>, StorageError>;
66
67 /// Upload a changeset blob (plaintext — the implementation encrypts it).
68 /// Writes to `changes/{device_id}/{seq}.enc`.
69 async fn put_changeset(
70 &self,
71 device_id: &str,
72 seq: u64,
73 data: Vec<u8>,
74 ) -> Result<(), StorageError>;
75
76 /// Update the head pointer for a device.
77 /// Writes to `heads/{device_id}.json.enc`.
78 /// If `snapshot_seq` is Some, the head records that a snapshot covers
79 /// all changesets up to that seq. `timestamp` is the RFC 3339 time of
80 /// this sync (used by the sync status UI).
81 async fn put_head(
82 &self,
83 device_id: &str,
84 seq: u64,
85 snapshot_seq: Option<u64>,
86 timestamp: &str,
87 ) -> Result<(), StorageError>;
88
89 /// Upload an encrypted blob to `{namespace}/{id[0..2]}/{id[2..4]}/{id}`.
90 /// The plaintext is encrypted with the key selected by `scope`
91 /// (master, or a per-scope derived key).
92 async fn put_blob(
93 &self,
94 namespace: &str,
95 id: &str,
96 scope: crate::blob::BlobScope,
97 data: Vec<u8>,
98 ) -> Result<(), StorageError>;
99
100 /// Download and decrypt a blob from `{namespace}/{id[0..2]}/{id[2..4]}/{id}`,
101 /// using the key selected by `scope`.
102 async fn get_blob(
103 &self,
104 namespace: &str,
105 id: &str,
106 scope: crate::blob::BlobScope,
107 ) -> Result<Vec<u8>, StorageError>;
108
109 /// Upload an encrypted snapshot.
110 /// Writes to `snapshot.db.enc` (overwrites any previous snapshot).
111 async fn put_snapshot(&self, data: Vec<u8>) -> Result<(), StorageError>;
112
113 /// Download the encrypted snapshot.
114 /// Returns bytes from `snapshot.db.enc`.
115 async fn get_snapshot(&self) -> Result<Vec<u8>, StorageError>;
116
117 /// Delete a single changeset from storage.
118 /// Removes `changes/{device_id}/{seq}.enc`.
119 async fn delete_changeset(&self, device_id: &str, seq: u64) -> Result<(), StorageError>;
120
121 /// List all changeset keys for a device.
122 /// Returns the sequence numbers that exist in `changes/{device_id}/`.
123 async fn list_changesets(&self, device_id: &str) -> Result<Vec<u64>, StorageError>;
124
125 /// Get the minimum schema version required to sync with this storage.
126 ///
127 /// Returns `None` if no minimum has been set (backwards compat: any version
128 /// can sync). Reads from `min_schema_version.json.enc`.
129 async fn get_min_schema_version(&self) -> Result<Option<u32>, StorageError>;
130
131 /// Set the minimum schema version required to sync with this storage.
132 ///
133 /// Writes to `min_schema_version.json.enc`. Used when a breaking migration
134 /// bumps the schema and all devices must upgrade before syncing.
135 async fn set_min_schema_version(&self, version: u32) -> Result<(), StorageError>;
136
137 /// Upload a membership entry.
138 /// Writes to `membership/{author_pubkey_hex}/{seq}.enc`.
139 async fn put_membership_entry(
140 &self,
141 author_pubkey: &str,
142 seq: u64,
143 data: Vec<u8>,
144 ) -> Result<(), StorageError>;
145
146 /// Download a membership entry.
147 /// Reads from `membership/{author_pubkey_hex}/{seq}.enc`.
148 async fn get_membership_entry(
149 &self,
150 author_pubkey: &str,
151 seq: u64,
152 ) -> Result<Vec<u8>, StorageError>;
153
154 /// List all membership entry keys.
155 /// Returns tuples of (author_pubkey, seq).
156 async fn list_membership_entries(&self) -> Result<Vec<(String, u64)>, StorageError>;
157
158 /// Upload a wrapped library key for a member.
159 /// Writes to `keys/{user_pubkey_hex}.enc`.
160 async fn put_wrapped_key(&self, user_pubkey: &str, data: Vec<u8>) -> Result<(), StorageError>;
161
162 /// Download a wrapped library key for a member.
163 /// Reads from `keys/{user_pubkey_hex}.enc`.
164 async fn get_wrapped_key(&self, user_pubkey: &str) -> Result<Vec<u8>, StorageError>;
165
166 /// Delete a wrapped library key.
167 /// Removes `keys/{user_pubkey_hex}.enc`.
168 async fn delete_wrapped_key(&self, user_pubkey: &str) -> Result<(), StorageError>;
169
170 /// Upload snapshot metadata (plaintext -- the implementation encrypts it).
171 /// Writes to `snapshot_meta.json.enc`.
172 async fn put_snapshot_meta(&self, data: Vec<u8>) -> Result<(), StorageError>;
173
174 /// Download snapshot metadata (decrypted).
175 /// Reads from `snapshot_meta.json.enc`. Returns NotFound if no metadata exists.
176 async fn get_snapshot_meta(&self) -> Result<Vec<u8>, StorageError>;
177}