Fixes according to review. See #3115

This commit is contained in:
Björn Antonsson 2013-03-06 13:11:46 +01:00
parent fad4289b1b
commit 78c3ca359a
3 changed files with 29 additions and 34 deletions

View file

@ -555,7 +555,7 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
// leader handles merge conflicts, or when they have different views of how is leader // leader handles merge conflicts, or when they have different views of how is leader
val handleMerge = localGossip.leader == Some(selfAddress) || localGossip.leader != remoteGossip.leader val handleMerge = localGossip.leader == Some(selfAddress) || localGossip.leader != remoteGossip.leader
val comparison = remoteGossip.version tryCompareTo localGossip.version val comparison = remoteGossip.version tryCompareTo localGossip.version
val conflict = !comparison.isDefined val conflict = comparison.isEmpty
if (conflict && !handleMerge) { if (conflict && !handleMerge) {
// delegate merge resolution to leader to reduce number of simultaneous resolves, // delegate merge resolution to leader to reduce number of simultaneous resolves,
@ -574,25 +574,23 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
} else { } else {
val (winningGossip, talkback) = comparison match { val (winningGossip, talkback, newStats) = comparison match {
case None case None
// conflicting versions, merge, and new version // conflicting versions, merge, and new version
((remoteGossip merge localGossip) :+ vclockNode, true) ((remoteGossip merge localGossip) :+ vclockNode, true, stats)
case Some(0) case Some(0)
// same version // same version
stats = stats.incrementSameCount
// TODO optimize talkback based on how the merged seen differs // TODO optimize talkback based on how the merged seen differs
(remoteGossip mergeSeen localGossip, !remoteGossip.hasSeen(selfAddress)) (remoteGossip mergeSeen localGossip, !remoteGossip.hasSeen(selfAddress), stats.incrementSameCount)
case Some(x) if (x < 0) case Some(x) if x < 0
// local is newer // local is newer
stats = stats.incrementNewerCount (localGossip, true, stats.incrementNewerCount)
(localGossip, true)
case _ case _
// remote is newer // remote is newer
stats = stats.incrementOlderCount (remoteGossip, !remoteGossip.hasSeen(selfAddress), stats.incrementOlderCount)
(remoteGossip, !remoteGossip.hasSeen(selfAddress))
} }
stats = newStats
latestGossip = winningGossip seen selfAddress latestGossip = winningGossip seen selfAddress
// for all new joining nodes we remove them from the failure detector // for all new joining nodes we remove them from the failure detector
@ -1080,7 +1078,7 @@ private[cluster] case class ClusterStats(
def incrementOlderCount(): ClusterStats = def incrementOlderCount(): ClusterStats =
copy(olderCount = olderCount + 1) copy(olderCount = olderCount + 1)
def +(that: ClusterStats): ClusterStats = { def :+(that: ClusterStats): ClusterStats = {
ClusterStats( ClusterStats(
this.receivedGossipCount + that.receivedGossipCount, this.receivedGossipCount + that.receivedGossipCount,
this.mergeConflictCount + that.mergeConflictCount, this.mergeConflictCount + that.mergeConflictCount,
@ -1091,7 +1089,7 @@ private[cluster] case class ClusterStats(
this.olderCount + that.olderCount) this.olderCount + that.olderCount)
} }
def -(that: ClusterStats): ClusterStats = { def :-(that: ClusterStats): ClusterStats = {
ClusterStats( ClusterStats(
this.receivedGossipCount - that.receivedGossipCount, this.receivedGossipCount - that.receivedGossipCount,
this.mergeConflictCount - that.mergeConflictCount, this.mergeConflictCount - that.mergeConflictCount,

View file

@ -117,7 +117,7 @@ private[cluster] case class Gossip(
* Has this Gossip been seen by this address. * Has this Gossip been seen by this address.
*/ */
def hasSeen(address: Address): Boolean = { def hasSeen(address: Address): Boolean = {
overview.seen.get(address).fold { false } { _ == version } overview.seen.get(address).exists(_ == version)
} }
/** /**

View file

@ -213,7 +213,7 @@ object StressMultiJvmSpec extends MultiNodeConfig {
var nodeMetrics = Set.empty[NodeMetrics] var nodeMetrics = Set.empty[NodeMetrics]
var phiValuesObservedByNode = { var phiValuesObservedByNode = {
import akka.cluster.Member.addressOrdering import akka.cluster.Member.addressOrdering
immutable.SortedMap.empty[Address, Set[PhiValue]] immutable.SortedMap.empty[Address, immutable.SortedSet[PhiValue]]
} }
var clusterStatsObservedByNode = { var clusterStatsObservedByNode = {
import akka.cluster.Member.addressOrdering import akka.cluster.Member.addressOrdering
@ -235,7 +235,7 @@ object StressMultiJvmSpec extends MultiNodeConfig {
def receive = { def receive = {
case ClusterMetricsChanged(clusterMetrics) nodeMetrics = clusterMetrics case ClusterMetricsChanged(clusterMetrics) nodeMetrics = clusterMetrics
case PhiResult(from, phiValues) phiValuesObservedByNode += from -> phiValues case PhiResult(from, phiValues) phiValuesObservedByNode += from -> phiValues
case StatsResult(from, stats) => clusterStatsObservedByNode += from -> stats case StatsResult(from, stats) clusterStatsObservedByNode += from -> stats
case ReportTick case ReportTick
log.info(s"[${title}] in progress\n${formatMetrics}\n\n${formatPhi}\n\n${formatStats}") log.info(s"[${title}] in progress\n${formatMetrics}\n\n${formatPhi}\n\n${formatStats}")
case r: ClusterResult case r: ClusterResult
@ -252,7 +252,7 @@ object StressMultiJvmSpec extends MultiNodeConfig {
def maxDuration = results.map(_.duration).max def maxDuration = results.map(_.duration).max
def totalClusterStats = results.map(_.clusterStats).foldLeft(ClusterStats()){_ + _} def totalClusterStats = results.foldLeft(ClusterStats()){_ :+ _.clusterStats}
def formatMetrics: String = { def formatMetrics: String = {
import akka.cluster.Member.addressOrdering import akka.cluster.Member.addressOrdering
@ -286,11 +286,11 @@ object StressMultiJvmSpec extends MultiNodeConfig {
import akka.cluster.Member.addressOrdering import akka.cluster.Member.addressOrdering
val lines = val lines =
for { for {
(monitor, phiValues) phiValuesObservedByNode.toSeq (monitor, phiValues) phiValuesObservedByNode
phi phiValues.toSeq.sortBy(_.address) phi phiValues
} yield formatPhiLine(monitor, phi.address, phi) } yield formatPhiLine(monitor, phi.address, phi)
(formatPhiHeader +: lines).mkString("\n") lines.mkString(formatPhiHeader + "\n", "\n", "")
} }
} }
@ -299,16 +299,9 @@ object StressMultiJvmSpec extends MultiNodeConfig {
def formatPhiLine(monitor: Address, subject: Address, phi: PhiValue): String = def formatPhiLine(monitor: Address, subject: Address, phi: PhiValue): String =
s"${monitor}\t${subject}\t${phi.count}\t${phi.countAboveOne}\t${phi.max.form}" s"${monitor}\t${subject}\t${phi.count}\t${phi.countAboveOne}\t${phi.max.form}"
def formatStats: String = { def formatStats: String =
if (clusterStatsObservedByNode.isEmpty) "" (clusterStatsObservedByNode map { case (monitor, stats) => s"${monitor}\t${stats}" }).
else { mkString("ClusterStats\n", "\n", "")
val lines =
for {
(monitor, stats) clusterStatsObservedByNode.toSeq
} yield s"${monitor}\t${stats}"
("ClusterStats" +: lines).mkString("\n")
}
}
} }
/** /**
@ -377,7 +370,8 @@ object StressMultiJvmSpec extends MultiNodeConfig {
math.max(previous.max, φ)) math.max(previous.max, φ))
} }
} }
reportTo foreach { _ ! PhiResult(cluster.selfAddress, phiByNode.values.toSet) } val phiSet = immutable.SortedSet.empty[PhiValue] ++ phiByNode.values
reportTo foreach { _ ! PhiResult(cluster.selfAddress, phiSet) }
case state: CurrentClusterState nodes = state.members.map(_.address) case state: CurrentClusterState nodes = state.members.map(_.address)
case memberEvent: MemberEvent nodes += memberEvent.member.address case memberEvent: MemberEvent nodes += memberEvent.member.address
case ReportTo(ref) reportTo = ref case ReportTo(ref) reportTo = ref
@ -406,7 +400,7 @@ object StressMultiJvmSpec extends MultiNodeConfig {
def receive = { def receive = {
case StatsTick case StatsTick
val res = StatsResult(cluster.selfAddress, cluster.readView.latestStats - startStats) val res = StatsResult(cluster.selfAddress, cluster.readView.latestStats :- startStats)
reportTo foreach { _ ! res } reportTo foreach { _ ! res }
case ReportTo(ref) case ReportTo(ref)
reportTo = ref reportTo = ref
@ -599,8 +593,11 @@ object StressMultiJvmSpec extends MultiNodeConfig {
case object RetryTick case object RetryTick
case object ReportTick case object ReportTick
case object PhiTick case object PhiTick
case class PhiResult(from: Address, phiValues: Set[PhiValue]) case class PhiResult(from: Address, phiValues: immutable.SortedSet[PhiValue])
case class PhiValue(address: Address, countAboveOne: Int, count: Int, max: Double) case class PhiValue(address: Address, countAboveOne: Int, count: Int, max: Double) extends Ordered[PhiValue] {
import akka.cluster.Member.addressOrdering
def compare(that: PhiValue) = addressOrdering.compare(this.address, that.address)
}
case class ReportTo(ref: Option[ActorRef]) case class ReportTo(ref: Option[ActorRef])
case object StatsTick case object StatsTick
case class StatsResult(from: Address, stats: ClusterStats) case class StatsResult(from: Address, stats: ClusterStats)
@ -831,7 +828,7 @@ abstract class StressSpec
val returnValue = thunk val returnValue = thunk
clusterResultAggregator ! clusterResultAggregator !
ClusterResult(cluster.selfAddress, (System.nanoTime - startTime).nanos, cluster.readView.latestStats - startStats) ClusterResult(cluster.selfAddress, (System.nanoTime - startTime).nanos, cluster.readView.latestStats :- startStats)
returnValue returnValue
} }