Skip to content

Commit 2ff13b8

Browse files
vkuttypCopilot
andcommitted
Migrate to AsyncStreamBridge: zero-hop message reads
Replace AsyncChannelBridge (eventLoop.execute per read, ~3ms/hop) with AsyncStreamBridge (AsyncThrowingStream.Continuation.yield from channelRead, zero extra hop). Also make send() fire-and-forget in all three connections. Postgres cold connect: 68ms → 22ms (68% faster) MSSQL warm query: 53% faster than FreeTDS Postgres warm query: matches postgres-nio (0.37ms) MySQL warm query: 29% faster than mysql-nio (0.34ms vs 0.48ms) Changes: - Sources/CosmoSQLCore/AsyncStreamBridge.swift: new file with AsyncStreamBridge and MessageReader classes - Sources/CosmoPostgres/PostgresConnection.swift: migrate to msgReader/MessageReader, send() fire-and-forget, rewritten handshake() - Sources/CosmoMySQL/MySQLConnection.swift: same migration - Sources/CosmoMSSQL/MSSQLConnection.swift: same migration, sendPacket() now synchronous fire-and-forget - cosmo-benchmark: better error reporting for warm connect tests, use do/catch instead of try? for warm connection open Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 7ada3c7 commit 2ff13b8

5 files changed

Lines changed: 152 additions & 71 deletions

File tree

