Improvements based on review comments from √, see #2103
This commit is contained in:
parent
324ef78e53
commit
ac16d567ea
9 changed files with 63 additions and 37 deletions
|
|
@ -39,7 +39,8 @@ import akka.actor.RelativeActorPath
|
|||
* which makes it possible to mix this with the built-in routers such as
|
||||
* [[akka.routing.RoundRobinRouter]] or custom routers.
|
||||
*/
|
||||
case class ClusterRouterConfig(local: RouterConfig, settings: ClusterRouterSettings) extends RouterConfig {
|
||||
@SerialVersionUID(1L)
|
||||
final case class ClusterRouterConfig(local: RouterConfig, settings: ClusterRouterSettings) extends RouterConfig {
|
||||
|
||||
override def createRouteeProvider(context: ActorContext, routeeProps: Props) =
|
||||
new ClusterRouteeProvider(context, routeeProps, resizer, settings)
|
||||
|
|
@ -97,6 +98,12 @@ object ClusterRouterSettings {
|
|||
apply(totalInstances, routeesPath, routeesOnOwnNode = true)
|
||||
}
|
||||
|
||||
/**
|
||||
* `totalInstances` of cluster router must be > 0
|
||||
* `maxInstancesPerNode` of cluster router must be > 0
|
||||
* `maxInstancesPerNode` of cluster router must be 1 when routeesPath is defined
|
||||
*/
|
||||
@SerialVersionUID(1L)
|
||||
case class ClusterRouterSettings private[akka] (
|
||||
totalInstances: Int,
|
||||
maxInstancesPerNode: Int,
|
||||
|
|
@ -128,7 +135,7 @@ case class ClusterRouterSettings private[akka] (
|
|||
throw new IllegalArgumentException("routeesPath [%s] is not a valid relative actor path" format routeesPath)
|
||||
}
|
||||
|
||||
def isRouteesPathDefined: Boolean = ((routeesPath ne null) && routeesPath != "")
|
||||
def isRouteesPathDefined: Boolean = (routeesPath ne null) && routeesPath != ""
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -161,8 +168,6 @@ private[akka] class ClusterRouteeProvider(
|
|||
* to use for cluster routers.
|
||||
*/
|
||||
override def createRoutees(nrOfInstances: Int): Unit = {
|
||||
val impl = context.system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559
|
||||
|
||||
for (i ← 1 to settings.totalInstances; target ← selectDeploymentTarget) {
|
||||
val ref =
|
||||
if (settings.isRouteesPathDefined) {
|
||||
|
|
@ -181,12 +186,11 @@ private[akka] class ClusterRouteeProvider(
|
|||
|
||||
private def selectDeploymentTarget: Option[Address] = {
|
||||
val currentRoutees = routees
|
||||
val currentNodes = availbleNodes
|
||||
if (currentRoutees.size >= settings.totalInstances) {
|
||||
None
|
||||
} else {
|
||||
val numberOfRouteesPerNode: Map[Address, Int] =
|
||||
Map.empty[Address, Int] ++ currentNodes.toSeq.map(_ -> 0) ++
|
||||
Map.empty[Address, Int] ++ availbleNodes.toSeq.map(_ -> 0) ++
|
||||
currentRoutees.groupBy(fullAddress).map {
|
||||
case (address, refs) ⇒ address -> refs.size
|
||||
}
|
||||
|
|
@ -244,8 +248,8 @@ private[akka] class ClusterRouterActor extends Router {
|
|||
// lazy to not interfere with RoutedActorCell initialization
|
||||
lazy val routeeProvider: ClusterRouteeProvider = ref.routeeProvider match {
|
||||
case x: ClusterRouteeProvider ⇒ x
|
||||
case _ ⇒
|
||||
throw new IllegalStateException("ClusterRouteeProvider must be used together with [%s]".format(this.getClass))
|
||||
case _ ⇒ throw new IllegalStateException(
|
||||
"ClusterRouteeProvider must be used together with [%s]".format(getClass))
|
||||
}
|
||||
|
||||
def cluster: Cluster = routeeProvider.cluster
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue