Skip to content

Commit f842a74

Browse files
committed
handle the go side better
Signed-off-by: Robert Landers <landers.robert@gmail.com>
1 parent b22b15a commit f842a74

4 files changed

Lines changed: 133 additions & 116 deletions

File tree

cli/ext/build/ext.c

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -104,15 +104,6 @@ PHP_METHOD(Bottledcode_DurablePhp_Ext_Worker, __destruct) {
104104
__destruct_wrapper(intern->go_handle);
105105
}
106106

107-
PHP_METHOD(Bottledcode_DurablePhp_Ext_Worker, getNextEvent) {
108-
ext_object *intern = ext_object_from_obj(Z_OBJ_P(ZEND_THIS));
109-
110-
VALIDATE_GO_HANDLE(intern);
111-
ZEND_PARSE_PARAMETERS_NONE();
112-
113-
zend_string* result = getNextEvent_wrapper(intern->go_handle);
114-
RETURN_STR(result);
115-
}
116107

117108
PHP_METHOD(Bottledcode_DurablePhp_Ext_Worker, queryState) {
118109
ext_object *intern = ext_object_from_obj(Z_OBJ_P(ZEND_THIS));

cli/ext/build/ext.go

Lines changed: 132 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import "github.com/dunglas/frankenphp"
1616
import "context"
1717
import "encoding/json"
1818
import "errors"
19+
import "net/http"
1920
import "os"
2021
import "strings"
2122
import "sync"
@@ -34,7 +35,7 @@ import "github.com/nats-io/nats.go/jetstream"
3435
import "go.uber.org/zap"
3536

3637
type worker struct {
37-
requestChan chan *frankenphp.WorkerRequest
38+
// No longer needs requestChan - pulls directly from NATS
3839
}
3940

4041
var globalWorkerInstance *worker
@@ -70,17 +71,139 @@ func (w *worker) ThreadDeactivatedNotification(threadId int) {
7071
}
7172

7273
func (w *worker) ProvideRequest() *frankenphp.WorkerRequest {
73-
req := <-w.requestChan
74+
// Get the next NATS message from the consumer
75+
ctx := helpers.Ctx
76+
logger := helpers.Logger
77+
js := helpers.Js
78+
79+
// Find the next available message from any consumer
80+
var msg jetstream.Msg
81+
var err error
82+
var worker *Worker
83+
84+
// Try to get a message from each consumer type
85+
consumers := []ids.IdKind{ids.Activity, ids.Entity, ids.Orchestration}
86+
87+
for _, kind := range consumers {
88+
stream, err := js.Stream(ctx, helpers.Config.Stream)
89+
if err != nil {
90+
continue
91+
}
92+
93+
consumer, err := stream.Consumer(ctx, helpers.Config.Stream+"-"+string(kind))
94+
if err != nil {
95+
continue
96+
}
97+
98+
iter, err := consumer.Messages(jetstream.PullMaxMessages(1), jetstream.WithMessagesErrOnMissingHeartbeat(false))
99+
if err != nil {
100+
continue
101+
}
102+
103+
msg, err = iter.Next()
104+
if err == nil {
105+
// Create worker context for this message
106+
worker = &Worker{
107+
kind: kind,
108+
currentMsg: msg,
109+
}
110+
break
111+
}
112+
}
113+
114+
if msg == nil {
115+
// No messages available, return nil
116+
return nil
117+
}
118+
119+
// Process the message using the logic from getNextEvent
120+
meta, _ := msg.Metadata()
121+
headers := msg.Headers()
122+
123+
currentUser := &auth.User{}
124+
b := msg.Headers().Get(string(glue.HeaderProvenance))
125+
err = json.Unmarshal([]byte(b), currentUser)
126+
if err != nil {
127+
logger.Warn("Failed to unmarshal event provenance",
128+
zap.Any("Provenance", msg.Headers().Get(string(glue.HeaderProvenance))),
129+
zap.Error(err),
130+
)
131+
currentUser = nil
132+
} else {
133+
ctx = auth.DecorateContextWithUser(ctx, currentUser)
134+
}
135+
136+
// Handle delayed messages
137+
if headers.Get(string(glue.HeaderDelay)) != "" && meta.NumDelivered == 1 {
138+
logger.Debug("Delaying message", zap.String("delay", msg.Headers().Get("Delay")), zap.Any("Headers", meta))
139+
schedule, err := time.Parse(time.RFC3339, msg.Headers().Get("Delay"))
140+
if err != nil {
141+
helpers.ThrowPHPException(err.Error())
142+
return nil
143+
}
144+
145+
delay := time.Until(schedule)
146+
if err := msg.NakWithDelay(delay); err != nil {
147+
helpers.ThrowPHPException(err.Error())
148+
return nil
149+
}
150+
151+
// Recursively get next message after handling delay
152+
return w.ProvideRequest()
153+
}
154+
155+
// Handle delete messages
156+
if strings.HasSuffix(msg.Subject(), ".delete") {
157+
id := ids.ParseStateId(msg.Headers().Get(string(glue.HeaderStateId)))
158+
err := glue.DeleteState(ctx, js, logger, id)
159+
if err != nil {
160+
helpers.ThrowPHPException(err.Error())
161+
return nil
162+
}
163+
// Recursively get next message after handling delete
164+
return w.ProvideRequest()
165+
}
166+
167+
worker.currentCtx = lib.GetCorrelationId(ctx, nil, &headers)
168+
169+
rm := auth.GetResourceManager(ctx, js)
170+
171+
worker.authContext, worker.activeId, worker.state, err = lib.ProcessMessage(ctx, logger, msg, rm, helpers.Config, js)
172+
if err != nil {
173+
helpers.ThrowPHPException(err.Error())
174+
return nil
175+
}
176+
177+
// Set the current worker for PHP access
178+
SetCurrentWorker(worker)
179+
180+
// Create an HTTP request from the message
181+
httpReq, err := http.NewRequest("POST", "/worker", strings.NewReader(string(msg.Data())))
182+
if err != nil {
183+
helpers.ThrowPHPException(err.Error())
184+
return nil
185+
}
186+
187+
httpReq.Header.Set("Content-Type", "application/json")
188+
httpReq.Header.Set("X-Correlation-ID", worker.currentCtx.Value("cid").(string))
189+
httpReq.Header.Set("X-State-ID", worker.activeId.String())
190+
httpReq.Header.Set("X-Event-Type", msg.Headers().Get(string(glue.HeaderEventType)))
191+
httpReq.Header.Set("X-Source-ID", msg.Headers().Get(string(glue.HeaderEmittedBy)))
192+
193+
req := &frankenphp.WorkerRequest{
194+
Request: httpReq,
195+
Response: nil, // Response writer will be provided by FrankenPHP
196+
Done: make(chan struct{}),
197+
}
198+
74199
return req
75200
}
76201

77202
func init() {
78203
frankenphp.RegisterExtension(unsafe.Pointer(&C.ext_module_entry))
79204

80205
// initialize the workers
81-
globalWorkerInstance = &worker{
82-
requestChan: make(chan *frankenphp.WorkerRequest, 100), // Buffer for 100 requests
83-
}
206+
globalWorkerInstance = &worker{}
84207
frankenphp.RegisterExternalWorker(globalWorkerInstance)
85208
}
86209

@@ -579,77 +702,7 @@ func (w *Worker) __destruct() {
579702
w.consumer.Done()
580703
}
581704

582-
func (w *Worker) getNextEvent() unsafe.Pointer {
583-
c := w.consumer
584-
585-
ctx := c.Context
586-
logger := helpers.Logger
587-
js := helpers.Js
588-
589-
msg, err := c.Msg.Next()
590-
if err != nil {
591-
helpers.ThrowPHPException(err.Error())
592-
return frankenphp.PHPString("", false)
593-
}
594-
595-
meta, _ := msg.Metadata()
596-
headers := msg.Headers()
597-
598-
currentUser := &auth.User{}
599-
b := msg.Headers().Get(string(glue.HeaderProvenance))
600-
err = json.Unmarshal([]byte(b), currentUser)
601-
if err != nil {
602-
logger.Warn("Failed to unmarshal event provenance",
603-
zap.Any("Provenance", msg.Headers().Get(string(glue.HeaderProvenance))),
604-
zap.Error(err),
605-
)
606-
currentUser = nil
607-
} else {
608-
ctx = auth.DecorateContextWithUser(ctx, currentUser)
609-
}
610-
611-
if headers.Get(string(glue.HeaderDelay)) != "" && meta.NumDelivered == 1 {
612-
logger.Debug("Delaying message", zap.String("delay", msg.Headers().Get("Delay")), zap.Any("Headers", meta))
613-
schedule, err := time.Parse(time.RFC3339, msg.Headers().Get("Delay"))
614-
if err != nil {
615-
helpers.ThrowPHPException(err.Error())
616-
return frankenphp.PHPString("", false)
617-
}
618-
619-
delay := time.Until(schedule)
620-
if err := msg.NakWithDelay(delay); err != nil {
621-
helpers.ThrowPHPException(err.Error())
622-
return frankenphp.PHPString("", false)
623-
}
624-
625-
return w.getNextEvent()
626-
}
627-
628-
if strings.HasSuffix(msg.Subject(), ".delete") {
629-
id := ids.ParseStateId(msg.Headers().Get(string(glue.HeaderStateId)))
630-
// todo: remove glue!
631-
err := glue.DeleteState(ctx, js, logger, id)
632-
if err != nil {
633-
helpers.ThrowPHPException(err.Error())
634-
return frankenphp.PHPString("", false)
635-
}
636-
return w.getNextEvent()
637-
}
638-
639-
w.currentCtx = lib.GetCorrelationId(ctx, nil, &headers)
640-
641-
rm := auth.GetResourceManager(ctx, js)
642-
643-
w.authContext, w.activeId, w.state, err = lib.ProcessMessage(ctx, logger, msg, rm, helpers.Config, js)
644-
if err != nil {
645-
helpers.ThrowPHPException(err.Error())
646-
return frankenphp.PHPString("", false)
647-
}
648-
649-
w.currentMsg = msg
650-
651-
return frankenphp.PHPString(string(msg.Data()), false)
652-
}
705+
// getNextEvent removed - logic moved to ProvideRequest method
653706

654707
func (w *Worker) queryState(idStr *C.zend_string) unsafe.Pointer {
655708
id := ids.ParseStateId(frankenphp.GoString(unsafe.Pointer(idStr)))
@@ -736,15 +789,7 @@ func __destruct_wrapper(handle C.uintptr_t) {
736789
structObj.__destruct()
737790
}
738791

739-
//export getNextEvent_wrapper
740-
func getNextEvent_wrapper(handle C.uintptr_t) unsafe.Pointer {
741-
obj := getGoObject(handle)
742-
if obj == nil {
743-
return nil
744-
}
745-
structObj := obj.(*Worker)
746-
return structObj.getNextEvent()
747-
}
792+
// getNextEvent_wrapper removed - no longer needed
748793

749794
//export queryState_wrapper
750795
func queryState_wrapper(handle C.uintptr_t, stateId *C.zend_string) unsafe.Pointer {
@@ -857,19 +902,6 @@ func GetWorkerInstance() *worker {
857902
return globalWorkerInstance
858903
}
859904

860-
// InjectWorkerRequest sends a request to the worker for processing
861-
func InjectWorkerRequest(req *frankenphp.WorkerRequest) {
862-
if globalWorkerInstance != nil {
863-
globalWorkerInstance.requestChan <- req
864-
}
865-
}
905+
// InjectWorkerRequest removed - worker now pulls directly from NATS consumers
866906

867-
// StartWorkerConsumer starts a NATS consumer that feeds requests to the worker channel
868-
func StartWorkerConsumer(ctx context.Context, cfg *config.Config, logger *zap.Logger) {
869-
// This will create NATS consumers and convert messages to HTTP requests for the worker
870-
// Instead of using the old lib/consumer.go approach, we create consumers here
871-
// and feed HTTP requests directly to the worker channel
872-
873-
logger.Info("Starting worker-based consumer")
874-
// TODO: Implement NATS consumer logic here
875-
}
907+
// StartWorkerConsumer removed - NATS consumer logic now integrated into ProvideRequest method

cli/ext/build/ext.stub.php

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ public function drainEventLoop(): void {}
1818

1919
public function __destruct() {}
2020

21-
public function getNextEvent(): ?string {}
2221

2322
public function queryState(string $stateId): array {}
2423

cli/ext/build/ext_arginfo.h

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/* This is a generated file, edit the .stub.php file instead.
2-
* Stub hash: 60118ea78de15908a849b87c3010bbd972e60e71 */
2+
* Stub hash: 2cbfd4f7ba4303080cd46416ca0c29baf5b625f2 */
33

44
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_Bottledcode_DurablePhp_Ext_emit_event, 0, 3, IS_LONG, 0)
55
ZEND_ARG_TYPE_INFO(0, userContext, IS_ARRAY, 1)
@@ -22,9 +22,6 @@ ZEND_END_ARG_INFO()
2222

2323
#define arginfo_class_Bottledcode_DurablePhp_Ext_Worker___destruct arginfo_class_Bottledcode_DurablePhp_Ext_Worker___construct
2424

25-
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Bottledcode_DurablePhp_Ext_Worker_getNextEvent, 0, 0, IS_STRING, 1)
26-
ZEND_END_ARG_INFO()
27-
2825
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_Bottledcode_DurablePhp_Ext_Worker_queryState, 0, 1, IS_ARRAY, 0)
2926
ZEND_ARG_TYPE_INFO(0, stateId, IS_STRING, 0)
3027
ZEND_END_ARG_INFO()
@@ -57,7 +54,6 @@ ZEND_METHOD(Bottledcode_DurablePhp_Ext_Worker, GetCurrent);
5754
ZEND_METHOD(Bottledcode_DurablePhp_Ext_Worker, startEventLoop);
5855
ZEND_METHOD(Bottledcode_DurablePhp_Ext_Worker, drainEventLoop);
5956
ZEND_METHOD(Bottledcode_DurablePhp_Ext_Worker, __destruct);
60-
ZEND_METHOD(Bottledcode_DurablePhp_Ext_Worker, getNextEvent);
6157
ZEND_METHOD(Bottledcode_DurablePhp_Ext_Worker, queryState);
6258
ZEND_METHOD(Bottledcode_DurablePhp_Ext_Worker, getUser);
6359
ZEND_METHOD(Bottledcode_DurablePhp_Ext_Worker, getSource);
@@ -79,7 +75,6 @@ static const zend_function_entry class_Bottledcode_DurablePhp_Ext_Worker_methods
7975
ZEND_ME(Bottledcode_DurablePhp_Ext_Worker, startEventLoop, arginfo_class_Bottledcode_DurablePhp_Ext_Worker_startEventLoop, ZEND_ACC_PUBLIC)
8076
ZEND_ME(Bottledcode_DurablePhp_Ext_Worker, drainEventLoop, arginfo_class_Bottledcode_DurablePhp_Ext_Worker_drainEventLoop, ZEND_ACC_PUBLIC)
8177
ZEND_ME(Bottledcode_DurablePhp_Ext_Worker, __destruct, arginfo_class_Bottledcode_DurablePhp_Ext_Worker___destruct, ZEND_ACC_PUBLIC)
82-
ZEND_ME(Bottledcode_DurablePhp_Ext_Worker, getNextEvent, arginfo_class_Bottledcode_DurablePhp_Ext_Worker_getNextEvent, ZEND_ACC_PUBLIC)
8378
ZEND_ME(Bottledcode_DurablePhp_Ext_Worker, queryState, arginfo_class_Bottledcode_DurablePhp_Ext_Worker_queryState, ZEND_ACC_PUBLIC)
8479
ZEND_ME(Bottledcode_DurablePhp_Ext_Worker, getUser, arginfo_class_Bottledcode_DurablePhp_Ext_Worker_getUser, ZEND_ACC_PUBLIC)
8580
ZEND_ME(Bottledcode_DurablePhp_Ext_Worker, getSource, arginfo_class_Bottledcode_DurablePhp_Ext_Worker_getSource, ZEND_ACC_PUBLIC)

0 commit comments

Comments
 (0)