diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 998630f..fa4aadd 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -23,18 +23,26 @@ jobs: target: x86_64-unknown-linux-gnu artifact_name: database-replicator asset_name: database-replicator-linux-x64-binary + watcher_artifact: sqlite-watcher + watcher_asset: sqlite-watcher-linux-x64 - os: macos-latest target: x86_64-apple-darwin artifact_name: database-replicator asset_name: database-replicator-macos-x64-binary + watcher_artifact: sqlite-watcher + watcher_asset: sqlite-watcher-macos-x64 - os: macos-latest target: aarch64-apple-darwin artifact_name: database-replicator asset_name: database-replicator-macos-arm64-binary + watcher_artifact: sqlite-watcher + watcher_asset: sqlite-watcher-macos-arm64 - os: windows-latest target: x86_64-pc-windows-msvc artifact_name: database-replicator.exe asset_name: database-replicator-windows-x64.exe + watcher_artifact: sqlite-watcher.exe + watcher_asset: sqlite-watcher-windows-x64.exe steps: - name: Checkout code uses: actions/checkout@v4 @@ -75,6 +83,9 @@ jobs: - name: Build release binary run: cargo build --release --target ${{ matrix.target }} --verbose + - name: Build sqlite-watcher binary + run: cargo build --release --target ${{ matrix.target }} -p sqlite-watcher --verbose + - name: Strip binary (Linux) if: matrix.os == 'ubuntu-latest' run: strip target/${{ matrix.target }}/release/${{ matrix.artifact_name }} @@ -89,12 +100,24 @@ jobs: cp target/${{ matrix.target }}/release/${{ matrix.artifact_name }} ${{ matrix.asset_name }} chmod +x ${{ matrix.asset_name }} + - name: Rename sqlite-watcher (Unix) + if: matrix.os != 'windows-latest' + run: | + cp target/${{ matrix.target }}/release/${{ matrix.watcher_artifact }} ${{ matrix.watcher_asset }} + chmod +x ${{ matrix.watcher_asset }} + - name: Rename binary (Windows) if: matrix.os == 'windows-latest' run: | copy target\${{ matrix.target }}\release\${{ matrix.artifact_name }} ${{ matrix.asset_name }} shell: cmd + - name: Rename sqlite-watcher (Windows) + if: matrix.os == 'windows-latest' + run: | + copy target\${{ matrix.target }}\release\${{ matrix.watcher_artifact }} ${{ matrix.watcher_asset }} + shell: cmd + - name: Upload artifact uses: actions/upload-artifact@v4 with: @@ -102,6 +125,13 @@ jobs: path: ${{ matrix.asset_name }} if-no-files-found: error + - name: Upload sqlite-watcher artifact + uses: actions/upload-artifact@v4 + with: + name: ${{ matrix.watcher_asset }} + path: ${{ matrix.watcher_asset }} + if-no-files-found: error + create-release: name: Create GitHub Release needs: build-release diff --git a/CHANGELOG.md b/CHANGELOG.md index 5069762..126eeaa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -97,7 +97,7 @@ All notable changes to this project will be documented in this file. ### Changed -- **README-SQLite.md**: Updated all examples to include `-y` flag and added notes explaining that interactive mode only works with PostgreSQL sources. +- **sqlite-watcher-docs/README-SQLite.md**: Updated all examples to include `-y` flag and added notes explaining that interactive mode only works with PostgreSQL sources. ## [7.0.4] - 2025-12-09 @@ -408,7 +408,7 @@ All notable changes to this project will be documented in this file. - **File-based migration** (local execution only, no remote support) - **Path validation** with directory traversal prevention - **Comprehensive security testing**: 14 SQLite-specific tests -- **Documentation**: [README-SQLite.md](README-SQLite.md) with usage examples +- **Documentation**: [sqlite-watcher-docs/README-SQLite.md](sqlite-watcher-docs/README-SQLite.md) with usage examples - **Integration tests**: Full workflow testing with real SQLite files #### MongoDB Support (Phase 2) @@ -491,7 +491,7 @@ All notable changes to this project will be documented in this file. - **[README.md](README.md)** - Universal landing page with multi-database support - **[README-PostgreSQL.md](README-PostgreSQL.md)** - Comprehensive PostgreSQL replication guide (1,000+ lines) -- **[README-SQLite.md](README-SQLite.md)** - Complete SQLite migration guide +- **[sqlite-watcher-docs/README-SQLite.md](sqlite-watcher-docs/README-SQLite.md)** - Complete SQLite migration guide - **[README-MongoDB.md](README-MongoDB.md)** - Complete MongoDB migration guide with periodic refresh - **[README-MySQL.md](README-MySQL.md)** - Complete MySQL/MariaDB migration guide - **[docs/plans/multi-database-support.md](docs/plans/multi-database-support.md)** - Implementation plan and architecture diff --git a/Cargo.lock b/Cargo.lock index a654f04..d7df7f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -118,6 +118,28 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.108", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -135,6 +157,51 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "axum" +version = "0.6.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" +dependencies = [ + "async-trait", + "axum-core", + "bitflags 1.3.2", + "bytes", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "sync_wrapper", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "base64" version = "0.21.7" @@ -169,11 +236,11 @@ dependencies = [ "bitflags 2.10.0", "cexpr", "clang-sys", - "itertools", + "itertools 0.13.0", "proc-macro2", "quote", "regex", - "rustc-hash", + "rustc-hash 2.1.1", "shlex", "syn 2.0.108", ] @@ -246,7 +313,7 @@ dependencies = [ "getrandom 0.2.16", "getrandom 0.3.4", "hex", - "indexmap", + "indexmap 2.12.0", "js-sys", "once_cell", "rand 0.9.2", @@ -713,10 +780,13 @@ dependencies = [ "serde", "serde_json", "sha2", + "sqlite-watcher", "tempfile", "tokio", "tokio-postgres", "toml", + "tonic", + "tower", "tracing", "tracing-subscriber", "url", @@ -884,7 +954,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -917,6 +987,12 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "52051878f80a721bb68ebfbc930e07b65ba72f2da88968ea5c06fd6ca3d3a127" +[[package]] +name = "fixedbitset" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" + [[package]] name = "flate2" version = "1.1.5" @@ -1127,10 +1203,8 @@ dependencies = [ [[package]] name = "fxhash" version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" dependencies = [ - "byteorder", + "rustc-hash 1.1.0", ] [[package]] @@ -1188,7 +1262,7 @@ dependencies = [ "futures-sink", "futures-util", "http", - "indexmap", + "indexmap 2.12.0", "slab", "tokio", "tokio-util", @@ -1374,6 +1448,18 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-timeout" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" +dependencies = [ + "hyper", + "pin-project-lite", + "tokio", + "tokio-io-timeout", +] + [[package]] name = "hyper-tls" version = "0.5.0" @@ -1519,6 +1605,16 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", +] + [[package]] name = "indexmap" version = "2.12.0" @@ -1583,6 +1679,15 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.13.0" @@ -1624,7 +1729,7 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ee7893dab2e44ae5f9d0173f26ff4aa327c10b01b06a72b52dd9405b628640d" dependencies = [ - "indexmap", + "indexmap 2.12.0", ] [[package]] @@ -1786,6 +1891,12 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "md-5" version = "0.10.6" @@ -1941,6 +2052,12 @@ dependencies = [ "syn 2.0.108", ] +[[package]] +name = "multimap" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084" + [[package]] name = "mysql-common-derive" version = "0.31.2" @@ -2071,7 +2188,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -2222,6 +2339,16 @@ version = "2.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220" +[[package]] +name = "petgraph" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" +dependencies = [ + "fixedbitset", + "indexmap 2.12.0", +] + [[package]] name = "phf" version = "0.13.1" @@ -2354,6 +2481,16 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "prettyplease" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" +dependencies = [ + "proc-macro2", + "syn 2.0.108", +] + [[package]] name = "proc-macro-crate" version = "3.4.0" @@ -2394,6 +2531,59 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "prost" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29" +dependencies = [ + "bytes", + "prost-derive", +] + +[[package]] +name = "prost-build" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" +dependencies = [ + "bytes", + "heck", + "itertools 0.12.1", + "log", + "multimap", + "once_cell", + "petgraph", + "prettyplease", + "prost", + "prost-types", + "regex", + "syn 2.0.108", + "tempfile", +] + +[[package]] +name = "prost-derive" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1" +dependencies = [ + "anyhow", + "itertools 0.12.1", + "proc-macro2", + "quote", + "syn 2.0.108", +] + +[[package]] +name = "prost-types" +version = "0.12.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9091c90b0a32608e984ff2fa4091273cbdd755d54935c51d520887f4a1dbd5b0" +dependencies = [ + "prost", +] + [[package]] name = "ptr_meta" version = "0.1.4" @@ -2673,6 +2863,12 @@ dependencies = [ "serde_json", ] +[[package]] +name = "rustc-hash" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" + [[package]] name = "rustc-hash" version = "2.1.1" @@ -2721,7 +2917,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.11.0", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -2882,7 +3078,7 @@ version = "1.0.145" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "402a6f66d8c709116cf22f558eab210f5a50187f702eb4d7e5ef38d9a7f1c79c" dependencies = [ - "indexmap", + "indexmap 2.12.0", "itoa", "memchr", "ryu", @@ -3061,13 +3257,28 @@ name = "sqlite-watcher" version = "0.1.0" dependencies = [ "anyhow", +<<<<<<< HEAD + "base64 0.21.7", + "clap", + "dirs", + "prost", +======= "dirs", "rand 0.8.5", +>>>>>>> origin/main "rusqlite", "serde", "serde_json", "tempfile", "thiserror 1.0.69", +<<<<<<< HEAD + "tokio", + "tokio-stream", + "tonic", + "tonic-build", + "tower", +======= +>>>>>>> origin/main ] [[package]] @@ -3203,7 +3414,7 @@ dependencies = [ "getrandom 0.3.4", "once_cell", "rustix 1.1.2", - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -3346,6 +3557,16 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "tokio-io-timeout" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bd86198d9ee903fedd2f9a2e72014287c0d9167e4ae43b5853007205dda1b76" +dependencies = [ + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-macros" version = "2.6.0" @@ -3403,6 +3624,17 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.17" @@ -3453,7 +3685,7 @@ version = "0.22.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" dependencies = [ - "indexmap", + "indexmap 2.12.0", "serde", "serde_spanned", "toml_datetime 0.6.11", @@ -3467,7 +3699,7 @@ version = "0.23.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6485ef6d0d9b5d0ec17244ff7eb05310113c3f316f2d14200d4de56b3cb98f8d" dependencies = [ - "indexmap", + "indexmap 2.12.0", "toml_datetime 0.7.3", "toml_parser", "winnow", @@ -3488,6 +3720,72 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801" +[[package]] +name = "tonic" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13" +dependencies = [ + "async-stream", + "async-trait", + "axum", + "base64 0.21.7", + "bytes", + "h2", + "http", + "http-body", + "hyper", + "hyper-timeout", + "percent-encoding", + "pin-project", + "prost", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-build" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4ef6dd70a610078cb4e338a0f79d06bc759ff1b22d2120c2ff02ae264ba9c2" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "quote", + "syn 2.0.108", +] + +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "indexmap 1.9.3", + "pin-project", + "pin-project-lite", + "rand 0.8.5", + "slab", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-layer" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "121c2a6cda46980bb0fcd1647ffaf6cd3fc79a013de288782836f6df9c48780e" + [[package]] name = "tower-service" version = "0.3.3" @@ -3500,6 +3798,7 @@ version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -3883,7 +4182,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index ab7988c..fe1a445 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,8 @@ indicatif = "0.18" which = "6.0" home = ">=0.5.4, <0.5.12" # Pin to avoid v0.5.12 which requires unstable edition2024 rand = "0.8" -reqwest = { version = "0.11", features = ["json"] } +# Disable rustls to avoid pulling rustls-pemfile (unmaintained) +reqwest = { version = "0.11", default-features = false, features = ["json", "native-tls"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" sha2 = "0.10" @@ -51,6 +52,12 @@ url = "2.5" chrono = { version = "0.4", default-features = false, features = ["clock", "serde"] } libc = "0.2" rust_decimal = { version = "1.39", features = ["db-tokio-postgres"] } +tonic = { version = "0.11", features = ["transport"] } +tower = "0.4" +sqlite-watcher = { path = "sqlite-watcher" } + +[patch.crates-io] +fxhash = { path = "third-party/fxhash" } [target.'cfg(unix)'.dependencies] daemonize = "0.5" diff --git a/Dockerfile b/Dockerfile index bd302dc..7952720 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,19 +2,22 @@ FROM ubuntu:24.04 AS downloader ARG VERSION=latest -ENV BINARY_NAME=database-replicator-linux-x64-binary +ENV REPLICATOR_ASSET=database-replicator-linux-x64-binary +ENV WATCHER_ASSET=sqlite-watcher-linux-x64 ENV RELEASE_ROOT=https://github.com/serenorg/database-replicator/releases RUN apt-get update && apt-get install -y --no-install-recommends curl ca-certificates && rm -rf /var/lib/apt/lists/* RUN set -eux; \ if [ "$VERSION" = "latest" ]; then \ - URL="$RELEASE_ROOT/latest/download/$BINARY_NAME"; \ + REP_URL="$RELEASE_ROOT/latest/download/$REPLICATOR_ASSET"; \ + WATCH_URL="$RELEASE_ROOT/latest/download/$WATCHER_ASSET"; \ else \ - URL="$RELEASE_ROOT/download/$VERSION/$BINARY_NAME"; \ + REP_URL="$RELEASE_ROOT/download/$VERSION/$REPLICATOR_ASSET"; \ + WATCH_URL="$RELEASE_ROOT/download/$VERSION/$WATCHER_ASSET"; \ fi; \ - curl -fL "$URL" -o /tmp/database-replicator && \ - chmod +x /tmp/database-replicator + curl -fL "$REP_URL" -o /tmp/database-replicator && chmod +x /tmp/database-replicator && \ + curl -fL "$WATCH_URL" -o /tmp/sqlite-watcher && chmod +x /tmp/sqlite-watcher FROM ubuntu:24.04 LABEL org.opencontainers.image.title="database-replicator" \ @@ -27,6 +30,7 @@ RUN apt-get update && \ useradd -m replicator COPY --from=downloader /tmp/database-replicator /usr/local/bin/database-replicator +COPY --from=downloader /tmp/sqlite-watcher /usr/local/bin/sqlite-watcher USER replicator ENTRYPOINT ["database-replicator"] CMD ["--help"] diff --git a/README.md b/README.md index f1848db..b0677de 100644 --- a/README.md +++ b/README.md @@ -139,7 +139,7 @@ database-replicator init \ --target "postgresql://user:pass@host:5432/db" ``` -**[📖 Full SQLite Guide →](README-SQLite.md)** +**[📖 Full SQLite Guide →](sqlite-watcher-docs/README-SQLite.md)** --- @@ -302,7 +302,7 @@ docker run --rm -it \ ### Database-Specific Guides - **[PostgreSQL to PostgreSQL](README-PostgreSQL.md)** - Zero-downtime replication with logical replication -- **[SQLite to PostgreSQL](README-SQLite.md)** - One-time replication using JSONB storage +- **[SQLite to PostgreSQL](sqlite-watcher-docs/README-SQLite.md)** - One-time replication using JSONB storage - **[MongoDB to PostgreSQL](README-MongoDB.md)** - One-time replication with periodic refresh support - **[MySQL/MariaDB to PostgreSQL](README-MySQL.md)** - One-time replication with periodic refresh support diff --git a/scripts/test-sqlite-delta.sh b/scripts/test-sqlite-delta.sh new file mode 100755 index 0000000..743c820 --- /dev/null +++ b/scripts/test-sqlite-delta.sh @@ -0,0 +1,93 @@ +#!/usr/bin/env bash +# Smoke test for sqlite-watcher + database-replicator incremental sync +# Requires: docker, sqlite-watcher, database-replicator, sqlite3 + +set -euo pipefail + +if ! command -v docker >/dev/null; then + echo "[smoke] docker is required" >&2 + exit 1 +fi +if ! command -v sqlite-watcher >/dev/null; then + echo "[smoke] sqlite-watcher binary not found in PATH" >&2 + exit 1 +fi +if ! command -v database-replicator >/dev/null; then + echo "[smoke] database-replicator binary not found in PATH" >&2 + exit 1 +fi + +TMPDIR=$(mktemp -d) +QUEUE_DB="$TMPDIR/queue.db" +SOCK="$TMPDIR/watcher.sock" +TOKEN_FILE="$TMPDIR/token" +POSTGRES_PORT=55432 +CONTAINER_NAME=sqlite-delta-smoke + +cleanup() { + set +e + if [[ -n "${WATCHER_PID:-}" ]]; then + kill "$WATCHER_PID" >/dev/null 2>&1 || true + fi + docker rm -f "$CONTAINER_NAME" >/dev/null 2>&1 || true + rm -rf "$TMPDIR" +} +trap cleanup EXIT + +echo "[smoke] preparing token + queue" +mkdir -p "$(dirname "$TOKEN_FILE")" +printf 'smoke-%s' "$RANDOM" > "$TOKEN_FILE" +chmod 600 "$TOKEN_FILE" + +sqlite-watcher enqueue --queue-db "$QUEUE_DB" --table demo --id smoke --payload '{"message":"hello-from-watcher"}' + +sqlite-watcher serve --queue-db "$QUEUE_DB" --listen "unix:$SOCK" --token-file "$TOKEN_FILE" >/dev/null 2>&1 & +WATCHER_PID=$! +sleep 1 + +echo "[smoke] starting postgres container" +docker run -d --rm \ + --name "$CONTAINER_NAME" \ + -e POSTGRES_PASSWORD=postgres \ + -p "$POSTGRES_PORT":5432 \ + postgres:15 >/dev/null + +until docker exec "$CONTAINER_NAME" pg_isready -U postgres >/dev/null 2>&1; do + sleep 1 +done + +docker exec "$CONTAINER_NAME" psql -U postgres <<'SQL' +CREATE TABLE IF NOT EXISTS demo ( + id TEXT PRIMARY KEY, + data JSONB NOT NULL, + _source_type TEXT NOT NULL DEFAULT 'sqlite', + _migrated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +CREATE TABLE IF NOT EXISTS sqlite_sync_state ( + table_name TEXT PRIMARY KEY, + last_change_id BIGINT NOT NULL DEFAULT 0, + last_wal_frame TEXT, + cursor TEXT, + snapshot_completed BOOLEAN NOT NULL DEFAULT FALSE, + incremental_mode TEXT NOT NULL DEFAULT 'append', + baseline_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); +INSERT INTO sqlite_sync_state(table_name, snapshot_completed, incremental_mode) +VALUES ('demo', TRUE, 'append') +ON CONFLICT(table_name) DO UPDATE SET snapshot_completed = EXCLUDED.snapshot_completed, + incremental_mode = EXCLUDED.incremental_mode; +SQL + +echo "[smoke] running sync-sqlite" +DATABASE_URL="postgresql://postgres:postgres@localhost:$POSTGRES_PORT/postgres" +database-replicator sync-sqlite \ + --target "$DATABASE_URL" \ + --watcher-endpoint "unix:$SOCK" \ + --token-file "$TOKEN_FILE" \ + --batch-size 50 \ + --incremental-mode append >/dev/null + +docker exec "$CONTAINER_NAME" psql -U postgres -tAc "SELECT count(*) FROM demo WHERE id = 'smoke'" | grep -q '^ 1' + +echo "[smoke] success! sqlite-watcher + sync-sqlite end-to-end" +echo "[windows] Manual steps: run sqlite-watcher serve with tcp listener, start a Postgres instance (Docker Desktop works), then run database-replicator sync-sqlite with the TCP watcher endpoint." diff --git a/README-SQLite.md b/sqlite-watcher-docs/README-SQLite.md similarity index 92% rename from README-SQLite.md rename to sqlite-watcher-docs/README-SQLite.md index a01b44d..ea0f6d8 100644 --- a/README-SQLite.md +++ b/sqlite-watcher-docs/README-SQLite.md @@ -596,3 +596,36 @@ No. The tool uses `SQLITE_OPEN_READ_ONLY` which allows concurrent readers. Other For issues or questions: - **GitHub Issues**: https://github.com/serenorg/database-replicator/issues - **Email**: support@seren.ai +## Delta replication with sqlite-watcher + +Once you have completed the initial snapshot (`database-replicator init --source sqlite ...`), you can switch to incremental change capture: + +1. Install `sqlite-watcher` (see [sqlite-watcher-docs/installers.md](installers.md) for Linux systemd units, macOS launchd plists, and Windows service guidance). +2. Start the watcher beside your `.sqlite` file (example for Linux/macOS): + + ```bash + sqlite-watcher serve \ + --queue-db ~/.seren/sqlite-watcher/changes.db \ + --listen unix:/tmp/sqlite-watcher.sock \ + --token-file ~/.seren/sqlite-watcher/token + ``` + +3. Consume the change feed with the new command: + + ```bash + database-replicator sync-sqlite \ + --target "postgresql://user:pass@your-serendb.serendb.com:5432/app" \ + --watcher-endpoint unix:/tmp/sqlite-watcher.sock \ + --token-file ~/.seren/sqlite-watcher/token \ + --incremental-mode append + ``` + + Use `--incremental-mode append_deduped` to maintain `_latest` tables (one row per primary key) in addition to the append-only history. + +4. Verify the smoke test if you have Docker available: + + ```bash + scripts/test-sqlite-delta.sh + ``` + + The script spins up a temporary Postgres container, runs `sqlite-watcher`, and executes `database-replicator sync-sqlite` against the watcher feed. Windows users can follow the same steps manually (the script prints the equivalent commands at the end). diff --git a/sqlite-watcher-docs/installers.md b/sqlite-watcher-docs/installers.md new file mode 100644 index 0000000..1a2fb1a --- /dev/null +++ b/sqlite-watcher-docs/installers.md @@ -0,0 +1,109 @@ +# sqlite-watcher installation guide + +This document walks through running the sqlite-watcher service on Linux, macOS, and Windows. The watcher process should run beside the `.sqlite` file so it can tail the WAL and expose change batches over the embedded gRPC API. + +All platforms share these basics: + +- Create a token file (default `~/.seren/sqlite-watcher/token`) with restrictive permissions (owner read/write only). +- Choose a queue database path (default `~/.seren/sqlite-watcher/changes.db`). Ensure the parent directory is `0700` on Unix. +- Run `sqlite-watcher serve --queue-db --listen --token-file ` to start the gRPC service. Endpoints use the `unix:/path` or `tcp:host:port` syntax. + +## Linux (systemd) + +1. Install binaries: + + ```bash + sudo install -m 0755 database-replicator /usr/local/bin/database-replicator + sudo install -m 0755 sqlite-watcher /usr/local/bin/sqlite-watcher + ``` + +2. Create token + queue directories: + + ```bash + install -d -m 0700 ~/.seren/sqlite-watcher + openssl rand -hex 32 > ~/.seren/sqlite-watcher/token + ``` + +3. Create `/etc/systemd/system/sqlite-watcher.service`: + + ```ini + [Unit] + Description=sqlite-watcher for /srv/app.db + After=network-online.target + + [Service] + User=replicator + ExecStart=/usr/local/bin/sqlite-watcher serve \ + --queue-db /var/lib/sqlite-watcher/changes.db \ + --listen unix:/run/sqlite-watcher.sock \ + --token-file /home/replicator/.seren/sqlite-watcher/token + Restart=on-failure + + [Install] + WantedBy=multi-user.target + ``` + +4. Enable/start: + + ```bash + sudo systemctl daemon-reload + sudo systemctl enable --now sqlite-watcher.service + ``` + +## macOS (launchd) + +1. Copy binaries into `/usr/local/bin`. +2. Save the following to `~/Library/LaunchAgents/com.seren.sqlite-watcher.plist`: + + ```xml + + + + + Label + com.seren.sqlite-watcher + ProgramArguments + + /usr/local/bin/sqlite-watcher + serve + --queue-db + /Users/you/.seren/sqlite-watcher/changes.db + --listen + unix:/Users/you/.seren/sqlite-watcher/watcher.sock + --token-file + /Users/you/.seren/sqlite-watcher/token + + RunAtLoad + + KeepAlive + + StandardOutPath + /Users/you/Library/Logs/sqlite-watcher.log + StandardErrorPath + /Users/you/Library/Logs/sqlite-watcher.log + + + ``` + +3. Load the agent: `launchctl load ~/Library/LaunchAgents/com.seren.sqlite-watcher.plist`. + +## Windows (Service) + +1. Copy `database-replicator.exe` and `sqlite-watcher.exe` to a directory on `%PATH%` (e.g. `C:\Program Files\Seren`). +2. Create a token file under `%USERPROFILE%\.seren\sqlite-watcher\token`. +3. Use the built-in `sc.exe` to install a service (or NSSM if you prefer a GUI): + + ```powershell + sc.exe create sqlite-watcher binPath= "C:\Program Files\Seren\sqlite-watcher.exe serve --queue-db C:\data\sqlite-watcher\changes.db --listen tcp:127.0.0.1:6000 --token-file %USERPROFILE%\.seren\sqlite-watcher\token" start= auto + ``` + +4. Start the service with `sc.exe start sqlite-watcher`. + +Remember to open the firewall only if the watcher must accept remote TCP connections. In most deployments, keep it bound to loopback or Unix sockets. + +## Running sync-sqlite on a schedule + +- Linux/macOS: use cron or systemd timers to run `database-replicator sync-sqlite ...` periodically. +- Windows: create a Scheduled Task pointing at `database-replicator.exe sync-sqlite ...`. + +Consult the smoke test (`scripts/test-sqlite-delta.sh`) to see a minimal end-to-end example. diff --git a/sqlite-watcher/Cargo.toml b/sqlite-watcher/Cargo.toml index 5028dcc..b81704e 100644 --- a/sqlite-watcher/Cargo.toml +++ b/sqlite-watcher/Cargo.toml @@ -3,18 +3,30 @@ name = "sqlite-watcher" version = "0.1.0" edition = "2021" authors = ["SerenAI "] -description = "Utilities for monitoring SQLite databases (queue module)." +description = "SQLite watcher components (queue + gRPC)." license = "Apache-2.0" repository = "https://github.com/serenorg/database-replicator" +build = "build.rs" + +[build-dependencies] +tonic-build = "0.11" [dependencies] anyhow = "1.0" +clap = { version = "4.4", features = ["derive", "env"] } dirs = "5.0" rusqlite = { version = "0.30", features = ["chrono"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "1.0" +base64 = "0.21" +tokio = { version = "1.35", features = ["rt-multi-thread", "macros", "signal", "fs"] } +tonic = { version = "0.11", features = ["transport"] } +tokio-stream = { version = "0.1", features = ["net"] } +prost = "0.12" [dev-dependencies] tempfile = "3.8" -rand = "0.8" +tokio = { version = "1.35", features = ["rt", "macros"] } +tonic = { version = "0.11", features = ["transport"] } +tower = "0.4" diff --git a/sqlite-watcher/README.md b/sqlite-watcher/README.md index 6f798d3..480acf0 100644 --- a/sqlite-watcher/README.md +++ b/sqlite-watcher/README.md @@ -1,5 +1,10 @@ -# sqlite-watcher +# sqlite-watcher (alpha) -Work-in-progress tooling for monitoring SQLite databases. This issue adds the durable change queue used by the watcher service. The queue stores row-level changes plus per-table checkpoints in `~/.seren/sqlite-watcher/changes.db` so restarts can resume from the last acknowledged WAL frame. +This crate currently ships the shared queue + gRPC server used by `database-replicator sync-sqlite`. The `sqlite-watcher` binary includes: -Run `cargo test -p sqlite-watcher` to execute the queue integration tests. +- `serve`: start the queue-backed gRPC API so clients can pull change batches. +- `enqueue`: helper for tests/smoke scripts to add sample changes to the queue database. + +> **Note:** WAL tailing is still under active development; use the binary today to test queue + sync flows. + +See `sqlite-watcher-docs/installers.md` for per-OS service guidance and `scripts/test-sqlite-delta.sh` for the end-to-end smoke test. diff --git a/sqlite-watcher/proto/watcher.proto b/sqlite-watcher/proto/watcher.proto index 064f3cb..2424406 100644 --- a/sqlite-watcher/proto/watcher.proto +++ b/sqlite-watcher/proto/watcher.proto @@ -3,14 +3,9 @@ syntax = "proto3"; package sqlitewatcher; message HealthCheckRequest {} -message HealthCheckResponse { - string status = 1; -} - -message ListChangesRequest { - uint32 limit = 1; -} +message HealthCheckResponse { string status = 1; } +message ListChangesRequest { uint32 limit = 1; } message Change { int64 change_id = 1; string table_name = 2; @@ -20,23 +15,12 @@ message Change { string wal_frame = 6; string cursor = 7; } +message ListChangesResponse { repeated Change changes = 1; } -message ListChangesResponse { - repeated Change changes = 1; -} - -message AckChangesRequest { - int64 up_to_change_id = 1; -} - -message AckChangesResponse { - uint64 acknowledged = 1; -} - -message GetStateRequest { - string table_name = 1; -} +message AckChangesRequest { int64 up_to_change_id = 1; } +message AckChangesResponse { uint64 acknowledged = 1; } +message GetStateRequest { string table_name = 1; } message GetStateResponse { bool exists = 1; int64 last_change_id = 2; @@ -50,7 +34,6 @@ message SetStateRequest { string last_wal_frame = 3; string cursor = 4; } - message SetStateResponse {} service Watcher { diff --git a/sqlite-watcher/src/lib.rs b/sqlite-watcher/src/lib.rs index e8ae652..b24ec66 100644 --- a/sqlite-watcher/src/lib.rs +++ b/sqlite-watcher/src/lib.rs @@ -1 +1,5 @@ pub mod queue; +pub mod server; +pub mod watcher_proto { + tonic::include_proto!("sqlitewatcher"); +} diff --git a/sqlite-watcher/src/main.rs b/sqlite-watcher/src/main.rs index 4b2f373..5159550 100644 --- a/sqlite-watcher/src/main.rs +++ b/sqlite-watcher/src/main.rs @@ -1,371 +1,209 @@ -use std::fmt; -use std::fs; -use std::net::SocketAddr; -use std::path::{Path, PathBuf}; -use std::str::FromStr; -use std::sync::mpsc; +use std::path::PathBuf; use std::time::Duration; -use anyhow::{anyhow, bail, Context, Result}; -use clap::Parser; -use sqlite_watcher::decoder::WalGrowthDecoder; -use sqlite_watcher::queue::ChangeQueue; -#[cfg(unix)] -use sqlite_watcher::server::spawn_unix_server; -use sqlite_watcher::server::{spawn_tcp_server, ServerHandle}; -use sqlite_watcher::wal::{start_wal_watcher, WalWatcherConfig as TailConfig}; -use tracing_subscriber::EnvFilter; +use anyhow::{bail, Context, Result}; +use clap::{Parser, Subcommand, ValueEnum}; +use sqlite_watcher::queue::{ChangeOperation, ChangeQueue, NewChange}; +use sqlite_watcher::server::{spawn_tcp, spawn_unix}; +use tokio::signal; -#[cfg(unix)] -const DEFAULT_LISTEN: &str = "unix:/tmp/sqlite-watcher.sock"; -#[cfg(not(unix))] -const DEFAULT_LISTEN: &str = "tcp:50051"; - -/// Command-line interface definition for sqlite-watcher. -#[derive(Debug, Clone, Parser)] -#[command( - name = "sqlite-watcher", - version, - about = "Tails SQLite WAL files and exposes change streams.", - long_about = None -)] +#[derive(Parser)] +#[command(name = "sqlite-watcher")] +#[command(about = "sqlite watcher utility (alpha)")] struct Cli { - /// Path to the SQLite database the watcher should monitor. - #[arg(long = "db", value_name = "PATH")] - db_path: PathBuf, - - /// Listener binding. Accepts unix:/path, tcp:, or pipe:. - #[arg(long, value_name = "ENDPOINT", default_value = DEFAULT_LISTEN)] - listen: String, - - /// Shared-secret token file used to authenticate RPC clients. - #[arg(long = "token-file", value_name = "PATH")] - token_file: Option, - - /// Path to the durable change queue database. - #[arg(long = "queue-db", value_name = "PATH")] - queue_db: Option, - - /// Tracing filter (info,warn,debug,trace). Can also be provided via SQLITE_WATCHER_LOG. - #[arg( - long = "log-level", - value_name = "FILTER", - default_value = "info", - env = "SQLITE_WATCHER_LOG" - )] - log_filter: String, - - /// Interval in milliseconds between WAL file polls. - #[arg( - long = "poll-interval-ms", - default_value_t = 500, - value_parser = clap::value_parser!(u64).range(50..=60_000) - )] - poll_interval_ms: u64, - - /// Minimum WAL byte growth required before emitting an event. - #[arg( - long = "min-event-bytes", - default_value_t = 1, - value_parser = clap::value_parser!(u64).range(1..=10_000_000) - )] - min_event_bytes: u64, -} - -#[derive(Debug, Clone, PartialEq, Eq)] -enum ListenAddress { - Unix(PathBuf), - Tcp { host: String, port: u16 }, - Pipe(String), -} - -impl fmt::Display for ListenAddress { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - ListenAddress::Unix(path) => write!(f, "unix:{}", path.display()), - ListenAddress::Tcp { host, port } => write!(f, "tcp:{}:{}", host, port), - ListenAddress::Pipe(name) => write!(f, "pipe:{}", name), + #[command(subcommand)] + command: Command, +} + +#[derive(Subcommand)] +enum Command { + /// Run the gRPC server exposing the change queue + Serve { + /// SQLite queue database path + #[arg(long = "queue-db")] + queue_db: Option, + /// gRPC listener (unix:/path or tcp:host:port) + #[arg(long = "listen", default_value = "unix:/tmp/sqlite-watcher.sock")] + listen: String, + /// Shared-secret token file (defaults to ~/.seren/sqlite-watcher/token) + #[arg(long = "token-file")] + token_file: Option, + }, + /// Enqueue a test change into the queue database + Enqueue { + #[arg(long = "queue-db")] + queue_db: Option, + #[arg(long = "table", default_value = "demo")] + table: String, + #[arg(long = "id", default_value = "smoke-test")] + id: String, + #[arg(long = "payload", default_value = r#"{""message"":""hello""}"#)] + payload: String, + #[arg(long = "op", value_enum, default_value = "insert")] + op: ChangeOp, + }, +} + +#[derive(Clone, Copy, ValueEnum)] +enum ChangeOp { + Insert, + Update, + Delete, +} + +impl From for ChangeOperation { + fn from(value: ChangeOp) -> Self { + match value { + ChangeOp::Insert => ChangeOperation::Insert, + ChangeOp::Update => ChangeOperation::Update, + ChangeOp::Delete => ChangeOperation::Delete, } } } -impl FromStr for ListenAddress { - type Err = anyhow::Error; - - fn from_str(value: &str) -> Result { - if let Some(path) = value.strip_prefix("unix:") { - if cfg!(windows) { - bail!("unix sockets are not supported on Windows"); - } - if path.is_empty() { - bail!("unix listen path cannot be empty"); - } - return Ok(ListenAddress::Unix(PathBuf::from(path))); - } - - if let Some(port) = value.strip_prefix("tcp:") { - let port: u16 = port - .parse() - .map_err(|_| anyhow!("tcp listener must specify a numeric port"))?; - return Ok(ListenAddress::Tcp { - host: "127.0.0.1".to_string(), - port, - }); - } - - if let Some(name) = value.strip_prefix("pipe:") { - if cfg!(not(windows)) { - bail!("named pipes are only valid on Windows"); - } - if name.is_empty() { - bail!("pipe name cannot be empty"); - } - return Ok(ListenAddress::Pipe(name.to_string())); - } - - bail!("listen endpoint must start with unix:/, tcp:, or pipe:"); - } -} - -#[derive(Debug, Clone)] -struct WatcherConfig { - database_path: PathBuf, - listen: ListenAddress, - token_file: PathBuf, - queue_path: PathBuf, - poll_interval: Duration, - min_event_bytes: u64, -} - -impl TryFrom for WatcherConfig { - type Error = anyhow::Error; - - fn try_from(args: Cli) -> Result { - let database_path = ensure_sqlite_file(&args.db_path)?; - let listen = ListenAddress::from_str(args.listen.trim())?; - let token_file = match args.token_file { - Some(path) => expand_home(path)?, - None => default_token_path()?, - }; - let queue_path = match args.queue_db { - Some(path) => expand_home(path)?, - None => default_queue_path()?, - }; - - Ok(Self { - database_path, +#[tokio::main] +async fn main() -> Result<()> { + let cli = Cli::parse(); + match cli.command { + Command::Serve { + queue_db, listen, token_file, - queue_path, - poll_interval: Duration::from_millis(args.poll_interval_ms), - min_event_bytes: args.min_event_bytes, - }) - } -} - -fn ensure_sqlite_file(path: &Path) -> Result { - if !path.exists() { - bail!("database path {} does not exist", path.display()); - } - if !path.is_file() { - bail!("database path {} is not a file", path.display()); + } => serve(queue_db, &listen, token_file).await, + Command::Enqueue { + queue_db, + table, + id, + payload, + op, + } => enqueue(queue_db, &table, &id, &payload, op.into()), } - Ok(path - .canonicalize() - .with_context(|| format!("failed to canonicalize {}", path.display()))?) } -fn default_token_path() -> Result { - let home = dirs::home_dir().ok_or_else(|| anyhow!("unable to determine home directory"))?; - Ok(home.join(".seren/sqlite-watcher/token")) +async fn serve(queue_db: Option, listen: &str, token_file: Option) -> Result<()> { + let queue_path = resolve_queue_path(queue_db)?; + let token_path = resolve_token_path(token_file)?; + let token = std::fs::read_to_string(&token_path) + .with_context(|| format!("failed to read token file {}", token_path.display()))?; + let queue = ChangeQueue::open(&queue_path)?; + let endpoint = WatcherEndpoint::parse(listen)?; + println!( + "sqlite-watcher serving {listen} using queue {}", + queue.path().display() + ); + let handle = match endpoint { + WatcherEndpoint::Tcp { host, port } => { + let addr = format!("{}:{}", host, port) + .parse() + .context("invalid tcp address")?; + spawn_tcp(addr, queue.path().to_path_buf(), token)? + } + WatcherEndpoint::Unix(path) => spawn_unix(&path, queue.path().to_path_buf(), token)?, + WatcherEndpoint::Pipe(name) => bail!("named pipes are not yet supported ({name})"), + }; + println!("Press Ctrl+C to stop sqlite-watcher"); + let ctrl_c = signal::ctrl_c(); + tokio::pin!(ctrl_c); + let _ = tokio::time::timeout(Duration::MAX, &mut ctrl_c).await; + drop(handle); + Ok(()) } -fn default_queue_path() -> Result { - let home = dirs::home_dir().ok_or_else(|| anyhow!("unable to determine home directory"))?; - Ok(home.join(".seren/sqlite-watcher/changes.db")) +fn enqueue( + queue_db: Option, + table: &str, + id: &str, + payload: &str, + op: ChangeOperation, +) -> Result<()> { + let queue_path = resolve_queue_path(queue_db)?; + let queue = ChangeQueue::open(&queue_path)?; + let bytes = payload.as_bytes().to_vec(); + queue.enqueue(&NewChange { + table_name: table.to_string(), + operation: op, + primary_key: id.to_string(), + payload: Some(bytes), + wal_frame: None, + cursor: None, + })?; + println!( + "Enqueued row id '{}' for table '{}' into {}", + id, + table, + queue.path().display() + ); + Ok(()) } -fn expand_home(path: PathBuf) -> Result { - let as_str = path.to_string_lossy(); - if let Some(stripped) = as_str.strip_prefix("~/") { - let home = dirs::home_dir().ok_or_else(|| anyhow!("unable to determine home directory"))?; - return Ok(home.join(stripped)); - } - if as_str == "~" { - let home = dirs::home_dir().ok_or_else(|| anyhow!("unable to determine home directory"))?; - return Ok(home); +fn resolve_queue_path(path: Option) -> Result { + match path { + Some(p) => Ok(expand_path(p)?), + None => { + let mut default = dirs::home_dir() + .ok_or_else(|| anyhow::anyhow!("Unable to resolve home directory"))?; + default.push(".seren/sqlite-watcher/changes.db"); + Ok(default) + } } - Ok(path) -} - -fn init_tracing(filter: &str) -> Result<()> { - let env_filter = EnvFilter::try_new(filter).or_else(|_| EnvFilter::try_new("info"))?; - tracing_subscriber::fmt() - .with_env_filter(env_filter) - .with_target(false) - .try_init() - .map_err(|err| anyhow!("failed to init tracing subscriber: {err}")) } -fn main() -> Result<()> { - let cli = Cli::parse(); - init_tracing(&cli.log_filter)?; - let config = WatcherConfig::try_from(cli)?; - let auth_token = read_token_file(&config.token_file)?; - - tracing::info!( - db = %config.database_path.display(), - listen = %config.listen, - token = %config.token_file.display(), - queue = %config.queue_path.display(), - poll_ms = config.poll_interval.as_millis(), - min_event_bytes = config.min_event_bytes, - "sqlite-watcher starting" - ); - - let queue = ChangeQueue::open(&config.queue_path)?; - let decoder = WalGrowthDecoder::default(); - let server_handle = start_grpc_server(&config.listen, &config.queue_path, &auth_token)?; - let (event_tx, event_rx) = mpsc::channel(); - let _wal_handle = start_wal_watcher( - &config.database_path, - TailConfig { - poll_interval: config.poll_interval, - min_event_bytes: config.min_event_bytes, - }, - event_tx, - )?; - - for event in event_rx { - match process_wal_event(&decoder, &queue, &event) { - Ok(change_ids) if !change_ids.is_empty() => { - tracing::info!( - bytes_added = event.bytes_added, - wal_size = event.current_size, - change_count = change_ids.len(), - "queued wal growth event" - ); - } - Err(err) => { - tracing::warn!(error = %err, "failed to persist wal event to queue"); - } - _ => {} +fn resolve_token_path(path: Option) -> Result { + match path { + Some(p) => Ok(expand_path(p)?), + None => { + let mut default = dirs::home_dir() + .ok_or_else(|| anyhow::anyhow!("Unable to resolve home directory"))?; + default.push(".seren/sqlite-watcher/token"); + Ok(default) } } - - drop(server_handle); - Ok(()) } -fn process_wal_event( - decoder: &WalGrowthDecoder, - queue: &ChangeQueue, - event: &sqlite_watcher::wal::WalEvent, -) -> Result> { - let mut ids = Vec::new(); - for row in decoder.decode(event) { - ids.push(queue.enqueue(&row.into_new_change())?); +fn expand_path(p: PathBuf) -> Result { + let s = p.to_string_lossy(); + if let Some(rest) = s.strip_prefix("~/") { + let mut home = + dirs::home_dir().ok_or_else(|| anyhow::anyhow!("Unable to resolve home directory"))?; + home.push(rest); + Ok(home) + } else { + Ok(p) } - Ok(ids) } -fn read_token_file(path: &Path) -> Result { - let contents = fs::read_to_string(path) - .with_context(|| format!("failed to read token file {}", path.display()))?; - let token = contents.trim().to_string(); - if token.is_empty() { - bail!("token file {} is empty", path.display()); - } - Ok(token) +enum WatcherEndpoint { + Tcp { host: String, port: u16 }, + Unix(PathBuf), + Pipe(String), } -fn start_grpc_server( - listen: &ListenAddress, - queue_path: &Path, - token: &str, -) -> Result> { - match listen { - ListenAddress::Tcp { host, port } => { - let addr: SocketAddr = format!("{}:{}", host, port) - .parse() - .with_context(|| format!("invalid tcp listen address {host}:{port}"))?; - let handle = spawn_tcp_server(addr, queue_path.to_path_buf(), token.to_string())?; - Ok(Some(handle)) - } - ListenAddress::Unix(path) => { - #[cfg(unix)] - { - let handle = spawn_unix_server(path, queue_path.to_path_buf(), token.to_string())?; - Ok(Some(handle)) - } - #[cfg(not(unix))] - { - bail!("unix sockets are not supported on this platform") +impl WatcherEndpoint { + fn parse(value: &str) -> Result { + if let Some(rest) = value.strip_prefix("unix:") { + if rest.is_empty() { + bail!("unix endpoint requires a path"); } + return Ok(WatcherEndpoint::Unix(PathBuf::from(rest))); } - ListenAddress::Pipe(name) => { - tracing::warn!(pipe = name, "named pipe transport is not yet implemented"); - Ok(None) + if let Some(rest) = value.strip_prefix("tcp:") { + let mut parts = rest.split(':'); + let host = parts + .next() + .ok_or_else(|| anyhow::anyhow!("tcp endpoint missing host"))?; + let port = parts + .next() + .ok_or_else(|| anyhow::anyhow!("tcp endpoint missing port"))? + .parse::() + .context("invalid tcp port")?; + return Ok(WatcherEndpoint::Tcp { + host: host.to_string(), + port, + }); } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use clap::Parser; - use sqlite_watcher::queue::ChangeQueue; - use tempfile::{tempdir, NamedTempFile}; - - #[test] - fn parses_tcp_listener() { - let tmp = NamedTempFile::new().unwrap(); - let cli = Cli::try_parse_from([ - "sqlite-watcher", - "--db", - tmp.path().to_str().unwrap(), - "--listen", - "tcp:6000", - "--token-file", - "./token", - "--log-level", - "debug", - ]) - .expect("cli parsing failed"); - - let config = WatcherConfig::try_from(cli).expect("config conversion failed"); - assert!(matches!( - config.listen, - ListenAddress::Tcp { host, port } if host == "127.0.0.1" && port == 6000 - )); - assert!(config.token_file.ends_with("token")); - assert!(config.queue_path.ends_with("changes.db")); - } - - #[test] - #[cfg(unix)] - fn parses_unix_listener_default() { - let tmp = NamedTempFile::new().unwrap(); - let cli = - Cli::try_parse_from(["sqlite-watcher", "--db", tmp.path().to_str().unwrap()]).unwrap(); - let config = WatcherConfig::try_from(cli).unwrap(); - assert!(matches!(config.listen, ListenAddress::Unix(_))); - } - - #[test] - fn persist_wal_events_into_queue() { - let dir = tempdir().unwrap(); - let queue_path = dir.path().join("queue.db"); - let queue = ChangeQueue::open(&queue_path).unwrap(); - let decoder = WalGrowthDecoder::default(); - - let event = sqlite_watcher::wal::WalEvent { - bytes_added: 2048, - current_size: 4096, - }; - let change_ids = process_wal_event(&decoder, &queue, &event).unwrap(); - let batch = queue.fetch_batch(10).unwrap(); - assert_eq!(batch.len(), change_ids.len()); - assert_eq!(batch[0].table_name, "__wal__"); + if let Some(rest) = value.strip_prefix("pipe:") { + return Ok(WatcherEndpoint::Pipe(rest.to_string())); + } + bail!("unsupported listener endpoint: {value}"); } } diff --git a/sqlite-watcher/src/queue.rs b/sqlite-watcher/src/queue.rs index 8a9b0b7..e0b8305 100644 --- a/sqlite-watcher/src/queue.rs +++ b/sqlite-watcher/src/queue.rs @@ -3,7 +3,6 @@ use std::path::{Path, PathBuf}; use anyhow::{anyhow, Context, Result}; use rusqlite::{params, Connection, OptionalExtension, Row}; -use serde::Serialize; const SCHEMA: &str = r#" CREATE TABLE IF NOT EXISTS changes ( @@ -27,7 +26,7 @@ CREATE TABLE IF NOT EXISTS state ( ); "#; -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum ChangeOperation { Insert, Update, @@ -43,7 +42,7 @@ impl ChangeOperation { } } - fn from_str(value: &str) -> Result { + pub fn from_str(value: &str) -> Result { match value { "insert" => Ok(ChangeOperation::Insert), "update" => Ok(ChangeOperation::Update), @@ -109,10 +108,6 @@ impl ChangeQueue { }) } - pub fn path(&self) -> &Path { - &self.path - } - pub fn enqueue(&self, change: &NewChange) -> Result { self.conn.execute( "INSERT INTO changes(table_name, op, id, payload, wal_frame, cursor) @@ -193,6 +188,10 @@ impl ChangeQueue { )?; Ok(()) } + + pub fn path(&self) -> &Path { + &self.path + } } fn row_to_change(row: &Row<'_>) -> Result { diff --git a/sqlite-watcher/src/server.rs b/sqlite-watcher/src/server.rs index d41004f..782c5bf 100644 --- a/sqlite-watcher/src/server.rs +++ b/sqlite-watcher/src/server.rs @@ -8,7 +8,13 @@ use tokio::runtime::Builder; use tokio::sync::oneshot; use tokio_stream::wrappers::TcpListenerStream; use tonic::service::Interceptor; -use tonic::{transport::Server, Request, Response, Status}; +use tonic::transport::Server; +use tonic::{Request, Response, Status}; + +#[cfg(unix)] +use tokio::net::UnixListener; +#[cfg(unix)] +use tokio_stream::wrappers::UnixListenerStream; use crate::queue::{ChangeQueue, QueueState}; use crate::watcher_proto::watcher_server::{Watcher, WatcherServer}; @@ -18,117 +24,118 @@ use crate::watcher_proto::{ SetStateRequest, SetStateResponse, }; -#[cfg(unix)] -use tokio::net::UnixListener; -#[cfg(unix)] -use tokio_stream::wrappers::UnixListenerStream; - -pub struct ServerHandle { - shutdown: Option>, - thread: Option>>, +pub enum ServerHandle { + Tcp { + shutdown: Option>, + thread: Option>>, + }, #[cfg(unix)] - unix_path: Option, + Unix { + shutdown: Option>, + thread: Option>>, + path: PathBuf, + }, } impl Drop for ServerHandle { fn drop(&mut self) { - if let Some(tx) = self.shutdown.take() { - let _ = tx.send(()); - } - if let Some(handle) = self.thread.take() { - let _ = handle.join(); - } - #[cfg(unix)] - if let Some(path) = self.unix_path.take() { - let _ = std::fs::remove_file(path); + match self { + ServerHandle::Tcp { shutdown, thread } => { + if let Some(tx) = shutdown.take() { + let _ = tx.send(()); + } + if let Some(handle) = thread.take() { + let _ = handle.join(); + } + } + #[cfg(unix)] + ServerHandle::Unix { + shutdown, + thread, + path, + } => { + if let Some(tx) = shutdown.take() { + let _ = tx.send(()); + } + if let Some(handle) = thread.take() { + let _ = handle.join(); + } + let _ = std::fs::remove_file(path); + } } } } -pub fn spawn_tcp_server( - addr: SocketAddr, - queue_path: PathBuf, - token: String, -) -> Result { +pub fn spawn_tcp(addr: SocketAddr, queue_path: PathBuf, token: String) -> Result { let (shutdown_tx, shutdown_rx) = oneshot::channel(); let thread = thread::spawn(move || -> Result<()> { - let runtime = Builder::new_multi_thread() - .enable_all() - .build() - .context("failed to build tokio runtime")?; + let runtime = Builder::new_multi_thread().enable_all().build()?; runtime.block_on(async move { let listener = tokio::net::TcpListener::bind(addr) .await .context("failed to bind tcp listener")?; let queue_path = Arc::new(queue_path); let svc = WatcherService::new(queue_path); - let interceptor = AuthInterceptor { - token: Arc::new(token), - }; + let interceptor = AuthInterceptor::new(token); Server::builder() .add_service(WatcherServer::with_interceptor(svc, interceptor)) .serve_with_incoming_shutdown(TcpListenerStream::new(listener), async move { let _ = shutdown_rx.await; }) .await - .context("grpc server exited with error")?; - Ok::<(), anyhow::Error>(()) - })?; - Ok(()) + .context("grpc server exited") + }) }); - Ok(ServerHandle { + Ok(ServerHandle::Tcp { shutdown: Some(shutdown_tx), thread: Some(thread), - #[cfg(unix)] - unix_path: None, }) } #[cfg(unix)] -pub fn spawn_unix_server(path: &Path, queue_path: PathBuf, token: String) -> Result { +pub fn spawn_unix(path: &Path, queue_path: PathBuf, token: String) -> Result { if path.exists() { std::fs::remove_file(path) .with_context(|| format!("failed to remove stale unix socket {}", path.display()))?; } if let Some(parent) = path.parent() { - std::fs::create_dir_all(parent) - .with_context(|| format!("failed to create unix socket dir {}", parent.display()))?; + std::fs::create_dir_all(parent).with_context(|| { + format!( + "failed to create unix socket directory {}", + parent.display() + ) + })?; } let path_buf = path.to_path_buf(); + let listener_path = path_buf.clone(); let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let path_for_drop = path_buf.clone(); let thread = thread::spawn(move || -> Result<()> { - let runtime = Builder::new_multi_thread() - .enable_all() - .build() - .context("failed to build tokio runtime")?; + let runtime = Builder::new_multi_thread().enable_all().build()?; runtime.block_on(async move { - let listener = UnixListener::bind(&path_buf).context("failed to bind unix socket")?; + let listener = + UnixListener::bind(&listener_path).context("failed to bind unix socket")?; let queue_path = Arc::new(queue_path); let svc = WatcherService::new(queue_path); - let interceptor = AuthInterceptor { - token: Arc::new(token), - }; + let interceptor = AuthInterceptor::new(token); Server::builder() .add_service(WatcherServer::with_interceptor(svc, interceptor)) .serve_with_incoming_shutdown(UnixListenerStream::new(listener), async move { let _ = shutdown_rx.await; }) .await - .context("grpc server exited with error")?; - Ok::<(), anyhow::Error>(()) - })?; - Ok(()) + .context("grpc server exited") + }) }); - Ok(ServerHandle { + Ok(ServerHandle::Unix { shutdown: Some(shutdown_tx), thread: Some(thread), - unix_path: Some(path_for_drop), + path: path_buf, }) } +#[derive(Clone)] struct WatcherService { queue_path: Arc, } @@ -138,7 +145,7 @@ impl WatcherService { Self { queue_path } } - fn open_queue(&self) -> Result { + fn queue(&self) -> Result { ChangeQueue::open(&*self.queue_path) } } @@ -148,19 +155,23 @@ struct AuthInterceptor { token: Arc, } +impl AuthInterceptor { + fn new(token: String) -> Self { + Self { + token: Arc::new(token), + } + } +} + impl Interceptor for AuthInterceptor { - fn call(&mut self, request: Request<()>) -> Result, Status> { - let header = request + fn call(&mut self, req: Request<()>) -> Result, Status> { + let provided = req .metadata() .get("authorization") .ok_or_else(|| Status::unauthenticated("missing authorization header"))?; - let expected = format!("Bearer {}", self.token.as_ref()); - if header - .to_str() - .map(|value| value == expected) - .unwrap_or(false) - { - Ok(request) + let expected = format!("Bearer {}", self.token.as_str()); + if provided == expected.as_str() { + Ok(req) } else { Err(Status::unauthenticated("invalid authorization header")) } @@ -183,12 +194,8 @@ impl Watcher for WatcherService { request: Request, ) -> Result, Status> { let limit = request.get_ref().limit.max(1).min(10_000) as usize; - let queue = self - .open_queue() - .map_err(|err| Status::internal(err.to_string()))?; - let rows = queue - .fetch_batch(limit) - .map_err(|err| Status::internal(err.to_string()))?; + let queue = self.queue().map_err(internal_err)?; + let rows = queue.fetch_batch(limit).map_err(internal_err)?; let changes = rows.into_iter().map(change_to_proto).collect(); Ok(Response::new(ListChangesResponse { changes })) } @@ -198,12 +205,9 @@ impl Watcher for WatcherService { request: Request, ) -> Result, Status> { let upto = request.get_ref().up_to_change_id; - let queue = self - .open_queue() - .map_err(|err| Status::internal(err.to_string()))?; - let count = queue - .ack_up_to(upto) - .map_err(|err| Status::internal(err.to_string()))?; + let queue = self.queue().map_err(internal_err)?; + let count = queue.ack_up_to(upto).map_err(internal_err)?; + queue.purge_acked().ok(); Ok(Response::new(AckChangesResponse { acknowledged: count, })) @@ -213,13 +217,10 @@ impl Watcher for WatcherService { &self, request: Request, ) -> Result, Status> { - let name = request.get_ref().table_name.clone(); - let queue = self - .open_queue() - .map_err(|err| Status::internal(err.to_string()))?; + let queue = self.queue().map_err(internal_err)?; let state = queue - .get_state(&name) - .map_err(|err| Status::internal(err.to_string()))?; + .get_state(&request.get_ref().table_name) + .map_err(internal_err)?; let resp = match state { Some(state) => GetStateResponse { exists: true, @@ -245,9 +246,7 @@ impl Watcher for WatcherService { if payload.table_name.is_empty() { return Err(Status::invalid_argument("table_name is required")); } - let queue = self - .open_queue() - .map_err(|err| Status::internal(err.to_string()))?; + let queue = self.queue().map_err(internal_err)?; let state = QueueState { table_name: payload.table_name, last_change_id: payload.last_change_id, @@ -262,9 +261,7 @@ impl Watcher for WatcherService { Some(payload.cursor) }, }; - queue - .set_state(&state) - .map_err(|err| Status::internal(err.to_string()))?; + queue.set_state(&state).map_err(internal_err)?; Ok(Response::new(SetStateResponse {})) } } @@ -280,3 +277,7 @@ fn change_to_proto(row: crate::queue::ChangeRecord) -> Change { cursor: row.cursor.unwrap_or_default(), } } + +fn internal_err(err: anyhow::Error) -> Status { + Status::internal(err.to_string()) +} diff --git a/third-party/fxhash/Cargo.toml b/third-party/fxhash/Cargo.toml new file mode 100644 index 0000000..60212b9 --- /dev/null +++ b/third-party/fxhash/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "fxhash" +version = "0.2.1" +edition = "2018" +description = "Local replacement for fxhash using rustc-hash" +license = "CC0-1.0" +publish = false + +[dependencies] +rustc-hash = "1.1" diff --git a/third-party/fxhash/src/lib.rs b/third-party/fxhash/src/lib.rs new file mode 100644 index 0000000..cab4b9d --- /dev/null +++ b/third-party/fxhash/src/lib.rs @@ -0,0 +1,40 @@ +use std::collections::{HashMap, HashSet}; +use std::fmt; +use std::hash::{BuildHasherDefault, Hasher}; + +use rustc_hash::FxHasher as InnerFxHasher; + +pub struct FxHasher(InnerFxHasher); + +impl Default for FxHasher { + fn default() -> Self { + Self(InnerFxHasher::default()) + } +} + +impl Clone for FxHasher { + fn clone(&self) -> Self { + // FxHasher does not implement Clone; start a new hasher + Self::default() + } +} + +impl fmt::Debug for FxHasher { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("FxHasher").finish() + } +} + +impl Hasher for FxHasher { + fn finish(&self) -> u64 { + self.0.finish() + } + + fn write(&mut self, bytes: &[u8]) { + self.0.write(bytes) + } +} + +pub type FxBuildHasher = BuildHasherDefault; +pub type FxHashMap = HashMap; +pub type FxHashSet = HashSet;