diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala index e2d8b8b204..c99ae68704 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala @@ -32,6 +32,7 @@ import akka.routing.RemoteRouterConfig import akka.actor.RootActorPath import akka.actor.ActorCell import akka.actor.RelativeActorPath +import scala.annotation.tailrec /** * [[akka.routing.RouterConfig]] implementation for deployment on cluster nodes. @@ -168,18 +169,26 @@ private[akka] class ClusterRouteeProvider( * to use for cluster routers. */ override def createRoutees(nrOfInstances: Int): Unit = { - for (i ← 1 to settings.totalInstances; target ← selectDeploymentTarget) { - val ref = - if (settings.isRouteesPathDefined) { - context.actorFor(RootActorPath(target) / settings.routeesPathElements) - } else { - val name = "c" + childNameCounter.incrementAndGet - val deploy = Deploy("", ConfigFactory.empty(), routeeProps.routerConfig, RemoteScope(target)) - context.asInstanceOf[ActorCell].attachChild(routeeProps.withDeploy(deploy), name, systemService = false) - } - // must register each one, since registered routees are used in selectDeploymentTarget - registerRoutees(Some(ref)) + @tailrec + def doCreateRoutees(): Unit = selectDeploymentTarget match { + case None ⇒ // done + case Some(target) ⇒ + val ref = + if (settings.isRouteesPathDefined) { + context.actorFor(RootActorPath(target) / settings.routeesPathElements) + } else { + val name = "c" + childNameCounter.incrementAndGet + val deploy = Deploy("", ConfigFactory.empty(), routeeProps.routerConfig, RemoteScope(target)) + context.asInstanceOf[ActorCell].attachChild(routeeProps.withDeploy(deploy), name, systemService = false) + } + // must register each one, since registered routees are used in selectDeploymentTarget + registerRoutees(Some(ref)) + + // recursion until all created + doCreateRoutees() } + + doCreateRoutees() } private[routing] def createRoutees(): Unit = createRoutees(settings.totalInstances) @@ -187,14 +196,18 @@ private[akka] class ClusterRouteeProvider( private def selectDeploymentTarget: Option[Address] = { val currentRoutees = routees val currentNodes = availbleNodes - if (currentRoutees.size >= settings.totalInstances || currentNodes.isEmpty) { + if (currentNodes.isEmpty || currentRoutees.size >= settings.totalInstances) { None } else { + // find the node with least routees val numberOfRouteesPerNode: Map[Address, Int] = - Map.empty[Address, Int] ++ currentNodes.toSeq.map(_ -> 0) ++ - currentRoutees.groupBy(fullAddress).map { - case (address, refs) ⇒ address -> refs.size + currentRoutees.foldLeft(currentNodes.map(_ -> 0).toMap) { (acc, x) ⇒ + val address = fullAddress(x) + acc.get(address) match { + case Some(count) ⇒ acc + (address -> (count + 1)) + case None ⇒ acc + (address -> 1) } + } val (address, count) = numberOfRouteesPerNode.minBy(_._2) if (count < settings.maxInstancesPerNode) Some(address) else None