!clu #15042 useRole restriction on local node is now respected

This is an API breaking change if someone implemented their own Routers.
The change is required because the router must know if the local routees
should be started or not so it has to check the roles of the cluster
member (the local one). We could delay this decision of starting local
routees, but that would allow messages to be dead-letter-ed (bad).
This commit is contained in:
Konrad 'ktoso' Malawski 2014-09-05 14:15:46 +02:00
parent f2f88d9dd7
commit 3f12ef262f
16 changed files with 319 additions and 114 deletions

View file

@ -66,14 +66,13 @@ private[akka] final class BalancingRoutingLogic extends RoutingLogic {
*/ */
@SerialVersionUID(1L) @SerialVersionUID(1L)
final case class BalancingPool( final case class BalancingPool(
override val nrOfInstances: Int, nrOfInstances: Int,
override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy,
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId) override val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
extends Pool { extends Pool {
def this(config: Config) = def this(config: Config) =
this( this(nrOfInstances = config.getInt("nr-of-instances"))
nrOfInstances = config.getInt("nr-of-instances"))
/** /**
* Java API * Java API
@ -94,6 +93,8 @@ final case class BalancingPool(
*/ */
def withDispatcher(dispatcherId: String): BalancingPool = copy(routerDispatcher = dispatcherId) def withDispatcher(dispatcherId: String): BalancingPool = copy(routerDispatcher = dispatcherId)
def nrOfInstances(sys: ActorSystem) = this.nrOfInstances
/** /**
* INTERNAL API * INTERNAL API
*/ */

View file

@ -58,7 +58,7 @@ final class BroadcastRoutingLogic extends RoutingLogic {
*/ */
@SerialVersionUID(1L) @SerialVersionUID(1L)
final case class BroadcastPool( final case class BroadcastPool(
override val nrOfInstances: Int, override val resizer: Option[Resizer] = None, nrOfInstances: Int, override val resizer: Option[Resizer] = None,
override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy,
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId, override val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
override val usePoolDispatcher: Boolean = false) override val usePoolDispatcher: Boolean = false)
@ -78,6 +78,8 @@ final case class BroadcastPool(
override def createRouter(system: ActorSystem): Router = new Router(BroadcastRoutingLogic()) override def createRouter(system: ActorSystem): Router = new Router(BroadcastRoutingLogic())
override def nrOfInstances(sys: ActorSystem) = this.nrOfInstances
/** /**
* Setting the supervisor strategy to be used for the head Router actor. * Setting the supervisor strategy to be used for the head Router actor.
*/ */

View file

@ -260,7 +260,8 @@ final case class ConsistentHashingRoutingLogic(
*/ */
@SerialVersionUID(1L) @SerialVersionUID(1L)
final case class ConsistentHashingPool( final case class ConsistentHashingPool(
override val nrOfInstances: Int, override val resizer: Option[Resizer] = None, val nrOfInstances: Int,
override val resizer: Option[Resizer] = None,
val virtualNodesFactor: Int = 0, val virtualNodesFactor: Int = 0,
val hashMapping: ConsistentHashingRouter.ConsistentHashMapping = ConsistentHashingRouter.emptyConsistentHashMapping, val hashMapping: ConsistentHashingRouter.ConsistentHashMapping = ConsistentHashingRouter.emptyConsistentHashMapping,
override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy,
@ -283,6 +284,8 @@ final case class ConsistentHashingPool(
override def createRouter(system: ActorSystem): Router = override def createRouter(system: ActorSystem): Router =
new Router(ConsistentHashingRoutingLogic(system, virtualNodesFactor, hashMapping)) new Router(ConsistentHashingRoutingLogic(system, virtualNodesFactor, hashMapping))
override def nrOfInstances(sys: ActorSystem) = this.nrOfInstances
/** /**
* Setting the supervisor strategy to be used for the head Router actor. * Setting the supervisor strategy to be used for the head Router actor.
*/ */

View file

@ -59,7 +59,7 @@ final class RandomRoutingLogic extends RoutingLogic {
*/ */
@SerialVersionUID(1L) @SerialVersionUID(1L)
final case class RandomPool( final case class RandomPool(
override val nrOfInstances: Int, override val resizer: Option[Resizer] = None, val nrOfInstances: Int, override val resizer: Option[Resizer] = None,
override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy,
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId, override val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
override val usePoolDispatcher: Boolean = false) override val usePoolDispatcher: Boolean = false)
@ -79,6 +79,8 @@ final case class RandomPool(
override def createRouter(system: ActorSystem): Router = new Router(RandomRoutingLogic()) override def createRouter(system: ActorSystem): Router = new Router(RandomRoutingLogic())
override def nrOfInstances(sys: ActorSystem) = this.nrOfInstances
/** /**
* Setting the supervisor strategy to be used for the head Router actor. * Setting the supervisor strategy to be used for the head Router actor.
*/ */

View file

@ -64,7 +64,7 @@ final class RoundRobinRoutingLogic extends RoutingLogic {
*/ */
@SerialVersionUID(1L) @SerialVersionUID(1L)
final case class RoundRobinPool( final case class RoundRobinPool(
override val nrOfInstances: Int, override val resizer: Option[Resizer] = None, val nrOfInstances: Int, override val resizer: Option[Resizer] = None,
override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy,
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId, override val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
override val usePoolDispatcher: Boolean = false) override val usePoolDispatcher: Boolean = false)
@ -83,6 +83,8 @@ final case class RoundRobinPool(
override def createRouter(system: ActorSystem): Router = new Router(RoundRobinRoutingLogic()) override def createRouter(system: ActorSystem): Router = new Router(RoundRobinRoutingLogic())
override def nrOfInstances(sys: ActorSystem) = this.nrOfInstances
/** /**
* Setting the supervisor strategy to be used for the head Router actor. * Setting the supervisor strategy to be used for the head Router actor.
*/ */

View file

@ -3,26 +3,22 @@
*/ */
package akka.routing package akka.routing
import scala.collection.immutable
import scala.concurrent.duration._
import akka.actor.Actor import akka.actor.Actor
import akka.actor.ActorCell import akka.actor.ActorCell
import akka.actor.ActorInitializationException import akka.actor.ActorInitializationException
import akka.actor.ActorRef
import akka.actor.ActorSystemImpl import akka.actor.ActorSystemImpl
import akka.actor.AutoReceivedMessage
import akka.actor.IndirectActorProducer import akka.actor.IndirectActorProducer
import akka.actor.InternalActorRef import akka.actor.InternalActorRef
import akka.actor.PoisonPill
import akka.actor.Props import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.actor.Terminated import akka.actor.Terminated
import akka.dispatch.Envelope import akka.dispatch.Envelope
import akka.dispatch.MessageDispatcher import akka.dispatch.MessageDispatcher
import akka.actor.ActorContext
import akka.actor.PoisonPill import scala.collection.immutable
import akka.actor.SupervisorStrategy import scala.concurrent.duration._
import akka.actor.ActorRef
import akka.actor.ReceiveTimeout
import akka.actor.Identify
import akka.actor.ActorIdentity
/** /**
* INTERNAL API * INTERNAL API
@ -106,8 +102,9 @@ private[akka] class RoutedActorCell(
_router = routerConfig.createRouter(system) _router = routerConfig.createRouter(system)
routerConfig match { routerConfig match {
case pool: Pool case pool: Pool
if (pool.nrOfInstances > 0) val nrOfRoutees = pool.nrOfInstances(system)
addRoutees(Vector.fill(pool.nrOfInstances)(pool.newRoutee(routeeProps, this))) if (nrOfRoutees > 0)
addRoutees(Vector.fill(nrOfRoutees)(pool.newRoutee(routeeProps, this)))
case group: Group case group: Group
val paths = group.paths val paths = group.paths
if (paths.nonEmpty) if (paths.nonEmpty)

View file

@ -176,10 +176,11 @@ abstract class PoolBase extends Pool
* them from the router if they terminate. * them from the router if they terminate.
*/ */
trait Pool extends RouterConfig { trait Pool extends RouterConfig {
/** /**
* Initial number of routee instances * Initial number of routee instances
*/ */
def nrOfInstances: Int def nrOfInstances(sys: ActorSystem): Int
/** /**
* Use a dedicated dispatcher for the routees of the pool. * Use a dedicated dispatcher for the routees of the pool.
@ -315,7 +316,7 @@ class FromConfig(override val resizer: Option[Resizer],
def withDispatcher(dispatcherId: String): FromConfig = def withDispatcher(dispatcherId: String): FromConfig =
new FromConfig(resizer, supervisorStrategy, dispatcherId) new FromConfig(resizer, supervisorStrategy, dispatcherId)
override val nrOfInstances: Int = 0 override def nrOfInstances(sys: ActorSystem): Int = 0
/** /**
* [[akka.actor.Props]] for a group router based on the settings defined by * [[akka.actor.Props]] for a group router based on the settings defined by

View file

@ -93,7 +93,7 @@ private[akka] final case class ScatterGatherFirstCompletedRoutees(
*/ */
@SerialVersionUID(1L) @SerialVersionUID(1L)
final case class ScatterGatherFirstCompletedPool( final case class ScatterGatherFirstCompletedPool(
override val nrOfInstances: Int, override val resizer: Option[Resizer] = None, val nrOfInstances: Int, override val resizer: Option[Resizer] = None,
within: FiniteDuration, within: FiniteDuration,
override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy,
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId, override val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
@ -117,6 +117,8 @@ final case class ScatterGatherFirstCompletedPool(
override def createRouter(system: ActorSystem): Router = new Router(ScatterGatherFirstCompletedRoutingLogic(within)) override def createRouter(system: ActorSystem): Router = new Router(ScatterGatherFirstCompletedRoutingLogic(within))
override def nrOfInstances(sys: ActorSystem) = this.nrOfInstances
/** /**
* Setting the supervisor strategy to be used for the head Router actor. * Setting the supervisor strategy to be used for the head Router actor.
*/ */

View file

@ -174,7 +174,7 @@ class SmallestMailboxRoutingLogic extends RoutingLogic {
*/ */
@SerialVersionUID(1L) @SerialVersionUID(1L)
final case class SmallestMailboxPool( final case class SmallestMailboxPool(
override val nrOfInstances: Int, override val resizer: Option[Resizer] = None, val nrOfInstances: Int, override val resizer: Option[Resizer] = None,
override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy,
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId, override val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
override val usePoolDispatcher: Boolean = false) override val usePoolDispatcher: Boolean = false)
@ -194,6 +194,8 @@ final case class SmallestMailboxPool(
override def createRouter(system: ActorSystem): Router = new Router(SmallestMailboxRoutingLogic()) override def createRouter(system: ActorSystem): Router = new Router(SmallestMailboxRoutingLogic())
override def nrOfInstances(sys: ActorSystem) = this.nrOfInstances
/** /**
* Setting the supervisor strategy to be used for the head Router actor. * Setting the supervisor strategy to be used for the head Router actor.
*/ */

View file

@ -120,7 +120,7 @@ private[akka] final case class TailChoppingRoutees(
*/ */
@SerialVersionUID(1L) @SerialVersionUID(1L)
final case class TailChoppingPool( final case class TailChoppingPool(
override val nrOfInstances: Int, override val resizer: Option[Resizer] = None, val nrOfInstances: Int, override val resizer: Option[Resizer] = None,
within: FiniteDuration, within: FiniteDuration,
interval: FiniteDuration, interval: FiniteDuration,
override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy,
@ -150,6 +150,8 @@ final case class TailChoppingPool(
new Router(TailChoppingRoutingLogic(system.scheduler, within, new Router(TailChoppingRoutingLogic(system.scheduler, within,
interval, system.dispatchers.lookup(routerDispatcher))) interval, system.dispatchers.lookup(routerDispatcher)))
override def nrOfInstances(sys: ActorSystem) = this.nrOfInstances
/** /**
* Setting the supervisor strategy to be used for the head Router actor. * Setting the supervisor strategy to be used for the head Router actor.
*/ */

View file

@ -3,40 +3,14 @@
*/ */
package akka.cluster package akka.cluster
import com.typesafe.config.Config
import akka.ConfigurationException import akka.ConfigurationException
import akka.actor.ActorSystem import akka.actor.{ ActorRef, ActorSystem, ActorSystemImpl, Deploy, DynamicAccess, NoScopeGiven, Scope }
import akka.actor.ActorSystemImpl import akka.cluster.routing.{ ClusterRouterGroup, ClusterRouterGroupSettings, ClusterRouterPool, ClusterRouterPoolSettings }
import akka.actor.Deploy
import akka.actor.DynamicAccess
import akka.actor.InternalActorRef
import akka.actor.NoScopeGiven
import akka.actor.Scheduler
import akka.actor.Scope
import akka.actor.Terminated
import akka.dispatch.sysmsg.DeathWatchNotification
import akka.event.EventStream import akka.event.EventStream
import akka.japi.Util.immutableSeq import akka.remote.{ RemoteActorRefProvider, RemoteDeployer }
import akka.remote.RemoteActorRefProvider
import akka.remote.RemoteDeployer
import akka.remote.routing.RemoteRouterConfig import akka.remote.routing.RemoteRouterConfig
import akka.routing.RouterConfig import akka.routing.{ Group, Pool }
import akka.routing.DefaultResizer import com.typesafe.config.Config
import akka.cluster.routing.MixMetricsSelector
import akka.cluster.routing.HeapMetricsSelector
import akka.cluster.routing.SystemLoadAverageMetricsSelector
import akka.cluster.routing.CpuMetricsSelector
import akka.cluster.routing.MetricsSelector
import akka.dispatch.sysmsg.SystemMessage
import akka.actor.ActorRef
import akka.actor.Props
import akka.routing.Pool
import akka.routing.Group
import akka.cluster.routing.ClusterRouterPool
import akka.cluster.routing.ClusterRouterGroup
import com.typesafe.config.ConfigFactory
import akka.cluster.routing.ClusterRouterPoolSettings
import akka.cluster.routing.ClusterRouterGroupSettings
/** /**
* INTERNAL API * INTERNAL API

View file

@ -128,7 +128,7 @@ final case class AdaptiveLoadBalancingRoutingLogic(system: ActorSystem, metricsS
@SerialVersionUID(1L) @SerialVersionUID(1L)
final case class AdaptiveLoadBalancingPool( final case class AdaptiveLoadBalancingPool(
metricsSelector: MetricsSelector = MixMetricsSelector, metricsSelector: MetricsSelector = MixMetricsSelector,
override val nrOfInstances: Int = 0, val nrOfInstances: Int = 0,
override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy, override val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy,
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId, override val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
override val usePoolDispatcher: Boolean = false) override val usePoolDispatcher: Boolean = false)
@ -149,6 +149,8 @@ final case class AdaptiveLoadBalancingPool(
override def resizer: Option[Resizer] = None override def resizer: Option[Resizer] = None
override def nrOfInstances(sys: ActorSystem) = this.nrOfInstances
override def createRouter(system: ActorSystem): Router = override def createRouter(system: ActorSystem): Router =
new Router(AdaptiveLoadBalancingRoutingLogic(system, metricsSelector)) new Router(AdaptiveLoadBalancingRoutingLogic(system, metricsSelector))

View file

@ -3,43 +3,31 @@
*/ */
package akka.cluster.routing package akka.cluster.routing
import scala.collection.immutable
import akka.routing.RouterConfig
import akka.routing.Router
import akka.actor.Props
import akka.actor.ActorContext
import akka.routing.Routee
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import akka.actor.Address
import akka.actor.ActorCell import akka.actor._
import akka.actor.Deploy
import com.typesafe.config.ConfigFactory
import akka.routing.ActorRefRoutee
import akka.remote.RemoteScope
import akka.actor.Actor
import akka.actor.SupervisorStrategy
import akka.routing.Resizer
import akka.routing.RouterConfig
import akka.routing.Pool
import akka.routing.Group
import akka.remote.routing.RemoteRouterConfig
import akka.routing.RouterActor
import akka.cluster.Cluster import akka.cluster.Cluster
import akka.cluster.ClusterEvent._ import akka.cluster.ClusterEvent._
import akka.actor.ActorRef
import akka.cluster.Member import akka.cluster.Member
import scala.annotation.tailrec
import akka.actor.RootActorPath
import akka.cluster.MemberStatus import akka.cluster.MemberStatus
import akka.routing.ActorSelectionRoutee
import akka.actor.ActorInitializationException
import akka.routing.RouterPoolActor
import akka.actor.ActorSystem
import akka.actor.ActorSystem
import akka.routing.RoutingLogic
import akka.actor.RelativeActorPath
import com.typesafe.config.Config
import akka.japi.Util.immutableSeq import akka.japi.Util.immutableSeq
import akka.remote.RemoteScope
import akka.routing.ActorRefRoutee
import akka.routing.ActorSelectionRoutee
import akka.routing.Group
import akka.routing.Pool
import akka.routing.Resizer
import akka.routing.Routee
import akka.routing.Router
import akka.routing.RouterActor
import akka.routing.RouterConfig
import akka.routing.RouterPoolActor
import akka.routing.RoutingLogic
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import scala.annotation.tailrec
import scala.collection.immutable
object ClusterRouterGroupSettings { object ClusterRouterGroupSettings {
def fromConfig(config: Config): ClusterRouterGroupSettings = def fromConfig(config: Config): ClusterRouterGroupSettings =
@ -127,7 +115,8 @@ private[akka] trait ClusterRouterSettingsBase {
def allowLocalRoutees: Boolean def allowLocalRoutees: Boolean
def useRole: Option[String] def useRole: Option[String]
if (totalInstances <= 0) throw new IllegalArgumentException("totalInstances of cluster router must be > 0") require(useRole.isEmpty || useRole.get.nonEmpty, "useRole must be either None or non-empty Some wrapped role")
require(totalInstances > 0, "totalInstances of cluster router must be > 0")
} }
/** /**
@ -183,7 +172,14 @@ final case class ClusterRouterPool(local: Pool, settings: ClusterRouterPoolSetti
/** /**
* Initial number of routee instances * Initial number of routee instances
*/ */
override def nrOfInstances: Int = if (settings.allowLocalRoutees) settings.maxInstancesPerNode else 0 override def nrOfInstances(sys: ActorSystem): Int =
if (settings.allowLocalRoutees && settings.useRole.isDefined) {
if (Cluster(sys).selfRoles.contains(settings.useRole.get)) {
settings.maxInstancesPerNode
} else 0
} else if (settings.allowLocalRoutees && settings.useRole.isEmpty) {
settings.maxInstancesPerNode
} else 0
override def resizer: Option[Resizer] = local.resizer override def resizer: Option[Resizer] = local.resizer
@ -195,8 +191,8 @@ final case class ClusterRouterPool(local: Pool, settings: ClusterRouterPoolSetti
override def supervisorStrategy: SupervisorStrategy = local.supervisorStrategy override def supervisorStrategy: SupervisorStrategy = local.supervisorStrategy
override def withFallback(other: RouterConfig): RouterConfig = other match { override def withFallback(other: RouterConfig): RouterConfig = other match {
case ClusterRouterPool(_: ClusterRouterPool, _) throw new IllegalStateException( case ClusterRouterPool(_: ClusterRouterPool, _)
"ClusterRouterPool is not allowed to wrap a ClusterRouterPool") throw new IllegalStateException("ClusterRouterPool is not allowed to wrap a ClusterRouterPool")
case ClusterRouterPool(otherLocal, _) case ClusterRouterPool(otherLocal, _)
copy(local = this.local.withFallback(otherLocal).asInstanceOf[Pool]) copy(local = this.local.withFallback(otherLocal).asInstanceOf[Pool])
case _ case _
@ -321,6 +317,7 @@ private[akka] class ClusterRouterGroupActor(val settings: ClusterRouterGroupSett
} else { } else {
// find the node with least routees // find the node with least routees
val unusedNodes = currentNodes filterNot usedRouteePaths.contains val unusedNodes = currentNodes filterNot usedRouteePaths.contains
if (unusedNodes.nonEmpty) { if (unusedNodes.nonEmpty) {
Some((unusedNodes.head, settings.routeesPaths.head)) Some((unusedNodes.head, settings.routeesPaths.head))
} else { } else {
@ -359,7 +356,7 @@ private[akka] trait ClusterRouterActor { this: RouterActor ⇒
override def postStop(): Unit = cluster.unsubscribe(self) override def postStop(): Unit = cluster.unsubscribe(self)
var nodes: immutable.SortedSet[Address] = { var nodes: immutable.SortedSet[Address] = {
import Member.addressOrdering import akka.cluster.Member.addressOrdering
cluster.readView.members.collect { cluster.readView.members.collect {
case m if isAvailable(m) m.address case m if isAvailable(m) m.address
} }
@ -376,13 +373,12 @@ private[akka] trait ClusterRouterActor { this: RouterActor ⇒
} }
def availableNodes: immutable.SortedSet[Address] = { def availableNodes: immutable.SortedSet[Address] = {
import Member.addressOrdering import akka.cluster.Member.addressOrdering
val currentNodes = nodes if (nodes.isEmpty && settings.allowLocalRoutees && satisfiesRole(cluster.selfRoles))
if (currentNodes.isEmpty && settings.allowLocalRoutees && satisfiesRole(cluster.selfRoles))
// use my own node, cluster information not updated yet // use my own node, cluster information not updated yet
immutable.SortedSet(cluster.selfAddress) immutable.SortedSet(cluster.selfAddress)
else else
currentNodes nodes
} }
/** /**
@ -424,7 +420,7 @@ private[akka] trait ClusterRouterActor { this: RouterActor ⇒
def clusterReceive: Receive = { def clusterReceive: Receive = {
case s: CurrentClusterState case s: CurrentClusterState
import Member.addressOrdering import akka.cluster.Member.addressOrdering
nodes = s.members.collect { case m if isAvailable(m) m.address } nodes = s.members.collect { case m if isAvailable(m) m.address }
addRoutees() addRoutees()

View file

@ -0,0 +1,210 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.routing
import akka.actor._
import akka.cluster.MultiNodeClusterSpec
import akka.pattern.ask
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.routing.GetRoutees
import akka.routing.RoundRobinPool
import akka.routing.Routees
import akka.testkit.DefaultTimeout
import akka.testkit.ImplicitSender
import akka.testkit._
import com.typesafe.config.ConfigFactory
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps
object UseRoleIgnoredMultiJvmSpec extends MultiNodeConfig {
class SomeActor(routeeType: RouteeType) extends Actor with ActorLogging {
log.info("Starting on {}", self.path.address)
def this() = this(PoolRoutee)
def receive = {
case msg
log.info("msg = {}", msg)
sender() ! Reply(routeeType, self)
}
}
final case class Reply(routeeType: RouteeType, ref: ActorRef)
sealed trait RouteeType extends Serializable
object PoolRoutee extends RouteeType
object GroupRoutee extends RouteeType
val first = role("first")
val second = role("second")
val third = role("third")
commonConfig(debugConfig(on = false).
withFallback(MultiNodeClusterSpec.clusterConfig))
nodeConfig(first)(ConfigFactory.parseString("""akka.cluster.roles =["a", "c"]"""))
nodeConfig(second, third)(ConfigFactory.parseString("""akka.cluster.roles =["b", "c"]"""))
}
class UseRoleIgnoredMultiJvmNode1 extends UseRoleIgnoredSpec
class UseRoleIgnoredMultiJvmNode2 extends UseRoleIgnoredSpec
class UseRoleIgnoredMultiJvmNode3 extends UseRoleIgnoredSpec
abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSpec)
with MultiNodeClusterSpec
with ImplicitSender with DefaultTimeout {
import akka.cluster.routing.UseRoleIgnoredMultiJvmSpec._
def receiveReplies(routeeType: RouteeType, expectedReplies: Int): Map[Address, Int] = {
val zero = Map.empty[Address, Int] ++ roles.map(address(_) -> 0)
(receiveWhile(5 seconds, messages = expectedReplies) {
case Reply(`routeeType`, ref) fullAddress(ref)
}).foldLeft(zero) {
case (replyMap, address) replyMap + (address -> (replyMap(address) + 1))
}
}
/**
* Fills in self address for local ActorRef
*/
private def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match {
case Address(_, _, None, None) cluster.selfAddress
case a a
}
def currentRoutees(router: ActorRef) =
Await.result(router ? GetRoutees, timeout.duration).asInstanceOf[Routees].routees
"A cluster" must {
"start cluster" taggedAs LongRunningTest in {
awaitClusterUp(first, second, third)
runOn(first) { info("first, roles: " + cluster.selfRoles) }
runOn(second) { info("second, roles: " + cluster.selfRoles) }
runOn(third) { info("third, roles: " + cluster.selfRoles) }
enterBarrier("after-1")
}
"local: off, roles: off, 6 => 0,2,2" taggedAs LongRunningTest in {
runOn(first) {
val role = Some("b")
val router = system.actorOf(ClusterRouterPool(
RoundRobinPool(nrOfInstances = 6),
ClusterRouterPoolSettings(totalInstances = 6, maxInstancesPerNode = 2, allowLocalRoutees = false, useRole = role)).
props(Props[SomeActor]),
"router-2")
awaitAssert(currentRoutees(router).size should be(4))
val iterationCount = 10
for (i 0 until iterationCount) {
router ! s"hit-$i"
}
val replies = receiveReplies(PoolRoutee, iterationCount)
replies(first) should be(0) // should not be deployed locally, does not have required role
replies(second) should be > 0
replies(third) should be > 0
replies.values.sum should be(iterationCount)
}
enterBarrier("after-2")
}
"local: on, role: b, 6 => 0,2,2" taggedAs LongRunningTest in {
runOn(first) {
val role = Some("b")
val router = system.actorOf(ClusterRouterPool(
RoundRobinPool(nrOfInstances = 6),
ClusterRouterPoolSettings(totalInstances = 6, maxInstancesPerNode = 2, allowLocalRoutees = true, useRole = role)).
props(Props[SomeActor]),
"router-3")
awaitAssert(currentRoutees(router).size should be(4))
val iterationCount = 10
for (i 0 until iterationCount) {
router ! s"hit-$i"
}
val replies = receiveReplies(PoolRoutee, iterationCount)
replies(first) should be(0) // should not be deployed locally, does not have required role
replies(second) should be > 0
replies(third) should be > 0
replies.values.sum should be(iterationCount)
}
enterBarrier("after-3")
}
"local: on, role: a, 6 => 2,0,0" taggedAs LongRunningTest in {
runOn(first) {
val role = Some("a")
val router = system.actorOf(ClusterRouterPool(
RoundRobinPool(nrOfInstances = 6),
ClusterRouterPoolSettings(totalInstances = 6, maxInstancesPerNode = 2, allowLocalRoutees = true, useRole = role)).
props(Props[SomeActor]),
"router-4")
awaitAssert(currentRoutees(router).size should be(2))
val iterationCount = 10
for (i 0 until iterationCount) {
router ! s"hit-$i"
}
val replies = receiveReplies(PoolRoutee, iterationCount)
replies(first) should be > 0
replies(second) should be(0)
replies(third) should be(0)
replies.values.sum should be(iterationCount)
}
enterBarrier("after-4")
}
"local: on, role: c, 6 => 2,2,2" taggedAs LongRunningTest in {
runOn(first) {
val role = Some("c")
val router = system.actorOf(ClusterRouterPool(
RoundRobinPool(nrOfInstances = 6),
ClusterRouterPoolSettings(totalInstances = 6, maxInstancesPerNode = 2, allowLocalRoutees = true, useRole = role)).
props(Props[SomeActor]),
"router-5")
awaitAssert(currentRoutees(router).size should be(6))
val iterationCount = 10
for (i 0 until iterationCount) {
router ! s"hit-$i"
}
val replies = receiveReplies(PoolRoutee, iterationCount)
replies(first) should be > 0
replies(second) should be > 0
replies(third) should be > 0
replies.values.sum should be(iterationCount)
}
enterBarrier("after-5")
}
}
}

View file

@ -115,3 +115,12 @@ If you use ``Slf4jLogger`` you should add the following configuration::
It will filter the log events using the backend configuration (e.g. logback.xml) before It will filter the log events using the backend configuration (e.g. logback.xml) before
they are published to the event bus. they are published to the event bus.
Pool routers nrOfInstances method now takes ActorSystem
=======================================================
In order to make cluster routers smarter about when they can start local routees,
``nrOfInstances`` defined on ``Pool`` now takes ``ActorSystem`` as an argument.
In case you have implemented a custom Pool you will have to update the method's signature,
however the implementation can remain the same if you don't need to rely on an ActorSystem in your logic.

View file

@ -3,25 +3,25 @@
*/ */
package akka.remote.routing package akka.remote.routing
import akka.routing.Router
import akka.actor.Props
import akka.actor.ActorContext
import akka.routing.Routee
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import akka.actor.Address
import akka.actor.ActorCell import akka.actor.ActorCell
import akka.actor.Deploy import akka.actor.ActorContext
import com.typesafe.config.ConfigFactory
import akka.routing.ActorRefRoutee
import akka.remote.RemoteScope
import akka.actor.Actor
import akka.actor.SupervisorStrategy
import akka.routing.Resizer
import akka.routing.RouterConfig
import akka.routing.Pool
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.routing.RouterActor import akka.actor.Address
import akka.actor.Deploy
import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.japi.Util.immutableSeq import akka.japi.Util.immutableSeq
import akka.remote.RemoteScope
import akka.routing.ActorRefRoutee
import akka.routing.Pool
import akka.routing.Resizer
import akka.routing.Routee
import akka.routing.Router
import akka.routing.RouterActor
import akka.routing.RouterConfig
import com.typesafe.config.ConfigFactory
/** /**
* [[akka.routing.RouterConfig]] implementation for remote deployment on defined * [[akka.routing.RouterConfig]] implementation for remote deployment on defined
@ -44,7 +44,7 @@ final case class RemoteRouterConfig(local: Pool, nodes: Iterable[Address]) exten
override def createRouter(system: ActorSystem): Router = local.createRouter(system) override def createRouter(system: ActorSystem): Router = local.createRouter(system)
override def nrOfInstances: Int = local.nrOfInstances override def nrOfInstances(sys: ActorSystem): Int = local.nrOfInstances(sys)
override def newRoutee(routeeProps: Props, context: ActorContext): Routee = { override def newRoutee(routeeProps: Props, context: ActorContext): Routee = {
val name = "c" + childNameCounter.incrementAndGet val name = "c" + childNameCounter.incrementAndGet