Merge branch 'master' into wip-multi-dc-merge-master-patriknw
This commit is contained in:
commit
6ed3295acd
393 changed files with 11343 additions and 9108 deletions
|
|
@ -34,43 +34,41 @@ class SharedMutableStateDocSpec {
|
|||
|
||||
def expensiveCalculation(actorRef: ActorRef): String = {
|
||||
// this is a very costly operation
|
||||
"Meaning of live is 42"
|
||||
"Meaning of life is 42"
|
||||
}
|
||||
|
||||
def expensiveCalculation(): String = {
|
||||
// this is a very costly operation
|
||||
"Meaning of live is 42"
|
||||
"Meaning of life is 42"
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case _ =>
|
||||
|
||||
//Wrong ways
|
||||
implicit val ec = context.dispatcher
|
||||
implicit val timeout = Timeout(5 seconds) // needed for `?` below
|
||||
|
||||
// Very bad, shared mutable state,
|
||||
// will break your application in weird ways
|
||||
// Example of incorrect approach
|
||||
// Very bad: shared mutable state will cause your
|
||||
// application to break in weird ways
|
||||
Future { state = "This will race" }
|
||||
((echoActor ? Message("With this other one")).mapTo[Message])
|
||||
.foreach { received => state = received.msg }
|
||||
|
||||
// Very bad, shared mutable object,
|
||||
// the other actor cand mutate your own state,
|
||||
// Very bad: shared mutable object allows
|
||||
// the other actor to mutate your own state,
|
||||
// or worse, you might get weird race conditions
|
||||
cleanUpActor ! mySet
|
||||
|
||||
// Very bad, "sender" changes for every message,
|
||||
// Very bad: "sender" changes for every message,
|
||||
// shared mutable state bug
|
||||
Future { expensiveCalculation(sender()) }
|
||||
|
||||
//Right ways
|
||||
|
||||
// Completely safe, "self" is OK to close over
|
||||
// Example of correct approach
|
||||
// Completely safe: "self" is OK to close over
|
||||
// and it's an ActorRef, which is thread-safe
|
||||
Future { expensiveCalculation() } foreach { self ! _ }
|
||||
|
||||
// Completely safe, we close over a fixed value
|
||||
// Completely safe: we close over a fixed value
|
||||
// and it's an ActorRef, which is thread-safe
|
||||
val currentSender = sender()
|
||||
Future { expensiveCalculation(currentSender) }
|
||||
|
|
|
|||
33
akka-docs/src/test/scala/docs/actor/TimerDocSpec.scala
Normal file
33
akka-docs/src/test/scala/docs/actor/TimerDocSpec.scala
Normal file
|
|
@ -0,0 +1,33 @@
|
|||
/**
|
||||
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package docs.actor
|
||||
|
||||
import akka.actor.Actor
|
||||
import scala.concurrent.duration._
|
||||
|
||||
object TimerDocSpec {
|
||||
//#timers
|
||||
import akka.actor.Timers
|
||||
|
||||
object MyActor {
|
||||
private case object TickKey
|
||||
private case object FirstTick
|
||||
private case object Tick
|
||||
private case object LaterTick
|
||||
}
|
||||
|
||||
class MyActor extends Actor with Timers {
|
||||
import MyActor._
|
||||
timers.startSingleTimer(TickKey, FirstTick, 500.millis)
|
||||
|
||||
def receive = {
|
||||
case FirstTick =>
|
||||
// do something useful here
|
||||
timers.startPeriodicTimer(TickKey, Tick, 1.second)
|
||||
case Tick =>
|
||||
// do something useful here
|
||||
}
|
||||
}
|
||||
//#timers
|
||||
}
|
||||
|
|
@ -78,7 +78,7 @@ abstract class FactorialFrontend2 extends Actor {
|
|||
AdaptiveLoadBalancingGroup(HeapMetricsSelector),
|
||||
ClusterRouterGroupSettings(
|
||||
totalInstances = 100, routeesPaths = List("/user/factorialBackend"),
|
||||
allowLocalRoutees = true, useRole = Some("backend"))).props(),
|
||||
allowLocalRoutees = true, useRoles = Set("backend"))).props(),
|
||||
name = "factorialBackendRouter2")
|
||||
|
||||
//#router-lookup-in-code
|
||||
|
|
@ -96,7 +96,7 @@ abstract class FactorialFrontend3 extends Actor {
|
|||
ClusterRouterPool(AdaptiveLoadBalancingPool(
|
||||
SystemLoadAverageMetricsSelector), ClusterRouterPoolSettings(
|
||||
totalInstances = 100, maxInstancesPerNode = 3,
|
||||
allowLocalRoutees = false, useRole = Some("backend"))).props(Props[FactorialBackend]),
|
||||
allowLocalRoutees = false, useRoles = Set("backend"))).props(Props[FactorialBackend]),
|
||||
name = "factorialBackendRouter3")
|
||||
//#router-deploy-in-code
|
||||
}
|
||||
|
|
|
|||
|
|
@ -92,6 +92,28 @@ object DispatcherDocSpec {
|
|||
}
|
||||
//#my-thread-pool-dispatcher-config
|
||||
|
||||
//#affinity-pool-dispatcher-config
|
||||
affinity-pool-dispatcher {
|
||||
# Dispatcher is the name of the event-based dispatcher
|
||||
type = Dispatcher
|
||||
# What kind of ExecutionService to use
|
||||
executor = "affinity-pool-executor"
|
||||
# Configuration for the thread pool
|
||||
affinity-pool-executor {
|
||||
# Min number of threads to cap factor-based parallelism number to
|
||||
parallelism-min = 8
|
||||
# Parallelism (threads) ... ceil(available processors * factor)
|
||||
parallelism-factor = 1
|
||||
# Max number of threads to cap factor-based parallelism number to
|
||||
parallelism-max = 16
|
||||
}
|
||||
# Throughput defines the maximum number of messages to be
|
||||
# processed per actor before the thread jumps to the next actor.
|
||||
# Set to 1 for as fair as possible.
|
||||
throughput = 100
|
||||
}
|
||||
//#affinity-pool-dispatcher-config
|
||||
|
||||
//#fixed-pool-size-dispatcher-config
|
||||
blocking-io-dispatcher {
|
||||
type = Dispatcher
|
||||
|
|
@ -294,6 +316,14 @@ class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) {
|
|||
//#defining-pinned-dispatcher
|
||||
}
|
||||
|
||||
"defining affinity-pool dispatcher" in {
|
||||
val context = system
|
||||
//#defining-affinity-pool-dispatcher
|
||||
val myActor =
|
||||
context.actorOf(Props[MyActor].withDispatcher("affinity-pool-dispatcher"), "myactor4")
|
||||
//#defining-affinity-pool-dispatcher
|
||||
}
|
||||
|
||||
"looking up a dispatcher" in {
|
||||
//#lookup
|
||||
// for use with Futures, Scheduler, etc.
|
||||
|
|
|
|||
|
|
@ -15,9 +15,6 @@ import com.typesafe.config.Config
|
|||
import java.util.concurrent.ConcurrentLinkedQueue
|
||||
import scala.Option
|
||||
|
||||
// Marker trait used for mailbox requirements mapping
|
||||
trait MyUnboundedMessageQueueSemantics
|
||||
|
||||
object MyUnboundedMailbox {
|
||||
// This is the MessageQueue implementation
|
||||
class MyMessageQueue extends MessageQueue
|
||||
|
|
@ -58,3 +55,8 @@ class MyUnboundedMailbox extends MailboxType
|
|||
new MyMessageQueue()
|
||||
}
|
||||
//#mailbox-implementation-example
|
||||
|
||||
//#mailbox-marker-interface
|
||||
// Marker trait used for mailbox requirements mapping
|
||||
trait MyUnboundedMessageQueueSemantics
|
||||
//#mailbox-marker-interface
|
||||
|
|
@ -1,101 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package docs.pattern
|
||||
|
||||
import language.postfixOps
|
||||
|
||||
import akka.actor.{ Props, ActorRef, Actor }
|
||||
import scala.concurrent.duration._
|
||||
import akka.testkit.{ TimingTest, AkkaSpec, filterException }
|
||||
import docs.pattern.SchedulerPatternSpec.ScheduleInConstructor
|
||||
|
||||
object SchedulerPatternSpec {
|
||||
//#schedule-constructor
|
||||
class ScheduleInConstructor extends Actor {
|
||||
import context.dispatcher
|
||||
val tick =
|
||||
context.system.scheduler.schedule(500 millis, 1000 millis, self, "tick")
|
||||
//#schedule-constructor
|
||||
// this var and constructor is declared here to not show up in the docs
|
||||
var target: ActorRef = null
|
||||
def this(target: ActorRef) = { this(); this.target = target }
|
||||
//#schedule-constructor
|
||||
|
||||
override def postStop() = tick.cancel()
|
||||
|
||||
def receive = {
|
||||
case "tick" =>
|
||||
// do something useful here
|
||||
//#schedule-constructor
|
||||
target ! "tick"
|
||||
case "restart" =>
|
||||
throw new ArithmeticException
|
||||
//#schedule-constructor
|
||||
}
|
||||
}
|
||||
//#schedule-constructor
|
||||
|
||||
//#schedule-receive
|
||||
class ScheduleInReceive extends Actor {
|
||||
import context._
|
||||
//#schedule-receive
|
||||
// this var and constructor is declared here to not show up in the docs
|
||||
var target: ActorRef = null
|
||||
def this(target: ActorRef) = { this(); this.target = target }
|
||||
//#schedule-receive
|
||||
|
||||
override def preStart() =
|
||||
system.scheduler.scheduleOnce(500 millis, self, "tick")
|
||||
|
||||
// override postRestart so we don't call preStart and schedule a new message
|
||||
override def postRestart(reason: Throwable) = {}
|
||||
|
||||
def receive = {
|
||||
case "tick" =>
|
||||
// send another periodic tick after the specified delay
|
||||
system.scheduler.scheduleOnce(1000 millis, self, "tick")
|
||||
// do something useful here
|
||||
//#schedule-receive
|
||||
target ! "tick"
|
||||
case "restart" =>
|
||||
throw new ArithmeticException
|
||||
//#schedule-receive
|
||||
}
|
||||
}
|
||||
//#schedule-receive
|
||||
}
|
||||
|
||||
class SchedulerPatternSpec extends AkkaSpec {
|
||||
|
||||
def testSchedule(actor: ActorRef, startDuration: FiniteDuration,
|
||||
afterRestartDuration: FiniteDuration) = {
|
||||
|
||||
filterException[ArithmeticException] {
|
||||
within(startDuration) {
|
||||
expectMsg("tick")
|
||||
expectMsg("tick")
|
||||
expectMsg("tick")
|
||||
}
|
||||
actor ! "restart"
|
||||
within(afterRestartDuration) {
|
||||
expectMsg("tick")
|
||||
expectMsg("tick")
|
||||
}
|
||||
system.stop(actor)
|
||||
}
|
||||
}
|
||||
|
||||
"send periodic ticks from the constructor" taggedAs TimingTest in {
|
||||
testSchedule(
|
||||
system.actorOf(Props(classOf[ScheduleInConstructor], testActor)),
|
||||
3000 millis, 2000 millis)
|
||||
}
|
||||
|
||||
"send ticks from the preStart and receive" taggedAs TimingTest in {
|
||||
testSchedule(
|
||||
system.actorOf(Props(classOf[ScheduleInConstructor], testActor)),
|
||||
3000 millis, 2500 millis)
|
||||
}
|
||||
}
|
||||
|
|
@ -4,8 +4,11 @@
|
|||
|
||||
package docs.persistence
|
||||
|
||||
import java.io.NotSerializableException
|
||||
|
||||
import scala.language.reflectiveCalls
|
||||
import java.nio.charset.Charset
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.persistence.journal.{ EventAdapter, EventSeq }
|
||||
import akka.serialization.{ SerializationExtension, SerializerWithStringManifest }
|
||||
|
|
@ -13,6 +16,7 @@ import akka.testkit.TestKit
|
|||
import com.typesafe.config._
|
||||
import org.scalatest.WordSpec
|
||||
import spray.json.JsObject
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import docs.persistence.proto.FlightAppModels
|
||||
|
||||
|
|
@ -82,7 +86,7 @@ class ProtobufReadOptional {
|
|||
// use generated protobuf serializer
|
||||
seatReserved(FlightAppModels.SeatReserved.parseFrom(bytes))
|
||||
case _ =>
|
||||
throw new IllegalArgumentException("Unable to handle manifest: " + manifest)
|
||||
throw new NotSerializableException("Unable to handle manifest: " + manifest)
|
||||
}
|
||||
|
||||
override def toBinary(o: AnyRef): Array[Byte] = o match {
|
||||
|
|
@ -197,7 +201,7 @@ object SimplestCustomSerializer {
|
|||
val nameAndSurname = new String(bytes, Utf8)
|
||||
val Array(name, surname) = nameAndSurname.split("[|]")
|
||||
Person(name, surname)
|
||||
case _ => throw new IllegalArgumentException(
|
||||
case _ => throw new NotSerializableException(
|
||||
s"Unable to deserialize from bytes, manifest was: $manifest! Bytes length: " +
|
||||
bytes.length)
|
||||
}
|
||||
|
|
@ -317,7 +321,7 @@ class RenamedEventAwareSerializer extends SerializerWithStringManifest {
|
|||
manifest match {
|
||||
case OldPayloadClassName => SamplePayload(new String(bytes, Utf8))
|
||||
case MyPayloadClassName => SamplePayload(new String(bytes, Utf8))
|
||||
case other => throw new Exception(s"unexpected manifest [$other]")
|
||||
case other => throw new NotSerializableException(s"unexpected manifest [$other]")
|
||||
}
|
||||
}
|
||||
//#string-serializer-handle-rename
|
||||
|
|
|
|||
|
|
@ -89,4 +89,57 @@ class FlowErrorDocSpec extends AkkaSpec {
|
|||
Await.result(result, 3.seconds) should be(Vector(0, 1, 4, 0, 5, 12))
|
||||
}
|
||||
|
||||
"demonstrate recover" in {
|
||||
implicit val materializer = ActorMaterializer()
|
||||
//#recover
|
||||
Source(0 to 6).map(n =>
|
||||
if (n < 5) n.toString
|
||||
else throw new RuntimeException("Boom!")
|
||||
).recover {
|
||||
case _: RuntimeException => "stream truncated"
|
||||
}.runForeach(println)
|
||||
//#recover
|
||||
|
||||
/*
|
||||
Output:
|
||||
//#recover-output
|
||||
0
|
||||
1
|
||||
2
|
||||
3
|
||||
4
|
||||
stream truncated
|
||||
//#recover-output
|
||||
*/
|
||||
}
|
||||
|
||||
"demonstrate recoverWithRetries" in {
|
||||
implicit val materializer = ActorMaterializer()
|
||||
//#recoverWithRetries
|
||||
val planB = Source(List("five", "six", "seven", "eight"))
|
||||
|
||||
Source(0 to 10).map(n =>
|
||||
if (n < 5) n.toString
|
||||
else throw new RuntimeException("Boom!")
|
||||
).recoverWithRetries(attempts = 1, {
|
||||
case _: RuntimeException => planB
|
||||
}).runForeach(println)
|
||||
//#recoverWithRetries
|
||||
|
||||
/*
|
||||
Output:
|
||||
//#recoverWithRetries-output
|
||||
0
|
||||
1
|
||||
2
|
||||
3
|
||||
4
|
||||
five
|
||||
six
|
||||
seven
|
||||
eight
|
||||
//#recoverWithRetries-output
|
||||
*/
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,7 +7,6 @@ import java.util.concurrent.ThreadLocalRandom
|
|||
|
||||
import akka.stream._
|
||||
import akka.stream.scaladsl._
|
||||
import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler, StageLogging }
|
||||
import akka.testkit.{ AkkaSpec, EventFilter }
|
||||
|
||||
class GraphStageLoggingDocSpec extends AkkaSpec("akka.loglevel = DEBUG") {
|
||||
|
|
@ -16,6 +15,8 @@ class GraphStageLoggingDocSpec extends AkkaSpec("akka.loglevel = DEBUG") {
|
|||
implicit val ec = system.dispatcher
|
||||
|
||||
//#stage-with-logging
|
||||
import akka.stream.stage.{ GraphStage, GraphStageLogic, OutHandler, StageLogging }
|
||||
|
||||
final class RandomLettersSource extends GraphStage[SourceShape[String]] {
|
||||
val out = Outlet[String]("RandomLettersSource.out")
|
||||
override val shape: SourceShape[String] = SourceShape(out)
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import akka.testkit.AkkaSpec
|
|||
import docs.CompileOnlySpec
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import akka.stream.ThrottleMode
|
||||
|
||||
class HubsDocSpec extends AkkaSpec with CompileOnlySpec {
|
||||
implicit val materializer = ActorMaterializer()
|
||||
|
|
@ -104,6 +105,86 @@ class HubsDocSpec extends AkkaSpec with CompileOnlySpec {
|
|||
//#pub-sub-4
|
||||
}
|
||||
|
||||
"demonstrate creating a dynamic partition hub" in compileOnlySpec {
|
||||
//#partition-hub
|
||||
// A simple producer that publishes a new "message-" every second
|
||||
val producer = Source.tick(1.second, 1.second, "message")
|
||||
.zipWith(Source(1 to 100))((a, b) => s"$a-$b")
|
||||
|
||||
// Attach a PartitionHub Sink to the producer. This will materialize to a
|
||||
// corresponding Source.
|
||||
// (We need to use toMat and Keep.right since by default the materialized
|
||||
// value to the left is used)
|
||||
val runnableGraph: RunnableGraph[Source[String, NotUsed]] =
|
||||
producer.toMat(PartitionHub.sink(
|
||||
(size, elem) => math.abs(elem.hashCode) % size,
|
||||
startAfterNrOfConsumers = 2, bufferSize = 256))(Keep.right)
|
||||
|
||||
// By running/materializing the producer, we get back a Source, which
|
||||
// gives us access to the elements published by the producer.
|
||||
val fromProducer: Source[String, NotUsed] = runnableGraph.run()
|
||||
|
||||
// Print out messages from the producer in two independent consumers
|
||||
fromProducer.runForeach(msg => println("consumer1: " + msg))
|
||||
fromProducer.runForeach(msg => println("consumer2: " + msg))
|
||||
//#partition-hub
|
||||
}
|
||||
|
||||
"demonstrate creating a dynamic stateful partition hub" in compileOnlySpec {
|
||||
//#partition-hub-stateful
|
||||
// A simple producer that publishes a new "message-" every second
|
||||
val producer = Source.tick(1.second, 1.second, "message")
|
||||
.zipWith(Source(1 to 100))((a, b) => s"$a-$b")
|
||||
|
||||
// New instance of the partitioner function and its state is created
|
||||
// for each materialization of the PartitionHub.
|
||||
def roundRobin(): (PartitionHub.ConsumerInfo, String) ⇒ Long = {
|
||||
var i = -1L
|
||||
|
||||
(info, elem) => {
|
||||
i += 1
|
||||
info.consumerIdByIdx((i % info.size).toInt)
|
||||
}
|
||||
}
|
||||
|
||||
// Attach a PartitionHub Sink to the producer. This will materialize to a
|
||||
// corresponding Source.
|
||||
// (We need to use toMat and Keep.right since by default the materialized
|
||||
// value to the left is used)
|
||||
val runnableGraph: RunnableGraph[Source[String, NotUsed]] =
|
||||
producer.toMat(PartitionHub.statefulSink(
|
||||
() => roundRobin(),
|
||||
startAfterNrOfConsumers = 2, bufferSize = 256))(Keep.right)
|
||||
|
||||
// By running/materializing the producer, we get back a Source, which
|
||||
// gives us access to the elements published by the producer.
|
||||
val fromProducer: Source[String, NotUsed] = runnableGraph.run()
|
||||
|
||||
// Print out messages from the producer in two independent consumers
|
||||
fromProducer.runForeach(msg => println("consumer1: " + msg))
|
||||
fromProducer.runForeach(msg => println("consumer2: " + msg))
|
||||
//#partition-hub-stateful
|
||||
}
|
||||
|
||||
"demonstrate creating a dynamic partition hub routing to fastest consumer" in compileOnlySpec {
|
||||
//#partition-hub-fastest
|
||||
val producer = Source(0 until 100)
|
||||
|
||||
// ConsumerInfo.queueSize is the approximate number of buffered elements for a consumer.
|
||||
// Note that this is a moving target since the elements are consumed concurrently.
|
||||
val runnableGraph: RunnableGraph[Source[Int, NotUsed]] =
|
||||
producer.toMat(PartitionHub.statefulSink(
|
||||
() => (info, elem) ⇒ info.consumerIds.minBy(id ⇒ info.queueSize(id)),
|
||||
startAfterNrOfConsumers = 2, bufferSize = 16))(Keep.right)
|
||||
|
||||
val fromProducer: Source[Int, NotUsed] = runnableGraph.run()
|
||||
|
||||
fromProducer.runForeach(msg => println("consumer1: " + msg))
|
||||
fromProducer.throttle(10, 100.millis, 10, ThrottleMode.Shaping)
|
||||
.runForeach(msg => println("consumer2: " + msg))
|
||||
//#partition-hub-fastest
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
67
akka-docs/src/test/scala/docs/stream/RestartDocSpec.scala
Normal file
67
akka-docs/src/test/scala/docs/stream/RestartDocSpec.scala
Normal file
|
|
@ -0,0 +1,67 @@
|
|||
/**
|
||||
* Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package docs.stream
|
||||
|
||||
import akka.NotUsed
|
||||
import akka.stream.{ ActorMaterializer, KillSwitches }
|
||||
import akka.stream.scaladsl._
|
||||
import akka.testkit.AkkaSpec
|
||||
import docs.CompileOnlySpec
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent._
|
||||
|
||||
class RestartDocSpec extends AkkaSpec with CompileOnlySpec {
|
||||
implicit val materializer = ActorMaterializer()
|
||||
import system.dispatcher
|
||||
|
||||
// Mock akka-http interfaces
|
||||
object Http {
|
||||
def apply() = this
|
||||
def singleRequest(req: HttpRequest) = Future.successful(())
|
||||
}
|
||||
case class HttpRequest(uri: String)
|
||||
case class Unmarshal(b: Any) {
|
||||
def to[T]: Future[T] = Promise[T]().future
|
||||
}
|
||||
case class ServerSentEvent()
|
||||
|
||||
def doSomethingElse(): Unit = ()
|
||||
|
||||
"Restart stages" should {
|
||||
|
||||
"demonstrate a restart with backoff source" in compileOnlySpec {
|
||||
|
||||
//#restart-with-backoff-source
|
||||
val restartSource = RestartSource.withBackoff(
|
||||
minBackoff = 3.seconds,
|
||||
maxBackoff = 30.seconds,
|
||||
randomFactor = 0.2 // adds 20% "noise" to vary the intervals slightly
|
||||
) { () =>
|
||||
// Create a source from a future of a source
|
||||
Source.fromFutureSource {
|
||||
// Make a single request with akka-http
|
||||
Http().singleRequest(HttpRequest(
|
||||
uri = "http://example.com/eventstream"
|
||||
))
|
||||
// Unmarshall it as a source of server sent events
|
||||
.flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])
|
||||
}
|
||||
}
|
||||
//#restart-with-backoff-source
|
||||
|
||||
//#with-kill-switch
|
||||
val killSwitch = restartSource
|
||||
.viaMat(KillSwitches.single)(Keep.right)
|
||||
.toMat(Sink.foreach(event => println(s"Got event: $event")))(Keep.left)
|
||||
.run()
|
||||
|
||||
doSomethingElse()
|
||||
|
||||
killSwitch.shutdown()
|
||||
//#with-kill-switch
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -29,8 +29,9 @@ object TwitterStreamQuickstartDocSpec {
|
|||
final case class Hashtag(name: String)
|
||||
|
||||
final case class Tweet(author: Author, timestamp: Long, body: String) {
|
||||
def hashtags: Set[Hashtag] =
|
||||
body.split(" ").collect { case t if t.startsWith("#") => Hashtag(t) }.toSet
|
||||
def hashtags: Set[Hashtag] = body.split(" ").collect {
|
||||
case t if t.startsWith("#") => Hashtag(t.replaceAll("[^#\\w]", ""))
|
||||
}.toSet
|
||||
}
|
||||
|
||||
val akkaTag = Hashtag("#akka")
|
||||
|
|
@ -150,10 +151,12 @@ class TwitterStreamQuickstartDocSpec extends AkkaSpec {
|
|||
|
||||
//#fiddle_code
|
||||
tweets
|
||||
.filterNot(_.hashtags.contains(akkaTag))
|
||||
.mapConcat(_.hashtags)
|
||||
.map(_.name.toUpperCase)
|
||||
.runWith(Sink.foreach(println))
|
||||
.filterNot(_.hashtags.contains(akkaTag)) // Remove all tweets containing #akka hashtag
|
||||
.map(_.hashtags) // Get all sets of hashtags ...
|
||||
.reduce(_ ++ _) // ... and reduce them to a single set, removing duplicates across all tweets
|
||||
.mapConcat(identity) // Flatten the stream of tweets to a stream of hashtags
|
||||
.map(_.name.toUpperCase) // Convert all hashtags to upper case
|
||||
.runWith(Sink.foreach(println)) // Attach the Flow to a Sink that will finally print the hashtags
|
||||
|
||||
// $FiddleDependency org.akka-js %%% akkajsactorstream % 1.2.5.1
|
||||
//#fiddle_code
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue