Towards getting-started docs
This commit is contained in: parent ecbcc56f28, commit 435d95bcdc
18 changed files with 1161 additions and 0 deletions
akka-docs-new/build.sbt (new file, 17 lines)
@@ -0,0 +1,17 @@
import akka.{ AkkaBuild, Dependencies, Formatting }
import akka.ValidatePullRequest._
import com.typesafe.sbt.SbtScalariform.ScalariformKeys

AkkaBuild.defaultSettings
AkkaBuild.dontPublishSettings
Formatting.docFormatSettings
Dependencies.docs

unmanagedSourceDirectories in ScalariformKeys.format in Test <<= unmanagedSourceDirectories in Test
//TODO: additionalTasks in ValidatePR += paradox in Paradox

enablePlugins(ScaladocNoVerificationOfDiagrams)
disablePlugins(MimaPlugin)
enablePlugins(ParadoxPlugin)

paradoxTheme := Some(builtinParadoxTheme("generic"))
akka-docs-new/src/main/paradox/guide/actors-intro.md (new file, 259 lines)
@@ -0,0 +1,259 @@
# What problems does the actor model solve?

Akka uses the actor model to overcome the limitations of traditional object-oriented programming models and meet the
unique challenges of highly distributed systems. To fully understand why the actor model is necessary, it helps to
identify mismatches between traditional approaches to programming and the realities of concurrent and distributed
computing.

### The illusion of encapsulation

Object-oriented programming (OOP) is a widely accepted, familiar programming model. One of its basic pillars is
_encapsulation_. Encapsulation dictates that the internal data of an object is not accessible directly from the outside;
it can only be modified by invoking a set of curated methods. The object is responsible for exposing only safe
operations that protect the invariants of its encapsulated data.

For example, operations on an ordered binary tree implementation must not allow violation of the tree ordering
constraint. Callers rely on the ordering being intact; when querying the tree for a certain piece of data, they need
to be able to rely on this constraint.

When we analyze OOP runtime behavior, we sometimes draw a message sequence chart showing the interactions of
method calls. For example:

TODO: SEQ-CHART

Unfortunately, the above diagram does not accurately represent the _lifelines_ of the instances during execution.
In reality, a _thread_ executes all these calls, and the enforcement of invariants occurs on the same thread from
which the method was called. Updated with the thread of execution, the diagram looks like this:

TODO: SEQ-CHART-THREAD

The significance of this clarification becomes clear when you try to model what happens with _multiple threads_.
Suddenly, our neatly drawn diagram becomes inadequate. We can try to illustrate multiple threads accessing
the same instance:

TODO: SEQ-CHART-MULTI-THREAD

There is a section of execution where two threads enter the same method. Unfortunately, the encapsulation model
of objects does not guarantee anything about what happens in that section. In fact, we also see one thread calling
into the first object while it is in the middle of a call to a third one. Instructions of the two invocations
can be interleaved in arbitrary ways, which eliminates any hope of keeping the invariants intact without some
type of coordination between the two threads. Now, imagine this issue compounded by the existence of many threads.

The common approach for solving this problem is to add a lock around these methods. While this ensures that at most
one thread will enter the method at any given time, it is a very costly strategy:

* Locks _seriously limit_ concurrency. They are very costly on modern CPU architectures,
requiring heavy lifting from the operating system to suspend the thread and restore it later.
* The caller thread is now blocked, so it cannot do any other meaningful work. Even in desktop applications this is
unacceptable: we want the user-facing parts of an application (its UI) to remain responsive even when a
long background job is running. In backend, server settings, blocking is outright wasteful.
One might think that this can be compensated for by launching new threads, but threads are also a costly abstraction.
* Locks introduce a new menace: deadlocks.

These realities result in a no-win situation:

* Without sufficient locks, state gets corrupted.
* With many locks in place, performance suffers and deadlocks arise very easily.
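
Both sides of this trade-off can be seen in a small standalone sketch using plain Java threads (no Akka involved): the locked counter stays correct but blocks contending threads, while the unprotected counter loses updates to interleaved read-modify-write sequences.

```java
public class LockDemo {
    static int counter = 0;                  // shared, unprotected
    static int safeCounter = 0;              // shared, guarded by a lock
    static final Object lock = new Object();

    public static void main(String[] args) throws Exception {
        Runnable unsafe = () -> { for (int i = 0; i < 100_000; i++) counter++; };
        Runnable safe = () -> {
            for (int i = 0; i < 100_000; i++) {
                synchronized (lock) { safeCounter++; }  // caller blocks while another thread holds the lock
            }
        };
        Thread t1 = new Thread(safe), t2 = new Thread(safe);
        t1.start(); t2.start(); t1.join(); t2.join();
        System.out.println(safeCounter);      // always 200000: the lock preserves the invariant

        Thread u1 = new Thread(unsafe), u2 = new Thread(unsafe);
        u1.start(); u2.start(); u1.join(); u2.join();
        // counter is typically below 200000: the unsynchronized read-modify-write loses updates
        System.out.println(counter <= 200_000);
    }
}
```

The lock makes the result deterministic, but only by suspending whichever thread arrives second — exactly the cost described above.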

Additionally, locks only really work well locally. When it comes to coordinating across multiple machines,
the only alternative is distributed locks. Unfortunately, distributed locks are several orders of magnitude less
efficient than local locks and usually impose a hard limit on scaling out. Distributed lock protocols require several
communication round-trips over the network across multiple machines, so latency goes through the roof.

In object-oriented languages we rarely think about threads, or linear execution paths in general.
We often envision a system as a network of object instances that react to method calls, modify their internal state,
then communicate with each other via method calls, driving the whole application state forward:

TODO: OBJECT-GRAPH

However, in a multi-threaded distributed environment, what actually happens is more like the game Snake,
in the sense that threads “traverse” this network of object instances by following method calls.
As a result, threads are what really drive execution:

TODO: OBJECT-GRAPH-SNAKES

**In summary**:

* **Objects can only guarantee encapsulation (protection of invariants) in the face of single-threaded access;
multi-threaded execution almost always leads to corrupted internal state. Every invariant can be violated by
having two contending threads in the same code segment.**
* **While locks seem to be the natural remedy to uphold encapsulation with multiple threads, in practice they
are inefficient and easily lead to deadlocks in any application of real-world scale.**
* **Locks work locally; attempts to make them distributed exist, but offer limited potential for scaling out.**

### The illusion of shared memory on modern computer architectures

Programming models of the ’80s and ’90s conceptualize that writing to a variable means writing to a memory location
directly (the fact that local variables might exist only in registers somewhat muddies the water). On modern
architectures - if we simplify things a bit - CPUs are writing to [cache lines](https://en.wikipedia.org/wiki/CPU_cache)
instead of writing to memory directly. Most of these caches are local to the CPU core; that is, writes by one core
are not visible to another. In order to make local changes visible to another core (and hence, to another thread),
the cache line needs to be shipped to the other processor’s cache.

On the JVM, we have to explicitly denote memory locations to be shared across threads by using _volatile_ markers
or `Atomic` wrappers. Otherwise, we can access them only in a locked section. Why can’t we just mark all variables as
volatile? Because shipping cache lines across cores is a very costly operation. It implicitly stalls the cores
involved from doing additional work, and results in bottlenecks on the cache coherence protocol (the protocol CPUs
use to transfer cache lines between main memory and other CPUs).
The result is magnitudes of slowdown.

Even for developers aware of this situation, figuring out which memory locations should be marked as volatile,
or which atomic structures to use, is a dark art.
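
As a minimal JVM illustration (plain Java, no Akka): an `AtomicLong` makes concurrent increments both visible and atomic via hardware compare-and-swap, at the price of shipping the contended cache line between the cores involved on every update.

```java
import java.util.concurrent.atomic.AtomicLong;

public class SharedMemorySketch {
    // A volatile field would be visible across cores, but visibility alone is
    // not atomicity: a volatile "count++" is still a lost-update race.
    // Compare-and-swap (AtomicLong) gives both, by bouncing the cache line
    // between the cores that contend on it.
    static final AtomicLong atomicCount = new AtomicLong();

    public static void main(String[] args) throws Exception {
        Runnable work = () -> {
            for (int i = 0; i < 100_000; i++) {
                atomicCount.incrementAndGet();  // atomic read-modify-write
            }
        };
        Thread t1 = new Thread(work), t2 = new Thread(work);
        t1.start(); t2.start(); t1.join(); t2.join();
        System.out.println(atomicCount.get());  // always 200000
    }
}
```

Correctness is preserved here, but every increment is an inter-core message in disguise — which is the point of this section.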

**In summary**:

* **There is no real shared memory anymore; CPU cores pass chunks of data (cache lines) explicitly to each other,
just as computers on a network do. Inter-CPU communication and network communication have more in common than
many realize. Passing messages is the norm now, be it across CPUs or networked computers.**
* **Instead of hiding the message passing aspect through variables marked as shared or using atomic data structures,
a more disciplined and principled approach is to keep state local to a concurrent entity and propagate data or events
between concurrent entities explicitly via messages.**

### The illusion of a call stack

Today, we often take call stacks for granted. But they were invented in an era where concurrent programming
was not as important, because multi-CPU systems were not common. Call stacks do not span threads and hence
do not model asynchronous call chains.

The problem arises when a thread intends to delegate a task to the "background". In practice, this really means
delegating to another thread. This cannot be a simple method/function call because calls are strictly local to the
thread. What usually happens is that the "caller" puts an object into a memory location shared with a worker thread
("callee"), which in turn picks it up in some event loop. This allows the "caller" thread to move on and do other tasks.

The first issue is: how can the "caller" be notified of the completion of the task? But a more serious issue arises
when a task fails with an exception. Where does the exception propagate to? It will propagate to the exception handler
of the worker thread, completely ignoring who the actual "caller" was:

TODO: EXCEPTION-PROP

This is a serious problem. How does the worker thread deal with the situation? It likely cannot fix the issue, as it is
usually oblivious to the purpose of the failed task. The "caller" thread needs to be notified somehow,
but there is no call stack to unwind with an exception. Failure notification can only be done via a side channel,
for example by putting an error code where the "caller" thread otherwise expects the result once ready.
If this notification is not in place, the "caller" never gets notified of the failure and the task is lost!
**This is surprisingly close to how networked systems work, where messages/requests can get lost/fail without any
notification.**
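
This failure path is easy to reproduce with plain JDK executors (no Akka): a fire-and-forget task fails silently from the caller's point of view, while an explicit side channel — here a `Future` — carries the failure back to the caller.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class LostFailureSketch {
    public static void main(String[] args) throws Exception {
        ExecutorService worker = Executors.newSingleThreadExecutor();

        // Fire-and-forget: the exception ends up in the worker thread's
        // uncaught-exception handler; the "caller" is never told the task failed.
        worker.execute(() -> { throw new RuntimeException("task failed"); });

        // Explicit side channel: a Future carries the failure back to the caller,
        // but only if the caller remembers to ask for the result.
        Callable<Integer> failing = () -> { throw new RuntimeException("task failed"); };
        Future<Integer> result = worker.submit(failing);
        try {
            result.get();
        } catch (ExecutionException e) {
            System.out.println("caller notified: " + e.getCause().getMessage());
        }
        worker.shutdown();
    }
}
```

Note that the side channel is opt-in: forget the `get()` and the failure is as invisible as in the fire-and-forget case.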

This bad situation gets worse when things go really wrong and a worker backed by a thread encounters a bug and ends
up in an unrecoverable situation. For example, an internal exception caused by a bug may bubble up to the root of
the thread and shut the thread down. This immediately raises the question: who should restart the normal operation
of the service hosted by the thread, and how should it be restored to a known-good state? At first glance,
this might seem manageable, but we are suddenly faced with a new, unexpected phenomenon: the actual task
that the thread was currently working on is no longer in the shared memory location where tasks are taken from
(usually a queue). In fact, due to the exception reaching the top and unwinding the whole call stack,
the task state is fully lost! **We have lost a message even though this is local communication with no networking
involved (where message losses are expected).**

**In summary:**

* **To achieve any meaningful concurrency and performance on current systems, threads must delegate tasks among each
other in an efficient way without blocking. With this style of task-delegating concurrency
(and even more so with networked/distributed computing) call-stack-based error handling breaks down and new,
explicit error-signaling mechanisms need to be introduced. Failure becomes part of the domain model.**
* **Concurrent systems with work delegation need to handle service faults and have principled means to recover from them.
Clients of such services need to be aware that tasks/messages might get lost during restarts.
Even if loss does not happen, a response might be delayed arbitrarily due to previously enqueued tasks
(a long queue), delays caused by garbage collection, etc. In the face of these, concurrent systems should handle
response deadlines in the form of timeouts, just like networked/distributed systems.**

## How the actor model meets the needs of concurrent, distributed systems

As described in the sections above, common programming practices cannot properly address the needs of modern concurrent
and distributed systems.
Thankfully, we don’t need to scrap everything we know. Instead, the actor model addresses these shortcomings in a
principled way, allowing systems to behave in a way that better matches our mental model.

In particular, we would like to:

* Enforce encapsulation without resorting to locks
* Use the model of cooperative entities reacting to signals, changing state, and sending signals to each other
to drive the whole application forward
* Stop worrying about the execution mechanism ("snaky-sneaky" threads), which is in a mismatch with our world view

The actor model accomplishes all of these goals. The following topics describe how.

### Use of message passing avoids locking and blocking

Instead of calling methods, actors send messages to each other. Sending a message does not transfer the thread
of execution from the sender to the destination. An actor can send a message and continue without blocking.
It can therefore do more work, and send and receive other messages.

With objects, when a method returns, it releases control of its executing thread. In this respect, actors behave
much like objects: they react to messages and return execution when they finish processing the current message.
In this way, actors actually achieve the execution we imagined for objects:

TODO: ACTOR-GRAPH

An important consequence of passing messages instead of calling methods is that messages have no return value.
By sending a message, an actor delegates work to another actor. As we saw in [The illusion of a call stack],
if it expected a return value, the sending actor would either need to block (issue: locks and sacrificing a thread)
or to execute the other actor’s work on the same thread (issue: "snakes in a maze").
Instead, the receiving actor delivers the results in a reply message.

The second key change we need in our model is to reinstate encapsulation. Actors react to messages just like objects
"react" to methods invoked on them. The difference is that instead of multiple threads "protruding" into our actor and
wreaking havoc on internal state and invariants, actors execute independently from the senders of a message, and they
react to incoming messages sequentially, one at a time. There is always at most one message being processed, meaning
that invariants can be kept without synchronization. This happens automatically, without using locks:

TODO: SERIALIZED-TIMELINE-INVARIANTS

In summary, this is what happens when an actor receives a message:

1. The message is added to the end of a queue
2. If the actor was not scheduled for execution, it is marked as ready to execute
3. A (hidden) scheduler entity takes the actor and starts executing it
4. The actor picks the message from the front of the queue
5. The actor modifies internal state, sends messages to other actors
6. The actor is unscheduled
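
The scheduling steps above can be sketched with plain JDK primitives: a queue drained by exactly one thread is the essence of a mailbox. This is an illustration of the idea only, not Akka's actual implementation.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MailboxSketch {
    // Only ever touched by the single mailbox-draining thread, so no lock is needed.
    static int state = 0;

    public static void main(String[] args) throws Exception {
        // The mailbox: tasks are queued and drained by exactly one thread,
        // so the "actor" processes one message at a time.
        ExecutorService mailbox = Executors.newSingleThreadExecutor();

        // "Senders" enqueue messages without blocking and move on.
        for (int i = 0; i < 1000; i++) {
            mailbox.execute(() -> state++);  // the behavior: react to one message
        }

        mailbox.shutdown();
        mailbox.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println(state);  // 1000: sequential processing keeps the invariant
    }
}
```

The unsynchronized `state++` is safe here precisely because only one thread ever runs the behavior — the same reasoning that lets real actors keep invariants without locks.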

To accomplish this behavior, actors have:

* A Mailbox (the queue where messages end up)
* A Behavior (the state of the actor, internal variables etc.)
* Messages (pieces of data representing a signal, similar to method calls and their parameters)
* An Execution Environment (the machinery that takes actors that have messages to react to and invokes
their message handling code)
* An Address (more on this later)

Messages are put into the Mailboxes of Actors, and the Behavior of the actor describes how the actor responds to
messages (by sending more messages and/or changing state). The Execution Environment orchestrates a pool of threads
to drive all these actions completely transparently.

This is a very simple model, and it solves the issues enumerated previously:

* Encapsulation is preserved by decoupling execution from signaling (method calls transfer execution,
message passing does not).
* There is no need for locks. Modifying the internal state of an actor is only possible via messages, which are
processed one at a time, eliminating races when trying to keep invariants.
* There are no locks used anywhere, and senders are not blocked. Millions of actors can be efficiently scheduled on
a dozen threads, reaching the full potential of modern CPUs. Task delegation is the natural mode of operation
for actors.
* The state of actors is local and not shared; changes and data are propagated via messages, which maps to how the
modern memory hierarchy actually works. In many cases this means transferring only the cache lines that contain the
data in the message, while keeping local state and data cached at the original core. The same model maps exactly
to remote communication, where state is kept in the RAM of machines and changes/data are propagated over the network
as packets.

### Actors handle error situations gracefully

Since we no longer have a shared call stack between actors that send messages to each other, we need to handle
error situations differently. There are two kinds of errors we need to consider:

* The first case is when the delegated task on the target actor failed due to an error in the task (typically some
validation issue, like a non-existent user ID). In this case, the service encapsulated by the target actor is intact;
it is only the task itself that is erroneous.
The service actor should reply to the sender with a message describing the error case. There is nothing special
here: errors are part of the domain and hence become ordinary messages.

* The second case is when a service itself encounters an internal fault. Akka enforces that all actors are organized
into a tree: an actor that creates another actor becomes the parent of that new actor. This is very similar to
how operating systems organize processes into a tree. Just like with processes, when an actor fails,
its parent actor is notified and it can react to the failure. Also, if the parent actor is stopped,
all of its children are recursively stopped, too. This service is called supervision and it is central to Akka.

TODO: ACTOR-TREE-SUPERVISION

A supervisor (parent) can decide to restart its child actors on certain types of failures, or stop them completely on
others. Children never go silently dead (with the notable exception of entering an infinite loop); instead they are
either failing, so that their parent can react to the fault, or they are stopped (in which case interested parties are
automatically notified). There is always a responsible entity for managing an actor: its parent. Restarts are not
visible from the outside: collaborating actors can continue sending messages while the target actor restarts.
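
The parent/child idea can be sketched with a plain thread and an uncaught-exception handler: a toy restart loop in which the "parent" is notified of each failure and decides to restart. This is an analogy only; Akka's supervision is declarative and far more capable.

```java
public class SupervisionSketch {
    static int attempts = 0;  // written by child threads, read after join()

    public static void main(String[] args) throws Exception {
        // A child "service" that fails twice before recovering.
        Runnable child = () -> {
            attempts++;
            if (attempts < 3) throw new RuntimeException("boom");
        };

        // The "parent" watches the child and restarts it on failure,
        // up to a bounded number of restarts.
        for (int restarts = 0; restarts < 5; restarts++) {
            Thread worker = new Thread(child);
            final boolean[] failed = {false};
            worker.setUncaughtExceptionHandler((t, e) -> failed[0] = true);  // parent is notified
            worker.start();
            worker.join();
            if (!failed[0]) break;  // healthy again: stop restarting
        }
        System.out.println(attempts);  // 3: two failures, then a successful run
    }
}
```

Even this toy shows the division of responsibility: the child does the work, the parent decides what failure means.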
akka-docs-new/src/main/paradox/guide/index.md (new file, 10 lines)
@@ -0,0 +1,10 @@

# Akka Documentation

@@@ index

* [What is Akka?](introduction.md)
* [What are Actors?](actors-intro.md)
* [Akka Libraries and modules](modules.md)
* [Your First Akka Application - Hello World](quickstart.md)

@@@
akka-docs-new/src/main/paradox/guide/introduction.md (new file, 49 lines)
@@ -0,0 +1,49 @@
# Introduction to Akka

Welcome to Akka, a set of open-source libraries for designing scalable, resilient systems that
span processor cores and networks. Akka allows you to focus on meeting business needs instead
of writing low-level code to provide reliable behavior, fault tolerance, and high performance.

Common practices and programming models do not address important challenges inherent in designing systems
for modern computer architectures. To be successful, distributed systems must cope in an environment where components
crash without responding, messages get lost without a trace on the wire, and network latency fluctuates.
These problems occur regularly even in carefully managed intra-datacenter environments - and even more so in
virtualized architectures.

To deal with these realities, Akka provides:

* Multi-threaded behavior without the use of low-level concurrency constructs like
atomics or locks. You do not even need to think about memory visibility issues.
* Transparent remote communication between systems and their components. You do
not need to write or maintain difficult networking code.
* A clustered, high-availability architecture that can scale out (or in) on demand.

All of these features are available through a uniform programming model: Akka exploits the actor model
to provide a level of abstraction that makes it easier to write correct concurrent and parallel systems.
The actor model spans the set of Akka libraries, providing you with a consistent way of understanding and using them.
Thus, Akka offers a depth of integration that you cannot achieve by picking libraries to solve individual problems and
trying to piece them together.

By learning Akka and its actor model, you will gain access to a vast and deep set of tools that solve difficult
distributed/parallel systems problems in a uniform programming model where everything fits together tightly and
efficiently.
## What is the actor model?

The characteristics of today's computing environments are vastly different from the ones in use when the programming
models of yesterday were conceived. Actors were invented decades ago by Carl Hewitt (reference).
But only relatively recently has their applicability to the challenges of modern computing systems been recognized
and proved effective.

The actor model provides an abstraction that allows you to think about your code in terms of communication, not unlike
people in a large organization. The basic characteristic of actors is that they model the world as stateful entities
communicating with each other by explicit message passing.

As computational entities, actors have these characteristics:

* They communicate with asynchronous messaging instead of method calls
* They manage their own state
* When responding to a message, they can:
    * Create other (child) actors
    * Send messages to other actors
    * Stop (child) actors or themselves

akka-docs-new/src/main/paradox/guide/modules.md (new file, 165 lines)
@@ -0,0 +1,165 @@
# Akka Libraries and Modules

Before we delve further into writing our first actors, we should stop for a moment and look at the set of libraries
that come out of the box. This will help you identify which modules and libraries provide the functionality you
want to use in your system.

### Actors (`akka-actor` library, the core)

The use of actors across Akka libraries provides a consistent, integrated model that relieves you from individually
solving the challenges that arise in concurrent or distributed system design. From a bird's-eye view,
actors are a programming paradigm that takes encapsulation (one of the pillars of OOP) to its extreme.
Unlike objects, actors encapsulate not only their
state but also their execution. Communication with actors is not via method calls but by passing messages. While this
difference may seem minor, it is actually what allows us to break cleanly from the limitations of OOP when it
comes to concurrency and remote communication. Don’t worry if this description feels too high-level to fully grasp
yet; in the next chapter we will explain actors in detail. For now, the important point is that this is a model that
handles concurrency and distribution at the fundamental level, instead of as ad hoc patches that attempt to bring
these features to OOP.

Problems that actors solve include:

* How to build and design high-performance, concurrent applications?
* How to handle errors in a multi-threaded environment?
* How to protect my project from the pitfalls of concurrency?

### Remoting

Remoting enables actors that live on different computers to seamlessly exchange messages.
While distributed as a JAR artifact, Remoting resembles a module more than it does a library. You enable it mostly
with configuration, and it has only a few APIs. Thanks to the actor model, remote and local message sends look exactly
the same. The patterns that you use locally on multi-core systems translate directly to remote systems.
You will rarely need to use Remoting directly, but it provides the foundation on which the Cluster subsystem is built.

Some of the problems Remoting solves are:

* How to address actor systems living on remote hosts?
* How to address individual actors on remote actor systems?
* How to turn messages into bytes on the wire?
* How to manage low-level network connections (and reconnections) between hosts, and detect crashed actor systems
and hosts, all transparently?
* How to multiplex communications from an unrelated set of actors on the same network connection, all transparently?

### Cluster

If you have a set of actor systems that cooperate to solve some business problem, then you likely want to manage this
set of systems in a disciplined way. While Remoting solves the problem of addressing and communicating with components
of remote systems, Clustering gives you the ability to organize these into a "meta-system" tied together by a
membership protocol. **In most cases, you want to use the Cluster module instead of using Remoting directly.**
Cluster provides an additional set of services on top of Remoting that most real-world applications need.

The problems the Cluster module solves (among others) are:

* How to maintain a set of actor systems (a cluster) that can communicate with each other and consider each other as part of the cluster?
* How to introduce a new system safely to the set of already existing members?
* How to reliably detect systems that are temporarily unreachable?
* How to remove failed hosts/systems (or scale down the system) so that all remaining members agree on the remaining subset of the cluster?
* How to distribute computations among the current set of members?
* How do I designate members of the cluster to a certain role, in other words, to provide certain services and not others?

### Persistence

Just like objects in OOP, actors keep their state in volatile memory. Once the system is shut down, gracefully or
because of a crash, all data that was in memory is lost. Persistence provides patterns to enable actors to persist
the events that lead to their current state. Upon startup, events can be replayed to restore the state of the entity
hosted by the actor. The event stream can be queried and fed into additional processing pipelines (an external Big
Data cluster, for example) or alternate views (like reports).

Persistence tackles the following problems:

* How do I restore the state of an entity/actor when the system restarts or crashes?
* How do I implement a [CQRS system](https://msdn.microsoft.com/en-us/library/jj591573.aspx)?
* How do I ensure reliable delivery of messages in the face of network errors and system crashes?
* How do I introspect the domain events that have led an entity to its current state?
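
The replay idea behind event sourcing can be sketched in a few lines. The `Adjusted` event and the toy counter entity here are hypothetical; Akka Persistence takes care of journaling and replay for you.

```java
import java.util.List;

public class EventSourcingSketch {
    // Hypothetical domain event: the counter was adjusted by some delta.
    static final class Adjusted {
        final int delta;
        Adjusted(int delta) { this.delta = delta; }
    }

    public static void main(String[] args) {
        // Events persisted to the journal during normal operation, in order.
        List<Adjusted> journal = List.of(new Adjusted(2), new Adjusted(5), new Adjusted(-3));

        // On restart, replay the journal to rebuild the in-memory state.
        int state = 0;
        for (Adjusted event : journal) {
            state += event.delta;
        }
        System.out.println(state);  // 2 + 5 - 3 = 4
    }
}
```

Because state is derived purely from the ordered event log, the same log can also feed reports or external processing pipelines, as described above.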

### Cluster Singleton

A common (in fact, a bit too common) use case in distributed systems is to have a single entity responsible
for a given task, which is shared among the other members of the cluster and migrated if the host system fails.
While this undeniably introduces a common bottleneck for the whole cluster that limits scaling,
there are scenarios where the use of this pattern is unavoidable. Cluster Singleton allows a cluster to elect an
actor system which will host a particular actor, while other systems can always access said service independently of
where it is.

The Singleton module can be used to solve these problems:

* How do I ensure that only one instance of a service is running in the whole cluster?
* How do I ensure that the service stays up even if the system currently hosting it crashes or is shut down during the process of scaling down?
* How do I reach this instance from any member of the cluster, assuming that it can migrate to other systems over time?

### Cluster Sharding

Persistence solves the problem of restoring an actor’s state from persistent storage after a system restart or crash.
It does not, by itself, solve the problem of distributing a set of such actors among the members of an Akka cluster.
Sharding is a pattern mostly used together with Persistence to balance a large set of persistent entities
(backed by actors) across the members of a cluster, and to migrate them to other systems if one of the members
crashes.

The problem space that Sharding targets:

* How do I model and scale out a large set of stateful entities on a set of systems?
* How do I ensure that entities in the cluster are distributed properly so that load is properly balanced across the machines?
* How do I migrate entities from a crashed system without losing state?
* How do I ensure that an entity does not exist on multiple systems at the same time and hence is kept consistent?

### Cluster Publish-Subscribe

For coordination among systems, it is often necessary to distribute messages to all, or one, of a set of
interested systems in a cluster. This pattern is usually called publish-subscribe, and this module solves this exact
problem. It is possible to subscribe to topics and receive messages published to a topic, and it is also possible
to broadcast or anycast messages to the subscribers of a topic.

* How do I broadcast messages to an interested set of parties in a cluster?
* How do I anycast messages to a member from an interested set of parties in a cluster?
* How do I subscribe and unsubscribe to events of a certain topic in the cluster?
||||
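A small sketch of the topic-based style (the topic name and `Subscriber` actor are illustrative choices of ours): every node runs a mediator actor that replicates subscriptions across the cluster, so publishing on any node reaches all subscribers.

```scala
import akka.actor.{ Actor, ActorSystem, Props }
import akka.cluster.pubsub.DistributedPubSub
import akka.cluster.pubsub.DistributedPubSubMediator.{ Publish, Subscribe }

object PubSubExample extends App {
  class Subscriber extends Actor {
    // The mediator actor runs on every node and replicates subscriptions cluster-wide
    private val mediator = DistributedPubSub(context.system).mediator
    mediator ! Subscribe("measurements", self)

    def receive: Receive = {
      case msg: String => println(s"received: $msg")
    }
  }

  val system = ActorSystem("cluster")
  system.actorOf(Props[Subscriber], "subscriber")

  // Publishing from any node reaches all subscribers of the topic
  DistributedPubSub(system).mediator ! Publish("measurements", "temperature: 21.0")
}
```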
### Streams

Actors are a fundamental model for concurrency, but there are common patterns where their use requires the user
to implement the same pattern over and over. Very common is the scenario where a chain (or graph) of actors needs to
process a potentially large (or infinite) stream of sequential events and properly coordinate resource usage so that
faster processing stages don't overwhelm slower ones in the chain (or graph). Streams provide a higher-level
abstraction on top of actors that simplifies writing such processing networks, handling all the fine details in the
background and providing a safe programming model. Streams is also an implementation
of the [Reactive Streams standard](http://www.reactive-streams.org), which enables integration with all 3rd
party implementations of that standard.

* How do I handle streams of events or large datasets with high performance, exploiting concurrency and keeping resource usage tight?
* How do I assemble reusable pieces of event/data processing into flexible pipelines?
* How do I connect asynchronous services to each other in a flexible way, with good performance?
* How do I provide or consume Reactive Streams compliant interfaces to integrate with a 3rd party library?
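As a small taste (a sketch, not part of this guide's sample code), here is a backpressured pipeline: downstream stages pull elements only as fast as they can process them, so fast producers never overwhelm slow consumers.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }

import scala.concurrent.Await
import scala.concurrent.duration._

object StreamsExample extends App {
  implicit val system = ActorSystem("streams")
  implicit val materializer = ActorMaterializer()

  // Emit 1..100, double each element, then fold them into a sum
  val sum = Source(1 to 100)
    .map(_ * 2)
    .runWith(Sink.fold(0)(_ + _)) // materializes as a Future[Int]

  println(Await.result(sum, 3.seconds)) // sum of 2, 4, ..., 200
  system.terminate()
}
```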
### HTTP

The de facto standard for providing APIs remotely (internal or external) is HTTP. Akka provides a library to provide or
consume such HTTP services, giving a set of tools to create and serve HTTP services, and a client that can be
used to consume other services. These tools are particularly suited to streaming large sets of data or real-time
events in and out by leveraging the underlying model of Akka Streams.

* How do I expose services of my system or cluster of systems to the external world via an HTTP API in a performant way?
* How do I stream large datasets in and out of a system using HTTP?
* How do I stream live events in and out of a system using HTTP?
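A minimal server sketch with `akka-http` (the route, host and port are illustrative choices of ours):

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer

object HttpExample extends App {
  implicit val system = ActorSystem("http-example")
  implicit val materializer = ActorMaterializer()

  // A minimal route: GET /hello answers with a plain-text response
  val route =
    path("hello") {
      get {
        complete("Hello from Akka HTTP")
      }
    }

  // Requests and responses are streamed under the hood via Akka Streams
  Http().bindAndHandle(route, "localhost", 8080)
}
```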
***

This is an incomplete list of all the available modules, but it gives a nice overview of the landscape of modules
and the level of sophistication you can reach when you start building systems on top of Akka. All these modules
integrate seamlessly with each other. For example, take a large set of stateful business objects
(a document, a shopping cart, etc.) that is accessed by online users of your website. Model these as sharded
entities using Sharding and Persistence to keep them balanced across a cluster that you can scale out on demand
(for example during an advertising campaign before holidays) and keep them available even if some systems crash.
Take the real-time stream of domain events of your business objects with Persistence Query and use Streams to pipe
it into a streaming BigData engine. Take the output of that engine as a Stream, manipulate it using Akka Streams
operators and expose it as websocket connections served by a load-balanced set of HTTP servers hosted by your cluster
to power your real-time business analytics tool.

Got you interested?
113
akka-docs-new/src/main/paradox/guide/quickstart.md
Normal file

@ -0,0 +1,113 @@
# Your First Akka Application - Hello World

After all this introduction, we are ready to build our first actor system. We will do this in three chapters.
In this first chapter we will help you to set up your project and tools and have a simple "Hello World" demo running.
We will keep this to the bare minimum and then extend the sample application in the next chapter. Finally, we review
what we have learned in the third chapter, looking in detail at how the pieces work and fit together.

> Our goal in this chapter is to set up a working environment for you, create an application that starts up and stops
an ActorSystem, and create an actor which we will test.

As the very first thing, we need to make sure that we can compile our project and have a working IDE setup to be
able to edit code comfortably. Depending on your preference for build tool and IDE, there are multiple paths you can
follow here.

## Setting up the build

Depending on your build tool, you need to set up the layout for your project and tell the build tool about your
dependencies (libraries that you want to use). There are common things to take care of independently of your choice
of build tool:

* Declare `akka-actor` as a dependency. This is the core library of Akka, the one we are learning now.
* Declare `akka-testkit` as a dependency. This is a toolkit for testing Akka applications. Without this
  dependency you will have a hard time testing actors.
* **Use the latest Akka version for new projects (unless you have additional constraints)!**
* **Don't mix Akka versions! You are free to use any Akka version in your project, but you must use
  that version for all Akka core projects.** In this sample it means that `akka-actor` and `akka-testkit` should
  always be of the same version.
### sbt

If you plan to use sbt, your first step is to set up the directory structure for the project. sbt follows the
directory layout standard of Maven. Usually, you want to start with the following directories and files:

* `/src`
    * `/main` (this is where main, production classes live)
        * `/scala` (this is where Scala classes live)
        * `/java` (this is where Java classes live if you plan to use Java as well)
        * `/resources` (this is where non-source files live which are required on the classpath.
          A typical example is application.conf, which contains the configuration for your application.
          We will cover this in detail in CONFIG-SECTION)
    * `/test`
        * `/scala` (this is where Scala test and test helper classes live)
        * `/java` (this is where Java test and test helper classes live if you plan to use Java as well)
        * `/resources` (this is where non-source files live which are required on the classpath to run tests.
          Typically this contains an application.conf that overrides the default configuration for tests. We will
          cover this in detail in CONFIG-SECTION)
* `/project`
    * `build.properties`
* `build.sbt`

For example, if you have a Scala class `TestClass` in package `com.example.foo` then it should go in
`/src/main/scala/com/example/foo/TestClass.scala`.
The file `build.sbt` contains the necessary information for sbt about your project metadata and dependencies.
For our sample project, this file should contain the following:

```scala
// build.sbt

name := "intro-akka"
organization := "intro-akka.organization"
version := "0.1-SNAPSHOT"

scalaVersion := "2.11.8"
val AkkaVersion = "2.4.12"

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % AkkaVersion
libraryDependencies += "com.typesafe.akka" %% "akka-testkit" % AkkaVersion % "test"
```

This simple file first sets up the project metadata (_name_ and _organization_; we just picked sample values here).
Then we set the version of the Scala compiler we use, and then set a variable with the Akka version we intend to
use (always try to use the latest).

As the last step, we add two dependencies, `akka-actor` and `akka-testkit`. Note that we used the `AkkaVersion`
variable for both dependencies, ensuring that versions are not accidentally mixed and are kept in sync when upgrading.

Finally, check that everything works by running `sbt update` from the base directory of your project
(where the `build.sbt` file is).
## Setting up your IDE

If you go with IntelliJ IDEA, you have flexible means to either import a project created manually in one of the
previous steps from "Setting up the build", or to let the IDE create it for you. Depending on your build tool,
there are detailed steps in the IDEA documentation:

sbt
: [https://www.jetbrains.com/help/idea/2016.2/getting-started-with-sbt.html](https://www.jetbrains.com/help/idea/2016.2/getting-started-with-sbt.html)
## Building the first application

Akka applications are simply Scala/Java applications; you don't need to set up any container (application server, etc.)
to have an actor system running. All we need is a class with a proper `main` method that starts and stops an actor
system. The pieces of the puzzle that we need to put together are: What is an actor system and why do I need one?
How can I start and stop it? Why do I need to stop it?

In Akka, actors belong to actor systems, which are instances of the type `ActorSystem`. This class acts as a
resource container which holds, among others:

* configuration shared by all actors in that system
* a pool of threads that will execute actors that are ready to process messages
* a dispatcher mechanism that dynamically assigns actors to threads of the pool
* a scheduler used for timer-related tasks

The `ActorSystem` manages its actors and runs them in the background on its encapsulated thread pool.
This means that it must be explicitly shut down, otherwise these threads keep running and the JVM does
not exit (by default, threads created by an ActorSystem are not daemon threads; see the JDK documentation for
more details on [daemon or non-daemon threads](https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html)).

This is the usual pattern to have your system set up and stopped on an external signal
(the user pressing ENTER in the console):
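The code listing is missing here; a minimal sketch of the pattern just described (the object name `HelloAkka` and the system name are placeholder choices of ours) could look like this:

```scala
import akka.actor.ActorSystem

import scala.io.StdIn

object HelloAkka {
  def main(args: Array[String]): Unit = {
    // Create the resource container that hosts our actors and their threads
    val system = ActorSystem("hello-akka")
    try {
      // Actors will be created and sent messages here in the next chapters

      // Block until the user presses ENTER, then shut down
      StdIn.readLine()
    } finally {
      // Terminate the ActorSystem so its threads stop and the JVM can exit
      system.terminate()
    }
  }
}
```

The `try`/`finally` ensures the system is terminated even if the body throws, so the JVM can always exit cleanly.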
41
akka-docs-new/src/main/paradox/guide/tutorial_1.md
Normal file

@ -0,0 +1,41 @@
supervision
deathwatch
testing
conversation patterns
timeouts

create app structure
iot collectors
dashboards
analysis backend

STEP 1
create entry point
- just the usual stuff
explain supervision
- don't create too many top-level actors
create top level supervisor
- lifecycle hooks
- emptyBehavior
create device actor
create device group actor
create device manager

STEP 2
create dashboard manager
create dashboard actor
- resilience to device group actor failures
- resilience to individual device actor failures

STEP 3
create analytics manager
set up aggregator
set up pool of workers
hook analytics up with dashboard (show global average)
make it resilient to analytics failure

STEP 4
set up configuration
set up logging
8
akka-docs-new/src/main/paradox/index.md
Normal file

@ -0,0 +1,8 @@
# Akka Documentation

@@@ index

* [Beginners Guide](guide/index.md)
* [Reference](reference/index.md)

@@@
5
akka-docs-new/src/main/paradox/reference/index.md
Normal file

@ -0,0 +1,5 @@
# Reference

@@@ index

@@@
44
akka-docs-new/src/test/scala/tutorial_3/Device.scala
Normal file

@ -0,0 +1,44 @@
/**
 * Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
 */
package tutorial_3

import akka.actor.{ Actor, ActorLogging, Props }
import tutorial_3.Device.{ ReadTemperature, RecordTemperature, RespondTemperature, TemperatureRecorded }
import tutorial_3.DeviceManager.{ DeviceRegistered, RequestTrackDevice }

object Device {

  def props(groupId: String, deviceId: String): Props = Props(new Device(groupId, deviceId))

  case class RecordTemperature(requestId: Long, value: Double)
  case class TemperatureRecorded(requestId: Long)

  case class ReadTemperature(requestId: Long)
  case class RespondTemperature(requestId: Long, value: Option[Double])
}

class Device(groupId: String, deviceId: String) extends Actor with ActorLogging {
  var lastTemperatureReading: Option[Double] = None

  override def preStart(): Unit = log.info(s"Device actor $groupId-$deviceId started")

  override def postStop(): Unit = log.info(s"Device actor $groupId-$deviceId stopped")

  override def receive: Receive = {
    case RequestTrackDevice(`groupId`, `deviceId`) =>
      sender() ! DeviceRegistered

    case RequestTrackDevice(groupId, deviceId) =>
      log.warning(s"Ignoring TrackDevice request for $groupId-$deviceId. " +
        s"This actor is responsible for ${this.groupId}-${this.deviceId}.")

    case RecordTemperature(id, value) =>
      log.info(s"Recorded temperature reading $value with $id")
      lastTemperatureReading = Some(value)
      sender() ! TemperatureRecorded(id)

    case ReadTemperature(id) =>
      sender() ! RespondTemperature(id, lastTemperatureReading)
  }
}
136
akka-docs-new/src/test/scala/tutorial_3/DeviceGroup.scala
Normal file

@ -0,0 +1,136 @@
/**
 * Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
 */
package tutorial_3

import akka.actor.{ Actor, ActorLogging, ActorRef, Cancellable, Props, Stash, Terminated }
import tutorial_3.DeviceGroup._
import tutorial_3.DeviceManager.RequestTrackDevice
import scala.concurrent.duration._

object DeviceGroup {

  def props(groupId: String): Props = Props(new DeviceGroup(groupId))

  case class RequestDeviceList(requestId: Long)
  case class ReplyDeviceList(requestId: Long, ids: Set[String])

  case class RequestAllTemperatures(requestId: Long)
  case class RespondAllTemperatures(requestId: Long, temperatures: Map[String, Option[Double]])

  case class CollectionTimeout(requestId: Long)
}

class DeviceGroup(groupId: String) extends Actor with ActorLogging with Stash {
  var deviceIdToActor = Map.empty[String, ActorRef]
  var actorToDeviceId = Map.empty[ActorRef, String]
  var nextCollectionId = 0L

  override def preStart(): Unit = log.info(s"DeviceGroup $groupId started")

  override def postStop(): Unit = log.info(s"DeviceGroup $groupId stopped")

  override def receive: Receive = waitingForRequest orElse generalManagement

  def waitingForRequest: Receive = {
    case RequestAllTemperatures(requestId) =>
      import context.dispatcher

      val collectionId = nextCollectionId
      val requester = sender()
      nextCollectionId += 1
      val answersSoFar = deviceIdToActor.mapValues(_ => None)
      context.children.foreach(_ ! Device.ReadTemperature(collectionId))
      val collectionTimeoutTimer = context.system.scheduler.scheduleOnce(3.seconds, self, CollectionTimeout(collectionId))

      context.become(
        collectResults(
          collectionTimeoutTimer,
          collectionId,
          requester,
          requestId,
          answersSoFar.size,
          answersSoFar) orElse generalManagement,
        discardOld = false
      )
  }

  def generalManagement: Receive = {
    // Note the backticks
    case trackMsg @ RequestTrackDevice(`groupId`, _) =>
      handleTrackMessage(trackMsg)

    case RequestTrackDevice(groupId, deviceId) =>
      log.warning(s"Ignoring TrackDevice request for $groupId. This actor is responsible for ${this.groupId}.")

    case RequestDeviceList(requestId) =>
      sender() ! ReplyDeviceList(requestId, deviceIdToActor.keySet)

    case Terminated(deviceActor) =>
      removeDeviceActor(deviceActor)
  }

  def handleTrackMessage(trackMsg: RequestTrackDevice): Unit = {
    deviceIdToActor.get(trackMsg.deviceId) match {
      case Some(ref) =>
        ref forward trackMsg
      case None =>
        log.info(s"Creating device actor for ${trackMsg.deviceId}")
        val deviceActor = context.actorOf(Device.props(groupId, trackMsg.deviceId), "device-" + trackMsg.deviceId)
        context.watch(deviceActor)
        deviceActor forward trackMsg
        deviceIdToActor += trackMsg.deviceId -> deviceActor
        actorToDeviceId += deviceActor -> trackMsg.deviceId
    }
  }

  def removeDeviceActor(deviceActor: ActorRef): Unit = {
    val deviceId = actorToDeviceId(deviceActor)
    log.info(s"Device actor for $deviceId has been terminated")
    actorToDeviceId -= deviceActor
    deviceIdToActor -= deviceId
  }

  def collectResults(
    timer: Cancellable,
    expectedId: Long,
    requester: ActorRef,
    requestId: Long,
    waiting: Int,
    answersSoFar: Map[String, Option[Double]]
  ): Receive = {

    case Device.RespondTemperature(`expectedId`, temperatureOption) =>
      val deviceActor = sender()
      val deviceId = actorToDeviceId(deviceActor)
      val newAnswers = answersSoFar + (deviceId -> temperatureOption)

      if (waiting == 1) {
        requester ! RespondAllTemperatures(requestId, newAnswers)
        finishCollection(timer)
      } else {
        context.become(collectResults(timer, expectedId, requester, requestId, waiting - 1, newAnswers))
      }

    case Terminated(deviceActor) =>
      val deviceId = actorToDeviceId(deviceActor)
      removeDeviceActor(deviceActor)
      val newAnswers = answersSoFar + (deviceId -> None)

      if (waiting == 1) {
        requester ! RespondAllTemperatures(requestId, newAnswers)
        finishCollection(timer)
      } else {
        context.become(collectResults(timer, expectedId, requester, requestId, waiting - 1, newAnswers))
      }

    case CollectionTimeout(`expectedId`) =>
      requester ! RespondAllTemperatures(requestId, answersSoFar)
  }

  def finishCollection(timer: Cancellable): Unit = {
    context.unbecome()
    timer.cancel()
  }

}
132
akka-docs-new/src/test/scala/tutorial_3/DeviceGroupSpec.scala
Normal file

@ -0,0 +1,132 @@
/**
 * Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
 */

package tutorial_3

import akka.actor.PoisonPill
import akka.testkit.{ AkkaSpec, TestProbe }

import scala.concurrent.duration._

class DeviceGroupSpec extends AkkaSpec {

  "DeviceGroup actor" must {

    "be able to register a device actor" in {
      val probe = TestProbe()
      val groupActor = system.actorOf(DeviceGroup.props("group"))

      probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device1"))
      probe.expectMsg(DeviceManager.DeviceRegistered)
      val deviceActor1 = probe.lastSender

      probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device2"))
      probe.expectMsg(DeviceManager.DeviceRegistered)
      val deviceActor2 = probe.lastSender

      // Check that the device actors are working
      probe.send(deviceActor1, Device.RecordTemperature(requestId = 0, 1.0))
      probe.expectMsg(Device.TemperatureRecorded(requestId = 0))
      probe.send(deviceActor2, Device.RecordTemperature(requestId = 1, 2.0))
      probe.expectMsg(Device.TemperatureRecorded(requestId = 1))
    }

    "ignore requests for wrong groupId" in {
      val probe = TestProbe()
      val groupActor = system.actorOf(DeviceGroup.props("group"))

      probe.send(groupActor, DeviceManager.RequestTrackDevice("wrongGroup", "device1"))
      probe.expectNoMsg(500.milliseconds)
    }

    "return same actor for same deviceId" in {
      val probe = TestProbe()
      val groupActor = system.actorOf(DeviceGroup.props("group"))

      probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device1"))
      probe.expectMsg(DeviceManager.DeviceRegistered)
      val deviceActor1 = probe.lastSender

      probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device1"))
      probe.expectMsg(DeviceManager.DeviceRegistered)
      val deviceActor2 = probe.lastSender

      deviceActor1 should ===(deviceActor2)
    }

    "be able to list active devices" in {
      val probe = TestProbe()
      val groupActor = system.actorOf(DeviceGroup.props("group"))

      probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device1"))
      probe.expectMsg(DeviceManager.DeviceRegistered)

      probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device2"))
      probe.expectMsg(DeviceManager.DeviceRegistered)

      probe.send(groupActor, DeviceGroup.RequestDeviceList(requestId = 0))
      probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2")))
    }

    "be able to list active devices after one shuts down" in {
      val probe = TestProbe()
      val groupActor = system.actorOf(DeviceGroup.props("group"))

      probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device1"))
      probe.expectMsg(DeviceManager.DeviceRegistered)
      val toShutDown = probe.lastSender

      probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device2"))
      probe.expectMsg(DeviceManager.DeviceRegistered)

      probe.send(groupActor, DeviceGroup.RequestDeviceList(requestId = 0))
      probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device1", "device2")))

      probe.watch(toShutDown)
      toShutDown ! PoisonPill
      probe.expectTerminated(toShutDown)

      probe.send(groupActor, DeviceGroup.RequestDeviceList(requestId = 0))
      probe.expectMsg(DeviceGroup.ReplyDeviceList(requestId = 0, Set("device2")))
    }

    "be able to collect temperatures from all active devices" in {
      val probe = TestProbe()
      val groupActor = system.actorOf(DeviceGroup.props("group"))

      probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device1"))
      probe.expectMsg(DeviceManager.DeviceRegistered)
      val deviceActor1 = probe.lastSender

      probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device2"))
      probe.expectMsg(DeviceManager.DeviceRegistered)
      val deviceActor2 = probe.lastSender

      probe.send(groupActor, DeviceManager.RequestTrackDevice("group", "device3"))
      probe.expectMsg(DeviceManager.DeviceRegistered)
      val deviceActor3 = probe.lastSender

      // Check that the device actors are working
      probe.send(deviceActor1, Device.RecordTemperature(requestId = 0, 1.0))
      probe.expectMsg(Device.TemperatureRecorded(requestId = 0))
      probe.send(deviceActor2, Device.RecordTemperature(requestId = 1, 2.0))
      probe.expectMsg(Device.TemperatureRecorded(requestId = 1))
      // No temperature for device3

      probe.send(groupActor, DeviceGroup.RequestAllTemperatures(requestId = 0))
      probe.expectMsg(
        DeviceGroup.RespondAllTemperatures(
          requestId = 0,
          temperatures = Map(
            "device1" -> Some(1.0),
            "device2" -> Some(2.0),
            "device3" -> None
          )
        )
      )
    }

  }

}
47
akka-docs-new/src/test/scala/tutorial_3/DeviceManager.scala
Normal file

@ -0,0 +1,47 @@
/**
 * Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
 */

package tutorial_3

import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
import tutorial_3.DeviceManager.RequestTrackDevice

object DeviceManager {
  def props(): Props = Props(new DeviceManager)

  case class RequestTrackDevice(groupId: String, deviceId: String)
  case object DeviceRegistered
}

class DeviceManager extends Actor with ActorLogging {
  var groupIdToActor = Map.empty[String, ActorRef]
  var actorToGroupId = Map.empty[ActorRef, String]

  override def preStart(): Unit = log.info("DeviceManager started")

  override def postStop(): Unit = log.info("DeviceManager stopped")

  override def receive = {
    case trackMsg @ RequestTrackDevice(groupId, _) =>
      groupIdToActor.get(groupId) match {
        case Some(ref) =>
          ref forward trackMsg
        case None =>
          log.info(s"Creating device group actor for $groupId")
          val groupActor = context.actorOf(DeviceGroup.props(groupId), "group-" + groupId)
          context.watch(groupActor)
          groupActor forward trackMsg
          groupIdToActor += groupId -> groupActor
          actorToGroupId += groupActor -> groupId
      }

    case Terminated(groupActor) =>
      val groupId = actorToGroupId(groupActor)
      log.info(s"Device group actor for $groupId has been terminated")
      actorToGroupId -= groupActor
      groupIdToActor -= groupId
  }

}
67
akka-docs-new/src/test/scala/tutorial_3/DeviceSpec.scala
Normal file

@ -0,0 +1,67 @@
/**
 * Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
 */
package tutorial_3

import akka.testkit.{ AkkaSpec, TestProbe }

import scala.concurrent.duration._

class DeviceSpec extends AkkaSpec {

  "Device actor" must {

    "reply to registration requests" in {
      val probe = TestProbe()
      val deviceActor = system.actorOf(Device.props("group", "device"))

      probe.send(deviceActor, DeviceManager.RequestTrackDevice("group", "device"))
      probe.expectMsg(DeviceManager.DeviceRegistered)
      probe.lastSender should ===(deviceActor)
    }

    "ignore wrong registration requests" in {
      val probe = TestProbe()
      val deviceActor = system.actorOf(Device.props("group", "device"))

      probe.send(deviceActor, DeviceManager.RequestTrackDevice("wrongGroup", "device"))
      probe.expectNoMsg(500.milliseconds)

      probe.send(deviceActor, DeviceManager.RequestTrackDevice("group", "wrongDevice"))
      probe.expectNoMsg(500.milliseconds)
    }

    "reply with empty reading if no temperature is known" in {
      val probe = TestProbe()
      val deviceActor = system.actorOf(Device.props("group", "device"))

      probe.send(deviceActor, Device.ReadTemperature(requestId = 42))
      val response = probe.expectMsgType[Device.RespondTemperature]
      response.requestId should ===(42)
      response.value should ===(None)
    }

    "reply with latest temperature reading" in {
      val probe = TestProbe()
      val deviceActor = system.actorOf(Device.props("group", "device"))

      probe.send(deviceActor, Device.RecordTemperature(requestId = 1, 24.0))
      probe.expectMsg(Device.TemperatureRecorded(requestId = 1))

      probe.send(deviceActor, Device.ReadTemperature(requestId = 2))
      val response1 = probe.expectMsgType[Device.RespondTemperature]
      response1.requestId should ===(2)
      response1.value should ===(Some(24.0))

      probe.send(deviceActor, Device.RecordTemperature(requestId = 3, 55.0))
      probe.expectMsg(Device.TemperatureRecorded(requestId = 3))

      probe.send(deviceActor, Device.ReadTemperature(requestId = 4))
      val response2 = probe.expectMsgType[Device.RespondTemperature]
      response2.requestId should ===(4)
      response2.value should ===(Some(55.0))
    }

  }

}
29
akka-docs-new/src/test/scala/tutorial_3/IotApp.scala
Normal file

@ -0,0 +1,29 @@
/**
 * Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
 */
package tutorial_3

import akka.actor.ActorSystem
import tutorial_3.DeviceManager.RequestTrackDevice

import scala.io.StdIn

object IotApp {

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("iot-system")

    try {
      // Create top level supervisor
      val supervisor = system.actorOf(DeviceManager.props(), "iot-supervisor")

      supervisor ! RequestTrackDevice("mygroup", "device1")

      // Exit the system after ENTER is pressed
      StdIn.readLine()
    } finally {
      system.terminate()
    }
  }

}
24
akka-docs-new/src/test/scala/tutorial_3/IotSupervisor.scala
Normal file

@ -0,0 +1,24 @@
/**
 * Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
 */
package tutorial_3

import akka.actor.{ Actor, ActorLogging, ActorRef, Props }

object IotSupervisor {

  def props(): Props = Props(new IotSupervisor)

}

class IotSupervisor extends Actor with ActorLogging {
  // Create the device manager as a child so that this actor actually supervises it
  val deviceManager: ActorRef = context.actorOf(DeviceManager.props(), "device-manager")

  override def preStart(): Unit = log.info("IoT Application started")

  override def postStop(): Unit = log.info("IoT Application stopped")

  // No need to handle any messages
  override def receive = Actor.emptyBehavior

}
@ -265,6 +265,19 @@ object AkkaBuild extends Build {
    )
  )

  lazy val newDocs = Project(
    id = "akka-docs-new",
    base = file("akka-docs-new"),
    dependencies = Seq(
      actor,
      testkit % "compile;test->test",
      remote % "compile;test->test", cluster, clusterMetrics, slf4j, agent, camel, osgi,
      persistence % "compile;provided->provided;test->test", persistenceTck, persistenceQuery,
      typed % "compile;test->test", distributedData,
      stream, streamTestkit % "compile;test->test"
    )
  )

  lazy val contrib = Project(
    id = "akka-contrib",
    base = file("akka-contrib"),
@ -37,3 +37,5 @@ libraryDependencies += "org.kohsuke" % "github-api" % "1.68"
addSbtPlugin("io.spray" % "sbt-boilerplate" % "0.6.0")

addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.1.8")

addSbtPlugin("com.lightbend.paradox" % "sbt-paradox" % "0.2.9")