1use async_trait::async_trait;
8use serde::{Deserialize, Serialize};
9use std::sync::{Arc, RwLock};
10
11use super::storage::{DeviceHead, StorageError, SyncStorage};
12use crate::encryption::EncryptionService;
13use crate::storage::cloud::CloudHome;
14
15#[derive(Serialize, Deserialize)]
17struct HeadJson {
18 seq: u64,
19 #[serde(skip_serializing_if = "Option::is_none")]
20 snapshot_seq: Option<u64>,
21 #[serde(skip_serializing_if = "Option::is_none")]
23 last_sync: Option<String>,
24}
25
26#[derive(Serialize, Deserialize)]
28struct MinSchemaVersionJson {
29 min_schema_version: u32,
30}
31
32pub struct EncryptedSyncStorage {
35 home: Box<dyn CloudHome>,
36 encryption: Arc<RwLock<EncryptionService>>,
37}
38
39impl EncryptedSyncStorage {
40 pub fn new(home: Box<dyn CloudHome>, encryption: EncryptionService) -> Self {
41 EncryptedSyncStorage {
42 home,
43 encryption: Arc::new(RwLock::new(encryption)),
44 }
45 }
46
47 pub fn shared_encryption(&self) -> Arc<RwLock<EncryptionService>> {
50 self.encryption.clone()
51 }
52
53 pub fn cloud_home(&self) -> &dyn CloudHome {
55 &*self.home
56 }
57
58 fn enc(&self) -> std::sync::RwLockReadGuard<'_, EncryptionService> {
60 self.encryption.read().unwrap()
61 }
62
63 pub fn blob_key(namespace: &str, id: &str) -> String {
65 crate::library_dir::LibraryDir::hashed_path(namespace, id)
66 }
67}
68
69#[async_trait]
70impl SyncStorage for EncryptedSyncStorage {
71 async fn list_heads(&self) -> Result<Vec<DeviceHead>, StorageError> {
72 let keys = self.home.list("heads/").await?;
73 let mut heads = Vec::new();
74
75 for key in &keys {
76 let device_id = key
78 .strip_prefix("heads/")
79 .and_then(|s| s.strip_suffix(".json.enc"))
80 .ok_or_else(|| StorageError::S3(format!("unexpected head key format: {key}")))?;
81
82 let encrypted = self.home.read(key).await?;
83 let decrypted = self
84 .enc()
85 .decrypt(&encrypted)
86 .map_err(|e| StorageError::Decryption(format!("head {device_id}: {e}")))?;
87
88 let head_json: HeadJson = serde_json::from_slice(&decrypted)
89 .map_err(|e| StorageError::S3(format!("parse head {device_id}: {e}")))?;
90
91 heads.push(DeviceHead {
92 device_id: device_id.to_string(),
93 seq: head_json.seq,
94 snapshot_seq: head_json.snapshot_seq,
95 last_sync: head_json.last_sync,
96 });
97 }
98
99 Ok(heads)
100 }
101
102 async fn get_changeset(&self, device_id: &str, seq: u64) -> Result<Vec<u8>, StorageError> {
103 let key = format!("changes/{device_id}/{seq}.enc");
104 let encrypted = self.home.read(&key).await?;
105 self.enc()
106 .decrypt(&encrypted)
107 .map_err(|e| StorageError::Decryption(format!("changeset {device_id}/{seq}: {e}")))
108 }
109
110 async fn put_changeset(
111 &self,
112 device_id: &str,
113 seq: u64,
114 data: Vec<u8>,
115 ) -> Result<(), StorageError> {
116 let key = format!("changes/{device_id}/{seq}.enc");
117 let encrypted = self.enc().encrypt(&data);
118 self.home.write(&key, encrypted).await?;
119 Ok(())
120 }
121
122 async fn put_head(
123 &self,
124 device_id: &str,
125 seq: u64,
126 snapshot_seq: Option<u64>,
127 timestamp: &str,
128 ) -> Result<(), StorageError> {
129 let head = HeadJson {
130 seq,
131 snapshot_seq,
132 last_sync: Some(timestamp.to_string()),
133 };
134 let json = serde_json::to_vec(&head)
135 .map_err(|e| StorageError::S3(format!("serialize head: {e}")))?;
136 let encrypted = self.enc().encrypt(&json);
137 let key = format!("heads/{device_id}.json.enc");
138 self.home.write(&key, encrypted).await?;
139 Ok(())
140 }
141
142 async fn put_blob(
143 &self,
144 namespace: &str,
145 id: &str,
146 scope: crate::blob::BlobScope,
147 data: Vec<u8>,
148 ) -> Result<(), StorageError> {
149 let key = Self::blob_key(namespace, id);
150 let enc = match scope {
151 crate::blob::BlobScope::Master => self.enc().clone(),
152 crate::blob::BlobScope::Derived(s) => self.enc().derive_scoped(&s),
153 };
154 let encrypted = enc.encrypt(&data);
155 self.home.write(&key, encrypted).await?;
156 Ok(())
157 }
158
159 async fn get_blob(
160 &self,
161 namespace: &str,
162 id: &str,
163 scope: crate::blob::BlobScope,
164 ) -> Result<Vec<u8>, StorageError> {
165 let key = Self::blob_key(namespace, id);
166 let encrypted = self.home.read(&key).await?;
167 let enc = match scope {
168 crate::blob::BlobScope::Master => self.enc().clone(),
169 crate::blob::BlobScope::Derived(s) => self.enc().derive_scoped(&s),
170 };
171 enc.decrypt(&encrypted)
172 .map_err(|e| StorageError::Decryption(format!("blob {namespace}/{id}: {e}")))
173 }
174
175 async fn put_snapshot(&self, data: Vec<u8>) -> Result<(), StorageError> {
176 self.home.write("snapshot.db.enc", data).await?;
177 Ok(())
178 }
179
180 async fn get_snapshot(&self) -> Result<Vec<u8>, StorageError> {
181 self.home
182 .read("snapshot.db.enc")
183 .await
184 .map_err(StorageError::from)
185 }
186
187 async fn delete_changeset(&self, device_id: &str, seq: u64) -> Result<(), StorageError> {
188 let key = format!("changes/{device_id}/{seq}.enc");
189 self.home.delete(&key).await?;
190 Ok(())
191 }
192
193 async fn list_changesets(&self, device_id: &str) -> Result<Vec<u64>, StorageError> {
194 let prefix = format!("changes/{device_id}/");
195 let keys = self.home.list(&prefix).await?;
196
197 let mut seqs: Vec<u64> = keys
198 .iter()
199 .filter_map(|k| {
200 k.strip_prefix(&prefix)
201 .and_then(|s| s.strip_suffix(".enc"))
202 .and_then(|s| s.parse().ok())
203 })
204 .collect();
205 seqs.sort();
206 Ok(seqs)
207 }
208
209 async fn get_min_schema_version(&self) -> Result<Option<u32>, StorageError> {
210 let key = "min_schema_version.json.enc";
211 let encrypted = match self.home.read(key).await {
212 Ok(data) => data,
213 Err(crate::storage::cloud::CloudHomeError::NotFound(_)) => return Ok(None),
214 Err(e) => return Err(StorageError::from(e)),
215 };
216
217 let decrypted = self
218 .enc()
219 .decrypt(&encrypted)
220 .map_err(|e| StorageError::Decryption(format!("min_schema_version: {e}")))?;
221
222 let parsed: MinSchemaVersionJson = serde_json::from_slice(&decrypted)
223 .map_err(|e| StorageError::S3(format!("parse min_schema_version: {e}")))?;
224
225 Ok(Some(parsed.min_schema_version))
226 }
227
228 async fn set_min_schema_version(&self, version: u32) -> Result<(), StorageError> {
229 let payload = MinSchemaVersionJson {
230 min_schema_version: version,
231 };
232 let json = serde_json::to_vec(&payload)
233 .map_err(|e| StorageError::S3(format!("serialize min_schema_version: {e}")))?;
234 let encrypted = self.enc().encrypt(&json);
235 self.home
236 .write("min_schema_version.json.enc", encrypted)
237 .await?;
238 Ok(())
239 }
240
241 async fn put_membership_entry(
242 &self,
243 author_pubkey: &str,
244 seq: u64,
245 data: Vec<u8>,
246 ) -> Result<(), StorageError> {
247 let key = format!("membership/{author_pubkey}/{seq}.enc");
248 let encrypted = self.enc().encrypt(&data);
249 self.home.write(&key, encrypted).await?;
250 Ok(())
251 }
252
253 async fn get_membership_entry(
254 &self,
255 author_pubkey: &str,
256 seq: u64,
257 ) -> Result<Vec<u8>, StorageError> {
258 let key = format!("membership/{author_pubkey}/{seq}.enc");
259 let encrypted = self.home.read(&key).await?;
260 self.enc()
261 .decrypt(&encrypted)
262 .map_err(|e| StorageError::Decryption(format!("membership {author_pubkey}/{seq}: {e}")))
263 }
264
265 async fn list_membership_entries(&self) -> Result<Vec<(String, u64)>, StorageError> {
266 let keys = self.home.list("membership/").await?;
267 let mut entries = Vec::new();
268
269 for key in &keys {
270 let rest = match key.strip_prefix("membership/") {
272 Some(r) => r,
273 None => continue,
274 };
275 let rest = match rest.strip_suffix(".enc") {
276 Some(r) => r,
277 None => continue,
278 };
279
280 if let Some(slash_pos) = rest.rfind('/') {
283 let author = &rest[..slash_pos];
284 if let Ok(seq) = rest[slash_pos + 1..].parse::<u64>() {
285 entries.push((author.to_string(), seq));
286 }
287 }
288 }
289
290 Ok(entries)
291 }
292
293 async fn put_wrapped_key(&self, user_pubkey: &str, data: Vec<u8>) -> Result<(), StorageError> {
294 let key = format!("keys/{user_pubkey}.enc");
295 self.home.write(&key, data).await?;
297 Ok(())
298 }
299
300 async fn get_wrapped_key(&self, user_pubkey: &str) -> Result<Vec<u8>, StorageError> {
301 let key = format!("keys/{user_pubkey}.enc");
302 self.home.read(&key).await.map_err(StorageError::from)
304 }
305
306 async fn delete_wrapped_key(&self, user_pubkey: &str) -> Result<(), StorageError> {
307 let key = format!("keys/{user_pubkey}.enc");
308 self.home.delete(&key).await?;
309 Ok(())
310 }
311
312 async fn put_snapshot_meta(&self, data: Vec<u8>) -> Result<(), StorageError> {
313 let encrypted = self.enc().encrypt(&data);
314 self.home.write("snapshot_meta.json.enc", encrypted).await?;
315 Ok(())
316 }
317
318 async fn get_snapshot_meta(&self) -> Result<Vec<u8>, StorageError> {
319 let encrypted = self.home.read("snapshot_meta.json.enc").await?;
320 self.enc()
321 .decrypt(&encrypted)
322 .map_err(|e| StorageError::Decryption(format!("snapshot_meta: {e}")))
323 }
324}