From e422d188c2421db081677796ae7c177d3a8eb47a Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 30 Aug 2012 12:13:50 +0200 Subject: [PATCH] Use the router actor as cluster listener, see #2103 * Removed need for extra actor that subscribes to cluster events * ClusterRouterActor is the router actor and aslo subscribes * Less scary initialization, and safe calls to createRoutees * Intercept ClusterDomainEvent to route them to ClusterRouterActor instead of to the routees --- .../cluster/routing/ClusterRouterConfig.scala | 91 ++++++++++++------- .../ClusterRoundRobinRoutedActorSpec.scala | 2 +- 2 files changed, 61 insertions(+), 32 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala index 2103820895..c0ce6eb359 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala @@ -29,6 +29,8 @@ import akka.routing.RouterConfig import java.lang.IllegalStateException import akka.cluster.ClusterScope import akka.routing.RoundRobinRouter +import akka.routing.Destination +import java.lang.IllegalStateException /** * [[akka.routing.RouterConfig]] implementation for deployment on cluster nodes. @@ -41,9 +43,16 @@ case class ClusterRouterConfig(local: RouterConfig, totalInstances: Int, maxInst override def createRouteeProvider(context: ActorContext, routeeProps: Props) = new ClusterRouteeProvider(context, routeeProps, resizer, totalInstances, maxInstancesPerNode) - override def createRoute(routeeProvider: RouteeProvider): Route = local.createRoute(routeeProvider) + override def createRoute(routeeProvider: RouteeProvider): Route = { + val localRoute = local.createRoute(routeeProvider) - override def createActor(): Router = local.createActor() + // Intercept ClusterDomainEvent and route them to the ClusterRouterActor + ({ + case (sender, message: ClusterDomainEvent) ⇒ Seq(Destination(sender, routeeProvider.context.self)) + }: Route) orElse localRoute + } + + override def createActor(): Router = new ClusterRouterActor override def supervisorStrategy: SupervisorStrategy = local.supervisorStrategy @@ -58,10 +67,12 @@ case class ClusterRouterConfig(local: RouterConfig, totalInstances: Int, maxInst } /** + * INTERNAL API + * * Factory and registry for routees of the router. * Deploys new routees on the cluster nodes. */ -class ClusterRouteeProvider( +private[akka] class ClusterRouteeProvider( _context: ActorContext, _routeeProps: Props, _resizer: Option[Resizer], @@ -97,6 +108,8 @@ class ClusterRouteeProvider( } } + private[routing] def createRoutees(): Unit = createRoutees(totalInstances) + private def selectDeploymentTarget: Option[Address] = { val currentRoutees = routees val currentNodes = upNodes @@ -117,54 +130,70 @@ class ClusterRouteeProvider( } } + private[routing] def cluster: Cluster = Cluster(context.system) + /** * Fills in self address for local ActorRef */ - private def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match { + private[routing] def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match { case Address(_, _, None, None) ⇒ cluster.selfAddress case a ⇒ a } - private def cluster: Cluster = Cluster(context.system) - import Member.addressOrdering @volatile - private var upNodes: SortedSet[Address] = cluster.readView.members.collect { + private[routing] var upNodes: SortedSet[Address] = cluster.readView.members.collect { case m if m.status == MemberStatus.Up ⇒ m.address } - // create actor that subscribes to the cluster eventBus - private val eventBusListener: ActorRef = context.actorOf(Props(new Actor { - override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterDomainEvent]) - override def postStop(): Unit = cluster.unsubscribe(self) +} - def receive = { - case s: CurrentClusterState ⇒ - upNodes = s.members.collect { case m if m.status == MemberStatus.Up ⇒ m.address } +/** + * INTERNAL API + * The router actor, subscribes to cluster events. + */ +private[akka] class ClusterRouterActor extends Router { - case MemberUp(m) ⇒ - upNodes += m.address - // createRoutees will not create more than createRoutees and maxInstancesPerNode - createRoutees(totalInstances) + override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterDomainEvent]) + override def postStop(): Unit = cluster.unsubscribe(self) - case other: MemberEvent ⇒ - // other events means that it is no longer interesting, such as - // MemberJoined, MemberLeft, MemberExited, MemberUnreachable, MemberRemoved - val address = other.member.address - upNodes -= address + // lazy to not interfere with RoutedActorCell initialization + lazy val routeeProvider: ClusterRouteeProvider = ref.routeeProvider match { + case x: ClusterRouteeProvider ⇒ x + case _ ⇒ + throw new IllegalStateException("ClusterRouteeProvider must be used together with [%s]".format(this.getClass)) + } - // unregister routees that live on that node - val affectedRoutes = routees.filter(fullAddress(_) == address) - unregisterRoutees(affectedRoutes) + def cluster: Cluster = routeeProvider.cluster - // createRoutees will not create more than createRoutees and maxInstancesPerNode - // this is useful when totalInstances < upNodes.size - createRoutees(totalInstances) + def fullAddress(actorRef: ActorRef): Address = routeeProvider.fullAddress(actorRef) - } + override def routerReceive: Receive = { + case s: CurrentClusterState ⇒ + import Member.addressOrdering + routeeProvider.upNodes = s.members.collect { case m if m.status == MemberStatus.Up ⇒ m.address } - }), name = "cluster-listener") + case MemberUp(m) ⇒ + routeeProvider.upNodes += m.address + // createRoutees will create routees based on + // totalInstances and maxInstancesPerNode + routeeProvider.createRoutees() + case other: MemberEvent ⇒ + // other events means that it is no longer interesting, such as + // MemberJoined, MemberLeft, MemberExited, MemberUnreachable, MemberRemoved + val address = other.member.address + routeeProvider.upNodes -= address + + // unregister routees that live on that node + val affectedRoutes = routeeProvider.routees.filter(fullAddress(_) == address) + routeeProvider.unregisterRoutees(affectedRoutes) + + // createRoutees will not create more than createRoutees and maxInstancesPerNode + // this is useful when totalInstances < upNodes.size + routeeProvider.createRoutees() + + } } /** diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala index cac3cd67fe..fe1498a3cd 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala @@ -160,7 +160,7 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou enterBarrier("after-4") } - "deploy to other node when a node is down" taggedAs LongRunningTest in { + "deploy to other node when a node becomes down" taggedAs LongRunningTest in { runOn(first) { def currentRoutees = Await.result(router2 ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees