From 695ce49727a88018fcc752528dfc3731bdd35abf Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 29 Aug 2012 19:33:19 +0200 Subject: [PATCH] Deploy to new members in cluster, see #2103 * Config max-nr-of-instances-per-node * selectDeploymentTarget that takes max-nr-of-instances-per-node and nr-of-instances into account * Deploy when new member added or removed * Moved routeeProps to RouteeProvider constructor, needed for this feature, but also simplifies createRoute, createRoutee, and resize, since routeeProps doesn't have to be passed around. --- .../scala/akka/routing/CustomRouteSpec.scala | 4 +- .../test/scala/akka/routing/RoutingSpec.scala | 4 +- .../src/main/scala/akka/routing/Routing.scala | 57 ++++---- .../src/main/resources/reference.conf | 16 ++- .../cluster/ClusterActorRefProvider.scala | 9 +- .../cluster/routing/ClusterRouterConfig.scala | 125 ++++++++++-------- .../ClusterRoundRobinRoutedActorSpec.scala | 79 ++++++----- .../scala/akka/remote/RemoteDeployer.scala | 1 + .../akka/routing/RemoteRouterConfig.scala | 17 +-- 9 files changed, 185 insertions(+), 127 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/routing/CustomRouteSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/CustomRouteSpec.scala index 56193f668f..7fa311c305 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/CustomRouteSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/CustomRouteSpec.scala @@ -15,8 +15,8 @@ class CustomRouteSpec extends AkkaSpec { import akka.dispatch.Dispatchers class MyRouter(target: ActorRef) extends RouterConfig { - override def createRoute(props: Props, provider: RouteeProvider): Route = { - provider.createRoutees(props, 1) + override def createRoute(provider: RouteeProvider): Route = { + provider.createRoutees(1) { case (sender, message: String) ⇒ Seq(Destination(sender, target)) diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 8b20189dcb..ec5d825fec 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -51,7 +51,7 @@ object RoutingSpec { class MyRouter(config: Config) extends RouterConfig { val foo = config.getString("foo") - def createRoute(routeeProps: Props, routeeProvider: RouteeProvider): Route = { + def createRoute(routeeProvider: RouteeProvider): Route = { val routees = IndexedSeq(routeeProvider.context.actorOf(Props[Echo])) routeeProvider.registerRoutees(routees) @@ -618,7 +618,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy //#crRoute - def createRoute(routeeProps: Props, routeeProvider: RouteeProvider): Route = { + def createRoute(routeeProvider: RouteeProvider): Route = { val democratActor = routeeProvider.context.actorOf(Props(new DemocratActor()), "d") val republicanActor = routeeProvider.context.actorOf(Props(new RepublicanActor()), "r") val routees = Vector[ActorRef](democratActor, republicanActor) diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index b5362048a7..6c7f47fc32 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -48,7 +48,6 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo _supervisor) { private[akka] val routerConfig = _props.routerConfig - private[akka] val routeeProps = _props.copy(routerConfig = NoRouter) private[akka] val resizeInProgress = new AtomicBoolean private val resizeCounter = new AtomicLong @@ -61,12 +60,13 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo def routeeProvider = _routeeProvider val route = { - _routeeProvider = routerConfig.createRouteeProvider(this) - val r = routerConfig.createRoute(routeeProps, routeeProvider) + val routeeProps = _props.copy(routerConfig = NoRouter) + _routeeProvider = routerConfig.createRouteeProvider(this, routeeProps) + val r = routerConfig.createRoute(routeeProvider) // initial resize, before message send routerConfig.resizer foreach { r ⇒ if (r.isTimeForResize(resizeCounter.getAndIncrement())) - r.resize(routeeProps, routeeProvider) + r.resize(routeeProvider) } r } @@ -160,9 +160,10 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo */ trait RouterConfig { - def createRoute(routeeProps: Props, routeeProvider: RouteeProvider): Route + def createRoute(routeeProvider: RouteeProvider): Route - def createRouteeProvider(context: ActorContext): RouteeProvider = new RouteeProvider(context, resizer) + def createRouteeProvider(context: ActorContext, routeeProps: Props): RouteeProvider = + new RouteeProvider(context, routeeProps, resizer) def createActor(): Router = new Router { override def supervisorStrategy: SupervisorStrategy = RouterConfig.this.supervisorStrategy @@ -204,7 +205,7 @@ trait RouterConfig { * Uses `context.actorOf` to create routees from nrOfInstances property * and `context.actorFor` lookup routees from paths. */ -class RouteeProvider(val context: ActorContext, val resizer: Option[Resizer]) { +class RouteeProvider(val context: ActorContext, val routeeProps: Props, val resizer: Option[Resizer]) { /** * Adds the routees to the router. @@ -245,11 +246,11 @@ class RouteeProvider(val context: ActorContext, val resizer: Option[Resizer]) { /** * Creates new routees from specified `Props` and registers them. */ - def createRoutees(props: Props, nrOfInstances: Int): Unit = { + def createRoutees(nrOfInstances: Int): Unit = { if (nrOfInstances <= 0) throw new IllegalArgumentException( "Must specify nrOfInstances or routees for [%s]" format context.self.path.toString) else { - val routees = (1 to nrOfInstances).map(_ ⇒ context.actorOf(props))(scala.collection.breakOut) + val routees = (1 to nrOfInstances).map(_ ⇒ context.actorOf(routeeProps))(scala.collection.breakOut) registerRoutees(routees) } } @@ -301,16 +302,16 @@ class RouteeProvider(val context: ActorContext, val resizer: Option[Resizer]) { * @see akka.routing.RouterConfig */ abstract class CustomRouterConfig extends RouterConfig { - override def createRoute(props: Props, routeeProvider: RouteeProvider): Route = { + override def createRoute(routeeProvider: RouteeProvider): Route = { // as a bonus, this prevents closing of props and context in the returned Route PartialFunction - val customRoute = createCustomRoute(props, routeeProvider) + val customRoute = createCustomRoute(routeeProvider) { case (sender, message) ⇒ customRoute.destinationsFor(sender, message) } } - def createCustomRoute(props: Props, routeeProvider: RouteeProvider): CustomRoute + def createCustomRoute(routeeProvider: RouteeProvider): CustomRoute } @@ -334,7 +335,7 @@ trait Router extends Actor { case Router.Resize ⇒ val ab = ref.resizeInProgress - if (ab.get) try ref.routerConfig.resizer foreach (_.resize(ref.routeeProps, ref.routeeProvider)) finally ab.set(false) + if (ab.get) try ref.routerConfig.resizer foreach (_.resize(ref.routeeProvider)) finally ab.set(false) case Terminated(child) ⇒ ref.removeRoutees(IndexedSeq(child)) @@ -409,7 +410,7 @@ case class Destination(sender: ActorRef, recipient: ActorRef) @SerialVersionUID(1L) abstract class NoRouter extends RouterConfig case object NoRouter extends NoRouter { - def createRoute(props: Props, routeeProvider: RouteeProvider): Route = null // FIXME, null, really?? + def createRoute(routeeProvider: RouteeProvider): Route = null // FIXME, null, really?? def routerDispatcher: String = "" def supervisorStrategy = null // FIXME null, really?? override def withFallback(other: RouterConfig): RouterConfig = other @@ -448,7 +449,7 @@ class FromConfig(val routerDispatcher: String = Dispatchers.DefaultDispatcherId) override def verifyConfig(): Unit = throw new ConfigurationException("router needs external configuration from file (e.g. application.conf)") - def createRoute(props: Props, routeeProvider: RouteeProvider): Route = null + def createRoute(routeeProvider: RouteeProvider): Route = null def supervisorStrategy: SupervisorStrategy = Router.defaultSupervisorStrategy } @@ -553,9 +554,9 @@ trait RoundRobinLike { this: RouterConfig ⇒ def routees: Iterable[String] - def createRoute(props: Props, routeeProvider: RouteeProvider): Route = { + def createRoute(routeeProvider: RouteeProvider): Route = { if (resizer.isEmpty) { - if (routees.isEmpty) routeeProvider.createRoutees(props, nrOfInstances) + if (routees.isEmpty) routeeProvider.createRoutees(nrOfInstances) else routeeProvider.registerRouteesFor(routees) } @@ -675,9 +676,9 @@ trait RandomLike { this: RouterConfig ⇒ def routees: Iterable[String] - def createRoute(props: Props, routeeProvider: RouteeProvider): Route = { + def createRoute(routeeProvider: RouteeProvider): Route = { if (resizer.isEmpty) { - if (routees.isEmpty) routeeProvider.createRoutees(props, nrOfInstances) + if (routees.isEmpty) routeeProvider.createRoutees(nrOfInstances) else routeeProvider.registerRouteesFor(routees) } @@ -858,9 +859,9 @@ trait SmallestMailboxLike { this: RouterConfig ⇒ case _ ⇒ 0 } - def createRoute(props: Props, routeeProvider: RouteeProvider): Route = { + def createRoute(routeeProvider: RouteeProvider): Route = { if (resizer.isEmpty) { - if (routees.isEmpty) routeeProvider.createRoutees(props, nrOfInstances) + if (routees.isEmpty) routeeProvider.createRoutees(nrOfInstances) else routeeProvider.registerRouteesFor(routees) } @@ -1009,9 +1010,9 @@ trait BroadcastLike { this: RouterConfig ⇒ def routees: Iterable[String] - def createRoute(props: Props, routeeProvider: RouteeProvider): Route = { + def createRoute(routeeProvider: RouteeProvider): Route = { if (resizer.isEmpty) { - if (routees.isEmpty) routeeProvider.createRoutees(props, nrOfInstances) + if (routees.isEmpty) routeeProvider.createRoutees(nrOfInstances) else routeeProvider.registerRouteesFor(routees) } @@ -1130,9 +1131,9 @@ trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒ def within: Duration - def createRoute(props: Props, routeeProvider: RouteeProvider): Route = { + def createRoute(routeeProvider: RouteeProvider): Route = { if (resizer.isEmpty) { - if (routees.isEmpty) routeeProvider.createRoutees(props, nrOfInstances) + if (routees.isEmpty) routeeProvider.createRoutees(nrOfInstances) else routeeProvider.registerRouteesFor(routees) } @@ -1175,7 +1176,7 @@ trait Resizer { * This method is invoked only in the context of the Router actor in order to safely * create/stop children. */ - def resize(props: Props, routeeProvider: RouteeProvider): Unit + def resize(routeeProvider: RouteeProvider): Unit } case object DefaultResizer { @@ -1271,11 +1272,11 @@ case class DefaultResizer( def isTimeForResize(messageCounter: Long): Boolean = (messageCounter % messagesPerResize == 0) - def resize(props: Props, routeeProvider: RouteeProvider): Unit = { + def resize(routeeProvider: RouteeProvider): Unit = { val currentRoutees = routeeProvider.routees val requestedCapacity = capacity(currentRoutees) - if (requestedCapacity > 0) routeeProvider.createRoutees(props, requestedCapacity) + if (requestedCapacity > 0) routeeProvider.createRoutees(requestedCapacity) else if (requestedCapacity < 0) routeeProvider.removeRoutees(-requestedCapacity, stopDelay) } diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 556c39898d..d0ef3cf309 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -107,5 +107,19 @@ akka { } } - actor.deployment.default.cluster = off + # Default configuration for routers + actor.deployment.default.cluster { + # enable cluster aware router that deploys to nodes in the cluster + enabled = off + + # Maximum number of routees that will be deployed on each cluster + # member node. + # Note that nr-of-instances defines total number of routees, but + # number of routees per node will not be exceeded, i.e. if you + # define nr-of-instances = 50 and max-nr-of-instances-per-node = 2 + # it will deploy 2 routees per new member in the cluster, up to + # 25 members. + max-nr-of-instances-per-node = 1 + + } } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala index bea6d6f57f..0d56418b18 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala @@ -33,12 +33,17 @@ private[akka] class ClusterDeployer(_settings: ActorSystem.Settings, _pm: Dynami override def parseConfig(path: String, config: Config): Option[Deploy] = { super.parseConfig(path, config) match { case d @ Some(deploy) ⇒ - if (deploy.config.getBoolean("cluster")) { + if (deploy.config.getBoolean("cluster.enabled")) { if (deploy.scope != NoScopeGiven) throw new ConfigurationException("Cluster deployment can't be combined with scope [%s]".format(deploy.scope)) if (deploy.routerConfig.isInstanceOf[RemoteRouterConfig]) throw new ConfigurationException("Cluster deployment can't be combined with [%s]".format(deploy.routerConfig)) - Some(deploy.copy(routerConfig = ClusterRouterConfig(deploy.routerConfig))) + + val totalInstances = deploy.config.getInt("nr-of-instances") + val maxInstancesPerNode = deploy.config.getInt("cluster.max-nr-of-instances-per-node") + Some(deploy.copy( + routerConfig = ClusterRouterConfig(deploy.routerConfig, totalInstances, maxInstancesPerNode), + scope = ClusterScope)) } else d case None ⇒ None } diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala index 8bc38440ad..945497677c 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala @@ -6,7 +6,6 @@ package akka.cluster.routing import java.util.concurrent.atomic.AtomicInteger import scala.collection.immutable.SortedSet import com.typesafe.config.ConfigFactory - import akka.ConfigurationException import akka.actor.Actor import akka.actor.ActorContext @@ -27,6 +26,8 @@ import akka.routing.Route import akka.routing.RouteeProvider import akka.routing.Router import akka.routing.RouterConfig +import java.lang.IllegalStateException +import akka.cluster.ClusterScope /** * [[akka.routing.RouterConfig]] implementation for deployment on cluster nodes. @@ -34,13 +35,12 @@ import akka.routing.RouterConfig * which makes it possible to mix this with the built-in routers such as * [[akka.routing.RoundRobinRouter]] or custom routers. */ -case class ClusterRouterConfig(local: RouterConfig) extends RouterConfig { +case class ClusterRouterConfig(local: RouterConfig, totalInstances: Int, maxInstancesPerNode: Int) extends RouterConfig { - override def createRouteeProvider(context: ActorContext) = new ClusterRouteeProvider(context, resizer) + override def createRouteeProvider(context: ActorContext, routeeProps: Props) = + new ClusterRouteeProvider(context, routeeProps, resizer, totalInstances, maxInstancesPerNode) - override def createRoute(routeeProps: Props, routeeProvider: RouteeProvider): Route = { - local.createRoute(routeeProps, routeeProvider) - } + override def createRoute(routeeProvider: RouteeProvider): Route = local.createRoute(routeeProvider) override def createActor(): Router = local.createActor() @@ -51,8 +51,8 @@ case class ClusterRouterConfig(local: RouterConfig) extends RouterConfig { override def resizer: Option[Resizer] = local.resizer override def withFallback(other: RouterConfig): RouterConfig = other match { - case ClusterRouterConfig(local) ⇒ copy(local = this.local.withFallback(local)) - case _ ⇒ copy(local = this.local.withFallback(other)) + case ClusterRouterConfig(local, _, _) ⇒ copy(local = this.local.withFallback(local)) + case _ ⇒ copy(local = this.local.withFallback(other)) } } @@ -60,8 +60,13 @@ case class ClusterRouterConfig(local: RouterConfig) extends RouterConfig { * Factory and registry for routees of the router. * Deploys new routees on the cluster nodes. */ -class ClusterRouteeProvider(_context: ActorContext, _resizer: Option[Resizer]) - extends RouteeProvider(_context, _resizer) { +class ClusterRouteeProvider( + _context: ActorContext, + _routeeProps: Props, + _resizer: Option[Resizer], + totalInstances: Int, + maxInstancesPerNode: Int) + extends RouteeProvider(_context, _routeeProps, _resizer) { // need this counter as instance variable since Resizer may call createRoutees several times private val childNameCounter = new AtomicInteger @@ -70,64 +75,80 @@ class ClusterRouteeProvider(_context: ActorContext, _resizer: Option[Resizer]) throw new ConfigurationException("Cluster deployment can not be combined with routees for [%s]" format context.self.path.toString) - override def createRoutees(props: Props, nrOfInstances: Int): Unit = { - val nodes = upNodes - if (nodes.isEmpty) { - IndexedSeq.empty - } else { - val impl = context.system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559 - // FIXME We could count number of routees per node and select nodes with least routees first - val nodesIter: Iterator[Address] = Stream.continually(nodes).flatten.iterator - val refs = IndexedSeq.empty[ActorRef] ++ (for (i ← 1 to nrOfInstances) yield { - val name = "c" + childNameCounter.incrementAndGet - val deploy = Deploy("", ConfigFactory.empty(), props.routerConfig, RemoteScope(nodesIter.next)) - impl.provider.actorOf(impl, props, context.self.asInstanceOf[InternalActorRef], context.self.path / name, - systemService = false, Some(deploy), lookupDeploy = false, async = false) - }) - registerRoutees(refs) + override def createRoutees(nrOfInstances: Int): Unit = { + val impl = context.system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559 + + for (i ← 1 to nrOfInstances; target ← selectDeploymentTarget) { + val name = "c" + childNameCounter.incrementAndGet + val deploy = Deploy("", ConfigFactory.empty(), routeeProps.routerConfig, RemoteScope(target)) + var ref = impl.provider.actorOf(impl, routeeProps, context.self.asInstanceOf[InternalActorRef], context.self.path / name, + systemService = false, Some(deploy), lookupDeploy = false, async = false) + // must register each one, since registered routees are used in selectDeploymentTarget + registerRoutees(Some(ref)) } } - // FIXME experimental hack to let the cluster initialize - // What should we do before we have full cluster information (startup phase)? - Cluster(context.system).readView - Thread.sleep(2000) + private def selectDeploymentTarget: Option[Address] = { + val currentRoutees = routees + val currentNodes = upNodes + if (currentRoutees.size >= totalInstances) { + None + } else if (currentNodes.isEmpty) { + // use my own node, cluster information not updated yet + Some(cluster.selfAddress) + } else { + val numberOfRouteesPerNode: Map[Address, Int] = + Map.empty[Address, Int] ++ currentNodes.toSeq.map(_ -> 0) ++ + currentRoutees.groupBy(fullAddress).map { + case (address, refs) ⇒ address -> refs.size + } + + val (address, count) = numberOfRouteesPerNode.minBy(_._2) + if (count < maxInstancesPerNode) Some(address) else None + } + } + + /** + * Fills in self address for local ActorRef + */ + private def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match { + case Address(_, _, None, None) ⇒ cluster.selfAddress + case a ⇒ a + } + + private def cluster: Cluster = Cluster(context.system) import Member.addressOrdering @volatile - private var upNodes: SortedSet[Address] = Cluster(context.system).readView.members.collect { + private var upNodes: SortedSet[Address] = cluster.readView.members.collect { case m if m.status == MemberStatus.Up ⇒ m.address } // create actor that subscribes to the cluster eventBus - private val eventBusListener: ActorRef = { + private val eventBusListener: ActorRef = context.actorOf(Props(new Actor { + override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterDomainEvent]) + override def postStop(): Unit = cluster.unsubscribe(self) - // FIXME is this allowed, are we inside or outside of the actor? - context.actorOf(Props(new Actor { - override def preStart(): Unit = Cluster(context.system).subscribe(self, classOf[ClusterDomainEvent]) - override def postStop(): Unit = Cluster(context.system).unsubscribe(self) + def receive = { + case s: CurrentClusterState ⇒ + upNodes = s.members.collect { case m if m.status == MemberStatus.Up ⇒ m.address } - def receive = { - case s: CurrentClusterState ⇒ - upNodes = s.members.collect { case m if m.status == MemberStatus.Up ⇒ m.address } + case MemberUp(m) ⇒ + upNodes += m.address + // createRoutees will not create more than createRoutees and maxInstancesPerNode + createRoutees(totalInstances) - case MemberUp(m) ⇒ - upNodes += m.address - // FIXME Here we could trigger a rebalance, by counting number of routees per node and unregister - // routees from nodes with many routees and deploy on this new node instead + case other: MemberEvent ⇒ + // other events means that it is no longer interesting, such as + // MemberJoined, MemberLeft, MemberExited, MemberUnreachable, MemberRemoved + upNodes -= other.member.address - case other: MemberEvent ⇒ - // other events means that it is no longer interesting, such as - // MemberJoined, MemberLeft, MemberExited, MemberUnreachable, MemberRemoved - upNodes -= other.member.address + // createRoutees will not create more than createRoutees and maxInstancesPerNode + createRoutees(totalInstances) // Here we - // FIXME Should we deploy new routees corresponding to the ones that goes away here? - // or is that a job for a special Cluster Resizer? + } - } - - }), name = "cluster-listener") - } + }), name = "cluster-listener") } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala index ed2b338250..5cfd9aabf4 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala @@ -40,8 +40,9 @@ object ClusterRoundRobinRoutedActorMultiJvmSpec extends MultiNodeConfig { akka.actor.deployment { /service-hello { router = round-robin - nr-of-instances = 3 - cluster = on + nr-of-instances = 10 + cluster.enabled = on + cluster.max-nr-of-instances-per-node = 2 } } """)). @@ -59,52 +60,66 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou with ImplicitSender with DefaultTimeout { import ClusterRoundRobinRoutedActorMultiJvmSpec._ - // sorted in the order used by the cluster - lazy val sortedRoles = Seq(first, second, third, fourth).sorted + lazy val actor = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "service-hello") + + def receiveReplies(expectedReplies: Int): Map[Address, Int] = { + val zero = Map.empty[Address, Int] ++ roles.map(address(_) -> 0) + (receiveWhile(5 seconds, messages = expectedReplies) { + case ref: ActorRef ⇒ ref.path.address match { + case Address(_, _, None, None) ⇒ cluster.selfAddress + case a ⇒ a + } + }).foldLeft(zero) { + case (replyMap, address) ⇒ replyMap + (address -> (replyMap(address) + 1)) + } + } "A cluster router configured with a RoundRobin router" must { - "start cluster" taggedAs LongRunningTest in { - awaitClusterUp(first, second, third, fourth) + "start cluster with 2 nodes" taggedAs LongRunningTest in { + awaitClusterUp(first, second) enterBarrier("after-1") } - "be locally instantiated on a cluster node and be able to communicate through its RemoteActorRef" taggedAs LongRunningTest in { + "deploy routees to the member nodes in the cluster" taggedAs LongRunningTest in { - runOn(sortedRoles.dropRight(1): _*) { - enterBarrier("start", "broadcast-end", "end") - } - - runOn(sortedRoles.last) { - enterBarrier("start") - val actor = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "service-hello") + runOn(first) { actor.isInstanceOf[RoutedActorRef] must be(true) - val connectionCount = 3 val iterationCount = 10 - - for (i ← 0 until iterationCount; k ← 0 until connectionCount) { + for (i ← 0 until iterationCount) { actor ! "hit" } - val replies: Map[Address, Int] = (receiveWhile(5 seconds, messages = connectionCount * iterationCount) { - case ref: ActorRef ⇒ ref.path.address - }).foldLeft(Map(node(first).address -> 0, node(second).address -> 0, node(third).address -> 0)) { - case (replyMap, address) ⇒ replyMap + (address -> (replyMap(address) + 1)) - } + val replies = receiveReplies(iterationCount) - enterBarrier("broadcast-end") - actor ! Broadcast(PoisonPill) - - enterBarrier("end") - replies.values foreach { _ must be(iterationCount) } - replies.get(node(fourth).address) must be(None) - - // shut down the actor before we let the other node(s) shut down so we don't try to send - // "Terminate" to a shut down node - system.stop(actor) + replies(first) must be > (0) + replies(second) must be > (0) + replies(third) must be(0) + replies(fourth) must be(0) + replies.values.sum must be(iterationCount) } enterBarrier("after-2") } + + "deploy routees to new nodes in the cluster" taggedAs LongRunningTest in { + + // add third and fourth + awaitClusterUp(first, second, third, fourth) + + runOn(first) { + val iterationCount = 10 + for (i ← 0 until iterationCount) { + actor ! "hit" + } + + val replies = receiveReplies(iterationCount) + + replies.values.foreach { _ must be > (0) } + replies.values.sum must be(iterationCount) + } + + enterBarrier("after-3") + } } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala index f5589b3f72..16c65986ee 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala @@ -8,6 +8,7 @@ import akka.routing._ import com.typesafe.config._ import akka.ConfigurationException +@SerialVersionUID(1L) case class RemoteScope(node: Address) extends Scope { def withFallback(other: Scope): Scope = this } diff --git a/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala b/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala index 01230605d2..183ba66beb 100644 --- a/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala +++ b/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala @@ -29,10 +29,11 @@ case class RemoteRouterConfig(local: RouterConfig, nodes: Iterable[Address]) ext def this(local: RouterConfig, nodes: java.lang.Iterable[Address]) = this(local, nodes.asScala) def this(local: RouterConfig, nodes: Array[Address]) = this(local, nodes: Iterable[Address]) - override def createRouteeProvider(context: ActorContext) = new RemoteRouteeProvider(nodes, context, resizer) + override def createRouteeProvider(context: ActorContext, routeeProps: Props) = + new RemoteRouteeProvider(nodes, context, routeeProps, resizer) - override def createRoute(routeeProps: Props, routeeProvider: RouteeProvider): Route = { - local.createRoute(routeeProps, routeeProvider) + override def createRoute(routeeProvider: RouteeProvider): Route = { + local.createRoute(routeeProvider) } override def createActor(): Router = local.createActor() @@ -55,8 +56,8 @@ case class RemoteRouterConfig(local: RouterConfig, nodes: Iterable[Address]) ext * * Routee paths may not be combined with remote target nodes. */ -class RemoteRouteeProvider(nodes: Iterable[Address], _context: ActorContext, _resizer: Option[Resizer]) - extends RouteeProvider(_context, _resizer) { +class RemoteRouteeProvider(nodes: Iterable[Address], _context: ActorContext, _routeeProps: Props, _resizer: Option[Resizer]) + extends RouteeProvider(_context, _routeeProps, _resizer) { if (nodes.isEmpty) throw new ConfigurationException("Must specify list of remote target.nodes for [%s]" format context.self.path.toString) @@ -68,12 +69,12 @@ class RemoteRouteeProvider(nodes: Iterable[Address], _context: ActorContext, _re throw new ConfigurationException("Remote target.nodes can not be combined with routees for [%s]" format context.self.path.toString) - override def createRoutees(props: Props, nrOfInstances: Int): Unit = { + override def createRoutees(nrOfInstances: Int): Unit = { val impl = context.system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559 val refs = IndexedSeq.empty[ActorRef] ++ (for (i ← 1 to nrOfInstances) yield { val name = "c" + i - val deploy = Deploy("", ConfigFactory.empty(), props.routerConfig, RemoteScope(nodeAddressIter.next)) - impl.provider.actorOf(impl, props, context.self.asInstanceOf[InternalActorRef], context.self.path / name, + val deploy = Deploy("", ConfigFactory.empty(), routeeProps.routerConfig, RemoteScope(nodeAddressIter.next)) + impl.provider.actorOf(impl, routeeProps, context.self.asInstanceOf[InternalActorRef], context.self.path / name, systemService = false, Some(deploy), lookupDeploy = false, async = false) }) registerRoutees(refs)