Skip to content

[AWSX-3190] feat(go-forwarder): log model and parsing logic#1088

Merged
ndakkoune merged 10 commits into
nabil.dakkoune/go-forwarderfrom
nabil.dakkoune/AWSX-2143-1
Apr 1, 2026
Merged

[AWSX-3190] feat(go-forwarder): log model and parsing logic#1088
ndakkoune merged 10 commits into
nabil.dakkoune/go-forwarderfrom
nabil.dakkoune/AWSX-2143-1

Conversation

@ndakkoune
Copy link
Copy Markdown
Contributor

@ndakkoune ndakkoune commented Mar 27, 2026

What does this PR do?

Set up log parsing and model logic.

Motivation

Following the migration of the Lambda Log Forwarder to Go, the log model is a necessary and critical step that will dictate how the data flow.

Testing Guidelines

Additional Notes

The natural divergence of both the log events received by the Lambda and the outbound payload sent to Datadog needs a model as generic as possible. The most noticeable design choices are :

  • The CloudwatchLogsContext and CloudwatchLogsLogEvent types that closely mimic some of their equivalents in the github.com/aws/aws-lambda-go/events package.
  • The LogContent interface is responsible to return its own Message(), required in the future steps (e.g. scrubbing) and MarshalFields(), responsible to return the top-level fields to match the intake API format (different for each event source for S3 invocations, especially for CloudTrail).
  • LogEntry.MarshalJSON() is the custom marshaler that merges Content.MarshalFields() so that each LogEntry is serialized to the correct format.

Types of changes

  • Bug fix
  • New feature
  • Breaking change
  • Misc (docs, refactoring, dependency upgrade, etc.)

Check all that apply

  • This PR's description is comprehensive
  • This PR contains breaking changes that are documented in the description
  • This PR introduces new APIs or parameters that are documented and unlikely to change in the foreseeable future
  • This PR impacts documentation, and it has been updated (or a ticket has been logged)
  • This PR's changes are covered by the automated tests
  • This PR collects user input/sensitive content into Datadog
  • This PR passes the integration tests (ask a Datadog member to run the tests)
  • This PR passes the unit tests
  • This PR passes the installation tests (ask a Datadog member to run the tests)

@github-actions github-actions Bot added the aws label Mar 27, 2026
"log/slog"
)

var ForwarderVersion = "dev"
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used for ldflags


