Skip to content

Commit abf7486

Browse files
committed
batch: reserve 10KiB gRPC framing overhead in split and add batch_splits_total metric
1 parent 11a4f36 commit abf7486

2 files changed

Lines changed: 147 additions & 19 deletions

File tree

pkg/chipingress/batch/client.go

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,11 @@ type seqnumKey struct {
3131

3232
// Client is a batching client that accumulates messages and sends them in batches.
3333
type Client struct {
34-
client chipingress.Client
35-
batchSize int
36-
maxGRPCRequestSize int
37-
cloneEvent bool
34+
client chipingress.Client
35+
batchSize int
36+
maxGRPCRequestSize int // configured max, used for metrics/error reporting
37+
effectiveMaxRequestSize int // maxGRPCRequestSize minus grpcFramingOverhead, used for splitting
38+
cloneEvent bool
3839
maxConcurrentSends chan struct{}
3940
batchInterval time.Duration
4041
maxPublishTimeout time.Duration
@@ -57,6 +58,7 @@ type batchClientMetrics struct {
5758
requestSizeBytes otelmetric.Int64Histogram
5859
requestLatencyMS otelmetric.Float64Histogram
5960
configInfo otelmetric.Int64Gauge
61+
batchSplitsTotal otelmetric.Int64Counter
6062
batchSizeAttr otelmetric.MeasurementOption
6163
maxGRPCReqSizeAttr otelmetric.MeasurementOption
6264
successStatusAttr otelmetric.MeasurementOption
@@ -69,11 +71,12 @@ type Opt func(*Client)
6971
// NewBatchClient creates a new batching client with the given options.
7072
func NewBatchClient(client chipingress.Client, opts ...Opt) (*Client, error) {
7173
c := &Client{
72-
client: client,
73-
log: zap.NewNop().Sugar(),
74-
batchSize: 10,
75-
maxGRPCRequestSize: 10 * 1024 * 1024,
76-
cloneEvent: true,
74+
client: client,
75+
log: zap.NewNop().Sugar(),
76+
batchSize: 10,
77+
maxGRPCRequestSize: 10 * 1024 * 1024,
78+
effectiveMaxRequestSize: 10*1024*1024 - grpcFramingOverhead,
79+
cloneEvent: true,
7780
maxConcurrentSends: make(chan struct{}, 1),
7881
messageBuffer: make(chan *messageWithCallback, 200),
7982
batchInterval: 100 * time.Millisecond,
@@ -260,7 +263,11 @@ func (b *Client) sendBatch(ctx context.Context, messages []*messageWithCallback)
260263
go func() {
261264
defer func() { <-b.maxConcurrentSends }()
262265

263-
for _, batchMessages := range splitMessagesByRequestSize(messages, b.maxGRPCRequestSize) {
266+
splitBatches := splitMessagesByRequestSize(messages, b.effectiveMaxRequestSize)
267+
if len(splitBatches) > 1 {
268+
b.metrics.batchSplitsTotal.Add(ctx, 1)
269+
}
270+
for _, batchMessages := range splitBatches {
264271
batchReq, batchBytes := newBatchRequest(batchMessages)
265272
if b.maxGRPCRequestSize > 0 && batchBytes > b.maxGRPCRequestSize {
266273
err := fmt.Errorf("publish batch serialized size %d exceeds max gRPC request size %d", batchBytes, b.maxGRPCRequestSize)
@@ -298,6 +305,15 @@ func (b *Client) completeBatchCallbacks(messages []*messageWithCallback, err err
298305
})
299306
}
300307

308+
// grpcFramingOverhead accounts for gRPC framing, HTTP/2 headers, auth tokens,
309+
// tracing metadata, and other per-request overhead not captured by proto.Size.
310+
const grpcFramingOverhead = 10 * 1024 // 10 KiB
311+
312+
// minMaxGRPCRequestSize is the minimum allowed value for maxGRPCRequestSize.
313+
// Values below this threshold are clamped to ensure the framing overhead
314+
// reservation remains meaningful.
315+
const minMaxGRPCRequestSize = 1024 * 1024 // 1 MiB
316+
301317
func splitMessagesByRequestSize(messages []*messageWithCallback, maxRequestSize int) [][]*messageWithCallback {
302318
if len(messages) == 0 {
303319
return nil
@@ -340,10 +356,14 @@ func WithBatchSize(batchSize int) Opt {
340356
}
341357
}
342358

343-
// WithMaxGRPCRequestSize sets the max gRPC request size in bytes used for metric comparison attributes.
359+
// WithMaxGRPCRequestSize sets the max gRPC request size in bytes used for splitting batches.
360+
// Values below minMaxGRPCRequestSize (1 MiB) are clamped up to ensure the framing
361+
// overhead reservation remains meaningful.
344362
func WithMaxGRPCRequestSize(maxReqSize int) Opt {
345363
return func(c *Client) {
346-
c.maxGRPCRequestSize = maxReqSize
364+
clamped := max(maxReqSize, minMaxGRPCRequestSize)
365+
c.maxGRPCRequestSize = clamped
366+
c.effectiveMaxRequestSize = clamped - grpcFramingOverhead
347367
}
348368
}
349369

@@ -439,6 +459,14 @@ func newBatchClientMetrics() (batchClientMetrics, error) {
439459
if err != nil {
440460
return batchClientMetrics{}, err
441461
}
462+
batchSplitsTotal, err := meter.Int64Counter(
463+
"chip_ingress.batch.batch_splits_total",
464+
otelmetric.WithDescription("Total number of times a batch was split due to exceeding the effective gRPC request size limit (max request size minus reserved framing overhead)"),
465+
otelmetric.WithUnit("{split}"),
466+
)
467+
if err != nil {
468+
return batchClientMetrics{}, err
469+
}
442470

443471
return batchClientMetrics{
444472
sendRequestsTotal: sendRequestsTotal,
@@ -447,6 +475,7 @@ func newBatchClientMetrics() (batchClientMetrics, error) {
447475
requestSizeBytes: requestSizeBytes,
448476
requestLatencyMS: requestLatencyMS,
449477
configInfo: configInfo,
478+
batchSplitsTotal: batchSplitsTotal,
450479
successStatusAttr: otelmetric.WithAttributeSet(attribute.NewSet(
451480
attribute.String("status", "success"),
452481
)),

pkg/chipingress/batch/client_test.go

Lines changed: 106 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,43 @@ func TestNewBatchClient(t *testing.T) {
6464
assert.Equal(t, 1000, cap(client.messageBuffer))
6565
})
6666

67+
t.Run("WithMaxGRPCRequestSize", func(t *testing.T) {
68+
t.Run("applies value at or above minimum", func(t *testing.T) {
69+
client, err := NewBatchClient(mocks.NewClient(t), WithMaxGRPCRequestSize(4*1024*1024))
70+
require.NoError(t, err)
71+
assert.Equal(t, 4*1024*1024, client.maxGRPCRequestSize)
72+
assert.Equal(t, 4*1024*1024-grpcFramingOverhead, client.effectiveMaxRequestSize)
73+
})
74+
75+
t.Run("clamps value below minimum to minMaxGRPCRequestSize", func(t *testing.T) {
76+
client, err := NewBatchClient(mocks.NewClient(t), WithMaxGRPCRequestSize(512))
77+
require.NoError(t, err)
78+
assert.Equal(t, minMaxGRPCRequestSize, client.maxGRPCRequestSize)
79+
assert.Equal(t, minMaxGRPCRequestSize-grpcFramingOverhead, client.effectiveMaxRequestSize)
80+
})
81+
82+
t.Run("clamps zero to minMaxGRPCRequestSize", func(t *testing.T) {
83+
client, err := NewBatchClient(mocks.NewClient(t), WithMaxGRPCRequestSize(0))
84+
require.NoError(t, err)
85+
assert.Equal(t, minMaxGRPCRequestSize, client.maxGRPCRequestSize)
86+
assert.Equal(t, minMaxGRPCRequestSize-grpcFramingOverhead, client.effectiveMaxRequestSize)
87+
})
88+
89+
t.Run("clamps negative to minMaxGRPCRequestSize", func(t *testing.T) {
90+
client, err := NewBatchClient(mocks.NewClient(t), WithMaxGRPCRequestSize(-1))
91+
require.NoError(t, err)
92+
assert.Equal(t, minMaxGRPCRequestSize, client.maxGRPCRequestSize)
93+
assert.Equal(t, minMaxGRPCRequestSize-grpcFramingOverhead, client.effectiveMaxRequestSize)
94+
})
95+
96+
t.Run("exact minimum is accepted as-is", func(t *testing.T) {
97+
client, err := NewBatchClient(mocks.NewClient(t), WithMaxGRPCRequestSize(minMaxGRPCRequestSize))
98+
require.NoError(t, err)
99+
assert.Equal(t, minMaxGRPCRequestSize, client.maxGRPCRequestSize)
100+
assert.Equal(t, minMaxGRPCRequestSize-grpcFramingOverhead, client.effectiveMaxRequestSize)
101+
})
102+
})
103+
67104
t.Run("records failure metrics when request exceeds configured max grpc size", func(t *testing.T) {
68105
reader, restore := useTestMeterProvider(t)
69106
defer restore()
@@ -79,9 +116,11 @@ func TestNewBatchClient(t *testing.T) {
79116
WithBatchSize(1),
80117
WithBatchInterval(time.Second),
81118
WithMessageBuffer(10),
82-
WithMaxGRPCRequestSize(maxGRPCSize),
83119
)
84120
require.NoError(t, err)
121+
client.maxGRPCRequestSize = maxGRPCSize
122+
client.effectiveMaxRequestSize = maxGRPCSize
123+
require.NoError(t, err)
85124
client.Start(t.Context())
86125

87126
err = client.QueueMessage(&chipingress.CloudEventPb{
@@ -303,8 +342,10 @@ func TestSendBatch(t *testing.T) {
303342
}).
304343
Times(3)
305344

306-
client, err := NewBatchClient(mockClient, WithMaxGRPCRequestSize(maxRequestSize))
345+
client, err := NewBatchClient(mockClient)
307346
require.NoError(t, err)
347+
client.maxGRPCRequestSize = maxRequestSize
348+
client.effectiveMaxRequestSize = maxRequestSize
308349

309350
messages := make([]*messageWithCallback, 0, len(events))
310351
for _, event := range events {
@@ -339,14 +380,71 @@ func TestSendBatch(t *testing.T) {
339380
mockClient.AssertExpectations(t)
340381
})
341382

383+
t.Run("records batch_splits_total metric when batch is split", func(t *testing.T) {
384+
reader, restore := useTestMeterProvider(t)
385+
defer restore()
386+
387+
events := []*chipingress.CloudEventPb{
388+
largeTestEvent("split-metric-1"),
389+
largeTestEvent("split-metric-2"),
390+
largeTestEvent("split-metric-3"),
391+
}
392+
// Set maxRequestSize so that 2 events fit but 3 do not, forcing a split.
393+
maxRequestSize := proto.Size(&chipingress.CloudEventBatch{Events: events[:2]})
394+
395+
mockClient := mocks.NewClient(t)
396+
done := make(chan struct{})
397+
var mu sync.Mutex
398+
var publishCount int
399+
400+
mockClient.
401+
On("PublishBatch", mock.Anything, mock.Anything).
402+
Return(&chipingress.PublishResponse{}, nil).
403+
Run(func(args mock.Arguments) {
404+
mu.Lock()
405+
publishCount++
406+
if publishCount == 2 {
407+
close(done)
408+
}
409+
mu.Unlock()
410+
})
411+
412+
client, err := NewBatchClient(mockClient)
413+
require.NoError(t, err)
414+
client.maxGRPCRequestSize = maxRequestSize
415+
client.effectiveMaxRequestSize = maxRequestSize
416+
417+
messages := make([]*messageWithCallback, 0, len(events))
418+
for _, event := range events {
419+
messages = append(messages, &messageWithCallback{event: event})
420+
}
421+
422+
client.sendBatch(t.Context(), messages)
423+
424+
select {
425+
case <-done:
426+
case <-time.After(time.Second):
427+
t.Fatal("timeout waiting for split batches to be sent")
428+
}
429+
430+
rm := collectResourceMetrics(t, reader)
431+
splitsMetric := mustMetric(t, rm, "chip_ingress.batch.batch_splits_total")
432+
splitsSum, ok := splitsMetric.Data.(metricdata.Sum[int64])
433+
require.True(t, ok)
434+
require.Len(t, splitsSum.DataPoints, 1)
435+
assert.Equal(t, int64(1), splitsSum.DataPoints[0].Value)
436+
})
437+
342438
t.Run("doesn't publish a single event over max gRPC request size", func(t *testing.T) {
343439
mockClient := mocks.NewClient(t)
344440
callbackDone := make(chan error, 1)
345441
event := largeTestEvent("oversized-id")
346442
maxRequestSize := proto.Size(&chipingress.CloudEventBatch{Events: []*chipingress.CloudEventPb{event}}) - 1
347443

348-
client, err := NewBatchClient(mockClient, WithMaxGRPCRequestSize(maxRequestSize))
444+
client, err := NewBatchClient(mockClient)
349445
require.NoError(t, err)
446+
client.maxGRPCRequestSize = maxRequestSize
447+
client.effectiveMaxRequestSize = maxRequestSize
350448

351449
client.sendBatch(t.Context(), []*messageWithCallback{
352450
{
@@ -1350,7 +1448,7 @@ func TestBatchClient_Metrics(t *testing.T) {
13501448
WithBatchSize(1),
13511449
WithBatchInterval(time.Second),
13521450
WithMessageBuffer(10),
1353-
WithMaxGRPCRequestSize(2048),
1451+
WithMaxGRPCRequestSize(minMaxGRPCRequestSize),
13541452
)
13551453
require.NoError(t, err)
13561454
client.Start(t.Context())
@@ -1386,7 +1484,7 @@ func TestBatchClient_Metrics(t *testing.T) {
13861484
reqSize := mustMetric(t, rm, "chip_ingress.batch.request_size_bytes")
13871485
reqSizeHist, ok := reqSize.Data.(metricdata.Histogram[int64])
13881486
require.True(t, ok)
1389-
reqSizePoint := mustInt64HistogramPointWithIntAttr(t, reqSizeHist, "max_grpc_request_size_bytes", 2048)
1487+
reqSizePoint := mustInt64HistogramPointWithIntAttr(t, reqSizeHist, "max_grpc_request_size_bytes", minMaxGRPCRequestSize)
13901488
assert.GreaterOrEqual(t, reqSizePoint.Count, uint64(1))
13911489

13921490
latency := mustMetric(t, rm, "chip_ingress.batch.request_latency_ms")
@@ -1401,7 +1499,7 @@ func TestBatchClient_Metrics(t *testing.T) {
14011499
require.NotEmpty(t, configGauge.DataPoints)
14021500
assert.Equal(t, int64(1), configGauge.DataPoints[0].Value)
14031501
assert.True(t, hasIntAttr(configGauge.DataPoints[0].Attributes, "max_batch_size", 1))
1404-
assert.True(t, hasIntAttr(configGauge.DataPoints[0].Attributes, "max_grpc_request_size_bytes", 2048))
1502+
assert.True(t, hasIntAttr(configGauge.DataPoints[0].Attributes, "max_grpc_request_size_bytes", minMaxGRPCRequestSize))
14051503
})
14061504

14071505
t.Run("records failure counters and latency", func(t *testing.T) {
@@ -1540,11 +1638,12 @@ func BenchmarkSendBatch(b *testing.B) {
15401638
WithBatchSize(100),
15411639
WithMessageBuffer(b.N*100+10),
15421640
WithBatchInterval(time.Hour),
1543-
WithMaxGRPCRequestSize(512),
15441641
)
15451642
if err != nil {
15461643
b.Fatal(err)
15471644
}
1645+
client.maxGRPCRequestSize = 512
1646+
client.effectiveMaxRequestSize = 512
15481647
client.Start(b.Context())
15491648
defer client.Stop()
15501649

0 commit comments

Comments
 (0)