Merge pull request #779 from akka/wip-2586-testconductor-transport-patriknw
Avoid TestConductorTransport unless needed, see #2586
This commit is contained in:
commit
a471545eec
8 changed files with 54 additions and 4 deletions
|
|
@ -27,6 +27,8 @@ case class SplitBrainMultiNodeConfig(failureDetectorPuppet: Boolean) extends Mul
|
|||
failure-detector.threshold = 4
|
||||
}""")).
|
||||
withFallback(MultiNodeClusterSpec.clusterConfig(failureDetectorPuppet)))
|
||||
|
||||
testTransport(on = true)
|
||||
}
|
||||
|
||||
class SplitBrainWithFailureDetectorPuppetMultiJvmNode1 extends SplitBrainSpec(failureDetectorPuppet = true)
|
||||
|
|
|
|||
|
|
@ -21,6 +21,8 @@ case class UnreachableNodeRejoinsClusterMultiNodeConfig(failureDetectorPuppet: B
|
|||
val fourth = role("fourth")
|
||||
|
||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
|
||||
testTransport(on = true)
|
||||
}
|
||||
|
||||
class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec(failureDetectorPuppet = true)
|
||||
|
|
|
|||
|
|
@ -22,6 +22,8 @@ import akka.testkit.TestProbe
|
|||
object ReliableProxySpec extends MultiNodeConfig {
|
||||
val local = role("local")
|
||||
val remote = role("remote")
|
||||
|
||||
testTransport(on = true)
|
||||
}
|
||||
|
||||
class ReliableProxyMultiJvmNode1 extends ReliableProxySpec
|
||||
|
|
|
|||
|
|
@ -207,6 +207,9 @@ surprising ways.
|
|||
|
||||
* Don't issue a shutdown of the first node. The first node is the controller and if it shuts down your test will break.
|
||||
|
||||
* To be able to use ``blackhole``, ``passThrough``, and ``throttle`` you must activate the ``TestConductorTranport``
|
||||
by specifying ``testTransport(on = true)`` in your MultiNodeConfig.
|
||||
|
||||
* Throttling, shutdown and other failure injections can only be done from the first node, which again is the controller.
|
||||
|
||||
* Don't ask for the address of a node using ``node(address)`` after the node has been shut down. Grab the address before
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ import akka.util.{ Timeout }
|
|||
import scala.concurrent.util.{ Deadline, Duration }
|
||||
import scala.reflect.classTag
|
||||
import scala.concurrent.util.FiniteDuration
|
||||
import akka.ConfigurationException
|
||||
|
||||
sealed trait Direction {
|
||||
def includes(other: Direction): Boolean
|
||||
|
|
@ -114,6 +115,10 @@ trait Conductor { this: TestConductorExt ⇒
|
|||
* determining how much to send, leading to the correct output rate, but with
|
||||
* increased latency.
|
||||
*
|
||||
* ====Note====
|
||||
* To use this feature you must activate the `TestConductorTranport`
|
||||
* by specifying `testTransport(on = true)` in your MultiNodeConfig.
|
||||
*
|
||||
* @param node is the symbolic name of the node which is to be affected
|
||||
* @param target is the symbolic name of the other node to which connectivity shall be throttled
|
||||
* @param direction can be either `Direction.Send`, `Direction.Receive` or `Direction.Both`
|
||||
|
|
@ -121,6 +126,7 @@ trait Conductor { this: TestConductorExt ⇒
|
|||
*/
|
||||
def throttle(node: RoleName, target: RoleName, direction: Direction, rateMBit: Double): Future[Done] = {
|
||||
import Settings.QueryTimeout
|
||||
requireTestConductorTranport()
|
||||
controller ? Throttle(node, target, direction, rateMBit.toFloat) mapTo classTag[Done]
|
||||
}
|
||||
|
||||
|
|
@ -130,25 +136,40 @@ trait Conductor { this: TestConductorExt ⇒
|
|||
* submitting them to the Socket or right after receiving them from the
|
||||
* Socket.
|
||||
*
|
||||
* ====Note====
|
||||
* To use this feature you must activate the `TestConductorTranport`
|
||||
* by specifying `testTransport(on = true)` in your MultiNodeConfig.
|
||||
*
|
||||
* @param node is the symbolic name of the node which is to be affected
|
||||
* @param target is the symbolic name of the other node to which connectivity shall be impeded
|
||||
* @param direction can be either `Direction.Send`, `Direction.Receive` or `Direction.Both`
|
||||
*/
|
||||
def blackhole(node: RoleName, target: RoleName, direction: Direction): Future[Done] = {
|
||||
import Settings.QueryTimeout
|
||||
requireTestConductorTranport()
|
||||
controller ? Throttle(node, target, direction, 0f) mapTo classTag[Done]
|
||||
}
|
||||
|
||||
private def requireTestConductorTranport(): Unit =
|
||||
if (!transport.isInstanceOf[TestConductorTransport])
|
||||
throw new ConfigurationException("To use this feature you must activate the TestConductorTranport by " +
|
||||
"specifying `testTransport(on = true)` in your MultiNodeConfig.")
|
||||
|
||||
/**
|
||||
* Switch the Netty pipeline of the remote support into pass through mode for
|
||||
* sending and/or receiving.
|
||||
*
|
||||
* ====Note====
|
||||
* To use this feature you must activate the `TestConductorTranport`
|
||||
* by specifying `testTransport(on = true)` in your MultiNodeConfig.
|
||||
*
|
||||
* @param node is the symbolic name of the node which is to be affected
|
||||
* @param target is the symbolic name of the other node to which connectivity shall be impeded
|
||||
* @param direction can be either `Direction.Send`, `Direction.Receive` or `Direction.Both`
|
||||
*/
|
||||
def passThrough(node: RoleName, target: RoleName, direction: Direction): Future[Done] = {
|
||||
import Settings.QueryTimeout
|
||||
requireTestConductorTranport()
|
||||
controller ? Throttle(node, target, direction, -1f) mapTo classTag[Done]
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -29,8 +29,14 @@ object TestConductor extends ExtensionKey[TestConductorExt] {
|
|||
* [[akka.actor.Extension]]. Please follow the aforementioned links for
|
||||
* more information.
|
||||
*
|
||||
* <b>This extension requires the `akka.actor.provider`
|
||||
* to be a [[akka.remote.RemoteActorRefProvider]].</b>
|
||||
* ====Note====
|
||||
* This extension requires the `akka.actor.provider`
|
||||
* to be a [[akka.remote.RemoteActorRefProvider]].
|
||||
*
|
||||
* To use ``blackhole``, ``passThrough``, and ``throttle`` you must activate the
|
||||
* `TestConductorTranport` by specifying `testTransport(on = true)` in your
|
||||
* MultiNodeConfig.
|
||||
*
|
||||
*/
|
||||
class TestConductorExt(val system: ExtendedActorSystem) extends Extension with Conductor with Player {
|
||||
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ import scala.concurrent.util.Duration
|
|||
import scala.concurrent.util.duration._
|
||||
import java.util.concurrent.TimeoutException
|
||||
import akka.remote.testconductor.RoleName
|
||||
import akka.remote.testconductor.TestConductorTransport
|
||||
import akka.actor.RootActorPath
|
||||
import akka.event.{ Logging, LoggingAdapter }
|
||||
|
||||
|
|
@ -32,6 +33,7 @@ abstract class MultiNodeConfig {
|
|||
private var _roles = Vector[RoleName]()
|
||||
private var _deployments = Map[RoleName, Seq[String]]()
|
||||
private var _allDeploy = Vector[String]()
|
||||
private var _testTransport = false
|
||||
|
||||
/**
|
||||
* Register a common base config for all test participants, if so desired.
|
||||
|
|
@ -81,13 +83,24 @@ abstract class MultiNodeConfig {
|
|||
|
||||
def deployOnAll(deployment: String): Unit = _allDeploy :+= deployment
|
||||
|
||||
/**
|
||||
* To be able to use `blackhole`, `passThrough`, and `throttle` you must
|
||||
* activate the TestConductorTranport by specifying
|
||||
* `testTransport(on = true)` in your MultiNodeConfig.
|
||||
*/
|
||||
def testTransport(on: Boolean): Unit = _testTransport = on
|
||||
|
||||
private[testkit] lazy val myself: RoleName = {
|
||||
require(_roles.size > MultiNodeSpec.selfIndex, "not enough roles declared for this test")
|
||||
_roles(MultiNodeSpec.selfIndex)
|
||||
}
|
||||
|
||||
private[testkit] def config: Config = {
|
||||
val configs = (_nodeConf get myself).toList ::: _commonConf.toList ::: MultiNodeSpec.nodeConfig :: MultiNodeSpec.baseConfig :: Nil
|
||||
val transportConfig =
|
||||
if (_testTransport) ConfigFactory.parseString("akka.remote.transport=" + classOf[TestConductorTransport].getName)
|
||||
else ConfigFactory.empty
|
||||
|
||||
val configs = (_nodeConf get myself).toList ::: _commonConf.toList ::: transportConfig :: MultiNodeSpec.nodeConfig :: MultiNodeSpec.baseConfig :: Nil
|
||||
configs reduce (_ withFallback _)
|
||||
}
|
||||
|
||||
|
|
@ -175,7 +188,6 @@ object MultiNodeSpec {
|
|||
|
||||
private[testkit] val nodeConfig = mapToConfig(Map(
|
||||
"akka.actor.provider" -> "akka.remote.RemoteActorRefProvider",
|
||||
"akka.remote.transport" -> "akka.remote.testconductor.TestConductorTransport",
|
||||
"akka.remote.netty.hostname" -> selfName,
|
||||
"akka.remote.netty.port" -> selfPort))
|
||||
|
||||
|
|
|
|||
|
|
@ -23,6 +23,8 @@ object TestConductorMultiJvmSpec extends MultiNodeConfig {
|
|||
|
||||
val master = role("master")
|
||||
val slave = role("slave")
|
||||
|
||||
testTransport(on = true)
|
||||
}
|
||||
|
||||
class TestConductorMultiJvmNode1 extends TestConductorSpec
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue