Skip to content

Commit e5ee85f

Browse files
committed
fix(nif): harden OWN_GIL worker robustness
A per-request dict allocation failure in a subinterpreter worker no longer breaks (and permanently kills) the worker command loop while holding the GIL; it now sends an error response and keeps serving. The owngil_* dispatch NIFs run on dirty IO schedulers and use the non-blocking, deadline-bounded read_with_timeout/write_all_with_deadline helpers (cmd_pipe write end set O_NONBLOCK), so a stalled or dead worker can't wedge a scheduler forever or desync the framed protocol. SuspensionRequired is now resolved from the current interpreter's erlang module (like ProcessError), avoiding cross-interpreter PyObject use under OWN_GIL. Adds py_owngil_features_SUITE:owngil_reentrant_multi_stress_test.
1 parent fc39437 commit e5ee85f

5 files changed

Lines changed: 122 additions & 28 deletions

File tree

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,13 @@
44

55
### Security
66

7+
- **OWN_GIL worker robustness** (Python 3.14+) - A per-request allocation failure in
8+
a subinterpreter worker no longer `break`s (and permanently kills) the worker command
9+
loop; it returns an error and keeps serving. The `owngil_*` dispatch NIFs now run on
10+
dirty IO schedulers and use non-blocking, deadline-bounded pipe reads and writes, so a
11+
stalled or dead worker can't wedge a scheduler forever. The internal `SuspensionRequired`
12+
exception is now looked up per-interpreter (like `ProcessError`), avoiding cross-
13+
interpreter object use under OWN_GIL.
714
- **Callback suspend/resume lifetime hardening** - The worker resource is now kept
815
alive for the lifetime of a suspended callback (it could previously be GC'd mid-
916
suspension, causing a use-after-free on resume). A resume frees any prior result

c_src/py_callback.c

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,33 @@ static PyObject *get_current_process_error(void) {
147147
return exc_class;
148148
}
149149

150+
/**
151+
* Get the SuspensionRequired exception class from the current interpreter's
152+
* erlang module. Under OWN_GIL subinterpreters each interpreter has its own
153+
* erlang module/class, so raising the process-global object (which belongs to
154+
* whichever interpreter initialized last) is cross-interpreter UB. Mirrors
155+
* get_current_process_error().
156+
*/
157+
static PyObject *get_current_suspension_required(void) {
158+
PyObject *erlang_module = PyImport_ImportModule("erlang");
159+
if (erlang_module == NULL) {
160+
PyErr_Clear();
161+
return SuspensionRequiredException; /* Fallback to global */
162+
}
163+
164+
PyObject *exc_class = PyObject_GetAttrString(erlang_module, "SuspensionRequired");
165+
Py_DECREF(erlang_module);
166+
167+
if (exc_class == NULL) {
168+
PyErr_Clear();
169+
return SuspensionRequiredException; /* Fallback to global */
170+
}
171+
172+
/* See get_current_process_error: decref and rely on the module keeping it alive. */
173+
Py_DECREF(exc_class);
174+
return exc_class;
175+
}
176+
150177
/* ============================================================================
151178
* Callback Name Registry
152179
*
@@ -2117,7 +2144,7 @@ static PyObject *erlang_call_impl(PyObject *self, PyObject *args) {
21172144
Py_XSETREF(tl_pending_args, call_args);
21182145

21192146
/* Raise exception to abort Python execution */
2120-
PyErr_SetString(SuspensionRequiredException, "callback pending");
2147+
PyErr_SetString(get_current_suspension_required(), "callback pending");
21212148
return NULL;
21222149
}
21232150

c_src/py_nif.c

