4949import org .apache .flink .table .planner .plan .nodes .exec .utils .ExecNodeUtil ;
5050import org .apache .flink .table .planner .plan .schema .TableSourceTable ;
5151import org .apache .flink .table .planner .plan .utils .DeltaJoinUtil ;
52+ import org .apache .flink .table .planner .plan .utils .FunctionCallUtil ;
5253import org .apache .flink .table .planner .plan .utils .FunctionCallUtil .AsyncOptions ;
5354import org .apache .flink .table .planner .plan .utils .KeySelectorUtil ;
5455import org .apache .flink .table .planner .plan .utils .LookupJoinUtil ;
6061import org .apache .flink .table .runtime .operators .join .FlinkJoinType ;
6162import org .apache .flink .table .runtime .operators .join .deltajoin .AsyncDeltaJoinRunner ;
6263import org .apache .flink .table .runtime .operators .join .deltajoin .BinaryLookupHandler ;
64+ import org .apache .flink .table .runtime .operators .join .deltajoin .CascadedLookupHandler ;
65+ import org .apache .flink .table .runtime .operators .join .deltajoin .DeltaJoinHandlerBase ;
6366import org .apache .flink .table .runtime .operators .join .deltajoin .DeltaJoinHandlerChain ;
6467import org .apache .flink .table .runtime .operators .join .deltajoin .DeltaJoinRuntimeTree ;
6568import org .apache .flink .table .runtime .operators .join .deltajoin .LookupHandlerBase ;
69+ import org .apache .flink .table .runtime .operators .join .deltajoin .TailOutputDataHandler ;
6670import org .apache .flink .table .runtime .typeutils .InternalSerializers ;
6771import org .apache .flink .table .runtime .typeutils .InternalTypeInfo ;
6872import org .apache .flink .table .types .logical .RowType ;
@@ -499,6 +503,7 @@ protected Transformation<RowData> translateToPlanInternal(
499503
500504 private static LookupHandlerBase generateLookupHandler (
501505 boolean isBinaryLookup ,
506+ @ Nullable Integer id , // used for debug. `null` if it is a binary lookup
502507 DeltaJoinLookupChain .Node node ,
503508 Map <Integer , GeneratedFunction <AsyncFunction <RowData , Object >>>
504509 generatedFetcherCollector ,
@@ -507,6 +512,9 @@ private static LookupHandlerBase generateLookupHandler(
507512 FlinkTypeFactory typeFactory ,
508513 ClassLoader classLoader ,
509514 ExecNodeConfig config ) {
515+ Preconditions .checkArgument (
516+ isBinaryLookup == (id == null ), "Id should be null if it is binary lookup" );
517+
510518 final int [] sourceInputOrdinals = node .inputTableBinaryInputOrdinals ;
511519 final int lookupTableOrdinal = node .lookupTableBinaryInputOrdinal ;
512520 final RowType sourceStreamType =
@@ -584,8 +592,64 @@ private static LookupHandlerBase generateLookupHandler(
584592 node .lookupTableBinaryInputOrdinal );
585593 }
586594
587- // TODO FLINK-39233 Support cascaded delta join in runtime
588- throw new IllegalStateException ("Support later" );
595+ final RowType lookupResultPassThroughCalcRowType ;
596+ if (node .isLeftLookupRight ()) {
597+ lookupResultPassThroughCalcRowType =
598+ combineOutputRowType (
599+ sourceStreamType ,
600+ lookupSidePassThroughCalcRowType ,
601+ node .joinType ,
602+ typeFactory );
603+ } else {
604+ lookupResultPassThroughCalcRowType =
605+ combineOutputRowType (
606+ lookupSidePassThroughCalcRowType ,
607+ sourceStreamType ,
608+ swapJoinType (node .joinType ),
609+ typeFactory );
610+ }
611+
612+ GeneratedFilterCondition generatedRemainingCondition =
613+ node .deltaJoinSpec
614+ .getRemainingCondition ()
615+ .map (
616+ remainCond ->
617+ FilterCodeGenerator .generateFilterCondition (
618+ config ,
619+ planner .getFlinkContext ().getClassLoader (),
620+ remainCond ,
621+ lookupResultPassThroughCalcRowType ,
622+ GENERATED_JOIN_CONDITION_CLASS_NAME ))
623+ .orElse (null );
624+
625+ final RowDataKeySelector streamSideLookupKeySelector =
626+ KeySelectorUtil .getRowDataSelector (
627+ classLoader ,
628+ lookupKeysOnInputSide .stream ()
629+ .mapToInt (
630+ key -> {
631+ Preconditions .checkState (
632+ key instanceof FunctionCallUtil .FieldRef ,
633+ "Currently, delta join only supports to use field "
634+ + "reference as lookup key, but found %s" ,
635+ key .getClass ().getName ());
636+ return ((FunctionCallUtil .FieldRef ) key ).index ;
637+ })
638+ .toArray (),
639+ InternalTypeInfo .of (sourceStreamType ));
640+
641+ return new CascadedLookupHandler (
642+ id ,
643+ TypeConversions .fromLogicalToDataType (sourceStreamType ),
644+ lookupSideGeneratedFetcherWithType .dataType (),
645+ TypeConversions .fromLogicalToDataType (lookupSidePassThroughCalcRowType ),
646+ InternalSerializers .create (lookupSidePassThroughCalcRowType ),
647+ lookupSideGeneratedCalc ,
648+ generatedRemainingCondition ,
649+ streamSideLookupKeySelector ,
650+ node .inputTableBinaryInputOrdinals ,
651+ node .lookupTableBinaryInputOrdinal ,
652+ node .isLeftLookupRight ());
589653 }
590654
591655 private static RowDataKeySelector getUpsertKeySelector (
@@ -600,23 +664,6 @@ private static RowDataKeySelector getUpsertKeySelector(
600664 classLoader , finalUpsertKeys , InternalTypeInfo .of (rowType ));
601665 }
602666
603- private boolean enableCache (ReadableConfig config ) {
604- return config .get (ExecutionConfigOptions .TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED );
605- }
606-
607- /** Get the left cache size and right size. */
608- private Tuple2 <Long , Long > getCacheSize (ReadableConfig config ) {
609- long leftCacheSize =
610- config .get (ExecutionConfigOptions .TABLE_EXEC_DELTA_JOIN_LEFT_CACHE_SIZE );
611- long rightCacheSize =
612- config .get (ExecutionConfigOptions .TABLE_EXEC_DELTA_JOIN_RIGHT_CACHE_SIZE );
613- if ((leftCacheSize <= 0 || rightCacheSize <= 0 ) && enableCache (config )) {
614- throw new IllegalArgumentException (
615- "Cache size in delta join must be positive when enabling cache." );
616- }
617- return Tuple2 .of (leftCacheSize , rightCacheSize );
618- }
619-
620667 private abstract static class DeltaJoinOperatorFactoryBuilder {
621668 protected final PlannerBase planner ;
622669 protected final ExecNodeConfig config ;
@@ -651,6 +698,23 @@ public DeltaJoinOperatorFactoryBuilder(
651698 }
652699
653700 protected abstract StreamOperatorFactory <RowData > build ();
701+
702+ /** Get the left cache size and right size. */
703+ protected Tuple2 <Long , Long > getCacheSize (ReadableConfig config ) {
704+ long leftCacheSize =
705+ config .get (ExecutionConfigOptions .TABLE_EXEC_DELTA_JOIN_LEFT_CACHE_SIZE );
706+ long rightCacheSize =
707+ config .get (ExecutionConfigOptions .TABLE_EXEC_DELTA_JOIN_RIGHT_CACHE_SIZE );
708+ if ((leftCacheSize <= 0 || rightCacheSize <= 0 ) && enableCache (config )) {
709+ throw new IllegalArgumentException (
710+ "Cache size in delta join must be positive when enabling cache." );
711+ }
712+ return Tuple2 .of (leftCacheSize , rightCacheSize );
713+ }
714+
715+ protected boolean enableCache (ReadableConfig config ) {
716+ return config .get (ExecutionConfigOptions .TABLE_EXEC_DELTA_JOIN_CACHE_ENABLED );
717+ }
654718 }
655719
656720 private class DeltaJoinOperatorFactoryBuilderV1 extends DeltaJoinOperatorFactoryBuilder {
@@ -798,6 +862,7 @@ private DeltaJoinHandlerChain buildBinaryLookupHandlerChain(
798862 Collections .singletonList (
799863 generateLookupHandler (
800864 true , // isBinaryLookup
865+ null ,
801866 node ,
802867 generatedFetcherCollector ,
803868 deltaJoinTree ,
@@ -926,9 +991,10 @@ public StreamOperatorFactory<RowData> build() {
926991 Map <Integer , GeneratedFunction <AsyncFunction <RowData , Object >>>
927992 generatedFetcherCollector = new HashMap <>();
928993 DeltaJoinHandlerChain left2RightHandlerChain =
929- generateDeltaJoinHandlerChain (true , generatedFetcherCollector );
994+ generateDeltaJoinHandlerChain (true , leftStreamType , generatedFetcherCollector );
930995 DeltaJoinHandlerChain right2LeftHandlerChain =
931- generateDeltaJoinHandlerChain (false , generatedFetcherCollector );
996+ generateDeltaJoinHandlerChain (
997+ false , rightStreamType , generatedFetcherCollector );
932998 Preconditions .checkState (
933999 generatedFetcherCollector .size ()
9341000 == leftAllBinaryInputOrdinals .size ()
@@ -1008,6 +1074,7 @@ public StreamOperatorFactory<RowData> build() {
10081074
10091075 private DeltaJoinHandlerChain generateDeltaJoinHandlerChain (
10101076 boolean lookupRight ,
1077+ RowType streamRowType ,
10111078 Map <Integer , GeneratedFunction <AsyncFunction <RowData , Object >>>
10121079 generatedFetcherCollector ) {
10131080 int [] streamOwnedSourceOrdinals =
@@ -1029,6 +1096,7 @@ private DeltaJoinHandlerChain generateDeltaJoinHandlerChain(
10291096 Collections .singletonList (
10301097 generateLookupHandler (
10311098 true , // isBinaryLookup
1099+ null , // debug id
10321100 nodes .get (0 ),
10331101 generatedFetcherCollector ,
10341102 deltaJoinTree ,
@@ -1039,7 +1107,40 @@ private DeltaJoinHandlerChain generateDeltaJoinHandlerChain(
10391107 streamOwnedSourceOrdinals );
10401108 }
10411109
1042- throw new UnsupportedOperationException ("Support cascaded delta join operator later" );
1110+ final List <DeltaJoinHandlerBase > lookupJoinHandlers = new ArrayList <>();
1111+
1112+ // build delta join handler chain
1113+ for (int i = 0 ; i < nodes .size (); i ++) {
1114+ DeltaJoinLookupChain .Node node = nodes .get (i );
1115+ LookupHandlerBase lookupHandler =
1116+ generateLookupHandler (
1117+ false , // isBinaryLookup
1118+ i + 1 , // debug id
1119+ node ,
1120+ generatedFetcherCollector ,
1121+ deltaJoinTree ,
1122+ planner ,
1123+ typeFactory ,
1124+ classLoader ,
1125+ config );
1126+ lookupJoinHandlers .add (lookupHandler );
1127+ }
1128+ List <Integer > lookupSideAllBinaryInputOrdinals =
1129+ lookupRight ? rightAllBinaryInputOrdinals : leftAllBinaryInputOrdinals ;
1130+ int lookupSideTableOffset = lookupRight ? leftAllBinaryInputOrdinals .size () : 0 ;
1131+ lookupJoinHandlers .add (
1132+ new TailOutputDataHandler (
1133+ lookupSideAllBinaryInputOrdinals .stream ()
1134+ .mapToInt (i -> i + lookupSideTableOffset )
1135+ .toArray ()));
1136+
1137+ Preconditions .checkArgument (
1138+ streamRowType .getFieldCount ()
1139+ == deltaJoinTree
1140+ .getOutputRowTypeOnNode (streamOwnedSourceOrdinals , typeFactory )
1141+ .getFieldCount ());
1142+
1143+ return DeltaJoinHandlerChain .build (lookupJoinHandlers , streamOwnedSourceOrdinals );
10431144 }
10441145
10451146 private Set <Set <Integer >> getAllDrivenInputsWhenLookup (boolean lookupRight ) {
0 commit comments