diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala index 0c98b178a3..57223465fe 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala @@ -27,6 +27,8 @@ case class SplitBrainMultiNodeConfig(failureDetectorPuppet: Boolean) extends Mul failure-detector.threshold = 4 }""")). withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))) + + testTransport(on = true) } class SplitBrainWithFailureDetectorPuppetMultiJvmNode1 extends SplitBrainSpec(failureDetectorPuppet = true) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala index c95462c7d4..999f318679 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala @@ -21,6 +21,8 @@ case class UnreachableNodeRejoinsClusterMultiNodeConfig(failureDetectorPuppet: B val fourth = role("fourth") commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + + testTransport(on = true) } class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = true) diff --git a/akka-docs/rst/dev/multi-node-testing.rst b/akka-docs/rst/dev/multi-node-testing.rst index eca5139a9a..b098317054 100644 --- a/akka-docs/rst/dev/multi-node-testing.rst +++ b/akka-docs/rst/dev/multi-node-testing.rst @@ -207,6 +207,9 @@ surprising ways. * Don't issue a shutdown of the first node. The first node is the controller and if it shuts down your test will break. + * To be able to use ``blackhole``, ``passThrough``, and ``throttle`` you must activate the ``TestConductorTranport`` + by specifying ``testTransport(on = true)`` in your MultiNodeConfig. + * Throttling, shutdown and other failure injections can only be done from the first node, which again is the controller. * Don't ask for the address of a node using ``node(address)`` after the node has been shut down. Grab the address before diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala index 7aaa6d72b3..fca4d62763 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -24,6 +24,7 @@ import akka.util.{ Timeout } import scala.concurrent.util.{ Deadline, Duration } import scala.reflect.classTag import scala.concurrent.util.FiniteDuration +import akka.ConfigurationException sealed trait Direction { def includes(other: Direction): Boolean @@ -102,6 +103,9 @@ trait Conductor { this: TestConductorExt ⇒ } /** + * To use this feature you must activate the TestConductorTranport by + * specifying `testTransport(on = true)` in your MultiNodeConfig. + * * Make the remoting pipeline on the node throttle data sent to or received * from the given remote peer. Throttling works by delaying packet submission * within the netty pipeline until the packet would have been completely sent @@ -121,10 +125,14 @@ trait Conductor { this: TestConductorExt ⇒ */ def throttle(node: RoleName, target: RoleName, direction: Direction, rateMBit: Double): Future[Done] = { import Settings.QueryTimeout + requireTestConductorTranport() controller ? Throttle(node, target, direction, rateMBit.toFloat) mapTo classTag[Done] } /** + * To use this feature you must activate the TestConductorTranport by + * specifying `testTransport(on = true)` in your MultiNodeConfig. + * * Switch the Netty pipeline of the remote support into blackhole mode for * sending and/or receiving: it will just drop all messages right before * submitting them to the Socket or right after receiving them from the @@ -136,10 +144,19 @@ trait Conductor { this: TestConductorExt ⇒ */ def blackhole(node: RoleName, target: RoleName, direction: Direction): Future[Done] = { import Settings.QueryTimeout + requireTestConductorTranport() controller ? Throttle(node, target, direction, 0f) mapTo classTag[Done] } + private def requireTestConductorTranport(): Unit = + if (!transport.isInstanceOf[TestConductorTransport]) + throw new ConfigurationException("To use this feature you must activate the TestConductorTranport by " + + "specifying `testTransport(on = true)` in your MultiNodeConfig.") + /** + * To use this feature you must activate the TestConductorTranport by + * specifying `testTransport(on = true)` in your MultiNodeConfig. + * * Switch the Netty pipeline of the remote support into pass through mode for * sending and/or receiving. * @@ -149,6 +166,7 @@ trait Conductor { this: TestConductorExt ⇒ */ def passThrough(node: RoleName, target: RoleName, direction: Direction): Future[Done] = { import Settings.QueryTimeout + requireTestConductorTranport() controller ? Throttle(node, target, direction, -1f) mapTo classTag[Done] } diff --git a/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala index a842a547a1..5081cde959 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -19,6 +19,7 @@ import scala.concurrent.util.Duration import scala.concurrent.util.duration._ import java.util.concurrent.TimeoutException import akka.remote.testconductor.RoleName +import akka.remote.testconductor.TestConductorTransport import akka.actor.RootActorPath import akka.event.{ Logging, LoggingAdapter } @@ -32,6 +33,7 @@ abstract class MultiNodeConfig { private var _roles = Vector[RoleName]() private var _deployments = Map[RoleName, Seq[String]]() private var _allDeploy = Vector[String]() + private var _testTransport = false /** * Register a common base config for all test participants, if so desired. @@ -81,13 +83,24 @@ abstract class MultiNodeConfig { def deployOnAll(deployment: String): Unit = _allDeploy :+= deployment + /** + * To be able to use `blackhole`, `passThrough`, and `throttle` you must + * activate the TestConductorTranport by specifying + * `testTransport(on = true)` in your MultiNodeConfig. + */ + def testTransport(on: Boolean): Unit = _testTransport = on + private[testkit] lazy val myself: RoleName = { require(_roles.size > MultiNodeSpec.selfIndex, "not enough roles declared for this test") _roles(MultiNodeSpec.selfIndex) } private[testkit] def config: Config = { - val configs = (_nodeConf get myself).toList ::: _commonConf.toList ::: MultiNodeSpec.nodeConfig :: MultiNodeSpec.baseConfig :: Nil + val transportConfig = + if (_testTransport) ConfigFactory.parseString("akka.remote.transport=" + classOf[TestConductorTransport].getName) + else ConfigFactory.empty + + val configs = (_nodeConf get myself).toList ::: _commonConf.toList ::: transportConfig :: MultiNodeSpec.nodeConfig :: MultiNodeSpec.baseConfig :: Nil configs reduce (_ withFallback _) } @@ -175,7 +188,6 @@ object MultiNodeSpec { private[testkit] val nodeConfig = mapToConfig(Map( "akka.actor.provider" -> "akka.remote.RemoteActorRefProvider", - "akka.remote.transport" -> "akka.remote.testconductor.TestConductorTransport", "akka.remote.netty.hostname" -> selfName, "akka.remote.netty.port" -> selfPort)) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala index 3a49490e1a..97f5827b1b 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala @@ -23,6 +23,8 @@ object TestConductorMultiJvmSpec extends MultiNodeConfig { val master = role("master") val slave = role("slave") + + testTransport(on = true) } class TestConductorMultiJvmNode1 extends TestConductorSpec