diff --git a/akka-contrib/docs/cluster-client.rst b/akka-contrib/docs/cluster-client.rst index 495e581ede..bec4761cff 100644 --- a/akka-contrib/docs/cluster-client.rst +++ b/akka-contrib/docs/cluster-client.rst @@ -18,9 +18,27 @@ or as an ordinary actor. You can send messages via the ``ClusterClient`` to any actor in the cluster that is registered in the ``DistributedPubSubMediator`` used by the ``ClusterReceptionist``. -Messages are wrapped in ``DistributedPubSubMediator.Send``, ``DistributedPubSubMediator.SendToAll`` -or ``DistributedPubSubMediator.Publish`` with the semantics described in -:ref:`distributed-pub-sub`. +The ``ClusterReceptionistExtension`` provides methods for registration of actors that +should be reachable from the client. Messages are wrapped in ``ClusterClient.Send``, +``ClusterClient.SendToAll`` or ``ClusterClient.Publish``. + +**1. ClusterClient.Send** + +The message will be delivered to one recipient with a matching path, if any such +exists. If several entries match the path the message will be delivered +to one random destination. The sender of the message can specify that local +affinity is preferred, i.e. the message is sent to an actor in the same local actor +system as the used receptionist actor, if any such exists, otherwise random to any other +matching entry. + +**2. ClusterClient.SendToAll** + +The message will be delivered to all recipients with a matching path. + +**3. ClusterClient.Publish** + +The message will be delivered to all recipients Actors that have been registered as subscribers +to the named topic. Response messages from the destination actor are tunneled via the receptionist to avoid inbound connections from other cluster nodes to the client, i.e. @@ -55,7 +73,7 @@ ClusterReceptionistExtension In the example above the receptionist is started and accessed with the ``akka.contrib.pattern.ClusterReceptionistExtension``. That is convenient and perfectly fine in most cases, but it can be good to know that it is possible to -start the ````akka.contrib.pattern.ClusterReceptionist`` actor as an ordinary actor and you can have several +start the ``akka.contrib.pattern.ClusterReceptionist`` actor as an ordinary actor and you can have several different receptionists at the same time, serving different types of clients. The ``ClusterReceptionistExtension`` can be configured with the following properties: diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterClient.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterClient.scala index 7a774bc4e8..c1c3dbbcf6 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterClient.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterClient.scala @@ -59,6 +59,13 @@ object ClusterClient { props(initialContacts.asScala.toSet) } + @SerialVersionUID(1L) + case class Send(path: String, msg: Any, localAffinity: Boolean) + @SerialVersionUID(1L) + case class SendToAll(path: String, msg: Any) + @SerialVersionUID(1L) + case class Publish(topic: String, msg: Any) + /** * INTERNAL API */ @@ -79,10 +86,24 @@ object ClusterClient { * contacts, i.e. not necessarily the initial contact points. * * You can send messages via the `ClusterClient` to any actor in the cluster - * that is registered in the [[DistributedPubSubMediator]] used by the [[ClusterReceptionist]]. - * Messages are wrapped in [[DistributedPubSubMediator.Send]], [[DistributedPubSubMediator.SendToAll]] - * or [[DistributedPubSubMediator.Publish]] with the semantics described in - * [[DistributedPubSubMediator]]. + * that is registered in the [[ClusterReceptionist]]. + * Messages are wrapped in [[ClusterClient.Send]], [[ClusterClient.SendToAll]] + * or [[ClusterClient.Publish]]. + * + * 1. [[ClusterClient.Send]] - + * The message will be delivered to one recipient with a matching path, if any such + * exists. If several entries match the path the message will be delivered + * to one random destination. The sender of the message can specify that local + * affinity is preferred, i.e. the message is sent to an actor in the same local actor + * system as the used receptionist actor, if any such exists, otherwise random to any other + * matching entry. + * + * 2. [[ClusterClient.SendToAll]] - + * The message will be delivered to all recipients with a matching path. + * + * 3. [[ClusterClient.Publish]] - + * The message will be delivered to all recipients Actors that have been registered as subscribers to + * to the named topic. * * Use the factory method [[ClusterClient#props]]) to create the * [[akka.actor.Props]] for the actor. @@ -96,7 +117,6 @@ class ClusterClient( import ClusterClient._ import ClusterClient.Internal._ import ClusterReceptionist.Internal._ - import DistributedPubSubMediator.{ Send, SendToAll, Publish } var contacts: immutable.IndexedSeq[ActorSelection] = initialContacts.toVector sendGetContacts() @@ -141,8 +161,12 @@ class ClusterClient( var pongTimestamp = System.nanoTime { - case msg @ (_: Send | _: SendToAll | _: Publish) ⇒ - receptionist forward msg + case Send(path, msg, localAffinity) ⇒ + receptionist forward DistributedPubSubMediator.Send(path, msg, localAffinity) + case SendToAll(path, msg) ⇒ + receptionist forward DistributedPubSubMediator.SendToAll(path, msg) + case Publish(topic, msg) ⇒ + receptionist forward DistributedPubSubMediator.Publish(topic, msg) case PingTick ⇒ if (System.nanoTime - pongTimestamp > 3 * pingInterval.toNanos) becomeEstablishing() @@ -200,12 +224,43 @@ class ClusterReceptionistExtension(system: ExtendedActorSystem) extends Extensio /** * Register the actors that should be reachable for the clients in this [[DistributedPubSubMediator]]. */ - def pubSubMediator: ActorRef = DistributedPubSubExtension(system).mediator + private def pubSubMediator: ActorRef = DistributedPubSubExtension(system).mediator + + /** + * Register an actor that should be reachable for the clients. + * The clients can send messages to this actor with `Send` or `SendToAll` using + * the path elements of the `ActorRef`, e.g. `"/user/myservice"`. + */ + def registerService(actor: ActorRef): Unit = + pubSubMediator ! DistributedPubSubMediator.Put(actor) + + /** + * A registered actor will be automatically unregistered when terminated, + * but it can also be explicitly unregistered before termination. + */ + def unregisterService(actor: ActorRef): Unit = + pubSubMediator ! DistributedPubSubMediator.Remove(actor.path.elements.mkString("/", "/", "")) + + /** + * Register an actor that should be reachable for the clients to a named topic. + * Several actors can be registered to the same topic name, and all will receive + * published messages. + * The client can publish messages to this topic with `Publish`. + */ + def registerSubscriber(topic: String, actor: ActorRef): Unit = + pubSubMediator ! DistributedPubSubMediator.Subscribe(topic, actor) + + /** + * A registered subscriber will be automatically unregistered when terminated, + * but it can also be explicitly unregistered before termination. + */ + def unregisterSubscriber(topic: String, actor: ActorRef): Unit = + pubSubMediator ! DistributedPubSubMediator.Unsubscribe(topic, actor) /** * The [[ClusterReceptionist]] actor */ - val receptionist: ActorRef = { + private val receptionist: ActorRef = { if (isTerminated) system.deadLetters else { diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterClientSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterClientSpec.scala index 11fb75a9e6..443c47ead3 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterClientSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterClientSpec.scala @@ -57,7 +57,6 @@ class ClusterClientMultiJvmNode5 extends ClusterClientSpec class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNodeSpec with ImplicitSender { import ClusterClientSpec._ - import DistributedPubSubMediator._ override def initialParticipants = roles.size @@ -69,14 +68,13 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod enterBarrier(from.name + "-joined") } - def createReceptionist(): ActorRef = ClusterReceptionistExtension(system).receptionist + def createReceptionist(): Unit = ClusterReceptionistExtension(system) - def mediator: ActorRef = ClusterReceptionistExtension(system).pubSubMediator - def receptionist: ActorRef = ClusterReceptionistExtension(system).receptionist + def receptionist: ClusterReceptionistExtension = ClusterReceptionistExtension(system) def awaitCount(expected: Int): Unit = { awaitAssert { - mediator ! Count + DistributedPubSubExtension(system).mediator ! DistributedPubSubMediator.Count expectMsgType[Int] must be(expected) } } @@ -96,7 +94,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod join(fourth, first) runOn(fourth) { val service = system.actorOf(Props(classOf[TestService], testActor), "testService") - mediator ! Put(service) + receptionist.registerService(service) } runOn(first, second, third, fourth) { awaitCount(1) @@ -110,7 +108,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod val c = system.actorOf(ClusterClient.props(initialContacts)) awaitAssert { - c ! Send("/user/testService", "hello", localAffinity = true) + c ! ClusterClient.Send("/user/testService", "hello", localAffinity = true) expectMsg(1 second, "ack") } } @@ -128,23 +126,21 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod //#server runOn(host1) { - val mediator = ClusterReceptionistExtension(system).pubSubMediator val serviceA = system.actorOf(Props[Service], "serviceA") - mediator ! DistributedPubSubMediator.Put(serviceA) + receptionist.registerService(serviceA) } runOn(host2, host3) { - val mediator = ClusterReceptionistExtension(system).pubSubMediator val serviceB = system.actorOf(Props[Service], "serviceB") - mediator ! DistributedPubSubMediator.Put(serviceB) + receptionist.registerService(serviceB) } //#server //#client runOn(client) { val c = system.actorOf(ClusterClient.props(initialContacts)) - c ! DistributedPubSubMediator.Send("/user/serviceA", "hello", localAffinity = true) - c ! DistributedPubSubMediator.SendToAll("/user/serviceB", "hi") + c ! ClusterClient.Send("/user/serviceA", "hello", localAffinity = true) + c ! ClusterClient.SendToAll("/user/serviceB", "hi") } //#client @@ -164,7 +160,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod "re-establish connection to receptionist when connection is lost" in within(30 seconds) { runOn(first, second, third, fourth) { val service2 = system.actorOf(Props(classOf[TestService], testActor), "service2") - mediator ! Put(service2) + receptionist.registerService(service2) awaitCount(8) } enterBarrier("service2-replicated") @@ -173,7 +169,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod val c = system.actorOf(ClusterClient.props(initialContacts)) awaitAssert { - c ! Send("/user/service2", "bonjour", localAffinity = true) + c ! ClusterClient.Send("/user/service2", "bonjour", localAffinity = true) expectMsg(1 second, "ack") } val lastSenderAddress = lastSender.path.address @@ -183,7 +179,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod } testConductor.shutdown(receptionistRoleName, 0).await awaitAssert { - c ! Send("/user/service2", "hi again", localAffinity = true) + c ! ClusterClient.Send("/user/service2", "hi again", localAffinity = true) expectMsg(1 second, "ack") } }