Skip to content

Commit 624a538

Browse files
xiangfu0 and claude committed
[feature] Add pluggable codec pipeline for RAW forward indexes
Replace the single compressionCodec with a codecPipeline field that supports an ordered list of transform and compression stages. Transforms (DELTA, DOUBLE_DELTA) are applied left-to-right on write and right-to-left on read, with at most one terminal compressor. Uses writer version 7 with a new header format that stores the full pipeline. Includes 53 tests covering unit, pipeline round-trip, and full end-to-end writer/reader validation including triple-delta stacking scenarios. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 7960057 commit 624a538

20 files changed

Lines changed: 1993 additions & 39 deletions

File tree

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/compression/ChunkCompressorFactory.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pinot.segment.local.io.compression;
2020

21+
import org.apache.pinot.segment.spi.compression.ChunkCodecPipeline;
2122
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
2223
import org.apache.pinot.segment.spi.compression.ChunkCompressor;
2324
import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
@@ -118,4 +119,34 @@ public static ChunkDecompressor getDecompressor(ChunkCompressionType compression
118119
throw new IllegalArgumentException("Illegal decompressor name " + compressionType);
119120
}
120121
}
122+
123+
/**
124+
* Returns a compressor for a codec pipeline. If the pipeline has transforms, returns a
125+
* {@link PipelineChunkCompressor}; otherwise returns the plain compressor for the terminal codec.
126+
*
127+
* @param pipeline the codec pipeline
128+
* @param valueSizeInBytes size of each typed value (4 for INT, 8 for LONG); used by transforms
129+
* @return compressor for the pipeline
130+
*/
131+
public static ChunkCompressor getCompressor(ChunkCodecPipeline pipeline, int valueSizeInBytes) {
132+
if (pipeline.hasTransforms()) {
133+
return new PipelineChunkCompressor(pipeline, valueSizeInBytes);
134+
}
135+
return getCompressor(pipeline.getChunkCompressionType());
136+
}
137+
138+
/**
139+
* Returns a decompressor for a codec pipeline. If the pipeline has transforms, returns a
140+
* {@link PipelineChunkDecompressor}; otherwise returns the plain decompressor for the terminal codec.
141+
*
142+
* @param pipeline the codec pipeline
143+
* @param valueSizeInBytes size of each typed value (4 for INT, 8 for LONG); used by transforms
144+
* @return decompressor for the pipeline
145+
*/
146+
public static ChunkDecompressor getDecompressor(ChunkCodecPipeline pipeline, int valueSizeInBytes) {
147+
if (pipeline.hasTransforms()) {
148+
return new PipelineChunkDecompressor(pipeline, valueSizeInBytes);
149+
}
150+
return getDecompressor(pipeline.getChunkCompressionType());
151+
}
121152
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.segment.local.io.compression;
20+
21+
import org.apache.pinot.segment.spi.compression.ChunkCodec;
22+
import org.apache.pinot.segment.spi.compression.ChunkTransform;
23+
24+
25+
/**
26+
* Factory for obtaining {@link ChunkTransform} instances by their {@link ChunkCodec} identifier.
27+
*/
28+
public class ChunkTransformFactory {
29+
30+
private ChunkTransformFactory() {
31+
}
32+
33+
/**
34+
* Returns the singleton {@link ChunkTransform} for the given transform codec.
35+
*
36+
* @param codec the transform codec (must be a {@link ChunkCodec.CodecKind#TRANSFORM})
37+
* @return the corresponding transform instance
38+
* @throws IllegalArgumentException if the codec is not a known transform
39+
*/
40+
public static ChunkTransform getTransform(ChunkCodec codec) {
41+
switch (codec) {
42+
case DELTA:
43+
return DeltaTransform.INSTANCE;
44+
case DOUBLE_DELTA:
45+
return DoubleDeltaTransform.INSTANCE;
46+
default:
47+
throw new IllegalArgumentException("Unknown transform codec: " + codec);
48+
}
49+
}
50+
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.segment.local.io.compression;
20+
21+
import java.nio.ByteBuffer;
22+
import org.apache.pinot.segment.spi.compression.ChunkTransform;
23+
24+
25+
/**
26+
* Delta encoding transform for numeric values. Stores the first value as-is, then each
27+
* subsequent value as the difference from the previous value. Operates in-place on the
28+
* ByteBuffer.
29+
*
30+
* <p>Effective for monotonically increasing data (e.g., timestamps, counters).
31+
* Java two's complement arithmetic guarantees correct wrap-around for overflow.</p>
32+
*/
33+
public class DeltaTransform implements ChunkTransform {
34+
35+
public static final DeltaTransform INSTANCE = new DeltaTransform();
36+
37+
private DeltaTransform() {
38+
}
39+
40+
@Override
41+
public void encode(ByteBuffer buffer, int numBytes, int valueSizeInBytes) {
42+
if (valueSizeInBytes == Integer.BYTES) {
43+
encodeInts(buffer, numBytes);
44+
} else {
45+
encodeLongs(buffer, numBytes);
46+
}
47+
}
48+
49+
@Override
50+
public void decode(ByteBuffer buffer, int numBytes, int valueSizeInBytes) {
51+
if (valueSizeInBytes == Integer.BYTES) {
52+
decodeInts(buffer, numBytes);
53+
} else {
54+
decodeLongs(buffer, numBytes);
55+
}
56+
}
57+
58+
private void encodeInts(ByteBuffer buffer, int numBytes) {
59+
int numValues = numBytes / Integer.BYTES;
60+
if (numValues <= 1) {
61+
return;
62+
}
63+
int pos = buffer.position();
64+
// Encode backwards so each value can be read before being overwritten
65+
int prev = buffer.getInt(pos + (numValues - 2) * Integer.BYTES);
66+
for (int i = numValues - 1; i >= 1; i--) {
67+
int offset = pos + i * Integer.BYTES;
68+
int curr = buffer.getInt(offset);
69+
buffer.putInt(offset, curr - prev);
70+
if (i > 1) {
71+
prev = buffer.getInt(pos + (i - 2) * Integer.BYTES);
72+
}
73+
}
74+
}
75+
76+
private void encodeLongs(ByteBuffer buffer, int numBytes) {
77+
int numValues = numBytes / Long.BYTES;
78+
if (numValues <= 1) {
79+
return;
80+
}
81+
int pos = buffer.position();
82+
long prev = buffer.getLong(pos + (numValues - 2) * Long.BYTES);
83+
for (int i = numValues - 1; i >= 1; i--) {
84+
int offset = pos + i * Long.BYTES;
85+
long curr = buffer.getLong(offset);
86+
buffer.putLong(offset, curr - prev);
87+
if (i > 1) {
88+
prev = buffer.getLong(pos + (i - 2) * Long.BYTES);
89+
}
90+
}
91+
}
92+
93+
private void decodeInts(ByteBuffer buffer, int numBytes) {
94+
int numValues = numBytes / Integer.BYTES;
95+
if (numValues <= 1) {
96+
return;
97+
}
98+
int pos = buffer.position();
99+
for (int i = 1; i < numValues; i++) {
100+
int offset = pos + i * Integer.BYTES;
101+
int prevValue = buffer.getInt(pos + (i - 1) * Integer.BYTES);
102+
int delta = buffer.getInt(offset);
103+
buffer.putInt(offset, prevValue + delta);
104+
}
105+
}
106+
107+
private void decodeLongs(ByteBuffer buffer, int numBytes) {
108+
int numValues = numBytes / Long.BYTES;
109+
if (numValues <= 1) {
110+
return;
111+
}
112+
int pos = buffer.position();
113+
for (int i = 1; i < numValues; i++) {
114+
int offset = pos + i * Long.BYTES;
115+
long prevValue = buffer.getLong(pos + (i - 1) * Long.BYTES);
116+
long delta = buffer.getLong(offset);
117+
buffer.putLong(offset, prevValue + delta);
118+
}
119+
}
120+
}
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pinot.segment.local.io.compression;
20+
21+
import java.nio.ByteBuffer;
22+
import org.apache.pinot.segment.spi.compression.ChunkTransform;
23+
24+
25+
/**
26+
* Double-delta (delta-of-delta) transform for numeric values. Stores the first value as-is,
27+
* the first delta as-is, then each subsequent value as the difference between consecutive deltas.
28+
* Operates in-place on the ByteBuffer.
29+
*
30+
* <p>Effective for data with constant or near-constant step sizes (e.g., fixed-interval timestamps).</p>
31+
*/
32+
public class DoubleDeltaTransform implements ChunkTransform {
33+
34+
public static final DoubleDeltaTransform INSTANCE = new DoubleDeltaTransform();
35+
36+
private DoubleDeltaTransform() {
37+
}
38+
39+
@Override
40+
public void encode(ByteBuffer buffer, int numBytes, int valueSizeInBytes) {
41+
if (valueSizeInBytes == Integer.BYTES) {
42+
encodeInts(buffer, numBytes);
43+
} else {
44+
encodeLongs(buffer, numBytes);
45+
}
46+
}
47+
48+
@Override
49+
public void decode(ByteBuffer buffer, int numBytes, int valueSizeInBytes) {
50+
if (valueSizeInBytes == Integer.BYTES) {
51+
decodeInts(buffer, numBytes);
52+
} else {
53+
decodeLongs(buffer, numBytes);
54+
}
55+
}
56+
57+
private void encodeInts(ByteBuffer buffer, int numBytes) {
58+
int numValues = numBytes / Integer.BYTES;
59+
if (numValues <= 2) {
60+
// For 0, 1, or 2 values, just apply regular delta (first value stays, second becomes delta)
61+
if (numValues == 2) {
62+
int pos = buffer.position();
63+
int v0 = buffer.getInt(pos);
64+
int v1 = buffer.getInt(pos + Integer.BYTES);
65+
buffer.putInt(pos + Integer.BYTES, v1 - v0);
66+
}
67+
return;
68+
}
69+
int pos = buffer.position();
70+
71+
// Encode in forward order, keeping the previous original value and delta in local variables
72+
// so we can safely overwrite each position in-place as we advance from index 2 onward.
73+
int prevPrevVal = buffer.getInt(pos);
74+
int prevVal = buffer.getInt(pos + Integer.BYTES);
75+
int prevDelta = prevVal - prevPrevVal;
76+
77+
// Store first delta at index 1
78+
buffer.putInt(pos + Integer.BYTES, prevDelta);
79+
80+
int prev = prevVal;
81+
int prevD = prevDelta;
82+
for (int i = 2; i < numValues; i++) {
83+
int offset = pos + i * Integer.BYTES;
84+
int curr = buffer.getInt(offset);
85+
int currDelta = curr - prev;
86+
int doubleDelta = currDelta - prevD;
87+
buffer.putInt(offset, doubleDelta);
88+
prev = curr;
89+
prevD = currDelta;
90+
}
91+
}
92+
93+
private void encodeLongs(ByteBuffer buffer, int numBytes) {
94+
int numValues = numBytes / Long.BYTES;
95+
if (numValues <= 2) {
96+
if (numValues == 2) {
97+
int pos = buffer.position();
98+
long v0 = buffer.getLong(pos);
99+
long v1 = buffer.getLong(pos + Long.BYTES);
100+
buffer.putLong(pos + Long.BYTES, v1 - v0);
101+
}
102+
return;
103+
}
104+
int pos = buffer.position();
105+
106+
long prevPrevVal = buffer.getLong(pos);
107+
long prevVal = buffer.getLong(pos + Long.BYTES);
108+
long prevDelta = prevVal - prevPrevVal;
109+
110+
buffer.putLong(pos + Long.BYTES, prevDelta);
111+
112+
long prev = prevVal;
113+
long prevD = prevDelta;
114+
for (int i = 2; i < numValues; i++) {
115+
int offset = pos + i * Long.BYTES;
116+
long curr = buffer.getLong(offset);
117+
long currDelta = curr - prev;
118+
long doubleDelta = currDelta - prevD;
119+
buffer.putLong(offset, doubleDelta);
120+
prev = curr;
121+
prevD = currDelta;
122+
}
123+
}
124+
125+
private void decodeInts(ByteBuffer buffer, int numBytes) {
126+
int numValues = numBytes / Integer.BYTES;
127+
if (numValues <= 2) {
128+
if (numValues == 2) {
129+
int pos = buffer.position();
130+
int v0 = buffer.getInt(pos);
131+
int d1 = buffer.getInt(pos + Integer.BYTES);
132+
buffer.putInt(pos + Integer.BYTES, v0 + d1);
133+
}
134+
return;
135+
}
136+
int pos = buffer.position();
137+
138+
int v0 = buffer.getInt(pos);
139+
int d1 = buffer.getInt(pos + Integer.BYTES);
140+
int v1 = v0 + d1;
141+
buffer.putInt(pos + Integer.BYTES, v1);
142+
143+
int prev = v1;
144+
int prevDelta = d1;
145+
for (int i = 2; i < numValues; i++) {
146+
int offset = pos + i * Integer.BYTES;
147+
int doubleDelta = buffer.getInt(offset);
148+
int currDelta = prevDelta + doubleDelta;
149+
prev = prev + currDelta;
150+
buffer.putInt(offset, prev);
151+
prevDelta = currDelta;
152+
}
153+
}
154+
155+
private void decodeLongs(ByteBuffer buffer, int numBytes) {
156+
int numValues = numBytes / Long.BYTES;
157+
if (numValues <= 2) {
158+
if (numValues == 2) {
159+
int pos = buffer.position();
160+
long v0 = buffer.getLong(pos);
161+
long d1 = buffer.getLong(pos + Long.BYTES);
162+
buffer.putLong(pos + Long.BYTES, v0 + d1);
163+
}
164+
return;
165+
}
166+
int pos = buffer.position();
167+
168+
long v0 = buffer.getLong(pos);
169+
long d1 = buffer.getLong(pos + Long.BYTES);
170+
long v1 = v0 + d1;
171+
buffer.putLong(pos + Long.BYTES, v1);
172+
173+
long prev = v1;
174+
long prevDelta = d1;
175+
for (int i = 2; i < numValues; i++) {
176+
int offset = pos + i * Long.BYTES;
177+
long doubleDelta = buffer.getLong(offset);
178+
long currDelta = prevDelta + doubleDelta;
179+
prev = prev + currDelta;
180+
buffer.putLong(offset, prev);
181+
prevDelta = currDelta;
182+
}
183+
}
184+
}

0 commit comments

Comments
 (0)