Skip to content

Commit 5f4b2c3

Browse files
committed
batch: reserve 10KiB gRPC framing overhead in split and add batch_splits_total metric
1 parent 43fabe4 commit 5f4b2c3

1 file changed

Lines changed: 25 additions & 2 deletions

File tree

pkg/chipingress/batch/client.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ type batchClientMetrics struct {
5757
requestSizeBytes otelmetric.Int64Histogram
5858
requestLatencyMS otelmetric.Float64Histogram
5959
configInfo otelmetric.Int64Gauge
60+
batchSplitsTotal otelmetric.Int64Counter
6061
batchSizeAttr otelmetric.MeasurementOption
6162
maxGRPCReqSizeAttr otelmetric.MeasurementOption
6263
successStatusAttr otelmetric.MeasurementOption
@@ -260,7 +261,11 @@ func (b *Client) sendBatch(ctx context.Context, messages []*messageWithCallback)
260261
go func() {
261262
defer func() { <-b.maxConcurrentSends }()
262263

263-
for _, batchMessages := range splitMessagesByRequestSize(messages, b.maxGRPCRequestSize) {
264+
splitBatches := splitMessagesByRequestSize(messages, b.maxGRPCRequestSize)
265+
if len(splitBatches) > 1 {
266+
b.metrics.batchSplitsTotal.Add(ctx, 1)
267+
}
268+
for _, batchMessages := range splitBatches {
264269
batchReq, batchBytes := newBatchRequest(batchMessages)
265270
if b.maxGRPCRequestSize > 0 && batchBytes > b.maxGRPCRequestSize {
266271
err := fmt.Errorf("publish batch serialized size %d exceeds max gRPC request size %d", batchBytes, b.maxGRPCRequestSize)
@@ -298,6 +303,10 @@ func (b *Client) completeBatchCallbacks(messages []*messageWithCallback, err err
298303
})
299304
}
300305

306+
// grpcFramingOverhead accounts for gRPC framing, HTTP/2 headers, auth tokens,
307+
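// tracing metadata, and other per-request overhead not captured by proto.Size.
// splitMessagesByRequestSize subtracts this reserve from the configured maximum
// request size before splitting; when the configured maximum is not larger than
// the reserve, the full configured maximum is used as the split limit instead.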
308+
const grpcFramingOverhead = 10 * 1024 // 10 KiB
309+
301310
func splitMessagesByRequestSize(messages []*messageWithCallback, maxRequestSize int) [][]*messageWithCallback {
302311
if len(messages) == 0 {
303312
return nil
@@ -306,12 +315,17 @@ func splitMessagesByRequestSize(messages []*messageWithCallback, maxRequestSize
306315
return [][]*messageWithCallback{messages}
307316
}
308317

318+
effectiveMax := maxRequestSize - grpcFramingOverhead
319+
if effectiveMax <= 0 {
320+
effectiveMax = maxRequestSize
321+
}
322+
309323
var batches [][]*messageWithCallback
310324
current := make([]*messageWithCallback, 0, len(messages))
311325
for _, msg := range messages {
312326
candidate := append(current, msg)
313327
_, candidateBytes := newBatchRequest(candidate)
314-
if len(current) > 0 && candidateBytes > maxRequestSize {
328+
if len(current) > 0 && candidateBytes > effectiveMax {
315329
batches = append(batches, current)
316330
current = []*messageWithCallback{msg}
317331
continue
@@ -439,6 +453,14 @@ func newBatchClientMetrics() (batchClientMetrics, error) {
439453
if err != nil {
440454
return batchClientMetrics{}, err
441455
}
456+
batchSplitsTotal, err := meter.Int64Counter(
457+
"chip_ingress.batch.batch_splits_total",
458+
otelmetric.WithDescription("Total number of times a batch was split due to exceeding max gRPC request size"),
459+
otelmetric.WithUnit("{split}"),
460+
)
461+
if err != nil {
462+
return batchClientMetrics{}, err
463+
}
442464

443465
return batchClientMetrics{
444466
sendRequestsTotal: sendRequestsTotal,
@@ -447,6 +469,7 @@ func newBatchClientMetrics() (batchClientMetrics, error) {
447469
requestSizeBytes: requestSizeBytes,
448470
requestLatencyMS: requestLatencyMS,
449471
configInfo: configInfo,
472+
batchSplitsTotal: batchSplitsTotal,
450473
successStatusAttr: otelmetric.WithAttributeSet(attribute.NewSet(
451474
attribute.String("status", "success"),
452475
)),

0 commit comments

Comments
 (0)