Various corrections
This commit is contained in:
parent
435d95bcdc
commit
4ffb4d30b9
11 changed files with 382 additions and 195 deletions
|
|
@ -13,7 +13,7 @@ it can only modified by invoking a set of curated methods. The object is respons
|
|||
that protect the invariant nature of its encapsulated data.
|
||||
|
||||
For example, operations on an ordered binary tree implementation must not allow violation of the tree ordering
|
||||
constraint. Callers rely on the ordering being intact. For example, when querying the tree for a certain piece of
|
||||
invariant. Callers rely on the ordering being intact. For example, when querying the tree for a certain piece of
|
||||
data, you need to be able to rely on this constraint.
|
||||
|
||||
When we analyze OOP runtime behavior, we sometimes draw a message sequence chart showing the interactions of
|
||||
|
|
@ -40,14 +40,14 @@ can be interleaved in arbitrary ways which eliminates any hope for keeping the i
|
|||
type of coordination between two threads. Now, imagine this issue compounded by the existence of many threads.
|
||||
|
||||
The common approach for solving this problem is to add a lock around these methods. While this ensures that at most
|
||||
ne thread will enter the method at any given time, this is a very costly strategy:
|
||||
one thread will enter the method at any given time, this is a very costly strategy:
|
||||
* Locks _seriously limit_ concurrency, they are very costly on modern CPU architectures,
|
||||
requiring heavy-lifting from the operating system to suspend the thread and restore it later.
|
||||
* The caller thread is now blocked, so it cannot do any other meaningful work. Even in desktop applications this is
|
||||
unacceptable, we want to keep user facing parts of an applications (its UI) to be responsive even when a
|
||||
long background job is running. In backend, server settings, this is outright wasteful.
|
||||
One might think that this can be compensated by launching new threads, but threads are also a costly abstraction.
|
||||
* Locks introduce a new menace, deadlocks.
|
||||
* Locks introduce a new menace: deadlocks.
|
||||
|
||||
These realities result in a no-win situation:
|
||||
* Without sufficient locks, state gets corrupted.
|
||||
|
|
@ -190,8 +190,10 @@ Instead, the receiving actor delivers the results in a reply message.
|
|||
The second key change we need in our model is to reinstate encapsulation. Actors react to messages just like objects
|
||||
"react" to methods invoked on them. The difference is that instead of multiple threads "protruding" into our actor and
|
||||
wreaking havoc to internal state and invariants, actors execute independently from the senders of a message, and they
|
||||
react to incoming messages sequentially, one at a time. There is always at most one message being processed, meaning
|
||||
that invariants can be kept without synchronization. This happens automatically without using locks:
|
||||
react to incoming messages sequentially, one at a time. While each actor processes messages sent to it sequentially,
|
||||
different actors work concurrently with each other so an actor system can process as many messages simultaneously
|
||||
as many processor cores are available on the machine. Since there is always at most one message being processed per actor
|
||||
the invariants of an actor can be kept without synchronization. This happens automatically without using locks:
|
||||
|
||||
TODO: SERIALIZED-TIMELINE-INVARIANTS
|
||||
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
|
||||
* [What is Akka?](introduction.md)
|
||||
* [What are Actors?](actors-intro.md)
|
||||
* [Akka Libraries and modules](modules.md)
|
||||
* [Akka Libraries and Modules](modules.md)
|
||||
* [Your First Akka Application - Hello World](quickstart.md)
|
||||
|
||||
@@@
|
||||
|
|
@ -10,7 +10,7 @@ crash without responding, messages get lost without a trace on the wire, and net
|
|||
These problems occur regularly in carefully managed intra-datacenter environments - even more so in virtualized
|
||||
architectures.
|
||||
|
||||
To deal with these realities, Akka provides
|
||||
To deal with these realities, Akka provides:
|
||||
|
||||
* Multi-threaded behavior without use of low-level concurrency constructs like
|
||||
atomics or locks. You do not even need to think about memory visibility issues.
|
||||
|
|
@ -36,7 +36,7 @@ But relatively recently, their applicability to the challenges of modern computi
|
|||
proved to be effective.
|
||||
|
||||
The actor model provides an abstraction that allows you to think about your code in terms of communication, not unlike
|
||||
people in a large organization. The basic characteristic of actors that they model the world as stateful entities
|
||||
people in a large organization. The basic characteristic of actors is that they model the world as stateful entities
|
||||
communicating with each other by explicit message passing.
|
||||
|
||||
As computational entities, actors have these characteristics:
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@ Unlike objects, actors encapsulate not only their
|
|||
state but their execution. Communication with actors is not via method calls but by passing messages. While this
|
||||
difference seems to be minor, this is actually what allows us to break clean from the limitations of OOP when it
|
||||
comes to concurrency and remote communication. Don’t worry if this description feels too high level to fully grasp
|
||||
yet, in the next chapter we will explain actors in detail. For now, the important point that this is a model that
|
||||
yet, in the next chapter we will explain actors in detail. For now, the important point is that this is a model that
|
||||
handles concurrency and distribution at the fundamental level instead of ad hoc patched attempts to bring these
|
||||
features to OOP.
|
||||
|
||||
|
|
@ -45,8 +45,8 @@ Some of the problems Remoting solves are
|
|||
If you have a set of actor systems that cooperate to solve some business problem, then you likely want to manage these set of
|
||||
systems in a disciplined way. While Remoting solves the problem of addressing and communicating with components of
|
||||
remote systems, Clustering gives you the ability to organize these into a "meta-system" tied together by a membership
|
||||
protocol. **In most of the cases, you want to use the Cluster module instead of using Remoting directly.**
|
||||
Cluster provides an additional set of services on top of Remoting that most of the real world applications need.
|
||||
protocol. **In most cases, you want to use the Cluster module instead of using Remoting directly.**
|
||||
Cluster provides an additional set of services on top of Remoting that most real world applications need.
|
||||
|
||||
The problems the Cluster module solves (among others) are
|
||||
|
||||
|
|
@ -57,36 +57,6 @@ The problems the Cluster module solves (among others) are
|
|||
* How to distribute computations among the current set of members?
|
||||
* How do I designate members of the cluster to a certain role, in other words to provide certain services and not others?
|
||||
|
||||
### Persistence
|
||||
|
||||
Just like objects in OOP actors keep their state in volatile memory. Once the system is shut down, gracefully or
|
||||
because of a crash, all data that was in memory is lost. Persistence provide patterns to enable actors to persist
|
||||
events that lead to their current state. Upon startup events can be replayed to restore the state of the entity hosted
|
||||
by the actor. The event stream can be queried and fed into additional processing pipelines (an external Big Data
|
||||
cluster for example) or alternate views (like reports).
|
||||
|
||||
Persistence tackles the following problems:
|
||||
|
||||
* How do I restore the state of an entity/actor when system restarts or crashes?
|
||||
* How do I implement a [CQRS system](https://msdn.microsoft.com/en-us/library/jj591573.aspx)?
|
||||
* How do I ensure reliable delivery of messages in face of network errors and system crashes?
|
||||
* How do I introspect domain events that has lead an entity to its current state?
|
||||
|
||||
### Cluster Singleton
|
||||
|
||||
A common (in fact, a bit too common) use case in distributed systems is to have a single entity responsible
|
||||
for a given task which is shared among other members of the cluster and migrated if the host system fails.
|
||||
While this undeniably introduces a common bottleneck for the whole cluster that limits scaling,
|
||||
there are scenarios where the use of this pattern is unavoidable. Cluster singleton allows a cluster to elect an
|
||||
actor system which will host a particular actor while other systems can always access said service independently from
|
||||
where it is.
|
||||
|
||||
The Singleton module can be used to solve these problems:
|
||||
|
||||
* How do I ensure that only one instance is running of a service in the whole cluster?
|
||||
* How do I ensure that the service is up even if the system hosting it currently crashes or shut down during the process of scaling down?
|
||||
* How do I reach this instance from any member of the cluster assuming that it can migrate to other systems over time?
|
||||
|
||||
### Cluster Sharding
|
||||
|
||||
Persistence solves the problem of restoring an actor’s state from persistent storage after system restart or crash.
|
||||
|
|
@ -101,6 +71,21 @@ The problem space that Sharding targets:
|
|||
* How do I ensure migrating entities from a crashed system without losing state?
|
||||
* How do I ensure that an entity does not exist on multiple systems at the same time and hence kept consistent?
|
||||
|
||||
### Cluster Singleton
|
||||
|
||||
A common (in fact, a bit too common) use case in distributed systems is to have a single entity responsible
|
||||
for a given task which is shared among other members of the cluster and migrated if the host system fails.
|
||||
While this undeniably introduces a common bottleneck for the whole cluster that limits scaling,
|
||||
there are scenarios where the use of this pattern is unavoidable. Cluster singleton allows a cluster to elect an
|
||||
actor system which will host a particular actor while other systems can always access said service independently from
|
||||
where it is.
|
||||
|
||||
The Singleton module can be used to solve these problems:
|
||||
|
||||
* How do I ensure that only one instance is running of a service in the whole cluster?
|
||||
* How do I ensure that the service is up even if the system hosting it currently crashes or shut down during the process of scaling down?
|
||||
* How do I reach this instance from any member of the cluster assuming that it can migrate to other systems over time?
|
||||
|
||||
### Cluster Publish-Subscribe
|
||||
|
||||
For coordination among systems it is often necessary to distribute messages to all, or one system of a set of
|
||||
|
|
@ -112,6 +97,21 @@ to broadcast or anycast messages to subscribers of that topic.
|
|||
* How do I anycast messages to a member from an interested set of parties in a cluster?
|
||||
* How to subscribe and unsubscribe for events of a certain topic in the cluster?
|
||||
|
||||
### Persistence
|
||||
|
||||
Just like objects in OOP actors keep their state in volatile memory. Once the system is shut down, gracefully or
|
||||
because of a crash, all data that was in memory is lost. Persistence provide patterns to enable actors to persist
|
||||
events that lead to their current state. Upon startup events can be replayed to restore the state of the entity hosted
|
||||
by the actor. The event stream can be queried and fed into additional processing pipelines (an external Big Data
|
||||
cluster for example) or alternate views (like reports).
|
||||
|
||||
Persistence tackles the following problems:
|
||||
|
||||
* How do I restore the state of an entity/actor when system restarts or crashes?
|
||||
* How do I implement a [CQRS system](https://msdn.microsoft.com/en-us/library/jj591573.aspx)?
|
||||
* How do I ensure reliable delivery of messages in face of network errors and system crashes?
|
||||
* How do I introspect domain events that has lead an entity to its current state?
|
||||
|
||||
### Streams
|
||||
|
||||
Actors are a fundamental model for concurrency, but there are common patterns where their use requires the user
|
||||
|
|
@ -152,14 +152,4 @@ it into a streaming BigData engine. Take the output of that engine as a Stream,
|
|||
operators and expose it as websocket connections served by a load balanced set of HTTP servers hosted by your cluster
|
||||
to power your real-time business analytics tool.
|
||||
|
||||
Got you interested?
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
Got you interested?
|
||||
|
|
|
|||
|
|
@ -11,30 +11,31 @@ object Device {
|
|||
|
||||
def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId))
|
||||
|
||||
case class RecordTemperature(requestId: Long, value: Double)
|
||||
case class TemperatureRecorded(requestId: Long)
|
||||
final case class RecordTemperature(requestId: Long, value: Double)
|
||||
final case class TemperatureRecorded(requestId: Long)
|
||||
|
||||
case class ReadTemperature(requestId: Long)
|
||||
case class RespondTemperature(requestId: Long, value: Option[Double])
|
||||
final case class ReadTemperature(requestId: Long)
|
||||
final case class RespondTemperature(requestId: Long, value: Option[Double])
|
||||
}
|
||||
|
||||
class Device(groupId: String, deviceId: String) extends Actor with ActorLogging {
|
||||
var lastTemperatureReading: Option[Double] = None
|
||||
|
||||
override def preStart(): Unit = log.info(s"Device actor $groupId-$deviceId started")
|
||||
override def preStart(): Unit = log.info("Device actor {}-{} started", groupId, deviceId)
|
||||
|
||||
override def postStop(): Unit = log.info(s"Device actor $groupId-$deviceId stopped")
|
||||
override def postStop(): Unit = log.info("Device actor {}-{} stopped", groupId, deviceId)
|
||||
|
||||
override def receive: Receive = {
|
||||
case RequestTrackDevice(`groupId`, `deviceId`) =>
|
||||
sender() ! DeviceRegistered
|
||||
|
||||
case RequestTrackDevice(groupId, deviceId) =>
|
||||
log.warning(s"Ignoring TrackDevice request for $groupId-$deviceId. " +
|
||||
s"This actor is responsible for ${this.groupId}-${this.deviceId}.")
|
||||
log.warning(
|
||||
"Ignoring TrackDevice request for {}-{}.This actor is responsible for {}-{}.",
|
||||
groupId, deviceId, this.groupId, this.deviceId)
|
||||
|
||||
case RecordTemperature(id, value) =>
|
||||
log.info(s"Recorded temperature reading $value with $id")
|
||||
log.info("Recorded temperature reading {} with {}", value, id)
|
||||
lastTemperatureReading = Some(value)
|
||||
sender() ! TemperatureRecorded(id)
|
||||
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@
|
|||
*/
|
||||
package tutorial_3
|
||||
|
||||
import akka.actor.{ Actor, ActorLogging, ActorRef, Cancellable, Props, Stash, Terminated }
|
||||
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
|
||||
import tutorial_3.DeviceGroup._
|
||||
import tutorial_3.DeviceManager.RequestTrackDevice
|
||||
import scala.concurrent.duration._
|
||||
|
|
@ -12,125 +12,64 @@ object DeviceGroup {
|
|||
|
||||
def props(groupId: String): Props = Props(new DeviceGroup(groupId))
|
||||
|
||||
case class RequestDeviceList(requestId: Long)
|
||||
case class ReplyDeviceList(requestId: Long, ids: Set[String])
|
||||
final case class RequestDeviceList(requestId: Long)
|
||||
final case class ReplyDeviceList(requestId: Long, ids: Set[String])
|
||||
|
||||
case class RequestAllTemperatures(requestId: Long)
|
||||
case class RespondAllTemperatures(requestId: Long, temperatures: Map[String, Option[Double]])
|
||||
final case class RequestAllTemperatures(requestId: Long)
|
||||
final case class RespondAllTemperatures(requestId: Long, temperatures: Map[String, TemperatureReading])
|
||||
|
||||
case class CollectionTimeout(requestId: Long)
|
||||
sealed trait TemperatureReading
|
||||
final case class Temperature(value: Double) extends TemperatureReading
|
||||
case object TemperatureNotAvailable extends TemperatureReading
|
||||
case object DeviceNotAvailable extends TemperatureReading
|
||||
case object DeviceTimedOut extends TemperatureReading
|
||||
}
|
||||
|
||||
class DeviceGroup(groupId: String) extends Actor with ActorLogging with Stash {
|
||||
class DeviceGroup(groupId: String) extends Actor with ActorLogging {
|
||||
var deviceIdToActor = Map.empty[String, ActorRef]
|
||||
var actorToDeviceId = Map.empty[ActorRef, String]
|
||||
var nextCollectionId = 0L
|
||||
|
||||
override def preStart(): Unit = log.info(s"DeviceGroup $groupId started")
|
||||
override def preStart(): Unit = log.info("DeviceGroup {} started", groupId)
|
||||
|
||||
override def postStop(): Unit = log.info(s"DeviceGroup $groupId stopped")
|
||||
override def postStop(): Unit = log.info("DeviceGroup {} stopped", groupId)
|
||||
|
||||
override def receive: Receive = waitingForRequest orElse generalManagement
|
||||
|
||||
def waitingForRequest: Receive = {
|
||||
case RequestAllTemperatures(requestId) =>
|
||||
import context.dispatcher
|
||||
|
||||
val collectionId = nextCollectionId
|
||||
val requester = sender()
|
||||
nextCollectionId += 1
|
||||
val answersSoFar = deviceIdToActor.mapValues(_ => None)
|
||||
context.children.foreach(_ ! Device.ReadTemperature(collectionId))
|
||||
val collectionTimeoutTimer = context.system.scheduler.scheduleOnce(3.seconds, self, CollectionTimeout(collectionId))
|
||||
|
||||
context.become(
|
||||
collectResults(
|
||||
collectionTimeoutTimer,
|
||||
collectionId,
|
||||
requester,
|
||||
requestId,
|
||||
answersSoFar.size,
|
||||
answersSoFar) orElse generalManagement,
|
||||
discardOld = false
|
||||
)
|
||||
}
|
||||
|
||||
def generalManagement: Receive = {
|
||||
override def receive: Receive = {
|
||||
// Note the backticks
|
||||
case trackMsg @ RequestTrackDevice(`groupId`, _) =>
|
||||
handleTrackMessage(trackMsg)
|
||||
deviceIdToActor.get(trackMsg.deviceId) match {
|
||||
case Some(ref) =>
|
||||
ref forward trackMsg
|
||||
case None =>
|
||||
log.info("Creating device actor for {}", trackMsg.deviceId)
|
||||
val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId)
|
||||
context.watch(deviceActor)
|
||||
deviceActor forward trackMsg
|
||||
deviceIdToActor += trackMsg.deviceId -> deviceActor
|
||||
actorToDeviceId += deviceActor -> trackMsg.deviceId
|
||||
}
|
||||
|
||||
case RequestTrackDevice(groupId, deviceId) =>
|
||||
log.warning(s"Ignoring TrackDevice request for $groupId. This actor is responsible for ${this.groupId}.")
|
||||
log.warning(
|
||||
"Ignoring TrackDevice request for {}. This actor is responsible for {}.",
|
||||
groupId, this.groupId)
|
||||
|
||||
case RequestDeviceList(requestId) =>
|
||||
sender() ! ReplyDeviceList(requestId, deviceIdToActor.keySet)
|
||||
|
||||
case Terminated(deviceActor) =>
|
||||
removeDeviceActor(deviceActor)
|
||||
}
|
||||
|
||||
def handleTrackMessage(trackMsg: RequestTrackDevice): Unit = {
|
||||
deviceIdToActor.get(trackMsg.deviceId) match {
|
||||
case Some(ref) =>
|
||||
ref forward trackMsg
|
||||
case None =>
|
||||
log.info(s"Creating device actor for ${trackMsg.deviceId}")
|
||||
val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId)
|
||||
context.watch(deviceActor)
|
||||
deviceActor forward trackMsg
|
||||
deviceIdToActor += trackMsg.deviceId -> deviceActor
|
||||
actorToDeviceId += deviceActor -> trackMsg.deviceId
|
||||
}
|
||||
}
|
||||
|
||||
def removeDeviceActor(deviceActor: ActorRef): Unit = {
|
||||
val deviceId = actorToDeviceId(deviceActor)
|
||||
log.info(s"Device actor for $deviceId has been terminated")
|
||||
actorToDeviceId -= deviceActor
|
||||
deviceIdToActor -= deviceId
|
||||
}
|
||||
|
||||
def collectResults(
|
||||
timer: Cancellable,
|
||||
expectedId: Long,
|
||||
requester: ActorRef,
|
||||
requestId: Long,
|
||||
waiting: Int,
|
||||
answersSoFar: Map[String, Option[Double]]
|
||||
): Receive = {
|
||||
|
||||
case Device.RespondTemperature(`expectedId`, temperatureOption) =>
|
||||
val deviceActor = sender()
|
||||
val deviceId = actorToDeviceId(deviceActor)
|
||||
val newAnswers = answersSoFar + (deviceId -> temperatureOption)
|
||||
log.info("Device actor for {} has been terminated", deviceId)
|
||||
actorToDeviceId -= deviceActor
|
||||
deviceIdToActor -= deviceId
|
||||
|
||||
if (waiting == 1) {
|
||||
requester ! RespondAllTemperatures(requestId, newAnswers)
|
||||
finishCollection(timer)
|
||||
} else {
|
||||
context.become(collectResults(timer, expectedId, requester, requestId, waiting - 1, newAnswers))
|
||||
}
|
||||
|
||||
case Terminated(deviceActor) =>
|
||||
val deviceId = actorToDeviceId(deviceActor)
|
||||
removeDeviceActor(deviceActor)
|
||||
val newAnswers = answersSoFar + (deviceId -> None)
|
||||
|
||||
if (waiting == 1) {
|
||||
requester ! RespondAllTemperatures(requestId, newAnswers)
|
||||
finishCollection(timer: Cancellable)
|
||||
} else {
|
||||
context.become(collectResults(timer, expectedId, requester, requestId, waiting - 1, newAnswers))
|
||||
}
|
||||
|
||||
case CollectionTimeout(`expectedId`) =>
|
||||
requester ! RespondAllTemperatures(requestId, answersSoFar)
|
||||
}
|
||||
|
||||
def finishCollection(timer: Cancellable): Unit = {
|
||||
context.unbecome()
|
||||
timer.cancel()
|
||||
case RequestAllTemperatures(requestId) =>
|
||||
context.actorOf(DeviceGroupQuery.props(
|
||||
actorToDeviceId = actorToDeviceId,
|
||||
requestId = requestId,
|
||||
requester = sender(),
|
||||
3.seconds
|
||||
))
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,98 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package tutorial_3
|
||||
|
||||
import akka.actor.Actor.Receive
|
||||
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
object DeviceGroupQuery {
|
||||
|
||||
case object CollectionTimeout
|
||||
|
||||
def props(
|
||||
actorToDeviceId: Map[ActorRef, String],
|
||||
requestId: Long,
|
||||
requester: ActorRef,
|
||||
timeout: FiniteDuration
|
||||
): Props = {
|
||||
Props(new DeviceGroupQuery(actorToDeviceId, requestId, requester, timeout))
|
||||
}
|
||||
}
|
||||
|
||||
class DeviceGroupQuery(
|
||||
actorToDeviceId: Map[ActorRef, String],
|
||||
requestId: Long,
|
||||
requester: ActorRef,
|
||||
timeout: FiniteDuration
|
||||
) extends Actor with ActorLogging {
|
||||
import DeviceGroupQuery._
|
||||
import context.dispatcher
|
||||
val queryTimeoutTimer = context.system.scheduler.scheduleOnce(timeout, self, CollectionTimeout)
|
||||
|
||||
override def preStart(): Unit = {
|
||||
actorToDeviceId.keysIterator.foreach { deviceActor =>
|
||||
context.watch(deviceActor)
|
||||
deviceActor ! Device.ReadTemperature(0)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
override def postStop(): Unit = {
|
||||
queryTimeoutTimer.cancel()
|
||||
}
|
||||
|
||||
override def receive: Receive =
|
||||
waitingForReplies(
|
||||
Map.empty,
|
||||
actorToDeviceId.keySet
|
||||
)
|
||||
|
||||
def waitingForReplies(
|
||||
repliesSoFar: Map[String, DeviceGroup.TemperatureReading],
|
||||
stillWaiting: Set[ActorRef]
|
||||
): Receive = {
|
||||
case Device.RespondTemperature(0, valueOption) =>
|
||||
val deviceActor = sender()
|
||||
val reading = valueOption match {
|
||||
case Some(value) => DeviceGroup.Temperature(value)
|
||||
case None => DeviceGroup.TemperatureNotAvailable
|
||||
}
|
||||
receivedResponse(deviceActor, reading, stillWaiting, repliesSoFar)
|
||||
|
||||
case Terminated(deviceActor) =>
|
||||
if (stillWaiting.contains(deviceActor))
|
||||
receivedResponse(deviceActor, DeviceGroup.DeviceNotAvailable, stillWaiting, repliesSoFar)
|
||||
// else ignore
|
||||
|
||||
case CollectionTimeout =>
|
||||
val timedOutReplies =
|
||||
stillWaiting.map { deviceActor =>
|
||||
val deviceId = actorToDeviceId(deviceActor)
|
||||
deviceId -> DeviceGroup.DeviceTimedOut
|
||||
}
|
||||
requester ! DeviceGroup.RespondAllTemperatures(requestId, repliesSoFar ++ timedOutReplies)
|
||||
context.stop(self)
|
||||
}
|
||||
|
||||
def receivedResponse(
|
||||
deviceActor: ActorRef,
|
||||
reading: DeviceGroup.TemperatureReading,
|
||||
stillWaiting: Set[ActorRef],
|
||||
repliesSoFar: Map[String, DeviceGroup.TemperatureReading]
|
||||
): Unit = {
|
||||
val deviceId = actorToDeviceId(deviceActor)
|
||||
val newStillWaiting = stillWaiting - deviceActor
|
||||
|
||||
val newRepliesSoFar = repliesSoFar + (deviceId -> reading)
|
||||
if (newStillWaiting.isEmpty) {
|
||||
requester ! DeviceGroup.RespondAllTemperatures(requestId, newRepliesSoFar)
|
||||
context.stop(self)
|
||||
} else {
|
||||
context.become(waitingForReplies(newRepliesSoFar, newStillWaiting))
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,157 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package tutorial_3
|
||||
|
||||
import akka.actor.PoisonPill
|
||||
import akka.testkit.{ AkkaSpec, TestProbe }
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
class DeviceGroupQuerySpec extends AkkaSpec {
|
||||
|
||||
"DeviceGroupQuery" must {
|
||||
|
||||
"return temperature value for working devices" in {
|
||||
val requester = TestProbe()
|
||||
|
||||
val device1 = TestProbe()
|
||||
val device2 = TestProbe()
|
||||
|
||||
val queryActor = system.actorOf(DeviceGroupQuery.props(
|
||||
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
|
||||
requestId = 1,
|
||||
requester = requester.ref,
|
||||
timeout = 3.seconds
|
||||
))
|
||||
|
||||
device1.expectMsg(Device.ReadTemperature(requestId = 0))
|
||||
device2.expectMsg(Device.ReadTemperature(requestId = 0))
|
||||
|
||||
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
|
||||
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref)
|
||||
|
||||
requester.expectMsg(DeviceGroup.RespondAllTemperatures(
|
||||
requestId = 1,
|
||||
temperatures = Map(
|
||||
"device1" -> DeviceGroup.Temperature(1.0),
|
||||
"device2" -> DeviceGroup.Temperature(2.0)
|
||||
)
|
||||
))
|
||||
}
|
||||
|
||||
"return TemperatureNotAvailable for devices with no readings" in {
|
||||
val requester = TestProbe()
|
||||
|
||||
val device1 = TestProbe()
|
||||
val device2 = TestProbe()
|
||||
|
||||
val queryActor = system.actorOf(DeviceGroupQuery.props(
|
||||
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
|
||||
requestId = 1,
|
||||
requester = requester.ref,
|
||||
timeout = 3.seconds
|
||||
))
|
||||
|
||||
device1.expectMsg(Device.ReadTemperature(requestId = 0))
|
||||
device2.expectMsg(Device.ReadTemperature(requestId = 0))
|
||||
|
||||
queryActor.tell(Device.RespondTemperature(requestId = 0, None), device1.ref)
|
||||
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref)
|
||||
|
||||
requester.expectMsg(DeviceGroup.RespondAllTemperatures(
|
||||
requestId = 1,
|
||||
temperatures = Map(
|
||||
"device1" -> DeviceGroup.TemperatureNotAvailable,
|
||||
"device2" -> DeviceGroup.Temperature(2.0)
|
||||
)
|
||||
))
|
||||
}
|
||||
|
||||
"return DeviceNotAvailable if device stops before answering" in {
|
||||
val requester = TestProbe()
|
||||
|
||||
val device1 = TestProbe()
|
||||
val device2 = TestProbe()
|
||||
|
||||
val queryActor = system.actorOf(DeviceGroupQuery.props(
|
||||
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
|
||||
requestId = 1,
|
||||
requester = requester.ref,
|
||||
timeout = 3.seconds
|
||||
))
|
||||
|
||||
device1.expectMsg(Device.ReadTemperature(requestId = 0))
|
||||
device2.expectMsg(Device.ReadTemperature(requestId = 0))
|
||||
|
||||
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
|
||||
device2.ref ! PoisonPill
|
||||
|
||||
requester.expectMsg(DeviceGroup.RespondAllTemperatures(
|
||||
requestId = 1,
|
||||
temperatures = Map(
|
||||
"device1" -> DeviceGroup.Temperature(1.0),
|
||||
"device2" -> DeviceGroup.DeviceNotAvailable
|
||||
)
|
||||
))
|
||||
}
|
||||
|
||||
"return temperature reading even if device stops after answering" in {
|
||||
val requester = TestProbe()
|
||||
|
||||
val device1 = TestProbe()
|
||||
val device2 = TestProbe()
|
||||
|
||||
val queryActor = system.actorOf(DeviceGroupQuery.props(
|
||||
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
|
||||
requestId = 1,
|
||||
requester = requester.ref,
|
||||
timeout = 3.seconds
|
||||
))
|
||||
|
||||
device1.expectMsg(Device.ReadTemperature(requestId = 0))
|
||||
device2.expectMsg(Device.ReadTemperature(requestId = 0))
|
||||
|
||||
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
|
||||
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(2.0)), device2.ref)
|
||||
device2.ref ! PoisonPill
|
||||
|
||||
requester.expectMsg(DeviceGroup.RespondAllTemperatures(
|
||||
requestId = 1,
|
||||
temperatures = Map(
|
||||
"device1" -> DeviceGroup.Temperature(1.0),
|
||||
"device2" -> DeviceGroup.Temperature(2.0)
|
||||
)
|
||||
))
|
||||
}
|
||||
|
||||
"return DeviceTimedOut if device does not answer in time" in {
|
||||
val requester = TestProbe()
|
||||
|
||||
val device1 = TestProbe()
|
||||
val device2 = TestProbe()
|
||||
|
||||
val queryActor = system.actorOf(DeviceGroupQuery.props(
|
||||
actorToDeviceId = Map(device1.ref -> "device1", device2.ref -> "device2"),
|
||||
requestId = 1,
|
||||
requester = requester.ref,
|
||||
timeout = 1.second
|
||||
))
|
||||
|
||||
device1.expectMsg(Device.ReadTemperature(requestId = 0))
|
||||
device2.expectMsg(Device.ReadTemperature(requestId = 0))
|
||||
|
||||
queryActor.tell(Device.RespondTemperature(requestId = 0, Some(1.0)), device1.ref)
|
||||
|
||||
requester.expectMsg(DeviceGroup.RespondAllTemperatures(
|
||||
requestId = 1,
|
||||
temperatures = Map(
|
||||
"device1" -> DeviceGroup.Temperature(1.0),
|
||||
"device2" -> DeviceGroup.DeviceTimedOut
|
||||
)
|
||||
))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -17,18 +17,18 @@ class DeviceGroupSpec extends AkkaSpec {
|
|||
val probe = TestProbe()
|
||||
val groupActor = system.actorOf(DeviceGroup.props("group"))
|
||||
|
||||
probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device1"))
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
val deviceActor1 = probe.lastSender
|
||||
|
||||
probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device2"))
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
val deviceActor2 = probe.lastSender
|
||||
|
||||
// Check that the device actors are working
|
||||
probe.send(deviceActor1, Device.RecordTemperature(requestId = 0, 1.0))
|
||||
deviceActor1.tell(Device.RecordTemperature(requestId = 0, 1.0), probe.ref)
|
||||
probe.expectMsg(Device.TemperatureRecorded(requestId = 0))
|
||||
probe.send(deviceActor2, Device.RecordTemperature(requestId = 1, 2.0))
|
||||
deviceActor2.tell(Device.RecordTemperature(requestId = 1, 2.0), probe.ref)
|
||||
probe.expectMsg(Device.TemperatureRecorded(requestId = 1))
|
||||
}
|
||||
|
||||
|
|
@ -36,7 +36,7 @@ class DeviceGroupSpec extends AkkaSpec {
|
|||
val probe = TestProbe()
|
||||
val groupActor = system.actorOf(DeviceGroup.props("group"))
|
||||
|
||||
probe.send(groupActor, DeviceManager.RequestTrackDevice("wrongGroup", "device1"))
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("wrongGroup", "device1"), probe.ref)
|
||||
probe.expectNoMsg(500.milliseconds)
|
||||
}
|
||||
|
||||
|
|
@ -44,11 +44,11 @@ class DeviceGroupSpec extends AkkaSpec {
|
|||
val probe = TestProbe()
|
||||
val groupActor = system.actorOf(DeviceGroup.props("group"))
|
||||
|
||||
probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device1"))
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
val deviceActor1 = probe.lastSender
|
||||
|
||||
probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device1"))
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
val deviceActor2 = probe.lastSender
|
||||
|
||||
|
|
@ -59,13 +59,13 @@ class DeviceGroupSpec extends AkkaSpec {
|
|||
val probe = TestProbe()
|
||||
val groupActor = system.actorOf(DeviceGroup.props("group"))
|
||||
|
||||
probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device1"))
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
|
||||
probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device2"))
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
|
||||
probe.send(groupActor, DeviceGroup.RequestDeviceList(requestId = 0))
|
||||
groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 0), probe.ref)
|
||||
probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2")))
|
||||
}
|
||||
|
||||
|
|
@ -73,55 +73,55 @@ class DeviceGroupSpec extends AkkaSpec {
|
|||
val probe = TestProbe()
|
||||
val groupActor = system.actorOf(DeviceGroup.props("group"))
|
||||
|
||||
probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device1"))
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
val toShutDown = probe.lastSender
|
||||
|
||||
probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device2"))
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
|
||||
probe.send(groupActor, DeviceGroup.RequestDeviceList(requestId = 0))
|
||||
groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 0), probe.ref)
|
||||
probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2")))
|
||||
|
||||
probe.watch(toShutDown)
|
||||
toShutDown ! PoisonPill
|
||||
probe.expectTerminated(toShutDown)
|
||||
|
||||
probe.send(groupActor, DeviceGroup.RequestDeviceList(requestId = 0))
|
||||
probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device2")))
|
||||
groupActor.tell(DeviceGroup.RequestDeviceList(requestId = 1), probe.ref)
|
||||
probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 1, Set("device2")))
|
||||
}
|
||||
|
||||
"be able to collect temperatures from all active devices" in {
|
||||
val probe = TestProbe()
|
||||
val groupActor = system.actorOf(DeviceGroup.props("group"))
|
||||
|
||||
probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device1"))
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device1"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
val deviceActor1 = probe.lastSender
|
||||
|
||||
probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device2"))
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device2"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
val deviceActor2 = probe.lastSender
|
||||
|
||||
probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device3"))
|
||||
groupActor.tell(DeviceManager.RequestTrackDevice("group", "device3"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
val deviceActor3 = probe.lastSender
|
||||
|
||||
// Check that the device actors are working
|
||||
probe.send(deviceActor1, Device.RecordTemperature(requestId = 0, 1.0))
|
||||
deviceActor1.tell(Device.RecordTemperature(requestId = 0, 1.0), probe.ref)
|
||||
probe.expectMsg(Device.TemperatureRecorded(requestId = 0))
|
||||
probe.send(deviceActor2, Device.RecordTemperature(requestId = 1, 2.0))
|
||||
deviceActor2.tell(Device.RecordTemperature(requestId = 1, 2.0), probe.ref)
|
||||
probe.expectMsg(Device.TemperatureRecorded(requestId = 1))
|
||||
// No temperature for device3
|
||||
|
||||
probe.send(groupActor, DeviceGroup.RequestAllTemperatures(requestId = 0))
|
||||
groupActor.tell(DeviceGroup.RequestAllTemperatures(requestId = 0), probe.ref)
|
||||
probe.expectMsg(
|
||||
DeviceGroup.RespondAllTemperatures(
|
||||
requestId = 0,
|
||||
temperatures = Map(
|
||||
"device1" -> Some(1.0),
|
||||
"device2" -> Some(2.0),
|
||||
"device3" -> None
|
||||
"device1" -> DeviceGroup.Temperature(1.0),
|
||||
"device2" -> DeviceGroup.Temperature(2.0),
|
||||
"device3" -> DeviceGroup.TemperatureNotAvailable
|
||||
)
|
||||
)
|
||||
)
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import tutorial_3.DeviceManager.RequestTrackDevice
|
|||
object DeviceManager {
|
||||
def props(): Props = Props(new DeviceManager)
|
||||
|
||||
case class RequestTrackDevice(groupId: String, deviceId: String)
|
||||
final case class RequestTrackDevice(groupId: String, deviceId: String)
|
||||
case object DeviceRegistered
|
||||
}
|
||||
|
||||
|
|
@ -28,7 +28,7 @@ class DeviceManager extends Actor with ActorLogging {
|
|||
case Some(ref) =>
|
||||
ref forward trackMsg
|
||||
case None =>
|
||||
log.info(s"Creating device group actor for $groupId")
|
||||
log.info("Creating device group actor for {}", groupId)
|
||||
val groupActor = context.actorOf(DeviceGroup.props(groupId), "group-" + groupId)
|
||||
context.watch(groupActor)
|
||||
groupActor forward trackMsg
|
||||
|
|
@ -38,7 +38,7 @@ class DeviceManager extends Actor with ActorLogging {
|
|||
|
||||
case Terminated(groupActor) =>
|
||||
val groupId = actorToGroupId(groupActor)
|
||||
log.info(s"Device group actor for $groupId has been terminated")
|
||||
log.info("Device group actor for {} has been terminated", groupId)
|
||||
actorToGroupId -= groupActor
|
||||
groupIdToActor -= groupId
|
||||
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ class DeviceSpec extends AkkaSpec {
|
|||
val probe = TestProbe()
|
||||
val deviceActor = system.actorOf(Device.props("group", "device"))
|
||||
|
||||
probe.send(deviceActor, DeviceManager.RequestTrackDevice("group", "device"))
|
||||
deviceActor.tell(DeviceManager.RequestTrackDevice("group", "device"), probe.ref)
|
||||
probe.expectMsg(DeviceManager.DeviceRegistered)
|
||||
probe.lastSender should ===(deviceActor)
|
||||
}
|
||||
|
|
@ -24,10 +24,10 @@ class DeviceSpec extends AkkaSpec {
|
|||
val probe = TestProbe()
|
||||
val deviceActor = system.actorOf(Device.props("group", "device"))
|
||||
|
||||
probe.send(deviceActor, DeviceManager.RequestTrackDevice("wrongGroup", "device"))
|
||||
deviceActor.tell(DeviceManager.RequestTrackDevice("wrongGroup", "device"), probe.ref)
|
||||
probe.expectNoMsg(500.milliseconds)
|
||||
|
||||
probe.send(deviceActor, DeviceManager.RequestTrackDevice("group", "wrongDevice"))
|
||||
deviceActor.tell(DeviceManager.RequestTrackDevice("group", "Wrongdevice"), probe.ref)
|
||||
probe.expectNoMsg(500.milliseconds)
|
||||
}
|
||||
|
||||
|
|
@ -35,7 +35,7 @@ class DeviceSpec extends AkkaSpec {
|
|||
val probe = TestProbe()
|
||||
val deviceActor = system.actorOf(Device.props("group", "device"))
|
||||
|
||||
probe.send(deviceActor, Device.ReadTemperature(requestId = 42))
|
||||
deviceActor.tell(Device.ReadTemperature(requestId = 42), probe.ref)
|
||||
val response = probe.expectMsgType[Device.RespondTemperature]
|
||||
response.requestId should ===(42)
|
||||
response.value should ===(None)
|
||||
|
|
@ -45,18 +45,18 @@ class DeviceSpec extends AkkaSpec {
|
|||
val probe = TestProbe()
|
||||
val deviceActor = system.actorOf(Device.props("group", "device"))
|
||||
|
||||
probe.send(deviceActor, Device.RecordTemperature(requestId = 1, 24.0))
|
||||
deviceActor.tell(Device.RecordTemperature(requestId = 1, 24.0), probe.ref)
|
||||
probe.expectMsg(Device.TemperatureRecorded(requestId = 1))
|
||||
|
||||
probe.send(deviceActor, Device.ReadTemperature(requestId = 2))
|
||||
deviceActor.tell(Device.ReadTemperature(requestId = 2), probe.ref)
|
||||
val response1 = probe.expectMsgType[Device.RespondTemperature]
|
||||
response1.requestId should ===(2)
|
||||
response1.value should ===(Some(24.0))
|
||||
|
||||
probe.send(deviceActor, Device.RecordTemperature(requestId = 3, 55.0))
|
||||
deviceActor.tell(Device.RecordTemperature(requestId = 3, 55.0), probe.ref)
|
||||
probe.expectMsg(Device.TemperatureRecorded(requestId = 3))
|
||||
|
||||
probe.send(deviceActor, Device.ReadTemperature(requestId = 4))
|
||||
deviceActor.tell(Device.ReadTemperature(requestId = 4), probe.ref)
|
||||
val response2 = probe.expectMsgType[Device.RespondTemperature]
|
||||
response2.requestId should ===(4)
|
||||
response2.value should ===(Some(55.0))
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue