From a94f7cdc98332875e21900017c7eac52dbfa2bbd Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 20 Aug 2015 17:31:36 +0200 Subject: [PATCH] =clu #15412 Add paths(system) method to Group router to be able to use the role correctly in cluster aware routers This solution is very similar to what we did for nrOfInstances in Pool routers. --- .../main/scala/akka/routing/Broadcast.scala | 4 +- .../akka/routing/ConsistentHashing.scala | 2 + .../src/main/scala/akka/routing/Random.scala | 2 + .../main/scala/akka/routing/RoundRobin.scala | 2 + .../scala/akka/routing/RoutedActorCell.scala | 5 +- .../scala/akka/routing/RouterConfig.scala | 14 +- .../routing/ScatterGatherFirstCompleted.scala | 2 + .../scala/akka/routing/TailChopping.scala | 2 + .../metrics/ClusterMetricsRouting.scala | 4 +- .../routing/AdaptiveLoadBalancing.scala | 4 +- .../cluster/routing/ClusterRouterConfig.scala | 9 +- .../cluster/routing/UseRoleIgnoredSpec.scala | 130 +++++++++++++++++- .../code/docs/jrouting/RedundancyGroup.java | 2 +- .../project/migration-guide-2.3.x-2.4.x.rst | 8 ++ .../docs/routing/CustomRouterDocSpec.scala | 6 +- project/MiMa.scala | 7 +- 16 files changed, 188 insertions(+), 15 deletions(-) diff --git a/akka-actor/src/main/scala/akka/routing/Broadcast.scala b/akka-actor/src/main/scala/akka/routing/Broadcast.scala index 7110ea50cb..77fe2506b4 100644 --- a/akka-actor/src/main/scala/akka/routing/Broadcast.scala +++ b/akka-actor/src/main/scala/akka/routing/Broadcast.scala @@ -120,7 +120,7 @@ final case class BroadcastPool( */ @SerialVersionUID(1L) final case class BroadcastGroup( - paths: immutable.Iterable[String], + override val paths: immutable.Iterable[String], override val routerDispatcher: String = Dispatchers.DefaultDispatcherId) extends Group { @@ -134,6 +134,8 @@ final case class BroadcastGroup( */ def this(routeePaths: java.lang.Iterable[String]) = this(paths = immutableSeq(routeePaths)) + override def paths(system: ActorSystem): immutable.Iterable[String] = this.paths + override def createRouter(system: ActorSystem): Router = new Router(BroadcastRoutingLogic()) /** diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala index 283bf52055..eb29ef72ee 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala @@ -364,6 +364,8 @@ final case class ConsistentHashingGroup( */ def this(routeePaths: java.lang.Iterable[String]) = this(paths = immutableSeq(routeePaths)) + override def paths(system: ActorSystem): immutable.Iterable[String] = this.paths + override def createRouter(system: ActorSystem): Router = new Router(ConsistentHashingRoutingLogic(system, virtualNodesFactor, hashMapping)) diff --git a/akka-actor/src/main/scala/akka/routing/Random.scala b/akka-actor/src/main/scala/akka/routing/Random.scala index ccd5d08b97..bf99c7261b 100644 --- a/akka-actor/src/main/scala/akka/routing/Random.scala +++ b/akka-actor/src/main/scala/akka/routing/Random.scala @@ -135,6 +135,8 @@ final case class RandomGroup( */ def this(routeePaths: java.lang.Iterable[String]) = this(paths = immutableSeq(routeePaths)) + override def paths(system: ActorSystem): immutable.Iterable[String] = this.paths + override def createRouter(system: ActorSystem): Router = new Router(RandomRoutingLogic()) /** diff --git a/akka-actor/src/main/scala/akka/routing/RoundRobin.scala b/akka-actor/src/main/scala/akka/routing/RoundRobin.scala index 868e9193fb..134e6319e2 100644 --- a/akka-actor/src/main/scala/akka/routing/RoundRobin.scala +++ b/akka-actor/src/main/scala/akka/routing/RoundRobin.scala @@ -141,6 +141,8 @@ final case class RoundRobinGroup( */ def this(routeePaths: java.lang.Iterable[String]) = this(paths = immutableSeq(routeePaths)) + override def paths(system: ActorSystem): immutable.Iterable[String] = this.paths + override def createRouter(system: ActorSystem): Router = new Router(RoundRobinRoutingLogic()) /** diff --git a/akka-actor/src/main/scala/akka/routing/RoutedActorCell.scala b/akka-actor/src/main/scala/akka/routing/RoutedActorCell.scala index 9a424f6c15..427924d0a5 100644 --- a/akka-actor/src/main/scala/akka/routing/RoutedActorCell.scala +++ b/akka-actor/src/main/scala/akka/routing/RoutedActorCell.scala @@ -109,7 +109,10 @@ private[akka] class RoutedActorCell( if (nrOfRoutees > 0) addRoutees(Vector.fill(nrOfRoutees)(pool.newRoutee(routeeProps, this))) case group: Group ⇒ - val paths = group.paths + // must not use group.paths(system) for old (not re-compiled) custom routers + // for binary backwards compatibility reasons + val deprecatedPaths = group.paths + val paths = if (deprecatedPaths == null) group.paths(system) else deprecatedPaths if (paths.nonEmpty) addRoutees(paths.map(p ⇒ group.routeeFor(p, this))(collection.breakOut)) case _ ⇒ diff --git a/akka-actor/src/main/scala/akka/routing/RouterConfig.scala b/akka-actor/src/main/scala/akka/routing/RouterConfig.scala index 6a8a4570ee..53b50a80dc 100644 --- a/akka-actor/src/main/scala/akka/routing/RouterConfig.scala +++ b/akka-actor/src/main/scala/akka/routing/RouterConfig.scala @@ -128,9 +128,16 @@ private[akka] trait PoolOverrideUnsetConfig[T <: Pool] extends Pool { * Java API: Base class for custom router [[Group]] */ abstract class GroupBase extends Group { - def getPaths: java.lang.Iterable[String] + @deprecated("Implement getPaths with ActorSystem parameter instead", "2.4") + def getPaths: java.lang.Iterable[String] = null + @deprecated("Use paths with ActorSystem parameter instead", "2.4") override final def paths: immutable.Iterable[String] = immutableSeq(getPaths) + + def getPaths(system: ActorSystem): java.lang.Iterable[String] + + override final def paths(system: ActorSystem): immutable.Iterable[String] = + immutableSeq(getPaths(system)) } /** @@ -140,7 +147,10 @@ abstract class GroupBase extends Group { */ trait Group extends RouterConfig { - def paths: immutable.Iterable[String] + @deprecated("Implement paths with ActorSystem parameter instead", "2.4") + def paths: immutable.Iterable[String] = null + + def paths(system: ActorSystem): immutable.Iterable[String] /** * [[akka.actor.Props]] for a group router based on the settings defined by diff --git a/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala b/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala index 20b2f1a6f0..77391a4504 100644 --- a/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala +++ b/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala @@ -188,6 +188,8 @@ final case class ScatterGatherFirstCompletedGroup( def this(routeePaths: java.lang.Iterable[String], within: FiniteDuration) = this(paths = immutableSeq(routeePaths), within = within) + override def paths(system: ActorSystem): immutable.Iterable[String] = this.paths + override def createRouter(system: ActorSystem): Router = new Router(ScatterGatherFirstCompletedRoutingLogic(within)) /** diff --git a/akka-actor/src/main/scala/akka/routing/TailChopping.scala b/akka-actor/src/main/scala/akka/routing/TailChopping.scala index 7cfcb66b9e..1b44b39be3 100644 --- a/akka-actor/src/main/scala/akka/routing/TailChopping.scala +++ b/akka-actor/src/main/scala/akka/routing/TailChopping.scala @@ -222,6 +222,8 @@ final case class TailChoppingGroup( override def createRouter(system: ActorSystem): Router = new Router(TailChoppingRoutingLogic(system.scheduler, within, interval, system.dispatchers.lookup(routerDispatcher))) + override def paths(system: ActorSystem): immutable.Iterable[String] = this.paths + /** * Setting the dispatcher to be used for the router head actor, which handles * router management messages diff --git a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/ClusterMetricsRouting.scala b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/ClusterMetricsRouting.scala index bf4b71418e..03e8470812 100644 --- a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/ClusterMetricsRouting.scala +++ b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/ClusterMetricsRouting.scala @@ -201,7 +201,7 @@ final case class AdaptiveLoadBalancingPool( @SerialVersionUID(1L) final case class AdaptiveLoadBalancingGroup( metricsSelector: MetricsSelector = MixMetricsSelector, - paths: immutable.Iterable[String] = Nil, + override val paths: immutable.Iterable[String] = Nil, override val routerDispatcher: String = Dispatchers.DefaultDispatcherId) extends Group { @@ -219,6 +219,8 @@ final case class AdaptiveLoadBalancingGroup( def this(metricsSelector: MetricsSelector, routeesPaths: java.lang.Iterable[String]) = this(paths = immutableSeq(routeesPaths)) + override def paths(system: ActorSystem): immutable.Iterable[String] = this.paths + override def createRouter(system: ActorSystem): Router = new Router(AdaptiveLoadBalancingRoutingLogic(system, metricsSelector)) diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancing.scala b/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancing.scala index c4fe183558..b1ae64b95f 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancing.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancing.scala @@ -213,7 +213,7 @@ final case class AdaptiveLoadBalancingPool( @deprecated("Superseded by akka.cluster.metrics (in akka-cluster-metrics jar)", "2.4") final case class AdaptiveLoadBalancingGroup( metricsSelector: MetricsSelector = MixMetricsSelector, - paths: immutable.Iterable[String] = Nil, + override val paths: immutable.Iterable[String] = Nil, override val routerDispatcher: String = Dispatchers.DefaultDispatcherId) extends Group { @@ -231,6 +231,8 @@ final case class AdaptiveLoadBalancingGroup( def this(metricsSelector: MetricsSelector, routeesPaths: java.lang.Iterable[String]) = this(paths = immutableSeq(routeesPaths)) + override def paths(system: ActorSystem): immutable.Iterable[String] = this.paths + override def createRouter(system: ActorSystem): Router = new Router(AdaptiveLoadBalancingRoutingLogic(system, metricsSelector)) diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala index 0b0e55da53..a3d34e2c17 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala @@ -128,7 +128,14 @@ private[akka] trait ClusterRouterSettingsBase { @SerialVersionUID(1L) final case class ClusterRouterGroup(local: Group, settings: ClusterRouterGroupSettings) extends Group with ClusterRouterConfigBase { - override def paths: immutable.Iterable[String] = if (settings.allowLocalRoutees) settings.routeesPaths else Nil + override def paths(system: ActorSystem): immutable.Iterable[String] = + if (settings.allowLocalRoutees && settings.useRole.isDefined) { + if (Cluster(system).selfRoles.contains(settings.useRole.get)) { + settings.routeesPaths + } else Nil + } else if (settings.allowLocalRoutees && settings.useRole.isEmpty) { + settings.routeesPaths + } else Nil /** * INTERNAL API diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/UseRoleIgnoredSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/UseRoleIgnoredSpec.scala index ecbdd1939f..0ab28b00fc 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/UseRoleIgnoredSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/UseRoleIgnoredSpec.scala @@ -9,6 +9,7 @@ import akka.pattern.ask import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.routing.GetRoutees +import akka.routing.RoundRobinGroup import akka.routing.RoundRobinPool import akka.routing.Routees import akka.testkit.DefaultTimeout @@ -87,10 +88,15 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp runOn(first) { info("first, roles: " + cluster.selfRoles) } runOn(second) { info("second, roles: " + cluster.selfRoles) } runOn(third) { info("third, roles: " + cluster.selfRoles) } + + // routees for the group routers + system.actorOf(Props(classOf[SomeActor], GroupRoutee), "foo") + system.actorOf(Props(classOf[SomeActor], GroupRoutee), "bar") + enterBarrier("after-1") } - "local: off, roles: off, 6 => 0,2,2" taggedAs LongRunningTest in { + "pool local: off, roles: off, 6 => 0,2,2" taggedAs LongRunningTest in { runOn(first) { val role = Some("b") @@ -119,7 +125,36 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp enterBarrier("after-2") } - "local: on, role: b, 6 => 0,2,2" taggedAs LongRunningTest in { + "group local: off, roles: off, 6 => 0,2,2" taggedAs LongRunningTest in { + + runOn(first) { + val role = Some("b") + + val router = system.actorOf(ClusterRouterGroup( + RoundRobinGroup(paths = Nil), + ClusterRouterGroupSettings(totalInstances = 6, routeesPaths = List("/user/foo", "/user/bar"), + allowLocalRoutees = false, useRole = role)).props, + "router-2b") + + awaitAssert(currentRoutees(router).size should ===(4)) + + val iterationCount = 10 + for (i ← 0 until iterationCount) { + router ! s"hit-$i" + } + + val replies = receiveReplies(GroupRoutee, iterationCount) + + replies(first) should ===(0) // should not be deployed locally, does not have required role + replies(second) should be > 0 + replies(third) should be > 0 + replies.values.sum should ===(iterationCount) + } + + enterBarrier("after-2b") + } + + "pool local: on, role: b, 6 => 0,2,2" taggedAs LongRunningTest in { runOn(first) { val role = Some("b") @@ -148,7 +183,36 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp enterBarrier("after-3") } - "local: on, role: a, 6 => 2,0,0" taggedAs LongRunningTest in { + "group local: on, role: b, 6 => 0,2,2" taggedAs LongRunningTest in { + + runOn(first) { + val role = Some("b") + + val router = system.actorOf(ClusterRouterGroup( + RoundRobinGroup(paths = Nil), + ClusterRouterGroupSettings(totalInstances = 6, routeesPaths = List("/user/foo", "/user/bar"), + allowLocalRoutees = true, useRole = role)).props, + "router-3b") + + awaitAssert(currentRoutees(router).size should ===(4)) + + val iterationCount = 10 + for (i ← 0 until iterationCount) { + router ! s"hit-$i" + } + + val replies = receiveReplies(GroupRoutee, iterationCount) + + replies(first) should ===(0) // should not be deployed locally, does not have required role + replies(second) should be > 0 + replies(third) should be > 0 + replies.values.sum should ===(iterationCount) + } + + enterBarrier("after-3b") + } + + "pool local: on, role: a, 6 => 2,0,0" taggedAs LongRunningTest in { runOn(first) { val role = Some("a") @@ -177,7 +241,36 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp enterBarrier("after-4") } - "local: on, role: c, 6 => 2,2,2" taggedAs LongRunningTest in { + "group local: on, role: a, 6 => 2,0,0" taggedAs LongRunningTest in { + + runOn(first) { + val role = Some("a") + + val router = system.actorOf(ClusterRouterGroup( + RoundRobinGroup(paths = Nil), + ClusterRouterGroupSettings(totalInstances = 6, routeesPaths = List("/user/foo", "/user/bar"), + allowLocalRoutees = true, useRole = role)).props, + "router-4b") + + awaitAssert(currentRoutees(router).size should ===(2)) + + val iterationCount = 10 + for (i ← 0 until iterationCount) { + router ! s"hit-$i" + } + + val replies = receiveReplies(GroupRoutee, iterationCount) + + replies(first) should be > 0 + replies(second) should ===(0) + replies(third) should ===(0) + replies.values.sum should ===(iterationCount) + } + + enterBarrier("after-4b") + } + + "pool local: on, role: c, 6 => 2,2,2" taggedAs LongRunningTest in { runOn(first) { val role = Some("c") @@ -206,5 +299,34 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp enterBarrier("after-5") } + "group local: on, role: c, 6 => 2,2,2" taggedAs LongRunningTest in { + + runOn(first) { + val role = Some("c") + + val router = system.actorOf(ClusterRouterGroup( + RoundRobinGroup(paths = Nil), + ClusterRouterGroupSettings(totalInstances = 6, routeesPaths = List("/user/foo", "/user/bar"), + allowLocalRoutees = true, useRole = role)).props, + "router-5b") + + awaitAssert(currentRoutees(router).size should ===(6)) + + val iterationCount = 10 + for (i ← 0 until iterationCount) { + router ! s"hit-$i" + } + + val replies = receiveReplies(GroupRoutee, iterationCount) + + replies(first) should be > 0 + replies(second) should be > 0 + replies(third) should be > 0 + replies.values.sum should ===(iterationCount) + } + + enterBarrier("after-5b") + } + } } diff --git a/akka-docs/rst/java/code/docs/jrouting/RedundancyGroup.java b/akka-docs/rst/java/code/docs/jrouting/RedundancyGroup.java index 6cbc2c51ac..9062e95710 100644 --- a/akka-docs/rst/java/code/docs/jrouting/RedundancyGroup.java +++ b/akka-docs/rst/java/code/docs/jrouting/RedundancyGroup.java @@ -40,7 +40,7 @@ public class RedundancyGroup extends GroupBase { } @Override - public java.lang.Iterable getPaths() { + public java.lang.Iterable getPaths(ActorSystem system) { return paths; } diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index 5f027f01a4..2055e72b39 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -147,6 +147,14 @@ In order to make cluster routers smarter about when they can start local routees In case you have implemented a custom Pool you will have to update the method's signature, however the implementation can remain the same if you don't need to rely on an ActorSystem in your logic. +Group routers paths method now takes ActorSystem +=============================================== + +In order to make cluster routers smarter about when they can start local routees, +``paths`` defined on ``Group`` now takes ``ActorSystem`` as an argument. +In case you have implemented a custom Group you will have to update the method's signature, +however the implementation can remain the same if you don't need to rely on an ActorSystem in your logic. + Logger names use full class name ================================ Previously, few places in akka used "simple" logger names, such as ``Cluster`` or ``Remoting``. diff --git a/akka-docs/rst/scala/code/docs/routing/CustomRouterDocSpec.scala b/akka-docs/rst/scala/code/docs/routing/CustomRouterDocSpec.scala index 7406702851..d73da598da 100644 --- a/akka-docs/rst/scala/code/docs/routing/CustomRouterDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/routing/CustomRouterDocSpec.scala @@ -77,12 +77,14 @@ import akka.routing.Router import akka.japi.Util.immutableSeq import com.typesafe.config.Config -final case class RedundancyGroup(override val paths: immutable.Iterable[String], nbrCopies: Int) extends Group { +final case class RedundancyGroup(routeePaths: immutable.Iterable[String], nbrCopies: Int) extends Group { def this(config: Config) = this( - paths = immutableSeq(config.getStringList("routees.paths")), + routeePaths = immutableSeq(config.getStringList("routees.paths")), nbrCopies = config.getInt("nbr-copies")) + override def paths(system: ActorSystem): immutable.Iterable[String] = routeePaths + override def createRouter(system: ActorSystem): Router = new Router(new RedundancyRoutingLogic(nbrCopies)) diff --git a/project/MiMa.scala b/project/MiMa.scala index 585f2caa0b..9101e09714 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -417,9 +417,14 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[MissingMethodProblem]("akka.remote.testkit.MultiNodeSpec#Replacement.akka$remote$testkit$MultiNodeSpec$Replacement$$$outer"), - // method nrOfInstances(akka.actor.ActorSystem)Int in trait akka.routing.Pool does not have a correspondent in old version + // method nrOfInstances(akka.actor.ActorSystem) in trait akka.routing.Pool does not have a correspondent in old version // ok to exclude, since we don't call nrOfInstances(sys) for old implementations ProblemFilters.exclude[MissingMethodProblem]("akka.routing.Pool.nrOfInstances"), + + // method paths(akka.actor.ActorSystem) in trait akka.routing.Group does not have a correspondent in old version + // ok to exclude, since we don't call paths(sys) for old implementations + ProblemFilters.exclude[MissingMethodProblem]("akka.routing.Group.paths"), + ProblemFilters.exclude[MissingMethodProblem]("akka.routing.GroupBase.getPaths"), // removed deprecated ProblemFilters.exclude[MissingClassProblem]("akka.actor.UntypedActorFactory"),