=htc #19361 Rewrite Duplicator/Duplicator2 to use GraphStage instead of StatefulStage

This commit is contained in:
Tal Pressman 2016-01-11 09:33:02 +02:00
parent e6b8c86315
commit a44d78e3d0

View file

@ -1,13 +1,12 @@
package docs.stream
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ RunnableGraph, Sink, Source, Flow, Keep }
import akka.stream.stage.PushPullStage
import akka.stream._
import akka.stream.scaladsl.{ Sink, Source, Flow, Keep }
import akka.stream.testkit.AkkaSpec
import org.scalatest.concurrent.{ ScalaFutures, Futures }
import org.scalatest.concurrent.ScalaFutures
import scala.collection.immutable
import scala.concurrent.{ Future, Await }
import scala.concurrent.Await
import scala.concurrent.duration._
class FlowStagesSpec extends AkkaSpec with ScalaFutures {
@ -107,18 +106,33 @@ class FlowStagesSpec extends AkkaSpec with ScalaFutures {
//#pushstage
}
"demonstrate StatefulStage" in {
"demonstrate GraphStage" in {
//#doubler-stateful
class Duplicator[A]() extends StatefulStage[A, A] {
override def initial: StageState[A, A] = new StageState[A, A] {
override def onPush(elem: A, ctx: Context[A]): SyncDirective =
emit(List(elem, elem).iterator, ctx)
class Duplicator[A] extends GraphStage[FlowShape[A, A]] {
val in = Inlet[A]("Duplicator.in")
val out = Outlet[A]("Duplicator.out")
val shape: FlowShape[A, A] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
emitMultiple(out, List(elem, elem))
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in)
})
}
}
//#doubler-stateful
val fold = Source(1 to 2).transform(() new Duplicator[Int]).runFold("")(_ + _)
val duplicator = Flow.fromGraph(new Duplicator[Int])
val fold = Source(1 to 2).via(duplicator).runFold("")(_ + _)
whenReady(fold) { s
s should be("1122")
}