Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,10 +700,10 @@ async fn main() -> anyhow::Result<()> {
tables_to_sync, // Tables from filter
sync_interval, // CLI: --sync-interval (default 60s)
reconcile_interval, // CLI: --reconcile-interval (default 3600s)
1000, // Batch size
None, // State file: use default
once, // CLI: --once (run single cycle)
no_reconcile, // CLI: --no-reconcile (disable delete detection)
database_replicator::utils::calculate_optimal_batch_size(), // Auto-detect based on available memory
None, // State file: use default
once, // CLI: --once (run single cycle)
no_reconcile, // CLI: --no-reconcile (disable delete detection)
)
.await
}
Expand Down
289 changes: 289 additions & 0 deletions src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1318,10 +1318,299 @@ pub fn parse_pg_version_string(version_str: &str) -> Result<u32> {
bail!("Could not parse PostgreSQL version from: {}", version_str)
}

/// Get available system memory in bytes
///
/// Selects a platform-specific helper at compile time via `cfg(target_os)`
/// and returns that helper's result unchanged.
///
/// # Platform Details
///
/// - **Linux**: Reads `MemAvailable` from `/proc/meminfo`, falling back to
///   `MemFree + Buffers + Cached` when that field is absent
/// - **macOS**: Sums the free, inactive and purgeable page counts reported by
///   `vm_stat`, scaled by `sysctl hw.pagesize`; falls back to half of
///   `sysctl hw.memsize` when that sum is zero
/// - **Windows**: Uses the `GlobalMemoryStatusEx` API (`ullAvailPhys`)
/// - **Other platforms**: No detection is attempted; a warning is logged and
///   1 GiB is reported
///
/// # Returns
///
/// Available memory in bytes, or an error if detection fails.
///
/// # Errors
///
/// Propagates the error of the selected platform helper. The fallback used on
/// unsupported platforms never fails.
///
/// # Examples
///
/// ```no_run
/// use database_replicator::utils::get_available_memory;
///
/// let available = get_available_memory().unwrap();
/// println!("Available memory: {} MB", available / 1024 / 1024);
/// ```
pub fn get_available_memory() -> Result<u64> {
// Exactly one of the cfg-gated blocks below survives compilation and becomes
// the function's tail expression.
#[cfg(target_os = "linux")]
{
get_available_memory_linux()
}

#[cfg(target_os = "macos")]
{
get_available_memory_macos()
}

#[cfg(target_os = "windows")]
{
get_available_memory_windows()
}

#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
{
// Fallback: assume 1GB available for unknown platforms
tracing::warn!("Memory detection not supported on this platform, assuming 1GB available");
Ok(1024 * 1024 * 1024)
}
}

/// Read the available memory, in bytes, from `/proc/meminfo`.
///
/// Prefers the `MemAvailable` field (more accurate, available since Linux 3.14).
/// When no usable `MemAvailable` line exists, falls back to
/// `MemFree + Buffers + Cached`, which is less accurate but works on older
/// kernels. Values in `/proc/meminfo` are expressed in KB and are converted to
/// bytes before returning.
///
/// If none of the fallback fields can be parsed the result is `Ok(0)`.
///
/// # Errors
///
/// Fails when `/proc/meminfo` cannot be read, or when a `MemAvailable` line is
/// present but its value is not a valid unsigned integer.
#[cfg(target_os = "linux")]
fn get_available_memory_linux() -> Result<u64> {
    const BYTES_PER_KB: u64 = 1024;

    /// Returns the first whitespace-separated token following `label`, if
    /// `line` starts with `label` and carries a value at all.
    fn field_value<'a>(line: &'a str, label: &str) -> Option<&'a str> {
        line.strip_prefix(label)?.split_whitespace().next()
    }

    let meminfo =
        std::fs::read_to_string("/proc/meminfo").context("Failed to read /proc/meminfo")?;

    let mut mem_free: u64 = 0;
    let mut buffers: u64 = 0;
    let mut cached: u64 = 0;

    // Single pass: a MemAvailable line wins as soon as it is seen, regardless of
    // where it sits in the file; the other fields are only used if it never shows up.
    for line in meminfo.lines() {
        if let Some(raw) = field_value(line, "MemAvailable:") {
            let kb: u64 = raw
                .parse()
                .context("Failed to parse MemAvailable value")?;
            return Ok(kb.saturating_mul(BYTES_PER_KB));
        } else if let Some(raw) = field_value(line, "MemFree:") {
            mem_free = raw.parse().unwrap_or(0);
        } else if let Some(raw) = field_value(line, "Buffers:") {
            buffers = raw.parse().unwrap_or(0);
        } else if let Some(raw) = field_value(line, "Cached:") {
            // "SwapCached:" lines do not start with "Cached:", so they never land here.
            cached = raw.parse().unwrap_or(0);
        }
    }

    // Saturating arithmetic: a malformed, absurdly large value must not wrap
    // around into a tiny number in release builds.
    Ok(mem_free
        .saturating_add(buffers)
        .saturating_add(cached)
        .saturating_mul(BYTES_PER_KB))
}

