diff --git a/akka-contrib/docs/cluster-client.rst b/akka-contrib/docs/cluster-client.rst new file mode 100644 index 0000000000..495e581ede --- /dev/null +++ b/akka-contrib/docs/cluster-client.rst @@ -0,0 +1,72 @@ +.. _cluster-client: + +Cluster Client +============== + +An actor system that is not part of the cluster can communicate with actors +somewhere in the cluster via this ``ClusterClient``. The client can of course be part of +another cluster. It only needs to know the location of one (or more) nodes to use as initial +contact point. It will establish a connection to a ``ClusterReceptionist`` somewhere in +the cluster. It will monitor the connection to the receptionist and establish a new +connection if the link goes down. When looking for a new receptionist it uses fresh +contact points retrieved from previous establishment, or periodically refreshed contacts, +i.e. not necessarily the initial contact points. + +The receptionist is supposed to be started on all nodes, or all nodes with specified role, +in the cluster. The receptionist can be started with the ``ClusterReceptionistExtension`` +or as an ordinary actor. + +You can send messages via the ``ClusterClient`` to any actor in the cluster that is registered +in the ``DistributedPubSubMediator`` used by the ``ClusterReceptionist``. +Messages are wrapped in ``DistributedPubSubMediator.Send``, ``DistributedPubSubMediator.SendToAll`` +or ``DistributedPubSubMediator.Publish`` with the semantics described in +:ref:`distributed-pub-sub`. + +Response messages from the destination actor are tunneled via the receptionist +to avoid inbound connections from other cluster nodes to the client, i.e. +the ``sender``, as seen by the destination actor, is not the client itself. +The ``sender`` of the response messages, as seen by the client, is preserved +as the original sender, so the client can choose to send subsequent messages +directly to the actor in the cluster. + +An Example +---------- + +On the cluster nodes you start the receptionist and register the actors that +should be available for the client. + +.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterClientSpec.scala#server + +On the client you create the ``ClusterClient`` actor and use it as a gateway for sending +messages to the actors identified by their path (without address information) somewhere +in the cluster. + +.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterClientSpec.scala#client + +The ``initialContacts`` parameter is a ``Set[ActorSelection]``, which can be created like this: + +.. includecode:: @contribSrc@/src/multi-jvm/scala/akka/contrib/pattern/ClusterClientSpec.scala#initialContacts + +You will probably define the address information of the initial contact points in configuration or system property. + + +ClusterReceptionistExtension +---------------------------- + +In the example above the receptionist is started and accessed with the ``akka.contrib.pattern.ClusterReceptionistExtension``. +That is convenient and perfectly fine in most cases, but it can be good to know that it is possible to +start the ````akka.contrib.pattern.ClusterReceptionist`` actor as an ordinary actor and you can have several +different receptionists at the same time, serving different types of clients. + +The ``ClusterReceptionistExtension`` can be configured with the following properties: + +.. includecode:: @contribSrc@/src/main/resources/reference.conf#receptionist-ext-config + +Note that the ``ClusterReceptionistExtension`` uses the ``DistributedPubSubExtension``, which is described +in :ref:`distributed-pub-sub`. + +It is recommended to load the extension when the actor system is started by defining it in +``akka.extensions`` configuration property:: + + akka.extensions = ["akka.contrib.pattern.ClusterReceptionistExtension"] + diff --git a/akka-contrib/docs/index.rst b/akka-contrib/docs/index.rst index ba2e911041..d4bd2d5112 100644 --- a/akka-contrib/docs/index.rst +++ b/akka-contrib/docs/index.rst @@ -36,6 +36,7 @@ The Current List of Modules peek-mailbox cluster-singleton distributed-pub-sub + cluster-client Suggested Way of Using these Contributions ------------------------------------------ diff --git a/akka-contrib/src/main/resources/reference.conf b/akka-contrib/src/main/resources/reference.conf index 5f02965181..fa680512ef 100644 --- a/akka-contrib/src/main/resources/reference.conf +++ b/akka-contrib/src/main/resources/reference.conf @@ -22,3 +22,23 @@ akka.contrib.cluster.pub-sub { removed-time-to-live = 120s } # //#pub-sub-ext-config + + +# //#receptionist-ext-config +# Settings for the ClusterReceptionistExtension +akka.contrib.cluster.receptionist { + # Actor name of the ClusterReceptionist actor, /user/receptionist + name = receptionist + + # Start the receptionist on members tagged with this role. + # All members are used if undefined or empty. + role = "" + + # The receptionist will send this number of contact points to the client + number-of-contacts = 3 + + # The actor that tunnel response messages to the client will be stopped + # after this time of inactivity. + response-tunnel-receive-timeout = 30s +} +# //#receptionist-ext-config diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterClient.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterClient.scala new file mode 100644 index 0000000000..668f17e3c2 --- /dev/null +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterClient.scala @@ -0,0 +1,359 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.contrib.pattern + +import java.net.URLEncoder +import scala.collection.immutable +import scala.concurrent.duration._ +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.ActorRef +import akka.actor.ActorSelection +import akka.actor.ActorSystem +import akka.actor.Address +import akka.actor.ExtendedActorSystem +import akka.actor.Extension +import akka.actor.ExtensionId +import akka.actor.ExtensionIdProvider +import akka.actor.Props +import akka.actor.ReceiveTimeout +import akka.actor.Terminated +import akka.cluster.Cluster +import akka.cluster.ClusterEvent._ +import akka.cluster.Member +import akka.cluster.MemberStatus +import akka.routing.ConsistentHash +import akka.routing.MurmurHash + +/** + * INTERNAL API + */ +private object ClusterClient { + case object PingTick + case object RefreshContactsTick +} + +/** + * This actor is intended to be used on an external node that is not member + * of the cluster. It acts like a gateway for sending messages to actors + * somewhere in the cluster. From the initial contact points it will establish + * a connection to a [[ClusterReceptionist]] somewhere in the cluster. It will + * monitor the connection to the receptionist and establish a new connection if + * the link goes down. When looking for a new receptionist it uses fresh contact + * points retrieved from previous establishment, or periodically refreshed + * contacts, i.e. not necessarily the initial contact points. + * + * You can send messages via the `ClusterClient` to any actor in the cluster + * that is registered in the [[DistributedPubSubMediator]] used by the [[ClusterReceptionist]]. + * Messages are wrapped in [[DistributedPubSubMediator.Send]], [[DistributedPubSubMediator.SendToAll]] + * or [[DistributedPubSubMediator.Publish]] with the semantics described in + * [[DistributedPubSubMediator]]. + */ +class ClusterClient( + initialContacts: Set[ActorSelection], + pingInterval: FiniteDuration = 3.second, + refreshContactsInterval: FiniteDuration = 1.minute) + extends Actor with ActorLogging { + + import ClusterClient._ + import ClusterReceptionist._ + import DistributedPubSubMediator.{ Send, SendToAll, Publish } + + var contacts: immutable.IndexedSeq[ActorSelection] = initialContacts.toVector + sendGetContacts() + + import context.dispatcher + val refreshContactsTask = context.system.scheduler.schedule( + refreshContactsInterval, refreshContactsInterval, self, RefreshContactsTick) + val pingTask = context.system.scheduler.schedule(pingInterval, pingInterval, self, PingTick) + + override def postStop(): Unit = { + super.postStop() + refreshContactsTask.cancel() + pingTask.cancel() + } + + def receive = establishing + + def establishing: Actor.Receive = { + case Contacts(contactPoints) ⇒ + if (contactPoints.nonEmpty) { + contacts = contactPoints + contacts foreach { _ ! Identify(None) } + } + case ActorIdentity(_, Some(receptionist)) ⇒ + context watch receptionist + log.info("Connected to [{}]", receptionist.path) + context.become(active(receptionist)) + case ActorIdentity(_, None) ⇒ // ok, use another instead + case PingTick ⇒ sendGetContacts() + case Pong ⇒ + case RefreshContactsTick ⇒ + case msg ⇒ context.system.deadLetters forward msg + } + + def active(receptionist: ActorRef): Actor.Receive = { + def becomeEstablishing(): Unit = { + log.info("Lost contact with [{}], restablishing connection", receptionist) + sendGetContacts() + context.become(establishing) + } + + var pongTimestamp = System.nanoTime + + { + case msg @ (_: Send | _: SendToAll | _: Publish) ⇒ + receptionist forward msg + case PingTick ⇒ + if (System.nanoTime - pongTimestamp > 3 * pingInterval.toNanos) + becomeEstablishing() + else + receptionist ! Ping + case Pong ⇒ + pongTimestamp = System.nanoTime + case RefreshContactsTick ⇒ + receptionist ! GetContacts + case Terminated(`receptionist`) ⇒ + becomeEstablishing() + case Contacts(contactPoints) ⇒ + // refresh of contacts + if (contactPoints.nonEmpty) + contacts = contactPoints + case _: ActorIdentity ⇒ // ok, from previous establish, already handled + } + } + + def sendGetContacts(): Unit = { + if (contacts.isEmpty) initialContacts foreach { _ ! GetContacts } + else if (contacts.size == 1) (initialContacts ++ contacts) foreach { _ ! GetContacts } + else contacts foreach { _ ! GetContacts } + } +} + +/** + * Extension that starts [[ClusterReceptionist]] and accompanying [[DistributedPubSubMediator]] + * with settings defined in config section `akka.contrib.cluster.receptionist`. + * The [[DistributedPubSubMediator]] is started by the [[DistributedPubSubExtension]]. + */ +object ClusterReceptionistExtension extends ExtensionId[ClusterReceptionistExtension] with ExtensionIdProvider { + override def get(system: ActorSystem): ClusterReceptionistExtension = super.get(system) + + override def lookup = ClusterReceptionistExtension + + override def createExtension(system: ExtendedActorSystem): ClusterReceptionistExtension = + new ClusterReceptionistExtension(system) +} + +class ClusterReceptionistExtension(system: ExtendedActorSystem) extends Extension { + + private val config = system.settings.config.getConfig("akka.contrib.cluster.receptionist") + private val role: Option[String] = config.getString("role") match { + case "" ⇒ None + case r ⇒ Some(r) + } + + /** + * Returns true if this member is not tagged with the role configured for the + * receptionist. + */ + def isTerminated: Boolean = Cluster(system).isTerminated || !role.forall(Cluster(system).selfRoles.contains) + + /** + * Register the actors that should be reachable for the clients in this [[DistributedPubSubMediator]]. + */ + def pubSubMediator: ActorRef = DistributedPubSubExtension(system).mediator + + /** + * The [[ClusterReceptionist]] actor + */ + val receptionist: ActorRef = { + if (isTerminated) + system.deadLetters + else { + val numberOfContacts: Int = config.getInt("number-of-contacts") + val responseTunnelReceiveTimeout = + Duration(config.getMilliseconds("response-tunnel-receive-timeout"), MILLISECONDS) + val name = config.getString("name") + // important to use val mediator here to activate it outside of ClusterReceptionist constructor + val mediator = pubSubMediator + system.actorOf(Props(new ClusterReceptionist(mediator, role, numberOfContacts, + responseTunnelReceiveTimeout)), name) + } + } +} + +/** + * INTERNAL API + */ +private[pattern] object ClusterReceptionist { + @SerialVersionUID(1L) + case object GetContacts + @SerialVersionUID(1L) + case class Contacts(contactPoints: immutable.IndexedSeq[ActorSelection]) + @SerialVersionUID(1L) + case object Ping + @SerialVersionUID(1L) + case object Pong + + // FIXME change to akka.actor.Identify when that is in master + @SerialVersionUID(1L) + case class Identify(messageId: Any) + @SerialVersionUID(1L) + case class ActorIdentity(correlationId: Any, ref: Option[ActorRef]) + + def roleOption(role: String): Option[String] = role match { + case null | "" ⇒ None + case _ ⇒ Some(role) + } + + /** + * Replies are tunneled via this actor, child of the receptionist, to avoid + * inbound connections from other cluster nodes to the client. + */ + class ClientResponseTunnel(client: ActorRef, timeout: FiniteDuration) extends Actor { + context.setReceiveTimeout(timeout) + def receive = { + case Ping ⇒ // keep alive from client + case ReceiveTimeout ⇒ context stop self + case msg ⇒ client forward msg + } + } + +} + +/** + * [[ClusterClient]] connects to this actor to retrieve. The `ClusterReceptionist` is + * supposed to be started on all nodes, or all nodes with specified role, in the cluster. + * The receptionist can be started with the [[ClusterReceptionistExtension]] or as an + * ordinary actor. + * + * The receptionist forwards messages from the client to the associated [[DistributedPubSubMediator]], + * i.e. the client can send messages to any actor in the cluster that is registered in the + * `DistributedPubSubMediator`. Messages from the client are wrapped in + * [[DistributedPubSubMediator.Send]], [[DistributedPubSubMediator.SendToAll]] + * or [[DistributedPubSubMediator.Publish]] with the semantics described in + * [[DistributedPubSubMediator]]. + * + * Response messages from the destination actor are tunneled via the receptionist + * to avoid inbound connections from other cluster nodes to the client, i.e. + * the `sender`, as seen by the destination actor, is not the client itself. + * The `sender` of the response messages, as seen by the client, is preserved + * as the original sender, so the client can choose to send subsequent messages + * directly to the actor in the cluster. + */ +class ClusterReceptionist( + pubSubMediator: ActorRef, + role: Option[String], + numberOfContacts: Int = 3, + responseTunnelReceiveTimeout: FiniteDuration = 30.seconds) + extends Actor with ActorLogging { + + import DistributedPubSubMediator.{ Send, SendToAll, Publish } + + /** + * Java API constructor with default values. + */ + def this(pubSubMediator: ActorRef, role: String) = + this(pubSubMediator, ClusterReceptionist.roleOption(role)) + + import ClusterReceptionist._ + + val cluster = Cluster(context.system) + import cluster.selfAddress + + require(role.forall(cluster.selfRoles.contains), + s"This cluster member [${selfAddress}] doesn't have the role [$role]") + + var nodes: immutable.SortedSet[Address] = { + def hashFor(node: Address): Int = node match { + // cluster node identifier is the host and port of the address; protocol and system is assumed to be the same + case Address(_, _, Some(host), Some(port)) ⇒ MurmurHash.stringHash(s"${host}:${port}") + case _ ⇒ + throw new IllegalStateException(s"Unexpected address without host/port: [$node]") + } + implicit val ringOrdering: Ordering[Address] = Ordering.fromLessThan[Address] { (a, b) ⇒ + val ha = hashFor(a) + val hb = hashFor(b) + ha < hb || (ha == hb && Member.addressOrdering.compare(a, b) < 0) + } + immutable.SortedSet() + } + val virtualNodesFactor = 10 + var consistentHash: ConsistentHash[Address] = ConsistentHash(nodes, virtualNodesFactor) + + override def preStart(): Unit = { + super.preStart() + require(!cluster.isTerminated, "Cluster node must not be terminated") + cluster.subscribe(self, classOf[MemberEvent]) + } + + override def postStop(): Unit = { + super.postStop() + cluster unsubscribe self + } + + def matchingRole(m: Member): Boolean = role.forall(m.hasRole) + + def responseTunnel(client: ActorRef): ActorRef = { + val encName = URLEncoder.encode(client.path.toSerializationFormat, "utf-8") + context.child(encName) match { + case Some(tunnel) ⇒ tunnel + case None ⇒ + context.actorOf(Props(new ClientResponseTunnel(client, responseTunnelReceiveTimeout)), encName) + } + } + + def receive = { + case msg @ (_: Send | _: SendToAll | _: Publish) ⇒ + pubSubMediator.tell(msg, responseTunnel(sender)) + + case Ping ⇒ + responseTunnel(sender) ! Ping // keep alive + sender ! Pong + + // FIXME remove when akka.actor.Identify when is in master + case Identify(messageId) ⇒ + sender ! ActorIdentity(messageId, Some(self)) + + case GetContacts ⇒ + // Consistent hashing is used to ensure that the reply to GetContacts + // is the same from all nodes (most of the time) and it also + // load balances the client connections among the nodes in the cluster. + if (numberOfContacts >= nodes.size) { + sender ! Contacts(nodes.map(a ⇒ context.actorSelection(self.path.toStringWithAddress(a)))(collection.breakOut)) + } else { + // using toStringWithAddress in case the client is local, normally it is not, and + // toStringWithAddress will use the remote address of the client + val a = consistentHash.nodeFor(sender.path.toStringWithAddress(cluster.selfAddress)) + val slice = { + val first = nodes.from(a).tail.take(numberOfContacts) + if (first.size == numberOfContacts) first + else first ++ nodes.take(numberOfContacts - first.size) + } + sender ! Contacts(slice.map(a ⇒ context.actorSelection(self.path.toStringWithAddress(a)))(collection.breakOut)) + } + + case state: CurrentClusterState ⇒ + nodes = nodes.empty ++ state.members.collect { case m if m.status != MemberStatus.Joining && matchingRole(m) ⇒ m.address } + consistentHash = ConsistentHash(nodes, virtualNodesFactor) + + case MemberUp(m) ⇒ + if (matchingRole(m)) { + nodes += m.address + consistentHash = ConsistentHash(nodes, virtualNodesFactor) + } + + case MemberRemoved(m) ⇒ + if (m.address == selfAddress) + context stop self + else if (matchingRole(m)) { + nodes -= m.address + consistentHash = ConsistentHash(nodes, virtualNodesFactor) + } + + case _: MemberEvent ⇒ // not of interest + } + +} + diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterClientSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterClientSpec.scala new file mode 100644 index 0000000000..ba0c8c6224 --- /dev/null +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterClientSpec.scala @@ -0,0 +1,199 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.contrib.pattern + +import language.postfixOps +import scala.concurrent.duration._ +import com.typesafe.config.ConfigFactory +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Props +import akka.cluster.Cluster +import akka.cluster.ClusterEvent._ +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.testkit._ +import akka.actor.Address + +object ClusterClientSpec extends MultiNodeConfig { + val client = role("client") + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + + commonConfig(ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.remote.log-remote-lifecycle-events = off + akka.cluster.auto-join = off + akka.cluster.auto-down = on + """)) + + class TestService(testActor: ActorRef) extends Actor { + def receive = { + case msg ⇒ + testActor forward msg + sender ! "ack" + } + } + + class Service extends Actor { + def receive = { + case msg ⇒ sender ! msg + } + } + +} + +class ClusterClientMultiJvmNode1 extends ClusterClientSpec +class ClusterClientMultiJvmNode2 extends ClusterClientSpec +class ClusterClientMultiJvmNode3 extends ClusterClientSpec +class ClusterClientMultiJvmNode4 extends ClusterClientSpec +class ClusterClientMultiJvmNode5 extends ClusterClientSpec + +class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNodeSpec with ImplicitSender { + import ClusterClientSpec._ + import DistributedPubSubMediator._ + + override def initialParticipants = roles.size + + def join(from: RoleName, to: RoleName): Unit = { + runOn(from) { + Cluster(system) join node(to).address + createReceptionist() + } + enterBarrier(from.name + "-joined") + } + + def createReceptionist(): ActorRef = ClusterReceptionistExtension(system).receptionist + + def mediator: ActorRef = ClusterReceptionistExtension(system).pubSubMediator + def receptionist: ActorRef = ClusterReceptionistExtension(system).receptionist + + def awaitCount(expected: Int): Unit = { + awaitAssert { + mediator ! Count + expectMsgType[Int] must be(expected) + } + } + + def roleName(addr: Address): Option[RoleName] = roles.find(node(_).address == addr) + + def initialContacts = Set( + system.actorSelection(node(second) / "user" / "receptionist"), + system.actorSelection(node(third) / "user" / "receptionist")) + + "A ClusterClient" must { + + "startup cluster" in within(30 seconds) { + join(first, first) + join(second, first) + join(third, first) + join(fourth, first) + runOn(fourth) { + val service = system.actorOf(Props(new TestService(testActor)), "testService") + mediator ! Put(service) + } + runOn(first, second, third, fourth) { + awaitCount(1) + } + + enterBarrier("after-1") + } + + "communicate to actor on any node in cluster" in within(10 seconds) { + runOn(client) { + val c = system.actorOf(Props(new ClusterClient(initialContacts))) + + awaitAssert { + c ! Send("/user/testService", "hello", localAffinity = true) + expectMsg(1 second, "ack") + } + } + runOn(fourth) { + expectMsg("hello") + } + + enterBarrier("after-2") + } + + "demonstrate usage" in within(15 seconds) { + def host1 = first + def host2 = second + def host3 = third + + //#server + runOn(host1) { + val mediator = ClusterReceptionistExtension(system).pubSubMediator + val serviceA = system.actorOf(Props[Service], "serviceA") + mediator ! DistributedPubSubMediator.Put(serviceA) + } + + runOn(host2, host3) { + val mediator = ClusterReceptionistExtension(system).pubSubMediator + val serviceB = system.actorOf(Props[Service], "serviceB") + mediator ! DistributedPubSubMediator.Put(serviceB) + } + //#server + + //#client + runOn(client) { + val c = system.actorOf(Props(new ClusterClient(initialContacts))) + c ! DistributedPubSubMediator.Send("/user/serviceA", "hello", localAffinity = true) + c ! DistributedPubSubMediator.SendToAll("/user/serviceB", "hi") + } + //#client + + { //not used, only demo + //#initialContacts + val initialContacts = Set( + system.actorSelection("akka.tcp://OtherSys@host1:2552/user/receptionist"), + system.actorSelection("akka.tcp://OtherSys@host2:2552/user/receptionist")) + //#initialContacts + } + + // strange, barriers fail without this sleep + Thread.sleep(1000) + enterBarrier("after-3") + } + + "re-establish connection to receptionist when connection is lost" in within(30 seconds) { + runOn(first, second, third, fourth) { + val service2 = system.actorOf(Props(new TestService(testActor)), "service2") + mediator ! Put(service2) + awaitCount(8) + } + enterBarrier("service2-replicated") + + runOn(client) { + val c = system.actorOf(Props(new ClusterClient(initialContacts))) + + awaitAssert { + c ! Send("/user/service2", "bonjour", localAffinity = true) + expectMsg(1 second, "ack") + } + val lastSenderAddress = lastSender.path.address + val receptionistRoleName = roleName(lastSenderAddress) match { + case Some(r) ⇒ r + case None ⇒ fail("unexpected missing roleName: " + lastSender.path.address) + } + testConductor.shutdown(receptionistRoleName, 0).await + awaitAssert { + c ! Send("/user/service2", "hi again", localAffinity = true) + expectMsg(1 second, "ack") + } + } + enterBarrier("verifed-3") + receiveWhile(2 seconds) { + case "hi again" ⇒ + case other ⇒ fail("unexpected message: " + other) + } + enterBarrier("after-4") + } + + } +} diff --git a/akka-docs/rst/cluster/cluster-usage-java.rst b/akka-docs/rst/cluster/cluster-usage-java.rst index c8e3de3789..66a18af618 100644 --- a/akka-docs/rst/cluster/cluster-usage-java.rst +++ b/akka-docs/rst/cluster/cluster-usage-java.rst @@ -274,6 +274,11 @@ Distributed Publish Subscribe Pattern See :ref:`distributed-pub-sub` in the contrib module. +Cluster Client +^^^^^^^^^^^^^^ + +See :ref:`cluster-client` in the contrib module. + Failure Detector ^^^^^^^^^^^^^^^^ diff --git a/akka-docs/rst/cluster/cluster-usage-scala.rst b/akka-docs/rst/cluster/cluster-usage-scala.rst index a54f9a7472..4321cc169d 100644 --- a/akka-docs/rst/cluster/cluster-usage-scala.rst +++ b/akka-docs/rst/cluster/cluster-usage-scala.rst @@ -262,6 +262,11 @@ Distributed Publish Subscribe Pattern See :ref:`distributed-pub-sub` in the contrib module. +Cluster Client +^^^^^^^^^^^^^^ + +See :ref:`cluster-client` in the contrib module. + Failure Detector ^^^^^^^^^^^^^^^^