2012-08-28 08:36:14 +02:00
|
|
|
/**
|
2013-01-09 01:47:48 +01:00
|
|
|
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
2012-08-28 08:36:14 +02:00
|
|
|
*/
|
|
|
|
|
package akka.cluster.routing
|
|
|
|
|
|
2012-08-30 13:52:47 +02:00
|
|
|
import java.lang.IllegalStateException
|
2012-08-28 08:36:14 +02:00
|
|
|
import java.util.concurrent.atomic.AtomicInteger
|
2012-11-07 16:35:14 +01:00
|
|
|
import scala.collection.immutable
|
2012-08-28 08:36:14 +02:00
|
|
|
import com.typesafe.config.ConfigFactory
|
|
|
|
|
import akka.ConfigurationException
|
2012-09-07 10:02:05 +02:00
|
|
|
import akka.actor.ActorContext
|
2012-08-28 08:36:14 +02:00
|
|
|
import akka.actor.ActorRef
|
|
|
|
|
import akka.actor.Address
|
|
|
|
|
import akka.actor.Deploy
|
|
|
|
|
import akka.actor.Props
|
|
|
|
|
import akka.actor.SupervisorStrategy
|
|
|
|
|
import akka.cluster.Cluster
|
|
|
|
|
import akka.cluster.ClusterEvent._
|
|
|
|
|
import akka.cluster.Member
|
|
|
|
|
import akka.cluster.MemberStatus
|
|
|
|
|
import akka.remote.RemoteScope
|
2012-08-30 13:52:47 +02:00
|
|
|
import akka.routing.Destination
|
2012-08-28 08:36:14 +02:00
|
|
|
import akka.routing.Resizer
|
|
|
|
|
import akka.routing.Route
|
|
|
|
|
import akka.routing.RouteeProvider
|
|
|
|
|
import akka.routing.Router
|
|
|
|
|
import akka.routing.RouterConfig
|
2012-09-17 12:54:08 +02:00
|
|
|
import akka.remote.routing.RemoteRouterConfig
|
2012-09-07 14:54:53 +02:00
|
|
|
import akka.actor.RootActorPath
|
2012-09-10 13:13:59 +02:00
|
|
|
import akka.actor.ActorCell
|
2012-09-10 14:41:51 +02:00
|
|
|
import akka.actor.RelativeActorPath
|
2012-09-17 08:42:47 +02:00
|
|
|
import scala.annotation.tailrec
|
2012-08-28 08:36:14 +02:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* [[akka.routing.RouterConfig]] implementation for deployment on cluster nodes.
|
|
|
|
|
* Delegates other duties to the local [[akka.routing.RouterConfig]],
|
|
|
|
|
* which makes it possible to mix this with the built-in routers such as
|
|
|
|
|
* [[akka.routing.RoundRobinRouter]] or custom routers.
|
|
|
|
|
*/
|
2012-09-11 19:11:20 +02:00
|
|
|
@SerialVersionUID(1L)
|
|
|
|
|
final case class ClusterRouterConfig(local: RouterConfig, settings: ClusterRouterSettings) extends RouterConfig {
|
2012-08-28 08:36:14 +02:00
|
|
|
|
2012-09-07 10:02:05 +02:00
|
|
|
override def createRouteeProvider(context: ActorContext, routeeProps: Props) =
|
2012-09-07 12:07:41 +02:00
|
|
|
new ClusterRouteeProvider(context, routeeProps, resizer, settings)
|
2012-08-28 08:36:14 +02:00
|
|
|
|
2012-08-30 12:13:50 +02:00
|
|
|
override def createRoute(routeeProvider: RouteeProvider): Route = {
|
|
|
|
|
val localRoute = local.createRoute(routeeProvider)
|
|
|
|
|
|
|
|
|
|
// Intercept ClusterDomainEvent and route them to the ClusterRouterActor
|
|
|
|
|
({
|
2012-11-07 16:35:14 +01:00
|
|
|
case (sender, message: ClusterDomainEvent) ⇒ List(Destination(sender, routeeProvider.context.self))
|
2012-08-30 12:13:50 +02:00
|
|
|
}: Route) orElse localRoute
|
|
|
|
|
}
|
2012-08-28 08:36:14 +02:00
|
|
|
|
2012-08-30 12:13:50 +02:00
|
|
|
override def createActor(): Router = new ClusterRouterActor
|
2012-08-28 08:36:14 +02:00
|
|
|
|
|
|
|
|
override def supervisorStrategy: SupervisorStrategy = local.supervisorStrategy
|
|
|
|
|
|
|
|
|
|
override def routerDispatcher: String = local.routerDispatcher
|
|
|
|
|
|
|
|
|
|
override def resizer: Option[Resizer] = local.resizer
|
|
|
|
|
|
|
|
|
|
override def withFallback(other: RouterConfig): RouterConfig = other match {
|
2012-09-07 12:07:41 +02:00
|
|
|
case ClusterRouterConfig(_: RemoteRouterConfig, _) ⇒ throw new IllegalStateException(
|
|
|
|
|
"ClusterRouterConfig is not allowed to wrap a RemoteRouterConfig")
|
|
|
|
|
case ClusterRouterConfig(_: ClusterRouterConfig, _) ⇒ throw new IllegalStateException(
|
|
|
|
|
"ClusterRouterConfig is not allowed to wrap a ClusterRouterConfig")
|
|
|
|
|
case ClusterRouterConfig(local, _) ⇒ copy(local = this.local.withFallback(local))
|
|
|
|
|
case _ ⇒ copy(local = this.local.withFallback(other))
|
2012-08-28 08:36:14 +02:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2012-09-10 14:11:03 +02:00
|
|
|
object ClusterRouterSettings {
|
|
|
|
|
/**
|
|
|
|
|
* Settings for create and deploy of the routees
|
|
|
|
|
*/
|
2013-03-14 20:32:43 +01:00
|
|
|
def apply(totalInstances: Int, maxInstancesPerNode: Int, allowLocalRoutees: Boolean, useRole: Option[String]): ClusterRouterSettings =
|
|
|
|
|
new ClusterRouterSettings(totalInstances, maxInstancesPerNode, routeesPath = "", allowLocalRoutees, useRole)
|
2012-09-10 14:11:03 +02:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Settings for remote deployment of the routees, allowed to use routees on own node
|
|
|
|
|
*/
|
2013-03-14 20:32:43 +01:00
|
|
|
def apply(totalInstances: Int, maxInstancesPerNode: Int, useRole: Option[String]): ClusterRouterSettings =
|
|
|
|
|
apply(totalInstances, maxInstancesPerNode, allowLocalRoutees = true, useRole)
|
2012-09-10 14:11:03 +02:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Settings for lookup of the routees
|
|
|
|
|
*/
|
2013-03-14 20:32:43 +01:00
|
|
|
def apply(totalInstances: Int, routeesPath: String, allowLocalRoutees: Boolean, useRole: Option[String]): ClusterRouterSettings =
|
|
|
|
|
new ClusterRouterSettings(totalInstances, maxInstancesPerNode = 1, routeesPath, allowLocalRoutees, useRole)
|
2012-09-10 14:11:03 +02:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Settings for lookup of the routees, allowed to use routees on own node
|
|
|
|
|
*/
|
2013-03-14 20:32:43 +01:00
|
|
|
def apply(totalInstances: Int, routeesPath: String, useRole: Option[String]): ClusterRouterSettings =
|
|
|
|
|
apply(totalInstances, routeesPath, allowLocalRoutees = true, useRole)
|
|
|
|
|
|
|
|
|
|
def useRoleOption(role: String): Option[String] = role match {
|
|
|
|
|
case null | "" ⇒ None
|
|
|
|
|
case _ ⇒ Some(role)
|
|
|
|
|
}
|
2012-09-10 14:11:03 +02:00
|
|
|
}
|
|
|
|
|
|
2012-09-11 19:11:20 +02:00
|
|
|
/**
|
|
|
|
|
* `totalInstances` of cluster router must be > 0
|
|
|
|
|
* `maxInstancesPerNode` of cluster router must be > 0
|
|
|
|
|
* `maxInstancesPerNode` of cluster router must be 1 when routeesPath is defined
|
|
|
|
|
*/
|
|
|
|
|
@SerialVersionUID(1L)
|
2012-09-10 14:11:03 +02:00
|
|
|
case class ClusterRouterSettings private[akka] (
|
2012-09-07 14:54:53 +02:00
|
|
|
totalInstances: Int,
|
2012-09-10 14:11:03 +02:00
|
|
|
maxInstancesPerNode: Int,
|
|
|
|
|
routeesPath: String,
|
2013-03-14 20:32:43 +01:00
|
|
|
allowLocalRoutees: Boolean,
|
|
|
|
|
useRole: Option[String]) {
|
2012-09-10 14:11:03 +02:00
|
|
|
|
|
|
|
|
/**
|
2013-03-07 09:05:55 +01:00
|
|
|
* Java API: Settings for create and deploy of the routees
|
2012-09-10 14:11:03 +02:00
|
|
|
*/
|
2013-03-14 20:32:43 +01:00
|
|
|
def this(totalInstances: Int, maxInstancesPerNode: Int, allowLocalRoutees: Boolean, useRole: String) =
|
|
|
|
|
this(totalInstances, maxInstancesPerNode, routeesPath = "", allowLocalRoutees,
|
|
|
|
|
ClusterRouterSettings.useRoleOption(useRole))
|
2012-09-10 14:11:03 +02:00
|
|
|
|
|
|
|
|
/**
|
2013-03-07 09:05:55 +01:00
|
|
|
* Java API: Settings for lookup of the routees
|
2012-09-10 14:11:03 +02:00
|
|
|
*/
|
2013-03-14 20:32:43 +01:00
|
|
|
def this(totalInstances: Int, routeesPath: String, allowLocalRoutees: Boolean, useRole: String) =
|
|
|
|
|
this(totalInstances, maxInstancesPerNode = 1, routeesPath, allowLocalRoutees,
|
|
|
|
|
ClusterRouterSettings.useRoleOption(useRole))
|
2012-09-08 17:30:42 +02:00
|
|
|
|
|
|
|
|
if (totalInstances <= 0) throw new IllegalArgumentException("totalInstances of cluster router must be > 0")
|
|
|
|
|
if (maxInstancesPerNode <= 0) throw new IllegalArgumentException("maxInstancesPerNode of cluster router must be > 0")
|
|
|
|
|
if (isRouteesPathDefined && maxInstancesPerNode != 1)
|
|
|
|
|
throw new IllegalArgumentException("maxInstancesPerNode of cluster router must be 1 when routeesPath is defined")
|
2012-09-07 14:54:53 +02:00
|
|
|
|
2012-11-15 12:33:11 +01:00
|
|
|
val routeesPathElements: immutable.Iterable[String] = routeesPath match {
|
2012-09-10 14:41:51 +02:00
|
|
|
case RelativeActorPath(elements) ⇒ elements
|
|
|
|
|
case _ ⇒
|
|
|
|
|
throw new IllegalArgumentException("routeesPath [%s] is not a valid relative actor path" format routeesPath)
|
|
|
|
|
}
|
|
|
|
|
|
2012-09-11 19:11:20 +02:00
|
|
|
def isRouteesPathDefined: Boolean = (routeesPath ne null) && routeesPath != ""
|
2012-09-10 14:41:51 +02:00
|
|
|
|
2012-09-07 14:54:53 +02:00
|
|
|
}
|
2012-09-07 12:07:41 +02:00
|
|
|
|
2012-08-28 08:36:14 +02:00
|
|
|
/**
|
2012-08-30 12:13:50 +02:00
|
|
|
* INTERNAL API
|
|
|
|
|
*
|
2012-08-28 08:36:14 +02:00
|
|
|
* Factory and registry for routees of the router.
|
|
|
|
|
* Deploys new routees on the cluster nodes.
|
|
|
|
|
*/
|
2012-08-30 12:13:50 +02:00
|
|
|
private[akka] class ClusterRouteeProvider(
|
2012-09-07 10:02:05 +02:00
|
|
|
_context: ActorContext,
|
2012-08-29 19:33:19 +02:00
|
|
|
_routeeProps: Props,
|
|
|
|
|
_resizer: Option[Resizer],
|
2012-09-07 12:07:41 +02:00
|
|
|
settings: ClusterRouterSettings)
|
2012-08-29 19:33:19 +02:00
|
|
|
extends RouteeProvider(_context, _routeeProps, _resizer) {
|
2012-08-28 08:36:14 +02:00
|
|
|
|
|
|
|
|
// need this counter as instance variable since Resizer may call createRoutees several times
|
|
|
|
|
private val childNameCounter = new AtomicInteger
|
|
|
|
|
|
2012-11-07 16:35:14 +01:00
|
|
|
override def registerRouteesFor(paths: immutable.Iterable[String]): Unit =
|
2012-08-28 20:54:16 +02:00
|
|
|
throw new ConfigurationException("Cluster deployment can not be combined with routees for [%s]"
|
|
|
|
|
format context.self.path.toString)
|
|
|
|
|
|
2012-08-30 10:06:12 +02:00
|
|
|
/**
|
|
|
|
|
* Note that nrOfInstances is ignored for cluster routers, instead
|
|
|
|
|
* the `totalInstances` parameter is used. That is the same when
|
|
|
|
|
* using config to define `nr-of-instances`, but when defining the
|
|
|
|
|
* router programatically or using [[akka.routing.Resizer]] they
|
|
|
|
|
* might be different. `totalInstances` is the relevant parameter
|
|
|
|
|
* to use for cluster routers.
|
|
|
|
|
*/
|
2012-08-29 19:33:19 +02:00
|
|
|
override def createRoutees(nrOfInstances: Int): Unit = {
|
2012-09-17 08:42:47 +02:00
|
|
|
@tailrec
|
|
|
|
|
def doCreateRoutees(): Unit = selectDeploymentTarget match {
|
|
|
|
|
case None ⇒ // done
|
|
|
|
|
case Some(target) ⇒
|
|
|
|
|
val ref =
|
|
|
|
|
if (settings.isRouteesPathDefined) {
|
|
|
|
|
context.actorFor(RootActorPath(target) / settings.routeesPathElements)
|
|
|
|
|
} else {
|
|
|
|
|
val name = "c" + childNameCounter.incrementAndGet
|
2012-09-20 08:50:12 +02:00
|
|
|
val deploy = Deploy(config = ConfigFactory.empty(), routerConfig = routeeProps.routerConfig,
|
|
|
|
|
scope = RemoteScope(target))
|
2012-09-17 08:42:47 +02:00
|
|
|
context.asInstanceOf[ActorCell].attachChild(routeeProps.withDeploy(deploy), name, systemService = false)
|
|
|
|
|
}
|
|
|
|
|
// must register each one, since registered routees are used in selectDeploymentTarget
|
2012-11-07 16:35:14 +01:00
|
|
|
registerRoutees(List(ref))
|
2012-09-17 08:42:47 +02:00
|
|
|
|
|
|
|
|
// recursion until all created
|
|
|
|
|
doCreateRoutees()
|
2012-08-29 19:33:19 +02:00
|
|
|
}
|
2012-09-17 08:42:47 +02:00
|
|
|
|
|
|
|
|
doCreateRoutees()
|
2012-08-29 19:33:19 +02:00
|
|
|
}
|
|
|
|
|
|
2012-09-07 12:07:41 +02:00
|
|
|
private[routing] def createRoutees(): Unit = createRoutees(settings.totalInstances)
|
2012-08-30 12:13:50 +02:00
|
|
|
|
2012-08-29 19:33:19 +02:00
|
|
|
private def selectDeploymentTarget: Option[Address] = {
|
|
|
|
|
val currentRoutees = routees
|
2012-10-10 09:58:18 -06:00
|
|
|
val currentNodes = availableNodes
|
2012-09-17 08:42:47 +02:00
|
|
|
if (currentNodes.isEmpty || currentRoutees.size >= settings.totalInstances) {
|
2012-08-29 19:33:19 +02:00
|
|
|
None
|
2012-08-28 08:36:14 +02:00
|
|
|
} else {
|
2012-09-17 08:42:47 +02:00
|
|
|
// find the node with least routees
|
2012-08-29 19:33:19 +02:00
|
|
|
val numberOfRouteesPerNode: Map[Address, Int] =
|
2012-10-30 09:28:45 +01:00
|
|
|
currentRoutees.foldLeft(currentNodes.map(_ -> 0).toMap.withDefaultValue(0)) { (acc, x) ⇒
|
2012-09-17 08:42:47 +02:00
|
|
|
val address = fullAddress(x)
|
2012-09-20 09:24:06 +02:00
|
|
|
acc + (address -> (acc(address) + 1))
|
2012-09-17 08:42:47 +02:00
|
|
|
}
|
2012-08-29 19:33:19 +02:00
|
|
|
|
|
|
|
|
val (address, count) = numberOfRouteesPerNode.minBy(_._2)
|
2012-09-07 12:07:41 +02:00
|
|
|
if (count < settings.maxInstancesPerNode) Some(address) else None
|
2012-08-28 08:36:14 +02:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2012-08-30 12:13:50 +02:00
|
|
|
private[routing] def cluster: Cluster = Cluster(context.system)
|
|
|
|
|
|
2012-08-29 19:33:19 +02:00
|
|
|
/**
|
|
|
|
|
* Fills in self address for local ActorRef
|
|
|
|
|
*/
|
2012-08-30 12:13:50 +02:00
|
|
|
private[routing] def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match {
|
2012-08-29 19:33:19 +02:00
|
|
|
case Address(_, _, None, None) ⇒ cluster.selfAddress
|
|
|
|
|
case a ⇒ a
|
|
|
|
|
}
|
|
|
|
|
|
2012-11-07 16:35:14 +01:00
|
|
|
private[routing] def availableNodes: immutable.SortedSet[Address] = {
|
2012-09-10 14:41:51 +02:00
|
|
|
import Member.addressOrdering
|
|
|
|
|
val currentNodes = nodes
|
2013-03-14 20:32:43 +01:00
|
|
|
if (currentNodes.isEmpty && settings.allowLocalRoutees && satisfiesRole(cluster.selfRoles))
|
2012-09-10 14:41:51 +02:00
|
|
|
//use my own node, cluster information not updated yet
|
2012-11-07 16:35:14 +01:00
|
|
|
immutable.SortedSet(cluster.selfAddress)
|
2012-09-10 14:41:51 +02:00
|
|
|
else
|
|
|
|
|
currentNodes
|
|
|
|
|
}
|
|
|
|
|
|
2012-08-28 08:36:14 +02:00
|
|
|
@volatile
|
2012-11-07 16:35:14 +01:00
|
|
|
private[routing] var nodes: immutable.SortedSet[Address] = {
|
2012-09-10 14:41:51 +02:00
|
|
|
import Member.addressOrdering
|
|
|
|
|
cluster.readView.members.collect {
|
2012-10-10 10:05:41 -06:00
|
|
|
case m if isAvailable(m) ⇒ m.address
|
2012-09-10 14:41:51 +02:00
|
|
|
}
|
2012-09-07 12:07:41 +02:00
|
|
|
}
|
|
|
|
|
|
2012-11-07 16:35:14 +01:00
|
|
|
private[routing] def isAvailable(m: Member): Boolean =
|
2013-03-14 20:32:43 +01:00
|
|
|
m.status == MemberStatus.Up &&
|
|
|
|
|
satisfiesRole(m.roles) &&
|
|
|
|
|
(settings.allowLocalRoutees || m.address != cluster.selfAddress)
|
|
|
|
|
|
|
|
|
|
private def satisfiesRole(memberRoles: Set[String]): Boolean = settings.useRole match {
|
|
|
|
|
case None ⇒ true
|
|
|
|
|
case Some(r) ⇒ memberRoles.contains(r)
|
|
|
|
|
}
|
2012-08-28 08:36:14 +02:00
|
|
|
|
2012-08-30 12:13:50 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
* The router actor, subscribes to cluster events.
|
|
|
|
|
*/
|
|
|
|
|
private[akka] class ClusterRouterActor extends Router {
|
|
|
|
|
|
2012-09-10 14:41:51 +02:00
|
|
|
// re-subscribe when restart
|
2012-11-27 18:07:37 +01:00
|
|
|
override def preStart(): Unit = {
|
|
|
|
|
cluster.subscribe(self, classOf[MemberEvent])
|
|
|
|
|
cluster.subscribe(self, classOf[UnreachableMember])
|
|
|
|
|
}
|
2012-08-30 12:13:50 +02:00
|
|
|
override def postStop(): Unit = cluster.unsubscribe(self)
|
2012-08-28 08:36:14 +02:00
|
|
|
|
2012-08-30 12:13:50 +02:00
|
|
|
// lazy to not interfere with RoutedActorCell initialization
|
|
|
|
|
lazy val routeeProvider: ClusterRouteeProvider = ref.routeeProvider match {
|
|
|
|
|
case x: ClusterRouteeProvider ⇒ x
|
2012-09-11 19:11:20 +02:00
|
|
|
case _ ⇒ throw new IllegalStateException(
|
|
|
|
|
"ClusterRouteeProvider must be used together with [%s]".format(getClass))
|
2012-08-30 12:13:50 +02:00
|
|
|
}
|
2012-08-28 08:36:14 +02:00
|
|
|
|
2012-08-30 12:13:50 +02:00
|
|
|
def cluster: Cluster = routeeProvider.cluster
|
2012-08-28 08:36:14 +02:00
|
|
|
|
2012-08-30 12:13:50 +02:00
|
|
|
def fullAddress(actorRef: ActorRef): Address = routeeProvider.fullAddress(actorRef)
|
2012-08-30 10:54:58 +02:00
|
|
|
|
2012-11-27 18:07:37 +01:00
|
|
|
def unregisterRoutees(member: Member) = {
|
|
|
|
|
val address = member.address
|
|
|
|
|
routeeProvider.nodes -= address
|
|
|
|
|
|
|
|
|
|
// unregister routees that live on that node
|
|
|
|
|
val affectedRoutes = routeeProvider.routees.filter(fullAddress(_) == address)
|
|
|
|
|
routeeProvider.unregisterRoutees(affectedRoutes)
|
|
|
|
|
|
|
|
|
|
// createRoutees will not create more than createRoutees and maxInstancesPerNode
|
|
|
|
|
// this is useful when totalInstances < upNodes.size
|
|
|
|
|
routeeProvider.createRoutees()
|
|
|
|
|
}
|
|
|
|
|
|
2012-08-30 12:13:50 +02:00
|
|
|
override def routerReceive: Receive = {
|
|
|
|
|
case s: CurrentClusterState ⇒
|
|
|
|
|
import Member.addressOrdering
|
2012-10-10 10:05:41 -06:00
|
|
|
routeeProvider.nodes = s.members.collect { case m if routeeProvider.isAvailable(m) ⇒ m.address }
|
2012-08-31 10:44:07 +02:00
|
|
|
routeeProvider.createRoutees()
|
2012-08-28 08:36:14 +02:00
|
|
|
|
2012-10-10 10:05:41 -06:00
|
|
|
case m: MemberEvent if routeeProvider.isAvailable(m.member) ⇒
|
2012-09-10 14:41:51 +02:00
|
|
|
routeeProvider.nodes += m.member.address
|
2012-08-30 12:13:50 +02:00
|
|
|
// createRoutees will create routees based on
|
|
|
|
|
// totalInstances and maxInstancesPerNode
|
|
|
|
|
routeeProvider.createRoutees()
|
2012-08-28 08:36:14 +02:00
|
|
|
|
2012-08-30 12:13:50 +02:00
|
|
|
case other: MemberEvent ⇒
|
|
|
|
|
// other events means that it is no longer interesting, such as
|
2012-11-27 18:07:37 +01:00
|
|
|
// MemberJoined, MemberLeft, MemberExited, MemberRemoved
|
|
|
|
|
unregisterRoutees(other.member)
|
2012-08-30 12:13:50 +02:00
|
|
|
|
2012-11-27 18:07:37 +01:00
|
|
|
case UnreachableMember(m) ⇒
|
|
|
|
|
unregisterRoutees(m)
|
2012-08-30 12:13:50 +02:00
|
|
|
}
|
2012-08-28 08:36:14 +02:00
|
|
|
}
|