diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala index 73db29bb29..0f9297c5c2 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphBalanceSpec.scala @@ -2,11 +2,13 @@ package akka.stream.scaladsl import scala.concurrent.Await import scala.concurrent.duration._ +import scala.concurrent.Future import akka.stream.ActorFlowMaterializer import akka.stream.ActorFlowMaterializerSettings import akka.stream.testkit._ +import akka.stream.testkit.scaladsl._ import akka.stream.testkit.Utils._ class GraphBalanceSpec extends AkkaSpec { @@ -51,7 +53,7 @@ class GraphBalanceSpec extends AkkaSpec { val balance = b.add(Balance[Int](2, waitForAllDownstreams = true)) Source(List(1, 2, 3)) ~> balance.in balance.out(0) ~> Sink(s1) - balance.out(1) ~> p2Sink.inlet + balance.out(1) ~> p2Sink }.run() val sub1 = s1.expectSubscription() @@ -81,8 +83,8 @@ class GraphBalanceSpec extends AkkaSpec { val balance = b.add(Balance[Int](3, waitForAllDownstreams = true)) Source(List(1, 2, 3)) ~> balance.in balance.out(0) ~> Sink(s1) - balance.out(1) ~> p2Sink.inlet - balance.out(2) ~> p3Sink.inlet + balance.out(1) ~> p2Sink + balance.out(2) ~> p3Sink }.run() val sub1 = s1.expectSubscription() @@ -109,37 +111,65 @@ class GraphBalanceSpec extends AkkaSpec { "work with 5-way balance" in { - val (s1, s2, s3, s4, s5) = FlowGraph.closed(Sink.head[Seq[Int]], Sink.head[Seq[Int]], Sink.head[Seq[Int]], Sink.head[Seq[Int]], Sink.head[Seq[Int]])(Tuple5.apply) { + val sink = Sink.head[Seq[Int]] + val (s1, s2, s3, s4, s5) = FlowGraph.closed(sink, sink, sink, sink, sink)(Tuple5.apply) { implicit b ⇒ (f1, f2, f3, f4, f5) ⇒ val balance = b.add(Balance[Int](5, waitForAllDownstreams = true)) Source(0 to 14) ~> balance.in - balance.out(0).grouped(15) ~> f1.inlet - balance.out(1).grouped(15) ~> f2.inlet - balance.out(2).grouped(15) ~> f3.inlet - balance.out(3).grouped(15) ~> f4.inlet - balance.out(4).grouped(15) ~> f5.inlet + balance.out(0).grouped(15) ~> f1 + balance.out(1).grouped(15) ~> f2 + balance.out(2).grouped(15) ~> f3 + balance.out(3).grouped(15) ~> f4 + balance.out(4).grouped(15) ~> f5 }.run() Set(s1, s2, s3, s4, s5) flatMap (Await.result(_, 3.seconds)) should be((0 to 14).toSet) } - "fairly balance between three outputs" in { + "balance between all three outputs" in { val numElementsForSink = 10000 val outputs = Sink.fold[Int, Int](0)(_ + _) - val (r1, r2, r3) = FlowGraph.closed(outputs, outputs, outputs)(Tuple3.apply) { implicit b ⇒ + val results = FlowGraph.closed(outputs, outputs, outputs)(List(_, _, _)) { implicit b ⇒ (o1, o2, o3) ⇒ val balance = b.add(Balance[Int](3, waitForAllDownstreams = true)) Source.repeat(1).take(numElementsForSink * 3) ~> balance.in - balance.out(0) ~> o1.inlet - balance.out(1) ~> o2.inlet - balance.out(2) ~> o3.inlet + balance.out(0) ~> o1 + balance.out(1) ~> o2 + balance.out(2) ~> o3 }.run() - Await.result(r1, 3.seconds) should be(numElementsForSink +- 2000) - Await.result(r2, 3.seconds) should be(numElementsForSink +- 2000) - Await.result(r3, 3.seconds) should be(numElementsForSink +- 2000) + import system.dispatcher + val sum = Future.sequence(results).map { res ⇒ + res should not contain 0 + res.sum + } + Await.result(sum, 3.seconds) should be(numElementsForSink * 3) + } + + "fairly balance between three outputs" in { + val probe = TestSink.probe[Int] + val (p1, p2, p3) = FlowGraph.closed(probe, probe, probe)(Tuple3.apply) { implicit b ⇒ + (o1, o2, o3) ⇒ + val balance = b.add(Balance[Int](3)) + Source(1 to 7) ~> balance.in + balance.out(0) ~> o1 + balance.out(1) ~> o2 + balance.out(2) ~> o3 + }.run() + + p1.requestNext(1) + p2.requestNext(2) + p3.requestNext(3) + p2.requestNext(4) + p1.requestNext(5) + p3.requestNext(6) + p1.requestNext(7) + + p1.expectComplete() + p2.expectComplete() + p3.expectComplete() } "produce to second even though first cancels" in assertAllStagesStopped {