diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeDroppyBroadcast.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeDroppyBroadcast.scala index 33324536d6..bf64e862e9 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeDroppyBroadcast.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeDroppyBroadcast.scala @@ -12,14 +12,16 @@ class RecipeDroppyBroadcast extends RecipeSpec { "Recipe for a droppy broadcast" must { "work" in { - val myElements = Source(immutable.Iterable.tabulate(100)(_ + 1)) + val pub = TestPublisher.probe[Int]() + val myElements = Source(pub) val sub1 = TestSubscriber.manualProbe[Int]() val sub2 = TestSubscriber.manualProbe[Int]() + val sub3 = TestSubscriber.probe[Int]() val futureSink = Sink.head[Seq[Int]] val mySink1 = Sink(sub1) val mySink2 = Sink(sub2) - val mySink3 = Flow[Int].grouped(200).toMat(futureSink)(Keep.right) + val mySink3 = Sink(sub3) //#droppy-bcast val graph = FlowGraph.closed(mySink1, mySink2, mySink3)((_, _, _)) { implicit b => @@ -35,7 +37,15 @@ class RecipeDroppyBroadcast extends RecipeSpec { } //#droppy-bcast - Await.result(graph.run()._3, 3.seconds).sum should be(5050) + graph.run() + + sub3.request(100) + for (i <- 1 to 100) { + pub.sendNext(i) + sub3.expectNext(i) + } + + pub.sendComplete() sub1.expectSubscription().request(10) sub2.expectSubscription().request(10)