Skip to content

Commit fc39437

Browse files
committed
fix(nif): harden callback suspend/resume lifetime
Keep the worker resource alive while a callback is suspended: previously it could be garbage-collected mid-suspension, causing a use-after-free on resume. The reference is released in the suspended-state destructor, and the worker is NULL-checked before the replay dereference. Free any prior result_data before a resume stores a new one, so a duplicate or raced resume no longer leaks the earlier buffer (result_data, not the has_result flag, which toggles during nested replay, is the real pending-result indicator). Clear the pending-callback TLS at the worker request boundary so a reused worker thread doesn't trip the stale-TLS invariant. Run the callback-response pipe writes on dirty IO schedulers with non-blocking, deadline-bounded writes so a stalled reader or large payload can't wedge a scheduler or desync the framed protocol. Also Py_XDECREF callback_args in the GIL-held destructor branch, which previously cleared the pointer without dropping the reference. Adds py_reentrant_SUITE:test_reentrant_resume_stress.
1 parent bd4d85b commit fc39437

5 files changed

Lines changed: 106 additions & 22 deletions

File tree

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,14 @@
44

55
### Security
66

7+
- **Callback suspend/resume lifetime hardening** - The worker resource is now kept
8+
alive for the lifetime of a suspended callback (it could previously be GC'd mid-
9+
suspension, causing a use-after-free on resume). A resume frees any prior result
10+
before storing a new one (no leak/double-replay on a duplicate resume), the
11+
pending-callback thread-local is cleared at the worker request boundary, and the
12+
callback-response pipe writes run on dirty I/O schedulers as non-blocking, deadline-
13+
bounded operations, so a stalled reader or large payload can't wedge a scheduler or
14+
desync the framed protocol.
715
- **Zero-copy buffer pinning** - `py_buffer` no longer relocates (and frees) its
816
storage while a Python `memoryview` points into it. A write that would grow the
917
buffer while a view is held now returns an error instead of dangling the view into

c_src/py_callback.c

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,14 @@ static suspended_state_t *create_suspended_state_ex(
399399
} else {
400400
state->worker = source->data.existing->worker;
401401
}
402+
/* Keep the worker resource alive for as long as the suspended state exists.
403+
* Without this the worker can be GC'd while a callback is suspended, and
404+
* nif_resume_callback_dirty would dereference a freed worker (use-after-free
405+
* with the GIL held). Mirrors the enif_keep_resource(ctx) on the context path;
406+
* suspended_state_destructor releases it. */
407+
if (state->worker != NULL) {
408+
enif_keep_resource(state->worker);
409+
}
402410

403411
state->callback_id = PyLong_AsUnsignedLongLong(callback_id_obj);
404412

@@ -4316,7 +4324,14 @@ static ERL_NIF_TERM nif_resume_callback(ErlNifEnv *env, int argc, const ERL_NIF_
43164324
/* Store the result in the suspended state */
43174325
pthread_mutex_lock(&state->mutex);
43184326

4319-
/* Copy result data */
4327+
/* Copy result data. Free any prior result first: a duplicate/raced resume
4328+
* would otherwise leak the previous buffer. (has_result is not a one-shot
4329+
* flag -- it toggles during nested replay -- so result_data is the real
4330+
* pending-result indicator.) */
4331+
if (state->result_data != NULL) {
4332+
enif_free(state->result_data);
4333+
state->result_data = NULL;
4334+
}
43204335
state->result_data = enif_alloc(result_bin.size);
43214336
if (state->result_data == NULL) {
43224337
pthread_mutex_unlock(&state->mutex);
@@ -4364,6 +4379,12 @@ static ERL_NIF_TERM nif_resume_callback_dirty(ErlNifEnv *env, int argc, const ER
43644379
return make_error(env, "no_result");
43654380
}
43664381

4382+
/* The worker is kept alive for the lifetime of the suspended state, but
4383+
* guard rather than dereference NULL in the replay below. */
4384+
if (state->worker == NULL) {
4385+
return make_error(env, "no_worker");
4386+
}
4387+
43674388
/* Set up thread-local state for replay */
43684389
tl_current_worker = state->worker;
43694390
tl_callback_env = env;

c_src/py_exec.c

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -304,11 +304,14 @@ static void process_request(py_request_t *req) {
304304
suspended_state_t *suspended = create_suspended_state(env, exc_args, req);
305305
Py_DECREF(exc_args);
306306
if (suspended == NULL) {
307-
tl_pending_callback = false;
308-
Py_CLEAR(tl_pending_args);
307+
clear_pending_callback_tls();
309308
req->result = make_error(env, "create_suspended_state_failed");
310309
} else {
311310
req->result = build_suspended_result(env, suspended);
311+
/* func_name/args are copied into the suspended state; clear
312+
* the pending-callback TLS so a later request on this reused
313+
* worker thread doesn't trip the stale-TLS entry invariant. */
314+
clear_pending_callback_tls();
312315
}
313316
}
314317
} else {
@@ -408,11 +411,11 @@ static void process_request(py_request_t *req) {
408411
suspended_state_t *suspended = create_suspended_state(env, exc_args, req);
409412
Py_DECREF(exc_args);
410413
if (suspended == NULL) {
411-
tl_pending_callback = false;
412-
Py_CLEAR(tl_pending_args);
414+
clear_pending_callback_tls();
413415
req->result = make_error(env, "create_suspended_state_failed");
414416
} else {
415417
req->result = build_suspended_result(env, suspended);
418+
clear_pending_callback_tls();
416419
}
417420
}
418421
} else {

c_src/py_nif.c

Lines changed: 45 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -513,11 +513,18 @@ static void suspended_state_destructor(ErlNifEnv *env, void *obj) {
513513
(void)env;
514514
suspended_state_t *state = (suspended_state_t *)obj;
515515

516+
/* Release the worker resource kept alive in create_suspended_state_ex. */
517+
if (state->worker != NULL) {
518+
enif_release_resource(state->worker);
519+
state->worker = NULL;
520+
}
521+
516522
/* Clean up Python objects if Python is still initialized.
517523
* suspended_state_t is used with the worker-based API which runs in
518524
* the main interpreter, so we always use PyGILState_Ensure. */
519525
if (runtime_is_running() && state->callback_args != NULL) {
520526
if (PyGILState_GetThisThreadState() != NULL || PyGILState_Check()) {
527+
Py_XDECREF(state->callback_args);
521528
state->callback_args = NULL;
522529
} else {
523530
PyGILState_STATE gstate = PyGILState_Ensure();
@@ -1700,6 +1707,11 @@ static ERL_NIF_TERM nif_set_callback_handler(ErlNifEnv *env, int argc, const ERL
17001707
if (pipe(worker->callback_pipe) < 0) {
17011708
return make_error(env, "pipe_failed");
17021709
}
1710+
/* Non-blocking write end so write_all_with_deadline can bound the write. */
1711+
{
1712+
int wfl = fcntl(worker->callback_pipe[1], F_GETFL, 0);
1713+
if (wfl >= 0) (void)fcntl(worker->callback_pipe[1], F_SETFL, wfl | O_NONBLOCK);
1714+
}
17031715

17041716
worker->has_callback_handler = true;
17051717

@@ -1708,6 +1720,10 @@ static ERL_NIF_TERM nif_set_callback_handler(ErlNifEnv *env, int argc, const ERL
17081720
enif_make_int(env, worker->callback_pipe[1]));
17091721
}
17101722

1723+
/* Bound for callback-response pipe writes: a stalled reader must not block a
1724+
* dirty scheduler forever (the pipe write ends are set non-blocking). */
1725+
#define CALLBACK_RESPONSE_IO_TIMEOUT_MS 30000
1726+
17111727
static ERL_NIF_TERM nif_send_callback_response(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
17121728
(void)argc;
17131729
int fd;
@@ -1721,15 +1737,16 @@ static ERL_NIF_TERM nif_send_callback_response(ErlNifEnv *env, int argc, const E
17211737
return make_error(env, "invalid_response");
17221738
}
17231739

1724-
/* Write length then data */
1740+
/* Write length then data with a timed, non-blocking writer (the pipe write
1741+
* end is O_NONBLOCK) so a stalled reader or a large payload can't block a
1742+
* dirty scheduler forever or desync the length-framed protocol on EINTR. */
17251743
uint32_t len = (uint32_t)response.size;
1726-
ssize_t n = write(fd, &len, sizeof(len));
1727-
if (n != sizeof(len)) {
1744+
if (write_all_with_deadline(fd, &len, sizeof(len),
1745+
CALLBACK_RESPONSE_IO_TIMEOUT_MS) != WRITE_OK) {
17281746
return make_error(env, "write_length_failed");
17291747
}
1730-
1731-
n = write(fd, response.data, response.size);
1732-
if (n != (ssize_t)response.size) {
1748+
if (write_all_with_deadline(fd, response.data, response.size,
1749+
CALLBACK_RESPONSE_IO_TIMEOUT_MS) != WRITE_OK) {
17331750
return make_error(env, "write_data_failed");
17341751
}
17351752

@@ -4702,6 +4719,11 @@ static ERL_NIF_TERM nif_context_create(ErlNifEnv *env, int argc, const ERL_NIF_T
47024719
enif_release_resource(ctx);
47034720
return make_error(env, "pipe_create_failed");
47044721
}
4722+
/* Non-blocking write end so write_all_with_deadline can bound the write. */
4723+
{
4724+
int wfl = fcntl(ctx->callback_pipe[1], F_GETFL, 0);
4725+
if (wfl >= 0) (void)fcntl(ctx->callback_pipe[1], F_SETFL, wfl | O_NONBLOCK);
4726+
}
47054727

47064728
#ifdef HAVE_SUBINTERPRETERS
47074729
ctx->uses_own_gil = false;
@@ -6579,15 +6601,17 @@ static ERL_NIF_TERM nif_context_write_callback_response(ErlNifEnv *env, int argc
65796601
return make_error(env, "pipe_not_initialized");
65806602
}
65816603

6582-
/* Write length prefix (4 bytes, native endianness - must match read_length_prefixed_data) */
6604+
/* Write length prefix + data with a timed, non-blocking writer (the pipe
6605+
* write end is O_NONBLOCK) so a stalled reader or large payload can't block a
6606+
* dirty scheduler forever or desync the framed protocol. 4-byte native-endian
6607+
* length must match read_length_prefixed_data. */
65836608
uint32_t len = (uint32_t)data.size;
6584-
ssize_t written = write(ctx->callback_pipe[1], &len, sizeof(len));
6585-
if (written != sizeof(len)) {
6609+
if (write_all_with_deadline(ctx->callback_pipe[1], &len, sizeof(len),
6610+
CALLBACK_RESPONSE_IO_TIMEOUT_MS) != WRITE_OK) {
65866611
return make_error(env, "write_failed");
65876612
}
6588-
6589-
written = write(ctx->callback_pipe[1], data.data, data.size);
6590-
if (written != (ssize_t)data.size) {
6613+
if (write_all_with_deadline(ctx->callback_pipe[1], data.data, data.size,
6614+
CALLBACK_RESPONSE_IO_TIMEOUT_MS) != WRITE_OK) {
65916615
return make_error(env, "write_failed");
65926616
}
65936617

@@ -6639,7 +6663,13 @@ static ERL_NIF_TERM nif_context_resume(ErlNifEnv *env, int argc, const ERL_NIF_T
66396663
return make_error(env, "context_mismatch");
66406664
}
66416665

6642-
/* Store the callback result */
6666+
/* Store the callback result. Free any prior result first to avoid leaking it
6667+
* on a duplicate/raced resume (result_data, not the toggling has_result flag,
6668+
* is the real pending-result indicator). */
6669+
if (state->result_data != NULL) {
6670+
enif_free(state->result_data);
6671+
state->result_data = NULL;
6672+
}
66436673
state->result_data = enif_alloc(result_bin.size);
66446674
if (state->result_data == NULL) {
66456675
return make_error(env, "alloc_failed");
@@ -7812,7 +7842,7 @@ static ErlNifFunc nif_funcs[] = {
78127842

78137843
/* Callback support */
78147844
{"set_callback_handler", 2, nif_set_callback_handler, 0},
7815-
{"send_callback_response", 2, nif_send_callback_response, 0},
7845+
{"send_callback_response", 2, nif_send_callback_response, ERL_NIF_DIRTY_JOB_IO_BOUND},
78167846
{"resume_callback", 2, nif_resume_callback, 0},
78177847

78187848
/* Async worker management */
@@ -7961,7 +7991,7 @@ static ErlNifFunc nif_funcs[] = {
79617991
{"context_interp_id", 1, nif_context_interp_id, 0},
79627992
{"context_set_callback_handler", 2, nif_context_set_callback_handler, 0},
79637993
{"context_get_callback_pipe", 1, nif_context_get_callback_pipe, 0},
7964-
{"context_write_callback_response", 2, nif_context_write_callback_response, 0},
7994+
{"context_write_callback_response", 2, nif_context_write_callback_response, ERL_NIF_DIRTY_JOB_IO_BOUND},
79657995
{"context_resume", 3, nif_context_resume, ERL_NIF_DIRTY_JOB_CPU_BOUND},
79667996
{"context_cancel_resume", 2, nif_context_cancel_resume, 0},
79677997
{"context_get_event_loop", 1, nif_context_get_event_loop, 0},

test/py_reentrant_SUITE.erl

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525
test_callback_with_try_except/1,
2626
test_async_call/1,
2727
test_callback_name_registry/1,
28-
test_etf_decode_safe/1
28+
test_etf_decode_safe/1,
29+
test_reentrant_resume_stress/1
2930
]).
3031

3132
all() ->
@@ -40,7 +41,8 @@ all() ->
4041
test_callback_with_try_except,
4142
test_async_call,
4243
test_callback_name_registry,
43-
test_etf_decode_safe
44+
test_etf_decode_safe,
45+
test_reentrant_resume_stress
4446
].
4547

4648
init_per_suite(Config) ->
@@ -69,6 +71,7 @@ end_per_testcase(_TestCase, _Config) ->
6971
try py:unregister_function(test_registry_func) catch _:_ -> ok end,
7072
try py:unregister_function(etf_probe_ok) catch _:_ -> ok end,
7173
try py:unregister_function(etf_probe_novel) catch _:_ -> ok end,
74+
try py:unregister_function(rs_double) catch _:_ -> ok end,
7275
ok.
7376

7477
%%% ============================================================================
@@ -93,6 +96,25 @@ test_basic_reentrant(_Config) ->
9396

9497
ok.
9598

99+
%% @doc Many reentrant suspend/resume cycles must all succeed with no leak,
100+
%% stale-TLS invariant trip, or crash. Regression for the suspend/resume lifetime
101+
%% fixes (worker keep, TLS clear, result_data free, hardened callback-pipe writes);
102+
%% exercises the callback pipe and context resume under repetition.
103+
test_reentrant_resume_stress(_Config) ->
104+
py:register_function(rs_double, fun([X]) ->
105+
{ok, R} = py:eval(iolist_to_binary(io_lib:format("~p * 2", [X]))),
106+
R
107+
end),
108+
Got = [begin
109+
Code = iolist_to_binary(
110+
io_lib:format("__import__('erlang').call('rs_double', ~p) + 1", [N])),
111+
{ok, V} = py:eval(Code),
112+
V
113+
end || N <- lists:seq(1, 100)],
114+
Got = [N * 2 + 1 || N <- lists:seq(1, 100)],
115+
py:unregister_function(rs_double),
116+
ok.
117+
96118
%% @doc Regression for the binary_to_term SAFE-flag hardening (atom exhaustion).
97119
%% A `__etf__:` callback result that encodes a brand-new atom must be rejected (not
98120
%% decoded) so it cannot mint non-GC'd atoms, while a valid existing-atom payload

0 commit comments

Comments
 (0)