diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2.scala index 2cdb6a42c4..eaac91b913 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/Http2.scala @@ -90,11 +90,15 @@ private[http] final class Http2Ext(implicit val system: ActorSystem) else if (connectionContext.isSecure) settings.defaultHttpsPort else settings.defaultHttpPort + val handlerWithErrorHandling = withErrorHandling(log, handler) + val http1: HttpImplementation = - Flow[HttpRequest].mapAsync(settings.pipeliningLimit)(handleUpgradeRequests(handler, settings, log)) + Flow[HttpRequest].mapAsync(settings.pipeliningLimit)(handleUpgradeRequests(handler, handlerWithErrorHandling, + settings, log)) .joinMat(GracefulTerminatorStage(system, settings).atop(http.serverLayer(settings, log = log)))(Keep.right) val http2: HttpImplementation = - Http2Blueprint.handleWithStreamIdHeader(settings.http2Settings.maxConcurrentStreams)(handler)(system.dispatcher) + Http2Blueprint.handleWithStreamIdHeader(settings.http2Settings.maxConcurrentStreams)(handlerWithErrorHandling)( + system.dispatcher) .joinMat(Http2Blueprint.serverStackTls(settings, log, telemetry, Http().dateHeaderRendering))(Keep.right) val masterTerminator = new MasterServerTerminator(log) @@ -138,6 +142,23 @@ private[http] final class Http2Ext(implicit val system: ActorSystem) }.to(Sink.ignore).run() } + private def withErrorHandling( + log: LoggingAdapter, + handler: HttpRequest => Future[HttpResponse]): HttpRequest => Future[HttpResponse] = { request => + try { + handler(request).recover { + case NonFatal(ex) => handleHandlerError(log, ex) + }(ExecutionContext.parasitic) + } catch { + case NonFatal(ex) => Future.successful(handleHandlerError(log, ex)) + } + } + + private def handleHandlerError(log: LoggingAdapter, ex: Throwable): HttpResponse = { + log.error(ex, "Internal server error, sending 500 response") + HttpResponse(StatusCodes.InternalServerError) + } + private def prepareServerAttributes(settings: ServerSettings, incoming: Tcp.IncomingConnection) = { val attrs = Http.prepareAttributes(settings, incoming) if (telemetry == NoOpTelemetry) attrs @@ -149,6 +170,7 @@ private[http] final class Http2Ext(implicit val system: ActorSystem) private def handleUpgradeRequests( handler: HttpRequest => Future[HttpResponse], + handlerWithErrorHandling: HttpRequest => Future[HttpResponse], settings: ServerSettings, log: LoggingAdapter): HttpRequest => Future[HttpResponse] = { req => req.header[Upgrade] match { @@ -172,8 +194,8 @@ private[http] final class Http2Ext(implicit val system: ActorSystem) Flow[HttpRequest] .watchTermination(Keep.right) .prepend(injectedRequest) - .via(Http2Blueprint.handleWithStreamIdHeader(settings.http2Settings.maxConcurrentStreams)(handler)( - system.dispatcher)) + .via(Http2Blueprint.handleWithStreamIdHeader(settings.http2Settings.maxConcurrentStreams)( + handlerWithErrorHandling)(system.dispatcher)) // the settings from the header are injected into the blueprint as initial demuxer settings .joinMat(Http2Blueprint.serverStack(settings, log, settingsFromHeader, true, telemetry, Http().dateHeaderRendering))(Keep.left)) diff --git a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/RequestParsing.scala b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/RequestParsing.scala index 8b3f148b7c..99e0c20be9 100644 --- a/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/RequestParsing.scala +++ b/http-core/src/main/scala/org/apache/pekko/http/impl/engine/http2/RequestParsing.scala @@ -160,6 +160,9 @@ private[http2] object RequestParsing { case ":status" => protocolError("Pseudo-header ':status' is for responses only; it cannot appear in a request") + case name if name.startsWith(":") => + protocolError(s"Unexpected pseudo-header '$name' in request") + case "content-length" => if (contentLength == -1) { val contentLengthValue = ContentLength.get(value).toLong diff --git a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientServerSpec.scala b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientServerSpec.scala index 96ddec9f66..6f718ec56a 100644 --- a/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientServerSpec.scala +++ b/http2-tests/src/test/scala/org/apache/pekko/http/impl/engine/http2/Http2ClientServerSpec.scala @@ -30,7 +30,6 @@ import pekko.http.scaladsl.model.{ HttpHeader, HttpMethod, HttpMethods, - HttpProtocols, HttpRequest, HttpResponse, RequestResponseAssociation, @@ -45,6 +44,8 @@ import pekko.http.scaladsl.unmarshalling.Unmarshal import pekko.stream.StreamTcpException import pekko.stream.scaladsl.{ Sink, Source } import pekko.stream.testkit.{ TestPublisher, TestSubscriber } +import pekko.stream.testkit.Utils.TE +import pekko.testkit.EventFilter import pekko.testkit.TestProbe import pekko.util.ByteString @@ -144,6 +145,31 @@ class Http2ClientServerSpec extends PekkoSpecWithMaterializer( val response = expectClientResponse() response.status should be(StatusCodes.BadRequest) } + + "return internal server error when handler future fails" in new TestSetup { + sendClientRequest() + val serverRequest = expectServerRequest() + + EventFilter[TE](message = "boom", occurrences = 1).intercept { + serverRequest.promise.failure(TE("boom")) + val response = expectClientResponse() + response.status should be(StatusCodes.InternalServerError) + } + + sendClientRequest() + val nextServerRequest = expectServerRequest() + nextServerRequest.sendResponse(HttpResponse()) + expectClientResponse().status should be(StatusCodes.OK) + } + + "return internal server error when handler throws synchronously" in new TestSetup { + override def handler: HttpRequest => Future[HttpResponse] = _ => throw TE("boom-sync") + + EventFilter[TE](message = "boom-sync", occurrences = 1).intercept { + sendClientRequest() + expectClientResponse().status should be(StatusCodes.InternalServerError) + } + } } case class ServerRequest(request: HttpRequest, promise: Promise[HttpResponse]) { @@ -169,7 +195,7 @@ class Http2ClientServerSpec extends PekkoSpecWithMaterializer( def serverSettings: ServerSettings = ServerSettings(system) def clientSettings: ClientConnectionSettings = ClientConnectionSettings(system) private lazy val serverRequestProbe = TestProbe() - private lazy val handler: HttpRequest => Future[HttpResponse] = { req => + def handler: HttpRequest => Future[HttpResponse] = { req => val p = Promise[HttpResponse]() serverRequestProbe.ref ! ServerRequest(req, p) p.future