Lines changed: 43 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1724,6 +1724,10 @@ static ERL_NIF_TERM nif_set_callback_handler(ErlNifEnv *env, int argc, const ERL
17241724
* dirty scheduler forever (the pipe write ends are set non-blocking). */
17251725
#define CALLBACK_RESPONSE_IO_TIMEOUT_MS 30000
17261726

1727+
/* Bound for OWN_GIL dispatch pipe I/O so a stalled/dead worker thread can't
1728+
* block the dispatching dirty scheduler forever. */
1729+
#define OWNGIL_IO_TIMEOUT_MS 30000
1730+
17271731
static ERL_NIF_TERM nif_send_callback_response(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
17281732
(void)argc;
17291733
int fd;
@@ -7292,16 +7296,19 @@ static ERL_NIF_TERM nif_owngil_create_session(ErlNifEnv *env, int argc,
72927296
.payload_len = 0,
72937297
};
72947298

7295-
/* Write header */
7296-
if (write(w->cmd_pipe[1], &header, sizeof(header)) != sizeof(header)) {
7299+
/* Write header (non-blocking write end + deadline so a stalled/dead worker
7300+
* can't block this dirty scheduler forever). */
7301+
if (write_all_with_deadline(w->cmd_pipe[1], &header, sizeof(header),
7302+
OWNGIL_IO_TIMEOUT_MS) != WRITE_OK) {
72977303
pthread_mutex_unlock(&w->dispatch_mutex);
72987304
return enif_make_tuple2(env, ATOM_ERROR,
72997305
enif_make_atom(env, "write_failed"));
73007306
}
73017307

7302-
/* Wait for response */
7308+
/* Wait for response, bounded by a deadline. */
73037309
owngil_header_t resp;
7304-
if (read(w->result_pipe[0], &resp, sizeof(resp)) != sizeof(resp)) {
7310+
if (read_with_timeout(w->result_pipe[0], &resp, sizeof(resp),
7311+
OWNGIL_IO_TIMEOUT_MS) != (ssize_t)sizeof(resp)) {
73057312
pthread_mutex_unlock(&w->dispatch_mutex);
73067313
return enif_make_tuple2(env, ATOM_ERROR,
73077314
enif_make_atom(env, "read_failed"));
@@ -7385,9 +7392,11 @@ static ERL_NIF_TERM nif_owngil_submit_task(ErlNifEnv *env, int argc,
73857392
.payload_len = payload_bin.size,
73867393
};
73877394

7388-
/* Write header and payload */
7389-
if (write(w->cmd_pipe[1], &header, sizeof(header)) != sizeof(header) ||
7390-
write(w->cmd_pipe[1], payload_bin.data, payload_bin.size) != (ssize_t)payload_bin.size) {
7395+
/* Write header and payload (non-blocking write end + deadline). */
7396+
if (write_all_with_deadline(w->cmd_pipe[1], &header, sizeof(header),
7397+
OWNGIL_IO_TIMEOUT_MS) != WRITE_OK ||
7398+
write_all_with_deadline(w->cmd_pipe[1], payload_bin.data, payload_bin.size,
7399+
OWNGIL_IO_TIMEOUT_MS) != WRITE_OK) {
73917400
pthread_mutex_unlock(&w->dispatch_mutex);
73927401
enif_release_binary(&payload_bin);
73937402
return enif_make_tuple2(env, ATOM_ERROR,
@@ -7443,11 +7452,13 @@ static ERL_NIF_TERM nif_owngil_destroy_session(ErlNifEnv *env, int argc,
74437452
.payload_len = 0,
74447453
};
74457454

7446-
/* Write header */
7447-
if (write(w->cmd_pipe[1], &header, sizeof(header)) == sizeof(header)) {
7448-
/* Wait for response */
7455+
/* Write header (best-effort, bounded). */
7456+
if (write_all_with_deadline(w->cmd_pipe[1], &header, sizeof(header),
7457+
OWNGIL_IO_TIMEOUT_MS) == WRITE_OK) {
7458+
/* Wait for response (best-effort, bounded). */
74497459
owngil_header_t resp;
7450-
read(w->result_pipe[0], &resp, sizeof(resp));
7460+
(void)read_with_timeout(w->result_pipe[0], &resp, sizeof(resp),
7461+
OWNGIL_IO_TIMEOUT_MS);
74517462
}
74527463

74537464
pthread_mutex_unlock(&w->dispatch_mutex);
@@ -7504,12 +7515,15 @@ static ERL_NIF_TERM nif_owngil_apply_imports(ErlNifEnv *env, int argc,
75047515
.payload_len = payload_bin.size,
75057516
};
75067517

7507-
/* Write header and payload */
7508-
if (write(w->cmd_pipe[1], &header, sizeof(header)) == sizeof(header)) {
7509-
write(w->cmd_pipe[1], payload_bin.data, payload_bin.size);
7510-
/* Wait for response */
7518+
/* Write header and payload (best-effort, bounded). */
7519+
if (write_all_with_deadline(w->cmd_pipe[1], &header, sizeof(header),
7520+
OWNGIL_IO_TIMEOUT_MS) == WRITE_OK) {
7521+
(void)write_all_with_deadline(w->cmd_pipe[1], payload_bin.data, payload_bin.size,
7522+
OWNGIL_IO_TIMEOUT_MS);
7523+
/* Wait for response (best-effort, bounded). */
75117524
owngil_header_t resp;
7512-
read(w->result_pipe[0], &resp, sizeof(resp));
7525+
(void)read_with_timeout(w->result_pipe[0], &resp, sizeof(resp),
7526+
OWNGIL_IO_TIMEOUT_MS);
75137527
}
75147528

75157529
enif_release_binary(&payload_bin);
@@ -7567,12 +7581,15 @@ static ERL_NIF_TERM nif_owngil_apply_paths(ErlNifEnv *env, int argc,
75677581
.payload_len = payload_bin.size,
75687582
};
75697583

7570-
/* Write header and payload */
7571-
if (write(w->cmd_pipe[1], &header, sizeof(header)) == sizeof(header)) {
7572-
write(w->cmd_pipe[1], payload_bin.data, payload_bin.size);
7573-
/* Wait for response */
7584+
/* Write header and payload (best-effort, bounded). */
7585+
if (write_all_with_deadline(w->cmd_pipe[1], &header, sizeof(header),
7586+
OWNGIL_IO_TIMEOUT_MS) == WRITE_OK) {
7587+
(void)write_all_with_deadline(w->cmd_pipe[1], payload_bin.data, payload_bin.size,
7588+
OWNGIL_IO_TIMEOUT_MS);
7589+
/* Wait for response (best-effort, bounded). */
75747590
owngil_header_t resp;
7575-
read(w->result_pipe[0], &resp, sizeof(resp));
7591+
(void)read_with_timeout(w->result_pipe[0], &resp, sizeof(resp),
7592+
OWNGIL_IO_TIMEOUT_MS);
75767593
}
75777594

75787595
enif_release_binary(&payload_bin);
@@ -7866,11 +7883,11 @@ static ErlNifFunc nif_funcs[] = {
78667883
{"subinterp_thread_pool_stats", 0, nif_subinterp_thread_pool_stats, 0},
78677884

78687885
/* OWN_GIL session management for event loop pool */
7869-
{"owngil_create_session", 1, nif_owngil_create_session, 0},
7870-
{"owngil_submit_task", 7, nif_owngil_submit_task, 0},
7871-
{"owngil_destroy_session", 2, nif_owngil_destroy_session, 0},
7872-
{"owngil_apply_imports", 3, nif_owngil_apply_imports, 0},
7873-
{"owngil_apply_paths", 3, nif_owngil_apply_paths, 0},
7886+
{"owngil_create_session", 1, nif_owngil_create_session, ERL_NIF_DIRTY_JOB_IO_BOUND},
7887+
{"owngil_submit_task", 7, nif_owngil_submit_task, ERL_NIF_DIRTY_JOB_IO_BOUND},
7888+
{"owngil_destroy_session", 2, nif_owngil_destroy_session, ERL_NIF_DIRTY_JOB_IO_BOUND},
7889+
{"owngil_apply_imports", 3, nif_owngil_apply_imports, ERL_NIF_DIRTY_JOB_IO_BOUND},
7890+
{"owngil_apply_paths", 3, nif_owngil_apply_paths, ERL_NIF_DIRTY_JOB_IO_BOUND},
78747891

78757892
/* Execution mode info */
78767893
{"execution_mode", 0, nif_execution_mode, 0},

c_src/py_subinterp_thread.c

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,13 @@ int subinterp_thread_pool_init(int num_workers) {
107107
pthread_mutex_destroy(&w->ns_mutex);
108108
goto cleanup_workers;
109109
}
110+
/* Non-blocking cmd_pipe write end so the dispatch NIFs can bound their
111+
* writes with write_all_with_deadline (no dirty-scheduler stall on a full
112+
* pipe). The worker thread reads cmd_pipe[0] blocking, which is fine. */
113+
{
114+
int wfl = fcntl(w->cmd_pipe[1], F_GETFL, 0);
115+
if (wfl >= 0) (void)fcntl(w->cmd_pipe[1], F_SETFL, wfl | O_NONBLOCK);
116+
}
110117
if (pipe(w->result_pipe) < 0) {
111118
fprintf(stderr, "subinterp_thread_pool_init: failed to create result_pipe for worker %d: %s\n",
112119
i, strerror(errno));
@@ -562,7 +569,12 @@ static void *worker_thread_main(void *arg) {
562569
if ((owns_globals && globals == NULL) || (owns_locals && locals == NULL)) {
563570
if (owns_globals) Py_XDECREF(globals);
564571
if (owns_locals) Py_XDECREF(locals);
565-
break;
572+
/* Per-request dict allocation failure: respond with an
573+
* error and keep serving. This previously `break`ed the
574+
* worker command loop, permanently killing the thread (and
575+
* leaving the GIL held), wedging every session routed to it. */
576+
resp_header.msg_type = MSG_ERROR;
577+
goto send_response;
566578
}
567579

568580
switch (header.req_type) {
@@ -933,6 +945,7 @@ static void *worker_thread_main(void *arg) {
933945
}
934946
}
935947

948+
send_response:
936949
if (tmp_env) {
937950
enif_free_env(tmp_env);
938951
}

test/py_owngil_features_SUITE.erl

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
owngil_reentrant_basic_test/1,
4848
owngil_reentrant_nested_test/1,
4949
owngil_reentrant_concurrent_test/1,
50+
owngil_reentrant_multi_stress_test/1,
5051
owngil_reentrant_complex_types_test/1,
5152
owngil_reentrant_thread_callback_test/1,
5253
owngil_reentrant_try_except_test/1
@@ -156,6 +157,7 @@ groups() ->
156157
owngil_reentrant_basic_test,
157158
owngil_reentrant_nested_test,
158159
owngil_reentrant_concurrent_test,
160+
owngil_reentrant_multi_stress_test,
159161
owngil_reentrant_complex_types_test,
160162
owngil_reentrant_thread_callback_test,
161163
owngil_reentrant_try_except_test
@@ -705,6 +707,34 @@ owngil_reentrant_nested_test(_Config) ->
705707

706708
py_context:stop(Ctx).
707709

710+
%% @doc Stress the per-interpreter exception machinery (H8) and the bounded
711+
%% OWN_GIL dispatch (H7): many reentrant suspend/resume cycles across several
712+
%% subinterpreter contexts concurrently. A cross-interpreter exception object or
713+
%% a dispatch read/write desync would corrupt or crash the node under this load.
714+
owngil_reentrant_multi_stress_test(_Config) ->
715+
py:register_function(owngil_level, fun([Level, N]) ->
716+
case Level >= N of
717+
true -> Level;
718+
false ->
719+
Code = iolist_to_binary(io_lib:format(
720+
"__import__('erlang').call('owngil_level', ~p, ~p)", [Level + 1, N])),
721+
{ok, R} = py:eval(Code),
722+
R
723+
end
724+
end),
725+
Parent = self(),
726+
Pids = [spawn(fun() ->
727+
{ok, Ctx} = py_context:start_link(CtxId, owngil),
728+
Ok = lists:all(fun(_) ->
729+
{ok, 6} =:= py_context:eval(Ctx,
730+
<<"__import__('erlang').call('owngil_level', 1, 6)">>, #{})
731+
end, lists:seq(1, 20)),
732+
py_context:stop(Ctx),
733+
Parent ! {self(), Ok}
734+
end) || CtxId <- lists:seq(1, 4)],
735+
[receive {P, true} -> ok after 60000 -> error({stress_failed, P}) end || P <- Pids],
736+
ok.
737+
708738
%% @doc Concurrent callbacks from multiple owngil contexts
709739
owngil_reentrant_concurrent_test(_Config) ->
710740
NumContexts = 4,

0 commit comments

Comments
 (0)