+doc #19429 initial merge of docs-dev and docs

Konrad Malawski 2016-01-13 16:25:24 +01:00
parent be0c8af4c0
commit 5a18d43435
501 changed files with 9876 additions and 3681 deletions

View file

@@ -0,0 +1,96 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream
import scala.annotation.tailrec
import akka.actor.Props
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorPublisher
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.stream.testkit.AkkaSpec
object ActorPublisherDocSpec {
//#job-manager
object JobManager {
def props: Props = Props[JobManager]
final case class Job(payload: String)
case object JobAccepted
case object JobDenied
}
class JobManager extends ActorPublisher[JobManager.Job] {
import akka.stream.actor.ActorPublisherMessage._
import JobManager._
val MaxBufferSize = 100
var buf = Vector.empty[Job]
def receive = {
case job: Job if buf.size == MaxBufferSize =>
sender() ! JobDenied
case job: Job =>
sender() ! JobAccepted
if (buf.isEmpty && totalDemand > 0)
onNext(job)
else {
buf :+= job
deliverBuf()
}
case Request(_) =>
deliverBuf()
case Cancel =>
context.stop(self)
}
@tailrec final def deliverBuf(): Unit =
if (totalDemand > 0) {
/*
* totalDemand is a Long and could be larger than
* what buf.splitAt can accept
*/
if (totalDemand <= Int.MaxValue) {
val (use, keep) = buf.splitAt(totalDemand.toInt)
buf = keep
use foreach onNext
} else {
val (use, keep) = buf.splitAt(Int.MaxValue)
buf = keep
use foreach onNext
deliverBuf()
}
}
}
//#job-manager
}
class ActorPublisherDocSpec extends AkkaSpec {
import ActorPublisherDocSpec._
implicit val materializer = ActorMaterializer()
"illustrate usage of ActorPublisher" in {
def println(s: String): Unit =
testActor ! s
//#actor-publisher-usage
val jobManagerSource = Source.actorPublisher[JobManager.Job](JobManager.props)
val ref = Flow[JobManager.Job]
.map(_.payload.toUpperCase)
.map { elem => println(elem); elem }
.to(Sink.ignore)
.runWith(jobManagerSource)
ref ! JobManager.Job("a")
ref ! JobManager.Job("b")
ref ! JobManager.Job("c")
//#actor-publisher-usage
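// A sketch (not part of the docs markers): the actor can also be exposed
// as a Reactive Streams Publisher; a fresh instance is used here because
// an ActorPublisher supports at most one subscriber.
val ref2 = system.actorOf(JobManager.props)
val publisher = ActorPublisher[JobManager.Job](ref2)
Source.fromPublisher(publisher).map(_.payload).runWith(Sink.ignore)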
expectMsg("A")
expectMsg("B")
expectMsg("C")
}
}

View file

@@ -0,0 +1,89 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.routing.ActorRefRoutee
import akka.routing.RoundRobinRoutingLogic
import akka.routing.Router
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorSubscriber
import akka.stream.actor.ActorSubscriberMessage
import akka.stream.actor.MaxInFlightRequestStrategy
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.stream.testkit.AkkaSpec
object ActorSubscriberDocSpec {
//#worker-pool
object WorkerPool {
case class Msg(id: Int, replyTo: ActorRef)
case class Work(id: Int)
case class Reply(id: Int)
case class Done(id: Int)
def props: Props = Props(new WorkerPool)
}
class WorkerPool extends ActorSubscriber {
import WorkerPool._
import ActorSubscriberMessage._
val MaxQueueSize = 10
var queue = Map.empty[Int, ActorRef]
val router = {
val routees = Vector.fill(3) {
ActorRefRoutee(context.actorOf(Props[Worker]))
}
Router(RoundRobinRoutingLogic(), routees)
}
override val requestStrategy = new MaxInFlightRequestStrategy(max = MaxQueueSize) {
override def inFlightInternally: Int = queue.size
}
def receive = {
case OnNext(Msg(id, replyTo)) =>
queue += (id -> replyTo)
assert(queue.size <= MaxQueueSize, s"queued too many: ${queue.size}")
router.route(Work(id), self)
case Reply(id) =>
queue(id) ! Done(id)
queue -= id
}
}
class Worker extends Actor {
import WorkerPool._
def receive = {
case Work(id) =>
// ...
sender() ! Reply(id)
}
}
//#worker-pool
}
class ActorSubscriberDocSpec extends AkkaSpec {
import ActorSubscriberDocSpec._
implicit val materializer = ActorMaterializer()
"illustrate usage of ActorSubscriber" in {
val replyTo = testActor
//#actor-subscriber-usage
val N = 117
Source(1 to N).map(WorkerPool.Msg(_, replyTo))
.runWith(Sink.actorSubscriber(WorkerPool.props))
//#actor-subscriber-usage
receiveN(N).toSet should be((1 to N).map(WorkerPool.Done).toSet)
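// A sketch: request strategies other than MaxInFlightRequestStrategy exist,
// e.g. a watermark-based one (the value below is illustrative)
val alternativeStrategy = akka.stream.actor.WatermarkRequestStrategy(highWatermark = 10)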
}
}

View file

@@ -0,0 +1,176 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream
import akka.stream.testkit.AkkaSpec
import akka.stream.scaladsl._
import akka.stream._
import akka.util.ByteString
import java.nio.ByteOrder
import akka.stream.stage._
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.concurrent.Await
import org.scalactic.ConversionCheckedTripleEquals
object BidiFlowDocSpec {
//#codec
trait Message
case class Ping(id: Int) extends Message
case class Pong(id: Int) extends Message
//#codec-impl
def toBytes(msg: Message): ByteString = {
//#implementation-details-elided
implicit val order = ByteOrder.LITTLE_ENDIAN
msg match {
case Ping(id) => ByteString.newBuilder.putByte(1).putInt(id).result()
case Pong(id) => ByteString.newBuilder.putByte(2).putInt(id).result()
}
//#implementation-details-elided
}
def fromBytes(bytes: ByteString): Message = {
//#implementation-details-elided
implicit val order = ByteOrder.LITTLE_ENDIAN
val it = bytes.iterator
it.getByte match {
case 1 => Ping(it.getInt)
case 2 => Pong(it.getInt)
case other => throw new RuntimeException(s"parse error: expected 1|2 got $other")
}
//#implementation-details-elided
}
//#codec-impl
val codecVerbose = BidiFlow.fromGraph(GraphDSL.create() { b =>
// construct and add the top flow, going outbound
val outbound = b.add(Flow[Message].map(toBytes))
// construct and add the bottom flow, going inbound
val inbound = b.add(Flow[ByteString].map(fromBytes))
// fuse them together into a BidiShape
BidiShape.fromFlows(outbound, inbound)
})
// this is the same as the above
val codec = BidiFlow.fromFunctions(toBytes _, fromBytes _)
//#codec
//#framing
val framing = BidiFlow.fromGraph(GraphDSL.create() { b =>
implicit val order = ByteOrder.LITTLE_ENDIAN
def addLengthHeader(bytes: ByteString) = {
val len = bytes.length
ByteString.newBuilder.putInt(len).append(bytes).result()
}
class FrameParser extends PushPullStage[ByteString, ByteString] {
// this holds the received but not yet parsed bytes
var stash = ByteString.empty
// this holds the current message length or -1 if at a boundary
var needed = -1
override def onPush(bytes: ByteString, ctx: Context[ByteString]) = {
stash ++= bytes
run(ctx)
}
override def onPull(ctx: Context[ByteString]) = run(ctx)
override def onUpstreamFinish(ctx: Context[ByteString]) =
if (stash.isEmpty) ctx.finish()
else ctx.absorbTermination() // we still have bytes to emit
private def run(ctx: Context[ByteString]): SyncDirective =
if (needed == -1) {
// are we at a boundary? then figure out next length
if (stash.length < 4) pullOrFinish(ctx)
else {
needed = stash.iterator.getInt
stash = stash.drop(4)
run(ctx) // cycle back to possibly already emit the next chunk
}
} else if (stash.length < needed) {
// we are in the middle of a message, need more bytes
pullOrFinish(ctx)
} else {
// we have enough to emit at least one message, so do it
val emit = stash.take(needed)
stash = stash.drop(needed)
needed = -1
ctx.push(emit)
}
/*
* After having called absorbTermination() we cannot pull any more, so if we need
* more data we will just have to give up.
*/
private def pullOrFinish(ctx: Context[ByteString]) =
if (ctx.isFinishing) ctx.finish()
else ctx.pull()
}
val outbound = b.add(Flow[ByteString].map(addLengthHeader))
val inbound = b.add(Flow[ByteString].transform(() => new FrameParser))
BidiShape.fromFlows(outbound, inbound)
})
//#framing
val chopUp = BidiFlow.fromGraph(GraphDSL.create() { b =>
val f = Flow[ByteString].mapConcat(_.map(ByteString(_)))
BidiShape.fromFlows(b.add(f), b.add(f))
})
val accumulate = BidiFlow.fromGraph(GraphDSL.create() { b =>
val f = Flow[ByteString].grouped(1000).map(_.fold(ByteString.empty)(_ ++ _))
BidiShape.fromFlows(b.add(f), b.add(f))
})
}
class BidiFlowDocSpec extends AkkaSpec with ConversionCheckedTripleEquals {
import BidiFlowDocSpec._
implicit val materializer = ActorMaterializer()
"A BidiFlow" must {
"compose" in {
//#compose
/* construct protocol stack
 *         +------------------------------------+
 *         | stack                              |
 *         |                                    |
 *         |  +-------+            +---------+  |
 *    ~>   O~~o       |     ~>     |         o~~O    ~>
 * Message |  | codec | ByteString | framing |  | ByteString
 *    <~   O~~o       |     <~     |         o~~O    <~
 *         |  +-------+            +---------+  |
 *         +------------------------------------+
 */
val stack = codec.atop(framing)
// test it by plugging it into its own inverse and closing the right end
val pingpong = Flow[Message].collect { case Ping(id) => Pong(id) }
val flow = stack.atop(stack.reversed).join(pingpong)
val result = Source((0 to 9).map(Ping)).via(flow).grouped(20).runWith(Sink.head)
Await.result(result, 1.second) should ===((0 to 9).map(Pong))
//#compose
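// A sketch: the same stack could be joined with a real transport instead
// of its own inverse, e.g. a TCP connection (endpoint illustrative, not
// exercised by this test)
def overTcp = stack.join(Tcp().outgoingConnection("localhost", 8080))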
}
"work when chopped up" in {
val stack = codec.atop(framing)
val flow = stack.atop(chopUp).atop(stack.reversed).join(Flow[Message].map { case Ping(id) => Pong(id) })
val f = Source((0 to 9).map(Ping)).via(flow).grouped(20).runWith(Sink.head)
Await.result(f, 1.second) should ===((0 to 9).map(Pong))
}
"work when accumulated" in {
val stack = codec.atop(framing)
val flow = stack.atop(accumulate).atop(stack.reversed).join(Flow[Message].map { case Ping(id) => Pong(id) })
val f = Source((0 to 9).map(Ping)).via(flow).grouped(20).runWith(Sink.head)
Await.result(f, 1.second) should ===((0 to 9).map(Pong))
}
}
}

View file

@@ -0,0 +1,249 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream
import akka.stream._
import akka.stream.scaladsl.Tcp.OutgoingConnection
import akka.stream.scaladsl._
import akka.stream.testkit.AkkaSpec
import akka.util.ByteString
import scala.concurrent.{ Future, Promise }
class CompositionDocSpec extends AkkaSpec {
implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer()
"nonnested flow" in {
//#non-nested-flow
Source.single(0)
.map(_ + 1)
.filter(_ != 0)
.map(_ - 2)
.to(Sink.fold(0)(_ + _))
// ... where is the nesting?
//#non-nested-flow
}
"nested flow" in {
//#nested-flow
val nestedSource =
Source.single(0) // An atomic source
.map(_ + 1) // an atomic processing stage
.named("nestedSource") // wraps up the current Source and gives it a name
val nestedFlow =
Flow[Int].filter(_ != 0) // an atomic processing stage
.map(_ - 2) // another atomic processing stage
.named("nestedFlow") // wraps up the Flow, and gives it a name
val nestedSink =
nestedFlow.to(Sink.fold(0)(_ + _)) // wire an atomic sink to the nestedFlow
.named("nestedSink") // wrap it up
// Create a RunnableGraph
val runnableGraph = nestedSource.to(nestedSink)
//#nested-flow
}
"reusing components" in {
val nestedSource =
Source.single(0) // An atomic source
.map(_ + 1) // an atomic processing stage
.named("nestedSource") // wraps up the current Source and gives it a name
val nestedFlow =
Flow[Int].filter(_ != 0) // an atomic processing stage
.map(_ - 2) // another atomic processing stage
.named("nestedFlow") // wraps up the Flow, and gives it a name
val nestedSink =
nestedFlow.to(Sink.fold(0)(_ + _)) // wire an atomic sink to the nestedFlow
.named("nestedSink") // wrap it up
//#reuse
// Create a RunnableGraph from our components
val runnableGraph = nestedSource.to(nestedSink)
// Usage is uniform, no matter if modules are composite or atomic
val runnableGraph2 = Source.single(0).to(Sink.fold(0)(_ + _))
//#reuse
}
"complex graph" in {
// format: OFF
//#complex-graph
import GraphDSL.Implicits._
RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
val A: Outlet[Int] = builder.add(Source.single(0)).out
val B: UniformFanOutShape[Int, Int] = builder.add(Broadcast[Int](2))
val C: UniformFanInShape[Int, Int] = builder.add(Merge[Int](2))
val D: FlowShape[Int, Int] = builder.add(Flow[Int].map(_ + 1))
val E: UniformFanOutShape[Int, Int] = builder.add(Balance[Int](2))
val F: UniformFanInShape[Int, Int] = builder.add(Merge[Int](2))
val G: Inlet[Any] = builder.add(Sink.foreach(println)).in
C <~ F
A ~> B ~> C ~> F
B ~> D ~> E ~> F
E ~> G
ClosedShape
})
//#complex-graph
//#complex-graph-alt
import GraphDSL.Implicits._
RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
val B = builder.add(Broadcast[Int](2))
val C = builder.add(Merge[Int](2))
val E = builder.add(Balance[Int](2))
val F = builder.add(Merge[Int](2))
Source.single(0) ~> B.in; B.out(0) ~> C.in(1); C.out ~> F.in(0)
C.in(0) <~ F.out
B.out(1).map(_ + 1) ~> E.in; E.out(0) ~> F.in(1)
E.out(1) ~> Sink.foreach(println)
ClosedShape
})
//#complex-graph-alt
// format: ON
}
"partial graph" in {
// format: OFF
//#partial-graph
import GraphDSL.Implicits._
val partial = GraphDSL.create() { implicit builder =>
val B = builder.add(Broadcast[Int](2))
val C = builder.add(Merge[Int](2))
val E = builder.add(Balance[Int](2))
val F = builder.add(Merge[Int](2))
C <~ F
B ~> C ~> F
B ~> Flow[Int].map(_ + 1) ~> E ~> F
FlowShape(B.in, E.out(1))
}.named("partial")
//#partial-graph
// format: ON
//#partial-use
Source.single(0).via(partial).to(Sink.ignore)
//#partial-use
// format: OFF
//#partial-flow-dsl
// Convert the partial graph of FlowShape to a Flow to get
// access to the fluid DSL (for example to be able to call .filter())
val flow = Flow.fromGraph(partial)
// Simple way to create a graph backed Source
val source = Source.fromGraph( GraphDSL.create() { implicit builder =>
val merge = builder.add(Merge[Int](2))
Source.single(0) ~> merge
Source(List(2, 3, 4)) ~> merge
// Exposing exactly one output port
SourceShape(merge.out)
})
// Building a Sink with a nested Flow, using the fluid DSL
val sink = {
val nestedFlow = Flow[Int].map(_ * 2).drop(10).named("nestedFlow")
nestedFlow.to(Sink.head)
}
// Putting all together
val closed = source.via(flow.filter(_ > 1)).to(sink)
//#partial-flow-dsl
// format: ON
}
"closed graph" in {
//#embed-closed
val closed1 = Source.single(0).to(Sink.foreach(println))
val closed2 = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
val embeddedClosed: ClosedShape = builder.add(closed1)
// ...
embeddedClosed
})
//#embed-closed
}
"materialized values" in {
//#mat-combine-1
// Materializes to Promise[Option[Int]] (red)
val source: Source[Int, Promise[Option[Int]]] = Source.maybe[Int]
// Materializes to Unit (black)
val flow1: Flow[Int, Int, Unit] = Flow[Int].take(100)
// Materializes to Promise[Int] (red)
val nestedSource: Source[Int, Promise[Option[Int]]] =
source.viaMat(flow1)(Keep.left).named("nestedSource")
//#mat-combine-1
//#mat-combine-2
// Materializes to Unit (orange)
val flow2: Flow[Int, ByteString, Unit] = Flow[Int].map { i => ByteString(i.toString) }
// Materializes to Future[OutgoingConnection] (yellow)
val flow3: Flow[ByteString, ByteString, Future[OutgoingConnection]] =
Tcp().outgoingConnection("localhost", 8080)
// Materializes to Future[OutgoingConnection] (yellow)
val nestedFlow: Flow[Int, ByteString, Future[OutgoingConnection]] =
flow2.viaMat(flow3)(Keep.right).named("nestedFlow")
//#mat-combine-2
//#mat-combine-3
// Materializes to Future[String] (green)
val sink: Sink[ByteString, Future[String]] = Sink.fold("")(_ + _.utf8String)
// Materializes to (Future[OutgoingConnection], Future[String]) (blue)
val nestedSink: Sink[Int, (Future[OutgoingConnection], Future[String])] =
nestedFlow.toMat(sink)(Keep.both)
//#mat-combine-3
//#mat-combine-4
case class MyClass(private val p: Promise[Option[Int]], conn: OutgoingConnection) {
def close() = p.trySuccess(None)
}
def f(p: Promise[Option[Int]],
rest: (Future[OutgoingConnection], Future[String])): Future[MyClass] = {
val connFuture = rest._1
connFuture.map(MyClass(p, _))
}
// Materializes to Future[MyClass] (purple)
val runnableGraph: RunnableGraph[Future[MyClass]] =
nestedSource.toMat(nestedSink)(f)
//#mat-combine-4
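// A sketch: materializing would attempt the TCP connection above, so the
// run is only illustrated here, not executed
def materialize(): Future[MyClass] = runnableGraph.run()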
}
"attributes" in {
//#attributes-inheritance
import Attributes._
val nestedSource =
Source.single(0)
.map(_ + 1)
.named("nestedSource") // Wrap, no inputBuffer set
val nestedFlow =
Flow[Int].filter(_ != 0)
.via(Flow[Int].map(_ - 2).withAttributes(inputBuffer(4, 4))) // override
.named("nestedFlow") // Wrap, no inputBuffer set
val nestedSink =
nestedFlow.to(Sink.fold(0)(_ + _)) // wire an atomic sink to the nestedFlow
.withAttributes(name("nestedSink") and inputBuffer(3, 3)) // override
//#attributes-inheritance
}
}

View file

@@ -0,0 +1,248 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream
import akka.actor.Cancellable
import akka.stream.{ ClosedShape, FlowShape }
import akka.stream.scaladsl._
import akka.stream.testkit.AkkaSpec
import scala.concurrent.{ Promise, Future }
class FlowDocSpec extends AkkaSpec {
implicit val ec = system.dispatcher
//#imports
import akka.stream.ActorMaterializer
//#imports
implicit val materializer = ActorMaterializer()
"source is immutable" in {
//#source-immutable
val source = Source(1 to 10)
source.map(_ => 0) // has no effect on source, since it's immutable
source.runWith(Sink.fold(0)(_ + _)) // 55
val zeroes = source.map(_ => 0) // returns new Source[Int], with `map()` appended
zeroes.runWith(Sink.fold(0)(_ + _)) // 0
//#source-immutable
}
"materialization in steps" in {
//#materialization-in-steps
val source = Source(1 to 10)
val sink = Sink.fold[Int, Int](0)(_ + _)
// connect the Source to the Sink, obtaining a RunnableGraph
val runnable: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right)
// materialize the flow and get the value of the FoldSink
val sum: Future[Int] = runnable.run()
//#materialization-in-steps
}
"materialization runWith" in {
//#materialization-runWith
val source = Source(1 to 10)
val sink = Sink.fold[Int, Int](0)(_ + _)
// materialize the flow, getting the Sinks materialized value
val sum: Future[Int] = source.runWith(sink)
//#materialization-runWith
}
"materialization is unique" in {
//#stream-reuse
// connect the Source to the Sink, obtaining a RunnableGraph
val sink = Sink.fold[Int, Int](0)(_ + _)
val runnable: RunnableGraph[Future[Int]] =
Source(1 to 10).toMat(sink)(Keep.right)
// get the materialized value of the FoldSink
val sum1: Future[Int] = runnable.run()
val sum2: Future[Int] = runnable.run()
// sum1 and sum2 are different Futures!
//#stream-reuse
}
"compound source cannot be used as key" in {
// FIXME #16902 This example is now turned around
// The WRONG case has been switched
//#compound-source-is-not-keyed-runWith
import scala.concurrent.duration._
case object Tick
val timer = Source.tick(initialDelay = 1.second, interval = 1.seconds, tick = () => Tick)
val timerCancel: Cancellable = Sink.ignore.runWith(timer)
timerCancel.cancel()
val timerMap = timer.map(tick => "tick")
// materialize the flow and retrieve the timer's Cancellable
val timerCancellable = Sink.ignore.runWith(timerMap)
timerCancellable.cancel()
//#compound-source-is-not-keyed-runWith
//#compound-source-is-not-keyed-run
val timerCancellable2 = timerMap.to(Sink.ignore).run()
timerCancellable2.cancel()
//#compound-source-is-not-keyed-run
}
"creating sources, sinks" in {
//#source-sink
// Create a source from an Iterable
Source(List(1, 2, 3))
// Create a source from a Future
Source.fromFuture(Future.successful("Hello Streams!"))
// Create a source from a single element
Source.single("only one element")
// an empty source
Source.empty
// Sink that folds over the stream and returns a Future
// of the final result as its materialized value
Sink.fold[Int, Int](0)(_ + _)
// Sink that returns a Future as its materialized value,
// containing the first element of the stream
Sink.head
// A Sink that consumes a stream without doing anything with the elements
Sink.ignore
// A Sink that executes a side-effecting call for every element of the stream
Sink.foreach[String](println(_))
//#source-sink
}
"various ways of connecting source, sink, flow" in {
//#flow-connecting
// Explicitly creating and wiring up a Source, Sink and Flow
Source(1 to 6).via(Flow[Int].map(_ * 2)).to(Sink.foreach(println(_)))
// Starting from a Source
val source = Source(1 to 6).map(_ * 2)
source.to(Sink.foreach(println(_)))
// Starting from a Sink
val sink: Sink[Int, Unit] = Flow[Int].map(_ * 2).to(Sink.foreach(println(_)))
Source(1 to 6).to(sink)
// Broadcast to a sink inline
val otherSink: Sink[Int, Unit] =
Flow[Int].alsoTo(Sink.foreach(println(_))).to(Sink.ignore)
Source(1 to 6).to(otherSink)
//#flow-connecting
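// A sketch: a Flow can also be assembled from a Sink and a Source placed
// back-to-back; the two sides are not connected to each other internally
val sinkAndSource: Flow[Int, String, Unit] =
  Flow.fromSinkAndSource(Sink.foreach[Int](println(_)), Source.single("done"))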
}
"various ways of transforming materialized values" in {
import scala.concurrent.duration._
val throttler = Flow.fromGraph(GraphDSL.create(Source.tick(1.second, 1.second, "test")) { implicit builder =>
tickSource =>
import GraphDSL.Implicits._
val zip = builder.add(ZipWith[String, Int, Int](Keep.right))
tickSource ~> zip.in0
FlowShape(zip.in1, zip.out)
})
//#flow-mat-combine
// A source that can be signalled explicitly from the outside
val source: Source[Int, Promise[Option[Int]]] = Source.maybe[Int]
// A flow that internally throttles elements to 1/second, and returns a Cancellable
// which can be used to shut down the stream
val flow: Flow[Int, Int, Cancellable] = throttler
// A sink that returns the first element of a stream in the returned Future
val sink: Sink[Int, Future[Int]] = Sink.head[Int]
// By default, the materialized value of the leftmost stage is preserved
val r1: RunnableGraph[Promise[Option[Int]]] = source.via(flow).to(sink)
// Simple selection of materialized values by using Keep.right
val r2: RunnableGraph[Cancellable] = source.viaMat(flow)(Keep.right).to(sink)
val r3: RunnableGraph[Future[Int]] = source.via(flow).toMat(sink)(Keep.right)
// Using runWith will always give the materialized values of the stages added
// by runWith() itself
val r4: Future[Int] = source.via(flow).runWith(sink)
val r5: Promise[Option[Int]] = flow.to(sink).runWith(source)
val r6: (Promise[Option[Int]], Future[Int]) = flow.runWith(source, sink)
// Using more complex combinations
val r7: RunnableGraph[(Promise[Option[Int]], Cancellable)] =
source.viaMat(flow)(Keep.both).to(sink)
val r8: RunnableGraph[(Promise[Option[Int]], Future[Int])] =
source.via(flow).toMat(sink)(Keep.both)
val r9: RunnableGraph[((Promise[Option[Int]], Cancellable), Future[Int])] =
source.viaMat(flow)(Keep.both).toMat(sink)(Keep.both)
val r10: RunnableGraph[(Cancellable, Future[Int])] =
source.viaMat(flow)(Keep.right).toMat(sink)(Keep.both)
// It is also possible to map over the materialized values. In r9 we had a
// doubly nested pair, but we want to flatten it out
val r11: RunnableGraph[(Promise[Option[Int]], Cancellable, Future[Int])] =
r9.mapMaterializedValue {
case ((promise, cancellable), future) =>
(promise, cancellable, future)
}
// Now we can use pattern matching to get the resulting materialized values
val (promise, cancellable, future) = r11.run()
// Type inference works as expected
promise.success(None)
cancellable.cancel()
future.map(_ + 3)
// The result of r11 can be also achieved by using the Graph API
val r12: RunnableGraph[(Promise[Option[Int]], Cancellable, Future[Int])] =
RunnableGraph.fromGraph(GraphDSL.create(source, flow, sink)((_, _, _)) { implicit builder =>
(src, f, dst) =>
import GraphDSL.Implicits._
src ~> f ~> dst
ClosedShape
})
//#flow-mat-combine
}
"explicit fusing" in {
//#explicit-fusing
import akka.stream.Fusing
val flow = Flow[Int].map(_ * 2).filter(_ > 500)
val fused = Fusing.aggressive(flow)
Source.fromIterator { () => Iterator from 0 }
.via(fused)
.take(1000)
//#explicit-fusing
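// the pre-fused graph can be materialized many times; doing the fusing
// once up front avoids repeating that work on every run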
}
"defining asynchronous boundaries" in {
//#flow-async
import akka.stream.Attributes.asyncBoundary
Source(List(1, 2, 3))
.map(_ + 1)
.withAttributes(asyncBoundary)
.map(_ * 2)
.to(Sink.ignore)
//#flow-async
}
}

View file

@@ -0,0 +1,92 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream
import scala.concurrent.Await
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import akka.stream.Supervision
import akka.stream.scaladsl._
import akka.stream.testkit.AkkaSpec
import akka.stream.Attributes
import akka.stream.ActorAttributes
import scala.concurrent.duration._
class FlowErrorDocSpec extends AkkaSpec {
"demonstrate fail stream" in {
//#stop
implicit val materializer = ActorMaterializer()
val source = Source(0 to 5).map(100 / _)
val result = source.runWith(Sink.fold(0)(_ + _))
// division by zero will fail the stream and the
// result here will be a Future completed with Failure(ArithmeticException)
//#stop
intercept[ArithmeticException] {
Await.result(result, 3.seconds)
}
}
"demonstrate resume stream" in {
//#resume
val decider: Supervision.Decider = {
case _: ArithmeticException => Supervision.Resume
case _ => Supervision.Stop
}
implicit val materializer = ActorMaterializer(
ActorMaterializerSettings(system).withSupervisionStrategy(decider))
val source = Source(0 to 5).map(100 / _)
val result = source.runWith(Sink.fold(0)(_ + _))
// the element causing division by zero will be dropped
// result here will be a Future completed with Success(228)
//#resume
Await.result(result, 3.seconds) should be(228)
}
"demonstrate resume section" in {
//#resume-section
implicit val materializer = ActorMaterializer()
val decider: Supervision.Decider = {
case _: ArithmeticException => Supervision.Resume
case _ => Supervision.Stop
}
val flow = Flow[Int]
.filter(100 / _ < 50).map(elem => 100 / (5 - elem))
.withAttributes(ActorAttributes.supervisionStrategy(decider))
val source = Source(0 to 5).via(flow)
val result = source.runWith(Sink.fold(0)(_ + _))
// the elements causing division by zero will be dropped
// result here will be a Future completed with Success(150)
//#resume-section
Await.result(result, 3.seconds) should be(150)
}
"demonstrate restart section" in {
//#restart-section
implicit val materializer = ActorMaterializer()
val decider: Supervision.Decider = {
case _: IllegalArgumentException => Supervision.Restart
case _ => Supervision.Stop
}
val flow = Flow[Int]
.scan(0) { (acc, elem) =>
if (elem < 0) throw new IllegalArgumentException("negative not allowed")
else acc + elem
}
.withAttributes(ActorAttributes.supervisionStrategy(decider))
val source = Source(List(1, 3, -1, 5, 7)).via(flow)
val result = source.grouped(1000).runWith(Sink.head)
// the negative element cause the scan stage to be restarted,
// i.e. start from 0 again
// result here will be a Future completed with Success(Vector(0, 1, 4, 0, 5, 12))
//#restart-section
Await.result(result, 3.seconds) should be(Vector(0, 1, 4, 0, 5, 12))
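// A sketch: a failure can instead be turned into a final element with
// recover, as an alternative to a supervision decider (illustrative)
val recovered = Source(0 to 5).map(100 / _)
  .recover { case _: ArithmeticException => -1 }
  .runWith(Sink.head)
Await.result(recovered, 3.seconds) should be(-1)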
}
}

View file

@@ -0,0 +1,233 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.testkit.AkkaSpec
import scala.collection.immutable
import scala.concurrent.{ Future, Await }
import scala.concurrent.duration._
import akka.stream.Attributes
class FlowGraphDocSpec extends AkkaSpec {
implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer()
"build simple graph" in {
//format: OFF
//#simple-flow-graph
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[Unit] =>
import GraphDSL.Implicits._
val in = Source(1 to 10)
val out = Sink.ignore
val bcast = builder.add(Broadcast[Int](2))
val merge = builder.add(Merge[Int](2))
val f1, f2, f3, f4 = Flow[Int].map(_ + 10)
in ~> f1 ~> bcast ~> f2 ~> merge ~> f3 ~> out
bcast ~> f4 ~> merge
ClosedShape
})
//#simple-flow-graph
//format: ON
//#simple-graph-run
g.run()
//#simple-graph-run
}
"flow connection errors" in {
intercept[IllegalArgumentException] {
//#simple-graph
RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val source1 = Source(1 to 10)
val source2 = Source(1 to 10)
val zip = builder.add(Zip[Int, Int]())
source1 ~> zip.in0
source2 ~> zip.in1
// unconnected zip.out (!) => "must have at least 1 outgoing edge"
ClosedShape
})
//#simple-graph
}.getMessage should include("ZipWith2.out")
}
"reusing a flow in a graph" in {
//#flow-graph-reusing-a-flow
val topHeadSink = Sink.head[Int]
val bottomHeadSink = Sink.head[Int]
val sharedDoubler = Flow[Int].map(_ * 2)
//#flow-graph-reusing-a-flow
// format: OFF
val g =
//#flow-graph-reusing-a-flow
RunnableGraph.fromGraph(GraphDSL.create(topHeadSink, bottomHeadSink)((_, _)) { implicit builder =>
(topHS, bottomHS) =>
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[Int](2))
Source.single(1) ~> broadcast.in
broadcast.out(0) ~> sharedDoubler ~> topHS.in
broadcast.out(1) ~> sharedDoubler ~> bottomHS.in
ClosedShape
})
//#flow-graph-reusing-a-flow
// format: ON
val (topFuture, bottomFuture) = g.run()
Await.result(topFuture, 300.millis) shouldEqual 2
Await.result(bottomFuture, 300.millis) shouldEqual 2
}
"building a reusable component" in {
//#flow-graph-components-shape
// A shape represents the input and output ports of a reusable
// processing module
case class PriorityWorkerPoolShape[In, Out](
jobsIn: Inlet[In],
priorityJobsIn: Inlet[In],
resultsOut: Outlet[Out]) extends Shape {
// It is important to provide the list of all input and output
// ports with a stable order. Duplicates are not allowed.
override val inlets: immutable.Seq[Inlet[_]] =
jobsIn :: priorityJobsIn :: Nil
override val outlets: immutable.Seq[Outlet[_]] =
resultsOut :: Nil
// A Shape must be able to create a copy of itself. Basically
// it means a new instance with copies of the ports
override def deepCopy() = PriorityWorkerPoolShape(
jobsIn.carbonCopy(),
priorityJobsIn.carbonCopy(),
resultsOut.carbonCopy())
// A Shape must also be able to create itself from existing ports
override def copyFromPorts(
inlets: immutable.Seq[Inlet[_]],
outlets: immutable.Seq[Outlet[_]]) = {
assert(inlets.size == this.inlets.size)
assert(outlets.size == this.outlets.size)
// This is why order matters when overriding inlets and outlets.
PriorityWorkerPoolShape[In, Out](inlets(0).as[In], inlets(1).as[In], outlets(0).as[Out])
}
}
//#flow-graph-components-shape
//#flow-graph-components-create
object PriorityWorkerPool {
def apply[In, Out](
worker: Flow[In, Out, Any],
workerCount: Int): Graph[PriorityWorkerPoolShape[In, Out], Unit] = {
GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val priorityMerge = b.add(MergePreferred[In](1))
val balance = b.add(Balance[In](workerCount))
val resultsMerge = b.add(Merge[Out](workerCount))
// After merging priority and ordinary jobs, we feed them to the balancer
priorityMerge ~> balance
// Wire up each of the outputs of the balancer to a worker flow
// then merge them back
for (i <- 0 until workerCount)
balance.out(i) ~> worker ~> resultsMerge.in(i)
// We now expose the input ports of the priorityMerge and the output
// of the resultsMerge as our PriorityWorkerPool ports
// -- all neatly wrapped in our domain specific Shape
PriorityWorkerPoolShape(
jobsIn = priorityMerge.in(0),
priorityJobsIn = priorityMerge.preferred,
resultsOut = resultsMerge.out)
}
}
}
//#flow-graph-components-create
def println(s: Any): Unit = ()
//#flow-graph-components-use
val worker1 = Flow[String].map("step 1 " + _)
val worker2 = Flow[String].map("step 2 " + _)
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val priorityPool1 = b.add(PriorityWorkerPool(worker1, 4))
val priorityPool2 = b.add(PriorityWorkerPool(worker2, 2))
Source(1 to 100).map("job: " + _) ~> priorityPool1.jobsIn
Source(1 to 100).map("priority job: " + _) ~> priorityPool1.priorityJobsIn
priorityPool1.resultsOut ~> priorityPool2.jobsIn
Source(1 to 100).map("one-step, priority " + _) ~> priorityPool2.priorityJobsIn
priorityPool2.resultsOut ~> Sink.foreach(println)
ClosedShape
}).run()
//#flow-graph-components-use
//#flow-graph-components-shape2
import FanInShape.Name
import FanInShape.Init
class PriorityWorkerPoolShape2[In, Out](_init: Init[Out] = Name("PriorityWorkerPool"))
extends FanInShape[Out](_init) {
protected override def construct(i: Init[Out]) = new PriorityWorkerPoolShape2(i)
val jobsIn = newInlet[In]("jobsIn")
val priorityJobsIn = newInlet[In]("priorityJobsIn")
// Outlet[Out] with name "out" is automatically created
}
//#flow-graph-components-shape2
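// A sketch: the FanInShape base class supplies the inlets/outlets list,
// deepCopy and copyFromPorts, so only the ports need declaring; an
// instance is created like any other shape:
val shape2 = new PriorityWorkerPoolShape2[String, String]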
}
"access to materialized value" in {
//#flow-graph-matvalue
import GraphDSL.Implicits._
val foldFlow: Flow[Int, Int, Future[Int]] = Flow.fromGraph(GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) {
implicit builder =>
fold =>
FlowShape(fold.in, builder.materializedValue.mapAsync(4)(identity).outlet)
})
//#flow-graph-matvalue
Await.result(Source(1 to 10).via(foldFlow).runWith(Sink.head), 3.seconds) should ===(55)
//#flow-graph-matvalue-cycle
import GraphDSL.Implicits._
// This cannot produce any value:
val cyclicFold: Source[Int, Future[Int]] = Source.fromGraph(GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) {
implicit builder =>
fold =>
// - Fold cannot complete until its upstream mapAsync completes
// - mapAsync cannot complete until the materialized Future produced by
// fold completes
// As a result this Source will never emit anything, and its materialized
// Future will never complete
builder.materializedValue.mapAsync(4)(identity) ~> fold
SourceShape(builder.materializedValue.mapAsync(4)(identity).outlet)
})
//#flow-graph-matvalue-cycle
}
}

View file

@@ -0,0 +1,110 @@
package docs.stream
import akka.stream.FlowShape
import akka.stream.scaladsl.{ GraphDSL, Merge, Balance, Source, Flow }
import akka.stream.testkit.AkkaSpec
class FlowParallelismDocSpec extends AkkaSpec {
import GraphDSL.Implicits._
case class ScoopOfBatter()
case class HalfCookedPancake()
case class Pancake()
//format: OFF
//#pipelining
// Takes a scoop of batter and creates a pancake with one side cooked
val fryingPan1: Flow[ScoopOfBatter, HalfCookedPancake, Unit] =
Flow[ScoopOfBatter].map { batter => HalfCookedPancake() }
// Finishes a half-cooked pancake
val fryingPan2: Flow[HalfCookedPancake, Pancake, Unit] =
Flow[HalfCookedPancake].map { halfCooked => Pancake() }
//#pipelining
//format: ON
"Demonstrate pipelining" in {
//#pipelining
// With the two frying pans we can fully cook pancakes
val pancakeChef: Flow[ScoopOfBatter, Pancake, Unit] =
Flow[ScoopOfBatter].via(fryingPan1).via(fryingPan2)
//#pipelining
}
"Demonstrate parallel processing" in {
//#parallelism
val fryingPan: Flow[ScoopOfBatter, Pancake, Unit] =
Flow[ScoopOfBatter].map { batter => Pancake() }
val pancakeChef: Flow[ScoopOfBatter, Pancake, Unit] = Flow.fromGraph(GraphDSL.create() { implicit builder =>
val dispatchBatter = builder.add(Balance[ScoopOfBatter](2))
val mergePancakes = builder.add(Merge[Pancake](2))
// Using two frying pans in parallel, both fully cooking a pancake from the batter.
// We always put the next scoop of batter to the first frying pan that becomes available.
dispatchBatter.out(0) ~> fryingPan ~> mergePancakes.in(0)
// Notice that we used the "fryingPan" flow without importing it via builder.add().
// Flows used this way are auto-imported, which in this case means that the two
// uses of "fryingPan" mean actually different stages in the graph.
dispatchBatter.out(1) ~> fryingPan ~> mergePancakes.in(1)
FlowShape(dispatchBatter.in, mergePancakes.out)
})
//#parallelism
}
"Demonstrate parallelized pipelines" in {
//#parallel-pipeline
val pancakeChef: Flow[ScoopOfBatter, Pancake, Unit] =
Flow.fromGraph(GraphDSL.create() { implicit builder =>
val dispatchBatter = builder.add(Balance[ScoopOfBatter](2))
val mergePancakes = builder.add(Merge[Pancake](2))
// Using two pipelines, having two frying pans each, in total using
// four frying pans
dispatchBatter.out(0) ~> fryingPan1 ~> fryingPan2 ~> mergePancakes.in(0)
dispatchBatter.out(1) ~> fryingPan1 ~> fryingPan2 ~> mergePancakes.in(1)
FlowShape(dispatchBatter.in, mergePancakes.out)
})
//#parallel-pipeline
}
"Demonstrate pipelined parallel processing" in {
//#pipelined-parallel
val pancakeChefs1: Flow[ScoopOfBatter, HalfCookedPancake, Unit] =
Flow.fromGraph(GraphDSL.create() { implicit builder =>
val dispatchBatter = builder.add(Balance[ScoopOfBatter](2))
val mergeHalfPancakes = builder.add(Merge[HalfCookedPancake](2))
// Two chefs work with one frying pan for each, half-frying the pancakes then putting
// them into a common pool
dispatchBatter.out(0) ~> fryingPan1 ~> mergeHalfPancakes.in(0)
dispatchBatter.out(1) ~> fryingPan1 ~> mergeHalfPancakes.in(1)
FlowShape(dispatchBatter.in, mergeHalfPancakes.out)
})
val pancakeChefs2: Flow[HalfCookedPancake, Pancake, Unit] =
Flow.fromGraph(GraphDSL.create() { implicit builder =>
val dispatchHalfPancakes = builder.add(Balance[HalfCookedPancake](2))
val mergePancakes = builder.add(Merge[Pancake](2))
// Two chefs work with one frying pan for each, finishing the pancakes then putting
// them into a common pool
dispatchHalfPancakes.out(0) ~> fryingPan2 ~> mergePancakes.in(0)
dispatchHalfPancakes.out(1) ~> fryingPan2 ~> mergePancakes.in(1)
FlowShape(dispatchHalfPancakes.in, mergePancakes.out)
})
val kitchen: Flow[ScoopOfBatter, Pancake, Unit] = pancakeChefs1.via(pancakeChefs2)
//#pipelined-parallel
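// A sketch of a generic helper built from the same Balance/Merge pattern;
// the name and signature are illustrative, not part of the docs
def parallelize[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, Unit] =
  Flow.fromGraph(GraphDSL.create() { implicit builder =>
    val dispatch = builder.add(Balance[In](workerCount))
    val merge = builder.add(Merge[Out](workerCount))
    for (i <- 0 until workerCount)
      dispatch.out(i) ~> worker ~> merge.in(i)
    FlowShape(dispatch.in, merge.out)
  })
val kitchen4: Flow[ScoopOfBatter, Pancake, Unit] =
  parallelize(fryingPan1.via(fryingPan2), 4)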
}
}

View file

@@ -0,0 +1,192 @@
package docs.stream
import akka.stream._
import akka.stream.scaladsl.{ Sink, Source, Flow, Keep }
import akka.stream.testkit.AkkaSpec
import org.scalatest.concurrent.ScalaFutures
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
class FlowStagesSpec extends AkkaSpec with ScalaFutures {
//#import-stage
import akka.stream.stage._
//#import-stage
implicit val materializer = ActorMaterializer()
"stages demo" must {
"demonstrate various PushPullStages" in {
//#one-to-one
class Map[A, B](f: A => B) extends PushPullStage[A, B] {
override def onPush(elem: A, ctx: Context[B]): SyncDirective =
ctx.push(f(elem))
override def onPull(ctx: Context[B]): SyncDirective =
ctx.pull()
}
//#one-to-one
//#many-to-one
class Filter[A](p: A => Boolean) extends PushPullStage[A, A] {
override def onPush(elem: A, ctx: Context[A]): SyncDirective =
if (p(elem)) ctx.push(elem)
else ctx.pull()
override def onPull(ctx: Context[A]): SyncDirective =
ctx.pull()
}
//#many-to-one
//#one-to-many
class Duplicator[A]() extends PushPullStage[A, A] {
private var lastElem: A = _
private var oneLeft = false
override def onPush(elem: A, ctx: Context[A]): SyncDirective = {
lastElem = elem
oneLeft = true
ctx.push(elem)
}
override def onPull(ctx: Context[A]): SyncDirective =
if (!ctx.isFinishing) {
// the main pulling logic is below, as demonstrated in the illustration
if (oneLeft) {
oneLeft = false
ctx.push(lastElem)
} else
ctx.pull()
} else {
// If we need to emit a final element after the upstream
// finished
if (oneLeft) ctx.pushAndFinish(lastElem)
else ctx.finish()
}
override def onUpstreamFinish(ctx: Context[A]): TerminationDirective =
ctx.absorbTermination()
}
//#one-to-many
val keyedSink = Sink.head[immutable.Seq[Int]]
val sink = Flow[Int].grouped(10).toMat(keyedSink)(Keep.right)
//#stage-chain
val resultFuture = Source(1 to 10)
.transform(() => new Filter(_ % 2 == 0))
.transform(() => new Duplicator())
.transform(() => new Map(_ / 2))
.runWith(sink)
//#stage-chain
Await.result(resultFuture, 3.seconds) should be(Seq(1, 1, 2, 2, 3, 3, 4, 4, 5, 5))
}
"demonstrate various PushStages" in {
import akka.stream.stage._
//#pushstage
class Map[A, B](f: A => B) extends PushStage[A, B] {
override def onPush(elem: A, ctx: Context[B]): SyncDirective =
ctx.push(f(elem))
}
class Filter[A](p: A => Boolean) extends PushStage[A, A] {
override def onPush(elem: A, ctx: Context[A]): SyncDirective =
if (p(elem)) ctx.push(elem)
else ctx.pull()
}
//#pushstage
}
"demonstrate GraphStage" in {
//#doubler-stateful
class Duplicator[A] extends GraphStage[FlowShape[A, A]] {
val in = Inlet[A]("Duplicator.in")
val out = Outlet[A]("Duplicator.out")
val shape: FlowShape[A, A] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
emitMultiple(out, List(elem, elem))
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in)
})
}
}
//#doubler-stateful
val duplicator = Flow.fromGraph(new Duplicator[Int])
val fold = Source(1 to 2).via(duplicator).runFold("")(_ + _)
whenReady(fold) { s =>
s should be("1122")
}
}
"demonstrate DetachedStage" in {
//#detached
class Buffer2[T]() extends DetachedStage[T, T] {
private var buf = Vector.empty[T]
private var capacity = 2
private def isFull = capacity == 0
private def isEmpty = capacity == 2
private def dequeue(): T = {
capacity += 1
val next = buf.head
buf = buf.tail
next
}
private def enqueue(elem: T) = {
capacity -= 1
buf = buf :+ elem
}
override def onPull(ctx: DetachedContext[T]): DownstreamDirective = {
if (isEmpty) {
if (ctx.isFinishing) ctx.finish() // No more elements will arrive
else ctx.holdDownstream() // waiting until new elements
} else {
val next = dequeue()
if (ctx.isHoldingUpstream) ctx.pushAndPull(next) // release upstream
else ctx.push(next)
}
}
override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective = {
enqueue(elem)
if (isFull) ctx.holdUpstream() // Queue is now full, wait until new empty slot
else {
if (ctx.isHoldingDownstream) ctx.pushAndPull(dequeue()) // Release downstream
else ctx.pull()
}
}
override def onUpstreamFinish(ctx: DetachedContext[T]): TerminationDirective = {
if (!isEmpty) ctx.absorbTermination() // still need to flush from buffer
else ctx.finish() // already empty, finishing
}
}
//#detached
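// tests: a minimal run of the detached buffer (sketch); a DetachedStage
// is attached with transform(), like the PushPullStages above
val bufferResult = Source(1 to 4)
  .transform(() => new Buffer2[Int])
  .runFold(Vector.empty[Int])(_ :+ _)
Await.result(bufferResult, 3.seconds) should be(Vector(1, 2, 3, 4))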
}
}
}

View file

@@ -0,0 +1,110 @@
package docs.stream
import akka.stream.{ ClosedShape, OverflowStrategy, ActorMaterializer }
import akka.stream.scaladsl._
import akka.stream.testkit.AkkaSpec
class GraphCyclesSpec extends AkkaSpec {
implicit val materializer = ActorMaterializer()
"Cycle demonstration" must {
val source = Source.fromIterator(() => Iterator.from(0))
"include a deadlocked cycle" in {
// format: OFF
//#deadlocked
// WARNING! The graph below deadlocks!
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val merge = b.add(Merge[Int](2))
val bcast = b.add(Broadcast[Int](2))
source ~> merge ~> Flow[Int].map { s => println(s); s } ~> bcast ~> Sink.ignore
merge <~ bcast
ClosedShape
})
//#deadlocked
// format: ON
}
"include an unfair cycle" in {
// format: OFF
//#unfair
// WARNING! The graph below stops consuming from "source" after a few steps
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val merge = b.add(MergePreferred[Int](1))
val bcast = b.add(Broadcast[Int](2))
source ~> merge ~> Flow[Int].map { s => println(s); s } ~> bcast ~> Sink.ignore
merge.preferred <~ bcast
ClosedShape
})
//#unfair
// format: ON
}
"include a dropping cycle" in {
// format: OFF
//#dropping
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val merge = b.add(Merge[Int](2))
val bcast = b.add(Broadcast[Int](2))
source ~> merge ~> Flow[Int].map { s => println(s); s } ~> bcast ~> Sink.ignore
merge <~ Flow[Int].buffer(10, OverflowStrategy.dropHead) <~ bcast
ClosedShape
})
//#dropping
// format: ON
}
"include a dead zipping cycle" in {
// format: OFF
//#zipping-dead
// WARNING! The graph below never processes any elements
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val zip = b.add(ZipWith[Int, Int, Int]((left, right) => right))
val bcast = b.add(Broadcast[Int](2))
source ~> zip.in0
zip.out.map { s => println(s); s } ~> bcast ~> Sink.ignore
zip.in1 <~ bcast
ClosedShape
})
//#zipping-dead
// format: ON
}
"include a live zipping cycle" in {
// format: OFF
//#zipping-live
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val zip = b.add(ZipWith((left: Int, right: Int) => left))
val bcast = b.add(Broadcast[Int](2))
val concat = b.add(Concat[Int]())
val start = Source.single(0)
source ~> zip.in0
zip.out.map { s => println(s); s } ~> bcast ~> Sink.ignore
zip.in1 <~ concat <~ start
concat <~ bcast
ClosedShape
})
//#zipping-live
// format: ON
}
}
}

View file

@@ -0,0 +1,508 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream
import akka.stream.scaladsl.{ Keep, Sink, Flow, Source }
import akka.stream.stage._
import akka.stream._
import akka.stream.testkit.{ TestPublisher, TestSubscriber, AkkaSpec }
import scala.collection.mutable
import scala.concurrent.{ Promise, Await, Future }
import scala.concurrent.duration._
import scala.collection.immutable.Iterable
class GraphStageDocSpec extends AkkaSpec {
implicit val materializer = ActorMaterializer()
"Demonstrate creation of GraphStage boilerplate" in {
//#boilerplate-example
import akka.stream.SourceShape
import akka.stream.stage.GraphStage
class NumbersSource extends GraphStage[SourceShape[Int]] {
// Define the (sole) output port of this stage
val out: Outlet[Int] = Outlet("NumbersSource")
// Define the shape of this stage, which is SourceShape with the port we defined above
override val shape: SourceShape[Int] = SourceShape(out)
// This is where the actual (possibly stateful) logic will live
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = ???
}
//#boilerplate-example
}
"Demonstrate creation of GraphStage Source" in {
//#custom-source-example
import akka.stream.SourceShape
import akka.stream.Graph
import akka.stream.stage.GraphStage
import akka.stream.stage.OutHandler
class NumbersSource extends GraphStage[SourceShape[Int]] {
val out: Outlet[Int] = Outlet("NumbersSource")
override val shape: SourceShape[Int] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
// All state MUST be inside the GraphStageLogic,
// never inside the enclosing GraphStage.
// This state is safe to access and modify from all the
// callbacks that are provided by GraphStageLogic and the
// registered handlers.
private var counter = 1
setHandler(out, new OutHandler {
override def onPull(): Unit = {
push(out, counter)
counter += 1
}
})
}
}
//#custom-source-example
//#simple-source-usage
// A GraphStage is a proper Graph, just like what GraphDSL.create would return
val sourceGraph: Graph[SourceShape[Int], Unit] = new NumbersSource
// Create a Source from the Graph to access the DSL
val mySource: Source[Int, Unit] = Source.fromGraph(new NumbersSource)
// Returns 55
val result1: Future[Int] = mySource.take(10).runFold(0)(_ + _)
// The source is reusable. This returns 5050
val result2: Future[Int] = mySource.take(100).runFold(0)(_ + _)
//#simple-source-usage
Await.result(result1, 3.seconds) should ===(55)
Await.result(result2, 3.seconds) should ===(5050)
}
//#one-to-one
class Map[A, B](f: A => B) extends GraphStage[FlowShape[A, B]] {
val in = Inlet[A]("Map.in")
val out = Outlet[B]("Map.out")
override val shape = FlowShape.of(in, out)
override def createLogic(attr: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = {
push(out, f(grab(in)))
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
}
}
//#one-to-one
"Demonstrate a one to one element GraphStage" in {
// tests:
val stringLength = Flow.fromGraph(new Map[String, Int](_.length))
val result =
Source(Vector("one", "two", "three"))
.via(stringLength)
.runFold(Seq.empty[Int])((acc, elem) => acc :+ elem)
Await.result(result, 3.seconds) should ===(Seq(3, 3, 5))
}
//#many-to-one
class Filter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] {
val in = Inlet[A]("Filter.in")
val out = Outlet[A]("Filter.out")
val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
if (p(elem)) push(out, elem)
else pull(in)
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
}
}
//#many-to-one
"Demonstrate a many to one element GraphStage" in {
// tests:
val evenFilter = Flow.fromGraph(new Filter[Int](_ % 2 == 0))
val result =
Source(Vector(1, 2, 3, 4, 5, 6))
.via(evenFilter)
.runFold(Seq.empty[Int])((acc, elem) => acc :+ elem)
Await.result(result, 3.seconds) should ===(Seq(2, 4, 6))
}
//#one-to-many
class Duplicator[A] extends GraphStage[FlowShape[A, A]] {
val in = Inlet[A]("Duplicator.in")
val out = Outlet[A]("Duplicator.out")
val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
// Again: note that all mutable state
// MUST be inside the GraphStageLogic
var lastElem: Option[A] = None
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
lastElem = Some(elem)
push(out, elem)
}
override def onUpstreamFinish(): Unit = {
if (lastElem.isDefined) emit(out, lastElem.get)
complete(out)
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (lastElem.isDefined) {
push(out, lastElem.get)
lastElem = None
} else {
pull(in)
}
}
})
}
}
//#one-to-many
"Demonstrate a one to many element GraphStage" in {
// tests:
val duplicator = Flow.fromGraph(new Duplicator[Int])
val result =
Source(Vector(1, 2, 3))
.via(duplicator)
.runFold(Seq.empty[Int])((acc, elem) => acc :+ elem)
Await.result(result, 3.seconds) should ===(Seq(1, 1, 2, 2, 3, 3))
}
"Demonstrate a simpler one to many stage" in {
//#simpler-one-to-many
class Duplicator[A] extends GraphStage[FlowShape[A, A]] {
val in = Inlet[A]("Duplicator.in")
val out = Outlet[A]("Duplicator.out")
val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
// this will temporarily suspend this handler until the two elems
// are emitted and then reinstates it
emitMultiple(out, Iterable(elem, elem))
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
}
}
//#simpler-one-to-many
// tests:
val duplicator = Flow.fromGraph(new Duplicator[Int])
val result =
Source(Vector(1, 2, 3))
.via(duplicator)
.runFold(Seq.empty[Int])((acc, elem) => acc :+ elem)
Await.result(result, 3.seconds) should ===(Seq(1, 1, 2, 2, 3, 3))
}
"Demonstrate chaining of graph stages" in {
val sink = Sink.fold[List[Int], Int](List.empty[Int])((acc, n) => acc :+ n)
//#graph-stage-chain
val resultFuture = Source(1 to 5)
.via(new Filter(_ % 2 == 0))
.via(new Duplicator())
.via(new Map(_ / 2))
.runWith(sink)
//#graph-stage-chain
Await.result(resultFuture, 3.seconds) should ===(List(1, 1, 2, 2))
}
"Demonstrate an asynchronous side channel" in {
import system.dispatcher
//#async-side-channel
// will close upstream when the future completes
class KillSwitch[A](switch: Future[Unit]) extends GraphStage[FlowShape[A, A]] {
val in = Inlet[A]("KillSwitch.in")
val out = Outlet[A]("KillSwitch.out")
val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
override def preStart(): Unit = {
val callback = getAsyncCallback[Unit] { (_) =>
completeStage()
}
switch.foreach(callback.invoke)
}
setHandler(in, new InHandler {
override def onPush(): Unit = { push(out, grab(in)) }
})
setHandler(out, new OutHandler {
override def onPull(): Unit = { pull(in) }
})
}
}
//#async-side-channel
// tests:
val switch = Promise[Unit]()
val duplicator = Flow.fromGraph(new KillSwitch[Int](switch.future))
// TODO this is probably racey, is there a way to make sure it happens after?
val valueAfterKill = switch.future.flatMap(_ => Future(4))
val result =
Source(Vector(1, 2, 3)).concat(Source.fromFuture(valueAfterKill))
.via(duplicator)
.runFold(Seq.empty[Int])((acc, elem) => acc :+ elem)
switch.success(())
Await.result(result, 3.seconds) should ===(Seq(1, 2, 3))
}
"Demonstrate a graph stage with a timer" in {
//#timed
// each time an event is pushed through it will trigger a period of silence
class TimedGate[A](silencePeriod: FiniteDuration) extends GraphStage[FlowShape[A, A]] {
val in = Inlet[A]("TimedGate.in")
val out = Outlet[A]("TimedGate.out")
val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) {
var open = false
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
if (open) pull(in)
else {
push(out, elem)
open = true
scheduleOnce(None, silencePeriod)
}
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = { pull(in) }
})
override protected def onTimer(timerKey: Any): Unit = {
open = false
}
}
}
//#timed
// tests:
val result =
Source(Vector(1, 2, 3))
.via(new TimedGate[Int](2.second))
.takeWithin(250.millis)
.runFold(Seq.empty[Int])((acc, elem) => acc :+ elem)
Await.result(result, 3.seconds) should ===(Seq(1))
}
"Demonstrate a custom materialized value" in {
//#materialized
class FirstValue[A] extends GraphStageWithMaterializedValue[FlowShape[A, A], Future[A]] {
val in = Inlet[A]("FirstValue.in")
val out = Outlet[A]("FirstValue.out")
val shape = FlowShape.of(in, out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[A]) = {
val promise = Promise[A]()
val logic = new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
promise.success(elem)
push(out, elem)
// replace handler with one just forwarding
setHandler(in, new InHandler {
override def onPush(): Unit = {
push(out, grab(in))
}
})
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
}
(logic, promise.future)
}
}
//#materialized
// tests:
val flow = Source(Vector(1, 2, 3))
.viaMat(new FirstValue)(Keep.right)
.to(Sink.ignore)
val result: Future[Int] = flow.run()
Await.result(result, 3.seconds) should ===(1)
}
"Demonstrate a detached graph stage" in {
//#detached
class TwoBuffer[A] extends GraphStage[FlowShape[A, A]] {
val in = Inlet[A]("TwoBuffer.in")
val out = Outlet[A]("TwoBuffer.out")
val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
val buffer = mutable.Queue[A]()
def bufferFull = buffer.size == 2
var downstreamWaiting = false
override def preStart(): Unit = {
// a detached stage needs to start upstream demand
// itself as it is not triggered by downstream demand
pull(in)
}
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
buffer.enqueue(elem)
if (downstreamWaiting) {
downstreamWaiting = false
val bufferedElem = buffer.dequeue()
push(out, bufferedElem)
}
if (!bufferFull) {
pull(in)
}
}
override def onUpstreamFinish(): Unit = {
if (buffer.nonEmpty) {
// emit the rest if possible
emitMultiple(out, buffer.toIterator)
}
completeStage()
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (buffer.isEmpty) {
downstreamWaiting = true
} else {
val elem = buffer.dequeue()
push(out, elem)
}
if (!bufferFull && !hasBeenPulled(in)) {
pull(in)
}
}
})
}
}
//#detached
// tests:
val result1 = Source(Vector(1, 2, 3))
.via(new TwoBuffer)
.runFold(Vector.empty[Int])((acc, n) => acc :+ n)
Await.result(result1, 3.seconds) should ===(Vector(1, 2, 3))
val subscriber = TestSubscriber.manualProbe[Int]()
val publisher = TestPublisher.probe[Int]()
val flow2 =
Source.fromPublisher(publisher)
.via(new TwoBuffer)
.to(Sink.fromSubscriber(subscriber))
val result2 = flow2.run()
val sub = subscriber.expectSubscription()
// this happens even though the subscriber has not signalled any demand
publisher.sendNext(1)
publisher.sendNext(2)
sub.cancel()
}
}

View file

@@ -0,0 +1,379 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream
import scala.concurrent.duration._
import akka.stream.testkit.AkkaSpec
import akka.stream.scaladsl._
import akka.stream.ActorMaterializer
import scala.concurrent.Future
import akka.testkit.TestProbe
import akka.actor.ActorRef
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.Props
import akka.pattern.ask
import akka.util.Timeout
import akka.stream.Attributes
import akka.stream.ActorAttributes
import scala.concurrent.ExecutionContext
import akka.stream.ActorMaterializerSettings
import java.util.concurrent.atomic.AtomicInteger
import akka.stream.Supervision
import akka.stream.scaladsl.Flow
object IntegrationDocSpec {
import TwitterStreamQuickstartDocSpec._
val config = ConfigFactory.parseString("""
#//#blocking-dispatcher-config
blocking-dispatcher {
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 10
core-pool-size-max = 10
}
}
#//#blocking-dispatcher-config
akka.actor.default-mailbox.mailbox-type = akka.dispatch.UnboundedMailbox
""")
class AddressSystem {
//#email-address-lookup
def lookupEmail(handle: String): Future[Option[String]] =
//#email-address-lookup
Future.successful(Some(handle + "@somewhere.com"))
//#phone-lookup
def lookupPhoneNumber(handle: String): Future[Option[String]] =
//#phone-lookup
Future.successful(Some(handle.hashCode.toString))
}
class AddressSystem2 {
//#email-address-lookup2
def lookupEmail(handle: String): Future[String] =
//#email-address-lookup2
Future.successful(handle + "@somewhere.com")
}
final case class Email(to: String, title: String, body: String)
final case class TextMessage(to: String, body: String)
class EmailServer(probe: ActorRef) {
//#email-server-send
def send(email: Email): Future[Unit] = {
// ...
//#email-server-send
probe ! email.to
Future.successful(())
//#email-server-send
}
//#email-server-send
}
class SmsServer(probe: ActorRef) {
//#sms-server-send
def send(text: TextMessage): Unit = {
// ...
//#sms-server-send
probe ! text.to
//#sms-server-send
}
//#sms-server-send
}
final case class Save(tweet: Tweet)
final case object SaveDone
class DatabaseService(probe: ActorRef) extends Actor {
override def receive = {
case Save(tweet: Tweet) =>
probe ! tweet.author.handle
sender() ! SaveDone
}
}
//#sometimes-slow-service
class SometimesSlowService(implicit ec: ExecutionContext) {
//#sometimes-slow-service
def println(s: String): Unit = ()
//#sometimes-slow-service
private val runningCount = new AtomicInteger
def convert(s: String): Future[String] = {
println(s"running: $s (${runningCount.incrementAndGet()})")
Future {
if (s.nonEmpty && s.head.isLower)
Thread.sleep(500)
else
Thread.sleep(20)
println(s"completed: $s (${runningCount.decrementAndGet()})")
s.toUpperCase
}
}
}
//#sometimes-slow-service
}
class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
import TwitterStreamQuickstartDocSpec._
import IntegrationDocSpec._
implicit val materializer = ActorMaterializer()
"calling external service with mapAsync" in {
val probe = TestProbe()
val addressSystem = new AddressSystem
val emailServer = new EmailServer(probe.ref)
//#tweet-authors
val authors: Source[Author, Unit] =
tweets
.filter(_.hashtags.contains(akka))
.map(_.author)
//#tweet-authors
//#email-addresses-mapAsync
val emailAddresses: Source[String, Unit] =
authors
.mapAsync(4)(author => addressSystem.lookupEmail(author.handle))
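// mapAsync runs up to 4 lookups in parallel but emits the results
// in the original upstream order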
.collect { case Some(emailAddress) => emailAddress }
//#email-addresses-mapAsync
//#send-emails
val sendEmails: RunnableGraph[Unit] =
emailAddresses
.mapAsync(4)(address => {
emailServer.send(
Email(to = address, title = "Akka", body = "I like your tweet"))
})
.to(Sink.ignore)
sendEmails.run()
//#send-emails
probe.expectMsg("rolandkuhn@somewhere.com")
probe.expectMsg("patriknw@somewhere.com")
probe.expectMsg("bantonsson@somewhere.com")
probe.expectMsg("drewhk@somewhere.com")
probe.expectMsg("ktosopl@somewhere.com")
probe.expectMsg("mmartynas@somewhere.com")
probe.expectMsg("akkateam@somewhere.com")
}
"lookup email with mapAsync and supervision" in {
val addressSystem = new AddressSystem2
val authors: Source[Author, Unit] =
tweets.filter(_.hashtags.contains(akka)).map(_.author)
//#email-addresses-mapAsync-supervision
import ActorAttributes.supervisionStrategy
import Supervision.resumingDecider
val emailAddresses: Source[String, Unit] =
authors.via(
Flow[Author].mapAsync(4)(author => addressSystem.lookupEmail(author.handle))
.withAttributes(supervisionStrategy(resumingDecider)))
//#email-addresses-mapAsync-supervision
}
"calling external service with mapAsyncUnordered" in {
val probe = TestProbe()
val addressSystem = new AddressSystem
val emailServer = new EmailServer(probe.ref)
//#external-service-mapAsyncUnordered
val authors: Source[Author, Unit] =
tweets.filter(_.hashtags.contains(akka)).map(_.author)
val emailAddresses: Source[String, Unit] =
authors
.mapAsyncUnordered(4)(author => addressSystem.lookupEmail(author.handle))
.collect { case Some(emailAddress) => emailAddress }
val sendEmails: RunnableGraph[Unit] =
emailAddresses
.mapAsyncUnordered(4)(address => {
emailServer.send(
Email(to = address, title = "Akka", body = "I like your tweet"))
})
.to(Sink.ignore)
sendEmails.run()
//#external-service-mapAsyncUnordered
probe.receiveN(7).toSet should be(Set(
"rolandkuhn@somewhere.com",
"patriknw@somewhere.com",
"bantonsson@somewhere.com",
"drewhk@somewhere.com",
"ktosopl@somewhere.com",
"mmartynas@somewhere.com",
"akkateam@somewhere.com"))
}
"careful managed blocking with mapAsync" in {
val probe = TestProbe()
val addressSystem = new AddressSystem
val smsServer = new SmsServer(probe.ref)
val authors = tweets.filter(_.hashtags.contains(akka)).map(_.author)
val phoneNumbers =
authors.mapAsync(4)(author => addressSystem.lookupPhoneNumber(author.handle))
.collect { case Some(phoneNo) => phoneNo }
//#blocking-mapAsync
val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")
val sendTextMessages: RunnableGraph[Unit] =
phoneNumbers
.mapAsync(4)(phoneNo => {
Future {
smsServer.send(
TextMessage(to = phoneNo, body = "I like your tweet"))
}(blockingExecutionContext)
})
.to(Sink.ignore)
sendTextMessages.run()
//#blocking-mapAsync
probe.receiveN(7).toSet should be(Set(
"rolandkuhn".hashCode.toString,
"patriknw".hashCode.toString,
"bantonsson".hashCode.toString,
"drewhk".hashCode.toString,
"ktosopl".hashCode.toString,
"mmartynas".hashCode.toString,
"akkateam".hashCode.toString))
}
"careful managed blocking with map" in {
val probe = TestProbe()
val addressSystem = new AddressSystem
val smsServer = new SmsServer(probe.ref)
val authors = tweets.filter(_.hashtags.contains(akka)).map(_.author)
val phoneNumbers =
authors.mapAsync(4)(author => addressSystem.lookupPhoneNumber(author.handle))
.collect { case Some(phoneNo) => phoneNo }
//#blocking-map
val send = Flow[String]
.map { phoneNo =>
smsServer.send(TextMessage(to = phoneNo, body = "I like your tweet"))
}
.withAttributes(ActorAttributes.dispatcher("blocking-dispatcher"))
val sendTextMessages: RunnableGraph[Unit] =
phoneNumbers.via(send).to(Sink.ignore)
sendTextMessages.run()
//#blocking-map
probe.expectMsg("rolandkuhn".hashCode.toString)
probe.expectMsg("patriknw".hashCode.toString)
probe.expectMsg("bantonsson".hashCode.toString)
probe.expectMsg("drewhk".hashCode.toString)
probe.expectMsg("ktosopl".hashCode.toString)
probe.expectMsg("mmartynas".hashCode.toString)
probe.expectMsg("akkateam".hashCode.toString)
}
"calling actor service with mapAsync" in {
val probe = TestProbe()
val database = system.actorOf(Props(classOf[DatabaseService], probe.ref), "db")
//#save-tweets
val akkaTweets: Source[Tweet, Unit] = tweets.filter(_.hashtags.contains(akka))
implicit val timeout = Timeout(3.seconds)
val saveTweets: RunnableGraph[Unit] =
akkaTweets
.mapAsync(4)(tweet => database ? Save(tweet))
.to(Sink.ignore)
//#save-tweets
saveTweets.run()
probe.expectMsg("rolandkuhn")
probe.expectMsg("patriknw")
probe.expectMsg("bantonsson")
probe.expectMsg("drewhk")
probe.expectMsg("ktosopl")
probe.expectMsg("mmartynas")
probe.expectMsg("akkateam")
}
"illustrate ordering and parallelism of mapAsync" in {
val probe = TestProbe()
def println(s: String): Unit = {
if (s.startsWith("after:"))
probe.ref ! s
}
//#sometimes-slow-mapAsync
implicit val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")
val service = new SometimesSlowService
implicit val materializer = ActorMaterializer(
ActorMaterializerSettings(system).withInputBuffer(initialSize = 4, maxSize = 4))
Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J"))
.map(elem => { println(s"before: $elem"); elem })
.mapAsync(4)(service.convert)
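// mapAsync preserves input order even though the slow "a" completes
// last, hence the strictly ordered expectations below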
.runForeach(elem => println(s"after: $elem"))
//#sometimes-slow-mapAsync
probe.expectMsg("after: A")
probe.expectMsg("after: B")
probe.expectMsg("after: C")
probe.expectMsg("after: D")
probe.expectMsg("after: E")
probe.expectMsg("after: F")
probe.expectMsg("after: G")
probe.expectMsg("after: H")
probe.expectMsg("after: I")
probe.expectMsg("after: J")
}
"illustrate ordering and parallelism of mapAsyncUnordered" in {
val probe = TestProbe()
def println(s: String): Unit = {
if (s.startsWith("after:"))
probe.ref ! s
}
//#sometimes-slow-mapAsyncUnordered
implicit val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")
val service = new SometimesSlowService
implicit val materializer = ActorMaterializer(
ActorMaterializerSettings(system).withInputBuffer(initialSize = 4, maxSize = 4))
Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J"))
.map(elem => { println(s"before: $elem"); elem })
.mapAsyncUnordered(4)(service.convert)
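// mapAsyncUnordered emits results as soon as they complete,
// so only the set of outputs is deterministic, not the order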
.runForeach(elem => println(s"after: $elem"))
//#sometimes-slow-mapAsyncUnordered
probe.receiveN(10).toSet should be(Set(
"after: A",
"after: B",
"after: C",
"after: D",
"after: E",
"after: F",
"after: G",
"after: H",
"after: I",
"after: J"))
}
}

View file

@ -0,0 +1,284 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream
import java.io.File
import _root_.akka.http.scaladsl.model.Uri
import _root_.akka.stream._
import _root_.akka.stream.scaladsl._
import _root_.akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import _root_.akka.stream.testkit.{AkkaSpec, TestPublisher, TestSubscriber}
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Random, Success, Try}
class MigrationsScala extends AkkaSpec {
"Examples in migration guide" must {
"compile" in {
val flow1 = Flow[Int]
val flow2 = Flow[Int]
def inlet: Inlet[Int] = ???
def outlet: Outlet[Int] = ???
def inlet1: Inlet[Int] = ???
def outlet1: Outlet[Int] = ???
def inlet2: Inlet[Int] = ???
def outlet2: Outlet[Int] = ???
lazy val dontExecuteMe = {
//#flow-wrap
val graphSource: Graph[SourceShape[Int], Unit] = ???
val source: Source[Int, Unit] = Source.fromGraph(graphSource)
val graphSink: Graph[SinkShape[Int], Unit] = ???
val sink: Sink[Int, Unit] = Sink.fromGraph(graphSink)
val graphFlow: Graph[FlowShape[Int, Int], Unit] = ???
val flow: Flow[Int, Int, Unit] = Flow.fromGraph(graphFlow)
Flow.fromSinkAndSource(Sink.head[Int], Source.single(0))
//#flow-wrap
//#bidiflow-wrap
val bidiGraph: Graph[BidiShape[Int, Int, Int, Int], Unit] = ???
val bidi: BidiFlow[Int, Int, Int, Int, Unit] = BidiFlow.fromGraph(bidiGraph)
BidiFlow.fromFlows(flow1, flow2)
BidiFlow.fromFunctions((x: Int) => x + 1, (y: Int) => y * 3)
//#bidiflow-wrap
//#graph-create
// Replaces GraphDSL.closed()
GraphDSL.create() { builder =>
//...
ClosedShape
}
// Replaces GraphDSL.partial()
GraphDSL.create() { builder =>
//...
FlowShape(inlet, outlet)
}
//#graph-create
//#graph-create-2
Source.fromGraph(
GraphDSL.create() { builder =>
//...
SourceShape(outlet)
})
Sink.fromGraph(
GraphDSL.create() { builder =>
//...
SinkShape(inlet)
})
Flow.fromGraph(
GraphDSL.create() { builder =>
//...
FlowShape(inlet, outlet)
})
BidiFlow.fromGraph(
GraphDSL.create() { builder =>
//...
BidiShape(inlet1, outlet1, inlet2, outlet2)
})
//#graph-create-2
//#graph-edges
RunnableGraph.fromGraph(
GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
outlet ~> inlet
outlet ~> flow ~> inlet
//...
ClosedShape
})
//#graph-edges
val promise = Promise[Unit]()
//#source-creators
val src: Source[Int, Promise[Option[Int]]] = Source.maybe[Int]
//...
// This finishes the stream without emitting anything, just like Source.lazyEmpty did
promise.trySuccess(Some(()))
val ticks = Source.tick(1.second, 3.seconds, "tick")
val pubSource = Source.fromPublisher(TestPublisher.manualProbe[Int]())
val itSource = Source.fromIterator(() => Iterator.continually(Random.nextGaussian))
val futSource = Source.fromFuture(Future.successful(42))
val subSource = Source.asSubscriber
//#source-creators
//#sink-creators
val subSink = Sink.fromSubscriber(TestSubscriber.manualProbe[Int]())
//#sink-creators
//#sink-as-publisher
val pubSink = Sink.asPublisher(fanout = false)
val pubSinkFanout = Sink.asPublisher(fanout = true)
//#sink-as-publisher
//#flatMapConcat
Flow[Source[Int, Any]].flatMapConcat(identity)
//#flatMapConcat
//#group-flatten
Flow[Int]
.groupBy(2, _ % 2) // the first parameter sets max number of substreams
.map(_ + 3)
.concatSubstreams
//#group-flatten
val MaxDistinctWords = 1000
//#group-fold
Flow[String]
.groupBy(MaxDistinctWords, identity)
.fold(("", 0))((pair, word) => (word, pair._2 + 1))
.mergeSubstreams
//#group-fold
//#port-async
class MapAsyncOne[In, Out](f: In => Future[Out])(implicit ec: ExecutionContext)
extends GraphStage[FlowShape[In, Out]] {
val in: Inlet[In] = Inlet("MapAsyncOne.in")
val out: Outlet[Out] = Outlet("MapAsyncOne.out")
override val shape: FlowShape[In, Out] = FlowShape(in, out)
// The actual logic is encapsulated in a GraphStageLogic now
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
// All of the state *must* be encapsulated in the GraphStageLogic,
// not in the GraphStage
private var elemInFlight: Out = _
val callback = getAsyncCallback(onAsyncInput)
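// getAsyncCallback is the thread-safe way to feed events from
// future completions back into the stage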
var holdingUpstream = false
// All upstream related events now are handled in an InHandler instance
setHandler(in, new InHandler {
// No context or element parameter for onPush
override def onPush(): Unit = {
// The element is not passed as an argument but needs to be dequeued explicitly
val elem = grab(in)
val future = f(elem)
future.onComplete(callback.invoke)
// ctx.holdUpstream is no longer needed, but we need to track the state
holdingUpstream = true
}
// No context parameter
override def onUpstreamFinish(): Unit = {
if (holdingUpstream) absorbTermination()
else completeStage() // ctx.finish turns into completeStage()
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (elemInFlight != null) {
val e = elemInFlight
elemInFlight = null.asInstanceOf[Out]
pushIt(e)
} // holdDownstream is no longer needed
}
})
// absorbTermination turns into the code below.
// This emulates the behavior of the AsyncStage stage.
private def absorbTermination(): Unit =
if (isAvailable(shape.out)) getHandler(out).onPull()
// The line below emulates the behavior of the AsyncStage holdingDownstream
private def holdingDownstream(): Boolean =
!(isClosed(in) || hasBeenPulled(in))
// Any method can be used as a callback; we chose the previous name
// for easier comparison with the original code
private def onAsyncInput(input: Try[Out]) =
input match {
case Failure(ex) => failStage(ex)
case Success(e) if holdingDownstream() => pushIt(e)
case Success(e) =>
elemInFlight = e
// ctx.ignore is no longer needed
}
private def pushIt(elem: Out): Unit = {
// ctx.isFinishing turns into isClosed(in)
if (isClosed(in)) {
// pushAndFinish is now two actions
push(out, elem)
completeStage()
} else {
// pushAndPull is now two actions
push(out, elem)
pull(in)
holdingUpstream = false
}
}
}
}
//#port-async
val uri: Uri = ???
//#raw-query
val queryPart: Option[String] = uri.rawQueryString
//#raw-query
//#query-param
val param: Option[String] = uri.query().get("a")
//#query-param
//#file-source-sink
val fileSrc = FileIO.fromFile(new File("."))
val otherFileSrc = FileIO.fromFile(new File("."), 1024)
val someFileSink = FileIO.toFile(new File("."))
//#file-source-sink
class SomeInputStream extends java.io.InputStream { override def read(): Int = 0 }
class SomeOutputStream extends java.io.OutputStream { override def write(b: Int): Unit = () }
//#input-output-stream-source-sink
val inputStreamSrc = StreamConverters.fromInputStream(() => new SomeInputStream())
val otherInputStreamSrc = StreamConverters.fromInputStream(() => new SomeInputStream())
val someOutputStreamSink = StreamConverters.fromOutputStream(() => new SomeOutputStream())
//#input-output-stream-source-sink
//#output-input-stream-source-sink
val timeout: FiniteDuration = 0.seconds
val outputStreamSrc = StreamConverters.asOutputStream()
val otherOutputStreamSrc = StreamConverters.asOutputStream(timeout)
val someInputStreamSink = StreamConverters.asInputStream()
val someOtherInputStreamSink = StreamConverters.asInputStream(timeout)
//#output-input-stream-source-sink
}
}
}
}

View file

@ -0,0 +1,111 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.testkit._
import akka.stream.testkit.scaladsl._
import scala.util.Random
import scala.math._
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.collection.immutable
import akka.testkit.TestLatch
class RateTransformationDocSpec extends AkkaSpec {
type Seq[+A] = immutable.Seq[A]
val Seq = immutable.Seq
implicit val materializer = ActorMaterializer()
"conflate should summarize" in {
//#conflate-summarize
val statsFlow = Flow[Double]
.conflate(Seq(_))(_ :+ _)
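// conflate aggregates incoming elements while downstream backpressures,
// so the statistics summarize everything seen since the last pull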
.map { s =>
val μ = s.sum / s.size
val se = s.map(x => pow(x - μ, 2))
val σ = sqrt(se.sum / se.size)
(σ, μ, s.size)
}
//#conflate-summarize
val fut = Source.fromIterator(() => Iterator.continually(Random.nextGaussian))
.via(statsFlow)
.grouped(10)
.runWith(Sink.head)
Await.result(fut, 100.millis)
}
"conflate should sample" in {
//#conflate-sample
val p = 0.01
val sampleFlow = Flow[Double]
.conflate(Seq(_)) {
case (acc, elem) if Random.nextDouble < p => acc :+ elem
case (acc, _) => acc
}
.mapConcat(identity)
//#conflate-sample
val fut = Source(1 to 1000)
.map(_.toDouble)
.via(sampleFlow)
.runWith(Sink.fold(Seq.empty[Double])(_ :+ _))
val count = Await.result(fut, 1000.millis).size
}
"expand should repeat last" in {
//#expand-last
val lastFlow = Flow[Double]
.expand(identity)(s => (s, s))
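// expand re-emits the last seen element while upstream is idle,
// letting a faster downstream keep receiving the latest value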
//#expand-last
val (probe, fut) = TestSource.probe[Double]
.via(lastFlow)
.grouped(10)
.toMat(Sink.head)(Keep.both)
.run()
probe.sendNext(1.0)
val expanded = Await.result(fut, 100.millis)
expanded.size shouldBe 10
expanded.sum shouldBe 10
}
"expand should track drift" in {
//#expand-drift
val driftFlow = Flow[Double]
.expand((_, 0)) {
case (lastElement, drift) => ((lastElement, drift), (lastElement, drift + 1))
}
//#expand-drift
val latch = TestLatch(2)
val realDriftFlow = Flow[Double]
.expand(d => { latch.countDown(); (d, 0) }) {
case (lastElement, drift) => ((lastElement, drift), (lastElement, drift + 1))
}
val (pub, sub) = TestSource.probe[Double]
.via(realDriftFlow)
.toMat(TestSink.probe[(Double, Int)])(Keep.both)
.run()
sub.request(1)
pub.sendNext(1.0)
sub.expectNext((1.0, 0))
sub.requestNext((1.0, 1))
sub.requestNext((1.0, 2))
pub.sendNext(2.0)
Await.ready(latch, 1.second)
sub.requestNext((2.0, 0))
}
}

View file

@ -0,0 +1,147 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ RunnableGraph, Flow, Sink, Source }
import akka.stream.testkit._
import org.reactivestreams.Processor
class ReactiveStreamsDocSpec extends AkkaSpec {
import TwitterStreamQuickstartDocSpec._
implicit val materializer = ActorMaterializer()
//#imports
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
//#imports
trait Fixture {
//#authors
val authors = Flow[Tweet]
.filter(_.hashtags.contains(akka))
.map(_.author)
//#authors
//#tweets-publisher
def tweets: Publisher[Tweet]
//#tweets-publisher
//#author-storage-subscriber
def storage: Subscriber[Author]
//#author-storage-subscriber
//#author-alert-subscriber
def alert: Subscriber[Author]
//#author-alert-subscriber
}
val impl = new Fixture {
override def tweets: Publisher[Tweet] =
TwitterStreamQuickstartDocSpec.tweets.runWith(Sink.asPublisher(false))
override def storage = TestSubscriber.manualProbe[Author]
override def alert = TestSubscriber.manualProbe[Author]
}
def assertResult(storage: TestSubscriber.ManualProbe[Author]): Unit = {
val sub = storage.expectSubscription()
sub.request(10)
storage.expectNext(Author("rolandkuhn"))
storage.expectNext(Author("patriknw"))
storage.expectNext(Author("bantonsson"))
storage.expectNext(Author("drewhk"))
storage.expectNext(Author("ktosopl"))
storage.expectNext(Author("mmartynas"))
storage.expectNext(Author("akkateam"))
storage.expectComplete()
}
"reactive streams publisher via flow to subscriber" in {
import impl._
val storage = impl.storage
//#connect-all
Source.fromPublisher(tweets).via(authors).to(Sink.fromSubscriber(storage)).run()
//#connect-all
assertResult(storage)
}
"flow as publisher and subscriber" in {
import impl._
val storage = impl.storage
//#flow-publisher-subscriber
val processor: Processor[Tweet, Author] = authors.toProcessor.run()
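// the materialized Processor is a plain Reactive Streams processor
// and can be wired to third-party publishers and subscribers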
tweets.subscribe(processor)
processor.subscribe(storage)
//#flow-publisher-subscriber
assertResult(storage)
}
"source as publisher" in {
import impl._
val storage = impl.storage
//#source-publisher
val authorPublisher: Publisher[Author] =
Source.fromPublisher(tweets).via(authors).runWith(Sink.asPublisher(fanout = false))
authorPublisher.subscribe(storage)
//#source-publisher
assertResult(storage)
}
"source as fanoutPublisher" in {
import impl._
val storage = impl.storage
val alert = impl.alert
//#source-fanoutPublisher
val authorPublisher: Publisher[Author] =
Source.fromPublisher(tweets).via(authors)
.runWith(Sink.asPublisher(fanout = true))
authorPublisher.subscribe(storage)
authorPublisher.subscribe(alert)
//#source-fanoutPublisher
// this relies on fanoutPublisher buffer size > number of authors
assertResult(storage)
assertResult(alert)
}
"sink as subscriber" in {
import impl._
val storage = impl.storage
//#sink-subscriber
val tweetSubscriber: Subscriber[Tweet] =
authors.to(Sink.fromSubscriber(storage)).runWith(Source.asSubscriber[Tweet])
tweets.subscribe(tweetSubscriber)
//#sink-subscriber
assertResult(storage)
}
"use a processor" in {
//#use-processor
// An example Processor factory
def createProcessor: Processor[Int, Int] = Flow[Int].toProcessor.run()
val flow: Flow[Int, Int, Unit] = Flow.fromProcessor(() => createProcessor)
//#use-processor
}
}

View file

@ -0,0 +1,89 @@
package docs.stream
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.testkit.AkkaSpec
class StreamBuffersRateSpec extends AkkaSpec {
implicit val materializer = ActorMaterializer()
"Demonstrate pipelining" in {
def println(s: Any) = ()
//#pipelining
Source(1 to 3)
.map { i => println(s"A: $i"); i }
.map { i => println(s"B: $i"); i }
.map { i => println(s"C: $i"); i }
.runWith(Sink.ignore)
//#pipelining
}
"Demonstrate buffer sizes" in {
//#materializer-buffer
val materializer = ActorMaterializer(
ActorMaterializerSettings(system)
.withInputBuffer(
initialSize = 64,
maxSize = 64))
//#materializer-buffer
//#section-buffer
val section = Flow[Int].map(_ * 2)
.withAttributes(Attributes.inputBuffer(initial = 1, max = 1))
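// the attribute applies to this section only; stages composed
// afterwards keep the materializer's default buffer size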
val flow = section.via(Flow[Int].map(_ / 2)) // the buffer size of this map is the default
//#section-buffer
}
"buffering abstraction leak" in {
//#buffering-abstraction-leak
import scala.concurrent.duration._
case class Tick()
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val zipper = b.add(ZipWith[Tick, Int, Int]((tick, count) => count))
Source.tick(initialDelay = 3.second, interval = 3.second, Tick()) ~> zipper.in0
Source.tick(initialDelay = 1.second, interval = 1.second, "message!")
.conflate(seed = (_) => 1)((count, _) => count + 1) ~> zipper.in1
zipper.out ~> Sink.foreach(println)
ClosedShape
})
//#buffering-abstraction-leak
}
"explcit buffers" in {
trait Job
def inboundJobsConnector(): Source[Job, Unit] = Source.empty
//#explicit-buffers-backpressure
// Getting a stream of jobs from an imaginary external system as a Source
val jobs: Source[Job, Unit] = inboundJobsConnector()
jobs.buffer(1000, OverflowStrategy.backpressure)
//#explicit-buffers-backpressure
//#explicit-buffers-droptail
jobs.buffer(1000, OverflowStrategy.dropTail)
//#explicit-buffers-droptail
//#explicit-buffers-dropnew
jobs.buffer(1000, OverflowStrategy.dropNew)
//#explicit-buffers-dropnew
//#explicit-buffers-drophead
jobs.buffer(1000, OverflowStrategy.dropHead)
//#explicit-buffers-drophead
//#explicit-buffers-dropbuffer
jobs.buffer(1000, OverflowStrategy.dropBuffer)
//#explicit-buffers-dropbuffer
//#explicit-buffers-fail
jobs.buffer(1000, OverflowStrategy.fail)
//#explicit-buffers-fail
}
}

View file

@ -0,0 +1,130 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream
import akka.actor.ActorRef
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.testkit.AkkaSpec
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
class StreamPartialFlowGraphDocSpec extends AkkaSpec {
implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer()
"build with open ports" in {
//#simple-partial-flow-graph
val pickMaxOfThree = GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val zip1 = b.add(ZipWith[Int, Int, Int](math.max _))
val zip2 = b.add(ZipWith[Int, Int, Int](math.max _))
zip1.out ~> zip2.in0
UniformFanInShape(zip2.out, zip1.in0, zip1.in1, zip2.in1)
}
val resultSink = Sink.head[Int]
val g = RunnableGraph.fromGraph(GraphDSL.create(resultSink) { implicit b =>
sink =>
import GraphDSL.Implicits._
// importing the partial graph will return its shape (inlets & outlets)
val pm3 = b.add(pickMaxOfThree)
Source.single(1) ~> pm3.in(0)
Source.single(2) ~> pm3.in(1)
Source.single(3) ~> pm3.in(2)
pm3.out ~> sink.in
ClosedShape
})
val max: Future[Int] = g.run()
Await.result(max, 300.millis) should equal(3)
//#simple-partial-flow-graph
}
"build source from partial flow graph" in {
//#source-from-partial-flow-graph
val pairs = Source.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
// prepare graph elements
val zip = b.add(Zip[Int, Int]())
def ints = Source.fromIterator(() => Iterator.from(1))
// connect the graph
ints.filter(_ % 2 != 0) ~> zip.in0
ints.filter(_ % 2 == 0) ~> zip.in1
// expose port
SourceShape(zip.out)
})
val firstPair: Future[(Int, Int)] = pairs.runWith(Sink.head)
//#source-from-partial-flow-graph
Await.result(firstPair, 300.millis) should equal(1 -> 2)
}
"build flow from partial flow graph" in {
//#flow-from-partial-flow-graph
val pairUpWithToString =
Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
// prepare graph elements
val broadcast = b.add(Broadcast[Int](2))
val zip = b.add(Zip[Int, String]())
// connect the graph
broadcast.out(0).map(identity) ~> zip.in0
broadcast.out(1).map(_.toString) ~> zip.in1
// expose ports
FlowShape(broadcast.in, zip.out)
})
//#flow-from-partial-flow-graph
// format: OFF
val (_, matSink: Future[(Int, String)]) =
//#flow-from-partial-flow-graph
pairUpWithToString.runWith(Source(List(1)), Sink.head)
//#flow-from-partial-flow-graph
// format: ON
Await.result(matSink, 300.millis) should equal(1 -> "1")
}
"combine sources with simplified API" in {
//#source-combine
val sourceOne = Source(List(1))
val sourceTwo = Source(List(2))
val merged = Source.combine(sourceOne, sourceTwo)(Merge(_))
val mergedResult: Future[Int] = merged.runWith(Sink.fold(0)(_ + _))
//#source-combine
Await.result(mergedResult, 300.millis) should equal(3)
}
"combine sinks with simplified API" in {
val actorRef: ActorRef = testActor
//#sink-combine
val sendRemotely = Sink.actorRef(actorRef, "Done")
val localProcessing = Sink.foreach[Int](_ => /* do something useful */ ())
val sink = Sink.combine(sendRemotely, localProcessing)(Broadcast[Int](_))
Source(List(0, 1, 2)).runWith(sink)
//#sink-combine
expectMsg(0)
expectMsg(1)
expectMsg(2)
}
}

View file

@ -0,0 +1,161 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.testkit._
import akka.stream.testkit.scaladsl._
import scala.util._
import scala.concurrent.duration._
import scala.concurrent._
import akka.testkit.TestProbe
import akka.pattern
class StreamTestKitDocSpec extends AkkaSpec {
implicit val materializer = ActorMaterializer()
"strict collection" in {
//#strict-collection
val sinkUnderTest = Flow[Int].map(_ * 2).toMat(Sink.fold(0)(_ + _))(Keep.right)
val future = Source(1 to 4).runWith(sinkUnderTest)
val result = Await.result(future, 100.millis)
assert(result == 20)
//#strict-collection
}
"grouped part of infinite stream" in {
//#grouped-infinite
import system.dispatcher
import akka.pattern.pipe
val sourceUnderTest = Source.repeat(1).map(_ * 2)
val future = sourceUnderTest.grouped(10).runWith(Sink.head)
val result = Await.result(future, 100.millis)
assert(result == Seq.fill(10)(2))
//#grouped-infinite
}
"folded stream" in {
//#folded-stream
val flowUnderTest = Flow[Int].takeWhile(_ < 5)
val future = Source(1 to 10).via(flowUnderTest).runWith(Sink.fold(Seq.empty[Int])(_ :+ _))
val result = Await.result(future, 100.millis)
assert(result == (1 to 4))
//#folded-stream
}
"pipe to test probe" in {
//#pipeto-testprobe
import system.dispatcher
import akka.pattern.pipe
val sourceUnderTest = Source(1 to 4).grouped(2)
val probe = TestProbe()
sourceUnderTest.grouped(2).runWith(Sink.head).pipeTo(probe.ref)
probe.expectMsg(100.millis, Seq(Seq(1, 2), Seq(3, 4)))
//#pipeto-testprobe
}
"sink actor ref" in {
//#sink-actorref
case object Tick
val sourceUnderTest = Source.tick(0.seconds, 200.millis, Tick)
val probe = TestProbe()
val cancellable = sourceUnderTest.to(Sink.actorRef(probe.ref, "completed")).run()
probe.expectMsg(1.second, Tick)
probe.expectNoMsg(100.millis)
probe.expectMsg(200.millis, Tick)
cancellable.cancel()
probe.expectMsg(200.millis, "completed")
//#sink-actorref
}
"source actor ref" in {
//#source-actorref
val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)
val (ref, future) = Source.actorRef(8, OverflowStrategy.fail)
.toMat(sinkUnderTest)(Keep.both).run()
ref ! 1
ref ! 2
ref ! 3
ref ! akka.actor.Status.Success("done")
val result = Await.result(future, 100.millis)
assert(result == "123")
//#source-actorref
}
"test sink probe" in {
//#test-sink-probe
val sourceUnderTest = Source(1 to 4).filter(_ % 2 == 0).map(_ * 2)
sourceUnderTest
.runWith(TestSink.probe[Int])
.request(2)
.expectNext(4, 8)
.expectComplete()
//#test-sink-probe
}
"test source probe" in {
//#test-source-probe
val sinkUnderTest = Sink.cancelled
TestSource.probe[Int]
.toMat(sinkUnderTest)(Keep.left)
.run()
.expectCancellation()
//#test-source-probe
}
"injecting failure" in {
//#injecting-failure
val sinkUnderTest = Sink.head[Int]
val (probe, future) = TestSource.probe[Int]
.toMat(sinkUnderTest)(Keep.both)
.run()
probe.sendError(new Exception("boom"))
Await.ready(future, 100.millis)
val Failure(exception) = future.value.get
assert(exception.getMessage == "boom")
//#injecting-failure
}
"test source and a sink" in {
import system.dispatcher
//#test-source-and-sink
val flowUnderTest = Flow[Int].mapAsyncUnordered(2) { sleep =>
pattern.after(10.millis * sleep, using = system.scheduler)(Future.successful(sleep))
}
val (pub, sub) = TestSource.probe[Int]
.via(flowUnderTest)
.toMat(TestSink.probe[Int])(Keep.both)
.run()
sub.request(n = 3)
pub.sendNext(3)
pub.sendNext(2)
pub.sendNext(1)
sub.expectNextUnordered(1, 2, 3)
pub.sendError(new Exception("Power surge in the linear subroutine C-47!"))
val ex = sub.expectError()
assert(ex.getMessage.contains("C-47"))
//#test-source-and-sink
}
}

View file

@ -0,0 +1,210 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream
//#imports
import akka.actor.ActorSystem
import akka.stream.{ ClosedShape, ActorMaterializer, OverflowStrategy }
import akka.stream.scaladsl._
import scala.concurrent.Await
import scala.concurrent.Future
//#imports
import akka.stream.testkit.AkkaSpec
object TwitterStreamQuickstartDocSpec {
//#model
final case class Author(handle: String)
final case class Hashtag(name: String)
final case class Tweet(author: Author, timestamp: Long, body: String) {
def hashtags: Set[Hashtag] =
body.split(" ").collect { case t if t.startsWith("#") => Hashtag(t) }.toSet
}
val akka = Hashtag("#akka")
//#model
val tweets = Source(
Tweet(Author("rolandkuhn"), System.currentTimeMillis, "#akka rocks!") ::
Tweet(Author("patriknw"), System.currentTimeMillis, "#akka !") ::
Tweet(Author("bantonsson"), System.currentTimeMillis, "#akka !") ::
Tweet(Author("drewhk"), System.currentTimeMillis, "#akka !") ::
Tweet(Author("ktosopl"), System.currentTimeMillis, "#akka on the rocks!") ::
Tweet(Author("mmartynas"), System.currentTimeMillis, "wow #akka !") ::
Tweet(Author("akkateam"), System.currentTimeMillis, "#akka rocks!") ::
Tweet(Author("bananaman"), System.currentTimeMillis, "#bananas rock!") ::
Tweet(Author("appleman"), System.currentTimeMillis, "#apples rock!") ::
Tweet(Author("drama"), System.currentTimeMillis, "we compared #apples to #oranges!") ::
Nil)
}
class TwitterStreamQuickstartDocSpec extends AkkaSpec {
import TwitterStreamQuickstartDocSpec._
implicit val executionContext = system.dispatcher
// Disable println
def println(s: Any): Unit = ()
trait Example0 {
//#tweet-source
val tweets: Source[Tweet, Unit]
//#tweet-source
}
trait Example1 {
//#first-sample
//#materializer-setup
implicit val system = ActorSystem("reactive-tweets")
implicit val materializer = ActorMaterializer()
//#materializer-setup
//#first-sample
}
implicit val materializer = ActorMaterializer()
"filter and map" in {
//#first-sample
//#authors-filter-map
val authors: Source[Author, Unit] =
tweets
.filter(_.hashtags.contains(akka))
.map(_.author)
//#first-sample
//#authors-filter-map
trait Example3 {
//#authors-collect
val authors: Source[Author, Unit] =
tweets.collect { case t if t.hashtags.contains(akka) => t.author }
//#authors-collect
}
//#first-sample
//#authors-foreachsink-println
authors.runWith(Sink.foreach(println))
//#authors-foreachsink-println
//#first-sample
//#authors-foreach-println
authors.runForeach(println)
//#authors-foreach-println
}
"mapConcat hashtags" in {
//#hashtags-mapConcat
val hashtags: Source[Hashtag, Unit] = tweets.mapConcat(_.hashtags.toList)
//#hashtags-mapConcat
}
trait HiddenDefinitions {
//#flow-graph-broadcast
val writeAuthors: Sink[Author, Unit] = ???
val writeHashtags: Sink[Hashtag, Unit] = ???
//#flow-graph-broadcast
}
"simple broadcast" in {
val writeAuthors: Sink[Author, Future[Unit]] = Sink.ignore
val writeHashtags: Sink[Hashtag, Future[Unit]] = Sink.ignore
// format: OFF
//#flow-graph-broadcast
val g = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val bcast = b.add(Broadcast[Tweet](2))
tweets ~> bcast.in
bcast.out(0) ~> Flow[Tweet].map(_.author) ~> writeAuthors
bcast.out(1) ~> Flow[Tweet].mapConcat(_.hashtags.toList) ~> writeHashtags
ClosedShape
})
g.run()
//#flow-graph-broadcast
// format: ON
}
"slowProcessing" in {
def slowComputation(t: Tweet): Long = {
Thread.sleep(500) // act as if performing some heavy computation
42
}
//#tweets-slow-consumption-dropHead
tweets
.buffer(10, OverflowStrategy.dropHead)
.map(slowComputation)
.runWith(Sink.ignore)
//#tweets-slow-consumption-dropHead
}
"backpressure by readline" in {
trait X {
import scala.concurrent.duration._
//#backpressure-by-readline
val completion: Future[Unit] =
Source(1 to 10)
.map(i => { println(s"map => $i"); i })
.runForeach { i => readLine(s"Element = $i; continue reading? [press enter]\n") }
Await.ready(completion, 1.minute)
//#backpressure-by-readline
}
}
"count elements on finite stream" in {
//#tweets-fold-count
val count: Flow[Tweet, Int, Unit] = Flow[Tweet].map(_ => 1)
val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)
val counterGraph: RunnableGraph[Future[Int]] =
tweets
.via(count)
.toMat(sumSink)(Keep.right)
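// Keep.right selects the sink's materialized Future[Int] as the
// result of running the graph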
val sum: Future[Int] = counterGraph.run()
sum.foreach(c => println(s"Total tweets processed: $c"))
//#tweets-fold-count
new AnyRef {
//#tweets-fold-count-oneline
val sum: Future[Int] = tweets.map(t => 1).runWith(sumSink)
//#tweets-fold-count-oneline
}
}
"materialize multiple times" in {
val tweetsInMinuteFromNow = tweets // not really from a minute from now, just acting as if
//#tweets-runnable-flow-materialized-twice
val sumSink = Sink.fold[Int, Int](0)(_ + _)
val counterRunnableGraph: RunnableGraph[Future[Int]] =
tweetsInMinuteFromNow
.filter(_.hashtags contains akka)
.map(t => 1)
.toMat(sumSink)(Keep.right)
// materialize the stream once in the morning
val morningTweetsCount: Future[Int] = counterRunnableGraph.run()
// and once in the evening, reusing the flow
val eveningTweetsCount: Future[Int] = counterRunnableGraph.run()
//#tweets-runnable-flow-materialized-twice
val sum: Future[Int] = counterRunnableGraph.run()
sum.map { c => println(s"Total tweets processed: $c") }
}
}

View file

@ -0,0 +1,100 @@
package docs.stream.cookbook
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._
class RecipeByteStrings extends RecipeSpec {
"Recipes for bytestring streams" must {
"have a working chunker" in {
val rawBytes = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9)))
val ChunkLimit = 2
//#bytestring-chunker
import akka.stream.stage._
class Chunker(val chunkSize: Int) extends PushPullStage[ByteString, ByteString] {
private var buffer = ByteString.empty
override def onPush(elem: ByteString, ctx: Context[ByteString]): SyncDirective = {
buffer ++= elem
emitChunkOrPull(ctx)
}
override def onPull(ctx: Context[ByteString]): SyncDirective = emitChunkOrPull(ctx)
override def onUpstreamFinish(ctx: Context[ByteString]): TerminationDirective =
if (buffer.nonEmpty) ctx.absorbTermination()
else ctx.finish()
private def emitChunkOrPull(ctx: Context[ByteString]): SyncDirective = {
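// push a chunk if buffered data is available, otherwise pull more
// input, or finish once upstream has completed and we are drained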
if (buffer.isEmpty) {
if (ctx.isFinishing) ctx.finish()
else ctx.pull()
} else {
val (emit, nextBuffer) = buffer.splitAt(chunkSize)
buffer = nextBuffer
ctx.push(emit)
}
}
}
val chunksStream = rawBytes.transform(() => new Chunker(ChunkLimit))
//#bytestring-chunker
val chunksFuture = chunksStream.grouped(10).runWith(Sink.head)
val chunks = Await.result(chunksFuture, 3.seconds)
chunks.forall(_.size <= 2) should be(true)
chunks.fold(ByteString())(_ ++ _) should be(ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9))
}
"have a working bytes limiter" in {
val SizeLimit = 9
//#bytes-limiter
import akka.stream.stage._
class ByteLimiter(val maximumBytes: Long) extends PushStage[ByteString, ByteString] {
private var count = 0
override def onPush(chunk: ByteString, ctx: Context[ByteString]): SyncDirective = {
count += chunk.size
if (count > maximumBytes) ctx.fail(new IllegalStateException("Too many bytes"))
else ctx.push(chunk)
}
}
val limiter = Flow[ByteString].transform(() => new ByteLimiter(SizeLimit))
//#bytes-limiter
val bytes1 = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9)))
val bytes2 = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9, 10)))
Await.result(bytes1.via(limiter).grouped(10).runWith(Sink.head), 3.seconds)
.fold(ByteString())(_ ++ _) should be(ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9))
an[IllegalStateException] must be thrownBy {
Await.result(bytes2.via(limiter).grouped(10).runWith(Sink.head), 3.seconds)
}
}
"demonstrate compacting" in {
val data = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9)))
//#compacting-bytestrings
val compacted: Source[ByteString, Unit] = data.map(_.compact)
//#compacting-bytestrings
Await.result(compacted.grouped(10).runWith(Sink.head), 3.seconds).forall(_.isCompact) should be(true)
}
}
}

View file

@ -0,0 +1,90 @@
package docs.stream.cookbook
import akka.stream.{ ActorMaterializerSettings, ActorMaterializer }
import akka.stream.scaladsl._
import akka.stream.testkit._
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
class RecipeCollectingMetrics extends RecipeSpec {
import HoldOps._
implicit val m2 = ActorMaterializer(ActorMaterializerSettings(system).withInputBuffer(1, 1))
"Recipe for periodically collecting metrics" must {
"work" in {
// type Tick = Unit
//
// val loadPub = TestPublisher.manualProbe[Int]()
// val tickPub = TestPublisher.manualProbe[Tick]()
// val reportTicks = Source.fromPublisher(tickPub)
// val loadUpdates = Source.fromPublisher(loadPub)
// val futureSink = Sink.head[immutable.Seq[String]]
// val sink = Flow[String].grouped(10).to(futureSink)
//
// //#periodic-metrics-collection
// val currentLoad = loadUpdates.transform(() => new HoldWithWait)
//
// val graph = GraphDSL { implicit builder =>
// import FlowGraphImplicits._
// val collector = ZipWith[Int, Tick, String](
// (load: Int, tick: Tick) => s"current load is $load")
//
// currentLoad ~> collector.left
// reportTicks ~> collector.right
//
// collector.out ~> sink
// }
// //#periodic-metrics-collection
//
// val reports = graph.run().get(futureSink)
// val manualLoad = new StreamTestKit.AutoPublisher(loadPub)
// val manualTick = new StreamTestKit.AutoPublisher(tickPub)
//
// // Prefetch elimination
// manualTick.sendNext(())
//
// manualLoad.sendNext(53)
// manualLoad.sendNext(61)
// manualTick.sendNext(())
//
// manualLoad.sendNext(44)
// manualLoad.sendNext(54)
// manualLoad.sendNext(78)
// Thread.sleep(500)
//
// manualTick.sendNext(())
//
// manualTick.sendComplete()
//
// Await.result(reports, 3.seconds) should be(List("current load is 53", "current load is 61", "current load is 78"))
// Periodically collect values of metrics expressed as stream of updates
// ---------------------------------------------------------------------
//
// **Situation:** Given performance counters expressed as a stream of updates we want to gather a periodic report of these.
// We do not want to backpressure the counter updates but always take the last value instead. Whenever we don't have a new counter
// value we want to repeat the last value.
//
// This recipe uses the :class:`HoldWithWait` recipe introduced previously. We use this element to gather updates from
// the counter stream and store the final value, and also repeat this final value if no update is received between
// metrics collection rounds.
//
// To finish the recipe, we simply use :class:`ZipWith` to trigger reading the latest value from the ``currentLoad``
// stream whenever a new ``Tick`` arrives on the stream of ticks, ``reportTicks``.
//
// .. includecode:: ../code/docs/stream/cookbook/RecipeCollectingMetrics.scala#periodic-metrics-collection
//
// .. warning::
// In order for this recipe to work the buffer size for the :class:`ZipWith` must be set to 1. The reason for this is
// explained in the "Buffering" section of the documentation.
// FIXME: This recipe only works with a buffer size of 0, which is only available once graph fusing is implemented
pending
}
}
}

View file

@ -0,0 +1,62 @@
package docs.stream.cookbook
import java.security.MessageDigest
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.ByteString
import scala.concurrent.Await
import scala.concurrent.duration._
class RecipeDigest extends RecipeSpec {
"Recipe for calculating digest" must {
"work" in {
val data = Source(List(
ByteString("abcdbcdecdef"),
ByteString("defgefghfghighijhijkijkljklmklmnlmnomnopnopq")))
//#calculating-digest
import akka.stream.stage._
def digestCalculator(algorithm: String) = new PushPullStage[ByteString, ByteString] {
val digest = MessageDigest.getInstance(algorithm)
override def onPush(chunk: ByteString, ctx: Context[ByteString]): SyncDirective = {
digest.update(chunk.toArray)
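// only accumulate here; the final digest is emitted from onPull
// once upstream has finished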
ctx.pull()
}
override def onPull(ctx: Context[ByteString]): SyncDirective = {
if (ctx.isFinishing) ctx.pushAndFinish(ByteString(digest.digest()))
else ctx.pull()
}
override def onUpstreamFinish(ctx: Context[ByteString]): TerminationDirective = {
// If the stream is finished, we need to emit the last element in the onPull block.
// It is not allowed to directly emit elements from a termination block
// (onUpstreamFinish or onUpstreamFailure)
ctx.absorbTermination()
}
}
val digest: Source[ByteString, Unit] = data.transform(() => digestCalculator("SHA-256"))
//#calculating-digest
Await.result(digest.runWith(Sink.head), 3.seconds) should be(
ByteString(
0x24, 0x8d, 0x6a, 0x61,
0xd2, 0x06, 0x38, 0xb8,
0xe5, 0xc0, 0x26, 0x93,
0x0c, 0x3e, 0x60, 0x39,
0xa3, 0x3c, 0xe4, 0x59,
0x64, 0xff, 0x21, 0x67,
0xf6, 0xec, 0xed, 0xd4,
0x19, 0xdb, 0x06, 0xc1))
}
}
}

View file

@ -0,0 +1,65 @@
package docs.stream.cookbook
import akka.stream.{ ClosedShape, OverflowStrategy }
import akka.stream.scaladsl._
import akka.stream.testkit._
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
class RecipeDroppyBroadcast extends RecipeSpec {
"Recipe for a droppy broadcast" must {
"work" in {
val pub = TestPublisher.probe[Int]()
val myElements = Source.fromPublisher(pub)
val sub1 = TestSubscriber.manualProbe[Int]()
val sub2 = TestSubscriber.manualProbe[Int]()
val sub3 = TestSubscriber.probe[Int]()
val futureSink = Sink.head[Seq[Int]]
val mySink1 = Sink.fromSubscriber(sub1)
val mySink2 = Sink.fromSubscriber(sub2)
val mySink3 = Sink.fromSubscriber(sub3)
//#droppy-bcast
val graph = RunnableGraph.fromGraph(GraphDSL.create(mySink1, mySink2, mySink3)((_, _, _)) { implicit b =>
(sink1, sink2, sink3) =>
import GraphDSL.Implicits._
val bcast = b.add(Broadcast[Int](3))
myElements ~> bcast
bcast.buffer(10, OverflowStrategy.dropHead) ~> sink1
bcast.buffer(10, OverflowStrategy.dropHead) ~> sink2
bcast.buffer(10, OverflowStrategy.dropHead) ~> sink3
ClosedShape
})
//#droppy-bcast
graph.run()
sub3.request(100)
for (i <- 1 to 100) {
pub.sendNext(i)
sub3.expectNext(i)
}
pub.sendComplete()
sub1.expectSubscription().request(10)
sub2.expectSubscription().request(10)
for (i <- 91 to 100) {
sub1.expectNext(i)
sub2.expectNext(i)
}
sub1.expectComplete()
sub2.expectComplete()
}
}
}

View file

@ -0,0 +1,28 @@
package docs.stream.cookbook
import akka.stream.scaladsl.{ Sink, Source }
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
class RecipeFlattenSeq extends RecipeSpec {
"Recipe for flatteing a stream of seqs" must {
"work" in {
val someDataSource = Source(List(List("1"), List("2"), List("3", "4", "5"), List("6", "7")))
//#flattening-seqs
val myData: Source[List[Message], Unit] = someDataSource
val flattened: Source[Message, Unit] = myData.mapConcat(identity)
//#flattening-seqs
Await.result(flattened.grouped(8).runWith(Sink.head), 3.seconds) should be(List("1", "2", "3", "4", "5", "6", "7"))
}
}
}

View file

@ -0,0 +1,135 @@
package docs.stream.cookbook
import akka.actor.{ Props, ActorRef, Actor }
import akka.actor.Actor.Receive
import akka.stream.ClosedShape
import akka.stream.scaladsl._
import akka.stream.testkit._
import scala.collection.immutable
import scala.concurrent.duration._
class RecipeGlobalRateLimit extends RecipeSpec {
"Global rate limiting recipe" must {
//#global-limiter-actor
object Limiter {
case object WantToPass
case object MayPass
case object ReplenishTokens
def props(maxAvailableTokens: Int, tokenRefreshPeriod: FiniteDuration,
tokenRefreshAmount: Int): Props =
Props(new Limiter(maxAvailableTokens, tokenRefreshPeriod, tokenRefreshAmount))
}
class Limiter(
val maxAvailableTokens: Int,
val tokenRefreshPeriod: FiniteDuration,
val tokenRefreshAmount: Int) extends Actor {
import Limiter._
import context.dispatcher
import akka.actor.Status
private var waitQueue = immutable.Queue.empty[ActorRef]
private var permitTokens = maxAvailableTokens
private val replenishTimer = system.scheduler.schedule(
initialDelay = tokenRefreshPeriod,
interval = tokenRefreshPeriod,
receiver = self,
ReplenishTokens)
override def receive: Receive = open
val open: Receive = {
case ReplenishTokens =>
permitTokens = math.min(permitTokens + tokenRefreshAmount, maxAvailableTokens)
case WantToPass =>
permitTokens -= 1
sender() ! MayPass
if (permitTokens == 0) context.become(closed)
}
val closed: Receive = {
case ReplenishTokens =>
permitTokens = math.min(permitTokens + tokenRefreshAmount, maxAvailableTokens)
releaseWaiting()
case WantToPass =>
waitQueue = waitQueue.enqueue(sender())
}
private def releaseWaiting(): Unit = {
val (toBeReleased, remainingQueue) = waitQueue.splitAt(permitTokens)
waitQueue = remainingQueue
permitTokens -= toBeReleased.size
toBeReleased foreach (_ ! MayPass)
if (permitTokens > 0) context.become(open)
}
override def postStop(): Unit = {
replenishTimer.cancel()
waitQueue foreach (_ ! Status.Failure(new IllegalStateException("limiter stopped")))
}
}
//#global-limiter-actor
"work" in {
//#global-limiter-flow
def limitGlobal[T](limiter: ActorRef, maxAllowedWait: FiniteDuration): Flow[T, T, Unit] = {
import akka.pattern.ask
import akka.util.Timeout
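// every element must acquire a permit from the shared limiter actor
// before it is passed downstream; the ask enforces maxAllowedWait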
Flow[T].mapAsync(4)((element: T) => {
import system.dispatcher
implicit val triggerTimeout = Timeout(maxAllowedWait)
val limiterTriggerFuture = limiter ? Limiter.WantToPass
limiterTriggerFuture.map((_) => element)
})
}
//#global-limiter-flow
// Use a large period and emulate the timer by hand instead
val limiter = system.actorOf(Limiter.props(2, 100.days, 1), "limiter")
val source1 = Source.fromIterator(() => Iterator.continually("E1")).via(limitGlobal(limiter, 2.seconds))
val source2 = Source.fromIterator(() => Iterator.continually("E2")).via(limitGlobal(limiter, 2.seconds))
val probe = TestSubscriber.manualProbe[String]()
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val merge = b.add(Merge[String](2))
source1 ~> merge ~> Sink.fromSubscriber(probe)
source2 ~> merge
ClosedShape
}).run()
probe.expectSubscription().request(1000)
probe.expectNext() should startWith("E")
probe.expectNext() should startWith("E")
probe.expectNoMsg(500.millis)
limiter ! Limiter.ReplenishTokens
probe.expectNext() should startWith("E")
probe.expectNoMsg(500.millis)
var resultSet = Set.empty[String]
for (_ <- 1 to 100) {
limiter ! Limiter.ReplenishTokens
resultSet += probe.expectNext()
}
resultSet.contains("E1") should be(true)
resultSet.contains("E2") should be(true)
probe.expectError()
}
}
}

View file

@ -0,0 +1,115 @@
package docs.stream.cookbook
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.testkit._
import scala.concurrent.duration._
object HoldOps {
//#hold-version-1
import akka.stream.stage._
class HoldWithInitial[T](initial: T) extends DetachedStage[T, T] {
private var currentValue: T = initial
override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective = {
currentValue = elem
ctx.pull()
}
override def onPull(ctx: DetachedContext[T]): DownstreamDirective = {
ctx.push(currentValue)
}
}
//#hold-version-1
//#hold-version-2
import akka.stream.stage._
class HoldWithWait[T] extends DetachedStage[T, T] {
private var currentValue: T = _
private var waitingFirstValue = true
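// nothing can be repeated downstream until the first element arrives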
override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective = {
currentValue = elem
waitingFirstValue = false
if (ctx.isHoldingDownstream) ctx.pushAndPull(currentValue)
else ctx.pull()
}
override def onPull(ctx: DetachedContext[T]): DownstreamDirective = {
if (waitingFirstValue) ctx.holdDownstream()
else ctx.push(currentValue)
}
}
//#hold-version-2
}
class RecipeHold extends RecipeSpec {
import HoldOps._
"Recipe for creating a holding element" must {
"work for version 1" in {
val pub = TestPublisher.probe[Int]()
val sub = TestSubscriber.manualProbe[Int]()
val source = Source.fromPublisher(pub)
val sink = Sink.fromSubscriber(sub)
source.transform(() => new HoldWithInitial(0)).to(sink).run()
val subscription = sub.expectSubscription()
sub.expectNoMsg(100.millis)
subscription.request(1)
sub.expectNext(0)
subscription.request(1)
sub.expectNext(0)
pub.sendNext(1)
pub.sendNext(2)
subscription.request(2)
sub.expectNext(2)
sub.expectNext(2)
pub.sendComplete()
subscription.request(1)
sub.expectComplete()
}
"work for version 2" in {
val pub = TestPublisher.probe[Int]()
val sub = TestSubscriber.manualProbe[Int]()
val source = Source.fromPublisher(pub)
val sink = Sink.fromSubscriber(sub)
source.transform(() => new HoldWithWait).to(sink).run()
val subscription = sub.expectSubscription()
sub.expectNoMsg(100.millis)
subscription.request(1)
sub.expectNoMsg(100.millis)
pub.sendNext(1)
sub.expectNext(1)
pub.sendNext(2)
pub.sendNext(3)
subscription.request(2)
sub.expectNext(3)
sub.expectNext(3)
pub.sendComplete()
subscription.request(1)
sub.expectComplete()
}
}
}

View file

@ -0,0 +1,25 @@
package docs.stream.cookbook
import akka.stream.ClosedShape
import akka.stream.scaladsl._
import akka.stream.testkit._
import akka.util.ByteString
class RecipeKeepAlive extends RecipeSpec {
"Recipe for injecting keepalive messages" must {
"work" in {
val keepaliveMessage = ByteString(11)
//#inject-keepalive
import scala.concurrent.duration._
val injectKeepAlive: Flow[ByteString, ByteString, Unit] =
Flow[ByteString].keepAlive(1.second, () => keepaliveMessage)
//#inject-keepalive
// No need to test, this is a built-in stage with proper tests
}
}
}

View file

@ -0,0 +1,50 @@
package docs.stream.cookbook
import akka.event.Logging
import akka.stream.Attributes
import akka.stream.scaladsl.{ Sink, Source }
import akka.testkit.{ EventFilter, TestProbe }
class RecipeLoggingElements extends RecipeSpec {
"Simple logging recipe" must {
"work with println" in {
val printProbe = TestProbe()
def println(s: String): Unit = printProbe.ref ! s
val mySource = Source(List("1", "2", "3"))
//#println-debug
val loggedSource = mySource.map { elem => println(elem); elem }
//#println-debug
loggedSource.runWith(Sink.ignore)
printProbe.expectMsgAllOf("1", "2", "3")
}
"use log()" in {
val mySource = Source(List("1", "2", "3"))
def analyse(s: String) = s
//#log-custom
// customise log levels
mySource.log("before-map")
.withAttributes(Attributes.logLevels(onElement = Logging.WarningLevel))
.map(analyse)
// or provide custom logging adapter
implicit val adapter = Logging(system, "customLogger")
mySource.log("custom")
//#log-custom
val loggedSource = mySource.log("custom")
EventFilter.debug(start = "[custom] Element: ").intercept {
loggedSource.runWith(Sink.ignore)
}
}
}
}

View file

@ -0,0 +1,93 @@
package docs.stream.cookbook
import akka.stream.ClosedShape
import akka.stream.scaladsl._
import akka.stream.testkit._
import scala.concurrent.duration._
class RecipeManualTrigger extends RecipeSpec {
"Recipe for triggering a stream manually" must {
"work" in {
val elements = Source(List("1", "2", "3", "4"))
val pub = TestPublisher.probe[Trigger]()
val sub = TestSubscriber.manualProbe[Message]()
val triggerSource = Source.fromPublisher(pub)
val sink = Sink.fromSubscriber(sub)
//#manually-triggered-stream
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val zip = builder.add(Zip[Message, Trigger]())
elements ~> zip.in0
triggerSource ~> zip.in1
zip.out ~> Flow[(Message, Trigger)].map { case (msg, trigger) => msg } ~> sink
ClosedShape
})
//#manually-triggered-stream
graph.run()
sub.expectSubscription().request(1000)
sub.expectNoMsg(100.millis)
pub.sendNext(())
sub.expectNext("1")
sub.expectNoMsg(100.millis)
pub.sendNext(())
pub.sendNext(())
sub.expectNext("2")
sub.expectNext("3")
sub.expectNoMsg(100.millis)
pub.sendNext(())
sub.expectNext("4")
sub.expectComplete()
}
"work with ZipWith" in {
val elements = Source(List("1", "2", "3", "4"))
val pub = TestPublisher.probe[Trigger]()
val sub = TestSubscriber.manualProbe[Message]()
val triggerSource = Source.fromPublisher(pub)
val sink = Sink.fromSubscriber(sub)
//#manually-triggered-stream-zipwith
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val zip = builder.add(ZipWith((msg: Message, trigger: Trigger) => msg))
elements ~> zip.in0
triggerSource ~> zip.in1
zip.out ~> sink
ClosedShape
})
//#manually-triggered-stream-zipwith
graph.run()
sub.expectSubscription().request(1000)
sub.expectNoMsg(100.millis)
pub.sendNext(())
sub.expectNext("1")
sub.expectNoMsg(100.millis)
pub.sendNext(())
pub.sendNext(())
sub.expectNext("2")
sub.expectNext("3")
sub.expectNoMsg(100.millis)
pub.sendNext(())
sub.expectNext("4")
sub.expectComplete()
}
}
}

View file

@ -0,0 +1,57 @@
package docs.stream.cookbook
import akka.stream.scaladsl._
import akka.stream.testkit._
import scala.concurrent.duration._
import akka.testkit.TestLatch
import scala.concurrent.Await
class RecipeMissedTicks extends RecipeSpec {
"Recipe for collecting missed ticks" must {
"work" in {
type Tick = Unit
val pub = TestPublisher.probe[Tick]()
val sub = TestSubscriber.manualProbe[Int]()
val tickStream = Source.fromPublisher(pub)
val sink = Sink.fromSubscriber(sub)
//#missed-ticks
val missedTicks: Flow[Tick, Int, Unit] =
Flow[Tick].conflate(seed = (_) => 0)(
(missedTicks, tick) => missedTicks + 1)
//#missed-ticks
val latch = TestLatch(3)
val realMissedTicks: Flow[Tick, Int, Unit] =
Flow[Tick].conflate(seed = (_) => 0)(
(missedTicks, tick) => { latch.countDown(); missedTicks + 1 })
tickStream.via(realMissedTicks).to(sink).run()
pub.sendNext(())
pub.sendNext(())
pub.sendNext(())
pub.sendNext(())
val subscription = sub.expectSubscription()
Await.ready(latch, 1.second)
subscription.request(1)
sub.expectNext(3)
subscription.request(1)
sub.expectNoMsg(100.millis)
pub.sendNext(())
sub.expectNext(0)
pub.sendComplete()
subscription.request(1)
sub.expectComplete()
}
}
}

View file

@ -0,0 +1,58 @@
package docs.stream.cookbook
import akka.stream.scaladsl.{ Sink, Source }
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
class RecipeMultiGroupBy extends RecipeSpec {
"Recipe for multi-groupBy" must {
"work" in {
case class Topic(name: String)
val elems = Source(List("1: a", "1: b", "all: c", "all: d", "1: e"))
val extractTopics = { msg: Message =>
if (msg.startsWith("1")) List(Topic("1"))
else List(Topic("1"), Topic("2"))
}
//#multi-groupby
val topicMapper: (Message) => immutable.Seq[Topic] = extractTopics
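// mapConcat emits one (message, topic) pair per topic the message belongs to,
// so a single message can flow into several of the groupBy substreams below.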
val messageAndTopic: Source[(Message, Topic), Unit] = elems.mapConcat { msg: Message =>
val topicsForMessage = topicMapper(msg)
// Create a (Msg, Topic) pair for each of the topics
// the message belongs to
topicsForMessage.map(msg -> _)
}
val multiGroups = messageAndTopic
.groupBy(2, _._2).map {
case (msg, topic) =>
// do what needs to be done
//#multi-groupby
(msg, topic)
//#multi-groupby
}
//#multi-groupby
val result = multiGroups
.grouped(10)
.mergeSubstreams
.map(g => g.head._2.name + g.map(_._1).mkString("[", ", ", "]"))
.grouped(10)
.runWith(Sink.head)
Await.result(result, 3.seconds).toSet should be(Set(
"1[1: a, 1: b, all: c, all: d, 1: e]",
"2[all: c, all: d]"))
}
}
}

View file

@ -0,0 +1,39 @@
package docs.stream.cookbook
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.util.ByteString
import scala.annotation.tailrec
import scala.concurrent.Await
import scala.concurrent.duration._
class RecipeParseLines extends RecipeSpec {
"Recipe for parsing line from bytes" must {
"work" in {
val rawData = Source(List(
ByteString("Hello World"),
ByteString("\r"),
ByteString("!\r"),
ByteString("\nHello Akka!\r\nHello Streams!"),
ByteString("\r\n\r\n")))
//#parse-lines
import akka.stream.io.Framing
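// Framing.delimiter re-chunks the arbitrarily fragmented ByteStrings into
// frames split at "\r\n"; allowTruncation = true emits a trailing frame even
// if the stream completes without a final delimiter, instead of failing.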
val linesStream = rawData.via(Framing.delimiter(
ByteString("\r\n"), maximumFrameLength = 100, allowTruncation = true))
.map(_.utf8String)
//#parse-lines
Await.result(linesStream.grouped(10).runWith(Sink.head), 3.seconds) should be(List(
"Hello World\r!",
"Hello Akka!",
"Hello Streams!",
""))
}
}
}

View file

@ -0,0 +1,81 @@
package docs.stream.cookbook
import akka.stream.{ Graph, FlowShape, Inlet, Outlet, Attributes, OverflowStrategy }
import akka.stream.scaladsl._
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
import akka.stream.stage.{ GraphStage, GraphStageLogic }
class RecipeReduceByKey extends RecipeSpec {
"Reduce by key recipe" must {
val MaximumDistinctWords = 1000
"work with simple word count" in {
def words = Source(List("hello", "world", "and", "hello", "universe", "akka") ++ List.fill(1000)("rocks!"))
//#word-count
val counts: Source[(String, Int), Unit] = words
// split the words into separate streams first
.groupBy(MaximumDistinctWords, identity)
// add counting logic to the streams
.fold(("", 0)) {
case ((_, count), word) => (word, count + 1)
}
// get a stream of word counts
.mergeSubstreams
//#word-count
Await.result(counts.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set(
("hello", 2),
("world", 1),
("and", 1),
("universe", 1),
("akka", 1),
("rocks!", 1000)))
}
"work generalized" in {
def words = Source(List("hello", "world", "and", "hello", "universe", "akka") ++ List.fill(1000)("rocks!"))
//#reduce-by-key-general
def reduceByKey[In, K, Out](
maximumGroupSize: Int,
groupKey: (In) => K,
foldZero: (K) => Out)(fold: (Out, In) => Out): Flow[In, (K, Out), Unit] = {
Flow[In]
.groupBy(maximumGroupSize, groupKey)
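// a substream's key is only known once its first element arrives, hence
// the Option: None until then, Some(key -> aggregate) afterwards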
.fold(Option.empty[(K, Out)]) {
case (None, elem) =>
val key = groupKey(elem)
Some((key, fold(foldZero(key), elem)))
case (Some((key, out)), elem) =>
Some((key, fold(out, elem)))
}
.map(_.get)
.mergeSubstreams
}
val wordCounts = words.via(reduceByKey(
MaximumDistinctWords,
groupKey = (word: String) => word,
foldZero = (key: String) => 0)(fold = (count: Int, elem: String) => count + 1))
//#reduce-by-key-general
Await.result(wordCounts.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set(
("hello", 2),
("world", 1),
("and", 1),
("universe", 1),
("akka", 1),
("rocks!", 1000)))
}
}
}

View file

@ -0,0 +1,49 @@
package docs.stream.cookbook
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.stream.testkit._
import scala.concurrent.duration._
import akka.testkit.TestLatch
import scala.concurrent.Await
class RecipeSimpleDrop extends RecipeSpec {
"Recipe for simply dropping elements for a faster stream" must {
"work" in {
//#simple-drop
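// conflate with an identity seed keeps just the latest element while
// downstream is backpressuring: each new message replaces the buffered one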
val droppyStream: Flow[Message, Message, Unit] =
Flow[Message].conflate(seed = identity)((lastMessage, newMessage) => newMessage)
//#simple-drop
val latch = TestLatch(2)
val realDroppyStream =
Flow[Message].conflate(seed = identity)((lastMessage, newMessage) => { latch.countDown(); newMessage })
val pub = TestPublisher.probe[Message]()
val sub = TestSubscriber.manualProbe[Message]()
val messageSource = Source.fromPublisher(pub)
val sink = Sink.fromSubscriber(sub)
messageSource.via(realDroppyStream).to(sink).run()
val subscription = sub.expectSubscription()
sub.expectNoMsg(100.millis)
pub.sendNext("1")
pub.sendNext("2")
pub.sendNext("3")
Await.ready(latch, 1.second)
subscription.request(1)
sub.expectNext("3")
pub.sendComplete()
subscription.request(1)
sub.expectComplete()
}
}
}

View file

@ -0,0 +1,13 @@
package docs.stream.cookbook
import akka.stream.ActorMaterializer
import akka.stream.testkit.AkkaSpec
trait RecipeSpec extends AkkaSpec {
implicit val m = ActorMaterializer()
type Message = String
type Trigger = Unit
type Job = String
}

View file

@ -0,0 +1,27 @@
package docs.stream.cookbook
import akka.stream.scaladsl.{ Sink, Source }
import scala.collection.immutable
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._
class RecipeToStrict extends RecipeSpec {
"Recipe for draining a stream into a strict collection" must {
"work" in {
val myData = Source(List("1", "2", "3"))
val MaxAllowedSeqSize = 100
//#draining-to-seq
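// grouped collects up to MaxAllowedSeqSize elements into a single Seq;
// Sink.head completes with that first group, bounding how large the
// materialized collection can grow.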
val strict: Future[immutable.Seq[Message]] =
myData.grouped(MaxAllowedSeqSize).runWith(Sink.head)
//#draining-to-seq
Await.result(strict, 3.seconds) should be(List("1", "2", "3"))
}
}
}

View file

@ -0,0 +1,48 @@
package docs.stream.cookbook
import akka.stream.FlowShape
import akka.stream.scaladsl._
import akka.testkit.TestProbe
import scala.concurrent.Await
import scala.concurrent.duration._
class RecipeWorkerPool extends RecipeSpec {
"Recipe for a pool of workers" must {
"work" in {
val myJobs = Source(List("1", "2", "3", "4", "5"))
type Result = String
val worker = Flow[String].map(_ + " done")
//#worker-pool
def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, Unit] = {
import GraphDSL.Implicits._
Flow.fromGraph(GraphDSL.create() { implicit b =>
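// waitForAllDownstreams = true delays the first emit until every worker
// has signalled demand, spreading the initial elements across the pool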
val balancer = b.add(Balance[In](workerCount, waitForAllDownstreams = true))
val merge = b.add(Merge[Out](workerCount))
for (_ <- 1 to workerCount) {
// for each worker, add an edge from the balancer to the worker, then wire
// it to the merge element
balancer ~> worker ~> merge
}
FlowShape(balancer.in, merge.out)
})
}
val processedJobs: Source[Result, Unit] = myJobs.via(balancer(worker, 3))
//#worker-pool
Await.result(processedJobs.grouped(10).runWith(Sink.head), 3.seconds).toSet should be(Set(
"1 done", "2 done", "3 done", "4 done", "5 done"))
}
}
}

View file

@ -0,0 +1,60 @@
/*
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream.io
import java.io.File
import akka.stream._
import akka.stream.scaladsl.{ FileIO, Sink, Source }
import akka.stream.testkit.Utils._
import akka.stream.testkit._
import akka.util.ByteString
import scala.concurrent.Future
class StreamFileDocSpec extends AkkaSpec(UnboundedMailboxConfig) {
implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer()
// silence sysout
def println(s: String) = ()
val file = File.createTempFile(getClass.getName, ".tmp")
override def afterTermination() = file.delete()
{
//#file-source
import akka.stream.io._
//#file-source
Thread.sleep(0) // needs a statement here for valid syntax and to avoid "unused" warnings
}
{
//#file-source
val file = new File("example.csv")
//#file-source
}
"read data from a file" in {
//#file-source
def handle(b: ByteString): Unit //#file-source
= ()
//#file-source
val foreach: Future[Long] = FileIO.fromFile(file)
.to(Sink.ignore)
.run()
//#file-source
}
"configure dispatcher in code" in {
//#custom-dispatcher-code
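// run the blocking file IO on a dedicated dispatcher, keeping the
// default dispatcher free for non-blocking work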
FileIO.fromFile(file)
.withAttributes(ActorAttributes.dispatcher("custom-blocking-io-dispatcher"))
//#custom-dispatcher-code
}
}

View file

@ -0,0 +1,165 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream.io
import java.util.concurrent.atomic.AtomicReference
import akka.stream._
import akka.stream.scaladsl.Tcp._
import akka.stream.scaladsl._
import akka.stream.stage.{ Context, PushStage, SyncDirective }
import akka.stream.testkit.AkkaSpec
import akka.testkit.TestProbe
import akka.util.ByteString
import docs.utils.TestUtils
import scala.concurrent.Future
class StreamTcpDocSpec extends AkkaSpec {
implicit val ec = system.dispatcher
implicit val materializer = ActorMaterializer()
// silence sysout
def println(s: String) = ()
"simple server connection" in {
{
//#echo-server-simple-bind
val binding: Future[ServerBinding] =
Tcp().bind("127.0.0.1", 8888).to(Sink.ignore).run()
binding.map { b =>
b.unbind() onComplete {
case _ => // ...
}
}
//#echo-server-simple-bind
}
{
val (host, port) = TestUtils.temporaryServerHostnameAndPort()
//#echo-server-simple-handle
import akka.stream.io.Framing
val connections: Source[IncomingConnection, Future[ServerBinding]] =
Tcp().bind(host, port)
connections runForeach { connection =>
println(s"New connection from: ${connection.remoteAddress}")
val echo = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.map(_ + "!!!\n")
.map(ByteString(_))
connection.handleWith(echo)
}
//#echo-server-simple-handle
}
}
"initial server banner echo server" in {
val localhost = TestUtils.temporaryServerAddress()
val connections = Tcp().bind(localhost.getHostName, localhost.getPort) // TODO getHostString in Java7
val serverProbe = TestProbe()
import akka.stream.io.Framing
//#welcome-banner-chat-server
connections runForeach { connection =>
val serverLogic = Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
// server logic, parses incoming commands
val commandParser = new PushStage[String, String] {
override def onPush(elem: String, ctx: Context[String]): SyncDirective = {
elem match {
case "BYE" ctx.finish()
case _ ctx.push(elem + "!")
}
}
}
import connection._
val welcomeMsg = s"Welcome to: $localAddress, you are: $remoteAddress!\n"
val welcome = Source.single(ByteString(welcomeMsg))
val echo = b.add(Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
//#welcome-banner-chat-server
.map { command => serverProbe.ref ! command; command }
//#welcome-banner-chat-server
.transform(() => commandParser)
.map(_ + "\n")
.map(ByteString(_)))
val concat = b.add(Concat[ByteString]())
// first we emit the welcome message,
welcome ~> concat.in(0)
// then we continue using the echo-logic Flow
echo.outlet ~> concat.in(1)
FlowShape(echo.in, concat.out)
})
connection.handleWith(serverLogic)
}
//#welcome-banner-chat-server
val input = new AtomicReference("Hello world" :: "What a lovely day" :: Nil)
def readLine(prompt: String): String = {
input.get() match {
case all @ cmd :: tail if input.compareAndSet(all, tail) => cmd
case _ => "q"
}
}
{
//#repl-client
val connection = Tcp().outgoingConnection("127.0.0.1", 8888)
//#repl-client
}
{
val connection = Tcp().outgoingConnection(localhost)
//#repl-client
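// turns user input into protocol messages: "q" ends the session by
// sending BYE, anything else is forwarded as a line to the server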
val replParser = new PushStage[String, ByteString] {
override def onPush(elem: String, ctx: Context[ByteString]): SyncDirective = {
elem match {
case "q" ctx.pushAndFinish(ByteString("BYE\n"))
case _ ctx.push(ByteString(s"$elem\n"))
}
}
}
val repl = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
.map(text => println("Server: " + text))
.map(_ => readLine("> "))
.transform(() => replParser)
connection.join(repl).run()
}
//#repl-client
serverProbe.expectMsg("Hello world")
serverProbe.expectMsg("What a lovely day")
serverProbe.expectMsg("BYE")
}
}