Harden ClusterSingletonManagerSpec, see #3116
* One of the test steps was not deterministic and verification too strict * Created a separate test for the chaotic scenarios
This commit is contained in:
parent
370d6451c7
commit
2cba3606ae
2 changed files with 180 additions and 45 deletions
|
|
@ -0,0 +1,151 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.contrib.pattern
|
||||
|
||||
import language.postfixOps
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.duration._
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.actor.Actor
|
||||
import akka.actor.ActorLogging
|
||||
import akka.actor.ActorRef
|
||||
import akka.actor.Address
|
||||
import akka.actor.Props
|
||||
import akka.actor.PoisonPill
|
||||
import akka.actor.RootActorPath
|
||||
import akka.cluster.Cluster
|
||||
import akka.cluster.ClusterEvent._
|
||||
import akka.cluster.Member
|
||||
import akka.remote.testconductor.RoleName
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.remote.testkit.STMultiNodeSpec
|
||||
import akka.testkit._
|
||||
import akka.testkit.TestEvent._
|
||||
import akka.actor.Terminated
|
||||
|
||||
object ClusterSingletonManagerChaosSpec extends MultiNodeConfig {
|
||||
val controller = role("controller")
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
val third = role("third")
|
||||
val fourth = role("fourth")
|
||||
val fifth = role("fifth")
|
||||
val sixth = role("sixth")
|
||||
|
||||
commonConfig(ConfigFactory.parseString("""
|
||||
akka.loglevel = INFO
|
||||
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
|
||||
akka.remote.log-remote-lifecycle-events = off
|
||||
akka.cluster.auto-join = off
|
||||
akka.cluster.auto-down = on
|
||||
"""))
|
||||
|
||||
case object EchoStarted
|
||||
/**
|
||||
* The singleton actor
|
||||
*/
|
||||
class Echo(testActor: ActorRef) extends Actor {
|
||||
testActor ! EchoStarted
|
||||
|
||||
def receive = {
|
||||
case _ ⇒ sender ! self
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class ClusterSingletonManagerChaosMultiJvmNode1 extends ClusterSingletonManagerChaosSpec
|
||||
class ClusterSingletonManagerChaosMultiJvmNode2 extends ClusterSingletonManagerChaosSpec
|
||||
class ClusterSingletonManagerChaosMultiJvmNode3 extends ClusterSingletonManagerChaosSpec
|
||||
class ClusterSingletonManagerChaosMultiJvmNode4 extends ClusterSingletonManagerChaosSpec
|
||||
class ClusterSingletonManagerChaosMultiJvmNode5 extends ClusterSingletonManagerChaosSpec
|
||||
class ClusterSingletonManagerChaosMultiJvmNode6 extends ClusterSingletonManagerChaosSpec
|
||||
class ClusterSingletonManagerChaosMultiJvmNode7 extends ClusterSingletonManagerChaosSpec
|
||||
|
||||
class ClusterSingletonManagerChaosSpec extends MultiNodeSpec(ClusterSingletonManagerChaosSpec) with STMultiNodeSpec with ImplicitSender {
|
||||
import ClusterSingletonManagerChaosSpec._
|
||||
|
||||
override def initialParticipants = roles.size
|
||||
|
||||
// Sort the roles in the order used by the cluster.
|
||||
lazy val sortedClusterRoles: immutable.IndexedSeq[RoleName] = {
|
||||
implicit val clusterOrdering: Ordering[RoleName] = new Ordering[RoleName] {
|
||||
import Member.addressOrdering
|
||||
def compare(x: RoleName, y: RoleName) =
|
||||
addressOrdering.compare(node(x).address, node(y).address)
|
||||
}
|
||||
roles.filterNot(_ == controller).toVector.sorted
|
||||
}
|
||||
|
||||
def join(from: RoleName, to: RoleName): Unit = {
|
||||
runOn(from) {
|
||||
Cluster(system) join node(to).address
|
||||
createSingleton()
|
||||
}
|
||||
}
|
||||
|
||||
def createSingleton(): ActorRef = {
|
||||
system.actorOf(Props(new ClusterSingletonManager(
|
||||
singletonProps = handOverData ⇒ Props(new Echo(testActor)),
|
||||
singletonName = "echo",
|
||||
terminationMessage = PoisonPill)),
|
||||
name = "singleton")
|
||||
}
|
||||
|
||||
def crash(roles: RoleName*): Unit = {
|
||||
runOn(controller) {
|
||||
roles foreach { r ⇒
|
||||
log.info("Shutdown [{}]", node(r).address)
|
||||
testConductor.shutdown(r, 0).await
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def echo(leader: RoleName): ActorRef =
|
||||
system.actorFor(RootActorPath(node(leader).address) / "user" / "singleton" / "echo")
|
||||
|
||||
def verify(leader: RoleName): Unit = {
|
||||
enterBarrier("before-" + leader.name + "-verified")
|
||||
runOn(leader) {
|
||||
expectMsg(EchoStarted)
|
||||
}
|
||||
enterBarrier(leader.name + "-active")
|
||||
|
||||
runOn(sortedClusterRoles.filterNot(_ == leader): _*) {
|
||||
ignoreMsg { case EchoStarted ⇒ true }
|
||||
echo(leader) ! "hello"
|
||||
expectMsgType[ActorRef].path.address must be(node(leader).address)
|
||||
ignoreNoMsg()
|
||||
}
|
||||
enterBarrier(leader.name + "-verified")
|
||||
}
|
||||
|
||||
"A ClusterSingletonManager in chaotic cluster" must {
|
||||
|
||||
"startup 3 node cluster" in within(90 seconds) {
|
||||
log.info("Sorted cluster nodes [{}]", sortedClusterRoles.map(node(_).address).mkString(", "))
|
||||
|
||||
join(sortedClusterRoles(5), sortedClusterRoles.last)
|
||||
join(sortedClusterRoles(4), sortedClusterRoles.last)
|
||||
join(sortedClusterRoles(3), sortedClusterRoles.last)
|
||||
|
||||
verify(sortedClusterRoles(3))
|
||||
}
|
||||
|
||||
"hand over when joining 3 more nodes" in within(90 seconds) {
|
||||
join(sortedClusterRoles(2), sortedClusterRoles(3))
|
||||
join(sortedClusterRoles(1), sortedClusterRoles(4))
|
||||
join(sortedClusterRoles(0), sortedClusterRoles(5))
|
||||
|
||||
verify(sortedClusterRoles(0))
|
||||
}
|
||||
|
||||
"take over when three leaders crash in 6 nodes cluster" in within(90 seconds) {
|
||||
crash(sortedClusterRoles(0), sortedClusterRoles(1), sortedClusterRoles(2))
|
||||
verify(sortedClusterRoles(3))
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -22,7 +22,6 @@ import akka.remote.testkit.MultiNodeConfig
|
|||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.remote.testkit.STMultiNodeSpec
|
||||
import akka.testkit._
|
||||
import akka.testkit.ImplicitSender
|
||||
import akka.testkit.TestEvent._
|
||||
import akka.actor.Terminated
|
||||
|
||||
|
|
@ -43,8 +42,6 @@ object ClusterSingletonManagerSpec extends MultiNodeConfig {
|
|||
akka.cluster.auto-down = on
|
||||
"""))
|
||||
|
||||
testTransport(on = true)
|
||||
|
||||
object PointToPointChannel {
|
||||
case object RegisterConsumer
|
||||
case object UnregisterConsumer
|
||||
|
|
@ -196,6 +193,13 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
|
|||
|
||||
def queue: ActorRef = system.actorFor(node(controller) / "user" / "queue")
|
||||
|
||||
def join(from: RoleName, to: RoleName): Unit = {
|
||||
runOn(from) {
|
||||
Cluster(system) join node(to).address
|
||||
createSingleton()
|
||||
}
|
||||
}
|
||||
|
||||
def createSingleton(): ActorRef = {
|
||||
//#create-singleton-manager
|
||||
system.actorOf(Props(new ClusterSingletonManager(
|
||||
|
|
@ -255,58 +259,38 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
|
|||
}
|
||||
enterBarrier("queue-started")
|
||||
|
||||
runOn(sortedClusterRoles(5)) {
|
||||
Cluster(system) join node(sortedClusterRoles(5)).address
|
||||
createSingleton()
|
||||
}
|
||||
|
||||
join(sortedClusterRoles.last, sortedClusterRoles.last)
|
||||
verify(sortedClusterRoles.last, msg = 1, expectedCurrent = 0)
|
||||
}
|
||||
|
||||
"hand over when new leader joins to 1 node cluster" in within(15 seconds) {
|
||||
val newLeaderRole = sortedClusterRoles(4)
|
||||
runOn(newLeaderRole) {
|
||||
Cluster(system) join node(sortedClusterRoles.last).address
|
||||
createSingleton()
|
||||
}
|
||||
|
||||
join(newLeaderRole, sortedClusterRoles.last)
|
||||
verify(newLeaderRole, msg = 2, expectedCurrent = 1)
|
||||
}
|
||||
|
||||
"hand over when new leader joins to 2 nodes cluster" in within(15 seconds) {
|
||||
val newLeaderRole = sortedClusterRoles(3)
|
||||
runOn(newLeaderRole) {
|
||||
Cluster(system) join node(sortedClusterRoles.last).address
|
||||
createSingleton()
|
||||
}
|
||||
|
||||
join(newLeaderRole, sortedClusterRoles.last)
|
||||
verify(newLeaderRole, msg = 3, expectedCurrent = 2)
|
||||
}
|
||||
|
||||
"hand over when adding three new potential leaders to 3 nodes cluster" in within(60 seconds) {
|
||||
// this test will result in restart after retry timeout
|
||||
// because the new leader will not know about the real previous leader and the
|
||||
// previous leader sortedClusterRoles(3) will first think that sortedClusterRoles(2)
|
||||
// is the new leader
|
||||
runOn(controller) {
|
||||
queue ! Reset
|
||||
expectMsg(ResetOk)
|
||||
}
|
||||
runOn(sortedClusterRoles(2)) {
|
||||
// previous leader
|
||||
Cluster(system) join node(sortedClusterRoles(3)).address
|
||||
createSingleton()
|
||||
}
|
||||
runOn(sortedClusterRoles(1)) {
|
||||
Cluster(system) join node(sortedClusterRoles(4)).address
|
||||
createSingleton()
|
||||
}
|
||||
runOn(sortedClusterRoles(0)) {
|
||||
Cluster(system) join node(sortedClusterRoles(5)).address
|
||||
createSingleton()
|
||||
"hand over when new leader joins to 3 nodes cluster" in within(15 seconds) {
|
||||
val newLeaderRole = sortedClusterRoles(2)
|
||||
join(newLeaderRole, sortedClusterRoles.last)
|
||||
verify(newLeaderRole, msg = 4, expectedCurrent = 3)
|
||||
}
|
||||
|
||||
verify(sortedClusterRoles(0), msg = 4, expectedCurrent = 0)
|
||||
"hand over when new leader joins to 4 nodes cluster" in within(15 seconds) {
|
||||
val newLeaderRole = sortedClusterRoles(1)
|
||||
join(newLeaderRole, sortedClusterRoles.last)
|
||||
verify(newLeaderRole, msg = 5, expectedCurrent = 4)
|
||||
}
|
||||
|
||||
"hand over when new leader joins to 5 nodes cluster" in within(15 seconds) {
|
||||
val newLeaderRole = sortedClusterRoles(0)
|
||||
join(newLeaderRole, sortedClusterRoles.last)
|
||||
verify(newLeaderRole, msg = 6, expectedCurrent = 5)
|
||||
}
|
||||
|
||||
"hand over when leader leaves in 6 nodes cluster " in within(30 seconds) {
|
||||
|
|
@ -319,7 +303,7 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
|
|||
}
|
||||
//#test-leave
|
||||
|
||||
verify(newLeaderRole, msg = 5, expectedCurrent = 4)
|
||||
verify(newLeaderRole, msg = 7, expectedCurrent = 6)
|
||||
|
||||
runOn(leaveRole) {
|
||||
val singleton = system.actorFor("/user/singleton")
|
||||
|
|
@ -337,17 +321,17 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
|
|||
enterBarrier("logs-muted")
|
||||
|
||||
crash(sortedClusterRoles(1))
|
||||
verify(sortedClusterRoles(2), msg = 6, expectedCurrent = 0)
|
||||
verify(sortedClusterRoles(2), msg = 8, expectedCurrent = 0)
|
||||
}
|
||||
|
||||
"take over when two leaders crash in 3 nodes cluster" in within(45 seconds) {
|
||||
crash(sortedClusterRoles(2), sortedClusterRoles(3))
|
||||
verify(sortedClusterRoles(4), msg = 7, expectedCurrent = 0)
|
||||
verify(sortedClusterRoles(4), msg = 9, expectedCurrent = 0)
|
||||
}
|
||||
|
||||
"take over when leader crashes in 2 nodes cluster" in within(25 seconds) {
|
||||
crash(sortedClusterRoles(4))
|
||||
verify(sortedClusterRoles(5), msg = 6, expectedCurrent = 0)
|
||||
verify(sortedClusterRoles(5), msg = 10, expectedCurrent = 0)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue