diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeDroppyBroadcast.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeDroppyBroadcast.scala index deb74cd372..a538371ea2 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeDroppyBroadcast.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeDroppyBroadcast.scala @@ -29,7 +29,7 @@ class RecipeDroppyBroadcast extends RecipeSpec { import FlowGraphImplicits._ val graph = FlowGraph { implicit builder => - val bcast = Broadcast[Int]("broadcast") + val bcast = Broadcast[Int] myElements ~> bcast diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeKeepAlive.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeKeepAlive.scala index eaa9d163c4..4ab0b90728 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeKeepAlive.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeKeepAlive.scala @@ -28,7 +28,7 @@ class RecipeKeepAlive extends RecipeSpec { import FlowGraphImplicits._ val graph = FlowGraph { implicit builder => - val unfairMerge = MergePreferred[ByteString]("keepAliveInjector") + val unfairMerge = MergePreferred[ByteString] dataStream ~> unfairMerge.preferred // If data is available then no keepalive is injected keepAliveStream ~> unfairMerge diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala index 1b4d85cde1..671c5c7625 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala @@ -1,5 +1,6 @@ package docs.stream.cookbook +import akka.stream.OverflowStrategy import akka.stream.scaladsl._ import scala.concurrent.{ Await, Future } @@ -9,6 +10,8 @@ class RecipeReduceByKey extends RecipeSpec { "Reduce by key recipe" must { + val MaximumDistinctWords = 1000 + "work with simple word count" in 
{ def words = Source(List("hello", "world", "and", "hello", "universe", "akka") ++ List.fill(1000)("rocks!")) @@ -26,7 +29,10 @@ class RecipeReduceByKey extends RecipeSpec { } // get a stream of word counts - val counts: Source[(String, Int)] = countedWords.mapAsync(identity) + val counts: Source[(String, Int)] = + countedWords + .buffer(MaximumDistinctWords, OverflowStrategy.error) + .mapAsync(identity) //#word-count Await.result(counts.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set( @@ -44,6 +50,7 @@ class RecipeReduceByKey extends RecipeSpec { //#reduce-by-key-general def reduceByKey[In, K, Out]( + maximumGroupSize: Int, groupKey: (In) => K, foldZero: (K) => Out)(fold: (Out, In) => Out): Flow[In, (K, Out)] = { @@ -55,10 +62,11 @@ class RecipeReduceByKey extends RecipeSpec { } } - reducedValues.mapAsync(identity) + reducedValues.buffer(maximumGroupSize, OverflowStrategy.error).mapAsync(identity) } val wordCounts = words.via(reduceByKey( + MaximumDistinctWords, groupKey = (word: String) => word, foldZero = (key: String) => 0)(fold = (count: Int, elem: String) => count + 1)) diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeWorkerPool.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeWorkerPool.scala index 91d178b54c..99f23a7ae6 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeWorkerPool.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeWorkerPool.scala @@ -25,7 +25,7 @@ class RecipeWorkerPool extends RecipeSpec { val resultsOut = UndefinedSink[Out] val balancer = Balance[In](waitForAllDownstreams = true) - val merge = Merge[Out]("merge") + val merge = Merge[Out] jobsIn ~> balancer // Jobs are fed into the balancer merge ~> resultsOut // the merged results are sent out diff --git a/akka-docs-dev/rst/scala/cookbook.rst b/akka-docs-dev/rst/scala/cookbook.rst index ccaa28a5f1..090d581731 100644 --- a/akka-docs-dev/rst/scala/cookbook.rst +++ b/akka-docs-dev/rst/scala/cookbook.rst @@ 
-126,6 +126,14 @@ over the groups and using ``fold`` (remember that ``fold`` automatically materializes and runs the stream it is used on) we get a stream with elements of ``Future[(String, Int)]``. Now all we need is to flatten this stream, which can be achieved by calling ``mapAsync(identity)``. +There is one tricky issue to be noted here. The careful reader probably noticed that we put a ``buffer`` between the +``mapAsync()`` operation that flattens the stream of futures and the actual stream of futures. The reason for this is +that the substreams produced by ``groupBy()`` can only complete when the original upstream source completes. This means +that ``mapAsync()`` cannot pull for more substreams because it still waits on folding futures to finish, but these +futures never finish if the additional group streams are not consumed. This typical deadlock situation is resolved by +this buffer which is either able to contain all the group streams (which ensures that they are already running and folding) +or fails with an explicit error instead of a silent deadlock. + .. includecode:: code/docs/stream/cookbook/RecipeReduceByKey.scala#word-count By extracting the parts specific to *wordcount* into