From 0ef576011bff9eb660b4fc522cc1f62a717c27aa Mon Sep 17 00:00:00 2001 From: lixoi Date: Wed, 18 Mar 2026 18:05:13 +0300 Subject: [PATCH] update http output pugin --- plugin/output/http/http.go | 47 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 45 insertions(+), 2 deletions(-) diff --git a/plugin/output/http/http.go b/plugin/output/http/http.go index c3ed464ca..2885c8bdc 100644 --- a/plugin/output/http/http.go +++ b/plugin/output/http/http.go @@ -149,6 +149,12 @@ type Config struct { // > Enable split big batches SplitBatch bool `json:"split_batch" default:"false"` // * + // > @3@4@5@6 + // > + // > Enable partial shipment big batches + // > Only one flag out of two (SplitBatch or ItemizeBatch) can be initialized + ItemizeBatch bool `json:"itemize_batch" default:"false"` // * + // > @3@4@5@6 // > // > Retention milliseconds for retry to DB. @@ -334,9 +340,12 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err var statusCode int var err error - if p.config.SplitBatch { + switch true { + case p.config.SplitBatch: statusCode, err = p.sendSplit(0, eventsCount, data.begin, data.outBuf) - } else { + case p.config.ItemizeBatch: + statusCode, err = p.sendItemize(0, eventsCount, data.begin, data.outBuf) + default: statusCode, err = p.send(data.outBuf) } @@ -407,6 +416,40 @@ func (p *Plugin) sendSplit(left int, right int, begin []int, data []byte) (int, return http.StatusOK, nil } +func (p *Plugin) sendItemize(left int, right int, begin []int, data []byte) (statusCode int, err error) { + statusCode = http.StatusOK + err = nil + if left == right { + return statusCode, err + } + + for i := left; i < right; i++ { + sc, r := p.client.DoTimeout( + http.MethodPost, + p.config.ContentType, + data[begin[i]:begin[i+1]], + p.config.ConnectionTimeout_, + nil) + if r != nil { + p.sendErrorMetric.WithLabelValues(strconv.Itoa(sc)).Inc() + + switch sc { + case http.StatusRequestEntityTooLarge: + // can't save even one log + if right-i == 1 { + statusCode = sc + err = r + } + default: + statusCode = sc + err = r + } + } + } + + return statusCode, err +} + func (p *Plugin) getAuthHeader() string { if p.config.APIKey != "" { return "ApiKey " + p.config.APIKey