Skip to content

Commit eec756e

Browse files
authored
[Auto reset 1/3]Auto reset offset during ingestion lag (#16492)
1 parent 88190cc commit eec756e

7 files changed

Lines changed: 318 additions & 7 deletions

File tree

pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java

Lines changed: 73 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,8 @@ public class PinotLLCRealtimeSegmentManager {
171171

172172
// Max time to wait for all LLC segments to complete committing their metadata while stopping the controller.
173173
private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 30_000L;
174+
// Timeout for calling stream metadata provider APIs
175+
private static final long STREAM_FETCH_TIMEOUT_MS = 5_000L;
174176

175177
// TODO: make this configurable with default set to 10
176178
/**
@@ -927,7 +929,11 @@ private void createNewSegmentZKMetadata(TableConfig tableConfig, StreamConfig st
927929
int numReplicas) {
928930
String realtimeTableName = tableConfig.getTableName();
929931
String segmentName = newLLCSegmentName.getSegmentName();
930-
String startOffset = committingSegmentDescriptor.getNextOffset();
932+
933+
// Handle offset auto reset
934+
String nextOffset = committingSegmentDescriptor.getNextOffset();
935+
String startOffset = computeStartOffset(
936+
nextOffset, streamConfig, newLLCSegmentName.getPartitionGroupId());
931937

932938
LOGGER.info(
933939
"Creating segment ZK metadata for new CONSUMING segment: {} with start offset: {} and creation time: {}",
@@ -958,6 +964,65 @@ private void createNewSegmentZKMetadata(TableConfig tableConfig, StreamConfig st
958964
persistSegmentZKMetadata(realtimeTableName, newSegmentZKMetadata, -1);
959965
}
960966

967+
private String computeStartOffset(String nextOffset, StreamConfig streamConfig, int partitionId) {
968+
if (!streamConfig.isEnableOffsetAutoReset()) {
969+
return nextOffset;
970+
}
971+
long timeThreshold = streamConfig.getOffsetAutoResetTimeSecThreshold();
972+
int offsetThreshold = streamConfig.getOffsetAutoResetOffsetThreshold();
973+
if (timeThreshold <= 0 && offsetThreshold <= 0) {
974+
LOGGER.warn("Invalid offset auto reset configuration for table: {}, topic: {}. "
975+
+ "timeThreshold: {}, offsetThreshold: {}",
976+
streamConfig.getTableNameWithType(), streamConfig.getTopicName(), timeThreshold, offsetThreshold);
977+
return nextOffset;
978+
}
979+
String clientId = getTableTopicUniqueClientId(streamConfig);
980+
StreamConsumerFactory consumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
981+
StreamPartitionMsgOffsetFactory offsetFactory = consumerFactory.createStreamMsgOffsetFactory();
982+
StreamPartitionMsgOffset nextOffsetWithType = offsetFactory.create(nextOffset);
983+
StreamPartitionMsgOffset offsetAtSLA = null;
984+
StreamPartitionMsgOffset latestOffset;
985+
try (StreamMetadataProvider metadataProvider = consumerFactory.createPartitionMetadataProvider(clientId,
986+
partitionId)) {
987+
// Fetching timestamp from an offset is an expensive operation which requires reading the data,
988+
// while fetching offset from timestamp is lightweight and only needs to read metadata.
989+
// Hence, instead of checking if latestOffset's time - nextOffset's time < SLA, we would check
990+
// (CurrentTime - SLA)'s offset > nextOffset.
991+
// TODO: it is relying on System.currentTimeMillis() which might be affected by time drift. If we are able to
992+
// get nextOffset's time, we should instead check (nextOffset's time + SLA)'s offset < latestOffset
993+
latestOffset = metadataProvider.fetchStreamPartitionOffset(
994+
OffsetCriteria.LARGEST_OFFSET_CRITERIA, STREAM_FETCH_TIMEOUT_MS);
995+
LOGGER.info("Latest offset of topic {} and partition {} is {}", streamConfig.getTopicName(), partitionId,
996+
latestOffset);
997+
if (timeThreshold > 0) {
998+
offsetAtSLA =
999+
metadataProvider.getOffsetAtTimestamp(partitionId, System.currentTimeMillis() - timeThreshold * 1000,
1000+
STREAM_FETCH_TIMEOUT_MS);
1001+
LOGGER.info("Offset at SLA of topic {} and partition {} is {}", streamConfig.getTopicName(), partitionId,
1002+
offsetAtSLA);
1003+
}
1004+
} catch (Exception e) {
1005+
LOGGER.warn("Not able to fetch the offset metadata, skip auto resetting offsets", e);
1006+
return nextOffset;
1007+
}
1008+
try {
1009+
if (timeThreshold > 0 && offsetAtSLA != null && offsetAtSLA.compareTo(nextOffsetWithType) < 0) {
1010+
LOGGER.info("Auto reset offset from {} to {} on partition {} because time threshold reached", nextOffset,
1011+
latestOffset, partitionId);
1012+
return latestOffset.toString();
1013+
}
1014+
if (offsetThreshold > 0
1015+
&& Long.parseLong(latestOffset.toString()) - Long.parseLong(nextOffset) > offsetThreshold) {
1016+
LOGGER.info("Auto reset offset from {} to {} on partition {} because number of offsets threshold reached",
1017+
nextOffset, latestOffset, partitionId);
1018+
return latestOffset.toString();
1019+
}
1020+
} catch (Exception e) {
1021+
LOGGER.warn("Not able to compare the offsets, skip auto resetting offsets", e);
1022+
}
1023+
return nextOffset;
1024+
}
1025+
9611026
@Nullable
9621027
private SegmentPartitionMetadata getPartitionMetadataFromTableConfig(TableConfig tableConfig, int partitionId,
9631028
int numPartitionGroups) {
@@ -1006,16 +1071,20 @@ public long getCommitTimeoutMS(String realtimeTableName) {
10061071
return commitTimeoutMS;
10071072
}
10081073

1074+
private String getTableTopicUniqueClientId(StreamConfig streamConfig) {
1075+
return StreamConsumerFactory.getUniqueClientId(
1076+
PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" + streamConfig.getTableNameWithType() + "-"
1077+
+ streamConfig.getTopicName());
1078+
}
1079+
10091080
/**
10101081
* Fetches the partition ids for the stream. Some stream (e.g. Kinesis) might not support this operation, in which
10111082
* case exception will be thrown.
10121083
*/
10131084
@VisibleForTesting
10141085
Set<Integer> getPartitionIds(StreamConfig streamConfig)
10151086
throws Exception {
1016-
String clientId = StreamConsumerFactory.getUniqueClientId(
1017-
PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" + streamConfig.getTableNameWithType() + "-"
1018-
+ streamConfig.getTopicName());
1087+
String clientId = getTableTopicUniqueClientId(streamConfig);
10191088
StreamConsumerFactory consumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
10201089
try (StreamMetadataProvider metadataProvider = consumerFactory.createStreamMetadataProvider(clientId)) {
10211090
return metadataProvider.fetchPartitionIds(5000L);

pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,15 @@
7777
import org.apache.pinot.spi.env.PinotConfiguration;
7878
import org.apache.pinot.spi.filesystem.PinotFSFactory;
7979
import org.apache.pinot.spi.stream.LongMsgOffset;
80+
import org.apache.pinot.spi.stream.LongMsgOffsetFactory;
81+
import org.apache.pinot.spi.stream.OffsetCriteria;
8082
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
8183
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
8284
import org.apache.pinot.spi.stream.StreamConfig;
85+
import org.apache.pinot.spi.stream.StreamConfigProperties;
86+
import org.apache.pinot.spi.stream.StreamConsumerFactory;
87+
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
88+
import org.apache.pinot.spi.stream.StreamMetadataProvider;
8389
import org.apache.pinot.spi.utils.CommonConstants;
8490
import org.apache.pinot.spi.utils.CommonConstants.Helix;
8591
import org.apache.pinot.spi.utils.CommonConstants.Helix.Instance;
@@ -91,6 +97,7 @@
9197
import org.apache.pinot.util.TestUtils;
9298
import org.apache.zookeeper.data.Stat;
9399
import org.joda.time.Interval;
100+
import org.mockito.MockedStatic;
94101
import org.testng.Assert;
95102
import org.testng.annotations.AfterClass;
96103
import org.testng.annotations.Test;
@@ -122,6 +129,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
122129
static final String CRC = Long.toString(RANDOM.nextLong() & 0xFFFFFFFFL);
123130
static final SegmentVersion SEGMENT_VERSION = RANDOM.nextBoolean() ? SegmentVersion.v1 : SegmentVersion.v3;
124131
static final int NUM_DOCS = RANDOM.nextInt(Integer.MAX_VALUE) + 1;
132+
static final long LATEST_OFFSET = PARTITION_OFFSET.getOffset() * 2 + NUM_DOCS;
125133
static final int SEGMENT_SIZE_IN_BYTES = 100000000;
126134
@AfterClass
127135
public void tearDown()
@@ -325,6 +333,139 @@ public void testCommitSegment() {
325333
assertNull(consumingSegmentZKMetadata);
326334
}
327335

336+
@Test
337+
public void testCommitSegmentWithOffsetAutoResetOnOffset()
338+
throws Exception {
339+
// Set up a new table with 2 replicas, 5 instances, 4 partition
340+
PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
341+
FakePinotLLCRealtimeSegmentManager segmentManager =
342+
new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);
343+
setUpNewTable(segmentManager, 2, 5, 4);
344+
Map<String, Map<String, String>> instanceStatesMap = segmentManager._idealState.getRecord().getMapFields();
345+
Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMaps(segmentManager._tableConfig).get(0);
346+
streamConfigMap.put(StreamConfigProperties.ENABLE_OFFSET_AUTO_RESET, String.valueOf(true));
347+
streamConfigMap.put(StreamConfigProperties.OFFSET_AUTO_RESET_OFFSET_THRESHOLD_KEY, "100");
348+
segmentManager.makeTableConfig(streamConfigMap);
349+
350+
StreamConsumerFactory mockConsumerFactory = mock(StreamConsumerFactory.class);
351+
StreamMetadataProvider mockMetadataProvider = mock(StreamMetadataProvider.class);
352+
when(mockConsumerFactory.createPartitionMetadataProvider(anyString(), anyInt())).thenReturn(mockMetadataProvider);
353+
when(mockConsumerFactory.createStreamMsgOffsetFactory()).thenReturn(new LongMsgOffsetFactory());
354+
when(mockMetadataProvider.fetchStreamPartitionOffset(eq(OffsetCriteria.LARGEST_OFFSET_CRITERIA),
355+
anyLong())).thenReturn(new LongMsgOffset(LATEST_OFFSET));
356+
when(mockMetadataProvider.getOffsetAtTimestamp(eq(0), anyLong(), anyLong())).thenReturn(PARTITION_OFFSET);
357+
358+
try (MockedStatic<StreamConsumerFactoryProvider> mockedStaticProvider = mockStatic(
359+
StreamConsumerFactoryProvider.class)) {
360+
361+
mockedStaticProvider.when(() -> StreamConsumerFactoryProvider.create(segmentManager._streamConfigs.get(0)))
362+
.thenReturn(mockConsumerFactory);
363+
364+
// Commit a segment for partition group 0
365+
String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
366+
String endOffset = new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString();
367+
CommittingSegmentDescriptor committingSegmentDescriptor =
368+
new CommittingSegmentDescriptor(committingSegment, endOffset, SEGMENT_SIZE_IN_BYTES);
369+
committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
370+
segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
371+
372+
// Verify instance states for committed segment and new consuming segment
373+
Map<String, String> committedSegmentInstanceStateMap = instanceStatesMap.get(committingSegment);
374+
assertNotNull(committedSegmentInstanceStateMap);
375+
assertEquals(new HashSet<>(committedSegmentInstanceStateMap.values()),
376+
Collections.singleton(SegmentStateModel.ONLINE));
377+
378+
String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName();
379+
Map<String, String> consumingSegmentInstanceStateMap = instanceStatesMap.get(consumingSegment);
380+
assertNotNull(consumingSegmentInstanceStateMap);
381+
assertEquals(new HashSet<>(consumingSegmentInstanceStateMap.values()),
382+
Collections.singleton(SegmentStateModel.CONSUMING));
383+
384+
// Verify segment ZK metadata for committed segment and new consuming segment
385+
SegmentZKMetadata committedSegmentZKMetadata = segmentManager._segmentZKMetadataMap.get(committingSegment);
386+
assertEquals(committedSegmentZKMetadata.getStatus(), Status.DONE);
387+
assertEquals(committedSegmentZKMetadata.getStartOffset(), PARTITION_OFFSET.toString());
388+
assertEquals(committedSegmentZKMetadata.getEndOffset(), endOffset);
389+
assertEquals(committedSegmentZKMetadata.getCreationTime(), CURRENT_TIME_MS);
390+
assertEquals(committedSegmentZKMetadata.getCrc(), Long.parseLong(CRC));
391+
assertEquals(committedSegmentZKMetadata.getIndexVersion(), SEGMENT_VERSION.name());
392+
assertEquals(committedSegmentZKMetadata.getTotalDocs(), NUM_DOCS);
393+
assertEquals(committedSegmentZKMetadata.getSizeInBytes(), SEGMENT_SIZE_IN_BYTES);
394+
395+
SegmentZKMetadata consumingSegmentZKMetadata = segmentManager._segmentZKMetadataMap.get(consumingSegment);
396+
assertEquals(consumingSegmentZKMetadata.getStatus(), Status.IN_PROGRESS);
397+
assertEquals(consumingSegmentZKMetadata.getStartOffset(), String.valueOf(LATEST_OFFSET));
398+
assertEquals(committedSegmentZKMetadata.getCreationTime(), CURRENT_TIME_MS);
399+
}
400+
}
401+
402+
@Test
403+
public void testCommitSegmentWithOffsetAutoResetOnTime()
404+
throws Exception {
405+
// Set up a new table with 2 replicas, 5 instances, 4 partition
406+
PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
407+
FakePinotLLCRealtimeSegmentManager segmentManager =
408+
new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);
409+
setUpNewTable(segmentManager, 2, 5, 4);
410+
Map<String, Map<String, String>> instanceStatesMap = segmentManager._idealState.getRecord().getMapFields();
411+
Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMaps(segmentManager._tableConfig).get(0);
412+
streamConfigMap.put(StreamConfigProperties.ENABLE_OFFSET_AUTO_RESET, String.valueOf(true));
413+
streamConfigMap.put(StreamConfigProperties.OFFSET_AUTO_RESET_TIMESEC_THRESHOLD_KEY, "1800");
414+
segmentManager.makeTableConfig(streamConfigMap);
415+
416+
StreamConsumerFactory mockConsumerFactory = mock(StreamConsumerFactory.class);
417+
StreamMetadataProvider mockMetadataProvider = mock(StreamMetadataProvider.class);
418+
when(mockConsumerFactory.createPartitionMetadataProvider(anyString(), anyInt())).thenReturn(mockMetadataProvider);
419+
when(mockConsumerFactory.createStreamMsgOffsetFactory()).thenReturn(new LongMsgOffsetFactory());
420+
when(mockMetadataProvider.fetchStreamPartitionOffset(eq(OffsetCriteria.LARGEST_OFFSET_CRITERIA),
421+
anyLong())).thenReturn(new LongMsgOffset(LATEST_OFFSET));
422+
when(mockMetadataProvider.getOffsetAtTimestamp(eq(0), anyLong(), anyLong())).thenReturn(
423+
new LongMsgOffset(PARTITION_OFFSET.getOffset() + 1L));
424+
425+
try (MockedStatic<StreamConsumerFactoryProvider> mockedStaticProvider = mockStatic(
426+
StreamConsumerFactoryProvider.class)) {
427+
428+
mockedStaticProvider.when(() -> StreamConsumerFactoryProvider.create(segmentManager._streamConfigs.get(0)))
429+
.thenReturn(mockConsumerFactory);
430+
431+
// Commit a segment for partition group 0
432+
String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
433+
String endOffset = new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString();
434+
CommittingSegmentDescriptor committingSegmentDescriptor =
435+
new CommittingSegmentDescriptor(committingSegment, endOffset, SEGMENT_SIZE_IN_BYTES);
436+
committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
437+
segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
438+
439+
// Verify instance states for committed segment and new consuming segment
440+
Map<String, String> committedSegmentInstanceStateMap = instanceStatesMap.get(committingSegment);
441+
assertNotNull(committedSegmentInstanceStateMap);
442+
assertEquals(new HashSet<>(committedSegmentInstanceStateMap.values()),
443+
Collections.singleton(SegmentStateModel.ONLINE));
444+
445+
String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName();
446+
Map<String, String> consumingSegmentInstanceStateMap = instanceStatesMap.get(consumingSegment);
447+
assertNotNull(consumingSegmentInstanceStateMap);
448+
assertEquals(new HashSet<>(consumingSegmentInstanceStateMap.values()),
449+
Collections.singleton(SegmentStateModel.CONSUMING));
450+
451+
// Verify segment ZK metadata for committed segment and new consuming segment
452+
SegmentZKMetadata committedSegmentZKMetadata = segmentManager._segmentZKMetadataMap.get(committingSegment);
453+
assertEquals(committedSegmentZKMetadata.getStatus(), Status.DONE);
454+
assertEquals(committedSegmentZKMetadata.getStartOffset(), PARTITION_OFFSET.toString());
455+
assertEquals(committedSegmentZKMetadata.getEndOffset(), endOffset);
456+
assertEquals(committedSegmentZKMetadata.getCreationTime(), CURRENT_TIME_MS);
457+
assertEquals(committedSegmentZKMetadata.getCrc(), Long.parseLong(CRC));
458+
assertEquals(committedSegmentZKMetadata.getIndexVersion(), SEGMENT_VERSION.name());
459+
assertEquals(committedSegmentZKMetadata.getTotalDocs(), NUM_DOCS);
460+
assertEquals(committedSegmentZKMetadata.getSizeInBytes(), SEGMENT_SIZE_IN_BYTES);
461+
462+
SegmentZKMetadata consumingSegmentZKMetadata = segmentManager._segmentZKMetadataMap.get(consumingSegment);
463+
assertEquals(consumingSegmentZKMetadata.getStatus(), Status.IN_PROGRESS);
464+
assertEquals(consumingSegmentZKMetadata.getStartOffset(), String.valueOf(LATEST_OFFSET));
465+
assertEquals(committedSegmentZKMetadata.getCreationTime(), CURRENT_TIME_MS);
466+
}
467+
}
468+
328469
/**
329470
* Test cases for the scenario where stream partitions increase, and the validation manager is attempting to create
330471
* segments for new partitions. This test assumes that all other factors remain the same (no error conditions or
@@ -1725,6 +1866,13 @@ void makeTableConfig() {
17251866
_streamConfigs = IngestionConfigUtils.getStreamConfigs(_tableConfig);
17261867
}
17271868

1869+
void makeTableConfig(Map<String, String> streamConfigMap) {
1870+
_tableConfig =
1871+
new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(_numReplicas)
1872+
.setStreamConfigs(streamConfigMap).build();
1873+
_streamConfigs = IngestionConfigUtils.getStreamConfigs(_tableConfig);
1874+
}
1875+
17281876
void makeConsumingInstancePartitions() {
17291877
List<String> instances = new ArrayList<>(_numInstances);
17301878
for (int i = 0; i < _numInstances; i++) {

pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,12 @@ public List<TopicMetadata> getTopics() {
187187
}
188188
}
189189

190+
@Override
191+
public StreamPartitionMsgOffset getOffsetAtTimestamp(int partitionId, long timestampMillis, long timeoutMillis) {
192+
return new LongMsgOffset(_consumer.offsetsForTimes(Map.of(_topicPartition, timestampMillis),
193+
Duration.ofMillis(timeoutMillis)).get(_topicPartition).offset());
194+
}
195+
190196
public static class KafkaTopicMetadata implements TopicMetadata {
191197
private String _name;
192198

pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,13 @@ public KafkaTopicMetadata setName(String name) {
199199
return this;
200200
}
201201
}
202+
203+
@Override
204+
public StreamPartitionMsgOffset getOffsetAtTimestamp(int partitionId, long timestampMillis, long timeoutMillis) {
205+
return new LongMsgOffset(_consumer.offsetsForTimes(Map.of(_topicPartition, timestampMillis),
206+
Duration.ofMillis(timeoutMillis)).get(_topicPartition).offset());
207+
}
208+
202209
@Override
203210
public void close()
204211
throws IOException {

0 commit comments

Comments
 (0)