Skip to content

Commit 43fe99b

Browse files
feat: background worker pools + multi-entrypoint
Seventh (final in the split) step of the split suggested in #2287. Lifts the remaining constraints from the minimal path: - Pools: named bg workers can now declare num > 1 (pool of threads per worker) and max_threads > 1. Each thread in the pool shares the same backgroundWorkerState, so set_vars / get_vars are scoped per-worker-name, not per-thread. - Per-thread stop-pipe: the write fd moved from worker to handler. Each thread in a pool gets its own stop pipe, so drain() can wake them independently. Pools no longer overwrite one another's fd through the shared worker struct. - Multi-entrypoint: multiple named bg workers in the same scope can share the same entrypoint file. Each gets its own registry from buildBackgroundWorkerLookups, so they inherit independent env/watch/failure-policy options. Drops the filename-uniqueness check for bg workers (it was already skipped via allowPathMatching, but this step lifts the last Caddyfile-level rejection). - Caddyfile: `num > 1` and `max_threads > 1` on named background workers no longer error out. Catch-all semantics unchanged: max_threads caps lazy-started instance count. ## Tests - TestBackgroundWorkerPool: num=3 pool, verifies all threads boot and share state through set_vars/get_vars. - TestBackgroundWorkerMultiEntrypoint: two named bg workers sharing one entrypoint file resolve to distinct instances by name. All previous bg worker tests still pass.
1 parent 4ce621e commit 43fe99b

5 files changed

Lines changed: 137 additions & 16 deletions

File tree

background_worker_pool_test.go

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package frankenphp_test
2+
3+
import (
4+
"errors"
5+
"io"
6+
"net/http/httptest"
7+
"os"
8+
"testing"
9+
10+
"github.com/dunglas/frankenphp"
11+
"github.com/stretchr/testify/assert"
12+
"github.com/stretchr/testify/require"
13+
)
14+
15+
// TestBackgroundWorkerPool declares a named bg worker with num=3 (pool
16+
// of three threads). All three threads should boot, share the same
17+
// registered backgroundWorkerState, and the reader can see the pool's
18+
// vars. This covers the lifted num>1 + max_threads>1 constraint.
19+
func TestBackgroundWorkerPool(t *testing.T) {
20+
cwd, _ := os.Getwd()
21+
testDataDir := cwd + "/testdata/"
22+
23+
require.NoError(t, frankenphp.Init(
24+
frankenphp.WithWorkers("pool-worker", testDataDir+"background-worker-pool.php", 3,
25+
frankenphp.WithWorkerBackground(),
26+
frankenphp.WithWorkerMaxThreads(3)),
27+
frankenphp.WithNumThreads(6),
28+
))
29+
t.Cleanup(frankenphp.Shutdown)
30+
31+
// Read the pool worker's vars via get_vars; all three threads share
32+
// the same state so we don't need to target a specific one. ensure()
33+
// waits for at least one pool thread's first set_vars so the eager
34+
// start can't race the reader.
35+
php := `<?php
36+
frankenphp_ensure_background_worker('pool-worker');
37+
$vars = frankenphp_get_vars('pool-worker');
38+
echo 'name=', $vars['name'] ?? 'MISSING', "\n";
39+
echo 'has-pid=', isset($vars['pid']) ? '1' : '0', "\n";
40+
`
41+
tmp := testDataDir + "pool-reader.php"
42+
require.NoError(t, os.WriteFile(tmp, []byte(php), 0644))
43+
t.Cleanup(func() { _ = os.Remove(tmp) })
44+
45+
req := httptest.NewRequest("GET", "http://example.com/pool-reader.php", nil)
46+
fr, err := frankenphp.NewRequestWithContext(req, frankenphp.WithRequestDocumentRoot(testDataDir, false))
47+
require.NoError(t, err)
48+
49+
w := httptest.NewRecorder()
50+
if err := frankenphp.ServeHTTP(w, fr); err != nil && !errors.As(err, &frankenphp.ErrRejected{}) {
51+
t.Fatalf("serve: %v", err)
52+
}
53+
body, _ := io.ReadAll(w.Result().Body)
54+
out := string(body)
55+
56+
assert.NotContains(t, out, "MISSING", "pool worker should have published its vars:\n"+out)
57+
assert.Contains(t, out, "name=pool-worker")
58+
assert.Contains(t, out, "has-pid=1")
59+
}
60+
61+
// TestBackgroundWorkerMultiEntrypoint declares two named bg workers that
62+
// share the same entrypoint file. Each registry is independent, so ensure()
63+
// + get_vars resolve to the correct instance by name.
64+
func TestBackgroundWorkerMultiEntrypoint(t *testing.T) {
65+
cwd, _ := os.Getwd()
66+
testDataDir := cwd + "/testdata/"
67+
68+
require.NoError(t, frankenphp.Init(
69+
frankenphp.WithWorkers("shared-a", testDataDir+"background-worker-named.php", 1,
70+
frankenphp.WithWorkerBackground()),
71+
frankenphp.WithWorkers("shared-b", testDataDir+"background-worker-named.php", 1,
72+
frankenphp.WithWorkerBackground()),
73+
frankenphp.WithNumThreads(5),
74+
))
75+
t.Cleanup(frankenphp.Shutdown)
76+
77+
read := func(name string) string {
78+
php := `<?php
79+
frankenphp_ensure_background_worker(` + "'" + name + "'" + `);
80+
$vars = frankenphp_get_vars(` + "'" + name + "'" + `);
81+
echo $vars['FRANKENPHP_WORKER_NAME'] ?? 'MISSING';
82+
`
83+
tmp := testDataDir + "multi-entry-reader-" + name + ".php"
84+
require.NoError(t, os.WriteFile(tmp, []byte(php), 0644))
85+
t.Cleanup(func() { _ = os.Remove(tmp) })
86+
87+
req := httptest.NewRequest("GET", "http://example.com/multi-entry-reader-"+name+".php", nil)
88+
fr, err := frankenphp.NewRequestWithContext(req, frankenphp.WithRequestDocumentRoot(testDataDir, false))
89+
require.NoError(t, err)
90+
w := httptest.NewRecorder()
91+
_ = frankenphp.ServeHTTP(w, fr)
92+
body, _ := io.ReadAll(w.Result().Body)
93+
return string(body)
94+
}
95+
96+
assert.Equal(t, "shared-a", read("shared-a"), "shared-a name must resolve to its own worker")
97+
assert.Equal(t, "shared-b", read("shared-b"), "shared-b name must resolve to its own worker")
98+
}

caddy/workerconfig.go

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -164,15 +164,10 @@ func unmarshalWorker(d *caddyfile.Dispenser) (workerConfig, error) {
164164
}
165165

166166
if wc.Background {
167-
if wc.Num > 1 {
168-
return wc, d.Err(`"num" > 1 is not yet supported for background workers`)
169-
}
170-
// For named bg workers, max_threads is threads-per-worker (>1 not
171-
// yet supported). For the catch-all (no name), it's the cap on
172-
// lazy-started instance count, which is a legitimate user knob.
173-
if wc.Name != "" && wc.MaxThreads > 1 {
174-
return wc, d.Err(`"max_threads" > 1 is not yet supported for named background workers`)
175-
}
167+
// Named bg workers: num and max_threads mean "threads per worker"
168+
// (pool size and pool cap). Catch-all bg workers: num is the pool
169+
// size of the placeholder instance (usually 1), max_threads caps
170+
// how many distinct names can be lazy-started.
176171
if len(wc.MatchPath) != 0 {
177172
return wc, d.Err(`"match" is not supported for background workers`)
178173
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
<?php
2+
3+
// Long-lived bg worker: disable PHP max_execution_time so the 30s default
4+
// cannot interrupt the stream_select park. The C side calls
5+
// zend_unset_timeout() too, but the belt-and-suspenders here covers PHP
6+
// builds where that path does not fully disarm the timer.
7+
set_time_limit(0);
8+
9+
// Pool worker: num > 1 means multiple threads share this worker name.
10+
// Each thread is a separate instance. We publish the thread's $_SERVER
11+
// identifying fields so the test can see both instances are live.
12+
frankenphp_set_vars([
13+
'name' => $_SERVER['FRANKENPHP_WORKER_NAME'] ?? 'unknown',
14+
'pid' => getmypid(),
15+
]);
16+
17+
$stream = frankenphp_get_worker_handle();
18+
if ($stream !== null) {
19+
$read = [$stream];
20+
$write = null;
21+
$except = null;
22+
stream_select($read, $write, $except, null);
23+
}

threadbackgroundworker.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,11 @@ type backgroundWorkerThread struct {
6767
dummyContext context.Context
6868
isBootingScript bool
6969
failureCount int
70+
71+
// stopFdWrite is the write end of THIS thread's stop pipe. Per-thread
72+
// so pool workers (num > 1) can each be drained independently; the
73+
// bg script uses the read end via frankenphp_get_worker_handle().
74+
stopFdWrite atomic.Int32
7075
}
7176

7277
func convertToBackgroundWorkerThread(thread *phpThread, worker *worker) {
@@ -97,9 +102,10 @@ func (handler *backgroundWorkerThread) context() context.Context {
97102
// drain is called by drainWorkerThreads (and thread.shutdown) right before
98103
// drainChan is closed. We close the stop-pipe's write end so the PHP worker
99104
// script, which is typically parked in stream_select on the read end, wakes
100-
// up and can finish its loop gracefully.
105+
// up and can finish its loop gracefully. Per-thread fd so pool workers
106+
// drain their threads independently.
101107
func (handler *backgroundWorkerThread) drain() {
102-
if fd := handler.worker.backgroundStopFdWrite.Swap(-1); fd >= 0 {
108+
if fd := handler.stopFdWrite.Swap(-1); fd >= 0 {
103109
C.frankenphp_worker_close_fd(C.int(fd))
104110
}
105111
}
@@ -162,7 +168,7 @@ func (handler *backgroundWorkerThread) setupScript() {
162168

163169
opts := append([]RequestOption(nil), handler.worker.requestOptions...)
164170
C.frankenphp_set_worker_name(handler.thread.pinCString(strings.TrimPrefix(handler.worker.name, "m#")), C._Bool(true))
165-
handler.worker.backgroundStopFdWrite.Store(int32(C.frankenphp_worker_get_stop_fd_write()))
171+
handler.stopFdWrite.Store(int32(C.frankenphp_worker_get_stop_fd_write()))
166172

167173
fc, err := newDummyContext(
168174
filepath.Base(handler.worker.fileName),
@@ -188,7 +194,7 @@ func (handler *backgroundWorkerThread) setupScript() {
188194
}
189195

190196
func (handler *backgroundWorkerThread) afterScriptExecution(exitStatus int) {
191-
handler.worker.backgroundStopFdWrite.Store(-1)
197+
handler.stopFdWrite.Store(-1)
192198
worker := handler.worker
193199
handler.dummyFrankenPHPContext = nil
194200
handler.dummyContext = nil

worker.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ type worker struct {
3939
backgroundWorker *backgroundWorkerState
4040
backgroundRegistry *backgroundWorkerRegistry
4141
backgroundReserveOnce sync.Once
42-
backgroundStopFdWrite atomic.Int32 // write end of the stop pipe, -1 if not set
4342
}
4443

4544
var (
@@ -178,10 +177,10 @@ func newWorker(o workerOpt) (*worker, error) {
178177
isBackgroundWorker: o.isBackgroundWorker,
179178
}
180179

181-
w.backgroundStopFdWrite.Store(-1)
182180
// backgroundWorker state is reserved lazily via the registry at
183181
// thread-setup time, not here; lazy-start callers set it directly
184-
// and eager inits go through setupScript's sync.Once.
182+
// and eager inits go through setupScript's sync.Once. The stop-pipe
183+
// write fd is per-thread (handler field), not per-worker.
185184

186185
w.configureMercure(&o)
187186

0 commit comments

Comments
 (0)