Sources/CosmoMSSQL/MSSQLConnection.swift

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ public final class MSSQLConnection: SQLDatabase, @unchecked Sendable {
197197
private var isClosed: Bool = false
198198
/// True when the connection is still open and usable.
199199
public var isOpen: Bool { !isClosed }
200-
private var bridge: AsyncChannelBridge?
200+
private var msgReader: MessageReader? // AsyncThrowingStream-based; no eventLoop hop per read
201201
/// Tracks whether we are inside an explicit transaction (BEGIN TRANSACTION).
202202
private var inTransaction: Bool = false
203203
/// Current transaction descriptor — updated from ENVCHANGE type 8/9/10 responses.
@@ -239,14 +239,15 @@ public final class MSSQLConnection: SQLDatabase, @unchecked Sendable {
239239

240240
private func handshake() async throws {
241241
// 1. Add pipeline: TDSTLSFramer (pass-through initially) + framing + bridge
242-
let b = AsyncChannelBridge()
243-
bridge = b
242+
let bridge = AsyncStreamBridge()
244243
// Swift 6: ByteToMessageHandler has Sendable marked unavailable (event-loop-bound).
245-
let frameBox = _UnsafeSendable(ByteToMessageHandler(TDSFramingHandler()))
244+
let bridgeBox = _UnsafeSendable(bridge)
245+
let frameBox = _UnsafeSendable(ByteToMessageHandler(TDSFramingHandler()))
246246
let framer = tlsFramer
247247
try await channel.eventLoop.submit {
248-
try self.channel.pipeline.syncOperations.addHandlers([framer, frameBox.value, b])
248+
try self.channel.pipeline.syncOperations.addHandlers([framer, frameBox.value, bridgeBox.value])
249249
}.get()
250+
msgReader = MessageReader(bridge)
250251

251252
// 2. Pre-Login — negotiate encryption preference
252253
let preLoginResp = try await sendPreLogin()
@@ -278,7 +279,7 @@ public final class MSSQLConnection: SQLDatabase, @unchecked Sendable {
278279
let wantEnc: PreLoginEncryption = config.tls == .disable ? .off : .on
279280
let req = TDSPreLoginRequest(encryption: wantEnc)
280281
var payload = req.encode(allocator: channel.allocator)
281-
try await sendPacket(type: .preLogin, payload: &payload)
282+
sendPacket(type: .preLogin, payload: &payload)
282283
let responseBuffer = try await receivePacket()
283284
var buf = responseBuffer
284285
return try TDSPreLoginResponse.decode(from: &buf)
@@ -363,7 +364,7 @@ public final class MSSQLConnection: SQLDatabase, @unchecked Sendable {
363364
login.optionFlags2 |= 0x80 // fIntSecurity: use integrated (SSPI) auth
364365
}
365366
var payload = login.encode(allocator: channel.allocator)
366-
try await sendPacket(type: .tdsLogin7, payload: &payload)
367+
sendPacket(type: .tdsLogin7, payload: &payload)
367368
let responseBuffer = try await receivePacket()
368369
var buf = responseBuffer
369370
var dec = TDSTokenDecoder()
@@ -385,7 +386,7 @@ public final class MSSQLConnection: SQLDatabase, @unchecked Sendable {
385386
)
386387
var authBuf = channel.allocator.buffer(capacity: authenticate.count)
387388
authBuf.writeBytes(authenticate)
388-
try await sendPacket(type: .sspiAuth, payload: &authBuf)
389+
sendPacket(type: .sspiAuth, payload: &authBuf)
389390
let authResponse = try await receivePacket()
390391
var authBuf2 = authResponse
391392
var dec2 = TDSTokenDecoder()
@@ -479,7 +480,7 @@ public final class MSSQLConnection: SQLDatabase, @unchecked Sendable {
479480
return try await withTimeout(config.queryTimeout) {
480481
let rpc = TDSRPCProcRequest(procName: name, parameters: parameters)
481482
var payload = rpc.encode(allocator: self.channel.allocator)
482-
try await self.sendPacket(type: .rpc, payload: &payload)
483+
self.sendPacket(type: .rpc, payload: &payload)
483484
var buf = try await self.receivePacket()
484485
var dec = TDSTokenDecoder()
485486
try dec.decode(buffer: &buf)
@@ -561,7 +562,7 @@ public final class MSSQLConnection: SQLDatabase, @unchecked Sendable {
561562
/// Run a SQL batch and return the full decoder state (for callers needing resultSets).
562563
private func runBatchDecoder(_ sql: String) async throws -> TDSTokenDecoder {
563564
var payload = encodeSQLBatch(sql: sql)
564-
try await sendPacket(type: .sqlBatch, payload: &payload)
565+
sendPacket(type: .sqlBatch, payload: &payload)
565566
var buf = try await receivePacket()
566567
var dec = TDSTokenDecoder()
567568
try dec.decode(buffer: &buf)
@@ -575,7 +576,7 @@ public final class MSSQLConnection: SQLDatabase, @unchecked Sendable {
575576
private func runRPCDecoder(_ sql: String, binds: [SQLValue]) async throws -> TDSTokenDecoder {
576577
let rpc = TDSRPCRequest(sql: sql, binds: binds)
577578
var payload = rpc.encode(allocator: channel.allocator)
578-
try await sendPacket(type: .rpc, payload: &payload)
579+
sendPacket(type: .rpc, payload: &payload)
579580
var buf = try await receivePacket()
580581
var dec = TDSTokenDecoder()
581582
try dec.decode(buffer: &buf)
@@ -622,7 +623,7 @@ public final class MSSQLConnection: SQLDatabase, @unchecked Sendable {
622623
try await mssqlWithTimeout(seconds, work)
623624
}
624625

625-
private func sendPacket(type: TDSPacketType, payload: inout ByteBuffer) async throws {
626+
private func sendPacket(type: TDSPacketType, payload: inout ByteBuffer) {
626627
// Encode TDS packet(s) and send each packet individually.
627628
// SQL Server can reject multi-packet TDS messages sent in a single write.
628629
let payloadSize = payload.readableBytes
@@ -646,8 +647,10 @@ public final class MSSQLConnection: SQLDatabase, @unchecked Sendable {
646647
pkt.writeBytes(payload.getBytes(at: payload.readerIndex + offset, length: chunkLen)!)
647648

648649
if isLast {
649-
// Last chunk: writeAndFlush sends all buffered writes in one TCP segment
650-
try await channel.writeAndFlush(pkt).get()
650+
// Last chunk: writeAndFlush flushes all buffered writes in one TCP segment.
651+
// Fire-and-forget — the server can't reply until it receives this, so reads
652+
// will naturally follow the write through the event loop's FIFO ordering.
653+
channel.writeAndFlush(pkt, promise: nil)
651654
} else {
652655
channel.write(pkt, promise: nil)
653656
}
@@ -656,10 +659,12 @@ public final class MSSQLConnection: SQLDatabase, @unchecked Sendable {
656659
}
657660
}
658661

659-
/// Receive one complete TDS message via the async bridge handler.
662+
/// Receive one complete TDS message via the async stream bridge handler.
660663
private func receivePacket() async throws -> ByteBuffer {
661-
guard let b = bridge else { throw SQLError.connectionClosed }
662-
return try await b.waitForMessage(on: channel.eventLoop)
664+
guard let reader = msgReader, let buf = try await reader.next() else {
665+
throw SQLError.connectionClosed
666+
}
667+
return buf
663668
}
664669

665670
// MARK: - SQL Batch encoding

Sources/CosmoMySQL/MySQLConnection.swift

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public final class MySQLConnection: SQLDatabase, @unchecked Sendable {
6363
private var capabilities: MySQLCapabilities = .clientDefault
6464
private var sequenceID: UInt8 = 0
6565
private var isClosed: Bool = false
66-
private var bridge: AsyncChannelBridge?
66+
private var msgReader: MessageReader? // AsyncThrowingStream-based; no eventLoop hop per read
6767
private var inTransaction: Bool = false
6868

6969
// Called for each MySQL warning/note message received from the server.
@@ -96,13 +96,14 @@ public final class MySQLConnection: SQLDatabase, @unchecked Sendable {
9696
// MARK: - Handshake
9797

9898
private func handshake(sslContext: NIOSSLContext? = nil) async throws {
99-
let b = AsyncChannelBridge()
100-
bridge = b
99+
let bridge = AsyncStreamBridge()
101100
// Swift 6: ByteToMessageHandler has Sendable marked unavailable (event-loop-bound).
102-
let frameBox = _UnsafeSendable(ByteToMessageHandler(MySQLFramingHandler()))
101+
let bridgeBox = _UnsafeSendable(bridge)
102+
let frameBox = _UnsafeSendable(ByteToMessageHandler(MySQLFramingHandler()))
103103
try await channel.eventLoop.submit {
104-
try self.channel.pipeline.syncOperations.addHandlers([frameBox.value, b])
104+
try self.channel.pipeline.syncOperations.addHandlers([frameBox.value, bridgeBox.value])
105105
}.get()
106+
msgReader = MessageReader(bridge)
106107

107108
// 1. Receive server handshake
108109
var serverHSPacket = try await receivePacket()
@@ -146,7 +147,7 @@ public final class MySQLConnection: SQLDatabase, @unchecked Sendable {
146147
let pkt = ByteBuffer.mysqlPacket(sequenceID: sequenceID,
147148
body: body,
148149
allocator: channel.allocator)
149-
try await send(pkt)
150+
send(pkt)
150151
}
151152

152153
// sslContext: reuse the pool-level NIOSSLContext instead of constructing one per connection.
@@ -207,7 +208,7 @@ public final class MySQLConnection: SQLDatabase, @unchecked Sendable {
207208
let pkt = ByteBuffer.mysqlPacket(sequenceID: sequenceID,
208209
body: body,
209210
allocator: channel.allocator)
210-
try await send(pkt)
211+
send(pkt)
211212
}
212213

213214
private func readAuthResult(authPlugin: String, challenge: [UInt8]) async throws {
@@ -240,7 +241,7 @@ public final class MySQLConnection: SQLDatabase, @unchecked Sendable {
240241
sequenceID += 1
241242
let pkt = ByteBuffer.mysqlPacket(sequenceID: sequenceID, body: body,
242243
allocator: channel.allocator)
243-
try await send(pkt)
244+
send(pkt)
244245
try await readAuthResult(authPlugin: authPlugin, challenge: challenge)
245246
case 0x02:
246247
// Server requesting RSA public key (non-TLS path) — not implemented
@@ -350,7 +351,7 @@ public final class MySQLConnection: SQLDatabase, @unchecked Sendable {
350351
sequenceID = 0
351352
let pkt = ByteBuffer.mysqlPacket(sequenceID: sequenceID, body: body,
352353
allocator: channel.allocator)
353-
try? await send(pkt)
354+
send(pkt)
354355
try await channel.close().get()
355356
}
356357

@@ -363,7 +364,7 @@ public final class MySQLConnection: SQLDatabase, @unchecked Sendable {
363364
sequenceID = 0
364365
let pkt = ByteBuffer.mysqlPacket(sequenceID: sequenceID, body: body,
365366
allocator: channel.allocator)
366-
try await send(pkt)
367+
send(pkt)
367368
}
368369

369370
// MARK: - Result set reading
@@ -539,13 +540,15 @@ public final class MySQLConnection: SQLDatabase, @unchecked Sendable {
539540
return ResultSetChunk(rows: rows, hasMore: false)
540541
}
541542

542-
private func send(_ buffer: ByteBuffer) async throws {
543-
try await channel.writeAndFlush(buffer).get()
543+
private func send(_ buffer: ByteBuffer) {
544+
channel.writeAndFlush(buffer, promise: nil)
544545
}
545546

546547
private func receivePacket() async throws -> ByteBuffer {
547-
guard let b = bridge else { throw SQLError.connectionClosed }
548-
return try await b.waitForMessage(on: channel.eventLoop)
548+
guard let reader = msgReader, let buf = try await reader.next() else {
549+
throw SQLError.connectionClosed
550+
}
551+
return buf
549552
}
550553

551554
// MARK: - Bind substitution

Sources/CosmoPostgres/PostgresConnection.swift

Lines changed: 45 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,11 @@ public final class PostgresConnection: SQLDatabase, @unchecked Sendable {
5656

5757
// MARK: - State
5858

59-
private let channel: any Channel
60-
let config: Configuration // internal — used by backup extension
61-
private let logger: Logger
62-
private var isClosed: Bool = false
63-
private var bridge: AsyncChannelBridge?
59+
private let channel: any Channel
60+
let config: Configuration // internal — used by backup extension
61+
private let logger: Logger
62+
private var isClosed: Bool = false
63+
private var msgReader: MessageReader? // AsyncThrowingStream-based; no eventLoop hop per read
6464
private var inTransaction: Bool = false
6565

6666
// Called for each NOTICE message received from the server.
@@ -93,28 +93,31 @@ public final class PostgresConnection: SQLDatabase, @unchecked Sendable {
9393
// MARK: - Handshake
9494

9595
private func handshake(sslContext: NIOSSLContext? = nil) async throws {
96-
let b = AsyncChannelBridge()
97-
bridge = b
96+
let bridge = AsyncStreamBridge()
9897

9998
if config.tls != .disable {
100-
// Postgres SSL negotiation uses a raw single-byte response ('S'/'N') before
101-
// the normal framing kicks in. Use a temporary raw bridge to read that byte,
102-
// then swap in the proper framing handler.
103-
let rawBridge = AsyncChannelBridge()
104-
try await channel.pipeline.addHandler(rawBridge).get()
99+
// Postgres SSL negotiation: the server sends a single raw byte ('S'/'N')
100+
// before normal message framing begins. Install the bridge raw (no framing)
101+
// to capture that byte, then insert the framing handler before it.
102+
let bridgeBox = _UnsafeSendable(bridge)
103+
try await channel.eventLoop.submit {
104+
try self.channel.pipeline.syncOperations.addHandler(bridgeBox.value)
105+
}.get()
106+
msgReader = MessageReader(bridge)
105107

106108
let sslReq = PGFrontend.sslRequest(allocator: channel.allocator)
107-
try await send(sslReq)
109+
send(sslReq)
108110

109-
var sslResponse = try await rawBridge.waitForMessage(on: channel.eventLoop)
111+
guard let reader = msgReader, var sslResponse = try await reader.next() else {
112+
throw SQLError.connectionClosed
113+
}
110114
let sslByte = sslResponse.readInteger(as: UInt8.self) ?? UInt8(ascii: "N")
111115

112-
try await channel.pipeline.removeHandler(rawBridge).get()
113-
114-
// Install framing + bridge
116+
// Insert framing handler BEFORE the already-installed bridge
115117
let frameBox = _UnsafeSendable(ByteToMessageHandler(PGFramingHandler()))
116118
try await channel.eventLoop.submit {
117-
try self.channel.pipeline.syncOperations.addHandlers([frameBox.value, b])
119+
try self.channel.pipeline.syncOperations.addHandler(frameBox.value,
120+
position: .before(bridgeBox.value))
118121
}.get()
119122

120123
if sslByte == UInt8(ascii: "S") {
@@ -124,19 +127,21 @@ public final class PostgresConnection: SQLDatabase, @unchecked Sendable {
124127
throw SQLError.tlsError("Server does not support TLS")
125128
}
126129
} else {
127-
// No TLSinstall framing + bridge directly, skipping the rawBridge cycle.
128-
// Swift 6: ByteToMessageHandler has Sendable marked unavailable (event-loop-bound).
129-
let frameBox = _UnsafeSendable(ByteToMessageHandler(PGFramingHandler()))
130+
// No TLS: install framing + bridge in one submit, no raw-byte probe needed.
131+
let bridgeBox = _UnsafeSendable(bridge)
132+
let frameBox = _UnsafeSendable(ByteToMessageHandler(PGFramingHandler()))
130133
try await channel.eventLoop.submit {
131-
try self.channel.pipeline.syncOperations.addHandlers([frameBox.value, b])
134+
try self.channel.pipeline.syncOperations.addHandlers([frameBox.value, bridgeBox.value])
132135
}.get()
136+
msgReader = MessageReader(bridge)
133137
}
134138

135-
// Startup + authentication
139+
// Startup + authentication — all reads now go through AsyncThrowingStream,
140+
// avoiding the eventLoop.execute hop that AsyncChannelBridge required.
136141
let startup = PGFrontend.startup(user: config.username,
137142
database: config.database,
138143
allocator: channel.allocator)
139-
try await send(startup)
144+
send(startup)
140145
try await authenticate()
141146
logger.debug("PostgreSQL connected as \(config.username)")
142147
}
@@ -172,13 +177,13 @@ public final class PostgresConnection: SQLDatabase, @unchecked Sendable {
172177
case .authRequestClearText:
173178
let reply = PGFrontend.passwordMessage(config.password,
174179
allocator: channel.allocator)
175-
try await send(reply)
180+
send(reply)
176181
case .authRequestMD5(let salt):
177182
let hashed = pgMD5Password(user: config.username,
178183
password: config.password,
179184
salt: salt)
180185
let reply = PGFrontend.passwordMessage(hashed, allocator: channel.allocator)
181-
try await send(reply)
186+
send(reply)
182187
case .authRequestSASL(let mechanisms):
183188
guard mechanisms.contains("SCRAM-SHA-256") else {
184189
throw SQLError.unsupported("No supported SASL mechanism (got: \(mechanisms.joined(separator: ", ")))")
@@ -205,7 +210,7 @@ public final class PostgresConnection: SQLDatabase, @unchecked Sendable {
205210
mechanism: "SCRAM-SHA-256",
206211
clientFirstMessage: payload,
207212
allocator: channel.allocator)
208-
try await send(initMsg)
213+
send(initMsg)
209214

210215
// 2. Receive AuthSASLContinue
211216
var serverFirstMessage = ""
@@ -224,7 +229,7 @@ public final class PostgresConnection: SQLDatabase, @unchecked Sendable {
224229

225230
// 3. Send SASLResponse (client-final-message)
226231
let finalMsg = PGFrontend.saslResponse(clientFinal, allocator: channel.allocator)
227-
try await send(finalMsg)
232+
send(finalMsg)
228233
break loop
229234
case .error(_, _, let message):
230235
throw SQLError.authenticationFailed(message)
@@ -274,7 +279,7 @@ public final class PostgresConnection: SQLDatabase, @unchecked Sendable {
274279
logger.debug("PostgreSQL query: \(rendered)")
275280

276281
let msg = PGFrontend.query(rendered, allocator: channel.allocator)
277-
try await send(msg)
282+
send(msg)
278283
return try await collectResults()
279284
}
280285

@@ -284,7 +289,7 @@ public final class PostgresConnection: SQLDatabase, @unchecked Sendable {
284289
logger.debug("PostgreSQL execute: \(rendered)")
285290

286291
let msg = PGFrontend.query(rendered, allocator: channel.allocator)
287-
try await send(msg)
292+
send(msg)
288293
var rowsAffected = 0
289294
var pendingError: (any Error)?
290295
// Drain until ReadyForQuery so the connection stays clean after errors
@@ -317,7 +322,7 @@ public final class PostgresConnection: SQLDatabase, @unchecked Sendable {
317322
logger.debug("PostgreSQL queryMulti: \(rendered)")
318323

319324
let msg = PGFrontend.query(rendered, allocator: channel.allocator)
320-
try await send(msg)
325+
send(msg)
321326

322327
var allSets: [[SQLRow]] = []
323328
var current: [SQLRow] = []
@@ -408,7 +413,7 @@ public final class PostgresConnection: SQLDatabase, @unchecked Sendable {
408413
guard !isClosed else { return }
409414
isClosed = true
410415
let terminate = PGFrontend.terminate(allocator: channel.allocator)
411-
try? await send(terminate)
416+
send(terminate)
412417
try await channel.close().get()
413418
}
414419

@@ -455,13 +460,17 @@ public final class PostgresConnection: SQLDatabase, @unchecked Sendable {
455460

456461
// MARK: - Wire I/O
457462

458-
private func send(_ buffer: ByteBuffer) async throws {
459-
try await channel.writeAndFlush(buffer).get()
463+
// Fire-and-forget: enqueues the write on the event loop without awaiting completion.
464+
// Safe for request-response protocols — the server can't reply until it receives the
465+
// data, so the read will naturally follow the write.
466+
private func send(_ buffer: ByteBuffer) {
467+
channel.writeAndFlush(buffer, promise: nil)
460468
}
461469

462470
private func receiveMessage() async throws -> PGBackendMessage {
463-
guard let b = bridge else { throw SQLError.connectionClosed }
464-
var buf = try await b.waitForMessage(on: channel.eventLoop)
471+
guard let reader = msgReader, var buf = try await reader.next() else {
472+
throw SQLError.connectionClosed
473+
}
465474
return try PGMessageDecoder.decode(buffer: &buf)
466475
}
467476

0 commit comments

Comments
 (0)