diff --git a/akka-contrib/docs/distributed-pub-sub.rst b/akka-contrib/docs/distributed-pub-sub.rst index 01aebd0aed..6394934a51 100644 --- a/akka-contrib/docs/distributed-pub-sub.rst +++ b/akka-contrib/docs/distributed-pub-sub.rst @@ -43,7 +43,8 @@ the same path, without address information, can be registered on different nodes On each node there can only be one such actor, since the path is unique within one local actor system. Typical usage of this mode is to broadcast messages to all replicas with the same path, e.g. 3 actors on different nodes that all perform the same actions, -for redundancy. +for redundancy. You can also optionally specify a property (``skipSenderNode``) deciding +if the message should be sent to a matching path on the sender node or not. **3. DistributedPubSubMediator.Publish** diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala index 8559e9e2a6..e2de1b22f0 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala @@ -67,7 +67,7 @@ object DistributedPubSubMediator { @SerialVersionUID(1L) case class Send(path: String, msg: Any, localAffinity: Boolean) @SerialVersionUID(1L) - case class SendToAll(path: String, msg: Any) + case class SendToAll(path: String, msg: Any, skipSenderNode: Boolean = false) @SerialVersionUID(1L) case class Publish(topic: String, msg: Any) @@ -267,8 +267,8 @@ class DistributedPubSubMediator( refs(ThreadLocalRandom.current.nextInt(refs.size)) forward msg } - case SendToAll(path, msg) ⇒ - publish(path, msg) + case SendToAll(path, msg, skipSenderNode) ⇒ + publish(path, msg, skipSenderNode) case Publish(topic, msg) ⇒ publish(mkKey(self.path / URLEncoder.encode(topic, "utf-8")), msg) @@ -368,11 +368,12 @@ class DistributedPubSubMediator( sender ! count } - def publish(path: String, msg: Any): Unit = { + def publish(path: String, msg: Any, skipSenderNode: Boolean = false): Unit = { for { - (_, bucket) ← registry + (address, bucket) ← registry valueHolder ← bucket.content.get(path) ref ← valueHolder.ref + if !(skipSenderNode && address == selfAddress) // if we should skip sender node and current address == self address => skip } ref forward msg } diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala index 18ab652d1c..66fd6364c8 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala @@ -34,6 +34,7 @@ object DistributedPubSubMediatorSpec extends MultiNodeConfig { object TestChatUser { case class Whisper(path: String, msg: Any) case class Talk(path: String, msg: Any) + case class TalkToOthers(path: String, msg: Any) case class Shout(topic: String, msg: Any) } @@ -42,10 +43,11 @@ object DistributedPubSubMediatorSpec extends MultiNodeConfig { import DistributedPubSubMediator._ def receive = { - case Whisper(path, msg) ⇒ mediator ! Send(path, msg, localAffinity = true) - case Talk(path, msg) ⇒ mediator ! SendToAll(path, msg) - case Shout(topic, msg) ⇒ mediator ! Publish(topic, msg) - case msg ⇒ testActor ! msg + case Whisper(path, msg) ⇒ mediator ! Send(path, msg, localAffinity = true) + case Talk(path, msg) ⇒ mediator ! SendToAll(path, msg) + case TalkToOthers(path, msg) ⇒ mediator ! SendToAll(path, msg, skipSenderNode = true) + case Shout(topic, msg) ⇒ mediator ! Publish(topic, msg) + case msg ⇒ testActor ! msg } } @@ -317,5 +319,27 @@ class DistributedPubSubMediatorSpec extends MultiNodeSpec(DistributedPubSubMedia enterBarrier("after-8") } + "send-all to all other nodes" in within(15 seconds) { + runOn(first, second, third) { // create the user on all nodes + val u11 = createChatUser("u11") + mediator ! Put(u11) + } + awaitCount(13) + enterBarrier("11-registered") + + runOn(third) { + chatUser("u5") ! TalkToOthers("/user/u11", "hi") // sendToAll to all other nodes + } + + runOn(first, second) { + expectMsg("hi") + lastSender.path.name must be("u11") + } + runOn(third) { + expectNoMsg(2.seconds) // sender node should not receive a message + } + + enterBarrier("after-11") + } } }