=clu #3750 Update cluster specification

* Based on Jonas' notes: https://gist.github.com/jboner/7692270
* Also removed "Future Extension" descriptions of cluster actor reference,
  since that is probably not the direction we are taking
Patrik Nordwall 2014-02-04 15:51:08 +01:00
parent eb8a3b2c3e
commit 4fb22dec68
4 changed files with 88 additions and 79 deletions

View file

@@ -46,7 +46,8 @@ A cluster is made up of a set of member nodes. The identifier for each node is a
``hostname:port:uid`` tuple. An Akka application can be distributed over a cluster with
each node hosting some part of the application. Cluster membership and partitioning
:ref:`[*] <niy>` of the application are decoupled. A node could be a member of a
cluster without hosting any actors. Joining a cluster is initiated
by issuing a ``Join`` command to one of the nodes in the cluster to join.
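
A minimal sketch of issuing such a join programmatically through the ``Cluster`` extension;
the system name, host and port are placeholders, not values from this commit::

  import akka.actor.{ ActorSystem, Address }
  import akka.cluster.Cluster

  object JoinSketch extends App {
    // assumes akka.actor.provider is set to the cluster provider in the configuration
    val system = ActorSystem("ClusterSystem")
    // address of an existing member of the cluster to join (placeholder values)
    val knownMember = Address("akka.tcp", "ClusterSystem", "host1.example.com", 2552)
    // programmatic equivalent of issuing a Join command to that node
    Cluster(system).join(knownMember)
  }
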
The node identifier internally also contains a UID that uniquely identifies this
actor system instance at that ``hostname:port``. Akka uses the UID to be able to
@@ -56,6 +57,12 @@ system with the same ``hostname:port`` to a cluster you have to stop the actor s
and start a new one with the same ``hostname:port`` which will then receive a different
UID.
The cluster membership state is a specialized `CRDT`_, which means that it has a monotonic
merge function. When concurrent changes occur on different nodes the updates can always be
merged and converge to the same end result.
.. _CRDT: http://hal.upmc.fr/docs/00/55/55/88/PDF/techreport.pdf
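
To make the monotonic merge property concrete, here is a toy join-semilattice in Scala.
This is not Akka's actual ``Gossip`` representation: statuses are mapped to a totally
ordered rank and merge takes the pointwise maximum, which makes merging commutative,
associative and idempotent::

  object MembershipCrdt {
    // node identifier -> status rank (e.g. a hypothetical Joining=1 ... Removed=6)
    type State = Map[String, Int]

    // monotonic merge: take the "largest" status seen for each node
    def merge(a: State, b: State): State =
      (a.keySet ++ b.keySet).map { node =>
        node -> math.max(a.getOrElse(node, 0), b.getOrElse(node, 0))
      }.toMap
  }

Two nodes that apply each other's concurrent updates in any order therefore end up with
the same map, which is the convergence property the specification relies on.
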
Gossip
------
@@ -63,8 +70,7 @@ The cluster membership used in Akka is based on Amazon's `Dynamo`_ system and
particularly the approach taken in Basho's `Riak`_ distributed database.
Cluster membership is communicated using a `Gossip Protocol`_, where the current
state of the cluster is gossiped randomly through the cluster, with preference to
members that have not seen the latest version.
.. _Gossip Protocol: http://en.wikipedia.org/wiki/Gossip_protocol
.. _Dynamo: http://www.allthingsdistributed.com/files/amazon-dynamo-sosp2007.pdf
@@ -89,12 +95,19 @@ Gossip Convergence
Information about the cluster converges locally at a node at certain points in time.
This is when a node can prove that the cluster state it is observing has been observed
by all other nodes in the cluster. Convergence is implemented by passing a set of nodes
that have seen the current state version during gossip. This information is referred to as the
seen set in the gossip overview. When all nodes are included in the seen set there is
convergence.
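
An illustrative convergence check under this model (simplified types, not Akka's
internals): convergence holds exactly when every member appears in the seen set::

  final case class GossipOverview(members: Set[String], seen: Set[String]) {
    // all members have seen the current state version => convergence
    def converged: Boolean = members.subsetOf(seen)
  }
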
Gossip convergence cannot occur while any nodes are ``unreachable``. The nodes need
to become ``reachable`` again, or be moved to the ``down`` and ``removed`` states
(see the `Membership Lifecycle`_ section below). This only blocks the leader
from performing its cluster membership management and does not influence the application
running on top of the cluster. For example this means that during a network partition
it is not possible to add more nodes to the cluster. The nodes can join, but they
will not be moved to the ``up`` state until the partition has healed or the unreachable
nodes have been downed.
Failure Detector
@@ -127,6 +140,13 @@ any of these detects the node as ``unreachable`` that information will spread to
the rest of the cluster through the gossip. In other words, only one node needs to
mark a node ``unreachable`` to have the rest of the cluster mark that node ``unreachable``.
The nodes to monitor are picked from the neighbors in a hashed ordered node ring.
This increases the likelihood of monitoring across racks and data centers, but the order
is the same on all nodes, which ensures full coverage.
Heartbeats are sent out every second and every heartbeat is performed in a request/reply
handshake with the replies used as input to the failure detector.
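
A sketch of such ring-based monitor selection, illustrative only; the choice of five
neighbours here is an assumption, and the hash-based ordering is deliberately simplistic::

  object MonitorRing {
    // deterministic ordering that is the same on every node; assumes `self` is in `all`
    def neighborsToMonitor(self: String, all: Set[String], count: Int = 5): Seq[String] = {
      val ring = all.toSeq.sortBy(_.hashCode)
      val i = ring.indexOf(self)
      Seq.tabulate(count)(k => ring((i + k + 1) % ring.size))
        .distinct
        .filterNot(_ == self)
    }
  }
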
The failure detector will also detect if the node becomes ``reachable`` again. When
all nodes that monitored the ``unreachable`` node detect it as ``reachable`` again
the cluster, after gossip dissemination, will consider it as ``reachable``.
@@ -146,10 +166,11 @@ Leader
After gossip convergence a ``leader`` for the cluster can be determined. There is no
``leader`` election process; the ``leader`` can always be recognised deterministically
by any node whenever there is gossip convergence. The ``leader`` is just a role; any node
can be the ``leader`` and it can change between convergence rounds.
The ``leader`` is simply the first node in sorted order that is able to take the leadership role,
where the preferred member states for a ``leader`` are ``up`` and ``leaving``
(see the `Membership Lifecycle`_ section below for more information about member states).
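
An illustrative reduction of that rule, using a simplified member model with placeholder
status names, not Akka's actual member ordering::

  final case class Member(address: String, status: String)

  // first node in sorted order whose state allows it to take the leader role;
  // if no member is in a preferred state, fall back to the first node overall
  def leader(members: Seq[Member]): Option[Member] = {
    val preferred = Set("Up", "Leaving")
    val sorted = members.sortBy(_.address)
    sorted.find(m => preferred(m.status)).orElse(sorted.headOption)
  }
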
The role of the ``leader`` is to shift members in and out of the cluster, changing
``joining`` members to the ``up`` state or ``exiting`` members to the ``removed``
@@ -165,12 +186,14 @@ of unreachability.
Seed Nodes
^^^^^^^^^^
The seed nodes are configured contact points for new nodes joining the cluster.
When a new node is started it sends a message to all seed nodes and then sends
a join command to the seed node that answers first.
It is possible to not use seed nodes and instead join any node in the cluster
manually.
The seed nodes configuration value does not have any influence on the running
cluster itself; it is only relevant for new nodes joining the cluster as it
helps them to find contact points to send the join command to; a new member
can send this command to any current member of the cluster, not only to the seed nodes.
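
The seed nodes are normally listed in the ``akka.cluster.seed-nodes`` configuration.
A hedged sketch of the programmatic equivalent, with placeholder host names and ports,
assuming a version of the ``Cluster`` extension that exposes ``joinSeedNodes``::

  import akka.actor.{ ActorSystem, Address }
  import akka.cluster.Cluster

  val system = ActorSystem("ClusterSystem")
  // ordered list of contact points; the first seed node in the list has a special
  // role only at initial cluster startup
  val seedNodes = List(
    Address("akka.tcp", "ClusterSystem", "seed1.example.com", 2552),
    Address("akka.tcp", "ClusterSystem", "seed2.example.com", 2552))
  Cluster(system).joinSeedNodes(seedNodes)
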
Gossip Protocol
@@ -185,33 +208,53 @@ a vector clock for versioning, so the variant of push-pull gossip used in Akka
makes use of this version to only push the actual state as needed.
Periodically, by default every 1 second, each node chooses another random
node to initiate a round of gossip with. If less than ½ of the nodes reside in the
seen set (have seen the new state) then the cluster gossips 3 times instead of once
every second. This adjusted gossip interval is a way to speed up the convergence process
in the early dissemination phase after a state change.
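
The speed-up rule reduces to a simple check, shown here for illustration only::

  // gossip three times per second while fewer than half of the nodes have
  // seen the new state, otherwise once per second
  def gossipsPerSecond(totalNodes: Int, seenCount: Int): Int =
    if (seenCount * 2 < totalNodes) 3 else 1
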
The choice of node to gossip with is random but it is biased towards nodes that
might not have seen the current state version. During each round of gossip exchange, when
convergence has not been reached, it uses a probability of 0.8 (configurable) to gossip to a
node that is not part of the seen set, i.e. one that probably has an older version of the
state. Otherwise it gossips to any random live node.
This biased selection is a way to speed up the convergence process in the late dissemination
phase after a state change.
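
A sketch of that biased peer selection; this is not Akka's internal code, and the 0.8
default is simply taken from the paragraph above::

  import scala.util.Random

  def selectGossipTarget(liveNodes: Set[String], seen: Set[String],
                         converged: Boolean, probability: Double = 0.8): Option[String] = {
    val unseen = liveNodes -- seen
    // while not converged, prefer nodes that have probably not seen the latest state
    val pool =
      if (!converged && unseen.nonEmpty && Random.nextDouble() < probability) unseen
      else liveNodes
    if (pool.isEmpty) None else Some(pool.toVector(Random.nextInt(pool.size)))
  }
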
For clusters larger than 400 nodes (configurable, and suggested by empirical evidence)
the 0.8 probability is gradually reduced to avoid overwhelming single stragglers with
too many concurrent gossip requests. The gossip receiver also has a mechanism to
protect itself from too many simultaneous gossip messages by dropping messages that
have been enqueued in the mailbox for too long.
While the cluster is in a converged state the gossiper only sends a small gossip status
message containing the gossip version to the chosen node. As soon as there is a change to
the cluster (meaning non-convergence) it goes back to biased gossip again.
The recipient of the gossip state or the gossip status can use the gossip version
(vector clock) to determine whether (a toy comparison is sketched below):

#. it has a newer version of the gossip state, in which case it sends that back
   to the gossiper
#. it has an outdated version of the state, in which case the recipient requests
   the current state from the gossiper by sending back its version of the gossip state
#. it has conflicting gossip versions, in which case the different versions are merged
   and sent back
If the recipient and the gossip have the same version then the gossip state is
not sent or requested.
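
A toy vector clock comparison illustrating those three outcomes plus the equal case;
this is not Akka's ``VectorClock`` implementation, and node identifiers and counters are
simplified::

  object VectorClockSketch {
    // each gossip version maps a node identifier to a per-node counter
    type Version = Map[String, Long]

    sealed trait Comparison
    case object Newer      extends Comparison // local state is newer: send it back
    case object Older      extends Comparison // local state is older: send own version to request the newer one
    case object Concurrent extends Comparison // conflicting versions: merge and send back
    case object Same       extends Comparison // identical: nothing to exchange

    def compare(local: Version, remote: Version): Comparison = {
      val keys = local.keySet ++ remote.keySet
      val localAhead  = keys.exists(k => local.getOrElse(k, 0L) > remote.getOrElse(k, 0L))
      val remoteAhead = keys.exists(k => remote.getOrElse(k, 0L) > local.getOrElse(k, 0L))
      (localAhead, remoteAhead) match {
        case (true, false)  => Newer
        case (false, true)  => Older
        case (true, true)   => Concurrent
        case (false, false) => Same
      }
    }
  }
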
The periodic nature of the gossip has a nice batching effect of state changes,
e.g. joining several nodes quickly after each other to one node will result in only
one state change being spread to other members in the cluster.
The gossip messages are serialized with `protobuf`_ and also gzipped to reduce payload
size.
.. _protobuf: https://code.google.com/p/protobuf/
Membership Lifecycle
--------------------
@@ -232,7 +275,7 @@ become a part of the cluster). To be able to move forward the state of the
``unreachable`` nodes must be changed. It must become ``reachable`` again or marked
as ``down``. If the node is to join the cluster again the actor system must be
restarted and go through the joining process again. The cluster can, through the
leader, also *auto-down* a node after a configured time of unreachability.
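
Downing can also be triggered manually through the ``Cluster`` extension; a small sketch
with placeholder address values (the configured auto-down time typically corresponds to a
setting such as ``akka.cluster.auto-down-unreachable-after``)::

  import akka.actor.{ ActorSystem, Address }
  import akka.cluster.Cluster

  val system = ActorSystem("ClusterSystem")
  // address of the unreachable member (placeholder values)
  val unreachableMember = Address("akka.tcp", "ClusterSystem", "host2.example.com", 2552)
  // manual alternative to auto-downing: explicitly mark the member as down
  Cluster(system).down(unreachableMember)
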
.. note:: If you have *auto-down* enabled and the failure detector triggers, you
can over time end up with a lot of single node clusters if you don't put

View file

@@ -68,11 +68,6 @@ depending on the configuration of the actor system:
facilities is a fake actor reference which accepts log events and prints
them directly to standard output; it is :class:`Logging.StandardOutLogger`.
- **(Future Extension)** Cluster actor references represent clustered actor
services which may be replicated, migrated or load-balanced across multiple
cluster nodes. As such they are virtual names which the cluster service
translates into local or remote actor references as appropriate.
What is an Actor Path?
----------------------
@@ -120,7 +115,6 @@ actors in the hierarchy from the root up. Examples are::
"akka://my-sys/user/service-a/worker1" // purely local
"akka.tcp://my-sys@host.example.com:5678/user/service-b" // remote
"cluster://my-cluster/service-c" // clustered (Future Extension)
Here, ``akka.tcp`` is the default remote transport for the 2.2 release; other transports
are pluggable. A remote host using UDP would be accessible by using ``akka.udp``.
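
For illustration, the remote example path above can be looked up with ``actorSelection``;
a sketch only, omitting error handling and assuming remoting is enabled in the configuration::

  import akka.actor.ActorSystem

  val system = ActorSystem("my-sys")
  // resolves to an ActorSelection; messages sent to it are routed to the remote actor
  val serviceB = system.actorSelection("akka.tcp://my-sys@host.example.com:5678/user/service-b")
  serviceB ! "hello"
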
@@ -154,17 +148,6 @@ systems or JVMs. This means that the logical path (supervision hierarchy) and
the physical path (actor deployment) of an actor may diverge if one of its
ancestors is remotely supervised.
Virtual Actor Paths **(Future Extension)**
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
In order to be able to replicate and migrate actors across a cluster of Akka
nodes, another level of indirection has to be introduced. The cluster component
therefore provides a translation from virtual paths to physical paths which may
change in reaction to node failures, cluster rebalancing, etc.
*This area is still under active development; expect updates in this section
for the Akka release code named Rollins.*
How are Actor References obtained?
----------------------------------
@@ -173,12 +156,6 @@ creating actors or by looking them up, where the latter functionality comes in
the two flavours of creating actor references from concrete actor paths and
querying the logical actor hierarchy.
*While local and remote actor references and their paths work in the same way
concerning the facilities mentioned below, the exact semantics of clustered
actor references and their paths—while certainly as similar as possible—may
differ in certain aspects, owing to the virtual nature of those paths. Expect
updates for the Akka release code named Rollins.*
Creating Actors
^^^^^^^^^^^^^^^
@@ -342,21 +319,6 @@ when sending to an unresolved actor reference.
.. image:: RemoteDeployment.png
The Interplay with Clustering **(Future Extension)**
----------------------------------------------------
*This section is subject to change!*
When creating a scaled-out actor subtree, a cluster name is created for a
routed actor reference, where sending to this reference will send to one (or
more) of the actual actors created in the cluster. In order for those actors to
be able to query other actors while processing their messages, their sender
reference must be unique for each of the replicas, which means that physical
paths will be used as ``self`` references for these instances. In the case
of replication for achieving fault-tolerance the opposite is required: the
``self`` reference will be a virtual (cluster) path so that in case of
migration or fail-over communication is resumed with the fresh instance.
What is the Address part used for?
----------------------------------

View file

@@ -238,7 +238,8 @@ Note that the ``TransformationFrontend`` actor watches the registered backend
to be able to remove it from its list of available backend workers.
Death watch uses the cluster failure detector for nodes in the cluster, i.e. it detects
network failures and JVM crashes, in addition to graceful termination of the watched
actor. Death watch generates the ``Terminated`` message to the watching actor when the
unreachable cluster node has been downed and removed.
The `Typesafe Activator <http://typesafe.com/platform/getstarted>`_ tutorial named
`Akka Cluster Samples with Java <http://typesafe.com/activator/template/akka-sample-cluster-java>`_.
@@ -377,9 +378,10 @@ This is how the curve looks for ``acceptable-heartbeat-pause`` configured t
.. image:: ../images/phi3.png
Death watch uses the cluster failure detector for nodes in the cluster, i.e. it detects
network failures and JVM crashes, in addition to graceful termination of the watched
actor. Death watch generates the ``Terminated`` message to the watching actor when the
unreachable cluster node has been downed and removed.
If you encounter suspicious false positives when the system is under load you should
define a separate dispatcher for the cluster actors as described in :ref:`cluster_dispatcher_java`.

View file

@@ -232,7 +232,8 @@ Note that the ``TransformationFrontend`` actor watches the registered backend
to be able to remove it from its list of available backend workers.
Death watch uses the cluster failure detector for nodes in the cluster, i.e. it detects
network failures and JVM crashes, in addition to graceful termination of the watched
actor. Death watch generates the ``Terminated`` message to the watching actor when the
unreachable cluster node has been downed and removed.
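
A hedged sketch of cluster-aware death watch in such a frontend; the registration message
and actor internals are simplified placeholders, not the sample project's actual code::

  import akka.actor.{ Actor, ActorRef, Terminated }

  class TransformationFrontendSketch extends Actor {
    var backends = IndexedSeq.empty[ActorRef]

    def receive = {
      case "BackendRegistration" =>   // placeholder registration message
        context.watch(sender())        // cluster-aware death watch across nodes
        backends = backends :+ sender()
      case Terminated(backend) =>      // delivered once the node is downed and removed
        backends = backends.filterNot(_ == backend)
    }
  }
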
The `Typesafe Activator <http://typesafe.com/platform/getstarted>`_ tutorial named
`Akka Cluster Samples with Scala <http://typesafe.com/activator/template/akka-sample-cluster-scala>`_.
@@ -372,9 +373,10 @@ This is how the curve looks for ``acceptable-heartbeat-pause`` configured t
.. image:: ../images/phi3.png
Death watch uses the cluster failure detector for nodes in the cluster, i.e. it detects
network failures and JVM crashes, in addition to graceful termination of the watched
actor. Death watch generates the ``Terminated`` message to the watching actor when the
unreachable cluster node has been downed and removed.
If you encounter suspicious false positives when the system is under load you should
define a separate dispatcher for the cluster actors as described in :ref:`cluster_dispatcher_scala`.