diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index a06e9273cb..b60b91ec43 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -52,6 +52,11 @@ akka { # of the cluster within this deadline. join-timeout = 60s + # Gossip to random node with newer or older state information, if any, with + # this probability. Otherwise gossip to any random live node. + # Probability value is between 0.0 and 1.0. 0.0 means never, 1.0 means always. + gossip-different-view-probability = 0.8 + failure-detector { # defines the failure detector threshold diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 809d62165f..3e7ace6c0f 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -30,6 +30,7 @@ import MemberStatus._ import scala.annotation.tailrec import scala.collection.immutable.{ Map, SortedSet } import scala.collection.GenTraversableOnce +import java.util.concurrent.atomic.AtomicLong /** * Interface for membership change listener. @@ -257,7 +258,7 @@ object Gossip { * When convergence is reached the leader change status of `members` from `Joining` * to `Up`. * - * When failure detector consider a node as unavailble it will be moved from + * When the failure detector considers a node as unavailable it will be moved from * `members` to `overview.unreachable`. * * When a node is downed, either manually or automatically, its status is changed to `Down`. @@ -556,12 +557,14 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) } } - private val state = { + private def createCleanState: State = { // note that self is not initially member, // and the Gossip is not versioned for this 'Node' yet - new AtomicReference[State](State(Gossip(members = Gossip.emptyMembers))) + State(Gossip(members = Gossip.emptyMembers)) } + private val state = new AtomicReference[State](createCleanState) + // try to join one of the nodes defined in the 'akka.cluster.seed-nodes' if (AutoJoin) joinSeedNode() @@ -736,8 +739,11 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) @tailrec final def join(address: Address): Unit = { val localState = state.get - val newState = localState copy (joinInProgress = localState.joinInProgress + - (address -> (Deadline.now + JoinTimeout))) + // wipe our state since a node that joins a cluster must be empty + val newState = createCleanState copy (joinInProgress = Map.empty + (address -> (Deadline.now + JoinTimeout)), + memberMembershipChangeListeners = localState.memberMembershipChangeListeners) + // wipe the failure detector since we are starting fresh and shouldn't care about the past + failureDetector.reset() if (!state.compareAndSet(localState, newState)) join(address) // recur else { val connection = clusterCommandConnectionFor(address) @@ -816,9 +822,12 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) if (!alreadyMember && !isUnreachable) { // remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster - val newUnreachableMembers = localUnreachable filterNot { _.address == node } + val (rejoiningMember, newUnreachableMembers) = localUnreachable partition { _.address == node } val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers) + // remove the node from the failure
detector if it is a DOWN node that is rejoining cluster + if (rejoiningMember.nonEmpty) failureDetector.remove(node) + // add joining node as Joining // add self in case someone else joins before self has joined (Set discards duplicates) val newMembers = localMembers :+ Member(node, Joining) :+ Member(selfAddress, Joining) @@ -833,7 +842,10 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) else { log.info("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node) // treat join as initial heartbeat, so that it becomes unavailable if nothing more happens - if (node != selfAddress) failureDetector heartbeat node + if (node != selfAddress) { + failureDetector heartbeat node + gossipTo(node) + } notifyMembershipChangeListeners(localState, newState) } } @@ -950,6 +962,13 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) } } + // Can be removed when gossip has been optimized + private val _receivedGossipCount = new AtomicLong + /** + * INTERNAL API. + */ + private[cluster] def receivedGossipCount: Long = _receivedGossipCount.get + /** * INTERNAL API. * @@ -968,10 +987,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val mergedGossip = remoteGossip merge localGossip val versionedMergedGossip = mergedGossip :+ vclockNode - log.debug( - """Can't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merging them into [{}]""", - remoteGossip, localGossip, versionedMergedGossip) - versionedMergedGossip } else if (remoteGossip.version < localGossip.version) { @@ -993,11 +1008,31 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) latestGossip = winningGossip seen selfAddress, joinInProgress = newJoinInProgress) + // for all new joining nodes we optimistically remove them from the failure detector, since if we wait until + // we have won the CAS, then the node might be picked up by the reapUnreachableMembers task and moved to + // unreachable before we can remove the node from the failure detector + (newState.latestGossip.members -- localState.latestGossip.members).filter(_.status == Joining).foreach { + case node ⇒ failureDetector.remove(node.address) + } + // if we won the race then update else try again if (!state.compareAndSet(localState, newState)) receiveGossip(from, remoteGossip) // recur if we fail the update else { log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from) + + if ((winningGossip ne localGossip) && (winningGossip ne remoteGossip)) + log.debug( + """Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""", + remoteGossip, localGossip, winningGossip) + + _receivedGossipCount.incrementAndGet() notifyMembershipChangeListeners(localState, newState) + + if ((winningGossip ne remoteGossip) || (newState.latestGossip ne remoteGossip)) { + // send back gossip to sender when sender had different view, i.e. merge, or sender had + // older or sender had newer + gossipTo(from) + } } } } @@ -1068,8 +1103,21 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val localUnreachableMembers = localGossip.overview.unreachable.toIndexedSeq val localUnreachableSize = localUnreachableMembers.size - // 1. gossip to alive members - val gossipedToAlive = gossipToRandomNodeOf(localMemberAddresses) + // 1. 
gossip to a random alive member with preference to a member + // with older or newer gossip version + val nodesWithdifferentView = { + val localMemberAddressesSet = localGossip.members map { _.address } + for { + (address, version) ← localGossip.overview.seen + if localMemberAddressesSet contains address + if version != localGossip.version + } yield address + } + val gossipedToAlive = + if (nodesWithdifferentView.nonEmpty && ThreadLocalRandom.current.nextDouble() < GossipDifferentViewProbability) + gossipToRandomNodeOf(nodesWithdifferentView.toIndexedSeq) + else + gossipToRandomNodeOf(localMemberAddresses) // 2. gossip to a deputy nodes for facilitating partition healing val deputies = deputyNodes(localMemberAddresses) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 41ab6b4755..defb72b313 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -36,6 +36,7 @@ class ClusterSettings(val config: Config, val systemName: String) { final val AutoJoin: Boolean = getBoolean("akka.cluster.auto-join") final val AutoDown: Boolean = getBoolean("akka.cluster.auto-down") final val JoinTimeout: Duration = Duration(getMilliseconds("akka.cluster.join-timeout"), MILLISECONDS) + final val GossipDifferentViewProbability: Double = getDouble("akka.cluster.gossip-different-view-probability") final val SchedulerTickDuration: Duration = Duration(getMilliseconds("akka.cluster.scheduler.tick-duration"), MILLISECONDS) final val SchedulerTicksPerWheel: Int = getInt("akka.cluster.scheduler.ticks-per-wheel") } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala new file mode 100644 index 0000000000..e3dc7719c1 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala @@ -0,0 +1,293 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ +import akka.actor.ActorSystem +import akka.util.Deadline +import java.util.concurrent.TimeoutException +import scala.collection.immutable.SortedSet +import akka.dispatch.Await +import akka.util.Duration +import java.util.concurrent.TimeUnit +import akka.remote.testconductor.RoleName + +object LargeClusterMultiJvmSpec extends MultiNodeConfig { + // each jvm simulates a datacenter with many nodes + val firstDatacenter = role("first-datacenter") + val secondDatacenter = role("second-datacenter") + val thirdDatacenter = role("third-datacenter") + val fourthDatacenter = role("fourth-datacenter") + val fifthDatacenter = role("fifth-datacenter") + + // Note that this test uses default configuration, + // not MultiNodeClusterSpec.clusterConfig + commonConfig(ConfigFactory.parseString(""" + # Number of ActorSystems in each jvm, can be specified as + # system property when running real tests. Many nodes + # will take long time and consume many threads. + # 10 => 50 nodes is possible to run on one machine. 
+ akka.test.large-cluster-spec.nodes-per-datacenter = 2 + akka.cluster { + gossip-interval = 500 ms + auto-join = off + failure-detector.threshold = 4 + } + akka.loglevel = INFO + akka.actor.default-dispatcher.fork-join-executor.parallelism-max = 2 + akka.scheduler.tick-duration = 33 ms + akka.remote.netty.execution-pool-size = 0 + + # don't use testconductor transport in this test, especially not + # when using use-dispatcher-for-io + akka.remote.transport = "akka.remote.netty.NettyRemoteTransport" + + # Using a separate dispatcher for netty io doesn't reduce number + # of needed threads + # akka.remote.netty.use-dispatcher-for-io=akka.test.io-dispatcher + # akka.test.io-dispatcher.fork-join-executor { + # parallelism-min = 100 + # parallelism-max = 100 + # } + """)) +} + +class LargeClusterMultiJvmNode1 extends LargeClusterSpec with AccrualFailureDetectorStrategy +class LargeClusterMultiJvmNode2 extends LargeClusterSpec with AccrualFailureDetectorStrategy +class LargeClusterMultiJvmNode3 extends LargeClusterSpec with AccrualFailureDetectorStrategy +class LargeClusterMultiJvmNode4 extends LargeClusterSpec with AccrualFailureDetectorStrategy +class LargeClusterMultiJvmNode5 extends LargeClusterSpec with AccrualFailureDetectorStrategy + +abstract class LargeClusterSpec + extends MultiNodeSpec(LargeClusterMultiJvmSpec) + with MultiNodeClusterSpec { + + import LargeClusterMultiJvmSpec._ + + var systems: IndexedSeq[ActorSystem] = IndexedSeq(system) + val nodesPerDatacenter = system.settings.config.getInt( + "akka.test.large-cluster-spec.nodes-per-datacenter") + + /** + * Since we start some ActorSystems/Clusters outside of the + * MultiNodeClusterSpec control we can't use the mechanism + * defined in MultiNodeClusterSpec to inject failure detector etc. + * Use ordinary Cluster extension with default AccrualFailureDetector. + */ + override def cluster: Cluster = Cluster(system) + + override def atTermination(): Unit = { + systems foreach { _.shutdown } + val shutdownTimeout = 20.seconds + val deadline = Deadline.now + shutdownTimeout + systems.foreach { sys ⇒ + if (sys.isTerminated) + () // already done + else if (deadline.isOverdue) + sys.log.warning("Failed to shutdown [{}] within [{}]", sys.name, shutdownTimeout) + else { + try sys.awaitTermination(deadline.timeLeft) catch { + case _: TimeoutException ⇒ sys.log.warning("Failed to shutdown [{}] within [{}]", sys.name, shutdownTimeout) + } + } + } + } + + def startupSystems(): Unit = { + // one system is already started by the multi-node test + for (n ← 2 to nodesPerDatacenter) + systems :+= ActorSystem(myself.name + "-" + n, system.settings.config) + + // Initialize the Cluster extensions, i.e.
startup the clusters + systems foreach { Cluster(_) } + } + + def expectedMaxDuration(totalNodes: Int): Duration = + 5.seconds + (2.seconds * totalNodes) + + def joinAll(from: RoleName, to: RoleName, totalNodes: Int, runOnRoles: RoleName*): Unit = { + val joiningClusters = systems.map(Cluster(_)).toSet + join(joiningClusters, from, to, totalNodes, runOnRoles: _*) + } + + def join(joiningClusterNodes: Set[Cluster], from: RoleName, to: RoleName, totalNodes: Int, runOnRoles: RoleName*): Unit = { + runOnRoles must contain(from) + runOnRoles must contain(to) + + runOn(runOnRoles: _*) { + systems.size must be(nodesPerDatacenter) // make sure it is initialized + + val clusterNodes = ifNode(from)(joiningClusterNodes)(systems.map(Cluster(_)).toSet) + val startGossipCounts = Map.empty[Cluster, Long] ++ + clusterNodes.map(c ⇒ (c -> c.receivedGossipCount)) + def gossipCount(c: Cluster): Long = c.receivedGossipCount - startGossipCounts(c) + val startTime = System.nanoTime + def tookMillis: String = TimeUnit.NANOSECONDS.toMillis(System.nanoTime - startTime) + " ms" + + val latch = TestLatch(clusterNodes.size) + clusterNodes foreach { c ⇒ + c.registerListener(new MembershipChangeListener { + override def notify(members: SortedSet[Member]): Unit = { + if (!latch.isOpen && members.size == totalNodes && members.forall(_.status == MemberStatus.Up)) { + log.debug("All [{}] nodes Up in [{}], it took [{}], received [{}] gossip messages", + totalNodes, c.selfAddress, tookMillis, gossipCount(c)) + latch.countDown() + } + } + }) + } + + runOn(from) { + clusterNodes foreach { _ join to } + } + + Await.ready(latch, remaining) + + awaitCond(clusterNodes.forall(_.convergence.isDefined)) + val counts = clusterNodes.map(gossipCount(_)) + val formattedStats = "mean=%s min=%s max=%s".format(counts.sum / clusterNodes.size, counts.min, counts.max) + log.info("Convergence of [{}] nodes reached, it took [{}], received [{}] gossip messages per node", + totalNodes, tookMillis, formattedStats) + + } + } + + "A large cluster" must { + + "join all nodes in first-datacenter to first-datacenter" taggedAs LongRunningTest in { + runOn(firstDatacenter) { + startupSystems() + startClusterNode() + } + enterBarrier("first-datacenter-started") + + val totalNodes = nodesPerDatacenter + within(expectedMaxDuration(totalNodes)) { + joinAll(from = firstDatacenter, to = firstDatacenter, totalNodes, runOnRoles = firstDatacenter) + enterBarrier("first-datacenter-joined") + } + } + + "join all nodes in second-datacenter to first-datacenter" taggedAs LongRunningTest in { + runOn(secondDatacenter) { + startupSystems() + } + enterBarrier("second-datacenter-started") + + val totalNodes = nodesPerDatacenter * 2 + within(expectedMaxDuration(totalNodes)) { + joinAll(from = secondDatacenter, to = firstDatacenter, totalNodes, runOnRoles = firstDatacenter, secondDatacenter) + enterBarrier("second-datacenter-joined") + } + } + + "join all nodes in third-datacenter to first-datacenter" taggedAs LongRunningTest in { + runOn(thirdDatacenter) { + startupSystems() + } + enterBarrier("third-datacenter-started") + + val totalNodes = nodesPerDatacenter * 3 + within(expectedMaxDuration(totalNodes)) { + joinAll(from = thirdDatacenter, to = firstDatacenter, totalNodes, + runOnRoles = firstDatacenter, secondDatacenter, thirdDatacenter) + enterBarrier("third-datacenter-joined") + } + } + + "join all nodes in fourth-datacenter to first-datacenter" taggedAs LongRunningTest in { + runOn(fourthDatacenter) { + startupSystems() + } + enterBarrier("fourth-datacenter-started") + 
+ val totalNodes = nodesPerDatacenter * 4 + within(expectedMaxDuration(totalNodes)) { + joinAll(from = fourthDatacenter, to = firstDatacenter, totalNodes, + runOnRoles = firstDatacenter, secondDatacenter, thirdDatacenter, fourthDatacenter) + enterBarrier("fourth-datacenter-joined") + } + } + + "join nodes one by one from fifth-datacenter to first-datacenter" taggedAs LongRunningTest in { + runOn(fifthDatacenter) { + startupSystems() + } + enterBarrier("fifth-datacenter-started") + + // enough to join a few one-by-one (takes too long time otherwise) + val (bulk, oneByOne) = systems.splitAt(systems.size - 3) + + if (bulk.nonEmpty) { + val totalNodes = nodesPerDatacenter * 4 + bulk.size + within(expectedMaxDuration(totalNodes)) { + val joiningClusters = ifNode(fifthDatacenter)(bulk.map(Cluster(_)).toSet)(Set.empty) + join(joiningClusters, from = fifthDatacenter, to = firstDatacenter, totalNodes, + runOnRoles = firstDatacenter, secondDatacenter, thirdDatacenter, fourthDatacenter, fifthDatacenter) + enterBarrier("fifth-datacenter-joined-" + bulk.size) + } + } + + for (i ← 0 until oneByOne.size) { + val totalNodes = nodesPerDatacenter * 4 + bulk.size + i + 1 + within(expectedMaxDuration(totalNodes)) { + val joiningClusters = ifNode(fifthDatacenter)(Set(Cluster(oneByOne(i))))(Set.empty) + join(joiningClusters, from = fifthDatacenter, to = firstDatacenter, totalNodes, + runOnRoles = firstDatacenter, secondDatacenter, thirdDatacenter, fourthDatacenter, fifthDatacenter) + enterBarrier("fifth-datacenter-joined-" + (bulk.size + i)) + } + } + } + + // FIXME sometimes this fails, FD marks nodes from other than second-datacenter as unavailable + "detect failure and auto-down crashed nodes in second-datacenter" taggedAs LongRunningTest ignore { + val unreachableNodes = nodesPerDatacenter + val liveNodes = nodesPerDatacenter * 4 + + within(20.seconds + expectedMaxDuration(liveNodes)) { + val startGossipCounts = Map.empty[Cluster, Long] ++ + systems.map(sys ⇒ (Cluster(sys) -> Cluster(sys).receivedGossipCount)) + def gossipCount(c: Cluster): Long = c.receivedGossipCount - startGossipCounts(c) + val startTime = System.nanoTime + def tookMillis: String = TimeUnit.NANOSECONDS.toMillis(System.nanoTime - startTime) + " ms" + + val latch = TestLatch(nodesPerDatacenter) + systems foreach { sys ⇒ + Cluster(sys).registerListener(new MembershipChangeListener { + override def notify(members: SortedSet[Member]): Unit = { + if (!latch.isOpen && members.size == liveNodes && Cluster(sys).latestGossip.overview.unreachable.size == unreachableNodes) { + log.info("Detected [{}] unreachable nodes in [{}], it took [{}], received [{}] gossip messages", + unreachableNodes, Cluster(sys).selfAddress, tookMillis, gossipCount(Cluster(sys))) + latch.countDown() + } + } + }) + } + + runOn(firstDatacenter) { + testConductor.shutdown(secondDatacenter, 0) + } + + enterBarrier("second-datacenter-shutdown") + + runOn(firstDatacenter, thirdDatacenter, fourthDatacenter, fifthDatacenter) { + Await.ready(latch, remaining) + awaitCond(systems.forall(Cluster(_).convergence.isDefined)) + val counts = systems.map(sys ⇒ gossipCount(Cluster(sys))) + val formattedStats = "mean=%s min=%s max=%s".format(counts.sum / nodesPerDatacenter, counts.min, counts.max) + log.info("Convergence of [{}] nodes reached after failure, it took [{}], received [{}] gossip messages per node", + liveNodes, tookMillis, formattedStats) + } + + enterBarrier("after-6") + } + + } + + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala 
b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index 2794f03434..215b9f24e4 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -23,8 +23,6 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig { // not MultiNodeClusterSpec.clusterConfig commonConfig(ConfigFactory.parseString(""" akka.cluster { - # FIXME remove this (use default) when ticket #2239 has been fixed - gossip-interval = 400 ms auto-join = off } akka.loglevel = INFO diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala index 397d824ef4..c4e43b9abf 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala @@ -16,8 +16,6 @@ object TransitionMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") - val fourth = role("fourth") - val fifth = role("fifth") commonConfig(debugConfig(on = false). withFallback(ConfigFactory.parseString("akka.cluster.periodic-tasks-initial-delay = 300 s # turn off all periodic tasks")). @@ -27,8 +25,6 @@ object TransitionMultiJvmSpec extends MultiNodeConfig { class TransitionMultiJvmNode1 extends TransitionSpec with FailureDetectorPuppetStrategy class TransitionMultiJvmNode2 extends TransitionSpec with FailureDetectorPuppetStrategy class TransitionMultiJvmNode3 extends TransitionSpec with FailureDetectorPuppetStrategy -class TransitionMultiJvmNode4 extends TransitionSpec with FailureDetectorPuppetStrategy -class TransitionMultiJvmNode5 extends TransitionSpec with FailureDetectorPuppetStrategy abstract class TransitionSpec extends MultiNodeSpec(TransitionMultiJvmSpec) @@ -81,11 +77,15 @@ abstract class TransitionSpec val g = cluster.latestGossip enterBarrier("before-gossip-" + gossipBarrierCounter) awaitCond(cluster.latestGossip != g) // received gossip + // gossip chat will synchronize the views + awaitCond((Set(fromRole, toRole) -- seenLatestGossip).isEmpty) enterBarrier("after-gossip-" + gossipBarrierCounter) } runOn(fromRole) { enterBarrier("before-gossip-" + gossipBarrierCounter) cluster.gossipTo(toRole) // send gossip + // gossip chat will synchronize the views + awaitCond((Set(fromRole, toRole) -- seenLatestGossip).isEmpty) enterBarrier("after-gossip-" + gossipBarrierCounter) } runOn(roles.filterNot(r ⇒ r == fromRole || r == toRole): _*) { @@ -116,22 +116,12 @@ abstract class TransitionSpec runOn(second) { cluster.join(first) } - runOn(first) { + runOn(first, second) { + // gossip chat from the join will synchronize the views awaitMembers(first, second) memberStatus(first) must be(Up) memberStatus(second) must be(Joining) - seenLatestGossip must be(Set(first)) - cluster.convergence.isDefined must be(false) - } - enterBarrier("second-joined") - - first gossipTo second - second gossipTo first - - runOn(first, second) { - memberStatus(first) must be(Up) - memberStatus(second) must be(Joining) - seenLatestGossip must be(Set(first, second)) + awaitCond(seenLatestGossip == Set(first, second)) cluster.convergence.isDefined must be(true) } enterBarrier("convergence-joining-2") @@ -144,18 +134,11 @@ abstract class TransitionSpec enterBarrier("leader-actions-2") leader(first, second) gossipTo nonLeader(first, second).head - runOn(nonLeader(first, second).head) { - memberStatus(first) must be(Up) - memberStatus(second) must 
be(Up) - seenLatestGossip must be(Set(first, second)) - cluster.convergence.isDefined must be(true) - } - - nonLeader(first, second).head gossipTo leader(first, second) runOn(first, second) { - memberStatus(first) must be(Up) - memberStatus(second) must be(Up) + // gossip chat will synchronize the views + awaitCond(memberStatus(second) == Up) seenLatestGossip must be(Set(first, second)) + memberStatus(first) must be(Up) cluster.convergence.isDefined must be(true) } @@ -167,25 +150,26 @@ abstract class TransitionSpec runOn(third) { cluster.join(second) } - runOn(second) { + runOn(second, third) { + // gossip chat from the join will synchronize the views awaitMembers(first, second, third) - cluster.convergence.isDefined must be(false) memberStatus(third) must be(Joining) - seenLatestGossip must be(Set(second)) + awaitCond(seenLatestGossip == Set(second, third)) + cluster.convergence.isDefined must be(false) } enterBarrier("third-joined-second") second gossipTo first - runOn(first) { - members must be(Set(first, second, third)) + runOn(first, second) { + // gossip chat will synchronize the views + awaitMembers(first, second, third) memberStatus(third) must be(Joining) - seenLatestGossip must be(Set(first, second)) - cluster.convergence.isDefined must be(false) + awaitCond(memberStatus(second) == Up) + seenLatestGossip must be(Set(first, second, third)) + cluster.convergence.isDefined must be(true) } first gossipTo third - third gossipTo first - third gossipTo second runOn(first, second, third) { members must be(Set(first, second, third)) memberStatus(first) must be(Up) @@ -224,14 +208,6 @@ abstract class TransitionSpec cluster.convergence.isDefined must be(true) } - // and back again - nonLeader(first, second, third).tail.head gossipTo nonLeader(first, second, third).head - runOn(nonLeader(first, second, third).head) { - memberStatus(third) must be(Up) - seenLatestGossip must be(Set(first, second, third)) - cluster.convergence.isDefined must be(true) - } - // first non-leader gossipTo the leader nonLeader(first, second, third).head gossipTo leader(first, second, third) runOn(first, second, third) { @@ -245,160 +221,36 @@ abstract class TransitionSpec enterBarrier("after-3") } - "startup a second separated cluster consisting of nodes fourth and fifth" taggedAs LongRunningTest in { - runOn(fifth) { - startClusterNode() - cluster.leaderActions() - cluster.status must be(Up) - } - enterBarrier("fifth-started") - - runOn(fourth) { - cluster.join(fifth) - } - runOn(fifth) { - awaitMembers(fourth, fifth) - } - enterBarrier("fourth-joined") - - fifth gossipTo fourth - fourth gossipTo fifth - - runOn(fourth, fifth) { - memberStatus(fourth) must be(Joining) - memberStatus(fifth) must be(Up) - seenLatestGossip must be(Set(fourth, fifth)) - cluster.convergence.isDefined must be(true) - } - - enterBarrier("after-4") - } - - "perform correct transitions when second cluster (node fourth) joins first cluster (node third)" taggedAs LongRunningTest in { - - runOn(fourth) { - cluster.join(third) - } - runOn(third) { - awaitMembers(first, second, third, fourth) - seenLatestGossip must be(Set(third)) - } - enterBarrier("fourth-joined-third") - - third gossipTo second - runOn(second) { - seenLatestGossip must be(Set(second, third)) - } - - second gossipTo fourth - runOn(fourth) { - members must be(roles.toSet) - // merge conflict - seenLatestGossip must be(Set(fourth)) - } - - fourth gossipTo first - fourth gossipTo second - fourth gossipTo third - fourth gossipTo fifth - runOn(first, second, third, fifth) { - 
members must be(roles.toSet) - seenLatestGossip must be(Set(fourth, myself)) - } - - first gossipTo fifth - runOn(fifth) { - seenLatestGossip must be(Set(first, fourth, fifth)) - } - - fifth gossipTo third - runOn(third) { - seenLatestGossip must be(Set(first, third, fourth, fifth)) - } - - third gossipTo second - runOn(second) { - seenLatestGossip must be(roles.toSet) - cluster.convergence.isDefined must be(true) - } - - second gossipTo first - second gossipTo third - second gossipTo fourth - third gossipTo fifth - - seenLatestGossip must be(roles.toSet) - memberStatus(first) must be(Up) - memberStatus(second) must be(Up) - memberStatus(third) must be(Up) - memberStatus(fourth) must be(Joining) - memberStatus(fifth) must be(Up) - cluster.convergence.isDefined must be(true) - - enterBarrier("convergence-joining-3") - - runOn(leader(roles: _*)) { - cluster.leaderActions() - memberStatus(fourth) must be(Up) - seenLatestGossip must be(Set(myself)) - cluster.convergence.isDefined must be(false) - } - // spread the word - for (x :: y :: Nil ← (roles.sorted ++ roles.sorted.dropRight(1)).toList.sliding(2)) { - x gossipTo y - } - - enterBarrier("spread-5") - - seenLatestGossip must be(roles.toSet) - memberStatus(first) must be(Up) - memberStatus(second) must be(Up) - memberStatus(third) must be(Up) - memberStatus(fourth) must be(Up) - memberStatus(fifth) must be(Up) - cluster.convergence.isDefined must be(true) - - enterBarrier("after-5") - } - "perform correct transitions when second becomes unavailble" taggedAs LongRunningTest in { - runOn(fifth) { + runOn(third) { markNodeAsUnavailable(second) cluster.reapUnreachableMembers() cluster.latestGossip.overview.unreachable must contain(Member(second, Up)) - seenLatestGossip must be(Set(fifth)) + seenLatestGossip must be(Set(third)) } enterBarrier("after-second-unavailble") - // spread the word - val gossipRound = List(fifth, fourth, third, first, third, fourth, fifth) - for (x :: y :: Nil ← gossipRound.sliding(2)) { - x gossipTo y - } + third gossipTo first - runOn((roles.filterNot(_ == second)): _*) { + runOn(first, third) { cluster.latestGossip.overview.unreachable must contain(Member(second, Up)) cluster.convergence.isDefined must be(false) } - runOn(third) { + runOn(first) { cluster.down(second) awaitMemberStatus(second, Down) } enterBarrier("after-second-down") - // spread the word - val gossipRound2 = List(third, fourth, fifth, first, third, fourth, fifth) - for (x :: y :: Nil ← gossipRound2.sliding(2)) { - x gossipTo y - } + first gossipTo third - runOn((roles.filterNot(_ == second)): _*) { + runOn(first, third) { cluster.latestGossip.overview.unreachable must contain(Member(second, Down)) memberStatus(second) must be(Down) - seenLatestGossip must be(Set(first, third, fourth, fifth)) + seenLatestGossip must be(Set(first, third)) cluster.convergence.isDefined must be(true) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala new file mode 100644 index 0000000000..34f8605af1 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala @@ -0,0 +1,137 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. 
+ */ +package akka.cluster + +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import com.typesafe.config.ConfigFactory +import akka.actor.Address +import akka.remote.testconductor.{RoleName, Direction} +import akka.util.duration._ + +object UnreachableNodeRejoinsClusterMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec with FailureDetectorPuppetStrategy +class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode2 extends UnreachableNodeRejoinsClusterSpec with FailureDetectorPuppetStrategy +class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode3 extends UnreachableNodeRejoinsClusterSpec with FailureDetectorPuppetStrategy +class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode4 extends UnreachableNodeRejoinsClusterSpec with FailureDetectorPuppetStrategy + + +class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec with AccrualFailureDetectorStrategy +class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode2 extends UnreachableNodeRejoinsClusterSpec with AccrualFailureDetectorStrategy +class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode3 extends UnreachableNodeRejoinsClusterSpec with AccrualFailureDetectorStrategy +class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode4 extends UnreachableNodeRejoinsClusterSpec with AccrualFailureDetectorStrategy + +abstract class UnreachableNodeRejoinsClusterSpec + extends MultiNodeSpec(UnreachableNodeRejoinsClusterMultiJvmSpec) + with MultiNodeClusterSpec { + import UnreachableNodeRejoinsClusterMultiJvmSpec._ + + def allBut(role: RoleName, roles: Seq[RoleName] = roles): Seq[RoleName] = { + roles.filterNot(_ == role) + } + + + lazy val sortedRoles = roles.sorted + lazy val master = sortedRoles(0) + lazy val victim = sortedRoles(1) + + var endBarrierNumber = 0 + def endBarrier: Unit = { + endBarrierNumber += 1 + enterBarrier("after_" + endBarrierNumber) + } + + "A cluster of " + roles.size + " members" must { + + "reach initial convergence" taggedAs LongRunningTest in { + awaitClusterUp(roles:_*) + endBarrier + } + + "mark a node as UNREACHABLE when we pull the network" taggedAs LongRunningTest in { + runOn(first) { + // pull network for victim node from all nodes + allBut(victim).foreach { roleName => + testConductor.blackhole(victim, roleName, Direction.Both).await + } + } + + enterBarrier("unplug_victim") + + val allButVictim = allBut(victim, sortedRoles) + runOn(victim) { + allButVictim.foreach(markNodeAsUnavailable(_)) + within(30 seconds) { + // victim becomes all alone + awaitCond({ val gossip = cluster.latestGossip + gossip.overview.unreachable.size == (roles.size - 1) && + gossip.members.size == 1 && + gossip.members.forall(_.status == MemberStatus.Up) }) + cluster.latestGossip.overview.unreachable.map(_.address) must be((allButVictim map address).toSet) + cluster.convergence.isDefined must be(false) + } + } + + runOn(allButVictim:_*) { + markNodeAsUnavailable(victim) + within(30 seconds) { + // victim becomes unreachable + awaitCond({ val gossip = cluster.latestGossip + 
gossip.overview.unreachable.size == 1 && + gossip.members.size == (roles.size - 1) && + gossip.members.forall(_.status == MemberStatus.Up) }) + awaitSeenSameState(allButVictim map address:_*) + // still one unreachable + cluster.latestGossip.overview.unreachable.size must be(1) + cluster.latestGossip.overview.unreachable.head.address must be(node(victim).address) + // and therefore no convergence + cluster.convergence.isDefined must be(false) + } + } + + endBarrier + } + + "mark the node as DOWN" taggedAs LongRunningTest in { + runOn(master) { + cluster down victim + } + + runOn(allBut(victim):_*) { + awaitUpConvergence(roles.size - 1, Seq(victim)) + } + + endBarrier + } + + "allow node to REJOIN when the network is plugged back in" taggedAs LongRunningTest in { + runOn(first) { + // put the network back in + allBut(victim).foreach { roleName => + testConductor.passThrough(victim, roleName, Direction.Both).await + } + } + + enterBarrier("plug_in_victim") + + runOn(victim) { + cluster join master + } + + awaitUpConvergence(roles.size) + + endBarrier + } + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index a8d552874e..4bb8f46b85 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -34,6 +34,7 @@ class ClusterConfigSpec extends AkkaSpec { NrOfGossipDaemons must be(4) AutoJoin must be(true) AutoDown must be(true) + GossipDifferentViewProbability must be(0.8 plusOrMinus 0.0001) SchedulerTickDuration must be(33 millis) SchedulerTicksPerWheel must be(512) } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index e161f88e8c..77ab199310 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -108,12 +108,14 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { cluster.latestGossip.members.map(_.address) must be(Set(selfAddress, addresses(1))) memberStatus(addresses(1)) must be(Some(MemberStatus.Joining)) cluster.convergence.isDefined must be(false) + expectMsg(GossipTo(addresses(1))) } "accept a few more joining nodes" in { for (a ← addresses.drop(2)) { cluster.joining(a) memberStatus(a) must be(Some(MemberStatus.Joining)) + expectMsg(GossipTo(a)) } cluster.latestGossip.members.map(_.address) must be(addresses.toSet) } @@ -124,7 +126,6 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { } "gossip to random live node" in { - cluster.latestGossip.members cluster.gossip() cluster.gossip() cluster.gossip() diff --git a/akka-docs/cluster/cluster.rst b/akka-docs/cluster/cluster.rst index 1812c33561..cbad3ef690 100644 --- a/akka-docs/cluster/cluster.rst +++ b/akka-docs/cluster/cluster.rst @@ -138,7 +138,7 @@ implementation of `The Phi Accrual Failure Detector`_ by Hayashibara et al. An accrual failure detector decouple monitoring and interpretation. That makes them applicable to a wider area of scenarios and more adequate to build generic failure detection services. 
The idea is that it is keeping a history of failure -statistics, calculated from heartbeats received from the gossip protocol, and is +statistics, calculated from heartbeats received from other nodes, and is trying to do educated guesses by taking multiple factors, and how they accumulate over time, into account in order to come up with a better guess if a specific node is up or down. Rather than just answering "yes" or "no" to the @@ -232,15 +232,14 @@ breaking logical partitions as seen in the gossip algorithm defined below. During each round of gossip exchange the following process is used: -1. Gossip to random live node (if any) +1. Gossip to random node with newer or older state information, if any, based on the + current gossip overview, with the probability configured by + ``gossip-different-view-probability``. Otherwise gossip to any random live node. 2. If the node gossiped to at (1) was not a ``deputy`` node, or the number of live nodes is less than number of ``deputy`` nodes, gossip to random ``deputy`` node with certain probability depending on number of unreachable, ``deputy``, and live nodes. -3. Gossip to random node with newer or older state information, based on the - current gossip overview, with some probability (?) - The gossiper only sends the gossip overview to the chosen node. The recipient of the gossip can use the gossip overview to determine whether: diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala index f7684f759f..a1cae44773 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -137,6 +137,19 @@ trait Conductor { this: TestConductorExt ⇒ controller ? Throttle(node, target, direction, 0f) mapTo } + /** + * Switch the Netty pipeline of the remote support into pass through mode for + * sending and/or receiving. + * + * @param node is the symbolic name of the node which is to be affected + * @param target is the symbolic name of the other node to which connectivity shall be restored + * @param direction can be either `Direction.Send`, `Direction.Receive` or `Direction.Both` + */ + def passThrough(node: RoleName, target: RoleName, direction: Direction): Future[Done] = { + import Settings.QueryTimeout + controller ? Throttle(node, target, direction, -1f) mapTo + } + /** * Tell the remote support to shutdown the connection to the given remote * peer.
It works regardless of whether the recipient was initiator or diff --git a/akka-remote-tests/src/test/scala/akka/remote/testkit/LogRoleReplace.scala b/akka-remote-tests/src/test/scala/akka/remote/testkit/LogRoleReplace.scala index 1e5a53d82e..5dd41365bf 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testkit/LogRoleReplace.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testkit/LogRoleReplace.scala @@ -88,12 +88,10 @@ object LogRoleReplace extends ClipboardOwner { class LogRoleReplace { - private val RoleStarted = """\[([\w\-]+)\].*Role \[([\w]+)\] started""".r - private val RemoteServerStarted = """\[([\w\-]+)\].*RemoteServerStarted@akka://.*@([\w\-\.]+):([0-9]+)""".r + private val RoleStarted = """\[([\w\-]+)\].*Role \[([\w]+)\] started with address \[akka://.*@([\w\-\.]+):([0-9]+)\]""".r private val ColorCode = """\[[0-9]+m""" private var replacements: Map[String, String] = Map.empty - private var jvmToAddress: Map[String, String] = Map.empty def process(in: BufferedReader, out: PrintWriter): Unit = { @@ -121,23 +119,13 @@ class LogRoleReplace { if (line.startsWith("[info] * ")) { // reset when new test begins replacements = Map.empty - jvmToAddress = Map.empty } line match { - case RemoteServerStarted(jvm, host, port) ⇒ - jvmToAddress += (jvm -> (host + ":" + port)) + case RoleStarted(jvm, role, host, port) ⇒ + replacements += (jvm -> role) + replacements += ((host + ":" + port) -> role) false - - case RoleStarted(jvm, role) ⇒ - jvmToAddress.get(jvm) match { - case Some(address) ⇒ - replacements += (jvm -> role) - replacements += (address -> role) - false - case None ⇒ false - } - case _ ⇒ true } } diff --git a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala index 80dc010053..61dd57478b 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -6,9 +6,7 @@ package akka.remote.testkit import language.implicitConversions import java.net.InetSocketAddress - import com.typesafe.config.{ ConfigObject, ConfigFactory, Config } - import akka.actor.{ RootActorPath, ActorPath, ActorSystem, ExtendedActorSystem } import scala.concurrent.Await import scala.concurrent.Await.Awaitable @@ -17,6 +15,7 @@ import akka.remote.testconductor.{ TestConductorExt, TestConductor, RoleName } import akka.testkit.AkkaSpec import scala.concurrent.util.Duration import scala.concurrent.util.duration._ +import akka.remote.RemoteActorRefProvider /** * Configure the role names and participants of the test, including configuration settings. 
@@ -141,7 +140,8 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: import MultiNodeSpec._ def this(config: MultiNodeConfig) = - this(config.myself, ActorSystem(AkkaSpec.getCallerName(classOf[MultiNodeSpec]), config.config), config.roles, config.deployments) + this(config.myself, ActorSystem(AkkaSpec.getCallerName(classOf[MultiNodeSpec]), ConfigFactory.load(config.config)), + config.roles, config.deployments) /* * Test Class Interface @@ -262,8 +262,9 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: } } - // useful to see which jvm is running which role - log.info("Role [{}] started", myself.name) + // useful to see which jvm is running which role, used by LogRoleReplace utility + log.info("Role [{}] started with address [{}]", myself.name, + system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.address) // wait for all nodes to remove themselves before we shut the conductor down final override def beforeShutdown() = { diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index 087c85691d..215c4006e9 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -57,7 +57,8 @@ object AkkaSpec { abstract class AkkaSpec(_system: ActorSystem) extends TestKit(_system) with WordSpec with MustMatchers with BeforeAndAfterAll { - def this(config: Config) = this(ActorSystem(AkkaSpec.getCallerName(getClass), config.withFallback(AkkaSpec.testConf))) + def this(config: Config) = this(ActorSystem(AkkaSpec.getCallerName(getClass), + ConfigFactory.load(config.withFallback(AkkaSpec.testConf)))) def this(s: String) = this(ConfigFactory.parseString(s)) diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index c90cddb2b1..a706ec2c64 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -319,11 +319,11 @@ object AkkaBuild extends Build { val defaultExcludedTags = Set("timing", "long-running") lazy val defaultMultiJvmOptions: Seq[String] = { - (System.getProperty("akka.test.timefactor") match { - case null => Nil - case x => List("-Dakka.test.timefactor=" + x) - }) ::: - (if (getBoolean("sbt.log.noformat")) List("-Dakka.test.nocolor=true") else Nil) + import scala.collection.JavaConverters._ + val akkaProperties = System.getProperties.propertyNames.asScala.toList.collect { + case key: String if key.startsWith("akka.") => "-D" + key + "=" + System.getProperty(key) + } + akkaProperties ::: (if (getBoolean("sbt.log.noformat")) List("-Dakka.test.nocolor=true") else Nil) } // for excluding tests by name use system property: -Dakka.test.names.exclude=TimingSpec
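The central behavioural change in this patch is the biased gossip-target selection driven by the new gossip-different-view-probability setting. The following is a minimal, self-contained Scala sketch of that selection step, for illustration only: the Address and VectorClock stand-ins and the selectGossipTarget helper are simplified assumptions, not the actual akka.cluster internals.

object GossipTargetSelection {
  import scala.util.Random

  // Simplified stand-ins for illustration; the real akka.cluster types carry far more state.
  final case class Address(host: String, port: Int)
  final case class VectorClock(versions: Map[String, Long])

  /**
   * With probability `gossipDifferentViewProbability`, gossip to a random live member
   * whose last seen gossip version differs from the local one (i.e. it is behind or
   * ahead of us); otherwise gossip to any random live member.
   */
  def selectGossipTarget(
    liveMembers: IndexedSeq[Address],
    seen: Map[Address, VectorClock], // gossip overview: version last seen per member
    localVersion: VectorClock,
    gossipDifferentViewProbability: Double,
    rnd: Random = new Random): Option[Address] = {

    // members whose view of the gossip differs from ours
    val nodesWithDifferentView = liveMembers filter { a ⇒ seen.get(a).exists(_ != localVersion) }

    val candidates =
      if (nodesWithDifferentView.nonEmpty && rnd.nextDouble() < gossipDifferentViewProbability)
        nodesWithDifferentView
      else
        liveMembers

    if (candidates.isEmpty) None
    else Some(candidates(rnd.nextInt(candidates.size)))
  }
}

With the default of 0.8, roughly four out of five gossip rounds target a member with a diverging view whenever one exists, and fall back to a uniformly random live member otherwise.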