Merge gossip seen table when versions are the same. See #3115

This commit is contained in:
Björn Antonsson 2013-03-05 12:49:35 +01:00
parent 370d6451c7
commit fad4289b1b
4 changed files with 162 additions and 30 deletions

View file

@ -554,7 +554,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
// leader handles merge conflicts, or when they have different views of how is leader // leader handles merge conflicts, or when they have different views of how is leader
val handleMerge = localGossip.leader == Some(selfAddress) || localGossip.leader != remoteGossip.leader val handleMerge = localGossip.leader == Some(selfAddress) || localGossip.leader != remoteGossip.leader
val conflict = remoteGossip.version <> localGossip.version val comparison = remoteGossip.version tryCompareTo localGossip.version
val conflict = !comparison.isDefined
if (conflict && !handleMerge) { if (conflict && !handleMerge) {
// delegate merge resolution to leader to reduce number of simultaneous resolves, // delegate merge resolution to leader to reduce number of simultaneous resolves,
@ -573,10 +574,24 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
} else { } else {
val winningGossip = val (winningGossip, talkback) = comparison match {
if (conflict) (remoteGossip merge localGossip) :+ vclockNode // conflicting versions, merge, and new version case None
else if (remoteGossip.version < localGossip.version) localGossip // local gossip is newer // conflicting versions, merge, and new version
else remoteGossip // remote gossip is newer ((remoteGossip merge localGossip) :+ vclockNode, true)
case Some(0)
// same version
stats = stats.incrementSameCount
// TODO optimize talkback based on how the merged seen differs
(remoteGossip mergeSeen localGossip, !remoteGossip.hasSeen(selfAddress))
case Some(x) if (x < 0)
// local is newer
stats = stats.incrementNewerCount
(localGossip, true)
case _
// remote is newer
stats = stats.incrementOlderCount
(remoteGossip, !remoteGossip.hasSeen(selfAddress))
}
latestGossip = winningGossip seen selfAddress latestGossip = winningGossip seen selfAddress
@ -597,8 +612,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
stats = stats.incrementReceivedGossipCount stats = stats.incrementReceivedGossipCount
publish(latestGossip) publish(latestGossip)
if (envelope.conversation && if (envelope.conversation && talkback) {
(conflict || (winningGossip ne remoteGossip) || (latestGossip ne remoteGossip))) {
// send back gossip to sender when sender had different view, i.e. merge, or sender had // send back gossip to sender when sender had different view, i.e. merge, or sender had
// older or sender had newer // older or sender had newer
gossipTo(from) gossipTo(from)
@ -1040,7 +1054,10 @@ private[cluster] case class ClusterStats(
receivedGossipCount: Long = 0L, receivedGossipCount: Long = 0L,
mergeConflictCount: Long = 0L, mergeConflictCount: Long = 0L,
mergeCount: Long = 0L, mergeCount: Long = 0L,
mergeDetectedCount: Long = 0L) { mergeDetectedCount: Long = 0L,
sameCount: Long = 0L,
newerCount: Long = 0L,
olderCount: Long = 0L) {
def incrementReceivedGossipCount(): ClusterStats = def incrementReceivedGossipCount(): ClusterStats =
copy(receivedGossipCount = receivedGossipCount + 1) copy(receivedGossipCount = receivedGossipCount + 1)
@ -1053,4 +1070,36 @@ private[cluster] case class ClusterStats(
def incrementMergeDetectedCount(): ClusterStats = def incrementMergeDetectedCount(): ClusterStats =
copy(mergeDetectedCount = mergeDetectedCount + 1) copy(mergeDetectedCount = mergeDetectedCount + 1)
def incrementSameCount(): ClusterStats =
copy(sameCount = sameCount + 1)
def incrementNewerCount(): ClusterStats =
copy(newerCount = newerCount + 1)
def incrementOlderCount(): ClusterStats =
copy(olderCount = olderCount + 1)
def +(that: ClusterStats): ClusterStats = {
ClusterStats(
this.receivedGossipCount + that.receivedGossipCount,
this.mergeConflictCount + that.mergeConflictCount,
this.mergeCount + that.mergeCount,
this.mergeDetectedCount + that.mergeDetectedCount,
this.sameCount + that.sameCount,
this.newerCount + that.newerCount,
this.olderCount + that.olderCount)
}
def -(that: ClusterStats): ClusterStats = {
ClusterStats(
this.receivedGossipCount - that.receivedGossipCount,
this.mergeConflictCount - that.mergeConflictCount,
this.mergeCount - that.mergeCount,
this.mergeDetectedCount - that.mergeDetectedCount,
this.sameCount - that.sameCount,
this.newerCount - that.newerCount,
this.olderCount - that.olderCount)
}
} }

View file

@ -100,7 +100,7 @@ private[cluster] case class Gossip(
* Map with the VectorClock (version) for the new gossip. * Map with the VectorClock (version) for the new gossip.
*/ */
def seen(address: Address): Gossip = { def seen(address: Address): Gossip = {
if (overview.seen.contains(address) && overview.seen(address) == version) this if (hasSeen(address)) this
else this copy (overview = overview copy (seen = overview.seen + (address -> version))) else this copy (overview = overview copy (seen = overview.seen + (address -> version)))
} }
@ -113,6 +113,28 @@ private[cluster] case class Gossip(
}.toSet }.toSet
} }
/**
* Has this Gossip been seen by this address.
*/
def hasSeen(address: Address): Boolean = {
overview.seen.get(address).fold { false } { _ == version }
}
/**
* Merges the seen table of two Gossip instances.
*/
def mergeSeen(that: Gossip): Gossip = {
val mergedSeen = (overview.seen /: that.overview.seen) {
case (merged, (address, version))
val curr = merged.getOrElse(address, version)
if (curr > version)
merged + (address -> curr)
else
merged + (address -> version)
}
this copy (overview = overview copy (seen = mergedSeen))
}
/** /**
* Merges two Gossip instances including membership tables, and the VectorClock histories. * Merges two Gossip instances including membership tables, and the VectorClock histories.
*/ */

View file

@ -215,6 +215,10 @@ object StressMultiJvmSpec extends MultiNodeConfig {
import akka.cluster.Member.addressOrdering import akka.cluster.Member.addressOrdering
immutable.SortedMap.empty[Address, Set[PhiValue]] immutable.SortedMap.empty[Address, Set[PhiValue]]
} }
var clusterStatsObservedByNode = {
import akka.cluster.Member.addressOrdering
immutable.SortedMap.empty[Address, ClusterStats]
}
import context.dispatcher import context.dispatcher
val reportMetricsTask = context.system.scheduler.schedule( val reportMetricsTask = context.system.scheduler.schedule(
@ -231,13 +235,14 @@ object StressMultiJvmSpec extends MultiNodeConfig {
def receive = { def receive = {
case ClusterMetricsChanged(clusterMetrics) nodeMetrics = clusterMetrics case ClusterMetricsChanged(clusterMetrics) nodeMetrics = clusterMetrics
case PhiResult(from, phiValues) phiValuesObservedByNode += from -> phiValues case PhiResult(from, phiValues) phiValuesObservedByNode += from -> phiValues
case StatsResult(from, stats) => clusterStatsObservedByNode += from -> stats
case ReportTick case ReportTick
log.info(s"[${title}] in progress\n${formatMetrics}\n\n${formatPhi}") log.info(s"[${title}] in progress\n${formatMetrics}\n\n${formatPhi}\n\n${formatStats}")
case r: ClusterResult case r: ClusterResult
results :+= r results :+= r
if (results.size == expectedResults) { if (results.size == expectedResults) {
val aggregated = AggregatedClusterResult(title, maxDuration, totalClusterStats) val aggregated = AggregatedClusterResult(title, maxDuration, totalClusterStats)
log.info(s"[${title}] completed in [${aggregated.duration.toMillis}] ms\n${aggregated.clusterStats}\n${formatMetrics}\n\n${formatPhi}") log.info(s"[${title}] completed in [${aggregated.duration.toMillis}] ms\n${aggregated.clusterStats}\n${formatMetrics}\n\n${formatPhi}\n\n${formatStats}")
reportTo foreach { _ ! aggregated } reportTo foreach { _ ! aggregated }
context stop self context stop self
} }
@ -247,13 +252,7 @@ object StressMultiJvmSpec extends MultiNodeConfig {
def maxDuration = results.map(_.duration).max def maxDuration = results.map(_.duration).max
def totalClusterStats = results.map(_.clusterStats).foldLeft(ClusterStats()) { (acc, s) def totalClusterStats = results.map(_.clusterStats).foldLeft(ClusterStats()){_ + _}
ClusterStats(
receivedGossipCount = acc.receivedGossipCount + s.receivedGossipCount,
mergeConflictCount = acc.mergeConflictCount + s.mergeConflictCount,
mergeCount = acc.mergeCount + s.mergeCount,
mergeDetectedCount = acc.mergeDetectedCount + s.mergeDetectedCount)
}
def formatMetrics: String = { def formatMetrics: String = {
import akka.cluster.Member.addressOrdering import akka.cluster.Member.addressOrdering
@ -300,6 +299,16 @@ object StressMultiJvmSpec extends MultiNodeConfig {
def formatPhiLine(monitor: Address, subject: Address, phi: PhiValue): String = def formatPhiLine(monitor: Address, subject: Address, phi: PhiValue): String =
s"${monitor}\t${subject}\t${phi.count}\t${phi.countAboveOne}\t${phi.max.form}" s"${monitor}\t${subject}\t${phi.count}\t${phi.countAboveOne}\t${phi.max.form}"
def formatStats: String = {
if (clusterStatsObservedByNode.isEmpty) ""
else {
val lines =
for {
(monitor, stats) clusterStatsObservedByNode.toSeq
} yield s"${monitor}\t${stats}"
("ClusterStats" +: lines).mkString("\n")
}
}
} }
/** /**
@ -319,10 +328,10 @@ object StressMultiJvmSpec extends MultiNodeConfig {
def formatHistory: String = def formatHistory: String =
(formatHistoryHeader +: (history map formatHistoryLine)).mkString("\n") (formatHistoryHeader +: (history map formatHistoryLine)).mkString("\n")
def formatHistoryHeader: String = "title\tduration (ms)\tgossip count\tmerge count" def formatHistoryHeader: String = "title\tduration (ms)\tcluster stats"
def formatHistoryLine(result: AggregatedClusterResult): String = def formatHistoryLine(result: AggregatedClusterResult): String =
s"${result.title}\t${result.duration.toMillis}\t${result.clusterStats.receivedGossipCount}\t${result.clusterStats.mergeCount}" s"${result.title}\t${result.duration.toMillis}\t${result.clusterStats}"
} }
@ -381,6 +390,31 @@ object StressMultiJvmSpec extends MultiNodeConfig {
} }
} }
class StatsObserver extends Actor {
val cluster = Cluster(context.system)
var reportTo: Option[ActorRef] = None
var startStats = cluster.readView.latestStats
import context.dispatcher
val checkStatsTask = context.system.scheduler.schedule(
1.second, 1.second, self, StatsTick)
override def postStop(): Unit = {
checkStatsTask.cancel()
super.postStop()
}
def receive = {
case StatsTick
val res = StatsResult(cluster.selfAddress, cluster.readView.latestStats - startStats)
reportTo foreach { _ ! res }
case ReportTo(ref)
reportTo = ref
case Reset
startStats = cluster.readView.latestStats
}
}
/** /**
* Master of routers * Master of routers
* *
@ -568,6 +602,8 @@ object StressMultiJvmSpec extends MultiNodeConfig {
case class PhiResult(from: Address, phiValues: Set[PhiValue]) case class PhiResult(from: Address, phiValues: Set[PhiValue])
case class PhiValue(address: Address, countAboveOne: Int, count: Int, max: Double) case class PhiValue(address: Address, countAboveOne: Int, count: Int, max: Double)
case class ReportTo(ref: Option[ActorRef]) case class ReportTo(ref: Option[ActorRef])
case object StatsTick
case class StatsResult(from: Address, stats: ClusterStats)
type JobId = Int type JobId = Int
trait Job { def id: JobId } trait Job { def id: JobId }
@ -622,6 +658,7 @@ abstract class StressSpec
sys.eventStream.publish(Mute(EventFilter[RuntimeException](pattern = ".*Simulated exception.*"))) sys.eventStream.publish(Mute(EventFilter[RuntimeException](pattern = ".*Simulated exception.*")))
sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*PhiResult.*"))) sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*PhiResult.*")))
sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*SendBatch.*"))) sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*SendBatch.*")))
sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*ClusterStats.*")))
} }
val seedNodes = roles.take(numberOfSeedNodes) val seedNodes = roles.take(numberOfSeedNodes)
@ -645,6 +682,8 @@ abstract class StressSpec
enterBarrier("result-aggregator-created-" + step) enterBarrier("result-aggregator-created-" + step)
runOn(roles.take(nbrUsedRoles): _*) { runOn(roles.take(nbrUsedRoles): _*) {
phiObserver ! ReportTo(Some(clusterResultAggregator)) phiObserver ! ReportTo(Some(clusterResultAggregator))
statsObserver ! Reset
statsObserver ! ReportTo(Some(clusterResultAggregator))
} }
} }
@ -654,6 +693,8 @@ abstract class StressSpec
lazy val phiObserver = system.actorOf(Props[PhiObserver], "phiObserver") lazy val phiObserver = system.actorOf(Props[PhiObserver], "phiObserver")
lazy val statsObserver = system.actorOf(Props[StatsObserver], "statsObserver")
def awaitClusterResult: Unit = { def awaitClusterResult: Unit = {
runOn(roles.head) { runOn(roles.head) {
val r = clusterResultAggregator val r = clusterResultAggregator
@ -789,14 +830,9 @@ abstract class StressSpec
val returnValue = thunk val returnValue = thunk
val duration = (System.nanoTime - startTime).nanos clusterResultAggregator !
val latestStats = clusterView.latestStats ClusterResult(cluster.selfAddress, (System.nanoTime - startTime).nanos, cluster.readView.latestStats - startStats)
val clusterStats = ClusterStats(
receivedGossipCount = latestStats.receivedGossipCount - startStats.receivedGossipCount,
mergeConflictCount = latestStats.mergeConflictCount - startStats.mergeConflictCount,
mergeCount = latestStats.mergeCount - startStats.mergeCount,
mergeDetectedCount = latestStats.mergeDetectedCount - startStats.mergeDetectedCount)
clusterResultAggregator ! ClusterResult(cluster.selfAddress, duration, clusterStats)
returnValue returnValue
} }
@ -813,6 +849,7 @@ abstract class StressSpec
val t = title + " round " + counter val t = title + " round " + counter
runOn(usedRoles: _*) { runOn(usedRoles: _*) {
phiObserver ! Reset phiObserver ! Reset
statsObserver ! Reset
} }
createResultAggregator(t, expectedResults = nbrUsedRoles, includeInHistory = true) createResultAggregator(t, expectedResults = nbrUsedRoles, includeInHistory = true)
val (nextAS, nextAddresses) = within(loopDuration) { val (nextAS, nextAddresses) = within(loopDuration) {
@ -852,6 +889,7 @@ abstract class StressSpec
runOn(usedRoles: _*) { runOn(usedRoles: _*) {
awaitUpConvergence(nbrUsedRoles, timeout = remaining) awaitUpConvergence(nbrUsedRoles, timeout = remaining)
phiObserver ! Reset phiObserver ! Reset
statsObserver ! Reset
} }
} }
enterBarrier("join-remove-shutdown-" + step) enterBarrier("join-remove-shutdown-" + step)

View file

@ -90,8 +90,8 @@ class GossipSpec extends WordSpec with MustMatchers {
} }
"start with fresh seen table after merge" in { "start with fresh seen table after merge" in {
val g1 = Gossip(members = SortedSet(a1, e1)).seen(a1.address).seen(a1.address) val g1 = Gossip(members = SortedSet(a1, e1)).seen(a1.address).seen(e1.address)
val g2 = Gossip(members = SortedSet(a2, e2)).seen(e2.address).seen(e2.address) val g2 = Gossip(members = SortedSet(a2, e2)).seen(a2.address).seen(e2.address)
val merged1 = g1 merge g2 val merged1 = g1 merge g2
merged1.overview.seen.isEmpty must be(true) merged1.overview.seen.isEmpty must be(true)
@ -120,5 +120,28 @@ class GossipSpec extends WordSpec with MustMatchers {
Gossip(members = SortedSet(c3)).leader must be(Some(c3.address)) Gossip(members = SortedSet(c3)).leader must be(Some(c3.address))
} }
"merge seen table correctly" in {
val vclockNode = VectorClock.Node("something")
val g1 = (Gossip(members = SortedSet(a1, b1, c1, d1)) :+ vclockNode).seen(a1.address).seen(b1.address)
val g2 = (Gossip(members = SortedSet(a1, b1, c1, d1)) :+ vclockNode).seen(a1.address).seen(c1.address)
val g3 = (g1 copy (version = g2.version)).seen(d1.address)
def checkMerged(merged: Gossip) {
val keys = merged.overview.seen.keys.toSeq
keys.length must be(4)
keys.toSet must be(Set(a1.address, b1.address, c1.address, d1.address))
merged hasSeen (a1.address) must be(true)
merged hasSeen (b1.address) must be(false)
merged hasSeen (c1.address) must be(true)
merged hasSeen (d1.address) must be(true)
merged hasSeen (e1.address) must be(false)
merged.overview.seen(b1.address) must be(g1.version)
}
checkMerged(g3 mergeSeen g2)
checkMerged(g2 mergeSeen g3)
}
} }
} }