Skip to content

Commit 87303b5

Browse files
committed
Add async_pending action and reactor NIF support
- Add handoff/2,3 API functions for FD handoff - Add setup_code option to run Python code in context - Add async_pending action handling for task-based ASGI - Add {write_ready, Fd} message handling for async completion - Register reactor NIFs in nif_funcs table - Fix erlang.reactor module import path in NIFs - Add async_pending_test to py_reactor_SUITE
1 parent e811e7b commit 87303b5

5 files changed

Lines changed: 310 additions & 9 deletions

File tree

c_src/py_event_loop.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3088,7 +3088,7 @@ ERL_NIF_TERM nif_reactor_on_read_ready(ErlNifEnv *env, int argc,
30883088
gil_guard_t guard = gil_acquire();
30893089

30903090
/* Import erlang_reactor module */
3091-
PyObject *reactor_module = PyImport_ImportModule("erlang_reactor");
3091+
PyObject *reactor_module = PyImport_ImportModule("erlang.reactor");
30923092
if (reactor_module == NULL) {
30933093
PyErr_Clear();
30943094
gil_release(guard);
@@ -3153,7 +3153,7 @@ ERL_NIF_TERM nif_reactor_on_write_ready(ErlNifEnv *env, int argc,
31533153
gil_guard_t guard = gil_acquire();
31543154

31553155
/* Import erlang_reactor module */
3156-
PyObject *reactor_module = PyImport_ImportModule("erlang_reactor");
3156+
PyObject *reactor_module = PyImport_ImportModule("erlang.reactor");
31573157
if (reactor_module == NULL) {
31583158
PyErr_Clear();
31593159
gil_release(guard);
@@ -3231,7 +3231,7 @@ ERL_NIF_TERM nif_reactor_init_connection(ErlNifEnv *env, int argc,
32313231
}
32323232

32333233
/* Import erlang_reactor module */
3234-
PyObject *reactor_module = PyImport_ImportModule("erlang_reactor");
3234+
PyObject *reactor_module = PyImport_ImportModule("erlang.reactor");
32353235
if (reactor_module == NULL) {
32363236
Py_DECREF(client_info);
32373237
PyErr_Clear();
@@ -3286,7 +3286,7 @@ ERL_NIF_TERM nif_reactor_close_fd(ErlNifEnv *env, int argc,
32863286
if (fd >= 0) {
32873287
gil_guard_t guard = gil_acquire();
32883288

3289-
PyObject *reactor_module = PyImport_ImportModule("erlang_reactor");
3289+
PyObject *reactor_module = PyImport_ImportModule("erlang.reactor");
32903290
if (reactor_module != NULL) {
32913291
PyObject *result = PyObject_CallMethod(reactor_module,
32923292
"close_connection", "i", fd);

c_src/py_event_loop.h

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -799,4 +799,72 @@ int create_default_event_loop(ErlNifEnv *env);
799799
*/
800800
int init_subinterpreter_event_loop(ErlNifEnv *env);
801801

802+
/* ============================================================================
803+
* Reactor NIF Functions (Erlang-as-Reactor architecture)
804+
* ============================================================================ */
805+
806+
/**
807+
* @brief Register a file descriptor for reactor monitoring
808+
*
809+
* NIF: reactor_register_fd(ContextRef, Fd, OwnerPid) -> {ok, FdRef} | {error, Reason}
810+
*/
811+
ERL_NIF_TERM nif_reactor_register_fd(ErlNifEnv *env, int argc,
812+
const ERL_NIF_TERM argv[]);
813+
814+
/**
815+
* @brief Re-register for read events after a one-shot event
816+
*
817+
* NIF: reactor_reselect_read(FdRef) -> ok | {error, Reason}
818+
*/
819+
ERL_NIF_TERM nif_reactor_reselect_read(ErlNifEnv *env, int argc,
820+
const ERL_NIF_TERM argv[]);
821+
822+
/**
823+
* @brief Register for write events
824+
*
825+
* NIF: reactor_select_write(FdRef) -> ok | {error, Reason}
826+
*/
827+
ERL_NIF_TERM nif_reactor_select_write(ErlNifEnv *env, int argc,
828+
const ERL_NIF_TERM argv[]);
829+
830+
/**
831+
* @brief Get the FD integer from an FD resource
832+
*
833+
* NIF: get_fd_from_resource(FdRef) -> Fd | {error, Reason}
834+
*/
835+
ERL_NIF_TERM nif_get_fd_from_resource(ErlNifEnv *env, int argc,
836+
const ERL_NIF_TERM argv[]);
837+
838+
/**
839+
* @brief Call Python protocol on_read_ready
840+
*
841+
* NIF: reactor_on_read_ready(ContextRef, Fd) -> {ok, Action} | {error, Reason}
842+
*/
843+
ERL_NIF_TERM nif_reactor_on_read_ready(ErlNifEnv *env, int argc,
844+
const ERL_NIF_TERM argv[]);
845+
846+
/**
847+
* @brief Call Python protocol on_write_ready
848+
*
849+
* NIF: reactor_on_write_ready(ContextRef, Fd) -> {ok, Action} | {error, Reason}
850+
*/
851+
ERL_NIF_TERM nif_reactor_on_write_ready(ErlNifEnv *env, int argc,
852+
const ERL_NIF_TERM argv[]);
853+
854+
/**
855+
* @brief Initialize connection with Python protocol
856+
*
857+
* NIF: reactor_init_connection(ContextRef, Fd, ClientInfo) -> ok | {error, Reason}
858+
*/
859+
ERL_NIF_TERM nif_reactor_init_connection(ErlNifEnv *env, int argc,
860+
const ERL_NIF_TERM argv[]);
861+
862+
/**
863+
* @brief Close FD and cleanup Python protocol
864+
*
865+
* NIF: reactor_close_fd(FdRef) -> ok | {error, Reason}
866+
*/
867+
ERL_NIF_TERM nif_reactor_close_fd(ErlNifEnv *env, int argc,
868+
const ERL_NIF_TERM argv[]);
869+
802870
#endif /* PY_EVENT_LOOP_H */

c_src/py_nif.c

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
#include "py_nif.h"
4040
#include "py_asgi.h"
4141
#include "py_wsgi.h"
42+
#include "py_event_loop.h"
4243

4344
/* ============================================================================
4445
* Global state definitions
@@ -3865,7 +3866,17 @@ static ErlNifFunc nif_funcs[] = {
38653866
{"ref_interp_id", 1, nif_ref_interp_id, 0},
38663867
{"ref_to_term", 1, nif_ref_to_term, 0},
38673868
{"ref_getattr", 2, nif_ref_getattr, ERL_NIF_DIRTY_JOB_CPU_BOUND},
3868-
{"ref_call_method", 3, nif_ref_call_method, ERL_NIF_DIRTY_JOB_CPU_BOUND}
3869+
{"ref_call_method", 3, nif_ref_call_method, ERL_NIF_DIRTY_JOB_CPU_BOUND},
3870+
3871+
/* Reactor NIFs - Erlang-as-Reactor architecture */
3872+
{"reactor_register_fd", 3, nif_reactor_register_fd, 0},
3873+
{"reactor_reselect_read", 1, nif_reactor_reselect_read, 0},
3874+
{"reactor_select_write", 1, nif_reactor_select_write, 0},
3875+
{"get_fd_from_resource", 1, nif_get_fd_from_resource, 0},
3876+
{"reactor_on_read_ready", 2, nif_reactor_on_read_ready, ERL_NIF_DIRTY_JOB_CPU_BOUND},
3877+
{"reactor_on_write_ready", 2, nif_reactor_on_write_ready, ERL_NIF_DIRTY_JOB_CPU_BOUND},
3878+
{"reactor_init_connection", 3, nif_reactor_init_connection, ERL_NIF_DIRTY_JOB_CPU_BOUND},
3879+
{"reactor_close_fd", 1, nif_reactor_close_fd, 0}
38693880
};
38703881

38713882
ERL_NIF_INIT(py_nif, nif_funcs, load, NULL, upgrade, unload)

src/py_reactor_context.erl

Lines changed: 93 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@
4040
start_link/2,
4141
start_link/3,
4242
stop/1,
43-
stats/1
43+
stats/1,
44+
handoff/2,
45+
handoff/3
4446
]).
4547

4648
%% Internal exports
@@ -89,6 +91,8 @@ start_link(Id, Mode) ->
8991
%% - max_connections: Maximum connections per context (default: 100)
9092
%% - app_module: Python module containing ASGI/WSGI app
9193
%% - app_callable: Python callable name (e.g., "app", "application")
94+
%% - setup_code: Binary Python code to execute after context creation
95+
%% (useful for setting up protocol factory)
9296
%%
9397
%% @param Id Unique identifier for this context
9498
%% @param Mode Context mode (auto, subinterp, worker)
@@ -141,6 +145,33 @@ stats(Ctx) when is_pid(Ctx) ->
141145
{error, timeout}
142146
end.
143147

148+
%% @doc Hand off a file descriptor to this reactor context.
149+
%%
150+
%% The context takes ownership of the FD and will handle I/O events.
151+
%% This is the main entry point for handing off accepted connections.
152+
%%
153+
%% @param Fd The raw file descriptor (from inet:getfd/1)
154+
%% @param ClientInfo Map with connection metadata (addr, port, type, etc.)
155+
-spec handoff(integer(), map()) -> ok | {error, term()}.
156+
handoff(Fd, ClientInfo) when is_integer(Fd), is_map(ClientInfo) ->
157+
%% Get a reactor context from the pool or use default
158+
case whereis(py_reactor_context_default) of
159+
undefined ->
160+
{error, no_reactor_context};
161+
Ctx ->
162+
handoff(Ctx, Fd, ClientInfo)
163+
end.
164+
165+
%% @doc Hand off a file descriptor to a specific reactor context.
166+
%%
167+
%% @param Ctx The reactor context pid
168+
%% @param Fd The raw file descriptor (from inet:getfd/1)
169+
%% @param ClientInfo Map with connection metadata
170+
-spec handoff(pid(), integer(), map()) -> ok | {error, term()}.
171+
handoff(Ctx, Fd, ClientInfo) when is_pid(Ctx), is_integer(Fd), is_map(ClientInfo) ->
172+
Ctx ! {fd_handoff, Fd, ClientInfo},
173+
ok.
174+
144175
%% ============================================================================
145176
%% Process loop
146177
%% ============================================================================
@@ -169,6 +200,19 @@ init(Parent, Id, Mode, Opts) ->
169200
AppModule = maps:get(app_module, Opts, undefined),
170201
AppCallable = maps:get(app_callable, Opts, undefined),
171202

203+
%% Execute setup code if specified (e.g., set protocol factory)
204+
SetupCode = maps:get(setup_code, Opts, undefined),
205+
case SetupCode of
206+
undefined -> ok;
207+
_ when is_binary(SetupCode) ->
208+
case py_nif:context_exec(Ref, SetupCode) of
209+
ok -> ok;
210+
{error, Reason} ->
211+
error_logger:error_msg(
212+
"py_reactor_context setup_code failed: ~p~n", [Reason])
213+
end
214+
end,
215+
172216
%% Initialize app in Python context if specified
173217
case AppModule of
174218
undefined -> ok;
@@ -214,6 +258,15 @@ loop(State) ->
214258
{select, FdRes, _Ref, ready_output} ->
215259
handle_write_ready(FdRes, State);
216260

261+
%% Async completion signal from Python
262+
%% Sent when an async task (e.g., ASGI app) completes and response is ready
263+
%% Accept both atom and binary forms since Python sends binaries
264+
{write_ready, Fd} ->
265+
handle_async_write_ready(Fd, State);
266+
267+
{<<"write_ready">>, Fd} ->
268+
handle_async_write_ready(Fd, State);
269+
217270
%% Control messages
218271
{stop, From, MRef} ->
219272
cleanup(State),
@@ -265,8 +318,11 @@ handle_fd_handoff(Fd, ClientInfo, State) ->
265318
%% Register FD for monitoring
266319
case py_nif:reactor_register_fd(Ref, Fd, self()) of
267320
{ok, FdRef} ->
321+
%% Inject reactor_pid into client_info for async signaling
322+
ClientInfoWithPid = ClientInfo#{reactor_pid => self()},
323+
268324
%% Initialize Python protocol handler
269-
case py_nif:reactor_init_connection(Ref, Fd, ClientInfo) of
325+
case py_nif:reactor_init_connection(Ref, Fd, ClientInfoWithPid) of
270326
ok ->
271327
%% Store connection info
272328
ConnInfo = #{
@@ -319,6 +375,16 @@ handle_read_ready(FdRes, State) ->
319375
},
320376
loop(NewState);
321377

378+
{ok, <<"async_pending">>} ->
379+
%% Async task submitted (e.g., ASGI app running as task)
380+
%% Don't reselect - wait for {write_ready, Fd} signal from Python
381+
%% Increment request count since task was accepted
382+
error_logger:info_msg("Received async_pending for fd=~p~n", [Fd]),
383+
NewState = State#state{
384+
total_requests = State#state.total_requests + 1
385+
},
386+
loop(NewState);
387+
322388
{ok, <<"close">>} ->
323389
%% Close connection
324390
close_connection(Fd, FdRes, State);
@@ -370,6 +436,31 @@ handle_write_ready(FdRes, State) ->
370436
loop(State)
371437
end.
372438

439+
%% ============================================================================
440+
%% Async Write Ready Handler
441+
%% ============================================================================
442+
443+
%% @private
444+
%% Handle async completion signal from Python.
445+
%% This is sent when an async task (like an ASGI app) has completed
446+
%% and the response buffer is ready to be written.
447+
handle_async_write_ready(Fd, State) ->
448+
#state{connections = Conns} = State,
449+
450+
error_logger:info_msg("handle_async_write_ready called for fd=~p~n", [Fd]),
451+
452+
case maps:get(Fd, Conns, undefined) of
453+
#{fd_ref := FdRef} ->
454+
%% Response buffer is ready, trigger write selection
455+
error_logger:info_msg("Triggering write selection for fd=~p~n", [Fd]),
456+
py_nif:reactor_select_write(FdRef),
457+
loop(State);
458+
undefined ->
459+
%% Connection not found (may have been closed), ignore
460+
error_logger:warning_msg("Connection not found for fd=~p~n", [Fd]),
461+
loop(State)
462+
end.
463+
373464
%% ============================================================================
374465
%% Connection Management
375466
%% ============================================================================

0 commit comments

Comments
 (0)