Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ categories = ["command-line-utilities", "database"]

[dependencies]
tokio = { version = "1.35", features = ["full"] }
tokio-postgres = { version = "0.7", features = ["with-serde_json-1"] }
tokio-postgres = { version = "0.7", features = ["with-serde_json-1", "with-chrono-0_4", "with-uuid-1"] }
uuid = { version = "1", features = ["v4"] }
clap = { version = "4.4", features = ["derive", "env"] }
anyhow = "1.0"
tracing = "0.1"
Expand All @@ -40,7 +41,7 @@ bson = "2.9"
mysql_async = "0.34"
dirs = "5.0"
url = "2.5"
chrono = { version = "0.4", default-features = false, features = ["clock"] }
chrono = { version = "0.4", default-features = false, features = ["clock", "serde"] }

[[test]]
name = "fallback_test"
Expand Down
6 changes: 3 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# syntax=docker/dockerfile:1

FROM debian:bookworm-slim AS downloader
FROM ubuntu:24.04 AS downloader
ARG VERSION=latest
ENV BINARY_NAME=database-replicator-linux-x64-binary
ENV RELEASE_ROOT=https://github.com/serenorg/database-replicator/releases
Expand All @@ -16,13 +16,13 @@ RUN set -eux; \
curl -fL "$URL" -o /tmp/database-replicator && \
chmod +x /tmp/database-replicator

FROM debian:bookworm-slim
FROM ubuntu:24.04
LABEL org.opencontainers.image.title="database-replicator" \
org.opencontainers.image.description="Seren database replicator CLI" \
org.opencontainers.image.source="https://github.com/serenorg/database-replicator"

