# Cluster Usage

For an introduction to the Akka Cluster concepts please see @ref:[Cluster Specification](common/cluster.md).

## Preparing Your Project for Clustering

The Akka cluster is a separate jar file. Make sure that you have the following dependency in your project:

sbt
: @@@vars
```
"com.typesafe.akka" %% "akka-cluster" % "$akka.version$"
```
@@@

Gradle
: @@@vars
```
compile group: 'com.typesafe.akka', name: 'akka-cluster_$scala.binary_version$', version: '$akka.version$'
```
@@@

Maven
: @@@vars
```
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-cluster_$scala.binary_version$</artifactId>
  <version>$akka.version$</version>
</dependency>
```
@@@

## A Simple Cluster Example

The following configuration enables the `Cluster` extension to be used.
It joins the cluster and an actor subscribes to cluster membership events and logs them.

The `application.conf` configuration looks like this:

```
akka {
  actor {
    provider = "cluster"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]

    # auto downing is NOT safe for production deployments.
    # you may want to use it during development, read more about it in the docs.
    #
    # auto-down-unreachable-after = 10s
  }
}

# Enable metrics extension in akka-cluster-metrics.
akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]

# Sigar native library extract location during tests.
# Note: use per-jvm-instance folder when running multiple jvm on one host.
akka.cluster.metrics.native-library-extract-folder=${user.dir}/target/native
```

To enable cluster capabilities in your Akka project you should, at a minimum, add the @ref:[Remoting](remoting.md)
settings, but with `cluster` as the `akka.actor.provider`.
The `akka.cluster.seed-nodes` should normally also be added to your `application.conf` file.

@@@ note

If you are running Akka in a Docker container or the nodes for some other reason have separate internal and
external ip addresses you must configure remoting according to @ref:[Akka behind NAT or in a Docker container](remoting.md#remote-configuration-nat)

@@@

The seed nodes are configured contact points for the initial, automatic join of the cluster.
Note that if you are going to start the nodes on different machines you need to specify the
ip-addresses or host names of the machines in `application.conf` instead of `127.0.0.1`.

An actor that uses the cluster extension may look like this:

Scala
: @@snip [SimpleClusterListener.scala]($code$/scala/docs/cluster/SimpleClusterListener.scala) { type=scala }

Java
: @@snip [SimpleClusterListener.java]($code$/java/jdocs/cluster/SimpleClusterListener.java) { type=java }

The actor registers itself as subscriber of certain cluster events. It receives events corresponding to the current state
of the cluster when the subscription starts and then it receives events for changes that happen in the cluster.

The easiest way to run this example yourself is to download the ready to run
@scala[@extref[Akka Cluster Sample with Scala](ecs:akka-samples-cluster-scala)]
@java[@extref[Akka Cluster Sample with Java](ecs:akka-samples-cluster-java)]
together with the tutorial. It contains instructions on how to run the `SimpleClusterApp`.
The source code of this sample can be found in the
@scala[@extref[Akka Samples Repository](samples:akka-sample-cluster-scala)]@java[@extref[Akka Samples Repository](samples:akka-sample-cluster-java)].

## Joining to Seed Nodes

You may decide if joining to the cluster should be done manually or automatically
to configured initial contact points, so-called seed nodes. After the joining process
the seed nodes are not special and they participate in the cluster in exactly the same
way as other nodes.

When a new node is started it sends a message to all seed nodes and then sends a join command to the one that
answers first. If none of the seed nodes replied (might not be started yet)
it retries this procedure until successful or shut down.

You define the seed nodes in the [configuration](#cluster-configuration) file (application.conf):

```
akka.cluster.seed-nodes = [
  "akka.tcp://ClusterSystem@host1:2552",
  "akka.tcp://ClusterSystem@host2:2552"]
```

This can also be defined as Java system properties when starting the JVM using the following syntax:

```
-Dakka.cluster.seed-nodes.0=akka.tcp://ClusterSystem@host1:2552
-Dakka.cluster.seed-nodes.1=akka.tcp://ClusterSystem@host2:2552
```

Such configuration is typically created dynamically by external tools, see for example:

* [Deploying clustered Akka applications on Kubernetes](http://developer.lightbend.com/guides/k8-akka-cluster/)
* [ConductR](https://conductr.lightbend.com/docs/2.1.x/AkkaAndPlay#Akka-Clustering)
* [ConstructR](https://github.com/hseeberger/constructr)
2012-08-27 07:47:34 +02:00
The seed nodes can be started in any order and it is not necessary to have all
2017-05-10 16:20:38 +02:00
seed nodes running, but the node configured as the first element in the `seed-nodes`
2015-10-15 08:08:01 +02:00
configuration list must be started when initially starting a cluster, otherwise the
2013-10-14 17:46:55 +02:00
other seed-nodes will not become initialized and no other node can join the cluster.
The reason for the special first seed node is to avoid forming separated islands when
starting from an empty cluster.
2015-10-15 08:08:01 +02:00
It is quickest to start all configured seed nodes at the same time (order doesn't matter),
2017-05-10 16:20:38 +02:00
otherwise it can take up to the configured `seed-node-timeout` until the nodes
2013-02-17 17:35:43 +01:00
can join.
Once more than two seed nodes have been started it is no problem to shut down the first
2014-08-05 14:16:46 +02:00
seed node. If the first seed node is restarted, it will first try to join the other
2017-08-21 10:49:56 +02:00
seed nodes in the existing cluster. Note that if you stop all seed nodes at the same time
and restart them with the same `seed-nodes` configuration they will join themselves and
form a new cluster instead of joining remaining nodes of the existing cluster. That is
likely not desired and should be avoided by listing several nodes as seed nodes for redundancy
and don't stop all of them at the same time.

You may also use @scala[`Cluster(system).joinSeedNodes`]@java[`Cluster.get(system).joinSeedNodes`] to join programmatically,
which is attractive when dynamically discovering other nodes at startup by using some external tool or API.
When using `joinSeedNodes` you should not include the node itself except for the node that is
supposed to be the first seed node, and that should be placed first in the parameter to
`joinSeedNodes`.
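
As a minimal Scala sketch of such programmatic joining, assuming the seed addresses have been discovered by some external tool (the host names below are placeholders):

```scala
import akka.actor.{ ActorSystem, Address }
import akka.cluster.Cluster

val system = ActorSystem("ClusterSystem")

// Addresses discovered dynamically, e.g. from an external registry (placeholders here)
val seedAddresses = List(
  Address("akka.tcp", "ClusterSystem", "host1", 2552),
  Address("akka.tcp", "ClusterSystem", "host2", 2552))

// Join via the discovered contact points; the first address follows the rule above
Cluster(system).joinSeedNodes(seedAddresses)
```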

Unsuccessful attempts to contact seed nodes are automatically retried after the time period defined in
the configuration property `seed-node-timeout`. An unsuccessful attempt to join a specific seed node is
automatically retried after the configured `retry-unsuccessful-join-after`. Retrying means that it
tries to contact all seed nodes and then joins the node that answers first. The first node in the list
of seed nodes will join itself if it cannot contact any of the other seed nodes within the
configured `seed-node-timeout`.

The joining of given seed nodes will by default be retried indefinitely until
a successful join. That process can be aborted if unsuccessful by configuring a
timeout. When aborted it will run @ref:[Coordinated Shutdown](actors.md#coordinated-shutdown),
which by default will terminate the ActorSystem. CoordinatedShutdown can also be configured to exit
the JVM. It is useful to define this timeout if the `seed-nodes` are assembled
dynamically and a restart with new seed-nodes should be tried after unsuccessful
attempts.

```
akka.cluster.shutdown-after-unsuccessful-join-seed-nodes = 20s
akka.coordinated-shutdown.terminate-actor-system = on
```

If you don't configure seed nodes or use `joinSeedNodes` you need to join the cluster manually, which can be performed by using [JMX](#cluster-jmx) or [HTTP](#cluster-http).
You can join to any node in the cluster. It does not have to be configured as a seed node.
Note that you can only join to an existing cluster member, which means that for bootstrapping some
node must join itself, and then the following nodes can join them to make up a cluster.
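
Programmatically, the equivalent of such a manual join is a plain `join`, shown here as a hedged Scala sketch with a placeholder address:

```scala
import akka.actor.{ ActorSystem, Address }
import akka.cluster.Cluster

val system = ActorSystem("ClusterSystem")

// Address of an existing cluster member (placeholder host and port)
val existingMember = Address("akka.tcp", "ClusterSystem", "host1", 2552)

// Join the cluster via that member; joining the node's own address bootstraps a new cluster
Cluster(system).join(existingMember)
```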

An actor system can only join a cluster once. Additional attempts will be ignored.
When it has successfully joined it must be restarted to be able to join another
cluster or to join the same cluster again. It can use the same host name and port
after the restart. When it comes up as a new incarnation of an existing member in the cluster
and tries to join, the existing one will be removed from the cluster and the new
incarnation will then be allowed to join.

@@@ note

The name of the `ActorSystem` must be the same for all members of a cluster. The name is given
when you start the `ActorSystem`.

@@@

<a id="automatic-vs-manual-downing"></a>
## Downing

When a member is considered by the failure detector to be unreachable the
leader is not allowed to perform its duties, such as changing status of
new joining members to 'Up'. The node must first become reachable again, or the
status of the unreachable member must be changed to 'Down'. Changing status to 'Down'
can be performed automatically or manually. By default it must be done manually, using
[JMX](#cluster-jmx) or [HTTP](#cluster-http).

It can also be performed programmatically with @scala[`Cluster(system).down(address)`]@java[`Cluster.get(system).down(address)`].
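
As an illustrative Scala sketch (in practice the address would come from an `UnreachableMember` event or external monitoring; the one below is a placeholder):

```scala
import akka.actor.{ ActorSystem, Address }
import akka.cluster.Cluster

val system = ActorSystem("ClusterSystem")

// Address of the member that has been decided to be down (placeholder)
val unreachableMember = Address("akka.tcp", "ClusterSystem", "host3", 2552)

// Mark the member as Down so that the leader can remove it and resume its duties
Cluster(system).down(unreachableMember)
```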

A pre-packaged solution for the downing problem is provided by
[Split Brain Resolver](http://developer.lightbend.com/docs/akka-commercial-addons/current/split-brain-resolver.html),
which is part of the [Lightbend Reactive Platform](http://www.lightbend.com/platform).
If you don't use RP, you should still carefully read the [documentation](http://developer.lightbend.com/docs/akka-commercial-addons/current/split-brain-resolver.html)
of the Split Brain Resolver and make sure that the solution you are using handles the concerns
described there.

### Auto-downing (DO NOT USE)

There is an automatic downing feature that you should not use in production. For testing purposes you can enable it with configuration:

```
akka.cluster.auto-down-unreachable-after = 120s
```

This means that the cluster leader member will change the `unreachable` node
status to `down` automatically after the configured time of unreachability.

This is a naïve approach to remove unreachable nodes from the cluster membership. It
works great for crashes and short transient network partitions, but not for long network
partitions. Both sides of the network partition will see the other side as unreachable
and after a while remove it from its cluster membership. Since this happens on both
sides the result is that two separate disconnected clusters have been created. This
can also happen because of long GC pauses or system overload.

@@@ warning

We recommend against using the auto-down feature of Akka Cluster in production.
This is crucial for correct behavior if you use @ref:[Cluster Singleton](cluster-singleton.md) or
@ref:[Cluster Sharding](cluster-sharding.md), especially together with Akka @ref:[Persistence](persistence.md).
For Akka Persistence with Cluster Sharding it can result in corrupt data in case
of network partitions.

@@@

## Leaving

There are two ways to remove a member from the cluster.

You can just stop the actor system (or the JVM process). It will be detected
as unreachable and removed after the automatic or manual downing as described
above.

A more graceful exit can be performed if you tell the cluster that a node shall leave.
This can be performed using [JMX](#cluster-jmx) or [HTTP](#cluster-http).
It can also be performed programmatically with:

Scala
: @@snip [ClusterDocSpec.scala]($code$/scala/docs/cluster/ClusterDocSpec.scala) { #leave }

Java
: @@snip [ClusterDocTest.java]($code$/java/jdocs/cluster/ClusterDocTest.java) { #leave }

Note that this command can be issued to any member in the cluster, not necessarily the
one that is leaving.

The @ref:[Coordinated Shutdown](actors.md#coordinated-shutdown) will automatically run when the cluster node sees itself as
`Exiting`, i.e. leaving from another node will trigger the shutdown process on the leaving node.
Tasks for graceful leaving of cluster including graceful shutdown of Cluster Singletons and
Cluster Sharding are added automatically when Akka Cluster is used, i.e. running the shutdown
process will also trigger the graceful leaving if it's not already in progress.

Normally this is handled automatically, but in case of network failures during this process it might still
be necessary to set the node's status to `Down` in order to complete the removal.

<a id="weakly-up"></a>
## WeaklyUp Members

If a node is `unreachable` then gossip convergence is not possible and therefore any
`leader` actions are also not possible. However, we still might want new nodes to join
the cluster in this scenario.

`Joining` members will be promoted to `WeaklyUp` and become part of the cluster if
convergence can't be reached. Once gossip convergence is reached, the leader will move `WeaklyUp`
members to `Up`.

This feature is enabled by default, but it can be disabled with the configuration option:

```
akka.cluster.allow-weakly-up-members = off
```

You can subscribe to the `WeaklyUp` membership event to make use of the members that are
in this state, but you should be aware that members on the other side of a network partition
have no knowledge about the existence of the new members. You should for example not count
`WeaklyUp` members in quorum decisions.
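
For instance, a hedged Scala sketch of a quorum check that deliberately ignores `WeaklyUp` members (the required size is a made-up parameter):

```scala
import akka.actor.ActorSystem
import akka.cluster.{ Cluster, MemberStatus }

val system = ActorSystem("ClusterSystem")

// Count only fully joined members; WeaklyUp members are excluded on purpose
def hasQuorum(requiredSize: Int): Boolean = {
  val upMembers = Cluster(system).state.members.count(_.status == MemberStatus.Up)
  upMembers >= requiredSize
}
```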

<a id="cluster-subscriber"></a>
## Subscribe to Cluster Events

You can subscribe to change notifications of the cluster membership by using
@scala[`Cluster(system).subscribe`]@java[`Cluster.get(system).subscribe`].

Scala
: @@snip [SimpleClusterListener2.scala]($code$/scala/docs/cluster/SimpleClusterListener2.scala) { #subscribe }

Java
: @@snip [SimpleClusterListener2.java]($code$/java/jdocs/cluster/SimpleClusterListener2.java) { #subscribe }

A snapshot of the full state, `akka.cluster.ClusterEvent.CurrentClusterState`, is sent to the subscriber
as the first message, followed by events for incremental updates.

Note that you may receive an empty `CurrentClusterState`, containing no members,
if you start the subscription before the initial join procedure has completed.
This is expected behavior. When the node has been accepted in the cluster you will
receive `MemberUp` for that node, and other nodes.

If you find it inconvenient to handle the `CurrentClusterState` you can use
@scala[`ClusterEvent.InitialStateAsEvents`]@java[`ClusterEvent.initialStateAsEvents()`] as parameter to `subscribe`.
That means that instead of receiving `CurrentClusterState` as the first message you will receive
the events corresponding to the current state to mimic what you would have seen if you were
listening to the events when they occurred in the past. Note that those initial events only correspond
to the current state and it is not the full history of all changes that actually has occurred in the cluster.

Scala
: @@snip [SimpleClusterListener.scala]($code$/scala/docs/cluster/SimpleClusterListener.scala) { #subscribe }

Java
: @@snip [SimpleClusterListener.java]($code$/java/jdocs/cluster/SimpleClusterListener.java) { #subscribe }

The events to track the life-cycle of members are:

* `ClusterEvent.MemberJoined` - A new member has joined the cluster and its status has been changed to `Joining`
* `ClusterEvent.MemberUp` - A new member has joined the cluster and its status has been changed to `Up`
* `ClusterEvent.MemberExited` - A member is leaving the cluster and its status has been changed to `Exiting`.
  Note that the node might already have been shut down when this event is published on another node.
* `ClusterEvent.MemberRemoved` - Member completely removed from the cluster.
* `ClusterEvent.UnreachableMember` - A member is considered as unreachable, detected by the failure detector
  of at least one other node.
* `ClusterEvent.ReachableMember` - A member is considered as reachable again, after having been unreachable.
  All nodes that previously detected it as unreachable have detected it as reachable again.

There are more types of change events, consult the API documentation
of classes that extend `akka.cluster.ClusterEvent.ClusterDomainEvent`
for details about the events.

Instead of subscribing to cluster events it can sometimes be convenient to only get the full membership state with
@scala[`Cluster(system).state`]@java[`Cluster.get(system).state()`]. Note that this state is not necessarily in sync with the events published to a
cluster subscription.
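
A small Scala sketch of reading that state directly, for example to log the current and unreachable members:

```scala
import akka.actor.ActorSystem
import akka.cluster.Cluster

val system = ActorSystem("ClusterSystem")

// One-off snapshot of the membership, not necessarily in sync with subscribed events
val clusterState = Cluster(system).state
println(s"Members: ${clusterState.members.map(_.address).mkString(", ")}")
println(s"Unreachable: ${clusterState.unreachable.map(_.address).mkString(", ")}")
```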

### Worker Dial-in Example

Let's take a look at an example that illustrates how workers, here named *backend*,
can detect and register to new master nodes, here named *frontend*.

The example application provides a service to transform text. When some text
is sent to one of the frontend services, it will be delegated to one of the
backend workers, which performs the transformation job, and sends the result back to
the original client. New backend nodes, as well as new frontend nodes, can be
added to or removed from the cluster dynamically.

Messages:

Scala
: @@snip [TransformationMessages.scala]($code$/scala/docs/cluster/TransformationMessages.scala) { #messages }

Java
: @@snip [TransformationMessages.java]($code$/java/jdocs/cluster/TransformationMessages.java) { #messages }

The backend worker that performs the transformation job:

Scala
: @@snip [TransformationBackend.scala]($code$/scala/docs/cluster/TransformationBackend.scala) { #backend }

Java
: @@snip [TransformationBackend.java]($code$/java/jdocs/cluster/TransformationBackend.java) { #backend }

Note that the `TransformationBackend` actor subscribes to cluster events to detect new,
potential, frontend nodes, and sends them a registration message so that they know
that they can use the backend worker.

The frontend that receives user jobs and delegates to one of the registered backend workers:

Scala
: @@snip [TransformationFrontend.scala]($code$/scala/docs/cluster/TransformationFrontend.scala) { #frontend }

Java
: @@snip [TransformationFrontend.java]($code$/java/jdocs/cluster/TransformationFrontend.java) { #frontend }

Note that the `TransformationFrontend` actor watches the registered backend workers
to be able to remove them from its list of available backend workers.
Death watch uses the cluster failure detector for nodes in the cluster, i.e. it detects
network failures and JVM crashes, in addition to graceful termination of watched
actors. Death watch generates the `Terminated` message to the watching actor when the
unreachable cluster node has been downed and removed.

The easiest way to run the **Worker Dial-in Example** yourself is to download the ready to run
@scala[@extref[Akka Cluster Sample with Scala](ecs:akka-samples-cluster-scala)]
@java[@extref[Akka Cluster Sample with Java](ecs:akka-samples-cluster-java)]
together with the tutorial. It contains instructions on how to run the **Worker Dial-in Example** sample.
The source code of this sample can be found in the
@scala[@extref[Akka Samples Repository](samples:akka-sample-cluster-scala)]@java[@extref[Akka Samples Repository](samples:akka-sample-cluster-java)].
2017-05-10 16:20:38 +02:00
## Node Roles
2012-09-13 10:54:14 +02:00
2013-03-14 20:32:43 +01:00
Not all nodes of a cluster need to perform the same function: there might be one sub-set which runs the web front-end,
one which runs the data access layer and one for the number-crunching. Deployment of actors—for example by cluster-aware
routers—can take node roles into account to achieve this distribution of responsibilities.
2017-05-10 16:20:38 +02:00
The roles of a node is defined in the configuration property named `akka.cluster.roles`
2013-03-14 20:32:43 +01:00
and it is typically defined in the start script as a system property or environment variable.
2017-05-10 16:20:38 +02:00
The roles of the nodes is part of the membership information in `MemberEvent` that you can subscribe to.
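
As a hedged Scala sketch, the roles of the own node and of other members can be inspected like this (the `"backend"` role name is just an example):

```scala
import akka.actor.ActorSystem
import akka.cluster.Cluster

val system = ActorSystem("ClusterSystem")
val cluster = Cluster(system)

// Roles of this node, as configured in akka.cluster.roles
val myRoles: Set[String] = cluster.selfRoles

// Members that carry a given role, e.g. "backend"
val backendMembers = cluster.state.members.filter(_.hasRole("backend"))
```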

<a id="min-members"></a>
## How To Startup when Cluster Size Reached

A common use case is to start actors after the cluster has been initialized,
members have joined, and the cluster has reached a certain size.

With a configuration option you can define the required number of members
before the leader changes member status of 'Joining' members to 'Up':

```
akka.cluster.min-nr-of-members = 3
```

In a similar way you can define the required number of members of a certain role
before the leader changes member status of 'Joining' members to 'Up':

```
akka.cluster.role {
  frontend.min-nr-of-members = 1
  backend.min-nr-of-members = 2
}
```

You can start the actors in a `registerOnMemberUp` callback, which will
be invoked when the current member status is changed to 'Up', i.e. the cluster
has at least the defined number of members.

Scala
: @@snip [FactorialFrontend.scala]($code$/scala/docs/cluster/FactorialFrontend.scala) { #registerOnUp }

Java
: @@snip [FactorialFrontendMain.java]($code$/java/jdocs/cluster/FactorialFrontendMain.java) { #registerOnUp }

This callback can be used for other things than starting actors.
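
For illustration, a minimal Scala sketch of registering such a callback (the actor started in it is a placeholder):

```scala
import akka.actor.{ Actor, ActorSystem, Props }
import akka.cluster.Cluster

class FrontendActor extends Actor {
  def receive = { case msg => println(s"got $msg") }
}

val system = ActorSystem("ClusterSystem")

// Invoked once this member has been moved to 'Up',
// i.e. after the configured minimum number of members has joined
Cluster(system).registerOnMemberUp {
  system.actorOf(Props[FrontendActor], name = "frontend")
}
```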

## How To Cleanup when Member is Removed

You can do some clean up in a `registerOnMemberRemoved` callback, which will
be invoked when the current member status is changed to 'Removed' or the cluster has been shut down.

An alternative is to register tasks to the @ref:[Coordinated Shutdown](actors.md#coordinated-shutdown).

@@@ note

If you register an `OnMemberRemoved` callback on a cluster that has already been shut down, the callback will be invoked immediately on
the caller thread, otherwise it will be invoked later when the current member status is changed to 'Removed'. You may
want to install some cleanup handling after the cluster has been started up, but the cluster might already be shutting
down when you install it, and depending on that race is not healthy.

@@@
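
A hedged Scala sketch of such a cleanup callback, here simply terminating the actor system (whether and how to exit the JVM is application specific):

```scala
import akka.actor.ActorSystem
import akka.cluster.Cluster

val system = ActorSystem("ClusterSystem")

// Invoked when this member has been removed from the cluster
// (or immediately, if the cluster is already shut down)
Cluster(system).registerOnMemberRemoved {
  // Placeholder cleanup: release resources, then terminate the actor system
  system.terminate()
}
```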

## Cluster Singleton

For some use cases it is convenient and sometimes also mandatory to ensure that
you have exactly one actor of a certain type running somewhere in the cluster.
This can be implemented by subscribing to member events, but there are several corner
cases to consider. Therefore, this specific use case is made easily accessible by the
@ref:[Cluster Singleton](cluster-singleton.md).

## Cluster Sharding

Distributes actors across several nodes in the cluster and supports interaction
with the actors using their logical identifier, but without having to care about
their physical location in the cluster.

See @ref:[Cluster Sharding](cluster-sharding.md).

## Distributed Publish Subscribe

Publish-subscribe messaging between actors in the cluster, and point-to-point messaging
using the logical path of the actors, i.e. the sender does not have to know on which
node the destination actor is running.

See @ref:[Distributed Publish Subscribe in Cluster](distributed-pub-sub.md).

## Cluster Client

Communication from an actor system that is not part of the cluster to actors running
somewhere in the cluster. The client does not have to know on which node the destination
actor is running.

See @ref:[Cluster Client](cluster-client.md).

## Distributed Data

*Akka Distributed Data* is useful when you need to share data between nodes in an
Akka Cluster. The data is accessed with an actor providing a key-value store like API.

See @ref:[Distributed Data](distributed-data.md).

## Failure Detector

In a cluster each node is monitored by a few (default maximum 5) other nodes, and when
any of these detects the node as `unreachable` that information will spread to
the rest of the cluster through the gossip. In other words, only one node needs to
mark a node `unreachable` to have the rest of the cluster mark that node `unreachable`.

The failure detector will also detect if the node becomes `reachable` again. When
all nodes that monitored the `unreachable` node detect it as `reachable` again
the cluster, after gossip dissemination, will consider it as `reachable`.

If system messages cannot be delivered to a node it will be quarantined and then it
cannot come back from `unreachable`. This can happen if there are too many
unacknowledged system messages (e.g. watch, Terminated, remote actor deployment,
failures of actors supervised by remote parent). Then the node needs to be moved
to the `down` or `removed` states and the actor system of the quarantined node
must be restarted before it can join the cluster again.

The nodes in the cluster monitor each other by sending heartbeats to detect if a node is
unreachable from the rest of the cluster. The heartbeat arrival times are interpreted
by an implementation of
[The Phi Accrual Failure Detector](http://www.jaist.ac.jp/~defago/files/pdf/IS_RR_2004_010.pdf).

The suspicion level of failure is given by a value called *phi*.
The basic idea of the phi failure detector is to express the value of *phi* on a scale that
is dynamically adjusted to reflect current network conditions.

The value of *phi* is calculated as:

```
phi = -log10(1 - F(timeSinceLastHeartbeat))
```

where F is the cumulative distribution function of a normal distribution with mean
and standard deviation estimated from historical heartbeat inter-arrival times.
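
To make the formula concrete, here is a hedged Scala sketch that evaluates it with a textbook approximation of the normal CDF; the numbers are illustrative and this is not Akka's internal implementation:

```scala
import scala.math.{ abs, exp, log10 }

// Abramowitz-Stegun style approximation of the normal CDF with the given mean and std deviation
def normalCdf(x: Double, mean: Double, stdDev: Double): Double = {
  val z = (x - mean) / stdDev
  val t = 1.0 / (1.0 + 0.2316419 * abs(z))
  val d = 0.3989423 * exp(-z * z / 2.0)
  val p = d * t * (0.3193815 + t * (-0.3565638 + t * (1.781478 + t * (-1.821256 + t * 1.330274))))
  if (z >= 0) 1.0 - p else p
}

// phi = -log10(1 - F(timeSinceLastHeartbeat))
def phi(timeSinceLastHeartbeat: Double, mean: Double, stdDev: Double): Double =
  -log10(1.0 - normalCdf(timeSinceLastHeartbeat, mean, stdDev))

// Heartbeats normally ~1000 ms apart with 100 ms standard deviation;
// a heartbeat that is 300 ms late gives a phi of roughly 2.9
println(phi(timeSinceLastHeartbeat = 1300, mean = 1000, stdDev = 100))
```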

In the [configuration](#cluster-configuration) you can adjust the `akka.cluster.failure-detector.threshold`
to define when a *phi* value is considered to be a failure.

A low `threshold` is prone to generate many false positives but ensures
a quick detection in the event of a real crash. Conversely, a high `threshold`
generates fewer mistakes but needs more time to detect actual crashes. The
default `threshold` is 8 and is appropriate for most situations. However in
cloud environments, such as Amazon EC2, the value could be increased to 12 in
order to account for network issues that sometimes occur on such platforms.

The following chart illustrates how *phi* increases with increasing time since the
previous heartbeat.

![phi1.png](./images/phi1.png)

Phi is calculated from the mean and standard deviation of historical
inter-arrival times. The previous chart is an example for standard deviation
of 200 ms. If the heartbeats arrive with less deviation the curve becomes steeper,
i.e. it is possible to determine failure more quickly. The curve looks like this for
a standard deviation of 100 ms.

![phi2.png](./images/phi2.png)

To be able to survive sudden abnormalities, such as garbage collection pauses and
transient network failures the failure detector is configured with a margin,
`akka.cluster.failure-detector.acceptable-heartbeat-pause`. You may want to
adjust the [configuration](#cluster-configuration) of this depending on your environment.
This is how the curve looks for `acceptable-heartbeat-pause` configured to
3 seconds.

![phi3.png](./images/phi3.png)

Death watch uses the cluster failure detector for nodes in the cluster, i.e. it detects
network failures and JVM crashes, in addition to graceful termination of watched
actors. Death watch generates the `Terminated` message to the watching actor when the
unreachable cluster node has been downed and removed.

If you encounter suspicious false positives when the system is under load you should
define a separate dispatcher for the cluster actors as described in [Cluster Dispatcher](#cluster-dispatcher).

<a id="cluster-aware-routers"></a>
## Cluster Aware Routers

All @ref:[routers](routing.md) can be made aware of member nodes in the cluster, i.e.
deploying new routees or looking up routees on nodes in the cluster.
When a node becomes unreachable or leaves the cluster the routees of that node are
automatically unregistered from the router. When new nodes join the cluster, additional
routees are added to the router, according to the configuration. Routees are also added
when a node becomes reachable again, after having been unreachable.

Cluster aware routers make use of members with status [WeaklyUp](#weakly-up) if that feature
is enabled.

There are two distinct types of routers.

* **Group - router that sends messages to the specified path using actor selection**
  The routees can be shared among routers running on different nodes in the cluster.
  One example of a use case for this type of router is a service running on some backend
  nodes in the cluster and used by routers running on front-end nodes in the cluster.
* **Pool - router that creates routees as child actors and deploys them on remote nodes.**
  Each router will have its own routee instances. For example, if you start a router
  on 3 nodes in a 10-node cluster, you will have 30 routees in total if the router is
  configured to use one instance per node. The routees created by the different routers
  will not be shared among the routers. One example of a use case for this type of router
  is a single master that coordinates jobs and delegates the actual work to routees running
  on other nodes in the cluster.

### Router with Group of Routees

When using a `Group` you must start the routee actors on the cluster member nodes.
That is not done by the router. The configuration for a group looks like this:

```
akka.actor.deployment {
  /statsService/workerRouter {
    router = consistent-hashing-group
    routees.paths = ["/user/statsWorker"]
    cluster {
      enabled = on
      allow-local-routees = on
      use-roles = ["compute"]
    }
  }
}
```

@@@ note

The routee actors should be started as early as possible when starting the actor system, because
the router will try to use them as soon as the member status is changed to 'Up'.

@@@

The actor paths without address information that are defined in `routees.paths` are used for selecting the
actors to which the messages will be forwarded by the router.
Messages will be forwarded to the routees using @ref:[ActorSelection](actors.md#actorselection), so the same delivery semantics should be expected.
It is possible to limit the lookup of routees to member nodes tagged with a particular set of roles by specifying `use-roles`.

`max-total-nr-of-instances` defines the total number of routees in the cluster. By default `max-total-nr-of-instances`
is set to a high value (10000) that will result in new routees being added to the router when nodes join the cluster.
Set it to a lower value if you want to limit the total number of routees.

The same type of router could also have been defined in code:

Scala
: @@snip [StatsService.scala]($akka$/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/sample/StatsService.scala) { #router-lookup-in-code }

Java
: @@snip [StatsService.java]($code$/java/jdocs/cluster/StatsService.java) { #router-lookup-in-code }

See the [configuration](#cluster-configuration) section for further descriptions of the settings.

### Router Example with Group of Routees

Let's take a look at how to use a cluster aware router with a group of routees,
i.e. a router sending to the paths of the routees.

The example application provides a service to calculate statistics for a text.
When some text is sent to the service it splits it into words, and delegates the task
of counting the number of characters in each word to a separate worker, a routee of a router.
The character count for each word is sent back to an aggregator that calculates
the average number of characters per word when all results have been collected.

Messages:

Scala
: @@snip [StatsMessages.scala]($akka$/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/sample/StatsMessages.scala) { #messages }

Java
: @@snip [StatsMessages.java]($code$/java/jdocs/cluster/StatsMessages.java) { #messages }

The worker that counts the number of characters in each word:

Scala
: @@snip [StatsWorker.scala]($akka$/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/sample/StatsWorker.scala) { #worker }

Java
: @@snip [StatsWorker.java]($code$/java/jdocs/cluster/StatsWorker.java) { #worker }

The service that receives text from users and splits it up into words, delegates to workers and aggregates:

@@@ div { .group-scala }

@@snip [StatsService.scala]($akka$/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/sample/StatsService.scala) { #service }

@@@

@@@ div { .group-java }

@@snip [StatsService.java]($code$/java/jdocs/cluster/StatsService.java) { #service }

@@snip [StatsAggregator.java]($code$/java/jdocs/cluster/StatsAggregator.java) { #aggregator }

@@@

Note, nothing cluster specific so far, just plain actors.

All nodes start `StatsService` and `StatsWorker` actors. Remember, routees are the workers in this case.
The router is configured with `routees.paths`:

```
akka.actor.deployment {
  /statsService/workerRouter {
    router = consistent-hashing-group
    routees.paths = ["/user/statsWorker"]
    cluster {
      enabled = on
      allow-local-routees = on
      use-roles = ["compute"]
    }
  }
}
```

This means that user requests can be sent to `StatsService` on any node and it will use
`StatsWorker` on all nodes.

The easiest way to run the **Router Example with Group of Routees** yourself is to download the ready to run
@scala[@extref[Akka Cluster Sample with Scala](ecs:akka-samples-cluster-scala)]
@java[@extref[Akka Cluster Sample with Java](ecs:akka-samples-cluster-java)]
together with the tutorial. It contains instructions on how to run the **Router Example with Group of Routees** sample.
The source code of this sample can be found in the
@scala[@extref[Akka Samples Repository](samples:akka-sample-cluster-scala)]@java[@extref[Akka Samples Repository](samples:akka-sample-cluster-java)].

### Router with Pool of Remote Deployed Routees

When using a `Pool` with routees created and deployed on the cluster member nodes
the configuration for a router looks like this:

```
akka.actor.deployment {
  /statsService/singleton/workerRouter {
    router = consistent-hashing-pool
    cluster {
      enabled = on
      max-nr-of-instances-per-node = 3
      allow-local-routees = on
      use-roles = ["compute"]
    }
  }
}
```

It is possible to limit the deployment of routees to member nodes tagged with a particular set of roles by
specifying `use-roles`.

`max-total-nr-of-instances` defines the total number of routees in the cluster, but the number of routees
per node, `max-nr-of-instances-per-node`, will not be exceeded. By default `max-total-nr-of-instances`
is set to a high value (10000) that will result in new routees being added to the router when nodes join the cluster.
Set it to a lower value if you want to limit the total number of routees.

The same type of router could also have been defined in code:

Scala
: @@snip [StatsService.scala]($akka$/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/sample/StatsService.scala) { #router-deploy-in-code }

Java
: @@snip [StatsService.java]($code$/java/jdocs/cluster/StatsService.java) { #router-deploy-in-code }

See the [configuration](#cluster-configuration) section for further descriptions of the settings.

### Router Example with Pool of Remote Deployed Routees

Let's take a look at how to use a cluster aware router on a single master node that creates
and deploys workers. To keep track of a single master we use the @ref:[Cluster Singleton](cluster-singleton.md)
in the cluster-tools module. The `ClusterSingletonManager` is started on each node:

Scala
: @@@vars
```
system.actorOf(
  ClusterSingletonManager.props(
    singletonProps = Props[StatsService],
    terminationMessage = PoisonPill,
    settings = ClusterSingletonManagerSettings(system).withRole("compute")),
  name = "statsService")
```
@@@

Java
: @@snip [StatsSampleOneMasterMain.java]($code$/java/jdocs/cluster/StatsSampleOneMasterMain.java) { #create-singleton-manager }

We also need an actor on each node that keeps track of where the current single master exists and
delegates jobs to the `StatsService`. That is provided by the `ClusterSingletonProxy`:

Scala
: @@@vars
```
system.actorOf(
  ClusterSingletonProxy.props(
    singletonManagerPath = "/user/statsService",
    settings = ClusterSingletonProxySettings(system).withRole("compute")),
  name = "statsServiceProxy")
```
@@@

Java
: @@snip [StatsSampleOneMasterMain.java]($code$/java/jdocs/cluster/StatsSampleOneMasterMain.java) { #singleton-proxy }

The `ClusterSingletonProxy` receives text from users and delegates to the current `StatsService`, the single
master. It listens to cluster events to look up the `StatsService` on the oldest node.

All nodes start `ClusterSingletonProxy` and the `ClusterSingletonManager`. The router is now configured like this:

```
akka.actor.deployment {
  /statsService/singleton/workerRouter {
    router = consistent-hashing-pool
    cluster {
      enabled = on
      max-nr-of-instances-per-node = 3
      allow-local-routees = on
      use-roles = ["compute"]
    }
  }
}
```

The easiest way to run the **Router Example with Pool of Remote Deployed Routees** yourself is to download the ready to run
@scala[@extref[Akka Cluster Sample with Scala](ecs:akka-samples-cluster-scala)]
@java[@extref[Akka Cluster Sample with Java](ecs:akka-samples-cluster-java)]
together with the tutorial. It contains instructions on how to run the **Router Example with Pool of Remote Deployed Routees** sample.
The source code of this sample can be found in the
@scala[@extref[Akka Samples Repository](samples:akka-sample-cluster-scala)]@java[@extref[Akka Samples Repository](samples:akka-sample-cluster-java)].

## Cluster Metrics

The member nodes of the cluster can collect system health metrics and publish that to other cluster nodes
and to the registered subscribers on the system event bus with the help of the `akka-cluster-metrics` module.
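
A hedged Scala sketch of an actor subscribing to these metrics events through the extension (assuming the `akka-cluster-metrics` dependency and the extension enabled in the configuration shown earlier):

```scala
import akka.actor.{ Actor, ActorLogging }
import akka.cluster.metrics.{ ClusterMetricsChanged, ClusterMetricsExtension }

class MetricsListener extends Actor with ActorLogging {
  val extension = ClusterMetricsExtension(context.system)

  // Receive ClusterMetricsChanged events published by the extension
  override def preStart(): Unit = extension.subscribe(self)
  override def postStop(): Unit = extension.unsubscribe(self)

  def receive = {
    case ClusterMetricsChanged(nodeMetrics) =>
      log.info("Received metrics for {} nodes", nodeMetrics.size)
  }
}
```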

@@@ div { .group-scala }

## How to Test

@ref:[Multi Node Testing](multi-node-testing.md) is useful for testing cluster applications.

Set up your project according to the instructions in @ref:[Multi Node Testing](multi-node-testing.md) and @ref:[Multi JVM Testing](multi-jvm-testing.md), i.e.
add the `sbt-multi-jvm` plugin and the dependency to `akka-multi-node-testkit`.

First, as described in @ref:[Multi Node Testing](multi-node-testing.md), we need some scaffolding to configure the `MultiNodeSpec`.
Define the participating roles and their [configuration](#cluster-configuration) in an object extending `MultiNodeConfig`:

@@snip [StatsSampleSpec.scala]($akka$/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/sample/StatsSampleSpec.scala) { #MultiNodeConfig }

Define one concrete test class for each role/node. These will be instantiated on the different nodes (JVMs). They can be
implemented differently, but often they are the same and extend an abstract test class, as illustrated here.

@@snip [StatsSampleSpec.scala]($akka$/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/sample/StatsSampleSpec.scala) { #concrete-tests }

Note the naming convention of these classes. The name of the classes must end with `MultiJvmNode1`, `MultiJvmNode2`
and so on. It is possible to define another suffix to be used by the `sbt-multi-jvm` plugin, but the default should be
fine in most cases.

Then the abstract `MultiNodeSpec`, which takes the `MultiNodeConfig` as constructor parameter.

@@snip [StatsSampleSpec.scala]($akka$/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/sample/StatsSampleSpec.scala) { #abstract-test }

Most of this can of course be extracted to a separate trait to avoid repeating this in all your tests.

Typically you begin your test by starting up the cluster, letting the members join, and creating some actors.
That can be done like this:

@@snip [StatsSampleSpec.scala]($akka$/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/sample/StatsSampleSpec.scala) { #startup-cluster }

From the test you interact with the cluster using the `Cluster` extension, e.g. `join`.

@@snip [StatsSampleSpec.scala]($akka$/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/sample/StatsSampleSpec.scala) { #join }

Notice how the *testActor* from @ref:[testkit](testing.md) is added as [subscriber](#cluster-subscriber)
to cluster changes and then used to wait for certain events, such as in this case all members becoming 'Up'.

The above code was running for all roles (JVMs). `runOn` is a convenient utility to declare that a certain block
of code should only run for a specific role.

@@snip [StatsSampleSpec.scala]($akka$/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/sample/StatsSampleSpec.scala) { #test-statsService }

Once again we take advantage of the facilities in @ref:[testkit](testing.md) to verify expected behavior.
Here using `testActor` as sender (via `ImplicitSender`) and verifying the reply with `expectMsgPF`.

In the above code you can see `node(third)`, which is a useful facility to get the root actor reference of
the actor system for a specific role. This can also be used to grab the `akka.actor.Address` of that node.

@@snip [StatsSampleSpec.scala]($akka$/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/sample/StatsSampleSpec.scala) { #addresses }

@@@

@@@ div { .group-java }

## How to Test

Currently testing with the `sbt-multi-jvm` plugin is only documented for Scala.
Go to the corresponding Scala version of this page for details.

@@@

## Management

<a id="cluster-http"></a>
### HTTP

Information and management of the cluster is available with an HTTP API.
See the documentation of [Akka Management](http://developer.lightbend.com/docs/akka-management/current/).

<a id="cluster-jmx"></a>
### JMX

Information and management of the cluster is available as JMX MBeans with the root name `akka.Cluster`.
The JMX information can be displayed with an ordinary JMX console such as JConsole or JVisualVM.

From JMX you can:

* see what members are part of the cluster
* see status of this node
* see roles of each member
* join this node to another node in the cluster
* mark any node in the cluster as down
* tell any node in the cluster to leave

Member nodes are identified by their address, in the format *akka.<protocol>://<actor-system-name>@<hostname>:<port>*.

<a id="cluster-command-line"></a>
### Command Line

@@@ warning

**Deprecation warning** - The command line script has been deprecated and is scheduled for removal
in the next major version. Use the [HTTP management](#cluster-http) API with [curl](https://curl.haxx.se/)
or similar instead.

@@@

The cluster can be managed with the script `akka-cluster` provided in the Akka GitHub repository @extref[here](github:akka-cluster/jmx-client). Place the script and the `jmxsh-R5.jar` library in the same directory.

Run it without parameters to see instructions about how to use the script:

```
Usage: ./akka-cluster <node-hostname> <jmx-port> <command> ...

Supported commands are:
           join <node-url> - Sends request a JOIN node with the specified URL
          leave <node-url> - Sends a request for node with URL to LEAVE the cluster
           down <node-url> - Sends a request for marking node with URL as DOWN
             member-status - Asks the member node for its current status
                   members - Asks the cluster for addresses of current members
               unreachable - Asks the cluster for addresses of unreachable members
            cluster-status - Asks the cluster for its current status (member ring,
                             unavailable nodes, meta data etc.)
                    leader - Asks the cluster who the current leader is
              is-singleton - Checks if the cluster is a singleton cluster (single
                             node cluster)
              is-available - Checks if the member node is available
Where the <node-url> should be on the format of
  'akka.<protocol>://<actor-system-name>@<hostname>:<port>'

Examples: ./akka-cluster localhost 9999 is-available
          ./akka-cluster localhost 9999 join akka.tcp://MySystem@darkstar:2552
          ./akka-cluster localhost 9999 cluster-status
```

To be able to use the script you must enable remote monitoring and management when starting the JVMs of the cluster nodes,
as described in [Monitoring and Management Using JMX Technology](http://docs.oracle.com/javase/8/docs/technotes/guides/management/agent.html).
Make sure you understand the security implications of enabling remote monitoring and management.

<a id="cluster-configuration"></a>
## Configuration

There are several configuration properties for the cluster. We refer to the
@ref:[reference configuration](general/configuration.md#config-akka-cluster) for more information.

### Cluster Info Logging

You can silence the logging of cluster events at info level with configuration property:

```
akka.cluster.log-info = off
```

<a id="cluster-dispatcher"></a>
### Cluster Dispatcher

Under the hood the cluster extension is implemented with actors and it can be necessary
to create a bulkhead for those actors to avoid disturbance from other actors. Especially
the heartbeating actors that are used for failure detection can generate false positives
if they are not given a chance to run at regular intervals.
For this purpose you can define a separate dispatcher to be used for the cluster actors:

```
akka.cluster.use-dispatcher = cluster-dispatcher

cluster-dispatcher {
  type = "Dispatcher"
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 2
    parallelism-max = 4
  }
}
```

@@@ note

Normally it should not be necessary to configure a separate dispatcher for the Cluster.
The default-dispatcher should be sufficient for performing the Cluster tasks, i.e. `akka.cluster.use-dispatcher`
should not be changed. If you have Cluster related problems when using the default-dispatcher that is typically an
indication that you are running blocking or CPU intensive actors/tasks on the default-dispatcher.
Use dedicated dispatchers for such actors/tasks instead of running them on the default-dispatcher,
because that may starve system internal tasks.
Related config properties: `akka.cluster.use-dispatcher = akka.cluster.cluster-dispatcher`.
Corresponding default values: `akka.cluster.use-dispatcher =`.

@@@