Use the router actor as cluster listener, see #2103

* Removed need for extra actor that subscribes to cluster
  events
* ClusterRouterActor is the router actor and aslo subscribes
* Less scary initialization, and safe calls to createRoutees
* Intercept ClusterDomainEvent to route them to ClusterRouterActor
  instead of to the routees
This commit is contained in:
Patrik Nordwall 2012-08-30 12:13:50 +02:00
parent ba5df98740
commit e422d188c2
2 changed files with 61 additions and 32 deletions

View file

@ -29,6 +29,8 @@ import akka.routing.RouterConfig
import java.lang.IllegalStateException
import akka.cluster.ClusterScope
import akka.routing.RoundRobinRouter
import akka.routing.Destination
import java.lang.IllegalStateException
/**
* [[akka.routing.RouterConfig]] implementation for deployment on cluster nodes.
@ -41,9 +43,16 @@ case class ClusterRouterConfig(local: RouterConfig, totalInstances: Int, maxInst
override def createRouteeProvider(context: ActorContext, routeeProps: Props) =
new ClusterRouteeProvider(context, routeeProps, resizer, totalInstances, maxInstancesPerNode)
override def createRoute(routeeProvider: RouteeProvider): Route = local.createRoute(routeeProvider)
override def createRoute(routeeProvider: RouteeProvider): Route = {
val localRoute = local.createRoute(routeeProvider)
override def createActor(): Router = local.createActor()
// Intercept ClusterDomainEvent and route them to the ClusterRouterActor
({
case (sender, message: ClusterDomainEvent) Seq(Destination(sender, routeeProvider.context.self))
}: Route) orElse localRoute
}
override def createActor(): Router = new ClusterRouterActor
override def supervisorStrategy: SupervisorStrategy = local.supervisorStrategy
@ -58,10 +67,12 @@ case class ClusterRouterConfig(local: RouterConfig, totalInstances: Int, maxInst
}
/**
* INTERNAL API
*
* Factory and registry for routees of the router.
* Deploys new routees on the cluster nodes.
*/
class ClusterRouteeProvider(
private[akka] class ClusterRouteeProvider(
_context: ActorContext,
_routeeProps: Props,
_resizer: Option[Resizer],
@ -97,6 +108,8 @@ class ClusterRouteeProvider(
}
}
private[routing] def createRoutees(): Unit = createRoutees(totalInstances)
private def selectDeploymentTarget: Option[Address] = {
val currentRoutees = routees
val currentNodes = upNodes
@ -117,54 +130,70 @@ class ClusterRouteeProvider(
}
}
private[routing] def cluster: Cluster = Cluster(context.system)
/**
* Fills in self address for local ActorRef
*/
private def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match {
private[routing] def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match {
case Address(_, _, None, None) cluster.selfAddress
case a a
}
private def cluster: Cluster = Cluster(context.system)
import Member.addressOrdering
@volatile
private var upNodes: SortedSet[Address] = cluster.readView.members.collect {
private[routing] var upNodes: SortedSet[Address] = cluster.readView.members.collect {
case m if m.status == MemberStatus.Up m.address
}
// create actor that subscribes to the cluster eventBus
private val eventBusListener: ActorRef = context.actorOf(Props(new Actor {
override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterDomainEvent])
override def postStop(): Unit = cluster.unsubscribe(self)
}
def receive = {
case s: CurrentClusterState
upNodes = s.members.collect { case m if m.status == MemberStatus.Up m.address }
/**
* INTERNAL API
* The router actor, subscribes to cluster events.
*/
private[akka] class ClusterRouterActor extends Router {
case MemberUp(m)
upNodes += m.address
// createRoutees will not create more than createRoutees and maxInstancesPerNode
createRoutees(totalInstances)
override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterDomainEvent])
override def postStop(): Unit = cluster.unsubscribe(self)
case other: MemberEvent
// other events means that it is no longer interesting, such as
// MemberJoined, MemberLeft, MemberExited, MemberUnreachable, MemberRemoved
val address = other.member.address
upNodes -= address
// lazy to not interfere with RoutedActorCell initialization
lazy val routeeProvider: ClusterRouteeProvider = ref.routeeProvider match {
case x: ClusterRouteeProvider x
case _
throw new IllegalStateException("ClusterRouteeProvider must be used together with [%s]".format(this.getClass))
}
// unregister routees that live on that node
val affectedRoutes = routees.filter(fullAddress(_) == address)
unregisterRoutees(affectedRoutes)
def cluster: Cluster = routeeProvider.cluster
// createRoutees will not create more than createRoutees and maxInstancesPerNode
// this is useful when totalInstances < upNodes.size
createRoutees(totalInstances)
def fullAddress(actorRef: ActorRef): Address = routeeProvider.fullAddress(actorRef)
}
override def routerReceive: Receive = {
case s: CurrentClusterState
import Member.addressOrdering
routeeProvider.upNodes = s.members.collect { case m if m.status == MemberStatus.Up m.address }
}), name = "cluster-listener")
case MemberUp(m)
routeeProvider.upNodes += m.address
// createRoutees will create routees based on
// totalInstances and maxInstancesPerNode
routeeProvider.createRoutees()
case other: MemberEvent
// other events means that it is no longer interesting, such as
// MemberJoined, MemberLeft, MemberExited, MemberUnreachable, MemberRemoved
val address = other.member.address
routeeProvider.upNodes -= address
// unregister routees that live on that node
val affectedRoutes = routeeProvider.routees.filter(fullAddress(_) == address)
routeeProvider.unregisterRoutees(affectedRoutes)
// createRoutees will not create more than createRoutees and maxInstancesPerNode
// this is useful when totalInstances < upNodes.size
routeeProvider.createRoutees()
}
}
/**