
[Auto reset 1/3] Auto reset offset during ingestion lag #16492

Merged
Jackie-Jiang merged 5 commits into apache:master from lnbest0707:upstream_fork_autoreset_1 on Aug 15, 2025

Conversation

@lnbest0707 (Contributor) commented Aug 4, 2025

real-time ingestion feature
Part 1 of #15782
Issue #14815
Design doc https://docs.google.com/document/d/1NKPeNh6V2ctaQ4T_X3OKJ6Gcy5TRanLiU1uIDT8_9UA/edit?usp=sharing

Reset offset during ingestion lag. This change only skips the offset based on configuration; backfill of the skipped interval will be introduced in a separate patch.
Two new, optional configs can be used: realtime.segment.offsetAutoReset.offsetThreshold and realtime.segment.offsetAutoReset.timeSecThreshold.
At segment commit time, if the lag (by offset or time) exceeds the configured threshold, the new segment's startOffset will be the "latest offset" instead of the "next offset".
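The commit-time decision described above can be sketched as follows. This is a hypothetical, simplified helper, not the actual Pinot code: the two threshold parameters correspond to the `offsetThreshold` and `timeSecThreshold` configs, and all names here are illustrative.

```java
// Hypothetical sketch of the auto-reset decision at segment commit time.
// A threshold of 0 (the default) disables that check.
final class OffsetAutoResetSketch {

  static long chooseStartOffset(long nextOffset, long latestOffset,
      long offsetLagThreshold, long lagSeconds, long timeSecThreshold) {
    // Lag measured in offsets between the would-be next offset and the latest offset.
    boolean offsetLagExceeded =
        offsetLagThreshold > 0 && latestOffset - nextOffset > offsetLagThreshold;
    // Lag measured in seconds behind the head of the stream.
    boolean timeLagExceeded = timeSecThreshold > 0 && lagSeconds > timeSecThreshold;
    // If either configured threshold is exceeded, skip ahead to the latest offset;
    // otherwise keep consuming from the next offset as usual.
    return (offsetLagExceeded || timeLagExceeded) ? latestOffset : nextOffset;
  }
}
```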

@lnbest0707 changed the title "Auto reset offset during ingestion lag" to "[Auto reset 1/3] Auto reset offset during ingestion lag" on Aug 5, 2025
@lnbest0707 (Contributor, Author)

@Jackie-Jiang @noob-se7en @KKcorps please help review, thanks.

@codecov-commenter commented Aug 5, 2025

Codecov Report

❌ Patch coverage is 65.67164% with 23 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.35%. Comparing base (584119f) to head (b96295c).
⚠️ Report is 1140 commits behind head on master.

Files with missing lines Patch % Lines
.../core/realtime/PinotLLCRealtimeSegmentManager.java 64.10% 10 Missing and 4 partials ⚠️
...java/org/apache/pinot/spi/stream/StreamConfig.java 82.60% 4 Missing ⚠️
...in/stream/kafka20/KafkaStreamMetadataProvider.java 0.00% 2 Missing ⚠️
...in/stream/kafka30/KafkaStreamMetadataProvider.java 0.00% 2 Missing ⚠️
...pache/pinot/spi/stream/StreamMetadataProvider.java 0.00% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #16492      +/-   ##
============================================
+ Coverage     63.22%   63.35%   +0.12%     
- Complexity     1362     1381      +19     
============================================
  Files          3012     3014       +2     
  Lines        174362   174721     +359     
  Branches      26698    26763      +65     
============================================
+ Hits         110247   110701     +454     
+ Misses        55691    55582     -109     
- Partials       8424     8438      +14     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-11 63.23% <65.67%> (+0.02%) ⬆️
java-21 63.33% <65.67%> (+0.13%) ⬆️
temurin 63.35% <65.67%> (+0.12%) ⬆️
unittests 63.35% <65.67%> (+0.12%) ⬆️
unittests1 56.41% <50.00%> (+0.06%) ⬆️
unittests2 33.41% <61.19%> (+0.08%) ⬆️

Flags with carried forward coverage won't be shown.


@Jackie-Jiang Jackie-Jiang added feature configuration Config changes (addition/deletion/change in behavior) ingestion Related to data ingestion pipeline real-time Related to realtime table ingestion and serving documentation Improvements or additions to documentation labels Aug 5, 2025
@Jackie-Jiang (Contributor)

@KKcorps @9aman @noob-se7en Can you help take a look?

@noob-se7en (Contributor) left a comment

Added a few comments for the first iteration of review.

  return latestOffset.toString();
}
if (offsetThreshold > 0
    && Long.valueOf(latestOffset.toString()) - Long.valueOf(nextOffset) > offsetThreshold) {
Contributor:

Same here: use .compareTo.

Contributor (Author):

The StreamPartitionMsgOffset is Comparable but does not have a "subtract" method, any good way to achieve this?

Contributor:

compareTo returns the difference so we can compare that returned difference with the offsetThreshold.

Contributor (Author):

I got your point, but that is not the standard usage either and cannot work. The internal implementation is based on Long.compare:

  public static int compare(long x, long y) {
    return (x < y) ? -1 : ((x == y) ? 0 : 1);
  }

which does not return the real diff.

Contributor:

I see, my bad. Then I don't think there is a cleaner way.
Since we are only supporting LongMsgOffset here, let's just check if the object is an instance of LongMsgOffset and use its getOffset method.
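A self-contained illustration of the point settled in this thread: `compareTo` (backed by `Long.compare`) yields only the sign of the difference, not its magnitude, so a lag-threshold check needs the raw offsets via a `LongMsgOffset` downcast. The `LongMsgOffset` class below is a minimal stand-in for Pinot's class of the same name, not the real implementation.

```java
// Minimal stand-in for Pinot's LongMsgOffset, enough to show the compareTo issue.
final class LongMsgOffset implements Comparable<LongMsgOffset> {
  private final long _offset;

  LongMsgOffset(long offset) {
    _offset = offset;
  }

  long getOffset() {
    return _offset;
  }

  @Override
  public int compareTo(LongMsgOffset other) {
    // Long.compare collapses any difference to -1, 0, or 1.
    return Long.compare(_offset, other._offset);
  }
}

final class CompareToDemo {
  // The offsets arrive typed as Comparable (like StreamPartitionMsgOffset),
  // so we downcast to read the actual offset values for the subtraction.
  static boolean lagExceeds(Comparable<?> latest, Comparable<?> next, long threshold) {
    if (latest instanceof LongMsgOffset && next instanceof LongMsgOffset) {
      return ((LongMsgOffset) latest).getOffset() - ((LongMsgOffset) next).getOffset() > threshold;
    }
    return false;
  }
}
```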

@lnbest0707 lnbest0707 force-pushed the upstream_fork_autoreset_1 branch from b6d69b7 to 4081cf8 Compare August 6, 2025 22:46
@lnbest0707 (Contributor, Author)

> Added few comments for first iteration of review

Thanks for the review. Resolved or replied to the comments.

@lnbest0707 lnbest0707 closed this Aug 6, 2025
@lnbest0707 lnbest0707 reopened this Aug 6, 2025
@lnbest0707 lnbest0707 force-pushed the upstream_fork_autoreset_1 branch from 2857cbd to ba437ce Compare August 7, 2025 21:51
String clientId =
    PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" + streamConfig.getTableNameWithType() + "-"
        + streamConfig.getTopicName();
StreamConsumerFactory consumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
Contributor:

Can we move this to a separate function, getLargestOffset?

Contributor:

Use a util similar to

  @VisibleForTesting
  Set<Integer> getPartitionIds(StreamConfig streamConfig)
      throws Exception {
    String clientId = StreamConsumerFactory.getUniqueClientId(
        PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" + streamConfig.getTableNameWithType() + "-"
            + streamConfig.getTopicName());
    StreamConsumerFactory consumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
    try (StreamMetadataProvider metadataProvider = consumerFactory.createStreamMetadataProvider(clientId)) {
      return metadataProvider.fetchPartitionIds(5000L);
    }
  }

Contributor (Author):

This is already a separate method, computeStartOffset, and we need to construct the consumer to get 2 offsets: "largest" and "offsetAtSLA". If we wanted to extract it, that method would need to return a Pair of StreamPartitionMsgOffset or a wrapper class of the two. That feels like overkill; it would be rare to need those 2 offsets in other usages.

Comment thread pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java Outdated

private String computeStartOffset(String nextOffset, StreamConfig streamConfig, int partitionId) {
  if (!streamConfig.isEnableOffsetAutoReset()) {
    return nextOffset;
Contributor:

nit: Should we add a log here when skipping?
A change in streamConfig is not recorded, and debugging old segment commits might get hard.

Contributor (Author):

If it is enabled, the log

LOGGER.info("Latest offset of topic {} and partition {} is {}", streamConfig.getTopicName(), partitionId, latestOffset);

will be printed.
And right after this method,

LOGGER.info(
    "Creating segment ZK metadata for new CONSUMING segment: {} with start offset: {} and creation time: {}",
    segmentName, startOffset, creationTimeMs);

would be printed.
With the help of those 2 logs, we should be able to tell whether autoReset is enabled.

}
return nextOffset;
}

Contributor:

Suggested change
  /**
   * Fetches the latest offset from the stream for a given partition.
   * Extracted as a separate method for better testability and reusability.
   */
  @VisibleForTesting
  String fetchLatestOffset(StreamConfig streamConfig, int partitionId) throws Exception {
    String clientId = StreamConsumerFactory.getUniqueClientId(
        String.format("%s-%s-%s-latest-offset",
            PinotLLCRealtimeSegmentManager.class.getSimpleName(),
            streamConfig.getTableNameWithType(),
            streamConfig.getTopicName()));
    StreamConsumerFactory consumerFactory = StreamConsumerFactoryProvider.create(streamConfig);
    try (StreamMetadataProvider metadataProvider = consumerFactory.createStreamMetadataProvider(clientId)) {
      StreamPartitionMsgOffset latestOffset = metadataProvider.fetchStreamPartitionOffset(
          OffsetCriteria.LARGEST_OFFSET_CRITERIA, STREAM_FETCH_TIMEOUT_MS);
      return latestOffset.toString();
    }
  }

Adapt as per need.

Contributor (Author):

Comment thread pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java Outdated
}
try {
  if (timeThreshold > 0 && offsetAtSLA != null && offsetAtSLA.compareTo(nextOffsetWithType) < 0) {
    LOGGER.info("Auto reset offset from {} to {} on partition {} because time threshold reached", nextOffset,
Contributor:

We probably want to log this as a warning, given it can cause data loss. Same for line 1020.

Contributor (Author):

If the reset happens, the user must have explicitly set it up through the config, so the reset is "expected" behavior, which I feel may not be appropriate to log as a warning.

Contributor:

Even though this is "expected", it is definitely worth noting. We should also consider emitting a metric when an offset reset happens, because it means data loss.

Contributor (Author):

Yes, metrics are in the plan. That part will be added afterwards.

Comment thread pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfigProperties.java Outdated

5 participants