Make multi node tests use the within() aware barrier

This commit is contained in:
Björn Antonsson 2012-06-13 14:55:33 +02:00
parent 463e62926e
commit 5714d8327f
25 changed files with 90 additions and 90 deletions

View file

@ -47,23 +47,23 @@ abstract class ClientDowningNodeThatIsUnreachableSpec
// mark 'third' node as DOWN // mark 'third' node as DOWN
cluster.down(thirdAddress) cluster.down(thirdAddress)
testConductor.enter("down-third-node") enter("down-third-node")
awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress)) awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress))
cluster.latestGossip.members.exists(_.address == thirdAddress) must be(false) cluster.latestGossip.members.exists(_.address == thirdAddress) must be(false)
} }
runOn(third) { runOn(third) {
testConductor.enter("down-third-node") enter("down-third-node")
} }
runOn(second, fourth) { runOn(second, fourth) {
testConductor.enter("down-third-node") enter("down-third-node")
awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress)) awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress))
} }
testConductor.enter("await-completion") enter("await-completion")
} }
} }
} }

View file

@ -43,7 +43,7 @@ abstract class ClientDowningNodeThatIsUpSpec
runOn(first) { runOn(first) {
// mark 'third' node as DOWN // mark 'third' node as DOWN
cluster.down(thirdAddress) cluster.down(thirdAddress)
testConductor.enter("down-third-node") enter("down-third-node")
markNodeAsUnavailable(thirdAddress) markNodeAsUnavailable(thirdAddress)
@ -52,16 +52,16 @@ abstract class ClientDowningNodeThatIsUpSpec
} }
runOn(third) { runOn(third) {
testConductor.enter("down-third-node") enter("down-third-node")
} }
runOn(second, fourth) { runOn(second, fourth) {
testConductor.enter("down-third-node") enter("down-third-node")
awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress)) awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress))
} }
testConductor.enter("await-completion") enter("await-completion")
} }
} }
} }

View file

@ -46,12 +46,12 @@ abstract class ConvergenceSpec
// doesn't join immediately // doesn't join immediately
} }
testConductor.enter("after-1") enter("after-1")
} }
"not reach convergence while any nodes are unreachable" taggedAs LongRunningTest ignore { "not reach convergence while any nodes are unreachable" taggedAs LongRunningTest ignore {
val thirdAddress = node(third).address val thirdAddress = node(third).address
testConductor.enter("before-shutdown") enter("before-shutdown")
runOn(first) { runOn(first) {
// kill 'third' node // kill 'third' node
@ -78,7 +78,7 @@ abstract class ConvergenceSpec
} }
} }
testConductor.enter("after-2") enter("after-2")
} }
"not move a new joining node to Up while there is no convergence" taggedAs LongRunningTest ignore { "not move a new joining node to Up while there is no convergence" taggedAs LongRunningTest ignore {
@ -116,7 +116,7 @@ abstract class ConvergenceSpec
} }
} }
testConductor.enter("after-3") enter("after-3")
} }
} }
} }

View file

@ -43,7 +43,7 @@ abstract class GossipingAccrualFailureDetectorSpec
cluster.failureDetector.isAvailable(secondAddress) must be(true) cluster.failureDetector.isAvailable(secondAddress) must be(true)
cluster.failureDetector.isAvailable(thirdAddress) must be(true) cluster.failureDetector.isAvailable(thirdAddress) must be(true)
testConductor.enter("after-1") enter("after-1")
} }
"mark node as 'unavailable' if a node in the cluster is shut down (and its heartbeats stops)" taggedAs LongRunningTest in { "mark node as 'unavailable' if a node in the cluster is shut down (and its heartbeats stops)" taggedAs LongRunningTest in {
@ -59,7 +59,7 @@ abstract class GossipingAccrualFailureDetectorSpec
cluster.failureDetector.isAvailable(secondAddress) must be(true) cluster.failureDetector.isAvailable(secondAddress) must be(true)
} }
testConductor.enter("after-2") enter("after-2")
} }
} }
} }

