Merge pull request #568 from akka/wip-2287-LeaderLeavingSpec-patriknw
Fix time sensitivity in LeaderLeavingSpec, see #2287
This commit is contained in:
commit
98d78db80a
2 changed files with 77 additions and 50 deletions
|
|
@ -18,10 +18,8 @@ object LeaderLeavingMultiJvmSpec extends MultiNodeConfig {
|
|||
commonConfig(
|
||||
debugConfig(on = false)
|
||||
.withFallback(ConfigFactory.parseString("""
|
||||
akka.cluster {
|
||||
leader-actions-interval = 5 s # increase the leader action task frequency to make sure we get a chance to test the LEAVING state
|
||||
unreachable-nodes-reaper-interval = 30 s
|
||||
}""")
|
||||
# turn off unreachable reaper
|
||||
akka.cluster.unreachable-nodes-reaper-interval = 300 s""")
|
||||
.withFallback(MultiNodeClusterSpec.clusterConfig)))
|
||||
}
|
||||
|
||||
|
|
@ -35,7 +33,7 @@ abstract class LeaderLeavingSpec
|
|||
|
||||
import LeaderLeavingMultiJvmSpec._
|
||||
|
||||
val leaderHandoffWaitingTime = 30.seconds.dilated
|
||||
val leaderHandoffWaitingTime = 30.seconds
|
||||
|
||||
"A LEADER that is LEAVING" must {
|
||||
|
||||
|
|
@ -45,41 +43,60 @@ abstract class LeaderLeavingSpec
|
|||
|
||||
val oldLeaderAddress = cluster.leader
|
||||
|
||||
if (cluster.isLeader) {
|
||||
within(leaderHandoffWaitingTime) {
|
||||
|
||||
cluster.leave(oldLeaderAddress)
|
||||
enterBarrier("leader-left")
|
||||
if (cluster.isLeader) {
|
||||
|
||||
// verify that a NEW LEADER have taken over
|
||||
awaitCond(!cluster.isLeader)
|
||||
enterBarrier("registered-listener")
|
||||
|
||||
// verify that the LEADER is shut down
|
||||
awaitCond(!cluster.isRunning, 30.seconds.dilated)
|
||||
cluster.leave(oldLeaderAddress)
|
||||
enterBarrier("leader-left")
|
||||
|
||||
// verify that the LEADER is REMOVED
|
||||
awaitCond(cluster.status == MemberStatus.Removed)
|
||||
// verify that a NEW LEADER have taken over
|
||||
awaitCond(!cluster.isLeader)
|
||||
|
||||
} else {
|
||||
// verify that the LEADER is shut down
|
||||
awaitCond(!cluster.isRunning)
|
||||
|
||||
enterBarrier("leader-left")
|
||||
// verify that the LEADER is REMOVED
|
||||
awaitCond(cluster.status == MemberStatus.Removed)
|
||||
|
||||
// verify that the LEADER is LEAVING
|
||||
awaitCond(cluster.latestGossip.members.exists(m ⇒ m.status == MemberStatus.Leaving && m.address == oldLeaderAddress), leaderHandoffWaitingTime) // wait on LEAVING
|
||||
} else {
|
||||
|
||||
// verify that the LEADER is EXITING
|
||||
awaitCond(cluster.latestGossip.members.exists(m ⇒ m.status == MemberStatus.Exiting && m.address == oldLeaderAddress), leaderHandoffWaitingTime) // wait on EXITING
|
||||
val leavingLatch = TestLatch()
|
||||
val exitingLatch = TestLatch()
|
||||
val expectedAddresses = roles.toSet map address
|
||||
cluster.registerListener(new MembershipChangeListener {
|
||||
def notify(members: SortedSet[Member]) {
|
||||
def check(status: MemberStatus): Boolean =
|
||||
(members.map(_.address) == expectedAddresses &&
|
||||
members.exists(m ⇒ m.address == oldLeaderAddress && m.status == status))
|
||||
if (check(MemberStatus.Leaving)) leavingLatch.countDown()
|
||||
if (check(MemberStatus.Exiting)) exitingLatch.countDown()
|
||||
}
|
||||
})
|
||||
enterBarrier("registered-listener")
|
||||
|
||||
// verify that the LEADER is no longer part of the 'members' set
|
||||
awaitCond(cluster.latestGossip.members.forall(_.address != oldLeaderAddress), leaderHandoffWaitingTime)
|
||||
enterBarrier("leader-left")
|
||||
|
||||
// verify that the LEADER is not part of the 'unreachable' set
|
||||
awaitCond(cluster.latestGossip.overview.unreachable.forall(_.address != oldLeaderAddress), leaderHandoffWaitingTime)
|
||||
// verify that the LEADER is LEAVING
|
||||
leavingLatch.await
|
||||
|
||||
// verify that we have a new LEADER
|
||||
awaitCond(cluster.leader != oldLeaderAddress, leaderHandoffWaitingTime)
|
||||
// verify that the LEADER is EXITING
|
||||
exitingLatch.await
|
||||
|
||||
// verify that the LEADER is no longer part of the 'members' set
|
||||
awaitCond(cluster.latestGossip.members.forall(_.address != oldLeaderAddress))
|
||||
|
||||
// verify that the LEADER is not part of the 'unreachable' set
|
||||
awaitCond(cluster.latestGossip.overview.unreachable.forall(_.address != oldLeaderAddress))
|
||||
|
||||
// verify that we have a new LEADER
|
||||
awaitCond(cluster.leader != oldLeaderAddress)
|
||||
}
|
||||
|
||||
enterBarrier("finished")
|
||||
}
|
||||
|
||||
enterBarrier("finished")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,11 +18,8 @@ object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig {
|
|||
commonConfig(
|
||||
debugConfig(on = false)
|
||||
.withFallback(ConfigFactory.parseString("""
|
||||
akka.cluster {
|
||||
leader-actions-interval = 5 s # increase the leader action task frequency to make sure we get a chance to test the LEAVING state
|
||||
unreachable-nodes-reaper-interval = 300 s # turn "off"
|
||||
}
|
||||
""")
|
||||
# turn off unreachable reaper
|
||||
akka.cluster.unreachable-nodes-reaper-interval = 300 s""")
|
||||
.withFallback(MultiNodeClusterSpec.clusterConfig)))
|
||||
}
|
||||
|
||||
|
|
@ -42,26 +39,39 @@ abstract class NodeLeavingAndExitingSpec
|
|||
|
||||
awaitClusterUp(first, second, third)
|
||||
|
||||
runOn(first) {
|
||||
cluster.leave(second)
|
||||
}
|
||||
enterBarrier("second-left")
|
||||
|
||||
runOn(first, third) {
|
||||
val secondAddess = address(second)
|
||||
val leavingLatch = TestLatch()
|
||||
val exitingLatch = TestLatch()
|
||||
val expectedAddresses = roles.toSet map address
|
||||
cluster.registerListener(new MembershipChangeListener {
|
||||
def notify(members: SortedSet[Member]) {
|
||||
def check(status: MemberStatus): Boolean =
|
||||
(members.map(_.address) == expectedAddresses &&
|
||||
members.exists(m ⇒ m.address == secondAddess && m.status == status))
|
||||
if (check(MemberStatus.Leaving)) leavingLatch.countDown()
|
||||
if (check(MemberStatus.Exiting)) exitingLatch.countDown()
|
||||
}
|
||||
})
|
||||
enterBarrier("registered-listener")
|
||||
|
||||
// 1. Verify that 'second' node is set to LEAVING
|
||||
// We have set the 'leader-actions-interval' to 5 seconds to make sure that we get a
|
||||
// chance to test the LEAVING state before the leader moves the node to EXITING
|
||||
awaitCond(cluster.latestGossip.members.exists(_.status == MemberStatus.Leaving)) // wait on LEAVING
|
||||
val hasLeft = cluster.latestGossip.members.find(_.status == MemberStatus.Leaving) // verify node that left
|
||||
hasLeft must be('defined)
|
||||
hasLeft.get.address must be(address(second))
|
||||
runOn(third) {
|
||||
cluster.leave(second)
|
||||
}
|
||||
enterBarrier("second-left")
|
||||
|
||||
// 2. Verify that 'second' node is set to EXITING
|
||||
awaitCond(cluster.latestGossip.members.exists(_.status == MemberStatus.Exiting)) // wait on EXITING
|
||||
val hasExited = cluster.latestGossip.members.find(_.status == MemberStatus.Exiting) // verify node that exited
|
||||
hasExited must be('defined)
|
||||
hasExited.get.address must be(address(second))
|
||||
// Verify that 'second' node is set to LEAVING
|
||||
leavingLatch.await
|
||||
|
||||
// Verify that 'second' node is set to EXITING
|
||||
exitingLatch.await
|
||||
|
||||
}
|
||||
|
||||
// node that is leaving
|
||||
runOn(second) {
|
||||
enterBarrier("registered-listener")
|
||||
enterBarrier("second-left")
|
||||
}
|
||||
|
||||
enterBarrier("finished")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue