Skip to content

Commit 5e2cc5c

Browse files
feat: background workers = non-HTTP workers with shared state
1 parent d26834c commit 5e2cc5c

File tree

56 files changed

+2883
-24
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+2883
-24
lines changed

background_worker.go

Lines changed: 385 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,385 @@
1+
package frankenphp
2+
3+
// #include <stdint.h>
4+
// #include "frankenphp.h"
5+
import "C"
6+
import (
7+
"fmt"
8+
"log/slog"
9+
"strings"
10+
"sync"
11+
"sync/atomic"
12+
"time"
13+
"unsafe"
14+
)
15+
16+
var backgroundScopeCounter atomic.Uint64
17+
18+
// NextBackgroundWorkerScope returns a unique scope ID for background worker isolation.
19+
// Each php_server block should call this once during provisioning.
20+
func NextBackgroundWorkerScope() string {
21+
return fmt.Sprintf("php_server_%d", backgroundScopeCounter.Add(1))
22+
}
23+
24+
// defaultMaxBackgroundWorkers is the default safety cap for catch-all background workers.
25+
const defaultMaxBackgroundWorkers = 16
26+
27+
// backgroundLookups maps scope IDs to their background worker lookups.
28+
// Each php_server block gets its own scope. The global frankenphp block
29+
// uses the empty string as its scope ID.
30+
var backgroundLookups map[string]*backgroundWorkerLookup
31+
32+
// backgroundWorkerLookup maps worker names to registries, enabling multiple entrypoint files.
33+
type backgroundWorkerLookup struct {
34+
byName map[string]*backgroundWorkerRegistry
35+
catchAll *backgroundWorkerRegistry
36+
}
37+
38+
func newBackgroundWorkerLookup() *backgroundWorkerLookup {
39+
return &backgroundWorkerLookup{
40+
byName: make(map[string]*backgroundWorkerRegistry),
41+
}
42+
}
43+
44+
func (l *backgroundWorkerLookup) AddNamed(name string, registry *backgroundWorkerRegistry) {
45+
l.byName[name] = registry
46+
}
47+
48+
func (l *backgroundWorkerLookup) SetCatchAll(registry *backgroundWorkerRegistry) {
49+
l.catchAll = registry
50+
}
51+
52+
// Resolve returns the registry for the given name, falling back to catch-all.
53+
func (l *backgroundWorkerLookup) Resolve(name string) *backgroundWorkerRegistry {
54+
if r, ok := l.byName[name]; ok {
55+
return r
56+
}
57+
return l.catchAll
58+
}
59+
60+
type backgroundWorkerRegistry struct {
61+
entrypoint string
62+
num int // threads per background worker (0 = lazy-start with 1 thread)
63+
maxWorkers int // max lazy-started instances (0 = unlimited)
64+
mu sync.Mutex
65+
workers map[string]*backgroundWorkerState
66+
}
67+
68+
func newBackgroundWorkerRegistry(entrypoint string) *backgroundWorkerRegistry {
69+
return &backgroundWorkerRegistry{
70+
entrypoint: entrypoint,
71+
workers: make(map[string]*backgroundWorkerState),
72+
}
73+
}
74+
75+
func (registry *backgroundWorkerRegistry) MaxThreads() int {
76+
if registry.num > 0 {
77+
return registry.num
78+
}
79+
return 1
80+
}
81+
82+
func (registry *backgroundWorkerRegistry) SetNum(num int) {
83+
registry.num = num
84+
}
85+
86+
func (registry *backgroundWorkerRegistry) SetMaxWorkers(max int) {
87+
registry.maxWorkers = max
88+
}
89+
90+
// buildBackgroundWorkerLookups constructs per-scope background worker lookups
91+
// from worker options. Each scope (php_server block) gets its own lookup.
92+
func buildBackgroundWorkerLookups(workers []*worker, opts []workerOpt) map[string]*backgroundWorkerLookup {
93+
lookups := make(map[string]*backgroundWorkerLookup)
94+
scopeRegistries := make(map[string]map[string]*backgroundWorkerRegistry)
95+
96+
for i, o := range opts {
97+
if !o.isBackgroundWorker {
98+
continue
99+
}
100+
101+
scope := o.backgroundScope
102+
lookup, ok := lookups[scope]
103+
if !ok {
104+
lookup = newBackgroundWorkerLookup()
105+
lookups[scope] = lookup
106+
scopeRegistries[scope] = make(map[string]*backgroundWorkerRegistry)
107+
}
108+
109+
entrypoint := o.fileName
110+
registry, ok := scopeRegistries[scope][entrypoint]
111+
if !ok {
112+
registry = newBackgroundWorkerRegistry(entrypoint)
113+
scopeRegistries[scope][entrypoint] = registry
114+
}
115+
116+
workers[i].backgroundScope = scope
117+
118+
w := workers[i]
119+
phpName := strings.TrimPrefix(w.name, "m#")
120+
if phpName != "" && phpName != w.fileName {
121+
// Named background worker
122+
if o.num > 0 {
123+
registry.SetNum(o.num)
124+
}
125+
lookup.AddNamed(phpName, registry)
126+
} else {
127+
// Catch-all background worker; maxThreads > 1 means user set it explicitly
128+
maxW := defaultMaxBackgroundWorkers
129+
if o.maxThreads > 1 {
130+
maxW = o.maxThreads
131+
}
132+
registry.SetMaxWorkers(maxW)
133+
lookup.SetCatchAll(registry)
134+
}
135+
136+
w.backgroundRegistry = registry
137+
}
138+
139+
if len(lookups) == 0 {
140+
return nil
141+
}
142+
143+
return lookups
144+
}
145+
146+
func (registry *backgroundWorkerRegistry) reserve(name string) (*backgroundWorkerState, bool, error) {
147+
registry.mu.Lock()
148+
defer registry.mu.Unlock()
149+
150+
if bgw := registry.workers[name]; bgw != nil {
151+
return bgw, true, nil
152+
}
153+
154+
if registry.maxWorkers > 0 && len(registry.workers) >= registry.maxWorkers {
155+
return nil, false, fmt.Errorf("cannot start background worker %q: limit of %d reached - increase max_threads on the catch-all background worker or declare it as a named worker", name, registry.maxWorkers)
156+
}
157+
158+
bgw := &backgroundWorkerState{
159+
ready: make(chan struct{}),
160+
}
161+
registry.workers[name] = bgw
162+
163+
return bgw, false, nil
164+
}
165+
166+
func (registry *backgroundWorkerRegistry) remove(name string, bgw *backgroundWorkerState) {
167+
registry.mu.Lock()
168+
defer registry.mu.Unlock()
169+
170+
if registry.workers[name] == bgw {
171+
delete(registry.workers, name)
172+
}
173+
}
174+
175+
func startBackgroundWorker(thread *phpThread, bgWorkerName string) error {
176+
if bgWorkerName == "" {
177+
return fmt.Errorf("background worker name must not be empty")
178+
}
179+
180+
lookup := getLookup(thread)
181+
if lookup == nil {
182+
return fmt.Errorf("no background worker configured in this php_server")
183+
}
184+
185+
registry := lookup.Resolve(bgWorkerName)
186+
if registry == nil || registry.entrypoint == "" {
187+
return fmt.Errorf("no background worker configured in this php_server")
188+
}
189+
190+
return startBackgroundWorkerWithRegistry(registry, bgWorkerName)
191+
}
192+
193+
func startBackgroundWorkerWithRegistry(registry *backgroundWorkerRegistry, bgWorkerName string) error {
194+
bgw, exists, err := registry.reserve(bgWorkerName)
195+
if err != nil {
196+
return err
197+
}
198+
if exists {
199+
return nil
200+
}
201+
202+
numThreads := registry.MaxThreads()
203+
204+
worker, err := newWorker(workerOpt{
205+
name: bgWorkerName,
206+
fileName: registry.entrypoint,
207+
num: numThreads,
208+
isBackgroundWorker: true,
209+
env: PrepareEnv(nil),
210+
watch: []string{},
211+
maxConsecutiveFailures: -1,
212+
})
213+
if err != nil {
214+
registry.remove(bgWorkerName, bgw)
215+
216+
return fmt.Errorf("failed to create background worker: %w", err)
217+
}
218+
219+
worker.isBackgroundWorker = true
220+
worker.backgroundWorker = bgw
221+
worker.backgroundRegistry = registry
222+
223+
for i := 0; i < numThreads; i++ {
224+
bgWorkerThread := getInactivePHPThread()
225+
if bgWorkerThread == nil {
226+
if i == 0 {
227+
registry.remove(bgWorkerName, bgw)
228+
}
229+
230+
return fmt.Errorf("no available PHP thread for background worker (increase max_threads)")
231+
}
232+
233+
scalingMu.Lock()
234+
workers = append(workers, worker)
235+
scalingMu.Unlock()
236+
237+
convertToBackgroundWorkerThread(bgWorkerThread, worker)
238+
}
239+
240+
if globalLogger.Enabled(globalCtx, slog.LevelInfo) {
241+
globalLogger.LogAttrs(globalCtx, slog.LevelInfo, "background worker started", slog.String("name", bgWorkerName), slog.Int("threads", numThreads))
242+
}
243+
244+
return nil
245+
}
246+
247+
func getLookup(thread *phpThread) *backgroundWorkerLookup {
248+
if handler, ok := thread.handler.(*workerThread); ok && handler.worker.backgroundLookup != nil {
249+
return handler.worker.backgroundLookup
250+
}
251+
if handler, ok := thread.handler.(*backgroundWorkerThread); ok && handler.worker.backgroundLookup != nil {
252+
return handler.worker.backgroundLookup
253+
}
254+
// Non-worker requests: resolve scope from context
255+
if fc, ok := fromContext(thread.context()); ok && fc.backgroundScope != "" {
256+
if backgroundLookups != nil {
257+
return backgroundLookups[fc.backgroundScope]
258+
}
259+
}
260+
// Fall back to global scope
261+
if backgroundLookups != nil {
262+
return backgroundLookups[""]
263+
}
264+
265+
return nil
266+
}
267+
268+
// go_frankenphp_get_worker_vars starts background workers if needed, waits for them
269+
// to be ready, takes read locks, copies vars via C helper, and releases locks.
270+
// All locking/unlocking happens within this single Go call.
271+
//
272+
// callerVersions/outVersions: if callerVersions is non-nil and all versions match,
273+
// the copy is skipped entirely (returns 1). outVersions receives current versions.
274+
//
275+
//export go_frankenphp_get_worker_vars
276+
func go_frankenphp_get_worker_vars(threadIndex C.uintptr_t, names **C.char, nameLens *C.size_t, nameCount C.int, timeoutMs C.int, returnValue *C.zval, callerVersions *C.uint64_t, outVersions *C.uint64_t) *C.char {
277+
thread := phpThreads[threadIndex]
278+
lookup := getLookup(thread)
279+
if lookup == nil {
280+
return C.CString("no background worker configured in this php_server")
281+
}
282+
283+
n := int(nameCount)
284+
nameSlice := unsafe.Slice(names, n)
285+
nameLenSlice := unsafe.Slice(nameLens, n)
286+
287+
sks := make([]*backgroundWorkerState, n)
288+
goNames := make([]string, n)
289+
for i := 0; i < n; i++ {
290+
goNames[i] = C.GoStringN(nameSlice[i], C.int(nameLenSlice[i]))
291+
292+
// Start background worker if not already running
293+
if err := startBackgroundWorker(thread, goNames[i]); err != nil {
294+
return C.CString(err.Error())
295+
}
296+
297+
registry := lookup.Resolve(goNames[i])
298+
if registry == nil {
299+
return C.CString("background worker not found: " + goNames[i])
300+
}
301+
registry.mu.Lock()
302+
sks[i] = registry.workers[goNames[i]]
303+
registry.mu.Unlock()
304+
if sks[i] == nil {
305+
return C.CString("background worker not found: " + goNames[i])
306+
}
307+
}
308+
309+
// Wait for all workers to be ready (shared deadline across all workers)
310+
deadline := time.After(time.Duration(timeoutMs) * time.Millisecond)
311+
for i, sk := range sks {
312+
select {
313+
case <-sk.ready:
314+
// background worker has called set_worker_vars
315+
case <-deadline:
316+
return C.CString(fmt.Sprintf("timeout waiting for background worker: %s", goNames[i]))
317+
}
318+
}
319+
320+
// Fast path: if all caller versions match, skip lock + copy entirely.
321+
// Read each version once and write to outVersions for the C side to compare.
322+
if callerVersions != nil && outVersions != nil {
323+
callerVSlice := unsafe.Slice(callerVersions, n)
324+
outVSlice := unsafe.Slice(outVersions, n)
325+
allMatch := true
326+
for i, sk := range sks {
327+
v := sk.varsVersion.Load()
328+
outVSlice[i] = C.uint64_t(v)
329+
if uint64(callerVSlice[i]) != v {
330+
allMatch = false
331+
}
332+
}
333+
if allMatch {
334+
return nil // C side sees out == caller, uses cached value
335+
}
336+
}
337+
338+
// Take all read locks, collect pointers, copy via C helper, then release
339+
ptrs := make([]unsafe.Pointer, n)
340+
for i, sk := range sks {
341+
sk.mu.RLock()
342+
ptrs[i] = sk.varsPtr
343+
}
344+
345+
C.frankenphp_worker_copy_vars(returnValue, C.int(n), names, nameLens, (*unsafe.Pointer)(unsafe.Pointer(&ptrs[0])))
346+
347+
// Write versions while locks are still held
348+
if outVersions != nil {
349+
outVSlice := unsafe.Slice(outVersions, n)
350+
for i, sk := range sks {
351+
outVSlice[i] = C.uint64_t(sk.varsVersion.Load())
352+
}
353+
}
354+
355+
for _, sk := range sks {
356+
sk.mu.RUnlock()
357+
}
358+
359+
return nil
360+
}
361+
362+
//export go_frankenphp_set_worker_vars
363+
func go_frankenphp_set_worker_vars(threadIndex C.uintptr_t, varsPtr unsafe.Pointer, oldPtr *unsafe.Pointer) *C.char {
364+
thread := phpThreads[threadIndex]
365+
366+
bgHandler, ok := thread.handler.(*backgroundWorkerThread)
367+
if !ok || bgHandler.worker.backgroundWorker == nil {
368+
return C.CString("frankenphp_set_worker_vars() can only be called from a background worker")
369+
}
370+
371+
sk := bgHandler.worker.backgroundWorker
372+
373+
sk.mu.Lock()
374+
*oldPtr = sk.varsPtr
375+
sk.varsPtr = varsPtr
376+
sk.varsVersion.Add(1)
377+
sk.mu.Unlock()
378+
379+
sk.readyOnce.Do(func() {
380+
bgHandler.markBackgroundReady()
381+
close(sk.ready)
382+
})
383+
384+
return nil
385+
}

0 commit comments

Comments
 (0)