From 5d524872c7e4ae9458b0aa58aa6b65a817841d4a Mon Sep 17 00:00:00 2001 From: Jonas Boner Date: Tue, 21 May 2013 09:05:40 +0200 Subject: [PATCH] Minor fixes after review. --- akka-contrib/docs/distributed-pub-sub.rst | 4 +-- .../pattern/DistributedPubSubMediator.scala | 33 ++++++++----------- .../DistributedPubSubMediatorSpec.scala | 2 +- 3 files changed, 16 insertions(+), 23 deletions(-) diff --git a/akka-contrib/docs/distributed-pub-sub.rst b/akka-contrib/docs/distributed-pub-sub.rst index 6394934a51..5621fc51ba 100644 --- a/akka-contrib/docs/distributed-pub-sub.rst +++ b/akka-contrib/docs/distributed-pub-sub.rst @@ -43,8 +43,8 @@ the same path, without address information, can be registered on different nodes On each node there can only be one such actor, since the path is unique within one local actor system. Typical usage of this mode is to broadcast messages to all replicas with the same path, e.g. 3 actors on different nodes that all perform the same actions, -for redundancy. You can also optionally specify a property (``skipSenderNode``) deciding -if the message should be sent to a matching path on the sender node or not. +for redundancy. You can also optionally specify a property (``allButSelf``) deciding +if the message should be sent to a matching path on the self node or not. **3. DistributedPubSubMediator.Publish** diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala index e2de1b22f0..b3b23a8e66 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala @@ -52,24 +52,17 @@ object DistributedPubSubMediator { */ def defaultProps(role: String): Props = props(Internal.roleOption(role)) - @SerialVersionUID(1L) - case class Put(ref: ActorRef) - @SerialVersionUID(1L) - case class Remove(path: String) - @SerialVersionUID(1L) - case class Subscribe(topic: String, ref: ActorRef) - @SerialVersionUID(1L) - case class Unsubscribe(topic: String, ref: ActorRef) - @SerialVersionUID(1L) - case class SubscribeAck(subscribe: Subscribe) - @SerialVersionUID(1L) - case class UnsubscribeAck(unsubscribe: Unsubscribe) - @SerialVersionUID(1L) - case class Send(path: String, msg: Any, localAffinity: Boolean) - @SerialVersionUID(1L) - case class SendToAll(path: String, msg: Any, skipSenderNode: Boolean = false) - @SerialVersionUID(1L) - case class Publish(topic: String, msg: Any) + @SerialVersionUID(1L) case class Put(ref: ActorRef) + @SerialVersionUID(1L) case class Remove(path: String) + @SerialVersionUID(1L) case class Subscribe(topic: String, ref: ActorRef) + @SerialVersionUID(1L) case class Unsubscribe(topic: String, ref: ActorRef) + @SerialVersionUID(1L) case class SubscribeAck(subscribe: Subscribe) + @SerialVersionUID(1L) case class UnsubscribeAck(unsubscribe: Unsubscribe) + @SerialVersionUID(1L) case class Publish(topic: String, msg: Any) + @SerialVersionUID(1L) case class Send(path: String, msg: Any, localAffinity: Boolean) + @SerialVersionUID(1L) case class SendToAll(path: String, msg: Any, allButSelf: Boolean = false) { + def this(path: String, msg: Any) = this(path, msg, allButSelf = false) + } // Only for testing purposes, to poll/await replication case object Count @@ -368,12 +361,12 @@ class DistributedPubSubMediator( sender ! count } - def publish(path: String, msg: Any, skipSenderNode: Boolean = false): Unit = { + def publish(path: String, msg: Any, allButSelf: Boolean = false): Unit = { for { (address, bucket) ← registry + if !(allButSelf && address == selfAddress) // if we should skip sender node and current address == self address => skip valueHolder ← bucket.content.get(path) ref ← valueHolder.ref - if !(skipSenderNode && address == selfAddress) // if we should skip sender node and current address == self address => skip } ref forward msg } diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala index 66fd6364c8..329a8a06f7 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala @@ -45,7 +45,7 @@ object DistributedPubSubMediatorSpec extends MultiNodeConfig { def receive = { case Whisper(path, msg) ⇒ mediator ! Send(path, msg, localAffinity = true) case Talk(path, msg) ⇒ mediator ! SendToAll(path, msg) - case TalkToOthers(path, msg) ⇒ mediator ! SendToAll(path, msg, skipSenderNode = true) + case TalkToOthers(path, msg) ⇒ mediator ! SendToAll(path, msg, allButSelf = true) case Shout(topic, msg) ⇒ mediator ! Publish(topic, msg) case msg ⇒ testActor ! msg }