From 3f12ef262f975a532d443e10a8b54e1bcc4a249b Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Fri, 5 Sep 2014 14:15:46 +0200 Subject: [PATCH] !clu #15042 useRole restriction on local node is now respected This is an API breaking change if someone implemented their own Routers. The change is required because the router must know if the local routees should be started or not so it has to check the roles of the cluster member (the local one). We could delay this decision of starting local routees, but that would allow messages to be dead-letter-ed (bad). --- .../main/scala/akka/routing/Balancing.scala | 7 +- .../main/scala/akka/routing/Broadcast.scala | 4 +- .../akka/routing/ConsistentHashing.scala | 5 +- .../src/main/scala/akka/routing/Random.scala | 4 +- .../main/scala/akka/routing/RoundRobin.scala | 4 +- .../scala/akka/routing/RoutedActorCell.scala | 21 +- .../scala/akka/routing/RouterConfig.scala | 5 +- .../routing/ScatterGatherFirstCompleted.scala | 4 +- .../scala/akka/routing/SmallestMailbox.scala | 4 +- .../scala/akka/routing/TailChopping.scala | 4 +- .../cluster/ClusterActorRefProvider.scala | 36 +-- .../routing/AdaptiveLoadBalancing.scala | 4 +- .../cluster/routing/ClusterRouterConfig.scala | 80 ++++--- .../cluster/routing/UseRoleIgnoredSpec.scala | 210 ++++++++++++++++++ .../project/migration-guide-2.3.x-2.4.x.rst | 9 + .../remote/routing/RemoteRouterConfig.scala | 32 +-- 16 files changed, 319 insertions(+), 114 deletions(-) create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/routing/UseRoleIgnoredSpec.scala diff --git a/akka-actor/src/main/scala/akka/routing/Balancing.scala b/akka-actor/src/main/scala/akka/routing/Balancing.scala index bc4fd0372c..74dc885265 100644 --- a/akka-actor/src/main/scala/akka/routing/Balancing.scala +++ b/akka-actor/src/main/scala/akka/routing/Balancing.scala @@ -66,14 +66,13 @@ private[akka] final class BalancingRoutingLogic extends RoutingLogic { */ @SerialVersionUID(1L) final case class BalancingPool( - override val nrOfInstances: Int, + nrOfInstances: Int, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, override val routerDispatcher: String = Dispatchers.DefaultDispatcherId) extends Pool { def this(config: Config) = - this( - nrOfInstances = config.getInt("nr-of-instances")) + this(nrOfInstances = config.getInt("nr-of-instances")) /** * Java API @@ -94,6 +93,8 @@ final case class BalancingPool( */ def withDispatcher(dispatcherId: String): BalancingPool = copy(routerDispatcher = dispatcherId) + def nrOfInstances(sys: ActorSystem) = this.nrOfInstances + /** * INTERNAL API */ diff --git a/akka-actor/src/main/scala/akka/routing/Broadcast.scala b/akka-actor/src/main/scala/akka/routing/Broadcast.scala index 3e0ba27418..d0ab00a2d3 100644 --- a/akka-actor/src/main/scala/akka/routing/Broadcast.scala +++ b/akka-actor/src/main/scala/akka/routing/Broadcast.scala @@ -58,7 +58,7 @@ final class BroadcastRoutingLogic extends RoutingLogic { */ @SerialVersionUID(1L) final case class BroadcastPool( - override val nrOfInstances: Int, override val resizer: Option[Resizer] = None, + nrOfInstances: Int, override val resizer: Option[Resizer] = None, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, override val routerDispatcher: String = Dispatchers.DefaultDispatcherId, override val usePoolDispatcher: Boolean = false) @@ -78,6 +78,8 @@ final case class BroadcastPool( override def createRouter(system: ActorSystem): Router = new Router(BroadcastRoutingLogic()) + override def nrOfInstances(sys: ActorSystem) = this.nrOfInstances + /** * Setting the supervisor strategy to be used for the “head” Router actor. */ diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala index 846362184a..4a70b5fb91 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala @@ -260,7 +260,8 @@ final case class ConsistentHashingRoutingLogic( */ @SerialVersionUID(1L) final case class ConsistentHashingPool( - override val nrOfInstances: Int, override val resizer: Option[Resizer] = None, + val nrOfInstances: Int, + override val resizer: Option[Resizer] = None, val virtualNodesFactor: Int = 0, val hashMapping: ConsistentHashingRouter.ConsistentHashMapping = ConsistentHashingRouter.emptyConsistentHashMapping, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, @@ -283,6 +284,8 @@ final case class ConsistentHashingPool( override def createRouter(system: ActorSystem): Router = new Router(ConsistentHashingRoutingLogic(system, virtualNodesFactor, hashMapping)) + override def nrOfInstances(sys: ActorSystem) = this.nrOfInstances + /** * Setting the supervisor strategy to be used for the “head” Router actor. */ diff --git a/akka-actor/src/main/scala/akka/routing/Random.scala b/akka-actor/src/main/scala/akka/routing/Random.scala index bfdf1ff288..9056000c05 100644 --- a/akka-actor/src/main/scala/akka/routing/Random.scala +++ b/akka-actor/src/main/scala/akka/routing/Random.scala @@ -59,7 +59,7 @@ final class RandomRoutingLogic extends RoutingLogic { */ @SerialVersionUID(1L) final case class RandomPool( - override val nrOfInstances: Int, override val resizer: Option[Resizer] = None, + val nrOfInstances: Int, override val resizer: Option[Resizer] = None, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, override val routerDispatcher: String = Dispatchers.DefaultDispatcherId, override val usePoolDispatcher: Boolean = false) @@ -79,6 +79,8 @@ final case class RandomPool( override def createRouter(system: ActorSystem): Router = new Router(RandomRoutingLogic()) + override def nrOfInstances(sys: ActorSystem) = this.nrOfInstances + /** * Setting the supervisor strategy to be used for the “head” Router actor. */ diff --git a/akka-actor/src/main/scala/akka/routing/RoundRobin.scala b/akka-actor/src/main/scala/akka/routing/RoundRobin.scala index 52a68739cc..4b0d0a99f6 100644 --- a/akka-actor/src/main/scala/akka/routing/RoundRobin.scala +++ b/akka-actor/src/main/scala/akka/routing/RoundRobin.scala @@ -64,7 +64,7 @@ final class RoundRobinRoutingLogic extends RoutingLogic { */ @SerialVersionUID(1L) final case class RoundRobinPool( - override val nrOfInstances: Int, override val resizer: Option[Resizer] = None, + val nrOfInstances: Int, override val resizer: Option[Resizer] = None, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, override val routerDispatcher: String = Dispatchers.DefaultDispatcherId, override val usePoolDispatcher: Boolean = false) @@ -83,6 +83,8 @@ final case class RoundRobinPool( override def createRouter(system: ActorSystem): Router = new Router(RoundRobinRoutingLogic()) + override def nrOfInstances(sys: ActorSystem) = this.nrOfInstances + /** * Setting the supervisor strategy to be used for the “head” Router actor. */ diff --git a/akka-actor/src/main/scala/akka/routing/RoutedActorCell.scala b/akka-actor/src/main/scala/akka/routing/RoutedActorCell.scala index f62c03f03a..7b06b2b489 100644 --- a/akka-actor/src/main/scala/akka/routing/RoutedActorCell.scala +++ b/akka-actor/src/main/scala/akka/routing/RoutedActorCell.scala @@ -3,26 +3,22 @@ */ package akka.routing -import scala.collection.immutable -import scala.concurrent.duration._ import akka.actor.Actor import akka.actor.ActorCell import akka.actor.ActorInitializationException +import akka.actor.ActorRef import akka.actor.ActorSystemImpl -import akka.actor.AutoReceivedMessage import akka.actor.IndirectActorProducer import akka.actor.InternalActorRef +import akka.actor.PoisonPill import akka.actor.Props +import akka.actor.SupervisorStrategy import akka.actor.Terminated import akka.dispatch.Envelope import akka.dispatch.MessageDispatcher -import akka.actor.ActorContext -import akka.actor.PoisonPill -import akka.actor.SupervisorStrategy -import akka.actor.ActorRef -import akka.actor.ReceiveTimeout -import akka.actor.Identify -import akka.actor.ActorIdentity + +import scala.collection.immutable +import scala.concurrent.duration._ /** * INTERNAL API @@ -106,8 +102,9 @@ private[akka] class RoutedActorCell( _router = routerConfig.createRouter(system) routerConfig match { case pool: Pool ⇒ - if (pool.nrOfInstances > 0) - addRoutees(Vector.fill(pool.nrOfInstances)(pool.newRoutee(routeeProps, this))) + val nrOfRoutees = pool.nrOfInstances(system) + if (nrOfRoutees > 0) + addRoutees(Vector.fill(nrOfRoutees)(pool.newRoutee(routeeProps, this))) case group: Group ⇒ val paths = group.paths if (paths.nonEmpty) diff --git a/akka-actor/src/main/scala/akka/routing/RouterConfig.scala b/akka-actor/src/main/scala/akka/routing/RouterConfig.scala index ae6895d77e..e3cb6235c1 100644 --- a/akka-actor/src/main/scala/akka/routing/RouterConfig.scala +++ b/akka-actor/src/main/scala/akka/routing/RouterConfig.scala @@ -176,10 +176,11 @@ abstract class PoolBase extends Pool * them from the router if they terminate. */ trait Pool extends RouterConfig { + /** * Initial number of routee instances */ - def nrOfInstances: Int + def nrOfInstances(sys: ActorSystem): Int /** * Use a dedicated dispatcher for the routees of the pool. @@ -315,7 +316,7 @@ class FromConfig(override val resizer: Option[Resizer], def withDispatcher(dispatcherId: String): FromConfig = new FromConfig(resizer, supervisorStrategy, dispatcherId) - override val nrOfInstances: Int = 0 + override def nrOfInstances(sys: ActorSystem): Int = 0 /** * [[akka.actor.Props]] for a group router based on the settings defined by diff --git a/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala b/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala index fb5dbcd41d..1d409a92fe 100644 --- a/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala +++ b/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala @@ -93,7 +93,7 @@ private[akka] final case class ScatterGatherFirstCompletedRoutees( */ @SerialVersionUID(1L) final case class ScatterGatherFirstCompletedPool( - override val nrOfInstances: Int, override val resizer: Option[Resizer] = None, + val nrOfInstances: Int, override val resizer: Option[Resizer] = None, within: FiniteDuration, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, override val routerDispatcher: String = Dispatchers.DefaultDispatcherId, @@ -117,6 +117,8 @@ final case class ScatterGatherFirstCompletedPool( override def createRouter(system: ActorSystem): Router = new Router(ScatterGatherFirstCompletedRoutingLogic(within)) + override def nrOfInstances(sys: ActorSystem) = this.nrOfInstances + /** * Setting the supervisor strategy to be used for the “head” Router actor. */ diff --git a/akka-actor/src/main/scala/akka/routing/SmallestMailbox.scala b/akka-actor/src/main/scala/akka/routing/SmallestMailbox.scala index e8b3a39b9f..53b3aa027a 100644 --- a/akka-actor/src/main/scala/akka/routing/SmallestMailbox.scala +++ b/akka-actor/src/main/scala/akka/routing/SmallestMailbox.scala @@ -174,7 +174,7 @@ class SmallestMailboxRoutingLogic extends RoutingLogic { */ @SerialVersionUID(1L) final case class SmallestMailboxPool( - override val nrOfInstances: Int, override val resizer: Option[Resizer] = None, + val nrOfInstances: Int, override val resizer: Option[Resizer] = None, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, override val routerDispatcher: String = Dispatchers.DefaultDispatcherId, override val usePoolDispatcher: Boolean = false) @@ -194,6 +194,8 @@ final case class SmallestMailboxPool( override def createRouter(system: ActorSystem): Router = new Router(SmallestMailboxRoutingLogic()) + override def nrOfInstances(sys: ActorSystem) = this.nrOfInstances + /** * Setting the supervisor strategy to be used for the “head” Router actor. */ diff --git a/akka-actor/src/main/scala/akka/routing/TailChopping.scala b/akka-actor/src/main/scala/akka/routing/TailChopping.scala index 4d51bdf90f..379bd219a0 100644 --- a/akka-actor/src/main/scala/akka/routing/TailChopping.scala +++ b/akka-actor/src/main/scala/akka/routing/TailChopping.scala @@ -120,7 +120,7 @@ private[akka] final case class TailChoppingRoutees( */ @SerialVersionUID(1L) final case class TailChoppingPool( - override val nrOfInstances: Int, override val resizer: Option[Resizer] = None, + val nrOfInstances: Int, override val resizer: Option[Resizer] = None, within: FiniteDuration, interval: FiniteDuration, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, @@ -150,6 +150,8 @@ final case class TailChoppingPool( new Router(TailChoppingRoutingLogic(system.scheduler, within, interval, system.dispatchers.lookup(routerDispatcher))) + override def nrOfInstances(sys: ActorSystem) = this.nrOfInstances + /** * Setting the supervisor strategy to be used for the “head” Router actor. */ diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala index 0bdfd9eb22..89966e2de9 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala @@ -3,40 +3,14 @@ */ package akka.cluster -import com.typesafe.config.Config import akka.ConfigurationException -import akka.actor.ActorSystem -import akka.actor.ActorSystemImpl -import akka.actor.Deploy -import akka.actor.DynamicAccess -import akka.actor.InternalActorRef -import akka.actor.NoScopeGiven -import akka.actor.Scheduler -import akka.actor.Scope -import akka.actor.Terminated -import akka.dispatch.sysmsg.DeathWatchNotification +import akka.actor.{ ActorRef, ActorSystem, ActorSystemImpl, Deploy, DynamicAccess, NoScopeGiven, Scope } +import akka.cluster.routing.{ ClusterRouterGroup, ClusterRouterGroupSettings, ClusterRouterPool, ClusterRouterPoolSettings } import akka.event.EventStream -import akka.japi.Util.immutableSeq -import akka.remote.RemoteActorRefProvider -import akka.remote.RemoteDeployer +import akka.remote.{ RemoteActorRefProvider, RemoteDeployer } import akka.remote.routing.RemoteRouterConfig -import akka.routing.RouterConfig -import akka.routing.DefaultResizer -import akka.cluster.routing.MixMetricsSelector -import akka.cluster.routing.HeapMetricsSelector -import akka.cluster.routing.SystemLoadAverageMetricsSelector -import akka.cluster.routing.CpuMetricsSelector -import akka.cluster.routing.MetricsSelector -import akka.dispatch.sysmsg.SystemMessage -import akka.actor.ActorRef -import akka.actor.Props -import akka.routing.Pool -import akka.routing.Group -import akka.cluster.routing.ClusterRouterPool -import akka.cluster.routing.ClusterRouterGroup -import com.typesafe.config.ConfigFactory -import akka.cluster.routing.ClusterRouterPoolSettings -import akka.cluster.routing.ClusterRouterGroupSettings +import akka.routing.{ Group, Pool } +import com.typesafe.config.Config /** * INTERNAL API diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancing.scala b/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancing.scala index b0c3c6096d..38ea934e26 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancing.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancing.scala @@ -128,7 +128,7 @@ final case class AdaptiveLoadBalancingRoutingLogic(system: ActorSystem, metricsS @SerialVersionUID(1L) final case class AdaptiveLoadBalancingPool( metricsSelector: MetricsSelector = MixMetricsSelector, - override val nrOfInstances: Int = 0, + val nrOfInstances: Int = 0, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, override val routerDispatcher: String = Dispatchers.DefaultDispatcherId, override val usePoolDispatcher: Boolean = false) @@ -149,6 +149,8 @@ final case class AdaptiveLoadBalancingPool( override def resizer: Option[Resizer] = None + override def nrOfInstances(sys: ActorSystem) = this.nrOfInstances + override def createRouter(system: ActorSystem): Router = new Router(AdaptiveLoadBalancingRoutingLogic(system, metricsSelector)) diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala index 6ec9d27712..65ecda61fd 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala @@ -3,43 +3,31 @@ */ package akka.cluster.routing -import scala.collection.immutable -import akka.routing.RouterConfig -import akka.routing.Router -import akka.actor.Props -import akka.actor.ActorContext -import akka.routing.Routee import java.util.concurrent.atomic.AtomicInteger -import akka.actor.Address -import akka.actor.ActorCell -import akka.actor.Deploy -import com.typesafe.config.ConfigFactory -import akka.routing.ActorRefRoutee -import akka.remote.RemoteScope -import akka.actor.Actor -import akka.actor.SupervisorStrategy -import akka.routing.Resizer -import akka.routing.RouterConfig -import akka.routing.Pool -import akka.routing.Group -import akka.remote.routing.RemoteRouterConfig -import akka.routing.RouterActor + +import akka.actor._ import akka.cluster.Cluster import akka.cluster.ClusterEvent._ -import akka.actor.ActorRef import akka.cluster.Member -import scala.annotation.tailrec -import akka.actor.RootActorPath import akka.cluster.MemberStatus -import akka.routing.ActorSelectionRoutee -import akka.actor.ActorInitializationException -import akka.routing.RouterPoolActor -import akka.actor.ActorSystem -import akka.actor.ActorSystem -import akka.routing.RoutingLogic -import akka.actor.RelativeActorPath -import com.typesafe.config.Config import akka.japi.Util.immutableSeq +import akka.remote.RemoteScope +import akka.routing.ActorRefRoutee +import akka.routing.ActorSelectionRoutee +import akka.routing.Group +import akka.routing.Pool +import akka.routing.Resizer +import akka.routing.Routee +import akka.routing.Router +import akka.routing.RouterActor +import akka.routing.RouterConfig +import akka.routing.RouterPoolActor +import akka.routing.RoutingLogic +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory + +import scala.annotation.tailrec +import scala.collection.immutable object ClusterRouterGroupSettings { def fromConfig(config: Config): ClusterRouterGroupSettings = @@ -127,7 +115,8 @@ private[akka] trait ClusterRouterSettingsBase { def allowLocalRoutees: Boolean def useRole: Option[String] - if (totalInstances <= 0) throw new IllegalArgumentException("totalInstances of cluster router must be > 0") + require(useRole.isEmpty || useRole.get.nonEmpty, "useRole must be either None or non-empty Some wrapped role") + require(totalInstances > 0, "totalInstances of cluster router must be > 0") } /** @@ -183,7 +172,14 @@ final case class ClusterRouterPool(local: Pool, settings: ClusterRouterPoolSetti /** * Initial number of routee instances */ - override def nrOfInstances: Int = if (settings.allowLocalRoutees) settings.maxInstancesPerNode else 0 + override def nrOfInstances(sys: ActorSystem): Int = + if (settings.allowLocalRoutees && settings.useRole.isDefined) { + if (Cluster(sys).selfRoles.contains(settings.useRole.get)) { + settings.maxInstancesPerNode + } else 0 + } else if (settings.allowLocalRoutees && settings.useRole.isEmpty) { + settings.maxInstancesPerNode + } else 0 override def resizer: Option[Resizer] = local.resizer @@ -195,8 +191,8 @@ final case class ClusterRouterPool(local: Pool, settings: ClusterRouterPoolSetti override def supervisorStrategy: SupervisorStrategy = local.supervisorStrategy override def withFallback(other: RouterConfig): RouterConfig = other match { - case ClusterRouterPool(_: ClusterRouterPool, _) ⇒ throw new IllegalStateException( - "ClusterRouterPool is not allowed to wrap a ClusterRouterPool") + case ClusterRouterPool(_: ClusterRouterPool, _) ⇒ + throw new IllegalStateException("ClusterRouterPool is not allowed to wrap a ClusterRouterPool") case ClusterRouterPool(otherLocal, _) ⇒ copy(local = this.local.withFallback(otherLocal).asInstanceOf[Pool]) case _ ⇒ @@ -321,6 +317,7 @@ private[akka] class ClusterRouterGroupActor(val settings: ClusterRouterGroupSett } else { // find the node with least routees val unusedNodes = currentNodes filterNot usedRouteePaths.contains + if (unusedNodes.nonEmpty) { Some((unusedNodes.head, settings.routeesPaths.head)) } else { @@ -359,7 +356,7 @@ private[akka] trait ClusterRouterActor { this: RouterActor ⇒ override def postStop(): Unit = cluster.unsubscribe(self) var nodes: immutable.SortedSet[Address] = { - import Member.addressOrdering + import akka.cluster.Member.addressOrdering cluster.readView.members.collect { case m if isAvailable(m) ⇒ m.address } @@ -376,13 +373,12 @@ private[akka] trait ClusterRouterActor { this: RouterActor ⇒ } def availableNodes: immutable.SortedSet[Address] = { - import Member.addressOrdering - val currentNodes = nodes - if (currentNodes.isEmpty && settings.allowLocalRoutees && satisfiesRole(cluster.selfRoles)) - //use my own node, cluster information not updated yet + import akka.cluster.Member.addressOrdering + if (nodes.isEmpty && settings.allowLocalRoutees && satisfiesRole(cluster.selfRoles)) + // use my own node, cluster information not updated yet immutable.SortedSet(cluster.selfAddress) else - currentNodes + nodes } /** @@ -424,7 +420,7 @@ private[akka] trait ClusterRouterActor { this: RouterActor ⇒ def clusterReceive: Receive = { case s: CurrentClusterState ⇒ - import Member.addressOrdering + import akka.cluster.Member.addressOrdering nodes = s.members.collect { case m if isAvailable(m) ⇒ m.address } addRoutees() diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/UseRoleIgnoredSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/UseRoleIgnoredSpec.scala new file mode 100644 index 0000000000..77797a665f --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/UseRoleIgnoredSpec.scala @@ -0,0 +1,210 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.cluster.routing + +import akka.actor._ +import akka.cluster.MultiNodeClusterSpec +import akka.pattern.ask +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.routing.GetRoutees +import akka.routing.RoundRobinPool +import akka.routing.Routees +import akka.testkit.DefaultTimeout +import akka.testkit.ImplicitSender +import akka.testkit._ +import com.typesafe.config.ConfigFactory + +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.language.postfixOps + +object UseRoleIgnoredMultiJvmSpec extends MultiNodeConfig { + + class SomeActor(routeeType: RouteeType) extends Actor with ActorLogging { + log.info("Starting on {}", self.path.address) + + def this() = this(PoolRoutee) + + def receive = { + case msg ⇒ + log.info("msg = {}", msg) + sender() ! Reply(routeeType, self) + } + } + + final case class Reply(routeeType: RouteeType, ref: ActorRef) + + sealed trait RouteeType extends Serializable + object PoolRoutee extends RouteeType + object GroupRoutee extends RouteeType + + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(debugConfig(on = false). + withFallback(MultiNodeClusterSpec.clusterConfig)) + + nodeConfig(first)(ConfigFactory.parseString("""akka.cluster.roles =["a", "c"]""")) + nodeConfig(second, third)(ConfigFactory.parseString("""akka.cluster.roles =["b", "c"]""")) + +} + +class UseRoleIgnoredMultiJvmNode1 extends UseRoleIgnoredSpec +class UseRoleIgnoredMultiJvmNode2 extends UseRoleIgnoredSpec +class UseRoleIgnoredMultiJvmNode3 extends UseRoleIgnoredSpec + +abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSpec) + with MultiNodeClusterSpec + with ImplicitSender with DefaultTimeout { + import akka.cluster.routing.UseRoleIgnoredMultiJvmSpec._ + + def receiveReplies(routeeType: RouteeType, expectedReplies: Int): Map[Address, Int] = { + val zero = Map.empty[Address, Int] ++ roles.map(address(_) -> 0) + (receiveWhile(5 seconds, messages = expectedReplies) { + case Reply(`routeeType`, ref) ⇒ fullAddress(ref) + }).foldLeft(zero) { + case (replyMap, address) ⇒ replyMap + (address -> (replyMap(address) + 1)) + } + } + + /** + * Fills in self address for local ActorRef + */ + private def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match { + case Address(_, _, None, None) ⇒ cluster.selfAddress + case a ⇒ a + } + + def currentRoutees(router: ActorRef) = + Await.result(router ? GetRoutees, timeout.duration).asInstanceOf[Routees].routees + + "A cluster" must { + "start cluster" taggedAs LongRunningTest in { + awaitClusterUp(first, second, third) + runOn(first) { info("first, roles: " + cluster.selfRoles) } + runOn(second) { info("second, roles: " + cluster.selfRoles) } + runOn(third) { info("third, roles: " + cluster.selfRoles) } + enterBarrier("after-1") + } + + "local: off, roles: off, 6 => 0,2,2" taggedAs LongRunningTest in { + + runOn(first) { + val role = Some("b") + + val router = system.actorOf(ClusterRouterPool( + RoundRobinPool(nrOfInstances = 6), + ClusterRouterPoolSettings(totalInstances = 6, maxInstancesPerNode = 2, allowLocalRoutees = false, useRole = role)). + props(Props[SomeActor]), + "router-2") + + awaitAssert(currentRoutees(router).size should be(4)) + + val iterationCount = 10 + for (i ← 0 until iterationCount) { + router ! s"hit-$i" + } + + val replies = receiveReplies(PoolRoutee, iterationCount) + + replies(first) should be(0) // should not be deployed locally, does not have required role + replies(second) should be > 0 + replies(third) should be > 0 + replies.values.sum should be(iterationCount) + } + + enterBarrier("after-2") + } + + "local: on, role: b, 6 => 0,2,2" taggedAs LongRunningTest in { + + runOn(first) { + val role = Some("b") + + val router = system.actorOf(ClusterRouterPool( + RoundRobinPool(nrOfInstances = 6), + ClusterRouterPoolSettings(totalInstances = 6, maxInstancesPerNode = 2, allowLocalRoutees = true, useRole = role)). + props(Props[SomeActor]), + "router-3") + + awaitAssert(currentRoutees(router).size should be(4)) + + val iterationCount = 10 + for (i ← 0 until iterationCount) { + router ! s"hit-$i" + } + + val replies = receiveReplies(PoolRoutee, iterationCount) + + replies(first) should be(0) // should not be deployed locally, does not have required role + replies(second) should be > 0 + replies(third) should be > 0 + replies.values.sum should be(iterationCount) + } + + enterBarrier("after-3") + } + + "local: on, role: a, 6 => 2,0,0" taggedAs LongRunningTest in { + + runOn(first) { + val role = Some("a") + + val router = system.actorOf(ClusterRouterPool( + RoundRobinPool(nrOfInstances = 6), + ClusterRouterPoolSettings(totalInstances = 6, maxInstancesPerNode = 2, allowLocalRoutees = true, useRole = role)). + props(Props[SomeActor]), + "router-4") + + awaitAssert(currentRoutees(router).size should be(2)) + + val iterationCount = 10 + for (i ← 0 until iterationCount) { + router ! s"hit-$i" + } + + val replies = receiveReplies(PoolRoutee, iterationCount) + + replies(first) should be > 0 + replies(second) should be(0) + replies(third) should be(0) + replies.values.sum should be(iterationCount) + } + + enterBarrier("after-4") + } + + "local: on, role: c, 6 => 2,2,2" taggedAs LongRunningTest in { + + runOn(first) { + val role = Some("c") + + val router = system.actorOf(ClusterRouterPool( + RoundRobinPool(nrOfInstances = 6), + ClusterRouterPoolSettings(totalInstances = 6, maxInstancesPerNode = 2, allowLocalRoutees = true, useRole = role)). + props(Props[SomeActor]), + "router-5") + + awaitAssert(currentRoutees(router).size should be(6)) + + val iterationCount = 10 + for (i ← 0 until iterationCount) { + router ! s"hit-$i" + } + + val replies = receiveReplies(PoolRoutee, iterationCount) + + replies(first) should be > 0 + replies(second) should be > 0 + replies(third) should be > 0 + replies.values.sum should be(iterationCount) + } + + enterBarrier("after-5") + } + + } +} diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index 47ac809453..2f200a3143 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -115,3 +115,12 @@ If you use ``Slf4jLogger`` you should add the following configuration:: It will filter the log events using the backend configuration (e.g. logback.xml) before they are published to the event bus. + +Pool routers nrOfInstances method now takes ActorSystem +======================================================= + +In order to make cluster routers smarter about when they can start local routees, +``nrOfInstances`` defined on ``Pool`` now takes ``ActorSystem`` as an argument. +In case you have implemented a custom Pool you will have to update the method's signature, +however the implementation can remain the same if you don't need to rely on an ActorSystem in your logic. + diff --git a/akka-remote/src/main/scala/akka/remote/routing/RemoteRouterConfig.scala b/akka-remote/src/main/scala/akka/remote/routing/RemoteRouterConfig.scala index 170772287f..09f3ab7b7a 100644 --- a/akka-remote/src/main/scala/akka/remote/routing/RemoteRouterConfig.scala +++ b/akka-remote/src/main/scala/akka/remote/routing/RemoteRouterConfig.scala @@ -3,25 +3,25 @@ */ package akka.remote.routing -import akka.routing.Router -import akka.actor.Props -import akka.actor.ActorContext -import akka.routing.Routee import java.util.concurrent.atomic.AtomicInteger -import akka.actor.Address + import akka.actor.ActorCell -import akka.actor.Deploy -import com.typesafe.config.ConfigFactory -import akka.routing.ActorRefRoutee -import akka.remote.RemoteScope -import akka.actor.Actor -import akka.actor.SupervisorStrategy -import akka.routing.Resizer -import akka.routing.RouterConfig -import akka.routing.Pool +import akka.actor.ActorContext import akka.actor.ActorSystem -import akka.routing.RouterActor +import akka.actor.Address +import akka.actor.Deploy +import akka.actor.Props +import akka.actor.SupervisorStrategy import akka.japi.Util.immutableSeq +import akka.remote.RemoteScope +import akka.routing.ActorRefRoutee +import akka.routing.Pool +import akka.routing.Resizer +import akka.routing.Routee +import akka.routing.Router +import akka.routing.RouterActor +import akka.routing.RouterConfig +import com.typesafe.config.ConfigFactory /** * [[akka.routing.RouterConfig]] implementation for remote deployment on defined @@ -44,7 +44,7 @@ final case class RemoteRouterConfig(local: Pool, nodes: Iterable[Address]) exten override def createRouter(system: ActorSystem): Router = local.createRouter(system) - override def nrOfInstances: Int = local.nrOfInstances + override def nrOfInstances(sys: ActorSystem): Int = local.nrOfInstances(sys) override def newRoutee(routeeProps: Props, context: ActorContext): Routee = { val name = "c" + childNameCounter.incrementAndGet