From de1ad302172ff82b60b7cfbdefa2f6c5d295b811 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 12 Jun 2012 16:07:20 +0200 Subject: [PATCH] Fix false convergence when singleton cluster, see #2222 * All members must be in seen table for convergence * Added extra debug logging due to convergence issues * Enabled test of convergence for node joining singleton cluster --- .../src/main/scala/akka/cluster/Cluster.scala | 34 +++++++++++++------ .../MembershipChangeListenerJoinSpec.scala | 7 ++-- .../test/scala/akka/cluster/ClusterSpec.scala | 3 +- 3 files changed, 29 insertions(+), 15 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 571a8eaf68..c090995e4c 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -1138,24 +1138,38 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) private def convergence(gossip: Gossip): Option[Gossip] = { val overview = gossip.overview val unreachable = overview.unreachable + val seen = overview.seen // First check that: - // 1. we don't have any members that are unreachable (unreachable.isEmpty == true), or + // 1. we don't have any members that are unreachable, or // 2. all unreachable members in the set have status DOWN // Else we can't continue to check for convergence // When that is done we check that all the entries in the 'seen' table have the same vector clock version - if (unreachable.isEmpty || !unreachable.exists { m ⇒ - m.status != MemberStatus.Down && - m.status != MemberStatus.Removed - }) { - val seen = gossip.overview.seen - val views = Set.empty[VectorClock] ++ seen.values + // and that all members exists in seen table + val hasUnreachable = unreachable.nonEmpty && unreachable.exists { m ⇒ + m.status != MemberStatus.Down && m.status != MemberStatus.Removed + } + val allMembersInSeen = gossip.members.forall(m ⇒ seen.contains(m.address)) - if (views.size == 1) { + if (hasUnreachable) { + log.debug("Cluster Node [{}] - No cluster convergence, due to unreachable [{}].", selfAddress, unreachable) + None + } else if (!allMembersInSeen) { + log.debug("Cluster Node [{}] - No cluster convergence, due to members not in seen table [{}].", selfAddress, + gossip.members.map(_.address) -- seen.keySet) + None + } else { + + val views = (Set.empty[VectorClock] ++ seen.values).size + + if (views == 1) { log.debug("Cluster Node [{}] - Cluster convergence reached: [{}]", selfAddress, gossip.members.mkString(", ")) Some(gossip) - } else None - } else None + } else { + log.debug("Cluster Node [{}] - No cluster convergence, due to [{}] different views.", selfAddress, views) + None + } + } } private def isAvailable(state: State): Boolean = !isUnavailable(state) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala index 2809ae820b..1b296c58f1 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala @@ -18,7 +18,7 @@ object MembershipChangeListenerJoinMultiJvmSpec extends MultiNodeConfig { commonConfig( debugConfig(on = false) .withFallback(ConfigFactory.parseString("akka.cluster.leader-actions-interval = 5 s") // increase the leader action task interval to allow time checking for JOIN before leader moves it to UP - .withFallback(MultiNodeClusterSpec.clusterConfig))) + .withFallback(MultiNodeClusterSpec.clusterConfig))) } class MembershipChangeListenerJoinMultiJvmNode1 extends MembershipChangeListenerJoinSpec with FailureDetectorPuppetStrategy @@ -40,14 +40,13 @@ abstract class MembershipChangeListenerJoinSpec val joinLatch = TestLatch() cluster.registerListener(new MembershipChangeListener { def notify(members: SortedSet[Member]) { - if (members.size == 2 && members.exists(_.status == MemberStatus.Joining)) // second node is not part of node ring anymore + if (members.size == 2 && members.exists(_.status == MemberStatus.Joining)) joinLatch.countDown() } }) testConductor.enter("registered-listener") joinLatch.await - cluster.convergence.isDefined must be(true) } runOn(second) { @@ -55,6 +54,8 @@ abstract class MembershipChangeListenerJoinSpec cluster.join(firstAddress) } + awaitUpConvergence(2) + testConductor.enter("after") } } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 112da9d0c0..03f6460ea1 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -110,8 +110,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { cluster.joining(addresses(1)) cluster.latestGossip.members.map(_.address) must be(Set(selfAddress, addresses(1))) memberStatus(addresses(1)) must be(Some(MemberStatus.Joining)) - // FIXME why is it still convergence immediately after joining? - //cluster.convergence.isDefined must be(false) + cluster.convergence.isDefined must be(false) } "accept a few more joining nodes" in {