parent
63ccdeec16
commit
b1df13d4d4
221 changed files with 1528 additions and 1580 deletions
|
|
@ -23,7 +23,7 @@ class RecipeDroppyBroadcast extends RecipeSpec {
|
|||
val mySink3 = Sink.fromSubscriber(sub3)
|
||||
|
||||
//#droppy-bcast
|
||||
val graph = RunnableGraph.fromGraph(GraphDSL.create(mySink1, mySink2, mySink3)((_, _, _)) { implicit b => (sink1, sink2, sink3) =>
|
||||
val graph = RunnableGraph.fromGraph(GraphDSL.create(mySink1, mySink2, mySink3)((_, _, _)) { implicit b ⇒ (sink1, sink2, sink3) ⇒
|
||||
import GraphDSL.Implicits._
|
||||
|
||||
val bcast = b.add(Broadcast[Int](3))
|
||||
|
|
@ -39,7 +39,7 @@ class RecipeDroppyBroadcast extends RecipeSpec {
|
|||
graph.run()
|
||||
|
||||
sub3.request(100)
|
||||
for (i <- 1 to 100) {
|
||||
for (i ← 1 to 100) {
|
||||
pub.sendNext(i)
|
||||
sub3.expectNext(i)
|
||||
}
|
||||
|
|
@ -49,7 +49,7 @@ class RecipeDroppyBroadcast extends RecipeSpec {
|
|||
sub1.expectSubscription().request(10)
|
||||
sub2.expectSubscription().request(10)
|
||||
|
||||
for (i <- 91 to 100) {
|
||||
for (i ← 91 to 100) {
|
||||
sub1.expectNext(i)
|
||||
sub2.expectNext(i)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -45,19 +45,19 @@ class RecipeGlobalRateLimit extends RecipeSpec {
|
|||
override def receive: Receive = open
|
||||
|
||||
val open: Receive = {
|
||||
case ReplenishTokens =>
|
||||
case ReplenishTokens ⇒
|
||||
permitTokens = math.min(permitTokens + tokenRefreshAmount, maxAvailableTokens)
|
||||
case WantToPass =>
|
||||
case WantToPass ⇒
|
||||
permitTokens -= 1
|
||||
sender() ! MayPass
|
||||
if (permitTokens == 0) context.become(closed)
|
||||
}
|
||||
|
||||
val closed: Receive = {
|
||||
case ReplenishTokens =>
|
||||
case ReplenishTokens ⇒
|
||||
permitTokens = math.min(permitTokens + tokenRefreshAmount, maxAvailableTokens)
|
||||
releaseWaiting()
|
||||
case WantToPass =>
|
||||
case WantToPass ⇒
|
||||
waitQueue = waitQueue.enqueue(sender())
|
||||
}
|
||||
|
||||
|
|
@ -82,11 +82,11 @@ class RecipeGlobalRateLimit extends RecipeSpec {
|
|||
def limitGlobal[T](limiter: ActorRef, maxAllowedWait: FiniteDuration): Flow[T, T, NotUsed] = {
|
||||
import akka.pattern.ask
|
||||
import akka.util.Timeout
|
||||
Flow[T].mapAsync(4)((element: T) => {
|
||||
Flow[T].mapAsync(4)((element: T) ⇒ {
|
||||
import system.dispatcher
|
||||
implicit val triggerTimeout = Timeout(maxAllowedWait)
|
||||
val limiterTriggerFuture = limiter ? Limiter.WantToPass
|
||||
limiterTriggerFuture.map((_) => element)
|
||||
limiterTriggerFuture.map((_) ⇒ element)
|
||||
})
|
||||
|
||||
}
|
||||
|
|
@ -95,12 +95,12 @@ class RecipeGlobalRateLimit extends RecipeSpec {
|
|||
// Use a large period and emulate the timer by hand instead
|
||||
val limiter = system.actorOf(Limiter.props(2, 100.days, 1), "limiter")
|
||||
|
||||
val source1 = Source.fromIterator(() => Iterator.continually("E1")).via(limitGlobal(limiter, 2.seconds.dilated))
|
||||
val source2 = Source.fromIterator(() => Iterator.continually("E2")).via(limitGlobal(limiter, 2.seconds.dilated))
|
||||
val source1 = Source.fromIterator(() ⇒ Iterator.continually("E1")).via(limitGlobal(limiter, 2.seconds.dilated))
|
||||
val source2 = Source.fromIterator(() ⇒ Iterator.continually("E2")).via(limitGlobal(limiter, 2.seconds.dilated))
|
||||
|
||||
val probe = TestSubscriber.manualProbe[String]()
|
||||
|
||||
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
|
||||
RunnableGraph.fromGraph(GraphDSL.create() { implicit b ⇒
|
||||
import GraphDSL.Implicits._
|
||||
val merge = b.add(Merge[String](2))
|
||||
source1 ~> merge ~> Sink.fromSubscriber(probe)
|
||||
|
|
@ -119,7 +119,7 @@ class RecipeGlobalRateLimit extends RecipeSpec {
|
|||
probe.expectNoMsg(500.millis)
|
||||
|
||||
var resultSet = Set.empty[String]
|
||||
for (_ <- 1 to 100) {
|
||||
for (_ ← 1 to 100) {
|
||||
limiter ! Limiter.ReplenishTokens
|
||||
resultSet += probe.expectNext()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ class RecipeKeepAlive extends RecipeSpec {
|
|||
//#inject-keepalive
|
||||
import scala.concurrent.duration._
|
||||
val injectKeepAlive: Flow[ByteString, ByteString, NotUsed] =
|
||||
Flow[ByteString].keepAlive(1.second, () => keepaliveMessage)
|
||||
Flow[ByteString].keepAlive(1.second, () ⇒ keepaliveMessage)
|
||||
//#inject-keepalive
|
||||
|
||||
// No need to test, this is a built-in stage with proper tests
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ class RecipeLoggingElements extends RecipeSpec {
|
|||
val mySource = Source(List("1", "2", "3"))
|
||||
|
||||
//#println-debug
|
||||
val loggedSource = mySource.map { elem => println(elem); elem }
|
||||
val loggedSource = mySource.map { elem ⇒ println(elem); elem }
|
||||
//#println-debug
|
||||
|
||||
loggedSource.runWith(Sink.ignore)
|
||||
|
|
|
|||
|
|
@ -18,12 +18,12 @@ class RecipeManualTrigger extends RecipeSpec {
|
|||
val sink = Sink.fromSubscriber(sub)
|
||||
|
||||
//#manually-triggered-stream
|
||||
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
|
||||
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder ⇒
|
||||
import GraphDSL.Implicits._
|
||||
val zip = builder.add(Zip[Message, Trigger]())
|
||||
elements ~> zip.in0
|
||||
triggerSource ~> zip.in1
|
||||
zip.out ~> Flow[(Message, Trigger)].map { case (msg, trigger) => msg } ~> sink
|
||||
zip.out ~> Flow[(Message, Trigger)].map { case (msg, trigger) ⇒ msg } ~> sink
|
||||
ClosedShape
|
||||
})
|
||||
//#manually-triggered-stream
|
||||
|
|
@ -57,9 +57,9 @@ class RecipeManualTrigger extends RecipeSpec {
|
|||
val sink = Sink.fromSubscriber(sub)
|
||||
|
||||
//#manually-triggered-stream-zipwith
|
||||
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
|
||||
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder ⇒
|
||||
import GraphDSL.Implicits._
|
||||
val zip = builder.add(ZipWith((msg: Message, trigger: Trigger) => msg))
|
||||
val zip = builder.add(ZipWith((msg: Message, trigger: Trigger) ⇒ msg))
|
||||
|
||||
elements ~> zip.in0
|
||||
triggerSource ~> zip.in1
|
||||
|
|
|
|||
|
|
@ -21,13 +21,13 @@ class RecipeMissedTicks extends RecipeSpec {
|
|||
|
||||
//#missed-ticks
|
||||
val missedTicks: Flow[Tick, Int, NotUsed] =
|
||||
Flow[Tick].conflateWithSeed(seed = (_) => 0)(
|
||||
(missedTicks, tick) => missedTicks + 1)
|
||||
Flow[Tick].conflateWithSeed(seed = (_) ⇒ 0)(
|
||||
(missedTicks, tick) ⇒ missedTicks + 1)
|
||||
//#missed-ticks
|
||||
val latch = TestLatch(3)
|
||||
val realMissedTicks: Flow[Tick, Int, NotUsed] =
|
||||
Flow[Tick].conflateWithSeed(seed = (_) => 0)(
|
||||
(missedTicks, tick) => { latch.countDown(); missedTicks + 1 })
|
||||
Flow[Tick].conflateWithSeed(seed = (_) ⇒ 0)(
|
||||
(missedTicks, tick) ⇒ { latch.countDown(); missedTicks + 1 })
|
||||
|
||||
tickStream.via(realMissedTicks).to(sink).run()
|
||||
|
||||
|
|
|
|||
|
|
@ -16,15 +16,15 @@ class RecipeMultiGroupBy extends RecipeSpec {
|
|||
case class Topic(name: String)
|
||||
|
||||
val elems = Source(List("1: a", "1: b", "all: c", "all: d", "1: e"))
|
||||
val extractTopics = { msg: Message =>
|
||||
val extractTopics = { msg: Message ⇒
|
||||
if (msg.startsWith("1")) List(Topic("1"))
|
||||
else List(Topic("1"), Topic("2"))
|
||||
}
|
||||
|
||||
//#multi-groupby
|
||||
val topicMapper: (Message) => immutable.Seq[Topic] = extractTopics
|
||||
val topicMapper: (Message) ⇒ immutable.Seq[Topic] = extractTopics
|
||||
|
||||
val messageAndTopic: Source[(Message, Topic), NotUsed] = elems.mapConcat { msg: Message =>
|
||||
val messageAndTopic: Source[(Message, Topic), NotUsed] = elems.mapConcat { msg: Message ⇒
|
||||
val topicsForMessage = topicMapper(msg)
|
||||
// Create a (Msg, Topic) pair for each of the topics
|
||||
// the message belongs to
|
||||
|
|
@ -33,7 +33,7 @@ class RecipeMultiGroupBy extends RecipeSpec {
|
|||
|
||||
val multiGroups = messageAndTopic
|
||||
.groupBy(2, _._2).map {
|
||||
case (msg, topic) =>
|
||||
case (msg, topic) ⇒
|
||||
// do what needs to be done
|
||||
//#multi-groupby
|
||||
(msg, topic)
|
||||
|
|
@ -44,7 +44,7 @@ class RecipeMultiGroupBy extends RecipeSpec {
|
|||
val result = multiGroups
|
||||
.grouped(10)
|
||||
.mergeSubstreams
|
||||
.map(g => g.head._2.name + g.map(_._1).mkString("[", ", ", "]"))
|
||||
.map(g ⇒ g.head._2.name + g.map(_._1).mkString("[", ", ", "]"))
|
||||
.limit(10)
|
||||
.runWith(Sink.seq)
|
||||
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ class RecipeReduceByKey extends RecipeSpec {
|
|||
//transform each element to pair with number of words in it
|
||||
.map(_ -> 1)
|
||||
// add counting logic to the streams
|
||||
.reduce((l, r) => (l._1, l._2 + r._2))
|
||||
.reduce((l, r) ⇒ (l._1, l._2 + r._2))
|
||||
// get a stream of word counts
|
||||
.mergeSubstreams
|
||||
//#word-count
|
||||
|
|
@ -45,21 +45,21 @@ class RecipeReduceByKey extends RecipeSpec {
|
|||
//#reduce-by-key-general
|
||||
def reduceByKey[In, K, Out](
|
||||
maximumGroupSize: Int,
|
||||
groupKey: (In) => K,
|
||||
map: (In) => Out)(reduce: (Out, Out) => Out): Flow[In, (K, Out), NotUsed] = {
|
||||
groupKey: (In) ⇒ K,
|
||||
map: (In) ⇒ Out)(reduce: (Out, Out) ⇒ Out): Flow[In, (K, Out), NotUsed] = {
|
||||
|
||||
Flow[In]
|
||||
.groupBy[K](maximumGroupSize, groupKey)
|
||||
.map(e => groupKey(e) -> map(e))
|
||||
.reduce((l, r) => l._1 -> reduce(l._2, r._2))
|
||||
.map(e ⇒ groupKey(e) -> map(e))
|
||||
.reduce((l, r) ⇒ l._1 -> reduce(l._2, r._2))
|
||||
.mergeSubstreams
|
||||
}
|
||||
|
||||
val wordCounts = words.via(
|
||||
reduceByKey(
|
||||
MaximumDistinctWords,
|
||||
groupKey = (word: String) => word,
|
||||
map = (word: String) => 1)((left: Int, right: Int) => left + right))
|
||||
groupKey = (word: String) ⇒ word,
|
||||
map = (word: String) ⇒ 1)((left: Int, right: Int) ⇒ left + right))
|
||||
//#reduce-by-key-general
|
||||
|
||||
Await.result(wordCounts.limit(10).runWith(Sink.seq), 3.seconds).toSet should be(Set(
|
||||
|
|
|
|||
|
|
@ -15,11 +15,11 @@ class RecipeSimpleDrop extends RecipeSpec {
|
|||
|
||||
//#simple-drop
|
||||
val droppyStream: Flow[Message, Message, NotUsed] =
|
||||
Flow[Message].conflate((lastMessage, newMessage) => newMessage)
|
||||
Flow[Message].conflate((lastMessage, newMessage) ⇒ newMessage)
|
||||
//#simple-drop
|
||||
val latch = TestLatch(2)
|
||||
val realDroppyStream =
|
||||
Flow[Message].conflate((lastMessage, newMessage) => { latch.countDown(); newMessage })
|
||||
Flow[Message].conflate((lastMessage, newMessage) ⇒ { latch.countDown(); newMessage })
|
||||
|
||||
val pub = TestPublisher.probe[Message]()
|
||||
val sub = TestSubscriber.manualProbe[Message]()
|
||||
|
|
|
|||
|
|
@ -22,11 +22,11 @@ class RecipeWorkerPool extends RecipeSpec {
|
|||
def balancer[In, Out](worker: Flow[In, Out, Any], workerCount: Int): Flow[In, Out, NotUsed] = {
|
||||
import GraphDSL.Implicits._
|
||||
|
||||
Flow.fromGraph(GraphDSL.create() { implicit b =>
|
||||
Flow.fromGraph(GraphDSL.create() { implicit b ⇒
|
||||
val balancer = b.add(Balance[In](workerCount, waitForAllDownstreams = true))
|
||||
val merge = b.add(Merge[Out](workerCount))
|
||||
|
||||
for (_ <- 1 to workerCount) {
|
||||
for (_ ← 1 to workerCount) {
|
||||
// for each worker, add an edge from the balancer to the worker, then wire
|
||||
// it to the merge element
|
||||
balancer ~> worker.async ~> merge
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue