Skip to content

Commit 5b5a91c

Browse files
authored
fix: recover HTTP/2 handler errors as 500 (#1041)
Motivation: Port upstream akka-http fix 02bb9872e1d391f53e21b663336e5052085a0b8c, which is now Apache licensed. HTTP/2 user handler failures should be converted into 500 responses instead of leaking through the connection handling path. Modification: Wrap HTTP/2 handlers with NonFatal error recovery for failed futures and synchronous throws. Keep HTTP/1 fallback on the original handler so existing HTTP/1 error classification is preserved, and reject unknown HTTP/2 pseudo-headers during request parsing so protocol errors are not converted to 500 responses. Result: HTTP/2 handler failures now return InternalServerError while malformed pseudo-headers still produce HTTP/2 protocol errors. Added regression coverage for failed futures, synchronous throws, and verified h2spec pseudo-header behavior. References: Upstream: akka/akka-http@02bb987
1 parent 0b6bca0 commit 5b5a91c

3 files changed

Lines changed: 57 additions & 6 deletions

File tree

http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2.scala

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,11 +90,15 @@ private[http] final class Http2Ext(implicit val system: ActorSystem)
9090
else if (connectionContext.isSecure) settings.defaultHttpsPort
9191
else settings.defaultHttpPort
9292

93+
val handlerWithErrorHandling = withErrorHandling(log, handler)
94+
9395
val http1: HttpImplementation =
94-
Flow[HttpRequest].mapAsync(settings.pipeliningLimit)(handleUpgradeRequests(handler, settings, log))
96+
Flow[HttpRequest].mapAsync(settings.pipeliningLimit)(handleUpgradeRequests(handler, handlerWithErrorHandling,
97+
settings, log))
9598
.joinMat(GracefulTerminatorStage(system, settings).atop(http.serverLayer(settings, log = log)))(Keep.right)
9699
val http2: HttpImplementation =
97-
Http2Blueprint.handleWithStreamIdHeader(settings.http2Settings.maxConcurrentStreams)(handler)(system.dispatcher)
100+
Http2Blueprint.handleWithStreamIdHeader(settings.http2Settings.maxConcurrentStreams)(handlerWithErrorHandling)(
101+
system.dispatcher)
98102
.joinMat(Http2Blueprint.serverStackTls(settings, log, telemetry, Http().dateHeaderRendering))(Keep.right)
99103

100104
val masterTerminator = new MasterServerTerminator(log)
@@ -138,6 +142,23 @@ private[http] final class Http2Ext(implicit val system: ActorSystem)
138142
}.to(Sink.ignore).run()
139143
}
140144

