Skip to content

Commit 024f180

Browse files
committed
feat(errortracking): add FakeIntake aggregator and client support for agent telemetry logs
Add apmTelemetryAggregator to parse agent-logs payloads from /api/v2/apmtelemetry and expose GetAgentTelemetryLogs() on the fakeintake client.
1 parent 7489510 commit 024f180

7 files changed

Lines changed: 270 additions & 0 deletions

File tree

test/fakeintake/aggregator/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ go_library(
55
srcs = [
66
"agenthealthAggregator.go",
77
"apmStatsAggregator.go",
8+
"apmTelemetryAggregator.go",
89
"checkRunAggregator.go",
910
"common.go",
1011
"connectionsAggregator.go",
@@ -58,6 +59,7 @@ go_test(
5859
srcs = [
5960
"agenthealthAggregator_test.go",
6061
"apmStatsAggregator_test.go",
62+
"apmTelemetryAggregator_test.go",
6163
"checkRunAggregator_test.go",
6264
"common_test.go",
6365
"connectionsAggregator_test.go",
@@ -87,6 +89,7 @@ go_test(
8789
"fixtures/agenthealth_multi_bytes",
8890
"fixtures/agenthealth_noissues_bytes",
8991
"fixtures/apm_stats_bytes",
92+
"fixtures/apm_telemetry_bytes",
9093
"fixtures/checkrun_bytes",
9194
"fixtures/connections_bytes",
9295
"fixtures/contimage_bytes",
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2016-present Datadog, Inc.
5+
6+
package aggregator
7+
8+
import (
9+
"bytes"
10+
"encoding/json"
11+
"time"
12+
13+
"github.com/DataDog/datadog-agent/test/fakeintake/api"
14+
)
15+
16+
// AgentTelemetryLog represents a single log record from the agent's internal
17+
// telemetry pipeline. It mirrors comp/core/agenttelemetry/impl/logs_payload.go::Log
18+
// and is populated only for payloads whose request_type is "agent-logs".
19+
type AgentTelemetryLog struct {
20+
Level string `json:"level"`
21+
StackTrace string `json:"stack_trace"`
22+
TracerTime int64 `json:"tracer_time"`
23+
Count int `json:"count"`
24+
IsCrash bool `json:"is_crash"`
25+
Message string `json:"message"`
26+
ErrorKind string `json:"error_kind"`
27+
Tags string `json:"tags"`
28+
collectedTime time.Time
29+
}
30+
31+
// name returns the grouping key used by the aggregator. All agent-logs records
32+
// share one key; per-hostname grouping is not needed for single-agent test VMs.
33+
func (l *AgentTelemetryLog) name() string { return "agent-errortracking" }
34+
35+
// GetTags returns no tags — agent telemetry logs carry none on the wire.
36+
func (l *AgentTelemetryLog) GetTags() []string { return []string{} }
37+
38+
// GetCollectedTime returns when the fakeintake server received this payload.
39+
func (l *AgentTelemetryLog) GetCollectedTime() time.Time { return l.collectedTime }
40+
41+
// apmTelemetryEnvelope is the outer POST body sent to /api/v2/apmtelemetry.
42+
// The same endpoint receives metrics, heartbeats, and other request types;
43+
// request_type discriminates agent-logs records from the rest.
44+
type apmTelemetryEnvelope struct {
45+
RequestType string `json:"request_type"`
46+
Payload struct {
47+
Logs []*AgentTelemetryLog `json:"logs"`
48+
} `json:"payload"`
49+
}
50+
51+
// ParseAgentTelemetryLogs parses one api.Payload into zero-or-more
52+
// AgentTelemetryLog items. Payloads whose request_type is not "agent-logs"
53+
// are silently skipped — the /api/v2/apmtelemetry endpoint is shared with
54+
// agent metrics, heartbeats, and other telemetry types.
55+
func ParseAgentTelemetryLogs(payload api.Payload) ([]*AgentTelemetryLog, error) {
56+
if bytes.Equal(payload.Data, []byte("{}")) {
57+
return []*AgentTelemetryLog{}, nil
58+
}
59+
60+
inflated, err := inflate(payload.Data, payload.Encoding)
61+
if err != nil {
62+
return nil, err
63+
}
64+
65+
// The /api/v2/apmtelemetry endpoint is shared: agent metrics, heartbeats,
66+
// and other payload types arrive here too, some encoded as protobuf. If
67+
// JSON unmarshal fails, skip silently — this payload is not agent-logs.
68+
var env apmTelemetryEnvelope
69+
if err := json.Unmarshal(inflated, &env); err != nil {
70+
return []*AgentTelemetryLog{}, nil
71+
}
72+
73+
if env.RequestType != "agent-logs" {
74+
return []*AgentTelemetryLog{}, nil
75+
}
76+
77+
for _, l := range env.Payload.Logs {
78+
l.collectedTime = payload.Timestamp
79+
}
80+
return env.Payload.Logs, nil
81+
}
82+
83+
// AgentTelemetryLogAggregator aggregates AgentTelemetryLog payloads received
84+
// on the /api/v2/apmtelemetry endpoint.
85+
type AgentTelemetryLogAggregator struct {
86+
Aggregator[*AgentTelemetryLog]
87+
}
88+
89+
// NewAgentTelemetryLogAggregator returns a new AgentTelemetryLogAggregator.
90+
func NewAgentTelemetryLogAggregator() AgentTelemetryLogAggregator {
91+
return AgentTelemetryLogAggregator{
92+
Aggregator: newAggregator(ParseAgentTelemetryLogs),
93+
}
94+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
// Unless explicitly stated otherwise all files in this repository are licensed
2+
// under the Apache License Version 2.0.
3+
// This product includes software developed at Datadog (https://www.datadoghq.com/).
4+
// Copyright 2016-present Datadog, Inc.
5+
6+
package aggregator
7+
8+
import (
9+
_ "embed"
10+
"testing"
11+
"time"
12+
13+
"github.com/DataDog/datadog-agent/test/fakeintake/api"
14+
"github.com/stretchr/testify/assert"
15+
"github.com/stretchr/testify/require"
16+
)
17+
18+
//go:embed fixtures/apm_telemetry_bytes
19+
var apmTelemetryData []byte
20+
21+
func TestParseAgentTelemetryLogs(t *testing.T) {
22+
t.Run("empty JSON object is silently skipped", func(t *testing.T) {
23+
logs, err := ParseAgentTelemetryLogs(api.Payload{
24+
Data: []byte("{}"),
25+
Encoding: encodingJSON,
26+
})
27+
require.NoError(t, err)
28+
assert.Empty(t, logs)
29+
})
30+
31+
t.Run("non-agent-logs request_type is silently skipped", func(t *testing.T) {
32+
logs, err := ParseAgentTelemetryLogs(api.Payload{
33+
Data: []byte(`{"request_type":"agent-metrics","payload":{}}`),
34+
Encoding: encodingJSON,
35+
})
36+
require.NoError(t, err)
37+
assert.Empty(t, logs)
38+
})
39+
40+
t.Run("non-JSON payload (e.g. protobuf) is silently skipped", func(t *testing.T) {
41+
logs, err := ParseAgentTelemetryLogs(api.Payload{
42+
Data: []byte("not valid json"),
43+
Encoding: encodingJSON,
44+
})
45+
require.NoError(t, err)
46+
assert.Empty(t, logs)
47+
})
48+
49+
t.Run("valid agent-logs payload parses all fields and stamps collected time", func(t *testing.T) {
50+
ts := time.Date(2025, 6, 1, 12, 0, 0, 0, time.UTC)
51+
logs, err := ParseAgentTelemetryLogs(api.Payload{
52+
Data: apmTelemetryData,
53+
Encoding: encodingJSON,
54+
Timestamp: ts,
55+
})
56+
require.NoError(t, err)
57+
require.Len(t, logs, 1)
58+
59+
l := logs[0]
60+
assert.Equal(t, "ERROR", l.Level)
61+
assert.Contains(t, l.StackTrace, "main.main()")
62+
assert.Equal(t, int64(1234567890), l.TracerTime)
63+
assert.Equal(t, 3, l.Count)
64+
assert.False(t, l.IsCrash)
65+
assert.Empty(t, l.Message)
66+
assert.Equal(t, ts, l.GetCollectedTime())
67+
})
68+
69+
t.Run("multiple logs in one payload are all returned", func(t *testing.T) {
70+
data := []byte(`{"request_type":"agent-logs","payload":{"logs":[` +
71+
`{"level":"ERROR","stack_trace":"foo","tracer_time":1,"count":1,"is_crash":false},` +
72+
`{"level":"ERROR","stack_trace":"bar","tracer_time":2,"count":2,"is_crash":true}` +
73+
`]}}`)
74+
logs, err := ParseAgentTelemetryLogs(api.Payload{
75+
Data: data,
76+
Encoding: encodingJSON,
77+
})
78+
require.NoError(t, err)
79+
require.Len(t, logs, 2)
80+
assert.Equal(t, int64(1), logs[0].TracerTime)
81+
assert.Equal(t, int64(2), logs[1].TracerTime)
82+
assert.True(t, logs[1].IsCrash)
83+
})
84+
85+
t.Run("GetTags returns empty slice", func(t *testing.T) {
86+
l := &AgentTelemetryLog{}
87+
assert.Empty(t, l.GetTags())
88+
})
89+
90+
t.Run("name returns agent-errortracking", func(t *testing.T) {
91+
l := &AgentTelemetryLog{}
92+
assert.Equal(t, "agent-errortracking", l.name())
93+
})
94+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"request_type":"agent-logs","payload":{"logs":[{"level":"ERROR","stack_trace":"goroutine 1 [running]:\nmain.main()\n\t/app/main.go:10 +0x58","tracer_time":1234567890,"count":3,"is_crash":false,"message":""}]}}

test/fakeintake/client/client.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ type Client struct {
154154
ncmAggregator aggregator.NCMAggregator
155155
hostAggregator aggregator.HostTagsAggregator
156156
agentHealthAggregator aggregator.AgentHealthAggregator
157+
agentTelemetryLogAggregator aggregator.AgentTelemetryLogAggregator
157158
}
158159

159160
// NewClient creates a new fake intake client
@@ -189,6 +190,7 @@ func NewClient(fakeIntakeURL string, opts ...Option) *Client {
189190
ncmAggregator: aggregator.NewNCMAggregator(),
190191
hostAggregator: aggregator.NewHostTagsAggregator(),
191192
agentHealthAggregator: aggregator.NewAgentHealthAggregator(),
193+
agentTelemetryLogAggregator: aggregator.NewAgentTelemetryLogAggregator(),
192194
}
193195
for _, opt := range opts {
194196
opt(client)
@@ -380,6 +382,14 @@ func (c *Client) getAgentHealth() error {
380382
return c.agentHealthAggregator.UnmarshallPayloads(payloads)
381383
}
382384

385+
func (c *Client) getAgentTelemetryLogs() error {
386+
payloads, err := c.getFakePayloads(apmTelemetryEndpoint)
387+
if err != nil {
388+
return err
389+
}
390+
return c.agentTelemetryLogAggregator.UnmarshallPayloads(payloads)
391+
}
392+
383393
// FilterMetrics fetches fakeintake on both `/api/v2/series` and `/api/intake/metrics/v3/series`
384394
// and returns metrics matching `name` and any [MatchOpt](#MatchOpt) options.
385395
// Results from both endpoints are merged.
@@ -742,6 +752,7 @@ func (c *Client) FlushServerAndResetAggregators() error {
742752
c.logAggregator.Reset()
743753
c.apmStatsAggregator.Reset()
744754
c.traceAggregator.Reset()
755+
c.agentTelemetryLogAggregator.Reset()
745756
return nil
746757
}
747758

@@ -1187,6 +1198,21 @@ func (c *Client) GetHosts() ([]string, error) {
11871198
return c.hostAggregator.GetNames(), nil
11881199
}
11891200

1201+
// GetAgentTelemetryLogs fetches fakeintake on `/api/v2/apmtelemetry` and returns
1202+
// all agent-logs telemetry records received since the last flush. Each call
1203+
// fetches the server's full accumulated state (UnmarshallPayloads replaces, not
1204+
// appends). Payloads whose request_type is not "agent-logs" are silently skipped.
1205+
func (c *Client) GetAgentTelemetryLogs() ([]*aggregator.AgentTelemetryLog, error) {
1206+
if err := c.getAgentTelemetryLogs(); err != nil {
1207+
return nil, err
1208+
}
1209+
var logs []*aggregator.AgentTelemetryLog
1210+
for _, name := range c.agentTelemetryLogAggregator.GetNames() {
1211+
logs = append(logs, c.agentTelemetryLogAggregator.GetPayloadsByName(name)...)
1212+
}
1213+
return logs, nil
1214+
}
1215+
11901216
// GetAgentHealth fetches fakeintake on `/api/v2/agenthealth` endpoint and returns all received agent health payloads
11911217
func (c *Client) GetAgentHealth() ([]*aggregator.AgentHealthPayload, error) {
11921218
err := c.getAgentHealth()

test/fakeintake/client/client_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -732,4 +732,51 @@ func TestClient(t *testing.T) {
732732
assert.Contains(t, issue.Tags, "docker:installed")
733733
})
734734

735+
t.Run("getAgentTelemetryLogs", func(t *testing.T) {
736+
payload := `{"request_type":"agent-logs","payload":{"logs":[{"level":"ERROR","stack_trace":"main.main()","tracer_time":1234567890,"count":3,"is_crash":false,"message":""}]}}`
737+
response, err := json.Marshal(api.APIFakeIntakePayloadsRawGETResponse{
738+
Payloads: []api.Payload{
739+
{Data: []byte(payload), Encoding: "application/json"},
740+
},
741+
})
742+
require.NoError(t, err)
743+
744+
ts := NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
745+
w.Write(response)
746+
}))
747+
defer ts.Close()
748+
749+
client := NewClient(ts.URL)
750+
err = client.getAgentTelemetryLogs()
751+
require.NoError(t, err)
752+
assert.True(t, client.agentTelemetryLogAggregator.ContainsPayloadName("agent-errortracking"))
753+
assert.False(t, client.agentTelemetryLogAggregator.ContainsPayloadName("totoro"))
754+
})
755+
756+
t.Run("GetAgentTelemetryLogs", func(t *testing.T) {
757+
payload := `{"request_type":"agent-logs","payload":{"logs":[{"level":"ERROR","stack_trace":"main.main()","tracer_time":1234567890,"count":3,"is_crash":false,"message":""}]}}`
758+
response, err := json.Marshal(api.APIFakeIntakePayloadsRawGETResponse{
759+
Payloads: []api.Payload{
760+
{Data: []byte(payload), Encoding: "application/json"},
761+
},
762+
})
763+
require.NoError(t, err)
764+
765+
ts := NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
766+
w.Write(response)
767+
}))
768+
defer ts.Close()
769+
770+
client := NewClient(ts.URL)
771+
logs, err := client.GetAgentTelemetryLogs()
772+
require.NoError(t, err)
773+
require.Len(t, logs, 1)
774+
assert.Equal(t, "ERROR", logs[0].Level)
775+
assert.Equal(t, "main.main()", logs[0].StackTrace)
776+
assert.Equal(t, int64(1234567890), logs[0].TracerTime)
777+
assert.Equal(t, 3, logs[0].Count)
778+
assert.False(t, logs[0].IsCrash)
779+
assert.Empty(t, logs[0].Message)
780+
})
781+
735782
}

test/fakeintake/server/serverstore/parser.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ var parserMap = map[string]parserFunc{
2020
"/api/v1/connections": getConnectionsPayLoadProtobuf,
2121
"/api/beta/sketches": getSketchPayloadProtobuf,
2222
"/api/intake/metrics/v3/series": getMetricV3SeriesPayload,
23+
"/api/v2/apmtelemetry": getAgentTelemetryLogsJSON,
2324
}
2425

2526
func getLogPayLoadJSON(payload api.Payload) (interface{}, error) {
@@ -50,6 +51,10 @@ func getMetricV3SeriesPayload(payload api.Payload) (interface{}, error) {
5051
return aggregator.ParseMetricSeriesV3(payload)
5152
}
5253

54+
func getAgentTelemetryLogsJSON(payload api.Payload) (interface{}, error) {
55+
return aggregator.ParseAgentTelemetryLogs(payload)
56+
}
57+
5358
// IsRouteHandled checks if a route is handled by the Datadog parsed store
5459
func IsRouteHandled(route string) bool {
5560
_, ok := parserMap[route]

0 commit comments

Comments
 (0)