[WIP][SS] Stop dropping records with same event time when watermark delay is 0#56043
Draft
eason-yuchen-liu wants to merge 1 commit into
…elay is 0

When `withWatermark(col, "0 seconds")` is used, Spark's late-event predicate `event_time_us <= watermark_ms * 1000` treats records whose event time equals the previous batch's max event time as late and drops them. Two records with the same event time are then handled differently based on which batch carried them: the one that produced the max event time is kept, the one that arrives in the next batch is dropped.

This change bumps a configured delay of 0 to 1 ms internally so the comparison becomes strict in practice and same-event-time records are no longer discriminated against. A warning is logged when the bump activates. Gated by a new default-on SQL conf `spark.sql.streaming.eventTimeWatermark.bumpZeroDelayToOneMs`. Non-zero delays are unaffected.

Added an `EventTimeWatermarkSuite` test that exercises both the new default and the legacy path. Existing tests in `MultiStatefulOperatorsSuite` that encode the legacy 0-delay boundary in their expected output are wrapped in `withSQLConf(... -> "false")` so they continue to validate that path.
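For reviewers who want to see the batch-boundary effect in isolation, here is a minimal pure-Python sketch of the predicate quoted above. It is not Spark code: `run_batches` is a hypothetical helper, event times are plain millisecond integers, and the watermark is modeled as a monotonically non-decreasing `max(event_time) - delay` over all events seen so far, which is a simplifying assumption rather than a description of Spark's implementation.

```python
from typing import List, Optional, Tuple


def run_batches(
    batches_ms: List[List[int]], delay_ms: int
) -> Tuple[List[List[int]], List[List[int]]]:
    """Hypothetical simulation of late-event filtering across microbatches.

    Returns (kept, dropped), each a list with one entry per batch.
    """
    watermark_ms: Optional[int] = None  # no watermark until a batch has completed
    max_event_ms: Optional[int] = None
    kept: List[List[int]] = []
    dropped: List[List[int]] = []

    for batch in batches_ms:
        batch_kept: List[int] = []
        batch_dropped: List[int] = []
        for event_ms in batch:
            event_time_us = event_ms * 1000
            # Late-event predicate from the description:
            #   event_time_us <= watermark_ms * 1000
            if watermark_ms is not None and event_time_us <= watermark_ms * 1000:
                batch_dropped.append(event_ms)
            else:
                batch_kept.append(event_ms)
        kept.append(batch_kept)
        dropped.append(batch_dropped)

        # End of batch: the watermark for the next batch is max(event_time) - delay.
        # Assumption: it never moves backwards.
        if batch:
            batch_max = max(batch)
            max_event_ms = batch_max if max_event_ms is None else max(max_event_ms, batch_max)
            candidate = max_event_ms - delay_ms
            watermark_ms = candidate if watermark_ms is None else max(watermark_ms, candidate)

    return kept, dropped


# Two batches that both carry a record with event time 2000 ms.
batches = [[1000, 2000], [2000, 3000]]

legacy_kept, legacy_dropped = run_batches(batches, delay_ms=0)
# legacy_dropped == [[], [2000]]: the second 2000 ms record is dropped

bumped_kept, bumped_dropped = run_batches(batches, delay_ms=1)
# bumped_dropped == [[], []]: with a 1 ms delay both 2000 ms records are kept
```

With a delay of 0 the watermark after the first batch is 2000 ms, so the second batch's 2000 ms record satisfies the `<=` predicate. With a delay of 1 ms the watermark is 1999 ms and the record passes.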
What changes were proposed in this pull request?
When a user configures `withWatermark(col, "0 seconds")` (or any other zero-length interval), every record whose event time equals the current watermark is dropped by the late-event filter. This change bumps a configured delay of `0` to `1 ms` internally so that records sharing the latest event time across batches are no longer dropped, and logs a one-line warning at the call site so the user is aware. The bump is gated by a new default-on SQL conf `spark.sql.streaming.eventTimeWatermark.bumpZeroDelayToOneMs`; set it to `false` to restore the pre-Spark 5.0 behavior. Non-zero delays are not affected.

Why are the changes needed?
In microbatch mode the watermark is computed at the end of each batch as `max(event_time) - delay` and used to filter late events in the next batch with the predicate `event_time_us <= watermark_ms * 1000`. With a delay of `0` the predicate is satisfied (and the row is dropped) whenever an incoming record's event time equals the previous batch's maximum event time, even though that record is no more "late" than the one that produced the maximum in the first place. Two records with identical event times are therefore treated differently solely based on which batch they land in: the record that produced the max is kept, the record that arrives in the next batch is dropped.

This fix removes that asymmetry by ensuring the effective delay is at least one millisecond. For event times with millisecond granularity, that is equivalent to making the late-event comparison against the zero-delay watermark strict (`event_time_us < watermark_ms * 1000`).

Does this PR introduce any user-facing change?
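Yes. When `withWatermark(col, "0 seconds")` is used (or any other zero-length interval), the effective delay is bumped to 1 ms, so records sharing the latest event time across batches are no longer dropped, and a one-line warning is logged.

The change is gated by `spark.sql.streaming.eventTimeWatermark.bumpZeroDelayToOneMs`, which defaults to `true`. Set it to `false` to recover the prior behavior. Non-zero delays are unaffected.

How was this patch tested?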
Added `EventTimeWatermarkSuite."zero-delay watermark keeps records at max event time across batches"`, which exercises both the new default (`bumpZeroDelayToOneMs=true`) and the legacy path (`bumpZeroDelayToOneMs=false`).

A handful of existing `MultiStatefulOperatorsSuite` tests encode the legacy 0-delay boundary in their expected watermark/output and have been wrapped in `withSQLConf(... -> "false")` so they continue to validate that path.

`EventTimeWatermarkSuite`, `MultiStatefulOperatorsSuite`, `StreamingJoinSuite`, `FlatMapGroupsWithStateSuite`, and `StreamingQueryOptimizationCorrectnessSuite` all pass locally.

Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Anthropic, claude-opus-4-7), with human review.
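
As a reading aid for the gating described under the user-facing change, the delay resolution might look roughly like the sketch below. It is plain Python, not the patch itself: `effective_delay_ms`, the dict-based `conf`, and the warning text are all hypothetical stand-ins; only the conf key and its default-on behavior come from the description.

```python
import logging
from typing import Dict

logger = logging.getLogger("watermark_sketch")

# Conf key taken from the PR description; everything else here is illustrative.
BUMP_CONF = "spark.sql.streaming.eventTimeWatermark.bumpZeroDelayToOneMs"


def effective_delay_ms(configured_delay_ms: int, conf: Dict[str, str]) -> int:
    """Hypothetical helper: resolve the delay actually used for the watermark."""
    # The conf is described as default-on, so a missing key counts as "true".
    bump_enabled = conf.get(BUMP_CONF, "true").strip().lower() == "true"

    if configured_delay_ms == 0 and bump_enabled:
        # Illustrative wording only; the real warning text is not in the description.
        logger.warning(
            "Watermark delay of 0 was bumped to 1 ms; set %s=false to keep a zero delay.",
            BUMP_CONF,
        )
        return 1

    # Non-zero delays, and zero delays with the conf disabled, pass through unchanged.
    return configured_delay_ms


default_delay = effective_delay_ms(0, {})                    # bumped to 1
legacy_delay = effective_delay_ms(0, {BUMP_CONF: "false"})   # stays 0
ten_seconds = effective_delay_ms(10_000, {})                 # unaffected
```

Keeping the bump in a single resolution step, as assumed here, would leave the late-event predicate itself untouched, which matches the description's statement that non-zero delays are unaffected.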