2012-05-18 15:55:04 +02:00
|
|
|
/**
|
|
|
|
|
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
|
|
|
|
*/
|
|
|
|
|
package akka.remote.testkit
|
|
|
|
|
|
2012-06-25 17:09:00 +02:00
|
|
|
import language.implicitConversions
|
2012-09-12 15:12:13 +02:00
|
|
|
import language.postfixOps
|
2012-06-25 17:09:00 +02:00
|
|
|
|
2012-05-18 15:55:04 +02:00
|
|
|
import java.net.InetSocketAddress
|
2012-06-04 21:21:36 +02:00
|
|
|
import com.typesafe.config.{ ConfigObject, ConfigFactory, Config }
|
2012-09-12 15:12:13 +02:00
|
|
|
import akka.actor._
|
2012-07-22 15:33:18 +02:00
|
|
|
import akka.util.Timeout
|
2012-06-04 21:21:36 +02:00
|
|
|
import akka.remote.testconductor.{ TestConductorExt, TestConductor, RoleName }
|
2012-07-22 15:33:18 +02:00
|
|
|
import akka.remote.RemoteActorRefProvider
|
2012-09-12 15:12:13 +02:00
|
|
|
import akka.testkit.TestKit
|
2012-07-22 15:33:18 +02:00
|
|
|
import scala.concurrent.{ Await, Awaitable }
|
|
|
|
|
import scala.util.control.NonFatal
|
2012-06-29 13:33:20 +02:00
|
|
|
import scala.concurrent.util.Duration
|
|
|
|
|
import scala.concurrent.util.duration._
|
2012-09-12 15:12:13 +02:00
|
|
|
import java.util.concurrent.TimeoutException
|
|
|
|
|
import akka.remote.testconductor.RoleName
|
|
|
|
|
import akka.actor.RootActorPath
|
|
|
|
|
import akka.event.{ Logging, LoggingAdapter }
|
2012-05-18 18:44:53 +02:00
|
|
|
|
|
|
|
|
/**
 * Describes a multi-node test before it is started: the role names of the
 * participants, an optional configuration shared by all of them, per-role
 * configuration overrides and deployment snippets.
 */
abstract class MultiNodeConfig {

  // Registration state; it is only ever modified through the methods below.
  private var _commonConf: Option[Config] = None
  private var _nodeConf = Map.empty[RoleName, Config]
  private var _roles = Vector.empty[RoleName]
  private var _deployments = Map.empty[RoleName, Seq[String]]
  private var _allDeploy = Vector.empty[String]

  /**
   * Set the base configuration shared by every participant. Calling this
   * again replaces the previously registered common configuration.
   */
  def commonConfig(config: Config): Unit = {
    _commonConf = Some(config)
  }

  /**
   * Set the configuration override for one participant. A later call for the
   * same role replaces the earlier override.
   */
  def nodeConfig(role: RoleName, config: Config): Unit = {
    _nodeConf = _nodeConf.updated(role, config)
  }

  /**
   * Build a logging configuration snippet. Nothing is registered by this
   * method; the caller decides where to use the returned config.
   *
   * @param on when `true` a config enabling verbose debug logging is returned,
   *           otherwise a config that sets the log level to INFO
   */
  def debugConfig(on: Boolean): Config = {
    val settings =
      if (!on) "akka.loglevel = INFO"
      else """
        akka.loglevel = DEBUG
        akka.remote {
          log-received-messages = on
          log-sent-messages = on
        }
        akka.actor.debug {
          receive = on
          fsm = on
        }
        akka.remote.log-remote-lifecycle-events = on
        """
    ConfigFactory.parseString(settings)
  }

  /**
   * Create and register a RoleName which serves as an identifier within the
   * test. Roles are kept in registration order, and the role of the current
   * node is looked up by its index in that order.
   *
   * @throws IllegalArgumentException if a role with this name was already registered
   */
  def role(name: String): RoleName = {
    val alreadyTaken = _roles.exists(_.name == name)
    if (alreadyTaken) throw new IllegalArgumentException("non-unique role name " + name)
    val created = RoleName(name)
    _roles = _roles :+ created
    created
  }

  /**
   * Append a deployment snippet for the given role only.
   */
  def deployOn(role: RoleName, deployment: String): Unit = {
    val soFar = _deployments.getOrElse(role, Vector())
    _deployments = _deployments.updated(role, soFar :+ deployment)
  }