RUN apt-get update && \
apt-get install -y --no-install-recommends ca-certificates libssl3 libpq5 postgresql-client && \
apt-get install -y --no-install-recommends ca-certificates libsqlite3-0 libssl3 libpq5 postgresql-client && \
rm -rf /var/lib/apt/lists/* && \
useradd -m replicator

Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub mod sqlite;
pub mod state;
pub mod table_rules;
pub mod utils;
pub mod xmin;

use anyhow::{bail, Result};

Expand Down
191 changes: 191 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,38 @@ enum Commands {
#[command(flatten)]
args: commands::target::TargetArgs,
},
/// Run xmin-based incremental sync (alternative to logical replication)
#[command(name = "xmin-sync")]
XminSync {
#[arg(long)]
source: String,
#[arg(long)]
target: Option<String>,
/// Schema to sync (defaults to "public")
#[arg(long, default_value = "public")]
schema: String,
/// Tables to sync (comma-separated, syncs all if not specified)
#[arg(long, value_delimiter = ',')]
tables: Option<Vec<String>>,
/// Sync interval in seconds (default: 60)
#[arg(long, default_value_t = 60)]
interval: u64,
/// Reconciliation interval in seconds for delete detection (default: 3600 = 1 hour)
#[arg(long, default_value_t = 3600)]
reconcile_interval: u64,
/// Batch size for reading changes (default: 1000)
#[arg(long, default_value_t = 1000)]
batch_size: usize,
/// Path to state file for tracking sync progress
#[arg(long)]
state_file: Option<String>,
/// Run a single sync cycle then exit (useful for cron jobs)
#[arg(long)]
once: bool,
/// Skip reconciliation (delete detection)
#[arg(long)]
no_reconcile: bool,
},
}

#[tokio::main]
Expand Down Expand Up @@ -583,6 +615,37 @@ async fn main() -> anyhow::Result<()> {
commands::verify(&source, &target, Some(filter)).await
}
Commands::Target { args } => commands::target(args).await,
Commands::XminSync {
source,
target,
schema,
tables,
interval,
reconcile_interval,
batch_size,
state_file,
once,
no_reconcile,
} => {
let state = database_replicator::state::load()?;
let target = target.or(state.target_url).ok_or_else(|| {
anyhow::anyhow!("Target database URL not provided and not set in state. Use `--target` or `database-replicator target set`.")
})?;

xmin_sync(
source,
target,
schema,
tables,
interval,
reconcile_interval,
batch_size,
state_file,
once,
no_reconcile,
)
.await
}
}
}

Expand Down Expand Up @@ -1000,3 +1063,131 @@ enum SerenTargetMode {
Project,
Url,
}

/// Run xmin-based incremental sync between source and target databases
#[allow(clippy::too_many_arguments)]
async fn xmin_sync(
source: String,
target: String,
schema: String,
tables: Option<Vec<String>>,
interval: u64,
reconcile_interval: u64,
batch_size: usize,
state_file: Option<String>,
once: bool,
no_reconcile: bool,
) -> anyhow::Result<()> {
use database_replicator::xmin::{DaemonConfig, SyncDaemon, SyncState};
use std::path::PathBuf;
use std::time::Duration;

tracing::info!("Starting xmin-based sync...");
tracing::info!(
"Source: {}",
database_replicator::utils::strip_password_from_url(&source).unwrap_or_else(|_| source.clone())
);
tracing::info!(
"Target: {}",
database_replicator::utils::strip_password_from_url(&target).unwrap_or_else(|_| target.clone())
);
tracing::info!("Schema: {}", schema);
if let Some(ref t) = tables {
tracing::info!("Tables: {}", t.join(", "));
} else {
tracing::info!("Tables: all");
}

// CRITICAL: Ensure source and target are different to prevent data loss
database_replicator::utils::validate_source_target_different(&source, &target)
.context("Source and target validation failed")?;
tracing::info!("Verified source and target are different databases");

// Build daemon config
let state_path = state_file
.map(PathBuf::from)
.unwrap_or_else(SyncState::default_path);

let reconcile_interval_duration = if no_reconcile {
None
} else {
Some(Duration::from_secs(reconcile_interval))
};

let config = DaemonConfig {
sync_interval: Duration::from_secs(interval),
reconcile_interval: reconcile_interval_duration,
state_path,
batch_size,
tables: tables.unwrap_or_default(),
schema,
};

tracing::info!("Sync interval: {}s", interval);
if let Some(ref ri) = config.reconcile_interval {
tracing::info!("Reconcile interval: {}s", ri.as_secs());
} else {
tracing::info!("Reconciliation disabled");
}
tracing::info!("Batch size: {}", batch_size);
tracing::info!("State file: {:?}", config.state_path);

// Create the daemon
let daemon = SyncDaemon::new(source.clone(), target.clone(), config);

if once {
// Run a single sync cycle
tracing::info!("Running single sync cycle...");

let stats = daemon.run_sync_cycle().await?;

tracing::info!("Sync cycle complete:");
tracing::info!(" Tables synced: {}", stats.tables_synced);
tracing::info!(" Rows synced: {}", stats.rows_synced);
if !stats.errors.is_empty() {
tracing::warn!(" Errors: {}", stats.errors.len());
for err in &stats.errors {
tracing::warn!(" - {}", err);
}
}

println!();
println!("========================================");
println!("Xmin sync cycle complete");
println!("========================================");
println!(" Tables synced: {}", stats.tables_synced);
println!(" Rows synced: {}", stats.rows_synced);
if !stats.errors.is_empty() {
println!(" Errors: {}", stats.errors.len());
}
} else {
// Run continuous sync
tracing::info!("Starting continuous sync daemon...");
tracing::info!("Press Ctrl+C to stop");

println!();
println!("========================================");
println!("Starting xmin-based continuous sync");
println!("========================================");
println!(" Sync interval: {}s", interval);
println!(" Press Ctrl+C to stop");
println!();

// Create shutdown channel
let (shutdown_tx, shutdown_rx) = tokio::sync::broadcast::channel::<()>(1);

// Set up Ctrl+C handler
let shutdown_tx_clone = shutdown_tx.clone();
tokio::spawn(async move {
tokio::signal::ctrl_c()
.await
.expect("Failed to listen for Ctrl+C");
tracing::info!("Received shutdown signal");
let _ = shutdown_tx_clone.send(());
});

daemon.run(shutdown_rx).await?;
}

Ok(())
}
Loading
Loading