Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions comp/observer/def/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,3 +673,24 @@ type Detector interface {
// dataTime is the current data timestamp (for determinism - only read data <= dataTime).
Detect(storage StorageReader, dataTime int64) DetectionResult
}

// SeriesRemover is an optional interface that Detector implementations can
// satisfy to receive notifications when storage drops series.
//
// Many detectors maintain per-series state (BOCPD posterior arrays, ScanMW
// segment buffers, ScanWelch posterior, the seriesDetectorAdapter visible
// point count map, etc.) keyed by SeriesRef. Storage frees the series
// payload itself when extractors evict their LRU contexts and the engine
// calls RemoveSeriesByKeys, but without this hook the detector-side maps
// keep growing without bound, tracking the cumulative number of series ever
// observed. The engine fans the freed refs out to every detector that
// implements this interface immediately after RemoveSeriesByKeys returns
// them, keeping detector state symmetric with storage state.
//
// Implementations should be cheap (a handful of map deletes) and tolerant
// of refs they have never seen — adapters routinely receive refs for
// series they were never asked to detect on (e.g. metric series on a
// log-only detector).
type SeriesRemover interface {
// RemoveSeries is invoked with the SeriesRefs that storage has just freed
// so the implementation can drop whatever per-series state it keeps for
// them. Refs the implementation has never seen must be ignored rather than
// treated as an error.
RemoveSeries(refs []SeriesRef)
}
88 changes: 88 additions & 0 deletions comp/observer/impl/component_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,3 +380,91 @@ func catalogEnabledCorrelators(components map[string]*componentInstance, catalog
}
return result
}

// statelessDetectorAllowlist enumerates catalog detectors that are explicitly
// permitted to NOT implement observerdef.SeriesRemover. A stateless detector
// keeps no per-series state (no posterior maps, no segment trackers, no
// visible-count tracking) and therefore needs nothing freed when storage
// evicts a series; the engine's fanOutSeriesRemoval safely no-ops on it.
//
// Any new entry added here is asserting "this detector is genuinely stateless
// across detect calls". If a detector ever grows per-series memory (cache,
// tracker, accumulator keyed by SeriesRef), it must implement SeriesRemover
// and be removed from this list — otherwise its memory grows with the
// cumulative number of series ever observed even after storage evicts them.
//
// Keys are catalog entry names; validateDetectorTeardownContract looks
// entries up by entry.name. The map is package-level mutable state:
// TestValidateDetectorTeardownContract_AllowlistEscape temporarily inserts
// a key and removes it in a cleanup, so tests that read or write this map
// must not run in parallel with each other.
var statelessDetectorAllowlist = map[string]struct{}{
// SeriesDetector implementations are wrapped at instantiation time by
// seriesDetectorAdapter, which itself implements SeriesRemover — so the
// raw SeriesDetector struct doesn't need to. The adapter handles teardown
// of its own lastVisibleCount cache and forwards to the wrapped detector
// only if it also satisfies SeriesRemover.
//
// Truly-stateless catalog Detectors are listed below.

// RRCF tracks a FIXED set of metric definitions configured at construction
// (RRCFConfig.Metrics, with DefaultRRCFMetrics() as the fallback). Its
// resolvedKeys / cursors maps are keyed by cursorKey (a metric definition
// identifier), not by ingested SeriesRef — so the map size is bounded by
// the configured metrics, not by storage cardinality. Adding storage-eviction
// fan-out would not free anything because RRCF state isn't keyed by SeriesRef.
// If RRCF is ever extended to track per-tag-combination state keyed by
// SeriesRef, this entry must be removed and RRCF must implement
// SeriesRemover.
"rrcf": {},
}

