From d90f73e45b8c7e009ff6c8c6283d8fa32399901b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Thu, 30 Mar 2017 11:42:52 +0200 Subject: [PATCH] Tutorial pt3 --- akka-docs-new/src/main/paradox/guide/index.md | 1 + .../src/main/paradox/guide/tutorial_3.md | 210 ++++++++++++++++++ .../test/scala/tutorial_1/IotSupervisor.scala | 2 +- .../src/test/scala/tutorial_2/Device.scala | 5 +- .../scala/tutorial_2/DeviceInProgress.scala | 10 +- .../test/scala/tutorial_2/DeviceSpec.scala | 2 +- .../src/test/scala/tutorial_3/Device.scala | 2 + .../test/scala/tutorial_3/DeviceGroup.scala | 43 ++-- .../scala/tutorial_3/DeviceGroupSpec.scala | 43 +--- .../test/scala/tutorial_3/DeviceManager.scala | 6 + .../test/scala/tutorial_3/DeviceSpec.scala | 2 + .../src/test/scala/tutorial_5/Device.scala | 45 ++++ .../test/scala/tutorial_5/DeviceGroup.scala | 75 +++++++ .../DeviceGroupQuery.scala | 2 +- .../DeviceGroupQuerySpec.scala | 2 +- .../scala/tutorial_5/DeviceGroupSpec.scala | 133 +++++++++++ .../test/scala/tutorial_5/DeviceManager.scala | 47 ++++ .../test/scala/tutorial_5/DeviceSpec.scala | 67 ++++++ .../{tutorial_3 => tutorial_5}/IotApp.scala | 4 +- .../IotSupervisor.scala | 2 +- 20 files changed, 629 insertions(+), 74 deletions(-) create mode 100644 akka-docs-new/src/main/paradox/guide/tutorial_3.md create mode 100644 akka-docs-new/src/test/scala/tutorial_5/Device.scala create mode 100644 akka-docs-new/src/test/scala/tutorial_5/DeviceGroup.scala rename akka-docs-new/src/test/scala/{tutorial_3 => tutorial_5}/DeviceGroupQuery.scala (99%) rename akka-docs-new/src/test/scala/{tutorial_3 => tutorial_5}/DeviceGroupQuerySpec.scala (99%) create mode 100644 akka-docs-new/src/test/scala/tutorial_5/DeviceGroupSpec.scala create mode 100644 akka-docs-new/src/test/scala/tutorial_5/DeviceManager.scala create mode 100644 akka-docs-new/src/test/scala/tutorial_5/DeviceSpec.scala rename akka-docs-new/src/test/scala/{tutorial_3 => tutorial_5}/IotApp.scala (88%) rename akka-docs-new/src/test/scala/{tutorial_3 => tutorial_5}/IotSupervisor.scala (96%) diff --git a/akka-docs-new/src/main/paradox/guide/index.md b/akka-docs-new/src/main/paradox/guide/index.md index bb5ec4817e..7bfbb36115 100644 --- a/akka-docs-new/src/main/paradox/guide/index.md +++ b/akka-docs-new/src/main/paradox/guide/index.md @@ -8,5 +8,6 @@ * [Your First Akka Application - Hello World](quickstart.md) * [Your second Akka application, part 1: Top-level architecture](tutorial_1.md) * [Your second Akka application, part 2: The Device actor](tutorial_2.md) + * [Your second Akka application, part 3: Device groups](tutorial_3.md) @@@ \ No newline at end of file diff --git a/akka-docs-new/src/main/paradox/guide/tutorial_3.md b/akka-docs-new/src/main/paradox/guide/tutorial_3.md new file mode 100644 index 0000000000..273f002ade --- /dev/null +++ b/akka-docs-new/src/main/paradox/guide/tutorial_3.md @@ -0,0 +1,210 @@ +# Your second Akka application, part 3: Device groups + +In this chapter we will integrate our device actors into a component that manages devices. When a new device comes +on-line, there is no actor representing it. We need to be able to ask the device manager component to create a new +device actor for us if necessary, in the required group (or return a reference to an already existing one). + +Since we keep our tutorial system to the bare minimum, we have no actual component that interfaces with the external +world via some networking protocol. For our exercise we will just create the API necessary to integrate with such +a component in the future. In a final system, the steps for connecting a device would look like this: + + 1. The device connects through some protocol to our system + 2. The component managing network connections accepts the connection + 3. The ID of the device and the ID of the group that it belongs is acquired + 4. The device manager component is asked to create a group and device actor for the given IDs (or return an existing + one) + 5. The device actor (just been created or located) responds with an acknowledgement, at the same time exposing its + ActorRef directly (by being the sender of the acknowledgement) + 6. The networking component now uses the ActorRef of the device directly, avoiding going through the component + +We are only concerned with steps 4 and 5 now. We will model the device manager component as an actor tree with three +levels: + +DEVICE_MANAGER_TREE_DIAGRAM + + * The top level is the supervisor actor representing the component. It is also the entry point to look up or create + group and device actors + * Group actors are supervisors of the devices belonging to the group. Groups both supervise the device actors and + also provide extra services, like querying the temperature readings from all the devices available + * Device actors manage all the interactions with the actual devices, storing temperature readings for example + +When designing actor systems one of the main problems is to decide on the granularity of the actors. For example, it +would be perfectly possible to have only a single actor maintaining all the groups, and devices in `HashMap`s for +example. It would be also reasonable to keep the groups as separate actors, but keep device state simply inside +the group actor. + +We chose this three-level architecture for the following reasons: + + * Having groups as separate actors + * allows us to isolate failures happening in a group. If a programmer error would + happen in the single actor that keeps all state, it would be all wiped out once that actor is restarted affecting + groups that are otherwise non-faulty. + * simplifies the problem of querying all the devices belonging to a group (since it only contains state related + to the given group) + * increases the parallelism of the system by allowing to query multiple groups concurrently. Since groups have + dedicated actors, all of them can run concurrently. + * Having devices as separate actors + * allows us to isolate failures happening in a device actor from the rest of the devices + * increases the parallelism of collecting temperature readings as actual network connections from different devices + can talk to the individual device actors directly, reducing contention points. + +In practice, this system can be organized in different ways, all dependent on the characteristics of the interactions +between actors. + +The following guidelines help to arrive at the right granularity + + * Prefer larger granularity to smaller. Introducing more fine-grained actors than needed causes more problems than + it solves + * Prefer finer granularity if it enables higher concurrency in the system + * Prefer finer granularity if actors need to handle complex conversations with other actors and hence have many + states. We will see a very good example for this in the next chapter. + * Prefer finer granularity if there is too many state to keep around in one place compared to dividing into smaller + actors. + * Prefer finer granularity if the current actor has multiple unrelated responsibilities that can fail and restored + individually + + +## The registration protocol + +As the first step, we need to design the protocol for registering a device and getting an actor that will be responsible +for it. This protocol will be provided by the DeviceManager component itself, because that is the only actor that +is known upfront: groups and device actors are created on-demand. The steps of registering a device are the following: + + 1. DeviceManager receives the request to register an actor for a given group and device ID + 2. If the manager already has an actor for the group, it forwards the request to it. Otherwise it first creates + a new one and then forwards the request. + 3. The DeviceGroup receives the request to register an actor for the given device ID + 4. If the group already has an actor for the device ID, it forwards the request to it. Otherwise it first creates + a new one and then forwards the request. + 5. The device actor receives the request, and acknowledges it to the original sender. Since he is the sender of + the acknowledgement, the recevier will be able to learn its `ActorRef` and send direct messages to it in the future. + +Now that the steps are defined, we only need to define the messages that we will use to communicate requests and +their acknowledgement: + +@@snip [Hello.scala](../../../test/scala/tutorial_3/DeviceManager.scala) { #device-manager-msgs } + +As you see, in this case we have not included a request ID field in the messages. Since registration is usually happening +once, at the component that connects the system to some network protocol, we will usually have no use for the ID. +Nevertheless, it is a good exercise to add this ID. + +## Add registration support to Device actor + +We start implementing the protocol from the bottom first. In practice both a top-down and bottom-up approach can +work, but in our case we benefit from the bottom-up approach as it allows us to immediately write tests for the +new features without mocking out parts. + +At the bottom are the Device actors. Their job in this registration process is rather simple, just reply to the +registration request with an acknowledgement to the sender. *We will assume that the sender of the registration +message is preserved in the upper layers.* We will show you in the next section how this can be achieved. + +We also add a safeguard against requests that come with a mismatched group or device ID. This is how the resulting +code looks like: + +> NOTE: We used a feature of scala pattern matching where we can match if a certain field equals to an expected +value. This is achieved by variables included in backticks, like `` `variable` ``, and it means that the pattern +only match if it contains the value of `variable` in that position. + +@@snip [Hello.scala](../../../test/scala/tutorial_3/Device.scala) { #device-with-register } + +We should not leave features untested, so we immediately write two new test cases, one exercising successful +registration, the other testing the case when IDs don't match: + +> NOTE: We used the `expectNoMsg()` helper method from `TestProbe`. This assertion waits until the defined time-limit +and fails if it receives any messages during this period. If no messages are received during the wait period the +assertion passes. It is usually a good idea to keep these timeouts low (but not too low) because they add significant +test execution time otherwise. + +@@snip [Hello.scala](../../../test/scala/tutorial_3/DeviceSpec.scala) { #device-registration-tests } + +## Device group + +We are done with the registration support at the device level, now we have to implement it at the group level. A group +has more work to do when it comes to registrations. It must either forward the request to an existing child, or it +should create one. To be able to look up child actors by their device IDs we will use a `Map`. + +We also want to keep the original sender of the request so that our device actor can reply directly. This is possible +by using `forward` instead of the `!` operator. The only difference between the two is that `forward` keeps the original +sender while `!` always sets the sender to be the current actor. Just like with our device, we ensure that we don't +respond to wrong group IDs: + +@@snip [Hello.scala](../../../test/scala/tutorial_3/DeviceGroup.scala) { #device-group-register } + +Just as we did with the device, we test this new functionality. We also test that the actors returned for the two +different IDs are actually different, and we also attempt to record a temperature reading for each of the devices +to see if the actors are responding. + +@@snip [Hello.scala](../../../test/scala/tutorial_3/DeviceGroupSpec.scala) { #device-group-test-registration } + +It might be, that a device actor already exists for the registration request. In this case we would like to use +the existing actor instead of a new one. We have not tested this yet, so we need to fix this: + +@@snip [Hello.scala](../../../test/scala/tutorial_3/DeviceGroupSpec.scala) { #device-group-test3 } + +So far, we have implemented everything for registering device actors in the group. Devices come and go however, so +we will need a way to remove those from the `Map`. We will assume that when a device is removed, its corresponding actor +is simply stopped. We need some way for the parent to be notified when one of the device actors are stopped. Unfortunately +supervision will not help, because it is used for error scenarios not graceful stopping. + +There is a feature in Akka that is exactly what we need here. It is possible for an actor to _watch_ another actor +and be notified if the other actor is stopped. This feature is called _Death Watch_ and it is an important tool for +any Akka application. Unlike supervision, watching is not limited to parent-child relationships, any actor can watch +any other actor given its `ActorRef`. After a watched actor stops, the watcher receives a `Terminated(ref)` message +which also contains the reference of the watched actor. The watcher can either handle this message explicitly, or, if +it does not handle it directly it will fail with a `DeathPactException`. This latter is useful if the actor cannot +longer perform its duties after its collaborator actor has been stopped. In our case, the group should still function +after one device have been stopped, so we need to handle this message. The steps we need to follow are the following: + + 1. Whenever we create a new device actor, we must also watch it + 2. When we are notified that a device actor has been stopped we also need to remove it from the `Map` that maps + device IDs to children + +Unfortunately, the `Terminated` message contains only contains the `ActorRef` of the child actor but we does not know +its ID, which we need to remove it from the `Map` of existing ID-actor mappings. To be able to do this removal, we +need to introduce a second `Map` that allow us to find out the device ID corresponding to a given `ActorRef`. Putting +this together the result is: + +@@snip [Hello.scala](../../../test/scala/tutorial_3/DeviceGroup.scala) { #device-group-remove } + +Since so far we have no means to get from the group what are the devices it thinks are active, we cannot test our +new functionality yet. To make it testable, we add a new query capability that simply lists the currently active +device IDs: + +@@snip [Hello.scala](../../../test/scala/tutorial_3/DeviceGroup.scala) { #device-group-full } + +We have now almost everything to test the removal of devices. We only need now ways to: + + * stop a device actor from our test case, from the outside: any actor can be stopped by simply sending a special + built-in message, `PoisonPill`, which instructs the actor to stop. + * be notified once the device actor is stopped: we can use the _Death Watch_ facility for this purpose, too. Thankfully + the `TestProbe` has two messages that we can easily use, `watch()` to watch a specific actor, and `expectTerminated` + to assert that the watched actor has been terminated. + +We add two more test cases now. In the first, we just test that we get back the list of proper IDs once we have added +a few devices. The second test case makes sure that the device ID is properly removed after the device actor has + been stopped: + +@@snip [Hello.scala](../../../test/scala/tutorial_3/DeviceGroupSpec.scala) { #device-group-list-terminate-test } + +## Device manager + +The only part that remains now is the entry point for our device manager component. This actor is very similar to +the group actor, with the only difference that it creates group actors instead of device actors: + +@@snip [Hello.scala](../../../test/scala/tutorial_3/DeviceManager.scala) { #device-manager-full } + +We leave tests of the device manager as an exercise as it is very similar to the tests we have written for the group +actor. + +## What is next? + +We have now a hierarchic component for registering and tracking devices and recording measurements. We have seen +some conversation patterns like + + * request-respond (for temperature recorings) + * delegate-respond (for registration of devices) + * create-watch-terminate (for creating group and device actor as children) + +In the next chapter we will introduce group query capabilities, which will establish a new conversation pattern of +scatter-gather. In particular, we will implement the functionality that allows users to query the status of all +the devices belonging to a group. \ No newline at end of file diff --git a/akka-docs-new/src/test/scala/tutorial_1/IotSupervisor.scala b/akka-docs-new/src/test/scala/tutorial_1/IotSupervisor.scala index 55965bc97a..28beb4d9d1 100644 --- a/akka-docs-new/src/test/scala/tutorial_1/IotSupervisor.scala +++ b/akka-docs-new/src/test/scala/tutorial_1/IotSupervisor.scala @@ -4,7 +4,7 @@ package tutorial_1 //#iot-supervisor -import akka.actor.{Actor, ActorLogging, Props} +import akka.actor.{ Actor, ActorLogging, Props } object IotSupervisor { def props(): Props = Props(new IotSupervisor) diff --git a/akka-docs-new/src/test/scala/tutorial_2/Device.scala b/akka-docs-new/src/test/scala/tutorial_2/Device.scala index 02a286aa0c..511c9c9550 100644 --- a/akka-docs-new/src/test/scala/tutorial_2/Device.scala +++ b/akka-docs-new/src/test/scala/tutorial_2/Device.scala @@ -4,9 +4,7 @@ package tutorial_2 //#full-device -import akka.actor.{Actor, ActorLogging, Props} -import tutorial_3.Device.{ReadTemperature, RecordTemperature, RespondTemperature, TemperatureRecorded} -import tutorial_3.DeviceManager.{DeviceRegistered, RequestTrackDevice} +import akka.actor.{ Actor, ActorLogging, Props } object Device { @@ -20,6 +18,7 @@ object Device { } class Device(groupId: String, deviceId: String) extends Actor with ActorLogging { + import Device._ var lastTemperatureReading: Option[Double] = None override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId) diff --git a/akka-docs-new/src/test/scala/tutorial_2/DeviceInProgress.scala b/akka-docs-new/src/test/scala/tutorial_2/DeviceInProgress.scala index 418a5b06cf..4082322e20 100644 --- a/akka-docs-new/src/test/scala/tutorial_2/DeviceInProgress.scala +++ b/akka-docs-new/src/test/scala/tutorial_2/DeviceInProgress.scala @@ -1,6 +1,6 @@ package tutorial_2 -import tutorial_3.Device.{ReadTemperature, RecordTemperature, RespondTemperature, TemperatureRecorded} +import tutorial_5.Device.{ ReadTemperature, RecordTemperature, RespondTemperature, TemperatureRecorded } object DeviceInProgress1 { @@ -17,23 +17,21 @@ object DeviceInProgress2 { //#read-protocol-2 object Device { - + //#dummy final case class ReadTemperature(requestId: Long) - final case class RespondTemperature(requestId: Long, value: Option[Double]) - + //#dummy } //#read-protocol-2 //#device-with-read - import akka.actor.{Actor, ActorLogging} + import akka.actor.{ Actor, ActorLogging } class Device(groupId: String, deviceId: String) extends Actor with ActorLogging { var lastTemperatureReading: Option[Double] = None override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId) - override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId) override def receive: Receive = { diff --git a/akka-docs-new/src/test/scala/tutorial_2/DeviceSpec.scala b/akka-docs-new/src/test/scala/tutorial_2/DeviceSpec.scala index e308ac3e56..2bb7ea3258 100644 --- a/akka-docs-new/src/test/scala/tutorial_2/DeviceSpec.scala +++ b/akka-docs-new/src/test/scala/tutorial_2/DeviceSpec.scala @@ -3,7 +3,7 @@ */ package tutorial_2 -import akka.testkit.{AkkaSpec, TestProbe} +import akka.testkit.{ AkkaSpec, TestProbe } import scala.concurrent.duration._ diff --git a/akka-docs-new/src/test/scala/tutorial_3/Device.scala b/akka-docs-new/src/test/scala/tutorial_3/Device.scala index 0772307608..55e2debf1a 100644 --- a/akka-docs-new/src/test/scala/tutorial_3/Device.scala +++ b/akka-docs-new/src/test/scala/tutorial_3/Device.scala @@ -7,6 +7,7 @@ import akka.actor.{ Actor, ActorLogging, Props } import tutorial_3.Device.{ ReadTemperature, RecordTemperature, RespondTemperature, TemperatureRecorded } import tutorial_3.DeviceManager.{ DeviceRegistered, RequestTrackDevice } +//#device-with-register object Device { def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId)) @@ -43,3 +44,4 @@ class Device(groupId: String, deviceId: String) extends Actor with ActorLogging sender() ! RespondTemperature(id, lastTemperatureReading) } } +//#device-with-register diff --git a/akka-docs-new/src/test/scala/tutorial_3/DeviceGroup.scala b/akka-docs-new/src/test/scala/tutorial_3/DeviceGroup.scala index 4ada3abb85..1aa41b1196 100644 --- a/akka-docs-new/src/test/scala/tutorial_3/DeviceGroup.scala +++ b/akka-docs-new/src/test/scala/tutorial_3/DeviceGroup.scala @@ -6,36 +6,34 @@ package tutorial_3 import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated } import tutorial_3.DeviceGroup._ import tutorial_3.DeviceManager.RequestTrackDevice + import scala.concurrent.duration._ +//#device-group-full +//#device-group-register object DeviceGroup { - def props(groupId: String): Props = Props(new DeviceGroup(groupId)) + //#device-group-register final case class RequestDeviceList(requestId: Long) final case class ReplyDeviceList(requestId: Long, ids: Set[String]) - - final case class RequestAllTemperatures(requestId: Long) - final case class RespondAllTemperatures(requestId: Long, temperatures: Map[String, TemperatureReading]) - - sealed trait TemperatureReading - final case class Temperature(value: Double) extends TemperatureReading - case object TemperatureNotAvailable extends TemperatureReading - case object DeviceNotAvailable extends TemperatureReading - case object DeviceTimedOut extends TemperatureReading + //#device-group-register } +//#device-group-register +//#device-group-register +//#device-group-remove class DeviceGroup(groupId: String) extends Actor with ActorLogging { var deviceIdToActor = Map.empty[String, ActorRef] + //#device-group-register var actorToDeviceId = Map.empty[ActorRef, String] - var nextCollectionId = 0L + //#device-group-register override def preStart(): Unit = log.info("DeviceGroup {} started", groupId) override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId) override def receive: Receive = { - // Note the backticks case trackMsg @ RequestTrackDevice(`groupId`, _) => deviceIdToActor.get(trackMsg.deviceId) match { case Some(ref) => @@ -43,19 +41,24 @@ class DeviceGroup(groupId: String) extends Actor with ActorLogging { case None => log.info("Creating device actor for {}", trackMsg.deviceId) val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId) + //#device-group-register context.watch(deviceActor) - deviceActor forward trackMsg - deviceIdToActor += trackMsg.deviceId -> deviceActor actorToDeviceId += deviceActor -> trackMsg.deviceId + //#device-group-register + deviceIdToActor += trackMsg.deviceId -> deviceActor + deviceActor forward trackMsg } case RequestTrackDevice(groupId, deviceId) => log.warning( "Ignoring TrackDevice request for {}. This actor is responsible for {}.", groupId, this.groupId) + //#device-group-register + //#device-group-remove case RequestDeviceList(requestId) => sender() ! ReplyDeviceList(requestId, deviceIdToActor.keySet) + //#device-group-remove case Terminated(deviceActor) => val deviceId = actorToDeviceId(deviceActor) @@ -63,13 +66,9 @@ class DeviceGroup(groupId: String) extends Actor with ActorLogging { actorToDeviceId -= deviceActor deviceIdToActor -= deviceId - case RequestAllTemperatures(requestId) => - context.actorOf(DeviceGroupQuery.props( - actorToDeviceId = actorToDeviceId, - requestId = requestId, - requester = sender(), - 3.seconds - )) + //#device-group-register } - } +//#device-group-remove +//#device-group-register +//#device-group-full diff --git a/akka-docs-new/src/test/scala/tutorial_3/DeviceGroupSpec.scala b/akka-docs-new/src/test/scala/tutorial_3/DeviceGroupSpec.scala index 698fb472a7..35f1cc1f72 100644 --- a/akka-docs-new/src/test/scala/tutorial_3/DeviceGroupSpec.scala +++ b/akka-docs-new/src/test/scala/tutorial_3/DeviceGroupSpec.scala @@ -13,6 +13,7 @@ class DeviceGroupSpec extends AkkaSpec { "DeviceGroup actor" must { + //#device-group-test-registration "be able to register a device actor" in { val probe = TestProbe() val groupActor = system.actorOf(DeviceGroup.props("group")) @@ -24,6 +25,7 @@ class DeviceGroupSpec extends AkkaSpec { groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref) probe.expectMsg(DeviceManager.DeviceRegistered) val deviceActor2 = probe.lastSender + deviceActor1 should !==(deviceActor2) // Check that the device actors are working deviceActor1.tell(Device.RecordTemperature(requestId = 0, 1.0), probe.ref) @@ -39,7 +41,9 @@ class DeviceGroupSpec extends AkkaSpec { groupActor.tell(DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.ref) probe.expectNoMsg(500.milliseconds) } + //#device-group-test-registration + //#device-group-test3 "return same actor for same deviceId" in { val probe = TestProbe() val groupActor = system.actorOf(DeviceGroup.props("group")) @@ -54,7 +58,9 @@ class DeviceGroupSpec extends AkkaSpec { deviceActor1 should ===(deviceActor2) } + //#device-group-test3 + //#device-group-list-terminate-test "be able to list active devices" in { val probe = TestProbe() val groupActor = system.actorOf(DeviceGroup.props("group")) @@ -90,42 +96,7 @@ class DeviceGroupSpec extends AkkaSpec { groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 1), probe.ref) probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 1, Set("device2"))) } - - "be able to collect temperatures from all active devices" in { - val probe = TestProbe() - val groupActor = system.actorOf(DeviceGroup.props("group")) - - groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) - probe.expectMsg(DeviceManager.DeviceRegistered) - val deviceActor1 = probe.lastSender - - groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref) - probe.expectMsg(DeviceManager.DeviceRegistered) - val deviceActor2 = probe.lastSender - - groupActor.tell(DeviceManager.RequestTrackDevice("group", "device3"), probe.ref) - probe.expectMsg(DeviceManager.DeviceRegistered) - val deviceActor3 = probe.lastSender - - // Check that the device actors are working - deviceActor1.tell(Device.RecordTemperature(requestId = 0, 1.0), probe.ref) - probe.expectMsg(Device.TemperatureRecorded(requestId = 0)) - deviceActor2.tell(Device.RecordTemperature(requestId = 1, 2.0), probe.ref) - probe.expectMsg(Device.TemperatureRecorded(requestId = 1)) - // No temperature for device3 - - groupActor.tell(DeviceGroup.RequestAllTemperatures(requestId = 0), probe.ref) - probe.expectMsg( - DeviceGroup.RespondAllTemperatures( - requestId = 0, - temperatures = Map( - "device1" -> DeviceGroup.Temperature(1.0), - "device2" -> DeviceGroup.Temperature(2.0), - "device3" -> DeviceGroup.TemperatureNotAvailable - ) - ) - ) - } + //#device-group-list-terminate-test } diff --git a/akka-docs-new/src/test/scala/tutorial_3/DeviceManager.scala b/akka-docs-new/src/test/scala/tutorial_3/DeviceManager.scala index 0f15f7769f..44579efc7f 100644 --- a/akka-docs-new/src/test/scala/tutorial_3/DeviceManager.scala +++ b/akka-docs-new/src/test/scala/tutorial_3/DeviceManager.scala @@ -7,12 +7,17 @@ package tutorial_3 import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated } import tutorial_3.DeviceManager.RequestTrackDevice +//#device-manager-full +//#device-manager-msgs object DeviceManager { + //#device-manager-msgs def props(): Props = Props(new DeviceManager) + //#device-manager-msgs final case class RequestTrackDevice(groupId: String, deviceId: String) case object DeviceRegistered } +//#device-manager-msgs class DeviceManager extends Actor with ActorLogging { var groupIdToActor = Map.empty[String, ActorRef] @@ -45,3 +50,4 @@ class DeviceManager extends Actor with ActorLogging { } } +//#device-manager-full diff --git a/akka-docs-new/src/test/scala/tutorial_3/DeviceSpec.scala b/akka-docs-new/src/test/scala/tutorial_3/DeviceSpec.scala index 8341d54d98..d9bd7e5c47 100644 --- a/akka-docs-new/src/test/scala/tutorial_3/DeviceSpec.scala +++ b/akka-docs-new/src/test/scala/tutorial_3/DeviceSpec.scala @@ -11,6 +11,7 @@ class DeviceSpec extends AkkaSpec { "Device actor" must { + //#device-registration-tests "reply to registration requests" in { val probe = TestProbe() val deviceActor = system.actorOf(Device.props("group", "device")) @@ -30,6 +31,7 @@ class DeviceSpec extends AkkaSpec { deviceActor.tell(DeviceManager.RequestTrackDevice("group", "Wrongdevice"), probe.ref) probe.expectNoMsg(500.milliseconds) } + //#device-registration-tests "reply with empty reading if no temperature is known" in { val probe = TestProbe() diff --git a/akka-docs-new/src/test/scala/tutorial_5/Device.scala b/akka-docs-new/src/test/scala/tutorial_5/Device.scala new file mode 100644 index 0000000000..813f8ffdd0 --- /dev/null +++ b/akka-docs-new/src/test/scala/tutorial_5/Device.scala @@ -0,0 +1,45 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package tutorial_5 + +import akka.actor.{ Actor, ActorLogging, Props } +import tutorial_5.Device.{ ReadTemperature, RecordTemperature, RespondTemperature, TemperatureRecorded } +import tutorial_5.DeviceManager.{ DeviceRegistered, RequestTrackDevice } + +object Device { + + def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId)) + + final case class RecordTemperature(requestId: Long, value: Double) + final case class TemperatureRecorded(requestId: Long) + + final case class ReadTemperature(requestId: Long) + final case class RespondTemperature(requestId: Long, value: Option[Double]) +} + +class Device(groupId: String, deviceId: String) extends Actor with ActorLogging { + var lastTemperatureReading: Option[Double] = None + + override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId) + + override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId) + + override def receive: Receive = { + case RequestTrackDevice(`groupId`, `deviceId`) => + sender() ! DeviceRegistered + + case RequestTrackDevice(groupId, deviceId) => + log.warning( + "Ignoring TrackDevice request for {}-{}.This actor is responsible for {}-{}.", + groupId, deviceId, this.groupId, this.deviceId) + + case RecordTemperature(id, value) => + log.info("Recorded temperature reading {} with {}", value, id) + lastTemperatureReading = Some(value) + sender() ! TemperatureRecorded(id) + + case ReadTemperature(id) => + sender() ! RespondTemperature(id, lastTemperatureReading) + } +} diff --git a/akka-docs-new/src/test/scala/tutorial_5/DeviceGroup.scala b/akka-docs-new/src/test/scala/tutorial_5/DeviceGroup.scala new file mode 100644 index 0000000000..8084fc5641 --- /dev/null +++ b/akka-docs-new/src/test/scala/tutorial_5/DeviceGroup.scala @@ -0,0 +1,75 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package tutorial_5 + +import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated } +import tutorial_5.DeviceGroup._ +import tutorial_5.DeviceManager.RequestTrackDevice +import scala.concurrent.duration._ + +object DeviceGroup { + + def props(groupId: String): Props = Props(new DeviceGroup(groupId)) + + final case class RequestDeviceList(requestId: Long) + final case class ReplyDeviceList(requestId: Long, ids: Set[String]) + + final case class RequestAllTemperatures(requestId: Long) + final case class RespondAllTemperatures(requestId: Long, temperatures: Map[String, TemperatureReading]) + + sealed trait TemperatureReading + final case class Temperature(value: Double) extends TemperatureReading + case object TemperatureNotAvailable extends TemperatureReading + case object DeviceNotAvailable extends TemperatureReading + case object DeviceTimedOut extends TemperatureReading +} + +class DeviceGroup(groupId: String) extends Actor with ActorLogging { + var deviceIdToActor = Map.empty[String, ActorRef] + var actorToDeviceId = Map.empty[ActorRef, String] + var nextCollectionId = 0L + + override def preStart(): Unit = log.info("DeviceGroup {} started", groupId) + + override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId) + + override def receive: Receive = { + // Note the backticks + case trackMsg @ RequestTrackDevice(`groupId`, _) => + deviceIdToActor.get(trackMsg.deviceId) match { + case Some(ref) => + ref forward trackMsg + case None => + log.info("Creating device actor for {}", trackMsg.deviceId) + val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId) + context.watch(deviceActor) + deviceActor forward trackMsg + deviceIdToActor += trackMsg.deviceId -> deviceActor + actorToDeviceId += deviceActor -> trackMsg.deviceId + } + + case RequestTrackDevice(groupId, deviceId) => + log.warning( + "Ignoring TrackDevice request for {}. This actor is responsible for {}.", + groupId, this.groupId) + + case RequestDeviceList(requestId) => + sender() ! ReplyDeviceList(requestId, deviceIdToActor.keySet) + + case Terminated(deviceActor) => + val deviceId = actorToDeviceId(deviceActor) + log.info("Device actor for {} has been terminated", deviceId) + actorToDeviceId -= deviceActor + deviceIdToActor -= deviceId + + case RequestAllTemperatures(requestId) => + context.actorOf(DeviceGroupQuery.props( + actorToDeviceId = actorToDeviceId, + requestId = requestId, + requester = sender(), + 3.seconds + )) + } + +} diff --git a/akka-docs-new/src/test/scala/tutorial_3/DeviceGroupQuery.scala b/akka-docs-new/src/test/scala/tutorial_5/DeviceGroupQuery.scala similarity index 99% rename from akka-docs-new/src/test/scala/tutorial_3/DeviceGroupQuery.scala rename to akka-docs-new/src/test/scala/tutorial_5/DeviceGroupQuery.scala index f8c145b4ea..d670c7c9a7 100644 --- a/akka-docs-new/src/test/scala/tutorial_3/DeviceGroupQuery.scala +++ b/akka-docs-new/src/test/scala/tutorial_5/DeviceGroupQuery.scala @@ -1,7 +1,7 @@ /** * Copyright (C) 2009-2017 Lightbend Inc. */ -package tutorial_3 +package tutorial_5 import akka.actor.Actor.Receive import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated } diff --git a/akka-docs-new/src/test/scala/tutorial_3/DeviceGroupQuerySpec.scala b/akka-docs-new/src/test/scala/tutorial_5/DeviceGroupQuerySpec.scala similarity index 99% rename from akka-docs-new/src/test/scala/tutorial_3/DeviceGroupQuerySpec.scala rename to akka-docs-new/src/test/scala/tutorial_5/DeviceGroupQuerySpec.scala index c6efd2f576..8b423f680c 100644 --- a/akka-docs-new/src/test/scala/tutorial_3/DeviceGroupQuerySpec.scala +++ b/akka-docs-new/src/test/scala/tutorial_5/DeviceGroupQuerySpec.scala @@ -1,7 +1,7 @@ /** * Copyright (C) 2009-2017 Lightbend Inc. */ -package tutorial_3 +package tutorial_5 import akka.actor.PoisonPill import akka.testkit.{ AkkaSpec, TestProbe } diff --git a/akka-docs-new/src/test/scala/tutorial_5/DeviceGroupSpec.scala b/akka-docs-new/src/test/scala/tutorial_5/DeviceGroupSpec.scala new file mode 100644 index 0000000000..e33f2fb01a --- /dev/null +++ b/akka-docs-new/src/test/scala/tutorial_5/DeviceGroupSpec.scala @@ -0,0 +1,133 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ + +package tutorial_5 + +import akka.actor.PoisonPill +import akka.testkit.{ AkkaSpec, TestProbe } + +import scala.concurrent.duration._ + +class DeviceGroupSpec extends AkkaSpec { + + "DeviceGroup actor" must { + + "be able to register a device actor" in { + val probe = TestProbe() + val groupActor = system.actorOf(DeviceGroup.props("group")) + + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) + probe.expectMsg(DeviceManager.DeviceRegistered) + val deviceActor1 = probe.lastSender + + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref) + probe.expectMsg(DeviceManager.DeviceRegistered) + val deviceActor2 = probe.lastSender + deviceActor1 should !==(deviceActor2) + + // Check that the device actors are working + deviceActor1.tell(Device.RecordTemperature(requestId = 0, 1.0), probe.ref) + probe.expectMsg(Device.TemperatureRecorded(requestId = 0)) + deviceActor2.tell(Device.RecordTemperature(requestId = 1, 2.0), probe.ref) + probe.expectMsg(Device.TemperatureRecorded(requestId = 1)) + } + + "ignore requests for wrong groupId" in { + val probe = TestProbe() + val groupActor = system.actorOf(DeviceGroup.props("group")) + + groupActor.tell(DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.ref) + probe.expectNoMsg(500.milliseconds) + } + + "return same actor for same deviceId" in { + val probe = TestProbe() + val groupActor = system.actorOf(DeviceGroup.props("group")) + + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) + probe.expectMsg(DeviceManager.DeviceRegistered) + val deviceActor1 = probe.lastSender + + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) + probe.expectMsg(DeviceManager.DeviceRegistered) + val deviceActor2 = probe.lastSender + + deviceActor1 should ===(deviceActor2) + } + + "be able to list active devices" in { + val probe = TestProbe() + val groupActor = system.actorOf(DeviceGroup.props("group")) + + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) + probe.expectMsg(DeviceManager.DeviceRegistered) + + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref) + probe.expectMsg(DeviceManager.DeviceRegistered) + + groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 0), probe.ref) + probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2"))) + } + + "be able to list active devices after one shuts down" in { + val probe = TestProbe() + val groupActor = system.actorOf(DeviceGroup.props("group")) + + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) + probe.expectMsg(DeviceManager.DeviceRegistered) + val toShutDown = probe.lastSender + + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref) + probe.expectMsg(DeviceManager.DeviceRegistered) + + groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 0), probe.ref) + probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2"))) + + probe.watch(toShutDown) + toShutDown ! PoisonPill + probe.expectTerminated(toShutDown) + + groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 1), probe.ref) + probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 1, Set("device2"))) + } + + "be able to collect temperatures from all active devices" in { + val probe = TestProbe() + val groupActor = system.actorOf(DeviceGroup.props("group")) + + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref) + probe.expectMsg(DeviceManager.DeviceRegistered) + val deviceActor1 = probe.lastSender + + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref) + probe.expectMsg(DeviceManager.DeviceRegistered) + val deviceActor2 = probe.lastSender + + groupActor.tell(DeviceManager.RequestTrackDevice("group", "device3"), probe.ref) + probe.expectMsg(DeviceManager.DeviceRegistered) + val deviceActor3 = probe.lastSender + + // Check that the device actors are working + deviceActor1.tell(Device.RecordTemperature(requestId = 0, 1.0), probe.ref) + probe.expectMsg(Device.TemperatureRecorded(requestId = 0)) + deviceActor2.tell(Device.RecordTemperature(requestId = 1, 2.0), probe.ref) + probe.expectMsg(Device.TemperatureRecorded(requestId = 1)) + // No temperature for device3 + + groupActor.tell(DeviceGroup.RequestAllTemperatures(requestId = 0), probe.ref) + probe.expectMsg( + DeviceGroup.RespondAllTemperatures( + requestId = 0, + temperatures = Map( + "device1" -> DeviceGroup.Temperature(1.0), + "device2" -> DeviceGroup.Temperature(2.0), + "device3" -> DeviceGroup.TemperatureNotAvailable + ) + ) + ) + } + + } + +} diff --git a/akka-docs-new/src/test/scala/tutorial_5/DeviceManager.scala b/akka-docs-new/src/test/scala/tutorial_5/DeviceManager.scala new file mode 100644 index 0000000000..e9df70ac34 --- /dev/null +++ b/akka-docs-new/src/test/scala/tutorial_5/DeviceManager.scala @@ -0,0 +1,47 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ + +package tutorial_5 + +import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated } +import tutorial_5.DeviceManager.RequestTrackDevice + +object DeviceManager { + def props(): Props = Props(new DeviceManager) + + final case class RequestTrackDevice(groupId: String, deviceId: String) + case object DeviceRegistered +} + +class DeviceManager extends Actor with ActorLogging { + var groupIdToActor = Map.empty[String, ActorRef] + var actorToGroupId = Map.empty[ActorRef, String] + + override def preStart(): Unit = log.info("DeviceManager started") + + override def postStop(): Unit = log.info("DeviceManager stopped") + + override def receive = { + case trackMsg @ RequestTrackDevice(groupId, _) => + groupIdToActor.get(groupId) match { + case Some(ref) => + ref forward trackMsg + case None => + log.info("Creating device group actor for {}", groupId) + val groupActor = context.actorOf(DeviceGroup.props(groupId), "group-" + groupId) + context.watch(groupActor) + groupActor forward trackMsg + groupIdToActor += groupId -> groupActor + actorToGroupId += groupActor -> groupId + } + + case Terminated(groupActor) => + val groupId = actorToGroupId(groupActor) + log.info("Device group actor for {} has been terminated", groupId) + actorToGroupId -= groupActor + groupIdToActor -= groupId + + } + +} diff --git a/akka-docs-new/src/test/scala/tutorial_5/DeviceSpec.scala b/akka-docs-new/src/test/scala/tutorial_5/DeviceSpec.scala new file mode 100644 index 0000000000..b3d57c1e9e --- /dev/null +++ b/akka-docs-new/src/test/scala/tutorial_5/DeviceSpec.scala @@ -0,0 +1,67 @@ +/** + * Copyright (C) 2009-2017 Lightbend Inc. + */ +package tutorial_5 + +import akka.testkit.{ AkkaSpec, TestProbe } + +import scala.concurrent.duration._ + +class DeviceSpec extends AkkaSpec { + + "Device actor" must { + + "reply to registration requests" in { + val probe = TestProbe() + val deviceActor = system.actorOf(Device.props("group", "device")) + + deviceActor.tell(DeviceManager.RequestTrackDevice("group", "device"), probe.ref) + probe.expectMsg(DeviceManager.DeviceRegistered) + probe.lastSender should ===(deviceActor) + } + + "ignore wrong registration requests" in { + val probe = TestProbe() + val deviceActor = system.actorOf(Device.props("group", "device")) + + deviceActor.tell(DeviceManager.RequestTrackDevice("wrongGroup", "device"), probe.ref) + probe.expectNoMsg(500.milliseconds) + + deviceActor.tell(DeviceManager.RequestTrackDevice("group", "Wrongdevice"), probe.ref) + probe.expectNoMsg(500.milliseconds) + } + + "reply with empty reading if no temperature is known" in { + val probe = TestProbe() + val deviceActor = system.actorOf(Device.props("group", "device")) + + deviceActor.tell(Device.ReadTemperature(requestId = 42), probe.ref) + val response = probe.expectMsgType[Device.RespondTemperature] + response.requestId should ===(42) + response.value should ===(None) + } + + "reply with latest temperature reading" in { + val probe = TestProbe() + val deviceActor = system.actorOf(Device.props("group", "device")) + + deviceActor.tell(Device.RecordTemperature(requestId = 1, 24.0), probe.ref) + probe.expectMsg(Device.TemperatureRecorded(requestId = 1)) + + deviceActor.tell(Device.ReadTemperature(requestId = 2), probe.ref) + val response1 = probe.expectMsgType[Device.RespondTemperature] + response1.requestId should ===(2) + response1.value should ===(Some(24.0)) + + deviceActor.tell(Device.RecordTemperature(requestId = 3, 55.0), probe.ref) + probe.expectMsg(Device.TemperatureRecorded(requestId = 3)) + + deviceActor.tell(Device.ReadTemperature(requestId = 4), probe.ref) + val response2 = probe.expectMsgType[Device.RespondTemperature] + response2.requestId should ===(4) + response2.value should ===(Some(55.0)) + } + + } + +} diff --git a/akka-docs-new/src/test/scala/tutorial_3/IotApp.scala b/akka-docs-new/src/test/scala/tutorial_5/IotApp.scala similarity index 88% rename from akka-docs-new/src/test/scala/tutorial_3/IotApp.scala rename to akka-docs-new/src/test/scala/tutorial_5/IotApp.scala index 8e6f3dd370..059acdd18f 100644 --- a/akka-docs-new/src/test/scala/tutorial_3/IotApp.scala +++ b/akka-docs-new/src/test/scala/tutorial_5/IotApp.scala @@ -1,10 +1,10 @@ /** * Copyright (C) 2009-2017 Lightbend Inc. */ -package tutorial_3 +package tutorial_5 import akka.actor.ActorSystem -import tutorial_3.DeviceManager.RequestTrackDevice +import tutorial_5.DeviceManager.RequestTrackDevice import scala.io.StdIn diff --git a/akka-docs-new/src/test/scala/tutorial_3/IotSupervisor.scala b/akka-docs-new/src/test/scala/tutorial_5/IotSupervisor.scala similarity index 96% rename from akka-docs-new/src/test/scala/tutorial_3/IotSupervisor.scala rename to akka-docs-new/src/test/scala/tutorial_5/IotSupervisor.scala index 32eb48cc3c..0f56f0578d 100644 --- a/akka-docs-new/src/test/scala/tutorial_3/IotSupervisor.scala +++ b/akka-docs-new/src/test/scala/tutorial_5/IotSupervisor.scala @@ -1,7 +1,7 @@ /** * Copyright (C) 2009-2017 Lightbend Inc. */ -package tutorial_3 +package tutorial_5 import akka.actor.{ Actor, ActorLogging, ActorRef, Props }