Incorparate feedback from rkuhn, see #2103

This commit is contained in:
Patrik Nordwall 2012-09-10 14:41:51 +02:00
parent 7cf6ab54c8
commit c8cfe0eb1f

View file

@ -31,6 +31,7 @@ import akka.routing.RouterConfig
import akka.routing.RemoteRouterConfig
import akka.actor.RootActorPath
import akka.actor.ActorCell
import akka.actor.RelativeActorPath
/**
* [[akka.routing.RouterConfig]] implementation for deployment on cluster nodes.
@ -121,7 +122,14 @@ case class ClusterRouterSettings private[akka] (
if (isRouteesPathDefined && maxInstancesPerNode != 1)
throw new IllegalArgumentException("maxInstancesPerNode of cluster router must be 1 when routeesPath is defined")
val routeesPathElements: Iterable[String] = routeesPath match {
case RelativeActorPath(elements) elements
case _
throw new IllegalArgumentException("routeesPath [%s] is not a valid relative actor path" format routeesPath)
}
def isRouteesPathDefined: Boolean = ((routeesPath ne null) && routeesPath != "")
}
/**
@ -158,7 +166,7 @@ private[akka] class ClusterRouteeProvider(
for (i 1 to settings.totalInstances; target selectDeploymentTarget) {
val ref =
if (settings.isRouteesPathDefined) {
context.actorFor(target.toString + settings.routeesPath)
context.actorFor(RootActorPath(target) / settings.routeesPathElements)
} else {
val name = "c" + childNameCounter.incrementAndGet
val deploy = Deploy("", ConfigFactory.empty(), routeeProps.routerConfig, RemoteScope(target))
@ -176,9 +184,6 @@ private[akka] class ClusterRouteeProvider(
val currentNodes = availbleNodes
if (currentRoutees.size >= settings.totalInstances) {
None
} else if (currentNodes.isEmpty && settings.routeesOnOwnNode) {
// use my own node, cluster information not updated yet
Some(cluster.selfAddress)
} else {
val numberOfRouteesPerNode: Map[Address, Int] =
Map.empty[Address, Int] ++ currentNodes.toSeq.map(_ -> 0) ++
@ -201,10 +206,22 @@ private[akka] class ClusterRouteeProvider(
case a a
}
import Member.addressOrdering
private[routing] def availbleNodes: SortedSet[Address] = {
import Member.addressOrdering
val currentNodes = nodes
if (currentNodes.isEmpty && settings.routeesOnOwnNode)
//use my own node, cluster information not updated yet
SortedSet(cluster.selfAddress)
else
currentNodes
}
@volatile
private[routing] var availbleNodes: SortedSet[Address] = cluster.readView.members.collect {
case m if isAvailble(m) m.address
private[routing] var nodes: SortedSet[Address] = {
import Member.addressOrdering
cluster.readView.members.collect {
case m if isAvailble(m) m.address
}
}
private[routing] def isAvailble(m: Member): Boolean = {
@ -219,7 +236,9 @@ private[akka] class ClusterRouteeProvider(
*/
private[akka] class ClusterRouterActor extends Router {
override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterDomainEvent])
// subscribe to cluster changes, MemberEvent
// re-subscribe when restart
override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent])
override def postStop(): Unit = cluster.unsubscribe(self)
// lazy to not interfere with RoutedActorCell initialization
@ -236,11 +255,11 @@ private[akka] class ClusterRouterActor extends Router {
override def routerReceive: Receive = {
case s: CurrentClusterState
import Member.addressOrdering
routeeProvider.availbleNodes = s.members.collect { case m if routeeProvider.isAvailble(m) m.address }
routeeProvider.nodes = s.members.collect { case m if routeeProvider.isAvailble(m) m.address }
routeeProvider.createRoutees()
case m: MemberEvent if routeeProvider.isAvailble(m.member)
routeeProvider.availbleNodes += m.member.address
routeeProvider.nodes += m.member.address
// createRoutees will create routees based on
// totalInstances and maxInstancesPerNode
routeeProvider.createRoutees()
@ -249,7 +268,7 @@ private[akka] class ClusterRouterActor extends Router {
// other events means that it is no longer interesting, such as
// MemberJoined, MemberLeft, MemberExited, MemberUnreachable, MemberRemoved
val address = other.member.address
routeeProvider.availbleNodes -= address
routeeProvider.nodes -= address
// unregister routees that live on that node
val affectedRoutes = routeeProvider.routees.filter(fullAddress(_) == address)