[AWSINTS-3449] feat(go-forwarder): add batching#1100
Conversation
023f210 to
0d60d8d
Compare
| "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent" | ||
| ) | ||
|
|
||
| const ( |
There was a problem hiding this comment.
let's have this as a proper struct that accepts an input channel / interface with methods like Batch and flush.
Also prefer to have flush as an independent method, not sure why you chose to create it as a var inside Batch
There was a problem hiding this comment.
Also prefer to have flush as an independent method, not sure why you chose to create it as a var inside Batch
It was to leverage variable closures.
There was a problem hiding this comment.
and what would be the benefit from that ?
There was a problem hiding this comment.
By doing so we avoided the need of a generic struct and made the flushing logic in very few lines of code, but it's outdated.
| ) | ||
|
|
||
| const ( | ||
| maxItemSize = 512 * 1024 |
There was a problem hiding this comment.
could you double check on the current limits that we have in the python crawler set for charcters or byte arrays ?
There was a problem hiding this comment.
I deliberately increased the values to optimize the number of API calls we make to DD since network calls are by far the highest time consuming operations of the forwarder.
|
general comment: if you've already merged the forwarding part please rebase this PR and use the batcher in the forward call |
@ge0Aja Imo don't think we should make the forwarding part explicitly call the batching one. By separating the two, we'll make them decoupled and let the future X forwarding (goroutine) workers read the |
| maxItemsPerBatch = 1000 | ||
| ) | ||
|
|
||
| type Batcher[T any] struct { |
There was a problem hiding this comment.
Do we need this to be generic ? afaiu we only expect byte arrays
There was a problem hiding this comment.
For the next milestone, T will be either a CloudwatchLogEntry or a S3LogEntry, thus using generics.
There was a problem hiding this comment.
ok, I deliberately asked since the previous version expected byte arrays and batched them together. while in this version we're both marshalling and batch at once.
There was a problem hiding this comment.
Np, I'm making a little drawing to show how the types/data flows through the stages to be on the same wave length for everyone
There was a problem hiding this comment.
yep it would be great (for a small tech design doc), I'll approve for now
I do think this is important as I failed to understand how you wanted to use the forwarding/batching parts together which could make some comments irrelevant. afaiu, we wanted to pass raw byte arrays to the batcher passing through the forwarder, but I'm a bit lost now since you're introducing a generic batcher so I'm not sure how's is this gonna be used. |
| } | ||
| } | ||
|
|
||
| func (b *Batcher[T]) Batch(ctx context.Context) error { |
What does this PR do?
Add the batching logic.
Motivation
Testing Guidelines
make testAdditional Notes
Increased batch size, items per batch and max item size to better reflect what's in the documentation/backend.
Types of changes
Check all that apply