diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeByteStrings.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeByteStrings.scala
new file mode 100644
index 0000000000..7af794cef2
--- /dev/null
+++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeByteStrings.scala
@@ -0,0 +1,117 @@
+package docs.stream.cookbook
+
+import akka.stream.scaladsl.{ Flow, Sink, Source }
+import akka.util.ByteString
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+class RecipeByteStrings extends RecipeSpec {
+
+  "Recipes for bytestring streams" must {
+
+    "have a working chunker" in {
+      val rawBytes = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9)))
+      val ChunkLimit = 2
+
+      //#bytestring-chunker
+      import akka.stream.stage._
+
+      /**
+       * Re-slices a stream of ByteStrings so that no emitted element is larger than
+       * `chunkSize` bytes. Bytes are emitted in arrival order and none are dropped,
+       * including those still buffered when upstream completes.
+       */
+      class Chunker(val chunkSize: Int) extends PushPullStage[ByteString, ByteString] {
+        // splitAt with a non-positive size would emit empty chunks forever
+        require(chunkSize > 0, "chunkSize must be positive")
+
+        // Bytes received from upstream that have not been emitted yet
+        private var buffer = ByteString.empty
+
+        override def onPush(elem: ByteString, ctx: Context[ByteString]): Directive = {
+          buffer ++= elem
+          emitChunkOrPull(ctx)
+        }
+
+        override def onPull(ctx: Context[ByteString]): Directive = emitChunkOrPull(ctx)
+
+        // Elements cannot be emitted from a termination callback, so while bytes are
+        // still buffered the termination is absorbed and onPull drains them first
+        override def onUpstreamFinish(ctx: Context[ByteString]): TerminationDirective =
+          if (buffer.nonEmpty) ctx.absorbTermination()
+          else ctx.finish()
+
+        private def emitChunkOrPull(ctx: Context[ByteString]): Directive = {
+          if (buffer.isEmpty) {
+            // Nothing left to emit: complete once upstream has finished, otherwise ask for more
+            if (ctx.isFinishing) ctx.finish()
+            else ctx.pull()
+          } else {
+            val (emit, nextBuffer) = buffer.splitAt(chunkSize)
+            buffer = nextBuffer
+            ctx.push(emit)
+          }
+        }
+
+      }
+
+      val chunksStream = rawBytes.transform(() => new Chunker(ChunkLimit))
+      //#bytestring-chunker
+
+      val chunksFuture = chunksStream.grouped(10).runWith(Sink.head)
+
+      val chunks = Await.result(chunksFuture, 3.seconds)
+
+      chunks.forall(_.size <= 2) should be(true)
+      chunks.fold(ByteString())(_ ++ _) should be(ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9))
+    }
+
+    "have a working bytes limiter" in {
+      val SizeLimit = 9
+
+      //#bytes-limiter
+      import akka.stream.stage._
+      /**
+       * Passes chunks through untouched and fails the stream with an
+       * IllegalStateException once more than `maximumBytes` bytes have been seen.
+       */
+      class ByteLimiter(val maximumBytes: Long) extends PushStage[ByteString, ByteString] {
+        // Long, like the limit itself, so the running total cannot wrap around
+        private var count = 0L
+
+        override def onPush(chunk: ByteString, ctx: Context[ByteString]): Directive = {
+          count += chunk.size
+          if (count > maximumBytes) ctx.fail(new IllegalStateException("Too much bytes"))
+          else ctx.push(chunk)
+        }
+      }
+
+      val limiter = Flow[ByteString].transform(() => new ByteLimiter(SizeLimit))
+      //#bytes-limiter
+
+      val bytes1 = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9)))
+      val bytes2 = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9, 10)))
+
+      Await.result(bytes1.via(limiter).grouped(10).runWith(Sink.head), 3.seconds)
+        .fold(ByteString())(_ ++ _) should be(ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9))
+
+      an[IllegalStateException] must be thrownBy {
+        Await.result(bytes2.via(limiter).grouped(10).runWith(Sink.head), 3.seconds)
+      }
+    }
+
+    "demonstrate compacting" in {
+
+      val data = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9)))
+
+      //#compacting-bytestrings
+      val compacted: Source[ByteString] = data.map(_.compact)
+      //#compacting-bytestrings
+
+      Await.result(compacted.grouped(10).runWith(Sink.head), 3.seconds).forall(_.isCompact) should be(true)
+    }
+
+  }
+
+}
diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeCollectingMetrics.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeCollectingMetrics.scala
new file mode 100644
index 0000000000..55638d809f
--- /dev/null
+++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeCollectingMetrics.scala
@@ -0,0 +1,91 @@
+package docs.stream.cookbook
+
+import akka.stream.{ MaterializerSettings, FlowMaterializer }
+import akka.stream.scaladsl._
+import akka.stream.testkit.StreamTestKit
+import akka.stream.testkit.StreamTestKit.{ SubscriberProbe, PublisherProbe }
+
+import scala.collection.immutable
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+class RecipeCollectingMetrics extends RecipeSpec {
+  import HoldOps._
+  implicit val m2 = FlowMaterializer(MaterializerSettings(system).withInputBuffer(1, 
1)) + + "Recipe for periodically collecting metrics" must { + + "work" in { + // type Tick = Unit + // + // val loadPub = PublisherProbe[Int]() + // val tickPub = PublisherProbe[Tick]() + // val reportTicks = Source(tickPub) + // val loadUpdates = Source(loadPub) + // val futureSink = Sink.head[immutable.Seq[String]] + // val sink = Flow[String].grouped(10).to(futureSink) + // + // //#periodic-metrics-collection + // val currentLoad = loadUpdates.transform(() => new HoldWithWait) + // + // val graph = FlowGraph { implicit builder => + // import FlowGraphImplicits._ + // val collector = ZipWith[Int, Tick, String]( + // (load: Int, tick: Tick) => s"current load is $load") + // + // currentLoad ~> collector.left + // reportTicks ~> collector.right + // + // collector.out ~> sink + // } + // //#periodic-metrics-collection + // + // val reports = graph.run().get(futureSink) + // val manualLoad = new StreamTestKit.AutoPublisher(loadPub) + // val manualTick = new StreamTestKit.AutoPublisher(tickPub) + // + // // Prefetch elimination + // manualTick.sendNext(()) + // + // manualLoad.sendNext(53) + // manualLoad.sendNext(61) + // manualTick.sendNext(()) + // + // manualLoad.sendNext(44) + // manualLoad.sendNext(54) + // manualLoad.sendNext(78) + // Thread.sleep(500) + // + // manualTick.sendNext(()) + // + // manualTick.sendComplete() + // + // Await.result(reports, 3.seconds) should be(List("current load is 53", "current load is 61", "current load is 78")) + + // Periodically collect values of metrics expressed as stream of updates + // --------------------------------------------------------------------- + // + // **Situation:** Given performance counters expressed as a stream of updates we want to gather a periodic report of these. + // We do not want to backpressure the counter updates but always take the last value instead. Whenever we don't have a new counter + // value we want to repeat the last value. 
+ // + // This recipe uses the :class:`HoldWithWait` recipe introduced previously. We use this element to gather updates from + // the counter stream and store the final value, and also repeat this final value if no update is received between + // metrics collection rounds. + // + // To finish the recipe, we simply use :class:`ZipWith` to trigger reading the latest value from the ``currentLoad`` + // stream whenever a new ``Tick`` arrives on the stream of ticks, ``reportTicks``. + // + // .. includecode:: code/docs/stream/cookbook/RecipeCollectingMetrics.scala#periodic-metrics-collection + // + // .. warning:: + // In order for this recipe to work the buffer size for the :class:`ZipWith` must be set to 1. The reason for this is + // explained in the "Buffering" section of the documentation. + + // FIXME: This recipe does only work with buffer size of 0, which is only available if graph fusing is implemented + pending + } + + } + +} diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeDigest.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeDigest.scala new file mode 100644 index 0000000000..bff9af013a --- /dev/null +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeDigest.scala @@ -0,0 +1,62 @@ +package docs.stream.cookbook + +import java.security.MessageDigest + +import akka.stream.scaladsl.{ Sink, Source } +import akka.util.ByteString + +import scala.concurrent.Await +import scala.concurrent.duration._ + +class RecipeDigest extends RecipeSpec { + + "Recipe for calculating digest" must { + + "work" in { + + val data = Source(List( + ByteString("abcdbcdecdef"), + ByteString("defgefghfghighijhijkijkljklmklmnlmnomnopnopq"))) + + //#calculating-digest + import akka.stream.stage._ + def digestCalculator(algorithm: String) = new PushPullStage[ByteString, ByteString] { + val digest = MessageDigest.getInstance(algorithm) + + override def onPush(chunk: ByteString, ctx: Context[ByteString]): Directive = { + 
digest.update(chunk.toArray) + ctx.pull() + } + + override def onPull(ctx: Context[ByteString]): Directive = { + if (ctx.isFinishing) ctx.pushAndFinish(ByteString(digest.digest())) + else ctx.pull() + } + + override def onUpstreamFinish(ctx: Context[ByteString]): TerminationDirective = { + // If the stream is finished, we need to emit the last element in the onPull block. + // It is not allowed to directly emit elements from a termination block + // (onUpstreamFinish or onUpstreamFailure) + ctx.absorbTermination() + } + } + + val digest: Source[ByteString] = data.transform(() => digestCalculator("SHA-256")) + //#calculating-digest + + Await.result(digest.runWith(Sink.head), 3.seconds) should be( + ByteString( + 0x24, 0x8d, 0x6a, 0x61, + 0xd2, 0x06, 0x38, 0xb8, + 0xe5, 0xc0, 0x26, 0x93, + 0x0c, 0x3e, 0x60, 0x39, + 0xa3, 0x3c, 0xe4, 0x59, + 0x64, 0xff, 0x21, 0x67, + 0xf6, 0xec, 0xed, 0xd4, + 0x19, 0xdb, 0x06, 0xc1)) + + } + + } + +} diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeDroppyBroadcast.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeDroppyBroadcast.scala new file mode 100644 index 0000000000..deb74cd372 --- /dev/null +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeDroppyBroadcast.scala @@ -0,0 +1,58 @@ +package docs.stream.cookbook + +import akka.stream.OverflowStrategy +import akka.stream.scaladsl._ +import akka.stream.testkit.StreamTestKit.SubscriberProbe + +import scala.collection.immutable +import scala.concurrent.Await +import scala.concurrent.duration._ + +class RecipeDroppyBroadcast extends RecipeSpec { + + "Recipe for a droppy broadcast" must { + "work" in { + val myElements = Source(immutable.Iterable.tabulate(100)(_ + 1)) + + val sub1 = SubscriberProbe[Int]() + val sub2 = SubscriberProbe[Int]() + val mySink1 = Sink(sub1) + val mySink2 = Sink(sub2) + val futureSink = Sink.head[Seq[Int]] + val mySink3 = Flow[Int].grouped(200).to(futureSink) + + //#droppy-bcast + // Makes a sink drop elements if too 
slow + def droppySink[T](sink: Sink[T], bufferSize: Int): Sink[T] = { + Flow[T].buffer(bufferSize, OverflowStrategy.dropHead).to(sink) + } + + import FlowGraphImplicits._ + val graph = FlowGraph { implicit builder => + val bcast = Broadcast[Int]("broadcast") + + myElements ~> bcast + + bcast ~> droppySink(mySink1, 10) + bcast ~> droppySink(mySink2, 10) + bcast ~> droppySink(mySink3, 10) + } + //#droppy-bcast + + Await.result(graph.run().get(futureSink), 3.seconds).sum should be(5050) + + sub1.expectSubscription().request(10) + sub2.expectSubscription().request(10) + + for (i <- 91 to 100) { + sub1.expectNext(i) + sub2.expectNext(i) + } + + sub1.expectComplete() + sub2.expectComplete() + + } + } + +} diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeFlattenSeq.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeFlattenSeq.scala new file mode 100644 index 0000000000..642544a606 --- /dev/null +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeFlattenSeq.scala @@ -0,0 +1,28 @@ +package docs.stream.cookbook + +import akka.stream.scaladsl.{ Sink, Source } + +import scala.collection.immutable +import scala.concurrent.Await +import scala.concurrent.duration._ + +class RecipeFlattenSeq extends RecipeSpec { + + "Recipe for flatteing a stream of seqs" must { + + "work" in { + + val someDataSource = Source(List(List("1"), List("2"), List("3", "4", "5"), List("6", "7"))) + + //#flattening-seqs + val myData: Source[List[Message]] = someDataSource + val flattened: Source[Message] = myData.mapConcat(identity) + //#flattening-seqs + + Await.result(flattened.grouped(8).runWith(Sink.head), 3.seconds) should be(List("1", "2", "3", "4", "5", "6", "7")) + + } + + } + +} diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeGlobalRateLimit.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeGlobalRateLimit.scala new file mode 100644 index 0000000000..7c94db308d --- /dev/null +++ 
b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeGlobalRateLimit.scala
@@ -0,0 +1,142 @@
+package docs.stream.cookbook
+
+import akka.actor.{ Props, ActorRef, Actor }
+import akka.actor.Actor.Receive
+import akka.stream.scaladsl._
+import akka.stream.testkit.StreamTestKit.SubscriberProbe
+
+import scala.collection.immutable
+import scala.concurrent.duration._
+
+class RecipeGlobalRateLimit extends RecipeSpec {
+
+  "Global rate limiting recipe" must {
+
+    //#global-limiter-actor
+    object Limiter {
+      case object WantToPass
+      case object MayPass
+
+      case object ReplenishTokens
+
+      def props(maxAvailableTokens: Int, tokenRefreshPeriod: FiniteDuration, tokenRefreshAmount: Int): Props =
+        Props(new Limiter(maxAvailableTokens, tokenRefreshPeriod, tokenRefreshAmount))
+    }
+
+    /**
+     * Token bucket shared by any number of streams. A sender of WantToPass gets MayPass
+     * as soon as a token is available; senders that find the bucket empty are queued and
+     * answered in arrival order when ReplenishTokens adds tokens.
+     */
+    class Limiter(
+      val maxAvailableTokens: Int,
+      val tokenRefreshPeriod: FiniteDuration,
+      val tokenRefreshAmount: Int) extends Actor {
+      import Limiter._
+      import context.dispatcher
+      import akka.actor.Status
+
+      private var waitQueue = immutable.Queue.empty[ActorRef]
+      private var permitTokens = maxAvailableTokens
+      // Use the actor's own system rather than one captured from the enclosing scope
+      private val replenishTimer = context.system.scheduler.schedule(
+        initialDelay = tokenRefreshPeriod,
+        interval = tokenRefreshPeriod,
+        receiver = self,
+        ReplenishTokens)
+
+      // Start closed when there is nothing to hand out; starting open with an empty bucket
+      // would drive permitTokens negative and never hit the == 0 check that closes the gate
+      override def receive: Receive = if (permitTokens > 0) open else closed
+
+      val open: Receive = {
+        case ReplenishTokens =>
+          permitTokens = math.min(permitTokens + tokenRefreshAmount, maxAvailableTokens)
+        case WantToPass =>
+          permitTokens -= 1
+          sender() ! MayPass
+          if (permitTokens == 0) context.become(closed)
+      }
+
+      val closed: Receive = {
+        case ReplenishTokens =>
+          permitTokens = math.min(permitTokens + tokenRefreshAmount, maxAvailableTokens)
+          releaseWaiting()
+        case WantToPass =>
+          waitQueue = waitQueue.enqueue(sender())
+      }
+
+      private def releaseWaiting(): Unit = {
+        val (toBeReleased, remainingQueue) = waitQueue.splitAt(permitTokens)
+        waitQueue = remainingQueue
+        permitTokens -= toBeReleased.size
+        toBeReleased foreach (_ ! MayPass)
+        if (permitTokens > 0) context.become(open)
+      }
+
+      override def postStop(): Unit = {
+        replenishTimer.cancel()
+        waitQueue foreach (_ ! Status.Failure(new IllegalStateException("limiter stopped")))
+      }
+    }
+    //#global-limiter-actor
+
+    "work" in {
+
+      //#global-limiter-flow
+      // Holds every element back until the limiter has answered the ask for it;
+      // each ask is bounded by maxAllowedWait
+      def limitGlobal[T](limiter: ActorRef, maxAllowedWait: FiniteDuration): Flow[T, T] = {
+        import akka.pattern.ask
+        import akka.util.Timeout
+        Flow[T].mapAsync { (element: T) =>
+          import system.dispatcher
+          implicit val triggerTimeout = Timeout(maxAllowedWait)
+          val limiterTriggerFuture = limiter ? Limiter.WantToPass
+          limiterTriggerFuture.map((_) => element)
+        }
+
+      }
+      //#global-limiter-flow
+
+      // Use a large period and emulate the timer by hand instead
+      val limiter = system.actorOf(Limiter.props(2, 100.days, 1), "limiter")
+
+      val source1 = Source(() => Iterator.continually("E1")).via(limitGlobal(limiter, 2.seconds))
+      val source2 = Source(() => Iterator.continually("E2")).via(limitGlobal(limiter, 2.seconds))
+
+      val probe = SubscriberProbe[String]()
+
+      FlowGraph { implicit b =>
+        import FlowGraphImplicits._
+        val merge = Merge[String]
+        source1 ~> merge ~> Sink(probe)
+        source2 ~> merge
+      }.run()
+
+      probe.expectSubscription().request(1000)
+
+      probe.expectNext() should startWith("E")
+      probe.expectNext() should startWith("E")
+      probe.expectNoMsg(500.millis)
+
+      limiter ! Limiter.ReplenishTokens
+      probe.expectNext() should startWith("E")
+      probe.expectNoMsg(500.millis)
+
+      var resultSet = Set.empty[String]
+      for (_ <- 1 to 100) {
+        limiter ! 
Limiter.ReplenishTokens + resultSet += probe.expectNext() + } + + resultSet.contains("E1") should be(true) + resultSet.contains("E2") should be(true) + + probe.expectError() + + } + + } + +} diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeHold.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeHold.scala new file mode 100644 index 0000000000..42aa4a0116 --- /dev/null +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeHold.scala @@ -0,0 +1,120 @@ +package docs.stream.cookbook + +import akka.stream.scaladsl.{ Sink, Source } +import akka.stream.testkit.StreamTestKit +import akka.stream.testkit.StreamTestKit.{ SubscriberProbe, PublisherProbe } + +import scala.concurrent.duration._ + +object HoldOps { + //#hold-version-1 + import akka.stream.stage._ + class HoldWithInitial[T](initial: T) extends DetachedStage[T, T] { + private var currentValue: T = initial + + override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective = { + currentValue = elem + ctx.pull() + } + + override def onPull(ctx: DetachedContext[T]): DownstreamDirective = { + ctx.push(currentValue) + } + + } + //#hold-version-1 + + //#hold-version-2 + import akka.stream.stage._ + class HoldWithWait[T] extends DetachedStage[T, T] { + private var currentValue: T = _ + private var waitingFirstValue = true + + override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective = { + currentValue = elem + waitingFirstValue = false + if (ctx.isHolding) ctx.pushAndPull(currentValue) + else ctx.pull() + } + + override def onPull(ctx: DetachedContext[T]): DownstreamDirective = { + if (waitingFirstValue) ctx.hold() + else ctx.push(currentValue) + } + + } + //#hold-version-2 +} + +class RecipeHold extends RecipeSpec { + import HoldOps._ + + "Recipe for creating a holding element" must { + + "work for version 1" in { + + val pub = PublisherProbe[Int]() + val sub = SubscriberProbe[Int]() + val source = Source(pub) + val sink = Sink(sub) + + source.transform(() 
=> new HoldWithInitial(0)).to(sink).run() + + val manualSource = new StreamTestKit.AutoPublisher(pub) + + val subscription = sub.expectSubscription() + sub.expectNoMsg(100.millis) + + subscription.request(1) + sub.expectNext(0) + + subscription.request(1) + sub.expectNext(0) + + manualSource.sendNext(1) + manualSource.sendNext(2) + + subscription.request(2) + sub.expectNext(2) + sub.expectNext(2) + + manualSource.sendComplete() + subscription.request(1) + sub.expectComplete() + } + + "work for version 2" in { + + val pub = PublisherProbe[Int]() + val sub = SubscriberProbe[Int]() + val source = Source(pub) + val sink = Sink(sub) + + source.transform(() => new HoldWithWait).to(sink).run() + + val manualSource = new StreamTestKit.AutoPublisher(pub) + + val subscription = sub.expectSubscription() + sub.expectNoMsg(100.millis) + + subscription.request(1) + sub.expectNoMsg(100.millis) + + manualSource.sendNext(1) + sub.expectNext(1) + + manualSource.sendNext(2) + manualSource.sendNext(3) + + subscription.request(2) + sub.expectNext(3) + sub.expectNext(3) + + manualSource.sendComplete() + subscription.request(1) + sub.expectComplete() + } + + } + +} diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeKeepAlive.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeKeepAlive.scala new file mode 100644 index 0000000000..eaa9d163c4 --- /dev/null +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeKeepAlive.scala @@ -0,0 +1,76 @@ +package docs.stream.cookbook + +import akka.stream.scaladsl._ +import akka.stream.testkit.StreamTestKit +import akka.stream.testkit.StreamTestKit.{ SubscriberProbe, PublisherProbe } +import akka.util.ByteString + +class RecipeKeepAlive extends RecipeSpec { + + "Recipe for injecting keepalive messages" must { + + "work" in { + + type Tick = Unit + + val tickPub = PublisherProbe[Tick]() + val dataPub = PublisherProbe[ByteString]() + val sub = SubscriberProbe[ByteString]() + val ticks = Source(tickPub) + + val 
dataStream = Source(dataPub) + val keepaliveMessage = ByteString(11) + val sink = Sink(sub) + + //#inject-keepalive + val keepAliveStream: Source[ByteString] = ticks + .conflate(seed = (tick) => keepaliveMessage)((msg, newTick) => msg) + + import FlowGraphImplicits._ + val graph = FlowGraph { implicit builder => + val unfairMerge = MergePreferred[ByteString]("keepAliveInjector") + + dataStream ~> unfairMerge.preferred // If data is available then no keepalive is injected + keepAliveStream ~> unfairMerge + + unfairMerge ~> sink + } + //#inject-keepalive + + graph.run() + + val manualTicks = new StreamTestKit.AutoPublisher(tickPub) + val manualData = new StreamTestKit.AutoPublisher(dataPub) + + val subscription = sub.expectSubscription() + + manualTicks.sendNext(()) + + // pending data will overcome the keepalive + manualData.sendNext(ByteString(1)) + manualData.sendNext(ByteString(2)) + manualData.sendNext(ByteString(3)) + + subscription.request(1) + sub.expectNext(ByteString(1)) + subscription.request(2) + sub.expectNext(ByteString(2)) + sub.expectNext(ByteString(3)) + + subscription.request(1) + sub.expectNext(keepaliveMessage) + + subscription.request(1) + manualTicks.sendNext(()) + sub.expectNext(keepaliveMessage) + + manualData.sendComplete() + manualTicks.sendComplete() + + sub.expectComplete() + + } + + } + +} diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeLoggingElements.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeLoggingElements.scala new file mode 100644 index 0000000000..53a88cbb52 --- /dev/null +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeLoggingElements.scala @@ -0,0 +1,61 @@ +package docs.stream.cookbook + +import akka.event.Logging +import akka.stream.scaladsl.{ Sink, Source, Flow } +import akka.testkit.{ EventFilter, TestProbe } + +class RecipeLoggingElements extends RecipeSpec { + + "Simple logging recipe" must { + + "work with println" in { + val printProbe = TestProbe() + def println(s: 
String): Unit = printProbe.ref ! s + + val mySource = Source(List("1", "2", "3")) + + //#println-debug + val loggedSource = mySource.map { elem => println(elem); elem } + //#println-debug + + loggedSource.runWith(Sink.ignore) + printProbe.expectMsgAllOf("1", "2", "3") + } + + "work with PushStage" in { + val mySource = Source(List("1", "2", "3")) + + //#loggingadapter + import akka.stream.stage._ + class LoggingStage[T] extends PushStage[T, T] { + private val log = Logging(system, "loggingName") + + override def onPush(elem: T, ctx: Context[T]): Directive = { + log.debug("Element flowing through: {}", elem) + ctx.push(elem) + } + + override def onUpstreamFailure(cause: Throwable, + ctx: Context[T]): TerminationDirective = { + log.error(cause, "Upstream failed.") + super.onUpstreamFailure(cause, ctx) + } + + override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = { + log.debug("Upstream finished") + super.onUpstreamFinish(ctx) + } + } + + val loggedSource = mySource.transform(() => new LoggingStage) + //#loggingadapter + + EventFilter.debug(start = "Element flowing").intercept { + loggedSource.runWith(Sink.ignore) + } + + } + + } + +} diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeManualTrigger.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeManualTrigger.scala new file mode 100644 index 0000000000..97e7972797 --- /dev/null +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeManualTrigger.scala @@ -0,0 +1,94 @@ +package docs.stream.cookbook + +import akka.stream.scaladsl._ +import akka.stream.testkit.StreamTestKit +import akka.stream.testkit.StreamTestKit.{ SubscriberProbe, PublisherProbe } +import scala.concurrent.duration._ + +class RecipeManualTrigger extends RecipeSpec { + + "Recipe for triggering a stream manually" must { + + "work" in { + + val elements = Source(List("1", "2", "3", "4")) + val pub = PublisherProbe[Trigger]() + val sub = SubscriberProbe[Message]() + val triggerSource = Source(pub) + 
val sink = Sink(sub) + + //#manually-triggered-stream + import FlowGraphImplicits._ + val graph = FlowGraph { implicit builder => + val zip = Zip[Message, Trigger] + elements ~> zip.left + triggerSource ~> zip.right + zip.out ~> Flow[(Message, Trigger)].map { case (msg, trigger) => msg } ~> sink + } + //#manually-triggered-stream + + graph.run() + val manualSource = new StreamTestKit.AutoPublisher(pub) + + sub.expectSubscription().request(1000) + sub.expectNoMsg(100.millis) + + manualSource.sendNext(()) + sub.expectNext("1") + sub.expectNoMsg(100.millis) + + manualSource.sendNext(()) + manualSource.sendNext(()) + sub.expectNext("2") + sub.expectNext("3") + sub.expectNoMsg(100.millis) + + manualSource.sendNext(()) + sub.expectNext("4") + sub.expectComplete() + } + + "work with ZipWith" in { + + val elements = Source(List("1", "2", "3", "4")) + val pub = PublisherProbe[Trigger]() + val sub = SubscriberProbe[Message]() + val triggerSource = Source(pub) + val sink = Sink(sub) + + //#manually-triggered-stream-zipwith + import FlowGraphImplicits._ + val graph = FlowGraph { implicit builder => + val zip = ZipWith[Message, Trigger, Message]( + (msg: Message, trigger: Trigger) => msg) + + elements ~> zip.left + triggerSource ~> zip.right + zip.out ~> sink + } + //#manually-triggered-stream-zipwith + + graph.run() + val manualSource = new StreamTestKit.AutoPublisher(pub) + + sub.expectSubscription().request(1000) + sub.expectNoMsg(100.millis) + + manualSource.sendNext(()) + sub.expectNext("1") + sub.expectNoMsg(100.millis) + + manualSource.sendNext(()) + manualSource.sendNext(()) + sub.expectNext("2") + sub.expectNext("3") + sub.expectNoMsg(100.millis) + + manualSource.sendNext(()) + sub.expectNext("4") + sub.expectComplete() + } + + } + +} diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeMissedTicks.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeMissedTicks.scala new file mode 100644 index 0000000000..7bd59227e1 --- /dev/null +++ 
b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeMissedTicks.scala @@ -0,0 +1,53 @@ +package docs.stream.cookbook + +import akka.stream.scaladsl.{ Sink, Source } +import akka.stream.testkit.StreamTestKit +import akka.stream.testkit.StreamTestKit.{ SubscriberProbe, PublisherProbe } + +import scala.concurrent.duration._ + +class RecipeMissedTicks extends RecipeSpec { + + "Recipe for collecting missed ticks" must { + + "work" in { + type Tick = Unit + + val pub = PublisherProbe[Tick]() + val sub = SubscriberProbe[Int]() + val tickStream = Source(pub) + val sink = Sink(sub) + + //#missed-ticks + // tickStream is a Source[Tick] + val missedTicks: Source[Int] = + tickStream.conflate(seed = (_) => 0)( + (missedTicks, tick) => missedTicks + 1) + //#missed-ticks + + missedTicks.to(sink).run() + val manualSource = new StreamTestKit.AutoPublisher(pub) + + manualSource.sendNext(()) + manualSource.sendNext(()) + manualSource.sendNext(()) + manualSource.sendNext(()) + + val subscription = sub.expectSubscription() + subscription.request(1) + sub.expectNext(3) + + subscription.request(1) + sub.expectNoMsg(100.millis) + + manualSource.sendNext(()) + sub.expectNext(0) + + manualSource.sendComplete() + subscription.request(1) + sub.expectComplete() + } + + } + +} diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeMultiGroupBy.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeMultiGroupBy.scala new file mode 100644 index 0000000000..0b710bbcd9 --- /dev/null +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeMultiGroupBy.scala @@ -0,0 +1,57 @@ +package docs.stream.cookbook + +import akka.stream.scaladsl.{ Sink, Source } + +import scala.collection.immutable +import scala.concurrent.Await +import scala.concurrent.duration._ + +class RecipeMultiGroupBy extends RecipeSpec { + + "Recipe for multi-groupBy" must { + + "work" in { + + case class Topic(name: String) + + val elems = Source(List("1: a", "1: b", "all: c", "all: d", "1: e")) + val 
topicMapper: (Message) => immutable.Seq[Topic] = { msg =>
+        if (msg.startsWith("1")) List(Topic("1"))
+        else List(Topic("1"), Topic("2"))
+      }
+
+      class X {
+        //#multi-groupby
+        val topicMapper: (Message) => immutable.Seq[Topic] = ???
+
+        //#multi-groupby
+      }
+
+      //#multi-groupby
+      val messageAndTopic: Source[(Message, Topic)] = elems.mapConcat { msg: Message =>
+        val topicsForMessage = topicMapper(msg)
+        // Create a (Msg, Topic) pair for each of the topics
+        // the message belongs to
+        topicsForMessage.map(msg -> _)
+      }
+
+      val multiGroups: Source[(Topic, Source[String])] = messageAndTopic.groupBy(_._2).map {
+        case (topic, topicStream) =>
+          // chopping off the topic from the (Message, Topic) pairs
+          (topic, topicStream.map(_._1))
+      }
+      //#multi-groupby
+
+      val result = multiGroups.map {
+        case (topic, topicMessages) => topicMessages.grouped(10).map(topic.name + _.mkString("[", ", ", "]")).runWith(Sink.head)
+      }.mapAsync(identity).grouped(10).runWith(Sink.head)
+
+      Await.result(result, 3.seconds).toSet should be(Set(
+        "1[1: a, 1: b, all: c, all: d, 1: e]",
+        "2[all: c, all: d]"))
+
+    }
+
+  }
+
+}
diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeParseLines.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeParseLines.scala
new file mode 100644
index 0000000000..77f8bea667
--- /dev/null
+++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeParseLines.scala
@@ -0,0 +1,103 @@
+package docs.stream.cookbook
+
+import akka.stream.scaladsl.{ Sink, Source }
+import akka.util.ByteString
+
+import scala.annotation.tailrec
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+class RecipeParseLines extends RecipeSpec {
+
+  "Recipe for parsing line from bytes" must {
+
+    "work" in {
+      val rawData = Source(List(
+        ByteString("Hello World"),
+        ByteString("\r"),
+        ByteString("!\r"),
+        ByteString("\nHello Akka!\r\nHello Streams!"),
+        ByteString("\r\n\r\n")))
+      import akka.stream.stage._
+
+      //#parse-lines
+      /**
+       * Splits a stream of ByteString chunks into lines terminated by `separator`,
+       * decoding each line as UTF-8. The stream fails with an IllegalStateException
+       * once a single line grows beyond `maximumLineBytes` bytes (separator excluded).
+       *
+       * NOTE(review): termination is not overridden here, so bytes after the last
+       * separator look like they are dropped when upstream completes -- confirm intent.
+       */
+      def parseLines(separator: String, maximumLineBytes: Int) =
+        new StatefulStage[ByteString, String] {
+          private val separatorBytes = ByteString(separator)
+          private val firstSeparatorByte = separatorBytes.head
+          // Bytes received so far that do not yet form a complete line
+          private var buffer = ByteString.empty
+          // Index in buffer from which the search for the separator resumes
+          private var nextPossibleMatch = 0
+
+          def initial = new State {
+            override def onPush(chunk: ByteString, ctx: Context[String]): Directive = {
+              buffer ++= chunk
+              // Extract the complete lines first: the limit applies to a single line, not
+              // to a chunk that happens to carry several short ones
+              val parsedLines = doParse(Vector.empty)
+              // Everything before nextPossibleMatch belongs to one line without a terminator
+              if (nextPossibleMatch > maximumLineBytes)
+                ctx.fail(new IllegalStateException(s"Read ${buffer.size} bytes " +
+                  s"which is more than $maximumLineBytes without seeing a line terminator"))
+              else emit(parsedLines.iterator, ctx)
+            }
+
+            @tailrec
+            private def doParse(parsedLinesSoFar: Vector[String]): Vector[String] = {
+              val possibleMatchPos = buffer.indexOf(firstSeparatorByte, from = nextPossibleMatch)
+              if (possibleMatchPos == -1) {
+                // No matching character, we need to accumulate more bytes into the buffer
+                nextPossibleMatch = buffer.size
+                parsedLinesSoFar
+              } else if (possibleMatchPos > maximumLineBytes) {
+                // The current line is already over the limit; stop here and let onPush fail
+                nextPossibleMatch = possibleMatchPos
+                parsedLinesSoFar
+              } else if (possibleMatchPos + separatorBytes.size > buffer.size) {
+                // We have found a possible match (we found the first character of the terminator
+                // sequence) but we don't have yet enough bytes. We remember the position to
+                // retry from next time.
+                nextPossibleMatch = possibleMatchPos
+                parsedLinesSoFar
+              } else if (buffer.slice(possibleMatchPos, possibleMatchPos + separatorBytes.size)
+                == separatorBytes) {
+                // Found a match
+                val parsedLine = buffer.slice(0, possibleMatchPos).utf8String
+                buffer = buffer.drop(possibleMatchPos + separatorBytes.size)
+                // The buffer now starts right after the separator, so scan from its beginning
+                nextPossibleMatch = 0
+                doParse(parsedLinesSoFar :+ parsedLine)
+              } else {
+                // False alarm: resume right after the candidate instead of rescanning it
+                nextPossibleMatch = possibleMatchPos + 1
+                doParse(parsedLinesSoFar)
+              }
+
+            }
+          }
+
+        }
+
+      val linesStream = rawData.transform(() => parseLines("\r\n", 100))
+
+      //#parse-lines
+
+      Await.result(linesStream.grouped(10).runWith(Sink.head), 3.seconds) should be(List(
+        "Hello World\r!",
+        "Hello Akka!",
+        "Hello Streams!",
+        ""))
+    }
+
+  }
+
+}
diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala
new file mode 100644
index 0000000000..1b4d85cde1
--- /dev/null
+++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeReduceByKey.scala
@@ -0,0 +1,78 @@
+package docs.stream.cookbook
+
+import akka.stream.scaladsl._
+
+import scala.concurrent.{ Await, Future }
+import scala.concurrent.duration._
+
+class RecipeReduceByKey extends RecipeSpec {
+
+  "Reduce by key recipe" must {
+
+    "work with simple word count" in {
+
+      def words = Source(List("hello", "world", "and", "hello", "universe", "akka") ++ List.fill(1000)("rocks!"))
+
+      //#word-count
+      // split the words into separate streams first
+      val wordStreams: Source[(String, Source[String])] = words.groupBy(identity)
+
+      // add counting logic to the streams
+      val countedWords: Source[Future[(String, Int)]] = wordStreams.map {
+        case (word, wordStream) =>
+          wordStream.fold((word, 0)) {
+            case ((w, count), _) => (w, count + 1)
+          }
+      }
+
+      // get a stream of word counts
+      val counts: Source[(String, Int)] = countedWords.mapAsync(identity)
+      //#word-count
+
+      
Await.result(counts.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set( + ("hello", 2), + ("world", 1), + ("and", 1), + ("universe", 1), + ("akka", 1), + ("rocks!", 1000))) + } + + "work generalized" in { + + def words = Source(List("hello", "world", "and", "hello", "universe", "akka") ++ List.fill(1000)("rocks!")) + + //#reduce-by-key-general + def reduceByKey[In, K, Out]( + groupKey: (In) => K, + foldZero: (K) => Out)(fold: (Out, In) => Out): Flow[In, (K, Out)] = { + + val groupStreams = Flow[In].groupBy(groupKey) + val reducedValues = groupStreams.map { + case (key, groupStream) => + groupStream.fold((key, foldZero(key))) { + case ((key, aggregated), elem) => (key, fold(aggregated, elem)) + } + } + + reducedValues.mapAsync(identity) + } + + val wordCounts = words.via(reduceByKey( + groupKey = (word: String) => word, + foldZero = (key: String) => 0)(fold = (count: Int, elem: String) => count + 1)) + + //#reduce-by-key-general + + Await.result(wordCounts.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set( + ("hello", 2), + ("world", 1), + ("and", 1), + ("universe", 1), + ("akka", 1), + ("rocks!", 1000))) + + } + } + +} diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeSimpleDrop.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeSimpleDrop.scala new file mode 100644 index 0000000000..950fd5938a --- /dev/null +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeSimpleDrop.scala @@ -0,0 +1,46 @@ +package docs.stream.cookbook + +import akka.stream.scaladsl.{ Flow, Sink, Source } +import akka.stream.testkit.StreamTestKit +import akka.stream.testkit.StreamTestKit.{ SubscriberProbe, PublisherProbe } + +import scala.concurrent.duration._ + +class RecipeSimpleDrop extends RecipeSpec { + + "Recipe for simply dropping elements for a faster stream" must { + + "work" in { + + //#simple-drop + val droppyStream: Flow[Message, Message] = + Flow[Message].conflate(seed = identity)((lastMessage, newMessage) => 
newMessage) + //#simple-drop + + val pub = PublisherProbe[Message]() + val sub = SubscriberProbe[Message]() + val messageSource = Source(pub) + val sink = Sink(sub) + + messageSource.via(droppyStream).to(sink).run() + + val manualSource = new StreamTestKit.AutoPublisher(pub) + + val subscription = sub.expectSubscription() + sub.expectNoMsg(100.millis) + + manualSource.sendNext("1") + manualSource.sendNext("2") + manualSource.sendNext("3") + + subscription.request(1) + sub.expectNext("3") + + manualSource.sendComplete() + subscription.request(1) + sub.expectComplete() + } + + } + +} diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeSpec.scala new file mode 100644 index 0000000000..4f0f7273be --- /dev/null +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeSpec.scala @@ -0,0 +1,13 @@ +package docs.stream.cookbook + +import akka.stream.FlowMaterializer +import akka.stream.testkit.AkkaSpec + +trait RecipeSpec extends AkkaSpec { + + implicit val m = FlowMaterializer() + type Message = String + type Trigger = Unit + type Job = String + +} diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeToStrict.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeToStrict.scala new file mode 100644 index 0000000000..a73c581fc3 --- /dev/null +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeToStrict.scala @@ -0,0 +1,27 @@ +package docs.stream.cookbook + +import akka.stream.scaladsl.{ Sink, Source } + +import scala.collection.immutable +import scala.concurrent.{ Await, Future } +import scala.concurrent.duration._ + +class RecipeToStrict extends RecipeSpec { + + "Recipe for draining a stream into a strict collection" must { + + "work" in { + val myData = Source(List("1", "2", "3")) + val MaxAllowedSeqSize = 100 + + //#draining-to-seq + val strict: Future[immutable.Seq[Message]] = + myData.grouped(MaxAllowedSeqSize).runWith(Sink.head) + 
//#draining-to-seq + + Await.result(strict, 3.seconds) should be(List("1", "2", "3")) + } + + } + +} diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeWorkerPool.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeWorkerPool.scala new file mode 100644 index 0000000000..91d178b54c --- /dev/null +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeWorkerPool.scala @@ -0,0 +1,53 @@ +package docs.stream.cookbook + +import akka.stream.scaladsl._ +import akka.testkit.TestProbe + +import scala.concurrent.Await +import scala.concurrent.duration._ + +class RecipeWorkerPool extends RecipeSpec { + + "Recipe for a pool of workers" must { + + "work" in { + val myJobs = Source(List("1", "2", "3", "4", "5")) + type Result = String + + val worker = Flow[String].map(_ + " done") + + //#worker-pool + def balancer[In, Out](worker: Flow[In, Out], workerCount: Int): Flow[In, Out] = { + import FlowGraphImplicits._ + + Flow[In, Out]() { implicit graphBuilder => + val jobsIn = UndefinedSource[In] + val resultsOut = UndefinedSink[Out] + + val balancer = Balance[In](waitForAllDownstreams = true) + val merge = Merge[Out]("merge") + + jobsIn ~> balancer // Jobs are fed into the balancer + merge ~> resultsOut // the merged results are sent out + + for (_ <- 1 to workerCount) { + // for each worker, add an edge from the balancer to the worker, then wire + // it to the merge element + balancer ~> worker ~> merge + } + + (jobsIn, resultsOut) + } + } + + val processedJobs: Source[Result] = myJobs.via(balancer(worker, 3)) + //#worker-pool + + Await.result(processedJobs.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set( + "1 done", "2 done", "3 done", "4 done", "5 done")) + + } + + } + +} diff --git a/akka-docs-dev/rst/scala/cookbook.rst b/akka-docs-dev/rst/scala/cookbook.rst new file mode 100644 index 0000000000..ccaa28a5f1 --- /dev/null +++ b/akka-docs-dev/rst/scala/cookbook.rst @@ -0,0 +1,399 @@ +.. 
_stream-cookbook-scala: + +################ +Streams Cookbook +################ + +Introduction +============ + +This is a collection of patterns to demonstrate various usage of the Akka Streams API by solving small targeted +problems in the format of "recipes". The purpose of this page is to give inspiration and ideas how to approach +various small tasks involving streams. The recipes in this page can be used directly as-is, but they are most powerful as +starting points: customization of the code snippets is warmly encouraged. + +This part also serves as supplementary material for the main body of documentation. It is a good idea to have this page +open while reading the manual and look for examples demonstrating various streaming concepts +as they appear in the main body of documentation. + +Working with Flows +================== + +In this collection we show simple recipes that involve linear flows. The recipes in this section are rather +general, more targeted recipes are available as separate sections ("Working with rate", "Working with IO"). + +Logging elements of a stream +---------------------------- + +**Situation:** During development it is sometimes helpful to see what happens in a particular section of a stream. + +The simplest solution is to simply use a ``map`` operation and use ``println`` to print the elements received to the console. +While this recipe is rather simplistic, it is often suitable for a quick debug session. + +.. includecode:: code/docs/stream/cookbook/RecipeLoggingElements.scala#println-debug + +If a proper logging solution is needed another approach is to create a :class:`PushStage` and override all upstream event +handlers, emitting log information through an Akka :class:`LoggingAdapter`. This small stage does not influence +the elements flowing in the stream, it just emits them unmodified by calling ``ctx.push(elem)`` in its ``onPush`` +event handler logic. + +.. 
includecode:: code/docs/stream/cookbook/RecipeLoggingElements.scala#loggingadapter + +Flattening a stream of sequences +-------------------------------- + +**Problem:** A stream is given as a stream of sequences of elements, but a stream of elements is needed instead, streaming +all the nested elements inside the sequences separately. + +The ``mapConcat`` operation can be used to implement a one-to-many transformation of elements using a mapper function +in the form of ``In ⇒ immutable.Seq[Out]``. In this case we want to map a ``Seq`` of elements to the elements in the +collection itself, so we can just call ``mapConcat(identity)``. + +.. includecode:: code/docs/stream/cookbook/RecipeFlattenSeq.scala#flattening-seqs + +Draining a stream to a strict collection +---------------------------------------- + +**Situation:** A finite sequence of elements is given as a stream, but a Scala collection is needed instead. + +In this recipe we will use the ``grouped`` stream operation that groups incoming elements into a stream of limited +size collections (it can be seen as the almost opposite version of the "Flattening a stream of sequences" recipe +we showed before). By using a ``grouped(MaxAllowedSeqSize).runWith(Sink.head)`` we first create a stream of groups +with maximum size of ``MaxAllowedSeqSize`` and then we take the first element of this stream. What we get is a +:class:`Future` containing a sequence with all the elements of the original up to ``MaxAllowedSeqSize`` size (further +elements are dropped). + +.. includecode:: code/docs/stream/cookbook/RecipeToStrict.scala#draining-to-seq + +Calculating the digest of a ByteString stream +--------------------------------------------- + +**Problem:** A stream of bytes is given as a stream of ``ByteStrings`` and we want to calculate the cryptographic digest +of the stream. 
+ +This recipe uses a :class:`PushPullStage` to host a mutable :class:`MessageDigest` class (part of the Java Cryptography +API) and update it with the bytes arriving from the stream. When the stream starts, the ``onPull`` handler of the +stage is called, which just bubbles up the ``pull`` event to its upstream. As a response to this pull, a ByteString +chunk will arrive (``onPush``) which we use to update the digest, then it will pull for the next chunk. + +Eventually the stream of ``ByteStrings`` depletes and we get a notification about this event via ``onUpstreamFinish``. +At this point we want to emit the digest value, but we cannot do it in this handler directly. Instead we call +``ctx.absorbTermination`` signalling to our context that we do not yet want to finish. When the environment decides that +we can emit further elements ``onPull`` is called again, and we see ``ctx.isFinishing`` returning true (since the upstream +source has been depleted already). Since we only want to emit a final element it is enough to call ``ctx.pushAndFinish`` +passing the digest ByteString to be emitted. + +.. includecode:: code/docs/stream/cookbook/RecipeDigest.scala#calculating-digest + +Parsing lines from a stream of ByteStrings +------------------------------------------ + +**Problem:** A stream of bytes is given as a stream of ``ByteStrings`` containing lines terminated by line ending +characters (or, alternatively, containing binary frames delimited by a special delimiter byte sequence) which +needs to be parsed. + +We express our solution as a :class:`StatefulStage` because it has support for emitting multiple elements easily +through its ``emit(iterator, ctx)`` helper method. Since an incoming ByteString chunk might contain multiple lines (frames) +this feature comes in handy. + +To create the parser we only need to hook into the ``onPush`` handler. We maintain a buffer of bytes (expressed as +a :class:`ByteString`) by simply concatenating incoming chunks with it. 
Since we don't want to allow unbounded size +lines (records) we always check if the buffer size is larger than the allowed ``maximumLineBytes`` value, and terminate +the stream if this invariant is violated. + +After we updated the buffer, we try to find the terminator sequence as a subsequence of the current buffer. To be +efficient, we also maintain a pointer ``nextPossibleMatch`` into the buffer so that we only search that part of the +buffer where new matches are possible. + +The search for a match is done in two steps: first we try to search for the first character of the terminator sequence +in the buffer. If we find a match, we do a full subsequence check to see if we had a false positive or not. The parsing +logic is recursive to be able to parse multiple lines (records) contained in the decoding buffer. + +.. includecode:: code/docs/stream/cookbook/RecipeParseLines.scala#parse-lines + +Implementing reduce-by-key +-------------------------- + +**Situation:** Given a stream of elements, we want to calculate some aggregated value on different subgroups of the +elements. + +The "hello world" of reduce-by-key style operations is *wordcount* which we demonstrate below. Given a stream of words +we first create a new stream ``wordStreams`` that groups the words according to the ``identity`` function, i.e. now +we have a stream of streams, where every substream will serve identical words. + +To count the words, we need to process the stream of streams (the actual groups containing identical words). By mapping +over the groups and using ``fold`` (remember that ``fold`` automatically materializes and runs the stream it is used +on) we get a stream with elements of ``Future[(String, Int)]``. Now all we need is to flatten this stream, which +can be achieved by calling ``mapAsync(identity)``. + +.. 
includecode:: code/docs/stream/cookbook/RecipeReduceByKey.scala#word-count + +By extracting the parts specific to *wordcount* into + +* a ``groupKey`` function that defines the groups +* a ``foldZero`` that defines the zero element used by the fold on the substream given the group key +* a ``fold`` function that does the actual reduction + +we get a generalized version below: + +.. includecode:: code/docs/stream/cookbook/RecipeReduceByKey.scala#reduce-by-key-general + +.. note:: + Please note that the reduce-by-key version we discussed above is sequential, in other words it is **NOT** a + parallelization pattern like mapReduce and similar frameworks. + +Sorting elements to multiple groups with groupBy +------------------------------------------------ + +**Situation:** The ``groupBy`` operation strictly partitions incoming elements, each element belongs to exactly one group. +Sometimes we want to map elements into multiple groups simultaneously. + +To achieve the desired result, we attack the problem in two steps: + +* first, using a function ``topicMapper`` that gives a list of topics (groups) a message belongs to, we transform our + stream of ``Message`` to a stream of ``(Message, Topic)`` where for each topic the message belongs to a separate pair + will be emitted. This is achieved by using ``mapConcat`` +* Then we take this new stream of message topic pairs (containing a separate pair for each topic a given message + belongs to) and feed it into groupBy, using the topic as the group key. + +.. includecode:: code/docs/stream/cookbook/RecipeMultiGroupBy.scala#multi-groupby + +Working with Graphs +=================== + +In this collection we show recipes that use stream graph elements to achieve various goals. + +Triggering the flow of elements programmatically +------------------------------------------------ + +**Situation:** Given a stream of elements we want to control the emission of those elements according to a trigger signal. 
+In other words, even if the stream would be able to flow (not being backpressured) we want to hold back elements until a +trigger signal arrives. + +This recipe solves the problem by simply zipping the stream of ``Message`` elements with the stream of ``Trigger`` +signals. Since ``Zip`` produces pairs, we simply map the output stream selecting the first element of the pair. + +.. includecode:: code/docs/stream/cookbook/RecipeManualTrigger.scala#manually-triggered-stream + +Alternatively, instead of using a ``Zip``, and then using ``map`` to get the first element of the pairs, we can avoid +creating the pairs in the first place by using ``ZipWith`` which takes a two argument function to produce the output +element. If this function would return a pair of the two arguments it would be exactly the behavior of ``Zip`` so +``ZipWith`` is a generalization of zipping. + +.. includecode:: code/docs/stream/cookbook/RecipeManualTrigger.scala#manually-triggered-stream-zipwith + + +Balancing jobs to a fixed pool of workers +----------------------------------------- + +**Situation:** Given a stream of jobs and a worker process expressed as a :class:`Flow` create a pool of workers +that automatically balances incoming jobs to available workers, then merges the results. + +We will express our solution as a function that takes a worker flow and the number of workers to be allocated and gives +a flow that internally contains a pool of these workers. To achieve the desired result we will create a :class:`Flow` +from a graph. + +The graph consists of a ``Balance`` node which is a special fan-out operation that tries to route elements to available +downstream consumers. In a ``for`` loop we wire all of our desired workers as outputs of this balancer element, then +we wire the outputs of these workers to a ``Merge`` element that will collect the results from the workers. 
+ +To convert the graph to a :class:`Flow` we need to define special graph nodes that will correspond to the input and +output ports of the resulting :class:`Flow`. This is achieved by defining a pair of undefined sink and source which +we return from the builder block. + +.. includecode:: code/docs/stream/cookbook/RecipeWorkerPool.scala#worker-pool + +Working with rate +================= + +This collection of recipes demonstrates various patterns where rate differences between upstream and downstream +need to be handled by other strategies than simple backpressure. + +Dropping elements +----------------- + +**Situation:** Given a fast producer and a slow consumer, we want to drop elements if necessary to not slow down +the producer too much. + +This can be solved by using the most versatile rate-transforming operation, ``conflate``. Conflate can be thought of as +a special ``fold`` operation that collapses multiple upstream elements into one aggregate element if needed to keep +the speed of the upstream unaffected by the downstream. + +When the upstream is faster, the fold process of the ``conflate`` starts. This folding needs a zero element, which +is given by a ``seed`` function that takes the current element and produces a zero for the folding process. In our +case this is ``identity`` so our folding state starts from the message itself. The folder function is also +special: given the aggregate value (the last message) and the new element (the freshest element) our aggregate state +becomes simply the freshest element. This choice of functions results in a simple dropping operation. + +.. includecode:: code/docs/stream/cookbook/RecipeSimpleDrop.scala#simple-drop + +Dropping broadcast +------------------ + +**Situation:** The default ``Broadcast`` graph element is properly backpressured, but that means that a slow downstream +consumer can hold back the other downstream consumers resulting in lowered throughput. 
In other words the rate of +``Broadcast`` is the rate of its slowest downstream consumer. In certain cases it is desirable to allow faster consumers +to progress independently of their slower siblings by dropping elements if necessary. + +One solution to this problem is to append a ``buffer`` element in front of all of the downstream consumers +defining a dropping strategy instead of the default ``Backpressure``. This allows small temporary rate differences +between the different consumers (the buffer smooths out small rate variances), but also allows faster consumers to +progress by dropping from the buffer of the slow consumers if necessary. + +.. includecode:: code/docs/stream/cookbook/RecipeDroppyBroadcast.scala#droppy-bcast + +Collecting missed ticks +----------------------- + +**Situation:** Given a regular (stream) source of ticks, instead of trying to backpressure the producer of the ticks +we want to keep a counter of the missed ticks instead and pass it down when possible. + +We will use ``conflate`` to solve the problem. Conflate takes two functions: + +* A seed function that produces the zero element for the folding process that happens when the upstream is faster than + the downstream. In our case the seed function is a constant function that returns 0 since there were no missed ticks + at that point. +* A fold function that is invoked when multiple upstream messages need to be collapsed to an aggregate value due + to the insufficient processing rate of the downstream. Our folding function simply increments the currently stored + count of the missed ticks so far. + +As a result, we have a stream of ``Int`` where the number represents the missed ticks. A number 0 means that we were +able to consume the tick fast enough (i.e. zero means: 1 non-missed tick + 0 missed ticks). + +.. 
includecode:: code/docs/stream/cookbook/RecipeMissedTicks.scala#missed-ticks + +Create a stream processor that repeats the last element seen +------------------------------------------------------------ + +**Situation:** Given a producer and consumer, where the rate of neither is known in advance, we want to ensure that none +of them is slowing down the other by dropping earlier unconsumed elements from the upstream if necessary, and repeating +the last value for the downstream if necessary. + +We have two options to implement this feature. In both cases we will use :class:`DetachedStage` to build our custom +element (:class:`DetachedStage` is specifically designed for rate translating elements just like ``conflate``, +``expand`` or ``buffer``). In the first version we will use a provided initial value ``initial`` that will be used +to feed the downstream if no upstream element is ready yet. In the ``onPush()`` handler we just overwrite the +``currentValue`` variable and immediately relieve the upstream by calling ``pull()`` (remember, implementations of +:class:`DetachedStage` are not allowed to call ``push()`` as a response to ``onPush()`` or call ``pull()`` as a response +to ``onPull()``). The downstream ``onPull`` handler is very similar, we immediately relieve the downstream by +emitting ``currentValue``. + +.. includecode:: code/docs/stream/cookbook/RecipeHold.scala#hold-version-1 + +While it is relatively simple, the drawback of the first version is that it needs an arbitrary initial element which is not +always possible to provide. Hence, we create a second version where the downstream might need to wait in one single +case: if the very first element is not yet available. + +We introduce a boolean variable ``waitingFirstValue`` to denote whether the first element has been provided or not +(alternatively an :class:`Option` can be used for ``currentValue`` or if the element type is a subclass of AnyRef +a null can be used with the same purpose). 
In the downstream ``onPull()`` handler the difference from the previous +version is that we call ``hold()`` if the first element is not yet available, thus blocking our downstream. The +upstream ``onPush()`` handler sets ``waitingFirstValue`` to false, and after checking if ``hold()`` has been called it +either relieves the upstream producer, or both the upstream producer and downstream consumer by calling ``pushAndPull()``. + +.. includecode:: code/docs/stream/cookbook/RecipeHold.scala#hold-version-2 + +Globally limiting the rate of a set of streams +---------------------------------------------- + +**Situation:** Given a set of independent streams that we cannot merge, we want to globally limit the aggregate +throughput of the set of streams. + +One possible solution uses a shared actor as the global limiter combined with mapAsync to create a reusable +:class:`Flow` that can be plugged into a stream to limit its rate. + +As the first step we define an actor that will do the accounting for the global rate limit. The actor maintains +a timer, a counter for pending permit tokens and a queue for possibly waiting participants. The actor has +an ``open`` and ``closed`` state. The actor is in the ``open`` state while it has still pending permits. Whenever a +request for permit arrives as a ``WantToPass`` message to the actor the number of available permits is decremented +and we notify the sender that it can pass by answering with a ``MayPass`` message. If the amount of permits reaches +zero, the actor transitions to the ``closed`` state. In this state requests are not immediately answered, instead the reference +of the sender is added to a queue. Once the timer for replenishing the pending permits fires by sending a ``ReplenishTokens`` +message, we increment the pending permits counter and send a reply to each of the waiting senders. If there are more +waiting senders than permits available we will stay in the ``closed`` state. + +.. 
includecode:: code/docs/stream/cookbook/RecipeGlobalRateLimit.scala#global-limiter-actor + +To create a Flow that uses this global limiter actor we use the ``mapAsync`` function with the combination of the ``ask`` +pattern. We also define a timeout, so if a reply is not received during the configured maximum wait period the returned +future from ``ask`` will fail, which will fail the corresponding stream as well. + +.. includecode:: code/docs/stream/cookbook/RecipeGlobalRateLimit.scala#global-limiter-flow + +.. note:: + The global actor used for limiting introduces a global bottleneck. You might want to assign a dedicated dispatcher + for this actor. + +Working with IO +=============== + +Chunking up a stream of ByteStrings into limited size ByteStrings +----------------------------------------------------------------- + +**Situation:** Given a stream of ByteStrings we want to produce a stream of ByteStrings containing the same bytes in +the same sequence, but capping the size of ByteStrings. In other words we want to slice up ByteStrings into smaller +chunks if they exceed a size threshold. + +This can be achieved with a single :class:`PushPullStage`. The main logic of our stage is in ``emitChunkOrPull()`` +which implements the following logic: + +* if the buffer is empty, we pull for more bytes +* if the buffer is nonEmpty, we split it according to the ``chunkSize``. This will give a next chunk that we will emit, + and an empty or nonempty remaining buffer. + +Both ``onPush()`` and ``onPull()`` call ``emitChunkOrPull()``; the only difference is that the push handler also stores +the incoming chunk by appending to the end of the buffer. + +.. 
includecode:: code/docs/stream/cookbook/RecipeByteStrings.scala#bytestring-chunker + +Limit the number of bytes passing through a stream of ByteStrings +----------------------------------------------------------------- + +**Situation:** Given a stream of ByteStrings we want to fail the stream if more than a given maximum of bytes has been +consumed. + +This recipe uses a :class:`PushStage` to implement the desired feature. In the only handler we override, +``onPush()`` we just update a counter and see if it gets larger than ``maximumBytes``. If a violation happens +we signal failure, otherwise we forward the chunk we have received. + +.. includecode:: code/docs/stream/cookbook/RecipeByteStrings.scala#bytes-limiter + +Compact ByteStrings in a stream of ByteStrings +---------------------------------------------- + +**Situation:** After a long stream of transformations, due to their immutable, structural sharing nature ByteStrings may +refer to multiple original ByteString instances unnecessarily retaining memory. As the final step of a transformation +chain we want to have clean copies that are no longer referencing the original ByteStrings. + +The recipe is a simple use of map, calling the ``compact()`` method of the :class:`ByteString` elements. This does +copying of the underlying arrays, so this should be the last element of a long chain if used. + +.. includecode:: code/docs/stream/cookbook/RecipeByteStrings.scala#compacting-bytestrings + +Injecting keep-alive messages into a stream of ByteStrings +---------------------------------------------------------- + +**Situation:** Given a communication channel expressed as a stream of ByteStrings we want to inject keep-alive messages +but only if this does not interfere with normal traffic. + +All this recipe needs is the ``MergePreferred`` element which is a version of a merge that is not fair. 
In other words, +whenever the merge can choose because multiple upstream producers have elements to produce it will always choose the +preferred upstream effectively giving it an absolute priority. + +.. includecode:: code/docs/stream/cookbook/RecipeKeepAlive.scala#inject-keepalive + + + + + + + + + + + + + + + +