Skip to content

Commit b6589b2

Browse files
xiangfu0 and claude committed
[feature] Add pluggable codec pipeline for RAW forward indexes
Replace the single compressionCodec with a codecPipeline field that supports an ordered list of transform and compression stages. Transforms (DELTA, DOUBLE_DELTA) are applied left-to-right on write and right-to-left on read, with at most one terminal compressor. Uses writer version 7 with a new header format that stores the full pipeline. Includes 53 tests covering unit, pipeline round-trip, and full end-to-end writer/reader validation including triple-delta stacking scenarios. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 36cdab4 commit b6589b2

48 files changed

Lines changed: 2878 additions & 84 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryIntegerCompression.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import java.util.Random;
2525
import java.util.concurrent.TimeUnit;
2626
import net.jpountz.lz4.LZ4Factory;
27-
import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
27+
import org.apache.pinot.segment.local.io.codec.compression.ChunkCompressorFactory;
2828
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
2929
import org.apache.pinot.segment.spi.compression.ChunkCompressor;
3030
import org.apache.pinot.segment.spi.compression.ChunkDecompressor;

pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryLongCompression.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import java.util.Random;
2525
import java.util.concurrent.TimeUnit;
2626
import net.jpountz.lz4.LZ4Factory;
27-
import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
27+
import org.apache.pinot.segment.local.io.codec.compression.ChunkCompressorFactory;
2828
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
2929
import org.apache.pinot.segment.spi.compression.ChunkCompressor;
3030
import org.apache.pinot.segment.spi.compression.ChunkDecompressor;

pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkNoDictionaryStringCompression.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import java.util.Random;
2525
import java.util.concurrent.TimeUnit;
2626
import org.apache.commons.lang3.RandomStringUtils;
27-
import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
27+
import org.apache.pinot.segment.local.io.codec.compression.ChunkCompressorFactory;
2828
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
2929
import org.apache.pinot.segment.spi.compression.ChunkCompressor;
3030
import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.segment.local.io.codec;
20+
21+
import java.io.IOException;
22+
import java.nio.ByteBuffer;
23+
import java.util.List;
24+
import org.apache.pinot.segment.local.io.codec.compression.ChunkCompressorFactory;
25+
import org.apache.pinot.segment.local.io.codec.transform.ChunkTransformFactory;
26+
import org.apache.pinot.segment.spi.codec.ChunkCodec;
27+
import org.apache.pinot.segment.spi.codec.ChunkCodecPipeline;
28+
import org.apache.pinot.segment.spi.codec.ChunkTransform;
29+
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
30+
import org.apache.pinot.segment.spi.compression.ChunkCompressor;
31+
32+
33+
/**
34+
* A {@link ChunkCompressor} that applies a pipeline of codec stages: first all
35+
* {@link ChunkCodec.CodecKind#TRANSFORM TRANSFORM} stages in order (left-to-right),
36+
* then the terminal {@link ChunkCodec.CodecKind#COMPRESSOR COMPRESSOR}.
37+
*
38+
* <p>This is the write-path counterpart of {@link PipelineChunkDecompressor}.</p>
39+
*/
40+
public class PipelineChunkCompressor implements ChunkCompressor {
41+
42+
private final ChunkCodecPipeline _pipeline;
43+
private final ChunkTransform[] _transforms;
44+
private final ChunkCompressor _terminalCompressor;
45+
private final int _valueSizeInBytes;
46+
47+
/**
48+
* Creates a pipeline compressor.
49+
*
50+
* @param pipeline the codec pipeline
51+
* @param valueSizeInBytes size of each typed value (4 for INT, 8 for LONG); used by transforms
52+
*/
53+
public PipelineChunkCompressor(ChunkCodecPipeline pipeline, int valueSizeInBytes) {
54+
_pipeline = pipeline;
55+
_valueSizeInBytes = valueSizeInBytes;
56+
57+
List<ChunkCodec> transformStages = pipeline.getTransforms();
58+
_transforms = new ChunkTransform[transformStages.size()];
59+
for (int i = 0; i < transformStages.size(); i++) {
60+
_transforms[i] = ChunkTransformFactory.getTransform(transformStages.get(i));
61+
}
62+
63+
_terminalCompressor = ChunkCompressorFactory.getCompressor(
64+
pipeline.getChunkCompressionType());
65+
}
66+
67+
@Override
68+
public int compress(ByteBuffer inUncompressed, ByteBuffer outCompressed)
69+
throws IOException {
70+
// Apply transforms left-to-right (in-place on the input buffer)
71+
int numBytes = inUncompressed.remaining();
72+
for (ChunkTransform transform : _transforms) {
73+
transform.encode(inUncompressed, numBytes, _valueSizeInBytes);
74+
}
75+
76+
// Apply terminal compression
77+
return _terminalCompressor.compress(inUncompressed, outCompressed);
78+
}
79+
80+
@Override
81+
public int maxCompressedSize(int uncompressedSize) {
82+
// Transforms are in-place and don't change size; delegate to terminal compressor
83+
return _terminalCompressor.maxCompressedSize(uncompressedSize);
84+
}
85+
86+
@Override
87+
public ChunkCompressionType compressionType() {
88+
return _terminalCompressor.compressionType();
89+
}
90+
91+
@Override
92+
public void close()
93+
throws IOException {
94+
_terminalCompressor.close();
95+
}
96+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.segment.local.io.codec;
20+
21+
import java.io.IOException;
22+
import java.nio.ByteBuffer;
23+
import java.util.List;
24+
import org.apache.pinot.segment.local.io.codec.compression.ChunkCompressorFactory;
25+
import org.apache.pinot.segment.local.io.codec.transform.ChunkTransformFactory;
26+
import org.apache.pinot.segment.spi.codec.ChunkCodec;
27+
import org.apache.pinot.segment.spi.codec.ChunkCodecPipeline;
28+
import org.apache.pinot.segment.spi.codec.ChunkTransform;
29+
import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
30+
31+
32+
/**
33+
* A {@link ChunkDecompressor} that reverses a codec pipeline: first decompresses using the
34+
* terminal {@link ChunkCodec.CodecKind#COMPRESSOR COMPRESSOR}, then applies all
35+
* {@link ChunkCodec.CodecKind#TRANSFORM TRANSFORM} stages in reverse order (right-to-left).
36+
*
37+
* <p>This is the read-path counterpart of {@link PipelineChunkCompressor}.</p>
38+
*/
39+
public class PipelineChunkDecompressor implements ChunkDecompressor {
40+
41+
private final ChunkCodecPipeline _pipeline;
42+
private final ChunkTransform[] _transforms;
43+
private final ChunkDecompressor _terminalDecompressor;
44+
private final int _valueSizeInBytes;
45+
46+
/**
47+
* Creates a pipeline decompressor.
48+
*
49+
* @param pipeline the codec pipeline
50+
* @param valueSizeInBytes size of each typed value (4 for INT, 8 for LONG); used by transforms
51+
*/
52+
public PipelineChunkDecompressor(ChunkCodecPipeline pipeline, int valueSizeInBytes) {
53+
_pipeline = pipeline;
54+
_valueSizeInBytes = valueSizeInBytes;
55+
56+
List<ChunkCodec> transformStages = pipeline.getTransforms();
57+
_transforms = new ChunkTransform[transformStages.size()];
58+
for (int i = 0; i < transformStages.size(); i++) {
59+
_transforms[i] = ChunkTransformFactory.getTransform(transformStages.get(i));
60+
}
61+
62+
_terminalDecompressor = ChunkCompressorFactory.getDecompressor(
63+
pipeline.getChunkCompressionType());
64+
}
65+
66+
@Override
67+
public int decompress(ByteBuffer compressedInput, ByteBuffer decompressedOutput)
68+
throws IOException {
69+
// Decompress using terminal decompressor.
70+
// Per Pinot convention, after this call the output buffer is flipped: position=0, limit=dataSize.
71+
int decompressedSize = _terminalDecompressor.decompress(compressedInput, decompressedOutput);
72+
73+
if (_transforms.length > 0) {
74+
// Buffer is already in read mode (flipped). Transforms operate from position=0.
75+
int numBytes = decompressedOutput.remaining();
76+
77+
// Apply transforms in reverse order (right-to-left)
78+
for (int i = _transforms.length - 1; i >= 0; i--) {
79+
_transforms[i].decode(decompressedOutput, numBytes, _valueSizeInBytes);
80+
}
81+
// Buffer remains flipped: position=0, limit=numBytes — ready for the caller to read.
82+
}
83+
84+
return decompressedSize;
85+
}
86+
87+
@Override
88+
public int decompressedLength(ByteBuffer compressedInput)
89+
throws IOException {
90+
return _terminalDecompressor.decompressedLength(compressedInput);
91+
}
92+
93+
@Override
94+
public void close()
95+
throws IOException {
96+
_terminalDecompressor.close();
97+
}
98+
}

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java renamed to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/codec/compression/ChunkCompressorFactory.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,11 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19-
package org.apache.pinot.segment.local.io.compression;
19+
package org.apache.pinot.segment.local.io.codec.compression;
2020

21+
import org.apache.pinot.segment.local.io.codec.PipelineChunkCompressor;
22+
import org.apache.pinot.segment.local.io.codec.PipelineChunkDecompressor;
23+
import org.apache.pinot.segment.spi.codec.ChunkCodecPipeline;
2124
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
2225
import org.apache.pinot.segment.spi.compression.ChunkCompressor;
2326
import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
@@ -118,4 +121,38 @@ public static ChunkDecompressor getDecompressor(ChunkCompressionType compression
118121
throw new IllegalArgumentException("Illegal decompressor name " + compressionType);
119122
}
120123
}
124+
125+
/**
126+
* Returns a compressor for a codec pipeline. If the pipeline has transforms, returns a
127+
* {@link PipelineChunkCompressor}; otherwise returns the plain compressor for the terminal codec.
128+
* Legacy compound codecs (DELTA_LZ4, DOUBLE_DELTA_LZ4) are handled by their existing compressor
129+
* implementations.
130+
*
131+
* @param pipeline the codec pipeline
132+
* @param valueSizeInBytes size of each typed value (4 for INT, 8 for LONG); used by transforms
133+
* @return compressor for the pipeline
134+
*/
135+
public static ChunkCompressor getCompressor(ChunkCodecPipeline pipeline, int valueSizeInBytes) {
136+
if (pipeline.hasTransforms()) {
137+
return new PipelineChunkCompressor(pipeline, valueSizeInBytes);
138+
}
139+
return getCompressor(pipeline.getChunkCompressionType());
140+
}
141+
142+
/**
143+
* Returns a decompressor for a codec pipeline. If the pipeline has transforms, returns a
144+
* {@link PipelineChunkDecompressor}; otherwise returns the plain decompressor for the terminal codec.
145+
* Legacy compound codecs (DELTA_LZ4, DOUBLE_DELTA_LZ4) are handled by their existing decompressor
146+
* implementations.
147+
*
148+
* @param pipeline the codec pipeline
149+
* @param valueSizeInBytes size of each typed value (4 for INT, 8 for LONG); used by transforms
150+
* @return decompressor for the pipeline
151+
*/
152+
public static ChunkDecompressor getDecompressor(ChunkCodecPipeline pipeline, int valueSizeInBytes) {
153+
if (pipeline.hasTransforms()) {
154+
return new PipelineChunkDecompressor(pipeline, valueSizeInBytes);
155+
}
156+
return getDecompressor(pipeline.getChunkCompressionType());
157+
}
121158
}

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaCompressor.java renamed to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/codec/compression/DeltaCompressor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19-
package org.apache.pinot.segment.local.io.compression;
19+
package org.apache.pinot.segment.local.io.codec.compression;
2020

2121
import java.io.IOException;
2222
import java.nio.ByteBuffer;

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaDecompressor.java renamed to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/codec/compression/DeltaDecompressor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19-
package org.apache.pinot.segment.local.io.compression;
19+
package org.apache.pinot.segment.local.io.codec.compression;
2020

2121
import java.io.IOException;
2222
import java.nio.ByteBuffer;

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaDeltaCompressor.java renamed to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/codec/compression/DeltaDeltaCompressor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19-
package org.apache.pinot.segment.local.io.compression;
19+
package org.apache.pinot.segment.local.io.codec.compression;
2020

2121
import java.io.IOException;
2222
import java.nio.ByteBuffer;

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/DeltaDeltaDecompressor.java renamed to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/codec/compression/DeltaDeltaDecompressor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19-
package org.apache.pinot.segment.local.io.compression;
19+
package org.apache.pinot.segment.local.io.codec.compression;
2020

2121
import java.io.IOException;
2222
import java.nio.ByteBuffer;

0 commit comments

Comments
 (0)