Skip to content

Commit 2fbf74a

Browse files
committed
Add pluggable custom filter predicates
1 parent b7103bd commit 2fbf74a

30 files changed

Lines changed: 648 additions & 69 deletions

File tree

pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/MultiPartitionColumnsSegmentPruner.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,10 @@ public Set<String> getPartitionColumns() {
119119
private boolean isPartitionMatch(Expression filterExpression,
120120
Map<String, SegmentPartitionInfo> columnPartitionInfoMap) {
121121
Function function = filterExpression.getFunctionCall();
122-
FilterKind filterKind = FilterKind.valueOf(function.getOperator());
122+
FilterKind filterKind = FilterKind.fromOperator(function.getOperator());
123+
if (filterKind == null) {
124+
return true;
125+
}
123126
List<Expression> operands = function.getOperands();
124127
switch (filterKind) {
125128
case AND:

pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SinglePartitionColumnSegmentPruner.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,10 @@ public Set<String> prune(BrokerRequest brokerRequest, Set<String> segments) {
109109

110110
private boolean isPartitionMatch(Expression filterExpression, SegmentPartitionInfo partitionInfo) {
111111
Function function = filterExpression.getFunctionCall();
112-
FilterKind filterKind = FilterKind.valueOf(function.getOperator());
112+
FilterKind filterKind = FilterKind.fromOperator(function.getOperator());
113+
if (filterKind == null) {
114+
return true;
115+
}
113116
List<Expression> operands = function.getOperands();
114117
switch (filterKind) {
115118
case AND:

pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/TimeSegmentPruner.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,10 @@ public Set<String> prune(BrokerRequest brokerRequest, Set<String> segments) {
173173
@Nullable
174174
private List<Interval> getFilterTimeIntervals(Expression filterExpression) {
175175
Function function = filterExpression.getFunctionCall();
176-
FilterKind filterKind = FilterKind.valueOf(function.getOperator());
176+
FilterKind filterKind = FilterKind.fromOperator(function.getOperator());
177+
if (filterKind == null) {
178+
return null;
179+
}
177180
List<Expression> operands = function.getOperands();
178181
switch (filterKind) {
179182
case AND:

pinot-common/src/main/java/org/apache/pinot/common/filter/FilterPredicatePlugin.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public interface FilterPredicatePlugin {
4444
/**
4545
* Returns the canonical name of this filter predicate (e.g., "SEMANTIC_MATCH").
4646
* Must be unique across all registered plugins and built-in filter kinds.
47-
* The name is case-insensitive and will be uppercased for matching.
47+
* The name is case-insensitive and is canonicalized before matching, so {@code "LIKE_ANY"}, {@code "like_any"} and {@code "likeany"} all resolve to the same plugin.
4848
*/
4949
String name();
5050

@@ -94,6 +94,19 @@ default List<Integer> getOptionalOperandIndices() {
9494
return List.of();
9595
}
9696

97+
/**
98+
* Returns whether this predicate accepts additional trailing operands beyond those declared in
99+
* {@link #getOperandTypes()}.
100+
*
101+
* <p>Variadic custom predicates are currently registered as homogeneous variadic functions in Calcite,
102+
* so every declared operand type must be identical. For example,
103+
* {@code List.of(OperandType.STRING, OperandType.STRING)} with this flag enabled accepts
104+
* {@code (STRING, STRING, STRING, ...)}.
105+
*/
106+
default boolean acceptsVariadicArguments() {
107+
return false;
108+
}
109+
97110
// --- Predicate Creation Layer (RequestContextUtils) ---
98111

99112
/**

pinot-common/src/main/java/org/apache/pinot/common/filter/FilterPredicateRegistry.java

Lines changed: 47 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,25 @@
1818
*/
1919
package org.apache.pinot.common.filter;
2020

21+
import com.google.common.base.Preconditions;
2122
import java.util.Collection;
22-
import java.util.Locale;
2323
import java.util.Map;
24-
import java.util.ServiceLoader;
2524
import java.util.concurrent.ConcurrentHashMap;
2625
import java.util.concurrent.atomic.AtomicBoolean;
2726
import javax.annotation.Nullable;
27+
import org.apache.pinot.common.function.FunctionRegistry;
28+
import org.apache.pinot.common.function.TransformFunctionType;
29+
import org.apache.pinot.segment.spi.AggregationFunctionType;
30+
import org.apache.pinot.spi.plugin.PluginManager;
31+
import org.apache.pinot.sql.FilterKind;
2832
import org.slf4j.Logger;
2933
import org.slf4j.LoggerFactory;
3034

3135

3236
/**
3337
* Registry for custom filter predicate plugins.
3438
*
35-
* <p>Plugins are discovered automatically via {@link ServiceLoader} during {@link #init()},
39+
* <p>Plugins are discovered automatically via {@link PluginManager} during {@link #init()},
3640
* and can also be registered programmatically via {@link #register(FilterPredicatePlugin)}.
3741
*
3842
* <p>This registry is consulted at multiple points in the query pipeline when the filter name
@@ -53,15 +57,14 @@ private FilterPredicateRegistry() {
5357
}
5458

5559
/**
56-
* Discovers and registers all {@link FilterPredicatePlugin} implementations found via ServiceLoader.
57-
* Safe to call multiple times; subsequent calls after the first are no-ops.
60+
* Discovers and registers all {@link FilterPredicatePlugin} implementations visible from the application classpath
61+
* and loaded Pinot plugins. Safe to call multiple times; subsequent calls after the first are no-ops.
5862
*/
5963
public static void init() {
6064
if (!INITIALIZED.compareAndSet(false, true)) {
6165
return;
6266
}
63-
ServiceLoader<FilterPredicatePlugin> loader = ServiceLoader.load(FilterPredicatePlugin.class);
64-
for (FilterPredicatePlugin plugin : loader) {
67+
for (FilterPredicatePlugin plugin : PluginManager.get().loadServices(FilterPredicatePlugin.class)) {
6568
register(plugin);
6669
}
6770
}
@@ -73,29 +76,32 @@ public static void init() {
7376
* @throws IllegalStateException if the name collides with a built-in Pinot predicate or function, or if a different plugin is already registered with the same name
7477
*/
7578
public static void register(FilterPredicatePlugin plugin) {
76-
String name = plugin.name().toUpperCase(Locale.ROOT);
77-
FilterPredicatePlugin existing = REGISTRY.putIfAbsent(name, plugin);
79+
String canonicalName = canonicalize(plugin.name());
80+
Preconditions.checkState(!isBuiltInName(canonicalName),
81+
"Cannot register FilterPredicatePlugin '%s' (%s): name collides with a built-in Pinot predicate or function",
82+
plugin.name(), plugin.getClass().getName());
83+
FilterPredicatePlugin existing = REGISTRY.putIfAbsent(canonicalName, plugin);
7884
if (existing != null && existing != plugin) {
7985
throw new IllegalStateException(
8086
String.format("Cannot register FilterPredicatePlugin '%s' (%s): already registered by %s",
81-
name, plugin.getClass().getName(), existing.getClass().getName()));
87+
plugin.name(), plugin.getClass().getName(), existing.getClass().getName()));
8288
}
83-
LOGGER.info("Registered custom filter predicate plugin: {} ({})", name, plugin.getClass().getName());
89+
LOGGER.info("Registered custom filter predicate plugin: {} ({})", plugin.name(), plugin.getClass().getName());
8490
}
8591

8692
/**
8793
* Returns the plugin registered for the given filter name, or null if none is registered.
8894
*/
8995
@Nullable
9096
public static FilterPredicatePlugin get(String name) {
91-
return REGISTRY.get(name.toUpperCase(Locale.ROOT));
97+
return REGISTRY.get(canonicalize(name));
9298
}
9399

94100
/**
95101
* Returns true if a custom filter predicate plugin is registered for the given name.
96102
*/
97103
public static boolean isRegistered(String name) {
98-
return REGISTRY.containsKey(name.toUpperCase(Locale.ROOT));
104+
return REGISTRY.containsKey(canonicalize(name));
99105
}
100106

101107
/**
@@ -112,4 +118,32 @@ public static void clear() {
112118
REGISTRY.clear();
113119
INITIALIZED.set(false);
114120
}
121+
122+
private static String canonicalize(String name) {
123+
return FunctionRegistry.canonicalize(name);
124+
}
125+
126+
private static boolean isBuiltInName(String canonicalName) {
127+
if (FunctionRegistry.contains(canonicalName)) {
128+
return true;
129+
}
130+
for (FilterKind filterKind : FilterKind.values()) {
131+
if (canonicalize(filterKind.name()).equals(canonicalName)) {
132+
return true;
133+
}
134+
}
135+
for (TransformFunctionType functionType : TransformFunctionType.values()) {
136+
for (String name : functionType.getNames()) {
137+
if (canonicalize(name).equals(canonicalName)) {
138+
return true;
139+
}
140+
}
141+
}
142+
for (AggregationFunctionType functionType : AggregationFunctionType.values()) {
143+
if (canonicalize(functionType.getName()).equals(canonicalName)) {
144+
return true;
145+
}
146+
}
147+
return false;
148+
}
115149
}

pinot-common/src/main/java/org/apache/pinot/common/request/context/RequestContextUtils.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.util.ArrayList;
2323
import java.util.Collections;
2424
import java.util.List;
25-
import org.apache.commons.lang3.EnumUtils;
2625
import org.apache.pinot.common.filter.FilterPredicatePlugin;
2726
import org.apache.pinot.common.filter.FilterPredicateRegistry;
2827
import org.apache.pinot.common.request.Expression;
@@ -136,13 +135,22 @@ private static FilterContext getFilterInner(Expression thriftExpression) {
136135
private static FilterContext getFilterInner(Function thriftFunction) {
137136
String functionOperator = thriftFunction.getOperator();
138137

138+
FilterPredicatePlugin customPlugin = FilterPredicateRegistry.get(functionOperator);
139+
if (customPlugin != null) {
140+
List<ExpressionContext> operands = new ArrayList<>(thriftFunction.getOperandsSize());
141+
for (Expression operand : thriftFunction.getOperands()) {
142+
operands.add(getExpression(operand));
143+
}
144+
return FilterContext.forPredicate(customPlugin.createPredicate(operands));
145+
}
146+
139147
// convert "WHERE startsWith(col, 'str')" to "WHERE startsWith(col, 'str') = true"
140-
if (!EnumUtils.isValidEnum(FilterKind.class, functionOperator)) {
148+
FilterKind filterKind = FilterKind.fromOperator(functionOperator);
149+
if (filterKind == null) {
141150
return FilterContext.forPredicate(
142151
new EqPredicate(ExpressionContext.forFunction(getFunction(thriftFunction)), "true"));
143152
}
144153

145-
FilterKind filterKind = FilterKind.valueOf(thriftFunction.getOperator().toUpperCase());
146154
List<Expression> operands = thriftFunction.getOperands();
147155
int numOperands = operands.size();
148156
switch (filterKind) {
@@ -331,7 +339,7 @@ private static FilterContext getFilterInner(ExpressionContext filterExpression)
331339
}
332340

333341
private static FilterContext getFilterInner(FunctionContext filterFunction) {
334-
String functionOperator = filterFunction.getFunctionName().toUpperCase();
342+
String functionOperator = filterFunction.getFunctionName();
335343

336344
// Check if this is a custom filter predicate registered via plugin
337345
FilterPredicatePlugin customPlugin = FilterPredicateRegistry.get(functionOperator);
@@ -341,11 +349,11 @@ private static FilterContext getFilterInner(FunctionContext filterFunction) {
341349
}
342350

343351
// convert "WHERE startsWith(col, 'str')" to "WHERE startsWith(col, 'str') = true"
344-
if (!EnumUtils.isValidEnum(FilterKind.class, functionOperator)) {
352+
FilterKind filterKind = FilterKind.fromOperator(functionOperator);
353+
if (filterKind == null) {
345354
return FilterContext.forPredicate(new EqPredicate(ExpressionContext.forFunction(filterFunction), "true"));
346355
}
347356

348-
FilterKind filterKind = FilterKind.valueOf(filterFunction.getFunctionName().toUpperCase());
349357
List<ExpressionContext> operands = filterFunction.getArguments();
350358
int numOperands = operands.size();
351359
switch (filterKind) {

pinot-common/src/main/java/org/apache/pinot/sql/FilterKind.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
*/
1919
package org.apache.pinot.sql;
2020

21+
import javax.annotation.Nullable;
22+
2123
public enum FilterKind {
2224
AND,
2325
OR,
@@ -50,4 +52,19 @@ public boolean isRange() {
5052
return this == GREATER_THAN || this == GREATER_THAN_OR_EQUAL || this == LESS_THAN || this == LESS_THAN_OR_EQUAL
5153
|| this == BETWEEN || this == RANGE;
5254
}
55+
56+
/**
57+
* Resolves the given operator name to a built-in filter kind.
58+
*
59+
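* <p>Matching is case-insensitive. Returns {@code null} when the operator is not a built-in filter kind, for example a custom predicate registered through the plugin system or a regular function such as {@code startsWith}.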
60+
*/
61+
@Nullable
62+
public static FilterKind fromOperator(String operator) {
63+
for (FilterKind filterKind : values()) {
64+
if (filterKind.name().equalsIgnoreCase(operator)) {
65+
return filterKind;
66+
}
67+
}
68+
return null;
69+
}
5370
}

pinot-common/src/main/java/org/apache/pinot/sql/parsers/rewriter/PredicateComparisonRewriter.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import com.google.common.base.Preconditions;
2222
import java.util.List;
23-
import org.apache.commons.lang3.EnumUtils;
2423
import org.apache.pinot.common.filter.FilterPredicatePlugin;
2524
import org.apache.pinot.common.filter.FilterPredicateRegistry;
2625
import org.apache.pinot.common.request.Expression;
@@ -82,8 +81,9 @@ private static Expression updatePredicate(Expression expression) {
8281
private static Expression updateFunctionExpression(Expression expression) {
8382
Function function = expression.getFunctionCall();
8483
String functionOperator = function.getOperator();
84+
FilterKind filterKind = FilterKind.fromOperator(functionOperator);
8585

86-
if (!EnumUtils.isValidEnum(FilterKind.class, functionOperator)) {
86+
if (filterKind == null) {
8787
// Check if this is a custom filter predicate registered via plugin
8888
FilterPredicatePlugin customPlugin = FilterPredicateRegistry.get(functionOperator);
8989
if (customPlugin != null) {
@@ -96,7 +96,6 @@ private static Expression updateFunctionExpression(Expression expression) {
9696
expression = convertPredicateToEqualsBooleanExpression(expression);
9797
return expression;
9898
} else {
99-
FilterKind filterKind = FilterKind.valueOf(function.getOperator());
10099
List<Expression> operands = function.getOperands();
101100
switch (filterKind) {
102101
case AND:
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.common.filter;
20+
21+
import java.util.List;
22+
import org.apache.pinot.common.request.Expression;
23+
import org.apache.pinot.common.request.context.ExpressionContext;
24+
import org.apache.pinot.common.request.context.predicate.EqPredicate;
25+
import org.apache.pinot.common.request.context.predicate.Predicate;
26+
import org.testng.Assert;
27+
import org.testng.annotations.AfterMethod;
28+
import org.testng.annotations.Test;
29+
30+
31+
public class FilterPredicateRegistryTest {
32+
33+
@AfterMethod
34+
public void tearDown() {
35+
FilterPredicateRegistry.clear();
36+
}
37+
38+
@Test
39+
public void testCanonicalizedLookup() {
40+
FilterPredicatePlugin plugin = new TestFilterPredicatePlugin("LIKE_ANY");
41+
42+
FilterPredicateRegistry.register(plugin);
43+
44+
Assert.assertSame(FilterPredicateRegistry.get("LIKE_ANY"), plugin);
45+
Assert.assertSame(FilterPredicateRegistry.get("like_any"), plugin);
46+
Assert.assertSame(FilterPredicateRegistry.get("likeany"), plugin);
47+
}
48+
49+
@Test
50+
public void testRejectsBuiltInFilterName() {
51+
IllegalStateException exception = Assert.expectThrows(IllegalStateException.class,
52+
() -> FilterPredicateRegistry.register(new TestFilterPredicatePlugin("TEXT_MATCH")));
53+
54+
Assert.assertTrue(exception.getMessage().contains("collides with a built-in Pinot predicate or function"));
55+
}
56+
57+
@Test
58+
public void testRejectsBuiltInFunctionName() {
59+
IllegalStateException exception = Assert.expectThrows(IllegalStateException.class,
60+
() -> FilterPredicateRegistry.register(new TestFilterPredicatePlugin("startsWith")));
61+
62+
Assert.assertTrue(exception.getMessage().contains("collides with a built-in Pinot predicate or function"));
63+
}
64+
65+
private static final class TestFilterPredicatePlugin implements FilterPredicatePlugin {
66+
private final String _name;
67+
68+
private TestFilterPredicatePlugin(String name) {
69+
_name = name;
70+
}
71+
72+
@Override
73+
public String name() {
74+
return _name;
75+
}
76+
77+
@Override
78+
public void validateFilterExpression(List<Expression> operands) {
79+
}
80+
81+
@Override
82+
public List<OperandType> getOperandTypes() {
83+
return List.of(OperandType.STRING, OperandType.STRING);
84+
}
85+
86+
@Override
87+
public Predicate createPredicate(List<ExpressionContext> operands) {
88+
return new EqPredicate(ExpressionContext.forIdentifier("col"), "value");
89+
}
90+
}
91+
}

0 commit comments

Comments
 (0)