coven/sync/
session_ext.rs

//! Safe-ish wrappers around the SQLite session extension FFI.
//!
//! These operate on raw `*mut sqlite3` pointers so they can be used with
//! sqlx's `LockedSqliteHandle::as_raw_handle()`.
5use std::ffi::{c_char, c_int, c_void, CStr, CString};
6use std::ptr;
7
8use libsqlite3_sys as ffi;
9
10/// A recorded binary changeset from a session.
11pub struct Changeset {
12    buf: *mut c_void,
13    len: c_int,
14}
15
16impl Changeset {
17    /// Create a `Changeset` from owned bytes. The bytes are copied into
18    /// sqlite3-managed memory so `sqlite3_free` works correctly on drop.
19    pub fn from_bytes(bytes: &[u8]) -> Self {
20        if bytes.is_empty() {
21            return Changeset {
22                buf: ptr::null_mut(),
23                len: 0,
24            };
25        }
26        unsafe {
27            let buf = ffi::sqlite3_malloc(bytes.len() as c_int);
28            assert!(!buf.is_null(), "sqlite3_malloc failed");
29            ptr::copy_nonoverlapping(bytes.as_ptr(), buf as *mut u8, bytes.len());
30            Changeset {
31                buf,
32                len: bytes.len() as c_int,
33            }
34        }
35    }
36
37    pub fn as_bytes(&self) -> &[u8] {
38        if self.buf.is_null() || self.len == 0 {
39            return &[];
40        }
41        unsafe { std::slice::from_raw_parts(self.buf as *const u8, self.len as usize) }
42    }
43
44    pub fn len(&self) -> usize {
45        self.len as usize
46    }
47
48    pub fn is_empty(&self) -> bool {
49        self.len == 0
50    }
51}
52
53impl Drop for Changeset {
54    fn drop(&mut self) {
55        if !self.buf.is_null() {
56            unsafe { ffi::sqlite3_free(self.buf) };
57        }
58    }
59}
60
61// SAFETY: The changeset buffer is just heap-allocated memory extracted from
62// sqlite3session_changeset. It has no thread affinity after extraction.
63unsafe impl Send for Changeset {}
64
65/// Action a conflict handler can return.
66#[repr(i32)]
67pub enum ConflictAction {
68    Omit = ffi::SQLITE_CHANGESET_OMIT,
69    Replace = ffi::SQLITE_CHANGESET_REPLACE,
70    Abort = ffi::SQLITE_CHANGESET_ABORT,
71}
72
73/// The type of conflict reported to the conflict handler.
74#[repr(i32)]
75#[derive(Debug, Clone, Copy, PartialEq, Eq)]
76pub enum ConflictType {
77    Data = ffi::SQLITE_CHANGESET_DATA,
78    NotFound = ffi::SQLITE_CHANGESET_NOTFOUND,
79    Conflict = ffi::SQLITE_CHANGESET_CONFLICT,
80    Constraint = ffi::SQLITE_CHANGESET_CONSTRAINT,
81    ForeignKey = ffi::SQLITE_CHANGESET_FOREIGN_KEY,
82}
83
84impl ConflictType {
85    fn from_raw(val: c_int) -> Self {
86        match val {
87            ffi::SQLITE_CHANGESET_DATA => ConflictType::Data,
88            ffi::SQLITE_CHANGESET_NOTFOUND => ConflictType::NotFound,
89            ffi::SQLITE_CHANGESET_CONFLICT => ConflictType::Conflict,
90            ffi::SQLITE_CHANGESET_CONSTRAINT => ConflictType::Constraint,
91            ffi::SQLITE_CHANGESET_FOREIGN_KEY => ConflictType::ForeignKey,
92            other => panic!("unknown conflict type: {other}"),
93        }
94    }
95}
96
97/// Context available to a conflict handler during changeset application.
98///
99/// Wraps the raw `sqlite3_changeset_iter` to provide safe access to the
100/// table name, column count, and column values involved in a conflict.
101pub struct ConflictContext {
102    iter: *mut ffi::sqlite3_changeset_iter,
103}
104
105impl ConflictContext {
106    /// The table name for this conflict's operation.
107    pub fn table_name(&self) -> &str {
108        unsafe {
109            let mut table: *const c_char = ptr::null();
110            let mut ncol: c_int = 0;
111            let mut op: c_int = 0;
112            let mut indirect: c_int = 0;
113            ffi::sqlite3changeset_op(self.iter, &mut table, &mut ncol, &mut op, &mut indirect);
114            CStr::from_ptr(table)
115                .to_str()
116                .expect("SQLite table names are always UTF-8")
117        }
118    }
119
120    /// The number of columns in this table.
121    pub fn column_count(&self) -> usize {
122        unsafe {
123            let mut table: *const c_char = ptr::null();
124            let mut ncol: c_int = 0;
125            let mut op: c_int = 0;
126            let mut indirect: c_int = 0;
127            ffi::sqlite3changeset_op(self.iter, &mut table, &mut ncol, &mut op, &mut indirect);
128            ncol as usize
129        }
130    }
131
132    /// Get the "new" value for a column (the incoming value from the changeset).
133    /// Available for DATA and CONSTRAINT conflicts. Returns None if the column
134    /// was not changed or the value is NULL.
135    pub fn new_value(&self, col: usize) -> Option<String> {
136        unsafe {
137            let mut val: *mut ffi::sqlite3_value = ptr::null_mut();
138            let rc = ffi::sqlite3changeset_new(self.iter, col as c_int, &mut val);
139            if rc != ffi::SQLITE_OK as c_int || val.is_null() {
140                return None;
141            }
142            value_to_string(val)
143        }
144    }
145
146    /// Get the "conflict" value for a column (the current local value).
147    /// Available for DATA and CONFLICT conflicts.
148    pub fn conflict_value(&self, col: usize) -> Option<String> {
149        unsafe {
150            let mut val: *mut ffi::sqlite3_value = ptr::null_mut();
151            let rc = ffi::sqlite3changeset_conflict(self.iter, col as c_int, &mut val);
152            if rc != ffi::SQLITE_OK as c_int || val.is_null() {
153                return None;
154            }
155            value_to_string(val)
156        }
157    }
158
159    /// Get the "old" value for a column (the value expected by the changeset).
160    /// Available for DATA and NOTFOUND conflicts.
161    pub fn old_value(&self, col: usize) -> Option<String> {
162        unsafe {
163            let mut val: *mut ffi::sqlite3_value = ptr::null_mut();
164            let rc = ffi::sqlite3changeset_old(self.iter, col as c_int, &mut val);
165            if rc != ffi::SQLITE_OK as c_int || val.is_null() {
166                return None;
167            }
168            value_to_string(val)
169        }
170    }
171}
172
173/// Extract a text string from a sqlite3_value, or None if NULL.
174pub(crate) unsafe fn value_to_string(val: *mut ffi::sqlite3_value) -> Option<String> {
175    let vtype = ffi::sqlite3_value_type(val);
176    if vtype == ffi::SQLITE_NULL as c_int {
177        return None;
178    }
179    let text = ffi::sqlite3_value_text(val);
180    if text.is_null() {
181        return None;
182    }
183    Some(
184        CStr::from_ptr(text as *const c_char)
185            .to_string_lossy()
186            .into_owned(),
187    )
188}
189
190/// A session that tracks changes to a database.
191///
192/// Wraps `sqlite3_session*`. Must be created and used on the same connection,
193/// and must be deleted before the connection is closed.
194pub struct Session {
195    raw: *mut ffi::sqlite3_session,
196}
197
198// SAFETY: The sqlite3_session is a heap-allocated C structure with no thread
199// affinity. It's safe to move between threads as long as it's only accessed
200// from one thread at a time (which is guaranteed by the sync loop's sequential
201// access pattern and the write connection's Mutex).
202unsafe impl Send for Session {}
203
204impl Session {
205    /// Create a new session on the given database connection, tracking
206    /// the "main" database.
207    ///
208    /// # Safety
209    /// `db` must be a valid, open sqlite3 connection pointer.
210    pub unsafe fn new(db: *mut ffi::sqlite3) -> Result<Self, i32> {
211        let db_name = CString::new("main").unwrap();
212        let mut raw: *mut ffi::sqlite3_session = ptr::null_mut();
213        let rc = ffi::sqlite3session_create(db, db_name.as_ptr(), &mut raw);
214        if rc != ffi::SQLITE_OK as c_int {
215            return Err(rc);
216        }
217        Ok(Session { raw })
218    }
219
220    /// Attach a specific table to the session, or all tables if `table` is None.
221    pub fn attach(&self, table: Option<&str>) -> Result<(), i32> {
222        let c_table: Option<CString> = table.map(|t| CString::new(t).unwrap());
223        let ptr: *const c_char = c_table.as_ref().map(|c| c.as_ptr()).unwrap_or(ptr::null());
224        let rc = unsafe { ffi::sqlite3session_attach(self.raw, ptr) };
225        if rc != ffi::SQLITE_OK as c_int {
226            return Err(rc);
227        }
228        Ok(())
229    }
230
231    /// Extract the binary changeset recorded by this session.
232    pub fn changeset(&self) -> Result<Changeset, i32> {
233        let mut len: c_int = 0;
234        let mut buf: *mut c_void = ptr::null_mut();
235        let rc = unsafe { ffi::sqlite3session_changeset(self.raw, &mut len, &mut buf) };
236        if rc != ffi::SQLITE_OK as c_int {
237            return Err(rc);
238        }
239        Ok(Changeset { buf, len })
240    }
241}
242
243impl Drop for Session {
244    fn drop(&mut self) {
245        unsafe { ffi::sqlite3session_delete(self.raw) };
246    }
247}
248
249/// Apply a changeset to a database connection with a simple conflict handler.
250///
251/// The handler receives only the conflict type (no column value access).
252/// For production use, prefer `apply_changeset_with_context`.
253///
254/// # Safety
255/// `db` must be a valid, open sqlite3 connection pointer.
256pub unsafe fn apply_changeset<F>(
257    db: *mut ffi::sqlite3,
258    changeset: &Changeset,
259    mut conflict_handler: F,
260) -> Result<(), i32>
261where
262    F: FnMut(ConflictType) -> ConflictAction,
263{
264    apply_changeset_with_context(db, changeset, |ct, _ctx| conflict_handler(ct))
265}
266
267/// Apply a changeset to a database connection with full conflict context.
268///
269/// The handler receives the conflict type AND a `ConflictContext` that provides
270/// access to the table name, column count, and column values (new, old, conflict).
271///
272/// # Safety
273/// `db` must be a valid, open sqlite3 connection pointer.
274pub unsafe fn apply_changeset_with_context<F>(
275    db: *mut ffi::sqlite3,
276    changeset: &Changeset,
277    mut conflict_handler: F,
278) -> Result<(), i32>
279where
280    F: FnMut(ConflictType, &ConflictContext) -> ConflictAction,
281{
282    unsafe extern "C" fn filter_cb(_ctx: *mut c_void, _table: *const c_char) -> c_int {
283        // Accept all tables
284        1
285    }
286
287    unsafe extern "C" fn conflict_cb<F>(
288        ctx: *mut c_void,
289        conflict_type: c_int,
290        iter: *mut ffi::sqlite3_changeset_iter,
291    ) -> c_int
292    where
293        F: FnMut(ConflictType, &ConflictContext) -> ConflictAction,
294    {
295        let handler = &mut *(ctx as *mut F);
296        let ct = ConflictType::from_raw(conflict_type);
297        let context = ConflictContext { iter };
298        handler(ct, &context) as c_int
299    }
300
301    let rc = ffi::sqlite3changeset_apply(
302        db,
303        changeset.len,
304        changeset.buf,
305        Some(filter_cb),
306        Some(conflict_cb::<F>),
307        &mut conflict_handler as *mut F as *mut c_void,
308    );
309
310    if rc != ffi::SQLITE_OK as c_int {
311        return Err(rc);
312    }
313    Ok(())
314}
315
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync::test_helpers::*;

    /// Opens a fresh in-memory database and runs the `setup` statements
    /// against it, in order.
    unsafe fn seeded_db(setup: &[&'static str]) -> *mut ffi::sqlite3 {
        let db = open_memory_db();
        for &sql in setup {
            exec(db, sql);
        }
        db
    }

    /// Tracks the `items` table on `db` while the `writes` statements run and
    /// returns the recorded changeset. The session is deleted before returning.
    unsafe fn record_items(db: *mut ffi::sqlite3, writes: &[&'static str]) -> Changeset {
        let session = Session::new(db).expect("session create");
        session.attach(Some("items")).expect("attach");
        for &sql in writes {
            exec(db, sql);
        }
        let recorded = session.changeset().expect("changeset");
        drop(session);
        recorded
    }

    /// A tracked INSERT produces a non-empty changeset.
    #[test]
    fn test_basic_changeset_capture() {
        unsafe {
            let db = seeded_db(&["CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)"]);

            let cs = record_items(db, &["INSERT INTO items VALUES (1, 'hello')"]);
            assert!(!cs.is_empty(), "changeset should not be empty");

            ffi::sqlite3_close(db);
        }
    }

    /// Rows recorded on one database show up on another after applying.
    #[test]
    fn test_changeset_application() {
        unsafe {
            let schema = ["CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)"];

            // Source database: record two inserts.
            let db1 = seeded_db(&schema);
            let cs = record_items(
                db1,
                &[
                    "INSERT INTO items VALUES (1, 'alpha')",
                    "INSERT INTO items VALUES (2, 'beta')",
                ],
            );

            // Target database: same schema, no rows yet.
            let db2 = seeded_db(&schema);
            apply_changeset(db2, &cs, |_conflict_type| ConflictAction::Abort)
                .expect("apply changeset");

            assert_eq!(
                query_int(db2, "SELECT COUNT(*) FROM items"),
                2,
                "DB2 should have 2 rows"
            );
            assert_eq!(
                query_text(db2, "SELECT name FROM items WHERE id = 1"),
                "alpha"
            );
            assert_eq!(
                query_text(db2, "SELECT name FROM items WHERE id = 2"),
                "beta"
            );

            ffi::sqlite3_close(db1);
            ffi::sqlite3_close(db2);
        }
    }

    /// A diverged row triggers a DATA conflict; REPLACE lets the incoming
    /// change win.
    #[test]
    fn test_conflict_handler() {
        unsafe {
            // Source: the row exists before tracking starts, so the session
            // only captures the UPDATE.
            let db1 = seeded_db(&[
                "CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, updated_at TEXT)",
                "INSERT INTO items VALUES (1, 'original', '2026-01-01T00:00:00Z')",
            ]);
            let cs = record_items(
                db1,
                &["UPDATE items SET name = 'from_db1', updated_at = '2026-01-03T00:00:00Z' WHERE id = 1"],
            );
            assert!(!cs.is_empty());

            // Target: same primary key, but values that differ from the
            // changeset's "old" values.
            let db2 = seeded_db(&[
                "CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, updated_at TEXT)",
                "INSERT INTO items VALUES (1, 'from_db2', '2026-01-02T00:00:00Z')",
            ]);

            // Remembers the most recent conflict type; `None` means the
            // handler never ran.
            let mut last_conflict: Option<ConflictType> = None;

            apply_changeset(db2, &cs, |ct| {
                last_conflict = Some(ct);
                ConflictAction::Replace
            })
            .expect("apply changeset");

            assert!(
                last_conflict.is_some(),
                "conflict handler should have been called"
            );
            assert_eq!(
                last_conflict,
                Some(ConflictType::Data),
                "should be a DATA conflict"
            );

            assert_eq!(
                query_text(db2, "SELECT name FROM items WHERE id = 1"),
                "from_db1",
                "incoming changeset should win"
            );
            assert_eq!(
                query_text(db2, "SELECT updated_at FROM items WHERE id = 1"),
                "2026-01-03T00:00:00Z"
            );

            ffi::sqlite3_close(db1);
            ffi::sqlite3_close(db2);
        }
    }

    /// Same conflict as above, but OMIT keeps the local row untouched.
    #[test]
    fn test_conflict_handler_omit() {
        unsafe {
            let db1 = seeded_db(&[
                "CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, updated_at TEXT)",
                "INSERT INTO items VALUES (1, 'original', '2026-01-01T00:00:00Z')",
            ]);
            let cs = record_items(
                db1,
                &["UPDATE items SET name = 'from_db1', updated_at = '2026-01-02T00:00:00Z' WHERE id = 1"],
            );

            let db2 = seeded_db(&[
                "CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, updated_at TEXT)",
                "INSERT INTO items VALUES (1, 'from_db2', '2026-01-05T00:00:00Z')",
            ]);

            apply_changeset(db2, &cs, |_ct| ConflictAction::Omit).expect("apply changeset");

            assert_eq!(
                query_text(db2, "SELECT name FROM items WHERE id = 1"),
                "from_db2",
                "local data should be preserved with OMIT"
            );

            ffi::sqlite3_close(db1);
            ffi::sqlite3_close(db2);
        }
    }

    /// Attaching with `None` tracks every table.
    #[test]
    fn test_attach_all_tables() {
        unsafe {
            let schema = [
                "CREATE TABLE t1 (id INTEGER PRIMARY KEY, val TEXT)",
                "CREATE TABLE t2 (id INTEGER PRIMARY KEY, val TEXT)",
            ];

            let db = seeded_db(&schema);
            let session = Session::new(db).expect("session create");
            session.attach(None).expect("attach all");

            exec(db, "INSERT INTO t1 VALUES (1, 'a')");
            exec(db, "INSERT INTO t2 VALUES (1, 'b')");

            let cs = session.changeset().expect("changeset");
            assert!(
                !cs.is_empty(),
                "changeset should capture changes from both tables"
            );

            // Replay on a second database with the same schema.
            let db2 = seeded_db(&schema);
            apply_changeset(db2, &cs, |_| ConflictAction::Abort).expect("apply");

            assert_eq!(query_text(db2, "SELECT val FROM t1 WHERE id = 1"), "a");
            assert_eq!(query_text(db2, "SELECT val FROM t2 WHERE id = 1"), "b");

            drop(session);
            ffi::sqlite3_close(db);
            ffi::sqlite3_close(db2);
        }
    }

    /// The conflict context exposes the table name and column values.
    #[test]
    fn test_conflict_context_reads_values() {
        unsafe {
            let db1 = seeded_db(&[
                "CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, _updated_at TEXT)",
                "INSERT INTO items VALUES (1, 'orig', '2026-01-01T00:00:00Z')",
            ]);
            let cs = record_items(
                db1,
                &["UPDATE items SET name = 'updated', _updated_at = '2026-01-03T00:00:00Z' WHERE id = 1"],
            );

            let db2 = seeded_db(&[
                "CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT, _updated_at TEXT)",
                "INSERT INTO items VALUES (1, 'local', '2026-01-02T00:00:00Z')",
            ]);

            // (table, incoming timestamp, local timestamp) from the most
            // recent handler invocation.
            let mut last_seen: Option<(String, String, String)> = None;

            apply_changeset_with_context(db2, &cs, |_ct, ctx| {
                // _updated_at is column index 2
                last_seen = Some((
                    ctx.table_name().to_string(),
                    ctx.new_value(2).unwrap_or_default(),
                    ctx.conflict_value(2).unwrap_or_default(),
                ));
                ConflictAction::Replace
            })
            .expect("apply");

            let (seen_table, seen_new_ts, seen_conflict_ts) = last_seen.unwrap_or_default();
            assert_eq!(seen_table, "items");
            assert_eq!(seen_new_ts, "2026-01-03T00:00:00Z");
            assert_eq!(seen_conflict_ts, "2026-01-02T00:00:00Z");

            ffi::sqlite3_close(db1);
            ffi::sqlite3_close(db2);
        }
    }
}