Skip to content

Commit dae6033

Browse files
committed
Integrate pre-flight checks with auto-fallback to remote
- Run pre-flight at start of local init for PostgreSQL sources - Print checklist-style output with actionable fixes - Auto-fallback to remote if: - Tool version incompatible AND - Target is SerenDB AND - --local was not explicit - Fail with clear error if --local explicit but checks fail - Add force_local parameter to init function - Update all tests to pass new parameter Closes #7
1 parent c9cfb9b commit dae6033

6 files changed

Lines changed: 106 additions & 9 deletions

File tree

src/commands/init.rs

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use tokio_postgres::Client;
2828
/// * `drop_existing` - Drop existing databases on target before copying
2929
/// * `enable_sync` - Set up continuous logical replication after snapshot (default: true)
3030
/// * `allow_resume` - Resume from checkpoint if available (default: true)
31+
/// * `force_local` - If true, --local was explicitly set (fail instead of fallback to remote)
3132
///
3233
/// # Returns
3334
///
@@ -58,8 +59,9 @@ use tokio_postgres::Client;
5859
/// false,
5960
/// ReplicationFilter::empty(),
6061
/// false,
61-
/// true, // Enable continuous replication
62-
/// true // Allow resume
62+
/// true, // Enable continuous replication
63+
/// true, // Allow resume
64+
/// false, // Not forcing local execution
6365
/// ).await?;
6466
///
6567
/// // Snapshot only (no continuous replication)
@@ -69,12 +71,14 @@ use tokio_postgres::Client;
6971
/// true,
7072
/// ReplicationFilter::empty(),
7173
/// false,
72-
/// false, // Disable continuous replication
73-
/// true // Allow resume
74+
/// false, // Disable continuous replication
75+
/// true, // Allow resume
76+
/// true, // Force local execution (--local flag)
7477
/// ).await?;
7578
/// # Ok(())
7679
/// # }
7780
/// ```
81+
#[allow(clippy::too_many_arguments)]
7882
pub async fn init(
7983
source_url: &str,
8084
target_url: &str,
@@ -83,6 +87,7 @@ pub async fn init(
8387
drop_existing: bool,
8488
enable_sync: bool,
8589
allow_resume: bool,
90+
force_local: bool,
8691
) -> Result<()> {
8792
tracing::info!("Starting initial replication...");
8893

@@ -94,6 +99,44 @@ pub async fn init(
9499
crate::SourceType::PostgreSQL => {
95100
// PostgreSQL to PostgreSQL replication (existing logic below)
96101
tracing::info!("Source type: PostgreSQL");
102+
103+
// Run pre-flight checks before any destructive operations
104+
tracing::info!("Running pre-flight checks...");
105+
106+
let databases = filter.include_databases().map(|v| v.to_vec());
107+
let preflight_result = crate::preflight::run_preflight_checks(
108+
source_url,
109+
target_url,
110+
databases.as_deref(),
111+
)
112+
.await?;
113+
114+
preflight_result.print();
115+
116+
if !preflight_result.all_passed() {
117+
// Check if we can auto-fallback to remote
118+
if preflight_result.tool_version_incompatible
119+
&& crate::utils::is_serendb_target(target_url)
120+
&& !force_local
121+
{
122+
println!();
123+
tracing::info!("Tool version incompatible. Switching to SerenAI cloud execution...");
124+
// Return special error that main.rs catches to trigger remote
125+
bail!("PREFLIGHT_FALLBACK_TO_REMOTE");
126+
}
127+
128+
// Cannot auto-fallback
129+
if force_local {
130+
bail!(
131+
"Pre-flight checks failed. Cannot continue with --local flag.\n\
132+
Fix the issues above or remove --local to allow remote execution."
133+
);
134+
}
135+
136+
bail!("Pre-flight checks failed. Fix the issues above and retry.");
137+
}
138+
139+
println!();
97140
}
98141
crate::SourceType::SQLite => {
99142
// SQLite to PostgreSQL migration (simpler path)
@@ -1106,7 +1149,7 @@ mod tests {
11061149

11071150
// Skip confirmation for automated tests, disable sync to keep test simple
11081151
let filter = crate::filters::ReplicationFilter::empty();
1109-
let result = init(&source, &target, true, filter, false, false, true).await;
1152+
let result = init(&source, &target, true, filter, false, false, true, false).await;
11101153
assert!(result.is_ok());
11111154
}
11121155

src/main.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,12 @@ async fn main() -> anyhow::Result<()> {
309309
}
310310

311311
// Local execution path
312+
// Clone filter values for potential fallback to remote
313+
let fallback_include_dbs = final_include_databases.clone();
314+
let fallback_exclude_dbs = final_exclude_databases.clone();
315+
let fallback_include_tables = final_include_tables.clone();
316+
let fallback_exclude_tables = final_exclude_tables.clone();
317+
312318
let filter = database_replicator::filters::ReplicationFilter::new(
313319
final_include_databases,
314320
final_exclude_databases,
@@ -319,16 +325,40 @@ async fn main() -> anyhow::Result<()> {
319325
let filter = filter.with_table_rules(table_rule_data);
320326

321327
let enable_sync = !no_sync; // Invert the flag: by default sync is enabled
322-
commands::init(
328+
329+
// Run init with pre-flight checks, handle fallback to remote
330+
match commands::init(
323331
&source,
324332
&target,
325333
yes,
326334
filter,
327335
drop_existing,
328336
enable_sync,
329337
!no_resume,
338+
local, // Pass whether --local was explicit
330339
)
331340
.await
341+
{
342+
Ok(_) => Ok(()),
343+
Err(e) if e.to_string().contains("PREFLIGHT_FALLBACK_TO_REMOTE") => {
344+
// Auto-fallback to remote execution
345+
init_remote(
346+
source,
347+
target,
348+
yes,
349+
fallback_include_dbs,
350+
fallback_exclude_dbs,
351+
fallback_include_tables,
352+
fallback_exclude_tables,
353+
drop_existing,
354+
no_sync,
355+
seren_api,
356+
job_timeout,
357+
)
358+
.await
359+
}
360+
Err(e) => Err(e),
361+
}
332362
}
333363
Commands::Sync {
334364
source,

tests/integration_test.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ async fn test_init_command_integration() {
4747

4848
// Skip confirmation for automated tests, disable sync to keep test simple
4949
let filter = database_replicator::filters::ReplicationFilter::empty();
50-
let result = commands::init(&source_url, &target_url, true, filter, false, false, true).await;
50+
let result = commands::init(&source_url, &target_url, true, filter, false, false, true, false).await;
5151

5252
match &result {
5353
Ok(_) => {
@@ -293,7 +293,7 @@ async fn test_init_with_database_filter() {
293293
.expect("Failed to create filter");
294294

295295
// Skip confirmation for automated tests, disable sync to keep test simple
296-
let result = commands::init(&source_url, &target_url, true, filter, false, false, true).await;
296+
let result = commands::init(&source_url, &target_url, true, filter, false, false, true, false).await;
297297

298298
match &result {
299299
Ok(_) => {
@@ -327,7 +327,7 @@ async fn test_init_with_table_filter() {
327327
.expect("Failed to create filter");
328328

329329
// Skip confirmation for automated tests, disable sync to keep test simple
330-
let result = commands::init(&source_url, &target_url, true, filter, false, false, true).await;
330+
let result = commands::init(&source_url, &target_url, true, filter, false, false, true, false).await;
331331

332332
match &result {
333333
Ok(_) => {

tests/mysql_integration_test.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ async fn test_mysql_full_replication_integration() {
154154
false,
155155
false,
156156
true,
157+
false,
157158
)
158159
.await;
159160

@@ -195,6 +196,7 @@ async fn test_mysql_null_and_blob_handling() {
195196
false,
196197
false,
197198
true,
199+
false,
198200
)
199201
.await;
200202

@@ -240,6 +242,7 @@ async fn test_mysql_empty_table_replication() {
240242
false,
241243
false,
242244
true,
245+
false,
243246
)
244247
.await;
245248

@@ -331,6 +334,7 @@ async fn test_mysql_all_data_types() {
331334
false,
332335
false,
333336
true,
337+
false,
334338
)
335339
.await;
336340

@@ -368,6 +372,7 @@ async fn test_mysql_empty_database_fails_gracefully() {
368372
false,
369373
false,
370374
true,
375+
false,
371376
)
372377
.await;
373378

@@ -400,6 +405,7 @@ async fn test_mysql_invalid_url_fails() {
400405
false,
401406
false,
402407
true,
408+
false,
403409
)
404410
.await;
405411

@@ -438,6 +444,7 @@ async fn test_mysql_missing_database_name_fails() {
438444
false,
439445
false,
440446
true,
447+
false,
441448
)
442449
.await;
443450

@@ -507,6 +514,7 @@ async fn test_mysql_decimal_and_datetime_precision() {
507514
false,
508515
false,
509516
true,
517+
false,
510518
)
511519
.await;
512520

tests/performance_test.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ async fn benchmark_sqlite_small_migration() {
198198
false,
199199
false,
200200
true,
201+
false,
201202
)
202203
.await;
203204
let elapsed = start.elapsed();
@@ -238,6 +239,7 @@ async fn benchmark_sqlite_medium_migration() {
238239
false,
239240
false,
240241
true,
242+
false,
241243
)
242244
.await;
243245
let elapsed = start.elapsed();
@@ -278,6 +280,7 @@ async fn benchmark_sqlite_large_migration() {
278280
false,
279281
false,
280282
true,
283+
false,
281284
)
282285
.await;
283286
let elapsed = start.elapsed();
@@ -321,6 +324,7 @@ async fn benchmark_mongodb_small_collection() {
321324
false,
322325
false,
323326
true,
327+
false,
324328
)
325329
.await;
326330
let elapsed = start.elapsed();
@@ -354,6 +358,7 @@ async fn benchmark_mongodb_medium_collection() {
354358
false,
355359
false,
356360
true,
361+
false,
357362
)
358363
.await;
359364
let elapsed = start.elapsed();
@@ -391,6 +396,7 @@ async fn benchmark_mysql_small_table() {
391396
false,
392397
false,
393398
true,
399+
false,
394400
)
395401
.await;
396402
let elapsed = start.elapsed();
@@ -424,6 +430,7 @@ async fn benchmark_mysql_medium_table() {
424430
false,
425431
false,
426432
true,
433+
false,
427434
)
428435
.await;
429436
let elapsed = start.elapsed();
@@ -462,6 +469,7 @@ async fn benchmark_jsonb_batch_insert() {
462469
false,
463470
false,
464471
true,
472+
false,
465473
)
466474
.await;
467475
let elapsed = start.elapsed();
@@ -560,6 +568,7 @@ async fn benchmark_many_small_tables() {
560568
false,
561569
false,
562570
true,
571+
false,
563572
)
564573
.await;
565574
let elapsed = start.elapsed();

tests/sqlite_integration_test.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ async fn test_sqlite_full_migration_integration() {
8989
false,
9090
false,
9191
true,
92+
false,
9293
)
9394
.await;
9495

@@ -127,6 +128,7 @@ async fn test_sqlite_null_and_blob_handling() {
127128
false,
128129
false,
129130
true,
131+
false,
130132
)
131133
.await;
132134

@@ -169,6 +171,7 @@ async fn test_sqlite_empty_table_migration() {
169171
false,
170172
false,
171173
true,
174+
false,
172175
)
173176
.await;
174177

@@ -230,6 +233,7 @@ async fn test_sqlite_all_data_types() {
230233
false,
231234
false,
232235
true,
236+
false,
233237
)
234238
.await;
235239

@@ -269,6 +273,7 @@ async fn test_sqlite_empty_database() {
269273
false,
270274
false,
271275
true,
276+
false,
272277
)
273278
.await;
274279

@@ -303,6 +308,7 @@ async fn test_sqlite_invalid_path_fails() {
303308
false,
304309
false,
305310
true,
311+
false,
306312
)
307313
.await;
308314

@@ -334,6 +340,7 @@ async fn test_sqlite_path_traversal_prevention() {
334340
false,
335341
false,
336342
true,
343+
false,
337344
)
338345
.await;
339346

0 commit comments

Comments
 (0)