@@ -3,6 +3,9 @@ package build
33/*
44#include <stdlib.h>
55#include "ext.h"
6+
7+ void set_current_worker_handle(uintptr_t handle);
8+ void clear_current_worker(void);
69*/
710import "C"
811import (
@@ -31,19 +34,22 @@ import "github.com/nats-io/nats.go/jetstream"
3134import "go.uber.org/zap"
3235
3336type worker struct {
37+ requestChan chan * frankenphp.WorkerRequest
3438}
3539
40+ var globalWorkerInstance * worker
41+
3642func (w * worker ) Name () string {
3743 return "m#durable-php"
3844}
3945
4046func (w * worker ) FileName () string {
4147 // check if target exists
42- if _ , err := os .Stat ("src/glue /worker.php" ); ! os .IsNotExist (err ) {
43- return "src/glue /worker.php"
48+ if _ , err := os .Stat ("src/Glue /worker.php" ); ! os .IsNotExist (err ) {
49+ return "src/Glue /worker.php"
4450 }
4551
46- return "vendor/bottledcode/durable-php/src/glue /worker.php"
52+ return "vendor/bottledcode/durable-php/src/Glue /worker.php"
4753}
4854
4955func (w * worker ) Env () frankenphp.PreparedEnv {
@@ -64,14 +70,18 @@ func (w *worker) ThreadDeactivatedNotification(threadId int) {
6470}
6571
6672func (w * worker ) ProvideRequest () * frankenphp.WorkerRequest {
67- return nil
73+ req := <- w .requestChan
74+ return req
6875}
6976
7077func init () {
7178 frankenphp .RegisterExtension (unsafe .Pointer (& C .ext_module_entry ))
7279
7380 // initialize the workers
74- frankenphp .RegisterExternalWorker (& worker {})
81+ globalWorkerInstance = & worker {
82+ requestChan : make (chan * frankenphp.WorkerRequest , 100 ), // Buffer for 100 requests
83+ }
84+ frankenphp .RegisterExternalWorker (globalWorkerInstance )
7585}
7686
7787func Authorize (ctx context.Context , ev * glue.EventMessage , from * ids.StateId , preventCreation bool , operation auth.Operation ) (bool , error ) {
@@ -825,3 +835,41 @@ func delete_wrapper(handle C.uintptr_t) {
825835 structObj := obj .(* Worker )
826836 structObj .delete ()
827837}
838+
839+ // SetCurrentWorker sets the current worker context for the PHP extension
840+ func SetCurrentWorker (worker * Worker ) {
841+ if worker == nil {
842+ C .clear_current_worker ()
843+ return
844+ }
845+
846+ handle := registerGoObject (worker )
847+ C .set_current_worker_handle (C .uintptr_t (handle ))
848+ }
849+
850+ // ClearCurrentWorker clears the current worker context
851+ func ClearCurrentWorker () {
852+ C .clear_current_worker ()
853+ }
854+
855+ // GetWorkerInstance returns the global worker instance for feeding requests
856+ func GetWorkerInstance () * worker {
857+ return globalWorkerInstance
858+ }
859+
860+ // InjectWorkerRequest sends a request to the worker for processing
861+ func InjectWorkerRequest (req * frankenphp.WorkerRequest ) {
862+ if globalWorkerInstance != nil {
863+ globalWorkerInstance .requestChan <- req
864+ }
865+ }
866+
867+ // StartWorkerConsumer starts a NATS consumer that feeds requests to the worker channel
868+ func StartWorkerConsumer (ctx context.Context , cfg * config.Config , logger * zap.Logger ) {
869+ // This will create NATS consumers and convert messages to HTTP requests for the worker
870+ // Instead of using the old lib/consumer.go approach, we create consumers here
871+ // and feed HTTP requests directly to the worker channel
872+
873+ logger .Info ("Starting worker-based consumer" )
874+ // TODO: Implement NATS consumer logic here
875+ }
0 commit comments