Skip to content

Commit 695093a

Browse files
committed
Move multi to Network: async submission, slot-based completions
1 parent 9bbc214 commit 695093a

4 files changed

Lines changed: 576 additions & 271 deletions

File tree

src/browser/HttpClient.zig

Lines changed: 44 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,7 @@ pub const InterceptionLayer = @import("../network/layer/InterceptionLayer.zig");
6262
// impacting those other http requests.
6363
pub const Client = @This();
6464

65-
// Our curl multi handle. Owns in_use/ready_queue/http_active/ws_active/
66-
// performing — see Network.Handle.
65+
// Our curl multi handle. Owns in_use/http_active/ws_active — see Network.Handle.
6766
handle: Network.Handle,
6867

6968
// WebSockets with queued events to be drained from the worker thread.
@@ -206,6 +205,27 @@ pub fn init(self: *Client, allocator: Allocator, network: *Network, cdp_client:
206205

207206
pub fn deinit(self: *Client) void {
208207
self.abort();
208+
209+
// Cancellations submitted by abort flow through the network thread
210+
// and come back as canceled completions. If the network thread has
211+
// already stopped, drive its queues ourselves; otherwise spin until
212+
// they have all arrived and been drained, giving up after 1000 tries.
213+
var spins: usize = 0;
214+
while (self.handle.in_use.first != null and spins < 1000) : (spins += 1) {
215+
if (self.network.shutdown.load(.acquire)) {
216+
self.network.drainPendingForShutdown();
217+
}
218+
_ = self.processMessages() catch {};
219+
self.drainReadyWs();
220+
if (self.handle.in_use.first == null) break;
221+
std.Thread.sleep(std.time.ns_per_ms);
222+
}
223+
if (self.handle.in_use.first != null) {
224+
log.warn(.http, "deinit with active conns", .{
225+
.count = self.handle.http_active + self.handle.ws_active,
226+
});
227+
}
228+
209229
self.handle.deinit();
210230

211231
self.ws_ready.deinit(self.allocator);
@@ -260,14 +280,8 @@ pub fn setTlsVerify(self: *Client, verify: bool) !void {
260280

261281
var it = self.handle.in_use.first;
262282
while (it) |node| : (it = node.next) {
263-
const conn: *http.Connection = @fieldParentPtr("node", node);
264-
try self.handle.submitTlsVerify(conn, verify, self.use_proxy);
265-
}
266-
267-
it = self.handle.ready_queue.first;
268-
while (it) |node| : (it = node.next) {
269-
const conn: *http.Connection = @fieldParentPtr("node", node);
270-
try self.handle.submitTlsVerify(conn, verify, self.use_proxy);
283+
const conn: *http.Connection = @fieldParentPtr("_worker_node", node);
284+
self.handle.submitTlsVerify(conn, verify, self.use_proxy);
271285
}
272286

273287
self.tls_verify = verify;
@@ -315,16 +329,13 @@ pub fn abort(self: *Client) void {
315329
// each transfer's deinit:
316330
// - self.transfers : transfers.remove(self.id)
317331
// - self.queue : unlinked if _queued is set
318-
// - handle.in_use / handle.ready_queue : via submitRemove
319-
// - handle.dirty : drained at end of each perform
332+
// - handle.in_use : emptied as canceled completions come back from Handle
320333
// Any non-empty list means a transfer escaped cleanup — assert so we
321334
// catch the regression rather than silently leaking on next use.
322335
if (comptime IS_DEBUG) {
323336
std.debug.assert(self.transfers.size == 0);
324337
std.debug.assert(self.queue.first == null);
325338
std.debug.assert(self.handle.in_use.first == null);
326-
std.debug.assert(self.handle.ready_queue.first == null);
327-
std.debug.assert(self.handle.dirty.first == null);
328339
}
329340
}
330341

@@ -538,14 +549,9 @@ pub fn syncRequest(self: *Client, allocator: Allocator, req: Request) !SyncRespo
538549
// cases, the interceptor is expected to call resume to continue the transfer
539550
// or transfer.abort() to abort it.
540551
fn process(self: *Client, transfer: *Transfer) !void {
541-
// libcurl doesn't allow recursive calls, if we're in a `perform()` operation
542-
// then we _have_ to queue this.
543-
if (self.handle.performing == false) {
544-
if (self.handle.getConnection()) |conn| {
545-
return self.makeRequest(conn, transfer);
546-
}
552+
if (self.handle.getConnection()) |conn| {
553+
return self.makeRequest(conn, transfer);
547554
}
548-
549555
self.queue.append(&transfer._node);
550556
transfer._queued = true;
551557
transfer.loop_owned = true;
@@ -623,17 +629,12 @@ pub const PerformStatus = enum {
623629
};
624630

625631
fn perform(self: *Client, timeout_ms: c_int) anyerror!PerformStatus {
626-
// Handle.perform manages its own performing flag and drains
627-
// ready_queue after curl_multi_perform returns.
628-
const running = try self.handle.perform();
629-
630-
// Drain queued WebSocket events. ws callbacks (called from libcurl during
631-
// perform above) only buffer/queue — actual JS dispatch happens here, on
632-
// the worker thread.
632+
// The network thread drives the multi; this just drains whatever
633+
// it's already pushed (WS events queued by callbacks, completions
634+
// delivered through Handle's wake pipe).
633635
self.drainReadyWs();
634636

635-
// We're potentially going to block for a while until we get data. Process
636-
// whatever messages we have waiting ahead of time.
637+
// Process completions we already have before deciding to block.
637638
if (try self.processMessages()) {
638639
return .normal;
639640
}
@@ -649,7 +650,9 @@ fn perform(self: *Client, timeout_ms: c_int) anyerror!PerformStatus {
649650
if (pollfds[0].revents != 0) {
650651
status = .cdp_socket;
651652
}
652-
} else if (running > 0) {
653+
} else if (self.handle.in_use.first != null) {
654+
// Block until the network thread pushes a completion (or
655+
// timeout). With nothing in flight there's no reason to wait.
653656
try self.handle.poll(&.{}, timeout_ms);
654657
}
655658

@@ -1045,14 +1048,14 @@ pub const Transfer = struct {
10451048

10461049
fn releaseConn(self: *Transfer) void {
10471050
if (self._conn) |conn| {
1048-
self.client.handle.submitRemove(conn);
1051+
self.client.handle.finishConn(conn);
10491052
self._conn = null;
10501053
}
10511054
}
10521055

10531056
pub fn deinit(self: *Transfer) void {
10541057
if (self._conn) |conn| {
1055-
self.client.handle.submitRemove(conn);
1058+
self.client.handle.finishConn(conn);
10561059
self._conn = null;
10571060
}
10581061

@@ -1104,7 +1107,7 @@ pub const Transfer = struct {
11041107
}
11051108

11061109
// Decide whether to tear down now or defer until processOneMessage
1107-
// eventually drains the in-flight curl handle.
1110+
// eventually receives the canceled/completed connection.
11081111
//
11091112
// Two cases force deferral:
11101113
// * `_performing` — processOneMessage is currently processing THIS
@@ -1113,18 +1116,19 @@ pub const Transfer = struct {
11131116
// here would double-free. Note that `_conn` is cleared partway
11141117
// through this window (the "release conn ASAP" step before
11151118
// done_callback fires), so we cannot rely on `_conn != null`.
1116-
// * `client.performing` + we have a libcurl handle — libcurl could
1117-
// still fire callbacks for us. Releasing the arena now would UAF
1118-
// from inside curl.
1119+
// * `_conn != null` — the network thread owns the easy. Submit a
1120+
// cancellation and let the completion path call deinit.
11191121
//
11201122
// Otherwise (parked / queued / never-trackConn'd / fully drained),
11211123
// there is nothing left referencing this transfer and we can safely
11221124
// deinit inline even from inside a perform callback.
11231125
fn detachOrDeinit(self: *Transfer) void {
1124-
const must_defer = self._performing or
1125-
(self.client.handle.performing and self._conn != null);
1126-
if (must_defer) {
1126+
if (self.aborted) return;
1127+
if (self._performing) {
1128+
self.detachInPerform();
1129+
} else if (self._conn) |conn| {
11271130
self.detachInPerform();
1131+
self.client.handle.submitRemove(conn);
11281132
} else {
11291133
self.deinit();
11301134
}

src/browser/webapi/net/WebSocket.zig

Lines changed: 53 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ _http_client: *HttpClient,
5959
_req_headers: http.Headers,
6060

6161
_owner_node: std.DoublyLinkedList.Node = .{},
62+
_owner_linked: bool = false,
6263

6364
// buffered outgoing messages
6465
_send_queue: std.ArrayList(Message) = .empty,
@@ -88,6 +89,11 @@ _pending_close: ?PendingClose = null,
8889
// WebSocket stays alive between queueing and drain.
8990
_in_ready_list: bool = false,
9091

92+
// Set while a cancel is in flight. We hold an extra ref so the WS
93+
// can't be freed before the canceled completion arrives via
94+
// drainCompletions → disconnected.
95+
_cancel_pending: bool = false,
96+
9197
// close info for event dispatch
9298
_close_code: u16 = 1000,
9399
_close_reason: []const u8 = "",
@@ -185,6 +191,7 @@ pub fn init(url: []const u8, protocols: [][]const u8, frame: *Frame) !*WebSocket
185191
conn.transport = .{ .websocket = self };
186192
try http_client.handle.submitRequest(conn);
187193
frame._http_owner.addWS(self);
194+
self._owner_linked = true;
188195

189196
if (comptime IS_DEBUG) {
190197
log.info(.websocket, "connecting", .{ .url = url });
@@ -199,7 +206,10 @@ pub fn init(url: []const u8, protocols: [][]const u8, frame: *Frame) !*WebSocket
199206
}
200207

201208
pub fn deinit(self: *WebSocket, page: *Page) void {
202-
self.cleanup();
209+
// By the time we reach deinit (RC=0) the canceled completion has
210+
// already routed through cleanup(true). If conn is still set
211+
// here it's an upstream invariant break — defensively finish.
212+
self.cleanup(true);
203213

204214
if (self._on_open) |func| {
205215
func.release();
@@ -235,15 +245,18 @@ fn asEventTarget(self: *WebSocket) *EventTarget {
235245

236246
// we're being aborted internally (e.g. frame shutting down)
237247
pub fn kill(self: *WebSocket) void {
238-
self.cleanup();
248+
self._ready_state = .closed;
249+
self.cleanup(false);
239250
}
240251

241252
pub fn disconnected(self: *WebSocket, err_: ?anyerror) void {
242-
if (self._ready_state == .closed) {
243-
// already disconnected (e.g. close-handshake disconnected us, then
244-
// libcurl reports the same connection completion).
245-
return;
246-
}
253+
// Always run terminal cleanup — even on the already-closed early
254+
// out, since the canceled completion delivery still needs to run
255+
// finishConn and drop the cancel-pending ref.
256+
defer self.cleanup(true);
257+
258+
if (self._ready_state == .closed) return;
259+
247260
const was_clean = self._ready_state == .closing and err_ == null;
248261
self._ready_state = .closed;
249262

@@ -253,29 +266,44 @@ pub fn disconnected(self: *WebSocket, err_: ?anyerror) void {
253266
log.info(.websocket, "disconnected", .{ .url = self._url, .reason = "closed" });
254267
}
255268

256-
// Queue events first (markReady acquires a "pending" ref), then cleanup
257-
// (which releases the create-time ref). The pending ref keeps us alive
258-
// until drainPending dispatches and releases it.
259269
self._pending_close = .{
260270
.code = if (was_clean) self._close_code else 1006,
261271
.reason = if (was_clean) self._close_reason else "",
262272
.was_clean = was_clean,
263273
.with_error = !was_clean,
264274
};
265275
self.markReady();
266-
267-
self.cleanup();
268276
}
269277

270-
fn cleanup(self: *WebSocket) void {
271-
if (self._conn) |conn| {
278+
// completed=true: terminal — conn already detached (delivered via
279+
// drainCompletions). Decrements counters via finishConn, releases
280+
// create-time ref (and cancel-pending ref if held).
281+
//
282+
// completed=false: initiate cancel — submitRemove queues the network
283+
// remove; we hold an extra ref until the canceled completion routes
284+
// back through disconnected → cleanup(true).
285+
fn cleanup(self: *WebSocket, completed: bool) void {
286+
const conn = self._conn orelse return;
287+
if (self._owner_linked) {
272288
self._frame._http_owner.removeWS(self);
289+
self._owner_linked = false;
290+
}
291+
if (!completed) {
292+
if (self._cancel_pending) return;
293+
self._cancel_pending = true;
294+
self.acquireRef();
273295
self._http_client.handle.submitRemove(conn);
274-
self._req_headers.deinit();
275-
self._conn = null;
276-
self.releaseRef(self._frame._page);
277-
self._send_queue.clearRetainingCapacity();
296+
return;
297+
}
298+
self._http_client.handle.finishConn(conn);
299+
self._req_headers.deinit();
300+
self._conn = null;
301+
self.releaseRef(self._frame._page); // create-time
302+
if (self._cancel_pending) {
303+
self._cancel_pending = false;
304+
self.releaseRef(self._frame._page); // pending-cancel
278305
}
306+
self._send_queue.clearRetainingCapacity();
279307
}
280308

281309
fn queueMessage(self: *WebSocket, msg: Message) !void {
@@ -285,7 +313,7 @@ fn queueMessage(self: *WebSocket, msg: Message) !void {
285313
if (was_empty) {
286314
// Unpause the send callback so libcurl will request data
287315
if (self._conn) |conn| {
288-
try self._http_client.handle.submitUnpause(conn);
316+
self._http_client.handle.submitUnpause(conn);
289317
}
290318
}
291319
}
@@ -396,7 +424,7 @@ pub fn close(self: *WebSocket, code_: ?u16, reason_: ?[]const u8) !void {
396424
.with_error = false,
397425
};
398426
self.markReady();
399-
self.cleanup();
427+
self.cleanup(false);
400428
return;
401429
}
402430

@@ -742,9 +770,11 @@ fn _receivedDataCallback(conn: *http.Connection, data: []const u8) !void {
742770
1005; // No status code received
743771

744772
if (self._ready_state == .closing) {
745-
// Client-initiated close: this is the server's response.
746-
// Close handshake complete - disconnect.
747-
self.disconnected(null);
773+
// Client-initiated close — server's response. Don't
774+
// disconnect inline: we're inside a libcurl callback
775+
// and tearing the conn down here would UAF the easy
776+
// handle. Curl will deliver normal completion when the
777+
// server closes the socket per RFC 6455 §5.5.1.
748778
} else {
749779
// Server-initiated close: send reciprocal close frame per RFC 6455 §5.5.1
750780
self._close_code = received_code;

0 commit comments

Comments
 (0)