// pekko/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala

/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import language.postfixOps
import org.scalatest.BeforeAndAfter
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import com.typesafe.config.ConfigFactory
import akka.actor.Address
import akka.remote.testconductor.{ RoleName, Direction }
import scala.concurrent.util.duration._
/**
 * Multi-node configuration for the unreachable-node-rejoins scenario.
 *
 * Declares the four roles taking part in the test and installs the configuration
 * that is common to all of them.
 */
object UnreachableNodeRejoinsClusterMultiJvmSpec extends MultiNodeConfig {
  // The four roles of the test, declared in this order.
  val first = role("first")
  val second = role("second")
  val third = role("third")
  val fourth = role("fourth")

  // Debug configuration (switched off) layered on top of the shared cluster
  // configuration, which is used as the fallback.
  commonConfig {
    val debugSettings = debugConfig(on = false)
    debugSettings.withFallback(MultiNodeClusterSpec.clusterConfig)
  }
}
// One concrete spec class per JVM node (1-4), each combining the shared scenario
// with the FailureDetectorPuppetStrategy mixin.
class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode1
  extends UnreachableNodeRejoinsClusterSpec
  with FailureDetectorPuppetStrategy

class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode2
  extends UnreachableNodeRejoinsClusterSpec
  with FailureDetectorPuppetStrategy

class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode3
  extends UnreachableNodeRejoinsClusterSpec
  with FailureDetectorPuppetStrategy

class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode4
  extends UnreachableNodeRejoinsClusterSpec
  with FailureDetectorPuppetStrategy
// One concrete spec class per JVM node (1-4), each combining the shared scenario
// with the AccrualFailureDetectorStrategy mixin.
class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode1
  extends UnreachableNodeRejoinsClusterSpec
  with AccrualFailureDetectorStrategy

class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode2
  extends UnreachableNodeRejoinsClusterSpec
  with AccrualFailureDetectorStrategy

class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode3
  extends UnreachableNodeRejoinsClusterSpec
  with AccrualFailureDetectorStrategy

class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode4
  extends UnreachableNodeRejoinsClusterSpec
  with AccrualFailureDetectorStrategy
/**
 * Multi-node scenario in which one member (the `victim`) is cut off from the other
 * nodes, is observed as unreachable, is downed by the `master`, and finally joins the
 * cluster again once the network connection has been restored.
 *
 * The test cases share state and are meant to run in the order in which they are
 * declared; every case finishes with [[endBarrier]] so that all nodes proceed together.
 * Concrete per-JVM subclasses mix in the failure detector strategy to test with.
 */
abstract class UnreachableNodeRejoinsClusterSpec
  extends MultiNodeSpec(UnreachableNodeRejoinsClusterMultiJvmSpec)
  with MultiNodeClusterSpec {

  import UnreachableNodeRejoinsClusterMultiJvmSpec._

  /**
   * Returns the given roles with `role` removed.
   *
   * @param role  the role to leave out
   * @param roles the roles to filter; when omitted, the enclosing spec's `roles` is used
   */
  def allBut(role: RoleName, roles: Seq[RoleName] = roles): Seq[RoleName] = {
    roles.filterNot(_ == role)
  }

  lazy val sortedRoles = roles.sorted

  // The first sorted role issues the `down` command and is the node the victim re-joins.
  lazy val master = sortedRoles(0)

  // The second sorted role is the node that gets disconnected from all the others.
  lazy val victim = sortedRoles(1)

  // Counter used to give every end-of-test barrier a unique name.
  var endBarrierNumber = 0

  /** Enters a barrier named "after_<n>", where n is incremented on every call. */
  def endBarrier: Unit = {
    endBarrierNumber += 1
    enterBarrier("after_" + endBarrierNumber)
  }

  "A cluster of " + roles.size + " members" must {

    "reach initial convergence" taggedAs LongRunningTest in {
      awaitClusterUp(roles: _*)
      endBarrier
    }

    "mark a node as UNREACHABLE when we pull the network" taggedAs LongRunningTest in {
      // let them send at least one heartbeat to each other after the gossip convergence
      // because for new joining nodes we remove them from the failure detector when
      // receiving gossip
      Thread.sleep(2.seconds.dilated.toMillis)

      runOn(first) {
        // pull network for victim node from all nodes
        allBut(victim).foreach { roleName =>
          testConductor.blackhole(victim, roleName, Direction.Both).await
        }
      }

      enterBarrier("unplug_victim")

      val allButVictim = allBut(victim, sortedRoles)

      runOn(victim) {
        allButVictim.foreach(markNodeAsUnavailable(_))
        within(30 seconds) {
          // victim becomes all alone
          awaitCond({
            val members = clusterView.members
            clusterView.unreachableMembers.size == (roles.size - 1) &&
              members.size == 1 &&
              members.forall(_.status == MemberStatus.Up)
          })
          clusterView.unreachableMembers.map(_.address) must be((allButVictim map address).toSet)
          clusterView.convergence must be(false)
        }
      }

      runOn(allButVictim: _*) {
        markNodeAsUnavailable(victim)
        within(30 seconds) {
          // victim becomes unreachable
          awaitCond({
            val members = clusterView.members
            clusterView.unreachableMembers.size == 1 &&
              members.size == (roles.size - 1) &&
              members.forall(_.status == MemberStatus.Up)
          })
          awaitSeenSameState(allButVictim map address: _*)
          // still one unreachable
          clusterView.unreachableMembers.size must be(1)
          clusterView.unreachableMembers.head.address must be(node(victim).address)
          // and therefore no convergence
          clusterView.convergence must be(false)
        }
      }

      endBarrier
    }

    "mark the node as DOWN" taggedAs LongRunningTest in {
      runOn(master) {
        cluster down victim
      }

      // every node except the victim waits for convergence among the remaining members
      runOn(allBut(victim): _*) {
        awaitUpConvergence(roles.size - 1, Seq(victim))
      }

      endBarrier
    }

    "allow node to REJOIN when the network is plugged back in" taggedAs LongRunningTest in {
      runOn(first) {
        // put the network back in
        allBut(victim).foreach { roleName =>
          testConductor.passThrough(victim, roleName, Direction.Both).await
        }
      }

      enterBarrier("plug_in_victim")

      runOn(victim) {
        cluster join master
      }

      // all nodes, including the victim, wait for the full cluster to be up again
      awaitUpConvergence(roles.size)

      endBarrier
    }
  }
}