diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 5bd15fffde..1d18e7ed2e 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -64,9 +64,9 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo _routeeProvider = routerConfig.createRouteeProvider(this, routeeProps) val r = routerConfig.createRoute(routeeProvider) // initial resize, before message send - routerConfig.resizer foreach { r ⇒ - if (r.isTimeForResize(resizeCounter.getAndIncrement())) - r.resize(routeeProvider) + routerConfig.resizer foreach { resizer ⇒ + if (resizer.isTimeForResize(resizeCounter.getAndIncrement())) + resizer.resize(routeeProvider) } r } @@ -106,8 +106,7 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo * `Resizer.resize` */ private[akka] def removeRoutees(abandonedRoutees: Iterable[ActorRef]): Unit = { - _routees = _routees diff abandonedRoutees.toSeq - abandonedRoutees foreach unwatch + _routees = abandonedRoutees.foldLeft(_routees) { (xs, x) ⇒ unwatch(x); xs.filterNot(_ == x) } } override def tell(message: Any, sender: ActorRef): Unit = { @@ -157,11 +156,28 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo */ trait RouterConfig { + /** + * Implement the routing logic by returning a partial function of + * partial function from (sender, message) to a set of destinations. + * This `Route` will be applied for each incoming message. + * + * When `createRoute` is called the routees should also be registered, + * typically by using `createRoutees` or `registerRouteesFor` of the + * supplied `RouteeProvider`. + */ def createRoute(routeeProvider: RouteeProvider): Route + /** + * The `RouteeProvider` responsible for creating or + * looking up routees. It's used in `createRoute` to register routees, + * and also from [[akka.routing.Resizer]]. + */ def createRouteeProvider(context: ActorContext, routeeProps: Props): RouteeProvider = new RouteeProvider(context, routeeProps, resizer) + /** + * The router "head" actor. + */ def createActor(): Router = new Router { override def supervisorStrategy: SupervisorStrategy = RouterConfig.this.supervisorStrategy } @@ -243,10 +259,7 @@ class RouteeProvider(val context: ActorContext, val routeeProps: Props, val resi /** * Looks up routes with specified paths and registers them. */ - def registerRouteesFor(paths: Iterable[String]): Unit = { - val routees = paths.map(context.actorFor(_))(scala.collection.breakOut) - registerRoutees(routees) - } + def registerRouteesFor(paths: Iterable[String]): Unit = registerRoutees(paths.map(context.actorFor(_))) /** * Looks up routes with specified paths and registers them. @@ -260,10 +273,8 @@ class RouteeProvider(val context: ActorContext, val routeeProps: Props, val resi def createRoutees(nrOfInstances: Int): Unit = { if (nrOfInstances <= 0) throw new IllegalArgumentException( "Must specify nrOfInstances or routees for [%s]" format context.self.path.toString) - else { - val routees = (1 to nrOfInstances).map(_ ⇒ context.actorOf(routeeProps))(scala.collection.breakOut) - registerRoutees(routees) - } + else + registerRoutees(IndexedSeq.fill(nrOfInstances)(context.actorOf(routeeProps))) } /** @@ -277,7 +288,7 @@ class RouteeProvider(val context: ActorContext, val routeeProps: Props, val resi throw new IllegalArgumentException("Expected positive nrOfInstances, got [%s]".format(nrOfInstances)) } else if (nrOfInstances > 0) { val currentRoutees = routees - val (keep, abandon) = currentRoutees.splitAt(currentRoutees.length - nrOfInstances) + val abandon = currentRoutees.drop(currentRoutees.length - nrOfInstances) unregisterRoutees(abandon) delayedStop(context.system.scheduler, abandon, stopDelay) } @@ -293,8 +304,10 @@ class RouteeProvider(val context: ActorContext, val routeeProps: Props, val resi abandon foreach (_ ! PoisonPill) } else { import context.dispatcher + // Iterable could potentially be mutable + val localAbandon = abandon.toIndexedSeq scheduler.scheduleOnce(stopDelay) { - abandon foreach (_ ! PoisonPill) + localAbandon foreach (_ ! PoisonPill) } } } diff --git a/akka-actor/src/main/scala/akka/routing/package.scala b/akka-actor/src/main/scala/akka/routing/package.scala index c1672fc0e2..0b40793861 100644 --- a/akka-actor/src/main/scala/akka/routing/package.scala +++ b/akka-actor/src/main/scala/akka/routing/package.scala @@ -5,5 +5,9 @@ package akka package object routing { + /** + * Routing logic, partial function from (sender, message) to a + * set of destinations. + */ type Route = PartialFunction[(akka.actor.ActorRef, Any), Iterable[Destination]] } diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index bfe419ec4d..60e69c1984 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -128,11 +128,13 @@ akka { # Defines if routees are allowed to be located on the same node as # the head router actor, or only on remote nodes. # Useful for master-worker scenario where all routees are remote. - routees-on-own-node = on + allow-local-routees = on # Actor path of the routees to lookup with actorFor on the member # nodes in the cluster. E.g. "/user/myservice". If this isn't defined # the routees will be deployed instead of looked up. + # max-nr-of-instances-per-node should not be configured (default value is 1) + # when routees-path is defined. routees-path = "" } diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 496836f435..647f335b8c 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -61,13 +61,15 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { import ClusterEvent._ - if (!system.provider.isInstanceOf[ClusterActorRefProvider]) - throw new ConfigurationException("ActorSystem[" + system + "] needs to have a 'ClusterActorRefProvider' enabled in the configuration") - val settings = new ClusterSettings(system.settings.config, system.name) import settings._ - val selfAddress = system.provider.asInstanceOf[ClusterActorRefProvider].transport.address + val selfAddress = system.provider match { + case c: ClusterActorRefProvider ⇒ c.transport.address + case other ⇒ throw new ConfigurationException( + "ActorSystem [%s] needs to have a 'ClusterActorRefProvider' enabled in the configuration, currently uses [%s]". + format(system, other.getClass.getName)) + } private val _isRunning = new AtomicBoolean(true) private val log = Logging(system, "Cluster") diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala index 17e7cc0515..6f35dc1660 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala @@ -43,7 +43,7 @@ private[akka] class ClusterDeployer(_settings: ActorSystem.Settings, _pm: Dynami val clusterRouterSettings = ClusterRouterSettings( totalInstances = deploy.config.getInt("nr-of-instances"), maxInstancesPerNode = deploy.config.getInt("cluster.max-nr-of-instances-per-node"), - routeesOnOwnNode = deploy.config.getBoolean("cluster.routees-on-own-node"), + routeesOnOwnNode = deploy.config.getBoolean("cluster.allow-local-routees"), routeesPath = deploy.config.getString("cluster.routees-path")) Some(deploy.copy( diff --git a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala index 0f489b2738..e6fcc190f3 100644 --- a/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala +++ b/akka-cluster/src/main/scala/akka/cluster/routing/ClusterRouterConfig.scala @@ -39,7 +39,8 @@ import akka.actor.RelativeActorPath * which makes it possible to mix this with the built-in routers such as * [[akka.routing.RoundRobinRouter]] or custom routers. */ -case class ClusterRouterConfig(local: RouterConfig, settings: ClusterRouterSettings) extends RouterConfig { +@SerialVersionUID(1L) +final case class ClusterRouterConfig(local: RouterConfig, settings: ClusterRouterSettings) extends RouterConfig { override def createRouteeProvider(context: ActorContext, routeeProps: Props) = new ClusterRouteeProvider(context, routeeProps, resizer, settings) @@ -97,6 +98,12 @@ object ClusterRouterSettings { apply(totalInstances, routeesPath, routeesOnOwnNode = true) } +/** + * `totalInstances` of cluster router must be > 0 + * `maxInstancesPerNode` of cluster router must be > 0 + * `maxInstancesPerNode` of cluster router must be 1 when routeesPath is defined + */ +@SerialVersionUID(1L) case class ClusterRouterSettings private[akka] ( totalInstances: Int, maxInstancesPerNode: Int, @@ -128,7 +135,7 @@ case class ClusterRouterSettings private[akka] ( throw new IllegalArgumentException("routeesPath [%s] is not a valid relative actor path" format routeesPath) } - def isRouteesPathDefined: Boolean = ((routeesPath ne null) && routeesPath != "") + def isRouteesPathDefined: Boolean = (routeesPath ne null) && routeesPath != "" } @@ -161,8 +168,6 @@ private[akka] class ClusterRouteeProvider( * to use for cluster routers. */ override def createRoutees(nrOfInstances: Int): Unit = { - val impl = context.system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559 - for (i ← 1 to settings.totalInstances; target ← selectDeploymentTarget) { val ref = if (settings.isRouteesPathDefined) { @@ -181,12 +186,11 @@ private[akka] class ClusterRouteeProvider( private def selectDeploymentTarget: Option[Address] = { val currentRoutees = routees - val currentNodes = availbleNodes if (currentRoutees.size >= settings.totalInstances) { None } else { val numberOfRouteesPerNode: Map[Address, Int] = - Map.empty[Address, Int] ++ currentNodes.toSeq.map(_ -> 0) ++ + Map.empty[Address, Int] ++ availbleNodes.toSeq.map(_ -> 0) ++ currentRoutees.groupBy(fullAddress).map { case (address, refs) ⇒ address -> refs.size } @@ -244,8 +248,8 @@ private[akka] class ClusterRouterActor extends Router { // lazy to not interfere with RoutedActorCell initialization lazy val routeeProvider: ClusterRouteeProvider = ref.routeeProvider match { case x: ClusterRouteeProvider ⇒ x - case _ ⇒ - throw new IllegalStateException("ClusterRouteeProvider must be used together with [%s]".format(this.getClass)) + case _ ⇒ throw new IllegalStateException( + "ClusterRouteeProvider must be used together with [%s]".format(getClass)) } def cluster: Cluster = routeeProvider.cluster diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala index 0405b152a5..3e3d607679 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/routing/ClusterRoundRobinRoutedActorSpec.scala @@ -60,7 +60,7 @@ object ClusterRoundRobinRoutedActorMultiJvmSpec extends MultiNodeConfig { cluster { enabled = on max-nr-of-instances-per-node = 1 - routees-on-own-node = off + allow-local-routees = off } } /router4 { @@ -202,7 +202,7 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou enterBarrier("after-5") } - "deploy routees to only remote nodes when routees-on-own-node = off" taggedAs LongRunningTest in { + "deploy routees to only remote nodes when allow-local-routees = off" taggedAs LongRunningTest in { runOn(first) { val iterationCount = 10 diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala index 013f94b408..61c62ad69d 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala @@ -19,13 +19,13 @@ object ClusterDeployerSpec { nr-of-instances = 20 cluster.enabled = on cluster.max-nr-of-instances-per-node = 3 - cluster.routees-on-own-node = off + cluster.allow-local-routees = off } /user/service2 { router = round-robin nr-of-instances = 20 cluster.enabled = on - cluster.routees-on-own-node = off + cluster.allow-local-routees = off cluster.routees-path = "/user/myservice" } } diff --git a/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala b/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala index f800872657..7075aa5ea7 100644 --- a/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala +++ b/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala @@ -25,7 +25,8 @@ import akka.actor.ActorCell * which makes it possible to mix this with the built-in routers such as * [[akka.routing.RoundRobinRouter]] or custom routers. */ -case class RemoteRouterConfig(local: RouterConfig, nodes: Iterable[Address]) extends RouterConfig { +@SerialVersionUID(1L) +final case class RemoteRouterConfig(local: RouterConfig, nodes: Iterable[Address]) extends RouterConfig { def this(local: RouterConfig, nodes: java.lang.Iterable[Address]) = this(local, nodes.asScala) def this(local: RouterConfig, nodes: Array[Address]) = this(local, nodes: Iterable[Address]) @@ -59,7 +60,7 @@ case class RemoteRouterConfig(local: RouterConfig, nodes: Iterable[Address]) ext * * Routee paths may not be combined with remote target nodes. */ -class RemoteRouteeProvider(nodes: Iterable[Address], _context: ActorContext, _routeeProps: Props, _resizer: Option[Resizer]) +final class RemoteRouteeProvider(nodes: Iterable[Address], _context: ActorContext, _routeeProps: Props, _resizer: Option[Resizer]) extends RouteeProvider(_context, _routeeProps, _resizer) { if (nodes.isEmpty) throw new ConfigurationException("Must specify list of remote target.nodes for [%s]" @@ -75,7 +76,7 @@ class RemoteRouteeProvider(nodes: Iterable[Address], _context: ActorContext, _ro format context.self.path.toString) override def createRoutees(nrOfInstances: Int): Unit = { - val refs = IndexedSeq.empty[ActorRef] ++ (for (i ← 1 to nrOfInstances) yield { + val refs = IndexedSeq.fill(nrOfInstances) { val name = "c" + childNameCounter.incrementAndGet val deploy = Deploy("", ConfigFactory.empty(), routeeProps.routerConfig, RemoteScope(nodeAddressIter.next)) @@ -83,7 +84,7 @@ class RemoteRouteeProvider(nodes: Iterable[Address], _context: ActorContext, _ro // context and use RepointableActorRef instead of LocalActorRef. Seems like a slightly sub-optimal // choice in a corner case (and hence not worth fixing). context.asInstanceOf[ActorCell].attachChild(routeeProps.withDeploy(deploy), name, systemService = false) - }) + } registerRoutees(refs) } }