Skip to content

Commit e2673b8

Browse files
feat(worker): bounded scan-concurrency pool (scan N hosts at once) (#592)
The in-process worker drained the scan queue strictly one job at a time: dequeue -> run the full ~11-min scan -> dequeue next. A fleet sweep was therefore serial (96 hosts x 11 min ~= 17 h), even though the design already supports cross-host concurrency (the per-host advisory lock + SKIP LOCKED). Run up to [server].scan_concurrency claim/process loops at once (default 4, clamped >=1). Each loop claims a disjoint job via SKIP LOCKED, so N distinct hosts scan in parallel; the per-host advisory lock still serializes same-host work, so no new locking is needed. At concurrency 8 a 96-host sweep drops from ~17 h to ~2 h. Within each host the single-connection/many-sessions model is unchanged. Spec system-job-queue v1.1.0 (C-07/AC-12). Test: a DB-backed integration test proves exactly N run at once and the (N+1)th waits (it fails if the loop is forced serial), plus a clamp unit test. Also corrects SCALING_GUIDE.md, which wrongly claimed 'serve does not drain the scan-job queue' (it does, via WithScanProcessor), and documents the knob in the shipped openwatch.toml.
1 parent 163ae8e commit e2673b8

7 files changed

Lines changed: 213 additions & 26 deletions

File tree

docs/guides/SCALING_GUIDE.md

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,44 @@ OpenWatch has two long-lived processes and one database:
1717

1818
| Component | What it does | How you scale it today |
1919
|-----------|--------------|------------------------|
20-
| `openwatch serve` | HTTPS API + embedded UI + in-process schedulers (liveness, intelligence, discovery) | Vertical: more CPU/RAM on the host. The process is stateless apart from PostgreSQL, so horizontal API replicas are possible in principle but not yet packaged (see "Not yet implemented"). |
21-
| `openwatch worker` | Drains the PostgreSQL scan-job queue and runs Kensa scans over SSH | Run more `openwatch worker` processes against the same database. The queue uses `SELECT ... FOR UPDATE SKIP LOCKED`, so multiple workers cooperate without double-claiming a job. |
20+
| `openwatch serve` | HTTPS API + embedded UI + in-process schedulers (liveness, intelligence, discovery) **and an in-process worker that drains the scan-job queue** | Raise `[server].scan_concurrency` (how many scans run at once in this process); then vertical CPU/RAM. Stateless apart from PostgreSQL. |
21+
| `openwatch worker` | An **optional, additional** process that also drains the scan-job queue and runs Kensa scans over SSH | Run one or more for extra/off-box capacity. The queue uses `SELECT ... FOR UPDATE SKIP LOCKED`, so the serve worker and any `openwatch worker` processes cooperate without double-claiming a job. |
2222
| PostgreSQL | All state: hosts, scans, transactions, audit events, queue | Vertical first (CPU, RAM, faster disk), then tune `max_connections` and the OpenWatch pool size. |
2323

24-
The scan worker is a separate process from the API server. `openwatch serve`
25-
runs the liveness, intelligence, and discovery schedulers in-process, but it
26-
does **not** drain the scan-job queue. You must run `openwatch worker`
27-
separately for scans to execute. Verify the split in `cmd/openwatch/main.go`
28-
(`cmdServe`) and `cmd/openwatch/worker.go` (`cmdWorker`).
24+
`openwatch serve` runs an in-process worker that **does** drain the scan-job
25+
queue — the single-binary deployment scans with no extra process. By default it
26+
runs **`scan_concurrency` (4) scans concurrently** (`internal/worker/worker.go`,
27+
wired in `internal/server/server.go`). A separate `openwatch worker` is
28+
optional, for additional or off-box capacity.
2929

3030
## Scaling the scan workers
3131

3232
Scans are the most resource-intensive work OpenWatch does: each one opens an SSH
3333
session to a target host and runs Kensa's native YAML checks. Worker throughput
3434
is the usual first bottleneck.
3535

36+
### Scan concurrency (the first knob to turn)
37+
38+
The in-process worker runs `[server].scan_concurrency` scan loops at once
39+
(default `4`). Each loop independently claims a job with `SKIP LOCKED`, so up to
40+
that many **different hosts** scan in parallel; a per-host advisory lock still
41+
prevents two scans of the **same** host from overlapping. This is the simplest
42+
way to clear a large queue — one config value, no extra processes:
43+
44+
```toml
45+
# /etc/openwatch/openwatch.toml
46+
[server]
47+
scan_concurrency = 8
48+
```
49+
50+
Restart `openwatch` to apply. Sizing: scans are SSH/IO-bound (they spend most of
51+
their time waiting on the remote host), so concurrency can comfortably exceed
52+
CPU core count. Mind two ceilings — the PostgreSQL pool (`[database].max_connections`
53+
/ pool size: each in-flight scan uses a connection plus the advisory-lock
54+
transaction) and how many simultaneous SSH sessions your targets and network
55+
tolerate. `8``16` is a reasonable range for a few dozen to a few hundred hosts;
56+
set it to `1` to restore strictly one-at-a-time draining.
57+
3658
### Run more worker processes
3759

3860
The scan queue is PostgreSQL-native and claims one job at a time per worker with

internal/config/config.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,11 @@ type ServerConfig struct {
4646
Listen string `toml:"listen"`
4747
TLSCert string `toml:"tls_cert"`
4848
TLSKey string `toml:"tls_key"`
49+
// ScanConcurrency is how many scan/job loops the in-process worker runs
50+
// at once. The PostgreSQL queue (SKIP LOCKED) and the per-host advisory
51+
// lock make concurrent draining safe; this only bounds the fan-out. Set
52+
// to 1 to restore strictly-serial draining. Default 4.
53+
ScanConcurrency int `toml:"scan_concurrency"`
4954
}
5055

5156
// DatabaseConfig governs the PostgreSQL connection.
@@ -65,9 +70,10 @@ type LoggingConfig struct {
6570
func Defaults() *Config {
6671
return &Config{
6772
Server: ServerConfig{
68-
Listen: "0.0.0.0:8443",
69-
TLSCert: "/etc/openwatch/tls/cert.pem",
70-
TLSKey: "/etc/openwatch/tls/key.pem",
73+
Listen: "0.0.0.0:8443",
74+
TLSCert: "/etc/openwatch/tls/cert.pem",
75+
TLSKey: "/etc/openwatch/tls/key.pem",
76+
ScanConcurrency: 4,
7177
},
7278
Database: DatabaseConfig{
7379
DSN: "postgres://openwatch@localhost/openwatch?sslmode=disable",

internal/server/server.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -303,12 +303,14 @@ func New(cfg *config.Config, pool *pgxpool.Pool) *Server {
303303
},
304304
}
305305

306-
// Stage-0 in-process worker that drains diagnostics.test_job from the
307-
// queue. Started by Run, stopped on shutdown. Spec
306+
// In-process worker that drains the job queue (diagnostics, discovery,
307+
// and — via WithScanProcessor — scans). Started by Run, stopped on
308+
// shutdown. ScanConcurrency fans it out so a fleet of queued scans does
309+
// not drain one host at a time (system-job-queue C-07). Spec
308310
// release-stage-0-signoff AC-10.
309311
var wkr *worker.Worker
310312
if pool != nil {
311-
wkr = worker.New(pool)
313+
wkr = worker.New(pool).WithConcurrency(cfg.Server.ScanConcurrency)
312314
}
313315
return &Server{cfg: cfg, router: r, srv: srv, cm: cm, wkr: wkr, handlers: apiHandlers}
314316
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
// @spec system-job-queue
2+
//
3+
// Bounded-concurrency drain: the in-process worker runs up to N claim/process
4+
// loops at once so a fleet of queued jobs does not drain one at a time. SKIP
5+
// LOCKED gives each loop a disjoint job; the per-host advisory lock (covered by
6+
// scan_worker_test.go) still serializes same-host scans.
7+
8+
package worker
9+
10+
import (
11+
"context"
12+
"testing"
13+
"time"
14+
15+
"github.com/Hanalyx/openwatch/internal/correlation"
16+
"github.com/Hanalyx/openwatch/internal/db/dbtest"
17+
"github.com/Hanalyx/openwatch/internal/queue"
18+
"github.com/google/uuid"
19+
)
20+
21+
// blockingDiscovery signals each invocation on started, then blocks until
22+
// release is closed. host.discovery is a convenient vehicle: it has no per-host
23+
// lock, so it isolates the worker's fan-out from the scan path's serialization.
24+
type blockingDiscovery struct {
25+
started chan struct{}
26+
release chan struct{}
27+
}
28+
29+
func (b *blockingDiscovery) RunDiscovery(_ context.Context, _ uuid.UUID) error {
30+
b.started <- struct{}{}
31+
<-b.release
32+
return nil
33+
}
34+
35+
// @ac AC-12
36+
// AC-12: with WithConcurrency(N) and N+1 blocking jobs, exactly N run at once
37+
// and the (N+1)th waits for a free loop.
38+
func TestWorker_BoundedConcurrency(t *testing.T) {
39+
t.Run("system-job-queue/AC-12", func(t *testing.T) {
40+
pool := dbtest.Pool(t)
41+
const n = 3
42+
d := &blockingDiscovery{started: make(chan struct{}, 16), release: make(chan struct{})}
43+
w := New(pool).WithDiscovery(d).WithConcurrency(n)
44+
45+
// Enqueue N+1 host.discovery jobs for distinct hosts.
46+
for i := 0; i < n+1; i++ {
47+
ctx := correlation.Set(context.Background(), correlation.Generate("test"))
48+
// Pass the map directly — Enqueue marshals it (passing pre-marshaled
49+
// bytes would double-encode into a JSON string).
50+
body := map[string]string{"host_id": uuid.NewString()}
51+
if _, err := queue.Enqueue(ctx, pool, "host.discovery", body); err != nil {
52+
t.Fatalf("enqueue %d: %v", i, err)
53+
}
54+
}
55+
56+
w.Start(context.Background())
57+
defer w.Stop()
58+
59+
// Exactly N jobs enter RunDiscovery concurrently.
60+
for i := 0; i < n; i++ {
61+
select {
62+
case <-d.started:
63+
case <-time.After(5 * time.Second):
64+
t.Fatalf("only %d/%d jobs started concurrently", i, n)
65+
}
66+
}
67+
// The (N+1)th must NOT have started — fan-out is bounded at N.
68+
select {
69+
case <-d.started:
70+
t.Fatal("more than N jobs ran at once — concurrency not bounded")
71+
case <-time.After(400 * time.Millisecond):
72+
// good: bounded
73+
}
74+
75+
// Free the in-flight loops; the (N+1)th now claims a freed loop.
76+
close(d.release)
77+
select {
78+
case <-d.started:
79+
case <-time.After(5 * time.Second):
80+
t.Fatal("the (N+1)th job never ran after release — a freed loop did not pick it up")
81+
}
82+
})
83+
}
84+
85+
// @ac AC-12
86+
// AC-12 (clamp): a concurrency < 1 clamps to 1 so the default worker stays
87+
// strictly serial; a positive value is kept.
88+
func TestWorker_WithConcurrencyClamps(t *testing.T) {
89+
t.Run("system-job-queue/AC-12", func(t *testing.T) {
90+
cases := map[int]int{0: 1, -5: 1, 1: 1, 8: 8}
91+
for in, want := range cases {
92+
if got := New(nil).WithConcurrency(in).concurrency; got != want {
93+
t.Errorf("WithConcurrency(%d) = %d, want %d", in, got, want)
94+
}
95+
}
96+
// New defaults to serial.
97+
if got := New(nil).concurrency; got != 1 {
98+
t.Errorf("New default concurrency = %d, want 1", got)
99+
}
100+
})
101+
}

internal/worker/worker.go

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,23 +42,44 @@ type HostDiscoveryRunner interface {
4242

4343
// Worker drains pending jobs from job_queue. One Worker per process is
4444
// enough for Stage 0; multi-worker setups are Stage 2.
45+
//
46+
// The worker can run several claim/process loops concurrently
47+
// (WithConcurrency) so a fleet of queued scans does not drain one host at a
48+
// time; the queue's SKIP LOCKED claim and the scan path's per-host advisory
49+
// lock keep concurrent draining safe (system-job-queue C-07).
4550
type Worker struct {
46-
pool *pgxpool.Pool
47-
stop chan struct{}
48-
wg sync.WaitGroup
49-
discovery HostDiscoveryRunner
50-
scanProc *ScanWorker
51+
pool *pgxpool.Pool
52+
stop chan struct{}
53+
wg sync.WaitGroup
54+
discovery HostDiscoveryRunner
55+
scanProc *ScanWorker
56+
concurrency int
5157
}
5258

5359
// New constructs a Worker bound to the given pool. Call Start to begin
54-
// the drain loop and Stop to exit cleanly.
60+
// the drain loop and Stop to exit cleanly. Defaults to one (serial) loop;
61+
// call WithConcurrency to fan out.
5562
func New(pool *pgxpool.Pool) *Worker {
5663
return &Worker{
57-
pool: pool,
58-
stop: make(chan struct{}),
64+
pool: pool,
65+
stop: make(chan struct{}),
66+
concurrency: 1,
5967
}
6068
}
6169

70+
// WithConcurrency sets how many claim/process loops run at once. A value < 1
71+
// clamps to 1 (strictly serial). Each loop independently claims jobs via
72+
// SKIP LOCKED, so N loops process up to N distinct hosts in parallel while the
73+
// per-host advisory lock still serializes same-host work. Spec
74+
// system-job-queue C-07.
75+
func (w *Worker) WithConcurrency(n int) *Worker {
76+
if n < 1 {
77+
n = 1
78+
}
79+
w.concurrency = n
80+
return w
81+
}
82+
6283
// WithDiscovery registers the OS Discovery runner. When set, the
6384
// worker processes host.discovery jobs by calling Discover; nil keeps
6485
// the legacy behavior (host.discovery fails as unsupported).
@@ -81,11 +102,18 @@ func (w *Worker) WithScanProcessor(sw *ScanWorker) *Worker {
81102
return w
82103
}
83104

84-
// Start kicks off the drain loop on a background goroutine. Returns
85-
// immediately. Safe to call once per Worker.
105+
// Start kicks off w.concurrency drain loops on background goroutines. Returns
106+
// immediately. Safe to call once per Worker. Each loop claims jobs
107+
// independently (SKIP LOCKED), so up to w.concurrency jobs run at once.
86108
func (w *Worker) Start(ctx context.Context) {
87-
w.wg.Add(1)
88-
go w.loop(ctx)
109+
n := w.concurrency
110+
if n < 1 {
111+
n = 1
112+
}
113+
w.wg.Add(n)
114+
for i := 0; i < n; i++ {
115+
go w.loop(ctx)
116+
}
89117
}
90118

91119
// Stop signals the loop to exit and waits for the in-flight drain (if

packaging/common/openwatch.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,11 @@
1111
listen = "0.0.0.0:8443"
1212
tls_cert = "/etc/openwatch/tls/cert.pem"
1313
tls_key = "/etc/openwatch/tls/key.pem"
14+
# How many compliance scans run at once in this process (different hosts in
15+
# parallel; same-host scans never overlap). Default 4. Raise for large fleets,
16+
# minding the DB pool and how many SSH sessions your targets tolerate; set 1
17+
# for strictly one-at-a-time. See docs/guides/SCALING_GUIDE.md.
18+
scan_concurrency = 4
1419

1520
[database]
1621
# Replace with a real DSN. The package does not provision Postgres.

specs/system/job-queue.spec.yaml

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
spec:
22
id: system-job-queue
33
title: PostgreSQL job queue with correlation propagation
4-
version: "1.0.0"
4+
version: "1.1.0"
55
status: approved
66
tier: 1
77

@@ -62,6 +62,20 @@ spec:
6262
description: Forbidigo lint MUST reject raw "INSERT INTO job_queue" outside internal/queue/ and "http.DefaultClient" outside internal/httpclient/
6363
type: technical
6464
enforcement: error
65+
- id: C-07
66+
description: >-
67+
The in-process worker MUST be able to run up to a configured number of
68+
claim/process loops CONCURRENTLY (ServerConfig.ScanConcurrency, default
69+
4, clamped to >= 1), so a fleet of queued scans does not drain strictly
70+
one host at a time. Concurrency is safe by construction and adds NO new
71+
locking: SKIP LOCKED (C-04) guarantees the N loops claim disjoint jobs,
72+
and the scan path's per-host advisory lock (system-worker-subcommand
73+
C-09) still serializes any two jobs for the SAME host. Each loop keeps
74+
the existing per-job contract (a fresh correlation context per job,
75+
C-02). The default is conservative; operators tune it up for large
76+
fleets or down to 1 to restore strictly-serial draining.
77+
type: technical
78+
enforcement: error
6579

6680
acceptance_criteria:
6781
- id: AC-01
@@ -105,3 +119,12 @@ spec:
105119
description: Lint forbids raw "INSERT INTO job_queue" outside internal/queue/ and "http.DefaultClient" outside internal/httpclient/.
106120
priority: high
107121
references_constraints: [C-06]
122+
- id: AC-12
123+
description: >-
124+
With Worker.WithConcurrency(N) and N pending jobs whose processing
125+
blocks, the worker runs exactly N jobs concurrently and holds the
126+
(N+1)th pending until one finishes (bounded fan-out). Stop waits for all
127+
in-flight loops to drain. WithConcurrency clamps a value < 1 up to 1, so
128+
the default-1 worker stays strictly serial.
129+
priority: critical
130+
references_constraints: [C-07]

0 commit comments

Comments
 (0)