Skip to content

Commit 3859c94

Browse files
committed
src: implement a prototype for uv coroutines
Signed-off-by: James M Snell <jasnell@gmail.com> Assisted-by: Opencode/Open 4.6
1 parent f3633ef commit 3859c94

File tree

15 files changed

+2423
-121
lines changed

15 files changed

+2423
-121
lines changed

node.gyp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,10 @@
219219
'src/compile_cache.h',
220220
'src/connect_wrap.h',
221221
'src/connection_wrap.h',
222+
'src/coro/uv_task.h',
223+
'src/coro/uv_tracked_task.h',
224+
'src/coro/uv_awaitable.h',
225+
'src/coro/uv_promise.h',
222226
'src/cppgc_helpers.h',
223227
'src/cppgc_helpers.cc',
224228
'src/dataqueue/queue.h',

src/api/callback.cc

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ using v8::Value;
2222
CallbackScope::CallbackScope(Isolate* isolate,
2323
Local<Object> object,
2424
async_context async_context)
25-
: CallbackScope(Environment::GetCurrent(isolate), object, async_context) {}
25+
: CallbackScope(Environment::GetCurrent(isolate), object, async_context) {}
2626

2727
CallbackScope::CallbackScope(Environment* env,
2828
Local<Object> object,
@@ -52,8 +52,7 @@ CallbackScope::CallbackScope(Environment* env,
5252
}
5353

5454
CallbackScope::~CallbackScope() {
55-
if (try_catch_.HasCaught())
56-
private_->MarkAsFailed();
55+
if (try_catch_.HasCaught()) private_->MarkAsFailed();
5756
delete private_;
5857
}
5958

@@ -86,7 +85,15 @@ InternalCallbackScope::InternalCallbackScope(
8685
} else {
8786
object = std::get<Global<Object>*>(object_arg);
8887
}
89-
std::visit([](auto* ptr) { CHECK_NOT_NULL(ptr); }, object);
88+
// Global<Object>* may be null when no resource object was created
89+
// (e.g., coroutine tasks when async_hooks are not active).
90+
// push_async_context already handles the null case by skipping the
91+
// native_execution_async_resources_ store.
92+
if (auto* gptr = std::get_if<Global<Object>*>(&object)) {
93+
CHECK_IMPLIES(*gptr != nullptr, !(*gptr)->IsEmpty());
94+
} else {
95+
std::visit([](auto* ptr) { CHECK_NOT_NULL(ptr); }, object);
96+
}
9097

9198
env->PushAsyncCallbackScope();
9299

@@ -217,8 +224,7 @@ MaybeLocal<Value> InternalMakeCallback(Environment* env,
217224
Local<Value> context_frame) {
218225
CHECK(!recv.IsEmpty());
219226
#ifdef DEBUG
220-
for (int i = 0; i < argc; i++)
221-
CHECK(!argv[i].IsEmpty());
227+
for (int i = 0; i < argc; i++) CHECK(!argv[i].IsEmpty());
222228
#endif
223229

224230
Local<Function> hook_cb = env->async_hooks_callback_trampoline();
@@ -231,8 +237,9 @@ MaybeLocal<Value> InternalMakeCallback(Environment* env,
231237
flags = InternalCallbackScope::kSkipAsyncHooks;
232238
use_async_hooks_trampoline =
233239
async_hooks->fields()[AsyncHooks::kBefore] +
234-
async_hooks->fields()[AsyncHooks::kAfter] +
235-
async_hooks->fields()[AsyncHooks::kUsesExecutionAsyncResource] > 0;
240+
async_hooks->fields()[AsyncHooks::kAfter] +
241+
async_hooks->fields()[AsyncHooks::kUsesExecutionAsyncResource] >
242+
0;
236243
}
237244

238245
InternalCallbackScope scope(

src/async_wrap.h

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ namespace node {
3939
V(FILEHANDLE) \
4040
V(FILEHANDLECLOSEREQ) \
4141
V(BLOBREADER) \
42+
V(COROREADFILE) \
4243
V(FSEVENTWRAP) \
4344
V(FSREQCALLBACK) \
4445
V(FSREQPROMISE) \
@@ -124,11 +125,10 @@ class AsyncWrap : public BaseObject {
124125
};
125126

126127
enum ProviderType {
127-
#define V(PROVIDER) \
128-
PROVIDER_ ## PROVIDER,
128+
#define V(PROVIDER) PROVIDER_##PROVIDER,
129129
NODE_ASYNC_PROVIDER_TYPES(V)
130130
#undef V
131-
PROVIDERS_LENGTH,
131+
PROVIDERS_LENGTH,
132132
};
133133

134134
AsyncWrap(Environment* env,
@@ -171,9 +171,9 @@ class AsyncWrap : public BaseObject {
171171
const v8::FunctionCallbackInfo<v8::Value>& args);
172172
static void GetProviderType(const v8::FunctionCallbackInfo<v8::Value>& args);
173173
static void QueueDestroyAsyncId(
174-
const v8::FunctionCallbackInfo<v8::Value>& args);
174+
const v8::FunctionCallbackInfo<v8::Value>& args);
175175
static void SetCallbackTrampoline(
176-
const v8::FunctionCallbackInfo<v8::Value>& args);
176+
const v8::FunctionCallbackInfo<v8::Value>& args);
177177

178178
static void EmitAsyncInit(Environment* env,
179179
v8::Local<v8::Object> object,
@@ -212,22 +212,16 @@ class AsyncWrap : public BaseObject {
212212
int argc,
213213
v8::Local<v8::Value>* argv);
214214
inline v8::MaybeLocal<v8::Value> MakeCallback(
215-
const v8::Local<v8::Symbol> symbol,
216-
int argc,
217-
v8::Local<v8::Value>* argv);
215+
const v8::Local<v8::Symbol> symbol, int argc, v8::Local<v8::Value>* argv);
218216
inline v8::MaybeLocal<v8::Value> MakeCallback(
219-
const v8::Local<v8::String> symbol,
220-
int argc,
221-
v8::Local<v8::Value>* argv);
217+
const v8::Local<v8::String> symbol, int argc, v8::Local<v8::Value>* argv);
222218
inline v8::MaybeLocal<v8::Value> MakeCallback(
223-
const v8::Local<v8::Name> symbol,
224-
int argc,
225-
v8::Local<v8::Value>* argv);
219+
const v8::Local<v8::Name> symbol, int argc, v8::Local<v8::Value>* argv);
226220

227221
virtual std::string diagnostic_name() const;
228222
const char* MemoryInfoName() const override;
229223

230-
static void WeakCallback(const v8::WeakCallbackInfo<DestroyParam> &info);
224+
static void WeakCallback(const v8::WeakCallbackInfo<DestroyParam>& info);
231225

232226
// Returns the object that 'owns' an async wrap. For example, for a
233227
// TCP connection handle, this is the corresponding net.Socket.

src/coro/README.md

Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
# C++20 Coroutine support for libuv
2+
3+
This directory contains an experimental C++20 coroutine layer for writing
4+
asynchronous libuv operations as sequential C++ code using `co_await`.
5+
6+
The primary goal is to allow multi-step async operations (such as
7+
open + stat + read + close) to be written as straight-line C++ instead of
8+
callback chains, while maintaining full integration with Node.js async\_hooks,
9+
AsyncLocalStorage, microtask draining, and environment lifecycle management.
10+
11+
## File overview
12+
13+
* `uv_task.h` -- `UvTask<T>`: The lightweight, untracked coroutine return type.
14+
No V8 or Node.js dependencies. Suitable for internal C++ coroutines that do
15+
not need async\_hooks visibility or task queue draining.
16+
17+
* `uv_tracked_task.h` -- `UvTrackedTask<T, Provider>`: The fully-integrated
18+
coroutine return type. Each resume-to-suspend segment is wrapped in an
19+
`InternalCallbackScope`, giving it the same semantics as any other callback
20+
entry into Node.js. The `Provider` template parameter is an
21+
`AsyncWrap::ProviderType` enum value that identifies the async resource type
22+
visible to `async_hooks.createHook()` and trace events, using the same
23+
type registry as the rest of Node.js.
24+
25+
* `uv_awaitable.h` -- Awaitable wrappers for libuv async operations:
26+
`UvFsAwaitable` (fs operations), `UvFsStatAwaitable` (stat-family),
27+
`UvWorkAwaitable` (thread pool work), and `UvGetAddrInfoAwaitable`
28+
(DNS resolution). Each embeds the libuv request struct directly in the
29+
coroutine frame, avoiding separate heap allocations. Each also exposes a
30+
`cancelable_req()` method returning the underlying `uv_req_t*` for
31+
cancellation support during environment teardown.
32+
33+
* `uv_promise.h` -- Helpers for bridging coroutines to JavaScript Promises:
34+
`MakePromise()`, `ResolvePromise()`, `RejectPromiseWithUVError()`. The
35+
resolve and reject helpers guard against calling V8 APIs when the
36+
environment is shutting down (`can_call_into_js()` check).
37+
38+
## Usage
39+
40+
### Basic pattern (binding function)
41+
42+
```cpp
43+
// The coroutine. The return type carries the provider type as
44+
// a compile-time template argument.
45+
static coro::UvTrackedTask<void, AsyncWrap::PROVIDER_FSREQPROMISE>
46+
DoAccessImpl(
47+
Environment* env,
48+
v8::Global<v8::Promise::Resolver> resolver,
49+
std::string path,
50+
int mode) {
51+
ssize_t result = co_await coro::UvFs(
52+
env->event_loop(), uv_fs_access, path.c_str(), mode);
53+
if (result < 0)
54+
coro::RejectPromiseWithUVError(env, resolver, result, "access",
55+
path.c_str());
56+
else
57+
coro::ResolvePromiseUndefined(env, resolver);
58+
}
59+
60+
// The binding entry point (called from JavaScript).
61+
static void Access(const FunctionCallbackInfo<Value>& args) {
62+
Environment* env = Environment::GetCurrent(args);
63+
// ... parse args, check permissions ...
64+
65+
auto resolver = coro::MakePromise(env, args);
66+
auto task = DoAccessImpl(env, std::move(resolver), path, mode);
67+
task.InitTracking(env); // assigns async_id, captures context, emits init
68+
task.Start(); // begins execution (fire-and-forget)
69+
}
70+
```
71+
72+
### Multi-step operations
73+
74+
Multiple libuv calls within a single coroutine are sequential co\_await
75+
expressions. The intermediate steps (between two co\_await points) are pure C++
76+
with no V8 overhead:
77+
78+
```cpp
79+
static coro::UvTrackedTask<void, AsyncWrap::PROVIDER_COROREADFILE>
80+
ReadFileImpl(
81+
Environment* env,
82+
v8::Global<v8::Promise::Resolver> resolver,
83+
std::string path) {
84+
ssize_t fd = co_await coro::UvFs(
85+
env->event_loop(), uv_fs_open, path.c_str(), O_RDONLY, 0);
86+
if (fd < 0) { /* reject and co_return */ }
87+
88+
auto [err, stat] = co_await coro::UvFsStat(
89+
env->event_loop(), uv_fs_fstat, static_cast<uv_file>(fd));
90+
// ... read, close, resolve ...
91+
}
92+
```
93+
94+
### Coroutine composition
95+
96+
`UvTask<T>` and `UvTrackedTask<T, Provider>` can be co\_awaited from other
97+
coroutines. This allows factoring common operations into reusable helpers:
98+
99+
```cpp
100+
UvTask<ssize_t> OpenFile(uv_loop_t* loop, const char* path, int flags) {
101+
co_return co_await UvFs(loop, uv_fs_open, path, flags, 0);
102+
}
103+
104+
UvTrackedTask<void, AsyncWrap::PROVIDER_FSREQPROMISE>
105+
OuterCoroutine(Environment* env, ...) {
106+
ssize_t fd = co_await OpenFile(env->event_loop(), path, O_RDONLY);
107+
// ...
108+
}
109+
```
110+
111+
## Lifecycle
112+
113+
### UvTask (untracked)
114+
115+
`UvTask<T>` uses lazy initialization. The coroutine does not run until it is
116+
either co\_awaited from another coroutine (symmetric transfer) or explicitly
117+
started with `Start()`. When `Start()` is called, the coroutine runs until its
118+
first `co_await`, then control returns to the caller. The coroutine frame
119+
self-destructs when the coroutine completes.
120+
121+
### UvTrackedTask (tracked)
122+
123+
`UvTrackedTask<T, Provider>` follows the same lazy/fire-and-forget pattern
124+
but its lifecycle consists of four phases:
125+
126+
1. **Creation**: The coroutine frame is allocated from the thread-local
127+
free-list (see "Frame allocator" below). The coroutine is suspended at
128+
`initial_suspend` (lazy).
129+
130+
2. **`InitTracking(env)`**: Assigns an `async_id`, captures the current
131+
`async_context_frame` (for AsyncLocalStorage propagation), emits a trace
132+
event using the provider name from the `ProviderType` enum, and registers
133+
in the Environment's coroutine task list for cancellation during teardown.
134+
If async\_hooks listeners are active (`kInit > 0` or
135+
`kUsesExecutionAsyncResource > 0`), a resource object is created for
136+
`executionAsyncResource()` and the `init` hook is emitted. The type name
137+
V8 string comes from `IsolateData::async_wrap_providers_`, which is
138+
pre-cached at Isolate startup, so it adds no per-coroutine allocation cost.
139+
140+
3. **`Start()`**: Marks the task as detached (fire-and-forget) and resumes
141+
the coroutine. Each resume-to-suspend segment is wrapped in an
142+
`InternalCallbackScope` that provides:
143+
* async\_hooks `before`/`after` events
144+
* `async_context_frame` save/restore (AsyncLocalStorage)
145+
* Microtask and `process.nextTick` draining on close
146+
* `request_waiting_` counter management for event loop liveness
147+
148+
4. **Completion**: At `final_suspend`, the last `InternalCallbackScope` is
149+
closed (draining task queues), the async\_hooks `destroy` event is emitted,
150+
the task is unregistered from the Environment, and the coroutine frame is
151+
returned to the thread-local free-list. If a detached coroutine has a
152+
captured C++ exception that was never observed, `std::terminate()` is
153+
called rather than silently discarding it.
154+
155+
## How the awaitable dispatch works
156+
157+
The `UvFs()` factory function returns a `UvFsAwaitable` that embeds a `uv_fs_t`
158+
directly in the coroutine frame. When the coroutine hits `co_await`:
159+
160+
1. `await_transform()` on the coroutine's promise object wraps the awaitable in a `TrackedAwaitable`.
161+
2. `TrackedAwaitable::await_suspend()`:
162+
* Closes the current `InternalCallbackScope` (drains microtasks/nextTick).
163+
* Records the `uv_req_t*` for cancellation support (via `cancelable_req()`).
164+
* Increments `request_waiting_` (event loop liveness).
165+
* Calls the inner `await_suspend()`, which dispatches the libuv call with
166+
`req_.data = this` pointing back to the awaitable.
167+
3. The coroutine is suspended. Control returns to the event loop.
168+
4. When the libuv operation completes, `OnComplete()` calls
169+
`handle_.resume()` to resume the coroutine.
170+
5. `TrackedAwaitable::await_resume()`:
171+
* Decrements `request_waiting_`.
172+
* Clears the cancellation pointer.
173+
* Opens a new `InternalCallbackScope` for the next segment.
174+
* Returns the result (e.g., `req_.result` for fs operations).
175+
176+
The liveness counter and cancellation tracking are conditional on the inner
177+
awaitable having a `cancelable_req()` method (checked at compile time via a
178+
`requires` expression). When co\_awaiting another `UvTask` or `UvTrackedTask`
179+
(coroutine composition), these steps are skipped.
180+
181+
## Environment teardown
182+
183+
During `Environment::CleanupHandles()`, the coroutine task list is iterated and
184+
`Cancel()` is called on each active task. This calls `uv_cancel()` on the
185+
in-flight libuv request (if any), which causes the libuv callback to fire with
186+
`UV_ECANCELED`. The coroutine resumes, sees the error, and completes normally.
187+
The `request_waiting_` counter ensures the teardown loop waits for all
188+
coroutine I/O to finish before destroying the Environment.
189+
190+
## Frame allocator
191+
192+
Coroutine frames are allocated from a thread-local free-list rather than going
193+
through `malloc`/`free` on every creation and destruction. This is implemented
194+
via `promise_type::operator new` and `operator delete` in `TrackedPromiseBase`,
195+
which route through `CoroFrameAlloc()` and `CoroFrameFree()`.
196+
197+
The free-list uses size-class buckets with 256-byte granularity, covering
198+
frames up to 4096 bytes (enough for typical coroutine frames). Frames larger
199+
than 4096 bytes fall through to the global `operator new`. Since all coroutines
200+
run on the event loop thread, the free-list requires no locking.
201+
202+
Each bucket has a high-water mark of 32 cached frames. When a frame is freed
203+
and its bucket is already at capacity, the frame is returned directly to the
204+
system allocator instead of being cached. This bounds the retained memory
205+
per bucket to at most 32 \* bucket\_size bytes (e.g., 32 \* 1024 = 32KB for the
206+
1024-byte size class), preventing unbounded growth after a burst of concurrent
207+
coroutines.
208+
209+
After the first coroutine of a given size class completes, subsequent
210+
coroutines of the same size class are allocated from the free-list with zero
211+
`malloc` overhead.
212+
213+
## Allocation comparison with ReqWrap
214+
215+
For a single async operation (e.g., `fsPromises.access`):
216+
217+
| | ReqWrap pattern | Coroutine (no hooks) | Coroutine (hooks active) |
218+
| -------------------- | --------------- | -------------------- | ------------------------ |
219+
| C++ heap allocations | 3 | 0 (free-list hit) | 0 (free-list hit) |
220+
| V8 heap objects | 7 | 2 (resolver+promise) | 3 (+ resource object) |
221+
| Total allocations | 10 | 2 | 3 |
222+
223+
For a multi-step operation (open + stat + read + close):
224+
225+
| | 4x ReqWrap | Single coroutine (no hooks) | Single coroutine (hooks active) |
226+
| ----------------------------- | ---------- | --------------------------- | ------------------------------- |
227+
| C++ heap allocations | 12 | 0 (free-list hit) | 0 (free-list hit) |
228+
| V8 heap objects | 28 | 2 | 3 |
229+
| Total allocations | 40 | 2 | 3 |
230+
| InternalCallbackScope entries | 4 | 5 (one per segment) | 5 |
231+
232+
The coroutine frame embeds the `uv_fs_t` (\~440 bytes) directly. The compiler
233+
may overlay non-simultaneously-live awaitables in the frame, so a multi-step
234+
coroutine does not necessarily pay N times the `uv_fs_t` cost.
235+
236+
## Known limitations
237+
238+
* **Heap snapshot visibility**: The coroutine frame is not visible to V8 heap
239+
snapshots or `MemoryRetainer`. The thread-local free-list allocator reduces
240+
malloc pressure but does not provide V8 with per-frame memory accounting.
241+
The exact frame contents are not inspectable from heap snapshot tooling.
242+
243+
* **Snapshot serialization**: `UvTrackedTask` holds `v8::Global` handles that
244+
cannot be serialized into a startup snapshot. There is currently no safety
245+
check to prevent snapshotting while coroutines are active. In practice this
246+
is not a problem because snapshots are taken at startup before I/O begins.
247+
248+
* **Free-list retention**: The thread-local free-list retains up to 32 frames
249+
per size class bucket after a burst of concurrent coroutines. These frames
250+
are held until reused or the thread exits. The bound is configurable via
251+
`kMaxCachedPerBucket`.

0 commit comments

Comments
 (0)