Improve efficiency of gossip, see #2193 and #2253

* Essentially as already described in cluster specification,
  but now fully implemented and tested with LargeClusterSpec
* Gossip to nodes with different view (using seen table)
  with certain probability
* Gossip chat, gossip back to sender
* Immediate gossip to joining node
* Updated some tests to reflect current implementation
This commit is contained in:
Patrik Nordwall 2012-06-28 11:36:13 +02:00
parent aca66de732
commit 2da1a912fe
5 changed files with 67 additions and 191 deletions

View file

@ -832,7 +832,10 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
else { else {
log.info("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node) log.info("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node)
// treat join as initial heartbeat, so that it becomes unavailable if nothing more happens // treat join as initial heartbeat, so that it becomes unavailable if nothing more happens
if (node != selfAddress) failureDetector heartbeat node if (node != selfAddress) {
failureDetector heartbeat node
gossipTo(node)
}
notifyMembershipChangeListeners(localState, newState) notifyMembershipChangeListeners(localState, newState)
} }
} }
@ -974,10 +977,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
val mergedGossip = remoteGossip merge localGossip val mergedGossip = remoteGossip merge localGossip
val versionedMergedGossip = mergedGossip :+ vclockNode val versionedMergedGossip = mergedGossip :+ vclockNode
log.debug(
"""Can't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merging them into [{}]""",
remoteGossip, localGossip, versionedMergedGossip)
versionedMergedGossip versionedMergedGossip
} else if (remoteGossip.version < localGossip.version) { } else if (remoteGossip.version < localGossip.version) {
@ -1003,8 +1002,20 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
if (!state.compareAndSet(localState, newState)) receiveGossip(from, remoteGossip) // recur if we fail the update if (!state.compareAndSet(localState, newState)) receiveGossip(from, remoteGossip) // recur if we fail the update
else { else {
log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from) log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)
if ((winningGossip ne localGossip) && (winningGossip ne remoteGossip))
log.debug(
"""Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""",
remoteGossip, localGossip, winningGossip)
_receivedGossipCount.incrementAndGet() _receivedGossipCount.incrementAndGet()
notifyMembershipChangeListeners(localState, newState) notifyMembershipChangeListeners(localState, newState)
if ((winningGossip ne remoteGossip) || (newState.latestGossip ne remoteGossip)) {
// send back gossip to sender when sender had different view, i.e. merge, or sender had
// older or sender had newer
gossipTo(from)
}
} }
} }
} }
@ -1055,6 +1066,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
} }
} }
// Probability threshold used by the periodic gossip round: when members with a
// different view exist, one of them is chosen as the gossip target if a random
// draw in [0, 1) falls below this value; otherwise any random member is used.
private def gossipToDifferentViewProbability: Double = 0.8
/** /**
* INTERNAL API. * INTERNAL API.
* *
@ -1075,8 +1088,21 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
val localUnreachableMembers = localGossip.overview.unreachable.toIndexedSeq val localUnreachableMembers = localGossip.overview.unreachable.toIndexedSeq
val localUnreachableSize = localUnreachableMembers.size val localUnreachableSize = localUnreachableMembers.size
// 1. gossip to alive members // 1. gossip to a random alive member with preference to a member
val gossipedToAlive = gossipToRandomNodeOf(localMemberAddresses) // with older or newer gossip version
// Addresses of current members whose version recorded in the gossip overview's
// seen table differs from the local gossip version, i.e. members with a
// different view of the cluster state.
val nodesWithdifferentView = {
  // only addresses that are still members are eligible gossip targets
  val localMemberAddressesSet = localGossip.members map { _.address }
  for {
    (address, version) <- localGossip.overview.seen
    if localMemberAddressesSet contains address
    if version != localGossip.version
  } yield address
}
// Pick the gossip target pool: members with a different view are preferred with
// probability gossipToDifferentViewProbability; the random draw is only made
// when such members exist. Otherwise fall back to all local member addresses.
val gossipedToAlive = {
  val preferDifferentView =
    nodesWithdifferentView.nonEmpty &&
      ThreadLocalRandom.current.nextDouble() < gossipToDifferentViewProbability
  val candidates =
    if (preferDifferentView) nodesWithdifferentView.toIndexedSeq
    else localMemberAddresses
  gossipToRandomNodeOf(candidates)
}
// 2. gossip to a deputy nodes for facilitating partition healing // 2. gossip to a deputy nodes for facilitating partition healing
val deputies = deputyNodes(localMemberAddresses) val deputies = deputyNodes(localMemberAddresses)

View file

@ -23,8 +23,6 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig {
// not MultiNodeClusterSpec.clusterConfig // not MultiNodeClusterSpec.clusterConfig
commonConfig(ConfigFactory.parseString(""" commonConfig(ConfigFactory.parseString("""
akka.cluster { akka.cluster {
# FIXME remove this (use default) when ticket #2239 has been fixed
gossip-interval = 400 ms
auto-join = off auto-join = off
} }
akka.loglevel = INFO akka.loglevel = INFO

View file

@ -16,8 +16,6 @@ object TransitionMultiJvmSpec extends MultiNodeConfig {
val first = role("first") val first = role("first")
val second = role("second") val second = role("second")
val third = role("third") val third = role("third")
val fourth = role("fourth")
val fifth = role("fifth")
commonConfig(debugConfig(on = false). commonConfig(debugConfig(on = false).
withFallback(ConfigFactory.parseString("akka.cluster.periodic-tasks-initial-delay = 300 s # turn off all periodic tasks")). withFallback(ConfigFactory.parseString("akka.cluster.periodic-tasks-initial-delay = 300 s # turn off all periodic tasks")).
@ -27,8 +25,6 @@ object TransitionMultiJvmSpec extends MultiNodeConfig {
class TransitionMultiJvmNode1 extends TransitionSpec with FailureDetectorPuppetStrategy class TransitionMultiJvmNode1 extends TransitionSpec with FailureDetectorPuppetStrategy
class TransitionMultiJvmNode2 extends TransitionSpec with FailureDetectorPuppetStrategy class TransitionMultiJvmNode2 extends TransitionSpec with FailureDetectorPuppetStrategy
class TransitionMultiJvmNode3 extends TransitionSpec with FailureDetectorPuppetStrategy class TransitionMultiJvmNode3 extends TransitionSpec with FailureDetectorPuppetStrategy
class TransitionMultiJvmNode4 extends TransitionSpec with FailureDetectorPuppetStrategy
class TransitionMultiJvmNode5 extends TransitionSpec with FailureDetectorPuppetStrategy
abstract class TransitionSpec abstract class TransitionSpec
extends MultiNodeSpec(TransitionMultiJvmSpec) extends MultiNodeSpec(TransitionMultiJvmSpec)
@ -81,11 +77,15 @@ abstract class TransitionSpec
val g = cluster.latestGossip val g = cluster.latestGossip
enterBarrier("before-gossip-" + gossipBarrierCounter) enterBarrier("before-gossip-" + gossipBarrierCounter)
awaitCond(cluster.latestGossip != g) // received gossip awaitCond(cluster.latestGossip != g) // received gossip
// gossip chat will synchronize the views
awaitCond((Set(fromRole, toRole) -- seenLatestGossip).isEmpty)
enterBarrier("after-gossip-" + gossipBarrierCounter) enterBarrier("after-gossip-" + gossipBarrierCounter)
} }
runOn(fromRole) { runOn(fromRole) {
enterBarrier("before-gossip-" + gossipBarrierCounter) enterBarrier("before-gossip-" + gossipBarrierCounter)
cluster.gossipTo(toRole) // send gossip cluster.gossipTo(toRole) // send gossip
// gossip chat will synchronize the views
awaitCond((Set(fromRole, toRole) -- seenLatestGossip).isEmpty)
enterBarrier("after-gossip-" + gossipBarrierCounter) enterBarrier("after-gossip-" + gossipBarrierCounter)
} }
runOn(roles.filterNot(r r == fromRole || r == toRole): _*) { runOn(roles.filterNot(r r == fromRole || r == toRole): _*) {
@ -116,22 +116,12 @@ abstract class TransitionSpec
runOn(second) { runOn(second) {
cluster.join(first) cluster.join(first)
} }
runOn(first) { runOn(first, second) {
// gossip chat from the join will synchronize the views
awaitMembers(first, second) awaitMembers(first, second)
memberStatus(first) must be(Up) memberStatus(first) must be(Up)
memberStatus(second) must be(Joining) memberStatus(second) must be(Joining)
seenLatestGossip must be(Set(first)) awaitCond(seenLatestGossip == Set(first, second))
cluster.convergence.isDefined must be(false)
}
enterBarrier("second-joined")
first gossipTo second
second gossipTo first
runOn(first, second) {
memberStatus(first) must be(Up)
memberStatus(second) must be(Joining)
seenLatestGossip must be(Set(first, second))
cluster.convergence.isDefined must be(true) cluster.convergence.isDefined must be(true)
} }
enterBarrier("convergence-joining-2") enterBarrier("convergence-joining-2")
@ -144,18 +134,11 @@ abstract class TransitionSpec
enterBarrier("leader-actions-2") enterBarrier("leader-actions-2")
leader(first, second) gossipTo nonLeader(first, second).head leader(first, second) gossipTo nonLeader(first, second).head
runOn(nonLeader(first, second).head) {
memberStatus(first) must be(Up)
memberStatus(second) must be(Up)
seenLatestGossip must be(Set(first, second))
cluster.convergence.isDefined must be(true)
}
nonLeader(first, second).head gossipTo leader(first, second)
runOn(first, second) { runOn(first, second) {
memberStatus(first) must be(Up) // gossip chat will synchronize the views
memberStatus(second) must be(Up) awaitCond(memberStatus(second) == Up)
seenLatestGossip must be(Set(first, second)) seenLatestGossip must be(Set(first, second))
memberStatus(first) must be(Up)
cluster.convergence.isDefined must be(true) cluster.convergence.isDefined must be(true)
} }
@ -167,25 +150,26 @@ abstract class TransitionSpec
runOn(third) { runOn(third) {
cluster.join(second) cluster.join(second)
} }
runOn(second) { runOn(second, third) {
// gossip chat from the join will synchronize the views
awaitMembers(first, second, third) awaitMembers(first, second, third)
cluster.convergence.isDefined must be(false)
memberStatus(third) must be(Joining) memberStatus(third) must be(Joining)
seenLatestGossip must be(Set(second)) awaitCond(seenLatestGossip == Set(second, third))
cluster.convergence.isDefined must be(false)
} }
enterBarrier("third-joined-second") enterBarrier("third-joined-second")
second gossipTo first second gossipTo first
runOn(first) { runOn(first, second) {
members must be(Set(first, second, third)) // gossip chat will synchronize the views
awaitMembers(first, second, third)
memberStatus(third) must be(Joining) memberStatus(third) must be(Joining)
seenLatestGossip must be(Set(first, second)) awaitCond(memberStatus(second) == Up)
cluster.convergence.isDefined must be(false) seenLatestGossip must be(Set(first, second, third))
cluster.convergence.isDefined must be(true)
} }
first gossipTo third first gossipTo third
third gossipTo first
third gossipTo second
runOn(first, second, third) { runOn(first, second, third) {
members must be(Set(first, second, third)) members must be(Set(first, second, third))
memberStatus(first) must be(Up) memberStatus(first) must be(Up)
@ -224,14 +208,6 @@ abstract class TransitionSpec
cluster.convergence.isDefined must be(true) cluster.convergence.isDefined must be(true)
} }
// and back again
nonLeader(first, second, third).tail.head gossipTo nonLeader(first, second, third).head
runOn(nonLeader(first, second, third).head) {
memberStatus(third) must be(Up)
seenLatestGossip must be(Set(first, second, third))
cluster.convergence.isDefined must be(true)
}
// first non-leader gossipTo the leader // first non-leader gossipTo the leader
nonLeader(first, second, third).head gossipTo leader(first, second, third) nonLeader(first, second, third).head gossipTo leader(first, second, third)
runOn(first, second, third) { runOn(first, second, third) {
@ -245,160 +221,36 @@ abstract class TransitionSpec
enterBarrier("after-3") enterBarrier("after-3")
} }
"startup a second separated cluster consisting of nodes fourth and fifth" taggedAs LongRunningTest in {
runOn(fifth) {
startClusterNode()
cluster.leaderActions()
cluster.status must be(Up)
}
enterBarrier("fifth-started")
runOn(fourth) {
cluster.join(fifth)
}
runOn(fifth) {
awaitMembers(fourth, fifth)
}
enterBarrier("fourth-joined")
fifth gossipTo fourth
fourth gossipTo fifth
runOn(fourth, fifth) {
memberStatus(fourth) must be(Joining)
memberStatus(fifth) must be(Up)
seenLatestGossip must be(Set(fourth, fifth))
cluster.convergence.isDefined must be(true)
}
enterBarrier("after-4")
}
"perform correct transitions when second cluster (node fourth) joins first cluster (node third)" taggedAs LongRunningTest in {
runOn(fourth) {
cluster.join(third)
}
runOn(third) {
awaitMembers(first, second, third, fourth)
seenLatestGossip must be(Set(third))
}
enterBarrier("fourth-joined-third")
third gossipTo second
runOn(second) {
seenLatestGossip must be(Set(second, third))
}
second gossipTo fourth
runOn(fourth) {
members must be(roles.toSet)
// merge conflict
seenLatestGossip must be(Set(fourth))
}
fourth gossipTo first
fourth gossipTo second
fourth gossipTo third
fourth gossipTo fifth
runOn(first, second, third, fifth) {
members must be(roles.toSet)
seenLatestGossip must be(Set(fourth, myself))
}
first gossipTo fifth
runOn(fifth) {
seenLatestGossip must be(Set(first, fourth, fifth))
}
fifth gossipTo third
runOn(third) {
seenLatestGossip must be(Set(first, third, fourth, fifth))
}
third gossipTo second
runOn(second) {
seenLatestGossip must be(roles.toSet)
cluster.convergence.isDefined must be(true)
}
second gossipTo first
second gossipTo third
second gossipTo fourth
third gossipTo fifth
seenLatestGossip must be(roles.toSet)
memberStatus(first) must be(Up)
memberStatus(second) must be(Up)
memberStatus(third) must be(Up)
memberStatus(fourth) must be(Joining)
memberStatus(fifth) must be(Up)
cluster.convergence.isDefined must be(true)
enterBarrier("convergence-joining-3")
runOn(leader(roles: _*)) {
cluster.leaderActions()
memberStatus(fourth) must be(Up)
seenLatestGossip must be(Set(myself))
cluster.convergence.isDefined must be(false)
}
// spread the word
for (x :: y :: Nil (roles.sorted ++ roles.sorted.dropRight(1)).toList.sliding(2)) {
x gossipTo y
}
enterBarrier("spread-5")
seenLatestGossip must be(roles.toSet)
memberStatus(first) must be(Up)
memberStatus(second) must be(Up)
memberStatus(third) must be(Up)
memberStatus(fourth) must be(Up)
memberStatus(fifth) must be(Up)
cluster.convergence.isDefined must be(true)
enterBarrier("after-5")
}
"perform correct transitions when second becomes unavailble" taggedAs LongRunningTest in { "perform correct transitions when second becomes unavailble" taggedAs LongRunningTest in {
runOn(fifth) { runOn(third) {
markNodeAsUnavailable(second) markNodeAsUnavailable(second)
cluster.reapUnreachableMembers() cluster.reapUnreachableMembers()
cluster.latestGossip.overview.unreachable must contain(Member(second, Up)) cluster.latestGossip.overview.unreachable must contain(Member(second, Up))
seenLatestGossip must be(Set(fifth)) seenLatestGossip must be(Set(third))
} }
enterBarrier("after-second-unavailble") enterBarrier("after-second-unavailble")
// spread the word third gossipTo first
val gossipRound = List(fifth, fourth, third, first, third, fourth, fifth)
for (x :: y :: Nil gossipRound.sliding(2)) {
x gossipTo y
}
runOn((roles.filterNot(_ == second)): _*) { runOn(first, third) {
cluster.latestGossip.overview.unreachable must contain(Member(second, Up)) cluster.latestGossip.overview.unreachable must contain(Member(second, Up))
cluster.convergence.isDefined must be(false) cluster.convergence.isDefined must be(false)
} }
runOn(third) { runOn(first) {
cluster.down(second) cluster.down(second)
awaitMemberStatus(second, Down) awaitMemberStatus(second, Down)
} }
enterBarrier("after-second-down") enterBarrier("after-second-down")
// spread the word first gossipTo third
val gossipRound2 = List(third, fourth, fifth, first, third, fourth, fifth)
for (x :: y :: Nil gossipRound2.sliding(2)) {
x gossipTo y
}
runOn((roles.filterNot(_ == second)): _*) { runOn(first, third) {
cluster.latestGossip.overview.unreachable must contain(Member(second, Down)) cluster.latestGossip.overview.unreachable must contain(Member(second, Down))
memberStatus(second) must be(Down) memberStatus(second) must be(Down)
seenLatestGossip must be(Set(first, third, fourth, fifth)) seenLatestGossip must be(Set(first, third))
cluster.convergence.isDefined must be(true) cluster.convergence.isDefined must be(true)
} }

View file

@ -105,12 +105,14 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {
cluster.latestGossip.members.map(_.address) must be(Set(selfAddress, addresses(1))) cluster.latestGossip.members.map(_.address) must be(Set(selfAddress, addresses(1)))
memberStatus(addresses(1)) must be(Some(MemberStatus.Joining)) memberStatus(addresses(1)) must be(Some(MemberStatus.Joining))
cluster.convergence.isDefined must be(false) cluster.convergence.isDefined must be(false)
expectMsg(GossipTo(addresses(1)))
} }
"accept a few more joining nodes" in { "accept a few more joining nodes" in {
for (a addresses.drop(2)) { for (a addresses.drop(2)) {
cluster.joining(a) cluster.joining(a)
memberStatus(a) must be(Some(MemberStatus.Joining)) memberStatus(a) must be(Some(MemberStatus.Joining))
expectMsg(GossipTo(a))
} }
cluster.latestGossip.members.map(_.address) must be(addresses.toSet) cluster.latestGossip.members.map(_.address) must be(addresses.toSet)
} }
@ -121,7 +123,6 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {
} }
"gossip to random live node" in { "gossip to random live node" in {
cluster.latestGossip.members
cluster.gossip() cluster.gossip()
cluster.gossip() cluster.gossip()
cluster.gossip() cluster.gossip()

View file

@ -138,7 +138,7 @@ implementation of `The Phi Accrual Failure Detector`_ by Hayashibara et al.
An accrual failure detector decouples monitoring and interpretation. That makes An accrual failure detector decouples monitoring and interpretation. That makes
them applicable to a wider area of scenarios and more adequate to build generic them applicable to a wider area of scenarios and more adequate to build generic
failure detection services. The idea is that it is keeping a history of failure failure detection services. The idea is that it is keeping a history of failure
statistics, calculated from heartbeats received from the gossip protocol, and is statistics, calculated from heartbeats received from other nodes, and is
trying to make educated guesses by taking multiple factors, and how they trying to make educated guesses by taking multiple factors, and how they
accumulate over time, into account in order to come up with a better guess if a accumulate over time, into account in order to come up with a better guess if a
specific node is up or down. Rather than just answering "yes" or "no" to the specific node is up or down. Rather than just answering "yes" or "no" to the
@ -232,15 +232,14 @@ breaking logical partitions as seen in the gossip algorithm defined below.
During each round of gossip exchange the following process is used: During each round of gossip exchange the following process is used:
1. Gossip to random live node (if any) 1. Gossip to random node with newer or older state information, if any, based on the
current gossip overview, with some probability. Otherwise, gossip to any random
live node.
2. If the node gossiped to at (1) was not a ``deputy`` node, or the number of live 2. If the node gossiped to at (1) was not a ``deputy`` node, or the number of live
nodes is less than number of ``deputy`` nodes, gossip to random ``deputy`` node with nodes is less than number of ``deputy`` nodes, gossip to random ``deputy`` node with
certain probability depending on number of unreachable, ``deputy``, and live nodes. certain probability depending on number of unreachable, ``deputy``, and live nodes.
3. Gossip to random node with newer or older state information, based on the
current gossip overview, with some probability (?)
The gossiper only sends the gossip overview to the chosen node. The recipient of The gossiper only sends the gossip overview to the chosen node. The recipient of
the gossip can use the gossip overview to determine whether: the gossip can use the gossip overview to determine whether: