Merge branch 'master' of github.com:jboner/akka
Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
commit
09a219bcd1
4 changed files with 256 additions and 184 deletions
|
|
@ -54,6 +54,8 @@ private[akka] object ActorCell {
|
||||||
val contextStack = new ThreadLocal[Stack[ActorContext]] {
|
val contextStack = new ThreadLocal[Stack[ActorContext]] {
|
||||||
override def initialValue = Stack[ActorContext]()
|
override def initialValue = Stack[ActorContext]()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
val emptyChildren = TreeMap[ActorRef, ChildRestartStats]()
|
||||||
}
|
}
|
||||||
|
|
||||||
//vars don't need volatile since it's protected with the mailbox status
|
//vars don't need volatile since it's protected with the mailbox status
|
||||||
|
|
@ -74,9 +76,9 @@ private[akka] class ActorCell(
|
||||||
|
|
||||||
def provider = app.provider
|
def provider = app.provider
|
||||||
|
|
||||||
var futureTimeout: Option[ScheduledFuture[AnyRef]] = None //FIXME TODO Doesn't need to be volatile either, since it will only ever be accessed when a message is processed
|
var futureTimeout: Option[ScheduledFuture[AnyRef]] = None
|
||||||
|
|
||||||
var _children = TreeMap[ActorRef, ChildRestartStats]()
|
var _children = emptyChildren //Reuse same empty instance to avoid allocating new instance of the Ordering and the actual empty instance for every actor
|
||||||
|
|
||||||
var currentMessage: Envelope = null
|
var currentMessage: Envelope = null
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,28 +1,28 @@
|
||||||
|
|
||||||
.. _cluster:
|
.. _cluster:
|
||||||
|
|
||||||
################
|
#########
|
||||||
New Clustering
|
Cluster
|
||||||
################
|
#########
|
||||||
|
|
||||||
|
|
||||||
Intro
|
Intro
|
||||||
=====
|
=====
|
||||||
|
|
||||||
Akka Cluster provides a fault-tolerant, elastic, decentralized
|
Akka Cluster provides a fault-tolerant, elastic, decentralized peer-to-peer
|
||||||
peer-to-peer cluster with no single point of failure (SPOF) or single
|
cluster with no single point of failure (SPOF) or single point of bottleneck
|
||||||
point of bottleneck (SPOB). It is implemented as a Dynamo-style system
|
(SPOB). It is implemented as a Dynamo-style system using gossip protocols,
|
||||||
using gossip protocols, automatic failure detection, automatic
|
automatic failure detection, automatic partitioning, handoff, and cluster
|
||||||
partitioning, handoff and cluster rebalancing. But with some
|
rebalancing. But with some differences due to the fact that it is not just
|
||||||
differences due to the fact that it is not just managing passive data,
|
managing passive data, but actors, e.g. active, sometimes stateful, components
|
||||||
but actors, e.g. active, sometimes stateful, components that have
|
that have requirements on message ordering, the number of active instances in
|
||||||
requirements on message ordering, the number of active instances in
|
the cluster, etc.
|
||||||
the cluster etc.
|
|
||||||
|
|
||||||
Terms
|
Terms
|
||||||
=====
|
=====
|
||||||
|
|
||||||
These terms are used throughout the documentation.
|
These terms are used throughout the documentation.
|
||||||
|
|
||||||
**node**
|
**node**
|
||||||
A logical member of a cluster. There could be multiple nodes on a physical
|
A logical member of a cluster. There could be multiple nodes on a physical
|
||||||
|
|
@ -36,139 +36,111 @@ These terms are used throughout the documentation.
|
||||||
is distributed within the cluster.
|
is distributed within the cluster.
|
||||||
|
|
||||||
**partition path**
|
**partition path**
|
||||||
Also referred to as the actor address on the format `actor1/actor2/actor3`
|
Also referred to as the actor address. Has the format `actor1/actor2/actor3`
|
||||||
|
|
||||||
**base node**
|
**base node**
|
||||||
The first node (with nodes in sorted order) that contains a partition.
|
The first node (with nodes in sorted order) that contains a particular partition.
|
||||||
|
|
||||||
**instance count**
|
**instance count**
|
||||||
The number of instances of a partition in the cluster. Also referred to as the
|
The number of instances of a partition in the cluster. Also referred to as the
|
||||||
``N-value`` of the partition.
|
``N-value`` of the partition.
|
||||||
|
|
||||||
**partition table**
|
**partition table**
|
||||||
A mapping from partition path to base node and its ``N-value``
|
A mapping from partition path to base node and its ``N-value`` (i.e. its
|
||||||
(e.g. its instance count).
|
instance count).
|
||||||
|
|
||||||
Cluster
|
|
||||||
=======
|
Membership
|
||||||
|
==========
|
||||||
|
|
||||||
A cluster is made up of a set of member nodes. The identifier for each node is a
|
A cluster is made up of a set of member nodes. The identifier for each node is a
|
||||||
`hostname:port` pair. An Akka application is distributed over a cluster with each node
|
`hostname:port` pair. An Akka application is distributed over a cluster with
|
||||||
hosting some part of the application. Cluster membership and partitioning of the
|
each node hosting some part of the application. Cluster membership and
|
||||||
application are decoupled. A node could be a member of a cluster without hosting
|
partitioning of the application are decoupled. A node could be a member of a
|
||||||
any actors.
|
cluster without hosting any actors.
|
||||||
|
|
||||||
|
|
||||||
Gossip
|
Gossip
|
||||||
------
|
------
|
||||||
|
|
||||||
The cluster membership used in Akka is based on Amazon's `Dynamo`_
|
The cluster membership used in Akka is based on Amazon's `Dynamo`_ system and
|
||||||
system and particularly the approach taken in Basho's `Riak`_
|
particularly the approach taken in Basho's `Riak`_ distributed database. Cluster
|
||||||
distributed database. Cluster membership is communicated using a
|
membership is communicated using a `Gossip Protocol`_, where the current state
|
||||||
`Gossip Protocol`_. The current state of the cluster is gossiped
|
of the cluster is gossiped randomly through the cluster. Joining a cluster is
|
||||||
randomly through the cluster. Joining a cluster is initiated by
|
initiated by specifying a set of ``seed`` nodes with which to begin gossiping.
|
||||||
specifying a set of ``seed`` nodes with which to begin gossiping.
|
|
||||||
|
|
||||||
The gossip protocol maintains the list of live and dead
|
|
||||||
nodes. Periodically, default is every 1 second, this module chooses a
|
|
||||||
random node and initiates a round of Gossip with it. Whenever it gets
|
|
||||||
gossip updates it updates the `Failure Detector`_ with the liveness
|
|
||||||
information.
|
|
||||||
|
|
||||||
The nodes defined as ``seed`` nodes are just regular member nodes whose
|
|
||||||
only additional role is to function as contact points in the cluster
|
|
||||||
and to help breaking logical partitions (as seen in the gossip
|
|
||||||
algorithm defined below). A cluster can and should have multiple
|
|
||||||
``seed`` nodes. Seed nodes are *not* a single point of failure since the
|
|
||||||
cluster can continue to function just as fine without them.
|
|
||||||
|
|
||||||
During each of these runs the node initiates gossip exchange according
|
|
||||||
to following rules:
|
|
||||||
|
|
||||||
1. Gossip to random ``live`` membership node (if any).
|
|
||||||
2. Gossip to random ``unreachable`` node with certain probability
|
|
||||||
depending on number of unreachable and live nodes (if any).
|
|
||||||
3. If the node gossiped to at (1) was not a ``seed`` node, or the
|
|
||||||
number of live nodes is less than number of seeds, then gossip to
|
|
||||||
random ``seed`` node with a certain probability depending on number
|
|
||||||
of ``unreachable``, ``seed`` and ``live`` nodes.
|
|
||||||
|
|
||||||
All gossip is done over standard TCP and do not require multicast and
|
|
||||||
therefore works fine in virtualized environments such as Amazon EC2.
|
|
||||||
|
|
||||||
TODO: More details about our version of push-pull-gossip.
|
|
||||||
|
|
||||||
.. _Gossip Protocol: http://en.wikipedia.org/wiki/Gossip_protocol
|
.. _Gossip Protocol: http://en.wikipedia.org/wiki/Gossip_protocol
|
||||||
.. _Dynamo: http://www.allthingsdistributed.com/files/amazon-dynamo-sosp2007.pdf
|
.. _Dynamo: http://www.allthingsdistributed.com/files/amazon-dynamo-sosp2007.pdf
|
||||||
.. _Riak: http://basho.com/technology/architecture/
|
.. _Riak: http://basho.com/technology/architecture/
|
||||||
|
|
||||||
Vector Clocks
|
|
||||||
-------------
|
|
||||||
|
|
||||||
`Vector clocks`_ are an algorithm for generating a partial ordering of
|
Vector Clocks
|
||||||
events in a distributed system and detecting causality violations.
|
^^^^^^^^^^^^^
|
||||||
|
|
||||||
|
`Vector clocks`_ are an algorithm for generating a partial ordering of events in
|
||||||
|
a distributed system and detecting causality violations.
|
||||||
|
|
||||||
We use vector clocks to reconcile and merge differences in cluster state
|
We use vector clocks to reconcile and merge differences in cluster state
|
||||||
during gossiping. A vector clock is a set of (node, counter) pairs. Each update
|
during gossiping. A vector clock is a set of (node, counter) pairs. Each update
|
||||||
to the cluster state has an accompanying update to the vector clock.
|
to the cluster state has an accompanying update to the vector clock.
|
||||||
|
|
||||||
One problem with vector clocks is that their history can over time be
|
One problem with vector clocks is that their history can over time be very long,
|
||||||
very long, which will both make comparisons take longer time as well
|
which will both make comparisons take longer time as well as take up unnecessary
|
||||||
as take up unnecessary memory. To solve that problem we do pruning of
|
memory. To solve that problem we do pruning of the vector clocks according to
|
||||||
the vector clocks according to the `pruning algorithm`_ in Riak.
|
the `pruning algorithm`_ in Riak.
|
||||||
|
|
||||||
.. _Vector Clocks: http://en.wikipedia.org/wiki/Vector_clock
|
.. _Vector Clocks: http://en.wikipedia.org/wiki/Vector_clock
|
||||||
.. _pruning algorithm: http://wiki.basho.com/Vector-Clocks.html#Vector-Clock-Pruning
|
.. _pruning algorithm: http://wiki.basho.com/Vector-Clocks.html#Vector-Clock-Pruning
|
||||||
|
|
||||||
Gossip convergence
|
Gossip convergence
|
||||||
------------------
|
^^^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
Information about the cluster converges at certain points of time. This is when
|
Information about the cluster converges at certain points of time. This is when
|
||||||
all nodes have seen the same cluster state. To be able to recognise this
|
all nodes have seen the same cluster state. Convergence is recognised by passing
|
||||||
convergence a map from node to current vector clock is also passed as part of
|
a map from node to current state version during gossip. This information is
|
||||||
the gossip state. Gossip convergence cannot occur while any nodes are
|
referred to as the gossip overview. When all versions in the overview are equal
|
||||||
|
there is convergence. Gossip convergence cannot occur while any nodes are
|
||||||
unreachable, either the nodes become reachable again, or the nodes need to be
|
unreachable, either the nodes become reachable again, or the nodes need to be
|
||||||
moved into the ``down`` or ``removed`` states (see section on `Member
|
moved into the ``down`` or ``removed`` states (see section on `Member
|
||||||
states`_ below).
|
states`_ below).
|
||||||
|
|
||||||
|
|
||||||
Failure Detector
|
Failure Detector
|
||||||
-----------------
|
^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
The failure detector is responsible for trying to detect if a node is
|
The failure detector is responsible for trying to detect if a node is
|
||||||
unreachable from the rest of the cluster. For this we are using an
|
unreachable from the rest of the cluster. For this we are using an
|
||||||
implementation of `The Phi Accrual Failure Detector`_ by Hayashibara et al.
|
implementation of `The Phi Accrual Failure Detector`_ by Hayashibara et al.
|
||||||
|
|
||||||
An accrual failure detector decouple monitoring and
|
An accrual failure detector decouple monitoring and interpretation. That makes
|
||||||
interpretation. That makes them applicable to a wider area of
|
them applicable to a wider area of scenarios and more adequate to build generic
|
||||||
scenarios and more adequate to build generic failure detection
|
failure detection services. The idea is that it is keeping a history of failure
|
||||||
services. The idea is that it is keeping a history of failure
|
statistics, calculated from heartbeats received from the gossip protocol, and is
|
||||||
statistics, calculated from heartbeats received from the gossip
|
trying to do educated guesses by taking multiple factors, and how they
|
||||||
protocol, and is trying to do educated guesses by taking multiple
|
accumulate over time, into account in order to come up with a better guess if a
|
||||||
factors, and how they accumulate over time, into account in order to
|
specific node is up or down. Rather than just answering "yes" or "no" to the
|
||||||
come up with a better guess if a specific node is up or down. Rather
|
question "is the node down?" it returns a ``phi`` value representing the
|
||||||
than just answering "yes" or "no" to the question "is the node down?"
|
likelihood that the node is down.
|
||||||
it returns a ``phi`` value representing the likelihood that the node
|
|
||||||
is down.
|
|
||||||
|
|
||||||
The ``threshold`` that is the basis for the calculation is
|
The ``threshold`` that is the basis for the calculation is configurable by the
|
||||||
configurable by the user. A low ``threshold`` is prone to generate
|
user. A low ``threshold`` is prone to generate many wrong suspicions but ensures
|
||||||
many wrong suspicions but ensures a quick detection in the event of a
|
a quick detection in the event of a real crash. Conversely, a high ``threshold``
|
||||||
real crash. Conversely, a high ``threshold`` generates fewer mistakes
|
generates fewer mistakes but needs more time to detect actual crashes. The
|
||||||
but needs more time to detect actual crashes. The default
|
default ``threshold`` is 8 and is appropriate for most situations. However in
|
||||||
``threshold`` is 8 and is appropriate for most situations. However in
|
cloud environments, such as Amazon EC2, the value could be increased to 12 in
|
||||||
cloud environments, such as Amazon EC2, the value could be increased
|
order to account for network issues that sometimes occur on such platforms.
|
||||||
to 12 in order to account for network issues that sometimes occur on
|
|
||||||
such platforms.
|
|
||||||
|
|
||||||
.. _The Phi Accrual Failure Detector: http://ddg.jaist.ac.jp/pub/HDY+04.pdf
|
.. _The Phi Accrual Failure Detector: http://ddg.jaist.ac.jp/pub/HDY+04.pdf
|
||||||
|
|
||||||
Leader
|
Leader
|
||||||
------
|
^^^^^^
|
||||||
|
|
||||||
After gossip convergence a leader for the cluster can be determined. There is no
|
After gossip convergence a leader for the cluster can be determined. There is no
|
||||||
leader election process, the leader can always be recognised deterministically
|
leader election process, the leader can always be recognised deterministically
|
||||||
by any node whenever there is gossip convergence. The leader is simply the first
|
by any node whenever there is gossip convergence. The leader is simply the first
|
||||||
node in sorted order that is able to take the leadership role, where the only
|
node in sorted order that is able to take the leadership role, where the only
|
||||||
allowed member states for a leader are ``up`` or ``leaving``.
|
allowed member states for a leader are ``up`` or ``leaving`` (see below for more
|
||||||
|
information about member states).
|
||||||
|
|
||||||
The role of the leader is to shift members in and out of the cluster, changing
|
The role of the leader is to shift members in and out of the cluster, changing
|
||||||
``joining`` members to the ``up`` state or ``exiting`` members to the
|
``joining`` members to the ``up`` state or ``exiting`` members to the
|
||||||
|
|
@ -178,11 +150,102 @@ convergence but it may also be possible for the user to explicitly rebalance the
|
||||||
cluster by specifying migrations, or to rebalance the cluster automatically
|
cluster by specifying migrations, or to rebalance the cluster automatically
|
||||||
based on metrics gossiped by the member nodes.
|
based on metrics gossiped by the member nodes.
|
||||||
|
|
||||||
The leader also has the power, if configured so, to "auto-down" a node
|
The leader also has the power, if configured so, to "auto-down" a node that
|
||||||
that according to the Failure Detector is considered unreachable. This
|
according to the Failure Detector is considered unreachable. This means setting the
|
||||||
means setting the unreachable node status to ``down`` automatically.
|
unreachable node status to ``down`` automatically.
|
||||||
|
|
||||||
Membership Lifecycle
|
|
||||||
|
Gossip protocol
|
||||||
|
^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
|
A variation of *push-pull gossip* is used to reduce the amount of gossip
|
||||||
|
information sent around the cluster. In push-pull gossip a digest is sent
|
||||||
|
representing current versions but not actual values; the recipient of the gossip
|
||||||
|
can then send back any values for which it has newer versions and also request
|
||||||
|
values for which it has outdated versions. Akka uses a single shared state with
|
||||||
|
a vector clock for versioning, so the variant of push-pull gossip used in Akka
|
||||||
|
makes use of the gossip overview (containing the current state versions for all
|
||||||
|
nodes) to only push the actual state as needed. This also allows any node to
|
||||||
|
easily determine which other nodes have newer or older information, not just the
|
||||||
|
nodes involved in a gossip exchange.
|
||||||
|
|
||||||
|
Periodically, the default is every 1 second, each node chooses another random
|
||||||
|
node to initiate a round of gossip with. The choice of node is random but can
|
||||||
|
also include extra gossiping for unreachable nodes, seed nodes, and nodes with
|
||||||
|
either newer or older state versions.
|
||||||
|
|
||||||
|
The gossip overview contains the current state version for all nodes and also a
|
||||||
|
list of unreachable nodes. Whenever a node receives a gossip overview it updates
|
||||||
|
the `Failure Detector`_ with the liveness information.
|
||||||
|
|
||||||
|
The nodes defined as ``seed`` nodes are just regular member nodes whose only
|
||||||
|
"special role" is to function as contact points in the cluster and to help
|
||||||
|
breaking logical partitions as seen in the gossip algorithm defined below.
|
||||||
|
|
||||||
|
During each round of gossip exchange the following process is used:
|
||||||
|
|
||||||
|
1. Gossip to random live node (if any)
|
||||||
|
|
||||||
|
2. Gossip to random unreachable node with certain probability depending on the
|
||||||
|
number of unreachable and live nodes
|
||||||
|
|
||||||
|
3. If the node gossiped to at (1) was not a ``seed`` node, or the number of live
|
||||||
|
nodes is less than number of seeds, gossip to random ``seed`` node with
|
||||||
|
certain probability depending on number of unreachable, seed, and live nodes.
|
||||||
|
|
||||||
|
4. Gossip to random node with newer or older state information, based on the
|
||||||
|
current gossip overview, with some probability (?)
|
||||||
|
|
||||||
|
The gossiper only sends the gossip overview to the chosen node. The recipient of
|
||||||
|
the gossip can use the gossip overview to determine whether:
|
||||||
|
|
||||||
|
1. it has a newer version of the gossip state, in which case it sends that back
|
||||||
|
to the gossiper, or
|
||||||
|
|
||||||
|
2. it has an outdated version of the state, in which case the recipient requests
|
||||||
|
the current state from the gossiper
|
||||||
|
|
||||||
|
If the recipient and the gossip have the same version then the gossip state is
|
||||||
|
not sent or requested.
|
||||||
|
|
||||||
|
The main structures used in gossiping are the gossip overview and the gossip
|
||||||
|
state::
|
||||||
|
|
||||||
|
GossipOverview {
|
||||||
|
versions: Map[Node, VectorClock],
|
||||||
|
unreachable: Set[Node]
|
||||||
|
}
|
||||||
|
|
||||||
|
GossipState {
|
||||||
|
version: VectorClock,
|
||||||
|
members: SortedSet[Member],
|
||||||
|
partitions: Tree[PartitionPath, Node],
|
||||||
|
pending: Set[PartitionChange],
|
||||||
|
meta: Option[Map[String, Array[Byte]]]
|
||||||
|
}
|
||||||
|
|
||||||
|
Some of the other structures used are::
|
||||||
|
|
||||||
|
Node = InetSocketAddress
|
||||||
|
|
||||||
|
Member {
|
||||||
|
node: Node,
|
||||||
|
state: MemberState
|
||||||
|
}
|
||||||
|
|
||||||
|
MemberState = Joining | Up | Leaving | Exiting | Down | Removed
|
||||||
|
|
||||||
|
PartitionChange {
|
||||||
|
from: Node,
|
||||||
|
to: Node,
|
||||||
|
path: PartitionPath,
|
||||||
|
status: PartitionChangeStatus
|
||||||
|
}
|
||||||
|
|
||||||
|
PartitionChangeStatus = Awaiting | Complete
|
||||||
|
|
||||||
|
|
||||||
|
Membership lifecycle
|
||||||
--------------------
|
--------------------
|
||||||
|
|
||||||
A node begins in the ``joining`` state. Once all nodes have seen that the new
|
A node begins in the ``joining`` state. Once all nodes have seen that the new
|
||||||
|
|
@ -260,7 +323,7 @@ User actions
|
||||||
Leader actions
|
Leader actions
|
||||||
^^^^^^^^^^^^^^
|
^^^^^^^^^^^^^^
|
||||||
|
|
||||||
The leader has the following duties:
|
The leader has the following duties:
|
||||||
|
|
||||||
- shifting members in and out of the cluster
|
- shifting members in and out of the cluster
|
||||||
|
|
||||||
|
|
@ -277,21 +340,20 @@ The leader have the following duties:
|
||||||
- Automatic rebalancing based on runtime metrics in the
|
- Automatic rebalancing based on runtime metrics in the
|
||||||
system (such as CPU, RAM, Garbage Collection, mailbox depth etc.)
|
system (such as CPU, RAM, Garbage Collection, mailbox depth etc.)
|
||||||
|
|
||||||
|
|
||||||
Partitioning
|
Partitioning
|
||||||
============
|
============
|
||||||
|
|
||||||
Each partition (an actor or actor subtree) in the actor system is
|
Each partition (an actor or actor subtree) in the actor system is assigned to a
|
||||||
assigned to a base node. The mapping from partition path (actor
|
base node. The mapping from partition path (actor address on the format "a/b/c")
|
||||||
address on the format "a/b/c") to base node is stored in the partition
|
to base node is stored in the partition table and is maintained as part of the
|
||||||
table and is maintained as part of the cluster state through the
|
cluster state through the gossip protocol. The partition table is only updated
|
||||||
gossip protocol. The partition table is only updated by the leader
|
by the leader node. If the partition has a configured instance count, referred
|
||||||
node. If the partition has a configured instance count, referred to as
|
to as the ``N-value``, greater than one, then the location of the other
|
||||||
the ``N-value``, greater than one, then the location of the other
|
instances can be found deterministically by counting from the base node. (The
|
||||||
instances can be found deterministically by counting from the base
|
``N-value`` is larger than 1 when an actor is configured to be routed.) The first
|
||||||
node. (The ``N-value`` is larger than 1 when an actor is configured to
|
instance will be found on the base node, and the other instances on the next N-1
|
||||||
be routed.) The first instance will be found on the base node, and the
|
nodes, given the nodes in sorted order.
|
||||||
other instances on the next N-1 nodes, given the nodes in sorted
|
|
||||||
order.
|
|
||||||
|
|
||||||
TODO: discuss how different N values within the tree work (especially subtrees
|
TODO: discuss how different N values within the tree work (especially subtrees
|
||||||
with a greater or lesser N value). A simple implementation would only allow the
|
with a greater or lesser N value). A simple implementation would only allow the
|
||||||
|
|
@ -311,11 +373,11 @@ Handoff
|
||||||
|
|
||||||
Handoff for an actor-based system is different than for a data-based system. The
|
Handoff for an actor-based system is different than for a data-based system. The
|
||||||
most important point is that message ordering (from a given node to a given
|
most important point is that message ordering (from a given node to a given
|
||||||
actor instance) may need to be maintained. If an actor is a singleton
|
actor instance) may need to be maintained. If an actor is a singleton actor
|
||||||
actor (only one instance possible throughout the cluster) then the
|
(only one instance possible throughout the cluster) then the cluster may also
|
||||||
cluster may also need to assure that there is only one such actor active at any
|
need to assure that there is only one such actor active at any one time. Both of
|
||||||
one time. Both of these situations can be handled by forwarding and buffering
|
these situations can be handled by forwarding and buffering messages during
|
||||||
messages during transitions.
|
transitions.
|
||||||
|
|
||||||
A *graceful handoff* (one where the previous host node is up and running during
|
A *graceful handoff* (one where the previous host node is up and running during
|
||||||
the handoff), given a previous host node ``N1``, a new host node ``N2``, and an
|
the handoff), given a previous host node ``N1``, a new host node ``N2``, and an
|
||||||
|
|
@ -352,7 +414,7 @@ transition*.
|
||||||
|
|
||||||
The first question is; during the migration transition, should:
|
The first question is; during the migration transition, should:
|
||||||
|
|
||||||
- ``N1`` continue to process messages for ``A``?
|
- ``N1`` continue to process messages for ``A``?
|
||||||
|
|
||||||
- Or is it important that no messages for ``A`` are processed on
|
- Or is it important that no messages for ``A`` are processed on
|
||||||
``N1`` once migration begins?
|
``N1`` once migration begins?
|
||||||
|
|
@ -369,10 +431,10 @@ terminating the actor and allowing the normal dead letter process to be used.
|
||||||
Update transition
|
Update transition
|
||||||
~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~
|
||||||
|
|
||||||
The second transition begins when the migration is marked as complete
|
The second transition begins when the migration is marked as complete and ends
|
||||||
and ends when all nodes have the updated partition table (when all
|
when all nodes have the updated partition table (when all nodes will use ``N2``
|
||||||
nodes will use ``N2`` as the host for ``A``), e.g. we have
|
as the host for ``A``), e.g. we have convergence, and is referred to as
|
||||||
convergence, and is referred to as the *update transition*.
|
the *update transition*.
|
||||||
|
|
||||||
Once the update transition begins ``N1`` can forward any messages it receives
|
Once the update transition begins ``N1`` can forward any messages it receives
|
||||||
for ``A`` to the new host ``N2``. The question is whether or not message
|
for ``A`` to the new host ``N2``. The question is whether or not message
|
||||||
|
|
@ -381,25 +443,23 @@ ordering needs to be preserved. If messages sent to the previous host node
|
||||||
could be forwarded after a direct message to the new host ``N2``, breaking
|
could be forwarded after a direct message to the new host ``N2``, breaking
|
||||||
message ordering from a client to actor ``A``.
|
message ordering from a client to actor ``A``.
|
||||||
|
|
||||||
In this situation ``N2`` can keep a buffer for messages per sending
|
In this situation ``N2`` can keep a buffer for messages per sending node. Each
|
||||||
node. Each buffer is flushed and removed when an acknowledgement
|
buffer is flushed and removed when an acknowledgement (``ack``) message has been
|
||||||
(``ack``) message has been received. When each node in the cluster
|
received. When each node in the cluster sees the partition update it first sends
|
||||||
sees the partition update it first sends an ``ack`` message to the
|
an ``ack`` message to the previous host node ``N1`` before beginning to use
|
||||||
previous host node ``N1`` before beginning to use ``N2`` as the new
|
``N2`` as the new host for ``A``. Any messages sent from the client node
|
||||||
host for ``A``. Any messages sent from the client node directly to
|
directly to ``N2`` will be buffered. ``N1`` can count down the number of acks to
|
||||||
``N2`` will be buffered. ``N1`` can count down the number of acks to
|
determine when no more forwarding is needed. The ``ack`` message from any node
|
||||||
determine when no more forwarding is needed. The ``ack`` message from
|
will always follow any other messages sent to ``N1``. When ``N1`` receives the
|
||||||
any node will always follow any other messages sent to ``N1``. When
|
``ack`` message it also forwards it to ``N2`` and again this ``ack`` message
|
||||||
``N1`` receives the ``ack`` message it also forwards it to ``N2`` and
|
will follow any other messages already forwarded for ``A``. When ``N2`` receives
|
||||||
again this ``ack`` message will follow any other messages already
|
an ``ack`` message, the buffer for the sending node can be flushed and removed.
|
||||||
forwarded for ``A``. When ``N2`` receives an ``ack`` message, the
|
Any subsequent messages from this sending node can be queued normally. Once all
|
||||||
buffer for the sending node can be flushed and removed. Any subsequent
|
nodes in the cluster have acknowledged the partition change and ``N2`` has
|
||||||
messages from this sending node can be queued normally. Once all nodes
|
cleared all buffers, the handoff is complete and message ordering has been
|
||||||
in the cluster have acknowledged the partition change and ``N2`` has
|
preserved. In practice the buffers should remain small as it is only those
|
||||||
cleared all buffers, the handoff is complete and message ordering has
|
messages sent directly to ``N2`` before the acknowledgement has been forwarded
|
||||||
been preserved. In practice the buffers should remain small as it is
|
that will be buffered.
|
||||||
only those messages sent directly to ``N2`` before the acknowledgement
|
|
||||||
has been forwarded that will be buffered.
|
|
||||||
|
|
||||||
|
|
||||||
Graceful handoff
|
Graceful handoff
|
||||||
|
|
@ -457,18 +517,18 @@ The default approach is to take options 2a, 3a, and 4a - allowing ``A`` on
|
||||||
messages during the update transition. This assumes stateless actors that do not
|
messages during the update transition. This assumes stateless actors that do not
|
||||||
have a dependency on message ordering from any given source.
|
have a dependency on message ordering from any given source.
|
||||||
|
|
||||||
- If an actor has a distributed durable mailbox then nothing needs to
|
- If an actor has a distributed durable mailbox then nothing needs to be done,
|
||||||
be done, other than migrating the actor.
|
other than migrating the actor.
|
||||||
|
|
||||||
- If message ordering needs to be maintained during the update
|
- If message ordering needs to be maintained during the update transition then
|
||||||
transition then option 3b can be used, creating buffers per sending node.
|
option 3b can be used, creating buffers per sending node.
|
||||||
|
|
||||||
- If the actors are robust to message send failures then the dropping
|
- If the actors are robust to message send failures then the dropping messages
|
||||||
messages approach can be used (with no forwarding or buffering needed).
|
approach can be used (with no forwarding or buffering needed).
|
||||||
|
|
||||||
- If an actor is a singleton (only one instance possible throughout
|
- If an actor is a singleton (only one instance possible throughout the cluster)
|
||||||
the cluster) and state is transferred during the migration
|
and state is transferred during the migration initialization, then options 2b
|
||||||
initialization, then options 2b and 3b would be required.
|
and 3b would be required.
|
||||||
|
|
||||||
Support for stateful singleton actor will come in future releases of
|
Support for stateful singleton actors will come in future releases of Akka, most
|
||||||
Akka, most likely Akka 2.2.
|
likely Akka 2.2.
|
||||||
|
|
@ -258,20 +258,20 @@ Configuring transactions with an **explicit** ``TransactionFactory``:
|
||||||
|
|
||||||
The following settings are possible on a TransactionFactory:
|
The following settings are possible on a TransactionFactory:
|
||||||
|
|
||||||
- familyName - Family name for transactions. Useful for debugging.
|
- ``familyName`` - Family name for transactions. Useful for debugging.
|
||||||
- readonly - Sets transaction as readonly. Readonly transactions are cheaper.
|
- ``readonly`` - Sets transaction as readonly. Readonly transactions are cheaper.
|
||||||
- maxRetries - The maximum number of times a transaction will retry.
|
- ``maxRetries`` - The maximum number of times a transaction will retry.
|
||||||
- timeout - The maximum time a transaction will block for.
|
- ``timeout`` - The maximum time a transaction will block for.
|
||||||
- trackReads - Whether all reads should be tracked. Needed for blocking operations.
|
- ``trackReads`` - Whether all reads should be tracked. Needed for blocking operations.
|
||||||
- writeSkew - Whether writeskew is allowed. Disable with care.
|
- ``writeSkew`` - Whether writeskew is allowed. Disable with care.
|
||||||
- blockingAllowed - Whether explicit retries are allowed.
|
- ``blockingAllowed`` - Whether explicit retries are allowed.
|
||||||
- interruptible - Whether a blocking transaction can be interrupted.
|
- ``interruptible`` - Whether a blocking transaction can be interrupted.
|
||||||
- speculative - Whether speculative configuration should be enabled.
|
- ``speculative`` - Whether speculative configuration should be enabled.
|
||||||
- quickRelease - Whether locks should be released as quickly as possible (before whole commit).
|
- ``quickRelease`` - Whether locks should be released as quickly as possible (before whole commit).
|
||||||
- propagation - For controlling how nested transactions behave.
|
- ``propagation`` - For controlling how nested transactions behave.
|
||||||
- traceLevel - Transaction trace level.
|
- ``traceLevel`` - Transaction trace level.
|
||||||
|
|
||||||
You can also specify the default values for some of these options in akka.conf. Here they are with their default values:
|
You can also specify the default values for some of these options in ``akka.conf``. Here they are with their default values:
|
||||||
|
|
||||||
::
|
::
|
||||||
|
|
||||||
|
|
@ -461,12 +461,12 @@ Transactional datastructures
|
||||||
|
|
||||||
Akka provides two datastructures that are managed by the STM.
|
Akka provides two datastructures that are managed by the STM.
|
||||||
|
|
||||||
- TransactionalMap
|
- ``TransactionalMap``
|
||||||
- TransactionalVector
|
- ``TransactionalVector``
|
||||||
|
|
||||||
TransactionalMap and TransactionalVector look like regular mutable datastructures, they even implement the standard Scala 'Map' and 'RandomAccessSeq' interfaces, but they are implemented using persistent datastructures and managed references under the hood. Therefore they are safe to use in a concurrent environment. Underlying TransactionalMap is HashMap, an immutable Map but with near constant time access and modification operations. Similarly TransactionalVector uses a persistent Vector. See the Persistent Datastructures section below for more details.
|
``TransactionalMap`` and ``TransactionalVector`` look like regular mutable datastructures, they even implement the standard Scala 'Map' and 'RandomAccessSeq' interfaces, but they are implemented using persistent datastructures and managed references under the hood. Therefore they are safe to use in a concurrent environment. Underlying TransactionalMap is HashMap, an immutable Map but with near constant time access and modification operations. Similarly ``TransactionalVector`` uses a persistent Vector. See the Persistent Datastructures section below for more details.
|
||||||
|
|
||||||
Like managed references, TransactionalMap and TransactionalVector can only be modified inside the scope of an STM transaction.
|
Like managed references, ``TransactionalMap`` and ``TransactionalVector`` can only be modified inside the scope of an STM transaction.
|
||||||
|
|
||||||
*IMPORTANT*: There have been some problems reported when using transactional datastructures with 'lazy' initialization. Avoid that.
|
*IMPORTANT*: There have been some problems reported when using transactional datastructures with 'lazy' initialization. Avoid that.
|
||||||
|
|
||||||
|
|
@ -488,9 +488,9 @@ Here is how you create these transactional datastructures:
|
||||||
val map = TransactionalMap[String, User]
|
val map = TransactionalMap[String, User]
|
||||||
val vector = TransactionalVector[Address]
|
val vector = TransactionalVector[Address]
|
||||||
|
|
||||||
TransactionalMap and TransactionalVector wrap persistent datastructures with transactional references and provide a standard Scala interface. This makes them convenient to use.
|
``TransactionalMap`` and ``TransactionalVector`` wrap persistent datastructures with transactional references and provide a standard Scala interface. This makes them convenient to use.
|
||||||
|
|
||||||
Here is an example of using a Ref and a HashMap directly:
|
Here is an example of using a ``Ref`` and a ``HashMap`` directly:
|
||||||
|
|
||||||
.. code-block:: scala
|
.. code-block:: scala
|
||||||
|
|
||||||
|
|
@ -512,7 +512,7 @@ Here is an example of using a Ref and a HashMap directly:
|
||||||
}
|
}
|
||||||
// -> User("bill")
|
// -> User("bill")
|
||||||
|
|
||||||
Here is the same example using TransactionalMap:
|
Here is the same example using ``TransactionalMap``:
|
||||||
|
|
||||||
.. code-block:: scala
|
.. code-block:: scala
|
||||||
|
|
||||||
|
|
@ -536,8 +536,9 @@ Persistent datastructures
|
||||||
-------------------------
|
-------------------------
|
||||||
|
|
||||||
Akka's STM should only be used with immutable data. This can be costly if you have large datastructures and are using a naive copy-on-write. In order to make working with immutable datastructures fast enough Scala provides what are called Persistent Datastructures. There are currently two different ones:
|
Akka's STM should only be used with immutable data. This can be costly if you have large datastructures and are using a naive copy-on-write. In order to make working with immutable datastructures fast enough Scala provides what are called Persistent Datastructures. There are currently two different ones:
|
||||||
* HashMap (`scaladoc <http://www.scala-lang.org/api/current/scala/collection/immutable/HashMap.html>`__)
|
|
||||||
* Vector (`scaladoc <http://www.scala-lang.org/api/current/scala/collection/immutable/Vector.html>`__)
|
* ``HashMap`` (`scaladoc <http://www.scala-lang.org/api/current/scala/collection/immutable/HashMap.html>`__)
|
||||||
|
* ``Vector`` (`scaladoc <http://www.scala-lang.org/api/current/scala/collection/immutable/Vector.html>`__)
|
||||||
|
|
||||||
They are immutable and each update creates a completely new version but they are using clever structural sharing in order to make them almost as fast, for both read and update, as regular mutable datastructures.
|
They are immutable and each update creates a completely new version but they are using clever structural sharing in order to make them almost as fast, for both read and update, as regular mutable datastructures.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -109,7 +109,10 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling
|
||||||
|
|
||||||
protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(this, actor)
|
protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(this, actor)
|
||||||
|
|
||||||
private def getMailbox(actor: ActorCell) = actor.mailbox.asInstanceOf[CallingThreadMailbox]
|
private def getMailbox(actor: ActorCell): Option[CallingThreadMailbox] = actor.mailbox match {
|
||||||
|
case m: CallingThreadMailbox ⇒ Some(m)
|
||||||
|
case _ ⇒ None
|
||||||
|
}
|
||||||
|
|
||||||
protected[akka] override def start() {}
|
protected[akka] override def start() {}
|
||||||
|
|
||||||
|
|
@ -122,11 +125,13 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling
|
||||||
protected[akka] override def timeoutMs = 100L
|
protected[akka] override def timeoutMs = 100L
|
||||||
|
|
||||||
override def suspend(actor: ActorCell) {
|
override def suspend(actor: ActorCell) {
|
||||||
getMailbox(actor).suspendSwitch.switchOn
|
getMailbox(actor) foreach (_.suspendSwitch.switchOn)
|
||||||
}
|
}
|
||||||
|
|
||||||
override def resume(actor: ActorCell) {
|
override def resume(actor: ActorCell) {
|
||||||
val mbox = getMailbox(actor)
|
val mboxopt = getMailbox(actor)
|
||||||
|
if (mboxopt.isEmpty) return
|
||||||
|
val mbox = mboxopt.get
|
||||||
val queue = mbox.queue
|
val queue = mbox.queue
|
||||||
val wasActive = queue.isActive
|
val wasActive = queue.isActive
|
||||||
val switched = mbox.suspendSwitch.switchOff {
|
val switched = mbox.suspendSwitch.switchOff {
|
||||||
|
|
@ -137,12 +142,14 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override def mailboxSize(actor: ActorCell) = getMailbox(actor).queue.size
|
override def mailboxSize(actor: ActorCell) = getMailbox(actor) map (_.queue.size) getOrElse 0
|
||||||
|
|
||||||
override def mailboxIsEmpty(actor: ActorCell): Boolean = getMailbox(actor).queue.isEmpty
|
override def mailboxIsEmpty(actor: ActorCell): Boolean = getMailbox(actor) map (_.queue.isEmpty) getOrElse true
|
||||||
|
|
||||||
protected[akka] override def systemDispatch(receiver: ActorCell, message: SystemMessage) {
|
protected[akka] override def systemDispatch(receiver: ActorCell, message: SystemMessage) {
|
||||||
val mbox = getMailbox(receiver)
|
val mboxopt = getMailbox(receiver)
|
||||||
|
if (mboxopt.isEmpty) return
|
||||||
|
val mbox = mboxopt.get
|
||||||
mbox.systemEnqueue(message)
|
mbox.systemEnqueue(message)
|
||||||
val queue = mbox.queue
|
val queue = mbox.queue
|
||||||
if (!queue.isActive) {
|
if (!queue.isActive) {
|
||||||
|
|
@ -152,7 +159,9 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling
|
||||||
}
|
}
|
||||||
|
|
||||||
protected[akka] override def dispatch(receiver: ActorCell, handle: Envelope) {
|
protected[akka] override def dispatch(receiver: ActorCell, handle: Envelope) {
|
||||||
val mbox = getMailbox(receiver)
|
val mboxopt = getMailbox(receiver)
|
||||||
|
if (mboxopt.isEmpty) return
|
||||||
|
val mbox = mboxopt.get
|
||||||
val queue = mbox.queue
|
val queue = mbox.queue
|
||||||
val execute = mbox.suspendSwitch.fold {
|
val execute = mbox.suspendSwitch.fold {
|
||||||
queue.push(handle)
|
queue.push(handle)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue