Realtime ingestion offset auto reset #15782
Conversation
Force-pushed 75e053c to 5cb11a1
Codecov Report

❌ Patch coverage check failed. Coverage diff:

```
@@             Coverage Diff              @@
##             master   #15782      +/-   ##
============================================
- Coverage     63.23%   63.21%    -0.02%
  Complexity     1362     1362
============================================
  Files          3010     3012        +2
  Lines        174269   174577      +308
  Branches      26682    26726       +44
============================================
+ Hits         110194   110362      +168
- Misses        55659    55787      +128
- Partials       8416     8428       +12
```
@lnbest0707-uber can you please add a description here? This is because I am also planning to work on the offset reset problem.
Thanks for sharing your interest. I've updated the description with the design doc.
Force-pushed 5cb11a1 to 6a9067e
Force-pushed 5cecb91 to 436e5ad
deemoliu left a comment:
Out of curiosity, is there anywhere we can tune the frequency of the periodic job? Thanks!
```java
private void removeTopicFromTableConfig(String tableNameWithType, String topicName, TableConfig tableConfig) {
  List<Map<String, String>> streamConfigMaps = IngestionConfigUtils.getStreamConfigMaps(tableConfig);
  for (int i = 0; i < streamConfigMaps.size(); i++) {
```
Major:
Q1: Does streamConfigMaps maintain the order?
Q2: I feel there might be a concurrent modification exception here.
It is ordered. And it is using the _pinotHelixResourceManager.setExistingTableConfig method to set the config. That method has built-in retry and further uses the Helix API to ensure concurrency control. IIUC, concurrent modification should not be an issue.
```java
 */
public class PartitionGroupConsumptionStatus {

  private final String _topicName;
```
topicName seems to be a non-null value; let's verify this carefully in the integration tests.
Yes, we do not use @Nullable here. It should at least be an empty string "".
Please address the conflict and rerun the integration tests, thank you!
Force-pushed 436e5ad to c434cf2
@deemoliu Thanks for the review. Addressed the comments. The failing test looks flaky: I reran it twice and it failed in different locations (the test that failed in the 1st run succeeded in the 2nd, but the 2nd run failed where the 1st had succeeded).
Jackie-Jiang left a comment:
Can we split the PR into 2 parts:
- Automatically reset offset to largest offset
- Handle backfill
I'd also request a higher standard of code review @deemoliu
@KKcorps @noob-se7en Please also take a look
```java
protected PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
protected PinotHelixResourceManager _pinotHelixResourceManager;

public RealtimeOffsetAutoResetHandler(PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
```
Suggest making this an interface, and adding an init() method to initialize it.
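For illustration, a minimal sketch of the suggested shape; the init() signature and the triggerBackfillJob() parameters are assumptions pieced together from the snippets in this thread, not the final API:

```java
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;

// Sketch only: an interface with a late init() instead of constructor injection,
// so implementations can be loaded reflectively from a pluggable class name.
public interface RealtimeOffsetAutoResetHandler {

  // Wires in the managers the handler needs; this signature is an assumption.
  void init(PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
      PinotHelixResourceManager pinotHelixResourceManager);

  // Kicks off a backfill for the skipped offset range; returns true on success.
  boolean triggerBackfillJob(String tableNameWithType, String topicName, int partition,
      long fromOffset, long toOffset);
}
```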
```java
import org.slf4j.LoggerFactory;

public abstract class RealtimeOffsetAutoResetKafkaHandler extends RealtimeOffsetAutoResetHandler {
```
Is this PR half implemented? Seems there is no concrete implementation of RealtimeOffsetAutoResetHandler?
We are internally using a Kafka replication based implementation (refer to the attached design doc). That one uses the Kafka ecosystem APIs to trigger the backfill. That particular implementation is pretty "internal"; that's the reason I made it pluggable here.
```java
}
try {
  TableConfig currentTableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
  addNewTopicToTableConfig(newTopicStreamConfig, currentTableConfig);
```
I don't follow this backfill logic. Is the idea to use a different Kafka topic to backfill the data? Some comments would help explain.
Yes it is. The design doc explains it. I will also add comments.
```java
public static final int DEFAULT_RETENTION_MANAGER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours.
public static final int DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS = 24 * 60 * 60; // 24 Hours.
public static final int DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
public static final int DEFAULT_OFFSET_AUTO_RESET_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
```
Given this is an opt-in feature, I'd suggest disabling the manager by default until it is tested in production.
Good point, I can make the manager enabled by a config.
```java
// resetOffsetTopicPartition=0
// resetOffsetFrom=0
// resetOffsetTo=1000
if (periodicTaskProperties.containsKey(context._backfillJobPropertyKeys.toArray()[0])) {
```
Are you trying to do periodicTaskProperties.containsKey(Constants.RESET_OFFSET_TOPIC_NAME)?

I think here you need to ensure all the keys exist instead, or it will throw an NPE on line 121.

I still don't follow the logic. For a periodic task, periodicTaskProperties applies to all tables. How do you use it to set up the topic info for one single partition?

> For a periodic task, periodicTaskProperties applies to all tables. How do you use it to set up the topic info for one single partition?

Not sure if I understand correctly.
When the task is triggered periodically, periodicTaskProperties is empty, and the manager/handler checks whether the registered backfill job is running or complete. If complete, it removes the backfill topic.
When the task is triggered by PinotLLCRealtimeSegmentManager during a segment commit, periodicTaskProperties carries the info required to trigger the backfill.

Do you want to trigger this manually only? From a periodic trigger, context._shouldTriggerBackfillJobs will be false, and the task becomes a no-op. We shouldn't even scan tables.
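A sketch of the all-keys check suggested above; the Constants keys appear in this thread, but the guard itself is illustrative rather than the merged code:

```java
import java.util.Set;

Set<String> requiredKeys = Set.of(Constants.RESET_OFFSET_TOPIC_NAME,
    Constants.RESET_OFFSET_TOPIC_PARTITION, Constants.RESET_OFFSET_FROM,
    Constants.RESET_OFFSET_TO);
if (periodicTaskProperties.keySet().containsAll(requiredKeys)) {
  // Triggered by PinotLLCRealtimeSegmentManager during a segment commit: all
  // properties are present, so it is safe to parse them and trigger the backfill.
} else {
  // Plain periodic trigger: properties are empty, so only poll the status of
  // registered backfill jobs and clean up completed backfill topics.
}
```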
```java
LOGGER.info("Trigger backfill jobs with StreamConfig {}, topicName {}, properties {}",
    topicStreamConfig, topicName, context._backfillJobProperties);
try {
  _tableToHandler.get(tableNameWithType).triggerBackfillJob(tableNameWithType,
```
You should also check the return value of triggerBackfillJob()
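For example, assuming triggerBackfillJob() returns a boolean success flag, the check could look like this (the parameter names and log message are illustrative):

```java
// Sketch of the suggested return-value check.
boolean triggered = _tableToHandler.get(tableNameWithType)
    .triggerBackfillJob(tableNameWithType, topicName, partition, fromOffset, toOffset);
if (!triggered) {
  LOGGER.warn("Failed to trigger backfill job for table: {}, topic: {}",
      tableNameWithType, topicName);
}
```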
```java
Integer.valueOf(context._backfillJobProperties.get(Constants.RESET_OFFSET_TOPIC_PARTITION)),
Long.valueOf(context._backfillJobProperties.get(Constants.RESET_OFFSET_FROM)),
Long.valueOf(context._backfillJobProperties.get(Constants.RESET_OFFSET_TO)));
```

Suggested change (parseInt/parseLong return primitives directly, avoiding the needless boxing that valueOf incurs when the result feeds a primitive parameter):

```diff
- Integer.valueOf(context._backfillJobProperties.get(Constants.RESET_OFFSET_TOPIC_PARTITION)),
- Long.valueOf(context._backfillJobProperties.get(Constants.RESET_OFFSET_FROM)),
- Long.valueOf(context._backfillJobProperties.get(Constants.RESET_OFFSET_TO)));
+ Integer.parseInt(context._backfillJobProperties.get(Constants.RESET_OFFSET_TOPIC_PARTITION)),
+ Long.parseLong(context._backfillJobProperties.get(Constants.RESET_OFFSET_FROM)),
+ Long.parseLong(context._backfillJobProperties.get(Constants.RESET_OFFSET_TO)));
```
```java
  }
}

@Override
```
This should also be added to kafka30
```java
  return result;
}

default StreamPartitionMsgOffset getOffsetAtTimestamp(int partitionId, long timestampMillis) {
```
Annotate it with @Nullable
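A sketch of the annotated method; returning null when the lookup is unsupported is an assumed contract, not something the PR confirms:

```java
import javax.annotation.Nullable;

// Returns the offset of the first message at or after the given timestamp,
// or null if this consumer cannot resolve offsets by timestamp (assumption).
@Nullable
default StreamPartitionMsgOffset getOffsetAtTimestamp(int partitionId, long timestampMillis) {
  return null;
}
```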
```java
  this(tableName, "", partitionGroupId, sequenceNumber, msSinceEpoch);
}

public LLCSegmentName(
```
(CRITICAL) This could cause incompatibility across versions. New controller will persist segment names not parseable by the old brokers/servers
This is a good point, and it requires a reverse update sequence (if this introduced feature is already in use).
The update is useful even for normal multi-topic ingestion. With that, we will be able to remove topics from an existing table; we've seen a lot of such requests from our production deployments.
Please also suggest any better or more graceful methods. Thanks.
This is a good change, but it needs to follow one of these 2 ways:
1. Introduce it in 2 releases:
   - First release: add all handling of `topic` with backward compatibility to the old segment name, but do not change the segment name
   - Second release: change the segment name, once all nodes can already handle it
2. Add a flag to control the segment name format (e.g. one cluster config + a per-table override in table config), and use the existing name format by default

I prefer the second one for a faster rollout + not changing existing behavior. Making the segment name longer has the side effect of larger ZNode size, and will cause Pinot to hit the scale limit of ZK faster. For the single-topic case we might still want to stick with the current format.

Given this change itself is very critical, we should split it out as a separate PR.
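A rough sketch of option 2; the flag plumbing and the exact position of the topic component are assumptions (the constructor snippet above suggests the topic sits right after the table name):

```java
// Sketch only: emit the legacy 4-part LLC segment name unless the new format is
// explicitly enabled via cluster config / table config override.
static String buildSegmentName(String tableName, String topicName, int partitionGroupId,
    int sequenceNumber, String creationTime, boolean includeTopicInName) {
  if (includeTopicInName) {
    // New format: tableName__topicName__partitionGroupId__sequenceNumber__creationTime
    return String.join("__", tableName, topicName, Integer.toString(partitionGroupId),
        Integer.toString(sequenceNumber), creationTime);
  }
  // Legacy format, parseable by old brokers/servers:
  // tableName__partitionGroupId__sequenceNumber__creationTime
  return String.join("__", tableName, Integer.toString(partitionGroupId),
      Integer.toString(sequenceNumber), creationTime);
}
```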
Thanks for the review.
This PR covers multiple topics, which makes it hard to review (reviewers can miss critical pieces). We can split it into 3 parts so that reviewers can focus on one topic at a time:
SG. Have created the split PRs. Please review and let me know your thoughts. Thanks @Jackie-Jiang
Hi @Jackie-Jiang, my PR #18127 addresses your review feedback that wasn't picked up when the feature was split and merged as #16492, #16692, and #16724. Specifically:
Would appreciate a review when you get a chance. Thanks!
Labels: ingestion, feature, performance
Issue #14815
Design doc https://docs.google.com/document/d/1NKPeNh6V2ctaQ4T_X3OKJ6Gcy5TRanLiU1uIDT8_9UA/edit?usp=sharing
TL;DR: for realtime ingestion, when the ingestion delay goes beyond a configurable threshold, the consumer can skip the in-between offsets and directly consume from the latest offset. The skipped offsets can then be backfilled by plugins provided by the table owner.
Additionally, this adds some formal features/restrictions on multi-topic ingestion, e.g. around `streamConfigs.get(0)` usage.
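For illustration, a hypothetical shape of a multi-topic stream config with the auto reset threshold; every key name below beyond the standard streamType/topic ones is invented for this example, and the design doc defines the real configuration surface:

```java
import java.util.List;
import java.util.Map;

// Sketch: primary topic plus a backfill topic managed by the reset handler.
List<Map<String, String>> streamConfigMaps = List.of(
    Map.of(
        "streamType", "kafka",
        "stream.kafka.topic.name", "orders",
        // Hypothetical key: delay beyond which consumption skips to the latest offset.
        "realtime.offset.auto.reset.delay.threshold.seconds", "3600"),
    Map.of(
        "streamType", "kafka",
        // Backfill topic appended/removed automatically during a reset cycle.
        "stream.kafka.topic.name", "orders-backfill"));
```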