coven/sync/
sync_loop.rs

1//! Sync loop handle: manages the background sync loop thread.
2//!
3//! Owns the sync infrastructure (storage client, HLC, session, etc.) and
4//! spawns a dedicated OS thread that runs sync cycles on a timer or manual
5//! trigger. Emits `SyncLoopStatus` events through a broadcast channel.
6
7use std::sync::Arc;
8use std::time::Duration;
9
10use tokio::sync::mpsc::error::TrySendError;
11use tracing::{debug, info, warn};
12
13use crate::blob::{BlobPlan, BlobUploadObserver};
14use crate::changeset::RowChange;
15use crate::clock::ClockRef;
16use crate::db::SyncBookkeeping;
17use crate::encryption::EncryptionService;
18use crate::keys::UserKeypair;
19use crate::library_dir::LibraryDir;
20
21use super::cycle::{SyncComponents, SyncCycleOutcome};
22use super::encrypted_storage::EncryptedSyncStorage;
23use super::hlc::Hlc;
24use super::session::SyncSession;
25use super::storage::SyncStorage;
26
/// Status emitted by the sync loop after each cycle.
///
/// One value is broadcast per finished cycle, whether the cycle succeeded
/// or failed.
#[derive(Debug, Clone)]
pub struct SyncLoopStatus {
    /// Whether sync is configured. Every status built by the loop in this
    /// file sets this to `true`.
    pub configured: bool,
    /// Whether a cycle is in progress. Every status built by the loop in
    /// this file sets this to `false`, because statuses are only sent once
    /// a cycle has ended.
    pub syncing: bool,
    /// The `sync_time` reported by a successful cycle; `None` when the cycle
    /// failed. The string format is decided by the cycle code, not here.
    pub last_sync_time: Option<String>,
    /// For a failed cycle, the cycle's error message. For a successful
    /// cycle, set only when some asset downloads failed.
    pub error: Option<String>,
    /// For a successful cycle, the number of other devices reported by the
    /// cycle plus one (presumably counting this device — confirm against the
    /// cycle code). `0` when the cycle failed.
    pub device_count: u32,
    /// `true` when the cycle applied at least one changeset.
    pub data_changed: bool,
    /// Row changes from applied changesets, for the host to map to domain
    /// events. Present only when `data_changed` is true and the cycle
    /// reported at least one row change; `None` otherwise.
    pub row_changes: Option<Vec<RowChange>>,
}
40
/// Manages the background sync loop and provides access to sync components.
pub struct SyncLoopHandle {
    /// State shared (via `Arc`) with the sync thread once it is started.
    inner: Arc<SyncLoopInner>,
    /// Bookkeeping store handed to every sync cycle.
    db: Arc<dyn SyncBookkeeping>,
    /// Clock handed to every sync cycle.
    clock: ClockRef,
    /// Library directory handed to every sync cycle.
    library_dir: LibraryDir,
    /// Sending half of the capacity-1 trigger channel used by `trigger`.
    trigger_tx: tokio::sync::mpsc::Sender<()>,
    /// Receiving half of the trigger channel. Held here until `start` takes
    /// it and moves it into the sync thread; `None` afterwards.
    trigger_rx: std::sync::Mutex<Option<tokio::sync::mpsc::Receiver<()>>>,
    /// Broadcast sender for status events; `subscribe` creates receivers.
    event_tx: tokio::sync::broadcast::Sender<SyncLoopStatus>,
    /// Join handle of the sync thread; `None` until `start` has spawned it.
    /// It is never joined or cleared anywhere in this file.
    loop_handle: std::sync::Mutex<Option<std::thread::JoinHandle<()>>>,
}
52
/// Sync infrastructure shared between the handle and the sync thread.
struct SyncLoopInner {
    /// Encrypted storage client; also the source of the cloud home passed
    /// to each cycle.
    storage: Arc<EncryptedSyncStorage>,
    /// HLC instance handed to every sync cycle.
    hlc: Arc<Hlc>,
    /// Identifier of this device, handed to every sync cycle.
    device_id: String,
    /// Encryption service handed to every sync cycle.
    encryption: Arc<std::sync::RwLock<EncryptionService>>,
    /// Raw sqlite3 connection pointer. Used to start a replacement session
    /// and passed to every sync cycle; this struct does not own or close it.
    raw_db: *mut libsqlite3_sys::sqlite3,
    /// Current sync session. Taken out for the duration of a cycle and put
    /// back when the cycle returns one; stays `None` after a cycle that
    /// failed without handing a session back.
    session: tokio::sync::Mutex<Option<SyncSession>>,
    /// User keypair handed to every sync cycle.
    user_keypair: UserKeypair,
    /// Blob plan handed to every sync cycle.
    blob_plan: Arc<dyn BlobPlan>,
    /// Optional blob upload observer handed to every sync cycle.
    observer: Option<Arc<dyn BlobUploadObserver>>,
}
64
// `SyncLoopInner` is not automatically `Send`/`Sync` because of its raw
// `*mut sqlite3` field, yet it is shared through an `Arc` with the thread
// spawned by `SyncLoopHandle::start`, so both impls are asserted manually.
//
// SAFETY: The raw sqlite3 pointer is only used for session extension operations
// which are serialized through the sync loop. The pointer itself is stable
// (heap-allocated write connection held by the host).
//
// NOTE(review): nothing in this file enforces that the host keeps the
// connection open for as long as the sync thread is alive — confirm that
// invariant at the call site.
unsafe impl Send for SyncLoopInner {}
unsafe impl Sync for SyncLoopInner {}
70
71impl SyncLoopHandle {
72    pub fn new(
73        components: SyncComponents,
74        db: Arc<dyn SyncBookkeeping>,
75        clock: ClockRef,
76        library_dir: LibraryDir,
77        blob_plan: Arc<dyn BlobPlan>,
78        observer: Option<Arc<dyn BlobUploadObserver>>,
79    ) -> Self {
80        let (trigger_tx, trigger_rx) = tokio::sync::mpsc::channel(1);
81        let (event_tx, _) = tokio::sync::broadcast::channel(16);
82
83        Self {
84            inner: Arc::new(SyncLoopInner {
85                storage: components.storage,
86                hlc: components.hlc,
87                device_id: components.device_id,
88                encryption: components.encryption,
89                raw_db: components.raw_db,
90                session: tokio::sync::Mutex::new(Some(components.session)),
91                user_keypair: components.user_keypair,
92                blob_plan,
93                observer,
94            }),
95            db,
96            clock,
97            library_dir,
98            trigger_tx,
99            trigger_rx: std::sync::Mutex::new(Some(trigger_rx)),
100            event_tx,
101            loop_handle: std::sync::Mutex::new(None),
102        }
103    }
104
105    /// Start the background sync loop. No-op if already running.
106    ///
107    /// Spawns a dedicated OS thread with its own tokio runtime because
108    /// the sync session holds a raw sqlite3 pointer (not Send across
109    /// tokio task boundaries).
110    pub fn start(&self) {
111        {
112            let guard = self.loop_handle.lock().unwrap();
113            if guard.is_some() {
114                return;
115            }
116        }
117
118        let mut trigger_rx = match self.trigger_rx.lock().unwrap().take() {
119            Some(rx) => rx,
120            None => {
121                warn!("Sync trigger receiver already taken");
122                return;
123            }
124        };
125
126        let inner = Arc::clone(&self.inner);
127        let event_tx = self.event_tx.clone();
128        let db = Arc::clone(&self.db);
129        let clock = self.clock.clone();
130        let library_dir = self.library_dir.clone();
131
132        let handle = std::thread::Builder::new()
133            .name("coven-sync-loop".to_string())
134            .spawn(move || {
135                let rt = match tokio::runtime::Builder::new_current_thread()
136                    .enable_all()
137                    .build()
138                {
139                    Ok(rt) => rt,
140                    Err(e) => {
141                        warn!("Failed to create sync loop runtime: {e}");
142                        return;
143                    }
144                };
145
146                rt.block_on(async {
147                    // Short delay to avoid racing with app startup
148                    tokio::time::sleep(Duration::from_secs(3)).await;
149
150                    loop {
151                        match run_single_cycle(&inner, db.as_ref(), clock.as_ref(), &library_dir)
152                            .await
153                        {
154                            Ok(result) => {
155                                let error = if result.asset_downloads_failed {
156                                    Some("Some files failed to download, will retry".to_string())
157                                } else {
158                                    None
159                                };
160                                let data_changed = result.changesets_applied > 0;
161                                let row_changes = if data_changed && !result.row_changes.is_empty()
162                                {
163                                    Some(result.row_changes)
164                                } else {
165                                    None
166                                };
167                                let status = SyncLoopStatus {
168                                    configured: true,
169                                    syncing: false,
170                                    last_sync_time: Some(result.sync_time),
171                                    error,
172                                    device_count: (result.other_device_count + 1) as u32,
173                                    data_changed,
174                                    row_changes,
175                                };
176                                let _ = event_tx.send(status);
177                            }
178                            Err(e) => {
179                                let status = SyncLoopStatus {
180                                    configured: true,
181                                    syncing: false,
182                                    last_sync_time: None,
183                                    error: Some(e),
184                                    device_count: 0,
185                                    data_changed: false,
186                                    row_changes: None,
187                                };
188                                let _ = event_tx.send(status);
189                            }
190                        }
191
192                        tokio::select! {
193                            _ = tokio::time::sleep(Duration::from_secs(30)) => {}
194                            msg = trigger_rx.recv() => {
195                                if msg.is_none() {
196                                    info!("Sync trigger channel closed, stopping sync loop");
197                                    break;
198                                }
199                            }
200                        }
201                    }
202                });
203            })
204            .expect("Failed to spawn sync loop thread");
205
206        *self.loop_handle.lock().unwrap() = Some(handle);
207    }
208
209    /// Whether the background sync thread is running.
210    pub fn is_running(&self) -> bool {
211        self.loop_handle.lock().unwrap().is_some()
212    }
213
214    /// Signal the sync loop to run a cycle immediately.
215    ///
216    /// `Full` means a trigger is already pending — our request collapses
217    /// into the existing one, which is exactly what the capacity-1 channel
218    /// is for. `Closed` means the loop is gone, so the trigger is moot.
219    pub fn trigger(&self) {
220        match self.trigger_tx.try_send(()) {
221            Ok(()) | Err(TrySendError::Full(())) => {}
222            Err(TrySendError::Closed(())) => {
223                debug!("Sync trigger channel closed, loop is not running");
224            }
225        }
226    }
227
228    /// Subscribe to sync loop status events.
229    pub fn subscribe(&self) -> tokio::sync::broadcast::Receiver<SyncLoopStatus> {
230        self.event_tx.subscribe()
231    }
232
233    // -- Accessors for membership operations --
234
235    pub fn storage(&self) -> &Arc<EncryptedSyncStorage> {
236        &self.inner.storage
237    }
238
239    pub fn user_keypair(&self) -> &UserKeypair {
240        &self.inner.user_keypair
241    }
242
243    pub fn hlc(&self) -> &Arc<Hlc> {
244        &self.inner.hlc
245    }
246
247    pub fn encryption(&self) -> &Arc<std::sync::RwLock<EncryptionService>> {
248        &self.inner.encryption
249    }
250}
251
252/// Run a single sync cycle, managing session lifecycle.
253async fn run_single_cycle(
254    inner: &SyncLoopInner,
255    db: &dyn SyncBookkeeping,
256    clock: &dyn crate::clock::Clock,
257    library_dir: &LibraryDir,
258) -> Result<super::cycle::SyncCycleResult, String> {
259    let storage: &dyn SyncStorage = &*inner.storage;
260
261    let session = match inner.session.lock().await.take() {
262        Some(s) => s,
263        None => {
264            warn!("Sync session was None, creating a new one");
265            unsafe { SyncSession::start(inner.raw_db) }
266                .map_err(|e| format!("Failed to create replacement sync session: {e}"))?
267        }
268    };
269
270    let cloud_home = inner.storage.cloud_home();
271
272    let outcome = unsafe {
273        super::cycle::run_single_sync_cycle(
274            storage,
275            &inner.device_id,
276            &inner.hlc,
277            clock,
278            inner.raw_db,
279            session,
280            &inner.encryption,
281            &inner.user_keypair,
282            db,
283            library_dir,
284            Some(cloud_home),
285            inner.blob_plan.as_ref(),
286            inner.observer.as_deref(),
287        )
288        .await
289    };
290
291    match outcome {
292        SyncCycleOutcome::Ok(result, new_session) => {
293            *inner.session.lock().await = Some(new_session);
294            Ok(result)
295        }
296        SyncCycleOutcome::ErrWithSession(e, new_session) => {
297            *inner.session.lock().await = Some(new_session);
298            Err(e)
299        }
300        SyncCycleOutcome::ErrNoSession(e) => Err(e),
301    }
302}