// validateDetectorTeardownContract walks the catalog and verifies that each
// detector entry has a teardown story: it is either named in
// statelessDetectorAllowlist, or the object its factory builds can receive
// eviction notifications through observerdef.SeriesRemover.
//
// The check stops at the first offending entry and reports it as a
// *detectorTeardownContractError; nil means every detector entry passed.
// Non-detector entries are ignored. Allowlisted entries are skipped before
// their factory is invoked.
//
// It is meant to be driven by a unit test against defaultCatalog() so that a
// detector added without a teardown story fails CI instead of leaking memory
// in production.
func (c *componentCatalog) validateDetectorTeardownContract() error {
	for _, entry := range c.entries {
		if entry.kind != componentDetector {
			continue
		}
		if _, exempt := statelessDetectorAllowlist[entry.name]; exempt {
			continue
		}

		// The contract is structural rather than config-dependent, so the
		// entry's default config is enough to build a representative product.
		// Case order matters: a SeriesDetector is checked first, and the
		// check is made on the adapter that wraps it, not on the raw value.
		switch product := entry.factory(entry.defaultConfig).(type) {
		case observerdef.SeriesDetector:
			adapter := newSeriesDetectorAdapter(product, defaultAggregations)
			if _, isRemover := any(adapter).(observerdef.SeriesRemover); !isRemover {
				return &detectorTeardownContractError{
					name:   entry.name,
					reason: "seriesDetectorAdapter no longer implements SeriesRemover \u2014 the wrapping invariant has regressed",
				}
			}
		case observerdef.Detector:
			// A direct Detector gets no adapter, so it must be a
			// SeriesRemover on its own.
			if _, isRemover := product.(observerdef.SeriesRemover); !isRemover {
				return &detectorTeardownContractError{
					name:   entry.name,
					reason: "detector neither implements observerdef.SeriesRemover nor is listed in statelessDetectorAllowlist",
				}
			}
		default:
			return &detectorTeardownContractError{
				name:   entry.name,
				reason: "factory product is neither observerdef.Detector nor observerdef.SeriesDetector",
			}
		}
	}
	return nil
}

// detectorTeardownContractError describes a catalog detector entry that
// breaks the SeriesRemover teardown contract.
type detectorTeardownContractError struct {
	name   string // catalog entry name of the offending detector
	reason string // human-readable explanation of the violation
}

// Error renders the violation as
// `detector "<name>" violates teardown contract: <reason>`. The name is
// wrapped in plain double quotes and is not otherwise escaped.
func (e *detectorTeardownContractError) Error() string {
	msg := "detector \"" + e.name
	msg += "\" violates teardown contract: "
	msg += e.reason
	return msg
}
77 changes: 77 additions & 0 deletions comp/observer/impl/component_catalog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package observerimpl

import (
"errors"
"testing"

"github.com/stretchr/testify/require"

observerdef "github.com/DataDog/datadog-agent/comp/observer/def"
)

// TestDefaultCatalog_DetectorTeardownContract runs the teardown-contract
// validator over the real default catalog. It fails as soon as a catalog
// detector neither implements observerdef.SeriesRemover nor appears in
// statelessDetectorAllowlist, which is the situation where storage eviction
// frees a series but the detector's per-series map never shrinks.
func TestDefaultCatalog_DetectorTeardownContract(t *testing.T) {
	err := defaultCatalog().validateDetectorTeardownContract()
	require.NoError(t, err,
		"every catalog detector must implement SeriesRemover or be added to statelessDetectorAllowlist with a justification comment")
}

// TestValidateDetectorTeardownContract_FlagsBareDetector is the negative
// case: a catalog holding a single detector entry whose factory yields a
// bareDetectorForValidator (no SeriesRemover, not allowlisted) must be
// rejected with a *detectorTeardownContractError naming that entry.
func TestValidateDetectorTeardownContract_FlagsBareDetector(t *testing.T) {
	bare := componentEntry{
		name:           "bare-detector",
		kind:           componentDetector,
		factory:        func(any) any { return &bareDetectorForValidator{} },
		defaultEnabled: true,
	}
	cat := &componentCatalog{entries: []componentEntry{bare}}

	err := cat.validateDetectorTeardownContract()
	require.Error(t, err)

	var contractErr *detectorTeardownContractError
	matched := errors.As(err, &contractErr)
	require.True(t, matched, "error must be detectorTeardownContractError")
	require.Equal(t, "bare-detector", contractErr.name)
}

// TestValidateDetectorTeardownContract_AllowlistEscape exercises the
// allowlist escape hatch with a synthetic entry: a detector that does not
// implement SeriesRemover passes validation once its name is present in
// statelessDetectorAllowlist.
//
// The test inserts a temporary key into the package-level allowlist and
// removes it again via t.Cleanup, so it must not run in parallel with other
// tests that consult the allowlist.
func TestValidateDetectorTeardownContract_AllowlistEscape(t *testing.T) {
	const entryName = "explicitly-stateless-test"

	statelessDetectorAllowlist[entryName] = struct{}{}
	t.Cleanup(func() { delete(statelessDetectorAllowlist, entryName) })

	stateless := componentEntry{
		name:           entryName,
		kind:           componentDetector,
		factory:        func(any) any { return &bareDetectorForValidator{} },
		defaultEnabled: true,
	}
	cat := &componentCatalog{entries: []componentEntry{stateless}}

	require.NoError(t, cat.validateDetectorTeardownContract())
}