View file

@ -46,7 +46,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec
runOn(first) { runOn(first) {
// kill 'fourth' node // kill 'fourth' node
testConductor.shutdown(fourth, 0) testConductor.shutdown(fourth, 0)
testConductor.enter("down-fourth-node") enter("down-fourth-node")
// mark the node as unreachable in the failure detector // mark the node as unreachable in the failure detector
markNodeAsUnavailable(fourthAddress) markNodeAsUnavailable(fourthAddress)
@ -57,26 +57,26 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec
} }
runOn(fourth) { runOn(fourth) {
testConductor.enter("down-fourth-node") enter("down-fourth-node")
} }
runOn(second, third) { runOn(second, third) {
testConductor.enter("down-fourth-node") enter("down-fourth-node")
awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(fourthAddress), 30.seconds) awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(fourthAddress), 30.seconds)
} }
testConductor.enter("await-completion-1") enter("await-completion-1")
} }
"be able to DOWN a 'middle' node that is UNREACHABLE" taggedAs LongRunningTest in { "be able to DOWN a 'middle' node that is UNREACHABLE" taggedAs LongRunningTest in {
val secondAddress = node(second).address val secondAddress = node(second).address
testConductor.enter("before-down-second-node") enter("before-down-second-node")
runOn(first) { runOn(first) {
// kill 'second' node // kill 'second' node
testConductor.shutdown(second, 0) testConductor.shutdown(second, 0)
testConductor.enter("down-second-node") enter("down-second-node")
// mark the node as unreachable in the failure detector // mark the node as unreachable in the failure detector
markNodeAsUnavailable(secondAddress) markNodeAsUnavailable(secondAddress)
@ -87,16 +87,16 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec
} }
runOn(second) { runOn(second) {
testConductor.enter("down-second-node") enter("down-second-node")
} }
runOn(third) { runOn(third) {
testConductor.enter("down-second-node") enter("down-second-node")
awaitUpConvergence(numberOfMembers = 2, canNotBePartOfMemberRing = Seq(secondAddress), 30 seconds) awaitUpConvergence(numberOfMembers = 2, canNotBePartOfMemberRing = Seq(secondAddress), 30 seconds)
} }
testConductor.enter("await-completion-2") enter("await-completion-2")
} }
} }
} }

View file

@ -50,7 +50,7 @@ abstract class LeaderElectionSpec
assertLeaderIn(sortedRoles) assertLeaderIn(sortedRoles)
} }
testConductor.enter("after") enter("after")
} }
def shutdownLeaderAndVerifyNewLeader(alreadyShutdown: Int): Unit = { def shutdownLeaderAndVerifyNewLeader(alreadyShutdown: Int): Unit = {
@ -64,33 +64,33 @@ abstract class LeaderElectionSpec
case `controller` case `controller`
val leaderAddress = node(leader).address val leaderAddress = node(leader).address
testConductor.enter("before-shutdown") enter("before-shutdown")
testConductor.shutdown(leader, 0) testConductor.shutdown(leader, 0)
testConductor.enter("after-shutdown", "after-down", "completed") enter("after-shutdown", "after-down", "completed")
markNodeAsUnavailable(leaderAddress) markNodeAsUnavailable(leaderAddress)
case `leader` case `leader`
testConductor.enter("before-shutdown", "after-shutdown") enter("before-shutdown", "after-shutdown")
// this node will be shutdown by the controller and doesn't participate in more barriers // this node will be shutdown by the controller and doesn't participate in more barriers
case `aUser` case `aUser`
val leaderAddress = node(leader).address val leaderAddress = node(leader).address
testConductor.enter("before-shutdown", "after-shutdown") enter("before-shutdown", "after-shutdown")
// user marks the shutdown leader as DOWN // user marks the shutdown leader as DOWN
cluster.down(leaderAddress) cluster.down(leaderAddress)
testConductor.enter("after-down", "completed") enter("after-down", "completed")
markNodeAsUnavailable(leaderAddress) markNodeAsUnavailable(leaderAddress)
case _ if remainingRoles.contains(myself) case _ if remainingRoles.contains(myself)
// remaining cluster nodes, not shutdown // remaining cluster nodes, not shutdown
testConductor.enter("before-shutdown", "after-shutdown", "after-down") enter("before-shutdown", "after-shutdown", "after-down")
awaitUpConvergence(currentRoles.size - 1) awaitUpConvergence(currentRoles.size - 1)
val nextExpectedLeader = remainingRoles.head val nextExpectedLeader = remainingRoles.head
cluster.isLeader must be(myself == nextExpectedLeader) cluster.isLeader must be(myself == nextExpectedLeader)
assertLeaderIn(remainingRoles) assertLeaderIn(remainingRoles)
testConductor.enter("completed") enter("completed")
} }
} }

