Skip to content

[SPARK-55601][SS][FOLLOWUP] Cache offsetLog.getLatest() to avoid redundant ListStatus during MicroBatchExecution init#56054

Open
cloud-fan wants to merge 1 commit into
apache:masterfrom
cloud-fan:SPARK-55601-followup
Open

[SPARK-55601][SS][FOLLOWUP] Cache offsetLog.getLatest() to avoid redundant ListStatus during MicroBatchExecution init#56054
cloud-fan wants to merge 1 commit into
apache:masterfrom
cloud-fan:SPARK-55601-followup

Conversation

@cloud-fan
Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

Followup to #54373.

SPARK-55601 added a new offsetLog.getLatest() call inside logicalPlan's computation to derive enforceNamed from the last written offset log entry. initializeExecution already calls offsetLog.getLatest() on its first line. Both calls happen on the query thread during stream startup, with no offset log writes in between, so the two reads always return the same value. The second one is wasted work: each getLatest() triggers listBatchesHDFSMetadataLog.list → a filesystem ListStatus on the checkpoint's offsets/ directory.

This PR caches the first read in a private lazy val initialLatestOffsetSeq on MicroBatchExecution and routes both call sites through it:

  • enforceNamed derivation in logicalPlan lazy val.
  • var latestStartedBatch initialization in initializeExecution.

Subsequent reads inside initializeExecution (after a purgeAfter) and in populateStartOffsets are unchanged — those legitimately need fresh getLatest() results.

Why are the changes needed?

Avoids one redundant ListStatus on <checkpoint>/offsets/ per stream startup. The cost is small but unnecessary, and downstream consumers that track per-checkpoint filesystem operations (for tracing, auditing, or test invariants) currently see one extra op against the offsets directory because of this duplication.

Does this PR introduce any user-facing change?

No. Same behavior, fewer filesystem calls.

How was this patch tested?

Existing MicroBatchExecutionSuite and downstream streaming-startup tests cover both call sites. The change is a pure caching refactor; the cached value is identical to what a second getLatest() would return because nothing else writes the offset log between construction and initializeExecution on the query thread.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code

…ndant ListStatus during MicroBatchExecution init

### What changes were proposed in this pull request?

Followup to apache#54373.

SPARK-55601 added a new `offsetLog.getLatest()` call inside `logicalPlan`'s computation to derive `enforceNamed` from the last written offset log entry. `initializeExecution` already calls `offsetLog.getLatest()` on its first line. Both calls happen on the query thread during stream startup, with no offset log writes in between, so the two reads always return the same value. The second one is wasted work: each `getLatest()` triggers `listBatches` -> `HDFSMetadataLog.list` -> a filesystem `ListStatus` on the checkpoint's `offsets/` directory.

This PR caches the first read in a `private lazy val initialLatestOffsetSeq` on `MicroBatchExecution` and routes both call sites through it:

- `enforceNamed` derivation in `logicalPlan` lazy val.
- `var latestStartedBatch` initialization in `initializeExecution`.

Subsequent reads inside `initializeExecution` (after a `purgeAfter`) and in `populateStartOffsets` are unchanged -- those legitimately need fresh `getLatest()` results.

### Why are the changes needed?

Avoids one redundant `ListStatus` on `<checkpoint>/offsets/` per stream startup. The cost is small but unnecessary, and downstream consumers that track per-checkpoint filesystem operations (for tracing, auditing, or test invariants) currently see one extra op against the offsets directory because of this duplication.

### Does this PR introduce _any_ user-facing change?

No. Same behavior, fewer filesystem calls.

### How was this patch tested?

Existing `MicroBatchExecutionSuite` and downstream streaming-startup tests cover both call sites. The change is a pure caching refactor; the cached value is identical to what a second `getLatest()` would return because nothing else writes the offset log between construction and `initializeExecution` on the query thread.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code
Copy link
Copy Markdown
Contributor

@ericm-db ericm-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants