diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index c06cf1aa38..00f48d190f 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -83,20 +83,12 @@ private[http] object HttpServerBluePrint { def controller(settings: ServerSettings, log: LoggingAdapter): BidiFlow[HttpResponse, ResponseRenderingContext, RequestOutput, RequestOutput, Unit] = BidiFlow.fromGraph(new ControllerStage(settings, log)).reversed - def requestPreparation(settings: ServerSettings)(implicit mat: Materializer): BidiFlow[HttpResponse, HttpResponse, RequestOutput, HttpRequest, Unit] = { - val prepareRequest = + def requestPreparation(settings: ServerSettings)(implicit mat: Materializer): BidiFlow[HttpResponse, HttpResponse, RequestOutput, HttpRequest, Unit] = + BidiFlow.fromFlows(Flow[HttpResponse], Flow[RequestOutput] .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x == MessageEnd) .via(headAndTailFlow) - .via(requestStartOrRunIgnore(settings)) - // FIXME #16583 / #18170 - // `buffer` is needed because of current behavior of collect which will queue completion - // behind an ignored (= not collected) element if there is no demand. - // `buffer` will ensure demand and therefore make sure that completion is reported eagerly. - .buffer(1, OverflowStrategy.backpressure) - - BidiFlow.fromFlows(Flow[HttpResponse], prepareRequest) - } + .via(requestStartOrRunIgnore(settings))) def requestStartOrRunIgnore(settings: ServerSettings)(implicit mat: Materializer): Flow[(ParserOutput.RequestOutput, Source[ParserOutput.RequestOutput, Unit]), HttpRequest, Unit] = Flow.fromGraph(new FlowStage[(RequestOutput, Source[RequestOutput, Unit]), HttpRequest, Unit]("RequestStartThenRunIgnore") {