// bareDetectorForValidator is a test double for the validator's negative
// paths: it carries no state, exposes only Name and Detect, and deliberately
// has no RemoveSeries method, so it does not satisfy SeriesRemover.
type bareDetectorForValidator struct{}

// Name reports the fixed identifier "bare-detector".
func (d *bareDetectorForValidator) Name() string {
	return "bare-detector"
}

// Detect ignores both arguments and returns the zero DetectionResult.
func (d *bareDetectorForValidator) Detect(_ observerdef.StorageReader, _ int64) observerdef.DetectionResult {
	var empty observerdef.DetectionResult
	return empty
}
118 changes: 110 additions & 8 deletions comp/observer/impl/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ type engine struct {
latePointsBySource map[string]int64 // per-source breakdown (single-goroutine access from run loop)
handles []*handle // registered handles for per-source drop collection
handlesMu sync.Mutex // protects handles slice

// sourceTagCache memoises the "observer_source:<source>" string returned by
// sourceTagForIngest, so ingest does not allocate a fresh string per
// log/metric. Sources are a small bounded set of static strings (e.g.
// "logs", "dogstatsd", "all-metrics"), so the map stays tiny. Ingest is
// expected to run only on the engine run loop, where a plain map would
// suffice; the cache is nevertheless kept as a copy-on-write map behind an
// atomic.Pointer so lookups stay lock-free and no mutex is added to the
// hot path.
sourceTagCache atomic.Pointer[map[string]string]
}

// engineConfig holds the parameters for constructing an engine.
Expand Down Expand Up @@ -207,6 +215,49 @@ func (e *engine) registerHandle(h *handle) {
e.handlesMu.Unlock()
}

// sourceTagForIngest returns "observer_source:<source>", memoised per source
// so ingest paths do not allocate a fresh string on every call. The cache is
// a copy-on-write map published through an atomic.Pointer: a hit costs one
// atomic load plus a map lookup, and a miss copies the current map, adds the
// new entry, and publishes the copy with CompareAndSwap. Published maps are
// never mutated afterwards.
//
// The bounded-source assumption: every production caller of obs.GetHandle()
// passes a statically-defined string constant. As of this writing the full
// set is:
//   - "all-metrics" (pkg/aggregator/demultiplexer_agent.go)
//   - "dogstatsd" (comp/dogstatsd/server/server.go)
//   - "logs" (comp/observer/logssource/impl/component.go)
//   - "agent-internal-logs" (comp/observer/impl/observer.go)
//   - "profile-agent" (comp/observer/impl/observer.go)
//   - hfrunner.HFSource (comp/observer/impl/observer.go)
//   - hfrunner.HFContainerSource (comp/observer/impl/observer.go)
//
// If a future caller ever passes a user-controlled or per-container source
// string, the COW map becomes unbounded and every miss copies an ever-larger
// map; this memoisation strategy is then the wrong shape (use sync.Map or a
// bounded LRU). Adding an entry to the list above means revisiting this
// function.
func (e *engine) sourceTagForIngest(source string) string {
	// Fast path: the tag is already published.
	if m := e.sourceTagCache.Load(); m != nil {
		if tag, ok := (*m)[source]; ok {
			return tag
		}
	}

	tag := "observer_source:" + source
	for {
		old := e.sourceTagCache.Load()
		size := 1
		if old != nil {
			// A competing writer may have published this source since the
			// fast-path check or since a failed CompareAndSwap. Reuse its
			// entry instead of copying and republishing the whole map.
			if cached, ok := (*old)[source]; ok {
				return cached
			}
			size += len(*old)
		}

		// Size the copy exactly so it never has to grow while being filled.
		next := make(map[string]string, size)
		if old != nil {
			for k, v := range *old {
				next[k] = v
			}
		}
		next[source] = tag
		if e.sourceTagCache.CompareAndSwap(old, &next) {
			return tag
		}
	}
}

