=con #15285 wrap message in RouterEnvelope before routing

PubSubMediator uses router which always unwraps RouterEnvelope messages.
However unwrapping is undesirable if user sends message in
ConsistentHashableEnvelope. Thus PubSubMediator should always wrap user
messages in RouterEnvelope which will be unwrapped by the router, leaving
user message unchanged.

Also disallow consistent hashing routing logic in pub-sub mediator.
This commit is contained in:
Martynas Mickevicius 2014-05-27 16:20:31 +02:00
parent 5f3d6029b1
commit 04d5cef3d9
3 changed files with 162 additions and 17 deletions

View file

@ -14,9 +14,9 @@ akka.contrib.cluster.pub-sub {
# Start the mediator on members tagged with this role.
# All members are used if undefined or empty.
role = ""
# The routing logic to use for 'Send'
# Possible values: random, round-robin, consistent-hashing, broadcast
# Possible values: random, round-robin, broadcast
routing-logic = random
# How often the DistributedPubSubMediator should send out gossip information
@ -24,11 +24,11 @@ akka.contrib.cluster.pub-sub {
# Removed entries are pruned after this duration
removed-time-to-live = 120s
# Maximum number of elements to transfer in one message when synchronizing the registries.
# Next chunk will be transferred in next round of gossip.
# Next chunk will be transferred in next round of gossip.
max-delta-elements = 3000
}
# //#pub-sub-ext-config
@ -75,7 +75,7 @@ akka.contrib.cluster.client {
# //#sharding-ext-config
# Settings for the ClusterShardingExtension
akka.contrib.cluster.sharding {
# The extension creates a top level actor with this name in top level user scope,
# The extension creates a top level actor with this name in top level user scope,
# e.g. '/user/sharding'
guardian-name = sharding
# If the coordinator can't store state changes it will be stopped
@ -84,9 +84,9 @@ akka.contrib.cluster.sharding {
# Start the coordinator singleton manager on members tagged with this role.
# All members are used if undefined or empty.
# ShardRegion actor is started in proxy only mode on nodes that are not tagged
# with this role.
# with this role.
role = ""
# The ShardRegion retries registration and shard location requests to the
# The ShardRegion retries registration and shard location requests to the
# ShardCoordinator with this interval if it does not reply.
retry-interval = 2 s
# Maximum number of messages that are buffered by a ShardRegion actor.

View file

@ -29,6 +29,7 @@ import akka.routing.RoutingLogic
import akka.routing.Routee
import akka.routing.ActorRefRoutee
import akka.routing.Router
import akka.routing.RouterEnvelope
import akka.routing.RoundRobinRoutingLogic
import akka.routing.ConsistentHashingRoutingLogic
import akka.routing.BroadcastRoutingLogic
@ -143,6 +144,11 @@ object DistributedPubSubMediator {
@SerialVersionUID(1L)
final case class SendToOneSubscriber(msg: Any)
@SerialVersionUID(1L)
final case class MediatorRouterEnvelope(msg: Any) extends RouterEnvelope {
override def message = msg
}
def roleOption(role: String): Option[String] = role match {
case null | "" None
case _ Some(role)
@ -224,9 +230,23 @@ object DistributedPubSubMediator {
def business = {
case SendToOneSubscriber(msg)
if (subscribers.nonEmpty)
Router(routingLogic, (subscribers map ActorRefRoutee).toVector).route(msg, sender())
Router(routingLogic, (subscribers map ActorRefRoutee).toVector).route(wrapIfNeeded(msg), sender())
}
}
/**
* Mediator uses [[Router]] to send messages to multiple destinations, Router in general
* unwraps messages from [[RouterEnvelope]] and sends the contents to [[Routee]]s.
*
* Using mediator services should not have an undesired effect of unwrapping messages
* out of [[RouterEnvelope]]. For this reason user messages are wrapped in
* [[MediatorRouterEnvelope]] which will be unwrapped by the [[Router]] leaving original
* user message.
*/
def wrapIfNeeded: Any Any = {
case msg: RouterEnvelope MediatorRouterEnvelope(msg)
case msg: Any msg
}
}
}
@ -313,6 +333,9 @@ class DistributedPubSubMediator(
import DistributedPubSubMediator._
import DistributedPubSubMediator.Internal._
require(!routingLogic.isInstanceOf[ConsistentHashingRoutingLogic],
"'consistent-hashing' routing logic can't be used by the pub-sub mediator")
val cluster = Cluster(context.system)
import cluster.selfAddress
@ -358,20 +381,22 @@ class DistributedPubSubMediator(
def receive = {
case Send(path, msg, localAffinity)
registry(selfAddress).content.get(path) match {
case Some(ValueHolder(_, Some(ref))) if localAffinity
ref forward msg
val routees = registry(selfAddress).content.get(path) match {
case Some(valueHolder) if localAffinity
(for {
routee valueHolder.routee
} yield routee).toVector
case _
val routees = (for {
(for {
(_, bucket) registry
valueHolder bucket.content.get(path)
routee valueHolder.routee
} yield routee).toVector
if (routees.nonEmpty)
Router(routingLogic, routees).route(msg, sender())
}
if (routees.nonEmpty)
Router(routingLogic, routees).route(wrapIfNeeded(msg), sender())
case SendToAll(path, msg, skipSenderNode)
publish(path, msg, skipSenderNode)
@ -622,7 +647,7 @@ class DistributedPubSubExtension(system: ExtendedActorSystem) extends Extension
val routingLogic = config.getString("routing-logic") match {
case "random" RandomRoutingLogic()
case "round-robin" RoundRobinRoutingLogic()
case "consistent-hashing" ConsistentHashingRoutingLogic(system)
case "consistent-hashing" throw new IllegalArgumentException(s"'consistent-hashing' routing logic can't be used by the pub-sub mediator")
case "broadcast" BroadcastRoutingLogic()
case other throw new IllegalArgumentException(s"Unknown 'routing-logic': [$other]")
}

View file

@ -0,0 +1,120 @@
package akka.contrib.pattern
import akka.testkit._
import akka.routing.{ ConsistentHashingRoutingLogic, RouterEnvelope }
import org.scalatest.WordSpecLike
import akka.actor.{ ActorInitializationException, ActorRef }
case class WrappedMessage(msg: String) extends RouterEnvelope {
override def message = msg
}
case class UnwrappedMessage(msg: String)
object DistributedPubSubMediatorSpec {
def config(routingLogic: String) = s"""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.contrib.cluster.pub-sub.routing-logic = $routingLogic
"""
}
trait DistributedPubSubMediatorSpec { this: WordSpecLike with TestKit with ImplicitSender
def nonUnwrappingPubSub(mediator: ActorRef, testActor: ActorRef, msg: Any) {
val path = testActor.path.toStringWithoutAddress
"keep the RouterEnvelope when sending to a local logical path" in {
mediator ! DistributedPubSubMediator.Put(testActor)
mediator ! DistributedPubSubMediator.Send(path, msg, localAffinity = true)
expectMsg(msg)
mediator ! DistributedPubSubMediator.Remove(path)
}
"keep the RouterEnvelope when sending to a logical path" in {
mediator ! DistributedPubSubMediator.Put(testActor)
mediator ! DistributedPubSubMediator.Send(path, msg, localAffinity = false)
expectMsg(msg)
mediator ! DistributedPubSubMediator.Remove(path)
}
"keep the RouterEnvelope when sending to all actors on a logical path" in {
mediator ! DistributedPubSubMediator.Put(testActor)
mediator ! DistributedPubSubMediator.SendToAll(path, msg)
expectMsg(msg) // SendToAll does not use provided RoutingLogic
mediator ! DistributedPubSubMediator.Remove(path)
}
"keep the RouterEnvelope when sending to a topic" in {
mediator ! DistributedPubSubMediator.Subscribe("topic", testActor)
expectMsgClass(classOf[DistributedPubSubMediator.SubscribeAck])
mediator ! DistributedPubSubMediator.Publish("topic", msg)
expectMsg(msg) // Publish(... sendOneMessageToEachGroup = false) does not use provided RoutingLogic
mediator ! DistributedPubSubMediator.Unsubscribe("topic", testActor)
expectMsgClass(classOf[DistributedPubSubMediator.UnsubscribeAck])
}
"keep the RouterEnvelope when sending to a topic for a group" in {
mediator ! DistributedPubSubMediator.Subscribe("topic", Some("group"), testActor)
expectMsgClass(classOf[DistributedPubSubMediator.SubscribeAck])
mediator ! DistributedPubSubMediator.Publish("topic", msg, sendOneMessageToEachGroup = true)
expectMsg(msg)
mediator ! DistributedPubSubMediator.Unsubscribe("topic", testActor)
expectMsgClass(classOf[DistributedPubSubMediator.UnsubscribeAck])
}
}
}
class DistributedPubSubMediatorWithRandomRouterSpec
extends AkkaSpec(DistributedPubSubMediatorSpec.config("random"))
with DistributedPubSubMediatorSpec with DefaultTimeout with ImplicitSender {
val mediator = DistributedPubSubExtension(system).mediator
"DistributedPubSubMediator when sending wrapped message" must {
val msg = WrappedMessage("hello")
behave like nonUnwrappingPubSub(mediator, testActor, msg)
}
"DistributedPubSubMediator when sending unwrapped message" must {
val msg = UnwrappedMessage("hello")
behave like nonUnwrappingPubSub(mediator, testActor, msg)
}
}
class DistributedPubSubMediatorWithHashRouterSpec
extends AkkaSpec(DistributedPubSubMediatorSpec.config("consistent-hashing"))
with DistributedPubSubMediatorSpec with DefaultTimeout with ImplicitSender {
"DistributedPubSubMediator with Consistent Hash router" must {
"not be allowed" when {
"constructed by extension" in {
intercept[IllegalArgumentException] {
DistributedPubSubExtension(system).mediator
}
}
"constructed by props" in {
EventFilter[ActorInitializationException](occurrences = 1) intercept {
system.actorOf(
DistributedPubSubMediator.props(None, routingLogic = ConsistentHashingRoutingLogic(system)))
}
}
}
}
}