
Realtime ingestion offset auto reset #15782

Closed
lnbest0707 wants to merge 6 commits into apache:master from lnbest0707:upstream_fork_ingestion_freshness

Conversation

@lnbest0707
Contributor

@lnbest0707 lnbest0707 commented May 14, 2025

Issue: #14815
Design doc: https://docs.google.com/document/d/1NKPeNh6V2ctaQ4T_X3OKJ6Gcy5TRanLiU1uIDT8_9UA/edit?usp=sharing

TL;DR: for realtime ingestion, when the ingestion delay goes beyond a configurable threshold, the consumer can skip the in-between offsets and consume directly from the latest offset. The skipped offsets can then be backfilled by plugins provided by the table owner.
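
In pseudo-code, the intended flow is roughly the following (a minimal sketch with hypothetical names; the actual logic lives in the controller-side manager and the pluggable handler):

// Sketch only: maxAllowedDelayMs, fetchLatestStreamOffset and registerBackfillJob are
// hypothetical names, not APIs introduced by this PR.
long ingestionDelayMs = System.currentTimeMillis() - lastConsumedRecordTimeMs;
if (ingestionDelayMs > maxAllowedDelayMs) {
  long resetFromOffset = currentOffset;           // first offset that will be skipped
  long resetToOffset = fetchLatestStreamOffset();  // jump straight to the latest offset
  resumeConsumptionFrom(resetToOffset);
  // The skipped range [resetFromOffset, resetToOffset) is handed to a pluggable handler,
  // which can backfill it, e.g. through an ephemeral backfill topic.
  registerBackfillJob(tableNameWithType, partitionId, resetFromOffset, resetToOffset);
}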

Additionally, this adds some formal features/restrictions for multi-topic ingestion:

  • Ensure each stream uses its own streamConfig instead of sharing the config from the first streamConfig, removing the streamConfigs.get(0) usage.
  • Do not allow duplicate streams (same stream type with the same topic name).
  • [Tentative] Do not allow deleting topics. Deleting topics is not banned currently, but it is not a safe operation, as the segment metadata mapping would get messed up.
  • Add a new type of topic, the "ephemeral" topic, which can be removed. It can be used for ingestion backfill.
  • For ephemeral topics in multi-topic ingestion, tune the segment naming format so that they can be safely deleted.
  • The new naming format could be used for all topic types in the future, so that topics can be removed from multi-topic real-time tables.

@lnbest0707 lnbest0707 force-pushed the upstream_fork_ingestion_freshness branch 4 times, most recently from 75e053c to 5cb11a1 on May 14, 2025 17:25
@codecov-commenter

codecov-commenter commented May 14, 2025

Codecov Report

❌ Patch coverage is 58.65385% with 172 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.21%. Comparing base (9e4bb2a) to head (c08e5b4).
⚠️ Report is 1188 commits behind head on master.

Files with missing lines Patch % Lines
...iodictask/RealtimeOffsetAutoResetKafkaHandler.java 0.00% 55 Missing ⚠️
.../core/realtime/PinotLLCRealtimeSegmentManager.java 69.16% 30 Missing and 7 partials ⚠️
...ler/validation/RealtimeOffsetAutoResetManager.java 70.53% 26 Missing and 7 partials ⚠️
.../org/apache/pinot/common/utils/LLCSegmentName.java 57.50% 13 Missing and 4 partials ⚠️
...apache/pinot/controller/BaseControllerStarter.java 37.50% 4 Missing and 1 partial ⚠️
...g/apache/pinot/spi/utils/IngestionConfigUtils.java 44.44% 3 Missing and 2 partials ⚠️
...a/manager/realtime/RealtimeSegmentDataManager.java 33.33% 4 Missing ⚠️
...he/pinot/segment/local/utils/TableConfigUtils.java 0.00% 4 Missing ⚠️
...ot/spi/stream/PartitionGroupConsumptionStatus.java 57.14% 3 Missing ⚠️
...in/stream/kafka20/KafkaStreamMetadataProvider.java 0.00% 2 Missing ⚠️
... and 4 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #15782      +/-   ##
============================================
- Coverage     63.23%   63.21%   -0.02%     
  Complexity     1362     1362              
============================================
  Files          3010     3012       +2     
  Lines        174269   174577     +308     
  Branches      26682    26726      +44     
============================================
+ Hits         110194   110362     +168     
- Misses        55659    55787     +128     
- Partials       8416     8428      +12     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-11 63.17% <58.65%> (-0.03%) ⬇️
java-21 63.19% <58.65%> (+6.83%) ⬆️
temurin 63.21% <58.65%> (-0.02%) ⬇️
unittests 63.21% <58.65%> (-0.02%) ⬇️
unittests1 56.37% <36.95%> (-0.03%) ⬇️
unittests2 33.32% <56.00%> (+0.03%) ⬆️

Flags with carried forward coverage won't be shown.

☔ View full report in Codecov by Sentry.

@noob-se7en
Contributor

@lnbest0707-uber can you please add a description here? I am also planning to work on the offset reset problem.
Not sure if you are also trying to achieve the same thing.

@lnbest0707
Contributor Author

@lnbest0707-uber can you please add a description here? I am also planning to work on the offset reset problem. Not sure if you are also trying to achieve the same thing.

Thanks for sharing the interest. I've updated the description with the design doc.
I already have a working internal version and will publish it after some polish.
The feature should be extensible/pluggable to accommodate various implementation preferences.

@lnbest0707 lnbest0707 force-pushed the upstream_fork_ingestion_freshness branch from 5cb11a1 to 6a9067e on July 9, 2025 21:31
@lnbest0707 lnbest0707 changed the title from "[WIP] Offset auto reset" to "Realtime ingestion offset auto reset" on Jul 9, 2025
@lnbest0707 lnbest0707 force-pushed the upstream_fork_ingestion_freshness branch 2 times, most recently from 5cecb91 to 436e5ad on July 9, 2025 22:02
@lnbest0707 lnbest0707 changed the title from "Realtime ingestion offset auto reset" to "Offset auto reset" on Jul 10, 2025
@lnbest0707 lnbest0707 changed the title from "Offset auto reset" to "Realtime ingestion offset auto reset" on Jul 10, 2025
@Jackie-Jiang Jackie-Jiang added the feature, ingestion, performance, and real-time labels on Jul 16, 2025
Contributor

@deemoliu deemoliu left a comment

Out of curiosity, is there any place where we can tune the periodic job frequency? Thanks!


private void removeTopicFromTableConfig(String tableNameWithType, String topicName, TableConfig tableConfig) {
List<Map<String, String>> streamConfigMaps = IngestionConfigUtils.getStreamConfigMaps(tableConfig);
for (int i = 0; i < streamConfigMaps.size(); i++) {
Contributor

Major:
Q1: Does streamConfigMaps maintain the order?
Q2: I feel there might be a concurrent modification exception here.

Contributor Author

It is ordered. And it uses the _pinotHelixResourceManager.setExistingTableConfig method to set the config; that method has built-in retry and further uses the Helix API to ensure concurrency control. IIUC, concurrent modification should not be an issue.

*/
public class PartitionGroupConsumptionStatus {

private final String _topicName;
Contributor

topicName seems to be a non-null value; let's verify this carefully in the integration tests.

Contributor Author

Yes, we do not use @Nullable here. It should at least be an empty string "".

@deemoliu
Contributor

please address the conflict and rerun the integration tests, thank you!

@lnbest0707 lnbest0707 force-pushed the upstream_fork_ingestion_freshness branch from 436e5ad to c434cf2 on July 30, 2025 04:58
@lnbest0707
Contributor Author

please address the conflict and rerun the integration tests, thank you!

@deemoliu Thanks for the review. Addressed the comments. The failing test looks flaky: I reran it twice and it failed in different locations (the test that failed in the 1st run succeeded in the 2nd run, but the 2nd run failed where the 1st had succeeded).

Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Can we split the PR into 2 parts:

  1. Automatically reset offset to largest offset
  2. Handle backfill

I'd also request a higher standard of code review @deemoliu

@KKcorps @noob-se7en Please also take a look

protected PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
protected PinotHelixResourceManager _pinotHelixResourceManager;

public RealtimeOffsetAutoResetHandler(PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
Contributor

Suggest making this an interface, and add an init() to initialize it
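
Something along these lines, as a sketch of the suggestion rather than the interface that was eventually merged (the triggerBackfillJob parameter list is guessed from the review comments below):

public interface RealtimeOffsetAutoResetHandler {

  // Called once after the handler is instantiated (e.g. reflectively via a no-arg
  // constructor) to wire in the controller-side dependencies, replacing constructor injection.
  void init(PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
      PinotHelixResourceManager pinotHelixResourceManager);

  // Kicks off a backfill job for the skipped offset range; returns true on success.
  boolean triggerBackfillJob(String tableNameWithType, StreamConfig topicStreamConfig,
      String topicName, int partition, long fromOffset, long toOffset);
}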

import org.slf4j.LoggerFactory;


public abstract class RealtimeOffsetAutoResetKafkaHandler extends RealtimeOffsetAutoResetHandler {
Contributor

Is this PR half implemented? Seems there is no concrete implementation of RealtimeOffsetAutoResetHandler?

Contributor Author

We are internally using a Kafka-replication-based implementation (refer to the attached design doc). It uses the Kafka ecosystem APIs to trigger the backfill. That particular implementation is pretty "internal", which is why I made it pluggable here.

}
try {
TableConfig currentTableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
addNewTopicToTableConfig(newTopicStreamConfig, currentTableConfig);
Contributor

I don't follow this backfill logic. Is the idea to use a different Kafka topic to backfill the data? Some comments would help explain.

Contributor Author

Yes, it is. The design doc explains it. I will also add comments.

public static final int DEFAULT_RETENTION_MANAGER_FREQUENCY_IN_SECONDS = 6 * 60 * 60; // 6 Hours.
public static final int DEFAULT_OFFLINE_SEGMENT_INTERVAL_CHECKER_FREQUENCY_IN_SECONDS = 24 * 60 * 60; // 24 Hours.
public static final int DEFAULT_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
public static final int DEFAULT_OFFSET_AUTO_RESET_FREQUENCY_IN_SECONDS = 60 * 60; // 1 Hour.
Contributor

Given this is an opt-in feature, I'd suggest disabling the manager by default until it is tested in production.

Contributor Author

Good point, I can make the manager enabled by a config.
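
For example, gating the periodic task on a controller config could look roughly like this (the config key is made up for illustration):

// Sketch: keep the manager off unless explicitly enabled in the controller config.
boolean offsetAutoResetEnabled =
    controllerConf.getProperty("controller.realtime.offset.autoreset.enabled", false);
if (offsetAutoResetEnabled) {
  periodicTasks.add(realtimeOffsetAutoResetManager);
}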

// resetOffsetTopicPartition=0
// resetOffsetFrom=0
// resetOffsetTo=1000
if (periodicTaskProperties.containsKey(context._backfillJobPropertyKeys.toArray()[0])) {
Contributor

Are you trying to do periodicTaskProperties.containsKey(Constants.RESET_OFFSET_TOPIC_NAME)?

Contributor

I think here you need to ensure all the keys exist instead, or it will throw an NPE on line 121.

Contributor

I still don't follow the logic. For a periodic task, periodicTaskProperties applies to all tables. How do you use it to set up the topic info for one single partition?

Contributor Author

@lnbest0707 lnbest0707 Aug 3, 2025

For a periodic task, periodicTaskProperties applies to all tables. How do you use it to set up the topic info for one single partition?

Not sure if I understand correctly.
When the task is triggered periodically, periodicTaskProperties is empty and the manager/handler checks whether the registered backfill jobs are running or complete. If complete, it removes the backfill topic.
When the task is triggered by PinotLLCRealtimeSegmentManager during segment commit, periodicTaskProperties carries the info required to trigger the backfill.
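
In other words, roughly (a sketch; helper names are illustrative):

if (periodicTaskProperties == null || periodicTaskProperties.isEmpty()) {
  // Periodic trigger: no backfill request attached; just reconcile registered jobs and
  // remove the ephemeral backfill topic of any job that has completed.
  ensureBackfillJobsRunning(tableNameWithType);
} else {
  // Triggered by PinotLLCRealtimeSegmentManager during segment commit: the properties
  // carry the topic, partition and the skipped offset range to backfill.
  triggerBackfillJob(tableNameWithType,
      periodicTaskProperties.get(Constants.RESET_OFFSET_TOPIC_NAME),
      Integer.parseInt(periodicTaskProperties.get(Constants.RESET_OFFSET_TOPIC_PARTITION)),
      Long.parseLong(periodicTaskProperties.get(Constants.RESET_OFFSET_FROM)),
      Long.parseLong(periodicTaskProperties.get(Constants.RESET_OFFSET_TO)));
}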

Contributor

Do you want to trigger this manually only? From the periodic trigger, context._shouldTriggerBackfillJobs will be false, and the task becomes a no-op. We shouldn't even scan tables.

LOGGER.info("Trigger backfill jobs with StreamConfig {}, topicName {}, properties {}",
topicStreamConfig, topicName, context._backfillJobProperties);
try {
_tableToHandler.get(tableNameWithType).triggerBackfillJob(tableNameWithType,
Contributor

You should also check the return value of triggerBackfillJob()

Comment on lines +121 to +123
Integer.valueOf(context._backfillJobProperties.get(Constants.RESET_OFFSET_TOPIC_PARTITION)),
Long.valueOf(context._backfillJobProperties.get(Constants.RESET_OFFSET_FROM)),
Long.valueOf(context._backfillJobProperties.get(Constants.RESET_OFFSET_TO)));
Contributor

Suggested change
Integer.valueOf(context._backfillJobProperties.get(Constants.RESET_OFFSET_TOPIC_PARTITION)),
Long.valueOf(context._backfillJobProperties.get(Constants.RESET_OFFSET_FROM)),
Long.valueOf(context._backfillJobProperties.get(Constants.RESET_OFFSET_TO)));
Integer.parseInt(context._backfillJobProperties.get(Constants.RESET_OFFSET_TOPIC_PARTITION)),
Long.parseLong(context._backfillJobProperties.get(Constants.RESET_OFFSET_FROM)),
Long.parseLong(context._backfillJobProperties.get(Constants.RESET_OFFSET_TO)));
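
(One likely reason for this suggestion: parseInt/parseLong return primitives directly, whereas valueOf first creates boxed Integer/Long objects that would just be unboxed again if the target parameters are primitives. Both variants throw NumberFormatException on malformed input, so the earlier point about verifying that all keys exist still applies.)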

}
}

@Override
Contributor

This should also be added to kafka30

return result;
}

default StreamPartitionMsgOffset getOffsetAtTimestamp(int partitionId, long timestampMillis) {
Contributor

Annotate it with @Nullable
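
I.e. roughly, with javax.annotation.Nullable (the default body below is only illustrative, not the PR's actual implementation):

@Nullable
default StreamPartitionMsgOffset getOffsetAtTimestamp(int partitionId, long timestampMillis) {
  // Implementations may return null when no offset exists at or after the given timestamp.
  throw new UnsupportedOperationException("getOffsetAtTimestamp is not supported");
}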

this(tableName, "", partitionGroupId, sequenceNumber, msSinceEpoch);
}

public LLCSegmentName(
Contributor

(CRITICAL) This could cause incompatibility across versions. A new controller will persist segment names that are not parseable by the old brokers/servers.

Contributor Author

This is a good point, and it requires a reverse update sequence (if the introduced feature is already in use).

The update is useful even for normal multi-topic ingestion. With it, we will be able to remove topics from an existing table. We've seen a lot of such requests from our running production.

Please also suggest any better or more graceful methods. Thanks

Contributor

This is a good change, but it needs to follow one of these 2 ways:

  • Introduce it in 2 releases
    • First release: add all handling of the topic with backward compatibility to the old segment name, but do not change the segment name yet
    • Second release: change the segment name after all nodes can already handle it
  • Add a flag to control the segment name format (e.g. one cluster config + a per-table override in the table config), and use the existing name format by default

I prefer the second one for faster rollout + not changing existing behavior. Making the segment name longer has the side effect of a larger ZNode size, and will cause Pinot to hit the scale limit of ZK faster. For the single-topic case we might still want to stick with the current format.

Given this change itself is very critical, we should split it out as a separate PR.
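
The second option could look roughly like the sketch below; the cluster config key, the stream-config property, and the topic-aware LLCSegmentName constructor are illustrative, not an agreed-upon spec:

// Sketch only: flag names are made up; the default keeps the existing segment name format.
boolean includeTopicInSegmentName =
    clusterConfigs.getProperty("pinot.controller.llc.segment.name.include.topic", false)
        || Boolean.parseBoolean(streamConfigMap.getOrDefault("segment.name.include.topic", "false"));
LLCSegmentName llcSegmentName = includeTopicInSegmentName
    ? new LLCSegmentName(rawTableName, topicName, partitionGroupId, sequenceNumber, creationTimeMs)
    : new LLCSegmentName(rawTableName, partitionGroupId, sequenceNumber, creationTimeMs);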

@lnbest0707
Contributor Author

Can we split the PR into 2 parts:

  1. Automatically reset offset to largest offset
  2. Handle backfill

@KKcorps @noob-se7en Please also take a look

Thanks for the review.
To clarify the "split": the PR itself currently only contains interfaces for backfill. The change that "matters" from the other perspective is introducing the "backfill" type of topic (with a different naming method). Do you suggest making that part independent, or making the new manager independent?

@Jackie-Jiang
Contributor

Thanks for the review. To clarify the "split": the PR itself currently only contains interfaces for backfill. The change that "matters" from the other perspective is introducing the "backfill" type of topic (with a different naming method). Do you suggest making that part independent, or making the new manager independent?

This PR covers multiple topics, which makes it hard to review (the reviewer can miss critical pieces). We can split it into 3 parts so that the reviewer can focus on one topic at a time:

  1. Change the segment name format (this is super critical, and can cause all RT tables to fail during upgrade if not handled properly)
  2. Add auto reset support (detect lag and reset to the latest offset without backfill)
  3. Add backfill support

@lnbest0707
Contributor Author

  1. Change the segment name format (this is super critical, and can cause all RT tables to fail during upgrade if not handled properly)
  2. Add auto reset support (detect lag and reset to the latest offset without backfill)
  3. Add backfill support

SG. Have created the split PRs.

Please review and let me know your thoughts. Thanks @Jackie-Jiang

@deeppatel710
Contributor

deeppatel710 commented Apr 8, 2026

Hi @Jackie-Jiang, my PR #18127 addresses your review feedback that wasn't picked up when the feature was split and merged as #16492, #16692, and #16724.

Specifically:

  • RealtimeOffsetAutoResetHandler still had redundant public abstract modifiers, and its init() javadoc incorrectly said "called once in constructor" instead of "called once after instantiation"
  • RealtimeOffsetAutoResetKafkaHandler still had constructor injection; init() is now the sole initialization path, called by the manager after no-arg reflective instantiation (see the sketch after this list)
  • ensureBackfillJobsRunning had a List / Collection mismatch between the interface and the implementation
  • nonLeaderCleanup() wasn't clearing _tableBackfillTopics, leaving stale state when the controller re-acquires leadership
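
For reference, the initialization path described above looks roughly like this (a sketch; the stream-config key is made up and this is not the exact code in #18127):

// The handler class is looked up from the table's stream config, instantiated through
// its no-arg constructor, and then initialized exactly once via init().
String handlerClassName = streamConfigMap.get("realtime.offset.auto.reset.handler.class");
RealtimeOffsetAutoResetHandler handler = (RealtimeOffsetAutoResetHandler)
    Class.forName(handlerClassName).getDeclaredConstructor().newInstance();
handler.init(llcRealtimeSegmentManager, pinotHelixResourceManager);
_tableToHandler.put(tableNameWithType, handler);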

Would appreciate a review when you get a chance. Thanks!


Labels

ingestion (Related to data ingestion pipeline), performance (Related to performance optimization), real-time (Related to realtime table ingestion and serving)


7 participants