Skip to content

Commit c27fd0b

Browse files
authored
[refactor](be) Add operator IO wrappers (#64139)
### What problem does this PR solve? Issue Number: N/A Problem Summary: Pipeline operator source and sink paths need a common place to validate output and input blocks. Before this change, `sink` and `get_block` were the virtual override points, so common validation either had to stay in call sites or be duplicated across operator implementations. Root cause: the public operator data-flow entry points were also the polymorphic implementation hooks, which left no wrapper layer for shared checks. This change makes `DataSinkOperatorXBase::sink` and `OperatorXBase::get_block` non-virtual wrappers. The wrappers run `Block::check_type_and_column()` at the source/sink boundary and then dispatch to the new virtual `sink_impl` and `get_block_impl` methods. All pipeline operator implementations, exchange operators, scan operators, and related BE test mocks are migrated to the new impl methods. The scan projection path is updated to call the base `get_block` wrapper so the shared checks still apply.
1 parent 2471d75 commit c27fd0b

119 files changed

Lines changed: 165 additions & 131 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

be/src/exec/exchange/local_exchange_sink_operator.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ std::string LocalExchangeSinkLocalState::debug_string(int indentation_level) con
142142
return fmt::to_string(debug_string_buffer);
143143
}
144144

145-
Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, Block* in_block, bool eos) {
145+
Status LocalExchangeSinkOperatorX::sink_impl(RuntimeState* state, Block* in_block, bool eos) {
146146
auto& local_state = get_local_state(state);
147147
SCOPED_TIMER(local_state.exec_time_counter());
148148
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());

be/src/exec/exchange/local_exchange_sink_operator.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ class LocalExchangeSinkOperatorX final : public DataSinkOperatorX<LocalExchangeS
103103

104104
Status prepare(RuntimeState* state) override;
105105

106-
Status sink(RuntimeState* state, Block* in_block, bool eos) override;
106+
Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
107107

108108
void set_low_memory_mode(RuntimeState* state) override {
109109
auto& local_state = get_local_state(state);

be/src/exec/exchange/local_exchange_source_operator.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ std::string LocalExchangeSourceLocalState::debug_string(int indentation_level) c
8989
return fmt::to_string(debug_string_buffer);
9090
}
9191

92-
Status LocalExchangeSourceOperatorX::get_block(RuntimeState* state, Block* block, bool* eos) {
92+
Status LocalExchangeSourceOperatorX::get_block_impl(RuntimeState* state, Block* block, bool* eos) {
9393
auto& local_state = get_local_state(state);
9494
SCOPED_TIMER(local_state.exec_time_counter());
9595
RETURN_IF_ERROR(local_state._exchanger->get_block(

be/src/exec/exchange/local_exchange_source_operator.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ class LocalExchangeSourceOperatorX final : public OperatorX<LocalExchangeSourceL
7878
RowDescriptor& row_descriptor() override { return _child->row_descriptor(); }
7979
const RowDescriptor& row_desc() const override { return _child->row_desc(); }
8080

81-
Status get_block(RuntimeState* state, Block* block, bool* eos) override;
81+
Status get_block_impl(RuntimeState* state, Block* block, bool* eos) override;
8282

8383
bool is_source() const override { return true; }
8484

be/src/exec/operator/aggregation_sink_operator.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -988,7 +988,7 @@ Status AggSinkOperatorX::_check_agg_fn_output() {
988988
return Status::OK();
989989
}
990990

991-
Status AggSinkOperatorX::sink(doris::RuntimeState* state, Block* in_block, bool eos) {
991+
Status AggSinkOperatorX::sink_impl(doris::RuntimeState* state, Block* in_block, bool eos) {
992992
auto& local_state = get_local_state(state);
993993
SCOPED_TIMER(local_state.exec_time_counter());
994994
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());

be/src/exec/operator/aggregation_sink_operator.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ class AggSinkOperatorX MOCK_REMOVE(final) : public DataSinkOperatorX<AggSinkLoca
154154

155155
Status prepare(RuntimeState* state) override;
156156

157-
Status sink(RuntimeState* state, Block* in_block, bool eos) override;
157+
Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
158158

159159
DataDistribution required_data_distribution(RuntimeState* state) const override {
160160
if (_partition_exprs.empty()) {

be/src/exec/operator/aggregation_source_operator.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -562,7 +562,7 @@ AggSourceOperatorX::AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode,
562562
_needs_finalize(tnode.agg_node.need_finalize),
563563
_without_key(tnode.agg_node.grouping_exprs.empty()) {}
564564

565-
Status AggSourceOperatorX::get_block(RuntimeState* state, Block* block, bool* eos) {
565+
Status AggSourceOperatorX::get_block_impl(RuntimeState* state, Block* block, bool* eos) {
566566
auto& local_state = get_local_state(state);
567567
SCOPED_TIMER(local_state.exec_time_counter());
568568
SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);

be/src/exec/operator/aggregation_source_operator.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ class AggSourceOperatorX : public OperatorX<AggLocalState> {
102102
AggSourceOperatorX() = default;
103103
#endif
104104

105-
Status get_block(RuntimeState* state, Block* block, bool* eos) override;
105+
Status get_block_impl(RuntimeState* state, Block* block, bool* eos) override;
106106

107107
bool is_source() const override { return true; }
108108

be/src/exec/operator/analytic_sink_operator.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -744,7 +744,7 @@ Status AnalyticSinkOperatorX::prepare(RuntimeState* state) {
744744
return Status::OK();
745745
}
746746

747-
Status AnalyticSinkOperatorX::sink(doris::RuntimeState* state, Block* input_block, bool eos) {
747+
Status AnalyticSinkOperatorX::sink_impl(doris::RuntimeState* state, Block* input_block, bool eos) {
748748
auto& local_state = get_local_state(state);
749749
SCOPED_TIMER(local_state.exec_time_counter());
750750
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)input_block->rows());

be/src/exec/operator/analytic_sink_operator.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt
209209

210210
Status prepare(RuntimeState* state) override;
211211

212-
Status sink(RuntimeState* state, Block* in_block, bool eos) override;
212+
Status sink_impl(RuntimeState* state, Block* in_block, bool eos) override;
213213
DataDistribution required_data_distribution(RuntimeState* /*state*/) const override {
214214
if (_partition_by_eq_expr_ctxs.empty()) {
215215
return {ExchangeType::PASSTHROUGH};

0 commit comments

Comments
 (0)