Getting Started tutorial improvements (#23210)

This commit is contained in:
Arnout Engelen 2017-07-13 01:24:53 -07:00 committed by GitHub
parent d87cf4aec4
commit f38b928e13
67 changed files with 1451 additions and 1507 deletions

View file

@ -4,11 +4,9 @@
package tutorial_4
import akka.actor.{ Actor, ActorLogging, Props }
import tutorial_4.Device.{ ReadTemperature, RecordTemperature, RespondTemperature, TemperatureRecorded }
import tutorial_4.DeviceManager.{ DeviceRegistered, RequestTrackDevice }
//#device-with-register
object Device {
def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId))
final case class RecordTemperature(requestId: Long, value: Double)
@ -19,6 +17,8 @@ object Device {
}
class Device(groupId: String, deviceId: String) extends Actor with ActorLogging {
import Device._
var lastTemperatureReading: Option[Double] = None
override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId)
@ -26,10 +26,10 @@ class Device(groupId: String, deviceId: String) extends Actor with ActorLogging
override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId)
override def receive: Receive = {
case RequestTrackDevice(`groupId`, `deviceId`) =>
sender() ! DeviceRegistered
case DeviceManager.RequestTrackDevice(`groupId`, `deviceId`) =>
sender() ! DeviceManager.DeviceRegistered
case RequestTrackDevice(groupId, deviceId) =>
case DeviceManager.RequestTrackDevice(groupId, deviceId) =>
log.warning(
"Ignoring TrackDevice request for {}-{}.This actor is responsible for {}-{}.",
groupId, deviceId, this.groupId, this.deviceId
@ -44,3 +44,4 @@ class Device(groupId: String, deviceId: String) extends Actor with ActorLogging
sender() ! RespondTemperature(id, lastTemperatureReading)
}
}
//#device-with-register

View file

@ -4,52 +4,49 @@
package tutorial_4
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
import tutorial_4.DeviceGroup._
import tutorial_4.DeviceManager.RequestTrackDevice
import DeviceGroup._
import DeviceManager.RequestTrackDevice
import scala.concurrent.duration._
//#device-group-full
//#device-group-register
object DeviceGroup {
def props(groupId: String): Props = Props(new DeviceGroup(groupId))
//#device-group-register
final case class RequestDeviceList(requestId: Long)
final case class ReplyDeviceList(requestId: Long, ids: Set[String])
//#query-protocol
final case class RequestAllTemperatures(requestId: Long)
final case class RespondAllTemperatures(requestId: Long, temperatures: Map[String, TemperatureReading])
sealed trait TemperatureReading
final case class Temperature(value: Double) extends TemperatureReading
case object TemperatureNotAvailable extends TemperatureReading
case object DeviceNotAvailable extends TemperatureReading
case object DeviceTimedOut extends TemperatureReading
//#query-protocol
//#device-group-register
}
//#device-group-register
//#device-group-register
//#device-group-remove
//#query-added
class DeviceGroup(groupId: String) extends Actor with ActorLogging {
var deviceIdToActor = Map.empty[String, ActorRef]
//#device-group-register
var actorToDeviceId = Map.empty[ActorRef, String]
var nextCollectionId = 0L
//#device-group-register
override def preStart(): Unit = log.info("DeviceGroup {} started", groupId)
override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId)
override def receive: Receive = {
//#query-added
case trackMsg @ RequestTrackDevice(`groupId`, _) =>
deviceIdToActor.get(trackMsg.deviceId) match {
case Some(ref) =>
ref forward trackMsg
case Some(deviceActor) =>
deviceActor forward trackMsg
case None =>
log.info("Creating device actor for {}", trackMsg.deviceId)
val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId)
val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), s"device-${trackMsg.deviceId}")
//#device-group-register
context.watch(deviceActor)
deviceActor forward trackMsg
deviceIdToActor += trackMsg.deviceId -> deviceActor
actorToDeviceId += deviceActor -> trackMsg.deviceId
//#device-group-register
deviceIdToActor += trackMsg.deviceId -> deviceActor
deviceActor forward trackMsg
}
case RequestTrackDevice(groupId, deviceId) =>
@ -57,9 +54,12 @@ class DeviceGroup(groupId: String) extends Actor with ActorLogging {
"Ignoring TrackDevice request for {}. This actor is responsible for {}.",
groupId, this.groupId
)
//#device-group-register
//#device-group-remove
case RequestDeviceList(requestId) =>
sender() ! ReplyDeviceList(requestId, deviceIdToActor.keySet)
//#device-group-remove
case Terminated(deviceActor) =>
val deviceId = actorToDeviceId(deviceActor)
@ -67,17 +67,9 @@ class DeviceGroup(groupId: String) extends Actor with ActorLogging {
actorToDeviceId -= deviceActor
deviceIdToActor -= deviceId
//#query-added
// ... other cases omitted
case RequestAllTemperatures(requestId) =>
context.actorOf(DeviceGroupQuery.props(
actorToDeviceId = actorToDeviceId,
requestId = requestId,
requester = sender(),
3.seconds
))
//#device-group-register
}
}
//#query-added
//#device-group-remove
//#device-group-register
//#device-group-full

