Skip to content

Commit 12e85b0

Browse files
committed
src: handle microtask/nextTick draining in coroutines
1 parent 9f29587 commit 12e85b0

File tree

3 files changed

+82
-107
lines changed

3 files changed

+82
-107
lines changed

src/coro/uv_promise.h

Lines changed: 9 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,35 @@
11
#ifndef SRC_CORO_UV_PROMISE_H_
22
#define SRC_CORO_UV_PROMISE_H_
33

4-
#include "coro/uv_task.h"
54
#include "env-inl.h"
65
#include "node_errors.h"
76
#include "v8.h"
87

98
namespace node {
109
namespace coro {
1110

12-
// ---------------------------------------------------------------------------
13-
// CoroutineScope — RAII helper for V8 scope management inside coroutines.
14-
//
15-
// After co_await returns, we're back on the event loop thread but may
16-
// not have an active HandleScope or Context. This sets them up:
17-
//
18-
// CoroutineScope scope(env);
19-
// auto value = v8::Integer::New(scope.isolate(), 42);
20-
// resolver.Get(scope.isolate())->Resolve(scope.context(), value);
21-
// ---------------------------------------------------------------------------
22-
23-
class CoroutineScope {
24-
public:
25-
explicit CoroutineScope(Environment* env)
26-
: handle_scope_(env->isolate()), context_scope_(env->context()) {}
27-
28-
v8::Isolate* isolate() const { return handle_scope_.GetIsolate(); }
29-
30-
private:
31-
v8::HandleScope handle_scope_;
32-
v8::Context::Scope context_scope_;
33-
34-
// Access context via the environment to avoid storing it
35-
};
36-
3711
// ---------------------------------------------------------------------------
3812
// ResolvePromise / RejectPromise — helpers for resolving/rejecting a
3913
// v8::Global<v8::Promise::Resolver> from inside a coroutine.
14+
//
15+
// When used inside a UvTrackedTask, the InternalCallbackScope from
16+
// on_resume() already provides a HandleScope and will drain microtasks
17+
// and nextTick on close (in on_suspend / final_suspend).
18+
//
19+
// When used inside an untracked UvTask (no InternalCallbackScope),
20+
// these create their own HandleScope.
4021
// ---------------------------------------------------------------------------
4122

4223
inline void ResolvePromise(Environment* env,
4324
v8::Global<v8::Promise::Resolver>& resolver,
4425
v8::Local<v8::Value> value) {
45-
CoroutineScope scope(env);
26+
v8::HandleScope scope(env->isolate());
4627
auto local = resolver.Get(env->isolate());
4728
USE(local->Resolve(env->context(), value));
4829
}
4930

5031
inline void ResolvePromiseUndefined(
5132
Environment* env, v8::Global<v8::Promise::Resolver>& resolver) {
52-
CoroutineScope scope(env);
5333
ResolvePromise(env, resolver, v8::Undefined(env->isolate()));
5434
}
5535

@@ -59,7 +39,7 @@ inline void RejectPromiseWithUVError(
5939
int uv_err,
6040
const char* syscall,
6141
const char* path = nullptr) {
62-
CoroutineScope scope(env);
42+
v8::HandleScope scope(env->isolate());
6343
auto local = resolver.Get(env->isolate());
6444
v8::Local<v8::Value> exception =
6545
UVException(env->isolate(), uv_err, syscall, nullptr, path);

src/coro/uv_tracked_task.h

Lines changed: 70 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -12,39 +12,36 @@
1212
#include "async_context_frame.h"
1313
#include "async_wrap.h"
1414
#include "env-inl.h"
15+
#include "node_internals.h"
1516
#include "v8.h"
1617

1718
#include "coro/uv_task.h" // for detail::PromiseBase
1819

1920
namespace node {
2021
namespace coro {
2122

22-
// ---------------------------------------------------------------------------
23-
// FixedString — a structural type that can be used as a non-type template
24-
// parameter in C++20. Allows compile-time string literals:
25-
//
26-
// UvTrackedTask<void, "FSREQPROMISE"> my_coro(...) { ... }
27-
//
28-
// The implicit deduction guide lets the string literal deduce N.
29-
// ---------------------------------------------------------------------------
30-
3123
template <size_t N>
32-
struct FixedString {
24+
struct ConstString {
3325
char data[N]{};
34-
consteval FixedString(const char (&str)[N]) { // NOLINT(runtime/explicit)
26+
consteval ConstString(const char (&str)[N]) { // NOLINT(runtime/explicit)
3527
std::copy_n(str, N, data);
3628
}
3729
constexpr const char* c_str() const { return data; }
3830
constexpr size_t size() const { return N - 1; } // exclude null terminator
3931
};
4032

41-
// Deduction guide: FixedString("hello") deduces N = 6.
33+
// Deduction guide: ConstString("hello") deduces N = 6.
4234
template <size_t N>
43-
FixedString(const char (&)[N]) -> FixedString<N>;
35+
ConstString(const char (&)[N]) -> ConstString<N>;
4436

4537
// ---------------------------------------------------------------------------
46-
// UvTrackedTask<T, Name> — A coroutine return type with full async_hooks
47-
// and async_context_frame integration.
38+
// UvTrackedTask<T, Name> — A coroutine return type with full async_hooks,
39+
// async_context_frame, and microtask/nextTick draining integration.
40+
//
41+
// Each resume-to-suspend segment of the coroutine is wrapped in an
42+
// InternalCallbackScope, giving it the same semantics as any other
43+
// callback entry into Node.js: async_hooks before/after, context frame
44+
// save/restore, and microtask+nextTick draining on close.
4845
//
4946
// The Name template parameter identifies the async resource type visible
5047
// to async_hooks.createHook() init callbacks — analogous to the per-
@@ -62,14 +59,13 @@ FixedString(const char (&)[N]) -> FixedString<N>;
6259
// task.InitTracking(env);
6360
// task.Start();
6461
// ---------------------------------------------------------------------------
65-
66-
template <typename T, FixedString Name>
62+
template <typename T, ConstString Name>
6763
class UvTrackedTask;
6864

6965
namespace detail {
7066

71-
// TrackedAwaitable — wraps any awaitable with before/after hook emission
72-
// and async_context_frame save/restore. Created by await_transform.
67+
// TrackedAwaitable — wraps any awaitable with InternalCallbackScope
68+
// management. Created by await_transform.
7369
template <typename Inner, typename Promise>
7470
struct TrackedAwaitable {
7571
Inner inner_;
@@ -78,34 +74,50 @@ struct TrackedAwaitable {
7874
bool await_ready() noexcept { return inner_.await_ready(); }
7975

8076
// Called when the coroutine is about to suspend.
77+
// Close the InternalCallbackScope — this emits after hooks,
78+
// restores the async context, and drains microtasks + nextTick.
8179
template <typename H>
8280
auto await_suspend(H h) noexcept {
8381
promise_->on_suspend();
8482
return inner_.await_suspend(h);
8583
}
8684

8785
// Called when the coroutine resumes after the async operation.
86+
// Open a new InternalCallbackScope — this emits before hooks,
87+
// pushes the async context, and swaps in the context frame.
8888
auto await_resume() noexcept(noexcept(inner_.await_resume())) {
8989
promise_->on_resume();
9090
return inner_.await_resume();
9191
}
9292
};
9393

9494
// TrackedPromiseBase — extends PromiseBase with async_hooks state.
95+
// Each resume-to-suspend segment is bracketed by an InternalCallbackScope,
96+
// which handles all async context management and task queue draining.
9597
struct TrackedPromiseBase : PromiseBase {
9698
Environment* env_ = nullptr;
9799
double async_id_ = -1;
98100
double trigger_async_id_ = -1;
101+
102+
// The async_context_frame captured at init — passed to each scope.
99103
v8::Global<v8::Value> context_frame_;
100-
v8::Global<v8::Value> prior_context_frame_;
101104

102-
// Whether any async_hooks are active (checked once at init).
103-
bool hooks_active_ = false;
105+
// The resource object — created once at init, reused for each scope
106+
// and for executionAsyncResource().
107+
v8::Global<v8::Object> resource_;
104108

105-
// Track whether we're inside a resume (to avoid double-pop on final).
106-
bool in_resume_ = false;
109+
// HandleScope that lives across the entire resume-to-suspend segment.
110+
// Must be created before and destroyed after the InternalCallbackScope.
111+
std::optional<v8::HandleScope> handle_scope_;
112+
113+
// The active InternalCallbackScope for the current resume segment.
114+
// Created in on_resume(), closed and destroyed in on_suspend().
115+
std::optional<InternalCallbackScope> scope_;
107116

108117
// Called by InitTracking() on the task.
118+
// `type_name` identifies the subsystem (e.g. "FSREQPROMISE",
119+
// "GETADDRINFOREQWRAP") — analogous to AsyncWrap::ProviderType but
120+
// as a free-form string so callers aren't coupled to the enum.
109121
void init_tracking(Environment* env, const char* type_name) {
110122
env_ = env;
111123
v8::Isolate* isolate = env->isolate();
@@ -118,76 +130,62 @@ struct TrackedPromiseBase : PromiseBase {
118130
// Capture the current async_context_frame (for AsyncLocalStorage)
119131
context_frame_.Reset(isolate, async_context_frame::current(isolate));
120132

121-
// Check if any hooks are active
122-
AsyncHooks* hooks = env->async_hooks();
123-
hooks_active_ = hooks->fields()[AsyncHooks::kInit] > 0 ||
124-
hooks->fields()[AsyncHooks::kBefore] > 0 ||
125-
hooks->fields()[AsyncHooks::kAfter] > 0 ||
126-
hooks->fields()[AsyncHooks::kDestroy] > 0;
133+
// Create the resource object (reused across scopes)
134+
resource_.Reset(isolate, v8::Object::New(isolate));
127135

128136
// Emit init hook with the caller-supplied type name
129-
if (hooks_active_ && hooks->fields()[AsyncHooks::kInit] > 0) {
130-
v8::Local<v8::Object> resource = v8::Object::New(isolate);
137+
AsyncHooks* hooks = env->async_hooks();
138+
if (hooks->fields()[AsyncHooks::kInit] > 0) {
131139
v8::Local<v8::String> type =
132140
v8::String::NewFromUtf8(isolate, type_name).ToLocalChecked();
133141
AsyncWrap::EmitAsyncInit(
134-
env, resource, type, async_id_, trigger_async_id_);
142+
env, resource_.Get(isolate), type, async_id_, trigger_async_id_);
135143
}
136144
}
137145

138146
// Called when the coroutine resumes (at start or after co_await).
147+
// Opens an InternalCallbackScope which:
148+
// - Pushes async callback depth
149+
// - Swaps in the context frame (for AsyncLocalStorage)
150+
// - Pushes async context (async_id, trigger_async_id)
151+
// - Emits before hook
139152
void on_resume() noexcept {
140153
if (env_ == nullptr) return;
141-
in_resume_ = true;
142-
143-
v8::Isolate* isolate = env_->isolate();
144-
v8::HandleScope handle_scope(isolate);
145154

146-
// Phase 1: Restore async_context_frame (for AsyncLocalStorage)
147-
prior_context_frame_.Reset(
148-
isolate,
149-
async_context_frame::exchange(isolate, context_frame_.Get(isolate)));
150-
151-
// Phase 2: async_hooks before
152-
if (hooks_active_) {
153-
env_->async_hooks()->push_async_context(
154-
async_id_,
155-
trigger_async_id_,
156-
static_cast<v8::Local<v8::Object>*>(nullptr));
157-
AsyncWrap::EmitBefore(env_, async_id_);
158-
}
155+
// HandleScope must be created BEFORE and destroyed AFTER the
156+
// InternalCallbackScope — matching the pattern in FSReqPromise::Resolve.
157+
handle_scope_.emplace(env_->isolate());
158+
scope_.emplace(env_,
159+
&resource_,
160+
async_context{async_id_, trigger_async_id_},
161+
InternalCallbackScope::kNoFlags,
162+
context_frame_.Get(env_->isolate()));
159163
}
160164

161165
// Called when the coroutine suspends (at co_await or completion).
166+
// Closes and destroys the InternalCallbackScope which:
167+
// - Emits after hook
168+
// - Pops async context
169+
// - Restores prior context frame
170+
// - At outermost depth: drains microtasks and nextTick queue
171+
// - Pops async callback depth
162172
void on_suspend() noexcept {
163-
if (env_ == nullptr || !in_resume_) return;
164-
in_resume_ = false;
165-
166-
v8::Isolate* isolate = env_->isolate();
167-
v8::HandleScope handle_scope(isolate);
168-
169-
// Phase 2: async_hooks after
170-
if (hooks_active_) {
171-
AsyncWrap::EmitAfter(env_, async_id_);
172-
env_->async_hooks()->pop_async_context(async_id_);
173-
}
174-
175-
// Phase 1: Restore prior async_context_frame
176-
async_context_frame::set(isolate, prior_context_frame_.Get(isolate));
173+
if (!scope_.has_value()) return;
174+
scope_->Close();
175+
scope_.reset(); // ~InternalCallbackScope (pops callback depth)
176+
handle_scope_.reset(); // ~HandleScope (must be after scope_)
177177
}
178178

179179
// Called when the coroutine is done (at final_suspend).
180180
void on_destroy() noexcept {
181181
if (env_ == nullptr) return;
182182

183183
// Emit destroy hook (batched by Node.js internals)
184-
if (hooks_active_) {
185-
AsyncWrap::EmitDestroy(env_, async_id_);
186-
}
184+
AsyncWrap::EmitDestroy(env_, async_id_);
187185

188186
// Release globals
189187
context_frame_.Reset();
190-
prior_context_frame_.Reset();
188+
resource_.Reset();
191189
}
192190
};
193191

@@ -199,18 +197,17 @@ struct TrackedInitialAwaiter {
199197
bool await_ready() noexcept { return false; }
200198
void await_suspend(std::coroutine_handle<>) noexcept {}
201199

202-
// When resumed, fire on_resume to set up async context.
200+
// When resumed, open the first InternalCallbackScope.
203201
void await_resume() noexcept { promise_.on_resume(); }
204202

205203
Promise& promise_;
206204
};
207205

208-
// FinalAwaiter for tracked tasks — emits on_suspend + on_destroy,
206+
// FinalAwaiter for tracked tasks — closes the last scope, emits destroy,
209207
// then handles the standard detach/continuation logic.
210208
template <typename Promise>
211209
struct TrackedFinalAwaiter {
212210
bool await_ready() noexcept {
213-
// Emit hooks before deciding whether to suspend.
214211
promise_.on_suspend();
215212
promise_.on_destroy();
216213
return promise_.detached_;
@@ -232,7 +229,7 @@ struct TrackedFinalAwaiter {
232229
// ===========================================================================
233230
// UvTrackedTask<T, Name> — tracked coroutine that produces a T value
234231
// ===========================================================================
235-
template <typename T, FixedString Name>
232+
template <typename T, ConstString Name>
236233
class UvTrackedTask {
237234
public:
238235
struct promise_type : detail::TrackedPromiseBase {
@@ -253,7 +250,6 @@ class UvTrackedTask {
253250

254251
void return_value(T value) { result_.emplace(std::move(value)); }
255252

256-
// await_transform — intercepts every co_await to wrap with tracking.
257253
template <typename Awaitable>
258254
auto await_transform(Awaitable&& aw) {
259255
return detail::TrackedAwaitable<std::decay_t<Awaitable>, promise_type>{
@@ -277,7 +273,6 @@ class UvTrackedTask {
277273
}
278274

279275
// -- Tracking setup --
280-
// The type name comes from the template parameter — no string arg needed.
281276

282277
void InitTracking(Environment* env) {
283278
handle_.promise().init_tracking(env, Name.c_str());
@@ -320,7 +315,7 @@ class UvTrackedTask {
320315
// ===========================================================================
321316
// Partial specialization for T = void
322317
// ===========================================================================
323-
template <FixedString Name>
318+
template <ConstString Name>
324319
class UvTrackedTask<void, Name> {
325320
public:
326321
struct promise_type : detail::TrackedPromiseBase {

src/node_file.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4206,15 +4206,15 @@ static coro::UvTrackedTask<void, "COROREADFILE"> CoroReadFileBytesImpl(
42064206
// 4. Close the file
42074207
co_await coro::UvFs(env->event_loop(), uv_fs_close, static_cast<uv_file>(fd));
42084208

4209-
// 5. Create a Buffer and resolve the promise
4209+
// 5. Create a Buffer and resolve the promise.
4210+
// The InternalCallbackScope from on_resume() provides the HandleScope.
4211+
// Microtask and nextTick draining happens in on_suspend() via Close().
42104212
{
4211-
coro::CoroutineScope scope(env);
42124213
MaybeLocal<Object> maybe_buf = Buffer::Copy(env, data.data(), total_read);
42134214
Local<Object> buf;
42144215
if (maybe_buf.ToLocal(&buf)) {
42154216
coro::ResolvePromise(env, resolver, buf);
42164217
} else {
4217-
// Buffer allocation failed — reject with an error
42184218
auto msg = String::NewFromUtf8Literal(env->isolate(),
42194219
"Failed to allocate buffer");
42204220
auto err = v8::Exception::Error(msg);

0 commit comments

Comments
 (0)