Merge pull request #18281 from akka/wip-15412-group-paths-patriknw
=clu #15412 Add paths(system) method to Group router
This commit is contained in:
commit
12bee9433e
16 changed files with 188 additions and 15 deletions
|
|
@ -120,7 +120,7 @@ final case class BroadcastPool(
|
|||
*/
|
||||
@SerialVersionUID(1L)
|
||||
final case class BroadcastGroup(
|
||||
paths: immutable.Iterable[String],
|
||||
override val paths: immutable.Iterable[String],
|
||||
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
|
||||
extends Group {
|
||||
|
||||
|
|
@ -134,6 +134,8 @@ final case class BroadcastGroup(
|
|||
*/
|
||||
def this(routeePaths: java.lang.Iterable[String]) = this(paths = immutableSeq(routeePaths))
|
||||
|
||||
override def paths(system: ActorSystem): immutable.Iterable[String] = this.paths
|
||||
|
||||
override def createRouter(system: ActorSystem): Router = new Router(BroadcastRoutingLogic())
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -364,6 +364,8 @@ final case class ConsistentHashingGroup(
|
|||
*/
|
||||
def this(routeePaths: java.lang.Iterable[String]) = this(paths = immutableSeq(routeePaths))
|
||||
|
||||
override def paths(system: ActorSystem): immutable.Iterable[String] = this.paths
|
||||
|
||||
override def createRouter(system: ActorSystem): Router =
|
||||
new Router(ConsistentHashingRoutingLogic(system, virtualNodesFactor, hashMapping))
|
||||
|
||||
|
|
|
|||
|
|
@ -135,6 +135,8 @@ final case class RandomGroup(
|
|||
*/
|
||||
def this(routeePaths: java.lang.Iterable[String]) = this(paths = immutableSeq(routeePaths))
|
||||
|
||||
override def paths(system: ActorSystem): immutable.Iterable[String] = this.paths
|
||||
|
||||
override def createRouter(system: ActorSystem): Router = new Router(RandomRoutingLogic())
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -141,6 +141,8 @@ final case class RoundRobinGroup(
|
|||
*/
|
||||
def this(routeePaths: java.lang.Iterable[String]) = this(paths = immutableSeq(routeePaths))
|
||||
|
||||
override def paths(system: ActorSystem): immutable.Iterable[String] = this.paths
|
||||
|
||||
override def createRouter(system: ActorSystem): Router = new Router(RoundRobinRoutingLogic())
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -109,7 +109,10 @@ private[akka] class RoutedActorCell(
|
|||
if (nrOfRoutees > 0)
|
||||
addRoutees(Vector.fill(nrOfRoutees)(pool.newRoutee(routeeProps, this)))
|
||||
case group: Group ⇒
|
||||
val paths = group.paths
|
||||
// must not use group.paths(system) for old (not re-compiled) custom routers
|
||||
// for binary backwards compatibility reasons
|
||||
val deprecatedPaths = group.paths
|
||||
val paths = if (deprecatedPaths == null) group.paths(system) else deprecatedPaths
|
||||
if (paths.nonEmpty)
|
||||
addRoutees(paths.map(p ⇒ group.routeeFor(p, this))(collection.breakOut))
|
||||
case _ ⇒
|
||||
|
|
|
|||
|
|
@ -128,9 +128,16 @@ private[akka] trait PoolOverrideUnsetConfig[T <: Pool] extends Pool {
|
|||
* Java API: Base class for custom router [[Group]]
|
||||
*/
|
||||
abstract class GroupBase extends Group {
|
||||
def getPaths: java.lang.Iterable[String]
|
||||
@deprecated("Implement getPaths with ActorSystem parameter instead", "2.4")
|
||||
def getPaths: java.lang.Iterable[String] = null
|
||||
|
||||
@deprecated("Use paths with ActorSystem parameter instead", "2.4")
|
||||
override final def paths: immutable.Iterable[String] = immutableSeq(getPaths)
|
||||
|
||||
def getPaths(system: ActorSystem): java.lang.Iterable[String]
|
||||
|
||||
override final def paths(system: ActorSystem): immutable.Iterable[String] =
|
||||
immutableSeq(getPaths(system))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -140,7 +147,10 @@ abstract class GroupBase extends Group {
|
|||
*/
|
||||
trait Group extends RouterConfig {
|
||||
|
||||
def paths: immutable.Iterable[String]
|
||||
@deprecated("Implement paths with ActorSystem parameter instead", "2.4")
|
||||
def paths: immutable.Iterable[String] = null
|
||||
|
||||
def paths(system: ActorSystem): immutable.Iterable[String]
|
||||
|
||||
/**
|
||||
* [[akka.actor.Props]] for a group router based on the settings defined by
|
||||
|
|
|
|||
|
|
@ -188,6 +188,8 @@ final case class ScatterGatherFirstCompletedGroup(
|
|||
def this(routeePaths: java.lang.Iterable[String], within: FiniteDuration) =
|
||||
this(paths = immutableSeq(routeePaths), within = within)
|
||||
|
||||
override def paths(system: ActorSystem): immutable.Iterable[String] = this.paths
|
||||
|
||||
override def createRouter(system: ActorSystem): Router = new Router(ScatterGatherFirstCompletedRoutingLogic(within))
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -222,6 +222,8 @@ final case class TailChoppingGroup(
|
|||
override def createRouter(system: ActorSystem): Router =
|
||||
new Router(TailChoppingRoutingLogic(system.scheduler, within, interval, system.dispatchers.lookup(routerDispatcher)))
|
||||
|
||||
override def paths(system: ActorSystem): immutable.Iterable[String] = this.paths
|
||||
|
||||
/**
|
||||
* Setting the dispatcher to be used for the router head actor, which handles
|
||||
* router management messages
|
||||
|
|
|
|||
|
|
@ -201,7 +201,7 @@ final case class AdaptiveLoadBalancingPool(
|
|||
@SerialVersionUID(1L)
|
||||
final case class AdaptiveLoadBalancingGroup(
|
||||
metricsSelector: MetricsSelector = MixMetricsSelector,
|
||||
paths: immutable.Iterable[String] = Nil,
|
||||
override val paths: immutable.Iterable[String] = Nil,
|
||||
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
|
||||
extends Group {
|
||||
|
||||
|
|
@ -219,6 +219,8 @@ final case class AdaptiveLoadBalancingGroup(
|
|||
def this(metricsSelector: MetricsSelector,
|
||||
routeesPaths: java.lang.Iterable[String]) = this(paths = immutableSeq(routeesPaths))
|
||||
|
||||
override def paths(system: ActorSystem): immutable.Iterable[String] = this.paths
|
||||
|
||||
override def createRouter(system: ActorSystem): Router =
|
||||
new Router(AdaptiveLoadBalancingRoutingLogic(system, metricsSelector))
|
||||
|
||||
|
|
|
|||
|
|
@ -213,7 +213,7 @@ final case class AdaptiveLoadBalancingPool(
|
|||
@deprecated("Superseded by akka.cluster.metrics (in akka-cluster-metrics jar)", "2.4")
|
||||
final case class AdaptiveLoadBalancingGroup(
|
||||
metricsSelector: MetricsSelector = MixMetricsSelector,
|
||||
paths: immutable.Iterable[String] = Nil,
|
||||
override val paths: immutable.Iterable[String] = Nil,
|
||||
override val routerDispatcher: String = Dispatchers.DefaultDispatcherId)
|
||||
extends Group {
|
||||
|
||||
|
|
@ -231,6 +231,8 @@ final case class AdaptiveLoadBalancingGroup(
|
|||
def this(metricsSelector: MetricsSelector,
|
||||
routeesPaths: java.lang.Iterable[String]) = this(paths = immutableSeq(routeesPaths))
|
||||
|
||||
override def paths(system: ActorSystem): immutable.Iterable[String] = this.paths
|
||||
|
||||
override def createRouter(system: ActorSystem): Router =
|
||||
new Router(AdaptiveLoadBalancingRoutingLogic(system, metricsSelector))
|
||||
|
||||
|
|
|
|||
|
|
@ -128,7 +128,14 @@ private[akka] trait ClusterRouterSettingsBase {
|
|||
@SerialVersionUID(1L)
|
||||
final case class ClusterRouterGroup(local: Group, settings: ClusterRouterGroupSettings) extends Group with ClusterRouterConfigBase {
|
||||
|
||||
override def paths: immutable.Iterable[String] = if (settings.allowLocalRoutees) settings.routeesPaths else Nil
|
||||
override def paths(system: ActorSystem): immutable.Iterable[String] =
|
||||
if (settings.allowLocalRoutees && settings.useRole.isDefined) {
|
||||
if (Cluster(system).selfRoles.contains(settings.useRole.get)) {
|
||||
settings.routeesPaths
|
||||
} else Nil
|
||||
} else if (settings.allowLocalRoutees && settings.useRole.isEmpty) {
|
||||
settings.routeesPaths
|
||||
} else Nil
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import akka.pattern.ask
|
|||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.routing.GetRoutees
|
||||
import akka.routing.RoundRobinGroup
|
||||
import akka.routing.RoundRobinPool
|
||||
import akka.routing.Routees
|
||||
import akka.testkit.DefaultTimeout
|
||||
|
|
@ -87,10 +88,15 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp
|
|||
runOn(first) { info("first, roles: " + cluster.selfRoles) }
|
||||
runOn(second) { info("second, roles: " + cluster.selfRoles) }
|
||||
runOn(third) { info("third, roles: " + cluster.selfRoles) }
|
||||
|
||||
// routees for the group routers
|
||||
system.actorOf(Props(classOf[SomeActor], GroupRoutee), "foo")
|
||||
system.actorOf(Props(classOf[SomeActor], GroupRoutee), "bar")
|
||||
|
||||
enterBarrier("after-1")
|
||||
}
|
||||
|
||||
"local: off, roles: off, 6 => 0,2,2" taggedAs LongRunningTest in {
|
||||
"pool local: off, roles: off, 6 => 0,2,2" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(first) {
|
||||
val role = Some("b")
|
||||
|
|
@ -119,7 +125,36 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp
|
|||
enterBarrier("after-2")
|
||||
}
|
||||
|
||||
"local: on, role: b, 6 => 0,2,2" taggedAs LongRunningTest in {
|
||||
"group local: off, roles: off, 6 => 0,2,2" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(first) {
|
||||
val role = Some("b")
|
||||
|
||||
val router = system.actorOf(ClusterRouterGroup(
|
||||
RoundRobinGroup(paths = Nil),
|
||||
ClusterRouterGroupSettings(totalInstances = 6, routeesPaths = List("/user/foo", "/user/bar"),
|
||||
allowLocalRoutees = false, useRole = role)).props,
|
||||
"router-2b")
|
||||
|
||||
awaitAssert(currentRoutees(router).size should ===(4))
|
||||
|
||||
val iterationCount = 10
|
||||
for (i ← 0 until iterationCount) {
|
||||
router ! s"hit-$i"
|
||||
}
|
||||
|
||||
val replies = receiveReplies(GroupRoutee, iterationCount)
|
||||
|
||||
replies(first) should ===(0) // should not be deployed locally, does not have required role
|
||||
replies(second) should be > 0
|
||||
replies(third) should be > 0
|
||||
replies.values.sum should ===(iterationCount)
|
||||
}
|
||||
|
||||
enterBarrier("after-2b")
|
||||
}
|
||||
|
||||
"pool local: on, role: b, 6 => 0,2,2" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(first) {
|
||||
val role = Some("b")
|
||||
|
|
@ -148,7 +183,36 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp
|
|||
enterBarrier("after-3")
|
||||
}
|
||||
|
||||
"local: on, role: a, 6 => 2,0,0" taggedAs LongRunningTest in {
|
||||
"group local: on, role: b, 6 => 0,2,2" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(first) {
|
||||
val role = Some("b")
|
||||
|
||||
val router = system.actorOf(ClusterRouterGroup(
|
||||
RoundRobinGroup(paths = Nil),
|
||||
ClusterRouterGroupSettings(totalInstances = 6, routeesPaths = List("/user/foo", "/user/bar"),
|
||||
allowLocalRoutees = true, useRole = role)).props,
|
||||
"router-3b")
|
||||
|
||||
awaitAssert(currentRoutees(router).size should ===(4))
|
||||
|
||||
val iterationCount = 10
|
||||
for (i ← 0 until iterationCount) {
|
||||
router ! s"hit-$i"
|
||||
}
|
||||
|
||||
val replies = receiveReplies(GroupRoutee, iterationCount)
|
||||
|
||||
replies(first) should ===(0) // should not be deployed locally, does not have required role
|
||||
replies(second) should be > 0
|
||||
replies(third) should be > 0
|
||||
replies.values.sum should ===(iterationCount)
|
||||
}
|
||||
|
||||
enterBarrier("after-3b")
|
||||
}
|
||||
|
||||
"pool local: on, role: a, 6 => 2,0,0" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(first) {
|
||||
val role = Some("a")
|
||||
|
|
@ -177,7 +241,36 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp
|
|||
enterBarrier("after-4")
|
||||
}
|
||||
|
||||
"local: on, role: c, 6 => 2,2,2" taggedAs LongRunningTest in {
|
||||
"group local: on, role: a, 6 => 2,0,0" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(first) {
|
||||
val role = Some("a")
|
||||
|
||||
val router = system.actorOf(ClusterRouterGroup(
|
||||
RoundRobinGroup(paths = Nil),
|
||||
ClusterRouterGroupSettings(totalInstances = 6, routeesPaths = List("/user/foo", "/user/bar"),
|
||||
allowLocalRoutees = true, useRole = role)).props,
|
||||
"router-4b")
|
||||
|
||||
awaitAssert(currentRoutees(router).size should ===(2))
|
||||
|
||||
val iterationCount = 10
|
||||
for (i ← 0 until iterationCount) {
|
||||
router ! s"hit-$i"
|
||||
}
|
||||
|
||||
val replies = receiveReplies(GroupRoutee, iterationCount)
|
||||
|
||||
replies(first) should be > 0
|
||||
replies(second) should ===(0)
|
||||
replies(third) should ===(0)
|
||||
replies.values.sum should ===(iterationCount)
|
||||
}
|
||||
|
||||
enterBarrier("after-4b")
|
||||
}
|
||||
|
||||
"pool local: on, role: c, 6 => 2,2,2" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(first) {
|
||||
val role = Some("c")
|
||||
|
|
@ -206,5 +299,34 @@ abstract class UseRoleIgnoredSpec extends MultiNodeSpec(UseRoleIgnoredMultiJvmSp
|
|||
enterBarrier("after-5")
|
||||
}
|
||||
|
||||
"group local: on, role: c, 6 => 2,2,2" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(first) {
|
||||
val role = Some("c")
|
||||
|
||||
val router = system.actorOf(ClusterRouterGroup(
|
||||
RoundRobinGroup(paths = Nil),
|
||||
ClusterRouterGroupSettings(totalInstances = 6, routeesPaths = List("/user/foo", "/user/bar"),
|
||||
allowLocalRoutees = true, useRole = role)).props,
|
||||
"router-5b")
|
||||
|
||||
awaitAssert(currentRoutees(router).size should ===(6))
|
||||
|
||||
val iterationCount = 10
|
||||
for (i ← 0 until iterationCount) {
|
||||
router ! s"hit-$i"
|
||||
}
|
||||
|
||||
val replies = receiveReplies(GroupRoutee, iterationCount)
|
||||
|
||||
replies(first) should be > 0
|
||||
replies(second) should be > 0
|
||||
replies(third) should be > 0
|
||||
replies.values.sum should ===(iterationCount)
|
||||
}
|
||||
|
||||
enterBarrier("after-5b")
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ public class RedundancyGroup extends GroupBase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public java.lang.Iterable<String> getPaths() {
|
||||
public java.lang.Iterable<String> getPaths(ActorSystem system) {
|
||||
return paths;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -147,6 +147,14 @@ In order to make cluster routers smarter about when they can start local routees
|
|||
In case you have implemented a custom Pool you will have to update the method's signature,
|
||||
however the implementation can remain the same if you don't need to rely on an ActorSystem in your logic.
|
||||
|
||||
Group routers paths method now takes ActorSystem
|
||||
===============================================
|
||||
|
||||
In order to make cluster routers smarter about when they can start local routees,
|
||||
``paths`` defined on ``Group`` now takes ``ActorSystem`` as an argument.
|
||||
In case you have implemented a custom Group you will have to update the method's signature,
|
||||
however the implementation can remain the same if you don't need to rely on an ActorSystem in your logic.
|
||||
|
||||
Logger names use full class name
|
||||
================================
|
||||
Previously, few places in akka used "simple" logger names, such as ``Cluster`` or ``Remoting``.
|
||||
|
|
|
|||
|
|
@ -77,12 +77,14 @@ import akka.routing.Router
|
|||
import akka.japi.Util.immutableSeq
|
||||
import com.typesafe.config.Config
|
||||
|
||||
final case class RedundancyGroup(override val paths: immutable.Iterable[String], nbrCopies: Int) extends Group {
|
||||
final case class RedundancyGroup(routeePaths: immutable.Iterable[String], nbrCopies: Int) extends Group {
|
||||
|
||||
def this(config: Config) = this(
|
||||
paths = immutableSeq(config.getStringList("routees.paths")),
|
||||
routeePaths = immutableSeq(config.getStringList("routees.paths")),
|
||||
nbrCopies = config.getInt("nbr-copies"))
|
||||
|
||||
override def paths(system: ActorSystem): immutable.Iterable[String] = routeePaths
|
||||
|
||||
override def createRouter(system: ActorSystem): Router =
|
||||
new Router(new RedundancyRoutingLogic(nbrCopies))
|
||||
|
||||
|
|
|
|||
|
|
@ -417,10 +417,15 @@ object MiMa extends AutoPlugin {
|
|||
ProblemFilters.exclude[MissingMethodProblem]("akka.remote.testkit.MultiNodeSpec#Replacement.akka$remote$testkit$MultiNodeSpec$Replacement$$$outer"),
|
||||
|
||||
|
||||
// method nrOfInstances(akka.actor.ActorSystem)Int in trait akka.routing.Pool does not have a correspondent in old version
|
||||
// method nrOfInstances(akka.actor.ActorSystem) in trait akka.routing.Pool does not have a correspondent in old version
|
||||
// ok to exclude, since we don't call nrOfInstances(sys) for old implementations
|
||||
ProblemFilters.exclude[MissingMethodProblem]("akka.routing.Pool.nrOfInstances"),
|
||||
|
||||
// method paths(akka.actor.ActorSystem) in trait akka.routing.Group does not have a correspondent in old version
|
||||
// ok to exclude, since we don't call paths(sys) for old implementations
|
||||
ProblemFilters.exclude[MissingMethodProblem]("akka.routing.Group.paths"),
|
||||
ProblemFilters.exclude[MissingMethodProblem]("akka.routing.GroupBase.getPaths"),
|
||||
|
||||
// removed deprecated
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.actor.UntypedActorFactory"),
|
||||
ProblemFilters.exclude[MissingMethodProblem]("akka.util.Timeout.longToTimeout"),
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue