
Reconciler loads all PKs into memory causing OOM on large tables #75

@taariq


Problem

The reconciler loads ALL primary keys from both source and target tables into memory before comparing them. For tables with millions of rows, this causes:

  1. Massive memory usage: 2-3 GB for a 14M-row table, just for the PKs
  2. Potential OOM, especially on memory-constrained instances
  3. Connection timeouts from the long-running queries that fetch every PK

Evidence

From src/xmin/reconciler.rs:44-67:

```rust
// Loads ALL PKs from source into a Vec
let source_pks = self.get_all_primary_keys(...).await?;

// Loads ALL PKs from target into a Vec
let target_pks = self.get_all_primary_keys(...).await?;

// Builds ANOTHER collection: a HashSet of the source PKs
let source_set: HashSet<Vec<String>> = source_pks.into_iter().collect();
```

Memory Analysis (14M rows, UUID primary key)

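| Data structure | Approx. size |
|---|---|
| source_pks (Vec) | ~700 MB |
| target_pks (Vec) | ~700 MB |
| source_set (HashSet) | ~1 GB |
| Total | ~2.4 GB |

These are estimates. Because into_iter().collect() moves the PK strings into the set instead of copying them, the source_pks and source_set figures partly overlap, but peak usage is still O(rows) and measured in gigabytes.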
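To see where the per-row cost comes from, the sketch below computes a rough lower bound for one Vec<Vec<String>> of UUID keys. It assumes a 64-bit target, a single PK column fetched as a 36-character string, inner vectors with capacity 1, and it ignores allocator rounding and hash-table overhead; bytes_per_pk_lower_bound is a hypothetical helper, not code from the repo.

```rust
use std::mem::size_of;

/// Rough lower bound (bytes) for one PK stored as a one-column Vec<String>
/// inside an outer Vec. Assumes inner capacity 1; ignores allocator rounding.
fn bytes_per_pk_lower_bound(text_len: usize) -> usize {
    size_of::<Vec<String>>() // slot in the outer Vec
        + size_of::<String>() // heap buffer of the inner Vec (one String)
        + text_len // heap buffer holding the key's text
}

fn main() {
    let rows: usize = 14_000_000;
    let per_row = bytes_per_pk_lower_bound(36); // UUID rendered as text
    let total_gb = (rows * per_row) as f64 / 1e9;
    println!("{per_row} bytes/row, ~{total_gb:.2} GB for {rows} rows");
}
```

On a 64-bit target this works out to 84 bytes per row, roughly 1.18 GB for 14M rows, so under these assumptions the ~700 MB per-Vec figure above is, if anything, conservative.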

Solution

Replace the load-everything approach with a streaming merge-join comparison:

Algorithm

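  1. Declare a cursor on each database over SELECT pk ... ORDER BY pk (cursors must run inside a transaction); both sides must sort exactly the way the Rust-side comparison does
  2. Fetch PKs from both cursors in batches
  3. Compare the two sorted streams with a merge-join in a single pass
  4. Collect orphans (PKs present only in target) and delete them in batches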
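Step 3 can be checked without a database. Below is a minimal sketch in which in-memory iterators stand in for the cursors. find_orphans is a hypothetical name, and both inputs are assumed sorted ascending and duplicate-free under Rust's Ord, which is why the SQL ordering has to match that comparison.

```rust
/// Returns the keys present in `target` but absent from `source`.
/// Both inputs must be sorted ascending (by `Ord`) with no duplicates.
fn find_orphans<T, S, U>(source: S, target: U) -> Vec<T>
where
    T: Ord,
    S: IntoIterator<Item = T>,
    U: IntoIterator<Item = T>,
{
    let mut source = source.into_iter().peekable();
    let mut orphans = Vec::new();
    for t in target {
        // Source keys below `t` are missing from target, not orphans: skip them.
        while source.peek().map_or(false, |s| *s < t) {
            source.next();
        }
        if source.peek() == Some(&t) {
            source.next(); // key exists on both sides
        } else {
            orphans.push(t); // no matching source key: orphan in target
        }
    }
    orphans
}

fn main() {
    let source = vec![1, 3, 5, 7];
    let target = vec![1, 2, 3, 6, 7, 8, 9];
    println!("{:?}", find_orphans(source, target));
}
```

Each side is read once, front to back, and only the current head of the source stream is held, so apart from the collected orphans the memory use does not grow with table size.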

Implementation

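```rust
pub async fn find_orphaned_rows_batched(
    &self,
    schema: &str,
    table: &str,
    primary_key_columns: &[String],
    batch_size: usize,
) -> Result<u64> {
    use std::cmp::Ordering;

    // The ORDER BY must sort exactly like Rust's Vec<String> comparison
    // (lexicographic, byte-wise per column); otherwise the merge-join
    // misclassifies rows (e.g. integer PKs compared as text, locale collations).
    let pk_cols = build_pk_select(primary_key_columns);
    let order_by = build_order_by(primary_key_columns);
    // Quote identifiers so mixed-case or reserved names work.
    let quote = |s: &str| format!("\"{}\"", s.replace('"', "\"\""));
    let from = format!("{}.{}", quote(schema), quote(table));
    let declare = |cursor: &str| {
        format!("DECLARE {cursor} CURSOR FOR SELECT {pk_cols} FROM {from} ORDER BY {order_by}")
    };

    // Cursors only exist inside a transaction block (assumes tokio-postgres clients).
    self.source_client.batch_execute("BEGIN READ ONLY").await?;
    self.target_client.batch_execute("BEGIN").await?;
    self.source_client.batch_execute(&declare("source_cursor")).await?;
    self.target_client.batch_execute(&declare("target_cursor")).await?;

    // Helpers assumed by this proposal: fetch_cursor_batch runs
    // FETCH FORWARD <batch_size> FROM <cursor>; self.delete_batch deletes the
    // given PKs from the target table and returns the number of rows deleted.
    let mut total_deleted: u64 = 0;
    let mut orphans_to_delete: Vec<Vec<String>> = Vec::new();
    let mut source_batch = fetch_cursor_batch(&self.source_client, "source_cursor", batch_size).await?;
    let mut target_batch = fetch_cursor_batch(&self.target_client, "target_cursor", batch_size).await?;
    let mut source_idx = 0;
    let mut target_idx = 0;

    // Merge-join comparison: both streams are sorted, so one pass suffices
    loop {
        // Refill batches as needed
        if source_idx >= source_batch.len() {
            source_batch = fetch_cursor_batch(&self.source_client, "source_cursor", batch_size).await?;
            source_idx = 0;
            if source_batch.is_empty() {
                // Source exhausted: all remaining target rows are orphans
                while !target_batch.is_empty() {
                    orphans_to_delete.extend(target_batch.drain(target_idx..));
                    if orphans_to_delete.len() >= batch_size {
                        total_deleted += self
                            .delete_batch(schema, table, primary_key_columns, &orphans_to_delete)
                            .await?;
                        orphans_to_delete.clear();
                    }
                    target_batch = fetch_cursor_batch(&self.target_client, "target_cursor", batch_size).await?;
                    target_idx = 0;
                }
                break;
            }
        }

        if target_idx >= target_batch.len() {
            target_batch = fetch_cursor_batch(&self.target_client, "target_cursor", batch_size).await?;
            target_idx = 0;
            if target_batch.is_empty() {
                // Target exhausted: leftover source rows are missing from
                // target, not orphans, so we are done
                break;
            }
        }

        let source_pk = &source_batch[source_idx];
        let target_pk = &target_batch[target_idx];

        match source_pk.cmp(target_pk) {
            Ordering::Equal => {
                // PKs match: both exist, advance both
                source_idx += 1;
                target_idx += 1;
            }
            Ordering::Less => {
                // Source has a PK that target doesn't: advance source
                source_idx += 1;
            }
            Ordering::Greater => {
                // Target has a PK that source doesn't: ORPHAN
                orphans_to_delete.push(target_pk.clone());
                target_idx += 1;

                if orphans_to_delete.len() >= batch_size {
                    total_deleted += self
                        .delete_batch(schema, table, primary_key_columns, &orphans_to_delete)
                        .await?;
                    orphans_to_delete.clear();
                }
            }
        }
    }

    // Delete remaining orphans
    if !orphans_to_delete.is_empty() {
        total_deleted += self
            .delete_batch(schema, table, primary_key_columns, &orphans_to_delete)
            .await?;
    }

    // Ending the transactions closes both cursors; the deletes run inside the
    // target transaction and become visible at COMMIT.
    self.source_client.batch_execute("COMMIT").await?;
    self.target_client.batch_execute("COMMIT").await?;

    Ok(total_deleted)
}
```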
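The implementation leaves build_pk_select and build_order_by undefined. One possible sketch, assuming PKs are read as text (Vec<String>) and compared with Rust's Ord: casting each column to text and sorting with COLLATE "C" makes PostgreSQL order rows byte-wise, which is how Rust compares Strings. Both helpers and quote_ident are hypothetical.

```rust
/// Quotes a PostgreSQL identifier, doubling any embedded double quotes.
fn quote_ident(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}

/// SELECT list: every PK column cast to text so rows decode as Vec<String>.
fn build_pk_select(cols: &[String]) -> String {
    cols.iter()
        .map(|c| format!("{}::text", quote_ident(c)))
        .collect::<Vec<_>>()
        .join(", ")
}

/// ORDER BY list: byte-wise (COLLATE "C") text order per column, matching
/// Rust's lexicographic Ord for Vec<String>.
fn build_order_by(cols: &[String]) -> String {
    cols.iter()
        .map(|c| format!("{}::text COLLATE \"C\"", quote_ident(c)))
        .collect::<Vec<_>>()
        .join(", ")
}

fn main() {
    let cols = vec!["id".to_string()];
    println!(
        "SELECT {} FROM t ORDER BY {}",
        build_pk_select(&cols),
        build_order_by(&cols)
    );
}
```

The trade-off: sorting on a cast expression generally cannot use the primary-key index, so each database performs its own server-side sort. Ordering by the native column types and comparing typed values in Rust would avoid that, at the cost of a more complex row type.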

Benefits

| Metric | Before | After |
|---|---|---|
| Memory | O(total_rows), ~2.4 GB | O(batch_size), ~10 MB |
| Connection idle | Minutes | Seconds per batch |
| Progress visibility | None | Logged per batch |

Testing

  1. Unit test: mock cursor fetching and verify the merge-join logic, including batch boundaries and empty inputs
  2. Integration test: reconcile a table with known orphans
  3. Manual test: reconcile the 14M-row serendb_coingeckoprice table
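For the unit test in item 1, one option is a mock cursor that hands out fixed-size batches. It drives a synchronous copy of the proposed loop, and the result is compared with a plain HashSet difference across several batch sizes, so batch boundaries land in different places. MockCursor and orphans_batched are hypothetical names, and the deletes are replaced by collecting the orphans.

```rust
use std::cmp::Ordering;

type Pk = Vec<String>;

/// Hypothetical stand-in for a server-side cursor.
struct MockCursor {
    rows: Vec<Pk>,
    pos: usize,
    batch_size: usize,
}

impl MockCursor {
    fn new(mut rows: Vec<Pk>, batch_size: usize) -> Self {
        rows.sort(); // emulate ORDER BY using Rust's own ordering
        MockCursor { rows, pos: 0, batch_size }
    }

    /// Like FETCH FORWARD n: up to batch_size rows, empty once exhausted.
    fn fetch(&mut self) -> Vec<Pk> {
        let end = (self.pos + self.batch_size).min(self.rows.len());
        let batch = self.rows[self.pos..end].to_vec();
        self.pos = end;
        batch
    }
}

/// Synchronous copy of the batched merge-join; returns target-only PKs.
fn orphans_batched(source: &mut MockCursor, target: &mut MockCursor) -> Vec<Pk> {
    let mut orphans = Vec::new();
    let (mut s_batch, mut t_batch) = (source.fetch(), target.fetch());
    let (mut si, mut ti) = (0, 0);
    loop {
        if si >= s_batch.len() {
            s_batch = source.fetch();
            si = 0;
            if s_batch.is_empty() {
                // Source exhausted: everything left in target is orphaned.
                while !t_batch.is_empty() {
                    orphans.extend(t_batch.drain(ti..));
                    t_batch = target.fetch();
                    ti = 0;
                }
                break;
            }
        }
        if ti >= t_batch.len() {
            t_batch = target.fetch();
            ti = 0;
            if t_batch.is_empty() {
                break; // target exhausted
            }
        }
        match s_batch[si].cmp(&t_batch[ti]) {
            Ordering::Equal => {
                si += 1;
                ti += 1;
            }
            Ordering::Less => si += 1,
            Ordering::Greater => {
                orphans.push(t_batch[ti].clone());
                ti += 1;
            }
        }
    }
    orphans
}

fn pk(s: &str) -> Pk {
    vec![s.to_string()]
}

fn main() {
    let mut source = MockCursor::new(vec![pk("a"), pk("c")], 1);
    let mut target = MockCursor::new(vec![pk("a"), pk("b"), pk("d")], 1);
    println!("{:?}", orphans_batched(&mut source, &mut target));
}
```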

