diff --git a/akka-docs-new/src/main/paradox/guide/index.md b/akka-docs-new/src/main/paradox/guide/index.md index 7bfbb36115..30d3807c4c 100644 --- a/akka-docs-new/src/main/paradox/guide/index.md +++ b/akka-docs-new/src/main/paradox/guide/index.md @@ -9,5 +9,6 @@ * [Your second Akka application, part 1: Top-level architecture](tutorial_1.md) * [Your second Akka application, part 2: The Device actor](tutorial_2.md) * [Your second Akka application, part 3: Device groups](tutorial_3.md) + * [Your second Akka application, part 4: Querying a group of devices](tutorial_4.md) @@@ \ No newline at end of file diff --git a/akka-docs-new/src/main/paradox/guide/tutorial_4.md b/akka-docs-new/src/main/paradox/guide/tutorial_4.md new file mode 100644 index 0000000000..04064e5b75 --- /dev/null +++ b/akka-docs-new/src/main/paradox/guide/tutorial_4.md @@ -0,0 +1,209 @@ +# Your second Akka application, part 4: Querying a group of devices + +The conversational patterns we have seen so far were simple in the sense that they required no or little state to be kept in the +actor that is only relevant to the conversation. Our device actors either simply returned a reading, which required no +state change, recorded a temperature, which was required an update of a single field, or in the most complex case, +managing groups and devices, we had to add or remove simple entries from a `Map`. + +In this chapter we will see a more complex example. Our goal is to add a new service to the group actor, one which +allows querying the temperature from all running devices. We need to first investigate how we want our query API to +behave. + +The very first problem we face is that the set of devices is dynamic, and each device is represented by an actor that +can stop at any time. At the beginning of the query, we need to ask all of the device actors for the current temperature +that we know about. However, during the lifecycle of the query + + * a device actor may stop and not respond with a temperature reading + * a new device actor might start up, but we missed asking it for the current temperature + +There are many approaches that can be taken to address these issues, but the important point is to settle on what is +the desired behavior. We will pick the following two guarantees: + + * when a query arrives to the group, the group actor takes a _snaphsot_ of the existing device actors and will only + ask those for the temperature. Actors that are started _after_ the arrival of the query are simply ignored. + * when an actor stops during the query without answering (i.e. before all the actors we asked for the temperature + responded) we simply report back the fact to the sender of the query message + +Apart from device actors coming and going dynamically some actors might take a long time to answer, for example because +they are stuck in an accidental infinite loop, or because they failed due to a bug and dropped our request. Ideally, +we would like to give a deadline to our query: + + * the query is considered completed if either all actors have responded (or confirmed being stopped), or we reach + the deadline + +Given these decisions, and the fact that a device might not have a temperature to record, we can define four states +that each device can be in, according to the query: + + * It has a temperature available: `Temperature(value)` + * It has responded, but has no temperature available yet: `TemperatureNotAvailable` + * It has stopped before answering: `DeviceNotAvailable` + * It did not respond before the deadline: `DeviceTimedOut` + +Summarizing these in message types we can add the following to `DeviceGroup`: + +@@snip [Hello.scala](../../../test/scala/tutorial_4/DeviceGroup.scala) { #query-protocol } + +## Implementing the query + +One of the approaches could be for implementing the query is to add more code to the group actor. While this is +possible, in practice this can be very cumbersome and error prone. When we start a query, we need to take a snapshot +of the devices present at the start of the query and start a timer so that we can enforce the deadline. Unfortunately, +during the time we execute a query _another query_ might just arrive. For this other query of course we need to keep +track of the exact same information, but isolated from the previous query. This complicates the code and also poses +some problems. For example, we would need a data structure that maps the `ActorRef`s of the devices to the queries +that use that device, so that they can be notified when such a device terminates, i.e. a `Terminated` message is +received. + +There is a much simpler approach that is superior in every way, and it is the one we will implement. We will create +an actor that represents a _single query_ and which performs the tasks needed to complete the query in behalf of the +group actor. So far we have created actors that belonged to classical domain objects, but now, we will create an +actor that represents a process or task rather than an entity. This move keeps our group actor simple and gives +us better ways to test the query capability in isolation. + +First, we need to design the lifecycle of our query actors. This consists of identifying its initial state, then +the first action to be taken by the actor, then, the cleanup if necessary. There are a few things the query should +need to be able to work: + + * The snapshot of active device actors to query, and their IDs + * The requestID of the request that started the query (so we can include it in the reply) + * The `ActorRef` of the actor who sent the group actor the query. We will send the reply to this actor directly. + * A timeout parameter, how long the query should wait for replies. Keeping this as a parameter will simplify testing. + +Since we need to have a deadline until we are willing to wait for responses, we will need a new feature that we have +not used yet: timers. Akka has a built-in scheduler facility for this exact purpose. Using it is simple, the +`scheduler.scheduleOnce(time, actorRef, message)` method will schedule the message `message` into the future by the +specified `time` and send it to the actor `actorRef`. To implement our query timeout we need to create the message +that represents that the deadline is due, we create a simple message `CollectionTimeout` without any parameters for +this purpose. The return value from `scheduleOnce` is a `Cancellable` which will be useful to cancel the timer +if the query finishes successfully in time. Getting the scheduler is possible from the `ActorSystem`, which, in turn, +is accessible from the actor's context: `context.system.scheduler`. This needs an implicit `ExecutionContext` which +is basically the thread-pool that will execute the timer task itself. In our case we can just use the default dispatcher +which can be brought into scope by `import context.dispatcher`. + +At the start of the query, we need to ask each of the device actors for the current temperature. To be able to quickly +detect devices that stopped before they got the `ReadTemperature` message we will also watch each of the actors. This +way, we get `Terminated` messages for those that stop during the lifetime of the query, so we don't need to wait +until the timeout to mark these as not available. + +Putting together all these, the outline of our actor looks like this: + +@@snip [Hello.scala](../../../test/scala/tutorial_4/DeviceGroupQuery.scala) { #query-outline } + +The query (apart from the pending timer) has one stateful aspect about it: the actors that did not answer so far (or, +looking from the other way around, the set of actors that have replied or stopped). One way to track this state is +to create a mutable field in the actor (a `var`). There is another approach. It is also possible to change how +the actor responds to messages. By default, the `receive` block defines the behavior of the actor, but it is possible +to change it (even several times) during the life of the actor. This is possible by calling `context.become(newBehavior)` +where `newBehavior` is anything with type `Receive` (which is just a shorthand for `PartialFunction[Any, Unit]`). A +`Receive` is just a function (or an object, if you like) it can be returned from a function. We will leverage this +to track the state of our actor. + +As the first step, instead of defining `receive` directly, we delegate to another function to create the `Receive`, which +we will call `waitingForReplies`. This will keep track of two changing values, a `Map` of already received replies +and a `Set` of actors that we still wait on. We have three events that we should act on. We can receive a +`RespondTemperature` message from one of the devices. Second, we can receive a `Terminated` message for a device actor +that has been stopped in the meantime. Finally, we can reach the deadline and receive a `CollectionTimeout`. In the +first two cases, we need to keep track of the replies, which we now simply delegate to a method `receivedResponse` which +we will discuss later. In the case of timeout, we need to simply take all the actors that has not yet replied yet +(the members of the set `stillWaiting`) and put a `DeviceTimedOut` as the status in the final reply. Then we +reply to the submitter of the query with the collected results and stop the query actor: + +@@snip [Hello.scala](../../../test/scala/tutorial_4/DeviceGroupQuery.scala) { #query-state } + +What is not yet clear, how we will "mutate" the `answersSoFar` and `stillWaiting` data structures. One important +thing to note is that the function `waitingForReplies` **does not handle the messages directly. It returns a `Receive` +function that will handle the messages**. This means that if we call `waitingForReplies` again, with different parameters, +then it returns a brand new `Receive` that will use those new parameters. We have seen how we +can install the initial `Receive` by simply returning it from `receive`. In order to install a new one, to record a +new reply for example, we need some mechanism. This mechanism is the method `context.becdome(newReceive)` which will +_change_ the actor's message handling function to the provided `newReceive` function. You can imagine that before +starting, your actor automatically calls `context.become(receive)`, i.e. installing the `Receive` function that +is returned from `receive`. This is another important observation: **it is not `receive` that handles the messages, +it just returns a `Receive` function that will actually handle the messages***. + +We now have to figure out what to do in `receivedResponse()`. First, we need to record the new result in the map +`repliesSoFar` and remove the actor from `stillWaiting`. Then, we need to check if there is any remaining actors +that we are waiting for. If there is none, we can send the result of the query to the original requester, and stop +the query actor. Otherwise, we need to update the `repliesSoFar` and `stillWaiting` structures and wait for more +messages. + +In the code before, we treated `Terminated` as the implicit response `DeviceNotAvailable`, so `receivedResponse` does +not need to do anything special. There is one small task we still need to do. It is possible that we receive a proper +response from a device actor, but then it stops during the lifetime of the query. We don't want this second event +overwrite the already received reply. In other words, we don't want to receive `Terminated` after we recorded the +response. This is simple to achieve by calling `context.unwatch(ref)`. This method also ensures that we don't +receive `Terminated` events that are already in the mailbox of the actor. It is also safe to call this multiple times, +only the first call will have any effect, the rest is simply ignored. + +With all this knowledge, we can create the `receivedResponse` method: + +@@snip [Hello.scala](../../../test/scala/tutorial_4/DeviceGroupQuery.scala) { #query-collect-reply } + +It is quite natural to ask at this point, what have we gained by using the `context.become()` trick instead of +just making the `repliesSoFar` and `stillWaiting` structures mutable fields of the actor (i.e. `var`s)? In this +simple example, not that much. The value of this style of state keeping becomes more evident when you suddenly have +_more kinds_ of states. For example imagine that the query have multiple phases that come each other. Since each phase +might have temporary data that is relevant only to that phase, keeping these as fields would pollute the global state +of the actor where it is not clear which field is used or ignored in which state. Using parametrized `Receive` "factory" +methods we can keep data that is only relevant to the state private to the state. It is still a good exercise to +rewrite the query using `var`s instead of `context.become()`. In general, it is a good practice to get comfortable +with the solution we have used here as it helps structuring more complex actor in a cleaner and more maintainable way. + +Or query actor is now done: + +@@snip [Hello.scala](../../../test/scala/tutorial_4/DeviceGroupQuery.scala) { #query-full } + +## Testing + +It is time to test if the query actor is correct. There are various scenarios we need to test individually to make +sure everything works as expected. To be able to do this, we need to simulate the device actors somehow to exercise +various normal or failure scenarios. Thankfully we took the list of collaborators (actually a `Map`) as a parameter +to the query actor, so we can easily pass in `TestProbe` references. In our first test, we try out the case when +there are two devices and both report a temperature: + +@@snip [Hello.scala](../../../test/scala/tutorial_4/DeviceGroupQuerySpec.scala) { #query-test-normal } + +That was the happy case, but we know that sometimes devices cannot provide a temperature measurement yet. This +scenario is just slightly different from the previous: + +@@snip [Hello.scala](../../../test/scala/tutorial_4/DeviceGroupQuerySpec.scala) { #query-test-no-reading } + +We also know, that sometimes device actors stop before answering: + +@@snip [Hello.scala](../../../test/scala/tutorial_4/DeviceGroupQuerySpec.scala) { #query-test-stopped } + +If you remember, there is another case related to device actors stopping. It is possible that we get a normal reply +from a device actor, but then receive a `Terminated` for the same actor later. In this case we would like to keep +the first reply and not mark the device as `DeviceNotAvailable`. We should test this, too: + +@@snip [Hello.scala](../../../test/scala/tutorial_4/DeviceGroupQuerySpec.scala) { #query-test-stopped-later } + +The final case is when not all devices respond in time. To keep our test relatively fast, we will construct the +`DeviceGroupQuery` actor with a smaller timeout: + +@@snip [Hello.scala](../../../test/scala/tutorial_4/DeviceGroupQuerySpec.scala) { #query-test-timeout } + +Our query works as expected now, it is time to include this new functionality in the `DeviceGroup` actor now. + +## Adding the query capability to the group + +Including the query feature in the group actor is fairly simple now. We did all the heavylifting in the query actor +itself, the group actor only needs to create it with the right initial parameters and nothing else. + +@@snip [Hello.scala](../../../test/scala/tutorial_4/DeviceGroup.scala) { #query-added } + +It is probably worth to reiterate what we said in the beginning of the chapter. By keeping all the temporary state +that was only relevant to the query itself in a separate actor we kept the group actor very simple. It delegates +everything to child actors that needs temporary state not relevant to its main business. Also, multiple queries can +now run parallel to each other, as many as needed. In our case querying the individual device actors is fast, but +were this not the case, for example because the remote sensors need to be contacted over the network, this formulation +would significantly improve throughput. + +We close this chapter by testing that everything works together. This test is just a variant of the previous ones, +now exercising the group query feature: + +@@snip [Hello.scala](../../../test/scala/tutorial_4/DeviceGroupSpec.scala) { #group-query-integration-test } + + + + diff --git a/akka-docs-new/src/test/scala/tutorial_4/Device.scala b/akka-docs-new/src/test/scala/tutorial_4/Device.scala new file mode 100644 index 0000000000..a379a4a6d6 --- /dev/null +++ b/akka-docs-new/src/test/scala/tutorial_4/Device.scala @@ -0,0 +1,45 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package tutorial_4 + +import akka.actor.{ Actor, ActorLogging, Props } +import tutorial_4.Device.{ ReadTemperature, RecordTemperature, RespondTemperature, TemperatureRecorded } +import tutorial_4.DeviceManager.{ DeviceRegistered, RequestTrackDevice } + +object Device { + + def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId)) + + final case class RecordTemperature(requestId: Long, value: Double) + final case class TemperatureRecorded(requestId: Long) + + final case class ReadTemperature(requestId: Long) + final case class RespondTemperature(requestId: Long, value: Option[Double]) +} + +class Device(groupId: String, deviceId: String) extends Actor with ActorLogging { + var lastTemperatureReading: Option[Double] = None + + override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId) + + override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId) + + override def receive: Receive = { + case RequestTrackDevice(`groupId`, `deviceId`) => + sender() ! DeviceRegistered + + case RequestTrackDevice(groupId, deviceId) => + log.warning( + "Ignoring TrackDevice request for {}-{}.This actor is responsible for {}-{}.", + groupId, deviceId, this.groupId, this.deviceId) + + case RecordTemperature(id, value) => + log.info("Recorded temperature reading {} with {}", value, id) + lastTemperatureReading = Some(value) + sender() ! TemperatureRecorded(id) + + case ReadTemperature(id) => + sender() ! RespondTemperature(id, lastTemperatureReading) + } +} diff --git a/akka-docs-new/src/test/scala/tutorial_4/DeviceGroup.scala b/akka-docs-new/src/test/scala/tutorial_4/DeviceGroup.scala new file mode 100644 index 0000000000..b89b490f59 --- /dev/null +++ b/akka-docs-new/src/test/scala/tutorial_4/DeviceGroup.scala @@ -0,0 +1,85 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package tutorial_4 + +import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated } +import tutorial_4.DeviceGroup._ +import tutorial_4.DeviceManager.RequestTrackDevice + +import scala.concurrent.duration._ + +//#query-protocol +object DeviceGroup { + //#query-protocol + def props(groupId: String): Props = Props(new DeviceGroup(groupId)) + + final case class RequestDeviceList(requestId: Long) + final case class ReplyDeviceList(requestId: Long, ids: Set[String]) + + //#query-protocol + // ... earlier message types not shown + final case class RequestAllTemperatures(requestId: Long) + final case class RespondAllTemperatures(requestId: Long, temperatures: Map[String, TemperatureReading]) + + sealed trait TemperatureReading + final case class Temperature(value: Double) extends TemperatureReading + case object TemperatureNotAvailable extends TemperatureReading + case object DeviceNotAvailable extends TemperatureReading + case object DeviceTimedOut extends TemperatureReading +} +//#query-protocol + +//#query-added +class DeviceGroup(groupId: String) extends Actor with ActorLogging { + var deviceIdToActor = Map.empty[String, ActorRef] + var actorToDeviceId = Map.empty[ActorRef, String] + var nextCollectionId = 0L + + override def preStart(): Unit = log.info("DeviceGroup {} started", groupId) + + override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId) + + override def receive: Receive = { + //#query-added + case trackMsg @ RequestTrackDevice(`groupId`, _) => + deviceIdToActor.get(trackMsg.deviceId) match { + case Some(ref) => + ref forward trackMsg + case None => + log.info("Creating device actor for {}", trackMsg.deviceId) + val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId) + context.watch(deviceActor) + deviceActor forward trackMsg + deviceIdToActor += trackMsg.deviceId -> deviceActor + actorToDeviceId += deviceActor -> trackMsg.deviceId + } + + case RequestTrackDevice(groupId, deviceId) => + log.warning( + "Ignoring TrackDevice request for {}. This actor is responsible for {}.", + groupId, this.groupId) + + case RequestDeviceList(requestId) => + sender() ! ReplyDeviceList(requestId, deviceIdToActor.keySet) + + case Terminated(deviceActor) => + val deviceId = actorToDeviceId(deviceActor) + log.info("Device actor for {} has been terminated", deviceId) + actorToDeviceId -= deviceActor + deviceIdToActor -= deviceId + + //#query-added + // ... other cases omitted + + case RequestAllTemperatures(requestId) => + context.actorOf(DeviceGroupQuery.props( + actorToDeviceId = actorToDeviceId, + requestId = requestId, + requester = sender(), + 3.seconds + )) + } + +} +//#query-added \ No newline at end of file diff --git a/akka-docs-new/src/test/scala/tutorial_4/DeviceGroupQuery.scala b/akka-docs-new/src/test/scala/tutorial_4/DeviceGroupQuery.scala new file mode 100644 index 0000000000..508ab8c692 --- /dev/null +++ b/akka-docs-new/src/test/scala/tutorial_4/DeviceGroupQuery.scala @@ -0,0 +1,105 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package tutorial_4 + +import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated } + +import scala.concurrent.duration._ + +//#query-full +//#query-outline +object DeviceGroupQuery { + case object CollectionTimeout + + def props( + actorToDeviceId: Map[ActorRef, String], + requestId: Long, + requester: ActorRef, + timeout: FiniteDuration + ): Props = { + Props(new DeviceGroupQuery(actorToDeviceId, requestId, requester, timeout)) + } +} + +class DeviceGroupQuery( + actorToDeviceId: Map[ActorRef, String], + requestId: Long, + requester: ActorRef, + timeout: FiniteDuration +) extends Actor with ActorLogging { + import DeviceGroupQuery._ + import context.dispatcher + val queryTimeoutTimer = context.system.scheduler.scheduleOnce(timeout, self, CollectionTimeout) + + override def preStart(): Unit = { + actorToDeviceId.keysIterator.foreach { deviceActor => + context.watch(deviceActor) + deviceActor ! Device.ReadTemperature(0) + } + + } + + override def postStop(): Unit = { + queryTimeoutTimer.cancel() + } + + //#query-outline + //#query-state + override def receive: Receive = + waitingForReplies( + Map.empty, + actorToDeviceId.keySet + ) + + def waitingForReplies( + repliesSoFar: Map[String, DeviceGroup.TemperatureReading], + stillWaiting: Set[ActorRef] + ): Receive = { + case Device.RespondTemperature(0, valueOption) => + val deviceActor = sender() + val reading = valueOption match { + case Some(value) => DeviceGroup.Temperature(value) + case None => DeviceGroup.TemperatureNotAvailable + } + receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar) + + case Terminated(deviceActor) => + receivedResponse(deviceActor, DeviceGroup.DeviceNotAvailable, stillWaiting, repliesSoFar) + + case CollectionTimeout => + val timedOutReplies = + stillWaiting.map { deviceActor => + val deviceId = actorToDeviceId(deviceActor) + deviceId -> DeviceGroup.DeviceTimedOut + } + requester ! DeviceGroup.RespondAllTemperatures(requestId, repliesSoFar ++ timedOutReplies) + context.stop(self) + } + //#query-state + + //#query-collect-reply + def receivedResponse( + deviceActor: ActorRef, + reading: DeviceGroup.TemperatureReading, + stillWaiting: Set[ActorRef], + repliesSoFar: Map[String, DeviceGroup.TemperatureReading] + ): Unit = { + context.unwatch(deviceActor) + val deviceId = actorToDeviceId(deviceActor) + val newStillWaiting = stillWaiting - deviceActor + + val newRepliesSoFar = repliesSoFar + (deviceId -> reading) + if (newStillWaiting.isEmpty) { + requester ! DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar) + context.stop(self) + } else { + context.become(waitingForReplies(newRepliesSoFar, newStillWaiting)) + } + } + //#query-collect-reply + + //#query-outline +} +//#query-outline +//#query-full \ No newline at end of file diff --git a/akka-docs-new/src/test/scala/tutorial_4/DeviceGroupQuerySpec.scala b/akka-docs-new/src/test/scala/tutorial_4/DeviceGroupQuerySpec.scala new file mode 100644 index 0000000000..2ee67d2fc5 --- /dev/null +++ b/akka-docs-new/src/test/scala/tutorial_4/DeviceGroupQuerySpec.scala @@ -0,0 +1,167 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package tutorial_4 + +import akka.actor.PoisonPill +import akka.testkit.{ AkkaSpec, TestProbe } + +import scala.concurrent.duration._ + +class DeviceGroupQuerySpec extends AkkaSpec { + + "DeviceGroupQuery" must { + + //#query-test-normal + "return temperature value for working devices" in { + val requester = TestProbe() + + val device1 = TestProbe() + val device2 = TestProbe() + + val queryActor = system.actorOf(DeviceGroupQuery.props( + actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"), + requestId = 1, + requester = requester.ref, + timeout = 3.seconds + )) + + device1.expectMsg(Device.ReadTemperature(requestId = 0)) + device2.expectMsg(Device.ReadTemperature(requestId = 0)) + + queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref) + queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref) + + requester.expectMsg(DeviceGroup.RespondAllTemperatures( + requestId = 1, + temperatures = Map( + "device1" -> DeviceGroup.Temperature(1.0), + "device2" -> DeviceGroup.Temperature(2.0) + ) + )) + } + //#query-test-normal + + //#query-test-no-reading + "return TemperatureNotAvailable for devices with no readings" in { + val requester = TestProbe() + + val device1 = TestProbe() + val device2 = TestProbe() + + val queryActor = system.actorOf(DeviceGroupQuery.props( + actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"), + requestId = 1, + requester = requester.ref, + timeout = 3.seconds + )) + + device1.expectMsg(Device.ReadTemperature(requestId = 0)) + device2.expectMsg(Device.ReadTemperature(requestId = 0)) + + queryActor.tell(Device.RespondTemperature(requestId = 0, None), device1.ref) + queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref) + + requester.expectMsg(DeviceGroup.RespondAllTemperatures( + requestId = 1, + temperatures = Map( + "device1" -> DeviceGroup.TemperatureNotAvailable, + "device2" -> DeviceGroup.Temperature(2.0) + ) + )) + } + //#query-test-no-reading + + //#query-test-stopped + "return DeviceNotAvailable if device stops before answering" in { + val requester = TestProbe() + + val device1 = TestProbe() + val device2 = TestProbe() + + val queryActor = system.actorOf(DeviceGroupQuery.props( + actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"), + requestId = 1, + requester = requester.ref, + timeout = 3.seconds + )) + + device1.expectMsg(Device.ReadTemperature(requestId = 0)) + device2.expectMsg(Device.ReadTemperature(requestId = 0)) + + queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref) + device2.ref ! PoisonPill + + requester.expectMsg(DeviceGroup.RespondAllTemperatures( + requestId = 1, + temperatures = Map( + "device1" -> DeviceGroup.Temperature(1.0), + "device2" -> DeviceGroup.DeviceNotAvailable + ) + )) + } + //#query-test-stopped + + //#query-test-stopped-later + "return temperature reading even if device stops after answering" in { + val requester = TestProbe() + + val device1 = TestProbe() + val device2 = TestProbe() + + val queryActor = system.actorOf(DeviceGroupQuery.props( + actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"), + requestId = 1, + requester = requester.ref, + timeout = 3.seconds + )) + + device1.expectMsg(Device.ReadTemperature(requestId = 0)) + device2.expectMsg(Device.ReadTemperature(requestId = 0)) + + queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref) + queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref) + device2.ref ! PoisonPill + + requester.expectMsg(DeviceGroup.RespondAllTemperatures( + requestId = 1, + temperatures = Map( + "device1" -> DeviceGroup.Temperature(1.0), + "device2" -> DeviceGroup.Temperature(2.0) + ) + )) + } + //#query-test-stopped-later + + //#query-test-timeout + "return DeviceTimedOut if device does not answer in time" in { + val requester = TestProbe() + + val device1 = TestProbe() + val device2 = TestProbe() + + val queryActor = system.actorOf(DeviceGroupQuery.props( + actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"), + requestId = 1, + requester = requester.ref, + timeout = 1.second + )) + + device1.expectMsg(Device.ReadTemperature(requestId = 0)) + device2.expectMsg(Device.ReadTemperature(requestId = 0)) + + queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref) + + requester.expectMsg(DeviceGroup.RespondAllTemperatures( + requestId = 1, + temperatures = Map( + "device1" -> DeviceGroup.Temperature(1.0), + "device2" -> DeviceGroup.DeviceTimedOut + ) + )) + } + //#query-test-timeout + + } + +} diff --git a/akka-docs-new/src/test/scala/tutorial_4/DeviceGroupSpec.scala b/akka-docs-new/src/test/scala/tutorial_4/DeviceGroupSpec.scala new file mode 100644 index 0000000000..316e372433 --- /dev/null +++ b/akka-docs-new/src/test/scala/tutorial_4/DeviceGroupSpec.scala @@ -0,0 +1,135 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ + +package tutorial_4 + +import akka.actor.PoisonPill +import akka.testkit.{ AkkaSpec, TestProbe } + +import scala.concurrent.duration._ + +class DeviceGroupSpec extends AkkaSpec { + + "DeviceGroup actor" must { + + "be able to register a device actor" in { + val probe = TestProbe() + val groupActor = system.actorOf(DeviceGroup.props("group")) + + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) + probe.expectMsg(DeviceManager.DeviceRegistered) + val deviceActor1 = probe.lastSender + + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref) + probe.expectMsg(DeviceManager.DeviceRegistered) + val deviceActor2 = probe.lastSender + deviceActor1 should !==(deviceActor2) + + // Check that the device actors are working + deviceActor1.tell(Device.RecordTemperature(requestId = 0, 1.0), probe.ref) + probe.expectMsg(Device.TemperatureRecorded(requestId = 0)) + deviceActor2.tell(Device.RecordTemperature(requestId = 1, 2.0), probe.ref) + probe.expectMsg(Device.TemperatureRecorded(requestId = 1)) + } + + "ignore requests for wrong groupId" in { + val probe = TestProbe() + val groupActor = system.actorOf(DeviceGroup.props("group")) + + groupActor.tell(DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.ref) + probe.expectNoMsg(500.milliseconds) + } + + "return same actor for same deviceId" in { + val probe = TestProbe() + val groupActor = system.actorOf(DeviceGroup.props("group")) + + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) + probe.expectMsg(DeviceManager.DeviceRegistered) + val deviceActor1 = probe.lastSender + + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) + probe.expectMsg(DeviceManager.DeviceRegistered) + val deviceActor2 = probe.lastSender + + deviceActor1 should ===(deviceActor2) + } + + "be able to list active devices" in { + val probe = TestProbe() + val groupActor = system.actorOf(DeviceGroup.props("group")) + + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) + probe.expectMsg(DeviceManager.DeviceRegistered) + + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref) + probe.expectMsg(DeviceManager.DeviceRegistered) + + groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 0), probe.ref) + probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2"))) + } + + "be able to list active devices after one shuts down" in { + val probe = TestProbe() + val groupActor = system.actorOf(DeviceGroup.props("group")) + + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) + probe.expectMsg(DeviceManager.DeviceRegistered) + val toShutDown = probe.lastSender + + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref) + probe.expectMsg(DeviceManager.DeviceRegistered) + + groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 0), probe.ref) + probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2"))) + + probe.watch(toShutDown) + toShutDown ! PoisonPill + probe.expectTerminated(toShutDown) + + groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 1), probe.ref) + probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 1, Set("device2"))) + } + + //#group-query-integration-test + "be able to collect temperatures from all active devices" in { + val probe = TestProbe() + val groupActor = system.actorOf(DeviceGroup.props("group")) + + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) + probe.expectMsg(DeviceManager.DeviceRegistered) + val deviceActor1 = probe.lastSender + + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref) + probe.expectMsg(DeviceManager.DeviceRegistered) + val deviceActor2 = probe.lastSender + + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device3"), probe.ref) + probe.expectMsg(DeviceManager.DeviceRegistered) + val deviceActor3 = probe.lastSender + + // Check that the device actors are working + deviceActor1.tell(Device.RecordTemperature(requestId = 0, 1.0), probe.ref) + probe.expectMsg(Device.TemperatureRecorded(requestId = 0)) + deviceActor2.tell(Device.RecordTemperature(requestId = 1, 2.0), probe.ref) + probe.expectMsg(Device.TemperatureRecorded(requestId = 1)) + // No temperature for device3 + + groupActor.tell(DeviceGroup.RequestAllTemperatures(requestId = 0), probe.ref) + probe.expectMsg( + DeviceGroup.RespondAllTemperatures( + requestId = 0, + temperatures = Map( + "device1" -> DeviceGroup.Temperature(1.0), + "device2" -> DeviceGroup.Temperature(2.0), + "device3" -> DeviceGroup.TemperatureNotAvailable + ) + ) + ) + } + //#group-query-integration-test + + } + +} diff --git a/akka-docs-new/src/test/scala/tutorial_4/DeviceManager.scala b/akka-docs-new/src/test/scala/tutorial_4/DeviceManager.scala new file mode 100644 index 0000000000..5d4caf16ae --- /dev/null +++ b/akka-docs-new/src/test/scala/tutorial_4/DeviceManager.scala @@ -0,0 +1,47 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ + +package tutorial_4 + +import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated } +import tutorial_4.DeviceManager.RequestTrackDevice + +object DeviceManager { + def props(): Props = Props(new DeviceManager) + + final case class RequestTrackDevice(groupId: String, deviceId: String) + case object DeviceRegistered +} + +class DeviceManager extends Actor with ActorLogging { + var groupIdToActor = Map.empty[String, ActorRef] + var actorToGroupId = Map.empty[ActorRef, String] + + override def preStart(): Unit = log.info("DeviceManager started") + + override def postStop(): Unit = log.info("DeviceManager stopped") + + override def receive = { + case trackMsg @ RequestTrackDevice(groupId, _) => + groupIdToActor.get(groupId) match { + case Some(ref) => + ref forward trackMsg + case None => + log.info("Creating device group actor for {}", groupId) + val groupActor = context.actorOf(DeviceGroup.props(groupId), "group-" + groupId) + context.watch(groupActor) + groupActor forward trackMsg + groupIdToActor += groupId -> groupActor + actorToGroupId += groupActor -> groupId + } + + case Terminated(groupActor) => + val groupId = actorToGroupId(groupActor) + log.info("Device group actor for {} has been terminated", groupId) + actorToGroupId -= groupActor + groupIdToActor -= groupId + + } + +}