Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

69 changes: 50 additions & 19 deletions src/commands/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -911,22 +911,35 @@ pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Res
let target_client = postgres::connect_with_retry(target_url).await?;
tracing::info!(" ✓ Connected to PostgreSQL target");

// Step 4: Migrate each table
tracing::info!("Step 4/4: Migrating tables...");
for (idx, table_name) in tables.iter().enumerate() {
// Get row counts for progress display
let mut table_row_counts: Vec<(&str, usize)> = Vec::new();
let mut total_rows = 0usize;
for table_name in &tables {
let count =
crate::sqlite::reader::get_table_row_count(&sqlite_conn, table_name).unwrap_or(0);
table_row_counts.push((table_name, count));
total_rows += count;
}

tracing::info!(
"Total rows to migrate: {} across {} table(s)",
total_rows,
tables.len()
);

// Step 4: Migrate each table using batched processing
tracing::info!("Step 4/4: Migrating tables (batched processing)...");
let mut migrated_rows = 0usize;

for (idx, (table_name, row_count)) in table_row_counts.iter().enumerate() {
tracing::info!(
"Migrating table {}/{}: '{}'",
"Migrating table {}/{}: '{}' ({} rows)",
idx + 1,
tables.len(),
table_name
table_name,
row_count
);

// Convert SQLite table to JSONB
let rows = crate::sqlite::converter::convert_table_to_jsonb(&sqlite_conn, table_name)
.with_context(|| format!("Failed to convert table '{}' to JSONB", table_name))?;

tracing::info!(" ✓ Converted {} rows from '{}'", rows.len(), table_name);

// Create JSONB table in PostgreSQL
crate::jsonb::writer::create_jsonb_table(&target_client, table_name, "sqlite")
.await
Expand All @@ -939,21 +952,39 @@ pub async fn init_sqlite_to_postgres(sqlite_path: &str, target_url: &str) -> Res

tracing::info!(" ✓ Created JSONB table '{}' in PostgreSQL", table_name);

if !rows.is_empty() {
// Batch insert all rows
crate::jsonb::writer::insert_jsonb_batch(&target_client, table_name, rows, "sqlite")
.await
.with_context(|| format!("Failed to insert data into table '{}'", table_name))?;
// Use batched conversion for memory efficiency
let rows_processed = crate::sqlite::converter::convert_table_batched(
&sqlite_conn,
&target_client,
table_name,
"sqlite",
None, // Use default batch size
)
.await
.with_context(|| format!("Failed to migrate table '{}'", table_name))?;

tracing::info!(" ✓ Inserted all rows into '{}'", table_name);
migrated_rows += rows_processed;

if rows_processed > 0 {
tracing::info!(
" ✓ Migrated {} rows from '{}' ({:.1}% of total)",
rows_processed,
table_name,
if total_rows > 0 {
migrated_rows as f64 / total_rows as f64 * 100.0
} else {
100.0
}
);
} else {
tracing::info!(" ✓ Table '{}' is empty (no rows to insert)", table_name);
tracing::info!(" ✓ Table '{}' is empty (no rows to migrate)", table_name);
}
}

tracing::info!("✅ SQLite to PostgreSQL migration complete!");
tracing::info!(
" Migrated {} table(s) from '{}' to PostgreSQL",
" Migrated {} row(s) from {} table(s) in '{}'",
migrated_rows,
tables.len(),
sqlite_path
);
Expand Down
160 changes: 160 additions & 0 deletions src/sqlite/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,166 @@ fn detect_id_column(conn: &Connection, table: &str) -> Result<Option<String>> {
Ok(None)
}

/// Turn one already-read batch of SQLite rows into `(id, json)` pairs.
///
/// For every row, in order:
/// 1. the internal `_rowid` tracking entry is dropped,
/// 2. an ID string is chosen: the value stored under `id_column` when that
///    value is an integer, text or real; otherwise the 1-based row number
///    (`start_row_num` + position in the batch + 1),
/// 3. the remaining columns are converted with `sqlite_row_to_json`.
///
/// # Arguments
///
/// * `rows` - The batch to convert; consumed by this call
/// * `id_column` - Name of the column to use as ID, if one was detected
/// * `start_row_num` - Number of rows that preceded this batch (0-based offset)
/// * `table` - Table name, used only in error messages
///
/// # Errors
///
/// Stops at the first row that fails JSON conversion and returns that error,
/// annotated with the 1-based row number and the table name.
fn convert_batch_to_jsonb(
    rows: Vec<HashMap<String, rusqlite::types::Value>>,
    id_column: &Option<String>,
    start_row_num: usize,
    table: &str,
) -> Result<Vec<(String, JsonValue)>> {
    use rusqlite::types::Value;

    rows.into_iter()
        .enumerate()
        .map(|(offset, mut record)| -> Result<(String, JsonValue)> {
            let row_num = start_row_num + offset;

            // The tracking column must go first: it is not part of the user's
            // data, and it must not be picked up as an ID value either.
            record.remove("_rowid");

            // A missing ID column, a missing key, or a NULL/BLOB value all
            // fall back to the 1-based row number.
            let id = match id_column.as_deref().and_then(|col| record.get(col)) {
                Some(Value::Integer(n)) => n.to_string(),
                Some(Value::Text(text)) => text.clone(),
                Some(Value::Real(x)) => x.to_string(),
                _ => (row_num + 1).to_string(),
            };

            let json_data = sqlite_row_to_json(record).with_context(|| {
                format!(
                    "Failed to convert row {} in table '{}' to JSON",
                    row_num + 1,
                    table
                )
            })?;

            Ok((id, json_data))
        })
        .collect()
}

/// Convert and insert a SQLite table to PostgreSQL using batched processing.
///
/// The table is streamed batch by batch instead of being loaded in full:
/// 1. Reads the next batch of rows from SQLite
/// 2. Converts that batch to JSONB format
/// 3. Inserts the batch into PostgreSQL before reading the next one
///
/// Each batch is handed off to the insert before the next one is read, so
/// this function never holds more than one batch of rows at a time.
///
/// # Arguments
///
/// * `sqlite_conn` - SQLite database connection
/// * `pg_client` - PostgreSQL client connection
/// * `table` - Table name to convert
/// * `source_type` - Source type label for metadata (e.g., "sqlite")
/// * `batch_size` - Optional batch size in rows; when `None`, the value
///   returned by `crate::utils::calculate_optimal_batch_size` is used
///
/// # Returns
///
/// Total number of rows processed.
///
/// # Errors
///
/// Returns an error if ID-column detection, reader creation, reading a
/// batch, converting a row to JSON, or inserting a batch fails. Insert
/// failures are annotated with the batch number and table name.
///
/// NOTE(review): no transaction is opened here, so batches inserted before a
/// failing batch presumably remain in the target table — confirm with callers.
///
/// # Examples
///
/// ```no_run
/// # use database_replicator::sqlite::converter::convert_table_batched;
/// # async fn example(
/// #     sqlite_conn: &rusqlite::Connection,
/// #     pg_client: &tokio_postgres::Client,
/// # ) -> anyhow::Result<()> {
/// let rows_processed = convert_table_batched(
///     sqlite_conn,
///     pg_client,
///     "large_table",
///     "sqlite",
///     None,
/// ).await?;
/// println!("Processed {} rows", rows_processed);
/// # Ok(())
/// # }
/// ```
pub async fn convert_table_batched(
    sqlite_conn: &Connection,
    pg_client: &tokio_postgres::Client,
    table: &str,
    source_type: &str,
    batch_size: Option<usize>,
) -> Result<usize> {
    use crate::sqlite::reader::{read_table_batch, BatchedTableReader};

    // An info-level progress line is emitted each time this many more rows
    // have been processed.
    const PROGRESS_LOG_INTERVAL: usize = 100_000;

    // Fall back to the computed batch size when the caller did not pick one
    let batch_size = batch_size.unwrap_or_else(crate::utils::calculate_optimal_batch_size);

    tracing::info!(
        "Starting batched conversion of table '{}' (batch_size={})",
        table,
        batch_size
    );

    // Detect ID column once before processing batches
    let id_column = detect_id_column(sqlite_conn, table)?;

    // Create batched reader
    let mut reader = BatchedTableReader::new(sqlite_conn, table, batch_size)?;

    let mut total_rows = 0usize;
    let mut batch_num = 0usize;

    // Process batches until exhausted
    while let Some(rows) = read_table_batch(sqlite_conn, &mut reader)? {
        let batch_row_count = rows.len();
        batch_num += 1;

        tracing::debug!(
            "Processing batch {} ({} rows) from table '{}'",
            batch_num,
            batch_row_count,
            table
        );

        // Convert batch to JSONB; `total_rows` is the offset of this batch's
        // first row, used for fallback IDs and error messages.
        let jsonb_rows = convert_batch_to_jsonb(rows, &id_column, total_rows, table)?;

        // Insert batch to PostgreSQL
        if !jsonb_rows.is_empty() {
            crate::jsonb::writer::insert_jsonb_batch(pg_client, table, jsonb_rows, source_type)
                .await
                .with_context(|| {
                    format!(
                        "Failed to insert batch {} into PostgreSQL table '{}'",
                        batch_num, table
                    )
                })?;
        }

        let rows_before_batch = total_rows;
        total_rows += batch_row_count;

        // Log progress whenever this batch crossed an interval boundary.
        // Comparing interval counts (rather than testing for an exact
        // multiple) keeps the log firing for batch sizes that do not divide
        // the interval evenly.
        if total_rows / PROGRESS_LOG_INTERVAL > rows_before_batch / PROGRESS_LOG_INTERVAL {
            tracing::info!(
                "Progress: {} rows processed from table '{}'",
                total_rows,
                table
            );
        }
    }

    tracing::info!(
        "Completed batched conversion of table '{}': {} total rows in {} batches",
        table,
        total_rows,
        batch_num
    );

    Ok(total_rows)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading
Loading