diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala index f2443d6edb..d8084c0f37 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeployerSpec.scala @@ -64,7 +64,7 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) { service, deployment.get.config, NoRouter, - LocalScope))) + NoScope))) } "use None deployment for undefined service" in { @@ -117,9 +117,9 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) { val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service) deployment must be('defined) deployment.get.path must be(service) - deployment.get.routing.getClass must be(expected.getClass) - deployment.get.routing.resizer must be(expected.resizer) - deployment.get.scope must be(LocalScope) + deployment.get.routerConfig.getClass must be(expected.getClass) + deployment.get.routerConfig.resizer must be(expected.resizer) + deployment.get.scope must be(NoScope) } } diff --git a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala index f2707e042c..cc6fd9b852 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ConfiguredLocalRoutingSpec.scala @@ -1,12 +1,19 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ package akka.routing -import akka.actor._ -import akka.routing._ import java.util.concurrent.atomic.AtomicInteger -import akka.testkit._ -import akka.util.duration._ + +import org.junit.runner.RunWith + +import akka.actor.actorRef2Scala +import akka.actor.{ Props, LocalActorRef, Deploy, Actor } +import akka.config.ConfigurationException import akka.dispatch.Await import akka.pattern.ask +import akka.testkit.{ TestLatch, ImplicitSender, DefaultTimeout, AkkaSpec } +import akka.util.duration.intToDurationInt object ConfiguredLocalRoutingSpec { val config = """ @@ -16,6 +23,12 @@ object ConfiguredLocalRoutingSpec { core-pool-size-min = 8 core-pool-size-max = 16 } + deployment { + /config { + router = random + nr-of-instances = 4 + } + } } } """ @@ -24,18 +37,52 @@ object ConfiguredLocalRoutingSpec { @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ConfiguredLocalRoutingSpec extends AkkaSpec(ConfiguredLocalRoutingSpec.config) with DefaultTimeout with ImplicitSender { - val deployer = system.asInstanceOf[ActorSystemImpl].provider.deployer - "RouterConfig" must { + "be picked up from Props" in { + val actor = system.actorOf(Props(new Actor { + def receive = { + case "get" ⇒ sender ! context.props + } + }).withRouter(RoundRobinRouter(12)), "someOther") + actor.asInstanceOf[LocalActorRef].underlying.props.routerConfig must be === RoundRobinRouter(12) + system.stop(actor) + } + "be overridable in config" in { - deployer.deploy(Deploy("/config", null, RandomRouter(4), LocalScope)) val actor = system.actorOf(Props(new Actor { def receive = { case "get" ⇒ sender ! context.props } }).withRouter(RoundRobinRouter(12)), "config") actor.asInstanceOf[LocalActorRef].underlying.props.routerConfig must be === RandomRouter(4) + system.stop(actor) + } + + "be overridable in explicit deployment" in { + val actor = system.actorOf(Props(new Actor { + def receive = { + case "get" ⇒ sender ! context.props + } + }).withRouter(FromConfig).withDeploy(Deploy(routerConfig = RoundRobinRouter(12))), "someOther") + actor.asInstanceOf[LocalActorRef].underlying.props.routerConfig must be === RoundRobinRouter(12) + system.stop(actor) + } + + "be overridable in config even with explicit deployment" in { + val actor = system.actorOf(Props(new Actor { + def receive = { + case "get" ⇒ sender ! context.props + } + }).withRouter(FromConfig).withDeploy(Deploy(routerConfig = RoundRobinRouter(12))), "config") + actor.asInstanceOf[LocalActorRef].underlying.props.routerConfig must be === RandomRouter(4) + system.stop(actor) + } + + "fail with an exception if not correct" in { + intercept[ConfigurationException] { + system.actorOf(Props.empty.withRouter(FromConfig)) + } } } diff --git a/akka-actor/src/main/java/com/typesafe/config/ConfigValue.java b/akka-actor/src/main/java/com/typesafe/config/ConfigValue.java index 1f389be08f..903b8c8c8f 100644 --- a/akka-actor/src/main/java/com/typesafe/config/ConfigValue.java +++ b/akka-actor/src/main/java/com/typesafe/config/ConfigValue.java @@ -18,7 +18,7 @@ package com.typesafe.config; * interface is likely to grow new methods over time, so third-party * implementations will break. */ -public interface ConfigValue extends ConfigMergeable { +public interface ConfigValue extends ConfigMergeable, java.io.Serializable { /** * The origin of the value (file, line number, etc.), for debugging and * error messages. diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/SimpleConfig.java b/akka-actor/src/main/java/com/typesafe/config/impl/SimpleConfig.java index a87b12316a..9655cba12b 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/SimpleConfig.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/SimpleConfig.java @@ -28,7 +28,7 @@ import com.typesafe.config.ConfigValueType; * with a one-level java.util.Map from paths to non-null values. Null values are * not "in" the map. */ -final class SimpleConfig implements Config, MergeableValue { +final class SimpleConfig implements Config, MergeableValue, java.io.Serializable { final private AbstractConfigObject object; diff --git a/akka-actor/src/main/java/com/typesafe/config/impl/SimpleConfigOrigin.java b/akka-actor/src/main/java/com/typesafe/config/impl/SimpleConfigOrigin.java index 4f8859f050..74d7bd999d 100644 --- a/akka-actor/src/main/java/com/typesafe/config/impl/SimpleConfigOrigin.java +++ b/akka-actor/src/main/java/com/typesafe/config/impl/SimpleConfigOrigin.java @@ -17,7 +17,7 @@ import com.typesafe.config.ConfigOrigin; // it would be cleaner to have a class hierarchy for various origin types, // but was hoping this would be enough simpler to be a little messy. eh. -final class SimpleConfigOrigin implements ConfigOrigin { +final class SimpleConfigOrigin implements ConfigOrigin, java.io.Serializable { final private String description; final private int lineNumber; final private int endLineNumber; diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 999c4286c2..65ffac2342 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -97,7 +97,8 @@ akka { paths = [] } - # Routers with dynamically resizable number of routees + # Routers with dynamically resizable number of routees; this feature is enabled + # by including (parts of) this section in the deployment resizer { # The fewest number of routees the router should ever have. diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 5aaf4ae8d5..783f50f82c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -225,7 +225,7 @@ private[akka] class ActorCell( } } } - val actor = provider.actorOf(systemImpl, props, self, self.path / name, false, None) + val actor = provider.actorOf(systemImpl, props, self, self.path / name, false, None, true) childrenRefs = childrenRefs.updated(name, ChildRestartStats(actor)) actor } diff --git a/akka-actor/src/main/scala/akka/actor/ActorPath.scala b/akka-actor/src/main/scala/akka/actor/ActorPath.scala index 5ba0ae4600..8997b119e8 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorPath.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorPath.scala @@ -17,6 +17,9 @@ object ActorPath { rec(s.length, Nil) } + /** + * Parse string as actor path; throws java.net.MalformedURLException if unable to do so. + */ def fromString(s: String): ActorPath = s match { case ActorPathExtractor(addr, elems) ⇒ RootActorPath(addr) / elems case _ ⇒ throw new MalformedURLException("cannot parse as ActorPath: " + s) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 56c3389072..532395c172 100755 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -10,6 +10,7 @@ import akka.routing._ import akka.AkkaException import akka.util.{ Switch, Helpers } import akka.event._ +import com.typesafe.config.ConfigFactory /** * Interface for all ActorRef providers to implement. @@ -97,7 +98,14 @@ trait ActorRefProvider { * in case of remote supervision). If systemService is true, deployment is * bypassed (local-only). */ - def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean, deploy: Option[Deploy]): InternalActorRef + def actorOf( + system: ActorSystemImpl, + props: Props, + supervisor: InternalActorRef, + path: ActorPath, + systemService: Boolean, + deploy: Option[Deploy], + lookupDeploy: Boolean): InternalActorRef /** * Create actor reference for a specified local or remote path. If no such @@ -454,10 +462,10 @@ class LocalActorRefProvider( } lazy val guardian: InternalActorRef = - actorOf(system, guardianProps, rootGuardian, rootPath / "user", true, None) + actorOf(system, guardianProps, rootGuardian, rootPath / "user", true, None, false) lazy val systemGuardian: InternalActorRef = - actorOf(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, rootPath / "system", true, None) + actorOf(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, rootPath / "system", true, None, false) lazy val tempContainer = new VirtualPathContainer(system.provider, tempNode, rootGuardian, log) @@ -510,15 +518,15 @@ class LocalActorRefProvider( case x ⇒ x } - def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean, deploy: Option[Deploy]): InternalActorRef = { + def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, + systemService: Boolean, deploy: Option[Deploy], lookupDeploy: Boolean): InternalActorRef = { props.routerConfig match { case NoRouter ⇒ new LocalActorRef(system, props, supervisor, path, systemService) // create a local actor case router ⇒ - val depl = deploy orElse { - val lookupPath = path.elements.drop(1).mkString("/", "/", "") - deployer.lookup(lookupPath) - } - new RoutedActorRef(system, props.withRouter(router.adaptFromDeploy(depl)), supervisor, path) + val lookup = if (lookupDeploy) deployer.lookup(path.elements.drop(1).mkString("/", "/", "")) else None + val fromProps = props.deploy.copy(routerConfig = props.deploy.routerConfig withFallback router) :: Nil + val d = lookup.toList ::: deploy.toList ::: fromProps reduceRight (_ withFallback _) + new RoutedActorRef(system, props.withRouter(d.routerConfig), supervisor, path) } } } diff --git a/akka-actor/src/main/scala/akka/actor/Address.scala b/akka-actor/src/main/scala/akka/actor/Address.scala index 956657dc7c..87c1af897b 100644 --- a/akka-actor/src/main/scala/akka/actor/Address.scala +++ b/akka-actor/src/main/scala/akka/actor/Address.scala @@ -4,6 +4,7 @@ package akka.actor import java.net.URI import java.net.URISyntaxException +import java.net.MalformedURLException /** * The address specifies the physical location under which an Actor can be @@ -56,6 +57,9 @@ object RelativeActorPath { } } +/** + * This object serves as extractor for Scala and as address parser for Java. + */ object AddressExtractor { def unapply(addr: String): Option[Address] = { try { @@ -71,6 +75,19 @@ object AddressExtractor { case _: URISyntaxException ⇒ None } } + + /** + * Try to construct an Address from the given String or throw a java.net.MalformedURLException. + */ + def apply(addr: String): Address = addr match { + case AddressExtractor(address) ⇒ address + case _ ⇒ throw new MalformedURLException + } + + /** + * Java API: Try to construct an Address from the given String or throw a java.net.MalformedURLException. + */ + def parse(addr: String): Address = apply(addr) } object ActorPathExtractor { diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 36d82b2cec..04cefda6db 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -10,13 +10,58 @@ import akka.routing._ import java.util.concurrent.{ TimeUnit, ConcurrentHashMap } import akka.util.ReflectiveAccess -case class Deploy(path: String, config: Config, routing: RouterConfig = NoRouter, scope: Scope = LocalScope) +/** + * This class represents deployment configuration for a given actor path. It is + * marked final in order to guarantee stable merge semantics (i.e. what + * overrides what in case multiple configuration sources are available) and is + * fully extensible via its Scope argument, and by the fact that an arbitrary + * Config section can be passed along with it (which will be merged when merging + * two Deploys). + * + * The path field is used only when inserting the Deploy into a deployer and + * not needed when just doing deploy-as-you-go: + * + * {{{ + * context.actorOf(someProps, "someName", Deploy(scope = RemoteScope("someOtherNodeName"))) + * }}} + */ +final case class Deploy( + path: String = "", + config: Config = ConfigFactory.empty, + routerConfig: RouterConfig = NoRouter, + scope: Scope = NoScope) { -case class ActorRecipe(implementationClass: Class[_ <: Actor]) //TODO Add ActorConfiguration here + def this(routing: RouterConfig) = this("", ConfigFactory.empty, routing) + def this(routing: RouterConfig, scope: Scope) = this("", ConfigFactory.empty, routing, scope) -trait Scope -case class LocalScope() extends Scope -case object LocalScope extends Scope + /** + * Do a merge between this and the other Deploy, where values from “this” take + * precedence. The “path” of the other Deploy is not taken into account. All + * other members are merged using ``.withFallback(other.)``. + */ + def withFallback(other: Deploy) = + Deploy(path, config.withFallback(other.config), routerConfig.withFallback(other.routerConfig), scope.withFallback(other.scope)) +} + +trait Scope { + def withFallback(other: Scope): Scope +} + +case object LocalScope extends Scope { + /** + * Java API + */ + def scope = this + + def withFallback(other: Scope): Scope = this +} + +/** + * This is the default value and as such allows overrides. + */ +case object NoScope extends Scope { + def withFallback(other: Scope): Scope = other +} /** * Deployer maps actor paths to actor deployments. @@ -76,7 +121,7 @@ class Deployer(val settings: ActorSystem.Settings, val classloader: ClassLoader) } } - Some(Deploy(key, deployment, router, LocalScope)) + Some(Deploy(key, deployment, router, NoScope)) } } diff --git a/akka-actor/src/main/scala/akka/actor/Props.scala b/akka-actor/src/main/scala/akka/actor/Props.scala index cd9a62abe7..70ba50f255 100644 --- a/akka-actor/src/main/scala/akka/actor/Props.scala +++ b/akka-actor/src/main/scala/akka/actor/Props.scala @@ -22,6 +22,8 @@ object Props { final val defaultRoutedProps: RouterConfig = NoRouter + final val defaultDeploy = Deploy() + final val noHotSwap: Stack[Actor.Receive] = Stack.empty final val empty = new Props(() ⇒ new Actor { def receive = Actor.emptyBehavior }) @@ -105,7 +107,8 @@ object Props { case class Props( creator: () ⇒ Actor = Props.defaultCreator, dispatcher: String = Dispatchers.DefaultDispatcherId, - routerConfig: RouterConfig = Props.defaultRoutedProps) { + routerConfig: RouterConfig = Props.defaultRoutedProps, + deploy: Deploy = Props.defaultDeploy) { /** * No-args constructor that sets all the default values. @@ -159,4 +162,9 @@ case class Props( * Returns a new Props with the specified router config set. */ def withRouter(r: RouterConfig) = copy(routerConfig = r) + + /** + * Returns a new Props with the specified deployment configuration. + */ + def withDeploy(d: Deploy) = copy(deploy = d) } diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 69c8e44fc3..7e3f965851 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -431,6 +431,7 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] ( interfaces: Seq[Class[_]], creator: () ⇒ T, dispatcher: String = TypedProps.defaultDispatcherId, + deploy: Deploy = Props.defaultDeploy, timeout: Option[Timeout] = TypedProps.defaultTimeout, loader: Option[ClassLoader] = TypedProps.defaultLoader) { @@ -469,12 +470,17 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] ( creator = () ⇒ implementation.newInstance()) /** - * Returns a new Props with the specified dispatcher set. + * Returns a new TypedProps with the specified dispatcher set. */ def withDispatcher(d: String) = copy(dispatcher = d) /** - * @return a new Props that will use the specified ClassLoader to create its proxy class in + * Returns a new TypedProps with the specified deployment configuration. + */ + def withDeploy(d: Deploy) = copy(deploy = d) + + /** + * @return a new TypedProps that will use the specified ClassLoader to create its proxy class in * If loader is null, it will use the bootstrap classloader. * * Java API @@ -482,7 +488,7 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] ( def withLoader(loader: ClassLoader): TypedProps[T] = withLoader(Option(loader)) /** - * @return a new Props that will use the specified ClassLoader to create its proxy class in + * @return a new TypedProps that will use the specified ClassLoader to create its proxy class in * If loader is null, it will use the bootstrap classloader. * * Scala API @@ -490,7 +496,7 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] ( def withLoader(loader: Option[ClassLoader]): TypedProps[T] = this.copy(loader = loader) /** - * @return a new Props that will use the specified Timeout for its non-void-returning methods, + * @return a new TypedProps that will use the specified Timeout for its non-void-returning methods, * if null is specified, it will use the default ActorTimeout as specified in the configuration. * * Java API @@ -498,7 +504,7 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] ( def withTimeout(timeout: Timeout): TypedProps[T] = this.copy(timeout = Option(timeout)) /** - * @return a new Props that will use the specified Timeout for its non-void-returning methods, + * @return a new TypedProps that will use the specified Timeout for its non-void-returning methods, * if None is specified, it will use the default ActorTimeout as specified in the configuration. * * Scala API @@ -506,7 +512,7 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] ( def withTimeout(timeout: Option[Timeout]): TypedProps[T] = this.copy(timeout = timeout) /** - * Returns a new Props that has the specified interface, + * Returns a new TypedProps that has the specified interface, * or if the interface class is not an interface, all the interfaces it implements, * appended in the sequence of interfaces. */ @@ -514,7 +520,7 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] ( this.copy(interfaces = interfaces ++ TypedProps.extractInterfaces(interface)) /** - * Returns a new Props without the specified interface, + * Returns a new TypedProps without the specified interface, * or if the interface class is not an interface, all the interfaces it implements. */ def withoutInterface(interface: Class[_ >: T]): TypedProps[T] = diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 4ef7cff330..39a60623d7 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -133,13 +133,10 @@ trait RouterConfig { def createActor(): Router = new Router {} - def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = { - deploy match { - case Some(Deploy(_, _, NoRouter, _)) ⇒ this - case Some(Deploy(_, _, r, _)) ⇒ r - case _ ⇒ this - } - } + /** + * Overridable merge strategy, by default completely prefers “this” (i.e. no merge). + */ + def withFallback(other: RouterConfig): RouterConfig = this protected def toAll(sender: ActorRef, routees: Iterable[ActorRef]): Iterable[Destination] = routees.map(Destination(sender, _)) @@ -291,11 +288,14 @@ case class RouterRoutees(routees: Iterable[ActorRef]) case class Destination(sender: ActorRef, recipient: ActorRef) /** - * Routing configuration that indicates no routing. - * Oxymoron style. + * Routing configuration that indicates no routing; this is also the default + * value which hence overrides the merge strategy in order to accept values + * from lower-precendence sources. The decision whether or not to create a + * router is taken in the LocalActorRefProvider based on Props. */ case object NoRouter extends RouterConfig { def createRoute(props: Props, routeeProvider: RouteeProvider): Route = null + override def withFallback(other: RouterConfig): RouterConfig = other } /** diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 6081372e6b..9df72fd1a9 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -112,8 +112,9 @@ class RemoteActorRefProvider( terminationFuture.onComplete(_ ⇒ transport.shutdown()) } - def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean, deploy: Option[Deploy]): InternalActorRef = { - if (systemService) local.actorOf(system, props, supervisor, path, systemService, deploy) + def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, + systemService: Boolean, deploy: Option[Deploy], lookupDeploy: Boolean): InternalActorRef = { + if (systemService) local.actorOf(system, props, supervisor, path, systemService, deploy, lookupDeploy) else { /* @@ -152,22 +153,33 @@ class RemoteActorRefProvider( } val elems = path.elements - val deployment = deploy orElse (elems.head match { - case "user" ⇒ deployer.lookup(elems.drop(1).mkString("/", "/", "")) - case "remote" ⇒ lookupRemotes(elems) - case _ ⇒ None - }) + val lookup = + if (lookupDeploy) + elems.head match { + case "user" ⇒ deployer.lookup(elems.drop(1).mkString("/", "/", "")) + case "remote" ⇒ lookupRemotes(elems) + case _ ⇒ None + } + else None - deployment match { - case Some(Deploy(_, _, _, RemoteScope(addr))) ⇒ - if (addr == rootPath.address) local.actorOf(system, props, supervisor, path, false, deployment) - else { + val deployment = { + lookup.toList ::: deploy.toList ::: Nil match { + case Nil ⇒ Nil + case l ⇒ List(l reduceRight (_ withFallback _)) + } + } + + deployment ::: props.deploy :: Nil reduceRight (_ withFallback _) match { + case d @ Deploy(_, _, _, RemoteScope(addr)) ⇒ + if (addr == rootPath.address || addr == transport.address) { + local.actorOf(system, props, supervisor, path, false, deployment.headOption, false) + } else { val rpath = RootActorPath(addr) / "remote" / transport.address.hostPort / path.elements - useActorOnNode(rpath, props.creator, supervisor) + useActorOnNode(rpath, props, d, supervisor) new RemoteActorRef(this, transport, rpath, supervisor) } - case _ ⇒ local.actorOf(system, props, supervisor, path, systemService, deployment) + case _ ⇒ local.actorOf(system, props, supervisor, path, systemService, deployment.headOption, false) } } } @@ -188,11 +200,11 @@ class RemoteActorRefProvider( /** * Using (checking out) actor on a specific node. */ - def useActorOnNode(path: ActorPath, actorFactory: () ⇒ Actor, supervisor: ActorRef) { + def useActorOnNode(path: ActorPath, props: Props, deploy: Deploy, supervisor: ActorRef) { log.debug("[{}] Instantiating Remote Actor [{}]", rootPath, path) // we don’t wait for the ACK, because the remote end will process this command before any other message to the new actor - actorFor(RootActorPath(path.address) / "remote") ! DaemonMsgCreate(actorFactory, path.toString, supervisor) + actorFor(RootActorPath(path.address) / "remote") ! DaemonMsgCreate(props, deploy, path.toString, supervisor) } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index bda71bcc00..d230c89534 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -6,11 +6,11 @@ package akka.remote import scala.annotation.tailrec -import akka.actor.{ VirtualPathContainer, Terminated, Props, Nobody, LocalActorRef, InternalActorRef, Address, ActorSystemImpl, ActorRef, ActorPathExtractor, ActorPath, Actor } +import akka.actor.{ VirtualPathContainer, Terminated, Deploy, Props, Nobody, LocalActorRef, InternalActorRef, Address, ActorSystemImpl, ActorRef, ActorPathExtractor, ActorPath, Actor } import akka.event.LoggingAdapter sealed trait DaemonMsg -case class DaemonMsgCreate(factory: () ⇒ Actor, path: String, supervisor: ActorRef) extends DaemonMsg +case class DaemonMsgCreate(props: Props, deploy: Deploy, path: String, supervisor: ActorRef) extends DaemonMsg case class DaemonMsgWatch(watcher: ActorRef, watched: ActorRef) extends DaemonMsg /** @@ -52,17 +52,15 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath case message: DaemonMsg ⇒ log.debug("Received command [{}] to RemoteSystemDaemon on [{}]", message, path.address) message match { - case DaemonMsgCreate(factory, path, supervisor) ⇒ + case DaemonMsgCreate(props, deploy, path, supervisor) ⇒ path match { case ActorPathExtractor(address, elems) if elems.nonEmpty && elems.head == "remote" ⇒ // TODO RK currently the extracted “address” is just ignored, is that okay? // TODO RK canonicalize path so as not to duplicate it always #1446 val subpath = elems.drop(1) val path = this.path / subpath - val actor = system.provider.actorOf(system, - Props(creator = factory), - supervisor.asInstanceOf[InternalActorRef], - path, true, None) + val actor = system.provider.actorOf(system, props, supervisor.asInstanceOf[InternalActorRef], + path, false, Some(deploy), true) addChild(subpath.mkString("/"), actor) system.deathWatch.subscribe(this, actor) case _ ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala index 799bba13e3..9686d295ad 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala @@ -8,7 +8,9 @@ import akka.routing._ import com.typesafe.config._ import akka.config.ConfigurationException -case class RemoteScope(node: Address) extends Scope +case class RemoteScope(node: Address) extends Scope { + def withFallback(other: Scope): Scope = this +} class RemoteDeployer(_settings: ActorSystem.Settings, _classloader: ClassLoader) extends Deployer(_settings, _classloader) { @@ -22,8 +24,8 @@ class RemoteDeployer(_settings: ActorSystem.Settings, _classloader: ClassLoader) case str ⇒ if (!str.isEmpty) throw new ConfigurationException("unparseable remote node name " + str) val nodes = deploy.config.getStringList("target.nodes").asScala - if (nodes.isEmpty || deploy.routing == NoRouter) d - else Some(deploy.copy(routing = new RemoteRouterConfig(deploy.routing, nodes))) + if (nodes.isEmpty || deploy.routerConfig == NoRouter) d + else Some(deploy.copy(routerConfig = RemoteRouterConfig(deploy.routerConfig, nodes))) } case None ⇒ None } diff --git a/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala b/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala index 0f35ca05cb..21f5c400b0 100644 --- a/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala +++ b/akka-remote/src/main/scala/akka/routing/RemoteRouterConfig.scala @@ -20,7 +20,7 @@ import akka.actor.AddressExtractor * which makes it possible to mix this with the built-in routers such as * [[akka.routing.RoundRobinRouter]] or custom routers. */ -class RemoteRouterConfig(local: RouterConfig, nodes: Iterable[String]) extends RouterConfig { +case class RemoteRouterConfig(local: RouterConfig, nodes: Iterable[String]) extends RouterConfig { override def createRouteeProvider(context: ActorContext) = new RemoteRouteeProvider(nodes, context, resizer) @@ -32,6 +32,10 @@ class RemoteRouterConfig(local: RouterConfig, nodes: Iterable[String]) extends R override def resizer: Option[Resizer] = local.resizer + override def withFallback(other: RouterConfig): RouterConfig = other match { + case RemoteRouterConfig(local, nodes) ⇒ copy(local = this.local.withFallback(local)) + case _ ⇒ this + } } /** @@ -62,7 +66,7 @@ class RemoteRouteeProvider(nodes: Iterable[String], _context: ActorContext, _res IndexedSeq.empty[ActorRef] ++ (for (i ← 1 to nrOfInstances) yield { val name = "c" + i val deploy = Deploy("", ConfigFactory.empty(), props.routerConfig, RemoteScope(nodeAddressIter.next)) - impl.provider.actorOf(impl, props, context.self.asInstanceOf[InternalActorRef], context.self.path / name, false, Some(deploy)) + impl.provider.actorOf(impl, props, context.self.asInstanceOf[InternalActorRef], context.self.path / name, false, Some(deploy), false) }) case (_, xs, _) ⇒ throw new ConfigurationException("Remote target.nodes can not be combined with routees for [%s]" diff --git a/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala index 26c28860f7..bcabd85098 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteRouterSpec.scala @@ -38,13 +38,35 @@ akka { } target.nodes = ["akka://remote_sys@localhost:12347"] } + /remote-blub { + remote = "akka://remote_sys@localhost:12347" + router = round-robin + nr-of-instances = 2 + } + /local-blub { + remote = "akka://RemoteRouterSpec" + router = round-robin + nr-of-instances = 2 + target.nodes = ["akka://remote_sys@localhost:12347"] + } + /local-blub2 { + router = round-robin + nr-of-instances = 4 + target.nodes = ["akka://remote_sys@localhost:12347"] + } } } """) with ImplicitSender { import RemoteRouterSpec._ - val conf = ConfigFactory.parseString("akka.remote.netty.port=12347").withFallback(system.settings.config) + val conf = ConfigFactory.parseString("""akka.remote.netty.port=12347 +akka.actor.deployment { + /remote-override { + router = round-robin + nr-of-instances = 4 + } +}""").withFallback(system.settings.config) val other = ActorSystem("remote_sys", conf) override def atTermination() { @@ -55,27 +77,126 @@ akka { "deploy its children on remote host driven by configuration" in { val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2)), "blub") - router ! "" - expectMsgType[ActorRef].path.address.toString must be === "akka://remote_sys@localhost:12347" - router ! "" - expectMsgType[ActorRef].path.address.toString must be === "akka://remote_sys@localhost:12347" + val replies = for (i ← 1 to 5) yield { + router ! "" + expectMsgType[ActorRef].path + } + val children = replies.toSet + children must have size 2 + children.map(_.parent) must have size 1 + children foreach (_.address.toString must be === "akka://remote_sys@localhost:12347") + system.stop(router) } "deploy its children on remote host driven by programatic definition" in { val router = system.actorOf(Props[Echo].withRouter(new RemoteRouterConfig(RoundRobinRouter(2), Seq("akka://remote_sys@localhost:12347"))), "blub2") - router ! "" - expectMsgType[ActorRef].path.address.toString must be === "akka://remote_sys@localhost:12347" - router ! "" - expectMsgType[ActorRef].path.address.toString must be === "akka://remote_sys@localhost:12347" + val replies = for (i ← 1 to 5) yield { + router ! "" + expectMsgType[ActorRef].path + } + val children = replies.toSet + children must have size 2 + children.map(_.parent) must have size 1 + children foreach (_.address.toString must be === "akka://remote_sys@localhost:12347") + system.stop(router) } "deploy dynamic resizable number of children on remote host driven by configuration" in { val router = system.actorOf(Props[Echo].withRouter(FromConfig), "elastic-blub") - router ! "" - expectMsgType[ActorRef].path.address.toString must be === "akka://remote_sys@localhost:12347" - router ! "" - expectMsgType[ActorRef].path.address.toString must be === "akka://remote_sys@localhost:12347" + val replies = for (i ← 1 to 5000) yield { + router ! "" + expectMsgType[ActorRef].path + } + val children = replies.toSet + children.size must be >= 2 + children.map(_.parent) must have size 1 + children foreach (_.address.toString must be === "akka://remote_sys@localhost:12347") + system.stop(router) + } + + "deploy remote routers based on configuration" in { + val router = system.actorOf(Props[Echo].withRouter(FromConfig), "remote-blub") + router.path.address.toString must be("akka://remote_sys@localhost:12347") + val replies = for (i ← 1 to 5) yield { + router ! "" + expectMsgType[ActorRef].path + } + val children = replies.toSet + children must have size 2 + val parents = children.map(_.parent) + parents must have size 1 + parents.head must be(router.path) + children foreach (_.address.toString must be === "akka://remote_sys@localhost:12347") + system.stop(router) + } + + "deploy remote routers based on explicit deployment" in { + val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2)) + .withDeploy(Deploy(scope = RemoteScope(AddressExtractor("akka://remote_sys@localhost:12347")))), "remote-blub2") + router.path.address.toString must be("akka://remote_sys@localhost:12347") + val replies = for (i ← 1 to 5) yield { + router ! "" + expectMsgType[ActorRef].path + } + val children = replies.toSet + children must have size 2 + val parents = children.map(_.parent) + parents must have size 1 + parents.head must be(router.path) + children foreach (_.address.toString must be === "akka://remote_sys@localhost:12347") + system.stop(router) + } + + "let remote deployment be overridden by local configuration" in { + val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2)) + .withDeploy(Deploy(scope = RemoteScope(AddressExtractor("akka://remote_sys@localhost:12347")))), "local-blub") + router.path.address.toString must be("akka://RemoteRouterSpec") + val replies = for (i ← 1 to 5) yield { + router ! "" + expectMsgType[ActorRef].path + } + val children = replies.toSet + children must have size 2 + val parents = children.map(_.parent) + parents must have size 1 + parents.head.address must be(Address("akka", "remote_sys", Some("localhost"), Some(12347))) + children foreach (_.address.toString must be === "akka://remote_sys@localhost:12347") + system.stop(router) + } + + "let remote deployment router be overridden by local configuration" in { + val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2)) + .withDeploy(Deploy(scope = RemoteScope(AddressExtractor("akka://remote_sys@localhost:12347")))), "local-blub2") + router.path.address.toString must be("akka://remote_sys@localhost:12347") + val replies = for (i ← 1 to 5) yield { + router ! "" + expectMsgType[ActorRef].path + } + val children = replies.toSet + children must have size 4 + val parents = children.map(_.parent) + parents must have size 1 + parents.head must be(router.path) + children foreach (_.address.toString must be === "akka://remote_sys@localhost:12347") + system.stop(router) + } + + "let remote deployment be overridden by remote configuration" in { + val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2)) + .withDeploy(Deploy(scope = RemoteScope(AddressExtractor("akka://remote_sys@localhost:12347")))), "remote-override") + router.path.address.toString must be("akka://remote_sys@localhost:12347") + val replies = for (i ← 1 to 5) yield { + router ! "" + expectMsgType[ActorRef].path + } + val children = replies.toSet + children must have size 4 + val parents = children.map(_.parent) + parents must have size 1 + parents.head must be(router.path) + children foreach (_.address.toString must be === "akka://remote_sys@localhost:12347") + system.stop(router) } }