From b940b915093a7b4353624a1d2fb77a3fb4519b31 Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Tue, 24 Mar 2026 20:04:26 +0800 Subject: [PATCH 1/5] fix(sql): support unsigned ASOF join keys (#19570) --- src/query/expression/src/types.rs | 24 +++++++++++ src/query/expression/tests/it/types.rs | 35 ++++++++++++++++ .../bind_table_reference/bind_asof_join.rs | 4 +- .../duckdb/join/asof/test_asof_join_ints.test | 41 +++++++++++++++++++ 4 files changed, 102 insertions(+), 2 deletions(-) diff --git a/src/query/expression/src/types.rs b/src/query/expression/src/types.rs index 48e1e73cac1d5..eb0c4f1312ed4 100755 --- a/src/query/expression/src/types.rs +++ b/src/query/expression/src/types.rs @@ -294,6 +294,18 @@ impl DataType { DataType::Number(NumberDataType::Float32) => Ok(Scalar::Number(NumberScalar::Float32( OrderedFloat(f32::INFINITY), ))), + DataType::Number(NumberDataType::UInt8) => { + Ok(Scalar::Number(NumberScalar::UInt8(u8::MAX))) + } + DataType::Number(NumberDataType::UInt16) => { + Ok(Scalar::Number(NumberScalar::UInt16(u16::MAX))) + } + DataType::Number(NumberDataType::UInt32) => { + Ok(Scalar::Number(NumberScalar::UInt32(u32::MAX))) + } + DataType::Number(NumberDataType::UInt64) => { + Ok(Scalar::Number(NumberScalar::UInt64(u64::MAX))) + } DataType::Number(NumberDataType::Int32) => { Ok(Scalar::Number(NumberScalar::Int32(i32::MAX))) } @@ -322,6 +334,18 @@ impl DataType { DataType::Number(NumberDataType::Float32) => Ok(Scalar::Number(NumberScalar::Float32( OrderedFloat(f32::NEG_INFINITY), ))), + DataType::Number(NumberDataType::UInt8) => { + Ok(Scalar::Number(NumberScalar::UInt8(u8::MIN))) + } + DataType::Number(NumberDataType::UInt16) => { + Ok(Scalar::Number(NumberScalar::UInt16(u16::MIN))) + } + DataType::Number(NumberDataType::UInt32) => { + Ok(Scalar::Number(NumberScalar::UInt32(u32::MIN))) + } + DataType::Number(NumberDataType::UInt64) => { + Ok(Scalar::Number(NumberScalar::UInt64(u64::MIN))) + } DataType::Number(NumberDataType::Int32) => 
{ Ok(Scalar::Number(NumberScalar::Int32(i32::MIN))) } diff --git a/src/query/expression/tests/it/types.rs b/src/query/expression/tests/it/types.rs index df48cff10bae4..25584405453c0 100644 --- a/src/query/expression/tests/it/types.rs +++ b/src/query/expression/tests/it/types.rs @@ -15,8 +15,12 @@ use arrow_schema::Schema; use databend_common_expression::DataField; use databend_common_expression::DataSchema; +use databend_common_expression::Scalar; use databend_common_expression::arrow::deserialize_column; use databend_common_expression::arrow::serialize_column; +use databend_common_expression::types::DataType; +use databend_common_expression::types::NumberDataType; +use databend_common_expression::types::NumberScalar; use databend_common_expression::types::timestamp::timestamp_to_string; use jiff::fmt::strtime::BrokenDownTime; use jiff::tz; @@ -90,3 +94,34 @@ fn test_convert_types() { assert_eq!(c, c2, "in {idx} | datatype: {}", c.data_type()); } } + +#[test] +fn test_integer_infinity_boundaries() { + let cases = [ + ( + DataType::Number(NumberDataType::UInt8), + Scalar::Number(NumberScalar::UInt8(u8::MAX)), + Scalar::Number(NumberScalar::UInt8(u8::MIN)), + ), + ( + DataType::Number(NumberDataType::UInt16), + Scalar::Number(NumberScalar::UInt16(u16::MAX)), + Scalar::Number(NumberScalar::UInt16(u16::MIN)), + ), + ( + DataType::Number(NumberDataType::UInt32), + Scalar::Number(NumberScalar::UInt32(u32::MAX)), + Scalar::Number(NumberScalar::UInt32(u32::MIN)), + ), + ( + DataType::Number(NumberDataType::UInt64), + Scalar::Number(NumberScalar::UInt64(u64::MAX)), + Scalar::Number(NumberScalar::UInt64(u64::MIN)), + ), + ]; + + for (data_type, max_value, min_value) in cases { + assert_eq!(data_type.infinity().unwrap(), max_value, "{data_type:?}"); + assert_eq!(data_type.ninfinity().unwrap(), min_value, "{data_type:?}"); + } +} diff --git a/src/query/sql/src/planner/binder/bind_table_reference/bind_asof_join.rs 
b/src/query/sql/src/planner/binder/bind_table_reference/bind_asof_join.rs index 859cbf3b1be37..14fab6a02867b 100644 --- a/src/query/sql/src/planner/binder/bind_table_reference/bind_asof_join.rs +++ b/src/query/sql/src/planner/binder/bind_table_reference/bind_asof_join.rs @@ -164,13 +164,13 @@ impl Binder { .data_type()? .remove_nullable() .infinity() - .unwrap() + .map_err(ErrorCode::BadDataValueType)? } else { left_column .data_type()? .remove_nullable() .ninfinity() - .unwrap() + .map_err(ErrorCode::BadDataValueType)? }; ConstantExpr { span: left_column.span(), diff --git a/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join_ints.test b/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join_ints.test index 585e30726d7ed..3343606107b0c 100644 --- a/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join_ints.test +++ b/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join_ints.test @@ -8,6 +8,12 @@ drop table if exists events0 statement ok drop table if exists probe0 +statement ok +drop table if exists events_u8 + +statement ok +drop table if exists probe_u8 + # Join on a string range statement ok @@ -29,6 +35,23 @@ CREATE TABLE probe0 AS FROM range(0,10) vals(v)) ; +statement ok +CREATE TABLE events_u8 (begin TINYINT UNSIGNED, value INTEGER); + +statement ok +INSERT INTO events_u8 VALUES + (1, 10), + (3, 20), + (6, 30), + (8, 40) +; + +statement ok +CREATE TABLE probe_u8 AS + (SELECT v::TINYINT UNSIGNED AS begin + FROM range(0,10) vals(v)) +; + # This is not implemented yet because it requires a dedicated operator # instead of LEAD(...infinity::INTEGER) @@ -85,3 +108,21 @@ ORDER BY p.begin ASC, e.value ASC 9 3 NULL -1 NULL 9 + +# LEFT ON inequality only for unsigned integer keys +query II +SELECT p.begin, e.value +FROM probe_u8 p ASOF LEFT JOIN events_u8 e +ON p.begin >= e.begin +ORDER BY p.begin ASC +---- +0 NULL +1 10 +2 10 +3 20 +4 20 +5 20 +6 30 +7 30 +8 40 +9 40 From 251c0d4e6202d27004eca3fb3d716c3c0be84c38 Mon Sep 17 00:00:00 2001 From: 
sundyli <543950155@qq.com> Date: Wed, 25 Mar 2026 07:31:42 +0800 Subject: [PATCH 2/5] fix(sql): preserve open-ended ASOF unsigned bounds --- src/query/expression/src/types.rs | 24 --- src/query/expression/tests/it/types.rs | 35 ---- .../bind_table_reference/bind_asof_join.rs | 175 +++++++++++++----- .../duckdb/join/asof/test_asof_join_ints.test | 44 +++++ 4 files changed, 171 insertions(+), 107 deletions(-) diff --git a/src/query/expression/src/types.rs b/src/query/expression/src/types.rs index eb0c4f1312ed4..48e1e73cac1d5 100755 --- a/src/query/expression/src/types.rs +++ b/src/query/expression/src/types.rs @@ -294,18 +294,6 @@ impl DataType { DataType::Number(NumberDataType::Float32) => Ok(Scalar::Number(NumberScalar::Float32( OrderedFloat(f32::INFINITY), ))), - DataType::Number(NumberDataType::UInt8) => { - Ok(Scalar::Number(NumberScalar::UInt8(u8::MAX))) - } - DataType::Number(NumberDataType::UInt16) => { - Ok(Scalar::Number(NumberScalar::UInt16(u16::MAX))) - } - DataType::Number(NumberDataType::UInt32) => { - Ok(Scalar::Number(NumberScalar::UInt32(u32::MAX))) - } - DataType::Number(NumberDataType::UInt64) => { - Ok(Scalar::Number(NumberScalar::UInt64(u64::MAX))) - } DataType::Number(NumberDataType::Int32) => { Ok(Scalar::Number(NumberScalar::Int32(i32::MAX))) } @@ -334,18 +322,6 @@ impl DataType { DataType::Number(NumberDataType::Float32) => Ok(Scalar::Number(NumberScalar::Float32( OrderedFloat(f32::NEG_INFINITY), ))), - DataType::Number(NumberDataType::UInt8) => { - Ok(Scalar::Number(NumberScalar::UInt8(u8::MIN))) - } - DataType::Number(NumberDataType::UInt16) => { - Ok(Scalar::Number(NumberScalar::UInt16(u16::MIN))) - } - DataType::Number(NumberDataType::UInt32) => { - Ok(Scalar::Number(NumberScalar::UInt32(u32::MIN))) - } - DataType::Number(NumberDataType::UInt64) => { - Ok(Scalar::Number(NumberScalar::UInt64(u64::MIN))) - } DataType::Number(NumberDataType::Int32) => { Ok(Scalar::Number(NumberScalar::Int32(i32::MIN))) } diff --git 
a/src/query/expression/tests/it/types.rs b/src/query/expression/tests/it/types.rs index 25584405453c0..df48cff10bae4 100644 --- a/src/query/expression/tests/it/types.rs +++ b/src/query/expression/tests/it/types.rs @@ -15,12 +15,8 @@ use arrow_schema::Schema; use databend_common_expression::DataField; use databend_common_expression::DataSchema; -use databend_common_expression::Scalar; use databend_common_expression::arrow::deserialize_column; use databend_common_expression::arrow::serialize_column; -use databend_common_expression::types::DataType; -use databend_common_expression::types::NumberDataType; -use databend_common_expression::types::NumberScalar; use databend_common_expression::types::timestamp::timestamp_to_string; use jiff::fmt::strtime::BrokenDownTime; use jiff::tz; @@ -94,34 +90,3 @@ fn test_convert_types() { assert_eq!(c, c2, "in {idx} | datatype: {}", c.data_type()); } } - -#[test] -fn test_integer_infinity_boundaries() { - let cases = [ - ( - DataType::Number(NumberDataType::UInt8), - Scalar::Number(NumberScalar::UInt8(u8::MAX)), - Scalar::Number(NumberScalar::UInt8(u8::MIN)), - ), - ( - DataType::Number(NumberDataType::UInt16), - Scalar::Number(NumberScalar::UInt16(u16::MAX)), - Scalar::Number(NumberScalar::UInt16(u16::MIN)), - ), - ( - DataType::Number(NumberDataType::UInt32), - Scalar::Number(NumberScalar::UInt32(u32::MAX)), - Scalar::Number(NumberScalar::UInt32(u32::MIN)), - ), - ( - DataType::Number(NumberDataType::UInt64), - Scalar::Number(NumberScalar::UInt64(u64::MAX)), - Scalar::Number(NumberScalar::UInt64(u64::MIN)), - ), - ]; - - for (data_type, max_value, min_value) in cases { - assert_eq!(data_type.infinity().unwrap(), max_value, "{data_type:?}"); - assert_eq!(data_type.ninfinity().unwrap(), min_value, "{data_type:?}"); - } -} diff --git a/src/query/sql/src/planner/binder/bind_table_reference/bind_asof_join.rs b/src/query/sql/src/planner/binder/bind_table_reference/bind_asof_join.rs index 14fab6a02867b..4e1b7355c2ee7 100644 --- 
a/src/query/sql/src/planner/binder/bind_table_reference/bind_asof_join.rs +++ b/src/query/sql/src/planner/binder/bind_table_reference/bind_asof_join.rs @@ -84,40 +84,32 @@ impl Binder { std::mem::swap(&mut condition.left, &mut condition.right) } - let span = right_column.span(); - let arguments = [ - right_column, - BoundColumnRef { - span, - column: ColumnBindingBuilder::new( - window_func.display_name.clone(), - window_info.index, - Box::new(window_func.func.return_type()), - Visibility::Visible, - ) - .build(), - } - .into(), - ] - .to_vec(); - let func_name = match range_func.func_name.as_str() { GTE => LT, GT => LTE, LT => GTE, LTE => GT, _ => unreachable!(), + }; + let span = right_column.span(); + let lead_column = BoundColumnRef { + span, + column: ColumnBindingBuilder::new( + window_func.display_name.clone(), + window_info.index, + Box::new(window_func.func.return_type()), + Visibility::Visible, + ) + .build(), } - .to_string(); - join.non_equi_conditions.push( - FunctionCall { - span: range_func.span, - params: vec![], - arguments, + .into(); + join.non_equi_conditions + .push(make_asof_interval_end_condition( + range_func.span, + right_column, + lead_column, func_name, - } - .into(), - ); + )); let window_plan = bind_window_function_info(&self.ctx, window_info, right)?; Ok(SExpr::create_binary( @@ -158,26 +150,6 @@ impl Binder { _ => unreachable!(), }; - let constant_default = { - let value = if asc { - left_column - .data_type()? - .remove_nullable() - .infinity() - .map_err(ErrorCode::BadDataValueType)? - } else { - left_column - .data_type()? - .remove_nullable() - .ninfinity() - .map_err(ErrorCode::BadDataValueType)? 
- }; - ConstantExpr { - span: left_column.span(), - value, - } - }; - let order_items = vec![WindowOrderBy { expr: left_column.clone(), asc: Some(asc), @@ -189,12 +161,13 @@ impl Binder { partition_items.push(condition.right.clone()); } + let return_type = asof_window_result_type(&left_column.data_type()?); let func_type = WindowFuncType::LagLead(LagLeadFunction { is_lag: false, - return_type: Box::new(left_column.data_type()?.clone()), + return_type: Box::new(return_type), arg: Box::new(left_column), offset: 1, - default: Some(Box::new(constant_default.into())), + default: None, }); let window_func = WindowFunc { @@ -217,6 +190,45 @@ impl Binder { } } +fn asof_window_result_type( + data_type: &databend_common_expression::types::DataType, +) -> databend_common_expression::types::DataType { + data_type.wrap_nullable() +} + +fn make_asof_interval_end_condition( + span: databend_common_ast::Span, + probe_key: ScalarExpr, + lead_key: ScalarExpr, + func_name: &str, +) -> ScalarExpr { + let compare = ScalarExpr::FunctionCall(FunctionCall { + span, + func_name: func_name.to_string(), + params: vec![], + arguments: vec![probe_key, lead_key.clone()], + }); + + ScalarExpr::FunctionCall(FunctionCall { + span, + func_name: "if".to_string(), + params: vec![], + arguments: vec![ + ScalarExpr::FunctionCall(FunctionCall { + span, + func_name: "is_not_null".to_string(), + params: vec![], + arguments: vec![lead_key], + }), + compare, + ScalarExpr::ConstantExpr(ConstantExpr { + span, + value: Scalar::Boolean(true), + }), + ], + }) +} + pub fn is_range_join_condition<'a>( expr: &'a ScalarExpr, left_prop: &RelationalProperty, @@ -252,3 +264,70 @@ pub fn is_range_join_condition<'a>( _ => None, } } + +#[cfg(test)] +mod tests { + use databend_common_expression::types::DataType; + use databend_common_expression::types::NumberDataType; + + use super::*; + use crate::Symbol; + + fn test_column(name: &str, index: usize, data_type: DataType) -> ScalarExpr { + BoundColumnRef { + span: None, + 
column: ColumnBindingBuilder::new( + name.to_string(), + Symbol::from_field_index(index), + Box::new(data_type), + Visibility::Visible, + ) + .build(), + } + .into() + } + + #[test] + fn test_asof_interval_end_condition_guards_open_tail_with_null_lead() { + let probe = test_column("probe", 0, DataType::Number(NumberDataType::UInt8)); + let lead = test_column( + "lead", + 1, + DataType::Number(NumberDataType::UInt8).wrap_nullable(), + ); + + let expr = make_asof_interval_end_condition(None, probe.clone(), lead.clone(), LT); + let ScalarExpr::FunctionCall(func) = expr else { + panic!("expected function call"); + }; + + assert_eq!(func.func_name, "if"); + assert_eq!(func.arguments.len(), 3); + + let ScalarExpr::FunctionCall(not_null) = &func.arguments[0] else { + panic!("expected is_not_null guard"); + }; + assert_eq!(not_null.func_name, "is_not_null"); + assert_eq!(not_null.arguments, vec![lead.clone()]); + + let ScalarExpr::FunctionCall(compare) = &func.arguments[1] else { + panic!("expected comparison branch"); + }; + assert_eq!(compare.func_name, LT); + assert_eq!(compare.arguments, vec![probe, lead]); + + let ScalarExpr::ConstantExpr(constant) = &func.arguments[2] else { + panic!("expected constant true branch"); + }; + assert_eq!(constant.value, Scalar::Boolean(true)); + } + + #[test] + fn test_asof_window_result_type_is_nullable() { + let data_type = DataType::Number(NumberDataType::UInt8); + assert_eq!( + asof_window_result_type(&data_type), + data_type.wrap_nullable() + ); + } +} diff --git a/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join_ints.test b/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join_ints.test index 3343606107b0c..184ee0935d6e3 100644 --- a/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join_ints.test +++ b/tests/sqllogictests/suites/duckdb/join/asof/test_asof_join_ints.test @@ -14,6 +14,12 @@ drop table if exists events_u8 statement ok drop table if exists probe_u8 +statement ok +drop table if exists 
events_u8_bounds + +statement ok +drop table if exists probe_u8_bounds + # Join on a string range statement ok @@ -52,6 +58,24 @@ CREATE TABLE probe_u8 AS FROM range(0,10) vals(v)) ; +statement ok +CREATE TABLE events_u8_bounds (begin TINYINT UNSIGNED, value INTEGER); + +statement ok +INSERT INTO events_u8_bounds VALUES + (0, 100), + (255, 200) +; + +statement ok +CREATE TABLE probe_u8_bounds (begin TINYINT UNSIGNED); + +statement ok +INSERT INTO probe_u8_bounds VALUES + (0), + (255) +; + # This is not implemented yet because it requires a dedicated operator # instead of LEAD(...infinity::INTEGER) @@ -126,3 +150,23 @@ ORDER BY p.begin ASC 7 30 8 40 9 40 + +# Boundary rows at the legal unsigned maximum still belong to the terminal interval. +query II +SELECT p.begin, e.value +FROM probe_u8_bounds p ASOF LEFT JOIN events_u8_bounds e +ON p.begin >= e.begin +ORDER BY p.begin ASC +---- +0 100 +255 200 + +# Boundary rows at the legal unsigned minimum still belong to the initial interval. +query II +SELECT p.begin, e.value +FROM probe_u8_bounds p ASOF LEFT JOIN events_u8_bounds e +ON p.begin <= e.begin +ORDER BY p.begin ASC +---- +0 100 +255 200 From 48881149de9c43fed4d0ae549f1a8b08d1d1389e Mon Sep 17 00:00:00 2001 From: kould Date: Wed, 8 Apr 2026 12:09:06 +0800 Subject: [PATCH 3/5] fix(query): preserve outer rows in merge ASOF joins --- .../transforms/range_join/merge_join_state.rs | 78 +++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/src/query/service/src/pipelines/processors/transforms/range_join/merge_join_state.rs b/src/query/service/src/pipelines/processors/transforms/range_join/merge_join_state.rs index f2a139f4c7072..4f08462022bac 100644 --- a/src/query/service/src/pipelines/processors/transforms/range_join/merge_join_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/range_join/merge_join_state.rs @@ -12,18 +12,36 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+use std::sync::atomic::Ordering; + use databend_common_exception::Result; +use databend_common_expression::BlockEntry; +use databend_common_expression::Column; use databend_common_expression::DataBlock; use databend_common_expression::RepeatIndex; use databend_common_expression::ScalarRef; use databend_common_expression::SortColumnDescription; +use databend_common_expression::Value; +use databend_common_expression::types::AccessType; +use databend_common_expression::types::NumberColumn; use databend_common_expression::types::NumberScalar; +use databend_common_expression::types::UInt64Type; use crate::pipelines::processors::transforms::range_join::RangeJoinState; use crate::pipelines::processors::transforms::range_join::filter_block; impl RangeJoinState { pub fn range_join(&self, task_id: usize) -> Result<Vec<DataBlock>> { + let partition_count = self.partition_count.load(Ordering::SeqCst) as usize; + if task_id >= partition_count { + if !self.left_match.read().is_empty() { + return Ok(vec![self.fill_outer(task_id, true)?]); + } else if !self.right_match.read().is_empty() { + return Ok(vec![self.fill_outer(task_id, false)?]); + } + return Ok(vec![DataBlock::empty()]); + } + let tasks = self.tasks.read(); let (left_idx, right_idx) = tasks[task_id]; let left_sorted_blocks = self.left_sorted_blocks.read(); @@ -59,6 +77,10 @@ impl RangeJoinState { let mut result_blocks = Vec::with_capacity(left_len); let left_table = self.left_table.read(); let right_table = self.right_table.read(); + let track_left_outer = !self.left_match.read().is_empty(); + let track_right_outer = !self.right_match.read().is_empty(); + let mut matched_left = Vec::with_capacity(left_len); + let mut matched_right = Vec::with_capacity(right_len); while i < left_len { if j == right_len { @@ -79,9 +101,12 @@ impl RangeJoinState { ) { let mut left_result_block = DataBlock::empty(); let mut right_buffer = Vec::with_capacity(right_len - j); + let mut right_match_buffer = Vec::with_capacity(right_len - j); + let mut
left_match_index = None; if let ScalarRef::Number(NumberScalar::Int64(left)) = unsafe { left_idx_col.index_unchecked(i) } { + left_match_index = Some((left - 1) as usize); left_result_block = left_table[left_idx].take_compacted_indices( &[RepeatIndex { row: ((left - 1) as usize - left_offset) as u32, @@ -95,6 +120,9 @@ impl RangeJoinState { unsafe { right_idx_col.index_unchecked(k) } { right_buffer.push(((-right - 1) as usize - right_offset) as u32); + if track_right_outer { + right_match_buffer.push(((-right - 1) as usize) as u64); + } } } if !left_result_block.is_empty() { @@ -102,9 +130,43 @@ impl RangeJoinState { right_table[right_idx].take(right_buffer.as_slice())?; // Merge left_result_block and right_result_block left_result_block.merge_block(right_result_block); + if track_right_outer { + left_result_block.add_entry(BlockEntry::new( + Value::Column(Column::Number(NumberColumn::UInt64( + right_match_buffer.into(), + ))), + || { + ( + databend_common_expression::types::DataType::Number( + databend_common_expression::types::NumberDataType::UInt64, + ), + left_result_block.num_rows(), + ) + }, + )); + } for filter in self.other_conditions.iter() { left_result_block = filter_block(left_result_block, filter)?; } + if track_left_outer && !left_result_block.is_empty() { + if let Some(left_match_index) = left_match_index { + matched_left.push(left_match_index); + } + } + if track_right_outer && !left_result_block.is_empty() { + let column = &left_result_block + .columns() + .last() + .unwrap() + .value() + .try_downcast::<UInt64Type>() + .unwrap(); + if let Value::Column(col) = column { + matched_right + .extend(UInt64Type::iter_column(col).map(|idx| idx as usize)); + } + left_result_block.pop_columns(1); + } result_blocks.push(left_result_block); } i += 1; @@ -112,6 +174,22 @@ impl RangeJoinState { j += 1; } } + + if track_left_outer && !matched_left.is_empty() { + let mut left_match = self.left_match.write(); + for idx in matched_left { + left_match.set(idx, true); + } + } +
+ if track_right_outer && !matched_right.is_empty() { + let mut right_match = self.right_match.write(); + for idx in matched_right { + right_match.set(idx, true); + } + } + + self.completed_pair.fetch_add(1, Ordering::SeqCst); Ok(result_blocks) } From 6f2a4a15b7542ec4c79ddbea50d81aa5cf9644d9 Mon Sep 17 00:00:00 2001 From: kould Date: Wed, 8 Apr 2026 14:57:34 +0800 Subject: [PATCH 4/5] chore(query): document outer ASOF merge join handling --- .../processors/transforms/range_join/merge_join_state.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/query/service/src/pipelines/processors/transforms/range_join/merge_join_state.rs b/src/query/service/src/pipelines/processors/transforms/range_join/merge_join_state.rs index 4f08462022bac..179a12375a1d0 100644 --- a/src/query/service/src/pipelines/processors/transforms/range_join/merge_join_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/range_join/merge_join_state.rs @@ -32,6 +32,11 @@ use crate::pipelines::processors::transforms::range_join::filter_block; impl RangeJoinState { pub fn range_join(&self, task_id: usize) -> Result<Vec<DataBlock>> { + // Merge range join originally only served Inner/Cross joins, so it could return + // matched pairs directly without tracking unmatched rows. ASOF LEFT/RIGHT joins now + // reuse this path after the nullable interval-end rewrite, which means we must record + // rows that survive `other_conditions` filtering and then run the existing outer-fill + // tasks for the remaining probe/build rows.
let partition_count = self.partition_count.load(Ordering::SeqCst) as usize; if task_id >= partition_count { if !self.left_match.read().is_empty() { From dad47cfb9c9accfc08e6dafaeafd915e0e6a3ba2 Mon Sep 17 00:00:00 2001 From: kould Date: Wed, 8 Apr 2026 15:51:48 +0800 Subject: [PATCH 5/5] fix(query): count failed merge range tasks before outer fill --- .../transforms/range_join/merge_join_state.rs | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/src/query/service/src/pipelines/processors/transforms/range_join/merge_join_state.rs b/src/query/service/src/pipelines/processors/transforms/range_join/merge_join_state.rs index 179a12375a1d0..eee9d3d0c6aa7 100644 --- a/src/query/service/src/pipelines/processors/transforms/range_join/merge_join_state.rs +++ b/src/query/service/src/pipelines/processors/transforms/range_join/merge_join_state.rs @@ -46,7 +46,27 @@ impl RangeJoinState { } return Ok(vec![DataBlock::empty()]); } + let result = self.range_join_partition(task_id); + self.completed_pair.fetch_add(1, Ordering::SeqCst); + result + } + + // Used by range join + fn sort_descriptions(&self, _: bool) -> Vec<SortColumnDescription> { + let op = &self.conditions[0].operator; + let asc = match op.as_str() { + "gt" | "gte" => false, + "lt" | "lte" => true, + _ => unreachable!(), + }; + vec![SortColumnDescription { + offset: 0, + asc, + nulls_first: true, + }] + } + fn range_join_partition(&self, task_id: usize) -> Result<Vec<DataBlock>> { let tasks = self.tasks.read(); let (left_idx, right_idx) = tasks[task_id]; let left_sorted_blocks = self.left_sorted_blocks.read(); @@ -194,24 +214,8 @@ impl RangeJoinState { } } - self.completed_pair.fetch_add(1, Ordering::SeqCst); Ok(result_blocks) } - - // Used by range join - fn sort_descriptions(&self, _: bool) -> Vec<SortColumnDescription> { - let op = &self.conditions[0].operator; - let asc = match op.as_str() { - "gt" | "gte" => false, - "lt" | "lte" => true, - _ => unreachable!(), - }; - vec![SortColumnDescription { - offset: 0, - asc, - nulls_first: true,
- }] - } } fn compare_scalar(left: &ScalarRef, right: &ScalarRef, op: &str) -> bool {