Skip to content

Commit 9ed6584

Browse files
claudeCyberShadow
authored andcommitted
ae.net.asockets: ConnectEx for IOCP client TCP connects
The IOCP backend's client connect path used a synthetic onWritable fired when connect() returned WSAEWOULDBLOCK to drive the connecting→connected state transition. This races the kernel's TCP handshake: when the synthetic notification runs while the socket is still in SYN-SENT, StreamConnection.onWritableImpl's call to setKeepAlive throws SocketOSException ("Unable to set socket option: An invalid argument was supplied" — i.e. setsockopt(SO_KEEPALIVE) returns WSAEINVAL on a half-open socket), which then disconnects the socket as if the connect had failed. See bug investigation in .cydo/tasks/17880/output.md for the full diagnosis (smoking-gun instrumentation; backlog-experiment confirmation). Fix: use ConnectEx (the overlapped equivalent of connect()) on the IOCP backend, mirroring the AcceptEx pattern already in place for listeners. ConnectEx posts an OVERLAPPED that completes only when the handshake actually finishes (or fails), at which point iocpOnConnectComplete sets SO_UPDATE_CONNECT_CONTEXT and runs the existing setKeepAlive/connectHandler/updateFlags sequence (factored out into StreamConnection._handleConnectComplete and shared with the POSIX onWritableImpl path). ConnectEx requires the socket to be bound first; SocketConnection.tryNextAddress now binds to INADDR_ANY:0 (or in6addr_any:0) before arming the connect. The synthetic onWritable mechanism is retained for its other role — kicking the send queue on already-connected sockets. Connection. updateFlags is gated to suppress the connecting-state kick on IOCP since the ConnectEx completion takes its place. LPFN_CONNECTEX, WSAID_CONNECTEX, SIO_GET_EXTENSION_FUNCTION_POINTER, and SO_UPDATE_CONNECT_CONTEXT are declared in ae.sys.windows.iocp.
1 parent 6f17efd commit 9ed6584

2 files changed

Lines changed: 308 additions & 27 deletions

File tree

net/asockets.d

Lines changed: 283 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -989,7 +989,7 @@ else
989989
static if (eventLoopMechanism == EventLoopMechanism.iocp)
990990
{
991991
import core.sys.windows.windows;
992-
import core.sys.windows.winsock2 : WSAGetLastError;
992+
import core.sys.windows.winsock2 : WSAGetLastError, WSAIoctl;
993993
import ae.sys.windows.iocp;
994994
private void _wsaSetLastError(int e) nothrow @nogc { WSASetLastError(e); }
995995

@@ -1003,7 +1003,8 @@ static if (eventLoopMechanism == EventLoopMechanism.iocp)
10031003
socketRecv,
10041004
socketRecvFrom, // WSARecvFrom completion for datagram sockets
10051005
socketSend,
1006-
socketAccept, // AcceptEx completion on a listening socket
1006+
socketAccept, // AcceptEx completion on a listening socket
1007+
socketConnect, // ConnectEx completion on a client socket
10071008
pipeRead,
10081009
pipeWrite,
10091010
processExit,
@@ -1325,6 +1326,13 @@ static if (eventLoopMechanism == EventLoopMechanism.iocp)
13251326
iocpOnAcceptComplete(conn, bytes, status);
13261327
break;
13271328
}
1329+
case IocpOpKind.socketConnect:
1330+
{
1331+
auto conn = cast(GenericSocket)op.owner;
1332+
if (conn is null) return;
1333+
iocpOnConnectComplete(conn, bytes, status);
1334+
break;
1335+
}
13281336
case IocpOpKind.pipeRead:
13291337
case IocpOpKind.pipeWrite:
13301338
case IocpOpKind.processExit:
@@ -1511,6 +1519,61 @@ static if (eventLoopMechanism == EventLoopMechanism.iocp)
15111519
conn._iocpArmAccept();
15121520
}
15131521

1522+
private void iocpOnConnectComplete(GenericSocket conn, DWORD bytes, uint status)
1523+
{
1524+
debug (ASOCKETS) stderr.writefln("[iocp] connect complete: %s status=0x%X", conn, status);
1525+
1526+
conn._iocpConnectOp.inFlight = false;
1527+
1528+
// Closed during connect (disconnect()/closesocket cancelled the op).
1529+
if (status == ERROR_OPERATION_ABORTED || conn.socket is null)
1530+
return;
1531+
1532+
auto sc = cast(StreamConnection)conn;
1533+
assert(sc !is null, "ConnectEx owner must be a StreamConnection");
1534+
1535+
if (status != 0)
1536+
return sc.disconnect(formatSocketError(status), DisconnectType.error);
1537+
1538+
// Make the socket usable for getpeername / shutdown / setsockopt etc.
1539+
auto handle = cast(size_t)conn.socket.handle;
1540+
c_socks.setsockopt(handle,
1541+
c_socks.SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT,
1542+
null, 0);
1543+
1544+
sc._handleConnectComplete();
1545+
}
1546+
1547+
private void _iocpBindWildcard(Socket sock, AddressFamily family)
1548+
{
1549+
if (family == AddressFamily.INET)
1550+
{
1551+
c_socks.sockaddr_in addr;
1552+
addr.sin_family = c_socks.AF_INET;
1553+
addr.sin_port = 0;
1554+
addr.sin_addr.s_addr = 0; // INADDR_ANY
1555+
if (c_socks.bind(cast(c_socks.SOCKET)sock.handle,
1556+
cast(const(c_socks.sockaddr)*)&addr,
1557+
cast(c_socks.socklen_t)addr.sizeof) != 0)
1558+
throw new SocketOSException("bind(INADDR_ANY) failed");
1559+
}
1560+
else
1561+
if (family == AddressFamily.INET6)
1562+
{
1563+
c_socks.sockaddr_in6 addr;
1564+
addr.sin6_family = c_socks.AF_INET6;
1565+
addr.sin6_port = 0;
1566+
// sin6_addr is in6addr_any (zero-initialised by D struct init).
1567+
if (c_socks.bind(cast(c_socks.SOCKET)sock.handle,
1568+
cast(const(c_socks.sockaddr)*)&addr,
1569+
cast(c_socks.socklen_t)addr.sizeof) != 0)
1570+
throw new SocketOSException("bind(in6addr_any) failed");
1571+
}
1572+
else
1573+
throw new SocketException("ConnectEx requires AF_INET or AF_INET6, got "
1574+
~ family.to!string);
1575+
}
1576+
15141577
// Use UFCS for idle handlers (same shape as select/epoll).
15151578
void addIdleHandler(ref SocketManager socketManager, void delegate() handler)
15161579
{
@@ -1575,6 +1638,15 @@ static if (eventLoopMechanism == EventLoopMechanism.iocp)
15751638
bool _iocpAcceptReady;
15761639
// ---------------------------------------------------------------
15771640

1641+
// ---- ConnectEx state (client TCP sockets only) ----------------
1642+
IocpOp _iocpConnectOp;
1643+
LPFN_CONNECTEX _iocpConnectExFn;
1644+
// Stable storage for the target sockaddr; ConnectEx requires the buffer
1645+
// to remain valid until completion. SOCKADDR_STORAGE is 128 bytes.
1646+
ubyte[128] _iocpConnectAddrBuf;
1647+
int _iocpConnectAddrLen;
1648+
// ---------------------------------------------------------------
1649+
15781650
@property final bool notifyRead() const pure nothrow @nogc { return _notifyRead; }
15791651
@property final bool notifyWrite() const pure nothrow @nogc { return _notifyWrite; }
15801652

@@ -1612,6 +1684,8 @@ static if (eventLoopMechanism == EventLoopMechanism.iocp)
16121684
_iocpRecvOp.owner = this;
16131685
_iocpSendOp.owner = this;
16141686
_iocpSendOp.kind = IocpOpKind.socketSend;
1687+
_iocpConnectOp.owner = this;
1688+
_iocpConnectOp.kind = IocpOpKind.socketConnect;
16151689
}
16161690

16171691
/// Arm a recv notification via IOCP.
@@ -1719,6 +1793,73 @@ static if (eventLoopMechanism == EventLoopMechanism.iocp)
17191793
debug (ASOCKETS) stderr.writefln("[iocp] AcceptEx failed: %d", WSAGetLastError());
17201794
}
17211795

