| //! [Session Extension](https://sqlite.org/sessionintro.html) |
| #![allow(non_camel_case_types)] |
| |
| use std::ffi::CStr; |
| use std::io::{Read, Write}; |
| use std::marker::PhantomData; |
| use std::os::raw::{c_char, c_int, c_uchar, c_void}; |
| use std::panic::{catch_unwind, RefUnwindSafe}; |
| use std::ptr; |
| use std::slice::{from_raw_parts, from_raw_parts_mut}; |
| |
| use fallible_streaming_iterator::FallibleStreamingIterator; |
| |
| use crate::error::{check, error_from_sqlite_code}; |
| use crate::ffi; |
| use crate::hooks::Action; |
| use crate::types::ValueRef; |
| use crate::{errmsg_to_string, str_to_cstring, Connection, DatabaseName, Result}; |
| |
| // https://sqlite.org/session.html |
| |
| /// An instance of this object is a session that can be |
| /// used to record changes to a database. |
| pub struct Session<'conn> { |
| phantom: PhantomData<&'conn Connection>, |
| s: *mut ffi::sqlite3_session, |
| filter: Option<Box<dyn Fn(&str) -> bool>>, |
| } |
| |
| impl Session<'_> { |
| /// Create a new session object |
| #[inline] |
| pub fn new(db: &Connection) -> Result<Session<'_>> { |
| Session::new_with_name(db, DatabaseName::Main) |
| } |
| |
| /// Create a new session object |
| #[inline] |
| pub fn new_with_name<'conn>( |
| db: &'conn Connection, |
| name: DatabaseName<'_>, |
| ) -> Result<Session<'conn>> { |
| let name = name.as_cstring()?; |
| |
| let db = db.db.borrow_mut().db; |
| |
| let mut s: *mut ffi::sqlite3_session = ptr::null_mut(); |
| check(unsafe { ffi::sqlite3session_create(db, name.as_ptr(), &mut s) })?; |
| |
| Ok(Session { |
| phantom: PhantomData, |
| s, |
| filter: None, |
| }) |
| } |
| |
| /// Set a table filter |
| pub fn table_filter<F>(&mut self, filter: Option<F>) |
| where |
| F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, |
| { |
| unsafe extern "C" fn call_boxed_closure<F>( |
| p_arg: *mut c_void, |
| tbl_str: *const c_char, |
| ) -> c_int |
| where |
| F: Fn(&str) -> bool + RefUnwindSafe, |
| { |
| use std::str; |
| |
| let boxed_filter: *mut F = p_arg as *mut F; |
| let tbl_name = { |
| let c_slice = CStr::from_ptr(tbl_str).to_bytes(); |
| str::from_utf8(c_slice) |
| }; |
| if let Ok(true) = |
| catch_unwind(|| (*boxed_filter)(tbl_name.expect("non-utf8 table name"))) |
| { |
| 1 |
| } else { |
| 0 |
| } |
| } |
| |
| match filter { |
| Some(filter) => { |
| let boxed_filter = Box::new(filter); |
| unsafe { |
| ffi::sqlite3session_table_filter( |
| self.s, |
| Some(call_boxed_closure::<F>), |
| &*boxed_filter as *const F as *mut _, |
| ); |
| } |
| self.filter = Some(boxed_filter); |
| } |
| _ => { |
| unsafe { ffi::sqlite3session_table_filter(self.s, None, ptr::null_mut()) } |
| self.filter = None; |
| } |
| }; |
| } |
| |
| /// Attach a table. `None` means all tables. |
| pub fn attach(&mut self, table: Option<&str>) -> Result<()> { |
| let table = if let Some(table) = table { |
| Some(str_to_cstring(table)?) |
| } else { |
| None |
| }; |
| let table = table.as_ref().map(|s| s.as_ptr()).unwrap_or(ptr::null()); |
| check(unsafe { ffi::sqlite3session_attach(self.s, table) }) |
| } |
| |
| /// Generate a Changeset |
| pub fn changeset(&mut self) -> Result<Changeset> { |
| let mut n = 0; |
| let mut cs: *mut c_void = ptr::null_mut(); |
| check(unsafe { ffi::sqlite3session_changeset(self.s, &mut n, &mut cs) })?; |
| Ok(Changeset { cs, n }) |
| } |
| |
| /// Write the set of changes represented by this session to `output`. |
| #[inline] |
| pub fn changeset_strm(&mut self, output: &mut dyn Write) -> Result<()> { |
| let output_ref = &output; |
| check(unsafe { |
| ffi::sqlite3session_changeset_strm( |
| self.s, |
| Some(x_output), |
| output_ref as *const &mut dyn Write as *mut c_void, |
| ) |
| }) |
| } |
| |
| /// Generate a Patchset |
| #[inline] |
| pub fn patchset(&mut self) -> Result<Changeset> { |
| let mut n = 0; |
| let mut ps: *mut c_void = ptr::null_mut(); |
| check(unsafe { ffi::sqlite3session_patchset(self.s, &mut n, &mut ps) })?; |
| // TODO Validate: same struct |
| Ok(Changeset { cs: ps, n }) |
| } |
| |
| /// Write the set of patches represented by this session to `output`. |
| #[inline] |
| pub fn patchset_strm(&mut self, output: &mut dyn Write) -> Result<()> { |
| let output_ref = &output; |
| check(unsafe { |
| ffi::sqlite3session_patchset_strm( |
| self.s, |
| Some(x_output), |
| output_ref as *const &mut dyn Write as *mut c_void, |
| ) |
| }) |
| } |
| |
| /// Load the difference between tables. |
| pub fn diff(&mut self, from: DatabaseName<'_>, table: &str) -> Result<()> { |
| let from = from.as_cstring()?; |
| let table = str_to_cstring(table)?; |
| let table = table.as_ptr(); |
| unsafe { |
| let mut errmsg = ptr::null_mut(); |
| let r = |
| ffi::sqlite3session_diff(self.s, from.as_ptr(), table, &mut errmsg as *mut *mut _); |
| if r != ffi::SQLITE_OK { |
| let errmsg: *mut c_char = errmsg; |
| let message = errmsg_to_string(&*errmsg); |
| ffi::sqlite3_free(errmsg as *mut c_void); |
| return Err(error_from_sqlite_code(r, Some(message))); |
| } |
| } |
| Ok(()) |
| } |
| |
| /// Test if a changeset has recorded any changes |
| #[inline] |
| pub fn is_empty(&self) -> bool { |
| unsafe { ffi::sqlite3session_isempty(self.s) != 0 } |
| } |
| |
| /// Query the current state of the session |
| #[inline] |
| pub fn is_enabled(&self) -> bool { |
| unsafe { ffi::sqlite3session_enable(self.s, -1) != 0 } |
| } |
| |
| /// Enable or disable the recording of changes |
| #[inline] |
| pub fn set_enabled(&mut self, enabled: bool) { |
| unsafe { |
| ffi::sqlite3session_enable(self.s, if enabled { 1 } else { 0 }); |
| } |
| } |
| |
| /// Query the current state of the indirect flag |
| #[inline] |
| pub fn is_indirect(&self) -> bool { |
| unsafe { ffi::sqlite3session_indirect(self.s, -1) != 0 } |
| } |
| |
| /// Set or clear the indirect change flag |
| #[inline] |
| pub fn set_indirect(&mut self, indirect: bool) { |
| unsafe { |
| ffi::sqlite3session_indirect(self.s, if indirect { 1 } else { 0 }); |
| } |
| } |
| } |
| |
| impl Drop for Session<'_> { |
| #[inline] |
| fn drop(&mut self) { |
| if self.filter.is_some() { |
| self.table_filter(None::<fn(&str) -> bool>); |
| } |
| unsafe { ffi::sqlite3session_delete(self.s) }; |
| } |
| } |
| |
| /// Invert a changeset |
| #[inline] |
| pub fn invert_strm(input: &mut dyn Read, output: &mut dyn Write) -> Result<()> { |
| let input_ref = &input; |
| let output_ref = &output; |
| check(unsafe { |
| ffi::sqlite3changeset_invert_strm( |
| Some(x_input), |
| input_ref as *const &mut dyn Read as *mut c_void, |
| Some(x_output), |
| output_ref as *const &mut dyn Write as *mut c_void, |
| ) |
| }) |
| } |
| |
| /// Combine two changesets |
| #[inline] |
| pub fn concat_strm( |
| input_a: &mut dyn Read, |
| input_b: &mut dyn Read, |
| output: &mut dyn Write, |
| ) -> Result<()> { |
| let input_a_ref = &input_a; |
| let input_b_ref = &input_b; |
| let output_ref = &output; |
| check(unsafe { |
| ffi::sqlite3changeset_concat_strm( |
| Some(x_input), |
| input_a_ref as *const &mut dyn Read as *mut c_void, |
| Some(x_input), |
| input_b_ref as *const &mut dyn Read as *mut c_void, |
| Some(x_output), |
| output_ref as *const &mut dyn Write as *mut c_void, |
| ) |
| }) |
| } |
| |
| /// Changeset or Patchset |
| pub struct Changeset { |
| cs: *mut c_void, |
| n: c_int, |
| } |
| |
| impl Changeset { |
| /// Invert a changeset |
| #[inline] |
| pub fn invert(&self) -> Result<Changeset> { |
| let mut n = 0; |
| let mut cs = ptr::null_mut(); |
| check(unsafe { |
| ffi::sqlite3changeset_invert(self.n, self.cs, &mut n, &mut cs as *mut *mut _) |
| })?; |
| Ok(Changeset { cs, n }) |
| } |
| |
| /// Create an iterator to traverse a changeset |
| #[inline] |
| pub fn iter(&self) -> Result<ChangesetIter<'_>> { |
| let mut it = ptr::null_mut(); |
| check(unsafe { ffi::sqlite3changeset_start(&mut it as *mut *mut _, self.n, self.cs) })?; |
| Ok(ChangesetIter { |
| phantom: PhantomData, |
| it, |
| item: None, |
| }) |
| } |
| |
| /// Concatenate two changeset objects |
| #[inline] |
| pub fn concat(a: &Changeset, b: &Changeset) -> Result<Changeset> { |
| let mut n = 0; |
| let mut cs = ptr::null_mut(); |
| check(unsafe { |
| ffi::sqlite3changeset_concat(a.n, a.cs, b.n, b.cs, &mut n, &mut cs as *mut *mut _) |
| })?; |
| Ok(Changeset { cs, n }) |
| } |
| } |
| |
| impl Drop for Changeset { |
| #[inline] |
| fn drop(&mut self) { |
| unsafe { |
| ffi::sqlite3_free(self.cs); |
| } |
| } |
| } |
| |
| /// Cursor for iterating over the elements of a changeset |
| /// or patchset. |
| pub struct ChangesetIter<'changeset> { |
| phantom: PhantomData<&'changeset Changeset>, |
| it: *mut ffi::sqlite3_changeset_iter, |
| item: Option<ChangesetItem>, |
| } |
| |
| impl ChangesetIter<'_> { |
| /// Create an iterator on `input` |
| #[inline] |
| pub fn start_strm<'input>(input: &&'input mut dyn Read) -> Result<ChangesetIter<'input>> { |
| let mut it = ptr::null_mut(); |
| check(unsafe { |
| ffi::sqlite3changeset_start_strm( |
| &mut it as *mut *mut _, |
| Some(x_input), |
| input as *const &mut dyn Read as *mut c_void, |
| ) |
| })?; |
| Ok(ChangesetIter { |
| phantom: PhantomData, |
| it, |
| item: None, |
| }) |
| } |
| } |
| |
| impl FallibleStreamingIterator for ChangesetIter<'_> { |
| type Error = crate::error::Error; |
| type Item = ChangesetItem; |
| |
| #[inline] |
| fn advance(&mut self) -> Result<()> { |
| let rc = unsafe { ffi::sqlite3changeset_next(self.it) }; |
| match rc { |
| ffi::SQLITE_ROW => { |
| self.item = Some(ChangesetItem { it: self.it }); |
| Ok(()) |
| } |
| ffi::SQLITE_DONE => { |
| self.item = None; |
| Ok(()) |
| } |
| code => Err(error_from_sqlite_code(code, None)), |
| } |
| } |
| |
| #[inline] |
| fn get(&self) -> Option<&ChangesetItem> { |
| self.item.as_ref() |
| } |
| } |
| |
| /// Operation |
| pub struct Operation<'item> { |
| table_name: &'item str, |
| number_of_columns: i32, |
| code: Action, |
| indirect: bool, |
| } |
| |
| impl Operation<'_> { |
| /// Returns the table name. |
| #[inline] |
| pub fn table_name(&self) -> &str { |
| self.table_name |
| } |
| |
| /// Returns the number of columns in table |
| #[inline] |
| pub fn number_of_columns(&self) -> i32 { |
| self.number_of_columns |
| } |
| |
| /// Returns the action code. |
| #[inline] |
| pub fn code(&self) -> Action { |
| self.code |
| } |
| |
| /// Returns `true` for an 'indirect' change. |
| #[inline] |
| pub fn indirect(&self) -> bool { |
| self.indirect |
| } |
| } |
| |
| impl Drop for ChangesetIter<'_> { |
| #[inline] |
| fn drop(&mut self) { |
| unsafe { |
| ffi::sqlite3changeset_finalize(self.it); |
| } |
| } |
| } |
| |
| /// An item passed to a conflict-handler by |
| /// [`Connection::apply`](crate::Connection::apply), or an item generated by |
| /// [`ChangesetIter::next`](ChangesetIter::next). |
| // TODO enum ? Delete, Insert, Update, ... |
| pub struct ChangesetItem { |
| it: *mut ffi::sqlite3_changeset_iter, |
| } |
| |
| impl ChangesetItem { |
| /// Obtain conflicting row values |
| /// |
| /// May only be called with an `SQLITE_CHANGESET_DATA` or |
| /// `SQLITE_CHANGESET_CONFLICT` conflict handler callback. |
| #[inline] |
| pub fn conflict(&self, col: usize) -> Result<ValueRef<'_>> { |
| unsafe { |
| let mut p_value: *mut ffi::sqlite3_value = ptr::null_mut(); |
| check(ffi::sqlite3changeset_conflict( |
| self.it, |
| col as i32, |
| &mut p_value, |
| ))?; |
| Ok(ValueRef::from_value(p_value)) |
| } |
| } |
| |
| /// Determine the number of foreign key constraint violations |
| /// |
| /// May only be called with an `SQLITE_CHANGESET_FOREIGN_KEY` conflict |
| /// handler callback. |
| #[inline] |
| pub fn fk_conflicts(&self) -> Result<i32> { |
| unsafe { |
| let mut p_out = 0; |
| check(ffi::sqlite3changeset_fk_conflicts(self.it, &mut p_out))?; |
| Ok(p_out) |
| } |
| } |
| |
| /// Obtain new.* Values |
| /// |
| /// May only be called if the type of change is either `SQLITE_UPDATE` or |
| /// `SQLITE_INSERT`. |
| #[inline] |
| pub fn new_value(&self, col: usize) -> Result<ValueRef<'_>> { |
| unsafe { |
| let mut p_value: *mut ffi::sqlite3_value = ptr::null_mut(); |
| check(ffi::sqlite3changeset_new(self.it, col as i32, &mut p_value))?; |
| Ok(ValueRef::from_value(p_value)) |
| } |
| } |
| |
| /// Obtain old.* Values |
| /// |
| /// May only be called if the type of change is either `SQLITE_DELETE` or |
| /// `SQLITE_UPDATE`. |
| #[inline] |
| pub fn old_value(&self, col: usize) -> Result<ValueRef<'_>> { |
| unsafe { |
| let mut p_value: *mut ffi::sqlite3_value = ptr::null_mut(); |
| check(ffi::sqlite3changeset_old(self.it, col as i32, &mut p_value))?; |
| Ok(ValueRef::from_value(p_value)) |
| } |
| } |
| |
| /// Obtain the current operation |
| #[inline] |
| pub fn op(&self) -> Result<Operation<'_>> { |
| let mut number_of_columns = 0; |
| let mut code = 0; |
| let mut indirect = 0; |
| let tab = unsafe { |
| let mut pz_tab: *const c_char = ptr::null(); |
| check(ffi::sqlite3changeset_op( |
| self.it, |
| &mut pz_tab, |
| &mut number_of_columns, |
| &mut code, |
| &mut indirect, |
| ))?; |
| CStr::from_ptr(pz_tab) |
| }; |
| let table_name = tab.to_str()?; |
| Ok(Operation { |
| table_name, |
| number_of_columns, |
| code: Action::from(code), |
| indirect: indirect != 0, |
| }) |
| } |
| |
| /// Obtain the primary key definition of a table |
| #[inline] |
| pub fn pk(&self) -> Result<&[u8]> { |
| let mut number_of_columns = 0; |
| unsafe { |
| let mut pks: *mut c_uchar = ptr::null_mut(); |
| check(ffi::sqlite3changeset_pk( |
| self.it, |
| &mut pks, |
| &mut number_of_columns, |
| ))?; |
| Ok(from_raw_parts(pks, number_of_columns as usize)) |
| } |
| } |
| } |
| |
| /// Used to combine two or more changesets or |
| /// patchsets |
| pub struct Changegroup { |
| cg: *mut ffi::sqlite3_changegroup, |
| } |
| |
| impl Changegroup { |
| /// Create a new change group. |
| #[inline] |
| pub fn new() -> Result<Self> { |
| let mut cg = ptr::null_mut(); |
| check(unsafe { ffi::sqlite3changegroup_new(&mut cg) })?; |
| Ok(Changegroup { cg }) |
| } |
| |
| /// Add a changeset |
| #[inline] |
| pub fn add(&mut self, cs: &Changeset) -> Result<()> { |
| check(unsafe { ffi::sqlite3changegroup_add(self.cg, cs.n, cs.cs) }) |
| } |
| |
| /// Add a changeset read from `input` to this change group. |
| #[inline] |
| pub fn add_stream(&mut self, input: &mut dyn Read) -> Result<()> { |
| let input_ref = &input; |
| check(unsafe { |
| ffi::sqlite3changegroup_add_strm( |
| self.cg, |
| Some(x_input), |
| input_ref as *const &mut dyn Read as *mut c_void, |
| ) |
| }) |
| } |
| |
| /// Obtain a composite Changeset |
| #[inline] |
| pub fn output(&mut self) -> Result<Changeset> { |
| let mut n = 0; |
| let mut output: *mut c_void = ptr::null_mut(); |
| check(unsafe { ffi::sqlite3changegroup_output(self.cg, &mut n, &mut output) })?; |
| Ok(Changeset { cs: output, n }) |
| } |
| |
| /// Write the combined set of changes to `output`. |
| #[inline] |
| pub fn output_strm(&mut self, output: &mut dyn Write) -> Result<()> { |
| let output_ref = &output; |
| check(unsafe { |
| ffi::sqlite3changegroup_output_strm( |
| self.cg, |
| Some(x_output), |
| output_ref as *const &mut dyn Write as *mut c_void, |
| ) |
| }) |
| } |
| } |
| |
| impl Drop for Changegroup { |
| #[inline] |
| fn drop(&mut self) { |
| unsafe { |
| ffi::sqlite3changegroup_delete(self.cg); |
| } |
| } |
| } |
| |
| impl Connection { |
| /// Apply a changeset to a database |
| pub fn apply<F, C>(&self, cs: &Changeset, filter: Option<F>, conflict: C) -> Result<()> |
| where |
| F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, |
| C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static, |
| { |
| let db = self.db.borrow_mut().db; |
| |
| let filtered = filter.is_some(); |
| let tuple = &mut (filter, conflict); |
| check(unsafe { |
| if filtered { |
| ffi::sqlite3changeset_apply( |
| db, |
| cs.n, |
| cs.cs, |
| Some(call_filter::<F, C>), |
| Some(call_conflict::<F, C>), |
| tuple as *mut (Option<F>, C) as *mut c_void, |
| ) |
| } else { |
| ffi::sqlite3changeset_apply( |
| db, |
| cs.n, |
| cs.cs, |
| None, |
| Some(call_conflict::<F, C>), |
| tuple as *mut (Option<F>, C) as *mut c_void, |
| ) |
| } |
| }) |
| } |
| |
| /// Apply a changeset to a database |
| pub fn apply_strm<F, C>( |
| &self, |
| input: &mut dyn Read, |
| filter: Option<F>, |
| conflict: C, |
| ) -> Result<()> |
| where |
| F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, |
| C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static, |
| { |
| let input_ref = &input; |
| let db = self.db.borrow_mut().db; |
| |
| let filtered = filter.is_some(); |
| let tuple = &mut (filter, conflict); |
| check(unsafe { |
| if filtered { |
| ffi::sqlite3changeset_apply_strm( |
| db, |
| Some(x_input), |
| input_ref as *const &mut dyn Read as *mut c_void, |
| Some(call_filter::<F, C>), |
| Some(call_conflict::<F, C>), |
| tuple as *mut (Option<F>, C) as *mut c_void, |
| ) |
| } else { |
| ffi::sqlite3changeset_apply_strm( |
| db, |
| Some(x_input), |
| input_ref as *const &mut dyn Read as *mut c_void, |
| None, |
| Some(call_conflict::<F, C>), |
| tuple as *mut (Option<F>, C) as *mut c_void, |
| ) |
| } |
| }) |
| } |
| } |
| |
| /// Constants passed to the conflict handler |
| /// See [here](https://sqlite.org/session.html#SQLITE_CHANGESET_CONFLICT) for details. |
| #[allow(missing_docs)] |
| #[repr(i32)] |
| #[derive(Debug, PartialEq)] |
| #[non_exhaustive] |
| #[allow(clippy::upper_case_acronyms)] |
| pub enum ConflictType { |
| UNKNOWN = -1, |
| SQLITE_CHANGESET_DATA = ffi::SQLITE_CHANGESET_DATA, |
| SQLITE_CHANGESET_NOTFOUND = ffi::SQLITE_CHANGESET_NOTFOUND, |
| SQLITE_CHANGESET_CONFLICT = ffi::SQLITE_CHANGESET_CONFLICT, |
| SQLITE_CHANGESET_CONSTRAINT = ffi::SQLITE_CHANGESET_CONSTRAINT, |
| SQLITE_CHANGESET_FOREIGN_KEY = ffi::SQLITE_CHANGESET_FOREIGN_KEY, |
| } |
| impl From<i32> for ConflictType { |
| fn from(code: i32) -> ConflictType { |
| match code { |
| ffi::SQLITE_CHANGESET_DATA => ConflictType::SQLITE_CHANGESET_DATA, |
| ffi::SQLITE_CHANGESET_NOTFOUND => ConflictType::SQLITE_CHANGESET_NOTFOUND, |
| ffi::SQLITE_CHANGESET_CONFLICT => ConflictType::SQLITE_CHANGESET_CONFLICT, |
| ffi::SQLITE_CHANGESET_CONSTRAINT => ConflictType::SQLITE_CHANGESET_CONSTRAINT, |
| ffi::SQLITE_CHANGESET_FOREIGN_KEY => ConflictType::SQLITE_CHANGESET_FOREIGN_KEY, |
| _ => ConflictType::UNKNOWN, |
| } |
| } |
| } |
| |
| /// Constants returned by the conflict handler |
| /// See [here](https://sqlite.org/session.html#SQLITE_CHANGESET_ABORT) for details. |
| #[allow(missing_docs)] |
| #[repr(i32)] |
| #[derive(Debug, PartialEq)] |
| #[non_exhaustive] |
| #[allow(clippy::upper_case_acronyms)] |
| pub enum ConflictAction { |
| SQLITE_CHANGESET_OMIT = ffi::SQLITE_CHANGESET_OMIT, |
| SQLITE_CHANGESET_REPLACE = ffi::SQLITE_CHANGESET_REPLACE, |
| SQLITE_CHANGESET_ABORT = ffi::SQLITE_CHANGESET_ABORT, |
| } |
| |
| unsafe extern "C" fn call_filter<F, C>(p_ctx: *mut c_void, tbl_str: *const c_char) -> c_int |
| where |
| F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, |
| C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static, |
| { |
| use std::str; |
| |
| let tuple: *mut (Option<F>, C) = p_ctx as *mut (Option<F>, C); |
| let tbl_name = { |
| let c_slice = CStr::from_ptr(tbl_str).to_bytes(); |
| str::from_utf8(c_slice) |
| }; |
| match *tuple { |
| (Some(ref filter), _) => { |
| if let Ok(true) = catch_unwind(|| filter(tbl_name.expect("illegal table name"))) { |
| 1 |
| } else { |
| 0 |
| } |
| } |
| _ => unimplemented!(), |
| } |
| } |
| |
| unsafe extern "C" fn call_conflict<F, C>( |
| p_ctx: *mut c_void, |
| e_conflict: c_int, |
| p: *mut ffi::sqlite3_changeset_iter, |
| ) -> c_int |
| where |
| F: Fn(&str) -> bool + Send + RefUnwindSafe + 'static, |
| C: Fn(ConflictType, ChangesetItem) -> ConflictAction + Send + RefUnwindSafe + 'static, |
| { |
| let tuple: *mut (Option<F>, C) = p_ctx as *mut (Option<F>, C); |
| let conflict_type = ConflictType::from(e_conflict); |
| let item = ChangesetItem { it: p }; |
| if let Ok(action) = catch_unwind(|| (*tuple).1(conflict_type, item)) { |
| action as c_int |
| } else { |
| ffi::SQLITE_CHANGESET_ABORT |
| } |
| } |
| |
| unsafe extern "C" fn x_input(p_in: *mut c_void, data: *mut c_void, len: *mut c_int) -> c_int { |
| if p_in.is_null() { |
| return ffi::SQLITE_MISUSE; |
| } |
| let bytes: &mut [u8] = from_raw_parts_mut(data as *mut u8, *len as usize); |
| let input = p_in as *mut &mut dyn Read; |
| match (*input).read(bytes) { |
| Ok(n) => { |
| *len = n as i32; // TODO Validate: n = 0 may not mean the reader will always no longer be able to |
| // produce bytes. |
| ffi::SQLITE_OK |
| } |
| Err(_) => ffi::SQLITE_IOERR_READ, // TODO check if err is a (ru)sqlite Error => propagate |
| } |
| } |
| |
| unsafe extern "C" fn x_output(p_out: *mut c_void, data: *const c_void, len: c_int) -> c_int { |
| if p_out.is_null() { |
| return ffi::SQLITE_MISUSE; |
| } |
| // The sessions module never invokes an xOutput callback with the third |
| // parameter set to a value less than or equal to zero. |
| let bytes: &[u8] = from_raw_parts(data as *const u8, len as usize); |
| let output = p_out as *mut &mut dyn Write; |
| match (*output).write_all(bytes) { |
| Ok(_) => ffi::SQLITE_OK, |
| Err(_) => ffi::SQLITE_IOERR_WRITE, // TODO check if err is a (ru)sqlite Error => propagate |
| } |
| } |
| |
| #[cfg(test)] |
| mod test { |
| use fallible_streaming_iterator::FallibleStreamingIterator; |
| use std::io::Read; |
| use std::sync::atomic::{AtomicBool, Ordering}; |
| |
| use super::{Changeset, ChangesetIter, ConflictAction, ConflictType, Session}; |
| use crate::hooks::Action; |
| use crate::{Connection, Result}; |
| |
| fn one_changeset() -> Result<Changeset> { |
| let db = Connection::open_in_memory()?; |
| db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")?; |
| |
| let mut session = Session::new(&db)?; |
| assert!(session.is_empty()); |
| |
| session.attach(None)?; |
| db.execute("INSERT INTO foo (t) VALUES (?);", ["bar"])?; |
| |
| session.changeset() |
| } |
| |
| fn one_changeset_strm() -> Result<Vec<u8>> { |
| let db = Connection::open_in_memory()?; |
| db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")?; |
| |
| let mut session = Session::new(&db)?; |
| assert!(session.is_empty()); |
| |
| session.attach(None)?; |
| db.execute("INSERT INTO foo (t) VALUES (?);", ["bar"])?; |
| |
| let mut output = Vec::new(); |
| session.changeset_strm(&mut output)?; |
| Ok(output) |
| } |
| |
| #[test] |
| fn test_changeset() -> Result<()> { |
| let changeset = one_changeset()?; |
| let mut iter = changeset.iter()?; |
| let item = iter.next()?; |
| assert!(item.is_some()); |
| |
| let item = item.unwrap(); |
| let op = item.op()?; |
| assert_eq!("foo", op.table_name()); |
| assert_eq!(1, op.number_of_columns()); |
| assert_eq!(Action::SQLITE_INSERT, op.code()); |
| assert!(!op.indirect()); |
| |
| let pk = item.pk()?; |
| assert_eq!(&[1], pk); |
| |
| let new_value = item.new_value(0)?; |
| assert_eq!(Ok("bar"), new_value.as_str()); |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_changeset_strm() -> Result<()> { |
| let output = one_changeset_strm()?; |
| assert!(!output.is_empty()); |
| assert_eq!(14, output.len()); |
| |
| let input: &mut dyn Read = &mut output.as_slice(); |
| let mut iter = ChangesetIter::start_strm(&input)?; |
| let item = iter.next()?; |
| assert!(item.is_some()); |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_changeset_apply() -> Result<()> { |
| let changeset = one_changeset()?; |
| |
| let db = Connection::open_in_memory()?; |
| db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")?; |
| |
| static CALLED: AtomicBool = AtomicBool::new(false); |
| db.apply( |
| &changeset, |
| None::<fn(&str) -> bool>, |
| |_conflict_type, _item| { |
| CALLED.store(true, Ordering::Relaxed); |
| ConflictAction::SQLITE_CHANGESET_OMIT |
| }, |
| )?; |
| |
| assert!(!CALLED.load(Ordering::Relaxed)); |
| let check = db.query_row("SELECT 1 FROM foo WHERE t = ?", ["bar"], |row| { |
| row.get::<_, i32>(0) |
| })?; |
| assert_eq!(1, check); |
| |
| // conflict expected when same changeset applied again on the same db |
| db.apply( |
| &changeset, |
| None::<fn(&str) -> bool>, |
| |conflict_type, item| { |
| CALLED.store(true, Ordering::Relaxed); |
| assert_eq!(ConflictType::SQLITE_CHANGESET_CONFLICT, conflict_type); |
| let conflict = item.conflict(0).unwrap(); |
| assert_eq!(Ok("bar"), conflict.as_str()); |
| ConflictAction::SQLITE_CHANGESET_OMIT |
| }, |
| )?; |
| assert!(CALLED.load(Ordering::Relaxed)); |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_changeset_apply_strm() -> Result<()> { |
| let output = one_changeset_strm()?; |
| |
| let db = Connection::open_in_memory()?; |
| db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")?; |
| |
| let mut input = output.as_slice(); |
| db.apply_strm( |
| &mut input, |
| None::<fn(&str) -> bool>, |
| |_conflict_type, _item| ConflictAction::SQLITE_CHANGESET_OMIT, |
| )?; |
| |
| let check = db.query_row("SELECT 1 FROM foo WHERE t = ?", ["bar"], |row| { |
| row.get::<_, i32>(0) |
| })?; |
| assert_eq!(1, check); |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_session_empty() -> Result<()> { |
| let db = Connection::open_in_memory()?; |
| db.execute_batch("CREATE TABLE foo(t TEXT PRIMARY KEY NOT NULL);")?; |
| |
| let mut session = Session::new(&db)?; |
| assert!(session.is_empty()); |
| |
| session.attach(None)?; |
| db.execute("INSERT INTO foo (t) VALUES (?);", ["bar"])?; |
| |
| assert!(!session.is_empty()); |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_session_set_enabled() -> Result<()> { |
| let db = Connection::open_in_memory()?; |
| |
| let mut session = Session::new(&db)?; |
| assert!(session.is_enabled()); |
| session.set_enabled(false); |
| assert!(!session.is_enabled()); |
| Ok(()) |
| } |
| |
| #[test] |
| fn test_session_set_indirect() -> Result<()> { |
| let db = Connection::open_in_memory()?; |
| |
| let mut session = Session::new(&db)?; |
| assert!(!session.is_indirect()); |
| session.set_indirect(true); |
| assert!(session.is_indirect()); |
| Ok(()) |
| } |
| } |