// Source path: pekko/akka-docs/src/test/scala/docs/stream/GraphCyclesSpec.scala

/*
* Copyright (C) 2018-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream

import akka.stream.{ ClosedShape, OverflowStrategy }
import akka.stream.scaladsl._
import akka.testkit.AkkaSpec

/**
 * Documentation snippets demonstrating graphs that contain cycles.
 *
 * Every test only ''constructs'' a `RunnableGraph`; none of them calls `run()`.
 * Several of the graphs are flagged by their own in-snippet warnings as
 * deadlocking or starving, so the tests merely verify that the blueprints
 * can be built.
 *
 * The `//#name` comment pairs delimit named snippet regions; keep the code
 * between a pair of markers presentation-quality, and do not rename or remove
 * the markers.
 */
class GraphCyclesSpec extends AkkaSpec {

  "Cycle demonstration" must {
    // Unbounded stream of increasing integers, shared by all examples below.
    val source = Source.fromIterator(() => Iterator.from(0))

    // Plain Merge with an unbuffered feedback arc from the Broadcast.
    "include a deadlocked cycle" in {
      // format: OFF
      //#deadlocked
      // WARNING! The graph below deadlocks!
      RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
        import GraphDSL.Implicits._

        val merge = b.add(Merge[Int](2))
        val bcast = b.add(Broadcast[Int](2))

        source ~> merge ~> Flow[Int].map { s => println(s); s } ~> bcast ~> Sink.ignore
        merge <~ bcast
        ClosedShape
      })
      //#deadlocked
      // format: ON
    }

    // Same topology, but the feedback arc is wired to MergePreferred's preferred port.
    "include an unfair cycle" in {
      // format: OFF
      //#unfair
      // WARNING! The graph below stops consuming from "source" after a few steps
      RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
        import GraphDSL.Implicits._

        val merge = b.add(MergePreferred[Int](1))
        val bcast = b.add(Broadcast[Int](2))

        source ~> merge ~> Flow[Int].map { s => println(s); s } ~> bcast ~> Sink.ignore
        merge.preferred <~ bcast
        ClosedShape
      })
      //#unfair
      // format: ON
    }

    // Feedback arc goes through a 10-element buffer that drops its oldest element on overflow.
    "include a dropping cycle" in {
      // format: OFF
      //#dropping
      RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
        import GraphDSL.Implicits._

        val merge = b.add(Merge[Int](2))
        val bcast = b.add(Broadcast[Int](2))

        source ~> merge ~> Flow[Int].map { s => println(s); s } ~> bcast ~> Sink.ignore
        merge <~ Flow[Int].buffer(10, OverflowStrategy.dropHead) <~ bcast
        ClosedShape
      })
      //#dropping
      // format: ON
    }

    // ZipWith whose second input is fed solely by its own (broadcast) output.
    "include a dead zipping cycle" in {
      // format: OFF
      //#zipping-dead
      // WARNING! The graph below never processes any elements
      RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
        import GraphDSL.Implicits._

        val zip = b.add(ZipWith[Int, Int, Int]((left, right) => right))
        val bcast = b.add(Broadcast[Int](2))

        source ~> zip.in0
        zip.out.map { s => println(s); s } ~> bcast ~> Sink.ignore
        zip.in1 <~ bcast
        ClosedShape
      })
      //#zipping-dead
      // format: ON
    }

    // As above, but a Concat places a single initial element ahead of the feedback arc.
    "include a live zipping cycle" in {
      // format: OFF
      //#zipping-live
      RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
        import GraphDSL.Implicits._

        val zip = b.add(ZipWith((left: Int, right: Int) => left))
        val bcast = b.add(Broadcast[Int](2))
        val concat = b.add(Concat[Int]())
        val start = Source.single(0)

        source ~> zip.in0
        zip.out.map { s => println(s); s } ~> bcast ~> Sink.ignore
        zip.in1 <~ concat <~ start
        concat <~ bcast
        ClosedShape
      })
      //#zipping-live
      // format: ON
    }
  }
}