From ced5aa7ddcac1408e08ed159da9a04fecfad18ed Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Tue, 5 Jan 2016 12:13:12 +0100 Subject: [PATCH] =htc #19240 HttpServerBluePrint: replace splitWhen/prefixAndTail with simple GraphStage --- .../engine/server/HttpServerBluePrint.scala | 80 +++++++++++-------- .../akka/http/scaladsl/ClientServerSpec.scala | 2 +- 2 files changed, 46 insertions(+), 36 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index 2c54a09db4..c4c8fb1bae 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -73,46 +73,56 @@ private[http] object HttpServerBluePrint { BidiFlow.fromGraph(new ControllerStage(settings, log)).reversed def requestPreparation(settings: ServerSettings): BidiFlow[HttpResponse, HttpResponse, RequestOutput, HttpRequest, Unit] = - BidiFlow.fromFlows(Flow[HttpResponse], - Flow[RequestOutput] - .splitWhen(x ⇒ x.isInstanceOf[MessageStart] || x == MessageEnd) - .prefixAndTail(1) - .map(p ⇒ p._1.head -> p._2) - .concatSubstreams - .via(requestStartOrRunIgnore(settings))) + BidiFlow.fromFlows(Flow[HttpResponse], new PrepareRequests(settings)) - def requestStartOrRunIgnore(settings: ServerSettings): Flow[(ParserOutput.RequestOutput, Source[ParserOutput.RequestOutput, Unit]), HttpRequest, Unit] = - Flow.fromGraph(new GraphStage[FlowShape[(RequestOutput, Source[RequestOutput, Unit]), HttpRequest]] { - val in = Inlet[(RequestOutput, Source[RequestOutput, Unit])]("RequestStartThenRunIgnore.in") - val out = Outlet[HttpRequest]("RequestStartThenRunIgnore.out") - override val shape: FlowShape[(RequestOutput, Source[RequestOutput, Unit]), HttpRequest] = FlowShape.of(in, out) + final class PrepareRequests(settings: ServerSettings) extends GraphStage[FlowShape[RequestOutput, HttpRequest]] { + val in = Inlet[RequestOutput]("RequestStartThenRunIgnore.in") + val out = Outlet[HttpRequest]("RequestStartThenRunIgnore.out") + override val shape: FlowShape[RequestOutput, HttpRequest] = FlowShape.of(in, out) - override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) { - val remoteAddress = inheritedAttributes.get[HttpAttributes.RemoteAddress].flatMap(_.address) + override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) { + val remoteAddress = inheritedAttributes.get[HttpAttributes.RemoteAddress].flatMap(_.address) - setHandler(in, new InHandler { - override def onPush(): Unit = grab(in) match { - case (RequestStart(method, uri, protocol, hdrs, createEntity, _, _), entityParts) ⇒ - val effectiveMethod = if (method == HttpMethods.HEAD && settings.transparentHeadRequests) HttpMethods.GET else method - val effectiveHeaders = - if (settings.remoteAddressHeader && remoteAddress.isDefined) - headers.`Remote-Address`(RemoteAddress(remoteAddress.get)) +: hdrs - else hdrs + val idle = new InHandler { + def onPush(): Unit = grab(in) match { + case RequestStart(method, uri, protocol, hdrs, entityCreator, _, _) ⇒ + val effectiveMethod = if (method == HttpMethods.HEAD && settings.transparentHeadRequests) HttpMethods.GET else method + val effectiveHeaders = + if (settings.remoteAddressHeader && remoteAddress.isDefined) + headers.`Remote-Address`(RemoteAddress(remoteAddress.get)) +: hdrs + else hdrs - val entity = createEntity(entityParts) withSizeLimit settings.parserSettings.maxContentLength - push(out, HttpRequest(effectiveMethod, uri, effectiveHeaders, entity, protocol)) - - case (wat, src) ⇒ - SubSource.kill(src) - pull(in) - } - }) - - setHandler(out, new OutHandler { - override def onPull(): Unit = pull(in) - }) + val entity = createEntity(entityCreator) withSizeLimit settings.parserSettings.maxContentLength + push(out, HttpRequest(effectiveMethod, uri, effectiveHeaders, entity, protocol)) + } } - }) + setHandler(in, idle) + + def createEntity(creator: EntityCreator[RequestOutput, RequestEntity]): RequestEntity = + creator match { + case StrictEntityCreator(entity) ⇒ entity + case StreamedEntityCreator(creator) ⇒ + val entitySource = new SubSourceOutlet[RequestOutput]("EntitySource") + entitySource.setHandler(new OutHandler { + def onPull(): Unit = pull(in) + }) + setHandler(in, new InHandler { + def onPush(): Unit = grab(in) match { + case MessageEnd ⇒ + entitySource.complete() + setHandler(in, idle) + case x ⇒ entitySource.push(x) + } + override def onUpstreamFinish(): Unit = completeStage() + }) + creator(Source.fromGraph(entitySource.source)) + } + + setHandler(out, new OutHandler { + override def onPull(): Unit = pull(in) + }) + } + } def parsing(settings: ServerSettings, log: LoggingAdapter): Flow[ByteString, RequestOutput, Unit] = { import settings._ diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala index f79657359a..3d45b3ac21 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/ClientServerSpec.scala @@ -393,7 +393,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll wit private val HttpRequest(POST, uri, List(Accept(Seq(MediaRanges.`*/*`)), Host(_, _), `User-Agent`(_)), Chunked(`chunkedContentType`, chunkStream), HttpProtocols.`HTTP/1.1`) = serverIn.expectNext() uri shouldEqual Uri(s"http://$hostname:$port/chunked") - Await.result(chunkStream.grouped(4).runWith(Sink.head), 100.millis) shouldEqual chunks + Await.result(chunkStream.grouped(5).runWith(Sink.head), 100.millis) shouldEqual chunks val serverOutSub = serverOut.expectSubscription() serverOutSub.expectRequest()