Tutorial pt4

This commit is contained in:
Endre Sándor Varga 2017-03-31 11:09:48 +02:00
parent d90f73e45b
commit 5a0b7b2c61
8 changed files with 794 additions and 0 deletions

View file

@ -9,5 +9,6 @@
* [Your second Akka application, part 1: Top-level architecture](tutorial_1.md)
* [Your second Akka application, part 2: The Device actor](tutorial_2.md)
* [Your second Akka application, part 3: Device groups](tutorial_3.md)
* [Your second Akka application, part 4: Querying a group of devices](tutorial_4.md)
@@@

View file

@ -0,0 +1,209 @@
# Your second Akka application, part 4: Querying a group of devices
The conversational patterns we have seen so far were simple in the sense that they required no or little state to be kept in the
actor that is only relevant to the conversation. Our device actors either simply returned a reading, which required no
state change, recorded a temperature, which was required an update of a single field, or in the most complex case,
managing groups and devices, we had to add or remove simple entries from a `Map`.
In this chapter we will see a more complex example. Our goal is to add a new service to the group actor, one which
allows querying the temperature from all running devices. We need to first investigate how we want our query API to
behave.
The very first problem we face is that the set of devices is dynamic, and each device is represented by an actor that
can stop at any time. At the beginning of the query, we need to ask all of the device actors for the current temperature
that we know about. However, during the lifecycle of the query
* a device actor may stop and not respond with a temperature reading
* a new device actor might start up, but we missed asking it for the current temperature
There are many approaches that can be taken to address these issues, but the important point is to settle on what is
the desired behavior. We will pick the following two guarantees:
* when a query arrives to the group, the group actor takes a _snaphsot_ of the existing device actors and will only
ask those for the temperature. Actors that are started _after_ the arrival of the query are simply ignored.
* when an actor stops during the query without answering (i.e. before all the actors we asked for the temperature
responded) we simply report back the fact to the sender of the query message
Apart from device actors coming and going dynamically some actors might take a long time to answer, for example because
they are stuck in an accidental infinite loop, or because they failed due to a bug and dropped our request. Ideally,
we would like to give a deadline to our query:
* the query is considered completed if either all actors have responded (or confirmed being stopped), or we reach
the deadline
Given these decisions, and the fact that a device might not have a temperature to record, we can define four states
that each device can be in, according to the query:
* It has a temperature available: `Temperature(value)`
* It has responded, but has no temperature available yet: `TemperatureNotAvailable`
* It has stopped before answering: `DeviceNotAvailable`
* It did not respond before the deadline: `DeviceTimedOut`
Summarizing these in message types we can add the following to `DeviceGroup`:
@@snip [Hello.scala](../../../test/scala/tutorial_4/DeviceGroup.scala) { #query-protocol }
## Implementing the query
One of the approaches could be for implementing the query is to add more code to the group actor. While this is
possible, in practice this can be very cumbersome and error prone. When we start a query, we need to take a snapshot
of the devices present at the start of the query and start a timer so that we can enforce the deadline. Unfortunately,
during the time we execute a query _another query_ might just arrive. For this other query of course we need to keep
track of the exact same information, but isolated from the previous query. This complicates the code and also poses
some problems. For example, we would need a data structure that maps the `ActorRef`s of the devices to the queries
that use that device, so that they can be notified when such a device terminates, i.e. a `Terminated` message is
received.
There is a much simpler approach that is superior in every way, and it is the one we will implement. We will create
an actor that represents a _single query_ and which performs the tasks needed to complete the query in behalf of the
group actor. So far we have created actors that belonged to classical domain objects, but now, we will create an
actor that represents a process or task rather than an entity. This move keeps our group actor simple and gives
us better ways to test the query capability in isolation.
First, we need to design the lifecycle of our query actors. This consists of identifying its initial state, then
the first action to be taken by the actor, then, the cleanup if necessary. There are a few things the query should
need to be able to work:
* The snapshot of active device actors to query, and their IDs
* The requestID of the request that started the query (so we can include it in the reply)
* The `ActorRef` of the actor who sent the group actor the query. We will send the reply to this actor directly.
* A timeout parameter, how long the query should wait for replies. Keeping this as a parameter will simplify testing.
Since we need to have a deadline until we are willing to wait for responses, we will need a new feature that we have
not used yet: timers. Akka has a built-in scheduler facility for this exact purpose. Using it is simple, the
`scheduler.scheduleOnce(time, actorRef, message)` method will schedule the message `message` into the future by the
specified `time` and send it to the actor `actorRef`. To implement our query timeout we need to create the message
that represents that the deadline is due, we create a simple message `CollectionTimeout` without any parameters for
this purpose. The return value from `scheduleOnce` is a `Cancellable` which will be useful to cancel the timer
if the query finishes successfully in time. Getting the scheduler is possible from the `ActorSystem`, which, in turn,
is accessible from the actor's context: `context.system.scheduler`. This needs an implicit `ExecutionContext` which
is basically the thread-pool that will execute the timer task itself. In our case we can just use the default dispatcher
which can be brought into scope by `import context.dispatcher`.
At the start of the query, we need to ask each of the device actors for the current temperature. To be able to quickly
detect devices that stopped before they got the `ReadTemperature` message we will also watch each of the actors. This
way, we get `Terminated` messages for those that stop during the lifetime of the query, so we don't need to wait
until the timeout to mark these as not available.
Putting together all these, the outline of our actor looks like this:
@@snip [Hello.scala](../../../test/scala/tutorial_4/DeviceGroupQuery.scala) { #query-outline }
The query (apart from the pending timer) has one stateful aspect about it: the actors that did not answer so far (or,
looking from the other way around, the set of actors that have replied or stopped). One way to track this state is
to create a mutable field in the actor (a `var`). There is another approach. It is also possible to change how
the actor responds to messages. By default, the `receive` block defines the behavior of the actor, but it is possible
to change it (even several times) during the life of the actor. This is possible by calling `context.become(newBehavior)`
where `newBehavior` is anything with type `Receive` (which is just a shorthand for `PartialFunction[Any, Unit]`). A
`Receive` is just a function (or an object, if you like) it can be returned from a function. We will leverage this
to track the state of our actor.
As the first step, instead of defining `receive` directly, we delegate to another function to create the `Receive`, which
we will call `waitingForReplies`. This will keep track of two changing values, a `Map` of already received replies
and a `Set` of actors that we still wait on. We have three events that we should act on. We can receive a
`RespondTemperature` message from one of the devices. Second, we can receive a `Terminated` message for a device actor
that has been stopped in the meantime. Finally, we can reach the deadline and receive a `CollectionTimeout`. In the
first two cases, we need to keep track of the replies, which we now simply delegate to a method `receivedResponse` which
we will discuss later. In the case of timeout, we need to simply take all the actors that has not yet replied yet
(the members of the set `stillWaiting`) and put a `DeviceTimedOut` as the status in the final reply. Then we
reply to the submitter of the query with the collected results and stop the query actor:
@@snip [Hello.scala](../../../test/scala/tutorial_4/DeviceGroupQuery.scala) { #query-state }
What is not yet clear, how we will "mutate" the `answersSoFar` and `stillWaiting` data structures. One important
thing to note is that the function `waitingForReplies` **does not handle the messages directly. It returns a `Receive`
function that will handle the messages**. This means that if we call `waitingForReplies` again, with different parameters,
then it returns a brand new `Receive` that will use those new parameters. We have seen how we
can install the initial `Receive` by simply returning it from `receive`. In order to install a new one, to record a
new reply for example, we need some mechanism. This mechanism is the method `context.becdome(newReceive)` which will
_change_ the actor's message handling function to the provided `newReceive` function. You can imagine that before
starting, your actor automatically calls `context.become(receive)`, i.e. installing the `Receive` function that
is returned from `receive`. This is another important observation: **it is not `receive` that handles the messages,
it just returns a `Receive` function that will actually handle the messages***.
We now have to figure out what to do in `receivedResponse()`. First, we need to record the new result in the map
`repliesSoFar` and remove the actor from `stillWaiting`. Then, we need to check if there is any remaining actors
that we are waiting for. If there is none, we can send the result of the query to the original requester, and stop
the query actor. Otherwise, we need to update the `repliesSoFar` and `stillWaiting` structures and wait for more
messages.
In the code before, we treated `Terminated` as the implicit response `DeviceNotAvailable`, so `receivedResponse` does
not need to do anything special. There is one small task we still need to do. It is possible that we receive a proper
response from a device actor, but then it stops during the lifetime of the query. We don't want this second event
overwrite the already received reply. In other words, we don't want to receive `Terminated` after we recorded the
response. This is simple to achieve by calling `context.unwatch(ref)`. This method also ensures that we don't
receive `Terminated` events that are already in the mailbox of the actor. It is also safe to call this multiple times,
only the first call will have any effect, the rest is simply ignored.
With all this knowledge, we can create the `receivedResponse` method:
@@snip [Hello.scala](../../../test/scala/tutorial_4/DeviceGroupQuery.scala) { #query-collect-reply }
It is quite natural to ask at this point, what have we gained by using the `context.become()` trick instead of
just making the `repliesSoFar` and `stillWaiting` structures mutable fields of the actor (i.e. `var`s)? In this
simple example, not that much. The value of this style of state keeping becomes more evident when you suddenly have
_more kinds_ of states. For example imagine that the query have multiple phases that come each other. Since each phase
might have temporary data that is relevant only to that phase, keeping these as fields would pollute the global state
of the actor where it is not clear which field is used or ignored in which state. Using parametrized `Receive` "factory"
methods we can keep data that is only relevant to the state private to the state. It is still a good exercise to
rewrite the query using `var`s instead of `context.become()`. In general, it is a good practice to get comfortable
with the solution we have used here as it helps structuring more complex actor in a cleaner and more maintainable way.
Or query actor is now done:
@@snip [Hello.scala](../../../test/scala/tutorial_4/DeviceGroupQuery.scala) { #query-full }
## Testing
It is time to test if the query actor is correct. There are various scenarios we need to test individually to make
sure everything works as expected. To be able to do this, we need to simulate the device actors somehow to exercise
various normal or failure scenarios. Thankfully we took the list of collaborators (actually a `Map`) as a parameter
to the query actor, so we can easily pass in `TestProbe` references. In our first test, we try out the case when
there are two devices and both report a temperature:
@@snip [Hello.scala](../../../test/scala/tutorial_4/DeviceGroupQuerySpec.scala) { #query-test-normal }
That was the happy case, but we know that sometimes devices cannot provide a temperature measurement yet. This
scenario is just slightly different from the previous:
@@snip [Hello.scala](../../../test/scala/tutorial_4/DeviceGroupQuerySpec.scala) { #query-test-no-reading }
We also know, that sometimes device actors stop before answering:
@@snip [Hello.scala](../../../test/scala/tutorial_4/DeviceGroupQuerySpec.scala) { #query-test-stopped }
If you remember, there is another case related to device actors stopping. It is possible that we get a normal reply
from a device actor, but then receive a `Terminated` for the same actor later. In this case we would like to keep
the first reply and not mark the device as `DeviceNotAvailable`. We should test this, too:
@@snip [Hello.scala](../../../test/scala/tutorial_4/DeviceGroupQuerySpec.scala) { #query-test-stopped-later }
The final case is when not all devices respond in time. To keep our test relatively fast, we will construct the
`DeviceGroupQuery` actor with a smaller timeout:
@@snip [Hello.scala](../../../test/scala/tutorial_4/DeviceGroupQuerySpec.scala) { #query-test-timeout }
Our query works as expected now, it is time to include this new functionality in the `DeviceGroup` actor now.
## Adding the query capability to the group
Including the query feature in the group actor is fairly simple now. We did all the heavylifting in the query actor
itself, the group actor only needs to create it with the right initial parameters and nothing else.
@@snip [Hello.scala](../../../test/scala/tutorial_4/DeviceGroup.scala) { #query-added }
It is probably worth to reiterate what we said in the beginning of the chapter. By keeping all the temporary state
that was only relevant to the query itself in a separate actor we kept the group actor very simple. It delegates
everything to child actors that needs temporary state not relevant to its main business. Also, multiple queries can
now run parallel to each other, as many as needed. In our case querying the individual device actors is fast, but
were this not the case, for example because the remote sensors need to be contacted over the network, this formulation
would significantly improve throughput.
We close this chapter by testing that everything works together. This test is just a variant of the previous ones,
now exercising the group query feature:
@@snip [Hello.scala](../../../test/scala/tutorial_4/DeviceGroupSpec.scala) { #group-query-integration-test }

View file

@ -0,0 +1,45 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package tutorial_4
import akka.actor.{ Actor, ActorLogging, Props }
import tutorial_4.Device.{ ReadTemperature, RecordTemperature, RespondTemperature, TemperatureRecorded }
import tutorial_4.DeviceManager.{ DeviceRegistered, RequestTrackDevice }
object Device {
def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId))
final case class RecordTemperature(requestId: Long, value: Double)
final case class TemperatureRecorded(requestId: Long)
final case class ReadTemperature(requestId: Long)
final case class RespondTemperature(requestId: Long, value: Option[Double])
}
class Device(groupId: String, deviceId: String) extends Actor with ActorLogging {
var lastTemperatureReading: Option[Double] = None
override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId)
override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId)
override def receive: Receive = {
case RequestTrackDevice(`groupId`, `deviceId`) =>
sender() ! DeviceRegistered
case RequestTrackDevice(groupId, deviceId) =>
log.warning(
"Ignoring TrackDevice request for {}-{}.This actor is responsible for {}-{}.",
groupId, deviceId, this.groupId, this.deviceId)
case RecordTemperature(id, value) =>
log.info("Recorded temperature reading {} with {}", value, id)
lastTemperatureReading = Some(value)
sender() ! TemperatureRecorded(id)
case ReadTemperature(id) =>
sender() ! RespondTemperature(id, lastTemperatureReading)
}
}

View file

@ -0,0 +1,85 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package tutorial_4
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
import tutorial_4.DeviceGroup._
import tutorial_4.DeviceManager.RequestTrackDevice
import scala.concurrent.duration._
//#query-protocol
object DeviceGroup {
//#query-protocol
def props(groupId: String): Props = Props(new DeviceGroup(groupId))
final case class RequestDeviceList(requestId: Long)
final case class ReplyDeviceList(requestId: Long, ids: Set[String])
//#query-protocol
// ... earlier message types not shown
final case class RequestAllTemperatures(requestId: Long)
final case class RespondAllTemperatures(requestId: Long, temperatures: Map[String, TemperatureReading])
sealed trait TemperatureReading
final case class Temperature(value: Double) extends TemperatureReading
case object TemperatureNotAvailable extends TemperatureReading
case object DeviceNotAvailable extends TemperatureReading
case object DeviceTimedOut extends TemperatureReading
}
//#query-protocol
//#query-added
class DeviceGroup(groupId: String) extends Actor with ActorLogging {
var deviceIdToActor = Map.empty[String, ActorRef]
var actorToDeviceId = Map.empty[ActorRef, String]
var nextCollectionId = 0L
override def preStart(): Unit = log.info("DeviceGroup {} started", groupId)
override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId)
override def receive: Receive = {
//#query-added
case trackMsg @ RequestTrackDevice(`groupId`, _) =>
deviceIdToActor.get(trackMsg.deviceId) match {
case Some(ref) =>
ref forward trackMsg
case None =>
log.info("Creating device actor for {}", trackMsg.deviceId)
val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId)
context.watch(deviceActor)
deviceActor forward trackMsg
deviceIdToActor += trackMsg.deviceId -> deviceActor
actorToDeviceId += deviceActor -> trackMsg.deviceId
}
case RequestTrackDevice(groupId, deviceId) =>
log.warning(
"Ignoring TrackDevice request for {}. This actor is responsible for {}.",
groupId, this.groupId)
case RequestDeviceList(requestId) =>
sender() ! ReplyDeviceList(requestId, deviceIdToActor.keySet)
case Terminated(deviceActor) =>
val deviceId = actorToDeviceId(deviceActor)
log.info("Device actor for {} has been terminated", deviceId)
actorToDeviceId -= deviceActor
deviceIdToActor -= deviceId
//#query-added
// ... other cases omitted
case RequestAllTemperatures(requestId) =>
context.actorOf(DeviceGroupQuery.props(
actorToDeviceId = actorToDeviceId,
requestId = requestId,
requester = sender(),
3.seconds
))
}
}
//#query-added

View file

@ -0,0 +1,105 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package tutorial_4
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
import scala.concurrent.duration._
//#query-full
//#query-outline
object DeviceGroupQuery {
case object CollectionTimeout
def props(
actorToDeviceId: Map[ActorRef, String],
requestId: Long,
requester: ActorRef,
timeout: FiniteDuration
): Props = {
Props(new DeviceGroupQuery(actorToDeviceId, requestId, requester, timeout))
}
}
class DeviceGroupQuery(
actorToDeviceId: Map[ActorRef, String],
requestId: Long,
requester: ActorRef,
timeout: FiniteDuration
) extends Actor with ActorLogging {
import DeviceGroupQuery._
import context.dispatcher
val queryTimeoutTimer = context.system.scheduler.scheduleOnce(timeout, self, CollectionTimeout)
override def preStart(): Unit = {
actorToDeviceId.keysIterator.foreach { deviceActor =>
context.watch(deviceActor)
deviceActor ! Device.ReadTemperature(0)
}
}
override def postStop(): Unit = {
queryTimeoutTimer.cancel()
}
//#query-outline
//#query-state
override def receive: Receive =
waitingForReplies(
Map.empty,
actorToDeviceId.keySet
)
def waitingForReplies(
repliesSoFar: Map[String, DeviceGroup.TemperatureReading],
stillWaiting: Set[ActorRef]
): Receive = {
case Device.RespondTemperature(0, valueOption) =>
val deviceActor = sender()
val reading = valueOption match {
case Some(value) => DeviceGroup.Temperature(value)
case None => DeviceGroup.TemperatureNotAvailable
}
receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar)
case Terminated(deviceActor) =>
receivedResponse(deviceActor, DeviceGroup.DeviceNotAvailable, stillWaiting, repliesSoFar)
case CollectionTimeout =>
val timedOutReplies =
stillWaiting.map { deviceActor =>
val deviceId = actorToDeviceId(deviceActor)
deviceId -> DeviceGroup.DeviceTimedOut
}
requester ! DeviceGroup.RespondAllTemperatures(requestId, repliesSoFar ++ timedOutReplies)
context.stop(self)
}
//#query-state
//#query-collect-reply
def receivedResponse(
deviceActor: ActorRef,
reading: DeviceGroup.TemperatureReading,
stillWaiting: Set[ActorRef],
repliesSoFar: Map[String, DeviceGroup.TemperatureReading]
): Unit = {
context.unwatch(deviceActor)
val deviceId = actorToDeviceId(deviceActor)
val newStillWaiting = stillWaiting - deviceActor
val newRepliesSoFar = repliesSoFar + (deviceId -> reading)
if (newStillWaiting.isEmpty) {
requester ! DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar)
context.stop(self)
} else {
context.become(waitingForReplies(newRepliesSoFar, newStillWaiting))
}
}
//#query-collect-reply
//#query-outline
}
//#query-outline
//#query-full

View file

@ -0,0 +1,167 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package tutorial_4
import akka.actor.PoisonPill
import akka.testkit.{ AkkaSpec, TestProbe }
import scala.concurrent.duration._
class DeviceGroupQuerySpec extends AkkaSpec {
"DeviceGroupQuery" must {
//#query-test-normal
"return temperature value for working devices" in {
val requester = TestProbe()
val device1 = TestProbe()
val device2 = TestProbe()
val queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
requestId = 1,
requester = requester.ref,
timeout = 3.seconds
))
device1.expectMsg(Device.ReadTemperature(requestId = 0))
device2.expectMsg(Device.ReadTemperature(requestId = 0))
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref)
requester.expectMsg(DeviceGroup.RespondAllTemperatures(
requestId = 1,
temperatures = Map(
"device1" -> DeviceGroup.Temperature(1.0),
"device2" -> DeviceGroup.Temperature(2.0)
)
))
}
//#query-test-normal
//#query-test-no-reading
"return TemperatureNotAvailable for devices with no readings" in {
val requester = TestProbe()
val device1 = TestProbe()
val device2 = TestProbe()
val queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
requestId = 1,
requester = requester.ref,
timeout = 3.seconds
))
device1.expectMsg(Device.ReadTemperature(requestId = 0))
device2.expectMsg(Device.ReadTemperature(requestId = 0))
queryActor.tell(Device.RespondTemperature(requestId = 0, None), device1.ref)
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref)
requester.expectMsg(DeviceGroup.RespondAllTemperatures(
requestId = 1,
temperatures = Map(
"device1" -> DeviceGroup.TemperatureNotAvailable,
"device2" -> DeviceGroup.Temperature(2.0)
)
))
}
//#query-test-no-reading
//#query-test-stopped
"return DeviceNotAvailable if device stops before answering" in {
val requester = TestProbe()
val device1 = TestProbe()
val device2 = TestProbe()
val queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
requestId = 1,
requester = requester.ref,
timeout = 3.seconds
))
device1.expectMsg(Device.ReadTemperature(requestId = 0))
device2.expectMsg(Device.ReadTemperature(requestId = 0))
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
device2.ref ! PoisonPill
requester.expectMsg(DeviceGroup.RespondAllTemperatures(
requestId = 1,
temperatures = Map(
"device1" -> DeviceGroup.Temperature(1.0),
"device2" -> DeviceGroup.DeviceNotAvailable
)
))
}
//#query-test-stopped
//#query-test-stopped-later
"return temperature reading even if device stops after answering" in {
val requester = TestProbe()
val device1 = TestProbe()
val device2 = TestProbe()
val queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
requestId = 1,
requester = requester.ref,
timeout = 3.seconds
))
device1.expectMsg(Device.ReadTemperature(requestId = 0))
device2.expectMsg(Device.ReadTemperature(requestId = 0))
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref)
device2.ref ! PoisonPill
requester.expectMsg(DeviceGroup.RespondAllTemperatures(
requestId = 1,
temperatures = Map(
"device1" -> DeviceGroup.Temperature(1.0),
"device2" -> DeviceGroup.Temperature(2.0)
)
))
}
//#query-test-stopped-later
//#query-test-timeout
"return DeviceTimedOut if device does not answer in time" in {
val requester = TestProbe()
val device1 = TestProbe()
val device2 = TestProbe()
val queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
requestId = 1,
requester = requester.ref,
timeout = 1.second
))
device1.expectMsg(Device.ReadTemperature(requestId = 0))
device2.expectMsg(Device.ReadTemperature(requestId = 0))
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
requester.expectMsg(DeviceGroup.RespondAllTemperatures(
requestId = 1,
temperatures = Map(
"device1" -> DeviceGroup.Temperature(1.0),
"device2" -> DeviceGroup.DeviceTimedOut
)
))
}
//#query-test-timeout
}
}

