Speed up cluster gossip processing. See #3441

Check VectorClock for common case first and cache hashCodes. See #3441
Make ClusterDaemon a bit more testable. See #3441
Changing VectorClock and GossipOverview to TreeMaps. See #3441
Make VectorClock private[cluster] and remove unused code. See #3441
This commit is contained in:
Björn Antonsson 2013-06-13 15:43:37 -04:00
parent 981bce5dd0
commit 1adfcb8454
7 changed files with 128 additions and 237 deletions

View file

@ -9,7 +9,6 @@ import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.control.NonFatal
import java.util.UUID
import akka.actor.{ Actor, ActorLogging, ActorRef, Address, Cancellable, Props, PoisonPill, ReceiveTimeout, RootActorPath, Scheduler }
import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy.Stop
@ -215,15 +214,17 @@ private[cluster] final class ClusterCoreSupervisor extends Actor with ActorLoggi
/**
* INTERNAL API.
*/
private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging
private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging
with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
import InternalClusterAction._
val cluster = Cluster(context.system)
import cluster.{ selfAddress, selfUniqueAddress, scheduler, failureDetector }
import cluster.{ selfAddress, scheduler, failureDetector }
import cluster.settings._
import cluster.InfoLogger._
protected def selfUniqueAddress = cluster.selfUniqueAddress
val NumberOfGossipsBeforeShutdownWhenLeaderExits = 3
def vclockName(node: UniqueAddress): String = node.address + "-" + node.uid
@ -549,49 +550,63 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
else if (latestGossip.members.forall(_.uniqueAddress != from))
logInfo("Ignoring received gossip status from unknown [{}]", from)
else {
(status.version tryCompareTo latestGossip.version) match {
case Some(0) // same version
case Some(x) if x > 0 gossipStatusTo(from, sender) // remote is newer
case _ gossipTo(from, sender) // conflicting or local is newer
(status.version compareTo latestGossip.version) match {
case VectorClock.Same // same version
case VectorClock.After gossipStatusTo(from, sender) // remote is newer
case _ gossipTo(from, sender) // conflicting or local is newer
}
}
}
/**
* The types of gossip actions that receive gossip has performed.
*/
sealed trait ReceiveGossipType
case object Ignored extends ReceiveGossipType
case object Older extends ReceiveGossipType
case object Newer extends ReceiveGossipType
case object Same extends ReceiveGossipType
case object Merge extends ReceiveGossipType
/**
* Receive new gossip.
*/
def receiveGossip(envelope: GossipEnvelope): Unit = {
def receiveGossip(envelope: GossipEnvelope): ReceiveGossipType = {
val from = envelope.from
val remoteGossip = envelope.gossip
val localGossip = latestGossip
if (envelope.to != selfUniqueAddress)
if (envelope.to != selfUniqueAddress) {
logInfo("Ignoring received gossip intended for someone else, from [{}] to [{}]", from.address, envelope.to)
if (remoteGossip.overview.unreachable.exists(_.address == selfAddress))
Ignored
} else if (remoteGossip.overview.unreachable.exists(_.address == selfAddress)) {
logInfo("Ignoring received gossip with myself as unreachable, from [{}]", from.address)
else if (localGossip.overview.unreachable.exists(_.uniqueAddress == from))
Ignored
} else if (localGossip.overview.unreachable.exists(_.uniqueAddress == from)) {
logInfo("Ignoring received gossip from unreachable [{}] ", from)
else if (localGossip.members.forall(_.uniqueAddress != from))
Ignored
} else if (localGossip.members.forall(_.uniqueAddress != from)) {
logInfo("Ignoring received gossip from unknown [{}]", from)
else if (remoteGossip.members.forall(_.uniqueAddress != selfUniqueAddress))
Ignored
} else if (remoteGossip.members.forall(_.uniqueAddress != selfUniqueAddress)) {
logInfo("Ignoring received gossip that does not contain myself, from [{}]", from)
else {
Ignored
} else {
val comparison = remoteGossip.version compareTo localGossip.version
val comparison = remoteGossip.version tryCompareTo localGossip.version
val (winningGossip, talkback) = comparison match {
case None
val (winningGossip, talkback, gossipType) = comparison match {
case VectorClock.Concurrent
// conflicting versions, merge
(remoteGossip merge localGossip, true)
case Some(0)
(remoteGossip merge localGossip, true, Merge)
case VectorClock.Same
// same version
(remoteGossip mergeSeen localGossip, !remoteGossip.seenByNode(selfUniqueAddress))
case Some(x) if x < 0
(remoteGossip mergeSeen localGossip, !remoteGossip.seenByNode(selfUniqueAddress), Same)
case VectorClock.Before
// local is newer
(localGossip, true)
case _
(localGossip, true, Older)
case VectorClock.After
// remote is newer
(remoteGossip, !remoteGossip.seenByNode(selfUniqueAddress))
(remoteGossip, !remoteGossip.seenByNode(selfUniqueAddress), Newer)
}
latestGossip = winningGossip seen selfUniqueAddress
@ -603,18 +618,18 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)
if (comparison.isEmpty) {
if (comparison == VectorClock.Concurrent) {
log.debug(
"""Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""",
remoteGossip, localGossip, winningGossip)
}
if (statsEnabled) {
gossipStats = comparison match {
case None gossipStats.incrementMergeCount
case Some(0) gossipStats.incrementSameCount
case Some(x) if x < 0 gossipStats.incrementNewerCount
case _ gossipStats.incrementOlderCount
gossipStats = gossipType match {
case Merge gossipStats.incrementMergeCount
case Same gossipStats.incrementSameCount
case Newer gossipStats.incrementNewerCount
case Older gossipStats.incrementOlderCount
}
vclockStats = VectorClockStats(
versionSize = latestGossip.version.versions.size,
@ -630,6 +645,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
// older or sender had newer
gossipTo(from, sender)
}
gossipType
}
}