Skip to content

Commit 5709bbe

Browse files
authored
[FLINK-39293][table] MATCH_RECOGNIZE fails with SqlParserException in views
1 parent 92e67cf commit 5709bbe

3 files changed

Lines changed: 414 additions & 363 deletions

File tree

flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2202,15 +2202,21 @@ SqlNode TableRef3(ExprContext exprContext, boolean lateral) :
22022202
[ tableRef = ExtendTable(tableRef) ]
22032203
tableRef = Over(tableRef)
22042204
[ tableRef = Snapshot(tableRef) ]
2205-
[ tableRef = MatchRecognize(tableRef) ]
2205+
[
2206+
LOOKAHEAD(3)
2207+
tableRef = MatchRecognize(tableRef)
2208+
]
22062209
)
22072210
|
22082211
LOOKAHEAD(2)
22092212
[ <LATERAL> { lateral = true; } ]
22102213
tableRef = ParenthesizedExpression(exprContext)
22112214
tableRef = Over(tableRef)
22122215
tableRef = addLateral(tableRef, lateral)
2213-
[ tableRef = MatchRecognize(tableRef) ]
2216+
[
2217+
LOOKAHEAD(3)
2218+
tableRef = MatchRecognize(tableRef)
2219+
]
22142220
|
22152221
LOOKAHEAD(2)
22162222
[ <LATERAL> ] // "LATERAL" is implicit with "UNNEST", so ignore
@@ -3059,6 +3065,7 @@ void AddUnpivotValue(List<SqlNode> list) :
30593065
SqlMatchRecognize MatchRecognize(SqlNode tableRef) :
30603066
{
30613067
final Span s, s0, s1, s2;
3068+
final SqlIdentifier aliasBeforeMatch;
30623069
final SqlNodeList measureList;
30633070
final SqlNodeList partitionList;
30643071
final SqlNodeList orderList;
@@ -3073,6 +3080,12 @@ SqlMatchRecognize MatchRecognize(SqlNode tableRef) :
30733080
final SqlLiteral isStrictEnds;
30743081
}
30753082
{
3083+
[
3084+
<AS> aliasBeforeMatch = SimpleIdentifier() {
3085+
tableRef = SqlStdOperatorTable.AS.createCall(
3086+
Span.of(tableRef).end(this), tableRef, aliasBeforeMatch);
3087+
}
3088+
]
30763089
<MATCH_RECOGNIZE> { s = span(); checkNotJoin(tableRef); } <LPAREN>
30773090
(
30783091
<PARTITION> { s2 = span(); } <BY>
@@ -7210,7 +7223,7 @@ SqlCall MatchRecognizeCallWithModifier() :
72107223
{
72117224
final Span s;
72127225
final SqlOperator runningOp;
7213-
final SqlNode func;
7226+
final SqlNode e;
72147227
}
72157228
{
72167229
(
@@ -7219,8 +7232,8 @@ SqlCall MatchRecognizeCallWithModifier() :
72197232
<FINAL> { runningOp = SqlStdOperatorTable.FINAL; }
72207233
)
72217234
{ s = span(); }
7222-
func = NamedFunctionCall() {
7223-
return runningOp.createCall(s.end(func), func);
7235+
e = Expression3(ExprContext.ACCEPT_NON_QUERY) {
7236+
return runningOp.createCall(s.end(e), e);
72247237
}
72257238
}
72267239

flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java

Lines changed: 58 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -169,23 +169,35 @@
169169
* Default implementation of {@link SqlValidator}, the class was copied over because of
170170
* CALCITE-4554.
171171
*
172-
* <p>Lines 207 ~ 210, Flink improves error message for functions without appropriate arguments in
172+
* <p>Lines 219 ~ 222, Flink improves error message for functions without appropriate arguments in
173173
* handleUnresolvedFunction.
174174
*
175-
* <p>Lines 1275 ~ 1277, CALCITE-7217, should be removed after upgrading Calcite to 1.41.0.
175+
* <p>Lines 1287 ~ 1289, CALCITE-7217, should be removed after upgrading Calcite to 1.41.0.
176176
*
177-
* <p>Lines 2036 ~ 2050, Flink improves error message for functions without appropriate arguments in
177+
* <p>Lines 2048 ~ 2062, Flink improves error message for functions without appropriate arguments in
178178
* handleUnresolvedFunction at {@link SqlValidatorImpl#handleUnresolvedFunction}.
179179
*
180-
* <p>Lines 2576 ~ 2595, CALCITE-7217, CALCITE-7312 should be removed after upgrading Calcite to
180+
* <p>Lines 2475 ~ 2477, CALCITE-7471 should be removed after upgrading Calcite to 1.42.0.
181+
*
182+
* <p>Lines 2590 ~ 2609, CALCITE-7217, CALCITE-7312 should be removed after upgrading Calcite to
181183
* 1.42.0.
182184
*
183-
* <p>Line 2626 ~2644, set the correct scope for VECTOR_SEARCH.
185+
* <p>Line 2640 ~2658, set the correct scope for VECTOR_SEARCH.
184186
*
185-
* <p>Lines 3923 ~ 3927, 6602 ~ 6608 Flink improves Optimize the retrieval of sub-operands in
187+
* <p>Lines 3937 ~ 3941, 6612 ~ 6618 Flink improves Optimize the retrieval of sub-operands in
186188
* SqlCall when using NamedParameters at {@link SqlValidatorImpl#checkRollUp}.
187189
*
188-
* <p>Lines 5343 ~ 5349, FLINK-24352 Add null check for temporal table check on SqlSnapshot.
190+
* <p>Lines 5357 ~ 5363, FLINK-24352 Add null check for temporal table check on SqlSnapshot.
191+
*
192+
* <p>Lines 5784-5786, CALCITE-7466 should be removed after upgrading Calcite to 1.42.0.
193+
*
194+
* <p>Lines 5840-5842, CALCITE-7470 should be removed after upgrading Calcite to 1.42.0.
195+
*
196+
* <p>Lines 7267-7290, CALCITE-7486 should be removed after upgrading Calcite to 1.42.0.
197+
*
198+
* <p>Lines 7337-7354, CALCITE-7486 should be removed after upgrading Calcite to 1.42.0.
199+
*
200+
* <p>Lines 7399-7407, CALCITE-7486 should be removed after upgrading Calcite to 1.42.0.
189201
*/
190202
public class SqlValidatorImpl implements SqlValidatorWithHints {
191203
// ~ Static fields/initializers ---------------------------------------------
@@ -2460,7 +2472,9 @@ private SqlNode registerFrom(
24602472
enclosingNode,
24612473
alias,
24622474
forceNullable);
2463-
return node;
2475+
// ----- FLINK MODIFICATION BEGIN -----
2476+
return newNode;
2477+
// ----- FLINK MODIFICATION END -----
24642478

24652479
case PIVOT:
24662480
registerPivot(
@@ -5767,11 +5781,9 @@ private PairList<String, RelDataType> validateMeasure(
57675781
setValidatedNodeType(measure, type);
57685782

57695783
fields.add(alias, type);
5770-
sqlNodes.add(
5771-
SqlStdOperatorTable.AS.createCall(
5772-
SqlParserPos.ZERO,
5773-
expand,
5774-
new SqlIdentifier(alias, SqlParserPos.ZERO)));
5784+
// ----- FLINK MODIFICATION BEGIN -----
5785+
sqlNodes.add(expand);
5786+
// ----- FLINK MODIFICATION END -----
57755787
}
57765788

57775789
SqlNodeList list = new SqlNodeList(sqlNodes, measures.getParserPosition());
@@ -5825,11 +5837,9 @@ private void validateDefinitions(SqlMatchRecognize mr, MatchRecognizeScope scope
58255837

58265838
// Some extra work need required here.
58275839
// In PREV, NEXT, FINAL and LAST, only one pattern variable is allowed.
5828-
sqlNodes.add(
5829-
SqlStdOperatorTable.AS.createCall(
5830-
SqlParserPos.ZERO,
5831-
expand,
5832-
new SqlIdentifier(alias, SqlParserPos.ZERO)));
5840+
// ----- FLINK MODIFICATION BEGIN -----
5841+
sqlNodes.add(expand);
5842+
// ----- FLINK MODIFICATION END -----
58335843

58345844
final RelDataType type = deriveType(scope, expand);
58355845
if (!SqlTypeUtil.inBooleanFamily(type)) {
@@ -7254,19 +7264,31 @@ private class PatternValidator extends SqlBasicVisitor<@Nullable Set<String>> {
72547264
int firstLastCount;
72557265
int prevNextCount;
72567266
int aggregateCount;
7267+
// ----- FLINK MODIFICATION BEGIN -----
7268+
int index;
7269+
int argCount;
72577270

72587271
PatternValidator(boolean isMeasure) {
7259-
this(isMeasure, 0, 0, 0);
7272+
this(isMeasure, 0, 0, 0, 0, 0);
72607273
}
72617274

72627275
PatternValidator(
7263-
boolean isMeasure, int firstLastCount, int prevNextCount, int aggregateCount) {
7276+
boolean isMeasure,
7277+
int firstLastCount,
7278+
int prevNextCount,
7279+
int aggregateCount,
7280+
int index,
7281+
int argCount) {
72647282
this.isMeasure = isMeasure;
72657283
this.firstLastCount = firstLastCount;
72667284
this.prevNextCount = prevNextCount;
72677285
this.aggregateCount = aggregateCount;
7286+
this.index = index;
7287+
this.argCount = argCount;
72687288
}
72697289

7290+
// ----- FLINK MODIFICATION END -----
7291+
72707292
@Override
72717293
public Set<String> visit(SqlCall call) {
72727294
boolean isSingle = false;
@@ -7312,7 +7334,9 @@ public Set<String> visit(SqlCall call) {
73127334
call, Static.RESOURCE.patternRunningFunctionInDefine(call.toString()));
73137335
}
73147336

7315-
for (SqlNode node : operands) {
7337+
// ----- FLINK MODIFICATION BEGIN -----
7338+
for (int i = 0; i < operands.size(); i++) {
7339+
SqlNode node = operands.get(i);
73167340
if (node != null) {
73177341
vars.addAll(
73187342
requireNonNull(
@@ -7321,10 +7345,13 @@ public Set<String> visit(SqlCall call) {
73217345
isMeasure,
73227346
firstLastCount,
73237347
prevNextCount,
7324-
aggregateCount)),
7348+
aggregateCount,
7349+
i,
7350+
operands.size())),
73257351
() -> "node.accept(PatternValidator) for node " + node));
73267352
}
73277353
}
7354+
// ----- FLINK MODIFICATION END -----
73287355

73297356
if (isSingle) {
73307357
switch (kind) {
@@ -7369,7 +7396,15 @@ public Set<String> visit(SqlIdentifier identifier) {
73697396

73707397
@Override
73717398
public Set<String> visit(SqlLiteral literal) {
7372-
return ImmutableSet.of();
7399+
// ----- FLINK MODIFICATION BEGIN -----
7400+
if ((this.argCount == 1 || this.index < this.argCount - 1)
7401+
&& (this.firstLastCount > 0 || this.prevNextCount > 0)
7402+
&& !SqlUtil.isNull(literal)) {
7403+
return ImmutableSet.of(literal.toValue());
7404+
} else {
7405+
return ImmutableSet.of();
7406+
}
7407+
// ----- FLINK MODIFICATION END -----
73737408
}
73747409

73757410
@Override

0 commit comments

Comments
 (0)