From 402674ce10940d0a45fe0fb45ea0d405a1d0da71 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 16 Oct 2013 11:06:38 +0200 Subject: [PATCH] +clu #3627 Cluster router group with multiple paths per node * Use the ordinary routees.paths config property instead of cluster.routees-path * Backwards compatible in deprecation phase --- akka-actor/src/main/resources/reference.conf | 4 +- .../src/main/resources/reference.conf | 6 +- .../cluster/ClusterActorRefProvider.scala | 3 +- .../routing/AdaptiveLoadBalancing.scala | 10 +- .../cluster/routing/ClusterRouterConfig.scala | 112 ++++++++++++------ .../DeprecatedClusterRouterConfig.scala | 2 +- .../scala/akka/cluster/StressSpec.scala | 2 +- .../AdaptiveLoadBalancingRouterSpec.scala | 4 +- .../routing/ClusterRoundRobinSpec.scala | 47 ++++---- .../akka/cluster/ClusterDeployerSpec.scala | 28 ++++- akka-docs/rst/java/cluster-usage.rst | 10 +- .../project/migration-guide-2.2.x-2.3.x.rst | 14 ++- akka-docs/rst/scala/cluster-usage.rst | 10 +- .../cluster/stats/japi/StatsService.java | 7 +- .../src/main/resources/application.conf | 4 +- .../cluster/factorial/FactorialSample.scala | 2 +- .../sample/cluster/stats/StatsSample.scala | 2 +- .../cluster/stats/StatsSampleSpec.scala | 2 +- .../stats/japi/StatsSampleJapiSpec.scala | 2 +- 19 files changed, 168 insertions(+), 103 deletions(-) diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index c8f2f1d64f..43ab67a4b4 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -151,8 +151,8 @@ akka { # In case of routing, the actors to be routed to can be specified # in several ways: # - nr-of-instances: will create that many children - # - routees.paths: will look the paths up using actorFor and route to - # them, i.e. will not create children + # - routees.paths: will route messages to these paths using ActorSelection, + # i.e. will not create children # - resizer: dynamically resizable number of routees as specified in # resizer below router = "from-code" diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index af641df32f..ea9a208a9d 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -216,11 +216,7 @@ akka { # Useful for master-worker scenario where all routees are remote. allow-local-routees = on - # Actor path of the routees to lookup with actorFor on the member - # nodes in the cluster. E.g. "/user/myservice". If this isn't defined - # the routees will be deployed instead of looked up. - # max-nr-of-instances-per-node should not be configured (default value is 1) - # when routees-path is defined. + # Deprecated in 2.3, use routees.paths instead routees-path = "" # Use members with specified role, or all members if undefined or empty. diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala index 63a0d7600d..b1d03db1ff 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala @@ -90,8 +90,7 @@ private[akka] class ClusterActorRefProvider( private[akka] class ClusterDeployer(_settings: ActorSystem.Settings, _pm: DynamicAccess) extends RemoteDeployer(_settings, _pm) { override def parseConfig(path: String, config: Config): Option[Deploy] = { - // For backwards compatibility we must add this fake routees.paths so that the deployer creates a Group - // even though routees.paths is not defined. This will be cleaned up by ticket #3627 + // For backwards compatibility we must transform 'cluster.routees-path' to 'routees.paths' val config2 = if (config.hasPath("cluster.routees-path")) config.withFallback(ConfigFactory.parseString(s"""routees.paths=["${config.getString("cluster.routees-path")}"]""")) diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancing.scala b/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancing.scala index 9838c99925..a4fafe3a7f 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancing.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancing.scala @@ -219,11 +219,11 @@ final case class AdaptiveLoadBalancingGroup( * Java API * @param metricsSelector decides what probability to use for selecting a routee, based * on remaining capacity as indicated by the node metrics - * @param routeePaths string representation of the actor paths of the routees, messages are + * @param routeesPaths string representation of the actor paths of the routees, messages are * sent with [[akka.actor.ActorSelection]] to these paths */ def this(metricsSelector: MetricsSelector, - routeePaths: java.lang.Iterable[String]) = this(paths = immutableSeq(routeePaths)) + routeesPaths: java.lang.Iterable[String]) = this(paths = immutableSeq(routeesPaths)) override def createRouter(system: ActorSystem): Router = new Router(AdaptiveLoadBalancingRoutingLogic(system, metricsSelector)) @@ -566,11 +566,11 @@ case class AdaptiveLoadBalancingRouter( * Java API: Constructor that sets the routees to be used. * * @param selector the selector is responsible for producing weighted mix of routees from the node metrics - * @param routeePaths string representation of the actor paths of the routees that will be looked up + * @param routeesPaths string representation of the actor paths of the routees that will be looked up * using `actorFor` in [[akka.actor.ActorRefProvider]] */ - def this(selector: MetricsSelector, routeePaths: java.lang.Iterable[String]) = - this(metricsSelector = selector, routees = immutableSeq(routeePaths)) + def this(selector: MetricsSelector, routeesPaths: java.lang.Iterable[String]) = + this(metricsSelector = selector, routees = immutableSeq(routeesPaths)) /** * Java API: Constructor that sets the resizer to be used. diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala index 0f770f457e..cc30d1a6f7 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala @@ -40,12 +40,13 @@ import akka.routing.RoutingLogic import akka.actor.RelativeActorPath import com.typesafe.config.Config import akka.routing.DeprecatedRouterConfig +import akka.japi.Util.immutableSeq object ClusterRouterGroupSettings { def fromConfig(config: Config): ClusterRouterGroupSettings = ClusterRouterGroupSettings( totalInstances = config.getInt("nr-of-instances"), - routeesPath = config.getString("cluster.routees-path"), + routeesPaths = immutableSeq(config.getStringList("routees.paths")), allowLocalRoutees = config.getBoolean("cluster.allow-local-routees"), useRole = ClusterRouterSettingsBase.useRoleOption(config.getString("cluster.use-role"))) } @@ -56,26 +57,40 @@ object ClusterRouterGroupSettings { @SerialVersionUID(1L) case class ClusterRouterGroupSettings( totalInstances: Int, - routeesPath: String, + routeesPaths: immutable.Seq[String], allowLocalRoutees: Boolean, useRole: Option[String]) extends ClusterRouterSettingsBase { + @deprecated("Use constructor with routeesPaths Seq", "2.3") + def this( + totalInstances: Int, + routeesPath: String, + allowLocalRoutees: Boolean, + useRole: Option[String]) = + this(totalInstances, List(routeesPath), allowLocalRoutees, useRole) + /** * Java API */ + def this(totalInstances: Int, routeesPaths: java.lang.Iterable[String], allowLocalRoutees: Boolean, useRole: String) = + this(totalInstances, immutableSeq(routeesPaths), allowLocalRoutees, ClusterRouterSettingsBase.useRoleOption(useRole)) + + /** + * Java API + */ + @deprecated("Use constructor with routeesPaths Iterable", "2.3") def this(totalInstances: Int, routeesPath: String, allowLocalRoutees: Boolean, useRole: String) = this(totalInstances, routeesPath, allowLocalRoutees, ClusterRouterSettingsBase.useRoleOption(useRole)) if (totalInstances <= 0) throw new IllegalArgumentException("totalInstances of cluster router must be > 0") - if (!isRouteesPathDefined) throw new IllegalArgumentException("routeesPath must be defined") + if ((routeesPaths eq null) || routeesPaths.isEmpty || routeesPaths.head == "") + throw new IllegalArgumentException("routeesPaths must be defined") - routeesPath match { + routeesPaths.foreach(p ⇒ p match { case RelativeActorPath(elements) ⇒ // good case _ ⇒ - throw new IllegalArgumentException("routeesPath [%s] is not a valid relative actor path" format routeesPath) - } - - def isRouteesPathDefined: Boolean = (routeesPath ne null) && routeesPath != "" + throw new IllegalArgumentException(s"routeesPaths [$p] is not a valid relative actor path") + }) } @@ -140,9 +155,7 @@ private[akka] trait ClusterRouterSettingsBase { @SerialVersionUID(1L) final case class ClusterRouterGroup(local: Group, settings: ClusterRouterGroupSettings) extends Group with ClusterRouterConfigBase { - require(settings.routeesPath.nonEmpty, "routeesPath must be defined") - - override def paths: immutable.Iterable[String] = if (settings.allowLocalRoutees) List(settings.routeesPath) else Nil + override def paths: immutable.Iterable[String] = if (settings.allowLocalRoutees) settings.routeesPaths else Nil /** * INTERNAL API @@ -259,7 +272,23 @@ private[akka] class ClusterRouterPoolActor( doAddRoutees() } - override def maxInstancesPerNode: Int = settings.maxInstancesPerNode + def selectDeploymentTarget: Option[Address] = { + val currentRoutees = cell.router.routees + val currentNodes = availableNodes + if (currentNodes.isEmpty || currentRoutees.size >= settings.totalInstances) { + None + } else { + // find the node with least routees + val numberOfRouteesPerNode: Map[Address, Int] = + currentRoutees.foldLeft(currentNodes.map(_ -> 0).toMap.withDefaultValue(0)) { (acc, x) ⇒ + val address = fullAddress(x) + acc + (address -> (acc(address) + 1)) + } + + val (address, count) = numberOfRouteesPerNode.minBy(_._2) + if (count < settings.maxInstancesPerNode) Some(address) else None + } + } } @@ -277,6 +306,12 @@ private[akka] class ClusterRouterGroupActor(val settings: ClusterRouterGroupSett override def receive = clusterReceive orElse super.receive + var usedRouteePaths: Map[Address, Set[String]] = + if (settings.allowLocalRoutees) + Map(cluster.selfAddress -> settings.routeesPaths.toSet) + else + Map.empty + /** * Adds routees based on totalInstances and maxInstancesPerNode settings */ @@ -284,8 +319,9 @@ private[akka] class ClusterRouterGroupActor(val settings: ClusterRouterGroupSett @tailrec def doAddRoutees(): Unit = selectDeploymentTarget match { case None ⇒ // done - case Some(target) ⇒ - val routee = group.routeeFor(target + settings.routeesPath, context) + case Some((address, path)) ⇒ + val routee = group.routeeFor(address + path, context) + usedRouteePaths = usedRouteePaths.updated(address, usedRouteePaths.getOrElse(address, Set.empty) + path) // must register each one, since registered routees are used in selectDeploymentTarget cell.addRoutee(routee) @@ -296,8 +332,28 @@ private[akka] class ClusterRouterGroupActor(val settings: ClusterRouterGroupSett doAddRoutees() } - override def maxInstancesPerNode: Int = 1 + def selectDeploymentTarget: Option[(Address, String)] = { + val currentRoutees = cell.router.routees + val currentNodes = availableNodes + if (currentNodes.isEmpty || currentRoutees.size >= settings.totalInstances) { + None + } else { + // find the node with least routees + val unusedNodes = currentNodes filterNot usedRouteePaths.contains + if (unusedNodes.nonEmpty) { + Some((unusedNodes.head, settings.routeesPaths.head)) + } else { + val (address, used) = usedRouteePaths.minBy { case (address, used) ⇒ used.size } + // pick next of the unused paths + settings.routeesPaths.collectFirst { case p if !used.contains(p) ⇒ (address, p) } + } + } + } + override def removeMember(member: Member): Unit = { + usedRouteePaths -= member.address + super.removeMember(member) + } } /** @@ -364,36 +420,16 @@ private[akka] trait ClusterRouterActor { this: RouterActor ⇒ } /** - * Adds routees based on totalInstances and maxInstancesPerNode settings + * Adds routees based on settings */ def addRoutees(): Unit - def maxInstancesPerNode: Int - - def selectDeploymentTarget: Option[Address] = { - val currentRoutees = cell.router.routees - val currentNodes = availableNodes - if (currentNodes.isEmpty || currentRoutees.size >= settings.totalInstances) { - None - } else { - // find the node with least routees - val numberOfRouteesPerNode: Map[Address, Int] = - currentRoutees.foldLeft(currentNodes.map(_ -> 0).toMap.withDefaultValue(0)) { (acc, x) ⇒ - val address = fullAddress(x) - acc + (address -> (acc(address) + 1)) - } - - val (address, count) = numberOfRouteesPerNode.minBy(_._2) - if (count < maxInstancesPerNode) Some(address) else None - } - } - - def addMember(member: Member) = { + def addMember(member: Member): Unit = { nodes += member.address addRoutees() } - def removeMember(member: Member) = { + def removeMember(member: Member): Unit = { val address = member.address nodes -= address diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/DeprecatedClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/DeprecatedClusterRouterConfig.scala index a4d9c36a00..03e3615098 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/DeprecatedClusterRouterConfig.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/DeprecatedClusterRouterConfig.scala @@ -149,7 +149,7 @@ final case class ClusterRouterConfig(local: DeprecatedRouterConfig, settings: Cl new ClusterRouterPoolActor(local.supervisorStrategy, ClusterRouterPoolSettings(settings.totalInstances, settings.maxInstancesPerNode, settings.allowLocalRoutees, settings.useRole)) else - new ClusterRouterGroupActor(ClusterRouterGroupSettings(settings.totalInstances, settings.routeesPath, + new ClusterRouterGroupActor(ClusterRouterGroupSettings(settings.totalInstances, List(settings.routeesPath), settings.allowLocalRoutees, settings.useRole)) override def supervisorStrategy: SupervisorStrategy = local.supervisorStrategy diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala index 6a94835cf1..bb79c241f5 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -147,9 +147,9 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { /master-node-2/workers { router = round-robin-group nr-of-instances = 100 + routees.paths = ["/user/worker"] cluster { enabled = on - routees-path = "/user/worker" allow-local-routees = on } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/AdaptiveLoadBalancingRouterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/AdaptiveLoadBalancingRouterSpec.scala index 3ff99ea291..5f5e6a4896 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/AdaptiveLoadBalancingRouterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/AdaptiveLoadBalancingRouterSpec.scala @@ -25,7 +25,7 @@ import akka.routing.Routees object AdaptiveLoadBalancingRouterMultiJvmSpec extends MultiNodeConfig { - class Routee extends Actor { + class Echo extends Actor { def receive = { case _ ⇒ sender ! Reply(Cluster(context.system).selfAddress) } @@ -117,7 +117,7 @@ abstract class AdaptiveLoadBalancingRouterSpec extends MultiNodeSpec(AdaptiveLoa val router = system.actorOf(ClusterRouterPool( local = AdaptiveLoadBalancingPool(HeapMetricsSelector), settings = ClusterRouterPoolSettings(totalInstances = 10, maxInstancesPerNode = 1, allowLocalRoutees = true, useRole = None)). - props(Props[Routee]), + props(Props[Echo]), name) // it may take some time until router receives cluster member events awaitAssert { currentRoutees(router).size must be(roles.size) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinSpec.scala index 160d74fcef..69dbf8191f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinSpec.scala @@ -30,7 +30,7 @@ import akka.routing.Routees object ClusterRoundRobinMultiJvmSpec extends MultiNodeConfig { class SomeActor(routeeType: RouteeType) extends Actor { - def this() = this(DeployRoutee) + def this() = this(PoolRoutee) def receive = { case "hit" ⇒ sender ! Reply(routeeType, self) @@ -40,8 +40,8 @@ object ClusterRoundRobinMultiJvmSpec extends MultiNodeConfig { case class Reply(routeeType: RouteeType, ref: ActorRef) sealed trait RouteeType extends Serializable - object DeployRoutee extends RouteeType - object LookupRoutee extends RouteeType + object PoolRoutee extends RouteeType + object GroupRoutee extends RouteeType val first = role("first") val second = role("second") @@ -71,10 +71,8 @@ object ClusterRoundRobinMultiJvmSpec extends MultiNodeConfig { /router4 { router = round-robin nr-of-instances = 10 - cluster { - enabled = on - routees-path = "/user/myservice" - } + routees.paths = ["/user/myserviceA", "/user/myserviceB"] + cluster.enabled = on } /router5 { router = round-robin @@ -153,7 +151,7 @@ abstract class ClusterRoundRobinSpec extends MultiNodeSpec(ClusterRoundRobinMult router1 ! "hit" } - val replies = receiveReplies(DeployRoutee, iterationCount) + val replies = receiveReplies(PoolRoutee, iterationCount) replies(first) must be > (0) replies(second) must be > (0) @@ -169,19 +167,20 @@ abstract class ClusterRoundRobinSpec extends MultiNodeSpec(ClusterRoundRobinMult // cluster consists of first and second - system.actorOf(Props(classOf[SomeActor], LookupRoutee), "myservice") + system.actorOf(Props(classOf[SomeActor], GroupRoutee), "myserviceA") + system.actorOf(Props(classOf[SomeActor], GroupRoutee), "myserviceB") enterBarrier("myservice-started") runOn(first) { - // 2 nodes, 1 routee on each node - awaitAssert(currentRoutees(router4).size must be(2)) + // 2 nodes, 2 routees on each node + awaitAssert(currentRoutees(router4).size must be(4)) val iterationCount = 10 for (i ← 0 until iterationCount) { router4 ! "hit" } - val replies = receiveReplies(LookupRoutee, iterationCount) + val replies = receiveReplies(GroupRoutee, iterationCount) replies(first) must be > (0) replies(second) must be > (0) @@ -207,7 +206,7 @@ abstract class ClusterRoundRobinSpec extends MultiNodeSpec(ClusterRoundRobinMult router1 ! "hit" } - val replies = receiveReplies(DeployRoutee, iterationCount) + val replies = receiveReplies(PoolRoutee, iterationCount) replies.values.foreach { _ must be > (0) } replies.values.sum must be(iterationCount) @@ -221,15 +220,15 @@ abstract class ClusterRoundRobinSpec extends MultiNodeSpec(ClusterRoundRobinMult // cluster consists of first, second, third and fourth runOn(first) { - // 4 nodes, 1 routee on each node - awaitAssert(currentRoutees(router4).size must be(4)) + // 4 nodes, 2 routee on each node + awaitAssert(currentRoutees(router4).size must be(8)) val iterationCount = 10 for (i ← 0 until iterationCount) { router4 ! "hit" } - val replies = receiveReplies(LookupRoutee, iterationCount) + val replies = receiveReplies(GroupRoutee, iterationCount) replies.values.foreach { _ must be > (0) } replies.values.sum must be(iterationCount) @@ -249,7 +248,7 @@ abstract class ClusterRoundRobinSpec extends MultiNodeSpec(ClusterRoundRobinMult router3 ! "hit" } - val replies = receiveReplies(DeployRoutee, iterationCount) + val replies = receiveReplies(PoolRoutee, iterationCount) replies(first) must be(0) replies(second) must be > (0) @@ -271,7 +270,7 @@ abstract class ClusterRoundRobinSpec extends MultiNodeSpec(ClusterRoundRobinMult router5 ! "hit" } - val replies = receiveReplies(DeployRoutee, iterationCount) + val replies = receiveReplies(PoolRoutee, iterationCount) replies(first) must be > (0) replies(second) must be > (0) @@ -296,7 +295,7 @@ abstract class ClusterRoundRobinSpec extends MultiNodeSpec(ClusterRoundRobinMult router2 ! "hit" } - val replies = receiveReplies(DeployRoutee, iterationCount) + val replies = receiveReplies(PoolRoutee, iterationCount) // note that router2 has totalInstances = 3, maxInstancesPerNode = 1 val routees = currentRoutees(router2) @@ -317,16 +316,16 @@ abstract class ClusterRoundRobinSpec extends MultiNodeSpec(ClusterRoundRobinMult def routeeAddresses = (routees map { case ActorSelectionRoutee(sel) ⇒ fullAddress(sel.anchor) }).toSet runOn(first) { - // 4 nodes, 1 routee on each node - awaitAssert(currentRoutees(router4).size must be(4)) + // 4 nodes, 2 routees on each node + awaitAssert(currentRoutees(router4).size must be(8)) testConductor.blackhole(first, second, Direction.Both).await - awaitAssert(routees.size must be(3)) + awaitAssert(routees.size must be(6)) routeeAddresses must not contain (address(second)) testConductor.passThrough(first, second, Direction.Both).await - awaitAssert(routees.size must be(4)) + awaitAssert(routees.size must be(8)) routeeAddresses must contain(address(second)) } @@ -360,7 +359,7 @@ abstract class ClusterRoundRobinSpec extends MultiNodeSpec(ClusterRoundRobinMult router2 ! "hit" } - val replies = receiveReplies(DeployRoutee, iterationCount) + val replies = receiveReplies(PoolRoutee, iterationCount) routeeAddresses.size must be(3) replies.values.sum must be(iterationCount) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala index 1c7cddd9ea..847136e25c 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala @@ -24,6 +24,16 @@ object ClusterDeployerSpec { cluster.allow-local-routees = off } /user/service2 { + dispatcher = mydispatcher + mailbox = mymailbox + router = round-robin + nr-of-instances = 20 + routees.paths = ["/user/myservice"] + cluster.enabled = on + cluster.allow-local-routees = off + } + # deprecated cluster.routees-path + /user/service3 { dispatcher = mydispatcher mailbox = mymailbox router = round-robin @@ -73,7 +83,23 @@ class ClusterDeployerSpec extends AkkaSpec(ClusterDeployerSpec.deployerConf) { service, deployment.get.config, ClusterRouterGroup(RoundRobinGroup(List("/user/myservice")), ClusterRouterGroupSettings( - totalInstances = 20, routeesPath = "/user/myservice", allowLocalRoutees = false, useRole = None)), + totalInstances = 20, routeesPaths = List("/user/myservice"), allowLocalRoutees = false, useRole = None)), + ClusterScope, + "mydispatcher", + "mymailbox"))) + } + + "be able to parse 'akka.actor.deployment._' with deprecated 'cluster.routees-path'" in { + val service = "/user/service3" + val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service.split("/").drop(1)) + deployment must not be (None) + + deployment must be(Some( + Deploy( + service, + deployment.get.config, + ClusterRouterGroup(RoundRobinGroup(List("/user/myservice")), ClusterRouterGroupSettings( + totalInstances = 20, routeesPaths = List("/user/myservice"), allowLocalRoutees = false, useRole = None)), ClusterScope, "mydispatcher", "mymailbox"))) diff --git a/akka-docs/rst/java/cluster-usage.rst b/akka-docs/rst/java/cluster-usage.rst index 67b01314f8..8dd2db1661 100644 --- a/akka-docs/rst/java/cluster-usage.rst +++ b/akka-docs/rst/java/cluster-usage.rst @@ -457,14 +457,12 @@ That is not done by the router. The configuration for a group looks like this: available at that point it will be removed from the router and it will only re-try when the cluster members are changed. -It is the relative actor path defined in ``routees-path`` that identify what actor to lookup. +It is the relative actor paths defined in ``routees.paths`` that identify what actor to lookup. It is possible to limit the lookup of routees to member nodes tagged with a certain role by specifying ``use-role``. -``nr-of-instances`` defines total number of routees in the cluster, but there will not be -more than one per node. That routee actor could easily fan out to local children if more parallelism -is needed. Setting ``nr-of-instances`` to a high value will result in new routees -added to the router when nodes join the cluster. +``nr-of-instances`` defines total number of routees in the cluster. Setting ``nr-of-instances`` +to a high value will result in new routees added to the router when nodes join the cluster. The same type of router could also have been defined in code: @@ -506,7 +504,7 @@ The service that receives text from users and splits it up into words, delegates Note, nothing cluster specific so far, just plain actors. All nodes start ``StatsService`` and ``StatsWorker`` actors. Remember, routees are the workers in this case. -The router is configured with ``routees-path``: +The router is configured with ``routees.paths``: .. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/application.conf#config-router-lookup diff --git a/akka-docs/rst/project/migration-guide-2.2.x-2.3.x.rst b/akka-docs/rst/project/migration-guide-2.2.x-2.3.x.rst index 8cacb7bc82..4cc1139a17 100644 --- a/akka-docs/rst/project/migration-guide-2.2.x-2.3.x.rst +++ b/akka-docs/rst/project/migration-guide-2.2.x-2.3.x.rst @@ -73,10 +73,22 @@ Example in Java:: getContext().actorOf(new RoundRobinPool(5).props(Props.create(Worker.class)), "router2"); +To support multiple routee paths for a cluster aware router sending to paths the deployment configuration +property ``cluster.routees-path`` has been changed to string list ``routees.paths`` property. +The old ``cluster.routees-path`` is deprecated, but still working during the deprecation phase. + +Example:: + + /router4 { + router = round-robin + nr-of-instances = 10 + routees.paths = ["/user/myserviceA", "/user/myserviceB"] + cluster.enabled = on + } + The API for creating custom routers and resizers have changed without keeping the old API as deprecated. That should be a an API used by only a few users and they should be able to migrate to the new API without much trouble. Read more about the new routers in the :ref:`documentation for Scala ` and :ref:`documentation for Java `. - diff --git a/akka-docs/rst/scala/cluster-usage.rst b/akka-docs/rst/scala/cluster-usage.rst index f39210f42c..aeba1ec383 100644 --- a/akka-docs/rst/scala/cluster-usage.rst +++ b/akka-docs/rst/scala/cluster-usage.rst @@ -448,14 +448,12 @@ That is not done by the router. The configuration for a group looks like this: available at that point it will be removed from the router and it will only re-try when the cluster members are changed. -It is the relative actor path defined in ``routees-path`` that identify what actor to lookup. +It is the relative actor paths defined in ``routees.paths`` that identify what actor to lookup. It is possible to limit the lookup of routees to member nodes tagged with a certain role by specifying ``use-role``. -``nr-of-instances`` defines total number of routees in the cluster, but there will not be -more than one per node. That routee actor could easily fan out to local children if more parallelism -is needed. Setting ``nr-of-instances`` to a high value will result in new routees -added to the router when nodes join the cluster. +``nr-of-instances`` defines total number of routees in the cluster. Setting ``nr-of-instances`` +to a high value will result in new routees added to the router when nodes join the cluster. The same type of router could also have been defined in code: @@ -495,7 +493,7 @@ The service that receives text from users and splits it up into words, delegates Note, nothing cluster specific so far, just plain actors. All nodes start ``StatsService`` and ``StatsWorker`` actors. Remember, routees are the workers in this case. -The router is configured with ``routees-path``: +The router is configured with ``routees.paths``: .. includecode:: ../../../akka-samples/akka-sample-cluster/src/main/resources/application.conf#config-router-lookup diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsService.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsService.java index 220be6c1b8..4e490925ca 100644 --- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsService.java +++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsService.java @@ -3,6 +3,7 @@ package sample.cluster.stats.japi; import java.util.Collections; import sample.cluster.stats.japi.StatsMessages.StatsJob; + //#imports import akka.actor.ActorRef; import akka.actor.Props; @@ -60,13 +61,13 @@ public class StatsService extends UntypedActor { abstract class StatsService2 extends UntypedActor { //#router-lookup-in-code int totalInstances = 100; - String routeesPath = "/user/statsWorker"; + Iterable routeesPaths = Collections.singletonList("/user/statsWorker"); boolean allowLocalRoutees = true; String useRole = "compute"; ActorRef workerRouter = getContext().actorOf( new ClusterRouterGroup( - new ConsistentHashingGroup(Collections.emptyList()), new ClusterRouterGroupSettings( - totalInstances, routeesPath, allowLocalRoutees, useRole)).props(), + new ConsistentHashingGroup(routeesPaths), new ClusterRouterGroupSettings( + totalInstances, routeesPaths, allowLocalRoutees, useRole)).props(), "workerRouter2"); //#router-lookup-in-code } diff --git a/akka-samples/akka-sample-cluster/src/main/resources/application.conf b/akka-samples/akka-sample-cluster/src/main/resources/application.conf index 4a5dead489..2b9eb7ae19 100644 --- a/akka-samples/akka-sample-cluster/src/main/resources/application.conf +++ b/akka-samples/akka-sample-cluster/src/main/resources/application.conf @@ -26,9 +26,9 @@ akka.actor.deployment { /statsService/workerRouter { router = consistent-hashing-group nr-of-instances = 100 + routees.paths = ["/user/statsWorker"] cluster { enabled = on - routees-path = "/user/statsWorker" allow-local-routees = on use-role = compute } @@ -60,9 +60,9 @@ akka.actor.deployment { # metrics-selector = cpu metrics-selector = mix nr-of-instances = 100 + routees.paths = ["/user/factorialBackend"] cluster { enabled = on - routees-path = "/user/factorialBackend" use-role = backend allow-local-routees = off } diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala index 04cd7d7dd9..f08d37ac6e 100644 --- a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala +++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/factorial/FactorialSample.scala @@ -149,7 +149,7 @@ abstract class FactorialFrontend2 extends Actor { val backend = context.actorOf( ClusterRouterGroup(AdaptiveLoadBalancingGroup(HeapMetricsSelector), ClusterRouterGroupSettings( - totalInstances = 100, routeesPath = "/user/factorialBackend", + totalInstances = 100, routeesPaths = List("/user/factorialBackend"), allowLocalRoutees = true, useRole = Some("backend"))).props(), name = "factorialBackendRouter2") //#router-lookup-in-code diff --git a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala index 85c2128c82..888eec38d2 100644 --- a/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala +++ b/akka-samples/akka-sample-cluster/src/main/scala/sample/cluster/stats/StatsSample.scala @@ -230,7 +230,7 @@ abstract class StatsService2 extends Actor { val workerRouter = context.actorOf( ClusterRouterGroup(ConsistentHashingGroup(Nil), ClusterRouterGroupSettings( - totalInstances = 100, routeesPath = "/user/statsWorker", + totalInstances = 100, routeesPaths = List("/user/statsWorker"), allowLocalRoutees = true, useRole = Some("compute"))).props(), name = "workerRouter2") //#router-lookup-in-code diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala index 4b8534b785..efd18eb7ff 100644 --- a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala +++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/StatsSampleSpec.scala @@ -34,9 +34,9 @@ object StatsSampleSpecConfig extends MultiNodeConfig { /statsService/workerRouter { router = consistent-hashing-group nr-of-instances = 100 + routees.paths = ["/user/statsWorker"] cluster { enabled = on - routees-path = "/user/statsWorker" allow-local-routees = on use-role = compute } diff --git a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleJapiSpec.scala b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleJapiSpec.scala index 0ff510b418..3210a3999f 100644 --- a/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleJapiSpec.scala +++ b/akka-samples/akka-sample-cluster/src/multi-jvm/scala/sample/cluster/stats/japi/StatsSampleJapiSpec.scala @@ -37,9 +37,9 @@ object StatsSampleJapiSpecConfig extends MultiNodeConfig { /statsService/workerRouter { router = consistent-hashing-group nr-of-instances = 100 + routees.paths = ["/user/statsWorker"] cluster { enabled = on - routees-path = "/user/statsWorker" allow-local-routees = on use-role = compute }