Merge pull request #1222 from akka/wip-3115-clusterstressspec-failure-ban

Merge gossip seen table when versions are the same. See #3115
This commit is contained in:
Björn Antonsson 2013-03-06 22:54:55 -08:00
commit 1680702f5d
4 changed files with 164 additions and 37 deletions

View file

@ -554,7 +554,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
// leader handles merge conflicts, or when they have different views of how is leader
val handleMerge = localGossip.leader == Some(selfAddress) || localGossip.leader != remoteGossip.leader
val conflict = remoteGossip.version <> localGossip.version
val comparison = remoteGossip.version tryCompareTo localGossip.version
val conflict = comparison.isEmpty
if (conflict && !handleMerge) {
// delegate merge resolution to leader to reduce number of simultaneous resolves,
@ -573,11 +574,23 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
} else {
val winningGossip =
if (conflict) (remoteGossip merge localGossip) :+ vclockNode // conflicting versions, merge, and new version
else if (remoteGossip.version < localGossip.version) localGossip // local gossip is newer
else remoteGossip // remote gossip is newer
val (winningGossip, talkback, newStats) = comparison match {
case None
// conflicting versions, merge, and new version
((remoteGossip merge localGossip) :+ vclockNode, true, stats)
case Some(0)
// same version
// TODO optimize talkback based on how the merged seen differs
(remoteGossip mergeSeen localGossip, !remoteGossip.hasSeen(selfAddress), stats.incrementSameCount)
case Some(x) if x < 0
// local is newer
(localGossip, true, stats.incrementNewerCount)
case _
// remote is newer
(remoteGossip, !remoteGossip.hasSeen(selfAddress), stats.incrementOlderCount)
}
stats = newStats
latestGossip = winningGossip seen selfAddress
// for all new joining nodes we remove them from the failure detector
@ -597,8 +610,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
stats = stats.incrementReceivedGossipCount
publish(latestGossip)
if (envelope.conversation &&
(conflict || (winningGossip ne remoteGossip) || (latestGossip ne remoteGossip))) {
if (envelope.conversation && talkback) {
// send back gossip to sender when sender had different view, i.e. merge, or sender had
// older or sender had newer
gossipTo(from)
@ -1040,7 +1052,10 @@ private[cluster] case class ClusterStats(
receivedGossipCount: Long = 0L,
mergeConflictCount: Long = 0L,
mergeCount: Long = 0L,
mergeDetectedCount: Long = 0L) {
mergeDetectedCount: Long = 0L,
sameCount: Long = 0L,
newerCount: Long = 0L,
olderCount: Long = 0L) {
def incrementReceivedGossipCount(): ClusterStats =
copy(receivedGossipCount = receivedGossipCount + 1)
@ -1053,4 +1068,36 @@ private[cluster] case class ClusterStats(
def incrementMergeDetectedCount(): ClusterStats =
copy(mergeDetectedCount = mergeDetectedCount + 1)
def incrementSameCount(): ClusterStats =
copy(sameCount = sameCount + 1)
def incrementNewerCount(): ClusterStats =
copy(newerCount = newerCount + 1)
def incrementOlderCount(): ClusterStats =
copy(olderCount = olderCount + 1)
def :+(that: ClusterStats): ClusterStats = {
ClusterStats(
this.receivedGossipCount + that.receivedGossipCount,
this.mergeConflictCount + that.mergeConflictCount,
this.mergeCount + that.mergeCount,
this.mergeDetectedCount + that.mergeDetectedCount,
this.sameCount + that.sameCount,
this.newerCount + that.newerCount,
this.olderCount + that.olderCount)
}
def :-(that: ClusterStats): ClusterStats = {
ClusterStats(
this.receivedGossipCount - that.receivedGossipCount,
this.mergeConflictCount - that.mergeConflictCount,
this.mergeCount - that.mergeCount,
this.mergeDetectedCount - that.mergeDetectedCount,
this.sameCount - that.sameCount,
this.newerCount - that.newerCount,
this.olderCount - that.olderCount)
}
}

View file

@ -100,7 +100,7 @@ private[cluster] case class Gossip(
* Map with the VectorClock (version) for the new gossip.
*/
def seen(address: Address): Gossip = {
if (overview.seen.contains(address) && overview.seen(address) == version) this
if (hasSeen(address)) this
else this copy (overview = overview copy (seen = overview.seen + (address -> version)))
}
@ -113,6 +113,28 @@ private[cluster] case class Gossip(
}.toSet
}
/**
* Has this Gossip been seen by this address.
*/
def hasSeen(address: Address): Boolean = {
overview.seen.get(address).exists(_ == version)
}
/**
* Merges the seen table of two Gossip instances.
*/
def mergeSeen(that: Gossip): Gossip = {
val mergedSeen = (overview.seen /: that.overview.seen) {
case (merged, (address, version))
val curr = merged.getOrElse(address, version)
if (curr > version)
merged + (address -> curr)
else
merged + (address -> version)
}
this copy (overview = overview copy (seen = mergedSeen))
}
/**
* Merges two Gossip instances including membership tables, and the VectorClock histories.
*/

View file

@ -213,7 +213,11 @@ object StressMultiJvmSpec extends MultiNodeConfig {
var nodeMetrics = Set.empty[NodeMetrics]
var phiValuesObservedByNode = {
import akka.cluster.Member.addressOrdering
immutable.SortedMap.empty[Address, Set[PhiValue]]
immutable.SortedMap.empty[Address, immutable.SortedSet[PhiValue]]
}
var clusterStatsObservedByNode = {
import akka.cluster.Member.addressOrdering
immutable.SortedMap.empty[Address, ClusterStats]
}
import context.dispatcher
@ -231,13 +235,14 @@ object StressMultiJvmSpec extends MultiNodeConfig {
def receive = {
case ClusterMetricsChanged(clusterMetrics) nodeMetrics = clusterMetrics
case PhiResult(from, phiValues) phiValuesObservedByNode += from -> phiValues
case StatsResult(from, stats) clusterStatsObservedByNode += from -> stats
case ReportTick
log.info(s"[${title}] in progress\n${formatMetrics}\n\n${formatPhi}")
log.info(s"[${title}] in progress\n${formatMetrics}\n\n${formatPhi}\n\n${formatStats}")
case r: ClusterResult
results :+= r
if (results.size == expectedResults) {
val aggregated = AggregatedClusterResult(title, maxDuration, totalClusterStats)
log.info(s"[${title}] completed in [${aggregated.duration.toMillis}] ms\n${aggregated.clusterStats}\n${formatMetrics}\n\n${formatPhi}")
log.info(s"[${title}] completed in [${aggregated.duration.toMillis}] ms\n${aggregated.clusterStats}\n${formatMetrics}\n\n${formatPhi}\n\n${formatStats}")
reportTo foreach { _ ! aggregated }
context stop self
}
@ -247,13 +252,7 @@ object StressMultiJvmSpec extends MultiNodeConfig {
def maxDuration = results.map(_.duration).max
def totalClusterStats = results.map(_.clusterStats).foldLeft(ClusterStats()) { (acc, s)
ClusterStats(
receivedGossipCount = acc.receivedGossipCount + s.receivedGossipCount,
mergeConflictCount = acc.mergeConflictCount + s.mergeConflictCount,
mergeCount = acc.mergeCount + s.mergeCount,
mergeDetectedCount = acc.mergeDetectedCount + s.mergeDetectedCount)
}
def totalClusterStats = results.foldLeft(ClusterStats()){_ :+ _.clusterStats}
def formatMetrics: String = {
import akka.cluster.Member.addressOrdering
@ -287,11 +286,11 @@ object StressMultiJvmSpec extends MultiNodeConfig {
import akka.cluster.Member.addressOrdering
val lines =
for {
(monitor, phiValues) phiValuesObservedByNode.toSeq
phi phiValues.toSeq.sortBy(_.address)
(monitor, phiValues) phiValuesObservedByNode
phi phiValues
} yield formatPhiLine(monitor, phi.address, phi)
(formatPhiHeader +: lines).mkString("\n")
lines.mkString(formatPhiHeader + "\n", "\n", "")
}
}
@ -300,6 +299,9 @@ object StressMultiJvmSpec extends MultiNodeConfig {
def formatPhiLine(monitor: Address, subject: Address, phi: PhiValue): String =
s"${monitor}\t${subject}\t${phi.count}\t${phi.countAboveOne}\t${phi.max.form}"
def formatStats: String =
(clusterStatsObservedByNode map { case (monitor, stats) => s"${monitor}\t${stats}" }).
mkString("ClusterStats\n", "\n", "")
}
/**
@ -319,10 +321,10 @@ object StressMultiJvmSpec extends MultiNodeConfig {
def formatHistory: String =
(formatHistoryHeader +: (history map formatHistoryLine)).mkString("\n")
def formatHistoryHeader: String = "title\tduration (ms)\tgossip count\tmerge count"
def formatHistoryHeader: String = "title\tduration (ms)\tcluster stats"
def formatHistoryLine(result: AggregatedClusterResult): String =
s"${result.title}\t${result.duration.toMillis}\t${result.clusterStats.receivedGossipCount}\t${result.clusterStats.mergeCount}"
s"${result.title}\t${result.duration.toMillis}\t${result.clusterStats}"
}
@ -368,7 +370,8 @@ object StressMultiJvmSpec extends MultiNodeConfig {
math.max(previous.max, φ))
}
}
reportTo foreach { _ ! PhiResult(cluster.selfAddress, phiByNode.values.toSet) }
val phiSet = immutable.SortedSet.empty[PhiValue] ++ phiByNode.values
reportTo foreach { _ ! PhiResult(cluster.selfAddress, phiSet) }
case state: CurrentClusterState nodes = state.members.map(_.address)
case memberEvent: MemberEvent nodes += memberEvent.member.address
case ReportTo(ref) reportTo = ref
@ -381,6 +384,31 @@ object StressMultiJvmSpec extends MultiNodeConfig {
}
}
class StatsObserver extends Actor {
val cluster = Cluster(context.system)
var reportTo: Option[ActorRef] = None
var startStats = cluster.readView.latestStats
import context.dispatcher
val checkStatsTask = context.system.scheduler.schedule(
1.second, 1.second, self, StatsTick)
override def postStop(): Unit = {
checkStatsTask.cancel()
super.postStop()
}
def receive = {
case StatsTick
val res = StatsResult(cluster.selfAddress, cluster.readView.latestStats :- startStats)
reportTo foreach { _ ! res }
case ReportTo(ref)
reportTo = ref
case Reset
startStats = cluster.readView.latestStats
}
}
/**
* Master of routers
*
@ -565,9 +593,14 @@ object StressMultiJvmSpec extends MultiNodeConfig {
case object RetryTick
case object ReportTick
case object PhiTick
case class PhiResult(from: Address, phiValues: Set[PhiValue])
case class PhiValue(address: Address, countAboveOne: Int, count: Int, max: Double)
case class PhiResult(from: Address, phiValues: immutable.SortedSet[PhiValue])
case class PhiValue(address: Address, countAboveOne: Int, count: Int, max: Double) extends Ordered[PhiValue] {
import akka.cluster.Member.addressOrdering
def compare(that: PhiValue) = addressOrdering.compare(this.address, that.address)
}
case class ReportTo(ref: Option[ActorRef])
case object StatsTick
case class StatsResult(from: Address, stats: ClusterStats)
type JobId = Int
trait Job { def id: JobId }
@ -622,6 +655,7 @@ abstract class StressSpec
sys.eventStream.publish(Mute(EventFilter[RuntimeException](pattern = ".*Simulated exception.*")))
sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*PhiResult.*")))
sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*SendBatch.*")))
sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*ClusterStats.*")))
}
val seedNodes = roles.take(numberOfSeedNodes)
@ -645,6 +679,8 @@ abstract class StressSpec
enterBarrier("result-aggregator-created-" + step)
runOn(roles.take(nbrUsedRoles): _*) {
phiObserver ! ReportTo(Some(clusterResultAggregator))
statsObserver ! Reset
statsObserver ! ReportTo(Some(clusterResultAggregator))
}
}
@ -654,6 +690,8 @@ abstract class StressSpec
lazy val phiObserver = system.actorOf(Props[PhiObserver], "phiObserver")
lazy val statsObserver = system.actorOf(Props[StatsObserver], "statsObserver")
def awaitClusterResult: Unit = {
runOn(roles.head) {
val r = clusterResultAggregator
@ -789,14 +827,9 @@ abstract class StressSpec
val returnValue = thunk
val duration = (System.nanoTime - startTime).nanos
val latestStats = clusterView.latestStats
val clusterStats = ClusterStats(
receivedGossipCount = latestStats.receivedGossipCount - startStats.receivedGossipCount,
mergeConflictCount = latestStats.mergeConflictCount - startStats.mergeConflictCount,
mergeCount = latestStats.mergeCount - startStats.mergeCount,
mergeDetectedCount = latestStats.mergeDetectedCount - startStats.mergeDetectedCount)
clusterResultAggregator ! ClusterResult(cluster.selfAddress, duration, clusterStats)
clusterResultAggregator !
ClusterResult(cluster.selfAddress, (System.nanoTime - startTime).nanos, cluster.readView.latestStats :- startStats)
returnValue
}
@ -813,6 +846,7 @@ abstract class StressSpec
val t = title + " round " + counter
runOn(usedRoles: _*) {
phiObserver ! Reset
statsObserver ! Reset
}
createResultAggregator(t, expectedResults = nbrUsedRoles, includeInHistory = true)
val (nextAS, nextAddresses) = within(loopDuration) {
@ -852,6 +886,7 @@ abstract class StressSpec
runOn(usedRoles: _*) {
awaitUpConvergence(nbrUsedRoles, timeout = remaining)
phiObserver ! Reset
statsObserver ! Reset
}
}
enterBarrier("join-remove-shutdown-" + step)

View file

@ -90,8 +90,8 @@ class GossipSpec extends WordSpec with MustMatchers {
}
"start with fresh seen table after merge" in {
val g1 = Gossip(members = SortedSet(a1, e1)).seen(a1.address).seen(a1.address)
val g2 = Gossip(members = SortedSet(a2, e2)).seen(e2.address).seen(e2.address)
val g1 = Gossip(members = SortedSet(a1, e1)).seen(a1.address).seen(e1.address)
val g2 = Gossip(members = SortedSet(a2, e2)).seen(a2.address).seen(e2.address)
val merged1 = g1 merge g2
merged1.overview.seen.isEmpty must be(true)
@ -120,5 +120,28 @@ class GossipSpec extends WordSpec with MustMatchers {
Gossip(members = SortedSet(c3)).leader must be(Some(c3.address))
}
"merge seen table correctly" in {
val vclockNode = VectorClock.Node("something")
val g1 = (Gossip(members = SortedSet(a1, b1, c1, d1)) :+ vclockNode).seen(a1.address).seen(b1.address)
val g2 = (Gossip(members = SortedSet(a1, b1, c1, d1)) :+ vclockNode).seen(a1.address).seen(c1.address)
val g3 = (g1 copy (version = g2.version)).seen(d1.address)
def checkMerged(merged: Gossip) {
val keys = merged.overview.seen.keys.toSeq
keys.length must be(4)
keys.toSet must be(Set(a1.address, b1.address, c1.address, d1.address))
merged hasSeen (a1.address) must be(true)
merged hasSeen (b1.address) must be(false)
merged hasSeen (c1.address) must be(true)
merged hasSeen (d1.address) must be(true)
merged hasSeen (e1.address) must be(false)
merged.overview.seen(b1.address) must be(g1.version)
}
checkMerged(g3 mergeSeen g2)
checkMerged(g2 mergeSeen g3)
}
}
}