Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions app/cmd/openwatch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@ import (
"syscall"
"time"

"github.com/Hanalyx/openwatch/internal/alertrouter"
stdoutchan "github.com/Hanalyx/openwatch/internal/alertrouter/channels/stdout"
"github.com/Hanalyx/openwatch/internal/audit"
"github.com/Hanalyx/openwatch/internal/config"
"github.com/Hanalyx/openwatch/internal/correlation"
"github.com/Hanalyx/openwatch/internal/db"
"github.com/Hanalyx/openwatch/internal/db/migrations"
"github.com/Hanalyx/openwatch/internal/eventbus"
"github.com/Hanalyx/openwatch/internal/identity"
"github.com/Hanalyx/openwatch/internal/license"
"github.com/Hanalyx/openwatch/internal/liveness"
openlog "github.com/Hanalyx/openwatch/internal/log"
"github.com/Hanalyx/openwatch/internal/secretkey"
"github.com/Hanalyx/openwatch/internal/server"
Expand Down Expand Up @@ -255,9 +259,51 @@ func cmdServe(cfg *config.Config, _ []string, stdout, stderr *os.File) int {
}
}()

// ---------------------------------------------------------------
// Slice B wiring. Spec: system-daemon-orchestration.
//
// Boot order (C-01, C-09):
// 1. Pub/sub bus - Bucket B events
// 2. Alert router - subscriber, MUST register and Start
// BEFORE any producer publishes (C-09)
// 3. Liveness probe loop - producer: HeartbeatPulse on transitions
//
// drift service is constructed elsewhere with no long-lived loop;
// the worker subcommand calls DetectForScan per-scan-completion
// in a follow-up PR.
//
// The scheduler + cron tick land in a follow-up that adds
// policy schedules loading + queue HMAC key derivation - both
// require a credential DEK accessor not yet exposed.
// ---------------------------------------------------------------

bus := eventbus.NewBus()
defer bus.Shutdown()

router, err := alertrouter.NewRouter(bus, alertrouter.Config{})
if err != nil {
slog.ErrorContext(bootCtx, "alertrouter init failed", slog.String("error", err.Error()))
return 1
}
// AC-13: register the stdout channel against an empty Tags filter
// (wildcard — receives every alert). Operators see fired alerts
// in `journalctl -u openwatch -g alertrouter.alert.sent`.
router.Register(alertrouter.ChannelRegistration{
Channel: stdoutchan.New("stdout"),
})
router.Start(ctx) // C-09: subscriber active before any publisher.

liveSvc := liveness.NewService(pool, audit.Emit, bus)
go liveSvc.Run(ctx)

srv := server.New(cfg, pool)
runErr := srv.Run(ctx)

// Shutdown order REVERSE of boot (C-02). liveness.Run + alertrouter
// observe ctx cancellation via the same ctx srv.Run watched.
router.Stop()
// liveSvc.Run returns when ctx is canceled; no explicit Stop call.

// system.shutdown: best-effort sync emit; failures logged but ignored
// because shutdown is in progress.
_ = audit.EmitSync(bootCtx, audit.SystemShutdown, audit.Event{
Expand Down
226 changes: 226 additions & 0 deletions app/cmd/openwatch/source_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
// @spec system-daemon-orchestration
//
// AC traceability (this file):
// AC-01 TestMainImportsSliceBPackages
// AC-02 TestCmdServe_BootSequenceOrder
// AC-03 TestCmdServe_ShutdownOrder
// AC-04 TestCmdServe_RouterStartBeforeLivenessRun
// AC-05 TestSliceBPackages_DoNotImportAuditEmit
// AC-06 TestCmdServe_RegistersStdoutWildcardChannel
//
// These tests are source-inspection — they read app/cmd/openwatch/main.go
// and the Slice B package directories and assert structural invariants.
// A runtime ordering test would need to refactor cmdServe to accept
// injected NewX wrappers; that lands when the spec next bumps to v1.1.0.

package main

import (
"go/parser"
"go/token"
"os"
"path/filepath"
"regexp"
"runtime"
"strings"
"testing"
)

func mainGoSource(t *testing.T) string {
t.Helper()
_, file, _, _ := runtime.Caller(0)
src, err := os.ReadFile(filepath.Join(filepath.Dir(file), "main.go"))
if err != nil {
t.Fatalf("read main.go: %v", err)
}
return string(src)
}

// orderedContains reports whether all needles appear in src in the
// listed order (non-overlapping). Used by AC-02/03/04 to assert textual
// boot-sequence ordering.
func orderedContains(src string, needles []string) (int, string) {
pos := 0
for _, n := range needles {
idx := strings.Index(src[pos:], n)
if idx < 0 {
return -1, n
}
pos += idx + len(n)
}
return pos, ""
}

// @ac AC-01
// AC-01: cmd/openwatch imports the Slice B packages it wires.
func TestMainImportsSliceBPackages(t *testing.T) {
t.Run("system-daemon-orchestration/AC-01", func(t *testing.T) {
src := mainGoSource(t)
for _, want := range []string{
`"github.com/Hanalyx/openwatch/internal/alertrouter"`,
`"github.com/Hanalyx/openwatch/internal/alertrouter/channels/stdout"`,
`"github.com/Hanalyx/openwatch/internal/eventbus"`,
`"github.com/Hanalyx/openwatch/internal/liveness"`,
} {
if !strings.Contains(src, want) {
t.Errorf("main.go imports do not include %s", want)
}
}
})
}

// @ac AC-02
// AC-02: cmdServe's textual boot sequence matches C-01.
func TestCmdServe_BootSequenceOrder(t *testing.T) {
t.Run("system-daemon-orchestration/AC-02", func(t *testing.T) {
src := mainGoSource(t)
seq := []string{
"eventbus.NewBus",
"alertrouter.NewRouter",
"router.Register",
"router.Start",
"liveness.NewService",
"liveSvc.Run",
"server.New",
"srv.Run",
}
if _, missing := orderedContains(src, seq); missing != "" {
t.Errorf("boot sequence broken — could not find %q after the preceding step", missing)
}
})
}

// @ac AC-03
// AC-03: router.Stop appears AFTER srv.Run; bus.Shutdown is deferred
// BEFORE alertrouter is constructed (so its defer runs after router.Stop).
func TestCmdServe_ShutdownOrder(t *testing.T) {
t.Run("system-daemon-orchestration/AC-03", func(t *testing.T) {
src := mainGoSource(t)
idxSrvRun := strings.Index(src, "srv.Run(ctx)")
idxRouterStop := strings.Index(src, "router.Stop()")
if idxSrvRun < 0 || idxRouterStop < 0 {
t.Fatalf("srv.Run / router.Stop missing — boot sequence broken")
}
if idxRouterStop < idxSrvRun {
t.Errorf("router.Stop() appears BEFORE srv.Run(ctx) — shutdown order broken")
}
// defer bus.Shutdown() must appear before NewRouter so its
// deferred call fires after router.Stop().
idxDefer := strings.Index(src, "defer bus.Shutdown()")
idxNewRouter := strings.Index(src, "alertrouter.NewRouter")
if idxDefer < 0 || idxNewRouter < 0 {
t.Fatalf("defer bus.Shutdown() / alertrouter.NewRouter missing")
}
if idxDefer > idxNewRouter {
t.Errorf("defer bus.Shutdown() appears AFTER alertrouter.NewRouter — deferred shutdown would fire BEFORE router.Stop, violating reverse order")
}
})
}

// @ac AC-04
// AC-04: alertrouter.Start (router.Start(ctx)) precedes the goroutine
// that spawns liveSvc.Run.
func TestCmdServe_RouterStartBeforeLivenessRun(t *testing.T) {
t.Run("system-daemon-orchestration/AC-04", func(t *testing.T) {
src := mainGoSource(t)
idxRouterStart := strings.Index(src, "router.Start(ctx)")
idxGoLive := strings.Index(src, "go liveSvc.Run(ctx)")
if idxRouterStart < 0 || idxGoLive < 0 {
t.Fatalf("router.Start / go liveSvc.Run missing")
}
if idxRouterStart > idxGoLive {
t.Errorf("router.Start(ctx) appears AFTER go liveSvc.Run — bus could see a publish before any subscriber exists (C-09 violation)")
}
})
}

// @ac AC-05
// AC-05: no Slice B package calls audit.Emit directly. Each takes its
// EmitFunc via NewX constructor injection.
func TestSliceBPackages_DoNotImportAuditEmit(t *testing.T) {
t.Run("system-daemon-orchestration/AC-05", func(t *testing.T) {
_, file, _, _ := runtime.Caller(0)
appDir := filepath.Join(filepath.Dir(file), "..", "..")
sliceBDirs := []string{
"internal/scheduler",
"internal/kensa",
"internal/transactionlog",
"internal/liveness",
"internal/alertrouter",
"internal/eventbus",
"internal/fleetrollup",
"internal/drift",
}

// Pattern: audit.Emit(ctx, ...) — direct package-global call.
// Annotation-only allowance: any line containing //nolint:audit
// is exempt. (No package uses that today.)
directCall := regexp.MustCompile(`\baudit\.Emit\(`)

fset := token.NewFileSet()
for _, d := range sliceBDirs {
full := filepath.Join(appDir, d)
entries, err := os.ReadDir(full)
if err != nil {
t.Logf("skip %s (not present): %v", d, err)
continue
}
for _, e := range entries {
if e.IsDir() || !strings.HasSuffix(e.Name(), ".go") {
continue
}
if strings.HasSuffix(e.Name(), "_test.go") {
continue
}
path := filepath.Join(full, e.Name())
// Parse to ensure file is valid Go (lints + helps avoid
// false positives in string literals — though regex above
// is intentionally narrow).
if _, err := parser.ParseFile(fset, path, nil, parser.ImportsOnly); err != nil {
t.Errorf("parse %s: %v", path, err)
continue
}
b, err := os.ReadFile(path)
if err != nil {
t.Errorf("read %s: %v", path, err)
continue
}
if directCall.MatchString(string(b)) {
t.Errorf("%s contains a direct audit.Emit(...) call — Slice B packages MUST receive audit.Emit via NewX constructor injection (system-daemon-orchestration AC-05)", path)
}
}
}
})
}

// @ac AC-06
// AC-06: cmdServe registers the stdout channel with no Tags (wildcard).
func TestCmdServe_RegistersStdoutWildcardChannel(t *testing.T) {
t.Run("system-daemon-orchestration/AC-06", func(t *testing.T) {
src := mainGoSource(t)
// Look for the canonical registration block.
if !strings.Contains(src, "stdoutchan.New(") {
t.Error("main.go does not call stdoutchan.New — stdout alert channel not constructed")
}
// Channel registered via router.Register with a
// ChannelRegistration whose Tags field is omitted (nil) or
// explicit empty map. Regex catches both forms; rejects an
// explicit non-nil filter that would prevent wildcard match.
registerRe := regexp.MustCompile(`router\.Register\(alertrouter\.ChannelRegistration\{[^}]*Channel:\s*stdoutchan\.New\([^)]*\)[^}]*\}\)`)
m := registerRe.FindString(src)
if m == "" {
t.Fatal("main.go does not register the stdoutchan via router.Register with a ChannelRegistration{Channel: stdoutchan.New(...)}")
}
// Inside the matched block, if a Tags: field is present its
// value must be nil or an empty map.
if strings.Contains(m, "Tags:") {
// Accept Tags: nil or Tags: map[string]string{}.
tagsOK := strings.Contains(m, "Tags: nil") ||
strings.Contains(m, "Tags: map[string]string{}") ||
strings.Contains(m, "Tags:map[string]string{}")
if !tagsOK {
t.Errorf("stdout channel registered with a non-wildcard Tags filter; got %q", m)
}
}
})
}
54 changes: 54 additions & 0 deletions app/internal/alertrouter/channels/stdout/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Package stdout implements an alertrouter.Channel that logs alerts to
// the structured slog default logger at INFO level.
//
// This is the minimum-viable channel for an operator. Once registered,
// `journalctl -u openwatch -g alertrouter.alert.sent` returns the
// stream of fired alerts. Slack / email / webhook channels follow the
// same Channel interface and live in sibling subpackages — keeping
// this one dependency-free preserves the alertrouter core's
// "no external SDKs" invariant (system-alert-router AC-13).
//
// Spec: system-daemon-orchestration AC-13 (registered at boot).
package stdout

import (
"context"
"log/slog"

"github.com/Hanalyx/openwatch/internal/alertrouter"
)

// Channel writes each alert via slog.InfoContext. The struct itself
// holds no state; one instance per process is enough.
type Channel struct {
name string
}

// New returns a stdout channel ready for alertrouter.Register.
// channelName is the identifier used in metrics + failure logs;
// defaults to "stdout" when empty.
func New(channelName string) *Channel {
if channelName == "" {
channelName = "stdout"
}
return &Channel{name: channelName}
}

// Name satisfies alertrouter.Channel.
func (c *Channel) Name() string { return c.name }

// Send writes the alert via slog at INFO with structured attributes.
// Never returns an error — stdout/journald is the operator's last
// resort even if a downstream channel is misbehaving.
func (c *Channel) Send(ctx context.Context, a alertrouter.Alert) error {
slog.InfoContext(ctx, "alertrouter.alert.sent",
slog.String("channel", c.name),
slog.String("alert_type", string(a.Type)),
slog.String("severity", string(a.Severity)),
slog.String("host_id", a.HostID.String()),
slog.String("rule_id", a.RuleID),
slog.Time("occurred_at", a.OccurredAt),
slog.String("title", a.Title),
)
return nil
}
Loading
Loading