Add deploy-on-own-node setting for cluster router, see #2103

* Useful for master-worker scenario where all routees are remote.
This commit is contained in:
Patrik Nordwall 2012-09-07 12:07:41 +02:00
parent f1f145ab43
commit d552e06a07
7 changed files with 94 additions and 46 deletions

View file

@ -71,9 +71,6 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo
r r
} }
if (routerConfig.resizer.isEmpty && _routees.isEmpty)
throw ActorInitializationException("router " + routerConfig + " did not register routees!")
start(sendSupervise = false, _uid) start(sendSupervise = false, _uid)
/* /*

View file

@ -121,5 +121,10 @@ akka {
# 25 members. # 25 members.
max-nr-of-instances-per-node = 1 max-nr-of-instances-per-node = 1
# Defines if routees are to be deployed on the same node as the head router
# actor, or only on remote nodes.
# Useful for master-worker scenario where all routees are remote.
deploy-on-own-node = on
} }
} }

View file

@ -4,7 +4,6 @@
package akka.cluster package akka.cluster
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.ConfigurationException import akka.ConfigurationException
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.actor.Deploy import akka.actor.Deploy
@ -17,6 +16,7 @@ import akka.event.EventStream
import akka.remote.RemoteActorRefProvider import akka.remote.RemoteActorRefProvider
import akka.remote.RemoteDeployer import akka.remote.RemoteDeployer
import akka.routing.RemoteRouterConfig import akka.routing.RemoteRouterConfig
import akka.cluster.routing.ClusterRouterSettings
class ClusterActorRefProvider( class ClusterActorRefProvider(
_systemName: String, _systemName: String,
@ -40,11 +40,13 @@ private[akka] class ClusterDeployer(_settings: ActorSystem.Settings, _pm: Dynami
if (deploy.routerConfig.isInstanceOf[RemoteRouterConfig]) if (deploy.routerConfig.isInstanceOf[RemoteRouterConfig])
throw new ConfigurationException("Cluster deployment can't be combined with [%s]".format(deploy.routerConfig)) throw new ConfigurationException("Cluster deployment can't be combined with [%s]".format(deploy.routerConfig))
val totalInstances = deploy.config.getInt("nr-of-instances") val clusterRouterSettings = ClusterRouterSettings(
val maxInstancesPerNode = deploy.config.getInt("cluster.max-nr-of-instances-per-node") totalInstances = deploy.config.getInt("nr-of-instances"),
maxInstancesPerNode = deploy.config.getInt("cluster.max-nr-of-instances-per-node"),
deployOnOwnNode = deploy.config.getBoolean("cluster.deploy-on-own-node"))
Some(deploy.copy( Some(deploy.copy(
routerConfig = ClusterRouterConfig(deploy.routerConfig, totalInstances, maxInstancesPerNode), routerConfig = ClusterRouterConfig(deploy.routerConfig, clusterRouterSettings), scope = ClusterScope))
scope = ClusterScope))
} else d } else d
case None None case None None
} }

View file

@ -17,7 +17,7 @@ package object routing {
* [[[ * [[[
* import akka.cluster.routing.ClusterRouterProps * import akka.cluster.routing.ClusterRouterProps
* context.actorOf(Props[SomeActor].withClusterRouter(RoundRobinRouter(), * context.actorOf(Props[SomeActor].withClusterRouter(RoundRobinRouter(),
* totalInstances = 10, maxInstancesPerNode = 2), "myrouter") * totalInstances = 10, maxInstancesPerNode = 2, deployOnOwnNode = true), "myrouter")
* ]]] * ]]]
* *
* Corresponding for Java API is found in [[akka.cluster.routing.ClusterRouterPropsDecorator]]. * Corresponding for Java API is found in [[akka.cluster.routing.ClusterRouterPropsDecorator]].
@ -28,13 +28,15 @@ package object routing {
* Without this helper it would look as ugly as: * Without this helper it would look as ugly as:
* val router = RoundRobinRouter(nrOfInstances = 10) * val router = RoundRobinRouter(nrOfInstances = 10)
* val actor = system.actorOf(Props[SomeActor].withRouter(router).withDeploy( * val actor = system.actorOf(Props[SomeActor].withRouter(router).withDeploy(
* Deploy(routerConfig = ClusterRouterConfig(router, totalInstances = router.nrOfInstances, maxInstancesPerNode = 2))), * Deploy(routerConfig = ClusterRouterConfig(router, totalInstances = router.nrOfInstances, maxInstancesPerNode = 2,
* "myrouter") * deployOnOwnNode = true))), "myrouter")
*/ */
def withClusterRouter(router: RouterConfig, totalInstances: Int, maxInstancesPerNode: Int): Props = { def withClusterRouter(router: RouterConfig, totalInstances: Int, maxInstancesPerNode: Int,
deployOnOwnNode: Boolean = true): Props = {
props.withRouter(router).withDeploy( props.withRouter(router).withDeploy(
Deploy(routerConfig = ClusterRouterConfig(router, totalInstances, maxInstancesPerNode))) Deploy(routerConfig = ClusterRouterConfig(router,
ClusterRouterSettings(totalInstances, maxInstancesPerNode, deployOnOwnNode))))
} }
} }

View file

@ -7,7 +7,6 @@ import java.lang.IllegalStateException
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import scala.collection.immutable.SortedSet import scala.collection.immutable.SortedSet
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import akka.ConfigurationException import akka.ConfigurationException
import akka.actor.Actor import akka.actor.Actor
import akka.actor.ActorContext import akka.actor.ActorContext
@ -29,6 +28,7 @@ import akka.routing.Route
import akka.routing.RouteeProvider import akka.routing.RouteeProvider
import akka.routing.Router import akka.routing.Router
import akka.routing.RouterConfig import akka.routing.RouterConfig
import akka.routing.RemoteRouterConfig
/** /**
* [[akka.routing.RouterConfig]] implementation for deployment on cluster nodes. * [[akka.routing.RouterConfig]] implementation for deployment on cluster nodes.
@ -36,10 +36,10 @@ import akka.routing.RouterConfig
* which makes it possible to mix this with the built-in routers such as * which makes it possible to mix this with the built-in routers such as
* [[akka.routing.RoundRobinRouter]] or custom routers. * [[akka.routing.RoundRobinRouter]] or custom routers.
*/ */
case class ClusterRouterConfig(local: RouterConfig, totalInstances: Int, maxInstancesPerNode: Int) extends RouterConfig { case class ClusterRouterConfig(local: RouterConfig, settings: ClusterRouterSettings) extends RouterConfig {
override def createRouteeProvider(context: ActorContext, routeeProps: Props) = override def createRouteeProvider(context: ActorContext, routeeProps: Props) =
new ClusterRouteeProvider(context, routeeProps, resizer, totalInstances, maxInstancesPerNode) new ClusterRouteeProvider(context, routeeProps, resizer, settings)
override def createRoute(routeeProvider: RouteeProvider): Route = { override def createRoute(routeeProvider: RouteeProvider): Route = {
val localRoute = local.createRoute(routeeProvider) val localRoute = local.createRoute(routeeProvider)
@ -59,11 +59,17 @@ case class ClusterRouterConfig(local: RouterConfig, totalInstances: Int, maxInst
override def resizer: Option[Resizer] = local.resizer override def resizer: Option[Resizer] = local.resizer
override def withFallback(other: RouterConfig): RouterConfig = other match { override def withFallback(other: RouterConfig): RouterConfig = other match {
case ClusterRouterConfig(local, _, _) copy(local = this.local.withFallback(local)) case ClusterRouterConfig(_: RemoteRouterConfig, _) throw new IllegalStateException(
case _ copy(local = this.local.withFallback(other)) "ClusterRouterConfig is not allowed to wrap a RemoteRouterConfig")
case ClusterRouterConfig(_: ClusterRouterConfig, _) throw new IllegalStateException(
"ClusterRouterConfig is not allowed to wrap a ClusterRouterConfig")
case ClusterRouterConfig(local, _) copy(local = this.local.withFallback(local))
case _ copy(local = this.local.withFallback(other))
} }
} }
case class ClusterRouterSettings(totalInstances: Int, maxInstancesPerNode: Int, deployOnOwnNode: Boolean)
/** /**
* INTERNAL API * INTERNAL API
* *
@ -74,8 +80,7 @@ private[akka] class ClusterRouteeProvider(
_context: ActorContext, _context: ActorContext,
_routeeProps: Props, _routeeProps: Props,
_resizer: Option[Resizer], _resizer: Option[Resizer],
totalInstances: Int, settings: ClusterRouterSettings)
maxInstancesPerNode: Int)
extends RouteeProvider(_context, _routeeProps, _resizer) { extends RouteeProvider(_context, _routeeProps, _resizer) {
// need this counter as instance variable since Resizer may call createRoutees several times // need this counter as instance variable since Resizer may call createRoutees several times
@ -96,7 +101,7 @@ private[akka] class ClusterRouteeProvider(
override def createRoutees(nrOfInstances: Int): Unit = { override def createRoutees(nrOfInstances: Int): Unit = {
val impl = context.system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559 val impl = context.system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559
for (i 1 to totalInstances; target selectDeploymentTarget) { for (i 1 to settings.totalInstances; target selectDeploymentTarget) {
val name = "c" + childNameCounter.incrementAndGet val name = "c" + childNameCounter.incrementAndGet
val deploy = Deploy("", ConfigFactory.empty(), routeeProps.routerConfig, RemoteScope(target)) val deploy = Deploy("", ConfigFactory.empty(), routeeProps.routerConfig, RemoteScope(target))
var ref = impl.provider.actorOf(impl, routeeProps, context.self.asInstanceOf[InternalActorRef], context.self.path / name, var ref = impl.provider.actorOf(impl, routeeProps, context.self.asInstanceOf[InternalActorRef], context.self.path / name,
@ -106,14 +111,14 @@ private[akka] class ClusterRouteeProvider(
} }
} }
private[routing] def createRoutees(): Unit = createRoutees(totalInstances) private[routing] def createRoutees(): Unit = createRoutees(settings.totalInstances)
private def selectDeploymentTarget: Option[Address] = { private def selectDeploymentTarget: Option[Address] = {
val currentRoutees = routees val currentRoutees = routees
val currentNodes = upNodes val currentNodes = availbleNodes
if (currentRoutees.size >= totalInstances) { if (currentRoutees.size >= settings.totalInstances) {
None None
} else if (currentNodes.isEmpty) { } else if (currentNodes.isEmpty && settings.deployOnOwnNode) {
// use my own node, cluster information not updated yet // use my own node, cluster information not updated yet
Some(cluster.selfAddress) Some(cluster.selfAddress)
} else { } else {
@ -124,7 +129,7 @@ private[akka] class ClusterRouteeProvider(
} }
val (address, count) = numberOfRouteesPerNode.minBy(_._2) val (address, count) = numberOfRouteesPerNode.minBy(_._2)
if (count < maxInstancesPerNode) Some(address) else None if (count < settings.maxInstancesPerNode) Some(address) else None
} }
} }
@ -140,8 +145,12 @@ private[akka] class ClusterRouteeProvider(
import Member.addressOrdering import Member.addressOrdering
@volatile @volatile
private[routing] var upNodes: SortedSet[Address] = cluster.readView.members.collect { private[routing] var availbleNodes: SortedSet[Address] = cluster.readView.members.collect {
case m if m.status == MemberStatus.Up m.address case m if isAvailble(m) m.address
}
private[routing] def isAvailble(m: Member): Boolean = {
m.status == MemberStatus.Up && (settings.deployOnOwnNode || m.address != cluster.selfAddress)
} }
} }
@ -169,11 +178,11 @@ private[akka] class ClusterRouterActor extends Router {
override def routerReceive: Receive = { override def routerReceive: Receive = {
case s: CurrentClusterState case s: CurrentClusterState
import Member.addressOrdering import Member.addressOrdering
routeeProvider.upNodes = s.members.collect { case m if m.status == MemberStatus.Up m.address } routeeProvider.availbleNodes = s.members.collect { case m if routeeProvider.isAvailble(m) m.address }
routeeProvider.createRoutees() routeeProvider.createRoutees()
case MemberUp(m) case m: MemberEvent if routeeProvider.isAvailble(m.member)
routeeProvider.upNodes += m.address routeeProvider.availbleNodes += m.member.address
// createRoutees will create routees based on // createRoutees will create routees based on
// totalInstances and maxInstancesPerNode // totalInstances and maxInstancesPerNode
routeeProvider.createRoutees() routeeProvider.createRoutees()
@ -182,7 +191,7 @@ private[akka] class ClusterRouterActor extends Router {
// other events means that it is no longer interesting, such as // other events means that it is no longer interesting, such as
// MemberJoined, MemberLeft, MemberExited, MemberUnreachable, MemberRemoved // MemberJoined, MemberLeft, MemberExited, MemberUnreachable, MemberRemoved
val address = other.member.address val address = other.member.address
routeeProvider.upNodes -= address routeeProvider.availbleNodes -= address
// unregister routees that live on that node // unregister routees that live on that node
val affectedRoutes = routeeProvider.routees.filter(fullAddress(_) == address) val affectedRoutes = routeeProvider.routees.filter(fullAddress(_) == address)
@ -200,14 +209,15 @@ private[akka] class ClusterRouterActor extends Router {
* Usage Java API: * Usage Java API:
* [[[ * [[[
* context.actorOf(ClusterRouterPropsDecorator.decorate(new Props(MyActor.class), * context.actorOf(ClusterRouterPropsDecorator.decorate(new Props(MyActor.class),
* new RoundRobinRouter(0), 10, 2), "myrouter"); * new RoundRobinRouter(0), 10, 2, true), "myrouter");
* ]]] * ]]]
* *
* Corresponding for Scala API is found in [[akka.cluster.routing.ClusterRouterProps]]. * Corresponding for Scala API is found in [[akka.cluster.routing.ClusterRouterProps]].
* *
*/ */
object ClusterRouterPropsDecorator { object ClusterRouterPropsDecorator {
def decorate(props: Props, router: RouterConfig, totalInstances: Int, maxInstancesPerNode: Int): Props = def decorate(props: Props, router: RouterConfig, totalInstances: Int, maxInstancesPerNode: Int,
props.withClusterRouter(router, totalInstances, maxInstancesPerNode) deployOnOwnNode: Boolean): Props =
props.withClusterRouter(router, totalInstances, maxInstancesPerNode, deployOnOwnNode)
} }

View file

@ -40,8 +40,19 @@ object ClusterRoundRobinRoutedActorMultiJvmSpec extends MultiNodeConfig {
/router1 { /router1 {
router = round-robin router = round-robin
nr-of-instances = 10 nr-of-instances = 10
cluster.enabled = on cluster {
cluster.max-nr-of-instances-per-node = 2 enabled = on
max-nr-of-instances-per-node = 2
}
}
/router3 {
router = round-robin
nr-of-instances = 10
cluster {
enabled = on
max-nr-of-instances-per-node = 1
deploy-on-own-node = off
}
} }
} }
""")). """)).
@ -63,8 +74,9 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou
lazy val router2 = { lazy val router2 = {
import akka.cluster.routing.ClusterRouterProps import akka.cluster.routing.ClusterRouterProps
system.actorOf(Props[SomeActor].withClusterRouter(RoundRobinRouter(), system.actorOf(Props[SomeActor].withClusterRouter(RoundRobinRouter(),
totalInstances = 3, maxInstancesPerNode = 1), "router2") totalInstances = 3, maxInstancesPerNode = 1, deployOnOwnNode = true), "router2")
} }
lazy val router3 = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "router3")
def receiveReplies(expectedReplies: Int): Map[Address, Int] = { def receiveReplies(expectedReplies: Int): Map[Address, Int] = {
val zero = Map.empty[Address, Int] ++ roles.map(address(_) -> 0) val zero = Map.empty[Address, Int] ++ roles.map(address(_) -> 0)
@ -83,7 +95,7 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou
case a a case a a
} }
"A cluster router configured with a RoundRobin router" must { "A cluster router with a RoundRobin router" must {
"start cluster with 2 nodes" taggedAs LongRunningTest in { "start cluster with 2 nodes" taggedAs LongRunningTest in {
awaitClusterUp(first, second) awaitClusterUp(first, second)
enterBarrier("after-1") enterBarrier("after-1")
@ -130,10 +142,28 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou
enterBarrier("after-3") enterBarrier("after-3")
} }
}
"A programatically defined RoundRobin cluster router" must { "deploy routees to only remote nodes when deploy-on-own-node = off" taggedAs LongRunningTest in {
"deploy routees to the member nodes in the cluster" taggedAs LongRunningTest in {
runOn(first) {
val iterationCount = 10
for (i 0 until iterationCount) {
router3 ! "hit"
}
val replies = receiveReplies(iterationCount)
replies(first) must be(0)
replies(second) must be > (0)
replies(third) must be > (0)
replies(fourth) must be > (0)
replies.values.sum must be(iterationCount)
}
enterBarrier("after-4")
}
"deploy programatically defined routees to the member nodes in the cluster" taggedAs LongRunningTest in {
runOn(first) { runOn(first) {
router2.isInstanceOf[RoutedActorRef] must be(true) router2.isInstanceOf[RoutedActorRef] must be(true)
@ -153,10 +183,10 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou
replies.values.sum must be(iterationCount) replies.values.sum must be(iterationCount)
} }
enterBarrier("after-4") enterBarrier("after-5")
} }
"deploy to other node when a node becomes down" taggedAs LongRunningTest in { "deploy programatically defined routees to other node when a node becomes down" taggedAs LongRunningTest in {
runOn(first) { runOn(first) {
def currentRoutees = Await.result(router2 ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees def currentRoutees = Await.result(router2 ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees
@ -181,7 +211,7 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou
replies.values.sum must be(iterationCount) replies.values.sum must be(iterationCount)
} }
enterBarrier("after-5") enterBarrier("after-6")
} }
} }
} }

View file

@ -8,6 +8,7 @@ import akka.actor._
import akka.routing._ import akka.routing._
import com.typesafe.config._ import com.typesafe.config._
import akka.cluster.routing.ClusterRouterConfig import akka.cluster.routing.ClusterRouterConfig
import akka.cluster.routing.ClusterRouterSettings
object ClusterDeployerSpec { object ClusterDeployerSpec {
val deployerConf = ConfigFactory.parseString(""" val deployerConf = ConfigFactory.parseString("""
@ -18,6 +19,7 @@ object ClusterDeployerSpec {
nr-of-instances = 20 nr-of-instances = 20
cluster.enabled = on cluster.enabled = on
cluster.max-nr-of-instances-per-node = 3 cluster.max-nr-of-instances-per-node = 3
cluster.deploy-on-own-node = off
} }
} }
akka.remote.netty.port = 0 akka.remote.netty.port = 0
@ -43,7 +45,7 @@ class ClusterDeployerSpec extends AkkaSpec(ClusterDeployerSpec.deployerConf) {
Deploy( Deploy(
service, service,
deployment.get.config, deployment.get.config,
ClusterRouterConfig(RoundRobinRouter(20), 20, 3), ClusterRouterConfig(RoundRobinRouter(20), ClusterRouterSettings(20, 3, false)),
ClusterScope))) ClusterScope)))
} }