From a900052f681dae82aba1c386e44b588372b5b962 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 28 Jun 2012 11:32:11 +0200 Subject: [PATCH 01/13] Propagate akka system properties to multi-node tests, see #2280 * Change build to propagate all system properties starting with 'akka.' to multi-jvm and multi-node tests. * Adjusted AkkaSpec and MultiNodeSpec to use load of the config, which means that default overrides (system properties) are used. --- .../test/scala/akka/remote/testkit/MultiNodeSpec.scala | 3 ++- .../src/test/scala/akka/testkit/AkkaSpec.scala | 3 ++- project/AkkaBuild.scala | 10 +++++----- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala index 25bb8df7dc..0efde92c79 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -138,7 +138,8 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: import MultiNodeSpec._ def this(config: MultiNodeConfig) = - this(config.myself, ActorSystem(AkkaSpec.getCallerName(classOf[MultiNodeSpec]), config.config), config.roles, config.deployments) + this(config.myself, ActorSystem(AkkaSpec.getCallerName(classOf[MultiNodeSpec]), ConfigFactory.load(config.config)), + config.roles, config.deployments) /* * Test Class Interface diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index f9ee989e1c..f381e53013 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -59,7 +59,8 @@ object AkkaSpec { abstract class AkkaSpec(_system: ActorSystem) extends TestKit(_system) with WordSpec with MustMatchers with BeforeAndAfterAll { - def this(config: Config) = this(ActorSystem(AkkaSpec.getCallerName(getClass), config.withFallback(AkkaSpec.testConf))) + def this(config: Config) = this(ActorSystem(AkkaSpec.getCallerName(getClass), + ConfigFactory.load(config.withFallback(AkkaSpec.testConf)))) def this(s: String) = this(ConfigFactory.parseString(s)) diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 24e803aecc..340516aa03 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -310,11 +310,11 @@ object AkkaBuild extends Build { val defaultExcludedTags = Set("timing", "long-running") lazy val defaultMultiJvmOptions: Seq[String] = { - (System.getProperty("akka.test.timefactor") match { - case null => Nil - case x => List("-Dakka.test.timefactor=" + x) - }) ::: - (if (getBoolean("sbt.log.noformat")) List("-Dakka.test.nocolor=true") else Nil) + import scala.collection.JavaConverters._ + val akkaProperties = System.getProperties.propertyNames.asScala.toList.collect { + case key: String if key.startsWith("akka.") => "-D" + key + "=" + System.getProperty(key) + } + akkaProperties ::: (if (getBoolean("sbt.log.noformat")) List("-Dakka.test.nocolor=true") else Nil) } // for excluding tests by name use system property: -Dakka.test.names.exclude=TimingSpec From aca66de73296ad68b681079dee6c892b9acd8832 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 27 Jun 2012 10:56:00 +0200 Subject: [PATCH 02/13] Test gossip in large cluster, see #2239 --- .../src/main/scala/akka/cluster/Cluster.scala | 9 + .../scala/akka/cluster/LargeClusterSpec.scala | 267 ++++++++++++++++++ 2 files changed, 276 
insertions(+) create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 357d610ed5..eb1c2c08fb 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -28,6 +28,7 @@ import MemberStatus._ import scala.annotation.tailrec import scala.collection.immutable.{ Map, SortedSet } import scala.collection.GenTraversableOnce +import java.util.concurrent.atomic.AtomicLong /** * Interface for membership change listener. @@ -948,6 +949,13 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) } } + // Can be removed when gossip has been optimized + private val _receivedGossipCount = new AtomicLong + /** + * INTERNAL API. + */ + private[cluster] def receivedGossipCount: Long = _receivedGossipCount.get + /** * INTERNAL API. * @@ -995,6 +1003,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) if (!state.compareAndSet(localState, newState)) receiveGossip(from, remoteGossip) // recur if we fail the update else { log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from) + _receivedGossipCount.incrementAndGet() notifyMembershipChangeListeners(localState, newState) } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala new file mode 100644 index 0000000000..a12fc90ff9 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala @@ -0,0 +1,267 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ +import akka.actor.ActorSystem +import akka.util.Deadline +import java.util.concurrent.TimeoutException +import scala.collection.immutable.SortedSet +import akka.dispatch.Await +import akka.util.Duration +import java.util.concurrent.TimeUnit +import akka.remote.testconductor.RoleName + +object LargeClusterMultiJvmSpec extends MultiNodeConfig { + // each jvm simulates a datacenter with many nodes + val firstDatacenter = role("first-datacenter") + val secondDatacenter = role("second-datacenter") + val thirdDatacenter = role("third-datacenter") + val fourthDatacenter = role("fourth-datacenter") + val fifthDatacenter = role("fifth-datacenter") + + // Note that this test uses default configuration, + // not MultiNodeClusterSpec.clusterConfig + commonConfig(ConfigFactory.parseString(""" + # Number of ActorSystems in each jvm, can be specified as + # system property when running real tests. Many nodes + # will take long time. 
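(Aside, not part of the patch itself: the nodes-per-datacenter knob described in the comment above, and set just below, is exactly the kind of setting that the build change in the first patch of this series forwards to the forked test JVMs, since every system property starting with akka. that is given to sbt is now passed on as a -D option, and the switch to ConfigFactory.load means such properties override the supplied config. A minimal sketch of that override behaviour, run outside the test framework, with an illustrative object name and an arbitrary value of 10, could look like this:

  import com.typesafe.config.ConfigFactory

  object LoadOverrideSketch extends App {
    // simulate starting the JVM with -Dakka.test.large-cluster-spec.nodes-per-datacenter=10
    System.setProperty("akka.test.large-cluster-spec.nodes-per-datacenter", "10")
    val supplied = ConfigFactory.parseString(
      "akka.test.large-cluster-spec.nodes-per-datacenter = 2")
    // ConfigFactory.load layers system property overrides on top of the supplied config
    val config = ConfigFactory.load(supplied)
    println(config.getInt("akka.test.large-cluster-spec.nodes-per-datacenter")) // prints 10
  }

In a real run the property would presumably be given on the sbt command line, for example sbt -Dakka.test.large-cluster-spec.nodes-per-datacenter=10 followed by the multi-jvm test task; the exact task invocation depends on the build and is not spelled out in these patches.)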
+ akka.test.large-cluster-spec.nodes-per-datacenter = 2 + akka.cluster { + gossip-interval = 500 ms + auto-join = off + failure-detector.threshold = 4 + } + akka.loglevel = INFO + akka.actor.default-dispatcher.fork-join-executor.parallelism-max = 2 + akka.scheduler.tick-duration = 33 ms + akka.remote.netty.execution-pool-size = 1 + """)) +} + +class LargeClusterMultiJvmNode1 extends LargeClusterSpec with AccrualFailureDetectorStrategy +class LargeClusterMultiJvmNode2 extends LargeClusterSpec with AccrualFailureDetectorStrategy +class LargeClusterMultiJvmNode3 extends LargeClusterSpec with AccrualFailureDetectorStrategy +class LargeClusterMultiJvmNode4 extends LargeClusterSpec with AccrualFailureDetectorStrategy +class LargeClusterMultiJvmNode5 extends LargeClusterSpec with AccrualFailureDetectorStrategy + +abstract class LargeClusterSpec + extends MultiNodeSpec(LargeClusterMultiJvmSpec) + with MultiNodeClusterSpec { + + import LargeClusterMultiJvmSpec._ + + var systems: IndexedSeq[ActorSystem] = IndexedSeq(system) + val nodesPerDatacenter = system.settings.config.getInt( + "akka.test.large-cluster-spec.nodes-per-datacenter") + + /** + * Since we start some ActorSystems/Clusters outside of the + * MultiNodeClusterSpec control we can't use use the mechanism + * defined in MultiNodeClusterSpec to inject failure detector etc. + * Use ordinary Cluster extension with default AccrualFailureDetector. + */ + override def cluster: Cluster = Cluster(system) + + override def atTermination(): Unit = { + systems foreach { _.shutdown } + val shutdownTimeout = 20.seconds + val deadline = Deadline.now + shutdownTimeout + systems.foreach { sys ⇒ + if (sys.isTerminated) + () // already done + else if (deadline.isOverdue) + sys.log.warning("Failed to shutdown [{}] within [{}]", sys.name, shutdownTimeout) + else { + try sys.awaitTermination(deadline.timeLeft) catch { + case _: TimeoutException ⇒ sys.log.warning("Failed to shutdown [{}] within [{}]", sys.name, shutdownTimeout) + } + } + } + } + + def startupSystems(): Unit = { + // one system is already started by the multi-node test + for (n ← 2 to nodesPerDatacenter) + systems :+= ActorSystem(myself.name + "-" + n, system.settings.config) + + // Initialize the Cluster extensions, i.e. 
startup the clusters + systems foreach { Cluster(_) } + } + + def expectedMaxDuration(totalNodes: Int): Duration = + 5.seconds + (2.seconds * totalNodes) + + def joinAll(from: RoleName, to: RoleName, totalNodes: Int, runOnRoles: RoleName*): Unit = { + val joiningClusters = systems.map(Cluster(_)).toSet + join(joiningClusters, from, to, totalNodes, runOnRoles: _*) + } + + def join(joiningClusterNodes: Set[Cluster], from: RoleName, to: RoleName, totalNodes: Int, runOnRoles: RoleName*): Unit = { + runOnRoles must contain(from) + runOnRoles must contain(to) + + runOn(runOnRoles: _*) { + systems.size must be(nodesPerDatacenter) // make sure it is initialized + + val clusterNodes = ifNode(from)(joiningClusterNodes)(systems.map(Cluster(_)).toSet) + val startGossipCounts = Map.empty[Cluster, Long] ++ + clusterNodes.map(c ⇒ (c -> c.receivedGossipCount)) + def gossipCount(c: Cluster): Long = c.receivedGossipCount - startGossipCounts(c) + val startTime = System.nanoTime + def tookMillis: String = TimeUnit.NANOSECONDS.toMillis(System.nanoTime - startTime) + " ms" + + val latch = TestLatch(clusterNodes.size) + clusterNodes foreach { c ⇒ + c.registerListener(new MembershipChangeListener { + override def notify(members: SortedSet[Member]): Unit = { + if (!latch.isOpen && members.size == totalNodes && members.forall(_.status == MemberStatus.Up)) { + log.debug("All [{}] nodes Up in [{}], it took [{}], received [{}] gossip messages", + totalNodes, c.selfAddress, tookMillis, gossipCount(c)) + latch.countDown() + } + } + }) + } + + runOn(from) { + clusterNodes foreach { _ join to } + } + + Await.ready(latch, remaining) + + awaitCond(clusterNodes.forall(_.convergence.isDefined)) + val counts = clusterNodes.map(gossipCount(_)) + val formattedStats = "mean=%s min=%s max=%s".format(counts.sum / clusterNodes.size, counts.min, counts.max) + log.info("Convergence of [{}] nodes reached, it took [{}], received [{}] gossip messages per node", + totalNodes, tookMillis, formattedStats) + + } + } + + "A large cluster" must { + + "join all nodes in first-datacenter to first-datacenter" taggedAs LongRunningTest in { + runOn(firstDatacenter) { + startupSystems() + startClusterNode() + } + enterBarrier("first-datacenter-started") + + val totalNodes = nodesPerDatacenter + within(expectedMaxDuration(totalNodes)) { + joinAll(from = firstDatacenter, to = firstDatacenter, totalNodes, runOnRoles = firstDatacenter) + enterBarrier("first-datacenter-joined") + } + } + + "join all nodes in second-datacenter to first-datacenter" taggedAs LongRunningTest in { + runOn(secondDatacenter) { + startupSystems() + } + enterBarrier("second-datacenter-started") + + val totalNodes = nodesPerDatacenter * 2 + within(expectedMaxDuration(totalNodes)) { + joinAll(from = secondDatacenter, to = firstDatacenter, totalNodes, runOnRoles = firstDatacenter, secondDatacenter) + enterBarrier("second-datacenter-joined") + } + } + + "join all nodes in third-datacenter to first-datacenter" taggedAs LongRunningTest in { + runOn(thirdDatacenter) { + startupSystems() + } + enterBarrier("third-datacenter-started") + + val totalNodes = nodesPerDatacenter * 3 + within(expectedMaxDuration(totalNodes)) { + joinAll(from = thirdDatacenter, to = firstDatacenter, totalNodes, + runOnRoles = firstDatacenter, secondDatacenter, thirdDatacenter) + enterBarrier("third-datacenter-joined") + } + } + + "join all nodes in fourth-datacenter to first-datacenter" taggedAs LongRunningTest in { + runOn(fourthDatacenter) { + startupSystems() + } + enterBarrier("fourth-datacenter-started") + 
+ val totalNodes = nodesPerDatacenter * 4 + within(expectedMaxDuration(totalNodes)) { + joinAll(from = fourthDatacenter, to = firstDatacenter, totalNodes, + runOnRoles = firstDatacenter, secondDatacenter, thirdDatacenter, fourthDatacenter) + enterBarrier("fourth-datacenter-joined") + } + } + + "join nodes one by one from fifth-datacenter to first-datacenter" taggedAs LongRunningTest in { + runOn(fifthDatacenter) { + startupSystems() + } + enterBarrier("fifth-datacenter-started") + + for (i ← 0 until nodesPerDatacenter) { + val totalNodes = nodesPerDatacenter * 4 + i + 1 + within(expectedMaxDuration(totalNodes)) { + val joiningClusters = ifNode(fifthDatacenter)(Set(Cluster(systems(i))))(Set.empty) + join(joiningClusters, from = fifthDatacenter, to = firstDatacenter, totalNodes, + runOnRoles = firstDatacenter, secondDatacenter, thirdDatacenter, fourthDatacenter, fifthDatacenter) + enterBarrier("fifth-datacenter-joined-" + i) + } + } + } + + // FIXME sometimes this fails, FD marks nodes from other than second-datacenter as unavailable + "detect failure and auto-down crashed nodes in second-datacenter" taggedAs LongRunningTest ignore { + val unreachableNodes = nodesPerDatacenter + val liveNodes = nodesPerDatacenter * 4 + + within(20.seconds + expectedMaxDuration(liveNodes)) { + val startGossipCounts = Map.empty[Cluster, Long] ++ + systems.map(sys ⇒ (Cluster(sys) -> Cluster(sys).receivedGossipCount)) + def gossipCount(c: Cluster): Long = c.receivedGossipCount - startGossipCounts(c) + val startTime = System.nanoTime + def tookMillis: String = TimeUnit.NANOSECONDS.toMillis(System.nanoTime - startTime) + " ms" + + val latch = TestLatch(nodesPerDatacenter) + systems foreach { sys ⇒ + Cluster(sys).registerListener(new MembershipChangeListener { + override def notify(members: SortedSet[Member]): Unit = { + if (!latch.isOpen && members.size == liveNodes && Cluster(sys).latestGossip.overview.unreachable.size == unreachableNodes) { + log.info("Detected [{}] unreachable nodes in [{}], it took [{}], received [{}] gossip messages", + unreachableNodes, Cluster(sys).selfAddress, tookMillis, gossipCount(Cluster(sys))) + latch.countDown() + } + } + }) + } + + runOn(firstDatacenter) { + testConductor.shutdown(secondDatacenter, 0) + } + + enterBarrier("second-datacenter-shutdown") + + runOn(firstDatacenter, thirdDatacenter, fourthDatacenter, fifthDatacenter) { + Await.ready(latch, remaining) + awaitCond(systems.forall(Cluster(_).convergence.isDefined)) + val counts = systems.map(sys ⇒ gossipCount(Cluster(sys))) + val formattedStats = "mean=%s min=%s max=%s".format(counts.sum / nodesPerDatacenter, counts.min, counts.max) + log.info("Convergence of [{}] nodes reached after failure, it took [{}], received [{}] gossip messages per node", + liveNodes, tookMillis, formattedStats) + } + + enterBarrier("after-6") + } + + } + + } +} From 2da1a912fe58e629a4acf30727e0a63e0e83cfb3 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 28 Jun 2012 11:36:13 +0200 Subject: [PATCH 03/13] Improve efficiency of gossip, see #2193 and #2253 * Essentially as already described in cluster specification, but now fully implemented and tested with LargeClusterSpec * Gossip to nodes with different view (using seen table) with certain probability * Gossip chat, gossip back to sender * Immediate gossip to joining node * Updated some tests to reflect current implementation --- .../src/main/scala/akka/cluster/Cluster.scala | 40 +++- .../scala/akka/cluster/SunnyWeatherSpec.scala | 2 - .../scala/akka/cluster/TransitionSpec.scala | 204 
+++--------------- .../test/scala/akka/cluster/ClusterSpec.scala | 3 +- akka-docs/cluster/cluster.rst | 9 +- 5 files changed, 67 insertions(+), 191 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index eb1c2c08fb..0d87a4b89c 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -832,7 +832,10 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) else { log.info("Cluster Node [{}] - Node [{}] is JOINING", selfAddress, node) // treat join as initial heartbeat, so that it becomes unavailable if nothing more happens - if (node != selfAddress) failureDetector heartbeat node + if (node != selfAddress) { + failureDetector heartbeat node + gossipTo(node) + } notifyMembershipChangeListeners(localState, newState) } } @@ -974,10 +977,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val mergedGossip = remoteGossip merge localGossip val versionedMergedGossip = mergedGossip :+ vclockNode - log.debug( - """Can't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merging them into [{}]""", - remoteGossip, localGossip, versionedMergedGossip) - versionedMergedGossip } else if (remoteGossip.version < localGossip.version) { @@ -1003,8 +1002,20 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) if (!state.compareAndSet(localState, newState)) receiveGossip(from, remoteGossip) // recur if we fail the update else { log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from) + + if ((winningGossip ne localGossip) && (winningGossip ne remoteGossip)) + log.debug( + """Couldn't establish a causal relationship between "remote" gossip and "local" gossip - Remote[{}] - Local[{}] - merged them into [{}]""", + remoteGossip, localGossip, winningGossip) + _receivedGossipCount.incrementAndGet() notifyMembershipChangeListeners(localState, newState) + + if ((winningGossip ne remoteGossip) || (newState.latestGossip ne remoteGossip)) { + // send back gossip to sender when sender had different view, i.e. merge, or sender had + // older or sender had newer + gossipTo(from) + } } } } @@ -1055,6 +1066,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) } } + private def gossipToDifferentViewProbability: Double = 0.8 + /** * INTERNAL API. * @@ -1075,8 +1088,21 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val localUnreachableMembers = localGossip.overview.unreachable.toIndexedSeq val localUnreachableSize = localUnreachableMembers.size - // 1. gossip to alive members - val gossipedToAlive = gossipToRandomNodeOf(localMemberAddresses) + // 1. gossip to a random alive member with preference to a member + // with older or newer gossip version + val nodesWithdifferentView = { + val localMemberAddressesSet = localGossip.members map { _.address } + for { + (address, version) ← localGossip.overview.seen + if localMemberAddressesSet contains address + if version != localGossip.version + } yield address + } + val gossipedToAlive = + if (nodesWithdifferentView.nonEmpty && ThreadLocalRandom.current.nextDouble() < gossipToDifferentViewProbability) + gossipToRandomNodeOf(nodesWithdifferentView.toIndexedSeq) + else + gossipToRandomNodeOf(localMemberAddresses) // 2. 
gossip to a deputy nodes for facilitating partition healing val deputies = deputyNodes(localMemberAddresses) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index 3be082d2f3..ddacf668e0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -23,8 +23,6 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig { // not MultiNodeClusterSpec.clusterConfig commonConfig(ConfigFactory.parseString(""" akka.cluster { - # FIXME remove this (use default) when ticket #2239 has been fixed - gossip-interval = 400 ms auto-join = off } akka.loglevel = INFO diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala index 397d824ef4..c4e43b9abf 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala @@ -16,8 +16,6 @@ object TransitionMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") val third = role("third") - val fourth = role("fourth") - val fifth = role("fifth") commonConfig(debugConfig(on = false). withFallback(ConfigFactory.parseString("akka.cluster.periodic-tasks-initial-delay = 300 s # turn off all periodic tasks")). @@ -27,8 +25,6 @@ object TransitionMultiJvmSpec extends MultiNodeConfig { class TransitionMultiJvmNode1 extends TransitionSpec with FailureDetectorPuppetStrategy class TransitionMultiJvmNode2 extends TransitionSpec with FailureDetectorPuppetStrategy class TransitionMultiJvmNode3 extends TransitionSpec with FailureDetectorPuppetStrategy -class TransitionMultiJvmNode4 extends TransitionSpec with FailureDetectorPuppetStrategy -class TransitionMultiJvmNode5 extends TransitionSpec with FailureDetectorPuppetStrategy abstract class TransitionSpec extends MultiNodeSpec(TransitionMultiJvmSpec) @@ -81,11 +77,15 @@ abstract class TransitionSpec val g = cluster.latestGossip enterBarrier("before-gossip-" + gossipBarrierCounter) awaitCond(cluster.latestGossip != g) // received gossip + // gossip chat will synchronize the views + awaitCond((Set(fromRole, toRole) -- seenLatestGossip).isEmpty) enterBarrier("after-gossip-" + gossipBarrierCounter) } runOn(fromRole) { enterBarrier("before-gossip-" + gossipBarrierCounter) cluster.gossipTo(toRole) // send gossip + // gossip chat will synchronize the views + awaitCond((Set(fromRole, toRole) -- seenLatestGossip).isEmpty) enterBarrier("after-gossip-" + gossipBarrierCounter) } runOn(roles.filterNot(r ⇒ r == fromRole || r == toRole): _*) { @@ -116,22 +116,12 @@ abstract class TransitionSpec runOn(second) { cluster.join(first) } - runOn(first) { + runOn(first, second) { + // gossip chat from the join will synchronize the views awaitMembers(first, second) memberStatus(first) must be(Up) memberStatus(second) must be(Joining) - seenLatestGossip must be(Set(first)) - cluster.convergence.isDefined must be(false) - } - enterBarrier("second-joined") - - first gossipTo second - second gossipTo first - - runOn(first, second) { - memberStatus(first) must be(Up) - memberStatus(second) must be(Joining) - seenLatestGossip must be(Set(first, second)) + awaitCond(seenLatestGossip == Set(first, second)) cluster.convergence.isDefined must be(true) } enterBarrier("convergence-joining-2") @@ -144,18 +134,11 @@ abstract class TransitionSpec 
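(Stepping back from the diff for a moment: the core of this gossip optimization, introduced above in Cluster.scala, is a biased choice of gossip target. Members whose entry in the seen table carries a version different from the local gossip version have not yet converged on the latest state, so they are preferred with a certain probability, hard coded to 0.8 here and moved to configuration in a later patch of this series; otherwise any live member is picked. A simplified, self-contained sketch of that selection, with illustrative names and with plain strings and longs standing in for the real Member and vector clock types, might look like:

  import scala.util.Random

  object GossipTargetSketch {
    // Pick a gossip target: prefer members with a different (older or newer) view
    // with the given probability, otherwise fall back to any live member.
    def chooseTarget(
      liveMembers: IndexedSeq[String],
      seenVersions: Map[String, Long],
      localVersion: Long,
      differentViewProbability: Double,
      rnd: Random = new Random): Option[String] = {

      val differentView = liveMembers.filter(m => seenVersions.get(m).exists(_ != localVersion))
      val candidates =
        if (differentView.nonEmpty && rnd.nextDouble() < differentViewProbability) differentView
        else liveMembers
      if (candidates.isEmpty) None
      else Some(candidates(rnd.nextInt(candidates.size)))
    }
  }

Together with gossiping back to a sender that had a different view, this "gossip chat" is what allows the TransitionSpec changes in this patch to wait for views to synchronize instead of scripting every individual gossipTo step.)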
enterBarrier("leader-actions-2") leader(first, second) gossipTo nonLeader(first, second).head - runOn(nonLeader(first, second).head) { - memberStatus(first) must be(Up) - memberStatus(second) must be(Up) - seenLatestGossip must be(Set(first, second)) - cluster.convergence.isDefined must be(true) - } - - nonLeader(first, second).head gossipTo leader(first, second) runOn(first, second) { - memberStatus(first) must be(Up) - memberStatus(second) must be(Up) + // gossip chat will synchronize the views + awaitCond(memberStatus(second) == Up) seenLatestGossip must be(Set(first, second)) + memberStatus(first) must be(Up) cluster.convergence.isDefined must be(true) } @@ -167,25 +150,26 @@ abstract class TransitionSpec runOn(third) { cluster.join(second) } - runOn(second) { + runOn(second, third) { + // gossip chat from the join will synchronize the views awaitMembers(first, second, third) - cluster.convergence.isDefined must be(false) memberStatus(third) must be(Joining) - seenLatestGossip must be(Set(second)) + awaitCond(seenLatestGossip == Set(second, third)) + cluster.convergence.isDefined must be(false) } enterBarrier("third-joined-second") second gossipTo first - runOn(first) { - members must be(Set(first, second, third)) + runOn(first, second) { + // gossip chat will synchronize the views + awaitMembers(first, second, third) memberStatus(third) must be(Joining) - seenLatestGossip must be(Set(first, second)) - cluster.convergence.isDefined must be(false) + awaitCond(memberStatus(second) == Up) + seenLatestGossip must be(Set(first, second, third)) + cluster.convergence.isDefined must be(true) } first gossipTo third - third gossipTo first - third gossipTo second runOn(first, second, third) { members must be(Set(first, second, third)) memberStatus(first) must be(Up) @@ -224,14 +208,6 @@ abstract class TransitionSpec cluster.convergence.isDefined must be(true) } - // and back again - nonLeader(first, second, third).tail.head gossipTo nonLeader(first, second, third).head - runOn(nonLeader(first, second, third).head) { - memberStatus(third) must be(Up) - seenLatestGossip must be(Set(first, second, third)) - cluster.convergence.isDefined must be(true) - } - // first non-leader gossipTo the leader nonLeader(first, second, third).head gossipTo leader(first, second, third) runOn(first, second, third) { @@ -245,160 +221,36 @@ abstract class TransitionSpec enterBarrier("after-3") } - "startup a second separated cluster consisting of nodes fourth and fifth" taggedAs LongRunningTest in { - runOn(fifth) { - startClusterNode() - cluster.leaderActions() - cluster.status must be(Up) - } - enterBarrier("fifth-started") - - runOn(fourth) { - cluster.join(fifth) - } - runOn(fifth) { - awaitMembers(fourth, fifth) - } - enterBarrier("fourth-joined") - - fifth gossipTo fourth - fourth gossipTo fifth - - runOn(fourth, fifth) { - memberStatus(fourth) must be(Joining) - memberStatus(fifth) must be(Up) - seenLatestGossip must be(Set(fourth, fifth)) - cluster.convergence.isDefined must be(true) - } - - enterBarrier("after-4") - } - - "perform correct transitions when second cluster (node fourth) joins first cluster (node third)" taggedAs LongRunningTest in { - - runOn(fourth) { - cluster.join(third) - } - runOn(third) { - awaitMembers(first, second, third, fourth) - seenLatestGossip must be(Set(third)) - } - enterBarrier("fourth-joined-third") - - third gossipTo second - runOn(second) { - seenLatestGossip must be(Set(second, third)) - } - - second gossipTo fourth - runOn(fourth) { - members must be(roles.toSet) - // merge 
conflict - seenLatestGossip must be(Set(fourth)) - } - - fourth gossipTo first - fourth gossipTo second - fourth gossipTo third - fourth gossipTo fifth - runOn(first, second, third, fifth) { - members must be(roles.toSet) - seenLatestGossip must be(Set(fourth, myself)) - } - - first gossipTo fifth - runOn(fifth) { - seenLatestGossip must be(Set(first, fourth, fifth)) - } - - fifth gossipTo third - runOn(third) { - seenLatestGossip must be(Set(first, third, fourth, fifth)) - } - - third gossipTo second - runOn(second) { - seenLatestGossip must be(roles.toSet) - cluster.convergence.isDefined must be(true) - } - - second gossipTo first - second gossipTo third - second gossipTo fourth - third gossipTo fifth - - seenLatestGossip must be(roles.toSet) - memberStatus(first) must be(Up) - memberStatus(second) must be(Up) - memberStatus(third) must be(Up) - memberStatus(fourth) must be(Joining) - memberStatus(fifth) must be(Up) - cluster.convergence.isDefined must be(true) - - enterBarrier("convergence-joining-3") - - runOn(leader(roles: _*)) { - cluster.leaderActions() - memberStatus(fourth) must be(Up) - seenLatestGossip must be(Set(myself)) - cluster.convergence.isDefined must be(false) - } - // spread the word - for (x :: y :: Nil ← (roles.sorted ++ roles.sorted.dropRight(1)).toList.sliding(2)) { - x gossipTo y - } - - enterBarrier("spread-5") - - seenLatestGossip must be(roles.toSet) - memberStatus(first) must be(Up) - memberStatus(second) must be(Up) - memberStatus(third) must be(Up) - memberStatus(fourth) must be(Up) - memberStatus(fifth) must be(Up) - cluster.convergence.isDefined must be(true) - - enterBarrier("after-5") - } - "perform correct transitions when second becomes unavailble" taggedAs LongRunningTest in { - runOn(fifth) { + runOn(third) { markNodeAsUnavailable(second) cluster.reapUnreachableMembers() cluster.latestGossip.overview.unreachable must contain(Member(second, Up)) - seenLatestGossip must be(Set(fifth)) + seenLatestGossip must be(Set(third)) } enterBarrier("after-second-unavailble") - // spread the word - val gossipRound = List(fifth, fourth, third, first, third, fourth, fifth) - for (x :: y :: Nil ← gossipRound.sliding(2)) { - x gossipTo y - } + third gossipTo first - runOn((roles.filterNot(_ == second)): _*) { + runOn(first, third) { cluster.latestGossip.overview.unreachable must contain(Member(second, Up)) cluster.convergence.isDefined must be(false) } - runOn(third) { + runOn(first) { cluster.down(second) awaitMemberStatus(second, Down) } enterBarrier("after-second-down") - // spread the word - val gossipRound2 = List(third, fourth, fifth, first, third, fourth, fifth) - for (x :: y :: Nil ← gossipRound2.sliding(2)) { - x gossipTo y - } + first gossipTo third - runOn((roles.filterNot(_ == second)): _*) { + runOn(first, third) { cluster.latestGossip.overview.unreachable must contain(Member(second, Down)) memberStatus(second) must be(Down) - seenLatestGossip must be(Set(first, third, fourth, fifth)) + seenLatestGossip must be(Set(first, third)) cluster.convergence.isDefined must be(true) } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index e818847969..68731b89b2 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -105,12 +105,14 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { cluster.latestGossip.members.map(_.address) must be(Set(selfAddress, addresses(1))) 
memberStatus(addresses(1)) must be(Some(MemberStatus.Joining)) cluster.convergence.isDefined must be(false) + expectMsg(GossipTo(addresses(1))) } "accept a few more joining nodes" in { for (a ← addresses.drop(2)) { cluster.joining(a) memberStatus(a) must be(Some(MemberStatus.Joining)) + expectMsg(GossipTo(a)) } cluster.latestGossip.members.map(_.address) must be(addresses.toSet) } @@ -121,7 +123,6 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { } "gossip to random live node" in { - cluster.latestGossip.members cluster.gossip() cluster.gossip() cluster.gossip() diff --git a/akka-docs/cluster/cluster.rst b/akka-docs/cluster/cluster.rst index 1812c33561..cbad3ef690 100644 --- a/akka-docs/cluster/cluster.rst +++ b/akka-docs/cluster/cluster.rst @@ -138,7 +138,7 @@ implementation of `The Phi Accrual Failure Detector`_ by Hayashibara et al. An accrual failure detector decouple monitoring and interpretation. That makes them applicable to a wider area of scenarios and more adequate to build generic failure detection services. The idea is that it is keeping a history of failure -statistics, calculated from heartbeats received from the gossip protocol, and is +statistics, calculated from heartbeats received from other nodes, and is trying to do educated guesses by taking multiple factors, and how they accumulate over time, into account in order to come up with a better guess if a specific node is up or down. Rather than just answering "yes" or "no" to the @@ -232,15 +232,14 @@ breaking logical partitions as seen in the gossip algorithm defined below. During each round of gossip exchange the following process is used: -1. Gossip to random live node (if any) +1. Gossip to random node with newer or older state information, if any, based on the + current gossip overview, with some probability. Otherwise Gossip to any random + live node. 2. If the node gossiped to at (1) was not a ``deputy`` node, or the number of live nodes is less than number of ``deputy`` nodes, gossip to random ``deputy`` node with certain probability depending on number of unreachable, ``deputy``, and live nodes. -3. Gossip to random node with newer or older state information, based on the - current gossip overview, with some probability (?) - The gossiper only sends the gossip overview to the chosen node. The recipient of the gossip can use the gossip overview to determine whether: From 211732391dc5738e5e4d301c129b379987592f6a Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 29 Jun 2012 08:42:52 +0200 Subject: [PATCH 04/13] Minor improvement of LargeClusterSpec, see #2239 --- .../scala/akka/cluster/LargeClusterSpec.scala | 38 ++++++++++++++++--- 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala index a12fc90ff9..e3dc7719c1 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala @@ -30,7 +30,8 @@ object LargeClusterMultiJvmSpec extends MultiNodeConfig { commonConfig(ConfigFactory.parseString(""" # Number of ActorSystems in each jvm, can be specified as # system property when running real tests. Many nodes - # will take long time. + # will take long time and consume many threads. + # 10 => 50 nodes is possible to run on one machine. 
akka.test.large-cluster-spec.nodes-per-datacenter = 2 akka.cluster { gossip-interval = 500 ms @@ -40,7 +41,19 @@ object LargeClusterMultiJvmSpec extends MultiNodeConfig { akka.loglevel = INFO akka.actor.default-dispatcher.fork-join-executor.parallelism-max = 2 akka.scheduler.tick-duration = 33 ms - akka.remote.netty.execution-pool-size = 1 + akka.remote.netty.execution-pool-size = 0 + + # don't use testconductor transport in this test, especially not + # when using use-dispatcher-for-io + akka.remote.transport = "akka.remote.netty.NettyRemoteTransport" + + # Using a separate dispatcher for netty io doesn't reduce number + # of needed threads + # akka.remote.netty.use-dispatcher-for-io=akka.test.io-dispatcher + # akka.test.io-dispatcher.fork-join-executor { + # parallelism-min = 100 + # parallelism-max = 100 + # } """)) } @@ -207,13 +220,26 @@ abstract class LargeClusterSpec } enterBarrier("fifth-datacenter-started") - for (i ← 0 until nodesPerDatacenter) { - val totalNodes = nodesPerDatacenter * 4 + i + 1 + // enough to join a few one-by-one (takes too long time otherwise) + val (bulk, oneByOne) = systems.splitAt(systems.size - 3) + + if (bulk.nonEmpty) { + val totalNodes = nodesPerDatacenter * 4 + bulk.size within(expectedMaxDuration(totalNodes)) { - val joiningClusters = ifNode(fifthDatacenter)(Set(Cluster(systems(i))))(Set.empty) + val joiningClusters = ifNode(fifthDatacenter)(bulk.map(Cluster(_)).toSet)(Set.empty) join(joiningClusters, from = fifthDatacenter, to = firstDatacenter, totalNodes, runOnRoles = firstDatacenter, secondDatacenter, thirdDatacenter, fourthDatacenter, fifthDatacenter) - enterBarrier("fifth-datacenter-joined-" + i) + enterBarrier("fifth-datacenter-joined-" + bulk.size) + } + } + + for (i ← 0 until oneByOne.size) { + val totalNodes = nodesPerDatacenter * 4 + bulk.size + i + 1 + within(expectedMaxDuration(totalNodes)) { + val joiningClusters = ifNode(fifthDatacenter)(Set(Cluster(oneByOne(i))))(Set.empty) + join(joiningClusters, from = fifthDatacenter, to = firstDatacenter, totalNodes, + runOnRoles = firstDatacenter, secondDatacenter, thirdDatacenter, fourthDatacenter, fifthDatacenter) + enterBarrier("fifth-datacenter-joined-" + (bulk.size + i)) } } } From d47ff04c0323b08f05eeaa3ab49de0d10d424bc3 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 29 Jun 2012 08:56:58 +0200 Subject: [PATCH 05/13] Moved GossipDifferentViewProbability to config, see #2253 --- akka-cluster/src/main/resources/reference.conf | 5 +++++ akka-cluster/src/main/scala/akka/cluster/Cluster.scala | 4 +--- .../src/main/scala/akka/cluster/ClusterSettings.scala | 1 + .../src/test/scala/akka/cluster/ClusterConfigSpec.scala | 1 + 4 files changed, 8 insertions(+), 3 deletions(-) diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index a06e9273cb..b60b91ec43 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -52,6 +52,11 @@ akka { # of the cluster within this deadline. join-timeout = 60s + # Gossip to random node with newer or older state information, if any with some + # this probability. Otherwise Gossip to any random live node. + # Probability value is between 0.0 and 1.0. 0.0 means never, 1.0 means always. 
+ gossip-different-view-probability = 0.8 + failure-detector { # defines the failure detector threshold diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 0d87a4b89c..40d67d7161 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -1066,8 +1066,6 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) } } - private def gossipToDifferentViewProbability: Double = 0.8 - /** * INTERNAL API. * @@ -1099,7 +1097,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) } yield address } val gossipedToAlive = - if (nodesWithdifferentView.nonEmpty && ThreadLocalRandom.current.nextDouble() < gossipToDifferentViewProbability) + if (nodesWithdifferentView.nonEmpty && ThreadLocalRandom.current.nextDouble() < GossipDifferentViewProbability) gossipToRandomNodeOf(nodesWithdifferentView.toIndexedSeq) else gossipToRandomNodeOf(localMemberAddresses) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 08a9b5160d..6e4cbc4e60 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -36,6 +36,7 @@ class ClusterSettings(val config: Config, val systemName: String) { final val AutoJoin: Boolean = getBoolean("akka.cluster.auto-join") final val AutoDown: Boolean = getBoolean("akka.cluster.auto-down") final val JoinTimeout: Duration = Duration(getMilliseconds("akka.cluster.join-timeout"), MILLISECONDS) + final val GossipDifferentViewProbability: Double = getDouble("akka.cluster.gossip-different-view-probability") final val SchedulerTickDuration: Duration = Duration(getMilliseconds("akka.cluster.scheduler.tick-duration"), MILLISECONDS) final val SchedulerTicksPerWheel: Int = getInt("akka.cluster.scheduler.ticks-per-wheel") } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 92e219a540..07671c6164 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -32,6 +32,7 @@ class ClusterConfigSpec extends AkkaSpec { NrOfGossipDaemons must be(4) AutoJoin must be(true) AutoDown must be(true) + GossipDifferentViewProbability must be(0.8 plusOrMinus 0.0001) SchedulerTickDuration must be(33 millis) SchedulerTicksPerWheel must be(512) } From ab3a26d0d25cdd743887ba3b3f62df87e289b6dd Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 29 Jun 2012 10:57:34 +0200 Subject: [PATCH 06/13] Fix LogRoleReplace, remote lifecyle not logged --- .../akka/remote/testkit/LogRoleReplace.scala | 20 ++++--------------- .../akka/remote/testkit/MultiNodeSpec.scala | 8 ++++---- 2 files changed, 8 insertions(+), 20 deletions(-) diff --git a/akka-remote-tests/src/test/scala/akka/remote/testkit/LogRoleReplace.scala b/akka-remote-tests/src/test/scala/akka/remote/testkit/LogRoleReplace.scala index 1e5a53d82e..5dd41365bf 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testkit/LogRoleReplace.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testkit/LogRoleReplace.scala @@ -88,12 +88,10 @@ object LogRoleReplace extends ClipboardOwner { class LogRoleReplace { - private val RoleStarted = """\[([\w\-]+)\].*Role \[([\w]+)\] started""".r - private val 
RemoteServerStarted = """\[([\w\-]+)\].*RemoteServerStarted@akka://.*@([\w\-\.]+):([0-9]+)""".r + private val RoleStarted = """\[([\w\-]+)\].*Role \[([\w]+)\] started with address \[akka://.*@([\w\-\.]+):([0-9]+)\]""".r private val ColorCode = """\[[0-9]+m""" private var replacements: Map[String, String] = Map.empty - private var jvmToAddress: Map[String, String] = Map.empty def process(in: BufferedReader, out: PrintWriter): Unit = { @@ -121,23 +119,13 @@ class LogRoleReplace { if (line.startsWith("[info] * ")) { // reset when new test begins replacements = Map.empty - jvmToAddress = Map.empty } line match { - case RemoteServerStarted(jvm, host, port) ⇒ - jvmToAddress += (jvm -> (host + ":" + port)) + case RoleStarted(jvm, role, host, port) ⇒ + replacements += (jvm -> role) + replacements += ((host + ":" + port) -> role) false - - case RoleStarted(jvm, role) ⇒ - jvmToAddress.get(jvm) match { - case Some(address) ⇒ - replacements += (jvm -> role) - replacements += (address -> role) - false - case None ⇒ false - } - case _ ⇒ true } } diff --git a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala index 25bb8df7dc..8abfd887e5 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -4,9 +4,7 @@ package akka.remote.testkit import java.net.InetSocketAddress - import com.typesafe.config.{ ConfigObject, ConfigFactory, Config } - import akka.actor.{ RootActorPath, ActorPath, ActorSystem, ExtendedActorSystem } import akka.dispatch.Await import akka.dispatch.Await.Awaitable @@ -14,6 +12,7 @@ import akka.remote.testconductor.{ TestConductorExt, TestConductor, RoleName } import akka.testkit.AkkaSpec import akka.util.{ Timeout, NonFatal } import akka.util.duration._ +import akka.remote.RemoteActorRefProvider /** * Configure the role names and participants of the test, including configuration settings. @@ -259,8 +258,9 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: } } - // useful to see which jvm is running which role - log.info("Role [{}] started", myself.name) + // useful to see which jvm is running which role, used by LogRoleReplace utility + log.info("Role [{}] started with address [{}]", myself.name, + system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport.address) // wait for all nodes to remove themselves before we shut the conductor down final override def beforeShutdown() = { From 2ea0bba9e9efcd76d0571b54595134dff4042923 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Tue, 5 Jun 2012 15:46:26 +0200 Subject: [PATCH 07/13] Cluster node that is UNREACHABLE and rejoins. 
see #2160 --- .../akka/cluster/MultiNodeClusterSpec.scala | 4 + .../UnreachableNodeRejoinsClusterSpec.scala | 154 ++++++++++++++++++ .../akka/remote/testconductor/Conductor.scala | 13 ++ 3 files changed, 171 insertions(+) create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 3264c661b0..8e0f781ceb 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -203,6 +203,10 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu nodesInCluster.sorted.head } + def clusterSortedRoles(nodesInCluster: Seq[RoleName]): Seq[RoleName] = { + nodesInCluster.sorted + } + /** * Sort the roles in the order used by the cluster. */ diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala new file mode 100644 index 0000000000..6ce00687bf --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala @@ -0,0 +1,154 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import com.typesafe.config.ConfigFactory +import akka.actor.Address +import akka.remote.testconductor.{RoleName, Direction} +import akka.util.duration._ + +object UnreachableNodeRejoinsClusterMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + + val allRoles = Seq(first, second, third, fourth) + + def allBut(role: RoleName, roles: Seq[RoleName] = allRoles): Seq[RoleName] = { + roles.filter(_ != role) + } + + commonConfig(debugConfig(on = false). 
+ withFallback(ConfigFactory.parseString(""" + akka.cluster { + failure-detector.threshold = 5 + } """) + ).withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class UnreachableNodeRejoinsClusterMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec +class UnreachableNodeRejoinsClusterMultiJvmNode2 extends UnreachableNodeRejoinsClusterSpec +class UnreachableNodeRejoinsClusterMultiJvmNode3 extends UnreachableNodeRejoinsClusterSpec +class UnreachableNodeRejoinsClusterMultiJvmNode4 extends UnreachableNodeRejoinsClusterSpec + +class UnreachableNodeRejoinsClusterSpec + extends MultiNodeSpec(UnreachableNodeRejoinsClusterMultiJvmSpec) + with MultiNodeClusterSpec + with ImplicitSender with BeforeAndAfter { + import UnreachableNodeRejoinsClusterMultiJvmSpec._ + + override def initialParticipants = allRoles.size + + val sortedRoles = clusterSortedRoles(allRoles) + val master = sortedRoles(0) + val victim = sortedRoles(1) + + var endBarrierNumber = 0 + def endBarrier = { + endBarrierNumber += 1 + testConductor.enter("after_" + endBarrierNumber) + } + + "A cluster of " + allRoles.size + " members" must { + + "reach initial convergence" taggedAs LongRunningTest in { + runOn(master) { + cluster.self + awaitUpConvergence(numberOfMembers = allRoles.size) + } + + runOn(allBut(master):_*) { + cluster.join(node(master).address) + awaitUpConvergence(numberOfMembers = allRoles.size) + } + + endBarrier + } + + "mark a node as UNREACHABLE when we pull the network" taggedAs LongRunningTest in { + runOn(first) { + // pull network for victim node from all nodes + allBut(victim).foreach { roleName => + testConductor.blackhole(victim, roleName, Direction.Both).await + } + testConductor.enter("unplug_victim") + } + + runOn(allBut(first):_*) { + testConductor.enter("unplug_victim") + } + + runOn(victim) { + val otherAddresses = sortedRoles.filter(_ != victim).map(node(_).address) + within(30 seconds) { + awaitCond(cluster.latestGossip.overview.unreachable.size == (allRoles.size - 1)) + awaitCond(cluster.latestGossip.members.size == 1) + awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up)) + cluster.latestGossip.overview.unreachable.map(_.address) must be(otherAddresses.toSet) + cluster.convergence.isDefined must be(false) + } + } + + val allButVictim = allBut(victim) + runOn(allButVictim: _*) { + val victimAddress = node(victim).address + val otherAddresses = allButVictim.map(node(_).address) + within(30 seconds) { + // victim becomes unreachable + awaitCond(cluster.latestGossip.overview.unreachable.size == 1) + awaitCond(cluster.latestGossip.members.size == (allRoles.size - 1)) + awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up)) + awaitSeenSameState(otherAddresses) + // still one unreachable + cluster.latestGossip.overview.unreachable.size must be(1) + cluster.latestGossip.overview.unreachable.head.address must be(victimAddress) + // and therefore no convergence + cluster.convergence.isDefined must be(false) + } + } + + endBarrier + } + + "mark the node as DOWN" taggedAs LongRunningTest in { + val victimAddress = node(victim).address + runOn(master) { + cluster.down(victimAddress) + } + + runOn(allBut(victim):_*) { + awaitUpConvergence(allRoles.size - 1, Seq(victimAddress)) + } + + endBarrier + } + + "allow node to REJOIN when the network is plugged back in" taggedAs LongRunningTest in { + runOn(first) { + // put the network back in + allBut(victim).foreach { roleName => + testConductor.passThrough(victim, roleName, Direction.Both).await + } + 
testConductor.enter("plug_in_victim") + } + + runOn(allBut(first):_*) { + testConductor.enter("plug_in_victim") + } + + runOn(victim) { + cluster.join(node(master).address) + } + + awaitUpConvergence(allRoles.size) + + endBarrier + } + } +} diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala index eba0fffe63..24377d54a1 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -139,6 +139,19 @@ trait Conductor { this: TestConductorExt ⇒ controller ? Throttle(node, target, direction, 0f) mapTo } + /** + * Switch the Netty pipeline of the remote support into pass through mode for + * sending and/or receiving. + * + * @param node is the symbolic name of the node which is to be affected + * @param target is the symbolic name of the other node to which connectivity shall be impeded + * @param direction can be either `Direction.Send`, `Direction.Receive` or `Direction.Both` + */ + def passThrough(node: RoleName, target: RoleName, direction: Direction): Future[Done] = { + import Settings.QueryTimeout + controller ? Throttle(node, target, direction, -1f) mapTo + } + /** * Tell the remote support to shutdown the connection to the given remote * peer. It works regardless of whether the recipient was initiator or From 9691dd0325a015830ac40064dd32c63a6f63ba72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Thu, 7 Jun 2012 10:37:04 +0200 Subject: [PATCH 08/13] Changes after review --- .../UnreachableNodeRejoinsClusterSpec.scala | 35 ++++++------------- 1 file changed, 10 insertions(+), 25 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala index 6ce00687bf..da9d62e1d2 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala @@ -21,7 +21,7 @@ object UnreachableNodeRejoinsClusterMultiJvmSpec extends MultiNodeConfig { val allRoles = Seq(first, second, third, fourth) def allBut(role: RoleName, roles: Seq[RoleName] = allRoles): Seq[RoleName] = { - roles.filter(_ != role) + roles.filterNot(_ == role) } commonConfig(debugConfig(on = false). 
@@ -45,12 +45,12 @@ class UnreachableNodeRejoinsClusterSpec override def initialParticipants = allRoles.size - val sortedRoles = clusterSortedRoles(allRoles) - val master = sortedRoles(0) - val victim = sortedRoles(1) + lazy val sortedRoles = clusterSortedRoles(allRoles) + lazy val master = sortedRoles(0) + lazy val victim = sortedRoles(1) var endBarrierNumber = 0 - def endBarrier = { + def endBarrier: Unit = { endBarrierNumber += 1 testConductor.enter("after_" + endBarrierNumber) } @@ -58,16 +58,7 @@ class UnreachableNodeRejoinsClusterSpec "A cluster of " + allRoles.size + " members" must { "reach initial convergence" taggedAs LongRunningTest in { - runOn(master) { - cluster.self - awaitUpConvergence(numberOfMembers = allRoles.size) - } - - runOn(allBut(master):_*) { - cluster.join(node(master).address) - awaitUpConvergence(numberOfMembers = allRoles.size) - } - + awaitClusterUp(allRoles:_*) endBarrier } @@ -77,15 +68,12 @@ class UnreachableNodeRejoinsClusterSpec allBut(victim).foreach { roleName => testConductor.blackhole(victim, roleName, Direction.Both).await } - testConductor.enter("unplug_victim") } - runOn(allBut(first):_*) { - testConductor.enter("unplug_victim") - } + testConductor.enter("unplug_victim") runOn(victim) { - val otherAddresses = sortedRoles.filter(_ != victim).map(node(_).address) + val otherAddresses = sortedRoles.collect { case x if x != victim => node(x).address } within(30 seconds) { awaitCond(cluster.latestGossip.overview.unreachable.size == (allRoles.size - 1)) awaitCond(cluster.latestGossip.members.size == 1) @@ -96,7 +84,7 @@ class UnreachableNodeRejoinsClusterSpec } val allButVictim = allBut(victim) - runOn(allButVictim: _*) { + runOn(allButVictim:_*) { val victimAddress = node(victim).address val otherAddresses = allButVictim.map(node(_).address) within(30 seconds) { @@ -135,12 +123,9 @@ class UnreachableNodeRejoinsClusterSpec allBut(victim).foreach { roleName => testConductor.passThrough(victim, roleName, Direction.Both).await } - testConductor.enter("plug_in_victim") } - runOn(allBut(first):_*) { - testConductor.enter("plug_in_victim") - } + testConductor.enter("plug_in_victim") runOn(victim) { cluster.join(node(master).address) From dd042e3573e4b334097e8f18c81dc4877a687a59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Thu, 7 Jun 2012 11:08:23 +0200 Subject: [PATCH 09/13] Group multiple awaitCond into single one --- .../UnreachableNodeRejoinsClusterSpec.scala | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala index da9d62e1d2..347a2c79bc 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala @@ -75,9 +75,11 @@ class UnreachableNodeRejoinsClusterSpec runOn(victim) { val otherAddresses = sortedRoles.collect { case x if x != victim => node(x).address } within(30 seconds) { - awaitCond(cluster.latestGossip.overview.unreachable.size == (allRoles.size - 1)) - awaitCond(cluster.latestGossip.members.size == 1) - awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up)) + // victim becomes all alone + awaitCond({ val gossip = cluster.latestGossip + gossip.overview.unreachable.size == (allRoles.size - 1) && + gossip.members.size == 1 && + gossip.members.forall(_.status == 
MemberStatus.Up) }) cluster.latestGossip.overview.unreachable.map(_.address) must be(otherAddresses.toSet) cluster.convergence.isDefined must be(false) } @@ -89,9 +91,10 @@ class UnreachableNodeRejoinsClusterSpec val otherAddresses = allButVictim.map(node(_).address) within(30 seconds) { // victim becomes unreachable - awaitCond(cluster.latestGossip.overview.unreachable.size == 1) - awaitCond(cluster.latestGossip.members.size == (allRoles.size - 1)) - awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up)) + awaitCond({ val gossip = cluster.latestGossip + gossip.overview.unreachable.size == 1 && + gossip.members.size == (allRoles.size - 1) && + gossip.members.forall(_.status == MemberStatus.Up) }) awaitSeenSameState(otherAddresses) // still one unreachable cluster.latestGossip.overview.unreachable.size must be(1) From db1175e1f3c6a00ebf1e0ccf4bda7d21caf71e9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Wed, 27 Jun 2012 13:54:43 +0200 Subject: [PATCH 10/13] Bringing UnreachableNodeRejoinsClusterSpec up to speed with master --- .../akka/cluster/MultiNodeClusterSpec.scala | 4 --- .../UnreachableNodeRejoinsClusterSpec.scala | 35 ++++++++++--------- 2 files changed, 19 insertions(+), 20 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 8e0f781ceb..3264c661b0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -203,10 +203,6 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu nodesInCluster.sorted.head } - def clusterSortedRoles(nodesInCluster: Seq[RoleName]): Seq[RoleName] = { - nodesInCluster.sorted - } - /** * Sort the roles in the order used by the cluster. */ diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala index 347a2c79bc..e943ae6c6c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala @@ -24,20 +24,21 @@ object UnreachableNodeRejoinsClusterMultiJvmSpec extends MultiNodeConfig { roles.filterNot(_ == role) } - commonConfig(debugConfig(on = false). 
- withFallback(ConfigFactory.parseString(""" - akka.cluster { - failure-detector.threshold = 5 - } """) - ).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } -class UnreachableNodeRejoinsClusterMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec -class UnreachableNodeRejoinsClusterMultiJvmNode2 extends UnreachableNodeRejoinsClusterSpec -class UnreachableNodeRejoinsClusterMultiJvmNode3 extends UnreachableNodeRejoinsClusterSpec -class UnreachableNodeRejoinsClusterMultiJvmNode4 extends UnreachableNodeRejoinsClusterSpec +class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec with FailureDetectorPuppetStrategy +class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode2 extends UnreachableNodeRejoinsClusterSpec with FailureDetectorPuppetStrategy +class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode3 extends UnreachableNodeRejoinsClusterSpec with FailureDetectorPuppetStrategy +class UnreachableNodeRejoinsClusterWithFailureDetectorPuppetMultiJvmNode4 extends UnreachableNodeRejoinsClusterSpec with FailureDetectorPuppetStrategy -class UnreachableNodeRejoinsClusterSpec + +class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode1 extends UnreachableNodeRejoinsClusterSpec with AccrualFailureDetectorStrategy +class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode2 extends UnreachableNodeRejoinsClusterSpec with AccrualFailureDetectorStrategy +class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode3 extends UnreachableNodeRejoinsClusterSpec with AccrualFailureDetectorStrategy +class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode4 extends UnreachableNodeRejoinsClusterSpec with AccrualFailureDetectorStrategy + +abstract class UnreachableNodeRejoinsClusterSpec extends MultiNodeSpec(UnreachableNodeRejoinsClusterMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { @@ -45,14 +46,14 @@ class UnreachableNodeRejoinsClusterSpec override def initialParticipants = allRoles.size - lazy val sortedRoles = clusterSortedRoles(allRoles) + lazy val sortedRoles = allRoles.sorted lazy val master = sortedRoles(0) lazy val victim = sortedRoles(1) var endBarrierNumber = 0 def endBarrier: Unit = { endBarrierNumber += 1 - testConductor.enter("after_" + endBarrierNumber) + enterBarrier("after_" + endBarrierNumber) } "A cluster of " + allRoles.size + " members" must { @@ -70,10 +71,11 @@ class UnreachableNodeRejoinsClusterSpec } } - testConductor.enter("unplug_victim") + enterBarrier("unplug_victim") runOn(victim) { val otherAddresses = sortedRoles.collect { case x if x != victim => node(x).address } + otherAddresses.foreach(markNodeAsUnavailable(_)) within(30 seconds) { // victim becomes all alone awaitCond({ val gossip = cluster.latestGossip @@ -89,13 +91,14 @@ class UnreachableNodeRejoinsClusterSpec runOn(allButVictim:_*) { val victimAddress = node(victim).address val otherAddresses = allButVictim.map(node(_).address) + markNodeAsUnavailable(victimAddress) within(30 seconds) { // victim becomes unreachable awaitCond({ val gossip = cluster.latestGossip gossip.overview.unreachable.size == 1 && gossip.members.size == (allRoles.size - 1) && gossip.members.forall(_.status == MemberStatus.Up) }) - awaitSeenSameState(otherAddresses) + awaitSeenSameState(otherAddresses:_*) // still one unreachable cluster.latestGossip.overview.unreachable.size must 
be(1) cluster.latestGossip.overview.unreachable.head.address must be(victimAddress) @@ -128,7 +131,7 @@ class UnreachableNodeRejoinsClusterSpec } } - testConductor.enter("plug_in_victim") + enterBarrier("plug_in_victim") runOn(victim) { cluster.join(node(master).address) From 574ff26bb46baaf8ca46e38900e5ab9abd7e584d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Wed, 27 Jun 2012 15:56:45 +0200 Subject: [PATCH 11/13] Support for re-JOINING a node that have been DOWN. See #1908 --- .../src/main/scala/akka/cluster/Cluster.scala | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 40d67d7161..709f82a5e8 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -256,7 +256,7 @@ object Gossip { * When convergence is reached the leader change status of `members` from `Joining` * to `Up`. * - * When failure detector consider a node as unavailble it will be moved from + * When failure detector consider a node as unavailable it will be moved from * `members` to `overview.unreachable`. * * When a node is downed, either manually or automatically, its status is changed to `Down`. @@ -555,12 +555,14 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) } } - private val state = { + private def createCleanState: State = { // note that self is not initially member, // and the Gossip is not versioned for this 'Node' yet - new AtomicReference[State](State(Gossip(members = Gossip.emptyMembers))) + State(Gossip(members = Gossip.emptyMembers)) } + private val state = new AtomicReference[State](createCleanState) + // try to join one of the nodes defined in the 'akka.cluster.seed-nodes' if (AutoJoin) joinSeedNode() @@ -735,8 +737,10 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) @tailrec final def join(address: Address): Unit = { val localState = state.get - val newState = localState copy (joinInProgress = localState.joinInProgress + - (address -> (Deadline.now + JoinTimeout))) + // wipe our state + val newState = createCleanState copy (joinInProgress = Map.empty + (address -> (Deadline.now + JoinTimeout))) + // wipe the failure detector + failureDetector.reset() if (!state.compareAndSet(localState, newState)) join(address) // recur else { val connection = clusterCommandConnectionFor(address) @@ -818,6 +822,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val newUnreachableMembers = localUnreachable filterNot { _.address == node } val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers) + // remove the node from the failure detector if it is a DOWN node that is rejoining cluster + if (localUnreachable.size > newUnreachableMembers.size) failureDetector.remove(node) + // add joining node as Joining // add self in case someone else joins before self has joined (Set discards duplicates) val newMembers = localMembers :+ Member(node, Joining) :+ Member(selfAddress, Joining) @@ -998,6 +1005,13 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) latestGossip = winningGossip seen selfAddress, joinInProgress = newJoinInProgress) + // for all new joining nodes we optimistically remove them from the failure detector, since if we wait until + // we have won the CAS, then the node might be picked up by the 
reapUnreachableMembers task and moved to + // unreachable before we can remove the node from the failure detector + (newState.latestGossip.members -- localState.latestGossip.members).filter(_.status == Joining).foreach { + case node ⇒ failureDetector.remove(node.address) + } + // if we won the race then update else try again if (!state.compareAndSet(localState, newState)) receiveGossip(from, remoteGossip) // recur if we fail the update else { From 6ad96c257909576c5d95bd6f7ace1192a7912d89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Thu, 28 Jun 2012 14:52:12 +0200 Subject: [PATCH 12/13] Review changes --- .../src/main/scala/akka/cluster/Cluster.scala | 8 ++-- .../UnreachableNodeRejoinsClusterSpec.scala | 48 ++++++++----------- 2 files changed, 24 insertions(+), 32 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 709f82a5e8..a15a361aff 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -737,9 +737,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) @tailrec final def join(address: Address): Unit = { val localState = state.get - // wipe our state + // wipe our state since a node that joins a cluster must be empty val newState = createCleanState copy (joinInProgress = Map.empty + (address -> (Deadline.now + JoinTimeout))) - // wipe the failure detector + // wipe the failure detector since we are starting fresh and shouldn't care about the past failureDetector.reset() if (!state.compareAndSet(localState, newState)) join(address) // recur else { @@ -819,11 +819,11 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) if (!alreadyMember && !isUnreachable) { // remove the node from the 'unreachable' set in case it is a DOWN node that is rejoining cluster - val newUnreachableMembers = localUnreachable filterNot { _.address == node } + val (rejoiningMember, newUnreachableMembers) = localUnreachable partition { _.address == node } val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers) // remove the node from the failure detector if it is a DOWN node that is rejoining cluster - if (localUnreachable.size > newUnreachableMembers.size) failureDetector.remove(node) + if (rejoiningMember.nonEmpty) failureDetector.remove(node) // add joining node as Joining // add self in case someone else joins before self has joined (Set discards duplicates) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala index e943ae6c6c..34f8605af1 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala @@ -18,12 +18,6 @@ object UnreachableNodeRejoinsClusterMultiJvmSpec extends MultiNodeConfig { val third = role("third") val fourth = role("fourth") - val allRoles = Seq(first, second, third, fourth) - - def allBut(role: RoleName, roles: Seq[RoleName] = allRoles): Seq[RoleName] = { - roles.filterNot(_ == role) - } - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } @@ -40,13 +34,15 @@ class UnreachableNodeRejoinsClusterWithAccrualFailureDetectorMultiJvmNode4 exten abstract class UnreachableNodeRejoinsClusterSpec extends 
MultiNodeSpec(UnreachableNodeRejoinsClusterMultiJvmSpec) - with MultiNodeClusterSpec - with ImplicitSender with BeforeAndAfter { + with MultiNodeClusterSpec { import UnreachableNodeRejoinsClusterMultiJvmSpec._ - override def initialParticipants = allRoles.size + def allBut(role: RoleName, roles: Seq[RoleName] = roles): Seq[RoleName] = { + roles.filterNot(_ == role) + } - lazy val sortedRoles = allRoles.sorted + + lazy val sortedRoles = roles.sorted lazy val master = sortedRoles(0) lazy val victim = sortedRoles(1) @@ -56,10 +52,10 @@ abstract class UnreachableNodeRejoinsClusterSpec enterBarrier("after_" + endBarrierNumber) } - "A cluster of " + allRoles.size + " members" must { + "A cluster of " + roles.size + " members" must { "reach initial convergence" taggedAs LongRunningTest in { - awaitClusterUp(allRoles:_*) + awaitClusterUp(roles:_*) endBarrier } @@ -73,35 +69,32 @@ abstract class UnreachableNodeRejoinsClusterSpec enterBarrier("unplug_victim") + val allButVictim = allBut(victim, sortedRoles) runOn(victim) { - val otherAddresses = sortedRoles.collect { case x if x != victim => node(x).address } - otherAddresses.foreach(markNodeAsUnavailable(_)) + allButVictim.foreach(markNodeAsUnavailable(_)) within(30 seconds) { // victim becomes all alone awaitCond({ val gossip = cluster.latestGossip - gossip.overview.unreachable.size == (allRoles.size - 1) && + gossip.overview.unreachable.size == (roles.size - 1) && gossip.members.size == 1 && gossip.members.forall(_.status == MemberStatus.Up) }) - cluster.latestGossip.overview.unreachable.map(_.address) must be(otherAddresses.toSet) + cluster.latestGossip.overview.unreachable.map(_.address) must be((allButVictim map address).toSet) cluster.convergence.isDefined must be(false) } } - val allButVictim = allBut(victim) runOn(allButVictim:_*) { - val victimAddress = node(victim).address - val otherAddresses = allButVictim.map(node(_).address) - markNodeAsUnavailable(victimAddress) + markNodeAsUnavailable(victim) within(30 seconds) { // victim becomes unreachable awaitCond({ val gossip = cluster.latestGossip gossip.overview.unreachable.size == 1 && - gossip.members.size == (allRoles.size - 1) && + gossip.members.size == (roles.size - 1) && gossip.members.forall(_.status == MemberStatus.Up) }) - awaitSeenSameState(otherAddresses:_*) + awaitSeenSameState(allButVictim map address:_*) // still one unreachable cluster.latestGossip.overview.unreachable.size must be(1) - cluster.latestGossip.overview.unreachable.head.address must be(victimAddress) + cluster.latestGossip.overview.unreachable.head.address must be(node(victim).address) // and therefore no convergence cluster.convergence.isDefined must be(false) } @@ -111,13 +104,12 @@ abstract class UnreachableNodeRejoinsClusterSpec } "mark the node as DOWN" taggedAs LongRunningTest in { - val victimAddress = node(victim).address runOn(master) { - cluster.down(victimAddress) + cluster down victim } runOn(allBut(victim):_*) { - awaitUpConvergence(allRoles.size - 1, Seq(victimAddress)) + awaitUpConvergence(roles.size - 1, Seq(victim)) } endBarrier @@ -134,10 +126,10 @@ abstract class UnreachableNodeRejoinsClusterSpec enterBarrier("plug_in_victim") runOn(victim) { - cluster.join(node(master).address) + cluster join master } - awaitUpConvergence(allRoles.size) + awaitUpConvergence(roles.size) endBarrier } From 675dfd918285bfdbd95a9708c5420a1eb6ffdf9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Fri, 29 Jun 2012 12:32:41 +0200 Subject: [PATCH 13/13] Keep the cluster node membership change 
listeners when joining. --- akka-cluster/src/main/scala/akka/cluster/Cluster.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index a15a361aff..44c646ebe8 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -738,7 +738,8 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) final def join(address: Address): Unit = { val localState = state.get // wipe our state since a node that joins a cluster must be empty - val newState = createCleanState copy (joinInProgress = Map.empty + (address -> (Deadline.now + JoinTimeout))) + val newState = createCleanState copy (joinInProgress = Map.empty + (address -> (Deadline.now + JoinTimeout)), + memberMembershipChangeListeners = localState.memberMembershipChangeListeners) // wipe the failure detector since we are starting fresh and shouldn't care about the past failureDetector.reset() if (!state.compareAndSet(localState, newState)) join(address) // recur
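
For context, a minimal self-contained sketch of the join pattern that the last three patches converge on: on join, wipe the node's state and failure detector history, but carry the already registered membership change listeners over into the clean state, retrying when the CAS is lost. The types below (Address, Listener, State) are simplified placeholders for illustration only, not the actual akka.cluster.Cluster internals.

    import java.util.concurrent.atomic.AtomicReference
    import scala.annotation.tailrec

    // Sketch only: simplified stand-ins for the cluster node state.
    object JoinStateSketch {
      type Address = String
      type Listener = Set[Address] => Unit

      final case class State(
        members: Set[Address] = Set.empty,
        joinInProgress: Map[Address, Long] = Map.empty,
        membershipChangeListeners: Set[Listener] = Set.empty)

      // a node that (re)joins a cluster must start from an empty state
      private def createCleanState: State = State()

      private val state = new AtomicReference[State](createCleanState)

      @tailrec
      def registerListener(listener: Listener): Unit = {
        val local = state.get
        val updated = local.copy(membershipChangeListeners = local.membershipChangeListeners + listener)
        if (!state.compareAndSet(local, updated)) registerListener(listener) // retry if we lost the race
      }

      @tailrec
      def join(address: Address, joinTimeoutMillis: Long = 60000L): Unit = {
        val localState = state.get
        // start over from a clean state, but keep the listeners that were already registered
        val newState = createCleanState.copy(
          joinInProgress = Map(address -> (System.currentTimeMillis + joinTimeoutMillis)),
          membershipChangeListeners = localState.membershipChangeListeners)
        if (!state.compareAndSet(localState, newState)) join(address, joinTimeoutMillis) // recur if we lost the race
      }
    }

With this shape, a node that was marked DOWN and removed can call join again against the master's address and come back with empty gossip, while its previously registered listeners keep being notified as the new membership converges.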