/// Estimate the available memory, in bytes, on macOS.
///
/// Runs `sysctl -n hw.pagesize` and `vm_stat`, then reports
/// `(free + inactive + purgeable pages) * page size`. Inactive and purgeable
/// pages are counted because the system can reclaim them, so this is an
/// estimate of reclaimable memory rather than of strictly unused memory.
///
/// When that sum is zero (e.g. the `vm_stat` output could not be parsed), the
/// function falls back to half of the physical memory reported by
/// `sysctl -n hw.memsize`. The `hw.memsize` lookup is only performed on that
/// fallback path, so a failure there cannot mask a usable `vm_stat` result.
///
/// # Errors
///
/// Fails when `sysctl` or `vm_stat` cannot be executed, or when the fallback
/// path is taken and the `hw.memsize` output is not a valid unsigned integer.
#[cfg(target_os = "macos")]
fn get_available_memory_macos() -> Result<u64> {
    use std::process::Command;

    // Get actual page size using sysctl hw.pagesize
    // Intel Macs use 4KB (4096), Apple Silicon uses 16KB (16384)
    let page_output = Command::new("sysctl")
        .args(["-n", "hw.pagesize"])
        .output()
        .context("Failed to execute sysctl hw.pagesize")?;
    let page_size: u64 = String::from_utf8_lossy(&page_output.stdout)
        .trim()
        .parse()
        .unwrap_or(4096); // Default to 4KB if parsing fails

    // Get page counts using vm_stat
    let vm_output = Command::new("vm_stat")
        .output()
        .context("Failed to execute vm_stat")?;
    let vm_stat = String::from_utf8_lossy(&vm_output.stdout);

    let mut pages_free: u64 = 0;
    let mut pages_inactive: u64 = 0;
    let mut pages_purgeable: u64 = 0;

    for line in vm_stat.lines() {
        if line.starts_with("Pages free:") {
            pages_free = parse_vm_stat_value(line);
        } else if line.starts_with("Pages inactive:") {
            pages_inactive = parse_vm_stat_value(line);
        } else if line.starts_with("Pages purgeable:") {
            pages_purgeable = parse_vm_stat_value(line);
        }
    }

    // Available = free + inactive + purgeable. Saturating arithmetic keeps a
    // malformed page count from wrapping into a small value in release builds.
    let available = pages_free
        .saturating_add(pages_inactive)
        .saturating_add(pages_purgeable)
        .saturating_mul(page_size);

    if available > 0 {
        return Ok(available);
    }

    // vm_stat gave us nothing usable: estimate 50% of total physical memory.
    tracing::debug!("vm_stat parsing returned 0, estimating 50% of total memory as available");

    let output = Command::new("sysctl")
        .args(["-n", "hw.memsize"])
        .output()
        .context("Failed to execute sysctl")?;
    let total_bytes: u64 = String::from_utf8_lossy(&output.stdout)
        .trim()
        .parse()
        .context("Failed to parse hw.memsize")?;

    Ok(total_bytes / 2)
}

