Skip to content

Commit b976e79

Browse files
authored
[FLINK-39293][table] MATCH_RECOGNIZE fails with SqlParserException in views
1 parent 66689e4 commit b976e79

3 files changed

Lines changed: 415 additions & 363 deletions

File tree

flink-table/flink-sql-parser/src/main/codegen/templates/Parser.jj

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2202,15 +2202,21 @@ SqlNode TableRef3(ExprContext exprContext, boolean lateral) :
22022202
[ tableRef = ExtendTable(tableRef) ]
22032203
tableRef = Over(tableRef)
22042204
[ tableRef = Snapshot(tableRef) ]
2205-
[ tableRef = MatchRecognize(tableRef) ]
2205+
[
2206+
LOOKAHEAD(3)
2207+
tableRef = MatchRecognize(tableRef)
2208+
]
22062209
)
22072210
|
22082211
LOOKAHEAD(2)
22092212
[ <LATERAL> { lateral = true; } ]
22102213
tableRef = ParenthesizedExpression(exprContext)
22112214
tableRef = Over(tableRef)
22122215
tableRef = addLateral(tableRef, lateral)
2213-
[ tableRef = MatchRecognize(tableRef) ]
2216+
[
2217+
LOOKAHEAD(3)
2218+
tableRef = MatchRecognize(tableRef)
2219+
]
22142220
|
22152221
LOOKAHEAD(2)
22162222
[ <LATERAL> ] // "LATERAL" is implicit with "UNNEST", so ignore
@@ -3059,6 +3065,7 @@ void AddUnpivotValue(List<SqlNode> list) :
30593065
SqlMatchRecognize MatchRecognize(SqlNode tableRef) :
30603066
{
30613067
final Span s, s0, s1, s2;
3068+
final SqlIdentifier aliasBeforeMatch;
30623069
final SqlNodeList measureList;
30633070
final SqlNodeList partitionList;
30643071
final SqlNodeList orderList;
@@ -3073,6 +3080,12 @@ SqlMatchRecognize MatchRecognize(SqlNode tableRef) :
30733080
final SqlLiteral isStrictEnds;
30743081
}
30753082
{
3083+
[
3084+
<AS> aliasBeforeMatch = SimpleIdentifier() {
3085+
tableRef = SqlStdOperatorTable.AS.createCall(
3086+
Span.of(tableRef).end(this), tableRef, aliasBeforeMatch);
3087+
}
3088+
]
30763089
<MATCH_RECOGNIZE> { s = span(); checkNotJoin(tableRef); } <LPAREN>
30773090
(
30783091
<PARTITION> { s2 = span(); } <BY>
@@ -7209,7 +7222,7 @@ SqlCall MatchRecognizeCallWithModifier() :
72097222
{
72107223
final Span s;
72117224
final SqlOperator runningOp;
7212-
final SqlNode func;
7225+
final SqlNode e;
72137226
}
72147227
{
72157228
(
@@ -7218,8 +7231,8 @@ SqlCall MatchRecognizeCallWithModifier() :
72187231
<FINAL> { runningOp = SqlStdOperatorTable.FINAL; }
72197232
)
72207233
{ s = span(); }
7221-
func = NamedFunctionCall() {
7222-
return runningOp.createCall(s.end(func), func);
7234+
e = Expression3(ExprContext.ACCEPT_NON_QUERY) {
7235+
return runningOp.createCall(s.end(e), e);
72237236
}
72247237
}
72257238

flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java

Lines changed: 59 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -169,22 +169,35 @@
169169
* Default implementation of {@link SqlValidator}, the class was copied over because of
170170
* CALCITE-4554.
171171
*
172-
* <p>Lines 202 ~ 205, Flink improves error message for functions without appropriate arguments in
172+
* <p>Lines 219 ~ 222, Flink improves error message for functions without appropriate arguments in
173173
* handleUnresolvedFunction.
174174
*
175-
* <p>Lines 1270 ~ 1272, CALCITE-7217, should be removed after upgrading Calcite to 1.41.0.
175+
* <p>Lines 1287 ~ 1289, CALCITE-7217, should be removed after upgrading Calcite to 1.41.0.
176176
*
177-
* <p>Lines 2031 ~ 2045, Flink improves error message for functions without appropriate arguments in
177+
* <p>Lines 2048 ~ 2062, Flink improves error message for functions without appropriate arguments in
178178
* handleUnresolvedFunction at {@link SqlValidatorImpl#handleUnresolvedFunction}.
179179
*
180-
* <p>Lines 2571 ~ 2588, CALCITE-7217, should be removed after upgrading Calcite to 1.41.0.
180+
* <p>Lines 2475 ~ 2477, CALCITE-7471 should be removed after upgrading Calcite to 1.42.0.
181181
*
182-
* <p>Line 2618 ~2631, set the correct scope for VECTOR_SEARCH.
182+
* <p>Lines 2590 ~ 2609, CALCITE-7217, CALCITE-7312 should be removed after upgrading Calcite to
183+
* 1.42.0.
183184
*
184-
* <p>Lines 3920 ~ 3925, 6599 ~ 6606 Flink improves Optimize the retrieval of sub-operands in
185+
* <p>Line 2640 ~2658, set the correct scope for VECTOR_SEARCH.
186+
*
187+
* <p>Lines 3937 ~ 3941, 6612 ~ 6618 Flink improves Optimize the retrieval of sub-operands in
185188
* SqlCall when using NamedParameters at {@link SqlValidatorImpl#checkRollUp}.
186189
*
187-
* <p>Lines 5340 ~ 5347, FLINK-24352 Add null check for temporal table check on SqlSnapshot.
190+
* <p>Lines 5357 ~ 5363, FLINK-24352 Add null check for temporal table check on SqlSnapshot.
191+
*
192+
* <p>Lines 5782-5784, CALCITE-7466 should be removed after upgrading Calcite to 1.42.0.
193+
*
194+
* <p>Lines 5838-5840, CALCITE-7470 should be removed after upgrading Calcite to 1.42.0.
195+
*
196+
* <p>Lines 7267-7290, CALCITE-7486 should be removed after upgrading Calcite to 1.42.0.
197+
*
198+
* <p>Lines 7335-7352, CALCITE-7486 should be removed after upgrading Calcite to 1.42.0.
199+
*
200+
* <p>Lines 7397-7405, CALCITE-7486 should be removed after upgrading Calcite to 1.42.0.
188201
*/
189202
public class SqlValidatorImpl implements SqlValidatorWithHints {
190203
// ~ Static fields/initializers ---------------------------------------------
@@ -2459,7 +2472,9 @@ private SqlNode registerFrom(
24592472
enclosingNode,
24602473
alias,
24612474
forceNullable);
2462-
return node;
2475+
// ----- FLINK MODIFICATION BEGIN -----
2476+
return newNode;
2477+
// ----- FLINK MODIFICATION END -----
24632478

24642479
case PIVOT:
24652480
registerPivot(
@@ -5764,11 +5779,9 @@ private PairList<String, RelDataType> validateMeasure(
57645779
setValidatedNodeType(measure, type);
57655780

57665781
fields.add(alias, type);
5767-
sqlNodes.add(
5768-
SqlStdOperatorTable.AS.createCall(
5769-
SqlParserPos.ZERO,
5770-
expand,
5771-
new SqlIdentifier(alias, SqlParserPos.ZERO)));
5782+
// ----- FLINK MODIFICATION BEGIN -----
5783+
sqlNodes.add(expand);
5784+
// ----- FLINK MODIFICATION END -----
57725785
}
57735786

57745787
SqlNodeList list = new SqlNodeList(sqlNodes, measures.getParserPosition());
@@ -5822,11 +5835,9 @@ private void validateDefinitions(SqlMatchRecognize mr, MatchRecognizeScope scope
58225835

58235836
// Some extra work need required here.
58245837
// In PREV, NEXT, FINAL and LAST, only one pattern variable is allowed.
5825-
sqlNodes.add(
5826-
SqlStdOperatorTable.AS.createCall(
5827-
SqlParserPos.ZERO,
5828-
expand,
5829-
new SqlIdentifier(alias, SqlParserPos.ZERO)));
5838+
// ----- FLINK MODIFICATION BEGIN -----
5839+
sqlNodes.add(expand);
5840+
// ----- FLINK MODIFICATION END -----
58305841

58315842
final RelDataType type = deriveType(scope, expand);
58325843
if (!SqlTypeUtil.inBooleanFamily(type)) {
@@ -7251,19 +7262,31 @@ private class PatternValidator extends SqlBasicVisitor<@Nullable Set<String>> {
72517262
int firstLastCount;
72527263
int prevNextCount;
72537264
int aggregateCount;
7265+
// ----- FLINK MODIFICATION BEGIN -----
7266+
int index;
7267+
int argCount;
72547268

72557269
PatternValidator(boolean isMeasure) {
7256-
this(isMeasure, 0, 0, 0);
7270+
this(isMeasure, 0, 0, 0, 0, 0);
72577271
}
72587272

72597273
PatternValidator(
7260-
boolean isMeasure, int firstLastCount, int prevNextCount, int aggregateCount) {
7274+
boolean isMeasure,
7275+
int firstLastCount,
7276+
int prevNextCount,
7277+
int aggregateCount,
7278+
int index,
7279+
int argCount) {
72617280
this.isMeasure = isMeasure;
72627281
this.firstLastCount = firstLastCount;
72637282
this.prevNextCount = prevNextCount;
72647283
this.aggregateCount = aggregateCount;
7284+
this.index = index;
7285+
this.argCount = argCount;
72657286
}
72667287

7288+
// ----- FLINK MODIFICATION END -----
7289+
72677290
@Override
72687291
public Set<String> visit(SqlCall call) {
72697292
boolean isSingle = false;
@@ -7309,7 +7332,9 @@ public Set<String> visit(SqlCall call) {
73097332
call, Static.RESOURCE.patternRunningFunctionInDefine(call.toString()));
73107333
}
73117334

7312-
for (SqlNode node : operands) {
7335+
// ----- FLINK MODIFICATION BEGIN -----
7336+
for (int i = 0; i < operands.size(); i++) {
7337+
SqlNode node = operands.get(i);
73137338
if (node != null) {
73147339
vars.addAll(
73157340
requireNonNull(
@@ -7318,10 +7343,13 @@ public Set<String> visit(SqlCall call) {
73187343
isMeasure,
73197344
firstLastCount,
73207345
prevNextCount,
7321-
aggregateCount)),
7346+
aggregateCount,
7347+
i,
7348+
operands.size())),
73227349
() -> "node.accept(PatternValidator) for node " + node));
73237350
}
73247351
}
7352+
// ----- FLINK MODIFICATION END -----
73257353

73267354
if (isSingle) {
73277355
switch (kind) {
@@ -7366,7 +7394,15 @@ public Set<String> visit(SqlIdentifier identifier) {
73667394

73677395
@Override
73687396
public Set<String> visit(SqlLiteral literal) {
7369-
return ImmutableSet.of();
7397+
// ----- FLINK MODIFICATION BEGIN -----
7398+
if ((this.argCount == 1 || this.index < this.argCount - 1)
7399+
&& (this.firstLastCount > 0 || this.prevNextCount > 0)
7400+
&& !SqlUtil.isNull(literal)) {
7401+
return ImmutableSet.of(literal.toValue());
7402+
} else {
7403+
return ImmutableSet.of();
7404+
}
7405+
// ----- FLINK MODIFICATION END -----
73707406
}
73717407

73727408
@Override

0 commit comments

Comments
 (0)