Fixes according review. See #3076
This commit is contained in:
parent
386bf87f0e
commit
7ed6b3d4ee
6 changed files with 19 additions and 26 deletions
|
|
@ -65,10 +65,6 @@ akka {
|
|||
# Probability value is between 0.0 and 1.0. 0.0 means never, 1.0 means always.
|
||||
gossip-different-view-probability = 0.8
|
||||
|
||||
# Limit number of merge conflicts per second that are handled. If the limit is
|
||||
# exceeded the conflicting gossip messages are dropped and will reappear later.
|
||||
max-gossip-merge-rate = 5.0
|
||||
|
||||
# Settings for the Phi accrual failure detector (http://ddg.jaist.ac.jp/pub/HDY+04.pdf
|
||||
# [Hayashibara et al]) used by the cluster subsystem to detect unreachable members.
|
||||
failure-detector {
|
||||
|
|
|
|||
|
|
@ -540,8 +540,8 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
|
|||
latestGossip = winningGossip seen selfAddress
|
||||
|
||||
// for all new joining nodes we remove them from the failure detector
|
||||
(latestGossip.members -- localGossip.members).foreach {
|
||||
node ⇒ if (node.status == Joining) failureDetector.remove(node.address)
|
||||
latestGossip.members foreach {
|
||||
node ⇒ if (node.status == Joining && !localGossip.members(node)) failureDetector.remove(node.address)
|
||||
}
|
||||
|
||||
log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)
|
||||
|
|
|
|||
|
|
@ -59,7 +59,6 @@ class ClusterSettings(val config: Config, val systemName: String) {
|
|||
case id ⇒ id
|
||||
}
|
||||
final val GossipDifferentViewProbability: Double = cc.getDouble("gossip-different-view-probability")
|
||||
final val MaxGossipMergeRate: Double = cc.getDouble("max-gossip-merge-rate")
|
||||
final val SchedulerTickDuration: FiniteDuration = Duration(cc.getMilliseconds("scheduler.tick-duration"), MILLISECONDS)
|
||||
final val SchedulerTicksPerWheel: Int = cc.getInt("scheduler.ticks-per-wheel")
|
||||
final val MetricsEnabled: Boolean = cc.getBoolean("metrics.enabled")
|
||||
|
|
|
|||
|
|
@ -120,17 +120,21 @@ private[cluster] case class Gossip(
|
|||
overview.seen.get(address).exists(_ == version)
|
||||
}
|
||||
|
||||
private def mergeSeenTables(allowed: immutable.Set[Member], one: Map[Address, VectorClock], another: Map[Address, VectorClock]): Map[Address, VectorClock] = {
|
||||
(one.filter { case (a, v) ⇒ allowed.exists(_.address == a) } /: another) {
|
||||
case (merged, (address, oneVersion)) ⇒
|
||||
if (allowed.exists(_.address == address)) {
|
||||
val anotherVersion = merged.getOrElse(address, oneVersion)
|
||||
anotherVersion tryCompareTo oneVersion match {
|
||||
case None ⇒ merged - address
|
||||
case Some(x) if x > 0 ⇒ merged + (address -> anotherVersion)
|
||||
case _ ⇒ merged + (address -> oneVersion)
|
||||
}
|
||||
} else merged
|
||||
private def mergeSeenTables(allowed: Set[Member], one: Map[Address, VectorClock], another: Map[Address, VectorClock]): Map[Address, VectorClock] = {
|
||||
(Map.empty[Address, VectorClock] /: allowed) {
|
||||
(merged, member) ⇒
|
||||
val address = member.address
|
||||
(one.get(address), another.get(address)) match {
|
||||
case (None, None) ⇒ merged
|
||||
case (Some(v1), None) ⇒ merged.updated(address, v1)
|
||||
case (None, Some(v2)) ⇒ merged.updated(address, v2)
|
||||
case (Some(v1), Some(v2)) ⇒
|
||||
v1 tryCompareTo v2 match {
|
||||
case None ⇒ merged
|
||||
case Some(x) if x > 0 ⇒ merged.updated(address, v1)
|
||||
case _ ⇒ merged.updated(address, v2)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -169,19 +173,14 @@ private[cluster] case class Gossip(
|
|||
* @return true if convergence have been reached and false if not
|
||||
*/
|
||||
def convergence: Boolean = {
|
||||
val unreachable = overview.unreachable
|
||||
val seen = overview.seen
|
||||
|
||||
// First check that:
|
||||
// 1. we don't have any members that are unreachable, or
|
||||
// 2. all unreachable members in the set have status DOWN
|
||||
// Else we can't continue to check for convergence
|
||||
// When that is done we check that all members exists in the seen table and
|
||||
// have the latest vector clock version
|
||||
val hasUnreachable = unreachable.nonEmpty && unreachable.exists { _.status != Down }
|
||||
def allMembersInSeenHasLatest = members.forall(m ⇒ seen.get(m.address).exists(_ == version))
|
||||
|
||||
!hasUnreachable && allMembersInSeenHasLatest
|
||||
overview.unreachable.forall(_.status == Down) && members.forall(m ⇒ seenByAddress(m.address))
|
||||
}
|
||||
|
||||
def isLeader(address: Address): Boolean = leader == Some(address)
|
||||
|
|
|
|||
|
|
@ -84,7 +84,7 @@ object StressMultiJvmSpec extends MultiNodeConfig {
|
|||
normal-throughput-duration = 30s
|
||||
high-throughput-duration = 10s
|
||||
supervision-duration = 10s
|
||||
supervision-one-iteration = 1s
|
||||
supervision-one-iteration = 2.5s
|
||||
expected-test-duration = 600s
|
||||
# actors are created in a tree structure defined
|
||||
# by tree-width (number of children for each actor) and
|
||||
|
|
|
|||
|
|
@ -42,7 +42,6 @@ class ClusterConfigSpec extends AkkaSpec {
|
|||
JmxEnabled must be(true)
|
||||
UseDispatcher must be(Dispatchers.DefaultDispatcherId)
|
||||
GossipDifferentViewProbability must be(0.8 plusOrMinus 0.0001)
|
||||
MaxGossipMergeRate must be(5.0 plusOrMinus 0.0001)
|
||||
SchedulerTickDuration must be(33 millis)
|
||||
SchedulerTicksPerWheel must be(512)
|
||||
MetricsEnabled must be(true)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue