package docs.stream import akka.stream.{ OverflowStrategy, ActorFlowMaterializer } import akka.stream.scaladsl._ import akka.stream.testkit.AkkaSpec class GraphCyclesSpec extends AkkaSpec { implicit val mat = ActorFlowMaterializer() "Cycle demonstration" must { val source = Source(() => Iterator.from(0)) "include a deadlocked cycle" in { // format: OFF //#deadlocked // WARNING! The graph below deadlocks! FlowGraph.closed() { implicit b => import FlowGraph.Implicits._ val merge = b.add(Merge[Int](2)) val bcast = b.add(Broadcast[Int](2)) source ~> merge ~> Flow[Int].map { s => println(s); s } ~> bcast ~> Sink.ignore() merge <~ bcast } //#deadlocked // format: ON } "include an unfair cycle" in { // format: OFF //#unfair // WARNING! The graph below stops consuming from "source" after a few steps FlowGraph.closed() { implicit b => import FlowGraph.Implicits._ val merge = b.add(MergePreferred[Int](1)) val bcast = b.add(Broadcast[Int](2)) source ~> merge ~> Flow[Int].map { s => println(s); s } ~> bcast ~> Sink.ignore() merge.preferred <~ bcast } //#unfair // format: ON } "include a dropping cycle" in { // format: OFF //#dropping FlowGraph.closed() { implicit b => import FlowGraph.Implicits._ val merge = b.add(Merge[Int](2)) val bcast = b.add(Broadcast[Int](2)) source ~> merge ~> Flow[Int].map { s => println(s); s } ~> bcast ~> Sink.ignore() merge <~ Flow[Int].buffer(10, OverflowStrategy.dropHead) <~ bcast } //#dropping // format: ON } "include a dead zipping cycle" in { // format: OFF //#zipping-dead // WARNING! The graph below never processes any elements FlowGraph.closed() { implicit b => import FlowGraph.Implicits._ val zip = b.add(ZipWith[Int, Int, Int]((left, right) => right)) val bcast = b.add(Broadcast[Int](2)) source ~> zip.in0 zip.out.map { s => println(s); s } ~> bcast ~> Sink.ignore() zip.in1 <~ bcast } //#zipping-dead // format: ON } "include a live zipping cycle" in { // format: OFF //#zipping-live FlowGraph.closed() { implicit b => import FlowGraph.Implicits._ val zip = b.add(ZipWith((left: Int, right: Int) => left)) val bcast = b.add(Broadcast[Int](2)) val concat = b.add(Concat[Int]()) source ~> zip.in0 zip.out.map { s => println(s); s } ~> bcast ~> Sink.ignore() zip.in1 <~ concat <~ bcast concat <~ Source.single(0) } //#zipping-live // format: ON } } }