Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion cmd/openwatch/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,16 @@ func cmdServe(cfg *config.Config, _ []string, stdout, stderr *os.File) int {
slog.String("key_id", reportSigner.KeyID()))
}

// Report service + async render processor. The service enqueues a
// report.render job for each generated attestation (WithAsyncRender);
// the processor, registered on the in-process worker, renders the bulk
// faces and publishes ReportReady on the bus.
reportSvc := report.NewService(pool).
WithGroups(group.NewService(pool)).
WithSigner(reportSigner).
WithAsyncRender()
reportRenderProc := report.NewRenderProcessor(reportSvc, bus)

srv := server.New(cfg, pool).
WithConnectivityConfig(cfgStore, liveSvc).
WithDiscovery(discoSvc).
Expand All @@ -669,7 +679,8 @@ func cmdServe(cfg *config.Config, _ []string, stdout, stderr *os.File) int {
WithExceptions(exceptionSvc).
WithRemediation(remediationSvc).
WithGroups(group.NewService(pool)).
WithReports(report.NewService(pool).WithGroups(group.NewService(pool)).WithSigner(reportSigner)).
WithReports(reportSvc).
WithReportWorker(reportRenderProc).
WithScanResults(scanresult.NewReader(pool)).
WithNotifications(notifSvc)
runErr := srv.Run(ctx)
Expand Down
17 changes: 11 additions & 6 deletions docs/engineering/reports_design.md
Original file line number Diff line number Diff line change
Expand Up @@ -553,13 +553,18 @@ framework-lensed), so the PDF stays bounded. Cached in `report_faces` (face
`pdf`) like the others. Spec: `api-reports` v1.9.0 (C-15 / AC-21; C-10
updated: pdf kind-dispatched, not executive-only).

