+doc #15239: Stream cookbook

Endre Sándor Varga 2014-12-08 17:29:40 +01:00
parent ef2835d60e
commit 2c01bed1a7
19 changed files with 1629 additions and 0 deletions

code/docs/stream/cookbook/RecipeByteStrings.scala
@@ -0,0 +1,94 @@
package docs.stream.cookbook
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._
class RecipeByteStrings extends RecipeSpec {
"Recipes for bytestring streams" must {
"have a working chunker" in {
val rawBytes = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9)))
val ChunkLimit = 2
//#bytestring-chunker
import akka.stream.stage._
class Chunker(val chunkSize: Int) extends PushPullStage[ByteString, ByteString] {
private var buffer = ByteString.empty
override def onPush(elem: ByteString, ctx: Context[ByteString]): Directive = {
buffer ++= elem
emitChunkOrPull(ctx)
}
override def onPull(ctx: Context[ByteString]): Directive = emitChunkOrPull(ctx)
private def emitChunkOrPull(ctx: Context[ByteString]): Directive = {
if (buffer.isEmpty) ctx.pull()
else {
val (emit, nextBuffer) = buffer.splitAt(chunkSize)
buffer = nextBuffer
ctx.push(emit)
}
}
}
val chunksStream = rawBytes.transform(() => new Chunker(ChunkLimit))
//#bytestring-chunker
val chunksFuture = chunksStream.grouped(10).runWith(Sink.head)
val chunks = Await.result(chunksFuture, 3.seconds)
chunks.forall(_.size <= ChunkLimit) should be(true)
chunks.fold(ByteString())(_ ++ _) should be(ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9))
}
"have a working bytes limiter" in {
val SizeLimit = 9
//#bytes-limiter
import akka.stream.stage._
class ByteLimiter(val maximumBytes: Long) extends PushStage[ByteString, ByteString] {
private var count = 0
override def onPush(chunk: ByteString, ctx: Context[ByteString]): Directive = {
count += chunk.size
if (count > maximumBytes) ctx.fail(new IllegalStateException("Too many bytes"))
else ctx.push(chunk)
}
}
val limiter = Flow[ByteString].transform(() => new ByteLimiter(SizeLimit))
//#bytes-limiter
val bytes1 = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9)))
val bytes2 = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9, 10)))
Await.result(bytes1.via(limiter).grouped(10).runWith(Sink.head), 3.seconds)
.fold(ByteString())(_ ++ _) should be(ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9))
an[IllegalStateException] must be thrownBy {
Await.result(bytes2.via(limiter).grouped(10).runWith(Sink.head), 3.seconds)
}
}
"demonstrate compacting" in {
val data = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9)))
//#compacting-bytestrings
val compacted: Source[ByteString] = data.map(_.compact)
//#compacting-bytestrings
Await.result(compacted.grouped(10).runWith(Sink.head), 3.seconds).forall(_.isCompact) should be(true)
}
}
}
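// A minimal composition sketch (assuming the Chunker and ByteLimiter stages above
// are lifted out of the test bodies so they are visible at this scope): the three
// bytestring recipes chain into one pipeline that bounds total size, re-chunks,
// and compacts.
object ByteStringPipelineSketch {
  import akka.stream.scaladsl.Flow
  import akka.util.ByteString

  def limitChunkCompact(maxBytes: Long, chunkSize: Int): Flow[ByteString, ByteString] =
    Flow[ByteString]
      .transform(() => new ByteLimiter(maxBytes)) // fail the stream after maxBytes
      .transform(() => new Chunker(chunkSize))    // re-chunk to at most chunkSize bytes
      .map(_.compact)                             // compact each emitted chunk
}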

code/docs/stream/cookbook/RecipeCollectingMetrics.scala
@@ -0,0 +1,91 @@
package docs.stream.cookbook
import akka.stream.{ MaterializerSettings, FlowMaterializer }
import akka.stream.scaladsl._
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit.StreamTestKit.{ SubscriberProbe, PublisherProbe }
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
class RecipeCollectingMetrics extends RecipeSpec {
import HoldOps._
implicit val m2 = FlowMaterializer(MaterializerSettings(system).withInputBuffer(1, 1))
"Recipe for periodically collecting metrics" must {
"work" in {
// type Tick = Unit
//
// val loadPub = PublisherProbe[Int]()
// val tickPub = PublisherProbe[Tick]()
// val reportTicks = Source(tickPub)
// val loadUpdates = Source(loadPub)
// val futureSink = Sink.head[immutable.Seq[String]]
// val sink = Flow[String].grouped(10).to(futureSink)
//
// //#periodic-metrics-collection
// val currentLoad = loadUpdates.transform(() => new HoldWithWait)
//
// val graph = FlowGraph { implicit builder =>
// import FlowGraphImplicits._
// val collector = ZipWith[Int, Tick, String](
// (load: Int, tick: Tick) => s"current load is $load")
//
// currentLoad ~> collector.left
// reportTicks ~> collector.right
//
// collector.out ~> sink
// }
// //#periodic-metrics-collection
//
// val reports = graph.run().get(futureSink)
// val manualLoad = new StreamTestKit.AutoPublisher(loadPub)
// val manualTick = new StreamTestKit.AutoPublisher(tickPub)
//
// // Prefetch elimination
// manualTick.sendNext(())
//
// manualLoad.sendNext(53)
// manualLoad.sendNext(61)
// manualTick.sendNext(())
//
// manualLoad.sendNext(44)
// manualLoad.sendNext(54)
// manualLoad.sendNext(78)
// Thread.sleep(500)
//
// manualTick.sendNext(())
//
// manualTick.sendComplete()
//
// Await.result(reports, 3.seconds) should be(List("current load is 53", "current load is 61", "current load is 78"))
// Periodically collect values of metrics expressed as a stream of updates
// ---------------------------------------------------------------------
//
// **Situation:** Given performance counters expressed as a stream of updates, we want to gather a periodic report of them.
// We do not want to backpressure the counter updates; we always take the latest value instead. Whenever no new counter
// value has arrived, we want to repeat the last value.
//
// This recipe uses the :class:`HoldWithWait` recipe introduced previously. We use this element to gather updates from
// the counter stream and store the latest value, repeating that value if no update is received between
// metrics collection rounds.
//
// To finish the recipe, we simply use :class:`ZipWith` to trigger reading the latest value from the ``currentLoad``
// stream whenever a new ``Tick`` arrives on the stream of ticks, ``reportTicks``.
//
// .. includecode:: code/docs/stream/cookbook/RecipeCollectingMetrics.scala#periodic-metrics-collection
//
// .. warning::
// In order for this recipe to work the buffer size for the :class:`ZipWith` must be set to 1. The reason for this is
// explained in the "Buffering" section of the documentation.
// FIXME: This recipe only works with a buffer size of 0, which is only available once graph fusing is implemented
pending
}
}
}

code/docs/stream/cookbook/RecipeDigest.scala
@@ -0,0 +1,62 @@
package docs.stream.cookbook
import java.security.MessageDigest
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._
class RecipeDigest extends RecipeSpec {
"Recipe for calculating digest" must {
"work" in {
val data = Source(List(
ByteString("abcdbcdecdef"),
ByteString("defgefghfghighijhijkijkljklmklmnlmnomnopnopq")))
//#calculating-digest
import akka.stream.stage._
def digestCalculator(algorithm: String) = new PushPullStage[ByteString, ByteString] {
val digest = MessageDigest.getInstance(algorithm)
override def onPush(chunk: ByteString, ctx: Context[ByteString]): Directive = {
digest.update(chunk.toArray)
ctx.pull()
}
override def onPull(ctx: Context[ByteString]): Directive = {
if (ctx.isFinishing) ctx.pushAndFinish(ByteString(digest.digest()))
else ctx.pull()
}
override def onUpstreamFinish(ctx: Context[ByteString]): TerminationDirective = {
// If the stream is finished, we need to emit the last element in the onPull block.
// It is not allowed to directly emit elements from a termination block
// (onUpstreamFinish or onUpstreamFailure)
ctx.absorbTermination()
}
}
val digest: Source[ByteString] = data.transform(() => digestCalculator("SHA-256"))
//#calculating-digest
Await.result(digest.runWith(Sink.head), 3.seconds) should be(
ByteString(
0x24, 0x8d, 0x6a, 0x61,
0xd2, 0x06, 0x38, 0xb8,
0xe5, 0xc0, 0x26, 0x93,
0x0c, 0x3e, 0x60, 0x39,
0xa3, 0x3c, 0xe4, 0x59,
0x64, 0xff, 0x21, 0x67,
0xf6, 0xec, 0xed, 0xd4,
0x19, 0xdb, 0x06, 0xc1))
}
}
}
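// A small follow-up sketch (hypothetical helper, not part of the recipe): digests
// are usually rendered as hex, which is a plain map over the digest source.
object DigestHexSketch {
  import akka.util.ByteString

  def toHexString(bytes: ByteString): String =
    bytes.map(b => f"${b & 0xff}%02x").mkString
  // usage, given the `digest` source built above:
  //   val hexDigest: Source[String] = digest.map(toHexString)
}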

code/docs/stream/cookbook/RecipeDroppyBroadcast.scala
@@ -0,0 +1,58 @@
package docs.stream.cookbook
import akka.stream.OverflowStrategy
import akka.stream.scaladsl._
import akka.stream.testkit.StreamTestKit.SubscriberProbe
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
class RecipeDroppyBroadcast extends RecipeSpec {
"Recipe for a droppy broadcast" must {
"work" in {
val myElements = Source(immutable.Iterable.tabulate(100)(_ + 1))
val sub1 = SubscriberProbe[Int]()
val sub2 = SubscriberProbe[Int]()
val mySink1 = Sink(sub1)
val mySink2 = Sink(sub2)
val futureSink = Sink.head[Seq[Int]]
val mySink3 = Flow[Int].grouped(200).to(futureSink)
//#droppy-bcast
// Makes a sink drop elements if it is too slow
def droppySink[T](sink: Sink[T], bufferSize: Int): Sink[T] = {
Flow[T].buffer(bufferSize, OverflowStrategy.dropHead).to(sink)
}
import FlowGraphImplicits._
val graph = FlowGraph { implicit builder =>
val bcast = Broadcast[Int]("broadcast")
myElements ~> bcast
bcast ~> droppySink(mySink1, 10)
bcast ~> droppySink(mySink2, 10)
bcast ~> droppySink(mySink3, 10)
}
//#droppy-bcast
Await.result(graph.run().get(futureSink), 3.seconds).sum should be(5050)
sub1.expectSubscription().request(10)
sub2.expectSubscription().request(10)
for (i <- 91 to 100) {
sub1.expectNext(i)
sub2.expectNext(i)
}
sub1.expectComplete()
sub2.expectComplete()
}
}
}
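// A variant sketch: OverflowStrategy.dropHead discards the oldest buffered element;
// assuming OverflowStrategy.dropTail is available in this version, discarding the
// newest element instead is the same wrapper with a different strategy.
object DroppySinkVariants {
  import akka.stream.OverflowStrategy
  import akka.stream.scaladsl.{ Flow, Sink }

  def droppyNewestSink[T](sink: Sink[T], bufferSize: Int): Sink[T] =
    Flow[T].buffer(bufferSize, OverflowStrategy.dropTail).to(sink)
}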

code/docs/stream/cookbook/RecipeFlattenSeq.scala
@@ -0,0 +1,28 @@
package docs.stream.cookbook
import akka.stream.scaladsl.{ Sink, Source }
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
class RecipeFlattenSeq extends RecipeSpec {
"Recipe for flatteing a stream of seqs" must {
"work" in {
val someDataSource = Source(List(List("1"), List("2"), List("3", "4", "5"), List("6", "7")))
//#flattening-seqs
val myData: Source[List[Message]] = someDataSource
val flattened: Source[Message] = myData.mapConcat(identity)
//#flattening-seqs
Await.result(flattened.grouped(8).runWith(Sink.head), 3.seconds) should be(List("1", "2", "3", "4", "5", "6", "7"))
}
}
}
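// mapConcat accepts any function producing an immutable sequence, so flattening
// can fuse with per-element work; a sketch with a hypothetical cleanup step:
object FlattenSketch {
  import akka.stream.scaladsl.Source

  def flattenTrimmed(data: Source[List[String]]): Source[String] =
    data.mapConcat(_.map(_.trim)) // flatten and clean each message in one pass
}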

code/docs/stream/cookbook/RecipeGlobalRateLimit.scala
@@ -0,0 +1,132 @@
package docs.stream.cookbook
import akka.actor.{ Props, ActorRef, Actor }
import akka.actor.Actor.Receive
import akka.stream.scaladsl._
import akka.stream.testkit.StreamTestKit.SubscriberProbe
import scala.collection.immutable
import scala.concurrent.duration._
class RecipeGlobalRateLimit extends RecipeSpec {
"Global rate limiting recipe" must {
//#global-limiter-actor
object Limiter {
case object WantToPass
case object MayPass
case object ReplenishTokens
def props(maxAvailableTokens: Int, tokenRefreshPeriod: FiniteDuration, tokenRefreshAmount: Int): Props =
Props(new Limiter(maxAvailableTokens, tokenRefreshPeriod, tokenRefreshAmount))
}
class Limiter(
val maxAvailableTokens: Int,
val tokenRefreshPeriod: FiniteDuration,
val tokenRefreshAmount: Int) extends Actor {
import Limiter._
import context.dispatcher
import akka.actor.Status
private var waitQueue = immutable.Queue.empty[ActorRef]
private var permitTokens = maxAvailableTokens
private val replenishTimer = context.system.scheduler.schedule(
initialDelay = tokenRefreshPeriod,
interval = tokenRefreshPeriod,
receiver = self,
ReplenishTokens)
override def receive: Receive = open
val open: Receive = {
case ReplenishTokens =>
permitTokens = math.min(permitTokens + tokenRefreshAmount, maxAvailableTokens)
case WantToPass =>
permitTokens -= 1
sender() ! MayPass
if (permitTokens == 0) context.become(closed)
}
val closed: Receive = {
case ReplenishTokens =>
permitTokens = math.min(permitTokens + tokenRefreshAmount, maxAvailableTokens)
releaseWaiting()
case WantToPass =>
waitQueue = waitQueue.enqueue(sender())
}
private def releaseWaiting(): Unit = {
val (toBeReleased, remainingQueue) = waitQueue.splitAt(permitTokens)
waitQueue = remainingQueue
permitTokens -= toBeReleased.size
toBeReleased foreach (_ ! MayPass)
if (permitTokens > 0) context.become(open)
}
override def postStop(): Unit = {
replenishTimer.cancel()
waitQueue foreach (_ ! Status.Failure(new IllegalStateException("limiter stopped")))
}
}
//#global-limiter-actor
"work" in {
//#global-limiter-flow
def limitGlobal[T](limiter: ActorRef, maxAllowedWait: FiniteDuration): Flow[T, T] = {
import akka.pattern.ask
import akka.util.Timeout
Flow[T].mapAsync { (element: T) =>
import system.dispatcher
implicit val triggerTimeout = Timeout(maxAllowedWait)
val limiterTriggerFuture = limiter ? Limiter.WantToPass
limiterTriggerFuture.map((_) => element)
}
}
//#global-limiter-flow
// Use a large period and emulate the timer by hand instead
val limiter = system.actorOf(Limiter.props(2, 100.days, 1), "limiter")
val source1 = Source(() => Iterator.continually("E1")).via(limitGlobal(limiter, 2.seconds))
val source2 = Source(() => Iterator.continually("E2")).via(limitGlobal(limiter, 2.seconds))
val probe = SubscriberProbe[String]()
FlowGraph { implicit b =>
import FlowGraphImplicits._
val merge = Merge[String]
source1 ~> merge ~> Sink(probe)
source2 ~> merge
}.run()
probe.expectSubscription().request(1000)
probe.expectNext() should startWith("E")
probe.expectNext() should startWith("E")
probe.expectNoMsg(500.millis)
limiter ! Limiter.ReplenishTokens
probe.expectNext() should startWith("E")
probe.expectNoMsg(500.millis)
var resultSet = Set.empty[String]
for (_ <- 1 to 100) {
limiter ! Limiter.ReplenishTokens
resultSet += probe.expectNext()
}
resultSet.contains("E1") should be(true)
resultSet.contains("E2") should be(true)
probe.expectError()
}
}
}
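// Usage sketch (hypothetical names, relying on the limitGlobal flow defined in the
// test above): one Limiter actor shared by several materialized streams enforces a
// single global rate across all of them.
//   val limiter = system.actorOf(Limiter.props(
//     maxAvailableTokens = 10, tokenRefreshPeriod = 1.second, tokenRefreshAmount = 10))
//   val throttledA = sourceA.via(limitGlobal(limiter, maxAllowedWait = 5.seconds))
//   val throttledB = sourceB.via(limitGlobal(limiter, maxAllowedWait = 5.seconds))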

code/docs/stream/cookbook/RecipeHold.scala
@@ -0,0 +1,120 @@
package docs.stream.cookbook
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit.StreamTestKit.{ SubscriberProbe, PublisherProbe }
import scala.concurrent.duration._
object HoldOps {
//#hold-version-1
import akka.stream.stage._
class HoldWithInitial[T](initial: T) extends DetachedStage[T, T] {
private var currentValue: T = initial
override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective = {
currentValue = elem
ctx.pull()
}
override def onPull(ctx: DetachedContext[T]): DownstreamDirective = {
ctx.push(currentValue)
}
}
//#hold-version-1
//#hold-version-2
import akka.stream.stage._
class HoldWithWait[T] extends DetachedStage[T, T] {
private var currentValue: T = _
private var waitingFirstValue = true
override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective = {
currentValue = elem
waitingFirstValue = false
if (ctx.isHolding) ctx.pushAndPull(currentValue)
else ctx.pull()
}
override def onPull(ctx: DetachedContext[T]): DownstreamDirective = {
if (waitingFirstValue) ctx.hold()
else ctx.push(currentValue)
}
}
//#hold-version-2
}
class RecipeHold extends RecipeSpec {
import HoldOps._
"Recipe for creating a holding element" must {
"work for version 1" in {
val pub = PublisherProbe[Int]()
val sub = SubscriberProbe[Int]()
val source = Source(pub)
val sink = Sink(sub)
source.transform(() => new HoldWithInitial(0)).to(sink).run()
val manualSource = new StreamTestKit.AutoPublisher(pub)
val subscription = sub.expectSubscription()
sub.expectNoMsg(100.millis)
subscription.request(1)
sub.expectNext(0)
subscription.request(1)
sub.expectNext(0)
manualSource.sendNext(1)
manualSource.sendNext(2)
subscription.request(2)
sub.expectNext(2)
sub.expectNext(2)
manualSource.sendComplete()
subscription.request(1)
sub.expectComplete()
}
"work for version 2" in {
val pub = PublisherProbe[Int]()
val sub = SubscriberProbe[Int]()
val source = Source(pub)
val sink = Sink(sub)
source.transform(() => new HoldWithWait).to(sink).run()
val manualSource = new StreamTestKit.AutoPublisher(pub)
val subscription = sub.expectSubscription()
sub.expectNoMsg(100.millis)
subscription.request(1)
sub.expectNoMsg(100.millis)
manualSource.sendNext(1)
sub.expectNext(1)
manualSource.sendNext(2)
manualSource.sendNext(3)
subscription.request(2)
sub.expectNext(3)
sub.expectNext(3)
manualSource.sendComplete()
subscription.request(1)
sub.expectComplete()
}
}
}

code/docs/stream/cookbook/RecipeKeepAlive.scala
@@ -0,0 +1,76 @@
package docs.stream.cookbook
import akka.stream.scaladsl._
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit.StreamTestKit.{ SubscriberProbe, PublisherProbe }
import akka.util.ByteString
class RecipeKeepAlive extends RecipeSpec {
"Recipe for injecting keepalive messages" must {
"work" in {
type Tick = Unit
val tickPub = PublisherProbe[Tick]()
val dataPub = PublisherProbe[ByteString]()
val sub = SubscriberProbe[ByteString]()
val ticks = Source(tickPub)
val dataStream = Source(dataPub)
val keepaliveMessage = ByteString(11)
val sink = Sink(sub)
//#inject-keepalive
val keepAliveStream: Source[ByteString] = ticks
.conflate(seed = (tick) => keepaliveMessage)((msg, newTick) => msg)
import FlowGraphImplicits._
val graph = FlowGraph { implicit builder =>
val unfairMerge = MergePreferred[ByteString]("keepAliveInjector")
dataStream ~> unfairMerge.preferred // If data is available then no keepalive is injected
keepAliveStream ~> unfairMerge
unfairMerge ~> sink
}
//#inject-keepalive
graph.run()
val manualTicks = new StreamTestKit.AutoPublisher(tickPub)
val manualData = new StreamTestKit.AutoPublisher(dataPub)
val subscription = sub.expectSubscription()
manualTicks.sendNext(())
// pending data takes precedence over the keepalive
manualData.sendNext(ByteString(1))
manualData.sendNext(ByteString(2))
manualData.sendNext(ByteString(3))
subscription.request(1)
sub.expectNext(ByteString(1))
subscription.request(2)
sub.expectNext(ByteString(2))
sub.expectNext(ByteString(3))
subscription.request(1)
sub.expectNext(keepaliveMessage)
subscription.request(1)
manualTicks.sendNext(())
sub.expectNext(keepaliveMessage)
manualData.sendComplete()
manualTicks.sendComplete()
sub.expectComplete()
}
}
}

code/docs/stream/cookbook/RecipeLoggingElements.scala
@@ -0,0 +1,61 @@
package docs.stream.cookbook
import akka.event.Logging
import akka.stream.scaladsl.{ Sink, Source, Flow }
import akka.testkit.{ EventFilter, TestProbe }
class RecipeLoggingElements extends RecipeSpec {
"Simple logging recipe" must {
"work with println" in {
val printProbe = TestProbe()
def println(s: String): Unit = printProbe.ref ! s
val mySource = Source(List("1", "2", "3"))
//#println-debug
val loggedSource = mySource.map { elem => println(elem); elem }
//#println-debug
loggedSource.runWith(Sink.ignore)
printProbe.expectMsgAllOf("1", "2", "3")
}
"work with PushStage" in {
val mySource = Source(List("1", "2", "3"))
//#loggingadapter
import akka.stream.stage._
class LoggingStage[T] extends PushStage[T, T] {
private val log = Logging(system, "loggingName")
override def onPush(elem: T, ctx: Context[T]): Directive = {
log.debug("Element flowing through: {}", elem)
ctx.push(elem)
}
override def onUpstreamFailure(cause: Throwable,
ctx: Context[T]): TerminationDirective = {
log.error(cause, "Upstream failed.")
super.onUpstreamFailure(cause, ctx)
}
override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = {
log.debug("Upstream finished")
super.onUpstreamFinish(ctx)
}
}
val loggedSource = mySource.transform(() => new LoggingStage)
//#loggingadapter
EventFilter.debug(start = "Element flowing").intercept {
loggedSource.runWith(Sink.ignore)
}
}
}
}

code/docs/stream/cookbook/RecipeManualTrigger.scala
@@ -0,0 +1,94 @@
package docs.stream.cookbook
import akka.stream.scaladsl._
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit.StreamTestKit.{ SubscriberProbe, PublisherProbe }
import scala.concurrent.duration._
class RecipeManualTrigger extends RecipeSpec {
"Recipe for triggering a stream manually" must {
"work" in {
val elements = Source(List("1", "2", "3", "4"))
val pub = PublisherProbe[Trigger]()
val sub = SubscriberProbe[Message]()
val triggerSource = Source(pub)
val sink = Sink(sub)
//#manually-triggered-stream
import FlowGraphImplicits._
val graph = FlowGraph { implicit builder =>
val zip = Zip[Message, Trigger]
elements ~> zip.left
triggerSource ~> zip.right
zip.out ~> Flow[(Message, Trigger)].map { case (msg, trigger) => msg } ~> sink
}
//#manually-triggered-stream
graph.run()
val manualSource = new StreamTestKit.AutoPublisher(pub)
sub.expectSubscription().request(1000)
sub.expectNoMsg(100.millis)
manualSource.sendNext(())
sub.expectNext("1")
sub.expectNoMsg(100.millis)
manualSource.sendNext(())
manualSource.sendNext(())
sub.expectNext("2")
sub.expectNext("3")
sub.expectNoMsg(100.millis)
manualSource.sendNext(())
sub.expectNext("4")
sub.expectComplete()
}
"work with ZipWith" in {
val elements = Source(List("1", "2", "3", "4"))
val pub = PublisherProbe[Trigger]()
val sub = SubscriberProbe[Message]()
val triggerSource = Source(pub)
val sink = Sink(sub)
//#manually-triggered-stream-zipwith
import FlowGraphImplicits._
val graph = FlowGraph { implicit builder =>
val zip = ZipWith[Message, Trigger, Message](
(msg: Message, trigger: Trigger) => msg)
elements ~> zip.left
triggerSource ~> zip.right
zip.out ~> sink
}
//#manually-triggered-stream-zipwith
graph.run()
val manualSource = new StreamTestKit.AutoPublisher(pub)
sub.expectSubscription().request(1000)
sub.expectNoMsg(100.millis)
manualSource.sendNext(())
sub.expectNext("1")
sub.expectNoMsg(100.millis)
manualSource.sendNext(())
manualSource.sendNext(())
sub.expectNext("2")
sub.expectNext("3")
sub.expectNoMsg(100.millis)
manualSource.sendNext(())
sub.expectNext("4")
sub.expectComplete()
}
}
}

code/docs/stream/cookbook/RecipeMissedTicks.scala
@@ -0,0 +1,53 @@
package docs.stream.cookbook
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit.StreamTestKit.{ SubscriberProbe, PublisherProbe }
import scala.concurrent.duration._
class RecipeMissedTicks extends RecipeSpec {
"Recipe for collecting missed ticks" must {
"work" in {
type Tick = Unit
val pub = PublisherProbe[Tick]()
val sub = SubscriberProbe[Int]()
val tickStream = Source(pub)
val sink = Sink(sub)
//#missed-ticks
// tickStream is a Source[Tick]
val missedTicks: Source[Int] =
tickStream.conflate(seed = (_) => 0)(
(missedTicks, tick) => missedTicks + 1)
//#missed-ticks
missedTicks.to(sink).run()
val manualSource = new StreamTestKit.AutoPublisher(pub)
manualSource.sendNext(())
manualSource.sendNext(())
manualSource.sendNext(())
manualSource.sendNext(())
val subscription = sub.expectSubscription()
subscription.request(1)
sub.expectNext(3)
subscription.request(1)
sub.expectNoMsg(100.millis)
manualSource.sendNext(())
sub.expectNext(0)
manualSource.sendComplete()
subscription.request(1)
sub.expectComplete()
}
}
}
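// A variant sketch of the same conflate: counting every tick, the delivered one
// included, just seeds the aggregate with 1 instead of 0.
object MissedTicksVariant {
  import akka.stream.scaladsl.Source

  def totalTicks(ticks: Source[Unit]): Source[Int] =
    ticks.conflate(seed = (_) => 1)((count, _) => count + 1)
}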

code/docs/stream/cookbook/RecipeMultiGroupBy.scala
@@ -0,0 +1,57 @@
package docs.stream.cookbook
import akka.stream.scaladsl.{ Sink, Source }
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
class RecipeMultiGroupBy extends RecipeSpec {
"Recipe for multi-groupBy" must {
"work" in {
case class Topic(name: String)
val elems = Source(List("1: a", "1: b", "all: c", "all: d", "1: e"))
val topicMapper: (Message) => immutable.Seq[Topic] = { msg =>
if (msg.startsWith("1")) List(Topic("1"))
else List(Topic("1"), Topic("2"))
}
class X {
//#multi-groupby
val topicMapper: (Message) => immutable.Seq[Topic] = ???
//#multi-groupby
}
//#multi-groupby
val messageAndTopic: Source[(Message, Topic)] = elems.mapConcat { msg: Message =>
val topicsForMessage = topicMapper(msg)
// Create a (Message, Topic) pair for each of the topics
// the message belongs to
topicsForMessage.map(msg -> _)
}
val multiGroups: Source[(Topic, Source[String])] = messageAndTopic.groupBy(_._2).map {
case (topic, topicStream) =>
// chopping off the topic from the (Message, Topic) pairs
(topic, topicStream.map(_._1))
}
//#multi-groupby
val result = multiGroups.map {
case (topic, topicMessages) => topicMessages.grouped(10).map(topic.name + _.mkString("[", ", ", "]")).runWith(Sink.head)
}.mapAsync(identity).grouped(10).runWith(Sink.head)
Await.result(result, 3.seconds).toSet should be(Set(
"1[1: a, 1: b, all: c, all: d, 1: e]",
"2[all: c, all: d]"))
}
}
}

code/docs/stream/cookbook/RecipeParseLines.scala
@@ -0,0 +1,87 @@
package docs.stream.cookbook
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.ByteString
import scala.annotation.tailrec
import scala.concurrent.Await
import scala.concurrent.duration._
class RecipeParseLines extends RecipeSpec {
"Recipe for parsing line from bytes" must {
"work" in {
val rawData = Source(List(
ByteString("Hello World"),
ByteString("\r"),
ByteString("!\r"),
ByteString("\nHello Akka!\r\nHello Streams!"),
ByteString("\r\n\r\n")))
import akka.stream.stage._
//#parse-lines
def parseLines(separator: String, maximumLineBytes: Int) =
new StatefulStage[ByteString, String] {
private val separatorBytes = ByteString(separator)
private val firstSeparatorByte = separatorBytes.head
private var buffer = ByteString.empty
private var nextPossibleMatch = 0
def initial = new State {
override def onPush(chunk: ByteString, ctx: Context[String]): Directive = {
buffer ++= chunk
if (buffer.size > maximumLineBytes)
ctx.fail(new IllegalStateException(s"Read ${buffer.size} bytes " +
s"which is more than $maximumLineBytes without seeing a line terminator"))
else emit(doParse(Vector.empty).iterator, ctx)
}
@tailrec
private def doParse(parsedLinesSoFar: Vector[String]): Vector[String] = {
val possibleMatchPos = buffer.indexOf(firstSeparatorByte, from = nextPossibleMatch)
if (possibleMatchPos == -1) {
// No matching character, we need to accumulate more bytes into the buffer
nextPossibleMatch = buffer.size
parsedLinesSoFar
} else {
if (possibleMatchPos + separatorBytes.size > buffer.size) {
// We have found a possible match (we found the first character of the terminator
// sequence), but we don't yet have enough bytes. We remember the position to
// retry from next time.
nextPossibleMatch = possibleMatchPos
parsedLinesSoFar
} else {
if (buffer.slice(possibleMatchPos, possibleMatchPos + separatorBytes.size)
== separatorBytes) {
// Found a match
val parsedLine = buffer.slice(0, possibleMatchPos).utf8String
buffer = buffer.drop(possibleMatchPos + separatorBytes.size)
nextPossibleMatch -= possibleMatchPos + separatorBytes.size
doParse(parsedLinesSoFar :+ parsedLine)
} else {
nextPossibleMatch += 1
doParse(parsedLinesSoFar)
}
}
}
}
}
}
val linesStream = rawData.transform(() => parseLines("\r\n", 100))
//#parse-lines
Await.result(linesStream.grouped(10).runWith(Sink.head), 3.seconds) should be(List(
"Hello World\r!",
"Hello Akka!",
"Hello Streams!",
""))
}
}
}
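// The stage is separator-agnostic; a sketch for plain LF framing with a larger
// line limit (hypothetical numbers, reusing parseLines from the test above):
//   val lfLines = rawData.transform(() => parseLines("\n", 4096))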

code/docs/stream/cookbook/RecipeReduceByKey.scala
@@ -0,0 +1,78 @@
package docs.stream.cookbook
import akka.stream.scaladsl._
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
class RecipeReduceByKey extends RecipeSpec {
"Reduce by key recipe" must {
"work with simple word count" in {
def words = Source(List("hello", "world", "and", "hello", "universe", "akka") ++ List.fill(1000)("rocks!"))
//#word-count
// split the words into separate streams first
val wordStreams: Source[(String, Source[String])] = words.groupBy(identity)
// add counting logic to the streams
val countedWords: Source[Future[(String, Int)]] = wordStreams.map {
case (word, wordStream) =>
wordStream.fold((word, 0)) {
case ((w, count), _) => (w, count + 1)
}
}
// get a stream of word counts
val counts: Source[(String, Int)] = countedWords.mapAsync(identity)
//#word-count
Await.result(counts.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set(
("hello", 2),
("world", 1),
("and", 1),
("universe", 1),
("akka", 1),
("rocks!", 1000)))
}
"work generalized" in {
def words = Source(List("hello", "world", "and", "hello", "universe", "akka") ++ List.fill(1000)("rocks!"))
//#reduce-by-key-general
def reduceByKey[In, K, Out](
groupKey: (In) => K,
foldZero: (K) => Out)(fold: (Out, In) => Out): Flow[In, (K, Out)] = {
val groupStreams = Flow[In].groupBy(groupKey)
val reducedValues = groupStreams.map {
case (key, groupStream) =>
groupStream.fold((key, foldZero(key))) {
case ((key, aggregated), elem) => (key, fold(aggregated, elem))
}
}
reducedValues.mapAsync(identity)
}
val wordCounts = words.via(reduceByKey(
groupKey = (word: String) => word,
foldZero = (key: String) => 0)(fold = (count: Int, elem: String) => count + 1))
//#reduce-by-key-general
Await.result(wordCounts.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set(
("hello", 2),
("world", 1),
("and", 1),
("universe", 1),
("akka", 1),
("rocks!", 1000)))
}
}
}
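// A second usage sketch for the generalized reduceByKey (hypothetical example,
// assuming non-empty words): summing word lengths per first character shows a
// fold that is more than a plain count.
//   val lengthPerInitial = words.via(reduceByKey(
//     groupKey = (word: String) => word.head,
//     foldZero = (_: Char) => 0)(fold = (sum: Int, word: String) => sum + word.length))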

code/docs/stream/cookbook/RecipeSimpleDrop.scala
@@ -0,0 +1,46 @@
package docs.stream.cookbook
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit.StreamTestKit.{ SubscriberProbe, PublisherProbe }
import scala.concurrent.duration._
class RecipeSimpleDrop extends RecipeSpec {
"Recipe for simply dropping elements for a faster stream" must {
"work" in {
//#simple-drop
val droppyStream: Flow[Message, Message] =
Flow[Message].conflate(seed = identity)((lastMessage, newMessage) => newMessage)
//#simple-drop
val pub = PublisherProbe[Message]()
val sub = SubscriberProbe[Message]()
val messageSource = Source(pub)
val sink = Sink(sub)
messageSource.via(droppyStream).to(sink).run()
val manualSource = new StreamTestKit.AutoPublisher(pub)
val subscription = sub.expectSubscription()
sub.expectNoMsg(100.millis)
manualSource.sendNext("1")
manualSource.sendNext("2")
manualSource.sendNext("3")
subscription.request(1)
sub.expectNext("3")
manualSource.sendComplete()
subscription.request(1)
sub.expectComplete()
}
}
}
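// Mirror-image sketch: keeping the *oldest* unconsumed message instead of the
// newest is the same conflate with the aggregate picking the other argument.
object DroppyStreamVariant {
  import akka.stream.scaladsl.Flow

  def keepOldest[T]: Flow[T, T] =
    Flow[T].conflate(seed = (m: T) => m)((lastMessage, newMessage) => lastMessage)
}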

code/docs/stream/cookbook/RecipeSpec.scala
@@ -0,0 +1,13 @@
package docs.stream.cookbook
import akka.stream.FlowMaterializer
import akka.stream.testkit.AkkaSpec
trait RecipeSpec extends AkkaSpec {
implicit val m = FlowMaterializer()
type Message = String
type Trigger = Unit
type Job = String
}

code/docs/stream/cookbook/RecipeToStrict.scala
@@ -0,0 +1,27 @@
package docs.stream.cookbook
import akka.stream.scaladsl.{ Sink, Source }
import scala.collection.immutable
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
class RecipeToStrict extends RecipeSpec {
"Recipe for draining a stream into a strict collection" must {
"work" in {
val myData = Source(List("1", "2", "3"))
val MaxAllowedSeqSize = 100
//#draining-to-seq
val strict: Future[immutable.Seq[Message]] =
myData.grouped(MaxAllowedSeqSize).runWith(Sink.head)
//#draining-to-seq
Await.result(strict, 3.seconds) should be(List("1", "2", "3"))
}
}
}

code/docs/stream/cookbook/RecipeWorkerPool.scala
@@ -0,0 +1,53 @@
package docs.stream.cookbook
import akka.stream.scaladsl._
import akka.testkit.TestProbe
import scala.concurrent.Await
import scala.concurrent.duration._
class RecipeWorkerPool extends RecipeSpec {
"Recipe for a pool of workers" must {
"work" in {
val myJobs = Source(List("1", "2", "3", "4", "5"))
type Result = String
val worker = Flow[String].map(_ + " done")
//#worker-pool
def balancer[In, Out](worker: Flow[In, Out], workerCount: Int): Flow[In, Out] = {
import FlowGraphImplicits._
Flow[In, Out]() { implicit graphBuilder =>
val jobsIn = UndefinedSource[In]
val resultsOut = UndefinedSink[Out]
val balancer = Balance[In](waitForAllDownstreams = true)
val merge = Merge[Out]("merge")
jobsIn ~> balancer // Jobs are fed into the balancer
merge ~> resultsOut // the merged results are sent out
for (_ <- 1 to workerCount) {
// for each worker, add an edge from the balancer to the worker, then wire
// it to the merge element
balancer ~> worker ~> merge
}
(jobsIn, resultsOut)
}
}
val processedJobs: Source[Result] = myJobs.via(balancer(worker, 3))
//#worker-pool
Await.result(processedJobs.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set(
"1 done", "2 done", "3 done", "4 done", "5 done"))
}
}
}
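// Sizing sketch: Balance(waitForAllDownstreams = true) holds back elements until
// every worker has subscribed, so early jobs are not all routed to the first
// worker; the worker count itself is just a parameter (hypothetical choice):
//   val parallelism = Runtime.getRuntime.availableProcessors()
//   val processed: Source[Result] = myJobs.via(balancer(worker, parallelism))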