Skip to content

Commit b3b74c5

Browse files
committed
[sqlite-watcher] add queue + grpc server scaffold
1 parent 8566627 commit b3b74c5

10 files changed

Lines changed: 1026 additions & 7 deletions

File tree

Cargo.lock

Lines changed: 306 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
[workspace]
2+
resolver = "2"
3+
members = [
4+
".",
5+
"sqlite-watcher",
6+
]
7+
18
[package]
29
name = "database-replicator"
310
version = "7.0.14"

sqlite-watcher/Cargo.toml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
[package]
2+
name = "sqlite-watcher"
3+
version = "0.1.0"
4+
edition = "2021"
5+
authors = ["SerenAI <eng@serendb.com>"]
6+
description = "SQLite watcher components (queue + gRPC)."
7+
license = "Apache-2.0"
8+
repository = "https://github.com/serenorg/database-replicator"
9+
build = "build.rs"
10+
11+
[build-dependencies]
12+
tonic-build = "0.11"
13+
14+
[dependencies]
15+
anyhow = "1.0"
16+
clap = { version = "4.4", features = ["derive", "env"] }
17+
dirs = "5.0"
18+
rusqlite = { version = "0.30", features = ["chrono"] }
19+
serde = { version = "1.0", features = ["derive"] }
20+
serde_json = "1.0"
21+
thiserror = "1.0"
22+
base64 = "0.21"
23+
tokio = { version = "1.35", features = ["rt-multi-thread", "macros", "signal", "fs"] }
24+
tonic = { version = "0.11", features = ["transport"] }
25+
tokio-stream = { version = "0.1", features = ["net"] }
26+
prost = "0.12"
27+
28+
[dev-dependencies]
29+
tempfile = "3.8"
30+
tokio = { version = "1.35", features = ["rt", "macros"] }
31+
tonic = { version = "0.11", features = ["transport"] }
32+
tower = "0.4"

sqlite-watcher/README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# sqlite-watcher
2+
3+
This crate provides the building blocks for an upcoming sqlite-watcher binary. Issue #82 adds a durable queue plus a tonic-based gRPC server so other components can stream captured SQLite changes.
4+
5+
## Components
6+
7+
- `queue.rs`: stores change rows and per-table checkpoints in `~/.seren/sqlite-watcher/changes.db`.
8+
- `proto/watcher.proto`: RPC definitions (`HealthCheck`, `ListChanges`, `AckChanges`, `GetState`, `SetState`).
9+
- `server.rs`: tonic server wrappers exposing the queue over TCP or Unix sockets with shared-secret authentication.
10+
11+
## Building & Testing
12+
13+
```bash
14+
cargo test -p sqlite-watcher
15+
```
16+
17+
The tests cover queue durability/state behavior. Server tests will be added once the consumer wiring lands.

sqlite-watcher/build.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
fn main() -> Result<(), Box<dyn std::error::Error>> {
2+
tonic_build::configure()
3+
.build_client(true)
4+
.build_server(true)
5+
.compile(&["proto/watcher.proto"], &["proto"])?;
6+
println!("cargo:rerun-if-changed=proto/watcher.proto");
7+
Ok(())
8+
}

sqlite-watcher/proto/watcher.proto

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
syntax = "proto3";
2+
3+
package sqlitewatcher;
4+
5+
message HealthCheckRequest {}
6+
message HealthCheckResponse { string status = 1; }
7+
8+
message ListChangesRequest { uint32 limit = 1; }
9+
message Change {
10+
int64 change_id = 1;
11+
string table_name = 2;
12+
string op = 3;
13+
string primary_key = 4;
14+
bytes payload = 5;
15+
string wal_frame = 6;
16+
string cursor = 7;
17+
}
18+
message ListChangesResponse { repeated Change changes = 1; }
19+
20+
message AckChangesRequest { int64 up_to_change_id = 1; }
21+
message AckChangesResponse { uint64 acknowledged = 1; }
22+
23+
message GetStateRequest { string table_name = 1; }
24+
message GetStateResponse {
25+
bool exists = 1;
26+
int64 last_change_id = 2;
27+
string last_wal_frame = 3;
28+
string cursor = 4;
29+
}
30+
31+
message SetStateRequest {
32+
string table_name = 1;
33+
int64 last_change_id = 2;
34+
string last_wal_frame = 3;
35+
string cursor = 4;
36+
}
37+
message SetStateResponse {}
38+
39+
service Watcher {
40+
rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse);
41+
rpc ListChanges(ListChangesRequest) returns (ListChangesResponse);
42+
rpc AckChanges(AckChangesRequest) returns (AckChangesResponse);
43+
rpc GetState(GetStateRequest) returns (GetStateResponse);
44+
rpc SetState(SetStateRequest) returns (SetStateResponse);
45+
}

