1use std::sync::Arc;
8use std::time::Duration;
9
10use tokio::sync::mpsc::error::TrySendError;
11use tracing::{debug, info, warn};
12
13use crate::blob::{BlobPlan, BlobUploadObserver};
14use crate::changeset::RowChange;
15use crate::clock::ClockRef;
16use crate::db::SyncBookkeeping;
17use crate::encryption::EncryptionService;
18use crate::keys::UserKeypair;
19use crate::library_dir::LibraryDir;
20
21use super::cycle::{SyncComponents, SyncCycleOutcome};
22use super::encrypted_storage::EncryptedSyncStorage;
23use super::hlc::Hlc;
24use super::session::SyncSession;
25use super::storage::SyncStorage;
26
/// Snapshot of the background sync loop's state, broadcast to subscribers after
/// every sync cycle.
#[derive(Debug, Clone)]
pub struct SyncLoopStatus {
    /// Whether sync is configured. The loop in this file always reports `true`.
    pub configured: bool,
    /// Whether a cycle is in progress. The loop in this file only emits statuses
    /// after a cycle has finished, so it always reports `false`.
    pub syncing: bool,
    /// Sync time reported by the last successful cycle; `None` when the cycle failed.
    pub last_sync_time: Option<String>,
    /// Error message of a failed cycle, or a retry notice when a successful cycle
    /// could not download some files. `None` when the cycle was fully clean.
    pub error: Option<String>,
    /// Number of other devices reported by the cycle plus one (this device);
    /// `0` when the cycle failed.
    pub device_count: u32,
    /// `true` when the cycle applied at least one changeset.
    pub data_changed: bool,
    /// Row-level changes from the cycle. Only `Some` when `data_changed` is `true`
    /// and the cycle reported a non-empty list.
    pub row_changes: Option<Vec<RowChange>>,
}
40
/// Owner-side handle for the background sync loop: starts the loop thread, wakes
/// it up on demand, and hands out status subscriptions.
pub struct SyncLoopHandle {
    /// State shared with the loop thread (cloned into it by `start`).
    inner: Arc<SyncLoopInner>,
    /// Sync bookkeeping store, passed to every sync cycle.
    db: Arc<dyn SyncBookkeeping>,
    /// Clock passed to every sync cycle.
    clock: ClockRef,
    /// Library directory passed to every sync cycle.
    library_dir: LibraryDir,
    /// Sending half of the wake-up channel used by `trigger`. The channel is
    /// created with capacity 1.
    trigger_tx: tokio::sync::mpsc::Sender<()>,
    /// Receiving half of the wake-up channel. Kept in an `Option` so `start` can
    /// move it into the loop thread exactly once.
    trigger_rx: std::sync::Mutex<Option<tokio::sync::mpsc::Receiver<()>>>,
    /// Broadcast channel (capacity 16) on which the loop publishes a
    /// `SyncLoopStatus` after each cycle.
    event_tx: tokio::sync::broadcast::Sender<SyncLoopStatus>,
    /// Join handle of the loop thread; `None` until `start` has spawned it.
    loop_handle: std::sync::Mutex<Option<std::thread::JoinHandle<()>>>,
}
52
/// State shared between `SyncLoopHandle` and the loop thread. Everything a sync
/// cycle needs besides the bookkeeping store, clock and library directory.
struct SyncLoopInner {
    /// Encrypted sync storage; also the source of the cloud home passed to each cycle.
    storage: Arc<EncryptedSyncStorage>,
    /// Hybrid logical clock handed to each cycle.
    hlc: Arc<Hlc>,
    /// Identifier of this device, handed to each cycle.
    device_id: String,
    /// Encryption service handed to each cycle.
    encryption: Arc<std::sync::RwLock<EncryptionService>>,
    /// Raw SQLite connection pointer. Passed to `SyncSession::start` and to the
    /// sync cycle, both of which are called from `unsafe` blocks.
    raw_db: *mut libsqlite3_sys::sqlite3,
    /// Current sync session. A cycle moves it out of the slot while it runs and
    /// stores the returned session back afterwards; the slot stays `None` when a
    /// cycle fails without returning a session.
    session: tokio::sync::Mutex<Option<SyncSession>>,
    /// Keypair of the local user, handed to each cycle.
    user_keypair: UserKeypair,
    /// Blob plan handed to each cycle.
    blob_plan: Arc<dyn BlobPlan>,
    /// Optional blob upload observer handed to each cycle.
    observer: Option<Arc<dyn BlobUploadObserver>>,
}
64
// SAFETY: `SyncLoopInner` is not automatically `Send`/`Sync`, at least because
// `raw_db` is a raw `*mut sqlite3` pointer. These impls assert that moving and
// sharing it across threads is sound.
// NOTE(review): this presumably relies on the SQLite connection being usable from
// the loop thread (e.g. opened in a thread-safe mode) and outliving every
// `Arc<SyncLoopInner>` — confirm where `SyncComponents::raw_db` is created.
unsafe impl Send for SyncLoopInner {}
unsafe impl Sync for SyncLoopInner {}
70
71impl SyncLoopHandle {
72 pub fn new(
73 components: SyncComponents,
74 db: Arc<dyn SyncBookkeeping>,
75 clock: ClockRef,
76 library_dir: LibraryDir,
77 blob_plan: Arc<dyn BlobPlan>,
78 observer: Option<Arc<dyn BlobUploadObserver>>,
79 ) -> Self {
80 let (trigger_tx, trigger_rx) = tokio::sync::mpsc::channel(1);
81 let (event_tx, _) = tokio::sync::broadcast::channel(16);
82
83 Self {
84 inner: Arc::new(SyncLoopInner {
85 storage: components.storage,
86 hlc: components.hlc,
87 device_id: components.device_id,
88 encryption: components.encryption,
89 raw_db: components.raw_db,
90 session: tokio::sync::Mutex::new(Some(components.session)),
91 user_keypair: components.user_keypair,
92 blob_plan,
93 observer,
94 }),
95 db,
96 clock,
97 library_dir,
98 trigger_tx,
99 trigger_rx: std::sync::Mutex::new(Some(trigger_rx)),
100 event_tx,
101 loop_handle: std::sync::Mutex::new(None),
102 }
103 }
104
105 pub fn start(&self) {
111 {
112 let guard = self.loop_handle.lock().unwrap();
113 if guard.is_some() {
114 return;
115 }
116 }
117
118 let mut trigger_rx = match self.trigger_rx.lock().unwrap().take() {
119 Some(rx) => rx,
120 None => {
121 warn!("Sync trigger receiver already taken");
122 return;
123 }
124 };
125
126 let inner = Arc::clone(&self.inner);
127 let event_tx = self.event_tx.clone();
128 let db = Arc::clone(&self.db);
129 let clock = self.clock.clone();
130 let library_dir = self.library_dir.clone();
131
132 let handle = std::thread::Builder::new()
133 .name("coven-sync-loop".to_string())
134 .spawn(move || {
135 let rt = match tokio::runtime::Builder::new_current_thread()
136 .enable_all()
137 .build()
138 {
139 Ok(rt) => rt,
140 Err(e) => {
141 warn!("Failed to create sync loop runtime: {e}");
142 return;
143 }
144 };
145
146 rt.block_on(async {
147 tokio::time::sleep(Duration::from_secs(3)).await;
149
150 loop {
151 match run_single_cycle(&inner, db.as_ref(), clock.as_ref(), &library_dir)
152 .await
153 {
154 Ok(result) => {
155 let error = if result.asset_downloads_failed {
156 Some("Some files failed to download, will retry".to_string())
157 } else {
158 None
159 };
160 let data_changed = result.changesets_applied > 0;
161 let row_changes = if data_changed && !result.row_changes.is_empty()
162 {
163 Some(result.row_changes)
164 } else {
165 None
166 };
167 let status = SyncLoopStatus {
168 configured: true,
169 syncing: false,
170 last_sync_time: Some(result.sync_time),
171 error,
172 device_count: (result.other_device_count + 1) as u32,
173 data_changed,
174 row_changes,
175 };
176 let _ = event_tx.send(status);
177 }
178 Err(e) => {
179 let status = SyncLoopStatus {
180 configured: true,
181 syncing: false,
182 last_sync_time: None,
183 error: Some(e),
184 device_count: 0,
185 data_changed: false,
186 row_changes: None,
187 };
188 let _ = event_tx.send(status);
189 }
190 }
191
192 tokio::select! {
193 _ = tokio::time::sleep(Duration::from_secs(30)) => {}
194 msg = trigger_rx.recv() => {
195 if msg.is_none() {
196 info!("Sync trigger channel closed, stopping sync loop");
197 break;
198 }
199 }
200 }
201 }
202 });
203 })
204 .expect("Failed to spawn sync loop thread");
205
206 *self.loop_handle.lock().unwrap() = Some(handle);
207 }
208
209 pub fn is_running(&self) -> bool {
211 self.loop_handle.lock().unwrap().is_some()
212 }
213
214 pub fn trigger(&self) {
220 match self.trigger_tx.try_send(()) {
221 Ok(()) | Err(TrySendError::Full(())) => {}
222 Err(TrySendError::Closed(())) => {
223 debug!("Sync trigger channel closed, loop is not running");
224 }
225 }
226 }
227
228 pub fn subscribe(&self) -> tokio::sync::broadcast::Receiver<SyncLoopStatus> {
230 self.event_tx.subscribe()
231 }
232
233 pub fn storage(&self) -> &Arc<EncryptedSyncStorage> {
236 &self.inner.storage
237 }
238
239 pub fn user_keypair(&self) -> &UserKeypair {
240 &self.inner.user_keypair
241 }
242
243 pub fn hlc(&self) -> &Arc<Hlc> {
244 &self.inner.hlc
245 }
246
247 pub fn encryption(&self) -> &Arc<std::sync::RwLock<EncryptionService>> {
248 &self.inner.encryption
249 }
250}
251
252async fn run_single_cycle(
254 inner: &SyncLoopInner,
255 db: &dyn SyncBookkeeping,
256 clock: &dyn crate::clock::Clock,
257 library_dir: &LibraryDir,
258) -> Result<super::cycle::SyncCycleResult, String> {
259 let storage: &dyn SyncStorage = &*inner.storage;
260
261 let session = match inner.session.lock().await.take() {
262 Some(s) => s,
263 None => {
264 warn!("Sync session was None, creating a new one");
265 unsafe { SyncSession::start(inner.raw_db) }
266 .map_err(|e| format!("Failed to create replacement sync session: {e}"))?
267 }
268 };
269
270 let cloud_home = inner.storage.cloud_home();
271
272 let outcome = unsafe {
273 super::cycle::run_single_sync_cycle(
274 storage,
275 &inner.device_id,
276 &inner.hlc,
277 clock,
278 inner.raw_db,
279 session,
280 &inner.encryption,
281 &inner.user_keypair,
282 db,
283 library_dir,
284 Some(cloud_home),
285 inner.blob_plan.as_ref(),
286 inner.observer.as_deref(),
287 )
288 .await
289 };
290
291 match outcome {
292 SyncCycleOutcome::Ok(result, new_session) => {
293 *inner.session.lock().await = Some(new_session);
294 Ok(result)
295 }
296 SyncCycleOutcome::ErrWithSession(e, new_session) => {
297 *inner.session.lock().await = Some(new_session);
298 Err(e)
299 }
300 SyncCycleOutcome::ErrNoSession(e) => Err(e),
301 }
302}