View file

@ -47,12 +47,12 @@ abstract class MembershipChangeListenerExitingSpec
awaitClusterUp(first, second, third) awaitClusterUp(first, second, third)
runOn(first) { runOn(first) {
testConductor.enter("registered-listener") enter("registered-listener")
cluster.leave(secondAddress) cluster.leave(secondAddress)
} }
runOn(second) { runOn(second) {
testConductor.enter("registered-listener") enter("registered-listener")
} }
runOn(third) { runOn(third) {
@ -63,11 +63,11 @@ abstract class MembershipChangeListenerExitingSpec
exitingLatch.countDown() exitingLatch.countDown()
} }
}) })
testConductor.enter("registered-listener") enter("registered-listener")
exitingLatch.await exitingLatch.await
} }
testConductor.enter("finished") enter("finished")
} }
} }
} }

View file

@ -44,18 +44,18 @@ abstract class MembershipChangeListenerJoinSpec
joinLatch.countDown() joinLatch.countDown()
} }
}) })
testConductor.enter("registered-listener") enter("registered-listener")
joinLatch.await joinLatch.await
cluster.convergence.isDefined must be(true) cluster.convergence.isDefined must be(true)
} }
runOn(second) { runOn(second) {
testConductor.enter("registered-listener") enter("registered-listener")
cluster.join(firstAddress) cluster.join(firstAddress)
} }
testConductor.enter("after") enter("after")
} }
} }
} }

View file

@ -44,12 +44,12 @@ abstract class MembershipChangeListenerLeavingSpec
awaitClusterUp(first, second, third) awaitClusterUp(first, second, third)
runOn(first) { runOn(first) {
testConductor.enter("registered-listener") enter("registered-listener")
cluster.leave(secondAddress) cluster.leave(secondAddress)
} }
runOn(second) { runOn(second) {
testConductor.enter("registered-listener") enter("registered-listener")
} }
runOn(third) { runOn(third) {
@ -60,11 +60,11 @@ abstract class MembershipChangeListenerLeavingSpec
latch.countDown() latch.countDown()
} }
}) })
testConductor.enter("registered-listener") enter("registered-listener")
latch.await latch.await
} }
testConductor.enter("finished") enter("finished")
} }
} }
} }

View file

@ -44,16 +44,16 @@ abstract class MembershipChangeListenerUpSpec
latch.countDown() latch.countDown()
} }
}) })
testConductor.enter("listener-1-registered") enter("listener-1-registered")
cluster.join(firstAddress) cluster.join(firstAddress)
latch.await latch.await
} }
runOn(third) { runOn(third) {
testConductor.enter("listener-1-registered") enter("listener-1-registered")
} }
testConductor.enter("after-1") enter("after-1")
} }
"(when three nodes) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in { "(when three nodes) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in {
@ -65,7 +65,7 @@ abstract class MembershipChangeListenerUpSpec
latch.countDown() latch.countDown()
} }
}) })
testConductor.enter("listener-2-registered") enter("listener-2-registered")
runOn(third) { runOn(third) {
cluster.join(firstAddress) cluster.join(firstAddress)
@ -73,7 +73,7 @@ abstract class MembershipChangeListenerUpSpec
latch.await latch.await
testConductor.enter("after-2") enter("after-2")
} }
} }
} }

