Skip to content

Commit 15b0de3

Browse files
authored
Fix compacted column statistics to restrict all stats to valid documents (#18041)
1 parent f576cf1 commit 15b0de3

21 files changed

Lines changed: 2801 additions & 1172 deletions

pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/CommitTimeCompactionIntegrationTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ public void testCommitTimeCompactionWithMultiValueColumns()
332332
throws Exception {
333333
// Test Case: Multi-Value Fields with Commit-Time Compaction
334334
// Goal: Ensure commit-time compaction correctly handles multi-value dictionary columns
335-
// (like arrays/lists) during segment conversion, validating the fix for CompactedDictEncodedColumnStatistics
335+
// (like arrays/lists) during segment conversion, validating the fix for CompactedColumnStatistics
336336

337337
// Create test data with multi-value fields similar to user's "tags" column
338338
List<String> testRecords = List.of("200,Player200,game1,85.5,1681054200000,false,action;shooter",

pinot-segment-local/src/main/java/org/apache/pinot/segment/local/realtime/converter/RealtimeSegmentConverter.java

Lines changed: 16 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.apache.pinot.common.metrics.ServerMeter;
2727
import org.apache.pinot.common.metrics.ServerMetrics;
2828
import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
29-
import org.apache.pinot.segment.local.realtime.converter.stats.RealtimeSegmentSegmentCreationDataSource;
29+
import org.apache.pinot.segment.local.realtime.converter.stats.MutableSegmentCreationDataSource;
3030
import org.apache.pinot.segment.local.segment.creator.TransformPipeline;
3131
import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
3232
import org.apache.pinot.segment.local.segment.readers.CompactedPinotSegmentRecordReader;
@@ -42,6 +42,7 @@
4242
import org.apache.pinot.spi.config.table.TableConfig;
4343
import org.apache.pinot.spi.data.Schema;
4444
import org.apache.pinot.spi.data.readers.RecordReader;
45+
import org.roaringbitmap.RoaringBitmap;
4546
import org.slf4j.Logger;
4647
import org.slf4j.LoggerFactory;
4748

@@ -122,24 +123,22 @@ public void build(@Nullable SegmentVersion segmentVersion, @Nullable ServerMetri
122123

123124
if (useCompactedReader) {
124125
// Take a snapshot of validDocIds at the beginning of conversion to ensure consistency
125-
ThreadSafeMutableRoaringBitmap validDocIdsSnapshot = getValidDocIdSnapshot();
126-
if (validDocIdsSnapshot == null) {
126+
RoaringBitmap validDocIds = getValidDocIds();
127+
if (validDocIds == null) {
127128
throw new IllegalStateException("Cannot use CompactedPinotSegmentRecordReader without valid document IDs. "
128129
+ "Segment may be corrupted.");
129130
}
130131
// Use CompactedPinotSegmentRecordReader to remove obsolete/invalidated records
131-
try (CompactedPinotSegmentRecordReader recordReader = new CompactedPinotSegmentRecordReader(
132-
validDocIdsSnapshot)) {
132+
try (CompactedPinotSegmentRecordReader recordReader = new CompactedPinotSegmentRecordReader(validDocIds)) {
133133
recordReader.init(_realtimeSegmentImpl, sortedDocIds);
134-
buildSegmentWithReader(driver, genConfig, recordReader, sortedDocIds, sortedColumn, useCompactedReader,
135-
validDocIdsSnapshot);
134+
buildSegmentWithReader(driver, genConfig, recordReader, sortedDocIds, sortedColumn, validDocIds);
136135
publishCompactionMetrics(serverMetrics, preCommitRowCount, driver, compactionStartTime);
137136
}
138137
} else {
139138
// Use regular PinotSegmentRecordReader (existing behavior)
140139
try (PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader()) {
141140
recordReader.init(_realtimeSegmentImpl, sortedDocIds);
142-
buildSegmentWithReader(driver, genConfig, recordReader, sortedDocIds, sortedColumn, useCompactedReader, null);
141+
buildSegmentWithReader(driver, genConfig, recordReader, sortedDocIds, sortedColumn, null);
143142
}
144143
}
145144

@@ -152,13 +151,10 @@ public void build(@Nullable SegmentVersion segmentVersion, @Nullable ServerMetri
152151
}
153152
}
154153

155-
private @Nullable ThreadSafeMutableRoaringBitmap getValidDocIdSnapshot() {
156-
ThreadSafeMutableRoaringBitmap validDocIdsSnapshot = null;
157-
if (_realtimeSegmentImpl.getValidDocIds() != null) {
158-
validDocIdsSnapshot = new ThreadSafeMutableRoaringBitmap(
159-
_realtimeSegmentImpl.getValidDocIds().getMutableRoaringBitmap());
160-
}
161-
return validDocIdsSnapshot;
154+
@Nullable
155+
private RoaringBitmap getValidDocIds() {
156+
ThreadSafeMutableRoaringBitmap validDocIds = _realtimeSegmentImpl.getValidDocIds();
157+
return validDocIds != null ? validDocIds.getMutableRoaringBitmap().toRoaringBitmap() : null;
162158
}
163159

164160
/**
@@ -202,28 +198,17 @@ private void publishCompactionMetrics(@Nullable ServerMetrics serverMetrics,
202198
* Common method to build segment with the provided record reader
203199
*/
204200
private void buildSegmentWithReader(SegmentIndexCreationDriverImpl driver, SegmentGeneratorConfig genConfig,
205-
RecordReader recordReader, int[] sortedDocIds, @Nullable String sortedColumn, boolean useCompactedReader,
206-
@Nullable ThreadSafeMutableRoaringBitmap validDocIdsSnapshot)
201+
RecordReader recordReader, int[] sortedDocIds, @Nullable String sortedColumn, @Nullable RoaringBitmap validDocIds)
207202
throws Exception {
208-
RealtimeSegmentSegmentCreationDataSource dataSource;
209-
if (useCompactedReader) {
210-
// For compacted readers, use the constructor that takes sortedDocIds and pass the validDocIds snapshot
211-
dataSource = new RealtimeSegmentSegmentCreationDataSource(_realtimeSegmentImpl, recordReader, sortedDocIds,
212-
sortedColumn, validDocIdsSnapshot);
213-
} else {
214-
// For regular readers, use the original constructor
215-
dataSource = new RealtimeSegmentSegmentCreationDataSource(_realtimeSegmentImpl,
216-
(PinotSegmentRecordReader) recordReader, sortedColumn);
217-
}
218-
// initializes reader
203+
MutableSegmentCreationDataSource dataSource =
204+
new MutableSegmentCreationDataSource(_realtimeSegmentImpl, recordReader, sortedDocIds, sortedColumn,
205+
validDocIds);
219206
driver.init(genConfig, dataSource, TransformPipeline.getPassThroughPipeline(_tableName), InstanceType.SERVER);
220207

221208
if (!_enableColumnMajor) {
222209
driver.build();
223210
} else {
224-
//buildByColumn uses validDocIds to skip invalid record while indexing each column. We pass the validDocIds
225-
// only if we are using compacted reader.
226-
driver.buildByColumn(_realtimeSegmentImpl, useCompactedReader ? validDocIdsSnapshot : null);
211+
driver.buildByColumn(_realtimeSegmentImpl, validDocIds);
227212
}
228213
}
229214

0 commit comments

Comments
 (0)