Skip to content

Commit d6bb934

Browse files
committed
Async dispatch for env-bearing call/eval/exec
dispatch_to_worker_thread_impl blocked on a 30s pthread_cond_timedwait in the env path. ML inference and other long-running calls returned {error, worker_timeout} while the worker kept going. Add three async-with-env NIFs that wrap the existing dispatch_to_worker_thread_async (which already takes a local_env), and rewire handle_*_with_suspension_and_env plus the {exec,_,_,_,EnvRef} loop arm to async-first / sync-fallback. The Erlang side waits in wait_for_async_result/2, which has the stale-result drain from ef31f76, so the env path now matches the non-env path's behavior. Adds test/py_context_async_env_SUITE (2 cases).
1 parent 527e7e3 commit d6bb934

4 files changed

Lines changed: 284 additions & 7 deletions

File tree

c_src/py_nif.c

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5210,6 +5210,134 @@ static ERL_NIF_TERM nif_context_exec_async(ErlNifEnv *env, int argc, const ERL_N
52105210
return make_error(env, "async_requires_worker_thread");
52115211
}
52125212

5213+
/**
5214+
* @brief Async call with process-local environment
5215+
*
5216+
* nif_context_call_with_env_async(ContextRef, CallerPid, RequestId,
5217+
* Module, Func, Args, Kwargs, EnvRef)
5218+
* -> {enqueued, RequestId} | {error, Reason}
5219+
*
5220+
* Same contract as nif_context_call_async but threads the process-local
5221+
* env through to the worker. Replaces the 30-second pthread_cond_timedwait
5222+
* dispatch path; the Erlang side waits in a normal receive.
5223+
*/
5224+
static ERL_NIF_TERM nif_context_call_with_env_async(ErlNifEnv *env, int argc,
5225+
const ERL_NIF_TERM argv[]) {
5226+
py_context_t *ctx;
5227+
py_env_resource_t *penv;
5228+
5229+
if (!runtime_is_running()) {
5230+
return make_error(env, "python_not_running");
5231+
}
5232+
if (argc < 8) {
5233+
return make_error(env, "badarg");
5234+
}
5235+
if (!enif_get_resource(env, argv[0], PY_CONTEXT_RESOURCE_TYPE, (void **)&ctx)) {
5236+
return make_error(env, "invalid_context");
5237+
}
5238+
ErlNifPid caller_pid;
5239+
if (!enif_get_local_pid(env, argv[1], &caller_pid)) {
5240+
return make_error(env, "invalid_pid");
5241+
}
5242+
ERL_NIF_TERM request_id = argv[2];
5243+
if (!enif_get_resource(env, argv[7], PY_ENV_RESOURCE_TYPE, (void **)&penv)) {
5244+
return make_error(env, "invalid_env");
5245+
}
5246+
5247+
if (!ctx->uses_worker_thread) {
5248+
return make_error(env, "async_requires_worker_thread");
5249+
}
5250+
5251+
ERL_NIF_TERM kwargs = enif_is_map(env, argv[6])
5252+
? argv[6] : enif_make_new_map(env);
5253+
ERL_NIF_TERM request = enif_make_tuple4(env,
5254+
argv[3], /* Module */
5255+
argv[4], /* Func */
5256+
argv[5], /* Args */
5257+
kwargs);
5258+
return dispatch_to_worker_thread_async(env, ctx, CTX_REQ_CALL_WITH_ENV,
5259+
request, caller_pid, request_id, penv);
5260+
}
5261+
5262+
/**
5263+
* @brief Async eval with process-local environment
5264+
*
5265+
* nif_context_eval_with_env_async(ContextRef, CallerPid, RequestId,
5266+
* Code, Locals, EnvRef)
5267+
* -> {enqueued, RequestId} | {error, Reason}
5268+
*/
5269+
static ERL_NIF_TERM nif_context_eval_with_env_async(ErlNifEnv *env, int argc,
5270+
const ERL_NIF_TERM argv[]) {
5271+
py_context_t *ctx;
5272+
py_env_resource_t *penv;
5273+
5274+
if (!runtime_is_running()) {
5275+
return make_error(env, "python_not_running");
5276+
}
5277+
if (argc < 6) {
5278+
return make_error(env, "badarg");
5279+
}
5280+
if (!enif_get_resource(env, argv[0], PY_CONTEXT_RESOURCE_TYPE, (void **)&ctx)) {
5281+
return make_error(env, "invalid_context");
5282+
}
5283+
ErlNifPid caller_pid;
5284+
if (!enif_get_local_pid(env, argv[1], &caller_pid)) {
5285+
return make_error(env, "invalid_pid");
5286+
}
5287+
ERL_NIF_TERM request_id = argv[2];
5288+
if (!enif_get_resource(env, argv[5], PY_ENV_RESOURCE_TYPE, (void **)&penv)) {
5289+
return make_error(env, "invalid_env");
5290+
}
5291+
5292+
if (!ctx->uses_worker_thread) {
5293+
return make_error(env, "async_requires_worker_thread");
5294+
}
5295+
5296+
ERL_NIF_TERM locals = enif_is_map(env, argv[4])
5297+
? argv[4] : enif_make_new_map(env);
5298+
ERL_NIF_TERM request = enif_make_tuple2(env, argv[3], locals);
5299+
return dispatch_to_worker_thread_async(env, ctx, CTX_REQ_EVAL_WITH_ENV,
5300+
request, caller_pid, request_id, penv);
5301+
}
5302+
5303+
/**
5304+
* @brief Async exec with process-local environment
5305+
*
5306+
* nif_context_exec_with_env_async(ContextRef, CallerPid, RequestId,
5307+
* Code, EnvRef)
5308+
* -> {enqueued, RequestId} | {error, Reason}
5309+
*/
5310+
static ERL_NIF_TERM nif_context_exec_with_env_async(ErlNifEnv *env, int argc,
5311+
const ERL_NIF_TERM argv[]) {
5312+
py_context_t *ctx;
5313+
py_env_resource_t *penv;
5314+
5315+
if (!runtime_is_running()) {
5316+
return make_error(env, "python_not_running");
5317+
}
5318+
if (argc < 5) {
5319+
return make_error(env, "badarg");
5320+
}
5321+
if (!enif_get_resource(env, argv[0], PY_CONTEXT_RESOURCE_TYPE, (void **)&ctx)) {
5322+
return make_error(env, "invalid_context");
5323+
}
5324+
ErlNifPid caller_pid;
5325+
if (!enif_get_local_pid(env, argv[1], &caller_pid)) {
5326+
return make_error(env, "invalid_pid");
5327+
}
5328+
ERL_NIF_TERM request_id = argv[2];
5329+
if (!enif_get_resource(env, argv[4], PY_ENV_RESOURCE_TYPE, (void **)&penv)) {
5330+
return make_error(env, "invalid_env");
5331+
}
5332+
5333+
if (!ctx->uses_worker_thread) {
5334+
return make_error(env, "async_requires_worker_thread");
5335+
}
5336+
5337+
return dispatch_to_worker_thread_async(env, ctx, CTX_REQ_EXEC_WITH_ENV,
5338+
argv[3], caller_pid, request_id, penv);
5339+
}
5340+
52135341
/**
52145342
* @brief Evaluate a Python expression in a context
52155343
*
@@ -7772,6 +7900,9 @@ static ErlNifFunc nif_funcs[] = {
77727900
{"context_call_async", 7, nif_context_call_async, 0},
77737901
{"context_eval_async", 5, nif_context_eval_async, 0},
77747902
{"context_exec_async", 4, nif_context_exec_async, 0},
7903+
{"context_call_with_env_async", 8, nif_context_call_with_env_async, 0},
7904+
{"context_eval_with_env_async", 6, nif_context_eval_with_env_async, 0},
7905+
{"context_exec_with_env_async", 5, nif_context_exec_with_env_async, 0},
77757906
{"create_local_env", 1, nif_create_local_env, 0},
77767907
{"interp_apply_imports", 2, nif_interp_apply_imports, ERL_NIF_DIRTY_JOB_CPU_BOUND},
77777908
{"interp_apply_paths", 2, nif_interp_apply_paths, ERL_NIF_DIRTY_JOB_CPU_BOUND},

src/py_context.erl

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -567,10 +567,10 @@ loop(#state{ref = Ref, interp_id = InterpId} = State) ->
567567
From ! {MRef, Result},
568568
loop(State);
569569

570-
%% Exec with process-local environment (worker mode)
571-
%% Note: Uses blocking dispatch since async+env isn't implemented yet.
570+
%% Exec with process-local environment (worker mode).
571+
%% Async dispatch with sync fallback (mirrors call/eval).
572572
{exec, From, MRef, Code, EnvRef} ->
573-
Result = py_nif:context_exec(Ref, Code, EnvRef),
573+
Result = handle_exec_with_async_and_env(Ref, Code, EnvRef),
574574
From ! {MRef, Result},
575575
loop(State);
576576

@@ -846,9 +846,24 @@ process_async_result(_Ref, Result) ->
846846
Result.
847847

848848
%% @private
849-
%% Handle call with process-local environment
850-
%% Note: Uses blocking dispatch since async+env isn't implemented yet.
849+
%% Handle call with process-local environment.
850+
%% Tries async dispatch first (no 30 s NIF timeout); falls back to the
851+
%% blocking NIF only when the worker thread isn't available.
851852
handle_call_with_suspension_and_env(Ref, Module, Func, Args, Kwargs, EnvRef) ->
853+
RequestId = make_ref(),
854+
case py_nif:context_call_with_env_async(Ref, self(), RequestId,
855+
Module, Func, Args, Kwargs,
856+
EnvRef) of
857+
{enqueued, RequestId} ->
858+
wait_for_async_result(Ref, RequestId);
859+
{error, async_requires_worker_thread} ->
860+
handle_call_with_env_blocking(Ref, Module, Func, Args, Kwargs, EnvRef);
861+
{error, Reason} ->
862+
{error, Reason}
863+
end.
864+
865+
%% @private
866+
handle_call_with_env_blocking(Ref, Module, Func, Args, Kwargs, EnvRef) ->
852867
case py_nif:context_call(Ref, Module, Func, Args, Kwargs, EnvRef) of
853868
{suspended, _CallbackId, StateRef, {FuncName, CallbackArgs}} ->
854869
CallbackResult = handle_callback_with_nested_receive(Ref, FuncName, CallbackArgs),
@@ -860,9 +875,23 @@ handle_call_with_suspension_and_env(Ref, Module, Func, Args, Kwargs, EnvRef) ->
860875
end.
861876

862877
%% @private
863-
%% Handle eval with process-local environment
864-
%% Note: Uses blocking dispatch since async+env isn't implemented yet.
878+
%% Handle eval with process-local environment.
879+
%% Tries async dispatch first; falls back to the blocking NIF only when
880+
%% the worker thread isn't available.
865881
handle_eval_with_suspension_and_env(Ref, Code, Locals, EnvRef) ->
882+
RequestId = make_ref(),
883+
case py_nif:context_eval_with_env_async(Ref, self(), RequestId,
884+
Code, Locals, EnvRef) of
885+
{enqueued, RequestId} ->
886+
wait_for_async_result(Ref, RequestId);
887+
{error, async_requires_worker_thread} ->
888+
handle_eval_with_env_blocking(Ref, Code, Locals, EnvRef);
889+
{error, Reason} ->
890+
{error, Reason}
891+
end.
892+
893+
%% @private
894+
handle_eval_with_env_blocking(Ref, Code, Locals, EnvRef) ->
866895
case py_nif:context_eval(Ref, Code, Locals, EnvRef) of
867896
{suspended, _CallbackId, StateRef, {FuncName, CallbackArgs}} ->
868897
CallbackResult = handle_callback_with_nested_receive(Ref, FuncName, CallbackArgs),
@@ -873,6 +902,21 @@ handle_eval_with_suspension_and_env(Ref, Code, Locals, EnvRef) ->
873902
Result
874903
end.
875904

905+
%% @private
906+
%% Handle exec with process-local environment via the same async-first
907+
%% path used for call/eval.
908+
handle_exec_with_async_and_env(Ref, Code, EnvRef) ->
909+
RequestId = make_ref(),
910+
case py_nif:context_exec_with_env_async(Ref, self(), RequestId,
911+
Code, EnvRef) of
912+
{enqueued, RequestId} ->
913+
wait_for_async_result(Ref, RequestId);
914+
{error, async_requires_worker_thread} ->
915+
py_nif:context_exec(Ref, Code, EnvRef);
916+
{error, Reason} ->
917+
{error, Reason}
918+
end.
919+
876920
%% @private
877921
%% Check if a context is a subinterpreter (has interp_id > 0)
878922
is_context_subinterp(Ref) ->

src/py_nif.erl

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,9 @@
166166
context_call_async/7,
167167
context_eval_async/5,
168168
context_exec_async/4,
169+
context_call_with_env_async/8,
170+
context_eval_with_env_async/6,
171+
context_exec_with_env_async/5,
169172
context_call_method/4,
170173
create_local_env/1,
171174
context_to_term/1,
@@ -1299,6 +1302,34 @@ context_eval_async(_ContextRef, _CallerPid, _RequestId, _Code, _Locals) ->
12991302
context_exec_async(_ContextRef, _CallerPid, _RequestId, _Code) ->
13001303
?NIF_STUB.
13011304

1305+
%% @doc Async call with process-local environment.
1306+
%% @private
1307+
-spec context_call_with_env_async(reference(), pid(), term(),
1308+
binary(), binary(), list(), map(),
1309+
reference()) ->
1310+
{enqueued, term()} | {error, term()}.
1311+
context_call_with_env_async(_CtxRef, _CallerPid, _RequestId,
1312+
_Module, _Func, _Args, _Kwargs, _EnvRef) ->
1313+
?NIF_STUB.
1314+
1315+
%% @doc Async eval with process-local environment.
1316+
%% @private
1317+
-spec context_eval_with_env_async(reference(), pid(), term(),
1318+
binary(), map(), reference()) ->
1319+
{enqueued, term()} | {error, term()}.
1320+
context_eval_with_env_async(_CtxRef, _CallerPid, _RequestId,
1321+
_Code, _Locals, _EnvRef) ->
1322+
?NIF_STUB.
1323+
1324+
%% @doc Async exec with process-local environment.
1325+
%% @private
1326+
-spec context_exec_with_env_async(reference(), pid(), term(),
1327+
binary(), reference()) ->
1328+
{enqueued, term()} | {error, term()}.
1329+
context_exec_with_env_async(_CtxRef, _CallerPid, _RequestId,
1330+
_Code, _EnvRef) ->
1331+
?NIF_STUB.
1332+
13021333
%% @doc Call a method on a Python object in a context.
13031334
%%
13041335
%% NO MUTEX - caller must ensure exclusive access (process ownership).
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
%%% @doc Pin the async-with-env dispatch path.
2+
%%%
3+
%%% v3.0 introduced an async dispatch path for call / eval / exec that
4+
%%% returns {enqueued, RequestId} from the NIF and lets the Erlang side
5+
%%% wait in a normal receive. The env-bearing variants
6+
%%% (py_context:call/5, eval/5 with EnvRef, exec/3) used to take a
7+
%%% blocking sync dispatch with a 30-second pthread_cond_timedwait,
8+
%%% returning {error, worker_timeout} for long-running Python while
9+
%%% the worker kept going.
10+
%%%
11+
%%% These cases verify the env path now uses the async dispatch and
12+
%%% completes correctly.
13+
-module(py_context_async_env_SUITE).
14+
15+
-include_lib("common_test/include/ct.hrl").
16+
17+
-export([
18+
all/0,
19+
init_per_suite/1,
20+
end_per_suite/1
21+
]).
22+
23+
-export([
24+
async_env_call_returns_correct_result/1,
25+
env_call_does_not_dispatch_timeout/1
26+
]).
27+
28+
all() ->
29+
[
30+
async_env_call_returns_correct_result,
31+
env_call_does_not_dispatch_timeout
32+
].
33+
34+
init_per_suite(Config) ->
35+
{ok, _} = application:ensure_all_started(erlang_python),
36+
{ok, _} = py:start_contexts(),
37+
Config.
38+
39+
end_per_suite(_Config) ->
40+
ok = application:stop(erlang_python),
41+
ok.
42+
43+
async_env_call_returns_correct_result(_Config) ->
44+
%% py:call/3 wraps an EnvRef under the hood, so a successful
45+
%% round-trip proves the new context_call_with_env_async path is
46+
%% wired and the worker delivers a {py_result, _, _} for it.
47+
{ok, 4.0} = py:call(math, sqrt, [16]),
48+
{ok, 5.0} = py:call(math, sqrt, [25]),
49+
ok.
50+
51+
env_call_does_not_dispatch_timeout(_Config) ->
52+
%% Have the Python side block for 1 second. Under the old sync
53+
%% dispatch this exercised the 30-second pthread_cond_timedwait;
54+
%% now it's an Erlang-side receive on {py_result, _, _} so latency
55+
%% should track wall-clock and never produce {error, worker_timeout}.
56+
Ctx = py:context(1),
57+
EnvRef = py:get_local_env(Ctx),
58+
ok = py_context:exec(Ctx, <<
59+
"import time\n"
60+
"def _slow_round(x):\n"
61+
" time.sleep(1.0)\n"
62+
" return x * 2\n"
63+
>>, EnvRef),
64+
Start = erlang:monotonic_time(millisecond),
65+
{ok, 14} = py_context:call(Ctx, '__main__', '_slow_round', [7], #{},
66+
infinity, EnvRef),
67+
Elapsed = erlang:monotonic_time(millisecond) - Start,
68+
ct:pal("env-async call elapsed: ~p ms", [Elapsed]),
69+
true = Elapsed >= 900,
70+
true = Elapsed < 5000,
71+
ok.

0 commit comments

Comments
 (0)