Skip to content

Commit 9cb8ce8

Browse files
feat(notifications): Slice 3 — governance action queue (RBAC-scoped) (#686)
Turn the bell into an action queue: route governance + remediation events to the users who can act on them, not the whole fleet. New primitives: - auth.RolesWithPermission(p) — resolve a permission to the built-in roles that grant it (pure helper over BuiltInRoles). - notifyfeed.Store.RecordForRoles — fan one row per active user holding any of the given roles (EXISTS, so a multi-role user gets one row), same upsert/collapse as RecordFanout. Producer: notifyfeed.GovernanceProjector - ExceptionRequested -> users who can approve (roles granting exception:approve: auditor, security_admin, admin), kind exception_pending, high, deep-link /settings/policies, grouped per exception. - ExceptionDecided -> the requester only, exception_approved/_rejected, medium. - RemediationFailed -> users who can act (roles granting remediation:execute: ops_lead, security_admin, admin), kind remediation_failed, high, deep-link /hosts/{id}, grouped per host+rule. Fires on a TERMINAL failure (execute that failed, or a rollback that did not restore) — NOT a successful user-initiated rollback (that is intended, not an alarm). Producer hooks (best-effort, never fail the transition/job; nil-safe): - exception.Service.WithNotifier + calls in Request/Approve/Reject. - worker.RemediationWorker GovernanceNotifier + calls on the two failure sites. - wired in cmd/openwatch/main.go (serve) and worker.go (dedicated worker). Both producers hold interfaces in their own packages, so neither imports notifyfeed. Spec system-notifications v1.5.0: C-08 + AC-15/16/17. Tests: auth.RolesWithPermission unit test; DB-backed governance fan-out tests (approver-scoped, requester-only, operator-scoped). No frontend change — rows render in the existing bell drawer.
1 parent 76f8939 commit 9cb8ce8

12 files changed

Lines changed: 594 additions & 35 deletions

File tree

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@ Versioning: [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
1616
regressed (1 critical)"), severity-ranked and deep-linked to the host. A
1717
host's first scan (all-new baseline) does not flood the bell, and repeat
1818
failures collapse onto a single re-surfacing entry. (notifications Slice 2)
19+
- The notification bell is now an action queue for governance: an exception
20+
awaiting approval reaches the users who can approve it (auditors and
21+
security/admin roles), an approve or reject reaches the requester, and a
22+
remediation that fails (or a rollback that does not restore) reaches the
23+
operators who can re-run it. Each item is severity-ranked and deep-links to
24+
where you act on it. (notifications Slice 3)
1925

2026
---
2127

cmd/openwatch/main.go

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -588,9 +588,13 @@ func cmdServe(cfg *config.Config, _ []string, stdout, stderr *os.File) int {
588588
// a fresh boot has today's row. Spec system-posture-snapshots.
589589
posture.Run(ctx, pool, 0)
590590

591+
// Governance action-queue notifications (Slice 3): exceptions pending /
592+
// decided, remediation failures. Reuses the Slice-1 feed store.
593+
govProjector := notifyfeed.NewGovernanceProjector(notifFeedStore)
594+
591595
// Compliance exception governance + its hourly expiry sweep.
592596
// Spec api-compliance-exceptions.
593-
exceptionSvc := exception.NewService(pool, audit.Emit)
597+
exceptionSvc := exception.NewService(pool, audit.Emit).WithNotifier(govProjector)
594598
exceptionSvc.Run(ctx, 0)
595599

596600
// Remediation governance: request/approve/reject + projected lift (free
@@ -628,13 +632,14 @@ func cmdServe(cfg *config.Config, _ []string, stdout, stderr *os.File) int {
628632
remExecutor = remExecutor.WithRemediateFunc(remFn, rbFn)
629633
}
630634
remediationWorker := worker.NewRemediationWorker(worker.RemediationConfig{
631-
Pool: pool,
632-
Executor: remExecutor,
633-
Service: remediationSvc,
634-
Writer: remTxWriter,
635-
QueueKey: scanQueueKey,
636-
Bus: bus,
637-
Emit: audit.Emit,
635+
Pool: pool,
636+
Executor: remExecutor,
637+
Service: remediationSvc,
638+
Writer: remTxWriter,
639+
QueueKey: scanQueueKey,
640+
Bus: bus,
641+
Emit: audit.Emit,
642+
Governance: govProjector,
638643
})
639644

640645
scanWorker := worker.NewScanWorker(worker.Config{

cmd/openwatch/worker.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -230,13 +230,18 @@ func cmdWorker(cfg *config.Config, args []string, stdout, stderr *os.File) int {
230230
} else {
231231
remExecutor = remExecutor.WithRemediateFunc(remFn, rbFn)
232232
}
233+
// One in-app feed store, shared by the remediation-failure governance
234+
// notifier and the scan worker's regression projector.
235+
notifFeedStore := notifyfeed.NewStore(pool)
236+
233237
remediationWorker := worker.NewRemediationWorker(worker.RemediationConfig{
234-
Pool: pool,
235-
Executor: remExecutor,
236-
Service: remediation.NewService(pool, audit.Emit),
237-
Writer: writer,
238-
QueueKey: queueKey,
239-
Emit: audit.Emit,
238+
Pool: pool,
239+
Executor: remExecutor,
240+
Service: remediation.NewService(pool, audit.Emit),
241+
Writer: writer,
242+
QueueKey: queueKey,
243+
Emit: audit.Emit,
244+
Governance: notifyfeed.NewGovernanceProjector(notifFeedStore),
240245
// Bus nil: the dedicated worker has no SSE subscribers (cross-process
241246
// delivery is a known non-goal, same as scan.completed).
242247
})
@@ -267,7 +272,7 @@ func cmdWorker(cfg *config.Config, args []string, stdout, stderr *os.File) int {
267272
Emit: audit.Emit,
268273
Sched: sched,
269274
RemediationProcessor: remediationWorker,
270-
Regressions: notifyfeed.NewProjector(notifyfeed.NewStore(pool)),
275+
Regressions: notifyfeed.NewProjector(notifFeedStore),
271276
})
272277

273278
ctx, stop := signal.NotifyContext(bootCtx, syscall.SIGINT, syscall.SIGTERM)

internal/auth/grant.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,22 @@ func RoleGrantsWithin(caller Identity, requested RoleID) bool {
2222
}
2323
return true
2424
}
25+
26+
// RolesWithPermission returns the built-in role IDs whose definition grants p,
27+
// in declaration order. It is the recipient-resolution primitive for
28+
// permission-scoped fan-out (e.g. "every role that can approve an exception"):
29+
// callers translate a permission into the set of roles, then query user_roles
30+
// for the holders. An unknown/empty permission yields no roles.
31+
func RolesWithPermission(p Permission) []RoleID {
32+
var out []RoleID
33+
for _, id := range BuiltInRoleIDs() {
34+
def := BuiltInRoles[id]
35+
for _, granted := range def.Permissions {
36+
if granted == p {
37+
out = append(out, id)
38+
break
39+
}
40+
}
41+
}
42+
return out
43+
}

internal/auth/grant_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package auth
2+
3+
import (
4+
"sort"
5+
"testing"
6+
)
7+
8+
func roleSet(ids []RoleID) map[RoleID]bool {
9+
m := make(map[RoleID]bool, len(ids))
10+
for _, id := range ids {
11+
m[id] = true
12+
}
13+
return m
14+
}
15+
16+
func TestRolesWithPermission(t *testing.T) {
17+
cases := []struct {
18+
perm Permission
19+
want []RoleID
20+
}{
21+
// exception:approve is held by the auditor (review role) and the two
22+
// security roles — but NOT ops_lead (who can only request).
23+
{ExceptionApprove, []RoleID{RoleAuditor, RoleSecurityAdmin, RoleAdmin}},
24+
// remediation:execute is held by the operator tier and up.
25+
{RemediationExecute, []RoleID{RoleOpsLead, RoleSecurityAdmin, RoleAdmin}},
26+
// host:read is universal across the five built-in roles.
27+
{HostRead, []RoleID{RoleViewer, RoleAuditor, RoleOpsLead, RoleSecurityAdmin, RoleAdmin}},
28+
// admin-only verb.
29+
{SystemConfigWrite, []RoleID{RoleAdmin}},
30+
}
31+
for _, c := range cases {
32+
got := roleSet(RolesWithPermission(c.perm))
33+
want := roleSet(c.want)
34+
if len(got) != len(want) {
35+
t.Errorf("RolesWithPermission(%s): got %v, want %v", c.perm, sortedRoles(got), c.want)
36+
continue
37+
}
38+
for r := range want {
39+
if !got[r] {
40+
t.Errorf("RolesWithPermission(%s): missing %s (got %v)", c.perm, r, sortedRoles(got))
41+
}
42+
}
43+
}
44+
45+
// An unknown permission yields no roles.
46+
if got := RolesWithPermission(Permission("does:not_exist")); len(got) != 0 {
47+
t.Errorf("unknown permission must yield no roles, got %v", got)
48+
}
49+
}
50+
51+
func sortedRoles(m map[RoleID]bool) []RoleID {
52+
out := make([]RoleID, 0, len(m))
53+
for r := range m {
54+
out = append(out, r)
55+
}
56+
sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
57+
return out
58+
}

internal/exception/service.go

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/json"
66
"errors"
77
"fmt"
8+
"log/slog"
89
"strings"
910
"time"
1011

@@ -19,17 +20,35 @@ import (
1920
// pass a fake.
2021
type EmitFunc func(ctx context.Context, code audit.Code, ev audit.Event)
2122

23+
// Notifier receives exception lifecycle signals for the in-app notification
24+
// feed. notifyfeed.GovernanceProjector implements it; the service holds the
25+
// interface so it does not import the feed package. Methods are best-effort:
26+
// the service logs (never fails the transition on) a notifier error. nil
27+
// disables in-app exception notifications.
28+
type Notifier interface {
29+
ExceptionRequested(ctx context.Context, exceptionID, hostID uuid.UUID, ruleID string) error
30+
ExceptionDecided(ctx context.Context, exceptionID, requestedBy uuid.UUID, ruleID string, approved bool) error
31+
}
32+
2233
// Service is the exception governance service.
2334
type Service struct {
24-
pool *pgxpool.Pool
25-
emit EmitFunc
35+
pool *pgxpool.Pool
36+
emit EmitFunc
37+
notifier Notifier
2638
}
2739

2840
// NewService wires the service. emit is audit.Emit in production.
2941
func NewService(pool *pgxpool.Pool, emit EmitFunc) *Service {
3042
return &Service{pool: pool, emit: emit}
3143
}
3244

45+
// WithNotifier attaches the in-app notification projector and returns the
46+
// service for chaining at boot. nil leaves in-app notifications disabled.
47+
func (s *Service) WithNotifier(n Notifier) *Service {
48+
s.notifier = n
49+
return s
50+
}
51+
3352
const selectCols = `id, host_id, rule_id, reason, status, requested_by,
3453
reviewed_by, COALESCE(review_note, ''), expires_at, requested_at, reviewed_at`
3554

@@ -91,23 +110,32 @@ func (s *Service) Request(ctx context.Context, hostID uuid.UUID, ruleID, reason
91110
}
92111

93112
s.emitEvent(ctx, audit.ComplianceExceptionRequested, e, requestedBy, "requested")
113+
s.notifyRequested(ctx, e)
94114
return e, nil
95115
}
96116

97117
// Approve transitions a 'requested' exception to 'approved'. The
98118
// reviewer must differ from the requester (separation of duties).
99119
// Emits compliance.exception.approved.
100120
func (s *Service) Approve(ctx context.Context, id, reviewedBy uuid.UUID, note string) (Exception, error) {
101-
return s.review(ctx, id, reviewedBy, note, StatusRequested, StatusApproved,
121+
e, err := s.review(ctx, id, reviewedBy, note, StatusRequested, StatusApproved,
102122
audit.ComplianceExceptionApproved, true)
123+
if err == nil {
124+
s.notifyDecided(ctx, e, true)
125+
}
126+
return e, err
103127
}
104128

105129
// Reject transitions a 'requested' exception to 'rejected'. Like
106130
// Approve, the reviewer must differ from the requester. Emits
107131
// compliance.exception.rejected.
108132
func (s *Service) Reject(ctx context.Context, id, reviewedBy uuid.UUID, note string) (Exception, error) {
109-
return s.review(ctx, id, reviewedBy, note, StatusRequested, StatusRejected,
133+
e, err := s.review(ctx, id, reviewedBy, note, StatusRequested, StatusRejected,
110134
audit.ComplianceExceptionRejected, true)
135+
if err == nil {
136+
s.notifyDecided(ctx, e, false)
137+
}
138+
return e, err
111139
}
112140

113141
// Revoke transitions an 'approved' exception to 'revoked' before its
@@ -311,6 +339,31 @@ func (s *Service) emitEvent(ctx context.Context, code audit.Code, e Exception, a
311339
})
312340
}
313341

342+
// notifyRequested fans an "exception pending approval" notification to
343+
// approvers. Best-effort: a notifier error is logged, never propagated (the
344+
// exception was already persisted). No-op when no notifier is wired.
345+
func (s *Service) notifyRequested(ctx context.Context, e Exception) {
346+
if s.notifier == nil {
347+
return
348+
}
349+
if err := s.notifier.ExceptionRequested(ctx, e.ID, e.HostID, e.RuleID); err != nil {
350+
slog.WarnContext(ctx, "exception requested notification failed",
351+
slog.String("exception_id", e.ID.String()), slog.String("error", err.Error()))
352+
}
353+
}
354+
355+
// notifyDecided notifies the requester that their exception was approved or
356+
// rejected. Best-effort, like notifyRequested.
357+
func (s *Service) notifyDecided(ctx context.Context, e Exception, approved bool) {
358+
if s.notifier == nil {
359+
return
360+
}
361+
if err := s.notifier.ExceptionDecided(ctx, e.ID, e.RequestedBy, e.RuleID, approved); err != nil {
362+
slog.WarnContext(ctx, "exception decided notification failed",
363+
slog.String("exception_id", e.ID.String()), slog.String("error", err.Error()))
364+
}
365+
}
366+
314367
// isUniqueViolation reports whether err is a Postgres unique-violation
315368
// (SQLSTATE 23505) - the partial-unique one-open-per-host+rule index.
316369
// Same errors.As idiom as internal/host (robust to wrapping).

internal/notifyfeed/governance.go

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package notifyfeed
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/google/uuid"
8+
9+
"github.com/Hanalyx/openwatch/internal/auth"
10+
)
11+
12+
// GovernanceProjector turns governance + remediation lifecycle events into
13+
// RBAC-scoped in-app notifications — the bell's "action queue" (Slice 3). Unlike
14+
// the alert Channel and the regression Projector (which fan to every active
15+
// user), governance items reach only the users who can act on them: an
16+
// exception pending approval reaches approvers, a decision reaches the
17+
// requester, a failed remediation reaches the operators who can re-run it.
18+
//
19+
// It implements the Notifier interface the exception service and the
20+
// GovernanceNotifier the remediation worker hold, so neither producer imports
21+
// this package. Spec: system-notifications (Slice 3).
22+
type GovernanceProjector struct {
23+
store *Store
24+
}
25+
26+
// NewGovernanceProjector returns a governance projector over the feed store.
27+
func NewGovernanceProjector(store *Store) *GovernanceProjector {
28+
return &GovernanceProjector{store: store}
29+
}
30+
31+
// exceptionQueueLink is where the exception queue lives in the UI
32+
// (frontend/src/components/settings/ExceptionQueue.tsx, mounted on the policies
33+
// settings page). Approvers act on the queue here.
34+
const exceptionQueueLink = "/settings/policies"
35+
36+
// ExceptionRequested records an "exception pending approval" notification for
37+
// every user whose role can approve it (exception:approve → security_admin,
38+
// admin). Grouped per exception so a re-surfaced request collapses onto one
39+
// row. Best-effort: an error is returned for the caller to log, never to fail
40+
// the request.
41+
func (g *GovernanceProjector) ExceptionRequested(ctx context.Context, exceptionID, hostID uuid.UUID, ruleID string) error {
42+
host := g.store.hostName(ctx, hostID)
43+
h := hostID
44+
n := Notification{
45+
Kind: "exception_pending",
46+
Severity: "high",
47+
Title: fmt.Sprintf("Exception awaiting approval: %s on %s", ruleID, host),
48+
Body: "A compliance exception request needs review and approval or rejection.",
49+
HostID: &h,
50+
Link: exceptionQueueLink,
51+
GroupKey: "exception_pending:" + exceptionID.String(),
52+
}
53+
if err := g.store.RecordForRoles(ctx, roleStrings(auth.RolesWithPermission(auth.ExceptionApprove)), n); err != nil {
54+
return fmt.Errorf("notifyfeed: exception requested: %w", err)
55+
}
56+
return nil
57+
}
58+
59+
// ExceptionDecided records the outcome notification for the requester (only),
60+
// closing the loop for the person who asked. Grouped per exception.
61+
func (g *GovernanceProjector) ExceptionDecided(ctx context.Context, exceptionID, requestedBy uuid.UUID, ruleID string, approved bool) error {
62+
if requestedBy == uuid.Nil {
63+
return nil
64+
}
65+
verb, kind := "rejected", "exception_rejected"
66+
if approved {
67+
verb, kind = "approved", "exception_approved"
68+
}
69+
// Requester-facing: the rule id identifies the request; no host lookup.
70+
n := Notification{
71+
UserID: requestedBy,
72+
Kind: kind,
73+
Severity: "medium",
74+
Title: fmt.Sprintf("Exception %s: %s", verb, ruleID),
75+
Body: fmt.Sprintf("Your compliance exception request for %s was %s.", ruleID, verb),
76+
Link: exceptionQueueLink,
77+
GroupKey: "exception_decided:" + exceptionID.String(),
78+
}
79+
if err := g.store.Record(ctx, n); err != nil {
80+
return fmt.Errorf("notifyfeed: exception decided: %w", err)
81+
}
82+
return nil
83+
}
84+
85+
// RemediationFailed records a "remediation failed / rolled back" notification
86+
// for the operators who can act on it (remediation:execute → ops_lead,
87+
// security_admin, admin). Grouped per (host, rule). finalStatus is the
88+
// terminal remediation status ("failed" | "rolled_back"); action is
89+
// "execute" | "rollback".
90+
func (g *GovernanceProjector) RemediationFailed(ctx context.Context, hostID uuid.UUID, ruleID, action, finalStatus string) error {
91+
host := g.store.hostName(ctx, hostID)
92+
h := hostID
93+
lead := "Remediation failed"
94+
if finalStatus == "rolled_back" {
95+
lead = "Remediation rolled back"
96+
}
97+
n := Notification{
98+
Kind: "remediation_failed",
99+
Severity: "high",
100+
Title: fmt.Sprintf("%s: %s on %s", lead, ruleID, host),
101+
Body: "An automated fix did not complete successfully. Review the host and re-run or remediate manually.",
102+
HostID: &h,
103+
Link: "/hosts/" + hostID.String(),
104+
GroupKey: "remediation_failed:" + hostID.String() + ":" + ruleID,
105+
}
106+
if err := g.store.RecordForRoles(ctx, roleStrings(auth.RolesWithPermission(auth.RemediationExecute)), n); err != nil {
107+
return fmt.Errorf("notifyfeed: remediation failed: %w", err)
108+
}
109+
return nil
110+
}
111+
112+
// roleStrings adapts auth.RoleID values to the plain strings RecordForRoles
113+
// queries user_roles.role_id with.
114+
func roleStrings(ids []auth.RoleID) []string {
115+
out := make([]string, len(ids))
116+
for i, id := range ids {
117+
out[i] = string(id)
118+
}
119+
return out
120+
}

0 commit comments

Comments
 (0)