diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala index 0d56418b18..aa5151c282 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala @@ -25,7 +25,7 @@ class ClusterActorRefProvider( _dynamicAccess: DynamicAccess) extends RemoteActorRefProvider( _systemName, _settings, _eventStream, _scheduler, _dynamicAccess) { - override val deployer: RemoteDeployer = new ClusterDeployer(settings, dynamicAccess) + override val deployer: ClusterDeployer = new ClusterDeployer(settings, dynamicAccess) } diff --git a/akka-cluster/src/main/scala/akka/cluster/package.scala b/akka-cluster/src/main/scala/akka/cluster/package.scala new file mode 100644 index 0000000000..2db1790362 --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/package.scala @@ -0,0 +1,41 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import language.implicitConversions +import akka.actor.Props +import akka.routing.RouterConfig +import akka.actor.Deploy + +package object routing { + + /** + * Sugar to define cluster aware router programatically. + * Usage Scala API: + * [[[ + * import akka.cluster.routing.ClusterRouterProps + * context.actorOf(Props[SomeActor].withClusterRouter(RoundRobinRouter(), + * totalInstances = 10, maxInstancesPerNode = 2), "myrouter") + * ]]] + * + * Corresponding for Java API is found in [[akka.cluster.routing.ClusterRouterPropsDecorator]]. + */ + implicit class ClusterRouterProps(val props: Props) extends AnyVal { + + /* + * Without this helper it would look as ugly as: + * val router = RoundRobinRouter(nrOfInstances = 10) + * val actor = system.actorOf(Props[SomeActor].withRouter(router).withDeploy( + * Deploy(routerConfig = ClusterRouterConfig(router, totalInstances = router.nrOfInstances, maxInstancesPerNode = 2))), + * "myrouter") + */ + + def withClusterRouter(router: RouterConfig, totalInstances: Int, maxInstancesPerNode: Int): Props = { + props.withRouter(router).withDeploy( + Deploy(routerConfig = ClusterRouterConfig(router, totalInstances, maxInstancesPerNode))) + } + } + +} diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala index 945497677c..7af2bfd185 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala @@ -28,6 +28,7 @@ import akka.routing.Router import akka.routing.RouterConfig import java.lang.IllegalStateException import akka.cluster.ClusterScope +import akka.routing.RoundRobinRouter /** * [[akka.routing.RouterConfig]] implementation for deployment on cluster nodes. @@ -75,10 +76,18 @@ class ClusterRouteeProvider( throw new ConfigurationException("Cluster deployment can not be combined with routees for [%s]" format context.self.path.toString) + /** + * Note that nrOfInstances is ignored for cluster routers, instead + * the `totalInstances` parameter is used. That is the same when + * using config to define `nr-of-instances`, but when defining the + * router programatically or using [[akka.routing.Resizer]] they + * might be different. `totalInstances` is the relevant parameter + * to use for cluster routers. + */ override def createRoutees(nrOfInstances: Int): Unit = { val impl = context.system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559 - for (i ← 1 to nrOfInstances; target ← selectDeploymentTarget) { + for (i ← 1 to totalInstances; target ← selectDeploymentTarget) { val name = "c" + childNameCounter.incrementAndGet val deploy = Deploy("", ConfigFactory.empty(), routeeProps.routerConfig, RemoteScope(target)) var ref = impl.provider.actorOf(impl, routeeProps, context.self.asInstanceOf[InternalActorRef], context.self.path / name, @@ -152,3 +161,19 @@ class ClusterRouteeProvider( } +/** + * Sugar to define cluster aware router programatically. + * Usage Java API: + * [[[ + * context.actorOf(ClusterRouterPropsDecorator.decorate(new Props(MyActor.class), + * new RoundRobinRouter(0), 10, 2), "myrouter"); + * ]]] + * + * Corresponding for Scala API is found in [[akka.cluster.routing.ClusterRouterProps]]. + * + */ +object ClusterRouterPropsDecorator { + def decorate(props: Props, router: RouterConfig, totalInstances: Int, maxInstancesPerNode: Int): Props = + props.withClusterRouter(router, totalInstances, maxInstancesPerNode) +} + diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala index 5cfd9aabf4..3173984f68 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala @@ -21,6 +21,7 @@ import scala.concurrent.util.duration._ import akka.cluster.MultiNodeClusterSpec import com.typesafe.config.ConfigFactory import akka.cluster.Cluster +import akka.actor.Deploy object ClusterRoundRobinRoutedActorMultiJvmSpec extends MultiNodeConfig { @@ -41,8 +42,8 @@ object ClusterRoundRobinRoutedActorMultiJvmSpec extends MultiNodeConfig { /service-hello { router = round-robin nr-of-instances = 10 - cluster.enabled = on - cluster.max-nr-of-instances-per-node = 2 + #cluster.enabled = on + #cluster.max-nr-of-instances-per-node = 2 } } """)). @@ -60,7 +61,12 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou with ImplicitSender with DefaultTimeout { import ClusterRoundRobinRoutedActorMultiJvmSpec._ - lazy val actor = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "service-hello") + // lazy val actor = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "service-hello") + lazy val router = { + import akka.cluster.routing.ClusterRouterProps + system.actorOf(Props[SomeActor].withClusterRouter(RoundRobinRouter(), + totalInstances = 10, maxInstancesPerNode = 2), "service-foo") + } def receiveReplies(expectedReplies: Int): Map[Address, Int] = { val zero = Map.empty[Address, Int] ++ roles.map(address(_) -> 0) @@ -83,11 +89,11 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou "deploy routees to the member nodes in the cluster" taggedAs LongRunningTest in { runOn(first) { - actor.isInstanceOf[RoutedActorRef] must be(true) + router.isInstanceOf[RoutedActorRef] must be(true) val iterationCount = 10 for (i ← 0 until iterationCount) { - actor ! "hit" + router ! "hit" } val replies = receiveReplies(iterationCount) @@ -110,7 +116,7 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou runOn(first) { val iterationCount = 10 for (i ← 0 until iterationCount) { - actor ! "hit" + router ! "hit" } val replies = receiveReplies(iterationCount)