Skip to content

Commit 7afb6ed

Browse files
feat(reports): async attestation render + report.ready event (B3a) (#646)
Move attestation bulk-face rendering off the request path and emit a report.ready event - the first producer of the in-app notification bell. - eventbus: new EventKindReportReady (report.ready) + ReportReady event (snapshot id, kind, rendered faces, generating principal), added to the AllEventKinds closed set. - report/job.go: a report.render job (RenderJobType + RenderPayload) and a RenderProcessor. Generate(attestation), when async is enabled (WithAsyncRender), marks the bulk faces (csv/oscal_sar/pdf) 'pending' and enqueues the job, returning immediately; the executive summary stays synchronous. The processor renders each face via Export (flipping 'pending' -> 'ready'; a render error marks the face 'failed' and fails the job for retry), completes the job, and publishes ReportReady. - worker: a ReportRenderer interface + WithReportProcessor + a 'report.render' dispatch case (interface-typed so worker does not import report). server: WithReportWorker registers it on the in-process worker. - main.go: build the report service WithAsyncRender + a RenderProcessor over the bus, and register both. Async is an optimization, not a correctness gate: Export stays the lazy fallback, so a download arriving before the job runs still renders inline; enqueue failures are logged and do not fail Generate. Spec api-reports v1.10.0: C-16 + AC-22 (DB test: pending faces + queued job -> processor -> ready faces + completed job + ReportReady event). eventbus AC-07 closed-set test updated for the new kind.
1 parent ae8a719 commit 7afb6ed

10 files changed

Lines changed: 463 additions & 9 deletions

File tree

cmd/openwatch/main.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -648,6 +648,16 @@ func cmdServe(cfg *config.Config, _ []string, stdout, stderr *os.File) int {
648648
slog.String("key_id", reportSigner.KeyID()))
649649
}
650650

651+
// Report service + async render processor. The service enqueues a
652+
// report.render job for each generated attestation (WithAsyncRender);
653+
// the processor, registered on the in-process worker, renders the bulk
654+
// faces and publishes ReportReady on the bus.
655+
reportSvc := report.NewService(pool).
656+
WithGroups(group.NewService(pool)).
657+
WithSigner(reportSigner).
658+
WithAsyncRender()
659+
reportRenderProc := report.NewRenderProcessor(reportSvc, bus)
660+
651661
srv := server.New(cfg, pool).
652662
WithConnectivityConfig(cfgStore, liveSvc).
653663
WithDiscovery(discoSvc).
@@ -669,7 +679,8 @@ func cmdServe(cfg *config.Config, _ []string, stdout, stderr *os.File) int {
669679
WithExceptions(exceptionSvc).
670680
WithRemediation(remediationSvc).
671681
WithGroups(group.NewService(pool)).
672-
WithReports(report.NewService(pool).WithGroups(group.NewService(pool)).WithSigner(reportSigner)).
682+
WithReports(reportSvc).
683+
WithReportWorker(reportRenderProc).
673684
WithScanResults(scanresult.NewReader(pool)).
674685
WithNotifications(notifSvc)
675686
runErr := srv.Run(ctx)

docs/engineering/reports_design.md

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -553,13 +553,18 @@ framework-lensed), so the PDF stays bounded. Cached in `report_faces` (face
553553
`pdf`) like the others. Spec: `api-reports` v1.9.0 (C-15 / AC-21; C-10
554554
updated: pdf kind-dispatched, not executive-only).
555555

