coven/sync/
storage.rs

1/// Sync storage: reads/writes to the layout used for changeset sync.
2///
3/// Layout:
4/// ```text
5/// changes/{device_id}/{seq}.enc          -- encrypted changeset envelopes
6/// heads/{device_id}.json.enc             -- encrypted head pointers
7/// images/{ab}/{cd}/{id}                  -- encrypted library images
8/// snapshot.db.enc                        -- full DB snapshot for bootstrapping
9/// snapshot_meta.json.enc                 -- per-device cursors at snapshot time
10/// membership/{author_pubkey}/{seq}.enc   -- encrypted membership entries
11/// keys/{user_pubkey}.enc                 -- wrapped library keys per member
12/// ```
13///
14/// All data is encrypted before upload and decrypted after download.
15/// The trait is async and mockable for testing.
16use async_trait::async_trait;
17
18/// Per-device head: the latest sequence number for a device.
19#[derive(Debug, Clone, PartialEq, Eq)]
20pub struct DeviceHead {
21    pub device_id: String,
22    pub seq: u64,
23    /// The seq up to which the latest snapshot covers. None if no snapshot
24    /// has been created by this device.
25    pub snapshot_seq: Option<u64>,
26    /// RFC 3339 timestamp of when this head was last updated (i.e., when
27    /// the device last synced). None for heads written before this field
28    /// was added.
29    pub last_sync: Option<String>,
30}
31
32/// Error type for storage operations.
33#[derive(Debug, thiserror::Error)]
34pub enum StorageError {
35    #[error("S3 operation failed: {0}")]
36    S3(String),
37    #[error("object not found: {0}")]
38    NotFound(String),
39    #[error("decryption failed: {0}")]
40    Decryption(String),
41}
42
43impl From<crate::storage::cloud::CloudHomeError> for StorageError {
44    fn from(e: crate::storage::cloud::CloudHomeError) -> Self {
45        match e {
46            crate::storage::cloud::CloudHomeError::NotFound(key) => StorageError::NotFound(key),
47            crate::storage::cloud::CloudHomeError::Storage(msg) => StorageError::S3(msg),
48            crate::storage::cloud::CloudHomeError::Io(io_err) => {
49                StorageError::S3(format!("I/O error: {io_err}"))
50            }
51        }
52    }
53}
54
55#[async_trait]
56pub trait SyncStorage: Send + Sync {
57    /// List all device heads (one LIST call to `heads/`).
58    async fn list_heads(&self) -> Result<Vec<DeviceHead>, StorageError>;
59
60    /// Fetch a single changeset by device_id and seq.
61    ///
62    /// Returns the **decrypted** envelope bytes from `changes/{device_id}/{seq}.enc`.
63    /// Implementations must handle downloading the encrypted blob and decrypting
64    /// it before returning. Callers receive plaintext ready for `envelope::unpack()`.
65    async fn get_changeset(&self, device_id: &str, seq: u64) -> Result<Vec<u8>, StorageError>;
66
67    /// Upload a changeset blob (plaintext — the implementation encrypts it).
68    /// Writes to `changes/{device_id}/{seq}.enc`.
69    async fn put_changeset(
70        &self,
71        device_id: &str,
72        seq: u64,
73        data: Vec<u8>,
74    ) -> Result<(), StorageError>;
75
76    /// Update the head pointer for a device.
77    /// Writes to `heads/{device_id}.json.enc`.
78    /// If `snapshot_seq` is Some, the head records that a snapshot covers
79    /// all changesets up to that seq. `timestamp` is the RFC 3339 time of
80    /// this sync (used by the sync status UI).
81    async fn put_head(
82        &self,
83        device_id: &str,
84        seq: u64,
85        snapshot_seq: Option<u64>,
86        timestamp: &str,
87    ) -> Result<(), StorageError>;
88
89    /// Upload an encrypted blob to `{namespace}/{id[0..2]}/{id[2..4]}/{id}`.
90    /// The plaintext is encrypted with the key selected by `scope`
91    /// (master, or a per-scope derived key).
92    async fn put_blob(
93        &self,
94        namespace: &str,
95        id: &str,
96        scope: crate::blob::BlobScope,
97        data: Vec<u8>,
98    ) -> Result<(), StorageError>;
99
100    /// Download and decrypt a blob from `{namespace}/{id[0..2]}/{id[2..4]}/{id}`,
101    /// using the key selected by `scope`.
102    async fn get_blob(
103        &self,
104        namespace: &str,
105        id: &str,
106        scope: crate::blob::BlobScope,
107    ) -> Result<Vec<u8>, StorageError>;
108
109    /// Upload an encrypted snapshot.
110    /// Writes to `snapshot.db.enc` (overwrites any previous snapshot).
111    async fn put_snapshot(&self, data: Vec<u8>) -> Result<(), StorageError>;
112
113    /// Download the encrypted snapshot.
114    /// Returns bytes from `snapshot.db.enc`.
115    async fn get_snapshot(&self) -> Result<Vec<u8>, StorageError>;
116
117    /// Delete a single changeset from storage.
118    /// Removes `changes/{device_id}/{seq}.enc`.
119    async fn delete_changeset(&self, device_id: &str, seq: u64) -> Result<(), StorageError>;
120
121    /// List all changeset keys for a device.
122    /// Returns the sequence numbers that exist in `changes/{device_id}/`.
123    async fn list_changesets(&self, device_id: &str) -> Result<Vec<u64>, StorageError>;
124
125    /// Get the minimum schema version required to sync with this storage.
126    ///
127    /// Returns `None` if no minimum has been set (backwards compat: any version
128    /// can sync). Reads from `min_schema_version.json.enc`.
129    async fn get_min_schema_version(&self) -> Result<Option<u32>, StorageError>;
130
131    /// Set the minimum schema version required to sync with this storage.
132    ///
133    /// Writes to `min_schema_version.json.enc`. Used when a breaking migration
134    /// bumps the schema and all devices must upgrade before syncing.
135    async fn set_min_schema_version(&self, version: u32) -> Result<(), StorageError>;
136
137    /// Upload a membership entry.
138    /// Writes to `membership/{author_pubkey_hex}/{seq}.enc`.
139    async fn put_membership_entry(
140        &self,
141        author_pubkey: &str,
142        seq: u64,
143        data: Vec<u8>,
144    ) -> Result<(), StorageError>;
145
146    /// Download a membership entry.
147    /// Reads from `membership/{author_pubkey_hex}/{seq}.enc`.
148    async fn get_membership_entry(
149        &self,
150        author_pubkey: &str,
151        seq: u64,
152    ) -> Result<Vec<u8>, StorageError>;
153
154    /// List all membership entry keys.
155    /// Returns tuples of (author_pubkey, seq).
156    async fn list_membership_entries(&self) -> Result<Vec<(String, u64)>, StorageError>;
157
158    /// Upload a wrapped library key for a member.
159    /// Writes to `keys/{user_pubkey_hex}.enc`.
160    async fn put_wrapped_key(&self, user_pubkey: &str, data: Vec<u8>) -> Result<(), StorageError>;
161
162    /// Download a wrapped library key for a member.
163    /// Reads from `keys/{user_pubkey_hex}.enc`.
164    async fn get_wrapped_key(&self, user_pubkey: &str) -> Result<Vec<u8>, StorageError>;
165
166    /// Delete a wrapped library key.
167    /// Removes `keys/{user_pubkey_hex}.enc`.
168    async fn delete_wrapped_key(&self, user_pubkey: &str) -> Result<(), StorageError>;
169
170    /// Upload snapshot metadata (plaintext -- the implementation encrypts it).
171    /// Writes to `snapshot_meta.json.enc`.
172    async fn put_snapshot_meta(&self, data: Vec<u8>) -> Result<(), StorageError>;
173
174    /// Download snapshot metadata (decrypted).
175    /// Reads from `snapshot_meta.json.enc`. Returns NotFound if no metadata exists.
176    async fn get_snapshot_meta(&self) -> Result<Vec<u8>, StorageError>;
177}