[AWSX-3190] feat(go-forwarder): log model and parsing logic#1088
| "log/slog" | ||
| ) | ||
|
|
||
| var ForwarderVersion = "dev" |
|
|
||
| // detectInvokationSource inspects the raw JSON to determine the invokation type. | ||
| func detectInvokationSource(event json.RawMessage) invokationSource { | ||
| var probe struct { |
There was a problem hiding this comment.
Even though this is not very clean, I don't see another way to detect the invocation type apart from unmarshalling the whole JSON, which is far more expensive.
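A minimal sketch of the probe approach, for reference. The probe fields are assumptions based on the public AWS event shapes (a top-level `awslogs` key for CloudWatch Logs subscriptions, a `Records` array with `eventSource` for S3); the `invocationSourceS3` constant is hypothetical, and the spelling follows the later `invocation` rename.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type invocationSource int

const (
	invocationSourceUnknown invocationSource = iota
	invocationSourceCloudwatchLogs
	invocationSourceS3 // hypothetical: not shown in the diff
)

// detectInvocationSource decodes only the few top-level keys needed to
// tell the sources apart; every other key in the event is skipped.
func detectInvocationSource(event json.RawMessage) invocationSource {
	var probe struct {
		AWSLogs json.RawMessage `json:"awslogs"`
		Records []struct {
			EventSource string `json:"eventSource"`
		} `json:"Records"`
	}
	if err := json.Unmarshal(event, &probe); err != nil {
		return invocationSourceUnknown
	}
	switch {
	case len(probe.AWSLogs) > 0:
		return invocationSourceCloudwatchLogs
	case len(probe.Records) > 0 && probe.Records[0].EventSource == "aws:s3":
		return invocationSourceS3
	default:
		return invocationSourceUnknown
	}
}

func main() {
	event := json.RawMessage(`{"awslogs":{"data":"H4sIAAAA"}}`)
	fmt.Println(detectInvocationSource(event) == invocationSourceCloudwatchLogs)
}
```

The decoder still scans the whole payload for syntax, but it only allocates for the probed keys.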
|
|
||
| if err := json.Unmarshal(event, &probe); err != nil { | ||
| slog.Error("failed unmarshal", slog.Any("error", err)) | ||
| return invokationSourceUnknown |
There was a problem hiding this comment.
This will log two errors: one for the failed unmarshal and one for the unknown source. Is that fair?
| "github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model" | ||
| ) | ||
|
|
||
| type invokationSource int |
There was a problem hiding this comment.
Decided to replace the old AWSEventSource by invokationSource, which is way more explicit.
| Service string | ||
| Host string | ||
| Tags string | ||
| AWS *AWSMetadata |
There was a problem hiding this comment.
Pointer here because it's shared between all log entries.
There was a problem hiding this comment.
It doesn't need to be a pointer for this; we can copy it on each LogEntry. It will not escape to the heap and might be more efficient in the end.
| UseFIPS bool | ||
| Source string | ||
| Host string | ||
| CustomTags string |
There was a problem hiding this comment.
💬 suggestion: We'll mainly use them as a slice of strings, so it might be worth converting the string directly to a slice when loading the config.
There was a problem hiding this comment.
I agree for CustomTags (and maybe UseFIPS) since we'll append all the specified tags (e.g. env:prod,team:aws) along with other tags such as forwarder_version. However, Source and Host belong to other top-level fields (i.e. ddsource:X and host:Y) and I don't see any benefit in converting them.
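A minimal sketch of converting the tags at config load time. The `parseTags` helper is hypothetical, and the comma-separated format is assumed from the `env:prod,team:aws` example above.

```go
package main

import (
	"fmt"
	"strings"
)

// parseTags splits a comma-separated tag string into a slice,
// trimming whitespace and dropping empty entries.
func parseTags(raw string) []string {
	var tags []string
	for _, tag := range strings.Split(raw, ",") {
		if tag = strings.TrimSpace(tag); tag != "" {
			tags = append(tags, tag)
		}
	}
	return tags
}

func main() {
	// Parsed once when the config is loaded, then extended per entry.
	customTags := parseTags("env:prod, team:aws")
	tags := append(customTags, "forwarder_version:dev")
	fmt.Println(tags)
}
```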
| case invocationSourceCloudwatchLogs: | ||
| parseCloudwatchLogs(ctx, event, cfg, out) | ||
| default: | ||
| slog.Error("unsupported invocation source") |
There was a problem hiding this comment.
💬 suggestion: Add the event as a log field, otherwise it will be complex to debug.
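One possible shape for this, as a sketch: attach a bounded preview of the raw event to the log record. The `eventPreview` helper and the `maxLoggedEventBytes` cap are hypothetical; whether logging raw events is acceptable depends on what they may contain.

```go
package main

import (
	"encoding/json"
	"log/slog"
)

// maxLoggedEventBytes is a hypothetical cap that keeps log lines bounded.
const maxLoggedEventBytes = 1024

// eventPreview returns the event as a string, cut to at most limit bytes.
func eventPreview(event []byte, limit int) string {
	if len(event) <= limit {
		return string(event)
	}
	return string(event[:limit]) + "...(truncated)"
}

// logUnsupported reports an unsupported source together with the event.
func logUnsupported(logger *slog.Logger, event json.RawMessage) {
	logger.Error("unsupported invocation source",
		slog.String("event", eventPreview(event, maxLoggedEventBytes)))
}

func main() {
	logUnsupported(slog.Default(), json.RawMessage(`{"detail-type":"unexpected"}`))
}
```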
| if err := json.Unmarshal(event, &probe); err != nil { | ||
| slog.Error("failed unmarshal", slog.Any("error", err)) | ||
| return invocationSourceUnknown | ||
| } | ||
|
|
There was a problem hiding this comment.
Benchmark this and try the decoder approach
There was a problem hiding this comment.
Will make a separate PR for this. Thank you!
| type CloudwatchLogsContent struct { | ||
| ID string `json:"id"` | ||
| Timestamp int64 `json:"timestamp"` | ||
| Msg string `json:"message"` |
There was a problem hiding this comment.
nit: prefer to have complete variable names
| func (CloudwatchLogsContext) InvocationKey() string { return "awslogs" } | ||
|
|
||
| type CloudwatchLogsContent struct { | ||
| ID string `json:"id"` |
| return c.Msg | ||
| } | ||
|
|
||
| func (c CloudwatchLogsContent) MarshalFields() (map[string]any, error) { |
There was a problem hiding this comment.
nit: Marshal gives the impression that the output should be a JSON string, but we're getting a map instead. We should either rename the function or implement it as the name says.
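For context, a sketch of how the map returned by MarshalFields() could feed LogEntry.MarshalJSON(), as outlined in the PR description. The struct fields, the `ddsource` and `service` keys, and the merge order are assumptions for illustration, not the PR's actual code.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// LogContent mirrors the interface described in the PR notes.
type LogContent interface {
	Message() string
	MarshalFields() (map[string]any, error)
}

type CloudwatchLogsContent struct {
	ID        string
	Timestamp int64
	Msg       string
}

func (c CloudwatchLogsContent) Message() string { return c.Msg }

// MarshalFields returns the top-level fields as a map, not as JSON bytes.
func (c CloudwatchLogsContent) MarshalFields() (map[string]any, error) {
	return map[string]any{"id": c.ID, "timestamp": c.Timestamp, "message": c.Msg}, nil
}

// LogEntry is a trimmed-down, hypothetical version of the PR's type.
type LogEntry struct {
	Source  string
	Service string
	Content LogContent
}

// MarshalJSON merges the content fields with the entry's own fields.
// It assumes Content is non-nil.
func (e LogEntry) MarshalJSON() ([]byte, error) {
	fields, err := e.Content.MarshalFields()
	if err != nil {
		return nil, err
	}
	out := make(map[string]any, len(fields)+2)
	for key, value := range fields {
		out[key] = value
	}
	out["ddsource"] = e.Source
	out["service"] = e.Service
	return json.Marshal(out)
}

func main() {
	entry := LogEntry{
		Source:  "cloudwatch",
		Service: "svc",
		Content: CloudwatchLogsContent{ID: "1", Timestamp: 42, Msg: "hello"},
	}
	payload, err := json.Marshal(entry)
	fmt.Println(string(payload), err)
}
```

Seen this way, a name such as Fields() or TopLevelFields() would describe the method more directly than MarshalFields().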
| Service string | ||
| Host string | ||
| Tags []string | ||
| AWS AWSMetadata |
There was a problem hiding this comment.
Please rename the field to match the type. Suggestion: Metadata
| ID string `json:"id,omitempty"` | ||
| Timestamp int64 `json:"timestamp,omitempty"` |
There was a problem hiding this comment.
These two fields are CloudwatchLogs-specific. We'll probably have to make them pointers in the future, so that they won't appear with their zero values when marshalling. The other possibility would be to detect the zero values in the backend and deal with them properly (e.g. timestamp: 0 => impossible => generate the current timestamp).
| type AWSMetadata struct { | ||
| InvokedFunctionARN string `json:"invoked_function_arn"` | ||
| FunctionVersion string `json:"function_version,omitempty"` | ||
| CloudwatchLogs *CloudwatchLogsContext `json:"awslogs,omitempty"` |
There was a problem hiding this comment.
Adding the context here saves us from writing a custom MarshalJSON. Since there are only two invocation sources (i.e. CW and S3), it seems a good compromise.
There was a problem hiding this comment.
If this is to be shared, then I don't think we should add a CloudWatch Logs-specific context here.
| } | ||
|
|
||
| func detectInvocationSource(event json.RawMessage) invocationSource { | ||
| var probe struct { |
There was a problem hiding this comment.
Will benchmark this and probably try the Decoder strategy.
There was a problem hiding this comment.
Yes, we did that for CloudTrail events; it applies if you're looking for specific keywords at the beginning of the raw JSON.
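A rough sketch of what the Decoder strategy could look like: read tokens until the first top-level key and stop there. The `firstTopLevelKey` helper is hypothetical, and it only helps when the distinguishing key comes first in the payload, which is an assumption to verify per source.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
)

// firstTopLevelKey returns the first key of a JSON object without
// decoding the rest of the document.
func firstTopLevelKey(event []byte) (string, error) {
	dec := json.NewDecoder(bytes.NewReader(event))

	// The first token must be the opening brace of an object.
	tok, err := dec.Token()
	if err != nil {
		return "", err
	}
	if delim, ok := tok.(json.Delim); !ok || delim != '{' {
		return "", errors.New("event is not a JSON object")
	}

	// The next token is the first key, or '}' for an empty object.
	tok, err = dec.Token()
	if err != nil {
		return "", err
	}
	key, ok := tok.(string)
	if !ok {
		return "", errors.New("event object has no keys")
	}
	return key, nil
}

func main() {
	key, err := firstTopLevelKey([]byte(`{"awslogs":{"data":"H4sIAAAA"}}`))
	fmt.Println(key, err)
}
```

A benchmark against the probe-struct version on realistic payload sizes would show whether the early exit is worth the extra code.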
|
|
||
| type LogEntry struct { | ||
| ID string `json:"id,omitempty"` | ||
| Timestamp int64 `json:"timestamp,omitempty"` |
There was a problem hiding this comment.
Regarding the CloudwatchLogs-specific fields: I thought we agreed to have separate structs for CloudWatch logs, S3 records, and events, which should remove the need for optional fields.
Also, did you check the timestamp format? Is it a Unix timestamp or a different format?
There was a problem hiding this comment.
The timestamps provided by AWS are RFC3339, whereas the Datadog intake endpoints expect Unix epoch times.
There was a problem hiding this comment.
Correction: CloudWatch events also use Unix epoch time, but S3/SQS/... still use RFC3339.
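A small sketch of the conversion for the RFC3339 sources. The `rfc3339ToUnixMillis` helper is hypothetical, and millisecond precision is an assumption; the unit the intake expects should be confirmed.

```go
package main

import (
	"fmt"
	"time"
)

// rfc3339ToUnixMillis converts an RFC3339 timestamp into milliseconds
// since the Unix epoch.
func rfc3339ToUnixMillis(ts string) (int64, error) {
	parsed, err := time.Parse(time.RFC3339, ts)
	if err != nil {
		return 0, fmt.Errorf("parse timestamp %q: %w", ts, err)
	}
	return parsed.UnixMilli(), nil
}

func main() {
	ms, err := rfc3339ToUnixMillis("2019-09-03T19:37:27.192Z")
	fmt.Println(ms, err)
}
```

CloudWatch Logs timestamps would bypass this helper since they already arrive as epoch values.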
| type invocationSource int | ||
|
|
||
| const ( | ||
| invocationSourceUnknown invocationSource = iota |
There was a problem hiding this comment.
Can this case actually happen, an unknown invocation source? I think the default case in the switch should be enough.
There was a problem hiding this comment.
Nothing prevents a customer from creating a trigger from an event source we do not support (e.g. Alexa, API Gateway, CloudFront, AppSync).
There was a problem hiding this comment.
Btw, without it the 0 value of iota would become the Cloudwatch one, which could be mistaken for a default case (we could write _ invocationSource = iota to skip this).
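To illustrate the zero-value point, a minimal sketch with the unknown source kept at 0. The `invocationSourceS3` constant and the String method are assumptions added for the example.

```go
package main

import "fmt"

type invocationSource int

const (
	// The zero value is "unknown", so an uninitialized variable can never
	// be mistaken for a supported source.
	invocationSourceUnknown invocationSource = iota
	invocationSourceCloudwatchLogs
	invocationSourceS3 // hypothetical: not shown in the diff
)

// String makes the source readable in logs.
func (s invocationSource) String() string {
	switch s {
	case invocationSourceCloudwatchLogs:
		return "cloudwatch_logs"
	case invocationSourceS3:
		return "s3"
	default:
		return "unknown"
	}
}

func main() {
	var source invocationSource // zero value
	fmt.Println(source, invocationSourceCloudwatchLogs)
}
```

The alternative from the comment, `_ invocationSource = iota`, reserves 0 without naming it and leaves unsupported sources to the switch default.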
| APIURL string | ||
| LogLevel string | ||
| UseFIPS bool | ||
| Source string |
There was a problem hiding this comment.
Forwarder-level source override (applies to all logs) if set by the customer.
| return invocationSourceUnknown | ||
| } | ||
|
|
||
| func parseCloudwatchLogs(ctx context.Context, event json.RawMessage, cfg *config.Config, out chan<- model.CloudwatchLogEntry) { |
There was a problem hiding this comment.
suggestion: use emit function callback instead of passing the channel explicitly.
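A sketch of the callback shape, with the parser reduced to its essentials. The `CloudwatchLogEntry` fields and the simplified signature are hypothetical; the real function also takes the context, raw event, and config.

```go
package main

import "fmt"

// CloudwatchLogEntry is a trimmed-down, hypothetical stand-in for the
// model type used in the PR.
type CloudwatchLogEntry struct {
	Message string
}

// parseCloudwatchLogs hands each entry to emit instead of writing to a
// channel, so the parser does not know how entries are delivered.
func parseCloudwatchLogs(messages []string, emit func(CloudwatchLogEntry)) {
	for _, message := range messages {
		emit(CloudwatchLogEntry{Message: message})
	}
}

func main() {
	// The caller decides where entries go; here, a buffered channel.
	out := make(chan CloudwatchLogEntry, 2)
	parseCloudwatchLogs([]string{"first", "second"}, func(entry CloudwatchLogEntry) {
		out <- entry
	})
	close(out)
	for entry := range out {
		fmt.Println(entry.Message)
	}
}
```

Tests can then pass a closure that appends to a slice, with no goroutines or channels involved.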
What does this PR do?
Set up log parsing and model logic.
Motivation
Following the migration of the Lambda Log Forwarder to Go, the log model is a necessary and critical step that will dictate how the data flows.
Testing Guidelines
Additional Notes
Because the log events received by the Lambda and the outbound payload sent to Datadog naturally diverge, the model needs to be as generic as possible. The most noticeable design choices are:
- CloudwatchLogsContext and CloudwatchLogsLogEvent types that closely mimic some of their equivalents in the github.com/aws/aws-lambda-go/events package.
- The LogContent interface is responsible for returning its own Message(), required in future steps (e.g. scrubbing), and MarshalFields(), which returns the top-level fields matching the intake API format (different for each event source for S3 invocations, especially for CloudTrail).
- LogEntry.MarshalJSON() is the custom marshaler that merges Content.MarshalFields() so that each LogEntry is serialized to the correct format.
Types of changes
Check all that apply