Skip to content

Commit 6ab5d12

Browse files
authored
feat: Add batched processing for SQLite converter to prevent OOM (#77) (#78)
The SQLite to PostgreSQL converter now processes rows in batches instead of loading the entire table into memory. This enables migration of large SQLite databases (7M+ rows) without running out of memory. Changes: - Add BatchedTableReader struct for rowid-based pagination - Add read_table_batch() function for memory-efficient reading - Add convert_table_batched() async function that reads, converts, and inserts in batches - Update init_sqlite_to_postgres() to use batched processing - Use calculate_optimal_batch_size() for memory-based batch sizing - Add comprehensive tests for batched reader functionality Memory usage now stays constant regardless of table size. Batch size automatically adjusts based on available system memory (25% of free RAM, clamped between 1,000 and 50,000 rows). Fixes #77
1 parent cc5caba commit 6ab5d12

4 files changed

Lines changed: 493 additions & 22 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/commands/init.rs

Lines changed: 50 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -911,22 +911,35 @@ pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Res
911911
let target_client = postgres::connect_with_retry(target_url).await?;
912912
tracing::info!(" ✓ Connected to PostgreSQL target");
913913

914-
// Step 4: Migrate each table
915-
tracing::info!("Step 4/4: Migrating tables...");
916-
for (idx, table_name) in tables.iter().enumerate() {
914+
// Get row counts for progress display
915+
let mut table_row_counts: Vec<(&str, usize)> = Vec::new();
916+
let mut total_rows = 0usize;
917+
for table_name in &tables {
918+
let count =
919+
crate::sqlite::reader::get_table_row_count(&sqlite_conn, table_name).unwrap_or(0);
920+
table_row_counts.push((table_name, count));
921+
total_rows += count;
922+
}
923+
924+
tracing::info!(
925+
"Total rows to migrate: {} across {} table(s)",
926+
total_rows,
927+
tables.len()
928+
);
929+
930+
// Step 4: Migrate each table using batched processing
931+
tracing::info!("Step 4/4: Migrating tables (batched processing)...");
932+
let mut migrated_rows = 0usize;
933+
934+
for (idx, (table_name, row_count)) in table_row_counts.iter().enumerate() {
917935
tracing::info!(
918-
"Migrating table {}/{}: '{}'",
936+
"Migrating table {}/{}: '{}' ({} rows)",
919937
idx + 1,
920938
tables.len(),
921-
table_name
939+
table_name,
940+
row_count
922941
);
923942

924-
// Convert SQLite table to JSONB
925-
let rows = crate::sqlite::converter::convert_table_to_jsonb(&sqlite_conn, table_name)
926-
.with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;
927-
928-
tracing::info!(" ✓ Converted {} rows from '{}'", rows.len(), table_name);
929-
930943
// Create JSONB table in PostgreSQL
931944
crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite")
932945
.await
@@ -939,21 +952,39 @@ pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Res
939952

940953
tracing::info!(" ✓ Created JSONB table '{}' in PostgreSQL", table_name);
941954

942-
if !rows.is_empty() {
943-
// Batch insert all rows
944-
crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "sqlite")
945-
.await
946-
.with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
955+
// Use batched conversion for memory efficiency
956+
let rows_processed = crate::sqlite::converter::convert_table_batched(
957+
&sqlite_conn,
958+
&target_client,
959+
table_name,
960+
"sqlite",
961+
None, // Use default batch size
962+
)
963+
.await
964+
.with_context(|| format!("Failed to migrate table '{}'", table_name))?;
947965

948-
tracing::info!(" ✓ Inserted all rows into '{}'", table_name);
966+
migrated_rows += rows_processed;
967+
968+
if rows_processed > 0 {
969+
tracing::info!(
970+
" ✓ Migrated {} rows from '{}' ({:.1}% of total)",
971+
rows_processed,
972+
table_name,
973+
if total_rows > 0 {
974+
migrated_rows as f64 / total_rows as f64 * 100.0
975+
} else {
976+
100.0
977+
}
978+
);
949979
} else {
950-
tracing::info!(" ✓ Table '{}' is empty (no rows to insert)", table_name);
980+
tracing::info!(" ✓ Table '{}' is empty (no rows to migrate)", table_name);
951981
}
952982
}
953983

954984
tracing::info!("✅ SQLite to PostgreSQL migration complete!");
955985
tracing::info!(
956-
" Migrated {} table(s) from '{}' to PostgreSQL",
986+
" Migrated {} row(s) from {} table(s) in '{}'",
987+
migrated_rows,
957988
tables.len(),
958989
sqlite_path
959990
);

src/sqlite/converter.rs

Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,166 @@ fn detect_id_column(conn: &Connection, table: &str) -> Result<Option<String>> {
235235
Ok(None)
236236
}
237237

238+
/// Convert a batch of SQLite rows to JSONB format.
239+
///
240+
/// Converts a pre-read batch of rows, extracting IDs and converting to JSON.
241+
fn convert_batch_to_jsonb(
242+
rows: Vec<HashMap<String, rusqlite::types::Value>>,
243+
id_column: &Option<String>,
244+
start_row_num: usize,
245+
table: &str,
246+
) -> Result<Vec<(String, JsonValue)>> {
247+
let mut result = Vec::with_capacity(rows.len());
248+
249+
for (batch_idx, mut row) in rows.into_iter().enumerate() {
250+
let row_num = start_row_num + batch_idx;
251+
252+
// Remove internal _rowid tracking column before conversion
253+
row.remove("_rowid");
254+
255+
// Extract or generate ID
256+
let id = if let Some(ref id_col) = id_column {
257+
match row.get(id_col) {
258+
Some(rusqlite::types::Value::Integer(i)) => i.to_string(),
259+
Some(rusqlite::types::Value::Text(s)) => s.clone(),
260+
Some(rusqlite::types::Value::Real(f)) => f.to_string(),
261+
_ => (row_num + 1).to_string(),
262+
}
263+
} else {
264+
(row_num + 1).to_string()
265+
};
266+
267+
// Convert row to JSON
268+
let json_data = sqlite_row_to_json(row).with_context(|| {
269+
format!(
270+
"Failed to convert row {} in table '{}' to JSON",
271+
row_num + 1,
272+
table
273+
)
274+
})?;
275+
276+
result.push((id, json_data));
277+
}
278+
279+
Ok(result)
280+
}
281+
282+
/// Convert and insert a SQLite table to PostgreSQL using batched processing.
283+
///
284+
/// This function uses memory-efficient batched processing to handle large tables:
285+
/// 1. Reads rows in batches (default 10,000 rows)
286+
/// 2. Converts each batch to JSONB format
287+
/// 3. Inserts each batch to PostgreSQL before reading the next
288+
///
289+
/// Memory usage stays constant regardless of table size.
290+
///
291+
/// # Arguments
292+
///
293+
/// * `sqlite_conn` - SQLite database connection
294+
/// * `pg_client` - PostgreSQL client connection
295+
/// * `table` - Table name to convert
296+
/// * `source_type` - Source type label for metadata (e.g., "sqlite")
297+
/// * `batch_size` - Optional batch size (default: 10,000 rows)
298+
///
299+
/// # Returns
300+
///
301+
/// Total number of rows processed.
302+
///
303+
/// # Examples
304+
///
305+
/// ```no_run
306+
/// # use database_replicator::sqlite::converter::convert_table_batched;
307+
/// # async fn example(
308+
/// # sqlite_conn: &rusqlite::Connection,
309+
/// # pg_client: &tokio_postgres::Client,
310+
/// # ) -> anyhow::Result<()> {
311+
/// let rows_processed = convert_table_batched(
312+
/// sqlite_conn,
313+
/// pg_client,
314+
/// "large_table",
315+
/// "sqlite",
316+
/// None,
317+
/// ).await?;
318+
/// println!("Processed {} rows", rows_processed);
319+
/// # Ok(())
320+
/// # }
321+
/// ```
322+
pub async fn convert_table_batched(
323+
sqlite_conn: &Connection,
324+
pg_client: &tokio_postgres::Client,
325+
table: &str,
326+
source_type: &str,
327+
batch_size: Option<usize>,
328+
) -> Result<usize> {
329+
use crate::sqlite::reader::{read_table_batch, BatchedTableReader};
330+
331+
// Use memory-based batch size calculation if not specified
332+
let batch_size = batch_size.unwrap_or_else(crate::utils::calculate_optimal_batch_size);
333+
334+
tracing::info!(
335+
"Starting batched conversion of table '{}' (batch_size={})",
336+
table,
337+
batch_size
338+
);
339+
340+
// Detect ID column once before processing batches
341+
let id_column = detect_id_column(sqlite_conn, table)?;
342+
343+
// Create batched reader
344+
let mut reader = BatchedTableReader::new(sqlite_conn, table, batch_size)?;
345+
346+
let mut total_rows = 0usize;
347+
let mut batch_num = 0usize;
348+
349+
// Process batches until exhausted
350+
while let Some(rows) = read_table_batch(sqlite_conn, &mut reader)? {
351+
let batch_row_count = rows.len();
352+
batch_num += 1;
353+
354+
tracing::debug!(
355+
"Processing batch {} ({} rows) from table '{}'",
356+
batch_num,
357+
batch_row_count,
358+
table
359+
);
360+
361+
// Convert batch to JSONB
362+
let jsonb_rows = convert_batch_to_jsonb(rows, &id_column, total_rows, table)?;
363+
364+
// Insert batch to PostgreSQL
365+
if !jsonb_rows.is_empty() {
366+
crate::jsonb::writer::insert_jsonb_batch(pg_client, table, jsonb_rows, source_type)
367+
.await
368+
.with_context(|| {
369+
format!(
370+
"Failed to insert batch {} into PostgreSQL table '{}'",
371+
batch_num, table
372+
)
373+
})?;
374+
}
375+
376+
total_rows += batch_row_count;
377+
378+
// Log progress for large tables
379+
if total_rows.is_multiple_of(100_000) {
380+
tracing::info!(
381+
"Progress: {} rows processed from table '{}'",
382+
total_rows,
383+
table
384+
);
385+
}
386+
}
387+
388+
tracing::info!(
389+
"Completed batched conversion of table '{}': {} total rows in {} batches",
390+
table,
391+
total_rows,
392+
batch_num
393+
);
394+
395+
Ok(total_rows)
396+
}
397+
238398
#[cfg(test)]
239399
mod tests {
240400
use super::*;

0 commit comments

Comments
 (0)