From 84583ebc1a61a96de56c5db3ae52b64d9c30b8d9 Mon Sep 17 00:00:00 2001 From: Taariq Lewis <701864+taariq@users.noreply.github.com> Date: Wed, 10 Dec 2025 10:32:47 -0800 Subject: [PATCH 1/5] fix: Use batched processing in sync daemon to prevent OOM and timeouts The sync_table method was loading entire tables into memory before processing, causing: - 10GB+ memory usage for tables with millions of rows - Connection timeouts when queries exceeded ELB idle timeouts - Failed syncs with "connection closed" errors Changes: - Use existing batched reader (read_changes_batched + fetch_batch) instead of loading all rows at once - Process and write each batch immediately (memory = O(batch_size)) - Update sync state after each batch for resume capability - Add progress logging every 10 batches - Increase default batch_size from 1000 to 10000 for better throughput - Check for xmin wraparound at start rather than during read This reduces memory from O(total_rows) to O(batch_size), enabling sync of tables with millions of rows without OOM or timeouts. Closes #74 --- src/xmin/daemon.rs | 131 ++++++++++++++++++++++++++++++++------------- 1 file changed, 95 insertions(+), 36 deletions(-) diff --git a/src/xmin/daemon.rs b/src/xmin/daemon.rs index 2baff6b..66ed161 100644 --- a/src/xmin/daemon.rs +++ b/src/xmin/daemon.rs @@ -6,7 +6,7 @@ use std::path::PathBuf; use std::time::Duration; use tokio::time::interval; -use super::reader::XminReader; +use super::reader::{detect_wraparound, WraparoundCheck, XminReader}; use super::reconciler::Reconciler; use super::state::SyncState; use super::writer::{get_primary_key_columns, get_table_columns, row_to_values, ChangeWriter}; @@ -35,7 +35,7 @@ impl Default for DaemonConfig { sync_interval: Duration::from_secs(3600), // 1 hour reconcile_interval: Some(Duration::from_secs(86400)), // 1 day state_path: SyncState::default_path(), - batch_size: 1000, + batch_size: 10_000, // 10K rows per batch for good throughput while bounding memory tables: Vec::new(), schema: "public".to_string(), } @@ -323,7 +323,11 @@ impl SyncDaemon { Ok(()) } - /// Sync a single table. + /// Sync a single table using batched processing. + /// + /// This method processes rows in batches to avoid loading entire tables into memory. + /// This is critical for large tables (millions of rows) where loading everything + /// at once would cause OOM or connection timeouts. async fn sync_table( &self, reader: &XminReader<'_>, @@ -334,7 +338,7 @@ impl SyncDaemon { ) -> Result { // Get table state let table_state = state.get_or_create_table(schema, table); - let since_xmin = table_state.last_xmin; + let stored_xmin = table_state.last_xmin; // Get table metadata from SOURCE (not target - tables may not exist there yet) let columns = get_table_columns(reader.client(), schema, table).await?; @@ -346,54 +350,109 @@ impl SyncDaemon { let column_names: Vec = columns.iter().map(|(name, _)| name.clone()).collect(); - // Read changes with wraparound detection - let (rows, max_xmin, was_full_sync) = reader - .read_changes_with_wraparound_check(schema, table, &column_names, since_xmin) + // Check for xmin wraparound before starting + let current_xmin = reader.get_current_xmin().await?; + let (since_xmin, is_full_sync) = if detect_wraparound(stored_xmin, current_xmin) + == WraparoundCheck::WraparoundDetected + { + tracing::warn!( + "xmin wraparound detected for {}.{} - performing full table sync", + schema, + table + ); + (0, true) // Start from beginning + } else { + (stored_xmin, false) + }; + + // Use batched reading to avoid loading entire table into memory + let batch_size = self.config.batch_size; + let mut batch_reader = reader + .read_changes_batched(schema, table, &column_names, since_xmin, batch_size) .await?; - if rows.is_empty() { + let mut total_rows = 0u64; + let mut max_xmin = since_xmin; + let mut batch_count = 0u64; + + // Process batches until exhausted + while let Some((rows, batch_max_xmin)) = reader.fetch_batch(&mut batch_reader).await? { + if rows.is_empty() { + break; + } + + batch_count += 1; + let batch_len = rows.len(); + + // Log first batch with total context, then periodic progress + if batch_count == 1 { + if is_full_sync { + tracing::info!( + "Starting full table sync for {}.{} (batch size: {})", + schema, + table, + batch_size + ); + } else { + tracing::info!( + "Found changes in {}.{} (xmin {} -> {}), processing in batches", + schema, + table, + since_xmin, + batch_max_xmin + ); + } + } + + // Convert and apply batch immediately (memory = O(batch_size)) + let values: Vec>> = rows + .iter() + .map(|row| row_to_values(row, &columns)) + .collect(); + + let affected = writer + .apply_batch(schema, table, &pk_columns, &column_names, values) + .await?; + + total_rows += affected; + max_xmin = batch_max_xmin; + + // Update state after each batch for resume capability + state.update_table(schema, table, max_xmin, affected); + + // Log progress every 10 batches or 100K rows + if batch_count.is_multiple_of(10) || total_rows % 100_000 < batch_len as u64 { + tracing::info!( + "Progress: {}.{} - {} rows synced ({} batches), current xmin: {}", + schema, + table, + total_rows, + batch_count, + max_xmin + ); + } + } + + if total_rows == 0 { tracing::debug!( "No changes in {}.{} since xmin {}", schema, table, since_xmin ); - return Ok(0); - } - - if was_full_sync { - tracing::warn!( - "xmin wraparound detected for {}.{} - performed full table sync ({} rows)", - schema, - table, - rows.len() - ); } else { tracing::info!( - "Found {} changed rows in {}.{} (xmin {} -> {})", - rows.len(), + "Completed sync for {}.{}: {} rows in {} batches (xmin {} -> {})", schema, table, + total_rows, + batch_count, since_xmin, max_xmin ); } - // Convert rows to values (excluding the _xmin column we added) - let values: Vec>> = rows - .iter() - .map(|row| row_to_values(row, &columns)) - .collect(); - - // Apply changes - let affected = writer - .apply_batch(schema, table, &pk_columns, &column_names, values) - .await?; - - // Update state - state.update_table(schema, table, max_xmin, affected); - - Ok(affected) + Ok(total_rows) } /// Load existing state or create new state. @@ -431,7 +490,7 @@ mod tests { let config = DaemonConfig::default(); assert_eq!(config.sync_interval, Duration::from_secs(3600)); assert_eq!(config.reconcile_interval, Some(Duration::from_secs(86400))); - assert_eq!(config.batch_size, 1000); + assert_eq!(config.batch_size, 10_000); assert_eq!(config.schema, "public"); } From fc35298dcdbc9ec2f8307ccfd863e4b98b3ebfc7 Mon Sep 17 00:00:00 2001 From: Taariq Lewis <701864+taariq@users.noreply.github.com> Date: Wed, 10 Dec 2025 10:52:55 -0800 Subject: [PATCH 2/5] feat: Auto-detect optimal batch size based on available memory Add cross-platform memory detection and automatic batch size calculation to prevent OOM on small instances while maximizing throughput on larger ones. New functions in utils.rs: - get_available_memory(): Cross-platform (Linux, macOS, Windows) - Linux: Reads MemAvailable from /proc/meminfo - macOS: Uses sysctl + vm_stat for free/inactive pages - Windows: Uses GlobalMemoryStatusEx Win32 API - calculate_optimal_batch_size(): Auto-calculates based on memory - Uses 25% of available memory as working budget - Assumes 2KB per row (conservative estimate) - Clamps between 1,000 and 50,000 rows Expected batch sizes by instance type: - t3.nano (512MB): ~1,000 rows - t3.small (2GB): ~10,000 rows - t3.large (8GB+): 50,000 rows (capped) Refs #74 --- src/main.rs | 2 +- src/utils.rs | 270 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 271 insertions(+), 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index c692684..bf5a1b6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -700,7 +700,7 @@ async fn main() -> anyhow::Result<()> { tables_to_sync, // Tables from filter sync_interval, // CLI: --sync-interval (default 60s) reconcile_interval, // CLI: --reconcile-interval (default 3600s) - 1000, // Batch size + database_replicator::utils::calculate_optimal_batch_size(), // Auto-detect based on available memory None, // State file: use default once, // CLI: --once (run single cycle) no_reconcile, // CLI: --no-reconcile (disable delete detection) diff --git a/src/utils.rs b/src/utils.rs index af418e7..9e57952 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1318,10 +1318,280 @@ pub fn parse_pg_version_string(version_str: &str) -> Result { bail!("Could not parse PostgreSQL version from: {}", version_str) } +/// Get available system memory in bytes +/// +/// Cross-platform function that works on Linux, macOS, and Windows. +/// Returns the amount of memory available for use by applications. +/// +/// # Platform Details +/// +/// - **Linux**: Reads `MemAvailable` from `/proc/meminfo` +/// - **macOS**: Uses `sysctl hw.memsize` for total memory, estimates available +/// - **Windows**: Uses `GlobalMemoryStatusEx` API +/// +/// # Returns +/// +/// Available memory in bytes, or an error if detection fails. +/// +/// # Examples +/// +/// ```no_run +/// use database_replicator::utils::get_available_memory; +/// +/// let available = get_available_memory().unwrap(); +/// println!("Available memory: {} MB", available / 1024 / 1024); +/// ``` +pub fn get_available_memory() -> Result { + #[cfg(target_os = "linux")] + { + get_available_memory_linux() + } + + #[cfg(target_os = "macos")] + { + get_available_memory_macos() + } + + #[cfg(target_os = "windows")] + { + get_available_memory_windows() + } + + #[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))] + { + // Fallback: assume 1GB available for unknown platforms + tracing::warn!("Memory detection not supported on this platform, assuming 1GB available"); + Ok(1024 * 1024 * 1024) + } +} + +#[cfg(target_os = "linux")] +fn get_available_memory_linux() -> Result { + use std::fs; + + let meminfo = fs::read_to_string("/proc/meminfo") + .context("Failed to read /proc/meminfo")?; + + // Try MemAvailable first (more accurate, available since Linux 3.14) + for line in meminfo.lines() { + if line.starts_with("MemAvailable:") { + let parts: Vec<&str> = line.split_whitespace().collect(); + if parts.len() >= 2 { + let kb: u64 = parts[1].parse() + .context("Failed to parse MemAvailable value")?; + return Ok(kb * 1024); // Convert KB to bytes + } + } + } + + // Fallback: MemFree + Buffers + Cached (less accurate but works on older kernels) + let mut mem_free: u64 = 0; + let mut buffers: u64 = 0; + let mut cached: u64 = 0; + + for line in meminfo.lines() { + let parts: Vec<&str> = line.split_whitespace().collect(); + if parts.len() >= 2 { + let value: u64 = parts[1].parse().unwrap_or(0); + if line.starts_with("MemFree:") { + mem_free = value; + } else if line.starts_with("Buffers:") { + buffers = value; + } else if line.starts_with("Cached:") && !line.starts_with("SwapCached:") { + cached = value; + } + } + } + + Ok((mem_free + buffers + cached) * 1024) // Convert KB to bytes +} + +#[cfg(target_os = "macos")] +fn get_available_memory_macos() -> Result { + use std::process::Command; + + // Get total physical memory using sysctl + let output = Command::new("sysctl") + .args(["-n", "hw.memsize"]) + .output() + .context("Failed to execute sysctl")?; + + let total_str = String::from_utf8_lossy(&output.stdout); + let total_bytes: u64 = total_str.trim().parse() + .context("Failed to parse hw.memsize")?; + + // Get page size and free pages using vm_stat + let vm_output = Command::new("vm_stat") + .output() + .context("Failed to execute vm_stat")?; + + let vm_stat = String::from_utf8_lossy(&vm_output.stdout); + + // Parse page size (usually 4096 or 16384 on Apple Silicon) + let page_size: u64 = 4096; // Default, macOS uses 4KB or 16KB pages + + // Parse free and inactive pages from vm_stat + let mut pages_free: u64 = 0; + let mut pages_inactive: u64 = 0; + let mut pages_purgeable: u64 = 0; + + for line in vm_stat.lines() { + if line.starts_with("Pages free:") { + pages_free = parse_vm_stat_value(line); + } else if line.starts_with("Pages inactive:") { + pages_inactive = parse_vm_stat_value(line); + } else if line.starts_with("Pages purgeable:") { + pages_purgeable = parse_vm_stat_value(line); + } + } + + // Available = free + inactive + purgeable (conservative estimate) + let available = (pages_free + pages_inactive + pages_purgeable) * page_size; + + // If vm_stat parsing failed, estimate 50% of total as available + if available == 0 { + tracing::debug!("vm_stat parsing returned 0, estimating 50% of total memory as available"); + return Ok(total_bytes / 2); + } + + Ok(available) +} + +#[cfg(target_os = "macos")] +fn parse_vm_stat_value(line: &str) -> u64 { + // Format: "Pages free: 12345." + line.split(':') + .nth(1) + .and_then(|s| s.trim().trim_end_matches('.').parse().ok()) + .unwrap_or(0) +} + +#[cfg(target_os = "windows")] +fn get_available_memory_windows() -> Result { + use std::mem; + + // MEMORYSTATUSEX structure + #[repr(C)] + #[allow(non_snake_case)] + struct MEMORYSTATUSEX { + dwLength: u32, + dwMemoryLoad: u32, + ullTotalPhys: u64, + ullAvailPhys: u64, + ullTotalPageFile: u64, + ullAvailPageFile: u64, + ullTotalVirtual: u64, + ullAvailVirtual: u64, + ullAvailExtendedVirtual: u64, + } + + #[link(name = "kernel32")] + extern "system" { + fn GlobalMemoryStatusEx(lpBuffer: *mut MEMORYSTATUSEX) -> i32; + } + + let mut mem_status: MEMORYSTATUSEX = unsafe { mem::zeroed() }; + mem_status.dwLength = mem::size_of::() as u32; + + let result = unsafe { GlobalMemoryStatusEx(&mut mem_status) }; + + if result == 0 { + bail!("GlobalMemoryStatusEx failed"); + } + + Ok(mem_status.ullAvailPhys) +} + +/// Calculate optimal batch size based on available system memory +/// +/// Automatically determines an appropriate batch size for the sync daemon +/// based on the amount of available system memory. This prevents OOM errors +/// on memory-constrained instances while maximizing throughput on larger ones. +/// +/// # Memory Model +/// +/// The calculation assumes: +/// - Each row consumes approximately 2KB in memory (conservative estimate for wide tables) +/// - We should use at most 25% of available memory for batch processing +/// - Minimum batch size: 1,000 rows (for very constrained systems) +/// - Maximum batch size: 50,000 rows (diminishing returns beyond this) +/// +/// # Returns +/// +/// Optimal batch size in number of rows, or default of 10,000 if detection fails. +/// +/// # Examples +/// +/// ```no_run +/// use database_replicator::utils::calculate_optimal_batch_size; +/// +/// let batch_size = calculate_optimal_batch_size(); +/// println!("Using batch size: {}", batch_size); +/// // On t3.nano (512MB): ~1,000-2,000 +/// // On t3.small (2GB): ~10,000 +/// // On t3.large (8GB): ~50,000 (capped) +/// ``` +pub fn calculate_optimal_batch_size() -> usize { + const BYTES_PER_ROW: u64 = 2048; // Conservative: 2KB per row + const MEMORY_FRACTION: f64 = 0.25; // Use at most 25% of available memory + const MIN_BATCH_SIZE: usize = 1_000; + const MAX_BATCH_SIZE: usize = 50_000; + const DEFAULT_BATCH_SIZE: usize = 10_000; + + match get_available_memory() { + Ok(available_bytes) => { + // Calculate how many rows we can fit in 25% of available memory + let usable_bytes = (available_bytes as f64 * MEMORY_FRACTION) as u64; + let calculated_size = (usable_bytes / BYTES_PER_ROW) as usize; + + // Clamp to min/max range + let batch_size = calculated_size.clamp(MIN_BATCH_SIZE, MAX_BATCH_SIZE); + + tracing::info!( + "Auto-detected batch size: {} (available memory: {} MB)", + batch_size, + available_bytes / 1024 / 1024 + ); + + batch_size + } + Err(e) => { + tracing::warn!( + "Failed to detect available memory: {}. Using default batch size: {}", + e, + DEFAULT_BATCH_SIZE + ); + DEFAULT_BATCH_SIZE + } + } +} + #[cfg(test)] mod tests { use super::*; + #[test] + fn test_get_available_memory() { + // This should work on all supported platforms + let result = get_available_memory(); + + // Should succeed (may fail in very restricted environments) + if let Ok(available) = result { + // Sanity check: should be at least 10MB, less than 1TB + assert!(available > 10 * 1024 * 1024, "Available memory too low: {}", available); + assert!(available < 1024 * 1024 * 1024 * 1024, "Available memory too high: {}", available); + } + } + + #[test] + fn test_calculate_optimal_batch_size() { + let batch_size = calculate_optimal_batch_size(); + + // Should be within expected range + assert!(batch_size >= 1_000, "Batch size too small: {}", batch_size); + assert!(batch_size <= 50_000, "Batch size too large: {}", batch_size); + } + #[test] fn test_validate_connection_string_valid() { assert!(validate_connection_string("postgresql://user:pass@localhost:5432/dbname").is_ok()); From 268dff07ed5affa11ddc53c58f9a1673b7a410b3 Mon Sep 17 00:00:00 2001 From: Taariq Lewis <701864+taariq@users.noreply.github.com> Date: Wed, 10 Dec 2025 11:01:42 -0800 Subject: [PATCH 3/5] fix: Use batched reconciliation to prevent OOM on large tables The reconciler was loading ALL primary keys from both source and target tables into memory before comparing them. For tables with millions of rows (e.g., 14M rows), this caused: - 2-3 GB memory usage just for PKs - Potential OOM on memory-constrained instances - Connection timeouts during long-running PK fetch queries Changes: - Add reconcile_table_batched() using merge-join comparison - Implement PkBatchReader with keyset pagination (WHERE pk > last_pk) - Fetch PKs in sorted batches from both databases - Compare using single-pass merge-join (both streams sorted) - Delete orphans in batches as they're discovered - Add progress logging every 100K comparisons This reduces memory from O(total_rows) to O(batch_size), enabling reconciliation of tables with millions of rows without OOM. Closes #75 --- src/xmin/daemon.rs | 7 +- src/xmin/reconciler.rs | 349 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 355 insertions(+), 1 deletion(-) diff --git a/src/xmin/daemon.rs b/src/xmin/daemon.rs index 66ed161..ecdb7c3 100644 --- a/src/xmin/daemon.rs +++ b/src/xmin/daemon.rs @@ -203,7 +203,12 @@ impl SyncDaemon { } match reconciler - .reconcile_table(&self.config.schema, table, &pk_columns) + .reconcile_table_batched( + &self.config.schema, + table, + &pk_columns, + self.config.batch_size, + ) .await { Ok(deleted) => { diff --git a/src/xmin/reconciler.rs b/src/xmin/reconciler.rs index c0d96f2..63d1f57 100644 --- a/src/xmin/reconciler.rs +++ b/src/xmin/reconciler.rs @@ -2,6 +2,7 @@ // ABOUTME: Compares primary keys between source and target to find orphaned rows use anyhow::{Context, Result}; +use std::cmp::Ordering; use std::collections::HashSet; use tokio_postgres::types::ToSql; use tokio_postgres::Client; @@ -202,6 +203,354 @@ impl<'a> Reconciler<'a> { Ok(row.get(0)) } + + /// Reconcile a table using batched streaming comparison (memory-efficient). + /// + /// Uses merge-join comparison on sorted primary keys fetched in batches. + /// This avoids loading all PKs into memory, making it suitable for tables + /// with millions of rows. + /// + /// # Arguments + /// + /// * `schema` - Schema name + /// * `table` - Table name + /// * `primary_key_columns` - Primary key column names + /// * `batch_size` - Number of PKs to fetch per batch + /// + /// # Returns + /// + /// The number of orphaned rows deleted from target. + pub async fn reconcile_table_batched( + &self, + schema: &str, + table: &str, + primary_key_columns: &[String], + batch_size: usize, + ) -> Result { + tracing::info!( + "Starting batched reconciliation for {}.{} (batch size: {})", + schema, + table, + batch_size + ); + + let writer = ChangeWriter::new(self.target_client); + let mut total_deleted = 0u64; + let mut orphans_batch: Vec> = Vec::new(); + + // Initialize batch readers for both source and target + let mut source_reader = PkBatchReader::new( + self.source_client, + schema, + table, + primary_key_columns, + batch_size, + ); + let mut target_reader = PkBatchReader::new( + self.target_client, + schema, + table, + primary_key_columns, + batch_size, + ); + + // Fetch initial batches + let mut source_batch = source_reader.fetch_next().await?; + let mut target_batch = target_reader.fetch_next().await?; + let mut source_idx = 0; + let mut target_idx = 0; + let mut comparisons = 0u64; + + // Merge-join comparison loop + loop { + // Refill source batch if exhausted + if source_idx >= source_batch.len() && !source_reader.exhausted { + source_batch = source_reader.fetch_next().await?; + source_idx = 0; + } + + // Refill target batch if exhausted + if target_idx >= target_batch.len() && !target_reader.exhausted { + target_batch = target_reader.fetch_next().await?; + target_idx = 0; + } + + // Check termination conditions + let source_exhausted = source_idx >= source_batch.len(); + let target_exhausted = target_idx >= target_batch.len(); + + if source_exhausted && target_exhausted { + // Both exhausted - done + break; + } + + if source_exhausted { + // Source exhausted but target has more - all remaining are orphans + while target_idx < target_batch.len() { + orphans_batch.push(target_batch[target_idx].clone()); + target_idx += 1; + + // Delete batch when full + if orphans_batch.len() >= batch_size { + total_deleted += self + .delete_orphan_batch( + &writer, + schema, + table, + primary_key_columns, + &orphans_batch, + ) + .await?; + orphans_batch.clear(); + } + } + + // Fetch more from target + if !target_reader.exhausted { + target_batch = target_reader.fetch_next().await?; + target_idx = 0; + } + continue; + } + + if target_exhausted { + // Target exhausted but source has more - no more orphans possible + break; + } + + // Compare current PKs + let source_pk = &source_batch[source_idx]; + let target_pk = &target_batch[target_idx]; + comparisons += 1; + + match compare_pks(source_pk, target_pk) { + Ordering::Equal => { + // PKs match - both exist, advance both + source_idx += 1; + target_idx += 1; + } + Ordering::Less => { + // Source PK < Target PK - source has row target doesn't + // This is fine, just advance source + source_idx += 1; + } + Ordering::Greater => { + // Source PK > Target PK - target has orphan + orphans_batch.push(target_pk.clone()); + target_idx += 1; + + // Delete batch when full + if orphans_batch.len() >= batch_size { + total_deleted += self + .delete_orphan_batch( + &writer, + schema, + table, + primary_key_columns, + &orphans_batch, + ) + .await?; + orphans_batch.clear(); + } + } + } + + // Log progress periodically + if comparisons.is_multiple_of(100_000) { + tracing::info!( + "Reconciliation progress for {}.{}: {} comparisons, {} orphans found", + schema, + table, + comparisons, + total_deleted + orphans_batch.len() as u64 + ); + } + } + + // Delete remaining orphans + if !orphans_batch.is_empty() { + total_deleted += self + .delete_orphan_batch(&writer, schema, table, primary_key_columns, &orphans_batch) + .await?; + } + + tracing::info!( + "Completed reconciliation for {}.{}: {} comparisons, {} orphans deleted", + schema, + table, + comparisons, + total_deleted + ); + + Ok(total_deleted) + } + + /// Delete a batch of orphan rows. + async fn delete_orphan_batch( + &self, + writer: &ChangeWriter<'_>, + schema: &str, + table: &str, + primary_key_columns: &[String], + orphans: &[Vec], + ) -> Result { + if orphans.is_empty() { + return Ok(0); + } + + tracing::debug!( + "Deleting batch of {} orphan rows from {}.{}", + orphans.len(), + schema, + table + ); + + // Convert string PKs to ToSql values + let pk_values: Vec>> = orphans + .iter() + .map(|pk| { + pk.iter() + .map(|v| Box::new(v.clone()) as Box) + .collect() + }) + .collect(); + + writer + .delete_rows(schema, table, primary_key_columns, pk_values) + .await + } +} + +/// Compare two primary key tuples lexicographically. +fn compare_pks(a: &[String], b: &[String]) -> Ordering { + for (av, bv) in a.iter().zip(b.iter()) { + match av.cmp(bv) { + Ordering::Equal => continue, + other => return other, + } + } + a.len().cmp(&b.len()) +} + +/// Batch reader for primary keys using keyset pagination. +/// +/// Fetches PKs in sorted order using WHERE pk > last_pk LIMIT batch_size, +/// which is more efficient than OFFSET for large tables. +struct PkBatchReader<'a> { + client: &'a Client, + schema: String, + table: String, + pk_columns: Vec, + batch_size: usize, + last_pk: Option>, + pub exhausted: bool, +} + +impl<'a> PkBatchReader<'a> { + fn new( + client: &'a Client, + schema: &str, + table: &str, + pk_columns: &[String], + batch_size: usize, + ) -> Self { + Self { + client, + schema: schema.to_string(), + table: table.to_string(), + pk_columns: pk_columns.to_vec(), + batch_size, + last_pk: None, + exhausted: false, + } + } + + /// Fetch the next batch of primary keys. + async fn fetch_next(&mut self) -> Result>> { + if self.exhausted { + return Ok(Vec::new()); + } + + let pk_cols_select: Vec = self + .pk_columns + .iter() + .map(|c| format!("\"{}\"::text", c)) + .collect(); + + let order_by: Vec = self + .pk_columns + .iter() + .map(|c| format!("\"{}\"", c)) + .collect(); + + let query = if self.last_pk.is_some() { + // Keyset pagination: WHERE (pk1, pk2, ...) > ($1, $2, ...) + let pk_tuple: Vec = self + .pk_columns + .iter() + .map(|c| format!("\"{}\"", c)) + .collect(); + + let params: Vec = (1..=self.pk_columns.len()) + .map(|i| format!("${}", i)) + .collect(); + + format!( + "SELECT {} FROM \"{}\".\"{}\" WHERE ({}) > ({}) ORDER BY {} LIMIT {}", + pk_cols_select.join(", "), + self.schema, + self.table, + pk_tuple.join(", "), + params.join(", "), + order_by.join(", "), + self.batch_size + ) + } else { + // First batch: no WHERE clause + format!( + "SELECT {} FROM \"{}\".\"{}\" ORDER BY {} LIMIT {}", + pk_cols_select.join(", "), + self.schema, + self.table, + order_by.join(", "), + self.batch_size + ) + }; + + // Build parameters for keyset pagination + let params: Vec<&(dyn ToSql + Sync)> = if let Some(ref last) = self.last_pk { + last.iter().map(|s| s as &(dyn ToSql + Sync)).collect() + } else { + Vec::new() + }; + + let rows = self.client.query(&query, ¶ms).await.with_context(|| { + format!( + "Failed to fetch PK batch from {}.{}", + self.schema, self.table + ) + })?; + + if rows.len() < self.batch_size { + self.exhausted = true; + } + + let pks: Vec> = rows + .iter() + .map(|row| { + (0..self.pk_columns.len()) + .map(|i| row.get::<_, String>(i)) + .collect() + }) + .collect(); + + // Update last_pk for next iteration + if let Some(last_row) = pks.last() { + self.last_pk = Some(last_row.clone()); + } + + Ok(pks) + } } /// Configuration for reconciliation behavior. From 6f36b9d67e393b54c3bbfa54b5c26ed71a6f66ec Mon Sep 17 00:00:00 2001 From: Taariq Lewis <701864+taariq@users.noreply.github.com> Date: Wed, 10 Dec 2025 11:02:15 -0800 Subject: [PATCH 4/5] style: Format code with cargo fmt --- src/main.rs | 6 +++--- src/utils.rs | 22 ++++++++++++++++------ 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/src/main.rs b/src/main.rs index bf5a1b6..a6f165b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -701,9 +701,9 @@ async fn main() -> anyhow::Result<()> { sync_interval, // CLI: --sync-interval (default 60s) reconcile_interval, // CLI: --reconcile-interval (default 3600s) database_replicator::utils::calculate_optimal_batch_size(), // Auto-detect based on available memory - None, // State file: use default - once, // CLI: --once (run single cycle) - no_reconcile, // CLI: --no-reconcile (disable delete detection) + None, // State file: use default + once, // CLI: --once (run single cycle) + no_reconcile, // CLI: --no-reconcile (disable delete detection) ) .await } diff --git a/src/utils.rs b/src/utils.rs index 9e57952..7b84757 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1369,15 +1369,15 @@ pub fn get_available_memory() -> Result { fn get_available_memory_linux() -> Result { use std::fs; - let meminfo = fs::read_to_string("/proc/meminfo") - .context("Failed to read /proc/meminfo")?; + let meminfo = fs::read_to_string("/proc/meminfo").context("Failed to read /proc/meminfo")?; // Try MemAvailable first (more accurate, available since Linux 3.14) for line in meminfo.lines() { if line.starts_with("MemAvailable:") { let parts: Vec<&str> = line.split_whitespace().collect(); if parts.len() >= 2 { - let kb: u64 = parts[1].parse() + let kb: u64 = parts[1] + .parse() .context("Failed to parse MemAvailable value")?; return Ok(kb * 1024); // Convert KB to bytes } @@ -1417,7 +1417,9 @@ fn get_available_memory_macos() -> Result { .context("Failed to execute sysctl")?; let total_str = String::from_utf8_lossy(&output.stdout); - let total_bytes: u64 = total_str.trim().parse() + let total_bytes: u64 = total_str + .trim() + .parse() .context("Failed to parse hw.memsize")?; // Get page size and free pages using vm_stat @@ -1578,8 +1580,16 @@ mod tests { // Should succeed (may fail in very restricted environments) if let Ok(available) = result { // Sanity check: should be at least 10MB, less than 1TB - assert!(available > 10 * 1024 * 1024, "Available memory too low: {}", available); - assert!(available < 1024 * 1024 * 1024 * 1024, "Available memory too high: {}", available); + assert!( + available > 10 * 1024 * 1024, + "Available memory too low: {}", + available + ); + assert!( + available < 1024 * 1024 * 1024 * 1024, + "Available memory too high: {}", + available + ); } } From 7900b0de8d1206228ae6f52393d39989e930618b Mon Sep 17 00:00:00 2001 From: Taariq Lewis <701864+taariq@users.noreply.github.com> Date: Wed, 10 Dec 2025 11:17:52 -0800 Subject: [PATCH 5/5] fix: Address critical review findings for batched sync/reconciliation This commit fixes critical correctness issues identified in PR #76 review: ## Critical Fix 1: xmin batching skipping rows with same xmin The batched xmin reader was using `WHERE xmin > $1` which skips rows when multiple rows share the same xmin (bulk inserts in single transaction). Fix: Use (xmin, ctid) as compound pagination key. ctid provides a stable tie-breaker for rows with identical xmin values. - Add `last_ctid` field to BatchReader - Use `WHERE (xmin, ctid) > ($1, $2::tid)` for subsequent batches - Include `ctid::text` in SELECT and ORDER BY ## Critical Fix 2: Reconciler PK ordering mismatch PKs were cast to ::text in SELECT but ORDER BY used native column types. For numeric PKs: "10" < "2" lexicographically but 10 > 2 numerically. This caused false orphan detection and data loss. Fix: Use ::text cast in both SELECT and ORDER BY to ensure SQL stream order matches Rust's lexicographic string comparison. - Change ORDER BY from `"col"` to `"col"::text` - Change WHERE from `"col" > $1` to `"col"::text > $1` ## Moderate Fix: macOS page size detection Apple Silicon uses 16KB pages, not 4KB. Hardcoded 4KB underestimated available memory by 4x, leading to unnecessarily small batch sizes. Fix: Use `sysctl hw.pagesize` to get actual page size. --- src/utils.rs | 17 ++++-- src/xmin/reader.rs | 114 +++++++++++++++++++++++++++++------------ src/xmin/reconciler.rs | 49 +++++++++--------- 3 files changed, 118 insertions(+), 62 deletions(-) diff --git a/src/utils.rs b/src/utils.rs index 7b84757..b5b6650 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1422,16 +1422,25 @@ fn get_available_memory_macos() -> Result { .parse() .context("Failed to parse hw.memsize")?; - // Get page size and free pages using vm_stat + // Get actual page size using sysctl hw.pagesize + // Intel Macs use 4KB (4096), Apple Silicon uses 16KB (16384) + let page_size: u64 = { + let page_output = Command::new("sysctl") + .args(["-n", "hw.pagesize"]) + .output() + .context("Failed to execute sysctl hw.pagesize")?; + + let page_str = String::from_utf8_lossy(&page_output.stdout); + page_str.trim().parse().unwrap_or(4096) // Default to 4KB if parsing fails + }; + + // Get free pages using vm_stat let vm_output = Command::new("vm_stat") .output() .context("Failed to execute vm_stat")?; let vm_stat = String::from_utf8_lossy(&vm_output.stdout); - // Parse page size (usually 4096 or 16384 on Apple Silicon) - let page_size: u64 = 4096; // Default, macOS uses 4KB or 16KB pages - // Parse free and inactive pages from vm_stat let mut pages_free: u64 = 0; let mut pages_inactive: u64 = 0; diff --git a/src/xmin/reader.rs b/src/xmin/reader.rs index 6df328f..336e28d 100644 --- a/src/xmin/reader.rs +++ b/src/xmin/reader.rs @@ -167,12 +167,17 @@ impl<'a> XminReader<'a> { table: table.to_string(), columns: columns.to_vec(), current_xmin: since_xmin, + last_ctid: None, batch_size, exhausted: false, }) } /// Execute a batched read query and return the next batch. + /// + /// Uses (xmin, ctid) as the pagination key to correctly handle cases where + /// many rows share the same xmin (e.g., bulk inserts in a single transaction). + /// Without ctid tie-breaking, rows with duplicate xmin values would be skipped. pub async fn fetch_batch( &self, batch_reader: &mut BatchReader, @@ -192,45 +197,81 @@ impl<'a> XminReader<'a> { .join(", ") }; - let query = format!( - "SELECT {}, xmin::text::bigint as _xmin FROM \"{}\".\"{}\" \ - WHERE xmin::text::bigint > $1 \ - ORDER BY xmin::text::bigint \ - LIMIT $2", - column_list, batch_reader.schema, batch_reader.table - ); - - let rows = self - .client - .query( - &query, - &[ - &(batch_reader.current_xmin as i64), - &(batch_reader.batch_size as i64), - ], - ) - .await - .with_context(|| { - format!( - "Failed to read batch from {}.{}", - batch_reader.schema, batch_reader.table + // Use (xmin, ctid) as compound pagination key to handle duplicate xmin values. + // ctid is the physical tuple location and provides a stable tie-breaker. + let (query, rows) = if let Some(ref last_ctid) = batch_reader.last_ctid { + // Subsequent batches: use compound (xmin, ctid) > ($1, $2) filter + let query = format!( + "SELECT {}, xmin::text::bigint as _xmin, ctid::text as _ctid \ + FROM \"{}\".\"{}\" \ + WHERE (xmin::text::bigint, ctid) > ($1, $2::tid) \ + ORDER BY xmin::text::bigint, ctid \ + LIMIT $3", + column_list, batch_reader.schema, batch_reader.table + ); + + let rows = self + .client + .query( + &query, + &[ + &(batch_reader.current_xmin as i64), + &last_ctid, + &(batch_reader.batch_size as i64), + ], + ) + .await + .with_context(|| { + format!( + "Failed to read batch from {}.{}", + batch_reader.schema, batch_reader.table + ) + })?; + (query, rows) + } else { + // First batch: simple xmin > $1 filter + let query = format!( + "SELECT {}, xmin::text::bigint as _xmin, ctid::text as _ctid \ + FROM \"{}\".\"{}\" \ + WHERE xmin::text::bigint > $1 \ + ORDER BY xmin::text::bigint, ctid \ + LIMIT $2", + column_list, batch_reader.schema, batch_reader.table + ); + + let rows = self + .client + .query( + &query, + &[ + &(batch_reader.current_xmin as i64), + &(batch_reader.batch_size as i64), + ], ) - })?; + .await + .with_context(|| { + format!( + "Failed to read batch from {}.{}", + batch_reader.schema, batch_reader.table + ) + })?; + (query, rows) + }; + + // Suppress unused variable warning - query is useful for debugging + let _ = query; if rows.is_empty() { batch_reader.exhausted = true; return Ok(None); } - // Update current_xmin to the max in this batch - let max_xmin = rows - .iter() - .map(|row| { - let xmin: i64 = row.get("_xmin"); - (xmin & 0xFFFFFFFF) as u32 - }) - .max() - .unwrap_or(batch_reader.current_xmin); + // Get xmin and ctid from the last row for next iteration's pagination + let last_row = rows.last().unwrap(); + let last_xmin: i64 = last_row.get("_xmin"); + let last_ctid: String = last_row.get("_ctid"); + + let max_xmin = (last_xmin & 0xFFFFFFFF) as u32; // Mark as exhausted if we got fewer rows than batch_size if rows.len() < batch_reader.batch_size { @@ -238,6 +279,7 @@ impl<'a> XminReader<'a> { } batch_reader.current_xmin = max_xmin; + batch_reader.last_ctid = Some(last_ctid); Ok(Some((rows, max_xmin))) } @@ -437,11 +479,17 @@ impl<'a> XminReader<'a> { } /// Batch reader state for iterating over large result sets. +/// +/// Uses (xmin, ctid) as the pagination key to handle cases where many rows +/// share the same xmin (e.g., bulk inserts in a single transaction). pub struct BatchReader { pub schema: String, pub table: String, pub columns: Vec, pub current_xmin: u32, + /// Last seen ctid for tie-breaking when multiple rows have same xmin. + /// Format: "(page,tuple)" e.g., "(0,1)" + pub last_ctid: Option, pub batch_size: usize, pub exhausted: bool, } @@ -466,6 +514,7 @@ mod tests { table: "users".to_string(), columns: vec!["id".to_string(), "name".to_string()], current_xmin: 0, + last_ctid: None, batch_size: 1000, exhausted: false, }; @@ -473,6 +522,7 @@ mod tests { assert_eq!(reader.schema, "public"); assert_eq!(reader.table, "users"); assert_eq!(reader.current_xmin, 0); + assert!(reader.last_ctid.is_none()); assert!(!reader.exhausted); } diff --git a/src/xmin/reconciler.rs b/src/xmin/reconciler.rs index 63d1f57..18c78ed 100644 --- a/src/xmin/reconciler.rs +++ b/src/xmin/reconciler.rs @@ -126,6 +126,9 @@ impl<'a> Reconciler<'a> { } /// Get all primary key values from a table. + /// + /// Note: Uses `::text` cast for both SELECT and ORDER BY to ensure consistent + /// lexicographic ordering that matches Rust string comparison. async fn get_all_primary_keys( &self, client: &Client, @@ -133,21 +136,18 @@ impl<'a> Reconciler<'a> { table: &str, primary_key_columns: &[String], ) -> Result>> { - let pk_cols: Vec = primary_key_columns + // Use ::text cast for both SELECT and ORDER BY to match Rust comparison + let pk_cols_text: Vec = primary_key_columns .iter() .map(|c| format!("\"{}\"::text", c)) .collect(); let query = format!( "SELECT {} FROM \"{}\".\"{}\" ORDER BY {}", - pk_cols.join(", "), + pk_cols_text.join(", "), schema, table, - primary_key_columns - .iter() - .map(|c| format!("\"{}\"", c)) - .collect::>() - .join(", ") + pk_cols_text.join(", ") ); let rows = client @@ -466,53 +466,50 @@ impl<'a> PkBatchReader<'a> { } /// Fetch the next batch of primary keys. + /// + /// IMPORTANT: Both SELECT and ORDER BY use `::text` cast to ensure the SQL + /// stream order matches the lexicographic comparison used in Rust. Without + /// this, numeric PKs would be ordered numerically in SQL (1, 2, 10) but + /// compared lexicographically in Rust ("1" < "10" < "2"), causing false + /// orphan detection and data loss. async fn fetch_next(&mut self) -> Result>> { if self.exhausted { return Ok(Vec::new()); } - let pk_cols_select: Vec = self + // Cast PKs to text for both SELECT and ORDER BY to ensure SQL stream + // order matches Rust's lexicographic string comparison + let pk_cols_text: Vec = self .pk_columns .iter() .map(|c| format!("\"{}\"::text", c)) .collect(); - let order_by: Vec = self - .pk_columns - .iter() - .map(|c| format!("\"{}\"", c)) - .collect(); - let query = if self.last_pk.is_some() { - // Keyset pagination: WHERE (pk1, pk2, ...) > ($1, $2, ...) - let pk_tuple: Vec = self - .pk_columns - .iter() - .map(|c| format!("\"{}\"", c)) - .collect(); - + // Keyset pagination: WHERE (pk1::text, pk2::text, ...) > ($1, $2, ...) + // Must use text-cast columns in WHERE to match ORDER BY ordering let params: Vec = (1..=self.pk_columns.len()) .map(|i| format!("${}", i)) .collect(); format!( "SELECT {} FROM \"{}\".\"{}\" WHERE ({}) > ({}) ORDER BY {} LIMIT {}", - pk_cols_select.join(", "), + pk_cols_text.join(", "), self.schema, self.table, - pk_tuple.join(", "), + pk_cols_text.join(", "), params.join(", "), - order_by.join(", "), + pk_cols_text.join(", "), self.batch_size ) } else { // First batch: no WHERE clause format!( "SELECT {} FROM \"{}\".\"{}\" ORDER BY {} LIMIT {}", - pk_cols_select.join(", "), + pk_cols_text.join(", "), self.schema, self.table, - order_by.join(", "), + pk_cols_text.join(", "), self.batch_size ) };