API adjustments to ClusterClient, see #3264

* Message wrappers in ClusterClient, so that DistributedPubSubMediator
  is not leaking to the client api
* Register methods in ClusterReceptionistExtension, for convenience and
  clarity
This commit is contained in:
Patrik Nordwall 2013-04-22 13:03:29 +02:00
parent a4b485968c
commit 7634964b96
3 changed files with 98 additions and 29 deletions

View file

@ -18,9 +18,27 @@ or as an ordinary actor.
You can send messages via the ``ClusterClient`` to any actor in the cluster that is registered
in the ``DistributedPubSubMediator`` used by the ``ClusterReceptionist``.
Messages are wrapped in ``DistributedPubSubMediator.Send``, ``DistributedPubSubMediator.SendToAll``
or ``DistributedPubSubMediator.Publish`` with the semantics described in
:ref:`distributed-pub-sub`.
The ``ClusterReceptionistExtension`` provides methods for registration of actors that
should be reachable from the client. Messages are wrapped in ``ClusterClient.Send``,
``ClusterClient.SendToAll`` or ``ClusterClient.Publish``.
**1. ClusterClient.Send**
The message will be delivered to one recipient with a matching path, if any such
exists. If several entries match the path the message will be delivered
to one random destination. The sender of the message can specify that local
affinity is preferred, i.e. the message is sent to an actor in the same local actor
system as the used receptionist actor, if any such exists, otherwise random to any other
matching entry.
**2. ClusterClient.SendToAll**
The message will be delivered to all recipients with a matching path.
**3. ClusterClient.Publish**
The message will be delivered to all recipients Actors that have been registered as subscribers
to the named topic.
Response messages from the destination actor are tunneled via the receptionist
to avoid inbound connections from other cluster nodes to the client, i.e.
@ -55,7 +73,7 @@ ClusterReceptionistExtension
In the example above the receptionist is started and accessed with the ``akka.contrib.pattern.ClusterReceptionistExtension``.
That is convenient and perfectly fine in most cases, but it can be good to know that it is possible to
start the ````akka.contrib.pattern.ClusterReceptionist`` actor as an ordinary actor and you can have several
start the ``akka.contrib.pattern.ClusterReceptionist`` actor as an ordinary actor and you can have several
different receptionists at the same time, serving different types of clients.
The ``ClusterReceptionistExtension`` can be configured with the following properties:

View file

@ -59,6 +59,13 @@ object ClusterClient {
props(initialContacts.asScala.toSet)
}
@SerialVersionUID(1L)
case class Send(path: String, msg: Any, localAffinity: Boolean)
@SerialVersionUID(1L)
case class SendToAll(path: String, msg: Any)
@SerialVersionUID(1L)
case class Publish(topic: String, msg: Any)
/**
* INTERNAL API
*/
@ -79,10 +86,24 @@ object ClusterClient {
* contacts, i.e. not necessarily the initial contact points.
*
* You can send messages via the `ClusterClient` to any actor in the cluster
* that is registered in the [[DistributedPubSubMediator]] used by the [[ClusterReceptionist]].
* Messages are wrapped in [[DistributedPubSubMediator.Send]], [[DistributedPubSubMediator.SendToAll]]
* or [[DistributedPubSubMediator.Publish]] with the semantics described in
* [[DistributedPubSubMediator]].
* that is registered in the [[ClusterReceptionist]].
* Messages are wrapped in [[ClusterClient.Send]], [[ClusterClient.SendToAll]]
* or [[ClusterClient.Publish]].
*
* 1. [[ClusterClient.Send]] -
* The message will be delivered to one recipient with a matching path, if any such
* exists. If several entries match the path the message will be delivered
* to one random destination. The sender of the message can specify that local
* affinity is preferred, i.e. the message is sent to an actor in the same local actor
* system as the used receptionist actor, if any such exists, otherwise random to any other
* matching entry.
*
* 2. [[ClusterClient.SendToAll]] -
* The message will be delivered to all recipients with a matching path.
*
* 3. [[ClusterClient.Publish]] -
* The message will be delivered to all recipients Actors that have been registered as subscribers to
* to the named topic.
*
* Use the factory method [[ClusterClient#props]]) to create the
* [[akka.actor.Props]] for the actor.
@ -96,7 +117,6 @@ class ClusterClient(
import ClusterClient._
import ClusterClient.Internal._
import ClusterReceptionist.Internal._
import DistributedPubSubMediator.{ Send, SendToAll, Publish }
var contacts: immutable.IndexedSeq[ActorSelection] = initialContacts.toVector
sendGetContacts()
@ -141,8 +161,12 @@ class ClusterClient(
var pongTimestamp = System.nanoTime
{
case msg @ (_: Send | _: SendToAll | _: Publish)
receptionist forward msg
case Send(path, msg, localAffinity)
receptionist forward DistributedPubSubMediator.Send(path, msg, localAffinity)
case SendToAll(path, msg)
receptionist forward DistributedPubSubMediator.SendToAll(path, msg)
case Publish(topic, msg)
receptionist forward DistributedPubSubMediator.Publish(topic, msg)
case PingTick
if (System.nanoTime - pongTimestamp > 3 * pingInterval.toNanos)
becomeEstablishing()
@ -200,12 +224,43 @@ class ClusterReceptionistExtension(system: ExtendedActorSystem) extends Extensio
/**
* Register the actors that should be reachable for the clients in this [[DistributedPubSubMediator]].
*/
def pubSubMediator: ActorRef = DistributedPubSubExtension(system).mediator
private def pubSubMediator: ActorRef = DistributedPubSubExtension(system).mediator
/**
* Register an actor that should be reachable for the clients.
* The clients can send messages to this actor with `Send` or `SendToAll` using
* the path elements of the `ActorRef`, e.g. `"/user/myservice"`.
*/
def registerService(actor: ActorRef): Unit =
pubSubMediator ! DistributedPubSubMediator.Put(actor)
/**
* A registered actor will be automatically unregistered when terminated,
* but it can also be explicitly unregistered before termination.
*/
def unregisterService(actor: ActorRef): Unit =
pubSubMediator ! DistributedPubSubMediator.Remove(actor.path.elements.mkString("/", "/", ""))
/**
* Register an actor that should be reachable for the clients to a named topic.
* Several actors can be registered to the same topic name, and all will receive
* published messages.
* The client can publish messages to this topic with `Publish`.
*/
def registerSubscriber(topic: String, actor: ActorRef): Unit =
pubSubMediator ! DistributedPubSubMediator.Subscribe(topic, actor)
/**
* A registered subscriber will be automatically unregistered when terminated,
* but it can also be explicitly unregistered before termination.
*/
def unregisterSubscriber(topic: String, actor: ActorRef): Unit =
pubSubMediator ! DistributedPubSubMediator.Unsubscribe(topic, actor)
/**
* The [[ClusterReceptionist]] actor
*/
val receptionist: ActorRef = {
private val receptionist: ActorRef = {
if (isTerminated)
system.deadLetters
else {

View file

@ -57,7 +57,6 @@ class ClusterClientMultiJvmNode5 extends ClusterClientSpec
class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNodeSpec with ImplicitSender {
import ClusterClientSpec._
import DistributedPubSubMediator._
override def initialParticipants = roles.size
@ -69,14 +68,13 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
enterBarrier(from.name + "-joined")
}
def createReceptionist(): ActorRef = ClusterReceptionistExtension(system).receptionist
def createReceptionist(): Unit = ClusterReceptionistExtension(system)
def mediator: ActorRef = ClusterReceptionistExtension(system).pubSubMediator
def receptionist: ActorRef = ClusterReceptionistExtension(system).receptionist
def receptionist: ClusterReceptionistExtension = ClusterReceptionistExtension(system)
def awaitCount(expected: Int): Unit = {
awaitAssert {
mediator ! Count
DistributedPubSubExtension(system).mediator ! DistributedPubSubMediator.Count
expectMsgType[Int] must be(expected)
}
}
@ -96,7 +94,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
join(fourth, first)
runOn(fourth) {
val service = system.actorOf(Props(classOf[TestService], testActor), "testService")
mediator ! Put(service)
receptionist.registerService(service)
}
runOn(first, second, third, fourth) {
awaitCount(1)
@ -110,7 +108,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
val c = system.actorOf(ClusterClient.props(initialContacts))
awaitAssert {
c ! Send("/user/testService", "hello", localAffinity = true)
c ! ClusterClient.Send("/user/testService", "hello", localAffinity = true)
expectMsg(1 second, "ack")
}
}
@ -128,23 +126,21 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
//#server
runOn(host1) {
val mediator = ClusterReceptionistExtension(system).pubSubMediator
val serviceA = system.actorOf(Props[Service], "serviceA")
mediator ! DistributedPubSubMediator.Put(serviceA)
receptionist.registerService(serviceA)
}
runOn(host2, host3) {
val mediator = ClusterReceptionistExtension(system).pubSubMediator
val serviceB = system.actorOf(Props[Service], "serviceB")
mediator ! DistributedPubSubMediator.Put(serviceB)
receptionist.registerService(serviceB)
}
//#server
//#client
runOn(client) {
val c = system.actorOf(ClusterClient.props(initialContacts))
c ! DistributedPubSubMediator.Send("/user/serviceA", "hello", localAffinity = true)
c ! DistributedPubSubMediator.SendToAll("/user/serviceB", "hi")
c ! ClusterClient.Send("/user/serviceA", "hello", localAffinity = true)
c ! ClusterClient.SendToAll("/user/serviceB", "hi")
}
//#client
@ -164,7 +160,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
"re-establish connection to receptionist when connection is lost" in within(30 seconds) {
runOn(first, second, third, fourth) {
val service2 = system.actorOf(Props(classOf[TestService], testActor), "service2")
mediator ! Put(service2)
receptionist.registerService(service2)
awaitCount(8)
}
enterBarrier("service2-replicated")
@ -173,7 +169,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
val c = system.actorOf(ClusterClient.props(initialContacts))
awaitAssert {
c ! Send("/user/service2", "bonjour", localAffinity = true)
c ! ClusterClient.Send("/user/service2", "bonjour", localAffinity = true)
expectMsg(1 second, "ack")
}
val lastSenderAddress = lastSender.path.address
@ -183,7 +179,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
}
testConductor.shutdown(receptionistRoleName, 0).await
awaitAssert {
c ! Send("/user/service2", "hi again", localAffinity = true)
c ! ClusterClient.Send("/user/service2", "hi again", localAffinity = true)
expectMsg(1 second, "ack")
}
}