merge with master
This commit is contained in:
commit
037caf5136
4 changed files with 68 additions and 21 deletions
|
|
@ -135,7 +135,7 @@ private[akka] class Deployer(val settings: ActorSystem.Settings, val dynamicAcce
|
|||
add(d.path.split("/").drop(1), d)
|
||||
}
|
||||
|
||||
protected def parseConfig(key: String, config: Config): Option[Deploy] = {
|
||||
def parseConfig(key: String, config: Config): Option[Deploy] = {
|
||||
|
||||
val deployment = config.withFallback(default)
|
||||
|
||||
|
|
|
|||
|
|
@ -27,20 +27,9 @@ object DirectRoutedRemoteActorMultiJvmSpec extends MultiNodeConfig {
|
|||
val master = role("master")
|
||||
val slave = role("slave")
|
||||
|
||||
nodeConfig(master, ConfigFactory.parseString("""
|
||||
akka.actor {
|
||||
deployment {
|
||||
/service-hello.remote = "akka://MultiNodeSpec@%s"
|
||||
}
|
||||
}
|
||||
# FIXME When using NettyRemoteTransport instead of TestConductorTransport it works
|
||||
# akka.remote.transport = "akka.remote.netty.NettyRemoteTransport"
|
||||
""".format("localhost:2553"))) // FIXME is there a way to avoid hardcoding the host:port here?
|
||||
|
||||
nodeConfig(slave, ConfigFactory.parseString("""
|
||||
akka.remote.netty.port = 2553
|
||||
"""))
|
||||
|
||||
deployOn(master, """/service-hello.remote = "@slave@" """)
|
||||
|
||||
deployOnAll("""/service-hello2.remote = "@slave@" """)
|
||||
}
|
||||
|
||||
class DirectRoutedRemoteActorMultiJvmNode1 extends DirectRoutedRemoteActorSpec
|
||||
|
|
@ -60,7 +49,26 @@ class DirectRoutedRemoteActorSpec extends MultiNodeSpec(DirectRoutedRemoteActorM
|
|||
actor.isInstanceOf[RemoteActorRef] must be(true)
|
||||
|
||||
val slaveAddress = testConductor.getAddressFor(slave).await
|
||||
(actor ? "identify").await.asInstanceOf[ActorRef].path.address must equal(slaveAddress)
|
||||
actor ! "identify"
|
||||
expectMsgType[ActorRef].path.address must equal(slaveAddress)
|
||||
|
||||
// shut down the actor before we let the other node(s) shut down so we don't try to send
|
||||
// "Terminate" to a shut down node
|
||||
system.stop(actor)
|
||||
}
|
||||
|
||||
testConductor.enter("done")
|
||||
}
|
||||
|
||||
"be locally instantiated on a remote node and be able to communicate through its RemoteActorRef (with deployOnAll)" in {
|
||||
|
||||
runOn(master) {
|
||||
val actor = system.actorOf(Props[SomeActor], "service-hello2")
|
||||
actor.isInstanceOf[RemoteActorRef] must be(true)
|
||||
|
||||
val slaveAddress = testConductor.getAddressFor(slave).await
|
||||
actor ! "identify"
|
||||
expectMsgType[ActorRef].path.address must equal(slaveAddress)
|
||||
|
||||
// shut down the actor before we let the other node(s) shut down so we don't try to send
|
||||
// "Terminate" to a shut down node
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
package akka.remote.testkit
|
||||
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.actor.ActorSystem
|
||||
import akka.actor.{ ActorSystem, ExtendedActorSystem }
|
||||
import akka.remote.testconductor.TestConductor
|
||||
import java.net.InetAddress
|
||||
import java.net.InetSocketAddress
|
||||
|
|
@ -17,6 +17,8 @@ import akka.util.Duration
|
|||
import akka.actor.ActorPath
|
||||
import akka.actor.RootActorPath
|
||||
import akka.remote.testconductor.RoleName
|
||||
import akka.actor.Deploy
|
||||
import com.typesafe.config.ConfigObject
|
||||
|
||||
/**
|
||||
* Configure the role names and participants of the test, including configuration settings.
|
||||
|
|
@ -25,7 +27,9 @@ abstract class MultiNodeConfig {
|
|||
|
||||
private var _commonConf: Option[Config] = None
|
||||
private var _nodeConf = Map[RoleName, Config]()
|
||||
private var _roles = Seq[RoleName]()
|
||||
private var _roles = Vector[RoleName]()
|
||||
private var _deployments = Map[RoleName, Seq[String]]()
|
||||
private var _allDeploy = Vector[String]()
|
||||
|
||||
/**
|
||||
* Register a common base config for all test participants, if so desired.
|
||||
|
|
@ -68,6 +72,11 @@ abstract class MultiNodeConfig {
|
|||
r
|
||||
}
|
||||
|
||||
def deployOn(role: RoleName, deployment: String): Unit =
|
||||
_deployments += role -> ((_deployments get role getOrElse Vector()) :+ deployment)
|
||||
|
||||
def deployOnAll(deployment: String): Unit = _allDeploy :+= deployment
|
||||
|
||||
private[testkit] lazy val mySelf: RoleName = {
|
||||
require(_roles.size > MultiNodeSpec.selfIndex, "not enough roles declared for this test")
|
||||
_roles(MultiNodeSpec.selfIndex)
|
||||
|
|
@ -78,6 +87,10 @@ abstract class MultiNodeConfig {
|
|||
configs reduce (_ withFallback _)
|
||||
}
|
||||
|
||||
private[testkit] def deployments(node: RoleName): Seq[String] = (_deployments get node getOrElse Nil) ++ _allDeploy
|
||||
|
||||
private[testkit] def roles: Seq[RoleName] = _roles
|
||||
|
||||
}
|
||||
|
||||
object MultiNodeSpec {
|
||||
|
|
@ -115,11 +128,13 @@ object MultiNodeSpec {
|
|||
|
||||
}
|
||||
|
||||
abstract class MultiNodeSpec(val mySelf: RoleName, _system: ActorSystem) extends AkkaSpec(_system) {
|
||||
abstract class MultiNodeSpec(val mySelf: RoleName, _system: ActorSystem, roles: Seq[RoleName], deployments: RoleName ⇒ Seq[String])
|
||||
extends AkkaSpec(_system) {
|
||||
|
||||
import MultiNodeSpec._
|
||||
|
||||
def this(config: MultiNodeConfig) = this(config.mySelf, ActorSystem(AkkaSpec.getCallerName, config.config))
|
||||
def this(config: MultiNodeConfig) =
|
||||
this(config.mySelf, ActorSystem(AkkaSpec.getCallerName, config.config), config.roles, config.deployments)
|
||||
|
||||
/*
|
||||
* Test Class Interface
|
||||
|
|
@ -188,4 +203,28 @@ abstract class MultiNodeSpec(val mySelf: RoleName, _system: ActorSystem) extends
|
|||
testConductor.startClient(mySelf, controllerAddr).await
|
||||
}
|
||||
|
||||
// now add deployments, if so desired
|
||||
|
||||
private case class Replacement(tag: String, role: RoleName) {
|
||||
lazy val addr = node(role).address.toString
|
||||
}
|
||||
private val replacements = roles map (r ⇒ Replacement("@" + r.name + "@", r))
|
||||
private val deployer = system.asInstanceOf[ExtendedActorSystem].provider.deployer
|
||||
deployments(mySelf) foreach { str ⇒
|
||||
val deployString = (str /: replacements) {
|
||||
case (base, r @ Replacement(tag, _)) ⇒
|
||||
base.indexOf(tag) match {
|
||||
case -1 ⇒ base
|
||||
case start ⇒ base.replace(tag, r.addr)
|
||||
}
|
||||
}
|
||||
import scala.collection.JavaConverters._
|
||||
ConfigFactory.parseString(deployString).root.asScala foreach {
|
||||
case (key, value: ConfigObject) ⇒
|
||||
deployer.parseConfig(key, value.toConfig) foreach deployer.deploy
|
||||
case (key, x) ⇒
|
||||
throw new IllegalArgumentException("key " + key + " must map to deployment section, not simple value " + x)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -13,7 +13,7 @@ case class RemoteScope(node: Address) extends Scope {
|
|||
}
|
||||
|
||||
private[akka] class RemoteDeployer(_settings: ActorSystem.Settings, _pm: DynamicAccess) extends Deployer(_settings, _pm) {
|
||||
override protected def parseConfig(path: String, config: Config): Option[Deploy] = {
|
||||
override def parseConfig(path: String, config: Config): Option[Deploy] = {
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
super.parseConfig(path, config) match {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue