From 4fb22dec682cfe1180e7ee0fa85437d84fcb345c Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 4 Feb 2014 15:51:08 +0100 Subject: [PATCH] =clu #3750 Update cluster specification * Based on Jonas' notes: https://gist.github.com/jboner/7692270 * Also removed "Future Extension" descriptions of cluster actor reference, since that is probably not the direction we are taking --- akka-docs/rst/common/cluster.rst | 109 ++++++++++++++++++-------- akka-docs/rst/general/addressing.rst | 38 --------- akka-docs/rst/java/cluster-usage.rst | 10 ++- akka-docs/rst/scala/cluster-usage.rst | 10 ++- 4 files changed, 88 insertions(+), 79 deletions(-) diff --git a/akka-docs/rst/common/cluster.rst b/akka-docs/rst/common/cluster.rst index 20ccc7088f..685225f0c0 100644 --- a/akka-docs/rst/common/cluster.rst +++ b/akka-docs/rst/common/cluster.rst @@ -46,7 +46,8 @@ A cluster is made up of a set of member nodes. The identifier for each node is a ``hostname:port:uid`` tuple. An Akka application can be distributed over a cluster with each node hosting some part of the application. Cluster membership and partitioning :ref:`[*] ` of the application are decoupled. A node could be a member of a -cluster without hosting any actors. +cluster without hosting any actors. Joining a cluster is initiated +by issuing a ``Join`` command to one of the nodes in the cluster to join. The node identifier internally also contains a UID that uniquely identifies this actor system instance at that ``hostname:port``. Akka uses the UID to be able to @@ -56,6 +57,12 @@ system with the same ``hostname:port`` to a cluster you have to stop the actor s and start a new one with the same ``hotname:port`` which will then receive a different UID. +The cluster membership state is a specialized `CRDT`_, which means that it has a monotonic +merge function. When concurrent changes occur on different nodes the updates can always be +merged and converge to the same end result. + +.. _CRDT: http://hal.upmc.fr/docs/00/55/55/88/PDF/techreport.pdf + Gossip ------ @@ -63,8 +70,7 @@ The cluster membership used in Akka is based on Amazon's `Dynamo`_ system and particularly the approach taken in Basho's' `Riak`_ distributed database. Cluster membership is communicated using a `Gossip Protocol`_, where the current state of the cluster is gossiped randomly through the cluster, with preference to -members that have not seen the latest version. Joining a cluster is initiated -by issuing a ``Join`` command to one of the nodes in the cluster to join. +members that have not seen the latest version. .. _Gossip Protocol: http://en.wikipedia.org/wiki/Gossip_protocol .. _Dynamo: http://www.allthingsdistributed.com/files/amazon-dynamo-sosp2007.pdf @@ -89,12 +95,19 @@ Gossip Convergence Information about the cluster converges locally at a node at certain points in time. This is when a node can prove that the cluster state he is observing has been observed -by all other nodes in the cluster. Convergence is implemented by passing a map from -node to current state version during gossip. This information is referred to as the -gossip overview. When all versions in the overview are equal there is convergence. +by all other nodes in the cluster. Convergence is implemented by passing a set of nodes +that have seen current state version during gossip. This information is referred to as the +seen set in the gossip overview. When all nodes are included in the seen set there is +convergence. + Gossip convergence cannot occur while any nodes are ``unreachable``. The nodes need to become ``reachable`` again, or moved to the ``down`` and ``removed`` states -(see the `Membership Lifecycle`_ section below). +(see the `Membership Lifecycle`_ section below). This only blocks the leader +from performing its cluster membership management and does not influence the application +running on top of the cluster. For example this means that during a network partition +it is not possible to add more nodes to the cluster. The nodes can join, but they +will not be moved to the ``up`` state until the partition has healed or the unreachable +nodes have been downed. Failure Detector @@ -127,6 +140,13 @@ any of these detects the node as ``unreachable`` that information will spread to the rest of the cluster through the gossip. In other words, only one node needs to mark a node ``unreachable`` to have the rest of the cluster mark that node ``unreachable``. +The nodes to monitor are picked out of neighbors in a hashed ordered node ring. +This is to increase the likelihood to monitor across racks and data centers, but the order +is the same on all nodes, which ensures full coverage. + +Heartbeats are sent out every second and every heartbeat is performed in a request/reply +handshake with the replies used as input to the failure detector. + The failure detector will also detect if the node becomes ``reachable`` again. When all nodes that monitored the ``unreachable`` node detects it as ``reachable`` again the cluster, after gossip dissemination, will consider it as ``reachable``. @@ -146,10 +166,11 @@ Leader After gossip convergence a ``leader`` for the cluster can be determined. There is no ``leader`` election process, the ``leader`` can always be recognised deterministically -by any node whenever there is gossip convergence. The ``leader`` is simply the first -node in sorted order that is able to take the leadership role, where the preferred -member states for a ``leader`` are ``up`` and ``leaving`` (see the `Membership Lifecycle`_ -section below for more information about member states). +by any node whenever there is gossip convergence. The leader is just a role, any node +can be the leader and it can change between convergence rounds. +The ``leader`` is simply the first node in sorted order that is able to take the leadership role, +where the preferred member states for a ``leader`` are ``up`` and ``leaving`` +(see the `Membership Lifecycle`_ section below for more information about member states). The role of the ``leader`` is to shift members in and out of the cluster, changing ``joining`` members to the ``up`` state or ``exiting`` members to the ``removed`` @@ -165,12 +186,14 @@ of unreachability. Seed Nodes ^^^^^^^^^^ -The seed nodes are configured contact points for initial join of the cluster. -When a new node is started it sends a message to all seed nodes and then sends +The seed nodes are configured contact points for new nodes joining the cluster. +When a new node is started it sends a message to all seed nodes and then sends a join command to the seed node that answers first. -It is possible to not use seed nodes and instead join any node in the cluster -manually. +The seed nodes configuration value does not have any influence on the running +cluster itself, it is only relevant for new nodes joining the cluster as it +helps them to find contact points to send the join command to; a new member +can send this command to any current member of the cluster, not only to the seed nodes. Gossip Protocol @@ -185,33 +208,53 @@ a vector clock for versioning, so the variant of push-pull gossip used in Akka makes use of this version to only push the actual state as needed. Periodically, the default is every 1 second, each node chooses another random -node to initiate a round of gossip with. The choice of node is random but can -also include extra gossiping nodes with either newer or older state versions. +node to initiate a round of gossip with. If less than ½ of the nodes resides in the +seen set (have seen the new state) then the cluster gossips 3 times instead of once +every second. This adjusted gossip interval is a way to speed up the convergence process +in the early dissemination phase after a state change. -The gossip overview contains the current state version for all nodes and also a -list of unreachable nodes. This allows any node to easily determine which other -nodes have newer or older information, not just the nodes involved in a gossip -exchange. +The choice of node to gossip with is random but it is biased to towards nodes that +might not have seen the current state version. During each round of gossip exchange when +no convergence it uses a probability of 0.8 (configurable) to gossip to a node not +part of the seen set, i.e. that probably has an older version of the state. Otherwise +gossip to any random live node. -The nodes defined as ``seed`` nodes are just regular member nodes whose only -"special role" is to function as contact points in the cluster. +This biased selection is a way to speed up the convergence process in the late dissemination +phase after a state change. -During each round of gossip exchange it sends Gossip to random node with -newer or older state information, if any, based on the current gossip overview, -with some probability. Otherwise Gossip to any random live node. +For clusters larger than 400 nodes (configurable, and suggested by empirical evidence) +the 0.8 probability is gradually reduced to avoid overwhelming single stragglers with +too many concurrent gossip requests. The gossip receiver also has a mechanism to +protect itself from too many simultaneous gossip messages by dropping messages that +have been enqueued in the mailbox for too long time. -The gossiper only sends the gossip version to the chosen node. The recipient of -the gossip can use the gossip version to determine whether: +While the cluster is in a converged state the gossiper only sends a small gossip status message containing the gossip +version to the chosen node. As soon as there is a change to the cluster (meaning non-convergence) +then it goes back to biased gossip again. -1. it has a newer version of the gossip state, in which case it sends that back - to the gossiper, or +The recipient of the gossip state or the gossip status can use the gossip version +(vector clock) to determine whether: -2. it has an outdated version of the state, in which case the recipient requests - the current state from the gossiper +#. it has a newer version of the gossip state, in which case it sends that back + to the gossiper + +#. it has an outdated version of the state, in which case the recipient requests + the current state from the gossiper by sending back its version of the gossip state + +#. it has conflicting gossip versions, in which case the different versions are merged + and sent back If the recipient and the gossip have the same version then the gossip state is not sent or requested. +The periodic nature of the gossip has a nice batching effect of state changes, +e.g. joining several nodes quickly after each other to one node will result in only +one state change to be spread to other members in the cluster. + +The gossip messages are serialized with `protobuf`_ and also gzipped to reduce payload +size. + +.. _protobuf: https://code.google.com/p/protobuf/ Membership Lifecycle -------------------- @@ -232,7 +275,7 @@ become a part of the cluster). To be able to move forward the state of the ``unreachable`` nodes must be changed. It must become ``reachable`` again or marked as ``down``. If the node is to join the cluster again the actor system must be restarted and go through the joining process again. The cluster can, through the -leader, also *auto-down* a node. +leader, also *auto-down* a node after a configured time of unreachability.. .. note:: If you have *auto-down* enabled and the failure detector triggers, you can over time end up with a lot of single node clusters if you don't put diff --git a/akka-docs/rst/general/addressing.rst b/akka-docs/rst/general/addressing.rst index b0fba1fd5b..be37c021e6 100644 --- a/akka-docs/rst/general/addressing.rst +++ b/akka-docs/rst/general/addressing.rst @@ -68,11 +68,6 @@ depending on the configuration of the actor system: facilities is a fake actor reference which accepts log events and prints them directly to standard output; it is :class:`Logging.StandardOutLogger`. -- **(Future Extension)** Cluster actor references represent clustered actor - services which may be replicated, migrated or load-balanced across multiple - cluster nodes. As such they are virtual names which the cluster service - translates into local or remote actor references as appropriate. - What is an Actor Path? ---------------------- @@ -120,7 +115,6 @@ actors in the hierarchy from the root up. Examples are:: "akka://my-sys/user/service-a/worker1" // purely local "akka.tcp://my-sys@host.example.com:5678/user/service-b" // remote - "cluster://my-cluster/service-c" // clustered (Future Extension) Here, ``akka.tcp`` is the default remote transport for the 2.2 release; other transports are pluggable. A remote host using UDP would be accessible by using ``akka.udp``. @@ -154,17 +148,6 @@ systems or JVMs. This means that the logical path (supervision hierarchy) and the physical path (actor deployment) of an actor may diverge if one of its ancestors is remotely supervised. -Virtual Actor Paths **(Future Extension)** -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -In order to be able to replicate and migrate actors across a cluster of Akka -nodes, another level of indirection has to be introduced. The cluster component -therefore provides a translation from virtual paths to physical paths which may -change in reaction to node failures, cluster rebalancing, etc. - -*This area is still under active development, expect updates in this section -for the Akka release code named Rollins .* - How are Actor References obtained? ---------------------------------- @@ -173,12 +156,6 @@ creating actors or by looking them up, where the latter functionality comes in the two flavours of creating actor references from concrete actor paths and querying the logical actor hierarchy. -*While local and remote actor references and their paths work in the same way -concerning the facilities mentioned below, the exact semantics of clustered -actor references and their paths—while certainly as similar as possible—may -differ in certain aspects, owing to the virtual nature of those paths. Expect -updates for the Akka release code named Rollins.* - Creating Actors ^^^^^^^^^^^^^^^ @@ -342,21 +319,6 @@ when sending to an unresolved actor reference. .. image:: RemoteDeployment.png -The Interplay with Clustering **(Future Extension)** ----------------------------------------------------- - -*This section is subject to change!* - -When creating a scaled-out actor subtree, a cluster name is created for a -routed actor reference, where sending to this reference will send to one (or -more) of the actual actors created in the cluster. In order for those actors to -be able to query other actors while processing their messages, their sender -reference must be unique for each of the replicas, which means that physical -paths will be used as ``self`` references for these instances. In the case -of replication for achieving fault-tolerance the opposite is required: the -``self`` reference will be a virtual (cluster) path so that in case of -migration or fail-over communication is resumed with the fresh instance. - What is the Address part used for? ---------------------------------- diff --git a/akka-docs/rst/java/cluster-usage.rst b/akka-docs/rst/java/cluster-usage.rst index cc3709b8fc..441aaff328 100644 --- a/akka-docs/rst/java/cluster-usage.rst +++ b/akka-docs/rst/java/cluster-usage.rst @@ -238,7 +238,8 @@ Note that the ``TransformationFrontend`` actor watch the registered backend to be able to remove it from its list of available backend workers. Death watch uses the cluster failure detector for nodes in the cluster, i.e. it detects network failures and JVM crashes, in addition to graceful termination of watched -actor. +actor. Death watch generates the ``Terminated`` message to the watching actor when the +unreachable cluster node has been downed and removed. The `Typesafe Activator `_ tutorial named `Akka Cluster Samples with Java `_. @@ -377,9 +378,10 @@ This is how the curve looks like for ``acceptable-heartbeat-pause`` configured t .. image:: ../images/phi3.png -Death watch uses the cluster failure detector for nodes in the cluster, i.e. it -generates ``Terminated`` message from network failures and JVM crashes, in addition -to graceful termination of watched actor. +Death watch uses the cluster failure detector for nodes in the cluster, i.e. it detects +network failures and JVM crashes, in addition to graceful termination of watched +actor. Death watch generates the ``Terminated`` message to the watching actor when the +unreachable cluster node has been downed and removed. If you encounter suspicious false positives when the system is under load you should define a separate dispatcher for the cluster actors as described in :ref:`cluster_dispatcher_java`. diff --git a/akka-docs/rst/scala/cluster-usage.rst b/akka-docs/rst/scala/cluster-usage.rst index 60a97d1f18..af704b716b 100644 --- a/akka-docs/rst/scala/cluster-usage.rst +++ b/akka-docs/rst/scala/cluster-usage.rst @@ -232,7 +232,8 @@ Note that the ``TransformationFrontend`` actor watch the registered backend to be able to remove it from its list of available backend workers. Death watch uses the cluster failure detector for nodes in the cluster, i.e. it detects network failures and JVM crashes, in addition to graceful termination of watched -actor. +actor. Death watch generates the ``Terminated`` message to the watching actor when the +unreachable cluster node has been downed and removed. The `Typesafe Activator `_ tutorial named `Akka Cluster Samples with Scala `_. @@ -372,9 +373,10 @@ This is how the curve looks like for ``acceptable-heartbeat-pause`` configured t .. image:: ../images/phi3.png -Death watch uses the cluster failure detector for nodes in the cluster, i.e. it -generates ``Terminated`` message from network failures and JVM crashes, in addition -to graceful termination of watched actor. +Death watch uses the cluster failure detector for nodes in the cluster, i.e. it detects +network failures and JVM crashes, in addition to graceful termination of watched +actor. Death watch generates the ``Terminated`` message to the watching actor when the +unreachable cluster node has been downed and removed. If you encounter suspicious false positives when the system is under load you should define a separate dispatcher for the cluster actors as described in :ref:`cluster_dispatcher_scala`.