Cluster node that is UNREACHABLE and rejoins. see #2160

This commit is contained in:
Björn Antonsson 2012-06-05 15:46:26 +02:00
parent 3d7360cd3c
commit 2ea0bba9e9
3 changed files with 171 additions and 0 deletions

View file

@ -203,6 +203,10 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu
nodesInCluster.sorted.head
}
def clusterSortedRoles(nodesInCluster: Seq[RoleName]): Seq[RoleName] = {
nodesInCluster.sorted
}
/**
* Sort the roles in the order used by the cluster.
*/

View file

@ -0,0 +1,154 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import org.scalatest.BeforeAndAfter
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import com.typesafe.config.ConfigFactory
import akka.actor.Address
import akka.remote.testconductor.{RoleName, Direction}
import akka.util.duration._
object UnreachableNodeRejoinsClusterMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
val allRoles = Seq(first, second, third, fourth)
def allBut(role: RoleName, roles: Seq[RoleName] = allRoles): Seq[RoleName] = {
roles.filter(_ != role)
}
commonConfig(debugConfig(on = false).
withFallback(ConfigFactory.parseString("""
akka.cluster {
failure-detector.threshold = 5
} """)
).withFallback(MultiNodeClusterSpec.clusterConfig))
}
class UnreachableNodeRejoinsClusterMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec
class UnreachableNodeRejoinsClusterMultiJvmNode2 extends UnreachableNodeRejoinsClusterSpec
class UnreachableNodeRejoinsClusterMultiJvmNode3 extends UnreachableNodeRejoinsClusterSpec
class UnreachableNodeRejoinsClusterMultiJvmNode4 extends UnreachableNodeRejoinsClusterSpec
class UnreachableNodeRejoinsClusterSpec
extends MultiNodeSpec(UnreachableNodeRejoinsClusterMultiJvmSpec)
with MultiNodeClusterSpec
with ImplicitSender with BeforeAndAfter {
import UnreachableNodeRejoinsClusterMultiJvmSpec._
override def initialParticipants = allRoles.size
val sortedRoles = clusterSortedRoles(allRoles)
val master = sortedRoles(0)
val victim = sortedRoles(1)
var endBarrierNumber = 0
def endBarrier = {
endBarrierNumber += 1
testConductor.enter("after_" + endBarrierNumber)
}
"A cluster of " + allRoles.size + " members" must {
"reach initial convergence" taggedAs LongRunningTest in {
runOn(master) {
cluster.self
awaitUpConvergence(numberOfMembers = allRoles.size)
}
runOn(allBut(master):_*) {
cluster.join(node(master).address)
awaitUpConvergence(numberOfMembers = allRoles.size)
}
endBarrier
}
"mark a node as UNREACHABLE when we pull the network" taggedAs LongRunningTest in {
runOn(first) {
// pull network for victim node from all nodes
allBut(victim).foreach { roleName =>
testConductor.blackhole(victim, roleName, Direction.Both).await
}
testConductor.enter("unplug_victim")
}
runOn(allBut(first):_*) {
testConductor.enter("unplug_victim")
}
runOn(victim) {
val otherAddresses = sortedRoles.filter(_ != victim).map(node(_).address)
within(30 seconds) {
awaitCond(cluster.latestGossip.overview.unreachable.size == (allRoles.size - 1))
awaitCond(cluster.latestGossip.members.size == 1)
awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up))
cluster.latestGossip.overview.unreachable.map(_.address) must be(otherAddresses.toSet)
cluster.convergence.isDefined must be(false)
}
}
val allButVictim = allBut(victim)
runOn(allButVictim: _*) {
val victimAddress = node(victim).address
val otherAddresses = allButVictim.map(node(_).address)
within(30 seconds) {
// victim becomes unreachable
awaitCond(cluster.latestGossip.overview.unreachable.size == 1)
awaitCond(cluster.latestGossip.members.size == (allRoles.size - 1))
awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up))
awaitSeenSameState(otherAddresses)
// still one unreachable
cluster.latestGossip.overview.unreachable.size must be(1)
cluster.latestGossip.overview.unreachable.head.address must be(victimAddress)
// and therefore no convergence
cluster.convergence.isDefined must be(false)
}
}
endBarrier
}
"mark the node as DOWN" taggedAs LongRunningTest in {
val victimAddress = node(victim).address
runOn(master) {
cluster.down(victimAddress)
}
runOn(allBut(victim):_*) {
awaitUpConvergence(allRoles.size - 1, Seq(victimAddress))
}
endBarrier
}
"allow node to REJOIN when the network is plugged back in" taggedAs LongRunningTest in {
runOn(first) {
// put the network back in
allBut(victim).foreach { roleName =>
testConductor.passThrough(victim, roleName, Direction.Both).await
}
testConductor.enter("plug_in_victim")
}
runOn(allBut(first):_*) {
testConductor.enter("plug_in_victim")
}
runOn(victim) {
cluster.join(node(master).address)
}
awaitUpConvergence(allRoles.size)
endBarrier
}
}
}

View file

@ -139,6 +139,19 @@ trait Conductor { this: TestConductorExt ⇒
controller ? Throttle(node, target, direction, 0f) mapTo
}
/**
* Switch the Netty pipeline of the remote support into pass through mode for
* sending and/or receiving.
*
* @param node is the symbolic name of the node which is to be affected
* @param target is the symbolic name of the other node to which connectivity shall be impeded
* @param direction can be either `Direction.Send`, `Direction.Receive` or `Direction.Both`
*/
def passThrough(node: RoleName, target: RoleName, direction: Direction): Future[Done] = {
import Settings.QueryTimeout
controller ? Throttle(node, target, direction, -1f) mapTo
}
/**
* Tell the remote support to shutdown the connection to the given remote
* peer. It works regardless of whether the recipient was initiator or