format source with scalafmt

Authored by Auto Format on 2019-03-11 10:38:24 +01:00, committed by Patrik Nordwall
parent 0f40491d42
commit ce404e4f53
1669 changed files with 43208 additions and 35404 deletions
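
The reformatting visible throughout the diff below (long call chains joined onto a single line up to the column limit, continuation lines starting with a leading dot, call arguments aligned under the opening parenthesis, import selectors sorted alphabetically) is what scalafmt produces from the project's .scalafmt.conf. The sketch that follows is a hypothetical illustration of settings in that spirit, not the repository's actual configuration:

# hypothetical .scalafmt.conf sketch; the repository's real settings may differ
version = "1.5.1"              # assumed scalafmt version, for illustration only
maxColumn = 120                # chains are collapsed onto one line up to this width
align.openParenCallSite = true # align arguments under the opening parenthesis of a call
rewrite.rules = [SortImports]  # sort import selectors, e.g. { Done, NotUsed }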


@@ -55,11 +55,11 @@ object ActorPublisherDocSpec {
if (totalDemand <= Int.MaxValue) {
val (use, keep) = buf.splitAt(totalDemand.toInt)
buf = keep
use foreach onNext
use.foreach(onNext)
} else {
val (use, keep) = buf.splitAt(Int.MaxValue)
buf = keep
use foreach onNext
use.foreach(onNext)
deliverBuf()
}
}
@@ -80,7 +80,9 @@ class ActorPublisherDocSpec extends AkkaSpec {
val jobManagerSource = Source.actorPublisher[JobManager.Job](JobManager.props)
val ref = Flow[JobManager.Job]
.map(_.payload.toUpperCase)
.map { elem => println(elem); elem }
.map { elem =>
println(elem); elem
}
.to(Sink.ignore)
.runWith(jobManagerSource)


@@ -88,8 +88,7 @@ class ActorSubscriberDocSpec extends AkkaSpec {
//#actor-subscriber-usage
val N = 117
val worker = Source(1 to N).map(WorkerPool.Msg(_, replyTo))
.runWith(Sink.actorSubscriber(WorkerPool.props))
val worker = Source(1 to N).map(WorkerPool.Msg(_, replyTo)).runWith(Sink.actorSubscriber(WorkerPool.props))
//#actor-subscriber-usage
watch(worker)


@@ -20,11 +20,7 @@ class CompositionDocSpec extends AkkaSpec {
"nonnested flow" in {
//#non-nested-flow
Source.single(0)
.map(_ + 1)
.filter(_ != 0)
.map(_ - 2)
.to(Sink.fold(0)(_ + _))
Source.single(0).map(_ + 1).filter(_ != 0).map(_ - 2).to(Sink.fold(0)(_ + _))
// ... where is the nesting?
//#non-nested-flow
@@ -33,17 +29,20 @@ class CompositionDocSpec extends AkkaSpec {
"nested flow" in {
//#nested-flow
val nestedSource =
Source.single(0) // An atomic source
Source
.single(0) // An atomic source
.map(_ + 1) // an atomic processing stage
.named("nestedSource") // wraps up the current Source and gives it a name
val nestedFlow =
Flow[Int].filter(_ != 0) // an atomic processing stage
Flow[Int]
.filter(_ != 0) // an atomic processing stage
.map(_ - 2) // another atomic processing stage
.named("nestedFlow") // wraps up the Flow, and gives it a name
val nestedSink =
nestedFlow.to(Sink.fold(0)(_ + _)) // wire an atomic sink to the nestedFlow
nestedFlow
.to(Sink.fold(0)(_ + _)) // wire an atomic sink to the nestedFlow
.named("nestedSink") // wrap it up
// Create a RunnableGraph
@@ -53,17 +52,20 @@ class CompositionDocSpec extends AkkaSpec {
"reusing components" in {
val nestedSource =
Source.single(0) // An atomic source
Source
.single(0) // An atomic source
.map(_ + 1) // an atomic processing stage
.named("nestedSource") // wraps up the current Source and gives it a name
val nestedFlow =
Flow[Int].filter(_ != 0) // an atomic processing stage
Flow[Int]
.filter(_ != 0) // an atomic processing stage
.map(_ - 2) // another atomic processing stage
.named("nestedFlow") // wraps up the Flow, and gives it a name
val nestedSink =
nestedFlow.to(Sink.fold(0)(_ + _)) // wire an atomic sink to the nestedFlow
nestedFlow
.to(Sink.fold(0)(_ + _)) // wire an atomic sink to the nestedFlow
.named("nestedSink") // wrap it up
//#reuse
@@ -192,7 +194,9 @@ class CompositionDocSpec extends AkkaSpec {
//#mat-combine-2
// Materializes to NotUsed (orange)
val flow2: Flow[Int, ByteString, NotUsed] = Flow[Int].map { i => ByteString(i.toString) }
val flow2: Flow[Int, ByteString, NotUsed] = Flow[Int].map { i =>
ByteString(i.toString)
}
// Materializes to Future[OutgoingConnection] (yellow)
val flow3: Flow[ByteString, ByteString, Future[OutgoingConnection]] =
@@ -217,9 +221,7 @@ class CompositionDocSpec extends AkkaSpec {
def close() = p.trySuccess(None)
}
def f(
p: Promise[Option[Int]],
rest: (Future[OutgoingConnection], Future[String])): Future[MyClass] = {
def f(p: Promise[Option[Int]], rest: (Future[OutgoingConnection], Future[String])): Future[MyClass] = {
val connFuture = rest._1
connFuture.map(MyClass(p, _))
@@ -235,17 +237,17 @@ class CompositionDocSpec extends AkkaSpec {
//#attributes-inheritance
import Attributes._
val nestedSource =
Source.single(0)
.map(_ + 1)
.named("nestedSource") // Wrap, no inputBuffer set
Source.single(0).map(_ + 1).named("nestedSource") // Wrap, no inputBuffer set
val nestedFlow =
Flow[Int].filter(_ != 0)
Flow[Int]
.filter(_ != 0)
.via(Flow[Int].map(_ - 2).withAttributes(inputBuffer(4, 4))) // override
.named("nestedFlow") // Wrap, no inputBuffer set
val nestedSink =
nestedFlow.to(Sink.fold(0)(_ + _)) // wire an atomic sink to the nestedFlow
nestedFlow
.to(Sink.fold(0)(_ + _)) // wire an atomic sink to the nestedFlow
.withAttributes(name("nestedSink") and inputBuffer(3, 3)) // override
//#attributes-inheritance
}


@@ -152,11 +152,12 @@ class FlowDocSpec extends AkkaSpec with CompileOnlySpec {
"various ways of transforming materialized values" in {
import scala.concurrent.duration._
val throttler = Flow.fromGraph(GraphDSL.create(Source.tick(1.second, 1.second, "test")) { implicit builder => tickSource =>
import GraphDSL.Implicits._
val zip = builder.add(ZipWith[String, Int, Int](Keep.right))
tickSource ~> zip.in0
FlowShape(zip.in1, zip.out)
val throttler = Flow.fromGraph(GraphDSL.create(Source.tick(1.second, 1.second, "test")) {
implicit builder => tickSource =>
import GraphDSL.Implicits._
val zip = builder.add(ZipWith[String, Int, Int](Keep.right))
tickSource ~> zip.in0
FlowShape(zip.in1, zip.out)
})
//#flow-mat-combine
@@ -225,10 +226,7 @@ class FlowDocSpec extends AkkaSpec with CompileOnlySpec {
"defining asynchronous boundaries" in {
//#flow-async
Source(List(1, 2, 3))
.map(_ + 1).async
.map(_ * 2)
.to(Sink.ignore)
Source(List(1, 2, 3)).map(_ + 1).async.map(_ * 2).to(Sink.ignore)
//#flow-async
}
@@ -261,11 +259,10 @@ object FlowDocSpec {
final class RunWithMyself extends Actor {
implicit val mat = ActorMaterializer()
Source.maybe
.runWith(Sink.onComplete {
case Success(done) => println(s"Completed: $done")
case Failure(ex) => println(s"Failed: ${ex.getMessage}")
})
Source.maybe.runWith(Sink.onComplete {
case Success(done) => println(s"Completed: $done")
case Failure(ex) => println(s"Failed: ${ex.getMessage}")
})
def receive = {
case "boom" =>
@@ -277,11 +274,10 @@ object FlowDocSpec {
//#materializer-from-system-in-actor
final class RunForever(implicit val mat: Materializer) extends Actor {
Source.maybe
.runWith(Sink.onComplete {
case Success(done) => println(s"Completed: $done")
case Failure(ex) => println(s"Failed: ${ex.getMessage}")
})
Source.maybe.runWith(Sink.onComplete {
case Success(done) => println(s"Completed: $done")
case Failure(ex) => println(s"Failed: ${ex.getMessage}")
})
def receive = {
case "boom" =>


@@ -36,8 +36,7 @@ class FlowErrorDocSpec extends AkkaSpec {
case _: ArithmeticException => Supervision.Resume
case _ => Supervision.Stop
}
implicit val materializer = ActorMaterializer(
ActorMaterializerSettings(system).withSupervisionStrategy(decider))
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(system).withSupervisionStrategy(decider))
val source = Source(0 to 5).map(100 / _)
val result = source.runWith(Sink.fold(0)(_ + _))
// the element causing division by zero will be dropped
@@ -55,7 +54,8 @@ class FlowErrorDocSpec extends AkkaSpec {
case _ => Supervision.Stop
}
val flow = Flow[Int]
.filter(100 / _ < 50).map(elem => 100 / (5 - elem))
.filter(100 / _ < 50)
.map(elem => 100 / (5 - elem))
.withAttributes(ActorAttributes.supervisionStrategy(decider))
val source = Source(0 to 5).via(flow)
@@ -93,12 +93,14 @@ class FlowErrorDocSpec extends AkkaSpec {
"demonstrate recover" in {
implicit val materializer = ActorMaterializer()
//#recover
Source(0 to 6).map(n =>
if (n < 5) n.toString
else throw new RuntimeException("Boom!")
).recover {
case _: RuntimeException => "stream truncated"
}.runForeach(println)
Source(0 to 6)
.map(n =>
if (n < 5) n.toString
else throw new RuntimeException("Boom!"))
.recover {
case _: RuntimeException => "stream truncated"
}
.runForeach(println)
//#recover
/*
@@ -111,7 +113,7 @@ Output:
4
stream truncated
//#recover-output
*/
*/
}
"demonstrate recoverWithRetries" in {
@@ -119,12 +121,14 @@ stream truncated
//#recoverWithRetries
val planB = Source(List("five", "six", "seven", "eight"))
Source(0 to 10).map(n =>
if (n < 5) n.toString
else throw new RuntimeException("Boom!")
).recoverWithRetries(attempts = 1, {
case _: RuntimeException => planB
}).runForeach(println)
Source(0 to 10)
.map(n =>
if (n < 5) n.toString
else throw new RuntimeException("Boom!"))
.recoverWithRetries(attempts = 1, {
case _: RuntimeException => planB
})
.runForeach(println)
//#recoverWithRetries
/*
@@ -140,7 +144,7 @@ six
seven
eight
//#recoverWithRetries-output
*/
*/
}
}


@@ -6,7 +6,7 @@ package docs.stream
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{ GraphDSL, Merge, Balance, Source, Flow }
import akka.stream.scaladsl.{ Balance, Flow, GraphDSL, Merge, Source }
import akka.testkit.AkkaSpec
class FlowParallelismDocSpec extends AkkaSpec {
@@ -19,13 +19,17 @@ class FlowParallelismDocSpec extends AkkaSpec {
//format: OFF
//#pipelining
// Takes a scoop of batter and creates a pancake with one side cooked
val fryingPan1: Flow[ScoopOfBatter, HalfCookedPancake, NotUsed] =
Flow[ScoopOfBatter].map { batter => HalfCookedPancake() }
// Takes a scoop of batter and creates a pancake with one side cooked
val fryingPan1: Flow[ScoopOfBatter, HalfCookedPancake, NotUsed] =
Flow[ScoopOfBatter].map { batter =>
HalfCookedPancake()
}
// Finishes a half-cooked pancake
val fryingPan2: Flow[HalfCookedPancake, Pancake, NotUsed] =
Flow[HalfCookedPancake].map { halfCooked => Pancake() }
// Finishes a half-cooked pancake
val fryingPan2: Flow[HalfCookedPancake, Pancake, NotUsed] =
Flow[HalfCookedPancake].map { halfCooked =>
Pancake()
}
//#pipelining
//format: ON
@@ -41,7 +45,9 @@ class FlowParallelismDocSpec extends AkkaSpec {
"Demonstrate parallel processing" in {
//#parallelism
val fryingPan: Flow[ScoopOfBatter, Pancake, NotUsed] =
Flow[ScoopOfBatter].map { batter => Pancake() }
Flow[ScoopOfBatter].map { batter =>
Pancake()
}
val pancakeChef: Flow[ScoopOfBatter, Pancake, NotUsed] = Flow.fromGraph(GraphDSL.create() { implicit builder =>
val dispatchBatter = builder.add(Balance[ScoopOfBatter](2))
@@ -65,7 +71,6 @@ class FlowParallelismDocSpec extends AkkaSpec {
//#parallel-pipeline
val pancakeChef: Flow[ScoopOfBatter, Pancake, NotUsed] =
Flow.fromGraph(GraphDSL.create() { implicit builder =>
val dispatchBatter = builder.add(Balance[ScoopOfBatter](2))
val mergePancakes = builder.add(Merge[Pancake](2))


@@ -38,7 +38,7 @@ class FlowStreamRefsDocSpec extends AkkaSpec with CompileOnlySpec {
val reply: Future[LogsOffer] = ref.map(LogsOffer(streamId, _))
// reply to sender
reply pipeTo sender()
reply.pipeTo(sender())
}
def streamLogs(streamId: Long): Source[String, NotUsed] = ???
@@ -85,7 +85,7 @@ class FlowStreamRefsDocSpec extends AkkaSpec with CompileOnlySpec {
val reply: Future[MeasurementsSinkReady] = ref.map(MeasurementsSinkReady(nodeId, _))
// reply to sender
reply pipeTo sender()
reply.pipeTo(sender())
}
def logsSinkFor(nodeId: String): Sink[String, NotUsed] = ???
@@ -116,11 +116,14 @@ class FlowStreamRefsDocSpec extends AkkaSpec with CompileOnlySpec {
import akka.stream.StreamRefAttributes
// configuring Sink.sourceRef (notice that we apply the attributes to the Sink!):
Source.repeat("hello")
Source
.repeat("hello")
.runWith(StreamRefs.sourceRef().addAttributes(StreamRefAttributes.subscriptionTimeout(5.seconds)))
// configuring SinkRef.source:
StreamRefs.sinkRef().addAttributes(StreamRefAttributes.subscriptionTimeout(5.seconds))
StreamRefs
.sinkRef()
.addAttributes(StreamRefAttributes.subscriptionTimeout(5.seconds))
.runWith(Sink.ignore) // not very interesting Sink, just an example
//#attr-sub-timeout
}


@@ -4,7 +4,7 @@
package docs.stream
import akka.stream.{ ClosedShape, OverflowStrategy, ActorMaterializer }
import akka.stream.{ ActorMaterializer, ClosedShape, OverflowStrategy }
import akka.stream.scaladsl._
import akka.testkit.AkkaSpec


@@ -97,10 +97,8 @@ class GraphDSLDocSpec extends AkkaSpec {
//#graph-dsl-components-shape
// A shape represents the input and output ports of a reusable
// processing module
case class PriorityWorkerPoolShape[In, Out](
jobsIn: Inlet[In],
priorityJobsIn: Inlet[In],
resultsOut: Outlet[Out]) extends Shape {
case class PriorityWorkerPoolShape[In, Out](jobsIn: Inlet[In], priorityJobsIn: Inlet[In], resultsOut: Outlet[Out])
extends Shape {
// It is important to provide the list of all input and output
// ports with a stable order. Duplicates are not allowed.
@@ -111,19 +109,16 @@ class GraphDSLDocSpec extends AkkaSpec {
// A Shape must be able to create a copy of itself. Basically
// it means a new instance with copies of the ports
override def deepCopy() = PriorityWorkerPoolShape(
jobsIn.carbonCopy(),
priorityJobsIn.carbonCopy(),
resultsOut.carbonCopy())
override def deepCopy() =
PriorityWorkerPoolShape(jobsIn.carbonCopy(), priorityJobsIn.carbonCopy(), resultsOut.carbonCopy())
}
//#graph-dsl-components-shape
//#graph-dsl-components-create
object PriorityWorkerPool {
def apply[In, Out](
worker: Flow[In, Out, Any],
workerCount: Int): Graph[PriorityWorkerPoolShape[In, Out], NotUsed] = {
def apply[In, Out](worker: Flow[In, Out, Any],
workerCount: Int): Graph[PriorityWorkerPoolShape[In, Out], NotUsed] = {
GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
@@ -143,10 +138,9 @@ class GraphDSLDocSpec extends AkkaSpec {
// We now expose the input ports of the priorityMerge and the output
// of the resultsMerge as our PriorityWorkerPool ports
// -- all neatly wrapped in our domain specific Shape
PriorityWorkerPoolShape(
jobsIn = priorityMerge.in(0),
priorityJobsIn = priorityMerge.preferred,
resultsOut = resultsMerge.out)
PriorityWorkerPoolShape(jobsIn = priorityMerge.in(0),
priorityJobsIn = priorityMerge.preferred,
resultsOut = resultsMerge.out)
}
}
@@ -160,28 +154,30 @@ class GraphDSLDocSpec extends AkkaSpec {
val worker1 = Flow[String].map("step 1 " + _)
val worker2 = Flow[String].map("step 2 " + _)
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
RunnableGraph
.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val priorityPool1 = b.add(PriorityWorkerPool(worker1, 4))
val priorityPool2 = b.add(PriorityWorkerPool(worker2, 2))
val priorityPool1 = b.add(PriorityWorkerPool(worker1, 4))
val priorityPool2 = b.add(PriorityWorkerPool(worker2, 2))
Source(1 to 100).map("job: " + _) ~> priorityPool1.jobsIn
Source(1 to 100).map("priority job: " + _) ~> priorityPool1.priorityJobsIn
Source(1 to 100).map("job: " + _) ~> priorityPool1.jobsIn
Source(1 to 100).map("priority job: " + _) ~> priorityPool1.priorityJobsIn
priorityPool1.resultsOut ~> priorityPool2.jobsIn
Source(1 to 100).map("one-step, priority " + _) ~> priorityPool2.priorityJobsIn
priorityPool1.resultsOut ~> priorityPool2.jobsIn
Source(1 to 100).map("one-step, priority " + _) ~> priorityPool2.priorityJobsIn
priorityPool2.resultsOut ~> Sink.foreach(println)
ClosedShape
}).run()
priorityPool2.resultsOut ~> Sink.foreach(println)
ClosedShape
})
.run()
//#graph-dsl-components-use
//#graph-dsl-components-shape2
import FanInShape.{ Init, Name }
class PriorityWorkerPoolShape2[In, Out](_init: Init[Out] = Name("PriorityWorkerPool"))
extends FanInShape[Out](_init) {
extends FanInShape[Out](_init) {
protected override def construct(i: Init[Out]) = new PriorityWorkerPoolShape2(i)
val jobsIn = newInlet[In]("jobsIn")
@@ -195,8 +191,9 @@ class GraphDSLDocSpec extends AkkaSpec {
"access to materialized value" in {
//#graph-dsl-matvalue
import GraphDSL.Implicits._
val foldFlow: Flow[Int, Int, Future[Int]] = Flow.fromGraph(GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) { implicit builder => fold =>
FlowShape(fold.in, builder.materializedValue.mapAsync(4)(identity).outlet)
val foldFlow: Flow[Int, Int, Future[Int]] = Flow.fromGraph(GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) {
implicit builder => fold =>
FlowShape(fold.in, builder.materializedValue.mapAsync(4)(identity).outlet)
})
//#graph-dsl-matvalue
@@ -205,15 +202,16 @@ class GraphDSLDocSpec extends AkkaSpec {
//#graph-dsl-matvalue-cycle
import GraphDSL.Implicits._
// This cannot produce any value:
val cyclicFold: Source[Int, Future[Int]] = Source.fromGraph(GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) { implicit builder => fold =>
// - Fold cannot complete until its upstream mapAsync completes
// - mapAsync cannot complete until the materialized Future produced by
// fold completes
// As a result this Source will never emit anything, and its materialized
// Future will never complete
builder.materializedValue.mapAsync(4)(identity) ~> fold
SourceShape(builder.materializedValue.mapAsync(4)(identity).outlet)
})
val cyclicFold: Source[Int, Future[Int]] =
Source.fromGraph(GraphDSL.create(Sink.fold[Int, Int](0)(_ + _)) { implicit builder => fold =>
// - Fold cannot complete until its upstream mapAsync completes
// - mapAsync cannot complete until the materialized Future produced by
// fold completes
// As a result this Source will never emit anything, and its materialized
// Future will never complete
builder.materializedValue.mapAsync(4)(identity) ~> fold
SourceShape(builder.materializedValue.mapAsync(4)(identity).outlet)
})
//#graph-dsl-matvalue-cycle
}


@@ -149,9 +149,7 @@ class GraphStageDocSpec extends AkkaSpec {
val stringLength = Flow.fromGraph(new Map[String, Int](_.length))
val result =
Source(Vector("one", "two", "three"))
.via(stringLength)
.runFold(Seq.empty[Int])((elem, acc) => elem :+ acc)
Source(Vector("one", "two", "three")).via(stringLength).runFold(Seq.empty[Int])((elem, acc) => elem :+ acc)
Await.result(result, 3.seconds) should ===(Seq(3, 3, 5))
}
@@ -188,9 +186,7 @@ class GraphStageDocSpec extends AkkaSpec {
val evenFilter = Flow.fromGraph(new Filter[Int](_ % 2 == 0))
val result =
Source(Vector(1, 2, 3, 4, 5, 6))
.via(evenFilter)
.runFold(Seq.empty[Int])((elem, acc) => elem :+ acc)
Source(Vector(1, 2, 3, 4, 5, 6)).via(evenFilter).runFold(Seq.empty[Int])((elem, acc) => elem :+ acc)
Await.result(result, 3.seconds) should ===(Seq(2, 4, 6))
}
@@ -241,9 +237,7 @@ class GraphStageDocSpec extends AkkaSpec {
val duplicator = Flow.fromGraph(new Duplicator[Int])
val result =
Source(Vector(1, 2, 3))
.via(duplicator)
.runFold(Seq.empty[Int])((elem, acc) => elem :+ acc)
Source(Vector(1, 2, 3)).via(duplicator).runFold(Seq.empty[Int])((elem, acc) => elem :+ acc)
Await.result(result, 3.seconds) should ===(Seq(1, 1, 2, 2, 3, 3))
}
@@ -281,9 +275,7 @@ class GraphStageDocSpec extends AkkaSpec {
val duplicator = Flow.fromGraph(new Duplicator[Int])
val result =
Source(Vector(1, 2, 3))
.via(duplicator)
.runFold(Seq.empty[Int])((elem, acc) => elem :+ acc)
Source(Vector(1, 2, 3)).via(duplicator).runFold(Seq.empty[Int])((elem, acc) => elem :+ acc)
Await.result(result, 3.seconds) should ===(Seq(1, 1, 2, 2, 3, 3))
@@ -293,11 +285,8 @@ class GraphStageDocSpec extends AkkaSpec {
val sink = Sink.fold[List[Int], Int](List.empty[Int])((acc, n) => acc :+ n)
//#graph-operator-chain
val resultFuture = Source(1 to 5)
.via(new Filter(_ % 2 == 0))
.via(new Duplicator())
.via(new Map(_ / 2))
.runWith(sink)
val resultFuture =
Source(1 to 5).via(new Filter(_ % 2 == 0)).via(new Duplicator()).via(new Map(_ / 2)).runWith(sink)
//#graph-operator-chain
@@ -344,7 +333,8 @@ class GraphStageDocSpec extends AkkaSpec {
val in = TestPublisher.probe[Int]()
val out = TestSubscriber.probe[Int]()
Source.fromPublisher(in)
Source
.fromPublisher(in)
.via(duplicator)
.to(Sink.fromSubscriber(out))
.withAttributes(Attributes.inputBuffer(1, 1))
@@ -426,20 +416,21 @@ class GraphStageDocSpec extends AkkaSpec {
val promise = Promise[A]()
val logic = new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
promise.success(elem)
push(out, elem)
setHandler(in,
new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
promise.success(elem)
push(out, elem)
// replace handler with one that only forwards elements
setHandler(in, new InHandler {
override def onPush(): Unit = {
push(out, grab(in))
}
})
}
})
// replace handler with one that only forwards elements
setHandler(in, new InHandler {
override def onPush(): Unit = {
push(out, grab(in))
}
})
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
@@ -455,9 +446,7 @@ class GraphStageDocSpec extends AkkaSpec {
//#materialized
// tests:
val flow = Source(Vector(1, 2, 3))
.viaMat(new FirstValue)(Keep.right)
.to(Sink.ignore)
val flow = Source(Vector(1, 2, 3)).viaMat(new FirstValue)(Keep.right).to(Sink.ignore)
val result: Future[Int] = flow.run()
@@ -488,60 +477,58 @@ class GraphStageDocSpec extends AkkaSpec {
pull(in)
}
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
buffer.enqueue(elem)
if (downstreamWaiting) {
downstreamWaiting = false
val bufferedElem = buffer.dequeue()
push(out, bufferedElem)
}
if (!bufferFull) {
pull(in)
}
}
setHandler(in,
new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
buffer.enqueue(elem)
if (downstreamWaiting) {
downstreamWaiting = false
val bufferedElem = buffer.dequeue()
push(out, bufferedElem)
}
if (!bufferFull) {
pull(in)
}
}
override def onUpstreamFinish(): Unit = {
if (buffer.nonEmpty) {
// emit the rest if possible
emitMultiple(out, buffer.toIterator)
}
completeStage()
}
})
override def onUpstreamFinish(): Unit = {
if (buffer.nonEmpty) {
// emit the rest if possible
emitMultiple(out, buffer.toIterator)
}
completeStage()
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (buffer.isEmpty) {
downstreamWaiting = true
} else {
val elem = buffer.dequeue
push(out, elem)
}
if (!bufferFull && !hasBeenPulled(in)) {
pull(in)
}
}
})
setHandler(out,
new OutHandler {
override def onPull(): Unit = {
if (buffer.isEmpty) {
downstreamWaiting = true
} else {
val elem = buffer.dequeue
push(out, elem)
}
if (!bufferFull && !hasBeenPulled(in)) {
pull(in)
}
}
})
}
}
//#detached
// tests:
val result1 = Source(Vector(1, 2, 3))
.via(new TwoBuffer)
.runFold(Vector.empty[Int])((acc, n) => acc :+ n)
val result1 = Source(Vector(1, 2, 3)).via(new TwoBuffer).runFold(Vector.empty[Int])((acc, n) => acc :+ n)
Await.result(result1, 3.seconds) should ===(Vector(1, 2, 3))
val subscriber = TestSubscriber.manualProbe[Int]()
val publisher = TestPublisher.probe[Int]()
val flow2 =
Source.fromPublisher(publisher)
.via(new TwoBuffer)
.to(Sink.fromSubscriber(subscriber))
Source.fromPublisher(publisher).via(new TwoBuffer).to(Sink.fromSubscriber(subscriber))
val result2 = flow2.run()


@@ -44,12 +44,8 @@ class GraphStageLoggingDocSpec extends AkkaSpec("akka.loglevel = DEBUG") {
"demonstrate logging in custom graphstage" in {
val n = 10
EventFilter.debug(start = "Randomly generated", occurrences = n).intercept {
Source.fromGraph(new RandomLettersSource)
.take(n)
.runWith(Sink.ignore)
.futureValue
Source.fromGraph(new RandomLettersSource).take(n).runWith(Sink.ignore).futureValue
}
}
}


@@ -72,9 +72,7 @@ class HubsDocSpec extends AkkaSpec with CompileOnlySpec {
//#pub-sub-1
// Obtain a Sink and Source which will publish and receive from the "bus" respectively.
val (sink, source) =
MergeHub.source[String](perProducerBufferSize = 16)
.toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both)
.run()
MergeHub.source[String](perProducerBufferSize = 16).toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both).run()
//#pub-sub-1
//#pub-sub-2
@@ -89,17 +87,15 @@ class HubsDocSpec extends AkkaSpec with CompileOnlySpec {
// started stream as its "topic". We add two more features, external cancellation of
// the registration and automatic cleanup for very slow subscribers.
val busFlow: Flow[String, String, UniqueKillSwitch] =
Flow.fromSinkAndSource(sink, source)
Flow
.fromSinkAndSource(sink, source)
.joinMat(KillSwitches.singleBidi[String, String])(Keep.right)
.backpressureTimeout(3.seconds)
//#pub-sub-3
//#pub-sub-4
val switch: UniqueKillSwitch =
Source.repeat("Hello world!")
.viaMat(busFlow)(Keep.right)
.to(Sink.foreach(println))
.run()
Source.repeat("Hello world!").viaMat(busFlow)(Keep.right).to(Sink.foreach(println)).run()
// Shut down externally
switch.shutdown()
@@ -109,17 +105,17 @@ class HubsDocSpec extends AkkaSpec with CompileOnlySpec {
"demonstrate creating a dynamic partition hub" in compileOnlySpec {
//#partition-hub
// A simple producer that publishes a new "message-" every second
val producer = Source.tick(1.second, 1.second, "message")
.zipWith(Source(1 to 100))((a, b) => s"$a-$b")
val producer = Source.tick(1.second, 1.second, "message").zipWith(Source(1 to 100))((a, b) => s"$a-$b")
// Attach a PartitionHub Sink to the producer. This will materialize to a
// corresponding Source.
// (We need to use toMat and Keep.right since by default the materialized
// value to the left is used)
val runnableGraph: RunnableGraph[Source[String, NotUsed]] =
producer.toMat(PartitionHub.sink(
(size, elem) => math.abs(elem.hashCode % size),
startAfterNrOfConsumers = 2, bufferSize = 256))(Keep.right)
producer.toMat(
PartitionHub.sink((size, elem) => math.abs(elem.hashCode % size),
startAfterNrOfConsumers = 2,
bufferSize = 256))(Keep.right)
// By running/materializing the producer, we get back a Source, which
// gives us access to the elements published by the producer.
@@ -134,8 +130,7 @@ class HubsDocSpec extends AkkaSpec with CompileOnlySpec {
"demonstrate creating a dynamic stateful partition hub" in compileOnlySpec {
//#partition-hub-stateful
// A simple producer that publishes a new "message-" every second
val producer = Source.tick(1.second, 1.second, "message")
.zipWith(Source(1 to 100))((a, b) => s"$a-$b")
val producer = Source.tick(1.second, 1.second, "message").zipWith(Source(1 to 100))((a, b) => s"$a-$b")
// New instance of the partitioner function and its state is created
// for each materialization of the PartitionHub.
@@ -153,9 +148,8 @@ class HubsDocSpec extends AkkaSpec with CompileOnlySpec {
// (We need to use toMat and Keep.right since by default the materialized
// value to the left is used)
val runnableGraph: RunnableGraph[Source[String, NotUsed]] =
producer.toMat(PartitionHub.statefulSink(
() => roundRobin(),
startAfterNrOfConsumers = 2, bufferSize = 256))(Keep.right)
producer.toMat(PartitionHub.statefulSink(() => roundRobin(), startAfterNrOfConsumers = 2, bufferSize = 256))(
Keep.right)
// By running/materializing the producer, we get back a Source, which
// gives us access to the elements published by the producer.
@@ -174,15 +168,15 @@ class HubsDocSpec extends AkkaSpec with CompileOnlySpec {
// ConsumerInfo.queueSize is the approximate number of buffered elements for a consumer.
// Note that this is a moving target since the elements are consumed concurrently.
val runnableGraph: RunnableGraph[Source[Int, NotUsed]] =
producer.toMat(PartitionHub.statefulSink(
() => (info, elem) => info.consumerIds.minBy(id => info.queueSize(id)),
startAfterNrOfConsumers = 2, bufferSize = 16))(Keep.right)
producer.toMat(
PartitionHub.statefulSink(() => (info, elem) => info.consumerIds.minBy(id => info.queueSize(id)),
startAfterNrOfConsumers = 2,
bufferSize = 16))(Keep.right)
val fromProducer: Source[Int, NotUsed] = runnableGraph.run()
fromProducer.runForeach(msg => println("consumer1: " + msg))
fromProducer.throttle(10, 100.millis)
.runForeach(msg => println("consumer2: " + msg))
fromProducer.throttle(10, 100.millis).runForeach(msg => println("consumer2: " + msg))
//#partition-hub-fastest
}


@@ -161,24 +161,21 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
//#tweet-authors
val authors: Source[Author, NotUsed] =
tweets
.filter(_.hashtags.contains(akkaTag))
.map(_.author)
tweets.filter(_.hashtags.contains(akkaTag)).map(_.author)
//#tweet-authors
//#email-addresses-mapAsync
val emailAddresses: Source[String, NotUsed] =
authors
.mapAsync(4)(author => addressSystem.lookupEmail(author.handle))
.collect { case Some(emailAddress) => emailAddress }
authors.mapAsync(4)(author => addressSystem.lookupEmail(author.handle)).collect {
case Some(emailAddress) => emailAddress
}
//#email-addresses-mapAsync
//#send-emails
val sendEmails: RunnableGraph[NotUsed] =
emailAddresses
.mapAsync(4)(address => {
emailServer.send(
Email(to = address, title = "Akka", body = "I like your tweet"))
emailServer.send(Email(to = address, title = "Akka", body = "I like your tweet"))
})
.to(Sink.ignore)
@@ -208,19 +205,14 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
val onErrorMessage = (ex: Throwable) => AckingReceiver.StreamFailure(ex)
val probe = TestProbe()
val receiver = system.actorOf(
Props(new AckingReceiver(probe.ref, ackWith = AckMessage)))
val sink = Sink.actorRefWithAck(
receiver,
onInitMessage = InitMessage,
ackMessage = AckMessage,
onCompleteMessage = OnCompleteMessage,
onFailureMessage = onErrorMessage
)
val receiver = system.actorOf(Props(new AckingReceiver(probe.ref, ackWith = AckMessage)))
val sink = Sink.actorRefWithAck(receiver,
onInitMessage = InitMessage,
ackMessage = AckMessage,
onCompleteMessage = OnCompleteMessage,
onFailureMessage = onErrorMessage)
words
.map(_.toLowerCase)
.runWith(sink)
words.map(_.toLowerCase).runWith(sink)
probe.expectMsg("Stream initialized!")
probe.expectMsg("hello")
@@ -272,7 +264,8 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
val emailAddresses: Source[String, NotUsed] =
authors.via(
Flow[Author].mapAsync(4)(author => addressSystem.lookupEmail(author.handle))
Flow[Author]
.mapAsync(4)(author => addressSystem.lookupEmail(author.handle))
.withAttributes(supervisionStrategy(resumingDecider)))
//#email-addresses-mapAsync-supervision
}
@@ -287,29 +280,28 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
tweets.filter(_.hashtags.contains(akkaTag)).map(_.author)
val emailAddresses: Source[String, NotUsed] =
authors
.mapAsyncUnordered(4)(author => addressSystem.lookupEmail(author.handle))
.collect { case Some(emailAddress) => emailAddress }
authors.mapAsyncUnordered(4)(author => addressSystem.lookupEmail(author.handle)).collect {
case Some(emailAddress) => emailAddress
}
val sendEmails: RunnableGraph[NotUsed] =
emailAddresses
.mapAsyncUnordered(4)(address => {
emailServer.send(
Email(to = address, title = "Akka", body = "I like your tweet"))
emailServer.send(Email(to = address, title = "Akka", body = "I like your tweet"))
})
.to(Sink.ignore)
sendEmails.run()
//#external-service-mapAsyncUnordered
probe.receiveN(7).toSet should be(Set(
"rolandkuhn@somewhere.com",
"patriknw@somewhere.com",
"bantonsson@somewhere.com",
"drewhk@somewhere.com",
"ktosopl@somewhere.com",
"mmartynas@somewhere.com",
"akkateam@somewhere.com"))
probe.receiveN(7).toSet should be(
Set("rolandkuhn@somewhere.com",
"patriknw@somewhere.com",
"bantonsson@somewhere.com",
"drewhk@somewhere.com",
"ktosopl@somewhere.com",
"mmartynas@somewhere.com",
"akkateam@somewhere.com"))
}
"careful managed blocking with mapAsync" in {
@@ -320,8 +312,9 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
val authors = tweets.filter(_.hashtags.contains(akkaTag)).map(_.author)
val phoneNumbers =
authors.mapAsync(4)(author => addressSystem.lookupPhoneNumber(author.handle))
.collect { case Some(phoneNo) => phoneNo }
authors.mapAsync(4)(author => addressSystem.lookupPhoneNumber(author.handle)).collect {
case Some(phoneNo) => phoneNo
}
//#blocking-mapAsync
val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")
@@ -330,8 +323,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
phoneNumbers
.mapAsync(4)(phoneNo => {
Future {
smsServer.send(
TextMessage(to = phoneNo, body = "I like your tweet"))
smsServer.send(TextMessage(to = phoneNo, body = "I like your tweet"))
}(blockingExecutionContext)
})
.to(Sink.ignore)
@@ -339,14 +331,14 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
sendTextMessages.run()
//#blocking-mapAsync
probe.receiveN(7).toSet should be(Set(
"rolandkuhn".hashCode.toString,
"patriknw".hashCode.toString,
"bantonsson".hashCode.toString,
"drewhk".hashCode.toString,
"ktosopl".hashCode.toString,
"mmartynas".hashCode.toString,
"akkateam".hashCode.toString))
probe.receiveN(7).toSet should be(
Set("rolandkuhn".hashCode.toString,
"patriknw".hashCode.toString,
"bantonsson".hashCode.toString,
"drewhk".hashCode.toString,
"ktosopl".hashCode.toString,
"mmartynas".hashCode.toString,
"akkateam".hashCode.toString))
}
"careful managed blocking with map" in {
@@ -357,8 +349,9 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
val authors = tweets.filter(_.hashtags.contains(akkaTag)).map(_.author)
val phoneNumbers =
authors.mapAsync(4)(author => addressSystem.lookupPhoneNumber(author.handle))
.collect { case Some(phoneNo) => phoneNo }
authors.mapAsync(4)(author => addressSystem.lookupPhoneNumber(author.handle)).collect {
case Some(phoneNo) => phoneNo
}
//#blocking-map
val send = Flow[String]
@@ -392,9 +385,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
implicit val timeout = Timeout(3.seconds)
val saveTweets: RunnableGraph[NotUsed] =
akkaTweets
.mapAsync(4)(tweet => database ? Save(tweet))
.to(Sink.ignore)
akkaTweets.mapAsync(4)(tweet => database ? Save(tweet)).to(Sink.ignore)
//#save-tweets
saveTweets.run()
@@ -419,8 +410,8 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
implicit val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")
val service = new SometimesSlowService
implicit val materializer = ActorMaterializer(
ActorMaterializerSettings(system).withInputBuffer(initialSize = 4, maxSize = 4))
implicit val materializer =
ActorMaterializer(ActorMaterializerSettings(system).withInputBuffer(initialSize = 4, maxSize = 4))
Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J"))
.map(elem => { println(s"before: $elem"); elem })
@@ -451,8 +442,8 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
implicit val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")
val service = new SometimesSlowService
implicit val materializer = ActorMaterializer(
ActorMaterializerSettings(system).withInputBuffer(initialSize = 4, maxSize = 4))
implicit val materializer =
ActorMaterializer(ActorMaterializerSettings(system).withInputBuffer(initialSize = 4, maxSize = 4))
Source(List("a", "B", "C", "D", "e", "F", "g", "H", "i", "J"))
.map(elem => { println(s"before: $elem"); elem })
@@ -460,17 +451,17 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
.runForeach(elem => println(s"after: $elem"))
//#sometimes-slow-mapAsyncUnordered
probe.receiveN(10).toSet should be(Set(
"after: A",
"after: B",
"after: C",
"after: D",
"after: E",
"after: F",
"after: G",
"after: H",
"after: I",
"after: J"))
probe.receiveN(10).toSet should be(
Set("after: A",
"after: B",
"after: C",
"after: D",
"after: E",
"after: F",
"after: G",
"after: H",
"after: I",
"after: J"))
}
"illustrate use of source queue" in {
@@ -488,14 +479,16 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) {
val source = Source(1 to 10)
implicit val ec = system.dispatcher
source.mapAsync(1)(x => {
queue.offer(x).map {
case QueueOfferResult.Enqueued => println(s"enqueued $x")
case QueueOfferResult.Dropped => println(s"dropped $x")
case QueueOfferResult.Failure(ex) => println(s"Offer failed ${ex.getMessage}")
case QueueOfferResult.QueueClosed => println("Source Queue closed")
}
}).runWith(Sink.ignore)
source
.mapAsync(1)(x => {
queue.offer(x).map {
case QueueOfferResult.Enqueued => println(s"enqueued $x")
case QueueOfferResult.Dropped => println(s"dropped $x")
case QueueOfferResult.Failure(ex) => println(s"Offer failed ${ex.getMessage}")
case QueueOfferResult.QueueClosed => println("Source Queue closed")
}
})
.runWith(Sink.ignore)
//#source-queue
}


@@ -10,7 +10,7 @@ import akka.stream.scaladsl._
//#stream-imports
//#other-imports
import akka.{ NotUsed, Done }
import akka.{ Done, NotUsed }
import akka.actor.ActorSystem
import akka.util.ByteString
import scala.concurrent._
@@ -50,9 +50,7 @@ class QuickStartDocSpec extends WordSpec with BeforeAndAfterAll with ScalaFuture
val factorials = source.scan(BigInt(1))((acc, next) => acc * next)
val result: Future[IOResult] =
factorials
.map(num => ByteString(s"$num\n"))
.runWith(FileIO.toPath(Paths.get("factorials.txt")))
factorials.map(num => ByteString(s"$num\n")).runWith(FileIO.toPath(Paths.get("factorials.txt")))
//#transform-source
//#use-transformed-sink
@@ -81,9 +79,7 @@ class QuickStartDocSpec extends WordSpec with BeforeAndAfterAll with ScalaFuture
//#transform-sink
def lineSink(filename: String): Sink[String, Future[IOResult]] =
Flow[String]
.map(s => ByteString(s + "\n"))
.toMat(FileIO.toPath(Paths.get(filename)))(Keep.right)
Flow[String].map(s => ByteString(s + "\n")).toMat(FileIO.toPath(Paths.get(filename)))(Keep.right)
//#transform-sink
}


@@ -22,20 +22,16 @@ class RateTransformationDocSpec extends AkkaSpec {
"conflate should summarize" in {
//#conflate-summarize
val statsFlow = Flow[Double]
.conflateWithSeed(immutable.Seq(_))(_ :+ _)
.map { s =>
val μ = s.sum / s.size
val se = s.map(x => pow(x - μ, 2))
val σ = sqrt(se.sum / se.size)
(σ, μ, s.size)
}
val statsFlow = Flow[Double].conflateWithSeed(immutable.Seq(_))(_ :+ _).map { s =>
val μ = s.sum / s.size
val se = s.map(x => pow(x - μ, 2))
val σ = sqrt(se.sum / se.size)
(σ, μ, s.size)
}
//#conflate-summarize
val fut = Source.fromIterator(() => Iterator.continually(Random.nextGaussian))
.via(statsFlow)
.grouped(10)
.runWith(Sink.head)
val fut =
Source.fromIterator(() => Iterator.continually(Random.nextGaussian)).via(statsFlow).grouped(10).runWith(Sink.head)
fut.futureValue
}
@@ -51,25 +47,17 @@ class RateTransformationDocSpec extends AkkaSpec {
.mapConcat(identity)
//#conflate-sample
val fut = Source(1 to 1000)
.map(_.toDouble)
.via(sampleFlow)
.runWith(Sink.fold(Seq.empty[Double])(_ :+ _))
val fut = Source(1 to 1000).map(_.toDouble).via(sampleFlow).runWith(Sink.fold(Seq.empty[Double])(_ :+ _))
fut.futureValue
}
"extrapolate should repeat last" in {
//#extrapolate-last
val lastFlow = Flow[Double]
.extrapolate(Iterator.continually(_))
val lastFlow = Flow[Double].extrapolate(Iterator.continually(_))
//#extrapolate-last
val (probe, fut) = TestSource.probe[Double]
.via(lastFlow)
.grouped(10)
.toMat(Sink.head)(Keep.both)
.run()
val (probe, fut) = TestSource.probe[Double].via(lastFlow).grouped(10).toMat(Sink.head)(Keep.both).run()
probe.sendNext(1.0)
val extrapolated = fut.futureValue
@@ -80,14 +68,10 @@ class RateTransformationDocSpec extends AkkaSpec {
"extrapolate should send seed first" in {
//#extrapolate-seed
val initial = 2.0
val seedFlow = Flow[Double]
.extrapolate(Iterator.continually(_), Some(initial))
val seedFlow = Flow[Double].extrapolate(Iterator.continually(_), Some(initial))
//#extrapolate-seed
val fut = TestSource.probe[Double]
.via(seedFlow)
.grouped(10)
.runWith(Sink.head)
val fut = TestSource.probe[Double].via(seedFlow).grouped(10).runWith(Sink.head)
val extrapolated = fut.futureValue
extrapolated.size shouldBe 10
@@ -96,17 +80,14 @@ class RateTransformationDocSpec extends AkkaSpec {
"extrapolate should track drift" in {
//#extrapolate-drift
val driftFlow = Flow[Double].map(_ -> 0)
.extrapolate[(Double, Int)] { case (i, _) => Iterator.from(1).map(i -> _) }
val driftFlow = Flow[Double].map(_ -> 0).extrapolate[(Double, Int)] { case (i, _) => Iterator.from(1).map(i -> _) }
//#extrapolate-drift
val latch = TestLatch(2)
val realDriftFlow = Flow[Double].map(d => { latch.countDown(); d -> 0; })
.extrapolate[(Double, Int)] { case (d, _) => latch.countDown(); Iterator.from(1).map(d -> _) }
val realDriftFlow = Flow[Double].map(d => { latch.countDown(); d -> 0; }).extrapolate[(Double, Int)] {
case (d, _) => latch.countDown(); Iterator.from(1).map(d -> _)
}
val (pub, sub) = TestSource.probe[Double]
.via(realDriftFlow)
.toMat(TestSink.probe[(Double, Int)])(Keep.both)
.run()
val (pub, sub) = TestSource.probe[Double].via(realDriftFlow).toMat(TestSink.probe[(Double, Int)])(Keep.both).run()
sub.request(1)
pub.sendNext(1.0)
@@ -122,17 +103,12 @@ class RateTransformationDocSpec extends AkkaSpec {
"expand should track drift" in {
//#expand-drift
val driftFlow = Flow[Double]
.expand(i => Iterator.from(0).map(i -> _))
val driftFlow = Flow[Double].expand(i => Iterator.from(0).map(i -> _))
//#expand-drift
val latch = TestLatch(2)
val realDriftFlow = Flow[Double]
.expand(d => { latch.countDown(); Iterator.from(0).map(d -> _) })
val realDriftFlow = Flow[Double].expand(d => { latch.countDown(); Iterator.from(0).map(d -> _) })
val (pub, sub) = TestSource.probe[Double]
.via(realDriftFlow)
.toMat(TestSink.probe[(Double, Int)])(Keep.both)
.run()
val (pub, sub) = TestSource.probe[Double].via(realDriftFlow).toMat(TestSink.probe[(Double, Int)])(Keep.both).run()
sub.request(1)
pub.sendNext(1.0)


@@ -23,9 +23,7 @@ class ReactiveStreamsDocSpec extends AkkaSpec {
trait Fixture {
//#authors
val authors = Flow[Tweet]
.filter(_.hashtags.contains(akkaTag))
.map(_.author)
val authors = Flow[Tweet].filter(_.hashtags.contains(akkaTag)).map(_.author)
//#authors
@@ -110,8 +108,7 @@ class ReactiveStreamsDocSpec extends AkkaSpec {
//#source-fanoutPublisher
val authorPublisher: Publisher[Author] =
Source.fromPublisher(tweets).via(authors)
.runWith(Sink.asPublisher(fanout = true))
Source.fromPublisher(tweets).via(authors).runWith(Sink.asPublisher(fanout = true))
authorPublisher.subscribe(storage)
authorPublisher.subscribe(alert)


@@ -35,18 +35,16 @@ class RestartDocSpec extends AkkaSpec with CompileOnlySpec {
"demonstrate a restart with backoff source" in compileOnlySpec {
//#restart-with-backoff-source
val restartSource = RestartSource.withBackoff(
minBackoff = 3.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.2, // adds 20% "noise" to vary the intervals slightly
maxRestarts = 20 // limits the number of restarts to 20
val restartSource = RestartSource.withBackoff(minBackoff = 3.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.2, // adds 20% "noise" to vary the intervals slightly
maxRestarts = 20 // limits the number of restarts to 20
) { () =>
// Create a source from a future of a source
Source.fromFutureSource {
// Make a single request with akka-http
Http().singleRequest(HttpRequest(
uri = "http://example.com/eventstream"
))
Http()
.singleRequest(HttpRequest(uri = "http://example.com/eventstream"))
// Unmarshall it as a source of server sent events
.flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])
}


@@ -16,8 +16,7 @@ class SinkRecipeDocSpec extends RecipeSpec {
//#forseachAsync-processing
//def asyncProcessing(value: Int): Future[Unit] = _
Source(1 to 100)
.runWith(Sink.foreachAsync(10)(asyncProcessing))
Source(1 to 100).runWith(Sink.foreachAsync(10)(asyncProcessing))
//#forseachAsync-processing
}
}


@@ -16,25 +16,30 @@ class StreamBuffersRateSpec extends AkkaSpec {
def println(s: Any) = ()
//#pipelining
Source(1 to 3)
.map { i => println(s"A: $i"); i }.async
.map { i => println(s"B: $i"); i }.async
.map { i => println(s"C: $i"); i }.async
.map { i =>
println(s"A: $i"); i
}
.async
.map { i =>
println(s"B: $i"); i
}
.async
.map { i =>
println(s"C: $i"); i
}
.async
.runWith(Sink.ignore)
//#pipelining
}
"Demonstrate buffer sizes" in {
//#materializer-buffer
val materializer = ActorMaterializer(
ActorMaterializerSettings(system)
.withInputBuffer(
initialSize = 64,
maxSize = 64))
val materializer =
ActorMaterializer(ActorMaterializerSettings(system).withInputBuffer(initialSize = 64, maxSize = 64))
//#materializer-buffer
//#section-buffer
val section = Flow[Int].map(_ * 2).async
.addAttributes(Attributes.inputBuffer(initial = 1, max = 1)) // the buffer size of this map is 1
val section = Flow[Int].map(_ * 2).async.addAttributes(Attributes.inputBuffer(initial = 1, max = 1)) // the buffer size of this map is 1
val flow = section.via(Flow[Int].map(_ / 2)).async // the buffer size of this map is the default
//#section-buffer
}
@@ -52,7 +57,8 @@ class StreamBuffersRateSpec extends AkkaSpec {
Source.tick(initialDelay = 3.second, interval = 3.second, Tick()) ~> zipper.in0
Source.tick(initialDelay = 1.second, interval = 1.second, "message!")
Source
.tick(initialDelay = 1.second, interval = 1.second, "message!")
.conflateWithSeed(seed = (_) => 1)((count, _) => count + 1) ~> zipper.in1
zipper.out ~> Sink.foreach(println)


@@ -84,8 +84,7 @@ class StreamTestKitDocSpec extends AkkaSpec {
//#source-actorref
val sinkUnderTest = Flow[Int].map(_.toString).toMat(Sink.fold("")(_ + _))(Keep.right)
val (ref, future) = Source.actorRef(8, OverflowStrategy.fail)
.toMat(sinkUnderTest)(Keep.both).run()
val (ref, future) = Source.actorRef(8, OverflowStrategy.fail).toMat(sinkUnderTest)(Keep.both).run()
ref ! 1
ref ! 2
@@ -101,11 +100,7 @@ class StreamTestKitDocSpec extends AkkaSpec {
//#test-sink-probe
val sourceUnderTest = Source(1 to 4).filter(_ % 2 == 0).map(_ * 2)
sourceUnderTest
.runWith(TestSink.probe[Int])
.request(2)
.expectNext(4, 8)
.expectComplete()
sourceUnderTest.runWith(TestSink.probe[Int]).request(2).expectNext(4, 8).expectComplete()
//#test-sink-probe
}
@@ -113,10 +108,7 @@ class StreamTestKitDocSpec extends AkkaSpec {
//#test-source-probe
val sinkUnderTest = Sink.cancelled
TestSource.probe[Int]
.toMat(sinkUnderTest)(Keep.left)
.run()
.expectCancellation()
TestSource.probe[Int].toMat(sinkUnderTest)(Keep.left).run().expectCancellation()
//#test-source-probe
}
@@ -124,9 +116,7 @@ class StreamTestKitDocSpec extends AkkaSpec {
//#injecting-failure
val sinkUnderTest = Sink.head[Int]
val (probe, future) = TestSource.probe[Int]
.toMat(sinkUnderTest)(Keep.both)
.run()
val (probe, future) = TestSource.probe[Int].toMat(sinkUnderTest)(Keep.both).run()
probe.sendError(new Exception("boom"))
Await.ready(future, 3.seconds)
@@ -142,10 +132,7 @@ class StreamTestKitDocSpec extends AkkaSpec {
pattern.after(10.millis * sleep, using = system.scheduler)(Future.successful(sleep))
}
val (pub, sub) = TestSource.probe[Int]
.via(flowUnderTest)
.toMat(TestSink.probe[Int])(Keep.both)
.run()
val (pub, sub) = TestSource.probe[Int].via(flowUnderTest).toMat(TestSink.probe[Int])(Keep.both).run()
sub.request(n = 3)
pub.sendNext(3)


@@ -21,23 +21,14 @@ class SubstreamDocSpec extends AkkaSpec {
//#groupBy2
//#groupBy3
Source(1 to 10)
.groupBy(3, _ % 3)
.mergeSubstreams
.runWith(Sink.ignore)
Source(1 to 10).groupBy(3, _ % 3).mergeSubstreams.runWith(Sink.ignore)
//#groupBy3
//#groupBy4
Source(1 to 10)
.groupBy(3, _ % 3)
.mergeSubstreamsWithParallelism(2)
.runWith(Sink.ignore)
Source(1 to 10).groupBy(3, _ % 3).mergeSubstreamsWithParallelism(2).runWith(Sink.ignore)
//concatSubstreams is equivalent to mergeSubstreamsWithParallelism(1)
Source(1 to 10)
.groupBy(3, _ % 3)
.concatSubstreams
.runWith(Sink.ignore)
Source(1 to 10).groupBy(3, _ % 3).concatSubstreams.runWith(Sink.ignore)
//#groupBy4
}
@@ -51,8 +42,8 @@ class SubstreamDocSpec extends AkkaSpec {
//#wordCount
val text =
"This is the first line.\n" +
"The second line.\n" +
"There is also the 3rd line\n"
"The second line.\n" +
"There is also the 3rd line\n"
val charCount = Source(text.toList)
.splitAfter { _ == '\n' }
@@ -66,15 +57,11 @@ class SubstreamDocSpec extends AkkaSpec {
"generate substreams by flatMapConcat and flatMapMerge" in {
//#flatMapConcat
Source(1 to 2)
.flatMapConcat(i => Source(List.fill(3)(i)))
.runWith(Sink.ignore)
Source(1 to 2).flatMapConcat(i => Source(List.fill(3)(i))).runWith(Sink.ignore)
//#flatMapConcat
//#flatMapMerge
Source(1 to 2)
.flatMapMerge(2, i => Source(List.fill(3)(i)))
.runWith(Sink.ignore)
Source(1 to 2).flatMapMerge(2, i => Source(List.fill(3)(i))).runWith(Sink.ignore)
//#flatMapMerge
}
}


@@ -8,7 +8,7 @@ package docs.stream
import akka.{ Done, NotUsed }
import akka.actor.ActorSystem
import akka.stream.{ ClosedShape, ActorMaterializer, OverflowStrategy }
import akka.stream.{ ActorMaterializer, ClosedShape, OverflowStrategy }
import akka.stream.scaladsl._
import scala.concurrent.Await
import scala.concurrent.Future
@@ -31,9 +31,13 @@ object TwitterStreamQuickstartDocSpec {
final case class Hashtag(name: String)
final case class Tweet(author: Author, timestamp: Long, body: String) {
def hashtags: Set[Hashtag] = body.split(" ").collect {
case t if t.startsWith("#") => Hashtag(t.replaceAll("[^#\\w]", ""))
}.toSet
def hashtags: Set[Hashtag] =
body
.split(" ")
.collect {
case t if t.startsWith("#") => Hashtag(t.replaceAll("[^#\\w]", ""))
}
.toSet
}
val akkaTag = Hashtag("#akka")
@@ -50,16 +54,16 @@ object TwitterStreamQuickstartDocSpec {
//#fiddle_code
val tweets: Source[Tweet, NotUsed] = Source(
Tweet(Author("rolandkuhn"), System.currentTimeMillis, "#akka rocks!") ::
Tweet(Author("patriknw"), System.currentTimeMillis, "#akka !") ::
Tweet(Author("bantonsson"), System.currentTimeMillis, "#akka !") ::
Tweet(Author("drewhk"), System.currentTimeMillis, "#akka !") ::
Tweet(Author("ktosopl"), System.currentTimeMillis, "#akka on the rocks!") ::
Tweet(Author("mmartynas"), System.currentTimeMillis, "wow #akka !") ::
Tweet(Author("akkateam"), System.currentTimeMillis, "#akka rocks!") ::
Tweet(Author("bananaman"), System.currentTimeMillis, "#bananas rock!") ::
Tweet(Author("appleman"), System.currentTimeMillis, "#apples rock!") ::
Tweet(Author("drama"), System.currentTimeMillis, "we compared #apples to #oranges!") ::
Nil)
Tweet(Author("patriknw"), System.currentTimeMillis, "#akka !") ::
Tweet(Author("bantonsson"), System.currentTimeMillis, "#akka !") ::
Tweet(Author("drewhk"), System.currentTimeMillis, "#akka !") ::
Tweet(Author("ktosopl"), System.currentTimeMillis, "#akka on the rocks!") ::
Tweet(Author("mmartynas"), System.currentTimeMillis, "wow #akka !") ::
Tweet(Author("akkateam"), System.currentTimeMillis, "#akka rocks!") ::
Tweet(Author("bananaman"), System.currentTimeMillis, "#bananas rock!") ::
Tweet(Author("appleman"), System.currentTimeMillis, "#apples rock!") ::
Tweet(Author("drama"), System.currentTimeMillis, "we compared #apples to #oranges!") ::
Nil)
//#fiddle_code
}
@@ -91,9 +95,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
//#authors-filter-map
val authors: Source[Author, NotUsed] =
tweets
.filter(_.hashtags.contains(akkaTag))
.map(_.author)
tweets.filter(_.hashtags.contains(akkaTag)).map(_.author)
//#first-sample
//#authors-filter-map
@@ -171,10 +173,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
}
//#tweets-slow-consumption-dropHead
tweets
.buffer(10, OverflowStrategy.dropHead)
.map(slowComputation)
.runWith(Sink.ignore)
tweets.buffer(10, OverflowStrategy.dropHead).map(slowComputation).runWith(Sink.ignore)
//#tweets-slow-consumption-dropHead
}
@@ -184,9 +183,9 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
//#backpressure-by-readline
val completion: Future[Done] =
Source(1 to 10)
.map(i => { println(s"map => $i"); i })
.runForeach { i => readLine(s"Element = $i; continue reading? [press enter]\n") }
Source(1 to 10).map(i => { println(s"map => $i"); i }).runForeach { i =>
readLine(s"Element = $i; continue reading? [press enter]\n")
}
Await.ready(completion, 1.minute)
//#backpressure-by-readline
@@ -200,9 +199,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)
val counterGraph: RunnableGraph[Future[Int]] =
tweets
.via(count)
.toMat(sumSink)(Keep.right)
tweets.via(count).toMat(sumSink)(Keep.right)
val sum: Future[Int] = counterGraph.run()
@@ -222,10 +219,7 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
//#tweets-runnable-flow-materialized-twice
val sumSink = Sink.fold[Int, Int](0)(_ + _)
val counterRunnableGraph: RunnableGraph[Future[Int]] =
tweetsInMinuteFromNow
.filter(_.hashtags contains akkaTag)
.map(t => 1)
.toMat(sumSink)(Keep.right)
tweetsInMinuteFromNow.filter(_.hashtags contains akkaTag).map(t => 1).toMat(sumSink)(Keep.right)
// materialize the stream once in the morning
val morningTweetsCount: Future[Int] = counterRunnableGraph.run()
@@ -236,7 +230,9 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
val sum: Future[Int] = counterRunnableGraph.run()
sum.map { c => println(s"Total tweets processed: $c") }
sum.map { c =>
println(s"Total tweets processed: $c")
}
}
}


@@ -19,11 +19,13 @@ class RecipeAdhocSource extends RecipeSpec {
//#adhoc-source
def adhocSource[T](source: Source[T, _], timeout: FiniteDuration, maxRetries: Int): Source[T, _] =
Source.lazily(
() => source.backpressureTimeout(timeout).recoverWithRetries(maxRetries, {
case t: TimeoutException =>
Source.lazily(() => source.backpressureTimeout(timeout)).mapMaterializedValue(_ => NotUsed)
})
)
() =>
source
.backpressureTimeout(timeout)
.recoverWithRetries(maxRetries, {
case t: TimeoutException =>
Source.lazily(() => source.backpressureTimeout(timeout)).mapMaterializedValue(_ => NotUsed)
}))
//#adhoc-source
"Recipe for adhoc source" must {
@@ -36,18 +38,15 @@ class RecipeAdhocSource extends RecipeSpec {
}
"start the source when there is a demand" taggedAs TimingTest in {
val sink = adhocSource(Source.repeat("a"), 200.milliseconds, 3)
.runWith(TestSink.probe[String])
val sink = adhocSource(Source.repeat("a"), 200.milliseconds, 3).runWith(TestSink.probe[String])
sink.requestNext("a")
}
"shut down the source when the next demand times out" taggedAs TimingTest in {
val shutdown = Promise[Done]()
val sink = adhocSource(
Source.repeat("a").watchTermination() { (_, term) =>
shutdown.completeWith(term)
}, 200.milliseconds, 3)
.runWith(TestSink.probe[String])
val sink = adhocSource(Source.repeat("a").watchTermination() { (_, term) =>
shutdown.completeWith(term)
}, 200.milliseconds, 3).runWith(TestSink.probe[String])
sink.requestNext("a")
Thread.sleep(200)
@@ -56,11 +55,9 @@ class RecipeAdhocSource extends RecipeSpec {
"not shut down the source when there are still demands" taggedAs TimingTest in {
val shutdown = Promise[Done]()
val sink = adhocSource(
Source.repeat("a").watchTermination() { (_, term) =>
shutdown.completeWith(term)
}, 200.milliseconds, 3)
.runWith(TestSink.probe[String])
val sink = adhocSource(Source.repeat("a").watchTermination() { (_, term) =>
shutdown.completeWith(term)
}, 200.milliseconds, 3).runWith(TestSink.probe[String])
sink.requestNext("a")
Thread.sleep(100)
@@ -80,14 +77,11 @@ class RecipeAdhocSource extends RecipeSpec {
val shutdown = Promise[Done]()
val startedCount = new AtomicInteger(0)
val source = Source
.empty.mapMaterializedValue(_ => startedCount.incrementAndGet())
.concat(Source.repeat("a"))
val source = Source.empty.mapMaterializedValue(_ => startedCount.incrementAndGet()).concat(Source.repeat("a"))
val sink = adhocSource(source.watchTermination() { (_, term) =>
shutdown.completeWith(term)
}, 200.milliseconds, 3)
.runWith(TestSink.probe[String])
}, 200.milliseconds, 3).runWith(TestSink.probe[String])
sink.requestNext("a")
startedCount.get() should be(1)
@@ -99,14 +93,11 @@ class RecipeAdhocSource extends RecipeSpec {
val shutdown = Promise[Done]()
val startedCount = new AtomicInteger(0)
val source = Source
.empty.mapMaterializedValue(_ => startedCount.incrementAndGet())
.concat(Source.repeat("a"))
val source = Source.empty.mapMaterializedValue(_ => startedCount.incrementAndGet()).concat(Source.repeat("a"))
val sink = adhocSource(source.watchTermination() { (_, term) =>
shutdown.completeWith(term)
}, 200.milliseconds, 3)
.runWith(TestSink.probe[String])
}, 200.milliseconds, 3).runWith(TestSink.probe[String])
sink.requestNext("a")
startedCount.get() should be(1)


@@ -5,7 +5,7 @@
package docs.stream.cookbook
import akka.NotUsed
import akka.stream.{ Attributes, Outlet, Inlet, FlowShape }
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.util.ByteString
@@ -35,26 +35,27 @@ class RecipeByteStrings extends RecipeSpec {
emitChunk()
}
})
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
buffer ++= elem
emitChunk()
}
setHandler(in,
new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
buffer ++= elem
emitChunk()
}
override def onUpstreamFinish(): Unit = {
if (buffer.isEmpty) completeStage()
else {
// There are elements left in buffer, so
// we keep accepting downstream pulls and push from buffer until emptied.
//
// It might be, though, that the upstream finished while it was pulled, in which
// case we will not get an onPull from the downstream, because we already had one.
// In that case we need to emit from the buffer.
if (isAvailable(out)) emitChunk()
}
}
})
override def onUpstreamFinish(): Unit = {
if (buffer.isEmpty) completeStage()
else {
// There are elements left in buffer, so
// we keep accepting downstream pulls and push from buffer until emptied.
//
// It might be, though, that the upstream finished while it was pulled, in which
// case we will not get an onPull from the downstream, because we already had one.
// In that case we need to emit from the buffer.
if (isAvailable(out)) emitChunk()
}
}
})
private def emitChunk(): Unit = {
if (buffer.isEmpty) {
@@ -92,19 +93,21 @@ class RecipeByteStrings extends RecipeSpec {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
private var count = 0
setHandlers(in, out, new InHandler with OutHandler {
setHandlers(in,
out,
new InHandler with OutHandler {
override def onPull(): Unit = {
pull(in)
}
override def onPull(): Unit = {
pull(in)
}
override def onPush(): Unit = {
val chunk = grab(in)
count += chunk.size
if (count > maximumBytes) failStage(new IllegalStateException("Too many bytes"))
else push(out, chunk)
}
})
override def onPush(): Unit = {
val chunk = grab(in)
count += chunk.size
if (count > maximumBytes) failStage(new IllegalStateException("Too many bytes"))
else push(out, chunk)
}
})
}
}
@@ -114,8 +117,8 @@ class RecipeByteStrings extends RecipeSpec {
val bytes1 = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9)))
val bytes2 = Source(List(ByteString(1, 2), ByteString(3), ByteString(4, 5, 6), ByteString(7, 8, 9, 10)))
Await.result(bytes1.via(limiter).limit(10).runWith(Sink.seq), 3.seconds)
.fold(ByteString.empty)(_ ++ _) should be(ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9))
Await.result(bytes1.via(limiter).limit(10).runWith(Sink.seq), 3.seconds).fold(ByteString.empty)(_ ++ _) should be(
ByteString(1, 2, 3, 4, 5, 6, 7, 8, 9))
an[IllegalStateException] must be thrownBy {
Await.result(bytes2.via(limiter).limit(10).runWith(Sink.seq), 3.seconds)
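The hand-written limiter stage above has a stock operator in the same spirit: limitWeighted fails the stream once the cumulative cost of the elements exceeds a budget (with a StreamLimitReachedException rather than the IllegalStateException above). A hedged sketch, names ours:

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.ByteString

object ByteLimitSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")
  implicit val mat: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  val bytes = Source(List(ByteString(1, 2, 3), ByteString(4, 5, 6, 7))) // 7 bytes in total
  bytes
    .limitWeighted(6L)(_.size) // fail once more than 6 bytes have passed
    .runWith(Sink.seq)
    .onComplete { result => println(result); system.terminate() } // Failure(StreamLimitReachedException)
}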

@@ -4,7 +4,7 @@
package docs.stream.cookbook
import akka.stream.{ ActorMaterializerSettings, ActorMaterializer }
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import scala.collection.immutable
import scala.concurrent.Await

@@ -19,12 +19,10 @@ class RecipeDecompress extends RecipeSpec {
//#decompress-gzip
val compressed =
Source.single(ByteString.fromString("Hello World"))
.via(Compression.gzip)
Source.single(ByteString.fromString("Hello World")).via(Compression.gzip)
//#decompress-gzip
val uncompressed = compressed.via(Compression.gunzip())
.map(_.utf8String)
val uncompressed = compressed.via(Compression.gunzip()).map(_.utf8String)
//#decompress-gzip
Await.result(uncompressed.runWith(Sink.head), 3.seconds) should be("Hello World")
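The same stages make a compact, runnable round trip; a sketch (object name and actor system are ours):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Compression, Sink, Source }
import akka.util.ByteString

object GzipRoundTrip extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")
  implicit val mat: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  Source.single(ByteString("Hello World"))
    .via(Compression.gzip)     // compress
    .via(Compression.gunzip()) // decompress again
    .runWith(Sink.fold(ByteString.empty)(_ ++ _))
    .foreach { bytes => println(bytes.utf8String); system.terminate() } // prints: Hello World
}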

@@ -17,7 +17,7 @@ class RecipeDigest extends RecipeSpec {
import java.security.MessageDigest
import akka.NotUsed
import akka.stream.{ Attributes, Outlet, Inlet, FlowShape }
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import akka.stream.scaladsl.{ Sink, Source }
import akka.util.ByteString
@@ -56,15 +56,8 @@
//#calculating-digest
Await.result(digest.runWith(Sink.head), 3.seconds) should be(
ByteString(
0xba, 0x78, 0x16, 0xbf,
0x8f, 0x01, 0xcf, 0xea,
0x41, 0x41, 0x40, 0xde,
0x5d, 0xae, 0x22, 0x23,
0xb0, 0x03, 0x61, 0xa3,
0x96, 0x17, 0x7a, 0x9c,
0xb4, 0x10, 0xff, 0x61,
0xf2, 0x00, 0x15, 0xad))
ByteString(0xba, 0x78, 0x16, 0xbf, 0x8f, 0x01, 0xcf, 0xea, 0x41, 0x41, 0x40, 0xde, 0x5d, 0xae, 0x22, 0x23, 0xb0,
0x03, 0x61, 0xa3, 0x96, 0x17, 0x7a, 0x9c, 0xb4, 0x10, 0xff, 0x61, 0xf2, 0x00, 0x15, 0xad))
}
}
}

@@ -24,16 +24,17 @@ class RecipeDroppyBroadcast extends RecipeSpec {
val mySink3 = Sink.fromSubscriber(sub3)
//#droppy-bcast
val graph = RunnableGraph.fromGraph(GraphDSL.create(mySink1, mySink2, mySink3)((_, _, _)) { implicit b => (sink1, sink2, sink3) =>
import GraphDSL.Implicits._
val graph = RunnableGraph.fromGraph(GraphDSL.create(mySink1, mySink2, mySink3)((_, _, _)) {
implicit b => (sink1, sink2, sink3) =>
import GraphDSL.Implicits._
val bcast = b.add(Broadcast[Int](3))
myElements ~> bcast
val bcast = b.add(Broadcast[Int](3))
myElements ~> bcast
bcast.buffer(10, OverflowStrategy.dropHead) ~> sink1
bcast.buffer(10, OverflowStrategy.dropHead) ~> sink2
bcast.buffer(10, OverflowStrategy.dropHead) ~> sink3
ClosedShape
bcast.buffer(10, OverflowStrategy.dropHead) ~> sink1
bcast.buffer(10, OverflowStrategy.dropHead) ~> sink2
bcast.buffer(10, OverflowStrategy.dropHead) ~> sink3
ClosedShape
})
//#droppy-bcast
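The load-shedding move in this recipe is the buffer with OverflowStrategy.dropHead in front of each sink; a minimal single-stream sketch of that operator (names and rates are ours):

import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, OverflowStrategy }
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.duration._

object DropHeadSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")
  implicit val mat: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  Source(1 to 1000)
    .buffer(10, OverflowStrategy.dropHead) // keep at most 10 elements, discarding the oldest
    .throttle(1, per = 100.millis)         // deliberately slow consumer
    .take(20)
    .runWith(Sink.foreach(println))        // early values, then jumps where elements were dropped
    .onComplete(_ => system.terminate())
}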

@@ -5,7 +5,7 @@
package docs.stream.cookbook
import akka.NotUsed
import akka.actor.{ Props, ActorRef, Actor }
import akka.actor.{ Actor, ActorRef, Props }
import akka.stream.ClosedShape
import akka.stream.scaladsl._
import akka.stream.testkit._
@@ -25,26 +25,22 @@ class RecipeGlobalRateLimit extends RecipeSpec {
case object ReplenishTokens
def props(maxAvailableTokens: Int, tokenRefreshPeriod: FiniteDuration,
tokenRefreshAmount: Int): Props =
def props(maxAvailableTokens: Int, tokenRefreshPeriod: FiniteDuration, tokenRefreshAmount: Int): Props =
Props(new Limiter(maxAvailableTokens, tokenRefreshPeriod, tokenRefreshAmount))
}
class Limiter(
val maxAvailableTokens: Int,
val tokenRefreshPeriod: FiniteDuration,
val tokenRefreshAmount: Int) extends Actor {
class Limiter(val maxAvailableTokens: Int, val tokenRefreshPeriod: FiniteDuration, val tokenRefreshAmount: Int)
extends Actor {
import Limiter._
import context.dispatcher
import akka.actor.Status
private var waitQueue = immutable.Queue.empty[ActorRef]
private var permitTokens = maxAvailableTokens
private val replenishTimer = system.scheduler.schedule(
initialDelay = tokenRefreshPeriod,
interval = tokenRefreshPeriod,
receiver = self,
ReplenishTokens)
private val replenishTimer = system.scheduler.schedule(initialDelay = tokenRefreshPeriod,
interval = tokenRefreshPeriod,
receiver = self,
ReplenishTokens)
override def receive: Receive = open
@@ -69,13 +65,13 @@ class RecipeGlobalRateLimit extends RecipeSpec {
val (toBeReleased, remainingQueue) = waitQueue.splitAt(permitTokens)
waitQueue = remainingQueue
permitTokens -= toBeReleased.size
toBeReleased foreach (_ ! MayPass)
toBeReleased.foreach(_ ! MayPass)
if (permitTokens > 0) context.become(open)
}
override def postStop(): Unit = {
replenishTimer.cancel()
waitQueue foreach (_ ! Status.Failure(new IllegalStateException("limiter stopped")))
waitQueue.foreach(_ ! Status.Failure(new IllegalStateException("limiter stopped")))
}
}
//#global-limiter-actor
@@ -104,13 +100,15 @@ class RecipeGlobalRateLimit extends RecipeSpec {
val probe = TestSubscriber.manualProbe[String]()
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val merge = b.add(Merge[String](2))
source1 ~> merge ~> Sink.fromSubscriber(probe)
source2 ~> merge
ClosedShape
}).run()
RunnableGraph
.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val merge = b.add(Merge[String](2))
source1 ~> merge ~> Sink.fromSubscriber(probe)
source2 ~> merge
ClosedShape
})
.run()
probe.expectSubscription().request(1000)
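The hunks do not show the stream side of this recipe; elements are typically gated through the Limiter with an ask, one mapAsync slot per in-flight permit request. A sketch under that assumption, with the Limiter actor above in scope; WantToPass is our guess at the request message (its MayPass reply appears above):

import akka.NotUsed
import akka.actor.ActorRef
import akka.pattern.ask
import akka.stream.scaladsl.Flow
import akka.util.Timeout
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration

// Hypothetical glue code: each element waits for the limiter's reply before passing.
def limitGlobal[T](limiter: ActorRef, maxAllowedWait: FiniteDuration)(
    implicit ec: ExecutionContext): Flow[T, T, NotUsed] = {
  implicit val timeout: Timeout = Timeout(maxAllowedWait)
  Flow[T].mapAsync(parallelism = 4) { element =>
    (limiter ? Limiter.WantToPass).map(_ => element) // WantToPass: assumed protocol message
  }
}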

@@ -55,20 +55,22 @@ object HoldOps {
private var currentValue: T = _
private var waitingFirstValue = true
setHandlers(in, out, new InHandler with OutHandler {
override def onPush(): Unit = {
currentValue = grab(in)
if (waitingFirstValue) {
waitingFirstValue = false
if (isAvailable(out)) push(out, currentValue)
}
pull(in)
}
setHandlers(in,
out,
new InHandler with OutHandler {
override def onPush(): Unit = {
currentValue = grab(in)
if (waitingFirstValue) {
waitingFirstValue = false
if (isAvailable(out)) push(out, currentValue)
}
pull(in)
}
override def onPull(): Unit = {
if (!waitingFirstValue) push(out, currentValue)
}
})
override def onPull(): Unit = {
if (!waitingFirstValue) push(out, currentValue)
}
})
override def preStart(): Unit = {
pull(in)
@@ -90,9 +92,7 @@ class RecipeHold extends RecipeSpec {
val source = Source.fromPublisher(pub)
val sink = Sink.fromSubscriber(sub)
source.via(new HoldWithInitial(0)).to(sink)
.withAttributes(Attributes.inputBuffer(1, 1))
.run()
source.via(new HoldWithInitial(0)).to(sink).withAttributes(Attributes.inputBuffer(1, 1)).run()
val subscription = sub.expectSubscription()
sub.expectNoMessage(100.millis)
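When no initial value is needed, a similar hold can be had from the stock expand operator, which keeps replaying the newest upstream value while downstream is faster. A sketch (names and rates are ours):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.duration._

object HoldViaExpand extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")
  implicit val mat: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  Source(List(1, 2, 3))
    .throttle(1, per = 1.second)          // slow upstream
    .expand(i => Iterator.continually(i)) // hold: repeat the newest value on demand
    .throttle(10, per = 1.second)         // faster downstream sees repeats
    .take(25)                             // safety bound
    .runWith(Sink.foreach(println))
    .onComplete(_ => system.terminate())
}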

@@ -20,7 +20,9 @@ class RecipeLoggingElements extends RecipeSpec {
val mySource = Source(List("1", "2", "3"))
//#println-debug
val loggedSource = mySource.map { elem => println(elem); elem }
val loggedSource = mySource.map { elem =>
println(elem); elem
}
//#println-debug
loggedSource.runWith(Sink.ignore)
@@ -33,14 +35,10 @@ class RecipeLoggingElements extends RecipeSpec {
//#log-custom
// customise log levels
mySource.log("before-map")
.withAttributes(
Attributes.logLevels(
onElement = Logging.WarningLevel,
onFinish = Logging.InfoLevel,
onFailure = Logging.DebugLevel
)
)
mySource
.log("before-map")
.withAttributes(Attributes
.logLevels(onElement = Logging.WarningLevel, onFinish = Logging.InfoLevel, onFailure = Logging.DebugLevel))
.map(analyse)
// or provide custom logging adapter

@@ -25,13 +25,11 @@ class RecipeMissedTicks extends RecipeSpec {
//#missed-ticks
val missedTicks: Flow[Tick, Int, NotUsed] =
Flow[Tick].conflateWithSeed(seed = (_) => 0)(
(missedTicks, tick) => missedTicks + 1)
Flow[Tick].conflateWithSeed(seed = (_) => 0)((missedTicks, tick) => missedTicks + 1)
//#missed-ticks
val latch = TestLatch(3)
val realMissedTicks: Flow[Tick, Int, NotUsed] =
Flow[Tick].conflateWithSeed(seed = (_) => 0)(
(missedTicks, tick) => { latch.countDown(); missedTicks + 1 })
Flow[Tick].conflateWithSeed(seed = (_) => 0)((missedTicks, tick) => { latch.countDown(); missedTicks + 1 })
tickStream.via(realMissedTicks).to(sink).run()
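A runnable variant of the same idea (names and rates are ours): while the consumer lags, conflateWithSeed counts how many ticks were collapsed into the element it finally receives.

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.duration._

object MissedTicksSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")
  implicit val mat: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  Source.tick(0.millis, 100.millis, ())                  // fast ticks
    .conflateWithSeed(_ => 0)((missed, _) => missed + 1) // count ticks collapsed since last emit
    .throttle(1, per = 1.second)                         // slow consumer
    .take(5)
    .runWith(Sink.foreach(n => println(s"missed $n ticks")))
    .onComplete(_ => system.terminate())
}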

@@ -35,14 +35,13 @@ class RecipeMultiGroupBy extends RecipeSpec {
topicsForMessage.map(msg -> _)
}
val multiGroups = messageAndTopic
.groupBy(2, _._2).map {
case (msg, topic) =>
// do what needs to be done
//#multi-groupby
(msg, topic)
val multiGroups = messageAndTopic.groupBy(2, _._2).map {
case (msg, topic) =>
// do what needs to be done
//#multi-groupby
}
(msg, topic)
//#multi-groupby
}
//#multi-groupby
val result = multiGroups
@@ -52,9 +51,7 @@ class RecipeMultiGroupBy extends RecipeSpec {
.limit(10)
.runWith(Sink.seq)
Await.result(result, 3.seconds).toSet should be(Set(
"1[1: a, 1: b, all: c, all: d, 1: e]",
"2[all: c, all: d]"))
Await.result(result, 3.seconds).toSet should be(Set("1[1: a, 1: b, all: c, all: d, 1: e]", "2[all: c, all: d]"))
}

@@ -16,25 +16,22 @@ class RecipeParseLines extends RecipeSpec {
"Recipe for parsing line from bytes" must {
"work" in {
val rawData = Source(List(
ByteString("Hello World"),
ByteString("\r"),
ByteString("!\r"),
ByteString("\nHello Akka!\r\nHello Streams!"),
ByteString("\r\n\r\n")))
val rawData = Source(
List(ByteString("Hello World"),
ByteString("\r"),
ByteString("!\r"),
ByteString("\nHello Akka!\r\nHello Streams!"),
ByteString("\r\n\r\n")))
//#parse-lines
import akka.stream.scaladsl.Framing
val linesStream = rawData.via(Framing.delimiter(
ByteString("\r\n"), maximumFrameLength = 100, allowTruncation = true))
val linesStream = rawData
.via(Framing.delimiter(ByteString("\r\n"), maximumFrameLength = 100, allowTruncation = true))
.map(_.utf8String)
//#parse-lines
Await.result(linesStream.limit(10).runWith(Sink.seq), 3.seconds) should be(List(
"Hello World\r!",
"Hello Akka!",
"Hello Streams!",
""))
Await.result(linesStream.limit(10).runWith(Sink.seq), 3.seconds) should be(
List("Hello World\r!", "Hello Akka!", "Hello Streams!", ""))
}
}

@@ -21,7 +21,7 @@ class RecipeReduceByKey extends RecipeSpec {
//#word-count
val counts: Source[(String, Int), NotUsed] = words
// split the words into separate streams first
// split the words into separate streams first
.groupBy(MaximumDistinctWords, identity)
//transform each element to pair with number of words in it
.map(_ -> 1)
@@ -31,13 +31,8 @@ class RecipeReduceByKey extends RecipeSpec {
.mergeSubstreams
//#word-count
Await.result(counts.limit(10).runWith(Sink.seq), 3.seconds).toSet should be(Set(
("hello", 2),
("world", 1),
("and", 1),
("universe", 1),
("akka", 1),
("rocks!", 1000)))
Await.result(counts.limit(10).runWith(Sink.seq), 3.seconds).toSet should be(
Set(("hello", 2), ("world", 1), ("and", 1), ("universe", 1), ("akka", 1), ("rocks!", 1000)))
}
"work generalized" in {
@@ -45,10 +40,9 @@ class RecipeReduceByKey extends RecipeSpec {
def words = Source(List("hello", "world", "and", "hello", "universe", "akka") ++ List.fill(1000)("rocks!"))
//#reduce-by-key-general
def reduceByKey[In, K, Out](
maximumGroupSize: Int,
groupKey: (In) => K,
map: (In) => Out)(reduce: (Out, Out) => Out): Flow[In, (K, Out), NotUsed] = {
def reduceByKey[In, K, Out](maximumGroupSize: Int,
groupKey: (In) => K,
map: (In) => Out)(reduce: (Out, Out) => Out): Flow[In, (K, Out), NotUsed] = {
Flow[In]
.groupBy[K](maximumGroupSize, groupKey)
@@ -58,19 +52,12 @@ class RecipeReduceByKey extends RecipeSpec {
}
val wordCounts = words.via(
reduceByKey(
MaximumDistinctWords,
groupKey = (word: String) => word,
map = (word: String) => 1)((left: Int, right: Int) => left + right))
reduceByKey(MaximumDistinctWords, groupKey = (word: String) => word, map = (word: String) => 1)(
(left: Int, right: Int) => left + right))
//#reduce-by-key-general
Await.result(wordCounts.limit(10).runWith(Sink.seq), 3.seconds).toSet should be(Set(
("hello", 2),
("world", 1),
("and", 1),
("universe", 1),
("akka", 1),
("rocks!", 1000)))
Await.result(wordCounts.limit(10).runWith(Sink.seq), 3.seconds).toSet should be(
Set(("hello", 2), ("world", 1), ("and", 1), ("universe", 1), ("akka", 1), ("rocks!", 1000)))
}
}

@@ -42,8 +42,8 @@ class RecipeWorkerPool extends RecipeSpec {
val processedJobs: Source[Result, NotUsed] = myJobs.via(balancer(worker, 3))
//#worker-pool
Await.result(processedJobs.limit(10).runWith(Sink.seq), 3.seconds).toSet should be(Set(
"1 done", "2 done", "3 done", "4 done", "5 done"))
Await.result(processedJobs.limit(10).runWith(Sink.seq), 3.seconds).toSet should be(
Set("1 done", "2 done", "3 done", "4 done", "5 done"))
}

@@ -52,25 +52,20 @@ class StreamFileDocSpec extends AkkaSpec(UnboundedMailboxConfig) {
//#file-source
val foreach: Future[IOResult] = FileIO.fromPath(file)
.to(Sink.ignore)
.run()
val foreach: Future[IOResult] = FileIO.fromPath(file).to(Sink.ignore).run()
//#file-source
}
"configure dispatcher in code" in {
//#custom-dispatcher-code
FileIO.fromPath(file)
.withAttributes(ActorAttributes.dispatcher("custom-blocking-io-dispatcher"))
FileIO.fromPath(file).withAttributes(ActorAttributes.dispatcher("custom-blocking-io-dispatcher"))
//#custom-dispatcher-code
}
"write data into a file" in {
//#file-sink
val text = Source.single("Hello Akka Stream!")
val result: Future[IOResult] = text
.map(t => ByteString(t))
.runWith(FileIO.toPath(file))
val result: Future[IOResult] = text.map(t => ByteString(t)).runWith(FileIO.toPath(file))
//#file-sink
}
}
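Those two halves combine into a small round trip; a sketch (the path and contents are ours, and the target directory is assumed to exist):

import java.nio.file.Paths
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ FileIO, Sink, Source }
import akka.util.ByteString

object FileRoundTrip extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")
  implicit val mat: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  val path = Paths.get("target", "hello.txt") // hypothetical location
  Source.single("Hello Akka Stream!")
    .map(ByteString(_))
    .runWith(FileIO.toPath(path)) // write the file
    .flatMap(_ => FileIO.fromPath(path).runWith(Sink.fold(ByteString.empty)(_ ++ _))) // read it back
    .foreach { bytes => println(bytes.utf8String); system.terminate() }
}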

@@ -31,7 +31,7 @@ class StreamTcpDocSpec extends AkkaSpec {
Tcp().bind("127.0.0.1", 8888).to(Sink.ignore).run()
binding.map { b =>
b.unbind() onComplete {
b.unbind().onComplete {
case _ => // ...
}
}
@@ -44,14 +44,11 @@ class StreamTcpDocSpec extends AkkaSpec {
val connections: Source[IncomingConnection, Future[ServerBinding]] =
Tcp().bind(host, port)
connections runForeach { connection =>
connections.runForeach { connection =>
println(s"New connection from: ${connection.remoteAddress}")
val echo = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
.map(_.utf8String)
.map(_ + "!!!\n")
.map(ByteString(_))
@@ -71,32 +68,32 @@ class StreamTcpDocSpec extends AkkaSpec {
import akka.stream.scaladsl.Framing
val binding =
//#welcome-banner-chat-server
connections.to(Sink.foreach { connection =>
connections
.to(Sink.foreach { connection =>
// server logic, parses incoming commands
val commandParser = Flow[String].takeWhile(_ != "BYE").map(_ + "!")
// server logic, parses incoming commands
val commandParser = Flow[String].takeWhile(_ != "BYE").map(_ + "!")
import connection._
val welcomeMsg = s"Welcome to: $localAddress, you are: $remoteAddress!"
val welcome = Source.single(welcomeMsg)
import connection._
val welcomeMsg = s"Welcome to: $localAddress, you are: $remoteAddress!"
val welcome = Source.single(welcomeMsg)
val serverLogic = Flow[ByteString]
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
.map(_.utf8String)
//#welcome-banner-chat-server
.map { command =>
serverProbe.ref ! command; command
}
//#welcome-banner-chat-server
.via(commandParser)
// merge in the initial banner after parser
.merge(welcome)
.map(_ + "\n")
.map(ByteString(_))
val serverLogic = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.map(_.utf8String)
//#welcome-banner-chat-server
.map { command => serverProbe.ref ! command; command }
//#welcome-banner-chat-server
.via(commandParser)
// merge in the initial banner after parser
.merge(welcome)
.map(_ + "\n")
.map(ByteString(_))
connection.handleWith(serverLogic)
}).run()
connection.handleWith(serverLogic)
})
.run()
//#welcome-banner-chat-server
// make sure server is started before we connect
@@ -108,7 +105,7 @@ class StreamTcpDocSpec extends AkkaSpec {
def readLine(prompt: String): String = {
input.get() match {
case all @ cmd :: tail if input.compareAndSet(all, tail) => cmd
case _ => "q"
case _ => "q"
}
}
@@ -124,15 +121,10 @@ class StreamTcpDocSpec extends AkkaSpec {
//#repl-client
val replParser =
Flow[String].takeWhile(_ != "q")
.concat(Source.single("BYE"))
.map(elem => ByteString(s"$elem\n"))
Flow[String].takeWhile(_ != "q").concat(Source.single("BYE")).map(elem => ByteString(s"$elem\n"))
val repl = Flow[ByteString]
.via(Framing.delimiter(
ByteString("\n"),
maximumFrameLength = 256,
allowTruncation = true))
.via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 256, allowTruncation = true))
.map(_.utf8String)
.map(text => println("Server: " + text))
.map(_ => readLine("> "))
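Stripped of banner and parsing, the servers above reduce to a pure echo; a sketch (host and port are ours; the identity Flow[ByteString] sends every received chunk straight back):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Tcp }
import akka.util.ByteString

object EchoServer extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")
  implicit val mat: ActorMaterializer = ActorMaterializer()

  Tcp().bind("127.0.0.1", 8889).runForeach { connection =>
    println(s"New connection from: ${connection.remoteAddress}")
    connection.handleWith(Flow[ByteString]) // echo bytes back unchanged
  }
}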

@@ -17,12 +17,12 @@ object SourceOrFlow {
//#log
Flow[String]
//#log
//#log
.log(name = "myStream")
.addAttributes(Attributes.logLevels(
onElement = Attributes.LogLevels.Off,
onFailure = Attributes.LogLevels.Error,
onFinish = Attributes.LogLevels.Info))
.addAttributes(
Attributes.logLevels(onElement = Attributes.LogLevels.Off,
onFailure = Attributes.LogLevels.Error,
onFinish = Attributes.LogLevels.Info))
//#log
}
@@ -30,7 +30,8 @@ object SourceOrFlow {
//#conflate
import scala.concurrent.duration._
Source.cycle(() => List(1, 10, 100, 1000).iterator)
Source
.cycle(() => List(1, 10, 100, 1000).iterator)
.throttle(10, per = 1.second) // faster upstream
.conflate((acc, el) => acc + el) // acc: Int, el: Int
.throttle(1, per = 1.second) // slow downstream
@@ -45,9 +46,10 @@ object SourceOrFlow {
def sum(other: Summed) = Summed(this.i + other.i)
}
Source.cycle(() => List(1, 10, 100, 1000).iterator)
Source
.cycle(() => List(1, 10, 100, 1000).iterator)
.throttle(10, per = 1.second) // faster upstream
.conflateWithSeed(el => Summed(el))((acc, el) => acc sum Summed(el)) // (Summed, Int) => Summed
.conflateWithSeed(el => Summed(el))((acc, el) => acc.sum(Summed(el))) // (Summed, Int) => Summed
.throttle(1, per = 1.second) // slow downstream
//#conflateWithSeed
}
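As a worked check on the rates in these examples: the upstream yields ten elements per second and the downstream takes one, so each emitted element sums roughly ten conflated inputs, about two and a half passes over 1 + 10 + 100 + 1000. A runnable sketch (object name is ours):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.duration._

object ConflateSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")
  implicit val mat: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  Source.cycle(() => List(1, 10, 100, 1000).iterator)
    .throttle(10, per = 1.second) // faster upstream
    .conflate(_ + _)              // sum whatever the consumer could not keep up with
    .throttle(1, per = 1.second)  // slow downstream
    .take(5)
    .runWith(Sink.foreach(println))
    .onComplete(_ => system.terminate())
}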