[Dataflow Streaming] [Multi Key] Drop failed work in BoundedQueueExecutor::pollWork#38920
[Dataflow Streaming] [Multi Key] Drop failed work in BoundedQueueExecutor::pollWork#38920arunpandianp wants to merge 2 commits into
Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request improves the robustness of the BoundedQueueExecutor by ensuring that failed work items are filtered out during the polling process. By identifying and dropping these items before they reach the execution threads, the system avoids unnecessary processing overhead and potential errors associated with failed tasks. Highlights
New Features🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request updates the BoundedQueueExecutor to continuously poll and drop failed work items, closing their handles, until a valid work item is found or the queue is empty. A corresponding unit test, testPollWorkDropsFailedWork, was added to verify this behavior. The review feedback correctly points out a potential thread leak in the new test if assertions fail before the blocker thread is unblocked and the executor is shut down. It is recommended to wrap the test assertions and execution in a try-finally block to guarantee proper cleanup.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
|
R: @scwhittle |
|
Assigning reviewers: R: @Abacn added as fallback since no labels match configuration Note: If you would like to opt out of this review, comment Available commands:
The PR bot will only process comments in the main thread (not review comments). |
| return null; | ||
| } | ||
| if (queuedWork.getWork().work().isFailed()) { | ||
| queuedWork.getHandle().close(); |
There was a problem hiding this comment.
this is just updating the counters, but it is not updating that this key was scheduled for processing. I think that somehow ActiveWorkState.completeWorkAndGetNextWorkForKey needs to be called so that we note that this key is completed and other work for the key can schedule.
Maybe a unit test at a higher level would help verify this behavior is working properly.
There was a problem hiding this comment.
Thanks for catching it. Will fix it with a test, after #38814 is merged
|
Reminder, please take a look at this pr: @Abacn |
|
Assigning new set of reviewers because Pr has gone too long without review. If you would like to opt out of this review, comment R: @Abacn added as fallback since no labels match configuration Available commands:
|
|
Current status: waiting for other PR to merge to improve tests for this one before merging |
Drop failed work in BoundedQueueExecutor::pollWork before it reaches the harness threads for execution.