1796+
/// Post ConnectEx against the already-bound, IOCP-registered socket so the
1797+
/// IOCP port delivers a completion when the TCP handshake finishes.
1798+
/// The socket must already be bound (caller's responsibility —
1799+
/// see SocketConnection.tryNextAddress).
1800+
final void _iocpArmConnect(Address target)
1801+
{
1802+
assert(conn !is null);
1803+
1804+
_iocpInitOps();
1805+
_iocpConnectOp.overlapped = OVERLAPPED.init;
1806+
1807+
// Resolve ConnectEx via WSAIoctl on first use for this socket.
1808+
if (_iocpConnectExFn is null)
1809+
{
1810+
GUID guid = WSAID_CONNECTEX;
1811+
DWORD fnBytes;
1812+
int rc = WSAIoctl(
1813+
cast(c_socks.SOCKET)conn.handle,
1814+
SIO_GET_EXTENSION_FUNCTION_POINTER,
1815+
&guid, cast(uint)guid.sizeof,
1816+
&_iocpConnectExFn, cast(uint)_iocpConnectExFn.sizeof,
1817+
&fnBytes,
1818+
null, null);
1819+
if (rc != 0 || _iocpConnectExFn is null)
1820+
{
1821+
auto err = WSAGetLastError();
1822+
debug (ASOCKETS) stderr.writefln(
1823+
"[iocp] WSAIoctl(ConnectEx fn ptr) failed: %d", err);
1824+
_iocpConnectExFn = null;
1825+
(cast(Connection)cast(Object)this).disconnect(
1826+
formatSocketError(err), DisconnectType.error);
1827+
return;
1828+
}
1829+
}
1830+
1831+
// Stash the target sockaddr for the duration of the op.
1832+
auto nameLen = target.nameLen;
1833+
assert(nameLen <= _iocpConnectAddrBuf.length, "sockaddr too large");
1834+
_iocpConnectAddrBuf[0 .. nameLen] = (cast(const(ubyte)*)target.name)[0 .. nameLen];
1835+
_iocpConnectAddrLen = nameLen;
1836+
_iocpConnectOp.inFlight = true;
1837+
1838+
DWORD sent = 0;
1839+
BOOL ok = _iocpConnectExFn(
1840+
cast(size_t)conn.handle,
1841+
cast(const(sockaddr)*)_iocpConnectAddrBuf.ptr,
1842+
_iocpConnectAddrLen,
1843+
null, 0,
1844+
&sent,
1845+
&_iocpConnectOp.overlapped);
1846+
1847+
if (ok)
1848+
{
1849+
// Synchronous success — kernel still delivers IOCP completion.
1850+
return;
1851+
}
1852+
1853+
auto err = WSAGetLastError();
1854+
if (err == WSA_IO_PENDING)
1855+
return;
1856+
1857+
_iocpConnectOp.inFlight = false;
1858+
debug (ASOCKETS) stderr.writefln("[iocp] ConnectEx failed: %d", err);
1859+
(cast(Connection)cast(Object)this).disconnect(
1860+
formatSocketError(err), DisconnectType.error);
1861+
}
1862+
17221863
/// Override Connection.doSend hook for IOCP: post overlapped WSASend.
17231864
/// Returns buffer.length on success (claims all bytes accepted),
17241865
/// Socket.ERROR with WSAEWOULDBLOCK if a send is already in flight.
@@ -3058,7 +3199,14 @@ protected:
30583199
final void updateFlags()
30593200
{
30603201
if (state == ConnectionState.connecting)
3061-
notifyWrite = true;
3202+
{
3203+
static if (eventLoopMechanism == EventLoopMechanism.iocp)
3204+
// ConnectEx delivers the connecting→connected transition via an
3205+
// IOCP completion, not via a writable-edge. Suppress the kick.
3206+
notifyWrite = false;
3207+
else
3208+
notifyWrite = true;
3209+
}
30623210
else
30633211
notifyWrite = writePending;
30643212

@@ -3315,6 +3463,32 @@ protected:
33153463
updateFlags();
33163464
}
33173465