View file

@ -0,0 +1,135 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package tutorial_4
import akka.actor.PoisonPill
import akka.testkit.{ AkkaSpec, TestProbe }
import scala.concurrent.duration._
class DeviceGroupSpec extends AkkaSpec {
"DeviceGroup actor" must {
"be able to register a device actor" in {
val probe = TestProbe()
val groupActor = system.actorOf(DeviceGroup.props("group"))
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
val deviceActor1 = probe.lastSender
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
val deviceActor2 = probe.lastSender
deviceActor1 should !==(deviceActor2)
// Check that the device actors are working
deviceActor1.tell(Device.RecordTemperature(requestId = 0, 1.0), probe.ref)
probe.expectMsg(Device.TemperatureRecorded(requestId = 0))
deviceActor2.tell(Device.RecordTemperature(requestId = 1, 2.0), probe.ref)
probe.expectMsg(Device.TemperatureRecorded(requestId = 1))
}
"ignore requests for wrong groupId" in {
val probe = TestProbe()
val groupActor = system.actorOf(DeviceGroup.props("group"))
groupActor.tell(DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.ref)
probe.expectNoMsg(500.milliseconds)
}
"return same actor for same deviceId" in {
val probe = TestProbe()
val groupActor = system.actorOf(DeviceGroup.props("group"))
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
val deviceActor1 = probe.lastSender
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
val deviceActor2 = probe.lastSender
deviceActor1 should ===(deviceActor2)
}
"be able to list active devices" in {
val probe = TestProbe()
val groupActor = system.actorOf(DeviceGroup.props("group"))
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 0), probe.ref)
probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2")))
}
"be able to list active devices after one shuts down" in {
val probe = TestProbe()
val groupActor = system.actorOf(DeviceGroup.props("group"))
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
val toShutDown = probe.lastSender
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 0), probe.ref)
probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2")))
probe.watch(toShutDown)
toShutDown ! PoisonPill
probe.expectTerminated(toShutDown)
groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 1), probe.ref)
probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 1, Set("device2")))
}
//#group-query-integration-test
"be able to collect temperatures from all active devices" in {
val probe = TestProbe()
val groupActor = system.actorOf(DeviceGroup.props("group"))
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
val deviceActor1 = probe.lastSender
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
val deviceActor2 = probe.lastSender
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device3"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
val deviceActor3 = probe.lastSender
// Check that the device actors are working
deviceActor1.tell(Device.RecordTemperature(requestId = 0, 1.0), probe.ref)
probe.expectMsg(Device.TemperatureRecorded(requestId = 0))
deviceActor2.tell(Device.RecordTemperature(requestId = 1, 2.0), probe.ref)
probe.expectMsg(Device.TemperatureRecorded(requestId = 1))
// No temperature for device3
groupActor.tell(DeviceGroup.RequestAllTemperatures(requestId = 0), probe.ref)
probe.expectMsg(
DeviceGroup.RespondAllTemperatures(
requestId = 0,
temperatures = Map(
"device1" -> DeviceGroup.Temperature(1.0),
"device2" -> DeviceGroup.Temperature(2.0),
"device3" -> DeviceGroup.TemperatureNotAvailable
)
)
)
}
//#group-query-integration-test
}
}

