diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index 04f826c7c9..00c2f10540 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -51,14 +51,15 @@ private[http] object HttpServerBluePrint { stack(settings, remoteAddress, log) def stack(settings: ServerSettings, remoteAddress: Option[InetSocketAddress], log: LoggingAdapter)(implicit mat: Materializer): Http.ServerLayer = { - One2OneBidiFlow[HttpRequest, HttpResponse](settings.pipeliningLimit).reversed atop + userHandlerGuard(settings.pipeliningLimit) atop requestPreparation(settings, remoteAddress) atop controller(settings, log) atop parsingRendering(settings, log) atop websocketSupport(settings, log) atop unwrapTls } - def unwrapTls: BidiFlow[ByteString, SslTlsOutbound, SslTlsInbound, ByteString, Unit] = + + val unwrapTls: BidiFlow[ByteString, SslTlsOutbound, SslTlsInbound, ByteString, Unit] = BidiFlow.fromFlows(Flow[ByteString].map(SendBytes), Flow[SslTlsInbound].collect { case x: SessionBytes ⇒ x.bytes }) /** Wrap an HTTP implementation with support for switching to Websocket */ @@ -324,6 +325,14 @@ private[http] object HttpServerBluePrint { } } + /** + * Ensures that the user handler + * - produces exactly one response per request + * - has not more than ``pipeliningLimit`` responses outstanding + */ + def userHandlerGuard(pipeliningLimit: Int): BidiFlow[HttpResponse, HttpResponse, HttpRequest, HttpRequest, Unit] = + One2OneBidiFlow[HttpRequest, HttpResponse](pipeliningLimit).reversed + private trait WebsocketSetup { def websocketFlow: Flow[ByteString, ByteString, Any] def installHandler(handlerFlow: Flow[FrameEvent, FrameEvent, Any])(implicit mat: Materializer): Unit