[Prism] Fix gRPC deadline exceeded errors during bundle failure by passing errgroup context#38472
r: @lostluck
Summary of Changes
This pull request addresses a concurrency issue in the Prism runner where bundle execution goroutines were not respecting cancellation signals. By switching to the errgroup context, the runner now ensures that when one bundle fails, all other concurrent operations are cancelled immediately. This prevents indefinite blocking, so the runner reports job failures promptly and the client no longer sees spurious gRPC DEADLINE_EXCEEDED errors.
Code Review
This pull request improves the Prism runner's error handling by utilizing the errgroup context during bundle execution. This ensures that if any bundle fails, all other concurrent bundle goroutines are immediately canceled, preventing the runner from hanging and allowing for prompt error reporting. The changes include new regression tests in both Go and Python. Review feedback identified a bug in the Python test where incorrect pipeline options were imported, which would result in a NameError during execution.
Codecov Report
✅ All modified and coverable lines are covered by tests.
Additional details and impacted files
@@ Coverage Diff @@
## master #38472 +/- ##
============================================
- Coverage 57.54% 55.76% -1.79%
+ Complexity 5329 2095 -3234
============================================
Files 1399 1099 -300
Lines 198776 172274 -26502
Branches 4980 1350 -3630
============================================
- Hits 114385 96061 -18324
+ Misses 80476 73817 -6659
+ Partials 3915 2396 -1519
cc'ed @claudevdm
In `executePipeline`, bundle execution goroutines previously used the parent job context (`ctx`) instead of the errgroup context (`egctx`). When a bundle failed, `egctx` was cancelled, but the other concurrent bundle execution goroutines were not watching it and never noticed the cancellation. This caused `eg.Wait()` to block indefinitely, which prevented Prism from reporting `JobState_FAILED` promptly and led to 60-second gRPC `DEADLINE_EXCEEDED` errors on the client side.
Example failed tests:
With the proposed fix, the following sequence ensures that both the server and the client exit gracefully.
As soon as any bundle fails, `eg` cancels `egctx`. (This is why `egctx`, not `ctx`, must be passed to `s.Execute()` in the code referenced below.)
beam/sdks/go/pkg/beam/runners/prism/internal/execute.go, lines 376 to 386 in 4d683c0
All other concurrent bundle execution goroutines immediately unblock on `<-egctx.Done()` and exit.
beam/sdks/go/pkg/beam/runners/prism/internal/stage.go, line 125 in 4d683c0
beam/sdks/go/pkg/beam/runners/prism/internal/stage.go, lines 203 to 206 in 4d683c0
`eg.Wait()` unblocks immediately, allowing `RunPipeline` to call `j.Failed(err)`.
beam/sdks/go/pkg/beam/runners/prism/internal/execute.go, lines 369 to 375 in 4d683c0
Prism immediately broadcasts `JobState_FAILED` over `GetStateStream`, and the Python SDK client receives it and raises the underlying worker exception cleanly, without hitting a deadline-exceeded error.
beam/sdks/go/pkg/beam/runners/prism/internal/execute.go, lines 90 to 93 in 4d683c0