=str #18377: Harden DroppyBroadcastSpec to not lose messages

This commit is contained in:
Endre Sándor Varga 2015-09-02 14:09:21 +02:00
parent 7bdfd4e50f
commit 1a3e1403d7

View file

@ -12,14 +12,16 @@ class RecipeDroppyBroadcast extends RecipeSpec {
"Recipe for a droppy broadcast" must {
"work" in {
val myElements = Source(immutable.Iterable.tabulate(100)(_ + 1))
val pub = TestPublisher.probe[Int]()
val myElements = Source(pub)
val sub1 = TestSubscriber.manualProbe[Int]()
val sub2 = TestSubscriber.manualProbe[Int]()
val sub3 = TestSubscriber.probe[Int]()
val futureSink = Sink.head[Seq[Int]]
val mySink1 = Sink(sub1)
val mySink2 = Sink(sub2)
val mySink3 = Flow[Int].grouped(200).toMat(futureSink)(Keep.right)
val mySink3 = Sink(sub3)
//#droppy-bcast
val graph = FlowGraph.closed(mySink1, mySink2, mySink3)((_, _, _)) { implicit b =>
@ -35,7 +37,15 @@ class RecipeDroppyBroadcast extends RecipeSpec {
}
//#droppy-bcast
Await.result(graph.run()._3, 3.seconds).sum should be(5050)
graph.run()
sub3.request(100)
for (i <- 1 to 100) {
pub.sendNext(i)
sub3.expectNext(i)
}
pub.sendComplete()
sub1.expectSubscription().request(10)
sub2.expectSubscription().request(10)