Skip to content

Commit 17ffab6

Browse files
vault: gracefully handle individual blob broadcast failures in Observation (#21765)
* vault: gracefully handle individual blob broadcast failures in Observation Previously, if any single payload failed to broadcast as a blob during the Observation phase, the entire observation was aborted and returned an error. This is unnecessarily disruptive — one problematic payload (e.g. transient network issue, malformed data) would prevent all other valid payloads from being included in the observation, stalling the OCR round. Now, individual broadcast failures are logged as warnings (with the request ID and error details) and the failed payload is simply excluded from PendingQueueItems. The remaining payloads continue to be broadcast and observed normally. The blob broadcast logic is extracted into a dedicated broadcastBlobPayloads method for clarity. Made-with: Cursor * vault: propagate context cancellation from blob broadcast failures Check ctx.Err() when BroadcastBlob fails so that context.Canceled and context.DeadlineExceeded are returned immediately rather than swallowed. This preserves fail-fast semantics for expired OCR rounds while still skipping item-specific transient errors. Made-with: Cursor * vault: add per-call 2s timeout to BroadcastBlob invocations Each parallel BroadcastBlob call now gets a 2-second timeout derived from the parent context. A slow individual broadcast will be cancelled and skipped without stalling the rest of the batch. Parent context cancellation still propagates immediately for round-level failures. Made-with: Cursor
1 parent 5c91c34 commit 17ffab6

2 files changed

Lines changed: 346 additions & 30 deletions

File tree

core/services/ocr2/plugins/vault/plugin.go

Lines changed: 72 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,7 @@ func (r *ReportingPlugin) Observation(ctx context.Context, seqNr uint64, aq type
470470
}
471471

472472
blobPayloads := make([][]byte, 0, len(localQueueItems))
473+
blobPayloadIDs := make([]string, 0, len(localQueueItems))
473474
maxObservedLocalQueueItems := 0
474475
for _, item := range localQueueItems {
475476
// The item is already in the pending queue. We'll be processing it
@@ -502,6 +503,7 @@ func (r *ReportingPlugin) Observation(ctx context.Context, seqNr uint64, aq type
502503
}
503504

504505
blobPayloads = append(blobPayloads, itemb)
506+
blobPayloadIDs = append(blobPayloadIDs, item.Id)
505507

506508
if len(blobPayloads) >= maxObservedLocalQueueItems {
507509
r.lggr.Warnw("Observed local queue exceeds batch size limit, truncating",
@@ -512,35 +514,11 @@ func (r *ReportingPlugin) Observation(ctx context.Context, seqNr uint64, aq type
512514
}
513515
}
514516

515-
observedLocalQueue := make([][]byte, len(blobPayloads))
516-
// Broadcast pending-queue blobs in parallel to reduce Observation() latency.
517-
// Shortening this phase helps the OCR round finish within DeltaProgress.
518-
blobBroadcastStart := time.Now()
519-
defer func() {
520-
r.lggr.Debugw("observation blob broadcast finished", "seqNr", seqNr, "blobCount", len(blobPayloads), "elapsed", time.Since(blobBroadcastStart))
521-
}()
522-
g, broadcastCtx := errgroup.WithContext(ctx)
523-
for i, payload := range blobPayloads {
524-
g.Go(func() error {
525-
blobHandle, ierr2 := blobBroadcastFetcher.BroadcastBlob(broadcastCtx, payload, ocr3_1types.BlobExpirationHintSequenceNumber{SeqNr: seqNr + 2})
526-
if ierr2 != nil {
527-
return fmt.Errorf("could not broadcast pending queue item as blob: %w", ierr2)
528-
}
529-
530-
blobHandleBytes, ierr2 := r.marshalBlob(blobHandle)
531-
if ierr2 != nil {
532-
return fmt.Errorf("could not marshal blob handle to bytes: %w", ierr2)
533-
}
534-
535-
observedLocalQueue[i] = blobHandleBytes
536-
return nil
537-
})
538-
}
539-
if err = g.Wait(); err != nil {
517+
pendingQueueItems, err := r.broadcastBlobPayloads(ctx, blobBroadcastFetcher, seqNr, blobPayloads, blobPayloadIDs)
518+
if err != nil {
540519
return nil, err
541520
}
542-
543-
obspb.PendingQueueItems = observedLocalQueue
521+
obspb.PendingQueueItems = pendingQueueItems
544522

545523
// Second, generate a random nonce that we'll use to sort the observations.
546524
// Each node generates a nonce idepedently, to be concatenated later on.
@@ -563,6 +541,73 @@ func (r *ReportingPlugin) Observation(ctx context.Context, seqNr uint64, aq type
563541
return types.Observation(obsb), nil
564542
}
565543

544+
// broadcastBlobPayloads broadcasts each payload as a blob in parallel to reduce
545+
// Observation() latency (shortening this phase helps the OCR round finish within
546+
// DeltaProgress). Each call is given a 2-second timeout so that a single slow
547+
// broadcast cannot stall the entire batch. Individual broadcast failures are logged
548+
// and skipped rather than aborting the entire observation, so that one problematic
549+
// payload does not prevent the remaining items from being observed. Context
550+
// cancellation/deadline errors on the parent context are propagated immediately so
551+
// that expired rounds fail fast.
552+
func (r *ReportingPlugin) broadcastBlobPayloads(
553+
ctx context.Context,
554+
fetcher ocr3_1types.BlobBroadcastFetcher,
555+
seqNr uint64,
556+
payloads [][]byte,
557+
requestIDs []string,
558+
) ([][]byte, error) {
559+
results := make([][]byte, len(payloads))
560+
561+
start := time.Now()
562+
defer func() {
563+
r.lggr.Debugw("observation blob broadcast finished", "seqNr", seqNr, "blobCount", len(payloads), "elapsed", time.Since(start))
564+
}()
565+
566+
const perBlobTimeout = 2 * time.Second
567+
var g errgroup.Group
568+
for i, payload := range payloads {
569+
g.Go(func() error {
570+
broadcastCtx, cancel := context.WithTimeout(ctx, perBlobTimeout)
571+
defer cancel()
572+
573+
blobHandle, err := fetcher.BroadcastBlob(broadcastCtx, payload, ocr3_1types.BlobExpirationHintSequenceNumber{SeqNr: seqNr + 2})
574+
if err != nil {
575+
if ctx.Err() != nil {
576+
return ctx.Err()
577+
}
578+
r.lggr.Warnw("failed to broadcast pending queue item as blob, skipping",
579+
"seqNr", seqNr,
580+
"requestID", requestIDs[i],
581+
"err", err)
582+
return nil
583+
}
584+
585+
blobHandleBytes, err := r.marshalBlob(blobHandle)
586+
if err != nil {
587+
r.lggr.Warnw("failed to marshal blob handle, skipping",
588+
"seqNr", seqNr,
589+
"requestID", requestIDs[i],
590+
"err", err)
591+
return nil
592+
}
593+
594+
results[i] = blobHandleBytes
595+
return nil
596+
})
597+
}
598+
if err := g.Wait(); err != nil {
599+
return nil, err
600+
}
601+
602+
filtered := make([][]byte, 0, len(results))
603+
for _, item := range results {
604+
if item != nil {
605+
filtered = append(filtered, item)
606+
}
607+
}
608+
return filtered, nil
609+
}
610+
566611
func (r *ReportingPlugin) observeGetSecrets(ctx context.Context, reader ReadKVStore, req proto.Message, o *vaultcommon.Observation) {
567612
tp := req.(*vaultcommon.GetSecretsRequest)
568613
o.RequestType = vaultcommon.RequestType_GET_SECRETS

0 commit comments

Comments
 (0)