From cc2e0e8d32a5c05d2e10636bede908d0bed12bcc Mon Sep 17 00:00:00 2001 From: Hawstein Date: Thu, 19 May 2016 03:40:02 +0800 Subject: [PATCH] replace PushPullStage with GraphStage in HttpServerBluePrint --- .../engine/server/HttpServerBluePrint.scala | 30 ++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index 53bc94033e..24a1a2f598 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -518,17 +518,27 @@ private[http] object HttpServerBluePrint { def with100ContinueTrigger[T <: ParserOutput](createEntity: EntityCreator[T, RequestEntity]) = StreamedEntityCreator { createEntity.compose[Source[T, NotUsed]] { - _.via(Flow[T].transform(() ⇒ new PushPullStage[T, T] { - private var oneHundredContinueSent = false - def onPush(elem: T, ctx: Context[T]) = ctx.push(elem) - def onPull(ctx: Context[T]) = { - if (!oneHundredContinueSent) { - oneHundredContinueSent = true - emit100ContinueResponse.invoke(()) + _.via(new GraphStage[FlowShape[T, T]] { + val in: Inlet[T] = Inlet("GraphStage.in") + val out: Outlet[T] = Outlet("GraphStage.out") + override val shape: FlowShape[T, T] = FlowShape(in, out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + private var oneHundredContinueSent = false + + override def onPush(): Unit = push(out, grab(in)) + override def onPull(): Unit = { + if (!oneHundredContinueSent) { + oneHundredContinueSent = true + emit100ContinueResponse.invoke(()) + } + pull(in) + } + + setHandlers(in, out, this) } - ctx.pull() - } - }).named("expect100continueTrigger")) + }.named("expect100continueTrigger")) } } }