Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
with:
linux_6_0_enabled: false
linux_6_1_enabled: false
linux_6_2_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable"
linux_6_2_enabled: false
linux_6_3_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable"
linux_nightly_next_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable"
linux_nightly_main_arguments_override: "--explicit-target-dependency-import-check error -Xswiftc -require-explicit-sendable"
Expand All @@ -31,3 +31,5 @@ jobs:
with:
linux_6_0_enabled: false
linux_6_1_enabled: false
linux_6_2_enabled: false
linux_6_3_enabled: true
11 changes: 5 additions & 6 deletions .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
name: Soundness
uses: swiftlang/github-workflows/.github/workflows/soundness.yml@0.0.7
with:
api_breakage_check_container_image: "swift:6.2-noble"
api_breakage_check_container_image: "swift:6.3-noble"
format_check_container_image: "swiftlang/swift:nightly-main-noble" # Needed due to https://github.com/swiftlang/swift-format/issues/1081
license_header_check_project_name: "Swift HTTP Server"

Expand All @@ -23,9 +23,7 @@ jobs:
linux_5_10_enabled: false
linux_6_0_enabled: false
linux_6_1_enabled: false
# linux_6_1_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error"
linux_6_2_enabled: true
linux_6_2_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error"
linux_6_2_enabled: false
linux_6_3_enabled: true
linux_6_3_arguments_override: "-Xswiftc -warnings-as-errors --explicit-target-dependency-import-check error"
linux_nightly_next_arguments_override: "--explicit-target-dependency-import-check error"
Expand All @@ -38,7 +36,7 @@ jobs:
linux_5_10_enabled: false
linux_6_0_enabled: false
linux_6_1_enabled: false
linux_6_2_enabled: true
linux_6_2_enabled: false
linux_6_3_enabled: true

static-sdk:
Expand All @@ -52,4 +50,5 @@ jobs:
linux_5_10_enabled: false
linux_6_0_enabled: false
linux_6_1_enabled: false
linux_6_2_enabled: true
linux_6_2_enabled: false
linux_6_3_enabled: true
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// swift-tools-version:6.2
// swift-tools-version:6.3
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift HTTP Server open source project
Expand Down
118 changes: 70 additions & 48 deletions Sources/NIOHTTPServer/HTTPRequestConcludingAsyncReader.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable
/// The type of errors that can occur during reading operations.
public typealias ReadFailure = any Error

/// The HTTP trailer fields captured at the end of the request.
/// The shared reader state that holds the iterator and captures trailers.
fileprivate var state: ReaderState

struct RequestBodyStateMachine {
Expand All @@ -60,19 +60,16 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable
}

private var state: State
private var readerState: ReaderState

/// The iterator that provides HTTP request parts from the underlying channel.
private var iterator: NIOAsyncChannelInboundStream<HTTPRequestPart>.AsyncIterator

init(iterator: NIOAsyncChannelInboundStream<HTTPRequestPart>.AsyncIterator) {
init(readerState: ReaderState) {
self.state = .readingBody(.noExcess)
self.iterator = iterator
self.readerState = readerState
}

enum ReadResult {
case readBody(ByteBuffer)
case readEnd(HTTPFields?)
case streamFinished
case requestFinished
}

mutating func read(limit: Int?) async throws -> ReadResult {
Expand All @@ -88,21 +85,47 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable

case .noExcess:
// There is no excess from previous reads. We obtain the next element from the stream.
let requestPart = try await self.iterator.next(isolation: #isolation)
// Take the iterator from ReaderState, read one part, and put it back.
// This ensures the iterator is always recoverable even if the reader
// is dropped without consuming .end.
guard var iterator = self.readerState.takeIterator() else {
throw RequestBodyReadError.requestEndedBeforeReceivingEnd
}

let requestPart: HTTPRequestPart?
do {
requestPart = try await iterator.next(isolation: #isolation)
} catch {
// Put the iterator back before propagating the error.
nonisolated(unsafe) let iter = iterator
self.readerState.putIterator(iter)
throw error
}

switch requestPart {
case .head:
nonisolated(unsafe) let iter = iterator
self.readerState.putIterator(iter)
fatalError("Unexpectedly received a request head.")

case .none:
throw RequestBodyReadError.streamEndedBeforeReceivingRequestEnd
// Stream ended without .end — don't put iterator back.
throw RequestBodyReadError.requestEndedBeforeReceivingEnd

case .body(let element):
nonisolated(unsafe) let iter = iterator
self.readerState.putIterator(iter)
bodyElement = element

case .end(let trailers):
self.state = .finished
return .readEnd(trailers)
nonisolated(unsafe) let iter = iterator
self.readerState.putIterator(iter)
self.readerState.wrapped.withLock { state in
state.finishedReading = true
state.trailers = trailers
}
return .requestFinished
}
}

Expand All @@ -119,21 +142,16 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable
return .readBody(bodyElement)

case .finished:
return .streamFinished
return .requestFinished
}
}
}

var requestBodyStateMachine: RequestBodyStateMachine

/// Initializes a new request body reader with the given NIO async channel iterator.
///
/// - Parameter iterator: The NIO async channel inbound stream iterator to use for reading request parts.
fileprivate init(
iterator: consuming sending NIOAsyncChannelInboundStream<HTTPRequestPart>.AsyncIterator,
readerState: ReaderState
) {
self.requestBodyStateMachine = .init(iterator: iterator)
/// Initializes a new request body reader.
fileprivate init(readerState: ReaderState) {
self.requestBodyStateMachine = .init(readerState: readerState)
self.state = readerState
}

Expand All @@ -159,14 +177,7 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable
case .readBody(let readElement):
return try await body(Array(buffer: readElement).span)

case .readEnd(let trailers):
self.state.wrapped.withLock { state in
state.trailers = trailers
state.finishedReading = true
}
return try await body(.init())

case .streamFinished:
case .requestFinished:
return try await body(.init())
}
} catch {
Expand All @@ -176,15 +187,36 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable
}

final class ReaderState: Sendable {
struct Wrapped {
struct Wrapped: ~Copyable {
var trailers: HTTPFields? = nil
var finishedReading: Bool = false

/// The iterator that provides HTTP request parts from the underlying channel.
/// Stored here between read cycles for HTTP/1.1 keep-alive recovery.
var iterator: NIOAsyncChannelInboundStream<HTTPRequestPart>.AsyncIterator?
}

let wrapped: Mutex<Wrapped>

init() {
self.wrapped = .init(.init())
init(iterator: consuming sending NIOAsyncChannelInboundStream<HTTPRequestPart>.AsyncIterator) {
self.wrapped = .init(.init(iterator: iterator))
}

func takeIterator() -> sending NIOAsyncChannelInboundStream<HTTPRequestPart>.AsyncIterator? {
self.wrapped.withLock { state in
let iterator = state.iterator
state.iterator = nil
return iterator
}
}

func putIterator(
_ iterator: consuming sending NIOAsyncChannelInboundStream<HTTPRequestPart>.AsyncIterator
) {
var disconnected = Disconnected(value: Optional(iterator))
self.wrapped.withLock { state in
state.iterator = disconnected.swap(newValue: nil)
}
}
}

Expand All @@ -197,18 +229,12 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable
/// The type of errors that can occur during reading operations.
public typealias Failure = any Error

private var iterator: Disconnected<NIOAsyncChannelInboundStream<HTTPRequestPart>.AsyncIterator?>

internal var state: ReaderState

/// Initializes a new HTTP request body and trailers reader with the given NIO async channel iterator.
/// Initializes a new HTTP request body and trailers reader.
///
/// - Parameter iterator: The NIO async channel inbound stream iterator to use for reading request parts.
init(
iterator: consuming sending NIOAsyncChannelInboundStream<HTTPRequestPart>.AsyncIterator,
readerState: ReaderState
) {
self.iterator = Disconnected(value: iterator)
/// - Parameter readerState: The shared reader state that holds the iterator and captures trailers.
init(readerState: ReaderState) {
self.state = readerState
}

Expand Down Expand Up @@ -240,14 +266,10 @@ public struct HTTPRequestConcludingAsyncReader: ConcludingAsyncReader, ~Copyable
public consuming func consumeAndConclude<Return, Failure: Error>(
body: nonisolated(nonsending) (consuming sending RequestBodyAsyncReader) async throws(Failure) -> Return
) async throws(Failure) -> (Return, HTTPFields?) {
if let iterator = self.iterator.take() {
let partsReader = RequestBodyAsyncReader(iterator: iterator, readerState: self.state)
let result = try await body(partsReader)
let trailers = self.state.wrapped.withLock { $0.trailers }
return (result, trailers)
} else {
fatalError("consumeAndConclude called more than once")
}
let partsReader = RequestBodyAsyncReader(readerState: self.state)
let result = try await body(partsReader)
let trailers = self.state.wrapped.withLock { $0.trailers }
return (result, trailers)
}
}

Expand Down
40 changes: 39 additions & 1 deletion Sources/NIOHTTPServer/NIOHTTPServer+HTTP1_1.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//===----------------------------------------------------------------------===//

import HTTPAPIs
import Logging
import NIOCore
import NIOExtras
import NIOHTTP1
Expand Down Expand Up @@ -45,7 +46,7 @@ extension NIOHTTPServer {
do {
for try await http1Channel in inbound {
group.addTask {
await self.handleRequestChannel(channel: http1Channel, handler: handler)
await self.handleHTTP1RequestChannel(channel: http1Channel, handler: handler)
}
}

Expand Down Expand Up @@ -105,4 +106,41 @@ extension NIOHTTPServer {
)
}
}

/// Handles an HTTP/1.1 connection channel, which may carry multiple serial requests on the
/// same connection (keep-alive).
func handleHTTP1RequestChannel(
channel: NIOAsyncChannel<HTTPRequestPart, HTTPResponsePart>,
handler: some HTTPServerRequestHandler<RequestConcludingReader, ResponseConcludingWriter>
) async {
do {
try await channel.executeThenClose { inbound, outbound in
var iterator = inbound.makeAsyncIterator()

requestLoop: while !Task.isCancelled {
guard let httpRequest = try await self.nextRequestHead(from: &iterator) else {
break requestLoop
}

guard
let recoveredIterator = try await self.invokeHandler(
request: httpRequest,
iterator: iterator,
outbound: outbound,
handler: handler
)
else {
// Handler did not fully consume the request; cannot continue on this
// connection.
break requestLoop
}

iterator = recoveredIterator
}
}
} catch {
self.logger.debug("Error thrown while handling HTTP/1.1 connection", metadata: ["error": "\(error)"])
try? await channel.channel.close()
}
}
}
43 changes: 41 additions & 2 deletions Sources/NIOHTTPServer/NIOHTTPServer+SecureUpgrade.swift
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ extension NIOHTTPServer {
let chainFuture = requestChannel.channel.nioSSL_peerValidatedCertificateChain()

await Self.$connectionContext.withValue(ConnectionContext(chainFuture)) {
await self.handleRequestChannel(
await self.handleHTTP1RequestChannel(
channel: requestChannel,
handler: handler
)
Expand All @@ -137,7 +137,7 @@ extension NIOHTTPServer {
try await Self.$connectionContext.withValue(ConnectionContext(chainFuture)) {
for try await streamChannel in multiplexer.inbound {
streamGroup.addTask {
await self.handleRequestChannel(
await self.handleHTTP2StreamChannel(
channel: streamChannel,
handler: handler
)
Expand Down Expand Up @@ -297,6 +297,45 @@ extension NIOHTTPServer {
}
}
}

/// Handles an HTTP/2 stream channel, which carries exactly one request per stream.
func handleHTTP2StreamChannel(
channel: NIOAsyncChannel<HTTPRequestPart, HTTPResponsePart>,
handler: some HTTPServerRequestHandler<RequestConcludingReader, ResponseConcludingWriter>
) async {
do {
try await channel
.executeThenClose { inbound, outbound in
var iterator = inbound.makeAsyncIterator()

guard let httpRequest = try await self.nextRequestHead(from: &iterator) else {
outbound.finish()
return
}

_ = try await self.invokeHandler(
request: httpRequest,
iterator: iterator,
outbound: outbound,
handler: handler
)

// TODO: handle other state scenarios.
// For example, if we didn't finish reading but we wrote back a response, we
// should send a RST_STREAM with NO_ERROR set. If we finished reading but we
// didn't write back a response, then RST_STREAM is also likely appropriate but
// unclear about the error.

// Finish the outbound and wait on the close future to make sure all pending
// writes are actually written.
outbound.finish()
try await channel.channel.closeFuture.get()
}
} catch {
self.logger.debug("Error thrown while handling HTTP/2 stream: \(error)")
try? await channel.channel.close()
}
}
}

@available(macOS 26.2, iOS 26.2, watchOS 26.2, tvOS 26.2, visionOS 26.2, *)
Expand Down
Loading
Loading