diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 099a49a10393..76a56a93aa1c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -171,6 +171,8 @@ public class PinotLLCRealtimeSegmentManager {
   // Max time to wait for all LLC segments to complete committing their metadata while stopping the controller.
   private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 30_000L;

+  // Timeout for calling stream metadata provider APIs
+  private static final long STREAM_FETCH_TIMEOUT_MS = 5_000L; // TODO: make this configurable with default set to 10
+
   /**
@@ -931,7 +933,11 @@ private void createNewSegmentZKMetadata(TableConfig tableConfig, StreamConfig st
       int numReplicas) {
     String realtimeTableName = tableConfig.getTableName();
     String segmentName = newLLCSegmentName.getSegmentName();
-    String startOffset = committingSegmentDescriptor.getNextOffset();
+
+    // Handle offset auto reset
+    String nextOffset = committingSegmentDescriptor.getNextOffset();
+    String startOffset = computeStartOffset(
+        nextOffset, streamConfig, newLLCSegmentName.getPartitionGroupId());

     LOGGER.info(
         "Creating segment ZK metadata for new CONSUMING segment: {} with start offset: {} and creation time: {}",
@@ -962,6 +968,65 @@ private void createNewSegmentZKMetadata(TableConfig tableConfig, StreamConfig st
     persistSegmentZKMetadata(realtimeTableName, newSegmentZKMetadata, -1);
   }

+  private String computeStartOffset(String nextOffset, StreamConfig streamConfig, int partitionId) {
+    if (!streamConfig.isEnableOffsetAutoReset()) {
+      return nextOffset;
+    }
+    long timeThreshold = streamConfig.getOffsetAutoResetTimeSecThreshold();
+    int offsetThreshold = streamConfig.getOffsetAutoResetOffsetThreshold();
+    if (timeThreshold <= 0 && offsetThreshold <= 0) {
+      LOGGER.warn("Invalid offset auto reset configuration for table: {}, topic: {}. "
+              + "timeThreshold: {}, offsetThreshold: {}",
+          streamConfig.getTableNameWithType(), streamConfig.getTopicName(), timeThreshold, offsetThreshold);
+      return nextOffset;
+    }
+    String clientId = getTableTopicUniqueClientId(streamConfig);
+    StreamConsumerFactory consumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
+    StreamPartitionMsgOffsetFactory offsetFactory = consumerFactory.createStreamMsgOffsetFactory();
+    StreamPartitionMsgOffset nextOffsetWithType = offsetFactory.create(nextOffset);
+    StreamPartitionMsgOffset offsetAtSLA = null;
+    StreamPartitionMsgOffset latestOffset;
+    try (StreamMetadataProvider metadataProvider = consumerFactory.createPartitionMetadataProvider(clientId,
+        partitionId)) {
+      // Fetching timestamp from an offset is an expensive operation which requires reading the data,
+      // while fetching offset from timestamp is lightweight and only needs to read metadata.
+      // Hence, instead of checking if latestOffset's time - nextOffset's time < SLA, we would check
+      // (CurrentTime - SLA)'s offset > nextOffset.
+      // TODO: it is relying on System.currentTimeMillis() which might be affected by time drift. If we are able to
+      //   get nextOffset's time, we should instead check (nextOffset's time + SLA)'s offset < latestOffset
+      latestOffset = metadataProvider.fetchStreamPartitionOffset(
+          OffsetCriteria.LARGEST_OFFSET_CRITERIA, STREAM_FETCH_TIMEOUT_MS);
+      LOGGER.info("Latest offset of topic {} and partition {} is {}", streamConfig.getTopicName(), partitionId,
+          latestOffset);
+      if (timeThreshold > 0) {
+        offsetAtSLA =
+            metadataProvider.getOffsetAtTimestamp(partitionId, System.currentTimeMillis() - timeThreshold * 1000,
+                STREAM_FETCH_TIMEOUT_MS);
+        LOGGER.info("Offset at SLA of topic {} and partition {} is {}", streamConfig.getTopicName(), partitionId,
+            offsetAtSLA);
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Not able to fetch the offset metadata, skip auto resetting offsets", e);
+      return nextOffset;
+    }
+    try {
+      if (timeThreshold > 0 && offsetAtSLA != null && offsetAtSLA.compareTo(nextOffsetWithType) < 0) {
+        LOGGER.info("Auto reset offset from {} to {} on partition {} because time threshold reached", nextOffset,
+            latestOffset, partitionId);
+        return latestOffset.toString();
+      }
+      if (offsetThreshold > 0
+          && Long.parseLong(latestOffset.toString()) - Long.parseLong(nextOffset) > offsetThreshold) {
+        LOGGER.info("Auto reset offset from {} to {} on partition {} because number of offsets threshold reached",
+            nextOffset, latestOffset, partitionId);
+        return latestOffset.toString();
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Not able to compare the offsets, skip auto resetting offsets", e);
+    }
+    return nextOffset;
+  }
+
   @Nullable
   private SegmentPartitionMetadata getPartitionMetadataFromTableConfig(TableConfig tableConfig, int partitionId,
       int numPartitionGroups) {
@@ -1010,6 +1075,12 @@ public long getCommitTimeoutMS(String realtimeTableName) {
     return commitTimeoutMS;
   }

+  private String getTableTopicUniqueClientId(StreamConfig streamConfig) {
+    return StreamConsumerFactory.getUniqueClientId(
+        PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" + streamConfig.getTableNameWithType() + "-"
+            + streamConfig.getTopicName());
+  }
+
   /**
    * Fetches the partition ids for the stream. Some stream (e.g. Kinesis) might not support this operation, in which
    * case exception will be thrown.
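To make the offset-based trigger above concrete before moving on to the remaining hunks: a minimal standalone sketch of the arithmetic `computeStartOffset` applies for the offset threshold, with offsets modeled as plain longs (the way `LongMsgOffset` wraps them). The class name, method shape, and numbers are illustrative and not part of the patch.

// Hypothetical standalone sketch (not part of the patch) of the offset-threshold
// branch of computeStartOffset, with offsets modeled as plain longs.
public class OffsetAutoResetExample {

  static String computeStartOffset(long nextOffset, long latestOffset, int offsetThreshold) {
    // Mirrors the patch: if the committing segment's next offset trails the latest
    // stream offset by more than the threshold, start the new consuming segment at
    // the latest offset; otherwise continue from where the committed segment ended.
    if (offsetThreshold > 0 && latestOffset - nextOffset > offsetThreshold) {
      return String.valueOf(latestOffset);
    }
    return String.valueOf(nextOffset);
  }

  public static void main(String[] args) {
    // Lag of 500 against a threshold of 100: auto reset skips ahead to 10500.
    System.out.println(computeStartOffset(10_000L, 10_500L, 100));
    // Lag of 50 is within the threshold: keep consuming from 10000.
    System.out.println(computeStartOffset(10_000L, 10_050L, 100));
  }
}

The real method compares `StreamPartitionMsgOffset` instances and falls back to `nextOffset` whenever the metadata fetch or the comparison fails, so a misbehaving stream never blocks creation of the new consuming segment.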
@@ -1017,9 +1088,7 @@ public long getCommitTimeoutMS(String realtimeTableName) {
   @VisibleForTesting
   Set<Integer> getPartitionIds(StreamConfig streamConfig)
       throws Exception {
-    String clientId = StreamConsumerFactory.getUniqueClientId(
-        PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" + streamConfig.getTableNameWithType() + "-"
-            + streamConfig.getTopicName());
+    String clientId = getTableTopicUniqueClientId(streamConfig);
     StreamConsumerFactory consumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
     try (StreamMetadataProvider metadataProvider = consumerFactory.createStreamMetadataProvider(clientId)) {
       return metadataProvider.fetchPartitionIds(5000L);
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index d15f87efbdc2..263484c5d9d4 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -77,9 +77,15 @@
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.LongMsgOffsetFactory;
+import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix;
 import org.apache.pinot.spi.utils.CommonConstants.Helix.Instance;
@@ -91,6 +97,7 @@
 import org.apache.pinot.util.TestUtils;
 import org.apache.zookeeper.data.Stat;
 import org.joda.time.Interval;
+import org.mockito.MockedStatic;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.Test;
@@ -122,6 +129,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
   static final String CRC = Long.toString(RANDOM.nextLong() & 0xFFFFFFFFL);
   static final SegmentVersion SEGMENT_VERSION = RANDOM.nextBoolean() ? SegmentVersion.v1 : SegmentVersion.v3;
   static final int NUM_DOCS = RANDOM.nextInt(Integer.MAX_VALUE) + 1;
+  static final long LATEST_OFFSET = PARTITION_OFFSET.getOffset() * 2 + NUM_DOCS;
   static final int SEGMENT_SIZE_IN_BYTES = 100000000;

   @AfterClass
   public void tearDown()
@@ -325,6 +333,139 @@ public void testCommitSegment() {
     assertNull(consumingSegmentZKMetadata);
   }

+  @Test
+  public void testCommitSegmentWithOffsetAutoResetOnOffset()
+      throws Exception {
+    // Set up a new table with 2 replicas, 5 instances, 4 partitions
+    PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
+    FakePinotLLCRealtimeSegmentManager segmentManager =
+        new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);
+    setUpNewTable(segmentManager, 2, 5, 4);
+    Map<String, Map<String, String>> instanceStatesMap = segmentManager._idealState.getRecord().getMapFields();
+    Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMaps(segmentManager._tableConfig).get(0);
+    streamConfigMap.put(StreamConfigProperties.ENABLE_OFFSET_AUTO_RESET, String.valueOf(true));
+    streamConfigMap.put(StreamConfigProperties.OFFSET_AUTO_RESET_OFFSET_THRESHOLD_KEY, "100");
+    segmentManager.makeTableConfig(streamConfigMap);
+
+    StreamConsumerFactory mockConsumerFactory = mock(StreamConsumerFactory.class);
+    StreamMetadataProvider mockMetadataProvider = mock(StreamMetadataProvider.class);
+    when(mockConsumerFactory.createPartitionMetadataProvider(anyString(), anyInt())).thenReturn(mockMetadataProvider);
+    when(mockConsumerFactory.createStreamMsgOffsetFactory()).thenReturn(new LongMsgOffsetFactory());
+    when(mockMetadataProvider.fetchStreamPartitionOffset(eq(OffsetCriteria.LARGEST_OFFSET_CRITERIA),
+        anyLong())).thenReturn(new LongMsgOffset(LATEST_OFFSET));
+    when(mockMetadataProvider.getOffsetAtTimestamp(eq(0), anyLong(), anyLong())).thenReturn(PARTITION_OFFSET);
+
+    try (MockedStatic<StreamConsumerFactoryProvider> mockedStaticProvider = mockStatic(
+        StreamConsumerFactoryProvider.class)) {
+      mockedStaticProvider.when(() -> StreamConsumerFactoryProvider.create(segmentManager._streamConfigs.get(0)))
+          .thenReturn(mockConsumerFactory);
+
+      // Commit a segment for partition group 0
+      String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
+      String endOffset = new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString();
+      CommittingSegmentDescriptor committingSegmentDescriptor =
+          new CommittingSegmentDescriptor(committingSegment, endOffset, SEGMENT_SIZE_IN_BYTES);
+      committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+      segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
+
+      // Verify instance states for committed segment and new consuming segment
+      Map<String, String> committedSegmentInstanceStateMap = instanceStatesMap.get(committingSegment);
+      assertNotNull(committedSegmentInstanceStateMap);
+      assertEquals(new HashSet<>(committedSegmentInstanceStateMap.values()),
+          Collections.singleton(SegmentStateModel.ONLINE));
+
+      String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName();
+      Map<String, String> consumingSegmentInstanceStateMap = instanceStatesMap.get(consumingSegment);
+      assertNotNull(consumingSegmentInstanceStateMap);
+      assertEquals(new HashSet<>(consumingSegmentInstanceStateMap.values()),
+          Collections.singleton(SegmentStateModel.CONSUMING));
+
+      // Verify segment ZK metadata for committed segment and new consuming segment
+      SegmentZKMetadata committedSegmentZKMetadata =
+          segmentManager._segmentZKMetadataMap.get(committingSegment);
+      assertEquals(committedSegmentZKMetadata.getStatus(), Status.DONE);
+      assertEquals(committedSegmentZKMetadata.getStartOffset(), PARTITION_OFFSET.toString());
+      assertEquals(committedSegmentZKMetadata.getEndOffset(), endOffset);
+      assertEquals(committedSegmentZKMetadata.getCreationTime(), CURRENT_TIME_MS);
+      assertEquals(committedSegmentZKMetadata.getCrc(), Long.parseLong(CRC));
+      assertEquals(committedSegmentZKMetadata.getIndexVersion(), SEGMENT_VERSION.name());
+      assertEquals(committedSegmentZKMetadata.getTotalDocs(), NUM_DOCS);
+      assertEquals(committedSegmentZKMetadata.getSizeInBytes(), SEGMENT_SIZE_IN_BYTES);
+
+      SegmentZKMetadata consumingSegmentZKMetadata = segmentManager._segmentZKMetadataMap.get(consumingSegment);
+      assertEquals(consumingSegmentZKMetadata.getStatus(), Status.IN_PROGRESS);
+      assertEquals(consumingSegmentZKMetadata.getStartOffset(), String.valueOf(LATEST_OFFSET));
+      assertEquals(consumingSegmentZKMetadata.getCreationTime(), CURRENT_TIME_MS);
+    }
+  }
+
+  @Test
+  public void testCommitSegmentWithOffsetAutoResetOnTime()
+      throws Exception {
+    // Set up a new table with 2 replicas, 5 instances, 4 partitions
+    PinotHelixResourceManager mockHelixResourceManager = mock(PinotHelixResourceManager.class);
+    FakePinotLLCRealtimeSegmentManager segmentManager =
+        new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager);
+    setUpNewTable(segmentManager, 2, 5, 4);
+    Map<String, Map<String, String>> instanceStatesMap = segmentManager._idealState.getRecord().getMapFields();
+    Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMaps(segmentManager._tableConfig).get(0);
+    streamConfigMap.put(StreamConfigProperties.ENABLE_OFFSET_AUTO_RESET, String.valueOf(true));
+    streamConfigMap.put(StreamConfigProperties.OFFSET_AUTO_RESET_TIMESEC_THRESHOLD_KEY, "1800");
+    segmentManager.makeTableConfig(streamConfigMap);
+
+    StreamConsumerFactory mockConsumerFactory = mock(StreamConsumerFactory.class);
+    StreamMetadataProvider mockMetadataProvider = mock(StreamMetadataProvider.class);
+    when(mockConsumerFactory.createPartitionMetadataProvider(anyString(), anyInt())).thenReturn(mockMetadataProvider);
+    when(mockConsumerFactory.createStreamMsgOffsetFactory()).thenReturn(new LongMsgOffsetFactory());
+    when(mockMetadataProvider.fetchStreamPartitionOffset(eq(OffsetCriteria.LARGEST_OFFSET_CRITERIA),
+        anyLong())).thenReturn(new LongMsgOffset(LATEST_OFFSET));
+    when(mockMetadataProvider.getOffsetAtTimestamp(eq(0), anyLong(), anyLong())).thenReturn(
+        new LongMsgOffset(PARTITION_OFFSET.getOffset() + 1L));
+
+    try (MockedStatic<StreamConsumerFactoryProvider> mockedStaticProvider = mockStatic(
+        StreamConsumerFactoryProvider.class)) {
+      mockedStaticProvider.when(() -> StreamConsumerFactoryProvider.create(segmentManager._streamConfigs.get(0)))
+          .thenReturn(mockConsumerFactory);
+
+      // Commit a segment for partition group 0
+      String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, CURRENT_TIME_MS).getSegmentName();
+      String endOffset = new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString();
+      CommittingSegmentDescriptor committingSegmentDescriptor =
+          new CommittingSegmentDescriptor(committingSegment, endOffset, SEGMENT_SIZE_IN_BYTES);
+      committingSegmentDescriptor.setSegmentMetadata(mockSegmentMetadata());
+      segmentManager.commitSegmentMetadata(REALTIME_TABLE_NAME, committingSegmentDescriptor);
+
+      // Verify instance states for committed segment and new consuming segment
+      Map<String, String> committedSegmentInstanceStateMap = instanceStatesMap.get(committingSegment);
+      assertNotNull(committedSegmentInstanceStateMap);
+      assertEquals(new HashSet<>(committedSegmentInstanceStateMap.values()),
+          Collections.singleton(SegmentStateModel.ONLINE));
+
+      String consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 1, CURRENT_TIME_MS).getSegmentName();
+      Map<String, String> consumingSegmentInstanceStateMap = instanceStatesMap.get(consumingSegment);
+      assertNotNull(consumingSegmentInstanceStateMap);
+      assertEquals(new HashSet<>(consumingSegmentInstanceStateMap.values()),
+          Collections.singleton(SegmentStateModel.CONSUMING));
+
+      // Verify segment ZK metadata for committed segment and new consuming segment
+      SegmentZKMetadata committedSegmentZKMetadata = segmentManager._segmentZKMetadataMap.get(committingSegment);
+      assertEquals(committedSegmentZKMetadata.getStatus(), Status.DONE);
+      assertEquals(committedSegmentZKMetadata.getStartOffset(), PARTITION_OFFSET.toString());
+      assertEquals(committedSegmentZKMetadata.getEndOffset(), endOffset);
+      assertEquals(committedSegmentZKMetadata.getCreationTime(), CURRENT_TIME_MS);
+      assertEquals(committedSegmentZKMetadata.getCrc(), Long.parseLong(CRC));
+      assertEquals(committedSegmentZKMetadata.getIndexVersion(), SEGMENT_VERSION.name());
+      assertEquals(committedSegmentZKMetadata.getTotalDocs(), NUM_DOCS);
+      assertEquals(committedSegmentZKMetadata.getSizeInBytes(), SEGMENT_SIZE_IN_BYTES);
+
+      SegmentZKMetadata consumingSegmentZKMetadata = segmentManager._segmentZKMetadataMap.get(consumingSegment);
+      assertEquals(consumingSegmentZKMetadata.getStatus(), Status.IN_PROGRESS);
+      assertEquals(consumingSegmentZKMetadata.getStartOffset(), String.valueOf(LATEST_OFFSET));
+      assertEquals(consumingSegmentZKMetadata.getCreationTime(), CURRENT_TIME_MS);
+    }
+  }
+
   /**
    * Test cases for the scenario where stream partitions increase, and the validation manager is attempting to create
    * segments for new partitions.
    * This test assumes that all other factors remain the same (no error conditions or
@@ -1725,6 +1866,13 @@ void makeTableConfig() {
       _streamConfigs = IngestionConfigUtils.getStreamConfigs(_tableConfig);
     }

+    void makeTableConfig(Map<String, String> streamConfigMap) {
+      _tableConfig =
+          new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(_numReplicas)
+              .setStreamConfigs(streamConfigMap).build();
+      _streamConfigs = IngestionConfigUtils.getStreamConfigs(_tableConfig);
+    }
+
     void makeConsumingInstancePartitions() {
       List<String> instances = new ArrayList<>(_numInstances);
       for (int i = 0; i < _numInstances; i++) {
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index 0b13c05ba482..66361984bb2e 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -187,6 +187,12 @@ public List<TopicMetadata> getTopics() {
     }
   }

+  @Override
+  public StreamPartitionMsgOffset getOffsetAtTimestamp(int partitionId, long timestampMillis, long timeoutMillis) {
+    return new LongMsgOffset(_consumer.offsetsForTimes(Map.of(_topicPartition, timestampMillis),
+        Duration.ofMillis(timeoutMillis)).get(_topicPartition).offset());
+  }
+
   public static class KafkaTopicMetadata implements TopicMetadata {
     private String _name;

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
index 62a181f605dd..c0d005ae26d2 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
@@ -199,6 +199,13 @@ public KafkaTopicMetadata setName(String name) {
       return this;
     }
   }
+
+  @Override
+  public StreamPartitionMsgOffset getOffsetAtTimestamp(int partitionId, long timestampMillis, long timeoutMillis) {
+    return new LongMsgOffset(_consumer.offsetsForTimes(Map.of(_topicPartition, timestampMillis),
+        Duration.ofMillis(timeoutMillis)).get(_topicPartition).offset());
+  }
+
   @Override
   public void close()
       throws IOException {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
index d907c0a9e4ab..6fab25a51e80 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
@@ -75,6 +75,10 @@ public class StreamConfig {
   private final double _topicConsumptionRateLimit;

+  private final boolean _enableOffsetAutoReset;
+  private final int _offsetAutoResetOffsetThreshold;
+  private final long _offsetAutoResetTimeSecThreshold;
+
   private final Map<String, String> _streamConfigMap = new HashMap<>();

   // Allow overriding it to use different offset criteria
@@ -199,6 +203,10 @@ public StreamConfig(String tableNameWithType, Map streamConfigMa
     String rate = streamConfigMap.get(StreamConfigProperties.TOPIC_CONSUMPTION_RATE_LIMIT);
     _topicConsumptionRateLimit = rate != null ? Double.parseDouble(rate) : CONSUMPTION_RATE_LIMIT_NOT_SPECIFIED;

+    _enableOffsetAutoReset = Boolean.parseBoolean(streamConfigMap.get(StreamConfigProperties.ENABLE_OFFSET_AUTO_RESET));
+    _offsetAutoResetOffsetThreshold = parseOffsetAutoResetOffsetThreshold(streamConfigMap);
+    _offsetAutoResetTimeSecThreshold = parseOffsetAutoResetTimeSecThreshold(streamConfigMap);
+
     _streamConfigMap.putAll(streamConfigMap);
   }
@@ -310,6 +318,34 @@ public static long extractFlushThresholdTimeMillis(Map streamCon
     }
   }

+  public static int parseOffsetAutoResetOffsetThreshold(Map<String, String> streamConfigMap) {
+    String key = StreamConfigProperties.OFFSET_AUTO_RESET_OFFSET_THRESHOLD_KEY;
+    String offsetAutoResetOffsetThresholdStr = streamConfigMap.get(key);
+    if (offsetAutoResetOffsetThresholdStr != null) {
+      try {
+        return Integer.parseInt(offsetAutoResetOffsetThresholdStr);
+      } catch (Exception e) {
+        throw new IllegalArgumentException("Invalid config " + key + ": " + offsetAutoResetOffsetThresholdStr);
+      }
+    } else {
+      return -1; // Default value indicating disabled
+    }
+  }
+
+  public static long parseOffsetAutoResetTimeSecThreshold(Map<String, String> streamConfigMap) {
+    String key = StreamConfigProperties.OFFSET_AUTO_RESET_TIMESEC_THRESHOLD_KEY;
+    String offsetAutoResetTimeSecThresholdStr = streamConfigMap.get(key);
+    if (offsetAutoResetTimeSecThresholdStr != null) {
+      try {
+        return Long.parseLong(offsetAutoResetTimeSecThresholdStr);
+      } catch (Exception e) {
+        throw new IllegalArgumentException("Invalid config " + key + ": " + offsetAutoResetTimeSecThresholdStr);
+      }
+    } else {
+      return -1; // Default value indicating disabled
+    }
+  }
+
   public String getType() {
     return _type;
   }
@@ -383,6 +419,18 @@ public Optional<Double> getTopicConsumptionRateLimit() {
         : Optional.of(_topicConsumptionRateLimit);
   }

+  public boolean isEnableOffsetAutoReset() {
+    return _enableOffsetAutoReset;
+  }
+
+  public int getOffsetAutoResetOffsetThreshold() {
+    return _offsetAutoResetOffsetThreshold;
+  }
+
+  public long getOffsetAutoResetTimeSecThreshold() {
+    return _offsetAutoResetTimeSecThreshold;
+  }
+
   public String getTableNameWithType() {
     return _tableNameWithType;
   }
@@ -402,7 +450,11 @@ public String toString() {
         + _flushThresholdTimeMillis + ", _flushThresholdSegmentSizeBytes=" + _flushThresholdSegmentSizeBytes
         + ", _flushThresholdVarianceFraction=" + _flushThresholdVarianceFraction + ", _flushAutotuneInitialRows="
         + _flushAutotuneInitialRows + ", _groupId='" + _groupId + '\''
-        + ", _topicConsumptionRateLimit=" + _topicConsumptionRateLimit + ", _streamConfigMap=" + _streamConfigMap
+        + ", _topicConsumptionRateLimit=" + _topicConsumptionRateLimit
+        + ", _enableOffsetAutoReset=" + _enableOffsetAutoReset
+        + ", _offsetAutoResetOffsetThreshold=" + _offsetAutoResetOffsetThreshold
+        + ", _offsetAutoResetTimeSecThreshold=" + _offsetAutoResetTimeSecThreshold
+        + ", _streamConfigMap=" + _streamConfigMap
         + ", _offsetCriteria=" + _offsetCriteria + ", _serverUploadToDeepStore=" + _serverUploadToDeepStore + '}';
   }
@@ -427,7 +479,10 @@ public boolean equals(Object o) {
         && Objects.equals(_consumerFactoryClassName, that._consumerFactoryClassName)
         && Objects.equals(_decoderClass, that._decoderClass) && Objects.equals(_decoderProperties,
         that._decoderProperties) && Objects.equals(_groupId, that._groupId)
         && Objects.equals(_streamConfigMap, that._streamConfigMap) && Objects.equals(_offsetCriteria,
-        that._offsetCriteria) && Objects.equals(_flushThresholdVarianceFraction, that._flushThresholdVarianceFraction);
+        that._offsetCriteria) && Objects.equals(_flushThresholdVarianceFraction, that._flushThresholdVarianceFraction)
+        && _enableOffsetAutoReset == that._enableOffsetAutoReset
+        && _offsetAutoResetOffsetThreshold == that._offsetAutoResetOffsetThreshold
+        && _offsetAutoResetTimeSecThreshold == that._offsetAutoResetTimeSecThreshold;
   }

   @Override
@@ -436,6 +491,7 @@ public int hashCode() {
         _decoderProperties, _connectionTimeoutMillis, _fetchTimeoutMillis, _idleTimeoutMillis, _flushThresholdRows,
         _flushThresholdSegmentRows, _flushThresholdTimeMillis, _flushThresholdSegmentSizeBytes,
         _flushAutotuneInitialRows, _groupId, _topicConsumptionRateLimit, _streamConfigMap, _offsetCriteria,
-        _serverUploadToDeepStore, _flushThresholdVarianceFraction);
+        _serverUploadToDeepStore, _flushThresholdVarianceFraction, _offsetAutoResetOffsetThreshold,
+        _enableOffsetAutoReset, _offsetAutoResetTimeSecThreshold);
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
index 4286adbbb278..79db54dd2882 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java
@@ -143,6 +143,25 @@ private StreamConfigProperties() {
   public static final String PAUSELESS_SEGMENT_DOWNLOAD_TIMEOUT_SECONDS =
       "realtime.segment.pauseless.download.timeoutSeconds";

+  /**
+   * Config used to enable offset auto reset during segment commit.
+   */
+  public static final String ENABLE_OFFSET_AUTO_RESET = "realtime.segment.offsetAutoReset.enable";
+
+  /**
+   * During segment commit, the new segment's startOffset skips to the latest offset if this value is positive
+   * and (latestStreamOffset - latestIngestedOffset) > this value.
+   */
+  public static final String OFFSET_AUTO_RESET_OFFSET_THRESHOLD_KEY =
+      "realtime.segment.offsetAutoReset.offsetThreshold";
+
+  /**
+   * During segment commit, the new segment's startOffset skips to the latest offset if this value is positive
+   * and (latestStreamOffset's timestamp - latestIngestedOffset's timestamp) > this value, in seconds.
+   */
+  public static final String OFFSET_AUTO_RESET_TIMESEC_THRESHOLD_KEY =
+      "realtime.segment.offsetAutoReset.timeThresholdSeconds";
+
   /**
    * Helper method to create a stream specific property
    */
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index 66bf9768b5fc..6cf724b058b5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -27,6 +27,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
 import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.annotations.InterfaceStability;

@@ -127,6 +128,11 @@ default Map<String, PartitionLagState> getCurrentPartitionLagState(
     return result;
   }

+  @Nullable
+  default StreamPartitionMsgOffset getOffsetAtTimestamp(int partitionId, long timestampMillis, long timeoutMillis) {
+    return null;
+  }
+
   /**
    * Fetches the list of available topics/streams
    *
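For reference, a minimal sketch of how a realtime table's stream configs might enable this feature, using the property names introduced in StreamConfigProperties above. The helper class and method names and the threshold values are illustrative, not part of the patch; a real stream config map would also carry the usual entries (stream type, topic name, consumer factory, decoder). Either threshold alone is enough, since computeStartOffset checks the time threshold first and then the offset threshold.

import java.util.HashMap;
import java.util.Map;

// Hypothetical helper showing the new stream config properties in use.
public class OffsetAutoResetConfigSketch {

  public static Map<String, String> withOffsetAutoReset(Map<String, String> baseStreamConfigs) {
    Map<String, String> streamConfigMap = new HashMap<>(baseStreamConfigs);
    // Turn the feature on; without this flag the thresholds below are ignored.
    streamConfigMap.put("realtime.segment.offsetAutoReset.enable", "true");
    // Skip to the latest offset when the new consuming segment would start
    // more than 100 offsets behind it.
    streamConfigMap.put("realtime.segment.offsetAutoReset.offsetThreshold", "100");
    // Alternatively (or additionally), reset when the ingested position lags
    // the stream by more than 1800 seconds.
    streamConfigMap.put("realtime.segment.offsetAutoReset.timeThresholdSeconds", "1800");
    return streamConfigMap;
  }
}

Leaving a threshold unset keeps it at the -1 default, which disables that trigger; setting the enable flag without any positive threshold only produces the warning logged in computeStartOffset and never resets.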