Skip to content

Commit 92e67cf

Browse files
authored
[FLINK-39421][table] Metadata filter push-down for table sources
This closes #27913.
1 parent f005e5c commit 92e67cf

10 files changed

Lines changed: 1300 additions & 137 deletions

File tree

flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.java

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@
2525
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.JoinedRowData;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.utils.LogicalTypeCasts;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -154,4 +156,71 @@ public interface SupportsReadingMetadata {
154156
default boolean supportsMetadataProjection() {
155157
return true;
156158
}
159+
160+
/**
161+
* Whether this source supports filtering on metadata columns.
162+
*
163+
* <p>When this method returns {@code true}, the planner may call {@link
164+
* #applyMetadataFilters(List)} during optimization with predicates expressed in metadata key
165+
* names (from {@link #listReadableMetadata()}), not SQL column aliases. Sources that do not
166+
* override this method will not receive metadata filter predicates.
167+
*
168+
* <p>This is independent of {@link SupportsFilterPushDown}, which handles physical column
169+
* predicates. A source can implement both to accept filters on physical and metadata columns.
170+
*/
171+
default boolean supportsMetadataFilterPushDown() {
172+
return false;
173+
}
174+
175+
/**
176+
* Provides a list of metadata filters in conjunctive form. A source can pick filters and return
177+
* the accepted and remaining filters. Same contract as {@link
178+
* SupportsFilterPushDown#applyFilters(List)}, but for metadata columns.
179+
*
180+
* <p>The provided filters reference metadata key names (from {@link #listReadableMetadata()}),
181+
* not SQL column aliases. For example, a column declared as {@code msg_offset BIGINT METADATA
182+
* FROM 'offset'} will have its predicate expressed as {@code offset >= 1000}, not {@code
183+
* msg_offset >= 1000}. The planner handles the alias-to-key translation before calling this
184+
* method.
185+
*/
186+
default MetadataFilterResult applyMetadataFilters(List<ResolvedExpression> metadataFilters) {
187+
return MetadataFilterResult.of(Collections.emptyList(), metadataFilters);
188+
}
189+
190+
/**
191+
* Result of a metadata filter push down. Communicates the source's response to the planner
192+
* during optimization.
193+
*/
194+
@PublicEvolving
195+
final class MetadataFilterResult {
196+
private final List<ResolvedExpression> acceptedFilters;
197+
private final List<ResolvedExpression> remainingFilters;
198+
199+
private MetadataFilterResult(
200+
List<ResolvedExpression> acceptedFilters,
201+
List<ResolvedExpression> remainingFilters) {
202+
this.acceptedFilters = acceptedFilters;
203+
this.remainingFilters = remainingFilters;
204+
}
205+
206+
/**
207+
* Constructs a metadata filter push-down result.
208+
*
209+
* @param acceptedFilters filters consumed by the source (best effort)
210+
* @param remainingFilters filters that a subsequent operation must still apply at runtime
211+
*/
212+
public static MetadataFilterResult of(
213+
List<ResolvedExpression> acceptedFilters,
214+
List<ResolvedExpression> remainingFilters) {
215+
return new MetadataFilterResult(acceptedFilters, remainingFilters);
216+
}
217+
218+
public List<ResolvedExpression> getAcceptedFilters() {
219+
return acceptedFilters;
220+
}
221+
222+
public List<ResolvedExpression> getRemainingFilters() {
223+
return remainingFilters;
224+
}
225+
}
157226
}

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java