// detectInvokationSource inspects the raw JSON to determine the invokation type.
func detectInvokationSource(event json.RawMessage) invokationSource {
var probe struct {
Copy link
Copy Markdown
Contributor Author

@ndakkoune ndakkoune Mar 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even though this is not very clean, I don't see another way to detect the invokation type apart from unmarshalling the whole JSON (way more expensive).


if err := json.Unmarshal(event, &probe); err != nil {
slog.Error("failed unmarshal", slog.Any("error", err))
return invokationSourceUnknown
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will log a double error from both the unmarshalling + unknown source. Fair ?

"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
)

type invokationSource int
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Decided to replace the old AWSEventSource by invokationSource, which is way more explicit.

Service string
Host string
Tags string
AWS *AWSMetadata
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pointer here because its shared between all log entries.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't need to be a pointer for this, we can copy it on each LogEntry. it will not escape to the heap and might be more efficient in the end.

UseFIPS bool
Source string
Host string
CustomTags string
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💬 suggestion: ‏ We'll mainly used them as a slice of string, so it might be interesting to convert the string directly to a slice when loading the config.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree for CustomTags (and maybe UseFIPS) since we'll append all the specified tags (e.g. env:prod,team:aws) along with other tags such as forwarder_version. However, Source and Host belong other top-level fields (i.e. ddsource:X and host:Y) and I don't see any benefit from that.

Service string
Host string
Tags string
AWS *AWSMetadata
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't need to be a pointer for this, we can copy it on each LogEntry. it will not escape to the heap and might be more efficient in the end.

case invocationSourceCloudwatchLogs:
parseCloudwatchLogs(ctx, event, cfg, out)
default:
slog.Error("unsupported invocation source")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💬 suggestion: ‏ Add the event as a log field, otherwise it will be complex to debug.

Comment on lines +41 to +45
if err := json.Unmarshal(event, &probe); err != nil {
slog.Error("failed unmarshal", slog.Any("error", err))
return invocationSourceUnknown
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark this and try the decoder approach

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will make a separate PR for this. Thank you!

@ndakkoune ndakkoune marked this pull request as ready for review March 30, 2026 09:09
@ndakkoune ndakkoune requested a review from a team as a code owner March 30, 2026 09:09
@ndakkoune ndakkoune changed the title parsing [AWSX-3190] feat(go-forwarder): log model and parsing logic Mar 30, 2026
@ge0Aja ge0Aja self-assigned this Mar 30, 2026
type CloudwatchLogsContent struct {
ID string `json:"id"`
Timestamp int64 `json:"timestamp"`
Msg string `json:"message"`
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: prefer to have complete variable names

func (CloudwatchLogsContext) InvocationKey() string { return "awslogs" }

type CloudwatchLogsContent struct {
ID string `json:"id"`
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the ID here ?

return c.Msg
}

func (c CloudwatchLogsContent) MarshalFields() (map[string]any, error) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Marshal gives the impression that the output should be a json string, instead we're getting a map. we should either rename the function, or implement as it says

Service string
Host string
Tags []string
AWS AWSMetadata
Copy link
Copy Markdown
Contributor

@ge0Aja ge0Aja Mar 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please rename the variable to match the type suggestion: Metadata

@ndakkoune ndakkoune force-pushed the nabil.dakkoune/AWSX-2143-1 branch from 49d7d3a to f9f52b4 Compare March 31, 2026 09:13
Comment on lines +9 to +10
ID string `json:"id,omitempty"`
Timestamp int64 `json:"timestamp,omitempty"`
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These two lines are CloudwatchLogs specific. We'll probably have to put them as pointers in the future, so that they won't appear with their zero values when Marshalling. The other possibility would be to detect the zero values in the backend to deal with them properly (e.g. timestamp: 0 => impossible => generate current timestamp).

type AWSMetadata struct {
InvokedFunctionARN string `json:"invoked_function_arn"`
FunctionVersion string `json:"function_version,omitempty"`
CloudwatchLogs *CloudwatchLogsContext `json:"awslogs,omitempty"`
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding the context here avoids us to write a custom MarshalJSON. Since there is only two invocation sources (i.e. CW and S3), it seems a good compromise.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this is to be shared then I don't think we should adda a cloudwatch logs specific context here.

}

func detectInvocationSource(event json.RawMessage) invocationSource {
var probe struct {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will benchmark this and probably try the Decoder strategy.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, we did that for cloudtrail events if you're looking for specific keywords at the beginning of the raw json

@ndakkoune ndakkoune requested a review from ge0Aja March 31, 2026 09:30

type LogEntry struct {
ID string `json:"id,omitempty"`
Timestamp int64 `json:"timestamp,omitempty"`
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

regarding CloudwatchLogs specific. I thought we agreed to have a separate struct for cloudwatch logs, s3 records, and events. which should remove the need for optional fields.

also , did you check the timestamp format ? is it a linux timestamp or a different format ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timestamps provided by AWS are RFC3339, the Datadog intake endpoints ones expect unix epoch times.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rectifying: Cloudwatch events are also unix epoch time, but S3/SQS/... are still RFC3339

type invocationSource int

const (
invocationSourceUnknown invocationSource = iota
Copy link
Copy Markdown
Contributor

@ge0Aja ge0Aja Mar 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we have this case ? unknown invocation source ? I think the default case in the switch should be enough

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing prevent a customer to create a trigger from an event source we do not support (e.g. Alexa, API Gateway, CloudFront, AppSync).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw the 0 value of iota would become the Cloudwatch one, which could be thought as a default case (we could _ invocationSource = iota to skip this).

@ndakkoune ndakkoune requested a review from ge0Aja April 1, 2026 06:45
APIURL string
LogLevel string
UseFIPS bool
Source string
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

forwarder level source override (for all logs) if set by the customer

return invocationSourceUnknown
}

func parseCloudwatchLogs(ctx context.Context, event json.RawMessage, cfg *config.Config, out chan<- model.CloudwatchLogEntry) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: use emit function callback instead of passing the channel explicitly.

@ndakkoune ndakkoune merged commit 5a38306 into nabil.dakkoune/go-forwarder Apr 1, 2026
10 checks passed
@ndakkoune ndakkoune deleted the nabil.dakkoune/AWSX-2143-1 branch April 1, 2026 15:43
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants