Skip to content

Commit 90c650f

Browse files
xiangfu0 and claude committed
[feature] Add pluggable codec pipeline for RAW forward indexes
Replace the single compressionCodec with a codecPipeline field that supports an ordered list of transform and compression stages. Transforms (DELTA, DOUBLE_DELTA) are applied left-to-right on write and right-to-left on read, with at most one terminal compressor. Uses writer version 7 with a new header format that stores the full pipeline. Includes 53 tests covering unit, pipeline round-trip, and full end-to-end writer/reader validation including triple-delta stacking scenarios. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 7960057 commit 90c650f

24 files changed

Lines changed: 2612 additions & 55 deletions

File tree

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pinot.segment.local.io.compression;
2020

21+
import org.apache.pinot.segment.spi.compression.ChunkCodecPipeline;
2122
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
2223
import org.apache.pinot.segment.spi.compression.ChunkCompressor;
2324
import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
@@ -118,4 +119,38 @@ public static ChunkDecompressor getDecompressor(ChunkCompressionType compression
118119
throw new IllegalArgumentException("Illegal decompressor name " + compressionType);
119120
}
120121
}
122+
123+
/**
124+
* Returns a compressor for a codec pipeline. If the pipeline has transforms, returns a
125+
* {@link PipelineChunkCompressor}; otherwise returns the plain compressor for the terminal codec.
126+
* Legacy compound codecs (DELTA_LZ4, DOUBLE_DELTA_LZ4) are handled by their existing compressor
127+
* implementations.
128+
*
129+
* @param pipeline the codec pipeline
130+
* @param valueSizeInBytes size of each typed value (4 for INT, 8 for LONG); used by transforms
131+
* @return compressor for the pipeline
132+
*/
133+
public static ChunkCompressor getCompressor(ChunkCodecPipeline pipeline, int valueSizeInBytes) {
134+
if (pipeline.hasTransforms()) {
135+
return new PipelineChunkCompressor(pipeline, valueSizeInBytes);
136+
}
137+
return getCompressor(pipeline.getChunkCompressionType());
138+
}
139+
140+
/**
141+
* Returns a decompressor for a codec pipeline. If the pipeline has transforms, returns a
142+
* {@link PipelineChunkDecompressor}; otherwise returns the plain decompressor for the terminal codec.
143+
* Legacy compound codecs (DELTA_LZ4, DOUBLE_DELTA_LZ4) are handled by their existing decompressor
144+
* implementations.
145+
*
146+
* @param pipeline the codec pipeline
147+
* @param valueSizeInBytes size of each typed value (4 for INT, 8 for LONG); used by transforms
148+
* @return decompressor for the pipeline
149+
*/
150+
public static ChunkDecompressor getDecompressor(ChunkCodecPipeline pipeline, int valueSizeInBytes) {
151+
if (pipeline.hasTransforms()) {
152+
return new PipelineChunkDecompressor(pipeline, valueSizeInBytes);
153+
}
154+
return getDecompressor(pipeline.getChunkCompressionType());
155+
}
121156
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.segment.local.io.compression;
20+
21+
import org.apache.pinot.segment.spi.compression.ChunkCodec;
22+
import org.apache.pinot.segment.spi.compression.ChunkTransform;
23+
24+
25+
/**
26+
* Factory for obtaining {@link ChunkTransform} instances by their {@link ChunkCodec} identifier.
27+
*/
28+
public class ChunkTransformFactory {
29+
30+
private ChunkTransformFactory() {
31+
}
32+
33+
/**
34+
* Returns the singleton {@link ChunkTransform} for the given transform codec.
35+
*
36+
* @param codec the transform codec (must be a {@link ChunkCodec.CodecKind#TRANSFORM})
37+
* @return the corresponding transform instance
38+
* @throws IllegalArgumentException if the codec is not a known transform
39+
*/
40+
public static ChunkTransform getTransform(ChunkCodec codec) {
41+
switch (codec) {
42+
case DELTA:
43+
return DeltaTransform.INSTANCE;
44+
case DOUBLE_DELTA:
45+
return DoubleDeltaTransform.INSTANCE;
46+
case XOR:
47+
return XorTransform.INSTANCE;
48+
default:
49+
throw new IllegalArgumentException("Unknown transform codec: " + codec);
50+
}
51+
}
52+
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.segment.local.io.compression;
20+
21+
import java.nio.ByteBuffer;
22+
import org.apache.pinot.segment.spi.compression.ChunkTransform;
23+
24+
25+
/**
26+
* Delta encoding transform for numeric values. Stores the first value as-is, then each
27+
* subsequent value as the difference from the previous value. Operates in-place on the
28+
* ByteBuffer.
29+
*
30+
* <p>Effective for monotonically increasing data (e.g., timestamps, counters).
31+
* Java two's complement arithmetic guarantees correct wrap-around for overflow.</p>
32+
*/
33+
public class DeltaTransform implements ChunkTransform {
34+
35+
public static final DeltaTransform INSTANCE = new DeltaTransform();
36+
37+
private DeltaTransform() {
38+
}
39+
40+
@Override
41+
public void encode(ByteBuffer buffer, int numBytes, int valueSizeInBytes) {
42+
if (valueSizeInBytes == Integer.BYTES) {
43+
encodeInts(buffer, numBytes);
44+
} else {
45+
encodeLongs(buffer, numBytes);
46+
}
47+
}
48+
49+
@Override
50+
public void decode(ByteBuffer buffer, int numBytes, int valueSizeInBytes) {
51+
if (valueSizeInBytes == Integer.BYTES) {
52+
decodeInts(buffer, numBytes);
53+
} else {
54+
decodeLongs(buffer, numBytes);
55+
}
56+
}
57+
58+
private void encodeInts(ByteBuffer buffer, int numBytes) {
59+
int numValues = numBytes / Integer.BYTES;
60+
if (numValues <= 1) {
61+
return;
62+
}
63+
int pos = buffer.position();
64+
// Encode backwards so each value can be read before being overwritten
65+
int prev = buffer.getInt(pos + (numValues - 2) * Integer.BYTES);
66+
for (int i = numValues - 1; i >= 1; i--) {
67+
int offset = pos + i * Integer.BYTES;
68+
int curr = buffer.getInt(offset);
69+
buffer.putInt(offset, curr - prev);
70+
if (i > 1) {
71+
prev = buffer.getInt(pos + (i - 2) * Integer.BYTES);
72+
}
73+
}
74+
}
75+
76+
private void encodeLongs(ByteBuffer buffer, int numBytes) {
77+
int numValues = numBytes / Long.BYTES;
78+
if (numValues <= 1) {
79+
return;
80+
}
81+
int pos = buffer.position();
82+
long prev = buffer.getLong(pos + (numValues - 2) * Long.BYTES);
83+
for (int i = numValues - 1; i >= 1; i--) {
84+
int offset = pos + i * Long.BYTES;
85+
long curr = buffer.getLong(offset);
86+
buffer.putLong(offset, curr - prev);
87+
if (i > 1) {
88+
prev = buffer.getLong(pos + (i - 2) * Long.BYTES);
89+
}
90+
}
91+
}
92+
93+
private void decodeInts(ByteBuffer buffer, int numBytes) {
94+
int numValues = numBytes / Integer.BYTES;
95+
if (numValues <= 1) {
96+
return;
97+
}
98+
int pos = buffer.position();
99+
for (int i = 1; i < numValues; i++) {
100+
int offset = pos + i * Integer.BYTES;
101+
int prevValue = buffer.getInt(pos + (i - 1) * Integer.BYTES);
102+
int delta = buffer.getInt(offset);
103+
buffer.putInt(offset, prevValue + delta);
104+
}
105+
}
106+
107+
private void decodeLongs(ByteBuffer buffer, int numBytes) {
108+
int numValues = numBytes / Long.BYTES;
109+
if (numValues <= 1) {
110+
return;
111+
}
112+
int pos = buffer.position();
113+
for (int i = 1; i < numValues; i++) {
114+
int offset = pos + i * Long.BYTES;
115+
long prevValue = buffer.getLong(pos + (i - 1) * Long.BYTES);
116+
long delta = buffer.getLong(offset);
117+
buffer.putLong(offset, prevValue + delta);
118+
}
119+
}
120+
}

0 commit comments

Comments
 (0)