diff --git a/akka-actor/src/main/scala/akka/routing/Broadcast.scala b/akka-actor/src/main/scala/akka/routing/Broadcast.scala index 7110ea50cb..77fe2506b4 100644 --- a/akka-actor/src/main/scala/akka/routing/Broadcast.scala +++ b/akka-actor/src/main/scala/akka/routing/Broadcast.scala @@ -120,7 +120,7 @@ final case class BroadcastPool( */ @SerialVersionUID(1L) final case class BroadcastGroup( - paths: immutable.Iterable[String], + override val paths: immutable.Iterable[String], override val routerDispatcher: String = Dispatchers.DefaultDispatcherId) extends Group { @@ -134,6 +134,8 @@ final case class BroadcastGroup( */ def this(routeePaths: java.lang.Iterable[String]) = this(paths = immutableSeq(routeePaths)) + override def paths(system: ActorSystem): immutable.Iterable[String] = this.paths + override def createRouter(system: ActorSystem): Router = new Router(BroadcastRoutingLogic()) /** diff --git a/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala b/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala index 283bf52055..eb29ef72ee 100644 --- a/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala +++ b/akka-actor/src/main/scala/akka/routing/ConsistentHashing.scala @@ -364,6 +364,8 @@ final case class ConsistentHashingGroup( */ def this(routeePaths: java.lang.Iterable[String]) = this(paths = immutableSeq(routeePaths)) + override def paths(system: ActorSystem): immutable.Iterable[String] = this.paths + override def createRouter(system: ActorSystem): Router = new Router(ConsistentHashingRoutingLogic(system, virtualNodesFactor, hashMapping)) diff --git a/akka-actor/src/main/scala/akka/routing/Random.scala b/akka-actor/src/main/scala/akka/routing/Random.scala index ccd5d08b97..bf99c7261b 100644 --- a/akka-actor/src/main/scala/akka/routing/Random.scala +++ b/akka-actor/src/main/scala/akka/routing/Random.scala @@ -135,6 +135,8 @@ final case class RandomGroup( */ def this(routeePaths: java.lang.Iterable[String]) = this(paths = immutableSeq(routeePaths)) + override def paths(system: ActorSystem): immutable.Iterable[String] = this.paths + override def createRouter(system: ActorSystem): Router = new Router(RandomRoutingLogic()) /** diff --git a/akka-actor/src/main/scala/akka/routing/RoundRobin.scala b/akka-actor/src/main/scala/akka/routing/RoundRobin.scala index 868e9193fb..134e6319e2 100644 --- a/akka-actor/src/main/scala/akka/routing/RoundRobin.scala +++ b/akka-actor/src/main/scala/akka/routing/RoundRobin.scala @@ -141,6 +141,8 @@ final case class RoundRobinGroup( */ def this(routeePaths: java.lang.Iterable[String]) = this(paths = immutableSeq(routeePaths)) + override def paths(system: ActorSystem): immutable.Iterable[String] = this.paths + override def createRouter(system: ActorSystem): Router = new Router(RoundRobinRoutingLogic()) /** diff --git a/akka-actor/src/main/scala/akka/routing/RoutedActorCell.scala b/akka-actor/src/main/scala/akka/routing/RoutedActorCell.scala index 9a424f6c15..427924d0a5 100644 --- a/akka-actor/src/main/scala/akka/routing/RoutedActorCell.scala +++ b/akka-actor/src/main/scala/akka/routing/RoutedActorCell.scala @@ -109,7 +109,10 @@ private[akka] class RoutedActorCell( if (nrOfRoutees > 0) addRoutees(Vector.fill(nrOfRoutees)(pool.newRoutee(routeeProps, this))) case group: Group ⇒ - val paths = group.paths + // must not use group.paths(system) for old (not re-compiled) custom routers + // for binary backwards compatibility reasons + val deprecatedPaths = group.paths + val paths = if (deprecatedPaths == null) group.paths(system) else deprecatedPaths if (paths.nonEmpty) addRoutees(paths.map(p ⇒ group.routeeFor(p, this))(collection.breakOut)) case _ ⇒ diff --git a/akka-actor/src/main/scala/akka/routing/RouterConfig.scala b/akka-actor/src/main/scala/akka/routing/RouterConfig.scala index 6a8a4570ee..53b50a80dc 100644 --- a/akka-actor/src/main/scala/akka/routing/RouterConfig.scala +++ b/akka-actor/src/main/scala/akka/routing/RouterConfig.scala @@ -128,9 +128,16 @@ private[akka] trait PoolOverrideUnsetConfig[T <: Pool] extends Pool { * Java API: Base class for custom router [[Group]] */ abstract class GroupBase extends Group { - def getPaths: java.lang.Iterable[String] + @deprecated("Implement getPaths with ActorSystem parameter instead", "2.4") + def getPaths: java.lang.Iterable[String] = null + @deprecated("Use paths with ActorSystem parameter instead", "2.4") override final def paths: immutable.Iterable[String] = immutableSeq(getPaths) + + def getPaths(system: ActorSystem): java.lang.Iterable[String] + + override final def paths(system: ActorSystem): immutable.Iterable[String] = + immutableSeq(getPaths(system)) } /** @@ -140,7 +147,10 @@ abstract class GroupBase extends Group { */ trait Group extends RouterConfig { - def paths: immutable.Iterable[String] + @deprecated("Implement paths with ActorSystem parameter instead", "2.4") + def paths: immutable.Iterable[String] = null + + def paths(system: ActorSystem): immutable.Iterable[String] /** * [[akka.actor.Props]] for a group router based on the settings defined by diff --git a/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala b/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala index 20b2f1a6f0..77391a4504 100644 --- a/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala +++ b/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala @@ -188,6 +188,8 @@ final case class ScatterGatherFirstCompletedGroup( def this(routeePaths: java.lang.Iterable[String], within: FiniteDuration) = this(paths = immutableSeq(routeePaths), within = within) + override def paths(system: ActorSystem): immutable.Iterable[String] = this.paths + override def createRouter(system: ActorSystem): Router = new Router(ScatterGatherFirstCompletedRoutingLogic(within)) /** diff --git a/akka-actor/src/main/scala/akka/routing/TailChopping.scala b/akka-actor/src/main/scala/akka/routing/TailChopping.scala index 7cfcb66b9e..1b44b39be3 100644 --- a/akka-actor/src/main/scala/akka/routing/TailChopping.scala +++ b/akka-actor/src/main/scala/akka/routing/TailChopping.scala @@ -222,6 +222,8 @@ final case class TailChoppingGroup( override def createRouter(system: ActorSystem): Router = new Router(TailChoppingRoutingLogic(system.scheduler, within, interval, system.dispatchers.lookup(routerDispatcher))) + override def paths(system: ActorSystem): immutable.Iterable[String] = this.paths + /** * Setting the dispatcher to be used for the router head actor, which handles * router management messages diff --git a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/ClusterMetricsRouting.scala b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/ClusterMetricsRouting.scala index bf4b71418e..03e8470812 100644 --- a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/ClusterMetricsRouting.scala +++ b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/ClusterMetricsRouting.scala @@ -201,7 +201,7 @@ final case class AdaptiveLoadBalancingPool( @SerialVersionUID(1L) final case class AdaptiveLoadBalancingGroup( metricsSelector: MetricsSelector = MixMetricsSelector, - paths: immutable.Iterable[String] = Nil, + override val paths: immutable.Iterable[String] = Nil, override val routerDispatcher: String = Dispatchers.DefaultDispatcherId) extends Group { @@ -219,6 +219,8 @@ final case class AdaptiveLoadBalancingGroup( def this(metricsSelector: MetricsSelector, routeesPaths: java.lang.Iterable[String]) = this(paths = immutableSeq(routeesPaths)) + override def paths(system: ActorSystem): immutable.Iterable[String] = this.paths + override def createRouter(system: ActorSystem): Router = new Router(AdaptiveLoadBalancingRoutingLogic(system, metricsSelector)) diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancing.scala b/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancing.scala index c4fe183558..b1ae64b95f 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancing.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/AdaptiveLoadBalancing.scala @@ -213,7 +213,7 @@ final case class AdaptiveLoadBalancingPool( @deprecated("Superseded by akka.cluster.metrics (in akka-cluster-metrics jar)", "2.4") final case class AdaptiveLoadBalancingGroup( metricsSelector: MetricsSelector = MixMetricsSelector, - paths: immutable.Iterable[String] = Nil, + override val paths: immutable.Iterable[String] = Nil, override val routerDispatcher: String = Dispatchers.DefaultDispatcherId) extends Group { @@ -231,6 +231,8 @@ final case class AdaptiveLoadBalancingGroup( def this(metricsSelector: MetricsSelector, routeesPaths: java.lang.Iterable[String]) = this(paths = immutableSeq(routeesPaths)) + override def paths(system: ActorSystem): immutable.Iterable[String] = this.paths + override def createRouter(system: ActorSystem): Router = new Router(AdaptiveLoadBalancingRoutingLogic(system, metricsSelector)) diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala index 0b0e55da53..a3d34e2c17 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala @@ -128,7 +128,14 @@ private[akka] trait ClusterRouterSettingsBase { @SerialVersionUID(1L) final case class ClusterRouterGroup(local: Group, settings: ClusterRouterGroupSettings) extends Group with ClusterRouterConfigBase { - override def paths: immutable.Iterable[String] = if (settings.allowLocalRoutees) settings.routeesPaths else Nil + override def paths(system: ActorSystem): immutable.Iterable[String] = + if (settings.allowLocalRoutees && settings.useRole.isDefined) { + if (Cluster(system).selfRoles.contains(settings.useRole.get)) { + settings.routeesPaths + } else Nil + } else if (settings.allowLocalRoutees && settings.useRole.isEmpty) { + settings.routeesPaths + } else Nil /** * INTERNAL API diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/UseRoleIgnoredSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/UseRoleIgnoredSpec.scala index ecbdd1939f..0ab28b00fc 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/UseRoleIgnoredSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/UseRoleIgnoredSpec.scala @@ -9,6 +9,7 @@ import akka.pattern.ask import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.routing.GetRoutees +import akka.routing.RoundRobinGroup import akka.routing.RoundRobinPool import akka.routing.Routees import akka.testkit.DefaultTimeout @@ -87,10 +88,15 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp runOn(first) { info("first, roles: " + cluster.selfRoles) } runOn(second) { info("second, roles: " + cluster.selfRoles) } runOn(third) { info("third, roles: " + cluster.selfRoles) } + + // routees for the group routers + system.actorOf(Props(classOf[SomeActor], GroupRoutee), "foo") + system.actorOf(Props(classOf[SomeActor], GroupRoutee), "bar") + enterBarrier("after-1") } - "local: off, roles: off, 6 => 0,2,2" taggedAs LongRunningTest in { + "pool local: off, roles: off, 6 => 0,2,2" taggedAs LongRunningTest in { runOn(first) { val role = Some("b") @@ -119,7 +125,36 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp enterBarrier("after-2") } - "local: on, role: b, 6 => 0,2,2" taggedAs LongRunningTest in { + "group local: off, roles: off, 6 => 0,2,2" taggedAs LongRunningTest in { + + runOn(first) { + val role = Some("b") + + val router = system.actorOf(ClusterRouterGroup( + RoundRobinGroup(paths = Nil), + ClusterRouterGroupSettings(totalInstances = 6, routeesPaths = List("/user/foo", "/user/bar"), + allowLocalRoutees = false, useRole = role)).props, + "router-2b") + + awaitAssert(currentRoutees(router).size should ===(4)) + + val iterationCount = 10 + for (i ← 0 until iterationCount) { + router ! s"hit-$i" + } + + val replies = receiveReplies(GroupRoutee, iterationCount) + + replies(first) should ===(0) // should not be deployed locally, does not have required role + replies(second) should be > 0 + replies(third) should be > 0 + replies.values.sum should ===(iterationCount) + } + + enterBarrier("after-2b") + } + + "pool local: on, role: b, 6 => 0,2,2" taggedAs LongRunningTest in { runOn(first) { val role = Some("b") @@ -148,7 +183,36 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp enterBarrier("after-3") } - "local: on, role: a, 6 => 2,0,0" taggedAs LongRunningTest in { + "group local: on, role: b, 6 => 0,2,2" taggedAs LongRunningTest in { + + runOn(first) { + val role = Some("b") + + val router = system.actorOf(ClusterRouterGroup( + RoundRobinGroup(paths = Nil), + ClusterRouterGroupSettings(totalInstances = 6, routeesPaths = List("/user/foo", "/user/bar"), + allowLocalRoutees = true, useRole = role)).props, + "router-3b") + + awaitAssert(currentRoutees(router).size should ===(4)) + + val iterationCount = 10 + for (i ← 0 until iterationCount) { + router ! s"hit-$i" + } + + val replies = receiveReplies(GroupRoutee, iterationCount) + + replies(first) should ===(0) // should not be deployed locally, does not have required role + replies(second) should be > 0 + replies(third) should be > 0 + replies.values.sum should ===(iterationCount) + } + + enterBarrier("after-3b") + } + + "pool local: on, role: a, 6 => 2,0,0" taggedAs LongRunningTest in { runOn(first) { val role = Some("a") @@ -177,7 +241,36 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp enterBarrier("after-4") } - "local: on, role: c, 6 => 2,2,2" taggedAs LongRunningTest in { + "group local: on, role: a, 6 => 2,0,0" taggedAs LongRunningTest in { + + runOn(first) { + val role = Some("a") + + val router = system.actorOf(ClusterRouterGroup( + RoundRobinGroup(paths = Nil), + ClusterRouterGroupSettings(totalInstances = 6, routeesPaths = List("/user/foo", "/user/bar"), + allowLocalRoutees = true, useRole = role)).props, + "router-4b") + + awaitAssert(currentRoutees(router).size should ===(2)) + + val iterationCount = 10 + for (i ← 0 until iterationCount) { + router ! s"hit-$i" + } + + val replies = receiveReplies(GroupRoutee, iterationCount) + + replies(first) should be > 0 + replies(second) should ===(0) + replies(third) should ===(0) + replies.values.sum should ===(iterationCount) + } + + enterBarrier("after-4b") + } + + "pool local: on, role: c, 6 => 2,2,2" taggedAs LongRunningTest in { runOn(first) { val role = Some("c") @@ -206,5 +299,34 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp enterBarrier("after-5") } + "group local: on, role: c, 6 => 2,2,2" taggedAs LongRunningTest in { + + runOn(first) { + val role = Some("c") + + val router = system.actorOf(ClusterRouterGroup( + RoundRobinGroup(paths = Nil), + ClusterRouterGroupSettings(totalInstances = 6, routeesPaths = List("/user/foo", "/user/bar"), + allowLocalRoutees = true, useRole = role)).props, + "router-5b") + + awaitAssert(currentRoutees(router).size should ===(6)) + + val iterationCount = 10 + for (i ← 0 until iterationCount) { + router ! s"hit-$i" + } + + val replies = receiveReplies(GroupRoutee, iterationCount) + + replies(first) should be > 0 + replies(second) should be > 0 + replies(third) should be > 0 + replies.values.sum should ===(iterationCount) + } + + enterBarrier("after-5b") + } + } } diff --git a/akka-docs/rst/java/code/docs/jrouting/RedundancyGroup.java b/akka-docs/rst/java/code/docs/jrouting/RedundancyGroup.java index 6cbc2c51ac..9062e95710 100644 --- a/akka-docs/rst/java/code/docs/jrouting/RedundancyGroup.java +++ b/akka-docs/rst/java/code/docs/jrouting/RedundancyGroup.java @@ -40,7 +40,7 @@ public class RedundancyGroup extends GroupBase { } @Override - public java.lang.Iterable getPaths() { + public java.lang.Iterable getPaths(ActorSystem system) { return paths; } diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index 5f027f01a4..2055e72b39 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -147,6 +147,14 @@ In order to make cluster routers smarter about when they can start local routees In case you have implemented a custom Pool you will have to update the method's signature, however the implementation can remain the same if you don't need to rely on an ActorSystem in your logic. +Group routers paths method now takes ActorSystem +=============================================== + +In order to make cluster routers smarter about when they can start local routees, +``paths`` defined on ``Group`` now takes ``ActorSystem`` as an argument. +In case you have implemented a custom Group you will have to update the method's signature, +however the implementation can remain the same if you don't need to rely on an ActorSystem in your logic. + Logger names use full class name ================================ Previously, few places in akka used "simple" logger names, such as ``Cluster`` or ``Remoting``. diff --git a/akka-docs/rst/scala/code/docs/routing/CustomRouterDocSpec.scala b/akka-docs/rst/scala/code/docs/routing/CustomRouterDocSpec.scala index 7406702851..d73da598da 100644 --- a/akka-docs/rst/scala/code/docs/routing/CustomRouterDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/routing/CustomRouterDocSpec.scala @@ -77,12 +77,14 @@ import akka.routing.Router import akka.japi.Util.immutableSeq import com.typesafe.config.Config -final case class RedundancyGroup(override val paths: immutable.Iterable[String], nbrCopies: Int) extends Group { +final case class RedundancyGroup(routeePaths: immutable.Iterable[String], nbrCopies: Int) extends Group { def this(config: Config) = this( - paths = immutableSeq(config.getStringList("routees.paths")), + routeePaths = immutableSeq(config.getStringList("routees.paths")), nbrCopies = config.getInt("nbr-copies")) + override def paths(system: ActorSystem): immutable.Iterable[String] = routeePaths + override def createRouter(system: ActorSystem): Router = new Router(new RedundancyRoutingLogic(nbrCopies)) diff --git a/project/MiMa.scala b/project/MiMa.scala index 585f2caa0b..9101e09714 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -417,9 +417,14 @@ object MiMa extends AutoPlugin { ProblemFilters.exclude[MissingMethodProblem]("akka.remote.testkit.MultiNodeSpec#Replacement.akka$remote$testkit$MultiNodeSpec$Replacement$$$outer"), - // method nrOfInstances(akka.actor.ActorSystem)Int in trait akka.routing.Pool does not have a correspondent in old version + // method nrOfInstances(akka.actor.ActorSystem) in trait akka.routing.Pool does not have a correspondent in old version // ok to exclude, since we don't call nrOfInstances(sys) for old implementations ProblemFilters.exclude[MissingMethodProblem]("akka.routing.Pool.nrOfInstances"), + + // method paths(akka.actor.ActorSystem) in trait akka.routing.Group does not have a correspondent in old version + // ok to exclude, since we don't call paths(sys) for old implementations + ProblemFilters.exclude[MissingMethodProblem]("akka.routing.Group.paths"), + ProblemFilters.exclude[MissingMethodProblem]("akka.routing.GroupBase.getPaths"), // removed deprecated ProblemFilters.exclude[MissingClassProblem]("akka.actor.UntypedActorFactory"),