feat(go-forwarder): add SQS as failed event storage#1152

Open
ndakkoune wants to merge 18 commits into nabil.dakkoune/go-forwarder from nabil.dakkoune/AWSINTS-3666

Conversation

@ndakkoune ndakkoune commented Jun 9, 2026

Contributor

This PR adds SQS as a failed event storage.

storing.Storage needed a refactor, since it was not abstract enough to support different storage services. A new storing.Batch struct represents a failed batch from the forwarder. Batch.DeleteHandle uniquely identifies the stored batch in the underlying service (the object key for S3, the receipt handle for SQS) and is used to delete the batch once the retry succeeds.
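
To make the role of the delete handle concrete, here is a minimal sketch of how such an abstraction could look. Only the names Batch, Data, StorageTag and DeleteHandle come from this PR; the method set of Storage, the in-memory memStorage backend and the retryAll helper are assumptions made for illustration, not the code under review.

```go
package main

import (
	"errors"
	"fmt"
)

// Batch mirrors the idea of storing.Batch. The exact field types are assumed.
type Batch struct {
	Data         []byte // serialized failed events
	StorageTag   string // tag stored alongside the batch
	DeleteHandle string // S3 object key or SQS receipt handle
}

// Storage is an assumed shape for a storage-agnostic interface.
type Storage interface {
	Store(data []byte, tag string) error
	Fetch() ([]Batch, error)
	Delete(b Batch) error
}

// errSendFailed is a hypothetical error returned by a failing retry.
var errSendFailed = errors.New("send failed")

// memStorage is an in-memory stand-in for S3 or SQS (illustration only).
type memStorage struct {
	next    int
	batches map[string]Batch
}

func newMemStorage() *memStorage {
	return &memStorage{batches: map[string]Batch{}}
}

func (m *memStorage) Store(data []byte, tag string) error {
	m.next++
	handle := fmt.Sprintf("handle-%d", m.next)
	m.batches[handle] = Batch{Data: data, StorageTag: tag, DeleteHandle: handle}
	return nil
}

func (m *memStorage) Fetch() ([]Batch, error) {
	out := make([]Batch, 0, len(m.batches))
	for _, b := range m.batches {
		out = append(out, b)
	}
	return out, nil
}

func (m *memStorage) Delete(b Batch) error {
	if _, ok := m.batches[b.DeleteHandle]; !ok {
		return errors.New("unknown delete handle")
	}
	delete(m.batches, b.DeleteHandle)
	return nil
}

// retryAll resends every stored batch and deletes only those that succeed.
// It returns the number of batches deleted.
func retryAll(s Storage, send func(Batch) error) (int, error) {
	batches, err := s.Fetch()
	if err != nil {
		return 0, err
	}
	deleted := 0
	for _, b := range batches {
		if err := send(b); err != nil {
			continue // keep the batch stored for a later retry
		}
		if err := s.Delete(b); err != nil {
			return deleted, err
		}
		deleted++
	}
	return deleted, nil
}

func main() {
	s := newMemStorage()
	_ = s.Store([]byte(`{"message":"a"}`), "tag-a")
	_ = s.Store([]byte(`{"message":"b"}`), "tag-b")

	// Pretend that only batches tagged "tag-a" are accepted on retry.
	deleted, err := retryAll(s, func(b Batch) error {
		if b.StorageTag != "tag-a" {
			return errSendFailed
		}
		return nil
	})
	fmt.Println(deleted, err, len(s.batches))
}
```

The caller never needs to know whether the handle is a key or a receipt handle; it only passes the batch back to Delete.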

Differences from the Python implementation:

  • Store() uses sqs.SendMessageBatch, the batch equivalent of sqs.SendMessage, to minimize API calls.
  • retry_prefix, which was set to logs, metrics or traces and stored as a message attribute, is removed since we now only deal with logs.
  • function_prefix, which was the SHA-1 of the forwarder's ARN and was stored as a message attribute, is removed. It was used to tell messages sent by the forwarder apart from those of other consumers. Sharing one SQS queue between unrelated use cases is a bad design and would significantly increase API costs because of the noisy messages received.
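
As a rough illustration of the first point: SQS accepts at most 10 entries per SendMessageBatch request, so the stored batches have to be grouped before sending. The sketch below shows only that grouping step. The chunk helper is hypothetical, it does not call the AWS SDK, and it does not model the request-level payload cap.

```go
package main

import "fmt"

// maxEntriesPerBatch is the SQS limit on entries in one SendMessageBatch call.
const maxEntriesPerBatch = 10

// chunk splits messages into consecutive groups of at most size elements.
// Each group would become the entries of one SendMessageBatch request.
func chunk(messages []string, size int) [][]string {
	if size <= 0 {
		return nil
	}
	var chunks [][]string
	for start := 0; start < len(messages); start += size {
		end := start + size
		if end > len(messages) {
			end = len(messages)
		}
		chunks = append(chunks, messages[start:end])
	}
	return chunks
}

func main() {
	messages := make([]string, 25)
	for i := range messages {
		messages[i] = fmt.Sprintf("batch-%d", i)
	}
	for i, c := range chunk(messages, maxEntriesPerBatch) {
		fmt.Printf("SendMessageBatch call %d carries %d entries\n", i+1, len(c))
	}
}
```

With 25 stored batches this results in 3 API calls instead of the 25 that per-message sqs.SendMessage calls would need.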

Side notes:

  • We now have DD_STORE_FAILED_EVENTS, DD_S3_BUCKET_NAME and DD_SQS_QUEUE_URL, which all configure the same thing: storing failed events for the retry mechanism. IMO we should keep only one of these (since it is one or the other), or introduce a new DD_FAILED_EVENT_STORAGE that would directly take either an S3 bucket name or an SQS queue URL, if telling the two apart is feasible.
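
On the feasibility question, one possible heuristic is sketched below. It assumes queue URLs of the form https://sqs.<region>.amazonaws.com/<account>/<queue> and treats any other value without a slash or colon as a bucket name. The function classifyStorage and the kind constants are hypothetical, and legacy or VPC endpoint queue URLs are not covered.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

type storageKind int

const (
	kindUnknown storageKind = iota
	kindS3Bucket
	kindSQSQueue
)

// classifyStorage guesses whether value is an SQS queue URL or an S3 bucket
// name. This is a heuristic sketch, not a full validation of either format.
func classifyStorage(value string) storageKind {
	if value == "" {
		return kindUnknown
	}
	u, err := url.Parse(value)
	if err == nil && u.Scheme == "https" && strings.HasPrefix(u.Host, "sqs.") {
		return kindSQSQueue
	}
	// Bucket names cannot contain slashes or colons, so anything that still
	// looks like a URL or a path is rejected.
	if strings.ContainsAny(value, "/:") {
		return kindUnknown
	}
	return kindS3Bucket
}

func main() {
	for _, v := range []string{
		"my-failed-events-bucket",
		"https://sqs.us-east-1.amazonaws.com/123456789012/failed-events",
		"http://example.com/not-a-queue",
	} {
		fmt.Println(v, classifyStorage(v))
	}
}
```

If this kind of detection turns out to be too fragile, keeping two explicit variables remains the simpler option.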

@github-actions github-actions Bot added the aws label Jun 9, 2026
@ndakkoune ndakkoune requested a review from ge0Aja June 9, 2026 19:36
Comment thread aws/logs_monitoring_go/internal/batching/batcher.go Outdated
Comment thread aws/logs_monitoring_go/internal/storing/sqs.go Outdated
datadog-official Bot commented Jun 15, 2026

Contributor

Pipelines

⚠️ Warnings

🚦 1 Pipeline job failed

Go Forwarder | test   View in Datadog   GitHub Actions

🔗 Commit SHA: 4d9ae86

Base automatically changed from nabil.dakkoune/AWSINTS-3736 to nabil.dakkoune/go-forwarder June 15, 2026 15:33
@ndakkoune ndakkoune force-pushed the nabil.dakkoune/AWSINTS-3666 branch from e4b4d3a to 002e0bb Compare June 15, 2026 15:42
Comment thread aws/logs_monitoring_go/internal/batching/batcher.go Outdated
Comment thread aws/logs_monitoring_go/internal/batching/batcher.go Outdated
Comment thread aws/logs_monitoring_go/internal/storing/s3_test.go
)

const (
maxSizePerSQSMessage = 1000 * 1024 // Overhead for other attributes than message body

Contributor Author

The maximum SQS message size is 1 MiB.
However, that limit also has to cover the other attributes of sqs.SendMessageInput, so we have to leave some headroom below it.
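
For reference, the arithmetic behind that headroom can be written out as below. Only maxSizePerSQSMessage and its value come from the diff; the names sqsMaxMessageSize, attributeHeadroom and fitsInMessage are made up for this sketch.

```go
package main

import "fmt"

const (
	sqsMaxMessageSize    = 1024 * 1024 // 1 MiB, the limit quoted above
	maxSizePerSQSMessage = 1000 * 1024 // value taken from the diff
	// attributeHeadroom is what remains for everything except the body.
	attributeHeadroom = sqsMaxMessageSize - maxSizePerSQSMessage
)

// fitsInMessage reports whether a body stays within the reduced limit.
func fitsInMessage(body []byte) bool {
	return len(body) <= maxSizePerSQSMessage
}

func main() {
	fmt.Printf("headroom: %d bytes (%d KiB)\n", attributeHeadroom, attributeHeadroom/1024)
	fmt.Println(fitsInMessage(make([]byte, maxSizePerSQSMessage)))
	fmt.Println(fitsInMessage(make([]byte, maxSizePerSQSMessage+1)))
}
```

The reserved headroom is 24576 bytes, i.e. 24 KiB, for the non-body parts of each message.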

@ndakkoune ndakkoune marked this pull request as ready for review June 16, 2026 15:30
@ndakkoune ndakkoune requested a review from a team as a code owner June 16, 2026 15:30
@ge0Aja ge0Aja self-assigned this Jun 18, 2026
Comment thread aws/logs_monitoring_go/internal/batching/batcher.go
Comment thread aws/logs_monitoring_go/internal/storing/storage.go
Comment thread aws/logs_monitoring_go/internal/batching/batcher.go
Comment thread aws/logs_monitoring_go/internal/batching/batcher.go Outdated
Comment thread aws/logs_monitoring_go/internal/batching/batcher.go Outdated
Comment thread aws/logs_monitoring_go/internal/batching/batcher.go Outdated
return nil
}

func (b *Batcher) StartYield(items []json.RawMessage) iter.Seq2[json.RawMessage, error] {

Contributor

Discussed IRL: let's try the two-channel approach.

Comment thread aws/logs_monitoring_go/internal/batching/batcher.go Outdated
Comment thread aws/logs_monitoring_go/internal/forwarding/forwarding.go Outdated
Comment thread aws/logs_monitoring_go/internal/forwarding/forwarding.go Outdated
return Batch{
Data: batch,
StorageTag: out.Metadata[metadataStorageTagKey],
DeleteKey: aws.ToString(object.Key),

Contributor

Isn't the key known to the getter? Why are we returning it with the batch?

Contributor Author

Otherwise we would have to either return (body io.Reader, storageTag string, err error) or manually set storedBatch.DeleteKey=... inside the loop.
Since we are forced to wrap the GetObject call in a function (we have to defer out.Body.Close()), I found it simpler to encapsulate the whole batch construction inside getBatch rather than additionally setting DeleteKey inside the loop.
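
A simplified sketch of that reasoning follows. It replaces the S3 client with a hypothetical getter function and an object type, and the value of metadataStorageTagKey is invented. The point is only that the deferred Close needs its own function scope, so that function may as well return the complete Batch.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

type Batch struct {
	Data       []byte
	StorageTag string
	DeleteKey  string
}

// object stands in for the GetObject output (hypothetical simplification).
type object struct {
	Body     io.ReadCloser
	Metadata map[string]string
}

// getter stands in for the S3 GetObject call.
type getter func(key string) (object, error)

const metadataStorageTagKey = "storage-tag" // assumed value

// getBatch fetches one object, closes its body on return and builds the
// whole Batch, including the key needed to delete it later.
func getBatch(get getter, key string) (Batch, error) {
	out, err := get(key)
	if err != nil {
		return Batch{}, fmt.Errorf("get %q: %w", key, err)
	}
	defer out.Body.Close() // runs once per object, not once per loop

	data, err := io.ReadAll(out.Body)
	if err != nil {
		return Batch{}, fmt.Errorf("read %q: %w", key, err)
	}
	return Batch{
		Data:       data,
		StorageTag: out.Metadata[metadataStorageTagKey],
		DeleteKey:  key,
	}, nil
}

// trackedBody records whether Close was called (used for illustration).
type trackedBody struct {
	io.Reader
	closed *bool
}

func (t trackedBody) Close() error {
	*t.closed = true
	return nil
}

// fakeGetter returns a getter that serves a fixed payload for any key.
func fakeGetter(closed *bool) getter {
	return func(key string) (object, error) {
		return object{
			Body:     trackedBody{Reader: strings.NewReader("payload"), closed: closed},
			Metadata: map[string]string{metadataStorageTagKey: "tag-a"},
		}, nil
	}
}

func main() {
	closed := false
	b, err := getBatch(fakeGetter(&closed), "failed/1")
	fmt.Println(string(b.Data), b.StorageTag, b.DeleteKey, closed, err)
}
```

Calling getBatch inside the listing loop keeps the loop body to a single call, and each body is closed as soon as its batch has been read.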

@ndakkoune ndakkoune force-pushed the nabil.dakkoune/AWSINTS-3666 branch from ff37812 to 7855ace Compare June 18, 2026 11:23
Comment on lines +24 to +28
func NewConfig(maxItemSize, maxBatchSize, maxItemsPerBatch int) Config {
return Config{
maxItemSize: maxItemSize,
maxBatchSize: maxBatchSize,
maxItemsPerBatch: maxItemsPerBatch,

Contributor Author

I had three options for reusing the Batcher.Start() method:

  1. Keep the old functional option pattern and change the in channel from model.LogEntry to any. Too restrictive: we would lose type safety and have to change the Handler signature.
  2. Keep the old functional option pattern and make both the Batcher and the Option generic. Too verbose, e.g. batching.WithMaxItemSize[model.LogEntry](1*1024*1024) three times, and it provides no value since only the Start method leverages the generic type.
  3. Drop the old functional option pattern, use a config struct directly and make the batcher generic.

Opted for 3.
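
To show what option 3 can look like in practice, here is a minimal sketch. NewConfig and the three Config fields match the diff above. The generic Batcher[T], its sizeOf callback and the synchronous Split method are assumptions that stand in for the channel-based Start method, which is not shown in this conversation.

```go
package main

import "fmt"

type Config struct {
	maxItemSize      int
	maxBatchSize     int
	maxItemsPerBatch int
}

func NewConfig(maxItemSize, maxBatchSize, maxItemsPerBatch int) Config {
	return Config{
		maxItemSize:      maxItemSize,
		maxBatchSize:     maxBatchSize,
		maxItemsPerBatch: maxItemsPerBatch,
	}
}

// Batcher is generic over the item type; sizeOf tells it how large an item is.
type Batcher[T any] struct {
	cfg    Config
	sizeOf func(T) int
}

func NewBatcher[T any](cfg Config, sizeOf func(T) int) *Batcher[T] {
	return &Batcher[T]{cfg: cfg, sizeOf: sizeOf}
}

// Split groups items into batches that respect the size and count limits.
// Items larger than maxItemSize are skipped and counted in dropped.
func (b *Batcher[T]) Split(items []T) (batches [][]T, dropped int) {
	var current []T
	currentSize := 0
	for _, item := range items {
		size := b.sizeOf(item)
		if size > b.cfg.maxItemSize {
			dropped++
			continue
		}
		full := currentSize+size > b.cfg.maxBatchSize ||
			len(current) >= b.cfg.maxItemsPerBatch
		if len(current) > 0 && full {
			batches = append(batches, current)
			current, currentSize = nil, 0
		}
		current = append(current, item)
		currentSize += size
	}
	if len(current) > 0 {
		batches = append(batches, current)
	}
	return batches, dropped
}

func main() {
	// The type parameter is inferred from sizeOf, so it is written zero times.
	b := NewBatcher(NewConfig(5, 8, 2), func(s string) int { return len(s) })
	batches, dropped := b.Split([]string{"aaa", "bbb", "cccccc", "dd", "e", "f"})
	fmt.Println(batches, dropped)
}
```

Compared with option 2, the limits are passed once through the config struct and the type parameter is inferred at the constructor call.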

@ndakkoune ndakkoune requested a review from ge0Aja June 18, 2026 12:56
Comment thread aws/logs_monitoring_go/internal/storing/sqs.go Outdated

@ge0Aja ge0Aja left a comment

Contributor

lgtm i trust you on the comments

@ndakkoune ndakkoune force-pushed the nabil.dakkoune/AWSINTS-3666 branch from 80bf865 to c9592eb Compare June 19, 2026 13:16
