Skip to content

Commit 829a8a2

Browse files
wirybeaverclaude
andcommitted
Add MERGE INTO types to datafusion-expr
Add new types to support MERGE INTO DML statements: - MergeIntoOp: Carries ON condition and WHEN clauses - MergeIntoClause: Single WHEN clause (kind, predicate, action) - MergeIntoClauseKind: Matched/NotMatched/NotMatchedByTarget/NotMatchedBySource - MergeIntoAction: Update/Insert/Delete actions Add WriteOp::MergeInto variant. Mark WriteOp #[non_exhaustive] so future variant additions are not a SemVer break for downstream matchers. Wire MergeInto through datafusion-proto: extend the DmlNode message with a MERGE_INTO type tag and a (boxed) MergeIntoOpNode payload field, and add matching serializers in to_proto.rs / from_proto.rs. The conversion goes through a single fallible helper, parse_write_op(&DmlNode, ...), which reads the payload when the tag is MergeInto. There is no From<dml_node::Type> for WriteOp that would have to panic when faced with a MergeInto tag without payload, and no From<&WriteOp> for dml_node::Type that would silently lose the payload — both directions are handled in one place. Document that MergeIntoClauseKind::NotMatched and NotMatchedByTarget are semantically equivalent and must be treated identically downstream, and provide two helpers so consumers don't have to remember: - is_not_matched_by_target(): predicate covering both spellings - canonical(): collapses NotMatched into NotMatchedByTarget Tests: - datafusion-expr unit tests for the helpers and WriteOp::MergeInto display - datafusion-proto round-trip test exercising all four clause kinds and all three action variants - datafusion-proto error-path tests for each defensive check in parse_merge_into_op/clause/action (missing on, missing payload, unknown clause kind tag, missing clause action, missing action oneof) Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent e62f06c commit 829a8a2

9 files changed

Lines changed: 1572 additions & 34 deletions

File tree

datafusion/expr/src/logical_plan/dml.rs

Lines changed: 149 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use arrow::datatypes::{DataType, Field, Schema};
2525
use datafusion_common::file_options::file_type::FileType;
2626
use datafusion_common::{DFSchemaRef, TableReference};
2727

28-
use crate::{LogicalPlan, TableSource};
28+
use crate::{Expr, LogicalPlan, TableSource};
2929

3030
/// Operator that copies the contents of a database to file(s)
3131
#[derive(Clone)]
@@ -227,7 +227,11 @@ impl PartialOrd for DmlStatement {
227227
/// The type of DML operation to perform.
228228
///
229229
/// See [`DmlStatement`] for more details.
230+
///
231+
/// Marked `#[non_exhaustive]` so adding new variants in future releases is
232+
/// not a SemVer break for downstream matchers.
230233
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
234+
#[non_exhaustive]
231235
pub enum WriteOp {
232236
/// `INSERT INTO` operation
233237
Insert(InsertOp),
@@ -239,6 +243,8 @@ pub enum WriteOp {
239243
Ctas,
240244
/// `TRUNCATE` operation
241245
Truncate,
246+
/// `MERGE INTO` operation
247+
MergeInto(MergeIntoOp),
242248
}
243249

244250
impl WriteOp {
@@ -250,6 +256,7 @@ impl WriteOp {
250256
WriteOp::Update => "Update",
251257
WriteOp::Ctas => "Ctas",
252258
WriteOp::Truncate => "Truncate",
259+
WriteOp::MergeInto(_) => "MergeInto",
253260
}
254261
}
255262
}
@@ -291,10 +298,151 @@ impl Display for InsertOp {
291298
}
292299
}
293300

301+
/// Describes a MERGE INTO operation's parameters.
302+
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
303+
pub struct MergeIntoOp {
304+
/// The join condition from `ON <expr>`.
305+
pub on: Expr,
306+
/// The WHEN clauses, in the order they appeared in the SQL.
307+
pub clauses: Vec<MergeIntoClause>,
308+
}
309+
310+
/// A single WHEN clause within a MERGE INTO statement.
311+
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
312+
pub struct MergeIntoClause {
313+
/// Whether this fires on matched or unmatched rows.
314+
pub kind: MergeIntoClauseKind,
315+
/// Optional additional predicate (`AND <expr>`).
316+
pub predicate: Option<Expr>,
317+
/// The action to take.
318+
pub action: MergeIntoAction,
319+
}
320+
321+
/// Which rows a MERGE WHEN clause applies to.
322+
///
323+
/// Mirrors `sqlparser::ast::MergeClauseKind` so that the SQL spelling is
324+
/// preserved through the logical plan.
325+
///
326+
/// **Note on `NotMatched` vs `NotMatchedByTarget`:** these two variants are
327+
/// semantically identical — both describe a source row that has no matching
328+
/// target row. `NotMatched` is the SQL standard short form (used by
329+
/// Snowflake, Postgres, SQL Server); `NotMatchedByTarget` is BigQuery's
330+
/// explicit form added for symmetry with `NotMatchedBySource`. Downstream
331+
/// consumers (planners, table providers, optimizers) MUST treat the two
332+
/// variants identically.
333+
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
334+
pub enum MergeIntoClauseKind {
335+
/// `WHEN MATCHED`
336+
Matched,
337+
/// `WHEN NOT MATCHED` — see type-level note for the equivalence with
338+
/// [`NotMatchedByTarget`](Self::NotMatchedByTarget).
339+
NotMatched,
340+
/// `WHEN NOT MATCHED BY TARGET` — see type-level note for the
341+
/// equivalence with [`NotMatched`](Self::NotMatched).
342+
NotMatchedByTarget,
343+
/// `WHEN NOT MATCHED BY SOURCE`
344+
NotMatchedBySource,
345+
}
346+
347+
impl MergeIntoClauseKind {
348+
/// True if this clause fires on a source row that has no matching target
349+
/// row. Returns `true` for both [`NotMatched`](Self::NotMatched) and
350+
/// [`NotMatchedByTarget`](Self::NotMatchedByTarget) (see the type-level
351+
/// note explaining why those two variants are semantically identical).
352+
///
353+
/// Prefer this predicate over hand-written `matches!` arms so the
354+
/// `NotMatched`/`NotMatchedByTarget` equivalence is enforced in one place.
355+
pub fn is_not_matched_by_target(&self) -> bool {
356+
matches!(self, Self::NotMatched | Self::NotMatchedByTarget)
357+
}
358+
359+
/// Collapse the SQL-spelling variants into the canonical three semantic
360+
/// categories: [`Matched`](Self::Matched),
361+
/// [`NotMatchedByTarget`](Self::NotMatchedByTarget) (covering both
362+
/// "NOT MATCHED" spellings), and
363+
/// [`NotMatchedBySource`](Self::NotMatchedBySource).
364+
///
365+
/// Use this in downstream `match` expressions when the SQL spelling
366+
/// distinction does not matter — e.g. in planners, optimizers, or
367+
/// table-provider dispatch.
368+
pub fn canonical(self) -> Self {
369+
match self {
370+
Self::NotMatched => Self::NotMatchedByTarget,
371+
other => other,
372+
}
373+
}
374+
}
375+
376+
/// The action for a single WHEN clause.
377+
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
378+
pub enum MergeIntoAction {
379+
/// `UPDATE SET col1 = expr1, col2 = expr2, ...`, stored as
380+
/// `(column_name, value_expr)` pairs.
381+
Update(Vec<(String, Expr)>),
382+
/// `INSERT (col1, col2, ...) VALUES (expr1, expr2, ...)`. `columns` may
383+
/// be empty, meaning all columns.
384+
Insert {
385+
columns: Vec<String>,
386+
values: Vec<Expr>,
387+
},
388+
Delete,
389+
}
390+
294391
fn make_count_schema() -> DFSchemaRef {
295392
Arc::new(
296393
Schema::new(vec![Field::new("count", DataType::UInt64, false)])
297394
.try_into()
298395
.unwrap(),
299396
)
300397
}
398+
399+
#[cfg(test)]
400+
mod tests {
401+
use super::*;
402+
use crate::{col, lit};
403+
404+
#[test]
405+
fn write_op_merge_into_name_and_display() {
406+
let op = WriteOp::MergeInto(MergeIntoOp {
407+
on: col("id").eq(col("source_id")),
408+
clauses: vec![MergeIntoClause {
409+
kind: MergeIntoClauseKind::Matched,
410+
predicate: Some(col("qty").gt(lit(0_i64))),
411+
action: MergeIntoAction::Update(vec![(
412+
"qty".to_string(),
413+
col("source_qty"),
414+
)]),
415+
}],
416+
});
417+
assert_eq!(op.name(), "MergeInto");
418+
assert_eq!(format!("{op}"), "MergeInto");
419+
}
420+
421+
#[test]
422+
fn merge_into_clause_kind_is_not_matched_by_target() {
423+
assert!(!MergeIntoClauseKind::Matched.is_not_matched_by_target());
424+
assert!(MergeIntoClauseKind::NotMatched.is_not_matched_by_target());
425+
assert!(MergeIntoClauseKind::NotMatchedByTarget.is_not_matched_by_target());
426+
assert!(!MergeIntoClauseKind::NotMatchedBySource.is_not_matched_by_target());
427+
}
428+
429+
#[test]
430+
fn merge_into_clause_kind_canonical_collapses_not_matched() {
431+
assert_eq!(
432+
MergeIntoClauseKind::NotMatched.canonical(),
433+
MergeIntoClauseKind::NotMatchedByTarget
434+
);
435+
assert_eq!(
436+
MergeIntoClauseKind::NotMatchedByTarget.canonical(),
437+
MergeIntoClauseKind::NotMatchedByTarget
438+
);
439+
assert_eq!(
440+
MergeIntoClauseKind::Matched.canonical(),
441+
MergeIntoClauseKind::Matched
442+
);
443+
assert_eq!(
444+
MergeIntoClauseKind::NotMatchedBySource.canonical(),
445+
MergeIntoClauseKind::NotMatchedBySource
446+
);
447+
}
448+
}

datafusion/expr/src/logical_plan/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@ pub use ddl::{
3636
CreateFunctionBody, CreateIndex, CreateMemoryTable, CreateView, DdlStatement,
3737
DropCatalogSchema, DropFunction, DropTable, DropView, OperateFunctionArg,
3838
};
39-
pub use dml::{DmlStatement, WriteOp};
39+
pub use dml::{
40+
DmlStatement, MergeIntoAction, MergeIntoClause, MergeIntoClauseKind, MergeIntoOp,
41+
WriteOp,
42+
};
4043
pub use plan::{
4144
Aggregate, Analyze, ColumnUnnestList, DescribeTable, Distinct, DistinctOn,
4245
EmptyRelation, Explain, ExplainOption, Extension, FetchType, Filter, Join,

datafusion/proto/proto/datafusion.proto

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,13 +300,63 @@ message DmlNode{
300300
INSERT_OVERWRITE = 4;
301301
INSERT_REPLACE = 5;
302302
TRUNCATE = 6;
303+
MERGE_INTO = 7;
303304
}
304305
Type dml_type = 1;
305306
LogicalPlanNode input = 2;
306307
TableReference table_name = 3;
307308
LogicalPlanNode target = 5;
309+
// Populated only when dml_type == MERGE_INTO.
310+
MergeIntoOpNode merge_into = 6;
308311
}
309312

313+
// Carries the ON condition and WHEN clauses of a MERGE INTO operation.
314+
message MergeIntoOpNode {
315+
LogicalExprNode on = 1;
316+
repeated MergeIntoClauseNode clauses = 2;
317+
}
318+
319+
// A single WHEN clause within a MERGE INTO statement.
320+
message MergeIntoClauseNode {
321+
enum Kind {
322+
MATCHED = 0;
323+
NOT_MATCHED = 1;
324+
NOT_MATCHED_BY_TARGET = 2;
325+
NOT_MATCHED_BY_SOURCE = 3;
326+
}
327+
Kind kind = 1;
328+
// Optional `AND <expr>` predicate. Absent when the clause has no predicate.
329+
LogicalExprNode predicate = 2;
330+
MergeIntoActionNode action = 3;
331+
}
332+
333+
// The action for a single WHEN clause.
334+
message MergeIntoActionNode {
335+
oneof action {
336+
MergeUpdateAction update = 1;
337+
MergeInsertAction insert = 2;
338+
MergeDeleteAction delete = 3;
339+
}
340+
}
341+
342+
message MergeUpdateAction {
343+
repeated MergeAssignment assignments = 1;
344+
}
345+
346+
message MergeAssignment {
347+
string column = 1;
348+
LogicalExprNode value = 2;
349+
}
350+
351+
message MergeInsertAction {
352+
// May be empty (meaning all columns).
353+
repeated string columns = 1;
354+
// One expression per inserted column.
355+
repeated LogicalExprNode values = 2;
356+
}
357+
358+
message MergeDeleteAction {}
359+
310360
message UnnestNode {
311361
LogicalPlanNode input = 1;
312362
repeated datafusion_common.Column exec_columns = 2;

0 commit comments

Comments
 (0)