diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala index c531d44cdc..e2780d6ef2 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala @@ -58,6 +58,16 @@ class FlowGraphCompileSpec extends AkkaSpec { }.run() } + "build simple balance" in { + FlowGraph { b ⇒ + val balance = Balance[String] + b. + addEdge(in1, f1, balance). + addEdge(balance, f2, out1). + addEdge(balance, f3, out2) + } + } + "build simple merge - broadcast" in { FlowGraph { b ⇒ val merge = Merge[String] diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphBalanceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphBalanceSpec.scala new file mode 100644 index 0000000000..459c4a7529 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphBalanceSpec.scala @@ -0,0 +1,154 @@ +package akka.stream.scaladsl2 + +import akka.stream.MaterializerSettings +import akka.stream.scaladsl2.FlowGraphImplicits._ +import akka.stream.testkit.{ AkkaSpec, StreamTestKit } + +import scala.concurrent.Await +import scala.concurrent.duration._ + +class GraphBalanceSpec extends AkkaSpec { + + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + .withFanOutBuffer(initialSize = 1, maxSize = 16) + + implicit val materializer = FlowMaterializer(settings) + + "A balance" must { + + "balance between subscribers which signal demand" in { + val c1 = StreamTestKit.SubscriberProbe[Int]() + val c2 = StreamTestKit.SubscriberProbe[Int]() + + FlowGraph { implicit b ⇒ + val balance = Balance[Int]("balance") + Source(List(1, 2, 3)) ~> balance + balance ~> SubscriberDrain(c1) + balance ~> SubscriberDrain(c2) + }.run() + + val sub1 = c1.expectSubscription() + val sub2 = c2.expectSubscription() + + sub1.request(1) + c1.expectNext(1) + c1.expectNoMsg(100.millis) + + sub2.request(2) + c2.expectNext(2) + c2.expectNext(3) + c1.expectComplete() + c2.expectComplete() + } + + "work with 5-way balance" in { + val f1 = FutureDrain[Seq[Int]] + val f2 = FutureDrain[Seq[Int]] + val f3 = FutureDrain[Seq[Int]] + val f4 = FutureDrain[Seq[Int]] + val f5 = FutureDrain[Seq[Int]] + + val g = FlowGraph { implicit b ⇒ + val balance = Balance[Int]("balance") + Source(0 to 14) ~> balance + balance ~> Flow[Int].grouped(15) ~> f1 + balance ~> Flow[Int].grouped(15) ~> f2 + balance ~> Flow[Int].grouped(15) ~> f3 + balance ~> Flow[Int].grouped(15) ~> f4 + balance ~> Flow[Int].grouped(15) ~> f5 + }.run() + + Set(f1, f2, f3, f4, f5) flatMap (sink ⇒ Await.result(sink.future(g), 3.seconds)) should be((0 to 14).toSet) + } + + "fairly balance between three outputs" in { + val numElementsForSink = 10000 + val f1, f2, f3 = FoldDrain[Int, Int](0)(_ + _) + val g = FlowGraph { implicit b ⇒ + val balance = Balance[Int]("balance") + Source(Stream.fill(10000 * 3)(1)) ~> balance ~> f1 + balance ~> f2 + balance ~> f3 + }.run() + + Seq(f1, f2, f3) map { sink ⇒ + Await.result(sink.future(g), 3.seconds) should be(numElementsForSink +- 1000) + } + } + + "produce to second even though first cancels" in { + val c1 = StreamTestKit.SubscriberProbe[Int]() + val c2 = StreamTestKit.SubscriberProbe[Int]() + + FlowGraph { implicit b ⇒ + val balance = Balance[Int]("balance") + Source(List(1, 2, 3)) ~> balance + balance ~> Flow[Int] ~> SubscriberDrain(c1) + balance ~> Flow[Int] ~> SubscriberDrain(c2) + }.run() + + val sub1 = c1.expectSubscription() + sub1.cancel() + val sub2 = c2.expectSubscription() + sub2.request(3) + c2.expectNext(1) + c2.expectNext(2) + c2.expectNext(3) + c2.expectComplete() + } + + "produce to first even though second cancels" in { + val c1 = StreamTestKit.SubscriberProbe[Int]() + val c2 = StreamTestKit.SubscriberProbe[Int]() + + FlowGraph { implicit b ⇒ + val balance = Balance[Int]("balance") + Source(List(1, 2, 3)) ~> balance + balance ~> Flow[Int] ~> SubscriberDrain(c1) + balance ~> Flow[Int] ~> SubscriberDrain(c2) + }.run() + + val sub1 = c1.expectSubscription() + val sub2 = c2.expectSubscription() + sub2.cancel() + sub1.request(3) + c1.expectNext(1) + c1.expectNext(2) + c1.expectNext(3) + c1.expectComplete() + } + + "cancel upstream when downstreams cancel" in { + val p1 = StreamTestKit.PublisherProbe[Int]() + val c1 = StreamTestKit.SubscriberProbe[Int]() + val c2 = StreamTestKit.SubscriberProbe[Int]() + + FlowGraph { implicit b ⇒ + val balance = Balance[Int]("balance") + Source(p1.getPublisher) ~> balance + balance ~> Flow[Int] ~> SubscriberDrain(c1) + balance ~> Flow[Int] ~> SubscriberDrain(c2) + }.run() + + val bsub = p1.expectSubscription() + val sub1 = c1.expectSubscription() + val sub2 = c2.expectSubscription() + + sub1.request(1) + p1.expectRequest(bsub, 16) + bsub.sendNext(1) + c1.expectNext(1) + + sub2.request(1) + bsub.sendNext(2) + c2.expectNext(2) + + sub1.cancel() + sub2.cancel() + bsub.expectCancellation() + } + + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphOpsIntegrationSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphOpsIntegrationSpec.scala index 8eabed8723..26351f1f4f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphOpsIntegrationSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/GraphOpsIntegrationSpec.scala @@ -34,6 +34,26 @@ class GraphOpsIntegrationSpec extends AkkaSpec { Await.result(g.getDrainFor(resultFuture), 3.seconds).sorted should be(List(1, 2, 3, 4, 5, 6)) } + "support balance - merge (parallelization) layouts" in { + val elements = 0 to 10 + val in = Source(elements) + val f = Flow[Int] + val out = FutureDrain[Seq[Int]] + + val g = FlowGraph { implicit b ⇒ + val balance = Balance[Int] + val merge = Merge[Int] + + in ~> balance ~> f ~> merge + balance ~> f ~> merge + balance ~> f ~> merge + balance ~> f ~> merge + balance ~> f ~> merge ~> Flow[Int].grouped(elements.size * 2) ~> out + }.run() + + Await.result(out.future(g), 3.seconds).sorted should be(elements) + } + "support wikipedia Topological_sorting 2" in { // see https://en.wikipedia.org/wiki/Topological_sorting#mediaviewer/File:Directed_acyclic_graph.png val resultFuture2 = FutureDrain[Seq[Int]] diff --git a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala index 4744cf82c8..a79895fe1b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala @@ -89,6 +89,10 @@ private[akka] object Ast { override def name = "broadcast" } + case object Balance extends FanOutAstNode { + override def name = "balance" + } + case object Zip extends FanInAstNode { override def name = "zip" } @@ -234,6 +238,8 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting val impl = op match { case Ast.Broadcast ⇒ actorOf(Broadcast.props(settings, outputCount).withDispatcher(settings.dispatcher), actorName) + case Ast.Balance ⇒ + actorOf(Balance.props(settings, outputCount).withDispatcher(settings.dispatcher), actorName) case Ast.Unzip ⇒ actorOf(Unzip.props(settings).withDispatcher(settings.dispatcher), actorName) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala index 48e04ecd0d..86b3ad9b5a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala @@ -68,6 +68,7 @@ object Merge { */ def apply[T](name: String): Merge[T] = new Merge[T](Some(name)) } + /** * Merge several streams, taking elements as they arrive from input streams * (picking randomly when several have elements ready). @@ -101,6 +102,7 @@ object Broadcast { */ def apply[T](name: String): Broadcast[T] = new Broadcast[T](Some(name)) } + /** * Fan-out the stream to several streams. Each element is produced to * the other streams. It will not shutdown until the subscriptions for at least @@ -116,6 +118,38 @@ final class Broadcast[T](override val name: Option[String]) extends FlowGraphInt override private[akka] def astNode = Ast.Broadcast } +object Balance { + /** + * Create a new anonymous `Balance` vertex with the specified input type. + * Note that a `Balance` instance can only be used at one place (one vertex) + * in the `FlowGraph`. This method creates a new instance every time it + * is called and those instances are not `equal`. + */ + def apply[T]: Balance[T] = new Balance[T](None) + /** + * Create a named `Balance` vertex with the specified input type. + * Note that a `Balance` with a specific name can only be used at one place (one vertex) + * in the `FlowGraph`. Calling this method several times with the same name + * returns instances that are `equal`. + */ + def apply[T](name: String): Balance[T] = new Balance[T](Some(name)) +} + +/** + * Fan-out the stream to several streams. Each element is produced to + * one of the other streams. It will not shutdown until the subscriptions for at least + * two downstream subscribers have been established. + */ +final class Balance[T](override val name: Option[String]) extends FlowGraphInternal.InternalVertex with Junction[T] { + override private[akka] def vertex = this + override def minimumInputCount: Int = 1 + override def maximumInputCount: Int = 1 + override def minimumOutputCount: Int = 2 + override def maximumOutputCount: Int = Int.MaxValue + + override private[akka] def astNode = Ast.Balance +} + object Zip { /** * Create a new anonymous `Zip` vertex with the specified input types.