/// Extract the numeric value from a single `vm_stat` output line.
///
/// Expected shape: `"Pages free: 12345."` — a label, a colon, optional
/// padding, the number, and a trailing period. Only the text between the first
/// and second colon is considered. Returns `0` when the line has no colon or
/// the value is not a valid unsigned integer.
#[cfg(target_os = "macos")]
fn parse_vm_stat_value(line: &str) -> u64 {
    let mut segments = line.split(':');
    let _label = segments.next();

    match segments.next() {
        Some(raw) => raw
            .trim()
            .trim_end_matches('.')
            .parse::<u64>()
            .unwrap_or(0),
        None => 0,
    }
}

/// Query the available physical memory, in bytes, via the Win32
/// `GlobalMemoryStatusEx` API.
///
/// Returns the `ullAvailPhys` field of the filled-in `MEMORYSTATUSEX`.
///
/// # Errors
///
/// Fails when `GlobalMemoryStatusEx` returns zero; the message includes the
/// last OS error captured right after the call.
#[cfg(target_os = "windows")]
fn get_available_memory_windows() -> Result<u64> {
    use std::mem;

    // Local mirror of the Win32 MEMORYSTATUSEX structure. Field names follow the
    // Win32 spelling, hence the lint exemption.
    #[repr(C)]
    #[allow(non_snake_case)]
    struct MEMORYSTATUSEX {
        dwLength: u32,
        dwMemoryLoad: u32,
        ullTotalPhys: u64,
        ullAvailPhys: u64,
        ullTotalPageFile: u64,
        ullAvailPageFile: u64,
        ullTotalVirtual: u64,
        ullAvailVirtual: u64,
        ullAvailExtendedVirtual: u64,
    }

    #[link(name = "kernel32")]
    extern "system" {
        fn GlobalMemoryStatusEx(lpBuffer: *mut MEMORYSTATUSEX) -> i32;
    }

    // Plain initialisation instead of `mem::zeroed()`: every field is an integer,
    // so no `unsafe` is needed to build the buffer. `dwLength` must hold the
    // structure size before the call; the struct is a few dozen bytes, so the
    // cast to u32 cannot truncate.
    let mut mem_status = MEMORYSTATUSEX {
        dwLength: mem::size_of::<MEMORYSTATUSEX>() as u32,
        dwMemoryLoad: 0,
        ullTotalPhys: 0,
        ullAvailPhys: 0,
        ullTotalPageFile: 0,
        ullAvailPageFile: 0,
        ullTotalVirtual: 0,
        ullAvailVirtual: 0,
        ullAvailExtendedVirtual: 0,
    };

    // SAFETY: `mem_status` is a live, exclusively borrowed, `repr(C)` value
    // declared above to mirror the layout the API expects, and `dwLength` has
    // been set to its size.
    let result = unsafe { GlobalMemoryStatusEx(&mut mem_status) };

    if result == 0 {
        // Read the OS error immediately, before any other call can overwrite it.
        let os_error = std::io::Error::last_os_error();
        bail!("GlobalMemoryStatusEx failed: {}", os_error);
    }

    Ok(mem_status.ullAvailPhys)
}

