Problem
The reconciler loads ALL primary keys from both source and target tables into memory before comparing them. For tables with millions of rows, this causes:
- Massive memory usage: 2-3 GB for a 14M-row table, just for the PKs
- Potential out-of-memory (OOM) failures, especially on memory-constrained instances
- Connection timeouts: the queries that fetch every PK run for a long time
Evidence
From `src/xmin/reconciler.rs:44-67`:
```rust
// Loads ALL PKs from source into Vec
let source_pks = self.get_all_primary_keys(...).await?;
// Loads ALL PKs from target into Vec
let target_pks = self.get_all_primary_keys(...).await?;
// Creates ANOTHER copy as HashSet
let source_set: HashSet<Vec<String>> = source_pks.into_iter().collect();
```
Memory Analysis (14M rows, UUID primary key)
| Data Structure | Size |
|---|---|
| `source_pks` (Vec) | ~700 MB |
| `target_pks` (Vec) | ~700 MB |
| `source_set` (HashSet) | ~1 GB |
| Total | ~2.4 GB |
Solution
Replace the load-everything approach with a streaming merge-join comparison:
Algorithm
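- Query both databases with `ORDER BY pk` and use cursors
- Fetch PKs in batches from both sides
- Compare using a merge-join (both sides sorted, single pass)
- Accumulate orphans and delete them in batches
- Make the Rust-side comparison match the `ORDER BY` ordering on both databases; otherwise a key present on both sides can be reported as an orphan and deleted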
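The merge-join step can be sketched in memory, with two sorted slices standing in for the two cursors. This is a minimal illustration, not the reconciler's code. `find_orphans` is a hypothetical name, and the real implementation streams batches from cursors instead of holding whole slices. The comparison rules are the same: equal keys advance both sides, a smaller source key advances the source, and a smaller target key is an orphan.

```rust
use std::cmp::Ordering;

/// Returns every key in `target` that is missing from `source`.
/// Both slices must already be sorted ascending by the same ordering that
/// `Ord` uses here, and contain no duplicates (they are primary keys).
pub fn find_orphans<T: Ord + Clone>(source: &[T], target: &[T]) -> Vec<T> {
    let mut orphans = Vec::new();
    let (mut i, mut j) = (0, 0);
    while j < target.len() {
        if i >= source.len() {
            // Source exhausted: every remaining target key is an orphan.
            orphans.extend_from_slice(&target[j..]);
            break;
        }
        match source[i].cmp(&target[j]) {
            // Key exists on both sides: advance both.
            Ordering::Equal => {
                i += 1;
                j += 1;
            }
            // Source-only key (not an orphan): advance the source side.
            Ordering::Less => i += 1,
            // Target key is smaller than every remaining source key,
            // so the source cannot contain it: orphan.
            Ordering::Greater => {
                orphans.push(target[j].clone());
                j += 1;
            }
        }
    }
    orphans
}
```

The ordering precondition matters in practice. Suppose numeric keys are fetched as strings while PostgreSQL orders them numerically. With a source stream of `9, 10` and a target stream of `10`, this function reports `"10"` as an orphan, because `"9" > "10"` byte-wise, even though row 10 exists on both sides. Comparing the same keys as `i64` yields no orphans.

Text keys have the same risk unless both databases sort them with `COLLATE "C"`, which matches Rust's byte-wise `String` ordering. UUIDs in PostgreSQL's lowercase canonical text form sort the same way as the `uuid` type, so the UUID-keyed table should not be affected.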
Implementation
```rust
pub async fn find_orphaned_rows_batched(
    &self,
    schema: &str,
    table: &str,
    primary_key_columns: &[String],
    batch_size: usize,
) -> Result<u64> {
    use std::cmp::Ordering;

    // Use server-side cursors for a streaming comparison
    let pk_cols = build_pk_select(primary_key_columns);
    let order_by = build_order_by(primary_key_columns);

    // DECLARE CURSOR is only valid inside a transaction block, so open one on each side
    self.source_client.batch_execute("BEGIN").await?;
    self.target_client.batch_execute("BEGIN").await?;

    // Declare cursors on both databases
    self.source_client.execute(
        &format!("DECLARE source_cursor CURSOR FOR SELECT {} FROM {}.{} ORDER BY {}",
            pk_cols, schema, table, order_by),
        &[]
    ).await?;
    self.target_client.execute(
        &format!("DECLARE target_cursor CURSOR FOR SELECT {} FROM {}.{} ORDER BY {}",
            pk_cols, schema, table, order_by),
        &[]
    ).await?;

    // fetch_cursor_batch and delete_batch are not shown here. The cursor-name argument
    // and delete_batch's arguments and return value (rows deleted) are assumptions.
    let mut total_deleted: u64 = 0;
    let mut orphans_to_delete = Vec::new();
    let mut source_batch = fetch_cursor_batch(&self.source_client, "source_cursor", batch_size).await?;
    let mut target_batch = fetch_cursor_batch(&self.target_client, "target_cursor", batch_size).await?;
    let mut source_idx = 0;
    let mut target_idx = 0;

    // Merge-join comparison
    loop {
        // Refill batches as needed
        if source_idx >= source_batch.len() {
            source_batch = fetch_cursor_batch(&self.source_client, "source_cursor", batch_size).await?;
            source_idx = 0;
            if source_batch.is_empty() {
                // Source exhausted: all remaining target rows are orphans
                while !target_batch.is_empty() {
                    for pk in target_batch.drain(target_idx..) {
                        orphans_to_delete.push(pk);
                        if orphans_to_delete.len() >= batch_size {
                            total_deleted += self.delete_batch(schema, table, primary_key_columns, &orphans_to_delete).await?;
                            orphans_to_delete.clear();
                        }
                    }
                    target_batch = fetch_cursor_batch(&self.target_client, "target_cursor", batch_size).await?;
                    target_idx = 0;
                }
                break;
            }
        }
        if target_idx >= target_batch.len() {
            target_batch = fetch_cursor_batch(&self.target_client, "target_cursor", batch_size).await?;
            target_idx = 0;
            if target_batch.is_empty() {
                break; // Target exhausted: done
            }
        }

        let source_pk = &source_batch[source_idx];
        let target_pk = &target_batch[target_idx];

        // Only correct if this comparison matches the ORDER BY used on both databases
        match source_pk.cmp(target_pk) {
            Ordering::Equal => {
                // PKs match: the row exists on both sides, advance both
                source_idx += 1;
                target_idx += 1;
            }
            Ordering::Less => {
                // Source has a PK that target doesn't: advance source
                source_idx += 1;
            }
            Ordering::Greater => {
                // Target has a PK that source doesn't: ORPHAN
                orphans_to_delete.push(target_pk.clone());
                target_idx += 1;
                if orphans_to_delete.len() >= batch_size {
                    total_deleted += self.delete_batch(schema, table, primary_key_columns, &orphans_to_delete).await?;
                    orphans_to_delete.clear();
                }
            }
        }
    }

    // Delete remaining orphans
    if !orphans_to_delete.is_empty() {
        total_deleted += self.delete_batch(schema, table, primary_key_columns, &orphans_to_delete).await?;
    }

    // Close the cursors and end both transactions; if delete_batch runs on
    // target_client, its DELETEs are committed here
    self.source_client.batch_execute("CLOSE source_cursor; COMMIT").await?;
    self.target_client.batch_execute("CLOSE target_cursor; COMMIT").await?;

    Ok(total_deleted)
}
```
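The implementation calls a `delete_batch` helper that this issue does not show. One plausible way to build its statement is a single parameterized `DELETE ... WHERE (pk columns) IN (...)` per batch, which also covers composite keys. The sketch below only builds the SQL text. `build_delete_sql` and `quote_ident` are hypothetical names, and identifiers are assumed to be passed exactly as stored in the catalog, since quoted identifiers are case-sensitive. Binding the key values is left to the caller. The Rust parameter types must be ones the driver accepts for each column type; for example, text values bound to a `uuid` column generally need a cast or a typed value.

```rust
/// Hypothetical helper: wraps an identifier in double quotes, doubling any
/// embedded quote characters as PostgreSQL requires for quoted identifiers.
pub fn quote_ident(name: &str) -> String {
    format!("\"{}\"", name.replace('"', "\"\""))
}

/// Hypothetical helper: builds a DELETE for `n_rows` keys, each made of
/// `pk_columns.len()` values, numbering placeholders $1, $2, ... row by row.
pub fn build_delete_sql(schema: &str, table: &str, pk_columns: &[String], n_rows: usize) -> String {
    let width = pk_columns.len();
    let cols: Vec<String> = pk_columns.iter().map(|c| quote_ident(c)).collect();
    // One parenthesized tuple of placeholders per orphaned row.
    let tuples: Vec<String> = (0..n_rows)
        .map(|row| {
            let params: Vec<String> = (1..=width)
                .map(|col| format!("${}", row * width + col))
                .collect();
            format!("({})", params.join(", "))
        })
        .collect();
    format!(
        "DELETE FROM {}.{} WHERE ({}) IN ({})",
        quote_ident(schema),
        quote_ident(table),
        cols.join(", "),
        tuples.join(", ")
    )
}
```

For a two-column key `(coin_id, ts)` on a table `public.prices` (illustrative names) and two rows, this produces `DELETE FROM "public"."prices" WHERE ("coin_id", "ts") IN (($1, $2), ($3, $4))`. The PostgreSQL wire protocol carries at most 65,535 bind parameters per statement, so `batch_size` multiplied by the number of PK columns must stay within that limit.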
Benefits
| Metric | Before | After |
|---|---|---|
| Memory | O(total_rows), ~2.4 GB | O(batch_size), ~10 MB |
| Connection idle | Minutes | Seconds per batch |
| Progress visibility | None | Logged per batch |
Testing
- Unit test: Mock cursor fetching, verify merge-join logic
- Integration test: Reconcile table with known orphans
- Manual test: Reconcile the 14M-row `serendb_coingeckoprice` table
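The unit test in the first bullet could look roughly like the synchronous sketch below. A hypothetical `MockCursor` stands in for `FETCH`, and a closure stands in for `delete_batch`. This lets the refill and flush logic be exercised across batch boundaries without a database. `find_orphans_batched` is also a hypothetical name. It mirrors the shape of the async implementation but is not that code.

```rust
use std::cmp::Ordering;

/// Stand-in for a server-side cursor: each fetch returns up to `n` rows,
/// and an empty Vec once the rows are used up (like FETCH on a drained cursor).
pub struct MockCursor<T> {
    rows: Vec<T>,
    pos: usize,
}

impl<T: Clone> MockCursor<T> {
    pub fn new(rows: Vec<T>) -> Self {
        MockCursor { rows, pos: 0 }
    }

    pub fn fetch(&mut self, n: usize) -> Vec<T> {
        let end = (self.pos + n).min(self.rows.len());
        let batch = self.rows[self.pos..end].to_vec();
        self.pos = end;
        batch
    }
}

/// Batched merge-join; `delete` receives each full (or final partial) batch of
/// orphans and returns how many rows it removed. Both cursors must be sorted.
pub fn find_orphans_batched<T: Ord + Clone>(
    source: &mut MockCursor<T>,
    target: &mut MockCursor<T>,
    batch_size: usize,
    mut delete: impl FnMut(&[T]) -> u64,
) -> u64 {
    let mut total_deleted = 0;
    let mut pending: Vec<T> = Vec::new();
    let mut src = source.fetch(batch_size);
    let mut tgt = target.fetch(batch_size);
    let (mut i, mut j) = (0, 0);
    let mut source_done = false;

    loop {
        // Refill the target batch; an exhausted target means we are finished.
        if j >= tgt.len() {
            tgt = target.fetch(batch_size);
            j = 0;
            if tgt.is_empty() {
                break;
            }
        }
        // Refill the source batch; once it comes back empty, stop fetching.
        if !source_done && i >= src.len() {
            src = source.fetch(batch_size);
            i = 0;
            source_done = src.is_empty();
        }
        // With no source keys left, every remaining target key is an orphan.
        let ord = if source_done { Ordering::Greater } else { src[i].cmp(&tgt[j]) };
        match ord {
            Ordering::Equal => {
                i += 1;
                j += 1;
            }
            Ordering::Less => i += 1,
            Ordering::Greater => {
                pending.push(tgt[j].clone());
                j += 1;
                if pending.len() >= batch_size {
                    total_deleted += delete(pending.as_slice());
                    pending.clear();
                }
            }
        }
    }
    // Flush the final partial batch of orphans.
    if !pending.is_empty() {
        total_deleted += delete(pending.as_slice());
    }
    total_deleted
}
```

With `batch_size = 2`, the source stream `1, 2, 4, 7, 10` and the target stream `1, 3, 4, 5, 8, 9, 11` should produce the delete batches `[3, 5]`, `[8, 9]` and `[11]`. This covers orphans that straddle fetch boundaries and the "source exhausted" tail.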
Related