[SPARK-55601][SS][FOLLOWUP] Cache offsetLog.getLatest() to avoid redundant ListStatus during MicroBatchExecution init#56054
Open
cloud-fan wants to merge 1 commit into
Open
Conversation
…ndant ListStatus during MicroBatchExecution init ### What changes were proposed in this pull request? Followup to apache#54373. SPARK-55601 added a new `offsetLog.getLatest()` call inside `logicalPlan`'s computation to derive `enforceNamed` from the last written offset log entry. `initializeExecution` already calls `offsetLog.getLatest()` on its first line. Both calls happen on the query thread during stream startup, with no offset log writes in between, so the two reads always return the same value. The second one is wasted work: each `getLatest()` triggers `listBatches` -> `HDFSMetadataLog.list` -> a filesystem `ListStatus` on the checkpoint's `offsets/` directory. This PR caches the first read in a `private lazy val initialLatestOffsetSeq` on `MicroBatchExecution` and routes both call sites through it: - `enforceNamed` derivation in `logicalPlan` lazy val. - `var latestStartedBatch` initialization in `initializeExecution`. Subsequent reads inside `initializeExecution` (after a `purgeAfter`) and in `populateStartOffsets` are unchanged -- those legitimately need fresh `getLatest()` results. ### Why are the changes needed? Avoids one redundant `ListStatus` on `<checkpoint>/offsets/` per stream startup. The cost is small but unnecessary, and downstream consumers that track per-checkpoint filesystem operations (for tracing, auditing, or test invariants) currently see one extra op against the offsets directory because of this duplication. ### Does this PR introduce _any_ user-facing change? No. Same behavior, fewer filesystem calls. ### How was this patch tested? Existing `MicroBatchExecutionSuite` and downstream streaming-startup tests cover both call sites. The change is a pure caching refactor; the cached value is identical to what a second `getLatest()` would return because nothing else writes the offset log between construction and `initializeExecution` on the query thread. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: Claude Code
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
Followup to #54373.
SPARK-55601 added a new
offsetLog.getLatest()call insidelogicalPlan's computation to deriveenforceNamedfrom the last written offset log entry.initializeExecutionalready callsoffsetLog.getLatest()on its first line. Both calls happen on the query thread during stream startup, with no offset log writes in between, so the two reads always return the same value. The second one is wasted work: eachgetLatest()triggerslistBatches→HDFSMetadataLog.list→ a filesystemListStatuson the checkpoint'soffsets/directory.This PR caches the first read in a
private lazy val initialLatestOffsetSeqonMicroBatchExecutionand routes both call sites through it:enforceNamedderivation inlogicalPlanlazy val.var latestStartedBatchinitialization ininitializeExecution.Subsequent reads inside
initializeExecution(after apurgeAfter) and inpopulateStartOffsetsare unchanged — those legitimately need freshgetLatest()results.Why are the changes needed?
Avoids one redundant
ListStatuson<checkpoint>/offsets/per stream startup. The cost is small but unnecessary, and downstream consumers that track per-checkpoint filesystem operations (for tracing, auditing, or test invariants) currently see one extra op against the offsets directory because of this duplication.Does this PR introduce any user-facing change?
No. Same behavior, fewer filesystem calls.
How was this patch tested?
Existing
MicroBatchExecutionSuiteand downstream streaming-startup tests cover both call sites. The change is a pure caching refactor; the cached value is identical to what a secondgetLatest()would return because nothing else writes the offset log between construction andinitializeExecutionon the query thread.Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code