coven/sync/
pull.rs

//! Pull changesets from the sync storage and apply them to the local database.
//!
//! Before anything else, the pull is refused if the storage's minimum schema
//! version is newer than ours.
//!
//! Protocol:
//! 1. List heads from the sync storage (one S3 LIST call).
//! 2. Compare each device's seq to our local `sync_cursors` table.
//! 3. For each device that's ahead of our cursor, fetch new changesets.
//! 4. Unpack the envelope, check schema_version, verify the signature (and,
//!    if the library has a membership chain, that the author is a member),
//!    then apply with LWW.
//! 5. Update sync_cursors for that device.
//!
//! After all changesets are applied, any that had FK constraint violations
//! are retried once -- the parent rows should now exist from other devices'
//! changesets applied in the same batch.
13use std::collections::HashMap;
14
15use tracing::{debug, info, warn};
16
17use super::apply::apply_changeset_lww;
18use super::envelope::{self, verify_changeset_signature};
19use super::membership::MembershipChain;
20use super::push::SCHEMA_VERSION;
21use super::session_ext::Changeset;
22use super::storage::{DeviceHead, SyncStorage};
23use crate::blob::BlobPlan;
24use crate::changeset::RowChange;
25use crate::library_dir::LibraryDir;
26
27/// Cursor value meaning "we have applied no changesets from this device".
28/// Per the sync protocol, device sequence numbers start at 1 (the first
29/// changeset a device produces is `local_seq + 1` where `local_seq` is
30/// initially 0), so a cursor of 0 selects every changeset that device has
31/// ever produced. A missing entry in the `sync_cursors` table is equivalent
32/// to this initial value — the device is simply one we've never pulled from.
33const INITIAL_CURSOR: u64 = 0;
34
35/// Look up our applied seq for a remote device, returning the protocol's
36/// initial cursor (0) and logging when we encounter the device for the first
37/// time. The log line is the visible trace that distinguishes "never seen"
38/// from "seen and at seq 0" (which is impossible — device seqs start at 1).
39fn cursor_for_device(cursors: &HashMap<String, u64>, device_id: &str) -> u64 {
40    match cursors.get(device_id) {
41        Some(seq) => *seq,
42        None => {
43            debug!(%device_id, "no cursor for device, starting from initial");
44            INITIAL_CURSOR
45        }
46    }
47}
48
49/// Summary of a pull operation.
50#[derive(Debug)]
51pub struct PullResult {
52    /// Total changesets successfully applied.
53    pub changesets_applied: u64,
54    /// Number of distinct remote devices we pulled from.
55    pub devices_pulled: u64,
56    /// Asset downloads failed — cursor not advanced, will retry next cycle.
57    pub asset_downloads_failed: bool,
58    /// Changesets skipped due to schema version being newer than ours.
59    pub skipped_schema: u64,
60    /// All device heads fetched during this pull (including our own).
61    /// Used by the sync status UI to show other devices' activity.
62    pub remote_heads: Vec<DeviceHead>,
63    /// Row changes from all applied changesets, for the host to map to domain
64    /// events. Empty if nothing was applied.
65    pub row_changes: Vec<RowChange>,
66}
67
68/// A changeset that had FK violations on first apply and needs retry.
69struct DeferredChangeset {
70    device_id: String,
71    seq: u64,
72    changeset: Changeset,
73}
74
75/// Pull and apply all new changesets from the sync storage.
76///
77/// `db` is a raw sqlite3 connection pointer. The caller MUST ensure no
78/// SyncSession is active -- the protocol requires ending the session before
79/// pulling to avoid contaminating the next outgoing changeset.
80///
81/// `cursors` maps device_id -> last_seq we've applied from that device.
82///
83/// Returns the updated cursors map and a summary of what was applied.
84///
85/// # Safety
86/// `db` must be a valid, open sqlite3 connection pointer.
87pub async unsafe fn pull_changes(
88    db: *mut libsqlite3_sys::sqlite3,
89    storage: &dyn SyncStorage,
90    our_device_id: &str,
91    cursors: &HashMap<String, u64>,
92    library_dir: &LibraryDir,
93    blob_plan: &dyn BlobPlan,
94) -> Result<(HashMap<String, u64>, PullResult), PullError> {
95    let _ = library_dir;
96    // Check min_schema_version before processing any changesets.
97    // If the storage has a minimum that's higher than ours, refuse to sync.
98    if let Some(min_version) = storage
99        .get_min_schema_version()
100        .await
101        .map_err(PullError::Storage)?
102    {
103        if SCHEMA_VERSION < min_version {
104            return Err(PullError::SchemaVersionTooOld {
105                local_version: SCHEMA_VERSION,
106                min_version,
107            });
108        }
109    }
110
111    let heads = storage.list_heads().await.map_err(PullError::Storage)?;
112
113    // Load the current membership chain (if any) to validate changeset
114    // authorship. A solo library has no chain, so this stays None and the
115    // validation below is skipped.
116    let membership_chain: Option<MembershipChain> = match storage.list_membership_entries().await {
117        Ok(entries) if !entries.is_empty() => {
118            match super::membership_ops::download_chain(storage, &entries).await {
119                Ok(chain) => Some(chain),
120                Err(e) => {
121                    warn!("failed to load membership chain for validation: {e}");
122                    None
123                }
124            }
125        }
126        Ok(_) => None,
127        Err(e) => {
128            warn!("failed to list membership entries for validation: {e}");
129            None
130        }
131    };
132
133    let mut updated_cursors = cursors.clone();
134    let mut result = PullResult {
135        changesets_applied: 0,
136        devices_pulled: 0,
137        asset_downloads_failed: false,
138        skipped_schema: 0,
139        remote_heads: heads.clone(),
140        row_changes: Vec::new(),
141    };
142    let mut deferred: Vec<DeferredChangeset> = Vec::new();
143
144    for head in &heads {
145        // Skip our own device.
146        if head.device_id == our_device_id {
147            continue;
148        }
149
150        let local_seq = cursor_for_device(cursors, &head.device_id);
151        if head.seq <= local_seq {
152            continue;
153        }
154
155        info!(
156            device_id = %head.device_id,
157            local_seq,
158            remote_seq = head.seq,
159            "pulling changesets"
160        );
161
162        let mut pulled_any = false;
163
164        for seq in (local_seq + 1)..=head.seq {
165            // The storage client returns already-decrypted bytes per its trait
166            // contract. Implementations handle download + decryption internally.
167            let envelope_bytes = match storage.get_changeset(&head.device_id, seq).await {
168                Ok(data) => data,
169                Err(e) => {
170                    warn!(
171                        device_id = %head.device_id,
172                        seq,
173                        error = %e,
174                        "failed to fetch changeset, stopping pull for this device"
175                    );
176                    break;
177                }
178            };
179
180            let (env, changeset_bytes) =
181                envelope::unpack(&envelope_bytes).map_err(PullError::InvalidEnvelope)?;
182
183            // Validate that changeset_size in the envelope matches the actual
184            // bytes. A mismatch indicates corruption or a buggy encoder.
185            if env.changeset_size != changeset_bytes.len() {
186                warn!(
187                    device_id = %head.device_id,
188                    seq,
189                    expected = env.changeset_size,
190                    actual = changeset_bytes.len(),
191                    "changeset_size mismatch in envelope"
192                );
193            }
194
195            // Schema version check: skip changesets from a newer schema.
196            if env.schema_version > SCHEMA_VERSION {
197                warn!(
198                    device_id = %head.device_id,
199                    seq,
200                    remote_version = env.schema_version,
201                    local_version = SCHEMA_VERSION,
202                    "skipping changeset with newer schema version"
203                );
204                result.skipped_schema += 1;
205                // Advance cursor past this seq so we don't re-fetch it.
206                // When we upgrade, a new snapshot will reconcile.
207                updated_cursors.insert(head.device_id.clone(), seq);
208                continue;
209            }
210
211            // Signature check: reject changesets with invalid signatures.
212            if !verify_changeset_signature(&env, &changeset_bytes) {
213                warn!(
214                    device_id = %head.device_id,
215                    seq,
216                    "changeset has invalid signature, skipping"
217                );
218                updated_cursors.insert(head.device_id.clone(), seq);
219                continue;
220            }
221
222            // Membership validation: in a chain-enabled library every changeset
223            // must be signed (its signature is verified above) by a member who
224            // was authorized at the signature-bound timestamp. Coven always
225            // signs at creation, so an unsigned or non-member changeset here is
226            // forged -- reject it.
227            if let Some(chain) = membership_chain.as_ref() {
228                let authorized = env
229                    .author_pubkey
230                    .as_ref()
231                    .is_some_and(|pk| chain.is_member_at(pk, &env.timestamp));
232                if !authorized {
233                    warn!(
234                        device_id = %head.device_id,
235                        seq,
236                        author = ?env.author_pubkey,
237                        "changeset not signed by a current member, skipping"
238                    );
239                    updated_cursors.insert(head.device_id.clone(), seq);
240                    continue;
241                }
242            }
243
244            if changeset_bytes.is_empty() {
245                updated_cursors.insert(head.device_id.clone(), seq);
246                continue;
247            }
248
249            let cs = Changeset::from_bytes(&changeset_bytes);
250            let apply_result = apply_changeset_lww(db, &cs).map_err(PullError::Apply)?;
251
252            // Walk the applied changeset once: it drives blob downloads and is
253            // surfaced to the host for domain-event mapping.
254            let changes = match crate::changeset::walk(&changeset_bytes) {
255                Ok(c) => c,
256                Err(e) => {
257                    warn!("Failed to walk changeset: {e}");
258                    Vec::new()
259                }
260            };
261
262            // Download any blobs the changeset references. If any download fails,
263            // don't advance the cursor — retry next cycle.
264            let blobs_ok = download_changeset_blobs(&changes, blob_plan, storage).await;
265
266            if apply_result.had_fk_violations {
267                deferred.push(DeferredChangeset {
268                    device_id: head.device_id.clone(),
269                    seq,
270                    changeset: Changeset::from_bytes(&changeset_bytes),
271                });
272            }
273
274            result.changesets_applied += 1;
275            result.row_changes.extend(changes);
276
277            pulled_any = true;
278            if blobs_ok {
279                updated_cursors.insert(head.device_id.clone(), seq);
280            } else {
281                warn!(
282                    "Blob download failed for {}/{}, cursor not advanced",
283                    head.device_id, seq
284                );
285                result.asset_downloads_failed = true;
286            }
287        }
288
289        if pulled_any {
290            result.devices_pulled += 1;
291        }
292    }
293
294    // Retry changesets that had FK constraint violations. After applying all
295    // changesets from all devices, the parent rows should now exist.
296    if !deferred.is_empty() {
297        info!(
298            count = deferred.len(),
299            "retrying changesets with FK violations"
300        );
301
302        for d in &deferred {
303            let retry_result = apply_changeset_lww(db, &d.changeset).map_err(PullError::Apply)?;
304
305            if retry_result.had_fk_violations {
306                warn!(
307                    device_id = %d.device_id,
308                    seq = d.seq,
309                    "changeset still has FK violations after retry, skipping"
310                );
311            }
312        }
313    }
314
315    Ok((updated_cursors, result))
316}
317
318/// Download blobs a changeset references. Returns true if all succeeded.
319/// Skips blobs whose local file already exists. The host's [`BlobPlan`] decides
320/// which row-changes carry blobs, their cloud namespace/scope, and the local
321/// destination path.
322async fn download_changeset_blobs(
323    changes: &[RowChange],
324    blob_plan: &dyn BlobPlan,
325    storage: &dyn SyncStorage,
326) -> bool {
327    let mut all_ok = true;
328    for blob in blob_plan.blobs_to_pull(changes) {
329        if blob.local_path.exists() {
330            continue;
331        }
332
333        match storage
334            .get_blob(&blob.namespace, &blob.id, blob.scope.clone())
335            .await
336        {
337            Ok(bytes) => {
338                if let Some(parent) = blob.local_path.parent() {
339                    if let Err(e) = std::fs::create_dir_all(parent) {
340                        warn!(id = %blob.id, error = %e, "failed to create blob directory");
341                        all_ok = false;
342                        continue;
343                    }
344                }
345
346                if let Err(e) = std::fs::write(&blob.local_path, bytes) {
347                    warn!(id = %blob.id, error = %e, "failed to write blob");
348                    all_ok = false;
349                }
350            }
351            Err(e) => {
352                warn!(id = %blob.id, namespace = %blob.namespace, error = %e, "failed to download blob");
353                all_ok = false;
354            }
355        }
356    }
357    all_ok
358}
359
360#[derive(Debug)]
361pub enum PullError {
362    Storage(super::storage::StorageError),
363    InvalidEnvelope(super::envelope::UnpackError),
364    Apply(super::session::SyncError),
365    /// The sync storage requires a schema version newer than ours.
366    /// The client must upgrade before syncing.
367    SchemaVersionTooOld {
368        local_version: u32,
369        min_version: u32,
370    },
371}
372
373impl std::fmt::Display for PullError {
374    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
375        match self {
376            PullError::Storage(e) => write!(f, "storage error: {e}"),
377            PullError::InvalidEnvelope(e) => write!(f, "invalid changeset envelope: {e}"),
378            PullError::Apply(e) => write!(f, "changeset apply failed: {e}"),
379            PullError::SchemaVersionTooOld {
380                local_version,
381                min_version,
382            } => write!(
383                f,
384                "local schema version {local_version} is below the storage minimum {min_version}, upgrade required"
385            ),
386        }
387    }
388}
389
390impl std::error::Error for PullError {}