[Auto reset 1/3] Auto reset offset during ingestion lag #16492
Conversation
@Jackie-Jiang @noob-se7en @KKcorps please help review, thanks.
Codecov Report: patch coverage details and impacted files.

```
@@             Coverage Diff              @@
##             master   #16492      +/-   ##
============================================
+ Coverage     63.22%   63.35%    +0.12%
- Complexity     1362     1381       +19
============================================
  Files          3012     3014        +2
  Lines        174362   174721      +359
  Branches      26698    26763       +65
============================================
+ Hits         110247   110701      +454
+ Misses        55691    55582      -109
- Partials       8424     8438       +14
```
@KKcorps @9aman @noob-se7en Can you help take a look?
noob-se7en left a comment:

Added a few comments for the first iteration of review.
```java
  return latestOffset.toString();
}
if (offsetThreshold > 0
    && Long.valueOf(latestOffset.toString()) - Long.valueOf(nextOffset) > offsetThreshold) {
```
`StreamPartitionMsgOffset` is Comparable but does not have a "subtract" method; is there a good way to achieve this?
`compareTo` returns the difference, so we can compare that returned difference with the offsetThreshold.
I got your point, but that is not the standard usage either and cannot work. The internal implementation is based on `Long.compare`:

```java
public static int compare(long x, long y) {
    return (x < y) ? -1 : ((x == y) ? 0 : 1);
}
```

which does not return the real diff.
I see, my bad. Then I don't think there is a cleaner way.
Since we are only supporting `LongMsgOffset` here, let's just check whether the object is an instance of `LongMsgOffset` and use its `getOffset` method.
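For reference, the suggested `instanceof` pattern might look like the sketch below. This is illustrative only: `LongMsgOffset` here is a minimal stand-in for Pinot's `org.apache.pinot.spi.stream.LongMsgOffset`, reduced to the one accessor the check needs, and the method name `exceedsOffsetThreshold` is made up for this example.

```java
// Stand-in for Pinot's LongMsgOffset, reduced for illustration: the real class
// wraps a long Kafka-style offset and exposes it via getOffset().
class LongMsgOffset {
  private final long _offset;
  LongMsgOffset(long offset) { _offset = offset; }
  long getOffset() { return _offset; }
}

public class OffsetLagCheck {
  // Returns true when the latest offset is ahead of the next offset to consume
  // by more than the configured threshold. Falls through to false for any
  // offset type that is not a LongMsgOffset, since only long-based offsets
  // support a meaningful numeric difference.
  static boolean exceedsOffsetThreshold(Object latestOffset, long nextOffset, long offsetThreshold) {
    if (offsetThreshold > 0 && latestOffset instanceof LongMsgOffset) {
      return ((LongMsgOffset) latestOffset).getOffset() - nextOffset > offsetThreshold;
    }
    return false;
  }

  public static void main(String[] args) {
    // Lag of 999,900 messages against a threshold of 10,000 -> reset triggered.
    System.out.println(exceedsOffsetThreshold(new LongMsgOffset(1_000_000L), 100L, 10_000L));
    // Lag of 50 messages -> below threshold, no reset.
    System.out.println(exceedsOffsetThreshold(new LongMsgOffset(150L), 100L, 10_000L));
  }
}
```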
Force-pushed from b6d69b7 to 4081cf8.
Thanks for the review. Resolved or replied to the comments.
Force-pushed from 2857cbd to ba437ce.
```java
String clientId =
    PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" + streamConfig.getTableNameWithType() + "-"
        + streamConfig.getTopicName();
StreamConsumerFactory consumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
```
Can we move this to a separate function `getLargestOffset`?
Use a util similar to:

```java
@VisibleForTesting
Set<Integer> getPartitionIds(StreamConfig streamConfig)
    throws Exception {
  String clientId = StreamConsumerFactory.getUniqueClientId(
      PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" + streamConfig.getTableNameWithType() + "-"
          + streamConfig.getTopicName());
  StreamConsumerFactory consumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
  try (StreamMetadataProvider metadataProvider = consumerFactory.createStreamMetadataProvider(clientId)) {
    return metadataProvider.fetchPartitionIds(5000L);
  }
}
```
This is already a separate method, `computeStartOffset`, and we need to construct the consumer to get 2 offsets: "largest" and "offsetAtSLA". If we want to extract it, that method would need to return a Pair of `StreamPartitionMsgOffset` or a wrapper class for the two. That feels like overkill, since it would be rare to need those 2 offsets in other usages.
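To make the trade-off concrete, the wrapper alternative being declined could look like the sketch below. The class name `OffsetSnapshot` and its fields are hypothetical, invented for this illustration; they are not part of the PR.

```java
public class OffsetSnapshotDemo {
  // Hypothetical immutable holder for the two offsets an extracted helper would
  // return together: the stream's largest offset and the offset at the SLA bound.
  // In real Pinot code these would be StreamPartitionMsgOffset instances; plain
  // Strings are used here to keep the sketch self-contained.
  static final class OffsetSnapshot {
    final String _largest;
    final String _offsetAtSLA;

    OffsetSnapshot(String largest, String offsetAtSLA) {
      _largest = largest;
      _offsetAtSLA = offsetAtSLA;
    }
  }

  public static void main(String[] args) {
    // One metadata fetch would populate both fields at once.
    OffsetSnapshot snapshot = new OffsetSnapshot("1000", "800");
    System.out.println(snapshot._largest + " " + snapshot._offsetAtSLA);
  }
}
```

The comment's point stands: a single-use wrapper like this adds a type solely to move one block of code, which is why keeping the logic inline in `computeStartOffset` was preferred.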
```java
private String computeStartOffset(String nextOffset, StreamConfig streamConfig, int partitionId) {
  if (!streamConfig.isEnableOffsetAutoReset()) {
    return nextOffset;
```
nit: Should we add a log here for skipping this?
A change in streamConfig is not recorded, and debugging old segment commits might get hard.
If it is enabled, the log

```java
LOGGER.info("Latest offset of topic {} and partition {} is {}", streamConfig.getTopicName(), partitionId,
    latestOffset);
```

will be printed. And right after this method,

```java
LOGGER.info(
    "Creating segment ZK metadata for new CONSUMING segment: {} with start offset: {} and creation time: {}",
    segmentName, startOffset, creationTimeMs);
```

would be printed. With the help of those 2 logs, we should be able to tell whether autoReset is enabled.
```java
  }
  return nextOffset;
}
```
```java
/**
 * Fetches the latest offset from the stream for a given partition.
 * Extracted as a separate method for better testability and reusability.
 */
@VisibleForTesting
String fetchLatestOffset(StreamConfig streamConfig, int partitionId) throws Exception {
  String clientId = StreamConsumerFactory.getUniqueClientId(
      String.format("%s-%s-%s-latest-offset",
          PinotLLCRealtimeSegmentManager.class.getSimpleName(),
          streamConfig.getTableNameWithType(),
          streamConfig.getTopicName()));
  StreamConsumerFactory consumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
  try (StreamMetadataProvider metadataProvider = consumerFactory.createStreamMetadataProvider(clientId)) {
    StreamPartitionMsgOffset latestOffset = metadataProvider.fetchStreamPartitionOffset(
        OffsetCriteria.LARGEST_OFFSET_CRITERIA, STREAM_FETCH_TIMEOUT_MS);
    return latestOffset.toString();
  }
}
```
Adapt as needed.
```java
}
try {
  if (timeThreshold > 0 && offsetAtSLA != null && offsetAtSLA.compareTo(nextOffsetWithType) < 0) {
    LOGGER.info("Auto reset offset from {} to {} on partition {} because time threshold reached", nextOffset,
```
We probably want to log this as a warning, given it can cause data loss. Same for line 1020.
If the reset happens, users must have already set it up explicitly through the config, so the reset is an "expected" behavior; logging it as a warning may not be appropriate.
Even though this is "expected", it is definitely worth noting. We should also consider emitting a metric when an offset reset happens, because it means data loss.
Yes, metrics are planned. That part will be added afterwards.
Labels: real-time, ingestion, feature. Part 1 of #15782.
Issue #14815
Design doc https://docs.google.com/document/d/1NKPeNh6V2ctaQ4T_X3OKJ6Gcy5TRanLiU1uIDT8_9UA/edit?usp=sharing
Reset offset during ingestion lag. This change only skips the offset according to the configs; backfill over the skipped interval will be introduced in a separate patch.
2 new optional configs can be used: `realtime.segment.offsetAutoReset.offsetThreshold` and `realtime.segment.offsetAutoReset.timeSecThreshold`. During segment commit, if the lag (by offset or time) exceeds the configured threshold, the new segment's startOffset will be the "latest offset" instead of the "next offset".
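As a rough sketch, the two configs might be set alongside the other stream properties in a realtime table's `streamConfigs` map. Everything here except the two `offsetAutoReset` keys named in the PR description is illustrative (topic name, values, and surrounding keys are placeholders, not from the PR):

```json
{
  "streamConfigs": {
    "streamType": "kafka",
    "stream.kafka.topic.name": "myTopic",
    "realtime.segment.offsetAutoReset.offsetThreshold": "1000000",
    "realtime.segment.offsetAutoReset.timeSecThreshold": "3600"
  }
}
```

With settings like these, a partition that falls more than ~1M messages or ~1 hour behind at segment commit time would have its new CONSUMING segment start from the latest offset, accepting the gap as data loss until the backfill feature lands.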