diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala index 0c98b178a3..57223465fe 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala @@ -27,6 +27,8 @@ case class SplitBrainMultiNodeConfig(failureDetectorPuppet: Boolean) extends Mul failure-detector.threshold = 4 }""")). withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet))) + + testTransport(on = true) } class SplitBrainWithFailureDetectorPuppetMultiJvmNode1 extends SplitBrainSpec(failureDetectorPuppet = true) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala index c95462c7d4..999f318679 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala @@ -21,6 +21,8 @@ case class UnreachableNodeRejoinsClusterMultiNodeConfig(failureDetectorPuppet: B val fourth = role("fourth") commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + + testTransport(on = true) } class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = true) diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala index 03fef8da54..870028a3f2 100644 --- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala +++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala @@ -22,6 +22,8 @@ import akka.testkit.TestProbe object ReliableProxySpec extends MultiNodeConfig { val local = role("local") val remote = role("remote") + + testTransport(on = true) } class ReliableProxyMultiJvmNode1 extends ReliableProxySpec diff --git a/akka-docs/rst/dev/multi-node-testing.rst b/akka-docs/rst/dev/multi-node-testing.rst index eca5139a9a..b098317054 100644 --- a/akka-docs/rst/dev/multi-node-testing.rst +++ b/akka-docs/rst/dev/multi-node-testing.rst @@ -207,6 +207,9 @@ surprising ways. * Don't issue a shutdown of the first node. The first node is the controller and if it shuts down your test will break. + * To be able to use ``blackhole``, ``passThrough``, and ``throttle`` you must activate the ``TestConductorTranport`` + by specifying ``testTransport(on = true)`` in your MultiNodeConfig. + * Throttling, shutdown and other failure injections can only be done from the first node, which again is the controller. * Don't ask for the address of a node using ``node(address)`` after the node has been shut down. Grab the address before diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala index 7aaa6d72b3..4646e78a0a 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -24,6 +24,7 @@ import akka.util.{ Timeout } import scala.concurrent.util.{ Deadline, Duration } import scala.reflect.classTag import scala.concurrent.util.FiniteDuration +import akka.ConfigurationException sealed trait Direction { def includes(other: Direction): Boolean @@ -114,6 +115,10 @@ trait Conductor { this: TestConductorExt ⇒ * determining how much to send, leading to the correct output rate, but with * increased latency. * + * ====Note==== + * To use this feature you must activate the `TestConductorTranport` + * by specifying `testTransport(on = true)` in your MultiNodeConfig. + * * @param node is the symbolic name of the node which is to be affected * @param target is the symbolic name of the other node to which connectivity shall be throttled * @param direction can be either `Direction.Send`, `Direction.Receive` or `Direction.Both` @@ -121,6 +126,7 @@ trait Conductor { this: TestConductorExt ⇒ */ def throttle(node: RoleName, target: RoleName, direction: Direction, rateMBit: Double): Future[Done] = { import Settings.QueryTimeout + requireTestConductorTranport() controller ? Throttle(node, target, direction, rateMBit.toFloat) mapTo classTag[Done] } @@ -130,25 +136,40 @@ trait Conductor { this: TestConductorExt ⇒ * submitting them to the Socket or right after receiving them from the * Socket. * + * ====Note==== + * To use this feature you must activate the `TestConductorTranport` + * by specifying `testTransport(on = true)` in your MultiNodeConfig. + * * @param node is the symbolic name of the node which is to be affected * @param target is the symbolic name of the other node to which connectivity shall be impeded * @param direction can be either `Direction.Send`, `Direction.Receive` or `Direction.Both` */ def blackhole(node: RoleName, target: RoleName, direction: Direction): Future[Done] = { import Settings.QueryTimeout + requireTestConductorTranport() controller ? Throttle(node, target, direction, 0f) mapTo classTag[Done] } + private def requireTestConductorTranport(): Unit = + if (!transport.isInstanceOf[TestConductorTransport]) + throw new ConfigurationException("To use this feature you must activate the TestConductorTranport by " + + "specifying `testTransport(on = true)` in your MultiNodeConfig.") + /** * Switch the Netty pipeline of the remote support into pass through mode for * sending and/or receiving. * + * ====Note==== + * To use this feature you must activate the `TestConductorTranport` + * by specifying `testTransport(on = true)` in your MultiNodeConfig. + * * @param node is the symbolic name of the node which is to be affected * @param target is the symbolic name of the other node to which connectivity shall be impeded * @param direction can be either `Direction.Send`, `Direction.Receive` or `Direction.Both` */ def passThrough(node: RoleName, target: RoleName, direction: Direction): Future[Done] = { import Settings.QueryTimeout + requireTestConductorTranport() controller ? Throttle(node, target, direction, -1f) mapTo classTag[Done] } diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Extension.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Extension.scala index 4469ce308a..1945c89a1f 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Extension.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Extension.scala @@ -29,8 +29,14 @@ object TestConductor extends ExtensionKey[TestConductorExt] { * [[akka.actor.Extension]]. Please follow the aforementioned links for * more information. * - * This extension requires the `akka.actor.provider` - * to be a [[akka.remote.RemoteActorRefProvider]]. + * ====Note==== + * This extension requires the `akka.actor.provider` + * to be a [[akka.remote.RemoteActorRefProvider]]. + * + * To use ``blackhole``, ``passThrough``, and ``throttle`` you must activate the + * `TestConductorTranport` by specifying `testTransport(on = true)` in your + * MultiNodeConfig. + * */ class TestConductorExt(val system: ExtendedActorSystem) extends Extension with Conductor with Player { diff --git a/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala index a842a547a1..5081cde959 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -19,6 +19,7 @@ import scala.concurrent.util.Duration import scala.concurrent.util.duration._ import java.util.concurrent.TimeoutException import akka.remote.testconductor.RoleName +import akka.remote.testconductor.TestConductorTransport import akka.actor.RootActorPath import akka.event.{ Logging, LoggingAdapter } @@ -32,6 +33,7 @@ abstract class MultiNodeConfig { private var _roles = Vector[RoleName]() private var _deployments = Map[RoleName, Seq[String]]() private var _allDeploy = Vector[String]() + private var _testTransport = false /** * Register a common base config for all test participants, if so desired. @@ -81,13 +83,24 @@ abstract class MultiNodeConfig { def deployOnAll(deployment: String): Unit = _allDeploy :+= deployment + /** + * To be able to use `blackhole`, `passThrough`, and `throttle` you must + * activate the TestConductorTranport by specifying + * `testTransport(on = true)` in your MultiNodeConfig. + */ + def testTransport(on: Boolean): Unit = _testTransport = on + private[testkit] lazy val myself: RoleName = { require(_roles.size > MultiNodeSpec.selfIndex, "not enough roles declared for this test") _roles(MultiNodeSpec.selfIndex) } private[testkit] def config: Config = { - val configs = (_nodeConf get myself).toList ::: _commonConf.toList ::: MultiNodeSpec.nodeConfig :: MultiNodeSpec.baseConfig :: Nil + val transportConfig = + if (_testTransport) ConfigFactory.parseString("akka.remote.transport=" + classOf[TestConductorTransport].getName) + else ConfigFactory.empty + + val configs = (_nodeConf get myself).toList ::: _commonConf.toList ::: transportConfig :: MultiNodeSpec.nodeConfig :: MultiNodeSpec.baseConfig :: Nil configs reduce (_ withFallback _) } @@ -175,7 +188,6 @@ object MultiNodeSpec { private[testkit] val nodeConfig = mapToConfig(Map( "akka.actor.provider" -> "akka.remote.RemoteActorRefProvider", - "akka.remote.transport" -> "akka.remote.testconductor.TestConductorTransport", "akka.remote.netty.hostname" -> selfName, "akka.remote.netty.port" -> selfPort)) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala index 3a49490e1a..97f5827b1b 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala @@ -23,6 +23,8 @@ object TestConductorMultiJvmSpec extends MultiNodeConfig { val master = role("master") val slave = role("slave") + + testTransport(on = true) } class TestConductorMultiJvmNode1 extends TestConductorSpec