Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
7e88fdd
Add partition function expressions for chained partitioning
xiangfu0 Apr 19, 2026
aff37cc
Replace PartitionScalarFunctionResolver with PartitionEvaluatorFactor…
xiangfu0 Apr 19, 2026
5b75103
Polish code review findings: final class, private fields, cached args…
xiangfu0 Apr 19, 2026
7935708
Add FunctionNameUtils to pinot-spi; have ScalarFunctionUtils and Func…
xiangfu0 Apr 20, 2026
bd26f3b
Fix BYTES input type mismatch in realtime paths; reject Float/Double …
xiangfu0 Apr 20, 2026
d0644cf
Fix BYTES input type mismatch in controller segment partition metadata
xiangfu0 Apr 21, 2026
4bd5872
Fix null expression result crash on ingestion and case-sensitive expr…
xiangfu0 Apr 24, 2026
ae62939
Fix compile error: declare local fieldType in addColumnMetadataInfo
xiangfu0 Apr 24, 2026
de05a18
Accept integral-valued Float/Double partition expression results
xiangfu0 Apr 24, 2026
cc707b6
Validate expression-mode partition function output type at table-conf…
xiangfu0 Apr 25, 2026
eec0068
Address critical & major findings from code-review-orchestrator
xiangfu0 Apr 25, 2026
39ce0a8
Add regression tests for the bug-fix commits in this branch
xiangfu0 Apr 25, 2026
4961d4e
Tighten precision bound, thread-safety, and validation contracts
xiangfu0 Apr 25, 2026
6a2e25d
Cache compiled PartitionPipeline to avoid per-segment Calcite parse
xiangfu0 Apr 25, 2026
8e302e5
Address review-orchestrator critical/major findings
xiangfu0 Apr 25, 2026
1cc62c9
Address additional critical/major findings from code review
xiangfu0 Apr 25, 2026
910347a
Eliminate ThreadLocal re-entrancy hazard; tighten SPI deprecation; perf
xiangfu0 Apr 25, 2026
e1e8290
Fix realtime stream-numPartitions regression; harden eval contracts
xiangfu0 Apr 25, 2026
f4d918e
Harmonize inputType wire-format key; fix Deprecated discipline; stale…
xiangfu0 Apr 25, 2026
ab61799
Polish: cache normalizer, FQN match, validation strict on non-null pr…
xiangfu0 Apr 25, 2026
c56a353
Add drift test for partitionIdNormalizer constants; sunset note
xiangfu0 Apr 25, 2026
5b13315
Restore legacy SQL hash function names as aliases
xiangfu0 Apr 25, 2026
9386980
Restore broker constructor compat shim; canonicalize normalizer; sunset
xiangfu0 Apr 25, 2026
6b1fb32
Document reserved -1 sentinel contract on PartitionFunction.getPartition
xiangfu0 Apr 25, 2026
101cd92
Document identity-only contract of getPartitionIdNormalizer
xiangfu0 Apr 25, 2026
2945140
Drop unrelated rebase drift: restore AWS SDK 2.44.4 and PrometheusTem…
xiangfu0 May 8, 2026
f242146
Emit PARTITION_PRUNER_FAIL_OPEN metric on broker pruner exception
xiangfu0 May 8, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,7 @@ private void buildRoutingInternal(String tableNameWithType) {
tableNameWithType, partitionConfig.getKey());
partitionMetadataManager =
new SegmentPartitionMetadataManager(tableNameWithType, partitionConfig.getKey(),
partitionConfig.getValue().getFunctionName(), partitionConfig.getValue().getNumPartitions());
partitionConfig.getValue());
} else {
LOGGER.warn(
"Cannot enable SegmentPartitionMetadataManager for table: {} with multiple partition columns: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
Expand All @@ -39,6 +40,9 @@
import org.apache.pinot.core.routing.TablePartitionReplicatedServersInfo;
import org.apache.pinot.core.routing.TablePartitionReplicatedServersInfo.PartitionInfo;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -61,7 +65,7 @@ public class SegmentPartitionMetadataManager implements SegmentZkMetadataFetchLi

// static content, if anything changes for the following. a rebuild of routing table is needed.
private final String _partitionColumn;
private final String _partitionFunctionName;
private final PartitionFunction _partitionFunction;
private final int _numPartitions;

// cache-able content, only follow changes if onlineSegments list (of ideal-state) is changed.
Expand All @@ -71,12 +75,33 @@ public class SegmentPartitionMetadataManager implements SegmentZkMetadataFetchLi
private transient TablePartitionInfo _tablePartitionInfo;
private transient TablePartitionReplicatedServersInfo _tablePartitionReplicatedServersInfo;

/// Backward-compat shim: the legacy (name-mode-only) constructor used by external broker plugins / vendor forks
/// that linked against the pre-expression-mode signature. Builds a synthetic [ColumnPartitionConfig] and
/// delegates to the new constructor. Expression-mode tables must use the [ColumnPartitionConfig] overloads.
///
/// @deprecated Use the [ColumnPartitionConfig]-based constructors instead; scheduled for removal after
///     release 1.7.0 once external callers have migrated.
@Deprecated(forRemoval = true)
public SegmentPartitionMetadataManager(String tableNameWithType, String partitionColumn, String partitionFunctionName,
    int numPartitions) {
  this(tableNameWithType, partitionColumn,
      new ColumnPartitionConfig(partitionFunctionName, numPartitions), null);
}

/// Convenience overload without a [FieldSpec]. Delegates with a `null` field spec, so expression-mode
/// pipelines are compiled assuming STRING input — prefer the [FieldSpec] overload when the partition
/// column may be BYTES (see the note on that constructor).
public SegmentPartitionMetadataManager(String tableNameWithType, String partitionColumn,
    ColumnPartitionConfig columnPartitionConfig) {
  this(tableNameWithType, partitionColumn, columnPartitionConfig, null);
}

/// Preferred constructor: pass the partition column's [FieldSpec] so expression-mode pipelines on BYTES
/// columns are compiled with BYTES input. The `FieldSpec`-less overload above always compiles with STRING
/// input, which silently disagrees with ingestion partition ids on BYTES columns.
///
/// @param tableNameWithType table name with type suffix that this manager serves
/// @param partitionColumn name of the partition column
/// @param columnPartitionConfig partition function / expression configuration for the column
/// @param fieldSpec schema field spec of the partition column, or `null` to compile with STRING input
public SegmentPartitionMetadataManager(String tableNameWithType, String partitionColumn,
    ColumnPartitionConfig columnPartitionConfig, @Nullable FieldSpec fieldSpec) {
  _tableNameWithType = tableNameWithType;
  _partitionColumn = partitionColumn;
  // Compile the partition function once up front; numPartitions is read back from the compiled function so the
  // cached value always agrees with the configured ColumnPartitionConfig.
  _partitionFunction = fieldSpec != null
      ? PartitionFunctionFactory.getPartitionFunction(partitionColumn, columnPartitionConfig, fieldSpec)
      : PartitionFunctionFactory.getPartitionFunction(partitionColumn, columnPartitionConfig);
  _numPartitions = _partitionFunction.getNumPartitions();
}

@Override
Expand All @@ -103,7 +128,7 @@ private int getPartitionId(String segment, @Nullable ZNRecord znRecord) {
return INVALID_PARTITION_ID;
}
PartitionFunction partitionFunction = segmentPartitionInfo.getPartitionFunction();
if (!_partitionFunctionName.equalsIgnoreCase(partitionFunction.getName())) {
if (!isMatchingPartitionFunction(partitionFunction)) {
return INVALID_PARTITION_ID;
}
if (_numPartitions != partitionFunction.getNumPartitions()) {
Expand All @@ -116,6 +141,41 @@ private int getPartitionId(String segment, @Nullable ZNRecord znRecord) {
return partitions.iterator().next();
}

/// Returns a human-readable description of the configured partition function: the function expression when
/// one is configured (expression mode), otherwise the plain function name.
private String getPartitionFunctionDescription() {
  String expr = _partitionFunction.getFunctionExpr();
  if (expr != null) {
    return expr;
  }
  return _partitionFunction.getName();
}

/// Decides whether a segment's recorded [PartitionFunction] is compatible with the table-configured one,
/// i.e. whether the segment's persisted partition ids can be trusted for routing/pruning.
private boolean isMatchingPartitionFunction(PartitionFunction partitionFunction) {
  // Function config must agree when the segment recorded one. NOTE(review): a segment with a null/empty
  // functionConfig is accepted even when the table configures one — presumably to tolerate legacy segment
  // metadata; confirm this asymmetry is intended.
  Map<String, String> functionConfig = partitionFunction.getFunctionConfig();
  if (functionConfig != null && !functionConfig.isEmpty()
      && !Objects.equals(_partitionFunction.getFunctionConfig(), functionConfig)) {
    return false;
  }

  // Partition-id normalizer is compared case-insensitively, again only when the segment recorded one.
  String partitionIdNormalizer = partitionFunction.getPartitionIdNormalizer();
  if (partitionIdNormalizer != null && !partitionIdNormalizer.isEmpty()
      && !equalsIgnoreCase(_partitionFunction.getPartitionIdNormalizer(), partitionIdNormalizer)) {
    return false;
  }

  // Name mode: match on the function name, case-insensitively.
  String functionName = partitionFunction.getName();
  if (functionName != null) {
    String configuredFunctionName = _partitionFunction.getName();
    return configuredFunctionName != null && configuredFunctionName.equalsIgnoreCase(functionName);
  }

  // Expression mode: exact string equality of the function expression. NOTE(review): expressions are
  // compared case-sensitively while names are not — verify expressions are canonicalized upstream before
  // being persisted, otherwise equivalent expressions with different casing would mismatch here.
  String functionExpr = partitionFunction.getFunctionExpr();
  if (functionExpr != null) {
    return Objects.equals(_partitionFunction.getFunctionExpr(), functionExpr);
  }
  // Neither a name nor an expression recorded: not comparable, treat as mismatch.
  return false;
}

/// Null-tolerant case-insensitive equality check: a `null` first argument never matches anything.
private static boolean equalsIgnoreCase(@Nullable String a, String b) {
  if (a == null) {
    return false;
  }
  return a.equalsIgnoreCase(b);
}

private static long getCreationTimeMs(@Nullable ZNRecord znRecord) {
if (znRecord == null) {
return INVALID_CREATION_TIME_MS;
Expand Down Expand Up @@ -306,7 +366,7 @@ private void computeTablePartitionReplicatedServersInfo() {
}
}
_tablePartitionReplicatedServersInfo =
new TablePartitionReplicatedServersInfo(_tableNameWithType, _partitionColumn, _partitionFunctionName,
new TablePartitionReplicatedServersInfo(_tableNameWithType, _partitionColumn, getPartitionFunctionDescription(),
_numPartitions, partitionInfoMap, segmentsWithInvalidPartition);
}

Expand Down Expand Up @@ -337,7 +397,7 @@ private void computeTablePartitionInfo() {
_tableNameWithType);
}
_tablePartitionInfo =
new TablePartitionInfo(_tableNameWithType, _partitionColumn, _partitionFunctionName, _numPartitions,
new TablePartitionInfo(_tableNameWithType, _partitionColumn, getPartitionFunctionDescription(), _numPartitions,
segmentsByPartition, segmentsWithInvalidPartition);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import javax.annotation.Nullable;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
import org.apache.pinot.spi.utils.CommonConstants;
Expand Down Expand Up @@ -79,9 +80,15 @@ public static SegmentPartitionInfo extractPartitionInfo(String tableNameWithType
return INVALID_PARTITION_INFO;
}

return new SegmentPartitionInfo(partitionColumn,
PartitionFunctionFactory.getPartitionFunction(columnPartitionMetadata),
columnPartitionMetadata.getPartitions());
try {
PartitionFunction partitionFunction =
PartitionFunctionFactory.getPartitionFunction(partitionColumn, columnPartitionMetadata);
return new SegmentPartitionInfo(partitionColumn, partitionFunction, columnPartitionMetadata.getPartitions());
} catch (Exception e) {
LOGGER.warn("Caught exception while constructing partition function for column: {}, segment: {}, table: {}",
partitionColumn, segment, tableNameWithType, e);
return INVALID_PARTITION_INFO;
}
}

/**
Expand Down Expand Up @@ -123,10 +130,16 @@ public static Map<String, SegmentPartitionInfo> extractPartitionInfoMap(String t
segment, tableNameWithType);
continue;
}
SegmentPartitionInfo segmentPartitionInfo = new SegmentPartitionInfo(partitionColumn,
PartitionFunctionFactory.getPartitionFunction(columnPartitionMetadata),
columnPartitionMetadata.getPartitions());
columnSegmentPartitionInfoMap.put(partitionColumn, segmentPartitionInfo);
try {
PartitionFunction partitionFunction =
PartitionFunctionFactory.getPartitionFunction(partitionColumn, columnPartitionMetadata);
SegmentPartitionInfo segmentPartitionInfo =
new SegmentPartitionInfo(partitionColumn, partitionFunction, columnPartitionMetadata.getPartitions());
columnSegmentPartitionInfoMap.put(partitionColumn, segmentPartitionInfo);
} catch (Exception e) {
LOGGER.warn("Caught exception while constructing partition function for column: {}, segment: {}, table: {}",
partitionColumn, segment, tableNameWithType, e);
}
}
if (columnSegmentPartitionInfoMap.size() == 1) {
String partitionColumn = columnSegmentPartitionInfoMap.keySet().iterator().next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,25 @@
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.broker.routing.segmentpartition.SegmentPartitionInfo;
import org.apache.pinot.broker.routing.segmentpartition.SegmentPartitionUtils;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.Expression;
import org.apache.pinot.common.request.Function;
import org.apache.pinot.common.request.Identifier;
import org.apache.pinot.common.request.context.RequestContextUtils;
import org.apache.pinot.sql.FilterKind;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* The {@code MultiPartitionColumnsSegmentPruner} prunes segments based on their partition metadata stored in ZK. The
* pruner supports queries with filter (or nested filter) of EQUALITY and IN predicates.
*/
public class MultiPartitionColumnsSegmentPruner implements SegmentPruner {
private static final Logger LOGGER = LoggerFactory.getLogger(MultiPartitionColumnsSegmentPruner.class);

private final String _tableNameWithType;
private final Set<String> _partitionColumns;
private final Map<String, Map<String, SegmentPartitionInfo>> _segmentColumnPartitionInfoMap =
Expand Down Expand Up @@ -140,8 +146,8 @@ private boolean isPartitionMatch(Expression filterExpression,
Identifier identifier = operands.get(0).getIdentifier();
if (identifier != null) {
SegmentPartitionInfo partitionInfo = columnPartitionInfoMap.get(identifier.getName());
return partitionInfo == null || partitionInfo.getPartitions().contains(
partitionInfo.getPartitionFunction().getPartition(RequestContextUtils.getStringValue(operands.get(1))));
return partitionInfo == null || isPartitionMatch(partitionInfo, RequestContextUtils.getStringValue(
operands.get(1)));
} else {
return true;
}
Expand All @@ -155,8 +161,7 @@ private boolean isPartitionMatch(Expression filterExpression,
}
int numOperands = operands.size();
for (int i = 1; i < numOperands; i++) {
if (partitionInfo.getPartitions().contains(partitionInfo.getPartitionFunction()
.getPartition(RequestContextUtils.getStringValue(operands.get(i))))) {
if (isPartitionMatch(partitionInfo, RequestContextUtils.getStringValue(operands.get(i)))) {
return true;
}
}
Expand All @@ -169,4 +174,21 @@ private boolean isPartitionMatch(Expression filterExpression,
return true;
}
}

/// Checks whether {@code value} maps into one of the segment's partitions. Fails open (returns {@code true})
/// when the partition function throws: the segment is kept, a PARTITION_PRUNER_FAIL_OPEN meter is emitted
/// and an ERROR is logged so the bad expression surfaces in alerting instead of silently dropping results.
private boolean isPartitionMatch(SegmentPartitionInfo partitionInfo, String value) {
  try {
    int partitionId = partitionInfo.getPartitionFunction().getPartition(value);
    return partitionInfo.getPartitions().contains(partitionId);
  } catch (RuntimeException e) {
    BrokerMetrics brokerMetrics = BrokerMetrics.get();
    if (brokerMetrics != null) {
      brokerMetrics.addMeteredTableValue(_tableNameWithType, BrokerMeter.PARTITION_PRUNER_FAIL_OPEN, 1);
    }
    // Fail-open: a buggy partition function/expression must not drop user query results. Log at ERROR (not
    // WARN) so this surfaces in alerting — silent fail-open hides table-config bugs that should be fixed.
    LOGGER.error("Failed to evaluate partition function for table: {}, partition column: {}; falling back to "
        + "scatter-gather (no pruning) for this query. Fix the partition expression to avoid query-time fail-open.",
        _tableNameWithType, partitionInfo.getPartitionColumn(), e);
    return true;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,25 @@
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.broker.routing.segmentpartition.SegmentPartitionInfo;
import org.apache.pinot.broker.routing.segmentpartition.SegmentPartitionUtils;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.Expression;
import org.apache.pinot.common.request.Function;
import org.apache.pinot.common.request.Identifier;
import org.apache.pinot.common.request.context.RequestContextUtils;
import org.apache.pinot.sql.FilterKind;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* The {@code SinglePartitionColumnSegmentPruner} prunes segments based on their partition metadata stored in ZK. The
* pruner supports queries with filter (or nested filter) of EQUALITY and IN predicates.
*/
public class SinglePartitionColumnSegmentPruner implements SegmentPruner {
private static final Logger LOGGER = LoggerFactory.getLogger(SinglePartitionColumnSegmentPruner.class);

private final String _tableNameWithType;
private final String _partitionColumn;
private final Map<String, SegmentPartitionInfo> _partitionInfoMap = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -129,8 +135,7 @@ private boolean isPartitionMatch(Expression filterExpression, SegmentPartitionIn
case EQUALS: {
Identifier identifier = operands.get(0).getIdentifier();
if (identifier != null && identifier.getName().equals(_partitionColumn)) {
return partitionInfo.getPartitions().contains(partitionInfo.getPartitionFunction()
.getPartition(RequestContextUtils.getStringValue(operands.get(1))));
return isPartitionMatch(partitionInfo, RequestContextUtils.getStringValue(operands.get(1)));
} else {
return true;
}
Expand All @@ -140,8 +145,7 @@ private boolean isPartitionMatch(Expression filterExpression, SegmentPartitionIn
if (identifier != null && identifier.getName().equals(_partitionColumn)) {
int numOperands = operands.size();
for (int i = 1; i < numOperands; i++) {
if (partitionInfo.getPartitions().contains(partitionInfo.getPartitionFunction()
.getPartition(RequestContextUtils.getStringValue(operands.get(i))))) {
if (isPartitionMatch(partitionInfo, RequestContextUtils.getStringValue(operands.get(i)))) {
return true;
}
}
Expand All @@ -154,4 +158,22 @@ private boolean isPartitionMatch(Expression filterExpression, SegmentPartitionIn
return true;
}
}

/// Evaluates the partition function on {@code value} and tests membership in the segment's partition set.
/// Any RuntimeException from the (possibly user-configured) partition expression fails open: the segment is
/// kept, an ERROR is logged and a PARTITION_PRUNER_FAIL_OPEN meter is emitted for alerting.
private boolean isPartitionMatch(SegmentPartitionInfo partitionInfo, String value) {
  try {
    int partitionId = partitionInfo.getPartitionFunction().getPartition(value);
    return partitionInfo.getPartitions().contains(partitionId);
  } catch (RuntimeException e) {
    // Fail-open: a buggy partition function/expression must not drop user query results. Log at ERROR (not
    // WARN) and emit a meter so this surfaces in alerting — silent fail-open hides table-config bugs that
    // should be fixed.
    LOGGER.error("Failed to evaluate partition function for table: {}, partition column: {}; falling back to "
        + "scatter-gather (no pruning) for this query. Fix the partition expression to avoid query-time fail-open.",
        _tableNameWithType, _partitionColumn, e);
    BrokerMetrics metrics = BrokerMetrics.get();
    if (metrics != null) {
      metrics.addMeteredTableValue(_tableNameWithType, BrokerMeter.PARTITION_PRUNER_FAIL_OPEN, 1);
    }
    return true;
  }
}
}
Loading
Loading