From b349ad8d872e0c91ffaa5c4cf261e0d1fa68fa64 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 12 Feb 2013 21:45:41 +0100 Subject: [PATCH] Nodes not part of cluster have marked the Gossip as seen, see #3031 * Problem may occur when joining member with same hostname:port again, after downing. * Reproduced with StressSpec exerciseJoinRemove with fixed port that joins and shutdown several times. * Real solution for this will be covered by ticket #2788 by adding uid to member identifier, but as first step we need to support this scenario with current design. * Use unique node identifier for vector clock to avoid mixup of old and new member instance. * Support transition from Down to Joining in Gossip merge * Don't gossip to unknown or unreachable members. --- .../main/scala/akka/cluster/ClusterDaemon.scala | 15 +++++++++------ .../main/scala/akka/cluster/ClusterReadView.scala | 3 ++- .../src/main/scala/akka/cluster/Gossip.scala | 7 ++++++- .../src/test/scala/akka/cluster/GossipSpec.scala | 11 +++++++++++ 4 files changed, 28 insertions(+), 8 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index c6154716b1..e626da6852 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -9,6 +9,7 @@ import scala.collection.immutable import scala.concurrent.duration._ import scala.concurrent.forkjoin.ThreadLocalRandom import scala.util.control.NonFatal +import java.util.UUID import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, PoisonPill, ReceiveTimeout, RootActorPath, Scheduler } import akka.actor.OneForOneStrategy import akka.actor.Status.Failure @@ -219,7 +220,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto import cluster.{ selfAddress, scheduler, failureDetector } import cluster.settings._ - val vclockNode = VectorClock.Node(selfAddress.toString) + // FIXME the UUID should not be needed when Address contains uid, ticket #2788 + val vclockNode = VectorClock.Node(selfAddress.toString + "-" + UUID.randomUUID()) // note that self is not initially member, // and the Gossip is not versioned for this 'Node' yet @@ -507,10 +509,10 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto val localGossip = latestGossip if (remoteGossip.overview.unreachable.exists(_.address == selfAddress)) { - // FIXME how should we handle this situation? - log.debug("Received gossip with self as unreachable, from [{}]", from) - - } else if (!localGossip.overview.isNonDownUnreachable(from)) { + log.debug("Ignoring received gossip with self [{}] as unreachable, from [{}]", selfAddress, from) + } else if (localGossip.overview.isNonDownUnreachable(from)) { + log.debug("Ignoring received gossip from unreachable [{}] ", from) + } else { // leader handles merge conflicts, or when they have different views of how is leader val handleMerge = localGossip.leader == Some(selfAddress) || localGossip.leader != remoteGossip.leader @@ -830,7 +832,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto gossipTo(address, GossipEnvelope(selfAddress, latestGossip, conversation = false)) def gossipTo(address: Address, gossipMsg: GossipEnvelope): Unit = - if (address != selfAddress) clusterCore(address) ! gossipMsg + if (address != selfAddress && gossipMsg.gossip.members.exists(_.address == address)) + clusterCore(address) ! gossipMsg def publish(newGossip: Gossip): Unit = { publisher ! PublishChanges(newGossip) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala index af69f977e0..00b0f18435 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala @@ -58,7 +58,8 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { state = state.copy(members = state.members - member, unreachable = state.unreachable - member + member) case event: MemberEvent ⇒ // replace current member with new member (might have different status, only address is used in equals) - state = state.copy(members = state.members - event.member + event.member) + state = state.copy(members = state.members - event.member + event.member, + unreachable = state.unreachable - event.member) case LeaderChanged(leader) ⇒ state = state.copy(leader = leader) case s: CurrentClusterState ⇒ state = s case CurrentInternalStats(stats) ⇒ _latestStats = stats diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index 66a0b7d623..0309a8d67a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -123,7 +123,12 @@ private[cluster] case class Gossip( val mergedVClock = this.version merge that.version // 2. merge unreachable by selecting the single Member with highest MemberStatus out of the Member groups - val mergedUnreachable = Member.pickHighestPriority(this.overview.unreachable, that.overview.unreachable) + // FIXME allowing Down -> Joining should be adjusted as part of ticket #2788 + val mergedUnreachable = Member.pickHighestPriority( + this.overview.unreachable.filterNot(m1 ⇒ + m1.status == Down && that.members.exists(m2 ⇒ m2.status == Joining && m2.address == m1.address)), + that.overview.unreachable.filterNot(m1 ⇒ + m1.status == Down && this.members.exists(m2 ⇒ m2.status == Joining && m2.address == m1.address))) // 3. merge members by selecting the single Member with highest MemberStatus out of the Member groups, // and exclude unreachable diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index ca20858743..cca7c4213e 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -25,6 +25,7 @@ class GossipSpec extends WordSpec with MustMatchers { val d2 = Member(Address("akka.tcp", "sys", "d", 2552), Removed) val e1 = Member(Address("akka.tcp", "sys", "e", 2552), Joining) val e2 = Member(Address("akka.tcp", "sys", "e", 2552), Up) + val e3 = Member(Address("akka.tcp", "sys", "e", 2552), Down) "A Gossip" must { @@ -78,6 +79,16 @@ class GossipSpec extends WordSpec with MustMatchers { } + "merge by allowing Down -> Joining" in { + val g1 = Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = Set(e3))) + val g2 = Gossip(members = SortedSet(a1, b1, e1), overview = GossipOverview(unreachable = Set.empty)) + + val merged2 = g2 merge g1 + merged2.members must be(SortedSet(a1, b1, e1)) + merged2.members.toSeq.map(_.status) must be(Seq(Up, Up, Joining)) + merged2.overview.unreachable must be(Set.empty) + } + "start with fresh seen table after merge" in { val g1 = Gossip(members = SortedSet(a1, e1)).seen(a1.address).seen(a1.address) val g2 = Gossip(members = SortedSet(a2, e2)).seen(e2.address).seen(e2.address)