diff --git a/crates/executor/src/datafusion/logical_analyzer/custom_type_coercion.rs b/crates/executor/src/datafusion/logical_analyzer/custom_type_coercion.rs
index 850511f8..00f49ffd 100644
--- a/crates/executor/src/datafusion/logical_analyzer/custom_type_coercion.rs
+++ b/crates/executor/src/datafusion/logical_analyzer/custom_type_coercion.rs
@@ -39,9 +39,22 @@ impl AnalyzerRule for CustomTypeCoercionRewriter {
 }
 
 fn analyze_internal(plan: &LogicalPlan) -> DFResult<Transformed<LogicalPlan>> {
-    // get schema representing all available input fields. This is used for data type
-    // resolution only, so order does not matter here
-    let schema = merge_schema(&plan.inputs());
+    // Get schema representing all available input fields. Used for data-type
+    // resolution only, so order doesn't matter.
+    //
+    // For leaf plan nodes (e.g. `TableScan`), `plan.inputs()` is empty and
+    // `merge_schema` returns an empty schema. If we relied on that, filter
+    // expressions attached to the leaf itself — such as the target filter
+    // that `UserQuery::merge_query` injects via
+    // `LogicalPlanBuilder::scan_with_filters` when the MERGE source is a
+    // partitioned `DataFusionTable` — would see no fields and fail with
+    // "Schema error: No field named …". Fall back to `plan.schema()` in
+    // that case so the rewriter can actually look up the column types.
+    let schema = if plan.inputs().is_empty() {
+        plan.schema().as_ref().clone()
+    } else {
+        merge_schema(&plan.inputs())
+    };
 
     let name_preserver = NamePreserver::new(plan);
     let new_plan = plan.clone().map_expressions(|expr| {
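To see why the fallback matters, here is a minimal sketch of the failure mode it guards against, using DataFusion's public `LogicalPlanBuilder` and `merge_schema`. The helper name `leaf_schema_gap`, the table name, and the `id` filter column are illustrative, not part of the patch:

    use std::sync::Arc;

    use datafusion_common::Result;
    use datafusion_expr::{LogicalPlanBuilder, TableSource, col, lit, utils::merge_schema};

    /// For a filtered `TableScan` leaf, the merged *input* schema is empty even
    /// though the scan's own schema is not, so a rewriter that only consults
    /// `merge_schema(&plan.inputs())` cannot resolve the filter's columns.
    fn leaf_schema_gap(table_source: Arc<dyn TableSource>) -> Result<()> {
        let scan = LogicalPlanBuilder::scan_with_filters(
            "merge_target",
            table_source,
            None,                       // no projection
            vec![col("id").eq(lit(1))], // filter attached to the leaf itself
        )?
        .build()?;

        assert!(scan.inputs().is_empty()); // TableScan is a leaf
        assert_eq!(merge_schema(&scan.inputs()).fields().len(), 0); // nothing to resolve against
        assert!(!scan.schema().fields().is_empty()); // but the scan itself knows its columns
        Ok(())
    }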
diff --git a/crates/executor/src/datafusion/physical_plan/merge.rs b/crates/executor/src/datafusion/physical_plan/merge.rs
index 22b63bc7..8caa0af3 100644
--- a/crates/executor/src/datafusion/physical_plan/merge.rs
+++ b/crates/executor/src/datafusion/physical_plan/merge.rs
@@ -19,6 +19,7 @@ use datafusion_physical_plan::{
     SendableRecordBatchStream,
     coalesce_partitions::CoalescePartitionsExec,
     execution_plan::{Boundedness, EmissionType},
+    metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
     stream::RecordBatchStreamAdapter,
 };
 use futures::{Stream, StreamExt};
@@ -52,6 +53,11 @@ pub struct MergeIntoCOWSinkExec {
     input: Arc<dyn ExecutionPlan>,
     target: DataFusionTable,
     properties: PlanProperties,
+    /// Per-node metrics surfaced via `EXPLAIN ANALYZE`. Populated with
+    /// `updated_rows` / `inserted_rows` / `deleted_rows` counters after the
+    /// write transaction commits, so `EXPLAIN ANALYZE MERGE INTO …` reports
+    /// how many rows each clause produced alongside the child scan metrics.
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl MergeIntoCOWSinkExec {
@@ -73,6 +79,7 @@ impl MergeIntoCOWSinkExec {
             input,
             target,
             properties,
+            metrics: ExecutionPlanMetricsSet::new(),
         }
     }
 }
@@ -109,6 +116,13 @@ impl ExecutionPlan for MergeIntoCOWSinkExec {
         vec![&self.input]
     }
 
+    /// Surface per-clause row counts (updated / inserted / deleted) as
+    /// `EXPLAIN ANALYZE` metrics. Values are populated by `execute()` after
+    /// the write transaction commits; they're zero until then.
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
     fn with_new_children(
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
@@ -142,6 +156,16 @@ impl ExecutionPlan for MergeIntoCOWSinkExec {
         let updated_rows: Arc<AtomicI64> = Arc::new(AtomicI64::new(0));
         let inserted_rows: Arc<AtomicI64> = Arc::new(AtomicI64::new(0));
 
+        // `Count` metrics that surface in `EXPLAIN ANALYZE` as
+        // `metrics=[updated_rows=…, inserted_rows=…, deleted_rows=…]` on this
+        // node. Populated below after the write transaction commits.
+        let updated_rows_metric: Count =
+            MetricBuilder::new(&self.metrics).counter("updated_rows", partition);
+        let inserted_rows_metric: Count =
+            MetricBuilder::new(&self.metrics).counter("inserted_rows", partition);
+        let deleted_rows_metric: Count =
+            MetricBuilder::new(&self.metrics).counter("deleted_rows", partition);
+
         let coalesce = CoalescePartitionsExec::new(self.input.clone());
 
         // Filter out rows whose __data_file_path doesn't have a matching row
@@ -163,6 +187,9 @@ impl ExecutionPlan for MergeIntoCOWSinkExec {
             let schema = schema.clone();
             let updated_rows = Arc::clone(&updated_rows);
             let inserted_rows = Arc::clone(&inserted_rows);
+            let updated_rows_metric = updated_rows_metric.clone();
+            let inserted_rows_metric = inserted_rows_metric.clone();
+            let deleted_rows_metric = deleted_rows_metric.clone();
             let projected_schema = count_and_project_stream.projected_schema();
             let batches: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
                 projected_schema,
@@ -222,6 +249,13 @@ impl ExecutionPlan for MergeIntoCOWSinkExec {
                     // MERGE DELETE is not supported yet
                     let deleted = 0i64;
 
+                    // Publish per-clause counts to the `MetricsSet` so
+                    // `EXPLAIN ANALYZE` shows them on the MergeIntoSinkExec line.
+                    // `try_from` + `unwrap_or` clamp to `usize::MAX` instead of
+                    // panicking if a count somehow overflows `usize`.
+                    updated_rows_metric.add(usize::try_from(updated).unwrap_or(usize::MAX));
+                    inserted_rows_metric.add(usize::try_from(inserted).unwrap_or(usize::MAX));
+                    deleted_rows_metric.add(usize::try_from(deleted).unwrap_or(usize::MAX));
+
                     let arrays = schema
                         .fields()
                         .iter()
@@ -655,7 +689,13 @@ impl Stream for MergeCOWFilterStream {
                     .push(filtered_batch);
             }
 
-            if matching_data_and_manifest_files.is_empty() {
+            // Only take the fast paths if the current batch references no target file
+            // that will be (or has been) overwritten. Otherwise the full filter path
+            // below is required so target rows belonging to `all_matching_data_files`
+            // are re-emitted into the rewritten data file.
+            if matching_data_and_manifest_files.is_empty()
+                && all_matching_data_files.is_empty()
+            {
                 // Return early if all rows only come from source
                 if matching_data_file_array.len() == source_exists_array.len() {
                     return Poll::Ready(Some(Ok(batch)));
@@ -1210,4 +1250,24 @@ mod tests {
         &[(0, 2), (0, 1), (0, 6), (0, 5)],
         60
     );
+    // Regression test for https://github.com/Embucket/embucket/issues/128
+    //
+    // If a target file has been seen as "matching" in an earlier batch and a subsequent
+    // batch contains only target rows (no `__source_exists` = true rows) for that same
+    // file, the rows in the later batch must still be passed through the filter so they
+    // land in the rewritten data file. Previously the "no matches, no source" fast path
+    // dropped them, causing silent data loss during `MERGE INTO` on unsorted inputs.
+    test_merge_cow_filter_stream!(matching_then_target, &[(0, 4), (0, 1)], 20);
+    test_merge_cow_filter_stream!(
+        matching_then_target_then_matching,
+        &[(0, 4), (0, 1), (0, 4)],
+        30
+    );
+    // Mixed scenario: several target-only batches arriving AFTER the target file
+    // has been matched.
+    test_merge_cow_filter_stream!(
+        matching_then_multiple_target_batches,
+        &[(0, 4), (0, 1), (0, 1), (0, 1)],
+        40
+    );
 }
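The metrics plumbing above leans on two properties of DataFusion's metrics API that are easy to miss: clones of a `Count` share one underlying atomic, so the clones moved into the stream closure update the very counters registered on the node, and `clone_inner()` snapshots whatever has been added by the time `metrics()` is called. A standalone sketch (the function name and values are illustrative):

    use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder};

    fn metrics_roundtrip() {
        let metrics = ExecutionPlanMetricsSet::new();
        let updated: Count = MetricBuilder::new(&metrics).counter("updated_rows", 0);

        // A clone shares the same atomic; this is what lets the counter be
        // moved into the async stream while `metrics()` still sees the update.
        let in_stream = updated.clone();
        in_stream.add(2);

        // `ExecutionPlan::metrics()` returns exactly this kind of snapshot.
        let snapshot = metrics.clone_inner();
        assert_eq!(
            snapshot.sum_by_name("updated_rows").map(|v| v.as_usize()),
            Some(2)
        );
    }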
diff --git a/crates/executor/src/query.rs b/crates/executor/src/query.rs
index 829b3cde..fe19f8d8 100644
--- a/crates/executor/src/query.rs
+++ b/crates/executor/src/query.rs
@@ -57,7 +57,7 @@ use datafusion::sql::statement::object_name_to_string;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::{
     Column, DFSchema, DataFusionError, ParamValues, ResolvedTableReference, SchemaReference,
-    TableReference, plan_datafusion_err,
+    TableReference, ToDFSchema, plan_datafusion_err,
 };
 use datafusion_expr::conditional_expressions::CaseBuilder;
 use datafusion_expr::logical_plan::dml::{DmlStatement, InsertOp, WriteOp};
@@ -464,6 +464,50 @@ impl UserQuery {
             }
         } else if let DFStatement::CreateExternalTable(cetable) = statement {
             return Box::pin(self.create_external_table_query(cetable)).await;
+        } else if let DFStatement::Explain(explain) = &statement {
+            // DataFusion's default planner rejects `EXPLAIN MERGE INTO ...` as
+            // "Unsupported SQL statement: MERGE INTO" because MERGE has its
+            // own Embucket-side planner (`merge_query`). Intercept the case
+            // where the inner statement is a MERGE: build the merge logical
+            // plan ourselves, then wrap it in the equivalent `LogicalPlan::Explain`
+            // / `LogicalPlan::Analyze` that DataFusion's SQL path would have
+            // produced. This lets callers actually inspect the plan and see
+            // physical-level metrics via `EXPLAIN ANALYZE MERGE`.
+            if let DFStatement::Statement(inner) = explain.statement.as_ref()
+                && matches!(inner.as_ref(), Statement::Merge { .. })
+            {
+                let analyze = explain.analyze;
+                let verbose = explain.verbose;
+                let format = explain.format.clone();
+                let merge_stmt = (**inner).clone();
+                let merge_plan = Box::pin(self.merge_to_logical_plan(merge_stmt)).await?;
+                let merge_plan = Arc::new(merge_plan);
+                let schema = datafusion_expr::LogicalPlan::explain_schema()
+                    .to_dfschema_ref()
+                    .context(ex_error::DataFusionSnafu)?;
+                let wrapped = if analyze {
+                    LogicalPlan::Analyze(datafusion_expr::logical_plan::Analyze {
+                        verbose,
+                        input: merge_plan,
+                        schema,
+                    })
+                } else {
+                    let explain_format = match format.as_deref() {
+                        Some(f) => datafusion_expr::logical_plan::ExplainFormat::from_str(f)
+                            .unwrap_or(datafusion_expr::logical_plan::ExplainFormat::Indent),
+                        None => datafusion_expr::logical_plan::ExplainFormat::Indent,
+                    };
+                    LogicalPlan::Explain(datafusion_expr::logical_plan::Explain {
+                        verbose,
+                        explain_format,
+                        plan: merge_plan,
+                        stringified_plans: vec![],
+                        schema,
+                        logical_optimization_succeeded: false,
+                    })
+                };
+                return self.execute_logical_plan(wrapped).await;
+            }
         }
         self.execute_sql(&self.query).await
     }
@@ -1281,9 +1325,25 @@ impl UserQuery {
         }
     }
 
-    #[allow(clippy::too_many_lines)]
     #[instrument(name = "UserQuery::merge_query", level = "trace", skip(self), err)]
     pub async fn merge_query(&self, statement: Statement) -> Result<QueryResult> {
+        let plan = self.merge_to_logical_plan(statement).await?;
+        self.execute_logical_plan(plan).await
+    }
+
+    /// Builds the logical plan for a `MERGE INTO` statement without executing
+    /// it. Shared between `merge_query` (which runs the plan) and the
+    /// `DFStatement::Explain` routing in `execute` (which wraps it in
+    /// `LogicalPlan::Explain` / `LogicalPlan::Analyze` so callers can see the
+    /// plan or live physical metrics without a separate SQL path).
+    #[allow(clippy::too_many_lines)]
+    #[instrument(
+        name = "UserQuery::merge_to_logical_plan",
+        level = "trace",
+        skip(self),
+        err
+    )]
+    pub async fn merge_to_logical_plan(&self, statement: Statement) -> Result<LogicalPlan> {
         let Statement::Merge {
             table: target,
             source,
@@ -1499,10 +1559,9 @@ impl UserQuery {
             )
             .context(ex_error::DataFusionSnafu)?;
 
-        self.execute_logical_plan(LogicalPlan::Extension(Extension {
+        Ok(LogicalPlan::Extension(Extension {
             node: Arc::new(merge_into_plan),
         }))
-        .await
     }
 
     #[instrument(name = "UserQuery::create_database", level = "trace", skip(self), err)]
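One subtlety in the `else` branch above: `ExplainFormat::from_str` errors on an unknown format name, and the `unwrap_or` deliberately degrades to the default indented output instead of failing the whole `EXPLAIN`. The behavior in isolation (`resolve_format` is an illustrative helper, not part of the patch, and it assumes `ExplainFormat`'s `FromStr` accepts "tree"):

    use std::str::FromStr;

    use datafusion_expr::logical_plan::ExplainFormat;

    fn resolve_format(format: Option<&str>) -> ExplainFormat {
        match format {
            Some(f) => ExplainFormat::from_str(f).unwrap_or(ExplainFormat::Indent),
            None => ExplainFormat::Indent,
        }
    }

    #[test]
    fn unknown_formats_fall_back_to_indent() {
        assert!(matches!(resolve_format(Some("tree")), ExplainFormat::Tree));
        assert!(matches!(resolve_format(Some("bogus")), ExplainFormat::Indent));
        assert!(matches!(resolve_format(None), ExplainFormat::Indent));
    }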
diff --git a/crates/executor/src/tests/query.rs b/crates/executor/src/tests/query.rs
index d9090784..74e6842e 100644
--- a/crates/executor/src/tests/query.rs
+++ b/crates/executor/src/tests/query.rs
@@ -201,6 +201,11 @@ macro_rules! test_query {
         settings.add_filter(r"(?i)\b(metadata_load_time|time_elapsed_opening|time_elapsed_processing|time_elapsed_scanning_total|time_elapsed_scanning_until_data|elapsed_compute|bloom_filter_eval_time|page_index_eval_time|row_pushdown_eval_time|statistics_eval_time)\s*=\s*[0-9]+(?:\.[0-9]+)?\s*(?:ns|µs|us|ms|s)", "$1=[TIME]");
         settings.add_filter(r"(-{130})(-{1,})", "$1");
         settings.add_filter(r"( {100})( {1,})", "$1");
+        // RoundRobinBatch fan-out equals the DataFusion planner's partition
+        // target, which in practice is the host CPU count. Normalize it so
+        // EXPLAIN snapshots don't flake between 4-core CI and dev boxes with
+        // different core counts.
+        settings.add_filter(r"RoundRobinBatch\(\d+\)", "RoundRobinBatch([N])");
 
         let setup: Vec<&str> = vec![$($($setup_queries),*)?];
         if !setup.is_empty() {
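What the new filter rewrites, shown against the `regex` crate directly (insta's `add_filter` applies the same regex-replacement semantics; the 8-way fan-out in the sample line is an arbitrary example value):

    use regex::Regex;

    fn roundrobin_filter_demo() {
        let re = Regex::new(r"RoundRobinBatch\(\d+\)").unwrap();
        let line = "RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1";
        assert_eq!(
            re.replace_all(line, "RoundRobinBatch([N])"),
            "RepartitionExec: partitioning=RoundRobinBatch([N]), input_partitions=1"
        );
    }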
+// +// Target is one data file with many rows; source is a mix of updates (matches) and +// inserts (no match), and the target rows of the join land in the filter stream in +// batches where some contain source_exists=true rows and some only contain target +// rows. Previously the "no matches, no source" fast path would silently drop the +// target-only batches for a file that had already been marked as matching in an +// earlier batch, causing the final row count to be less than the expected +// (target_rows + new_source_rows). This test asserts that no target row is lost. +test_query!( + merge_into_mixed_unsorted_multi_row_no_data_loss, + "SELECT COUNT(*) as total_rows, COUNT(CASE WHEN description = 'updated row' THEN 1 END) as updated_rows, COUNT(CASE WHEN description = 'original row' THEN 1 END) as preserved_rows, COUNT(CASE WHEN description = 'new row' THEN 1 END) as inserted_rows FROM embucket.public.merge_target", + setup_queries = [ + "CREATE TABLE embucket.public.merge_target (id INTEGER, description VARCHAR)", + "CREATE TABLE embucket.public.merge_source (id INTEGER, description VARCHAR)", + "INSERT INTO embucket.public.merge_target VALUES (1, 'original row'), (2, 'original row'), (3, 'original row'), (4, 'original row'), (5, 'original row'), (6, 'original row'), (7, 'original row'), (8, 'original row'), (9, 'original row'), (10, 'original row')", + "INSERT INTO embucket.public.merge_source VALUES (3, 'updated row'), (7, 'updated row'), (11, 'new row'), (12, 'new row')", + "MERGE INTO merge_target t USING merge_source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET t.description = s.description WHEN NOT MATCHED THEN INSERT (id, description) VALUES (s.id, s.description)", + ], + snapshot_path = "merge_into" +); diff --git a/crates/executor/src/tests/sql/ddl/snapshots/merge_into/query_merge_into_explain.snap b/crates/executor/src/tests/sql/ddl/snapshots/merge_into/query_merge_into_explain.snap new file mode 100644 index 00000000..4591fb22 --- /dev/null +++ b/crates/executor/src/tests/sql/ddl/snapshots/merge_into/query_merge_into_explain.snap @@ -0,0 +1,34 @@ +--- +source: crates/executor/src/tests/sql/ddl/merge_into.rs +description: "\"EXPLAIN MERGE INTO merge_target USING merge_source ON merge_target.id = merge_source.id WHEN MATCHED THEN UPDATE SET merge_target.description = merge_source.description\"" +info: "Setup queries: CREATE TABLE embucket.public.merge_target (ID INTEGER, description VARCHAR); CREATE TABLE embucket.public.merge_source (ID INTEGER, description VARCHAR); INSERT INTO embucket.public.merge_target VALUES (1, 'existing row'); INSERT INTO embucket.public.merge_source VALUES (1, 'updated row')" +--- +Ok( + [ + "+---------------+----------------------------------------------------------------------------------------------------------------------------------+", + "| plan_type | plan |", + "+---------------+----------------------------------------------------------------------------------------------------------------------------------+", + "| logical_plan | MergeIntoSink |", + "| | Projection: embucket.public.merge_target.id, CASE WHEN __common_expr_1 THEN TRY_CAST(embucket.public.merge_source.description AS Utf8) ELSE embucket.public.merge_target.description END AS description, embucket.public.merge_target.__data_file_path, embucket.public.merge_target.__manifest_file_path, __target_exists, __source_exists, __common_expr_1 AS __merge_row_updated, Boolean(false) AS __merge_row_inserted |", + "| | Projection: __target_exists AND __source_exists AS __common_expr_1, 
diff --git a/crates/executor/src/tests/sql/ddl/snapshots/merge_into/query_merge_into_explain.snap b/crates/executor/src/tests/sql/ddl/snapshots/merge_into/query_merge_into_explain.snap
new file mode 100644
index 00000000..4591fb22
--- /dev/null
+++ b/crates/executor/src/tests/sql/ddl/snapshots/merge_into/query_merge_into_explain.snap
@@ -0,0 +1,34 @@
+---
+source: crates/executor/src/tests/sql/ddl/merge_into.rs
+description: "\"EXPLAIN MERGE INTO merge_target USING merge_source ON merge_target.id = merge_source.id WHEN MATCHED THEN UPDATE SET merge_target.description = merge_source.description\""
+info: "Setup queries: CREATE TABLE embucket.public.merge_target (ID INTEGER, description VARCHAR); CREATE TABLE embucket.public.merge_source (ID INTEGER, description VARCHAR); INSERT INTO embucket.public.merge_target VALUES (1, 'existing row'); INSERT INTO embucket.public.merge_source VALUES (1, 'updated row')"
+---
+Ok(
+    [
+        "+---------------+----------------------------------------------------------------------------------------------------------------------------------+",
+        "| plan_type | plan |",
+        "+---------------+----------------------------------------------------------------------------------------------------------------------------------+",
+        "| logical_plan | MergeIntoSink |",
+        "| | Projection: embucket.public.merge_target.id, CASE WHEN __common_expr_1 THEN TRY_CAST(embucket.public.merge_source.description AS Utf8) ELSE embucket.public.merge_target.description END AS description, embucket.public.merge_target.__data_file_path, embucket.public.merge_target.__manifest_file_path, __target_exists, __source_exists, __common_expr_1 AS __merge_row_updated, Boolean(false) AS __merge_row_inserted |",
+        "| | Projection: __target_exists AND __source_exists AS __common_expr_1, embucket.public.merge_target.id, embucket.public.merge_target.description, embucket.public.merge_target.__data_file_path, embucket.public.merge_target.__manifest_file_path, __target_exists, embucket.public.merge_source.id, embucket.public.merge_source.description, __source_exists |",
+        "| | Full Join: embucket.public.merge_target.id = embucket.public.merge_source.id |",
+        "| | Projection: embucket.public.merge_target.id, embucket.public.merge_target.description, embucket.public.merge_target.__data_file_path, embucket.public.merge_target.__manifest_file_path, Boolean(true) AS __target_exists |",
+        "| | TableScan: embucket.public.merge_target |",
+        "| | Projection: embucket.public.merge_source.id, embucket.public.merge_source.description, Boolean(true) AS __source_exists |",
+        "| | TableScan: embucket.public.merge_source |",
+        "| physical_plan | MergeIntoSinkExec |",
+        "| | ProjectionExec: expr=[id@1 as id, CASE WHEN __common_expr_1@0 THEN description@7 ELSE description@2 END as description, __data_file_path@3 as __data_file_path, __manifest_file_path@4 as __manifest_file_path, __target_exists@5 as __target_exists, __source_exists@8 as __source_exists, __common_expr_1@0 as __merge_row_updated, false as __merge_row_inserted] |",
+        "| | ProjectionExec: expr=[__target_exists@4 AND __source_exists@7 as __common_expr_1, id@0 as id, description@1 as description, __data_file_path@2 as __data_file_path, __manifest_file_path@3 as __manifest_file_path, __target_exists@4 as __target_exists, id@5 as id, description@6 as description, __source_exists@7 as __source_exists] |",
+        "| | RepartitionExec: partitioning=RoundRobinBatch([N]), input_partitions=1 |",
+        "| | ProjectionExec: expr=[id@3 as id, description@4 as description, __data_file_path@5 as __data_file_path, __manifest_file_path@6 as __manifest_file_path, __target_exists@7 as __target_exists, id@0 as id, description@1 as description, __source_exists@2 as __source_exists] |",
+        "| | CoalesceBatchesExec: target_batch_size=8192 |",
+        "| | CoalesceBatchesExec: target_batch_size=8192 |",
+        "| | HashJoinExec: mode=CollectLeft, join_type=Full, on=[(id@0, id@0)] |",
+        "| | ProjectionExec: expr=[id@0 as id, description@1 as description, true as __source_exists] |",
+        "| | DataSourceExec: file_groups={1 group: [[/[PATH]/testing/data/[HEX]/[UUID].parquet]]}, projection=[id, description], file_type=parquet |",
+        "| | ProjectionExec: expr=[id@0 as id, description@1 as description, __data_file_path@2 as __data_file_path, __manifest_file_path@3 as __manifest_file_path, true as __target_exists] |",
+        "| | DataSourceExec: file_groups={1 group: [[/[PATH]/testing/data/[HEX]/[UUID].parquet]]}, projection=[id, description, __data_file_path, __manifest_file_path], file_type=parquet |",
+        "| | |",
+        "+---------------+----------------------------------------------------------------------------------------------------------------------------------+",
+    ],
+)
diff --git a/crates/executor/src/tests/sql/ddl/snapshots/merge_into/query_merge_into_mixed_unsorted_multi_row_no_data_loss.snap b/crates/executor/src/tests/sql/ddl/snapshots/merge_into/query_merge_into_mixed_unsorted_multi_row_no_data_loss.snap
new file mode 100644
index 00000000..3dd7a8b1
--- /dev/null
+++ b/crates/executor/src/tests/sql/ddl/snapshots/merge_into/query_merge_into_mixed_unsorted_multi_row_no_data_loss.snap
@@ -0,0 +1,14 @@
+---
+source: crates/executor/src/tests/sql/ddl/merge_into.rs
+description: "\"SELECT COUNT(*) as total_rows, COUNT(CASE WHEN description = 'updated row' THEN 1 END) as updated_rows, COUNT(CASE WHEN description = 'original row' THEN 1 END) as preserved_rows, COUNT(CASE WHEN description = 'new row' THEN 1 END) as inserted_rows FROM embucket.public.merge_target\""
+info: "Setup queries: CREATE TABLE embucket.public.merge_target (id INTEGER, description VARCHAR); CREATE TABLE embucket.public.merge_source (id INTEGER, description VARCHAR); INSERT INTO embucket.public.merge_target VALUES (1, 'original row'), (2, 'original row'), (3, 'original row'), (4, 'original row'), (5, 'original row'), (6, 'original row'), (7, 'original row'), (8, 'original row'), (9, 'original row'), (10, 'original row'); INSERT INTO embucket.public.merge_source VALUES (3, 'updated row'), (7, 'updated row'), (11, 'new row'), (12, 'new row'); MERGE INTO merge_target t USING merge_source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET t.description = s.description WHEN NOT MATCHED THEN INSERT (id, description) VALUES (s.id, s.description)"
+---
+Ok(
+    [
+        "+------------+--------------+----------------+---------------+",
+        "| total_rows | updated_rows | preserved_rows | inserted_rows |",
+        "+------------+--------------+----------------+---------------+",
+        "| 12         | 2            | 8              | 2             |",
+        "+------------+--------------+----------------+---------------+",
+    ],
+)