Port LeaderElectionSpec to MultiNodeSpec. See #2113
This commit is contained in:
parent
57313cc9e0
commit
597271052f
2 changed files with 110 additions and 122 deletions
|
|
@ -4,128 +4,100 @@
|
|||
|
||||
package akka.cluster
|
||||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.testkit._
|
||||
import akka.dispatch._
|
||||
import akka.actor._
|
||||
import akka.remote._
|
||||
import akka.util.duration._
|
||||
|
||||
import com.typesafe.config._
|
||||
object LeaderElectionMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
val third = role("third")
|
||||
val forth = role("forth")
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
commonConfig(debugConfig(on = false).
|
||||
withFallback(ConfigFactory.parseString("""
|
||||
akka.cluster.auto-down = off
|
||||
""")).
|
||||
withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
|
||||
}
|
||||
|
||||
class LeaderElectionMultiJvmNode1 extends LeaderElectionSpec
|
||||
class LeaderElectionMultiJvmNode2 extends LeaderElectionSpec
|
||||
class LeaderElectionMultiJvmNode3 extends LeaderElectionSpec
|
||||
class LeaderElectionMultiJvmNode4 extends LeaderElectionSpec
|
||||
|
||||
abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSpec) with MultiNodeClusterSpec {
|
||||
import LeaderElectionMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 4
|
||||
|
||||
val firstAddress = node(first).address
|
||||
val myAddress = node(mySelf).address
|
||||
|
||||
// sorted in the order used by the cluster
|
||||
val roles = Seq(first, second, third, forth).sorted
|
||||
|
||||
"A cluster of three nodes" must {
|
||||
|
||||
"be able to 'elect' a single leader" in {
|
||||
// make sure that the first cluster is started before other join
|
||||
runOn(first) {
|
||||
cluster
|
||||
}
|
||||
testConductor.enter("first-started")
|
||||
|
||||
cluster.join(firstAddress)
|
||||
awaitUpConvergence(numberOfMembers = 4)
|
||||
cluster.isLeader must be(mySelf == roles.head)
|
||||
testConductor.enter("after")
|
||||
}
|
||||
|
||||
def shutdownLeaderAndVerifyNewLeader(alreadyShutdown: Int): Unit = {
|
||||
val currentRoles = roles.drop(alreadyShutdown)
|
||||
currentRoles.size must be >= (2)
|
||||
|
||||
runOn(currentRoles.head) {
|
||||
cluster.shutdown()
|
||||
testConductor.enter("after-shutdown")
|
||||
testConductor.enter("after-down")
|
||||
}
|
||||
|
||||
// runOn previously shutdown cluster nodes
|
||||
if ((roles diff currentRoles).contains(mySelf)) {
|
||||
testConductor.enter("after-shutdown")
|
||||
testConductor.enter("after-down")
|
||||
}
|
||||
|
||||
// runOn remaining cluster nodes
|
||||
if (currentRoles.tail.contains(mySelf)) {
|
||||
|
||||
testConductor.enter("after-shutdown")
|
||||
|
||||
runOn(currentRoles.last) {
|
||||
// user marks the shutdown leader as DOWN
|
||||
val leaderAddress = node(currentRoles.head).address
|
||||
cluster.down(leaderAddress)
|
||||
}
|
||||
|
||||
testConductor.enter("after-down")
|
||||
|
||||
awaitUpConvergence(currentRoles.size - 1)
|
||||
val nextExpectedLeader = currentRoles.tail.head
|
||||
cluster.isLeader must be(mySelf == nextExpectedLeader)
|
||||
}
|
||||
|
||||
testConductor.enter("after")
|
||||
}
|
||||
|
||||
"be able to 're-elect' a single leader after leader has left" in {
|
||||
shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 0)
|
||||
}
|
||||
|
||||
"be able to 're-elect' a single leader after leader has left (again)" in {
|
||||
shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 1)
|
||||
}
|
||||
}
|
||||
|
||||
class LeaderElectionSpec extends ClusterSpec with ImplicitSender {
|
||||
val portPrefix = 5
|
||||
|
||||
var node1: Cluster = _
|
||||
var node2: Cluster = _
|
||||
var node3: Cluster = _
|
||||
|
||||
var system1: ActorSystemImpl = _
|
||||
var system2: ActorSystemImpl = _
|
||||
var system3: ActorSystemImpl = _
|
||||
|
||||
try {
|
||||
"A cluster of three nodes" must {
|
||||
|
||||
// ======= NODE 1 ========
|
||||
system1 = ActorSystem("system1", ConfigFactory
|
||||
.parseString("""
|
||||
akka {
|
||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||
remote.netty.port = %d550
|
||||
}""".format(portPrefix))
|
||||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
node1 = Cluster(system1)
|
||||
val address1 = node1.remoteAddress
|
||||
|
||||
// ======= NODE 2 ========
|
||||
system2 = ActorSystem("system2", ConfigFactory
|
||||
.parseString("""
|
||||
akka {
|
||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||
remote.netty.port = %d551
|
||||
cluster.node-to-join = "akka://system1@localhost:%d550"
|
||||
}""".format(portPrefix, portPrefix))
|
||||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
node2 = Cluster(system2)
|
||||
val address2 = node2.remoteAddress
|
||||
|
||||
// ======= NODE 3 ========
|
||||
system3 = ActorSystem("system3", ConfigFactory
|
||||
.parseString("""
|
||||
akka {
|
||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||
remote.netty.port = %d552
|
||||
cluster.node-to-join = "akka://system1@localhost:%d550"
|
||||
}""".format(portPrefix, portPrefix))
|
||||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
node3 = Cluster(system3)
|
||||
val address3 = node3.remoteAddress
|
||||
|
||||
"be able to 'elect' a single leader" taggedAs LongRunningTest in {
|
||||
|
||||
println("Give the system time to converge...")
|
||||
awaitConvergence(node1 :: node2 :: node3 :: Nil)
|
||||
|
||||
// check leader
|
||||
node1.isLeader must be(true)
|
||||
node2.isLeader must be(false)
|
||||
node3.isLeader must be(false)
|
||||
}
|
||||
|
||||
"be able to 're-elect' a single leader after leader has left" taggedAs LongRunningTest in {
|
||||
|
||||
// shut down system1 - the leader
|
||||
node1.shutdown()
|
||||
system1.shutdown()
|
||||
|
||||
// user marks node1 as DOWN
|
||||
node2.down(address1)
|
||||
|
||||
println("Give the system time to converge...")
|
||||
Thread.sleep(10.seconds.dilated.toMillis)
|
||||
awaitConvergence(node2 :: node3 :: Nil)
|
||||
|
||||
// check leader
|
||||
node2.isLeader must be(true)
|
||||
node3.isLeader must be(false)
|
||||
}
|
||||
|
||||
"be able to 're-elect' a single leader after leader has left (again, leaving a single node)" taggedAs LongRunningTest in {
|
||||
|
||||
// shut down system1 - the leader
|
||||
node2.shutdown()
|
||||
system2.shutdown()
|
||||
|
||||
// user marks node2 as DOWN
|
||||
node3.down(address2)
|
||||
|
||||
println("Give the system time to converge...")
|
||||
Thread.sleep(10.seconds.dilated.toMillis)
|
||||
awaitConvergence(node3 :: Nil)
|
||||
|
||||
// check leader
|
||||
node3.isLeader must be(true)
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
e.printStackTrace
|
||||
fail(e.toString)
|
||||
}
|
||||
|
||||
override def atTermination() {
|
||||
if (node1 ne null) node1.shutdown()
|
||||
if (system1 ne null) system1.shutdown()
|
||||
|
||||
if (node2 ne null) node2.shutdown()
|
||||
if (system2 ne null) system2.shutdown()
|
||||
|
||||
if (node3 ne null) node3.shutdown()
|
||||
if (system3 ne null) system3.shutdown()
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue