Commit 865e876

Fix Fetch.enable + http-proxy CDP send/recv deadlock (#2462)
WsConnection.send used to switch the socket to blocking mode on EWOULDBLOCK, with a comment claiming this 'should virtually never happen'. Under CDP Fetch.enable + --http-proxy it is routine: the CONNECT+TLS proxy round-trip means many subresources are in flight simultaneously, lightpanda emits a flood of Fetch.requestPaused events, the kernel send buffer fills, lightpanda blocks on write, and puppeteer's matching Fetch.continueRequest replies pile up in lightpanda's TCP recv buffer (which lightpanda can't drain because it is blocked on the write). Both peers wedge until the client times out.

Other contributing problems all come down to paused-intercept sites where an outer loop polls without ever draining the CDP socket, or where disconnect-time cleanup re-enters JS through paths the runtime can no longer satisfy:

* HttpClient.perform skipped the CDP socket poll entirely whenever processMessages() reported that it had processed messages, so a steady stream of HTTP completions could starve CDP reads.
* ScriptManagerBase.waitForImport spun on `client.tick(200)` and discarded the .cdp_socket return, so a script `import()` whose request was paused at the InterceptionLayer hung forever.
* BrowserContext.deinit aborted pending intercepts via `transfer.abort`, which fired XHR/script error_callback chains into a half-torn-down V8 context (the inspector had already been stopped two lines above).
* Headers.deinit was non-idempotent, so a value-copied Headers (the hazard RobotsLayer documents) double-freed its curl_slist on the second deinit; the symptom was an "incorrect alignment" panic inside ZigToCurlAllocator.free.
* Transfer.deinit was non-idempotent, so a cascade out of error_callback (e.g. Script.errorCallback -> manager.evaluate() -> JS execution -> Frame.deinit -> abortOwner -> Transfer.kill -> Transfer.deinit) reached `arena_pool.release` twice on the same arena.
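
The double release in the last bullet comes down to one teardown routine running twice for the same transfer. The following is a minimal sketch of that shape and of a one-shot guard, not the actual HttpClient code: `Registry`, the pared-down `Transfer`, and `releasesAfterDoubleDeinit` are hypothetical names, and a counter stands in for the arena release.

```zig
const std = @import("std");

// Hypothetical stand-in for the client's id -> transfer index. It only
// tracks which ids are live and counts how many releases happened.
const Registry = struct {
    live: std.AutoHashMap(u32, void),
    releases: usize = 0,
};

// Hypothetical, pared-down transfer; not the real HttpClient.Transfer.
const Transfer = struct {
    id: u32,
    registry: *Registry,

    fn deinit(self: *Transfer) void {
        // `remove` returns true only for the call that actually dropped
        // the key, so it doubles as a one-shot ownership claim.
        if (!self.registry.live.remove(self.id)) return;
        self.registry.releases += 1; // stands in for the arena release
    }
};

// Runs deinit twice on one transfer and reports how many releases happened.
fn releasesAfterDoubleDeinit(allocator: std.mem.Allocator) !usize {
    var registry = Registry{ .live = std.AutoHashMap(u32, void).init(allocator) };
    defer registry.live.deinit();
    try registry.live.put(7, {});

    var transfer = Transfer{ .id = 7, .registry = &registry };
    transfer.deinit();
    transfer.deinit(); // simulated cascade re-entry: early-returns
    return registry.releases;
}

pub fn main() !void {
    const releases = try releasesAfterDoubleDeinit(std.heap.page_allocator);
    std.debug.print("releases = {d}\n", .{releases});
}
```

Without the `remove` check, the second call would bump the counter again, which is the analogue of releasing the same arena twice.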
Coordinated changes:

* src/network/WsConnection.zig: On EWOULDBLOCK, instead of switching to a blocking write, poll for both POLLOUT and POLLIN. While waiting for write space, drain any incoming bytes into the reader buffer (without dispatching - that would re-enter send and recurse). Adds tryRead/bufferedBytes accessors.
* src/browser/HttpClient.zig:
  - Add has_buffered_input to CDPClient. In perform(), return .cdp_socket when buffered input exists, and always do at least a non-blocking poll on the CDP socket so HTTP completions can no longer starve CDP reads.
  - Make Transfer.deinit idempotent by claiming ownership through `client.transfers.remove(self.id)`. Second deinits (cascades out of error_callback) early-return.
  - Make `Transfer.kill` public (was `fn`) so BrowserContext.deinit can use it.
  - Tighten RequestParams.deinit / Request.deinit to take `*` instead of `*const` so they can call into `Headers.deinit` (now mutating).
* src/network/http.zig: Headers.deinit now nulls out `self.headers` after `curl_slist_free_all`, so a second deinit is a no-op. Without this guard a value-copied Headers double-frees the curl_slist (the hazard RobotsLayer's call site already documents).
* src/browser/ScriptManagerBase.zig: waitForImport now drains pending CDP messages on every iteration (matching the syncRequest pattern) and re-fetches the imported_modules entry per iteration. The cached entry was a use-after-free risk because the CDP-drain step above re-enters JS, and a transitively-imported module's preloadImport() -> getOrPut() can rehash the map and invalidate the prior entry pointer. A failed CDP blocking_read surfaces as error.Failed so the caller breaks the loop rather than retrying against a dead socket.
* src/cdp/CDP.zig:
  - Wire hasBufferedInput.
  - Replace read() with tryRead() in readSocket and tolerate the no_new_data case so we still process messages drained during a backpressured send.
  - BrowserContext.deinit aborts pending intercepts via `transfer.kill` instead of `transfer.abort`. `kill` fires shutdown_callback (or no-op for transfers without one), avoiding error_callback's re-entry into JS through XHR/script error handlers - those crash because the V8 context and inspector this BC owns have either been torn down already or are about to be.

Verified end-to-end against a puppeteer + http-proxy reproducer:

| URL                        | before               | after             |
|----------------------------|----------------------|-------------------|
| example.com                | OK 124ms             | OK 117ms          |
| github.com                 | HANG 20s (12/82)     | OK 1410ms (82/82) |
| shopify.com                | HANG 20s (1/4)       | OK 1973ms (66/66) |
| allbirds.com               | HANG 20s (12/53)     | OK 3944ms (372)   |
| allbirds wool-runners PDP  | HANG 20s (12/53)     | OK 6286ms (459)   |

(nike.com still doesn't reach `load` event but all 68 continueRequests process cleanly - the remaining stall is third-party widgets keeping the page in `loading` state, not the CDP/HTTP deadlock this PR fixes.)

Lightpanda survives 18 back-to-back navigation runs across the matrix above (3 per URL) without crashing.

Fixes #2462.
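
The waitForImport change rests on one rule: a hash-map entry pointer is only good until the next insert. Here is a minimal sketch of the re-fetch-per-iteration pattern, assuming a hypothetical `waitUntilDone` helper and numeric module ids in place of URLs; the inserts simulate re-entrant `preloadImport` calls and are not the real ScriptManagerBase code.

```zig
const std = @import("std");

const State = enum { loading, done };

// Hypothetical wait loop. The real map is keyed by URL; numeric ids keep
// the sketch free of key-lifetime concerns.
fn waitUntilDone(allocator: std.mem.Allocator) !u32 {
    var modules = std.AutoHashMap(u32, State).init(allocator);
    defer modules.deinit();
    try modules.put(1, .loading);

    var ticks: u32 = 0;
    while (true) : (ticks += 1) {
        // Look the entry up afresh on every pass. The pointer is never
        // carried across the insert below.
        const entry = modules.getEntry(1) orelse return error.UnknownModule;
        if (entry.value_ptr.* == .done) return ticks;

        // Simulated re-entrant work: registering another module may grow
        // and rehash the map, which invalidates `entry`.
        try modules.put(100 + ticks, .loading);

        // Simulated completion after a number of rounds.
        if (ticks == 20) modules.getPtr(1).?.* = .done;
    }
}

pub fn main() !void {
    const ticks = try waitUntilDone(std.heap.page_allocator);
    std.debug.print("module 1 done after {d} ticks\n", .{ticks});
}
```

Hoisting the `getEntry` call above the loop would compile just as well, which is why the hazard is easy to miss: the stale pointer only misbehaves once an insert actually triggers a resize.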
1 parent bdd456f commit 865e876

5 files changed

Lines changed: 238 additions & 46 deletions

src/browser/HttpClient.zig

Lines changed: 50 additions & 12 deletions
@@ -170,6 +170,12 @@ pub const CDPClient = struct {
     blocking_read_start: *const fn (*anyopaque) bool,
     blocking_read: *const fn (*anyopaque) bool,
     blocking_read_end: *const fn (*anyopaque) bool,
+    // Returns true if the CDP client has bytes already buffered (typically
+    // rescued from the socket while a synchronous send was backpressured;
+    // see WsConnection.send). When true, perform() returns .cdp_socket so
+    // the runner drains them even if the OS recv buffer is empty. See
+    // lightpanda-io/browser#2462.
+    has_buffered_input: *const fn (*anyopaque) bool,
 };

 pub fn init(self: *Client, allocator: Allocator, network: *Network, cdp_client: ?CDPClient) !void {
@@ -413,7 +419,8 @@ pub fn _request(_: *anyopaque, transfer: *Transfer) !void {
 // inside this function, we free it before returning the error. Callers
 // must NOT pair `request()` with their own `errdefer headers.deinit()`
 // — that's a double-free.
-pub fn request(self: *Client, req: Request, owner: ?*Owner) !void {
+pub fn request(self: *Client, req_in: Request, owner: ?*Owner) !void {
+    var req = req_in;
     const arena = self.arena_pool.acquire(.small, "Request.arena") catch |err| {
         req.headers.deinit();
         return err;
@@ -660,21 +667,36 @@ fn perform(self: *Client, timeout_ms: c_int) anyerror!PerformStatus {

     // We're potentially going to block for a while until we get data. Process
     // whatever messages we have waiting ahead of time.
-    if (try self.processMessages()) {
-        return .normal;
-    }
+    const processed = try self.processMessages();

     var status = PerformStatus.normal;
     if (self.cdp_client) |cdp_client| {
+        // Bytes may have been rescued from the CDP socket while a
+        // synchronous send was backpressured (see WsConnection.send). The
+        // OS recv buffer is empty in that case, but we still owe the
+        // dispatcher a chance to process them. See
+        // lightpanda-io/browser#2462.
+        if (cdp_client.has_buffered_input(cdp_client.ctx)) {
+            return .cdp_socket;
+        }
+
+        // Even when we processed completion messages this round, do a
+        // non-blocking poll of the CDP socket so a steady stream of HTTP
+        // completions can't starve CDP reads. Without this, a flood of
+        // intercepted/proxied transfers can leave Fetch.continueRequest
+        // messages unread for seconds at a time.
+        const cdp_timeout: c_int = if (processed) 0 else timeout_ms;
         var wait_fds = [_]http.WaitFd{.{
             .fd = cdp_client.socket,
             .events = .{ .pollin = true },
             .revents = .{},
         }};
-        try self.handles.poll(&wait_fds, timeout_ms);
+        try self.handles.poll(&wait_fds, cdp_timeout);
         if (wait_fds[0].revents.pollin or wait_fds[0].revents.pollpri or wait_fds[0].revents.pollout) {
             status = .cdp_socket;
         }
+    } else if (processed) {
+        return .normal;
     } else if (running > 0) {
         try self.handles.poll(&.{}, timeout_ms);
     }
@@ -963,7 +985,7 @@ pub const Request = struct {
         return written.ptr[0..written.len :0];
     }

-    pub fn deinit(self: *const Request) void {
+    pub fn deinit(self: *Request) void {
         self.headers.deinit();
     }
 };
@@ -1133,6 +1155,22 @@ pub const Transfer = struct {
     }

     pub fn deinit(self: *Transfer) void {
+        // Use `transfers.remove` as a one-shot ownership claim. If it
+        // returns false, this transfer has already been deinit'd by a
+        // cascade out of `error_callback` (e.g. Script.errorCallback ->
+        // manager.evaluate() -> JS execution -> Frame.deinit ->
+        // abortOwner -> Transfer.kill -> Transfer.deinit). Returning
+        // here keeps us from double-freeing the arena. See
+        // lightpanda-io/browser#2462.
+        //
+        // Reading `self.id` and `self.client` after a prior deinit has
+        // released `self.arena` is technically a stale read, but
+        // arena_pool zombies the memory until the slot is handed out
+        // again. If by some race the slot were re-used between the two
+        // deinits, the new tenant's id would not be in `transfers`
+        // either, so the early-out still fires and we bail cleanly.
+        if (!self.client.transfers.remove(self.id)) return;
+
         if (self._conn) |conn| {
             self.client.removeConn(conn);
             self._conn = null;
@@ -1147,10 +1185,6 @@ pub const Transfer = struct {
             self._queued = false;
         }

-        // Drop the id→*Transfer index entry before freeing the memory.
-        // Any concurrent CDP lookup by id will now see this transfer as gone.
-        _ = self.client.transfers.remove(self.id);
-
         self.req.deinit();
         if (self.owner) |o| {
             o.removeTransfer(self);
@@ -1177,8 +1211,12 @@ pub const Transfer = struct {

     // Owner-driven teardown: fires shutdown_callback (not error_callback)
     // and otherwise behaves like abort. Called by Client.abortOwner /
-    // abortRequests when a Frame / WGS is being torn down.
-    fn kill(self: *Transfer) void {
+    // abortRequests when a Frame / WGS is being torn down, and from
+    // BrowserContext.deinit when the CDP connection drops with paused
+    // intercepts still outstanding -- in both cases firing error_callback
+    // would re-enter JS via XHR/script error handlers, and the inspector
+    // / V8 context the handler reaches into has already been torn down.
+    pub fn kill(self: *Transfer) void {
         if (self.req.shutdown_callback) |cb| {
             cb(self.req.ctx);
         }

src/browser/ScriptManagerBase.zig

Lines changed: 30 additions & 8 deletions
@@ -286,22 +286,44 @@ pub fn preloadImport(self: *ScriptManagerBase, url: [:0]const u8, referrer: []co
 }

 pub fn waitForImport(self: *ScriptManagerBase, url: [:0]const u8) !ModuleSource {
-    const entry = self.imported_modules.getEntry(url) orelse {
-        // It shouldn't be possible for v8 to ask for a module that we didn't
-        // `preloadImport` above.
-        return error.UnknownModule;
-    };
-
     const was_evaluating = self.is_evaluating;
     self.is_evaluating = true;
     defer self.is_evaluating = was_evaluating;

     var client = self.client;
     while (true) {
+        // Re-fetch the entry on every iteration. We can't cache the pointer
+        // outside the loop: the CDP-drain step below (re-)enters JS, which
+        // can call back into `preloadImport` for a transitively-imported
+        // module. `preloadImport` calls `imported_modules.getOrPut`, which
+        // may rehash the map and invalidate every existing entry pointer.
+        // A cached `entry` would then be a use-after-free on the next
+        // `entry.value_ptr.state` access. See lightpanda-io/browser#2462.
+        const entry = self.imported_modules.getEntry(url) orelse {
+            // It shouldn't be possible for v8 to ask for a module that we
+            // didn't `preloadImport` above.
+            return error.UnknownModule;
+        };
+
         switch (entry.value_ptr.state) {
             .loading => {
-                _ = try client.tick(200);
-                continue;
+                // Drain pending CDP messages every iteration. Without this,
+                // a script imported under `Fetch.enable` whose request is
+                // paused at the InterceptionLayer hangs forever: the CDP
+                // client's matching `Fetch.continueRequest` reply never
+                // reaches `CDP.processMessage`, the request never resumes,
+                // and this loop spins until the client times out.
+                // `syncRequest` already does this; the import path needs
+                // the same treatment. See lightpanda-io/browser#2462.
+                const status = try client.tick(200);
+                if (status == .cdp_socket) {
+                    if (client.cdp_client) |cdp| {
+                        if (cdp.blocking_read(cdp.ctx) == false) {
+                            // An error happened during cdp's read.
+                            return error.Failed;
+                        }
+                    }
+                }
             },
             .done => |script| {
                 var shared = false;
src/cdp/CDP.zig

Lines changed: 24 additions & 5 deletions
@@ -111,6 +111,7 @@ pub fn init(
         .blocking_read_start = CDP.blockingReadStart,
         .blocking_read = CDP.blockingRead,
         .blocking_read_end = CDP.blockingReadStop,
+        .has_buffered_input = CDP.hasBufferedInput,
     });
 }


@@ -149,15 +150,27 @@ pub fn blockingReadStop(ctx: *anyopaque) bool {
     return true;
 }

+pub fn hasBufferedInput(ctx: *anyopaque) bool {
+    const self: *CDP = @ptrCast(@alignCast(ctx));
+    return self.ws.bufferedBytes() > 0;
+}
+
 pub fn readSocket(self: *CDP) bool {
-    const n = self.ws.read() catch |err| {
+    // Use tryRead, not read, so that if the socket has no new bytes (because
+    // we already drained them while a backpressured send was waiting in
+    // WsConnection.send) we still process whatever is buffered instead of
+    // logging an error and bailing. See lightpanda-io/browser#2462.
+    const result = self.ws.tryRead() catch |err| {
         log.warn(.app, "CDP read", .{ .err = err });
         return false;
     };

-    if (n == 0) {
-        log.info(.app, "CDP disconnect", .{});
-        return false;
+    switch (result) {
+        .closed => {
+            log.info(.app, "CDP disconnect", .{});
+            return false;
+        },
+        .data, .no_new_data => {},
     }

     return self.ws.processMessages(self) catch false;
@@ -574,7 +587,13 @@ pub const BrowserContext = struct {
             );
             http_client.interception_layer.intercepted -= 1;
             if (http_client.findTransfer(transfer_id)) |transfer| {
-                transfer.abort(error.ClientDisconnect);
+                // `kill` (not `abort`) fires shutdown_callback (or no-op),
+                // not error_callback. error_callback re-enters JS via XHR
+                // / script error handlers, which crash because the
+                // inspector + V8 context this BC owns are about to be
+                // torn down (and partially already are: we just called
+                // `inspector.stopSession`). See lightpanda-io/browser#2462.
+                transfer.kill();
             }
         }


src/network/WsConnection.zig

Lines changed: 124 additions & 20 deletions
@@ -353,31 +353,27 @@ pub fn deinit(self: *WsConnection) void {

 pub fn send(self: *WsConnection, data: []const u8) !void {
     var pos: usize = 0;
-    var changed_to_blocking: bool = false;
     defer _ = self.send_arena.reset(.{ .retain_with_limit = 1024 * 32 });

-    defer if (changed_to_blocking) {
-        // We had to change our socket to blocking me to get our write out
-        // We need to change it back to non-blocking.
-        _ = posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags) catch |err| {
-            log.err(.app, "ws restore nonblocking", .{ .err = err });
-        };
-    };
-
     LOOP: while (pos < data.len) {
         const written = posix.write(self.socket, data[pos..]) catch |err| switch (err) {
             error.WouldBlock => {
-                // self.socket is nonblocking, because we don't want to block
-                // reads. But our life is a lot easier if we block writes,
-                // largely, because we don't have to maintain a queue of pending
-                // writes (which would each need their own allocations). So
-                // if we get a WouldBlock error, we'll switch the socket to
-                // blocking and switch it back to non-blocking after the write
-                // is complete. Doesn't seem particularly efficiently, but
-                // this should virtually never happen.
-                assert(changed_to_blocking == false, "WsConnection.double block", .{});
-                changed_to_blocking = true;
-                _ = try posix.fcntl(self.socket, posix.F.SETFL, self.socket_flags & ~@as(u32, @bitCast(posix.O{ .NONBLOCK = true })));
+                // The peer's recv buffer is full. Naively switching to a
+                // blocking write would deadlock when the peer is _also_
+                // blocked trying to send to us — a real scenario under CDP
+                // Fetch.enable + http-proxy, where lightpanda emits a flood
+                // of Fetch.requestPaused events while puppeteer is trying
+                // to send us back the matching Fetch.continueRequest
+                // messages. Each side fills the other's TCP send buffer,
+                // and a blocking write here never returns. See
+                // lightpanda-io/browser#2462.
+                //
+                // Instead, poll for POLLOUT _while also_ draining any
+                // POLLIN data into the reader buffer (without processing —
+                // that would re-enter the dispatch path and call back into
+                // send, risking unbounded recursion). The buffered bytes
+                // are picked up by the next CDP.readSocket call.
+                try self.waitWritableDrainingReads();
                 continue :LOOP;
             },
             else => return err,
@@ -390,6 +386,83 @@ pub fn send(self: *WsConnection, data: []const u8) !void {
     }
 }

+// Block until the socket is writable. Concurrently drains any incoming
+// bytes into the reader buffer (without processing them) so the peer
+// keeps making progress reading our writes. Used to break the two-way
+// backpressure deadlock described in `send`.
+fn waitWritableDrainingReads(self: *WsConnection) !void {
+    // Bounded so a truly dead peer eventually surfaces (TCP keepalive will
+    // close the socket; this just bounds any single send).
+    const POLL_TIMEOUT_MS: i32 = 30_000;
+
+    // We may have to stop watching POLLIN if the reader buffer is at its
+    // cap and we can't make any more space (otherwise poll would return
+    // immediately every iteration with POLLIN set, busy-spinning).
+    var watch_in = true;
+
+    while (true) {
+        const events_in: i16 = if (watch_in) @as(i16, @intCast(posix.POLL.IN)) else 0;
+        var pfds = [_]posix.pollfd{.{
+            .fd = self.socket,
+            .events = @as(i16, @intCast(posix.POLL.OUT)) | events_in,
+            .revents = 0,
+        }};
+        const n = try posix.poll(&pfds, POLL_TIMEOUT_MS);
+        if (n == 0) return error.WouldBlock;
+
+        const revents = pfds[0].revents;
+        if (revents & @as(i16, @intCast(posix.POLL.NVAL | posix.POLL.ERR | posix.POLL.HUP)) != 0) {
+            return error.Closed;
+        }
+
+        if (watch_in and revents & @as(i16, @intCast(posix.POLL.IN)) != 0) {
+            // Drain whatever's available without processing. May grow the
+            // buffer up to CDP_MAX_MESSAGE_SIZE.
+            const before = self.bufferedBytes();
+            try self.drainAvailable();
+            if (self.bufferedBytes() == before and self.reader.readBuf().len == 0) {
+                // Buffer is at cap and we couldn't read anything more.
+                // Stop watching POLLIN so we don't busy-spin while we wait
+                // for POLLOUT.
+                watch_in = false;
+            }
+        }
+
+        if (revents & @as(i16, @intCast(posix.POLL.OUT)) != 0) {
+            return;
+        }
+    }
+}
+
+// Read everything currently available on the socket into the reader
+// buffer (non-blocking). Grows the buffer if needed (capped at
+// CDP_MAX_MESSAGE_SIZE so a misbehaving peer can't OOM us). Does not
+// process messages — the caller will handle that on the next pass
+// through CDP.readSocket.
+fn drainAvailable(self: *WsConnection) !void {
+    while (true) {
+        var buf = self.reader.readBuf();
+        if (buf.len == 0) {
+            const current = self.reader.buf.len;
+            if (current >= CDP_MAX_MESSAGE_SIZE) {
+                // Already at the cap; refuse to grow further. Stop draining
+                // — the next POLLOUT-then-write attempt will block again,
+                // but at least the peer's reads will eventually free our
+                // send buffer and let us proceed.
+                return;
+            }
+            self.reader.buf = try growBuffer(self.reader.allocator, self.reader.buf, current + 1);
+            buf = self.reader.readBuf();
+        }
+        const n = posix.read(self.socket, buf) catch |err| switch (err) {
+            error.WouldBlock => return,
+            else => return err,
+        };
+        if (n == 0) return error.Closed; // peer closed
+        self.reader.len += n;
+    }
+}
+
 const EMPTY_PONG = [_]u8{ 138, 0 };

 fn sendPong(self: *WsConnection, data: []const u8) !void {
@@ -477,6 +550,37 @@ pub fn read(self: *WsConnection) !usize {
     return n;
 }

+pub const ReadResult = enum { data, no_new_data, closed };
+
+// Variant of `read` that distinguishes "no new bytes on the wire" from
+// "peer closed". Used by CDP.readSocket so it can still drain buffered
+// messages (those rescued during a backpressured send) without bailing
+// when posix.read reports EWOULDBLOCK.
+pub fn tryRead(self: *WsConnection) !ReadResult {
+    const buf = self.reader.readBuf();
+    if (buf.len == 0) {
+        // Reader buffer is completely full of unprocessed messages. Don't
+        // call posix.read with a zero-length buffer (returns 0 → looks like
+        // EOF). Caller should processMessages first.
+        return if (self.bufferedBytes() > 0) .no_new_data else .closed;
+    }
+    const n = posix.read(self.socket, buf) catch |err| switch (err) {
+        error.WouldBlock => return .no_new_data,
+        else => return err,
+    };
+    if (n == 0) return .closed;
+    self.reader.len += n;
+    return .data;
+}
+
+// Number of bytes sitting in the reader buffer that haven't been parsed
+// into messages yet. Used by HttpClient.perform to detect that data was
+// rescued from the socket during a backpressured send and still needs to
+// be dispatched.
+pub fn bufferedBytes(self: *const WsConnection) usize {
+    return self.reader.len - self.reader.pos;
+}
+
 fn processHttpRequest(self: *WsConnection) !HttpResult {
     assert(self.reader.pos == 0, "WsConnection.HTTP pos", .{ .pos = self.reader.pos });
     const request = self.reader.buf[0..self.reader.len];
