From 33f14f9bf691ccff6ec5d38378e1e55550059e67 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 1 Jun 2012 15:15:53 +0200 Subject: [PATCH 1/2] Test gossip convergence, see #2164 --- .../src/main/scala/akka/cluster/Cluster.scala | 1 - .../scala/akka/cluster/ConvergenceSpec.scala | 131 ++++++++++++++++++ .../akka/cluster/MultiNodeClusterSpec.scala | 11 ++ 3 files changed, 142 insertions(+), 1 deletion(-) create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 98d0a3f11e..3729a0b3b4 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -865,7 +865,6 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val localGossip = localState.latestGossip val localOverview = localGossip.overview - val localSeen = localOverview.seen val localMembers = localGossip.members val localUnreachableMembers = localGossip.overview.unreachable diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala new file mode 100644 index 0000000000..eeb9b864ed --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala @@ -0,0 +1,131 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ +import akka.actor.Address + +object ConvergenceMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + + commonConfig(debugConfig(on = false). + withFallback(ConfigFactory.parseString(""" + akka.cluster { + failure-detector.threshold = 4 + } + """)). + withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class ConvergenceMultiJvmNode1 extends ConvergenceSpec +class ConvergenceMultiJvmNode2 extends ConvergenceSpec +class ConvergenceMultiJvmNode3 extends ConvergenceSpec +class ConvergenceMultiJvmNode4 extends ConvergenceSpec + +abstract class ConvergenceSpec + extends MultiNodeSpec(ConvergenceMultiJvmSpec) + with MultiNodeClusterSpec with BeforeAndAfter { + import ConvergenceMultiJvmSpec._ + + override def initialParticipants = 4 + + after { + testConductor.enter("after") + } + + "A cluster of 3 members" must { + + "reach initial convergence" taggedAs LongRunningTest in { + runOn(first) { + cluster.self + awaitUpConvergence(numberOfMembers = 3) + } + + runOn(second, third) { + cluster.join(node(first).address) + awaitUpConvergence(numberOfMembers = 3) + } + + runOn(fourth) { + // doesn't join immediately + } + } + + "not reach convergence while any nodes are unreachable" taggedAs LongRunningTest in { + val thirdAddress = node(third).address + testConductor.enter("before-shutdown") + + runOn(first) { + // kill 'third' node + testConductor.shutdown(third, 0) + testConductor.removeNode(third) + } + + runOn(first, second) { + val firstAddress = node(first).address + val secondAddress = node(second).address + + within(30 seconds) { + // third becomes unreachable + awaitCond(cluster.latestGossip.overview.unreachable.size == 1) + awaitCond(cluster.latestGossip.members.size == 2) + awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up)) + awaitSeenSameState(Seq(firstAddress, secondAddress)) + // still one unreachable + cluster.latestGossip.overview.unreachable.size must be(1) + cluster.latestGossip.overview.unreachable.head.address must be(thirdAddress) + // and therefore no convergence + cluster.convergence.isDefined must be(false) + + } + } + + } + + "not move a new joining node to Up while there is no convergence" taggedAs LongRunningTest in { + runOn(fourth) { + // try to join + cluster.join(node(first).address) + } + + val firstAddress = node(first).address + val secondAddress = node(second).address + val fourthAddress = node(fourth).address + + def memberStatus(address: Address): Option[MemberStatus] = + cluster.latestGossip.members.collectFirst { case m if m.address == address ⇒ m.status } + + def assertNotMovedUp: Unit = { + within(20 seconds) { + awaitCond(cluster.latestGossip.members.size == 3) + awaitSeenSameState(Seq(firstAddress, secondAddress, fourthAddress)) + memberStatus(firstAddress) must be(Some(MemberStatus.Up)) + memberStatus(secondAddress) must be(Some(MemberStatus.Up)) + // leader is not allowed to move the new node to Up + memberStatus(fourthAddress) must be(Some(MemberStatus.Joining)) + // still no convergence + cluster.convergence.isDefined must be(false) + } + } + + runOn(first, second, fourth) { + for (n ← 1 to 5) { + log.debug("assertNotMovedUp#" + n) + assertNotMovedUp + // wait and then check again + 1.second.dilated.sleep + } + } + + } + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index cb679c12b7..4c0232cf9e 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -71,6 +71,17 @@ trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒ } } + /** + * Wait until the specified nodes have seen the same gossip overview. + */ + def awaitSeenSameState(addresses: Seq[Address]): Unit = { + awaitCond { + val seen = cluster.latestGossip.overview.seen + val seenVectorClocks = addresses.flatMap(seen.get(_)) + seenVectorClocks.size == addresses.size && seenVectorClocks.toSet.size == 1 + } + } + def roleOfLeader(nodesInCluster: Seq[RoleName]): RoleName = { nodesInCluster.length must not be (0) nodesInCluster.sorted.head From b1c507f3b95bd69eb75d8fa2ee13adb494c16d23 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 4 Jun 2012 11:37:23 +0200 Subject: [PATCH 2/2] Shutdown does removeNode, see #2137 --- .../scala/akka/cluster/ConvergenceSpec.scala | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala index eeb9b864ed..a76083b0fc 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala @@ -4,7 +4,6 @@ package akka.cluster import com.typesafe.config.ConfigFactory -import org.scalatest.BeforeAndAfter import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ @@ -33,15 +32,11 @@ class ConvergenceMultiJvmNode4 extends ConvergenceSpec abstract class ConvergenceSpec extends MultiNodeSpec(ConvergenceMultiJvmSpec) - with MultiNodeClusterSpec with BeforeAndAfter { + with MultiNodeClusterSpec { import ConvergenceMultiJvmSpec._ override def initialParticipants = 4 - after { - testConductor.enter("after") - } - "A cluster of 3 members" must { "reach initial convergence" taggedAs LongRunningTest in { @@ -58,6 +53,8 @@ abstract class ConvergenceSpec runOn(fourth) { // doesn't join immediately } + + testConductor.enter("after-1") } "not reach convergence while any nodes are unreachable" taggedAs LongRunningTest in { @@ -67,14 +64,13 @@ abstract class ConvergenceSpec runOn(first) { // kill 'third' node testConductor.shutdown(third, 0) - testConductor.removeNode(third) } runOn(first, second) { val firstAddress = node(first).address val secondAddress = node(second).address - within(30 seconds) { + within(25 seconds) { // third becomes unreachable awaitCond(cluster.latestGossip.overview.unreachable.size == 1) awaitCond(cluster.latestGossip.members.size == 2) @@ -89,6 +85,7 @@ abstract class ConvergenceSpec } } + testConductor.enter("after-2") } "not move a new joining node to Up while there is no convergence" taggedAs LongRunningTest in { @@ -126,6 +123,7 @@ abstract class ConvergenceSpec } } + testConductor.enter("after-3") } } }