
sync command loads entire table into memory causing OOM and connection timeouts #74

@taariq

Problem

The sync command's xmin-based daemon loads every changed row of a table (the entire table on a first or full sync) into memory before writing any of it, causing:

  1. Massive memory usage - 10GB+ VSZ for tables with millions of rows
  2. Connection timeouts - Long-running queries exceed ELB/proxy idle timeouts (typically 60s-10min)
  3. Failed syncs - All tables fail with "connection closed" errors

Evidence

Production logs showing the issue:

2025-12-10T17:43:46Z  INFO Found 14446803 changed rows in public.serendb_coingeckoprice (xmin 0 -> 2471865127)
2025-12-10T18:01:37Z ERROR Failed to sync public.serendb_coingeckoprice: Failed to upsert batch
    Caused by: connection closed

  • All 14.4 million changed rows were read into memory by a single query
  • ~18 minutes elapsed (17:43:46 to 18:01:37) before the upsert failed with "connection closed"
  • The process reached 10GB VSZ (virtual memory size)

Root Cause

In src/xmin/daemon.rs:350-386, sync_table loads all rows at once:

// PROBLEM: Loads ALL rows into memory
let (rows, max_xmin, was_full_sync) = reader
    .read_changes_with_wraparound_check(schema, table, &column_names, since_xmin)
    .await?;

// PROBLEM: Creates another full copy in memory
let values: Vec<Vec<Box<dyn ToSql>>> = rows
    .iter()
    .map(|row| row_to_values(row, &columns))
    .collect();

Both read_changes() and read_all_rows() in src/xmin/reader.rs use .query(), which collects the entire result set into a Vec before returning.

Ironically, a batched reader already exists but isn't used: read_changes_batched() and fetch_batch() in reader.rs (lines 157-243).
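This issue doesn't show how read_changes_batched() paginates. If it resumes each batch after the last xmin alone, a single bulk transaction (which stamps every row it writes with the same xmin) can straddle a batch boundary and have rows skipped. A common alternative is keyset pagination on the (xmin, primary key) pair. The sketch below models that idea with an in-memory, pre-sorted slice; `Row`, `fetch_batch_after`, and `drain_in_batches` are hypothetical names, not the project's API, and the SQL in the comment is only an illustration.

```rust
/// A changed row as a hypothetical batched reader sees it: its xmin and primary key.
/// Field order matters: the derived `Ord` compares `xmin` first, then `pk`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Row {
    pub xmin: u32,
    pub pk: i64,
}

/// Returns up to `limit` rows whose (xmin, pk) key is strictly greater than
/// `after`, in key order. `table` must already be sorted by (xmin, pk).
/// In SQL this is roughly a row-value comparison such as
/// `WHERE (xmin::text::bigint, id) > ($1, $2) ORDER BY xmin::text::bigint, id LIMIT $3`,
/// where `id` stands in for the table's primary key (an assumption).
pub fn fetch_batch_after(table: &[Row], after: Option<Row>, limit: usize) -> Vec<Row> {
    let start = match after {
        None => 0,
        // First index whose key is greater than the cursor.
        Some(cursor) => table.partition_point(|r| *r <= cursor),
    };
    table[start..].iter().take(limit).copied().collect()
}

/// Reads the whole table one batch at a time, keeping only the current batch
/// and the cursor alive. Returns the size of each batch fetched.
pub fn drain_in_batches(table: &[Row], limit: usize) -> Vec<usize> {
    let mut cursor: Option<Row> = None;
    let mut sizes = Vec::new();
    loop {
        let batch = fetch_batch_after(table, cursor, limit);
        if batch.is_empty() {
            break;
        }
        cursor = batch.last().copied();
        sizes.push(batch.len());
    }
    sizes
}
```

For example, if five rows share xmin 100 and a batch of two ends at (100, 2), the next batch starts at (100, 3), whereas a plain `xmin > 100` filter would skip rows 3 to 5.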

Solution

Modify sync_table to use streaming/batched processing:

1. Use the existing batched reader

async fn sync_table(&self, ...) -> Result<u64> {
    // ... setup code ...
    
    let batch_size = self.config.batch_size; // Currently defaults to 1,000; this issue proposes 10,000
    let mut batch_reader = reader
        .read_changes_batched(schema, table, &column_names, since_xmin, batch_size)
        .await?;
    
    let mut total_rows = 0u64;
    let mut max_xmin = since_xmin;
    
    while let Some((rows, batch_max_xmin)) = reader.fetch_batch(&mut batch_reader).await? {
        if rows.is_empty() {
            break;
        }
        
        tracing::debug!(
            "Processing batch of {} rows for {}.{} (xmin {} -> {})",
            rows.len(), schema, table, max_xmin, batch_max_xmin
        );
        
        // Convert and apply batch immediately (memory = O(batch_size))
        let values: Vec<Vec<Box<dyn ToSql + Sync + Send>>> = rows
            .iter()
            .map(|row| row_to_values(row, &columns))
            .collect();
        
        let affected = writer
            .apply_batch(schema, table, &pk_columns, &column_names, values)
            .await?;
        
        total_rows += affected;
        max_xmin = batch_max_xmin;
        
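        // Record progress after each batch so a failed sync can resume here.
        // Only safe if batches are ordered by xmin and never end partway through
        // rows sharing one xmin (e.g. a bulk insert): resuming with a strict
        // `xmin > max_xmin` filter would skip the rest of that group.
        state.update_table(schema, table, max_xmin, affected);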
    }
    
    Ok(total_rows)
}
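The loop's control flow and its memory bound can be checked without a database. Below is a minimal synchronous sketch, assuming an in-memory reader that returns rows sorted by xmin. `MockReader`, `SyncState`, and this three-value `sync_table` are hypothetical stand-ins (the real code is async and writes through `writer.apply_batch`); the peak counter records the largest batch held at once.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the batched reader. Rows are (xmin, payload);
/// only rows with xmin > since_xmin are returned, sorted by xmin.
pub struct MockReader {
    rows: Vec<(u32, String)>,
    pos: usize,
    batch_size: usize,
}

impl MockReader {
    pub fn new(rows: Vec<(u32, String)>, since_xmin: u32, batch_size: usize) -> Self {
        let mut rows: Vec<(u32, String)> =
            rows.into_iter().filter(|(xmin, _)| *xmin > since_xmin).collect();
        rows.sort_by_key(|(xmin, _)| *xmin);
        Self { rows, pos: 0, batch_size }
    }

    /// Returns the next (rows, batch_max_xmin), or None once exhausted.
    pub fn fetch_batch(&mut self) -> Option<(Vec<(u32, String)>, u32)> {
        if self.pos >= self.rows.len() {
            return None;
        }
        let end = (self.pos + self.batch_size).min(self.rows.len());
        let batch = self.rows[self.pos..end].to_vec();
        self.pos = end;
        // Rows are sorted by xmin, so the last one carries the batch maximum.
        let batch_max_xmin = batch.last().map(|(xmin, _)| *xmin).unwrap_or(0);
        Some((batch, batch_max_xmin))
    }
}

/// Hypothetical per-table state: (last synced xmin, rows applied so far).
#[derive(Debug, Default)]
pub struct SyncState {
    pub tables: HashMap<String, (u32, u64)>,
}

impl SyncState {
    pub fn update_table(&mut self, table: &str, max_xmin: u32, affected: u64) {
        let entry = self.tables.entry(table.to_string()).or_insert((0, 0));
        entry.0 = max_xmin;
        entry.1 += affected;
    }
}

/// Synchronous sketch of the proposed loop; the "write" just counts rows.
/// Returns (total_rows, peak rows held at once, final max_xmin).
pub fn sync_table(
    reader: &mut MockReader,
    state: &mut SyncState,
    table: &str,
    since_xmin: u32,
) -> (u64, usize, u32) {
    let mut total_rows = 0u64;
    let mut max_xmin = since_xmin;
    let mut peak_rows = 0usize;
    while let Some((rows, batch_max_xmin)) = reader.fetch_batch() {
        if rows.is_empty() {
            break;
        }
        // Only this batch is alive here; the previous one was dropped.
        peak_rows = peak_rows.max(rows.len());
        let affected = rows.len() as u64; // stand-in for writer.apply_batch(...)
        total_rows += affected;
        max_xmin = batch_max_xmin;
        state.update_table(table, max_xmin, affected);
    }
    (total_rows, peak_rows, max_xmin)
}
```

Because `rows` is scoped to the loop body, each batch is dropped before the next is fetched, which is what keeps memory at O(batch_size).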

2. Handle wraparound detection

Because read_changes_with_wraparound_check() is no longer called, add an explicit wraparound check before starting the batched read:

// Check for wraparound at start
let current_xmin = reader.get_current_xmin().await?;
let since_xmin = if detect_wraparound(table_state.last_xmin, current_xmin) == WraparoundCheck::WraparoundDetected {
    tracing::warn!("xmin wraparound detected for {}.{} - performing full table sync", schema, table);
    0 // Start from beginning
} else {
    table_state.last_xmin
};
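The `detect_wraparound` helper isn't shown in this issue. One plausible heuristic, assuming both arguments are raw 32-bit xids: the counter only increases until it wraps past `u32::MAX`, so a current xid below the stored `last_xmin` means it has wrapped. This sketch is hypothetical (`detect_wraparound_sketch`, `starting_xmin`, and the `NoWraparound` variant are assumed names) and cannot catch a counter that has wrapped and then climbed past `last_xmin` again.

```rust
/// Mirrors the variant name used above; `NoWraparound` is an assumed name.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WraparoundCheck {
    NoWraparound,
    WraparoundDetected,
}

/// Hypothetical heuristic: raw 32-bit xids only grow until they wrap,
/// so a current xid below the last synced one implies a wrap.
pub fn detect_wraparound_sketch(last_xmin: u32, current_xmin: u32) -> WraparoundCheck {
    if current_xmin < last_xmin {
        WraparoundCheck::WraparoundDetected
    } else {
        WraparoundCheck::NoWraparound
    }
}

/// Chooses where the batched read starts: 0 (full sync) after a wrap,
/// otherwise the last synced xmin.
pub fn starting_xmin(last_xmin: u32, current_xmin: u32) -> u32 {
    match detect_wraparound_sketch(last_xmin, current_xmin) {
        WraparoundCheck::WraparoundDetected => 0,
        WraparoundCheck::NoWraparound => last_xmin,
    }
}
```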

3. Add progress logging

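For large tables, log progress from inside the batch loop, after total_rows is incremented. Comparing the totals before and after each batch catches every 100,000-row boundary, even when batch results don't sum to exact multiples:

// Log once each time the running total passes another 100,000 rows.
if total_rows / 100_000 > (total_rows - affected) / 100_000 {
    tracing::info!(
        "Synced {} rows so far for {}.{}",
        total_rows, schema, table
    );
}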
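A check of the form `total_rows % 100_000 == 0` only fires when the running total lands exactly on a multiple, which batches of varying `affected` counts rarely do. The boundary test can be factored into a small helper and unit-tested on its own; `crossed_interval` is a hypothetical name.

```rust
/// Returns true if going from `before` to `after` rows crossed at least one
/// multiple of `interval`, so progress is logged even when batch sizes
/// never land exactly on a multiple. An interval of 0 disables logging.
pub fn crossed_interval(before: u64, after: u64, interval: u64) -> bool {
    interval > 0 && after / interval > before / interval
}
```

For instance, with batches of 9,999 rows the running total is never an exact multiple of 100,000, while this helper fires at 109,989 and 209,979.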

Benefits

| Metric | Before | After |
|---|---|---|
| Memory | O(total_rows), ~10GB VSZ observed | O(batch_size), ~10MB estimated |
| Connection idle time | Minutes (duration of the full query) | Seconds (one batch query) |
| Resume capability | None (all or nothing) | Per batch (state saved after each batch) |
| Progress visibility | None until complete | Logged per batch |
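The ~10MB figure can be sanity-checked against the evidence. Treating the observed 10GB VSZ as an upper bound on the full load (VSZ also counts reserved but untouched memory) and assuming row width is roughly uniform, the implied per-row cost and the resulting footprint of one batch at the proposed size of 10,000 are:

```math
\frac{10 \times 10^{9}\ \text{B}}{14{,}446{,}803\ \text{rows}} \approx 692\ \text{B/row},
\qquad
10{,}000\ \text{rows} \times 692\ \text{B/row} \approx 6.9\ \text{MB}
```

That is the same order of magnitude as the table's estimate; wider rows would scale it proportionally.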

Testing

  1. Unit test: Mock batched reader, verify batches processed correctly
  2. Integration test: Sync table with >100K rows, verify memory stays bounded
  3. Manual test: Sync the production serendb_coingeckoprice table (14.4M rows)
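Per-batch state only delivers the resume capability listed under Benefits if restarting from the saved xmin neither skips nor re-applies rows, so a resume test is worth adding alongside these. A std-only sketch, assuming a strict `xmin > since_xmin` filter and that every row has a distinct xmin (real tables break this with bulk transactions, so the real test should also cover shared xmins). `sync_from` is a hypothetical helper that can be told to fail before a given batch.

```rust
/// Hypothetical resumable sync over (xmin, id) rows with distinct xmins.
/// Applies batches until `fail_after` (None = run to completion) and returns
/// the xmin a per-batch state update would have saved. `batch_size` must be > 0.
pub fn sync_from(
    source: &[(u32, u64)],
    since_xmin: u32,
    batch_size: usize,
    fail_after: Option<usize>,
    target: &mut Vec<u64>,
) -> u32 {
    let mut pending: Vec<&(u32, u64)> =
        source.iter().filter(|(xmin, _)| *xmin > since_xmin).collect();
    pending.sort_by_key(|(xmin, _)| *xmin);
    let mut saved = since_xmin;
    for (n, batch) in pending.chunks(batch_size).enumerate() {
        if fail_after == Some(n) {
            break; // simulate the connection dropping before this batch
        }
        target.extend(batch.iter().map(|(_, id)| *id));
        // Sorted by xmin, so the last row holds the batch maximum.
        saved = batch.last().map(|(xmin, _)| *xmin).unwrap_or(saved);
    }
    saved
}
```

A test can then run `sync_from` with a failure injected, resume from the returned xmin, and assert that every id appears in the target exactly once.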

Related

  • Connection timeout troubleshooting in CLAUDE.md mentions ELB idle timeouts
  • Batch size is already configurable via DaemonConfig::batch_size (currently defaults to 1,000; should be raised to 10,000)
