Skip to content

Commit 4dc1c47

Browse files
repro: add TCPSocket.open ENOTCONN stress test with io_wait instrumentation
Add DEBUG_IO_WAIT logging to io_wait_transfer so every io_uring poll completion is emitted to stderr; lines with flags outside the requested mask are tagged UNEXPECTED. Add test/stress/tcp_connect.rb: hammers TCPSocket.open from N concurrent fibers under the URing backend and reports any ENOTCONN errors, exiting 1 if any are observed. See bug.md for the full investigation context. Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent ce3f585 commit 4dc1c47

2 files changed

Lines changed: 137 additions & 1 deletion

File tree

ext/io/event/selector/uring.c

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ enum {
2222
DEBUG = 0,
2323
DEBUG_COMPLETION = 0,
2424
DEBUG_CQE = 0,
25+
// Log all io_wait poll completions during the TCPSocket.open ENOTCONN repro.
26+
// Emits a line per completion so `grep UNEXPECTED` isolates surprising flags.
27+
DEBUG_IO_WAIT = 1,
2528
};
2629

2730
enum {URING_ENTRIES = 64};
@@ -627,7 +630,23 @@ VALUE io_wait_transfer(VALUE _arguments) {
627630
} else if (result > 0) {
628631
// We explicitly filter the resulting events based on the requested events.
629632
// In some cases, poll will report events we didn't ask for.
630-
return RB_INT2NUM(events_from_poll_flags(arguments->waiting->result & arguments->flags));
633+
int translated = events_from_poll_flags(result);
634+
int returned = events_from_poll_flags(result & arguments->flags);
635+
636+
if (DEBUG_IO_WAIT) {
637+
short unexpected = result & ~arguments->flags;
638+
// Log all completions; mark ones that carry flags outside the requested mask.
639+
fprintf(stderr,
640+
"io_wait: result=%#06x flags=%#06x translated=%#x returned=%#x%s\n",
641+
(unsigned)result,
642+
(unsigned)arguments->flags,
643+
(unsigned)translated,
644+
(unsigned)returned,
645+
unexpected ? " UNEXPECTED" : ""
646+
);
647+
}
648+
649+
return RB_INT2NUM(returned);
631650
} else {
632651
return Qfalse;
633652
}

test/stress/tcp_connect.rb

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
#!/usr/bin/env ruby
2+
# frozen_string_literal: true
3+
4+
# Released under the MIT License.
5+
# Copyright, 2026, by Samuel Williams.
6+
7+
# Stress test for Errno::ENOTCONN on TCPSocket.open with the io_uring backend.
8+
#
9+
# When built with DEBUG_IO_WAIT=1 in uring.c, each io_wait poll completion is
10+
# logged to stderr. Grep for "UNEXPECTED" to find completions whose raw flags
11+
# contained bits outside the originally-requested poll mask.
12+
#
13+
# Usage (on Linux with liburing):
14+
# ruby test/stress/tcp_connect.rb [CONCURRENCY [TOTAL]]
15+
#
16+
# Exits 0 on clean run, 1 if any ENOTCONN was observed.
17+
18+
require "io/event"
19+
require "io/event/test_scheduler"
20+
require "socket"
21+
22+
unless IO::Event::Selector.const_defined?(:URing)
23+
abort "ERROR: URing backend not available. Run this on Linux with liburing installed."
24+
end
25+
26+
CONCURRENCY = (ARGV[0] || 100).to_i
27+
TOTAL = (ARGV[1] || 5_000).to_i
28+
PER_FIBER = (TOTAL.to_f / CONCURRENCY).ceil
29+
30+
$stderr.puts "io-event stress test: TCPSocket.open ENOTCONN repro"
31+
$stderr.puts " Backend : #{IO::Event::Selector::URing}"
32+
$stderr.puts " Concurrency: #{CONCURRENCY} fibers"
33+
$stderr.puts " Total : #{TOTAL} connections (#{PER_FIBER} per fiber)"
34+
$stderr.puts
35+
36+
# ── acceptor ────────────────────────────────────────────────────────────────
37+
38+
server = TCPServer.new("127.0.0.1", 0)
39+
port = server.addr[1]
40+
$stderr.puts "Server listening on 127.0.0.1:#{port}"
41+
42+
# Accept connections in a background thread so the scheduler fibers are never
43+
# blocked waiting for an accept. Each accepted socket is closed immediately so
44+
# the connect side sees a clean EOF rather than a hung connection.
45+
acceptor_thread = Thread.new do
46+
loop do
47+
begin
48+
conn = server.accept_nonblock
49+
conn.close
50+
rescue IO::WaitReadable
51+
IO.select([server])
52+
retry
53+
rescue IOError, Errno::EBADF
54+
break # server was closed
55+
rescue => e
56+
$stderr.puts "[acceptor] #{e.class}: #{e.message}"
57+
end
58+
end
59+
end
60+
61+
# ── scheduler ───────────────────────────────────────────────────────────────
62+
63+
counters = Hash.new(0)
64+
mutex = Mutex.new
65+
66+
selector = IO::Event::Selector::URing.new(Fiber.current)
67+
scheduler = IO::Event::TestScheduler.new(selector: selector)
68+
69+
Fiber.set_scheduler(scheduler)
70+
71+
# ── worker fibers ────────────────────────────────────────────────────────────
72+
73+
CONCURRENCY.times do
74+
Fiber.schedule do
75+
PER_FIBER.times do
76+
begin
77+
sock = TCPSocket.new("127.0.0.1", port)
78+
sock.close
79+
mutex.synchronize { counters[:ok] += 1 }
80+
rescue Errno::ENOTCONN => e
81+
mutex.synchronize { counters[:ENOTCONN] += 1 }
82+
$stderr.puts "[ENOTCONN] #{e.message}"
83+
rescue Errno::ECONNREFUSED
84+
mutex.synchronize { counters[:ECONNREFUSED] += 1 }
85+
rescue => e
86+
mutex.synchronize { counters[e.class] += 1 }
87+
$stderr.puts "[ERROR] #{e.class}: #{e.message}"
88+
end
89+
end
90+
end
91+
end
92+
93+
scheduler.run
94+
Fiber.set_scheduler(nil)
95+
96+
# ── tear-down ────────────────────────────────────────────────────────────────
97+
98+
server.close
99+
acceptor_thread.join(2)
100+
101+
# ── report ───────────────────────────────────────────────────────────────────
102+
103+
$stderr.puts
104+
$stderr.puts "=== Results ==="
105+
$stderr.puts " Successful : #{counters[:ok]}"
106+
$stderr.puts " ENOTCONN : #{counters[:ENOTCONN]}"
107+
$stderr.puts " Other : #{(counters.reject { |k, _| k == :ok || k == :ENOTCONN }).inspect}" unless counters.size <= 2
108+
109+
if counters[:ENOTCONN] > 0
110+
$stderr.puts
111+
$stderr.puts "REPRODUCED: #{counters[:ENOTCONN]} ENOTCONN error(s) observed."
112+
exit 1
113+
else
114+
$stderr.puts
115+
$stderr.puts "No ENOTCONN errors observed in this run."
116+
exit 0
117+
end

0 commit comments

Comments
 (0)