Skip to content

Commit d897cc7

Browse files
committed
feat(go-forwarder): add Kinesis invocation
1 parent 2236c14 commit d897cc7

3 files changed

Lines changed: 284 additions & 0 deletions

File tree

aws/logs_monitoring_go/cmd/forwarder/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ func handleRequest(cfg *config.Config) func(context.Context, json.RawMessage) er
3535
switch invocationSource {
3636
case parsing.InvocationSourceCloudwatchLogs:
3737
return pipeline.Run(ctx, event, cfg, forwarding.CloudwatchStorage, parsing.HandleCloudwatchLogs)
38+
case parsing.InvocationSourceKinesis:
39+
return pipeline.Run(ctx, event, cfg, forwarding.CloudwatchStorage, parsing.HandleKinesis)
3840
case parsing.InvocationSourceS3:
3941
return pipeline.Run(ctx, event, cfg, forwarding.S3Storage, parsing.HandleS3)
4042
default:
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2026-Present Datadog, Inc.
5+
6+
package parsing
7+
8+
import (
9+
"context"
10+
"encoding/base64"
11+
"encoding/json"
12+
"fmt"
13+
"log/slog"
14+
15+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/concurrent"
16+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
17+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
18+
"github.com/aws/aws-lambda-go/events"
19+
)
20+
21+
func HandleKinesis(ctx context.Context, event json.RawMessage, cfg *config.Config, out chan<- model.CloudwatchLogEntry) error {
22+
var kinesisEvent events.KinesisEvent
23+
if err := json.Unmarshal(event, &kinesisEvent); err != nil {
24+
return fmt.Errorf("unmarshal: %w", err)
25+
}
26+
27+
for _, record := range kinesisEvent.Records {
28+
cwEvent := events.CloudwatchLogsEvent{
29+
AWSLogs: events.CloudwatchLogsRawData{
30+
Data: base64.StdEncoding.EncodeToString(record.Kinesis.Data),
31+
},
32+
}
33+
34+
cwRaw, err := json.Marshal(cwEvent)
35+
if err != nil {
36+
return fmt.Errorf("marshal cloudwatch event from kinesis: %w", err)
37+
}
38+
39+
entries, err := parseCloudwatchLogs(ctx, cwRaw, cfg)
40+
if err != nil {
41+
slog.WarnContext(ctx, "skipping kinesis record", "error", err)
42+
continue
43+
}
44+
45+
for _, entry := range entries {
46+
if err := concurrent.SafeSender(ctx, out, entry); err != nil {
47+
return err
48+
}
49+
}
50+
}
51+
return nil
52+
}
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2026-Present Datadog, Inc.
5+
6+
package parsing
7+
8+
import (
9+
"bytes"
10+
"compress/gzip"
11+
"context"
12+
"encoding/json"
13+
"testing"
14+
15+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/config"
16+
"github.com/DataDog/datadog-serverless-functions/aws/logs_monitoring_go/internal/model"
17+
"github.com/aws/aws-lambda-go/lambdacontext"
18+
"github.com/google/go-cmp/cmp"
19+
)
20+
21+
func TestHandleKinesis(t *testing.T) {
22+
t.Parallel()
23+
24+
ctx := lambdacontext.NewContext(context.Background(), &lambdacontext.LambdaContext{
25+
InvokedFunctionArn: "arn:aws:lambda:us-east-1:123456789012:function:forwarder",
26+
})
27+
28+
tests := map[string]struct {
29+
event json.RawMessage
30+
config *config.Config
31+
chanSize int
32+
want []model.CloudwatchLogEntry
33+
wantErr bool
34+
}{
35+
"not a Kinesis event": {
36+
event: json.RawMessage(`{"awslogs":{"data":"dGVzdA=="}}`),
37+
config: &config.Config{},
38+
want: nil,
39+
},
40+
"invalid JSON": {
41+
event: json.RawMessage(`not json`),
42+
config: &config.Config{},
43+
wantErr: true,
44+
},
45+
"single log": {
46+
event: mustKinesisEvent(t, gzipJSON(t, map[string]any{
47+
"messageType": "DATA_MESSAGE",
48+
"owner": "601427279990",
49+
"logGroup": "/aws/lambda/testing-datadog",
50+
"logStream": "2024/10/10/[$LATEST]20bddfd5a2dc4c6b97ac02800eae90d0",
51+
"logEvents": []map[string]any{
52+
{"id": "ev1", "timestamp": 1583425836114, "message": `{"status": "debug", "message": "hello"}`},
53+
},
54+
})),
55+
config: &config.Config{},
56+
chanSize: 1,
57+
want: []model.CloudwatchLogEntry{
58+
{
59+
ID: "ev1",
60+
Timestamp: 1583425836114,
61+
Message: `{"status": "debug", "message": "hello"}`,
62+
Source: "lambda",
63+
SourceCategory: "aws",
64+
Service: "lambda",
65+
Host: "/aws/lambda/testing-datadog",
66+
Tags: model.Tags{"service:lambda", "forwardername:", "forwarder_version:6.0"},
67+
AWS: model.CloudwatchMetadata{
68+
Metadata: model.Metadata{ARN: "arn:aws:lambda:us-east-1:123456789012:function:forwarder"},
69+
Logs: model.CloudwatchLogsContext{
70+
LogGroup: "/aws/lambda/testing-datadog",
71+
LogStream: "2024/10/10/[$LATEST]20bddfd5a2dc4c6b97ac02800eae90d0",
72+
Owner: "601427279990",
73+
},
74+
},
75+
},
76+
},
77+
},
78+
"multiple records": {
79+
event: mustKinesisEvent(t,
80+
gzipJSON(t, map[string]any{
81+
"messageType": "DATA_MESSAGE",
82+
"owner": "111111111111",
83+
"logGroup": "/aws/lambda/fn-a",
84+
"logStream": "stream-a",
85+
"logEvents": []map[string]any{
86+
{"id": "a1", "timestamp": 1000, "message": "from-a"},
87+
},
88+
}),
89+
gzipJSON(t, map[string]any{
90+
"messageType": "DATA_MESSAGE",
91+
"owner": "222222222222",
92+
"logGroup": "/aws/rds/cluster",
93+
"logStream": "stream-b",
94+
"logEvents": []map[string]any{
95+
{"id": "b1", "timestamp": 2000, "message": "from-b"},
96+
},
97+
}),
98+
),
99+
config: &config.Config{},
100+
chanSize: 2,
101+
want: []model.CloudwatchLogEntry{
102+
{
103+
ID: "a1", Timestamp: 1000, Message: "from-a",
104+
Source: "lambda", SourceCategory: "aws", Service: "lambda",
105+
Host: "/aws/lambda/fn-a",
106+
Tags: model.Tags{"service:lambda", "forwardername:", "forwarder_version:6.0"},
107+
AWS: model.CloudwatchMetadata{
108+
Metadata: model.Metadata{ARN: "arn:aws:lambda:us-east-1:123456789012:function:forwarder"},
109+
Logs: model.CloudwatchLogsContext{LogGroup: "/aws/lambda/fn-a", LogStream: "stream-a", Owner: "111111111111"},
110+
},
111+
},
112+
{
113+
ID: "b1", Timestamp: 2000, Message: "from-b",
114+
Source: "cloudwatch", SourceCategory: "aws", Service: "cloudwatch",
115+
Host: "/aws/rds/cluster",
116+
Tags: model.Tags{"service:cloudwatch", "forwardername:", "forwarder_version:6.0"},
117+
AWS: model.CloudwatchMetadata{
118+
Metadata: model.Metadata{ARN: "arn:aws:lambda:us-east-1:123456789012:function:forwarder"},
119+
Logs: model.CloudwatchLogsContext{LogGroup: "/aws/rds/cluster", LogStream: "stream-b", Owner: "222222222222"},
120+
},
121+
},
122+
},
123+
},
124+
"bad record is skipped": {
125+
event: mustKinesisEvent(t,
126+
[]byte("not valid gzip"),
127+
gzipJSON(t, map[string]any{
128+
"messageType": "DATA_MESSAGE",
129+
"owner": "123456789012",
130+
"logGroup": "/aws/lambda/good",
131+
"logStream": "stream",
132+
"logEvents": []map[string]any{
133+
{"id": "g1", "timestamp": 3000, "message": "survived"},
134+
},
135+
}),
136+
),
137+
config: &config.Config{},
138+
chanSize: 1,
139+
want: []model.CloudwatchLogEntry{
140+
{
141+
ID: "g1", Timestamp: 3000, Message: "survived",
142+
Source: "lambda", SourceCategory: "aws", Service: "lambda",
143+
Host: "/aws/lambda/good",
144+
Tags: model.Tags{"service:lambda", "forwardername:", "forwarder_version:6.0"},
145+
AWS: model.CloudwatchMetadata{
146+
Metadata: model.Metadata{ARN: "arn:aws:lambda:us-east-1:123456789012:function:forwarder"},
147+
Logs: model.CloudwatchLogsContext{LogGroup: "/aws/lambda/good", LogStream: "stream", Owner: "123456789012"},
148+
},
149+
},
150+
},
151+
},
152+
}
153+
154+
for name, tc := range tests {
155+
t.Run(name, func(t *testing.T) {
156+
t.Parallel()
157+
158+
out := make(chan model.CloudwatchLogEntry, tc.chanSize)
159+
160+
err := HandleKinesis(ctx, tc.event, tc.config, out)
161+
close(out)
162+
163+
var got []model.CloudwatchLogEntry
164+
for entry := range out {
165+
got = append(got, entry)
166+
}
167+
168+
if tc.wantErr {
169+
if err == nil {
170+
t.Fatal("want error, got nil")
171+
}
172+
return
173+
}
174+
if err != nil {
175+
t.Fatalf("unexpected error: %v", err)
176+
}
177+
178+
if diff := cmp.Diff(tc.want, got); diff != "" {
179+
t.Errorf("mismatch (-want +got):\n%s", diff)
180+
}
181+
})
182+
}
183+
}
184+
185+
func gzipJSON(t *testing.T, v any) []byte {
186+
t.Helper()
187+
188+
raw, err := json.Marshal(v)
189+
if err != nil {
190+
t.Fatalf("marshal: %v", err)
191+
}
192+
193+
var buf bytes.Buffer
194+
w := gzip.NewWriter(&buf)
195+
if _, err := w.Write(raw); err != nil {
196+
t.Fatalf("gzip write: %v", err)
197+
}
198+
199+
if err := w.Close(); err != nil {
200+
t.Fatalf("gzip close: %v", err)
201+
}
202+
203+
return buf.Bytes()
204+
}
205+
206+
func mustKinesisEvent(t *testing.T, records ...[]byte) json.RawMessage {
207+
t.Helper()
208+
209+
type rec struct {
210+
Kinesis struct {
211+
Data []byte `json:"data"`
212+
} `json:"kinesis"`
213+
}
214+
evt := struct {
215+
Records []rec `json:"Records"`
216+
}{}
217+
218+
for _, data := range records {
219+
r := rec{}
220+
r.Kinesis.Data = data
221+
evt.Records = append(evt.Records, r)
222+
}
223+
224+
raw, err := json.Marshal(evt)
225+
if err != nil {
226+
t.Fatalf("marshal kinesis event: %v", err)
227+
}
228+
229+
return raw
230+
}

0 commit comments

Comments
 (0)