From 1a3e1403d7d843a57c1427671d7eddddf09c3b3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Wed, 2 Sep 2015 14:09:21 +0200 Subject: [PATCH] =str #18377: Harden DroppyBroadcastSpec to not lose messages --- .../stream/cookbook/RecipeDroppyBroadcast.scala | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeDroppyBroadcast.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeDroppyBroadcast.scala index 33324536d6..bf64e862e9 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeDroppyBroadcast.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeDroppyBroadcast.scala @@ -12,14 +12,16 @@ class RecipeDroppyBroadcast extends RecipeSpec { "Recipe for a droppy broadcast" must { "work" in { - val myElements = Source(immutable.Iterable.tabulate(100)(_ + 1)) + val pub = TestPublisher.probe[Int]() + val myElements = Source(pub) val sub1 = TestSubscriber.manualProbe[Int]() val sub2 = TestSubscriber.manualProbe[Int]() + val sub3 = TestSubscriber.probe[Int]() val futureSink = Sink.head[Seq[Int]] val mySink1 = Sink(sub1) val mySink2 = Sink(sub2) - val mySink3 = Flow[Int].grouped(200).toMat(futureSink)(Keep.right) + val mySink3 = Sink(sub3) //#droppy-bcast val graph = FlowGraph.closed(mySink1, mySink2, mySink3)((_, _, _)) { implicit b => @@ -35,7 +37,15 @@ class RecipeDroppyBroadcast extends RecipeSpec { } //#droppy-bcast - Await.result(graph.run()._3, 3.seconds).sum should be(5050) + graph.run() + + sub3.request(100) + for (i <- 1 to 100) { + pub.sendNext(i) + sub3.expectNext(i) + } + + pub.sendComplete() sub1.expectSubscription().request(10) sub2.expectSubscription().request(10)