556-
**B3a — Async generation + report.ready.** *(REMAINING.)* Fleet attestation
557-
generation (the bulk query + SAR/CSV/PDF render) moves to the job queue: a
558-
`FleetReportJobType` + payload + a worker processor that flips
559-
`report_faces` status `pending → ready` and publishes
556+
**B3a — Async generation + report.ready.** *(SHIPPED 2026-06-21, PR #646.)*
557+
Generating an attestation marks its bulk faces (`csv`, `oscal_sar`, `pdf`)
558+
`pending` in `report_faces` and enqueues a `report.render` job
559+
(`internal/report/job.go`), returning immediately (the executive summary
560+
stays synchronous). A `RenderProcessor` registered on the in-process worker
561+
(`worker.WithReportProcessor`) claims the job, renders each face via
562+
`Export` (flipping `pending → ready`; a render error marks the face
563+
`failed` and fails the job for retry), and publishes
560564
`EventKindReportReady` on the event bus — **the in-app notification bell's
561-
first producer** (closes that coupling). Spec: `system-report-faces`
562-
(async + status), eventbus types.
565+
first producer**. Async is an optimization, not a correctness gate: `Export`
566+
stays the lazy fallback so a download before the job runs still renders
567+
inline. Spec: `api-reports` v1.10.0 (C-16 / AC-22) + the new eventbus kind.
563568

564569
**B3c — Notification bell (frontend).** *(REMAINING; needs product input.)*
565570
Turn the stubbed TopBar bell into a real consumer of `report.ready` (and

internal/eventbus/bus_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,8 @@ func TestEventKindEnum_HasExactlyTwoValues(t *testing.T) {
254254
// HostDiscovered (system-host-discovery PR 1.1),
255255
// IntelligenceEvent (system-os-intelligence PR 1.2),
256256
// ScanCompleted (api-host-scan / scan foundation),
257-
// RemediationCompleted (api-remediation execute/rollback).
257+
// RemediationCompleted (api-remediation execute/rollback),
258+
// ReportReady (api-reports async render, B3a).
258259
expected := map[EventKind]bool{
259260
EventKindHeartbeatPulse: false,
260261
EventKindDriftDetected: false,
@@ -264,6 +265,7 @@ func TestEventKindEnum_HasExactlyTwoValues(t *testing.T) {
264265
EventKindIntelligenceEvent: false,
265266
EventKindScanCompleted: false,
266267
EventKindRemediationCompleted: false,
268+
EventKindReportReady: false,
267269
}
268270
if len(AllEventKinds) != len(expected) {
269271
t.Errorf("AllEventKinds = %d, want %d", len(AllEventKinds), len(expected))

internal/eventbus/types.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,14 @@ const (
5757
// remediation queue + host compliance surfaces without polling.
5858
// Spec api-remediation.
5959
EventKindRemediationCompleted EventKind = "remediation.completed"
60+
61+
// EventKindReportReady is emitted by the report render worker once a
62+
// report's bulk faces (CSV / OSCAL SAR / PDF for an attestation) have
63+
// been rendered and cached 'ready' in report_faces. It is the first
64+
// producer of the in-app notification bell: a generator who kicked off
65+
// a fleet attestation learns the bundle is downloadable without polling.
66+
// Spec api-reports.
67+
EventKindReportReady EventKind = "report.ready"
6068
)
6169

6270
// AllEventKinds is the closed set, in registration order. Spec AC-07's
@@ -70,6 +78,7 @@ var AllEventKinds = []EventKind{
7078
EventKindIntelligenceEvent,
7179
EventKindScanCompleted,
7280
EventKindRemediationCompleted,
81+
EventKindReportReady,
7382
}
7483

7584
// Event is the contract every bus event satisfies. Implementations are
@@ -255,6 +264,26 @@ func (r RemediationCompleted) Kind() EventKind { return EventKindRemediationComp
255264
// Timestamp satisfies Event.
256265
func (r RemediationCompleted) Timestamp() time.Time { return r.CompletedAt }
257266

267+
// ReportReady is fired by the report render worker once a report's bulk
268+
// faces are rendered and cached. SnapshotID is the report; ReportKind is
269+
// the report kind (e.g. "attestation"); Faces lists the faces that were
270+
// rendered ready (e.g. ["csv","oscal_sar","pdf"]). GeneratedBy carries the
271+
// principal who generated the report so the notification can be routed to
272+
// the right operator.
273+
type ReportReady struct {
274+
SnapshotID uuid.UUID
275+
ReportKind string
276+
Faces []string
277+
GeneratedBy string
278+
OccurredAt time.Time
279+
}
280+
281+
// Kind satisfies Event.
282+
func (r ReportReady) Kind() EventKind { return EventKindReportReady }
283+
284+
// Timestamp satisfies Event.
285+
func (r ReportReady) Timestamp() time.Time { return r.OccurredAt }
286+
258287
// DefaultBufferSize is the per-subscriber channel buffer when
259288
// SubscribeOptions.BufferSize is zero. Spec C-04.
260289
const DefaultBufferSize = 1024

internal/report/job.go

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package report
2+
3+
// Async report rendering: the bulk attestation faces (CSV / OSCAL SAR /
4+
// PDF) can be expensive to assemble for a large fleet, so generating an
5+
// attestation enqueues a render job instead of blocking the request. A
6+
// worker claims the job, renders each face (warming the report_faces cache
7+
// and flipping each face's row from 'pending' to 'ready'), then publishes
8+
// a ReportReady event on the bus - the first producer of the in-app
9+
// notification bell.
10+
//
11+
// Export stays the lazy fallback: a download that arrives before the job
12+
// runs still renders the face inline (a cache miss), so async rendering is
13+
// a warm-the-cache-and-notify optimization, never a correctness gate.
14+
//
15+
// Spec: api-reports.
16+
17+
import (
18+
"context"
19+
"encoding/json"
20+
"fmt"
21+
"log/slog"
22+
"time"
23+
24+
"github.com/google/uuid"
25+
26+
"github.com/Hanalyx/openwatch/internal/eventbus"
27+
"github.com/Hanalyx/openwatch/internal/queue"
28+
)
29+
30+
// RenderJobType is the job_type for an async report-face render.
31+
const RenderJobType = "report.render"
32+
33+
// RenderPayload is the job payload: which snapshot to render faces for.
34+
type RenderPayload struct {
35+
SnapshotID uuid.UUID `json:"snapshot_id"`
36+
}
37+
38+
// attestationFaces are the bulk faces rendered asynchronously for an
39+
// attestation report, in render order.
40+
var attestationFaces = []string{FaceCSV, FaceOSCALSAR, FacePDF}
41+
42+
// markFacesPending inserts a 'pending' report_faces row for each face that
43+
// does not already exist, so the lifecycle status is genuine (the render
44+
// worker flips them to 'ready'). ON CONFLICT DO NOTHING leaves an already
45+
// rendered ('ready') face untouched.
46+
func (s *Service) markFacesPending(ctx context.Context, snapshotID uuid.UUID, faces []string) error {
47+
for _, face := range faces {
48+
mediaType := "application/octet-stream"
49+
switch face {
50+
case FaceCSV:
51+
mediaType = "text/csv"
52+
case FaceOSCALSAR, FaceJSON:
53+
mediaType = "application/json"
54+
case FacePDF:
55+
mediaType = "application/pdf"
56+
}
57+
if _, err := s.pool.Exec(ctx, `
58+
INSERT INTO report_faces (snapshot_id, face, media_type, size_bytes, status)
59+
VALUES ($1, $2, $3, 0, 'pending')
60+
ON CONFLICT (snapshot_id, face) DO NOTHING`,
61+
snapshotID, face, mediaType); err != nil {
62+
return fmt.Errorf("report: mark face pending: %w", err)
63+
}
64+
}
65+
return nil
66+
}
67+
68+
// enqueueRender marks the attestation's bulk faces pending and enqueues a
69+
// render job. Best-effort: a failure to enqueue is logged but does not fail
70+
// Generate, because Export still renders each face lazily on first
71+
// download (the async path is an optimization, not a correctness gate).
72+
func (s *Service) enqueueRender(ctx context.Context, snapshotID uuid.UUID) {
73+
if err := s.markFacesPending(ctx, snapshotID, attestationFaces); err != nil {
74+
slog.WarnContext(ctx, "report: mark faces pending failed",
75+
slog.String("snapshot_id", snapshotID.String()), slog.String("error", err.Error()))
76+
return
77+
}
78+
if _, err := queue.Enqueue(ctx, s.pool, RenderJobType, RenderPayload{SnapshotID: snapshotID}); err != nil {
79+
slog.WarnContext(ctx, "report: enqueue render job failed",
80+
slog.String("snapshot_id", snapshotID.String()), slog.String("error", err.Error()))
81+
}
82+
}
83+
84+
// RenderProcessor renders a report's faces for a claimed report.render job
85+
// and publishes ReportReady. It is registered on the in-process worker via
86+
// WithReportProcessor; its ProcessJob signature matches the worker's other
87+
// processors.
88+
type RenderProcessor struct {
89+
svc *Service
90+
bus *eventbus.Bus
91+
}
92+
93+
// NewRenderProcessor builds the processor over a report Service (for
94+
// Export) and an event bus (to publish ReportReady). A nil bus renders the
95+
// faces but publishes nothing.
96+
func NewRenderProcessor(svc *Service, bus *eventbus.Bus) *RenderProcessor {
97+
return &RenderProcessor{svc: svc, bus: bus}
98+
}
99+
100+
// ProcessJob renders every face that applies to the report's kind (warming
101+
// the cache and flipping each 'pending' row to 'ready' via Export's upsert)
102+
// and publishes a ReportReady event. A render error fails the job so it can
103+
// be retried; faces already rendered are idempotent (deterministic bytes).
104+
func (p *RenderProcessor) ProcessJob(ctx context.Context, j *queue.Job) {
105+
var payload RenderPayload
106+
if err := json.Unmarshal(j.Payload, &payload); err != nil {
107+
_ = queue.Fail(ctx, p.svc.pool, j.ID, fmt.Sprintf("report.render: payload decode: %v", err))
108+
return
109+
}
110+
if payload.SnapshotID == uuid.Nil {
111+
_ = queue.Fail(ctx, p.svc.pool, j.ID, "report.render: payload snapshot_id missing")
112+
return
113+
}
114+
115+
rep, err := p.svc.Get(ctx, payload.SnapshotID)
116+
if err != nil {
117+
// An unknown snapshot (e.g. deleted before the job ran) is terminal,
118+
// not retryable.
119+
_ = queue.Fail(ctx, p.svc.pool, j.ID, fmt.Sprintf("report.render: load snapshot: %v", err))
120+
return
121+
}
122+
123+
faces := facesForKind(rep.Kind)
124+
rendered := make([]string, 0, len(faces))
125+
for _, face := range faces {
126+
if _, _, err := p.svc.Export(ctx, payload.SnapshotID, face); err != nil {
127+
// Mark the face failed so the lifecycle reflects reality, then
128+
// fail the job for retry.
129+
_, _ = p.svc.pool.Exec(ctx,
130+
`UPDATE report_faces SET status = 'failed' WHERE snapshot_id = $1 AND face = $2 AND status = 'pending'`,
131+
payload.SnapshotID, face)
132+
_ = queue.Fail(ctx, p.svc.pool, j.ID, fmt.Sprintf("report.render: face %s: %v", face, err))
133+
return
134+
}
135+
rendered = append(rendered, face)
136+
}
137+
138+
if p.bus != nil {
139+
p.bus.Publish(ctx, eventbus.ReportReady{
140+
SnapshotID: rep.ID,
141+
ReportKind: string(rep.Kind),
142+
Faces: rendered,
143+
GeneratedBy: rep.GeneratedBy,
144+
OccurredAt: time.Now().UTC(),
145+
})
146+
}
147+
148+
if err := queue.Complete(ctx, p.svc.pool, j.ID); err != nil {
149+
slog.WarnContext(ctx, "report.render: complete failed",
150+
slog.String("job_id", j.ID.String()), slog.String("error", err.Error()))
151+
}
152+
}
153+
154+
// facesForKind lists the faces an async render produces for a report kind:
155+
// the bulk faces for an attestation; just the (cheap) PDF for an executive
156+
// so the executive path can also notify if ever enqueued. The JSON face is
157+
// always available lazily and is not pre-rendered.
158+
func facesForKind(kind Kind) []string {
159+
switch kind {
160+
case KindAttestation:
161+
return attestationFaces
162+
case KindExecutive:
163+
return []string{FacePDF}
164+
default:
165+
return nil
166+
}
167+
}

0 commit comments

Comments
 (0)