[AWSINTS-3449] feat(go-forwarder): add batching#1100

Merged
ndakkoune merged 3 commits into nabil.dakkoune/go-forwarder from nabil.dakkoune/AWSINTS-3449 on Apr 10, 2026


@ndakkoune ndakkoune commented Apr 3, 2026

What does this PR do?

Add the batching logic.

Motivation

Testing Guidelines

make test

Additional Notes

Increased batch size, items per batch and max item size to better reflect what's in the documentation/backend.

Types of changes

  • Bug fix
  • New feature
  • Breaking change
  • Misc (docs, refactoring, dependency upgrade, etc.)

Check all that apply

  • This PR's description is comprehensive
  • This PR contains breaking changes that are documented in the description
  • This PR introduces new APIs or parameters that are documented and unlikely to change in the foreseeable future
  • This PR impacts documentation, and it has been updated (or a ticket has been logged)
  • This PR's changes are covered by the automated tests
  • This PR collects user input/sensitive content into Datadog
  • This PR passes the integration tests (ask a Datadog member to run the tests)
  • This PR passes the unit tests
  • This PR passes the installation tests (ask a Datadog member to run the tests)

@ndakkoune ndakkoune requested a review from a team as a code owner April 3, 2026 13:48
@github-actions github-actions Bot added the aws label Apr 3, 2026
@ndakkoune ndakkoune changed the title from "[AWSX-3449] feat(go-forwarder): add batching" to "[AWSINTS-3449] feat(go-forwarder): add batching" Apr 7, 2026
@ndakkoune ndakkoune force-pushed the nabil.dakkoune/AWSINTS-3449 branch from 023f210 to 0d60d8d April 8, 2026 09:05
@ge0Aja ge0Aja self-assigned this Apr 8, 2026
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent"
)

const (
@ge0Aja (Contributor), Apr 8, 2026

Let's make this a proper struct that accepts an input channel/interface and has methods like Batch and flush.
Also prefer to have flush as an independent method, not sure why you chose to create it as a var inside Batch

@ndakkoune (Contributor, Author)

> Also prefer to have flush as an independent method, not sure why you chose to create it as a var inside Batch

It was to leverage variable closures.

@ge0Aja (Contributor)

And what would be the benefit of that?

@ndakkoune (Contributor, Author)

Doing so avoided the need for a generic struct and kept the flushing logic to very few lines of code, but that version is now outdated.
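
For readers following this thread, here is a minimal sketch of what a closure-based flush can look like. It is not the code from the outdated revision; `batchWithClosure` and its parameters are hypothetical names chosen for illustration. The point it shows is that a local `flush` closure can read and reset the enclosing function's variables, so no struct is needed to hold the batching state.

```go
package main

import "fmt"

// batchWithClosure groups items into batches of at most maxItems.
// flush is a local closure: it captures current and batches directly,
// so no struct is needed to carry the state between calls.
func batchWithClosure(items [][]byte, maxItems int) [][][]byte {
	var batches [][][]byte
	var current [][]byte

	flush := func() {
		if len(current) == 0 {
			return
		}
		batches = append(batches, current)
		current = nil // start a fresh slice; the flushed one now belongs to batches
	}

	for _, item := range items {
		current = append(current, item)
		if len(current) >= maxItems {
			flush()
		}
	}
	flush() // emit the trailing partial batch, if any
	return batches
}

func main() {
	items := [][]byte{[]byte("a"), []byte("b"), []byte("c")}
	for i, b := range batchWithClosure(items, 2) {
		fmt.Printf("batch %d: %d item(s)\n", i, len(b))
	}
}
```

The trade-off raised in the review still applies: a closure keeps the code short, while a struct with an independent flush method is easier to test and reuse on its own.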

)

const (
maxItemSize = 512 * 1024
@ge0Aja (Contributor)

Could you double-check the current limits we have in the Python crawler, and whether they are set for characters or byte arrays?

@ndakkoune (Contributor, Author)

I deliberately increased the values to reduce the number of API calls we make to DD, since network calls are by far the most time-consuming operations in the forwarder.
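
As a rough illustration of the argument above, the number of API calls scales inversely with the per-batch item limit. The sketch below is an assumption-laden back-of-the-envelope calculation: `requestsNeeded` is a hypothetical helper, the workload size and the limit of 100 are made-up numbers, and byte-size limits are ignored. Only the value 1000 comes from this PR's diff (`maxItemsPerBatch`).

```go
package main

import "fmt"

// requestsNeeded returns how many batches (and therefore API calls) are
// needed to send n items when each batch holds at most perBatch items.
// Byte-size limits are deliberately ignored in this sketch.
func requestsNeeded(n, perBatch int) int {
	if n <= 0 || perBatch <= 0 {
		return 0
	}
	return (n + perBatch - 1) / perBatch // ceiling division
}

func main() {
	const items = 25000 // hypothetical workload
	// 100 is a hypothetical smaller limit; 1000 matches maxItemsPerBatch in the diff.
	for _, perBatch := range []int{100, 1000} {
		fmt.Printf("perBatch=%d -> %d requests\n", perBatch, requestsNeeded(items, perBatch))
	}
}
```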

Comment thread aws/logs_monitoring_go/internal/processing/batching.go

ge0Aja commented Apr 8, 2026

General comment: if you've already merged the forwarding part, please rebase this PR and use the batcher in the forward call.

@ndakkoune (Contributor, Author)

> General comment: if you've already merged the forwarding part, please rebase this PR and use the batcher in the forward call.

@ge0Aja IMO, I don't think we should make the forwarding part explicitly call the batching one. Separating the two keeps them decoupled and lets the future X forwarding (goroutine) workers read the out channel fed by the Y batcher (goroutine) workers. The main pipeline stage will take care of wiring them together.
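
A minimal sketch of the wiring described in this comment, under stated assumptions: `runPipeline` and every name inside it are hypothetical, a single batcher goroutine stands in for the Y batcher workers, and "forwarding" is reduced to counting items. It only shows that the batching stage and the forwarding workers can communicate solely through an out channel, with the pipeline function doing the wiring.

```go
package main

import (
	"fmt"
	"sync"
)

// runPipeline wires one batching stage to several forwarding workers through
// an out channel and returns the total number of items the workers received.
func runPipeline(items []string, batchSize, forwarders int) int {
	in := make(chan string)
	out := make(chan []string)

	// Feed the input channel, then close it to signal end of input.
	go func() {
		defer close(in)
		for _, it := range items {
			in <- it
		}
	}()

	// Batching stage: knows nothing about forwarding, only about out.
	go func() {
		defer close(out)
		var current []string
		for it := range in {
			current = append(current, it)
			if len(current) >= batchSize {
				out <- current
				current = nil
			}
		}
		if len(current) > 0 {
			out <- current // trailing partial batch
		}
	}()

	// Forwarding stage: workers only read batches from out.
	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		total int
	)
	for i := 0; i < forwarders; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for batch := range out {
				mu.Lock()
				total += len(batch) // stand-in for sending the batch over the network
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return total
}

func main() {
	fmt.Println(runPipeline([]string{"a", "b", "c", "d", "e"}, 2, 3)) // prints 5
}
```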

@ndakkoune ndakkoune requested a review from ge0Aja April 9, 2026 09:52
maxItemsPerBatch = 1000
)

type Batcher[T any] struct {
@ge0Aja (Contributor)

Do we need this to be generic? AFAIU we only expect byte arrays.

@ndakkoune (Contributor, Author)

For the next milestone, T will be either a CloudwatchLogEntry or an S3LogEntry, hence the generics.

@ge0Aja (Contributor)

OK, I asked deliberately since the previous version expected byte arrays and batched them together, while in this version we're both marshalling and batching at once.

@ndakkoune (Contributor, Author)

Np, I'm making a little drawing to show how the types/data flow through the stages, so that everyone is on the same wavelength.

@ge0Aja (Contributor)

Yep, that would be great (for a small tech design doc). I'll approve for now.
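
To make the "marshalling and batching at once" point in this thread concrete, here is a hedged sketch of a generic function that does both. It is not the PR's `Batcher[T]`: `marshalAndBatch`, `logEntry`, the `maxBatchSize` value, and the drop-oversized-items policy are all assumptions for illustration. Only `maxItemSize` and `maxItemsPerBatch` use values visible in the diff.

```go
package main

import (
	"encoding/json"
	"fmt"
)

const (
	maxItemSize      = 512 * 1024      // value shown in the diff
	maxItemsPerBatch = 1000            // value shown in the diff
	maxBatchSize     = 4 * 1024 * 1024 // hypothetical; the PR's value is not shown here
)

// logEntry is a hypothetical stand-in for the entry types mentioned in the thread.
type logEntry struct {
	Message string `json:"message"`
}

// marshalAndBatch marshals each item to JSON and groups the encoded payloads
// into batches bounded by item count and total byte size. Payloads larger
// than maxItemSize are dropped (an assumption, not the PR's policy).
func marshalAndBatch[T any](items []T) ([][][]byte, error) {
	var (
		batches [][][]byte
		current [][]byte
		size    int
	)
	for _, item := range items {
		payload, err := json.Marshal(item)
		if err != nil {
			return nil, fmt.Errorf("marshal item: %w", err)
		}
		if len(payload) > maxItemSize {
			continue
		}
		// Flush first if adding this payload would break either limit.
		if len(current) > 0 && (len(current) >= maxItemsPerBatch || size+len(payload) > maxBatchSize) {
			batches = append(batches, current)
			current, size = nil, 0
		}
		current = append(current, payload)
		size += len(payload)
	}
	if len(current) > 0 {
		batches = append(batches, current)
	}
	return batches, nil
}

func main() {
	entries := make([]logEntry, 2500)
	for i := range entries {
		entries[i] = logEntry{Message: fmt.Sprintf("line %d", i)}
	}
	batches, err := marshalAndBatch(entries)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(batches)) // 3 with these small entries: 1000 + 1000 + 500
}
```

Because the type parameter is only constrained by `any`, the same function would accept other entry types as long as `encoding/json` can marshal them.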


ge0Aja commented Apr 9, 2026

> > General comment: if you've already merged the forwarding part, please rebase this PR and use the batcher in the forward call.

> @ge0Aja IMO, I don't think we should make the forwarding part explicitly call the batching one. Separating the two keeps them decoupled and lets the future X forwarding (goroutine) workers read the out channel fed by the Y batcher (goroutine) workers. The main pipeline stage will take care of wiring them together.

I do think this is important, as I failed to understand how you wanted to use the forwarding/batching parts together, which could make some of my comments irrelevant. AFAIU, we wanted to pass raw byte arrays to the batcher, passing through the forwarder, but I'm a bit lost now that you're introducing a generic batcher, so I'm not sure how this is going to be used.
To clarify, I wanted this PR rebased so that I could understand how these two parts will be put together. You don't necessarily have to call one from the other, as you mentioned; keeping them separate is fine.

}
}

func (b *Batcher[T]) Batch(ctx context.Context) error {
@ge0Aja (Contributor)

nit: rename to Start

Comment thread aws/logs_monitoring_go/internal/processing/batching.go
@ndakkoune ndakkoune merged commit 8b178f1 into nabil.dakkoune/go-forwarder Apr 10, 2026
10 checks passed
@ndakkoune ndakkoune deleted the nabil.dakkoune/AWSINTS-3449 branch April 10, 2026 11:56