// IngestMetric stores a metric observation and consults the scheduler policy
// to determine whether detectors should advance. Returns advance requests
// that the caller should execute via Advance.
Expand All @@ -229,7 +280,7 @@ func (e *engine) IngestMetric(source string, m *metricObs) []advanceRequest {
// notifies log observers, and consults the scheduler policy to determine whether
// detectors should advance. Returns advance requests that the caller should execute.
func (e *engine) IngestLog(source string, l *logObs) ([]advanceRequest, []observerdef.ObserverTelemetry) {
sourceTag := "observer_source:" + source
sourceTag := e.sourceTagForIngest(source)
view := &logView{obs: l}
var logTelemetry = []observerdef.ObserverTelemetry{}
for _, extractor := range e.extractors {
Expand All @@ -239,14 +290,26 @@ func (e *engine) IngestLog(source string, l *logObs) ([]advanceRequest, []observ
processingTime := time.Since(processingStartTime)
logTelemetry = append(logTelemetry, newTelemetryGauge([]string{"detector:" + extractor.Name()}, telemetryDetectorProcessingTimeNs, float64(processingTime.Nanoseconds()), l.timestampMs/1000))
for _, m := range out.Metrics {
tags := copyTags(m.Tags)
// Avoid copying m.Tags when sourceTag is already present: storage.Add
// performs its own deep copy on first-write of a series via
// canonicalizeTags, and seriesKey sorts a copy internally — neither
// mutates the input. The copy is only required when we need to
// append sourceTag without disturbing the extractor's slice.
tags := m.Tags
if !sliceContains(tags, sourceTag) {
tags = append(tags, sourceTag)
newTags := make([]string, len(tags), len(tags)+1)
copy(newTags, tags)
tags = append(newTags, sourceTag)
}
e.storage.Add(extractor.Name(), m.Name, m.Value, l.timestampMs/1000, tags)
if m.ContextKey != "" {
sk := seriesKey(extractor.Name(), m.Name, tags)
e.contextRefs[sk] = seriesContextRef{
res := e.storage.Add(extractor.Name(), m.Name, m.Value, l.timestampMs/1000, tags)
if m.ContextKey != "" && res.StorageKey != "" {
// Reuse the storage key computed inside storage.Add instead of
// recomputing seriesKey here. seriesKey is hot enough that this
// duplicate accounted for ~14.5 MiB heap-live in the
// quality_gate_container_logs SMP profile (now renamed to
// observer_logs_anomaly_stress; the 'quality_gate_*' prefix is
// reserved for SMP quality-gate cases).
e.contextRefs[res.StorageKey] = seriesContextRef{
namespace: extractor.Name(),
contextKey: m.ContextKey,
}
Expand Down Expand Up @@ -283,7 +346,10 @@ func sliceContains(items []string, want string) bool {
}

// removeContextRefsForEvictedKeys drops engine contextRefs whose extractor
// namespace and context key match an eviction from extractor GC.
// namespace and context key match an eviction from extractor GC, and frees
// the corresponding storage series. Without the storage cleanup, evicted
// patterns leak their tags + columnar arrays indefinitely (the contextRefs
// map is just metadata; the heavy data lives in storage.series).
func (e *engine) removeContextRefsForEvictedKeys(namespace string, evictedKeys []string) {
// No garbage collection done
if len(evictedKeys) == 0 {
Expand All @@ -298,12 +364,48 @@ func (e *engine) removeContextRefsForEvictedKeys(namespace string, evictedKeys [
if len(want) == 0 {
return
}
var storageKeys []string
for seriesID, ref := range e.contextRefs {
if ref.namespace != namespace {
continue
}
if _, ok := want[ref.contextKey]; ok {
delete(e.contextRefs, seriesID)
storageKeys = append(storageKeys, seriesID)
}
}
if len(storageKeys) > 0 {
freedRefs := e.storage.RemoveSeriesByKeys(storageKeys)
e.fanOutSeriesRemoval(freedRefs)
}
}

// fanOutSeriesRemoval broadcasts a list of freed SeriesRefs to every
// registered detector that implements the optional
// observerdef.SeriesRemover interface; detectors that do not implement it
// are skipped. It does nothing when refs is empty.
//
// The same refs slice is handed to each remover. Callers such as
// removeContextRefsForEvictedKeys pass whatever refs storage reported as
// freed; detectors are expected to ignore refs they do not know, which is
// what makes the blanket broadcast safe. The purpose is to keep
// detector-side per-series state in step with storage, so that eviction
// actually bounds heap usage end to end.
//
// Concurrency invariant (as documented for the engine): this method, like
// every detector RemoveSeries / Detect callback, is expected to run only on
// the single goroutine driving observerImpl.run(). Detectors may therefore
// mutate per-series state without their own locks; calling this from another
// goroutine would break that assumption for every detector.
func (e *engine) fanOutSeriesRemoval(refs []observerdef.SeriesRef) {
	if len(refs) == 0 {
		return
	}
	// Ranging over an empty detector set is already a no-op, so no separate
	// emptiness check is needed.
	for _, det := range e.detectors {
		remover, ok := det.(observerdef.SeriesRemover)
		if !ok {
			continue
		}
		remover.RemoveSeries(refs)
	}
}
Expand Down
Loading
Loading