diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index eb1c2c08fb..0d87a4b89c 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -832,7 +832,10 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) else { log.info("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node) // treat join as initial heartbeat, so that it becomes unavailable if nothing more happens - if (node != selfAddress) failureDetector heartbeat node + if (node != selfAddress) { + failureDetector heartbeat node + gossipTo(node) + } notifyMembershipChangeListeners(localState, newState) } } @@ -974,10 +977,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val mergedGossip = remoteGossip merge localGossip val versionedMergedGossip = mergedGossip :+ vclockNode - log.debug( - """Can't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merging them into [{}]""", - remoteGossip, localGossip, versionedMergedGossip) - versionedMergedGossip } else if (remoteGossip.version < localGossip.version) { @@ -1003,8 +1002,20 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) if (!state.compareAndSet(localState, newState)) receiveGossip(from, remoteGossip) // recur if we fail the update else { log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from) + + if ((winningGossip ne localGossip) && (winningGossip ne remoteGossip)) + log.debug( + """Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""", + remoteGossip, localGossip, winningGossip) + _receivedGossipCount.incrementAndGet() notifyMembershipChangeListeners(localState, newState) + + if ((winningGossip ne remoteGossip) || (newState.latestGossip ne 
remoteGossip)) { + // send back gossip to sender when sender had different view, i.e. merge, or sender had + // older or sender had newer + gossipTo(from) + } } } } @@ -1055,6 +1066,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) } } + private def gossipToDifferentViewProbability: Double = 0.8 + /** * INTERNAL API. * @@ -1075,8 +1088,21 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val localUnreachableMembers = localGossip.overview.unreachable.toIndexedSeq val localUnreachableSize = localUnreachableMembers.size - // 1. gossip to alive members - val gossipedToAlive = gossipToRandomNodeOf(localMemberAddresses) + // 1. gossip to a random alive member with preference to a member + // with older or newer gossip version + val nodesWithdifferentView = { + val localMemberAddressesSet = localGossip.members map { _.address } + for { + (address, version) ← localGossip.overview.seen + if localMemberAddressesSet contains address + if version != localGossip.version + } yield address + } + val gossipedToAlive = + if (nodesWithdifferentView.nonEmpty && ThreadLocalRandom.current.nextDouble() < gossipToDifferentViewProbability) + gossipToRandomNodeOf(nodesWithdifferentView.toIndexedSeq) + else + gossipToRandomNodeOf(localMemberAddresses) // 2. 
gossip to a deputy nodes for facilitating partition healing val deputies = deputyNodes(localMemberAddresses) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index 3be082d2f3..ddacf668e0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -23,8 +23,6 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig { // not MultiNodeClusterSpec.clusterConfig commonConfig(ConfigFactory.parseString(""" akka.cluster { - # FIXME remove this (use default) when ticket #2239 has been fixed - gossip-interval = 400 ms auto-join = off } akka.loglevel = INFO diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala index 397d824ef4..c4e43b9abf 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala @@ -16,8 +16,6 @@ object TransitionMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") - val fourth = role("fourth") - val fifth = role("fifth") commonConfig(debugConfig(on = false). withFallback(ConfigFactory.parseString("akka.cluster.periodic-tasks-initial-delay = 300 s # turn off all periodic tasks")). 
@@ -27,8 +25,6 @@ object TransitionMultiJvmSpec extends MultiNodeConfig { class TransitionMultiJvmNode1 extends TransitionSpec with FailureDetectorPuppetStrategy class TransitionMultiJvmNode2 extends TransitionSpec with FailureDetectorPuppetStrategy class TransitionMultiJvmNode3 extends TransitionSpec with FailureDetectorPuppetStrategy -class TransitionMultiJvmNode4 extends TransitionSpec with FailureDetectorPuppetStrategy -class TransitionMultiJvmNode5 extends TransitionSpec with FailureDetectorPuppetStrategy abstract class TransitionSpec extends MultiNodeSpec(TransitionMultiJvmSpec) @@ -81,11 +77,15 @@ abstract class TransitionSpec val g = cluster.latestGossip enterBarrier("before-gossip-" + gossipBarrierCounter) awaitCond(cluster.latestGossip != g) // received gossip + // gossip chat will synchronize the views + awaitCond((Set(fromRole, toRole) -- seenLatestGossip).isEmpty) enterBarrier("after-gossip-" + gossipBarrierCounter) } runOn(fromRole) { enterBarrier("before-gossip-" + gossipBarrierCounter) cluster.gossipTo(toRole) // send gossip + // gossip chat will synchronize the views + awaitCond((Set(fromRole, toRole) -- seenLatestGossip).isEmpty) enterBarrier("after-gossip-" + gossipBarrierCounter) } runOn(roles.filterNot(r ⇒ r == fromRole || r == toRole): _*) { @@ -116,22 +116,12 @@ abstract class TransitionSpec runOn(second) { cluster.join(first) } - runOn(first) { + runOn(first, second) { + // gossip chat from the join will synchronize the views awaitMembers(first, second) memberStatus(first) must be(Up) memberStatus(second) must be(Joining) - seenLatestGossip must be(Set(first)) - cluster.convergence.isDefined must be(false) - } - enterBarrier("second-joined") - - first gossipTo second - second gossipTo first - - runOn(first, second) { - memberStatus(first) must be(Up) - memberStatus(second) must be(Joining) - seenLatestGossip must be(Set(first, second)) + awaitCond(seenLatestGossip == Set(first, second)) cluster.convergence.isDefined must be(true) } 
enterBarrier("convergence-joining-2") @@ -144,18 +134,11 @@ abstract class TransitionSpec enterBarrier("leader-actions-2") leader(first, second) gossipTo nonLeader(first, second).head - runOn(nonLeader(first, second).head) { - memberStatus(first) must be(Up) - memberStatus(second) must be(Up) - seenLatestGossip must be(Set(first, second)) - cluster.convergence.isDefined must be(true) - } - - nonLeader(first, second).head gossipTo leader(first, second) runOn(first, second) { - memberStatus(first) must be(Up) - memberStatus(second) must be(Up) + // gossip chat will synchronize the views + awaitCond(memberStatus(second) == Up) seenLatestGossip must be(Set(first, second)) + memberStatus(first) must be(Up) cluster.convergence.isDefined must be(true) } @@ -167,25 +150,26 @@ abstract class TransitionSpec runOn(third) { cluster.join(second) } - runOn(second) { + runOn(second, third) { + // gossip chat from the join will synchronize the views awaitMembers(first, second, third) - cluster.convergence.isDefined must be(false) memberStatus(third) must be(Joining) - seenLatestGossip must be(Set(second)) + awaitCond(seenLatestGossip == Set(second, third)) + cluster.convergence.isDefined must be(false) } enterBarrier("third-joined-second") second gossipTo first - runOn(first) { - members must be(Set(first, second, third)) + runOn(first, second) { + // gossip chat will synchronize the views + awaitMembers(first, second, third) memberStatus(third) must be(Joining) - seenLatestGossip must be(Set(first, second)) - cluster.convergence.isDefined must be(false) + awaitCond(memberStatus(second) == Up) + seenLatestGossip must be(Set(first, second, third)) + cluster.convergence.isDefined must be(true) } first gossipTo third - third gossipTo first - third gossipTo second runOn(first, second, third) { members must be(Set(first, second, third)) memberStatus(first) must be(Up) @@ -224,14 +208,6 @@ abstract class TransitionSpec cluster.convergence.isDefined must be(true) } - // and back again - 
nonLeader(first, second, third).tail.head gossipTo nonLeader(first, second, third).head - runOn(nonLeader(first, second, third).head) { - memberStatus(third) must be(Up) - seenLatestGossip must be(Set(first, second, third)) - cluster.convergence.isDefined must be(true) - } - // first non-leader gossipTo the leader nonLeader(first, second, third).head gossipTo leader(first, second, third) runOn(first, second, third) { @@ -245,160 +221,36 @@ abstract class TransitionSpec enterBarrier("after-3") } - "startup a second separated cluster consisting of nodes fourth and fifth" taggedAs LongRunningTest in { - runOn(fifth) { - startClusterNode() - cluster.leaderActions() - cluster.status must be(Up) - } - enterBarrier("fifth-started") - - runOn(fourth) { - cluster.join(fifth) - } - runOn(fifth) { - awaitMembers(fourth, fifth) - } - enterBarrier("fourth-joined") - - fifth gossipTo fourth - fourth gossipTo fifth - - runOn(fourth, fifth) { - memberStatus(fourth) must be(Joining) - memberStatus(fifth) must be(Up) - seenLatestGossip must be(Set(fourth, fifth)) - cluster.convergence.isDefined must be(true) - } - - enterBarrier("after-4") - } - - "perform correct transitions when second cluster (node fourth) joins first cluster (node third)" taggedAs LongRunningTest in { - - runOn(fourth) { - cluster.join(third) - } - runOn(third) { - awaitMembers(first, second, third, fourth) - seenLatestGossip must be(Set(third)) - } - enterBarrier("fourth-joined-third") - - third gossipTo second - runOn(second) { - seenLatestGossip must be(Set(second, third)) - } - - second gossipTo fourth - runOn(fourth) { - members must be(roles.toSet) - // merge conflict - seenLatestGossip must be(Set(fourth)) - } - - fourth gossipTo first - fourth gossipTo second - fourth gossipTo third - fourth gossipTo fifth - runOn(first, second, third, fifth) { - members must be(roles.toSet) - seenLatestGossip must be(Set(fourth, myself)) - } - - first gossipTo fifth - runOn(fifth) { - seenLatestGossip must be(Set(first, 
fourth, fifth)) - } - - fifth gossipTo third - runOn(third) { - seenLatestGossip must be(Set(first, third, fourth, fifth)) - } - - third gossipTo second - runOn(second) { - seenLatestGossip must be(roles.toSet) - cluster.convergence.isDefined must be(true) - } - - second gossipTo first - second gossipTo third - second gossipTo fourth - third gossipTo fifth - - seenLatestGossip must be(roles.toSet) - memberStatus(first) must be(Up) - memberStatus(second) must be(Up) - memberStatus(third) must be(Up) - memberStatus(fourth) must be(Joining) - memberStatus(fifth) must be(Up) - cluster.convergence.isDefined must be(true) - - enterBarrier("convergence-joining-3") - - runOn(leader(roles: _*)) { - cluster.leaderActions() - memberStatus(fourth) must be(Up) - seenLatestGossip must be(Set(myself)) - cluster.convergence.isDefined must be(false) - } - // spread the word - for (x :: y :: Nil ← (roles.sorted ++ roles.sorted.dropRight(1)).toList.sliding(2)) { - x gossipTo y - } - - enterBarrier("spread-5") - - seenLatestGossip must be(roles.toSet) - memberStatus(first) must be(Up) - memberStatus(second) must be(Up) - memberStatus(third) must be(Up) - memberStatus(fourth) must be(Up) - memberStatus(fifth) must be(Up) - cluster.convergence.isDefined must be(true) - - enterBarrier("after-5") - } - "perform correct transitions when second becomes unavailble" taggedAs LongRunningTest in { - runOn(fifth) { + runOn(third) { markNodeAsUnavailable(second) cluster.reapUnreachableMembers() cluster.latestGossip.overview.unreachable must contain(Member(second, Up)) - seenLatestGossip must be(Set(fifth)) + seenLatestGossip must be(Set(third)) } enterBarrier("after-second-unavailble") - // spread the word - val gossipRound = List(fifth, fourth, third, first, third, fourth, fifth) - for (x :: y :: Nil ← gossipRound.sliding(2)) { - x gossipTo y - } + third gossipTo first - runOn((roles.filterNot(_ == second)): _*) { + runOn(first, third) { cluster.latestGossip.overview.unreachable must 
contain(Member(second, Up)) cluster.convergence.isDefined must be(false) } - runOn(third) { + runOn(first) { cluster.down(second) awaitMemberStatus(second, Down) } enterBarrier("after-second-down") - // spread the word - val gossipRound2 = List(third, fourth, fifth, first, third, fourth, fifth) - for (x :: y :: Nil ← gossipRound2.sliding(2)) { - x gossipTo y - } + first gossipTo third - runOn((roles.filterNot(_ == second)): _*) { + runOn(first, third) { cluster.latestGossip.overview.unreachable must contain(Member(second, Down)) memberStatus(second) must be(Down) - seenLatestGossip must be(Set(first, third, fourth, fifth)) + seenLatestGossip must be(Set(first, third)) cluster.convergence.isDefined must be(true) } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index e818847969..68731b89b2 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -105,12 +105,14 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { cluster.latestGossip.members.map(_.address) must be(Set(selfAddress, addresses(1))) memberStatus(addresses(1)) must be(Some(MemberStatus.Joining)) cluster.convergence.isDefined must be(false) + expectMsg(GossipTo(addresses(1))) } "accept a few more joining nodes" in { for (a ← addresses.drop(2)) { cluster.joining(a) memberStatus(a) must be(Some(MemberStatus.Joining)) + expectMsg(GossipTo(a)) } cluster.latestGossip.members.map(_.address) must be(addresses.toSet) } @@ -121,7 +123,6 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { } "gossip to random live node" in { - cluster.latestGossip.members cluster.gossip() cluster.gossip() cluster.gossip() diff --git a/akka-docs/cluster/cluster.rst b/akka-docs/cluster/cluster.rst index 1812c33561..cbad3ef690 100644 --- a/akka-docs/cluster/cluster.rst +++ b/akka-docs/cluster/cluster.rst @@ -138,7 
+138,7 @@ implementation of `The Phi Accrual Failure Detector`_ by Hayashibara et al. An accrual failure detector decouple monitoring and interpretation. That makes them applicable to a wider area of scenarios and more adequate to build generic failure detection services. The idea is that it is keeping a history of failure -statistics, calculated from heartbeats received from the gossip protocol, and is +statistics, calculated from heartbeats received from other nodes, and is trying to do educated guesses by taking multiple factors, and how they accumulate over time, into account in order to come up with a better guess if a specific node is up or down. Rather than just answering "yes" or "no" to the @@ -232,15 +232,14 @@ breaking logical partitions as seen in the gossip algorithm defined below. During each round of gossip exchange the following process is used: -1. Gossip to random live node (if any) +1. Gossip to random node with newer or older state information, if any, based on the + current gossip overview, with some probability. Otherwise gossip to any random + live node. 2. If the node gossiped to at (1) was not a ``deputy`` node, or the number of live nodes is less than number of ``deputy`` nodes, gossip to random ``deputy`` node with certain probability depending on number of unreachable, ``deputy``, and live nodes. -3. Gossip to random node with newer or older state information, based on the - current gossip overview, with some probability (?) - The gossiper only sends the gossip overview to the chosen node. The recipient of the gossip can use the gossip overview to determine whether: