Merge pull request #483 from akka/wip-2113-LeaderElectionSpec-patriknw

Port LeaderElectionSpec to MultiNodeSpec. See #2113
This commit is contained in:
patriknw 2012-05-28 04:56:29 -07:00
commit 12bd97b66d
3 changed files with 126 additions and 133 deletions

View file

@ -0,0 +1,108 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
object LeaderElectionMultiJvmSpec extends MultiNodeConfig {
val controller = role("controller")
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
commonConfig(debugConfig(on = false).
withFallback(ConfigFactory.parseString("""
akka.cluster.auto-down = off
""")).
withFallback(MultiNodeClusterSpec.clusterConfig))
}
class LeaderElectionMultiJvmNode1 extends LeaderElectionSpec
class LeaderElectionMultiJvmNode2 extends LeaderElectionSpec
class LeaderElectionMultiJvmNode3 extends LeaderElectionSpec
class LeaderElectionMultiJvmNode4 extends LeaderElectionSpec
class LeaderElectionMultiJvmNode5 extends LeaderElectionSpec
abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSpec) with MultiNodeClusterSpec {
import LeaderElectionMultiJvmSpec._
override def initialParticipants = 5
lazy val firstAddress = node(first).address
// sorted in the order used by the cluster
lazy val roles = Seq(first, second, third, fourth).sorted
"A cluster of four nodes" must {
"be able to 'elect' a single leader" taggedAs LongRunningTest in {
// make sure that the node-to-join is started before other join
runOn(first) {
cluster
}
testConductor.enter("first-started")
if (mySelf != controller) {
cluster.join(firstAddress)
awaitUpConvergence(numberOfMembers = roles.size)
cluster.isLeader must be(mySelf == roles.head)
}
testConductor.enter("after")
}
def shutdownLeaderAndVerifyNewLeader(alreadyShutdown: Int): Unit = {
val currentRoles = roles.drop(alreadyShutdown)
currentRoles.size must be >= (2)
val leader = currentRoles.head
val aUser = currentRoles.last
mySelf match {
case `controller`
testConductor.enter("before-shutdown")
testConductor.shutdown(leader, 0)
testConductor.removeNode(leader)
testConductor.enter("after-shutdown", "after-down", "completed")
case `leader`
testConductor.enter("before-shutdown")
// this node will be shutdown by the controller and doesn't participate in more barriers
case `aUser`
val leaderAddress = node(leader).address
testConductor.enter("before-shutdown", "after-shutdown")
// user marks the shutdown leader as DOWN
cluster.down(leaderAddress)
testConductor.enter("after-down", "completed")
case _ if currentRoles.tail.contains(mySelf)
// remaining cluster nodes, not shutdown
testConductor.enter("before-shutdown", "after-shutdown", "after-down")
awaitUpConvergence(currentRoles.size - 1)
val nextExpectedLeader = currentRoles.tail.head
cluster.isLeader must be(mySelf == nextExpectedLeader)
testConductor.enter("completed")
}
}
"be able to 're-elect' a single leader after leader has left" taggedAs LongRunningTest in {
shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 0)
}
"be able to 're-elect' a single leader after leader has left (again)" taggedAs LongRunningTest in {
shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 1)
}
}
}

View file

@ -45,8 +45,7 @@ trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒
*/
def assertLeader(nodesInCluster: RoleName*): Unit = if (nodesInCluster.contains(mySelf)) {
nodesInCluster.length must not be (0)
import Member.addressOrdering
val expectedLeader = nodesInCluster.map(role (role, node(role).address)).sortBy(_._2).head._1
val expectedLeader = roleOfLeader(nodesInCluster)
cluster.isLeader must be(ifNode(expectedLeader)(true)(false))
}
@ -60,4 +59,21 @@ trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒
awaitCond(cluster.convergence.isDefined, 10 seconds)
}
def roleOfLeader(nodesInCluster: Seq[RoleName]): RoleName = {
nodesInCluster.length must not be (0)
nodesInCluster.sorted.head
}
/**
* Sort the roles in the order used by the cluster.
*/
implicit val clusterOrdering: Ordering[RoleName] = new Ordering[RoleName] {
import Member.addressOrdering
def compare(x: RoleName, y: RoleName) = addressOrdering.compare(node(x).address, node(y).address)
}
def roleName(address: Address): Option[RoleName] = {
testConductor.getNodes.await.find(node(_).address == address)
}
}