@@ -8,13 +8,15 @@ import (
88 "testing"
99 "time"
1010
11+ cepb "github.com/cloudevents/sdk-go/binding/format/protobuf/v2/pb"
1112 "github.com/stretchr/testify/assert"
1213 "github.com/stretchr/testify/mock"
1314 "github.com/stretchr/testify/require"
1415 "go.opentelemetry.io/otel"
1516 "go.opentelemetry.io/otel/attribute"
1617 sdkmetric "go.opentelemetry.io/otel/sdk/metric"
1718 "go.opentelemetry.io/otel/sdk/metric/metricdata"
19+ "google.golang.org/protobuf/proto"
1820
1921 "github.com/smartcontractkit/chainlink-common/pkg/chipingress"
2022 "github.com/smartcontractkit/chainlink-common/pkg/chipingress/mocks"
@@ -211,6 +213,112 @@ func TestSendBatch(t *testing.T) {
211213
212214 mockClient .AssertExpectations (t )
213215 })
216+
217+ t .Run ("splits oversized batch by max gRPC request size" , func (t * testing.T ) {
218+ events := []* chipingress.CloudEventPb {
219+ largeTestEvent ("test-id-1" ),
220+ largeTestEvent ("test-id-2" ),
221+ largeTestEvent ("test-id-3" ),
222+ largeTestEvent ("test-id-4" ),
223+ largeTestEvent ("test-id-5" ),
224+ }
225+ maxRequestSize := proto .Size (& chipingress.CloudEventBatch {Events : events [:2 ]})
226+ require .LessOrEqual (t , proto .Size (& chipingress.CloudEventBatch {Events : events [:1 ]}), maxRequestSize )
227+ require .Greater (t , proto .Size (& chipingress.CloudEventBatch {Events : events [:3 ]}), maxRequestSize )
228+
229+ mockClient := mocks .NewClient (t )
230+ done := make (chan struct {})
231+ callbackDone := make (chan error , len (events ))
232+ var mu sync.Mutex
233+ var publishedIDs []string
234+ var publishedSizes []int
235+
236+ mockClient .
237+ On ("PublishBatch" ,
238+ mock .Anything ,
239+ mock .MatchedBy (func (batch * chipingress.CloudEventBatch ) bool {
240+ return len (batch .Events ) > 0 && proto .Size (batch ) <= maxRequestSize
241+ }),
242+ ).
243+ Return (& chipingress.PublishResponse {}, nil ).
244+ Run (func (args mock.Arguments ) {
245+ batch := args .Get (1 ).(* chipingress.CloudEventBatch )
246+ mu .Lock ()
247+ for _ , event := range batch .Events {
248+ publishedIDs = append (publishedIDs , event .Id )
249+ }
250+ publishedSizes = append (publishedSizes , proto .Size (batch ))
251+ if len (publishedIDs ) == len (events ) {
252+ close (done )
253+ }
254+ mu .Unlock ()
255+ }).
256+ Times (3 )
257+
258+ client , err := NewBatchClient (mockClient , WithMaxGRPCRequestSize (maxRequestSize ))
259+ require .NoError (t , err )
260+
261+ messages := make ([]* messageWithCallback , 0 , len (events ))
262+ for _ , event := range events {
263+ messages = append (messages , & messageWithCallback {
264+ event : event ,
265+ callback : func (err error ) {
266+ callbackDone <- err
267+ },
268+ })
269+ }
270+
271+ client .sendBatch (t .Context (), messages )
272+
273+ select {
274+ case <- done :
275+ case <- time .After (time .Second ):
276+ t .Fatal ("timeout waiting for split batches to be sent" )
277+ }
278+ for range events {
279+ select {
280+ case err := <- callbackDone :
281+ require .NoError (t , err )
282+ case <- time .After (time .Second ):
283+ t .Fatal ("timeout waiting for split batch callback" )
284+ }
285+ }
286+
287+ assert .Equal (t , []string {"test-id-1" , "test-id-2" , "test-id-3" , "test-id-4" , "test-id-5" }, publishedIDs )
288+ for _ , size := range publishedSizes {
289+ assert .LessOrEqual (t , size , maxRequestSize )
290+ }
291+ mockClient .AssertExpectations (t )
292+ })
293+
294+ t .Run ("doesn't publish a single event over max gRPC request size" , func (t * testing.T ) {
295+ mockClient := mocks .NewClient (t )
296+ callbackDone := make (chan error , 1 )
297+ event := largeTestEvent ("oversized-id" )
298+ maxRequestSize := proto .Size (& chipingress.CloudEventBatch {Events : []* chipingress.CloudEventPb {event }}) - 1
299+
300+ client , err := NewBatchClient (mockClient , WithMaxGRPCRequestSize (maxRequestSize ))
301+ require .NoError (t , err )
302+
303+ client .sendBatch (t .Context (), []* messageWithCallback {
304+ {
305+ event : event ,
306+ callback : func (err error ) {
307+ callbackDone <- err
308+ },
309+ },
310+ })
311+
312+ select {
313+ case err := <- callbackDone :
314+ require .Error (t , err )
315+ assert .Contains (t , err .Error (), "exceeds max gRPC request size" )
316+ case <- time .After (time .Second ):
317+ t .Fatal ("timeout waiting for oversized batch callback" )
318+ }
319+
320+ mockClient .AssertNotCalled (t , "PublishBatch" , mock .Anything , mock .Anything )
321+ })
214322}
215323
216324func TestStart (t * testing.T ) {
@@ -903,6 +1011,18 @@ func countCounters(counters *sync.Map) int {
9031011 return n
9041012}
9051013
1014+ func largeTestEvent (id string ) * chipingress.CloudEventPb {
1015+ return & chipingress.CloudEventPb {
1016+ Id : id ,
1017+ Source : "test-source" ,
1018+ Type : "test.event.type" ,
1019+ SpecVersion : "1.0" ,
1020+ Data : & cepb.CloudEvent_BinaryData {
1021+ BinaryData : []byte ("0123456789abcdefghijklmnopqrstuvwxyz" ),
1022+ },
1023+ }
1024+ }
1025+
9061026func TestSeqnum (t * testing.T ) {
9071027 t .Run ("dropped messages consume seqnum and create detectable gaps" , func (t * testing.T ) {
9081028 client , err := NewBatchClient (nil , WithMessageBuffer (1 ))
0 commit comments