From 65594dce14d000c63d037be4251931d161754dbe Mon Sep 17 00:00:00 2001 From: Taariq Lewis <701864+taariq@users.noreply.github.com> Date: Fri, 12 Dec 2025 10:29:51 -0800 Subject: [PATCH 1/5] [sqlite-watcher] scaffold crate with wal watcher + queue --- Cargo.lock | 13 ++ Cargo.toml | 6 + sqlite-watcher/Cargo.toml | 20 +++ sqlite-watcher/README.md | 41 +++++ sqlite-watcher/src/lib.rs | 2 + sqlite-watcher/src/main.rs | 261 ++++++++++++++++++++++++++++ sqlite-watcher/src/queue.rs | 222 +++++++++++++++++++++++ sqlite-watcher/src/wal.rs | 236 +++++++++++++++++++++++++ sqlite-watcher/tests/queue_tests.rs | 72 ++++++++ 9 files changed, 873 insertions(+) create mode 100644 sqlite-watcher/Cargo.toml create mode 100644 sqlite-watcher/README.md create mode 100644 sqlite-watcher/src/lib.rs create mode 100644 sqlite-watcher/src/main.rs create mode 100644 sqlite-watcher/src/queue.rs create mode 100644 sqlite-watcher/src/wal.rs create mode 100644 sqlite-watcher/tests/queue_tests.rs diff --git a/Cargo.lock b/Cargo.lock index eed6140..db13330 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3055,6 +3055,19 @@ dependencies = [ "windows-sys 0.60.2", ] +[[package]] +name = "sqlite-watcher" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap", + "dirs", + "rusqlite", + "tempfile", + "tracing", + "tracing-subscriber", +] + [[package]] name = "stable_deref_trait" version = "1.2.1" diff --git a/Cargo.toml b/Cargo.toml index 72b4e75..ffe5583 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,3 +1,9 @@ +[workspace] +resolver = "2" +members = [ + "sqlite-watcher", +] + [package] name = "database-replicator" version = "7.0.14" diff --git a/sqlite-watcher/Cargo.toml b/sqlite-watcher/Cargo.toml new file mode 100644 index 0000000..4152a5d --- /dev/null +++ b/sqlite-watcher/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "sqlite-watcher" +version = "0.1.0" +edition = "2021" +authors = ["SerenAI "] +description = "SQLite WAL tailer that streams change events to database-replicator." +license = "Apache-2.0" +repository = "https://github.com/serenorg/database-replicator" +readme = "README.md" + +[dependencies] +anyhow = "1.0" +clap = { version = "4.4", features = ["derive", "env"] } +dirs = "5.0" +rusqlite = "0.30" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } + +[dev-dependencies] +tempfile = "3.8" diff --git a/sqlite-watcher/README.md b/sqlite-watcher/README.md new file mode 100644 index 0000000..6c73ce4 --- /dev/null +++ b/sqlite-watcher/README.md @@ -0,0 +1,41 @@ +# sqlite-watcher + +`sqlite-watcher` tails SQLite WAL files and exposes change streams that `database-replicator` can consume for incremental syncs. The current milestone focuses on a CLI skeleton plus a WAL-growth watcher loop so we can exercise configuration, logging, and packaging before wiring in the change queue + gRPC service described in `docs/plans/sqlite-watcher-plan.md`. + +## Building + +```bash +cargo build -p sqlite-watcher +``` + +This crate participates in the main workspace, so `cargo build --workspace` or `cargo test --workspace` will also compile it. + +## CLI usage + +```bash +sqlite-watcher \ + --db /path/to/database.db \ + --listen unix:/tmp/sqlite-watcher.sock \ + --token-file ~/.seren/sqlite-watcher/token \ + --log-level info \ + --poll-interval-ms 250 \ + --min-event-bytes 4096 +``` + +Flag summary: + +- `--db` (required): SQLite file to monitor; must exist and be accessible in WAL mode. +- `--listen`: Listener endpoint; accepts `unix:/path`, `tcp:`, or `pipe:`. +- `--token-file`: Shared-secret used to authenticate gRPC clients (defaults to `~/.seren/sqlite-watcher/token`). +- `--log-level`: Tracing filter (also settable via `SQLITE_WATCHER_LOG`). +- `--poll-interval-ms`: How often to check the WAL file for growth (default 500 ms). Lower values react faster but cost more syscalls. +- `--min-event-bytes`: Minimum WAL byte growth before emitting an event. Use larger values to avoid spam when very small transactions occur. + +## Cross-platform notes + +- **Linux/macOS**: Default listener is a Unix domain socket at `/tmp/sqlite-watcher.sock`. Ensure the target SQLite database enables WAL journaling. +- **Windows**: Unix sockets are disabled; pass `--listen tcp:50051` or `--listen pipe:SerenWatcher`. Named pipes allow local service accounts without opening TCP ports. +- All platforms expect the token file to live under `~/.seren/sqlite-watcher/token` by default; create the directory with `0700` permissions so the watcher refuses to start if the secret is world-readable. +- The current WAL watcher polls the `*.sqlite-wal` file for byte growth. To keep WAL history available, configure your writers with `PRAGMA journal_mode=WAL;` and raise `wal_autocheckpoint` (or disable it) so the SQLite engine does not aggressively truncate the log. + +Additional design constraints and follow-up work items live in `docs/plans/sqlite-watcher-plan.md` and `docs/plans/sqlite-watcher-tickets.md`. diff --git a/sqlite-watcher/src/lib.rs b/sqlite-watcher/src/lib.rs new file mode 100644 index 0000000..dcd3d66 --- /dev/null +++ b/sqlite-watcher/src/lib.rs @@ -0,0 +1,2 @@ +pub mod queue; +pub mod wal; diff --git a/sqlite-watcher/src/main.rs b/sqlite-watcher/src/main.rs new file mode 100644 index 0000000..4f47c79 --- /dev/null +++ b/sqlite-watcher/src/main.rs @@ -0,0 +1,261 @@ +use std::fmt; +use std::path::{Path, PathBuf}; +use std::str::FromStr; +use std::sync::mpsc; +use std::time::Duration; + +use anyhow::{anyhow, bail, Context, Result}; +use clap::Parser; +use sqlite_watcher::wal::{start_wal_watcher, WalWatcherConfig as TailConfig}; +use tracing_subscriber::EnvFilter; + +#[cfg(unix)] +const DEFAULT_LISTEN: &str = "unix:/tmp/sqlite-watcher.sock"; +#[cfg(not(unix))] +const DEFAULT_LISTEN: &str = "tcp:50051"; + +/// Command-line interface definition for sqlite-watcher. +#[derive(Debug, Clone, Parser)] +#[command( + name = "sqlite-watcher", + version, + about = "Tails SQLite WAL files and exposes change streams.", + long_about = None +)] +struct Cli { + /// Path to the SQLite database the watcher should monitor. + #[arg(long = "db", value_name = "PATH")] + db_path: PathBuf, + + /// Listener binding. Accepts unix:/path, tcp:, or pipe:. + #[arg(long, value_name = "ENDPOINT", default_value = DEFAULT_LISTEN)] + listen: String, + + /// Shared-secret token file used to authenticate RPC clients. + #[arg(long = "token-file", value_name = "PATH")] + token_file: Option, + + /// Tracing filter (info,warn,debug,trace). Can also be provided via SQLITE_WATCHER_LOG. + #[arg( + long = "log-level", + value_name = "FILTER", + default_value = "info", + env = "SQLITE_WATCHER_LOG" + )] + log_filter: String, + + /// Interval in milliseconds between WAL file polls. + #[arg( + long = "poll-interval-ms", + default_value_t = 500, + value_parser = clap::value_parser!(u64).range(50..=60_000) + )] + poll_interval_ms: u64, + + /// Minimum WAL byte growth required before emitting an event. + #[arg( + long = "min-event-bytes", + default_value_t = 1, + value_parser = clap::value_parser!(u64).range(1..=10_000_000) + )] + min_event_bytes: u64, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +enum ListenAddress { + Unix(PathBuf), + Tcp { host: String, port: u16 }, + Pipe(String), +} + +impl fmt::Display for ListenAddress { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ListenAddress::Unix(path) => write!(f, "unix:{}", path.display()), + ListenAddress::Tcp { host, port } => write!(f, "tcp:{}:{}", host, port), + ListenAddress::Pipe(name) => write!(f, "pipe:{}", name), + } + } +} + +impl FromStr for ListenAddress { + type Err = anyhow::Error; + + fn from_str(value: &str) -> Result { + if let Some(path) = value.strip_prefix("unix:") { + if cfg!(windows) { + bail!("unix sockets are not supported on Windows"); + } + if path.is_empty() { + bail!("unix listen path cannot be empty"); + } + return Ok(ListenAddress::Unix(PathBuf::from(path))); + } + + if let Some(port) = value.strip_prefix("tcp:") { + let port: u16 = port + .parse() + .map_err(|_| anyhow!("tcp listener must specify a numeric port"))?; + return Ok(ListenAddress::Tcp { + host: "127.0.0.1".to_string(), + port, + }); + } + + if let Some(name) = value.strip_prefix("pipe:") { + if cfg!(not(windows)) { + bail!("named pipes are only valid on Windows"); + } + if name.is_empty() { + bail!("pipe name cannot be empty"); + } + return Ok(ListenAddress::Pipe(name.to_string())); + } + + bail!("listen endpoint must start with unix:/, tcp:, or pipe:"); + } +} + +#[derive(Debug, Clone)] +struct WatcherConfig { + database_path: PathBuf, + listen: ListenAddress, + token_file: PathBuf, + poll_interval: Duration, + min_event_bytes: u64, +} + +impl TryFrom for WatcherConfig { + type Error = anyhow::Error; + + fn try_from(args: Cli) -> Result { + let database_path = ensure_sqlite_file(&args.db_path)?; + let listen = ListenAddress::from_str(args.listen.trim())?; + let token_file = match args.token_file { + Some(path) => expand_home(path)?, + None => default_token_path()?, + }; + + Ok(Self { + database_path, + listen, + token_file, + poll_interval: Duration::from_millis(args.poll_interval_ms), + min_event_bytes: args.min_event_bytes, + }) + } +} + +fn ensure_sqlite_file(path: &Path) -> Result { + if !path.exists() { + bail!("database path {} does not exist", path.display()); + } + if !path.is_file() { + bail!("database path {} is not a file", path.display()); + } + Ok(path + .canonicalize() + .with_context(|| format!("failed to canonicalize {}", path.display()))?) +} + +fn default_token_path() -> Result { + let home = dirs::home_dir().ok_or_else(|| anyhow!("unable to determine home directory"))?; + Ok(home.join(".seren/sqlite-watcher/token")) +} + +fn expand_home(path: PathBuf) -> Result { + let as_str = path.to_string_lossy(); + if let Some(stripped) = as_str.strip_prefix("~/") { + let home = dirs::home_dir().ok_or_else(|| anyhow!("unable to determine home directory"))?; + return Ok(home.join(stripped)); + } + if as_str == "~" { + let home = dirs::home_dir().ok_or_else(|| anyhow!("unable to determine home directory"))?; + return Ok(home); + } + Ok(path) +} + +fn init_tracing(filter: &str) -> Result<()> { + let env_filter = EnvFilter::try_new(filter).or_else(|_| EnvFilter::try_new("info"))?; + tracing_subscriber::fmt() + .with_env_filter(env_filter) + .with_target(false) + .try_init() + .map_err(|err| anyhow!("failed to init tracing subscriber: {err}")) +} + +fn main() -> Result<()> { + let cli = Cli::parse(); + init_tracing(&cli.log_filter)?; + let config = WatcherConfig::try_from(cli)?; + + tracing::info!( + db = %config.database_path.display(), + listen = %config.listen, + token = %config.token_file.display(), + poll_ms = config.poll_interval.as_millis(), + min_event_bytes = config.min_event_bytes, + "sqlite-watcher starting" + ); + + let (event_tx, event_rx) = mpsc::channel(); + let _wal_handle = start_wal_watcher( + &config.database_path, + TailConfig { + poll_interval: config.poll_interval, + min_event_bytes: config.min_event_bytes, + }, + event_tx, + )?; + + for event in event_rx { + tracing::info!( + bytes_added = event.bytes_added, + wal_size = event.current_size, + "wal file grew" + ); + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use clap::Parser; + + #[test] + fn parses_tcp_listener() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + let cli = Cli::try_parse_from([ + "sqlite-watcher", + "--db", + tmp.path().to_str().unwrap(), + "--listen", + "tcp:6000", + "--token-file", + "./token", + "--log-level", + "debug", + ]) + .expect("cli parsing failed"); + + let config = WatcherConfig::try_from(cli).expect("config conversion failed"); + assert!(matches!( + config.listen, + ListenAddress::Tcp { host, port } if host == "127.0.0.1" && port == 6000 + )); + assert!(config.token_file.ends_with("token")); + } + + #[test] + #[cfg(unix)] + fn parses_unix_listener_default() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + let cli = + Cli::try_parse_from(["sqlite-watcher", "--db", tmp.path().to_str().unwrap()]).unwrap(); + let config = WatcherConfig::try_from(cli).unwrap(); + assert!(matches!(config.listen, ListenAddress::Unix(_))); + } +} diff --git a/sqlite-watcher/src/queue.rs b/sqlite-watcher/src/queue.rs new file mode 100644 index 0000000..62d40ad --- /dev/null +++ b/sqlite-watcher/src/queue.rs @@ -0,0 +1,222 @@ +use std::fs; +use std::path::{Path, PathBuf}; + +use anyhow::{anyhow, Context, Result}; +use rusqlite::{params, Connection, OptionalExtension}; + +const SCHEMA: &str = r#" +CREATE TABLE IF NOT EXISTS changes ( + change_id INTEGER PRIMARY KEY AUTOINCREMENT, + table_name TEXT NOT NULL, + op TEXT NOT NULL, + id TEXT NOT NULL, + payload BLOB, + wal_frame TEXT, + cursor TEXT, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + acked INTEGER NOT NULL DEFAULT 0 +); + +CREATE TABLE IF NOT EXISTS state ( + table_name TEXT PRIMARY KEY, + last_change_id INTEGER NOT NULL DEFAULT 0, + last_wal_frame TEXT, + cursor TEXT, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP +); +"#; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ChangeOperation { + Insert, + Update, + Delete, +} + +impl ChangeOperation { + pub fn as_str(&self) -> &'static str { + match self { + ChangeOperation::Insert => "insert", + ChangeOperation::Update => "update", + ChangeOperation::Delete => "delete", + } + } + + fn from_str(value: &str) -> Result { + match value { + "insert" => Ok(ChangeOperation::Insert), + "update" => Ok(ChangeOperation::Update), + "delete" => Ok(ChangeOperation::Delete), + other => Err(anyhow!("unknown change op: {other}")), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct NewChange { + pub table_name: String, + pub operation: ChangeOperation, + pub primary_key: String, + pub payload: Option>, + pub wal_frame: Option, + pub cursor: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ChangeRecord { + pub change_id: i64, + pub table_name: String, + pub operation: ChangeOperation, + pub primary_key: String, + pub payload: Option>, + pub wal_frame: Option, + pub cursor: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct QueueState { + pub table_name: String, + pub last_change_id: i64, + pub last_wal_frame: Option, + pub cursor: Option, +} + +pub struct ChangeQueue { + path: PathBuf, + conn: Connection, +} + +impl ChangeQueue { + pub fn open(path: impl AsRef) -> Result { + let path = path.as_ref(); + if let Some(parent) = path.parent() { + fs::create_dir_all(parent).with_context(|| { + format!("failed to create queue directory {}", parent.display()) + })?; + #[cfg(unix)] + set_owner_perms(parent)?; + } + let conn = Connection::open(path) + .with_context(|| format!("failed to open queue database {}", path.display()))?; + conn.pragma_update(None, "journal_mode", &"wal") + .context("failed to enable WAL for change queue")?; + conn.pragma_update(None, "synchronous", &"normal").ok(); + conn.execute_batch(SCHEMA) + .context("failed to initialize queue schema")?; + Ok(Self { + path: path.to_path_buf(), + conn, + }) + } + + pub fn enqueue(&self, change: &NewChange) -> Result { + self.conn.execute( + "INSERT INTO changes(table_name, op, id, payload, wal_frame, cursor) VALUES (?1, ?2, ?3, ?4, ?5, ?6)", + params![ + change.table_name, + change.operation.as_str(), + change.primary_key, + change.payload, + change.wal_frame, + change.cursor, + ], + )?; + Ok(self.conn.last_insert_rowid()) + } + + pub fn fetch_batch(&self, limit: usize) -> Result> { + let mut stmt = self.conn.prepare( + "SELECT change_id, table_name, op, id, payload, wal_frame, cursor + FROM changes + WHERE acked = 0 + ORDER BY change_id ASC + LIMIT ?1", + )?; + let mut rows = stmt.query([limit as i64])?; + let mut out = Vec::new(); + while let Some(row) = rows.next()? { + let op_str: String = row.get(2)?; + out.push(ChangeRecord { + change_id: row.get(0)?, + table_name: row.get(1)?, + operation: ChangeOperation::from_str(&op_str)?, + primary_key: row.get(3)?, + payload: row.get(4)?, + wal_frame: row.get(5)?, + cursor: row.get(6)?, + }); + } + Ok(out) + } + + pub fn ack_up_to(&self, change_id: i64) -> Result { + let updated = self.conn.execute( + "UPDATE changes SET acked = 1 WHERE change_id <= ?1", + [change_id], + )?; + Ok(updated as u64) + } + + pub fn vacuum_acknowledged(&self) -> Result { + let deleted = self + .conn + .execute("DELETE FROM changes WHERE acked = 1", [])?; + Ok(deleted as u64) + } + + pub fn get_state(&self, table_name: &str) -> Result> { + self.conn + .prepare( + "SELECT table_name, last_change_id, last_wal_frame, cursor + FROM state WHERE table_name = ?1", + )? + .query_row([table_name], |row| { + Ok(QueueState { + table_name: row.get(0)?, + last_change_id: row.get(1)?, + last_wal_frame: row.get(2)?, + cursor: row.get(3)?, + }) + }) + .optional() + .map_err(Into::into) + } + + pub fn set_state(&self, state: &QueueState) -> Result<()> { + self.conn.execute( + "INSERT INTO state(table_name, last_change_id, last_wal_frame, cursor, updated_at) + VALUES (?1, ?2, ?3, ?4, CURRENT_TIMESTAMP) + ON CONFLICT(table_name) DO UPDATE SET + last_change_id = excluded.last_change_id, + last_wal_frame = excluded.last_wal_frame, + cursor = excluded.cursor, + updated_at = CURRENT_TIMESTAMP", + params![ + state.table_name, + state.last_change_id, + state.last_wal_frame, + state.cursor, + ], + )?; + Ok(()) + } + + pub fn path(&self) -> &Path { + &self.path + } +} + +#[cfg(unix)] +fn set_owner_perms(path: &Path) -> Result<()> { + use std::os::unix::fs::PermissionsExt; + let metadata = fs::metadata(path)?; + let mut perms = metadata.permissions(); + perms.set_mode(0o700); + fs::set_permissions(path, perms)?; + Ok(()) +} + +#[cfg(not(unix))] +fn set_owner_perms(_path: &Path) -> Result<()> { + Ok(()) +} diff --git a/sqlite-watcher/src/wal.rs b/sqlite-watcher/src/wal.rs new file mode 100644 index 0000000..13472c0 --- /dev/null +++ b/sqlite-watcher/src/wal.rs @@ -0,0 +1,236 @@ +use std::ffi::OsString; +use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::mpsc::Sender; +use std::sync::Arc; +use std::thread::{self, JoinHandle}; +use std::time::Duration; + +use anyhow::{Context, Result}; +use tracing::{debug, warn}; + +#[derive(Debug, Clone, Copy)] +pub struct WalWatcherConfig { + pub poll_interval: Duration, + pub min_event_bytes: u64, +} + +impl Default for WalWatcherConfig { + fn default() -> Self { + Self { + poll_interval: Duration::from_millis(500), + min_event_bytes: 0, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct WalEvent { + pub bytes_added: u64, + pub current_size: u64, +} + +pub struct WalWatcherHandle { + stop: Arc, + thread: Option>, +} + +impl Drop for WalWatcherHandle { + fn drop(&mut self) { + self.stop.store(true, Ordering::SeqCst); + if let Some(handle) = self.thread.take() { + let _ = handle.join(); + } + } +} + +pub fn start_wal_watcher>( + db_path: P, + options: WalWatcherConfig, + sender: Sender, +) -> Result { + let db_path = db_path.as_ref().canonicalize().with_context(|| { + format!( + "failed to canonicalize database path {}", + db_path.as_ref().display() + ) + })?; + if !db_path.is_file() { + anyhow::bail!("database path {} is not a file", db_path.display()); + } + + let wal_path = wal_file_path(&db_path); + let poll_interval = options.poll_interval; + let min_event_bytes = options.min_event_bytes; + let stop_flag = Arc::new(AtomicBool::new(false)); + let thread_stop = Arc::clone(&stop_flag); + + let handle = thread::spawn(move || { + let mut last_len = wal_file_size(&wal_path).unwrap_or(0); + debug!( + wal = %wal_path.display(), + last_len, + "wal watcher started" + ); + while !thread_stop.load(Ordering::SeqCst) { + match wal_file_size(&wal_path) { + Ok(len) => { + if len < last_len { + debug!( + wal = %wal_path.display(), + prev = last_len, + current = len, + "wal truncated; resetting baseline" + ); + last_len = len; + } else if len > last_len { + let delta = len - last_len; + last_len = len; + if delta >= min_event_bytes { + let event = WalEvent { + bytes_added: delta, + current_size: len, + }; + if sender.send(event).is_err() { + debug!("wal watcher stopping because receiver closed"); + break; + } + } + } + } + Err(err) => { + if err.kind() == std::io::ErrorKind::NotFound { + last_len = 0; + } else { + warn!( + wal = %wal_path.display(), + error = %err, + "failed to read wal metadata" + ); + } + } + } + + thread::sleep(poll_interval); + } + + debug!("wal watcher exiting"); + }); + + Ok(WalWatcherHandle { + stop: stop_flag, + thread: Some(handle), + }) +} + +fn wal_file_path(db_path: &Path) -> PathBuf { + let mut os_string = OsString::from(db_path.as_os_str()); + os_string.push("-wal"); + PathBuf::from(os_string) +} + +fn wal_file_size(path: &Path) -> std::io::Result { + std::fs::metadata(path).map(|m| m.len()) +} + +#[cfg(test)] +mod tests { + use super::*; + use rusqlite::Connection; + use std::sync::mpsc::channel; + use std::time::{Duration, Instant}; + use tempfile::tempdir; + + #[test] + fn emits_event_when_wal_grows() { + let dir = tempdir().unwrap(); + let db_path = dir.path().join("watch.sqlite"); + let writer = Connection::open(&db_path).unwrap(); + writer.pragma_update(None, "journal_mode", &"wal").unwrap(); + writer + .pragma_update(None, "wal_autocheckpoint", &0i64) + .unwrap(); + writer + .execute( + "CREATE TABLE changes(id INTEGER PRIMARY KEY, value TEXT)", + [], + ) + .unwrap(); + + let (tx, rx) = channel(); + let handle = start_wal_watcher( + &db_path, + WalWatcherConfig { + poll_interval: Duration::from_millis(50), + min_event_bytes: 1, + }, + tx, + ) + .unwrap(); + + for i in 0..50 { + writer + .execute( + "INSERT INTO changes(value) VALUES (?1)", + [format!("value-{i}")], + ) + .unwrap(); + } + + let event = rx.recv_timeout(Duration::from_secs(5)).unwrap(); + assert!(event.bytes_added > 0); + assert!(event.current_size >= event.bytes_added); + + drop(handle); + } + + #[test] + fn handles_wal_truncation() { + let dir = tempdir().unwrap(); + let db_path = dir.path().join("truncate.sqlite"); + let writer = Connection::open(&db_path).unwrap(); + writer.pragma_update(None, "journal_mode", &"wal").unwrap(); + writer + .pragma_update(None, "wal_autocheckpoint", &0i64) + .unwrap(); + writer + .execute("CREATE TABLE stuff(id INTEGER PRIMARY KEY, value TEXT)", []) + .unwrap(); + + let (tx, rx) = channel(); + let handle = start_wal_watcher( + &db_path, + WalWatcherConfig { + poll_interval: Duration::from_millis(25), + min_event_bytes: 1, + }, + tx, + ) + .unwrap(); + + for i in 0..10 { + writer + .execute("INSERT INTO stuff(value) VALUES (?1)", [format!("row-{i}")]) + .unwrap(); + } + + rx.recv_timeout(Duration::from_secs(5)).unwrap(); + + writer + .execute_batch("PRAGMA wal_checkpoint(TRUNCATE);") + .unwrap(); + + // Ensure watcher does not send negative deltas (would panic or overflow) + let start = Instant::now(); + loop { + if rx.recv_timeout(Duration::from_millis(100)).is_ok() { + break; + } + if start.elapsed() > Duration::from_millis(500) { + break; + } + } + + drop(handle); + } +} diff --git a/sqlite-watcher/tests/queue_tests.rs b/sqlite-watcher/tests/queue_tests.rs new file mode 100644 index 0000000..75b38e3 --- /dev/null +++ b/sqlite-watcher/tests/queue_tests.rs @@ -0,0 +1,72 @@ +use sqlite_watcher::queue::{ChangeOperation, ChangeQueue, NewChange, QueueState}; +use tempfile::tempdir; + +fn new_change(table: &str, pk: &str, op: ChangeOperation) -> NewChange { + NewChange { + table_name: table.to_string(), + operation: op, + primary_key: pk.to_string(), + payload: Some(format!("payload-{pk}").into_bytes()), + wal_frame: Some("0001".to_string()), + cursor: Some("cursor-1".to_string()), + } +} + +#[test] +fn queue_persists_changes_and_ack_flow() { + let dir = tempdir().unwrap(); + let queue_path = dir.path().join("changes.db"); + let queue = ChangeQueue::open(&queue_path).unwrap(); + + let mut ids = Vec::new(); + for idx in 0..3 { + let change = new_change("vaults", &format!("pk-{idx}"), ChangeOperation::Insert); + ids.push(queue.enqueue(&change).unwrap()); + } + + let batch = queue.fetch_batch(10).unwrap(); + assert_eq!(batch.len(), 3); + assert_eq!(batch[0].change_id, ids[0]); + assert_eq!(batch[1].primary_key, "pk-1"); + + queue.ack_up_to(ids[0]).unwrap(); + let batch = queue.fetch_batch(10).unwrap(); + assert_eq!(batch.len(), 2); + assert_eq!(batch[0].change_id, ids[1]); + + let removed = queue.vacuum_acknowledged().unwrap(); + assert_eq!(removed, 1); + drop(queue); + + // Reopen to ensure durability. + let queue = ChangeQueue::open(&queue_path).unwrap(); + let batch = queue.fetch_batch(10).unwrap(); + assert_eq!(batch.len(), 2); + assert_eq!(batch[0].change_id, ids[1]); +} + +#[test] +fn queue_state_round_trip() { + let dir = tempdir().unwrap(); + let queue_path = dir.path().join("state.db"); + let queue = ChangeQueue::open(&queue_path).unwrap(); + + assert!(queue.get_state("prices").unwrap().is_none()); + let state = QueueState { + table_name: "prices".to_string(), + last_change_id: 42, + last_wal_frame: Some("abcdef".to_string()), + cursor: Some(r#"{"timestamp":"2024-01-01T00:00:00Z"}"#.to_string()), + }; + queue.set_state(&state).unwrap(); + let fetched = queue.get_state("prices").unwrap().unwrap(); + assert_eq!(fetched, state); + + let updated = QueueState { + last_change_id: 55, + ..state + }; + queue.set_state(&updated).unwrap(); + let fetched = queue.get_state("prices").unwrap().unwrap(); + assert_eq!(fetched.last_change_id, 55); +} From 5188e44c6685e37f66e373648299556739f47a52 Mon Sep 17 00:00:00 2001 From: Taariq Lewis <701864+taariq@users.noreply.github.com> Date: Fri, 12 Dec 2025 10:48:16 -0800 Subject: [PATCH 2/5] [sqlite-watcher] add RowChange abstraction and queue wiring --- Cargo.lock | 1 + sqlite-watcher/Cargo.toml | 1 + sqlite-watcher/README.md | 7 +++ sqlite-watcher/src/change.rs | 49 ++++++++++++++++++++ sqlite-watcher/src/lib.rs | 1 + sqlite-watcher/src/main.rs | 87 ++++++++++++++++++++++++++++++++---- 6 files changed, 137 insertions(+), 9 deletions(-) create mode 100644 sqlite-watcher/src/change.rs diff --git a/Cargo.lock b/Cargo.lock index db13330..a56110a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3063,6 +3063,7 @@ dependencies = [ "clap", "dirs", "rusqlite", + "serde_json", "tempfile", "tracing", "tracing-subscriber", diff --git a/sqlite-watcher/Cargo.toml b/sqlite-watcher/Cargo.toml index 4152a5d..03dcf61 100644 --- a/sqlite-watcher/Cargo.toml +++ b/sqlite-watcher/Cargo.toml @@ -13,6 +13,7 @@ anyhow = "1.0" clap = { version = "4.4", features = ["derive", "env"] } dirs = "5.0" rusqlite = "0.30" +serde_json = "1.0" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/sqlite-watcher/README.md b/sqlite-watcher/README.md index 6c73ce4..36459c4 100644 --- a/sqlite-watcher/README.md +++ b/sqlite-watcher/README.md @@ -18,6 +18,7 @@ sqlite-watcher \ --listen unix:/tmp/sqlite-watcher.sock \ --token-file ~/.seren/sqlite-watcher/token \ --log-level info \ + --queue-db ~/.seren/sqlite-watcher/changes.db \ --poll-interval-ms 250 \ --min-event-bytes 4096 ``` @@ -27,6 +28,7 @@ Flag summary: - `--db` (required): SQLite file to monitor; must exist and be accessible in WAL mode. - `--listen`: Listener endpoint; accepts `unix:/path`, `tcp:`, or `pipe:`. - `--token-file`: Shared-secret used to authenticate gRPC clients (defaults to `~/.seren/sqlite-watcher/token`). +- `--queue-db`: SQLite file used to persist change events + checkpoints (defaults to `~/.seren/sqlite-watcher/changes.db`). - `--log-level`: Tracing filter (also settable via `SQLITE_WATCHER_LOG`). - `--poll-interval-ms`: How often to check the WAL file for growth (default 500 ms). Lower values react faster but cost more syscalls. - `--min-event-bytes`: Minimum WAL byte growth before emitting an event. Use larger values to avoid spam when very small transactions occur. @@ -37,5 +39,10 @@ Flag summary: - **Windows**: Unix sockets are disabled; pass `--listen tcp:50051` or `--listen pipe:SerenWatcher`. Named pipes allow local service accounts without opening TCP ports. - All platforms expect the token file to live under `~/.seren/sqlite-watcher/token` by default; create the directory with `0700` permissions so the watcher refuses to start if the secret is world-readable. - The current WAL watcher polls the `*.sqlite-wal` file for byte growth. To keep WAL history available, configure your writers with `PRAGMA journal_mode=WAL;` and raise `wal_autocheckpoint` (or disable it) so the SQLite engine does not aggressively truncate the log. +- Change queue data is stored under `~/.seren/sqlite-watcher/changes.db`. The binary enforces owner-only permissions on that directory to keep tokens + change data private. + +### Placeholder change format + +Until the WAL decoder lands, each growth event is recorded as a `RowChange` with `table_name="__wal__"`, `operation=insert`, and a JSON payload describing the byte delta + timestamp. Downstream components can treat these as heartbeats while we finish Tickets B–D. Additional design constraints and follow-up work items live in `docs/plans/sqlite-watcher-plan.md` and `docs/plans/sqlite-watcher-tickets.md`. diff --git a/sqlite-watcher/src/change.rs b/sqlite-watcher/src/change.rs new file mode 100644 index 0000000..64059d7 --- /dev/null +++ b/sqlite-watcher/src/change.rs @@ -0,0 +1,49 @@ +use serde_json::Value; + +use crate::queue::{ChangeOperation, NewChange}; + +#[derive(Debug, Clone, PartialEq)] +pub struct RowChange { + pub table_name: String, + pub operation: ChangeOperation, + pub primary_key: String, + pub payload: Option, + pub wal_frame: Option, + pub cursor: Option, +} + +impl RowChange { + pub fn into_new_change(self) -> NewChange { + let payload = self + .payload + .map(|value| serde_json::to_vec(&value).expect("row change payload serializes")); + NewChange { + table_name: self.table_name, + operation: self.operation, + primary_key: self.primary_key, + payload, + wal_frame: self.wal_frame, + cursor: self.cursor, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn converts_to_new_change() { + let row = RowChange { + table_name: "prices".into(), + operation: ChangeOperation::Update, + primary_key: "pk1".into(), + payload: Some(serde_json::json!({"foo": "bar"})), + wal_frame: Some("frame-1".into()), + cursor: Some("cursor".into()), + }; + let change = row.into_new_change(); + assert_eq!(change.table_name, "prices"); + assert!(change.payload.unwrap().contains(&b'b')); + } +} diff --git a/sqlite-watcher/src/lib.rs b/sqlite-watcher/src/lib.rs index dcd3d66..29d96aa 100644 --- a/sqlite-watcher/src/lib.rs +++ b/sqlite-watcher/src/lib.rs @@ -1,2 +1,3 @@ +pub mod change; pub mod queue; pub mod wal; diff --git a/sqlite-watcher/src/main.rs b/sqlite-watcher/src/main.rs index 4f47c79..9e698fa 100644 --- a/sqlite-watcher/src/main.rs +++ b/sqlite-watcher/src/main.rs @@ -2,11 +2,14 @@ use std::fmt; use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::mpsc; -use std::time::Duration; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use anyhow::{anyhow, bail, Context, Result}; use clap::Parser; -use sqlite_watcher::wal::{start_wal_watcher, WalWatcherConfig as TailConfig}; +use serde_json::json; +use sqlite_watcher::change::RowChange; +use sqlite_watcher::queue::{ChangeOperation, ChangeQueue}; +use sqlite_watcher::wal::{start_wal_watcher, WalEvent, WalWatcherConfig as TailConfig}; use tracing_subscriber::EnvFilter; #[cfg(unix)] @@ -35,6 +38,10 @@ struct Cli { #[arg(long = "token-file", value_name = "PATH")] token_file: Option, + /// Path to the durable change queue database. + #[arg(long = "queue-db", value_name = "PATH")] + queue_db: Option, + /// Tracing filter (info,warn,debug,trace). Can also be provided via SQLITE_WATCHER_LOG. #[arg( long = "log-level", @@ -121,6 +128,7 @@ struct WatcherConfig { database_path: PathBuf, listen: ListenAddress, token_file: PathBuf, + queue_path: PathBuf, poll_interval: Duration, min_event_bytes: u64, } @@ -135,11 +143,16 @@ impl TryFrom for WatcherConfig { Some(path) => expand_home(path)?, None => default_token_path()?, }; + let queue_path = match args.queue_db { + Some(path) => expand_home(path)?, + None => default_queue_path()?, + }; Ok(Self { database_path, listen, token_file, + queue_path, poll_interval: Duration::from_millis(args.poll_interval_ms), min_event_bytes: args.min_event_bytes, }) @@ -163,6 +176,11 @@ fn default_token_path() -> Result { Ok(home.join(".seren/sqlite-watcher/token")) } +fn default_queue_path() -> Result { + let home = dirs::home_dir().ok_or_else(|| anyhow!("unable to determine home directory"))?; + Ok(home.join(".seren/sqlite-watcher/changes.db")) +} + fn expand_home(path: PathBuf) -> Result { let as_str = path.to_string_lossy(); if let Some(stripped) = as_str.strip_prefix("~/") { @@ -194,11 +212,13 @@ fn main() -> Result<()> { db = %config.database_path.display(), listen = %config.listen, token = %config.token_file.display(), + queue = %config.queue_path.display(), poll_ms = config.poll_interval.as_millis(), min_event_bytes = config.min_event_bytes, "sqlite-watcher starting" ); + let queue = ChangeQueue::open(&config.queue_path)?; let (event_tx, event_rx) = mpsc::channel(); let _wal_handle = start_wal_watcher( &config.database_path, @@ -210,24 +230,55 @@ fn main() -> Result<()> { )?; for event in event_rx { - tracing::info!( - bytes_added = event.bytes_added, - wal_size = event.current_size, - "wal file grew" - ); + match persist_wal_event(&queue, &event) { + Ok(change_id) => { + tracing::info!( + bytes_added = event.bytes_added, + wal_size = event.current_size, + change_id, + "queued wal growth event" + ); + } + Err(err) => { + tracing::warn!(error = %err, "failed to persist wal event to queue"); + } + } } Ok(()) } +fn persist_wal_event(queue: &ChangeQueue, event: &WalEvent) -> Result { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_err(|err| anyhow!("system clock drift: {err}"))?; + let row = RowChange { + table_name: "__wal__".to_string(), + operation: ChangeOperation::Insert, + primary_key: now.as_nanos().to_string(), + payload: Some(json!({ + "kind": "wal_growth", + "bytes_added": event.bytes_added, + "current_size": event.current_size, + "recorded_at": now.as_secs_f64(), + })), + wal_frame: None, + cursor: None, + }; + queue.enqueue(&row.into_new_change()) +} + #[cfg(test)] mod tests { use super::*; use clap::Parser; + use sqlite_watcher::queue::ChangeQueue; + use sqlite_watcher::wal::WalEvent; + use tempfile::{tempdir, NamedTempFile}; #[test] fn parses_tcp_listener() { - let tmp = tempfile::NamedTempFile::new().unwrap(); + let tmp = NamedTempFile::new().unwrap(); let cli = Cli::try_parse_from([ "sqlite-watcher", "--db", @@ -247,15 +298,33 @@ mod tests { ListenAddress::Tcp { host, port } if host == "127.0.0.1" && port == 6000 )); assert!(config.token_file.ends_with("token")); + assert!(config.queue_path.ends_with("changes.db")); } #[test] #[cfg(unix)] fn parses_unix_listener_default() { - let tmp = tempfile::NamedTempFile::new().unwrap(); + let tmp = NamedTempFile::new().unwrap(); let cli = Cli::try_parse_from(["sqlite-watcher", "--db", tmp.path().to_str().unwrap()]).unwrap(); let config = WatcherConfig::try_from(cli).unwrap(); assert!(matches!(config.listen, ListenAddress::Unix(_))); } + + #[test] + fn persist_wal_events_into_queue() { + let dir = tempdir().unwrap(); + let queue_path = dir.path().join("queue.db"); + let queue = ChangeQueue::open(&queue_path).unwrap(); + + let event = WalEvent { + bytes_added: 2048, + current_size: 4096, + }; + let change_id = persist_wal_event(&queue, &event).unwrap(); + let batch = queue.fetch_batch(10).unwrap(); + assert_eq!(batch.len(), 1); + assert_eq!(batch[0].change_id, change_id); + assert_eq!(batch[0].table_name, "__wal__"); + } } From 21c715010ccf5612dd95f9966a2f98cdb251f059 Mon Sep 17 00:00:00 2001 From: Taariq Lewis <701864+taariq@users.noreply.github.com> Date: Fri, 12 Dec 2025 10:50:01 -0800 Subject: [PATCH 3/5] [sqlite-watcher] introduce wal growth decoder --- sqlite-watcher/src/decoder.rs | 48 +++++++++++++++++++++++++++++++ sqlite-watcher/src/lib.rs | 1 + sqlite-watcher/src/main.rs | 54 +++++++++++++++-------------------- 3 files changed, 72 insertions(+), 31 deletions(-) create mode 100644 sqlite-watcher/src/decoder.rs diff --git a/sqlite-watcher/src/decoder.rs b/sqlite-watcher/src/decoder.rs new file mode 100644 index 0000000..9da5268 --- /dev/null +++ b/sqlite-watcher/src/decoder.rs @@ -0,0 +1,48 @@ +use crate::change::RowChange; +use crate::queue::ChangeOperation; +use crate::wal::WalEvent; +use serde_json::json; +use std::time::{SystemTime, UNIX_EPOCH}; + +/// Temporary decoder that turns WAL growth bytes into placeholder RowChange events. +/// Placeholder until row-level decoding is implemented. +#[derive(Debug, Default, Clone)] +pub struct WalGrowthDecoder; + +impl WalGrowthDecoder { + pub fn decode(&self, event: &WalEvent) -> Vec { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("clock should be >= UNIX epoch"); + vec![RowChange { + table_name: "__wal__".to_string(), + operation: ChangeOperation::Insert, + primary_key: now.as_nanos().to_string(), + payload: Some(json!({ + "kind": "wal_growth", + "bytes_added": event.bytes_added, + "current_size": event.current_size, + "recorded_at": now.as_secs_f64(), + })), + wal_frame: None, + cursor: None, + }] + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn produces_placeholder_row_change() { + let decoder = WalGrowthDecoder::default(); + let rows = decoder.decode(&WalEvent { + bytes_added: 1024, + current_size: 2048, + }); + assert_eq!(rows.len(), 1); + assert_eq!(rows[0].table_name, "__wal__"); + assert_eq!(rows[0].operation, ChangeOperation::Insert); + } +} diff --git a/sqlite-watcher/src/lib.rs b/sqlite-watcher/src/lib.rs index 29d96aa..ede4d33 100644 --- a/sqlite-watcher/src/lib.rs +++ b/sqlite-watcher/src/lib.rs @@ -1,3 +1,4 @@ pub mod change; +pub mod decoder; pub mod queue; pub mod wal; diff --git a/sqlite-watcher/src/main.rs b/sqlite-watcher/src/main.rs index 9e698fa..2dd704f 100644 --- a/sqlite-watcher/src/main.rs +++ b/sqlite-watcher/src/main.rs @@ -2,14 +2,13 @@ use std::fmt; use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::mpsc; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::time::Duration; use anyhow::{anyhow, bail, Context, Result}; use clap::Parser; -use serde_json::json; -use sqlite_watcher::change::RowChange; -use sqlite_watcher::queue::{ChangeOperation, ChangeQueue}; -use sqlite_watcher::wal::{start_wal_watcher, WalEvent, WalWatcherConfig as TailConfig}; +use sqlite_watcher::decoder::WalGrowthDecoder; +use sqlite_watcher::queue::ChangeQueue; +use sqlite_watcher::wal::{start_wal_watcher, WalWatcherConfig as TailConfig}; use tracing_subscriber::EnvFilter; #[cfg(unix)] @@ -219,6 +218,7 @@ fn main() -> Result<()> { ); let queue = ChangeQueue::open(&config.queue_path)?; + let decoder = WalGrowthDecoder::default(); let (event_tx, event_rx) = mpsc::channel(); let _wal_handle = start_wal_watcher( &config.database_path, @@ -230,42 +230,35 @@ fn main() -> Result<()> { )?; for event in event_rx { - match persist_wal_event(&queue, &event) { - Ok(change_id) => { + match process_wal_event(&decoder, &queue, &event) { + Ok(change_ids) if !change_ids.is_empty() => { tracing::info!( bytes_added = event.bytes_added, wal_size = event.current_size, - change_id, + change_count = change_ids.len(), "queued wal growth event" ); } Err(err) => { tracing::warn!(error = %err, "failed to persist wal event to queue"); } + _ => {} } } Ok(()) } -fn persist_wal_event(queue: &ChangeQueue, event: &WalEvent) -> Result { - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .map_err(|err| anyhow!("system clock drift: {err}"))?; - let row = RowChange { - table_name: "__wal__".to_string(), - operation: ChangeOperation::Insert, - primary_key: now.as_nanos().to_string(), - payload: Some(json!({ - "kind": "wal_growth", - "bytes_added": event.bytes_added, - "current_size": event.current_size, - "recorded_at": now.as_secs_f64(), - })), - wal_frame: None, - cursor: None, - }; - queue.enqueue(&row.into_new_change()) +fn process_wal_event( + decoder: &WalGrowthDecoder, + queue: &ChangeQueue, + event: &sqlite_watcher::wal::WalEvent, +) -> Result> { + let mut ids = Vec::new(); + for row in decoder.decode(event) { + ids.push(queue.enqueue(&row.into_new_change())?); + } + Ok(ids) } #[cfg(test)] @@ -273,7 +266,6 @@ mod tests { use super::*; use clap::Parser; use sqlite_watcher::queue::ChangeQueue; - use sqlite_watcher::wal::WalEvent; use tempfile::{tempdir, NamedTempFile}; #[test] @@ -316,15 +308,15 @@ mod tests { let dir = tempdir().unwrap(); let queue_path = dir.path().join("queue.db"); let queue = ChangeQueue::open(&queue_path).unwrap(); + let decoder = WalGrowthDecoder::default(); - let event = WalEvent { + let event = sqlite_watcher::wal::WalEvent { bytes_added: 2048, current_size: 4096, }; - let change_id = persist_wal_event(&queue, &event).unwrap(); + let change_ids = process_wal_event(&decoder, &queue, &event).unwrap(); let batch = queue.fetch_batch(10).unwrap(); - assert_eq!(batch.len(), 1); - assert_eq!(batch[0].change_id, change_id); + assert_eq!(batch.len(), change_ids.len()); assert_eq!(batch[0].table_name, "__wal__"); } } From f079d2a6e05f3fef059d64b59bbf5da878b254ab Mon Sep 17 00:00:00 2001 From: Taariq Lewis <701864+taariq@users.noreply.github.com> Date: Fri, 12 Dec 2025 10:58:03 -0800 Subject: [PATCH 4/5] [sqlite-watcher] add tonic-based watcher server --- Cargo.lock | 295 ++++++++++++++++++++++++++- sqlite-watcher/Cargo.toml | 9 + sqlite-watcher/README.md | 1 + sqlite-watcher/build.rs | 8 + sqlite-watcher/proto/watcher.proto | 62 ++++++ sqlite-watcher/src/lib.rs | 5 + sqlite-watcher/src/main.rs | 43 ++++ sqlite-watcher/src/server.rs | 222 ++++++++++++++++++++ sqlite-watcher/tests/server_tests.rs | 32 +++ 9 files changed, 670 insertions(+), 7 deletions(-) create mode 100644 sqlite-watcher/build.rs create mode 100644 sqlite-watcher/proto/watcher.proto create mode 100644 sqlite-watcher/src/server.rs create mode 100644 sqlite-watcher/tests/server_tests.rs diff --git a/Cargo.lock b/Cargo.lock index a56110a..36ed654 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -118,6 +118,28 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.108", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -135,6 +157,51 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "base64" version = "0.21.7" @@ -169,7 +236,7 @@ dependencies = [ "bitflags 2.10.0", "cexpr", "clang-sys", - "itertools", + "itertools 0.13.0", "proc-macro2", "quote", "regex", @@ -246,7 +313,7 @@ dependencies = [ "getrandom 0.2.16", "getrandom 0.3.4", "hex", - "indexmap", + "indexmap 2.12.0", "js-sys", "once_cell", "rand 0.9.2", @@ -917,6 +984,12 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52051878f80a721bb68ebfbc930e07b65ba72f2da88968ea5c06fd6ca3d3a127" +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "flate2" version = "1.1.5" @@ -1188,7 +1261,7 @@ dependencies = [ "futures-sink", "futures-util", "http", - "indexmap", + "indexmap 2.12.0", "slab", "tokio", "tokio-util", @@ -1374,6 +1447,18 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -1519,6 +1604,16 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "indexmap" version = "2.12.0" @@ -1583,6 +1678,15 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.13.0" @@ -1624,7 +1728,7 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ee7893dab2e44ae5f9d0173f26ff4aa327c10b01b06a72b52dd9405b628640d" dependencies = [ - "indexmap", + "indexmap 2.12.0", ] [[package]] @@ -1786,6 +1890,12 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "md-5" version = "0.10.6" @@ -1941,6 +2051,12 @@ dependencies = [ "syn 2.0.108", ] +[[package]] +name = "multimap" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" + [[package]] name = "mysql-common-derive" version = "0.31.2" @@ -2222,6 +2338,16 @@ version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" +[[package]] +name = "petgraph" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" +dependencies = [ + "fixedbitset", + "indexmap 2.12.0", +] + [[package]] name = "phf" version = "0.13.1" @@ -2354,6 +2480,16 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "prettyplease" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" +dependencies = [ + "proc-macro2", + "syn 2.0.108", +] + [[package]] name = "proc-macro-crate" version = "3.4.0" @@ -2394,6 +2530,59 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" +dependencies = [ + "bytes", + "heck", + "itertools 0.12.1", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn 2.0.108", + "tempfile", +] + +[[package]] +name = "prost-derive" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" +dependencies = [ + "anyhow", + "itertools 0.12.1", + "proc-macro2", + "quote", + "syn 2.0.108", +] + +[[package]] +name = "prost-types" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9091c90b0a32608e984ff2fa4091273cbdd755d54935c51d520887f4a1dbd5b0" +dependencies = [ + "prost", +] + [[package]] name = "ptr_meta" version = "0.1.4" @@ -2881,7 +3070,7 @@ version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c" dependencies = [ - "indexmap", + "indexmap 2.12.0", "itoa", "memchr", "ryu", @@ -3062,9 +3251,14 @@ dependencies = [ "anyhow", "clap", "dirs", + "prost", "rusqlite", "serde_json", "tempfile", + "tokio", + "tokio-stream", + "tonic", + "tonic-build", "tracing", "tracing-subscriber", ] @@ -3345,6 +3539,16 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bd86198d9ee903fedd2f9a2e72014287c0d9167e4ae43b5853007205dda1b76" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.6.0" @@ -3402,6 +3606,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.17" @@ -3452,7 +3667,7 @@ version = "0.22.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" dependencies = [ - "indexmap", + "indexmap 2.12.0", "serde", "serde_spanned", "toml_datetime 0.6.11", @@ -3466,7 +3681,7 @@ version = "0.23.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6485ef6d0d9b5d0ec17244ff7eb05310113c3f316f2d14200d4de56b3cb98f8d" dependencies = [ - "indexmap", + "indexmap 2.12.0", "toml_datetime 0.7.3", "toml_parser", "winnow", @@ -3487,6 +3702,72 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801" +[[package]] +name = "tonic" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.21.7", + "bytes", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-build" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4ef6dd70a610078cb4e338a0f79d06bc759ff1b22d2120c2ff02ae264ba9c2" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "quote", + "syn 2.0.108", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project", + "pin-project-lite", + "rand 0.8.5", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + [[package]] name = "tower-service" version = "0.3.3" diff --git a/sqlite-watcher/Cargo.toml b/sqlite-watcher/Cargo.toml index 03dcf61..9e1095c 100644 --- a/sqlite-watcher/Cargo.toml +++ b/sqlite-watcher/Cargo.toml @@ -7,15 +7,24 @@ description = "SQLite WAL tailer that streams change events to database-replicat license = "Apache-2.0" repository = "https://github.com/serenorg/database-replicator" readme = "README.md" +build = "build.rs" + +[build-dependencies] +tonic-build = "0.11" [dependencies] anyhow = "1.0" clap = { version = "4.4", features = ["derive", "env"] } dirs = "5.0" +prost = "0.12" rusqlite = "0.30" serde_json = "1.0" +tokio = { version = "1.35", features = ["macros", "rt-multi-thread", "signal", "fs"] } +tonic = { version = "0.11", features = ["transport"] } +tokio-stream = { version = "0.1", features = ["net"] } tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } [dev-dependencies] tempfile = "3.8" +tokio = { version = "1.35", features = ["rt", "macros"] } diff --git a/sqlite-watcher/README.md b/sqlite-watcher/README.md index 36459c4..d2a8f22 100644 --- a/sqlite-watcher/README.md +++ b/sqlite-watcher/README.md @@ -32,6 +32,7 @@ Flag summary: - `--log-level`: Tracing filter (also settable via `SQLITE_WATCHER_LOG`). - `--poll-interval-ms`: How often to check the WAL file for growth (default 500 ms). Lower values react faster but cost more syscalls. - `--min-event-bytes`: Minimum WAL byte growth before emitting an event. Use larger values to avoid spam when very small transactions occur. +- `--listen` + `--token-file` now control the embedded gRPC server. Clients must send `Authorization: Bearer ` metadata when calling the `Watcher` service (see `proto/watcher.proto`). Unix sockets/pipes are placeholders until Ticket D is completed; TCP listens on `127.0.0.1:`. ## Cross-platform notes diff --git a/sqlite-watcher/build.rs b/sqlite-watcher/build.rs new file mode 100644 index 0000000..a38290d --- /dev/null +++ b/sqlite-watcher/build.rs @@ -0,0 +1,8 @@ +fn main() -> Result<(), Box> { + tonic_build::configure() + .build_client(true) + .build_server(true) + .compile(&["proto/watcher.proto"], &["proto"])?; + println!("cargo:rerun-if-changed=proto/watcher.proto"); + Ok(()) +} diff --git a/sqlite-watcher/proto/watcher.proto b/sqlite-watcher/proto/watcher.proto new file mode 100644 index 0000000..064f3cb --- /dev/null +++ b/sqlite-watcher/proto/watcher.proto @@ -0,0 +1,62 @@ +syntax = "proto3"; + +package sqlitewatcher; + +message HealthCheckRequest {} +message HealthCheckResponse { + string status = 1; +} + +message ListChangesRequest { + uint32 limit = 1; +} + +message Change { + int64 change_id = 1; + string table_name = 2; + string op = 3; + string primary_key = 4; + bytes payload = 5; + string wal_frame = 6; + string cursor = 7; +} + +message ListChangesResponse { + repeated Change changes = 1; +} + +message AckChangesRequest { + int64 up_to_change_id = 1; +} + +message AckChangesResponse { + uint64 acknowledged = 1; +} + +message GetStateRequest { + string table_name = 1; +} + +message GetStateResponse { + bool exists = 1; + int64 last_change_id = 2; + string last_wal_frame = 3; + string cursor = 4; +} + +message SetStateRequest { + string table_name = 1; + int64 last_change_id = 2; + string last_wal_frame = 3; + string cursor = 4; +} + +message SetStateResponse {} + +service Watcher { + rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse); + rpc ListChanges(ListChangesRequest) returns (ListChangesResponse); + rpc AckChanges(AckChangesRequest) returns (AckChangesResponse); + rpc GetState(GetStateRequest) returns (GetStateResponse); + rpc SetState(SetStateRequest) returns (SetStateResponse); +} diff --git a/sqlite-watcher/src/lib.rs b/sqlite-watcher/src/lib.rs index ede4d33..5f2fc6a 100644 --- a/sqlite-watcher/src/lib.rs +++ b/sqlite-watcher/src/lib.rs @@ -1,4 +1,9 @@ pub mod change; pub mod decoder; pub mod queue; +pub mod server; pub mod wal; + +pub mod watcher_proto { + tonic::include_proto!("sqlitewatcher"); +} diff --git a/sqlite-watcher/src/main.rs b/sqlite-watcher/src/main.rs index 2dd704f..9b986fb 100644 --- a/sqlite-watcher/src/main.rs +++ b/sqlite-watcher/src/main.rs @@ -1,4 +1,6 @@ use std::fmt; +use std::fs; +use std::net::SocketAddr; use std::path::{Path, PathBuf}; use std::str::FromStr; use std::sync::mpsc; @@ -8,6 +10,7 @@ use anyhow::{anyhow, bail, Context, Result}; use clap::Parser; use sqlite_watcher::decoder::WalGrowthDecoder; use sqlite_watcher::queue::ChangeQueue; +use sqlite_watcher::server::TcpServerHandle; use sqlite_watcher::wal::{start_wal_watcher, WalWatcherConfig as TailConfig}; use tracing_subscriber::EnvFilter; @@ -206,6 +209,7 @@ fn main() -> Result<()> { let cli = Cli::parse(); init_tracing(&cli.log_filter)?; let config = WatcherConfig::try_from(cli)?; + let auth_token = read_token_file(&config.token_file)?; tracing::info!( db = %config.database_path.display(), @@ -219,6 +223,7 @@ fn main() -> Result<()> { let queue = ChangeQueue::open(&config.queue_path)?; let decoder = WalGrowthDecoder::default(); + let server_handle = start_grpc_server(&config.listen, &config.queue_path, &auth_token)?; let (event_tx, event_rx) = mpsc::channel(); let _wal_handle = start_wal_watcher( &config.database_path, @@ -246,6 +251,7 @@ fn main() -> Result<()> { } } + drop(server_handle); Ok(()) } @@ -261,6 +267,43 @@ fn process_wal_event( Ok(ids) } +fn read_token_file(path: &Path) -> Result { + let contents = fs::read_to_string(path) + .with_context(|| format!("failed to read token file {}", path.display()))?; + let token = contents.trim().to_string(); + if token.is_empty() { + bail!("token file {} is empty", path.display()); + } + Ok(token) +} + +fn start_grpc_server( + listen: &ListenAddress, + queue_path: &Path, + token: &str, +) -> Result> { + match listen { + ListenAddress::Tcp { host, port } => { + let addr: SocketAddr = format!("{}:{}", host, port) + .parse() + .with_context(|| format!("invalid tcp listen address {host}:{port}"))?; + let handle = TcpServerHandle::spawn(addr, queue_path.to_path_buf(), token.to_string())?; + Ok(Some(handle)) + } + ListenAddress::Unix(path) => { + tracing::warn!( + path = %path.display(), + "unix socket gRPC transport is not yet implemented" + ); + Ok(None) + } + ListenAddress::Pipe(name) => { + tracing::warn!(pipe = name, "named pipe transport is not yet implemented"); + Ok(None) + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/sqlite-watcher/src/server.rs b/sqlite-watcher/src/server.rs new file mode 100644 index 0000000..c398094 --- /dev/null +++ b/sqlite-watcher/src/server.rs @@ -0,0 +1,222 @@ +use std::net::SocketAddr; +use std::path::PathBuf; +use std::sync::Arc; +use std::thread::{self, JoinHandle}; + +use anyhow::{Context, Result}; +use tokio::runtime::Builder; +use tokio::sync::oneshot; +use tokio_stream::wrappers::TcpListenerStream; +use tonic::service::Interceptor; +use tonic::{transport::Server, Request, Response, Status}; + +use crate::queue::{ChangeQueue, QueueState}; +use crate::watcher_proto::watcher_server::{Watcher, WatcherServer}; +use crate::watcher_proto::{ + AckChangesRequest, AckChangesResponse, Change, GetStateRequest, GetStateResponse, + HealthCheckRequest, HealthCheckResponse, ListChangesRequest, ListChangesResponse, + SetStateRequest, SetStateResponse, +}; + +pub struct TcpServerHandle { + shutdown: Option>, + thread: Option>>, +} + +impl Drop for TcpServerHandle { + fn drop(&mut self) { + if let Some(tx) = self.shutdown.take() { + let _ = tx.send(()); + } + if let Some(handle) = self.thread.take() { + let _ = handle.join(); + } + } +} + +impl TcpServerHandle { + pub fn spawn(addr: SocketAddr, queue_path: PathBuf, token: String) -> Result { + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let thread = thread::spawn(move || -> Result<()> { + let runtime = Builder::new_multi_thread() + .enable_all() + .build() + .context("failed to build tokio runtime")?; + runtime.block_on(async move { + let listener = tokio::net::TcpListener::bind(addr) + .await + .context("failed to bind tcp listener")?; + let queue_path = Arc::new(queue_path); + let svc = WatcherService::new(queue_path.clone()); + let interceptor = AuthInterceptor { + token: Arc::new(token), + }; + Server::builder() + .add_service(WatcherServer::with_interceptor(svc, interceptor)) + .serve_with_incoming_shutdown(TcpListenerStream::new(listener), async move { + let _ = shutdown_rx.await; + }) + .await + .context("grpc server exited with error")?; + Ok(()) + }) + }); + + Ok(Self { + shutdown: Some(shutdown_tx), + thread: Some(thread), + }) + } +} + +struct WatcherService { + queue_path: Arc, +} + +impl WatcherService { + fn new(queue_path: Arc) -> Self { + Self { queue_path } + } + + fn open_queue(&self) -> Result { + ChangeQueue::open(&*self.queue_path) + } +} + +#[tonic::async_trait] +impl Watcher for WatcherService { + async fn health_check( + &self, + _: Request, + ) -> Result, Status> { + Ok(Response::new(HealthCheckResponse { + status: "ok".to_string(), + })) + } + + async fn list_changes( + &self, + request: Request, + ) -> Result, Status> { + let limit = request.get_ref().limit.max(1).min(10_000) as usize; + let queue = self + .open_queue() + .map_err(|err| Status::internal(err.to_string()))?; + let rows = queue + .fetch_batch(limit) + .map_err(|err| Status::internal(err.to_string()))?; + let changes = rows.into_iter().map(change_to_proto).collect(); + Ok(Response::new(ListChangesResponse { changes })) + } + + async fn ack_changes( + &self, + request: Request, + ) -> Result, Status> { + let upto = request.get_ref().up_to_change_id; + let queue = self + .open_queue() + .map_err(|err| Status::internal(err.to_string()))?; + let count = queue + .ack_up_to(upto) + .map_err(|err| Status::internal(err.to_string()))?; + Ok(Response::new(AckChangesResponse { + acknowledged: count, + })) + } + + async fn get_state( + &self, + request: Request, + ) -> Result, Status> { + let name = request.get_ref().table_name.clone(); + let queue = self + .open_queue() + .map_err(|err| Status::internal(err.to_string()))?; + let state = queue + .get_state(&name) + .map_err(|err| Status::internal(err.to_string()))?; + let resp = match state { + Some(state) => GetStateResponse { + exists: true, + last_change_id: state.last_change_id, + last_wal_frame: state.last_wal_frame.unwrap_or_default(), + cursor: state.cursor.unwrap_or_default(), + }, + None => GetStateResponse { + exists: false, + last_change_id: 0, + last_wal_frame: String::new(), + cursor: String::new(), + }, + }; + Ok(Response::new(resp)) + } + + async fn set_state( + &self, + request: Request, + ) -> Result, Status> { + let payload = request.into_inner(); + if payload.table_name.is_empty() { + return Err(Status::invalid_argument("table_name is required")); + } + let queue = self + .open_queue() + .map_err(|err| Status::internal(err.to_string()))?; + let state = QueueState { + table_name: payload.table_name, + last_change_id: payload.last_change_id, + last_wal_frame: if payload.last_wal_frame.is_empty() { + None + } else { + Some(payload.last_wal_frame) + }, + cursor: if payload.cursor.is_empty() { + None + } else { + Some(payload.cursor) + }, + }; + queue + .set_state(&state) + .map_err(|err| Status::internal(err.to_string()))?; + Ok(Response::new(SetStateResponse {})) + } +} + +fn change_to_proto(row: crate::queue::ChangeRecord) -> Change { + Change { + change_id: row.change_id, + table_name: row.table_name, + op: row.operation.as_str().to_string(), + primary_key: row.primary_key, + payload: row.payload.unwrap_or_default(), + wal_frame: row.wal_frame.unwrap_or_default(), + cursor: row.cursor.unwrap_or_default(), + } +} + +#[derive(Clone)] +struct AuthInterceptor { + token: Arc, +} + +impl Interceptor for AuthInterceptor { + fn call(&mut self, request: Request<()>) -> Result, Status> { + let header = request + .metadata() + .get("authorization") + .ok_or_else(|| Status::unauthenticated("missing authorization header"))?; + let expected = format!("Bearer {}", self.token.as_ref()); + if header + .to_str() + .map(|value| value == expected) + .unwrap_or(false) + { + Ok(request) + } else { + Err(Status::unauthenticated("invalid authorization header")) + } + } +} diff --git a/sqlite-watcher/tests/server_tests.rs b/sqlite-watcher/tests/server_tests.rs new file mode 100644 index 0000000..6a86008 --- /dev/null +++ b/sqlite-watcher/tests/server_tests.rs @@ -0,0 +1,32 @@ +use std::net::SocketAddr; +use std::time::Duration; + +use sqlite_watcher::server::TcpServerHandle; +use sqlite_watcher::watcher_proto::watcher_client::WatcherClient; +use sqlite_watcher::watcher_proto::HealthCheckRequest; +use tempfile::tempdir; +use tokio::time::sleep; +use tonic::metadata::MetadataValue; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn health_check_responds_ok() { + let dir = tempdir().unwrap(); + let queue_path = dir.path().join("queue.db"); + let addr: SocketAddr = "127.0.0.1:55051".parse().unwrap(); + let token = "secret-token".to_string(); + + let _handle = TcpServerHandle::spawn(addr, queue_path, token.clone()).unwrap(); + sleep(Duration::from_millis(200)).await; + + let channel = tonic::transport::Channel::from_shared(format!("http://{}", addr)) + .unwrap() + .connect() + .await + .unwrap(); + let mut client = WatcherClient::new(channel); + let mut req = tonic::Request::new(HealthCheckRequest {}); + let header = MetadataValue::try_from(format!("Bearer {}", token)).unwrap(); + req.metadata_mut().insert("authorization", header); + let resp = client.health_check(req).await.unwrap(); + assert_eq!(resp.into_inner().status, "ok"); +} From 076de6776c4438e4cb723f92a8cbd158000b0e1a Mon Sep 17 00:00:00 2001 From: Taariq Lewis <701864+taariq@users.noreply.github.com> Date: Fri, 12 Dec 2025 11:08:39 -0800 Subject: [PATCH 5/5] [sqlite-watcher] add unix socket listener and tests --- Cargo.lock | 2 + sqlite-watcher/Cargo.toml | 1 + sqlite-watcher/README.md | 4 +- sqlite-watcher/src/main.rs | 22 ++-- sqlite-watcher/src/server.rs | 178 ++++++++++++++++++--------- sqlite-watcher/tests/server_tests.rs | 36 +++++- 6 files changed, 172 insertions(+), 71 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 36ed654..da6daa9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3259,6 +3259,7 @@ dependencies = [ "tokio-stream", "tonic", "tonic-build", + "tower", "tracing", "tracing-subscriber", ] @@ -3780,6 +3781,7 @@ version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", diff --git a/sqlite-watcher/Cargo.toml b/sqlite-watcher/Cargo.toml index 9e1095c..b2c3102 100644 --- a/sqlite-watcher/Cargo.toml +++ b/sqlite-watcher/Cargo.toml @@ -28,3 +28,4 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } [dev-dependencies] tempfile = "3.8" tokio = { version = "1.35", features = ["rt", "macros"] } +tower = "0.4" diff --git a/sqlite-watcher/README.md b/sqlite-watcher/README.md index d2a8f22..bf0efe4 100644 --- a/sqlite-watcher/README.md +++ b/sqlite-watcher/README.md @@ -32,11 +32,11 @@ Flag summary: - `--log-level`: Tracing filter (also settable via `SQLITE_WATCHER_LOG`). - `--poll-interval-ms`: How often to check the WAL file for growth (default 500 ms). Lower values react faster but cost more syscalls. - `--min-event-bytes`: Minimum WAL byte growth before emitting an event. Use larger values to avoid spam when very small transactions occur. -- `--listen` + `--token-file` now control the embedded gRPC server. Clients must send `Authorization: Bearer ` metadata when calling the `Watcher` service (see `proto/watcher.proto`). Unix sockets/pipes are placeholders until Ticket D is completed; TCP listens on `127.0.0.1:`. +- `--listen` + `--token-file` now control the embedded gRPC server. Clients must send `Authorization: Bearer ` metadata when calling the `Watcher` service (see `proto/watcher.proto`). TCP (`tcp:50051`) and Unix sockets (`unix:/tmp/sqlite-watcher.sock`) are available today; Windows named pipes currently fall back to TCP until native support lands. ## Cross-platform notes -- **Linux/macOS**: Default listener is a Unix domain socket at `/tmp/sqlite-watcher.sock`. Ensure the target SQLite database enables WAL journaling. +- **Linux/macOS**: Default listener is a Unix domain socket at `/tmp/sqlite-watcher.sock`. The watcher cleans up stale socket files on startup; point `--listen unix:/path` elsewhere if needed. - **Windows**: Unix sockets are disabled; pass `--listen tcp:50051` or `--listen pipe:SerenWatcher`. Named pipes allow local service accounts without opening TCP ports. - All platforms expect the token file to live under `~/.seren/sqlite-watcher/token` by default; create the directory with `0700` permissions so the watcher refuses to start if the secret is world-readable. - The current WAL watcher polls the `*.sqlite-wal` file for byte growth. To keep WAL history available, configure your writers with `PRAGMA journal_mode=WAL;` and raise `wal_autocheckpoint` (or disable it) so the SQLite engine does not aggressively truncate the log. diff --git a/sqlite-watcher/src/main.rs b/sqlite-watcher/src/main.rs index 9b986fb..4b2f373 100644 --- a/sqlite-watcher/src/main.rs +++ b/sqlite-watcher/src/main.rs @@ -10,7 +10,9 @@ use anyhow::{anyhow, bail, Context, Result}; use clap::Parser; use sqlite_watcher::decoder::WalGrowthDecoder; use sqlite_watcher::queue::ChangeQueue; -use sqlite_watcher::server::TcpServerHandle; +#[cfg(unix)] +use sqlite_watcher::server::spawn_unix_server; +use sqlite_watcher::server::{spawn_tcp_server, ServerHandle}; use sqlite_watcher::wal::{start_wal_watcher, WalWatcherConfig as TailConfig}; use tracing_subscriber::EnvFilter; @@ -281,21 +283,25 @@ fn start_grpc_server( listen: &ListenAddress, queue_path: &Path, token: &str, -) -> Result> { +) -> Result> { match listen { ListenAddress::Tcp { host, port } => { let addr: SocketAddr = format!("{}:{}", host, port) .parse() .with_context(|| format!("invalid tcp listen address {host}:{port}"))?; - let handle = TcpServerHandle::spawn(addr, queue_path.to_path_buf(), token.to_string())?; + let handle = spawn_tcp_server(addr, queue_path.to_path_buf(), token.to_string())?; Ok(Some(handle)) } ListenAddress::Unix(path) => { - tracing::warn!( - path = %path.display(), - "unix socket gRPC transport is not yet implemented" - ); - Ok(None) + #[cfg(unix)] + { + let handle = spawn_unix_server(path, queue_path.to_path_buf(), token.to_string())?; + Ok(Some(handle)) + } + #[cfg(not(unix))] + { + bail!("unix sockets are not supported on this platform") + } } ListenAddress::Pipe(name) => { tracing::warn!(pipe = name, "named pipe transport is not yet implemented"); diff --git a/sqlite-watcher/src/server.rs b/sqlite-watcher/src/server.rs index c398094..d41004f 100644 --- a/sqlite-watcher/src/server.rs +++ b/sqlite-watcher/src/server.rs @@ -1,5 +1,5 @@ use std::net::SocketAddr; -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::Arc; use std::thread::{self, JoinHandle}; @@ -18,12 +18,19 @@ use crate::watcher_proto::{ SetStateRequest, SetStateResponse, }; -pub struct TcpServerHandle { +#[cfg(unix)] +use tokio::net::UnixListener; +#[cfg(unix)] +use tokio_stream::wrappers::UnixListenerStream; + +pub struct ServerHandle { shutdown: Option>, thread: Option>>, + #[cfg(unix)] + unix_path: Option, } -impl Drop for TcpServerHandle { +impl Drop for ServerHandle { fn drop(&mut self) { if let Some(tx) = self.shutdown.take() { let _ = tx.send(()); @@ -31,42 +38,95 @@ impl Drop for TcpServerHandle { if let Some(handle) = self.thread.take() { let _ = handle.join(); } + #[cfg(unix)] + if let Some(path) = self.unix_path.take() { + let _ = std::fs::remove_file(path); + } } } -impl TcpServerHandle { - pub fn spawn(addr: SocketAddr, queue_path: PathBuf, token: String) -> Result { - let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let thread = thread::spawn(move || -> Result<()> { - let runtime = Builder::new_multi_thread() - .enable_all() - .build() - .context("failed to build tokio runtime")?; - runtime.block_on(async move { - let listener = tokio::net::TcpListener::bind(addr) - .await - .context("failed to bind tcp listener")?; - let queue_path = Arc::new(queue_path); - let svc = WatcherService::new(queue_path.clone()); - let interceptor = AuthInterceptor { - token: Arc::new(token), - }; - Server::builder() - .add_service(WatcherServer::with_interceptor(svc, interceptor)) - .serve_with_incoming_shutdown(TcpListenerStream::new(listener), async move { - let _ = shutdown_rx.await; - }) - .await - .context("grpc server exited with error")?; - Ok(()) - }) - }); - - Ok(Self { - shutdown: Some(shutdown_tx), - thread: Some(thread), - }) +pub fn spawn_tcp_server( + addr: SocketAddr, + queue_path: PathBuf, + token: String, +) -> Result { + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let thread = thread::spawn(move || -> Result<()> { + let runtime = Builder::new_multi_thread() + .enable_all() + .build() + .context("failed to build tokio runtime")?; + runtime.block_on(async move { + let listener = tokio::net::TcpListener::bind(addr) + .await + .context("failed to bind tcp listener")?; + let queue_path = Arc::new(queue_path); + let svc = WatcherService::new(queue_path); + let interceptor = AuthInterceptor { + token: Arc::new(token), + }; + Server::builder() + .add_service(WatcherServer::with_interceptor(svc, interceptor)) + .serve_with_incoming_shutdown(TcpListenerStream::new(listener), async move { + let _ = shutdown_rx.await; + }) + .await + .context("grpc server exited with error")?; + Ok::<(), anyhow::Error>(()) + })?; + Ok(()) + }); + + Ok(ServerHandle { + shutdown: Some(shutdown_tx), + thread: Some(thread), + #[cfg(unix)] + unix_path: None, + }) +} + +#[cfg(unix)] +pub fn spawn_unix_server(path: &Path, queue_path: PathBuf, token: String) -> Result { + if path.exists() { + std::fs::remove_file(path) + .with_context(|| format!("failed to remove stale unix socket {}", path.display()))?; + } + if let Some(parent) = path.parent() { + std::fs::create_dir_all(parent) + .with_context(|| format!("failed to create unix socket dir {}", parent.display()))?; } + let path_buf = path.to_path_buf(); + let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let path_for_drop = path_buf.clone(); + let thread = thread::spawn(move || -> Result<()> { + let runtime = Builder::new_multi_thread() + .enable_all() + .build() + .context("failed to build tokio runtime")?; + runtime.block_on(async move { + let listener = UnixListener::bind(&path_buf).context("failed to bind unix socket")?; + let queue_path = Arc::new(queue_path); + let svc = WatcherService::new(queue_path); + let interceptor = AuthInterceptor { + token: Arc::new(token), + }; + Server::builder() + .add_service(WatcherServer::with_interceptor(svc, interceptor)) + .serve_with_incoming_shutdown(UnixListenerStream::new(listener), async move { + let _ = shutdown_rx.await; + }) + .await + .context("grpc server exited with error")?; + Ok::<(), anyhow::Error>(()) + })?; + Ok(()) + }); + + Ok(ServerHandle { + shutdown: Some(shutdown_tx), + thread: Some(thread), + unix_path: Some(path_for_drop), + }) } struct WatcherService { @@ -83,6 +143,30 @@ impl WatcherService { } } +#[derive(Clone)] +struct AuthInterceptor { + token: Arc, +} + +impl Interceptor for AuthInterceptor { + fn call(&mut self, request: Request<()>) -> Result, Status> { + let header = request + .metadata() + .get("authorization") + .ok_or_else(|| Status::unauthenticated("missing authorization header"))?; + let expected = format!("Bearer {}", self.token.as_ref()); + if header + .to_str() + .map(|value| value == expected) + .unwrap_or(false) + { + Ok(request) + } else { + Err(Status::unauthenticated("invalid authorization header")) + } + } +} + #[tonic::async_trait] impl Watcher for WatcherService { async fn health_check( @@ -196,27 +280,3 @@ fn change_to_proto(row: crate::queue::ChangeRecord) -> Change { cursor: row.cursor.unwrap_or_default(), } } - -#[derive(Clone)] -struct AuthInterceptor { - token: Arc, -} - -impl Interceptor for AuthInterceptor { - fn call(&mut self, request: Request<()>) -> Result, Status> { - let header = request - .metadata() - .get("authorization") - .ok_or_else(|| Status::unauthenticated("missing authorization header"))?; - let expected = format!("Bearer {}", self.token.as_ref()); - if header - .to_str() - .map(|value| value == expected) - .unwrap_or(false) - { - Ok(request) - } else { - Err(Status::unauthenticated("invalid authorization header")) - } - } -} diff --git a/sqlite-watcher/tests/server_tests.rs b/sqlite-watcher/tests/server_tests.rs index 6a86008..0fe5526 100644 --- a/sqlite-watcher/tests/server_tests.rs +++ b/sqlite-watcher/tests/server_tests.rs @@ -1,7 +1,9 @@ use std::net::SocketAddr; use std::time::Duration; -use sqlite_watcher::server::TcpServerHandle; +use sqlite_watcher::server::spawn_tcp_server; +#[cfg(unix)] +use sqlite_watcher::server::spawn_unix_server; use sqlite_watcher::watcher_proto::watcher_client::WatcherClient; use sqlite_watcher::watcher_proto::HealthCheckRequest; use tempfile::tempdir; @@ -15,7 +17,7 @@ async fn health_check_responds_ok() { let addr: SocketAddr = "127.0.0.1:55051".parse().unwrap(); let token = "secret-token".to_string(); - let _handle = TcpServerHandle::spawn(addr, queue_path, token.clone()).unwrap(); + let _handle = spawn_tcp_server(addr, queue_path, token.clone()).unwrap(); sleep(Duration::from_millis(200)).await; let channel = tonic::transport::Channel::from_shared(format!("http://{}", addr)) @@ -30,3 +32,33 @@ async fn health_check_responds_ok() { let resp = client.health_check(req).await.unwrap(); assert_eq!(resp.into_inner().status, "ok"); } +#[cfg(unix)] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn health_check_over_unix_socket() { + use tokio::net::UnixStream; + use tonic::transport::Endpoint; + use tower::service_fn; + + let dir = tempdir().unwrap(); + let queue_path = dir.path().join("queue.db"); + let socket_path = dir.path().join("watcher.sock"); + let token = "secret-token".to_string(); + + let _handle = spawn_unix_server(&socket_path, queue_path, token.clone()).unwrap(); + sleep(Duration::from_millis(200)).await; + + let endpoint = Endpoint::try_from("http://[::]:50051").unwrap(); + let channel = endpoint + .connect_with_connector(service_fn(move |_: tonic::transport::Uri| { + let path = socket_path.clone(); + async move { UnixStream::connect(path).await } + })) + .await + .unwrap(); + let mut client = WatcherClient::new(channel); + let mut req = tonic::Request::new(HealthCheckRequest {}); + let header = MetadataValue::try_from(format!("Bearer {}", token)).unwrap(); + req.metadata_mut().insert("authorization", header); + let resp = client.health_check(req).await.unwrap(); + assert_eq!(resp.into_inner().status, "ok"); +}