diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala index 53bc94033e..24a1a2f598 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/engine/server/HttpServerBluePrint.scala @@ -518,17 +518,27 @@ private[http] object HttpServerBluePrint { def with100ContinueTrigger[T <: ParserOutput](createEntity: EntityCreator[T, RequestEntity]) = StreamedEntityCreator { createEntity.compose[Source[T, NotUsed]] { - _.via(Flow[T].transform(() ⇒ new PushPullStage[T, T] { - private var oneHundredContinueSent = false - def onPush(elem: T, ctx: Context[T]) = ctx.push(elem) - def onPull(ctx: Context[T]) = { - if (!oneHundredContinueSent) { - oneHundredContinueSent = true - emit100ContinueResponse.invoke(()) + _.via(new GraphStage[FlowShape[T, T]] { + val in: Inlet[T] = Inlet("GraphStage.in") + val out: Outlet[T] = Outlet("GraphStage.out") + override val shape: FlowShape[T, T] = FlowShape(in, out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + private var oneHundredContinueSent = false + + override def onPush(): Unit = push(out, grab(in)) + override def onPull(): Unit = { + if (!oneHundredContinueSent) { + oneHundredContinueSent = true + emit100ContinueResponse.invoke(()) + } + pull(in) + } + + setHandlers(in, out, this) } - ctx.pull() - } - }).named("expect100continueTrigger")) + }.named("expect100continueTrigger")) } } }