From 6ad96c257909576c5d95bd6f7ace1192a7912d89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Thu, 28 Jun 2012 14:52:12 +0200 Subject: [PATCH] Review changes --- .../src/main/scala/akka/cluster/Cluster.scala | 8 ++-- .../UnreachableNodeRejoinsClusterSpec.scala | 48 ++++++++----------- 2 files changed, 24 insertions(+), 32 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 709f82a5e8..a15a361aff 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -737,9 +737,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) @tailrec final def join(address: Address): Unit = { val localState = state.get - // wipe our state + // wipe our state since a node that joins a cluster must be empty val newState = createCleanState copy (joinInProgress = Map.empty + (address -> (Deadline.now + JoinTimeout))) - // wipe the failure detector + // wipe the failure detector since we are starting fresh and shouldn't care about the past failureDetector.reset() if (!state.compareAndSet(localState, newState)) join(address) // recur else { @@ -819,11 +819,11 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) if (!alreadyMember && !isUnreachable) { // remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster - val newUnreachableMembers = localUnreachable filterNot { _.address == node } + val (rejoiningMember, newUnreachableMembers) = localUnreachable partition { _.address == node } val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers) // remove the node from the failure detector if it is a DOWN node that is rejoining cluster - if (localUnreachable.size > newUnreachableMembers.size) failureDetector.remove(node) + if (rejoiningMember.nonEmpty) failureDetector.remove(node) // add joining node as Joining // add self in case someone else joins before self has joined (Set discards duplicates) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala index e943ae6c6c..34f8605af1 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala @@ -18,12 +18,6 @@ object UnreachableNodeRejoinsClusterMultiJvmSpec extends MultiNodeConfig { val third = role("third") val fourth = role("fourth") - val allRoles = Seq(first, second, third, fourth) - - def allBut(role: RoleName, roles: Seq[RoleName] = allRoles): Seq[RoleName] = { - roles.filterNot(_ == role) - } - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } @@ -40,13 +34,15 @@ class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode4 exten abstract class UnreachableNodeRejoinsClusterSpec extends MultiNodeSpec(UnreachableNodeRejoinsClusterMultiJvmSpec) - with MultiNodeClusterSpec - with ImplicitSender with BeforeAndAfter { + with MultiNodeClusterSpec { import UnreachableNodeRejoinsClusterMultiJvmSpec._ - override def initialParticipants = allRoles.size + def allBut(role: RoleName, roles: Seq[RoleName] = roles): Seq[RoleName] = { + roles.filterNot(_ == role) + } - lazy val sortedRoles = allRoles.sorted + + lazy val sortedRoles = roles.sorted lazy val master = sortedRoles(0) lazy val victim = sortedRoles(1) @@ -56,10 +52,10 @@ abstract class UnreachableNodeRejoinsClusterSpec enterBarrier("after_" + endBarrierNumber) } - "A cluster of " + allRoles.size + " members" must { + "A cluster of " + roles.size + " members" must { "reach initial convergence" taggedAs LongRunningTest in { - awaitClusterUp(allRoles:_*) + awaitClusterUp(roles:_*) endBarrier } @@ -73,35 +69,32 @@ abstract class UnreachableNodeRejoinsClusterSpec enterBarrier("unplug_victim") + val allButVictim = allBut(victim, sortedRoles) runOn(victim) { - val otherAddresses = sortedRoles.collect { case x if x != victim => node(x).address } - otherAddresses.foreach(markNodeAsUnavailable(_)) + allButVictim.foreach(markNodeAsUnavailable(_)) within(30 seconds) { // victim becomes all alone awaitCond({ val gossip = cluster.latestGossip - gossip.overview.unreachable.size == (allRoles.size - 1) && + gossip.overview.unreachable.size == (roles.size - 1) && gossip.members.size == 1 && gossip.members.forall(_.status == MemberStatus.Up) }) - cluster.latestGossip.overview.unreachable.map(_.address) must be(otherAddresses.toSet) + cluster.latestGossip.overview.unreachable.map(_.address) must be((allButVictim map address).toSet) cluster.convergence.isDefined must be(false) } } - val allButVictim = allBut(victim) runOn(allButVictim:_*) { - val victimAddress = node(victim).address - val otherAddresses = allButVictim.map(node(_).address) - markNodeAsUnavailable(victimAddress) + markNodeAsUnavailable(victim) within(30 seconds) { // victim becomes unreachable awaitCond({ val gossip = cluster.latestGossip gossip.overview.unreachable.size == 1 && - gossip.members.size == (allRoles.size - 1) && + gossip.members.size == (roles.size - 1) && gossip.members.forall(_.status == MemberStatus.Up) }) - awaitSeenSameState(otherAddresses:_*) + awaitSeenSameState(allButVictim map address:_*) // still one unreachable cluster.latestGossip.overview.unreachable.size must be(1) - cluster.latestGossip.overview.unreachable.head.address must be(victimAddress) + cluster.latestGossip.overview.unreachable.head.address must be(node(victim).address) // and therefore no convergence cluster.convergence.isDefined must be(false) } @@ -111,13 +104,12 @@ abstract class UnreachableNodeRejoinsClusterSpec } "mark the node as DOWN" taggedAs LongRunningTest in { - val victimAddress = node(victim).address runOn(master) { - cluster.down(victimAddress) + cluster down victim } runOn(allBut(victim):_*) { - awaitUpConvergence(allRoles.size - 1, Seq(victimAddress)) + awaitUpConvergence(roles.size - 1, Seq(victim)) } endBarrier @@ -134,10 +126,10 @@ abstract class UnreachableNodeRejoinsClusterSpec enterBarrier("plug_in_victim") runOn(victim) { - cluster.join(node(master).address) + cluster join master } - awaitUpConvergence(allRoles.size) + awaitUpConvergence(roles.size) endBarrier }