[Drain] OnTimer - propagate caused by drain bit up to DoFnRunner #37012
kennknowles merged 2 commits into apache:master
    .setPaneInfo(currentTimer.getPaneInfo())
    .setCausedByDrain(causedByDrain)
    .setReceiver(
        windowedValue ->

@kennknowles, PTAL.
causedByDrain will be part of the WindowedValue, but because the receiver is a lambda from the WindowedValue to context.outputWindowedValue, I'm pretty sure I'm losing all the WindowedValue metadata.
    .setReceiver(
        windowedValue -> {
          checkTimerTimestamp(windowedValue.getTimestamp());
          outputTo(mainOutputConsumer, windowedValue);

@kennknowles, here windowedValue is sent differently. Why is that?

I am not sure. This file was a huge mess, with many types of transforms executed in the same file via switch statements. It may just be an accident of history and/or a part that I failed to refactor when I introduced OutputBuilder. My quick read is that this one is the ideal way to do it, because it passes the whole WindowedValue to the FnDataReceiver, so it doesn't lose data. In the other places, where it goes through the OnTimerContext, it loses metadata.
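The difference between the two receiver styles discussed above can be sketched in isolation. The following is a minimal illustration, not Beam code: `Wv`, `lossyReceiver`, and `passThroughReceiver` are hypothetical stand-ins, and the `CAUSED_BY_DRAIN` constant name is an assumption (only `NORMAL` appears in the diff).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ReceiverSketch {
  // Stand-in for the PR's CausedByDrain. NORMAL appears in the diff;
  // the CAUSED_BY_DRAIN constant name is an assumption.
  enum CausedByDrain { NORMAL, CAUSED_BY_DRAIN }

  // Hypothetical, simplified windowed value: payload, timestamp, metadata.
  static final class Wv {
    final String value;
    final long timestampMillis;
    final CausedByDrain causedByDrain;

    Wv(String value, long timestampMillis, CausedByDrain causedByDrain) {
      this.value = value;
      this.timestampMillis = timestampMillis;
      this.causedByDrain = causedByDrain;
    }
  }

  // Lossy path: rebuilds the output from value and timestamp only,
  // so the metadata falls back to its default.
  static Consumer<Wv> lossyReceiver(List<Wv> sink) {
    return wv -> sink.add(new Wv(wv.value, wv.timestampMillis, CausedByDrain.NORMAL));
  }

  // Pass-through path: forwards the whole object, so every field survives.
  static Consumer<Wv> passThroughReceiver(List<Wv> sink) {
    return sink::add;
  }

  public static void main(String[] args) {
    Wv drained = new Wv("v", 0L, CausedByDrain.CAUSED_BY_DRAIN);
    List<Wv> lossy = new ArrayList<>();
    List<Wv> intact = new ArrayList<>();
    lossyReceiver(lossy).accept(drained);
    passThroughReceiver(intact).accept(drained);
    System.out.println("lossy: " + lossy.get(0).causedByDrain);
    System.out.println("pass-through: " + intact.get(0).causedByDrain);
  }
}
```

Any field added to the value type later survives the pass-through path automatically, while the lossy path has to be updated by hand each time.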
    Instant timestamp,
    Collection<? extends BoundedWindow> windows,
    PaneInfo paneInfo) {
  builder(value).setTimestamp(timestamp).setWindows(windows).setPaneInfo(paneInfo).output();

@kennknowles, I assume that by introducing the builder we freeze outputWindowedValue and have some limited functionality, but as part of the receiver we should use the outputWindowedValue method, as we risk losing any new fields we've added.

I am not sure if I understand the comment. Here is what I think are answers:
- Yes, we freeze the API for outputWindowedValue.
- No, we shouldn't lose fields, because builder(value) is an abstract method and it is the job of OutputReceiver.builder(...) to make sure to set all the builder values to defaults, and propagate the values from the current element or timer context.
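The two answers above can be sketched as a builder that seeds itself from the current context, with the frozen method delegating to it. This is a simplified illustration under stated assumptions: `Context`, `Output`, and this `OutputBuilder` are hypothetical miniatures, not Beam's actual classes, and `CAUSED_BY_DRAIN` is an assumed constant name.

```java
final class BuilderSketch {
  // Stand-in enum; only NORMAL is confirmed by the diff.
  enum CausedByDrain { NORMAL, CAUSED_BY_DRAIN }

  // Hypothetical "current element or timer context".
  static final class Context {
    final long timestampMillis;
    final CausedByDrain causedByDrain;

    Context(long timestampMillis, CausedByDrain causedByDrain) {
      this.timestampMillis = timestampMillis;
      this.causedByDrain = causedByDrain;
    }
  }

  static final class Output {
    final String value;
    final long timestampMillis;
    final CausedByDrain causedByDrain;

    Output(String value, long timestampMillis, CausedByDrain causedByDrain) {
      this.value = value;
      this.timestampMillis = timestampMillis;
      this.causedByDrain = causedByDrain;
    }
  }

  static final class OutputBuilder {
    private final String value;
    private long timestampMillis;
    private CausedByDrain causedByDrain;

    // Every field starts from the current context, so fields added later
    // propagate even when callers never set them explicitly.
    OutputBuilder(String value, Context current) {
      this.value = value;
      this.timestampMillis = current.timestampMillis;
      this.causedByDrain = current.causedByDrain;
    }

    OutputBuilder setTimestamp(long timestampMillis) {
      this.timestampMillis = timestampMillis;
      return this;
    }

    Output build() {
      return new Output(value, timestampMillis, causedByDrain);
    }
  }

  // Frozen legacy signature: it knows nothing about causedByDrain,
  // yet the builder still carries it over from the context.
  static Output outputWindowedValue(Context current, String value, long timestampMillis) {
    return new OutputBuilder(value, current).setTimestamp(timestampMillis).build();
  }

  public static void main(String[] args) {
    Context draining = new Context(10L, CausedByDrain.CAUSED_BY_DRAIN);
    Output out = outputWindowedValue(draining, "v", 42L);
    System.out.println(out.timestampMillis + " " + out.causedByDrain);
  }
}
```

With this shape, the legacy method's parameter list can stay frozen while new metadata is added only to the builder and the context.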
…ions. Mostly passthrough.
  @Override
  public CausedByDrain causedByDrain() {
    return CausedByDrain.NORMAL;

Should this come from the element metadata?
    timestamp,
    outputTimestamp,
    timeDomain,
    CausedByDrain.NORMAL);

I think it should propagate here. Will that be a follow-up that adds it to the pushback side input DoFnRunner?

Ack, adding it. I missed that interface.
      new Instant(0),
-     TimeDomain.EVENT_TIME);
+     TimeDomain.EVENT_TIME,
+     CausedByDrain.NORMAL);

Maybe this unit test file should test caused by drain propagation somehow?

Ack, adding a test.
      new Instant(((Number) kvMap.get("holdTimestamp")).longValue()),
-     paneInfo);
+     paneInfo,
+     CausedByDrain.NORMAL);

I think this should actually come from the data? It is a weird method and I have no context for why it is the way it is...

I think this requires a follow-up PR for runner v2 in the future: https://github.com/stankiewicz/beam/blob/e4a5eea37a52429ccec55e3a3a1e0701ba5ff1ee/model/pipeline/src/main/proto/org/apache/beam/model/pipeline/v1/beam_runner_api.proto#L901
      HOLD_TIME,
-     PaneInfo.NO_FIRING));
+     PaneInfo.NO_FIRING,
+     CausedByDrain.NORMAL));

We need some caused by drain tests?
waiting on author
fix flink rewrites
Run Flink Container PreCommit

R: @kennknowles
kennknowles left a comment:

I am happy with merging this right now. Regarding timers: there is a gap where we do not propagate or encode metadata, right? So "caused by drain" will not be set on elements output from a timer during drain unless we always check the is_draining bit. I think this is fine, since during drain every timer could be caused by drain, so it won't be mistakenly processed as "not caused by drain" even if it is not truly caused by drain (for example, if it is set to a past watermark from before the drain started) but was set because of processing an element that was caused by drain. An interesting situation, perhaps.
    InstantCoder.of().encode(timer.getFireTimestamp(), outStream);
    InstantCoder.of().encode(timer.getHoldTimestamp(), outStream);
    PaneInfoCoder.INSTANCE.encode(timer.getPaneInfo(), outStream);
    // todo maybe similarly to windowedValue, should we propagate metadata with paneinfo bit

Yes, good point! We need the same metadata capability on encoded timers.

Correct. Today, metadata is lost when we set a timer (we don't encode metadata anywhere in persistence), and when the timer expires we don't recover metadata from anywhere. I think this will require follow-up exploration, as I don't know if Windmill would accept additional metadata.

Correct on the gap: if someone sets a timer during draining, metadata (e.g. tracing) does not propagate to the subsequent onTimer call. But caused by drain will be taken from the is_draining bit from Windmill.
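The gap described in this thread can be sketched as follows. This is a minimal illustration, not the real timer coder: `Timer`, `encode`, `decode`, and the `workItemIsDraining` parameter are hypothetical, and `CAUSED_BY_DRAIN` is an assumed constant name. The point is that the encoded form carries only timestamps, so the drain cause has to come from a bit supplied at firing time.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class TimerEncodingSketch {
  // Stand-in enum; only NORMAL is confirmed by the diff.
  enum CausedByDrain { NORMAL, CAUSED_BY_DRAIN }

  // Hypothetical, simplified timer.
  static final class Timer {
    final long fireMillis;
    final long holdMillis;
    final CausedByDrain causedByDrain;

    Timer(long fireMillis, long holdMillis, CausedByDrain causedByDrain) {
      this.fireMillis = fireMillis;
      this.holdMillis = holdMillis;
      this.causedByDrain = causedByDrain;
    }
  }

  // Only the timestamps are persisted; the metadata is not written anywhere.
  static byte[] encode(Timer timer) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeLong(timer.fireMillis);
      out.writeLong(timer.holdMillis);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // On firing, the cause cannot be recovered from the bytes, so it is
  // derived from the draining bit of the work item being processed.
  static Timer decode(byte[] encoded, boolean workItemIsDraining) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
      long fireMillis = in.readLong();
      long holdMillis = in.readLong();
      CausedByDrain cause =
          workItemIsDraining ? CausedByDrain.CAUSED_BY_DRAIN : CausedByDrain.NORMAL;
      return new Timer(fireMillis, holdMillis, cause);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] encoded = encode(new Timer(100L, 50L, CausedByDrain.CAUSED_BY_DRAIN));
    System.out.println(decode(encoded, false).causedByDrain);
    System.out.println(decode(encoded, true).causedByDrain);
  }
}
```

Other metadata, such as tracing context, has no equivalent bit to fall back on in this sketch, which is the part the thread leaves for follow-up work.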
This pull request enhances the Apache Beam Java SDK's timer mechanism by propagating a causedByDrain flag. This flag allows DoFns to distinguish between timers that fire as part of normal pipeline execution and those that are explicitly triggered during a pipeline draining process. Propagating this information through the core timer interfaces and their implementations gives DoFns more granular control and context when reacting to timer events, e.g. in the future within SDF or ReduceFnRunner.
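As a rough illustration of what distinguishing the two cases could look like from the consumer side, here is a self-contained sketch. It does not use the Beam API: `TimerContext` and `onTimer` are hypothetical, the branching policy is invented purely for illustration, and `CAUSED_BY_DRAIN` is an assumed constant name. Only the `causedByDrain()` accessor name and `CausedByDrain.NORMAL` come from the diff.

```java
import java.util.Arrays;
import java.util.List;

final class DrainAwareTimerSketch {
  // Stand-in enum; only NORMAL is confirmed by the diff.
  enum CausedByDrain { NORMAL, CAUSED_BY_DRAIN }

  // Hypothetical timer context exposing the propagated flag.
  interface TimerContext {
    CausedByDrain causedByDrain();
  }

  // Illustrative policy: a timer fired by drain flushes whatever is buffered
  // as a partial result; a normal firing emits a complete result.
  static String onTimer(TimerContext context, List<String> buffer) {
    if (context.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN) {
      return "partial:" + buffer.size();
    }
    return "complete:" + buffer.size();
  }

  public static void main(String[] args) {
    List<String> buffer = Arrays.asList("a", "b");
    System.out.println(onTimer(() -> CausedByDrain.NORMAL, buffer));
    System.out.println(onTimer(() -> CausedByDrain.CAUSED_BY_DRAIN, buffer));
  }
}
```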