From a9f466d8e964ba02120bb8aba47e3077a0f66093 Mon Sep 17 00:00:00 2001 From: Taariq Lewis <701864+taariq@users.noreply.github.com> Date: Thu, 11 Dec 2025 13:24:58 -0800 Subject: [PATCH] feat: Add batched processing for SQLite converter to prevent OOM (#77) The SQLite to PostgreSQL converter now processes rows in batches instead of loading the entire table into memory. This enables migration of large SQLite databases (7M+ rows) without running out of memory. Changes: - Add BatchedTableReader struct for rowid-based pagination - Add read_table_batch() function for memory-efficient reading - Add convert_table_batched() async function that reads, converts, and inserts in batches - Update init_sqlite_to_postgres() to use batched processing - Use calculate_optimal_batch_size() for memory-based batch sizing - Add comprehensive tests for batched reader functionality Memory usage now stays constant regardless of table size. Batch size automatically adjusts based on available system memory (25% of free RAM, clamped between 1,000 and 50,000 rows). Fixes #77 --- Cargo.lock | 2 +- src/commands/init.rs | 69 +++++++--- src/sqlite/converter.rs | 160 ++++++++++++++++++++++ src/sqlite/reader.rs | 284 +++++++++++++++++++++++++++++++++++++++- 4 files changed, 493 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 81c85e3..4ae03bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -687,7 +687,7 @@ checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476" [[package]] name = "database-replicator" -version = "7.0.10" +version = "7.0.11" dependencies = [ "anyhow", "base64 0.21.7", diff --git a/src/commands/init.rs b/src/commands/init.rs index 93d98e7..fbda5fc 100644 --- a/src/commands/init.rs +++ b/src/commands/init.rs @@ -911,22 +911,35 @@ pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Res let target_client = postgres::connect_with_retry(target_url).await?; tracing::info!(" ✓ Connected to PostgreSQL target"); - // Step 4: Migrate each table - tracing::info!("Step 4/4: Migrating tables..."); - for (idx, table_name) in tables.iter().enumerate() { + // Get row counts for progress display + let mut table_row_counts: Vec<(&str, usize)> = Vec::new(); + let mut total_rows = 0usize; + for table_name in &tables { + let count = + crate::sqlite::reader::get_table_row_count(&sqlite_conn, table_name).unwrap_or(0); + table_row_counts.push((table_name, count)); + total_rows += count; + } + + tracing::info!( + "Total rows to migrate: {} across {} table(s)", + total_rows, + tables.len() + ); + + // Step 4: Migrate each table using batched processing + tracing::info!("Step 4/4: Migrating tables (batched processing)..."); + let mut migrated_rows = 0usize; + + for (idx, (table_name, row_count)) in table_row_counts.iter().enumerate() { tracing::info!( - "Migrating table {}/{}: '{}'", + "Migrating table {}/{}: '{}' ({} rows)", idx + 1, tables.len(), - table_name + table_name, + row_count ); - // Convert SQLite table to JSONB - let rows = crate::sqlite::converter::convert_table_to_jsonb(&sqlite_conn, table_name) - .with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?; - - tracing::info!(" ✓ Converted {} rows from '{}'", rows.len(), table_name); - // Create JSONB table in PostgreSQL crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite") .await @@ -939,21 +952,39 @@ pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Res tracing::info!(" ✓ Created JSONB table '{}' in PostgreSQL", table_name); - if !rows.is_empty() { - // Batch insert all rows - crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "sqlite") - .await - .with_context(|| format!("Failed to insert data into table '{}'", table_name))?; + // Use batched conversion for memory efficiency + let rows_processed = crate::sqlite::converter::convert_table_batched( + &sqlite_conn, + &target_client, + table_name, + "sqlite", + None, // Use default batch size + ) + .await + .with_context(|| format!("Failed to migrate table '{}'", table_name))?; - tracing::info!(" ✓ Inserted all rows into '{}'", table_name); + migrated_rows += rows_processed; + + if rows_processed > 0 { + tracing::info!( + " ✓ Migrated {} rows from '{}' ({:.1}% of total)", + rows_processed, + table_name, + if total_rows > 0 { + migrated_rows as f64 / total_rows as f64 * 100.0 + } else { + 100.0 + } + ); } else { - tracing::info!(" ✓ Table '{}' is empty (no rows to insert)", table_name); + tracing::info!(" ✓ Table '{}' is empty (no rows to migrate)", table_name); } } tracing::info!("✅ SQLite to PostgreSQL migration complete!"); tracing::info!( - " Migrated {} table(s) from '{}' to PostgreSQL", + " Migrated {} row(s) from {} table(s) in '{}'", + migrated_rows, tables.len(), sqlite_path ); diff --git a/src/sqlite/converter.rs b/src/sqlite/converter.rs index 1c96632..c81e6c9 100644 --- a/src/sqlite/converter.rs +++ b/src/sqlite/converter.rs @@ -235,6 +235,166 @@ fn detect_id_column(conn: &Connection, table: &str) -> Result> { Ok(None) } +/// Convert a batch of SQLite rows to JSONB format. +/// +/// Converts a pre-read batch of rows, extracting IDs and converting to JSON. +fn convert_batch_to_jsonb( + rows: Vec>, + id_column: &Option, + start_row_num: usize, + table: &str, +) -> Result> { + let mut result = Vec::with_capacity(rows.len()); + + for (batch_idx, mut row) in rows.into_iter().enumerate() { + let row_num = start_row_num + batch_idx; + + // Remove internal _rowid tracking column before conversion + row.remove("_rowid"); + + // Extract or generate ID + let id = if let Some(ref id_col) = id_column { + match row.get(id_col) { + Some(rusqlite::types::Value::Integer(i)) => i.to_string(), + Some(rusqlite::types::Value::Text(s)) => s.clone(), + Some(rusqlite::types::Value::Real(f)) => f.to_string(), + _ => (row_num + 1).to_string(), + } + } else { + (row_num + 1).to_string() + }; + + // Convert row to JSON + let json_data = sqlite_row_to_json(row).with_context(|| { + format!( + "Failed to convert row {} in table '{}' to JSON", + row_num + 1, + table + ) + })?; + + result.push((id, json_data)); + } + + Ok(result) +} + +/// Convert and insert a SQLite table to PostgreSQL using batched processing. +/// +/// This function uses memory-efficient batched processing to handle large tables: +/// 1. Reads rows in batches (default 10,000 rows) +/// 2. Converts each batch to JSONB format +/// 3. Inserts each batch to PostgreSQL before reading the next +/// +/// Memory usage stays constant regardless of table size. +/// +/// # Arguments +/// +/// * `sqlite_conn` - SQLite database connection +/// * `pg_client` - PostgreSQL client connection +/// * `table` - Table name to convert +/// * `source_type` - Source type label for metadata (e.g., "sqlite") +/// * `batch_size` - Optional batch size (default: 10,000 rows) +/// +/// # Returns +/// +/// Total number of rows processed. +/// +/// # Examples +/// +/// ```no_run +/// # use database_replicator::sqlite::converter::convert_table_batched; +/// # async fn example( +/// # sqlite_conn: &rusqlite::Connection, +/// # pg_client: &tokio_postgres::Client, +/// # ) -> anyhow::Result<()> { +/// let rows_processed = convert_table_batched( +/// sqlite_conn, +/// pg_client, +/// "large_table", +/// "sqlite", +/// None, +/// ).await?; +/// println!("Processed {} rows", rows_processed); +/// # Ok(()) +/// # } +/// ``` +pub async fn convert_table_batched( + sqlite_conn: &Connection, + pg_client: &tokio_postgres::Client, + table: &str, + source_type: &str, + batch_size: Option, +) -> Result { + use crate::sqlite::reader::{read_table_batch, BatchedTableReader}; + + // Use memory-based batch size calculation if not specified + let batch_size = batch_size.unwrap_or_else(crate::utils::calculate_optimal_batch_size); + + tracing::info!( + "Starting batched conversion of table '{}' (batch_size={})", + table, + batch_size + ); + + // Detect ID column once before processing batches + let id_column = detect_id_column(sqlite_conn, table)?; + + // Create batched reader + let mut reader = BatchedTableReader::new(sqlite_conn, table, batch_size)?; + + let mut total_rows = 0usize; + let mut batch_num = 0usize; + + // Process batches until exhausted + while let Some(rows) = read_table_batch(sqlite_conn, &mut reader)? { + let batch_row_count = rows.len(); + batch_num += 1; + + tracing::debug!( + "Processing batch {} ({} rows) from table '{}'", + batch_num, + batch_row_count, + table + ); + + // Convert batch to JSONB + let jsonb_rows = convert_batch_to_jsonb(rows, &id_column, total_rows, table)?; + + // Insert batch to PostgreSQL + if !jsonb_rows.is_empty() { + crate::jsonb::writer::insert_jsonb_batch(pg_client, table, jsonb_rows, source_type) + .await + .with_context(|| { + format!( + "Failed to insert batch {} into PostgreSQL table '{}'", + batch_num, table + ) + })?; + } + + total_rows += batch_row_count; + + // Log progress for large tables + if total_rows.is_multiple_of(100_000) { + tracing::info!( + "Progress: {} rows processed from table '{}'", + total_rows, + table + ); + } + } + + tracing::info!( + "Completed batched conversion of table '{}': {} total rows in {} batches", + table, + total_rows, + batch_num + ); + + Ok(total_rows) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/sqlite/reader.rs b/src/sqlite/reader.rs index fbaed74..442c21e 100644 --- a/src/sqlite/reader.rs +++ b/src/sqlite/reader.rs @@ -104,6 +104,139 @@ pub fn get_table_row_count(conn: &Connection, table: &str) -> Result { Ok(count as usize) } +/// Batch reader state for iterating over large SQLite tables. +/// +/// Uses rowid-based pagination to efficiently read tables in chunks +/// without loading all data into memory. +#[derive(Debug)] +pub struct BatchedTableReader { + /// Table name being read + pub table: String, + /// Column names in the table + pub columns: Vec, + /// Last rowid seen (for pagination) + pub last_rowid: i64, + /// Maximum rows per batch + pub batch_size: usize, + /// Whether all rows have been read + pub exhausted: bool, +} + +impl BatchedTableReader { + /// Create a new batched reader for a table. + /// + /// # Arguments + /// + /// * `conn` - SQLite database connection + /// * `table` - Table name (must be validated) + /// * `batch_size` - Maximum rows per batch + pub fn new(conn: &Connection, table: &str, batch_size: usize) -> Result { + // Validate table name + crate::jsonb::validate_table_name(table) + .context("Invalid table name for batched reading")?; + + // Get column names + let query = format!("SELECT * FROM {} LIMIT 0", crate::utils::quote_ident(table)); + let stmt = conn + .prepare(&query) + .with_context(|| format!("Failed to prepare statement for table '{}'", table))?; + + let columns: Vec = stmt.column_names().iter().map(|s| s.to_string()).collect(); + + Ok(Self { + table: table.to_string(), + columns, + last_rowid: 0, + batch_size, + exhausted: false, + }) + } +} + +/// Read the next batch of rows from a table. +/// +/// Uses rowid-based pagination for efficient batched reading. +/// SQLite's rowid is always present (even if not explicitly defined) +/// and provides stable ordering for pagination. +/// +/// # Arguments +/// +/// * `conn` - SQLite database connection +/// * `reader` - Mutable batch reader state +/// +/// # Returns +/// +/// Some(rows) if there are more rows, None if exhausted. +pub fn read_table_batch( + conn: &Connection, + reader: &mut BatchedTableReader, +) -> Result>>> { + if reader.exhausted { + return Ok(None); + } + + // Query using rowid for stable pagination + // rowid is always available in SQLite (alias for INTEGER PRIMARY KEY if defined) + let query = format!( + "SELECT rowid, * FROM {} WHERE rowid > ? ORDER BY rowid LIMIT ?", + crate::utils::quote_ident(&reader.table) + ); + + let mut stmt = conn + .prepare(&query) + .with_context(|| format!("Failed to prepare batch query for table '{}'", reader.table))?; + + let column_names = &reader.columns; + let last_rowid = reader.last_rowid; + let batch_size = reader.batch_size as i64; + + let rows: Vec> = stmt + .query_map([last_rowid, batch_size], |row| { + let mut row_map = HashMap::new(); + + // First column is rowid (index 0), actual columns start at index 1 + for (idx, col_name) in column_names.iter().enumerate() { + let value: rusqlite::types::Value = row.get(idx + 1)?; + row_map.insert(col_name.clone(), value); + } + + // Also store rowid for pagination tracking + let rowid: i64 = row.get(0)?; + row_map.insert("_rowid".to_string(), rusqlite::types::Value::Integer(rowid)); + + Ok(row_map) + }) + .with_context(|| format!("Failed to query batch from table '{}'", reader.table))? + .collect::, _>>() + .with_context(|| format!("Failed to collect batch from table '{}'", reader.table))?; + + if rows.is_empty() { + reader.exhausted = true; + return Ok(None); + } + + // Update last_rowid from the last row for next iteration + if let Some(last_row) = rows.last() { + if let Some(rusqlite::types::Value::Integer(rowid)) = last_row.get("_rowid") { + reader.last_rowid = *rowid; + } + } + + // Mark as exhausted if we got fewer rows than batch_size + if rows.len() < reader.batch_size { + reader.exhausted = true; + } + + tracing::debug!( + "Read batch of {} rows from '{}' (last_rowid={})", + rows.len(), + reader.table, + reader.last_rowid + ); + + Ok(Some(rows)) +} + /// Read all data from a SQLite table /// /// Returns all rows as a vector of HashMaps, where each HashMap maps @@ -124,8 +257,8 @@ pub fn get_table_row_count(conn: &Connection, table: &str) -> Result { /// /// # Performance /// -/// Loads all rows into memory. For very large tables, consider pagination -/// or streaming approaches. +/// Loads all rows into memory. For large tables, use `BatchedTableReader` +/// and `read_table_batch()` instead. /// /// # Examples /// @@ -370,4 +503,151 @@ mod tests { .to_string() .contains("Invalid table name")); } + + #[test] + fn test_batched_table_reader_creation() { + let (_temp_dir, db_path) = create_test_db(); + let conn = Connection::open(db_path).unwrap(); + + let reader = BatchedTableReader::new(&conn, "users", 100).unwrap(); + + assert_eq!(reader.table, "users"); + assert_eq!(reader.batch_size, 100); + assert_eq!(reader.last_rowid, 0); + assert!(!reader.exhausted); + assert_eq!(reader.columns.len(), 4); // id, name, email, age + } + + #[test] + fn test_batched_table_reader_invalid_table() { + let (_temp_dir, db_path) = create_test_db(); + let conn = Connection::open(db_path).unwrap(); + + let result = BatchedTableReader::new(&conn, "users; DROP TABLE users;", 100); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Invalid table name")); + } + + #[test] + fn test_read_table_batch_single_batch() { + let (_temp_dir, db_path) = create_test_db(); + let conn = Connection::open(db_path).unwrap(); + + // Use batch size larger than row count - should get all rows in one batch + let mut reader = BatchedTableReader::new(&conn, "users", 100).unwrap(); + + // First batch should return all 3 rows + let batch1 = read_table_batch(&conn, &mut reader).unwrap(); + assert!(batch1.is_some()); + let rows = batch1.unwrap(); + assert_eq!(rows.len(), 3); + + // Second call should return None (exhausted) + let batch2 = read_table_batch(&conn, &mut reader).unwrap(); + assert!(batch2.is_none()); + assert!(reader.exhausted); + } + + #[test] + fn test_read_table_batch_multiple_batches() { + let (_temp_dir, db_path) = create_test_db(); + let conn = Connection::open(db_path).unwrap(); + + // Use batch size of 1 - should need multiple batches + let mut reader = BatchedTableReader::new(&conn, "users", 1).unwrap(); + + // Collect all batches + let mut all_rows = Vec::new(); + while let Some(batch) = read_table_batch(&conn, &mut reader).unwrap() { + assert_eq!(batch.len(), 1); // Each batch should have 1 row + all_rows.extend(batch); + } + + assert_eq!(all_rows.len(), 3); + assert!(reader.exhausted); + } + + #[test] + fn test_read_table_batch_preserves_data() { + let (_temp_dir, db_path) = create_test_db(); + let conn = Connection::open(db_path).unwrap(); + + let mut reader = BatchedTableReader::new(&conn, "users", 100).unwrap(); + let batch = read_table_batch(&conn, &mut reader).unwrap().unwrap(); + + // Verify row contents (sorted by rowid) + let first_row = &batch[0]; + assert!(first_row.contains_key("id")); + assert!(first_row.contains_key("name")); + assert!(first_row.contains_key("email")); + assert!(first_row.contains_key("age")); + + // First row should be Alice (id=1) + match &first_row["name"] { + rusqlite::types::Value::Text(s) => assert_eq!(s, "Alice"), + _ => panic!("name should be TEXT"), + } + } + + #[test] + fn test_read_table_batch_empty_table() { + let temp_dir = tempfile::tempdir().unwrap(); + let db_path = temp_dir.path().join("empty.db"); + let conn = Connection::open(&db_path).unwrap(); + + conn.execute( + "CREATE TABLE empty_table (id INTEGER PRIMARY KEY, name TEXT)", + [], + ) + .unwrap(); + + let mut reader = BatchedTableReader::new(&conn, "empty_table", 100).unwrap(); + + // Should return None immediately for empty table + let batch = read_table_batch(&conn, &mut reader).unwrap(); + assert!(batch.is_none()); + assert!(reader.exhausted); + } + + #[test] + fn test_read_table_batch_large_table() { + let temp_dir = tempfile::tempdir().unwrap(); + let db_path = temp_dir.path().join("large.db"); + let conn = Connection::open(&db_path).unwrap(); + + conn.execute( + "CREATE TABLE large_table (id INTEGER PRIMARY KEY, value TEXT)", + [], + ) + .unwrap(); + + // Insert 250 rows + for i in 1..=250 { + conn.execute( + "INSERT INTO large_table (id, value) VALUES (?, ?)", + rusqlite::params![i, format!("value_{}", i)], + ) + .unwrap(); + } + + // Read with batch size of 100 + let mut reader = BatchedTableReader::new(&conn, "large_table", 100).unwrap(); + + let mut batch_count = 0; + let mut total_rows = 0; + + while let Some(batch) = read_table_batch(&conn, &mut reader).unwrap() { + batch_count += 1; + total_rows += batch.len(); + + // Each batch should be at most 100 rows + assert!(batch.len() <= 100); + } + + assert_eq!(total_rows, 250); + assert_eq!(batch_count, 3); // 100 + 100 + 50 + } }