Skip to content

Commit c160977

Browse files
vault: gracefully handle individual blob broadcast failures in Observation
Previously, if any single payload failed to broadcast as a blob during the Observation phase, the entire observation was aborted and returned an error. This is unnecessarily disruptive — one problematic payload (e.g. transient network issue, malformed data) would prevent all other valid payloads from being included in the observation, stalling the OCR round. Now, individual broadcast failures are logged as warnings (with the request ID and error details) and the failed payload is simply excluded from PendingQueueItems. The remaining payloads continue to be broadcast and observed normally. The blob broadcast logic is extracted into a dedicated broadcastBlobPayloads method for clarity. Made-with: Cursor
1 parent 6eda8c3 commit c160977

2 files changed

Lines changed: 229 additions & 32 deletions

File tree

core/services/ocr2/plugins/vault/plugin.go

Lines changed: 58 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,7 @@ func (r *ReportingPlugin) Observation(ctx context.Context, seqNr uint64, aq type
470470
}
471471

472472
blobPayloads := make([][]byte, 0, len(localQueueItems))
473+
blobPayloadIDs := make([]string, 0, len(localQueueItems))
473474
maxObservedLocalQueueItems := 0
474475
for _, item := range localQueueItems {
475476
// The item is already in the pending queue. We'll be processing it
@@ -502,6 +503,7 @@ func (r *ReportingPlugin) Observation(ctx context.Context, seqNr uint64, aq type
502503
}
503504

504505
blobPayloads = append(blobPayloads, itemb)
506+
blobPayloadIDs = append(blobPayloadIDs, item.Id)
505507

506508
if len(blobPayloads) >= maxObservedLocalQueueItems {
507509
r.lggr.Warnw("Observed local queue exceeds batch size limit, truncating",
@@ -512,35 +514,7 @@ func (r *ReportingPlugin) Observation(ctx context.Context, seqNr uint64, aq type
512514
}
513515
}
514516

515-
observedLocalQueue := make([][]byte, len(blobPayloads))
516-
// Broadcast pending-queue blobs in parallel to reduce Observation() latency.
517-
// Shortening this phase helps the OCR round finish within DeltaProgress.
518-
blobBroadcastStart := time.Now()
519-
defer func() {
520-
r.lggr.Debugw("observation blob broadcast finished", "seqNr", seqNr, "blobCount", len(blobPayloads), "elapsed", time.Since(blobBroadcastStart))
521-
}()
522-
g, broadcastCtx := errgroup.WithContext(ctx)
523-
for i, payload := range blobPayloads {
524-
g.Go(func() error {
525-
blobHandle, ierr2 := blobBroadcastFetcher.BroadcastBlob(broadcastCtx, payload, ocr3_1types.BlobExpirationHintSequenceNumber{SeqNr: seqNr + 2})
526-
if ierr2 != nil {
527-
return fmt.Errorf("could not broadcast pending queue item as blob: %w", ierr2)
528-
}
529-
530-
blobHandleBytes, ierr2 := r.marshalBlob(blobHandle)
531-
if ierr2 != nil {
532-
return fmt.Errorf("could not marshal blob handle to bytes: %w", ierr2)
533-
}
534-
535-
observedLocalQueue[i] = blobHandleBytes
536-
return nil
537-
})
538-
}
539-
if err = g.Wait(); err != nil {
540-
return nil, err
541-
}
542-
543-
obspb.PendingQueueItems = observedLocalQueue
517+
obspb.PendingQueueItems = r.broadcastBlobPayloads(ctx, blobBroadcastFetcher, seqNr, blobPayloads, blobPayloadIDs)
544518

545519
// Second, generate a random nonce that we'll use to sort the observations.
546520
// Each node generates a nonce idepedently, to be concatenated later on.
@@ -563,6 +537,61 @@ func (r *ReportingPlugin) Observation(ctx context.Context, seqNr uint64, aq type
563537
return types.Observation(obsb), nil
564538
}
565539

540+
// broadcastBlobPayloads broadcasts each payload as a blob in parallel to reduce
541+
// Observation() latency (shortening this phase helps the OCR round finish within
542+
// DeltaProgress). Individual broadcast failures are logged and skipped rather than
543+
// aborting the entire observation, so that one problematic payload does not prevent
544+
// the remaining items from being observed.
545+
func (r *ReportingPlugin) broadcastBlobPayloads(
546+
ctx context.Context,
547+
fetcher ocr3_1types.BlobBroadcastFetcher,
548+
seqNr uint64,
549+
payloads [][]byte,
550+
requestIDs []string,
551+
) [][]byte {
552+
results := make([][]byte, len(payloads))
553+
554+
start := time.Now()
555+
defer func() {
556+
r.lggr.Debugw("observation blob broadcast finished", "seqNr", seqNr, "blobCount", len(payloads), "elapsed", time.Since(start))
557+
}()
558+
559+
var g errgroup.Group
560+
for i, payload := range payloads {
561+
g.Go(func() error {
562+
blobHandle, err := fetcher.BroadcastBlob(ctx, payload, ocr3_1types.BlobExpirationHintSequenceNumber{SeqNr: seqNr + 2})
563+
if err != nil {
564+
r.lggr.Warnw("failed to broadcast pending queue item as blob, skipping",
565+
"seqNr", seqNr,
566+
"requestID", requestIDs[i],
567+
"err", err)
568+
return nil
569+
}
570+
571+
blobHandleBytes, err := r.marshalBlob(blobHandle)
572+
if err != nil {
573+
r.lggr.Warnw("failed to marshal blob handle, skipping",
574+
"seqNr", seqNr,
575+
"requestID", requestIDs[i],
576+
"err", err)
577+
return nil
578+
}
579+
580+
results[i] = blobHandleBytes
581+
return nil
582+
})
583+
}
584+
_ = g.Wait()
585+
586+
filtered := make([][]byte, 0, len(results))
587+
for _, item := range results {
588+
if item != nil {
589+
filtered = append(filtered, item)
590+
}
591+
}
592+
return filtered
593+
}
594+
566595
func (r *ReportingPlugin) observeGetSecrets(ctx context.Context, reader ReadKVStore, req proto.Message, o *vaultcommon.Observation) {
567596
tp := req.(*vaultcommon.GetSecretsRequest)
568597
o.RequestType = vaultcommon.RequestType_GET_SECRETS

core/services/ocr2/plugins/vault/plugin_test.go

Lines changed: 171 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -760,7 +760,7 @@ func TestPlugin_Observation_PendingQueueEnabled_BroadcastsPendingQueueBlobsInPar
760760
}
761761

762762
func TestPlugin_Observation_PendingQueueEnabled_BroadcastBlobError(t *testing.T) {
763-
lggr := logger.TestLogger(t)
763+
lggr, observed := logger.TestLoggerObserved(t, zapcore.WarnLevel)
764764
store := requests.NewStore[*vaulttypes.Request]()
765765
r := &ReportingPlugin{
766766
lggr: lggr,
@@ -803,8 +803,15 @@ func TestPlugin_Observation_PendingQueueEnabled_BroadcastBlobError(t *testing.T)
803803
require.NoError(t, store.Add(&vaulttypes.Request{Payload: p, IDVal: "request-1"}))
804804
rdr := &kv{m: make(map[string]response)}
805805

806-
_, err = r.Observation(t.Context(), 1, types.AttributedQuery{}, rdr, &errorBlobBroadcastFetcher{err: errors.New("boom")})
807-
require.ErrorContains(t, err, "could not broadcast pending queue item as blob: boom")
806+
obs, err := r.Observation(t.Context(), 1, types.AttributedQuery{}, rdr, &errorBlobBroadcastFetcher{err: errors.New("boom")})
807+
require.NoError(t, err)
808+
require.NotNil(t, obs)
809+
810+
warnLogs := observed.FilterMessage("failed to broadcast pending queue item as blob, skipping")
811+
assert.Equal(t, 1, warnLogs.Len())
812+
fields := warnLogs.All()[0].ContextMap()
813+
assert.Equal(t, "request-1", fields["requestID"])
814+
assert.Contains(t, fmt.Sprint(fields["err"]), "boom")
808815
}
809816

810817
func TestPlugin_Observation_GetSecretsRequest_SecretIdentifierInvalid(t *testing.T) {
@@ -5166,6 +5173,21 @@ func mockMarshalBlob(ocr3_1types.BlobHandle) ([]byte, error) {
51665173
return []byte{}, nil
51675174
}
51685175

5176+
type callbackBlobFetcher struct {
5177+
fn func(payload []byte) error
5178+
}
5179+
5180+
func (f *callbackBlobFetcher) BroadcastBlob(_ context.Context, payload []byte, _ ocr3_1types.BlobExpirationHint) (ocr3_1types.BlobHandle, error) {
5181+
if err := f.fn(payload); err != nil {
5182+
return ocr3_1types.BlobHandle{}, err
5183+
}
5184+
return ocr3_1types.BlobHandle{}, nil
5185+
}
5186+
5187+
func (f *callbackBlobFetcher) FetchBlob(context.Context, ocr3_1types.BlobHandle) ([]byte, error) {
5188+
panic("FetchBlob should not be called in broadcastBlobPayloads tests")
5189+
}
5190+
51695191
func TestPlugin_StateTransition_StoresPendingQueue(t *testing.T) {
51705192
lggr := logger.TestLogger(t)
51715193
store := requests.NewStore[*vaulttypes.Request]()
@@ -7108,3 +7130,149 @@ func TestLogUserErrorAware(t *testing.T) {
71087130
assert.Contains(t, fmt.Sprint(fields["error"]), "internal error")
71097131
})
71107132
}
7133+
7134+
func TestPlugin_broadcastBlobPayloads(t *testing.T) {
7135+
t.Run("empty payloads returns empty slice", func(t *testing.T) {
7136+
lggr := logger.TestLogger(t)
7137+
r := &ReportingPlugin{
7138+
lggr: lggr,
7139+
metrics: newTestMetrics(t),
7140+
marshalBlob: func(ocr3_1types.BlobHandle) ([]byte, error) {
7141+
return []byte("handle"), nil
7142+
},
7143+
}
7144+
7145+
fetcher := &callbackBlobFetcher{fn: func([]byte) error { return nil }}
7146+
result := r.broadcastBlobPayloads(t.Context(), fetcher, 1, nil, nil)
7147+
assert.Empty(t, result)
7148+
})
7149+
7150+
t.Run("all payloads broadcast successfully", func(t *testing.T) {
7151+
lggr := logger.TestLogger(t)
7152+
r := &ReportingPlugin{
7153+
lggr: lggr,
7154+
metrics: newTestMetrics(t),
7155+
marshalBlob: func(ocr3_1types.BlobHandle) ([]byte, error) {
7156+
return []byte("handle"), nil
7157+
},
7158+
}
7159+
7160+
fetcher := &callbackBlobFetcher{fn: func([]byte) error { return nil }}
7161+
payloads := [][]byte{[]byte("p1"), []byte("p2"), []byte("p3")}
7162+
ids := []string{"req-1", "req-2", "req-3"}
7163+
7164+
result := r.broadcastBlobPayloads(t.Context(), fetcher, 1, payloads, ids)
7165+
assert.Len(t, result, 3)
7166+
for _, item := range result {
7167+
assert.Equal(t, []byte("handle"), item)
7168+
}
7169+
})
7170+
7171+
t.Run("failed broadcast is skipped and logged", func(t *testing.T) {
7172+
lggr, observed := logger.TestLoggerObserved(t, zapcore.WarnLevel)
7173+
r := &ReportingPlugin{
7174+
lggr: lggr,
7175+
metrics: newTestMetrics(t),
7176+
marshalBlob: func(ocr3_1types.BlobHandle) ([]byte, error) {
7177+
return []byte("handle"), nil
7178+
},
7179+
}
7180+
7181+
fetcher := &callbackBlobFetcher{fn: func(payload []byte) error {
7182+
if string(payload) == "p2" {
7183+
return errors.New("broadcast error")
7184+
}
7185+
return nil
7186+
}}
7187+
7188+
payloads := [][]byte{[]byte("p1"), []byte("p2"), []byte("p3")}
7189+
ids := []string{"req-1", "req-2", "req-3"}
7190+
7191+
result := r.broadcastBlobPayloads(t.Context(), fetcher, 5, payloads, ids)
7192+
assert.Len(t, result, 2)
7193+
7194+
warnLogs := observed.FilterMessage("failed to broadcast pending queue item as blob, skipping")
7195+
assert.Equal(t, 1, warnLogs.Len())
7196+
fields := warnLogs.All()[0].ContextMap()
7197+
assert.Equal(t, "req-2", fields["requestID"])
7198+
assert.Equal(t, uint64(5), fields["seqNr"])
7199+
assert.Contains(t, fmt.Sprint(fields["err"]), "broadcast error")
7200+
})
7201+
7202+
t.Run("all broadcasts fail returns empty slice", func(t *testing.T) {
7203+
lggr, observed := logger.TestLoggerObserved(t, zapcore.WarnLevel)
7204+
r := &ReportingPlugin{
7205+
lggr: lggr,
7206+
metrics: newTestMetrics(t),
7207+
marshalBlob: func(ocr3_1types.BlobHandle) ([]byte, error) {
7208+
return []byte("handle"), nil
7209+
},
7210+
}
7211+
7212+
fetcher := &errorBlobBroadcastFetcher{err: errors.New("network down")}
7213+
payloads := [][]byte{[]byte("p1"), []byte("p2")}
7214+
ids := []string{"req-1", "req-2"}
7215+
7216+
result := r.broadcastBlobPayloads(t.Context(), fetcher, 1, payloads, ids)
7217+
assert.Empty(t, result)
7218+
7219+
warnLogs := observed.FilterMessage("failed to broadcast pending queue item as blob, skipping")
7220+
assert.Equal(t, 2, warnLogs.Len())
7221+
})
7222+
7223+
t.Run("marshal blob failure skips item and logs warning", func(t *testing.T) {
7224+
lggr, observed := logger.TestLoggerObserved(t, zapcore.WarnLevel)
7225+
r := &ReportingPlugin{
7226+
lggr: lggr,
7227+
metrics: newTestMetrics(t),
7228+
marshalBlob: func(ocr3_1types.BlobHandle) ([]byte, error) {
7229+
return nil, errors.New("marshal error")
7230+
},
7231+
}
7232+
7233+
fetcher := &callbackBlobFetcher{fn: func([]byte) error { return nil }}
7234+
payloads := [][]byte{[]byte("p1"), []byte("p2")}
7235+
ids := []string{"req-1", "req-2"}
7236+
7237+
result := r.broadcastBlobPayloads(t.Context(), fetcher, 1, payloads, ids)
7238+
assert.Empty(t, result)
7239+
7240+
warnLogs := observed.FilterMessage("failed to marshal blob handle, skipping")
7241+
assert.Equal(t, 2, warnLogs.Len())
7242+
})
7243+
7244+
t.Run("mix of broadcast and marshal failures", func(t *testing.T) {
7245+
lggr, observed := logger.TestLoggerObserved(t, zapcore.WarnLevel)
7246+
7247+
marshalCallCount := atomic.Int32{}
7248+
r := &ReportingPlugin{
7249+
lggr: lggr,
7250+
metrics: newTestMetrics(t),
7251+
marshalBlob: func(ocr3_1types.BlobHandle) ([]byte, error) {
7252+
n := marshalCallCount.Add(1)
7253+
if n == 1 {
7254+
return nil, errors.New("marshal error")
7255+
}
7256+
return []byte("handle"), nil
7257+
},
7258+
}
7259+
7260+
fetcher := &callbackBlobFetcher{fn: func(payload []byte) error {
7261+
if string(payload) == "p1" {
7262+
return errors.New("broadcast error")
7263+
}
7264+
return nil
7265+
}}
7266+
7267+
payloads := [][]byte{[]byte("p1"), []byte("p2"), []byte("p3")}
7268+
ids := []string{"req-1", "req-2", "req-3"}
7269+
7270+
result := r.broadcastBlobPayloads(t.Context(), fetcher, 1, payloads, ids)
7271+
7272+
broadcastWarns := observed.FilterMessage("failed to broadcast pending queue item as blob, skipping")
7273+
marshalWarns := observed.FilterMessage("failed to marshal blob handle, skipping")
7274+
assert.Equal(t, 1, broadcastWarns.Len())
7275+
assert.Equal(t, 1, marshalWarns.Len())
7276+
assert.Len(t, result, 1)
7277+
})
7278+
}

0 commit comments

Comments
 (0)