replace PushPullStage with GraphStage in HttpServerBluePrint

This commit is contained in:
Hawstein 2016-05-19 03:40:02 +08:00
parent e2c1638171
commit cc2e0e8d32

View file

@ -518,17 +518,27 @@ private[http] object HttpServerBluePrint {
def with100ContinueTrigger[T <: ParserOutput](createEntity: EntityCreator[T, RequestEntity]) =
StreamedEntityCreator {
createEntity.compose[Source[T, NotUsed]] {
_.via(Flow[T].transform(() new PushPullStage[T, T] {
private var oneHundredContinueSent = false
def onPush(elem: T, ctx: Context[T]) = ctx.push(elem)
def onPull(ctx: Context[T]) = {
if (!oneHundredContinueSent) {
oneHundredContinueSent = true
emit100ContinueResponse.invoke(())
_.via(new GraphStage[FlowShape[T, T]] {
val in: Inlet[T] = Inlet("GraphStage.in")
val out: Outlet[T] = Outlet("GraphStage.out")
override val shape: FlowShape[T, T] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
private var oneHundredContinueSent = false
override def onPush(): Unit = push(out, grab(in))
override def onPull(): Unit = {
if (!oneHundredContinueSent) {
oneHundredContinueSent = true
emit100ContinueResponse.invoke(())
}
pull(in)
}
setHandlers(in, out, this)
}
ctx.pull()
}
}).named("expect100continueTrigger"))
}.named("expect100continueTrigger"))
}
}
}