Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 45 additions & 2 deletions plugin/output/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ type Config struct {
// > Enable split big batches
SplitBatch bool `json:"split_batch" default:"false"` // *

// > @3@4@5@6
// >
// > Enable partial shipment big batches
// > Only one flag out of two (SplitBatch or ItemizeBatch) can be initialized
ItemizeBatch bool `json:"itemize_batch" default:"false"` // *

// > @3@4@5@6
// >
// > Retention milliseconds for retry to DB.
Expand Down Expand Up @@ -334,9 +340,12 @@ func (p *Plugin) out(workerData *pipeline.WorkerData, batch *pipeline.Batch) err
var statusCode int
var err error

if p.config.SplitBatch {
switch true {
case p.config.SplitBatch:
statusCode, err = p.sendSplit(0, eventsCount, data.begin, data.outBuf)
} else {
case p.config.ItemizeBatch:
statusCode, err = p.sendItemize(0, eventsCount, data.begin, data.outBuf)
default:
statusCode, err = p.send(data.outBuf)
}

Expand Down Expand Up @@ -407,6 +416,40 @@ func (p *Plugin) sendSplit(left int, right int, begin []int, data []byte) (int,
return http.StatusOK, nil
}

func (p *Plugin) sendItemize(left int, right int, begin []int, data []byte) (statusCode int, err error) {
statusCode = http.StatusOK
err = nil
if left == right {
return statusCode, err
}

for i := left; i < right; i++ {
sc, r := p.client.DoTimeout(
http.MethodPost,
p.config.ContentType,
data[begin[i]:begin[i+1]],
p.config.ConnectionTimeout_,
nil)
if r != nil {
p.sendErrorMetric.WithLabelValues(strconv.Itoa(sc)).Inc()

switch sc {
case http.StatusRequestEntityTooLarge:
// can't save even one log
if right-i == 1 {
statusCode = sc
err = r
}
default:
statusCode = sc
err = r
}
}
}

return statusCode, err
}

func (p *Plugin) getAuthHeader() string {
if p.config.APIKey != "" {
return "ApiKey " + p.config.APIKey
Expand Down