Add partition function expressions for chained partitioning#18165
xiangfu0 wants to merge 25 commits into apache:master
Conversation
Pull request overview
Adds expression-mode partitioning (functionExpr) so Pinot can derive partitions and perform query-time pruning by evaluating a deterministic scalar-function pipeline against raw column values, supporting chained transforms such as fnv1a_32(md5(id)), while preserving legacy functionName behavior.
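As a hedged illustration (the surrounding keys follow Pinot's existing `segmentPartitionConfig` layout; only `functionExpr` is new in this PR, and the exact values are made up), an expression-mode partition config might look like:

```json
{
  "tableIndexConfig": {
    "segmentPartitionConfig": {
      "columnPartitionMap": {
        "id": {
          "functionExpr": "fnv1a_32(md5(id))",
          "numPartitions": 8
        }
      }
    }
  }
}
```

Legacy configs keep using `functionName` (e.g. `"Murmur"`); per the ColumnPartitionConfig change below, validation enforces that exactly one of the two modes is set.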
Changes:
- Introduces `functionExpr` in table/segment partition configs and persists it through offline and realtime metadata paths.
- Adds a partition-expression compiler that builds a typed `PartitionPipeline` and a `PartitionPipelineFunction` adapter.
- Extends broker/server pruning to evaluate the same partition pipeline for EQ/IN predicates on the raw column (with fail-open behavior on evaluation errors) and adds/updates tests.
Reviewed changes
Copilot reviewed 46 out of 46 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/LaunchBackfillIngestionJobCommand.java | Uses functionExpr when computing partitions for backfill segment selection. |
| pinot-tools/src/test/java/org/apache/pinot/tools/admin/command/LaunchBackfillIngestionJobCommandTest.java | Adds coverage for segment partition matching with functionExpr. |
| pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ColumnPartitionConfig.java | Adds functionExpr mode with validation to enforce exclusive config mode. |
| pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentPartitionConfig.java | Exposes getFunctionExpr() for partition configs. |
| pinot-spi/src/test/java/org/apache/pinot/spi/config/table/IndexingConfigTest.java | Adds JSON round-trip and invalid-mode tests for functionExpr. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/V1Constants.java | Adds segment metadata key for partitionFunctionExpr. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunction.java | Extends partition function interface to optionally expose getFunctionExpr(). |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/PartitionFunctionFactory.java | Adds overloads to create partition functions from configs/metadata, including expression mode. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/metadata/ColumnPartitionMetadata.java | Persists/loads functionExpr in partition metadata JSON. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/index/metadata/ColumnMetadataImpl.java | Reads partitionFunctionExpr from segment properties and constructs the correct partition function. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionFunctionExprCompiler.java | Implements compilation of restricted scalar-function expressions into a typed pipeline. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionPipelineFunction.java | Adapts compiled pipelines to the PartitionFunction interface. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionPipeline.java | Defines the immutable compiled pipeline and evaluation logic. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionStep.java | Defines a typed pipeline step contract and runtime validation. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionValue.java | Adds typed runtime value wrapper for pipeline evaluation. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionValueType.java | Defines supported type system for the partition pipeline compiler. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionIntNormalizer.java | Adds normalization strategies to match legacy partition-id semantics. |
| pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/partition/pipeline/PartitionFunctionExprCompilerTest.java | Adds compiler coverage (canonicalization, literals, determinism, thread-safety, legacy normalization). |
| pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/function/scalar/PartitionFunctionExprTestFunctions.java | Provides test-only scalar functions to exercise the compiler in pinot-segment-spi. |
| pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/function/scalar/PartitionFunctionExprRacyTestFunctions.java | Provides a non-static racy scalar function to validate thread-local targets. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java | Validates partition configs by constructing the partition function (including expression mode). |
| pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java | Adds validation tests for functionExpr, including literal args and invalid output type. |
| pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/StatsCollectorConfig.java | Exposes getPartitionFunctionExpr() to segment creation stats collection. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/stats/AbstractColumnStatisticsCollector.java | Creates partition functions using either functionName or functionExpr. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/creator/impl/BaseSegmentCreator.java | Persists partitionFunctionExpr into segment metadata properties when present. |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/writer/StatelessRealtimeSegmentWriter.java | Builds realtime partition function from ColumnPartitionConfig (supports expr mode). |
| pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java | Preserves functionExpr when reporting segment partition config. |
| pinot-segment-local/src/test/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverterTest.java | Adds coverage ensuring expression partition metadata is preserved across realtime conversion. |
| pinot-core/src/main/java/org/apache/pinot/core/segment/processing/partitioner/TableConfigPartitioner.java | Uses factory method that supports expression-mode partition functions. |
| pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java | Uses fail-open partition evaluation for EQ/IN pruning and supports expr-based partition functions. |
| pinot-core/src/test/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPrunerTest.java | Adds pruning coverage for functionExpr and fail-open behavior when evaluation throws. |
| pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java | Builds realtime partition function from config with expression support. |
| pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java | Detects staleness when partition function expression changes (in addition to name/partition count). |
| pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java | Persists expression partition metadata into ZK using the effective partition function. |
| pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadataUtils.java | Writes partition metadata including functionExpr into ZK metadata. |
| pinot-common/src/main/java/org/apache/pinot/common/function/scalar/InternalFunctions.java | Marks request-context dependent scalar functions as non-deterministic. |
| pinot-common/src/main/java/org/apache/pinot/common/function/scalar/DateTimeFunctions.java | Marks time-dependent scalar functions as non-deterministic. |
| pinot-common/src/main/java/org/apache/pinot/common/function/scalar/HashFunctions.java | Adds hash aliases (murmur2, murmur3_32, fnv1*_xx, md5_raw) for partition-expression compatibility. |
| pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionUtils.java | Treats invalid expression metadata as unprunable by catching construction exceptions. |
| pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java | Tracks partition function expression for routing metadata validation. |
| pinot-broker/src/main/java/org/apache/pinot/broker/routing/manager/BaseBrokerRoutingManager.java | Initializes partition metadata manager with effective function name + expression. |
| pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SinglePartitionColumnSegmentPruner.java | Adds fail-open behavior for partition evaluation errors. |
| pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/MultiPartitionColumnsSegmentPruner.java | Adds fail-open behavior for partition evaluation errors. |
| pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/PartitionFunctionExprSegmentPrunerTest.java | Adds broker pruning tests for expression-mode partitions and fail-open on bad literals/metadata. |
| pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java | Extends existing routing/pruning tests to cover functionExpr. |
| pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManagerTest.java | Adds metadata-manager coverage for expression-mode partition metadata handling. |
Codecov Report ❌ Patch coverage is
Additional details and impacted files:
@@ Coverage Diff @@
## master #18165 +/- ##
=============================================
- Coverage 63.53% 34.90% -28.63%
+ Complexity 1709 849 -860
=============================================
Files 3250 3260 +10
Lines 198923 199563 +640
Branches 30823 30970 +147
=============================================
- Hits 126384 69662 -56722
- Misses 62472 123796 +61324
+ Partials 10067 6105 -3962
…y backed by InbuiltFunctionEvaluator

Addresses code review feedback on PR apache#18165:
- Replace the 440-line CommonPartitionScalarFunctionResolver (custom overload resolution, typed PartitionValue/PartitionStep/PartitionValueConversions) with a thin PartitionEvaluatorFactory SPI in pinot-segment-spi, implemented in pinot-common by PartitionFunctionEvaluator, which reuses Pinot's FunctionRegistry for all scalar-function resolution.
- PartitionFunctionEvaluator extends ExecutableFunctionEvaluator and overrides String→byte[] conversion to use UTF-8 encoding (vs hex-decoding in InbuiltFunctionEvaluator) for correct ingestion semantics.
- PartitionPipeline simplified: no longer extends ExecutableFunctionEvaluator; holds a FunctionEvaluator.
- Delete PartitionScalarFunctionResolver, PartitionValue, PartitionValueConversions, PartitionStep.
- Move expression-mode partition tests requiring pinot-common into pinot-common (they can't run in pinot-segment-spi, where pinot-common is not a dependency).
- Fix ColumnPartitionMetadata.hashCode() to include _partitions via Objects.hash() uniformly.
- Add Javadoc to PartitionIntNormalizer enum constants clarifying pre- vs post-modulo semantics.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Introduces expression-mode partition functions that allow composing scalar functions in a pipeline (e.g. fnv1a_32(md5(col))) for more flexible partition schemes, including full BYTES-column support.

Key changes:
- New PartitionFunctionExprCompiler that compiles a restricted expression syntax into a typed PartitionPipeline backed by deterministic scalar functions
- PartitionPipelineFunction adapter implementing PartitionFunction / FunctionEvaluator for both ingestion and broker routing
- ColumnPartitionConfig.functionExpr field (expression mode) alongside the existing functionName field (name mode); getFunctionName() is now @Nullable
- PartitionIntNormalizer enum (POSITIVE_MODULO / ABS / MASK) replaces ad-hoc modulo logic; the normalizer name is persisted in config and segment metadata
- ColumnPartitionMetadata extended with functionExpr, partitionIdNormalizer, and inputType (BYTES only) fields; a "FunctionExpr" sentinel is written for functionName so pre-upgrade brokers degrade gracefully (no pruning) instead of NPE-ing
- BYTES-column support: pipelines compiled with PartitionValueType.BYTES pass raw byte arrays through scalar functions; BytesColumnPreIndexStatsCollector and StatsCollectorConfig detect the column type and use the correct pipeline
- Schema-aware factory helpers (PartitionFunctionFactory, PartitionerFactory, TableConfigPartitioner) so minion merge tasks and stale-segment detection use the correct input type for BYTES expression-mode columns
- Broker routing extended to prune segments using expression-mode partition functions; SegmentPartitionMetadataManager and segment pruners handle both modes
- Non-deterministic scalar functions (now/ago/sleep) blocked from partition expressions via isAllowedForPartitioning(); the isDeterministic flag is unchanged so compile-time constant folding in query parsing continues to work
- CommonPartitionScalarFunctionResolver wired via ServiceLoader; resolves scalar functions from FunctionRegistry for use in partition expression compilation
- JSON round-trip test for ColumnPartitionMetadata expression-mode serialization

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…, fix Javadoc refs

- Declare PartitionFunctionEvaluator final (thread-safety contract)
- Make PartitionFunctionExecutionNode fields private final
- Cache the getArguments() result in PartitionPipeline and delegate from PartitionPipelineFunction to avoid a singletonList allocation on every call
- Fix stale Javadoc references in PartitionEvaluatorFactory and PartitionFunctionExprCompiler (was: InbuiltFunctionEvaluator, now: PartitionFunctionEvaluator)
- Add assertEquals(roundTripped, metadata) to ColumnPartitionMetadataTest round-trip tests to cover the hashCode() fix
- Delete PartitionFunctionExprRacyTestFunctions (dead code; the test using it was removed)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…tionUtils delegate to it

Addresses review feedback: canonicalize() applies to all function types, not just scalar functions, so it belongs in a general-purpose FunctionNameUtils class rather than the scalar-function-specific ScalarFunctionUtils. Both ScalarFunctionUtils.canonicalize() and FunctionUtils.canonicalize() are kept as delegates for backward compatibility.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…partition results; fix null-vs-empty functionConfig comparison
- RealtimeSegmentDataManager and StatelessRealtimeSegmentWriter now pass the FieldSpec to
getPartitionFunction so BYTES-typed partition columns compile with PartitionValueType.BYTES,
matching the offline path (previously always compiled with STRING input)
- PartitionPipelineFunction.normalizeResult() now explicitly rejects Float/Double results
instead of silently truncating them to int (fail-fast: misconfigured expressions surface
at evaluation time rather than producing wrong partition IDs)
- SegmentPartitionMetadataManager.functionConfigsMatch() treats null and empty map as
equal to avoid false mismatches between table config (null) and deserialized metadata ({})
- Document the BYTES caveat on PartitionFunctionFactory.getPartitionFunction(String, ColumnPartitionConfig, int)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
PinotLLCRealtimeSegmentManager.buildPartitionMetadataForNewSegment was calling the (String, ColumnPartitionConfig, int) overload, which always compiles expression-mode partition functions with STRING input, causing BYTES-typed partition columns on the controller to produce partition IDs inconsistent with the server-side computation.

Adds getPartitionFunction(String, ColumnPartitionConfig, int, FieldSpec) to PartitionFunctionFactory so callers can supply both an explicit numPartitions and the column FieldSpec for correct BYTES handling.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
… routing miss

- PartitionPipelineFunction.normalizeResult: return NULL_RESULT_PARTITION_ID (-1) instead of throwing when the expression evaluates to null for a given value (e.g. null column input). Prevents segment creation from crashing on rows where the partition expression yields null.
- AbstractColumnStatisticsCollector.updatePartition: skip adding the sentinel -1 to the partition set so null-expression results don't corrupt segment partition metadata.
- SegmentPartitionMetadataManager.isMatchingPartitionFunction: use equalsIgnoreCaseNullable for the functionExpr comparison (consistent with partitionIdNormalizer). Segments written with a mixed-case expression by an older node or a direct ZK write were previously excluded from partition-aware routing, silently degrading to full scan.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
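The sentinel behavior described above can be sketched as follows. This is a hedged illustration, not the actual Pinot code: the class and the expression functional interface are stand-ins, and only the -1 sentinel and the skip-on-update rule come from the commit message.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.UnaryOperator;

// Hedged sketch: getPartition returns a -1 sentinel when the expression chain
// yields null, and the stats collector skips that sentinel so it never lands
// in segment partition metadata. Names are illustrative.
public class NullPartitionSketch {
    static final int NULL_RESULT_PARTITION_ID = -1;
    final Set<Integer> _partitions = new HashSet<>();

    static int getPartition(Object value, UnaryOperator<Object> expr, int numPartitions) {
        Object result = value == null ? null : expr.apply(value);
        if (result == null) {
            return NULL_RESULT_PARTITION_ID; // don't throw: null rows mean "no partition"
        }
        return Math.floorMod(((Number) result).intValue(), numPartitions);
    }

    void updatePartition(int partitionId) {
        if (partitionId != NULL_RESULT_PARTITION_ID) { // keep the sentinel out of metadata
            _partitions.add(partitionId);
        }
    }
}
```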
The rebase onto upstream/master preserved references to the `fieldType` variable in the reorganized datetime/complex-field blocks but lost the local declaration. Re-declare it from fieldSpec.getFieldType() so pinot-segment-local compiles again. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Pinot scalar functions commonly box integral arithmetic to Double
(e.g. plus(long, long) → Double), so an expression like
plus(intDiv(col, 1000), 7) may legitimately yield 61.0 (Double) for
input that should partition to id 61. The previous check rejected any
Float/Double outright, breaking valid expressions.
Relax normalizeResult to accept Float/Double iff the value is finite
and has no fractional part (NaN/Infinity/non-integral values still
rejected since they cannot map to a stable partition id).
Fixes CI failure:
PartitionFunctionExprCommonScalarFunctionTest
.testClassBasedScalarFunctionSupportsRawStringNumericInput
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ig validation
The test testRejectInvalidSegmentPartitionFunctionExpr expects
TableConfigUtils.validate to reject configs whose partition expression
yields a non-numeric type (e.g. md5(col) returns STRING). The check was
missing — validation only built the partition function but never
probed its output type, so misconfigured tables would pass validation
and fail later at segment creation.
Add PartitionPipelineFunction.validateOutputType() which probes the
compiled expression with a sample value and throws IllegalArgumentException
("Partition pipeline must produce INT or LONG output, got: <TYPE>") when
the result is non-numeric. Wire it into TableConfigUtils.validate after
the existing partition-function build call. Probe failures (RuntimeException
during sample evaluation) skip output validation and let runtime ingestion
surface the error.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
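The probe-then-check flow above can be sketched like this. Hedged: the class name, the functional-interface signature, and the single `"1"` probe value are illustrative stand-ins, not the actual PartitionPipelineFunction API.

```java
import java.util.function.UnaryOperator;

// Hedged sketch of validateOutputType(): probe the compiled expression with a
// sample value; reject non-numeric results, but skip validation when the probe
// itself fails so runtime ingestion surfaces the real error.
public class ValidateOutputTypeSketch {
    static void validateOutputType(UnaryOperator<Object> compiledExpr) {
        Object result;
        try {
            result = compiledExpr.apply("1"); // probe with a sample value
        } catch (RuntimeException e) {
            return; // probe failed: skip output validation, defer to ingestion
        }
        if (!(result instanceof Integer) && !(result instanceof Long)) {
            throw new IllegalArgumentException("Partition pipeline must produce INT or LONG output, got: "
                + (result == null ? "null" : result.getClass().getSimpleName()));
        }
    }
}
```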
Critical correctness fixes:
- BaseSegmentCreator: always write PARTITION_FUNCTION key (with sentinel
"FunctionExpr" for expression-mode) so old servers reading new segments
fail fast with IllegalArgumentException("No enum constant for: FunctionExpr")
instead of silently dropping partition pruning. Removes the misleading
"v7 format" comment (no version bump exists in this PR).
- PartitionPipelineFunction.normalizeResult: reject Float/Double whose
absolute value exceeds 2^53. Past this threshold every double is
trivially "integral" because the mantissa cannot represent the
fractional bit, so multiple distinct longs collapse to the same double
and silently produce identical partition ids. Caller must cast to LONG
explicitly in the expression to opt into a wider range.
- PartitionPipelineFunction.validateOutputType: probe with multiple
sample values matching the column's stored type. If every probe
throws, fail validation with a clear message rather than silently
passing — previously a misconfigured pipeline (e.g. one rejecting "1"
as input) would skip output-type validation entirely.
- PartitionPipelineFunction.evaluate(GenericRow|Object[]): when the
expression chain produces null mid-evaluation (NULL_RESULT_PARTITION_ID
sentinel from getPartition), return null on the FunctionEvaluator
surface so ingestion callers using Integer-boxing semantics treat it
as "no partition" instead of literal partition id -1.
- BaseTableDataManager.isPartitionStale: null-tolerant case-insensitive
compare for partitionIdNormalizer. Built-in partition functions on the
new side return non-null normalizer names while pre-existing custom
plugins inherit the SPI default of null. Without this fix, plugin
segments would be flagged perpetually stale and re-downloaded on
every reload.
Major fixes:
- ColumnPartitionMetadata.normalizeOptionalText: drop the "null".equals
branch. The dual guard masked a real risk that any caller serializing
the literal Java string "null" silently null-coerces; rely on the
write side to never produce "null" (already true via JsonUtils).
- PartitionFunctionExprCompiler.canonicalize: pre-compile the three
whitespace-stripping patterns once at class init instead of
recompiling on every call.
- PartitionFunctionFactory.getPartitionFunction(String, ColumnPartitionConfig, int):
marked @deprecated since the Javadoc already warns "CAUTION" about
always compiling expressions with STRING input — callers should use
the FieldSpec-aware overload.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
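The pattern-precompilation fix above can be sketched as follows. Hedged: the exact patterns and the lowercasing step are illustrative (the real compiler uses three whitespace-stripping patterns whose definitions are not shown in this PR excerpt); the point is that each `Pattern` is compiled once at class init rather than on every `canonicalize()` call.

```java
import java.util.Locale;
import java.util.regex.Pattern;

// Hedged sketch: compile the canonicalization patterns once, not per call.
public class CanonicalizeSketch {
    private static final Pattern EDGE_WS = Pattern.compile("^\\s+|\\s+$");
    private static final Pattern WS_AROUND_DELIMS = Pattern.compile("\\s*([(),])\\s*");

    static String canonicalize(String expr) {
        String trimmed = EDGE_WS.matcher(expr).replaceAll("");
        // Strip whitespace around parentheses/commas and lowercase the result.
        return WS_AROUND_DELIMS.matcher(trimmed).replaceAll("$1").toLowerCase(Locale.ROOT);
    }
}
```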
- testIntegralValuedDoubleAcceptedAsPartitionResult: regression for 9bcd3ee. plus(intDiv(54321, 1000), 7) returns 61.0 (Double); the pipeline must accept this since Pinot scalar functions box integral arithmetic to Double.
- testDoubleBeyondMantissaPrecisionIsRejected: regression for the precision-loss critical fix. Doubles with |x| > 2^53 cannot represent successive integers uniquely; reject to avoid silent partition-id collapse.
- testNullMidChainReturnsNullOnFunctionEvaluatorSurface: regression for 28546f1 plus the null-divergence fix. evaluate(GenericRow) must return null when the column value is null or the chain produces null mid-evaluation, so ingestion treats it as "no partition" rather than the literal -1 sentinel.
- testValidateOutputTypeRejectsStringExpressionResult: regression for 9c031aa. md5(col) returns STRING; validateOutputType must reject it at config-validation time with a clear "INT or LONG" error.
- testPartitionMetadataManagerProcessingWithMixedCaseFunctionExpr: regression for 28546f1. Broker config-side functionExpr is canonicalized (lowercased) but segments may be written with mixed-case expressions; the routing match must be case-insensitive so such segments are still partition-pruned instead of falling back to scatter-gather.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Critical fixes:
- PartitionPipelineFunction.normalizeResult: tighten the precision check to strict |x| < 2^53. The previous inclusive bound admitted 2^53 itself, but the next representable integer above 2^53 is 2^53 + 2, meaning 2^53 + 1 silently rounds to 2^53.0 and would collide with 2^53 onto the same partition id. Add a boundary regression test asserting 2^53 itself is rejected.
- InbuiltFunctionEvaluator.FunctionExecutionNode: replace the shared Object[] _arguments scratch array with ThreadLocal<Object[]> so the parent ExecutableFunctionEvaluator's documented thread-safety contract holds for both subclasses (InbuiltFunctionEvaluator and PartitionFunctionEvaluator). Update the class-level Javadoc accordingly.

Major fixes:
- PartitionPipelineFunction.validateOutputType: include null in the probe set so validation also exercises the null-input path that ingestion already handles.
- PartitionFunctionExprCompiler.canonicalize: collapse runs of internal whitespace to a single space so segment-vs-config comparisons of canonicalized expressions agree on whitespace.
- V1Constants: add a PARTITION_FUNCTION_EXPR_SENTINEL constant documenting that "FunctionExpr" is reserved as the cross-version fail-fast value written into segment metadata for expression-mode segments. Must stay in sync with PartitionPipelineFunction.NAME.
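The strict-bound check can be sketched as follows. Hedged: the class and method names are illustrative stand-ins for the normalizeResult logic described above, but the 2^53 boundary behavior itself is a plain fact of IEEE 754 doubles.

```java
// Hedged sketch: accept a floating-point partition result only if it is
// finite, integral, and strictly below 2^53 in magnitude.
public class NormalizeResultSketch {
    private static final double MAX_EXACT = 0x1p53; // 2^53, where consecutive longs start to collide

    static long toExactLong(double d) {
        if (!Double.isFinite(d) || d != Math.floor(d)) {
            throw new IllegalArgumentException("Non-integral partition result: " + d);
        }
        if (Math.abs(d) >= MAX_EXACT) {
            // At 2^53 the mantissa runs out: 2^53 + 1 rounds to 2^53.0, so
            // distinct longs would silently collapse onto one partition id.
            throw new IllegalArgumentException("Beyond exact-integer range: " + d);
        }
        return (long) d;
    }

    public static void main(String[] args) {
        System.out.println(toExactLong(61.0)); // prints 61: boxed integral arithmetic is accepted
        System.out.println((double) ((1L << 53) + 1) == (double) (1L << 53)); // prints true: the collapse
    }
}
```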
Previously, every segment loaded by a broker/server invoked PartitionFunctionExprCompiler.compile(), which parses the canonical expression through Calcite once per segment. With thousands of segments sharing the same partition expression, that is thousands of redundant Calcite parses at table-init/refresh time.

Add a process-wide ConcurrentMap<PipelineCacheKey, PartitionPipeline> keyed on (rawColumn, isBytesInput, canonicalExpr, normalizerName). PartitionPipeline is stateless except for per-thread scratch arrays in its inner ExecutableNode tree, so a cached instance is safe to share across multiple PartitionPipelineFunction wrappers; each wrapper applies its own numPartitions at normalization time. Cache size is bounded by unique (column, function spec) tuples, typically a handful per cluster.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
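The caching scheme from this commit can be sketched like so. Hedged: the key fields mirror the commit message, but the "pipeline" is a stand-in String because the real PartitionPipeline type is Pinot-internal; a later commit in this PR replaces the raw ConcurrentHashMap with a bounded Guava cache.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hedged sketch of the process-wide pipeline cache: one compile per unique
// (column, input type, expression, normalizer) tuple, shared across segments.
public class PipelineCacheSketch {
    record PipelineCacheKey(String rawColumn, boolean bytesInput, String canonicalExpr, String normalizer) {}

    private static final Map<PipelineCacheKey, String> PIPELINE_CACHE = new ConcurrentHashMap<>();
    static final AtomicInteger COMPILE_COUNT = new AtomicInteger();

    static String getOrCompile(PipelineCacheKey key) {
        // computeIfAbsent guarantees a single compile per key under concurrent loads.
        return PIPELINE_CACHE.computeIfAbsent(key, k -> {
            COMPILE_COUNT.incrementAndGet(); // stands in for the expensive Calcite parse
            return "pipeline[" + k.canonicalExpr() + "]";
        });
    }
}
```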
Critical fixes:
- TableConfigPartitioner.computePartition: filter NULL_RESULT_PARTITION_ID
and null inputs to a designated NULL_PARTITION bucket name. Previously
null-mid-chain results yielded "-1" as a partition string, leaking a
stray partition-bucket -1 into minion segment-mapper output.
- PartitionFunctionExprCompiler: bound PIPELINE_CACHE with Guava
CacheBuilder maximumSize=10_000 to prevent unbounded growth on
long-lived processes with table-config churn. Switch from
ConcurrentHashMap.computeIfAbsent (which forbids re-entrant cache
modification) to Cache.get(key, Callable) which is re-entrant-safe.
Unwrap UncheckedExecutionException + ExecutionException to preserve
the original compilation error (e.g. invalid-expression
IllegalArgumentException).
- PartitionFunctionExprCompiler.EvaluatorFactoryHolder: relax the
exactly-1 ServiceLoader check. Multiple impls (common in shaded-jar
/ multi-module test setups) no longer JVM-fail; prefer
InbuiltPartitionEvaluatorFactory by name, otherwise pick the first.
- BaseSegmentCreator + ColumnMetadataImpl + V1Constants: persist
PARTITION_INPUT_TYPE in segment metadata for BYTES expression-mode
pipelines. Reader prefers the stored value, falling back to
schema-derived stored type for older segments.
- PartitionFunctionTest: add regression test asserting
PartitionFunctionType.fromString("FunctionExpr") throws so a future
enum addition with that name does not silently break the
cross-version fail-fast contract for old readers.
Major fixes:
- BaseTableDataManager.isPartitionStale: tighten the asymmetric null
tolerance for partitionIdNormalizer. Previously both-side null and
one-side null were both treated as "match", so a user explicitly
removing the normalizer would not flag staleness. Now segment-side
null is tolerated (legacy or plugin SPI default), but config-side
null with non-null segment-side correctly flags stale.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Critical fixes:
- PartitionPipelineFunction.normalizeResult: a dedicated BigInteger path reduces modulo numPartitions in BigInteger arithmetic before narrowing to int. Number.longValue() silently truncates to the low 64 bits when the magnitude exceeds Long, producing a wrap-around partition id.
- SinglePartitionColumnSegmentPruner / MultiPartitionColumnsSegmentPruner: log fail-open partition-function failures at ERROR (not WARN) so the silent scatter-gather degradation surfaces in alerting rather than hiding indefinitely. The fail-open behavior itself is preserved (partition-function bugs must not drop user query results).

Major fixes:
- PartitionFunctionExprCompiler: add expireAfterAccess(1 hour) to PIPELINE_CACHE so dropped tables / edited expressions don't pin pipelines indefinitely without an explicit invalidation hook.
- PartitionPipelineFunction.NAME: bind to V1Constants.MetadataKeys.Column.PARTITION_FUNCTION_EXPR_SENTINEL so the cross-version fail-fast contract for old readers cannot drift via duplicate string literals.
- BaseTableDataManager.isPartitionStale: add an input-type comparison for expression-mode pipelines. A field-spec edit can flip STRING ↔ BYTES without changing functionExpr or normalizer; the recompiled pipeline would otherwise produce different partition ids than the existing segment without flagging staleness.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
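The BigInteger truncation hazard can be demonstrated directly. Hedged: `partitionId` here is an illustrative stand-in for the dedicated BigInteger path, not Pinot's actual method; the `longValue()` wrap-around it guards against is standard `java.math.BigInteger` behavior.

```java
import java.math.BigInteger;

// Hedged sketch: reduce modulo numPartitions in BigInteger arithmetic before
// narrowing to int, instead of trusting Number.longValue().
public class BigIntegerPartitionSketch {
    static int partitionId(BigInteger result, int numPartitions) {
        // BigInteger.mod always returns a non-negative value, matching
        // positive-modulo partition semantics.
        return result.mod(BigInteger.valueOf(numPartitions)).intValueExact();
    }

    public static void main(String[] args) {
        BigInteger huge = BigInteger.TWO.pow(64).add(BigInteger.TWO); // 2^64 + 2
        System.out.println(huge.longValue() % 3); // prints 2: longValue() kept only the low 64 bits
        System.out.println(partitionId(huge, 3)); // prints 0: (2^64 + 2) mod 3 computed exactly
    }
}
```

Since 2^64 ≡ 1 (mod 3), the true partition for 2^64 + 2 with 3 partitions is 0, while the truncated low-64-bit value 2 lands on partition 2: exactly the wrap-around the fix eliminates.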
Critical fixes:
- ExecutableFunctionEvaluator.FunctionNode + InbuiltFunctionEvaluator.FunctionExecutionNode + PartitionFunctionEvaluator.PartitionFunctionExecutionNode: replace ThreadLocal<Object[]> with per-call Object[] allocation. Previously a single thread re-entering the same shared FunctionNode (e.g. via a UDF that recursively evaluates an expression) would overwrite the outer call's argument scratch and produce silent wrong results. Per-call allocation is small and short-lived; HotSpot's escape analysis routinely scalar-replaces such arrays, so the per-row cost is negligible. Update the thread-safety Javadoc on all three classes.
- PartitionFunctionFactory.getPartitionFunction(ColumnPartitionConfig) and getPartitionFunction(ColumnPartitionMetadata): mark @deprecated. These legacy 1-arg overloads now throw IllegalArgumentException on expression-mode configs/metadata; external plugin authors should migrate to the column-aware overloads.

Major fixes:
- SegmentPartitionMetadataManager: add a FieldSpec-aware constructor so expression-mode pipelines on BYTES partition columns are compiled with BYTES input. The FieldSpec-less constructor compiles with STRING input, silently disagreeing with ingestion partition ids on BYTES columns.
- PartitionPipelineFunction: cache BigInteger.valueOf(_numPartitions) in the constructor to avoid per-row allocation for BigInteger-returning expressions.
- PartitionFunctionExprCompiler.PIPELINE_CACHE: switch expireAfterAccess → expireAfterWrite so an edited partition expression on a hot table takes effect within the TTL (access-based expiry would never fire on a continuously queried table).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Critical fix:
- RealtimeSegmentDataManager + StatelessRealtimeSegmentWriter: thread the stream-derived numPartitions override through to the partition function builder. The previous 3-arg fieldSpec overload silently used config.getNumPartitions(), discarding the local override computed from the stream. This is a regression on the realtime ingestion path that affects both legacy name-mode and expression-mode partition functions on streams that have been resharded beyond the table-config partition count.
Major fixes:
- ColumnPartitionMetadata.ColumnPartitionMetadataDeserializer: guard against missing 'numPartitions' key with a descriptive IllegalArgumentException instead of letting a NullPointerException propagate up to the caller's catch handler with an opaque message.
- PartitionPipelineFunction constructor: assert that the wrapped pipeline has a non-null IntNormalizer, surfacing the contract violation at construction time rather than crashing on the first row.
- PartitionEvaluatorFactory Javadoc: correct the documented loader contract from "Exactly one implementation" to match the actual multi-impl-tolerant behavior. PartitionFunctionExprCompiler now logs the chosen factory at INFO when more than one is present so operators can detect classpath ambiguity.
- PartitionPipelineFunction.evaluate(GenericRow): delegate directly to PartitionPipeline.evaluate(GenericRow), saving one Object[] allocation per row on the ingestion hot path. The pipeline already handles the column lookup, BYTES-vs-STRING dispatch, and null short-circuit; the wrapper only adds normalizeResult.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ness gap
Major fixes:
- ColumnPartitionMetadata.getInputType: bind @JsonProperty to "partitionInputType" so the ZK JSON field matches the segment metadata.properties key V1Constants.PARTITION_INPUT_TYPE. Update the deserializer's INPUT_TYPE_KEY accordingly. Eliminates the same-logical-field-different-name asymmetry that would silently fall back to STRING-input semantics for BYTES columns when the two paths exchange data.
- SegmentPartitionConfig.getFunctionConfig(String): restore the @deprecated annotation that was accidentally dropped — the upstream-deprecated method must keep emitting deprecation warnings to existing callers/IDEs.
- StatsCollectorConfig.getPartitionFunctionExpr(String): remove the @deprecated annotation. The method was newly added; per C1.15 only deprecate when a replacement exists.
- BaseTableDataManager.isPartitionStale: detect "partition function removed" — segment carries a partitionFunction but the table config no longer has any partitionColumn (or removed this column). Flag stale so the segment is rebuilt without partition metadata, instead of silently retaining stale routing info until replaced.
- PartitionFunction SPI Javadoc: document the expression-mode contract (non-null getFunctionExpr indicates expression mode; framework callers dispatch on that distinction).
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…obes
Major fixes:
- PartitionFunctionExprCompiler.EvaluatorFactoryHolder: match InbuiltPartitionEvaluatorFactory by FQN instead of getSimpleName(). A user-supplied factory in a different package with the same simple name would otherwise silently shadow the real built-in.
- PartitionPipelineFunction.validateOutputType: track non-null-probe success separately. Previously a pipeline that only succeeds on null inputs (e.g. case-when patterns guarding nulls) would pass validation via the null probe and then crash on the first real row.
- PartitionPipelineFunction.normalizeResult: remove the per-row Preconditions.checkState(intNormalizer != null) check and the associated getter call. Cache _intNormalizer at construction time; the constructor already enforces non-null.
Minor fixes:
- BaseTableDataManager.isPartitionStale: drop the duplicate comma in the "num partitions changed" debug log format.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Major fix:
- Add testPartitionIdNormalizerConstantsMatchEnumValues to lock the alignment between ColumnPartitionConfig.PARTITION_ID_NORMALIZER_* string constants (pinot-spi) and PartitionIntNormalizer enum values (pinot-segment-spi). The duplication exists because pinot-spi cannot reference the enum due to module dependency direction; without this test, adding a new normalizer to the enum without updating the validator string list would silently reject configs at runtime.
Minor:
- SegmentPartitionConfig.getFunctionConfig(String): add a TODO with the planned removal target so the @deprecated has an explicit sunset.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Critical fix:
- HashFunctions.java: when this branch added @ScalarFunction(names =
{"murmur2", ...}) on existing public byte[]-overload hash functions
(murmurHash2, fnv1Hash32, fnv1aHash32, fnv1Hash64, fnv1aHash64), the
explicit names array overrode the auto-derived canonical name —
silently breaking existing user SQL queries / ingestion transforms /
views that called these functions on byte[] inputs by their original
names. Add the legacy camelCase names alongside the new short aliases
so both resolve to the same method:
@ScalarFunction(names = {"murmur2", "murmurHash2"})
@ScalarFunction(names = {"fnv1_32", "fnv1Hash32"})
@ScalarFunction(names = {"fnv1a_32", "fnv1aHash32"})
@ScalarFunction(names = {"fnv1_64", "fnv1Hash64"})
@ScalarFunction(names = {"fnv1a_64", "fnv1aHash64"})
(murmur3Bit32Default is a new 1-arg method introduced by this branch
with no upstream auto-derived name to preserve, so its single
"murmur3_32" alias is fine.)
- HashFunctionsTest: add testLegacyAndNewHashFunctionAliasesBothResolve
asserting both the legacy and new alias resolve via FunctionRegistry
to the same method, locking the contract so a future name reshuffle
cannot silently regress backward compatibility.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Critical fix:
- SegmentPartitionMetadataManager: restore the legacy (String tableNameWithType, String partitionColumn, String partitionFunctionName, int numPartitions) constructor that existed on upstream master. Removing it broke binary compatibility for external broker plugins / vendor forks that linked against this signature. The shim builds a synthetic ColumnPartitionConfig and delegates to the new constructor; expression-mode tables must use the ColumnPartitionConfig overloads.
Major fixes:
- ColumnPartitionConfig: canonicalize partitionIdNormalizer to uppercase at construction time so downstream comparisons (mix of case-sensitive wire format and case-insensitive older comparisons) cannot disagree on case alone.
- PartitionFunctionFactory + ColumnPartitionMetadata: add explicit "TODO: remove after release 1.7.0" sunset markers to all four remaining @deprecated entries on the partition surfaces, matching the existing sunset on SegmentPartitionConfig.getFunctionConfig.
Skipped findings (with rationale):
- Broker pruner string-form predicate (CRIT in prior reviews): plumbing typed values through requires substantial SPI refactor.
- ByteArrayPartitionFunction normalizer mapping (claimed MASK by reviewer): on closer inspection abs(hashCode) % n is the ABS pattern (not MASK), so the existing return PartitionIntNormalizer.ABS.name() is correct.
- Sentinel rename to "__functionExpr__": the literal "FunctionExpr" is now permanent on-the-wire and changing it would break test-built segments. The drift regression test already guards against future PartitionFunctionType enum collisions.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Major fix:
- PartitionFunction.getPartition(String) Javadoc now explicitly states the return-value contract: implementations must return non-negative partition ids in [0, getNumPartitions()), and -1 is reserved as a framework-internal sentinel for "expression evaluated to null" (PartitionPipelineFunction.NULL_RESULT_PARTITION_ID). Custom plugin partition functions must not return -1 as a real partition id; internal callers (broker pruner, stats collector, segment processing partitioner) treat -1 as "skip / no partition".
Skipped findings (with rationale):
- Broker pruner fail-open metric: requires introducing a new BrokerMeter; defer to a follow-up that adds the meter alongside similar pruning failure observability.
- End-to-end integration test: substantial test infrastructure work (CustomDataQueryClusterIntegrationTest subclass with controller + realtime ingestion + broker routing assertions); defer.
- PR-scope split: process feedback that requires coordinated landing with maintainers; out of scope for code-only fixes.
- Cross-version controller-side gate: substantial Helix integration; documented hazard in ColumnPartitionConfig Javadoc, defer.
- ColumnBoundPartitionFunction.evaluate null handling: reviewer's claim that legacy partition functions produced a deterministic partition id for null is incorrect — legacy PartitionFunction.getPartition(String) would NPE on value.getBytes(UTF_8). The wrapper's null→null conversion is a graceful improvement.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Minor clarification: the reviewer flagged that legacy partition functions (HashCodePartitionFunction / ByteArrayPartitionFunction) use Kafka-style abs handling (Integer.MIN_VALUE → 0), while PartitionIntNormalizer.ABS does strict mod-then-abs. These differ at the MIN_VALUE edge case.

The string returned from getPartitionIdNormalizer is purely advertised for staleness/identity matching between config-side and segment-side function metadata; it is not used to drive runtime normalization for legacy functions (which have their own internal partition-id calculation). Expression-mode pipelines are the only code path where this value is authoritative (via PartitionIntNormalizer.fromConfigString). Document the contract on the SPI Javadoc so plugin authors and future maintainers don't mistake the string for a runtime hook.
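The MIN_VALUE edge case flagged above is concrete integer arithmetic and can be verified directly. This is an illustrative sketch, not Pinot code:

```java
public final class MinValueEdgeSketch {
  public static void main(String[] args) {
    int n = 7;
    int hash = Integer.MIN_VALUE;

    // Kafka-style: mask the sign bit first; MIN_VALUE & MAX_VALUE collapses to 0.
    int kafkaStyle = (hash & Integer.MAX_VALUE) % n; // 0

    // Strict mod-then-abs: take the remainder first (MIN_VALUE % 7 == -2), then abs.
    int modThenAbs = Math.abs(hash % n); // 2

    // Note that Math.abs alone cannot rescue MIN_VALUE: it overflows back to itself.
    System.out.println(Math.abs(Integer.MIN_VALUE) == Integer.MIN_VALUE); // true
    System.out.println(kafkaStyle + " vs " + modThenAbs); // 0 vs 2
  }
}
```

Because the two conventions disagree only at this single input, the mismatch is effectively invisible in testing yet real for any hash pipeline that can produce Integer.MIN_VALUE.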
What changed
This adds partition function expressions so Pinot can compute segment partitions and query-time pruning partitions from the raw column through a deterministic scalar-function pipeline instead of a single hard-coded partition function.
Key changes:
- Adds `functionExpr` while preserving existing `functionName` behavior
- Adds `partitionIdNormalizer` for expression mode, defaulting to `POSITIVE_MODULO`
- Adds the partition-expression compiler and typed partition pipeline in `pinot-segment-spi`
Why
Pinot's partition model assumed one partition function per raw column. That breaks down for common production layouts where the upstream partition key is derived by chaining transforms such as
`md5(id) -> fnv1a_32(...) -> partition` or `lower(key) -> murmur2(...) -> partition`. Derived ingestion columns do not preserve correct pruning semantics when queries still filter on the raw column.

This PR keeps partitioning attached to the raw column and evaluates the same deterministic pipeline against query literals so pruning remains correct.
Example table config
A table can now configure partitioning on the raw column with
`functionExpr` inside `segmentPartitionConfig`:

```json
{
  "indexingConfig": {
    "segmentPartitionConfig": {
      "columnPartitionMap": {
        "id": {
          "functionExpr": "fnv1a_32(md5(id))",
          "numPartitions": 128
        }
      }
    }
  }
}
```

When `partitionIdNormalizer` is omitted, Pinot uses `POSITIVE_MODULO` by default.

If an operator needs compatibility with an existing hash-to-partition mapping, expression mode can also configure the normalizer explicitly. For example, `ABS` preserves absolute-remainder semantics:

```json
{
  "indexingConfig": {
    "segmentPartitionConfig": {
      "columnPartitionMap": {
        "id": {
          "functionExpr": "fnv1a_32(md5(id))",
          "numPartitions": 128,
          "partitionIdNormalizer": "ABS"
        }
      }
    }
  }
}
```

Or `MASK` preserves sign-bit masking semantics:

```json
{
  "indexingConfig": {
    "segmentPartitionConfig": {
      "columnPartitionMap": {
        "id": {
          "functionExpr": "fnv1a_32(md5(id))",
          "numPartitions": 128,
          "partitionIdNormalizer": "MASK"
        }
      }
    }
  }
}
```

Another supported example is:

```json
{
  "indexingConfig": {
    "segmentPartitionConfig": {
      "columnPartitionMap": {
        "memberId": {
          "functionExpr": "murmur2(lower(memberId))",
          "numPartitions": 8
        }
      }
    }
  }
}
```

Partition-id normalization
Expression mode now separates two concerns:
- `functionExpr` computes the deterministic scalar pipeline from the raw column to an integer hash candidate
- `partitionIdNormalizer` converts that integer into the final partition id in `[0, numPartitions)`

Supported normalizers are:
- `POSITIVE_MODULO`: default behavior for expression mode; uses modulo with negative remainders shifted back into `[0, numPartitions)`
- `ABS`: compatibility mode for layouts that expect `abs(hash % numPartitions)` semantics
- `MASK`: compatibility mode for hash pipelines that expect sign-bit masking semantics

Query behavior
Users still query the raw column. There is no query rewrite and no need to expose a derived ingestion column.
For example, with the config above users write plain `WHERE id = '<value>'` or `WHERE id IN (...)` predicates on the raw column.
Pinot evaluates the same expression pipeline against the raw query literal(s) during pruning.
For the compatibility config:
```json
{
  "functionExpr": "fnv1a_32(md5(id))",
  "numPartitions": 128,
  "partitionIdNormalizer": "MASK"
}
```

The UUID `000016be-9d72-466c-9632-cfa680dc8fa3` maps to partition `104`, so an equality predicate on that raw `id` prunes using partition `104`.
This also fixes a few issues discovered during review; see the per-commit fix notes above for details.
Validation
Ran:
- `./mvnw spotless:apply -pl pinot-broker,pinot-common,pinot-controller,pinot-core,pinot-segment-local,pinot-segment-spi,pinot-spi`
- `./mvnw checkstyle:check -pl pinot-broker,pinot-common,pinot-controller,pinot-core,pinot-segment-local,pinot-segment-spi,pinot-spi`
- `./mvnw license:format -pl pinot-broker,pinot-common,pinot-controller,pinot-core,pinot-segment-local,pinot-segment-spi,pinot-spi`
- `./mvnw license:check -pl pinot-broker,pinot-common,pinot-controller,pinot-core,pinot-segment-local,pinot-segment-spi,pinot-spi`
- `./mvnw -pl pinot-broker -am -Dtest=PartitionFunctionExprSegmentPrunerTest,SegmentPartitionMetadataManagerTest -Dsurefire.failIfNoSpecifiedTests=false test`
- `./mvnw -pl pinot-core -am -Dtest=BaseTableDataManagerNeedRefreshTest -Dsurefire.failIfNoSpecifiedTests=false test`
- `./mvnw -pl pinot-segment-local -Dtest=TableConfigUtilsTest,RealtimeSegmentConverterTest test`
- `./mvnw -pl pinot-segment-spi -Dtest=PartitionFunctionExprCompilerTest test`
- `./mvnw -pl pinot-spi -Dtest=IndexingConfigTest test`

Notable regression coverage includes:
- `POSITIVE_MODULO` normalization
- `ABS` normalization for negative-hash absolute-remainder compatibility
- `MASK` normalization for `fnv1a_32(md5(id))` with 128 partitions and UUID `000016be-9d72-466c-9632-cfa680dc8fa3` mapping to partition 104