Skip to content

Commit 7fc3790

Browse files
committed
src: implement a prototype for uv coroutines
Signed-off-by: James M Snell <jasnell@gmail.com> Assisted-by: Opencode/Open 4.6
1 parent e78ccd8 commit 7fc3790

File tree

7 files changed

+1307
-2
lines changed

7 files changed

+1307
-2
lines changed

node.gyp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,8 +217,11 @@
217217
'src/cleanup_queue.h',
218218
'src/cleanup_queue-inl.h',
219219
'src/compile_cache.h',
220-
'src/connect_wrap.h',
221-
'src/connection_wrap.h',
220+
'src/connect_wrap.h',
221+
'src/connection_wrap.h',
222+
'src/coro/uv_task.h',
223+
'src/coro/uv_awaitable.h',
224+
'src/coro/uv_promise.h',
222225
'src/cppgc_helpers.h',
223226
'src/cppgc_helpers.cc',
224227
'src/dataqueue/queue.h',

src/coro/uv_awaitable.h

Lines changed: 347 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,347 @@
1+
#ifndef SRC_CORO_UV_AWAITABLE_H_
2+
#define SRC_CORO_UV_AWAITABLE_H_
3+
4+
#include <coroutine>
5+
#include <cstring>
6+
#include <functional>
7+
#include <tuple>
8+
#include <type_traits>
9+
#include <utility>
10+
11+
#include "uv.h"
12+
13+
namespace node {
14+
namespace coro {
15+
16+
// ---------------------------------------------------------------------------
17+
// UvFsAwaitable — Awaitable that wraps a single uv_fs_* async operation.
18+
//
19+
// Created via the UvFs() factory function. Usage:
20+
//
21+
// ssize_t fd = co_await UvFs(loop, uv_fs_open, path, O_RDONLY, 0644);
22+
// if (fd < 0) { /* handle error: fd is the negative uv_errno_t */ }
23+
//
24+
// The uv_fs_t request struct is embedded directly in this object, which
25+
// lives in the coroutine frame across the suspension point. This means
26+
// no separate heap allocation is needed for the request.
27+
//
28+
// On await_resume(), uv_fs_req_cleanup() is called automatically.
29+
// ---------------------------------------------------------------------------
30+
31+
template <typename Fn, typename... Args>
32+
class UvFsAwaitable {
33+
public:
34+
UvFsAwaitable(uv_loop_t* loop, Fn fn, Args... args)
35+
: loop_(loop), fn_(fn), args_(std::move(args)...) {
36+
memset(&req_, 0, sizeof(req_));
37+
}
38+
39+
~UvFsAwaitable() {
40+
// Safety net: if the coroutine is destroyed without resuming
41+
// (e.g. cancellation), still clean up any libuv-internal state.
42+
if (needs_cleanup_) uv_fs_req_cleanup(&req_);
43+
}
44+
45+
UvFsAwaitable(const UvFsAwaitable&) = delete;
46+
UvFsAwaitable& operator=(const UvFsAwaitable&) = delete;
47+
UvFsAwaitable(UvFsAwaitable&&) = delete;
48+
UvFsAwaitable& operator=(UvFsAwaitable&&) = delete;
49+
50+
bool await_ready() noexcept { return false; }
51+
52+
void await_suspend(std::coroutine_handle<> h) noexcept {
53+
handle_ = h;
54+
req_.data = this;
55+
56+
// Dispatch: call uv_fs_*(loop, &req_, ...args..., OnComplete)
57+
int err = std::apply(
58+
[this](auto&&... a) {
59+
return fn_(loop_, &req_, std::forward<decltype(a)>(a)..., OnComplete);
60+
},
61+
args_);
62+
63+
if (err < 0) {
64+
// Synchronous dispatch failure — store the error in req_.result
65+
// so await_resume sees it, and resume immediately.
66+
req_.result = err;
67+
needs_cleanup_ = false; // nothing was dispatched
68+
h.resume();
69+
} else {
70+
needs_cleanup_ = true;
71+
}
72+
}
73+
74+
// Returns req->result: the fd for open, byte count for read/write,
75+
// 0 for success, or a negative uv_errno_t on error.
76+
ssize_t await_resume() noexcept {
77+
ssize_t result = req_.result;
78+
uv_fs_req_cleanup(&req_);
79+
needs_cleanup_ = false;
80+
return result;
81+
}
82+
83+
// Access the raw request (useful before cleanup for stat, readdir, etc.)
84+
const uv_fs_t& req() const { return req_; }
85+
86+
private:
87+
static void OnComplete(uv_fs_t* req) {
88+
auto* self = static_cast<UvFsAwaitable*>(req->data);
89+
self->handle_.resume();
90+
}
91+
92+
uv_loop_t* loop_;
93+
Fn fn_;
94+
std::tuple<Args...> args_;
95+
uv_fs_t req_;
96+
std::coroutine_handle<> handle_;
97+
bool needs_cleanup_ = false;
98+
};
99+
100+
// ---------------------------------------------------------------------------
101+
// UvFs() — Factory function for creating a UvFsAwaitable.
102+
//
103+
// Deduces the libuv function signature and argument types automatically.
104+
// The callback parameter is NOT passed — it's injected by the awaitable.
105+
//
106+
// Examples:
107+
// co_await UvFs(loop, uv_fs_open, "/tmp/test", O_RDONLY, 0644);
108+
// co_await UvFs(loop, uv_fs_close, fd);
109+
// co_await UvFs(loop, uv_fs_stat, "/tmp/test");
110+
// co_await UvFs(loop, uv_fs_read, fd, &iov, 1, offset);
111+
// ---------------------------------------------------------------------------
112+
113+
template <typename Fn, typename... Args>
114+
auto UvFs(uv_loop_t* loop, Fn fn, Args... args) {
115+
return UvFsAwaitable<Fn, Args...>(loop, fn, std::move(args)...);
116+
}
117+
118+
// ---------------------------------------------------------------------------
119+
// UvFsStatAwaitable — Like UvFsAwaitable but await_resume returns the
120+
// uv_stat_t instead of the raw result code. Used for stat/fstat/lstat.
121+
//
122+
// Usage:
123+
// auto [err, stat] = co_await UvFsStat(loop, uv_fs_stat, path);
124+
// ---------------------------------------------------------------------------
125+
126+
template <typename Fn, typename... Args>
127+
class UvFsStatAwaitable {
128+
public:
129+
UvFsStatAwaitable(uv_loop_t* loop, Fn fn, Args... args)
130+
: loop_(loop), fn_(fn), args_(std::move(args)...) {
131+
memset(&req_, 0, sizeof(req_));
132+
}
133+
134+
~UvFsStatAwaitable() {
135+
if (needs_cleanup_) uv_fs_req_cleanup(&req_);
136+
}
137+
138+
UvFsStatAwaitable(const UvFsStatAwaitable&) = delete;
139+
UvFsStatAwaitable& operator=(const UvFsStatAwaitable&) = delete;
140+
UvFsStatAwaitable(UvFsStatAwaitable&&) = delete;
141+
UvFsStatAwaitable& operator=(UvFsStatAwaitable&&) = delete;
142+
143+
bool await_ready() noexcept { return false; }
144+
145+
void await_suspend(std::coroutine_handle<> h) noexcept {
146+
handle_ = h;
147+
req_.data = this;
148+
149+
int err = std::apply(
150+
[this](auto&&... a) {
151+
return fn_(loop_, &req_, std::forward<decltype(a)>(a)..., OnComplete);
152+
},
153+
args_);
154+
155+
if (err < 0) {
156+
req_.result = err;
157+
needs_cleanup_ = false;
158+
h.resume();
159+
} else {
160+
needs_cleanup_ = true;
161+
}
162+
}
163+
164+
struct StatResult {
165+
int error; // 0 on success, negative uv_errno_t on failure
166+
uv_stat_t stat; // valid only when error == 0
167+
};
168+
169+
StatResult await_resume() noexcept {
170+
StatResult r{};
171+
if (req_.result < 0) {
172+
r.error = static_cast<int>(req_.result);
173+
} else {
174+
r.error = 0;
175+
r.stat = req_.statbuf;
176+
}
177+
uv_fs_req_cleanup(&req_);
178+
needs_cleanup_ = false;
179+
return r;
180+
}
181+
182+
private:
183+
static void OnComplete(uv_fs_t* req) {
184+
auto* self = static_cast<UvFsStatAwaitable*>(req->data);
185+
self->handle_.resume();
186+
}
187+
188+
uv_loop_t* loop_;
189+
Fn fn_;
190+
std::tuple<Args...> args_;
191+
uv_fs_t req_;
192+
std::coroutine_handle<> handle_;
193+
bool needs_cleanup_ = false;
194+
};
195+
196+
template <typename Fn, typename... Args>
197+
auto UvFsStat(uv_loop_t* loop, Fn fn, Args... args) {
198+
return UvFsStatAwaitable<Fn, Args...>(loop, fn, std::move(args)...);
199+
}
200+
201+
// ---------------------------------------------------------------------------
202+
// UvWork — Awaitable that wraps uv_queue_work.
203+
//
204+
// Runs a callable on the libuv thread pool. The coroutine suspends
205+
// until the work is complete, then resumes on the event loop thread.
206+
//
207+
// Usage:
208+
// int status = co_await UvWork(loop, [&]() {
209+
// // expensive computation on thread pool
210+
// });
211+
// ---------------------------------------------------------------------------
212+
213+
template <typename WorkFn>
214+
class UvWorkAwaitable {
215+
public:
216+
UvWorkAwaitable(uv_loop_t* loop, WorkFn work_fn)
217+
: loop_(loop), work_fn_(std::move(work_fn)) {
218+
memset(&req_, 0, sizeof(req_));
219+
}
220+
221+
~UvWorkAwaitable() = default;
222+
223+
UvWorkAwaitable(const UvWorkAwaitable&) = delete;
224+
UvWorkAwaitable& operator=(const UvWorkAwaitable&) = delete;
225+
UvWorkAwaitable(UvWorkAwaitable&&) = delete;
226+
UvWorkAwaitable& operator=(UvWorkAwaitable&&) = delete;
227+
228+
bool await_ready() noexcept { return false; }
229+
230+
void await_suspend(std::coroutine_handle<> h) noexcept {
231+
handle_ = h;
232+
req_.data = this;
233+
234+
int err = uv_queue_work(loop_, &req_, OnWork, OnAfterWork);
235+
if (err < 0) {
236+
status_ = err;
237+
h.resume();
238+
}
239+
}
240+
241+
// Returns 0 on success, UV_ECANCELED if cancelled, or negative error.
242+
int await_resume() noexcept { return status_; }
243+
244+
private:
245+
static void OnWork(uv_work_t* req) {
246+
auto* self = static_cast<UvWorkAwaitable*>(req->data);
247+
self->work_fn_();
248+
}
249+
250+
static void OnAfterWork(uv_work_t* req, int status) {
251+
auto* self = static_cast<UvWorkAwaitable*>(req->data);
252+
self->status_ = status;
253+
self->handle_.resume();
254+
}
255+
256+
uv_loop_t* loop_;
257+
WorkFn work_fn_;
258+
uv_work_t req_;
259+
std::coroutine_handle<> handle_;
260+
int status_ = 0;
261+
};
262+
263+
template <typename WorkFn>
264+
auto UvWork(uv_loop_t* loop, WorkFn&& work_fn) {
265+
return UvWorkAwaitable<std::decay_t<WorkFn>>(loop,
266+
std::forward<WorkFn>(work_fn));
267+
}
268+
269+
// ---------------------------------------------------------------------------
270+
// UvGetAddrInfo — Awaitable wrapper for uv_getaddrinfo.
271+
//
272+
// Usage:
273+
// auto [status, info] = co_await UvGetAddrInfo(loop, "example.com", "80",
274+
// &hints);
275+
// if (status == 0) {
276+
// // use info...
277+
// uv_freeaddrinfo(info);
278+
// }
279+
// ---------------------------------------------------------------------------
280+
281+
class UvGetAddrInfoAwaitable {
282+
public:
283+
UvGetAddrInfoAwaitable(uv_loop_t* loop,
284+
const char* node,
285+
const char* service,
286+
const struct addrinfo* hints)
287+
: loop_(loop), node_(node), service_(service), hints_(hints) {
288+
memset(&req_, 0, sizeof(req_));
289+
}
290+
291+
~UvGetAddrInfoAwaitable() = default;
292+
293+
UvGetAddrInfoAwaitable(const UvGetAddrInfoAwaitable&) = delete;
294+
UvGetAddrInfoAwaitable& operator=(const UvGetAddrInfoAwaitable&) = delete;
295+
296+
bool await_ready() noexcept { return false; }
297+
298+
void await_suspend(std::coroutine_handle<> h) noexcept {
299+
handle_ = h;
300+
req_.data = this;
301+
302+
int err = uv_getaddrinfo(loop_, &req_, OnComplete, node_, service_, hints_);
303+
if (err < 0) {
304+
status_ = err;
305+
addrinfo_ = nullptr;
306+
h.resume();
307+
}
308+
}
309+
310+
struct AddrInfoResult {
311+
int status; // 0 on success, negative uv_errno_t on error
312+
struct addrinfo* addrinfo; // caller must uv_freeaddrinfo() on success
313+
};
314+
315+
AddrInfoResult await_resume() noexcept { return {status_, addrinfo_}; }
316+
317+
private:
318+
static void OnComplete(uv_getaddrinfo_t* req,
319+
int status,
320+
struct addrinfo* res) {
321+
auto* self = static_cast<UvGetAddrInfoAwaitable*>(req->data);
322+
self->status_ = status;
323+
self->addrinfo_ = res;
324+
self->handle_.resume();
325+
}
326+
327+
uv_loop_t* loop_;
328+
const char* node_;
329+
const char* service_;
330+
const struct addrinfo* hints_;
331+
uv_getaddrinfo_t req_;
332+
std::coroutine_handle<> handle_;
333+
int status_ = 0;
334+
struct addrinfo* addrinfo_ = nullptr;
335+
};
336+
337+
inline auto UvGetAddrInfo(uv_loop_t* loop,
338+
const char* node,
339+
const char* service,
340+
const struct addrinfo* hints = nullptr) {
341+
return UvGetAddrInfoAwaitable(loop, node, service, hints);
342+
}
343+
344+
} // namespace coro
345+
} // namespace node
346+
347+
#endif // SRC_CORO_UV_AWAITABLE_H_

0 commit comments

Comments
 (0)