View file

@ -1,104 +0,0 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package tutorial_4
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
import scala.concurrent.duration._
//#query-full
//#query-outline
object DeviceGroupQuery {
case object CollectionTimeout
def props(
actorToDeviceId: Map[ActorRef, String],
requestId: Long,
requester: ActorRef,
timeout: FiniteDuration
): Props = {
Props(new DeviceGroupQuery(actorToDeviceId, requestId, requester, timeout))
}
}
class DeviceGroupQuery(
actorToDeviceId: Map[ActorRef, String],
requestId: Long,
requester: ActorRef,
timeout: FiniteDuration
) extends Actor with ActorLogging {
import DeviceGroupQuery._
import context.dispatcher
val queryTimeoutTimer = context.system.scheduler.scheduleOnce(timeout, self, CollectionTimeout)
override def preStart(): Unit = {
actorToDeviceId.keysIterator.foreach { deviceActor =>
context.watch(deviceActor)
deviceActor ! Device.ReadTemperature(0)
}
}
override def postStop(): Unit = {
queryTimeoutTimer.cancel()
}
//#query-outline
//#query-state
override def receive: Receive =
waitingForReplies(
Map.empty,
actorToDeviceId.keySet
)
def waitingForReplies(
repliesSoFar: Map[String, DeviceGroup.TemperatureReading],
stillWaiting: Set[ActorRef]
): Receive = {
case Device.RespondTemperature(0, valueOption) =>
val deviceActor = sender()
val reading = valueOption match {
case Some(value) => DeviceGroup.Temperature(value)
case None => DeviceGroup.TemperatureNotAvailable
}
receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar)
case Terminated(deviceActor) =>
receivedResponse(deviceActor, DeviceGroup.DeviceNotAvailable, stillWaiting, repliesSoFar)
case CollectionTimeout =>
val timedOutReplies =
stillWaiting.map { deviceActor =>
val deviceId = actorToDeviceId(deviceActor)
deviceId -> DeviceGroup.DeviceTimedOut
}
requester ! DeviceGroup.RespondAllTemperatures(requestId, repliesSoFar ++ timedOutReplies)
context.stop(self)
}
//#query-state
//#query-collect-reply
def receivedResponse(
deviceActor: ActorRef,
reading: DeviceGroup.TemperatureReading,
stillWaiting: Set[ActorRef],
repliesSoFar: Map[String, DeviceGroup.TemperatureReading]
): Unit = {
context.unwatch(deviceActor)
val deviceId = actorToDeviceId(deviceActor)
val newStillWaiting = stillWaiting - deviceActor
val newRepliesSoFar = repliesSoFar + (deviceId -> reading)
if (newStillWaiting.isEmpty) {
requester ! DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar)
context.stop(self)
} else {
context.become(waitingForReplies(newRepliesSoFar, newStillWaiting))
}
}
//#query-collect-reply
//#query-outline
}
//#query-outline
//#query-full

View file

