=htc #19240 HttpServerBluePrint: replace splitWhen/prefixAndTail with simple GraphStage

This commit is contained in:
Johannes Rudolph 2016-01-05 12:13:12 +01:00
parent b77a511bba
commit ced5aa7ddc
2 changed files with 46 additions and 36 deletions

View file

@ -73,46 +73,56 @@ private[http] object HttpServerBluePrint {
BidiFlow.fromGraph(new ControllerStage(settings, log)).reversed
def requestPreparation(settings: ServerSettings): BidiFlow[HttpResponse, HttpResponse, RequestOutput, HttpRequest, Unit] =
BidiFlow.fromFlows(Flow[HttpResponse],
Flow[RequestOutput]
.splitWhen(x x.isInstanceOf[MessageStart] || x == MessageEnd)
.prefixAndTail(1)
.map(p p._1.head -> p._2)
.concatSubstreams
.via(requestStartOrRunIgnore(settings)))
BidiFlow.fromFlows(Flow[HttpResponse], new PrepareRequests(settings))
def requestStartOrRunIgnore(settings: ServerSettings): Flow[(ParserOutput.RequestOutput, Source[ParserOutput.RequestOutput, Unit]), HttpRequest, Unit] =
Flow.fromGraph(new GraphStage[FlowShape[(RequestOutput, Source[RequestOutput, Unit]), HttpRequest]] {
val in = Inlet[(RequestOutput, Source[RequestOutput, Unit])]("RequestStartThenRunIgnore.in")
val out = Outlet[HttpRequest]("RequestStartThenRunIgnore.out")
override val shape: FlowShape[(RequestOutput, Source[RequestOutput, Unit]), HttpRequest] = FlowShape.of(in, out)
final class PrepareRequests(settings: ServerSettings) extends GraphStage[FlowShape[RequestOutput, HttpRequest]] {
val in = Inlet[RequestOutput]("RequestStartThenRunIgnore.in")
val out = Outlet[HttpRequest]("RequestStartThenRunIgnore.out")
override val shape: FlowShape[RequestOutput, HttpRequest] = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
val remoteAddress = inheritedAttributes.get[HttpAttributes.RemoteAddress].flatMap(_.address)
override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) {
val remoteAddress = inheritedAttributes.get[HttpAttributes.RemoteAddress].flatMap(_.address)
setHandler(in, new InHandler {
override def onPush(): Unit = grab(in) match {
case (RequestStart(method, uri, protocol, hdrs, createEntity, _, _), entityParts)
val effectiveMethod = if (method == HttpMethods.HEAD && settings.transparentHeadRequests) HttpMethods.GET else method
val effectiveHeaders =
if (settings.remoteAddressHeader && remoteAddress.isDefined)
headers.`Remote-Address`(RemoteAddress(remoteAddress.get)) +: hdrs
else hdrs
val idle = new InHandler {
def onPush(): Unit = grab(in) match {
case RequestStart(method, uri, protocol, hdrs, entityCreator, _, _)
val effectiveMethod = if (method == HttpMethods.HEAD && settings.transparentHeadRequests) HttpMethods.GET else method
val effectiveHeaders =
if (settings.remoteAddressHeader && remoteAddress.isDefined)
headers.`Remote-Address`(RemoteAddress(remoteAddress.get)) +: hdrs
else hdrs
val entity = createEntity(entityParts) withSizeLimit settings.parserSettings.maxContentLength
push(out, HttpRequest(effectiveMethod, uri, effectiveHeaders, entity, protocol))
case (wat, src)
SubSource.kill(src)
pull(in)
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in)
})
val entity = createEntity(entityCreator) withSizeLimit settings.parserSettings.maxContentLength
push(out, HttpRequest(effectiveMethod, uri, effectiveHeaders, entity, protocol))
}
}
})
setHandler(in, idle)
def createEntity(creator: EntityCreator[RequestOutput, RequestEntity]): RequestEntity =
creator match {
case StrictEntityCreator(entity) entity
case StreamedEntityCreator(creator)
val entitySource = new SubSourceOutlet[RequestOutput]("EntitySource")
entitySource.setHandler(new OutHandler {
def onPull(): Unit = pull(in)
})
setHandler(in, new InHandler {
def onPush(): Unit = grab(in) match {
case MessageEnd
entitySource.complete()
setHandler(in, idle)
case x entitySource.push(x)
}
override def onUpstreamFinish(): Unit = completeStage()
})
creator(Source.fromGraph(entitySource.source))
}
setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in)
})
}
}
def parsing(settings: ServerSettings, log: LoggingAdapter): Flow[ByteString, RequestOutput, Unit] = {
import settings._

View file

@ -393,7 +393,7 @@ class ClientServerSpec extends WordSpec with Matchers with BeforeAndAfterAll wit
private val HttpRequest(POST, uri, List(Accept(Seq(MediaRanges.`*/*`)), Host(_, _), `User-Agent`(_)),
Chunked(`chunkedContentType`, chunkStream), HttpProtocols.`HTTP/1.1`) = serverIn.expectNext()
uri shouldEqual Uri(s"http://$hostname:$port/chunked")
Await.result(chunkStream.grouped(4).runWith(Sink.head), 100.millis) shouldEqual chunks
Await.result(chunkStream.grouped(5).runWith(Sink.head), 100.millis) shouldEqual chunks
val serverOutSub = serverOut.expectSubscription()
serverOutSub.expectRequest()