Lines changed: 56 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -99,50 +99,9 @@ public static SupportsFilterPushDown.Result apply(
9999
DynamicTableSource tableSource,
100100
SourceAbilityContext context) {
101101
if (tableSource instanceof SupportsFilterPushDown) {
102-
RexNodeToExpressionConverter converter =
103-
new RexNodeToExpressionConverter(
104-
new RexBuilder(context.getTypeFactory()),
105-
context.getSourceRowType().getFieldNames().toArray(new String[0]),
106-
context.getFunctionCatalog(),
107-
context.getCatalogManager(),
108-
Option.apply(
109-
context.getTypeFactory()
110-
.buildRelNodeRowType(context.getSourceRowType())));
111-
List<Expression> filters =
112-
predicates.stream()
113-
.map(
114-
p -> {
115-
scala.Option<ResolvedExpression> expr = p.accept(converter);
116-
if (expr.isDefined()) {
117-
return expr.get();
118-
} else {
119-
throw new TableException(
120-
String.format(
121-
"%s can not be converted to Expression, please make sure %s can accept %s.",
122-
p.toString(),
123-
tableSource.getClass().getSimpleName(),
124-
p.toString()));
125-
}
126-
})
127-
.collect(Collectors.toList());
128-
ExpressionResolver resolver =
129-
ExpressionResolver.resolverFor(
130-
context.getTableConfig(),
131-
context.getClassLoader(),
132-
name -> Optional.empty(),
133-
context.getFunctionCatalog()
134-
.asLookup(
135-
str -> {
136-
throw new TableException(
137-
"We should not need to lookup any expressions at this point");
138-
}),
139-
context.getCatalogManager().getDataTypeFactory(),
140-
(sqlExpression, inputRowType, outputType) -> {
141-
throw new TableException(
142-
"SQL expression parsing is not supported at this location.");
143-
})
144-
.build();
145-
return ((SupportsFilterPushDown) tableSource).applyFilters(resolver.resolve(filters));
102+
List<ResolvedExpression> resolved =
103+
resolvePredicates(predicates, context.getSourceRowType(), tableSource, context);
104+
return ((SupportsFilterPushDown) tableSource).applyFilters(resolved);
146105
} else {
147106
throw new TableException(
148107
String.format(
@@ -151,6 +110,59 @@ public static SupportsFilterPushDown.Result apply(
151110
}
152111
}
153112

113+
/**
114+
* Converts {@link RexNode} predicates to {@link ResolvedExpression}s using the given row type.
115+
* Shared between physical and metadata filter push-down paths.
116+
*/
117+
static List<ResolvedExpression> resolvePredicates(
118+
List<RexNode> predicates,
119+
RowType rowType,
120+
DynamicTableSource tableSource,
121+
SourceAbilityContext context) {
122+
RexNodeToExpressionConverter converter =
123+
new RexNodeToExpressionConverter(
124+
new RexBuilder(context.getTypeFactory()),
125+
rowType.getFieldNames().toArray(new String[0]),
126+
context.getFunctionCatalog(),
127+
context.getCatalogManager(),
128+
Option.apply(context.getTypeFactory().buildRelNodeRowType(rowType)));
129+
List<Expression> filters =
130+
predicates.stream()
131+
.map(
132+
p -> {
133+
scala.Option<ResolvedExpression> expr = p.accept(converter);
134+
if (expr.isDefined()) {
135+
return expr.get();
136+
} else {
137+
throw new TableException(
138+
String.format(
139+
"%s can not be converted to Expression, please make sure %s can accept %s.",
140+
p.toString(),
141+
tableSource.getClass().getSimpleName(),
142+
p.toString()));
143+
}
144+
})
145+
.collect(Collectors.toList());
146+
ExpressionResolver resolver =
147+
ExpressionResolver.resolverFor(
148+
context.getTableConfig(),
149+
context.getClassLoader(),
150+
name -> Optional.empty(),
151+
context.getFunctionCatalog()
152+
.asLookup(
153+
str -> {
154+
throw new TableException(
155+
"We should not need to lookup any expressions at this point");
156+
}),
157+
context.getCatalogManager().getDataTypeFactory(),
158+
(sqlExpression, inputRowType, outputType) -> {
159+
throw new TableException(
160+
"SQL expression parsing is not supported at this location.");
161+
})
162+
.build();
163+
return resolver.resolve(filters);
164+
}
165+
154166
@Override
155167
public boolean needAdjustFieldReferenceAfterProjection() {
156168
return true;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.planner.plan.abilities.source;
20+
21+
import org.apache.flink.table.api.TableException;
22+
import org.apache.flink.table.connector.source.DynamicTableSource;
23+
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
24+
import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata.MetadataFilterResult;
25+
import org.apache.flink.table.expressions.ResolvedExpression;
26+
import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
27+
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
28+
import org.apache.flink.table.types.logical.RowType;
29+
30+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
31+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
32+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
33+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
34+
35+
import org.apache.calcite.rex.RexNode;
36+
37+
import java.util.ArrayList;
38+
import java.util.List;
39+
import java.util.Objects;
40+
41+
import static org.apache.flink.util.Preconditions.checkNotNull;
42+
43+
/**
44+
* Serializes metadata filter predicates and replays them during compiled plan restoration.
45+
*
46+
* <p>Predicates are stored with a {@code predicateRowType} that already uses metadata key names
47+
* (not SQL aliases). The alias-to-key translation happens once at optimization time, so no
48+
* column-to-key mapping needs to be persisted.
49+
*/
50+
@JsonIgnoreProperties(ignoreUnknown = true)
51+
@JsonTypeName("MetadataFilterPushDown")
52+
public final class MetadataFilterPushDownSpec extends SourceAbilitySpecBase {
53+
54+
public static final String FIELD_NAME_PREDICATES = "predicates";
55+
public static final String FIELD_NAME_PREDICATE_ROW_TYPE = "predicateRowType";
56+
57+
@JsonProperty(FIELD_NAME_PREDICATES)
58+
private final List<RexNode> predicates;
59+
60+
/**
61+
* Row type snapshot using metadata key names. Stored because ProjectPushDownSpec may narrow the
62+
* context's row type during restore.
63+
*/
64+
@JsonProperty(FIELD_NAME_PREDICATE_ROW_TYPE)
65+
private final RowType predicateRowType;
66+
67+
@JsonCreator
68+
public MetadataFilterPushDownSpec(
69+
@JsonProperty(FIELD_NAME_PREDICATES) List<RexNode> predicates,
70+
@JsonProperty(FIELD_NAME_PREDICATE_ROW_TYPE) RowType predicateRowType) {
71+
this.predicates = new ArrayList<>(checkNotNull(predicates));
72+
this.predicateRowType = checkNotNull(predicateRowType);
73+
}
74+
75+
public List<RexNode> getPredicates() {
76+
return predicates;
77+
}
78+
79+
@Override
80+
public void apply(DynamicTableSource tableSource, SourceAbilityContext context) {
81+
// Use stored predicateRowType; context's row type may be narrowed by ProjectPushDownSpec.
82+
MetadataFilterResult result =
83+
applyMetadataFilters(predicates, predicateRowType, tableSource, context);
84+
if (result.getAcceptedFilters().size() != predicates.size()) {
85+
throw new TableException("All metadata predicates should be accepted here.");
86+
}
87+
}
88+
89+
/**
90+
* Converts RexNode predicates to ResolvedExpressions using the given row type and calls
91+
* applyMetadataFilters on the source. The row type must already use metadata key names.
92+
*/
93+
public static MetadataFilterResult applyMetadataFilters(
94+
List<RexNode> predicates,
95+
RowType metadataKeyRowType,
96+
DynamicTableSource tableSource,
97+
SourceAbilityContext context) {
98+
if (!(tableSource instanceof SupportsReadingMetadata)) {
99+
throw new TableException(
100+
String.format(
101+
"%s does not support SupportsReadingMetadata.",
102+
tableSource.getClass().getName()));
103+
}
104+
SupportsReadingMetadata readingMetadata = (SupportsReadingMetadata) tableSource;
105+
if (!readingMetadata.supportsMetadataFilterPushDown()) {
106+
throw new TableException(
107+
String.format(
108+
"%s no longer supports metadata filter push-down.",
109+
tableSource.getClass().getName()));
110+
}
111+
List<ResolvedExpression> resolved =
112+
FilterPushDownSpec.resolvePredicates(
113+
predicates, metadataKeyRowType, tableSource, context);
114+
return readingMetadata.applyMetadataFilters(resolved);
115+
}
116+
117+
@Override
118+
public boolean needAdjustFieldReferenceAfterProjection() {
119+
return true;
120+
}
121+
122+
@Override
123+
public String getDigests(SourceAbilityContext context) {
124+
final List<String> expressionStrs = new ArrayList<>();
125+
for (RexNode rexNode : predicates) {
126+
expressionStrs.add(
127+
FlinkRexUtil.getExpressionString(
128+
rexNode,
129+
JavaScalaConversionUtil.toScala(predicateRowType.getFieldNames())));
130+
}
131+
132+
return String.format(
133+
"metadataFilter=[%s]",
134+
expressionStrs.stream()
135+
.reduce((l, r) -> String.format("and(%s, %s)", l, r))
136+
.orElse(""));
137+
}
138+
139+
@Override
140+
public boolean equals(Object o) {
141+
if (this == o) {
142+
return true;
143+
}
144+
if (o == null || getClass() != o.getClass()) {
145+
return false;
146+
}
147+
if (!super.equals(o)) {
148+
return false;
149+
}
150+
MetadataFilterPushDownSpec that = (MetadataFilterPushDownSpec) o;
151+
return Objects.equals(predicates, that.predicates)
152+
&& Objects.equals(predicateRowType, that.predicateRowType);
153+
}
154+
155+
@Override
156+
public int hashCode() {
157+
return Objects.hash(super.hashCode(), predicates, predicateRowType);
158+
}
159+
}

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/SourceAbilitySpec.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
@JsonSubTypes({
3939
@JsonSubTypes.Type(value = FilterPushDownSpec.class),
4040
@JsonSubTypes.Type(value = LimitPushDownSpec.class),
41+
@JsonSubTypes.Type(value = MetadataFilterPushDownSpec.class),
4142
@JsonSubTypes.Type(value = PartitionPushDownSpec.class),
4243
@JsonSubTypes.Type(value = ProjectPushDownSpec.class),
4344
@JsonSubTypes.Type(value = ReadingMetadataSpec.class),

0 commit comments

Comments (0)