@ -1,167 +0,0 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package tutorial_4
import akka.actor.PoisonPill
import akka.testkit.{ AkkaSpec, TestProbe }
import scala.concurrent.duration._
class DeviceGroupQuerySpec extends AkkaSpec {
"DeviceGroupQuery" must {
//#query-test-normal
"return temperature value for working devices" in {
val requester = TestProbe()
val device1 = TestProbe()
val device2 = TestProbe()
val queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
requestId = 1,
requester = requester.ref,
timeout = 3.seconds
))
device1.expectMsg(Device.ReadTemperature(requestId = 0))
device2.expectMsg(Device.ReadTemperature(requestId = 0))
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref)
requester.expectMsg(DeviceGroup.RespondAllTemperatures(
requestId = 1,
temperatures = Map(
"device1" -> DeviceGroup.Temperature(1.0),
"device2" -> DeviceGroup.Temperature(2.0)
)
))
}
//#query-test-normal
//#query-test-no-reading
"return TemperatureNotAvailable for devices with no readings" in {
val requester = TestProbe()
val device1 = TestProbe()
val device2 = TestProbe()
val queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
requestId = 1,
requester = requester.ref,
timeout = 3.seconds
))
device1.expectMsg(Device.ReadTemperature(requestId = 0))
device2.expectMsg(Device.ReadTemperature(requestId = 0))
queryActor.tell(Device.RespondTemperature(requestId = 0, None), device1.ref)
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref)
requester.expectMsg(DeviceGroup.RespondAllTemperatures(
requestId = 1,
temperatures = Map(
"device1" -> DeviceGroup.TemperatureNotAvailable,
"device2" -> DeviceGroup.Temperature(2.0)
)
))
}
//#query-test-no-reading
//#query-test-stopped
"return DeviceNotAvailable if device stops before answering" in {
val requester = TestProbe()
val device1 = TestProbe()
val device2 = TestProbe()
val queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
requestId = 1,
requester = requester.ref,
timeout = 3.seconds
))
device1.expectMsg(Device.ReadTemperature(requestId = 0))
device2.expectMsg(Device.ReadTemperature(requestId = 0))
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
device2.ref ! PoisonPill
requester.expectMsg(DeviceGroup.RespondAllTemperatures(
requestId = 1,
temperatures = Map(
"device1" -> DeviceGroup.Temperature(1.0),
"device2" -> DeviceGroup.DeviceNotAvailable
)
))
}
//#query-test-stopped
//#query-test-stopped-later
"return temperature reading even if device stops after answering" in {
val requester = TestProbe()
val device1 = TestProbe()
val device2 = TestProbe()
val queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
requestId = 1,
requester = requester.ref,
timeout = 3.seconds
))
device1.expectMsg(Device.ReadTemperature(requestId = 0))
device2.expectMsg(Device.ReadTemperature(requestId = 0))
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref)
device2.ref ! PoisonPill
requester.expectMsg(DeviceGroup.RespondAllTemperatures(
requestId = 1,
temperatures = Map(
"device1" -> DeviceGroup.Temperature(1.0),
"device2" -> DeviceGroup.Temperature(2.0)
)
))
}
//#query-test-stopped-later
//#query-test-timeout
"return DeviceTimedOut if device does not answer in time" in {
val requester = TestProbe()
val device1 = TestProbe()
val device2 = TestProbe()
val queryActor = system.actorOf(DeviceGroupQuery.props(
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
requestId = 1,
requester = requester.ref,
timeout = 1.second
))
device1.expectMsg(Device.ReadTemperature(requestId = 0))
device2.expectMsg(Device.ReadTemperature(requestId = 0))
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
requester.expectMsg(DeviceGroup.RespondAllTemperatures(
requestId = 1,
temperatures = Map(
"device1" -> DeviceGroup.Temperature(1.0),
"device2" -> DeviceGroup.DeviceTimedOut
)
))
}
//#query-test-timeout
}
}

View file

