// coven/sync/service.rs

//! Full sync orchestrator: push local changes, pull remote changes.
//!
//! Protocol:
//! 1. Grab changeset from the current session.
//! 2. End the session (so incoming applies don't contaminate outgoing).
//! 3. Push our changeset to S3 (handled by push module, stubbed here).
//! 4. Pull incoming changesets (NO session active -- critical).
//! 5. Apply incoming with conflict handler.
//! 6. Start a new session for the next round.
//!
//! The SyncService holds the configuration for a sync cycle but does NOT own
//! the session or the raw sqlite3 handle. Those are passed in by the caller
//! because session lifetime is tied to the write connection lock.
use std::collections::HashMap;
use std::io::ErrorKind;

use tracing::{info, warn};

use crate::blob::BlobPlan;
use crate::keys::UserKeypair;
use crate::library_dir::LibraryDir;

use super::envelope::{self, sign_envelope, ChangesetEnvelope};
use super::pull::{self, PullResult};
use super::push::{OutgoingChangeset, SCHEMA_VERSION};
use super::session::SyncSession;
use super::storage::SyncStorage;

/// Configuration for a sync service.
///
/// Holds only the identity used during a sync cycle; the session and the raw
/// sqlite3 handle are supplied per call to [`SyncService::sync`].
#[derive(Debug, Clone)]
pub struct SyncService {
    /// Identifier of this device. It is written into every outgoing envelope
    /// and handed to the pull step.
    pub device_id: String,
}

33/// Everything the caller needs after a sync cycle.
34pub struct SyncResult {
35    /// The outgoing changeset bytes (if any local changes existed).
36    /// The caller is responsible for pushing this to the storage.
37    pub outgoing: Option<OutgoingChangeset>,
38    /// Pull results (how many incoming changesets were applied).
39    pub pull: PullResult,
40    /// Updated cursor map (caller should persist to sync_cursors table).
41    pub updated_cursors: HashMap<String, u64>,
42}
43
44impl SyncService {
45    pub fn new(device_id: String) -> Self {
46        SyncService { device_id }
47    }
48
49    /// Run a full sync cycle.
50    ///
51    /// This takes the current session, grabs its changeset, drops the session,
52    /// pulls remote changes, and returns what the caller needs to push and
53    /// to start a new session.
54    ///
55    /// The `message` parameter is a human-readable description of what changed
56    /// (e.g., "Imported Album One"). Callers derive this from the app event
57    /// that triggered the sync.
58    ///
59    /// The caller should:
60    /// 1. Push `outgoing` to the storage (if Some).
61    /// 2. Persist `updated_cursors` to the sync_cursors table.
62    /// 3. Start a new SyncSession on the write connection.
63    ///
64    /// # Safety
65    /// `db` must be a valid, open sqlite3 connection pointer.
66    /// The session must have been created on this same connection.
67    pub async unsafe fn sync(
68        &self,
69        db: *mut libsqlite3_sys::sqlite3,
70        session: SyncSession,
71        local_seq: u64,
72        cursors: &HashMap<String, u64>,
73        storage: &dyn SyncStorage,
74        timestamp: &str,
75        message: &str,
76        keypair: &UserKeypair,
77        library_dir: &LibraryDir,
78        blob_plan: &dyn BlobPlan,
79    ) -> Result<SyncResult, SyncCycleError> {
80        let _ = library_dir;
81
82        // Step 1: grab outgoing changeset from the session.
83        let outgoing_cs = session.changeset().map_err(SyncCycleError::Session)?;
84
85        // Step 2: end the session (drop it).
86        drop(session);
87
88        // Step 3: upload blobs the outgoing changeset references, before the
89        // envelope, so pullers can fetch them as soon as they see the change.
90        if let Some(ref cs) = outgoing_cs {
91            let changes =
92                crate::changeset::walk(cs.as_bytes()).map_err(SyncCycleError::AssetScan)?;
93            for blob in blob_plan.blobs_to_push(&changes) {
94                if !blob.local_path.exists() {
95                    warn!(id = %blob.id, "blob file not found locally, skipping upload");
96                    continue;
97                }
98                let bytes = std::fs::read(&blob.local_path)
99                    .map_err(|e| SyncCycleError::AssetUpload(e.to_string()))?;
100                storage
101                    .put_blob(&blob.namespace, &blob.id, blob.scope.clone(), bytes)
102                    .await
103                    .map_err(|e| SyncCycleError::AssetUpload(e.to_string()))?;
104                info!(id = %blob.id, namespace = %blob.namespace, "uploaded blob");
105            }
106        }
107
108        let outgoing = outgoing_cs.map(|cs| {
109            let next_seq = local_seq + 1;
110            let mut env = ChangesetEnvelope {
111                device_id: self.device_id.clone(),
112                seq: next_seq,
113                schema_version: SCHEMA_VERSION,
114                message: message.to_string(),
115                timestamp: timestamp.to_string(),
116                changeset_size: cs.len(),
117                author_pubkey: None,
118                signature: None,
119            };
120            sign_envelope(&mut env, keypair, cs.as_bytes());
121            let packed = envelope::pack(&env, cs.as_bytes());
122            OutgoingChangeset {
123                packed,
124                seq: next_seq,
125            }
126        });
127
128        // Step 4 + 5: pull incoming changesets (no session active).
129        let (updated_cursors, pull_result) = pull::pull_changes(
130            db,
131            storage,
132            &self.device_id,
133            cursors,
134            library_dir,
135            blob_plan,
136        )
137        .await
138        .map_err(SyncCycleError::Pull)?;
139
140        if pull_result.changesets_applied > 0 {
141            info!(
142                applied = pull_result.changesets_applied,
143                devices = pull_result.devices_pulled,
144                "pull complete"
145            );
146        }
147
148        // Step 6: the caller starts a new session after this returns.
149
150        Ok(SyncResult {
151            outgoing,
152            pull: pull_result,
153            updated_cursors,
154        })
155    }
156}
157
158#[derive(Debug)]
159pub enum SyncCycleError {
160    Session(super::session::SyncError),
161    Pull(pull::PullError),
162    AssetScan(String),
163    AssetUpload(String),
164}
165
166impl std::fmt::Display for SyncCycleError {
167    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168        match self {
169            SyncCycleError::Session(e) => write!(f, "session error: {e}"),
170            SyncCycleError::Pull(e) => write!(f, "pull error: {e}"),
171            SyncCycleError::AssetScan(e) => write!(f, "asset scan error: {e}"),
172            SyncCycleError::AssetUpload(e) => write!(f, "asset upload error: {e}"),
173        }
174    }
175}
176
177impl std::error::Error for SyncCycleError {}