coven/sync/
sync_manager.rs

1//! High-level sync manager: lifecycle, membership, status.
2//!
3//! Owns the sync lifecycle — cloud home + sync loop — and starts/stops it when
4//! a provider is connected/disconnected, no app restart required. The host
5//! supplies the config snapshot, keys, encryption, database, clock, and blob
6//! handling; coven drives the rest.
7
8use std::sync::{Arc, RwLock};
9
10use tracing::info;
11
12use crate::blob::{BlobPlan, BlobUploadObserver};
13use crate::clock::ClockRef;
14use crate::config::Config;
15use crate::db::SyncDb;
16use crate::encryption::EncryptionService;
17use crate::keys::KeyService;
18use crate::storage::cloud::CloudHome;
19use crate::sync::membership::MemberRole;
20use crate::sync::storage::SyncStorage;
21use crate::sync::sync_loop::SyncLoopHandle;
22
23/// Supplies the host's current config on demand. coven reads it fresh each call
24/// — never snapshotting or writing it — so a host with reactive config sees
25/// changes without rebuilding the manager.
26pub type ConfigProvider = Arc<dyn Fn() -> Config + Send + Sync>;
27
28/// High-level sync manager.
29///
30/// Always has a valid EncryptionService — if no encryption key exists,
31/// don't create a SyncManager at all.
32pub struct SyncManager {
33    config_provider: ConfigProvider,
34    key_service: KeyService,
35    encryption_service: EncryptionService,
36    db: Arc<dyn SyncDb>,
37    clock: ClockRef,
38    blob_plan: Arc<dyn BlobPlan>,
39    observer: Option<Arc<dyn BlobUploadObserver>>,
40
41    // Mutable sync state — updated when providers are connected/disconnected
42    sync_loop_handle: RwLock<Option<Arc<SyncLoopHandle>>>,
43    cloud_home: RwLock<Option<Arc<dyn CloudHome>>>,
44}
45
46/// A member as returned by get_members.
47#[derive(Debug, Clone)]
48pub struct MemberInfo {
49    pub pubkey: String,
50    pub role: MemberRole,
51    pub is_self: bool,
52}
53
/// Point-in-time description of the sync state.
///
/// NOTE(review): nothing in this part of the file builds a `SyncStatus`, so the
/// field descriptions below are inferred from names and types only.
#[derive(Clone, Debug)]
pub struct SyncStatus {
    /// Presumably whether sync has been set up at all.
    pub configured: bool,
    /// Presumably whether a sync is in progress.
    pub syncing: bool,
    /// Time of the last sync as text, if any; the format is not visible here.
    pub last_sync_time: Option<String>,
    /// Error message to report, if any.
    pub error: Option<String>,
    /// Number of devices; how it is counted is not visible here.
    pub device_count: u32,
}
63
64impl SyncManager {
65    pub fn new(
66        config_provider: ConfigProvider,
67        key_service: KeyService,
68        encryption_service: EncryptionService,
69        db: Arc<dyn SyncDb>,
70        clock: ClockRef,
71        blob_plan: Arc<dyn BlobPlan>,
72        observer: Option<Arc<dyn BlobUploadObserver>>,
73    ) -> Self {
74        Self {
75            config_provider,
76            key_service,
77            encryption_service,
78            db,
79            clock,
80            blob_plan,
81            observer,
82            sync_loop_handle: RwLock::new(None),
83            cloud_home: RwLock::new(None),
84        }
85    }
86
87    pub fn encryption_service(&self) -> &EncryptionService {
88        &self.encryption_service
89    }
90
91    pub fn cloud_home(&self) -> Option<Arc<dyn CloudHome>> {
92        self.cloud_home.read().unwrap().clone()
93    }
94
95    pub fn sync_loop_handle(&self) -> Option<Arc<SyncLoopHandle>> {
96        self.sync_loop_handle.read().unwrap().clone()
97    }
98
99    // =========================================================================
100    // Sync lifecycle
101    // =========================================================================
102
103    /// Initialize cloud home and sync loop from current config.
104    /// Called at startup (if already configured) and after connecting a provider.
105    pub async fn start_sync(&self) {
106        let config = (self.config_provider)();
107
108        // Create cloud home
109        let cloud_home: Option<Arc<dyn CloudHome>> = match crate::storage::cloud::create_cloud_home(
110            &config,
111            &self.key_service,
112            self.clock.clone(),
113        )
114        .await
115        {
116            Ok(ch) => Some(Arc::from(ch)),
117            Err(e) => {
118                info!("Cloud home not available: {e}");
119                return;
120            }
121        };
122
123        *self.cloud_home.write().unwrap() = cloud_home;
124
125        if !config.sync_enabled(&self.key_service) {
126            return;
127        }
128
129        // Initialize sync loop
130        let sync_loop = crate::sync::cycle::init_sync(
131            &config,
132            &self.key_service,
133            self.db.as_ref(),
134            self.clock.clone(),
135            &self.encryption_service,
136        )
137        .await;
138
139        if let Some(components) = sync_loop {
140            let library_dir = config.library_dir.clone();
141            let handle = Arc::new(SyncLoopHandle::new(
142                components,
143                self.db.clone(),
144                self.clock.clone(),
145                library_dir,
146                self.blob_plan.clone(),
147                self.observer.clone(),
148            ));
149            handle.start();
150
151            info!("Sync loop started");
152            *self.sync_loop_handle.write().unwrap() = Some(handle);
153        }
154    }
155
156    /// Tear down the sync loop and cloud home.
157    pub fn stop_sync(&self) {
158        *self.sync_loop_handle.write().unwrap() = None;
159        *self.cloud_home.write().unwrap() = None;
160
161        info!("Sync loop stopped");
162    }
163
164    // =========================================================================
165    // Status / config queries
166    // =========================================================================
167
168    pub fn is_sync_ready(&self) -> bool {
169        self.sync_loop_handle
170            .read()
171            .unwrap()
172            .as_ref()
173            .is_some_and(|h| h.is_running())
174    }
175
176    pub fn trigger_sync(&self) {
177        if let Some(ref sync_loop) = *self.sync_loop_handle.read().unwrap() {
178            sync_loop.trigger();
179        }
180    }
181
182    // =========================================================================
183    // Keys / codes
184    // =========================================================================
185
186    pub fn get_user_pubkey(&self) -> Result<Option<String>, String> {
187        self.key_service
188            .get_user_public_key()
189            .map(|opt| opt.map(hex::encode))
190            .map_err(|e| format!("Failed to read user public key: {e}"))
191    }
192
193    pub fn generate_restore_code(&self) -> Result<String, String> {
194        let config = (self.config_provider)();
195        crate::storage::cloud::setup::generate_restore_code(&config, &self.key_service)
196            .map_err(|e| e.to_string())
197    }
198
199    // =========================================================================
200    // Membership
201    // =========================================================================
202
203    pub async fn get_members(&self) -> Result<Vec<MemberInfo>, String> {
204        let config = (self.config_provider)();
205        if !config.sync_enabled(&self.key_service) {
206            return Ok(Vec::new());
207        }
208
209        let storage = crate::storage::cloud::setup::create_sync_storage(
210            &config,
211            &self.key_service,
212            &Some(self.encryption_service.clone()),
213            self.clock.clone(),
214        )
215        .await
216        .map_err(|e| format!("Failed to create storage client: {e}"))?;
217
218        let user_pubkey = self
219            .key_service
220            .get_user_public_key()
221            .map_err(|e| format!("Failed to read user public key: {e}"))?;
222        let members = crate::sync::membership_ops::get_members(
223            &storage,
224            user_pubkey.as_ref().map(|k| k.as_slice()),
225        )
226        .await
227        .map_err(|e| e.0)?;
228
229        Ok(members
230            .into_iter()
231            .map(|m| MemberInfo {
232                pubkey: m.pubkey,
233                role: m.role,
234                is_self: m.is_self,
235            })
236            .collect())
237    }
238
239    pub async fn invite_member(
240        &self,
241        public_key_hex: &str,
242        role: MemberRole,
243    ) -> Result<String, String> {
244        let sync_loop = self
245            .sync_loop_handle
246            .read()
247            .unwrap()
248            .clone()
249            .ok_or("Sync is not configured")?;
250
251        let encryption_key_hex = self
252            .key_service
253            .get_encryption_key()
254            .map_err(|e| format!("Failed to read encryption key: {e}"))?
255            .ok_or("Encryption key not configured")?;
256
257        let key_bytes: [u8; 32] = hex::decode(&encryption_key_hex)
258            .map_err(|e| format!("Invalid encryption key hex: {e}"))?
259            .try_into()
260            .map_err(|_| "Encryption key wrong length".to_string())?;
261
262        let (library_id, library_name) = {
263            let config = (self.config_provider)();
264            (config.library_id.clone(), config.library_name.clone())
265        };
266
267        let storage: &dyn SyncStorage = &**sync_loop.storage();
268        let cloud_home = sync_loop.storage().cloud_home();
269
270        let invite_code = crate::sync::membership_ops::invite_member(
271            storage,
272            cloud_home,
273            sync_loop.user_keypair(),
274            sync_loop.hlc(),
275            public_key_hex,
276            role,
277            &key_bytes,
278            &library_id,
279            &library_name,
280        )
281        .await
282        .map_err(|e| e.0)?;
283
284        Ok(crate::join_code::encode(&invite_code))
285    }
286
287    pub async fn remove_member(&self, public_key_hex: &str) -> Result<String, String> {
288        let sync_loop = self
289            .sync_loop_handle
290            .read()
291            .unwrap()
292            .clone()
293            .ok_or("Sync is not configured")?;
294
295        let storage: &dyn SyncStorage = &**sync_loop.storage();
296        let cloud_home = sync_loop.storage().cloud_home();
297
298        let new_key = crate::sync::membership_ops::remove_member(
299            storage,
300            cloud_home,
301            sync_loop.user_keypair(),
302            sync_loop.hlc(),
303            public_key_hex,
304        )
305        .await
306        .map_err(|e| e.0)?;
307
308        // Rotate the in-use key; the host records the returned fingerprint and
309        // that a key is stored in its own config.
310        let fingerprint = crate::sync::membership_ops::apply_key_rotation(
311            new_key,
312            &self.key_service,
313            sync_loop.encryption(),
314        )
315        .map_err(|e| e.0)?;
316
317        Ok(fingerprint)
318    }
319}