+clu #3627 Cluster router group with multiple paths per node

* Use the ordinary routees.paths config property instead of
  cluster.routees-path
* Backwards compatible in deprecation phase
This commit is contained in:
Patrik Nordwall 2013-10-16 11:06:38 +02:00
parent c2faa48351
commit 402674ce10
19 changed files with 168 additions and 103 deletions

View file

@ -40,12 +40,13 @@ import akka.routing.RoutingLogic
import akka.actor.RelativeActorPath
import com.typesafe.config.Config
import akka.routing.DeprecatedRouterConfig
import akka.japi.Util.immutableSeq
object ClusterRouterGroupSettings {
def fromConfig(config: Config): ClusterRouterGroupSettings =
ClusterRouterGroupSettings(
totalInstances = config.getInt("nr-of-instances"),
routeesPath = config.getString("cluster.routees-path"),
routeesPaths = immutableSeq(config.getStringList("routees.paths")),
allowLocalRoutees = config.getBoolean("cluster.allow-local-routees"),
useRole = ClusterRouterSettingsBase.useRoleOption(config.getString("cluster.use-role")))
}
@ -56,26 +57,40 @@ object ClusterRouterGroupSettings {
@SerialVersionUID(1L)
case class ClusterRouterGroupSettings(
totalInstances: Int,
routeesPath: String,
routeesPaths: immutable.Seq[String],
allowLocalRoutees: Boolean,
useRole: Option[String]) extends ClusterRouterSettingsBase {
@deprecated("Use constructor with routeesPaths Seq", "2.3")
def this(
totalInstances: Int,
routeesPath: String,
allowLocalRoutees: Boolean,
useRole: Option[String]) =
this(totalInstances, List(routeesPath), allowLocalRoutees, useRole)
/**
* Java API
*/
def this(totalInstances: Int, routeesPaths: java.lang.Iterable[String], allowLocalRoutees: Boolean, useRole: String) =
this(totalInstances, immutableSeq(routeesPaths), allowLocalRoutees, ClusterRouterSettingsBase.useRoleOption(useRole))
/**
* Java API
*/
@deprecated("Use constructor with routeesPaths Iterable", "2.3")
def this(totalInstances: Int, routeesPath: String, allowLocalRoutees: Boolean, useRole: String) =
this(totalInstances, routeesPath, allowLocalRoutees, ClusterRouterSettingsBase.useRoleOption(useRole))
if (totalInstances <= 0) throw new IllegalArgumentException("totalInstances of cluster router must be > 0")
if (!isRouteesPathDefined) throw new IllegalArgumentException("routeesPath must be defined")
if ((routeesPaths eq null) || routeesPaths.isEmpty || routeesPaths.head == "")
throw new IllegalArgumentException("routeesPaths must be defined")
routeesPath match {
routeesPaths.foreach(p p match {
case RelativeActorPath(elements) // good
case _
throw new IllegalArgumentException("routeesPath [%s] is not a valid relative actor path" format routeesPath)
}
def isRouteesPathDefined: Boolean = (routeesPath ne null) && routeesPath != ""
throw new IllegalArgumentException(s"routeesPaths [$p] is not a valid relative actor path")
})
}
@ -140,9 +155,7 @@ private[akka] trait ClusterRouterSettingsBase {
@SerialVersionUID(1L)
final case class ClusterRouterGroup(local: Group, settings: ClusterRouterGroupSettings) extends Group with ClusterRouterConfigBase {
require(settings.routeesPath.nonEmpty, "routeesPath must be defined")
override def paths: immutable.Iterable[String] = if (settings.allowLocalRoutees) List(settings.routeesPath) else Nil
override def paths: immutable.Iterable[String] = if (settings.allowLocalRoutees) settings.routeesPaths else Nil
/**
* INTERNAL API
@ -259,7 +272,23 @@ private[akka] class ClusterRouterPoolActor(
doAddRoutees()
}
override def maxInstancesPerNode: Int = settings.maxInstancesPerNode
def selectDeploymentTarget: Option[Address] = {
val currentRoutees = cell.router.routees
val currentNodes = availableNodes
if (currentNodes.isEmpty || currentRoutees.size >= settings.totalInstances) {
None
} else {
// find the node with least routees
val numberOfRouteesPerNode: Map[Address, Int] =
currentRoutees.foldLeft(currentNodes.map(_ -> 0).toMap.withDefaultValue(0)) { (acc, x)
val address = fullAddress(x)
acc + (address -> (acc(address) + 1))
}
val (address, count) = numberOfRouteesPerNode.minBy(_._2)
if (count < settings.maxInstancesPerNode) Some(address) else None
}
}
}
@ -277,6 +306,12 @@ private[akka] class ClusterRouterGroupActor(val settings: ClusterRouterGroupSett
override def receive = clusterReceive orElse super.receive
var usedRouteePaths: Map[Address, Set[String]] =
if (settings.allowLocalRoutees)
Map(cluster.selfAddress -> settings.routeesPaths.toSet)
else
Map.empty
/**
* Adds routees based on totalInstances and maxInstancesPerNode settings
*/
@ -284,8 +319,9 @@ private[akka] class ClusterRouterGroupActor(val settings: ClusterRouterGroupSett
@tailrec
def doAddRoutees(): Unit = selectDeploymentTarget match {
case None // done
case Some(target)
val routee = group.routeeFor(target + settings.routeesPath, context)
case Some((address, path))
val routee = group.routeeFor(address + path, context)
usedRouteePaths = usedRouteePaths.updated(address, usedRouteePaths.getOrElse(address, Set.empty) + path)
// must register each one, since registered routees are used in selectDeploymentTarget
cell.addRoutee(routee)
@ -296,8 +332,28 @@ private[akka] class ClusterRouterGroupActor(val settings: ClusterRouterGroupSett
doAddRoutees()
}
override def maxInstancesPerNode: Int = 1
def selectDeploymentTarget: Option[(Address, String)] = {
val currentRoutees = cell.router.routees
val currentNodes = availableNodes
if (currentNodes.isEmpty || currentRoutees.size >= settings.totalInstances) {
None
} else {
// find the node with least routees
val unusedNodes = currentNodes filterNot usedRouteePaths.contains
if (unusedNodes.nonEmpty) {
Some((unusedNodes.head, settings.routeesPaths.head))
} else {
val (address, used) = usedRouteePaths.minBy { case (address, used) used.size }
// pick next of the unused paths
settings.routeesPaths.collectFirst { case p if !used.contains(p) (address, p) }
}
}
}
override def removeMember(member: Member): Unit = {
usedRouteePaths -= member.address
super.removeMember(member)
}
}
/**
@ -364,36 +420,16 @@ private[akka] trait ClusterRouterActor { this: RouterActor ⇒
}
/**
* Adds routees based on totalInstances and maxInstancesPerNode settings
* Adds routees based on settings
*/
def addRoutees(): Unit
def maxInstancesPerNode: Int
def selectDeploymentTarget: Option[Address] = {
val currentRoutees = cell.router.routees
val currentNodes = availableNodes
if (currentNodes.isEmpty || currentRoutees.size >= settings.totalInstances) {
None
} else {
// find the node with least routees
val numberOfRouteesPerNode: Map[Address, Int] =
currentRoutees.foldLeft(currentNodes.map(_ -> 0).toMap.withDefaultValue(0)) { (acc, x)
val address = fullAddress(x)
acc + (address -> (acc(address) + 1))
}
val (address, count) = numberOfRouteesPerNode.minBy(_._2)
if (count < maxInstancesPerNode) Some(address) else None
}
}
def addMember(member: Member) = {
def addMember(member: Member): Unit = {
nodes += member.address
addRoutees()
}
def removeMember(member: Member) = {
def removeMember(member: Member): Unit = {
val address = member.address
nodes -= address