make it possible to programmatically deploy (e.g. remotely), see #1644
- add Deploy to Props, which is used as the basis (overridden by configuration) - utilize general mechanism .withFallback (introduced on Deploy, RouterConfig and Scope) - actually pass Props over the wire when deploying remotely in order to retain settings (this was an oversight before) - write tests for the new functionality
This commit is contained in:
parent
9d388f2de6
commit
10974acfe8
19 changed files with 360 additions and 88 deletions
|
|
@ -64,7 +64,7 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
|
|||
service,
|
||||
deployment.get.config,
|
||||
NoRouter,
|
||||
LocalScope)))
|
||||
NoScope)))
|
||||
}
|
||||
|
||||
"use None deployment for undefined service" in {
|
||||
|
|
@ -117,9 +117,9 @@ class DeployerSpec extends AkkaSpec(DeployerSpec.deployerConf) {
|
|||
val deployment = system.asInstanceOf[ActorSystemImpl].provider.deployer.lookup(service)
|
||||
deployment must be('defined)
|
||||
deployment.get.path must be(service)
|
||||
deployment.get.routing.getClass must be(expected.getClass)
|
||||
deployment.get.routing.resizer must be(expected.resizer)
|
||||
deployment.get.scope must be(LocalScope)
|
||||
deployment.get.routerConfig.getClass must be(expected.getClass)
|
||||
deployment.get.routerConfig.resizer must be(expected.resizer)
|
||||
deployment.get.scope must be(NoScope)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,12 +1,19 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.routing
|
||||
|
||||
import akka.actor._
|
||||
import akka.routing._
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import akka.testkit._
|
||||
import akka.util.duration._
|
||||
|
||||
import org.junit.runner.RunWith
|
||||
|
||||
import akka.actor.actorRef2Scala
|
||||
import akka.actor.{ Props, LocalActorRef, Deploy, Actor }
|
||||
import akka.config.ConfigurationException
|
||||
import akka.dispatch.Await
|
||||
import akka.pattern.ask
|
||||
import akka.testkit.{ TestLatch, ImplicitSender, DefaultTimeout, AkkaSpec }
|
||||
import akka.util.duration.intToDurationInt
|
||||
|
||||
object ConfiguredLocalRoutingSpec {
|
||||
val config = """
|
||||
|
|
@ -16,6 +23,12 @@ object ConfiguredLocalRoutingSpec {
|
|||
core-pool-size-min = 8
|
||||
core-pool-size-max = 16
|
||||
}
|
||||
deployment {
|
||||
/config {
|
||||
router = random
|
||||
nr-of-instances = 4
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
|
@ -24,18 +37,52 @@ object ConfiguredLocalRoutingSpec {
|
|||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class ConfiguredLocalRoutingSpec extends AkkaSpec(ConfiguredLocalRoutingSpec.config) with DefaultTimeout with ImplicitSender {
|
||||
|
||||
val deployer = system.asInstanceOf[ActorSystemImpl].provider.deployer
|
||||
|
||||
"RouterConfig" must {
|
||||
|
||||
"be picked up from Props" in {
|
||||
val actor = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case "get" ⇒ sender ! context.props
|
||||
}
|
||||
}).withRouter(RoundRobinRouter(12)), "someOther")
|
||||
actor.asInstanceOf[LocalActorRef].underlying.props.routerConfig must be === RoundRobinRouter(12)
|
||||
system.stop(actor)
|
||||
}
|
||||
|
||||
"be overridable in config" in {
|
||||
deployer.deploy(Deploy("/config", null, RandomRouter(4), LocalScope))
|
||||
val actor = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case "get" ⇒ sender ! context.props
|
||||
}
|
||||
}).withRouter(RoundRobinRouter(12)), "config")
|
||||
actor.asInstanceOf[LocalActorRef].underlying.props.routerConfig must be === RandomRouter(4)
|
||||
system.stop(actor)
|
||||
}
|
||||
|
||||
"be overridable in explicit deployment" in {
|
||||
val actor = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case "get" ⇒ sender ! context.props
|
||||
}
|
||||
}).withRouter(FromConfig).withDeploy(Deploy(routerConfig = RoundRobinRouter(12))), "someOther")
|
||||
actor.asInstanceOf[LocalActorRef].underlying.props.routerConfig must be === RoundRobinRouter(12)
|
||||
system.stop(actor)
|
||||
}
|
||||
|
||||
"be overridable in config even with explicit deployment" in {
|
||||
val actor = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case "get" ⇒ sender ! context.props
|
||||
}
|
||||
}).withRouter(FromConfig).withDeploy(Deploy(routerConfig = RoundRobinRouter(12))), "config")
|
||||
actor.asInstanceOf[LocalActorRef].underlying.props.routerConfig must be === RandomRouter(4)
|
||||
system.stop(actor)
|
||||
}
|
||||
|
||||
"fail with an exception if not correct" in {
|
||||
intercept[ConfigurationException] {
|
||||
system.actorOf(Props.empty.withRouter(FromConfig))
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ package com.typesafe.config;
|
|||
* interface is likely to grow new methods over time, so third-party
|
||||
* implementations will break.
|
||||
*/
|
||||
public interface ConfigValue extends ConfigMergeable {
|
||||
public interface ConfigValue extends ConfigMergeable, java.io.Serializable {
|
||||
/**
|
||||
* The origin of the value (file, line number, etc.), for debugging and
|
||||
* error messages.
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ import com.typesafe.config.ConfigValueType;
|
|||
* with a one-level java.util.Map from paths to non-null values. Null values are
|
||||
* not "in" the map.
|
||||
*/
|
||||
final class SimpleConfig implements Config, MergeableValue {
|
||||
final class SimpleConfig implements Config, MergeableValue, java.io.Serializable {
|
||||
|
||||
final private AbstractConfigObject object;
|
||||
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ import com.typesafe.config.ConfigOrigin;
|
|||
|
||||
// it would be cleaner to have a class hierarchy for various origin types,
|
||||
// but was hoping this would be enough simpler to be a little messy. eh.
|
||||
final class SimpleConfigOrigin implements ConfigOrigin {
|
||||
final class SimpleConfigOrigin implements ConfigOrigin, java.io.Serializable {
|
||||
final private String description;
|
||||
final private int lineNumber;
|
||||
final private int endLineNumber;
|
||||
|
|
|
|||
|
|
@ -97,7 +97,8 @@ akka {
|
|||
paths = []
|
||||
}
|
||||
|
||||
# Routers with dynamically resizable number of routees
|
||||
# Routers with dynamically resizable number of routees; this feature is enabled
|
||||
# by including (parts of) this section in the deployment
|
||||
resizer {
|
||||
|
||||
# The fewest number of routees the router should ever have.
|
||||
|
|
|
|||
|
|
@ -225,7 +225,7 @@ private[akka] class ActorCell(
|
|||
}
|
||||
}
|
||||
}
|
||||
val actor = provider.actorOf(systemImpl, props, self, self.path / name, false, None)
|
||||
val actor = provider.actorOf(systemImpl, props, self, self.path / name, false, None, true)
|
||||
childrenRefs = childrenRefs.updated(name, ChildRestartStats(actor))
|
||||
actor
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,9 @@ object ActorPath {
|
|||
rec(s.length, Nil)
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse string as actor path; throws java.net.MalformedURLException if unable to do so.
|
||||
*/
|
||||
def fromString(s: String): ActorPath = s match {
|
||||
case ActorPathExtractor(addr, elems) ⇒ RootActorPath(addr) / elems
|
||||
case _ ⇒ throw new MalformedURLException("cannot parse as ActorPath: " + s)
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import akka.routing._
|
|||
import akka.AkkaException
|
||||
import akka.util.{ Switch, Helpers }
|
||||
import akka.event._
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
/**
|
||||
* Interface for all ActorRef providers to implement.
|
||||
|
|
@ -97,7 +98,14 @@ trait ActorRefProvider {
|
|||
* in case of remote supervision). If systemService is true, deployment is
|
||||
* bypassed (local-only).
|
||||
*/
|
||||
def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean, deploy: Option[Deploy]): InternalActorRef
|
||||
def actorOf(
|
||||
system: ActorSystemImpl,
|
||||
props: Props,
|
||||
supervisor: InternalActorRef,
|
||||
path: ActorPath,
|
||||
systemService: Boolean,
|
||||
deploy: Option[Deploy],
|
||||
lookupDeploy: Boolean): InternalActorRef
|
||||
|
||||
/**
|
||||
* Create actor reference for a specified local or remote path. If no such
|
||||
|
|
@ -454,10 +462,10 @@ class LocalActorRefProvider(
|
|||
}
|
||||
|
||||
lazy val guardian: InternalActorRef =
|
||||
actorOf(system, guardianProps, rootGuardian, rootPath / "user", true, None)
|
||||
actorOf(system, guardianProps, rootGuardian, rootPath / "user", true, None, false)
|
||||
|
||||
lazy val systemGuardian: InternalActorRef =
|
||||
actorOf(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, rootPath / "system", true, None)
|
||||
actorOf(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, rootPath / "system", true, None, false)
|
||||
|
||||
lazy val tempContainer = new VirtualPathContainer(system.provider, tempNode, rootGuardian, log)
|
||||
|
||||
|
|
@ -510,15 +518,15 @@ class LocalActorRefProvider(
|
|||
case x ⇒ x
|
||||
}
|
||||
|
||||
def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean, deploy: Option[Deploy]): InternalActorRef = {
|
||||
def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath,
|
||||
systemService: Boolean, deploy: Option[Deploy], lookupDeploy: Boolean): InternalActorRef = {
|
||||
props.routerConfig match {
|
||||
case NoRouter ⇒ new LocalActorRef(system, props, supervisor, path, systemService) // create a local actor
|
||||
case router ⇒
|
||||
val depl = deploy orElse {
|
||||
val lookupPath = path.elements.drop(1).mkString("/", "/", "")
|
||||
deployer.lookup(lookupPath)
|
||||
}
|
||||
new RoutedActorRef(system, props.withRouter(router.adaptFromDeploy(depl)), supervisor, path)
|
||||
val lookup = if (lookupDeploy) deployer.lookup(path.elements.drop(1).mkString("/", "/", "")) else None
|
||||
val fromProps = props.deploy.copy(routerConfig = props.deploy.routerConfig withFallback router) :: Nil
|
||||
val d = lookup.toList ::: deploy.toList ::: fromProps reduceRight (_ withFallback _)
|
||||
new RoutedActorRef(system, props.withRouter(d.routerConfig), supervisor, path)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
package akka.actor
|
||||
import java.net.URI
|
||||
import java.net.URISyntaxException
|
||||
import java.net.MalformedURLException
|
||||
|
||||
/**
|
||||
* The address specifies the physical location under which an Actor can be
|
||||
|
|
@ -56,6 +57,9 @@ object RelativeActorPath {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This object serves as extractor for Scala and as address parser for Java.
|
||||
*/
|
||||
object AddressExtractor {
|
||||
def unapply(addr: String): Option[Address] = {
|
||||
try {
|
||||
|
|
@ -71,6 +75,19 @@ object AddressExtractor {
|
|||
case _: URISyntaxException ⇒ None
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to construct an Address from the given String or throw a java.net.MalformedURLException.
|
||||
*/
|
||||
def apply(addr: String): Address = addr match {
|
||||
case AddressExtractor(address) ⇒ address
|
||||
case _ ⇒ throw new MalformedURLException
|
||||
}
|
||||
|
||||
/**
|
||||
* Java API: Try to construct an Address from the given String or throw a java.net.MalformedURLException.
|
||||
*/
|
||||
def parse(addr: String): Address = apply(addr)
|
||||
}
|
||||
|
||||
object ActorPathExtractor {
|
||||
|
|
|
|||
|
|
@ -10,13 +10,58 @@ import akka.routing._
|
|||
import java.util.concurrent.{ TimeUnit, ConcurrentHashMap }
|
||||
import akka.util.ReflectiveAccess
|
||||
|
||||
case class Deploy(path: String, config: Config, routing: RouterConfig = NoRouter, scope: Scope = LocalScope)
|
||||
/**
|
||||
* This class represents deployment configuration for a given actor path. It is
|
||||
* marked final in order to guarantee stable merge semantics (i.e. what
|
||||
* overrides what in case multiple configuration sources are available) and is
|
||||
* fully extensible via its Scope argument, and by the fact that an arbitrary
|
||||
* Config section can be passed along with it (which will be merged when merging
|
||||
* two Deploys).
|
||||
*
|
||||
* The path field is used only when inserting the Deploy into a deployer and
|
||||
* not needed when just doing deploy-as-you-go:
|
||||
*
|
||||
* {{{
|
||||
* context.actorOf(someProps, "someName", Deploy(scope = RemoteScope("someOtherNodeName")))
|
||||
* }}}
|
||||
*/
|
||||
final case class Deploy(
|
||||
path: String = "",
|
||||
config: Config = ConfigFactory.empty,
|
||||
routerConfig: RouterConfig = NoRouter,
|
||||
scope: Scope = NoScope) {
|
||||
|
||||
case class ActorRecipe(implementationClass: Class[_ <: Actor]) //TODO Add ActorConfiguration here
|
||||
def this(routing: RouterConfig) = this("", ConfigFactory.empty, routing)
|
||||
def this(routing: RouterConfig, scope: Scope) = this("", ConfigFactory.empty, routing, scope)
|
||||
|
||||
trait Scope
|
||||
case class LocalScope() extends Scope
|
||||
case object LocalScope extends Scope
|
||||
/**
|
||||
* Do a merge between this and the other Deploy, where values from “this” take
|
||||
* precedence. The “path” of the other Deploy is not taken into account. All
|
||||
* other members are merged using ``<X>.withFallback(other.<X>)``.
|
||||
*/
|
||||
def withFallback(other: Deploy) =
|
||||
Deploy(path, config.withFallback(other.config), routerConfig.withFallback(other.routerConfig), scope.withFallback(other.scope))
|
||||
}
|
||||
|
||||
trait Scope {
|
||||
def withFallback(other: Scope): Scope
|
||||
}
|
||||
|
||||
case object LocalScope extends Scope {
|
||||
/**
|
||||
* Java API
|
||||
*/
|
||||
def scope = this
|
||||
|
||||
def withFallback(other: Scope): Scope = this
|
||||
}
|
||||
|
||||
/**
|
||||
* This is the default value and as such allows overrides.
|
||||
*/
|
||||
case object NoScope extends Scope {
|
||||
def withFallback(other: Scope): Scope = other
|
||||
}
|
||||
|
||||
/**
|
||||
* Deployer maps actor paths to actor deployments.
|
||||
|
|
@ -76,7 +121,7 @@ class Deployer(val settings: ActorSystem.Settings, val classloader: ClassLoader)
|
|||
}
|
||||
}
|
||||
|
||||
Some(Deploy(key, deployment, router, LocalScope))
|
||||
Some(Deploy(key, deployment, router, NoScope))
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -22,6 +22,8 @@ object Props {
|
|||
|
||||
final val defaultRoutedProps: RouterConfig = NoRouter
|
||||
|
||||
final val defaultDeploy = Deploy()
|
||||
|
||||
final val noHotSwap: Stack[Actor.Receive] = Stack.empty
|
||||
final val empty = new Props(() ⇒ new Actor { def receive = Actor.emptyBehavior })
|
||||
|
||||
|
|
@ -105,7 +107,8 @@ object Props {
|
|||
case class Props(
|
||||
creator: () ⇒ Actor = Props.defaultCreator,
|
||||
dispatcher: String = Dispatchers.DefaultDispatcherId,
|
||||
routerConfig: RouterConfig = Props.defaultRoutedProps) {
|
||||
routerConfig: RouterConfig = Props.defaultRoutedProps,
|
||||
deploy: Deploy = Props.defaultDeploy) {
|
||||
|
||||
/**
|
||||
* No-args constructor that sets all the default values.
|
||||
|
|
@ -159,4 +162,9 @@ case class Props(
|
|||
* Returns a new Props with the specified router config set.
|
||||
*/
|
||||
def withRouter(r: RouterConfig) = copy(routerConfig = r)
|
||||
|
||||
/**
|
||||
* Returns a new Props with the specified deployment configuration.
|
||||
*/
|
||||
def withDeploy(d: Deploy) = copy(deploy = d)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -431,6 +431,7 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] (
|
|||
interfaces: Seq[Class[_]],
|
||||
creator: () ⇒ T,
|
||||
dispatcher: String = TypedProps.defaultDispatcherId,
|
||||
deploy: Deploy = Props.defaultDeploy,
|
||||
timeout: Option[Timeout] = TypedProps.defaultTimeout,
|
||||
loader: Option[ClassLoader] = TypedProps.defaultLoader) {
|
||||
|
||||
|
|
@ -469,12 +470,17 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] (
|
|||
creator = () ⇒ implementation.newInstance())
|
||||
|
||||
/**
|
||||
* Returns a new Props with the specified dispatcher set.
|
||||
* Returns a new TypedProps with the specified dispatcher set.
|
||||
*/
|
||||
def withDispatcher(d: String) = copy(dispatcher = d)
|
||||
|
||||
/**
|
||||
* @return a new Props that will use the specified ClassLoader to create its proxy class in
|
||||
* Returns a new TypedProps with the specified deployment configuration.
|
||||
*/
|
||||
def withDeploy(d: Deploy) = copy(deploy = d)
|
||||
|
||||
/**
|
||||
* @return a new TypedProps that will use the specified ClassLoader to create its proxy class in
|
||||
* If loader is null, it will use the bootstrap classloader.
|
||||
*
|
||||
* Java API
|
||||
|
|
@ -482,7 +488,7 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] (
|
|||
def withLoader(loader: ClassLoader): TypedProps[T] = withLoader(Option(loader))
|
||||
|
||||
/**
|
||||
* @return a new Props that will use the specified ClassLoader to create its proxy class in
|
||||
* @return a new TypedProps that will use the specified ClassLoader to create its proxy class in
|
||||
* If loader is null, it will use the bootstrap classloader.
|
||||
*
|
||||
* Scala API
|
||||
|
|
@ -490,7 +496,7 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] (
|
|||
def withLoader(loader: Option[ClassLoader]): TypedProps[T] = this.copy(loader = loader)
|
||||
|
||||
/**
|
||||
* @return a new Props that will use the specified Timeout for its non-void-returning methods,
|
||||
* @return a new TypedProps that will use the specified Timeout for its non-void-returning methods,
|
||||
* if null is specified, it will use the default ActorTimeout as specified in the configuration.
|
||||
*
|
||||
* Java API
|
||||
|
|
@ -498,7 +504,7 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] (
|
|||
def withTimeout(timeout: Timeout): TypedProps[T] = this.copy(timeout = Option(timeout))
|
||||
|
||||
/**
|
||||
* @return a new Props that will use the specified Timeout for its non-void-returning methods,
|
||||
* @return a new TypedProps that will use the specified Timeout for its non-void-returning methods,
|
||||
* if None is specified, it will use the default ActorTimeout as specified in the configuration.
|
||||
*
|
||||
* Scala API
|
||||
|
|
@ -506,7 +512,7 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] (
|
|||
def withTimeout(timeout: Option[Timeout]): TypedProps[T] = this.copy(timeout = timeout)
|
||||
|
||||
/**
|
||||
* Returns a new Props that has the specified interface,
|
||||
* Returns a new TypedProps that has the specified interface,
|
||||
* or if the interface class is not an interface, all the interfaces it implements,
|
||||
* appended in the sequence of interfaces.
|
||||
*/
|
||||
|
|
@ -514,7 +520,7 @@ case class TypedProps[T <: AnyRef] protected[TypedProps] (
|
|||
this.copy(interfaces = interfaces ++ TypedProps.extractInterfaces(interface))
|
||||
|
||||
/**
|
||||
* Returns a new Props without the specified interface,
|
||||
* Returns a new TypedProps without the specified interface,
|
||||
* or if the interface class is not an interface, all the interfaces it implements.
|
||||
*/
|
||||
def withoutInterface(interface: Class[_ >: T]): TypedProps[T] =
|
||||
|
|
|
|||
|
|
@ -133,13 +133,10 @@ trait RouterConfig {
|
|||
|
||||
def createActor(): Router = new Router {}
|
||||
|
||||
def adaptFromDeploy(deploy: Option[Deploy]): RouterConfig = {
|
||||
deploy match {
|
||||
case Some(Deploy(_, _, NoRouter, _)) ⇒ this
|
||||
case Some(Deploy(_, _, r, _)) ⇒ r
|
||||
case _ ⇒ this
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Overridable merge strategy, by default completely prefers “this” (i.e. no merge).
|
||||
*/
|
||||
def withFallback(other: RouterConfig): RouterConfig = this
|
||||
|
||||
protected def toAll(sender: ActorRef, routees: Iterable[ActorRef]): Iterable[Destination] = routees.map(Destination(sender, _))
|
||||
|
||||
|
|
@ -291,11 +288,14 @@ case class RouterRoutees(routees: Iterable[ActorRef])
|
|||
case class Destination(sender: ActorRef, recipient: ActorRef)
|
||||
|
||||
/**
|
||||
* Routing configuration that indicates no routing.
|
||||
* Oxymoron style.
|
||||
* Routing configuration that indicates no routing; this is also the default
|
||||
* value which hence overrides the merge strategy in order to accept values
|
||||
* from lower-precendence sources. The decision whether or not to create a
|
||||
* router is taken in the LocalActorRefProvider based on Props.
|
||||
*/
|
||||
case object NoRouter extends RouterConfig {
|
||||
def createRoute(props: Props, routeeProvider: RouteeProvider): Route = null
|
||||
override def withFallback(other: RouterConfig): RouterConfig = other
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -112,8 +112,9 @@ class RemoteActorRefProvider(
|
|||
terminationFuture.onComplete(_ ⇒ transport.shutdown())
|
||||
}
|
||||
|
||||
def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean, deploy: Option[Deploy]): InternalActorRef = {
|
||||
if (systemService) local.actorOf(system, props, supervisor, path, systemService, deploy)
|
||||
def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath,
|
||||
systemService: Boolean, deploy: Option[Deploy], lookupDeploy: Boolean): InternalActorRef = {
|
||||
if (systemService) local.actorOf(system, props, supervisor, path, systemService, deploy, lookupDeploy)
|
||||
else {
|
||||
|
||||
/*
|
||||
|
|
@ -152,22 +153,33 @@ class RemoteActorRefProvider(
|
|||
}
|
||||
|
||||
val elems = path.elements
|
||||
val deployment = deploy orElse (elems.head match {
|
||||
case "user" ⇒ deployer.lookup(elems.drop(1).mkString("/", "/", ""))
|
||||
case "remote" ⇒ lookupRemotes(elems)
|
||||
case _ ⇒ None
|
||||
})
|
||||
val lookup =
|
||||
if (lookupDeploy)
|
||||
elems.head match {
|
||||
case "user" ⇒ deployer.lookup(elems.drop(1).mkString("/", "/", ""))
|
||||
case "remote" ⇒ lookupRemotes(elems)
|
||||
case _ ⇒ None
|
||||
}
|
||||
else None
|
||||
|
||||
deployment match {
|
||||
case Some(Deploy(_, _, _, RemoteScope(addr))) ⇒
|
||||
if (addr == rootPath.address) local.actorOf(system, props, supervisor, path, false, deployment)
|
||||
else {
|
||||
val deployment = {
|
||||
lookup.toList ::: deploy.toList ::: Nil match {
|
||||
case Nil ⇒ Nil
|
||||
case l ⇒ List(l reduceRight (_ withFallback _))
|
||||
}
|
||||
}
|
||||
|
||||
deployment ::: props.deploy :: Nil reduceRight (_ withFallback _) match {
|
||||
case d @ Deploy(_, _, _, RemoteScope(addr)) ⇒
|
||||
if (addr == rootPath.address || addr == transport.address) {
|
||||
local.actorOf(system, props, supervisor, path, false, deployment.headOption, false)
|
||||
} else {
|
||||
val rpath = RootActorPath(addr) / "remote" / transport.address.hostPort / path.elements
|
||||
useActorOnNode(rpath, props.creator, supervisor)
|
||||
useActorOnNode(rpath, props, d, supervisor)
|
||||
new RemoteActorRef(this, transport, rpath, supervisor)
|
||||
}
|
||||
|
||||
case _ ⇒ local.actorOf(system, props, supervisor, path, systemService, deployment)
|
||||
case _ ⇒ local.actorOf(system, props, supervisor, path, systemService, deployment.headOption, false)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -188,11 +200,11 @@ class RemoteActorRefProvider(
|
|||
/**
|
||||
* Using (checking out) actor on a specific node.
|
||||
*/
|
||||
def useActorOnNode(path: ActorPath, actorFactory: () ⇒ Actor, supervisor: ActorRef) {
|
||||
def useActorOnNode(path: ActorPath, props: Props, deploy: Deploy, supervisor: ActorRef) {
|
||||
log.debug("[{}] Instantiating Remote Actor [{}]", rootPath, path)
|
||||
|
||||
// we don’t wait for the ACK, because the remote end will process this command before any other message to the new actor
|
||||
actorFor(RootActorPath(path.address) / "remote") ! DaemonMsgCreate(actorFactory, path.toString, supervisor)
|
||||
actorFor(RootActorPath(path.address) / "remote") ! DaemonMsgCreate(props, deploy, path.toString, supervisor)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -6,11 +6,11 @@ package akka.remote
|
|||
|
||||
import scala.annotation.tailrec
|
||||
|
||||
import akka.actor.{ VirtualPathContainer, Terminated, Props, Nobody, LocalActorRef, InternalActorRef, Address, ActorSystemImpl, ActorRef, ActorPathExtractor, ActorPath, Actor }
|
||||
import akka.actor.{ VirtualPathContainer, Terminated, Deploy, Props, Nobody, LocalActorRef, InternalActorRef, Address, ActorSystemImpl, ActorRef, ActorPathExtractor, ActorPath, Actor }
|
||||
import akka.event.LoggingAdapter
|
||||
|
||||
sealed trait DaemonMsg
|
||||
case class DaemonMsgCreate(factory: () ⇒ Actor, path: String, supervisor: ActorRef) extends DaemonMsg
|
||||
case class DaemonMsgCreate(props: Props, deploy: Deploy, path: String, supervisor: ActorRef) extends DaemonMsg
|
||||
case class DaemonMsgWatch(watcher: ActorRef, watched: ActorRef) extends DaemonMsg
|
||||
|
||||
/**
|
||||
|
|
@ -52,17 +52,15 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath
|
|||
case message: DaemonMsg ⇒
|
||||
log.debug("Received command [{}] to RemoteSystemDaemon on [{}]", message, path.address)
|
||||
message match {
|
||||
case DaemonMsgCreate(factory, path, supervisor) ⇒
|
||||
case DaemonMsgCreate(props, deploy, path, supervisor) ⇒
|
||||
path match {
|
||||
case ActorPathExtractor(address, elems) if elems.nonEmpty && elems.head == "remote" ⇒
|
||||
// TODO RK currently the extracted “address” is just ignored, is that okay?
|
||||
// TODO RK canonicalize path so as not to duplicate it always #1446
|
||||
val subpath = elems.drop(1)
|
||||
val path = this.path / subpath
|
||||
val actor = system.provider.actorOf(system,
|
||||
Props(creator = factory),
|
||||
supervisor.asInstanceOf[InternalActorRef],
|
||||
path, true, None)
|
||||
val actor = system.provider.actorOf(system, props, supervisor.asInstanceOf[InternalActorRef],
|
||||
path, false, Some(deploy), true)
|
||||
addChild(subpath.mkString("/"), actor)
|
||||
system.deathWatch.subscribe(this, actor)
|
||||
case _ ⇒
|
||||
|
|
|
|||
|
|
@ -8,7 +8,9 @@ import akka.routing._
|
|||
import com.typesafe.config._
|
||||
import akka.config.ConfigurationException
|
||||
|
||||
case class RemoteScope(node: Address) extends Scope
|
||||
case class RemoteScope(node: Address) extends Scope {
|
||||
def withFallback(other: Scope): Scope = this
|
||||
}
|
||||
|
||||
class RemoteDeployer(_settings: ActorSystem.Settings, _classloader: ClassLoader) extends Deployer(_settings, _classloader) {
|
||||
|
||||
|
|
@ -22,8 +24,8 @@ class RemoteDeployer(_settings: ActorSystem.Settings, _classloader: ClassLoader)
|
|||
case str ⇒
|
||||
if (!str.isEmpty) throw new ConfigurationException("unparseable remote node name " + str)
|
||||
val nodes = deploy.config.getStringList("target.nodes").asScala
|
||||
if (nodes.isEmpty || deploy.routing == NoRouter) d
|
||||
else Some(deploy.copy(routing = new RemoteRouterConfig(deploy.routing, nodes)))
|
||||
if (nodes.isEmpty || deploy.routerConfig == NoRouter) d
|
||||
else Some(deploy.copy(routerConfig = RemoteRouterConfig(deploy.routerConfig, nodes)))
|
||||
}
|
||||
case None ⇒ None
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ import akka.actor.AddressExtractor
|
|||
* which makes it possible to mix this with the built-in routers such as
|
||||
* [[akka.routing.RoundRobinRouter]] or custom routers.
|
||||
*/
|
||||
class RemoteRouterConfig(local: RouterConfig, nodes: Iterable[String]) extends RouterConfig {
|
||||
case class RemoteRouterConfig(local: RouterConfig, nodes: Iterable[String]) extends RouterConfig {
|
||||
|
||||
override def createRouteeProvider(context: ActorContext) = new RemoteRouteeProvider(nodes, context, resizer)
|
||||
|
||||
|
|
@ -32,6 +32,10 @@ class RemoteRouterConfig(local: RouterConfig, nodes: Iterable[String]) extends R
|
|||
|
||||
override def resizer: Option[Resizer] = local.resizer
|
||||
|
||||
override def withFallback(other: RouterConfig): RouterConfig = other match {
|
||||
case RemoteRouterConfig(local, nodes) ⇒ copy(local = this.local.withFallback(local))
|
||||
case _ ⇒ this
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -62,7 +66,7 @@ class RemoteRouteeProvider(nodes: Iterable[String], _context: ActorContext, _res
|
|||
IndexedSeq.empty[ActorRef] ++ (for (i ← 1 to nrOfInstances) yield {
|
||||
val name = "c" + i
|
||||
val deploy = Deploy("", ConfigFactory.empty(), props.routerConfig, RemoteScope(nodeAddressIter.next))
|
||||
impl.provider.actorOf(impl, props, context.self.asInstanceOf[InternalActorRef], context.self.path / name, false, Some(deploy))
|
||||
impl.provider.actorOf(impl, props, context.self.asInstanceOf[InternalActorRef], context.self.path / name, false, Some(deploy), false)
|
||||
})
|
||||
|
||||
case (_, xs, _) ⇒ throw new ConfigurationException("Remote target.nodes can not be combined with routees for [%s]"
|
||||
|
|
|
|||
|
|
@ -38,13 +38,35 @@ akka {
|
|||
}
|
||||
target.nodes = ["akka://remote_sys@localhost:12347"]
|
||||
}
|
||||
/remote-blub {
|
||||
remote = "akka://remote_sys@localhost:12347"
|
||||
router = round-robin
|
||||
nr-of-instances = 2
|
||||
}
|
||||
/local-blub {
|
||||
remote = "akka://RemoteRouterSpec"
|
||||
router = round-robin
|
||||
nr-of-instances = 2
|
||||
target.nodes = ["akka://remote_sys@localhost:12347"]
|
||||
}
|
||||
/local-blub2 {
|
||||
router = round-robin
|
||||
nr-of-instances = 4
|
||||
target.nodes = ["akka://remote_sys@localhost:12347"]
|
||||
}
|
||||
}
|
||||
}
|
||||
""") with ImplicitSender {
|
||||
|
||||
import RemoteRouterSpec._
|
||||
|
||||
val conf = ConfigFactory.parseString("akka.remote.netty.port=12347").withFallback(system.settings.config)
|
||||
val conf = ConfigFactory.parseString("""akka.remote.netty.port=12347
|
||||
akka.actor.deployment {
|
||||
/remote-override {
|
||||
router = round-robin
|
||||
nr-of-instances = 4
|
||||
}
|
||||
}""").withFallback(system.settings.config)
|
||||
val other = ActorSystem("remote_sys", conf)
|
||||
|
||||
override def atTermination() {
|
||||
|
|
@ -55,27 +77,126 @@ akka {
|
|||
|
||||
"deploy its children on remote host driven by configuration" in {
|
||||
val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2)), "blub")
|
||||
router ! ""
|
||||
expectMsgType[ActorRef].path.address.toString must be === "akka://remote_sys@localhost:12347"
|
||||
router ! ""
|
||||
expectMsgType[ActorRef].path.address.toString must be === "akka://remote_sys@localhost:12347"
|
||||
val replies = for (i ← 1 to 5) yield {
|
||||
router ! ""
|
||||
expectMsgType[ActorRef].path
|
||||
}
|
||||
val children = replies.toSet
|
||||
children must have size 2
|
||||
children.map(_.parent) must have size 1
|
||||
children foreach (_.address.toString must be === "akka://remote_sys@localhost:12347")
|
||||
system.stop(router)
|
||||
}
|
||||
|
||||
"deploy its children on remote host driven by programatic definition" in {
|
||||
val router = system.actorOf(Props[Echo].withRouter(new RemoteRouterConfig(RoundRobinRouter(2),
|
||||
Seq("akka://remote_sys@localhost:12347"))), "blub2")
|
||||
router ! ""
|
||||
expectMsgType[ActorRef].path.address.toString must be === "akka://remote_sys@localhost:12347"
|
||||
router ! ""
|
||||
expectMsgType[ActorRef].path.address.toString must be === "akka://remote_sys@localhost:12347"
|
||||
val replies = for (i ← 1 to 5) yield {
|
||||
router ! ""
|
||||
expectMsgType[ActorRef].path
|
||||
}
|
||||
val children = replies.toSet
|
||||
children must have size 2
|
||||
children.map(_.parent) must have size 1
|
||||
children foreach (_.address.toString must be === "akka://remote_sys@localhost:12347")
|
||||
system.stop(router)
|
||||
}
|
||||
|
||||
"deploy dynamic resizable number of children on remote host driven by configuration" in {
|
||||
val router = system.actorOf(Props[Echo].withRouter(FromConfig), "elastic-blub")
|
||||
router ! ""
|
||||
expectMsgType[ActorRef].path.address.toString must be === "akka://remote_sys@localhost:12347"
|
||||
router ! ""
|
||||
expectMsgType[ActorRef].path.address.toString must be === "akka://remote_sys@localhost:12347"
|
||||
val replies = for (i ← 1 to 5000) yield {
|
||||
router ! ""
|
||||
expectMsgType[ActorRef].path
|
||||
}
|
||||
val children = replies.toSet
|
||||
children.size must be >= 2
|
||||
children.map(_.parent) must have size 1
|
||||
children foreach (_.address.toString must be === "akka://remote_sys@localhost:12347")
|
||||
system.stop(router)
|
||||
}
|
||||
|
||||
"deploy remote routers based on configuration" in {
|
||||
val router = system.actorOf(Props[Echo].withRouter(FromConfig), "remote-blub")
|
||||
router.path.address.toString must be("akka://remote_sys@localhost:12347")
|
||||
val replies = for (i ← 1 to 5) yield {
|
||||
router ! ""
|
||||
expectMsgType[ActorRef].path
|
||||
}
|
||||
val children = replies.toSet
|
||||
children must have size 2
|
||||
val parents = children.map(_.parent)
|
||||
parents must have size 1
|
||||
parents.head must be(router.path)
|
||||
children foreach (_.address.toString must be === "akka://remote_sys@localhost:12347")
|
||||
system.stop(router)
|
||||
}
|
||||
|
||||
"deploy remote routers based on explicit deployment" in {
|
||||
val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2))
|
||||
.withDeploy(Deploy(scope = RemoteScope(AddressExtractor("akka://remote_sys@localhost:12347")))), "remote-blub2")
|
||||
router.path.address.toString must be("akka://remote_sys@localhost:12347")
|
||||
val replies = for (i ← 1 to 5) yield {
|
||||
router ! ""
|
||||
expectMsgType[ActorRef].path
|
||||
}
|
||||
val children = replies.toSet
|
||||
children must have size 2
|
||||
val parents = children.map(_.parent)
|
||||
parents must have size 1
|
||||
parents.head must be(router.path)
|
||||
children foreach (_.address.toString must be === "akka://remote_sys@localhost:12347")
|
||||
system.stop(router)
|
||||
}
|
||||
|
||||
"let remote deployment be overridden by local configuration" in {
|
||||
val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2))
|
||||
.withDeploy(Deploy(scope = RemoteScope(AddressExtractor("akka://remote_sys@localhost:12347")))), "local-blub")
|
||||
router.path.address.toString must be("akka://RemoteRouterSpec")
|
||||
val replies = for (i ← 1 to 5) yield {
|
||||
router ! ""
|
||||
expectMsgType[ActorRef].path
|
||||
}
|
||||
val children = replies.toSet
|
||||
children must have size 2
|
||||
val parents = children.map(_.parent)
|
||||
parents must have size 1
|
||||
parents.head.address must be(Address("akka", "remote_sys", Some("localhost"), Some(12347)))
|
||||
children foreach (_.address.toString must be === "akka://remote_sys@localhost:12347")
|
||||
system.stop(router)
|
||||
}
|
||||
|
||||
"let remote deployment router be overridden by local configuration" in {
|
||||
val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2))
|
||||
.withDeploy(Deploy(scope = RemoteScope(AddressExtractor("akka://remote_sys@localhost:12347")))), "local-blub2")
|
||||
router.path.address.toString must be("akka://remote_sys@localhost:12347")
|
||||
val replies = for (i ← 1 to 5) yield {
|
||||
router ! ""
|
||||
expectMsgType[ActorRef].path
|
||||
}
|
||||
val children = replies.toSet
|
||||
children must have size 4
|
||||
val parents = children.map(_.parent)
|
||||
parents must have size 1
|
||||
parents.head must be(router.path)
|
||||
children foreach (_.address.toString must be === "akka://remote_sys@localhost:12347")
|
||||
system.stop(router)
|
||||
}
|
||||
|
||||
"let remote deployment be overridden by remote configuration" in {
|
||||
val router = system.actorOf(Props[Echo].withRouter(RoundRobinRouter(2))
|
||||
.withDeploy(Deploy(scope = RemoteScope(AddressExtractor("akka://remote_sys@localhost:12347")))), "remote-override")
|
||||
router.path.address.toString must be("akka://remote_sys@localhost:12347")
|
||||
val replies = for (i ← 1 to 5) yield {
|
||||
router ! ""
|
||||
expectMsgType[ActorRef].path
|
||||
}
|
||||
val children = replies.toSet
|
||||
children must have size 4
|
||||
val parents = children.map(_.parent)
|
||||
parents must have size 1
|
||||
parents.head must be(router.path)
|
||||
children foreach (_.address.toString must be === "akka://remote_sys@localhost:12347")
|
||||
system.stop(router)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue