diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala index 2cbd01e784..0f489b2738 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala @@ -31,6 +31,7 @@ import akka.routing.RouterConfig import akka.routing.RemoteRouterConfig import akka.actor.RootActorPath import akka.actor.ActorCell +import akka.actor.RelativeActorPath /** * [[akka.routing.RouterConfig]] implementation for deployment on cluster nodes. @@ -121,7 +122,14 @@ case class ClusterRouterSettings private[akka] ( if (isRouteesPathDefined && maxInstancesPerNode != 1) throw new IllegalArgumentException("maxInstancesPerNode of cluster router must be 1 when routeesPath is defined") + val routeesPathElements: Iterable[String] = routeesPath match { + case RelativeActorPath(elements) ⇒ elements + case _ ⇒ + throw new IllegalArgumentException("routeesPath [%s] is not a valid relative actor path" format routeesPath) + } + def isRouteesPathDefined: Boolean = ((routeesPath ne null) && routeesPath != "") + } /** @@ -158,7 +166,7 @@ private[akka] class ClusterRouteeProvider( for (i ← 1 to settings.totalInstances; target ← selectDeploymentTarget) { val ref = if (settings.isRouteesPathDefined) { - context.actorFor(target.toString + settings.routeesPath) + context.actorFor(RootActorPath(target) / settings.routeesPathElements) } else { val name = "c" + childNameCounter.incrementAndGet val deploy = Deploy("", ConfigFactory.empty(), routeeProps.routerConfig, RemoteScope(target)) @@ -176,9 +184,6 @@ private[akka] class ClusterRouteeProvider( val currentNodes = availbleNodes if (currentRoutees.size >= settings.totalInstances) { None - } else if (currentNodes.isEmpty && settings.routeesOnOwnNode) { - // use my own node, cluster information not updated yet - Some(cluster.selfAddress) } else { val numberOfRouteesPerNode: Map[Address, Int] = Map.empty[Address, Int] ++ currentNodes.toSeq.map(_ -> 0) ++ @@ -201,10 +206,22 @@ private[akka] class ClusterRouteeProvider( case a ⇒ a } - import Member.addressOrdering + private[routing] def availbleNodes: SortedSet[Address] = { + import Member.addressOrdering + val currentNodes = nodes + if (currentNodes.isEmpty && settings.routeesOnOwnNode) + //use my own node, cluster information not updated yet + SortedSet(cluster.selfAddress) + else + currentNodes + } + @volatile - private[routing] var availbleNodes: SortedSet[Address] = cluster.readView.members.collect { - case m if isAvailble(m) ⇒ m.address + private[routing] var nodes: SortedSet[Address] = { + import Member.addressOrdering + cluster.readView.members.collect { + case m if isAvailble(m) ⇒ m.address + } } private[routing] def isAvailble(m: Member): Boolean = { @@ -219,7 +236,9 @@ private[akka] class ClusterRouteeProvider( */ private[akka] class ClusterRouterActor extends Router { - override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterDomainEvent]) + // subscribe to cluster changes, MemberEvent + // re-subscribe when restart + override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent]) override def postStop(): Unit = cluster.unsubscribe(self) // lazy to not interfere with RoutedActorCell initialization @@ -236,11 +255,11 @@ private[akka] class ClusterRouterActor extends Router { override def routerReceive: Receive = { case s: CurrentClusterState ⇒ import Member.addressOrdering - routeeProvider.availbleNodes = s.members.collect { case m if routeeProvider.isAvailble(m) ⇒ m.address } + routeeProvider.nodes = s.members.collect { case m if routeeProvider.isAvailble(m) ⇒ m.address } routeeProvider.createRoutees() case m: MemberEvent if routeeProvider.isAvailble(m.member) ⇒ - routeeProvider.availbleNodes += m.member.address + routeeProvider.nodes += m.member.address // createRoutees will create routees based on // totalInstances and maxInstancesPerNode routeeProvider.createRoutees() @@ -249,7 +268,7 @@ private[akka] class ClusterRouterActor extends Router { // other events means that it is no longer interesting, such as // MemberJoined, MemberLeft, MemberExited, MemberUnreachable, MemberRemoved val address = other.member.address - routeeProvider.availbleNodes -= address + routeeProvider.nodes -= address // unregister routees that live on that node val affectedRoutes = routeeProvider.routees.filter(fullAddress(_) == address)