Added optional property (skipSenderNode) to PubSub.SendToAll

if the message should be sent to a matching path on the sender node or not.

Added optional property (skipSenderNode) to PubSub.SendToAll.
Deciding if the SendToAll message should be sent to a matching path on the sender cluster node or not. + Test and Docs.
This commit is contained in:
Jonas Boner 2013-05-20 13:45:13 +02:00
parent 336fb1b180
commit 33407e39de
3 changed files with 36 additions and 10 deletions

View file

@ -43,7 +43,8 @@ the same path, without address information, can be registered on different nodes
On each node there can only be one such actor, since the path is unique within one
local actor system. Typical usage of this mode is to broadcast messages to all replicas
with the same path, e.g. 3 actors on different nodes that all perform the same actions,
for redundancy.
for redundancy. You can also optionally specify a property (``skipSenderNode``) deciding
if the message should be sent to a matching path on the sender node or not.
**3. DistributedPubSubMediator.Publish**

View file

@ -67,7 +67,7 @@ object DistributedPubSubMediator {
@SerialVersionUID(1L)
case class Send(path: String, msg: Any, localAffinity: Boolean)
@SerialVersionUID(1L)
case class SendToAll(path: String, msg: Any)
case class SendToAll(path: String, msg: Any, skipSenderNode: Boolean = false)
@SerialVersionUID(1L)
case class Publish(topic: String, msg: Any)
@ -267,8 +267,8 @@ class DistributedPubSubMediator(
refs(ThreadLocalRandom.current.nextInt(refs.size)) forward msg
}
case SendToAll(path, msg)
publish(path, msg)
case SendToAll(path, msg, skipSenderNode)
publish(path, msg, skipSenderNode)
case Publish(topic, msg)
publish(mkKey(self.path / URLEncoder.encode(topic, "utf-8")), msg)
@ -368,11 +368,12 @@ class DistributedPubSubMediator(
sender ! count
}
def publish(path: String, msg: Any): Unit = {
def publish(path: String, msg: Any, skipSenderNode: Boolean = false): Unit = {
for {
(_, bucket) registry
(address, bucket) registry
valueHolder bucket.content.get(path)
ref valueHolder.ref
if !(skipSenderNode && address == selfAddress) // if we should skip sender node and current address == self address => skip
} ref forward msg
}

View file

@ -34,6 +34,7 @@ object DistributedPubSubMediatorSpec extends MultiNodeConfig {
object TestChatUser {
case class Whisper(path: String, msg: Any)
case class Talk(path: String, msg: Any)
case class TalkToOthers(path: String, msg: Any)
case class Shout(topic: String, msg: Any)
}
@ -42,10 +43,11 @@ object DistributedPubSubMediatorSpec extends MultiNodeConfig {
import DistributedPubSubMediator._
def receive = {
case Whisper(path, msg) mediator ! Send(path, msg, localAffinity = true)
case Talk(path, msg) mediator ! SendToAll(path, msg)
case Shout(topic, msg) mediator ! Publish(topic, msg)
case msg testActor ! msg
case Whisper(path, msg) mediator ! Send(path, msg, localAffinity = true)
case Talk(path, msg) mediator ! SendToAll(path, msg)
case TalkToOthers(path, msg) mediator ! SendToAll(path, msg, skipSenderNode = true)
case Shout(topic, msg) mediator ! Publish(topic, msg)
case msg testActor ! msg
}
}
@ -317,5 +319,27 @@ class DistributedPubSubMediatorSpec extends MultiNodeSpec(DistributedPubSubMedia
enterBarrier("after-8")
}
"send-all to all other nodes" in within(15 seconds) {
runOn(first, second, third) { // create the user on all nodes
val u11 = createChatUser("u11")
mediator ! Put(u11)
}
awaitCount(13)
enterBarrier("11-registered")
runOn(third) {
chatUser("u5") ! TalkToOthers("/user/u11", "hi") // sendToAll to all other nodes
}
runOn(first, second) {
expectMsg("hi")
lastSender.path.name must be("u11")
}
runOn(third) {
expectNoMsg(2.seconds) // sender node should not receive a message
}
enterBarrier("after-11")
}
}
}