Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions sqlite-watcher/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::configure()
.build_client(true)
.build_server(true)
.compile(&["proto/watcher.proto"], &["proto"])?;
println!("cargo:rerun-if-changed=proto/watcher.proto");
Ok(())
}
62 changes: 62 additions & 0 deletions sqlite-watcher/proto/watcher.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
syntax = "proto3";

package sqlitewatcher;

message HealthCheckRequest {}
message HealthCheckResponse {
string status = 1;
}

message ListChangesRequest {
uint32 limit = 1;
}

message Change {
int64 change_id = 1;
string table_name = 2;
string op = 3;
string primary_key = 4;
bytes payload = 5;
string wal_frame = 6;
string cursor = 7;
}

message ListChangesResponse {
repeated Change changes = 1;
}

message AckChangesRequest {
int64 up_to_change_id = 1;
}

message AckChangesResponse {
uint64 acknowledged = 1;
}

message GetStateRequest {
string table_name = 1;
}

message GetStateResponse {
bool exists = 1;
int64 last_change_id = 2;
string last_wal_frame = 3;
string cursor = 4;
}

message SetStateRequest {
string table_name = 1;
int64 last_change_id = 2;
string last_wal_frame = 3;
string cursor = 4;
}

message SetStateResponse {}

service Watcher {
rpc HealthCheck(HealthCheckRequest) returns (HealthCheckResponse);
rpc ListChanges(ListChangesRequest) returns (ListChangesResponse);
rpc AckChanges(AckChangesRequest) returns (AckChangesResponse);
rpc GetState(GetStateRequest) returns (GetStateResponse);
rpc SetState(SetStateRequest) returns (SetStateResponse);
}
49 changes: 49 additions & 0 deletions sqlite-watcher/src/change.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use serde_json::Value;

use crate::queue::{ChangeOperation, NewChange};

#[derive(Debug, Clone, PartialEq)]
pub struct RowChange {
pub table_name: String,
pub operation: ChangeOperation,
pub primary_key: String,
pub payload: Option<Value>,
pub wal_frame: Option<String>,
pub cursor: Option<String>,
}

impl RowChange {
pub fn into_new_change(self) -> NewChange {
let payload = self
.payload
.map(|value| serde_json::to_vec(&value).expect("row change payload serializes"));
NewChange {
table_name: self.table_name,
operation: self.operation,
primary_key: self.primary_key,
payload,
wal_frame: self.wal_frame,
cursor: self.cursor,
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn converts_to_new_change() {
let row = RowChange {
table_name: "prices".into(),
operation: ChangeOperation::Update,
primary_key: "pk1".into(),
payload: Some(serde_json::json!({"foo": "bar"})),
wal_frame: Some("frame-1".into()),
cursor: Some("cursor".into()),
};
let change = row.into_new_change();
assert_eq!(change.table_name, "prices");
assert!(change.payload.unwrap().contains(&b'b'));
}
}
48 changes: 48 additions & 0 deletions sqlite-watcher/src/decoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use crate::change::RowChange;
use crate::queue::ChangeOperation;
use crate::wal::WalEvent;
use serde_json::json;
use std::time::{SystemTime, UNIX_EPOCH};

/// Temporary decoder that turns WAL growth bytes into placeholder RowChange events.
/// Placeholder until row-level decoding is implemented.
#[derive(Debug, Default, Clone)]
pub struct WalGrowthDecoder;

impl WalGrowthDecoder {
pub fn decode(&self, event: &WalEvent) -> Vec<RowChange> {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("clock should be >= UNIX epoch");
vec![RowChange {
table_name: "__wal__".to_string(),
operation: ChangeOperation::Insert,
primary_key: now.as_nanos().to_string(),
payload: Some(json!({
"kind": "wal_growth",
"bytes_added": event.bytes_added,
"current_size": event.current_size,
"recorded_at": now.as_secs_f64(),
})),
wal_frame: None,
cursor: None,
}]
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn produces_placeholder_row_change() {
let decoder = WalGrowthDecoder::default();
let rows = decoder.decode(&WalEvent {
bytes_added: 1024,
current_size: 2048,
});
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].table_name, "__wal__");
assert_eq!(rows[0].operation, ChangeOperation::Insert);
}
}
Loading
Loading