Skip to content

Commit 20703ae

Browse files
committed
feat(errortracking): wire error-tracking handler into agent startup
- Add config defaults: buffer_size, flush_interval, bouncer_window (common_settings.go) - Wire errortracking.Handler into buildSlogLogger as a synchronous sibling of the async handler branch (pkg/util/log/setup/log.go) - Add installErrortrackingHandler() fx.Lifecycle hook in agent run command - Adapt LoadComponents call to updated signature (config.Component instead of string)
1 parent be28f36 commit 20703ae

5 files changed

Lines changed: 194 additions & 8 deletions

File tree

cmd/agent/subcommands/run/command.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,8 @@ import (
205205
"github.com/DataDog/datadog-agent/pkg/util/installinfo"
206206
"github.com/DataDog/datadog-agent/pkg/util/kubernetes/apiserver/leaderelection"
207207
pkglog "github.com/DataDog/datadog-agent/pkg/util/log"
208+
errortrackingpkg "github.com/DataDog/datadog-agent/pkg/util/log/errortracking"
209+
pkglogsetup "github.com/DataDog/datadog-agent/pkg/util/log/setup"
208210
"github.com/DataDog/datadog-agent/pkg/util/option"
209211
"github.com/DataDog/datadog-agent/pkg/version"
210212

@@ -403,7 +405,7 @@ func run(log log.Component,
403405
"Establish if the agent is running",
404406
)
405407

406-
// agentStarted and agentRunning are metrics used for Cross-org Agent Telemetry (COAT)
408+
// agentStarted and agentRunning are metrics used for internal agent telemetry
407409
// for more details on the scheduling config check comp/core/agenttelemetry/impl/config.go
408410
agentStarted.Inc()
409411
agentRunning.Set(1)
@@ -579,6 +581,8 @@ func getSharedFxOption() fx.Option {
579581
}),
580582
settingsfx.Module(),
581583
agenttelemetryfx.Module(),
584+
// AGTHEAL-15: errortracking submitter wire — atel owns buffer/flush/recursion.
585+
fx.Invoke(installErrortrackingHandler),
582586
remotetraceroute.Module(),
583587
networkpath.Bundle(),
584588
syntheticsTestsfx.Module(),
@@ -599,6 +603,33 @@ func getSharedFxOption() fx.Option {
599603
)
600604
}
601605