@ -13,6 +13,7 @@ class DeviceGroupSpec extends AkkaSpec {
"DeviceGroup actor" must {
//#device-group-test-registration
"be able to register a device actor" in {
val probe = TestProbe()
val groupActor = system.actorOf(DeviceGroup.props("group"))
@ -40,7 +41,9 @@ class DeviceGroupSpec extends AkkaSpec {
groupActor.tell(DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.ref)
probe.expectNoMsg(500.milliseconds)
}
//#device-group-test-registration
//#device-group-test3
"return same actor for same deviceId" in {
val probe = TestProbe()
val groupActor = system.actorOf(DeviceGroup.props("group"))
@ -55,7 +58,9 @@ class DeviceGroupSpec extends AkkaSpec {
deviceActor1 should ===(deviceActor2)
}
//#device-group-test3
//#device-group-list-terminate-test
"be able to list active devices" in {
val probe = TestProbe()
val groupActor = system.actorOf(DeviceGroup.props("group"))
@ -95,41 +100,7 @@ class DeviceGroupSpec extends AkkaSpec {
probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 1, Set("device2")))
}
}
//#group-query-integration-test
"be able to collect temperatures from all active devices" in {
val probe = TestProbe()
val groupActor = system.actorOf(DeviceGroup.props("group"))
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
val deviceActor1 = probe.lastSender
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
val deviceActor2 = probe.lastSender
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device3"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
val deviceActor3 = probe.lastSender
// Check that the device actors are working
deviceActor1.tell(Device.RecordTemperature(requestId = 0, 1.0), probe.ref)
probe.expectMsg(Device.TemperatureRecorded(requestId = 0))
deviceActor2.tell(Device.RecordTemperature(requestId = 1, 2.0), probe.ref)
probe.expectMsg(Device.TemperatureRecorded(requestId = 1))
// No temperature for device3
groupActor.tell(DeviceGroup.RequestAllTemperatures(requestId = 0), probe.ref)
probe.expectMsg(
DeviceGroup.RespondAllTemperatures(
requestId = 0,
temperatures = Map(
"device1" -> DeviceGroup.Temperature(1.0),
"device2" -> DeviceGroup.Temperature(2.0),
"device3" -> DeviceGroup.TemperatureNotAvailable)))
}
//#group-query-integration-test
//#device-group-list-terminate-test
}

View file

@ -5,13 +5,16 @@
package tutorial_4
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
import tutorial_4.DeviceManager.RequestTrackDevice
import DeviceManager.RequestTrackDevice
//#device-manager-full
object DeviceManager {
def props(): Props = Props(new DeviceManager)
//#device-manager-msgs
final case class RequestTrackDevice(groupId: String, deviceId: String)
case object DeviceRegistered
//#device-manager-msgs
}
class DeviceManager extends Actor with ActorLogging {
@ -45,3 +48,4 @@ class DeviceManager extends Actor with ActorLogging {
}
}
//#device-manager-full

View file

@ -0,0 +1,69 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package tutorial_4
import akka.testkit.{ AkkaSpec, TestProbe }
import scala.concurrent.duration._
class DeviceSpec extends AkkaSpec {
"Device actor" must {
//#device-registration-tests
"reply to registration requests" in {
val probe = TestProbe()
val deviceActor = system.actorOf(Device.props("group", "device"))
deviceActor.tell(DeviceManager.RequestTrackDevice("group", "device"), probe.ref)
probe.expectMsg(DeviceManager.DeviceRegistered)
probe.lastSender should ===(deviceActor)
}
"ignore wrong registration requests" in {
val probe = TestProbe()
val deviceActor = system.actorOf(Device.props("group", "device"))
deviceActor.tell(DeviceManager.RequestTrackDevice("wrongGroup", "device"), probe.ref)
probe.expectNoMsg(500.milliseconds)
deviceActor.tell(DeviceManager.RequestTrackDevice("group", "Wrongdevice"), probe.ref)
probe.expectNoMsg(500.milliseconds)
}
//#device-registration-tests
"reply with empty reading if no temperature is known" in {
val probe = TestProbe()
val deviceActor = system.actorOf(Device.props("group", "device"))
deviceActor.tell(Device.ReadTemperature(requestId = 42), probe.ref)
val response = probe.expectMsgType[Device.RespondTemperature]
response.requestId should ===(42)
response.value should ===(None)
}
"reply with latest temperature reading" in {
val probe = TestProbe()
val deviceActor = system.actorOf(Device.props("group", "device"))
deviceActor.tell(Device.RecordTemperature(requestId = 1, 24.0), probe.ref)
probe.expectMsg(Device.TemperatureRecorded(requestId = 1))
deviceActor.tell(Device.ReadTemperature(requestId = 2), probe.ref)
val response1 = probe.expectMsgType[Device.RespondTemperature]
response1.requestId should ===(2)
response1.value should ===(Some(24.0))
deviceActor.tell(Device.RecordTemperature(requestId = 3, 55.0), probe.ref)
probe.expectMsg(Device.TemperatureRecorded(requestId = 3))
deviceActor.tell(Device.ReadTemperature(requestId = 4), probe.ref)
val response2 = probe.expectMsgType[Device.RespondTemperature]
response2.requestId should ===(4)
response2.value should ===(Some(55.0))
}
}
}