  /**
   * Append a deployment snippet which applies to every role.
   */
  def deployOnAll(deployment: String): Unit = {
    _allDeploy = _allDeploy :+ deployment
  }

  // The role of the current JVM: the registered role at position MultiNodeSpec.selfIndex.
  private[testkit] lazy val myself: RoleName = {
    val index: Int = MultiNodeSpec.selfIndex
    require(_roles.size > index, "not enough roles declared for this test")
    _roles(index)
  }

  // Effective configuration of this node. Earlier layers take precedence over
  // later ones: role override, common config, MultiNodeSpec.nodeConfig and
  // finally MultiNodeSpec.baseConfig.
  private[testkit] def config: Config = {
    val layers =
      _nodeConf.get(myself).toList ++
        _commonConf.toList ++
        List(MultiNodeSpec.nodeConfig, MultiNodeSpec.baseConfig)
    layers.reduceLeft(_ withFallback _)
  }

  // Role specific deployment snippets first, followed by the ones registered for all roles.
  private[testkit] def deployments(node: RoleName): Seq[String] =
    _deployments.getOrElse(node, Nil) ++ _allDeploy

  // All registered roles, in registration order.
  private[testkit] def roles: Seq[RoleName] = _roles

}
|
2012-05-18 15:55:04 +02:00
|
|
|
|
|
|
|
|
object MultiNodeSpec {

  /**
   * Names (or IP addresses; must be resolvable using InetAddress.getByName)
   * of all nodes taking part in this test, including symbolic name and host
   * definition:
   *
   * {{{
   * -D"multinode.hosts=host1@workerA.example.com,host2@workerB.example.com"
   * }}}
   *
   * The system property is mandatory; initialization of this object fails
   * with an IllegalStateException when it is not set.
   */
  val nodeNames: Seq[String] = {
    val hosts = Option(System.getProperty("multinode.hosts")) getOrElse
      (throw new IllegalStateException("need system property multinode.hosts to be set"))
    hosts.split(",").toVector
  }

  // Splitting an empty property value yields a single empty name; reject that.
  require(nodeNames != Vector(""), "multinode.hosts must not be empty")

  /**
   * Index of this node in the nodeNames list, read from the mandatory system
   * property `multinode.index`. The TestConductor is started in “controller”
   * mode on selfIndex 0, i.e. there you can inject failures and shutdown
   * other nodes etc.
   */
  val selfIndex = Option(Integer.getInteger("multinode.index")) getOrElse
    (throw new IllegalStateException("need system property multinode.index to be set"))

  require(selfIndex >= 0 && selfIndex < nodeNames.size, "selfIndex out of bounds: " + selfIndex)

  // Remoting settings of this node: remote provider, test conductor transport,
  // the host name registered for this node's index, and port 0.
  private[testkit] val nodeConfig = mapToConfig(Map(
    "akka.actor.provider" -> "akka.remote.RemoteActorRefProvider",
    "akka.remote.transport" -> "akka.remote.testconductor.TestConductorTransport",
    "akka.remote.netty.hostname" -> nodeNames(selfIndex),
    "akka.remote.netty.port" -> 0))

  // Defaults that apply to every multi-node test unless overridden by the test's own config.
  private[testkit] val baseConfig: Config = ConfigFactory.parseString("""
      akka {
        event-handlers = ["akka.testkit.TestEventListener"]
        loglevel = "WARNING"
        stdout-loglevel = "WARNING"
        actor {
          default-dispatcher {
            executor = "fork-join-executor"
            fork-join-executor {
              parallelism-min = 8
              parallelism-factor = 2.0
              parallelism-max = 8
            }
          }
        }
      }
      """)

  // Turn a Scala map of config paths to values into a Config.
  private def mapToConfig(map: Map[String, Any]): Config = {
    import scala.collection.JavaConverters._
    val javaMap = map.asJava
    ConfigFactory.parseMap(javaMap)
  }

