Merging did not prune vector clocks for tombstoned nodes #23318
This commit is contained in:
parent
9f4da87840
commit
a15e459922
3 changed files with 27 additions and 31 deletions
|
|
@ -3,31 +3,23 @@
|
|||
*/
|
||||
package akka.cluster
|
||||
|
||||
import language.existentials
|
||||
import scala.collection.{ SortedSet, breakOut, immutable, mutable }
|
||||
import scala.concurrent.duration._
|
||||
import java.util.concurrent.ThreadLocalRandom
|
||||
|
||||
import scala.util.control.NonFatal
|
||||
import akka.actor._
|
||||
import akka.annotation.InternalApi
|
||||
import akka.actor.SupervisorStrategy.Stop
|
||||
import akka.cluster.MemberStatus._
|
||||
import akka.cluster.ClusterEvent._
|
||||
import akka.cluster.ClusterSettings.DataCenter
|
||||
import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
|
||||
import akka.remote.QuarantinedEvent
|
||||
import java.util.ArrayList
|
||||
import java.util.Collections
|
||||
|
||||
import akka.pattern.ask
|
||||
import akka.util.Timeout
|
||||
import akka.Done
|
||||
import akka.annotation.InternalApi
|
||||
import akka.cluster.ClusterSettings.DataCenter
|
||||
import akka.pattern.ask
|
||||
import akka.remote.QuarantinedEvent
|
||||
import akka.util.Timeout
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.Promise
|
||||
import scala.util.Random
|
||||
import scala.util.control.NonFatal
|
||||
import language.existentials
|
||||
|
||||
/**
|
||||
* Base trait for all cluster messages. All ClusterMessage's are serializable.
|
||||
|
|
@ -235,7 +227,6 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
|
|||
*/
|
||||
private[cluster] final class ClusterCoreSupervisor extends Actor with ActorLogging
|
||||
with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
|
||||
import InternalClusterAction._
|
||||
|
||||
// Important - don't use Cluster(context.system) in constructor because that would
|
||||
// cause deadlock. The Cluster extension is currently being created and is waiting
|
||||
|
|
@ -274,8 +265,6 @@ private[cluster] final class ClusterCoreSupervisor extends Actor with ActorLoggi
|
|||
*/
|
||||
@InternalApi
|
||||
private[cluster] object ClusterCoreDaemon {
|
||||
def vclockName(node: UniqueAddress): String = s"${node.address}-${node.longUid}"
|
||||
|
||||
val NumberOfGossipsBeforeShutdownWhenLeaderExits = 5
|
||||
val MaxGossipsBeforeShuttingDownMyself = 5
|
||||
|
||||
|
|
@ -300,7 +289,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
|
||||
protected def selfUniqueAddress = cluster.selfUniqueAddress
|
||||
|
||||
val vclockNode = VectorClock.Node(vclockName(selfUniqueAddress))
|
||||
val vclockNode = VectorClock.Node(Gossip.vclockName(selfUniqueAddress))
|
||||
val gossipTargetSelector = new GossipTargetSelector(
|
||||
ReduceGossipDifferentViewProbability,
|
||||
cluster.settings.MultiDataCenter.CrossDcGossipProbability)
|
||||
|
|
@ -737,8 +726,6 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
def downing(address: Address): Unit = {
|
||||
val localGossip = latestGossip
|
||||
val localMembers = localGossip.members
|
||||
val localOverview = localGossip.overview
|
||||
val localSeen = localOverview.seen
|
||||
val localReachability = membershipState.dcReachability
|
||||
|
||||
// check if the node to DOWN is in the `members` set
|
||||
|
|
@ -847,14 +834,14 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
val prunedLocalGossip = localGossip.members.foldLeft(localGossip) { (g, m) ⇒
|
||||
if (removeUnreachableWithMemberStatus(m.status) && !remoteGossip.members.contains(m)) {
|
||||
log.debug("Cluster Node [{}] - Pruned conflicting local gossip: {}", selfAddress, m)
|
||||
g.prune(VectorClock.Node(vclockName(m.uniqueAddress)))
|
||||
g.prune(VectorClock.Node(Gossip.vclockName(m.uniqueAddress)))
|
||||
} else
|
||||
g
|
||||
}
|
||||
val prunedRemoteGossip = remoteGossip.members.foldLeft(remoteGossip) { (g, m) ⇒
|
||||
if (removeUnreachableWithMemberStatus(m.status) && !localGossip.members.contains(m)) {
|
||||
log.debug("Cluster Node [{}] - Pruned conflicting remote gossip: {}", selfAddress, m)
|
||||
g.prune(VectorClock.Node(vclockName(m.uniqueAddress)))
|
||||
g.prune(VectorClock.Node(Gossip.vclockName(m.uniqueAddress)))
|
||||
} else
|
||||
g
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue