Skip to content

Commit 6212192

Browse files
committed
fix: split oversized chip ingress batches
1 parent 555d35c commit 6212192

2 files changed

Lines changed: 190 additions & 23 deletions

File tree

pkg/chipingress/batch/client.go

Lines changed: 70 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package batch
33
import (
44
"context"
55
"errors"
6+
"fmt"
67
"strconv"
78
"sync"
89
"sync/atomic"
@@ -252,34 +253,80 @@ func (b *Client) sendBatch(ctx context.Context, messages []*messageWithCallback)
252253

253254
go func() {
254255
defer func() { <-b.maxConcurrentSends }()
255-
// this is specifically to prevent long running network calls
256-
ctxTimeout, cancel := context.WithTimeout(ctx, b.maxPublishTimeout)
257-
defer cancel()
258256

259-
events := make([]*chipingress.CloudEventPb, len(messages))
260-
for i, msg := range messages {
261-
events[i] = msg.event
262-
}
263-
batchReq := &chipingress.CloudEventBatch{Events: events}
264-
batchBytes := proto.Size(batchReq)
265-
startedAt := time.Now()
266-
_, err := b.client.PublishBatch(ctxTimeout, batchReq)
267-
b.metrics.recordSend(context.Background(), len(messages), batchBytes, time.Since(startedAt), err == nil)
268-
if err != nil {
269-
b.log.Errorw("failed to publish batch", "error", err)
270-
}
271-
// the callbacks are placed in their own goroutine to not block releasing the semaphore
272-
// we use a wait group, to ensure all callbacks are completed if .Stop() is called.
273-
b.callbackWg.Go(func() {
274-
for _, msg := range messages {
275-
if msg.callback != nil {
276-
msg.callback(err)
277-
}
257+
for _, batchMessages := range splitMessagesByRequestSize(messages, b.maxGRPCRequestSize) {
258+
batchReq, batchBytes := newBatchRequest(batchMessages)
259+
if b.maxGRPCRequestSize > 0 && batchBytes > b.maxGRPCRequestSize {
260+
err := fmt.Errorf("publish batch serialized size %d exceeds max gRPC request size %d", batchBytes, b.maxGRPCRequestSize)
261+
b.metrics.recordSend(context.Background(), len(batchMessages), batchBytes, 0, false)
262+
b.log.Errorw("failed to publish batch", "error", err)
263+
b.completeBatchCallbacks(batchMessages, err)
264+
continue
278265
}
279-
})
266+
267+
// this is specifically to prevent long running network calls
268+
ctxTimeout, cancel := context.WithTimeout(ctx, b.maxPublishTimeout)
269+
startedAt := time.Now()
270+
_, err := b.client.PublishBatch(ctxTimeout, batchReq)
271+
cancel()
272+
273+
b.metrics.recordSend(context.Background(), len(batchMessages), batchBytes, time.Since(startedAt), err == nil)
274+
if err != nil {
275+
b.log.Errorw("failed to publish batch", "error", err)
276+
}
277+
b.completeBatchCallbacks(batchMessages, err)
278+
}
280279
}()
281280
}
282281

282+
func (b *Client) completeBatchCallbacks(messages []*messageWithCallback, err error) {
283+
callbackMessages, callbackErr := messages, err
284+
// the callbacks are placed in their own goroutine to not block releasing the semaphore
285+
// we use a wait group, to ensure all callbacks are completed if .Stop() is called.
286+
b.callbackWg.Go(func() {
287+
for _, msg := range callbackMessages {
288+
if msg.callback != nil {
289+
msg.callback(callbackErr)
290+
}
291+
}
292+
})
293+
}
294+
295+
func splitMessagesByRequestSize(messages []*messageWithCallback, maxRequestSize int) [][]*messageWithCallback {
296+
if len(messages) == 0 {
297+
return nil
298+
}
299+
if maxRequestSize <= 0 {
300+
return [][]*messageWithCallback{messages}
301+
}
302+
303+
var batches [][]*messageWithCallback
304+
current := make([]*messageWithCallback, 0, len(messages))
305+
for _, msg := range messages {
306+
candidate := append(current, msg)
307+
_, candidateBytes := newBatchRequest(candidate)
308+
if len(current) > 0 && candidateBytes > maxRequestSize {
309+
batches = append(batches, current)
310+
current = []*messageWithCallback{msg}
311+
continue
312+
}
313+
current = candidate
314+
}
315+
if len(current) > 0 {
316+
batches = append(batches, current)
317+
}
318+
return batches
319+
}
320+
321+
func newBatchRequest(messages []*messageWithCallback) (*chipingress.CloudEventBatch, int) {
322+
events := make([]*chipingress.CloudEventPb, len(messages))
323+
for i, msg := range messages {
324+
events[i] = msg.event
325+
}
326+
batchReq := &chipingress.CloudEventBatch{Events: events}
327+
return batchReq, proto.Size(batchReq)
328+
}
329+
283330
// WithBatchSize sets the number of messages to accumulate before sending a batch
284331
func WithBatchSize(batchSize int) Opt {
285332
return func(c *Client) {

pkg/chipingress/batch/client_test.go

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,15 @@ import (
88
"testing"
99
"time"
1010

11+
cepb "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
1112
"github.com/stretchr/testify/assert"
1213
"github.com/stretchr/testify/mock"
1314
"github.com/stretchr/testify/require"
1415
"go.opentelemetry.io/otel"
1516
"go.opentelemetry.io/otel/attribute"
1617
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
1718
"go.opentelemetry.io/otel/sdk/metric/metricdata"
19+
"google.golang.org/protobuf/proto"
1820

1921
"github.com/smartcontractkit/chainlink-common/pkg/chipingress"
2022
"github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks"
@@ -211,6 +213,112 @@ func TestSendBatch(t *testing.T) {
211213

212214
mockClient.AssertExpectations(t)
213215
})
216+
217+
t.Run("splits oversized batch by max gRPC request size", func(t *testing.T) {
218+
events := []*chipingress.CloudEventPb{
219+
largeTestEvent("test-id-1"),
220+
largeTestEvent("test-id-2"),
221+
largeTestEvent("test-id-3"),
222+
largeTestEvent("test-id-4"),
223+
largeTestEvent("test-id-5"),
224+
}
225+
maxRequestSize := proto.Size(&chipingress.CloudEventBatch{Events: events[:2]})
226+
require.LessOrEqual(t, proto.Size(&chipingress.CloudEventBatch{Events: events[:1]}), maxRequestSize)
227+
require.Greater(t, proto.Size(&chipingress.CloudEventBatch{Events: events[:3]}), maxRequestSize)
228+
229+
mockClient := mocks.NewClient(t)
230+
done := make(chan struct{})
231+
callbackDone := make(chan error, len(events))
232+
var mu sync.Mutex
233+
var publishedIDs []string
234+
var publishedSizes []int
235+
236+
mockClient.
237+
On("PublishBatch",
238+
mock.Anything,
239+
mock.MatchedBy(func(batch *chipingress.CloudEventBatch) bool {
240+
return len(batch.Events) > 0 && proto.Size(batch) <= maxRequestSize
241+
}),
242+
).
243+
Return(&chipingress.PublishResponse{}, nil).
244+
Run(func(args mock.Arguments) {
245+
batch := args.Get(1).(*chipingress.CloudEventBatch)
246+
mu.Lock()
247+
for _, event := range batch.Events {
248+
publishedIDs = append(publishedIDs, event.Id)
249+
}
250+
publishedSizes = append(publishedSizes, proto.Size(batch))
251+
if len(publishedIDs) == len(events) {
252+
close(done)
253+
}
254+
mu.Unlock()
255+
}).
256+
Times(3)
257+
258+
client, err := NewBatchClient(mockClient, WithMaxGRPCRequestSize(maxRequestSize))
259+
require.NoError(t, err)
260+
261+
messages := make([]*messageWithCallback, 0, len(events))
262+
for _, event := range events {
263+
messages = append(messages, &messageWithCallback{
264+
event: event,
265+
callback: func(err error) {
266+
callbackDone <- err
267+
},
268+
})
269+
}
270+
271+
client.sendBatch(t.Context(), messages)
272+
273+
select {
274+
case <-done:
275+
case <-time.After(time.Second):
276+
t.Fatal("timeout waiting for split batches to be sent")
277+
}
278+
for range events {
279+
select {
280+
case err := <-callbackDone:
281+
require.NoError(t, err)
282+
case <-time.After(time.Second):
283+
t.Fatal("timeout waiting for split batch callback")
284+
}
285+
}
286+
287+
assert.Equal(t, []string{"test-id-1", "test-id-2", "test-id-3", "test-id-4", "test-id-5"}, publishedIDs)
288+
for _, size := range publishedSizes {
289+
assert.LessOrEqual(t, size, maxRequestSize)
290+
}
291+
mockClient.AssertExpectations(t)
292+
})
293+
294+
t.Run("doesn't publish a single event over max gRPC request size", func(t *testing.T) {
295+
mockClient := mocks.NewClient(t)
296+
callbackDone := make(chan error, 1)
297+
event := largeTestEvent("oversized-id")
298+
maxRequestSize := proto.Size(&chipingress.CloudEventBatch{Events: []*chipingress.CloudEventPb{event}}) - 1
299+
300+
client, err := NewBatchClient(mockClient, WithMaxGRPCRequestSize(maxRequestSize))
301+
require.NoError(t, err)
302+
303+
client.sendBatch(t.Context(), []*messageWithCallback{
304+
{
305+
event: event,
306+
callback: func(err error) {
307+
callbackDone <- err
308+
},
309+
},
310+
})
311+
312+
select {
313+
case err := <-callbackDone:
314+
require.Error(t, err)
315+
assert.Contains(t, err.Error(), "exceeds max gRPC request size")
316+
case <-time.After(time.Second):
317+
t.Fatal("timeout waiting for oversized batch callback")
318+
}
319+
320+
mockClient.AssertNotCalled(t, "PublishBatch", mock.Anything, mock.Anything)
321+
})
214322
}
215323

216324
func TestStart(t *testing.T) {
@@ -903,6 +1011,18 @@ func countCounters(counters *sync.Map) int {
9031011
return n
9041012
}
9051013

1014+
func largeTestEvent(id string) *chipingress.CloudEventPb {
1015+
return &chipingress.CloudEventPb{
1016+
Id: id,
1017+
Source: "test-source",
1018+
Type: "test.event.type",
1019+
SpecVersion: "1.0",
1020+
Data: &cepb.CloudEvent_BinaryData{
1021+
BinaryData: []byte("0123456789abcdefghijklmnopqrstuvwxyz"),
1022+
},
1023+
}
1024+
}
1025+
9061026
func TestSeqnum(t *testing.T) {
9071027
t.Run("dropped messages consume seqnum and create detectable gaps", func(t *testing.T) {
9081028
client, err := NewBatchClient(nil, WithMessageBuffer(1))

0 commit comments

Comments
 (0)