Skip to content

Commit 53660b0

Browse files
Limit vault blob broadcast concurrency
1 parent 1580972 commit 53660b0

2 files changed

Lines changed: 99 additions & 9 deletions

File tree

core/services/ocr2/plugins/vault/plugin.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ import (
4343
"github.com/smartcontractkit/chainlink/v2/core/logger"
4444
)
4545

46+
const (
47+
blobBroadcastTimeout = 2 * time.Second
48+
maxConcurrentBlobBroadcasts = 10
49+
)
50+
4651
var (
4752
isValidIDComponent = regexp.MustCompile(`^[a-zA-Z0-9_]+$`).MatchString
4853
)
@@ -544,11 +549,12 @@ func (r *ReportingPlugin) Observation(ctx context.Context, seqNr uint64, aq type
544549
// broadcastBlobPayloads broadcasts each payload as a blob in parallel to reduce
545550
// Observation() latency (shortening this phase helps the OCR round finish within
546551
// DeltaProgress). Each call is given a 2-second timeout so that a single slow
547-
// broadcast cannot stall the entire batch. Individual broadcast failures are logged
548-
// and skipped rather than aborting the entire observation, so that one problematic
549-
// payload does not prevent the remaining items from being observed. Context
550-
// cancellation/deadline errors on the parent context are propagated immediately so
551-
// that expired rounds fail fast.
552+
// broadcast cannot stall the entire batch. No more than 10 broadcasts are allowed
553+
// in flight at a time. Individual broadcast failures are logged and skipped rather
554+
// than aborting the entire observation, so that one problematic payload does not
555+
// prevent the remaining items from being observed. Context cancellation/deadline
556+
// errors on the parent context are propagated immediately so that expired rounds
557+
// fail fast.
552558
func (r *ReportingPlugin) broadcastBlobPayloads(
553559
ctx context.Context,
554560
fetcher ocr3_1types.BlobBroadcastFetcher,
@@ -563,11 +569,12 @@ func (r *ReportingPlugin) broadcastBlobPayloads(
563569
r.lggr.Debugw("observation blob broadcast finished", "seqNr", seqNr, "blobCount", len(payloads), "elapsed", time.Since(start))
564570
}()
565571

566-
const perBlobTimeout = 2 * time.Second
567572
var g errgroup.Group
573+
g.SetLimit(maxConcurrentBlobBroadcasts)
568574
for i, payload := range payloads {
575+
requestID := requestIDs[i]
569576
g.Go(func() error {
570-
broadcastCtx, cancel := context.WithTimeout(ctx, perBlobTimeout)
577+
broadcastCtx, cancel := context.WithTimeout(ctx, blobBroadcastTimeout)
571578
defer cancel()
572579

573580
blobHandle, err := fetcher.BroadcastBlob(broadcastCtx, payload, ocr3_1types.BlobExpirationHintSequenceNumber{SeqNr: seqNr + 2})
@@ -577,7 +584,7 @@ func (r *ReportingPlugin) broadcastBlobPayloads(
577584
}
578585
r.lggr.Warnw("failed to broadcast pending queue item as blob, skipping",
579586
"seqNr", seqNr,
580-
"requestID", requestIDs[i],
587+
"requestID", requestID,
581588
"err", err)
582589
return nil
583590
}
@@ -586,7 +593,7 @@ func (r *ReportingPlugin) broadcastBlobPayloads(
586593
if err != nil {
587594
r.lggr.Warnw("failed to marshal blob handle, skipping",
588595
"seqNr", seqNr,
589-
"requestID", requestIDs[i],
596+
"requestID", requestID,
590597
"err", err)
591598
return nil
592599
}

core/services/ocr2/plugins/vault/plugin_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7185,6 +7185,89 @@ func TestPlugin_broadcastBlobPayloads(t *testing.T) {
71857185
}
71867186
})
71877187

7188+
t.Run("does not exceed max concurrent broadcasts", func(t *testing.T) {
7189+
lggr := logger.TestLogger(t)
7190+
r := &ReportingPlugin{
7191+
lggr: lggr,
7192+
metrics: newTestMetrics(t),
7193+
marshalBlob: func(ocr3_1types.BlobHandle) ([]byte, error) {
7194+
return []byte("handle"), nil
7195+
},
7196+
}
7197+
7198+
payloads := make([][]byte, maxConcurrentBlobBroadcasts*2+1)
7199+
ids := make([]string, len(payloads))
7200+
for i := range payloads {
7201+
payloads[i] = []byte(fmt.Sprintf("payload-%d", i))
7202+
ids[i] = fmt.Sprintf("req-%d", i)
7203+
}
7204+
7205+
var active atomic.Int32
7206+
var maxActive atomic.Int32
7207+
started := make(chan struct{}, len(payloads))
7208+
release := make(chan struct{})
7209+
released := atomic.Bool{}
7210+
releaseBroadcasts := func() {
7211+
if released.CompareAndSwap(false, true) {
7212+
close(release)
7213+
}
7214+
}
7215+
defer releaseBroadcasts()
7216+
7217+
fetcher := &ctxCallbackBlobFetcher{fn: func(ctx context.Context, _ []byte) error {
7218+
current := active.Add(1)
7219+
defer active.Add(-1)
7220+
7221+
for {
7222+
maxSeen := maxActive.Load()
7223+
if current <= maxSeen || maxActive.CompareAndSwap(maxSeen, current) {
7224+
break
7225+
}
7226+
}
7227+
7228+
started <- struct{}{}
7229+
select {
7230+
case <-release:
7231+
return nil
7232+
case <-ctx.Done():
7233+
return ctx.Err()
7234+
}
7235+
}}
7236+
7237+
type broadcastResult struct {
7238+
payloads [][]byte
7239+
err error
7240+
}
7241+
done := make(chan broadcastResult, 1)
7242+
go func() {
7243+
result, err := r.broadcastBlobPayloads(t.Context(), fetcher, 1, payloads, ids)
7244+
done <- broadcastResult{payloads: result, err: err}
7245+
}()
7246+
7247+
for i := 0; i < maxConcurrentBlobBroadcasts; i++ {
7248+
select {
7249+
case <-started:
7250+
case <-time.After(time.Second):
7251+
t.Fatalf("timed out waiting for broadcast %d to start", i+1)
7252+
}
7253+
}
7254+
7255+
assert.Never(t, func() bool {
7256+
return maxActive.Load() > int32(maxConcurrentBlobBroadcasts)
7257+
}, 100*time.Millisecond, 10*time.Millisecond)
7258+
7259+
releaseBroadcasts()
7260+
7261+
select {
7262+
case result := <-done:
7263+
require.NoError(t, result.err)
7264+
assert.Len(t, result.payloads, len(payloads))
7265+
case <-time.After(time.Second):
7266+
t.Fatal("timed out waiting for broadcasts to complete")
7267+
}
7268+
assert.LessOrEqual(t, maxActive.Load(), int32(maxConcurrentBlobBroadcasts))
7269+
})
7270+
71887271
t.Run("failed broadcast is skipped and logged", func(t *testing.T) {
71897272
lggr, observed := logger.TestLoggerObserved(t, zapcore.WarnLevel)
71907273
r := &ReportingPlugin{

0 commit comments

Comments
 (0)