From d552e06a077a5be5607d97c8192c72c1e820e86d Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 7 Sep 2012 12:07:41 +0200 Subject: [PATCH] Add deploy-on-own-node setting for cluster router, see #2103 * Useful for master-worker scenario where all routees are remote. --- .../src/main/scala/akka/routing/Routing.scala | 3 -- .../src/main/resources/reference.conf | 5 ++ .../cluster/ClusterActorRefProvider.scala | 12 +++-- .../src/main/scala/akka/cluster/package.scala | 12 +++-- .../cluster/routing/ClusterRouterConfig.scala | 54 +++++++++++-------- .../ClusterRoundRobinRoutedActorSpec.scala | 50 +++++++++++++---- .../akka/cluster/ClusterDeployerSpec.scala | 4 +- 7 files changed, 94 insertions(+), 46 deletions(-) diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 4556cbc90c..249772ab77 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -71,9 +71,6 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo r } - if (routerConfig.resizer.isEmpty && _routees.isEmpty) - throw ActorInitializationException("router " + routerConfig + " did not register routees!") - start(sendSupervise = false, _uid) /* diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index d0ef3cf309..b2a7849f0e 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -121,5 +121,10 @@ akka { # 25 members. max-nr-of-instances-per-node = 1 + # Defines if routees are to be deployed on the same node as the head router + # actor, or only on remote nodes. + # Useful for master-worker scenario where all routees are remote. + deploy-on-own-node = on + } } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala index 680bb39015..b81cdd8812 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala @@ -4,7 +4,6 @@ package akka.cluster import com.typesafe.config.Config - import akka.ConfigurationException import akka.actor.ActorSystem import akka.actor.Deploy @@ -17,6 +16,7 @@ import akka.event.EventStream import akka.remote.RemoteActorRefProvider import akka.remote.RemoteDeployer import akka.routing.RemoteRouterConfig +import akka.cluster.routing.ClusterRouterSettings class ClusterActorRefProvider( _systemName: String, @@ -40,11 +40,13 @@ private[akka] class ClusterDeployer(_settings: ActorSystem.Settings, _pm: Dynami if (deploy.routerConfig.isInstanceOf[RemoteRouterConfig]) throw new ConfigurationException("Cluster deployment can't be combined with [%s]".format(deploy.routerConfig)) - val totalInstances = deploy.config.getInt("nr-of-instances") - val maxInstancesPerNode = deploy.config.getInt("cluster.max-nr-of-instances-per-node") + val clusterRouterSettings = ClusterRouterSettings( + totalInstances = deploy.config.getInt("nr-of-instances"), + maxInstancesPerNode = deploy.config.getInt("cluster.max-nr-of-instances-per-node"), + deployOnOwnNode = deploy.config.getBoolean("cluster.deploy-on-own-node")) + Some(deploy.copy( - routerConfig = ClusterRouterConfig(deploy.routerConfig, totalInstances, maxInstancesPerNode), - scope = ClusterScope)) + routerConfig = ClusterRouterConfig(deploy.routerConfig, clusterRouterSettings), scope = ClusterScope)) } else d case None ⇒ None } diff --git a/akka-cluster/src/main/scala/akka/cluster/package.scala b/akka-cluster/src/main/scala/akka/cluster/package.scala index 4d1931fcc2..bdd818603e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/package.scala +++ b/akka-cluster/src/main/scala/akka/cluster/package.scala @@ -17,7 +17,7 @@ package object routing { * [[[ * import akka.cluster.routing.ClusterRouterProps * context.actorOf(Props[SomeActor].withClusterRouter(RoundRobinRouter(), - * totalInstances = 10, maxInstancesPerNode = 2), "myrouter") + * totalInstances = 10, maxInstancesPerNode = 2, deployOnOwnNode = true), "myrouter") * ]]] * * Corresponding for Java API is found in [[akka.cluster.routing.ClusterRouterPropsDecorator]]. @@ -28,13 +28,15 @@ package object routing { * Without this helper it would look as ugly as: * val router = RoundRobinRouter(nrOfInstances = 10) * val actor = system.actorOf(Props[SomeActor].withRouter(router).withDeploy( - * Deploy(routerConfig = ClusterRouterConfig(router, totalInstances = router.nrOfInstances, maxInstancesPerNode = 2))), - * "myrouter") + * Deploy(routerConfig = ClusterRouterConfig(router, totalInstances = router.nrOfInstances, maxInstancesPerNode = 2, + * deployOnOwnNode = true))), "myrouter") */ - def withClusterRouter(router: RouterConfig, totalInstances: Int, maxInstancesPerNode: Int): Props = { + def withClusterRouter(router: RouterConfig, totalInstances: Int, maxInstancesPerNode: Int, + deployOnOwnNode: Boolean = true): Props = { props.withRouter(router).withDeploy( - Deploy(routerConfig = ClusterRouterConfig(router, totalInstances, maxInstancesPerNode))) + Deploy(routerConfig = ClusterRouterConfig(router, + ClusterRouterSettings(totalInstances, maxInstancesPerNode, deployOnOwnNode)))) } } diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala index eb21958b8e..befe48861e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala @@ -7,7 +7,6 @@ import java.lang.IllegalStateException import java.util.concurrent.atomic.AtomicInteger import scala.collection.immutable.SortedSet import com.typesafe.config.ConfigFactory - import akka.ConfigurationException import akka.actor.Actor import akka.actor.ActorContext @@ -29,6 +28,7 @@ import akka.routing.Route import akka.routing.RouteeProvider import akka.routing.Router import akka.routing.RouterConfig +import akka.routing.RemoteRouterConfig /** * [[akka.routing.RouterConfig]] implementation for deployment on cluster nodes. @@ -36,10 +36,10 @@ import akka.routing.RouterConfig * which makes it possible to mix this with the built-in routers such as * [[akka.routing.RoundRobinRouter]] or custom routers. */ -case class ClusterRouterConfig(local: RouterConfig, totalInstances: Int, maxInstancesPerNode: Int) extends RouterConfig { +case class ClusterRouterConfig(local: RouterConfig, settings: ClusterRouterSettings) extends RouterConfig { override def createRouteeProvider(context: ActorContext, routeeProps: Props) = - new ClusterRouteeProvider(context, routeeProps, resizer, totalInstances, maxInstancesPerNode) + new ClusterRouteeProvider(context, routeeProps, resizer, settings) override def createRoute(routeeProvider: RouteeProvider): Route = { val localRoute = local.createRoute(routeeProvider) @@ -59,11 +59,17 @@ case class ClusterRouterConfig(local: RouterConfig, totalInstances: Int, maxInst override def resizer: Option[Resizer] = local.resizer override def withFallback(other: RouterConfig): RouterConfig = other match { - case ClusterRouterConfig(local, _, _) ⇒ copy(local = this.local.withFallback(local)) - case _ ⇒ copy(local = this.local.withFallback(other)) + case ClusterRouterConfig(_: RemoteRouterConfig, _) ⇒ throw new IllegalStateException( + "ClusterRouterConfig is not allowed to wrap a RemoteRouterConfig") + case ClusterRouterConfig(_: ClusterRouterConfig, _) ⇒ throw new IllegalStateException( + "ClusterRouterConfig is not allowed to wrap a ClusterRouterConfig") + case ClusterRouterConfig(local, _) ⇒ copy(local = this.local.withFallback(local)) + case _ ⇒ copy(local = this.local.withFallback(other)) } } +case class ClusterRouterSettings(totalInstances: Int, maxInstancesPerNode: Int, deployOnOwnNode: Boolean) + /** * INTERNAL API * @@ -74,8 +80,7 @@ private[akka] class ClusterRouteeProvider( _context: ActorContext, _routeeProps: Props, _resizer: Option[Resizer], - totalInstances: Int, - maxInstancesPerNode: Int) + settings: ClusterRouterSettings) extends RouteeProvider(_context, _routeeProps, _resizer) { // need this counter as instance variable since Resizer may call createRoutees several times @@ -96,7 +101,7 @@ private[akka] class ClusterRouteeProvider( override def createRoutees(nrOfInstances: Int): Unit = { val impl = context.system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559 - for (i ← 1 to totalInstances; target ← selectDeploymentTarget) { + for (i ← 1 to settings.totalInstances; target ← selectDeploymentTarget) { val name = "c" + childNameCounter.incrementAndGet val deploy = Deploy("", ConfigFactory.empty(), routeeProps.routerConfig, RemoteScope(target)) var ref = impl.provider.actorOf(impl, routeeProps, context.self.asInstanceOf[InternalActorRef], context.self.path / name, @@ -106,14 +111,14 @@ private[akka] class ClusterRouteeProvider( } } - private[routing] def createRoutees(): Unit = createRoutees(totalInstances) + private[routing] def createRoutees(): Unit = createRoutees(settings.totalInstances) private def selectDeploymentTarget: Option[Address] = { val currentRoutees = routees - val currentNodes = upNodes - if (currentRoutees.size >= totalInstances) { + val currentNodes = availbleNodes + if (currentRoutees.size >= settings.totalInstances) { None - } else if (currentNodes.isEmpty) { + } else if (currentNodes.isEmpty && settings.deployOnOwnNode) { // use my own node, cluster information not updated yet Some(cluster.selfAddress) } else { @@ -124,7 +129,7 @@ private[akka] class ClusterRouteeProvider( } val (address, count) = numberOfRouteesPerNode.minBy(_._2) - if (count < maxInstancesPerNode) Some(address) else None + if (count < settings.maxInstancesPerNode) Some(address) else None } } @@ -140,8 +145,12 @@ private[akka] class ClusterRouteeProvider( import Member.addressOrdering @volatile - private[routing] var upNodes: SortedSet[Address] = cluster.readView.members.collect { - case m if m.status == MemberStatus.Up ⇒ m.address + private[routing] var availbleNodes: SortedSet[Address] = cluster.readView.members.collect { + case m if isAvailble(m) ⇒ m.address + } + + private[routing] def isAvailble(m: Member): Boolean = { + m.status == MemberStatus.Up && (settings.deployOnOwnNode || m.address != cluster.selfAddress) } } @@ -169,11 +178,11 @@ private[akka] class ClusterRouterActor extends Router { override def routerReceive: Receive = { case s: CurrentClusterState ⇒ import Member.addressOrdering - routeeProvider.upNodes = s.members.collect { case m if m.status == MemberStatus.Up ⇒ m.address } + routeeProvider.availbleNodes = s.members.collect { case m if routeeProvider.isAvailble(m) ⇒ m.address } routeeProvider.createRoutees() - case MemberUp(m) ⇒ - routeeProvider.upNodes += m.address + case m: MemberEvent if routeeProvider.isAvailble(m.member) ⇒ + routeeProvider.availbleNodes += m.member.address // createRoutees will create routees based on // totalInstances and maxInstancesPerNode routeeProvider.createRoutees() @@ -182,7 +191,7 @@ private[akka] class ClusterRouterActor extends Router { // other events means that it is no longer interesting, such as // MemberJoined, MemberLeft, MemberExited, MemberUnreachable, MemberRemoved val address = other.member.address - routeeProvider.upNodes -= address + routeeProvider.availbleNodes -= address // unregister routees that live on that node val affectedRoutes = routeeProvider.routees.filter(fullAddress(_) == address) @@ -200,14 +209,15 @@ private[akka] class ClusterRouterActor extends Router { * Usage Java API: * [[[ * context.actorOf(ClusterRouterPropsDecorator.decorate(new Props(MyActor.class), - * new RoundRobinRouter(0), 10, 2), "myrouter"); + * new RoundRobinRouter(0), 10, 2, true), "myrouter"); * ]]] * * Corresponding for Scala API is found in [[akka.cluster.routing.ClusterRouterProps]]. * */ object ClusterRouterPropsDecorator { - def decorate(props: Props, router: RouterConfig, totalInstances: Int, maxInstancesPerNode: Int): Props = - props.withClusterRouter(router, totalInstances, maxInstancesPerNode) + def decorate(props: Props, router: RouterConfig, totalInstances: Int, maxInstancesPerNode: Int, + deployOnOwnNode: Boolean): Props = + props.withClusterRouter(router, totalInstances, maxInstancesPerNode, deployOnOwnNode) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala index bfa50a472a..9bbd778410 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala @@ -40,8 +40,19 @@ object ClusterRoundRobinRoutedActorMultiJvmSpec extends MultiNodeConfig { /router1 { router = round-robin nr-of-instances = 10 - cluster.enabled = on - cluster.max-nr-of-instances-per-node = 2 + cluster { + enabled = on + max-nr-of-instances-per-node = 2 + } + } + /router3 { + router = round-robin + nr-of-instances = 10 + cluster { + enabled = on + max-nr-of-instances-per-node = 1 + deploy-on-own-node = off + } } } """)). @@ -63,8 +74,9 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou lazy val router2 = { import akka.cluster.routing.ClusterRouterProps system.actorOf(Props[SomeActor].withClusterRouter(RoundRobinRouter(), - totalInstances = 3, maxInstancesPerNode = 1), "router2") + totalInstances = 3, maxInstancesPerNode = 1, deployOnOwnNode = true), "router2") } + lazy val router3 = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "router3") def receiveReplies(expectedReplies: Int): Map[Address, Int] = { val zero = Map.empty[Address, Int] ++ roles.map(address(_) -> 0) @@ -83,7 +95,7 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou case a ⇒ a } - "A cluster router configured with a RoundRobin router" must { + "A cluster router with a RoundRobin router" must { "start cluster with 2 nodes" taggedAs LongRunningTest in { awaitClusterUp(first, second) enterBarrier("after-1") @@ -130,10 +142,28 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou enterBarrier("after-3") } - } - "A programatically defined RoundRobin cluster router" must { - "deploy routees to the member nodes in the cluster" taggedAs LongRunningTest in { + "deploy routees to only remote nodes when deploy-on-own-node = off" taggedAs LongRunningTest in { + + runOn(first) { + val iterationCount = 10 + for (i ← 0 until iterationCount) { + router3 ! "hit" + } + + val replies = receiveReplies(iterationCount) + + replies(first) must be(0) + replies(second) must be > (0) + replies(third) must be > (0) + replies(fourth) must be > (0) + replies.values.sum must be(iterationCount) + } + + enterBarrier("after-4") + } + + "deploy programatically defined routees to the member nodes in the cluster" taggedAs LongRunningTest in { runOn(first) { router2.isInstanceOf[RoutedActorRef] must be(true) @@ -153,10 +183,10 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou replies.values.sum must be(iterationCount) } - enterBarrier("after-4") + enterBarrier("after-5") } - "deploy to other node when a node becomes down" taggedAs LongRunningTest in { + "deploy programatically defined routees to other node when a node becomes down" taggedAs LongRunningTest in { runOn(first) { def currentRoutees = Await.result(router2 ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees @@ -181,7 +211,7 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou replies.values.sum must be(iterationCount) } - enterBarrier("after-5") + enterBarrier("after-6") } } } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala index 60aeec3013..5c2b233c7d 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala @@ -8,6 +8,7 @@ import akka.actor._ import akka.routing._ import com.typesafe.config._ import akka.cluster.routing.ClusterRouterConfig +import akka.cluster.routing.ClusterRouterSettings object ClusterDeployerSpec { val deployerConf = ConfigFactory.parseString(""" @@ -18,6 +19,7 @@ object ClusterDeployerSpec { nr-of-instances = 20 cluster.enabled = on cluster.max-nr-of-instances-per-node = 3 + cluster.deploy-on-own-node = off } } akka.remote.netty.port = 0 @@ -43,7 +45,7 @@ class ClusterDeployerSpec extends AkkaSpec(ClusterDeployerSpec.deployerConf) { Deploy( service, deployment.get.config, - ClusterRouterConfig(RoundRobinRouter(20), 20, 3), + ClusterRouterConfig(RoundRobinRouter(20), ClusterRouterSettings(20, 3, false)), ClusterScope))) }