From aed78f702b64549d996b098b5091c7df7507130f Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Tue, 26 Jun 2012 18:19:29 +0200
Subject: [PATCH 1/4] Workaround for SI-5986, see #2275

* Add new operators :+ and :++ by implicit conversion
* Unfortunately this means that we must remember to use these until
  SI-5986 is fixed. Is there a better way?
---
 .../src/main/scala/akka/cluster/Cluster.scala |  22 +++++++++++++++---
 .../scala/akka/cluster/TransitionSpec.scala   |  23 +++++++++++++------
 2 files changed, 35 insertions(+), 10 deletions(-)

diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index 3eddb5bf60..357d610ed5 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -27,6 +27,7 @@ import javax.management._
 import MemberStatus._
 import scala.annotation.tailrec
 import scala.collection.immutable.{ Map, SortedSet }
+import scala.collection.GenTraversableOnce
 
 /**
  * Interface for membership change listener.
@@ -179,6 +180,20 @@ object Member {
     case (Joining, Joining) ⇒ m1
     case (Up, Up)           ⇒ m1
   }
+
+  // FIXME Workaround for https://issues.scala-lang.org/browse/SI-5986
+  // SortedSet + and ++ operators replace an existing element that the ordering considers equal
+  // Use these :+ and :++ operators for the Gossip members
+  implicit def sortedSetWorkaround(sortedSet: SortedSet[Member]): SortedSetWorkaround = new SortedSetWorkaround(sortedSet)
+  class SortedSetWorkaround(sortedSet: SortedSet[Member]) {
+    implicit def :+(elem: Member): SortedSet[Member] = {
+      if (sortedSet.contains(elem)) sortedSet
+      else sortedSet + elem
+    }
+
+    implicit def :++(elems: GenTraversableOnce[Member]): SortedSet[Member] =
+      sortedSet ++ (elems.toSet diff sortedSet)
+  }
 }
 
 /**
@@ -226,6 +241,7 @@ case class GossipOverview(
 
 object Gossip {
   val emptyMembers: SortedSet[Member] = SortedSet.empty
+
 }
 
 /**
@@ -300,7 +316,7 @@
    */
   def :+(member: Member): Gossip = {
     if (members contains member) this
-    else this copy (members = members + member)
+    else this copy (members = members :+ member)
   }
 
   /**
@@ -329,7 +345,7 @@
 
     // 4. merge members by selecting the single Member with highest MemberStatus out of the Member groups,
     //    and exclude unreachable
-    val mergedMembers = Gossip.emptyMembers ++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains)
+    val mergedMembers = Gossip.emptyMembers :++ Member.pickHighestPriority(this.members, that.members).filterNot(mergedUnreachable.contains)
 
     // 5. fresh seen table
     val mergedSeen = Map.empty[Address, VectorClock]
@@ -803,7 +819,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
 
       // add joining node as Joining
       // add self in case someone else joins before self has joined (Set discards duplicates)
-      val newMembers = localMembers + Member(node, Joining) + Member(selfAddress, Joining)
+      val newMembers = localMembers :+ Member(node, Joining) :+ Member(selfAddress, Joining)
       val newGossip = localGossip copy (overview = newOverview, members = newMembers)
 
       val versionedGossip = newGossip :+ vclockNode
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala
index 0376545b41..397d824ef4 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/TransitionSpec.scala
@@ -99,12 +99,14 @@ abstract class TransitionSpec
 
     "start nodes as singleton clusters" taggedAs LongRunningTest in {
 
-      startClusterNode()
-      cluster.isSingletonCluster must be(true)
-      cluster.status must be(Joining)
-      cluster.convergence.isDefined must be(true)
-      cluster.leaderActions()
-      cluster.status must be(Up)
+      runOn(first) {
+        startClusterNode()
+        cluster.isSingletonCluster must be(true)
+        cluster.status must be(Joining)
+        cluster.convergence.isDefined must be(true)
+        cluster.leaderActions()
+        cluster.status must be(Up)
+      }
 
       enterBarrier("after-1")
     }
@@ -244,13 +246,20 @@
     }
 
     "startup a second separated cluster consisting of nodes fourth and fifth" taggedAs LongRunningTest in {
+      runOn(fifth) {
+        startClusterNode()
+        cluster.leaderActions()
+        cluster.status must be(Up)
+      }
+      enterBarrier("fifth-started")
+
       runOn(fourth) {
         cluster.join(fifth)
       }
       runOn(fifth) {
         awaitMembers(fourth, fifth)
       }
-      testConductor.enter("fourth-joined")
+      enterBarrier("fourth-joined")
 
       fifth gossipTo fourth
       fourth gossipTo fifth

From 932ea6f98ac7b1393ab6186832125ee1ca5efb73 Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Tue, 26 Jun 2012 09:33:46 +0200
Subject: [PATCH 2/4] Test split brain scenario, see #2265

---
 .../src/main/resources/reference.conf       |   3 +-
 .../akka/cluster/MultiNodeClusterSpec.scala |   2 +-
 .../scala/akka/cluster/SplitBrainSpec.scala | 111 ++++++++++++++++++
 3 files changed, 114 insertions(+), 2 deletions(-)
 create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala

diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf
index d226506acc..a1497ed4b6 100644
--- a/akka-cluster/src/main/resources/reference.conf
+++ b/akka-cluster/src/main/resources/reference.conf
@@ -22,7 +22,8 @@ akka {
     # If seed-nodes is empty it will join itself and become a single node cluster.
     auto-join = on
 
-    # should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN?
+    # Should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN?
+    # Using auto-down implies that two separate clusters will be formed in case of network partition.
     auto-down = on
 
     # the number of gossip daemon actors
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
index ed95013bf4..3264c661b0 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala
@@ -198,7 +198,7 @@ trait MultiNodeClusterSpec extends FailureDetectorStrategy with Suite { self: Mu
     }
   }
 
-  def roleOfLeader(nodesInCluster: Seq[RoleName]): RoleName = {
+  def roleOfLeader(nodesInCluster: Seq[RoleName] = roles): RoleName = {
     nodesInCluster.length must not be (0)
     nodesInCluster.sorted.head
   }
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala
new file mode 100644
index 0000000000..24e94f715d
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala
@@ -0,0 +1,111 @@
+/**
+ * Copyright (C) 2009-2012 Typesafe Inc.
+ */
+package akka.cluster
+
+import com.typesafe.config.ConfigFactory
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.testkit._
+import akka.util.duration._
+import akka.actor.Address
+import akka.remote.testconductor.Direction
+
+object SplitBrainMultiJvmSpec extends MultiNodeConfig {
+  val first = role("first")
+  val second = role("second")
+  val third = role("third")
+  val fourth = role("fourth")
+  val fifth = role("fifth")
+
+  commonConfig(debugConfig(on = false).
+    withFallback(ConfigFactory.parseString("""
+      akka.cluster {
+        auto-down = on
+        failure-detector.threshold = 4
+      }""")).
+    withFallback(MultiNodeClusterSpec.clusterConfig))
+}
+
+class SplitBrainWithFailureDetectorPuppetMultiJvmNode1 extends SplitBrainSpec with FailureDetectorPuppetStrategy
+class SplitBrainWithFailureDetectorPuppetMultiJvmNode2 extends SplitBrainSpec with FailureDetectorPuppetStrategy
+class SplitBrainWithFailureDetectorPuppetMultiJvmNode3 extends SplitBrainSpec with FailureDetectorPuppetStrategy
+class SplitBrainWithFailureDetectorPuppetMultiJvmNode4 extends SplitBrainSpec with FailureDetectorPuppetStrategy
+class SplitBrainWithFailureDetectorPuppetMultiJvmNode5 extends SplitBrainSpec with FailureDetectorPuppetStrategy
+
+class SplitBrainWithAccrualFailureDetectorMultiJvmNode1 extends SplitBrainSpec with AccrualFailureDetectorStrategy
+class SplitBrainWithAccrualFailureDetectorMultiJvmNode2 extends SplitBrainSpec with AccrualFailureDetectorStrategy
+class SplitBrainWithAccrualFailureDetectorMultiJvmNode3 extends SplitBrainSpec with AccrualFailureDetectorStrategy
+class SplitBrainWithAccrualFailureDetectorMultiJvmNode4 extends SplitBrainSpec with AccrualFailureDetectorStrategy
+class SplitBrainWithAccrualFailureDetectorMultiJvmNode5 extends SplitBrainSpec with AccrualFailureDetectorStrategy
+
+abstract class SplitBrainSpec
+  extends MultiNodeSpec(SplitBrainMultiJvmSpec)
+  with MultiNodeClusterSpec {
+
+  import SplitBrainMultiJvmSpec._
+
+  val side1 = IndexedSeq(first, second)
+  val side2 = IndexedSeq(third, fourth, fifth)
+
+  "A cluster of 5 members" must {
+
+    "reach initial convergence" taggedAs LongRunningTest in {
+      awaitClusterUp(first, second, third, fourth, fifth)
+
+      enterBarrier("after-1")
+    }
+
+    "detect network partition and mark nodes on other side as unreachable" taggedAs LongRunningTest in {
+      val thirdAddress = address(third)
+      enterBarrier("before-split")
+
+      runOn(first) {
+        // split the cluster in two parts (first, second) / (third, fourth, fifth)
+        for (role1 ← side1; role2 ← side2) {
+          testConductor.blackhole(role1, role2, Direction.Both).await
+        }
+      }
+      enterBarrier("after-split")
+
+      runOn(side1.last) {
+        for (role ← side2) markNodeAsUnavailable(role)
+      }
+      runOn(side2.last) {
+        for (role ← side1) markNodeAsUnavailable(role)
+      }
+
+      runOn(side1: _*) {
+        awaitCond(cluster.latestGossip.overview.unreachable.map(_.address) == (side2.toSet map address), 20 seconds)
+      }
+      runOn(side2: _*) {
+        awaitCond(cluster.latestGossip.overview.unreachable.map(_.address) == (side1.toSet map address), 20 seconds)
+      }
+
+      enterBarrier("after-2")
+    }
+
+    "auto-down the other nodes and form new cluster with potentially new leader" taggedAs LongRunningTest in {
+
+      runOn(side1: _*) {
+        // auto-down = on
+        awaitCond(cluster.latestGossip.overview.unreachable.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds)
+        cluster.latestGossip.overview.unreachable.map(_.address) must be(side2.toSet map address)
+        awaitUpConvergence(side1.size, side2 map address)
+        assertLeader(side1: _*)
+      }
+
+      runOn(side2: _*) {
+        // auto-down = on
+        awaitCond(cluster.latestGossip.overview.unreachable.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds)
+        cluster.latestGossip.overview.unreachable.map(_.address) must be(side1.toSet map address)
+        awaitUpConvergence(side2.size, side1 map address)
+        assertLeader(side2: _*)
+      }
+
+      enterBarrier("after-3")
+    }
+
+  }
+
+}

From 133e1561c1c623a72466cb7b84786d8fe6f5dbcb Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Wed, 27 Jun 2012 10:58:48 +0200
Subject: [PATCH 3/4] Adjusted comment based on feedback, see #2265

---
 akka-cluster/src/main/resources/reference.conf | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf
index a1497ed4b6..a06e9273cb 100644
--- a/akka-cluster/src/main/resources/reference.conf
+++ b/akka-cluster/src/main/resources/reference.conf
@@ -23,7 +23,8 @@ akka {
     auto-join = on
 
     # Should the 'leader' in the cluster be allowed to automatically mark unreachable nodes as DOWN?
-    # Using auto-down implies that two separate clusters will be formed in case of network partition.
+    # Using auto-down implies that two separate clusters will automatically be formed in case of
+    # network partition.
     auto-down = on
 
     # the number of gossip daemon actors

From 3011c6ebcf6ea227643b98a7e59c998d5eed7dd9 Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Wed, 27 Jun 2012 11:19:20 +0200
Subject: [PATCH 4/4] Fix wrong order of constructor params in AccrualFailureDetector

---
 .../scala/akka/cluster/AccrualFailureDetector.scala | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala
index c397d065e5..7d719f6141 100644
--- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala
@@ -80,12 +80,12 @@ class AccrualFailureDetector(
     settings: ClusterSettings) =
     this(
       system,
-      settings.FailureDetectorThreshold,
-      settings.FailureDetectorMaxSampleSize,
-      settings.FailureDetectorAcceptableHeartbeatPause,
-      settings.FailureDetectorMinStdDeviation,
-      settings.HeartbeatInterval,
-      AccrualFailureDetector.realClock)
+      threshold = settings.FailureDetectorThreshold,
+      maxSampleSize = settings.FailureDetectorMaxSampleSize,
+      minStdDeviation = settings.FailureDetectorMinStdDeviation,
+      acceptableHeartbeatPause = settings.FailureDetectorAcceptableHeartbeatPause,
+      firstHeartbeatEstimate = settings.HeartbeatInterval,
+      clock = AccrualFailureDetector.realClock)
 
   private val log = Logging(system, "FailureDetector")
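
The behaviour that [PATCH 1/4] works around can be sketched outside the Akka code base. In the sketch below, Node, its address-only Ordering and addIfAbsent are hypothetical stand-ins for Member, its ordering and the :+ operator added in Cluster.scala; per the FIXME comment in the patch, the plain + and ++ operators on SortedSet could replace an element that the ordering considers equal (SI-5986), silently overwriting a member's status. The workaround therefore only inserts when no such element is already present:

  import scala.collection.immutable.SortedSet

  object SortedSetWorkaroundSketch extends App {
    // Hypothetical stand-in for akka.cluster.Member: ordered by address only,
    // so two entries with the same address but different status compare as equal.
    final case class Node(address: String, status: String)
    implicit val byAddress: Ordering[Node] = Ordering.by(_.address)

    // Same idea as SortedSetWorkaround.:+ in the patch: insert only when no
    // element that the ordering considers equal is already in the set.
    def addIfAbsent(set: SortedSet[Node], elem: Node): SortedSet[Node] =
      if (set contains elem) set else set + elem

    val members = SortedSet(Node("a", "Joining"))
    println(addIfAbsent(members, Node("a", "Up")))      // keeps Node(a,Joining)
    println(addIfAbsent(members, Node("b", "Joining"))) // adds the new address
  }

The :++ operator in the patch follows the same pattern for adding several members at once.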
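
[PATCH 4/4] fixes the transposed constructor arguments by naming every argument of the this(...) call, so the compiler matches each setting to its parameter even though several parameters share the same type. A minimal sketch of that idea, using a hypothetical DetectorConfig rather than the real AccrualFailureDetector signature:

  // Hypothetical config class, not the real AccrualFailureDetector constructor.
  final case class DetectorConfig(
    threshold: Double,
    maxSampleSize: Int,
    minStdDeviation: Double,
    acceptableHeartbeatPause: Double)

  object NamedArgsSketch extends App {
    // Positional call: nothing stops minStdDeviation and acceptableHeartbeatPause
    // from being swapped, since both are Doubles.
    val risky = DetectorConfig(8.0, 1000, 3.0, 0.1)

    // Named call: each argument is tied to its parameter by name, so the order
    // at the call site no longer matters.
    val safe = DetectorConfig(
      threshold = 8.0,
      maxSampleSize = 1000,
      acceptableHeartbeatPause = 3.0,
      minStdDeviation = 0.1)

    println(risky == safe) // false: the positional call transposed the two values
  }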