Skip to content

Commit ec65d64

Browse files
authored
[Prism] Fix gRPC deadline exceeded errors during bundle failure by passing errgroup context (#38472)
* Add a Go test to reproduce the deadline exceeded errors when a DoFn fails.
* Add a Python unit test to reproduce it.
* Change the context to egctx so that a bundle failure cancels the other bundle executions.
* Fix lints.
* Remove unused import.
* Move the test to a test class that uses the built Prism during the VR test.
* Remove the new Python test due to flakiness.
1 parent fa0eef9 commit ec65d64

3 files changed

Lines changed: 32 additions & 1 deletion

File tree

sdks/go/pkg/beam/runners/prism/internal/execute.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,11 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
376376
eg.Go(func() error {
377377
s := stages[rb.StageID]
378378
wk := wks[s.envID]
379-
if err := s.Execute(ctx, j, wk, comps, em, rb); err != nil {
379+
// Pass egctx instead of the parent ctx so that when any bundle fails,
380+
// the errgroup cancels egctx and all other concurrent bundle execution
381+
// goroutines immediately detect cancellation and abort. This prevents
382+
// eg.Wait() from blocking indefinitely and allows prompt error reporting.
383+
if err := s.Execute(egctx, j, wk, comps, em, rb); err != nil {
380384
// Ensure we clean up on bundle failure
381385
j.Logger.Error("Bundle Failed.", slog.Any("error", err))
382386
em.FailBundle(rb)

sdks/go/pkg/beam/runners/prism/internal/execute_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,28 @@ func TestFailure(t *testing.T) {
519519
}
520520
}
521521

522+
func TestFailureHang(t *testing.T) {
523+
initRunner(t)
524+
525+
p, s := beam.NewPipelineWithRoot()
526+
imp := beam.Impulse(s)
527+
col1 := beam.ParDo(s, doFnBlock, imp)
528+
col2 := beam.ParDo(s, doFnFail, imp)
529+
beam.ParDo(s, &int64Check{Name: "block", Want: []int{}}, col1)
530+
beam.ParDo(s, &int64Check{Name: "fail", Want: []int{}}, col2)
531+
532+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
533+
defer cancel()
534+
535+
_, err := executeWithT(ctx, t, p)
536+
if err == nil {
537+
t.Fatalf("expected pipeline failure, but got a success")
538+
}
539+
if want := "doFnFail: failing as intended"; !strings.Contains(err.Error(), want) {
540+
t.Fatalf("expected pipeline failure with %q, but was %v", want, err)
541+
}
542+
}
543+
522544
func TestRunner_Passert(t *testing.T) {
523545
initRunner(t)
524546
tests := []struct {

sdks/go/pkg/beam/runners/prism/internal/testdofns_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ func init() {
5959
register.Function3x0(dofn1Counter)
6060
register.Function2x0(dofnSink)
6161
register.Function3x1(doFnFail)
62+
register.Function3x0(doFnBlock)
6263

6364
register.Function2x1(combineIntSum)
6465

@@ -283,6 +284,10 @@ func doFnFail(ctx context.Context, _ []byte, emit func(int64)) error {
283284
return fmt.Errorf("doFnFail: failing as intended")
284285
}
285286

287+
// doFnBlock is a test DoFn that emits nothing and does not return until
// ctx is done (cancelled or past its deadline).
func doFnBlock(ctx context.Context, _ []byte, emit func(int64)) {
	done := ctx.Done()
	<-done
}
290+
286291
// combineIntSum returns the sum of a and b.
func combineIntSum(a, b int64) int64 {
	sum := a + b
	return sum
}

0 commit comments

Comments
 (0)