Merge pull request #17869 from akka/wip-17447-split-docs-patriknw

=cls #17447 Split Cluster Sharding and Tools docs into java/scala

commit 664ae2f8f5
21 changed files with 949 additions and 176 deletions
@@ -82,9 +82,8 @@ object ClusterShardingSpec extends MultiNodeConfig {
 context.setReceiveTimeout(120.seconds)
-// self.path.parent.parent.name is the type name (utf-8 URL-encoded)
 // self.path.name is the entity identifier (utf-8 URL-encoded)
-override def persistenceId: String = self.path.parent.parent.name + "-" + self.path.name
+override def persistenceId: String = "Counter-" + self.path.name
 var count = 0
 //#counter-actor
@@ -125,6 +124,15 @@ object ClusterShardingSpec extends MultiNodeConfig {
 case Get(id) ⇒ (id % numberOfShards).toString
 }
+
+def qualifiedCounterProps(typeName: String): Props =
+  Props(new QualifiedCounter(typeName))
+
+class QualifiedCounter(typeName: String) extends Counter {
+  override def persistenceId: String = typeName + "-" + self.path.name
+}
+
+class AnotherCounter extends QualifiedCounter("AnotherCounter")
 }

 // only used in documentation
@@ -225,7 +233,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
 .withRememberEntities(rememberEntities)
 system.actorOf(ShardRegion.props(
   typeName = typeName,
-  entityProps = Props[Counter],
+  entityProps = qualifiedCounterProps(typeName),
   settings = settings,
   coordinatorPath = "/user/" + typeName + "Coordinator/singleton/coordinator",
   extractEntityId = extractEntityId,
@@ -524,7 +532,7 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
 //#counter-start
 ClusterSharding(system).start(
   typeName = "AnotherCounter",
-  entityProps = Props[Counter],
+  entityProps = Props[AnotherCounter],
   settings = ClusterShardingSettings(system),
   extractEntityId = extractEntityId,
   extractShardId = extractShardId)
@@ -120,11 +120,10 @@ public class ClusterShardingTest {
 int count = 0;

-// getSelf().path().parent().parent().name() is the type name (utf-8 URL-encoded)
 // getSelf().path().name() is the entity identifier (utf-8 URL-encoded)
 @Override
 public String persistenceId() {
-  return getSelf().path().parent().parent().name() + "-" + getSelf().path().name();
+  return "Counter-" + getSelf().path().name();
 }

 @Override
@@ -75,6 +75,7 @@ akka.cluster.client.receptionist {
 }
 # //#receptionist-ext-config

+# //#cluster-client-config
 # Settings for the ClusterClient
 akka.cluster.client {
   # Actor paths of the ClusterReceptionist actors on the servers (cluster nodes)

@@ -111,7 +112,9 @@ akka.cluster.client {
   # Maximum allowed buffer size is 10000.
   buffer-size = 1000
 }
+# //#cluster-client-config

+# //#singleton-config
 akka.cluster.singleton {
   # The actor name of the child singleton actor.
   singleton-name = "singleton"

@@ -126,7 +129,9 @@ akka.cluster.singleton {
   # oldest member is removed from the cluster (+ akka.cluster.down-removal-margin).
   hand-over-retry-interval = 1s
 }
+# //#singleton-config

+# //#singleton-proxy-config
 akka.cluster.singleton-proxy {
   # The actor name of the singleton actor that is started by the ClusterSingletonManager
   singleton-name = ${akka.cluster.singleton.singleton-name}

@@ -147,3 +152,4 @@ akka.cluster.singleton-proxy {
   # Maximum allowed buffer size is 10000.
   buffer-size = 1000
 }
+# //#singleton-proxy-config
@@ -107,11 +107,22 @@ final class ClusterClientSettings(
   require(bufferSize >= 0 && bufferSize <= 10000, "bufferSize must be >= 0 and <= 10000")

+  /**
+   * Scala API
+   */
   def withInitialContacts(initialContacts: Set[ActorPath]): ClusterClientSettings = {
     require(initialContacts.nonEmpty, "initialContacts must be defined")
     copy(initialContacts = initialContacts)
   }

+  /**
+   * Java API
+   */
+  def withInitialContacts(initialContacts: java.util.Set[ActorPath]): ClusterClientSettings = {
+    import scala.collection.JavaConverters._
+    withInitialContacts(initialContacts.asScala.toSet)
+  }
+
   def withEstablishingGetContactsInterval(establishingGetContactsInterval: FiniteDuration): ClusterClientSettings =
     copy(establishingGetContactsInterval = establishingGetContactsInterval)
@@ -182,14 +182,25 @@ object DistributedPubSubMediator {
   }

   sealed abstract class GetTopics

+  /**
+   * Send this message to the mediator and it will reply with
+   * [[CurrentTopics]] containing the names of the (currently known)
+   * registered topic names.
+   */
   @SerialVersionUID(1L)
   case object GetTopics extends GetTopics

   /**
-   * Java API
+   * Java API: Send this message to the mediator and it will reply with
+   * [[DistributedPubSubMediator.CurrentTopics]] containing the names of the (currently known)
+   * registered topic names.
    */
   def getTopicsInstance: GetTopics = GetTopics

+  /**
+   * Reply to `GetTopics`.
+   */
   @SerialVersionUID(1L)
   final case class CurrentTopics(topics: Set[String]) {
     /**
@@ -348,7 +359,7 @@ trait DistributedPubSubMessage extends Serializable
 * the entries to peer actors among all cluster nodes or a group of nodes
 * tagged with a specific role.
 *
-* The `DistributedPubSubMediator` is supposed to be started on all nodes,
+* The `DistributedPubSubMediator` actor is supposed to be started on all nodes,
 * or all nodes with specified role, in the cluster. The mediator can be
 * started with the [[DistributedPubSub]] extension or as an ordinary actor.
 *
@@ -0,0 +1,65 @@
New file: akka-cluster-tools/src/test/java/akka/cluster/client/ClusterClientTest.java

/**
 * Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
 */

package akka.cluster.client;

import com.typesafe.config.ConfigFactory;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.junit.ClassRule;
import org.junit.Test;

import akka.actor.ActorPath;
import akka.actor.ActorPaths;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.testkit.AkkaJUnitActorSystemResource;

public class ClusterClientTest {

  @ClassRule
  public static AkkaJUnitActorSystemResource actorSystemResource =
    new AkkaJUnitActorSystemResource("DistributedPubSubMediatorTest",
      ConfigFactory.parseString(
        "akka.actor.provider = \"akka.cluster.ClusterActorRefProvider\"\n" +
        "akka.remote.netty.tcp.port=0"));

  private final ActorSystem system = actorSystemResource.getSystem();

  //#initialContacts
  Set<ActorPath> initialContacts() {
    return new HashSet<ActorPath>(Arrays.asList(
      ActorPaths.fromString("akka.tcp://OtherSys@host1:2552/system/receptionist"),
      ActorPaths.fromString("akka.tcp://OtherSys@host2:2552/system/receptionist")));
  }
  //#initialContacts

  @Test
  public void demonstrateUsage() {
    //#server
    ActorRef serviceA = system.actorOf(Props.create(Service.class), "serviceA");
    ClusterClientReceptionist.get(system).registerService(serviceA);

    ActorRef serviceB = system.actorOf(Props.create(Service.class), "serviceB");
    ClusterClientReceptionist.get(system).registerService(serviceB);
    //#server

    //#client
    final ActorRef c = system.actorOf(ClusterClient.props(
      ClusterClientSettings.create(system).withInitialContacts(initialContacts())),
      "client");
    c.tell(new ClusterClient.Send("/user/serviceA", "hello", true), ActorRef.noSender());
    c.tell(new ClusterClient.SendToAll("/user/serviceB", "hi"), ActorRef.noSender());
    //#client
  }

  static public class Service extends UntypedActor {
    public void onReceive(Object msg) {
    }
  }
}
akka-docs/rst/java/cluster-client.rst (new file, 147 lines)
@@ -0,0 +1,147 @@
.. _cluster-client-java:

Cluster Client
==============

An actor system that is not part of the cluster can communicate with actors
somewhere in the cluster via this ``ClusterClient``. The client can of course be part of
another cluster. It only needs to know the location of one (or more) nodes to use as initial
contact points. It will establish a connection to a ``ClusterReceptionist`` somewhere in
the cluster. It will monitor the connection to the receptionist and establish a new
connection if the link goes down. When looking for a new receptionist it uses fresh
contact points retrieved from previous establishment, or periodically refreshed contacts,
i.e. not necessarily the initial contact points.

.. note::

  ``ClusterClient`` should not be used when sending messages to actors that run
  within the same cluster. Similar functionality to the ``ClusterClient`` is
  provided in a more efficient way by :ref:`distributed-pub-sub-java` for actors that
  belong to the same cluster.

Also, note it's necessary to change ``akka.actor.provider`` from ``akka.actor.LocalActorRefProvider``
to ``akka.remote.RemoteActorRefProvider`` or ``akka.cluster.ClusterActorRefProvider`` when using
the cluster client.

The receptionist is supposed to be started on all nodes, or all nodes with specified role,
in the cluster. The receptionist can be started with the ``ClusterClientReceptionist`` extension
or as an ordinary actor.

You can send messages via the ``ClusterClient`` to any actor in the cluster that is registered
in the ``DistributedPubSubMediator`` used by the ``ClusterReceptionist``.
The ``ClusterClientReceptionist`` provides methods for registration of actors that
should be reachable from the client. Messages are wrapped in ``ClusterClient.Send``,
``ClusterClient.SendToAll`` or ``ClusterClient.Publish``.

**1. ClusterClient.Send**

The message will be delivered to one recipient with a matching path, if any such
exists. If several entries match the path the message will be delivered
to one random destination. The sender() of the message can specify that local
affinity is preferred, i.e. the message is sent to an actor in the same local actor
system as the used receptionist actor, if any such exists, otherwise random to any other
matching entry.

**2. ClusterClient.SendToAll**

The message will be delivered to all recipients with a matching path.

**3. ClusterClient.Publish**

The message will be delivered to all recipient actors that have been registered as subscribers
to the named topic.
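
A message can also be published through the client in the same way as ``Send`` and ``SendToAll``.
As a sketch, assuming a client actor reference ``c`` created as in the example further down and an
illustrative topic name ``"serviceTopic"``::

  c.tell(new ClusterClient.Publish("serviceTopic", "hello"), ActorRef.noSender());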

Response messages from the destination actor are tunneled via the receptionist
to avoid inbound connections from other cluster nodes to the client, i.e.
the ``sender()``, as seen by the destination actor, is not the client itself.
The ``sender()`` of the response messages, as seen by the client, is preserved
as the original sender(), so the client can choose to send subsequent messages
directly to the actor in the cluster.

While establishing a connection to a receptionist the ``ClusterClient`` will buffer
messages and send them when the connection is established. If the buffer is full
the ``ClusterClient`` will drop old messages when new messages are sent via the client.
The size of the buffer is configurable and it can be disabled by using a buffer size of 0.

It's worth noting that messages can always be lost because of the distributed nature
of these actors. As always, additional logic should be implemented in the destination
(acknowledgement) and in the client (retry) actors to ensure at-least-once message delivery.

An Example
----------

On the cluster nodes first start the receptionist. Note, it is recommended to load the extension
when the actor system is started by defining it in the ``akka.extensions`` configuration property::

  akka.extensions = ["akka.cluster.client.ClusterClientReceptionist"]

Next, register the actors that should be available for the client.

.. includecode:: ../../../akka-cluster-tools/src/test/java/akka/cluster/client/ClusterClientTest.java#server

On the client you create the ``ClusterClient`` actor and use it as a gateway for sending
messages to the actors identified by their path (without address information) somewhere
in the cluster.

.. includecode:: ../../../akka-cluster-tools/src/test/java/akka/cluster/client/ClusterClientTest.java#client

The ``initialContacts`` parameter is a ``Set<ActorPath>``, which can be created like this:

.. includecode:: ../../../akka-cluster-tools/src/test/java/akka/cluster/client/ClusterClientTest.java#initialContacts

You will probably define the address information of the initial contact points in configuration or a system property.
See also :ref:`cluster-client-config-java`.
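
For example, you could keep the contact point addresses in a configuration property of your own and
build the set from it; the property name ``my-app.cluster-client.initial-contacts`` below is only an
illustration, not an Akka setting::

  Set<ActorPath> initialContacts = new HashSet<ActorPath>();
  for (String address :
      system.settings().config().getStringList("my-app.cluster-client.initial-contacts")) {
    initialContacts.add(ActorPaths.fromString(address));
  }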

A more comprehensive sample is available in the `Typesafe Activator <http://www.typesafe.com/platform/getstarted>`_
tutorial named `Distributed workers with Akka and Java! <http://www.typesafe.com/activator/template/akka-distributed-workers-java>`_.

ClusterClientReceptionist Extension
-----------------------------------

In the example above the receptionist is started and accessed with the ``akka.cluster.client.ClusterClientReceptionist`` extension.
That is convenient and perfectly fine in most cases, but it can be good to know that it is possible to
start the ``akka.cluster.client.ClusterReceptionist`` actor as an ordinary actor and you can have several
different receptionists at the same time, serving different types of clients.

Note that the ``ClusterClientReceptionist`` uses the ``DistributedPubSub`` extension, which is described
in :ref:`distributed-pub-sub-java`.

It is recommended to load the extension when the actor system is started by defining it in the
``akka.extensions`` configuration property::

  akka.extensions = ["akka.cluster.client.ClusterClientReceptionist"]

Dependencies
------------

To use the Cluster Client you must add the following dependency in your project.

sbt::

  "com.typesafe.akka" %% "akka-cluster-tools" % "@version@" @crossString@

maven::

  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-cluster-tools_@binVersion@</artifactId>
    <version>@version@</version>
  </dependency>

.. _cluster-client-config-java:

Configuration
-------------

The ``ClusterClientReceptionist`` extension (or ``ClusterReceptionistSettings``) can be configured
with the following properties:

.. includecode:: ../../../akka-cluster-tools/src/main/resources/reference.conf#receptionist-ext-config

The following configuration properties are read by the ``ClusterClientSettings``
when created with an ``ActorSystem`` parameter. It is also possible to amend the ``ClusterClientSettings``
or create it from another config section with the same layout as below. ``ClusterClientSettings`` is
a parameter to the ``ClusterClient.props`` factory method, i.e. each client can be configured
with different settings if needed.

.. includecode:: ../../../akka-cluster-tools/src/main/resources/reference.conf#cluster-client-config
@@ -159,4 +159,12 @@ You can plug-in your own metrics collector instead of built-in
 Look at those two implementations for inspiration.

-Custom metrics collector implementation class must be specified in the :ref:`cluster_metrics_configuration_java`.
+Custom metrics collector implementation class must be specified in the
+``akka.cluster.metrics.collector.provider`` configuration property.
+
+Configuration
+-------------
+
+The Cluster metrics extension can be configured with the following properties:
+
+.. includecode:: ../../../akka-cluster-metrics/src/main/resources/reference.conf
akka-docs/rst/java/cluster-sharding.rst (new file, 267 lines)
@@ -0,0 +1,267 @@
.. _cluster_sharding_java:

Cluster Sharding
================

Cluster sharding is useful when you need to distribute actors across several nodes in the cluster and want to
be able to interact with them using their logical identifier, but without having to care about
their physical location in the cluster, which might also change over time.

It could for example be actors representing Aggregate Roots in Domain-Driven Design terminology.
Here we call these actors "entities". These actors typically have persistent (durable) state,
but this feature is not limited to actors with persistent state.

Cluster sharding is typically used when you have many stateful actors that together consume
more resources (e.g. memory) than fit on one machine. If you only have a few stateful actors
it might be easier to run them on a :ref:`cluster-singleton-java` node.

In this context sharding means that actors with an identifier, so called entities,
can be automatically distributed across multiple nodes in the cluster. Each entity
actor runs only at one place, and messages can be sent to the entity without requiring
the sender to know the location of the destination actor. This is achieved by sending
the messages via a ``ShardRegion`` actor provided by this extension, which knows how
to route the message with the entity id to the final destination.

An Example
----------

This is what an entity actor may look like:

.. includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#counter-actor

The above actor uses event sourcing and the support provided in ``UntypedPersistentActor`` to store its state.
It does not have to be a persistent actor, but in case of failure or migration of entities between nodes it must be able to recover
its state if it is valuable.

Note how the ``persistenceId`` is defined. The name of the actor is the entity identifier (utf-8 URL-encoded).
You may define it another way, but it must be unique.

When using the sharding extension you are supposed to first register the supported entity types
with the ``ClusterSharding.start`` method, typically at system startup on each node
in the cluster. ``ClusterSharding.start`` gives you the reference which you can pass along.

.. includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#counter-start
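
For orientation, a condensed sketch of such a start call; the ``Counter`` class and the
``messageExtractor`` are the ones described on this page, and the returned ``ActorRef`` is the
``ShardRegion`` for the ``"Counter"`` type::

  ActorRef counterRegion = ClusterSharding.get(system).start(
      "Counter",                               // typeName
      Props.create(Counter.class),             // entityProps
      ClusterShardingSettings.create(system),  // settings
      messageExtractor);                       // entity id / shard id extraction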

The ``messageExtractor`` defines application specific methods to extract the entity
identifier and the shard identifier from incoming messages.

.. includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#counter-extractor

This example illustrates two different ways to define the entity identifier in the messages:

* The ``Get`` message includes the identifier itself.
* The ``EntityEnvelope`` holds the identifier, and the actual message that is
  sent to the entity actor is wrapped in the envelope.

Note how these two message types are handled in the ``entityId`` and ``entityMessage`` methods shown above.
The message sent to the entity actor is what ``entityMessage`` returns and that makes it possible to unwrap envelopes
if needed.

A shard is a group of entities that will be managed together. The grouping is defined by the
``extractShardId`` function shown above. For a specific entity identifier the shard identifier must always
be the same. Otherwise the entity actor might accidentally be started in several places at the same time.

Creating a good sharding algorithm is an interesting challenge in itself. Try to produce a uniform distribution,
i.e. the same amount of entities in each shard. As a rule of thumb, the number of shards should be a factor ten greater
than the planned maximum number of cluster nodes. Fewer shards than nodes will result in some nodes
not hosting any shards. Too many shards will result in less efficient management of the shards, e.g. rebalancing
overhead, and increased latency because the coordinator is involved in the routing of the first message for each
shard. The sharding algorithm must be the same on all nodes in a running cluster. It can be changed after stopping
all nodes in the cluster.

A simple sharding algorithm that works fine in most cases is to take the absolute value of the ``hashCode`` of
the entity identifier modulo number of shards. As a convenience this is provided by the
``ShardRegion.HashCodeMessageExtractor``.
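
A sketch of such an extractor, assuming the ``Get`` and ``EntityEnvelope`` messages from the example
above expose the entity identifier in fields named ``counterId`` and ``id`` (the field names are
assumptions for illustration)::

  ShardRegion.MessageExtractor messageExtractor =
      new ShardRegion.HashCodeMessageExtractor(30) {  // 30 = maximum number of shards
        @Override
        public String entityId(Object message) {
          if (message instanceof Get)
            return String.valueOf(((Get) message).counterId);
          else if (message instanceof EntityEnvelope)
            return String.valueOf(((EntityEnvelope) message).id);
          else
            return null;
        }
      };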

Messages to the entities are always sent via the local ``ShardRegion``. The ``ShardRegion`` actor reference for a
named entity type is returned by ``ClusterSharding.start`` and it can also be retrieved with ``ClusterSharding.shardRegion``.
The ``ShardRegion`` will look up the location of the shard for the entity if it does not already know its location. It will
delegate the message to the right node and it will create the entity actor on demand, i.e. when the
first message for a specific entity is delivered.

.. includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#counter-usage
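
In condensed form, sending a message to an entity via the region could be sketched like this
(``Get`` is the example message from this page and ``123`` an arbitrary entity identifier)::

  ActorRef counterRegion = ClusterSharding.get(system).shardRegion("Counter");
  counterRegion.tell(new Get(123), getSelf());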

How it works
------------

The ``ShardRegion`` actor is started on each node in the cluster, or group of nodes
tagged with a specific role. The ``ShardRegion`` is created with two application specific
functions to extract the entity identifier and the shard identifier from incoming messages.
A shard is a group of entities that will be managed together. For the first message in a
specific shard the ``ShardRegion`` requests the location of the shard from a central coordinator,
the ``ShardCoordinator``.

The ``ShardCoordinator`` decides which ``ShardRegion`` shall own the ``Shard`` and informs
that ``ShardRegion``. The region will confirm this request and create the ``Shard`` supervisor
as a child actor. The individual ``Entities`` will then be created when needed by the ``Shard``
actor. Incoming messages thus travel via the ``ShardRegion`` and the ``Shard`` to the target
``Entity``.

If the shard home is another ``ShardRegion`` instance messages will be forwarded
to that ``ShardRegion`` instance instead. While resolving the location of a
shard incoming messages for that shard are buffered and later delivered when the
shard home is known. Subsequent messages to the resolved shard can be delivered
to the target destination immediately without involving the ``ShardCoordinator``.

Scenario 1:

#. Incoming message M1 to ``ShardRegion`` instance R1.
#. M1 is mapped to shard S1. R1 doesn't know about S1, so it asks the coordinator C for the location of S1.
#. C answers that the home of S1 is R1.
#. R1 creates a child actor for the entity E1 and sends buffered messages for S1 to E1.
#. All incoming messages for S1 which arrive at R1 can be handled by R1 without C. It creates entity children as needed, and forwards messages to them.

Scenario 2:

#. Incoming message M2 to R1.
#. M2 is mapped to S2. R1 doesn't know about S2, so it asks C for the location of S2.
#. C answers that the home of S2 is R2.
#. R1 sends buffered messages for S2 to R2.
#. All incoming messages for S2 which arrive at R1 can be handled by R1 without C. It forwards messages to R2.
#. R2 receives a message for S2, asks C, which answers that the home of S2 is R2, and we are in Scenario 1 (but for R2).

To make sure that at most one instance of a specific entity actor is running somewhere
in the cluster it is important that all nodes have the same view of where the shards
are located. Therefore the shard allocation decisions are taken by the central
``ShardCoordinator``, which is running as a cluster singleton, i.e. one instance on
the oldest member among all cluster nodes or a group of nodes tagged with a specific
role.

The logic that decides where a shard is to be located is defined in a pluggable shard
allocation strategy. The default implementation ``ShardCoordinator.LeastShardAllocationStrategy``
allocates new shards to the ``ShardRegion`` with the least number of previously allocated shards.
This strategy can be replaced by an application specific implementation.

To be able to use newly added members in the cluster the coordinator facilitates rebalancing
of shards, i.e. migrating entities from one node to another. In the rebalance process the
coordinator first notifies all ``ShardRegion`` actors that a handoff for a shard has started.
That means they will start buffering incoming messages for that shard, in the same way as if the
shard location is unknown. During the rebalance process the coordinator will not answer any
requests for the location of shards that are being rebalanced, i.e. local buffering will
continue until the handoff is completed. The ``ShardRegion`` responsible for the rebalanced shard
will stop all entities in that shard by sending the specified ``handOffStopMessage``
(default ``PoisonPill``) to them. When all entities have been terminated the ``ShardRegion``
owning the entities will acknowledge the handoff as completed to the coordinator.
Thereafter the coordinator will reply to requests for the location of
the shard and thereby allocate a new home for the shard, and then buffered messages in the
``ShardRegion`` actors are delivered to the new location. This means that the state of the entities
is not transferred or migrated. If the state of the entities is of importance it should be
persistent (durable), e.g. with :ref:`persistence-java`, so that it can be recovered at the new
location.

The logic that decides which shards to rebalance is defined in a pluggable shard
allocation strategy. The default implementation ``ShardCoordinator.LeastShardAllocationStrategy``
picks shards for handoff from the ``ShardRegion`` with the largest number of previously allocated shards.
They will then be allocated to the ``ShardRegion`` with the least number of previously allocated shards,
i.e. new members in the cluster. There is a configurable threshold of how large the difference
must be to begin the rebalancing. This strategy can be replaced by an application specific
implementation.

The state of shard locations in the ``ShardCoordinator`` is persistent (durable) with
:ref:`persistence-java` to survive failures. Since it is running in a cluster :ref:`persistence-java`
must be configured with a distributed journal. When a crashed or unreachable coordinator
node has been removed (via down) from the cluster a new ``ShardCoordinator`` singleton
actor will take over and the state is recovered. During such a failure period shards
with known location are still available, while messages for new (unknown) shards
are buffered until the new ``ShardCoordinator`` becomes available.

As long as a sender uses the same ``ShardRegion`` actor to deliver messages to an entity
actor the order of the messages is preserved. As long as the buffer limit is not reached
messages are delivered on a best effort basis, with at-most once delivery semantics,
in the same way as ordinary message sending. Reliable end-to-end messaging, with
at-least-once semantics, can be added by using ``AtLeastOnceDelivery`` in :ref:`persistence-java`.

Some additional latency is introduced for messages targeted to new or previously
unused shards due to the round-trip to the coordinator. Rebalancing of shards may
also add latency. This should be considered when designing the application specific
shard resolution, e.g. to avoid too fine grained shards.

Proxy Only Mode
---------------

The ``ShardRegion`` actor can also be started in proxy only mode, i.e. it will not
host any entities itself, but knows how to delegate messages to the right location.
A ``ShardRegion`` is started in proxy only mode with the ``ClusterSharding.startProxy``
method.

Passivation
-----------

If the state of the entities is persistent you may stop entities that are not used to
reduce memory consumption. This is done by the application specific implementation of
the entity actors, for example by defining a receive timeout (``context.setReceiveTimeout``).
If a message is already enqueued to the entity when it stops itself the enqueued message
in the mailbox will be dropped. To support graceful passivation without losing such
messages the entity actor can send ``ShardRegion.Passivate`` to its parent ``Shard``.
The specified wrapped message in ``Passivate`` will be sent back to the entity, which is
then supposed to stop itself. Incoming messages will be buffered by the ``Shard``
between reception of ``Passivate`` and termination of the entity. Such buffered messages
are thereafter delivered to a new incarnation of the entity.
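
A sketch of this protocol from inside the entity actor's message handling, using ``PoisonPill`` as
the wrapped stop message::

  // on ReceiveTimeout, ask the parent Shard to passivate this entity; the
  // Shard sends the wrapped message (here PoisonPill) back, which stops it
  if (message instanceof ReceiveTimeout) {
    getContext().parent().tell(
        new ShardRegion.Passivate(PoisonPill.getInstance()), getSelf());
  }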

Remembering Entities
--------------------

The list of entities in each ``Shard`` can be made persistent (durable) by setting
the ``rememberEntities`` flag to true in ``ClusterShardingSettings`` when calling
``ClusterSharding.start``. When configured to remember entities, whenever a ``Shard``
is rebalanced onto another node or recovers after a crash it will recreate all the
entities which were previously running in that ``Shard``. To permanently stop entities,
a ``Passivate`` message must be sent to the parent of the entity actor, otherwise the
entity will be automatically restarted after the entity restart backoff specified in
the configuration.
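
The flag is set on the settings instance that is passed to ``ClusterSharding.start``, for example::

  ClusterShardingSettings settings =
      ClusterShardingSettings.create(system).withRememberEntities(true);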

When ``rememberEntities`` is set to false, a ``Shard`` will not automatically restart any entities
after a rebalance or recovering from a crash. Entities will only be started once the first message
for that entity has been received in the ``Shard``. Entities will not be restarted if they stop without
using a ``Passivate``.

Note that the state of the entities themselves will not be restored unless they have been made persistent,
e.g. with :ref:`persistence-java`.

Graceful Shutdown
-----------------

You can send the ``ClusterSharding.GracefulShutdown`` message (``ClusterSharding.gracefulShutdownInstance``
in Java) to the ``ShardRegion`` actor to hand off all shards that are hosted by that ``ShardRegion`` and then the
``ShardRegion`` actor will be stopped. You can ``watch`` the ``ShardRegion`` actor to know when it is completed.
During this period other regions will buffer messages for those shards in the same way as when a rebalance is
triggered by the coordinator. When the shards have been stopped the coordinator will allocate these shards elsewhere.

When the ``ShardRegion`` has terminated you probably want to ``leave`` the cluster, and shut down the ``ActorSystem``.

This is how to do that:

.. includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#graceful-shutdown

Dependencies
------------

To use the Cluster Sharding you must add the following dependency in your project.

sbt::

  "com.typesafe.akka" %% "akka-cluster-sharding" % "@version@" @crossString@

maven::

  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-cluster-sharding_@binVersion@</artifactId>
    <version>@version@</version>
  </dependency>

Configuration
-------------

The ``ClusterSharding`` extension can be configured with the following properties. These configuration
properties are read by the ``ClusterShardingSettings`` when created with an ``ActorSystem`` parameter.
It is also possible to amend the ``ClusterShardingSettings`` or create it from another config section
with the same layout as below. ``ClusterShardingSettings`` is a parameter to the ``start`` method of
the ``ClusterSharding`` extension, i.e. each entity type can be configured with different settings
if needed.

.. includecode:: ../../../akka-cluster-sharding/src/main/resources/reference.conf#sharding-ext-config

A custom shard allocation strategy can be defined in an optional parameter to
``ClusterSharding.start``. See the API documentation of ``AbstractShardAllocationStrategy`` for details
of how to implement a custom shard allocation strategy.
akka-docs/rst/java/cluster-singleton.rst (new file, 141 lines)
@@ -0,0 +1,141 @@
.. _cluster-singleton-java:

Cluster Singleton
=================

For some use cases it is convenient and sometimes also mandatory to ensure that
you have exactly one actor of a certain type running somewhere in the cluster.

Some examples:

* single point of responsibility for certain cluster-wide consistent decisions, or
  coordination of actions across the cluster system
* single entry point to an external system
* single master, many workers
* centralized naming service, or routing logic

Using a singleton should not be the first design choice. It has several drawbacks,
such as being a single point of bottleneck. A single point of failure is also a relevant concern,
but for some cases this feature takes care of that by making sure that another singleton
instance will eventually be started.

The cluster singleton pattern is implemented by ``akka.cluster.singleton.ClusterSingletonManager``.
It manages one singleton actor instance among all cluster nodes or a group of nodes tagged with
a specific role. ``ClusterSingletonManager`` is an actor that is supposed to be started on
all nodes, or all nodes with specified role, in the cluster. The actual singleton actor is
started by the ``ClusterSingletonManager`` on the oldest node by creating a child actor from
supplied ``Props``. ``ClusterSingletonManager`` makes sure that at most one singleton instance
is running at any point in time.

The singleton actor is always running on the oldest member with specified role.
The oldest member is determined by ``akka.cluster.Member#isOlderThan``.
This can change when removing that member from the cluster. Be aware that there is a short time
period when there is no active singleton during the hand-over process.

The cluster failure detector will notice when the oldest node becomes unreachable due to
things like JVM crash, hard shut down, or network failure. Then a new oldest node will
take over and a new singleton actor is created. For these failure scenarios there will
not be a graceful hand-over, but more than one active singleton is prevented by all
reasonable means. Some corner cases are eventually resolved by configurable timeouts.

You can access the singleton actor by using the provided ``akka.cluster.singleton.ClusterSingletonProxy``,
which will route all messages to the current instance of the singleton. The proxy will keep track of
the oldest node in the cluster and resolve the singleton's ``ActorRef`` by explicitly sending the
singleton's ``actorSelection`` the ``akka.actor.Identify`` message and waiting for it to reply.
This is performed periodically if the singleton doesn't reply within a certain (configurable) time.
Given the implementation, there might be periods of time during which the ``ActorRef`` is unavailable,
e.g., when a node leaves the cluster. In these cases, the proxy will buffer the messages sent to the
singleton and then deliver them when the singleton is finally available. If the buffer is full
the ``ClusterSingletonProxy`` will drop old messages when new messages are sent via the proxy.
The size of the buffer is configurable and it can be disabled by using a buffer size of 0.

It's worth noting that messages can always be lost because of the distributed nature of these actors.
As always, additional logic should be implemented in the singleton (acknowledgement) and in the
client (retry) actors to ensure at-least-once message delivery.

Potential problems to be aware of
---------------------------------

This pattern may seem to be very tempting to use at first, but it has several drawbacks, some of them are listed below:

* the cluster singleton may quickly become a *performance bottleneck*,
* you can not rely on the cluster singleton to be *non-stop* available; e.g. when the node on which the singleton has
  been running dies, it will take a few seconds for this to be noticed and the singleton be migrated to another node,
* in the case of a *network partition* appearing in a Cluster that is using Automatic Downing (see docs for
  :ref:`automatic-vs-manual-downing-java`),
  it may happen that the isolated clusters each decide to spin up their own singleton, meaning that there might be multiple
  singletons running in the system, yet the Clusters have no way of finding out about them (because of the partition).

Especially the last point is something you should be aware of; in general when using the Cluster Singleton pattern
you should take care of downing nodes yourself and not rely on the timing based auto-down feature.

.. warning::

  **Be very careful when using Cluster Singleton together with Automatic Downing**,
  since it allows the cluster to split up into two separate clusters, which in turn will result
  in *multiple Singletons* being started, one in each separate cluster!

An Example
----------

Assume that we need one single entry point to an external system. An actor that
receives messages from a JMS queue with the strict requirement that only one
JMS consumer must exist to make sure that the messages are processed in order.
That is perhaps not how one would like to design things, but a typical real-world
scenario when integrating with external systems.

On each node in the cluster you need to start the ``ClusterSingletonManager`` and
supply the ``Props`` of the singleton actor, in this case the JMS queue consumer.

.. includecode:: ../../../akka-cluster-tools/src/test/java/akka/cluster/singleton/ClusterSingletonManagerTest.java#create-singleton-manager

Here we limit the singleton to nodes tagged with the ``"worker"`` role, but all nodes, independent of
role, can be used by not specifying ``withRole``.

Here we use an application specific ``terminationMessage`` to be able to close the
resources before actually stopping the singleton actor. Note that ``PoisonPill`` is a
perfectly fine ``terminationMessage`` if you only need to stop the actor.
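
For orientation, a condensed sketch of such a start-up, here with ``PoisonPill`` as
``terminationMessage``; the ``Consumer`` class, the ``"consumer"`` actor name and the ``"worker"``
role are illustrative and must match your own setup::

  system.actorOf(
      ClusterSingletonManager.props(
          Props.create(Consumer.class),  // Props of the singleton actor
          PoisonPill.getInstance(),      // terminationMessage
          ClusterSingletonManagerSettings.create(system).withRole("worker")),
      "consumer");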

With the names given above, access to the singleton can be obtained from any cluster node using a properly
configured proxy.

.. includecode:: ../../../akka-cluster-tools/src/test/java/akka/cluster/singleton/ClusterSingletonManagerTest.java#create-singleton-proxy
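
In condensed form, the proxy is just another actor that is started with the path of the
``ClusterSingletonManager``; the ``"/user/consumer"`` path and the names are illustrative and must
match how the manager was started::

  ActorRef proxy = system.actorOf(
      ClusterSingletonProxy.props(
          "/user/consumer",  // path of the ClusterSingletonManager
          ClusterSingletonProxySettings.create(system).withRole("worker")),
      "consumerProxy");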

A more comprehensive sample is available in the `Typesafe Activator <http://www.typesafe.com/platform/getstarted>`_
tutorial named `Distributed workers with Akka and Java! <http://www.typesafe.com/activator/template/akka-distributed-workers-java>`_.

Dependencies
------------

To use the Cluster Singleton you must add the following dependency in your project.

sbt::

  "com.typesafe.akka" %% "akka-cluster-tools" % "@version@" @crossString@

maven::

  <dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-cluster-tools_@binVersion@</artifactId>
    <version>@version@</version>
  </dependency>

Configuration
-------------

The following configuration properties are read by the ``ClusterSingletonManagerSettings``
when created with an ``ActorSystem`` parameter. It is also possible to amend the ``ClusterSingletonManagerSettings``
or create it from another config section with the same layout as below. ``ClusterSingletonManagerSettings`` is
a parameter to the ``ClusterSingletonManager.props`` factory method, i.e. each singleton can be configured
with different settings if needed.

.. includecode:: ../../../akka-cluster-tools/src/main/resources/reference.conf#singleton-config

The following configuration properties are read by the ``ClusterSingletonProxySettings``
when created with an ``ActorSystem`` parameter. It is also possible to amend the ``ClusterSingletonProxySettings``
or create it from another config section with the same layout as below. ``ClusterSingletonProxySettings`` is
a parameter to the ``ClusterSingletonProxy.props`` factory method, i.e. each singleton proxy can be configured
with different settings if needed.

.. includecode:: ../../../akka-cluster-tools/src/main/resources/reference.conf#singleton-proxy-config
@@ -313,7 +313,7 @@ you have exactly one actor of a certain type running somewhere in the cluster.
 This can be implemented by subscribing to member events, but there are several corner
 cases to consider. Therefore, this specific use case is made easily accessible by the
-:ref:`cluster-singleton` in the contrib module.
+:ref:`cluster-singleton-java`.

 Cluster Sharding
 ^^^^^^^^^^^^^^^^

@@ -322,7 +322,7 @@ Distributes actors across several nodes in the cluster and supports interaction
 with the actors using their logical identifier, but without having to care about
 their physical location in the cluster.

-See :ref:`cluster-sharding` in the contrib module.
+See :ref:`cluster_sharding_java`.

 Distributed Publish Subscribe
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

@@ -331,7 +331,7 @@ Publish-subscribe messaging between actors in the cluster, and point-to-point me
 using the logical path of the actors, i.e. the sender does not have to know on which
 node the destination actor is running.

-See :ref:`distributed-pub-sub` in the contrib module.
+See :ref:`distributed-pub-sub-scala`.

 Cluster Client
 ^^^^^^^^^^^^^^

@@ -340,7 +340,15 @@ Communication from an actor system that is not part of the cluster to actors run
 somewhere in the cluster. The client does not have to know on which node the destination
 actor is running.

-See :ref:`cluster-client` in the contrib module.
+See :ref:`cluster-client-java`.
+
+Distributed Data
+^^^^^^^^^^^^^^^^
+
+*Akka Distributed Data* is useful when you need to share data between nodes in an
+Akka Cluster. The data is accessed with an actor providing a key-value store like API.
+
+See :ref:`distributed_data_java`.

 Failure Detector
 ^^^^^^^^^^^^^^^^

@@ -535,7 +543,7 @@ Router Example with Pool of Remote Deployed Routees
 ---------------------------------------------------

 Let's take a look at how to use a cluster aware router on single master node that creates
-and deploys workers. To keep track of a single master we use the :ref:`cluster-singleton`
+and deploys workers. To keep track of a single master we use the :ref:`cluster-singleton-java`
 in the contrib module. The ``ClusterSingletonManager`` is started on each node.

 .. includecode:: ../../../akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/stats/StatsSampleOneMasterMain.java#create-singleton-manager
akka-docs/rst/java/distributed-pub-sub.rst (new file, 138 lines)
@@ -0,0 +1,138 @@
.. _distributed-pub-sub-java:
|
||||||
|
|
||||||
|
Distributed Publish Subscribe in Cluster
|
||||||
|
========================================
|
||||||
|
|
||||||
|
How do I send a message to an actor without knowing which node it is running on?
|
||||||
|
|
||||||
|
How do I send messages to all actors in the cluster that have registered interest
|
||||||
|
in a named topic?
|
||||||
|
|
||||||
|
This pattern provides a mediator actor, ``akka.cluster.pubsub.DistributedPubSubMediator``,
|
||||||
|
that manages a registry of actor references and replicates the entries to peer
|
||||||
|
actors among all cluster nodes or a group of nodes tagged with a specific role.
|
||||||
|
|
||||||
|
The ``DistributedPubSubMediator`` actor is supposed to be started on all nodes,
|
||||||
|
or all nodes with specified role, in the cluster. The mediator can be
|
||||||
|
started with the ``DistributedPubSub`` extension or as an ordinary actor.
|
||||||
|
|
||||||
|
The registry is eventually consistent, i.e. changes are not immediately visible at
|
||||||
|
other nodes, but typically they will be fully replicated to all other nodes after
|
||||||
|
a few seconds. Changes are only performed in the own part of the registry and those
|
||||||
|
changes are versioned. Deltas are disseminated in a scalable way to other nodes with
|
||||||
|
a gossip protocol.
|
||||||
|
|
||||||
|
You can send messages via the mediator on any node to registered actors on
|
||||||
|
any other node. There are four modes of message delivery.
|
||||||
|
|
||||||
|
**1. DistributedPubSubMediator.Send**
|
||||||
|
|
||||||
|
The message will be delivered to one recipient with a matching path, if any such
|
||||||
|
exists in the registry. If several entries match the path the message will be sent
|
||||||
|
via the supplied ``RoutingLogic`` (default random) to one destination. The sender() of the
|
||||||
|
message can specify that local affinity is preferred, i.e. the message is sent to an actor
|
||||||
|
in the same local actor system as the used mediator actor, if any such exists, otherwise
|
||||||
|
route to any other matching entry. A typical usage of this mode is private chat to one
|
||||||
|
other user in an instant messaging application. It can also be used for distributing
|
||||||
|
tasks to registered workers, like a cluster aware router where the routees dynamically
|
||||||
|
can register themselves.
|
||||||
|
|
||||||
|
**2. DistributedPubSubMediator.SendToAll**
|
||||||
|
|
||||||
|
The message will be delivered to all recipients with a matching path. Actors with
|
||||||
|
the same path, without address information, can be registered on different nodes.
|
||||||
|
On each node there can only be one such actor, since the path is unique within one
|
||||||
|
local actor system. Typical usage of this mode is to broadcast messages to all replicas
|
||||||
|
with the same path, e.g. 3 actors on different nodes that all perform the same actions,
|
||||||
|
for redundancy. You can also optionally specify a property (``allButSelf``) deciding
|
||||||
|
if the message should be sent to a matching path on the self node or not.
|
||||||
|
|
||||||
|
**3. DistributedPubSubMediator.Publish**
|
||||||
|
|
||||||
|
Actors may be registered to a named topic instead of a path. This enables many subscribers
|
||||||
|
on each node. The message will be delivered to all subscribers of the topic. For
|
||||||
|
efficiency the message is sent over the wire only once per node (that has a matching topic),
|
||||||
|
and then delivered to all subscribers of the local topic representation. This is the
|
||||||
|
true pub/sub mode. A typical usage of this mode is a chat room in an instant messaging
|
||||||
|
application.
|
||||||
|
|
||||||
|
**4. DistributedPubSubMediator.Publish with sendOneMessageToEachGroup**
|
||||||
|
|
||||||
|
Actors may be subscribed to a named topic with an optional property (``group``).
|
||||||
|
If subscribing with a group name, each message published to a topic with the
|
||||||
|
(``sendOneMessageToEachGroup``) flag is delivered via the supplied ``RoutingLogic``
|
||||||
|
(default random) to one actor within each subscribing group.
|
||||||
|
If all the subscribed actors have the same group name, then this works just like
|
||||||
|
``Send`` and all messages are delivered to one subscriber.
|
||||||
|
If all the subscribed actors have different group names, then this works like
|
||||||
|
normal ``Publish`` and all messages are broadcast to all subscribers.
|
||||||
|
|
||||||
|
You register actors to the local mediator with ``DistributedPubSubMediator.Put`` or
|
||||||
|
``DistributedPubSubMediator.Subscribe``. ``Put`` is used together with ``Send`` and
|
||||||
|
``SendToAll`` message delivery modes. The ``ActorRef`` in ``Put`` must belong to the same
|
||||||
|
local actor system as the mediator. ``Subscribe`` is used together with ``Publish``.
|
||||||
|
Actors are automatically removed from the registry when they are terminated, or you
|
||||||
|
can explicitly remove entries with ``DistributedPubSubMediator.Remove`` or
|
||||||
|
``DistributedPubSubMediator.Unsubscribe``.
|
||||||
|
|
||||||
|
Successful ``Subscribe`` and ``Unsubscribe`` is acknowledged with
|
||||||
|
``DistributedPubSubMediator.SubscribeAck`` and ``DistributedPubSubMediator.UnsubscribeAck``
|
||||||
|
replies.
|
||||||
|
|
||||||
|
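As a rough orientation (sketched here in Scala for brevity, not taken from the linked samples),
``Put`` together with ``Send`` and ``SendToAll`` might be used like this; the actor classes and
the ``destination`` name are made up for illustration::

  import akka.actor.Actor
  import akka.cluster.pubsub.DistributedPubSub
  import akka.cluster.pubsub.DistributedPubSubMediator.{ Put, Send, SendToAll }

  // Registered by its path (without address); assumed to be started as
  // system.actorOf(Props[Destination], "destination") so the path is "/user/destination".
  class Destination extends Actor {
    DistributedPubSub(context.system).mediator ! Put(self)

    def receive = {
      case msg => println(s"got $msg")
    }
  }

  // On any other node, messages can be routed to it via the local mediator.
  class Sender extends Actor {
    val mediator = DistributedPubSub(context.system).mediator

    def receive = {
      case msg: String =>
        // one recipient with a matching path, preferring a local one if present
        mediator ! Send("/user/destination", msg, localAffinity = true)
        // or every node that has an actor registered at that path
        mediator ! SendToAll("/user/destination", msg)
    }
  }
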
A Small Example
|
||||||
|
---------------
|
||||||
|
|
||||||
|
A subscriber actor:
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-cluster-tools/src/test/java/akka/cluster/pubsub/DistributedPubSubMediatorTest.java#subscriber
|
||||||
|
|
||||||
|
Subscriber actors can be started on several nodes in the cluster, and all will receive
|
||||||
|
messages published to the "content" topic.
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-cluster-tools/src/test/java/akka/cluster/pubsub/DistributedPubSubMediatorTest.java#start-subscribers
|
||||||
|
|
||||||
|
A simple actor that publishes to this "content" topic:
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-cluster-tools/src/test/java/akka/cluster/pubsub/DistributedPubSubMediatorTest.java#publisher
|
||||||
|
|
||||||
|
It can publish messages to the topic from anywhere in the cluster:
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-cluster-tools/src/test/java/akka/cluster/pubsub/DistributedPubSubMediatorTest.java#publish-message
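The referenced snippets are not reproduced inline here; as a rough, condensed Scala sketch of
what such a subscriber and publisher can look like (class names and error handling simplified)::

  import akka.actor.{ Actor, ActorLogging }
  import akka.cluster.pubsub.DistributedPubSub
  import akka.cluster.pubsub.DistributedPubSubMediator.{ Publish, Subscribe, SubscribeAck }

  // Subscribes to the "content" topic and logs everything published to it.
  class Subscriber extends Actor with ActorLogging {
    DistributedPubSub(context.system).mediator ! Subscribe("content", self)

    def receive = {
      case SubscribeAck(Subscribe("content", None, `self`)) =>
        log.info("subscribed to topic 'content'")
      case s: String =>
        log.info("got {}", s)
    }
  }

  // Publishes incoming strings to the "content" topic, upper-cased.
  class Publisher extends Actor {
    val mediator = DistributedPubSub(context.system).mediator

    def receive = {
      case in: String => mediator ! Publish("content", in.toUpperCase)
    }
  }
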
|
||||||
|
|
||||||
|
DistributedPubSub Extension
|
||||||
|
---------------------------
|
||||||
|
|
||||||
|
In the example above the mediator is started and accessed with the ``akka.cluster.pubsub.DistributedPubSub`` extension.
|
||||||
|
That is convenient and perfectly fine in most cases, but it can be good to know that it is also
possible to start the mediator actor as an ordinary actor, and that you can run several different
mediators at the same time in order to divide a large number of actors/topics among them. For
example, you might want to use different cluster roles for different mediators.
|
||||||
|
|
||||||
|
The ``DistributedPubSub`` extension can be configured with the following properties:
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-cluster-tools/src/main/resources/reference.conf#pub-sub-ext-config
|
||||||
|
|
||||||
|
It is recommended to load the extension when the actor system is started by defining it in the
``akka.extensions`` configuration property. Otherwise it will be activated when first used,
and it then takes a while for the registry to be populated.
|
||||||
|
|
||||||
|
::
|
||||||
|
|
||||||
|
akka.extensions = ["akka.cluster.pubsub.DistributedPubSub"]
|
||||||
|
|
||||||
|
Dependencies
|
||||||
|
------------
|
||||||
|
|
||||||
|
To use Distributed Publish Subscribe you must add the following dependency in your project.
|
||||||
|
|
||||||
|
sbt::
|
||||||
|
|
||||||
|
"com.typesafe.akka" %% "akka-cluster-tools" % "@version@" @crossString@
|
||||||
|
|
||||||
|
maven::
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.typesafe.akka</groupId>
|
||||||
|
<artifactId>akka-cluster-tools_@binVersion@</artifactId>
|
||||||
|
<version>@version@</version>
|
||||||
|
</dependency>
|
||||||
|
|
@ -152,7 +152,7 @@ Similarly to `Actor Classification`_, :class:`EventStream` will automatically re
|
||||||
|
|
||||||
.. note::
|
.. note::
|
||||||
The event stream is a *local facility*, meaning that it will *not* distribute events to other nodes in a clustered environment (unless you subscribe a Remote Actor to the stream explicitly).
|
The event stream is a *local facility*, meaning that it will *not* distribute events to other nodes in a clustered environment (unless you subscribe a Remote Actor to the stream explicitly).
|
||||||
If you need to broadcast events in an Akka cluster, *without* knowing your recipients explicitly (i.e. obtaining their ActorRefs), you may want to look into: :ref:`distributed-pub-sub`.
|
If you need to broadcast events in an Akka cluster, *without* knowing your recipients explicitly (i.e. obtaining their ActorRefs), you may want to look into: :ref:`distributed-pub-sub-java`.
|
||||||
|
|
||||||
Default Handlers
|
Default Handlers
|
||||||
----------------
|
----------------
|
||||||
|
|
|
||||||
|
|
@ -6,10 +6,10 @@ Networking
|
||||||
|
|
||||||
../common/cluster
|
../common/cluster
|
||||||
cluster-usage
|
cluster-usage
|
||||||
../scala/cluster-singleton
|
cluster-singleton
|
||||||
../scala/distributed-pub-sub
|
distributed-pub-sub
|
||||||
../scala/cluster-client
|
cluster-client
|
||||||
../scala/cluster-sharding
|
cluster-sharding
|
||||||
cluster-metrics
|
cluster-metrics
|
||||||
distributed-data
|
distributed-data
|
||||||
remoting
|
remoting
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
.. _cluster-client:
|
.. _cluster-client-scala:
|
||||||
|
|
||||||
Cluster Client
|
Cluster Client
|
||||||
==============
|
==============
|
||||||
|
|
@ -10,13 +10,19 @@ contact points. It will establish a connection to a ``ClusterReceptionist`` some
|
||||||
the cluster. It will monitor the connection to the receptionist and establish a new
|
the cluster. It will monitor the connection to the receptionist and establish a new
|
||||||
connection if the link goes down. When looking for a new receptionist it uses fresh
|
connection if the link goes down. When looking for a new receptionist it uses fresh
|
||||||
contact points retrieved from previous establishment, or periodically refreshed contacts,
|
contact points retrieved from previous establishment, or periodically refreshed contacts,
|
||||||
i.e. not necessarily the initial contact points. Also, note it's necessary to change
|
i.e. not necessarily the initial contact points.
|
||||||
``akka.actor.provider`` from ``akka.actor.LocalActorRefProvider`` to
|
|
||||||
``akka.remote.RemoteActorRefProvider`` or ``akka.cluster.ClusterActorRefProvider`` when using
|
.. note::
|
||||||
|
|
||||||
|
``ClusterClient`` should not be used when sending messages to actors that run
|
||||||
|
within the same cluster. Similar functionality as the ``ClusterClient`` is
|
||||||
|
provided in a more efficient way by :ref:`distributed-pub-sub-scala` for actors that
|
||||||
|
belong to the same cluster.
|
||||||
|
|
||||||
|
Also, note it's necessary to change ``akka.actor.provider`` from ``akka.actor.LocalActorRefProvider``
|
||||||
|
to ``akka.remote.RemoteActorRefProvider`` or ``akka.cluster.ClusterActorRefProvider`` when using
|
||||||
the cluster client.
|
the cluster client.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
The receptionist is supposed to be started on all nodes, or all nodes with specified role,
|
The receptionist is supposed to be started on all nodes, or all nodes with specified role,
|
||||||
in the cluster. The receptionist can be started with the ``ClusterClientReceptionist`` extension
|
in the cluster. The receptionist can be started with the ``ClusterClientReceptionist`` extension
|
||||||
or as an ordinary actor.
|
or as an ordinary actor.
|
||||||
|
|
@ -79,30 +85,26 @@ in the cluster.
|
||||||
|
|
||||||
.. includecode:: ../../../akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala#client
|
.. includecode:: ../../../akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala#client
|
||||||
|
|
||||||
The ``initialContacts`` parameter is a ``Set[ActorSelection]``, which can be created like this:
|
The ``initialContacts`` parameter is a ``Set[ActorPath]``, which can be created like this:
|
||||||
|
|
||||||
.. includecode:: ../../../akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala#initialContacts
|
.. includecode:: ../../../akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala#initialContacts
|
||||||
|
|
||||||
You will probably define the address information of the initial contact points in configuration or a system property.
|
||||||
|
See also :ref:`cluster-client-config-scala`.
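As a minimal sketch of both steps (the host names, port and service path below are placeholders,
not values from the sample)::

  import akka.actor.{ ActorPath, ActorSystem }
  import akka.cluster.client.{ ClusterClient, ClusterClientSettings }

  val system = ActorSystem("ClientSys")

  // Placeholder addresses; in practice read from configuration or a system property.
  val initialContacts = Set(
    ActorPath.fromString("akka.tcp://OtherSys@host1:2552/system/receptionist"),
    ActorPath.fromString("akka.tcp://OtherSys@host2:2552/system/receptionist"))

  val client = system.actorOf(
    ClusterClient.props(
      ClusterClientSettings(system).withInitialContacts(initialContacts)),
    "client")

  // Send to a service that was registered with the ClusterClientReceptionist on some node.
  client ! ClusterClient.Send("/user/serviceA", "hello", localAffinity = true)
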
|
||||||
|
|
||||||
A more comprehensive sample is available in the `Typesafe Activator <http://www.typesafe.com/platform/getstarted>`_
|
A more comprehensive sample is available in the `Typesafe Activator <http://www.typesafe.com/platform/getstarted>`_
|
||||||
tutorial named `Distributed workers with Akka and Scala! <http://www.typesafe.com/activator/template/akka-distributed-workers>`_
|
tutorial named `Distributed workers with Akka and Scala! <http://www.typesafe.com/activator/template/akka-distributed-workers>`_.
|
||||||
and `Distributed workers with Akka and Java! <http://www.typesafe.com/activator/template/akka-distributed-workers-java>`_.
|
|
||||||
|
|
||||||
ClusterClientReceptionist
|
ClusterClientReceptionist Extension
|
||||||
----------------------------
|
-----------------------------------
|
||||||
|
|
||||||
In the example above the receptionist is started and accessed with the ``akka.cluster.client.ClusterClientReceptionist``.
|
In the example above the receptionist is started and accessed with the ``akka.cluster.client.ClusterClientReceptionist`` extension.
|
||||||
That is convenient and perfectly fine in most cases, but it can be good to know that it is possible to
|
That is convenient and perfectly fine in most cases, but it can be good to know that it is possible to
|
||||||
start the ``akka.cluster.client.ClusterReceptionist`` actor as an ordinary actor and you can have several
|
start the ``akka.cluster.client.ClusterReceptionist`` actor as an ordinary actor and you can have several
|
||||||
different receptionists at the same time, serving different types of clients.
|
different receptionists at the same time, serving different types of clients.
|
||||||
|
|
||||||
The ``ClusterClientReceptionist`` can be configured with the following properties:
|
|
||||||
|
|
||||||
.. includecode:: ../../../akka-cluster-tools/src/main/resources/reference.conf#receptionist-ext-config
|
|
||||||
|
|
||||||
Note that the ``ClusterClientReceptionist`` uses the ``DistributedPubSub`` extension, which is described
|
Note that the ``ClusterClientReceptionist`` uses the ``DistributedPubSub`` extension, which is described
|
||||||
in :ref:`distributed-pub-sub`.
|
in :ref:`distributed-pub-sub-scala`.
|
||||||
|
|
||||||
It is recommended to load the extension when the actor system is started by defining it in the
|
It is recommended to load the extension when the actor system is started by defining it in the
|
||||||
``akka.extensions`` configuration property::
|
``akka.extensions`` configuration property::
|
||||||
|
|
@ -125,3 +127,21 @@ maven::
|
||||||
<artifactId>akka-cluster-tools_@binVersion@</artifactId>
|
<artifactId>akka-cluster-tools_@binVersion@</artifactId>
|
||||||
<version>@version@</version>
|
<version>@version@</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
.. _cluster-client-config-scala:
|
||||||
|
|
||||||
|
Configuration
|
||||||
|
-------------
|
||||||
|
|
||||||
|
The ``ClusterClientReceptionist`` extension (or ``ClusterReceptionistSettings``) can be configured
|
||||||
|
with the following properties:
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-cluster-tools/src/main/resources/reference.conf#receptionist-ext-config
|
||||||
|
|
||||||
|
The following configuration properties are read by the ``ClusterClientSettings``
|
||||||
|
when created with an ``ActorSystem`` parameter. It is also possible to amend the ``ClusterClientSettings``
|
||||||
|
or create it from another config section with the same layout as below. ``ClusterClientSettings`` is
|
||||||
|
a parameter to the ``ClusterClient.props`` factory method, i.e. each client can be configured
|
||||||
|
with different settings if needed.
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-cluster-tools/src/main/resources/reference.conf#cluster-client-config
|
||||||
|
|
|
||||||
|
|
@ -152,4 +152,12 @@ You can plug-in your own metrics collector instead of built-in
|
||||||
|
|
||||||
Look at those two implementations for inspiration.
|
Look at those two implementations for inspiration.
|
||||||
|
|
||||||
Custom metrics collector implementation class must be specified in the :ref:`cluster_metrics_configuration_scala`.
|
Custom metrics collector implementation class must be specified in the
|
||||||
|
``akka.cluster.metrics.collector.provider`` configuration property.
|
||||||
|
|
||||||
|
Configuration
|
||||||
|
-------------
|
||||||
|
|
||||||
|
The Cluster metrics extension can be configured with the following properties:
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-cluster-metrics/src/main/resources/reference.conf
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
.. _cluster-sharding:
|
.. _cluster_sharding_scala:
|
||||||
|
|
||||||
Cluster Sharding
|
Cluster Sharding
|
||||||
================
|
================
|
||||||
|
|
@ -13,7 +13,7 @@ but this feature is not limited to actors with persistent state.
|
||||||
|
|
||||||
Cluster sharding is typically used when you have many stateful actors that together consume
|
Cluster sharding is typically used when you have many stateful actors that together consume
|
||||||
more resources (e.g. memory) than fit on one machine. If you only have a few stateful actors
|
more resources (e.g. memory) than fit on one machine. If you only have a few stateful actors
|
||||||
it might be easier to run them on a :ref:`cluster-singleton` node.
|
it might be easier to run them on a :ref:`cluster-singleton-scala` node.
|
||||||
|
|
||||||
In this context sharding means that actors with an identifier, so called entities,
|
In this context sharding means that actors with an identifier, so called entities,
|
||||||
can be automatically distributed across multiple nodes in the cluster. Each entity
|
can be automatically distributed across multiple nodes in the cluster. Each entity
|
||||||
|
|
@ -22,66 +22,8 @@ the sender to know the location of the destination actor. This is achieved by se
|
||||||
the messages via a ``ShardRegion`` actor provided by this extension, which knows how
|
the messages via a ``ShardRegion`` actor provided by this extension, which knows how
|
||||||
to route the message with the entity id to the final destination.
|
to route the message with the entity id to the final destination.
|
||||||
|
|
||||||
An Example in Java
|
An Example
|
||||||
------------------
|
----------
|
||||||
|
|
||||||
This is how an entity actor may look like:
|
|
||||||
|
|
||||||
.. includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#counter-actor
|
|
||||||
|
|
||||||
The above actor uses event sourcing and the support provided in ``UntypedPersistentActor`` to store its state.
|
|
||||||
It does not have to be a persistent actor, but in case of failure or migration of entities between nodes it must be able to recover
|
|
||||||
its state if it is valuable.
|
|
||||||
|
|
||||||
Note how the ``persistenceId`` is defined. You may define it another way, but it must be unique.
|
|
||||||
|
|
||||||
When using the sharding extension you are first, typically at system startup on each node
|
|
||||||
in the cluster, supposed to register the supported entity types with the ``ClusterSharding.start``
|
|
||||||
method. ``ClusterSharding.start`` gives you the reference which you can pass along.
|
|
||||||
|
|
||||||
.. includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#counter-start
|
|
||||||
|
|
||||||
The ``messageExtractor`` defines application specific methods to extract the entity
|
|
||||||
identifier and the shard identifier from incoming messages.
|
|
||||||
|
|
||||||
.. includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#counter-extractor
|
|
||||||
|
|
||||||
This example illustrates two different ways to define the entity identifier in the messages:
|
|
||||||
|
|
||||||
* The ``Get`` message includes the identifier itself.
|
|
||||||
* The ``EntityEnvelope`` holds the identifier, and the actual message that is
|
|
||||||
sent to the entity actor is wrapped in the envelope.
|
|
||||||
|
|
||||||
Note how these two messages types are handled in the ``entityId`` and ``entityMessage`` methods shown above.
|
|
||||||
The message sent to the entity actor is what ``entityMessage`` returns and that makes it possible to unwrap envelopes
|
|
||||||
if needed.
|
|
||||||
|
|
||||||
A shard is a group of entities that will be managed together. The grouping is defined by the
|
|
||||||
``extractShardId`` function shown above. For a specific entity identifier the shard identifier must always
|
|
||||||
be the same. Otherwise the entity actor might accidentally be started in several places at the same time.
|
|
||||||
|
|
||||||
Creating a good sharding algorithm is an interesting challenge in itself. Try to produce a uniform distribution,
|
|
||||||
i.e. same amount of entities in each shard. As a rule of thumb, the number of shards should be a factor ten greater
|
|
||||||
than the planned maximum number of cluster nodes. Less shards than number of nodes will result in that some nodes
|
|
||||||
will not host any shards. Too many shards will result in less efficient management of the shards, e.g. rebalancing
|
|
||||||
overhead, and increased latency because the coordinator is involved in the routing of the first message for each
|
|
||||||
shard. The sharding algorithm must be the same on all nodes in a running cluster. It can be changed after stopping
|
|
||||||
all nodes in the cluster.
|
|
||||||
|
|
||||||
A simple sharding algorithm that works fine in most cases is to take the absolute value of the ``hashCode`` of
|
|
||||||
the entity identifier modulo number of shards. As a convenience this is provided by the
|
|
||||||
``ShardRegion.HashCodeMessageExtractor``.
|
|
||||||
|
|
||||||
Messages to the entities are always sent via the local ``ShardRegion``. The ``ShardRegion`` actor reference for a
|
|
||||||
named entity type is returned by ``ClusterSharding.start`` and it can also be retrieved with ``ClusterSharding.shardRegion``.
|
|
||||||
The ``ShardRegion`` will lookup the location of the shard for the entity if it does not already know its location. It will
|
|
||||||
delegate the message to the right node and it will create the entity actor on demand, i.e. when the
|
|
||||||
first message for a specific entity is delivered.
|
|
||||||
|
|
||||||
.. includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#counter-usage
|
|
||||||
|
|
||||||
An Example in Scala
|
|
||||||
-------------------
|
|
||||||
|
|
||||||
This is what an entity actor may look like:
|
||||||
|
|
||||||
|
|
@ -91,7 +33,8 @@ The above actor uses event sourcing and the support provided in ``PersistentActo
|
||||||
It does not have to be a persistent actor, but in case of failure or migration of entities between nodes it must be able to recover
|
It does not have to be a persistent actor, but in case of failure or migration of entities between nodes it must be able to recover
|
||||||
its state if it is valuable.
|
its state if it is valuable.
|
||||||
|
|
||||||
Note how the ``persistenceId`` is defined. You may define it another way, but it must be unique.
|
Note how the ``persistenceId`` is defined. The name of the actor is the entity identifier (utf-8 URL-encoded).
|
||||||
|
You may define it another way, but it must be unique.
|
||||||
|
|
||||||
When using the sharding extension you are first, typically at system startup on each node
|
When using the sharding extension you are first, typically at system startup on each node
|
||||||
in the cluster, supposed to register the supported entity types with the ``ClusterSharding.start``
|
in the cluster, supposed to register the supported entity types with the ``ClusterSharding.start``
|
||||||
|
|
@ -126,8 +69,9 @@ overhead, and increased latency because the coordinator is involved in the routi
|
||||||
shard. The sharding algorithm must be the same on all nodes in a running cluster. It can be changed after stopping
|
shard. The sharding algorithm must be the same on all nodes in a running cluster. It can be changed after stopping
|
||||||
all nodes in the cluster.
|
all nodes in the cluster.
|
||||||
|
|
||||||
A simple sharding algorithm that works fine in most cases is to take the ``hashCode`` of the entity identifier modulo
|
A simple sharding algorithm that works fine in most cases is to take the absolute value of the ``hashCode`` of
|
||||||
number of shards.
|
the entity identifier modulo number of shards. As a convenience this is provided by the
|
||||||
|
``ShardRegion.HashCodeMessageExtractor``.
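For instance, with the ``Get`` and ``EntityEnvelope`` message types of the counter example, such a
hash-based ``extractShardId`` could be sketched like this (the message definitions are repeated
only to keep the snippet self-contained; the number of shards is an assumed value)::

  import akka.cluster.sharding.ShardRegion

  final case class Get(counterId: Long)
  final case class EntityEnvelope(id: Long, payload: Any)

  val numberOfShards = 100

  // The shard id must be a pure function of the entity id, identical on all nodes.
  val extractShardId: ShardRegion.ExtractShardId = {
    case Get(id)               => (math.abs(id.hashCode) % numberOfShards).toString
    case EntityEnvelope(id, _) => (math.abs(id.hashCode) % numberOfShards).toString
  }
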
|
||||||
|
|
||||||
Messages to the entities are always sent via the local ``ShardRegion``. The ``ShardRegion`` actor reference for a
|
Messages to the entities are always sent via the local ``ShardRegion``. The ``ShardRegion`` actor reference for a
|
||||||
named entity type is returned by ``ClusterSharding.start`` and it can also be retrieved with ``ClusterSharding.shardRegion``.
|
named entity type is returned by ``ClusterSharding.start`` and it can also be retrieved with ``ClusterSharding.shardRegion``.
|
||||||
|
|
@ -205,7 +149,7 @@ Thereafter the coordinator will reply to requests for the location of
|
||||||
the shard and thereby allocate a new home for the shard and then buffered messages in the
|
the shard and thereby allocate a new home for the shard and then buffered messages in the
|
||||||
``ShardRegion`` actors are delivered to the new location. This means that the state of the entities
|
``ShardRegion`` actors are delivered to the new location. This means that the state of the entities
|
||||||
are not transferred or migrated. If the state of the entities are of importance it should be
|
are not transferred or migrated. If the state of the entities are of importance it should be
|
||||||
persistent (durable), e.g. with ``akka-persistence``, so that it can be recovered at the new
|
persistent (durable), e.g. with :ref:`persistence-scala`, so that it can be recovered at the new
|
||||||
location.
|
location.
|
||||||
|
|
||||||
The logic that decides which shards to rebalance is defined in a pluggable shard
|
The logic that decides which shards to rebalance is defined in a pluggable shard
|
||||||
|
|
@ -217,7 +161,7 @@ must be to begin the rebalancing. This strategy can be replaced by an applicatio
|
||||||
implementation.
|
implementation.
|
||||||
|
|
||||||
The state of shard locations in the ``ShardCoordinator`` is persistent (durable) with
|
The state of shard locations in the ``ShardCoordinator`` is persistent (durable) with
|
||||||
``akka-persistence`` to survive failures. Since it is running in a cluster ``akka-persistence``
|
:ref:`persistence-scala` to survive failures. Since it is running in a cluster :ref:`persistence-scala`
|
||||||
must be configured with a distributed journal. When a crashed or unreachable coordinator
|
must be configured with a distributed journal. When a crashed or unreachable coordinator
|
||||||
node has been removed (via down) from the cluster a new ``ShardCoordinator`` singleton
|
node has been removed (via down) from the cluster a new ``ShardCoordinator`` singleton
|
||||||
actor will take over and the state is recovered. During such a failure period shards
|
actor will take over and the state is recovered. During such a failure period shards
|
||||||
|
|
@ -228,7 +172,7 @@ As long as a sender uses the same ``ShardRegion`` actor to deliver messages to a
|
||||||
actor the order of the messages is preserved. As long as the buffer limit is not reached
|
actor the order of the messages is preserved. As long as the buffer limit is not reached
|
||||||
messages are delivered on a best effort basis, with at-most once delivery semantics,
|
messages are delivered on a best effort basis, with at-most once delivery semantics,
|
||||||
in the same way as ordinary message sending. Reliable end-to-end messaging, with
|
in the same way as ordinary message sending. Reliable end-to-end messaging, with
|
||||||
at-least-once semantics can be added by using ``AtLeastOnceDelivery`` in ``akka-persistence``.
|
at-least-once semantics can be added by using ``AtLeastOnceDelivery`` in :ref:`persistence-scala`.
|
||||||
|
|
||||||
Some additional latency is introduced for messages targeted to new or previously
|
Some additional latency is introduced for messages targeted to new or previously
|
||||||
unused shards due to the round-trip to the coordinator. Rebalancing of shards may
|
unused shards due to the round-trip to the coordinator. Rebalancing of shards may
|
||||||
|
|
@ -275,7 +219,7 @@ for that entity has been received in the ``Shard``. Entities will not be restart
|
||||||
using a ``Passivate``.
|
using a ``Passivate``.
|
||||||
|
|
||||||
Note that the state of the entities themselves will not be restored unless they have been made persistent,
|
Note that the state of the entities themselves will not be restored unless they have been made persistent,
|
||||||
e.g. with ``akka-persistence``.
|
e.g. with :ref:`persistence-scala`.
|
||||||
|
|
||||||
Graceful Shutdown
|
Graceful Shutdown
|
||||||
-----------------
|
-----------------
|
||||||
|
|
@ -288,11 +232,7 @@ triggered by the coordinator. When the shards have been stopped the coordinator
|
||||||
|
|
||||||
When the ``ShardRegion`` has terminated you probably want to ``leave`` the cluster, and shut down the ``ActorSystem``.
|
When the ``ShardRegion`` has terminated you probably want to ``leave`` the cluster, and shut down the ``ActorSystem``.
|
||||||
|
|
||||||
This is how to do it in Java:
|
This is how to do that:
|
||||||
|
|
||||||
.. includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#graceful-shutdown
|
|
||||||
|
|
||||||
This is how to do it in Scala:
|
|
||||||
|
|
||||||
.. includecode:: ../../../akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala#graceful-shutdown
|
.. includecode:: ../../../akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala#graceful-shutdown
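The linked snippet is not reproduced inline here; roughly, the pattern amounts to something like
the following Scala sketch (actor name and shutdown handling simplified)::

  import akka.actor.{ Actor, ActorRef, Terminated }
  import akka.cluster.Cluster
  import akka.cluster.sharding.ShardRegion

  // Watches the local ShardRegion, asks it to hand over its shards, and then
  // leaves the cluster and terminates the ActorSystem once the region is gone.
  class IllustrateGracefulShutdown(region: ActorRef) extends Actor {
    val cluster = Cluster(context.system)

    context.watch(region)
    region ! ShardRegion.GracefulShutdown

    def receive = {
      case Terminated(`region`) =>
        cluster.registerOnMemberRemoved(context.system.terminate())
        cluster.leave(cluster.selfAddress)
    }
  }
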
|
||||||
|
|
||||||
|
|
@ -316,11 +256,15 @@ maven::
|
||||||
Configuration
|
Configuration
|
||||||
-------------
|
-------------
|
||||||
|
|
||||||
The ``ClusterSharding`` extension can be configured with the following properties:
|
The ``ClusterSharding`` extension can be configured with the following properties. These configuration
|
||||||
|
properties are read by the ``ClusterShardingSettings`` when created with an ``ActorSystem`` parameter.
|
||||||
|
It is also possible to amend the ``ClusterShardingSettings`` or create it from another config section
|
||||||
|
with the same layout as below. ``ClusterShardingSettings`` is a parameter to the ``start`` method of
|
||||||
|
the ``ClusterSharding`` extension, i.e. each entity type can be configured with different settings
|
||||||
|
if needed.
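A small sketch of both variants; the ``counter-sharding`` section name and the role are
assumptions for illustration, and such a custom section must exist in your configuration with
the same layout as ``akka.cluster.sharding``::

  import akka.actor.ActorSystem
  import akka.cluster.sharding.ClusterShardingSettings

  val system = ActorSystem("ClusterSystem")

  // Default: settings read from the "akka.cluster.sharding" configuration section.
  val defaultSettings = ClusterShardingSettings(system)

  // Alternative: settings read from a custom section with the same layout,
  // further amended programmatically for this particular entity type.
  val counterSettings =
    ClusterShardingSettings(system.settings.config.getConfig("counter-sharding"))
      .withRole("counters")
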
|
||||||
|
|
||||||
.. includecode:: ../../../akka-cluster-sharding/src/main/resources/reference.conf#sharding-ext-config
|
.. includecode:: ../../../akka-cluster-sharding/src/main/resources/reference.conf#sharding-ext-config
|
||||||
|
|
||||||
Custom shard allocation strategy can be defined in an optional parameter to
|
Custom shard allocation strategy can be defined in an optional parameter to
|
||||||
``ClusterSharding.start``. See the API documentation of ``ShardAllocationStrategy``
|
``ClusterSharding.start``. See the API documentation of ``ShardAllocationStrategy`` for details of
|
||||||
(Scala) or ``AbstractShardAllocationStrategy`` (Java) for details of how to implement a custom
|
how to implement a custom shard allocation strategy.
|
||||||
shard allocation strategy.
|
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
.. _cluster-singleton:
|
.. _cluster-singleton-scala:
|
||||||
|
|
||||||
Cluster Singleton
|
Cluster Singleton
|
||||||
=================
|
=================
|
||||||
|
|
@ -61,7 +61,8 @@ This pattern may seem to be very tempting to use at first, but it has several dr
|
||||||
* the cluster singleton may quickly become a *performance bottleneck*,
|
* the cluster singleton may quickly become a *performance bottleneck*,
|
||||||
* you can not rely on the cluster singleton to be *non-stop* available — e.g. when the node on which the singleton has
|
* you can not rely on the cluster singleton to be *non-stop* available — e.g. when the node on which the singleton has
|
||||||
been running dies, it will take a few seconds for this to be noticed and the singleton be migrated to another node,
|
been running dies, it will take a few seconds for this to be noticed and the singleton be migrated to another node,
|
||||||
* in the case of a *network partition* appearing in a Cluster that is using Automatic Downing (see Auto Downing docs for :ref:`Scala <automatic-vs-manual-downing-scala>` or :ref:`Java <automatic-vs-manual-downing-java>`),
|
* in the case of a *network partition* appearing in a Cluster that is using Automatic Downing (see Auto Downing docs for
|
||||||
|
:ref:`automatic-vs-manual-downing-scala`),
|
||||||
it may happen that the isolated clusters each decide to spin up their own singleton, meaning that there might be multiple
|
it may happen that the isolated clusters each decide to spin up their own singleton, meaning that there might be multiple
|
||||||
singletons running in the system, yet the Clusters have no way of finding out about them (because of the partition).
|
singletons running in the system, yet the Clusters have no way of finding out about them (because of the partition).
|
||||||
|
|
||||||
|
|
@ -85,14 +86,8 @@ scenario when integrating with external systems.
|
||||||
On each node in the cluster you need to start the ``ClusterSingletonManager`` and
|
On each node in the cluster you need to start the ``ClusterSingletonManager`` and
|
||||||
supply the ``Props`` of the singleton actor, in this case the JMS queue consumer.
|
supply the ``Props`` of the singleton actor, in this case the JMS queue consumer.
|
||||||
|
|
||||||
In Scala:
|
|
||||||
|
|
||||||
.. includecode:: ../../../akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerSpec.scala#create-singleton-manager
|
.. includecode:: ../../../akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerSpec.scala#create-singleton-manager
|
||||||
|
|
||||||
In Java:
|
|
||||||
|
|
||||||
.. includecode:: ../../../akka-cluster-tools/src/test/java/akka/cluster/singleton/ClusterSingletonManagerTest.java#create-singleton-manager
|
|
||||||
|
|
||||||
Here we limit the singleton to nodes tagged with the ``"worker"`` role, but all nodes, independent of
|
Here we limit the singleton to nodes tagged with the ``"worker"`` role, but all nodes, independent of
|
||||||
role, can be used by not specifying ``withRole``.
|
role, can be used by not specifying ``withRole``.
|
||||||
|
|
||||||
|
|
@ -104,24 +99,13 @@ Here is how the singleton actor handles the ``terminationMessage`` in this examp
|
||||||
|
|
||||||
.. includecode:: ../../../akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerSpec.scala#consumer-end
|
.. includecode:: ../../../akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerSpec.scala#consumer-end
|
||||||
|
|
||||||
Note that you can send back current state to the ``ClusterSingletonManager`` before terminating.
|
|
||||||
This message will be sent over to the ``ClusterSingletonManager`` at the new oldest node and it
|
|
||||||
will be passed to the ``singletonProps`` factory when creating the new singleton instance.
|
|
||||||
|
|
||||||
With the names given above, access to the singleton can be obtained from any cluster node using a properly
|
With the names given above, access to the singleton can be obtained from any cluster node using a properly
|
||||||
configured proxy.
|
configured proxy.
|
||||||
|
|
||||||
In Scala:
|
|
||||||
|
|
||||||
.. includecode:: ../../../akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerSpec.scala#create-singleton-proxy
|
.. includecode:: ../../../akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerSpec.scala#create-singleton-proxy
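Condensed into one Scala sketch (the ``worker`` role, the actor names and the trivial
``Consumer`` stand-in for the JMS queue consumer are illustrative, and ``PoisonPill`` is used
instead of the example's own termination message, which would allow state hand-over)::

  import akka.actor.{ Actor, ActorSystem, PoisonPill, Props }
  import akka.cluster.singleton.{
    ClusterSingletonManager, ClusterSingletonManagerSettings,
    ClusterSingletonProxy, ClusterSingletonProxySettings }

  // Stand-in for the singleton; the real example hands over state on termination.
  class Consumer extends Actor {
    def receive = { case msg => println(s"consumed $msg") }
  }

  object SingletonSketch extends App {
    val system = ActorSystem("ClusterSystem")

    // Started on every node; the oldest node with role "worker" runs the actual singleton.
    system.actorOf(
      ClusterSingletonManager.props(
        singletonProps = Props[Consumer],
        terminationMessage = PoisonPill,
        settings = ClusterSingletonManagerSettings(system).withRole("worker")),
      name = "consumer")

    // A proxy on any node that always routes to the current singleton instance.
    val proxy = system.actorOf(
      ClusterSingletonProxy.props(
        singletonManagerPath = "/user/consumer",
        settings = ClusterSingletonProxySettings(system).withRole("worker")),
      name = "consumerProxy")

    proxy ! "hello"
  }
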
|
||||||
|
|
||||||
In Java:
|
|
||||||
|
|
||||||
.. includecode:: ../../../akka-cluster-tools/src/test/java/akka/cluster/singleton/ClusterSingletonManagerTest.java#create-singleton-proxy
|
|
||||||
|
|
||||||
A more comprehensive sample is available in the `Typesafe Activator <http://www.typesafe.com/platform/getstarted>`_
|
A more comprehensive sample is available in the `Typesafe Activator <http://www.typesafe.com/platform/getstarted>`_
|
||||||
tutorial named `Distributed workers with Akka and Scala! <http://www.typesafe.com/activator/template/akka-distributed-workers>`_
|
tutorial named `Distributed workers with Akka and Scala! <http://www.typesafe.com/activator/template/akka-distributed-workers>`_.
|
||||||
and `Distributed workers with Akka and Java! <http://www.typesafe.com/activator/template/akka-distributed-workers-java>`_.
|
|
||||||
|
|
||||||
Dependencies
|
Dependencies
|
||||||
------------
|
------------
|
||||||
|
|
@ -139,3 +123,23 @@ maven::
|
||||||
<artifactId>akka-cluster-tools_@binVersion@</artifactId>
|
<artifactId>akka-cluster-tools_@binVersion@</artifactId>
|
||||||
<version>@version@</version>
|
<version>@version@</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
|
||||||
|
Configuration
|
||||||
|
-------------
|
||||||
|
|
||||||
|
The following configuration properties are read by the ``ClusterSingletonManagerSettings``
|
||||||
|
when created with an ``ActorSystem`` parameter. It is also possible to amend the ``ClusterSingletonManagerSettings``
|
||||||
|
or create it from another config section with the same layout as below. ``ClusterSingletonManagerSettings`` is
|
||||||
|
a parameter to the ``ClusterSingletonManager.props`` factory method, i.e. each singleton can be configured
|
||||||
|
with different settings if needed.
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-cluster-tools/src/main/resources/reference.conf#singleton-config
|
||||||
|
|
||||||
|
The following configuration properties are read by the ``ClusterSingletonProxySettings``
|
||||||
|
when created with an ``ActorSystem`` parameter. It is also possible to amend the ``ClusterSingletonProxySettings``
|
||||||
|
or create it from another config section with the same layout as below. ``ClusterSingletonProxySettings`` is
|
||||||
|
a parameter to the ``ClusterSingletonProxy.props`` factory method, i.e. each singleton proxy can be configured
|
||||||
|
with different settings if needed.
|
||||||
|
|
||||||
|
.. includecode:: ../../../akka-cluster-tools/src/main/resources/reference.conf#singleton-proxy-config
|
||||||
|
|
|
||||||
|
|
@ -307,7 +307,7 @@ you have exactly one actor of a certain type running somewhere in the cluster.
|
||||||
|
|
||||||
This can be implemented by subscribing to member events, but there are several corner
|
This can be implemented by subscribing to member events, but there are several corner
|
||||||
cases to consider. Therefore, this specific use case is made easily accessible by the
|
cases to consider. Therefore, this specific use case is made easily accessible by the
|
||||||
:ref:`cluster-singleton` in the contrib module.
|
:ref:`cluster-singleton-scala`.
|
||||||
|
|
||||||
Cluster Sharding
|
Cluster Sharding
|
||||||
^^^^^^^^^^^^^^^^
|
^^^^^^^^^^^^^^^^
|
||||||
|
|
@ -316,7 +316,7 @@ Distributes actors across several nodes in the cluster and supports interaction
|
||||||
with the actors using their logical identifier, but without having to care about
|
with the actors using their logical identifier, but without having to care about
|
||||||
their physical location in the cluster.
|
their physical location in the cluster.
|
||||||
|
|
||||||
See :ref:`cluster-sharding` in the contrib module.
|
See :ref:`cluster_sharding_scala`
|
||||||
|
|
||||||
Distributed Publish Subscribe
|
Distributed Publish Subscribe
|
||||||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
|
||||||
|
|
@ -325,7 +325,7 @@ Publish-subscribe messaging between actors in the cluster, and point-to-point me
|
||||||
using the logical path of the actors, i.e. the sender does not have to know on which
|
using the logical path of the actors, i.e. the sender does not have to know on which
|
||||||
node the destination actor is running.
|
node the destination actor is running.
|
||||||
|
|
||||||
See :ref:`distributed-pub-sub` in the contrib module.
|
See :ref:`distributed-pub-sub-scala`.
|
||||||
|
|
||||||
Cluster Client
|
Cluster Client
|
||||||
^^^^^^^^^^^^^^
|
^^^^^^^^^^^^^^
|
||||||
|
|
@ -334,7 +334,15 @@ Communication from an actor system that is not part of the cluster to actors run
|
||||||
somewhere in the cluster. The client does not have to know on which node the destination
|
somewhere in the cluster. The client does not have to know on which node the destination
|
||||||
actor is running.
|
actor is running.
|
||||||
|
|
||||||
See :ref:`cluster-client` in the contrib module.
|
See :ref:`cluster-client-scala`.
|
||||||
|
|
||||||
|
Distributed Data
|
||||||
|
^^^^^^^^^^^^^^^^
|
||||||
|
|
||||||
|
*Akka Distributed Data* is useful when you need to share data between nodes in an
|
||||||
|
Akka Cluster. The data is accessed with an actor providing a key-value store like API.
|
||||||
|
|
||||||
|
See :ref:`distributed_data_scala`.
|
||||||
|
|
||||||
Failure Detector
|
Failure Detector
|
||||||
^^^^^^^^^^^^^^^^
|
^^^^^^^^^^^^^^^^
|
||||||
|
|
@ -530,7 +538,7 @@ Router Example with Pool of Remote Deployed Routees
|
||||||
---------------------------------------------------
|
---------------------------------------------------
|
||||||
|
|
||||||
Let's take a look at how to use a cluster aware router on a single master node that creates
|
||||||
and deploys workers. To keep track of a single master we use the :ref:`cluster-singleton`
|
and deploys workers. To keep track of a single master we use the :ref:`cluster-singleton-scala`
|
||||||
in the contrib module. The ``ClusterSingletonManager`` is started on each node.
|
in the contrib module. The ``ClusterSingletonManager`` is started on each node.
|
||||||
|
|
||||||
.. includecode:: ../../../akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsSampleOneMaster.scala#create-singleton-manager
|
.. includecode:: ../../../akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/stats/StatsSampleOneMaster.scala#create-singleton-manager
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
.. _distributed-pub-sub:
|
.. _distributed-pub-sub-scala:
|
||||||
|
|
||||||
Distributed Publish Subscribe in Cluster
|
Distributed Publish Subscribe in Cluster
|
||||||
========================================
|
========================================
|
||||||
|
|
@ -12,18 +12,18 @@ This pattern provides a mediator actor, ``akka.cluster.pubsub.DistributedPubSubM
|
||||||
that manages a registry of actor references and replicates the entries to peer
|
that manages a registry of actor references and replicates the entries to peer
|
||||||
actors among all cluster nodes or a group of nodes tagged with a specific role.
|
actors among all cluster nodes or a group of nodes tagged with a specific role.
|
||||||
|
|
||||||
The `DistributedPubSubMediator` is supposed to be started on all nodes,
|
The ``DistributedPubSubMediator`` actor is supposed to be started on all nodes,
|
||||||
or all nodes with specified role, in the cluster. The mediator can be
|
or all nodes with specified role, in the cluster. The mediator can be
|
||||||
started with the ``DistributedPubSub`` or as an ordinary actor.
|
started with the ``DistributedPubSub`` extension or as an ordinary actor.
|
||||||
|
|
||||||
Changes are only performed in the own part of the registry and those changes
|
The registry is eventually consistent, i.e. changes are not immediately visible at
|
||||||
are versioned. Deltas are disseminated in a scalable way to other nodes with
|
other nodes, but typically they will be fully replicated to all other nodes after
|
||||||
a gossip protocol. The registry is eventually consistent, i.e. changes are not
|
a few seconds. Changes are only performed in the own part of the registry and those
|
||||||
immediately visible at other nodes, but typically they will be fully replicated
|
changes are versioned. Deltas are disseminated in a scalable way to other nodes with
|
||||||
to all other nodes after a few seconds.
|
a gossip protocol.
|
||||||
|
|
||||||
You can send messages via the mediator on any node to registered actors on
|
You can send messages via the mediator on any node to registered actors on
|
||||||
any other node. There is four modes of message delivery.
|
any other node. There are four modes of message delivery.
|
||||||
|
|
||||||
**1. DistributedPubSubMediator.Send**
|
**1. DistributedPubSubMediator.Send**
|
||||||
|
|
||||||
|
|
@ -79,28 +79,8 @@ Successful ``Subscribe`` and ``Unsubscribe`` is acknowledged with
|
||||||
``DistributedPubSubMediator.SubscribeAck`` and ``DistributedPubSubMediator.UnsubscribeAck``
|
``DistributedPubSubMediator.SubscribeAck`` and ``DistributedPubSubMediator.UnsubscribeAck``
|
||||||
replies.
|
replies.
|
||||||
|
|
||||||
A Small Example in Java
|
A Small Example
|
||||||
-----------------------
|
---------------
|
||||||
|
|
||||||
A subscriber actor:
|
|
||||||
|
|
||||||
.. includecode:: ../../../akka-cluster-tools/src/test/java/akka/cluster/pubsub/DistributedPubSubMediatorTest.java#subscriber
|
|
||||||
|
|
||||||
Subscriber actors can be started on several nodes in the cluster, and all will receive
|
|
||||||
messages published to the "content" topic.
|
|
||||||
|
|
||||||
.. includecode:: ../../../akka-cluster-tools/src/test/java/akka/cluster/pubsub/DistributedPubSubMediatorTest.java#start-subscribers
|
|
||||||
|
|
||||||
A simple actor that publishes to this "content" topic:
|
|
||||||
|
|
||||||
.. includecode:: ../../../akka-cluster-tools/src/test/java/akka/cluster/pubsub/DistributedPubSubMediatorTest.java#publisher
|
|
||||||
|
|
||||||
It can publish messages to the topic from anywhere in the cluster:
|
|
||||||
|
|
||||||
.. includecode:: ../../../akka-cluster-tools/src/test/java/akka/cluster/pubsub/DistributedPubSubMediatorTest.java#publish-message
|
|
||||||
|
|
||||||
A Small Example in Scala
|
|
||||||
------------------------
|
|
||||||
|
|
||||||
A subscriber actor:
|
A subscriber actor:
|
||||||
|
|
||||||
|
|
@ -122,16 +102,16 @@ It can publish messages to the topic from anywhere in the cluster:
|
||||||
A more comprehensive sample is available in the `Typesafe Activator <http://www.typesafe.com/platform/getstarted>`_
|
A more comprehensive sample is available in the `Typesafe Activator <http://www.typesafe.com/platform/getstarted>`_
|
||||||
tutorial named `Akka Clustered PubSub with Scala! <http://www.typesafe.com/activator/template/akka-clustering>`_.
|
tutorial named `Akka Clustered PubSub with Scala! <http://www.typesafe.com/activator/template/akka-clustering>`_.
|
||||||
|
|
||||||
DistributedPubSub
|
DistributedPubSub Extension
|
||||||
--------------------------
|
---------------------------
|
||||||
|
|
||||||
In the example above the mediator is started and accessed with the ``akka.cluster.pubsub.DistributedPubSub``.
|
In the example above the mediator is started and accessed with the ``akka.cluster.pubsub.DistributedPubSub`` extension.
|
||||||
That is convenient and perfectly fine in most cases, but it can be good to know that it is possible to
|
That is convenient and perfectly fine in most cases, but it can be good to know that it is possible to
|
||||||
start the mediator actor as an ordinary actor and you can have several different mediators at the same
|
start the mediator actor as an ordinary actor and you can have several different mediators at the same
|
||||||
time to be able to divide a large number of actors/topics to different mediators. For example you might
|
time to be able to divide a large number of actors/topics to different mediators. For example you might
|
||||||
want to use different cluster roles for different mediators.
|
want to use different cluster roles for different mediators.
|
||||||
|
|
||||||
The ``DistributedPubSub`` can be configured with the following properties:
|
The ``DistributedPubSub`` extension can be configured with the following properties:
|
||||||
|
|
||||||
.. includecode:: ../../../akka-cluster-tools/src/main/resources/reference.conf#pub-sub-ext-config
|
.. includecode:: ../../../akka-cluster-tools/src/main/resources/reference.conf#pub-sub-ext-config
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -147,7 +147,7 @@ Similarly to `Actor Classification`_, :class:`EventStream` will automatically re
|
||||||
|
|
||||||
.. note::
|
.. note::
|
||||||
The event stream is a *local facility*, meaning that it will *not* distribute events to other nodes in a clustered environment (unless you subscribe a Remote Actor to the stream explicitly).
|
The event stream is a *local facility*, meaning that it will *not* distribute events to other nodes in a clustered environment (unless you subscribe a Remote Actor to the stream explicitly).
|
||||||
If you need to broadcast events in an Akka cluster, *without* knowing your recipients explicitly (i.e. obtaining their ActorRefs), you may want to look into: :ref:`distributed-pub-sub`.
|
If you need to broadcast events in an Akka cluster, *without* knowing your recipients explicitly (i.e. obtaining their ActorRefs), you may want to look into: :ref:`distributed-pub-sub-scala`.
|
||||||
|
|
||||||
Default Handlers
|
Default Handlers
|
||||||
----------------
|
----------------
|
||||||
|
|
|
||||||