Skip to content

Commit de207e1

Browse files
[FLINK-39515][table-planner] Fix compiled plan restore for built-in PTFs with default args
This closes #27996.
1 parent 5106d15 commit de207e1

10 files changed

Lines changed: 428 additions & 2 deletions

File tree

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonDeserializer.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.flink.table.planner.functions.bridging.BridgingSqlAggFunction;
3333
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
3434
import org.apache.flink.table.planner.functions.sql.BuiltInSqlOperator;
35+
import org.apache.flink.table.planner.functions.sql.SqlDefaultArgOperator;
3536
import org.apache.flink.table.planner.typeutils.SymbolUtil.SerializableSymbol;
3637

3738
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
@@ -374,7 +375,13 @@ private static RexNode deserializeCall(JsonNode jsonNode, SerdeContext serdeCont
374375
} else {
375376
callType = serdeContext.getRexBuilder().deriveReturnType(operator, rexOperands);
376377
}
377-
return serdeContext.getRexBuilder().makeCall(callType, operator, rexOperands);
378+
// SqlDefaultArgOperator is constructed per-call site by FlinkSqlCallBinding and not
379+
// registered in any operator table. Rebuild the typed Flink instance here.
380+
final SqlOperator effectiveOperator =
381+
operator.getKind() == SqlKind.DEFAULT
382+
? new SqlDefaultArgOperator(callType)
383+
: operator;
384+
return serdeContext.getRexBuilder().makeCall(callType, effectiveOperator, rexOperands);
378385
}
379386

380387
// --------------------------------------------------------------------------------------------

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/serde/RexNodeJsonSerdeTest.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import org.apache.flink.table.planner.calcite.RexTableArgCall.SortOrder;
4646
import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
4747
import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
48+
import org.apache.flink.table.planner.functions.sql.SqlDefaultArgOperator;
4849
import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils;
4950
import org.apache.flink.table.types.inference.TypeInference;
5051
import org.apache.flink.table.types.inference.TypeStrategies;
@@ -86,6 +87,7 @@
8687
import java.math.BigDecimal;
8788
import java.util.Arrays;
8889
import java.util.Collections;
90+
import java.util.List;
8991
import java.util.Map;
9092
import java.util.Optional;
9193
import java.util.Set;
@@ -841,7 +843,11 @@ private static Stream<RexNode> testRexNodeSerde() {
841843
0,
842844
new int[] {1},
843845
new int[] {0},
844-
new SortOrder[] {SortOrder.ASC_NULLS_LAST}));
846+
new SortOrder[] {SortOrder.ASC_NULLS_LAST}),
847+
rexBuilder.makeCall(
848+
FACTORY.createSqlType(SqlTypeName.VARCHAR),
849+
new SqlDefaultArgOperator(FACTORY.createSqlType(SqlTypeName.VARCHAR)),
850+
List.of()));
845851
}
846852

847853
// --------------------------------------------------------------------------------------------
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.planner.plan.nodes.exec.stream;
20+
21+
import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
22+
import org.apache.flink.table.test.program.TableTestProgram;
23+
24+
import java.util.List;
25+
26+
/**
27+
* Restore tests for the built-in {@link
28+
* org.apache.flink.table.functions.BuiltInFunctionDefinitions#FROM_CHANGELOG} PTF.
29+
*/
30+
public class FromChangelogRestoreTest extends RestoreTestBase {
31+
32+
public FromChangelogRestoreTest() {
33+
super(StreamExecProcessTableFunction.class);
34+
}
35+
36+
@Override
37+
public List<TableTestProgram> programs() {
38+
return List.of(FromChangelogTestPrograms.RETRACT_RESTORE);
39+
}
40+
}

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,46 @@ public class FromChangelogTestPrograms {
183183
+ "input => TABLE changelog_view)")
184184
.build();
185185

186+
// --------------------------------------------------------------------------------------------
187+
// Restore tests
188+
// --------------------------------------------------------------------------------------------
189+
190+
/**
191+
* Append source with retract op codes through FROM_CHANGELOG, split across a compiled-plan +
192+
* savepoint restore.
193+
*/
194+
public static final TableTestProgram RETRACT_RESTORE =
195+
TableTestProgram.of(
196+
"from-changelog-retract-restore",
197+
"FROM_CHANGELOG over an append CDC source restores via compiled plan "
198+
+ "+ savepoint")
199+
.setupTableSource(
200+
SourceTestStep.newBuilder("cdc_stream")
201+
.addSchema(SIMPLE_CDC_SCHEMA)
202+
.producedBeforeRestore(
203+
Row.of(1, "INSERT", "Alice"),
204+
Row.of(2, "INSERT", "Bob"))
205+
.producedAfterRestore(
206+
Row.of(1, "UPDATE_BEFORE", "Alice"),
207+
Row.of(1, "UPDATE_AFTER", "Alice2"),
208+
Row.of(2, "DELETE", "Bob"))
209+
.build())
210+
.setupTableSink(
211+
SinkTestStep.newBuilder("sink")
212+
.addSchema("id INT", "name STRING")
213+
.consumedBeforeRestore(
214+
Row.ofKind(RowKind.INSERT, 1, "Alice"),
215+
Row.ofKind(RowKind.INSERT, 2, "Bob"))
216+
.consumedAfterRestore(
217+
Row.ofKind(RowKind.UPDATE_BEFORE, 1, "Alice"),
218+
Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2"),
219+
Row.ofKind(RowKind.DELETE, 2, "Bob"))
220+
.build())
221+
.runSql(
222+
"INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
223+
+ "input => TABLE cdc_stream)")
224+
.build();
225+
186226
// --------------------------------------------------------------------------------------------
187227
// Error validation tests
188228
// --------------------------------------------------------------------------------------------
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.planner.plan.nodes.exec.stream;
20+
21+
import org.apache.flink.table.planner.plan.nodes.exec.testutils.RestoreTestBase;
22+
import org.apache.flink.table.test.program.TableTestProgram;
23+
24+
import java.util.List;
25+
26+
/**
27+
* Restore tests for the built-in {@link
28+
* org.apache.flink.table.functions.BuiltInFunctionDefinitions#TO_CHANGELOG} PTF.
29+
*/
30+
public class ToChangelogRestoreTest extends RestoreTestBase {
31+
32+
public ToChangelogRestoreTest() {
33+
super(StreamExecProcessTableFunction.class);
34+
}
35+
36+
@Override
37+
public List<TableTestProgram> programs() {
38+
return List.of(ToChangelogTestPrograms.RETRACT_RESTORE);
39+
}
40+
}

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,38 @@ public class ToChangelogTestPrograms {
8989
.runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE t)")
9090
.build();
9191