sqlite-watcher/src/lib.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pub mod queue;
2+
pub mod server;
3+
pub mod watcher_proto {
4+
tonic::include_proto!("sqlitewatcher");
5+
}

sqlite-watcher/src/queue.rs

Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
use std::fs;
2+
use std::path::{Path, PathBuf};
3+
4+
use anyhow::{anyhow, Context, Result};
5+
use rusqlite::{params, Connection, OptionalExtension, Row};
6+
7+
const SCHEMA: &str = r#"
8+
CREATE TABLE IF NOT EXISTS changes (
9+
change_id INTEGER PRIMARY KEY AUTOINCREMENT,
10+
table_name TEXT NOT NULL,
11+
op TEXT NOT NULL,
12+
id TEXT NOT NULL,
13+
payload BLOB,
14+
wal_frame TEXT,
15+
cursor TEXT,
16+
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
17+
acked INTEGER NOT NULL DEFAULT 0
18+
);
19+
20+
CREATE TABLE IF NOT EXISTS state (
21+
table_name TEXT PRIMARY KEY,
22+
last_change_id INTEGER NOT NULL DEFAULT 0,
23+
last_wal_frame TEXT,
24+
cursor TEXT,
25+
updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
26+
);
27+
"#;
28+
29+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
30+
pub enum ChangeOperation {
31+
Insert,
32+
Update,
33+
Delete,
34+
}
35+
36+
impl ChangeOperation {
37+
pub fn as_str(&self) -> &'static str {
38+
match self {
39+
ChangeOperation::Insert => "insert",
40+
ChangeOperation::Update => "update",
41+
ChangeOperation::Delete => "delete",
42+
}
43+
}
44+
45+
fn from_str(value: &str) -> Result<Self> {
46+
match value {
47+
"insert" => Ok(ChangeOperation::Insert),
48+
"update" => Ok(ChangeOperation::Update),
49+
"delete" => Ok(ChangeOperation::Delete),
50+
other => Err(anyhow!("unknown change operation '{other}'")),
51+
}
52+
}
53+
}
54+
55+
#[derive(Debug, Clone, PartialEq, Eq)]
56+
pub struct NewChange {
57+
pub table_name: String,
58+
pub operation: ChangeOperation,
59+
pub primary_key: String,
60+
pub payload: Option<Vec<u8>>,
61+
pub wal_frame: Option<String>,
62+
pub cursor: Option<String>,
63+
}
64+
65+
#[derive(Debug, Clone, PartialEq, Eq)]
66+
pub struct ChangeRecord {
67+
pub change_id: i64,
68+
pub table_name: String,
69+
pub operation: ChangeOperation,
70+
pub primary_key: String,
71+
pub payload: Option<Vec<u8>>,
72+
pub wal_frame: Option<String>,
73+
pub cursor: Option<String>,
74+
}
75+
76+
#[derive(Debug, Clone, PartialEq, Eq)]
77+
pub struct QueueState {
78+
pub table_name: String,
79+
pub last_change_id: i64,
80+
pub last_wal_frame: Option<String>,
81+
pub cursor: Option<String>,
82+
}
83+
84+
pub struct ChangeQueue {
85+
path: PathBuf,
86+
conn: Connection,
87+
}
88+
89+
impl ChangeQueue {
90+
pub fn open(path: impl AsRef<Path>) -> Result<Self> {
91+
let path = path.as_ref();
92+
if let Some(parent) = path.parent() {
93+
fs::create_dir_all(parent).with_context(|| {
94+
format!("failed to create queue directory {}", parent.display())
95+
})?;
96+
#[cfg(unix)]
97+
enforce_dir_perms(parent)?;
98+
}
99+
let conn = Connection::open(path)
100+
.with_context(|| format!("failed to open queue database {}", path.display()))?;
101+
conn.pragma_update(None, "journal_mode", &"wal").ok();
102+
conn.pragma_update(None, "synchronous", &"normal").ok();
103+
conn.execute_batch(SCHEMA)
104+
.context("failed to initialize change queue schema")?;
105+
Ok(Self {
106+
path: path.to_path_buf(),
107+
conn,
108+
})
109+
}
110+
111+
pub fn enqueue(&self, change: &NewChange) -> Result<i64> {
112+
self.conn.execute(
113+
"INSERT INTO changes(table_name, op, id, payload, wal_frame, cursor)
114+
VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
115+
params![
116+
change.table_name,
117+
change.operation.as_str(),
118+
change.primary_key,
119+
change.payload,
120+
change.wal_frame,
121+
change.cursor,
122+
],
123+
)?;
124+
Ok(self.conn.last_insert_rowid())
125+
}
126+
127+
pub fn fetch_batch(&self, limit: usize) -> Result<Vec<ChangeRecord>> {
128+
let mut stmt = self.conn.prepare(
129+
"SELECT change_id, table_name, op, id, payload, wal_frame, cursor
130+
FROM changes WHERE acked = 0 ORDER BY change_id ASC LIMIT ?1",
131+
)?;
132+
let mut rows = stmt.query([limit as i64])?;
133+
let mut results = Vec::new();
134+
while let Some(row) = rows.next()? {
135+
results.push(row_to_change(&row)?);
136+
}
137+
Ok(results)
138+
}
139+
140+
pub fn ack_up_to(&self, change_id: i64) -> Result<u64> {
141+
let updated = self.conn.execute(
142+
"UPDATE changes SET acked = 1 WHERE change_id <= ?1",
143+
[change_id],
144+
)?;
145+
Ok(updated as u64)
146+
}
147+
148+
pub fn purge_acked(&self) -> Result<u64> {
149+
let deleted = self
150+
.conn
151+
.execute("DELETE FROM changes WHERE acked = 1", [])?;
152+
Ok(deleted as u64)
153+
}
154+
155+
pub fn get_state(&self, table: &str) -> Result<Option<QueueState>> {
156+
self.conn
157+
.prepare(
158+
"SELECT table_name, last_change_id, last_wal_frame, cursor
159+
FROM state WHERE table_name = ?1",
160+
)?
161+
.query_row([table], |row| {
162+
Ok(QueueState {
163+
table_name: row.get(0)?,
164+
last_change_id: row.get(1)?,
165+
last_wal_frame: row.get(2)?,
166+
cursor: row.get(3)?,
167+
})
168+
})
169+
.optional()
170+
.map_err(Into::into)
171+
}
172+
173+
pub fn set_state(&self, state: &QueueState) -> Result<()> {
174+
self.conn.execute(
175+
"INSERT INTO state(table_name, last_change_id, last_wal_frame, cursor, updated_at)
176+
VALUES (?1, ?2, ?3, ?4, CURRENT_TIMESTAMP)
177+
ON CONFLICT(table_name) DO UPDATE SET
178+
last_change_id = excluded.last_change_id,
179+
last_wal_frame = excluded.last_wal_frame,
180+
cursor = excluded.cursor,
181+
updated_at = CURRENT_TIMESTAMP",
182+
params![
183+
state.table_name,
184+
state.last_change_id,
185+
state.last_wal_frame,
186+
state.cursor,
187+
],
188+
)?;
189+
Ok(())
190+
}
191+
192+
pub fn path(&self) -> &Path {
193+
&self.path
194+
}
195+
}
196+
197+
fn row_to_change(row: &Row<'_>) -> Result<ChangeRecord> {
198+
let op_str: String = row.get(2)?;
199+
Ok(ChangeRecord {
200+
change_id: row.get(0)?,
201+
table_name: row.get(1)?,
202+
operation: ChangeOperation::from_str(&op_str)?,
203+
primary_key: row.get(3)?,
204+
payload: row.get(4)?,
205+
wal_frame: row.get(5)?,
206+
cursor: row.get(6)?,
207+
})
208+
}
209+
210+
#[cfg(unix)]
211+
fn enforce_dir_perms(path: &Path) -> Result<()> {
212+
use std::os::unix::fs::PermissionsExt;
213+
214+
let metadata = fs::metadata(path)?;
215+
let mut perms = metadata.permissions();
216+
perms.set_mode(0o700);
217+
fs::set_permissions(path, perms)?;
218+
Ok(())
219+
}
220+
221+
#[cfg(not(unix))]
222+
fn enforce_dir_perms(_path: &Path) -> Result<()> {
223+
Ok(())
224+
}

0 commit comments

Comments
 (0)