@@ -165,27 +165,24 @@ internal actor AsyncSocketManager: SocketManager {
165165 return try await socket. receiveMessage ( length, fromAddressOf: addressType)
166166 }
167167
168+ nonisolated func listen( backlog: Int , for fileDescriptor: SocketDescriptor ) async throws {
169+ let socket = try await self . socket ( for: fileDescriptor)
170+ try await socket. listen ( backlog: backlog)
171+ }
172+
168173 /// Accept a connection on a socket.
169174 nonisolated func accept( for fileDescriptor: SocketDescriptor ) async throws -> SocketDescriptor {
170- let socket = try await socket ( for: fileDescriptor)
171- let result = try await retry ( sleep: state. configuration. monitorInterval) {
172- fileDescriptor. _accept ( retryOnInterrupt: true )
173- } . get ( )
174- socket. continuation. yield ( . connection)
175- return result
175+ let socket = try await wait ( for: . read, fileDescriptor: fileDescriptor)
176+ return try await socket. accept ( )
176177 }
177178
178179 /// Accept a connection on a socket.
179180 nonisolated func accept< Address: SocketAddress > (
180181 _ address: Address . Type ,
181182 for fileDescriptor: SocketDescriptor
182183 ) async throws -> ( fileDescriptor: SocketDescriptor , address: Address ) {
183- let socket = try await socket ( for: fileDescriptor)
184- let result = try await retry ( sleep: state. configuration. monitorInterval) {
185- fileDescriptor. _accept ( address, retryOnInterrupt: true )
186- } . get ( )
187- socket. continuation. yield ( . connection)
188- return result
184+ let socket = try await wait ( for: . read, fileDescriptor: fileDescriptor)
185+ return try await socket. accept ( address)
189186 }
190187
191188 /// Initiate a connection on a socket.
@@ -224,7 +221,7 @@ private extension AsyncSocketManager {
224221 var tasks = [ Task < Void , Never > ] ( )
225222 while self . state. isMonitoring {
226223 do {
227- tasks. reserveCapacity ( state. sockets. count * 2 )
224+ tasks. reserveCapacity ( state. sockets. count)
228225 // poll
229226 let hasEvents = try poll ( & tasks)
230227 // stop monitoring if no sockets
@@ -331,39 +328,28 @@ private extension AsyncSocketManager {
331328 }
332329
333330 func process( _ poll: SocketDescriptor . Poll , socket: AsyncSocketManager . SocketState , tasks: inout [ Task < Void , Never > ] ) {
334- /*
335- let isListening = self.sockets[poll.socket]?.isListening ?? false
336- if isListening, poll.returnedEvents.contains([.read, .write]) {
337- event([.read, .write], notification: .connection, for: poll.socket)
338- } else {
331+ let task = Task {
339332 if poll. returnedEvents. contains ( . read) {
340- event(.read, notification: .read, for: poll.socket)
333+ if await socket. isListening {
334+ await socket. event ( . read, notification: . connection)
335+ } else {
336+ await socket. event ( . read, notification: . read)
337+ }
341338 }
342339 if poll. returnedEvents. contains ( . write) {
343- event(.write, notification: .write, for: poll.socket )
340+ await socket . event ( . write, notification: . write)
344341 }
345- }*/
346- if poll. returnedEvents. contains ( . read) {
347- let task = Task ( priority: state. configuration. monitorPriority) {
348- await socket. event ( . read, notification: . read)
342+ if poll. returnedEvents. contains ( . invalidRequest) {
343+ error ( . badFileDescriptor, for: poll. socket)
349344 }
350- tasks . append ( task )
351- }
352- if poll . returnedEvents . contains ( . write ) {
353- let task = Task ( priority : state . configuration . monitorPriority ) {
354- await socket . event ( . write , notification : . write )
345+ if poll . returnedEvents . contains ( . error ) {
346+ error ( . connectionReset , for : poll . socket )
347+ }
348+ if poll . returnedEvents . contains ( . hangup ) {
349+ hangup ( poll . socket )
355350 }
356- tasks. append ( task)
357- }
358- if poll. returnedEvents. contains ( . invalidRequest) {
359- error ( . badFileDescriptor, for: poll. socket)
360- }
361- if poll. returnedEvents. contains ( . error) {
362- error ( . connectionReset, for: poll. socket)
363- }
364- if poll. returnedEvents. contains ( . hangup) {
365- hangup ( poll. socket)
366351 }
352+ tasks. append ( task)
367353 }
368354
369355 func error( _ error: Errno , for fileDescriptor: SocketDescriptor ) {
@@ -443,6 +429,19 @@ extension AsyncSocketManager.SocketState {
443429 didRead ( bytesRead)
444430 return ( data, address)
445431 }
432+
433+ func listen( backlog: Int ) throws {
434+ try fileDescriptor. listen ( backlog: backlog)
435+ isListening = true
436+ }
437+
438+ func accept( ) throws -> SocketDescriptor {
439+ try fileDescriptor. accept ( )
440+ }
441+
442+ func accept< Address: SocketAddress > ( _ address: Address . Type ) throws -> ( SocketDescriptor , Address ) {
443+ try fileDescriptor. accept ( address)
444+ }
446445}
447446
448447fileprivate extension AsyncSocketManager . SocketState {
0 commit comments