Skip to content

Commit 08b6b9a

Browse files
vault: propagate context cancellation from blob broadcast failures
Check ctx.Err() when BroadcastBlob fails so that context.Canceled and context.DeadlineExceeded are returned immediately rather than swallowed. This preserves fail-fast semantics for expired OCR rounds while still skipping item-specific transient errors. Made-with: Cursor
1 parent c160977 commit 08b6b9a

2 files changed

Lines changed: 78 additions & 11 deletions

File tree

core/services/ocr2/plugins/vault/plugin.go

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,11 @@ func (r *ReportingPlugin) Observation(ctx context.Context, seqNr uint64, aq type
514514
}
515515
}
516516

517-
obspb.PendingQueueItems = r.broadcastBlobPayloads(ctx, blobBroadcastFetcher, seqNr, blobPayloads, blobPayloadIDs)
517+
pendingQueueItems, err := r.broadcastBlobPayloads(ctx, blobBroadcastFetcher, seqNr, blobPayloads, blobPayloadIDs)
518+
if err != nil {
519+
return nil, err
520+
}
521+
obspb.PendingQueueItems = pendingQueueItems
518522

519523
// Second, generate a random nonce that we'll use to sort the observations.
520524
// Each node generates a nonce independently, to be concatenated later on.
@@ -541,14 +545,15 @@ func (r *ReportingPlugin) Observation(ctx context.Context, seqNr uint64, aq type
541545
// Observation() latency (shortening this phase helps the OCR round finish within
542546
// DeltaProgress). Individual broadcast failures are logged and skipped rather than
543547
// aborting the entire observation, so that one problematic payload does not prevent
544-
// the remaining items from being observed.
548+
// the remaining items from being observed. Context cancellation/deadline errors are
549+
// propagated immediately so that expired rounds fail fast.
545550
func (r *ReportingPlugin) broadcastBlobPayloads(
546551
ctx context.Context,
547552
fetcher ocr3_1types.BlobBroadcastFetcher,
548553
seqNr uint64,
549554
payloads [][]byte,
550555
requestIDs []string,
551-
) [][]byte {
556+
) ([][]byte, error) {
552557
results := make([][]byte, len(payloads))
553558

554559
start := time.Now()
@@ -561,6 +566,9 @@ func (r *ReportingPlugin) broadcastBlobPayloads(
561566
g.Go(func() error {
562567
blobHandle, err := fetcher.BroadcastBlob(ctx, payload, ocr3_1types.BlobExpirationHintSequenceNumber{SeqNr: seqNr + 2})
563568
if err != nil {
569+
if ctx.Err() != nil {
570+
return ctx.Err()
571+
}
564572
r.lggr.Warnw("failed to broadcast pending queue item as blob, skipping",
565573
"seqNr", seqNr,
566574
"requestID", requestIDs[i],
@@ -581,15 +589,17 @@ func (r *ReportingPlugin) broadcastBlobPayloads(
581589
return nil
582590
})
583591
}
584-
_ = g.Wait()
592+
if err := g.Wait(); err != nil {
593+
return nil, err
594+
}
585595

586596
filtered := make([][]byte, 0, len(results))
587597
for _, item := range results {
588598
if item != nil {
589599
filtered = append(filtered, item)
590600
}
591601
}
592-
return filtered
602+
return filtered, nil
593603
}
594604

595605
func (r *ReportingPlugin) observeGetSecrets(ctx context.Context, reader ReadKVStore, req proto.Message, o *vaultcommon.Observation) {

core/services/ocr2/plugins/vault/plugin_test.go

Lines changed: 63 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7143,7 +7143,8 @@ func TestPlugin_broadcastBlobPayloads(t *testing.T) {
71437143
}
71447144

71457145
fetcher := &callbackBlobFetcher{fn: func([]byte) error { return nil }}
7146-
result := r.broadcastBlobPayloads(t.Context(), fetcher, 1, nil, nil)
7146+
result, err := r.broadcastBlobPayloads(t.Context(), fetcher, 1, nil, nil)
7147+
require.NoError(t, err)
71477148
assert.Empty(t, result)
71487149
})
71497150

@@ -7161,7 +7162,8 @@ func TestPlugin_broadcastBlobPayloads(t *testing.T) {
71617162
payloads := [][]byte{[]byte("p1"), []byte("p2"), []byte("p3")}
71627163
ids := []string{"req-1", "req-2", "req-3"}
71637164

7164-
result := r.broadcastBlobPayloads(t.Context(), fetcher, 1, payloads, ids)
7165+
result, err := r.broadcastBlobPayloads(t.Context(), fetcher, 1, payloads, ids)
7166+
require.NoError(t, err)
71657167
assert.Len(t, result, 3)
71667168
for _, item := range result {
71677169
assert.Equal(t, []byte("handle"), item)
@@ -7188,7 +7190,8 @@ func TestPlugin_broadcastBlobPayloads(t *testing.T) {
71887190
payloads := [][]byte{[]byte("p1"), []byte("p2"), []byte("p3")}
71897191
ids := []string{"req-1", "req-2", "req-3"}
71907192

7191-
result := r.broadcastBlobPayloads(t.Context(), fetcher, 5, payloads, ids)
7193+
result, err := r.broadcastBlobPayloads(t.Context(), fetcher, 5, payloads, ids)
7194+
require.NoError(t, err)
71927195
assert.Len(t, result, 2)
71937196

71947197
warnLogs := observed.FilterMessage("failed to broadcast pending queue item as blob, skipping")
@@ -7213,7 +7216,8 @@ func TestPlugin_broadcastBlobPayloads(t *testing.T) {
72137216
payloads := [][]byte{[]byte("p1"), []byte("p2")}
72147217
ids := []string{"req-1", "req-2"}
72157218

7216-
result := r.broadcastBlobPayloads(t.Context(), fetcher, 1, payloads, ids)
7219+
result, err := r.broadcastBlobPayloads(t.Context(), fetcher, 1, payloads, ids)
7220+
require.NoError(t, err)
72177221
assert.Empty(t, result)
72187222

72197223
warnLogs := observed.FilterMessage("failed to broadcast pending queue item as blob, skipping")
@@ -7234,7 +7238,8 @@ func TestPlugin_broadcastBlobPayloads(t *testing.T) {
72347238
payloads := [][]byte{[]byte("p1"), []byte("p2")}
72357239
ids := []string{"req-1", "req-2"}
72367240

7237-
result := r.broadcastBlobPayloads(t.Context(), fetcher, 1, payloads, ids)
7241+
result, err := r.broadcastBlobPayloads(t.Context(), fetcher, 1, payloads, ids)
7242+
require.NoError(t, err)
72387243
assert.Empty(t, result)
72397244

72407245
warnLogs := observed.FilterMessage("failed to marshal blob handle, skipping")
@@ -7267,12 +7272,64 @@ func TestPlugin_broadcastBlobPayloads(t *testing.T) {
72677272
payloads := [][]byte{[]byte("p1"), []byte("p2"), []byte("p3")}
72687273
ids := []string{"req-1", "req-2", "req-3"}
72697274

7270-
result := r.broadcastBlobPayloads(t.Context(), fetcher, 1, payloads, ids)
7275+
result, err := r.broadcastBlobPayloads(t.Context(), fetcher, 1, payloads, ids)
7276+
require.NoError(t, err)
72717277

72727278
broadcastWarns := observed.FilterMessage("failed to broadcast pending queue item as blob, skipping")
72737279
marshalWarns := observed.FilterMessage("failed to marshal blob handle, skipping")
72747280
assert.Equal(t, 1, broadcastWarns.Len())
72757281
assert.Equal(t, 1, marshalWarns.Len())
72767282
assert.Len(t, result, 1)
72777283
})
7284+
7285+
t.Run("context cancellation propagates error", func(t *testing.T) {
7286+
lggr := logger.TestLogger(t)
7287+
r := &ReportingPlugin{
7288+
lggr: lggr,
7289+
metrics: newTestMetrics(t),
7290+
marshalBlob: func(ocr3_1types.BlobHandle) ([]byte, error) {
7291+
return []byte("handle"), nil
7292+
},
7293+
}
7294+
7295+
ctx, cancel := context.WithCancel(t.Context())
7296+
cancel()
7297+
7298+
fetcher := &callbackBlobFetcher{fn: func([]byte) error {
7299+
return ctx.Err()
7300+
}}
7301+
7302+
payloads := [][]byte{[]byte("p1"), []byte("p2")}
7303+
ids := []string{"req-1", "req-2"}
7304+
7305+
result, err := r.broadcastBlobPayloads(ctx, fetcher, 1, payloads, ids)
7306+
assert.Nil(t, result)
7307+
assert.ErrorIs(t, err, context.Canceled)
7308+
})
7309+
7310+
t.Run("context deadline exceeded propagates error", func(t *testing.T) {
7311+
lggr := logger.TestLogger(t)
7312+
r := &ReportingPlugin{
7313+
lggr: lggr,
7314+
metrics: newTestMetrics(t),
7315+
marshalBlob: func(ocr3_1types.BlobHandle) ([]byte, error) {
7316+
return []byte("handle"), nil
7317+
},
7318+
}
7319+
7320+
ctx, cancel := context.WithTimeout(t.Context(), 0)
7321+
defer cancel()
7322+
<-ctx.Done()
7323+
7324+
fetcher := &callbackBlobFetcher{fn: func([]byte) error {
7325+
return ctx.Err()
7326+
}}
7327+
7328+
payloads := [][]byte{[]byte("p1")}
7329+
ids := []string{"req-1"}
7330+
7331+
result, err := r.broadcastBlobPayloads(ctx, fetcher, 1, payloads, ids)
7332+
assert.Nil(t, result)
7333+
assert.ErrorIs(t, err, context.DeadlineExceeded)
7334+
})
72787335
}

0 commit comments

Comments
 (0)