2121#include < Interpreters/JoinUtils.h>
2222#include < Interpreters/JoinV2/HashJoin.h>
2323#include < Interpreters/JoinV2/HashJoinProbe.h>
24+ #include < Interpreters/JoinV2/SemiJoinProbe.h>
2425#include < Interpreters/NullableUtils.h>
2526#include < Parsers/ASTTablesInSelectQuery.h>
2627
@@ -39,7 +40,9 @@ bool JoinProbeContext::isProbeFinished() const
3940{
4041 return current_row_idx >= rows
4142 // For prefetching
42- && prefetch_active_states == 0 ;
43+ && prefetch_active_states == 0
44+ // For (left outer) (anti) semi join with other conditions
45+ && (semi_join_probe_list == nullptr || semi_join_probe_list->activeSlots () == 0 );
4346}
4447
4548bool JoinProbeContext::isAllFinished () const
@@ -71,6 +74,7 @@ void JoinProbeContext::prepareForHashProbe(
7174 HashJoinKeyMethod method,
7275 ASTTableJoin::Kind kind,
7376 bool has_other_condition,
77+ bool has_other_eq_cond_from_in,
7478 const Names & key_names,
7579 const String & filter_column,
7680 const NameSet & probe_output_name_set,
@@ -123,6 +127,23 @@ void JoinProbeContext::prepareForHashProbe(
123127 {
124128 left_semi_match_res.clear ();
125129 left_semi_match_res.resize_fill_zero (rows);
130+ if (has_other_eq_cond_from_in)
131+ {
132+ left_semi_match_null_res.clear ();
133+ left_semi_match_null_res.resize_fill_zero (rows);
134+ }
135+ }
136+ if ((kind == Semi || kind == Anti) && has_other_condition)
137+ {
138+ semi_selective_offsets.clear ();
139+ semi_selective_offsets.reserve (rows);
140+ }
141+
142+ if (SemiJoinProbeHelper::isSupported (kind, has_other_condition))
143+ {
144+ if unlikely (!semi_join_probe_list)
145+ semi_join_probe_list = createSemiJoinProbeList (method);
146+ semi_join_probe_list->reset (rows);
126147 }
127148
128149 is_prepared = true ;
@@ -634,16 +655,17 @@ void JoinProbeHelper::probeFillColumns(JoinProbeContext & ctx, JoinProbeWorkerDa
634655 using Adder = JoinProbeAdder<kind, has_other_condition, late_materialization>;
635656
636657 auto & key_getter = *static_cast <KeyGetterType *>(ctx.key_getter .get ());
658+ // Some columns in wd.result_block may remain empty due to late materialization for join with other conditions.
659+ // But since all columns are cleared after handling other conditions, wd.result_block.rows() is always 0.
637660 size_t current_offset = wd.result_block .rows ();
661+ if constexpr (has_other_condition)
662+ RUNTIME_CHECK (current_offset == 0 );
638663 size_t idx = ctx.current_row_idx ;
639664 RowPtr ptr = ctx.current_build_row_ptr ;
640665 bool is_matched = ctx.current_row_is_matched ;
641666 size_t collision = 0 ;
642- size_t key_offset = sizeof (RowPtr);
643- if constexpr (KeyGetterType::joinKeyCompareHashFirst ())
644- {
645- key_offset += sizeof (HashValueType);
646- }
667+ constexpr size_t key_offset
668+ = sizeof (RowPtr) + (KeyGetterType::joinKeyCompareHashFirst () ? sizeof (HashValueType) : 0 );
647669
648670#define NOT_MATCHED (not_matched ) \
649671 if constexpr (Adder::need_not_matched) \
@@ -782,8 +804,10 @@ struct ProbePrefetchState
782804 KeyType key{};
783805 union
784806 {
785- RowPtr ptr = nullptr ;
786- std::atomic<RowPtr> * pointer_ptr;
807+ // / Used when stage is FindHeader
808+ std::atomic<RowPtr> * pointer_ptr = nullptr ;
809+ // / Used when stage is FindNext
810+ RowPtr ptr;
787811 };
788812};
789813
@@ -801,24 +825,26 @@ void JoinProbeHelper::probeFillColumnsPrefetch(
801825 using Adder = JoinProbeAdder<kind, has_other_condition, late_materialization>;
802826
803827 auto & key_getter = *static_cast <KeyGetterType *>(ctx.key_getter .get ());
804- if (!ctx.prefetch_states )
828+ const size_t probe_prefetch_step = settings.probe_prefetch_step ;
829+ if unlikely (!ctx.prefetch_states )
805830 {
806831 ctx.prefetch_states = decltype (ctx.prefetch_states )(
807- static_cast <void *>(new ProbePrefetchState<KeyGetter>[settings. probe_prefetch_step ]),
832+ static_cast <void *>(new ProbePrefetchState<KeyGetter>[probe_prefetch_step]),
808833 [](void * ptr) { delete[] static_cast <ProbePrefetchState<KeyGetter> *>(ptr); });
809834 }
810835 auto * states = static_cast <ProbePrefetchState<KeyGetter> *>(ctx.prefetch_states .get ());
811836
812837 size_t idx = ctx.current_row_idx ;
813838 size_t active_states = ctx.prefetch_active_states ;
814839 size_t k = ctx.prefetch_iter ;
840+ // Some columns in wd.result_block may remain empty due to late materialization for join with other conditions.
841+ // But since all columns are cleared after handling other conditions, wd.result_block.rows() is always 0.
815842 size_t current_offset = wd.result_block .rows ();
843+ if constexpr (has_other_condition)
844+ RUNTIME_CHECK (current_offset == 0 );
816845 size_t collision = 0 ;
817- size_t key_offset = sizeof (RowPtr);
818- if constexpr (KeyGetterType::joinKeyCompareHashFirst ())
819- {
820- key_offset += sizeof (HashValueType);
821- }
846+ constexpr size_t key_offset
847+ = sizeof (RowPtr) + (KeyGetterType::joinKeyCompareHashFirst () ? sizeof (HashValueType) : 0 );
822848
823849#define NOT_MATCHED (not_matched, idx ) \
824850 if constexpr (Adder::need_not_matched) \
@@ -831,7 +857,6 @@ void JoinProbeHelper::probeFillColumnsPrefetch(
831857 } \
832858 }
833859
834- const size_t probe_prefetch_step = settings.probe_prefetch_step ;
835860 while (idx < ctx.rows || active_states > 0 )
836861 {
837862 k = k == probe_prefetch_step ? 0 : k;
@@ -845,11 +870,10 @@ void JoinProbeHelper::probeFillColumnsPrefetch(
845870 const auto & key2 = key_getter.deserializeJoinKey (ptr + key_offset);
846871 bool key_is_equal = joinKeyIsEqual (key_getter, state->key , key2, state->hash , ptr);
847872 collision += !key_is_equal;
873+ if constexpr (Adder::need_not_matched)
874+ state->is_matched |= key_is_equal;
848875 if (key_is_equal)
849876 {
850- if constexpr (Adder::need_not_matched)
851- state->is_matched = true ;
852-
853877 if constexpr (Adder::need_matched)
854878 {
855879 bool is_end = Adder::addMatched (
@@ -1026,6 +1050,17 @@ Block JoinProbeHelper::handleOtherConditions(
10261050
10271051 non_equal_conditions.other_cond_expr ->execute (exec_block);
10281052
1053+ SCOPE_EXIT ({
1054+ RUNTIME_CHECK (wd.result_block .columns () == left_columns + right_columns);
1055+ // / Clear the data in result_block.
1056+ for (size_t i = 0 ; i < left_columns + right_columns; ++i)
1057+ {
1058+ auto column = wd.result_block .getByPosition (i).column ->assumeMutable ();
1059+ column->popBack (column->size ());
1060+ wd.result_block .getByPosition (i).column = std::move (column);
1061+ }
1062+ });
1063+
10291064 size_t rows = exec_block.rows ();
10301065 // Ensure BASE_OFFSETS is accessed within bound.
10311066 // It must be true because max_block_size <= BASE_OFFSETS.size(HASH_JOIN_MAX_BLOCK_SIZE_UPPER_BOUND).
@@ -1212,17 +1247,6 @@ Block JoinProbeHelper::handleOtherConditions(
12121247 }
12131248 };
12141249
1215- SCOPE_EXIT ({
1216- RUNTIME_CHECK (wd.result_block .columns () == left_columns + right_columns);
1217- // / Clear the data in result_block.
1218- for (size_t i = 0 ; i < left_columns + right_columns; ++i)
1219- {
1220- auto column = wd.result_block .getByPosition (i).column ->assumeMutable ();
1221- column->popBack (column->size ());
1222- wd.result_block .getByPosition (i).column = std::move (column);
1223- }
1224- });
1225-
12261250 size_t length = std::min (result_size, remaining_insert_size);
12271251 fill_matched (0 , length);
12281252 if (result_size >= remaining_insert_size)
0 commit comments