coven/sync/
session.rs

1/// Production session management for sync.
2///
3/// `SyncSession` wraps the low-level FFI `Session` and attaches the
4/// synced tables. It provides a clean start/changeset/end lifecycle.
5use std::sync::OnceLock;
6
7use super::session_ext::{Changeset, Session};
8
9/// The tables that participate in changeset sync, declared once at startup by
10/// the host via [`set_synced_tables`].
11static SYNCED_TABLES: OnceLock<Vec<String>> = OnceLock::new();
12
13/// Declare the tables that participate in changeset sync. Call once at startup,
14/// before any sync session is created.
15///
16/// Each table must have an `id` text primary key at column 0 and an
17/// `_updated_at TEXT NOT NULL` column (the HLC/LWW timestamp). Tables not listed
18/// here are local-only and never synced.
19pub fn set_synced_tables(tables: &[&str]) {
20    let _ = SYNCED_TABLES.set(tables.iter().map(|t| t.to_string()).collect());
21}
22
23/// The configured synced tables, or empty if [`set_synced_tables`] was never called.
24pub fn synced_tables() -> &'static [String] {
25    SYNCED_TABLES.get().map(Vec::as_slice).unwrap_or(&[])
26}
27
28/// A sync session that tracks changes to all synced tables on a single connection.
29///
30/// Lifecycle:
31/// 1. `SyncSession::start(db)` -- creates and attaches
32/// 2. App writes normally through the connection
33/// 3. `session.changeset()` -- grabs the binary diff (None if no changes)
34/// 4. Session is dropped (or explicitly ended by dropping)
35///
36/// The session must be dropped before applying incoming changesets to avoid
37/// contaminating the next outgoing changeset with other devices' changes.
38pub struct SyncSession {
39    session: Session,
40}
41
42impl SyncSession {
43    /// Create a new sync session on the given raw sqlite3 connection,
44    /// attaching all synced tables.
45    ///
46    /// # Safety
47    /// `db` must be a valid, open sqlite3 connection pointer. The session
48    /// must be dropped before the connection is closed.
49    pub unsafe fn start(db: *mut libsqlite3_sys::sqlite3) -> Result<Self, SyncError> {
50        let session = Session::new(db).map_err(SyncError::SessionCreate)?;
51
52        for table in synced_tables() {
53            session
54                .attach(Some(table.as_str()))
55                .map_err(|rc| SyncError::SessionAttach(table.clone(), rc))?;
56        }
57
58        Ok(SyncSession { session })
59    }
60
61    /// Grab the binary changeset of all changes since the session started.
62    /// Returns `None` if no changes were made (avoids pushing empty changesets).
63    pub fn changeset(&self) -> Result<Option<Changeset>, SyncError> {
64        let cs = self
65            .session
66            .changeset()
67            .map_err(SyncError::ChangesetExtract)?;
68
69        if cs.is_empty() {
70            Ok(None)
71        } else {
72            Ok(Some(cs))
73        }
74    }
75}
76
77#[derive(Debug)]
78pub enum SyncError {
79    /// Failed to create a session (sqlite3 error code).
80    SessionCreate(i32),
81    /// Failed to attach a table (table name, sqlite3 error code).
82    SessionAttach(String, i32),
83    /// Failed to extract a changeset (sqlite3 error code).
84    ChangesetExtract(i32),
85    /// Failed to apply a changeset (sqlite3 error code).
86    ChangesetApply(i32),
87}
88
89impl std::fmt::Display for SyncError {
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        match self {
92            SyncError::SessionCreate(rc) => write!(f, "session create failed (rc={rc})"),
93            SyncError::SessionAttach(table, rc) => {
94                write!(f, "session attach failed for {table} (rc={rc})")
95            }
96            SyncError::ChangesetExtract(rc) => write!(f, "changeset extract failed (rc={rc})"),
97            SyncError::ChangesetApply(rc) => write!(f, "changeset apply failed (rc={rc})"),
98        }
99    }
100}
101
102impl std::error::Error for SyncError {}