diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index b2a7849f0e..7094397f00 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -126,5 +126,10 @@ akka { # Useful for master-worker scenario where all routees are remote. deploy-on-own-node = on + # Actor path of the routees to lookup with actorFor on the member + # nodes in the cluster. E.g. "/user/myservice". If this isn't defined + # the routees will be deployed instead of looked up. + routees-path = "" + } } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala index b81cdd8812..e2824daffc 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala @@ -43,7 +43,8 @@ private[akka] class ClusterDeployer(_settings: ActorSystem.Settings, _pm: Dynami val clusterRouterSettings = ClusterRouterSettings( totalInstances = deploy.config.getInt("nr-of-instances"), maxInstancesPerNode = deploy.config.getInt("cluster.max-nr-of-instances-per-node"), - deployOnOwnNode = deploy.config.getBoolean("cluster.deploy-on-own-node")) + deployOnOwnNode = deploy.config.getBoolean("cluster.deploy-on-own-node"), + routeesPath = deploy.config.getString("cluster.routees-path")) Some(deploy.copy( routerConfig = ClusterRouterConfig(deploy.routerConfig, clusterRouterSettings), scope = ClusterScope)) diff --git a/akka-cluster/src/main/scala/akka/cluster/package.scala b/akka-cluster/src/main/scala/akka/cluster/package.scala index bdd818603e..ece0d5d8ce 100644 --- a/akka-cluster/src/main/scala/akka/cluster/package.scala +++ b/akka-cluster/src/main/scala/akka/cluster/package.scala @@ -33,10 +33,10 @@ package object routing { */ def withClusterRouter(router: RouterConfig, totalInstances: Int, maxInstancesPerNode: Int, - deployOnOwnNode: Boolean = true): Props = { + deployOnOwnNode: Boolean = true, routeesPath: String = ""): Props = { props.withRouter(router).withDeploy( Deploy(routerConfig = ClusterRouterConfig(router, - ClusterRouterSettings(totalInstances, maxInstancesPerNode, deployOnOwnNode)))) + ClusterRouterSettings(totalInstances, maxInstancesPerNode, deployOnOwnNode, routeesPath)))) } } diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala index befe48861e..74b890ce7a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala @@ -29,6 +29,7 @@ import akka.routing.RouteeProvider import akka.routing.Router import akka.routing.RouterConfig import akka.routing.RemoteRouterConfig +import akka.actor.RootActorPath /** * [[akka.routing.RouterConfig]] implementation for deployment on cluster nodes. @@ -68,7 +69,14 @@ case class ClusterRouterConfig(local: RouterConfig, settings: ClusterRouterSetti } } -case class ClusterRouterSettings(totalInstances: Int, maxInstancesPerNode: Int, deployOnOwnNode: Boolean) +case class ClusterRouterSettings( + totalInstances: Int, + maxInstancesPerNode: Int, + deployOnOwnNode: Boolean, + routeesPath: String) { + + def isRouteesPathDefined: Boolean = ((routeesPath ne null) && routeesPath != "") +} /** * INTERNAL API @@ -102,10 +110,15 @@ private[akka] class ClusterRouteeProvider( val impl = context.system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559 for (i ← 1 to settings.totalInstances; target ← selectDeploymentTarget) { - val name = "c" + childNameCounter.incrementAndGet - val deploy = Deploy("", ConfigFactory.empty(), routeeProps.routerConfig, RemoteScope(target)) - var ref = impl.provider.actorOf(impl, routeeProps, context.self.asInstanceOf[InternalActorRef], context.self.path / name, - systemService = false, Some(deploy), lookupDeploy = false, async = false) + val ref = + if (settings.isRouteesPathDefined) { + context.actorFor(target.toString + settings.routeesPath) + } else { + val name = "c" + childNameCounter.incrementAndGet + val deploy = Deploy("", ConfigFactory.empty(), routeeProps.routerConfig, RemoteScope(target)) + impl.provider.actorOf(impl, routeeProps, context.self.asInstanceOf[InternalActorRef], context.self.path / name, + systemService = false, Some(deploy), lookupDeploy = false, async = false) + } // must register each one, since registered routees are used in selectDeploymentTarget registerRoutees(Some(ref)) } @@ -217,7 +230,13 @@ private[akka] class ClusterRouterActor extends Router { */ object ClusterRouterPropsDecorator { def decorate(props: Props, router: RouterConfig, totalInstances: Int, maxInstancesPerNode: Int, - deployOnOwnNode: Boolean): Props = + deployOnOwnNode: Boolean): Props = { props.withClusterRouter(router, totalInstances, maxInstancesPerNode, deployOnOwnNode) + } + + def decorate(props: Props, router: RouterConfig, totalInstances: Int, maxInstancesPerNode: Int, + deployOnOwnNode: Boolean, routeesPath: String): Props = { + props.withClusterRouter(router, totalInstances, maxInstancesPerNode, deployOnOwnNode, routeesPath) + } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala index 9bbd778410..7ae98bf261 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala @@ -23,12 +23,20 @@ import akka.testkit._ object ClusterRoundRobinRoutedActorMultiJvmSpec extends MultiNodeConfig { - class SomeActor extends Actor { + class SomeActor(routeeType: RouteeType) extends Actor { + def this() = this(DeployRoutee) + def receive = { - case "hit" ⇒ sender ! self + case "hit" ⇒ sender ! Reply(routeeType, self) } } + case class Reply(routeeType: RouteeType, ref: ActorRef) + + sealed trait RouteeType extends Serializable + object DeployRoutee extends RouteeType + object LookupRoutee extends RouteeType + val first = role("first") val second = role("second") val third = role("third") @@ -54,6 +62,15 @@ object ClusterRoundRobinRoutedActorMultiJvmSpec extends MultiNodeConfig { deploy-on-own-node = off } } + /router4 { + router = round-robin + nr-of-instances = 10 + cluster { + enabled = on + max-nr-of-instances-per-node = 2 + routees-path = "/user/myservice" + } + } } """)). withFallback(MultiNodeClusterSpec.clusterConfig)) @@ -77,11 +94,12 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou totalInstances = 3, maxInstancesPerNode = 1, deployOnOwnNode = true), "router2") } lazy val router3 = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "router3") + lazy val router4 = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "router4") - def receiveReplies(expectedReplies: Int): Map[Address, Int] = { + def receiveReplies(routeeType: RouteeType, expectedReplies: Int): Map[Address, Int] = { val zero = Map.empty[Address, Int] ++ roles.map(address(_) -> 0) (receiveWhile(5 seconds, messages = expectedReplies) { - case ref: ActorRef ⇒ fullAddress(ref) + case Reply(`routeeType`, ref) ⇒ fullAddress(ref) }).foldLeft(zero) { case (replyMap, address) ⇒ replyMap + (address -> (replyMap(address) + 1)) } @@ -111,7 +129,7 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou router1 ! "hit" } - val replies = receiveReplies(iterationCount) + val replies = receiveReplies(DeployRoutee, iterationCount) replies(first) must be > (0) replies(second) must be > (0) @@ -123,6 +141,31 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou enterBarrier("after-2") } + "lookup routees on the member nodes in the cluster" taggedAs LongRunningTest in { + + // cluster consists of first and second + + system.actorOf(Props(new SomeActor(LookupRoutee)), "myservice") + enterBarrier("myservice-started") + + runOn(first) { + val iterationCount = 10 + for (i ← 0 until iterationCount) { + router4 ! "hit" + } + + val replies = receiveReplies(LookupRoutee, iterationCount) + + replies(first) must be > (0) + replies(second) must be > (0) + replies(third) must be(0) + replies(fourth) must be(0) + replies.values.sum must be(iterationCount) + } + + enterBarrier("after-3") + } + "deploy routees to new nodes in the cluster" taggedAs LongRunningTest in { // add third and fourth @@ -134,13 +177,32 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou router1 ! "hit" } - val replies = receiveReplies(iterationCount) + val replies = receiveReplies(DeployRoutee, iterationCount) replies.values.foreach { _ must be > (0) } replies.values.sum must be(iterationCount) } - enterBarrier("after-3") + enterBarrier("after-4") + } + + "lookup routees on new nodes in the cluster" taggedAs LongRunningTest in { + + // cluster consists of first, second, third and fourth + + runOn(first) { + val iterationCount = 10 + for (i ← 0 until iterationCount) { + router4 ! "hit" + } + + val replies = receiveReplies(LookupRoutee, iterationCount) + + replies.values.foreach { _ must be > (0) } + replies.values.sum must be(iterationCount) + } + + enterBarrier("after-5") } "deploy routees to only remote nodes when deploy-on-own-node = off" taggedAs LongRunningTest in { @@ -151,7 +213,7 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou router3 ! "hit" } - val replies = receiveReplies(iterationCount) + val replies = receiveReplies(DeployRoutee, iterationCount) replies(first) must be(0) replies(second) must be > (0) @@ -160,7 +222,7 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou replies.values.sum must be(iterationCount) } - enterBarrier("after-4") + enterBarrier("after-6") } "deploy programatically defined routees to the member nodes in the cluster" taggedAs LongRunningTest in { @@ -173,7 +235,7 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou router2 ! "hit" } - val replies = receiveReplies(iterationCount) + val replies = receiveReplies(DeployRoutee, iterationCount) // note that router2 has totalInstances = 3, maxInstancesPerNode = 1 val currentRoutees = Await.result(router2 ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees @@ -183,7 +245,7 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou replies.values.sum must be(iterationCount) } - enterBarrier("after-5") + enterBarrier("after-7") } "deploy programatically defined routees to other node when a node becomes down" taggedAs LongRunningTest in { @@ -205,13 +267,13 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou router2 ! "hit" } - val replies = receiveReplies(iterationCount) + val replies = receiveReplies(DeployRoutee, iterationCount) routeeAddresses.size must be(3) replies.values.sum must be(iterationCount) } - enterBarrier("after-6") + enterBarrier("after-8") } } } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala index 5c2b233c7d..251f58ab90 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala @@ -20,6 +20,7 @@ object ClusterDeployerSpec { cluster.enabled = on cluster.max-nr-of-instances-per-node = 3 cluster.deploy-on-own-node = off + cluster.routees-path = "/user/myservice" } } akka.remote.netty.port = 0 @@ -45,7 +46,7 @@ class ClusterDeployerSpec extends AkkaSpec(ClusterDeployerSpec.deployerConf) { Deploy( service, deployment.get.config, - ClusterRouterConfig(RoundRobinRouter(20), ClusterRouterSettings(20, 3, false)), + ClusterRouterConfig(RoundRobinRouter(20), ClusterRouterSettings(20, 3, false, "/user/myservice")), ClusterScope))) }