Minor fixes after review.

This commit is contained in:
Jonas Boner 2013-05-21 09:05:40 +02:00
parent 33407e39de
commit 5d524872c7
3 changed files with 16 additions and 23 deletions

View file

@ -43,8 +43,8 @@ the same path, without address information, can be registered on different nodes
On each node there can only be one such actor, since the path is unique within one On each node there can only be one such actor, since the path is unique within one
local actor system. Typical usage of this mode is to broadcast messages to all replicas local actor system. Typical usage of this mode is to broadcast messages to all replicas
with the same path, e.g. 3 actors on different nodes that all perform the same actions, with the same path, e.g. 3 actors on different nodes that all perform the same actions,
for redundancy. You can also optionally specify a property (``skipSenderNode``) deciding for redundancy. You can also optionally specify a property (``allButSelf``) deciding
if the message should be sent to a matching path on the sender node or not. if the message should be sent to a matching path on the self node or not.
**3. DistributedPubSubMediator.Publish** **3. DistributedPubSubMediator.Publish**

View file

@ -52,24 +52,17 @@ object DistributedPubSubMediator {
*/ */
def defaultProps(role: String): Props = props(Internal.roleOption(role)) def defaultProps(role: String): Props = props(Internal.roleOption(role))
@SerialVersionUID(1L) @SerialVersionUID(1L) case class Put(ref: ActorRef)
case class Put(ref: ActorRef) @SerialVersionUID(1L) case class Remove(path: String)
@SerialVersionUID(1L) @SerialVersionUID(1L) case class Subscribe(topic: String, ref: ActorRef)
case class Remove(path: String) @SerialVersionUID(1L) case class Unsubscribe(topic: String, ref: ActorRef)
@SerialVersionUID(1L) @SerialVersionUID(1L) case class SubscribeAck(subscribe: Subscribe)
case class Subscribe(topic: String, ref: ActorRef) @SerialVersionUID(1L) case class UnsubscribeAck(unsubscribe: Unsubscribe)
@SerialVersionUID(1L) @SerialVersionUID(1L) case class Publish(topic: String, msg: Any)
case class Unsubscribe(topic: String, ref: ActorRef) @SerialVersionUID(1L) case class Send(path: String, msg: Any, localAffinity: Boolean)
@SerialVersionUID(1L) @SerialVersionUID(1L) case class SendToAll(path: String, msg: Any, allButSelf: Boolean = false) {
case class SubscribeAck(subscribe: Subscribe) def this(path: String, msg: Any) = this(path, msg, allButSelf = false)
@SerialVersionUID(1L) }
case class UnsubscribeAck(unsubscribe: Unsubscribe)
@SerialVersionUID(1L)
case class Send(path: String, msg: Any, localAffinity: Boolean)
@SerialVersionUID(1L)
case class SendToAll(path: String, msg: Any, skipSenderNode: Boolean = false)
@SerialVersionUID(1L)
case class Publish(topic: String, msg: Any)
// Only for testing purposes, to poll/await replication // Only for testing purposes, to poll/await replication
case object Count case object Count
@ -368,12 +361,12 @@ class DistributedPubSubMediator(
sender ! count sender ! count
} }
def publish(path: String, msg: Any, skipSenderNode: Boolean = false): Unit = { def publish(path: String, msg: Any, allButSelf: Boolean = false): Unit = {
for { for {
(address, bucket) registry (address, bucket) registry
if !(allButSelf && address == selfAddress) // if we should skip sender node and current address == self address => skip
valueHolder bucket.content.get(path) valueHolder bucket.content.get(path)
ref valueHolder.ref ref valueHolder.ref
if !(skipSenderNode && address == selfAddress) // if we should skip sender node and current address == self address => skip
} ref forward msg } ref forward msg
} }

View file

@ -45,7 +45,7 @@ object DistributedPubSubMediatorSpec extends MultiNodeConfig {
def receive = { def receive = {
case Whisper(path, msg) mediator ! Send(path, msg, localAffinity = true) case Whisper(path, msg) mediator ! Send(path, msg, localAffinity = true)
case Talk(path, msg) mediator ! SendToAll(path, msg) case Talk(path, msg) mediator ! SendToAll(path, msg)
case TalkToOthers(path, msg) mediator ! SendToAll(path, msg, skipSenderNode = true) case TalkToOthers(path, msg) mediator ! SendToAll(path, msg, allButSelf = true)
case Shout(topic, msg) mediator ! Publish(topic, msg) case Shout(topic, msg) mediator ! Publish(topic, msg)
case msg testActor ! msg case msg testActor ! msg
} }