Skip to content

Commit 1213d9c

Browse files
CelianR and cursoragent committed
fix(reporter): restore telemetry semantics, align allium spec with stateless reporter model
- emittedCounter now counts only CorrelationDetected events (not episode events), matching the pre-refactor "new pattern first-seen" semantics
- ongoingCounter now fires only when at least one active pattern was not newly detected this cycle, matching the pre-refactor "ongoing but not new" semantics
- ongoing debug log now excludes patterns that were newly detected this cycle
- EventReporter is fully stateless (no seen_patterns, no retry queue); all events are at-most-once — a forwarder failure is logged and the event is discarded
- reporter.allium updated: removed seen_patterns/previously_active/retry_queue, replaced PublishNewCorrelation+DropExpiredSeenPatterns with PublishCorrelatorEvents, updated ObserverReportBoundary surface to use correlator_events instead of correlation_history, added CorrelatorEvent external entity and CorrelatorEventKind enum

Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 0c728f9 commit 1213d9c

3 files changed

Lines changed: 168 additions & 147 deletions

File tree

comp/anomalydetection/reporter/impl/reporter.go

Lines changed: 34 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -103,18 +103,31 @@ func (r *stdoutReporter) Name() string { return "stdout_reporter" }
103103
func (r *stdoutReporter) Report(output reporterdef.ReportOutput) bool {
104104
emitted := false
105105

106+
// Build the set of newly-detected patterns from this cycle so they can be
107+
// excluded from the "ongoing" telemetry path below.
108+
newlyDetected := make(map[string]struct{}, len(output.CorrelatorEvents))
109+
106110
// Log all correlator events at info level and drive the emitted counter.
107-
if r.stdoutEnabled {
108-
for _, ce := range output.CorrelatorEvents {
109-
switch ce.Kind {
110-
case observerdef.CorrelatorEventEpisodeStarted:
111+
// emittedCounter counts only CorrelationDetected events (new pattern first-seen
112+
// or recurrence); episode events are not counted.
113+
for _, ce := range output.CorrelatorEvents {
114+
switch ce.Kind {
115+
case observerdef.CorrelatorEventEpisodeStarted:
116+
if r.stdoutEnabled {
111117
r.logger.Infof("[observer] scorer episode started: scorer=%s pattern=%s t=%d",
112118
ce.CorrelatorName, ce.Correlation.Pattern, ce.Timestamp)
113-
case observerdef.CorrelatorEventEpisodeEnded:
119+
}
120+
case observerdef.CorrelatorEventEpisodeEnded:
121+
if r.stdoutEnabled {
114122
r.logger.Infof("[observer] scorer episode ended: scorer=%s pattern=%s t=%d duration=%ds",
115123
ce.CorrelatorName, ce.Correlation.Pattern, ce.Timestamp,
116124
ce.Correlation.LastUpdated-ce.Correlation.FirstSeen)
117-
case observerdef.CorrelatorEventCorrelationDetected:
125+
}
126+
case observerdef.CorrelatorEventCorrelationDetected:
127+
newlyDetected[ce.Correlation.Pattern] = struct{}{}
128+
r.emittedCounter.Add(1)
129+
emitted = true
130+
if r.stdoutEnabled {
118131
r.logger.Infof("[observer] anomaly detection report: pattern=%s title=%q members=%d",
119132
ce.Correlation.Pattern, ce.Correlation.Title, len(ce.Correlation.Members))
120133
if r.stdoutVerbose {
@@ -127,23 +140,24 @@ func (r *stdoutReporter) Report(output reporterdef.ReportOutput) bool {
127140
}
128141
}
129142
}
130-
for range output.CorrelatorEvents {
131-
r.emittedCounter.Add(1)
132-
emitted = true
133-
}
134-
135-
// Ongoing counter: at least one correlation is currently active.
136-
if len(output.ActiveCorrelations) > 0 {
137-
r.ongoingCounter.Add(1)
138-
}
139143

140-
// Debug log for ongoing correlations.
141-
if r.stdoutEnabled {
142-
for _, ac := range output.ActiveCorrelations {
143-
r.logger.Debugf("[observer] ongoing anomaly correlation: pattern=%s members=%d",
144-
ac.Pattern, len(ac.Members))
144+
// Ongoing counter: fires when at least one active correlation was already
145+
// seen in a prior cycle (i.e. not newly detected this cycle). This mirrors
146+
// the pre-refactor semantics where ongoingCounter incremented once per
147+
// advance that had any pattern not in the freshly-emitted set.
148+
hasOngoing := false
149+
for _, ac := range output.ActiveCorrelations {
150+
if _, isNew := newlyDetected[ac.Pattern]; !isNew {
151+
if r.stdoutEnabled {
152+
r.logger.Debugf("[observer] ongoing anomaly correlation: pattern=%s members=%d",
153+
ac.Pattern, len(ac.Members))
154+
}
155+
hasOngoing = true
145156
}
146157
}
158+
if hasOngoing {
159+
r.ongoingCounter.Add(1)
160+
}
147161

148162
// Debug log for raw new anomalies detected this cycle.
149163
if r.stdoutEnabled {

comp/anomalydetection/reporter/impl/reporter_event.go

Lines changed: 9 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -12,9 +12,13 @@ import (
1212
)
1313

1414
// EventReporter sends Datadog events for correlator lifecycle events.
15-
// It is a stateless forwarder: all deduplication and recurrence logic lives
16-
// inside each correlator via the correlationEmitter helper. Reporters simply
17-
// iterate output.CorrelatorEvents and dispatch to the appropriate sender method.
15+
// Deduplication and recurrence logic live inside each correlator via the
16+
// correlationEmitter helper. The reporter forwards each CorrelatorEvent to the
17+
// appropriate sender method.
18+
//
19+
// All events are at-most-once: a transient forwarder failure logs an error and
20+
// discards the event. The correlator has already drained the event from its
21+
// pending queue so there is no replay mechanism.
1822
//
1923
// It implements reporterdef.StorageConsumer so the observer can inject engine
2024
// storage post-construction for windowed log-rate annotations in change messages.
@@ -44,8 +48,8 @@ func (r *EventReporter) SetStorage(storage observerdef.StorageReader) {
4448
// - EpisodeStarted / EpisodeEnded → sendEpisodeEvent (scorer severity transitions)
4549
// - CorrelationDetected → send (cluster/pattern first-seen, emitter-deduplicated)
4650
//
47-
// Events are at-most-once: a transient forwarder error drops the event (the
48-
// correlator already drained it). This matches the existing scorer episode model.
51+
// All events are at-most-once: a transient forwarder error logs the failure and
52+
// discards the event. The correlator has already drained it from its pending queue.
4953
func (r *EventReporter) Report(output reporterdef.ReportOutput) bool {
5054
emitted := false
5155
for _, ce := range output.CorrelatorEvents {

0 commit comments

Comments
 (0)