Skip to content

Commit 0cd2c3c

Browse files
vault: add per-call 2s timeout to BroadcastBlob invocations
Each parallel BroadcastBlob call now gets a 2-second timeout derived from the parent context. A slow individual broadcast will be cancelled and skipped without stalling the rest of the batch. Parent context cancellation still propagates immediately for round-level failures. Made-with: Cursor
1 parent 08b6b9a commit 0cd2c3c

2 files changed

Lines changed: 57 additions & 5 deletions

File tree

core/services/ocr2/plugins/vault/plugin.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -543,10 +543,12 @@ func (r *ReportingPlugin) Observation(ctx context.Context, seqNr uint64, aq type
543543

544544
// broadcastBlobPayloads broadcasts each payload as a blob in parallel to reduce
545545
// Observation() latency (shortening this phase helps the OCR round finish within
546-
// DeltaProgress). Individual broadcast failures are logged and skipped rather than
547-
// aborting the entire observation, so that one problematic payload does not prevent
548-
// the remaining items from being observed. Context cancellation/deadline errors are
549-
// propagated immediately so that expired rounds fail fast.
546+
// DeltaProgress). Each call is given a 2-second timeout so that a single slow
547+
// broadcast cannot stall the entire batch. Individual broadcast failures are logged
548+
// and skipped rather than aborting the entire observation, so that one problematic
549+
// payload does not prevent the remaining items from being observed. Context
550+
// cancellation/deadline errors on the parent context are propagated immediately so
551+
// that expired rounds fail fast.
550552
func (r *ReportingPlugin) broadcastBlobPayloads(
551553
ctx context.Context,
552554
fetcher ocr3_1types.BlobBroadcastFetcher,
@@ -561,10 +563,14 @@ func (r *ReportingPlugin) broadcastBlobPayloads(
561563
r.lggr.Debugw("observation blob broadcast finished", "seqNr", seqNr, "blobCount", len(payloads), "elapsed", time.Since(start))
562564
}()
563565

566+
const perBlobTimeout = 2 * time.Second
564567
var g errgroup.Group
565568
for i, payload := range payloads {
566569
g.Go(func() error {
567-
blobHandle, err := fetcher.BroadcastBlob(ctx, payload, ocr3_1types.BlobExpirationHintSequenceNumber{SeqNr: seqNr + 2})
570+
broadcastCtx, cancel := context.WithTimeout(ctx, perBlobTimeout)
571+
defer cancel()
572+
573+
blobHandle, err := fetcher.BroadcastBlob(broadcastCtx, payload, ocr3_1types.BlobExpirationHintSequenceNumber{SeqNr: seqNr + 2})
568574
if err != nil {
569575
if ctx.Err() != nil {
570576
return ctx.Err()

core/services/ocr2/plugins/vault/plugin_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5188,6 +5188,21 @@ func (f *callbackBlobFetcher) FetchBlob(context.Context, ocr3_1types.BlobHandle)
51885188
panic("FetchBlob should not be called in broadcastBlobPayloads tests")
51895189
}
51905190

5191+
type ctxCallbackBlobFetcher struct {
5192+
fn func(ctx context.Context, payload []byte) error
5193+
}
5194+
5195+
func (f *ctxCallbackBlobFetcher) BroadcastBlob(ctx context.Context, payload []byte, _ ocr3_1types.BlobExpirationHint) (ocr3_1types.BlobHandle, error) {
5196+
if err := f.fn(ctx, payload); err != nil {
5197+
return ocr3_1types.BlobHandle{}, err
5198+
}
5199+
return ocr3_1types.BlobHandle{}, nil
5200+
}
5201+
5202+
func (f *ctxCallbackBlobFetcher) FetchBlob(context.Context, ocr3_1types.BlobHandle) ([]byte, error) {
5203+
panic("FetchBlob should not be called in broadcastBlobPayloads tests")
5204+
}
5205+
51915206
func TestPlugin_StateTransition_StoresPendingQueue(t *testing.T) {
51925207
lggr := logger.TestLogger(t)
51935208
store := requests.NewStore[*vaulttypes.Request]()
@@ -7332,4 +7347,35 @@ func TestPlugin_broadcastBlobPayloads(t *testing.T) {
73327347
assert.Nil(t, result)
73337348
assert.ErrorIs(t, err, context.DeadlineExceeded)
73347349
})
7350+
7351+
t.Run("slow broadcast hits per-call timeout and is skipped", func(t *testing.T) {
7352+
lggr, observed := logger.TestLoggerObserved(t, zapcore.WarnLevel)
7353+
r := &ReportingPlugin{
7354+
lggr: lggr,
7355+
metrics: newTestMetrics(t),
7356+
marshalBlob: func(ocr3_1types.BlobHandle) ([]byte, error) {
7357+
return []byte("handle"), nil
7358+
},
7359+
}
7360+
7361+
fetcher := &ctxCallbackBlobFetcher{fn: func(ctx context.Context, payload []byte) error {
7362+
if string(payload) == "slow" {
7363+
<-ctx.Done()
7364+
return ctx.Err()
7365+
}
7366+
return nil
7367+
}}
7368+
7369+
payloads := [][]byte{[]byte("fast"), []byte("slow")}
7370+
ids := []string{"req-fast", "req-slow"}
7371+
7372+
result, err := r.broadcastBlobPayloads(t.Context(), fetcher, 1, payloads, ids)
7373+
require.NoError(t, err)
7374+
assert.Len(t, result, 1)
7375+
7376+
warnLogs := observed.FilterMessage("failed to broadcast pending queue item as blob, skipping")
7377+
assert.Equal(t, 1, warnLogs.Len())
7378+
fields := warnLogs.All()[0].ContextMap()
7379+
assert.Equal(t, "req-slow", fields["requestID"])
7380+
})
73357381
}

0 commit comments

Comments
 (0)