3466+
// Shared connect-complete logic for POSIX (called from onWritableImpl) and
3467+
// IOCP (called from iocpOnConnectComplete after SO_UPDATE_CONNECT_CONTEXT).
3468+
package final void _handleConnectComplete()
3469+
{
3470+
state = ConnectionState.connected;
3471+
3472+
try
3473+
setKeepAlive();
3474+
catch (Exception e)
3475+
return disconnect(e.msg, DisconnectType.error);
3476+
if (connectHandler)
3477+
connectHandler();
3478+
3479+
static if (eventLoopMechanism == EventLoopMechanism.iocp)
3480+
{
3481+
// Safety net: if data was queued before connect (unusual — public
3482+
// connect() requires disconnected state so the queue is empty), the
3483+
// notifyWrite setter was suppressed while in connecting state.
3484+
// Also handles the case where connectHandler queued a send but the
3485+
// state has already changed (e.g. to disconnecting).
3486+
if (writePending && _iocpSendBuffer is null)
3487+
socketManager.kickWritable(this);
3488+
updateFlags();
3489+
}
3490+
}
3491+
33183492
// Work around scope(success) breaking debugger stack traces
33193493
final private void onWritableImpl()
33203494
{
@@ -3325,23 +3499,7 @@ protected:
33253499
conn.getOption(SocketOptionLevel.SOCKET, SocketOption.ERROR, error);
33263500
if (error)
33273501
return disconnect(formatSocketError(error), DisconnectType.error);
3328-
3329-
state = ConnectionState.connected;
3330-
3331-
//debug writefln("[%s] Connected", remoteAddressStr);
3332-
try
3333-
setKeepAlive();
3334-
catch (Exception e)
3335-
return disconnect(e.msg, DisconnectType.error);
3336-
if (connectHandler)
3337-
connectHandler();
3338-
// On IOCP, notifyWrite was already true when we entered this
3339-
// branch (connecting state), so the setter won't re-kick even if
3340-
// connectHandler queued data or transitioned to disconnecting.
3341-
// Post a send if data is queued and nothing is already in flight.
3342-
static if (eventLoopMechanism == EventLoopMechanism.iocp)
3343-
if (writePending && _iocpSendBuffer is null)
3344-
socketManager.kickWritable(this);
3502+
_handleConnectComplete();
33453503
return;
33463504
}
33473505
//debug writefln(remoteAddressStr, ": Writable - handler ", handleBufferFlushed?"OK":"not set", ", outBuffer.length=", outBuffer.length);
@@ -3647,13 +3805,39 @@ protected:
36473805

36483806
try
36493807
{
3650-
conn = new Socket(addressInfo.family, addressInfo.type, addressInfo.protocol);
3651-
conn.blocking = false;
3808+
static if (eventLoopMechanism == EventLoopMechanism.iocp)
3809+
{
3810+
// Create the socket with WSA_FLAG_OVERLAPPED so it can
3811+
// participate in IOCP (mirrors the AcceptEx candidate path).
3812+
auto sock = WSASocketW(
3813+
cast(int)addressInfo.family,
3814+
cast(int)addressInfo.type,
3815+
cast(int)addressInfo.protocol,
3816+
null, 0, WSA_FLAG_OVERLAPPED);
3817+
if (sock == c_socks.INVALID_SOCKET)
3818+
throw new SocketOSException("WSASocketW failed");
3819+
conn = new Socket(cast(socket_t)sock, addressInfo.family);
3820+
conn.blocking = false;
36523821

3653-
socketManager.register(this);
3654-
updateFlags();
3655-
debug (ASOCKETS) stderr.writefln("Attempting connection to %s", addressInfo.address.toString());
3656-
conn.connect(addressInfo.address);
3822+
// ConnectEx requires the socket to be bound first.
3823+
_iocpBindWildcard(conn, addressInfo.family);
3824+
3825+
socketManager.register(this);
3826+
debug (ASOCKETS) stderr.writefln("Attempting connection to %s",
3827+
addressInfo.address.toString());
3828+
_iocpArmConnect(addressInfo.address);
3829+
}
3830+
else
3831+
{
3832+
conn = new Socket(addressInfo.family, addressInfo.type, addressInfo.protocol);
3833+
conn.blocking = false;
3834+
3835+
socketManager.register(this);
3836+
updateFlags();
3837+
debug (ASOCKETS) stderr.writefln("Attempting connection to %s",
3838+
addressInfo.address.toString());
3839+
conn.connect(addressInfo.address);
3840+
}
36573841
}
36583842
catch (SocketException e)
36593843
return onError("Connect error: " ~ e.msg);
@@ -4862,3 +5046,76 @@ debug(ae_unittest) version (Windows) unittest
48625046
assert(received, "data sent in connectHandler was never delivered (IOCP kick bug)");
48635047
}
48645048
}
5049+
5050+
// ConnectEx failure path: connect to a refused port drives onError → tryNextAddress.
5051+
// Verifies iocpOnConnectComplete's status != 0 branch is wired to disconnect(error).
5052+
debug(ae_unittest) version (Windows) unittest
5053+
{
5054+
static if (eventLoopMechanism == EventLoopMechanism.iocp)
5055+
{
5056+
import std.conv : to;
5057+
import std.socket : InternetAddress;
5058+
5059+
// Bind a TcpServer to get an ephemeral port, then close it.
5060+
// Any connect attempt to that port will be refused (WSAECONNREFUSED).
5061+
auto srv = new TcpServer;
5062+
ushort refusedPort = srv.listen(0, "127.0.0.1");
5063+
srv.close();
5064+
5065+
string disconnectReason;
5066+
DisconnectType disconnectType;
5067+
bool disconnected;
5068+
5069+
auto c = new TcpConnection;
5070+
c.handleDisconnect = (string reason, DisconnectType type) {
5071+
disconnectReason = reason;
5072+
disconnectType = type;
5073+
disconnected = true;
5074+
};
5075+
5076+
auto addr = new InternetAddress("127.0.0.1", refusedPort);
5077+
c.connect([AddressInfo(addr.addressFamily, SocketType.STREAM,
5078+
ProtocolType.TCP, addr, "127.0.0.1")]);
5079+
5080+
socketManager.loop();
5081+
assert(disconnected, "handleDisconnect not called after refused connect");
5082+
assert(disconnectType == DisconnectType.error,
5083+
"expected error disconnect, got " ~ disconnectType.to!string);
5084+
}
5085+
}
5086+
5087+
// ConnectEx cancel path: disconnect() while ConnectEx in flight must not leak
5088+
// the socket and must call handleDisconnect exactly once.
5089+
// Uses AddressInfo[] overload to bypass DNS and arm ConnectEx synchronously.
5090+
debug(ae_unittest) version (Windows) unittest
5091+
{
5092+
static if (eventLoopMechanism == EventLoopMechanism.iocp)
5093+
{
5094+
import std.conv : to;
5095+
import std.socket : InternetAddress;
5096+
5097+
int disconnectCount;
5098+
DisconnectType disconnectType;
5099+
5100+
auto c = new TcpConnection;
5101+
c.handleDisconnect = (string reason, DisconnectType type) {
5102+
disconnectCount++;
5103+
disconnectType = type;
5104+
};
5105+
5106+
// 192.0.2.1 is TEST-NET-1 (RFC 5737) — unreachable, but we cancel
5107+
// immediately. Use AddressInfo[] to skip DNS and arm ConnectEx now.
5108+
auto addr = new InternetAddress("192.0.2.1", 1);
5109+
c.connect([AddressInfo(addr.addressFamily, SocketType.STREAM,
5110+
ProtocolType.TCP, addr, "192.0.2.1")]);
5111+
5112+
// Cancel the in-flight ConnectEx on the next event-loop tick.
5113+
onNextTick(socketManager, { c.disconnect(); });
5114+
5115+
socketManager.loop();
5116+
assert(disconnectCount == 1,
5117+
"handleDisconnect called " ~ disconnectCount.to!string ~ " times (expected 1)");
5118+
assert(disconnectType == DisconnectType.requested,
5119+
"expected requested disconnect, got " ~ disconnectType.to!string);
5120+
}
5121+
}

0 commit comments

Comments
 (0)