
Commit 63669ca

rampage644 and claude authored
fix(merge): partition pruning, EXPLAIN routing, and data-loss fix on the MERGE path (rebased) (#134)
* fix(custom_type_coercion): fall back to plan.schema() for leaf nodes

  `CustomTypeCoercionRewriter::analyze_internal` built its lookup schema from `merge_schema(&plan.inputs())`. For leaf nodes like `LogicalPlan::TableScan`, `plan.inputs()` is empty and the merged schema has no fields, so any binary-op expression attached directly to the leaf — e.g. via `LogicalPlanBuilder::scan_with_filters` — would fail coercion with "Schema error: No field named <col>" during the analyzer pass.

  This broke the target-side partition pruning hint path that `UserQuery::merge_query` wires up when a MERGE source is a partitioned `DataFusionTable`: `target_filter_expression()` builds a per-partition `col(source) >= min AND col(source) <= max` predicate and pushes it into the target `TableScan`'s filters via `scan_with_filters`, expecting Iceberg's file pruner to use it at manifest level. The filter never made it past the analyzer.

  Fix: when `plan.inputs().is_empty()`, use `plan.schema()` directly for type resolution, mirroring the pattern DataFusion's built-in `TypeCoercion` analyzer uses.

  All existing `custom_type_coercion` snapshot tests still pass, and the full `merge_into` suite (22 tests) stays green. Verified end-to-end against a deployed Embucket Lambda: `MERGE INTO demo.atomic.events_hooli_tiny USING demo.atomic.events_hooli_ident` where the source is partitioned by `identity(event_name)` — previously failed with `custom_type_coercion / Schema error: No field named event_name`, now returns 100 matched rows and the update lands on disk.

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(merge): route EXPLAIN / EXPLAIN ANALYZE into the MERGE planner

  Embucket has its own MERGE planner (`UserQuery::merge_query`) because DataFusion's SQL path doesn't produce a usable plan for `MERGE INTO`. The side effect was that `EXPLAIN MERGE INTO …` and `EXPLAIN ANALYZE MERGE INTO …` both fell through `execute()` to `execute_sql`, which hands the statement to DataFusion's planner and bounces back with:

      SQL compilation error: unsupported feature: Unsupported SQL statement: MERGE INTO …

  There was no observability for MERGE plans or per-scan metrics, which made it impossible to verify partition-pruning behaviour on partitioned Iceberg targets (files scanned, bytes scanned, manifest-level pruning counters).

  Changes:

  1. Split `merge_query` into a pure plan-builder `merge_to_logical_plan` and a thin wrapper that calls `execute_logical_plan`.
  2. In `execute()`, when the parsed statement is `DFStatement::Explain(..)` whose inner statement is `Statement::Merge { .. }`, build the MERGE logical plan via `merge_to_logical_plan`, then wrap it in the same `LogicalPlan::Explain` / `LogicalPlan::Analyze` shape DataFusion's own `explain_to_plan` constructs. Everything downstream (physical planning, execution, output formatting) is unchanged.
  3. Add a snapshot test `merge_into_explain` over a minimal unpartitioned target + source that asserts the logical and physical plans render. `EXPLAIN ANALYZE` is exercised end-to-end through the deployed Lambda rather than via snapshot because the formatted-table column widths depend on the pre-redaction metric value widths and aren't stable across runs.

  After this change:

  - `EXPLAIN MERGE INTO t USING s ON ... WHEN MATCHED THEN UPDATE ...` returns the logical plan + physical plan (including `MergeIntoSinkExec`, `HashJoinExec`, `DataSourceExec { file_groups, projection, file_type }` for each side).
  - `EXPLAIN ANALYZE` of the same statement executes the MERGE and additionally reports per-node runtime metrics. The `DataSourceExec` rows now surface the DataFusion/Parquet scan counters that were previously invisible: `bytes_scanned`, `files_ranges_pruned_statistics`, `row_groups_pruned_statistics`, `pushdown_rows_pruned`, `page_index_rows_pruned`. That's the signal you need to verify that source-side partition-hint pruning actually prunes.

  All 23 `merge_into` tests pass (22 existing + 1 new). Full `cargo test -p executor --lib` is 359/0.

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat(merge): expose updated/inserted/deleted row counts as MetricsSet

  `MergeIntoCOWSinkExec` tracked per-clause row counts in `AtomicI64` purely to populate the final result batch. After the EXPLAIN / EXPLAIN ANALYZE routing fix on this branch, `EXPLAIN ANALYZE MERGE INTO …` reports rich per-scan metrics on every `DataSourceExec` in the plan, but the sink line was still rendering as `MergeIntoSinkExec, metrics=[]` because this node didn't own an `ExecutionPlanMetricsSet`.

  Wire one up: register `Count` metrics `updated_rows`, `inserted_rows`, and `deleted_rows` via `MetricBuilder::new(&self.metrics).counter(..)` at the start of `execute()`, clone them into the async write closure, and `add()` the final `AtomicI64` values after the transaction commits. Implement `ExecutionPlan::metrics()` to return `Some(self.metrics.clone_inner())` so DataFusion's plan formatter picks them up. Row counts that exceed `usize::MAX` saturate via `try_from` rather than panicking.

  After this change, `EXPLAIN ANALYZE MERGE` shows the sink counters alongside the child scan counters, so an operator can read updated / inserted / deleted counts directly off the plan output instead of only from the result row. All 23 `merge_into` tests stay green.

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(merge): preserve target rows when MERGE batch contains only target

  The MergeCOWFilterStream "no matches in this batch" fast path short-circuited on `matching_data_and_manifest_files.is_empty()` without checking the cumulative `all_matching_data_files` set. If a target file had been seen as matching in an earlier batch and a later batch contained only target rows for that file, the rows in the later batch were silently dropped. The downstream COW commit then overwrote the original file with the partial result, permanently losing the unmatched target rows whose batch hit the dead path.

  The fix tightens the guard to also require `all_matching_data_files` to be empty before taking the fast path. When a batch belongs to a file already in the overwrite set, it falls through to the main filter path, which correctly emits target rows via `file_predicate OR source_exists`.

  Adds three unit tests against MergeCOWFilterStream covering the matching-then-target patterns, plus a SQL snapshot test that exercises the same shape end-to-end.

  Fixes #128

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* style: apply rustfmt after rebase on main

  Rustfmt on Rust 1.94 formats the long #[instrument(...)] attribute and the following fn signature differently than the toolchain the PR was originally authored against. No semantic change.

* style(merge): collapse nested if into let-chain for EXPLAIN MERGE route

  Rust 1.94 clippy (clippy::collapsible_if, denied via clippy::all) flags the nested `if let ... { if matches!(...) { ... } }` guard in execute(). Merge both conditions into a single let-chain so clippy is happy without changing the observable behaviour of the MERGE EXPLAIN routing.

* test(merge): normalize RoundRobinBatch fan-out in EXPLAIN snapshots

  The DataFusion planner uses the host CPU count as the RoundRobinBatch partition target, so the EXPLAIN snapshot literal differed between the PR author's dev box (10 cores) and the 4-core ubuntu-latest GitHub runner. Add an insta filter to the shared test_query! macro that rewrites `RoundRobinBatch(N)` to `RoundRobinBatch([N])`, and regenerate the `query_merge_into_explain` snapshot to use the normalized token so the test is stable across core counts.

* ci: re-trigger clippy after transient actions/checkout HTTP 500

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
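
The main filter path that the data-loss fix falls back to is easiest to picture as a boolean mask over each record batch: keep a row if its target data file is being rewritten, or if the row came from the source side of the join. A minimal Arrow sketch of that idea (illustrative only; the column masks and the helper name are not the actual MergeCOWFilterStream internals):

use arrow::array::BooleanArray;
use arrow::compute::{filter_record_batch, or};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// Sketch only: `in_overwritten_file` and `source_exists` are assumed to be
// boolean masks computed earlier for this batch. A row survives if its file is
// part of the copy-on-write rewrite OR it originated from the MERGE source.
fn keep_rows(
    batch: &RecordBatch,
    in_overwritten_file: &BooleanArray,
    source_exists: &BooleanArray,
) -> Result<RecordBatch, ArrowError> {
    let mask = or(in_overwritten_file, source_exists)?;
    filter_record_batch(batch, &mask)
}

Under this framing, the bug was that batches skipped the mask entirely when the per-batch match set was empty, even though the file-level overwrite set was not.
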
1 parent 7482768 commit 63669ca

7 files changed

Lines changed: 238 additions & 8 deletions


crates/executor/src/datafusion/logical_analyzer/custom_type_coercion.rs

Lines changed: 16 additions & 3 deletions
@@ -39,9 +39,22 @@ impl AnalyzerRule for CustomTypeCoercionRewriter {
 }
 
 fn analyze_internal(plan: &LogicalPlan) -> DFResult<Transformed<LogicalPlan>> {
-    // get schema representing all available input fields. This is used for data type
-    // resolution only, so order does not matter here
-    let schema = merge_schema(&plan.inputs());
+    // Get schema representing all available input fields. Used for data-type
+    // resolution only, so order doesn't matter.
+    //
+    // For leaf plan nodes (e.g. `TableScan`), `plan.inputs()` is empty and
+    // `merge_schema` returns an empty schema. If we relied on that, filter
+    // expressions attached to the leaf itself — such as the target filter
+    // that `UserQuery::merge_query` injects via
+    // `LogicalPlanBuilder::scan_with_filters` when the MERGE source is a
+    // partitioned `DataFusionTable` — would see no fields and fail with
+    // "Schema error: No field named …". Fall back to `plan.schema()` in
+    // that case so the rewriter can actually look up the column types.
+    let schema = if plan.inputs().is_empty() {
+        plan.schema().as_ref().clone()
+    } else {
+        merge_schema(&plan.inputs())
+    };
 
     let name_preserver = NamePreserver::new(plan);
     let new_plan = plan.clone().map_expressions(|expr| {
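
The pruning hint this fallback exists for is built elsewhere on the MERGE path: per the commit message, `target_filter_expression()` derives a `col >= min AND col <= max` predicate from the source partition bounds and attaches it to the target `TableScan` via `scan_with_filters`. A rough sketch of that shape against plain DataFusion APIs (the table name, column, and bounds below are placeholders, not Embucket's actual implementation):

use std::sync::Arc;
use datafusion_expr::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder, TableSource};

// Sketch only: push a per-partition range hint into the target scan's filters
// so a file pruner can use it at manifest level. Table source, column name,
// and bounds are hypothetical.
fn pruned_target_scan(
    target: Arc<dyn TableSource>,
) -> datafusion_common::Result<LogicalPlan> {
    let hint: Expr = col("event_name")
        .gt_eq(lit("add_to_cart"))
        .and(col("event_name").lt_eq(lit("page_view")));
    LogicalPlanBuilder::scan_with_filters("events", target, None, vec![hint])?.build()
}

With the fallback above in place, the analyzer can resolve `event_name` against the scan's own schema instead of the empty merged-input schema.
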

crates/executor/src/datafusion/physical_plan/merge.rs

Lines changed: 61 additions & 1 deletion
@@ -19,6 +19,7 @@ use datafusion_physical_plan::{
     SendableRecordBatchStream,
     coalesce_partitions::CoalescePartitionsExec,
     execution_plan::{Boundedness, EmissionType},
+    metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet},
     stream::RecordBatchStreamAdapter,
 };
 use futures::{Stream, StreamExt};
@@ -52,6 +53,11 @@ pub struct MergeIntoCOWSinkExec {
     input: Arc<dyn ExecutionPlan>,
     target: DataFusionTable,
     properties: PlanProperties,
+    /// Per-node metrics surfaced via `EXPLAIN ANALYZE`. Populated with
+    /// `updated_rows` / `inserted_rows` / `deleted_rows` counters after the
+    /// write transaction commits, so `EXPLAIN ANALYZE MERGE INTO …` reports
+    /// how many rows each clause produced alongside the child scan metrics.
+    metrics: ExecutionPlanMetricsSet,
 }
 
 impl MergeIntoCOWSinkExec {
@@ -73,6 +79,7 @@ impl MergeIntoCOWSinkExec {
             input,
             target,
             properties,
+            metrics: ExecutionPlanMetricsSet::new(),
         }
     }
 }
@@ -109,6 +116,13 @@ impl ExecutionPlan for MergeIntoCOWSinkExec {
         vec![&self.input]
     }
 
+    /// Surface per-clause row counts (updated / inserted / deleted) as
+    /// `EXPLAIN ANALYZE` metrics. Values are populated by `execute()` after
+    /// the write transaction commits; they're zero until then.
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
     fn with_new_children(
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
@@ -142,6 +156,16 @@ impl ExecutionPlan for MergeIntoCOWSinkExec {
         let updated_rows: Arc<AtomicI64> = Arc::new(AtomicI64::new(0));
         let inserted_rows: Arc<AtomicI64> = Arc::new(AtomicI64::new(0));
 
+        // `Count` metrics that surface in `EXPLAIN ANALYZE` as
+        // `metrics=[updated_rows=…, inserted_rows=…, deleted_rows=…]` on this
+        // node. Populated below after the write transaction commits.
+        let updated_rows_metric: Count =
+            MetricBuilder::new(&self.metrics).counter("updated_rows", partition);
+        let inserted_rows_metric: Count =
+            MetricBuilder::new(&self.metrics).counter("inserted_rows", partition);
+        let deleted_rows_metric: Count =
+            MetricBuilder::new(&self.metrics).counter("deleted_rows", partition);
+
         let coalesce = CoalescePartitionsExec::new(self.input.clone());
 
         // Filter out rows whoose __data_file_path doesn't have a matching row
@@ -163,6 +187,9 @@ impl ExecutionPlan for MergeIntoCOWSinkExec {
         let schema = schema.clone();
         let updated_rows = Arc::clone(&updated_rows);
         let inserted_rows = Arc::clone(&inserted_rows);
+        let updated_rows_metric = updated_rows_metric.clone();
+        let inserted_rows_metric = inserted_rows_metric.clone();
+        let deleted_rows_metric = deleted_rows_metric.clone();
         let projected_schema = count_and_project_stream.projected_schema();
         let batches: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
             projected_schema,
@@ -222,6 +249,13 @@ impl ExecutionPlan for MergeIntoCOWSinkExec {
                 // MERGE DELETE is not supported yet
                 let deleted = 0i64;
 
+                // Publish per-clause counts to the `MetricsSet` so
+                // `EXPLAIN ANALYZE` shows them on the MergeIntoSinkExec line.
+                // Rely on `try_from` so huge row counts fall back cleanly.
+                updated_rows_metric.add(usize::try_from(updated).unwrap_or(usize::MAX));
+                inserted_rows_metric.add(usize::try_from(inserted).unwrap_or(usize::MAX));
+                deleted_rows_metric.add(usize::try_from(deleted).unwrap_or(usize::MAX));
+
                 let arrays = schema
                     .fields()
                     .iter()
@@ -655,7 +689,13 @@ impl Stream for MergeCOWFilterStream {
                     .push(filtered_batch);
             }
 
-            if matching_data_and_manifest_files.is_empty() {
+            // Only take the fast paths if the current batch references no target file
+            // that will be (or has been) overwritten. Otherwise the full filter path
+            // below is required so target rows belonging to `all_matching_data_files`
+            // are re-emitted into the rewritten data file.
+            if matching_data_and_manifest_files.is_empty()
+                && all_matching_data_files.is_empty()
+            {
                 // Return early if all rows only come from source
                 if matching_data_file_array.len() == source_exists_array.len() {
                     return Poll::Ready(Some(Ok(batch)));
@@ -1210,4 +1250,24 @@ mod tests {
         &[(0, 2), (0, 1), (0, 6), (0, 5)],
         60
     );
+    // Regression test for https://github.com/Embucket/embucket/issues/128
+    //
+    // If a target file has been seen as "matching" in an earlier batch and a subsequent
+    // batch contains only target rows (no `__source_exists` = true rows) for that same
+    // file, the rows in the later batch must still be passed through the filter so they
+    // land in the rewritten data file. Previously the "no matches, no source" fast path
+    // dropped them, causing silent data loss during `MERGE INTO` on unsorted inputs.
+    test_merge_cow_filter_stream!(matching_then_target, &[(0, 4), (0, 1)], 20);
+    test_merge_cow_filter_stream!(
+        matching_then_target_then_matching,
+        &[(0, 4), (0, 1), (0, 4)],
+        30
+    );
+    // Mixed scenario: several target-only batches arriving AFTER the target file has
+    // been matched.
+    test_merge_cow_filter_stream!(
+        matching_then_multiple_target_batches,
+        &[(0, 4), (0, 1), (0, 1), (0, 1)],
+        40
+    );
 }
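
Beyond the `EXPLAIN ANALYZE` text output, the new `MetricsSet` also makes the sink counters readable in code through DataFusion's standard metrics API. A rough sketch (the helper is illustrative and not part of this change; it assumes you hold a reference to the executed sink node):

use datafusion_physical_plan::{metrics::MetricsSet, ExecutionPlan};

// Sketch only: aggregate the node's metrics and pull out the per-clause
// counters registered in execute(). Counters that were never populated
// (for example before the transaction commits) read back as zero.
fn merge_row_counts(sink: &dyn ExecutionPlan) -> Option<(usize, usize, usize)> {
    let metrics: MetricsSet = sink.metrics()?.aggregate_by_name();
    let count = |name: &str| metrics.sum_by_name(name).map_or(0, |v| v.as_usize());
    Some((
        count("updated_rows"),
        count("inserted_rows"),
        count("deleted_rows"),
    ))
}

`aggregate_by_name` collapses per-partition entries, so the same lookup works whether the sink ran with one partition or many.
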

crates/executor/src/query.rs

Lines changed: 63 additions & 4 deletions
@@ -57,7 +57,7 @@ use datafusion::sql::statement::object_name_to_string;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::{
     Column, DFSchema, DataFusionError, ParamValues, ResolvedTableReference, SchemaReference,
-    TableReference, plan_datafusion_err,
+    TableReference, ToDFSchema, plan_datafusion_err,
 };
 use datafusion_expr::conditional_expressions::CaseBuilder;
 use datafusion_expr::logical_plan::dml::{DmlStatement, InsertOp, WriteOp};
@@ -464,6 +464,50 @@ impl UserQuery {
             }
         } else if let DFStatement::CreateExternalTable(cetable) = statement {
             return Box::pin(self.create_external_table_query(cetable)).await;
+        } else if let DFStatement::Explain(explain) = &statement {
+            // DataFusion's default planner rejects `EXPLAIN MERGE INTO ...` as
+            // "Unsupported SQL statement: MERGE INTO" because MERGE has its
+            // own Embucket-side planner (`merge_query`). Intercept the case
+            // where the inner statement is a MERGE: build the merge logical
+            // plan ourselves, then wrap it in the equivalent `LogicalPlan::Explain`
+            // / `LogicalPlan::Analyze` that DataFusion's SQL path would have
+            // produced. This lets callers actually inspect the plan and see
+            // physical-level metrics via `EXPLAIN ANALYZE MERGE`.
+            if let DFStatement::Statement(inner) = explain.statement.as_ref()
+                && matches!(inner.as_ref(), Statement::Merge { .. })
+            {
+                let analyze = explain.analyze;
+                let verbose = explain.verbose;
+                let format = explain.format.clone();
+                let merge_stmt = (**inner).clone();
+                let merge_plan = Box::pin(self.merge_to_logical_plan(merge_stmt)).await?;
+                let merge_plan = Arc::new(merge_plan);
+                let schema = datafusion_expr::LogicalPlan::explain_schema()
+                    .to_dfschema_ref()
+                    .context(ex_error::DataFusionSnafu)?;
+                let wrapped = if analyze {
+                    LogicalPlan::Analyze(datafusion_expr::logical_plan::Analyze {
+                        verbose,
+                        input: merge_plan,
+                        schema,
+                    })
+                } else {
+                    let explain_format = match format.as_deref() {
+                        Some(f) => datafusion_expr::logical_plan::ExplainFormat::from_str(f)
+                            .unwrap_or(datafusion_expr::logical_plan::ExplainFormat::Indent),
+                        None => datafusion_expr::logical_plan::ExplainFormat::Indent,
+                    };
+                    LogicalPlan::Explain(datafusion_expr::logical_plan::Explain {
+                        verbose,
+                        explain_format,
+                        plan: merge_plan,
+                        stringified_plans: vec![],
+                        schema,
+                        logical_optimization_succeeded: false,
+                    })
+                };
+                return self.execute_logical_plan(wrapped).await;
+            }
         }
         self.execute_sql(&self.query).await
     }
@@ -1281,9 +1325,25 @@ impl UserQuery {
         }
     }
 
-    #[allow(clippy::too_many_lines)]
     #[instrument(name = "UserQuery::merge_query", level = "trace", skip(self), err)]
     pub async fn merge_query(&self, statement: Statement) -> Result<QueryResult> {
+        let plan = self.merge_to_logical_plan(statement).await?;
+        self.execute_logical_plan(plan).await
+    }
+
+    /// Builds the logical plan for a `MERGE INTO` statement without executing
+    /// it. Shared between `merge_query` (which runs the plan) and the
+    /// `DFStatement::Explain` routing in `execute` (which wraps it in
+    /// `LogicalPlan::Explain` / `LogicalPlan::Analyze` so callers can see the
+    /// plan or live physical metrics without a separate SQL path).
+    #[allow(clippy::too_many_lines)]
+    #[instrument(
+        name = "UserQuery::merge_to_logical_plan",
+        level = "trace",
+        skip(self),
+        err
+    )]
+    pub async fn merge_to_logical_plan(&self, statement: Statement) -> Result<LogicalPlan> {
         let Statement::Merge {
             table: target,
             source,
@@ -1499,10 +1559,9 @@ impl UserQuery {
             )
             .context(ex_error::DataFusionSnafu)?;
 
-        self.execute_logical_plan(LogicalPlan::Extension(Extension {
+        Ok(LogicalPlan::Extension(Extension {
             node: Arc::new(merge_into_plan),
         }))
-        .await
     }
 
     #[instrument(name = "UserQuery::create_database", level = "trace", skip(self), err)]
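
For reference, the `Explain` / `Analyze` wrapping used above is plain DataFusion and can be exercised against a stock `SessionContext` as well. A hedged sketch (no Embucket error types; the plan is assumed to have been built elsewhere, e.g. by a custom planner like `merge_to_logical_plan`):

use std::sync::Arc;
use datafusion::common::ToDFSchema;
use datafusion::error::Result;
use datafusion::logical_expr::logical_plan::{Explain, ExplainFormat};
use datafusion::logical_expr::LogicalPlan;
use datafusion::prelude::SessionContext;

// Sketch only: wrap a pre-built plan the way the MERGE EXPLAIN routing does,
// then let DataFusion render the logical/physical plan rows.
async fn explain_plan(ctx: &SessionContext, plan: LogicalPlan) -> Result<()> {
    let wrapped = LogicalPlan::Explain(Explain {
        verbose: false,
        explain_format: ExplainFormat::Indent,
        plan: Arc::new(plan),
        stringified_plans: vec![],
        schema: LogicalPlan::explain_schema().to_dfschema_ref()?,
        logical_optimization_succeeded: false,
    });
    ctx.execute_logical_plan(wrapped).await?.show().await
}

Swapping the wrapper for `LogicalPlan::Analyze` gives the `EXPLAIN ANALYZE` behaviour: the plan executes and per-node metrics are collected before formatting.
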

crates/executor/src/tests/query.rs

Lines changed: 5 additions & 0 deletions
@@ -201,6 +201,11 @@ macro_rules! test_query {
         settings.add_filter(r"(?i)\b(metadata_load_time|time_elapsed_opening|time_elapsed_processing|time_elapsed_scanning_total|time_elapsed_scanning_until_data|elapsed_compute|bloom_filter_eval_time|page_index_eval_time|row_pushdown_eval_time|statistics_eval_time)\s*=\s*[0-9]+(?:\.[0-9]+)?\s*(?:ns|µs|us|ms|s)", "$1=[TIME]");
         settings.add_filter(r"(-{130})(-{1,})", "$1");
         settings.add_filter(r"( {100})( {1,})", "$1");
+        // RoundRobinBatch fan-out equals the DataFusion planner's partition
+        // target, which in practice is the host CPU count. Normalize it so
+        // EXPLAIN snapshots don't flake between 4-core CI and dev boxes with
+        // different core counts.
+        settings.add_filter(r"RoundRobinBatch\(\d+\)", "RoundRobinBatch([N])");
 
         let setup: Vec<&str> = vec![$($($setup_queries),*)?];
         if !setup.is_empty() {

crates/executor/src/tests/sql/ddl/merge_into.rs

Lines changed: 45 additions & 0 deletions
@@ -1,5 +1,28 @@
 use crate::test_query;
 
+// Observability: `EXPLAIN MERGE INTO ...` must work. Before the routing
+// fix, Embucket rejected it with
+// "SQL compilation error: unsupported feature: Unsupported SQL statement:
+// MERGE INTO" because `execute()` never unwrapped
+// `DFStatement::Explain(..MERGE..)` and fell through to DataFusion's default
+// SQL path, which doesn't know about Embucket's MERGE planner.
+//
+// This test covers the plan shape only. `EXPLAIN ANALYZE MERGE` is
+// exercised end-to-end against the deployed Lambda — its output contains
+// per-run metric values whose width varies the formatted-table column
+// padding, which is too unstable for an insta snapshot.
+test_query!(
+    merge_into_explain,
+    "EXPLAIN MERGE INTO merge_target USING merge_source ON merge_target.id = merge_source.id WHEN MATCHED THEN UPDATE SET merge_target.description = merge_source.description",
+    setup_queries = [
+        "CREATE TABLE embucket.public.merge_target (ID INTEGER, description VARCHAR)",
+        "CREATE TABLE embucket.public.merge_source (ID INTEGER, description VARCHAR)",
+        "INSERT INTO embucket.public.merge_target VALUES (1, 'existing row')",
+        "INSERT INTO embucket.public.merge_source VALUES (1, 'updated row')",
+    ],
+    snapshot_path = "merge_into"
+);
+
 test_query!(
     merge_into_only_update,
     "SELECT count(CASE WHEN description = 'updated row' THEN 1 ELSE NULL END) updated, count(CASE WHEN description = 'existing row' THEN 1 ELSE NULL END) existing FROM embucket.public.merge_target",
@@ -299,3 +322,25 @@ test_query!(
     ],
     snapshot_path = "merge_into"
 );
+
+// Regression test for https://github.com/Embucket/embucket/issues/128.
+//
+// Target is one data file with many rows; source is a mix of updates (matches) and
+// inserts (no match), and the target rows of the join land in the filter stream in
+// batches where some contain source_exists=true rows and some only contain target
+// rows. Previously the "no matches, no source" fast path would silently drop the
+// target-only batches for a file that had already been marked as matching in an
+// earlier batch, causing the final row count to be less than the expected
+// (target_rows + new_source_rows). This test asserts that no target row is lost.
+test_query!(
+    merge_into_mixed_unsorted_multi_row_no_data_loss,
+    "SELECT COUNT(*) as total_rows, COUNT(CASE WHEN description = 'updated row' THEN 1 END) as updated_rows, COUNT(CASE WHEN description = 'original row' THEN 1 END) as preserved_rows, COUNT(CASE WHEN description = 'new row' THEN 1 END) as inserted_rows FROM embucket.public.merge_target",
+    setup_queries = [
+        "CREATE TABLE embucket.public.merge_target (id INTEGER, description VARCHAR)",
+        "CREATE TABLE embucket.public.merge_source (id INTEGER, description VARCHAR)",
+        "INSERT INTO embucket.public.merge_target VALUES (1, 'original row'), (2, 'original row'), (3, 'original row'), (4, 'original row'), (5, 'original row'), (6, 'original row'), (7, 'original row'), (8, 'original row'), (9, 'original row'), (10, 'original row')",
+        "INSERT INTO embucket.public.merge_source VALUES (3, 'updated row'), (7, 'updated row'), (11, 'new row'), (12, 'new row')",
+        "MERGE INTO merge_target t USING merge_source s ON t.id = s.id WHEN MATCHED THEN UPDATE SET t.description = s.description WHEN NOT MATCHED THEN INSERT (id, description) VALUES (s.id, s.description)",
+    ],
+    snapshot_path = "merge_into"
+);
