-
Notifications
You must be signed in to change notification settings - Fork 1.5k
[Auto reset 1/3]Auto reset offset during ingestion lag #16492
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -171,6 +171,8 @@ public class PinotLLCRealtimeSegmentManager { | |||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| // Max time to wait for all LLC segments to complete committing their metadata while stopping the controller. | ||||||||||||||||||||||||||||||||||||||||||||||||||
| private static final long MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS = 30_000L; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // Timeout for calling stream metadata provider APIs | ||||||||||||||||||||||||||||||||||||||||||||||||||
| private static final long STREAM_FETCH_TIMEOUT_MS = 5_000L; | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| // TODO: make this configurable with default set to 10 | ||||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -931,7 +933,11 @@ private void createNewSegmentZKMetadata(TableConfig tableConfig, StreamConfig st | |||||||||||||||||||||||||||||||||||||||||||||||||
| int numReplicas) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| String realtimeTableName = tableConfig.getTableName(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| String segmentName = newLLCSegmentName.getSegmentName(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| String startOffset = committingSegmentDescriptor.getNextOffset(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| // Handle offset auto reset | ||||||||||||||||||||||||||||||||||||||||||||||||||
| String nextOffset = committingSegmentDescriptor.getNextOffset(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| String startOffset = computeStartOffset( | ||||||||||||||||||||||||||||||||||||||||||||||||||
| nextOffset, streamConfig, newLLCSegmentName.getPartitionGroupId()); | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| LOGGER.info( | ||||||||||||||||||||||||||||||||||||||||||||||||||
| "Creating segment ZK metadata for new CONSUMING segment: {} with start offset: {} and creation time: {}", | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -962,6 +968,65 @@ private void createNewSegmentZKMetadata(TableConfig tableConfig, StreamConfig st | |||||||||||||||||||||||||||||||||||||||||||||||||
| persistSegmentZKMetadata(realtimeTableName, newSegmentZKMetadata, -1); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| private String computeStartOffset(String nextOffset, StreamConfig streamConfig, int partitionId) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!streamConfig.isEnableOffsetAutoReset()) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| return nextOffset; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| long timeThreshold = streamConfig.getOffsetAutoResetTimeSecThreshold(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| int offsetThreshold = streamConfig.getOffsetAutoResetOffsetThreshold(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if (timeThreshold <= 0 && offsetThreshold <= 0) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| LOGGER.warn("Invalid offset auto reset configuration for table: {}, topic: {}. " | ||||||||||||||||||||||||||||||||||||||||||||||||||
| + "timeThreshold: {}, offsetThreshold: {}", | ||||||||||||||||||||||||||||||||||||||||||||||||||
| streamConfig.getTableNameWithType(), streamConfig.getTopicName(), timeThreshold, offsetThreshold); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| return nextOffset; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| String clientId = getTableTopicUniqueClientId(streamConfig); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| StreamConsumerFactory consumerFactory = StreamConsumerFactoryProvider.create(streamConfig); | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we move this to a separate function
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use a util similar to
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is already a separate method |
||||||||||||||||||||||||||||||||||||||||||||||||||
| StreamPartitionMsgOffsetFactory offsetFactory = consumerFactory.createStreamMsgOffsetFactory(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| StreamPartitionMsgOffset nextOffsetWithType = offsetFactory.create(nextOffset); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| StreamPartitionMsgOffset offsetAtSLA = null; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| StreamPartitionMsgOffset latestOffset; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| try (StreamMetadataProvider metadataProvider = consumerFactory.createPartitionMetadataProvider(clientId, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| partitionId)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // Fetching timestamp from an offset is an expensive operation which requires reading the data, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // while fetching offset from timestamp is lightweight and only needs to read metadata. | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // Hence, instead of checking if latestOffset's time - nextOffset's time < SLA, we would check | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // (CurrentTime - SLA)'s offset > nextOffset. | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // TODO: it is relying on System.currentTimeMillis() which might be affected by time drift. If we are able to | ||||||||||||||||||||||||||||||||||||||||||||||||||
| // get nextOffset's time, we should instead check (nextOffset's time + SLA)'s offset < latestOffset | ||||||||||||||||||||||||||||||||||||||||||||||||||
| latestOffset = metadataProvider.fetchStreamPartitionOffset( | ||||||||||||||||||||||||||||||||||||||||||||||||||
| OffsetCriteria.LARGEST_OFFSET_CRITERIA, STREAM_FETCH_TIMEOUT_MS); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| LOGGER.info("Latest offset of topic {} and partition {} is {}", streamConfig.getTopicName(), partitionId, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| latestOffset); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if (timeThreshold > 0) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| offsetAtSLA = | ||||||||||||||||||||||||||||||||||||||||||||||||||
| metadataProvider.getOffsetAtTimestamp(partitionId, System.currentTimeMillis() - timeThreshold * 1000, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| STREAM_FETCH_TIMEOUT_MS); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| LOGGER.info("Offset at SLA of topic {} and partition {} is {}", streamConfig.getTopicName(), partitionId, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| offsetAtSLA); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (Exception e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| LOGGER.warn("Not able to fetch the offset metadata, skip auto resetting offsets", e); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| return nextOffset; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if (timeThreshold > 0 && offsetAtSLA != null && offsetAtSLA.compareTo(nextOffsetWithType) < 0) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| LOGGER.info("Auto reset offset from {} to {} on partition {} because time threshold reached", nextOffset, | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We probably want to log as warning given this can cause data loss. Same for line 1020
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the reset happens, users must have already set this up through the config explicitly. Then the happening of the reset is an "expected" behavior which I feel may not be good to be logged as warnings.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Even though this is "expected", it is definitely worth noting. We should also consider emitting a metric when offset reset happens because it means data loss
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, metrics is in plan. That part would be added afterwards. |
||||||||||||||||||||||||||||||||||||||||||||||||||
| latestOffset, partitionId); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| return latestOffset.toString(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| if (offsetThreshold > 0 | ||||||||||||||||||||||||||||||||||||||||||||||||||
| && Long.parseLong(latestOffset.toString()) - Long.parseLong(nextOffset) > offsetThreshold) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| LOGGER.info("Auto reset offset from {} to {} on partition {} because number of offsets threshold reached", | ||||||||||||||||||||||||||||||||||||||||||||||||||
| nextOffset, latestOffset, partitionId); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| return latestOffset.toString(); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (Exception e) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| LOGGER.warn("Not able to compare the offsets, skip auto resetting offsets", e); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
| return nextOffset; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Adapt as per need.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as #16492 (comment) |
||||||||||||||||||||||||||||||||||||||||||||||||||
| @Nullable | ||||||||||||||||||||||||||||||||||||||||||||||||||
| private SegmentPartitionMetadata getPartitionMetadataFromTableConfig(TableConfig tableConfig, int partitionId, | ||||||||||||||||||||||||||||||||||||||||||||||||||
| int numPartitionGroups) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -1010,16 +1075,20 @@ public long getCommitTimeoutMS(String realtimeTableName) { | |||||||||||||||||||||||||||||||||||||||||||||||||
| return commitTimeoutMS; | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| private String getTableTopicUniqueClientId(StreamConfig streamConfig) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| return StreamConsumerFactory.getUniqueClientId( | ||||||||||||||||||||||||||||||||||||||||||||||||||
| PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" + streamConfig.getTableNameWithType() + "-" | ||||||||||||||||||||||||||||||||||||||||||||||||||
| + streamConfig.getTopicName()); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||||||||||||||||||||||||||
| * Fetches the partition ids for the stream. Some stream (e.g. Kinesis) might not support this operation, in which | ||||||||||||||||||||||||||||||||||||||||||||||||||
| * case exception will be thrown. | ||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||
| @VisibleForTesting | ||||||||||||||||||||||||||||||||||||||||||||||||||
| Set<Integer> getPartitionIds(StreamConfig streamConfig) | ||||||||||||||||||||||||||||||||||||||||||||||||||
| throws Exception { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| String clientId = StreamConsumerFactory.getUniqueClientId( | ||||||||||||||||||||||||||||||||||||||||||||||||||
| PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" + streamConfig.getTableNameWithType() + "-" | ||||||||||||||||||||||||||||||||||||||||||||||||||
| + streamConfig.getTopicName()); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| String clientId = getTableTopicUniqueClientId(streamConfig); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| StreamConsumerFactory consumerFactory = StreamConsumerFactoryProvider.create(streamConfig); | ||||||||||||||||||||||||||||||||||||||||||||||||||
| try (StreamMetadataProvider metadataProvider = consumerFactory.createStreamMetadataProvider(clientId)) { | ||||||||||||||||||||||||||||||||||||||||||||||||||
| return metadataProvider.fetchPartitionIds(5000L); | ||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Should we add a log here of skipping this ?
A change in streamConfig is not recorded and debugging old segment commits might get hard.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it is enabled, the log
will be printed.
And right after this method,
would be printed.
With the help of those 2 logs, we should be able to understand if autoReset is enabled or not.