diff --git a/comp/observer/def/component.go b/comp/observer/def/component.go index 6a9898a2867e..0e47b0cba8a7 100644 --- a/comp/observer/def/component.go +++ b/comp/observer/def/component.go @@ -673,3 +673,24 @@ type Detector interface { // dataTime is the current data timestamp (for determinism - only read data <= dataTime). Detect(storage StorageReader, dataTime int64) DetectionResult } + +// SeriesRemover is an optional interface that Detector implementations can +// satisfy to receive notifications when storage drops series. +// +// Many detectors maintain per-series state (BOCPD posterior arrays, ScanMW +// segment buffers, ScanWelch posterior, the seriesDetectorAdapter visible +// point count map, etc.) keyed by SeriesRef. Storage frees the series +// payload itself when extractors evict their LRU contexts and the engine +// calls RemoveSeriesByKeys, but without this hook the detector-side maps +// keep growing unbounded with the cumulative number of series ever +// observed. The engine fans the freed refs out to every detector that +// implements this interface immediately after RemoveSeriesByKeys returns +// them, keeping detector state symmetric with storage state. +// +// Implementations should be cheap (a handful of map deletes) and tolerant +// of refs they have never seen — adapters routinely receive refs for +// series they were never asked to detect on (e.g. metric series on a +// log-only detector). +type SeriesRemover interface { + RemoveSeries(refs []SeriesRef) +} diff --git a/comp/observer/impl/component_catalog.go b/comp/observer/impl/component_catalog.go index 8109e571dcf3..9d09104c8ac4 100644 --- a/comp/observer/impl/component_catalog.go +++ b/comp/observer/impl/component_catalog.go @@ -380,3 +380,91 @@ func catalogEnabledCorrelators(components map[string]*componentInstance, catalog } return result } + +// statelessDetectorAllowlist enumerates catalog detectors that are explicitly +// permitted to NOT implement observerdef.SeriesRemover. 
A stateless detector +// keeps no per-series state (no posterior maps, no segment trackers, no +// visible-count tracking) and therefore needs nothing freed when storage +// evicts a series; the engine's fanOutSeriesRemoval safely no-ops on it. +// +// Any new entry added here is asserting "this detector is genuinely stateless +// across detect calls". If a detector ever grows per-series memory (cache, +// tracker, accumulator keyed by SeriesRef), it must implement SeriesRemover +// and be removed from this list — otherwise its memory grows with the +// cumulative number of series ever observed even after storage evicts them. +var statelessDetectorAllowlist = map[string]struct{}{ + // SeriesDetector implementations are wrapped at instantiation time by + // seriesDetectorAdapter, which itself implements SeriesRemover — so the + // raw SeriesDetector struct doesn't need to. The adapter handles teardown + // of its own lastVisibleCount cache and forwards to the wrapped detector + // only if it also satisfies SeriesRemover. + // + // Truly-stateless catalog Detectors are listed below. + + // RRCF tracks a FIXED set of metric definitions configured at construction + // (RRCFConfig.Metrics, with DefaultRRCFMetrics() as the fallback). Its + // resolvedKeys / cursors maps are keyed by cursorKey (a metric definition + // identifier), not by ingested SeriesRef — so the map size is bounded by + // the configured metrics, not by storage cardinality. Adding storage-eviction + // fan-out would not free anything because RRCF state isn't keyed by SeriesRef. + // If RRCF is ever extended to track per-tag-combination state keyed by + // SeriesRef, this entry must be removed and RRCF must implement + // SeriesRemover. 
+ "rrcf": {}, +} + +// validateDetectorTeardownContract checks that every detector entry in the +// catalog either implements observerdef.SeriesRemover (so engine eviction +// fan-out can free its per-series state) or is explicitly listed in +// statelessDetectorAllowlist. Returns nil on success and a descriptive error +// on the first violator. +// +// Intended use: a unit test calls this against defaultCatalog() so any new +// detector added to the catalog without a teardown story fails CI before it +// can leak memory in production. SeriesDetector entries are validated against +// the SeriesRemover interface on the wrapping adapter (newSeriesDetectorAdapter +// always returns a *seriesDetectorAdapter, which implements SeriesRemover), +// matching what Instantiate produces at runtime. +func (c *componentCatalog) validateDetectorTeardownContract() error { + for _, entry := range c.entries { + if entry.kind != componentDetector { + continue + } + if _, allowed := statelessDetectorAllowlist[entry.name]; allowed { + continue + } + // Build the same instance Instantiate would. We use defaultConfig + // because the contract under test is structural, not config-dependent. + instance := entry.factory(entry.defaultConfig) + + // Mirror Instantiate's wrapping logic: SeriesDetector is wrapped + // in seriesDetectorAdapter, which is a SeriesRemover. A direct + // Detector implementation must be a SeriesRemover itself. 
+ if sd, ok := instance.(observerdef.SeriesDetector); ok { + wrapped := newSeriesDetectorAdapter(sd, defaultAggregations) + if _, ok := any(wrapped).(observerdef.SeriesRemover); !ok { + return &detectorTeardownContractError{name: entry.name, reason: "seriesDetectorAdapter no longer implements SeriesRemover \u2014 the wrapping invariant has regressed"} + } + continue + } + if d, ok := instance.(observerdef.Detector); ok { + if _, ok := d.(observerdef.SeriesRemover); ok { + continue + } + return &detectorTeardownContractError{name: entry.name, reason: "detector neither implements observerdef.SeriesRemover nor is listed in statelessDetectorAllowlist"} + } + return &detectorTeardownContractError{name: entry.name, reason: "factory product is neither observerdef.Detector nor observerdef.SeriesDetector"} + } + return nil +} + +// detectorTeardownContractError marks a catalog entry that violates the +// SeriesRemover contract. +type detectorTeardownContractError struct { + name string + reason string +} + +func (e *detectorTeardownContractError) Error() string { + return "detector \"" + e.name + "\" violates teardown contract: " + e.reason +} diff --git a/comp/observer/impl/component_catalog_test.go b/comp/observer/impl/component_catalog_test.go new file mode 100644 index 000000000000..b6dfd2f2751e --- /dev/null +++ b/comp/observer/impl/component_catalog_test.go @@ -0,0 +1,77 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. 
+ +package observerimpl + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/require" + + observerdef "github.com/DataDog/datadog-agent/comp/observer/def" +) + +// TestDefaultCatalog_DetectorTeardownContract is the structural guard that +// every catalog detector either implements observerdef.SeriesRemover or is +// explicitly listed in statelessDetectorAllowlist. Without this, a new +// detector with per-series state can be added to the catalog and silently +// leak memory in production: storage eviction will free the series, but the +// detector's per-series map will never shrink. +func TestDefaultCatalog_DetectorTeardownContract(t *testing.T) { + require.NoError(t, defaultCatalog().validateDetectorTeardownContract(), + "every catalog detector must implement SeriesRemover or be added to statelessDetectorAllowlist with a justification comment") +} + +// TestValidateDetectorTeardownContract_FlagsBareDetector confirms the +// validator rejects a Detector that doesn't implement SeriesRemover and isn't +// allowlisted — i.e. the check actually fails when it should. +func TestValidateDetectorTeardownContract_FlagsBareDetector(t *testing.T) { + cat := &componentCatalog{ + entries: []componentEntry{ + { + name: "bare-detector", + kind: componentDetector, + factory: func(any) any { return &bareDetectorForValidator{} }, + defaultEnabled: true, + }, + }, + } + err := cat.validateDetectorTeardownContract() + require.Error(t, err) + var contractErr *detectorTeardownContractError + require.True(t, errors.As(err, &contractErr), "error must be detectorTeardownContractError") + require.Equal(t, "bare-detector", contractErr.name) +} + +// TestValidateDetectorTeardownContract_AllowlistEscape confirms an allowlisted +// (i.e. declared-stateless) detector is permitted to skip SeriesRemover. It adds +// a test-only entry so the check is independent of real entries such as "rrcf". 
+func TestValidateDetectorTeardownContract_AllowlistEscape(t *testing.T) { + statelessDetectorAllowlist["explicitly-stateless-test"] = struct{}{} + t.Cleanup(func() { delete(statelessDetectorAllowlist, "explicitly-stateless-test") }) + + cat := &componentCatalog{ + entries: []componentEntry{ + { + name: "explicitly-stateless-test", + kind: componentDetector, + factory: func(any) any { return &bareDetectorForValidator{} }, + defaultEnabled: true, + }, + }, + } + require.NoError(t, cat.validateDetectorTeardownContract()) +} + +// bareDetectorForValidator is a minimal observerdef.Detector that +// intentionally does NOT implement SeriesRemover — used to drive the +// negative cases of validateDetectorTeardownContract. +type bareDetectorForValidator struct{} + +func (*bareDetectorForValidator) Name() string { return "bare-detector" } +func (*bareDetectorForValidator) Detect(_ observerdef.StorageReader, _ int64) observerdef.DetectionResult { + return observerdef.DetectionResult{} +} diff --git a/comp/observer/impl/engine.go b/comp/observer/impl/engine.go index d072575283ea..583b6affe846 100644 --- a/comp/observer/impl/engine.go +++ b/comp/observer/impl/engine.go @@ -107,6 +107,14 @@ type engine struct { latePointsBySource map[string]int64 // per-source breakdown (single-goroutine access from run loop) handles []*handle // registered handles for per-source drop collection handlesMu sync.Mutex // protects handles slice + + // sourceTagCache memoises the "observer_source:"+source tag string used in + // IngestLog/IngestMetric. Without this we allocate a fresh string per + // log/metric ingest. Sources are a small bounded set (e.g. "logs", + // "dogstatsd", "all-metrics"), so the map stays tiny. Access is confined + // to the engine run loop; the map is still held behind an atomic.Pointer + // and updated copy-on-write, so lookups never need a mutex on the hot path. + sourceTagCache atomic.Pointer[map[string]string] } // engineConfig holds the parameters for constructing an engine. 
@@ -207,6 +215,49 @@ func (e *engine) registerHandle(h *handle) { e.handlesMu.Unlock() } +// sourceTagForIngest returns "observer_source:"+source with memoisation so +// IngestLog / IngestMetric don't allocate a fresh string per ingest. The +// source set is small and bounded; a copy-on-write map indexed via an +// atomic.Pointer keeps reads lock-free on the hot path. +// +// The bounded-source assumption: every production caller of obs.GetHandle() +// passes a statically-defined string constant. As of this writing the full +// set is: +// - "all-metrics" (pkg/aggregator/demultiplexer_agent.go) +// - "dogstatsd" (comp/dogstatsd/server/server.go) +// - "logs" (comp/observer/logssource/impl/component.go) +// - "agent-internal-logs" (comp/observer/impl/observer.go) +// - "profile-agent" (comp/observer/impl/observer.go) +// - hfrunner.HFSource (comp/observer/impl/observer.go) +// - hfrunner.HFContainerSource (comp/observer/impl/observer.go) +// +// If a future caller ever passes a user-controlled or per-container source +// string, the COW map becomes unbounded and this memoisation strategy is +// the wrong shape (use sync.Map or a bounded LRU). Adding an entry to the +// list above means revisiting this function. +func (e *engine) sourceTagForIngest(source string) string { + if m := e.sourceTagCache.Load(); m != nil { + if tag, ok := (*m)[source]; ok { + return tag + } + } + tag := "observer_source:" + source + for { + old := e.sourceTagCache.Load() + newMap := make(map[string]string, 4) + if old != nil { + for k, v := range *old { + newMap[k] = v + } + } + newMap[source] = tag + if e.sourceTagCache.CompareAndSwap(old, &newMap) { + break + } + } + return tag +} + // IngestMetric stores a metric observation and consults the scheduler policy // to determine whether detectors should advance. Returns advance requests // that the caller should execute via Advance. 
@@ -229,7 +280,7 @@ func (e *engine) IngestMetric(source string, m *metricObs) []advanceRequest { // notifies log observers, and consults the scheduler policy to determine whether // detectors should advance. Returns advance requests that the caller should execute. func (e *engine) IngestLog(source string, l *logObs) ([]advanceRequest, []observerdef.ObserverTelemetry) { - sourceTag := "observer_source:" + source + sourceTag := e.sourceTagForIngest(source) view := &logView{obs: l} var logTelemetry = []observerdef.ObserverTelemetry{} for _, extractor := range e.extractors { @@ -239,14 +290,26 @@ func (e *engine) IngestLog(source string, l *logObs) ([]advanceRequest, []observ processingTime := time.Since(processingStartTime) logTelemetry = append(logTelemetry, newTelemetryGauge([]string{"detector:" + extractor.Name()}, telemetryDetectorProcessingTimeNs, float64(processingTime.Nanoseconds()), l.timestampMs/1000)) for _, m := range out.Metrics { - tags := copyTags(m.Tags) + // Avoid copying m.Tags when sourceTag is already present: storage.Add + // performs its own deep copy on first-write of a series via + // canonicalizeTags, and seriesKey sorts a copy internally — neither + // mutates the input. The copy is only required when we need to + // append sourceTag without disturbing the extractor's slice. + tags := m.Tags if !sliceContains(tags, sourceTag) { - tags = append(tags, sourceTag) + newTags := make([]string, len(tags), len(tags)+1) + copy(newTags, tags) + tags = append(newTags, sourceTag) } - e.storage.Add(extractor.Name(), m.Name, m.Value, l.timestampMs/1000, tags) - if m.ContextKey != "" { - sk := seriesKey(extractor.Name(), m.Name, tags) - e.contextRefs[sk] = seriesContextRef{ + res := e.storage.Add(extractor.Name(), m.Name, m.Value, l.timestampMs/1000, tags) + if m.ContextKey != "" && res.StorageKey != "" { + // Reuse the storage key computed inside storage.Add instead of + // recomputing seriesKey here. 
seriesKey is hot enough that this + // duplicate accounted for ~14.5 MiB heap-live in the + // quality_gate_container_logs SMP profile (now renamed to + // observer_logs_anomaly_stress; the 'quality_gate_*' prefix is + // reserved for SMP quality-gate cases). + e.contextRefs[res.StorageKey] = seriesContextRef{ namespace: extractor.Name(), contextKey: m.ContextKey, } @@ -283,7 +346,10 @@ func sliceContains(items []string, want string) bool { } // removeContextRefsForEvictedKeys drops engine contextRefs whose extractor -// namespace and context key match an eviction from extractor GC. +// namespace and context key match an eviction from extractor GC, and frees +// the corresponding storage series. Without the storage cleanup, evicted +// patterns leak their tags + columnar arrays indefinitely (the contextRefs +// map is just metadata; the heavy data lives in storage.series). func (e *engine) removeContextRefsForEvictedKeys(namespace string, evictedKeys []string) { // No garbage collection done if len(evictedKeys) == 0 { @@ -298,12 +364,48 @@ func (e *engine) removeContextRefsForEvictedKeys(namespace string, evictedKeys [ if len(want) == 0 { return } + var storageKeys []string for seriesID, ref := range e.contextRefs { if ref.namespace != namespace { continue } if _, ok := want[ref.contextKey]; ok { delete(e.contextRefs, seriesID) + storageKeys = append(storageKeys, seriesID) + } + } + if len(storageKeys) > 0 { + freedRefs := e.storage.RemoveSeriesByKeys(storageKeys) + e.fanOutSeriesRemoval(freedRefs) + } +} + +// fanOutSeriesRemoval notifies every detector that implements the optional +// SeriesRemover interface that the listed SeriesRefs have been freed by +// storage. This keeps detector-side per-series state (BOCPD posterior maps, +// ScanMW/ScanWelch segment trackers, seriesDetectorAdapter visible-count +// maps) symmetric with storage so the LRU caps placed on extractors’ +// contexts actually translate into bounded heap usage end-to-end. 
+// +// The caller (removeContextRefsForEvictedKeys / Reset / future GC paths) +// is responsible for invoking this with whatever refs storage actually +// freed. Detectors are expected to ignore unknown refs, so it’s safe to +// broadcast the same ref list to all of them. +// +// Concurrency invariant: this method, like every method on engine and +// every detector RemoveSeries / Detect callback, runs only on the single +// goroutine driving observerImpl.run() (observer.go). Ingest, advance, +// detection, and these eviction fan-outs are all serialised through that +// loop, so detector implementations may mutate per-series state without +// taking their own locks. Adding a new caller of this function from a +// different goroutine would break that invariant for every detector. +func (e *engine) fanOutSeriesRemoval(refs []observerdef.SeriesRef) { + if len(refs) == 0 || len(e.detectors) == 0 { + return + } + for _, d := range e.detectors { + if remover, ok := d.(observerdef.SeriesRemover); ok { + remover.RemoveSeries(refs) } } } diff --git a/comp/observer/impl/log_pattern_extractor.go b/comp/observer/impl/log_pattern_extractor.go index d12276943342..b8060eaec125 100644 --- a/comp/observer/impl/log_pattern_extractor.go +++ b/comp/observer/impl/log_pattern_extractor.go @@ -48,6 +48,17 @@ type LogPatternExtractorConfig struct { ClusterTimeToLiveSec int64 `json:"cluster_time_to_live_sec,omitempty"` // GarbageCollectionIntervalSec is the minimum time between GC passes when ClusterTimeToLiveSec > 0. GarbageCollectionIntervalSec int64 `json:"garbage_collection_interval_sec,omitempty"` + // MaxPatternsPerGroup caps the number of live clusters in any single tag + // group. When exceeded, the least-recently-seen cluster is evicted (LRU) + // and its engine context is dropped. Zero means use the default; set + // negative to disable. Bounds memory/series-cardinality on workloads with + // high pattern diversity (e.g. container log churn). 
+ MaxPatternsPerGroup int `json:"max_patterns_per_group,omitempty"` + // MaxTagGroups caps the number of distinct tag groups (source/service/env/host + // combinations) tracked simultaneously. When exceeded, the least-recently- + // touched group's clusters are all evicted at once. Zero means use the + // default; set negative to disable. + MaxTagGroups int `json:"max_tag_groups,omitempty"` } // DefaultLogPatternExtractorConfig returns defaults aligned with the patterns package. @@ -62,6 +73,8 @@ func DefaultLogPatternExtractorConfig() LogPatternExtractorConfig { MaxTokenizedStringLength: 12500, MaxNumTokens: 250, ParseHexDump: &parseHexDump, + MaxPatternsPerGroup: 1024, + MaxTagGroups: 256, } } @@ -155,6 +168,8 @@ func (c *logPatternExtractorContext) removeTaggedCluster(taggedKey string) { // NewLogPatternExtractor creates a new LogPatternExtractor. // A zero-value cfg is accepted; zero fields fall back to DefaultLogPatternExtractorConfig values. +// MaxPatternsPerGroup and MaxTagGroups follow the same convention: 0 → default, +// negative → disabled (unbounded). 
func NewLogPatternExtractor(cfg LogPatternExtractorConfig) *LogPatternExtractor { // Apply defaults first and then refresh config to finalize it defaults := DefaultLogPatternExtractorConfig() @@ -169,6 +184,12 @@ func NewLogPatternExtractor(cfg LogPatternExtractorConfig) *LogPatternExtractor cfg.GarbageCollectionIntervalSec = defaults.GarbageCollectionIntervalSec } } + if cfg.MaxPatternsPerGroup == 0 { + cfg.MaxPatternsPerGroup = defaults.MaxPatternsPerGroup + } + if cfg.MaxTagGroups == 0 { + cfg.MaxTagGroups = defaults.MaxTagGroups + } cfg.RefreshConfig() registry := NewTagGroupByKeyRegistry() @@ -176,8 +197,15 @@ func NewLogPatternExtractor(cfg LogPatternExtractorConfig) *LogPatternExtractor newSub := func() *patterns.PatternClusterer { return patterns.NewPatternClustererWithTokenizer(tok, cfg.MinTokenMatchRatio) } + tc := NewTaggedPatternClustererWithFactory(registry, newSub) + if cfg.MaxPatternsPerGroup > 0 { + tc.MaxClustersPerGroup = cfg.MaxPatternsPerGroup + } + if cfg.MaxTagGroups > 0 { + tc.MaxTagGroups = cfg.MaxTagGroups + } return &LogPatternExtractor{ - taggedClusterer: NewTaggedPatternClustererWithFactory(registry, newSub), + taggedClusterer: tc, registry: registry, config: cfg, } @@ -252,7 +280,26 @@ func (e *LogPatternExtractor) ProcessLog(log observerdef.LogView) observerdef.Lo message := string(log.GetContent()) groupTags := tagsForPatternGrouping(log.GetTags(), log.GetHostname()) groupHash, cluster, ok := e.taggedClusterer.Process(groupTags, message, logUnixSec) + // Drain LRU evictions — from per-group MaxClusters or whole-group MaxTagGroups + // caps. Treated identically to GC evictions: drop engine context, decrement + // pattern_count telemetry. Done unconditionally so that whole-group evictions + // caused by the new sub-clusterer creation aren't lost when Process returns + // !ok (defensive; current Process only returns !ok for empty messages, after + // which no eviction can have occurred, but keep this path honest). 
+ if evicted := e.taggedClusterer.DrainLRUEvictions(); len(evicted) > 0 { + var lruKeys []string + for _, ev := range evicted { + taggedKey := globalClusterHash(ev.GroupHash, ev.ClusterID) + if e.ctx.keysByTaggedCluster != nil { + lruKeys = append(lruKeys, e.ctx.keysByTaggedCluster[taggedKey]...) + } + e.ctx.removeTaggedCluster(taggedKey) + } + result.EvictedContextKeys = append(result.EvictedContextKeys, lruKeys...) + telemetry = append(telemetry, newTelemetryCounter([]string{"detector:" + e.Name()}, telemetryLogPatternExtractorPatternCount, -float64(len(evicted)), logUnixSec)) + } if !ok { + result.Telemetry = telemetry return result } // Not enough patterns yet, don't emit metric @@ -260,6 +307,7 @@ func (e *LogPatternExtractor) ProcessLog(log observerdef.LogView) observerdef.Lo if cluster.Count == e.config.MinClusterSizeBeforeEmit { telemetry = append(telemetry, newTelemetryCounter([]string{"detector:" + e.Name()}, telemetryLogPatternExtractorPatternCount, 1, logUnixSec)) } else if cluster.Count < e.config.MinClusterSizeBeforeEmit { + result.Telemetry = telemetry return result } diff --git a/comp/observer/impl/log_pattern_extractor_test.go b/comp/observer/impl/log_pattern_extractor_test.go index 826b2a7d9cf2..9dfb60c04444 100644 --- a/comp/observer/impl/log_pattern_extractor_test.go +++ b/comp/observer/impl/log_pattern_extractor_test.go @@ -11,6 +11,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + observerdef "github.com/DataDog/datadog-agent/comp/observer/def" ) func TestLogPatternExtractor_GetContextByKeyUsesOutputContextKey(t *testing.T) { @@ -333,3 +335,283 @@ func TestLogPatternExtractor_NoGCBeforeInterval(t *testing.T) { res2 := e.ProcessLog(&mockLogView{content: msg, status: "warn", tags: nil, timestampMs: 2_000_000}) assert.Empty(t, res2.EvictedContextKeys) } + +func TestLogPatternExtractor_LRUCapEvictsAndDropsContext(t *testing.T) { + // Configure tight cap with MinClusterSizeBeforeEmit=1 so each new shape + 
// emits a metric (and therefore a context entry) on its first appearance. + cfg := DefaultLogPatternExtractorConfig() + cfg.MinClusterSizeBeforeEmit = 1 + cfg.MaxPatternsPerGroup = 2 + cfg.MaxTagGroups = -1 // disable group cap so we test only per-group LRU + e := NewLogPatternExtractor(cfg) + + tags := []string{"service:api"} + // Three distinct shapes (different token counts → different signatures → + // different clusters; they don't merge). + msgs := []string{ + "WARN alpha", + "WARN beta gamma", + "WARN x y z w", + } + + var ctxKeys []string + for i, m := range msgs { + res := e.ProcessLog(&mockLogView{ + content: []byte(m), + status: "warn", + tags: tags, + timestampMs: int64(1_000_000 + i*1_000), // 1s apart so LastSeenUnix differs + }) + require.Len(t, res.Metrics, 1, "each distinct shape should emit a metric (i=%d)", i) + ctxKeys = append(ctxKeys, res.Metrics[0].ContextKey) + + switch i { + case 0, 1: + require.Empty(t, res.EvictedContextKeys, "no eviction below or at cap (i=%d)", i) + case 2: + // Third shape pushes over the cap of 2; the oldest (ctxKeys[0]) is evicted. + require.Equal(t, []string{ctxKeys[0]}, res.EvictedContextKeys, + "oldest cluster's context key surfaced for the engine to drop") + // Confirm the engine-side context entry was actually removed. + _, ok := e.GetContextByKey(ctxKeys[0]) + require.False(t, ok, "evicted cluster's pattern context should be gone") + _, ok = e.GetContextByKey(ctxKeys[1]) + require.True(t, ok, "surviving cluster's context still resolvable") + _, ok = e.GetContextByKey(ctxKeys[2]) + require.True(t, ok, "newly-inserted cluster's context resolvable") + // Pattern_count telemetry must include a -1 decrement for the eviction. 
+ var found bool + for _, tel := range res.Telemetry { + if tel.Metric == nil { + continue + } + if tel.Metric.GetName() == "observer.log_pattern_extractor.pattern_count" && tel.Metric.GetValue() < 0 { + found = true + } + } + require.True(t, found, "LRU eviction should emit a negative pattern_count telemetry") + } + } + + require.Equal(t, 1, e.taggedClusterer.NumSubClusterers(), "single tag group across all messages") + require.Len(t, e.taggedClusterer.GetAllClusters(), 2, "cap holds at MaxPatternsPerGroup=2") +} + +func TestLogPatternExtractor_TagGroupCapEvictsLRUGroup(t *testing.T) { + cfg := DefaultLogPatternExtractorConfig() + cfg.MinClusterSizeBeforeEmit = 1 + cfg.MaxPatternsPerGroup = -1 // disable per-group cap + cfg.MaxTagGroups = 2 + e := NewLogPatternExtractor(cfg) + + processAt := func(service string, msg string, tsMs int64) string { + res := e.ProcessLog(&mockLogView{ + content: []byte(msg), + status: "warn", + tags: []string{"service:" + service}, + timestampMs: tsMs, + }) + require.Len(t, res.Metrics, 1) + return res.Metrics[0].ContextKey + } + + // Two groups, two contexts. + kA := processAt("a", "WARN alpha", 1_000_000) + kB := processAt("b", "WARN beta", 1_001_000) + + // Touch A again so B is the LRU group. + _ = processAt("a", "WARN alpha", 1_002_000) + + // Adding a third group must evict B's lone cluster. 
+ res := e.ProcessLog(&mockLogView{ + content: []byte("WARN gamma"), + status: "warn", + tags: []string{"service:c"}, + timestampMs: 1_003_000, + }) + require.Len(t, res.Metrics, 1) + require.Contains(t, res.EvictedContextKeys, kB, "LRU group's context key surfaced") + _, ok := e.GetContextByKey(kB) + require.False(t, ok, "evicted group's context entry removed") + _, ok = e.GetContextByKey(kA) + require.True(t, ok, "surviving group's context preserved") +} + +// TestEngine_LogPatternLRUEvictionFreesStorage is the end-to-end proof that +// the structural leak is fixed: when the extractor's LRU evicts a cluster, +// the engine no longer just drops its contextRefs entry — it also calls +// storage.RemoveSeriesByKeys so the per-series tags slice + columnar arrays +// + sample buffer are actually freed. Before this fix, timeSeriesStorage.series +// grew monotonically for the lifetime of the agent, regardless of LRU caps. +func TestEngine_LogPatternLRUEvictionFreesStorage(t *testing.T) { + cfg := DefaultLogPatternExtractorConfig() + cfg.MinClusterSizeBeforeEmit = 1 + cfg.MaxPatternsPerGroup = 2 + cfg.MaxTagGroups = -1 + extractor := NewLogPatternExtractor(cfg) + + storage := newTimeSeriesStorage() + e := newEngine(engineConfig{ + storage: storage, + extractors: []observerdef.LogMetricsExtractor{extractor}, + }) + + tags := []string{"service:api"} + msgs := []string{ + "WARN alpha", + "WARN beta gamma", + "WARN x y z w", + } + + for i, m := range msgs { + e.IngestLog("src", &logObs{ + content: []byte(m), + status: "warn", + tags: tags, + timestampMs: int64(1_000_000 + i*1_000), + }) + } + + // Without the storage-side eviction, count would be 3 (every shape ever + // seen leaves a series behind). With it, the LRU eviction during the 3rd + // ingest removes cluster #1 from storage before cluster #3's series is + // added, so count is 2. 
+ require.Equal(t, 2, storage.TotalSeriesCount(""), + "LRU eviction must shrink storage; before the fix storage grew unboundedly") + + // All surviving contextRefs must resolve to live storage series — i.e. + // nothing dangling on the engine side either. + for key := range e.contextRefs { + ref, ok := storage.seriesIDs[key] + require.True(t, ok, "engine contextRef without storage series for key %q", key) + require.NotNil(t, storage.GetSeriesMeta(ref), + "engine contextRef points at a retired storage ref for key %q", key) + } + require.Len(t, e.contextRefs, 2, + "engine should keep one contextRef per surviving extractor cluster") +} + +// TestEngine_LogPatternLRUEvictionFreesDetectorState extends +// TestEngine_LogPatternLRUEvictionFreesStorage to cover the detector side. +// Storage frees the evicted series via RemoveSeriesByKeys, but stateful +// detectors keep parallel per-series maps (BOCPD posterior, ScanMW/ScanWelch +// segment trackers, seriesDetectorAdapter.lastVisibleCount) keyed by the +// SeriesRef they observed during Detect(). Without engine.fanOutSeriesRemoval +// those maps grow with the cumulative number of series ever seen, defeating +// the LRU caps on the upstream extractors. This test checks that BOCPD's map +// shrinks back in lockstep with storage. +func TestEngine_LogPatternLRUEvictionFreesDetectorState(t *testing.T) { + cfg := DefaultLogPatternExtractorConfig() + cfg.MinClusterSizeBeforeEmit = 1 + cfg.MaxPatternsPerGroup = 2 + cfg.MaxTagGroups = -1 + extractor := NewLogPatternExtractor(cfg) + + bocpd := NewBOCPDDetector(BOCPDConfig{}) + scanmw := NewScanMWDetector() + scanwelch := NewScanWelchDetector() + + // Stateless detector that does NOT implement SeriesRemover. 
Registering it + // alongside the stateful ones exercises the fanOutSeriesRemoval type-assertion + // fast-path: detectors without per-series state must be silently skipped, never + // invoked with RemoveSeries (which would panic since they don't implement it), + // and never block the eviction broadcast to the stateful detectors that follow. + stateless := &statelessTestDetector{name: "stateless"} + + storage := newTimeSeriesStorage() + e := newEngine(engineConfig{ + storage: storage, + extractors: []observerdef.LogMetricsExtractor{extractor}, + detectors: []observerdef.Detector{ + bocpd, + scanmw, + scanwelch, + stateless, + }, + }) + + tags := []string{"service:api"} + msgs := []string{ + "WARN alpha", + "WARN beta gamma", + } + for i, m := range msgs { + e.IngestLog("src", &logObs{ + content: []byte(m), + status: "warn", + tags: tags, + timestampMs: int64(1_000_000 + i*1_000), + }) + } + + // Drive Detect() so the detectors observe the series and populate their + // per-series state maps. dataTime needs to be ahead of the last point. + bocpd.Detect(storage, 1_001_000) + scanmw.Detect(storage, 1_001_000) + scanwelch.Detect(storage, 1_001_000) + + bocpdBefore := len(bocpd.series) + scanmwBefore := len(scanmw.series) + scanwelchBefore := len(scanwelch.series) + require.Greater(t, bocpdBefore, 0, "BOCPD should have observed the series before eviction") + require.Greater(t, scanmwBefore, 0, "ScanMW should have observed the series before eviction") + require.Greater(t, scanwelchBefore, 0, "ScanWelch should have observed the series before eviction") + + // Trigger LRU eviction by ingesting a third pattern that pushes the + // oldest cluster out. After the fix, the engine fans the freed refs + // out to every detector and the per-series maps shrink accordingly. 
+ e.IngestLog("src", &logObs{ + content: []byte("WARN x y z w"), + status: "warn", + tags: tags, + timestampMs: 1_002_000, + }) + + // Storage shrunk to two series (LRU cap), so detector maps must now + // have at most two entries per agg too. Without the fan-out, they + // would keep the evicted series' entries alongside the live ones. + require.Equal(t, 2, storage.TotalSeriesCount(""), "LRU should keep storage bounded") + + // Each detector defaults to 2 aggregations (Average, Count). Detect() is + // only driven explicitly above, after the first two ingests, so the + // pre-eviction maps hold bocpdBefore entries (2 series × 2 aggs = 4); the + // third series is never observed by a detector in this test. Before the + // fan-out fix the maps stayed at that size even after one of the series + // was evicted, because storage cleanup didn't propagate to detector + // state. After the fix, fanOutSeriesRemoval drops exactly the evicted + // ref's entries, so each map must end up strictly smaller than before + // and within the live series × aggregations bound asserted below. 
+ require.Less(t, len(bocpd.series), bocpdBefore, + "BOCPD per-series map must shrink when storage evicts a series; without the fan-out it stays at %d", bocpdBefore) + require.LessOrEqual(t, len(bocpd.series), 2*len(bocpd.config.Aggregations), + "BOCPD per-series map must not exceed live series × aggregations") + require.Less(t, len(scanmw.series), scanmwBefore, + "ScanMW per-series map must shrink when storage evicts a series; without the fan-out it stays at %d", scanmwBefore) + require.LessOrEqual(t, len(scanmw.series), 2*len(scanmw.Aggregations), + "ScanMW per-series map must not exceed live series × aggregations") + require.Less(t, len(scanwelch.series), scanwelchBefore, + "ScanWelch per-series map must shrink when storage evicts a series; without the fan-out it stays at %d", scanwelchBefore) + require.LessOrEqual(t, len(scanwelch.series), 2*len(scanwelch.Aggregations), + "ScanWelch per-series map must not exceed live series \u00d7 aggregations") + + // Sanity check the stateless detector: registering it alongside the + // stateful ones means the eviction fan-out above iterated over it. The + // SeriesRemover type-assertion in fanOutSeriesRemoval is what keeps that + // safe — if it ever regresses (e.g. someone replaces the optional check + // with a hard call), this test panics on the runtime type-assertion + // failure during the eviction triggered above. + _ = stateless +} + +// statelessTestDetector is a minimal observerdef.Detector that intentionally +// does NOT implement observerdef.SeriesRemover. Used to verify that the +// engine's eviction fan-out doesn't assume every detector tracks per-series +// state. 
+type statelessTestDetector struct { + name string +} + +func (s *statelessTestDetector) Name() string { return s.name } +func (s *statelessTestDetector) Detect(_ observerdef.StorageReader, _ int64) observerdef.DetectionResult { + return observerdef.DetectionResult{} +} diff --git a/comp/observer/impl/log_tagged_pattern_clusterer.go b/comp/observer/impl/log_tagged_pattern_clusterer.go index 18568c317a7d..4d3b1835ca24 100644 --- a/comp/observer/impl/log_tagged_pattern_clusterer.go +++ b/comp/observer/impl/log_tagged_pattern_clusterer.go @@ -6,6 +6,7 @@ package observerimpl import ( + "container/heap" "encoding/binary" "fmt" "hash/fnv" @@ -152,6 +153,35 @@ type TaggedPatternClusterer struct { registry *TagGroupByKeyRegistry subClusterers map[uint64]*patterns.PatternClusterer newPatternClusterer func() *patterns.PatternClusterer + // MaxClustersPerGroup, when > 0, is propagated as patterns.PatternClusterer.MaxClusters + // on each newly created sub-clusterer; existing sub-clusterers are NOT + // retroactively updated. Zero means unbounded. + MaxClustersPerGroup int + // MaxTagGroups, when > 0, caps the number of live sub-clusterers. When a new + // tag group would push subClusterers past this size, the least-recently- + // touched group (smallest entry in lastTouchByGroup) is evicted; all of + // its clusters are surfaced via DrainLRUEvictions. Zero disables the cap. + MaxTagGroups int + // lastTouchByGroup tracks the most recent unixSec passed to Process for + // each group; consulted only when MaxTagGroups > 0. + lastTouchByGroup map[uint64]int64 + // touchHeap is a lazy-deletion min-heap of (touch, groupHash) entries. + // Each Process call that updates lastTouchByGroup pushes a new entry rather + // than performing an in-place decrease-key (which would be O(N) in + // container/heap). 
On eviction we pop until the top entry's touch matches + // the current lastTouchByGroup value for its hash — stale entries (touch + // differs from the current map value, or hash already deleted from the map) + // are silently dropped. This replaces an O(N) full scan over subClusterers + // with amortised O(log N) eviction. + // + // Bounded growth: the heap can accumulate at most one entry per touch; + // when it exceeds heapCompactionThreshold * len(lastTouchByGroup) we rebuild + // it from lastTouchByGroup so memory stays O(MaxTagGroups). + touchHeap *groupTouchHeap + // lruEvicted accumulates per-cluster evictions from both layer-1 (per-group + // MaxClusters cap inside a sub-clusterer) and layer-2 (MaxTagGroups cap + // here) since the last DrainLRUEvictions. + lruEvicted []EvictedCluster } // NewTaggedPatternClusterer creates a TaggedPatternClusterer that writes group @@ -176,23 +206,203 @@ func NewTaggedPatternClustererWithFactory(registry *TagGroupByKeyRegistry, newPC // Process extracts the tag group from tags, routes the message to the matching // sub-clusterer (created lazily), and returns the group hash plus the cluster. // unixSec is Unix seconds for timestamp tracking (use time.Now().Unix() when unknown). +// +// LRU evictions (if any) caused by this call — from per-group MaxClusters or +// global MaxTagGroups caps — must be retrieved via DrainLRUEvictions before +// the next Process call to avoid silently dropping eviction context. func (tc *TaggedPatternClusterer) Process(tags []string, message string, unixSec int64) (uint64, *patterns.Cluster, bool) { group := extractTagGroupByKey(tags) groupHash := tc.registry.Register(group) sub, exists := tc.subClusterers[groupHash] if !exists { + // Two-phase create: build a transient sub-clusterer but do NOT + // commit it (insert into tc.subClusterers, possibly evicting the + // LRU group to make room) until sub.Process actually accepts the + // message.
Otherwise an empty/whitespace-only first message from + // a new tag group — which PatternClusterer rejects when IgnoreEmpty + // is on — would steal a slot from an active group while leaving an + // empty sub-clusterer behind to count against MaxTagGroups. A burst + // of empty logs from new containers could then evict real pattern + // state and suppress later anomalies. sub = tc.newPatternClusterer() - tc.subClusterers[groupHash] = sub + sub.MaxClusters = tc.MaxClustersPerGroup } cluster, ok := sub.Process(message, unixSec) if !ok { + // Transient sub-clusterer (when !exists) is dropped on the floor + // here — nothing was ever inserted into tc.subClusterers, so no + // eviction or LRU bookkeeping is needed. return 0, nil, false } + + // Process accepted the message; only now do we commit the new + // sub-clusterer (and evict the LRU group if we've hit the cap). + if !exists { + tc.evictLRUTagGroupIfOverCap(groupHash) + tc.subClusterers[groupHash] = sub + } + + // Drain layer-1 LRU evictions from this sub-clusterer and tag them with groupHash. + if evicted := sub.DrainLRUEvictedClusterIDs(); len(evicted) > 0 { + for _, id := range evicted { + tc.lruEvicted = append(tc.lruEvicted, EvictedCluster{GroupHash: groupHash, ClusterID: id}) + } + } + + if tc.MaxTagGroups > 0 { + if tc.lastTouchByGroup == nil { + tc.lastTouchByGroup = make(map[uint64]int64) + } + tc.lastTouchByGroup[groupHash] = unixSec + if tc.touchHeap == nil { + tc.touchHeap = &groupTouchHeap{} + heap.Init(tc.touchHeap) + } + heap.Push(tc.touchHeap, groupTouchEntry{touch: unixSec, hash: groupHash}) + tc.maybeCompactTouchHeap() + } + return groupHash, cluster, true } +// evictLRUTagGroupIfOverCap removes the least-recently-touched tag group when +// adding a new group would exceed MaxTagGroups. The about-to-be-added groupHash +// is excluded from eviction. All clusters belonging to the evicted group are +// surfaced via DrainLRUEvictions and the group is removed from +// lastTouchByGroup. 
The group's hash remains in the registry (registry is +// append-only by design). +// +// Implementation: pops stale entries off touchHeap (entries whose touch no +// longer matches lastTouchByGroup, or whose hash has already been deleted) +// until the top is a valid victim, then evicts it. Groups never touched by +// Process (i.e. absent from lastTouchByGroup) cannot be victims because their +// hash never appears in the heap; in that pathological case we fall back to +// the original O(N) scan so behaviour matches the pre-heap implementation. +func (tc *TaggedPatternClusterer) evictLRUTagGroupIfOverCap(incoming uint64) { + if tc.MaxTagGroups <= 0 || len(tc.subClusterers) < tc.MaxTagGroups { + return + } + if tc.touchHeap != nil { + for tc.touchHeap.Len() > 0 { + top := (*tc.touchHeap)[0] + if top.hash == incoming { + // Skip the incoming group: it would be re-pushed and we don't + // want to evict it. Pop the stale entry and try the next. + heap.Pop(tc.touchHeap) + continue + } + current, present := tc.lastTouchByGroup[top.hash] + if !present || current != top.touch { + // Stale entry: the hash was either evicted earlier or has a + // newer touch already in the heap. Drop it and continue. + heap.Pop(tc.touchHeap) + continue + } + // Valid victim. + heap.Pop(tc.touchHeap) + if sub, ok := tc.subClusterers[top.hash]; ok { + for _, c := range sub.GetClusters() { + tc.lruEvicted = append(tc.lruEvicted, EvictedCluster{GroupHash: top.hash, ClusterID: c.ID}) + } + } + delete(tc.subClusterers, top.hash) + delete(tc.lastTouchByGroup, top.hash) + return + } + } + // Fallback: heap was empty (e.g. groups never touched while MaxTagGroups + // was 0) but we are now over cap. Use the original O(N) scan to find a + // victim. This path is exercised only when MaxTagGroups is enabled + // retroactively after Process was already called with it disabled. 
+ var victim uint64 + var victimUnix int64 + victimSet := false + for gh := range tc.subClusterers { + if gh == incoming { + continue + } + touch := tc.lastTouchByGroup[gh] + if !victimSet || touch < victimUnix { + victim = gh + victimUnix = touch + victimSet = true + } + } + if !victimSet { + return + } + if sub, ok := tc.subClusterers[victim]; ok { + for _, c := range sub.GetClusters() { + tc.lruEvicted = append(tc.lruEvicted, EvictedCluster{GroupHash: victim, ClusterID: c.ID}) + } + } + delete(tc.subClusterers, victim) + delete(tc.lastTouchByGroup, victim) +} + +// heapCompactionThreshold sets when maybeCompactTouchHeap rebuilds touchHeap +// from scratch: when the heap has more than threshold * len(lastTouchByGroup) +// entries the cost of rebuilding is amortised against the savings from no +// longer carrying stale entries through future heap operations. Tuned so +// rebuilds happen rarely under steady-state churn but reliably under heavy +// per-group re-touching. +const heapCompactionThreshold = 4 + +func (tc *TaggedPatternClusterer) maybeCompactTouchHeap() { + if tc.touchHeap == nil { + return + } + if tc.touchHeap.Len() <= heapCompactionThreshold*len(tc.lastTouchByGroup) { + return + } + rebuilt := make(groupTouchHeap, 0, len(tc.lastTouchByGroup)) + for hash, touch := range tc.lastTouchByGroup { + rebuilt = append(rebuilt, groupTouchEntry{touch: touch, hash: hash}) + } + heap.Init(&rebuilt) + tc.touchHeap = &rebuilt +} + +// groupTouchEntry is one entry in the lazy-deletion min-heap used by +// evictLRUTagGroupIfOverCap. Entries become stale when the matching hash +// receives a newer touch (a fresh entry is pushed; the old one is left to be +// skipped at the top of the heap) or when the hash is evicted entirely +// (no longer present in lastTouchByGroup). +type groupTouchEntry struct { + touch int64 + hash uint64 +} + +// groupTouchHeap is a min-heap of groupTouchEntry ordered by touch ascending. +// Implements heap.Interface.
+type groupTouchHeap []groupTouchEntry + +func (h groupTouchHeap) Len() int { return len(h) } +func (h groupTouchHeap) Less(i, j int) bool { return h[i].touch < h[j].touch } +func (h groupTouchHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } +func (h *groupTouchHeap) Push(x any) { *h = append(*h, x.(groupTouchEntry)) } +func (h *groupTouchHeap) Pop() any { + old := *h + n := len(old) + x := old[n-1] + *h = old[:n-1] + return x +} + +// DrainLRUEvictions returns and clears all LRU evictions accumulated since the +// last call. Includes both per-group MaxClusters evictions and whole-group +// MaxTagGroups evictions. GC evictions go through GarbageCollectBefore instead. +func (tc *TaggedPatternClusterer) DrainLRUEvictions() []EvictedCluster { + if len(tc.lruEvicted) == 0 { + return nil + } + out := tc.lruEvicted + tc.lruEvicted = nil + return out +} + // GetCluster retrieves a cluster by group hash and intra-clusterer ID. func (tc *TaggedPatternClusterer) GetCluster(groupHash uint64, clusterID int64) (*patterns.Cluster, error) { sub, ok := tc.subClusterers[groupHash] @@ -203,9 +413,13 @@ func (tc *TaggedPatternClusterer) GetCluster(groupHash uint64, clusterID int64) } // Reset drops all sub-clusterers. The registry is intentionally kept so that -// previously registered hashes remain resolvable after a reset. +// previously registered hashes remain resolvable after a reset. LRU bookkeeping +// (lastTouchByGroup, pending evictions) is also cleared. func (tc *TaggedPatternClusterer) Reset() { tc.subClusterers = make(map[uint64]*patterns.PatternClusterer) + tc.lastTouchByGroup = nil + tc.touchHeap = nil + tc.lruEvicted = nil } // NumSubClusterers returns the number of currently active sub-clusterers. 
diff --git a/comp/observer/impl/log_tagged_pattern_clusterer_test.go b/comp/observer/impl/log_tagged_pattern_clusterer_test.go index 8535bc2f3f3b..b90c36f55e22 100644 --- a/comp/observer/impl/log_tagged_pattern_clusterer_test.go +++ b/comp/observer/impl/log_tagged_pattern_clusterer_test.go @@ -199,3 +199,122 @@ func TestExtractTagGroupByKey_MalformedTagsIgnored(t *testing.T) { g := extractTagGroupByKey([]string{"nocolon", "service:api"}) assert.Equal(t, TagGroupByKey{Service: "api"}, g) } + +func TestTaggedPatternClusterer_MaxClustersPerGroupPropagated(t *testing.T) { + tc := NewTaggedPatternClusterer(NewTagGroupByKeyRegistry()) + tc.MaxClustersPerGroup = 2 + tags := []string{"source:s", "service:svc", "env:prod", "host:h"} + + // 3 distinct (length-varying) shapes → first eviction surfaces via DrainLRUEvictions. + tc.Process(tags, "alpha", 1000) + tc.Process(tags, "beta gamma", 1001) + require.Empty(t, tc.DrainLRUEvictions(), "no eviction at cap") + + _, _, ok := tc.Process(tags, "x y z", 1002) + require.True(t, ok) + evicted := tc.DrainLRUEvictions() + require.Len(t, evicted, 1, "one cluster evicted by per-group cap") + require.Equal(t, int64(0), evicted[0].ClusterID) +} + +func TestTaggedPatternClusterer_MaxTagGroupsEvictsLRUGroup(t *testing.T) { + tc := NewTaggedPatternClusterer(NewTagGroupByKeyRegistry()) + tc.MaxTagGroups = 2 + + tagsA := []string{"service:a"} + tagsB := []string{"service:b"} + tagsC := []string{"service:c"} + + // Touch group A first (oldest), then B; then route to C and expect A evicted. 
+ hashA, _, _ := tc.Process(tagsA, "alpha one", 1000) + hashA2, _, _ := tc.Process(tagsA, "two distinct shape pattern", 1001) // second cluster within A + require.Equal(t, hashA, hashA2) + require.Equal(t, 1, tc.NumSubClusterers()) + + hashB, _, _ := tc.Process(tagsB, "beta", 1100) + require.NotEqual(t, hashA, hashB) + require.Equal(t, 2, tc.NumSubClusterers()) + require.Empty(t, tc.DrainLRUEvictions(), "at cap, no eviction yet") + + // Now touch A again to make B the LRU group. + tc.Process(tagsA, "alpha one", 1200) + require.Empty(t, tc.DrainLRUEvictions()) + + // Adding C must evict B (least-recently touched at unixSec=1100). + hashC, _, _ := tc.Process(tagsC, "gamma", 1300) + require.NotEqual(t, hashB, hashC) + require.Equal(t, 2, tc.NumSubClusterers(), "cap holds at MaxTagGroups") + evicted := tc.DrainLRUEvictions() + require.NotEmpty(t, evicted, "the LRU group's clusters are surfaced as evictions") + for _, ev := range evicted { + require.Equal(t, hashB, ev.GroupHash, "all evictions tagged with the evicted group's hash") + } +} + +// TestTaggedPatternClusterer_EmptyMessageFromNewGroupDoesNotEvict regresses +// the bug where an empty (or whitespace-only) first message from a brand-new +// tag group, while at MaxTagGroups capacity, would trigger eviction of an +// active group BEFORE the inner PatternClusterer.Process rejected the empty +// message. The active group's clusters were lost, an empty sub-clusterer +// stayed wedged into tc.subClusterers counting against the cap, and a stream +// of empty logs from new containers could evict real pattern state and +// suppress later anomalies. +// +// The fix is two-phase create in Process: build a transient sub-clusterer +// without committing it; only insert + evict-LRU after sub.Process returns +// ok. This test confirms an empty message from a new group at-cap is a +// no-op for both the existing groups and the eviction queue. 
+func TestTaggedPatternClusterer_EmptyMessageFromNewGroupDoesNotEvict(t *testing.T) { + tc := NewTaggedPatternClusterer(NewTagGroupByKeyRegistry()) + tc.MaxTagGroups = 2 + + tagsA := []string{"service:a"} + tagsB := []string{"service:b"} + tagsC := []string{"service:c"} // would be the new “third” group at-cap + + hashA, _, okA := tc.Process(tagsA, "alpha", 1000) + require.True(t, okA) + hashB, _, okB := tc.Process(tagsB, "beta", 1100) + require.True(t, okB) + require.Equal(t, 2, tc.NumSubClusterers(), "both groups resident at cap") + require.Empty(t, tc.DrainLRUEvictions()) + + for _, msg := range []string{"", " ", "\t\n"} { + hashC, _, okC := tc.Process(tagsC, msg, 1200) + require.False(t, okC, "empty/whitespace messages must be rejected by inner Process") + require.Equal(t, uint64(0), hashC, "rejected calls return zero hash") + require.Equal(t, 2, tc.NumSubClusterers(), + "empty msg from new group at-cap must NOT create a sub-clusterer (would steal a slot from A or B)") + require.Empty(t, tc.DrainLRUEvictions(), + "empty msg from new group at-cap must NOT evict an existing group") + } + + // Both original groups must still be reachable after the empty stream. 
+ gotA, _, _ := tc.Process(tagsA, "alpha", 1300) + require.Equal(t, hashA, gotA, "group A must survive a stream of empty new-group messages") + gotB, _, _ := tc.Process(tagsB, "beta", 1301) + require.Equal(t, hashB, gotB, "group B must survive a stream of empty new-group messages") +} + +func TestTaggedPatternClusterer_DrainLRUEvictionsIsOneShot(t *testing.T) { + tc := NewTaggedPatternClusterer(NewTagGroupByKeyRegistry()) + tc.MaxClustersPerGroup = 1 + tags := []string{"service:s"} + tc.Process(tags, "alpha", 1000) + tc.Process(tags, "beta gamma", 1001) // triggers eviction of cluster id=0 + first := tc.DrainLRUEvictions() + require.Len(t, first, 1) + second := tc.DrainLRUEvictions() + require.Empty(t, second, "drain returns nothing on the second call") +} + +func TestTaggedPatternClusterer_ResetClearsLRUState(t *testing.T) { + tc := NewTaggedPatternClusterer(NewTagGroupByKeyRegistry()) + tc.MaxClustersPerGroup = 1 + tc.MaxTagGroups = 1 + tc.Process([]string{"service:a"}, "alpha", 1000) + tc.Process([]string{"service:b"}, "beta", 1001) // evicts service:a's group + tc.Reset() + require.Empty(t, tc.DrainLRUEvictions(), "Reset must clear pending evictions") + require.Equal(t, 0, tc.NumSubClusterers()) +} diff --git a/comp/observer/impl/metrics_detector_bocpd.go b/comp/observer/impl/metrics_detector_bocpd.go index 76d3f8365201..1eb3a8662035 100644 --- a/comp/observer/impl/metrics_detector_bocpd.go +++ b/comp/observer/impl/metrics_detector_bocpd.go @@ -256,6 +256,27 @@ func (b *BOCPDDetector) Reset() { b.cachedGen = 0 } +// RemoveSeries drops posterior state for refs that storage has freed. +// Each (ref, agg) entry in the per-series map carries six float64 arrays +// of size MaxRunLength+2 (~9.7 KB at default config), so without this +// teardown the map grows with the cumulative number of series ever seen +// even after their storage payload is gone. Called by the engine right +// after timeSeriesStorage.RemoveSeriesByKeys returns the freed refs. 
+func (b *BOCPDDetector) RemoveSeries(refs []observer.SeriesRef) { + if len(refs) == 0 || len(b.series) == 0 { + return + } + for _, ref := range refs { + for _, agg := range b.config.Aggregations { + delete(b.series, bocpdStateKey{ref: ref, agg: agg}) + } + } + // Drop the cached series snapshot so the next Detect re-lists from + // storage and we don't iterate over removed refs. + b.cachedSeries = nil + b.cachedGen = 0 +} + // processPoint handles a single new observation for a series. // Returns an anomaly pointer if this point triggers a new alert onset. func (b *BOCPDDetector) processPoint(state *bocpdSeriesState, p observer.Point, series *observer.Series, agg observer.Aggregate) *observer.Anomaly { diff --git a/comp/observer/impl/metrics_detector_scanmw.go b/comp/observer/impl/metrics_detector_scanmw.go index 1e0e45835140..3f46b02e6f71 100644 --- a/comp/observer/impl/metrics_detector_scanmw.go +++ b/comp/observer/impl/metrics_detector_scanmw.go @@ -107,6 +107,25 @@ func (d *ScanMWDetector) Reset() { d.cachedGen = 0 } +// RemoveSeries drops segment-tracking state for refs that storage has freed. +// Each per-series entry holds a reusable point buffer that grows to the +// segment size, so without this teardown the map keeps growing with the +// cumulative series count even after storage shrinks. Called by the engine +// right after timeSeriesStorage.RemoveSeriesByKeys returns the freed refs. +func (d *ScanMWDetector) RemoveSeries(refs []observer.SeriesRef) { + d.ensureDefaults() + if len(refs) == 0 || len(d.series) == 0 { + return + } + for _, ref := range refs { + for _, agg := range d.Aggregations { + delete(d.series, scanmwStateKey{ref: ref, agg: agg}) + } + } + d.cachedSeries = nil + d.cachedGen = 0 +} + // Detect implements Detector. It discovers series, reads segment data, // and scans for changepoints. After finding one, the segment start advances // so subsequent calls only examine post-change data. 
diff --git a/comp/observer/impl/metrics_detector_scanwelch.go b/comp/observer/impl/metrics_detector_scanwelch.go index a2471a6b2138..f0ddf3751487 100644 --- a/comp/observer/impl/metrics_detector_scanwelch.go +++ b/comp/observer/impl/metrics_detector_scanwelch.go @@ -100,6 +100,25 @@ func (d *ScanWelchDetector) Reset() { d.cachedGen = 0 } +// RemoveSeries drops segment-tracking state for refs that storage has freed. +// Each per-series entry holds a reusable point buffer that grows to the +// segment size, so without this teardown the map keeps growing with the +// cumulative series count even after storage shrinks. Called by the engine +// right after timeSeriesStorage.RemoveSeriesByKeys returns the freed refs. +func (d *ScanWelchDetector) RemoveSeries(refs []observer.SeriesRef) { + d.ensureDefaults() + if len(refs) == 0 || len(d.series) == 0 { + return + } + for _, ref := range refs { + for _, agg := range d.Aggregations { + delete(d.series, scanwelchStateKey{ref: ref, agg: agg}) + } + } + d.cachedSeries = nil + d.cachedGen = 0 +} + // Detect implements Detector. Same iteration pattern as ScanMW and BOCPD — // consider dedup if more scan-based detectors are added. func (d *ScanWelchDetector) Detect(storage observer.StorageReader, dataTime int64) observer.DetectionResult { diff --git a/comp/observer/impl/observer.go b/comp/observer/impl/observer.go index 563165fc6026..94d02ae6a75b 100644 --- a/comp/observer/impl/observer.go +++ b/comp/observer/impl/observer.go @@ -574,7 +574,18 @@ type seriesDetectorAdapter struct { // bounding per-call cost to O(windowSec) instead of O(totalPoints). windowSec int64 - lastVisibleCount map[string]int + // cachedSeries / cachedGen mirror the pattern used by BOCPDDetector, + // ScanWelchDetector, and ScanMWDetector: storage.SeriesGeneration() only + // advances when a brand-new series key is created, so we can avoid the + // per-Detect full-map ListSeries scan on steady-state cardinality. 
+ cachedSeries []observerdef.SeriesMeta + cachedGen uint64 + + // lastVisibleCount is keyed by the storage's compact SeriesRef so we + // avoid rebuilding a string key per series per Detect call. SeriesRefs + // are append-only (storage.go:217) so they remain stable for the lifetime + // of a series. + lastVisibleCount map[observerdef.SeriesRef]int } func newSeriesDetectorAdapter(detector observerdef.SeriesDetector, aggregations []observerdef.Aggregate) *seriesDetectorAdapter { @@ -582,7 +593,7 @@ func newSeriesDetectorAdapter(detector observerdef.SeriesDetector, aggregations detector: detector, aggregations: aggregations, windowSec: defaultDetectorWindowSec, - lastVisibleCount: make(map[string]int), + lastVisibleCount: make(map[observerdef.SeriesRef]int), } } @@ -592,25 +603,55 @@ func (a *seriesDetectorAdapter) Name() string { // Reset clears adapter-local caches and resets the wrapped detector when supported. func (a *seriesDetectorAdapter) Reset() { - a.lastVisibleCount = make(map[string]int) + a.lastVisibleCount = make(map[observerdef.SeriesRef]int) + a.cachedSeries = nil + a.cachedGen = 0 if resetter, ok := a.detector.(interface{ Reset() }); ok { resetter.Reset() } } +// RemoveSeries drops adapter-local point-count tracking for the given refs +// and forwards the call to the wrapped detector if it also tracks per-series +// state. Without this hook lastVisibleCount grows with the cumulative number +// of series ever observed even after storage frees them. +// +// Concurrency invariant: this method runs on the single observerImpl.run() +// goroutine that drives every other adapter callback (Detect, Reset). The +// engine's fanOutSeriesRemoval is the only caller. Mutating lastVisibleCount +// and cachedSeries without a lock is safe under that invariant only. 
+func (a *seriesDetectorAdapter) RemoveSeries(refs []observerdef.SeriesRef) { + if len(refs) == 0 { + return + } + if len(a.lastVisibleCount) > 0 { + for _, ref := range refs { + delete(a.lastVisibleCount, ref) + } + } + a.cachedSeries = nil + a.cachedGen = 0 + if remover, ok := a.detector.(observerdef.SeriesRemover); ok { + remover.RemoveSeries(refs) + } +} + func (a *seriesDetectorAdapter) Detect(storage observerdef.StorageReader, dataTime int64) observerdef.DetectionResult { - allSeries := storage.ListSeries(observerdef.WorkloadSeriesFilter()) + gen := storage.SeriesGeneration() + if a.cachedSeries == nil || gen != a.cachedGen { + a.cachedSeries = storage.ListSeries(observerdef.WorkloadSeriesFilter()) + a.cachedGen = gen + } var allAnomalies []observerdef.Anomaly var allTelemetry []observerdef.ObserverTelemetry - for _, meta := range allSeries { - keyStr := seriesKey(meta.Namespace, meta.Name, meta.Tags) + for _, meta := range a.cachedSeries { visibleCount := storage.PointCountUpTo(meta.Ref, dataTime) - if prev, ok := a.lastVisibleCount[keyStr]; ok && prev == visibleCount { + if prev, ok := a.lastVisibleCount[meta.Ref]; ok && prev == visibleCount { continue } - a.lastVisibleCount[keyStr] = visibleCount + a.lastVisibleCount[meta.Ref] = visibleCount for _, agg := range a.aggregations { start := int64(0) diff --git a/comp/observer/impl/patterns/cluster.go b/comp/observer/impl/patterns/cluster.go index 76cc087e9dbf..b4c4fb968873 100644 --- a/comp/observer/impl/patterns/cluster.go +++ b/comp/observer/impl/patterns/cluster.go @@ -162,6 +162,13 @@ type PatternClusterer struct { // match by value for an incoming line to merge into an existing cluster. Set at // construction from rawMinTokenMatchRatio via effectiveMinTokenMatchRatio. minTokenMatchRatio float64 + // MaxClusters caps the number of live clusters. When a new cluster would push + // allClusters past this size, the cluster with the smallest LastSeenUnix is + // evicted (LRU). 
Zero (the default) disables the cap. + MaxClusters int + // lruEvicted holds cluster IDs evicted by the LRU cap since the last + // DrainLRUEvictedClusterIDs call. Distinct from GC eviction. + lruEvicted []int64 } func NewPatternClusterer() *PatternClusterer { @@ -242,9 +249,53 @@ func (pc *PatternClusterer) ProcessTokens(tokens []Token, message string, unixSe pc.allClusters = append(pc.allClusters, c) pc.nextID++ + pc.evictLRUOverCap(c.ID) + return c, true } +// evictLRUOverCap evicts the least-recently-seen cluster (smallest LastSeenUnix) +// when MaxClusters > 0 and len(allClusters) exceeds it. The just-inserted cluster +// id (justInserted) is excluded from eviction; in the corner case where every +// existing cluster has a younger or equal LastSeenUnix than justInserted, the +// oldest non-just-inserted cluster is evicted. Evicted IDs are appended to +// pc.lruEvicted; callers retrieve them via DrainLRUEvictedClusterIDs. +func (pc *PatternClusterer) evictLRUOverCap(justInserted int64) { + if pc.MaxClusters <= 0 || len(pc.allClusters) <= pc.MaxClusters { + return + } + // Find min-LastSeenUnix cluster, excluding justInserted. + var oldestIdx = -1 + var oldestUnix int64 + for i, c := range pc.allClusters { + if c.ID == justInserted { + continue + } + if oldestIdx < 0 || c.LastSeenUnix < oldestUnix { + oldestIdx = i + oldestUnix = c.LastSeenUnix + } + } + if oldestIdx < 0 { + return + } + evictID := pc.allClusters[oldestIdx].ID + _ = pc.RemoveClusters([]int64{evictID}) + pc.lruEvicted = append(pc.lruEvicted, evictID) +} + +// DrainLRUEvictedClusterIDs returns and clears the IDs of clusters evicted by +// the LRU cap (MaxClusters) since the last call. GC-evicted clusters are NOT +// reported here; see ClusterIDsBeforeUnix / RemoveClusters. 
+func (pc *PatternClusterer) DrainLRUEvictedClusterIDs() []int64 { + if len(pc.lruEvicted) == 0 { + return nil + } + out := pc.lruEvicted + pc.lruEvicted = nil + return out +} + // ClusterIDsBeforeUnix returns IDs of clusters whose LastSeenUnix is strictly // less than cutoff (both in Unix seconds). func (pc *PatternClusterer) ClusterIDsBeforeUnix(cutoff int64) []int64 { diff --git a/comp/observer/impl/patterns/cluster_test.go b/comp/observer/impl/patterns/cluster_test.go index 2aa0b004f289..c03c643a7a84 100644 --- a/comp/observer/impl/patterns/cluster_test.go +++ b/comp/observer/impl/patterns/cluster_test.go @@ -6,7 +6,11 @@ package patterns import ( + "fmt" + "strings" "testing" + + "github.com/stretchr/testify/require" ) // Fixed event time for SignatureClusterer tests (Unix seconds, non-zero). @@ -606,3 +610,66 @@ func TestPatternClustererRemoveClusters(t *testing.T) { t.Fatalf("expected 0 clusters, got %d", pc.NumClusters()) } } + +func TestPatternClusterer_MaxClustersEvictsLRU(t *testing.T) { + pc := NewPatternClustererWithTokenizer(NewTokenizer(), 0) + pc.MaxClusters = 2 + + // Three distinct signatures → three clusters; cap is 2. + _, ok := pc.Process("first kind of message", 1000) + require.True(t, ok) + require.Empty(t, pc.DrainLRUEvictedClusterIDs(), "no eviction below cap") + + _, ok = pc.Process("alpha beta gamma delta", 1001) + require.True(t, ok) + require.Empty(t, pc.DrainLRUEvictedClusterIDs(), "no eviction at cap") + require.Equal(t, 2, pc.NumClusters()) + + // This third distinct cluster pushes us over cap. The oldest (id=0, + // LastSeenUnix=1000) must be evicted. 
+ _, ok = pc.Process("x y z w", 1002) + require.True(t, ok) + require.Equal(t, 2, pc.NumClusters(), "cap holds at MaxClusters") + evicted := pc.DrainLRUEvictedClusterIDs() + require.Equal(t, []int64{0}, evicted, "oldest cluster evicted") + require.Empty(t, pc.DrainLRUEvictedClusterIDs(), "drain is one-shot") +} + +func TestPatternClusterer_MaxClustersExcludesJustInserted(t *testing.T) { + // Even when the just-inserted cluster has the smallest LastSeenUnix + // (e.g. backdated log timestamp), it must not be the eviction victim. + pc := NewPatternClustererWithTokenizer(NewTokenizer(), 0) + pc.MaxClusters = 2 + + pc.Process("alpha", 1100) + pc.Process("beta gamma", 1200) + _, ok := pc.Process("newer pattern shape", 900) + require.True(t, ok) + evicted := pc.DrainLRUEvictedClusterIDs() + require.Len(t, evicted, 1) + require.Equal(t, int64(0), evicted[0], "oldest pre-existing cluster evicted, not the just-inserted one") + require.Equal(t, 2, pc.NumClusters()) + // Confirm the survivor set: the just-inserted cluster (id=2) and the more + // recent pre-existing one (id=1) should remain. + ids := []int64{} + for _, c := range pc.GetClusters() { + ids = append(ids, c.ID) + } + require.ElementsMatch(t, []int64{1, 2}, ids) +} + +func TestPatternClusterer_MaxClustersZeroIsUnbounded(t *testing.T) { + pc := NewPatternClustererWithTokenizer(NewTokenizer(), 0) + require.Equal(t, 0, pc.MaxClusters) + // Use distinct word counts to guarantee distinct signatures (canMergeTokenLists + // requires equal length); 50 unique cluster shapes. 
+ for i := 0; i < 50; i++ { + parts := make([]string, i+1) + for j := range parts { + parts[j] = fmt.Sprintf("word%c", 'a'+j%26) + } + pc.Process(strings.Join(parts, " "), int64(1000+i)) + } + require.Equal(t, 50, pc.NumClusters()) + require.Empty(t, pc.DrainLRUEvictedClusterIDs()) +} diff --git a/comp/observer/impl/patterns/signature.go b/comp/observer/impl/patterns/signature.go index 0c4601e80933..eae659fce9e1 100644 --- a/comp/observer/impl/patterns/signature.go +++ b/comp/observer/impl/patterns/signature.go @@ -8,58 +8,57 @@ package patterns import "strings" // TokenListSignature computes the signature of a list of tokens. -// Sequences of word-like tokens separated by single '.' or ':' are collapsed into "specialWord". +// Sequences of word-like tokens separated by single '.' or ':' are collapsed +// into "specialWord". A single pass over the tokens emits directly into a +// builder, with a small lookahead state for chain detection — no per-token +// signature string slice is allocated. func TokenListSignature(tokens []Token) string { n := len(tokens) if n == 0 { return "" } - sigs := make([]string, n) - for i, t := range tokens { - sigs[i] = t.Signature() - } + var b strings.Builder + // Most token signatures are short; this is a generous estimate. + b.Grow(n * 6) - // In-place collapse: chains of word-like tokens separated by '.' or ':' become "specialWord". - // Reading always moves forward (lookahead at j+1 is never behind any write position), - // so mutating sigs in place is safe. i := 0 for i < n { - if !isWordLikeForConcat(sigs[i]) { + t := tokens[i] + if !isTokenWordLikeForConcat(t) { + b.WriteString(t.Signature()) i++ continue } - chainStart := i + // Probe forward for a chain `word ('.'|':') word ...`. We require at + // least one separator+word to flip to "specialWord"; otherwise emit the + // single word's normal signature. j := i + 1 - for j+1 < n { - if tokens[j].Type == TypeSpecialCharacter && - (tokens[j].Value == "." 
|| tokens[j].Value == ":") && - isWordLikeForConcat(sigs[j+1]) { - j += 2 - } else { - break - } + chained := false + for j+1 < n && + tokens[j].Type == TypeSpecialCharacter && + (tokens[j].Value == "." || tokens[j].Value == ":") && + isTokenWordLikeForConcat(tokens[j+1]) { + j += 2 + chained = true } - if j > chainStart+1 { - sigs[chainStart] = "specialWord" - for k := chainStart + 1; k < j; k++ { - sigs[k] = "" - } + if chained { + b.WriteString("specialWord") + i = j + continue } - i = j + b.WriteString(t.Signature()) + i++ } - var b strings.Builder - for _, s := range sigs { - if s != "" { - b.WriteString(s) - } - } return b.String() } -func isWordLikeForConcat(sig string) bool { - return sig == "word" || sig == "specialWord" +// isTokenWordLikeForConcat is true when a token participates in the +// "specialWord" chain collapse. Word tokens always qualify; the chained form +// becomes "specialWord" via the caller. +func isTokenWordLikeForConcat(t Token) bool { + return t.Type == TypeWord } // Parse tokenizes a message and returns the token list. diff --git a/comp/observer/impl/patterns/token.go b/comp/observer/impl/patterns/token.go index bf4948401561..ab30458f8ba8 100644 --- a/comp/observer/impl/patterns/token.go +++ b/comp/observer/impl/patterns/token.go @@ -148,11 +148,34 @@ func NumericValueToken(text string) Token { return Token{Type: TypeNumericValue, Value: text} } +// specialCharStrings interns single-byte strings so SpecialCharToken — called +// once per non-token byte during tokenization — does not allocate. +var specialCharStrings = func() [256]string { + var arr [256]string + for i := range arr { + arr[i] = string([]byte{byte(i)}) + } + return arr +}() + func SpecialCharToken(ch byte) Token { - return Token{Type: TypeSpecialCharacter, Value: string(ch)} + return Token{Type: TypeSpecialCharacter, Value: specialCharStrings[ch]} } +// whitespacePool holds shared "space-only" strings for common whitespace runs. 
+// Avoids per-call allocations from strings.Repeat for typical small runs. +var whitespacePool = func() [33]string { + var arr [33]string + for i := range arr { + arr[i] = strings.Repeat(" ", i) + } + return arr +}() + func WhitespaceToken(count int) Token { + if count >= 0 && count < len(whitespacePool) { + return Token{Type: TypeWhitespace, Value: whitespacePool[count]} + } return Token{Type: TypeWhitespace, Value: strings.Repeat(" ", count)} } @@ -169,71 +192,132 @@ func IPv4Token(text string, _, _, _, _ int) Token { } func PathToken(segments []string) Token { + return pathTokenRaw("", segments) +} + +func pathTokenRaw(value string, segments []string) Token { + if value == "" { + var b strings.Builder + b.WriteByte('/') + for i, seg := range segments { + if i > 0 { + b.WriteByte('/') + } + b.WriteString(seg) + } + value = b.String() + } return Token{ Type: TypeAbsolutePath, - Value: "/" + strings.Join(segments, "/"), + Value: value, extra: &tokenExtra{Segments: segments}, } } func PathQueryFragmentToken(segments []string, query, fragment *string) Token { - v := "/" + strings.Join(segments, "/") - if query != nil { - v += "?" + *query - } - if fragment != nil { - v += "#" + *fragment + return pathQueryFragmentTokenRaw("", segments, query, fragment) +} + +// pathQueryFragmentTokenRaw is like PathQueryFragmentToken but takes a +// pre-built Value. The tokenizer always already has the matched substring at +// hand, so it can skip the strings.Join + concat the public constructor would +// do (the caller used to overwrite Value anyway). 
+func pathQueryFragmentTokenRaw(value string, segments []string, query, fragment *string) Token { + if value == "" { + var b strings.Builder + b.WriteByte('/') + for i, seg := range segments { + if i > 0 { + b.WriteByte('/') + } + b.WriteString(seg) + } + if query != nil { + b.WriteByte('?') + b.WriteString(*query) + } + if fragment != nil { + b.WriteByte('#') + b.WriteString(*fragment) + } + value = b.String() } return Token{ Type: TypePathQueryFragment, - Value: v, + Value: value, extra: &tokenExtra{Segments: segments, Query: query, Fragment: fragment}, } } func URIToken(scheme string, authority *Token, path *Token, query, fragment *string) Token { - v := scheme + "://" - if authority != nil { - v += authority.Value - } - if path != nil { - v += path.Value - } - if query != nil { - v += "?" + *query - } - if fragment != nil { - v += "#" + *fragment + return uriTokenRaw("", scheme, authority, path, query, fragment) +} + +func uriTokenRaw(value string, scheme string, authority *Token, path *Token, query, fragment *string) Token { + if value == "" { + var b strings.Builder + b.WriteString(scheme) + b.WriteString("://") + if authority != nil { + b.WriteString(authority.Value) + } + if path != nil { + b.WriteString(path.Value) + } + if query != nil { + b.WriteByte('?') + b.WriteString(*query) + } + if fragment != nil { + b.WriteByte('#') + b.WriteString(*fragment) + } + value = b.String() } return Token{ Type: TypeURI, - Value: v, + Value: value, extra: &tokenExtra{Scheme: scheme, Authority: authority, Path: path, Query: query, Fragment: fragment}, } } func AuthorityToken(host *Token, port int, hasPort bool, userInfo string, hasUser bool) Token { - v := "" - if hasUser { - v += userInfo + "@" - } - if host != nil { - v += host.Value - } - if hasPort { - v += fmt.Sprintf(":%d", port) + return authorityTokenRaw("", host, port, hasPort, userInfo, hasUser) +} + +func authorityTokenRaw(value string, host *Token, port int, hasPort bool, userInfo string, hasUser bool) Token 
{ + if value == "" { + var b strings.Builder + if hasUser { + b.WriteString(userInfo) + b.WriteByte('@') + } + if host != nil { + b.WriteString(host.Value) + } + if hasPort { + fmt.Fprintf(&b, ":%d", port) + } + value = b.String() } return Token{ Type: TypeAuthority, - Value: v, + Value: value, extra: &tokenExtra{Host: host, Port: port, HasPort: hasPort, UserInfo: userInfo, HasUser: hasUser}, } } func EmailToken(localPart, domain string) Token { + return emailTokenRaw("", localPart, domain) +} + +func emailTokenRaw(value, localPart, domain string) Token { + if value == "" { + value = localPart + "@" + domain + } return Token{ Type: TypeEmailAddress, - Value: localPart + "@" + domain, + Value: value, extra: &tokenExtra{LocalPart: localPart, Domain: domain}, } } diff --git a/comp/observer/impl/patterns/tokenizer.go b/comp/observer/impl/patterns/tokenizer.go index 605c46557fcb..b53f7c6cbe0f 100644 --- a/comp/observer/impl/patterns/tokenizer.go +++ b/comp/observer/impl/patterns/tokenizer.go @@ -8,7 +8,6 @@ package patterns import ( "fmt" "os" - "regexp" "strconv" "strings" "time" @@ -24,6 +23,12 @@ type Tokenizer struct { MaxStringLen int MaxTokens int ParseHexDump bool + + // scratch is a reusable working buffer for Tokenize. The returned slice is + // always a fresh exact-sized copy, so callers can retain it safely; the + // scratch only amortizes growth across calls. A single Tokenizer is not + // safe for concurrent use. + scratch []Token } func NewTokenizer() *Tokenizer { @@ -43,7 +48,16 @@ func (t *Tokenizer) Tokenize(message string) []Token { return nil } - var tokens []Token + // Most logs produce ~len(msg)/4 tokens; size scratch so the inner appends + // rarely grow it. Not a hard cap — it can still grow. 
+ estTokens := len(msg)/4 + 8 + if estTokens > t.MaxTokens { + estTokens = t.MaxTokens + } + if cap(t.scratch) < estTokens { + t.scratch = make([]Token, 0, estTokens) + } + tokens := t.scratch[:0] pos := 0 for pos < len(msg) && len(tokens) < t.MaxTokens { @@ -57,78 +71,166 @@ func (t *Tokenizer) Tokenize(message string) []Token { pos += advance } - return tokens + // Stash the (possibly grown) buffer for the next call. + t.scratch = tokens + // Return a fresh exact-sized slice so callers can safely retain it. + out := make([]Token, len(tokens)) + copy(out, tokens) + return out } func (t *Tokenizer) matchAt(msg string, pos int) (Token, int) { s := msg[pos:] + c := s[0] + cc := charClass[c] + + switch { + case cc&ccDigit != 0: + // Hex-dump heads start with a hex digit; ISO dates start with 4 digits; + // CLF dates / local times / IPv4 all start with digits. + if t.ParseHexDump { + if tok, n := tryHexDump(s, msg, pos); n > 0 { + return tok, n + } + } + if tok, n := tryOffsetDateTime(s); n > 0 { + return tok, n + } + if tok, n := tryLocalDateTime(s); n > 0 { + return tok, n + } + if tok, n := tryCLFDateTime(s); n > 0 { + return tok, n + } + if tok, n := tryLocalDate(s); n > 0 { + return tok, n + } + if tok, n := tryCLFDate(s); n > 0 { + return tok, n + } + if tok, n := tryLocalTime(s); n > 0 { + return tok, n + } + // Email local-parts can start with a digit (e.g. 123@example.com) + // or even look IPv4-shaped (e.g. 192.168.1.1@example.com); try + // before tryIPv4Authority so the full address tokenizes as one email. + if tok, n := tryEmail(s); n > 0 { + return tok, n + } + if tok, n := tryIPv4Authority(s, msg, pos); n > 0 { + return tok, n + } + if tok, n := tryAlphanumeric(s); n > 0 { + return tok, n + } - if t.ParseHexDump { - if tok, n := tryHexDump(s, msg, pos); n > 0 { + case cc&ccAlpha != 0: + // Hex-dump head can also start with a-f / A-F. 
+ if t.ParseHexDump && cc&ccHexAlpha != 0 { + if tok, n := tryHexDump(s, msg, pos); n > 0 { + return tok, n + } + } + // URI scheme is one of "http", "https", "ftp" — only h/f can start one. + if c == 'h' || c == 'f' { + if tok, n := tryURI(s); n > 0 { + return tok, n + } + } + if tok, n := tryEmail(s); n > 0 { + return tok, n + } + if tok, n := tryAlphanumeric(s); n > 0 { return tok, n } - } - if tok, n := tryOffsetDateTime(s); n > 0 { - return tok, n - } - if tok, n := tryLocalDateTime(s); n > 0 { - return tok, n - } - if tok, n := tryCLFDateTime(s); n > 0 { - return tok, n - } - if tok, n := tryLocalDate(s); n > 0 { - return tok, n - } - if tok, n := tryCLFDate(s); n > 0 { - return tok, n - } - if tok, n := tryLocalTime(s); n > 0 { - return tok, n - } + case cc&ccSpaceTab != 0: + if tok, n := tryWhitespace(s); n > 0 { + return tok, n + } - if tok, n := tryURI(s); n > 0 { - return tok, n - } + case cc&ccUnder != 0: + // Underscore is a valid email local-part start (e.g. _svc@example.com). + if tok, n := tryEmail(s); n > 0 { + return tok, n + } + // Underscore is alphanumeric-extended; falls into tryWordOrKeyword. + if tok, n := tryAlphanumeric(s); n > 0 { + return tok, n + } - if tok, n := tryEmail(s); n > 0 { - return tok, n + default: + switch c { + case '/': + if tok, n := tryPath(s, msg, pos); n > 0 { + return tok, n + } + case '-': + // Negative number iff next byte is a digit. + if tok, n := tryAlphanumeric(s); n > 0 { + return tok, n + } + } } - if tok, n := tryIPv4Authority(s, msg, pos); n > 0 { - return tok, n - } + return SpecialCharToken(c), 1 +} - if tok, n := tryPath(s, msg, pos); n > 0 { - return tok, n - } +// ---- Hex dump ---- - if tok, n := tryWhitespace(s); n > 0 { - return tok, n +// scanHexDumpHead matches `^[0-9A-Fa-f]{4,8}:[ \t]+(?:[0-9A-Fa-f]{2}[ \t]+)*[0-9A-Fa-f]{2}`, +// i.e. the original hexDumpRe with `\s` narrowed to space/tab. Returns the displacement length, the +// number of hex byte groups parsed, the total bytes consumed, and whether the head matched. 
+func scanHexDumpHead(s string) (dispLen, numBytes, n int, ok bool) { + // Displacement: 4-8 hex digits. + i := 0 + for i < len(s) && i < 8 && isHexByte(s[i]) { + i++ } - - if tok, n := tryAlphanumeric(s); n > 0 { - return tok, n + if i < 4 { + return } - - return SpecialCharToken(s[0]), 1 + dispLen = i + if i >= len(s) || s[i] != ':' { + return + } + i++ + wsStart := i + for i < len(s) && (s[i] == ' ' || s[i] == '\t') { + i++ + } + if i == wsStart { + return + } + // At least one hex byte (2 hex digits). + if i+2 > len(s) || !isHexByte(s[i]) || !isHexByte(s[i+1]) { + return + } + i += 2 + numBytes = 1 + // Subsequent (whitespace + 2 hex digits) groups. + for { + j := i + for j < len(s) && (s[j] == ' ' || s[j] == '\t') { + j++ + } + if j == i { + break + } + if j+2 > len(s) || !isHexByte(s[j]) || !isHexByte(s[j+1]) { + break + } + i = j + 2 + numBytes++ + } + return dispLen, numBytes, i, true } -// ---- Hex dump ---- - -var hexDumpRe = regexp.MustCompile(`^([0-9A-Fa-f]{4,8}):\s+((?:[0-9A-Fa-f]{2}\s+)*[0-9A-Fa-f]{2})`) - func tryHexDump(s, fullMsg string, pos int) (Token, int) { - m := hexDumpRe.FindStringSubmatch(s) - if m == nil { + dispLen, numBytes, totalMatch, ok := scanHexDumpHead(s) + if !ok { return Token{}, 0 } - displacement := m[1] - dispLen := len(displacement) - hexPart := strings.TrimRight(m[2], " \t") - byteParts := strings.Fields(hexPart) - totalMatch := len(m[0]) // Skip trailing whitespace after last hex byte trailingWS := totalMatch @@ -148,7 +250,7 @@ func tryHexDump(s, fullMsg string, pos int) (Token, int) { } else { candidate = rest[:nextSpace] } - if len(candidate) > 0 && len(candidate) <= len(byteParts) { + if len(candidate) > 0 && len(candidate) <= numBytes { allPrintable := true for _, c := range candidate { if c < 0x20 || c > 0x7e { @@ -182,233 +284,364 @@ func tryHexDump(s, fullMsg string, pos int) (Token, int) { return HexDumpToken(text, dispLen, hasASCII), endPos } -// ---- Date/Time patterns ---- +// ---- Date/Time scanners ---- +// 
+// Hand-coded byte scanners replacing the original regex matchers. Each scanner +// is a tiny prefix-match against a known shape; on a hit it returns enough +// information for the caller to build the same DateToken/LocalTimeToken value +// (raw text + format string) the regex version would have produced. + +// readDigits reads up to maxN consecutive ASCII digits at s[i:i+maxN]. +// Returns the parsed integer value and the count of digits read. +func readDigits(s string, i, maxN int) (val, n int) { + end := i + maxN + if end > len(s) { + end = len(s) + } + for j := i; j < end && isDigit(s[j]); j++ { + val = val*10 + int(s[j]-'0') + n++ + } + return val, n +} -var reOffsetDateTime = regexp.MustCompile( - `^(\d{4})([-/])(\d{2})[-/](\d{2})([T ])(\d{2}):(\d{2}):(\d{2})` + - `(?:\.(\d{1,9}))?` + - `(Z|[+-]\d{2}:?\d{2})`) +// scanISODate matches `\d{4}[-/]\d{2}[-/]\d{2}` with both separators equal. +// Returns the chosen separator, the parsed year/month/day, and the number of +// bytes consumed (always 10 on success). +func scanISODate(s string) (sep byte, year, month, day, n int, ok bool) { + if len(s) < 10 { + return + } + var c int + if year, c = readDigits(s, 0, 4); c != 4 { + return + } + sep = s[4] + if sep != '-' && sep != '/' { + return + } + if month, c = readDigits(s, 5, 2); c != 2 || s[7] != sep { + return + } + if day, c = readDigits(s, 8, 2); c != 2 { + return + } + return sep, year, month, day, 10, true +} -var reLocalDateTime = regexp.MustCompile( - `^(\d{4})([-/])(\d{2})[-/](\d{2})([T ])(\d{2}):(\d{2}):(\d{2})` + - `(?:\.(\d{1,9}))?`) +// scanTime matches `\d{2}:\d{2}:\d{2}(?:\.\d{1,9})?` at s[i:]. Returns the +// parsed hour/min/sec, the fractional-digits substring (zero-length when +// absent), and total bytes consumed from i. 
+func scanTime(s string, i int) (hour, min, sec int, frac string, n int, ok bool) { + if i+8 > len(s) { + return + } + var c int + if hour, c = readDigits(s, i, 2); c != 2 || s[i+2] != ':' { + return + } + if min, c = readDigits(s, i+3, 2); c != 2 || s[i+5] != ':' { + return + } + if sec, c = readDigits(s, i+6, 2); c != 2 { + return + } + end := i + 8 + if end < len(s) && s[end] == '.' { + if _, fc := readDigits(s, end+1, 9); fc >= 1 { + frac = s[end+1 : end+1+fc] + end += 1 + fc + } + } + return hour, min, sec, frac, end - i, true +} -var reLocalDate = regexp.MustCompile( - `^(\d{4})([-/])(\d{2})[-/](\d{2})`) +// scanISOTZ matches `Z | [+-]\d{2}:?\d{2}` at s[i:]. +func scanISOTZ(s string, i int) (tz string, n int, ok bool) { + if i >= len(s) { + return + } + if s[i] == 'Z' { + return "Z", 1, true + } + if s[i] != '+' && s[i] != '-' { + return + } + if i+5 > len(s) || !isDigit(s[i+1]) || !isDigit(s[i+2]) { + return + } + j := i + 3 + if s[j] == ':' { + j++ + } + if j+2 > len(s) || !isDigit(s[j]) || !isDigit(s[j+1]) { + return + } + end := j + 2 + return s[i:end], end - i, true +} + +// scanCLFTZ matches `[ \t]+[+-]\d{4}` at s[i:]; tz excludes the leading blanks, n includes them. +func scanCLFTZ(s string, i int) (tz string, n int, ok bool) { + j := i + for j < len(s) && (s[j] == ' ' || s[j] == '\t') { + j++ + } + if j == i { + return + } + if j+5 > len(s) { + return + } + if s[j] != '+' && s[j] != '-' { + return + } + for k := 1; k <= 4; k++ { + if !isDigit(s[j+k]) { + return + } + } + end := j + 5 + return s[j:end], end - i, true +} -var reCLFDateTime = regexp.MustCompile( - `^(\d{2})/([A-Za-z]{3})/(\d{4})([: ])(\d{2}):(\d{2}):(\d{2})` + - `(?:\.(\d{1,9}))?` + - `(?:\s+([+-]\d{4}))?`) +// appendISODateFormat writes "yyyy{sep}MM{sep}ddT{HH:mm:ss}" or its space-T +// variant into b. tSep is 'T' or ' '. The original code went through `q()` and +// `cleanDateFormat` to strip quote markers; we just build the cleaned form. 
+func appendISODateFormat(b *strings.Builder, sep, tSep byte) { + b.WriteString("yyyy") + b.WriteByte(sep) + b.WriteString("MM") + b.WriteByte(sep) + b.WriteString("dd") + b.WriteByte(tSep) + b.WriteString("HH:mm:ss") +} -var reCLFDate = regexp.MustCompile( - `^(\d{2})/([A-Za-z]{3})/(\d{4})`) +func appendCLFDateFormat(b *strings.Builder, tSep byte) { + b.WriteString("dd/MMM/yyyy") + b.WriteByte(tSep) + b.WriteString("HH:mm:ss") +} -var reLocalTime = regexp.MustCompile( - `^(\d{2}):(\d{2}):(\d{2})(?:\.(\d{1,9}))?`) +func appendFrac(b *strings.Builder, frac string) { + if frac == "" { + return + } + b.WriteByte('.') + for range frac { + b.WriteByte('S') + } +} func tryOffsetDateTime(s string) (Token, int) { - m := reOffsetDateTime.FindStringSubmatch(s) - if m == nil { + sep, year, month, day, dn, ok := scanISODate(s) + if !ok { return Token{}, 0 } - year, _ := strconv.Atoi(m[1]) - sep := m[2] - month, _ := strconv.Atoi(m[3]) - day, _ := strconv.Atoi(m[4]) - tSep := m[5] - hour, _ := strconv.Atoi(m[6]) - min, _ := strconv.Atoi(m[7]) - sec, _ := strconv.Atoi(m[8]) - fracStr := m[9] - tzStr := m[10] - - // Verify both date separators match - secondSep := m[0][len(m[1])+len(m[2])+len(m[3]) : len(m[1])+len(m[2])+len(m[3])+1] - if sep != secondSep { + if dn >= len(s) { return Token{}, 0 } - - if !validateDate(year, month, day) || hour > 23 || min > 59 || sec > 59 { + tSep := s[dn] + if tSep != 'T' && tSep != ' ' { return Token{}, 0 } - - if isFollowedByAlphanumeric(s, len(m[0])) { + hour, min, sec, frac, tn, ok := scanTime(s, dn+1) + if !ok { return Token{}, 0 } - - format := "yyyy" + q(sep) + "MM" + q(sep) + "dd" + q(tSep) + "HH" + q(":") + "mm" + q(":") + "ss" - if fracStr != "" { - format += q(".") + fracFormat(fracStr) + end := dn + 1 + tn + tzStr, tzN, ok := scanISOTZ(s, end) + if !ok { + return Token{}, 0 + } + end += tzN + if !validateDate(year, month, day) || hour > 23 || min > 59 || sec > 59 { + return Token{}, 0 + } + if isFollowedByAlphanumeric(s, end) { + 
return Token{}, 0 } - format += tzFormat(tzStr) - sigFmt := cleanDateFormat(format) - return DateToken(sigFmt, m[0]), len(m[0]) + var b strings.Builder + b.Grow(32) + appendISODateFormat(&b, sep, tSep) + appendFrac(&b, frac) + b.WriteString(tzFormat(tzStr)) + return DateToken(b.String(), s[:end]), end } func tryLocalDateTime(s string) (Token, int) { - m := reLocalDateTime.FindStringSubmatch(s) - if m == nil { + sep, year, month, day, dn, ok := scanISODate(s) + if !ok { return Token{}, 0 } - year, _ := strconv.Atoi(m[1]) - sep := m[2] - month, _ := strconv.Atoi(m[3]) - day, _ := strconv.Atoi(m[4]) - tSep := m[5] - hour, _ := strconv.Atoi(m[6]) - min, _ := strconv.Atoi(m[7]) - sec, _ := strconv.Atoi(m[8]) - fracStr := m[9] - - secondSep := m[0][len(m[1])+len(m[2])+len(m[3]) : len(m[1])+len(m[2])+len(m[3])+1] - if sep != secondSep { + if dn >= len(s) { return Token{}, 0 } - + tSep := s[dn] + if tSep != 'T' && tSep != ' ' { + return Token{}, 0 + } + hour, min, sec, frac, tn, ok := scanTime(s, dn+1) + if !ok { + return Token{}, 0 + } + end := dn + 1 + tn if !validateDate(year, month, day) || hour > 23 || min > 59 || sec > 59 { return Token{}, 0 } - - if isFollowedByAlphanumeric(s, len(m[0])) { + if isFollowedByAlphanumeric(s, end) { return Token{}, 0 } - format := "yyyy" + q(sep) + "MM" + q(sep) + "dd" + q(tSep) + "HH" + q(":") + "mm" + q(":") + "ss" - if fracStr != "" { - format += q(".") + fracFormat(fracStr) - } + var b strings.Builder + b.Grow(24) + appendISODateFormat(&b, sep, tSep) + appendFrac(&b, frac) + return DateToken(b.String(), s[:end]), end +} - sigFmt := cleanDateFormat(format) - return DateToken(sigFmt, m[0]), len(m[0]) +// scanCLFDate matches `\d{2}/[A-Za-z]{3}/\d{4}`. Returns parsed day, month +// (1-12), year, and bytes consumed. 
+func scanCLFDate(s string) (day, month, year, n int, ok bool) { + if len(s) < 11 { + return + } + var c int + if day, c = readDigits(s, 0, 2); c != 2 || s[2] != '/' { + return + } + if !isAlpha(s[3]) || !isAlpha(s[4]) || !isAlpha(s[5]) || s[6] != '/' { + return + } + m := parseMonthAbbr(s[3:6]) + if m == 0 { + return + } + if year, c = readDigits(s, 7, 4); c != 4 { + return + } + return day, int(m), year, 11, true } func tryCLFDateTime(s string) (Token, int) { - m := reCLFDateTime.FindStringSubmatch(s) - if m == nil { + day, month, year, dn, ok := scanCLFDate(s) + if !ok { return Token{}, 0 } - day, _ := strconv.Atoi(m[1]) - monthStr := m[2] - year, _ := strconv.Atoi(m[3]) - tSep := m[4] - hour, _ := strconv.Atoi(m[5]) - min, _ := strconv.Atoi(m[6]) - sec, _ := strconv.Atoi(m[7]) - fracStr := m[8] - tzStr := m[9] - - month := parseMonthAbbr(monthStr) - if month == 0 || !validateDate(year, int(month), day) || hour > 23 || min > 59 || sec > 59 { + if dn >= len(s) { return Token{}, 0 } - - if isFollowedByAlphanumeric(s, len(m[0])) { + tSep := s[dn] + if tSep != ':' && tSep != ' ' { return Token{}, 0 } - - format := "dd" + q("/") + "MMM" + q("/") + "yyyy" + q(tSep) + "HH" + q(":") + "mm" + q(":") + "ss" - if fracStr != "" { - format += q(".") + fracFormat(fracStr) + hour, min, sec, frac, tn, ok := scanTime(s, dn+1) + if !ok { + return Token{}, 0 } - - sigFmt := cleanDateFormat(format) - - if tzStr != "" { - sigFmt += " " + tzSigFormat(tzStr) - return DateToken(sigFmt, m[0]), len(m[0]) + end := dn + 1 + tn + tzStr, tzN, hasTZ := scanCLFTZ(s, end) + if hasTZ { + end += tzN + } + if !validateDate(year, month, day) || hour > 23 || min > 59 || sec > 59 { + return Token{}, 0 + } + if isFollowedByAlphanumeric(s, end) { + return Token{}, 0 } - return DateToken(sigFmt, m[0]), len(m[0]) + var b strings.Builder + b.Grow(32) + appendCLFDateFormat(&b, tSep) + appendFrac(&b, frac) + if hasTZ { + b.WriteByte(' ') + b.WriteString(tzSigFormat(tzStr)) + } + return DateToken(b.String(), 
s[:end]), end } func tryLocalDate(s string) (Token, int) { - m := reLocalDate.FindStringSubmatch(s) - if m == nil { + sep, year, month, day, dn, ok := scanISODate(s) + if !ok { return Token{}, 0 } - year, _ := strconv.Atoi(m[1]) - sep := m[2] - month, _ := strconv.Atoi(m[3]) - day, _ := strconv.Atoi(m[4]) - - secondSep := m[0][len(m[1])+len(m[2])+len(m[3]) : len(m[1])+len(m[2])+len(m[3])+1] - if sep != secondSep { - return Token{}, 0 - } - if !validateDate(year, month, day) { return Token{}, 0 } - - if isFollowedByAlphanumeric(s, len(m[0])) { + if isFollowedByAlphanumeric(s, dn) { return Token{}, 0 } - - format := "yyyy" + q(sep) + "MM" + q(sep) + "dd" - sigFmt := cleanDateFormat(format) - return DateToken(sigFmt, m[0]), len(m[0]) + var b strings.Builder + b.Grow(10) + b.WriteString("yyyy") + b.WriteByte(sep) + b.WriteString("MM") + b.WriteByte(sep) + b.WriteString("dd") + return DateToken(b.String(), s[:dn]), dn } func tryCLFDate(s string) (Token, int) { - m := reCLFDate.FindStringSubmatch(s) - if m == nil { + day, month, year, dn, ok := scanCLFDate(s) + if !ok { return Token{}, 0 } - day, _ := strconv.Atoi(m[1]) - monthStr := m[2] - year, _ := strconv.Atoi(m[3]) - - month := parseMonthAbbr(monthStr) - if month == 0 || !validateDate(year, int(month), day) { + if !validateDate(year, month, day) { return Token{}, 0 } - - if isFollowedByAlphanumeric(s, len(m[0])) { + if isFollowedByAlphanumeric(s, dn) { return Token{}, 0 } - - sigFmt := "dd/MMM/yyyy" - return DateToken(sigFmt, m[0]), len(m[0]) + return DateToken("dd/MMM/yyyy", s[:dn]), dn } func tryLocalTime(s string) (Token, int) { - m := reLocalTime.FindStringSubmatch(s) - if m == nil { + hour, min, sec, frac, n, ok := scanTime(s, 0) + if !ok { return Token{}, 0 } - hour, _ := strconv.Atoi(m[1]) - min, _ := strconv.Atoi(m[2]) - sec, _ := strconv.Atoi(m[3]) - fracStr := m[4] - if hour > 23 || min > 59 || sec > 59 { return Token{}, 0 } - - if isFollowedByAlphanumeric(s, len(m[0])) { + if isFollowedByAlphanumeric(s, 
n) { return Token{}, 0 } - - format := "HH:mm:ss" - if fracStr != "" { - format += "." + fracFormat(fracStr) + if frac == "" { + return LocalTimeToken("HH:mm:ss", s[:n]), n } - return LocalTimeToken(format, m[0]), len(m[0]) + var b strings.Builder + b.Grow(8 + 1 + len(frac)) + b.WriteString("HH:mm:ss") + appendFrac(&b, frac) + return LocalTimeToken(b.String(), s[:n]), n } // ---- URI ---- -var reURIScheme = regexp.MustCompile(`^(https?|ftp)://`) +// scanURIScheme matches `^(https?|ftp)://` and returns the scheme without "://" +// plus the total bytes consumed. +func scanURIScheme(s string) (scheme string, n int) { + switch { + case len(s) >= 8 && s[0] == 'h' && s[1] == 't' && s[2] == 't' && s[3] == 'p' && s[4] == 's' && s[5] == ':' && s[6] == '/' && s[7] == '/': + return "https", 8 + case len(s) >= 7 && s[0] == 'h' && s[1] == 't' && s[2] == 't' && s[3] == 'p' && s[4] == ':' && s[5] == '/' && s[6] == '/': + return "http", 7 + case len(s) >= 6 && s[0] == 'f' && s[1] == 't' && s[2] == 'p' && s[3] == ':' && s[4] == '/' && s[5] == '/': + return "ftp", 6 + } + return "", 0 +} func tryURI(s string) (Token, int) { - sm := reURIScheme.FindString(s) - if sm == "" { + scheme, schemeLen := scanURIScheme(s) + if schemeLen == 0 { return Token{}, 0 } - scheme := sm[:len(sm)-3] - rest := s[len(sm):] + rest := s[schemeLen:] authEnd := strings.IndexAny(rest, "/?# \t\n\r") var authStr string @@ -423,7 +656,7 @@ func tryURI(s string) (Token, int) { } auth := parseAuthority(authStr) - totalLen := len(sm) + authEnd + totalLen := schemeLen + authEnd afterAuth := rest[authEnd:] var pathTok *Token @@ -467,9 +700,7 @@ func tryURI(s string) (Token, int) { totalLen += fEnd + 1 } - tok := URIToken(scheme, &auth, pathTok, query, fragment) - tok.Value = s[:totalLen] - return tok, totalLen + return uriTokenRaw(s[:totalLen], scheme, &auth, pathTok, query, fragment), totalLen } func parseAuthority(s string) Token { @@ -495,7 +726,7 @@ func parseAuthority(s string) Token { } hostTok := 
parseHost(host) - return AuthorityToken(&hostTok, portVal, hasPort, userInfo, hasUser) + return authorityTokenRaw(s, &hostTok, portVal, hasPort, userInfo, hasUser) } func parseHost(s string) Token { @@ -510,36 +741,105 @@ func parseHost(s string) Token { // ---- Email ---- -var reEmail = regexp.MustCompile(`^([a-zA-Z0-9._%+()"\ -]+)\s*@\s*([a-zA-Z0-9.-]+\s*)`) +// isEmailLocalChar mirrors the original regex's local-part class +// `[a-zA-Z0-9._%+()"\ -]`. +func isEmailLocalChar(b byte) bool { + if isAlphaNum(b) { + return true + } + switch b { + case '.', '%', '+', '(', ')', '"', ' ', '-': + return true + } + return false +} + +// isEmailDomainChar mirrors `[a-zA-Z0-9.-]`. +func isEmailDomainChar(b byte) bool { + if isAlphaNum(b) { + return true + } + return b == '.' || b == '-' +} func tryEmail(s string) (Token, int) { - m := reEmail.FindStringSubmatch(s) - if m == nil { + // local-part: 1+ chars from the local-part class. + i := 0 + for i < len(s) && isEmailLocalChar(s[i]) { + i++ + } + if i == 0 { + return Token{}, 0 + } + local := s[:i] + // optional whitespace before '@'. + for i < len(s) && (s[i] == ' ' || s[i] == '\t') { + i++ + } + if i >= len(s) || s[i] != '@' { return Token{}, 0 } - return EmailToken(m[1], m[2]), len(m[0]) + i++ + for i < len(s) && (s[i] == ' ' || s[i] == '\t') { + i++ + } + domStart := i + for i < len(s) && isEmailDomainChar(s[i]) { + i++ + } + if i == domStart { + return Token{}, 0 + } + // Trailing whitespace is captured into the domain group by the original + // regex (`([a-zA-Z0-9.-]+\s*)`); preserve that. 
+ domEnd := i + for i < len(s) && (s[i] == ' ' || s[i] == '\t') { + i++ + } + domain := s[domStart:i] + _ = domEnd + return emailTokenRaw(s[:i], local, domain), i } // ---- IPv4 with optional port (standalone Authority) ---- -var reIPv4WithPort = regexp.MustCompile(`^(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})(?::(\d+))?`) - -func tryIPv4Authority(s, _ string, _ int) (Token, int) { - m := reIPv4WithPort.FindStringSubmatch(s) - if m == nil { - return Token{}, 0 +// scanIPv4 matches `\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}(?::(\d+))?`. Returns +// total bytes consumed (including any ":port" suffix), port (0 if absent), +// whether a port was matched, and ok=false when s does not start with four +// dotted octets in 0-255. At most 10 port digits are consumed. +func scanIPv4(s string) (n, port int, hasPort, ok bool) { + idx := 0 + for octet := 0; octet < 4; octet++ { + val, c := readDigits(s, idx, 3) + if c == 0 || val > 255 { + return + } + idx += c + if octet < 3 { + if idx >= len(s) || s[idx] != '.' { + return + } + idx++ + } + } + totalLen := idx + if idx < len(s) && s[idx] == ':' { + val, c := readDigits(s, idx+1, 10) + if c > 0 { + port = val + hasPort = true + idx += 1 + c + } + } + return idx, port, hasPort, totalLen > 0 +} - a, _ := strconv.Atoi(m[1]) - b, _ := strconv.Atoi(m[2]) - c, _ := strconv.Atoi(m[3]) - d, _ := strconv.Atoi(m[4]) - if a > 255 || b > 255 || c > 255 || d > 255 { +func tryIPv4Authority(s, _ string, _ int) (Token, int) { + totalLen, portVal, hasPort, ok := scanIPv4(s) + if !ok { return Token{}, 0 } - totalLen := len(m[0]) - // If followed by "/" it's part of a path, not a standalone IP if totalLen < len(s) && s[totalLen] == '/' { return Token{}, 0 @@ -550,19 +850,18 @@ func tryIPv4Authority(s, _ string, _ int) (Token, int) { return Token{}, 0 } - ipStr := m[1] + "." + m[2] + "." + m[3] + "." 
+ m[4] - ipTok := Token{Type: TypeIPv4Address, Value: ipStr} - - portVal := 0 - hasPort := false - if m[5] != "" { - portVal, _ = strconv.Atoi(m[5]) - hasPort = true + // IP string is the input slice up to (but not including) any ':port' suffix. + ipEnd := totalLen + if hasPort { + // Walk back to the colon. + for ipEnd > 0 && s[ipEnd-1] != ':' { + ipEnd-- + } + ipEnd-- // drop the ':' } + ipTok := Token{Type: TypeIPv4Address, Value: s[:ipEnd]} - auth := AuthorityToken(&ipTok, portVal, hasPort, "", false) - auth.Value = m[0] - return auth, totalLen + return authorityTokenRaw(s[:totalLen], &ipTok, portVal, hasPort, "", false), totalLen } // ---- Path ---- @@ -620,25 +919,15 @@ func tryPath(s, fullMsg string, pos int) (Token, int) { segments := strings.Split(pathStr, "/") - tok := PathQueryFragmentToken(segments, query, fragment) - tok.Value = s[:totalLen] - return tok, totalLen + return pathQueryFragmentTokenRaw(s[:totalLen], segments, query, fragment), totalLen } func looksLikeIPAfterSlash(s string) bool { - m := reIPv4WithPort.FindStringSubmatch(s) - if m == nil { - return false - } - a, _ := strconv.Atoi(m[1]) - b, _ := strconv.Atoi(m[2]) - c, _ := strconv.Atoi(m[3]) - d, _ := strconv.Atoi(m[4]) - if a > 255 || b > 255 || c > 255 || d > 255 { + matchLen, _, _, ok := scanIPv4(s) + if !ok { return false } // Only treat as IP if not followed by more path segments - matchLen := len(m[0]) if matchLen < len(s) && s[matchLen] == '/' { return false } @@ -763,18 +1052,21 @@ func tryWordStartingWithDigits(s string) (Token, int) { return WordToken(text), i } -var httpMethods = map[string]bool{ - "GET": true, "POST": true, "PUT": true, "DELETE": true, - "PATCH": true, "HEAD": true, "OPTIONS": true, "CONNECT": true, "TRACE": true, +func isHTTPMethod(s string) bool { + switch s { + case "GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS", "CONNECT", "TRACE": + return true + } + return false } -var severityKeywords = map[string]bool{ - "INFO": true, "WARN": true, 
"WARNING": true, - "ERROR": true, "ERR": true, - "DEBUG": true, "TRACE": true, - "FATAL": true, "CRITICAL": true, - "ALERT": true, "EMERGENCY": true, "EMERG": true, - "NOTICE": true, +func isSeverityKeyword(s string) bool { + switch s { + case "INFO", "WARN", "WARNING", "ERROR", "ERR", "DEBUG", "TRACE", + "FATAL", "CRITICAL", "ALERT", "EMERGENCY", "EMERG", "NOTICE": + return true + } + return false } func tryWordOrKeyword(s string) (Token, int) { @@ -785,10 +1077,10 @@ func tryWordOrKeyword(s string) (Token, int) { text := s[:i] - if httpMethods[text] { + if isHTTPMethod(text) { return HTTPMethodToken(text), i } - if severityKeywords[text] { + if isSeverityKeyword(text) { return SeverityToken(text), i } @@ -806,42 +1098,36 @@ func validateDate(year, month, day int) bool { } func parseMonthAbbr(s string) time.Month { - months := map[string]time.Month{ - "Jan": time.January, "Feb": time.February, "Mar": time.March, - "Apr": time.April, "May": time.May, "Jun": time.June, - "Jul": time.July, "Aug": time.August, "Sep": time.September, - "Oct": time.October, "Nov": time.November, "Dec": time.December, - } - return months[s] -} - -func q(s string) string { - return "'" + s + "'" -} - -func fracFormat(frac string) string { - switch len(frac) { - case 1: - return "S" - case 2: - return "SS" - case 3: - return "SSS" - case 4: - return "SSSS" - case 5: - return "SSSSS" - case 6: - return "SSSSSS" - case 7: - return "SSSSSSS" - case 8: - return "SSSSSSSS" - case 9: - return "SSSSSSSSS" - default: - return strings.Repeat("S", len(frac)) - } + if len(s) != 3 { + return 0 + } + switch s { + case "Jan": + return time.January + case "Feb": + return time.February + case "Mar": + return time.March + case "Apr": + return time.April + case "May": + return time.May + case "Jun": + return time.June + case "Jul": + return time.July + case "Aug": + return time.August + case "Sep": + return time.September + case "Oct": + return time.October + case "Nov": + return time.November + case "Dec": 
+ return time.December + } + return 0 } func tzFormat(tz string) string { @@ -861,19 +1147,57 @@ func tzSigFormat(tz string) string { return "xx" } -func cleanDateFormat(format string) string { - return strings.ReplaceAll(format, "'", "") -} - // ---- Character classification ---- +// +// charClass is a 256-byte lookup table with bit flags for the char tests we +// run hottest. A single table read is faster than the per-byte comparison +// chains we previously had, and the compiler can lift the global into a +// register-friendly indexed load. -func isDigit(b byte) bool { return b >= '0' && b <= '9' } -func isAlpha(b byte) bool { return (b >= 'a' && b <= 'z') || (b >= 'A' && b <= 'Z') } -func isAlphaNum(b byte) bool { return isAlpha(b) || isDigit(b) || b == '_' } -func isWhitespace(b byte) bool { return b == ' ' || b == '\t' || b == '\n' || b == '\r' } +const ( + ccDigit byte = 1 << 0 // 0-9 + ccAlpha byte = 1 << 1 // a-z, A-Z + ccUnder byte = 1 << 2 // _ + ccDash byte = 1 << 3 // - + ccHexAlpha byte = 1 << 4 // a-f, A-F (combine with ccDigit for hex) + ccSpaceTab byte = 1 << 5 // ' ' | '\t' + ccLineEnd byte = 1 << 6 // '\n' | '\r' +) + +var charClass = func() [256]byte { + var t [256]byte + for c := byte('0'); c <= '9'; c++ { + t[c] |= ccDigit + } + for c := byte('a'); c <= 'z'; c++ { + t[c] |= ccAlpha + } + for c := byte('A'); c <= 'Z'; c++ { + t[c] |= ccAlpha + } + for c := byte('a'); c <= 'f'; c++ { + t[c] |= ccHexAlpha + } + for c := byte('A'); c <= 'F'; c++ { + t[c] |= ccHexAlpha + } + t['_'] |= ccUnder + t['-'] |= ccDash + t[' '] |= ccSpaceTab + t['\t'] |= ccSpaceTab + t['\n'] |= ccLineEnd + t['\r'] |= ccLineEnd + return t +}() + +func isDigit(b byte) bool { return charClass[b]&ccDigit != 0 } +func isAlpha(b byte) bool { return charClass[b]&ccAlpha != 0 } +func isAlphaNum(b byte) bool { return charClass[b]&(ccDigit|ccAlpha|ccUnder) != 0 } +func isWhitespace(b byte) bool { return charClass[b]&(ccSpaceTab|ccLineEnd) != 0 } +func isHexByte(b byte) bool { return 
charClass[b]&(ccDigit|ccHexAlpha) != 0 } func isWordChar(b byte) bool { - return isAlpha(b) || isDigit(b) || b == '_' || b == '-' + return charClass[b]&(ccDigit|ccAlpha|ccUnder|ccDash) != 0 } func isIPv4(s string) bool { @@ -890,20 +1214,20 @@ func isIPv4(s string) bool { return true } -var validHTTPStatusCodes = map[int]bool{ - 100: true, 101: true, - 200: true, 201: true, 202: true, 203: true, 204: true, 206: true, 207: true, - 300: true, 301: true, 302: true, 303: true, 304: true, 307: true, 308: true, - 400: true, 401: true, 402: true, 403: true, 404: true, 405: true, - 406: true, 407: true, 408: true, 409: true, 410: true, 411: true, - 413: true, 414: true, 415: true, 416: true, 422: true, 425: true, - 426: true, 429: true, 431: true, 451: true, - 500: true, 501: true, 502: true, 503: true, 504: true, 505: true, - 507: true, 508: true, 510: true, 511: true, -} - func isValidHTTPStatus(code int) bool { - return validHTTPStatusCodes[code] + switch code { + case 100, 101, + 200, 201, 202, 203, 204, 206, 207, + 300, 301, 302, 303, 304, 307, 308, + 400, 401, 402, 403, 404, 405, + 406, 407, 408, 409, 410, 411, + 413, 414, 415, 416, 422, 425, + 426, 429, 431, 451, + 500, 501, 502, 503, 504, 505, + 507, 508, 510, 511: + return true + } + return false } func isFollowedByAlphanumeric(s string, pos int) bool { diff --git a/comp/observer/impl/patterns/tokenizer_bench_test.go b/comp/observer/impl/patterns/tokenizer_bench_test.go new file mode 100644 index 000000000000..f0f9cfe2dbb9 --- /dev/null +++ b/comp/observer/impl/patterns/tokenizer_bench_test.go @@ -0,0 +1,75 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. 
+ +package patterns + +import ( + "fmt" + "testing" +) + +var benchLines = []string{ + `{"msg":"log from series 12","level":"info"}`, + `{"@timestamp":"2024-01-01T00:00:00Z","message":"evt-7","severity":"WARN","svc":"api"}`, + `{"trace_id":"deadbeef","span_id":"cafebabe","msg":"child span","ok":true}`, + `{"nested":{"user":42,"shard":3},"event":"login","ip":"10.0.0.42"}`, + `level=INFO ts=1704067200 series=99 msg="request done" duration_ms=12`, + `level=ERROR logger=com.example req=51 stack=java.lang.Exception`, + `[2024-01-15 14:30:00] INFO worker-3 task=flush completed=true`, + `<134>1 2024-01-15T14:30:00Z host app-7 - - - msg="syslog style"`, + `10.1.2.3 - - [15/Jan/2024:14:30:00 +0000] "GET /api/v3/items HTTP/1.1" 200 4321`, + `time="2024-01-15T14:30:00Z" level=debug msg="slow query" series=22 ms=450`, + `ERROR: connection reset by peer series=8 errno=104`, + `kafka: topic=logs partition=4 offset=999 key=null`, + `[pid=12345] series=11 action=gc pause_ms=3`, + `{"http":{"method":"POST","path":"/hook/3","status":201}}`, + `plain text line series=5 no json here`, +} + +func BenchmarkTokenize(b *testing.B) { + t := NewTokenizer() + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + for _, line := range benchLines { + _ = t.Tokenize(line) + } + } +} + +func BenchmarkTokenizePerShape(b *testing.B) { + for idx, line := range benchLines { + line := line + b.Run(fmt.Sprintf("shape=%d", idx), func(b *testing.B) { + t := NewTokenizer() + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = t.Tokenize(line) + } + }) + } +} + +func BenchmarkMessageSignature(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + for _, line := range benchLines { + _ = MessageSignature(line) + } + } +} + +func BenchmarkPatternClustererProcess(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + pc := NewPatternClusterer() + for _, line := range benchLines { + pc.Process(line, 1704067200) + } + } +} diff 
--git a/comp/observer/impl/patterns/tokenizer_test.go b/comp/observer/impl/patterns/tokenizer_test.go index 6d03622cca70..f9a31e07bde9 100644 --- a/comp/observer/impl/patterns/tokenizer_test.go +++ b/comp/observer/impl/patterns/tokenizer_test.go @@ -202,6 +202,11 @@ func TestTokenizeEmail(t *testing.T) { assertTokenTypes(t, "example-indeed@strange-example.com", []TokenType{TypeEmailAddress}) assertTokenTypes(t, "admin@mailserver1", []TokenType{TypeEmailAddress}) assertTokenTypes(t, "example@s.example", []TokenType{TypeEmailAddress}) + // Local-parts starting with non-letter characters that the local-part + // scanner accepts must still tokenize as a single email. + assertTokenTypes(t, "123@example.com", []TokenType{TypeEmailAddress}) + assertTokenTypes(t, "_svc@example.com", []TokenType{TypeEmailAddress}) + assertTokenTypes(t, "192.168.1.1@example.com", []TokenType{TypeEmailAddress}) } // --- HTTP Method --- diff --git a/comp/observer/impl/storage.go b/comp/observer/impl/storage.go index c661c7cbacd3..c7aade57c47b 100644 --- a/comp/observer/impl/storage.go +++ b/comp/observer/impl/storage.go @@ -182,23 +182,39 @@ func newTimeSeriesStorage() *timeSeriesStorage { } // Add records a data point for a named metric in a namespace. +// AddResult bundles the outputs of timeSeriesStorage.Add. Wrapping these +// in a named struct keeps call sites self-documenting (`res.IsNew` / +// `res.StorageKey` rather than two anonymous booleans/strings) and gives +// us a single point to extend if future callers need additional metadata +// (e.g. ID, eviction signal) without breaking every existing caller. +type AddResult struct { + // IsNew is true if this Add created a brand-new series (cardinality +1). + IsNew bool + // StorageKey is the canonical seriesKey for this point. Callers that + // need to index further state by the same key (e.g. engine.contextRefs) + // can reuse this value instead of recomputing seriesKey(...) themselves. 
+ // Empty string is returned when the point is dropped pre-key-compute + // (non-finite or sentinel values). + StorageKey string +} + +// Add inserts a (namespace, name, value, timestamp, tags) point into storage. // Invalid values are dropped at ingest with accounting and sampled logging. // Timestamps are maintained in sorted order so replay and live ingestion remain // correct even when data arrives out of order. -// Returns true if this point created a new series (cardinality +1), false otherwise. -func (s *timeSeriesStorage) Add(namespace, name string, value float64, timestamp int64, tags []string) bool { +func (s *timeSeriesStorage) Add(namespace, name string, value float64, timestamp int64, tags []string) AddResult { s.mu.Lock() defer s.mu.Unlock() if math.IsInf(value, 0) || math.IsNaN(value) { s.recordDroppedValue("non_finite", namespace, name, value, timestamp, tags) - return false + return AddResult{} } // Guard against known finite sentinel values (MaxFloat64 used as "unlimited") // that overflow downstream aggregation math when summed. if value == math.MaxFloat64 || value == -math.MaxFloat64 { s.recordDroppedValue("extreme", namespace, name, value, timestamp, tags) - return false + return AddResult{} } key := seriesKey(namespace, name, tags) @@ -218,7 +234,7 @@ func (s *timeSeriesStorage) Add(namespace, name string, value float64, timestamp s.seriesIDStats = append(s.seriesIDStats, stats) s.seriesGen++ } - isNew := !exists + res := AddResult{IsNew: !exists, StorageKey: key} stats.writeGeneration++ // Bucket by second. 
@@ -239,7 +255,7 @@ func (s *timeSeriesStorage) Add(namespace, name string, value float64, timestamp if value > stats.maxes[idx] { stats.maxes[idx] = value } - return isNew + return res } stats.timestamps = insertInt64(stats.timestamps, idx, bucket) @@ -247,7 +263,7 @@ func (s *timeSeriesStorage) Add(namespace, name string, value float64, timestamp stats.counts = insertInt64(stats.counts, idx, 1) stats.mins = insertFloat64(stats.mins, idx, value) stats.maxes = insertFloat64(stats.maxes, idx, value) - return isNew + return res } // insertInt64 inserts v at position idx in s, maintaining order. @@ -448,11 +464,36 @@ func (s *timeSeriesStorage) MaxTimestamp() int64 { } // seriesKey creates a unique key for a series. +// +// The result has the form "namespace|name|tag1,tag2,...". This function is on +// the hot path for log ingestion and detector loops, so we build the key with +// a single growth via strings.Builder to avoid the chained `+` and intermediate +// joinTags allocations that the naive form produces. func seriesKey(namespace, name string, tags []string) string { if len(tags) > 1 && !tagsSorted(tags) { tags = canonicalizeTags(tags) } - return namespace + "|" + name + "|" + joinTags(tags) + // Pre-compute exact length: namespace + '|' + name + '|' + joined(tags). + n := len(namespace) + 1 + len(name) + 1 + for i, t := range tags { + if i > 0 { + n++ // ',' separator + } + n += len(t) + } + var b strings.Builder + b.Grow(n) + b.WriteString(namespace) + b.WriteByte('|') + b.WriteString(name) + b.WriteByte('|') + for i, t := range tags { + if i > 0 { + b.WriteByte(',') + } + b.WriteString(t) + } + return b.String() } // parseSeriesKey parses a series key back into its parts. @@ -679,14 +720,70 @@ func (s *timeSeriesStorage) DataTimestamps() []int64 { return timestamps } -// SeriesGeneration returns a counter that increments whenever a new series key -// is created. Callers can use this to safely cache ListSeries results. 
+// SeriesGeneration returns a counter that increments whenever the series +// catalog changes — either when a new series key is created or when an +// existing key is removed via RemoveSeriesByKeys. Callers can use this to +// safely cache ListSeries results. func (s *timeSeriesStorage) SeriesGeneration() uint64 { s.mu.RLock() defer s.mu.RUnlock() return s.seriesGen } +// RemoveSeriesByKeys deletes the listed internal series keys (as produced by +// seriesKey). The compact numeric SeriesRef IDs assigned to each removed +// series are retired but NEVER reused: the slot in seriesIDStats is set to +// nil so any stale SeriesRef resolves to nil via resolveByID, and the slot +// in seriesIDKeys is set to "" — the slice length is preserved so subsequent +// index lookups remain bounds-safe, but the original key string is no longer +// referenced and can be garbage-collected. GetSeriesByNumericID's nil-stats +// guard handles the empty-string lookup safely (s.series[""] is always nil). +// Returns the SeriesRefs that were actually freed (one per successful removal, +// in input order; unknown keys are silently skipped). seriesGen is bumped iff +// at least one series was removed so cached ListSeries results are invalidated. +// +// Callers use the returned refs to fan out per-series teardown to detector +// state that's keyed by SeriesRef (BOCPD, ScanMW, ScanWelch posterior maps, +// seriesDetectorAdapter.lastVisibleCount, etc.). Without that fan-out, those +// maps grow with the cumulative number of series ever observed even though +// storage shrinks — defeating the LRU caps put on the upstream extractors. +// +// This is the storage-side counterpart to engine.removeContextRefsForEvictedKeys: +// the engine's contextRefs index keeps track of which storage key was created +// for which extractor context key, so when an extractor evicts a context the +// engine can pass the corresponding storage keys here to free their tags + +// columnar arrays. 
Without this path, evicted patterns leak indefinitely. +func (s *timeSeriesStorage) RemoveSeriesByKeys(keys []string) []observer.SeriesRef { + if len(keys) == 0 { + return nil + } + s.mu.Lock() + defer s.mu.Unlock() + var removed []observer.SeriesRef + for _, key := range keys { + if _, exists := s.series[key]; !exists { + continue + } + delete(s.series, key) + if id, ok := s.seriesIDs[key]; ok { + if int(id) < len(s.seriesIDStats) { + s.seriesIDStats[id] = nil + } + if int(id) < len(s.seriesIDKeys) { + // Free the key string so it can be GC'd; keep the slot so + // seriesIDKeys remains addressable for stale-ref reads. + s.seriesIDKeys[id] = "" + } + delete(s.seriesIDs, key) + removed = append(removed, id) + } + } + if len(removed) > 0 { + s.seriesGen++ + } + return removed +} + // CompactSeriesID translates a full series key to its compact numeric ID string. // The full key format is "namespace|name:agg|tags" where the storage key is // "namespace|name|tags" (without the agg suffix). This method strips the agg @@ -729,7 +826,12 @@ func (s *timeSeriesStorage) ListSeries(filter observer.SeriesFilter) []observer. s.mu.RLock() defer s.mu.RUnlock() - var result []observer.SeriesMeta + // Preallocate to len(s.series): an upper bound under the lock that lets + // us avoid repeated growslice in the common case where the filter matches + // most series. Detectors and the adapter call this on every advance, so + // even after the cache-by-gen optimisations the worst-case cost matters + // when seriesGen does churn (e.g. cardinality blow-ups in extractors). 
+ result := make([]observer.SeriesMeta, 0, len(s.series)) listSeriesLoop: for key, stats := range s.series { if filter.Namespace != "" { diff --git a/comp/observer/impl/storage_test.go b/comp/observer/impl/storage_test.go index cb909309dedd..1170c5cc4843 100644 --- a/comp/observer/impl/storage_test.go +++ b/comp/observer/impl/storage_test.go @@ -577,3 +577,109 @@ func TestTimeSeriesStorage_ListSeries_ExcludeNamespaces(t *testing.T) { require.Len(t, onlyTel, 1) assert.Equal(t, observer.TelemetryNamespace, onlyTel[0].Namespace) } + +func TestTimeSeriesStorage_RemoveSeriesByKeys(t *testing.T) { + s := newTimeSeriesStorage() + + s.Add("ns", "a", 1.0, 1000, []string{"k:1"}) + s.Add("ns", "b", 2.0, 1000, []string{"k:2"}) + s.Add("ns", "c", 3.0, 1000, []string{"k:3"}) + require.Equal(t, 3, s.TotalSeriesCount("")) + genBefore := s.SeriesGeneration() + + keyB := seriesKey("ns", "b", []string{"k:2"}) + keyC := seriesKey("ns", "c", []string{"k:3"}) + refB := s.seriesIDs[keyB] + refC := s.seriesIDs[keyC] + + removed := s.RemoveSeriesByKeys([]string{keyB, keyC, "nonexistent"}) + require.Len(t, removed, 2, "unknown keys are silently ignored") + require.ElementsMatch(t, []observer.SeriesRef{refB, refC}, removed, "freed refs are returned for fan-out to detectors") + require.Equal(t, 1, s.TotalSeriesCount(""), "only series 'a' should remain") + require.Greater(t, s.SeriesGeneration(), genBefore, "seriesGen bumps on removal") + + require.Nil(t, s.GetSeriesMeta(refB), "removed ref resolves to nil") + require.Nil(t, s.GetSeriesMeta(refC), "removed ref resolves to nil") + + refA := s.seriesIDs[seriesKey("ns", "a", []string{"k:1"})] + require.NotNil(t, s.GetSeriesMeta(refA), "surviving series still resolvable") + + // Evicted slots in seriesIDKeys are cleared to "" so the original key + // string can be GC'd. Slot is kept in place (slice length unchanged) so + // subsequent stale-ref index lookups stay bounds-safe. 
+ require.Equal(t, "", s.seriesIDKeys[refB], "evicted seriesIDKeys slot must be empty so the key string can be GC'd") + require.Equal(t, "", s.seriesIDKeys[refC], "evicted seriesIDKeys slot must be empty so the key string can be GC'd") + require.NotEqual(t, "", s.seriesIDKeys[refA], "surviving seriesIDKeys slot must be intact") + + // A subsequent Add for the same key creates a fresh series with a new ref. + s.Add("ns", "b", 99.0, 1100, []string{"k:2"}) + require.Equal(t, 2, s.TotalSeriesCount(""), "re-add re-creates the series") + newRefB := s.seriesIDs[keyB] + require.NotEqual(t, refB, newRefB, "new ref minted; old ref id is retired") + require.Nil(t, s.GetSeriesMeta(refB), "old ref still resolves to nil after re-add") +} + +func TestTimeSeriesStorage_RemoveSeriesByKeysEmptyOrUnknown(t *testing.T) { + s := newTimeSeriesStorage() + s.Add("ns", "a", 1.0, 1000, nil) + genBefore := s.SeriesGeneration() + + require.Empty(t, s.RemoveSeriesByKeys(nil)) + require.Empty(t, s.RemoveSeriesByKeys([]string{})) + require.Empty(t, s.RemoveSeriesByKeys([]string{"unknown1", "unknown2"})) + require.Equal(t, genBefore, s.SeriesGeneration(), "no removal → no gen bump") +} +func TestTimeSeriesStorage_AddReturnsCanonicalKey(t *testing.T) { + // Add returns the same string seriesKey would compute from the same + // inputs, including under tag canonicalization. Callers (e.g. the engine + // populating contextRefs) rely on this so they can skip a second + // seriesKey call. If this contract drifts, the optimisation silently + // produces wrong-keyed entries. 
+ s := newTimeSeriesStorage() + + cases := []struct { + name string + namespace string + metric string + tags []string + }{ + {"no_tags", "ns", "m1", nil}, + {"single_tag", "ns", "m2", []string{"env:prod"}}, + {"sorted_tags", "ns", "m3", []string{"a:1", "b:2", "c:3"}}, + {"unsorted_tags", "ns", "m4", []string{"c:3", "a:1", "b:2"}}, + {"empty_namespace", "", "m5", []string{"env:prod"}}, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + res := s.Add(tc.namespace, tc.metric, 1.0, 1000, tc.tags) + assert.True(t, res.IsNew, "first write should report IsNew=true") + wantKey := seriesKey(tc.namespace, tc.metric, tc.tags) + assert.Equal(t, wantKey, res.StorageKey, "Add must return seriesKey-equivalent storage key") + + // Second write of the same series returns the same key and IsNew=false. + res2 := s.Add(tc.namespace, tc.metric, 2.0, 1001, tc.tags) + assert.False(t, res2.IsNew, "second write should report IsNew=false") + assert.Equal(t, wantKey, res2.StorageKey, "Add must return the same key on subsequent writes") + }) + } +} + +func TestTimeSeriesStorage_AddDroppedReturnsEmptyKey(t *testing.T) { + // Pre-key-compute drops (non-finite, sentinel values) return empty key. + // Callers must check StorageKey != "" before reusing it for downstream + // state (e.g. contextRefs). 
+ s := newTimeSeriesStorage() + + res := s.Add("ns", "m", math.NaN(), 1000, nil) + assert.False(t, res.IsNew) + assert.Empty(t, res.StorageKey, "NaN drop must return empty key") + + res = s.Add("ns", "m", math.Inf(1), 1000, nil) + assert.False(t, res.IsNew) + assert.Empty(t, res.StorageKey, "+Inf drop must return empty key") + + res = s.Add("ns", "m", math.MaxFloat64, 1000, nil) + assert.False(t, res.IsNew) + assert.Empty(t, res.StorageKey, "MaxFloat64 sentinel drop must return empty key") +} diff --git a/comp/observer/impl/testbench.go b/comp/observer/impl/testbench.go index 3ea09ebeb394..5b38170e36e9 100644 --- a/comp/observer/impl/testbench.go +++ b/comp/observer/impl/testbench.go @@ -514,7 +514,7 @@ func (tb *TestBench) loadParquetDir(dir string) error { byTimestampCounter[m.Timestamp]++ - if storage.Add("parquet", metricName, m.Value, m.Timestamp, m.Tags) { + if res := storage.Add("parquet", metricName, m.Value, m.Timestamp, m.Tags); res.IsNew { byTimestampCardinality[m.Timestamp]++ } } diff --git a/test/regression/cases/observer_logs_anomaly_stress/datadog-agent/datadog.yaml b/test/regression/cases/observer_logs_anomaly_stress/datadog-agent/datadog.yaml new file mode 100644 index 000000000000..7a8bb3c1143d --- /dev/null +++ b/test/regression/cases/observer_logs_anomaly_stress/datadog-agent/datadog.yaml @@ -0,0 +1,35 @@ +auth_token_file_path: /tmp/agent-auth-token + +# Disable cloud detection. This stops the Agent from poking around the +# execution environment & network. This is particularly important if the target +# has network access. +cloud_provider_metadata: [] + +dd_url: http://127.0.0.1:9091 + +# Logs management feature is OFF on purpose. The point of this experiment +# is that the Observer (in the comparison image) ingests container logs +# even when the user has not enabled logs collection. The baseline image +# therefore does no log work at all -- the comparison's overhead is the +# observer ingestion + live anomaly detection cost. 
+logs_enabled: false + +logs_config: + # Surface container stdout so the comparison image's observer can pick + # them up. With logs_enabled: false this is a no-op on the baseline. + container_collect_all: true + # Kept so anything that does route through the existing logs forwarder + # has a safe blackhole to send to. + logs_dd_url: 127.0.0.1:9092 + logs_no_ssl: true + force_use_http: true + +process_config.process_dd_url: http://localhost:9093 + +telemetry.enabled: true +telemetry.checks: '*' + +# NOTE: Observer flags are intentionally NOT set here. They live in +# experiment.yaml `environment` so that the baseline agent (which does not +# recognize the keys) does not break. See the wiki page on running SMP +# experiments for the observer. diff --git a/test/regression/cases/observer_logs_anomaly_stress/experiment.yaml b/test/regression/cases/observer_logs_anomaly_stress/experiment.yaml new file mode 100644 index 000000000000..fd506d9e4445 --- /dev/null +++ b/test/regression/cases/observer_logs_anomaly_stress/experiment.yaml @@ -0,0 +1,67 @@ +# Container-log workload for the always-on Observer (anomaly detection) +# experiment. Lading spawns busybox containers that emit dynamic +# apache-common log lines on stdout; the Datadog Agent collects them as +# container logs via the docker socket. +# +# Logs management is OFF in datadog.yaml (logs_enabled: false). Only the +# comparison image's Observer ingests logs (always-on path). This isolates +# observer-on-logs overhead vs a baseline agent that does no log work. +# +# DD_OBSERVER_INGEST_METRICS_ENABLED=false disables the metrics ingest path +# in the Observer so this case measures *just* the log codepath. +optimization_goal: cpu +erratic: false + +target: + name: datadog-agent + cpu_allotment: 4 + # Generous allotment so we don't OOM during ramp-up. The actual quality + # gate is the memory_usage check below. 
+ memory_allotment: 8 GiB + + environment: + DD_API_KEY: a0000001 + DD_HOSTNAME: smp-regression + # Observer flags live here, NOT in datadog.yaml -- the baseline agent + # would crash on unknown keys. Unknown env vars are simply ignored. + DD_OBSERVER_ANALYSIS_ENABLED: "true" + DD_OBSERVER_INGEST_METRICS_ENABLED: "false" + + profiling_environment: + DD_INTERNAL_PROFILING_BLOCK_PROFILE_RATE: 10000 + DD_INTERNAL_PROFILING_CPU_DURATION: 1m + DD_INTERNAL_PROFILING_DELTA_PROFILES: true + DD_INTERNAL_PROFILING_ENABLED: true + DD_INTERNAL_PROFILING_ENABLE_GOROUTINE_STACKTRACES: true + DD_INTERNAL_PROFILING_MUTEX_PROFILE_FRACTION: 10 + DD_INTERNAL_PROFILING_PERIOD: 1m + DD_INTERNAL_PROFILING_UNIX_SOCKET: /smp-host/apm.socket + DD_PROFILING_EXECUTION_TRACE_ENABLED: true + DD_PROFILING_EXECUTION_TRACE_PERIOD: 1m + DD_PROFILING_WAIT_PROFILE: true + DD_APM_INTERNAL_PROFILING_ENABLED: true + + DD_INTERNAL_PROFILING_EXTRA_TAGS: experiment:observer_logs_anomaly_stress + +checks: + - name: memory_usage + description: "Memory usage. Baseline (~220 MiB) + 150 MiB observer budget." + bounds: + series: total_pss_bytes + upper_bound: "370 MiB" + + - name: cpu_usage + description: "CPU usage. Allows +50% overhead over baseline for observer." + bounds: + series: avg(total_cpu_usage_millicores) + upper_bound: 500 + + - name: intake_connections + description: "Connections established to intake APIs. This puts a bound on total connections per Agent instance." 
+ bounds: + series: "connection.current" + upper_bound: 6 + +report_links: + - text: "bounds checks dashboard" + link: "https://app.datadoghq.com/dashboard/vz3-jd5-bdi?fromUser=true&refresh_mode=paused&tpl_var_experiment%5B0%5D={{ experiment }}&tpl_var_job_id%5B0%5D={{ job_id }}&view=spans&from_ts={{ start_time_ms }}&to_ts={{ end_time_ms }}&live=false" diff --git a/test/regression/cases/observer_logs_anomaly_stress/lading/lading.yaml b/test/regression/cases/observer_logs_anomaly_stress/lading/lading.yaml new file mode 100644 index 000000000000..70767caa9ffd --- /dev/null +++ b/test/regression/cases/observer_logs_anomaly_stress/lading/lading.yaml @@ -0,0 +1,69 @@ +# Container-log workload for the always-on Observer (anomaly detection) +# experiment. Spins up a fixed pool of long-running busybox containers that emit +# apache-common-style log lines on stdout. The Datadog Agent collects these +# via the docker socket (`logs_config.container_collect_all`). +# +# Each container loops in pure busybox `ash` so the only image we have to +# pull is the upstream public busybox. + +generator: + - container: + repository: public.ecr.aws/docker/library/busybox + tag: latest + args: + - sh + - -c + # Emits apache-common-style log lines with varying IP, method, path, + # status, and response size. Sleep 5ms keeps each container at a + # human-realistic rate (~200 lines/sec/container) without burning + # the host. Status is biased 90/5/3/2 on 200/404/500/302 so the + # observer's anomaly detector has a stable distribution it can + # later see deviations from.
+ - | + paths="/api/v1/users /api/v1/orders /api/v2/products /api/v2/cart /healthz /metrics /login /logout /search /api/v1/checkout" + methods="GET GET GET GET GET POST POST PUT DELETE" + while :; do + ip="$((RANDOM%256)).$((RANDOM%256)).$((RANDOM%256)).$((RANDOM%256))" + set -- $paths; n=$#; idx=$((RANDOM%n+1)); shift $((idx-1)); path=$1 + set -- $methods; n=$#; idx=$((RANDOM%n+1)); shift $((idx-1)); method=$1 + r=$((RANDOM%100)) + if [ $r -lt 90 ]; then status=200 + elif [ $r -lt 95 ]; then status=404 + elif [ $r -lt 98 ]; then status=500 + else status=302 + fi + bytes=$((RANDOM*8 + RANDOM%512)) + ts=$(date -u "+%d/%b/%Y:%H:%M:%S +0000") + printf '%s - - [%s] "%s %s HTTP/1.1" %s %s\n' "$ip" "$ts" "$method" "$path" "$status" "$bytes" + usleep 5000 + done + env: + - DD_ENV=smp_observer + - DD_SERVICE=container_logs_demo + - DD_VERSION=1.0.0 + labels: + com.datadoghq.tags.env: smp_observer + com.datadoghq.tags.service: container_logs_demo + com.datadoghq.tags.version: "1.0.0" + # No network needed -- log collection is via docker stdout / the + # docker socket on the host, not via TCP from the workload. + network_disabled: true + number_of_containers: 50 + # Omitting max_lifetime_seconds -> default NonZeroU32::MAX, no + # recycling. We want a stable log stream for the duration of the + # experiment so observer's AD has clean baselines to learn from. + +blackhole: + - http: + binding_addr: "127.0.0.1:9091" + - http: + binding_addr: "127.0.0.1:9092" + response_delay_millis: 75 + - http: + binding_addr: "127.0.0.1:9093" + +target_metrics: + - prometheus: # core agent telemetry + uri: "http://127.0.0.1:5000/telemetry" + tags: + sub_agent: "core"