Merge pull request #1230 from akka/wip-3076-gossip-merge-changes-ban

Don't increment vector-clock on merge and merge locally. See #3076
This commit is contained in:
Viktor Klang (√) 2013-03-12 08:49:30 -07:00
commit 05593f5dd8
7 changed files with 68 additions and 167 deletions

View file

@ -65,10 +65,6 @@ akka {
# Probability value is between 0.0 and 1.0. 0.0 means never, 1.0 means always.
gossip-different-view-probability = 0.8
# Limit number of merge conflicts per second that are handled. If the limit is
# exceeded the conflicting gossip messages are dropped and will reappear later.
max-gossip-merge-rate = 5.0
# Settings for the Phi accrual failure detector (http://ddg.jaist.ac.jp/pub/HDY+04.pdf
# [Hayashibara et al]) used by the cluster subsystem to detect unreachable members.
failure-detector {

View file

@ -289,7 +289,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
def initialized: Actor.Receive = {
case msg: GossipEnvelope receiveGossip(msg)
case msg: GossipMergeConflict receiveGossipMerge(msg)
case GossipTick gossip()
case ReapUnreachableTick reapUnreachableMembers()
case LeaderActionsTick leaderActions()
@ -505,35 +504,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
publish(latestGossip)
}
/**
* When conflicting versions of received and local [[akka.cluster.Gossip]] is detected
* it's forwarded to the leader for conflict resolution. Trying to simultaneously
* resolving conflicts at several nodes creates new conflicts. Therefore the leader resolves
* conflicts to limit divergence. To avoid overload there is also a configurable rate
* limit of how many conflicts that are handled by second. If the limit is
* exceeded the conflicting gossip messages are dropped and will reappear later.
*/
def receiveGossipMerge(merge: GossipMergeConflict): Unit = {
stats = stats.incrementMergeConflictCount
val rate = mergeRate(stats.mergeConflictCount)
if (rate <= MaxGossipMergeRate) {
receiveGossip(merge.a.copy(conversation = false))
receiveGossip(merge.b.copy(conversation = false))
// use one-way gossip from leader to reduce load of leader
def sendBack(to: Address): Unit = {
if (to != selfAddress && !latestGossip.overview.unreachable.exists(_.address == to))
oneWayGossipTo(to)
}
sendBack(merge.a.from)
sendBack(merge.b.from)
} else {
log.debug("Dropping gossip merge conflict due to rate [{}] / s ", rate)
}
}
/**
* Receive new gossip.
*/
@ -547,57 +517,35 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
} else if (localGossip.overview.isNonDownUnreachable(from)) {
log.debug("Ignoring received gossip from unreachable [{}] ", from)
} else {
// leader handles merge conflicts, or when they have different views of how is leader
val handleMerge = localGossip.leader == Some(selfAddress) || localGossip.leader != remoteGossip.leader
val comparison = remoteGossip.version tryCompareTo localGossip.version
val conflict = comparison.isEmpty
if (conflict && !handleMerge) {
// delegate merge resolution to leader to reduce number of simultaneous resolves,
// which will result in new conflicts
stats = stats.incrementMergeDetectedCount
log.debug("Merge conflict [{}] detected [{}] <> [{}]", stats.mergeDetectedCount, selfAddress, from)
stats = stats.incrementMergeConflictCount
val rate = mergeRate(stats.mergeConflictCount)
if (rate <= MaxGossipMergeRate)
localGossip.leader foreach { clusterCore(_) ! GossipMergeConflict(GossipEnvelope(selfAddress, localGossip), envelope) }
else
log.debug("Skipping gossip merge conflict due to rate [{}] / s ", rate)
} else {
val (winningGossip, talkback, newStats) = comparison match {
case None
// conflicting versions, merge, and new version
((remoteGossip merge localGossip) :+ vclockNode, true, stats)
// conflicting versions, merge
(remoteGossip merge localGossip, true, stats.incrementMergeCount)
case Some(0)
// same version
// TODO optimize talkback based on how the merged seen differs
(remoteGossip mergeSeen localGossip, !remoteGossip.hasSeen(selfAddress), stats.incrementSameCount)
(remoteGossip mergeSeen localGossip, !remoteGossip.seenByAddress(selfAddress), stats.incrementSameCount)
case Some(x) if x < 0
// local is newer
(localGossip, true, stats.incrementNewerCount)
case _
// remote is newer
(remoteGossip, !remoteGossip.hasSeen(selfAddress), stats.incrementOlderCount)
(remoteGossip, !remoteGossip.seenByAddress(selfAddress), stats.incrementOlderCount)
}
stats = newStats
latestGossip = winningGossip seen selfAddress
// for all new joining nodes we remove them from the failure detector
(latestGossip.members -- localGossip.members).foreach {
node if (node.status == Joining) failureDetector.remove(node.address)
latestGossip.members foreach {
node if (node.status == Joining && !localGossip.members(node)) failureDetector.remove(node.address)
}
log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)
if (conflict) {
stats = stats.incrementMergeCount
log.debug(
"""Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""",
remoteGossip, localGossip, winningGossip)
@ -613,7 +561,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
}
}
}
}
def mergeRate(count: Long): Double = (count * 1000.0) / GossipInterval.toMillis
@ -621,8 +568,6 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
* Initiates a new round of gossip.
*/
def gossip(): Unit = {
stats = stats.copy(mergeConflictCount = 0)
log.debug("Cluster Node [{}] - Initiating new round of gossip", selfAddress)
if (!isSingletonCluster && isAvailable) {
@ -1053,9 +998,7 @@ private[cluster] class OnMemberUpListener(callback: Runnable) extends Actor with
*/
private[cluster] case class ClusterStats(
receivedGossipCount: Long = 0L,
mergeConflictCount: Long = 0L,
mergeCount: Long = 0L,
mergeDetectedCount: Long = 0L,
sameCount: Long = 0L,
newerCount: Long = 0L,
olderCount: Long = 0L) {
@ -1063,15 +1006,9 @@ private[cluster] case class ClusterStats(
def incrementReceivedGossipCount(): ClusterStats =
copy(receivedGossipCount = receivedGossipCount + 1)
def incrementMergeConflictCount(): ClusterStats =
copy(mergeConflictCount = mergeConflictCount + 1)
def incrementMergeCount(): ClusterStats =
copy(mergeCount = mergeCount + 1)
def incrementMergeDetectedCount(): ClusterStats =
copy(mergeDetectedCount = mergeDetectedCount + 1)
def incrementSameCount(): ClusterStats =
copy(sameCount = sameCount + 1)
@ -1084,9 +1021,7 @@ private[cluster] case class ClusterStats(
def :+(that: ClusterStats): ClusterStats = {
ClusterStats(
this.receivedGossipCount + that.receivedGossipCount,
this.mergeConflictCount + that.mergeConflictCount,
this.mergeCount + that.mergeCount,
this.mergeDetectedCount + that.mergeDetectedCount,
this.sameCount + that.sameCount,
this.newerCount + that.newerCount,
this.olderCount + that.olderCount)
@ -1095,9 +1030,7 @@ private[cluster] case class ClusterStats(
def :-(that: ClusterStats): ClusterStats = {
ClusterStats(
this.receivedGossipCount - that.receivedGossipCount,
this.mergeConflictCount - that.mergeConflictCount,
this.mergeCount - that.mergeCount,
this.mergeDetectedCount - that.mergeDetectedCount,
this.sameCount - that.sameCount,
this.newerCount - that.newerCount,
this.olderCount - that.olderCount)

View file

@ -59,7 +59,6 @@ class ClusterSettings(val config: Config, val systemName: String) {
case id id
}
final val GossipDifferentViewProbability: Double = cc.getDouble("gossip-different-view-probability")
final val MaxGossipMergeRate: Double = cc.getDouble("max-gossip-merge-rate")
final val SchedulerTickDuration: FiniteDuration = Duration(cc.getMilliseconds("scheduler.tick-duration"), MILLISECONDS)
final val SchedulerTicksPerWheel: Int = cc.getInt("scheduler.ticks-per-wheel")
final val MetricsEnabled: Boolean = cc.getBoolean("metrics.enabled")

View file

@ -43,7 +43,7 @@ private[cluster] object Gossip {
* When a `Gossip` is received the version (vector clock) is used to determine if the
* received `Gossip` is newer or older than the current local `Gossip`. The received `Gossip`
* and local `Gossip` is merged in case of conflicting version, i.e. vector clocks without
* same history. When merged the seen table is cleared.
* same history.
*
* When a node is told by the user to leave the cluster the leader will move it to `Leaving`
* and then rebalance and repartition the cluster and start hand-off by migrating the actors
@ -100,7 +100,7 @@ private[cluster] case class Gossip(
* Map with the VectorClock (version) for the new gossip.
*/
def seen(address: Address): Gossip = {
if (hasSeen(address)) this
if (seenByAddress(address)) this
else this copy (overview = overview copy (seen = overview.seen + (address -> version)))
}
@ -116,24 +116,33 @@ private[cluster] case class Gossip(
/**
* Has this Gossip been seen by this address.
*/
def hasSeen(address: Address): Boolean = {
def seenByAddress(address: Address): Boolean = {
overview.seen.get(address).exists(_ == version)
}
private def mergeSeenTables(allowed: Set[Member], one: Map[Address, VectorClock], another: Map[Address, VectorClock]): Map[Address, VectorClock] = {
(Map.empty[Address, VectorClock] /: allowed) {
(merged, member)
val address = member.address
(one.get(address), another.get(address)) match {
case (None, None) merged
case (Some(v1), None) merged.updated(address, v1)
case (None, Some(v2)) merged.updated(address, v2)
case (Some(v1), Some(v2))
v1 tryCompareTo v2 match {
case None merged
case Some(x) if x > 0 merged.updated(address, v1)
case _ merged.updated(address, v2)
}
}
}
}
/**
* Merges the seen table of two Gossip instances.
*/
def mergeSeen(that: Gossip): Gossip = {
val mergedSeen = (overview.seen /: that.overview.seen) {
case (merged, (address, version))
val curr = merged.getOrElse(address, version)
if (curr > version)
merged + (address -> curr)
else
merged + (address -> version)
}
this copy (overview = overview copy (seen = mergedSeen))
}
def mergeSeen(that: Gossip): Gossip =
this copy (overview = overview copy (seen = mergeSeenTables(members, overview.seen, that.overview.seen)))
/**
* Merges two Gossip instances including membership tables, and the VectorClock histories.
@ -151,8 +160,8 @@ private[cluster] case class Gossip(
// and exclude unreachable
val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains)
// 4. fresh seen table
val mergedSeen = Map.empty[Address, VectorClock]
// 4. merge seen table
val mergedSeen = mergeSeenTables(mergedMembers, overview.seen, that.overview.seen)
Gossip(mergedMembers, GossipOverview(mergedSeen, mergedUnreachable), mergedVClock)
}
@ -164,29 +173,14 @@ private[cluster] case class Gossip(
* @return true if convergence have been reached and false if not
*/
def convergence: Boolean = {
val unreachable = overview.unreachable
val seen = overview.seen
// First check that:
// 1. we don't have any members that are unreachable, or
// 2. all unreachable members in the set have status DOWN
// Else we can't continue to check for convergence
// When that is done we check that all the entries in the 'seen' table have the same vector clock version
// and that all members exists in seen table
val hasUnreachable = unreachable.nonEmpty && unreachable.exists { _.status != Down }
def allMembersInSeen = members.forall(m seen.contains(m.address))
// When that is done we check that all members exists in the seen table and
// have the latest vector clock version
def seenSame: Boolean =
if (seen.isEmpty) {
// if both seen and members are empty, then every(no)body has seen the same thing
members.isEmpty
} else {
val values = seen.values
val seenHead = values.head
values.forall(_ == seenHead)
}
!hasUnreachable && allMembersInSeen && seenSame
overview.unreachable.forall(_.status == Down) && members.forall(m seenByAddress(m.address))
}
def isLeader(address: Address): Boolean = leader == Some(address)
@ -240,11 +234,3 @@ private[cluster] case class GossipOverview(
* Envelope adding a sender address to the gossip.
*/
private[cluster] case class GossipEnvelope(from: Address, gossip: Gossip, conversation: Boolean = true) extends ClusterMessage
/**
* INTERNAL API
* When conflicting versions of received and local [[akka.cluster.Gossip]] is detected
* it's forwarded to the leader for conflict resolution.
*/
private[cluster] case class GossipMergeConflict(a: GossipEnvelope, b: GossipEnvelope) extends ClusterMessage

View file

@ -84,7 +84,7 @@ object StressMultiJvmSpec extends MultiNodeConfig {
normal-throughput-duration = 30s
high-throughput-duration = 10s
supervision-duration = 10s
supervision-one-iteration = 1s
supervision-one-iteration = 2.5s
expected-test-duration = 600s
# actors are created in a tree structure defined
# by tree-width (number of children for each actor) and

View file

@ -42,7 +42,6 @@ class ClusterConfigSpec extends AkkaSpec {
JmxEnabled must be(true)
UseDispatcher must be(Dispatchers.DefaultDispatcherId)
GossipDifferentViewProbability must be(0.8 plusOrMinus 0.0001)
MaxGossipMergeRate must be(5.0 plusOrMinus 0.0001)
SchedulerTickDuration must be(33 millis)
SchedulerTicksPerWheel must be(512)
MetricsEnabled must be(true)

View file

@ -79,18 +79,6 @@ class GossipSpec extends WordSpec with MustMatchers {
}
"start with fresh seen table after merge" in {
val g1 = Gossip(members = SortedSet(a1, e1)).seen(a1.address).seen(e1.address)
val g2 = Gossip(members = SortedSet(a2, e2)).seen(a2.address).seen(e2.address)
val merged1 = g1 merge g2
merged1.overview.seen.isEmpty must be(true)
val merged2 = g2 merge g1
merged2.overview.seen.isEmpty must be(true)
}
"not have node in both members and unreachable" in intercept[IllegalArgumentException] {
Gossip(members = SortedSet(a1, b1), overview = GossipOverview(unreachable = Set(b2)))
}
@ -121,17 +109,17 @@ class GossipSpec extends WordSpec with MustMatchers {
keys.length must be(4)
keys.toSet must be(Set(a1.address, b1.address, c1.address, d1.address))
merged hasSeen (a1.address) must be(true)
merged hasSeen (b1.address) must be(false)
merged hasSeen (c1.address) must be(true)
merged hasSeen (d1.address) must be(true)
merged hasSeen (e1.address) must be(false)
merged seenByAddress (a1.address) must be(true)
merged seenByAddress (b1.address) must be(false)
merged seenByAddress (c1.address) must be(true)
merged seenByAddress (d1.address) must be(true)
merged seenByAddress (e1.address) must be(false)
merged.overview.seen(b1.address) must be(g1.version)
}
checkMerged(g3 mergeSeen g2)
checkMerged(g2 mergeSeen g3)
checkMerged(g3 merge g2)
checkMerged(g2 merge g3)
}
}
}