diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index c038c5ecb5..0b81eed8eb 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -3,31 +3,23 @@ */ package akka.cluster -import language.existentials -import scala.collection.{ SortedSet, breakOut, immutable, mutable } -import scala.concurrent.duration._ -import java.util.concurrent.ThreadLocalRandom - -import scala.util.control.NonFatal import akka.actor._ +import akka.annotation.InternalApi import akka.actor.SupervisorStrategy.Stop import akka.cluster.MemberStatus._ import akka.cluster.ClusterEvent._ -import akka.cluster.ClusterSettings.DataCenter import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } -import akka.remote.QuarantinedEvent -import java.util.ArrayList -import java.util.Collections - -import akka.pattern.ask -import akka.util.Timeout import akka.Done -import akka.annotation.InternalApi -import akka.cluster.ClusterSettings.DataCenter +import akka.pattern.ask +import akka.remote.QuarantinedEvent +import akka.util.Timeout +import scala.collection.immutable +import scala.concurrent.duration._ import scala.concurrent.Future import scala.concurrent.Promise -import scala.util.Random +import scala.util.control.NonFatal +import language.existentials /** * Base trait for all cluster messages. All ClusterMessage's are serializable. @@ -235,7 +227,6 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac */ private[cluster] final class ClusterCoreSupervisor extends Actor with ActorLogging with RequiresMessageQueue[UnboundedMessageQueueSemantics] { - import InternalClusterAction._ // Important - don't use Cluster(context.system) in constructor because that would // cause deadlock. The Cluster extension is currently being created and is waiting @@ -274,8 +265,6 @@ private[cluster] final class ClusterCoreSupervisor extends Actor with ActorLoggi */ @InternalApi private[cluster] object ClusterCoreDaemon { - def vclockName(node: UniqueAddress): String = s"${node.address}-${node.longUid}" - val NumberOfGossipsBeforeShutdownWhenLeaderExits = 5 val MaxGossipsBeforeShuttingDownMyself = 5 @@ -300,7 +289,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with protected def selfUniqueAddress = cluster.selfUniqueAddress - val vclockNode = VectorClock.Node(vclockName(selfUniqueAddress)) + val vclockNode = VectorClock.Node(Gossip.vclockName(selfUniqueAddress)) val gossipTargetSelector = new GossipTargetSelector( ReduceGossipDifferentViewProbability, cluster.settings.MultiDataCenter.CrossDcGossipProbability) @@ -737,8 +726,6 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with def downing(address: Address): Unit = { val localGossip = latestGossip val localMembers = localGossip.members - val localOverview = localGossip.overview - val localSeen = localOverview.seen val localReachability = membershipState.dcReachability // check if the node to DOWN is in the `members` set @@ -847,14 +834,14 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with val prunedLocalGossip = localGossip.members.foldLeft(localGossip) { (g, m) ⇒ if (removeUnreachableWithMemberStatus(m.status) && !remoteGossip.members.contains(m)) { log.debug("Cluster Node [{}] - Pruned conflicting local gossip: {}", selfAddress, m) - g.prune(VectorClock.Node(vclockName(m.uniqueAddress))) + g.prune(VectorClock.Node(Gossip.vclockName(m.uniqueAddress))) } else g } val prunedRemoteGossip = remoteGossip.members.foldLeft(remoteGossip) { (g, m) ⇒ if (removeUnreachableWithMemberStatus(m.status) && !localGossip.members.contains(m)) { log.debug("Cluster Node [{}] - Pruned conflicting remote gossip: {}", selfAddress, m) - g.prune(VectorClock.Node(vclockName(m.uniqueAddress))) + g.prune(VectorClock.Node(Gossip.vclockName(m.uniqueAddress))) } else g } diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index 9a05003004..cc42764524 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -22,6 +22,8 @@ private[cluster] object Gossip { def apply(members: immutable.SortedSet[Member]) = if (members.isEmpty) empty else empty.copy(members = members) + def vclockName(node: UniqueAddress): String = s"${node.address}-${node.longUid}" + } /** @@ -150,11 +152,14 @@ private[cluster] final case class Gossip( */ def merge(that: Gossip): Gossip = { - // 1. merge vector clocks - val mergedVClock = this.version merge that.version - - // 2. merge sets of tombstones + // 1. merge sets of tombstones val mergedTombstones = tombstones ++ that.tombstones + val newTombstonedNodes = mergedTombstones.keySet diff that.tombstones.keySet + + // 2. merge vector clocks (but remove entries for tombstoned nodes) + val mergedVClock = newTombstonedNodes.foldLeft(this.version merge that.version) { (vclock, node) ⇒ + vclock.prune(VectorClock.Node(Gossip.vclockName(node))) + } // 2. merge members by selecting the single Member with highest MemberStatus out of the Member groups val mergedMembers = Gossip.emptyMembers union Member.pickHighestPriority(this.members, that.members, mergedTombstones) @@ -228,7 +233,7 @@ private[cluster] final case class Gossip( // and will propagate as is if there are no other changes on other nodes. // If other concurrent changes on other nodes (e.g. join) the pruning is also // taken care of when receiving gossips. - val newVersion = version.prune(VectorClock.Node(ClusterCoreDaemon.vclockName(node))) + val newVersion = version.prune(VectorClock.Node(Gossip.vclockName(node))) val newMembers = members.filterNot(_.uniqueAddress == node) val newTombstones = tombstones + (node → removalTimestamp) copy(version = newVersion, members = newMembers, overview = newOverview, tombstones = newTombstones) diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index f4c29f2115..5ee3d0aea3 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -7,7 +7,7 @@ package akka.cluster import org.scalatest.WordSpec import org.scalatest.Matchers import akka.actor.Address -import akka.cluster.ClusterSettings.DataCenter +import akka.cluster.Gossip.vclockName import akka.cluster.ClusterSettings.DefaultDataCenter import scala.collection.immutable.SortedSet @@ -354,9 +354,13 @@ class GossipSpec extends WordSpec with Matchers { "not reintroduce members from out-of data center gossip when merging" in { // dc1 does not know about any unreachability nor that the node has been downed val gdc1 = Gossip(members = SortedSet(dc1a1, dc1b1, dc2c1, dc2d1)) + .seen(dc1b1.uniqueAddress) + .seen(dc2c1.uniqueAddress) + .:+(VectorClock.Node(vclockName(dc2d1.uniqueAddress))) // just to make sure these are also pruned // dc2 has downed the dc2d1 node, seen it as unreachable and removed it val gdc2 = Gossip(members = SortedSet(dc1a1, dc1b1, dc2c1, dc2d1)) + .seen(dc1a1.uniqueAddress) .remove(dc2d1.uniqueAddress, System.currentTimeMillis()) gdc2.tombstones.keys should contain(dc2d1.uniqueAddress) @@ -372,7 +376,7 @@ class GossipSpec extends WordSpec with Matchers { merged1.members should not contain (dc2d1) merged1.overview.reachability.records.filter(r ⇒ r.subject == dc2d1.uniqueAddress || r.observer == dc2d1.uniqueAddress) should be(empty) merged1.overview.reachability.versions.keys should not contain (dc2d1.uniqueAddress) - + merged1.version.versions.keys should not contain (VectorClock.Node(vclockName(dc2d1.uniqueAddress))) } "prune old tombstones" in {