Use awaitAssert in cluster tests, see #3168

This commit is contained in:
Patrik Nordwall 2013-03-24 22:01:57 +01:00
parent 118917d2be
commit 806fc0c525
20 changed files with 119 additions and 144 deletions

View file

@ -82,11 +82,11 @@ abstract class ClusterDeathWatchSpec
enterBarrier("second-terminated") enterBarrier("second-terminated")
markNodeAsUnavailable(third) markNodeAsUnavailable(third)
awaitCond(clusterView.members.forall(_.address != address(third))) awaitAssert(clusterView.members.map(_.address) must not contain (address(third)))
awaitCond(clusterView.unreachableMembers.exists(_.address == address(third))) awaitAssert(clusterView.unreachableMembers.map(_.address) must contain(address(third)))
cluster.down(third) cluster.down(third)
// removed // removed
awaitCond(clusterView.unreachableMembers.forall(_.address != address(third))) awaitAssert(clusterView.unreachableMembers.map(_.address) must not contain (address(third)))
expectMsg(path3) expectMsg(path3)
enterBarrier("third-terminated") enterBarrier("third-terminated")
@ -98,11 +98,11 @@ abstract class ClusterDeathWatchSpec
enterBarrier("watch-established") enterBarrier("watch-established")
runOn(third) { runOn(third) {
markNodeAsUnavailable(second) markNodeAsUnavailable(second)
awaitCond(clusterView.members.forall(_.address != address(second))) awaitAssert(clusterView.members.map(_.address) must not contain (address(second)))
awaitCond(clusterView.unreachableMembers.exists(_.address == address(second))) awaitAssert(clusterView.unreachableMembers.map(_.address) must contain(address(second)))
cluster.down(second) cluster.down(second)
// removed // removed
awaitCond(clusterView.unreachableMembers.forall(_.address != address(second))) awaitAssert(clusterView.unreachableMembers.map(_.address) must not contain (address(second)))
} }
enterBarrier("second-terminated") enterBarrier("second-terminated")
enterBarrier("third-terminated") enterBarrier("third-terminated")
@ -137,11 +137,11 @@ abstract class ClusterDeathWatchSpec
enterBarrier("hello-deployed") enterBarrier("hello-deployed")
markNodeAsUnavailable(first) markNodeAsUnavailable(first)
awaitCond(clusterView.members.forall(_.address != address(first))) awaitAssert(clusterView.members.map(_.address) must not contain (address(first)))
awaitCond(clusterView.unreachableMembers.exists(_.address == address(first))) awaitAssert(clusterView.unreachableMembers.map(_.address) must contain(address(first)))
cluster.down(first) cluster.down(first)
// removed // removed
awaitCond(clusterView.unreachableMembers.forall(_.address != address(first))) awaitAssert(clusterView.unreachableMembers.map(_.address) must not contain (address(first)))
val t = expectMsgType[Terminated] val t = expectMsgType[Terminated]
t.actor must be(hello) t.actor must be(hello)

View file

@ -38,8 +38,8 @@ abstract class ClusterMetricsSpec extends MultiNodeSpec(ClusterMetricsMultiJvmSp
"and gossip metrics around the node ring" taggedAs LongRunningTest in within(60 seconds) { "and gossip metrics around the node ring" taggedAs LongRunningTest in within(60 seconds) {
awaitClusterUp(roles: _*) awaitClusterUp(roles: _*)
enterBarrier("cluster-started") enterBarrier("cluster-started")
awaitCond(clusterView.members.filter(_.status == MemberStatus.Up).size == roles.size) awaitAssert(clusterView.members.count(_.status == MemberStatus.Up) must be(roles.size))
awaitCond(clusterView.clusterMetrics.size == roles.size) awaitAssert(clusterView.clusterMetrics.size must be(roles.size))
val collector = MetricsCollector(cluster.system, cluster.settings) val collector = MetricsCollector(cluster.system, cluster.settings)
collector.sample.metrics.size must be > (3) collector.sample.metrics.size must be > (3)
enterBarrier("after") enterBarrier("after")
@ -50,7 +50,7 @@ abstract class ClusterMetricsSpec extends MultiNodeSpec(ClusterMetricsMultiJvmSp
} }
enterBarrier("first-left") enterBarrier("first-left")
runOn(second, third, fourth, fifth) { runOn(second, third, fourth, fifth) {
awaitCond(clusterView.clusterMetrics.size == (roles.size - 1)) awaitAssert(clusterView.clusterMetrics.size must be(roles.size - 1))
} }
enterBarrier("finished") enterBarrier("finished")
} }

View file

@ -69,9 +69,9 @@ abstract class ConvergenceSpec(multiNodeConfig: ConvergenceMultiNodeConfig)
within(28 seconds) { within(28 seconds) {
// third becomes unreachable // third becomes unreachable
awaitCond(clusterView.unreachableMembers.size == 1) awaitAssert(clusterView.unreachableMembers.size must be(1))
awaitCond(clusterView.members.size == 2) awaitAssert(clusterView.members.size must be(2))
awaitCond(clusterView.members.forall(_.status == MemberStatus.Up)) awaitAssert(clusterView.members.map(_.status) must be(Set(MemberStatus.Up)))
awaitSeenSameState(first, second) awaitSeenSameState(first, second)
// still one unreachable // still one unreachable
clusterView.unreachableMembers.size must be(1) clusterView.unreachableMembers.size must be(1)
@ -96,7 +96,7 @@ abstract class ConvergenceSpec(multiNodeConfig: ConvergenceMultiNodeConfig)
runOn(first, second, fourth) { runOn(first, second, fourth) {
for (n 1 to 5) { for (n 1 to 5) {
awaitCond(clusterView.members.size == 2) awaitAssert(clusterView.members.size must be(2))
awaitSeenSameState(first, second, fourth) awaitSeenSameState(first, second, fourth)
memberStatus(first) must be(Some(MemberStatus.Up)) memberStatus(first) must be(Some(MemberStatus.Up))
memberStatus(second) must be(Some(MemberStatus.Up)) memberStatus(second) must be(Some(MemberStatus.Up))

View file

@ -48,9 +48,9 @@ abstract class InitialHeartbeatSpec
runOn(first) { runOn(first) {
within(10 seconds) { within(10 seconds) {
awaitCond { awaitAssert {
cluster.sendCurrentClusterState(testActor) cluster.sendCurrentClusterState(testActor)
expectMsgType[CurrentClusterState].members.exists(_.address == secondAddress) expectMsgType[CurrentClusterState].members.map(_.address) must contain(secondAddress)
} }
} }
} }

View file

@ -84,13 +84,13 @@ abstract class LeaderElectionSpec(multiNodeConfig: LeaderElectionMultiNodeConfig
// detect failure // detect failure
markNodeAsUnavailable(leaderAddress) markNodeAsUnavailable(leaderAddress)
awaitCond(clusterView.unreachableMembers.exists(_.address == leaderAddress)) awaitAssert(clusterView.unreachableMembers.map(_.address) must contain(leaderAddress))
enterBarrier("after-unavailable" + n) enterBarrier("after-unavailable" + n)
// user marks the shutdown leader as DOWN // user marks the shutdown leader as DOWN
cluster.down(leaderAddress) cluster.down(leaderAddress)
// removed // removed
awaitCond(clusterView.unreachableMembers.forall(_.address != leaderAddress)) awaitAssert(clusterView.unreachableMembers.map(_.address) must not contain (leaderAddress))
enterBarrier("after-down" + n, "completed" + n) enterBarrier("after-down" + n, "completed" + n)
case _ if remainingRoles.contains(myself) case _ if remainingRoles.contains(myself)
@ -98,7 +98,7 @@ abstract class LeaderElectionSpec(multiNodeConfig: LeaderElectionMultiNodeConfig
val leaderAddress = address(leader) val leaderAddress = address(leader)
enterBarrier("before-shutdown" + n, "after-shutdown" + n) enterBarrier("before-shutdown" + n, "after-shutdown" + n)
awaitCond(clusterView.unreachableMembers.exists(_.address == leaderAddress)) awaitAssert(clusterView.unreachableMembers.map(_.address) must contain(leaderAddress))
enterBarrier("after-unavailable" + n) enterBarrier("after-unavailable" + n)
enterBarrier("after-down" + n) enterBarrier("after-down" + n)

View file

@ -77,19 +77,19 @@ abstract class LeaderLeavingSpec
enterBarrier("leader-left") enterBarrier("leader-left")
val expectedAddresses = roles.toSet map address val expectedAddresses = roles.toSet map address
awaitCond(clusterView.members.map(_.address) == expectedAddresses) awaitAssert(clusterView.members.map(_.address) must be(expectedAddresses))
// verify that the LEADER is EXITING // verify that the LEADER is EXITING
exitingLatch.await exitingLatch.await
// verify that the LEADER is no longer part of the 'members' set // verify that the LEADER is no longer part of the 'members' set
awaitCond(clusterView.members.forall(_.address != oldLeaderAddress)) awaitAssert(clusterView.members.map(_.address) must not contain (oldLeaderAddress))
// verify that the LEADER is not part of the 'unreachable' set // verify that the LEADER is not part of the 'unreachable' set
awaitCond(clusterView.unreachableMembers.forall(_.address != oldLeaderAddress)) awaitAssert(clusterView.unreachableMembers.map(_.address) must not contain (oldLeaderAddress))
// verify that we have a new LEADER // verify that we have a new LEADER
awaitCond(clusterView.leader != oldLeaderAddress) awaitAssert(clusterView.leader must not be (oldLeaderAddress))
} }
enterBarrier("finished") enterBarrier("finished")

View file

@ -67,8 +67,8 @@ abstract class MBeanSpec
} }
awaitClusterUp(first) awaitClusterUp(first)
runOn(first) { runOn(first) {
awaitCond(mbeanServer.getAttribute(mbeanName, "MemberStatus") == "Up") awaitAssert(mbeanServer.getAttribute(mbeanName, "MemberStatus") must be("Up"))
awaitCond(mbeanServer.getAttribute(mbeanName, "Leader") == address(first).toString) awaitAssert(mbeanServer.getAttribute(mbeanName, "Leader") must be(address(first).toString))
mbeanServer.getAttribute(mbeanName, "Singleton").asInstanceOf[Boolean] must be(true) mbeanServer.getAttribute(mbeanName, "Singleton").asInstanceOf[Boolean] must be(true)
mbeanServer.getAttribute(mbeanName, "Members") must be(address(first).toString) mbeanServer.getAttribute(mbeanName, "Members") must be(address(first).toString)
mbeanServer.getAttribute(mbeanName, "Unreachable") must be("") mbeanServer.getAttribute(mbeanName, "Unreachable") must be("")
@ -85,11 +85,11 @@ abstract class MBeanSpec
awaitMembersUp(4) awaitMembersUp(4)
assertMembers(clusterView.members, roles.map(address(_)): _*) assertMembers(clusterView.members, roles.map(address(_)): _*)
awaitCond(mbeanServer.getAttribute(mbeanName, "MemberStatus") == "Up") awaitAssert(mbeanServer.getAttribute(mbeanName, "MemberStatus") must be("Up"))
val expectedMembers = roles.sorted.map(address(_)).mkString(",") val expectedMembers = roles.sorted.map(address(_)).mkString(",")
awaitCond(mbeanServer.getAttribute(mbeanName, "Members") == expectedMembers) awaitAssert(mbeanServer.getAttribute(mbeanName, "Members") must be(expectedMembers))
val expectedLeader = address(roleOfLeader()) val expectedLeader = address(roleOfLeader())
awaitCond(mbeanServer.getAttribute(mbeanName, "Leader") == expectedLeader.toString) awaitAssert(mbeanServer.getAttribute(mbeanName, "Leader") must be(expectedLeader.toString))
mbeanServer.getAttribute(mbeanName, "Singleton").asInstanceOf[Boolean] must be(false) mbeanServer.getAttribute(mbeanName, "Singleton").asInstanceOf[Boolean] must be(false)
enterBarrier("after-4") enterBarrier("after-4")
@ -103,9 +103,9 @@ abstract class MBeanSpec
enterBarrier("fourth-shutdown") enterBarrier("fourth-shutdown")
runOn(first, second, third) { runOn(first, second, third) {
awaitCond(mbeanServer.getAttribute(mbeanName, "Unreachable") == fourthAddress.toString) awaitAssert(mbeanServer.getAttribute(mbeanName, "Unreachable") must be(fourthAddress.toString))
val expectedMembers = Seq(first, second, third).sorted.map(address(_)).mkString(",") val expectedMembers = Seq(first, second, third).sorted.map(address(_)).mkString(",")
awaitCond(mbeanServer.getAttribute(mbeanName, "Members") == expectedMembers) awaitAssert(mbeanServer.getAttribute(mbeanName, "Members") must be(expectedMembers))
} }
enterBarrier("fourth-unreachable") enterBarrier("fourth-unreachable")
@ -117,7 +117,7 @@ abstract class MBeanSpec
runOn(first, second, third) { runOn(first, second, third) {
awaitMembersUp(3, canNotBePartOfMemberRing = Set(fourthAddress)) awaitMembersUp(3, canNotBePartOfMemberRing = Set(fourthAddress))
assertMembers(clusterView.members, first, second, third) assertMembers(clusterView.members, first, second, third)
awaitCond(mbeanServer.getAttribute(mbeanName, "Unreachable") == "") awaitAssert(mbeanServer.getAttribute(mbeanName, "Unreachable") must be(""))
} }
enterBarrier("after-5") enterBarrier("after-5")
@ -132,15 +132,14 @@ abstract class MBeanSpec
awaitMembersUp(2) awaitMembersUp(2)
assertMembers(clusterView.members, first, second) assertMembers(clusterView.members, first, second)
val expectedMembers = Seq(first, second).sorted.map(address(_)).mkString(",") val expectedMembers = Seq(first, second).sorted.map(address(_)).mkString(",")
awaitCond(mbeanServer.getAttribute(mbeanName, "Members") == expectedMembers) awaitAssert(mbeanServer.getAttribute(mbeanName, "Members") must be(expectedMembers))
} }
runOn(third) { runOn(third) {
awaitCond(cluster.isTerminated) awaitCond(cluster.isTerminated)
// mbean should be unregistered, i.e. throw InstanceNotFoundException // mbean should be unregistered, i.e. throw InstanceNotFoundException
awaitCond(Try { mbeanServer.getMBeanInfo(mbeanName); false } recover { awaitAssert(intercept[InstanceNotFoundException] {
case e: InstanceNotFoundException true mbeanServer.getMBeanInfo(mbeanName)
case _ false })
} get)
} }
enterBarrier("after-6") enterBarrier("after-6")

View file

@ -92,10 +92,9 @@ abstract class MinMembersBeforeUpBase(multiNodeConfig: MultiNodeConfig)
runOn(first) { runOn(first) {
cluster join myself cluster join myself
awaitCond { awaitAssert {
val result = clusterView.status == Joining
clusterView.refreshCurrentState() clusterView.refreshCurrentState()
result clusterView.status must be(Joining)
} }
} }
enterBarrier("first-started") enterBarrier("first-started")
@ -107,10 +106,9 @@ abstract class MinMembersBeforeUpBase(multiNodeConfig: MultiNodeConfig)
} }
runOn(first, second) { runOn(first, second) {
val expectedAddresses = Set(first, second) map address val expectedAddresses = Set(first, second) map address
awaitCond { awaitAssert {
val result = clusterView.members.map(_.address) == expectedAddresses
clusterView.refreshCurrentState() clusterView.refreshCurrentState()
result clusterView.members.map(_.address) must be(expectedAddresses)
} }
clusterView.members.map(_.status) must be(Set(Joining)) clusterView.members.map(_.status) must be(Set(Joining))
// and it should not change // and it should not change

View file

@ -156,7 +156,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro
def startClusterNode(): Unit = { def startClusterNode(): Unit = {
if (clusterView.members.isEmpty) { if (clusterView.members.isEmpty) {
cluster join myself cluster join myself
awaitCond(clusterView.members.exists(_.address == address(myself))) awaitAssert(clusterView.members.map(_.address) must contain(address(myself)))
} else } else
clusterView.self clusterView.self
} }
@ -256,13 +256,12 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro
timeout: FiniteDuration = 20.seconds): Unit = { timeout: FiniteDuration = 20.seconds): Unit = {
within(timeout) { within(timeout) {
if (!canNotBePartOfMemberRing.isEmpty) // don't run this on an empty set if (!canNotBePartOfMemberRing.isEmpty) // don't run this on an empty set
awaitCond( awaitAssert(canNotBePartOfMemberRing foreach (a clusterView.members.map(_.address) must not contain (a)))
canNotBePartOfMemberRing forall (address !(clusterView.members exists (_.address == address)))) awaitAssert(clusterView.members.size must be(numberOfMembers))
awaitCond(clusterView.members.size == numberOfMembers) awaitAssert(clusterView.members.map(_.status) must be(Set(MemberStatus.Up)))
awaitCond(clusterView.members.forall(_.status == MemberStatus.Up))
// clusterView.leader is updated by LeaderChanged, await that to be updated also // clusterView.leader is updated by LeaderChanged, await that to be updated also
val expectedLeader = clusterView.members.headOption.map(_.address) val expectedLeader = clusterView.members.headOption.map(_.address)
awaitCond(clusterView.leader == expectedLeader) awaitAssert(clusterView.leader must be(expectedLeader))
} }
} }
@ -270,7 +269,7 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro
* Wait until the specified nodes have seen the same gossip overview. * Wait until the specified nodes have seen the same gossip overview.
*/ */
def awaitSeenSameState(addresses: Address*): Unit = def awaitSeenSameState(addresses: Address*): Unit =
awaitCond((addresses.toSet -- clusterView.seenBy).isEmpty) awaitAssert((addresses.toSet -- clusterView.seenBy) must be(Set.empty))
/** /**
* Leader according to the address ordering of the roles. * Leader according to the address ordering of the roles.

View file

@ -43,10 +43,10 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec
runOn(first, third) { runOn(first, third) {
// verify that the 'second' node is no longer part of the 'members' set // verify that the 'second' node is no longer part of the 'members' set
awaitCond(clusterView.members.forall(_.address != address(second)), reaperWaitingTime) awaitAssert(clusterView.members.map(_.address) must not contain (address(second)), reaperWaitingTime)
// verify that the 'second' node is not part of the 'unreachable' set // verify that the 'second' node is not part of the 'unreachable' set
awaitCond(clusterView.unreachableMembers.forall(_.address != address(second)), reaperWaitingTime) awaitAssert(clusterView.unreachableMembers.map(_.address) must not contain (address(second)), reaperWaitingTime)
} }
runOn(second) { runOn(second) {

View file

@ -64,7 +64,7 @@ abstract class NodeLeavingAndExitingSpec
enterBarrier("second-left") enterBarrier("second-left")
val expectedAddresses = roles.toSet map address val expectedAddresses = roles.toSet map address
awaitCond(clusterView.members.map(_.address) == expectedAddresses) awaitAssert(clusterView.members.map(_.address) must be(expectedAddresses))
// Verify that 'second' node is set to EXITING // Verify that 'second' node is set to EXITING
exitingLatch.await exitingLatch.await

View file

@ -38,11 +38,9 @@ abstract class NodeMembershipSpec
runOn(first, second) { runOn(first, second) {
cluster.join(first) cluster.join(first)
awaitCond(clusterView.members.size == 2) awaitAssert(clusterView.members.size must be(2))
assertMembers(clusterView.members, first, second) assertMembers(clusterView.members, first, second)
awaitCond { awaitAssert(clusterView.members.map(_.status) must be(Set(MemberStatus.Up)))
clusterView.members.forall(_.status == MemberStatus.Up)
}
} }
enterBarrier("after-1") enterBarrier("after-1")
@ -54,11 +52,9 @@ abstract class NodeMembershipSpec
cluster.join(first) cluster.join(first)
} }
awaitCond(clusterView.members.size == 3) awaitAssert(clusterView.members.size must be(3))
assertMembers(clusterView.members, first, second, third) assertMembers(clusterView.members, first, second, third)
awaitCond { awaitAssert(clusterView.members.map(_.status) must be(Set(MemberStatus.Up)))
clusterView.members.forall(_.status == MemberStatus.Up)
}
enterBarrier("after-2") enterBarrier("after-2")
} }

View file

@ -92,8 +92,8 @@ abstract class RestartFirstSeedNodeSpec
// now we can join seed1System, seed2, seed3 together // now we can join seed1System, seed2, seed3 together
runOn(seed1) { runOn(seed1) {
Cluster(seed1System).joinSeedNodes(seedNodes) Cluster(seed1System).joinSeedNodes(seedNodes)
awaitCond(Cluster(seed1System).readView.members.size == 3) awaitAssert(Cluster(seed1System).readView.members.size must be(3))
awaitCond(Cluster(seed1System).readView.members.forall(_.status == Up)) awaitAssert(Cluster(seed1System).readView.members.map(_.status) must be(Set(Up)))
} }
runOn(seed2, seed3) { runOn(seed2, seed3) {
cluster.joinSeedNodes(seedNodes) cluster.joinSeedNodes(seedNodes)
@ -108,15 +108,15 @@ abstract class RestartFirstSeedNodeSpec
} }
runOn(seed2, seed3) { runOn(seed2, seed3) {
awaitMembersUp(2, canNotBePartOfMemberRing = Set(seedNodes.head)) awaitMembersUp(2, canNotBePartOfMemberRing = Set(seedNodes.head))
awaitCond(clusterView.unreachableMembers.forall(_.address != seedNodes.head)) awaitAssert(clusterView.unreachableMembers.map(_.address) must not contain (seedNodes.head))
} }
enterBarrier("seed1-shutdown") enterBarrier("seed1-shutdown")
// then start restartedSeed1System, which has the same address as seed1System // then start restartedSeed1System, which has the same address as seed1System
runOn(seed1) { runOn(seed1) {
Cluster(restartedSeed1System).joinSeedNodes(seedNodes) Cluster(restartedSeed1System).joinSeedNodes(seedNodes)
awaitCond(Cluster(restartedSeed1System).readView.members.size == 3) awaitAssert(Cluster(restartedSeed1System).readView.members.size must be(3))
awaitCond(Cluster(restartedSeed1System).readView.members.forall(_.status == Up)) awaitAssert(Cluster(restartedSeed1System).readView.members.map(_.status) must be(Set(Up)))
} }
runOn(seed2, seed3) { runOn(seed2, seed3) {
awaitMembersUp(3) awaitMembersUp(3)

View file

@ -965,19 +965,19 @@ abstract class StressSpec
expectMsgType[ChildrenCount] must be(ChildrenCount(nbrUsedRoles, 0)) expectMsgType[ChildrenCount] must be(ChildrenCount(nbrUsedRoles, 0))
1 to 5 foreach { _ supervisor ! new RuntimeException("Simulated exception") } 1 to 5 foreach { _ supervisor ! new RuntimeException("Simulated exception") }
awaitCond { awaitAssert {
supervisor ! GetChildrenCount supervisor ! GetChildrenCount
val c = expectMsgType[ChildrenCount] val c = expectMsgType[ChildrenCount]
c == ChildrenCount(nbrUsedRoles, 5 * nbrUsedRoles) c must be(ChildrenCount(nbrUsedRoles, 5 * nbrUsedRoles))
} }
// after 5 restart attempts the children should be stopped // after 5 restart attempts the children should be stopped
supervisor ! new RuntimeException("Simulated exception") supervisor ! new RuntimeException("Simulated exception")
awaitCond { awaitAssert {
supervisor ! GetChildrenCount supervisor ! GetChildrenCount
val c = expectMsgType[ChildrenCount] val c = expectMsgType[ChildrenCount]
// zero children // zero children
c == ChildrenCount(0, 6 * nbrUsedRoles) c must be(ChildrenCount(0, 6 * nbrUsedRoles))
} }
supervisor ! Reset supervisor ! Reset

View file

@ -55,20 +55,18 @@ abstract class TransitionSpec
def seenLatestGossip: Set[RoleName] = clusterView.seenBy flatMap roleName def seenLatestGossip: Set[RoleName] = clusterView.seenBy flatMap roleName
def awaitSeen(addresses: Address*): Unit = awaitCond { def awaitSeen(addresses: Address*): Unit = awaitAssert {
(seenLatestGossip map address) == addresses.toSet (seenLatestGossip map address) must be(addresses.toSet)
} }
def awaitMembers(addresses: Address*): Unit = awaitCond { def awaitMembers(addresses: Address*): Unit = awaitAssert {
val result = memberAddresses == addresses.toSet
clusterView.refreshCurrentState() clusterView.refreshCurrentState()
result memberAddresses must be(addresses.toSet)
} }
def awaitMemberStatus(address: Address, status: MemberStatus): Unit = awaitCond { def awaitMemberStatus(address: Address, status: MemberStatus): Unit = awaitAssert {
val result = memberStatus(address) == status
clusterView.refreshCurrentState() clusterView.refreshCurrentState()
result memberStatus(address) must be(status)
} }
def leaderActions(): Unit = def leaderActions(): Unit =
@ -133,7 +131,7 @@ abstract class TransitionSpec
awaitMembers(first, second) awaitMembers(first, second)
awaitMemberStatus(first, Up) awaitMemberStatus(first, Up)
awaitMemberStatus(second, Joining) awaitMemberStatus(second, Joining)
awaitCond(seenLatestGossip == Set(first, second)) awaitAssert(seenLatestGossip must be(Set(first, second)))
} }
enterBarrier("convergence-joining-2") enterBarrier("convergence-joining-2")
@ -148,7 +146,7 @@ abstract class TransitionSpec
runOn(first, second) { runOn(first, second) {
// gossip chat will synchronize the views // gossip chat will synchronize the views
awaitMemberStatus(second, Up) awaitMemberStatus(second, Up)
awaitCond(seenLatestGossip == Set(first, second)) awaitAssert(seenLatestGossip must be(Set(first, second)))
awaitMemberStatus(first, Up) awaitMemberStatus(first, Up)
} }
@ -162,7 +160,7 @@ abstract class TransitionSpec
} }
runOn(second, third) { runOn(second, third) {
// gossip chat from the join will synchronize the views // gossip chat from the join will synchronize the views
awaitCond(seenLatestGossip == Set(second, third)) awaitAssert(seenLatestGossip must be(Set(second, third)))
} }
enterBarrier("third-joined-second") enterBarrier("third-joined-second")
@ -172,7 +170,7 @@ abstract class TransitionSpec
awaitMembers(first, second, third) awaitMembers(first, second, third)
awaitMemberStatus(third, Joining) awaitMemberStatus(third, Joining)
awaitMemberStatus(second, Up) awaitMemberStatus(second, Up)
awaitCond(seenLatestGossip == Set(first, second, third)) awaitAssert(seenLatestGossip must be(Set(first, second, third)))
} }
first gossipTo third first gossipTo third
@ -181,7 +179,7 @@ abstract class TransitionSpec
awaitMemberStatus(first, Up) awaitMemberStatus(first, Up)
awaitMemberStatus(second, Up) awaitMemberStatus(second, Up)
awaitMemberStatus(third, Joining) awaitMemberStatus(third, Joining)
awaitCond(seenLatestGossip == Set(first, second, third)) awaitAssert(seenLatestGossip must be(Set(first, second, third)))
} }
enterBarrier("convergence-joining-3") enterBarrier("convergence-joining-3")
@ -200,7 +198,7 @@ abstract class TransitionSpec
leader12 gossipTo other1 leader12 gossipTo other1
runOn(other1) { runOn(other1) {
awaitMemberStatus(third, Up) awaitMemberStatus(third, Up)
awaitCond(seenLatestGossip == Set(leader12, myself)) awaitAssert(seenLatestGossip must be(Set(leader12, myself)))
} }
// first non-leader gossipTo the other non-leader // first non-leader gossipTo the other non-leader
@ -211,7 +209,7 @@ abstract class TransitionSpec
} }
runOn(other2) { runOn(other2) {
awaitMemberStatus(third, Up) awaitMemberStatus(third, Up)
awaitCond(seenLatestGossip == Set(first, second, third)) awaitAssert(seenLatestGossip must be(Set(first, second, third)))
} }
// first non-leader gossipTo the leader // first non-leader gossipTo the leader
@ -220,7 +218,7 @@ abstract class TransitionSpec
awaitMemberStatus(first, Up) awaitMemberStatus(first, Up)
awaitMemberStatus(second, Up) awaitMemberStatus(second, Up)
awaitMemberStatus(third, Up) awaitMemberStatus(third, Up)
awaitCond(seenLatestGossip == Set(first, second, third)) awaitAssert(seenLatestGossip must be(Set(first, second, third)))
} }
enterBarrier("after-3") enterBarrier("after-3")
@ -230,8 +228,8 @@ abstract class TransitionSpec
runOn(third) { runOn(third) {
markNodeAsUnavailable(second) markNodeAsUnavailable(second)
reapUnreachable() reapUnreachable()
awaitCond(clusterView.unreachableMembers.contains(Member(second, Up, Set.empty))) awaitAssert(clusterView.unreachableMembers must contain(Member(second, Up, Set.empty)))
awaitCond(seenLatestGossip == Set(third)) awaitAssert(seenLatestGossip must be(Set(third)))
} }
enterBarrier("after-second-unavailble") enterBarrier("after-second-unavailble")
@ -239,7 +237,7 @@ abstract class TransitionSpec
third gossipTo first third gossipTo first
runOn(first, third) { runOn(first, third) {
awaitCond(clusterView.unreachableMembers.contains(Member(second, Up, Set.empty))) awaitAssert(clusterView.unreachableMembers must contain(Member(second, Up, Set.empty)))
} }
runOn(first) { runOn(first) {
@ -251,9 +249,9 @@ abstract class TransitionSpec
first gossipTo third first gossipTo third
runOn(first, third) { runOn(first, third) {
awaitCond(clusterView.unreachableMembers.contains(Member(second, Down, Set.empty))) awaitAssert(clusterView.unreachableMembers must contain(Member(second, Down, Set.empty)))
awaitMemberStatus(second, Down) awaitMemberStatus(second, Down)
awaitCond(seenLatestGossip == Set(first, third)) awaitAssert(seenLatestGossip must be(Set(first, third)))
} }
enterBarrier("after-6") enterBarrier("after-6")

View file

@ -98,12 +98,12 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod
allButVictim.foreach(markNodeAsUnavailable(_)) allButVictim.foreach(markNodeAsUnavailable(_))
within(30 seconds) { within(30 seconds) {
// victim becomes all alone // victim becomes all alone
awaitCond({ awaitAssert {
val members = clusterView.members val members = clusterView.members
clusterView.unreachableMembers.size == (roles.size - 1) && clusterView.unreachableMembers.size must be(roles.size - 1)
members.size == 1 && members.size must be(1)
members.forall(_.status == MemberStatus.Up) members.map(_.status) must be(Set(MemberStatus.Up))
}) }
clusterView.unreachableMembers.map(_.address) must be((allButVictim map address).toSet) clusterView.unreachableMembers.map(_.address) must be((allButVictim map address).toSet)
} }
} }
@ -112,12 +112,12 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod
markNodeAsUnavailable(victim) markNodeAsUnavailable(victim)
within(30 seconds) { within(30 seconds) {
// victim becomes unreachable // victim becomes unreachable
awaitCond({ awaitAssert {
val members = clusterView.members val members = clusterView.members
clusterView.unreachableMembers.size == 1 && clusterView.unreachableMembers.size must be(1)
members.size == (roles.size - 1) && members.size must be(roles.size - 1)
members.forall(_.status == MemberStatus.Up) members.map(_.status) must be(Set(MemberStatus.Up))
}) }
awaitSeenSameState(allButVictim map address: _*) awaitSeenSameState(allButVictim map address: _*)
// still one unreachable // still one unreachable
clusterView.unreachableMembers.size must be(1) clusterView.unreachableMembers.size must be(1)
@ -136,7 +136,7 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod
runOn(allBut(victim): _*) { runOn(allBut(victim): _*) {
awaitMembersUp(roles.size - 1, Set(victim)) awaitMembersUp(roles.size - 1, Set(victim))
// eventually removed // eventually removed
awaitCond(clusterView.unreachableMembers.isEmpty, 15 seconds) awaitAssert(clusterView.unreachableMembers must be(Set.empty), 15 seconds)
} }
endBarrier endBarrier

View file

@ -116,10 +116,8 @@ abstract class AdaptiveLoadBalancingRouterSpec extends MultiNodeSpec(AdaptiveLoa
val router = system.actorOf(Props[Routee].withRouter(ClusterRouterConfig( val router = system.actorOf(Props[Routee].withRouter(ClusterRouterConfig(
local = AdaptiveLoadBalancingRouter(HeapMetricsSelector), local = AdaptiveLoadBalancingRouter(HeapMetricsSelector),
settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 1, useRole = None))), name) settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 1, useRole = None))), name)
awaitCond { // it may take some time until router receives cluster member events
// it may take some time until router receives cluster member events awaitAssert { currentRoutees(router).size must be(roles.size) }
currentRoutees(router).size == roles.size
}
currentRoutees(router).map(fullAddress).toSet must be(roles.map(address).toSet) currentRoutees(router).map(fullAddress).toSet must be(roles.map(address).toSet)
router router
} }
@ -170,7 +168,6 @@ abstract class AdaptiveLoadBalancingRouterSpec extends MultiNodeSpec(AdaptiveLoa
runOn(first) { runOn(first) {
val router2 = startRouter("router2") val router2 = startRouter("router2")
router2
// collect some metrics before we start // collect some metrics before we start
Thread.sleep(cluster.settings.MetricsInterval.toMillis * 10) Thread.sleep(cluster.settings.MetricsInterval.toMillis * 10)
@ -193,10 +190,8 @@ abstract class AdaptiveLoadBalancingRouterSpec extends MultiNodeSpec(AdaptiveLoa
"create routees from configuration" taggedAs LongRunningTest in { "create routees from configuration" taggedAs LongRunningTest in {
runOn(first) { runOn(first) {
val router3 = system.actorOf(Props[Memory].withRouter(FromConfig()), "router3") val router3 = system.actorOf(Props[Memory].withRouter(FromConfig()), "router3")
awaitCond { // it may take some time until router receives cluster member events
// it may take some time until router receives cluster member events awaitAssert { currentRoutees(router3).size must be(9) }
currentRoutees(router3).size == 9
}
currentRoutees(router3).map(fullAddress).toSet must be(Set(address(first))) currentRoutees(router3).map(fullAddress).toSet must be(Set(address(first)))
} }
enterBarrier("after-4") enterBarrier("after-4")
@ -205,10 +200,8 @@ abstract class AdaptiveLoadBalancingRouterSpec extends MultiNodeSpec(AdaptiveLoa
"create routees from cluster.enabled configuration" taggedAs LongRunningTest in { "create routees from cluster.enabled configuration" taggedAs LongRunningTest in {
runOn(first) { runOn(first) {
val router4 = system.actorOf(Props[Memory].withRouter(FromConfig()), "router4") val router4 = system.actorOf(Props[Memory].withRouter(FromConfig()), "router4")
awaitCond { // it may take some time until router receives cluster member events
// it may take some time until router receives cluster member events awaitAssert { currentRoutees(router4).size must be(6) }
currentRoutees(router4).size == 6
}
currentRoutees(router4).map(fullAddress).toSet must be(Set( currentRoutees(router4).map(fullAddress).toSet must be(Set(
address(first), address(second), address(third))) address(first), address(second), address(third)))
} }

View file

@ -87,10 +87,8 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC
"create routees from configuration" in { "create routees from configuration" in {
runOn(first) { runOn(first) {
awaitCond { // it may take some time until router receives cluster member events
// it may take some time until router receives cluster member events awaitAssert { currentRoutees(router1).size must be(4) }
currentRoutees(router1).size == 4
}
currentRoutees(router1).map(fullAddress).toSet must be(Set(address(first), address(second))) currentRoutees(router1).map(fullAddress).toSet must be(Set(address(first), address(second)))
} }
enterBarrier("after-2") enterBarrier("after-2")
@ -111,10 +109,8 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC
awaitClusterUp(first, second, third) awaitClusterUp(first, second, third)
runOn(first) { runOn(first) {
awaitCond { // it may take some time until router receives cluster member events
// it may take some time until router receives cluster member events awaitAssert { currentRoutees(router1).size must be(6) }
currentRoutees(router1).size == 6
}
currentRoutees(router1).map(fullAddress).toSet must be(roles.map(address).toSet) currentRoutees(router1).map(fullAddress).toSet must be(roles.map(address).toSet)
} }
@ -125,10 +121,8 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC
runOn(first) { runOn(first) {
val router2 = system.actorOf(Props[Echo].withRouter(ClusterRouterConfig(local = ConsistentHashingRouter(), val router2 = system.actorOf(Props[Echo].withRouter(ClusterRouterConfig(local = ConsistentHashingRouter(),
settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 2, useRole = None))), "router2") settings = ClusterRouterSettings(totalInstances = 10, maxInstancesPerNode = 2, useRole = None))), "router2")
awaitCond { // it may take some time until router receives cluster member events
// it may take some time until router receives cluster member events awaitAssert { currentRoutees(router2).size must be(6) }
currentRoutees(router2).size == 6
}
currentRoutees(router2).map(fullAddress).toSet must be(roles.map(address).toSet) currentRoutees(router2).map(fullAddress).toSet must be(roles.map(address).toSet)
} }
@ -166,10 +160,8 @@ abstract class ClusterConsistentHashingRouterSpec extends MultiNodeSpec(ClusterC
} }
def assertHashMapping(router: ActorRef): Unit = { def assertHashMapping(router: ActorRef): Unit = {
awaitCond { // it may take some time until router receives cluster member events
// it may take some time until router receives cluster member events awaitAssert { currentRoutees(router).size must be(6) }
currentRoutees(router).size == 6
}
currentRoutees(router).map(fullAddress).toSet must be(roles.map(address).toSet) currentRoutees(router).map(fullAddress).toSet must be(roles.map(address).toSet)
router ! "a" router ! "a"

View file

@ -137,7 +137,7 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou
router1.isInstanceOf[RoutedActorRef] must be(true) router1.isInstanceOf[RoutedActorRef] must be(true)
// max-nr-of-instances-per-node=2 times 2 nodes // max-nr-of-instances-per-node=2 times 2 nodes
awaitCond(currentRoutees(router1).size == 4) awaitAssert(currentRoutees(router1).size must be(4))
val iterationCount = 10 val iterationCount = 10
for (i 0 until iterationCount) { for (i 0 until iterationCount) {
@ -165,7 +165,7 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou
runOn(first) { runOn(first) {
// 2 nodes, 1 routee on each node // 2 nodes, 1 routee on each node
awaitCond(currentRoutees(router4).size == 2) awaitAssert(currentRoutees(router4).size must be(2))
val iterationCount = 10 val iterationCount = 10
for (i 0 until iterationCount) { for (i 0 until iterationCount) {
@ -191,7 +191,7 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou
runOn(first) { runOn(first) {
// max-nr-of-instances-per-node=2 times 4 nodes // max-nr-of-instances-per-node=2 times 4 nodes
awaitCond(currentRoutees(router1).size == 8) awaitAssert(currentRoutees(router1).size must be(8))
val iterationCount = 10 val iterationCount = 10
for (i 0 until iterationCount) { for (i 0 until iterationCount) {
@ -213,7 +213,7 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou
runOn(first) { runOn(first) {
// 4 nodes, 1 routee on each node // 4 nodes, 1 routee on each node
awaitCond(currentRoutees(router4).size == 4) awaitAssert(currentRoutees(router4).size must be(4))
val iterationCount = 10 val iterationCount = 10
for (i 0 until iterationCount) { for (i 0 until iterationCount) {
@ -233,7 +233,7 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou
runOn(first) { runOn(first) {
// max-nr-of-instances-per-node=1 times 3 nodes // max-nr-of-instances-per-node=1 times 3 nodes
awaitCond(currentRoutees(router3).size == 3) awaitAssert(currentRoutees(router3).size must be(3))
val iterationCount = 10 val iterationCount = 10
for (i 0 until iterationCount) { for (i 0 until iterationCount) {
@ -255,7 +255,7 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou
"deploy routees to specified node role" taggedAs LongRunningTest in { "deploy routees to specified node role" taggedAs LongRunningTest in {
runOn(first) { runOn(first) {
awaitCond(currentRoutees(router5).size == 2) awaitAssert(currentRoutees(router5).size must be(2))
val iterationCount = 10 val iterationCount = 10
for (i 0 until iterationCount) { for (i 0 until iterationCount) {
@ -280,7 +280,7 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou
router2.isInstanceOf[RoutedActorRef] must be(true) router2.isInstanceOf[RoutedActorRef] must be(true)
// totalInstances = 3, maxInstancesPerNode = 1 // totalInstances = 3, maxInstancesPerNode = 1
awaitCond(currentRoutees(router2).size == 3) awaitAssert(currentRoutees(router2).size must be(3))
val iterationCount = 10 val iterationCount = 10
for (i 0 until iterationCount) { for (i 0 until iterationCount) {
@ -311,8 +311,9 @@ abstract class ClusterRoundRobinRoutedActorSpec extends MultiNodeSpec(ClusterRou
val downAddress = routeeAddresses.find(_ != address(first)).get val downAddress = routeeAddresses.find(_ != address(first)).get
cluster.down(downAddress) cluster.down(downAddress)
awaitCond { awaitAssert {
routeeAddresses.contains(notUsedAddress) && !routeeAddresses.contains(downAddress) routeeAddresses must contain(notUsedAddress)
routeeAddresses must not contain (downAddress)
} }
val iterationCount = 10 val iterationCount = 10

View file

@ -38,8 +38,7 @@ object ClusterSpec {
class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
import ClusterSpec._ import ClusterSpec._
// FIXME: temporary workaround. See #2663 val selfAddress = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
val selfAddress = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[ClusterActorRefProvider].transport.defaultAddress
val cluster = Cluster(system) val cluster = Cluster(system)
def clusterView = cluster.readView def clusterView = cluster.readView
@ -67,7 +66,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
awaitCond(clusterView.isSingletonCluster) awaitCond(clusterView.isSingletonCluster)
clusterView.self.address must be(selfAddress) clusterView.self.address must be(selfAddress)
clusterView.members.map(_.address) must be(Set(selfAddress)) clusterView.members.map(_.address) must be(Set(selfAddress))
awaitCond(clusterView.status == MemberStatus.Up) awaitAssert(clusterView.status must be(MemberStatus.Up))
} }
"publish CurrentClusterState to subscribers when requested" in { "publish CurrentClusterState to subscribers when requested" in {