Skip to content

Commit 36b8d68

Browse files
feat: background worker pools + multi-entrypoint
Lifts the remaining constraints on background workers: - Pools: named bg workers can now declare num > 1 (pool of threads per worker) and max_threads > 1. The Caddyfile-level rejections in unmarshalWorker are dropped. - Per-thread stop-pipe: the write fd moved from worker to handler. Each thread in a pool gets its own stop pipe, so drain() can wake them independently. Pools no longer overwrite one another's fd through the shared worker struct. - Multi-entrypoint: multiple named bg workers in the same scope can share the same entrypoint file. Drops the filename-uniqueness rejection in newWorker (it was already skipped via allowPathMatching, this lifts the last Caddyfile-level path check that prevented two named bg workers pointing at the same fixture). Tests: - TestBackgroundWorkerPool: declares num=3, asserts 3 distinct sentinel files appear (each thread tempnam()'s a unique file). - TestBackgroundWorkerMultiEntrypoint: two named bg workers share one entrypoint file; both Init successfully and produce sentinels.
1 parent 8aa81a2 commit 36b8d68

5 files changed

Lines changed: 131 additions & 21 deletions

File tree

background_worker_pool_test.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package frankenphp_test
2+
3+
import (
4+
"os"
5+
"path/filepath"
6+
"testing"
7+
"time"
8+
9+
"github.com/dunglas/frankenphp"
10+
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
// countSentinels returns the number of files under dir.
15+
func countSentinels(t *testing.T, dir string) int {
16+
t.Helper()
17+
entries, err := os.ReadDir(dir)
18+
if err != nil {
19+
t.Fatalf("read sentinel dir %q: %v", dir, err)
20+
}
21+
return len(entries)
22+
}
23+
24+
// TestBackgroundWorkerPool declares a named bg worker with num=3 (pool of
25+
// three threads). Each thread touches a unique sentinel under
26+
// BG_SENTINEL_DIR via tempnam(), so the test can assert that all three
27+
// pool threads booted independently. Covers the lifted num>1 +
28+
// max_threads>1 constraints and the per-thread stop pipe.
29+
func TestBackgroundWorkerPool(t *testing.T) {
30+
cwd, _ := os.Getwd()
31+
testDataDir := cwd + "/testdata/"
32+
33+
tmp := t.TempDir()
34+
35+
require.NoError(t, frankenphp.Init(
36+
frankenphp.WithWorkers("pool-worker", testDataDir+"background-worker-pool.php", 3,
37+
frankenphp.WithWorkerBackground(),
38+
frankenphp.WithWorkerMaxThreads(3),
39+
frankenphp.WithWorkerEnv(map[string]string{"BG_SENTINEL_DIR": tmp}),
40+
),
41+
frankenphp.WithNumThreads(6),
42+
))
43+
t.Cleanup(frankenphp.Shutdown)
44+
45+
require.Eventually(t, func() bool {
46+
return countSentinels(t, tmp) == 3
47+
}, 5*time.Second, 25*time.Millisecond,
48+
"expected 3 distinct pool sentinels, got %d", countSentinels(t, tmp))
49+
}
50+
51+
// TestBackgroundWorkerMultiEntrypoint declares two named bg workers that
52+
// share the same entrypoint file. Each gets its own registry entry, so
53+
// both Init successfully (no filename-collision rejection) and both
54+
// produce sentinels.
55+
func TestBackgroundWorkerMultiEntrypoint(t *testing.T) {
56+
cwd, _ := os.Getwd()
57+
testDataDir := cwd + "/testdata/"
58+
59+
tmp := t.TempDir()
60+
61+
require.NoError(t, frankenphp.Init(
62+
frankenphp.WithWorkers("shared-a", testDataDir+"background-worker-named.php", 1,
63+
frankenphp.WithWorkerBackground(),
64+
frankenphp.WithWorkerEnv(map[string]string{"BG_SENTINEL_DIR": tmp}),
65+
),
66+
frankenphp.WithWorkers("shared-b", testDataDir+"background-worker-named.php", 1,
67+
frankenphp.WithWorkerBackground(),
68+
frankenphp.WithWorkerEnv(map[string]string{"BG_SENTINEL_DIR": tmp}),
69+
),
70+
frankenphp.WithNumThreads(5),
71+
))
72+
t.Cleanup(frankenphp.Shutdown)
73+
74+
for _, name := range []string{"shared-a", "shared-b"} {
75+
assert.Eventually(t, func() bool {
76+
_, err := os.Stat(filepath.Join(tmp, name))
77+
return err == nil
78+
}, 5*time.Second, 25*time.Millisecond,
79+
"shared bg worker %q did not touch its sentinel", name)
80+
}
81+
}

caddy/workerconfig.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -164,15 +164,10 @@ func unmarshalWorker(d *caddyfile.Dispenser) (workerConfig, error) {
164164
}
165165

166166
if wc.Background {
167-
if wc.Num > 1 {
168-
return wc, d.Err(`"num" > 1 is not yet supported for background workers`)
169-
}
170-
// For named bg workers, max_threads is threads-per-worker (>1 not
171-
// yet supported). For the catch-all (no name), it's the cap on
172-
// lazy-started instance count, which is a legitimate user knob.
173-
if wc.Name != "" && wc.MaxThreads > 1 {
174-
return wc, d.Err(`"max_threads" > 1 is not yet supported for named background workers`)
175-
}
167+
// Named bg workers: num and max_threads mean "threads per worker"
168+
// (pool size and pool cap). Catch-all bg workers: num is the pool
169+
// size of the placeholder instance (usually 1), max_threads caps
170+
// how many distinct names can be lazy-started.
176171
if len(wc.MatchPath) != 0 {
177172
return wc, d.Err(`"match" is not supported for background workers`)
178173
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?php
2+
3+
// Long-lived bg worker: disable PHP max_execution_time so the 30s default
4+
// cannot interrupt the stream_select park. The C side calls
5+
// zend_unset_timeout() too, but the belt-and-suspenders here covers PHP
6+
// builds where that path does not fully disarm the timer.
7+
set_time_limit(0);
8+
9+
// Pool worker: num > 1 means multiple threads share this worker name.
10+
// Each thread runs in its own ZTS context, but getmypid() returns the
11+
// shared process pid. We use tempnam() to atomically create a unique
12+
// file per thread so the test can count N distinct booted threads.
13+
if (!empty($_SERVER['BG_SENTINEL_DIR'])) {
14+
@tempnam($_SERVER['BG_SENTINEL_DIR'], 'pool');
15+
}
16+
17+
$stream = frankenphp_get_worker_handle();
18+
if ($stream !== null) {
19+
$read = [$stream];
20+
$write = null;
21+
$except = null;
22+
stream_select($read, $write, $except, null);
23+
}

threadbackgroundworker.go

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"log/slog"
99
"path/filepath"
1010
"strings"
11+
"sync/atomic"
1112
"time"
1213

1314
"github.com/dunglas/frankenphp/internal/state"
@@ -26,6 +27,11 @@ type backgroundWorkerThread struct {
2627
dummyFrankenPHPContext *frankenPHPContext
2728
dummyContext context.Context
2829
failureCount int
30+
31+
// stopFdWrite is the write end of THIS thread's stop pipe. Per-thread
32+
// so pool workers (num > 1) can each be drained independently; the
33+
// bg script uses the read end via frankenphp_get_worker_handle().
34+
stopFdWrite atomic.Int32
2935
}
3036

3137
func convertToBackgroundWorkerThread(thread *phpThread, worker *worker) {
@@ -34,6 +40,7 @@ func convertToBackgroundWorkerThread(thread *phpThread, worker *worker) {
3440
thread: thread,
3541
worker: worker,
3642
}
43+
handler.stopFdWrite.Store(-1)
3744
thread.setHandler(handler)
3845
worker.attachThread(thread)
3946
}
@@ -56,9 +63,10 @@ func (handler *backgroundWorkerThread) context() context.Context {
5663
// drain is called by drainWorkerThreads (and thread.shutdown) right before
5764
// drainChan is closed. We close the stop-pipe's write end so the PHP worker
5865
// script, which is typically parked in stream_select on the read end, wakes
59-
// up and can finish its loop gracefully.
66+
// up and can finish its loop gracefully. Per-thread fd so pool workers
67+
// drain their threads independently.
6068
func (handler *backgroundWorkerThread) drain() {
61-
if fd := handler.worker.backgroundStopFdWrite.Swap(-1); fd >= 0 {
69+
if fd := handler.stopFdWrite.Swap(-1); fd >= 0 {
6270
C.frankenphp_worker_close_fd(C.int(fd))
6371
}
6472
}
@@ -103,7 +111,7 @@ func (handler *backgroundWorkerThread) setupScript() {
103111

104112
opts := append([]RequestOption(nil), handler.worker.requestOptions...)
105113
C.frankenphp_set_worker_name(handler.thread.pinCString(strings.TrimPrefix(handler.worker.name, "m#")), C._Bool(true))
106-
handler.worker.backgroundStopFdWrite.Store(int32(C.frankenphp_worker_get_stop_fd_write()))
114+
handler.stopFdWrite.Store(int32(C.frankenphp_worker_get_stop_fd_write()))
107115

108116
fc, err := newDummyContext(
109117
filepath.Base(handler.worker.fileName),
@@ -128,7 +136,7 @@ func (handler *backgroundWorkerThread) setupScript() {
128136
}
129137

130138
func (handler *backgroundWorkerThread) afterScriptExecution(exitStatus int) {
131-
handler.worker.backgroundStopFdWrite.Store(-1)
139+
handler.stopFdWrite.Store(-1)
132140
worker := handler.worker
133141
handler.dummyFrankenPHPContext = nil
134142
handler.dummyContext = nil

worker.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ type worker struct {
3636
isBackgroundWorker bool
3737
backgroundScope BackgroundScope
3838
backgroundRegistry *backgroundWorkerRegistry
39-
backgroundStopFdWrite atomic.Int32 // write end of the stop pipe, -1 if not set
4039
}
4140

4241
var (
@@ -147,13 +146,16 @@ func newWorker(o workerOpt) (*worker, error) {
147146
// they don't handle HTTP requests.
148147
allowPathMatching := !strings.HasPrefix(o.name, "m#") && !o.isBackgroundWorker
149148

150-
if w := workersByPath[absFileName]; w != nil && allowPathMatching {
151-
return w, fmt.Errorf("two workers cannot have the same filename: %q", absFileName)
152-
}
153-
// Background workers are resolved through per-scope lookups, not the
154-
// global workersByName map; the same user-facing name can appear in
155-
// multiple php_server scopes without collision.
149+
// Background workers are matched only by name, never by path. Multiple
150+
// named bg workers can share an entrypoint file; uniqueness is enforced
151+
// per-scope via backgroundLookups, not at the global path level.
156152
if !o.isBackgroundWorker {
153+
if w := workersByPath[absFileName]; w != nil && allowPathMatching {
154+
return w, fmt.Errorf("two workers cannot have the same filename: %q", absFileName)
155+
}
156+
// Background workers are resolved through per-scope lookups, not the
157+
// global workersByName map; the same user-facing name can appear in
158+
// multiple php_server scopes without collision.
157159
if w := workersByName[o.name]; w != nil {
158160
return w, fmt.Errorf("two workers cannot have the same name: %q", o.name)
159161
}
@@ -180,7 +182,8 @@ func newWorker(o workerOpt) (*worker, error) {
180182
backgroundScope: o.backgroundScope,
181183
}
182184

183-
w.backgroundStopFdWrite.Store(-1)
185+
// The stop-pipe write fd is per-thread (handler field), not per-worker,
186+
// so each thread in a num > 1 pool can be drained independently.
184187

185188
w.configureMercure(&o)
186189

0 commit comments

Comments
 (0)