Skip to content

Commit d4fdd92

Browse files
feat(alertrouter): B.3b — system-alert-router implementation (15/15 ACs, 100%) (#424)
Continues Slice B.3 (orchestration plumbing). The alert router is the bridge between OpenWatch's event bus (B.3a) and external notification channels — Slack, email, webhook, PagerDuty. It subscribes to bus events at boot, translates each into a typed Alert, applies a dedup gate per (alert_type, host_id, rule_id) tuple, and dispatches matching alerts to registered Channels. Concrete channel implementations live in subpackages so the core router has no external SDK dependencies. This PR ships the interface + a test fake; Slack, email, webhook implementations land in follow-ups. Spec New: app/specs/system/alert-router.spec.yaml (status: approved). 15 ACs across 9 constraints. internal/alertrouter package doc.go Architectural choices: bus subscription on Start, closed AlertType + Severity enums, in-memory dedup with TTL, channel registration with tag-filter routing, per- channel goroutine with failure isolation, Stop drains with 10s timeout. types.go AlertType closed enum (HostUnreachable, HostRecovered, DriftMajor, DriftMinor, DriftImprovement). Severity closed enum (Critical, High, Medium, Low, Info) + SeverityOrder rank map. Alert struct with Type, Severity, HostID, RuleID, Tags. Channel interface (Name + Send). ChannelRegistration with Tags filter; empty Tags = wildcard. ValidateDedupTTL enforces [60s, 24h] range with typed ErrDedupTTLOutOfRange. dedup.go DedupGate keyed by Alert.DedupKey(); in-memory map with opportunistic reap on every ShouldSkip call to keep size bounded under churn. Injectable now() for testing. router.go Router with Start (subscribe to HeartbeatPulse + DriftDetected) / Stop (unsubscribe + drain in-flight Channel.Send with 10s timeout). Per-channel goroutine dispatch; one channel's error or panic does NOT block delivery to other channels. Event translation: HeartbeatPulse{Reachable=false} → host_unreachable (High); recovery → host_recovered (Info); DriftDetected{major} → drift_major (High); minor → drift_minor (Medium); improvement → Info. metrics.go ReceivedCount + RoutedCount + DedupedCount + ChannelFailureCount with JSON-friendly Snapshot. Tests (15/15 ACs, all under -race) types_test.go AC-01 enum closure (5 alert types). AC-02 severity enum + SeverityOrder ranking. AC-15 ValidateDedupTTL range check (boundary cases + typed error sentinel). router_test.go AC-03/04/05 event-to-alert translation. AC-06 dedup skip within TTL (Channel.Send NOT called for the skipped alert). AC-07 dedup pass after TTL (fake clock on gate). AC-08 tag-filter rejects non-match. AC-09 wildcard channel (empty Tags) receives every alert. AC-10 channel error doesn't block other channels (per-channel + aggregate failure counters). AC-11 Start subscribes to BOTH event kinds. AC-12 Stop drains slow sends + post-Stop publishes ignored. AC-14 all four metric counters increment under compound scenarios. source_test.go AC-13 internal/alertrouter (core, not subpackages) imports no external notification SDKs (slack-go, sendgrid, mailgun, twilio, PagerDuty SDK, opsgenie, gomail, etc.). AST-based import scan. Verification go vet ./... clean golangci-lint clean govulncheck clean go test -race -count=1 PASS (1.10s) specter parse PASS (system-alert-router@1.0.0) specter check PASS (32 specs) specter coverage system-alert-router 15/15 (100%) Spec: app/specs/system/alert-router.spec.yaml
1 parent b095781 commit d4fdd92

9 files changed

Lines changed: 1584 additions & 0 deletions

File tree

app/internal/alertrouter/dedup.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package alertrouter
2+
3+
import (
4+
"sync"
5+
"time"
6+
)
7+
8+
// DedupGate filters out repeat alerts with the same dedup key seen
9+
// within the configured TTL. Per Spec C-03 / C-04 / AC-06 / AC-07.
10+
//
11+
// Implementation: in-memory map keyed by Alert.DedupKey(). Each entry
12+
// holds the last-seen timestamp; ShouldSkip compares now() against
13+
// that timestamp + TTL. Stale entries are reaped opportunistically on
14+
// every ShouldSkip call to keep the map from growing unbounded under
15+
// churn.
16+
type DedupGate struct {
17+
ttl time.Duration
18+
now func() time.Time
19+
mu sync.Mutex
20+
seen map[string]time.Time
21+
}
22+
23+
// NewDedupGate constructs a DedupGate with the given TTL. Caller is
24+
// expected to have validated the TTL via ValidateDedupTTL.
25+
func NewDedupGate(ttl time.Duration) *DedupGate {
26+
return &DedupGate{
27+
ttl: ttl,
28+
now: time.Now,
29+
seen: make(map[string]time.Time),
30+
}
31+
}
32+
33+
// ShouldSkip reports whether the alert should be skipped because it
34+
// matches a recently-seen dedup key. As a side effect, on a non-skip
35+
// outcome it records the alert's timestamp so the next call within
36+
// TTL will skip.
37+
//
38+
// Spec AC-06: a repeat within TTL is skipped.
39+
// Spec AC-07: a repeat after TTL passes through.
40+
func (g *DedupGate) ShouldSkip(alert Alert) bool {
41+
key := alert.DedupKey()
42+
now := g.now()
43+
44+
g.mu.Lock()
45+
defer g.mu.Unlock()
46+
47+
// Opportunistic reap: drop entries past TTL. Cheap because the map
48+
// is small (one entry per active alert tuple). Avoids unbounded
49+
// growth under high churn.
50+
for k, ts := range g.seen {
51+
if now.Sub(ts) > g.ttl {
52+
delete(g.seen, k)
53+
}
54+
}
55+
56+
if last, ok := g.seen[key]; ok {
57+
if now.Sub(last) <= g.ttl {
58+
return true
59+
}
60+
}
61+
g.seen[key] = now
62+
return false
63+
}
64+
65+
// Size returns the number of tracked dedup keys. Test helper.
66+
func (g *DedupGate) Size() int {
67+
g.mu.Lock()
68+
defer g.mu.Unlock()
69+
return len(g.seen)
70+
}

app/internal/alertrouter/doc.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// Package alertrouter is the bridge between OpenWatch's in-process
2+
// event bus (internal/eventbus) and external notification channels
3+
// (Slack, email, webhook, PagerDuty).
4+
//
5+
// Spec: specs/system/alert-router.spec.yaml (status: approved)
6+
//
7+
// Architectural choices:
8+
//
9+
// - Subscribes to EventKindHeartbeatPulse + EventKindDriftDetected on
10+
// the bus at Start. A single dispatch goroutine reads from the
11+
// subscription channels and translates each event to a typed Alert
12+
// before fan-out. Spec C-08 / AC-11.
13+
//
14+
// - Closed enums for AlertType and Severity. Each AlertType has a
15+
// default Severity that the router applies before channel routing.
16+
// Spec C-01 / C-02.
17+
//
18+
// - In-memory dedup gate keyed by (alert_type, host_id, rule_id) with
19+
// configurable TTL (default 60 min, range 60s..24h). Dedup state
20+
// does NOT survive process restart — for v1, single-instance
21+
// dedup is correct; multi-instance deploys can slot a Postgres
22+
// store behind the same interface. Spec C-03 / C-04.
23+
//
24+
// - Channel registration with tag-filter routing. Each channel
25+
// declares a Tags map of required key/value pairs; an empty Tags
26+
// map is a wildcard (channel receives every alert). Alert.Tags
27+
// always carries at minimum severity + alert_type + host_id.
28+
// Spec C-05 / C-06.
29+
//
30+
// - Per-channel goroutine for Channel.Send. A Send returning an error
31+
// increments that channel's FailureCount but does NOT halt delivery
32+
// to other channels for the same alert. The router never panics on
33+
// a channel-side error. Spec C-07 / AC-10.
34+
//
35+
// - Router.Stop unsubscribes from the bus AND waits up to 10s for
36+
// in-flight Channel.Send calls to complete (drain timeout). After
37+
// Stop, new events arriving on the bus subscription are ignored.
38+
// Spec C-08 / AC-12.
39+
//
40+
// Distinction from internal/audit: the audit log is the persistent
41+
// forensic record of who/what/when; the alert router is real-time
42+
// delivery to notification surfaces. Most alert-worthy events emit
43+
// to both — e.g., a major drift writes compliance.drift.detected to
44+
// the audit log AND fires through the alert router to Slack.
45+
//
46+
// Concrete Channel implementations (Slack, email, webhook) live in
47+
// subpackages so the core router has no external SDK dependencies.
48+
// This PR ships the interface + a fake (for tests) + stdout (for
49+
// dev/test scenarios). Spec C-09 / AC-13.
50+
package alertrouter
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package alertrouter
2+
3+
import "sync/atomic"
4+
5+
// Metrics holds the router's runtime counters. Spec AC-14.
6+
type Metrics struct {
7+
// ReceivedCount is the count of bus events the router translated
8+
// into Alert values (pre-dedup).
9+
ReceivedCount atomic.Int64
10+
11+
// RoutedCount is the count of (alert, channel) pairs where the
12+
// channel's tag filter matched and Send was invoked. Counts each
13+
// channel delivery for a fan-out alert.
14+
RoutedCount atomic.Int64
15+
16+
// DedupedCount is the count of alerts skipped by the dedup gate.
17+
// Per Spec C-03, these never reach Channel.Send.
18+
DedupedCount atomic.Int64
19+
20+
// ChannelFailureCount is the aggregate count of Channel.Send calls
21+
// that returned an error. Per-channel counters are tracked on
22+
// channelEntry.failureCount.
23+
ChannelFailureCount atomic.Int64
24+
}
25+
26+
// NewMetrics returns a fresh Metrics.
27+
func NewMetrics() *Metrics { return &Metrics{} }
28+
29+
// MetricsSnapshot is a JSON-friendly point-in-time snapshot.
30+
type MetricsSnapshot struct {
31+
ReceivedCount int64 `json:"received_count"`
32+
RoutedCount int64 `json:"routed_count"`
33+
DedupedCount int64 `json:"deduped_count"`
34+
ChannelFailureCount int64 `json:"channel_failure_count"`
35+
}
36+
37+
// Snapshot returns a point-in-time copy of all counters.
38+
func (m *Metrics) Snapshot() MetricsSnapshot {
39+
return MetricsSnapshot{
40+
ReceivedCount: m.ReceivedCount.Load(),
41+
RoutedCount: m.RoutedCount.Load(),
42+
DedupedCount: m.DedupedCount.Load(),
43+
ChannelFailureCount: m.ChannelFailureCount.Load(),
44+
}
45+
}

0 commit comments

Comments
 (0)