From 16ac832b14a4cb12d694080c1de5f7550685b6b0 Mon Sep 17 00:00:00 2001 From: zhxiaog Date: Thu, 14 Apr 2016 00:37:15 +0800 Subject: [PATCH] migrating Doubler, KeepGoing to GraphStage --- .../http/scaladsl/HttpServerExampleSpec.scala | 2 +- .../stream/impl/fusing/InterpreterSpec.scala | 98 ++++++++++++------- 2 files changed, 64 insertions(+), 36 deletions(-) diff --git a/akka-docs/rst/scala/code/docs/http/scaladsl/HttpServerExampleSpec.scala b/akka-docs/rst/scala/code/docs/http/scaladsl/HttpServerExampleSpec.scala index faaaa4f51d..9081c7d4c2 100644 --- a/akka-docs/rst/scala/code/docs/http/scaladsl/HttpServerExampleSpec.scala +++ b/akka-docs/rst/scala/code/docs/http/scaladsl/HttpServerExampleSpec.scala @@ -132,7 +132,7 @@ class HttpServerExampleSpec extends WordSpec with Matchers "connection-stream-failure-handling" in compileOnlySpec { import akka.actor.ActorSystem import akka.http.scaladsl.Http - import akka.http.scaladsl.model.{ContentTypes, HttpEntity} + import akka.http.scaladsl.model.{ ContentTypes, HttpEntity } import akka.stream.ActorMaterializer implicit val system = ActorSystem() diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala index 637fd3516b..e88bd7c674 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSpec.scala @@ -8,7 +8,7 @@ import akka.stream.stage._ import akka.testkit.AkkaSpec import akka.testkit.EventFilter -import akka.stream.Supervision +import akka.stream._ class InterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { import Supervision.stoppingDecider @@ -80,7 +80,7 @@ class InterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { } "implement one-to-many many-to-one chain correctly" in new OneBoundedSetup[Int]( - Doubler().toGS, + Doubler(), Filter((x: Int) ⇒ x != 0)) { lastEvents() should be(Set.empty) @@ -106,7 +106,7 @@ class InterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { "implement many-to-one one-to-many chain correctly" in new OneBoundedSetup[Int]( Filter((x: Int) ⇒ x != 0), - Doubler().toGS) { + Doubler()) { lastEvents() should be(Set.empty) @@ -412,7 +412,7 @@ class InterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { } "implement doubler-conflate (doubler-batch)" in new OneBoundedSetup[Int]( - Doubler().toGS, + Doubler(), Batch( 1L, ConstantFun.zeroLong, @@ -432,12 +432,12 @@ class InterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { } // Note, the new interpreter has no jumpback table, still did not want to remove the test - "work with jumpback table and completed elements" in new OneBoundedSetup[Int](Seq( - Map((x: Int) ⇒ x, stoppingDecider), - Map((x: Int) ⇒ x, stoppingDecider), + "work with jumpback table and completed elements" in new OneBoundedSetup[Int]( + Map((x: Int) ⇒ x, stoppingDecider).toGS, + Map((x: Int) ⇒ x, stoppingDecider).toGS, KeepGoing(), - Map((x: Int) ⇒ x, stoppingDecider), - Map((x: Int) ⇒ x, stoppingDecider))) { + Map((x: Int) ⇒ x, stoppingDecider).toGS, + Map((x: Int) ⇒ x, stoppingDecider).toGS) { lastEvents() should be(Set.empty) @@ -569,39 +569,67 @@ class InterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { } - private[akka] case class Doubler[T]() extends PushPullStage[T, T] { - var oneMore: Boolean = false - var lastElem: T = _ + private[akka] final case class Doubler[T]() extends GraphStage[FlowShape[T, T]] { + val out: Outlet[T] = Outlet("Doubler.out") + val in: Inlet[T] = Inlet("Doubler.in") - override def onPush(elem: T, ctx: Context[T]): SyncDirective = { - lastElem = elem - oneMore = true - ctx.push(elem) - } + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + var latest: T = _ + var oneMore = false - override def onPull(ctx: Context[T]): SyncDirective = { - if (oneMore) { - oneMore = false - ctx.push(lastElem) - } else ctx.pull() - } + override def onPush(): Unit = { + latest = grab(in) + oneMore = true + push(out, latest) + } + + /** + * Called when the output port has received a pull, and therefore ready to emit an element, i.e. [[GraphStageLogic.push()]] + * is now allowed to be called on this port. + */ + override def onPull(): Unit = { + if (oneMore) { + push(out, latest) + oneMore = false + } else { + pull(in) + } + } + + setHandlers(in, out, this) + } + + override val shape: FlowShape[T, T] = FlowShape(in, out) } - private[akka] case class KeepGoing[T]() extends PushPullStage[T, T] { - var lastElem: T = _ + private[akka] final case class KeepGoing[T]() extends GraphStage[FlowShape[T, T]] { + val in = Inlet[T]("KeepGoing.in") + val out = Outlet[T]("KeepGoing.out") - override def onPush(elem: T, ctx: Context[T]): SyncDirective = { - lastElem = elem - ctx.push(elem) - } + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + var lastElem: T = _ - override def onPull(ctx: Context[T]): SyncDirective = { - if (ctx.isFinishing) { - ctx.push(lastElem) - } else ctx.pull() - } + override def onPush(): Unit = { + lastElem = grab(in) + push(out, lastElem) + } - override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = ctx.absorbTermination() + override def onPull(): Unit = { + if (isClosed(in)) { + push(out, lastElem) + } else { + pull(in) + } + } + + override def onUpstreamFinish(): Unit = {} + + setHandlers(in, out, this) + } + + override val shape: FlowShape[T, T] = FlowShape(in, out) } // This test is related to issue #17351