diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala index 7af2bfd185..2103820895 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala @@ -150,10 +150,16 @@ class ClusterRouteeProvider( case other: MemberEvent ⇒ // other events means that it is no longer interesting, such as // MemberJoined, MemberLeft, MemberExited, MemberUnreachable, MemberRemoved - upNodes -= other.member.address + val address = other.member.address + upNodes -= address + + // unregister routees that live on that node + val affectedRoutes = routees.filter(fullAddress(_) == address) + unregisterRoutees(affectedRoutes) // createRoutees will not create more than createRoutees and maxInstancesPerNode - createRoutees(totalInstances) // Here we + // this is useful when totalInstances < upNodes.size + createRoutees(totalInstances) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala index 3173984f68..cac3cd67fe 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala @@ -22,6 +22,8 @@ import akka.cluster.MultiNodeClusterSpec import com.typesafe.config.ConfigFactory import akka.cluster.Cluster import akka.actor.Deploy +import akka.routing.CurrentRoutees +import akka.routing.RouterRoutees object ClusterRoundRobinRoutedActorMultiJvmSpec extends MultiNodeConfig { @@ -39,11 +41,11 @@ object ClusterRoundRobinRoutedActorMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false). withFallback(ConfigFactory.parseString(""" akka.actor.deployment { - /service-hello { + /router1 { router = round-robin nr-of-instances = 10 - #cluster.enabled = on - #cluster.max-nr-of-instances-per-node = 2 + cluster.enabled = on + cluster.max-nr-of-instances-per-node = 2 } } """)). @@ -61,25 +63,30 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou with ImplicitSender with DefaultTimeout { import ClusterRoundRobinRoutedActorMultiJvmSpec._ - // lazy val actor = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "service-hello") - lazy val router = { + lazy val router1 = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "router1") + lazy val router2 = { import akka.cluster.routing.ClusterRouterProps system.actorOf(Props[SomeActor].withClusterRouter(RoundRobinRouter(), - totalInstances = 10, maxInstancesPerNode = 2), "service-foo") + totalInstances = 3, maxInstancesPerNode = 1), "router2") } def receiveReplies(expectedReplies: Int): Map[Address, Int] = { val zero = Map.empty[Address, Int] ++ roles.map(address(_) -> 0) (receiveWhile(5 seconds, messages = expectedReplies) { - case ref: ActorRef ⇒ ref.path.address match { - case Address(_, _, None, None) ⇒ cluster.selfAddress - case a ⇒ a - } + case ref: ActorRef ⇒ fullAddress(ref) }).foldLeft(zero) { case (replyMap, address) ⇒ replyMap + (address -> (replyMap(address) + 1)) } } + /** + * Fills in self address for local ActorRef + */ + private def fullAddress(actorRef: ActorRef): Address = actorRef.path.address match { + case Address(_, _, None, None) ⇒ cluster.selfAddress + case a ⇒ a + } + "A cluster router configured with a RoundRobin router" must { "start cluster with 2 nodes" taggedAs LongRunningTest in { awaitClusterUp(first, second) @@ -89,11 +96,11 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou "deploy routees to the member nodes in the cluster" taggedAs LongRunningTest in { runOn(first) { - router.isInstanceOf[RoutedActorRef] must be(true) + router1.isInstanceOf[RoutedActorRef] must be(true) val iterationCount = 10 for (i ← 0 until iterationCount) { - router ! "hit" + router1 ! "hit" } val replies = receiveReplies(iterationCount) @@ -116,7 +123,7 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou runOn(first) { val iterationCount = 10 for (i ← 0 until iterationCount) { - router ! "hit" + router1 ! "hit" } val replies = receiveReplies(iterationCount) @@ -128,4 +135,57 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou enterBarrier("after-3") } } + + "A programatically defined RoundRobin cluster router" must { + "deploy routees to the member nodes in the cluster" taggedAs LongRunningTest in { + + runOn(first) { + router2.isInstanceOf[RoutedActorRef] must be(true) + + val iterationCount = 10 + for (i ← 0 until iterationCount) { + router2 ! "hit" + } + + val replies = receiveReplies(iterationCount) + + // note that router2 has totalInstances = 3, maxInstancesPerNode = 1 + val currentRoutees = Await.result(router2 ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees + val routeeAddresses = currentRoutees map fullAddress + + routeeAddresses.size must be(3) + replies.values.sum must be(iterationCount) + } + + enterBarrier("after-4") + } + + "deploy to other node when a node is down" taggedAs LongRunningTest in { + + runOn(first) { + def currentRoutees = Await.result(router2 ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees + def routeeAddresses = (currentRoutees map fullAddress).toSet + + val notUsedAddress = ((roles map address).toSet -- routeeAddresses).head + + val downAddress = routeeAddresses.find(_ != address(first)).get + cluster.down(downAddress) + awaitCond { + routeeAddresses.contains(notUsedAddress) && !routeeAddresses.contains(downAddress) + } + + val iterationCount = 10 + for (i ← 0 until iterationCount) { + router2 ! "hit" + } + + val replies = receiveReplies(iterationCount) + + routeeAddresses.size must be(3) + replies.values.sum must be(iterationCount) + } + + enterBarrier("after-5") + } + } }