Skip to content

Commit 744c702

Browse files
feat(daemon): wire Slice B into cmd/openwatch serve (eventbus + alertrouter + liveness loop) (#430)
Lands the system-daemon-orchestration spec (v1.0.0) and its implementation: eventbus + alertrouter (with stdout channel) + the liveness probe loop are now instantiated and started during openwatch serve boot, in an order that guarantees the alert router subscribes BEFORE any producer publishes. Spec additions New: system-daemon-orchestration v1.0.0 (7 ACs over 5 constraints). Scope deliberately trimmed from the draft pulled out of #429 — this PR delivers what's actually wired and tested. Worker subcommand, scheduler dispatcher tick, drift detector loop, and live Kensa binding remain follow-ups with their own specs. Amend: system-liveness-loop 1.0.0 -> 1.1.0 - C-10/AC-16/AC-17 Service.Run(ctx) blocking loop walks the active host inventory at the configured interval. - C-11/AC-18 Run skips hosts whose host_backoff_state .suppress_until is in the future. - AC-19 Run returns within 2s of ctx cancel; in-flight probes finish naturally. - C-12/AC-20/AC-21/AC-22 publishes typed HeartbeatPulse to the eventbus on every state transition; no publish on steady-state. - C-13/AC-23 NewService(pool, emit, bus) — nil bus is valid; audit emission still fires. Amend: system-drift-detector 1.0.0 -> 1.1.0 - C-09/AC-15/AC-16/AC-17 DetectForScan publishes DriftDetected to the eventbus on every non-stable detection; per-severity counts match the audit detail produced from the same Report. - C-10/AC-18 NewService(pool, emit, thresholds, bus) — nil bus is valid. Implementation internal/eventbus, internal/alertrouter — unchanged. internal/alertrouter/channels/stdout — NEW subpackage. - Channel.Send writes via slog.InfoContext at INFO with structured attributes (alert_type, severity, host_id, etc.). - Operators see fired alerts in `journalctl -u openwatch -g alertrouter.alert.sent`. - Zero external SDK deps — preserves system-alert-router AC-13 "no external SDKs in core" invariant for the boot-default channel. internal/liveness - NewService gained *eventbus.Bus parameter (v1.1.0 C-13). - Service.Run(ctx) NEW — blocking loop calling tick() at the configured interval; tick walks active hosts and calls ProbeHost. WithInterval seam for tests. - publishHeartbeat fires on state transition (the same trigger as the audit emit). - listProbeTargets SQL LEFT-JOINs host_backoff_state and excludes hosts whose suppress_until is in the future. internal/drift - NewService gained *eventbus.Bus parameter (v1.1.0 C-10). - publishDrift fires alongside emitDriftDetected when Kind is non-stable. cmd/openwatch/main.go - cmdServe instantiates: bus -> alertrouter (with stdout channel registered, wildcard Tags) -> Start -> liveSvc -> go liveSvc.Run -> server.Run. - Shutdown order is reverse: router.Stop after srv.Run returns; bus.Shutdown + audit.Shutdown via defer in reverse order. - Source-inspection tests (cmd/openwatch/source_test.go) assert the textual boot-sequence ordering in main.go matches the spec — eyeballing the file proves correctness, no runtime wrapper refactor needed in v1.0.0. Tests (5 specs at 100%, all under -race) system-daemon-orchestration 7/7 system-liveness-loop 23/23 (was 15/15 — +8 v1.1.0 ACs) system-drift-detector 18/18 (was 14/14 — +4 v1.1.0 ACs) All AC-16/17/18/19 (liveness Run) tested with Postgres + a recording probe that counts per-tick invocations. AC-20/21/22 (HeartbeatPulse) tested with an in-memory bus + subscriber that asserts presence/absence and field values. AC-15/16/17 (DriftDetected) mirror the same pattern. Verification go vet ./... clean go test -race -count=1 ./... PASS (full tree, ~4 min with Postgres) specter parse PASS (40 specs) specter check PASS specter sync PASS (all 40 specs at threshold) Not in this PR (each its own follow-up that takes a future spec as its contract): - openwatch worker subcommand (needs system-worker-subcommand spec) - scheduler dispatcher tick inside serve (needs DEK accessor + policy.Schedules loader) - Drift detector tick (worker calls DetectForScan inline) - Live Kensa binding (system-kensa-executor v2 AC-18) - Slack / email / webhook channel implementations
1 parent bdcfb10 commit 744c702

13 files changed

Lines changed: 1434 additions & 15 deletions

File tree

app/cmd/openwatch/main.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,17 @@ import (
2121
"syscall"
2222
"time"
2323

24+
"github.com/Hanalyx/openwatch/internal/alertrouter"
25+
stdoutchan "github.com/Hanalyx/openwatch/internal/alertrouter/channels/stdout"
2426
"github.com/Hanalyx/openwatch/internal/audit"
2527
"github.com/Hanalyx/openwatch/internal/config"
2628
"github.com/Hanalyx/openwatch/internal/correlation"
2729
"github.com/Hanalyx/openwatch/internal/db"
2830
"github.com/Hanalyx/openwatch/internal/db/migrations"
31+
"github.com/Hanalyx/openwatch/internal/eventbus"
2932
"github.com/Hanalyx/openwatch/internal/identity"
3033
"github.com/Hanalyx/openwatch/internal/license"
34+
"github.com/Hanalyx/openwatch/internal/liveness"
3135
openlog "github.com/Hanalyx/openwatch/internal/log"
3236
"github.com/Hanalyx/openwatch/internal/secretkey"
3337
"github.com/Hanalyx/openwatch/internal/server"
@@ -255,9 +259,51 @@ func cmdServe(cfg *config.Config, _ []string, stdout, stderr *os.File) int {
255259
}
256260
}()
257261

262+
// ---------------------------------------------------------------
263+
// Slice B wiring. Spec: system-daemon-orchestration.
264+
//
265+
// Boot order (C-01, C-09):
266+
// 1. Pub/sub bus - Bucket B events
267+
// 2. Alert router - subscriber, MUST register and Start
268+
// BEFORE any producer publishes (C-09)
269+
// 3. Liveness probe loop - producer: HeartbeatPulse on transitions
270+
//
271+
// drift service is constructed elsewhere with no long-lived loop;
272+
// the worker subcommand calls DetectForScan per-scan-completion
273+
// in a follow-up PR.
274+
//
275+
// The scheduler + cron tick land in a follow-up that adds
276+
// policy schedules loading + queue HMAC key derivation - both
277+
// require a credential DEK accessor not yet exposed.
278+
// ---------------------------------------------------------------
279+
280+
bus := eventbus.NewBus()
281+
defer bus.Shutdown()
282+
283+
router, err := alertrouter.NewRouter(bus, alertrouter.Config{})
284+
if err != nil {
285+
slog.ErrorContext(bootCtx, "alertrouter init failed", slog.String("error", err.Error()))
286+
return 1
287+
}
288+
// AC-13: register the stdout channel against an empty Tags filter
289+
// (wildcard — receives every alert). Operators see fired alerts
290+
// in `journalctl -u openwatch -g alertrouter.alert.sent`.
291+
router.Register(alertrouter.ChannelRegistration{
292+
Channel: stdoutchan.New("stdout"),
293+
})
294+
router.Start(ctx) // C-09: subscriber active before any publisher.
295+
296+
liveSvc := liveness.NewService(pool, audit.Emit, bus)
297+
go liveSvc.Run(ctx)
298+
258299
srv := server.New(cfg, pool)
259300
runErr := srv.Run(ctx)
260301

302+
// Shutdown order REVERSE of boot (C-02). liveness.Run + alertrouter
303+
// observe ctx cancellation via the same ctx srv.Run watched.
304+
router.Stop()
305+
// liveSvc.Run returns when ctx is canceled; no explicit Stop call.
306+
261307
// system.shutdown: best-effort sync emit; failures logged but ignored
262308
// because shutdown is in progress.
263309
_ = audit.EmitSync(bootCtx, audit.SystemShutdown, audit.Event{

app/cmd/openwatch/source_test.go

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
// @spec system-daemon-orchestration
2+
//
3+
// AC traceability (this file):
4+
// AC-01 TestMainImportsSliceBPackages
5+
// AC-02 TestCmdServe_BootSequenceOrder
6+
// AC-03 TestCmdServe_ShutdownOrder
7+
// AC-04 TestCmdServe_RouterStartBeforeLivenessRun
8+
// AC-05 TestSliceBPackages_DoNotImportAuditEmit
9+
// AC-06 TestCmdServe_RegistersStdoutWildcardChannel
10+
//
11+
// These tests are source-inspection — they read app/cmd/openwatch/main.go
12+
// and the Slice B package directories and assert structural invariants.
13+
// A runtime ordering test would need to refactor cmdServe to accept
14+
// injected NewX wrappers; that lands when the spec next bumps to v1.1.0.
15+
16+
package main
17+
18+
import (
19+
"go/parser"
20+
"go/token"
21+
"os"
22+
"path/filepath"
23+
"regexp"
24+
"runtime"
25+
"strings"
26+
"testing"
27+
)
28+
29+
func mainGoSource(t *testing.T) string {
30+
t.Helper()
31+
_, file, _, _ := runtime.Caller(0)
32+
src, err := os.ReadFile(filepath.Join(filepath.Dir(file), "main.go"))
33+
if err != nil {
34+
t.Fatalf("read main.go: %v", err)
35+
}
36+
return string(src)
37+
}
38+
39+
// orderedContains reports whether all needles appear in src in the
40+
// listed order (non-overlapping). Used by AC-02/03/04 to assert textual
41+
// boot-sequence ordering.
42+
func orderedContains(src string, needles []string) (int, string) {
43+
pos := 0
44+
for _, n := range needles {
45+
idx := strings.Index(src[pos:], n)
46+
if idx < 0 {
47+
return -1, n
48+
}
49+
pos += idx + len(n)
50+
}
51+
return pos, ""
52+
}
53+
54+
// @ac AC-01
55+
// AC-01: cmd/openwatch imports the Slice B packages it wires.
56+
func TestMainImportsSliceBPackages(t *testing.T) {
57+
t.Run("system-daemon-orchestration/AC-01", func(t *testing.T) {
58+
src := mainGoSource(t)
59+
for _, want := range []string{
60+
`"github.com/Hanalyx/openwatch/internal/alertrouter"`,
61+
`"github.com/Hanalyx/openwatch/internal/alertrouter/channels/stdout"`,
62+
`"github.com/Hanalyx/openwatch/internal/eventbus"`,
63+
`"github.com/Hanalyx/openwatch/internal/liveness"`,
64+
} {
65+
if !strings.Contains(src, want) {
66+
t.Errorf("main.go imports do not include %s", want)
67+
}
68+
}
69+
})
70+
}
71+
72+
// @ac AC-02
73+
// AC-02: cmdServe's textual boot sequence matches C-01.
74+
func TestCmdServe_BootSequenceOrder(t *testing.T) {
75+
t.Run("system-daemon-orchestration/AC-02", func(t *testing.T) {
76+
src := mainGoSource(t)
77+
seq := []string{
78+
"eventbus.NewBus",
79+
"alertrouter.NewRouter",
80+
"router.Register",
81+
"router.Start",
82+
"liveness.NewService",
83+
"liveSvc.Run",
84+
"server.New",
85+
"srv.Run",
86+
}
87+
if _, missing := orderedContains(src, seq); missing != "" {
88+
t.Errorf("boot sequence broken — could not find %q after the preceding step", missing)
89+
}
90+
})
91+
}
92+
93+
// @ac AC-03
94+
// AC-03: router.Stop appears AFTER srv.Run; bus.Shutdown is deferred
95+
// BEFORE alertrouter is constructed (so its defer runs after router.Stop).
96+
func TestCmdServe_ShutdownOrder(t *testing.T) {
97+
t.Run("system-daemon-orchestration/AC-03", func(t *testing.T) {
98+
src := mainGoSource(t)
99+
idxSrvRun := strings.Index(src, "srv.Run(ctx)")
100+
idxRouterStop := strings.Index(src, "router.Stop()")
101+
if idxSrvRun < 0 || idxRouterStop < 0 {
102+
t.Fatalf("srv.Run / router.Stop missing — boot sequence broken")
103+
}
104+
if idxRouterStop < idxSrvRun {
105+
t.Errorf("router.Stop() appears BEFORE srv.Run(ctx) — shutdown order broken")
106+
}
107+
// defer bus.Shutdown() must appear before NewRouter so its
108+
// deferred call fires after router.Stop().
109+
idxDefer := strings.Index(src, "defer bus.Shutdown()")
110+
idxNewRouter := strings.Index(src, "alertrouter.NewRouter")
111+
if idxDefer < 0 || idxNewRouter < 0 {
112+
t.Fatalf("defer bus.Shutdown() / alertrouter.NewRouter missing")
113+
}
114+
if idxDefer > idxNewRouter {
115+
t.Errorf("defer bus.Shutdown() appears AFTER alertrouter.NewRouter — deferred shutdown would fire BEFORE router.Stop, violating reverse order")
116+
}
117+
})
118+
}
119+
120+
// @ac AC-04
121+
// AC-04: alertrouter.Start (router.Start(ctx)) precedes the goroutine
122+
// that spawns liveSvc.Run.
123+
func TestCmdServe_RouterStartBeforeLivenessRun(t *testing.T) {
124+
t.Run("system-daemon-orchestration/AC-04", func(t *testing.T) {
125+
src := mainGoSource(t)
126+
idxRouterStart := strings.Index(src, "router.Start(ctx)")
127+
idxGoLive := strings.Index(src, "go liveSvc.Run(ctx)")
128+
if idxRouterStart < 0 || idxGoLive < 0 {
129+
t.Fatalf("router.Start / go liveSvc.Run missing")
130+
}
131+
if idxRouterStart > idxGoLive {
132+
t.Errorf("router.Start(ctx) appears AFTER go liveSvc.Run — bus could see a publish before any subscriber exists (C-09 violation)")
133+
}
134+
})
135+
}
136+
137+
// @ac AC-05
138+
// AC-05: no Slice B package calls audit.Emit directly. Each takes its
139+
// EmitFunc via NewX constructor injection.
140+
func TestSliceBPackages_DoNotImportAuditEmit(t *testing.T) {
141+
t.Run("system-daemon-orchestration/AC-05", func(t *testing.T) {
142+
_, file, _, _ := runtime.Caller(0)
143+
appDir := filepath.Join(filepath.Dir(file), "..", "..")
144+
sliceBDirs := []string{
145+
"internal/scheduler",
146+
"internal/kensa",
147+
"internal/transactionlog",
148+
"internal/liveness",
149+
"internal/alertrouter",
150+
"internal/eventbus",
151+
"internal/fleetrollup",
152+
"internal/drift",
153+
}
154+
155+
// Pattern: audit.Emit(ctx, ...) — direct package-global call.
156+
// Annotation-only allowance: any line containing //nolint:audit
157+
// is exempt. (No package uses that today.)
158+
directCall := regexp.MustCompile(`\baudit\.Emit\(`)
159+
160+
fset := token.NewFileSet()
161+
for _, d := range sliceBDirs {
162+
full := filepath.Join(appDir, d)
163+
entries, err := os.ReadDir(full)
164+
if err != nil {
165+
t.Logf("skip %s (not present): %v", d, err)
166+
continue
167+
}
168+
for _, e := range entries {
169+
if e.IsDir() || !strings.HasSuffix(e.Name(), ".go") {
170+
continue
171+
}
172+
if strings.HasSuffix(e.Name(), "_test.go") {
173+
continue
174+
}
175+
path := filepath.Join(full, e.Name())
176+
// Parse to ensure file is valid Go (lints + helps avoid
177+
// false positives in string literals — though regex above
178+
// is intentionally narrow).
179+
if _, err := parser.ParseFile(fset, path, nil, parser.ImportsOnly); err != nil {
180+
t.Errorf("parse %s: %v", path, err)
181+
continue
182+
}
183+
b, err := os.ReadFile(path)
184+
if err != nil {
185+
t.Errorf("read %s: %v", path, err)
186+
continue
187+
}
188+
if directCall.MatchString(string(b)) {
189+
t.Errorf("%s contains a direct audit.Emit(...) call — Slice B packages MUST receive audit.Emit via NewX constructor injection (system-daemon-orchestration AC-05)", path)
190+
}
191+
}
192+
}
193+
})
194+
}
195+
196+
// @ac AC-06
197+
// AC-06: cmdServe registers the stdout channel with no Tags (wildcard).
198+
func TestCmdServe_RegistersStdoutWildcardChannel(t *testing.T) {
199+
t.Run("system-daemon-orchestration/AC-06", func(t *testing.T) {
200+
src := mainGoSource(t)
201+
// Look for the canonical registration block.
202+
if !strings.Contains(src, "stdoutchan.New(") {
203+
t.Error("main.go does not call stdoutchan.New — stdout alert channel not constructed")
204+
}
205+
// Channel registered via router.Register with a
206+
// ChannelRegistration whose Tags field is omitted (nil) or
207+
// explicit empty map. Regex catches both forms; rejects an
208+
// explicit non-nil filter that would prevent wildcard match.
209+
registerRe := regexp.MustCompile(`router\.Register\(alertrouter\.ChannelRegistration\{[^}]*Channel:\s*stdoutchan\.New\([^)]*\)[^}]*\}\)`)
210+
m := registerRe.FindString(src)
211+
if m == "" {
212+
t.Fatal("main.go does not register the stdoutchan via router.Register with a ChannelRegistration{Channel: stdoutchan.New(...)}")
213+
}
214+
// Inside the matched block, if a Tags: field is present its
215+
// value must be nil or an empty map.
216+
if strings.Contains(m, "Tags:") {
217+
// Accept Tags: nil or Tags: map[string]string{}.
218+
tagsOK := strings.Contains(m, "Tags: nil") ||
219+
strings.Contains(m, "Tags: map[string]string{}") ||
220+
strings.Contains(m, "Tags:map[string]string{}")
221+
if !tagsOK {
222+
t.Errorf("stdout channel registered with a non-wildcard Tags filter; got %q", m)
223+
}
224+
}
225+
})
226+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Package stdout implements an alertrouter.Channel that logs alerts to
2+
// the structured slog default logger at INFO level.
3+
//
4+
// This is the minimum-viable channel for an operator. Once registered,
5+
// `journalctl -u openwatch -g alertrouter.alert.sent` returns the
6+
// stream of fired alerts. Slack / email / webhook channels follow the
7+
// same Channel interface and live in sibling subpackages — keeping
8+
// this one dependency-free preserves the alertrouter core's
9+
// "no external SDKs" invariant (system-alert-router AC-13).
10+
//
11+
// Spec: system-daemon-orchestration AC-13 (registered at boot).
12+
package stdout
13+
14+
import (
15+
"context"
16+
"log/slog"
17+
18+
"github.com/Hanalyx/openwatch/internal/alertrouter"
19+
)
20+
21+
// Channel writes each alert via slog.InfoContext. The struct itself
22+
// holds no state; one instance per process is enough.
23+
type Channel struct {
24+
name string
25+
}
26+
27+
// New returns a stdout channel ready for alertrouter.Register.
28+
// channelName is the identifier used in metrics + failure logs;
29+
// defaults to "stdout" when empty.
30+
func New(channelName string) *Channel {
31+
if channelName == "" {
32+
channelName = "stdout"
33+
}
34+
return &Channel{name: channelName}
35+
}
36+
37+
// Name satisfies alertrouter.Channel.
38+
func (c *Channel) Name() string { return c.name }
39+
40+
// Send writes the alert via slog at INFO with structured attributes.
41+
// Never returns an error — stdout/journald is the operator's last
42+
// resort even if a downstream channel is misbehaving.
43+
func (c *Channel) Send(ctx context.Context, a alertrouter.Alert) error {
44+
slog.InfoContext(ctx, "alertrouter.alert.sent",
45+
slog.String("channel", c.name),
46+
slog.String("alert_type", string(a.Type)),
47+
slog.String("severity", string(a.Severity)),
48+
slog.String("host_id", a.HostID.String()),
49+
slog.String("rule_id", a.RuleID),
50+
slog.Time("occurred_at", a.OccurredAt),
51+
slog.String("title", a.Title),
52+
)
53+
return nil
54+
}

0 commit comments

Comments
 (0)