pekko/akka-docs/src/main/paradox/cluster-usage.md
2020-06-25 11:59:37 +02:00

19 KiB

Classic Cluster Usage

@@includeincludes.md { #actor-api } For the full documentation of this feature and for new projects see @ref:Cluster. For specific documentation topics see:

@@@ note

You have to enable @ref:serialization to send messages between ActorSystems in the Cluster. @ref:Serialization with Jackson is a good choice in many cases, and our recommendation if you don't have other preferences or constraints.

@@@

Module info

To use Akka Cluster add the following dependency in your project:

@@dependency[sbt,Maven,Gradle] { symbol1=AkkaVersion value1="$akka.version$" group="com.typesafe.akka" artifact="akka-cluster_$scala.binary.version$" version=AkkaVersion }

@@project-info{ projectId="akka-cluster" }

When and where to use Akka Cluster

See @ref:Choosing Akka Cluster in the documentation of the new APIs.

Cluster API Extension

The following configuration enables the Cluster extension to be used. It joins the cluster and an actor subscribes to cluster membership events and logs them.

An actor that uses the cluster extension may look like this:

Scala
@@snip SimpleClusterListener.scala { type=scala }
Java
@@snip SimpleClusterListener.java { type=java }

And the minimum configuration required is to set a host/port for remoting and the akka.actor.provider = "cluster".

@@snip BasicClusterExampleSpec.scala { #config-seeds }

The actor registers itself as subscriber of certain cluster events. It receives events corresponding to the current state of the cluster when the subscription starts and then it receives events for changes that happen in the cluster.

Cluster Membership API

This section shows the basic usage of the membership API. For the in-depth descriptions on joining, joining to seed nodes, downing and leaving of any node in the cluster please see the full @ref:Cluster Membership API documentation.

Joining to Seed Nodes

The seed nodes are initial contact points for joining a cluster, which can be done in different ways:

After the joining process the seed nodes are not special and they participate in the cluster in exactly the same way as other nodes.

Joining programmatically to seed nodes

You may also join programmatically, which is attractive when dynamically discovering other nodes at startup by using some external tool or API.

Scala
@@snip ClusterDocSpec.scala { #join-seed-nodes }
Java
@@snip ClusterDocTest.java { #join-seed-nodes-imports #join-seed-nodes }

For more information see @reftuning joins

It's also possible to specifically join a single node as illustrated in below example, but joinSeedNodes should be preferred since it has redundancy and retry mechanisms built-in.

Scala
@@snip SimpleClusterListener2.scala { #join }
Java
@@snip SimpleClusterListener2.java { #join }

Leaving

See @ref:Leaving in the documentation of the new APIs.

Downing

See @ref:Downing in the documentation of the new APIs.

Subscribe to Cluster Events

You can subscribe to change notifications of the cluster membership by using @scala[Cluster(system).subscribe]@java[Cluster.get(system).subscribe].

Scala
@@snip SimpleClusterListener2.scala { #subscribe }
Java
@@snip SimpleClusterListener2.java { #subscribe }

A snapshot of the full state, akka.cluster.ClusterEvent.CurrentClusterState, is sent to the subscriber as the first message, followed by events for incremental updates.

Note that you may receive an empty CurrentClusterState, containing no members, followed by MemberUp events from other nodes which already joined, if you start the subscription before the initial join procedure has completed. This may for example happen when you start the subscription immediately after cluster.join() like below. This is expected behavior. When the node has been accepted in the cluster you will receive MemberUp for that node, and other nodes.

Scala
@@snip SimpleClusterListener2.scala { #join #subscribe }
Java
@@snip SimpleClusterListener2.java { #join #subscribe }

To avoid receiving an empty CurrentClusterState at the beginning, you can use it like shown in the following example, to defer subscription until the MemberUp event for the own node is received:

Scala
@@snip SimpleClusterListener2.scala { #join #register-on-memberup }
Java
@@snip SimpleClusterListener2.java { #join #register-on-memberup }

If you find it inconvenient to handle the CurrentClusterState you can use @scala[ClusterEvent.InitialStateAsEvents] @java[ClusterEvent.initialStateAsEvents()] as parameter to subscribe. That means that instead of receiving CurrentClusterState as the first message you will receive the events corresponding to the current state to mimic what you would have seen if you were listening to the events when they occurred in the past. Note that those initial events only correspond to the current state and it is not the full history of all changes that actually has occurred in the cluster.

Scala
@@snip SimpleClusterListener.scala { #subscribe }
Java
@@snip SimpleClusterListener.java { #subscribe }

Worker Dial-in Example

Let's take a look at an example that illustrates how workers, here named backend, can detect and register to new master nodes, here named frontend.

The example application provides a service to transform text. When some text is sent to one of the frontend services, it will be delegated to one of the backend workers, which performs the transformation job, and sends the result back to the original client. New backend nodes, as well as new frontend nodes, can be added or removed to the cluster dynamically.

Messages:

Scala
@@snip TransformationMessages.scala { #messages }
Java
@@snip TransformationMessages.java { #messages }

The backend worker that performs the transformation job:

Scala
@@snip TransformationBackend.scala { #backend }
Java
@@snip TransformationBackend.java { #backend }

Note that the TransformationBackend actor subscribes to cluster events to detect new, potential, frontend nodes, and send them a registration message so that they know that they can use the backend worker.

The frontend that receives user jobs and delegates to one of the registered backend workers:

Scala
@@snip TransformationFrontend.scala { #frontend }
Java
@@snip TransformationFrontend.java { #frontend }

Note that the TransformationFrontend actor watch the registered backend to be able to remove it from its list of available backend workers. Death watch uses the cluster failure detector for nodes in the cluster, i.e. it detects network failures and JVM crashes, in addition to graceful termination of watched actor. Death watch generates the Terminated message to the watching actor when the unreachable cluster node has been downed and removed.

Node Roles

See @ref:Cluster Node Roles in the documentation of the new APIs.

How To Startup when Cluster Size Reached

See @ref:How to startup when a minimum number of members in the cluster is reached in the documentation of the new APIs.

How To Startup when Member is Up

A common use case is to start actors after the cluster has been initialized, members have joined, and the cluster has reached a certain size.

With a configuration option you can define required number of members before the leader changes member status of 'Joining' members to 'Up'.:

akka.cluster.min-nr-of-members = 3

In a similar way you can define required number of members of a certain role before the leader changes member status of 'Joining' members to 'Up'.:

akka.cluster.role {
  frontend.min-nr-of-members = 1
  backend.min-nr-of-members = 2
}

You can start actors or trigger any functions using the registerOnMemberUp callback, which will be invoked when the current member status is changed to 'Up'. This can additionally be used with akka.cluster.min-nr-of-members optional configuration to defer an action until the cluster has reached a certain size.

Scala
@@snip FactorialFrontend.scala { #registerOnUp }
Java
@@snip FactorialFrontendMain.java { #registerOnUp }

How To Cleanup when Member is Removed

You can do some clean up in a registerOnMemberRemoved callback, which will be invoked when the current member status is changed to 'Removed' or the cluster have been shutdown.

An alternative is to register tasks to the @ref:Coordinated Shutdown.

@@@ note

Register a OnMemberRemoved callback on a cluster that have been shutdown, the callback will be invoked immediately on the caller thread, otherwise it will be invoked later when the current member status changed to 'Removed'. You may want to install some cleanup handling after the cluster was started up, but the cluster might already be shutting down when you installing, and depending on the race is not healthy.

@@@

Higher level Cluster tools

@@includecluster.md { #cluster-singleton } See @ref:Cluster Singleton.

@@includecluster.md { #cluster-sharding } See @ref:Cluster Sharding.

@@includecluster.md { #cluster-ddata } See @ref:Distributed Data.

@@includecluster.md { #cluster-pubsub } See @ref:Cluster Distributed Publish Subscribe.

Cluster Aware Routers

All routers can be made aware of member nodes in the cluster, i.e. deploying new routees or looking up routees on nodes in the cluster. When a node becomes unreachable or leaves the cluster the routees of that node are automatically unregistered from the router. When new nodes join the cluster, additional routees are added to the router, according to the configuration.

See @ref:Cluster Aware Routers and @ref:Routers.

@@includecluster.md { #cluster-multidc } See @ref:Cluster Multi-DC.

Cluster Client

Communication from an actor system that is not part of the cluster to actors running somewhere in the cluster. The client does not have to know on which node the destination actor is running.

See @ref:Cluster Client.

Cluster Metrics

The member nodes of the cluster can collect system health metrics and publish that to other cluster nodes and to the registered subscribers on the system event bus.

See @ref:Cluster Metrics.

Failure Detector

The nodes in the cluster monitor each other by sending heartbeats to detect if a node is unreachable from the rest of the cluster. Please see:

How to Test

@@@ div { .group-scala }

@ref:Multi Node Testing is useful for testing cluster applications.

Set up your project according to the instructions in @ref:Multi Node Testing and @ref:Multi JVM Testing, i.e. add the sbt-multi-jvm plugin and the dependency to akka-multi-node-testkit.

First, as described in @ref:Multi Node Testing, we need some scaffolding to configure the MultiNodeSpec. Define the participating @ref:roles and their @ref:configuration in an object extending MultiNodeConfig:

@@snip StatsSampleSpec.scala { #MultiNodeConfig }

Define one concrete test class for each role/node. These will be instantiated on the different nodes (JVMs). They can be implemented differently, but often they are the same and extend an abstract test class, as illustrated here.

@@snip StatsSampleSpec.scala { #concrete-tests }

Note the naming convention of these classes. The name of the classes must end with MultiJvmNode1, MultiJvmNode2 and so on. It is possible to define another suffix to be used by the sbt-multi-jvm, but the default should be fine in most cases.

Then the abstract MultiNodeSpec, which takes the MultiNodeConfig as constructor parameter.

@@snip StatsSampleSpec.scala { #abstract-test }

Most of this can be extracted to a separate trait to avoid repeating this in all your tests.

Typically you begin your test by starting up the cluster and let the members join, and create some actors. That can be done like this:

@@snip StatsSampleSpec.scala { #startup-cluster }

From the test you interact with the cluster using the Cluster extension, e.g. join.

@@snip StatsSampleSpec.scala { #join }

Notice how the testActor from @ref:testkit is added as @ref:subscriber to cluster changes and then waiting for certain events, such as in this case all members becoming 'Up'.

The above code was running for all roles (JVMs). runOn is a convenient utility to declare that a certain block of code should only run for a specific role.

@@snip StatsSampleSpec.scala { #test-statsService }

Once again we take advantage of the facilities in @ref:testkit to verify expected behavior. Here using testActor as sender (via ImplicitSender) and verifying the reply with expectMsgPF.

In the above code you can see node(third), which is useful facility to get the root actor reference of the actor system for a specific role. This can also be used to grab the akka.actor.Address of that node.

@@snip StatsSampleSpec.scala { #addresses }

@@@

@@@ div { .group-java }

Currently testing with the sbt-multi-jvm plugin is only documented for Scala. Go to the corresponding Scala version of this page for details.

@@@

Management

There are several management tools for the cluster. Please refer to the @ref:Cluster Management for more information.

Command Line

@@@ warning

Deprecation warning - The command line script has been deprecated and is scheduled for removal in the next major version. Use the @ref:HTTP management API with curl or similar instead.

@@@

The cluster can be managed with the script akka-cluster provided in the Akka GitHub repository @extrefhere. Place the script and the jmxsh-R5.jar library in the same directory.

Run it without parameters to see instructions about how to use the script:

Usage: ./akka-cluster <node-hostname> <jmx-port> <command> ...

Supported commands are:
           join <node-url> - Sends request a JOIN node with the specified URL
          leave <node-url> - Sends a request for node with URL to LEAVE the cluster
           down <node-url> - Sends a request for marking node with URL as DOWN
             member-status - Asks the member node for its current status
                   members - Asks the cluster for addresses of current members
               unreachable - Asks the cluster for addresses of unreachable members
            cluster-status - Asks the cluster for its current status (member ring,
                             unavailable nodes, meta data etc.)
                    leader - Asks the cluster who the current leader is
              is-singleton - Checks if the cluster is a singleton cluster (single
                             node cluster)
              is-available - Checks if the member node is available
Where the <node-url> should be on the format of
  'akka.<protocol>://<actor-system-name>@<hostname>:<port>'

Examples: ./akka-cluster localhost 9999 is-available
          ./akka-cluster localhost 9999 join akka://MySystem@darkstar:2552
          ./akka-cluster localhost 9999 cluster-status

To be able to use the script you must enable remote monitoring and management when starting the JVMs of the cluster nodes, as described in Monitoring and Management Using JMX Technology. Make sure you understand the security implications of enabling remote monitoring and management.

Configuration

There are several @ref:configuration properties for the cluster, and the full @ref:reference configuration for complete information.