Merge pull request #17241 from 2m/wip-graphbalancespec-robustify

=str #17176 do not check for arbitrary conditions that depends on thread scheduling
This commit is contained in:
Martynas Mickevičius 2015-04-24 15:30:43 +03:00
commit eb4cb47762

View file

@ -2,11 +2,13 @@ package akka.stream.scaladsl
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.Future
import akka.stream.ActorFlowMaterializer
import akka.stream.ActorFlowMaterializerSettings
import akka.stream.testkit._
import akka.stream.testkit.scaladsl._
import akka.stream.testkit.Utils._
class GraphBalanceSpec extends AkkaSpec {
@ -51,7 +53,7 @@ class GraphBalanceSpec extends AkkaSpec {
val balance = b.add(Balance[Int](2, waitForAllDownstreams = true))
Source(List(1, 2, 3)) ~> balance.in
balance.out(0) ~> Sink(s1)
balance.out(1) ~> p2Sink.inlet
balance.out(1) ~> p2Sink
}.run()
val sub1 = s1.expectSubscription()
@ -81,8 +83,8 @@ class GraphBalanceSpec extends AkkaSpec {
val balance = b.add(Balance[Int](3, waitForAllDownstreams = true))
Source(List(1, 2, 3)) ~> balance.in
balance.out(0) ~> Sink(s1)
balance.out(1) ~> p2Sink.inlet
balance.out(2) ~> p3Sink.inlet
balance.out(1) ~> p2Sink
balance.out(2) ~> p3Sink
}.run()
val sub1 = s1.expectSubscription()
@ -109,37 +111,65 @@ class GraphBalanceSpec extends AkkaSpec {
"work with 5-way balance" in {
val (s1, s2, s3, s4, s5) = FlowGraph.closed(Sink.head[Seq[Int]], Sink.head[Seq[Int]], Sink.head[Seq[Int]], Sink.head[Seq[Int]], Sink.head[Seq[Int]])(Tuple5.apply) {
val sink = Sink.head[Seq[Int]]
val (s1, s2, s3, s4, s5) = FlowGraph.closed(sink, sink, sink, sink, sink)(Tuple5.apply) {
implicit b
(f1, f2, f3, f4, f5)
val balance = b.add(Balance[Int](5, waitForAllDownstreams = true))
Source(0 to 14) ~> balance.in
balance.out(0).grouped(15) ~> f1.inlet
balance.out(1).grouped(15) ~> f2.inlet
balance.out(2).grouped(15) ~> f3.inlet
balance.out(3).grouped(15) ~> f4.inlet
balance.out(4).grouped(15) ~> f5.inlet
balance.out(0).grouped(15) ~> f1
balance.out(1).grouped(15) ~> f2
balance.out(2).grouped(15) ~> f3
balance.out(3).grouped(15) ~> f4
balance.out(4).grouped(15) ~> f5
}.run()
Set(s1, s2, s3, s4, s5) flatMap (Await.result(_, 3.seconds)) should be((0 to 14).toSet)
}
"fairly balance between three outputs" in {
"balance between all three outputs" in {
val numElementsForSink = 10000
val outputs = Sink.fold[Int, Int](0)(_ + _)
val (r1, r2, r3) = FlowGraph.closed(outputs, outputs, outputs)(Tuple3.apply) { implicit b
val results = FlowGraph.closed(outputs, outputs, outputs)(List(_, _, _)) { implicit b
(o1, o2, o3)
val balance = b.add(Balance[Int](3, waitForAllDownstreams = true))
Source.repeat(1).take(numElementsForSink * 3) ~> balance.in
balance.out(0) ~> o1.inlet
balance.out(1) ~> o2.inlet
balance.out(2) ~> o3.inlet
balance.out(0) ~> o1
balance.out(1) ~> o2
balance.out(2) ~> o3
}.run()
Await.result(r1, 3.seconds) should be(numElementsForSink +- 2000)
Await.result(r2, 3.seconds) should be(numElementsForSink +- 2000)
Await.result(r3, 3.seconds) should be(numElementsForSink +- 2000)
import system.dispatcher
val sum = Future.sequence(results).map { res
res should not contain 0
res.sum
}
Await.result(sum, 3.seconds) should be(numElementsForSink * 3)
}
"fairly balance between three outputs" in {
val probe = TestSink.probe[Int]
val (p1, p2, p3) = FlowGraph.closed(probe, probe, probe)(Tuple3.apply) { implicit b
(o1, o2, o3)
val balance = b.add(Balance[Int](3))
Source(1 to 7) ~> balance.in
balance.out(0) ~> o1
balance.out(1) ~> o2
balance.out(2) ~> o3
}.run()
p1.requestNext(1)
p2.requestNext(2)
p3.requestNext(3)
p2.requestNext(4)
p1.requestNext(5)
p3.requestNext(6)
p1.requestNext(7)
p1.expectComplete()
p2.expectComplete()
p3.expectComplete()
}
"produce to second even though first cancels" in assertAllStagesStopped {