diff --git a/Cargo.lock b/Cargo.lock index d7df7f7..87e12bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -240,7 +240,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "rustc-hash 2.1.1", + "rustc-hash", "shlex", "syn 2.0.108", ] @@ -780,13 +780,10 @@ dependencies = [ "serde", "serde_json", "sha2", - "sqlite-watcher", "tempfile", "tokio", "tokio-postgres", "toml", - "tonic", - "tower", "tracing", "tracing-subscriber", "url", @@ -954,7 +951,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -1203,8 +1200,10 @@ dependencies = [ [[package]] name = "fxhash" version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" dependencies = [ - "rustc-hash 1.1.0", + "byteorder", ] [[package]] @@ -2188,7 +2187,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -2863,12 +2862,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "rustc-hash" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" - [[package]] name = "rustc-hash" version = "2.1.1" @@ -2917,7 +2910,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.11.0", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -3257,28 +3250,20 @@ name = "sqlite-watcher" version = "0.1.0" dependencies = [ "anyhow", -<<<<<<< HEAD "base64 0.21.7", "clap", "dirs", "prost", -======= - "dirs", - "rand 0.8.5", ->>>>>>> origin/main "rusqlite", "serde", "serde_json", "tempfile", "thiserror 1.0.69", -<<<<<<< HEAD "tokio", "tokio-stream", "tonic", "tonic-build", "tower", -======= ->>>>>>> origin/main ] [[package]] @@ -3414,7 +3399,7 @@ dependencies = [ "getrandom 0.3.4", "once_cell", "rustix 1.1.2", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -4182,7 +4167,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/sqlite-watcher/README.md b/sqlite-watcher/README.md index 480acf0..ff09e91 100644 --- a/sqlite-watcher/README.md +++ b/sqlite-watcher/README.md @@ -1,10 +1,17 @@ -# sqlite-watcher (alpha) +# sqlite-watcher -This crate currently ships the shared queue + gRPC server used by `database-replicator sync-sqlite`. The `sqlite-watcher` binary includes: +This crate provides the building blocks for an upcoming sqlite-watcher binary. Issue #82 adds a durable queue plus a tonic-based gRPC server so other components can stream captured SQLite changes. -- `serve`: start the queue-backed gRPC API so clients can pull change batches. -- `enqueue`: helper for tests/smoke scripts to add sample changes to the queue database. +## Components -> **Note:** WAL tailing is still under active development; use the binary today to test queue + sync flows. +- `queue.rs`: stores change rows and per-table checkpoints in `~/.seren/sqlite-watcher/changes.db`. +- `proto/watcher.proto`: RPC definitions (`HealthCheck`, `ListChanges`, `AckChanges`, `GetState`, `SetState`). +- `server.rs`: tonic server wrappers exposing the queue over TCP or Unix sockets with shared-secret authentication. -See `sqlite-watcher-docs/installers.md` for per-OS service guidance and `scripts/test-sqlite-delta.sh` for the end-to-end smoke test. +## Building & Testing + +```bash +cargo test -p sqlite-watcher +``` + +The tests cover queue durability/state behavior. Server tests will be added once the consumer wiring lands. diff --git a/sqlite-watcher/src/server.rs b/sqlite-watcher/src/server.rs index 782c5bf..209f92d 100644 --- a/sqlite-watcher/src/server.rs +++ b/sqlite-watcher/src/server.rs @@ -8,8 +8,7 @@ use tokio::runtime::Builder; use tokio::sync::oneshot; use tokio_stream::wrappers::TcpListenerStream; use tonic::service::Interceptor; -use tonic::transport::Server; -use tonic::{Request, Response, Status}; +use tonic::{transport::Server, Request, Response, Status}; #[cfg(unix)] use tokio::net::UnixListener; @@ -67,28 +66,26 @@ impl Drop for ServerHandle { } pub fn spawn_tcp(addr: SocketAddr, queue_path: PathBuf, token: String) -> Result { - let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let (tx, rx) = oneshot::channel(); let thread = thread::spawn(move || -> Result<()> { - let runtime = Builder::new_multi_thread().enable_all().build()?; - runtime.block_on(async move { + let rt = Builder::new_multi_thread().enable_all().build()?; + rt.block_on(async move { let listener = tokio::net::TcpListener::bind(addr) .await .context("failed to bind tcp listener")?; - let queue_path = Arc::new(queue_path); - let svc = WatcherService::new(queue_path); + let service = WatcherService::new(queue_path); let interceptor = AuthInterceptor::new(token); Server::builder() - .add_service(WatcherServer::with_interceptor(svc, interceptor)) + .add_service(WatcherServer::with_interceptor(service, interceptor)) .serve_with_incoming_shutdown(TcpListenerStream::new(listener), async move { - let _ = shutdown_rx.await; + let _ = rx.await; }) .await .context("grpc server exited") }) }); - Ok(ServerHandle::Tcp { - shutdown: Some(shutdown_tx), + shutdown: Some(tx), thread: Some(thread), }) } @@ -97,41 +94,34 @@ pub fn spawn_tcp(addr: SocketAddr, queue_path: PathBuf, token: String) -> Result pub fn spawn_unix(path: &Path, queue_path: PathBuf, token: String) -> Result { if path.exists() { std::fs::remove_file(path) - .with_context(|| format!("failed to remove stale unix socket {}", path.display()))?; + .with_context(|| format!("failed to remove stale socket {}", path.display()))?; } if let Some(parent) = path.parent() { - std::fs::create_dir_all(parent).with_context(|| { - format!( - "failed to create unix socket directory {}", - parent.display() - ) - })?; + std::fs::create_dir_all(parent) + .with_context(|| format!("failed to create socket dir {}", parent.display()))?; } - let path_buf = path.to_path_buf(); - let listener_path = path_buf.clone(); - let (shutdown_tx, shutdown_rx) = oneshot::channel(); + let socket_path = path.to_path_buf(); + let (tx, rx) = oneshot::channel(); + let path_clone = socket_path.clone(); let thread = thread::spawn(move || -> Result<()> { - let runtime = Builder::new_multi_thread().enable_all().build()?; - runtime.block_on(async move { - let listener = - UnixListener::bind(&listener_path).context("failed to bind unix socket")?; - let queue_path = Arc::new(queue_path); - let svc = WatcherService::new(queue_path); + let rt = Builder::new_multi_thread().enable_all().build()?; + rt.block_on(async move { + let listener = UnixListener::bind(&path_clone).context("failed to bind unix socket")?; + let service = WatcherService::new(queue_path); let interceptor = AuthInterceptor::new(token); Server::builder() - .add_service(WatcherServer::with_interceptor(svc, interceptor)) + .add_service(WatcherServer::with_interceptor(service, interceptor)) .serve_with_incoming_shutdown(UnixListenerStream::new(listener), async move { - let _ = shutdown_rx.await; + let _ = rx.await; }) .await .context("grpc server exited") }) }); - Ok(ServerHandle::Unix { - shutdown: Some(shutdown_tx), + shutdown: Some(tx), thread: Some(thread), - path: path_buf, + path: socket_path, }) } @@ -141,8 +131,10 @@ struct WatcherService { } impl WatcherService { - fn new(queue_path: Arc) -> Self { - Self { queue_path } + fn new(queue_path: PathBuf) -> Self { + Self { + queue_path: Arc::new(queue_path), + } } fn queue(&self) -> Result { @@ -164,14 +156,14 @@ impl AuthInterceptor { } impl Interceptor for AuthInterceptor { - fn call(&mut self, req: Request<()>) -> Result, Status> { - let provided = req + fn call(&mut self, request: Request<()>) -> Result, Status> { + let provided = request .metadata() .get("authorization") .ok_or_else(|| Status::unauthenticated("missing authorization header"))?; let expected = format!("Bearer {}", self.token.as_str()); if provided == expected.as_str() { - Ok(req) + Ok(request) } else { Err(Status::unauthenticated("invalid authorization header")) } @@ -207,7 +199,6 @@ impl Watcher for WatcherService { let upto = request.get_ref().up_to_change_id; let queue = self.queue().map_err(internal_err)?; let count = queue.ack_up_to(upto).map_err(internal_err)?; - queue.purge_acked().ok(); Ok(Response::new(AckChangesResponse { acknowledged: count, })) diff --git a/sqlite-watcher/tests/server_tests.rs b/sqlite-watcher/tests/server_tests.rs index 0fe5526..2515678 100644 --- a/sqlite-watcher/tests/server_tests.rs +++ b/sqlite-watcher/tests/server_tests.rs @@ -1,23 +1,38 @@ use std::net::SocketAddr; use std::time::Duration; -use sqlite_watcher::server::spawn_tcp_server; -#[cfg(unix)] -use sqlite_watcher::server::spawn_unix_server; +use sqlite_watcher::queue::{ChangeOperation, ChangeQueue, NewChange}; +use sqlite_watcher::server::spawn_tcp; use sqlite_watcher::watcher_proto::watcher_client::WatcherClient; -use sqlite_watcher::watcher_proto::HealthCheckRequest; +use sqlite_watcher::watcher_proto::{AckChangesRequest, HealthCheckRequest, ListChangesRequest}; use tempfile::tempdir; use tokio::time::sleep; use tonic::metadata::MetadataValue; +fn seed_queue(path: &str) { + let queue = ChangeQueue::open(path).unwrap(); + for i in 0..2 { + let change = NewChange { + table_name: "examples".into(), + operation: ChangeOperation::Insert, + primary_key: format!("row-{i}"), + payload: None, + wal_frame: None, + cursor: None, + }; + queue.enqueue(&change).unwrap(); + } +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn health_check_responds_ok() { +async fn tcp_server_handles_health_and_list() { let dir = tempdir().unwrap(); let queue_path = dir.path().join("queue.db"); - let addr: SocketAddr = "127.0.0.1:55051".parse().unwrap(); - let token = "secret-token".to_string(); + seed_queue(queue_path.to_str().unwrap()); - let _handle = spawn_tcp_server(addr, queue_path, token.clone()).unwrap(); + let addr: SocketAddr = "127.0.0.1:56060".parse().unwrap(); + let token = "secret".to_string(); + let _handle = spawn_tcp(addr, queue_path, token.clone()).unwrap(); sleep(Duration::from_millis(200)).await; let channel = tonic::transport::Channel::from_shared(format!("http://{}", addr)) @@ -26,39 +41,68 @@ async fn health_check_responds_ok() { .await .unwrap(); let mut client = WatcherClient::new(channel); - let mut req = tonic::Request::new(HealthCheckRequest {}); + + let mut health_req = tonic::Request::new(HealthCheckRequest {}); let header = MetadataValue::try_from(format!("Bearer {}", token)).unwrap(); - req.metadata_mut().insert("authorization", header); - let resp = client.health_check(req).await.unwrap(); - assert_eq!(resp.into_inner().status, "ok"); + health_req + .metadata_mut() + .insert("authorization", header.clone()); + client.health_check(health_req).await.unwrap(); + + let mut list_req = tonic::Request::new(ListChangesRequest { limit: 10 }); + list_req.metadata_mut().insert("authorization", header); + let resp = client.list_changes(list_req).await.unwrap(); + assert_eq!(resp.into_inner().changes.len(), 2); } -#[cfg(unix)] -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn health_check_over_unix_socket() { - use tokio::net::UnixStream; - use tonic::transport::Endpoint; - use tower::service_fn; +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn unauthenticated_requests_fail() { let dir = tempdir().unwrap(); let queue_path = dir.path().join("queue.db"); - let socket_path = dir.path().join("watcher.sock"); - let token = "secret-token".to_string(); + let addr: SocketAddr = "127.0.0.1:56061".parse().unwrap(); + let token = "secret".to_string(); + let _handle = spawn_tcp(addr, queue_path, token).unwrap(); + sleep(Duration::from_millis(200)).await; + + let channel = tonic::transport::Channel::from_shared(format!("http://{}", addr)) + .unwrap() + .connect() + .await + .unwrap(); + let mut client = WatcherClient::new(channel); + + let request = tonic::Request::new(ListChangesRequest { limit: 1 }); + let err = client.list_changes(request).await.unwrap_err(); + assert_eq!(err.code(), tonic::Code::Unauthenticated); +} - let _handle = spawn_unix_server(&socket_path, queue_path, token.clone()).unwrap(); +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn ack_changes_advances_queue() { + let dir = tempdir().unwrap(); + let queue_path = dir.path().join("queue.db"); + seed_queue(queue_path.to_str().unwrap()); + let addr: SocketAddr = "127.0.0.1:56062".parse().unwrap(); + let token = "secret".to_string(); + let _handle = spawn_tcp(addr, queue_path, token.clone()).unwrap(); sleep(Duration::from_millis(200)).await; - let endpoint = Endpoint::try_from("http://[::]:50051").unwrap(); - let channel = endpoint - .connect_with_connector(service_fn(move |_: tonic::transport::Uri| { - let path = socket_path.clone(); - async move { UnixStream::connect(path).await } - })) + let channel = tonic::transport::Channel::from_shared(format!("http://{}", addr)) + .unwrap() + .connect() .await .unwrap(); let mut client = WatcherClient::new(channel); - let mut req = tonic::Request::new(HealthCheckRequest {}); let header = MetadataValue::try_from(format!("Bearer {}", token)).unwrap(); - req.metadata_mut().insert("authorization", header); - let resp = client.health_check(req).await.unwrap(); - assert_eq!(resp.into_inner().status, "ok"); + + let mut req = tonic::Request::new(ListChangesRequest { limit: 10 }); + req.metadata_mut().insert("authorization", header.clone()); + let resp = client.list_changes(req).await.unwrap().into_inner(); + assert_eq!(resp.changes.len(), 2); + let highest = resp.changes.last().unwrap().change_id; + + let mut ack_req = tonic::Request::new(AckChangesRequest { + up_to_change_id: highest, + }); + ack_req.metadata_mut().insert("authorization", header); + client.ack_changes(ack_req).await.unwrap(); }