Commit e4f63d1

fix: Memory-efficient sync and reconciliation for large tables (#76)
* fix: Use batched processing in sync daemon to prevent OOM and timeouts

The sync_table method was loading entire tables into memory before processing, causing:
- 10GB+ memory usage for tables with millions of rows
- Connection timeouts when queries exceeded ELB idle timeouts
- Failed syncs with "connection closed" errors

Changes:
- Use existing batched reader (read_changes_batched + fetch_batch) instead of loading all rows at once
- Process and write each batch immediately (memory = O(batch_size))
- Update sync state after each batch for resume capability
- Add progress logging every 10 batches
- Increase default batch_size from 1000 to 10000 for better throughput
- Check for xmin wraparound at start rather than during read

This reduces memory from O(total_rows) to O(batch_size), enabling sync of tables with millions of rows without OOM or timeouts.

Closes #74

* feat: Auto-detect optimal batch size based on available memory

Add cross-platform memory detection and automatic batch size calculation to prevent OOM on small instances while maximizing throughput on larger ones.

New functions in utils.rs:
- get_available_memory(): Cross-platform (Linux, macOS, Windows)
  - Linux: Reads MemAvailable from /proc/meminfo
  - macOS: Uses sysctl + vm_stat for free/inactive pages
  - Windows: Uses GlobalMemoryStatusEx Win32 API
- calculate_optimal_batch_size(): Auto-calculates based on memory
  - Uses 25% of available memory as working budget
  - Assumes 2KB per row (conservative estimate)
  - Clamps between 1,000 and 50,000 rows

Expected batch sizes by instance type:
- t3.nano (512MB): ~1,000 rows
- t3.small (2GB): ~10,000 rows
- t3.large (8GB+): 50,000 rows (capped)

Refs #74

* fix: Use batched reconciliation to prevent OOM on large tables

The reconciler was loading ALL primary keys from both source and target tables into memory before comparing them. For tables with millions of rows (e.g., 14M rows), this caused:
- 2-3 GB memory usage just for PKs
- Potential OOM on memory-constrained instances
- Connection timeouts during long-running PK fetch queries

Changes:
- Add reconcile_table_batched() using merge-join comparison
- Implement PkBatchReader with keyset pagination (WHERE pk > last_pk)
- Fetch PKs in sorted batches from both databases
- Compare using single-pass merge-join (both streams sorted)
- Delete orphans in batches as they're discovered
- Add progress logging every 100K comparisons

This reduces memory from O(total_rows) to O(batch_size), enabling reconciliation of tables with millions of rows without OOM.

Closes #75

* style: Format code with cargo fmt

* fix: Address critical review findings for batched sync/reconciliation

This commit fixes critical correctness issues identified in PR #76 review:

## Critical Fix 1: xmin batching skipping rows with same xmin

The batched xmin reader was using `WHERE xmin > $1` which skips rows when multiple rows share the same xmin (bulk inserts in single transaction).

Fix: Use (xmin, ctid) as compound pagination key. ctid provides a stable tie-breaker for rows with identical xmin values.
- Add `last_ctid` field to BatchReader
- Use `WHERE (xmin, ctid) > ($1, $2::tid)` for subsequent batches
- Include `ctid::text` in SELECT and ORDER BY

## Critical Fix 2: Reconciler PK ordering mismatch

PKs were cast to ::text in SELECT but ORDER BY used native column types. For numeric PKs: "10" < "2" lexicographically but 10 > 2 numerically. This caused false orphan detection and data loss.

Fix: Use ::text cast in both SELECT and ORDER BY to ensure SQL stream order matches Rust's lexicographic string comparison.
- Change ORDER BY from `"col"` to `"col"::text`
- Change WHERE from `"col" > $1` to `"col"::text > $1`

## Moderate Fix: macOS page size detection

Apple Silicon uses 16KB pages, not 4KB. Hardcoded 4KB underestimated available memory by 4x, leading to unnecessarily small batch sizes.

Fix: Use `sysctl hw.pagesize` to get actual page size.
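
Critical Fix 1 in the commit message can be illustrated with a small in-memory model. This is a sketch, not the daemon's reader: `RowKey`, `fetch_by_xmin`, and `fetch_by_xmin_ctid` are hypothetical stand-ins for the two SQL predicates, and `xmin` and `ctid` are modelled as plain integers with wraparound ignored.

```rust
/// Simplified row identity: `xmin` is the inserting transaction ID and `ctid`
/// is the physical (block, offset) location. Derived `Ord` compares `xmin`
/// first and `ctid` second, like the row comparison `(xmin, ctid) > ($1, $2)`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct RowKey {
    xmin: u32,
    ctid: (u32, u16),
}

/// Hypothetical stand-in for `WHERE xmin > $1 ORDER BY xmin LIMIT n`.
fn fetch_by_xmin(rows: &[RowKey], last_xmin: Option<u32>, limit: usize) -> Vec<RowKey> {
    let mut batch: Vec<RowKey> = rows
        .iter()
        .copied()
        .filter(|r| last_xmin.map_or(true, |x| r.xmin > x))
        .collect();
    batch.sort();
    batch.truncate(limit);
    batch
}

/// Hypothetical stand-in for
/// `WHERE (xmin, ctid) > ($1, $2) ORDER BY xmin, ctid LIMIT n`.
fn fetch_by_xmin_ctid(rows: &[RowKey], last: Option<RowKey>, limit: usize) -> Vec<RowKey> {
    let mut batch: Vec<RowKey> = rows
        .iter()
        .copied()
        .filter(|r| last.map_or(true, |l| *r > l))
        .collect();
    batch.sort();
    batch.truncate(limit);
    batch
}

/// Drain every batch with the xmin-only cursor and count the rows seen.
fn count_with_xmin_cursor(rows: &[RowKey], limit: usize) -> usize {
    let (mut seen, mut cursor) = (0, None);
    loop {
        let batch = fetch_by_xmin(rows, cursor, limit);
        let Some(last) = batch.last() else { return seen };
        seen += batch.len();
        cursor = Some(last.xmin);
    }
}

/// Drain every batch with the compound cursor and count the rows seen.
fn count_with_compound_cursor(rows: &[RowKey], limit: usize) -> usize {
    let (mut seen, mut cursor) = (0, None);
    loop {
        let batch = fetch_by_xmin_ctid(rows, cursor, limit);
        let Some(last) = batch.last() else { return seen };
        seen += batch.len();
        cursor = Some(*last);
    }
}

/// Five rows inserted by one transaction (xmin 100) plus one later row.
fn sample_rows() -> Vec<RowKey> {
    let mut rows: Vec<RowKey> = (1..=5)
        .map(|off| RowKey { xmin: 100, ctid: (0, off) })
        .collect();
    rows.push(RowKey { xmin: 101, ctid: (0, 6) });
    rows
}
```

With a batch size of 2, `count_with_xmin_cursor(&sample_rows(), 2)` sees only 3 of the 6 rows, because the cursor jumps past the remaining rows that share xmin 100, while `count_with_compound_cursor` sees all 6.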
1 parent 7676a7a commit e4f63d1
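
Critical Fix 2 in the commit message turns on the merge-join requiring both key streams to be sorted in the same order the comparison uses. The following is a minimal sketch under that assumption; `find_orphans` is a hypothetical helper operating on in-memory slices, not the reconciler's `reconcile_table_batched()`.

```rust
use std::cmp::Ordering;

/// Single-pass merge-join over two key streams that must both be sorted in
/// `str` (lexicographic) order. Returns keys present in `target` but absent
/// from `source`, i.e. the orphans a reconciler would delete.
fn find_orphans<'a>(source: &[&'a str], target: &[&'a str]) -> Vec<&'a str> {
    let (mut s, mut t) = (0, 0);
    let mut orphans = Vec::new();
    while t < target.len() {
        if s >= source.len() {
            // Source exhausted: every remaining target key is an orphan.
            orphans.push(target[t]);
            t += 1;
            continue;
        }
        match source[s].cmp(target[t]) {
            // Source key has no counterpart in target yet: skip it.
            Ordering::Less => s += 1,
            // Target key was passed over in source: it is an orphan.
            Ordering::Greater => {
                orphans.push(target[t]);
                t += 1;
            }
            // Key exists on both sides.
            Ordering::Equal => {
                s += 1;
                t += 1;
            }
        }
    }
    orphans
}
```

If the streams arrive in numeric order instead, for example source `["1", "2", "3", "10"]` and target `["1", "2", "10"]`, the comparison of `"3"` with `"10"` yields `Greater` and `"10"` is reported as an orphan even though it exists in the source. Feeding the same keys in text order (`["1", "10", "2", "3"]` and `["1", "10", "2"]`) reports none.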

5 files changed

Lines changed: 829 additions & 80 deletions

src/main.rs

Lines changed: 4 additions & 4 deletions
@@ -700,10 +700,10 @@ async fn main() -> anyhow::Result<()> {
         tables_to_sync,     // Tables from filter
         sync_interval,      // CLI: --sync-interval (default 60s)
         reconcile_interval, // CLI: --reconcile-interval (default 3600s)
-        1000,               // Batch size
-        None,               // State file: use default
-        once,               // CLI: --once (run single cycle)
-        no_reconcile,       // CLI: --no-reconcile (disable delete detection)
+        database_replicator::utils::calculate_optimal_batch_size(), // Auto-detect based on available memory
+        None,         // State file: use default
+        once,         // CLI: --once (run single cycle)
+        no_reconcile, // CLI: --no-reconcile (disable delete detection)
     )
     .await
 }

src/utils.rs

Lines changed: 289 additions & 0 deletions
@@ -1318,10 +1318,299 @@ pub fn parse_pg_version_string(version_str: &str) -> Result<u32> {
     bail!("Could not parse PostgreSQL version from: {}", version_str)
 }

+/// Get available system memory in bytes
+///
+/// Cross-platform function that works on Linux, macOS, and Windows.
+/// Returns the amount of memory available for use by applications.
+///
+/// # Platform Details
+///
+/// - **Linux**: Reads `MemAvailable` from `/proc/meminfo` (falls back to `MemFree + Buffers + Cached`)
+/// - **macOS**: Sums free, inactive, and purgeable pages from `vm_stat`, scaled by `sysctl hw.pagesize`
+/// - **Windows**: Uses `GlobalMemoryStatusEx` API
+///
+/// # Returns
+///
+/// Available memory in bytes, or an error if detection fails.
+///
+/// # Examples
+///
+/// ```no_run
+/// use database_replicator::utils::get_available_memory;
+///
+/// let available = get_available_memory().unwrap();
+/// println!("Available memory: {} MB", available / 1024 / 1024);
+/// ```
+pub fn get_available_memory() -> Result<u64> {
+    #[cfg(target_os = "linux")]
+    {
+        get_available_memory_linux()
+    }
+
+    #[cfg(target_os = "macos")]
+    {
+        get_available_memory_macos()
+    }
+
+    #[cfg(target_os = "windows")]
+    {
+        get_available_memory_windows()
+    }
+
+    #[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
+    {
+        // Fallback: assume 1GB available for unknown platforms
+        tracing::warn!("Memory detection not supported on this platform, assuming 1GB available");
+        Ok(1024 * 1024 * 1024)
+    }
+}
+
+#[cfg(target_os = "linux")]
+fn get_available_memory_linux() -> Result<u64> {
+    use std::fs;
+
+    let meminfo = fs::read_to_string("/proc/meminfo").context("Failed to read /proc/meminfo")?;
+
+    // Try MemAvailable first (more accurate, available since Linux 3.14)
+    for line in meminfo.lines() {
+        if line.starts_with("MemAvailable:") {
+            let parts: Vec<&str> = line.split_whitespace().collect();
+            if parts.len() >= 2 {
+                let kb: u64 = parts[1]
+                    .parse()
+                    .context("Failed to parse MemAvailable value")?;
+                return Ok(kb * 1024); // Convert KB to bytes
+            }
+        }
+    }
+
+    // Fallback: MemFree + Buffers + Cached (less accurate but works on older kernels)
+    let mut mem_free: u64 = 0;
+    let mut buffers: u64 = 0;
+    let mut cached: u64 = 0;
+
+    for line in meminfo.lines() {
+        let parts: Vec<&str> = line.split_whitespace().collect();
+        if parts.len() >= 2 {
+            let value: u64 = parts[1].parse().unwrap_or(0);
+            if line.starts_with("MemFree:") {
+                mem_free = value;
+            } else if line.starts_with("Buffers:") {
+                buffers = value;
+            } else if line.starts_with("Cached:") && !line.starts_with("SwapCached:") {
+                cached = value;
+            }
+        }
+    }
+
+    Ok((mem_free + buffers + cached) * 1024) // Convert KB to bytes
+}
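
The Linux path above reads `/proc/meminfo` directly, which makes it hard to unit-test. One possible variation, shown here only as a sketch, separates the parsing into pure functions over a string; `meminfo_field_kb` and `available_bytes_from_meminfo` are hypothetical names that do not appear in this commit.

```rust
/// Extract a `<key>: <value> kB` field from meminfo-style text, in kB.
/// Matching on `key` followed by ':' means "Cached" never matches "SwapCached".
fn meminfo_field_kb(meminfo: &str, key: &str) -> Option<u64> {
    meminfo.lines().find_map(|line| {
        let rest = line.strip_prefix(key)?.strip_prefix(':')?;
        rest.split_whitespace().next()?.parse::<u64>().ok()
    })
}

/// Available memory in bytes: `MemAvailable` when present, otherwise
/// `MemFree + Buffers + Cached`. Returns `None` if neither can be read.
fn available_bytes_from_meminfo(meminfo: &str) -> Option<u64> {
    let kb = meminfo_field_kb(meminfo, "MemAvailable").or_else(|| {
        let free = meminfo_field_kb(meminfo, "MemFree")?;
        let buffers = meminfo_field_kb(meminfo, "Buffers").unwrap_or(0);
        let cached = meminfo_field_kb(meminfo, "Cached").unwrap_or(0);
        Some(free + buffers + cached)
    })?;
    Some(kb * 1024) // meminfo values are reported in kB
}
```

Unlike the function in the diff, this sketch returns `None` rather than 0 when no usable field is found, so a caller could fall back to a default batch size instead of clamping to the minimum.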
+
+#[cfg(target_os = "macos")]
+fn get_available_memory_macos() -> Result<u64> {
+    use std::process::Command;
+
+    // Get total physical memory using sysctl
+    let output = Command::new("sysctl")
+        .args(["-n", "hw.memsize"])
+        .output()
+        .context("Failed to execute sysctl")?;
+
+    let total_str = String::from_utf8_lossy(&output.stdout);
+    let total_bytes: u64 = total_str
+        .trim()
+        .parse()
+        .context("Failed to parse hw.memsize")?;
+
+    // Get actual page size using sysctl hw.pagesize
+    // Intel Macs use 4KB (4096), Apple Silicon uses 16KB (16384)
+    let page_size: u64 = {
+        let page_output = Command::new("sysctl")
+            .args(["-n", "hw.pagesize"])
+            .output()
+            .context("Failed to execute sysctl hw.pagesize")?;
+
+        let page_str = String::from_utf8_lossy(&page_output.stdout);
+        page_str.trim().parse().unwrap_or(4096) // Default to 4KB if parsing fails
+    };
+
+    // Get free pages using vm_stat
+    let vm_output = Command::new("vm_stat")
+        .output()
+        .context("Failed to execute vm_stat")?;
+
+    let vm_stat = String::from_utf8_lossy(&vm_output.stdout);
+
+    // Parse free and inactive pages from vm_stat
+    let mut pages_free: u64 = 0;
+    let mut pages_inactive: u64 = 0;
+    let mut pages_purgeable: u64 = 0;
+
+    for line in vm_stat.lines() {
+        if line.starts_with("Pages free:") {
+            pages_free = parse_vm_stat_value(line);
+        } else if line.starts_with("Pages inactive:") {
+            pages_inactive = parse_vm_stat_value(line);
+        } else if line.starts_with("Pages purgeable:") {
+            pages_purgeable = parse_vm_stat_value(line);
+        }
+    }
+
+    // Available = free + inactive + purgeable (conservative estimate)
+    let available = (pages_free + pages_inactive + pages_purgeable) * page_size;
+
+    // If vm_stat parsing failed, estimate 50% of total as available
+    if available == 0 {
+        tracing::debug!("vm_stat parsing returned 0, estimating 50% of total memory as available");
+        return Ok(total_bytes / 2);
+    }
+
+    Ok(available)
+}
+
+#[cfg(target_os = "macos")]
+fn parse_vm_stat_value(line: &str) -> u64 {
+    // Format: "Pages free:                               12345."
+    line.split(':')
+        .nth(1)
+        .and_then(|s| s.trim().trim_end_matches('.').parse().ok())
+        .unwrap_or(0)
+}
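
To make the effect of the page-size fix concrete, the sketch below applies the same free + inactive + purgeable sum to a captured `vm_stat` string. The sample text and its page counts are invented for illustration, and `page_count` and `available_bytes_from_vm_stat` are hypothetical helpers rather than functions from this commit.

```rust
/// Illustrative `vm_stat` output; the page counts are made up.
const SAMPLE_VM_STAT: &str = "\
Mach Virtual Memory Statistics: (page size of 16384 bytes)
Pages free:                                3651.
Pages active:                            135632.
Pages inactive:                          133462.
Pages speculative:                          780.
Pages purgeable:                           1203.
";

/// Read the page count from a line such as `Pages free:      3651.`.
/// Returns 0 when the label is missing or the value does not parse.
fn page_count(vm_stat: &str, label: &str) -> u64 {
    vm_stat
        .lines()
        .find_map(|line| line.strip_prefix(label)?.strip_prefix(':'))
        .and_then(|rest| rest.trim().trim_end_matches('.').parse::<u64>().ok())
        .unwrap_or(0)
}

/// Free + inactive + purgeable pages, converted to bytes with `page_size`.
fn available_bytes_from_vm_stat(vm_stat: &str, page_size: u64) -> u64 {
    let pages = page_count(vm_stat, "Pages free")
        + page_count(vm_stat, "Pages inactive")
        + page_count(vm_stat, "Pages purgeable");
    pages * page_size
}
```

For the same page counts, passing 16384 as `page_size` yields exactly four times the result of passing 4096, which is the 4x underestimate the commit message describes for Apple Silicon.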
+
+#[cfg(target_os = "windows")]
+fn get_available_memory_windows() -> Result<u64> {
+    use std::mem;
+
+    // MEMORYSTATUSEX structure
+    #[repr(C)]
+    #[allow(non_snake_case)]
+    struct MEMORYSTATUSEX {
+        dwLength: u32,
+        dwMemoryLoad: u32,
+        ullTotalPhys: u64,
+        ullAvailPhys: u64,
+        ullTotalPageFile: u64,
+        ullAvailPageFile: u64,
+        ullTotalVirtual: u64,
+        ullAvailVirtual: u64,
+        ullAvailExtendedVirtual: u64,
+    }
+
+    #[link(name = "kernel32")]
+    extern "system" {
+        fn GlobalMemoryStatusEx(lpBuffer: *mut MEMORYSTATUSEX) -> i32;
+    }
+
+    let mut mem_status: MEMORYSTATUSEX = unsafe { mem::zeroed() };
+    mem_status.dwLength = mem::size_of::<MEMORYSTATUSEX>() as u32;
+
+    let result = unsafe { GlobalMemoryStatusEx(&mut mem_status) };
+
+    if result == 0 {
+        bail!("GlobalMemoryStatusEx failed");
+    }
+
+    Ok(mem_status.ullAvailPhys)
+}
+
+/// Calculate optimal batch size based on available system memory
+///
+/// Automatically determines an appropriate batch size for the sync daemon
+/// based on the amount of available system memory. This prevents OOM errors
+/// on memory-constrained instances while maximizing throughput on larger ones.
+///
+/// # Memory Model
+///
+/// The calculation assumes:
+/// - Each row consumes approximately 2KB in memory (conservative estimate for wide tables)
+/// - We should use at most 25% of available memory for batch processing
+/// - Minimum batch size: 1,000 rows (for very constrained systems)
+/// - Maximum batch size: 50,000 rows (diminishing returns beyond this)
+///
+/// # Returns
+///
+/// Optimal batch size in number of rows, or default of 10,000 if detection fails.
+///
+/// # Examples
+///
+/// ```no_run
+/// use database_replicator::utils::calculate_optimal_batch_size;
+///
+/// let batch_size = calculate_optimal_batch_size();
+/// println!("Using batch size: {}", batch_size);
+/// // ~8 MB available or less: 1,000 (minimum)
+/// // ~200 MB available: ~25,000
+/// // ~400 MB available or more: 50,000 (capped)
+/// ```
+pub fn calculate_optimal_batch_size() -> usize {
+    const BYTES_PER_ROW: u64 = 2048; // Conservative: 2KB per row
+    const MEMORY_FRACTION: f64 = 0.25; // Use at most 25% of available memory
+    const MIN_BATCH_SIZE: usize = 1_000;
+    const MAX_BATCH_SIZE: usize = 50_000;
+    const DEFAULT_BATCH_SIZE: usize = 10_000;
+
+    match get_available_memory() {
+        Ok(available_bytes) => {
+            // Calculate how many rows we can fit in 25% of available memory
+            let usable_bytes = (available_bytes as f64 * MEMORY_FRACTION) as u64;
+            let calculated_size = (usable_bytes / BYTES_PER_ROW) as usize;
+
+            // Clamp to min/max range
+            let batch_size = calculated_size.clamp(MIN_BATCH_SIZE, MAX_BATCH_SIZE);
+
+            tracing::info!(
+                "Auto-detected batch size: {} (available memory: {} MB)",
+                batch_size,
+                available_bytes / 1024 / 1024
+            );
+
+            batch_size
+        }
+        Err(e) => {
+            tracing::warn!(
+                "Failed to detect available memory: {}. Using default batch size: {}",
+                e,
+                DEFAULT_BATCH_SIZE
+            );
+            DEFAULT_BATCH_SIZE
+        }
+    }
+}
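
The arithmetic in `calculate_optimal_batch_size()` can be checked in isolation. The sketch below reuses the 2 KB per row, 25% budget, and 1,000 to 50,000 clamp from the function above, but takes the available byte count as a parameter; `batch_size_for` is a hypothetical name, and integer division by 4 stands in for the `f64` multiplication by 0.25.

```rust
const BYTES_PER_ROW: u64 = 2048; // 2 KB per row, as in the diff
const MIN_BATCH_SIZE: u64 = 1_000;
const MAX_BATCH_SIZE: u64 = 50_000;

/// Rows that fit in 25% of `available_bytes` at 2 KB per row, clamped to
/// the [1,000, 50,000] range.
fn batch_size_for(available_bytes: u64) -> usize {
    let usable_bytes = available_bytes / 4; // 25% working budget
    (usable_bytes / BYTES_PER_ROW).clamp(MIN_BATCH_SIZE, MAX_BATCH_SIZE) as usize
}
```

Under these constants the cap is reached once 50,000 × 2,048 × 4 = 409,600,000 bytes (about 391 MiB) are available, and the minimum applies below 1,000 × 2,048 × 4 = 8,192,000 bytes. For example, `batch_size_for(40 * 1024 * 1024)` gives 5,120 rows, while both 512 MiB and 8 GiB of available memory give 50,000.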
+
 #[cfg(test)]
 mod tests {
     use super::*;

+    #[test]
+    fn test_get_available_memory() {
+        // This should work on all supported platforms
+        let result = get_available_memory();
+
+        // Should succeed (may fail in very restricted environments)
+        if let Ok(available) = result {
+            // Sanity check: should be at least 10MB, less than 1TB
+            assert!(
+                available > 10 * 1024 * 1024,
+                "Available memory too low: {}",
+                available
+            );
+            assert!(
+                available < 1024 * 1024 * 1024 * 1024,
+                "Available memory too high: {}",
+                available
+            );
+        }
+    }
+
+    #[test]
+    fn test_calculate_optimal_batch_size() {
+        let batch_size = calculate_optimal_batch_size();
+
+        // Should be within expected range
+        assert!(batch_size >= 1_000, "Batch size too small: {}", batch_size);
+        assert!(batch_size <= 50_000, "Batch size too large: {}", batch_size);
+    }
+
     #[test]
     fn test_validate_connection_string_valid() {
         assert!(validate_connection_string("postgresql://user:pass@localhost:5432/dbname").is_ok());
