Skip to content

Commit 83b6757

Browse files
feat: background workers = non-HTTP workers with shared state
1 parent d26834c commit 83b6757

File tree

56 files changed

+2891
-24
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+2891
-24
lines changed

background_worker.go

Lines changed: 391 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,391 @@
1+
package frankenphp
2+
3+
// #include <stdint.h>
4+
// #include "frankenphp.h"
5+
import "C"
6+
import (
7+
"fmt"
8+
"log/slog"
9+
"strings"
10+
"sync"
11+
"sync/atomic"
12+
"time"
13+
"unsafe"
14+
)
15+
16+
var backgroundScopeCounter atomic.Uint64
17+
18+
// NextBackgroundWorkerScope returns a unique scope ID for background worker isolation.
19+
// Each php_server block should call this once during provisioning.
20+
func NextBackgroundWorkerScope() string {
21+
return fmt.Sprintf("php_server_%d", backgroundScopeCounter.Add(1))
22+
}
23+
24+
// defaultMaxBackgroundWorkers is the default safety cap for catch-all background workers.
25+
const defaultMaxBackgroundWorkers = 16
26+
27+
// backgroundLookups maps scope IDs to their background worker lookups.
28+
// Each php_server block gets its own scope. The global frankenphp block
29+
// uses the empty string as its scope ID.
30+
var backgroundLookups map[string]*backgroundWorkerLookup
31+
32+
// backgroundWorkerLookup maps worker names to registries, enabling multiple entrypoint files.
33+
type backgroundWorkerLookup struct {
34+
byName map[string]*backgroundWorkerRegistry
35+
catchAll *backgroundWorkerRegistry
36+
}
37+
38+
func newBackgroundWorkerLookup() *backgroundWorkerLookup {
39+
return &backgroundWorkerLookup{
40+
byName: make(map[string]*backgroundWorkerRegistry),
41+
}
42+
}
43+
44+
func (l *backgroundWorkerLookup) AddNamed(name string, registry *backgroundWorkerRegistry) {
45+
l.byName[name] = registry
46+
}
47+
48+
func (l *backgroundWorkerLookup) SetCatchAll(registry *backgroundWorkerRegistry) {
49+
l.catchAll = registry
50+
}
51+
52+
// Resolve returns the registry for the given name, falling back to catch-all.
53+
func (l *backgroundWorkerLookup) Resolve(name string) *backgroundWorkerRegistry {
54+
if r, ok := l.byName[name]; ok {
55+
return r
56+
}
57+
return l.catchAll
58+
}
59+
60+
type backgroundWorkerRegistry struct {
61+
entrypoint string
62+
num int // threads per background worker (0 = lazy-start with 1 thread)
63+
maxWorkers int // max lazy-started instances (0 = unlimited)
64+
autoStartNames []string // names to start at boot when num >= 1
65+
mu sync.Mutex
66+
workers map[string]*backgroundWorkerState
67+
}
68+
69+
func newBackgroundWorkerRegistry(entrypoint string) *backgroundWorkerRegistry {
70+
return &backgroundWorkerRegistry{
71+
entrypoint: entrypoint,
72+
workers: make(map[string]*backgroundWorkerState),
73+
}
74+
}
75+
76+
func (registry *backgroundWorkerRegistry) MaxThreads() int {
77+
if registry.num > 0 {
78+
return registry.num
79+
}
80+
return 1
81+
}
82+
83+
func (registry *backgroundWorkerRegistry) SetNum(num int) {
84+
registry.num = num
85+
}
86+
87+
func (registry *backgroundWorkerRegistry) AddAutoStartNames(names ...string) {
88+
registry.autoStartNames = append(registry.autoStartNames, names...)
89+
}
90+
91+
func (registry *backgroundWorkerRegistry) SetMaxWorkers(max int) {
92+
registry.maxWorkers = max
93+
}
94+
95+
// buildBackgroundWorkerLookups constructs per-scope background worker lookups
96+
// from worker options. Each scope (php_server block) gets its own lookup.
97+
func buildBackgroundWorkerLookups(workers []*worker, opts []workerOpt) map[string]*backgroundWorkerLookup {
98+
lookups := make(map[string]*backgroundWorkerLookup)
99+
scopeRegistries := make(map[string]map[string]*backgroundWorkerRegistry)
100+
101+
for i, o := range opts {
102+
if !o.isBackgroundWorker {
103+
continue
104+
}
105+
106+
scope := o.backgroundScope
107+
lookup, ok := lookups[scope]
108+
if !ok {
109+
lookup = newBackgroundWorkerLookup()
110+
lookups[scope] = lookup
111+
scopeRegistries[scope] = make(map[string]*backgroundWorkerRegistry)
112+
}
113+
114+
entrypoint := o.fileName
115+
registry, ok := scopeRegistries[scope][entrypoint]
116+
if !ok {
117+
registry = newBackgroundWorkerRegistry(entrypoint)
118+
scopeRegistries[scope][entrypoint] = registry
119+
}
120+
121+
workers[i].backgroundScope = scope
122+
123+
w := workers[i]
124+
phpName := strings.TrimPrefix(w.name, "m#")
125+
if phpName != "" && phpName != w.fileName {
126+
// Named background worker
127+
if o.num > 0 {
128+
registry.AddAutoStartNames(phpName)
129+
registry.SetNum(o.num)
130+
}
131+
lookup.AddNamed(phpName, registry)
132+
} else {
133+
// Catch-all background worker; maxThreads > 1 means user set it explicitly
134+
maxW := defaultMaxBackgroundWorkers
135+
if o.maxThreads > 1 {
136+
maxW = o.maxThreads
137+
}
138+
registry.SetMaxWorkers(maxW)
139+
lookup.SetCatchAll(registry)
140+
}
141+
142+
w.backgroundRegistry = registry
143+
}
144+
145+
if len(lookups) == 0 {
146+
return nil
147+
}
148+
149+
return lookups
150+
}
151+
152+
func (registry *backgroundWorkerRegistry) reserve(name string) (*backgroundWorkerState, bool, error) {
153+
registry.mu.Lock()
154+
defer registry.mu.Unlock()
155+
156+
if bgw := registry.workers[name]; bgw != nil {
157+
return bgw, true, nil
158+
}
159+
160+
if registry.maxWorkers > 0 && len(registry.workers) >= registry.maxWorkers {
161+
return nil, false, fmt.Errorf("cannot start background worker %q: limit of %d reached - increase max_threads on the catch-all background worker or declare it as a named worker", name, registry.maxWorkers)
162+
}
163+
164+
bgw := &backgroundWorkerState{
165+
ready: make(chan struct{}),
166+
}
167+
registry.workers[name] = bgw
168+
169+
return bgw, false, nil
170+
}
171+
172+
func (registry *backgroundWorkerRegistry) remove(name string, bgw *backgroundWorkerState) {
173+
registry.mu.Lock()
174+
defer registry.mu.Unlock()
175+
176+
if registry.workers[name] == bgw {
177+
delete(registry.workers, name)
178+
}
179+
}
180+
181+
func startBackgroundWorker(thread *phpThread, bgWorkerName string) error {
182+
if bgWorkerName == "" {
183+
return fmt.Errorf("background worker name must not be empty")
184+
}
185+
186+
lookup := getLookup(thread)
187+
if lookup == nil {
188+
return fmt.Errorf("no background worker configured in this php_server")
189+
}
190+
191+
registry := lookup.Resolve(bgWorkerName)
192+
if registry == nil || registry.entrypoint == "" {
193+
return fmt.Errorf("no background worker configured in this php_server")
194+
}
195+
196+
return startBackgroundWorkerWithRegistry(registry, bgWorkerName)
197+
}
198+
199+
func startBackgroundWorkerWithRegistry(registry *backgroundWorkerRegistry, bgWorkerName string) error {
200+
bgw, exists, err := registry.reserve(bgWorkerName)
201+
if err != nil {
202+
return err
203+
}
204+
if exists {
205+
return nil
206+
}
207+
208+
numThreads := registry.MaxThreads()
209+
210+
worker, err := newWorker(workerOpt{
211+
name: bgWorkerName,
212+
fileName: registry.entrypoint,
213+
num: numThreads,
214+
isBackgroundWorker: true,
215+
env: PrepareEnv(nil),
216+
watch: []string{},
217+
maxConsecutiveFailures: -1,
218+
})
219+
if err != nil {
220+
registry.remove(bgWorkerName, bgw)
221+
222+
return fmt.Errorf("failed to create background worker: %w", err)
223+
}
224+
225+
worker.isBackgroundWorker = true
226+
worker.backgroundWorker = bgw
227+
worker.backgroundRegistry = registry
228+
229+
for i := 0; i < numThreads; i++ {
230+
bgWorkerThread := getInactivePHPThread()
231+
if bgWorkerThread == nil {
232+
if i == 0 {
233+
registry.remove(bgWorkerName, bgw)
234+
}
235+
236+
return fmt.Errorf("no available PHP thread for background worker (increase max_threads)")
237+
}
238+
239+
scalingMu.Lock()
240+
workers = append(workers, worker)
241+
scalingMu.Unlock()
242+
243+
convertToBackgroundWorkerThread(bgWorkerThread, worker)
244+
}
245+
246+
if globalLogger.Enabled(globalCtx, slog.LevelInfo) {
247+
globalLogger.LogAttrs(globalCtx, slog.LevelInfo, "background worker started", slog.String("name", bgWorkerName), slog.Int("threads", numThreads))
248+
}
249+
250+
return nil
251+
}
252+
253+
func getLookup(thread *phpThread) *backgroundWorkerLookup {
254+
if handler, ok := thread.handler.(*workerThread); ok && handler.worker.backgroundLookup != nil {
255+
return handler.worker.backgroundLookup
256+
}
257+
if handler, ok := thread.handler.(*backgroundWorkerThread); ok && handler.worker.backgroundLookup != nil {
258+
return handler.worker.backgroundLookup
259+
}
260+
// Non-worker requests: resolve scope from context
261+
if fc, ok := fromContext(thread.context()); ok && fc.backgroundScope != "" {
262+
if backgroundLookups != nil {
263+
return backgroundLookups[fc.backgroundScope]
264+
}
265+
}
266+
// Fall back to global scope
267+
if backgroundLookups != nil {
268+
return backgroundLookups[""]
269+
}
270+
271+
return nil
272+
}
273+
274+
// go_frankenphp_worker_get_vars starts background workers if needed, waits for them
275+
// to be ready, takes read locks, copies vars via C helper, and releases locks.
276+
// All locking/unlocking happens within this single Go call.
277+
//
278+
// callerVersions/outVersions: if callerVersions is non-nil and all versions match,
279+
// the copy is skipped entirely (returns 1). outVersions receives current versions.
280+
//
281+
//export go_frankenphp_worker_get_vars
282+
func go_frankenphp_worker_get_vars(threadIndex C.uintptr_t, names **C.char, nameLens *C.size_t, nameCount C.int, timeoutMs C.int, returnValue *C.zval, callerVersions *C.uint64_t, outVersions *C.uint64_t) *C.char {
283+
thread := phpThreads[threadIndex]
284+
lookup := getLookup(thread)
285+
if lookup == nil {
286+
return C.CString("no background worker configured in this php_server")
287+
}
288+
289+
n := int(nameCount)
290+
nameSlice := unsafe.Slice(names, n)
291+
nameLenSlice := unsafe.Slice(nameLens, n)
292+
293+
sks := make([]*backgroundWorkerState, n)
294+
goNames := make([]string, n)
295+
for i := 0; i < n; i++ {
296+
goNames[i] = C.GoStringN(nameSlice[i], C.int(nameLenSlice[i]))
297+
298+
// Start background worker if not already running
299+
if err := startBackgroundWorker(thread, goNames[i]); err != nil {
300+
return C.CString(err.Error())
301+
}
302+
303+
registry := lookup.Resolve(goNames[i])
304+
if registry == nil {
305+
return C.CString("background worker not found: " + goNames[i])
306+
}
307+
registry.mu.Lock()
308+
sks[i] = registry.workers[goNames[i]]
309+
registry.mu.Unlock()
310+
if sks[i] == nil {
311+
return C.CString("background worker not found: " + goNames[i])
312+
}
313+
}
314+
315+
// Wait for all workers to be ready (shared deadline across all workers)
316+
deadline := time.After(time.Duration(timeoutMs) * time.Millisecond)
317+
for i, sk := range sks {
318+
select {
319+
case <-sk.ready:
320+
// background worker has called set_vars
321+
case <-deadline:
322+
return C.CString(fmt.Sprintf("timeout waiting for background worker: %s", goNames[i]))
323+
}
324+
}
325+
326+
// Fast path: if all caller versions match, skip lock + copy entirely.
327+
// Read each version once and write to outVersions for the C side to compare.
328+
if callerVersions != nil && outVersions != nil {
329+
callerVSlice := unsafe.Slice(callerVersions, n)
330+
outVSlice := unsafe.Slice(outVersions, n)
331+
allMatch := true
332+
for i, sk := range sks {
333+
v := sk.varsVersion.Load()
334+
outVSlice[i] = C.uint64_t(v)
335+
if uint64(callerVSlice[i]) != v {
336+
allMatch = false
337+
}
338+
}
339+
if allMatch {
340+
return nil // C side sees out == caller, uses cached value
341+
}
342+
}
343+
344+
// Take all read locks, collect pointers, copy via C helper, then release
345+
ptrs := make([]unsafe.Pointer, n)
346+
for i, sk := range sks {
347+
sk.mu.RLock()
348+
ptrs[i] = sk.varsPtr
349+
}
350+
351+
C.frankenphp_worker_copy_vars(returnValue, C.int(n), names, nameLens, (*unsafe.Pointer)(unsafe.Pointer(&ptrs[0])))
352+
353+
// Write versions while locks are still held
354+
if outVersions != nil {
355+
outVSlice := unsafe.Slice(outVersions, n)
356+
for i, sk := range sks {
357+
outVSlice[i] = C.uint64_t(sk.varsVersion.Load())
358+
}
359+
}
360+
361+
for _, sk := range sks {
362+
sk.mu.RUnlock()
363+
}
364+
365+
return nil
366+
}
367+
368+
//export go_frankenphp_worker_set_vars
369+
func go_frankenphp_worker_set_vars(threadIndex C.uintptr_t, varsPtr unsafe.Pointer, oldPtr *unsafe.Pointer) *C.char {
370+
thread := phpThreads[threadIndex]
371+
372+
bgHandler, ok := thread.handler.(*backgroundWorkerThread)
373+
if !ok || bgHandler.worker.backgroundWorker == nil {
374+
return C.CString("frankenphp_worker_set_vars() can only be called from a background worker")
375+
}
376+
377+
sk := bgHandler.worker.backgroundWorker
378+
379+
sk.mu.Lock()
380+
*oldPtr = sk.varsPtr
381+
sk.varsPtr = varsPtr
382+
sk.varsVersion.Add(1)
383+
sk.mu.Unlock()
384+
385+
sk.readyOnce.Do(func() {
386+
bgHandler.markBackgroundReady()
387+
close(sk.ready)
388+
})
389+
390+
return nil
391+
}

0 commit comments

Comments
 (0)