diff --git a/akka-docs-new/src/main/paradox/guide/actors-intro.md b/akka-docs-new/src/main/paradox/guide/actors-intro.md index 5d60d673e4..78a8cb08ec 100644 --- a/akka-docs-new/src/main/paradox/guide/actors-intro.md +++ b/akka-docs-new/src/main/paradox/guide/actors-intro.md @@ -13,7 +13,7 @@ it can only modified by invoking a set of curated methods. The object is respons that protect the invariant nature of its encapsulated data. For example, operations on an ordered binary tree implementation must not allow violation of the tree ordering -constraint. Callers rely on the ordering being intact. For example, when querying the tree for a certain piece of +invariant. Callers rely on the ordering being intact. For example, when querying the tree for a certain piece of data, you need to be able to rely on this constraint. When we analyze OOP runtime behavior, we sometimes draw a message sequence chart showing the interactions of @@ -40,14 +40,14 @@ can be interleaved in arbitrary ways which eliminates any hope for keeping the i type of coordination between two threads. Now, imagine this issue compounded by the existence of many threads. The common approach for solving this problem is to add a lock around these methods. While this ensures that at most -ne thread will enter the method at any given time, this is a very costly strategy: +one thread will enter the method at any given time, this is a very costly strategy: * Locks _seriously limit_ concurrency, they are very costly on modern CPU architectures, requiring heavy-lifting from the operating system to suspend the thread and restore it later. * The caller thread is now blocked, so it cannot do any other meaningful work. Even in desktop applications this is unacceptable, we want to keep user facing parts of an applications (its UI) to be responsive even when a long background job is running. In backend, server settings, this is outright wasteful. One might think that this can be compensated by launching new threads, but threads are also a costly abstraction. - * Locks introduce a new menace, deadlocks. + * Locks introduce a new menace: deadlocks. These realities result in a no-win situation: * Without sufficient locks, state gets corrupted. @@ -190,8 +190,10 @@ Instead, the receiving actor delivers the results in a reply message. The second key change we need in our model is to reinstate encapsulation. Actors react to messages just like objects "react" to methods invoked on them. The difference is that instead of multiple threads "protruding" into our actor and wreaking havoc to internal state and invariants, actors execute independently from the senders of a message, and they -react to incoming messages sequentially, one at a time. There is always at most one message being processed, meaning -that invariants can be kept without synchronization. This happens automatically without using locks: +react to incoming messages sequentially, one at a time. While each actor processes messages sent to it sequentially, +different actors work concurrently with each other so an actor system can process as many messages simultaneously +as many processor cores are available on the machine. Since there is always at most one message being processed per actor +the invariants of an actor can be kept without synchronization. This happens automatically without using locks: TODO: SERIALIZED-TIMELINE-INVARIANTS diff --git a/akka-docs-new/src/main/paradox/guide/index.md b/akka-docs-new/src/main/paradox/guide/index.md index c5789e9136..7e76a3f9f0 100644 --- a/akka-docs-new/src/main/paradox/guide/index.md +++ b/akka-docs-new/src/main/paradox/guide/index.md @@ -4,7 +4,7 @@ * [What is Akka?](introduction.md) * [What are Actors?](actors-intro.md) - * [Akka Libraries and modules](modules.md) + * [Akka Libraries and Modules](modules.md) * [Your First Akka Application - Hello World](quickstart.md) @@@ \ No newline at end of file diff --git a/akka-docs-new/src/main/paradox/guide/introduction.md b/akka-docs-new/src/main/paradox/guide/introduction.md index 30fe85e7dc..95c7e1b3a7 100644 --- a/akka-docs-new/src/main/paradox/guide/introduction.md +++ b/akka-docs-new/src/main/paradox/guide/introduction.md @@ -10,7 +10,7 @@ crash without responding, messages get lost without a trace on the wire, and net These problems occur regularly in carefully managed intra-datacenter environments - even more so in virtualized architectures. -To deal with these realities, Akka provides +To deal with these realities, Akka provides: * Multi-threaded behavior without use of low-level concurrency constructs like atomics or locks. You do not even need to think about memory visibility issues. @@ -36,7 +36,7 @@ But relatively recently, their applicability to the challenges of modern computi proved to be effective. The actor model provides an abstraction that allows you to think about your code in terms of communication, not unlike -people in a large organization. The basic characteristic of actors that they model the world as stateful entities +people in a large organization. The basic characteristic of actors is that they model the world as stateful entities communicating with each other by explicit message passing. As computational entities, actors have these characteristics: diff --git a/akka-docs-new/src/main/paradox/guide/modules.md b/akka-docs-new/src/main/paradox/guide/modules.md index 378e9968d4..a39b320676 100644 --- a/akka-docs-new/src/main/paradox/guide/modules.md +++ b/akka-docs-new/src/main/paradox/guide/modules.md @@ -13,7 +13,7 @@ Unlike objects, actors encapsulate not only their state but their execution. Communication with actors is not via method calls but by passing messages. While this difference seems to be minor, this is actually what allows us to break clean from the limitations of OOP when it comes to concurrency and remote communication. Don’t worry if this description feels too high level to fully grasp -yet, in the next chapter we will explain actors in detail. For now, the important point that this is a model that +yet, in the next chapter we will explain actors in detail. For now, the important point is that this is a model that handles concurrency and distribution at the fundamental level instead of ad hoc patched attempts to bring these features to OOP. @@ -45,8 +45,8 @@ Some of the problems Remoting solves are If you have a set of actor systems that cooperate to solve some business problem, then you likely want to manage these set of systems in a disciplined way. While Remoting solves the problem of addressing and communicating with components of remote systems, Clustering gives you the ability to organize these into a "meta-system" tied together by a membership -protocol. **In most of the cases, you want to use the Cluster module instead of using Remoting directly.** -Cluster provides an additional set of services on top of Remoting that most of the real world applications need. +protocol. **In most cases, you want to use the Cluster module instead of using Remoting directly.** +Cluster provides an additional set of services on top of Remoting that most real world applications need. The problems the Cluster module solves (among others) are @@ -57,36 +57,6 @@ The problems the Cluster module solves (among others) are * How to distribute computations among the current set of members? * How do I designate members of the cluster to a certain role, in other words to provide certain services and not others? -### Persistence - -Just like objects in OOP actors keep their state in volatile memory. Once the system is shut down, gracefully or -because of a crash, all data that was in memory is lost. Persistence provide patterns to enable actors to persist -events that lead to their current state. Upon startup events can be replayed to restore the state of the entity hosted -by the actor. The event stream can be queried and fed into additional processing pipelines (an external Big Data -cluster for example) or alternate views (like reports). - -Persistence tackles the following problems: - -* How do I restore the state of an entity/actor when system restarts or crashes? -* How do I implement a [CQRS system](https://msdn.microsoft.com/en-us/library/jj591573.aspx)? -* How do I ensure reliable delivery of messages in face of network errors and system crashes? -* How do I introspect domain events that has lead an entity to its current state? - -### Cluster Singleton - -A common (in fact, a bit too common) use case in distributed systems is to have a single entity responsible -for a given task which is shared among other members of the cluster and migrated if the host system fails. -While this undeniably introduces a common bottleneck for the whole cluster that limits scaling, -there are scenarios where the use of this pattern is unavoidable. Cluster singleton allows a cluster to elect an -actor system which will host a particular actor while other systems can always access said service independently from -where it is. - -The Singleton module can be used to solve these problems: - -* How do I ensure that only one instance is running of a service in the whole cluster? -* How do I ensure that the service is up even if the system hosting it currently crashes or shut down during the process of scaling down? -* How do I reach this instance from any member of the cluster assuming that it can migrate to other systems over time? - ### Cluster Sharding Persistence solves the problem of restoring an actor’s state from persistent storage after system restart or crash. @@ -101,6 +71,21 @@ The problem space that Sharding targets: * How do I ensure migrating entities from a crashed system without losing state? * How do I ensure that an entity does not exist on multiple systems at the same time and hence kept consistent? +### Cluster Singleton + +A common (in fact, a bit too common) use case in distributed systems is to have a single entity responsible +for a given task which is shared among other members of the cluster and migrated if the host system fails. +While this undeniably introduces a common bottleneck for the whole cluster that limits scaling, +there are scenarios where the use of this pattern is unavoidable. Cluster singleton allows a cluster to elect an +actor system which will host a particular actor while other systems can always access said service independently from +where it is. + +The Singleton module can be used to solve these problems: + +* How do I ensure that only one instance is running of a service in the whole cluster? +* How do I ensure that the service is up even if the system hosting it currently crashes or shut down during the process of scaling down? +* How do I reach this instance from any member of the cluster assuming that it can migrate to other systems over time? + ### Cluster Publish-Subscribe For coordination among systems it is often necessary to distribute messages to all, or one system of a set of @@ -112,6 +97,21 @@ to broadcast or anycast messages to subscribers of that topic. * How do I anycast messages to a member from an interested set of parties in a cluster? * How to subscribe and unsubscribe for events of a certain topic in the cluster? +### Persistence + +Just like objects in OOP actors keep their state in volatile memory. Once the system is shut down, gracefully or +because of a crash, all data that was in memory is lost. Persistence provide patterns to enable actors to persist +events that lead to their current state. Upon startup events can be replayed to restore the state of the entity hosted +by the actor. The event stream can be queried and fed into additional processing pipelines (an external Big Data +cluster for example) or alternate views (like reports). + +Persistence tackles the following problems: + +* How do I restore the state of an entity/actor when system restarts or crashes? +* How do I implement a [CQRS system](https://msdn.microsoft.com/en-us/library/jj591573.aspx)? +* How do I ensure reliable delivery of messages in face of network errors and system crashes? +* How do I introspect domain events that has lead an entity to its current state? + ### Streams Actors are a fundamental model for concurrency, but there are common patterns where their use requires the user @@ -152,14 +152,4 @@ it into a streaming BigData engine. Take the output of that engine as a Stream, operators and expose it as websocket connections served by a load balanced set of HTTP servers hosted by your cluster to power your real-time business analytics tool. -Got you interested? - - - - - - - - - - +Got you interested? diff --git a/akka-docs-new/src/test/scala/tutorial_3/Device.scala b/akka-docs-new/src/test/scala/tutorial_3/Device.scala index 92e29d19d4..0772307608 100644 --- a/akka-docs-new/src/test/scala/tutorial_3/Device.scala +++ b/akka-docs-new/src/test/scala/tutorial_3/Device.scala @@ -11,30 +11,31 @@ object Device { def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId)) - case class RecordTemperature(requestId: Long, value: Double) - case class TemperatureRecorded(requestId: Long) + final case class RecordTemperature(requestId: Long, value: Double) + final case class TemperatureRecorded(requestId: Long) - case class ReadTemperature(requestId: Long) - case class RespondTemperature(requestId: Long, value: Option[Double]) + final case class ReadTemperature(requestId: Long) + final case class RespondTemperature(requestId: Long, value: Option[Double]) } class Device(groupId: String, deviceId: String) extends Actor with ActorLogging { var lastTemperatureReading: Option[Double] = None - override def preStart(): Unit = log.info(s"Device actor $groupId-$deviceId started") + override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId) - override def postStop(): Unit = log.info(s"Device actor $groupId-$deviceId stopped") + override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId) override def receive: Receive = { case RequestTrackDevice(`groupId`, `deviceId`) => sender() ! DeviceRegistered case RequestTrackDevice(groupId, deviceId) => - log.warning(s"Ignoring TrackDevice request for $groupId-$deviceId. " + - s"This actor is responsible for ${this.groupId}-${this.deviceId}.") + log.warning( + "Ignoring TrackDevice request for {}-{}.This actor is responsible for {}-{}.", + groupId, deviceId, this.groupId, this.deviceId) case RecordTemperature(id, value) => - log.info(s"Recorded temperature reading $value with $id") + log.info("Recorded temperature reading {} with {}", value, id) lastTemperatureReading = Some(value) sender() ! TemperatureRecorded(id) diff --git a/akka-docs-new/src/test/scala/tutorial_3/DeviceGroup.scala b/akka-docs-new/src/test/scala/tutorial_3/DeviceGroup.scala index 99a851214c..4ada3abb85 100644 --- a/akka-docs-new/src/test/scala/tutorial_3/DeviceGroup.scala +++ b/akka-docs-new/src/test/scala/tutorial_3/DeviceGroup.scala @@ -3,7 +3,7 @@ */ package tutorial_3 -import akka.actor.{ Actor, ActorLogging, ActorRef, Cancellable, Props, Stash, Terminated } +import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated } import tutorial_3.DeviceGroup._ import tutorial_3.DeviceManager.RequestTrackDevice import scala.concurrent.duration._ @@ -12,125 +12,64 @@ object DeviceGroup { def props(groupId: String): Props = Props(new DeviceGroup(groupId)) - case class RequestDeviceList(requestId: Long) - case class ReplyDeviceList(requestId: Long, ids: Set[String]) + final case class RequestDeviceList(requestId: Long) + final case class ReplyDeviceList(requestId: Long, ids: Set[String]) - case class RequestAllTemperatures(requestId: Long) - case class RespondAllTemperatures(requestId: Long, temperatures: Map[String, Option[Double]]) + final case class RequestAllTemperatures(requestId: Long) + final case class RespondAllTemperatures(requestId: Long, temperatures: Map[String, TemperatureReading]) - case class CollectionTimeout(requestId: Long) + sealed trait TemperatureReading + final case class Temperature(value: Double) extends TemperatureReading + case object TemperatureNotAvailable extends TemperatureReading + case object DeviceNotAvailable extends TemperatureReading + case object DeviceTimedOut extends TemperatureReading } -class DeviceGroup(groupId: String) extends Actor with ActorLogging with Stash { +class DeviceGroup(groupId: String) extends Actor with ActorLogging { var deviceIdToActor = Map.empty[String, ActorRef] var actorToDeviceId = Map.empty[ActorRef, String] var nextCollectionId = 0L - override def preStart(): Unit = log.info(s"DeviceGroup $groupId started") + override def preStart(): Unit = log.info("DeviceGroup {} started", groupId) - override def postStop(): Unit = log.info(s"DeviceGroup $groupId stopped") + override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId) - override def receive: Receive = waitingForRequest orElse generalManagement - - def waitingForRequest: Receive = { - case RequestAllTemperatures(requestId) => - import context.dispatcher - - val collectionId = nextCollectionId - val requester = sender() - nextCollectionId += 1 - val answersSoFar = deviceIdToActor.mapValues(_ => None) - context.children.foreach(_ ! Device.ReadTemperature(collectionId)) - val collectionTimeoutTimer = context.system.scheduler.scheduleOnce(3.seconds, self, CollectionTimeout(collectionId)) - - context.become( - collectResults( - collectionTimeoutTimer, - collectionId, - requester, - requestId, - answersSoFar.size, - answersSoFar) orElse generalManagement, - discardOld = false - ) - } - - def generalManagement: Receive = { + override def receive: Receive = { // Note the backticks case trackMsg @ RequestTrackDevice(`groupId`, _) => - handleTrackMessage(trackMsg) + deviceIdToActor.get(trackMsg.deviceId) match { + case Some(ref) => + ref forward trackMsg + case None => + log.info("Creating device actor for {}", trackMsg.deviceId) + val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId) + context.watch(deviceActor) + deviceActor forward trackMsg + deviceIdToActor += trackMsg.deviceId -> deviceActor + actorToDeviceId += deviceActor -> trackMsg.deviceId + } case RequestTrackDevice(groupId, deviceId) => - log.warning(s"Ignoring TrackDevice request for $groupId. This actor is responsible for ${this.groupId}.") + log.warning( + "Ignoring TrackDevice request for {}. This actor is responsible for {}.", + groupId, this.groupId) case RequestDeviceList(requestId) => sender() ! ReplyDeviceList(requestId, deviceIdToActor.keySet) case Terminated(deviceActor) => - removeDeviceActor(deviceActor) - } - - def handleTrackMessage(trackMsg: RequestTrackDevice): Unit = { - deviceIdToActor.get(trackMsg.deviceId) match { - case Some(ref) => - ref forward trackMsg - case None => - log.info(s"Creating device actor for ${trackMsg.deviceId}") - val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId) - context.watch(deviceActor) - deviceActor forward trackMsg - deviceIdToActor += trackMsg.deviceId -> deviceActor - actorToDeviceId += deviceActor -> trackMsg.deviceId - } - } - - def removeDeviceActor(deviceActor: ActorRef): Unit = { - val deviceId = actorToDeviceId(deviceActor) - log.info(s"Device actor for $deviceId has been terminated") - actorToDeviceId -= deviceActor - deviceIdToActor -= deviceId - } - - def collectResults( - timer: Cancellable, - expectedId: Long, - requester: ActorRef, - requestId: Long, - waiting: Int, - answersSoFar: Map[String, Option[Double]] - ): Receive = { - - case Device.RespondTemperature(`expectedId`, temperatureOption) => - val deviceActor = sender() val deviceId = actorToDeviceId(deviceActor) - val newAnswers = answersSoFar + (deviceId -> temperatureOption) + log.info("Device actor for {} has been terminated", deviceId) + actorToDeviceId -= deviceActor + deviceIdToActor -= deviceId - if (waiting == 1) { - requester ! RespondAllTemperatures(requestId, newAnswers) - finishCollection(timer) - } else { - context.become(collectResults(timer, expectedId, requester, requestId, waiting - 1, newAnswers)) - } - - case Terminated(deviceActor) => - val deviceId = actorToDeviceId(deviceActor) - removeDeviceActor(deviceActor) - val newAnswers = answersSoFar + (deviceId -> None) - - if (waiting == 1) { - requester ! RespondAllTemperatures(requestId, newAnswers) - finishCollection(timer: Cancellable) - } else { - context.become(collectResults(timer, expectedId, requester, requestId, waiting - 1, newAnswers)) - } - - case CollectionTimeout(`expectedId`) => - requester ! RespondAllTemperatures(requestId, answersSoFar) - } - - def finishCollection(timer: Cancellable): Unit = { - context.unbecome() - timer.cancel() + case RequestAllTemperatures(requestId) => + context.actorOf(DeviceGroupQuery.props( + actorToDeviceId = actorToDeviceId, + requestId = requestId, + requester = sender(), + 3.seconds + )) } } diff --git a/akka-docs-new/src/test/scala/tutorial_3/DeviceGroupQuery.scala b/akka-docs-new/src/test/scala/tutorial_3/DeviceGroupQuery.scala new file mode 100644 index 0000000000..f8c145b4ea --- /dev/null +++ b/akka-docs-new/src/test/scala/tutorial_3/DeviceGroupQuery.scala @@ -0,0 +1,98 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package tutorial_3 + +import akka.actor.Actor.Receive +import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated } + +import scala.concurrent.duration._ + +object DeviceGroupQuery { + + case object CollectionTimeout + + def props( + actorToDeviceId: Map[ActorRef, String], + requestId: Long, + requester: ActorRef, + timeout: FiniteDuration + ): Props = { + Props(new DeviceGroupQuery(actorToDeviceId, requestId, requester, timeout)) + } +} + +class DeviceGroupQuery( + actorToDeviceId: Map[ActorRef, String], + requestId: Long, + requester: ActorRef, + timeout: FiniteDuration +) extends Actor with ActorLogging { + import DeviceGroupQuery._ + import context.dispatcher + val queryTimeoutTimer = context.system.scheduler.scheduleOnce(timeout, self, CollectionTimeout) + + override def preStart(): Unit = { + actorToDeviceId.keysIterator.foreach { deviceActor => + context.watch(deviceActor) + deviceActor ! Device.ReadTemperature(0) + } + + } + + override def postStop(): Unit = { + queryTimeoutTimer.cancel() + } + + override def receive: Receive = + waitingForReplies( + Map.empty, + actorToDeviceId.keySet + ) + + def waitingForReplies( + repliesSoFar: Map[String, DeviceGroup.TemperatureReading], + stillWaiting: Set[ActorRef] + ): Receive = { + case Device.RespondTemperature(0, valueOption) => + val deviceActor = sender() + val reading = valueOption match { + case Some(value) => DeviceGroup.Temperature(value) + case None => DeviceGroup.TemperatureNotAvailable + } + receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar) + + case Terminated(deviceActor) => + if (stillWaiting.contains(deviceActor)) + receivedResponse(deviceActor, DeviceGroup.DeviceNotAvailable, stillWaiting, repliesSoFar) + // else ignore + + case CollectionTimeout => + val timedOutReplies = + stillWaiting.map { deviceActor => + val deviceId = actorToDeviceId(deviceActor) + deviceId -> DeviceGroup.DeviceTimedOut + } + requester ! DeviceGroup.RespondAllTemperatures(requestId, repliesSoFar ++ timedOutReplies) + context.stop(self) + } + + def receivedResponse( + deviceActor: ActorRef, + reading: DeviceGroup.TemperatureReading, + stillWaiting: Set[ActorRef], + repliesSoFar: Map[String, DeviceGroup.TemperatureReading] + ): Unit = { + val deviceId = actorToDeviceId(deviceActor) + val newStillWaiting = stillWaiting - deviceActor + + val newRepliesSoFar = repliesSoFar + (deviceId -> reading) + if (newStillWaiting.isEmpty) { + requester ! DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar) + context.stop(self) + } else { + context.become(waitingForReplies(newRepliesSoFar, newStillWaiting)) + } + } + +} diff --git a/akka-docs-new/src/test/scala/tutorial_3/DeviceGroupQuerySpec.scala b/akka-docs-new/src/test/scala/tutorial_3/DeviceGroupQuerySpec.scala new file mode 100644 index 0000000000..c6efd2f576 --- /dev/null +++ b/akka-docs-new/src/test/scala/tutorial_3/DeviceGroupQuerySpec.scala @@ -0,0 +1,157 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package tutorial_3 + +import akka.actor.PoisonPill +import akka.testkit.{ AkkaSpec, TestProbe } + +import scala.concurrent.duration._ + +class DeviceGroupQuerySpec extends AkkaSpec { + + "DeviceGroupQuery" must { + + "return temperature value for working devices" in { + val requester = TestProbe() + + val device1 = TestProbe() + val device2 = TestProbe() + + val queryActor = system.actorOf(DeviceGroupQuery.props( + actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"), + requestId = 1, + requester = requester.ref, + timeout = 3.seconds + )) + + device1.expectMsg(Device.ReadTemperature(requestId = 0)) + device2.expectMsg(Device.ReadTemperature(requestId = 0)) + + queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref) + queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref) + + requester.expectMsg(DeviceGroup.RespondAllTemperatures( + requestId = 1, + temperatures = Map( + "device1" -> DeviceGroup.Temperature(1.0), + "device2" -> DeviceGroup.Temperature(2.0) + ) + )) + } + + "return TemperatureNotAvailable for devices with no readings" in { + val requester = TestProbe() + + val device1 = TestProbe() + val device2 = TestProbe() + + val queryActor = system.actorOf(DeviceGroupQuery.props( + actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"), + requestId = 1, + requester = requester.ref, + timeout = 3.seconds + )) + + device1.expectMsg(Device.ReadTemperature(requestId = 0)) + device2.expectMsg(Device.ReadTemperature(requestId = 0)) + + queryActor.tell(Device.RespondTemperature(requestId = 0, None), device1.ref) + queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref) + + requester.expectMsg(DeviceGroup.RespondAllTemperatures( + requestId = 1, + temperatures = Map( + "device1" -> DeviceGroup.TemperatureNotAvailable, + "device2" -> DeviceGroup.Temperature(2.0) + ) + )) + } + + "return DeviceNotAvailable if device stops before answering" in { + val requester = TestProbe() + + val device1 = TestProbe() + val device2 = TestProbe() + + val queryActor = system.actorOf(DeviceGroupQuery.props( + actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"), + requestId = 1, + requester = requester.ref, + timeout = 3.seconds + )) + + device1.expectMsg(Device.ReadTemperature(requestId = 0)) + device2.expectMsg(Device.ReadTemperature(requestId = 0)) + + queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref) + device2.ref ! PoisonPill + + requester.expectMsg(DeviceGroup.RespondAllTemperatures( + requestId = 1, + temperatures = Map( + "device1" -> DeviceGroup.Temperature(1.0), + "device2" -> DeviceGroup.DeviceNotAvailable + ) + )) + } + + "return temperature reading even if device stops after answering" in { + val requester = TestProbe() + + val device1 = TestProbe() + val device2 = TestProbe() + + val queryActor = system.actorOf(DeviceGroupQuery.props( + actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"), + requestId = 1, + requester = requester.ref, + timeout = 3.seconds + )) + + device1.expectMsg(Device.ReadTemperature(requestId = 0)) + device2.expectMsg(Device.ReadTemperature(requestId = 0)) + + queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref) + queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref) + device2.ref ! PoisonPill + + requester.expectMsg(DeviceGroup.RespondAllTemperatures( + requestId = 1, + temperatures = Map( + "device1" -> DeviceGroup.Temperature(1.0), + "device2" -> DeviceGroup.Temperature(2.0) + ) + )) + } + + "return DeviceTimedOut if device does not answer in time" in { + val requester = TestProbe() + + val device1 = TestProbe() + val device2 = TestProbe() + + val queryActor = system.actorOf(DeviceGroupQuery.props( + actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"), + requestId = 1, + requester = requester.ref, + timeout = 1.second + )) + + device1.expectMsg(Device.ReadTemperature(requestId = 0)) + device2.expectMsg(Device.ReadTemperature(requestId = 0)) + + queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref) + + requester.expectMsg(DeviceGroup.RespondAllTemperatures( + requestId = 1, + temperatures = Map( + "device1" -> DeviceGroup.Temperature(1.0), + "device2" -> DeviceGroup.DeviceTimedOut + ) + )) + } + + } + +} diff --git a/akka-docs-new/src/test/scala/tutorial_3/DeviceGroupSpec.scala b/akka-docs-new/src/test/scala/tutorial_3/DeviceGroupSpec.scala index d59f1468bc..698fb472a7 100644 --- a/akka-docs-new/src/test/scala/tutorial_3/DeviceGroupSpec.scala +++ b/akka-docs-new/src/test/scala/tutorial_3/DeviceGroupSpec.scala @@ -17,18 +17,18 @@ class DeviceGroupSpec extends AkkaSpec { val probe = TestProbe() val groupActor = system.actorOf(DeviceGroup.props("group")) - probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device1")) + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) probe.expectMsg(DeviceManager.DeviceRegistered) val deviceActor1 = probe.lastSender - probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device2")) + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref) probe.expectMsg(DeviceManager.DeviceRegistered) val deviceActor2 = probe.lastSender // Check that the device actors are working - probe.send(deviceActor1, Device.RecordTemperature(requestId = 0, 1.0)) + deviceActor1.tell(Device.RecordTemperature(requestId = 0, 1.0), probe.ref) probe.expectMsg(Device.TemperatureRecorded(requestId = 0)) - probe.send(deviceActor2, Device.RecordTemperature(requestId = 1, 2.0)) + deviceActor2.tell(Device.RecordTemperature(requestId = 1, 2.0), probe.ref) probe.expectMsg(Device.TemperatureRecorded(requestId = 1)) } @@ -36,7 +36,7 @@ class DeviceGroupSpec extends AkkaSpec { val probe = TestProbe() val groupActor = system.actorOf(DeviceGroup.props("group")) - probe.send(groupActor, DeviceManager.RequestTrackDevice("wrongGroup", "device1")) + groupActor.tell(DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.ref) probe.expectNoMsg(500.milliseconds) } @@ -44,11 +44,11 @@ class DeviceGroupSpec extends AkkaSpec { val probe = TestProbe() val groupActor = system.actorOf(DeviceGroup.props("group")) - probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device1")) + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) probe.expectMsg(DeviceManager.DeviceRegistered) val deviceActor1 = probe.lastSender - probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device1")) + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) probe.expectMsg(DeviceManager.DeviceRegistered) val deviceActor2 = probe.lastSender @@ -59,13 +59,13 @@ class DeviceGroupSpec extends AkkaSpec { val probe = TestProbe() val groupActor = system.actorOf(DeviceGroup.props("group")) - probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device1")) + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) probe.expectMsg(DeviceManager.DeviceRegistered) - probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device2")) + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref) probe.expectMsg(DeviceManager.DeviceRegistered) - probe.send(groupActor, DeviceGroup.RequestDeviceList(requestId = 0)) + groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 0), probe.ref) probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2"))) } @@ -73,55 +73,55 @@ class DeviceGroupSpec extends AkkaSpec { val probe = TestProbe() val groupActor = system.actorOf(DeviceGroup.props("group")) - probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device1")) + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) probe.expectMsg(DeviceManager.DeviceRegistered) val toShutDown = probe.lastSender - probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device2")) + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref) probe.expectMsg(DeviceManager.DeviceRegistered) - probe.send(groupActor, DeviceGroup.RequestDeviceList(requestId = 0)) + groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 0), probe.ref) probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2"))) probe.watch(toShutDown) toShutDown ! PoisonPill probe.expectTerminated(toShutDown) - probe.send(groupActor, DeviceGroup.RequestDeviceList(requestId = 0)) - probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device2"))) + groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 1), probe.ref) + probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 1, Set("device2"))) } "be able to collect temperatures from all active devices" in { val probe = TestProbe() val groupActor = system.actorOf(DeviceGroup.props("group")) - probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device1")) + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) probe.expectMsg(DeviceManager.DeviceRegistered) val deviceActor1 = probe.lastSender - probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device2")) + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref) probe.expectMsg(DeviceManager.DeviceRegistered) val deviceActor2 = probe.lastSender - probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device3")) + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device3"), probe.ref) probe.expectMsg(DeviceManager.DeviceRegistered) val deviceActor3 = probe.lastSender // Check that the device actors are working - probe.send(deviceActor1, Device.RecordTemperature(requestId = 0, 1.0)) + deviceActor1.tell(Device.RecordTemperature(requestId = 0, 1.0), probe.ref) probe.expectMsg(Device.TemperatureRecorded(requestId = 0)) - probe.send(deviceActor2, Device.RecordTemperature(requestId = 1, 2.0)) + deviceActor2.tell(Device.RecordTemperature(requestId = 1, 2.0), probe.ref) probe.expectMsg(Device.TemperatureRecorded(requestId = 1)) // No temperature for device3 - probe.send(groupActor, DeviceGroup.RequestAllTemperatures(requestId = 0)) + groupActor.tell(DeviceGroup.RequestAllTemperatures(requestId = 0), probe.ref) probe.expectMsg( DeviceGroup.RespondAllTemperatures( requestId = 0, temperatures = Map( - "device1" -> Some(1.0), - "device2" -> Some(2.0), - "device3" -> None + "device1" -> DeviceGroup.Temperature(1.0), + "device2" -> DeviceGroup.Temperature(2.0), + "device3" -> DeviceGroup.TemperatureNotAvailable ) ) ) diff --git a/akka-docs-new/src/test/scala/tutorial_3/DeviceManager.scala b/akka-docs-new/src/test/scala/tutorial_3/DeviceManager.scala index 3abe30b4af..0f15f7769f 100644 --- a/akka-docs-new/src/test/scala/tutorial_3/DeviceManager.scala +++ b/akka-docs-new/src/test/scala/tutorial_3/DeviceManager.scala @@ -10,7 +10,7 @@ import tutorial_3.DeviceManager.RequestTrackDevice object DeviceManager { def props(): Props = Props(new DeviceManager) - case class RequestTrackDevice(groupId: String, deviceId: String) + final case class RequestTrackDevice(groupId: String, deviceId: String) case object DeviceRegistered } @@ -28,7 +28,7 @@ class DeviceManager extends Actor with ActorLogging { case Some(ref) => ref forward trackMsg case None => - log.info(s"Creating device group actor for $groupId") + log.info("Creating device group actor for {}", groupId) val groupActor = context.actorOf(DeviceGroup.props(groupId), "group-" + groupId) context.watch(groupActor) groupActor forward trackMsg @@ -38,7 +38,7 @@ class DeviceManager extends Actor with ActorLogging { case Terminated(groupActor) => val groupId = actorToGroupId(groupActor) - log.info(s"Device group actor for $groupId has been terminated") + log.info("Device group actor for {} has been terminated", groupId) actorToGroupId -= groupActor groupIdToActor -= groupId diff --git a/akka-docs-new/src/test/scala/tutorial_3/DeviceSpec.scala b/akka-docs-new/src/test/scala/tutorial_3/DeviceSpec.scala index 5bb79f6b02..8341d54d98 100644 --- a/akka-docs-new/src/test/scala/tutorial_3/DeviceSpec.scala +++ b/akka-docs-new/src/test/scala/tutorial_3/DeviceSpec.scala @@ -15,7 +15,7 @@ class DeviceSpec extends AkkaSpec { val probe = TestProbe() val deviceActor = system.actorOf(Device.props("group", "device")) - probe.send(deviceActor, DeviceManager.RequestTrackDevice("group", "device")) + deviceActor.tell(DeviceManager.RequestTrackDevice("group", "device"), probe.ref) probe.expectMsg(DeviceManager.DeviceRegistered) probe.lastSender should ===(deviceActor) } @@ -24,10 +24,10 @@ class DeviceSpec extends AkkaSpec { val probe = TestProbe() val deviceActor = system.actorOf(Device.props("group", "device")) - probe.send(deviceActor, DeviceManager.RequestTrackDevice("wrongGroup", "device")) + deviceActor.tell(DeviceManager.RequestTrackDevice("wrongGroup", "device"), probe.ref) probe.expectNoMsg(500.milliseconds) - probe.send(deviceActor, DeviceManager.RequestTrackDevice("group", "wrongDevice")) + deviceActor.tell(DeviceManager.RequestTrackDevice("group", "Wrongdevice"), probe.ref) probe.expectNoMsg(500.milliseconds) } @@ -35,7 +35,7 @@ class DeviceSpec extends AkkaSpec { val probe = TestProbe() val deviceActor = system.actorOf(Device.props("group", "device")) - probe.send(deviceActor, Device.ReadTemperature(requestId = 42)) + deviceActor.tell(Device.ReadTemperature(requestId = 42), probe.ref) val response = probe.expectMsgType[Device.RespondTemperature] response.requestId should ===(42) response.value should ===(None) @@ -45,18 +45,18 @@ class DeviceSpec extends AkkaSpec { val probe = TestProbe() val deviceActor = system.actorOf(Device.props("group", "device")) - probe.send(deviceActor, Device.RecordTemperature(requestId = 1, 24.0)) + deviceActor.tell(Device.RecordTemperature(requestId = 1, 24.0), probe.ref) probe.expectMsg(Device.TemperatureRecorded(requestId = 1)) - probe.send(deviceActor, Device.ReadTemperature(requestId = 2)) + deviceActor.tell(Device.ReadTemperature(requestId = 2), probe.ref) val response1 = probe.expectMsgType[Device.RespondTemperature] response1.requestId should ===(2) response1.value should ===(Some(24.0)) - probe.send(deviceActor, Device.RecordTemperature(requestId = 3, 55.0)) + deviceActor.tell(Device.RecordTemperature(requestId = 3, 55.0), probe.ref) probe.expectMsg(Device.TemperatureRecorded(requestId = 3)) - probe.send(deviceActor, Device.ReadTemperature(requestId = 4)) + deviceActor.tell(Device.ReadTemperature(requestId = 4), probe.ref) val response2 = probe.expectMsgType[Device.RespondTemperature] response2.requestId should ===(4) response2.value should ===(Some(55.0))