From 203e2cb13c90c855e52866993ede2ca4dc99ce33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Mon, 28 May 2012 11:06:02 +0200 Subject: [PATCH] Added LeaderDowning spec for testing that the leader automatically is downing an unreachable node. Fixes 2112. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - tests downing a node at the end of the node ring - tests downing a node in the middle of the node ring - added some more utility stuff to the MultiNodeClusterSpec Signed-off-by: Jonas Bonér --- .../akka/cluster/LeaderDowningSpec.scala | 124 ++++++++++++++++++ .../akka/cluster/MultiNodeClusterSpec.scala | 32 +++-- 2 files changed, 145 insertions(+), 11 deletions(-) create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningSpec.scala diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningSpec.scala new file mode 100644 index 0000000000..357f360853 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningSpec.scala @@ -0,0 +1,124 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.actor.Address +import akka.util.duration._ + +object LeaderDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + + commonConfig(debugConfig(on = false). + withFallback(ConfigFactory.parseString("akka.cluster.auto-down = on")). + withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class LeaderDowningNodeThatIsUnreachableMultiJvmNode1 extends LeaderDowningNodeThatIsUnreachableSpec +class LeaderDowningNodeThatIsUnreachableMultiJvmNode2 extends LeaderDowningNodeThatIsUnreachableSpec +class LeaderDowningNodeThatIsUnreachableMultiJvmNode3 extends LeaderDowningNodeThatIsUnreachableSpec +class LeaderDowningNodeThatIsUnreachableMultiJvmNode4 extends LeaderDowningNodeThatIsUnreachableSpec + +class LeaderDowningNodeThatIsUnreachableSpec + extends MultiNodeSpec(LeaderDowningNodeThatIsUnreachableMultiJvmSpec) + with MultiNodeClusterSpec + with ImplicitSender with BeforeAndAfter { + import LeaderDowningNodeThatIsUnreachableMultiJvmSpec._ + + override def initialParticipants = 4 + + "The Leader in a 4 node cluster" must { + + "be able to DOWN a 'last' node that is UNREACHABLE" taggedAs LongRunningTest in { + runOn(first) { + cluster.self + awaitUpConvergence(numberOfMembers = 4) + + val fourthAddress = node(fourth).address + testConductor.enter("all-up") + + // kill 'fourth' node + testConductor.shutdown(fourth, 0) + testConductor.removeNode(fourth) + testConductor.enter("down-fourth-node") + + // --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE --- + + awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(fourthAddress), 30.seconds.dilated) + testConductor.enter("await-completion") + } + + runOn(fourth) { + cluster.join(node(first).address) + + awaitUpConvergence(numberOfMembers = 4) + + cluster.isLeader must be(false) + testConductor.enter("all-up") + } + + runOn(second, third) { + cluster.join(node(first).address) + awaitUpConvergence(numberOfMembers = 4) + + val fourthAddress = node(fourth).address + testConductor.enter("all-up") + + testConductor.enter("down-fourth-node") + + awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(fourthAddress), 30.seconds.dilated) + testConductor.enter("await-completion") + } + } + + "be able to DOWN a 'middle' node that is UNREACHABLE" taggedAs LongRunningTest in { + runOn(first) { + cluster.self + awaitUpConvergence(numberOfMembers = 3) + + val secondAddress = node(second).address + testConductor.enter("all-up") + + // kill 'second' node + testConductor.shutdown(second, 0) + testConductor.removeNode(second) + testConductor.enter("down-second-node") + + // --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE --- + + awaitUpConvergence(numberOfMembers = 2, canNotBePartOfMemberRing = Seq(secondAddress), 30.seconds.dilated) + testConductor.enter("await-completion") + } + + runOn(second) { + cluster.join(node(first).address) + + awaitUpConvergence(numberOfMembers = 3) + + cluster.isLeader must be(false) + testConductor.enter("all-up") + } + + runOn(second, third) { + cluster.join(node(first).address) + awaitUpConvergence(numberOfMembers = 3) + + val secondAddress = node(second).address + testConductor.enter("all-up") + + testConductor.enter("down-second-node") + + awaitUpConvergence(numberOfMembers = 2, canNotBePartOfMemberRing = Seq(secondAddress), 30 seconds) + testConductor.enter("await-completion") + } + } + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 873d819dbb..6f371ee1bc 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -8,15 +8,19 @@ import com.typesafe.config.ConfigFactory import akka.actor.Address import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ import akka.util.duration._ +import akka.util.Duration object MultiNodeClusterSpec { def clusterConfig: Config = ConfigFactory.parseString(""" akka.cluster { - gossip-frequency = 200 ms - leader-actions-frequency = 200 ms - periodic-tasks-initial-delay = 300 ms + gossip-frequency = 200 ms + leader-actions-frequency = 200 ms + unreachable-nodes-reaper-frequency = 200 ms + periodic-tasks-initial-delay = 300 ms } + akka.test { single-expect-default = 5 s } @@ -51,13 +55,19 @@ trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒ } /** - * Wait until the expected number of members has status Up - * and convergence has been reached. + * Wait until the expected number of members has status Up and convergence has been reached. + * Also asserts that nodes in the 'canNotBePartOfMemberRing' are *not* part of the cluster ring. */ - def awaitUpConvergence(numberOfMembers: Int): Unit = { - awaitCond(cluster.latestGossip.members.size == numberOfMembers) - awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up)) - awaitCond(cluster.convergence.isDefined, 10 seconds) + def awaitUpConvergence( + numberOfMembers: Int, + canNotBePartOfMemberRing: Seq[Address] = Seq.empty[Address], + timeout: Duration = 10.seconds.dilated): Unit = { + awaitCond(cluster.latestGossip.members.size == numberOfMembers, timeout) + awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up), timeout) + awaitCond(cluster.convergence.isDefined, timeout) + if (!canNotBePartOfMemberRing.isEmpty) // don't run this on an empty set + awaitCond( + canNotBePartOfMemberRing forall (address => !(cluster.latestGossip.members exists (_.address.port == address.port))), + timeout) } - -} \ No newline at end of file +}