@@ -8,12 +8,13 @@ const handlers = @import("handlers.zig");
88const dataset = @import ("dataset.zig" );
99
1010const PORT : u16 = 8080 ;
11- /// Per-worker connection cap. The bench distributes 4096 connections across
12- /// N workers via SO_REUSEPORT (4-tuple hash), so per-worker mean is ≤ 64
13- /// with σ ≈ 8. 128 gives a healthy margin and roughly 4× less BSS than the
14- /// previous 1024 — the memory bonus in HttpArena's composite uses
15- /// `sqrt(rps)/memMB`, so even when rps stays flat the lower RSS bumps the score.
16- const MAX_CONN = 128 ;
11+ /// Highest accepted fd. Reserves an O(1) lookup table; Linux assigns
12+ /// fresh fds starting from the lowest free slot so per-worker fds stay
13+ /// well below this even under churn. 4096 covers HttpArena's entire
14+ /// connection budget per worker with comfortable headroom.
15+ const MAX_FD : usize = 4096 ;
16+ /// Max concurrent JSON-promoted slots per worker (bounds the big_pool).
17+ const MAX_JSON_CONN = 128 ;
1718const RING_ENTRIES = 4096 ;
1819const LISTEN_BACKLOG : u32 = 1024 ;
1920
@@ -32,7 +33,7 @@ const WRITE_INLINE_FLUSH_AT: u32 = WRITE_INLINE - 256;
3233/// header). Pool slots are BSS — never touched, never resident — until
3334/// JSON traffic actually arrives, then released back on connection close.
3435const BIG_BUF_SIZE = 16 * 1024 ;
35- const BIG_POOL_SIZE = MAX_CONN ;
36+ const BIG_POOL_SIZE = MAX_JSON_CONN ;
3637
3738const Op = enum (u8 ) {
3839 accept = 1 ,
@@ -41,27 +42,28 @@ const Op = enum(u8) {
4142 close = 4 ,
4243};
4344
44- /// user_data = (op << 56) | slot_idx
45- inline fn ud (op : Op , slot : u32 ) u64 {
46- return (@as (u64 , @intFromEnum (op )) << 56 ) | @as (u64 , slot );
45+ /// user_data = (op << 32) | fd. The fd both identifies the operation and
46+ /// is the lookup key into `slot_table`. Linux gives us back the lowest
47+ /// free fd at each accept so the table stays sparse in the low range.
48+ inline fn ud (op : Op , fd : linux.fd_t ) u64 {
49+ return (@as (u64 , @intFromEnum (op )) << 32 ) | @as (u64 , @as (u32 , @bitCast (fd )));
4750}
4851inline fn udOp (u : u64 ) Op {
49- return @enumFromInt (@as (u8 , @intCast (u >> 56 )));
52+ return @enumFromInt (@as (u8 , @intCast (u >> 32 )));
5053}
51- inline fn udSlot (u : u64 ) u32 {
52- return @intCast ( u & 0x00FFFFFFFFFFFFFF );
54+ inline fn udFd (u : u64 ) linux.fd_t {
55+ return @as ( linux . fd_t , @bitCast ( @as ( u32 , @truncate ( u ))) );
5356}
5457
5558const Slot = struct {
56- fd : linux.fd_t = -1 ,
57- in_use : bool = false ,
59+ fd : linux.fd_t ,
5860 parser : http.Parser = .{},
59- /// Inline buffer used for small responses (baseline / pipelined /
61+ /// Inline buffer for small responses (baseline / pipelined /
6062 /// limited-conn). Pipelined batches concatenate here.
6163 write_inline : [WRITE_INLINE ]u8 = undefined ,
6264 /// Index into `big_pool` if this connection has been promoted to a
63- /// large buffer (after seeing a /json/ request). Kept across requests
64- /// on the same connection; released to the pool on close.
65+ /// large buffer (first /json/ request on the conn ). Kept across
66+ /// requests on the same connection; released to the pool on close.
6567 big_idx : ? u32 = null ,
6668 /// Pointer + length of whatever buffer the in-flight send is reading
6769 /// from. Either &write_inline[0] or &big_pool[big_idx][0].
@@ -71,7 +73,17 @@ const Slot = struct {
7173 close_after_send : bool = false ,
7274};
7375
74- var slots : [MAX_CONN ]Slot = undefined ;
76+ /// fd-indexed pointer table — null for free fds, *Slot once accepted.
77+ /// 4096 × 8 B = 32 KiB BSS, almost entirely zero-fill (kernel touches
78+ /// only the cache lines holding active fds, ~1 page resident in practice).
79+ var slot_table : [MAX_FD ]? * Slot = .{null } ** MAX_FD ;
80+
81+ /// Per-worker page-backed allocator. `page_allocator` returns pages via
82+ /// mmap and releases them via munmap on free, so freed Slots actually
83+ /// give memory back to the kernel — important for the limited-conn churn
84+ /// pattern where many connections cycle through.
85+ const slot_allocator = std .heap .page_allocator ;
86+
7587var ds : dataset.Dataset = undefined ;
7688
7789/// Per-worker pool of 16 KiB JSON response buffers. Static BSS — pages
@@ -95,23 +107,29 @@ fn bigRelease(idx: u32) void {
95107 big_used [idx ] = false ;
96108}
97109
98- fn allocSlot () ? u32 {
99- var i : u32 = 0 ;
100- while (i < MAX_CONN ) : (i += 1 ) {
101- if (! slots [i ].in_use ) {
102- slots [i ] = .{};
103- slots [i ].in_use = true ;
104- return i ;
105- }
106- }
107- return null ;
110+ fn getSlot (fd : linux.fd_t ) ? * Slot {
111+ const fdu : usize = @intCast (fd );
112+ if (fdu >= MAX_FD ) return null ;
113+ return slot_table [fdu ];
108114}
109115
110- fn freeSlot (idx : u32 ) void {
111- if (slots [idx ].big_idx ) | b | bigRelease (b );
112- slots [idx ].big_idx = null ;
113- slots [idx ].in_use = false ;
114- slots [idx ].fd = -1 ;
116+ fn allocSlotFor (fd : linux.fd_t ) ? * Slot {
117+ const fdu : usize = @intCast (fd );
118+ if (fdu >= MAX_FD ) return null ;
119+ const slot = slot_allocator .create (Slot ) catch return null ;
120+ slot .* = .{ .fd = fd };
121+ slot_table [fdu ] = slot ;
122+ return slot ;
123+ }
124+
125+ fn freeSlotFor (fd : linux.fd_t ) void {
126+ const fdu : usize = @intCast (fd );
127+ if (fdu >= MAX_FD ) return ;
128+ if (slot_table [fdu ]) | slot | {
129+ if (slot .big_idx ) | b | bigRelease (b );
130+ slot_allocator .destroy (slot );
131+ slot_table [fdu ] = null ;
132+ }
115133}
116134
117135pub fn main () ! void {
@@ -260,7 +278,7 @@ fn handleCqe(ring: *IoUring, listen_fd: linux.fd_t, cqe: *linux.io_uring_cqe) !v
260278 .accept = > try handleAccept (ring , listen_fd , cqe ),
261279 .recv = > try handleRecv (ring , cqe ),
262280 .send = > try handleSend (ring , cqe ),
263- .close = > freeSlot ( udSlot (cqe .user_data )),
281+ .close = > freeSlotFor ( udFd (cqe .user_data )),
264282 }
265283}
266284
@@ -271,51 +289,48 @@ fn handleAccept(ring: *IoUring, listen_fd: linux.fd_t, cqe: *linux.io_uring_cqe)
271289 return ;
272290 }
273291 const fd : linux.fd_t = @intCast (cqe .res );
274- const slot_idx = allocSlot ( ) orelse {
292+ const slot = allocSlotFor ( fd ) orelse {
275293 _ = linux .close (fd );
276294 if (! more ) _ = try ring .accept_multishot (ud (.accept , 0 ), listen_fd , null , null , 0 );
277295 return ;
278296 };
279- slots [slot_idx ].fd = fd ;
280- const buf = slots [slot_idx ].parser .recv_slot ();
281- _ = try ring .recv (ud (.recv , slot_idx ), fd , .{ .buffer = buf }, 0 );
297+ const buf = slot .parser .recv_slot ();
298+ _ = try ring .recv (ud (.recv , fd ), fd , .{ .buffer = buf }, 0 );
282299
283300 // If multishot fell off, re-arm.
284301 if (! more ) _ = try ring .accept_multishot (ud (.accept , 0 ), listen_fd , null , null , 0 );
285302}
286303
287304fn handleRecv (ring : * IoUring , cqe : * linux.io_uring_cqe ) ! void {
288- const slot_idx = udSlot (cqe .user_data );
289- const slot = & slots [ slot_idx ];
305+ const fd = udFd (cqe .user_data );
306+ const slot = getSlot ( fd ) orelse return ; // already freed — race with close CQE
290307 if (cqe .res <= 0 ) {
291- _ = try ring .close (ud (.close , slot_idx ), slot . fd );
308+ _ = try ring .close (ud (.close , fd ), fd );
292309 return ;
293310 }
294- try drainAndSend (ring , slot_idx , @intCast (cqe .res ));
311+ try drainAndSend (ring , slot , @intCast (cqe .res ));
295312}
296313
297314fn handleSend (ring : * IoUring , cqe : * linux.io_uring_cqe ) ! void {
298- const slot_idx = udSlot (cqe .user_data );
299- const slot = & slots [ slot_idx ] ;
315+ const fd = udFd (cqe .user_data );
316+ const slot = getSlot ( fd ) orelse return ;
300317 if (cqe .res <= 0 ) {
301- _ = try ring .close (ud (.close , slot_idx ), slot . fd );
318+ _ = try ring .close (ud (.close , fd ), fd );
302319 return ;
303320 }
304321 const n : u32 = @intCast (cqe .res );
305322 slot .send_off += n ;
306323 if (slot .send_off < slot .send_len ) {
307- // Partial send — replay from wherever the active buffer is
308- // (inline or big), tracked by send_ptr.
324+ // Partial send — replay from wherever the active buffer is.
309325 const tail = slot .send_ptr [slot .send_off .. slot .send_len ];
310- _ = try ring .send (ud (.send , slot_idx ), slot . fd , tail , linux .MSG .NOSIGNAL );
326+ _ = try ring .send (ud (.send , fd ), fd , tail , linux .MSG .NOSIGNAL );
311327 return ;
312328 }
313329 if (slot .close_after_send ) {
314- _ = try ring .close (ud (.close , slot_idx ), slot . fd );
330+ _ = try ring .close (ud (.close , fd ), fd );
315331 return ;
316332 }
317- // Keep-alive: drain any further pipelined requests already buffered.
318- try drainAndSend (ring , slot_idx , 0 );
333+ try drainAndSend (ring , slot , 0 );
319334}
320335
321336/// Drain as many complete requests as fit in the inline write buffer,
@@ -331,8 +346,7 @@ fn handleSend(ring: *IoUring, cqe: *linux.io_uring_cqe) !void {
331346/// We dispatch all complete requests buffered before submitting one batched
332347/// send — saves N-1 syscalls and avoids a deadlock that the older
333348/// "recv → dispatch one → send → recv" code path hit on pipelined input.
334- fn drainAndSend (ring : * IoUring , slot_idx : u32 , initial_feed_n : u32 ) ! void {
335- const slot = & slots [slot_idx ];
349+ fn drainAndSend (ring : * IoUring , slot : * Slot , initial_feed_n : u32 ) ! void {
336350 var inline_pos : u32 = 0 ;
337351 var feed_n = initial_feed_n ;
338352
@@ -342,63 +356,56 @@ fn drainAndSend(ring: *IoUring, slot_idx: u32, initial_feed_n: u32) !void {
342356 switch (result ) {
343357 .protocol_error = > {
344358 if (inline_pos > 0 ) {
345- submitInline (ring , slot_idx , inline_pos , true ) catch {};
359+ submitInline (ring , slot , inline_pos , true ) catch {};
346360 } else {
347- _ = try ring .close (ud (.close , slot_idx ), slot .fd );
361+ _ = try ring .close (ud (.close , slot . fd ), slot .fd );
348362 }
349363 return ;
350364 },
351365 .need_more = > {
352366 if (inline_pos > 0 ) {
353- try submitInline (ring , slot_idx , inline_pos , false );
367+ try submitInline (ring , slot , inline_pos , false );
354368 return ;
355369 }
356370 const buf = slot .parser .recv_slot ();
357371 if (buf .len == 0 ) {
358- _ = try ring .close (ud (.close , slot_idx ), slot .fd );
372+ _ = try ring .close (ud (.close , slot . fd ), slot .fd );
359373 return ;
360374 }
361- _ = try ring .recv (ud (.recv , slot_idx ), slot .fd , .{ .buffer = buf }, 0 );
375+ _ = try ring .recv (ud (.recv , slot . fd ), slot .fd , .{ .buffer = buf }, 0 );
362376 return ;
363377 },
364378 .ready = > | req | {
365379 const needs_big = std .mem .startsWith (u8 , req .path , "/json/" );
366380 if (needs_big ) {
367- // JSON responses don't fit in the 4 KiB inline buffer
368- // and can't be batched with other responses. Flush any
369- // queued inline bytes first, then dispatch JSON into
370- // big_buf and submit it as its own send.
371381 if (inline_pos > 0 ) {
372- // Defer JSON to the next drain pass after the
373- // inline batch flushes — leave the parser pointing
374- // at this request by NOT resetting yet. handleSend
375- // completion will call drainAndSend(0) and feed(0)
376- // will re-yield this same request.
377- try submitInline (ring , slot_idx , inline_pos , false );
382+ // Flush queued non-JSON bytes first; the JSON
383+ // request is re-yielded on the next drain pass
384+ // because we haven't called parser.reset yet.
385+ try submitInline (ring , slot , inline_pos , false );
378386 return ;
379387 }
380388 if (slot .big_idx == null ) {
381389 slot .big_idx = bigAcquire () orelse {
382- // Pool exhausted — refuse this connection.
383- _ = try ring .close (ud (.close , slot_idx ), slot .fd );
390+ _ = try ring .close (ud (.close , slot .fd ), slot .fd );
384391 return ;
385392 };
386393 }
387394 const big = bigSlice (slot .big_idx .? );
388395 const resp = handlers .handle (req , & ds , big );
389396 slot .parser .reset (slot .parser .consumed ());
390- try submitBig (ring , slot_idx , @intCast (resp .bytes .len ), resp .close );
397+ try submitBig (ring , slot , @intCast (resp .bytes .len ), resp .close );
391398 return ;
392399 }
393400 const resp = handlers .handle (req , & ds , slot .write_inline [inline_pos .. ]);
394401 inline_pos += @intCast (resp .bytes .len );
395402 slot .parser .reset (slot .parser .consumed ());
396403 if (resp .close ) {
397- try submitInline (ring , slot_idx , inline_pos , true );
404+ try submitInline (ring , slot , inline_pos , true );
398405 return ;
399406 }
400407 if (inline_pos > WRITE_INLINE_FLUSH_AT ) {
401- try submitInline (ring , slot_idx , inline_pos , false );
408+ try submitInline (ring , slot , inline_pos , false );
402409 return ;
403410 }
404411 // Loop: feed(0) for the next pipelined request.
@@ -411,21 +418,19 @@ fn bigSlice(idx: u32) []u8 {
411418 return & big_pool [idx ];
412419}
413420
414- fn submitInline (ring : * IoUring , slot_idx : u32 , len : u32 , close_after : bool ) ! void {
415- const slot = & slots [slot_idx ];
421+ fn submitInline (ring : * IoUring , slot : * Slot , len : u32 , close_after : bool ) ! void {
416422 slot .send_ptr = & slot .write_inline ;
417423 slot .send_len = len ;
418424 slot .send_off = 0 ;
419425 slot .close_after_send = close_after ;
420- _ = try ring .send (ud (.send , slot_idx ), slot .fd , slot .write_inline [0.. len ], linux .MSG .NOSIGNAL );
426+ _ = try ring .send (ud (.send , slot . fd ), slot .fd , slot .write_inline [0.. len ], linux .MSG .NOSIGNAL );
421427}
422428
423- fn submitBig (ring : * IoUring , slot_idx : u32 , len : u32 , close_after : bool ) ! void {
424- const slot = & slots [slot_idx ];
429+ fn submitBig (ring : * IoUring , slot : * Slot , len : u32 , close_after : bool ) ! void {
425430 const buf = bigSlice (slot .big_idx .? );
426431 slot .send_ptr = buf .ptr ;
427432 slot .send_len = len ;
428433 slot .send_off = 0 ;
429434 slot .close_after_send = close_after ;
430- _ = try ring .send (ud (.send , slot_idx ), slot .fd , buf [0.. len ], linux .MSG .NOSIGNAL );
435+ _ = try ring .send (ud (.send , slot . fd ), slot .fd , buf [0.. len ], linux .MSG .NOSIGNAL );
431436}
0 commit comments