Skip to content

Commit 9eac2c7

Browse files
Vova Kolmakovclaude
andcommitted
feat: push IN predicate down to Lance filter engine
Render `field IN (v1, v2, ...)` from Flink's IN CallExpression using the existing literal-escaping helper, and expose getFilters() so tests can assert the exact Lance SQL string. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent fc3d064 commit 9eac2c7

2 files changed

Lines changed: 39 additions & 1 deletion

File tree

src/main/java/org/apache/flink/connector/lance/table/LanceDynamicTableSource.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050

5151
import java.util.ArrayList;
5252
import java.util.Arrays;
53+
import java.util.Collections;
5354
import java.util.List;
5455
import java.util.stream.Collectors;
5556

@@ -260,13 +261,38 @@ else if (funcDef == BuiltInFunctionDefinitions.IS_NULL) {
260261
else if (funcDef == BuiltInFunctionDefinitions.LIKE) {
261262
return buildComparisonFilter(args, "LIKE");
262263
}
263-
// IN (not supported yet, requires more complex handling)
264+
// IN: args[0] is the field reference, args[1..n] are literal values
265+
else if (funcDef == BuiltInFunctionDefinitions.IN) {
266+
return buildInFilter(args);
267+
}
264268
// BETWEEN (not supported yet)
265269

266270
// Unsupported functions, return null
267271
return null;
268272
}
269273

274+
/**
275+
* Build IN filter expression: {@code field IN (v1, v2, ...)}.
276+
* Returns null if the field side is not a reference, the list is empty,
277+
* or any value cannot be rendered as a literal — pushdown is all-or-nothing
278+
* so Lance never sees a partial predicate.
279+
*/
280+
private String buildInFilter(List<ResolvedExpression> args) {
281+
if (args.size() < 2 || !(args.get(0) instanceof FieldReferenceExpression)) {
282+
return null;
283+
}
284+
String fieldName = ((FieldReferenceExpression) args.get(0)).getName();
285+
List<String> values = new ArrayList<>();
286+
for (int i = 1; i < args.size(); i++) {
287+
String value = extractLiteralValue(args.get(i));
288+
if (value == null) {
289+
return null;
290+
}
291+
values.add(value);
292+
}
293+
return fieldName + " IN (" + String.join(", ", values) + ")";
294+
}
295+
270296
/**
271297
* Build comparison filter expression
272298
*/
@@ -369,6 +395,14 @@ public LanceOptions getOptions() {
369395
return options;
370396
}
371397

398+
/**
399+
* Lance-side filter strings accumulated by {@link #applyFilters(List)}, in acceptance order.
400+
* Exposed so callers can inspect what was actually pushed down versus left in Flink.
401+
*/
402+
public List<String> getFilters() {
403+
return Collections.unmodifiableList(filters);
404+
}
405+
372406
/**
373407
* Get physical data type
374408
*/

src/test/java/org/apache/flink/connector/lance/table/LanceReadOptimizationsTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,10 @@ void testInPredicatePushDown() {
265265
SupportsFilterPushDown.Result result = source.applyFilters(Collections.singletonList(inExpr));
266266

267267
assertEquals(1, result.getAcceptedFilters().size(), "IN predicate should be accepted");
268+
assertEquals(
269+
Collections.singletonList("status IN ('active', 'pending', 'completed')"),
270+
source.getFilters(),
271+
"IN predicate should be rendered as a Lance SQL IN clause with quoted string literals");
268272
}
269273

270274
@Test

0 commit comments

Comments
 (0)