606+
// installErrortrackingHandler appends an fx OnStart hook that registers the
607+
// errortracking submitter and bouncer with pkg/util/log/setup. The submitter
608+
// forwards every ErrorLog it receives to at.SubmitErrorLog.
//
// It is a no-op when the feature is disabled
// (agent_telemetry.errortracking.enabled or the parent agent_telemetry
609+
// gate); that check is delegated to configUtils.IsErrorTrackingEnabled.
//
// The matching clear runs synchronously inside atel.stop()
610+
// (deliberately not as a separate OnStop hook here) so it precedes the
611+
// final flush-goroutine drain.
612+
func installErrortrackingHandler(lc fx.Lifecycle, cfg config.Component, at agenttelemetry.Component) {
613+
if !configUtils.IsErrorTrackingEnabled(cfg) {
614+
return
615+
}
616+
617+
// Adapter closure: hands each captured ErrorLog to the agent telemetry component.
submitter := func(elog errortrackingpkg.ErrorLog) {
618+
at.SubmitErrorLog(elog)
619+
}
620+
621+
// The setting is read as an integer number of seconds and converted to a time.Duration.
bouncerWindow := time.Duration(cfg.GetInt("agent_telemetry.errortracking.bouncer_window_seconds")) * time.Second
622+
// NOTE(review): the meaning of the second NewBouncer argument (0) is not visible here — confirm.
bouncer := errortrackingpkg.NewBouncer(bouncerWindow, 0)
623+
624+
// Registration happens in OnStart, not at construction time; only OnStart is set on the hook.
lc.Append(fx.Hook{
625+
OnStart: func(_ context.Context) error {
626+
pkglogsetup.RegisterErrortrackingSubmitter(submitter)
627+
pkglogsetup.RegisterErrortrackingBouncer(bouncer)
628+
return nil
629+
},
630+
})
631+
}
632+
602633
// startAgent Initializes the agent process
603634
func startAgent(
604635
log log.Component,

pkg/config/setup/common_settings.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1395,7 +1395,7 @@ func remoteconfig(config pkgconfigmodel.Setup) {
13951395
config.BindEnvAndSetDefault("remote_configuration.no_tls_validation", false)
13961396
config.BindEnvAndSetDefault("remote_configuration.config_root", "")
13971397
config.BindEnvAndSetDefault("remote_configuration.director_root", "")
1398-
config.BindEnvAndSetDefault("remote_configuration.refresh_interval", "0s")
1398+
config.BindEnvAndSetDefault("remote_configuration.refresh_interval", time.Duration(0))
13991399
config.BindEnvAndSetDefault("remote_configuration.org_status_refresh_interval", 1*time.Minute)
14001400
config.BindEnvAndSetDefault("remote_configuration.max_backoff_interval", 5*time.Minute)
14011401
config.BindEnvAndSetDefault("remote_configuration.clients.ttl_seconds", 30*time.Second)
@@ -1507,6 +1507,16 @@ func telemetry(config pkgconfigmodel.Setup) {
15071507
config.BindEnvAndSetDefault("agent_telemetry.compression_level", 1)
15081508
config.BindEnvAndSetDefault("agent_telemetry.use_compression", true)
15091509
config.BindEnvAndSetDefault("agent_telemetry.startup_trace_sampling", 0)
1510+
1511+
// AGTHEAL-15: experimental error log forwarding to COAT. Use-sites must
1512+
// additionally gate on pkg/config/utils.IsAgentTelemetryEnabled so
1513+
// gov/FIPS exclusion is inherited from the parent agent_telemetry flag.
1514+
config.BindEnvAndSetDefault("agent_telemetry.errortracking.enabled", false)
1515+
config.BindEnvAndSetDefault("agent_telemetry.errortracking.bouncer_window_seconds", 900)
1516+
config.BindEnvAndSetDefault("agent_telemetry.errortracking.flush_interval_seconds", 60)
1517+
config.BindEnvAndSetDefault("agent_telemetry.errortracking.buffer_size", 2048)
1518+
config.BindEnvAndSetDefault("agent_telemetry.errortracking.startup_jitter_seconds", 0)
1519+
config.BindEnvAndSetDefault("agent_telemetry.errortracking.shutdown_drain_timeout_seconds", 5)
15101520
}
15111521

15121522
func serializer(config pkgconfigmodel.Setup) {

pkg/util/log/setup/log.go

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,18 @@ package logs
99
import (
1010
"context"
1111
"errors"
12-
"io"
13-
stdslog "log/slog"
14-
"os"
15-
"strings"
16-
1712
pkgconfigmodel "github.com/DataDog/datadog-agent/pkg/config/model"
1813
"github.com/DataDog/datadog-agent/pkg/util/log"
14+
"github.com/DataDog/datadog-agent/pkg/util/log/errortracking"
1915
"github.com/DataDog/datadog-agent/pkg/util/log/slog"
2016
"github.com/DataDog/datadog-agent/pkg/util/log/slog/filewriter"
2117
"github.com/DataDog/datadog-agent/pkg/util/log/slog/handlers"
2218
"github.com/DataDog/datadog-agent/pkg/util/log/syslog"
2319
"github.com/DataDog/datadog-agent/pkg/util/log/types"
20+
"io"
21+
stdslog "log/slog"
22+
"os"
23+
"strings"
2424
)
2525

2626
// LoggerName specifies the name of an instantiated logger.
@@ -186,9 +186,19 @@ func buildSlogLogger(
186186
multiHandler := handlers.NewMulti(handlerList...)
187187
asyncHandler := handlers.NewAsync(multiHandler)
188188

189+
// AGTHEAL-15: errortracking branch is a sibling of asyncHandler under the
190+
// global level filter. The Handler atomic-loads the current Submitter
191+
// (and per-PC Bouncer) on every record, so the chain is correctly wired
192+
// whether or not the agenttelemetry component has registered yet; its
193+
// Enabled returns false when no Submitter is registered, short-circuiting
194+
// the multi-handler.
195+
errortrackingHandler := errortracking.NewHandler(loadErrortrackingSubmitter).
196+
WithBouncerLoader(loadErrortrackingBouncer)
197+
topHandler := handlers.NewMulti(asyncHandler, errortrackingHandler)
198+
189199
levelVar := new(stdslog.LevelVar)
190200
levelVar.Set(types.ToSlogLevel(logLevel))
191-
levelHandler := handlers.NewLevel(levelVar, asyncHandler)
201+
levelHandler := handlers.NewLevel(levelVar, topHandler)
192202

193203
// Close async handler first so it drains and stops writing, then close writers.
194204
// Otherwise the async goroutine can still call Write() while the file writer is closed (data race).

pkg/util/log/setup/log_test.go

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,14 @@ import (
1010
"path/filepath"
1111
"sync"
1212
"testing"
13+
"time"
1314

1415
"github.com/stretchr/testify/require"
1516

1617
pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
1718
"github.com/DataDog/datadog-agent/pkg/util/log"
19+
"github.com/DataDog/datadog-agent/pkg/util/log/errortracking"
20+
pkgslog "github.com/DataDog/datadog-agent/pkg/util/log/slog"
1821
)
1922

2023
func BenchmarkSlogParallel(b *testing.B) {
@@ -76,3 +79,125 @@ func runLog(b *testing.B) {
7679
}
7780
log.Flush()
7881
}
82+
83+
// --- Errortracking handler-chain tests ---------------------------------
84+
//
85+
// These tests cover the errortracking wiring in log.go. They previously
86+
// lived in log_errortracking_test.go and were folded in here (PR #50607
87+
// review) so the test filename matches the source file, log.go.
88+
89+
// recordingSubmitter captures every ErrorLog routed through the chain.
90+
// Tests use it as the registered Submitter to assert the chain forwards
91+
// (or filters) records. Its submit and snapshot methods both take mu, so
// logs is only read or written under the lock.
92+
type recordingSubmitter struct {
93+
mu sync.Mutex // guards logs
94+
logs []errortracking.ErrorLog // records appended by submit
95+
}
96+
97+
// submit appends e to r.logs while holding r.mu. The tests pass it (as
// rec.submit) to RegisterErrortrackingSubmitter.
func (r *recordingSubmitter) submit(e errortracking.ErrorLog) {
98+
r.mu.Lock()
99+
defer r.mu.Unlock()
100+
r.logs = append(r.logs, e)
101+
}
102+
103+
// snapshot returns a copy of the records captured so far, taken while
// holding r.mu. The returned slice has its own backing array, so later
// submit calls do not modify it.
func (r *recordingSubmitter) snapshot() []errortracking.ErrorLog {
104+
r.mu.Lock()
105+
defer r.mu.Unlock()
106+
out := make([]errortracking.ErrorLog, len(r.logs))
107+
copy(out, r.logs)
108+
return out
109+
}
110+
111+
// resetErrortrackingSlot clears the registered errortracking Submitter and
// Bouncer (by registering nil for both) immediately, and schedules the same
// clearing via t.Cleanup so state set by the calling test does not outlive it.
func resetErrortrackingSlot(t *testing.T) {
112+
t.Helper()
113+
// Runs when the calling test finishes.
t.Cleanup(func() {
114+
RegisterErrortrackingSubmitter(nil)
115+
RegisterErrortrackingBouncer(nil)
116+
})
117+
// Clear now as well, so the test starts with nothing registered.
RegisterErrortrackingSubmitter(nil)
118+
RegisterErrortrackingBouncer(nil)
119+
}
120+
121+
// TestRegisterErrortrackingSubmitter_NilResets verifies the on/off contract:
122+
// after registering and then clearing, the slot must return to a no-op
123+
// state. Tests rely on this for cleanup between cases. Concretely,
// loadErrortrackingSubmitter must be non-nil after registration and nil again
// after RegisterErrortrackingSubmitter(nil).
124+
func TestRegisterErrortrackingSubmitter_NilResets(t *testing.T) {
125+
resetErrortrackingSlot(t)
126+
127+
// Registering a submitter makes it visible through the loader.
rec := &recordingSubmitter{}
128+
RegisterErrortrackingSubmitter(rec.submit)
129+
require.NotNil(t, loadErrortrackingSubmitter())
130+
131+
// Registering nil clears the slot.
RegisterErrortrackingSubmitter(nil)
132+
require.Nil(t, loadErrortrackingSubmitter())
133+
}
134+
135+
// TestBuildSlogLogger_ForwardsErrorRecord asserts that the chain assembled
136+
// by buildSlogLogger fans error records out to the registered Submitter
137+
// while routing non-error records only to the formatted writer branch.
// It logs one Info, one Warn and one Error record and expects exactly one
// captured ErrorLog with a non-zero PC, PCsLen > 0 and Count == 1.
138+
func TestBuildSlogLogger_ForwardsErrorRecord(t *testing.T) {
139+
resetErrortrackingSlot(t)
140+
141+
rec := &recordingSubmitter{}
142+
RegisterErrortrackingSubmitter(rec.submit)
143+
// A Bouncer must be registered alongside the Submitter: when loadBouncer
144+
// is set but returns nil the handler drops the record (safe default for
145+
// the Fx startup window). The production wiring registers both together.
146+
RegisterErrortrackingBouncer(errortracking.NewBouncer(15*time.Minute, 0))
147+
148+
// Build a logger at debug level that writes to a file in a per-test temp dir.
dir := t.TempDir()
149+
ddCfg := pkgconfigsetup.Datadog()
150+
logger, levelVar, err := buildSlogLogger(
151+
log.DebugLvl,
152+
false,
153+
filepath.Join(dir, "test.log"), 1000, 2,
154+
"",
155+
commonFormatter("TEST", ddCfg), nil,
156+
)
157+
require.NoError(t, err)
158+
t.Cleanup(logger.Close)
159+
// NOTE(review): the import bound to the name `slog` is not visible in this
// hunk (the package's own slog is imported as pkgslog) — presumably
// log/slog; confirm.
levelVar.Set(slog.LevelDebug)
160+
161+
// Log through the wrapper's underlying handler rather than the logger's own API.
wrapper, ok := logger.(*pkgslog.Wrapper)
162+
require.True(t, ok, "expected *pkgslog.Wrapper, got %T", logger)
163+
sl := slog.New(wrapper.Handler())
164+
165+
sl.Info("info message - should not reach errortracking")
166+
sl.Warn("warn message - should not reach errortracking")
167+
sl.Error("error message - should reach errortracking")
168+
// Flush the logger before reading what the submitter recorded.
logger.Flush()
169+
170+
got := rec.snapshot()
171+
require.Len(t, got, 1, "exactly one Error record must reach the registered Submitter")
172+
require.NotZero(t, got[0].PC, "captured record must carry a call-site PC")
173+
require.Greater(t, got[0].PCsLen, 0, "captured record must carry stack PCs")
174+
require.Equal(t, uint32(1), got[0].Count, "first sighting always has Count=1")
175+
}
176+
177+
// TestBuildSlogLogger_NoForwardingWhenUnregistered exercises the chain
178+
// built by buildSlogLogger with the errortracking slot cleared, i.e.
179+
// when no Submitter has been registered (the common path before opt-in).
// It makes no assertion on the errortracking side: beyond setup succeeding,
// the test passes as long as logging an error and flushing do not panic.
180+
func TestBuildSlogLogger_NoForwardingWhenUnregistered(t *testing.T) {
181+
// Leaves both the Submitter and the Bouncer unregistered for this test.
resetErrortrackingSlot(t)
182+
183+
dir := t.TempDir()
184+
ddCfg := pkgconfigsetup.Datadog()
185+
logger, levelVar, err := buildSlogLogger(
186+
log.DebugLvl,
187+
false,
188+
filepath.Join(dir, "test.log"), 1000, 2,
189+
"",
190+
commonFormatter("TEST", ddCfg), nil,
191+
)
192+
require.NoError(t, err)
193+
t.Cleanup(logger.Close)
194+
levelVar.Set(slog.LevelDebug)
195+
196+
// NOTE(review): single-value type assertion — this panics if logger is not a
// *pkgslog.Wrapper; TestBuildSlogLogger_ForwardsErrorRecord uses the checked
// two-value form with require.True.
wrapper := logger.(*pkgslog.Wrapper)
197+
sl := slog.New(wrapper.Handler())
198+
199+
// Should not panic; should produce nothing observable on the
200+
// errortracking side.
201+
sl.Error("error with nobody listening")
202+
logger.Flush()
203+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
---
2+
features:
3+
- |
4+
Added an experimental Cross-Org Agent Telemetry (COAT) error log
5+
forwarder. When ``agent_telemetry.errortracking.enabled`` is set to ``true`` in
6+
``datadog.yaml`` the Agent forwards records logged at ``ERROR`` level or
7+
higher to the COAT intake so Datadog engineers can aggregate Agent
8+
errors across customer organizations. The forwarder is off by default
9+
and shares the agent telemetry transport, inheriting endpoint and
10+
compression settings from the ``agent_telemetry.*`` configuration.

0 commit comments

Comments
 (0)