Avoid TestConductorTransport unless needed, see #2586
* Due to the shutdown issues the TestConductorTransport is by default not active, but it's easy to activate it and exception will be thrown if trying to use the featues that require it, i.e blackhole, passThrow and throttle * Documented
This commit is contained in:
parent
89c1f66b1f
commit
495ace37f4
6 changed files with 41 additions and 2 deletions
|
|
@ -27,6 +27,8 @@ case class SplitBrainMultiNodeConfig(failureDetectorPuppet: Boolean) extends Mul
|
||||||
failure-detector.threshold = 4
|
failure-detector.threshold = 4
|
||||||
}""")).
|
}""")).
|
||||||
withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet)))
|
withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet)))
|
||||||
|
|
||||||
|
testTransport(on = true)
|
||||||
}
|
}
|
||||||
|
|
||||||
class SplitBrainWithFailureDetectorPuppetMultiJvmNode1 extends SplitBrainSpec(failureDetectorPuppet = true)
|
class SplitBrainWithFailureDetectorPuppetMultiJvmNode1 extends SplitBrainSpec(failureDetectorPuppet = true)
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,8 @@ case class UnreachableNodeRejoinsClusterMultiNodeConfig(failureDetectorPuppet: B
|
||||||
val fourth = role("fourth")
|
val fourth = role("fourth")
|
||||||
|
|
||||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||||
|
|
||||||
|
testTransport(on = true)
|
||||||
}
|
}
|
||||||
|
|
||||||
class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = true)
|
class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = true)
|
||||||
|
|
|
||||||
|
|
@ -207,6 +207,9 @@ surprising ways.
|
||||||
|
|
||||||
* Don't issue a shutdown of the first node. The first node is the controller and if it shuts down your test will break.
|
* Don't issue a shutdown of the first node. The first node is the controller and if it shuts down your test will break.
|
||||||
|
|
||||||
|
* To be able to use ``blackhole``, ``passThrough``, and ``throttle`` you must activate the ``TestConductorTranport``
|
||||||
|
by specifying ``testTransport(on = true)`` in your MultiNodeConfig.
|
||||||
|
|
||||||
* Throttling, shutdown and other failure injections can only be done from the first node, which again is the controller.
|
* Throttling, shutdown and other failure injections can only be done from the first node, which again is the controller.
|
||||||
|
|
||||||
* Don't ask for the address of a node using ``node(address)`` after the node has been shut down. Grab the address before
|
* Don't ask for the address of a node using ``node(address)`` after the node has been shut down. Grab the address before
|
||||||
|
|
|
||||||
|
|
@ -24,6 +24,7 @@ import akka.util.{ Timeout }
|
||||||
import scala.concurrent.util.{ Deadline, Duration }
|
import scala.concurrent.util.{ Deadline, Duration }
|
||||||
import scala.reflect.classTag
|
import scala.reflect.classTag
|
||||||
import scala.concurrent.util.FiniteDuration
|
import scala.concurrent.util.FiniteDuration
|
||||||
|
import akka.ConfigurationException
|
||||||
|
|
||||||
sealed trait Direction {
|
sealed trait Direction {
|
||||||
def includes(other: Direction): Boolean
|
def includes(other: Direction): Boolean
|
||||||
|
|
@ -102,6 +103,9 @@ trait Conductor { this: TestConductorExt ⇒
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* To use this feature you must activate the TestConductorTranport by
|
||||||
|
* specifying `testTransport(on = true)` in your MultiNodeConfig.
|
||||||
|
*
|
||||||
* Make the remoting pipeline on the node throttle data sent to or received
|
* Make the remoting pipeline on the node throttle data sent to or received
|
||||||
* from the given remote peer. Throttling works by delaying packet submission
|
* from the given remote peer. Throttling works by delaying packet submission
|
||||||
* within the netty pipeline until the packet would have been completely sent
|
* within the netty pipeline until the packet would have been completely sent
|
||||||
|
|
@ -121,10 +125,14 @@ trait Conductor { this: TestConductorExt ⇒
|
||||||
*/
|
*/
|
||||||
def throttle(node: RoleName, target: RoleName, direction: Direction, rateMBit: Double): Future[Done] = {
|
def throttle(node: RoleName, target: RoleName, direction: Direction, rateMBit: Double): Future[Done] = {
|
||||||
import Settings.QueryTimeout
|
import Settings.QueryTimeout
|
||||||
|
requireTestConductorTranport()
|
||||||
controller ? Throttle(node, target, direction, rateMBit.toFloat) mapTo classTag[Done]
|
controller ? Throttle(node, target, direction, rateMBit.toFloat) mapTo classTag[Done]
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* To use this feature you must activate the TestConductorTranport by
|
||||||
|
* specifying `testTransport(on = true)` in your MultiNodeConfig.
|
||||||
|
*
|
||||||
* Switch the Netty pipeline of the remote support into blackhole mode for
|
* Switch the Netty pipeline of the remote support into blackhole mode for
|
||||||
* sending and/or receiving: it will just drop all messages right before
|
* sending and/or receiving: it will just drop all messages right before
|
||||||
* submitting them to the Socket or right after receiving them from the
|
* submitting them to the Socket or right after receiving them from the
|
||||||
|
|
@ -136,10 +144,19 @@ trait Conductor { this: TestConductorExt ⇒
|
||||||
*/
|
*/
|
||||||
def blackhole(node: RoleName, target: RoleName, direction: Direction): Future[Done] = {
|
def blackhole(node: RoleName, target: RoleName, direction: Direction): Future[Done] = {
|
||||||
import Settings.QueryTimeout
|
import Settings.QueryTimeout
|
||||||
|
requireTestConductorTranport()
|
||||||
controller ? Throttle(node, target, direction, 0f) mapTo classTag[Done]
|
controller ? Throttle(node, target, direction, 0f) mapTo classTag[Done]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private def requireTestConductorTranport(): Unit =
|
||||||
|
if (!transport.isInstanceOf[TestConductorTransport])
|
||||||
|
throw new ConfigurationException("To use this feature you must activate the TestConductorTranport by " +
|
||||||
|
"specifying `testTransport(on = true)` in your MultiNodeConfig.")
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* To use this feature you must activate the TestConductorTranport by
|
||||||
|
* specifying `testTransport(on = true)` in your MultiNodeConfig.
|
||||||
|
*
|
||||||
* Switch the Netty pipeline of the remote support into pass through mode for
|
* Switch the Netty pipeline of the remote support into pass through mode for
|
||||||
* sending and/or receiving.
|
* sending and/or receiving.
|
||||||
*
|
*
|
||||||
|
|
@ -149,6 +166,7 @@ trait Conductor { this: TestConductorExt ⇒
|
||||||
*/
|
*/
|
||||||
def passThrough(node: RoleName, target: RoleName, direction: Direction): Future[Done] = {
|
def passThrough(node: RoleName, target: RoleName, direction: Direction): Future[Done] = {
|
||||||
import Settings.QueryTimeout
|
import Settings.QueryTimeout
|
||||||
|
requireTestConductorTranport()
|
||||||
controller ? Throttle(node, target, direction, -1f) mapTo classTag[Done]
|
controller ? Throttle(node, target, direction, -1f) mapTo classTag[Done]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,7 @@ import scala.concurrent.util.Duration
|
||||||
import scala.concurrent.util.duration._
|
import scala.concurrent.util.duration._
|
||||||
import java.util.concurrent.TimeoutException
|
import java.util.concurrent.TimeoutException
|
||||||
import akka.remote.testconductor.RoleName
|
import akka.remote.testconductor.RoleName
|
||||||
|
import akka.remote.testconductor.TestConductorTransport
|
||||||
import akka.actor.RootActorPath
|
import akka.actor.RootActorPath
|
||||||
import akka.event.{ Logging, LoggingAdapter }
|
import akka.event.{ Logging, LoggingAdapter }
|
||||||
|
|
||||||
|
|
@ -32,6 +33,7 @@ abstract class MultiNodeConfig {
|
||||||
private var _roles = Vector[RoleName]()
|
private var _roles = Vector[RoleName]()
|
||||||
private var _deployments = Map[RoleName, Seq[String]]()
|
private var _deployments = Map[RoleName, Seq[String]]()
|
||||||
private var _allDeploy = Vector[String]()
|
private var _allDeploy = Vector[String]()
|
||||||
|
private var _testTransport = false
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register a common base config for all test participants, if so desired.
|
* Register a common base config for all test participants, if so desired.
|
||||||
|
|
@ -81,13 +83,24 @@ abstract class MultiNodeConfig {
|
||||||
|
|
||||||
def deployOnAll(deployment: String): Unit = _allDeploy :+= deployment
|
def deployOnAll(deployment: String): Unit = _allDeploy :+= deployment
|
||||||
|
|
||||||
|
/**
|
||||||
|
* To be able to use `blackhole`, `passThrough`, and `throttle` you must
|
||||||
|
* activate the TestConductorTranport by specifying
|
||||||
|
* `testTransport(on = true)` in your MultiNodeConfig.
|
||||||
|
*/
|
||||||
|
def testTransport(on: Boolean): Unit = _testTransport = on
|
||||||
|
|
||||||
private[testkit] lazy val myself: RoleName = {
|
private[testkit] lazy val myself: RoleName = {
|
||||||
require(_roles.size > MultiNodeSpec.selfIndex, "not enough roles declared for this test")
|
require(_roles.size > MultiNodeSpec.selfIndex, "not enough roles declared for this test")
|
||||||
_roles(MultiNodeSpec.selfIndex)
|
_roles(MultiNodeSpec.selfIndex)
|
||||||
}
|
}
|
||||||
|
|
||||||
private[testkit] def config: Config = {
|
private[testkit] def config: Config = {
|
||||||
val configs = (_nodeConf get myself).toList ::: _commonConf.toList ::: MultiNodeSpec.nodeConfig :: MultiNodeSpec.baseConfig :: Nil
|
val transportConfig =
|
||||||
|
if (_testTransport) ConfigFactory.parseString("akka.remote.transport=" + classOf[TestConductorTransport].getName)
|
||||||
|
else ConfigFactory.empty
|
||||||
|
|
||||||
|
val configs = (_nodeConf get myself).toList ::: _commonConf.toList ::: transportConfig :: MultiNodeSpec.nodeConfig :: MultiNodeSpec.baseConfig :: Nil
|
||||||
configs reduce (_ withFallback _)
|
configs reduce (_ withFallback _)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -175,7 +188,6 @@ object MultiNodeSpec {
|
||||||
|
|
||||||
private[testkit] val nodeConfig = mapToConfig(Map(
|
private[testkit] val nodeConfig = mapToConfig(Map(
|
||||||
"akka.actor.provider" -> "akka.remote.RemoteActorRefProvider",
|
"akka.actor.provider" -> "akka.remote.RemoteActorRefProvider",
|
||||||
"akka.remote.transport" -> "akka.remote.testconductor.TestConductorTransport",
|
|
||||||
"akka.remote.netty.hostname" -> selfName,
|
"akka.remote.netty.hostname" -> selfName,
|
||||||
"akka.remote.netty.port" -> selfPort))
|
"akka.remote.netty.port" -> selfPort))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -23,6 +23,8 @@ object TestConductorMultiJvmSpec extends MultiNodeConfig {
|
||||||
|
|
||||||
val master = role("master")
|
val master = role("master")
|
||||||
val slave = role("slave")
|
val slave = role("slave")
|
||||||
|
|
||||||
|
testTransport(on = true)
|
||||||
}
|
}
|
||||||
|
|
||||||
class TestConductorMultiJvmNode1 extends TestConductorSpec
|
class TestConductorMultiJvmNode1 extends TestConductorSpec
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue