diff --git a/akka-contrib/src/main/resources/reference.conf b/akka-contrib/src/main/resources/reference.conf index f95819385b..4c82a8ad94 100644 --- a/akka-contrib/src/main/resources/reference.conf +++ b/akka-contrib/src/main/resources/reference.conf @@ -14,9 +14,9 @@ akka.contrib.cluster.pub-sub { # Start the mediator on members tagged with this role. # All members are used if undefined or empty. role = "" - + # The routing logic to use for 'Send' - # Possible values: random, round-robin, consistent-hashing, broadcast + # Possible values: random, round-robin, broadcast routing-logic = random # How often the DistributedPubSubMediator should send out gossip information @@ -24,11 +24,11 @@ akka.contrib.cluster.pub-sub { # Removed entries are pruned after this duration removed-time-to-live = 120s - + # Maximum number of elements to transfer in one message when synchronizing the registries. - # Next chunk will be transferred in next round of gossip. + # Next chunk will be transferred in next round of gossip. max-delta-elements = 3000 - + } # //#pub-sub-ext-config @@ -75,7 +75,7 @@ akka.contrib.cluster.client { # //#sharding-ext-config # Settings for the ClusterShardingExtension akka.contrib.cluster.sharding { - # The extension creates a top level actor with this name in top level user scope, + # The extension creates a top level actor with this name in top level user scope, # e.g. '/user/sharding' guardian-name = sharding # If the coordinator can't store state changes it will be stopped @@ -84,9 +84,9 @@ akka.contrib.cluster.sharding { # Start the coordinator singleton manager on members tagged with this role. # All members are used if undefined or empty. # ShardRegion actor is started in proxy only mode on nodes that are not tagged - # with this role. + # with this role. role = "" - # The ShardRegion retries registration and shard location requests to the + # The ShardRegion retries registration and shard location requests to the # ShardCoordinator with this interval if it does not reply. retry-interval = 2 s # Maximum number of messages that are buffered by a ShardRegion actor. diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala index 27b161cce8..5db3783d79 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala @@ -29,6 +29,7 @@ import akka.routing.RoutingLogic import akka.routing.Routee import akka.routing.ActorRefRoutee import akka.routing.Router +import akka.routing.RouterEnvelope import akka.routing.RoundRobinRoutingLogic import akka.routing.ConsistentHashingRoutingLogic import akka.routing.BroadcastRoutingLogic @@ -143,6 +144,11 @@ object DistributedPubSubMediator { @SerialVersionUID(1L) final case class SendToOneSubscriber(msg: Any) + @SerialVersionUID(1L) + final case class MediatorRouterEnvelope(msg: Any) extends RouterEnvelope { + override def message = msg + } + def roleOption(role: String): Option[String] = role match { case null | "" ⇒ None case _ ⇒ Some(role) @@ -224,9 +230,23 @@ object DistributedPubSubMediator { def business = { case SendToOneSubscriber(msg) ⇒ if (subscribers.nonEmpty) - Router(routingLogic, (subscribers map ActorRefRoutee).toVector).route(msg, sender()) + Router(routingLogic, (subscribers map ActorRefRoutee).toVector).route(wrapIfNeeded(msg), sender()) } } + + /** + * Mediator uses [[Router]] to send messages to multiple destinations, Router in general + * unwraps messages from [[RouterEnvelope]] and sends the contents to [[Routee]]s. + * + * Using mediator services should not have an undesired effect of unwrapping messages + * out of [[RouterEnvelope]]. For this reason user messages are wrapped in + * [[MediatorRouterEnvelope]] which will be unwrapped by the [[Router]] leaving original + * user message. + */ + def wrapIfNeeded: Any ⇒ Any = { + case msg: RouterEnvelope ⇒ MediatorRouterEnvelope(msg) + case msg: Any ⇒ msg + } } } @@ -313,6 +333,9 @@ class DistributedPubSubMediator( import DistributedPubSubMediator._ import DistributedPubSubMediator.Internal._ + require(!routingLogic.isInstanceOf[ConsistentHashingRoutingLogic], + "'consistent-hashing' routing logic can't be used by the pub-sub mediator") + val cluster = Cluster(context.system) import cluster.selfAddress @@ -358,20 +381,22 @@ class DistributedPubSubMediator( def receive = { case Send(path, msg, localAffinity) ⇒ - registry(selfAddress).content.get(path) match { - case Some(ValueHolder(_, Some(ref))) if localAffinity ⇒ - ref forward msg + val routees = registry(selfAddress).content.get(path) match { + case Some(valueHolder) if localAffinity ⇒ + (for { + routee ← valueHolder.routee + } yield routee).toVector case _ ⇒ - val routees = (for { + (for { (_, bucket) ← registry valueHolder ← bucket.content.get(path) routee ← valueHolder.routee } yield routee).toVector - - if (routees.nonEmpty) - Router(routingLogic, routees).route(msg, sender()) } + if (routees.nonEmpty) + Router(routingLogic, routees).route(wrapIfNeeded(msg), sender()) + case SendToAll(path, msg, skipSenderNode) ⇒ publish(path, msg, skipSenderNode) @@ -622,7 +647,7 @@ class DistributedPubSubExtension(system: ExtendedActorSystem) extends Extension val routingLogic = config.getString("routing-logic") match { case "random" ⇒ RandomRoutingLogic() case "round-robin" ⇒ RoundRobinRoutingLogic() - case "consistent-hashing" ⇒ ConsistentHashingRoutingLogic(system) + case "consistent-hashing" ⇒ throw new IllegalArgumentException(s"'consistent-hashing' routing logic can't be used by the pub-sub mediator") case "broadcast" ⇒ BroadcastRoutingLogic() case other ⇒ throw new IllegalArgumentException(s"Unknown 'routing-logic': [$other]") } diff --git a/akka-contrib/src/test/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala b/akka-contrib/src/test/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala new file mode 100644 index 0000000000..7cef15e7ed --- /dev/null +++ b/akka-contrib/src/test/scala/akka/contrib/pattern/DistributedPubSubMediatorSpec.scala @@ -0,0 +1,120 @@ +package akka.contrib.pattern + +import akka.testkit._ +import akka.routing.{ ConsistentHashingRoutingLogic, RouterEnvelope } +import org.scalatest.WordSpecLike +import akka.actor.{ ActorInitializationException, ActorRef } + +case class WrappedMessage(msg: String) extends RouterEnvelope { + override def message = msg +} + +case class UnwrappedMessage(msg: String) + +object DistributedPubSubMediatorSpec { + def config(routingLogic: String) = s""" + akka.loglevel = INFO + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.remote.log-remote-lifecycle-events = off + akka.contrib.cluster.pub-sub.routing-logic = $routingLogic + """ +} + +trait DistributedPubSubMediatorSpec { this: WordSpecLike with TestKit with ImplicitSender ⇒ + def nonUnwrappingPubSub(mediator: ActorRef, testActor: ActorRef, msg: Any) { + + val path = testActor.path.toStringWithoutAddress + + "keep the RouterEnvelope when sending to a local logical path" in { + + mediator ! DistributedPubSubMediator.Put(testActor) + + mediator ! DistributedPubSubMediator.Send(path, msg, localAffinity = true) + expectMsg(msg) + + mediator ! DistributedPubSubMediator.Remove(path) + } + + "keep the RouterEnvelope when sending to a logical path" in { + + mediator ! DistributedPubSubMediator.Put(testActor) + + mediator ! DistributedPubSubMediator.Send(path, msg, localAffinity = false) + expectMsg(msg) + + mediator ! DistributedPubSubMediator.Remove(path) + } + + "keep the RouterEnvelope when sending to all actors on a logical path" in { + + mediator ! DistributedPubSubMediator.Put(testActor) + + mediator ! DistributedPubSubMediator.SendToAll(path, msg) + expectMsg(msg) // SendToAll does not use provided RoutingLogic + + mediator ! DistributedPubSubMediator.Remove(path) + } + + "keep the RouterEnvelope when sending to a topic" in { + + mediator ! DistributedPubSubMediator.Subscribe("topic", testActor) + expectMsgClass(classOf[DistributedPubSubMediator.SubscribeAck]) + + mediator ! DistributedPubSubMediator.Publish("topic", msg) + expectMsg(msg) // Publish(... sendOneMessageToEachGroup = false) does not use provided RoutingLogic + + mediator ! DistributedPubSubMediator.Unsubscribe("topic", testActor) + expectMsgClass(classOf[DistributedPubSubMediator.UnsubscribeAck]) + } + + "keep the RouterEnvelope when sending to a topic for a group" in { + + mediator ! DistributedPubSubMediator.Subscribe("topic", Some("group"), testActor) + expectMsgClass(classOf[DistributedPubSubMediator.SubscribeAck]) + + mediator ! DistributedPubSubMediator.Publish("topic", msg, sendOneMessageToEachGroup = true) + expectMsg(msg) + + mediator ! DistributedPubSubMediator.Unsubscribe("topic", testActor) + expectMsgClass(classOf[DistributedPubSubMediator.UnsubscribeAck]) + } + } +} + +class DistributedPubSubMediatorWithRandomRouterSpec + extends AkkaSpec(DistributedPubSubMediatorSpec.config("random")) + with DistributedPubSubMediatorSpec with DefaultTimeout with ImplicitSender { + + val mediator = DistributedPubSubExtension(system).mediator + + "DistributedPubSubMediator when sending wrapped message" must { + val msg = WrappedMessage("hello") + behave like nonUnwrappingPubSub(mediator, testActor, msg) + } + + "DistributedPubSubMediator when sending unwrapped message" must { + val msg = UnwrappedMessage("hello") + behave like nonUnwrappingPubSub(mediator, testActor, msg) + } +} + +class DistributedPubSubMediatorWithHashRouterSpec + extends AkkaSpec(DistributedPubSubMediatorSpec.config("consistent-hashing")) + with DistributedPubSubMediatorSpec with DefaultTimeout with ImplicitSender { + + "DistributedPubSubMediator with Consistent Hash router" must { + "not be allowed" when { + "constructed by extension" in { + intercept[IllegalArgumentException] { + DistributedPubSubExtension(system).mediator + } + } + "constructed by props" in { + EventFilter[ActorInitializationException](occurrences = 1) intercept { + system.actorOf( + DistributedPubSubMediator.props(None, routingLogic = ConsistentHashingRoutingLogic(system))) + } + } + } + } +}