**B3a — Async generation + report.ready.** *(REMAINING.)* Fleet attestation
generation (the bulk query + SAR/CSV/PDF render) moves to the job queue: a
`FleetReportJobType` + payload + a worker processor that flips
`report_faces` status `pending → ready` and publishes
**B3a — Async generation + report.ready.** *(SHIPPED 2026-06-21, PR #646.)*
Generating an attestation marks its bulk faces (`csv`, `oscal_sar`, `pdf`)
`pending` in `report_faces` and enqueues a `report.render` job
(`internal/report/job.go`), returning immediately (the executive summary
stays synchronous). A `RenderProcessor` registered on the in-process worker
(`worker.WithReportProcessor`) claims the job, renders each face via
`Export` (flipping `pending → ready`; a render error marks the face
`failed` and fails the job for retry), and publishes
`EventKindReportReady` on the event bus — **the in-app notification bell's
first producer** (closes that coupling). Spec: `system-report-faces`
(async + status), eventbus types.
first producer**. Async is an optimization, not a correctness gate: `Export`
stays the lazy fallback so a download before the job runs still renders
inline. Spec: `api-reports` v1.10.0 (C-16 / AC-22) + the new eventbus kind.

**B3c — Notification bell (frontend).** *(REMAINING; needs product input.)*
Turn the stubbed TopBar bell into a real consumer of `report.ready` (and
Expand Down
4 changes: 3 additions & 1 deletion internal/eventbus/bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,8 @@ func TestEventKindEnum_HasExactlyTwoValues(t *testing.T) {
// HostDiscovered (system-host-discovery PR 1.1),
// IntelligenceEvent (system-os-intelligence PR 1.2),
// ScanCompleted (api-host-scan / scan foundation),
// RemediationCompleted (api-remediation execute/rollback).
// RemediationCompleted (api-remediation execute/rollback),
// ReportReady (api-reports async render, B3a).
expected := map[EventKind]bool{
EventKindHeartbeatPulse: false,
EventKindDriftDetected: false,
Expand All @@ -264,6 +265,7 @@ func TestEventKindEnum_HasExactlyTwoValues(t *testing.T) {
EventKindIntelligenceEvent: false,
EventKindScanCompleted: false,
EventKindRemediationCompleted: false,
EventKindReportReady: false,
}
if len(AllEventKinds) != len(expected) {
t.Errorf("AllEventKinds = %d, want %d", len(AllEventKinds), len(expected))
Expand Down
29 changes: 29 additions & 0 deletions internal/eventbus/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ const (
// remediation queue + host compliance surfaces without polling.
// Spec api-remediation.
EventKindRemediationCompleted EventKind = "remediation.completed"

// EventKindReportReady is emitted by the report render worker once a
// report's bulk faces (CSV / OSCAL SAR / PDF for an attestation) have
// been rendered and cached 'ready' in report_faces. It is the first
// producer of the in-app notification bell: a generator who kicked off
// a fleet attestation learns the bundle is downloadable without polling.
// Spec api-reports.
EventKindReportReady EventKind = "report.ready"
)

// AllEventKinds is the closed set, in registration order. Spec AC-07's
Expand All @@ -70,6 +78,7 @@ var AllEventKinds = []EventKind{
EventKindIntelligenceEvent,
EventKindScanCompleted,
EventKindRemediationCompleted,
EventKindReportReady,
}

// Event is the contract every bus event satisfies. Implementations are
Expand Down Expand Up @@ -255,6 +264,26 @@ func (r RemediationCompleted) Kind() EventKind { return EventKindRemediationComp
// Timestamp satisfies Event.
func (r RemediationCompleted) Timestamp() time.Time { return r.CompletedAt }

// ReportReady is fired by the report render worker once a report's bulk
// faces are rendered and cached. SnapshotID is the report; ReportKind is
// the report kind (e.g. "attestation"); Faces lists the faces that were
// rendered ready (e.g. ["csv","oscal_sar","pdf"]). GeneratedBy carries the
// principal who generated the report so the notification can be routed to
// the right operator.
type ReportReady struct {
SnapshotID uuid.UUID
ReportKind string
Faces []string
GeneratedBy string
OccurredAt time.Time
}

// Kind satisfies Event.
func (r ReportReady) Kind() EventKind { return EventKindReportReady }

// Timestamp satisfies Event.
func (r ReportReady) Timestamp() time.Time { return r.OccurredAt }

// DefaultBufferSize is the per-subscriber channel buffer when
// SubscribeOptions.BufferSize is zero. Spec C-04.
const DefaultBufferSize = 1024
Expand Down
167 changes: 167 additions & 0 deletions internal/report/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package report

// Async report rendering: the bulk attestation faces (CSV / OSCAL SAR /
// PDF) can be expensive to assemble for a large fleet, so generating an
// attestation enqueues a render job instead of blocking the request. A
// worker claims the job, renders each face (warming the report_faces cache
// and flipping each face's row from 'pending' to 'ready'), then publishes
// a ReportReady event on the bus - the first producer of the in-app
// notification bell.
//
// Export stays the lazy fallback: a download that arrives before the job
// runs still renders the face inline (a cache miss), so async rendering is
// a warm-the-cache-and-notify optimization, never a correctness gate.
//
// Spec: api-reports.

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"time"

"github.com/google/uuid"

"github.com/Hanalyx/openwatch/internal/eventbus"
"github.com/Hanalyx/openwatch/internal/queue"
)

// RenderJobType is the job_type for an async report-face render.
const RenderJobType = "report.render"

// RenderPayload is the job payload: which snapshot to render faces for.
type RenderPayload struct {
SnapshotID uuid.UUID `json:"snapshot_id"`
}

// attestationFaces are the bulk faces rendered asynchronously for an
// attestation report, in render order.
var attestationFaces = []string{FaceCSV, FaceOSCALSAR, FacePDF}

// markFacesPending inserts a 'pending' report_faces row for each face that
// does not already exist, so the lifecycle status is genuine (the render
// worker flips them to 'ready'). ON CONFLICT DO NOTHING leaves an already
// rendered ('ready') face untouched.
func (s *Service) markFacesPending(ctx context.Context, snapshotID uuid.UUID, faces []string) error {
for _, face := range faces {
mediaType := "application/octet-stream"
switch face {
case FaceCSV:
mediaType = "text/csv"
case FaceOSCALSAR, FaceJSON:
mediaType = "application/json"
case FacePDF:
mediaType = "application/pdf"
}
if _, err := s.pool.Exec(ctx, `
INSERT INTO report_faces (snapshot_id, face, media_type, size_bytes, status)
VALUES ($1, $2, $3, 0, 'pending')
ON CONFLICT (snapshot_id, face) DO NOTHING`,
snapshotID, face, mediaType); err != nil {
return fmt.Errorf("report: mark face pending: %w", err)
}
}
return nil
}

// enqueueRender marks the attestation's bulk faces pending and enqueues a
// render job. Best-effort: a failure to enqueue is logged but does not fail
// Generate, because Export still renders each face lazily on first
// download (the async path is an optimization, not a correctness gate).
func (s *Service) enqueueRender(ctx context.Context, snapshotID uuid.UUID) {
if err := s.markFacesPending(ctx, snapshotID, attestationFaces); err != nil {
slog.WarnContext(ctx, "report: mark faces pending failed",
slog.String("snapshot_id", snapshotID.String()), slog.String("error", err.Error()))
return
}
if _, err := queue.Enqueue(ctx, s.pool, RenderJobType, RenderPayload{SnapshotID: snapshotID}); err != nil {
slog.WarnContext(ctx, "report: enqueue render job failed",
slog.String("snapshot_id", snapshotID.String()), slog.String("error", err.Error()))
}
}

// RenderProcessor renders a report's faces for a claimed report.render job
// and publishes ReportReady. It is registered on the in-process worker via
// WithReportProcessor; its ProcessJob signature matches the worker's other
// processors.
type RenderProcessor struct {
svc *Service
bus *eventbus.Bus
}

// NewRenderProcessor builds the processor over a report Service (for
// Export) and an event bus (to publish ReportReady). A nil bus renders the
// faces but publishes nothing.
func NewRenderProcessor(svc *Service, bus *eventbus.Bus) *RenderProcessor {
return &RenderProcessor{svc: svc, bus: bus}
}

// ProcessJob renders every face that applies to the report's kind (warming
// the cache and flipping each 'pending' row to 'ready' via Export's upsert)
// and publishes a ReportReady event. A render error fails the job so it can
// be retried; faces already rendered are idempotent (deterministic bytes).
func (p *RenderProcessor) ProcessJob(ctx context.Context, j *queue.Job) {
var payload RenderPayload
if err := json.Unmarshal(j.Payload, &payload); err != nil {
_ = queue.Fail(ctx, p.svc.pool, j.ID, fmt.Sprintf("report.render: payload decode: %v", err))
return
}
if payload.SnapshotID == uuid.Nil {
_ = queue.Fail(ctx, p.svc.pool, j.ID, "report.render: payload snapshot_id missing")
return
}

rep, err := p.svc.Get(ctx, payload.SnapshotID)
if err != nil {
// An unknown snapshot (e.g. deleted before the job ran) is terminal,
// not retryable.
_ = queue.Fail(ctx, p.svc.pool, j.ID, fmt.Sprintf("report.render: load snapshot: %v", err))
return
}

faces := facesForKind(rep.Kind)
rendered := make([]string, 0, len(faces))
for _, face := range faces {
if _, _, err := p.svc.Export(ctx, payload.SnapshotID, face); err != nil {
// Mark the face failed so the lifecycle reflects reality, then
// fail the job for retry.
_, _ = p.svc.pool.Exec(ctx,
`UPDATE report_faces SET status = 'failed' WHERE snapshot_id = $1 AND face = $2 AND status = 'pending'`,
payload.SnapshotID, face)
_ = queue.Fail(ctx, p.svc.pool, j.ID, fmt.Sprintf("report.render: face %s: %v", face, err))
return
}
rendered = append(rendered, face)
}

if p.bus != nil {
p.bus.Publish(ctx, eventbus.ReportReady{
SnapshotID: rep.ID,
ReportKind: string(rep.Kind),
Faces: rendered,
GeneratedBy: rep.GeneratedBy,
OccurredAt: time.Now().UTC(),
})
}

if err := queue.Complete(ctx, p.svc.pool, j.ID); err != nil {
slog.WarnContext(ctx, "report.render: complete failed",
slog.String("job_id", j.ID.String()), slog.String("error", err.Error()))
}
}

// facesForKind lists the faces an async render produces for a report kind:
// the bulk faces for an attestation; just the (cheap) PDF for an executive
// so the executive path can also notify if ever enqueued. The JSON face is
// always available lazily and is not pre-rendered.
func facesForKind(kind Kind) []string {
switch kind {
case KindAttestation:
return attestationFaces
case KindExecutive:
return []string{FacePDF}
default:
return nil
}
}
Loading
Loading