!act,rem,clu #3549 Simplify and enhance routers

* Separate routing logic, to be usable stand alone, e.g. in actors
* Simplify RouterConfig, only a factory
* Move reading of config from Deployer to the RouterConfig
* Distiction between Pool and Group router types
* Remove usage of actorFor, use ActorSelection
* Management messages to add and remove routees
* Simplify the internals of RoutedActorCell & co
* Move resize specific code to separate RoutedActorCell subclass
* Change resizer api to only return capacity change
* Resizer only allowed together with Pool
* Re-implement all routers, and keep old api during deprecation phase
* Replace ClusterRouterConfig, deprecation
* Rewrite documentation
* Migration guide
* Also includes related ticket:
  +act #3087 Create nicer Props factories for RouterConfig
This commit is contained in:
Patrik Nordwall 2013-09-19 08:00:05 +02:00
parent 81ca6fe8c8
commit ebadd567b2
104 changed files with 9671 additions and 5006 deletions

View file

@ -3,140 +3,74 @@
*/
package akka.cluster.routing
import java.lang.IllegalStateException
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.immutable
import com.typesafe.config.ConfigFactory
import akka.ConfigurationException
import akka.actor.ActorContext
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.Deploy
import akka.routing.RouterConfig
import akka.routing.Router
import akka.actor.Props
import akka.actor.ActorContext
import akka.routing.Routee
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.Address
import akka.actor.ActorCell
import akka.actor.Deploy
import com.typesafe.config.ConfigFactory
import akka.routing.ActorRefRoutee
import akka.remote.RemoteScope
import akka.actor.Actor
import akka.actor.SupervisorStrategy
import akka.routing.Resizer
import akka.routing.RouterConfig
import akka.routing.Pool
import akka.routing.Group
import akka.remote.routing.RemoteRouterConfig
import akka.routing.RouterActor
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.actor.ActorRef
import akka.cluster.Member
import akka.cluster.MemberStatus
import akka.remote.RemoteScope
import akka.routing.Destination
import akka.routing.Resizer
import akka.routing.Route
import akka.routing.RouteeProvider
import akka.routing.Router
import akka.routing.RouterConfig
import akka.remote.routing.RemoteRouterConfig
import akka.actor.RootActorPath
import akka.actor.ActorCell
import akka.actor.RelativeActorPath
import scala.annotation.tailrec
import akka.actor.RootActorPath
import akka.cluster.MemberStatus
import akka.routing.ActorSelectionRoutee
import akka.actor.ActorInitializationException
import akka.routing.RouterPoolActor
import akka.actor.ActorSystem
import akka.actor.ActorSystem
import akka.routing.RoutingLogic
import akka.actor.RelativeActorPath
import com.typesafe.config.Config
import akka.routing.DeprecatedRouterConfig
/**
* [[akka.routing.RouterConfig]] implementation for deployment on cluster nodes.
* Delegates other duties to the local [[akka.routing.RouterConfig]],
* which makes it possible to mix this with the built-in routers such as
* [[akka.routing.RoundRobinRouter]] or custom routers.
*/
@SerialVersionUID(1L)
final case class ClusterRouterConfig(local: RouterConfig, settings: ClusterRouterSettings) extends RouterConfig {
override def createRouteeProvider(context: ActorContext, routeeProps: Props) =
new ClusterRouteeProvider(context, routeeProps, resizer, settings)
override def createRoute(routeeProvider: RouteeProvider): Route = {
val localRoute = local.createRoute(routeeProvider)
// Intercept ClusterDomainEvent and route them to the ClusterRouterActor
({
case (sender, message: ClusterDomainEvent) List(Destination(sender, routeeProvider.context.self))
}: Route) orElse localRoute
}
override def createActor(): Router = new ClusterRouterActor(local.supervisorStrategy)
override def supervisorStrategy: SupervisorStrategy = local.supervisorStrategy
override def routerDispatcher: String = local.routerDispatcher
override def resizer: Option[Resizer] = local.resizer
override def stopRouterWhenAllRouteesRemoved: Boolean = false
override def withFallback(other: RouterConfig): RouterConfig = other match {
case ClusterRouterConfig(_: RemoteRouterConfig, _) throw new IllegalStateException(
"ClusterRouterConfig is not allowed to wrap a RemoteRouterConfig")
case ClusterRouterConfig(_: ClusterRouterConfig, _) throw new IllegalStateException(
"ClusterRouterConfig is not allowed to wrap a ClusterRouterConfig")
case ClusterRouterConfig(local, _) copy(local = this.local.withFallback(local))
case _ copy(local = this.local.withFallback(other))
}
}
object ClusterRouterSettings {
/**
* Settings for create and deploy of the routees
*/
def apply(totalInstances: Int, maxInstancesPerNode: Int, allowLocalRoutees: Boolean, useRole: Option[String]): ClusterRouterSettings =
new ClusterRouterSettings(totalInstances, maxInstancesPerNode, routeesPath = "", allowLocalRoutees, useRole)
/**
* Settings for remote deployment of the routees, allowed to use routees on own node
*/
def apply(totalInstances: Int, maxInstancesPerNode: Int, useRole: Option[String]): ClusterRouterSettings =
apply(totalInstances, maxInstancesPerNode, allowLocalRoutees = true, useRole)
/**
* Settings for lookup of the routees
*/
def apply(totalInstances: Int, routeesPath: String, allowLocalRoutees: Boolean, useRole: Option[String]): ClusterRouterSettings =
new ClusterRouterSettings(totalInstances, maxInstancesPerNode = 1, routeesPath, allowLocalRoutees, useRole)
/**
* Settings for lookup of the routees, allowed to use routees on own node
*/
def apply(totalInstances: Int, routeesPath: String, useRole: Option[String]): ClusterRouterSettings =
apply(totalInstances, routeesPath, allowLocalRoutees = true, useRole)
def useRoleOption(role: String): Option[String] = role match {
case null | "" None
case _ Some(role)
}
object ClusterRouterGroupSettings {
def fromConfig(config: Config): ClusterRouterGroupSettings =
ClusterRouterGroupSettings(
totalInstances = config.getInt("nr-of-instances"),
routeesPath = config.getString("cluster.routees-path"),
allowLocalRoutees = config.getBoolean("cluster.allow-local-routees"),
useRole = ClusterRouterSettingsBase.useRoleOption(config.getString("cluster.use-role")))
}
/**
* `totalInstances` of cluster router must be > 0
* `maxInstancesPerNode` of cluster router must be > 0
* `maxInstancesPerNode` of cluster router must be 1 when routeesPath is defined
*/
@SerialVersionUID(1L)
case class ClusterRouterSettings private[akka] (
case class ClusterRouterGroupSettings(
totalInstances: Int,
maxInstancesPerNode: Int,
routeesPath: String,
allowLocalRoutees: Boolean,
useRole: Option[String]) {
useRole: Option[String]) extends ClusterRouterSettingsBase {
/**
* Java API: Settings for create and deploy of the routees
*/
def this(totalInstances: Int, maxInstancesPerNode: Int, allowLocalRoutees: Boolean, useRole: String) =
this(totalInstances, maxInstancesPerNode, routeesPath = "", allowLocalRoutees,
ClusterRouterSettings.useRoleOption(useRole))
/**
* Java API: Settings for lookup of the routees
* Java API
*/
def this(totalInstances: Int, routeesPath: String, allowLocalRoutees: Boolean, useRole: String) =
this(totalInstances, maxInstancesPerNode = 1, routeesPath, allowLocalRoutees,
ClusterRouterSettings.useRoleOption(useRole))
this(totalInstances, routeesPath, allowLocalRoutees, ClusterRouterSettingsBase.useRoleOption(useRole))
if (totalInstances <= 0) throw new IllegalArgumentException("totalInstances of cluster router must be > 0")
if (maxInstancesPerNode <= 0) throw new IllegalArgumentException("maxInstancesPerNode of cluster router must be > 0")
if (isRouteesPathDefined && maxInstancesPerNode != 1)
throw new IllegalArgumentException("maxInstancesPerNode of cluster router must be 1 when routeesPath is defined")
if (!isRouteesPathDefined) throw new IllegalArgumentException("routeesPath must be defined")
val routeesPathElements: immutable.Iterable[String] = routeesPath match {
case RelativeActorPath(elements) elements
routeesPath match {
case RelativeActorPath(elements) // good
case _
throw new IllegalArgumentException("routeesPath [%s] is not a valid relative actor path" format routeesPath)
}
@ -145,70 +79,299 @@ case class ClusterRouterSettings private[akka] (
}
object ClusterRouterPoolSettings {
def fromConfig(config: Config): ClusterRouterPoolSettings =
ClusterRouterPoolSettings(
totalInstances = config.getInt("nr-of-instances"),
maxInstancesPerNode = config.getInt("cluster.max-nr-of-instances-per-node"),
allowLocalRoutees = config.getBoolean("cluster.allow-local-routees"),
useRole = ClusterRouterSettingsBase.useRoleOption(config.getString("cluster.use-role")))
}
/**
* INTERNAL API
*
* Factory and registry for routees of the router.
* Deploys new routees on the cluster nodes.
* `totalInstances` of cluster router must be > 0
* `maxInstancesPerNode` of cluster router must be > 0
* `maxInstancesPerNode` of cluster router must be 1 when routeesPath is defined
*/
private[akka] class ClusterRouteeProvider(
_context: ActorContext,
_routeeProps: Props,
_resizer: Option[Resizer],
settings: ClusterRouterSettings)
extends RouteeProvider(_context, _routeeProps, _resizer) {
// need this counter as instance variable since Resizer may call createRoutees several times
private val childNameCounter = new AtomicInteger
override def registerRouteesFor(paths: immutable.Iterable[String]): Unit =
throw new ConfigurationException("Cluster deployment can not be combined with routees for [%s]"
format context.self.path.toString)
@SerialVersionUID(1L)
case class ClusterRouterPoolSettings(
totalInstances: Int,
maxInstancesPerNode: Int,
allowLocalRoutees: Boolean,
useRole: Option[String]) extends ClusterRouterSettingsBase {
/**
* Note that nrOfInstances is ignored for cluster routers, instead
* the `totalInstances` parameter is used. That is the same when
* using config to define `nr-of-instances`, but when defining the
* router programatically or using [[akka.routing.Resizer]] they
* might be different. `totalInstances` is the relevant parameter
* to use for cluster routers.
* Java API
*/
override def createRoutees(nrOfInstances: Int): Unit = {
def this(totalInstances: Int, maxInstancesPerNode: Int, allowLocalRoutees: Boolean, useRole: String) =
this(totalInstances, maxInstancesPerNode, allowLocalRoutees, ClusterRouterSettingsBase.useRoleOption(useRole))
if (maxInstancesPerNode <= 0) throw new IllegalArgumentException("maxInstancesPerNode of cluster pool router must be > 0")
}
/**
* INTERNAL API
*/
private[akka] object ClusterRouterSettingsBase {
def useRoleOption(role: String): Option[String] = role match {
case null | "" None
case _ Some(role)
}
}
/**
* INTERNAL API
*/
private[akka] trait ClusterRouterSettingsBase {
def totalInstances: Int
def allowLocalRoutees: Boolean
def useRole: Option[String]
if (totalInstances <= 0) throw new IllegalArgumentException("totalInstances of cluster router must be > 0")
}
/**
* [[akka.routing.RouterConfig]] implementation for deployment on cluster nodes.
* Delegates other duties to the local [[akka.routing.RouterConfig]],
* which makes it possible to mix this with the built-in routers such as
* [[akka.routing.RoundRobinRouter]] or custom routers.
*/
@SerialVersionUID(1L)
final case class ClusterRouterGroup(local: Group, settings: ClusterRouterGroupSettings) extends Group with ClusterRouterConfigBase {
require(settings.routeesPath.nonEmpty, "routeesPath must be defined")
override def paths: immutable.Iterable[String] = if (settings.allowLocalRoutees) List(settings.routeesPath) else Nil
/**
* INTERNAL API
*/
override private[akka] def createRouterActor(): RouterActor = new ClusterRouterGroupActor(settings)
override def withFallback(other: RouterConfig): RouterConfig = other match {
case ClusterRouterGroup(_: ClusterRouterGroup, _) throw new IllegalStateException(
"ClusterRouterGroup is not allowed to wrap a ClusterRouterGroup")
case ClusterRouterGroup(local, _)
copy(local = this.local.withFallback(local).asInstanceOf[Group])
case ClusterRouterConfig(local, _)
copy(local = this.local.withFallback(local).asInstanceOf[Group])
case _
copy(local = this.local.withFallback(other).asInstanceOf[Group])
}
}
/**
* [[akka.routing.RouterConfig]] implementation for deployment on cluster nodes.
* Delegates other duties to the local [[akka.routing.RouterConfig]],
* which makes it possible to mix this with the built-in routers such as
* [[akka.routing.RoundRobinRouter]] or custom routers.
*/
@SerialVersionUID(1L)
final case class ClusterRouterPool(local: Pool, settings: ClusterRouterPoolSettings) extends Pool with ClusterRouterConfigBase {
require(local.resizer.isEmpty, "Resizer can't be used together with cluster router")
@transient private val childNameCounter = new AtomicInteger
/**
* INTERNAL API
*/
override private[akka] def newRoutee(routeeProps: Props, context: ActorContext): Routee = {
val name = "c" + childNameCounter.incrementAndGet
val ref = context.asInstanceOf[ActorCell].attachChild(routeeProps, name, systemService = false)
ActorRefRoutee(ref)
}
/**
* Initial number of routee instances
*/
override def nrOfInstances: Int = if (settings.allowLocalRoutees) settings.maxInstancesPerNode else 0
override def resizer: Option[Resizer] = local.resizer
/**
* INTERNAL API
*/
override private[akka] def createRouterActor(): RouterActor = new ClusterRouterPoolActor(local.supervisorStrategy, settings)
override def supervisorStrategy: SupervisorStrategy = local.supervisorStrategy
override def withFallback(other: RouterConfig): RouterConfig = other match {
case ClusterRouterPool(_: ClusterRouterPool, _) throw new IllegalStateException(
"ClusterRouterPool is not allowed to wrap a ClusterRouterPool")
case ClusterRouterPool(otherLocal, _)
copy(local = this.local.withFallback(otherLocal).asInstanceOf[Pool])
case ClusterRouterConfig(otherLocal, _)
copy(local = this.local.withFallback(otherLocal).asInstanceOf[Pool])
case _
copy(local = this.local.withFallback(other).asInstanceOf[Pool])
}
}
/**
* INTERNAL API
*/
private[akka] trait ClusterRouterConfigBase extends RouterConfig {
def local: RouterConfig
def settings: ClusterRouterSettingsBase
override def createRouter(system: ActorSystem): Router = local.createRouter(system)
override def routerDispatcher: String = local.routerDispatcher
override def stopRouterWhenAllRouteesRemoved: Boolean = false
override def routingLogicController(routingLogic: RoutingLogic): Option[Props] =
local.routingLogicController(routingLogic)
// Intercept ClusterDomainEvent and route them to the ClusterRouterActor
override def isManagementMessage(msg: Any): Boolean =
(msg.isInstanceOf[ClusterDomainEvent]) || super.isManagementMessage(msg)
}
/**
* INTERNAL API
*/
private[akka] class ClusterRouterPoolActor(
supervisorStrategy: SupervisorStrategy, val settings: ClusterRouterPoolSettings)
extends RouterPoolActor(supervisorStrategy) with ClusterRouterActor {
override def receive = clusterReceive orElse super.receive
/**
* Adds routees based on totalInstances and maxInstancesPerNode settings
*/
override def addRoutees(): Unit = {
@tailrec
def doCreateRoutees(): Unit = selectDeploymentTarget match {
def doAddRoutees(): Unit = selectDeploymentTarget match {
case None // done
case Some(target)
val ref =
if (settings.isRouteesPathDefined) {
context.actorFor(RootActorPath(target) / settings.routeesPathElements)
} else {
val name = "c" + childNameCounter.incrementAndGet
val deploy = Deploy(config = ConfigFactory.empty(), routerConfig = routeeProps.routerConfig,
scope = RemoteScope(target))
context.asInstanceOf[ActorCell].attachChild(routeeProps.withDeploy(deploy), name, systemService = false)
}
val routeeProps = cell.routeeProps
val deploy = Deploy(config = ConfigFactory.empty(), routerConfig = routeeProps.routerConfig,
scope = RemoteScope(target))
val routee = pool.newRoutee(routeeProps.withDeploy(deploy), context)
// must register each one, since registered routees are used in selectDeploymentTarget
registerRoutees(List(ref))
cell.addRoutee(routee)
// recursion until all created
doCreateRoutees()
doAddRoutees()
}
doCreateRoutees()
doAddRoutees()
}
private[routing] def createRoutees(): Unit = createRoutees(settings.totalInstances)
override def maxInstancesPerNode: Int = settings.maxInstancesPerNode
override def unregisterRoutees(routees: immutable.Iterable[ActorRef]): Unit = {
super.unregisterRoutees(routees)
if (!settings.isRouteesPathDefined) {
// stop remote deployed routees
routees foreach context.stop
}
/**
* INTERNAL API
*/
private[akka] class ClusterRouterGroupActor(val settings: ClusterRouterGroupSettings)
extends RouterActor with ClusterRouterActor {
val group = cell.routerConfig match {
case x: Group x
case other
throw ActorInitializationException("ClusterRouterGroupActor can only be used with Nozle, not " + other.getClass)
}
override def receive = clusterReceive orElse super.receive
/**
* Adds routees based on totalInstances and maxInstancesPerNode settings
*/
override def addRoutees(): Unit = {
@tailrec
def doAddRoutees(): Unit = selectDeploymentTarget match {
case None // done
case Some(target)
val routee = group.routeeFor(target + settings.routeesPath, context)
// must register each one, since registered routees are used in selectDeploymentTarget
cell.addRoutee(routee)
// recursion until all created
doAddRoutees()
}
doAddRoutees()
}
override def maxInstancesPerNode: Int = 1
}
/**
* INTERNAL API
* The router actor, subscribes to cluster events and
* adjusts the routees.
*/
private[akka] trait ClusterRouterActor { this: RouterActor
def settings: ClusterRouterSettingsBase
if (!cell.routerConfig.isInstanceOf[Pool] && !cell.routerConfig.isInstanceOf[Group])
throw ActorInitializationException("Cluster router actor can only be used with Pool or Group, not with " +
cell.routerConfig.getClass)
def cluster: Cluster = Cluster(context.system)
// re-subscribe when restart
override def preStart(): Unit = {
cluster.subscribe(self, classOf[MemberEvent])
cluster.subscribe(self, classOf[ReachabilityEvent])
}
override def postStop(): Unit = cluster.unsubscribe(self)
var nodes: immutable.SortedSet[Address] = {
import Member.addressOrdering
cluster.readView.members.collect {
case m if isAvailable(m) m.address
}
}
private def selectDeploymentTarget: Option[Address] = {
val currentRoutees = routees
def isAvailable(m: Member): Boolean =
m.status == MemberStatus.Up &&
satisfiesRole(m.roles) &&
(settings.allowLocalRoutees || m.address != cluster.selfAddress)
private def satisfiesRole(memberRoles: Set[String]): Boolean = settings.useRole match {
case None true
case Some(r) memberRoles.contains(r)
}
def availableNodes: immutable.SortedSet[Address] = {
import Member.addressOrdering
val currentNodes = nodes
if (currentNodes.isEmpty && settings.allowLocalRoutees && satisfiesRole(cluster.selfRoles))
//use my own node, cluster information not updated yet
immutable.SortedSet(cluster.selfAddress)
else
currentNodes
}
/**
* Fills in self address for local ActorRef
*/
def fullAddress(routee: Routee): Address = {
val a = routee match {
case ActorRefRoutee(ref) ref.path.address
case ActorSelectionRoutee(sel) sel.anchor.path.address
}
a match {
case Address(_, _, None, None) cluster.selfAddress
case a a
}
}
/**
* Adds routees based on totalInstances and maxInstancesPerNode settings
*/
def addRoutees(): Unit
def maxInstancesPerNode: Int
def selectDeploymentTarget: Option[Address] = {
val currentRoutees = cell.router.routees
val currentNodes = availableNodes
if (currentNodes.isEmpty || currentRoutees.size >= settings.totalInstances) {
None
@ -221,113 +384,47 @@ private[akka] class ClusterRouteeProvider(
}
val (address, count) = numberOfRouteesPerNode.minBy(_._2)
if (count < settings.maxInstancesPerNode) Some(address) else None
if (count < maxInstancesPerNode) Some(address) else None
}
}
private[routing] def cluster: Cluster = Cluster(context.system)
/**
* Fills in self address for local ActorRef
*/
private[routing] def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match {
case Address(_, _, None, None) cluster.selfAddress
case a a
def addMember(member: Member) = {
nodes += member.address
addRoutees()
}
private[routing] def availableNodes: immutable.SortedSet[Address] = {
import Member.addressOrdering
val currentNodes = nodes
if (currentNodes.isEmpty && settings.allowLocalRoutees && satisfiesRole(cluster.selfRoles))
//use my own node, cluster information not updated yet
immutable.SortedSet(cluster.selfAddress)
else
currentNodes
}
@volatile
private[routing] var nodes: immutable.SortedSet[Address] = {
import Member.addressOrdering
cluster.readView.members.collect {
case m if isAvailable(m) m.address
}
}
private[routing] def isAvailable(m: Member): Boolean =
m.status == MemberStatus.Up &&
satisfiesRole(m.roles) &&
(settings.allowLocalRoutees || m.address != cluster.selfAddress)
private def satisfiesRole(memberRoles: Set[String]): Boolean = settings.useRole match {
case None true
case Some(r) memberRoles.contains(r)
}
}
/**
* INTERNAL API
* The router actor, subscribes to cluster events.
*/
private[akka] class ClusterRouterActor(override val supervisorStrategy: SupervisorStrategy) extends Router {
// re-subscribe when restart
override def preStart(): Unit = {
cluster.subscribe(self, classOf[MemberEvent])
cluster.subscribe(self, classOf[ReachabilityEvent])
}
override def postStop(): Unit = cluster.unsubscribe(self)
// lazy to not interfere with RoutedActorCell initialization
lazy val routeeProvider: ClusterRouteeProvider = ref.routeeProvider match {
case x: ClusterRouteeProvider x
case _ throw new IllegalStateException(
"ClusterRouteeProvider must be used together with [%s]".format(getClass))
}
def cluster: Cluster = routeeProvider.cluster
def fullAddress(actorRef: ActorRef): Address = routeeProvider.fullAddress(actorRef)
def registerRoutees(member: Member) = {
routeeProvider.nodes += member.address
// createRoutees will create routees based on
// totalInstances and maxInstancesPerNode
routeeProvider.createRoutees()
}
def unregisterRoutees(member: Member) = {
def removeMember(member: Member) = {
val address = member.address
routeeProvider.nodes -= address
nodes -= address
// unregister routees that live on that node
val affectedRoutees = routeeProvider.routees.filter(fullAddress(_) == address)
routeeProvider.unregisterRoutees(affectedRoutees)
val affectedRoutees = cell.router.routees.filter(fullAddress(_) == address)
cell.removeRoutees(affectedRoutees, stopChild = true)
// createRoutees will not create more than createRoutees and maxInstancesPerNode
// addRoutees will not create more than createRoutees and maxInstancesPerNode
// this is useful when totalInstances < upNodes.size
routeeProvider.createRoutees()
addRoutees()
}
override def routerReceive: Receive = {
def clusterReceive: Receive = {
case s: CurrentClusterState
import Member.addressOrdering
routeeProvider.nodes = s.members.collect { case m if routeeProvider.isAvailable(m) m.address }
routeeProvider.createRoutees()
nodes = s.members.collect { case m if isAvailable(m) m.address }
addRoutees()
case m: MemberEvent if routeeProvider.isAvailable(m.member)
registerRoutees(m.member)
case m: MemberEvent if isAvailable(m.member)
addMember(m.member)
case other: MemberEvent
// other events means that it is no longer interesting, such as
// MemberExited, MemberRemoved
unregisterRoutees(other.member)
removeMember(other.member)
case UnreachableMember(m)
unregisterRoutees(m)
removeMember(m)
case ReachableMember(m)
if (routeeProvider.isAvailable(m))
registerRoutees(m)
if (isAvailable(m)) addMember(m)
}
}