View file

@ -0,0 +1,47 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package tutorial_4
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
import tutorial_4.DeviceManager.RequestTrackDevice
object DeviceManager {
def props(): Props = Props(new DeviceManager)
final case class RequestTrackDevice(groupId: String, deviceId: String)
case object DeviceRegistered
}
class DeviceManager extends Actor with ActorLogging {
var groupIdToActor = Map.empty[String, ActorRef]
var actorToGroupId = Map.empty[ActorRef, String]
override def preStart(): Unit = log.info("DeviceManager started")
override def postStop(): Unit = log.info("DeviceManager stopped")
override def receive = {
case trackMsg @ RequestTrackDevice(groupId, _) =>
groupIdToActor.get(groupId) match {
case Some(ref) =>
ref forward trackMsg
case None =>
log.info("Creating device group actor for {}", groupId)
val groupActor = context.actorOf(DeviceGroup.props(groupId), "group-" + groupId)
context.watch(groupActor)
groupActor forward trackMsg
groupIdToActor += groupId -> groupActor
actorToGroupId += groupActor -> groupId
}
case Terminated(groupActor) =>
val groupId = actorToGroupId(groupActor)
log.info("Device group actor for {} has been terminated", groupId)
actorToGroupId -= groupActor
groupIdToActor -= groupId
}
}