/// Calculate optimal batch size based on available system memory
///
/// Automatically determines a batch size for the sync daemon from the amount
/// of memory reported by [`get_available_memory`]. The goal is to avoid OOM
/// errors on memory-constrained instances while keeping throughput high on
/// larger ones.
///
/// # Memory Model
///
/// The calculation assumes:
/// - Each row consumes approximately 2KB in memory
/// - At most 25% of available memory is used for batch processing
/// - Minimum batch size: 1,000 rows
/// - Maximum batch size: 50,000 rows
///
/// With these constants the floor applies below roughly 8 MB of available
/// memory and the cap is reached at roughly 391 MiB of available memory
/// (50,000 rows * 2 KB * 4).
///
/// NOTE(review): earlier documentation quoted ~10,000 rows for a 2GB host,
/// which these constants do not produce — confirm whether the per-row estimate
/// is intended to be this small.
///
/// # Returns
///
/// Optimal batch size in number of rows, or default of 10,000 if detection fails.
///
/// # Examples
///
/// ```no_run
/// use database_replicator::utils::calculate_optimal_batch_size;
///
/// let batch_size = calculate_optimal_batch_size();
/// println!("Using batch size: {}", batch_size);
/// // ~8 MB available or less:   1,000 (floor)
/// // ~100 MiB available:        12,800
/// // ~391 MiB available or more: 50,000 (capped)
/// ```
pub fn calculate_optimal_batch_size() -> usize {
    const DEFAULT_BATCH_SIZE: usize = 10_000;

    match get_available_memory() {
        Ok(available_bytes) => {
            let batch_size = batch_size_for_available_memory(available_bytes);

            tracing::info!(
                "Auto-detected batch size: {} (available memory: {} MB)",
                batch_size,
                available_bytes / 1024 / 1024
            );

            batch_size
        }
        Err(e) => {
            tracing::warn!(
                "Failed to detect available memory: {}. Using default batch size: {}",
                e,
                DEFAULT_BATCH_SIZE
            );
            DEFAULT_BATCH_SIZE
        }
    }
}

/// Pure sizing rule: rows that fit in a quarter of `available_bytes` at 2KB per
/// row, clamped to the 1,000..=50,000 range.
fn batch_size_for_available_memory(available_bytes: u64) -> usize {
    const BYTES_PER_ROW: u64 = 2048; // 2KB per row
    const MEMORY_DIVISOR: u64 = 4; // Use at most 25% of available memory
    const MIN_BATCH_SIZE: u64 = 1_000;
    const MAX_BATCH_SIZE: u64 = 50_000;

    // Integer maths throughout: no float rounding, and nothing can overflow
    // because the value only ever shrinks.
    let rows = available_bytes / MEMORY_DIVISOR / BYTES_PER_ROW;

    // Clamp while still in u64 so the final cast is lossless on every target
    // (the result is at most 50,000).
    rows.clamp(MIN_BATCH_SIZE, MAX_BATCH_SIZE) as usize
}

#[cfg(test)]
mod tests {
use super::*;

/// Sanity-checks the value reported by `get_available_memory`.
///
/// A detection error is tolerated (it may happen in very restricted
/// environments); only a successful result is range-checked.
#[test]
fn test_get_available_memory() {
    // Plausibility window: more than 10MB, less than 1TB.
    const LOWER_BOUND: u64 = 10 * 1024 * 1024;
    const UPPER_BOUND: u64 = 1024 * 1024 * 1024 * 1024;

    let available = match get_available_memory() {
        Ok(bytes) => bytes,
        Err(_) => return,
    };

    assert!(
        available > LOWER_BOUND,
        "Available memory too low: {}",
        available
    );
    assert!(
        available < UPPER_BOUND,
        "Available memory too high: {}",
        available
    );
}

/// Checks that the auto-detected batch size stays inside the 1,000..=50,000
/// window on the host running the tests.
#[test]
fn test_calculate_optimal_batch_size() {
    const SMALLEST_ALLOWED: usize = 1_000;
    const LARGEST_ALLOWED: usize = 50_000;

    let batch_size = calculate_optimal_batch_size();

    assert!(
        batch_size >= SMALLEST_ALLOWED,
        "Batch size too small: {}",
        batch_size
    );
    assert!(
        batch_size <= LARGEST_ALLOWED,
        "Batch size too large: {}",
        batch_size
    );
}

#[test]
fn test_validate_connection_string_valid() {
assert!(validate_connection_string("postgresql://user:pass@localhost:5432/dbname").is_ok());
Expand Down
Loading
Loading