diff --git a/akka-actor/src/main/scala/akka/routing/Balancing.scala b/akka-actor/src/main/scala/akka/routing/Balancing.scala index bc4fd0372c..74dc885265 100644 --- a/akka-actor/src/main/scala/akka/routing/Balancing.scala +++ b/akka-actor/src/main/scala/akka/routing/Balancing.scala @@ -66,14 +66,13 @@ private[akka] final class BalancingRoutingLogic extends RoutingLogic { */ @SerialVersionUID(1L) final case class BalancingPool( - override val nrOfInstances: Int, + nrOfInstances: Int, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, override val routerDispatcher: String = Dispatchers.DefaultDispatcherId) extends Pool { def this(config: Config) = - this( - nrOfInstances = config.getInt("nr-of-instances")) + this(nrOfInstances = config.getInt("nr-of-instances")) /** * Java API @@ -94,6 +93,8 @@ final case class BalancingPool( */ def withDispatcher(dispatcherId: String): BalancingPool = copy(routerDispatcher = dispatcherId) + def nrOfInstances(sys: ActorSystem) = this.nrOfInstances + /** * INTERNAL API */ diff --git a/akka-actor/src/main/scala/akka/routing/Broadcast.scala b/akka-actor/src/main/scala/akka/routing/Broadcast.scala index 3e0ba27418..d0ab00a2d3 100644 --- a/akka-actor/src/main/scala/akka/routing/Broadcast.scala +++ b/akka-actor/src/main/scala/akka/routing/Broadcast.scala @@ -58,7 +58,7 @@ final class BroadcastRoutingLogic extends RoutingLogic { */ @SerialVersionUID(1L) final case class BroadcastPool( - override val nrOfInstances: Int, override val resizer: Option[Resizer] = None, + nrOfInstances: Int, override val resizer: Option[Resizer] = None, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, override val routerDispatcher: String = Dispatchers.DefaultDispatcherId, override val usePoolDispatcher: Boolean = false) @@ -78,6 +78,8 @@ final case class BroadcastPool( override def createRouter(system: ActorSystem): Router = new Router(BroadcastRoutingLogic()) + override def nrOfInstances(sys: ActorSystem) = this.nrOfInstances + /** * Setting the supervisor strategy to be used for the “head” Router actor. */ diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala index 846362184a..4a70b5fb91 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala @@ -260,7 +260,8 @@ final case class ConsistentHashingRoutingLogic( */ @SerialVersionUID(1L) final case class ConsistentHashingPool( - override val nrOfInstances: Int, override val resizer: Option[Resizer] = None, + val nrOfInstances: Int, + override val resizer: Option[Resizer] = None, val virtualNodesFactor: Int = 0, val hashMapping: ConsistentHashingRouter.ConsistentHashMapping = ConsistentHashingRouter.emptyConsistentHashMapping, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, @@ -283,6 +284,8 @@ final case class ConsistentHashingPool( override def createRouter(system: ActorSystem): Router = new Router(ConsistentHashingRoutingLogic(system, virtualNodesFactor, hashMapping)) + override def nrOfInstances(sys: ActorSystem) = this.nrOfInstances + /** * Setting the supervisor strategy to be used for the “head” Router actor. */ diff --git a/akka-actor/src/main/scala/akka/routing/Random.scala b/akka-actor/src/main/scala/akka/routing/Random.scala index bfdf1ff288..9056000c05 100644 --- a/akka-actor/src/main/scala/akka/routing/Random.scala +++ b/akka-actor/src/main/scala/akka/routing/Random.scala @@ -59,7 +59,7 @@ final class RandomRoutingLogic extends RoutingLogic { */ @SerialVersionUID(1L) final case class RandomPool( - override val nrOfInstances: Int, override val resizer: Option[Resizer] = None, + val nrOfInstances: Int, override val resizer: Option[Resizer] = None, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, override val routerDispatcher: String = Dispatchers.DefaultDispatcherId, override val usePoolDispatcher: Boolean = false) @@ -79,6 +79,8 @@ final case class RandomPool( override def createRouter(system: ActorSystem): Router = new Router(RandomRoutingLogic()) + override def nrOfInstances(sys: ActorSystem) = this.nrOfInstances + /** * Setting the supervisor strategy to be used for the “head” Router actor. */ diff --git a/akka-actor/src/main/scala/akka/routing/RoundRobin.scala b/akka-actor/src/main/scala/akka/routing/RoundRobin.scala index 52a68739cc..4b0d0a99f6 100644 --- a/akka-actor/src/main/scala/akka/routing/RoundRobin.scala +++ b/akka-actor/src/main/scala/akka/routing/RoundRobin.scala @@ -64,7 +64,7 @@ final class RoundRobinRoutingLogic extends RoutingLogic { */ @SerialVersionUID(1L) final case class RoundRobinPool( - override val nrOfInstances: Int, override val resizer: Option[Resizer] = None, + val nrOfInstances: Int, override val resizer: Option[Resizer] = None, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, override val routerDispatcher: String = Dispatchers.DefaultDispatcherId, override val usePoolDispatcher: Boolean = false) @@ -83,6 +83,8 @@ final case class RoundRobinPool( override def createRouter(system: ActorSystem): Router = new Router(RoundRobinRoutingLogic()) + override def nrOfInstances(sys: ActorSystem) = this.nrOfInstances + /** * Setting the supervisor strategy to be used for the “head” Router actor. */ diff --git a/akka-actor/src/main/scala/akka/routing/RoutedActorCell.scala b/akka-actor/src/main/scala/akka/routing/RoutedActorCell.scala index f62c03f03a..7b06b2b489 100644 --- a/akka-actor/src/main/scala/akka/routing/RoutedActorCell.scala +++ b/akka-actor/src/main/scala/akka/routing/RoutedActorCell.scala @@ -3,26 +3,22 @@ */ package akka.routing -import scala.collection.immutable -import scala.concurrent.duration._ import akka.actor.Actor import akka.actor.ActorCell import akka.actor.ActorInitializationException +import akka.actor.ActorRef import akka.actor.ActorSystemImpl -import akka.actor.AutoReceivedMessage import akka.actor.IndirectActorProducer import akka.actor.InternalActorRef +import akka.actor.PoisonPill import akka.actor.Props +import akka.actor.SupervisorStrategy import akka.actor.Terminated import akka.dispatch.Envelope import akka.dispatch.MessageDispatcher -import akka.actor.ActorContext -import akka.actor.PoisonPill -import akka.actor.SupervisorStrategy -import akka.actor.ActorRef -import akka.actor.ReceiveTimeout -import akka.actor.Identify -import akka.actor.ActorIdentity + +import scala.collection.immutable +import scala.concurrent.duration._ /** * INTERNAL API @@ -106,8 +102,9 @@ private[akka] class RoutedActorCell( _router = routerConfig.createRouter(system) routerConfig match { case pool: Pool ⇒ - if (pool.nrOfInstances > 0) - addRoutees(Vector.fill(pool.nrOfInstances)(pool.newRoutee(routeeProps, this))) + val nrOfRoutees = pool.nrOfInstances(system) + if (nrOfRoutees > 0) + addRoutees(Vector.fill(nrOfRoutees)(pool.newRoutee(routeeProps, this))) case group: Group ⇒ val paths = group.paths if (paths.nonEmpty) diff --git a/akka-actor/src/main/scala/akka/routing/RouterConfig.scala b/akka-actor/src/main/scala/akka/routing/RouterConfig.scala index ae6895d77e..e3cb6235c1 100644 --- a/akka-actor/src/main/scala/akka/routing/RouterConfig.scala +++ b/akka-actor/src/main/scala/akka/routing/RouterConfig.scala @@ -176,10 +176,11 @@ abstract class PoolBase extends Pool * them from the router if they terminate. */ trait Pool extends RouterConfig { + /** * Initial number of routee instances */ - def nrOfInstances: Int + def nrOfInstances(sys: ActorSystem): Int /** * Use a dedicated dispatcher for the routees of the pool. @@ -315,7 +316,7 @@ class FromConfig(override val resizer: Option[Resizer], def withDispatcher(dispatcherId: String): FromConfig = new FromConfig(resizer, supervisorStrategy, dispatcherId) - override val nrOfInstances: Int = 0 + override def nrOfInstances(sys: ActorSystem): Int = 0 /** * [[akka.actor.Props]] for a group router based on the settings defined by diff --git a/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala b/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala index fb5dbcd41d..1d409a92fe 100644 --- a/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala +++ b/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala @@ -93,7 +93,7 @@ private[akka] final case class ScatterGatherFirstCompletedRoutees( */ @SerialVersionUID(1L) final case class ScatterGatherFirstCompletedPool( - override val nrOfInstances: Int, override val resizer: Option[Resizer] = None, + val nrOfInstances: Int, override val resizer: Option[Resizer] = None, within: FiniteDuration, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, override val routerDispatcher: String = Dispatchers.DefaultDispatcherId, @@ -117,6 +117,8 @@ final case class ScatterGatherFirstCompletedPool( override def createRouter(system: ActorSystem): Router = new Router(ScatterGatherFirstCompletedRoutingLogic(within)) + override def nrOfInstances(sys: ActorSystem) = this.nrOfInstances + /** * Setting the supervisor strategy to be used for the “head” Router actor. */ diff --git a/akka-actor/src/main/scala/akka/routing/SmallestMailbox.scala b/akka-actor/src/main/scala/akka/routing/SmallestMailbox.scala index e8b3a39b9f..53b3aa027a 100644 --- a/akka-actor/src/main/scala/akka/routing/SmallestMailbox.scala +++ b/akka-actor/src/main/scala/akka/routing/SmallestMailbox.scala @@ -174,7 +174,7 @@ class SmallestMailboxRoutingLogic extends RoutingLogic { */ @SerialVersionUID(1L) final case class SmallestMailboxPool( - override val nrOfInstances: Int, override val resizer: Option[Resizer] = None, + val nrOfInstances: Int, override val resizer: Option[Resizer] = None, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, override val routerDispatcher: String = Dispatchers.DefaultDispatcherId, override val usePoolDispatcher: Boolean = false) @@ -194,6 +194,8 @@ final case class SmallestMailboxPool( override def createRouter(system: ActorSystem): Router = new Router(SmallestMailboxRoutingLogic()) + override def nrOfInstances(sys: ActorSystem) = this.nrOfInstances + /** * Setting the supervisor strategy to be used for the “head” Router actor. */ diff --git a/akka-actor/src/main/scala/akka/routing/TailChopping.scala b/akka-actor/src/main/scala/akka/routing/TailChopping.scala index 4d51bdf90f..379bd219a0 100644 --- a/akka-actor/src/main/scala/akka/routing/TailChopping.scala +++ b/akka-actor/src/main/scala/akka/routing/TailChopping.scala @@ -120,7 +120,7 @@ private[akka] final case class TailChoppingRoutees( */ @SerialVersionUID(1L) final case class TailChoppingPool( - override val nrOfInstances: Int, override val resizer: Option[Resizer] = None, + val nrOfInstances: Int, override val resizer: Option[Resizer] = None, within: FiniteDuration, interval: FiniteDuration, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, @@ -150,6 +150,8 @@ final case class TailChoppingPool( new Router(TailChoppingRoutingLogic(system.scheduler, within, interval, system.dispatchers.lookup(routerDispatcher))) + override def nrOfInstances(sys: ActorSystem) = this.nrOfInstances + /** * Setting the supervisor strategy to be used for the “head” Router actor. */ diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala index 0bdfd9eb22..89966e2de9 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala @@ -3,40 +3,14 @@ */ package akka.cluster -import com.typesafe.config.Config import akka.ConfigurationException -import akka.actor.ActorSystem -import akka.actor.ActorSystemImpl -import akka.actor.Deploy -import akka.actor.DynamicAccess -import akka.actor.InternalActorRef -import akka.actor.NoScopeGiven -import akka.actor.Scheduler -import akka.actor.Scope -import akka.actor.Terminated -import akka.dispatch.sysmsg.DeathWatchNotification +import akka.actor.{ ActorRef, ActorSystem, ActorSystemImpl, Deploy, DynamicAccess, NoScopeGiven, Scope } +import akka.cluster.routing.{ ClusterRouterGroup, ClusterRouterGroupSettings, ClusterRouterPool, ClusterRouterPoolSettings } import akka.event.EventStream -import akka.japi.Util.immutableSeq -import akka.remote.RemoteActorRefProvider -import akka.remote.RemoteDeployer +import akka.remote.{ RemoteActorRefProvider, RemoteDeployer } import akka.remote.routing.RemoteRouterConfig -import akka.routing.RouterConfig -import akka.routing.DefaultResizer -import akka.cluster.routing.MixMetricsSelector -import akka.cluster.routing.HeapMetricsSelector -import akka.cluster.routing.SystemLoadAverageMetricsSelector -import akka.cluster.routing.CpuMetricsSelector -import akka.cluster.routing.MetricsSelector -import akka.dispatch.sysmsg.SystemMessage -import akka.actor.ActorRef -import akka.actor.Props -import akka.routing.Pool -import akka.routing.Group -import akka.cluster.routing.ClusterRouterPool -import akka.cluster.routing.ClusterRouterGroup -import com.typesafe.config.ConfigFactory -import akka.cluster.routing.ClusterRouterPoolSettings -import akka.cluster.routing.ClusterRouterGroupSettings +import akka.routing.{ Group, Pool } +import com.typesafe.config.Config /** * INTERNAL API diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancing.scala b/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancing.scala index b0c3c6096d..38ea934e26 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancing.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancing.scala @@ -128,7 +128,7 @@ final case class AdaptiveLoadBalancingRoutingLogic(system: ActorSystem, metricsS @SerialVersionUID(1L) final case class AdaptiveLoadBalancingPool( metricsSelector: MetricsSelector = MixMetricsSelector, - override val nrOfInstances: Int = 0, + val nrOfInstances: Int = 0, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, override val routerDispatcher: String = Dispatchers.DefaultDispatcherId, override val usePoolDispatcher: Boolean = false) @@ -149,6 +149,8 @@ final case class AdaptiveLoadBalancingPool( override def resizer: Option[Resizer] = None + override def nrOfInstances(sys: ActorSystem) = this.nrOfInstances + override def createRouter(system: ActorSystem): Router = new Router(AdaptiveLoadBalancingRoutingLogic(system, metricsSelector)) diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala index 6ec9d27712..65ecda61fd 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala @@ -3,43 +3,31 @@ */ package akka.cluster.routing -import scala.collection.immutable -import akka.routing.RouterConfig -import akka.routing.Router -import akka.actor.Props -import akka.actor.ActorContext -import akka.routing.Routee import java.util.concurrent.atomic.AtomicInteger -import akka.actor.Address -import akka.actor.ActorCell -import akka.actor.Deploy -import com.typesafe.config.ConfigFactory -import akka.routing.ActorRefRoutee -import akka.remote.RemoteScope -import akka.actor.Actor -import akka.actor.SupervisorStrategy -import akka.routing.Resizer -import akka.routing.RouterConfig -import akka.routing.Pool -import akka.routing.Group -import akka.remote.routing.RemoteRouterConfig -import akka.routing.RouterActor + +import akka.actor._ import akka.cluster.Cluster import akka.cluster.ClusterEvent._ -import akka.actor.ActorRef import akka.cluster.Member -import scala.annotation.tailrec -import akka.actor.RootActorPath import akka.cluster.MemberStatus -import akka.routing.ActorSelectionRoutee -import akka.actor.ActorInitializationException -import akka.routing.RouterPoolActor -import akka.actor.ActorSystem -import akka.actor.ActorSystem -import akka.routing.RoutingLogic -import akka.actor.RelativeActorPath -import com.typesafe.config.Config import akka.japi.Util.immutableSeq +import akka.remote.RemoteScope +import akka.routing.ActorRefRoutee +import akka.routing.ActorSelectionRoutee +import akka.routing.Group +import akka.routing.Pool +import akka.routing.Resizer +import akka.routing.Routee +import akka.routing.Router +import akka.routing.RouterActor +import akka.routing.RouterConfig +import akka.routing.RouterPoolActor +import akka.routing.RoutingLogic +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory + +import scala.annotation.tailrec +import scala.collection.immutable object ClusterRouterGroupSettings { def fromConfig(config: Config): ClusterRouterGroupSettings = @@ -127,7 +115,8 @@ private[akka] trait ClusterRouterSettingsBase { def allowLocalRoutees: Boolean def useRole: Option[String] - if (totalInstances <= 0) throw new IllegalArgumentException("totalInstances of cluster router must be > 0") + require(useRole.isEmpty || useRole.get.nonEmpty, "useRole must be either None or non-empty Some wrapped role") + require(totalInstances > 0, "totalInstances of cluster router must be > 0") } /** @@ -183,7 +172,14 @@ final case class ClusterRouterPool(local: Pool, settings: ClusterRouterPoolSetti /** * Initial number of routee instances */ - override def nrOfInstances: Int = if (settings.allowLocalRoutees) settings.maxInstancesPerNode else 0 + override def nrOfInstances(sys: ActorSystem): Int = + if (settings.allowLocalRoutees && settings.useRole.isDefined) { + if (Cluster(sys).selfRoles.contains(settings.useRole.get)) { + settings.maxInstancesPerNode + } else 0 + } else if (settings.allowLocalRoutees && settings.useRole.isEmpty) { + settings.maxInstancesPerNode + } else 0 override def resizer: Option[Resizer] = local.resizer @@ -195,8 +191,8 @@ final case class ClusterRouterPool(local: Pool, settings: ClusterRouterPoolSetti override def supervisorStrategy: SupervisorStrategy = local.supervisorStrategy override def withFallback(other: RouterConfig): RouterConfig = other match { - case ClusterRouterPool(_: ClusterRouterPool, _) ⇒ throw new IllegalStateException( - "ClusterRouterPool is not allowed to wrap a ClusterRouterPool") + case ClusterRouterPool(_: ClusterRouterPool, _) ⇒ + throw new IllegalStateException("ClusterRouterPool is not allowed to wrap a ClusterRouterPool") case ClusterRouterPool(otherLocal, _) ⇒ copy(local = this.local.withFallback(otherLocal).asInstanceOf[Pool]) case _ ⇒ @@ -321,6 +317,7 @@ private[akka] class ClusterRouterGroupActor(val settings: ClusterRouterGroupSett } else { // find the node with least routees val unusedNodes = currentNodes filterNot usedRouteePaths.contains + if (unusedNodes.nonEmpty) { Some((unusedNodes.head, settings.routeesPaths.head)) } else { @@ -359,7 +356,7 @@ private[akka] trait ClusterRouterActor { this: RouterActor ⇒ override def postStop(): Unit = cluster.unsubscribe(self) var nodes: immutable.SortedSet[Address] = { - import Member.addressOrdering + import akka.cluster.Member.addressOrdering cluster.readView.members.collect { case m if isAvailable(m) ⇒ m.address } @@ -376,13 +373,12 @@ private[akka] trait ClusterRouterActor { this: RouterActor ⇒ } def availableNodes: immutable.SortedSet[Address] = { - import Member.addressOrdering - val currentNodes = nodes - if (currentNodes.isEmpty && settings.allowLocalRoutees && satisfiesRole(cluster.selfRoles)) - //use my own node, cluster information not updated yet + import akka.cluster.Member.addressOrdering + if (nodes.isEmpty && settings.allowLocalRoutees && satisfiesRole(cluster.selfRoles)) + // use my own node, cluster information not updated yet immutable.SortedSet(cluster.selfAddress) else - currentNodes + nodes } /** @@ -424,7 +420,7 @@ private[akka] trait ClusterRouterActor { this: RouterActor ⇒ def clusterReceive: Receive = { case s: CurrentClusterState ⇒ - import Member.addressOrdering + import akka.cluster.Member.addressOrdering nodes = s.members.collect { case m if isAvailable(m) ⇒ m.address } addRoutees() diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/UseRoleIgnoredSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/UseRoleIgnoredSpec.scala new file mode 100644 index 0000000000..77797a665f --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/UseRoleIgnoredSpec.scala @@ -0,0 +1,210 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.cluster.routing + +import akka.actor._ +import akka.cluster.MultiNodeClusterSpec +import akka.pattern.ask +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.routing.GetRoutees +import akka.routing.RoundRobinPool +import akka.routing.Routees +import akka.testkit.DefaultTimeout +import akka.testkit.ImplicitSender +import akka.testkit._ +import com.typesafe.config.ConfigFactory + +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.language.postfixOps + +object UseRoleIgnoredMultiJvmSpec extends MultiNodeConfig { + + class SomeActor(routeeType: RouteeType) extends Actor with ActorLogging { + log.info("Starting on {}", self.path.address) + + def this() = this(PoolRoutee) + + def receive = { + case msg ⇒ + log.info("msg = {}", msg) + sender() ! Reply(routeeType, self) + } + } + + final case class Reply(routeeType: RouteeType, ref: ActorRef) + + sealed trait RouteeType extends Serializable + object PoolRoutee extends RouteeType + object GroupRoutee extends RouteeType + + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(debugConfig(on = false). + withFallback(MultiNodeClusterSpec.clusterConfig)) + + nodeConfig(first)(ConfigFactory.parseString("""akka.cluster.roles =["a", "c"]""")) + nodeConfig(second, third)(ConfigFactory.parseString("""akka.cluster.roles =["b", "c"]""")) + +} + +class UseRoleIgnoredMultiJvmNode1 extends UseRoleIgnoredSpec +class UseRoleIgnoredMultiJvmNode2 extends UseRoleIgnoredSpec +class UseRoleIgnoredMultiJvmNode3 extends UseRoleIgnoredSpec + +abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSpec) + with MultiNodeClusterSpec + with ImplicitSender with DefaultTimeout { + import akka.cluster.routing.UseRoleIgnoredMultiJvmSpec._ + + def receiveReplies(routeeType: RouteeType, expectedReplies: Int): Map[Address, Int] = { + val zero = Map.empty[Address, Int] ++ roles.map(address(_) -> 0) + (receiveWhile(5 seconds, messages = expectedReplies) { + case Reply(`routeeType`, ref) ⇒ fullAddress(ref) + }).foldLeft(zero) { + case (replyMap, address) ⇒ replyMap + (address -> (replyMap(address) + 1)) + } + } + + /** + * Fills in self address for local ActorRef + */ + private def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match { + case Address(_, _, None, None) ⇒ cluster.selfAddress + case a ⇒ a + } + + def currentRoutees(router: ActorRef) = + Await.result(router ? GetRoutees, timeout.duration).asInstanceOf[Routees].routees + + "A cluster" must { + "start cluster" taggedAs LongRunningTest in { + awaitClusterUp(first, second, third) + runOn(first) { info("first, roles: " + cluster.selfRoles) } + runOn(second) { info("second, roles: " + cluster.selfRoles) } + runOn(third) { info("third, roles: " + cluster.selfRoles) } + enterBarrier("after-1") + } + + "local: off, roles: off, 6 => 0,2,2" taggedAs LongRunningTest in { + + runOn(first) { + val role = Some("b") + + val router = system.actorOf(ClusterRouterPool( + RoundRobinPool(nrOfInstances = 6), + ClusterRouterPoolSettings(totalInstances = 6, maxInstancesPerNode = 2, allowLocalRoutees = false, useRole = role)). + props(Props[SomeActor]), + "router-2") + + awaitAssert(currentRoutees(router).size should be(4)) + + val iterationCount = 10 + for (i ← 0 until iterationCount) { + router ! s"hit-$i" + } + + val replies = receiveReplies(PoolRoutee, iterationCount) + + replies(first) should be(0) // should not be deployed locally, does not have required role + replies(second) should be > 0 + replies(third) should be > 0 + replies.values.sum should be(iterationCount) + } + + enterBarrier("after-2") + } + + "local: on, role: b, 6 => 0,2,2" taggedAs LongRunningTest in { + + runOn(first) { + val role = Some("b") + + val router = system.actorOf(ClusterRouterPool( + RoundRobinPool(nrOfInstances = 6), + ClusterRouterPoolSettings(totalInstances = 6, maxInstancesPerNode = 2, allowLocalRoutees = true, useRole = role)). + props(Props[SomeActor]), + "router-3") + + awaitAssert(currentRoutees(router).size should be(4)) + + val iterationCount = 10 + for (i ← 0 until iterationCount) { + router ! s"hit-$i" + } + + val replies = receiveReplies(PoolRoutee, iterationCount) + + replies(first) should be(0) // should not be deployed locally, does not have required role + replies(second) should be > 0 + replies(third) should be > 0 + replies.values.sum should be(iterationCount) + } + + enterBarrier("after-3") + } + + "local: on, role: a, 6 => 2,0,0" taggedAs LongRunningTest in { + + runOn(first) { + val role = Some("a") + + val router = system.actorOf(ClusterRouterPool( + RoundRobinPool(nrOfInstances = 6), + ClusterRouterPoolSettings(totalInstances = 6, maxInstancesPerNode = 2, allowLocalRoutees = true, useRole = role)). + props(Props[SomeActor]), + "router-4") + + awaitAssert(currentRoutees(router).size should be(2)) + + val iterationCount = 10 + for (i ← 0 until iterationCount) { + router ! s"hit-$i" + } + + val replies = receiveReplies(PoolRoutee, iterationCount) + + replies(first) should be > 0 + replies(second) should be(0) + replies(third) should be(0) + replies.values.sum should be(iterationCount) + } + + enterBarrier("after-4") + } + + "local: on, role: c, 6 => 2,2,2" taggedAs LongRunningTest in { + + runOn(first) { + val role = Some("c") + + val router = system.actorOf(ClusterRouterPool( + RoundRobinPool(nrOfInstances = 6), + ClusterRouterPoolSettings(totalInstances = 6, maxInstancesPerNode = 2, allowLocalRoutees = true, useRole = role)). + props(Props[SomeActor]), + "router-5") + + awaitAssert(currentRoutees(router).size should be(6)) + + val iterationCount = 10 + for (i ← 0 until iterationCount) { + router ! s"hit-$i" + } + + val replies = receiveReplies(PoolRoutee, iterationCount) + + replies(first) should be > 0 + replies(second) should be > 0 + replies(third) should be > 0 + replies.values.sum should be(iterationCount) + } + + enterBarrier("after-5") + } + + } +} diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index 47ac809453..2f200a3143 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -115,3 +115,12 @@ If you use ``Slf4jLogger`` you should add the following configuration:: It will filter the log events using the backend configuration (e.g. logback.xml) before they are published to the event bus. + +Pool routers nrOfInstances method now takes ActorSystem +======================================================= + +In order to make cluster routers smarter about when they can start local routees, +``nrOfInstances`` defined on ``Pool`` now takes ``ActorSystem`` as an argument. +In case you have implemented a custom Pool you will have to update the method's signature, +however the implementation can remain the same if you don't need to rely on an ActorSystem in your logic. + diff --git a/akka-remote/src/main/scala/akka/remote/routing/RemoteRouterConfig.scala b/akka-remote/src/main/scala/akka/remote/routing/RemoteRouterConfig.scala index 170772287f..09f3ab7b7a 100644 --- a/akka-remote/src/main/scala/akka/remote/routing/RemoteRouterConfig.scala +++ b/akka-remote/src/main/scala/akka/remote/routing/RemoteRouterConfig.scala @@ -3,25 +3,25 @@ */ package akka.remote.routing -import akka.routing.Router -import akka.actor.Props -import akka.actor.ActorContext -import akka.routing.Routee import java.util.concurrent.atomic.AtomicInteger -import akka.actor.Address + import akka.actor.ActorCell -import akka.actor.Deploy -import com.typesafe.config.ConfigFactory -import akka.routing.ActorRefRoutee -import akka.remote.RemoteScope -import akka.actor.Actor -import akka.actor.SupervisorStrategy -import akka.routing.Resizer -import akka.routing.RouterConfig -import akka.routing.Pool +import akka.actor.ActorContext import akka.actor.ActorSystem -import akka.routing.RouterActor +import akka.actor.Address +import akka.actor.Deploy +import akka.actor.Props +import akka.actor.SupervisorStrategy import akka.japi.Util.immutableSeq +import akka.remote.RemoteScope +import akka.routing.ActorRefRoutee +import akka.routing.Pool +import akka.routing.Resizer +import akka.routing.Routee +import akka.routing.Router +import akka.routing.RouterActor +import akka.routing.RouterConfig +import com.typesafe.config.ConfigFactory /** * [[akka.routing.RouterConfig]] implementation for remote deployment on defined @@ -44,7 +44,7 @@ final case class RemoteRouterConfig(local: Pool, nodes: Iterable[Address]) exten override def createRouter(system: ActorSystem): Router = local.createRouter(system) - override def nrOfInstances: Int = local.nrOfInstances + override def nrOfInstances(sys: ActorSystem): Int = local.nrOfInstances(sys) override def newRoutee(routeeProps: Props, context: ActorContext): Routee = { val name = "c" + childNameCounter.incrementAndGet