From 25996bf28458f64ebf485fdb1b3e3dc0c250546f Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Mon, 25 Jun 2012 21:07:44 +0200 Subject: [PATCH 1/9] Join seed nodes before becoming singleton cluster, see #2267 * self is initially not member (in gossip state) * if the join to seed nodes timeout it joins itself, and becomes singleton cluster * remove the special case handling of singelton cluster in gossip merge, since singleton cluster is not the normal state when joining any more --- .../src/main/resources/reference.conf | 3 +- .../src/main/scala/akka/cluster/Cluster.scala | 40 ++++++++++--------- .../scala/akka/cluster/JoinSeedNodeSpec.scala | 4 +- .../akka/cluster/MultiNodeClusterSpec.scala | 12 ++++-- .../akka/cluster/SingletonClusterSpec.scala | 15 +++++-- .../scala/akka/cluster/SunnyWeatherSpec.scala | 1 + .../test/scala/akka/cluster/ClusterSpec.scala | 8 ++-- 7 files changed, 53 insertions(+), 30 deletions(-) diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 4da4dd6620..d226506acc 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -18,7 +18,8 @@ akka { # how long to wait for one of the seed nodes to reply to initial join request seed-node-timeout = 5s - # automatic join the seed-nodes at startup + # Automatic join the seed-nodes at startup. + # If seed-nodes is empty it will join itself and become a single node cluster. auto-join = on # should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN? diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index caecf3906b..3eddb5bf60 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -364,20 +364,23 @@ private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Acto val log = Logging(context.system, this) def receive = { - case JoinSeedNode ⇒ joinSeedNode() - case InitJoin ⇒ sender ! InitJoinAck(cluster.selfAddress) - case InitJoinAck(address) ⇒ cluster.join(address) - case Join(address) ⇒ cluster.joining(address) - case Down(address) ⇒ cluster.downing(address) - case Leave(address) ⇒ cluster.leaving(address) - case Exit(address) ⇒ cluster.exiting(address) - case Remove(address) ⇒ cluster.removing(address) + case JoinSeedNode ⇒ joinSeedNode() + case InitJoin ⇒ sender ! 
InitJoinAck(cluster.selfAddress) + case InitJoinAck(address) ⇒ cluster.join(address) + case Join(address) ⇒ cluster.joining(address) + case Down(address) ⇒ cluster.downing(address) + case Leave(address) ⇒ cluster.leaving(address) + case Exit(address) ⇒ cluster.exiting(address) + case Remove(address) ⇒ cluster.removing(address) + case Failure(e: AskTimeoutException) ⇒ joinSeedNodeTimeout() } def joinSeedNode(): Unit = { val seedRoutees = for (address ← cluster.seedNodes; if address != cluster.selfAddress) yield self.path.toStringWithAddress(address) - if (seedRoutees.nonEmpty) { + if (seedRoutees.isEmpty) { + cluster join cluster.selfAddress + } else { implicit val within = Timeout(cluster.clusterSettings.SeedNodeTimeout) val seedRouter = context.actorOf( Props.empty.withRouter(ScatterGatherFirstCompletedRouter( @@ -387,6 +390,8 @@ private[cluster] final class ClusterCommandDaemon(cluster: Cluster) extends Acto } } + def joinSeedNodeTimeout(): Unit = cluster join cluster.selfAddress + override def unhandled(unknown: Any) = log.error("Illegal command [{}]", unknown) } @@ -534,10 +539,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) } private val state = { - val member = Member(selfAddress, Joining) - val versionedGossip = Gossip(members = Gossip.emptyMembers + member) :+ vclockNode // add me as member and update my vector clock - val seenVersionedGossip = versionedGossip seen selfAddress - new AtomicReference[State](State(seenVersionedGossip)) + // note that self is not initially member, + // and the Gossip is not versioned for this 'Node' yet + new AtomicReference[State](State(Gossip(members = Gossip.emptyMembers))) } // try to join one of the nodes defined in the 'akka.cluster.seed-nodes' @@ -797,7 +801,9 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) val newUnreachableMembers = localUnreachable filterNot { _.address == node } val newOverview = localGossip.overview copy (unreachable = newUnreachableMembers) - val newMembers = localMembers + Member(node, Joining) // add joining node as Joining + // add joining node as Joining + // add self in case someone else joins before self has joined (Set discards duplicates) + val newMembers = localMembers + Member(node, Joining) + Member(selfAddress, Joining) val newGossip = localGossip copy (overview = newOverview, members = newMembers) val versionedGossip = newGossip :+ vclockNode @@ -939,11 +945,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) if (!localGossip.overview.isNonDownUnreachable(from)) { val winningGossip = - if (isSingletonCluster(localState) && localGossip.overview.unreachable.isEmpty && remoteGossip.members.contains(self)) { - // a fresh singleton cluster that is joining, no need to merge, use received gossip - remoteGossip - - } else if (remoteGossip.version <> localGossip.version) { + if (remoteGossip.version <> localGossip.version) { // concurrent val mergedGossip = remoteGossip merge localGossip val versionedMergedGossip = mergedGossip :+ vclockNode diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala index 38f03a4e66..726a7d8c76 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala @@ -16,7 +16,9 @@ object JoinSeedNodeMultiJvmSpec extends MultiNodeConfig { val ordinary1 = role("ordinary1") val ordinary2 = 
role("ordinary2") - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) + commonConfig(debugConfig(on = false). + withFallback(ConfigFactory.parseString("akka.cluster.auto-join = on")). + withFallback(MultiNodeClusterSpec.clusterConfig)) } class JoinSeedNodeMultiJvmNode1 extends JoinSeedNodeSpec with FailureDetectorPuppetStrategy diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 79e3a67e1e..ed95013bf4 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -20,6 +20,7 @@ import akka.actor.RootActorPath object MultiNodeClusterSpec { def clusterConfig: Config = ConfigFactory.parseString(""" akka.cluster { + auto-join = off auto-down = off gossip-interval = 200 ms heartbeat-interval = 400 ms @@ -99,10 +100,15 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu def cluster: Cluster = clusterNode /** - * Use this method instead of 'cluster.self' - * for the initial startup of the cluster node. + * Use this method for the initial startup of the cluster node. */ - def startClusterNode(): Unit = cluster.self + def startClusterNode(): Unit = { + if (cluster.latestGossip.members.isEmpty) { + cluster join myself + awaitCond(cluster.latestGossip.members.exists(_.address == address(myself))) + } else + cluster.self + } /** * Initialize the cluster with the specified member diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala index 9f79af2f13..3c35e95333 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala @@ -16,6 +16,7 @@ object SingletonClusterMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false). 
withFallback(ConfigFactory.parseString(""" akka.cluster { + auto-join = on auto-down = on failure-detector.threshold = 4 } @@ -38,12 +39,20 @@ abstract class SingletonClusterSpec "A cluster of 2 nodes" must { - "not be singleton cluster when joined" taggedAs LongRunningTest in { + "become singleton cluster when started with 'auto-join=on' and 'seed-nodes=[]'" taggedAs LongRunningTest in { + startClusterNode() + awaitUpConvergence(1) + cluster.isSingletonCluster must be(true) + + enterBarrier("after-1") + } + + "not be singleton cluster when joined with other node" taggedAs LongRunningTest in { awaitClusterUp(first, second) cluster.isSingletonCluster must be(false) assertLeader(first, second) - enterBarrier("after-1") + enterBarrier("after-2") } "become singleton cluster when one node is shutdown" taggedAs LongRunningTest in { @@ -58,7 +67,7 @@ abstract class SingletonClusterSpec assertLeader(first) } - enterBarrier("after-2") + enterBarrier("after-3") } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index d661f0cc51..3be082d2f3 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -25,6 +25,7 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig { akka.cluster { # FIXME remove this (use default) when ticket #2239 has been fixed gossip-interval = 400 ms + auto-join = off } akka.loglevel = INFO """)) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 6f70193715..e818847969 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -86,11 +86,13 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { "use the address of the remote transport" in { cluster.selfAddress must be(selfAddress) - cluster.self.address must be(selfAddress) } - "initially be singleton cluster and reach convergence immediately" in { - cluster.isSingletonCluster must be(true) + "initially become singleton cluster when joining itself and reach convergence" in { + cluster.isSingletonCluster must be(false) // auto-join = off + cluster.join(selfAddress) + awaitCond(cluster.isSingletonCluster) + cluster.self.address must be(selfAddress) cluster.latestGossip.members.map(_.address) must be(Set(selfAddress)) memberStatus(selfAddress) must be(Some(MemberStatus.Joining)) cluster.convergence.isDefined must be(true) From a3ceb407542e43e2312561af5e916f084115a255 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 26 Jun 2012 09:51:04 +0200 Subject: [PATCH 2/9] Start seed nodes sequentially in JoinSeedNodeSpec, see #2271 --- .../scala/akka/cluster/JoinSeedNodeSpec.scala | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala index 38f03a4e66..bd161a435c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinSeedNodeSpec.scala @@ -33,6 +33,23 @@ abstract class JoinSeedNodeSpec override def seedNodes = IndexedSeq(seed1, seed2) "A cluster with configured seed nodes" must { + "start the seed nodes sequentially" taggedAs LongRunningTest in { + runOn(seed1) { + startClusterNode() + 
} + enterBarrier("seed1-started") + + runOn(seed2) { + startClusterNode() + } + enterBarrier("seed2-started") + + runOn(seed1, seed2) { + awaitUpConvergence(2) + } + enterBarrier("after-1") + } + "join the seed nodes at startup" taggedAs LongRunningTest in { startClusterNode() @@ -40,7 +57,7 @@ abstract class JoinSeedNodeSpec awaitUpConvergence(4) - enterBarrier("after") + enterBarrier("after-2") } } } From aa2de6aacd00be16d1f0fb86338a645b026a21f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Tue, 26 Jun 2012 10:34:09 +0200 Subject: [PATCH 3/9] Fixed RoutingSpec test. See #2268 --- .../src/test/scala/akka/routing/RoutingSpec.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index a202778fe5..2b946ec1da 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -26,6 +26,10 @@ object RoutingSpec { router = round-robin nr-of-instances = 3 } + /router2 { + router = round-robin + nr-of-instances = 3 + } /myrouter { router = "akka.routing.RoutingSpec$MyRouter" foo = bar @@ -129,7 +133,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with } "use configured nr-of-instances when router is specified" in { - val router = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(nrOfInstances = 2)), "router1") + val router = system.actorOf(Props[TestActor].withRouter(RoundRobinRouter(nrOfInstances = 2)), "router2") Await.result(router ? CurrentRoutees, 5 seconds).asInstanceOf[RouterRoutees].routees.size must be(3) system.stop(router) } From 344824c22edc861609871d49f64b4d4c3517e6c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Tue, 26 Jun 2012 11:58:04 +0200 Subject: [PATCH 4/9] Adding test case duration output to the tests --- project/AkkaBuild.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 584625cc82..24e803aecc 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -388,8 +388,8 @@ object AkkaBuild extends Build { if (tags.isEmpty) Seq.empty else Seq(Tests.Argument("-n", tags.mkString(" "))) }, - // show full stack traces - testOptions in Test += Tests.Argument("-oF") + // show full stack traces and test case durations + testOptions in Test += Tests.Argument("-oDF") ) lazy val formatSettings = ScalariformPlugin.scalariformSettings ++ Seq( From aed78f702b64549d996b098b5091c7df7507130f Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 26 Jun 2012 18:19:29 +0200 Subject: [PATCH 5/9] Workaround for SI-5986, see #2275 * Add new operators :+ and :++ by implicit conversion * Unfortunately this means that we must remember to use these until SI-5986 is fixed. Is there a better way? 
--- .../src/main/scala/akka/cluster/Cluster.scala | 22 +++++++++++++++--- .../scala/akka/cluster/TransitionSpec.scala | 23 +++++++++++++------ 2 files changed, 35 insertions(+), 10 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 3eddb5bf60..357d610ed5 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -27,6 +27,7 @@ import javax.management._ import MemberStatus._ import scala.annotation.tailrec import scala.collection.immutable.{ Map, SortedSet } +import scala.collection.GenTraversableOnce /** * Interface for membership change listener. @@ -179,6 +180,20 @@ object Member { case (Joining, Joining) ⇒ m1 case (Up, Up) ⇒ m1 } + + // FIXME Workaround for https://issues.scala-lang.org/browse/SI-5986 + // SortedSet + and ++ operators replaces existing element + // Use these :+ and :++ operators for the Gossip members + implicit def sortedSetWorkaround(sortedSet: SortedSet[Member]): SortedSetWorkaround = new SortedSetWorkaround(sortedSet) + class SortedSetWorkaround(sortedSet: SortedSet[Member]) { + implicit def :+(elem: Member): SortedSet[Member] = { + if (sortedSet.contains(elem)) sortedSet + else sortedSet + elem + } + + implicit def :++(elems: GenTraversableOnce[Member]): SortedSet[Member] = + sortedSet ++ (elems.toSet diff sortedSet) + } } /** @@ -226,6 +241,7 @@ case class GossipOverview( object Gossip { val emptyMembers: SortedSet[Member] = SortedSet.empty + } /** @@ -300,7 +316,7 @@ case class Gossip( */ def :+(member: Member): Gossip = { if (members contains member) this - else this copy (members = members + member) + else this copy (members = members :+ member) } /** @@ -329,7 +345,7 @@ case class Gossip( // 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups, // and exclude unreachable - val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains) + val mergedMembers = Gossip.emptyMembers :++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains) // 5. 
fresh seen table val mergedSeen = Map.empty[Address, VectorClock] @@ -803,7 +819,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) // add joining node as Joining // add self in case someone else joins before self has joined (Set discards duplicates) - val newMembers = localMembers + Member(node, Joining) + Member(selfAddress, Joining) + val newMembers = localMembers :+ Member(node, Joining) :+ Member(selfAddress, Joining) val newGossip = localGossip copy (overview = newOverview, members = newMembers) val versionedGossip = newGossip :+ vclockNode diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala index 0376545b41..397d824ef4 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala @@ -99,12 +99,14 @@ abstract class TransitionSpec "start nodes as singleton clusters" taggedAs LongRunningTest in { - startClusterNode() - cluster.isSingletonCluster must be(true) - cluster.status must be(Joining) - cluster.convergence.isDefined must be(true) - cluster.leaderActions() - cluster.status must be(Up) + runOn(first) { + startClusterNode() + cluster.isSingletonCluster must be(true) + cluster.status must be(Joining) + cluster.convergence.isDefined must be(true) + cluster.leaderActions() + cluster.status must be(Up) + } enterBarrier("after-1") } @@ -244,13 +246,20 @@ abstract class TransitionSpec } "startup a second separated cluster consisting of nodes fourth and fifth" taggedAs LongRunningTest in { + runOn(fifth) { + startClusterNode() + cluster.leaderActions() + cluster.status must be(Up) + } + enterBarrier("fifth-started") + runOn(fourth) { cluster.join(fifth) } runOn(fifth) { awaitMembers(fourth, fifth) } - testConductor.enter("fourth-joined") + enterBarrier("fourth-joined") fifth gossipTo fourth fourth gossipTo fifth From 932ea6f98ac7b1393ab6186832125ee1ca5efb73 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 26 Jun 2012 09:33:46 +0200 Subject: [PATCH 6/9] Test split brain scenario, see #2265 --- .../src/main/resources/reference.conf | 3 +- .../akka/cluster/MultiNodeClusterSpec.scala | 2 +- .../scala/akka/cluster/SplitBrainSpec.scala | 111 ++++++++++++++++++ 3 files changed, 114 insertions(+), 2 deletions(-) create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index d226506acc..a1497ed4b6 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -22,7 +22,8 @@ akka { # If seed-nodes is empty it will join itself and become a single node cluster. auto-join = on - # should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN? + # Should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN? + # Using auto-down implies that two separate clusters will be formed in case of network partition. 
auto-down = on # the number of gossip daemon actors diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index ed95013bf4..3264c661b0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -198,7 +198,7 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu } } - def roleOfLeader(nodesInCluster: Seq[RoleName]): RoleName = { + def roleOfLeader(nodesInCluster: Seq[RoleName] = roles): RoleName = { nodesInCluster.length must not be (0) nodesInCluster.sorted.head } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala new file mode 100644 index 0000000000..24e94f715d --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala @@ -0,0 +1,111 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ +import akka.actor.Address +import akka.remote.testconductor.Direction + +object SplitBrainMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + val fourth = role("fourth") + val fifth = role("fifth") + + commonConfig(debugConfig(on = false). + withFallback(ConfigFactory.parseString(""" + akka.cluster { + auto-down = on + failure-detector.threshold = 4 + }""")). + withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class SplitBrainWithFailureDetectorPuppetMultiJvmNode1 extends SplitBrainSpec with FailureDetectorPuppetStrategy +class SplitBrainWithFailureDetectorPuppetMultiJvmNode2 extends SplitBrainSpec with FailureDetectorPuppetStrategy +class SplitBrainWithFailureDetectorPuppetMultiJvmNode3 extends SplitBrainSpec with FailureDetectorPuppetStrategy +class SplitBrainWithFailureDetectorPuppetMultiJvmNode4 extends SplitBrainSpec with FailureDetectorPuppetStrategy +class SplitBrainWithFailureDetectorPuppetMultiJvmNode5 extends SplitBrainSpec with FailureDetectorPuppetStrategy + +class SplitBrainWithAccrualFailureDetectorMultiJvmNode1 extends SplitBrainSpec with AccrualFailureDetectorStrategy +class SplitBrainWithAccrualFailureDetectorMultiJvmNode2 extends SplitBrainSpec with AccrualFailureDetectorStrategy +class SplitBrainWithAccrualFailureDetectorMultiJvmNode3 extends SplitBrainSpec with AccrualFailureDetectorStrategy +class SplitBrainWithAccrualFailureDetectorMultiJvmNode4 extends SplitBrainSpec with AccrualFailureDetectorStrategy +class SplitBrainWithAccrualFailureDetectorMultiJvmNode5 extends SplitBrainSpec with AccrualFailureDetectorStrategy + +abstract class SplitBrainSpec + extends MultiNodeSpec(SplitBrainMultiJvmSpec) + with MultiNodeClusterSpec { + + import SplitBrainMultiJvmSpec._ + + val side1 = IndexedSeq(first, second) + val side2 = IndexedSeq(third, fourth, fifth) + + "A cluster of 5 members" must { + + "reach initial convergence" taggedAs LongRunningTest in { + awaitClusterUp(first, second, third, fourth, fifth) + + enterBarrier("after-1") + } + + "detect network partition and mark nodes on other side as unreachable" taggedAs LongRunningTest in { + val thirdAddress = address(third) + enterBarrier("before-split") + + runOn(first) { + // split the 
cluster in two parts (first, second) / (third, fourth, fifth) + for (role1 ← side1; role2 ← side2) { + testConductor.blackhole(role1, role2, Direction.Both).await + } + } + enterBarrier("after-split") + + runOn(side1.last) { + for (role ← side2) markNodeAsUnavailable(role) + } + runOn(side2.last) { + for (role ← side1) markNodeAsUnavailable(role) + } + + runOn(side1: _*) { + awaitCond(cluster.latestGossip.overview.unreachable.map(_.address) == (side2.toSet map address), 20 seconds) + } + runOn(side2: _*) { + awaitCond(cluster.latestGossip.overview.unreachable.map(_.address) == (side1.toSet map address), 20 seconds) + } + + enterBarrier("after-2") + } + + "auto-down the other nodes and form new cluster with potentially new leader" taggedAs LongRunningTest in { + + runOn(side1: _*) { + // auto-down = on + awaitCond(cluster.latestGossip.overview.unreachable.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds) + cluster.latestGossip.overview.unreachable.map(_.address) must be(side2.toSet map address) + awaitUpConvergence(side1.size, side2 map address) + assertLeader(side1: _*) + } + + runOn(side2: _*) { + // auto-down = on + awaitCond(cluster.latestGossip.overview.unreachable.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds) + cluster.latestGossip.overview.unreachable.map(_.address) must be(side1.toSet map address) + awaitUpConvergence(side2.size, side1 map address) + assertLeader(side2: _*) + } + + enterBarrier("after-3") + } + + } + +} From 133e1561c1c623a72466cb7b84786d8fe6f5dbcb Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 27 Jun 2012 10:58:48 +0200 Subject: [PATCH 7/9] Adjusted comment based on feedback, see #2265 --- akka-cluster/src/main/resources/reference.conf | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index a1497ed4b6..a06e9273cb 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -23,7 +23,8 @@ akka { auto-join = on # Should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN? - # Using auto-down implies that two separate clusters will be formed in case of network partition. + # Using auto-down implies that two separate clusters will automatically be formed in case of + # network partition. 
auto-down = on # the number of gossip daemon actors From 3011c6ebcf6ea227643b98a7e59c998d5eed7dd9 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 27 Jun 2012 11:19:20 +0200 Subject: [PATCH 8/9] Fix wrong order of constructor params in AccrualFailureDetector --- .../scala/akka/cluster/AccrualFailureDetector.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala index c397d065e5..7d719f6141 100644 --- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala @@ -80,12 +80,12 @@ class AccrualFailureDetector( settings: ClusterSettings) = this( system, - settings.FailureDetectorThreshold, - settings.FailureDetectorMaxSampleSize, - settings.FailureDetectorAcceptableHeartbeatPause, - settings.FailureDetectorMinStdDeviation, - settings.HeartbeatInterval, - AccrualFailureDetector.realClock) + threshold = settings.FailureDetectorThreshold, + maxSampleSize = settings.FailureDetectorMaxSampleSize, + minStdDeviation = settings.FailureDetectorMinStdDeviation, + acceptableHeartbeatPause = settings.FailureDetectorAcceptableHeartbeatPause, + firstHeartbeatEstimate = settings.HeartbeatInterval, + clock = AccrualFailureDetector.realClock) private val log = Logging(system, "FailureDetector") From 4e603623562d84ba7a884802b412c68e5c5959a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Antonsson?= Date: Wed, 27 Jun 2012 14:05:22 +0200 Subject: [PATCH 9/9] Make FailureDetector.remove clean out information about address. See #2277 --- .../akka/cluster/AccrualFailureDetector.scala | 25 ++++++++++++------- .../scala/akka/cluster/FailureDetector.scala | 5 ++++ .../cluster/AccrualFailureDetectorSpec.scala | 6 ++--- .../akka/cluster/FailureDetectorPuppet.scala | 5 ++++ 4 files changed, 29 insertions(+), 12 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala index 7d719f6141..f1c761dec7 100644 --- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala @@ -107,8 +107,7 @@ class AccrualFailureDetector( private case class State( version: Long = 0L, history: Map[Address, HeartbeatHistory] = Map.empty, - timestamps: Map[Address, Long] = Map.empty[Address, Long], - explicitRemovals: Set[Address] = Set.empty[Address]) + timestamps: Map[Address, Long] = Map.empty[Address, Long]) private val state = new AtomicReference[State](State()) @@ -141,8 +140,7 @@ class AccrualFailureDetector( val newState = oldState copy (version = oldState.version + 1, history = oldState.history + (connection -> newHistory), - timestamps = oldState.timestamps + (connection -> timestamp), // record new timestamp, - explicitRemovals = oldState.explicitRemovals - connection) + timestamps = oldState.timestamps + (connection -> timestamp)) // record new timestamp // if we won the race then update else try again if (!state.compareAndSet(oldState, newState)) heartbeat(connection) // recur @@ -158,9 +156,7 @@ class AccrualFailureDetector( val oldState = state.get val oldTimestamp = oldState.timestamps.get(connection) - // if connection has been removed explicitly - if (oldState.explicitRemovals.contains(connection)) Double.MaxValue - else if (oldTimestamp.isEmpty) 0.0 // treat 
unmanaged connections, e.g. with zero heartbeats, as healthy connections + if (oldTimestamp.isEmpty) 0.0 // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections else { val timeDiff = clock() - oldTimestamp.get @@ -208,13 +204,24 @@ class AccrualFailureDetector( if (oldState.history.contains(connection)) { val newState = oldState copy (version = oldState.version + 1, history = oldState.history - connection, - timestamps = oldState.timestamps - connection, - explicitRemovals = oldState.explicitRemovals + connection) + timestamps = oldState.timestamps - connection) // if we won the race then update else try again if (!state.compareAndSet(oldState, newState)) remove(connection) // recur } } + + def reset(): Unit = { + @tailrec + def doReset(): Unit = { + val oldState = state.get + val newState = oldState.copy(version = oldState.version + 1, history = Map.empty, timestamps = Map.empty) + // if we won the race then update else try again + if (!state.compareAndSet(oldState, newState)) doReset() // recur + } + log.debug("Resetting failure detector") + doReset() + } } private[cluster] object HeartbeatHistory { diff --git a/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala index 60af0a1c41..1aa926c5e5 100644 --- a/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala @@ -25,4 +25,9 @@ trait FailureDetector { * Removes the heartbeat management for a connection. */ def remove(connection: Address): Unit + + /** + * Removes all connections and starts over. + */ + def reset(): Unit } diff --git a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala index 5c7186502c..df69a52e19 100644 --- a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala @@ -114,7 +114,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" fd.isAvailable(conn) must be(true) } - "mark node as dead after explicit removal of connection" in { + "mark node as available after explicit removal of connection" in { val timeInterval = List[Long](0, 1000, 100, 100, 100) val fd = createFailureDetector(clock = fakeTimeGenerator(timeInterval)) @@ -124,7 +124,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" fd.isAvailable(conn) must be(true) fd.remove(conn) - fd.isAvailable(conn) must be(false) + fd.isAvailable(conn) must be(true) } "mark node as available after explicit removal of connection and receiving heartbeat again" in { @@ -140,7 +140,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" fd.remove(conn) - fd.isAvailable(conn) must be(false) //3300 + fd.isAvailable(conn) must be(true) //3300 // it receives heartbeat from an explicitly removed node fd.heartbeat(conn) //4400 diff --git a/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala b/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala index f35bca381d..9ddc9942b0 100644 --- a/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala +++ b/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala @@ -57,4 +57,9 @@ class FailureDetectorPuppet(system: ActorSystem, settings: ClusterSettings) exte log.debug("Removing cluster node [{}]", connection) connections.remove(connection) } + + def reset(): Unit = { + log.debug("Resetting failure 
detector") + connections.clear() + } }