Skip to content

[AWSINTS-3517] feat(go-forwarder): add EventBridge#1119

Merged
ndakkoune merged 6 commits into
nabil.dakkoune/go-forwarderfrom
nabil.dakkoune/AWSINTS-3517
May 11, 2026
Merged

[AWSINTS-3517] feat(go-forwarder): add EventBridge#1119
ndakkoune merged 6 commits into
nabil.dakkoune/go-forwarderfrom
nabil.dakkoune/AWSINTS-3517

Conversation

@ndakkoune

@ndakkoune ndakkoune commented May 6, 2026

Copy link
Copy Markdown
Contributor

What does this PR do?

Adds EventBridge as supported event (both S3 and Cloudwatch) + refactor parsing logic to extract the payloads.

Motivation

Testing Guidelines

Additional Notes

  • We only have one EventBridge handler which is triggered on EventBridge events not coming from S3, otherwise we format it as an S3 one and throw it to the s3 handler

Types of changes

  • Bug fix
  • New feature
  • Breaking change
  • Misc (docs, refactoring, dependency upgrade, etc.)

Check all that apply

  • This PR's description is comprehensive
  • This PR contains breaking changes that are documented in the description
  • This PR introduces new APIs or parameters that are documented and unlikely to change in the foreseeable future
  • This PR impacts documentation, and it has been updated (or a ticket has been logged)
  • This PR's changes are covered by the automated tests
  • This PR collects user input/sensitive content into Datadog
  • This PR passes the integration tests (ask a Datadog member to run the tests)
  • This PR passes the unit tests
  • This PR passes the installation tests (ask a Datadog member to run the tests)

@github-actions github-actions Bot added the aws label May 6, 2026
@ndakkoune ndakkoune requested a review from ViBiOh May 7, 2026 07:19
eventSourceKinesis = "aws:kinesis"
)

func Parse(event json.RawMessage) ([]ParsedEvent, error) {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[]ParsedEvent instead of ParsedEvent because the future SQS PR will have to handle multiple records

@ndakkoune ndakkoune marked this pull request as ready for review May 7, 2026 07:44
@ndakkoune ndakkoune requested a review from a team as a code owner May 7, 2026 07:44
@ViBiOh ViBiOh self-assigned this May 7, 2026
if err != nil {
return nil, fmt.Errorf("read record key: %w", err)
}
if key != eventSourceKey && key != eventSourceSNSKey {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To have case insensitive comparison, you can use strings.EqualFold

Comment on lines +76 to +78
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use testify/assert to have something like assert.NoError(t, err)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will make a codebase test update in a separate PR.

Comment on lines +80 to +91
if len(got) != len(tc.want) {
t.Fatalf("got %d events, want %d", len(got), len(tc.want))
}

for i, pe := range got {
if pe.ContentType != tc.want[i] {
t.Errorf("event[%d]: got ContentType %v, want %v", i, pe.ContentType, tc.want[i])
}
if len(pe.Payload) == 0 {
t.Errorf("event[%d]: empty payload", i)
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if len(got) != len(tc.want) {
t.Fatalf("got %d events, want %d", len(got), len(tc.want))
}
for i, pe := range got {
if pe.ContentType != tc.want[i] {
t.Errorf("event[%d]: got ContentType %v, want %v", i, pe.ContentType, tc.want[i])
}
if len(pe.Payload) == 0 {
t.Errorf("event[%d]: empty payload", i)
}
}
assert.Equal(t, tc.want, got)

})

g.Go(func() error {
eg.Go(func() error {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove from goroutine

for _, parsedEvent := range parsedEvents {
handler, err := handling.NewHandler(parsedEvent.ContentType, cfg)
if err != nil {
return err

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Error chaining

dec := json.NewDecoder(bytes.NewReader(event))

if t, err := dec.Token(); err != nil || t != json.Delim('{') {
return "", errors.New("decode eventbridge source: expected '{'")

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return "", errors.New("decode eventbridge source: expected '{'")
return "", errors.New("decode source: expected '{'")

if err != nil {
return nil, fmt.Errorf("build s3 event from eventbridge: %w", err)
}
return []ParsedEvent{{ContentTypeS3, s3Event}}, nil

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add field name and do not rely on struct ordering

@ndakkoune ndakkoune requested a review from ViBiOh May 7, 2026 12:58
@ndakkoune ndakkoune merged commit cc56236 into nabil.dakkoune/go-forwarder May 11, 2026
10 checks passed
@ndakkoune ndakkoune deleted the nabil.dakkoune/AWSINTS-3517 branch May 11, 2026 08:00
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants