Skip to content

Commit ebe07e6

Browse files
feat: add task API for bidirectional background worker communication
1 parent 8fe74b2 commit ebe07e6

27 files changed

Lines changed: 1562 additions & 16 deletions

background_worker.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,9 @@ func buildBackgroundWorkerLookups(workers []*worker, opts []workerOpt) map[strin
130130
workers[i].backgroundScope = scope
131131

132132
w := workers[i]
133-
if w.name != "" && w.name != w.fileName {
134-
// Named background worker; strip m# prefix so PHP sees the clean name
135-
phpName := strings.TrimPrefix(w.name, "m#")
133+
phpName := strings.TrimPrefix(w.name, "m#")
134+
if phpName != "" && phpName != w.fileName {
135+
// Named background worker
136136
if o.num > 0 {
137137
registry.AddAutoStartNames(phpName)
138138
registry.SetNum(o.num)
@@ -172,6 +172,8 @@ func (registry *backgroundWorkerRegistry) reserve(name string) (*backgroundWorke
172172

173173
bgw := &backgroundWorkerState{
174174
ready: make(chan struct{}),
175+
tasks: make(chan *taskRequest, 1), // buffer=1: backpressure with signaling
176+
dead: make(chan struct{}),
175177
}
176178
registry.workers[name] = bgw
177179

@@ -185,6 +187,9 @@ func (registry *backgroundWorkerRegistry) remove(name string, bgw *backgroundWor
185187
if registry.workers[name] == bgw {
186188
delete(registry.workers, name)
187189
}
190+
191+
// Signal waiting senders that this worker is gone
192+
bgw.deadOnce.Do(func() { close(bgw.dead) })
188193
}
189194

190195
func startBackgroundWorker(thread *phpThread, bgWorkerName string) error {
@@ -234,6 +239,7 @@ func startBackgroundWorkerWithRegistry(registry *backgroundWorkerRegistry, bgWor
234239
worker.isBackgroundWorker = true
235240
worker.backgroundWorker = bgw
236241
worker.backgroundRegistry = registry
242+
bgw.fds = &worker.backgroundFds
237243

238244
for i := 0; i < numThreads; i++ {
239245
bgWorkerThread := getInactivePHPThread()

0 commit comments

Comments
 (0)