Skip to content

Commit 0658298

Browse files
committed
Fix pipeline bootstrapping
1 parent 5ffc0fe commit 0658298

1 file changed

Lines changed: 135 additions & 119 deletions

File tree

Sources/HTTPServer/HTTPServer.swift

Lines changed: 135 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -156,115 +156,162 @@ public final class Server<RequestHandler: HTTPServerRequestHandler> {
156156
configuration: HTTPServerConfiguration,
157157
handler: RequestHandler
158158
) async throws {
159-
let serverChannel = try await Self.bind(bindTarget: configuration.bindTarget) {
160-
(channel) -> EventLoopFuture<
161-
EventLoopFuture<
162-
NIONegotiatedHTTPVersion<
163-
NIOAsyncChannel<HTTPRequestPart, HTTPResponsePart>,
164-
(
165-
Void,
166-
NIOHTTP2Handler.AsyncStreamMultiplexer<NIOAsyncChannel<HTTPRequestPart, HTTPResponsePart>>
167-
)
168-
>
169-
>
170-
> in
171-
channel.eventLoop.makeCompletedFuture {
172-
switch configuration.tlSConfiguration.backing {
173-
case .insecure:
174-
break
159+
switch configuration.tlSConfiguration.backing {
160+
case .insecure:
161+
try await Self.serveInsecureHTTP1_1(
162+
bindTarget: configuration.bindTarget,
163+
handler: handler,
164+
logger: logger
165+
)
175166

176-
case .certificateChainAndPrivateKey(let certificateChain, let privateKey):
177-
let certificateChain =
178-
try certificateChain
179-
.map {
180-
try NIOSSLCertificate(
181-
bytes: $0.serializeAsPEM().derBytes,
182-
format: .der
183-
)
184-
}
185-
.map { NIOSSLCertificateSource.certificate($0) }
186-
let privateKey = NIOSSLPrivateKeySource.privateKey(
187-
try NIOSSLPrivateKey(
188-
bytes: privateKey.serializeAsPEM().derBytes,
189-
format: .der
190-
)
191-
)
167+
case .certificateChainAndPrivateKey(let certificateChain, let privateKey):
168+
try await Self.serveSecureUpgrade(
169+
bindTarget: configuration.bindTarget,
170+
certificateChain: certificateChain,
171+
privateKey: privateKey,
172+
handler: handler,
173+
logger: logger
174+
)
175+
}
176+
}
192177

193-
try channel.pipeline.syncOperations
194-
.addHandler(
195-
NIOSSLServerHandler(
196-
context: .init(
197-
configuration:
198-
.makeServerConfiguration(
199-
certificateChain: certificateChain,
200-
privateKey: privateKey
201-
)
202-
)
203-
)
204-
)
205-
}
206-
}.flatMap {
207-
channel
208-
.configureAsyncHTTPServerPipeline { channel in
178+
private static func serveInsecureHTTP1_1(
179+
bindTarget: HTTPServerConfiguration.BindTarget,
180+
handler: RequestHandler,
181+
logger: Logger
182+
) async throws {
183+
switch bindTarget.backing {
184+
case .hostAndPort(let host, let port):
185+
let serverChannel = try await ServerBootstrap(group: .singletonMultiThreadedEventLoopGroup)
186+
.serverChannelOption(.socketOption(.so_reuseaddr), value: 1)
187+
.bind(host: host, port: port) { channel in
209188
channel.eventLoop.makeCompletedFuture {
189+
channel.pipeline.configureHTTPServerPipeline()
210190
try channel.pipeline.syncOperations.addHandler(HTTP1ToHTTPServerCodec(secure: false))
211-
212191
return try NIOAsyncChannel<HTTPRequestPart, HTTPResponsePart>(
213192
wrappingChannelSynchronously: channel,
214193
configuration: .init(isOutboundHalfClosureEnabled: true)
215194
)
216195
}
217-
} http2ConnectionInitializer: { channel in
218-
channel.eventLoop.makeSucceededVoidFuture()
219-
} http2StreamInitializer: { channel in
220-
channel.eventLoop.makeCompletedFuture {
221-
try channel.pipeline.syncOperations
222-
.addHandler(
223-
HTTP2FramePayloadToHTTPServerCodec()
224-
)
196+
}
225197

226-
return try NIOAsyncChannel<HTTPRequestPart, HTTPResponsePart>(
227-
wrappingChannelSynchronously: channel,
228-
configuration: .init(isOutboundHalfClosureEnabled: true)
198+
try await withThrowingDiscardingTaskGroup { group in
199+
try await serverChannel.executeThenClose { inbound in
200+
for try await http1Channel in inbound {
201+
group.addTask {
202+
await Self.handleRequestChannel(
203+
logger: logger,
204+
channel: http1Channel,
205+
handler: handler
229206
)
230207
}
231208
}
209+
}
232210
}
233211
}
212+
}
213+
214+
private static func serveSecureUpgrade(
215+
bindTarget: HTTPServerConfiguration.BindTarget,
216+
certificateChain: [Certificate],
217+
privateKey: Certificate.PrivateKey,
218+
handler: RequestHandler,
219+
logger: Logger
220+
) async throws {
221+
switch bindTarget.backing {
222+
case .hostAndPort(let host, let port):
223+
let serverChannel = try await ServerBootstrap(group: .singletonMultiThreadedEventLoopGroup)
224+
.serverChannelOption(.socketOption(.so_reuseaddr), value: 1)
225+
.bind(host: host, port: port) { channel in
226+
channel.eventLoop.makeCompletedFuture {
227+
let certificateChain = try certificateChain
228+
.map {
229+
try NIOSSLCertificate(
230+
bytes: $0.serializeAsPEM().derBytes,
231+
format: .der
232+
)
233+
}
234+
.map { NIOSSLCertificateSource.certificate($0) }
235+
let privateKey = NIOSSLPrivateKeySource.privateKey(
236+
try NIOSSLPrivateKey(
237+
bytes: privateKey.serializeAsPEM().derBytes,
238+
format: .der
239+
)
240+
)
234241

235-
try await withThrowingDiscardingTaskGroup { group in
236-
try await serverChannel.executeThenClose { inbound in
237-
for try await upgradeResult in inbound {
238-
group.addTask {
239-
do {
240-
try await withThrowingDiscardingTaskGroup { connectionGroup in
241-
switch try await upgradeResult.get() {
242-
case .http1_1(let http1Channel):
243-
connectionGroup.addTask {
244-
await Self.handleRequestChannel(
245-
logger: logger,
246-
channel: http1Channel,
247-
handler: handler
242+
try channel.pipeline.syncOperations
243+
.addHandler(
244+
NIOSSLServerHandler(
245+
context: .init(
246+
configuration: .makeServerConfiguration(
247+
certificateChain: certificateChain,
248+
privateKey: privateKey
248249
)
249-
}
250-
case .http2((_, let http2Multiplexer)):
251-
do {
252-
for try await http2StreamChannel in http2Multiplexer.inbound {
253-
connectionGroup.addTask {
254-
await Self.handleRequestChannel(
255-
logger: logger,
256-
channel: http2StreamChannel,
257-
handler: handler
258-
)
250+
)
251+
)
252+
)
253+
}.flatMap {
254+
channel.configureAsyncHTTPServerPipeline { channel in
255+
channel.eventLoop.makeCompletedFuture {
256+
try channel.pipeline.syncOperations.addHandler(HTTP1ToHTTPServerCodec(secure: true))
257+
258+
return try NIOAsyncChannel<HTTPRequestPart, HTTPResponsePart>(
259+
wrappingChannelSynchronously: channel,
260+
configuration: .init(isOutboundHalfClosureEnabled: true)
261+
)
262+
}
263+
} http2ConnectionInitializer: { channel in
264+
channel.eventLoop.makeSucceededVoidFuture()
265+
} http2StreamInitializer: { channel in
266+
channel.eventLoop.makeCompletedFuture {
267+
try channel.pipeline.syncOperations
268+
.addHandler(
269+
HTTP2FramePayloadToHTTPServerCodec()
270+
)
271+
272+
return try NIOAsyncChannel<HTTPRequestPart, HTTPResponsePart>(
273+
wrappingChannelSynchronously: channel,
274+
configuration: .init(isOutboundHalfClosureEnabled: true)
275+
)
276+
}
277+
}
278+
}
279+
}
280+
281+
try await withThrowingDiscardingTaskGroup { group in
282+
try await serverChannel.executeThenClose { inbound in
283+
for try await upgradeResult in inbound {
284+
group.addTask {
285+
do {
286+
try await withThrowingDiscardingTaskGroup { connectionGroup in
287+
switch try await upgradeResult.get() {
288+
case .http1_1(let http1Channel):
289+
connectionGroup.addTask {
290+
await Self.handleRequestChannel(
291+
logger: logger,
292+
channel: http1Channel,
293+
handler: handler
294+
)
295+
}
296+
case .http2((_, let http2Multiplexer)):
297+
do {
298+
for try await http2StreamChannel in http2Multiplexer.inbound {
299+
connectionGroup.addTask {
300+
await Self.handleRequestChannel(
301+
logger: logger,
302+
channel: http2StreamChannel,
303+
handler: handler
304+
)
305+
}
259306
}
307+
} catch {
308+
logger.debug("HTTP2 connection closed: \(error)")
260309
}
261-
} catch {
262-
logger.debug("HTTP2 connection closed: \(error)")
263310
}
264311
}
312+
} catch {
313+
logger.debug("Negotiating ALPN failed: \(error)")
265314
}
266-
} catch {
267-
logger.debug("Negotiating ALPN failed: \(error)")
268315
}
269316
}
270317
}
@@ -312,39 +359,8 @@ public final class Server<RequestHandler: HTTPServerRequestHandler> {
312359
// TODO: We need to send a response head here potentially
313360
}
314361
} catch {
315-
logger.debug("Error thrown while handling connection")
362+
logger.debug("Error thrown while handling connection: \(error)")
316363
// TODO: We need to send a response head here potentially
317364
}
318365
}
319-
320-
private static func bind(
321-
bindTarget: HTTPServerConfiguration.BindTarget,
322-
childChannelInitializer: @escaping @Sendable (any Channel) -> EventLoopFuture<
323-
EventLoopFuture<
324-
NIONegotiatedHTTPVersion<
325-
NIOAsyncChannel<HTTPRequestPart, HTTPResponsePart>,
326-
(Void, NIOHTTP2Handler.AsyncStreamMultiplexer<NIOAsyncChannel<HTTPRequestPart, HTTPResponsePart>>)
327-
>
328-
>
329-
>
330-
) async throws -> NIOAsyncChannel<
331-
EventLoopFuture<
332-
NIONegotiatedHTTPVersion<
333-
NIOAsyncChannel<HTTPRequestPart, HTTPResponsePart>,
334-
(Void, NIOHTTP2Handler.AsyncStreamMultiplexer<NIOAsyncChannel<HTTPRequestPart, HTTPResponsePart>>)
335-
>
336-
>, Never
337-
> {
338-
switch bindTarget.backing {
339-
case .hostAndPort(let host, let port):
340-
return try await ServerBootstrap(group: .singletonMultiThreadedEventLoopGroup)
341-
.serverChannelOption(.socketOption(.so_reuseaddr), value: 1)
342-
.bind(
343-
host: host,
344-
port: port,
345-
childChannelInitializer: childChannelInitializer
346-
)
347-
}
348-
349-
}
350366
}

0 commit comments

Comments
 (0)