Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -114,6 +115,20 @@ protected List<FieldConfig> getFieldConfigs() {
return fieldConfigs;
}

@Override
public void setUp()
throws Exception {
// WARN level so these setup markers show up in default CI log output.
LOGGER.warn("Setting up integration test class: {}", getClass().getSimpleName());
// Presumably initializes the controller REST URL builder before any controller calls — TODO confirm against base class.
initControllerRequestURLBuilder();
// Start from clean temp/segment/tar directories so artifacts from a previous run cannot leak in.
TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);

// Creates the table under test; defined in the enclosing test class / base class.
setUpTable();

// CLP segment conversion can be slow in CI; use a longer timeout than the default 60s.
waitForAllDocsLoaded(600_000);
LOGGER.warn("Finished setting up integration test class: {}", getClass().getSimpleName());
}

@Override
protected IngestionConfig getIngestionConfig() {
List<TransformConfig> transforms = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.Random;
import java.util.concurrent.TimeUnit;
import net.jpountz.lz4.LZ4Factory;
import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
import org.apache.pinot.segment.local.io.codec.compression.ChunkCompressorFactory;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.compression.ChunkCompressor;
import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.Random;
import java.util.concurrent.TimeUnit;
import net.jpountz.lz4.LZ4Factory;
import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
import org.apache.pinot.segment.local.io.codec.compression.ChunkCompressorFactory;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.compression.ChunkCompressor;
import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.pinot.segment.local.io.compression.ChunkCompressorFactory;
import org.apache.pinot.segment.local.io.codec.compression.ChunkCompressorFactory;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.compression.ChunkCompressor;
import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.io.codec;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.pinot.segment.local.io.codec.compression.ChunkCompressorFactory;
import org.apache.pinot.segment.local.io.codec.transform.ChunkTransformFactory;
import org.apache.pinot.segment.spi.codec.ChunkCodec;
import org.apache.pinot.segment.spi.codec.ChunkCodecPipeline;
import org.apache.pinot.segment.spi.codec.ChunkTransform;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.compression.ChunkCompressor;


/**
* A {@link ChunkCompressor} that applies a pipeline of codec stages: first all
* {@link ChunkCodec.CodecKind#TRANSFORM TRANSFORM} stages in order (left-to-right),
* then the terminal {@link ChunkCodec.CodecKind#COMPRESSOR COMPRESSOR}.
*
* <p>This is the write-path counterpart of {@link PipelineChunkDecompressor}.</p>
*/
public class PipelineChunkCompressor implements ChunkCompressor {

  // Transform stages in application (left-to-right) order.
  private final ChunkTransform[] _transforms;
  // Terminal stage that produces the final compressed bytes.
  private final ChunkCompressor _terminalCompressor;
  // Size of each typed value (4 for INT, 8 for LONG); passed through to the transforms.
  private final int _valueSizeInBytes;

  /**
   * Creates a pipeline compressor.
   *
   * <p>Resolves each TRANSFORM stage of the pipeline eagerly, plus the terminal compressor for
   * the pipeline's compression type.</p>
   *
   * @param pipeline the codec pipeline
   * @param valueSizeInBytes size of each typed value (4 for INT, 8 for LONG); used by transforms
   */
  public PipelineChunkCompressor(ChunkCodecPipeline pipeline, int valueSizeInBytes) {
    _valueSizeInBytes = valueSizeInBytes;

    List<ChunkCodec> transformStages = pipeline.getTransforms();
    _transforms = new ChunkTransform[transformStages.size()];
    for (int i = 0; i < transformStages.size(); i++) {
      _transforms[i] = ChunkTransformFactory.getTransform(transformStages.get(i));
    }

    _terminalCompressor = ChunkCompressorFactory.getCompressor(pipeline.getChunkCompressionType());
  }

  @Override
  public int compress(ByteBuffer inUncompressed, ByteBuffer outCompressed)
      throws IOException {
    // Apply transforms left-to-right, in place on the input buffer's contents.
    // NOTE(review): assumes encode() leaves the buffer's position/limit unchanged — confirm
    // against the ChunkTransform contract.
    int numBytes = inUncompressed.remaining();
    for (ChunkTransform transform : _transforms) {
      transform.encode(inUncompressed, numBytes, _valueSizeInBytes);
    }

    // Terminal compression produces the final output bytes.
    return _terminalCompressor.compress(inUncompressed, outCompressed);
  }

  @Override
  public int maxCompressedSize(int uncompressedSize) {
    // Transforms are in-place and size-preserving; only the terminal compressor affects the bound.
    return _terminalCompressor.maxCompressedSize(uncompressedSize);
  }

  @Override
  public ChunkCompressionType compressionType() {
    // Reports the terminal stage's type; transform stages are carried by the pipeline metadata.
    return _terminalCompressor.compressionType();
  }

  @Override
  public void close()
      throws IOException {
    // Only the terminal compressor may hold releasable resources; transforms hold none here.
    _terminalCompressor.close();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.io.codec;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.pinot.segment.local.io.codec.compression.ChunkCompressorFactory;
import org.apache.pinot.segment.local.io.codec.transform.ChunkTransformFactory;
import org.apache.pinot.segment.spi.codec.ChunkCodec;
import org.apache.pinot.segment.spi.codec.ChunkCodecPipeline;
import org.apache.pinot.segment.spi.codec.ChunkTransform;
import org.apache.pinot.segment.spi.compression.ChunkDecompressor;


/**
* A {@link ChunkDecompressor} that reverses a codec pipeline: first decompresses using the
* terminal {@link ChunkCodec.CodecKind#COMPRESSOR COMPRESSOR}, then applies all
* {@link ChunkCodec.CodecKind#TRANSFORM TRANSFORM} stages in reverse order (right-to-left).
*
* <p>This is the read-path counterpart of {@link PipelineChunkCompressor}.</p>
*/
public class PipelineChunkDecompressor implements ChunkDecompressor {

  // Transform stages in write-path (left-to-right) order; undone right-to-left on read.
  private final ChunkTransform[] _transforms;
  // Terminal stage that turns the compressed bytes back into the transformed payload.
  private final ChunkDecompressor _terminalDecompressor;
  // Size of each typed value (4 for INT, 8 for LONG); passed through to the transforms.
  private final int _valueSizeInBytes;

  /**
   * Creates a pipeline decompressor.
   *
   * <p>Resolves each TRANSFORM stage of the pipeline eagerly, plus the terminal decompressor for
   * the pipeline's compression type.</p>
   *
   * @param pipeline the codec pipeline
   * @param valueSizeInBytes size of each typed value (4 for INT, 8 for LONG); used by transforms
   */
  public PipelineChunkDecompressor(ChunkCodecPipeline pipeline, int valueSizeInBytes) {
    _valueSizeInBytes = valueSizeInBytes;

    List<ChunkCodec> transformStages = pipeline.getTransforms();
    _transforms = new ChunkTransform[transformStages.size()];
    for (int i = 0; i < transformStages.size(); i++) {
      _transforms[i] = ChunkTransformFactory.getTransform(transformStages.get(i));
    }

    _terminalDecompressor = ChunkCompressorFactory.getDecompressor(pipeline.getChunkCompressionType());
  }

  @Override
  public int decompress(ByteBuffer compressedInput, ByteBuffer decompressedOutput)
      throws IOException {
    // Terminal stage first. Per Pinot convention, after this call the output buffer is flipped:
    // position=0, limit=dataSize.
    int decompressedSize = _terminalDecompressor.decompress(compressedInput, decompressedOutput);

    if (_transforms.length > 0) {
      // Buffer is already in read mode (flipped); transforms decode in place from position 0.
      int numBytes = decompressedOutput.remaining();

      // Undo transforms in reverse (right-to-left) order of how they were applied on write.
      for (int i = _transforms.length - 1; i >= 0; i--) {
        _transforms[i].decode(decompressedOutput, numBytes, _valueSizeInBytes);
      }
      // Buffer remains flipped (position=0, limit=numBytes) — ready for the caller to read.
    }

    return decompressedSize;
  }

  @Override
  public int decompressedLength(ByteBuffer compressedInput)
      throws IOException {
    // Transforms are size-preserving, so the terminal stage's length equals the pipeline's length.
    return _terminalDecompressor.decompressedLength(compressedInput);
  }

  @Override
  public void close()
      throws IOException {
    // Only the terminal decompressor may hold releasable resources; transforms hold none here.
    _terminalDecompressor.close();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.io.compression;
package org.apache.pinot.segment.local.io.codec.compression;

import org.apache.pinot.segment.local.io.codec.PipelineChunkCompressor;
import org.apache.pinot.segment.local.io.codec.PipelineChunkDecompressor;
import org.apache.pinot.segment.spi.codec.ChunkCodecPipeline;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.compression.ChunkCompressor;
import org.apache.pinot.segment.spi.compression.ChunkDecompressor;
Expand Down Expand Up @@ -118,4 +121,38 @@ public static ChunkDecompressor getDecompressor(ChunkCompressionType compression
throw new IllegalArgumentException("Illegal decompressor name " + compressionType);
}
}

/**
 * Resolves a compressor for the given codec pipeline. A pipeline with no TRANSFORM stage falls
 * back to the plain compressor for its terminal compression type; otherwise the stages are wrapped
 * in a {@link PipelineChunkCompressor}. Legacy compound codecs (DELTA_LZ4, DOUBLE_DELTA_LZ4)
 * remain handled by their existing compressor implementations.
 *
 * @param pipeline the codec pipeline
 * @param valueSizeInBytes size of each typed value (4 for INT, 8 for LONG); used by transforms
 * @return compressor for the pipeline
 */
public static ChunkCompressor getCompressor(ChunkCodecPipeline pipeline, int valueSizeInBytes) {
  if (!pipeline.hasTransforms()) {
    // No transform stages: the terminal codec's plain compressor is sufficient.
    return getCompressor(pipeline.getChunkCompressionType());
  }
  return new PipelineChunkCompressor(pipeline, valueSizeInBytes);
}

/**
 * Resolves a decompressor for the given codec pipeline. A pipeline with no TRANSFORM stage falls
 * back to the plain decompressor for its terminal compression type; otherwise the stages are
 * wrapped in a {@link PipelineChunkDecompressor}. Legacy compound codecs (DELTA_LZ4,
 * DOUBLE_DELTA_LZ4) remain handled by their existing decompressor implementations.
 *
 * @param pipeline the codec pipeline
 * @param valueSizeInBytes size of each typed value (4 for INT, 8 for LONG); used by transforms
 * @return decompressor for the pipeline
 */
public static ChunkDecompressor getDecompressor(ChunkCodecPipeline pipeline, int valueSizeInBytes) {
  if (!pipeline.hasTransforms()) {
    // No transform stages: the terminal codec's plain decompressor is sufficient.
    return getDecompressor(pipeline.getChunkCompressionType());
  }
  return new PipelineChunkDecompressor(pipeline, valueSizeInBytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.io.compression;
package org.apache.pinot.segment.local.io.codec.compression;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.io.compression;
package org.apache.pinot.segment.local.io.codec.compression;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.io.compression;
package org.apache.pinot.segment.local.io.codec.compression;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.io.compression;
package org.apache.pinot.segment.local.io.codec.compression;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.io.compression;
package org.apache.pinot.segment.local.io.codec.compression;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.io.compression;
package org.apache.pinot.segment.local.io.codec.compression;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.io.compression;
package org.apache.pinot.segment.local.io.codec.compression;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.io.compression;
package org.apache.pinot.segment.local.io.codec.compression;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.io.compression;
package org.apache.pinot.segment.local.io.codec.compression;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.io.compression;
package org.apache.pinot.segment.local.io.codec.compression;

import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down
Loading
Loading