View file

@ -71,14 +71,14 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy { self: MultiNodeSpec
// make sure that the node-to-join is started before other join // make sure that the node-to-join is started before other join
startClusterNode() startClusterNode()
} }
testConductor.enter(roles.head.name + "-started") enter(roles.head.name + "-started")
if (roles.tail.contains(myself)) { if (roles.tail.contains(myself)) {
cluster.join(node(roles.head).address) cluster.join(node(roles.head).address)
} }
if (upConvergence && roles.contains(myself)) { if (upConvergence && roles.contains(myself)) {
awaitUpConvergence(numberOfMembers = roles.length) awaitUpConvergence(numberOfMembers = roles.length)
} }
testConductor.enter(roles.map(_.name).mkString("-") + "-joined") enter(roles.map(_.name).mkString("-") + "-joined")
} }
/** /**

View file

@ -45,7 +45,7 @@ abstract class NodeJoinSpec
awaitCond(cluster.latestGossip.members.exists { member member.address == secondAddress && member.status == MemberStatus.Joining }) awaitCond(cluster.latestGossip.members.exists { member member.address == secondAddress && member.status == MemberStatus.Joining })
testConductor.enter("after") enter("after")
} }
} }
} }

View file

@ -44,7 +44,7 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec
runOn(first) { runOn(first) {
cluster.leave(secondAddress) cluster.leave(secondAddress)
} }
testConductor.enter("second-left") enter("second-left")
runOn(first, third) { runOn(first, third) {
// verify that the 'second' node is no longer part of the 'members' set // verify that the 'second' node is no longer part of the 'members' set
@ -59,7 +59,7 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec
isRemoved.get.address must be(secondAddress) isRemoved.get.address must be(secondAddress)
} }
testConductor.enter("finished") enter("finished")
} }
} }
} }

View file

@ -50,7 +50,7 @@ abstract class NodeLeavingAndExitingSpec
runOn(first) { runOn(first) {
cluster.leave(secondAddress) cluster.leave(secondAddress)
} }
testConductor.enter("second-left") enter("second-left")
runOn(first, third) { runOn(first, third) {
@ -69,7 +69,7 @@ abstract class NodeLeavingAndExitingSpec
hasExited.get.address must be(secondAddress) hasExited.get.address must be(secondAddress)
} }
testConductor.enter("finished") enter("finished")
} }
} }
} }

View file

@ -44,7 +44,7 @@ abstract class NodeLeavingSpec
runOn(first) { runOn(first) {
cluster.leave(secondAddress) cluster.leave(secondAddress)
} }
testConductor.enter("second-left") enter("second-left")
runOn(first, third) { runOn(first, third) {
awaitCond(cluster.latestGossip.members.exists(_.status == MemberStatus.Leaving)) awaitCond(cluster.latestGossip.members.exists(_.status == MemberStatus.Leaving))
@ -54,7 +54,7 @@ abstract class NodeLeavingSpec
hasLeft.get.address must be(secondAddress) hasLeft.get.address must be(secondAddress)
} }
testConductor.enter("finished") enter("finished")
} }
} }
} }

View file

@ -38,7 +38,7 @@ abstract class NodeMembershipSpec
runOn(first) { runOn(first) {
startClusterNode() startClusterNode()
} }
testConductor.enter("first-started") enter("first-started")
runOn(first, second) { runOn(first, second) {
cluster.join(firstAddress) cluster.join(firstAddress)
@ -50,7 +50,7 @@ abstract class NodeMembershipSpec
awaitCond(cluster.convergence.isDefined) awaitCond(cluster.convergence.isDefined)
} }
testConductor.enter("after-1") enter("after-1")
} }
"(when three nodes) start gossiping to each other so that all nodes gets the same gossip info" taggedAs LongRunningTest in { "(when three nodes) start gossiping to each other so that all nodes gets the same gossip info" taggedAs LongRunningTest in {
@ -66,7 +66,7 @@ abstract class NodeMembershipSpec
} }
awaitCond(cluster.convergence.isDefined) awaitCond(cluster.convergence.isDefined)
testConductor.enter("after-2") enter("after-2")
} }
} }
} }

View file

@ -33,7 +33,7 @@ abstract class NodeUpSpec
awaitClusterUp(first, second) awaitClusterUp(first, second)
testConductor.enter("after-1") enter("after-1")
} }
"be unaffected when joining again" taggedAs LongRunningTest in { "be unaffected when joining again" taggedAs LongRunningTest in {
@ -45,12 +45,12 @@ abstract class NodeUpSpec
unexpected.set(members) unexpected.set(members)
} }
}) })
testConductor.enter("listener-registered") enter("listener-registered")
runOn(second) { runOn(second) {
cluster.join(node(first).address) cluster.join(node(first).address)
} }
testConductor.enter("joined-again") enter("joined-again")
// let it run for a while to make sure that nothing bad happens // let it run for a while to make sure that nothing bad happens
for (n 1 to 20) { for (n 1 to 20) {
@ -59,7 +59,7 @@ abstract class NodeUpSpec
cluster.latestGossip.members.forall(_.status == MemberStatus.Up) must be(true) cluster.latestGossip.members.forall(_.status == MemberStatus.Up) must be(true)
} }
testConductor.enter("after-2") enter("after-2")
} }
} }
} }

View file

@ -43,7 +43,7 @@ abstract class SingletonClusterSpec
cluster.isSingletonCluster must be(false) cluster.isSingletonCluster must be(false)
assertLeader(first, second) assertLeader(first, second)
testConductor.enter("after-1") enter("after-1")
} }
"become singleton cluster when one node is shutdown" taggedAs LongRunningTest in { "become singleton cluster when one node is shutdown" taggedAs LongRunningTest in {
@ -58,7 +58,7 @@ abstract class SingletonClusterSpec
assertLeader(first) assertLeader(first)
} }
testConductor.enter("after-2") enter("after-2")
} }
} }
} }

View file

@ -62,7 +62,7 @@ abstract class SunnyWeatherSpec
}) })
for (n 1 to 30) { for (n 1 to 30) {
testConductor.enter("period-" + n) enter("period-" + n)
unexpected.get must be(null) unexpected.get must be(null)
awaitUpConvergence(roles.size) awaitUpConvergence(roles.size)
assertLeaderIn(roles) assertLeaderIn(roles)
@ -70,7 +70,7 @@ abstract class SunnyWeatherSpec
1.seconds.sleep 1.seconds.sleep
} }
testConductor.enter("after") enter("after")
} }
} }
} }

View file

@ -47,7 +47,7 @@ class LookupRemoteActorSpec extends MultiNodeSpec(LookupRemoteActorMultiJvmSpec)
val masterAddress = testConductor.getAddressFor(master).await val masterAddress = testConductor.getAddressFor(master).await
(hello ? "identify").await.asInstanceOf[ActorRef].path.address must equal(masterAddress) (hello ? "identify").await.asInstanceOf[ActorRef].path.address must equal(masterAddress)
} }
testConductor.enter("done") enter("done")
} }
} }

View file

@ -56,7 +56,7 @@ class NewRemoteActorSpec extends MultiNodeSpec(NewRemoteActorMultiJvmSpec)
system.stop(actor) system.stop(actor)
} }
testConductor.enter("done") enter("done")
} }
"be locally instantiated on a remote node and be able to communicate through its RemoteActorRef (with deployOnAll)" taggedAs LongRunningTest in { "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef (with deployOnAll)" taggedAs LongRunningTest in {
@ -74,7 +74,7 @@ class NewRemoteActorSpec extends MultiNodeSpec(NewRemoteActorMultiJvmSpec)
system.stop(actor) system.stop(actor)
} }
testConductor.enter("done") enter("done")
} }
} }
} }

View file

@ -55,11 +55,11 @@ class RandomRoutedRemoteActorSpec extends MultiNodeSpec(RandomRoutedRemoteActorM
"be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" taggedAs LongRunningTest in { "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" taggedAs LongRunningTest in {
runOn(first, second, third) { runOn(first, second, third) {
testConductor.enter("start", "broadcast-end", "end", "done") enter("start", "broadcast-end", "end", "done")
} }
runOn(fourth) { runOn(fourth) {
testConductor.enter("start") enter("start")
val actor = system.actorOf(Props[SomeActor].withRouter(RandomRouter()), "service-hello") val actor = system.actorOf(Props[SomeActor].withRouter(RandomRouter()), "service-hello")
actor.isInstanceOf[RoutedActorRef] must be(true) actor.isInstanceOf[RoutedActorRef] must be(true)
@ -76,17 +76,17 @@ class RandomRoutedRemoteActorSpec extends MultiNodeSpec(RandomRoutedRemoteActorM
case (replyMap, address) replyMap + (address -> (replyMap(address) + 1)) case (replyMap, address) replyMap + (address -> (replyMap(address) + 1))
} }
testConductor.enter("broadcast-end") enter("broadcast-end")
actor ! Broadcast(PoisonPill) actor ! Broadcast(PoisonPill)
testConductor.enter("end") enter("end")
replies.values foreach { _ must be > (0) } replies.values foreach { _ must be > (0) }
replies.get(node(fourth).address) must be(None) replies.get(node(fourth).address) must be(None)
// shut down the actor before we let the other node(s) shut down so we don't try to send // shut down the actor before we let the other node(s) shut down so we don't try to send
// "Terminate" to a shut down node // "Terminate" to a shut down node
system.stop(actor) system.stop(actor)
testConductor.enter("done") enter("done")
} }
} }
} }

View file

@ -55,11 +55,11 @@ class RoundRobinRoutedRemoteActorSpec extends MultiNodeSpec(RoundRobinRoutedRemo
"be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" taggedAs LongRunningTest in { "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" taggedAs LongRunningTest in {
runOn(first, second, third) { runOn(first, second, third) {
testConductor.enter("start", "broadcast-end", "end", "done") enter("start", "broadcast-end", "end", "done")
} }
runOn(fourth) { runOn(fourth) {
testConductor.enter("start") enter("start")
val actor = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "service-hello") val actor = system.actorOf(Props[SomeActor].withRouter(RoundRobinRouter()), "service-hello")
actor.isInstanceOf[RoutedActorRef] must be(true) actor.isInstanceOf[RoutedActorRef] must be(true)
@ -76,17 +76,17 @@ class RoundRobinRoutedRemoteActorSpec extends MultiNodeSpec(RoundRobinRoutedRemo
case (replyMap, address) replyMap + (address -> (replyMap(address) + 1)) case (replyMap, address) replyMap + (address -> (replyMap(address) + 1))
} }
testConductor.enter("broadcast-end") enter("broadcast-end")
actor ! Broadcast(PoisonPill) actor ! Broadcast(PoisonPill)
testConductor.enter("end") enter("end")
replies.values foreach { _ must be(iterationCount) } replies.values foreach { _ must be(iterationCount) }
replies.get(node(fourth).address) must be(None) replies.get(node(fourth).address) must be(None)
// shut down the actor before we let the other node(s) shut down so we don't try to send // shut down the actor before we let the other node(s) shut down so we don't try to send
// "Terminate" to a shut down node // "Terminate" to a shut down node
system.stop(actor) system.stop(actor)
testConductor.enter("done") enter("done")
} }
} }
} }

View file

@ -55,11 +55,11 @@ class ScatterGatherRoutedRemoteActorSpec extends MultiNodeSpec(ScatterGatherRout
"be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" taggedAs LongRunningTest in { "be locally instantiated on a remote node and be able to communicate through its RemoteActorRef" taggedAs LongRunningTest in {
runOn(first, second, third) { runOn(first, second, third) {
testConductor.enter("start", "broadcast-end", "end", "done") enter("start", "broadcast-end", "end", "done")
} }
runOn(fourth) { runOn(fourth) {
testConductor.enter("start") enter("start")
val actor = system.actorOf(Props[SomeActor].withRouter(ScatterGatherFirstCompletedRouter(within = 10 seconds)), "service-hello") val actor = system.actorOf(Props[SomeActor].withRouter(ScatterGatherFirstCompletedRouter(within = 10 seconds)), "service-hello")
actor.isInstanceOf[RoutedActorRef] must be(true) actor.isInstanceOf[RoutedActorRef] must be(true)
@ -76,17 +76,17 @@ class ScatterGatherRoutedRemoteActorSpec extends MultiNodeSpec(ScatterGatherRout
case (replyMap, address) replyMap + (address -> (replyMap(address) + 1)) case (replyMap, address) replyMap + (address -> (replyMap(address) + 1))
} }
testConductor.enter("broadcast-end") enter("broadcast-end")
actor ! Broadcast(PoisonPill) actor ! Broadcast(PoisonPill)
testConductor.enter("end") enter("end")
replies.values.sum must be === connectionCount * iterationCount replies.values.sum must be === connectionCount * iterationCount
replies.get(node(fourth).address) must be(None) replies.get(node(fourth).address) must be(None)
// shut down the actor before we let the other node(s) shut down so we don't try to send // shut down the actor before we let the other node(s) shut down so we don't try to send
// "Terminate" to a shut down node // "Terminate" to a shut down node
system.stop(actor) system.stop(actor)
testConductor.enter("done") enter("done")
} }
} }
} }

View file

@ -46,7 +46,7 @@ class TestConductorSpec extends MultiNodeSpec(TestConductorMultiJvmSpec) with Im
}), "echo") }), "echo")
} }
testConductor.enter("name") enter("name")
} }
"support throttling of network connections" taggedAs LongRunningTest in { "support throttling of network connections" taggedAs LongRunningTest in {
@ -62,7 +62,7 @@ class TestConductorSpec extends MultiNodeSpec(TestConductorMultiJvmSpec) with Im
testConductor.throttle(slave, master, Direction.Send, rateMBit = 0.01).await testConductor.throttle(slave, master, Direction.Send, rateMBit = 0.01).await
} }
testConductor.enter("throttled_send") enter("throttled_send")
runOn(slave) { runOn(slave) {
for (i 0 to 9) echo ! i for (i 0 to 9) echo ! i
@ -73,14 +73,14 @@ class TestConductorSpec extends MultiNodeSpec(TestConductorMultiJvmSpec) with Im
receiveN(9) must be(1 to 9) receiveN(9) must be(1 to 9)
} }
testConductor.enter("throttled_send2") enter("throttled_send2")
runOn(master) { runOn(master) {
testConductor.throttle(slave, master, Direction.Send, -1).await testConductor.throttle(slave, master, Direction.Send, -1).await
testConductor.throttle(slave, master, Direction.Receive, rateMBit = 0.01).await testConductor.throttle(slave, master, Direction.Receive, rateMBit = 0.01).await
} }
testConductor.enter("throttled_recv") enter("throttled_recv")
runOn(slave) { runOn(slave) {
for (i 10 to 19) echo ! i for (i 10 to 19) echo ! i
@ -98,7 +98,7 @@ class TestConductorSpec extends MultiNodeSpec(TestConductorMultiJvmSpec) with Im
receiveN(9) must be(11 to 19) receiveN(9) must be(11 to 19)
} }
testConductor.enter("throttled_recv2") enter("throttled_recv2")
runOn(master) { runOn(master) {
testConductor.throttle(slave, master, Direction.Receive, -1).await testConductor.throttle(slave, master, Direction.Receive, -1).await