Don't gossip to unreachable, see #2263
* Also, ignore gossip from unreachable, see #2264
* Update gossip protocol in cluster doc
This commit is contained in:
parent
32604bb3d8
commit
cba64403a7
3 changed files with 44 additions and 93 deletions
|
|
@ -197,6 +197,9 @@ case class GossipOverview(
|
||||||
seen: Map[Address, VectorClock] = Map.empty,
|
seen: Map[Address, VectorClock] = Map.empty,
|
||||||
unreachable: Set[Member] = Set.empty) {
|
unreachable: Set[Member] = Set.empty) {
|
||||||
|
|
||||||
|
def isNonDownUnreachable(address: Address): Boolean =
|
||||||
|
unreachable.exists { m ⇒ m.address == address && m.status != Down }
|
||||||
|
|
||||||
override def toString =
|
override def toString =
|
||||||
"GossipOverview(seen = [" + seen.mkString(", ") +
|
"GossipOverview(seen = [" + seen.mkString(", ") +
|
||||||
"], unreachable = [" + unreachable.mkString(", ") +
|
"], unreachable = [" + unreachable.mkString(", ") +
|
||||||
|
|
@ -751,7 +754,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
||||||
val localUnreachable = localGossip.overview.unreachable
|
val localUnreachable = localGossip.overview.unreachable
|
||||||
|
|
||||||
val alreadyMember = localMembers.exists(_.address == node)
|
val alreadyMember = localMembers.exists(_.address == node)
|
||||||
val isUnreachable = localUnreachable.exists { m ⇒ m.address == node && m.status != Down }
|
val isUnreachable = localGossip.overview.isNonDownUnreachable(node)
|
||||||
|
|
||||||
if (!alreadyMember && !isUnreachable) {
|
if (!alreadyMember && !isUnreachable) {
|
||||||
|
|
||||||
|
|
@ -898,46 +901,49 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
||||||
val localState = state.get
|
val localState = state.get
|
||||||
val localGossip = localState.latestGossip
|
val localGossip = localState.latestGossip
|
||||||
|
|
||||||
val winningGossip =
|
if (!localGossip.overview.isNonDownUnreachable(from)) {
|
||||||
if (isSingletonCluster(localState) && localGossip.overview.unreachable.isEmpty && remoteGossip.members.contains(self)) {
|
|
||||||
// a fresh singleton cluster that is joining, no need to merge, use received gossip
|
|
||||||
remoteGossip
|
|
||||||
|
|
||||||
} else if (remoteGossip.version <> localGossip.version) {
|
val winningGossip =
|
||||||
// concurrent
|
if (isSingletonCluster(localState) && localGossip.overview.unreachable.isEmpty && remoteGossip.members.contains(self)) {
|
||||||
val mergedGossip = remoteGossip merge localGossip
|
// a fresh singleton cluster that is joining, no need to merge, use received gossip
|
||||||
val versionedMergedGossip = mergedGossip :+ vclockNode
|
remoteGossip
|
||||||
|
|
||||||
log.debug(
|
} else if (remoteGossip.version <> localGossip.version) {
|
||||||
"""Can't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merging them into [{}]""",
|
// concurrent
|
||||||
remoteGossip, localGossip, versionedMergedGossip)
|
val mergedGossip = remoteGossip merge localGossip
|
||||||
|
val versionedMergedGossip = mergedGossip :+ vclockNode
|
||||||
|
|
||||||
versionedMergedGossip
|
log.debug(
|
||||||
|
"""Can't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merging them into [{}]""",
|
||||||
|
remoteGossip, localGossip, versionedMergedGossip)
|
||||||
|
|
||||||
} else if (remoteGossip.version < localGossip.version) {
|
versionedMergedGossip
|
||||||
// local gossip is newer
|
|
||||||
localGossip
|
|
||||||
|
|
||||||
} else {
|
} else if (remoteGossip.version < localGossip.version) {
|
||||||
// remote gossip is newer
|
// local gossip is newer
|
||||||
remoteGossip
|
localGossip
|
||||||
|
|
||||||
|
} else {
|
||||||
|
// remote gossip is newer
|
||||||
|
remoteGossip
|
||||||
|
}
|
||||||
|
|
||||||
|
val newJoinInProgress =
|
||||||
|
if (localState.joinInProgress.isEmpty) localState.joinInProgress
|
||||||
|
else localState.joinInProgress --
|
||||||
|
winningGossip.members.map(_.address) --
|
||||||
|
winningGossip.overview.unreachable.map(_.address)
|
||||||
|
|
||||||
|
val newState = localState copy (
|
||||||
|
latestGossip = winningGossip seen selfAddress,
|
||||||
|
joinInProgress = newJoinInProgress)
|
||||||
|
|
||||||
|
// if we won the race then update else try again
|
||||||
|
if (!state.compareAndSet(localState, newState)) receiveGossip(from, remoteGossip) // recur if we fail the update
|
||||||
|
else {
|
||||||
|
log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)
|
||||||
|
notifyMembershipChangeListeners(localState, newState)
|
||||||
}
|
}
|
||||||
|
|
||||||
val newJoinInProgress =
|
|
||||||
if (localState.joinInProgress.isEmpty) localState.joinInProgress
|
|
||||||
else localState.joinInProgress --
|
|
||||||
winningGossip.members.map(_.address) --
|
|
||||||
winningGossip.overview.unreachable.map(_.address)
|
|
||||||
|
|
||||||
val newState = localState copy (
|
|
||||||
latestGossip = winningGossip seen selfAddress,
|
|
||||||
joinInProgress = newJoinInProgress)
|
|
||||||
|
|
||||||
// if we won the race then update else try again
|
|
||||||
if (!state.compareAndSet(localState, newState)) receiveGossip(from, remoteGossip) // recur if we fail the update
|
|
||||||
else {
|
|
||||||
log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)
|
|
||||||
notifyMembershipChangeListeners(localState, newState)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -975,15 +981,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
||||||
peer
|
peer
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* INTERNAL API.
|
|
||||||
*/
|
|
||||||
private[cluster] def gossipToUnreachableProbablity(membersSize: Int, unreachableSize: Int): Double =
|
|
||||||
(membersSize + unreachableSize) match {
|
|
||||||
case 0 ⇒ 0.0
|
|
||||||
case sum ⇒ unreachableSize.toDouble / sum
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API.
|
* INTERNAL API.
|
||||||
*/
|
*/
|
||||||
|
|
@ -1019,13 +1016,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
|
||||||
// 1. gossip to alive members
|
// 1. gossip to alive members
|
||||||
val gossipedToAlive = gossipToRandomNodeOf(localMemberAddresses)
|
val gossipedToAlive = gossipToRandomNodeOf(localMemberAddresses)
|
||||||
|
|
||||||
// 2. gossip to unreachable members
|
|
||||||
if (localUnreachableSize > 0) {
|
|
||||||
val probability = gossipToUnreachableProbablity(localMembersSize, localUnreachableSize)
|
|
||||||
if (ThreadLocalRandom.current.nextDouble() < probability)
|
|
||||||
gossipToRandomNodeOf(localUnreachableMembers.map(_.address))
|
|
||||||
}
|
|
||||||
|
|
||||||
// 3. gossip to a deputy nodes for facilitating partition healing
|
// 3. gossip to a deputy nodes for facilitating partition healing
|
||||||
val deputies = deputyNodes(localMemberAddresses)
|
val deputies = deputyNodes(localMemberAddresses)
|
||||||
val alreadyGossipedToDeputy = gossipedToAlive.map(deputies.contains(_)).getOrElse(false)
|
val alreadyGossipedToDeputy = gossipedToAlive.map(deputies.contains(_)).getOrElse(false)
|
||||||
|
|
|
||||||
|
|
@ -50,14 +50,6 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {
|
||||||
testActor ! GossipTo(address)
|
testActor ! GossipTo(address)
|
||||||
}
|
}
|
||||||
|
|
||||||
@volatile
|
|
||||||
var _gossipToUnreachableProbablity = 0.0
|
|
||||||
|
|
||||||
override def gossipToUnreachableProbablity(membersSize: Int, unreachableSize: Int): Double = {
|
|
||||||
if (_gossipToUnreachableProbablity < 0.0) super.gossipToUnreachableProbablity(membersSize, unreachableSize)
|
|
||||||
else _gossipToUnreachableProbablity
|
|
||||||
}
|
|
||||||
|
|
||||||
@volatile
|
@volatile
|
||||||
var _gossipToDeputyProbablity = 0.0
|
var _gossipToDeputyProbablity = 0.0
|
||||||
|
|
||||||
|
|
@ -81,7 +73,6 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {
|
||||||
cluster.latestGossip.members.collectFirst { case m if m.address == address ⇒ m.status }
|
cluster.latestGossip.members.collectFirst { case m if m.address == address ⇒ m.status }
|
||||||
|
|
||||||
before {
|
before {
|
||||||
cluster._gossipToUnreachableProbablity = 0.0
|
|
||||||
cluster._gossipToDeputyProbablity = 0.0
|
cluster._gossipToDeputyProbablity = 0.0
|
||||||
addresses foreach failureDetector.remove
|
addresses foreach failureDetector.remove
|
||||||
deterministicRandom.set(0)
|
deterministicRandom.set(0)
|
||||||
|
|
@ -133,17 +124,6 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {
|
||||||
expectNoMsg(1 second)
|
expectNoMsg(1 second)
|
||||||
}
|
}
|
||||||
|
|
||||||
"use certain probability for gossiping to unreachable node depending on the number of unreachable and live nodes" in {
|
|
||||||
cluster._gossipToUnreachableProbablity = -1.0 // use real impl
|
|
||||||
cluster.gossipToUnreachableProbablity(10, 1) must be < (cluster.gossipToUnreachableProbablity(9, 1))
|
|
||||||
cluster.gossipToUnreachableProbablity(10, 1) must be < (cluster.gossipToUnreachableProbablity(10, 2))
|
|
||||||
cluster.gossipToUnreachableProbablity(10, 5) must be < (cluster.gossipToUnreachableProbablity(10, 9))
|
|
||||||
cluster.gossipToUnreachableProbablity(0, 10) must be <= (1.0)
|
|
||||||
cluster.gossipToUnreachableProbablity(1, 10) must be <= (1.0)
|
|
||||||
cluster.gossipToUnreachableProbablity(10, 0) must be(0.0 plusOrMinus (0.0001))
|
|
||||||
cluster.gossipToUnreachableProbablity(0, 0) must be(0.0 plusOrMinus (0.0001))
|
|
||||||
}
|
|
||||||
|
|
||||||
"use certain probability for gossiping to deputy node depending on the number of unreachable and live nodes" in {
|
"use certain probability for gossiping to deputy node depending on the number of unreachable and live nodes" in {
|
||||||
cluster._gossipToDeputyProbablity = -1.0 // use real impl
|
cluster._gossipToDeputyProbablity = -1.0 // use real impl
|
||||||
cluster.gossipToDeputyProbablity(10, 1, 2) must be < (cluster.gossipToDeputyProbablity(9, 1, 2))
|
cluster.gossipToDeputyProbablity(10, 1, 2) must be < (cluster.gossipToDeputyProbablity(9, 1, 2))
|
||||||
|
|
@ -178,22 +158,6 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"gossip to random unreachable node" in {
|
|
||||||
val dead = Set(addresses(1))
|
|
||||||
dead foreach failureDetector.markNodeAsUnavailable
|
|
||||||
cluster._gossipToUnreachableProbablity = 1.0 // always
|
|
||||||
|
|
||||||
cluster.reapUnreachableMembers()
|
|
||||||
cluster.latestGossip.overview.unreachable.map(_.address) must be(dead)
|
|
||||||
|
|
||||||
cluster.gossip()
|
|
||||||
|
|
||||||
expectMsg(GossipTo(addresses(2))) // first available
|
|
||||||
expectMsg(GossipTo(addresses(1))) // the unavailable
|
|
||||||
|
|
||||||
expectNoMsg(1 second)
|
|
||||||
}
|
|
||||||
|
|
||||||
"gossip to random deputy node if number of live nodes is less than number of deputy nodes" in {
|
"gossip to random deputy node if number of live nodes is less than number of deputy nodes" in {
|
||||||
cluster._gossipToDeputyProbablity = -1.0 // real impl
|
cluster._gossipToDeputyProbablity = -1.0 // real impl
|
||||||
// 0 and 2 still alive
|
// 0 and 2 still alive
|
||||||
|
|
|
||||||
|
|
@ -213,7 +213,7 @@ nodes involved in a gossip exchange.
|
||||||
|
|
||||||
Periodically, the default is every 1 second, each node chooses another random
|
Periodically, the default is every 1 second, each node chooses another random
|
||||||
node to initiate a round of gossip with. The choice of node is random but can
|
node to initiate a round of gossip with. The choice of node is random but can
|
||||||
also include extra gossiping for unreachable nodes, ``deputy`` nodes, and nodes with
|
also include extra gossiping for ``deputy`` nodes and nodes with
|
||||||
either newer or older state versions.
|
either newer or older state versions.
|
||||||
|
|
||||||
The gossip overview contains the current state version for all nodes and also a
|
The gossip overview contains the current state version for all nodes and also a
|
||||||
|
|
@ -228,14 +228,11 @@ During each round of gossip exchange the following process is used:
|
||||||
|
|
||||||
1. Gossip to random live node (if any)
|
1. Gossip to random live node (if any)
|
||||||
|
|
||||||
2. Gossip to random unreachable node with certain probability depending on the
|
2. If the node gossiped to at (1) was not a ``deputy`` node, or the number of live
|
||||||
number of unreachable and live nodes
|
|
||||||
|
|
||||||
3. If the node gossiped to at (1) was not a ``deputy`` node, or the number of live
|
|
||||||
nodes is less than the number of ``deputy`` nodes, gossip to random ``deputy`` node with
|
nodes is less than the number of ``deputy`` nodes, gossip to random ``deputy`` node with
|
||||||
certain probability depending on number of unreachable, ``deputy``, and live nodes.
|
certain probability depending on number of unreachable, ``deputy``, and live nodes.
|
||||||
|
|
||||||
4. Gossip to random node with newer or older state information, based on the
|
3. Gossip to random node with newer or older state information, based on the
|
||||||
current gossip overview, with some probability (?)
|
current gossip overview, with some probability (?)
|
||||||
|
|
||||||
The gossiper only sends the gossip overview to the chosen node. The recipient of
|
The gossiper only sends the gossip overview to the chosen node. The recipient of
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue