From dbac17621f80d62e7cd9aa0d4adbc964ad2e82a6 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 7 Jun 2012 15:45:10 +0200 Subject: [PATCH 1/2] Node that joins again should be ignored, see #2184 --- .../src/main/scala/akka/cluster/Cluster.scala | 31 ++++++++++--------- .../scala/akka/cluster/NodeUpSpec.scala | 30 +++++++++++++++++- 2 files changed, 45 insertions(+), 16 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index ce2e01cbca..7d1222a7ab 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -571,27 +571,28 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val localState = state.get val localGossip = localState.latestGossip val localMembers = localGossip.members - val localOverview = localGossip.overview - val localUnreachableMembers = localOverview.unreachable - // remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster - val newUnreachableMembers = localUnreachableMembers filterNot { _.address == node } - val newOverview = localOverview copy (unreachable = newUnreachableMembers) + if (!localMembers.exists(_.address == node)) { - val newMembers = localMembers + Member(node, MemberStatus.Joining) // add joining node as Joining - val newGossip = localGossip copy (overview = newOverview, members = newMembers) + // remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster + val newUnreachableMembers = localGossip.overview.unreachable filterNot { _.address == node } + val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers) - val versionedGossip = newGossip + vclockNode - val seenVersionedGossip = versionedGossip seen selfAddress + val newMembers = localMembers + Member(node, MemberStatus.Joining) // add joining node as Joining + val newGossip = localGossip copy (overview = newOverview, members = newMembers) - val newState = localState copy (latestGossip = seenVersionedGossip) + val versionedGossip = newGossip + vclockNode + val seenVersionedGossip = versionedGossip seen selfAddress - if (!state.compareAndSet(localState, newState)) joining(node) // recur if we failed update - else { - if (node != selfAddress) failureDetector heartbeat node + val newState = localState copy (latestGossip = seenVersionedGossip) - if (convergence(newState.latestGossip).isDefined) { - newState.memberMembershipChangeListeners foreach { _ notify newMembers } + if (!state.compareAndSet(localState, newState)) joining(node) // recur if we failed update + else { + if (node != selfAddress) failureDetector heartbeat node + + if (convergence(newState.latestGossip).isDefined) { + newState.memberMembershipChangeListeners foreach { _ notify newMembers } + } } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala index eafdf2fffd..b5fc5d626b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala @@ -9,6 +9,8 @@ import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import akka.util.duration._ +import scala.collection.immutable.SortedSet +import java.util.concurrent.atomic.AtomicReference object NodeUpMultiJvmSpec extends MultiNodeConfig { val first = role("first") @@ -33,7 +35,33 @@ abstract class NodeUpSpec awaitClusterUp(first, second) - testConductor.enter("after") + testConductor.enter("after-1") + } + + "be unaffected when joining again" taggedAs LongRunningTest in { + + val unexpected = new AtomicReference[SortedSet[Member]] + cluster.registerListener(new MembershipChangeListener { + def notify(members: SortedSet[Member]) { + if (members.size != 2 || members.exists(_.status != MemberStatus.Up)) + unexpected.set(members) + } + }) + testConductor.enter("listener-registered") + + runOn(second) { + cluster.join(node(first).address) + } + testConductor.enter("joined-again") + + // let it run for a while to make sure that nothing bad happens + for (n ← 1 to 20) { + 100.millis.dilated.sleep() + unexpected.get must be(null) + cluster.latestGossip.members.forall(_.status == MemberStatus.Up) + } + + testConductor.enter("after-2") } } } From 8c9d40eb00f927353eda94ccf3dcb0dae97ef302 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 7 Jun 2012 15:56:59 +0200 Subject: [PATCH 2/2] Add missing assert --- akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala index b5fc5d626b..f8d0a1f6e2 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala @@ -58,7 +58,7 @@ abstract class NodeUpSpec for (n ← 1 to 20) { 100.millis.dilated.sleep() unexpected.get must be(null) - cluster.latestGossip.members.forall(_.status == MemberStatus.Up) + cluster.latestGossip.members.forall(_.status == MemberStatus.Up) must be(true) } testConductor.enter("after-2")