diff --git a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala index ddb57ee22a..6f1a0d6288 100644 --- a/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala +++ b/akka-http-core/src/main/scala/akka/http/engine/server/HttpServer.scala @@ -67,6 +67,14 @@ private[http] object HttpServer { HttpRequest(effectiveMethod, effectiveUri, headers, createEntity(entityParts), protocol) } + // we need to make sure that only one element per incoming request is queueing up in front of + // the bypassMerge.bypassInput. Otherwise the rising backpressure against the bypassFanout + // would eventually prevent us from reading the remaining request chunks from the transportIn + val bypass = Flow[RequestOutput].filter { + case (_: RequestStart | _: MessageStartError) ⇒ true + case _ ⇒ false + } + val rendererPipeline = Flow[ResponseRenderingContext] .section(name("recover"))(_.transform(() ⇒ new ErrorsTo500ResponseRecovery(log))) // FIXME: simplify after #16394 is closed @@ -82,7 +90,7 @@ private[http] object HttpServer { Flow() { implicit b ⇒ //FIXME: the graph is unnecessary after fixing #15957 transportIn ~> requestParsing ~> bypassFanout ~> requestPreparation ~> serverFlow ~> bypassMerge.applicationInput ~> rendererPipeline ~> transportOut - bypassFanout ~> bypassMerge.bypassInput + bypassFanout ~> bypass ~> bypassMerge.bypassInput oneHundredContinueSource ~> bypassMerge.oneHundredContinueInput b.allowCycles() @@ -107,7 +115,7 @@ private[http] object HttpServer { override val initialState = State[Any](Read(bypassInput)) { case (ctx, _, requestStart: RequestStart) ⇒ waitingForApplicationResponse(requestStart) case (ctx, _, MessageStartError(status, info)) ⇒ finishWithError(ctx, "request", status, info) - case _ ⇒ SameState // drop other parser output + case _ ⇒ throw new IllegalStateException } def waitingForApplicationResponse(requestStart: RequestStart): State[Any] =