145+
private def withErrorHandling(
146+
log: LoggingAdapter,
147+
handler: HttpRequest => Future[HttpResponse]): HttpRequest => Future[HttpResponse] = { request =>
148+
try {
149+
handler(request).recover {
150+
case NonFatal(ex) => handleHandlerError(log, ex)
151+
}(ExecutionContext.parasitic)
152+
} catch {
153+
case NonFatal(ex) => Future.successful(handleHandlerError(log, ex))
154+
}
155+
}
156+
157+
private def handleHandlerError(log: LoggingAdapter, ex: Throwable): HttpResponse = {
158+
log.error(ex, "Internal server error, sending 500 response")
159+
HttpResponse(StatusCodes.InternalServerError)
160+
}
161+
141162
private def prepareServerAttributes(settings: ServerSettings, incoming: Tcp.IncomingConnection) = {
142163
val attrs = Http.prepareAttributes(settings, incoming)
143164
if (telemetry == NoOpTelemetry) attrs
@@ -149,6 +170,7 @@ private[http] final class Http2Ext(implicit val system: ActorSystem)
149170

150171
private def handleUpgradeRequests(
151172
handler: HttpRequest => Future[HttpResponse],
173+
handlerWithErrorHandling: HttpRequest => Future[HttpResponse],
152174
settings: ServerSettings,
153175
log: LoggingAdapter): HttpRequest => Future[HttpResponse] = { req =>
154176
req.header[Upgrade] match {
@@ -172,8 +194,8 @@ private[http] final class Http2Ext(implicit val system: ActorSystem)
172194
Flow[HttpRequest]
173195
.watchTermination(Keep.right)
174196
.prepend(injectedRequest)
175-
.via(Http2Blueprint.handleWithStreamIdHeader(settings.http2Settings.maxConcurrentStreams)(handler)(
176-
system.dispatcher))
197+
.via(Http2Blueprint.handleWithStreamIdHeader(settings.http2Settings.maxConcurrentStreams)(
198+
handlerWithErrorHandling)(system.dispatcher))
177199
// the settings from the header are injected into the blueprint as initial demuxer settings
178200
.joinMat(Http2Blueprint.serverStack(settings, log, settingsFromHeader, true, telemetry,
179201
Http().dateHeaderRendering))(Keep.left))

http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/RequestParsing.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,9 @@ private[http2] object RequestParsing {
160160
case ":status" =>
161161
protocolError("Pseudo-header ':status' is for responses only; it cannot appear in a request")
162162

163+
case name if name.startsWith(":") =>
164+
protocolError(s"Unexpected pseudo-header '$name' in request")
165+
163166
case "content-length" =>
164167
if (contentLength == -1) {
165168
val contentLengthValue = ContentLength.get(value).toLong

http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientServerSpec.scala

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import pekko.http.scaladsl.model.{
3030
HttpHeader,
3131
HttpMethod,
3232
HttpMethods,
33-
HttpProtocols,
3433
HttpRequest,
3534
HttpResponse,
3635
RequestResponseAssociation,
@@ -45,6 +44,8 @@ import pekko.http.scaladsl.unmarshalling.Unmarshal
4544
import pekko.stream.StreamTcpException
4645
import pekko.stream.scaladsl.{ Sink, Source }
4746
import pekko.stream.testkit.{ TestPublisher, TestSubscriber }
47+
import pekko.stream.testkit.Utils.TE
48+
import pekko.testkit.EventFilter
4849
import pekko.testkit.TestProbe
4950
import pekko.util.ByteString
5051

@@ -144,6 +145,31 @@ class Http2ClientServerSpec extends PekkoSpecWithMaterializer(
144145
val response = expectClientResponse()
145146
response.status should be(StatusCodes.BadRequest)
146147
}
148+
149+
"return internal server error when handler future fails" in new TestSetup {
150+
sendClientRequest()
151+
val serverRequest = expectServerRequest()
152+
153+
EventFilter[TE](message = "boom", occurrences = 1).intercept {
154+
serverRequest.promise.failure(TE("boom"))
155+
val response = expectClientResponse()
156+
response.status should be(StatusCodes.InternalServerError)
157+
}
158+
159+
sendClientRequest()
160+
val nextServerRequest = expectServerRequest()
161+
nextServerRequest.sendResponse(HttpResponse())
162+
expectClientResponse().status should be(StatusCodes.OK)
163+
}
164+
165+
"return internal server error when handler throws synchronously" in new TestSetup {
166+
override def handler: HttpRequest => Future[HttpResponse] = _ => throw TE("boom-sync")
167+
168+
EventFilter[TE](message = "boom-sync", occurrences = 1).intercept {
169+
sendClientRequest()
170+
expectClientResponse().status should be(StatusCodes.InternalServerError)
171+
}
172+
}
147173
}
148174

149175
case class ServerRequest(request: HttpRequest, promise: Promise[HttpResponse]) {
@@ -169,7 +195,7 @@ class Http2ClientServerSpec extends PekkoSpecWithMaterializer(
169195
def serverSettings: ServerSettings = ServerSettings(system)
170196
def clientSettings: ClientConnectionSettings = ClientConnectionSettings(system)
171197
private lazy val serverRequestProbe = TestProbe()
172-
private lazy val handler: HttpRequest => Future[HttpResponse] = { req =>
198+
def handler: HttpRequest => Future[HttpResponse] = { req =>
173199
val p = Promise[HttpResponse]()
174200
serverRequestProbe.ref ! ServerRequest(req, p)
175201
p.future

0 commit comments

Comments
 (0)