92+
/** Retract input through TO_CHANGELOG, split across a compiled-plan + savepoint restore. */
93+
public static final TableTestProgram RETRACT_RESTORE =
94+
TableTestProgram.of(
95+
"to-changelog-retract-restore",
96+
"TO_CHANGELOG over a retract source restores via compiled plan + "
97+
+ "savepoint")
98+
.setupTableSource(
99+
SourceTestStep.newBuilder("t")
100+
.addSchema(
101+
"name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT")
102+
.addMode(ChangelogMode.all())
103+
.producedBeforeRestore(
104+
Row.ofKind(RowKind.INSERT, "Alice", 10L),
105+
Row.ofKind(RowKind.INSERT, "Bob", 20L))
106+
.producedAfterRestore(
107+
Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 10L),
108+
Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L),
109+
Row.ofKind(RowKind.DELETE, "Bob", 20L))
110+
.build())
111+
.setupTableSink(
112+
SinkTestStep.newBuilder("sink")
113+
.addSchema("op STRING", "name STRING", "score BIGINT")
114+
.consumedBeforeRestore(
115+
"+I[INSERT, Alice, 10]", "+I[INSERT, Bob, 20]")
116+
.consumedAfterRestore(
117+
"+I[UPDATE_BEFORE, Alice, 10]",
118+
"+I[UPDATE_AFTER, Alice, 30]",
119+
"+I[DELETE, Bob, 20]")
120+
.build())
121+
.runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE t)")
122+
.build();
123+
92124
public static final TableTestProgram UPSERT_INPUT =
93125
TableTestProgram.of(
94126
"to-changelog-upsert-input",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
{
2+
"flinkVersion" : "2.4",
3+
"nodes" : [ {
4+
"id" : 1,
5+
"type" : "stream-exec-table-source-scan_2",
6+
"scanTableSource" : {
7+
"table" : {
8+
"identifier" : "`default_catalog`.`default_database`.`cdc_stream`",
9+
"resolvedTable" : {
10+
"schema" : {
11+
"columns" : [ {
12+
"name" : "id",
13+
"dataType" : "INT"
14+
}, {
15+
"name" : "op",
16+
"dataType" : "VARCHAR(2147483647)"
17+
}, {
18+
"name" : "name",
19+
"dataType" : "VARCHAR(2147483647)"
20+
} ]
21+
}
22+
}
23+
}
24+
},
25+
"outputType" : "ROW<`id` INT, `op` VARCHAR(2147483647), `name` VARCHAR(2147483647)>",
26+
"description" : "TableSourceScan(table=[[default_catalog, default_database, cdc_stream]], fields=[id, op, name])"
27+
}, {
28+
"id" : 2,
29+
"type" : "stream-exec-process-table-function_1",
30+
"inputProperties" : [ {
31+
"requiredDistribution" : {
32+
"type" : "UNKNOWN"
33+
},
34+
"damBehavior" : "PIPELINED",
35+
"priority" : 0
36+
} ],
37+
"outputType" : "ROW<`id` INT, `name` VARCHAR(2147483647)>",
38+
"description" : "ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], select=[id,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) name)])",
39+
"uid" : null,
40+
"functionCall" : {
41+
"kind" : "CALL",
42+
"internalName" : "$FROM_CHANGELOG$1",
43+
"operands" : [ {
44+
"kind" : "TABLE_ARG_CALL",
45+
"inputIndex" : 0,
46+
"partitionKeys" : [ ],
47+
"orderKeys" : [ ],
48+
"orderDirections" : [ ],
49+
"type" : "ROW<`id` INT, `op` VARCHAR(2147483647), `name` VARCHAR(2147483647)> NOT NULL"
50+
}, {
51+
"kind" : "CALL",
52+
"syntax" : "SPECIAL",
53+
"internalName" : "$DEFAULT$1",
54+
"type" : "DESCRIPTOR"
55+
}, {
56+
"kind" : "CALL",
57+
"syntax" : "SPECIAL",
58+
"internalName" : "$DEFAULT$1",
59+
"type" : "MAP<VARCHAR(2147483647), VARCHAR(2147483647)>"
60+
}, {
61+
"kind" : "CALL",
62+
"syntax" : "SPECIAL",
63+
"internalName" : "$DEFAULT$1",
64+
"type" : "DESCRIPTOR"
65+
}, {
66+
"kind" : "CALL",
67+
"syntax" : "SPECIAL",
68+
"internalName" : "$DEFAULT$1",
69+
"type" : "VARCHAR(2147483647)"
70+
} ],
71+
"type" : "ROW<`id` INT, `name` VARCHAR(2147483647)> NOT NULL"
72+
},
73+
"inputChangelogModes" : [ [ "INSERT" ] ],
74+
"outputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ]
75+
}, {
76+
"id" : 3,
77+
"type" : "stream-exec-sink_2",
78+
"configuration" : {
79+
"table.exec.sink.keyed-shuffle" : "AUTO",
80+
"table.exec.sink.not-null-enforcer" : "ERROR",
81+
"table.exec.sink.rowtime-inserter" : "ENABLED",
82+
"table.exec.sink.type-length-enforcer" : "IGNORE",
83+
"table.exec.sink.upsert-materialize" : "AUTO"
84+
},
85+
"dynamicTableSink" : {
86+
"table" : {
87+
"identifier" : "`default_catalog`.`default_database`.`sink`",
88+
"resolvedTable" : {
89+
"schema" : {
90+
"columns" : [ {
91+
"name" : "id",
92+
"dataType" : "INT"
93+
}, {
94+
"name" : "name",
95+
"dataType" : "VARCHAR(2147483647)"
96+
} ]
97+
}
98+
}
99+
}
100+
},
101+
"inputChangelogMode" : [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ],
102+
"upsertMaterializeStrategy" : "ADAPTIVE",
103+
"inputProperties" : [ {
104+
"requiredDistribution" : {
105+
"type" : "UNKNOWN"
106+
},
107+
"damBehavior" : "PIPELINED",
108+
"priority" : 0
109+
} ],
110+
"outputType" : "ROW<`id` INT, `name` VARCHAR(2147483647)>",
111+
"description" : "Sink(table=[default_catalog.default_database.sink], fields=[id, name])"
112+
} ],
113+
"edges" : [ {
114+
"source" : 1,
115+
"target" : 2,
116+
"shuffle" : {
117+
"type" : "FORWARD"
118+
},
119+
"shuffleMode" : "PIPELINED"
120+
}, {
121+
"source" : 2,
122+
"target" : 3,
123+
"shuffle" : {
124+
"type" : "FORWARD"
125+
},
126+
"shuffleMode" : "PIPELINED"
127+
} ]
128+
}

0 commit comments

Comments
 (0)