Skip to content

Commit e2a5b37

Browse files
authored
Feature/xmin sync (#64)
* feat: Implement xmin-based sync module for incremental PostgreSQL replication

Implements core xmin-based sync functionality:
- XminReader: Reads changes from source using PostgreSQL xmin system column for efficient incremental change detection (Closes #55)
- SyncState: Persists high-water mark xmin values per table to enable resumable incremental syncs (Part of #55)
- ChangeWriter: Applies changes to target using INSERT ON CONFLICT DO UPDATE (upsert) pattern for efficient batched updates (Closes #56)
- Reconciler: Detects and removes orphaned rows in target that no longer exist in source through primary key comparison (Closes #57)
- Writer utilities: get_table_columns, get_primary_key_columns, row_to_values helpers for type-safe value extraction (Closes #58)
- SyncDaemon: Orchestrates continuous sync cycles with configurable intervals and reconciliation scheduling (Closes #59)

Features:
- Batched operations with PostgreSQL parameter limit handling
- Type-safe value conversion for common PostgreSQL types
- URL sanitization to prevent credential leakage in state files
- Comprehensive unit tests for all components

Related to #60 (sync command integration)
Related to #61 (integration tests)
Related to #62 (documentation)

* feat: Add xmin-sync CLI command for incremental PostgreSQL sync

Adds a new 'xmin-sync' CLI command that provides an alternative to logical replication for continuous synchronization between PostgreSQL databases.
CLI options:
- --source: Source database URL
- --target: Target database URL
- --schema: Schema to sync (default: public)
- --tables: Comma-separated list of tables to sync (default: all)
- --interval: Sync interval in seconds (default: 60)
- --reconcile-interval: Delete detection interval in seconds (default: 3600)
- --batch-size: Rows per batch (default: 1000)
- --state-file: Custom path for sync state persistence
- --once: Run single sync cycle then exit (useful for cron)
- --no-reconcile: Skip delete detection

Features:
- Graceful shutdown via Ctrl+C
- Persistent state for resume capability
- Progress logging with table and row counts
- Error reporting for partial failures

Closes #60

* Fix Dockerfile for SQLite image (#63)
1 parent e182524 commit e2a5b37

11 files changed

Lines changed: 2004 additions & 5 deletions

File tree

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ categories = ["command-line-utilities", "database"]
1313

1414
[dependencies]
1515
tokio = { version = "1.35", features = ["full"] }
16-
tokio-postgres = { version = "0.7", features = ["with-serde_json-1"] }
16+
tokio-postgres = { version = "0.7", features = ["with-serde_json-1", "with-chrono-0_4", "with-uuid-1"] }
17+
uuid = { version = "1", features = ["v4"] }
1718
clap = { version = "4.4", features = ["derive", "env"] }
1819
anyhow = "1.0"
1920
tracing = "0.1"
@@ -40,7 +41,7 @@ bson = "2.9"
4041
mysql_async = "0.34"
4142
dirs = "5.0"
4243
url = "2.5"
43-
chrono = { version = "0.4", default-features = false, features = ["clock"] }
44+
chrono = { version = "0.4", default-features = false, features = ["clock", "serde"] }
4445

4546
[[test]]
4647
name = "fallback_test"

Dockerfile

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# syntax=docker/dockerfile:1
22

3-
FROM debian:bookworm-slim AS downloader
3+
FROM ubuntu:24.04 AS downloader
44
ARG VERSION=latest
55
ENV BINARY_NAME=database-replicator-linux-x64-binary
66
ENV RELEASE_ROOT=https://github.com/serenorg/database-replicator/releases
@@ -16,13 +16,13 @@ RUN set -eux; \
1616
curl -fL "$URL" -o /tmp/database-replicator && \
1717
chmod +x /tmp/database-replicator
1818

19-
FROM debian:bookworm-slim
19+
FROM ubuntu:24.04
2020
LABEL org.opencontainers.image.title="database-replicator" \
2121
org.opencontainers.image.description="Seren database replicator CLI" \
2222
org.opencontainers.image.source="https://github.com/serenorg/database-replicator"
2323

2424
RUN apt-get update && \
25-
apt-get install -y --no-install-recommends ca-certificates libssl3 libpq5 postgresql-client && \
25+
apt-get install -y --no-install-recommends ca-certificates libsqlite3-0 libssl3 libpq5 postgresql-client && \
2626
rm -rf /var/lib/apt/lists/* && \
2727
useradd -m replicator
2828

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ pub mod sqlite;
1919
pub mod state;
2020
pub mod table_rules;
2121
pub mod utils;
22+
pub mod xmin;
2223

2324
use anyhow::{bail, Result};
2425

src/main.rs

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,38 @@ enum Commands {
188188
#[command(flatten)]
189189
args: commands::target::TargetArgs,
190190
},
191+
/// Run xmin-based incremental sync (alternative to logical replication)
192+
#[command(name = "xmin-sync")]
193+
XminSync {
194+
#[arg(long)]
195+
source: String,
196+
#[arg(long)]
197+
target: Option<String>,
198+
/// Schema to sync (defaults to "public")
199+
#[arg(long, default_value = "public")]
200+
schema: String,
201+
/// Tables to sync (comma-separated, syncs all if not specified)
202+
#[arg(long, value_delimiter = ',')]
203+
tables: Option<Vec<String>>,
204+
/// Sync interval in seconds (default: 60)
205+
#[arg(long, default_value_t = 60)]
206+
interval: u64,
207+
/// Reconciliation interval in seconds for delete detection (default: 3600 = 1 hour)
208+
#[arg(long, default_value_t = 3600)]
209+
reconcile_interval: u64,
210+
/// Batch size for reading changes (default: 1000)
211+
#[arg(long, default_value_t = 1000)]
212+
batch_size: usize,
213+
/// Path to state file for tracking sync progress
214+
#[arg(long)]
215+
state_file: Option<String>,
216+
/// Run a single sync cycle then exit (useful for cron jobs)
217+
#[arg(long)]
218+
once: bool,
219+
/// Skip reconciliation (delete detection)
220+
#[arg(long)]
221+
no_reconcile: bool,
222+
},
191223
}
192224

193225
#[tokio::main]
@@ -583,6 +615,37 @@ async fn main() -> anyhow::Result<()> {
583615
commands::verify(&source, &target, Some(filter)).await
584616
}
585617
Commands::Target { args } => commands::target(args).await,
618+
Commands::XminSync {
619+
source,
620+
target,
621+
schema,
622+
tables,
623+
interval,
624+
reconcile_interval,
625+
batch_size,
626+
state_file,
627+
once,
628+
no_reconcile,
629+
} => {
630+
let state = database_replicator::state::load()?;
631+
let target = target.or(state.target_url).ok_or_else(|| {
632+
anyhow::anyhow!("Target database URL not provided and not set in state. Use `--target` or `database-replicator target set`.")
633+
})?;
634+
635+
xmin_sync(
636+
source,
637+
target,
638+
schema,
639+
tables,
640+
interval,
641+
reconcile_interval,
642+
batch_size,
643+
state_file,
644+
once,
645+
no_reconcile,
646+
)
647+
.await
648+
}
586649
}
587650
}
588651

@@ -1000,3 +1063,131 @@ enum SerenTargetMode {
10001063
Project,
10011064
Url,
10021065
}
1066+
1067+
/// Run xmin-based incremental sync between source and target databases
1068+
#[allow(clippy::too_many_arguments)]
1069+
async fn xmin_sync(
1070+
source: String,
1071+
target: String,
1072+
schema: String,
1073+
tables: Option<Vec<String>>,
1074+
interval: u64,
1075+
reconcile_interval: u64,
1076+
batch_size: usize,
1077+
state_file: Option<String>,
1078+
once: bool,
1079+
no_reconcile: bool,
1080+
) -> anyhow::Result<()> {
1081+
use database_replicator::xmin::{DaemonConfig, SyncDaemon, SyncState};
1082+
use std::path::PathBuf;
1083+
use std::time::Duration;
1084+
1085+
tracing::info!("Starting xmin-based sync...");
1086+
tracing::info!(
1087+
"Source: {}",
1088+
database_replicator::utils::strip_password_from_url(&source).unwrap_or_else(|_| source.clone())
1089+
);
1090+
tracing::info!(
1091+
"Target: {}",
1092+
database_replicator::utils::strip_password_from_url(&target).unwrap_or_else(|_| target.clone())
1093+
);
1094+
tracing::info!("Schema: {}", schema);
1095+
if let Some(ref t) = tables {
1096+
tracing::info!("Tables: {}", t.join(", "));
1097+
} else {
1098+
tracing::info!("Tables: all");
1099+
}
1100+
1101+
// CRITICAL: Ensure source and target are different to prevent data loss
1102+
database_replicator::utils::validate_source_target_different(&source, &target)
1103+
.context("Source and target validation failed")?;
1104+
tracing::info!("Verified source and target are different databases");
1105+
1106+
// Build daemon config
1107+
let state_path = state_file
1108+
.map(PathBuf::from)
1109+
.unwrap_or_else(SyncState::default_path);
1110+
1111+
let reconcile_interval_duration = if no_reconcile {
1112+
None
1113+
} else {
1114+
Some(Duration::from_secs(reconcile_interval))
1115+
};
1116+
1117+
let config = DaemonConfig {
1118+
sync_interval: Duration::from_secs(interval),
1119+
reconcile_interval: reconcile_interval_duration,
1120+
state_path,
1121+
batch_size,
1122+
tables: tables.unwrap_or_default(),
1123+
schema,
1124+
};
1125+
1126+
tracing::info!("Sync interval: {}s", interval);
1127+
if let Some(ref ri) = config.reconcile_interval {
1128+
tracing::info!("Reconcile interval: {}s", ri.as_secs());
1129+
} else {
1130+
tracing::info!("Reconciliation disabled");
1131+
}
1132+
tracing::info!("Batch size: {}", batch_size);
1133+
tracing::info!("State file: {:?}", config.state_path);
1134+
1135+
// Create the daemon
1136+
let daemon = SyncDaemon::new(source.clone(), target.clone(), config);
1137+
1138+
if once {
1139+
// Run a single sync cycle
1140+
tracing::info!("Running single sync cycle...");
1141+
1142+
let stats = daemon.run_sync_cycle().await?;
1143+
1144+
tracing::info!("Sync cycle complete:");
1145+
tracing::info!(" Tables synced: {}", stats.tables_synced);
1146+
tracing::info!(" Rows synced: {}", stats.rows_synced);
1147+
if !stats.errors.is_empty() {
1148+
tracing::warn!(" Errors: {}", stats.errors.len());
1149+
for err in &stats.errors {
1150+
tracing::warn!(" - {}", err);
1151+
}
1152+
}
1153+
1154+
println!();
1155+
println!("========================================");
1156+
println!("Xmin sync cycle complete");
1157+
println!("========================================");
1158+
println!(" Tables synced: {}", stats.tables_synced);
1159+
println!(" Rows synced: {}", stats.rows_synced);
1160+
if !stats.errors.is_empty() {
1161+
println!(" Errors: {}", stats.errors.len());
1162+
}
1163+
} else {
1164+
// Run continuous sync
1165+
tracing::info!("Starting continuous sync daemon...");
1166+
tracing::info!("Press Ctrl+C to stop");
1167+
1168+
println!();
1169+
println!("========================================");
1170+
println!("Starting xmin-based continuous sync");
1171+
println!("========================================");
1172+
println!(" Sync interval: {}s", interval);
1173+
println!(" Press Ctrl+C to stop");
1174+
println!();
1175+
1176+
// Create shutdown channel
1177+
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel::<()>(1);
1178+
1179+
// Set up Ctrl+C handler
1180+
let shutdown_tx_clone = shutdown_tx.clone();
1181+
tokio::spawn(async move {
1182+
tokio::signal::ctrl_c()
1183+
.await
1184+
.expect("Failed to listen for Ctrl+C");
1185+
tracing::info!("Received shutdown signal");
1186+
let _ = shutdown_tx_clone.send(());
1187+
});
1188+
1189+
daemon.run(shutdown_rx).await?;
1190+
}
1191+
1192+
Ok(())
1193+
}

0 commit comments

Comments
 (0)