coven/sync/session.rs
//! Production session management for sync.
//!
//! `SyncSession` wraps the low-level FFI `Session` and attaches the
//! synced tables. It provides a clean start/changeset/drop lifecycle
//! (a session ends when it is dropped).
5use std::sync::OnceLock;
6
7use super::session_ext::{Changeset, Session};
8
/// Registry of the table names that participate in changeset sync.
///
/// Filled in at most once, by the host at startup, through
/// [`set_synced_tables`]; read back through [`synced_tables`].
static SYNCED_TABLES: OnceLock<Vec<String>> = OnceLock::new();

/// Declare which tables participate in changeset sync.
///
/// Call this once at startup, before any sync session is created. Only the
/// first call has an effect: once a table list has been stored, later calls
/// leave it untouched.
///
/// Each table must have an `id` text primary key at column 0 and an
/// `_updated_at TEXT NOT NULL` column (the HLC/LWW timestamp). Tables not
/// listed here are local-only and never synced.
pub fn set_synced_tables(tables: &[&str]) {
    let declared: Vec<String> = tables.iter().map(|&name| name.to_owned()).collect();

    // `OnceLock::set` reports an error when a value is already stored. That
    // result is discarded on purpose so the first declaration stays in effect.
    let _ = SYNCED_TABLES.set(declared);
}

/// The table names stored by [`set_synced_tables`], in declaration order.
///
/// Returns an empty slice if [`set_synced_tables`] was never called.
pub fn synced_tables() -> &'static [String] {
    match SYNCED_TABLES.get() {
        Some(tables) => tables.as_slice(),
        None => &[],
    }
}
27
28/// A sync session that tracks changes to all synced tables on a single connection.
29///
30/// Lifecycle:
31/// 1. `SyncSession::start(db)` -- creates and attaches
32/// 2. App writes normally through the connection
33/// 3. `session.changeset()` -- grabs the binary diff (None if no changes)
34/// 4. Session is dropped (or explicitly ended by dropping)
35///
36/// The session must be dropped before applying incoming changesets to avoid
37/// contaminating the next outgoing changeset with other devices' changes.
38pub struct SyncSession {
39 session: Session,
40}
41
42impl SyncSession {
43 /// Create a new sync session on the given raw sqlite3 connection,
44 /// attaching all synced tables.
45 ///
46 /// # Safety
47 /// `db` must be a valid, open sqlite3 connection pointer. The session
48 /// must be dropped before the connection is closed.
49 pub unsafe fn start(db: *mut libsqlite3_sys::sqlite3) -> Result<Self, SyncError> {
50 let session = Session::new(db).map_err(SyncError::SessionCreate)?;
51
52 for table in synced_tables() {
53 session
54 .attach(Some(table.as_str()))
55 .map_err(|rc| SyncError::SessionAttach(table.clone(), rc))?;
56 }
57
58 Ok(SyncSession { session })
59 }
60
61 /// Grab the binary changeset of all changes since the session started.
62 /// Returns `None` if no changes were made (avoids pushing empty changesets).
63 pub fn changeset(&self) -> Result<Option<Changeset>, SyncError> {
64 let cs = self
65 .session
66 .changeset()
67 .map_err(SyncError::ChangesetExtract)?;
68
69 if cs.is_empty() {
70 Ok(None)
71 } else {
72 Ok(Some(cs))
73 }
74 }
75}
76
/// Errors produced by the sync session layer.
///
/// Every variant carries the sqlite3 result code of the failed call.
/// The type is `Clone` and comparable (`PartialEq`/`Eq`) so callers and tests
/// can match on, store, and assert against specific failures.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SyncError {
    /// Failed to create a session (sqlite3 error code).
    SessionCreate(i32),
    /// Failed to attach a table (table name, sqlite3 error code).
    SessionAttach(String, i32),
    /// Failed to extract a changeset (sqlite3 error code).
    ChangesetExtract(i32),
    /// Failed to apply a changeset (sqlite3 error code).
    ChangesetApply(i32),
}
88
89impl std::fmt::Display for SyncError {
90 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91 match self {
92 SyncError::SessionCreate(rc) => write!(f, "session create failed (rc={rc})"),
93 SyncError::SessionAttach(table, rc) => {
94 write!(f, "session attach failed for {table} (rc={rc})")
95 }
96 SyncError::ChangesetExtract(rc) => write!(f, "changeset extract failed (rc={rc})"),
97 SyncError::ChangesetApply(rc) => write!(f, "changeset apply failed (rc={rc})"),
98 }
99 }
100}
101
102impl std::error::Error for SyncError {}