  // Derive a name from the calling class found on the current stack trace:
  // skip the first frame and all directly following frames whose class name
  // matches the MultiNodeSpec pattern, continue after the last frame that
  // belongs to `clazz` (if any), then take the simple class name of the next
  // frame with every character outside [a-zA-Z_0-9] replaced by an underscore.
  private def getCallerName(clazz: Class[_]): String = {
    val classNames = Thread.currentThread.getStackTrace
      .map(_.getClassName)
      .drop(1)
      .dropWhile(_ matches ".*MultiNodeSpec.?$")
    val lastOwnFrame = classNames.lastIndexWhere(_ == clazz.getName)
    val callers =
      if (lastOwnFrame == -1) classNames
      else classNames.drop(lastOwnFrame + 1)
    val simpleName = callers.head.replaceFirst(""".*\.""", "")
    simpleName.replaceAll("[^a-zA-Z_0-9]", "_")
  }
}
|
|
|
|
|
|
2012-05-29 12:50:50 +02:00
|
|
|
/**
 * Base class for multi-node tests. During construction it starts the
 * TestConductor (as controller on the node with `selfIndex` 0, as client on
 * all other nodes) and applies the deployments registered for this node's role.
 *
 * Note: To be able to run tests with everything ignored or excluded by tags
 * you must not use `testconductor`, or helper methods that use `testconductor`,
 * from the constructor of your test class. Otherwise the controller node might
 * be shutdown before other nodes have completed and you will see errors like:
 * `AskTimeoutException: sending to terminated ref breaks promises`. Using lazy
 * val is fine.
 *
 * @param myself      the role of the node running this instance
 * @param _system     the actor system used by this test
 * @param _roles      all roles taking part in the test
 * @param deployments the deployment snippets to apply for a given role
 */
abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: Seq[RoleName], deployments: RoleName ⇒ Seq[String])
  extends TestKit(_system) with MultiNodeSpecCallbacks {

  import MultiNodeSpec._

  /**
   * Create the spec from a MultiNodeConfig: the actor system is named after
   * the calling class and started with the effective configuration of this node.
   */
  def this(config: MultiNodeConfig) =
    this(config.myself, ActorSystem(MultiNodeSpec.getCallerName(classOf[MultiNodeSpec]), ConfigFactory.load(config.config)),
      config.roles, config.deployments)

  val log: LoggingAdapter = Logging(system, this.getClass)

  /**
   * Runs the `atStartup` hook.
   */
  final override def multiNodeSpecBeforeAll(): Unit = {
    atStartup()
  }

  /**
   * Shuts down the actor system and then runs the `atTermination` hook. On
   * the controller node (`selfIndex` 0) this first waits, bounded by the
   * barrier timeout, until all other nodes have removed themselves.
   */
  final override def multiNodeSpecAfterAll(): Unit = {
    // wait for all nodes to remove themselves before we shut the conductor down
    if (selfIndex == 0) {
      testConductor.removeNode(myself)
      within(testConductor.Settings.BarrierTimeout.duration) {
        awaitCond {
          testConductor.getNodes.await.filterNot(_ == myself).isEmpty
        }
      }
    }
    system.shutdown()
    try system.awaitTermination(5 seconds) catch {
      case _: TimeoutException ⇒
        // do not fail the test run, but leave a trace of what is still alive
        system.log.warning("Failed to stop [{}] within 5 seconds", system.name)
        println(system.asInstanceOf[ActorSystemImpl].printTree)
    }
    atTermination()
  }

  /*
   * Test Class Interface
   */

  /**
   * Override this method to do something when the whole test is starting up.
   */
  protected def atStartup(): Unit = {}

  /**
   * Override this method to do something when the whole test is terminating.
   */
  protected def atTermination(): Unit = {}

  /**
   * All registered roles
   */
  def roles: Seq[RoleName] = _roles

  /**
   * TO BE DEFINED BY USER: Defines the number of participants required for starting the test. This
   * might not be equal to the number of nodes available to the test.
   *
   * Must be a `def`, because it is already read by the checks in this constructor:
   * {{{
   * def initialParticipants = 5
   * }}}
   */
  def initialParticipants: Int
  require(initialParticipants > 0, "initialParticipants must be a 'def' or early initializer, and it must be greater zero")
  require(initialParticipants <= nodeNames.size, "not enough nodes to run this test")

  /**
   * Access to the barriers, failure injection, etc. The extension will have
   * been started either in Conductor or Player mode when the constructor of
   * MultiNodeSpec finishes, i.e. do not call the start*() methods yourself!
   */
  val testConductor: TestConductorExt = TestConductor(system)

  /**
   * Execute the given block of code only if the role of this node is one of
   * the given roles.
   */
  def runOn(nodes: RoleName*)(thunk: ⇒ Unit): Unit = {
    if (nodes exists (_ == myself)) {
      thunk
    }
  }

  /**
   * Evaluate `yes` if the role of this node is one of the given roles,
   * otherwise evaluate `no`, and return the result.
   */
  def ifNode[T](nodes: RoleName*)(yes: ⇒ T)(no: ⇒ T): T = {
    if (nodes exists (_ == myself)) yes else no
  }

  /**
   * Enter the named barriers in the order given. Use the remaining duration from
   * the innermost enclosing `within` block or the default `BarrierTimeout`
   */
  def enterBarrier(name: String*): Unit = {
    testConductor.enter(Timeout.durationToTimeout(remainingOr(testConductor.Settings.BarrierTimeout.duration)), name)
  }

  /**
   * Query the controller for the transport address of the given node (by role name) and
   * return that as an ActorPath for easy composition:
   *
   * {{{
   * val serviceA = system.actorFor(node("master") / "user" / "serviceA")
   * }}}
   */
  def node(role: RoleName): ActorPath = RootActorPath(testConductor.getAddressFor(role).await)

  /**
   * Enrich `.await()` onto all Awaitables, using remaining duration from the innermost
   * enclosing `within` block or QueryTimeout.
   */
  implicit def awaitHelper[T](w: Awaitable[T]): AwaitHelper[T] = new AwaitHelper(w)
  class AwaitHelper[T](w: Awaitable[T]) {
    def await: T = Await.result(w, remainingOr(testConductor.Settings.QueryTimeout.duration))
  }

  /*
   * Implementation (i.e. wait for start etc.)
   */

  // The controller listens on the first registered host.
  private val controllerAddr = new InetSocketAddress(nodeNames(0), 4711)
  if (selfIndex == 0) {
    Await.result(testConductor.startController(initialParticipants, myself, controllerAddr),
      testConductor.Settings.BarrierTimeout.duration)
  } else {
    Await.result(testConductor.startClient(myself, controllerAddr),
      testConductor.Settings.BarrierTimeout.duration)
  }

  // now add deployments, if so desired

  // A placeholder of the form @roleName@ together with the role it stands for.
  // The address is resolved lazily, i.e. only if the placeholder is actually used.
  private case class Replacement(tag: String, role: RoleName) {
    lazy val addr: String = node(role).address.toString
  }
  private val replacements = roles map (r ⇒ Replacement("@" + r.name + "@", r))
  private val deployer = system.asInstanceOf[ExtendedActorSystem].provider.deployer
  deployments(myself) foreach { str ⇒
    // substitute every placeholder which occurs in the snippet by the address of its role
    val deployString = replacements.foldLeft(str) {
      case (base, r @ Replacement(tag, _)) ⇒
        if (base.contains(tag)) {
          val replaceWith =
            try r.addr
            catch {
              case NonFatal(e) ⇒
                // might happen if all test cases are ignored (excluded) and
                // controller node is finished/exited before r.addr is run
                // on the other nodes
                val unresolved = "akka://unresolved-replacement-" + r.role.name
                log.warning(unresolved + " due to: " + e.getMessage)
                unresolved
            }
          base.replace(tag, replaceWith)
        } else base
    }
    import scala.collection.JavaConverters._
    ConfigFactory.parseString(deployString).root.asScala foreach {
      case (key, value: ConfigObject) ⇒
        deployer.parseConfig(key, value.toConfig) foreach deployer.deploy
      case (key, x) ⇒
        throw new IllegalArgumentException("key " + key + " must map to deployment section, not simple value " + x)
    }
  }

  // useful to see which jvm is running which role, used by LogRoleReplace utility
  log.info("Role [{}] started with address [{}]", myself.name,
    system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.address)

}
|
2012-06-21 13:16:35 +02:00
|
|
|
|
2012-09-12 15:12:13 +02:00
|
|
|
/**
 * Hooks which connect MultiNodeSpec to the lifecycle of your test framework.
 * Either let your test extend MultiNodeSpec and call these methods directly,
 * or create a trait that calls them and mix that trait into your test
 * together with MultiNodeSpec.
 *
 * Example trait for MultiNodeSpec with ScalaTest
 *
 * {{{
 * trait STMultiNodeSpec extends MultiNodeSpecCallbacks with WordSpec with MustMatchers with BeforeAndAfterAll {
 *   override def beforeAll() = multiNodeSpecBeforeAll()
 *   override def afterAll() = multiNodeSpecAfterAll()
 * }
 * }}}
 */
trait MultiNodeSpecCallbacks {

  /**
   * Call this once before the test run starts, NOT before every test case.
   */
  def multiNodeSpecBeforeAll(): Unit

  /**
   * Call this once after all test cases have run, NOT after every test case.
   */
  def multiNodeSpecAfterAll(): Unit

}
|