Skip to content

Commit 014dd04

Browse files
committed
fix: --include-tables ignored in xmin sync, CTRL+C unresponsive during sync
- Fixed sync command ignoring --include-tables and other CLI filter flags when using xmin-based sync. Now skips interactive mode when CLI flags are provided and passes filtered tables to xmin sync. - Fixed CTRL+C not responding during sync cycle. Daemon now checks shutdown signal during sync/reconciliation cycles using nested tokio::select! with biased polling. Bump version to 7.0.4
1 parent 78140e7 commit 014dd04

5 files changed

Lines changed: 96 additions & 33 deletions

File tree

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,14 @@
22

33
All notable changes to this project will be documented in this file.
44

5+
## [7.0.4] - 2025-12-09
6+
7+
### Fixed
8+
9+
- **`--include-tables` ignored in xmin sync**: Fixed bug where `--include-tables` and other CLI filter flags were ignored by the `sync` command when using xmin-based sync. The sync command now correctly respects table filters when CLI flags are provided, skipping interactive mode and passing filtered tables to xmin sync.
10+
11+
- **CTRL+C not responding during sync**: Fixed bug where CTRL+C would not interrupt a running sync cycle. The daemon now checks for shutdown signals during sync and reconciliation cycles, allowing graceful termination at any point.
12+
513
## [7.0.3] - 2025-12-08
614

715
### Fixed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "database-replicator"
3-
version = "7.0.3"
3+
version = "7.0.4"
44
edition = "2021"
55
license = "Apache-2.0"
66
description = "Universal database-to-PostgreSQL replication CLI. Supports PostgreSQL, SQLite, MongoDB, and MySQL."

src/main.rs

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -519,7 +519,13 @@ async fn main() -> anyhow::Result<()> {
519519
app_state.target_url = Some(resolved_target.clone());
520520
database_replicator::state::save(&app_state)?;
521521

522-
let filter = if !no_interactive {
522+
// Check if CLI filter flags were provided (skip interactive if so)
523+
let has_cli_filters = include_databases.is_some()
524+
|| exclude_databases.is_some()
525+
|| include_tables.is_some()
526+
|| exclude_tables.is_some();
527+
528+
let filter = if !no_interactive && !has_cli_filters {
523529
// Interactive mode (default) - prompt user to select databases and tables
524530
let (filter, rules) =
525531
database_replicator::interactive::select_databases_and_tables(&source).await?;
@@ -636,12 +642,39 @@ async fn main() -> anyhow::Result<()> {
636642
);
637643
tracing::info!("Using xmin-based sync (no source configuration required)");
638644

645+
// Extract tables from filter for xmin sync
646+
// Filter stores "db.table" format, we need just table names for the source db
647+
let source_parts = database_replicator::utils::parse_postgres_url(&source)?;
648+
let source_db = &source_parts.database;
649+
650+
let tables_to_sync: Option<Vec<String>> = filter.include_tables().map(|tables| {
651+
tables
652+
.iter()
653+
.filter_map(|qualified| {
654+
// Split "db.table" into parts
655+
let parts: Vec<&str> = qualified.splitn(2, '.').collect();
656+
if parts.len() == 2 {
657+
let (db, table) = (parts[0], parts[1]);
658+
// Only include tables from the source database
659+
if db == source_db {
660+
Some(table.to_string())
661+
} else {
662+
None
663+
}
664+
} else {
665+
// No dot, treat as plain table name
666+
Some(qualified.clone())
667+
}
668+
})
669+
.collect()
670+
});
671+
639672
// Use CLI-provided intervals or defaults
640673
xmin_sync(
641674
source,
642675
resolved_target,
643676
"public".to_string(), // Default schema
644-
None, // Discover all tables
677+
tables_to_sync, // Tables from filter
645678
sync_interval, // CLI: --sync-interval (default 60s)
646679
reconcile_interval, // CLI: --reconcile-interval (default 3600s)
647680
1000, // Batch size

src/xmin/daemon.rs

Lines changed: 51 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -244,25 +244,41 @@ impl SyncDaemon {
244244

245245
loop {
246246
tokio::select! {
247+
biased; // Check shutdown first
248+
249+
_ = shutdown.recv() => {
250+
tracing::info!("Shutdown signal received, stopping SyncDaemon");
251+
break;
252+
}
247253
_ = sync_interval.tick() => {
248254
cycles += 1;
249255
tracing::info!("Starting sync cycle {}", cycles);
250256

251-
match self.run_sync_cycle().await {
252-
Ok(stats) => {
253-
tracing::info!(
254-
"Sync cycle {} completed: {} tables, {} rows in {}ms",
255-
cycles,
256-
stats.tables_synced,
257-
stats.rows_synced,
258-
stats.duration_ms
259-
);
260-
if !stats.errors.is_empty() {
261-
tracing::warn!("Sync cycle had {} errors", stats.errors.len());
262-
}
257+
// Run sync cycle with shutdown check - abort if shutdown received
258+
tokio::select! {
259+
biased;
260+
_ = shutdown.recv() => {
261+
tracing::info!("Shutdown signal received during sync cycle, aborting");
262+
break;
263263
}
264-
Err(e) => {
265-
tracing::error!("Sync cycle {} failed: {}", cycles, e);
264+
result = self.run_sync_cycle() => {
265+
match result {
266+
Ok(stats) => {
267+
tracing::info!(
268+
"Sync cycle {} completed: {} tables, {} rows in {}ms",
269+
cycles,
270+
stats.tables_synced,
271+
stats.rows_synced,
272+
stats.duration_ms
273+
);
274+
if !stats.errors.is_empty() {
275+
tracing::warn!("Sync cycle had {} errors", stats.errors.len());
276+
}
277+
}
278+
Err(e) => {
279+
tracing::error!("Sync cycle {} failed: {}", cycles, e);
280+
}
281+
}
266282
}
267283
}
268284
}
@@ -276,25 +292,31 @@ impl SyncDaemon {
276292
reconcile_cycles += 1;
277293
tracing::info!("Starting reconciliation cycle {}", reconcile_cycles);
278294

279-
match self.run_reconciliation().await {
280-
Ok(stats) => {
281-
tracing::info!(
282-
"Reconciliation cycle {} completed: {} tables, {} rows deleted in {}ms",
283-
reconcile_cycles,
284-
stats.tables_synced,
285-
stats.rows_deleted,
286-
stats.duration_ms
287-
);
295+
// Run reconciliation with shutdown check
296+
tokio::select! {
297+
biased;
298+
_ = shutdown.recv() => {
299+
tracing::info!("Shutdown signal received during reconciliation, aborting");
300+
break;
288301
}
289-
Err(e) => {
290-
tracing::error!("Reconciliation cycle {} failed: {}", reconcile_cycles, e);
302+
result = self.run_reconciliation() => {
303+
match result {
304+
Ok(stats) => {
305+
tracing::info!(
306+
"Reconciliation cycle {} completed: {} tables, {} rows deleted in {}ms",
307+
reconcile_cycles,
308+
stats.tables_synced,
309+
stats.rows_deleted,
310+
stats.duration_ms
311+
);
312+
}
313+
Err(e) => {
314+
tracing::error!("Reconciliation cycle {} failed: {}", reconcile_cycles, e);
315+
}
316+
}
291317
}
292318
}
293319
}
294-
_ = shutdown.recv() => {
295-
tracing::info!("Shutdown signal received, stopping SyncDaemon");
296-
break;
297-
}
298320
}
299321
}
300322

0 commit comments

Comments
 (0)