Merge pull request #505 from akka/wip-2164-convergence-patriknw

Test gossip convergence, see #2164
This commit is contained in:
patriknw 2012-06-04 06:16:34 -07:00
commit ce332a9f96
3 changed files with 140 additions and 1 deletions

View file

@ -879,7 +879,6 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
val localGossip = localState.latestGossip val localGossip = localState.latestGossip
val localOverview = localGossip.overview val localOverview = localGossip.overview
val localSeen = localOverview.seen
val localMembers = localGossip.members val localMembers = localGossip.members
val localUnreachableMembers = localGossip.overview.unreachable val localUnreachableMembers = localGossip.overview.unreachable

View file

@ -0,0 +1,129 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import akka.util.duration._
import akka.actor.Address
object ConvergenceMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
commonConfig(debugConfig(on = false).
withFallback(ConfigFactory.parseString("""
akka.cluster {
failure-detector.threshold = 4
}
""")).
withFallback(MultiNodeClusterSpec.clusterConfig))
}
class ConvergenceMultiJvmNode1 extends ConvergenceSpec
class ConvergenceMultiJvmNode2 extends ConvergenceSpec
class ConvergenceMultiJvmNode3 extends ConvergenceSpec
class ConvergenceMultiJvmNode4 extends ConvergenceSpec
abstract class ConvergenceSpec
extends MultiNodeSpec(ConvergenceMultiJvmSpec)
with MultiNodeClusterSpec {
import ConvergenceMultiJvmSpec._
override def initialParticipants = 4
"A cluster of 3 members" must {
"reach initial convergence" taggedAs LongRunningTest in {
runOn(first) {
cluster.self
awaitUpConvergence(numberOfMembers = 3)
}
runOn(second, third) {
cluster.join(node(first).address)
awaitUpConvergence(numberOfMembers = 3)
}
runOn(fourth) {
// doesn't join immediately
}
testConductor.enter("after-1")
}
"not reach convergence while any nodes are unreachable" taggedAs LongRunningTest in {
val thirdAddress = node(third).address
testConductor.enter("before-shutdown")
runOn(first) {
// kill 'third' node
testConductor.shutdown(third, 0)
}
runOn(first, second) {
val firstAddress = node(first).address
val secondAddress = node(second).address
within(25 seconds) {
// third becomes unreachable
awaitCond(cluster.latestGossip.overview.unreachable.size == 1)
awaitCond(cluster.latestGossip.members.size == 2)
awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up))
awaitSeenSameState(Seq(firstAddress, secondAddress))
// still one unreachable
cluster.latestGossip.overview.unreachable.size must be(1)
cluster.latestGossip.overview.unreachable.head.address must be(thirdAddress)
// and therefore no convergence
cluster.convergence.isDefined must be(false)
}
}
testConductor.enter("after-2")
}
"not move a new joining node to Up while there is no convergence" taggedAs LongRunningTest in {
runOn(fourth) {
// try to join
cluster.join(node(first).address)
}
val firstAddress = node(first).address
val secondAddress = node(second).address
val fourthAddress = node(fourth).address
def memberStatus(address: Address): Option[MemberStatus] =
cluster.latestGossip.members.collectFirst { case m if m.address == address m.status }
def assertNotMovedUp: Unit = {
within(20 seconds) {
awaitCond(cluster.latestGossip.members.size == 3)
awaitSeenSameState(Seq(firstAddress, secondAddress, fourthAddress))
memberStatus(firstAddress) must be(Some(MemberStatus.Up))
memberStatus(secondAddress) must be(Some(MemberStatus.Up))
// leader is not allowed to move the new node to Up
memberStatus(fourthAddress) must be(Some(MemberStatus.Joining))
// still no convergence
cluster.convergence.isDefined must be(false)
}
}
runOn(first, second, fourth) {
for (n 1 to 5) {
log.debug("assertNotMovedUp#" + n)
assertNotMovedUp
// wait and then check again
1.second.dilated.sleep
}
}
testConductor.enter("after-3")
}
}
}

View file

@ -76,6 +76,17 @@ trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒
} }
} }
/**
* Wait until the specified nodes have seen the same gossip overview.
*/
def awaitSeenSameState(addresses: Seq[Address]): Unit = {
awaitCond {
val seen = cluster.latestGossip.overview.seen
val seenVectorClocks = addresses.flatMap(seen.get(_))
seenVectorClocks.size == addresses.size && seenVectorClocks.toSet.size == 1
}
}
def roleOfLeader(nodesInCluster: Seq[RoleName]): RoleName = { def roleOfLeader(nodesInCluster: Seq[RoleName]): RoleName = {
nodesInCluster.length must not be (0) nodesInCluster.length must not be (0)
nodesInCluster.sorted.head nodesInCluster.sorted.head