From 33407e39de7f07915cfee6dc7b5aa281d3242d16 Mon Sep 17 00:00:00 2001 From: Jonas Boner Date: Mon, 20 May 2013 13:45:13 +0200 Subject: [PATCH] Added optional property (skipSenderNode) to PubSub.SendToAll if the message should be sent to a matching path on the sender node or not. Added optional property (skipSenderNode) to PubSub.SendToAll. Deciding if the SendToAll message should be sent to a matching path on the sender cluster node or not. + Test and Docs. --- akka-contrib/docs/distributed-pub-sub.rst | 3 +- .../pattern/DistributedPubSubMediator.scala | 11 ++++--- .../DistributedPubSubMediatorSpec.scala | 32 ++++++++++++++++--- 3 files changed, 36 insertions(+), 10 deletions(-) diff --git a/akka-contrib/docs/distributed-pub-sub.rst b/akka-contrib/docs/distributed-pub-sub.rst index 01aebd0aed..6394934a51 100644 --- a/akka-contrib/docs/distributed-pub-sub.rst +++ b/akka-contrib/docs/distributed-pub-sub.rst @@ -43,7 +43,8 @@ the same path, without address information, can be registered on different nodes On each node there can only be one such actor, since the path is unique within one local actor system. Typical usage of this mode is to broadcast messages to all replicas with the same path, e.g. 3 actors on different nodes that all perform the same actions, -for redundancy. +for redundancy. You can also optionally specify a property (``skipSenderNode``) deciding +if the message should be sent to a matching path on the sender node or not. **3. DistributedPubSubMediator.Publish** diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala index 8559e9e2a6..e2de1b22f0 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala @@ -67,7 +67,7 @@ object DistributedPubSubMediator { @SerialVersionUID(1L) case class Send(path: String, msg: Any, localAffinity: Boolean) @SerialVersionUID(1L) - case class SendToAll(path: String, msg: Any) + case class SendToAll(path: String, msg: Any, skipSenderNode: Boolean = false) @SerialVersionUID(1L) case class Publish(topic: String, msg: Any) @@ -267,8 +267,8 @@ class DistributedPubSubMediator( refs(ThreadLocalRandom.current.nextInt(refs.size)) forward msg } - case SendToAll(path, msg) ⇒ - publish(path, msg) + case SendToAll(path, msg, skipSenderNode) ⇒ + publish(path, msg, skipSenderNode) case Publish(topic, msg) ⇒ publish(mkKey(self.path / URLEncoder.encode(topic, "utf-8")), msg) @@ -368,11 +368,12 @@ class DistributedPubSubMediator( sender ! count } - def publish(path: String, msg: Any): Unit = { + def publish(path: String, msg: Any, skipSenderNode: Boolean = false): Unit = { for { - (_, bucket) ← registry + (address, bucket) ← registry valueHolder ← bucket.content.get(path) ref ← valueHolder.ref + if !(skipSenderNode && address == selfAddress) // if we should skip sender node and current address == self address => skip } ref forward msg } diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala index 18ab652d1c..66fd6364c8 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala @@ -34,6 +34,7 @@ object DistributedPubSubMediatorSpec extends MultiNodeConfig { object TestChatUser { case class Whisper(path: String, msg: Any) case class Talk(path: String, msg: Any) + case class TalkToOthers(path: String, msg: Any) case class Shout(topic: String, msg: Any) } @@ -42,10 +43,11 @@ object DistributedPubSubMediatorSpec extends MultiNodeConfig { import DistributedPubSubMediator._ def receive = { - case Whisper(path, msg) ⇒ mediator ! Send(path, msg, localAffinity = true) - case Talk(path, msg) ⇒ mediator ! SendToAll(path, msg) - case Shout(topic, msg) ⇒ mediator ! Publish(topic, msg) - case msg ⇒ testActor ! msg + case Whisper(path, msg) ⇒ mediator ! Send(path, msg, localAffinity = true) + case Talk(path, msg) ⇒ mediator ! SendToAll(path, msg) + case TalkToOthers(path, msg) ⇒ mediator ! SendToAll(path, msg, skipSenderNode = true) + case Shout(topic, msg) ⇒ mediator ! Publish(topic, msg) + case msg ⇒ testActor ! msg } } @@ -317,5 +319,27 @@ class DistributedPubSubMediatorSpec extends MultiNodeSpec(DistributedPubSubMedia enterBarrier("after-8") } + "send-all to all other nodes" in within(15 seconds) { + runOn(first, second, third) { // create the user on all nodes + val u11 = createChatUser("u11") + mediator ! Put(u11) + } + awaitCount(13) + enterBarrier("11-registered") + + runOn(third) { + chatUser("u5") ! TalkToOthers("/user/u11", "hi") // sendToAll to all other nodes + } + + runOn(first, second) { + expectMsg("hi") + lastSender.path.name must be("u11") + } + runOn(third) { + expectNoMsg(2.seconds) // sender node should not receive a message + } + + enterBarrier("after-11") + } } }