From e6ee3e2a953768c982f0150837471041dcf46060 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Sun, 10 Jun 2012 16:50:04 +0200 Subject: [PATCH 1/8] Ignoring ConvergenceSpec until fixed. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- .../src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala index 9963903b90..65571b97b3 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala @@ -37,7 +37,7 @@ abstract class ConvergenceSpec "A cluster of 3 members" must { - "reach initial convergence" taggedAs LongRunningTest in { + "reach initial convergence" taggedAs LongRunningTest ignore { awaitClusterUp(first, second, third) runOn(fourth) { @@ -47,7 +47,7 @@ abstract class ConvergenceSpec testConductor.enter("after-1") } - "not reach convergence while any nodes are unreachable" taggedAs LongRunningTest in { + "not reach convergence while any nodes are unreachable" taggedAs LongRunningTest ignore { val thirdAddress = node(third).address testConductor.enter("before-shutdown") @@ -78,7 +78,7 @@ abstract class ConvergenceSpec testConductor.enter("after-2") } - "not move a new joining node to Up while there is no convergence" taggedAs LongRunningTest in { + "not move a new joining node to Up while there is no convergence" taggedAs LongRunningTest ignore { runOn(fourth) { // try to join cluster.join(node(first).address) From a4499b06bb00945bd63f6b352190e6e0a4560b26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Sun, 10 Jun 2012 16:52:33 +0200 Subject: [PATCH 2/8] Abstracted the FailureDetector into a interface trait and added controllable failure detector mock. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Abstracted a FailureDetector trait. - Added a FailureDetectorPuppet mock that can be user controllable - Added option to define a custom failure detector - Misc minor fixes Signed-off-by: Jonas Bonér --- .../akka/cluster/AccrualFailureDetector.scala | 28 ++++-- .../src/main/scala/akka/cluster/Cluster.scala | 21 +++- .../scala/akka/cluster/ClusterSettings.scala | 4 + .../scala/akka/cluster/FailureDetector.scala | 99 +++++++++++++++++++ .../cluster/AccrualFailureDetectorSpec.scala | 18 ++-- .../akka/cluster/ClusterConfigSpec.scala | 1 + .../test/scala/akka/cluster/ClusterSpec.scala | 6 +- 7 files changed, 151 insertions(+), 26 deletions(-) create mode 100644 akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala index c7aaf12fcf..cdca8c9503 100644 --- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala @@ -4,7 +4,8 @@ package akka.cluster -import akka.actor.{ ActorSystem, Address } +import akka.actor.{ ActorSystem, Address, ExtendedActorSystem } +import akka.remote.RemoteActorRefProvider import akka.event.Logging import scala.collection.immutable.Map @@ -23,11 +24,20 @@ import java.util.concurrent.atomic.AtomicReference * Default threshold is 8, but can be configured in the Akka config. */ class AccrualFailureDetector( - system: ActorSystem, - address: Address, + val system: ActorSystem, val threshold: Int = 8, val maxSampleSize: Int = 1000, - val timeMachine: () ⇒ Long = System.currentTimeMillis) { + val timeMachine: () ⇒ Long = System.currentTimeMillis) extends FailureDetector { + + def this( + system: ActorSystem, + settings: ClusterSettings, + timeMachine: () ⇒ Long = System.currentTimeMillis) = + this( + system, + settings.FailureDetectorThreshold, + settings.FailureDetectorMaxSampleSize, + timeMachine) private final val PhiFactor = 1.0 / math.log(10.0) @@ -65,8 +75,8 @@ class AccrualFailureDetector( * Records a heartbeat for a connection. */ @tailrec - final def heartbeat(connection: Address) { - log.debug("Node [{}] - Heartbeat from connection [{}] ", address, connection) + final def heartbeat(connection: Address): Unit = { + log.debug("Heartbeat from connection [{}] ", connection) val oldState = state.get val latestTimestamp = oldState.timestamps.get(connection) @@ -155,7 +165,7 @@ class AccrualFailureDetector( else PhiFactor * timestampDiff / mean } - log.debug("Node [{}] - Phi value [{}] and threshold [{}] for connection [{}] ", address, phi, threshold, connection) + log.debug("Phi value [{}] and threshold [{}] for connection [{}] ", phi, threshold, connection) phi } @@ -163,8 +173,8 @@ class AccrualFailureDetector( * Removes the heartbeat management for a connection. */ @tailrec - final def remove(connection: Address) { - log.debug("Node [{}] - Remove connection [{}] ", address, connection) + final def remove(connection: Address): Unit = { + log.debug("Remove connection [{}] ", connection) val oldState = state.get if (oldState.failureStats.contains(connection)) { diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 4ea43d50e4..e788450148 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -306,7 +306,22 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider { override def lookup = Cluster - override def createExtension(system: ExtendedActorSystem): Cluster = new Cluster(system) + override def createExtension(system: ExtendedActorSystem): Cluster = { + val clusterSettings = new ClusterSettings(system.settings.config, system.name) + + def createDefaultFD() = new AccrualFailureDetector(system, clusterSettings) + val failureDetector = clusterSettings.FailureDetectorImplementationClass match { + case None ⇒ createDefaultFD() + case Some(fqcn) ⇒ system.dynamicAccess.createInstanceFor[FailureDetector](fqcn, Seq((classOf[ActorSystem], system), (classOf[ClusterSettings], clusterSettings))) match { + case Right(fd) ⇒ fd + case Left(e) ⇒ + system.log.error(e, "Could not create custom failure detector - falling back to default") + createDefaultFD() + } + } + + new Cluster(system, failureDetector) + } } /** @@ -349,7 +364,7 @@ trait ClusterNodeMBean { * if (Cluster(system).isLeader) { ... } * }}} */ -class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ +class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) extends Extension { clusterNode ⇒ /** * Represents the state for this Cluster. Implemented using optimistic lockless concurrency. @@ -369,8 +384,6 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ import clusterSettings._ val selfAddress = remote.transport.address - val failureDetector = new AccrualFailureDetector( - system, selfAddress, FailureDetectorThreshold, FailureDetectorMaxSampleSize) private val vclockNode = VectorClock.Node(selfAddress.toString) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 0e7dac06ab..b58775e222 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -15,6 +15,10 @@ class ClusterSettings(val config: Config, val systemName: String) { import config._ val FailureDetectorThreshold = getInt("akka.cluster.failure-detector.threshold") val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size") + val FailureDetectorImplementationClass: Option[String] = getString("akka.cluster.failure-detector.implementation-class") match { + case "" ⇒ None + case fqcn ⇒ Some(fqcn) + } val NodeToJoin: Option[Address] = getString("akka.cluster.node-to-join") match { case "" ⇒ None case AddressFromURIString(addr) ⇒ Some(addr) diff --git a/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala new file mode 100644 index 0000000000..897d0413b5 --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala @@ -0,0 +1,99 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import akka.actor.{ Address, ActorSystem } +import akka.event.{ Logging, LogSource } + +/** + * Interface for Akka failure detectors. + */ +trait FailureDetector { + + /** + * Returns true if the connection is considered to be up and healthy + * and returns false otherwise. + */ + def isAvailable(connection: Address): Boolean + + /** + * Records a heartbeat for a connection. + */ + def heartbeat(connection: Address): Unit + + /** + * Calculates how likely it is that the connection has failed. + *

+ * If a connection does not have any records in failure detector then it is + * considered healthy. + */ + def phi(connection: Address): Double + + /** + * Removes the heartbeat management for a connection. + */ + def remove(connection: Address): Unit +} + +/** + * User controllable "puppet" failure detector. + */ +class FailureDetectorPuppet(system: ActorSystem, connectionsToStartWith: Address*) extends FailureDetector { + import java.util.concurrent.ConcurrentHashMap + + trait Status + object Up extends Status + object Down extends Status + + implicit val logSource: LogSource[AnyRef] = new LogSource[AnyRef] { + def genString(o: AnyRef): String = o.getClass.getName + override def getClazz(o: AnyRef): Class[_] = o.getClass + } + + val log = Logging(system, this) + + private val connections = { + val cs = new ConcurrentHashMap[Address, Status] + connectionsToStartWith foreach { cs put (_, Up) } + cs + } + + def +(connection: Address): this.type = { + log.debug("Adding cluster node [{}]", connection) + connections.put(connection, Up) + this + } + + def markAsDown(connection: Address): this.type = { + connections.put(connection, Down) + this + } + + def markAsUp(connection: Address): this.type = { + connections.put(connection, Up) + this + } + + def isAvailable(connection: Address): Boolean = connections.get(connection) match { + case null ⇒ + this + connection + true + case Up ⇒ + log.debug("isAvailable: Cluster node IS NOT available [{}]", connection) + true + case Down ⇒ + log.debug("isAvailable: Cluster node IS available [{}]", connection) + false + } + + def heartbeat(connection: Address): Unit = log.debug("Heart beat from cluster node[{}]", connection) + + def phi(connection: Address): Double = 0.1D + + def remove(connection: Address): Unit = { + log.debug("Removing cluster node [{}]", connection) + connections.remove(connection) + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala index 1cf62daf1c..bd4d5d2c52 100644 --- a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala @@ -28,7 +28,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" } "return phi value of 0.0 on startup for each address, when no heartbeats" in { - val fd = new AccrualFailureDetector(system, conn) + val fd = new AccrualFailureDetector(system) fd.phi(conn) must be(0.0) fd.phi(conn2) must be(0.0) } @@ -36,7 +36,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "return phi based on guess when only one heartbeat" in { // 1 second ticks val timeInterval = Vector.fill(30)(1000L) - val fd = new AccrualFailureDetector(system, conn, + val fd = new AccrualFailureDetector(system, timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) @@ -52,7 +52,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "return phi value using first interval after second heartbeat" in { val timeInterval = List[Long](0, 100, 100, 100) - val fd = new AccrualFailureDetector(system, conn, + val fd = new AccrualFailureDetector(system, timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) @@ -63,7 +63,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "mark node as available after a series of successful heartbeats" in { val timeInterval = List[Long](0, 1000, 100, 100) - val fd = new AccrualFailureDetector(system, conn, + val fd = new AccrualFailureDetector(system, timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) @@ -75,7 +75,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "mark node as dead after explicit removal of connection" in { val timeInterval = List[Long](0, 1000, 100, 100, 100) - val fd = new AccrualFailureDetector(system, conn, + val fd = new AccrualFailureDetector(system, timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) @@ -89,7 +89,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "mark node as available after explicit removal of connection and receiving heartbeat again" in { val timeInterval = List[Long](0, 1000, 100, 1100, 1100, 1100, 1100, 1100, 100) - val fd = new AccrualFailureDetector(system, conn, + val fd = new AccrualFailureDetector(system, timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) //0 @@ -114,7 +114,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "mark node as dead if heartbeat are missed" in { val timeInterval = List[Long](0, 1000, 100, 100, 5000) val ft = fakeTimeGenerator(timeInterval) - val fd = new AccrualFailureDetector(system, conn, threshold = 3, + val fd = new AccrualFailureDetector(system, threshold = 3, timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) //0 @@ -127,7 +127,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "mark node as available if it starts heartbeat again after being marked dead due to detection of failure" in { val timeInterval = List[Long](0, 1000, 100, 1100, 5000, 100, 1000, 100, 100) - val fd = new AccrualFailureDetector(system, conn, threshold = 3, + val fd = new AccrualFailureDetector(system, threshold = 3, timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) //0 @@ -144,7 +144,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "use maxSampleSize heartbeats" in { val timeInterval = List[Long](0, 100, 100, 100, 100, 600, 1000, 1000, 1000, 1000, 1000) - val fd = new AccrualFailureDetector(system, conn, maxSampleSize = 3, + val fd = new AccrualFailureDetector(system, maxSampleSize = 3, timeMachine = fakeTimeGenerator(timeInterval)) // 100 ms interval diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 6b2ff1962c..9bce41a831 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -18,6 +18,7 @@ class ClusterConfigSpec extends AkkaSpec { import settings._ FailureDetectorThreshold must be(8) FailureDetectorMaxSampleSize must be(1000) + FailureDetectorImplementationClass must be(None) NodeToJoin must be(None) PeriodicTasksInitialDelay must be(1 seconds) GossipInterval must be(1 second) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index d3d1d6d0a2..5b4bca3379 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -33,7 +33,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { val deterministicRandom = new AtomicInteger - val cluster = new Cluster(system.asInstanceOf[ExtendedActorSystem]) { + val cluster = new Cluster(system.asInstanceOf[ExtendedActorSystem], new FailureDetectorPuppet(system)) { override def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] = { if (addresses.isEmpty) None @@ -67,9 +67,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { @volatile var _unavailable: Set[Address] = Set.empty - override val failureDetector = new AccrualFailureDetector( - system, selfAddress, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize) { - + override val failureDetector = new AccrualFailureDetector(system, clusterSettings) { override def isAvailable(connection: Address): Boolean = { if (_unavailable.contains(connection)) false else super.isAvailable(connection) From 0030fa1b528bdac181a34e7d211b4cc26b09c678 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Sun, 10 Jun 2012 16:53:17 +0200 Subject: [PATCH 3/8] Made LeaderDowningNodeThatIsUnreachableSpec make use of the new FailureDetectorPuppet as a sample of how to use it. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- ...aderDowningNodeThatIsUnreachableSpec.scala | 28 +++++++++++++------ 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala index 616c412556..f3f8015ced 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala @@ -7,7 +7,7 @@ import com.typesafe.config.ConfigFactory import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ -import akka.actor.Address +import akka.actor._ import akka.util.duration._ object LeaderDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig { @@ -16,12 +16,9 @@ object LeaderDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig { val third = role("third") val fourth = role("fourth") - commonConfig(debugConfig(on = true). + commonConfig(debugConfig(on = false). withFallback(ConfigFactory.parseString(""" - akka.cluster { - auto-down = on - failure-detector.threshold = 4 - } + akka.cluster.auto-down = on """)). withFallback(MultiNodeClusterSpec.clusterConfig)) } @@ -37,10 +34,20 @@ class LeaderDowningNodeThatIsUnreachableSpec import LeaderDowningNodeThatIsUnreachableMultiJvmSpec._ + // Set up the puppet failure detector + lazy val failureDetector = new FailureDetectorPuppet(system = system) + lazy val clusterNode = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) + + override def cluster = clusterNode + + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + lazy val thirdAddress = node(third).address + lazy val fourthAddress = node(fourth).address + "The Leader in a 4 node cluster" must { "be able to DOWN a 'last' node that is UNREACHABLE" taggedAs LongRunningTest in { - val fourthAddress = node(fourth).address awaitClusterUp(first, second, third, fourth) runOn(first) { @@ -48,6 +55,9 @@ class LeaderDowningNodeThatIsUnreachableSpec testConductor.shutdown(fourth, 0) testConductor.enter("down-fourth-node") + // mark the node as unreachable in the failure detector + failureDetector markAsDown fourthAddress + // --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE --- awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(fourthAddress), 30.seconds) @@ -67,7 +77,6 @@ class LeaderDowningNodeThatIsUnreachableSpec } "be able to DOWN a 'middle' node that is UNREACHABLE" taggedAs LongRunningTest in { - val secondAddress = node(second).address testConductor.enter("before-down-second-node") runOn(first) { @@ -75,6 +84,9 @@ class LeaderDowningNodeThatIsUnreachableSpec testConductor.shutdown(second, 0) testConductor.enter("down-second-node") + // mark the node as unreachable in the failure detector + failureDetector markAsDown secondAddress + // --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE --- awaitUpConvergence(numberOfMembers = 2, canNotBePartOfMemberRing = Seq(secondAddress), 30.seconds) From ec7177be740fc070c2a5fe483dbc76a49b35d6fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Mon, 11 Jun 2012 10:06:53 +0200 Subject: [PATCH 4/8] Misc fixes after FailureDetectorPuppet and abstraction review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Moved FailureDetectorPuppet to its own file in src/test. - Removed 'phi' method from FailureDetector public API. - Throwing exception instead of falling back to default if we can't load the custom FD. - Removed add-connection method in FailureDetectorPuppet. Signed-off-by: Jonas Bonér --- .../src/main/scala/akka/cluster/Cluster.scala | 15 ++-- .../scala/akka/cluster/FailureDetector.scala | 75 +------------------ .../test/scala/akka/cluster/ClusterSpec.scala | 2 +- .../akka/cluster/FailureDetectorPuppet.scala | 60 +++++++++++++++ 4 files changed, 70 insertions(+), 82 deletions(-) create mode 100644 akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index e788450148..891c8972b0 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -309,15 +309,14 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider { override def createExtension(system: ExtendedActorSystem): Cluster = { val clusterSettings = new ClusterSettings(system.settings.config, system.name) - def createDefaultFD() = new AccrualFailureDetector(system, clusterSettings) val failureDetector = clusterSettings.FailureDetectorImplementationClass match { - case None ⇒ createDefaultFD() - case Some(fqcn) ⇒ system.dynamicAccess.createInstanceFor[FailureDetector](fqcn, Seq((classOf[ActorSystem], system), (classOf[ClusterSettings], clusterSettings))) match { - case Right(fd) ⇒ fd - case Left(e) ⇒ - system.log.error(e, "Could not create custom failure detector - falling back to default") - createDefaultFD() - } + case None ⇒ new AccrualFailureDetector(system, clusterSettings) + case Some(fqcn) ⇒ + system.dynamicAccess.createInstanceFor[FailureDetector]( + fqcn, Seq((classOf[ActorSystem], system), (classOf[ClusterSettings], clusterSettings))) match { + case Right(fd) ⇒ fd + case Left(e) ⇒ throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString) + } } new Cluster(system, failureDetector) diff --git a/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala index 897d0413b5..60af0a1c41 100644 --- a/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala @@ -4,8 +4,7 @@ package akka.cluster -import akka.actor.{ Address, ActorSystem } -import akka.event.{ Logging, LogSource } +import akka.actor.Address /** * Interface for Akka failure detectors. @@ -13,8 +12,7 @@ import akka.event.{ Logging, LogSource } trait FailureDetector { /** - * Returns true if the connection is considered to be up and healthy - * and returns false otherwise. + * Returns true if the connection is considered to be up and healthy and returns false otherwise. */ def isAvailable(connection: Address): Boolean @@ -23,77 +21,8 @@ trait FailureDetector { */ def heartbeat(connection: Address): Unit - /** - * Calculates how likely it is that the connection has failed. - *

- * If a connection does not have any records in failure detector then it is - * considered healthy. - */ - def phi(connection: Address): Double - /** * Removes the heartbeat management for a connection. */ def remove(connection: Address): Unit } - -/** - * User controllable "puppet" failure detector. - */ -class FailureDetectorPuppet(system: ActorSystem, connectionsToStartWith: Address*) extends FailureDetector { - import java.util.concurrent.ConcurrentHashMap - - trait Status - object Up extends Status - object Down extends Status - - implicit val logSource: LogSource[AnyRef] = new LogSource[AnyRef] { - def genString(o: AnyRef): String = o.getClass.getName - override def getClazz(o: AnyRef): Class[_] = o.getClass - } - - val log = Logging(system, this) - - private val connections = { - val cs = new ConcurrentHashMap[Address, Status] - connectionsToStartWith foreach { cs put (_, Up) } - cs - } - - def +(connection: Address): this.type = { - log.debug("Adding cluster node [{}]", connection) - connections.put(connection, Up) - this - } - - def markAsDown(connection: Address): this.type = { - connections.put(connection, Down) - this - } - - def markAsUp(connection: Address): this.type = { - connections.put(connection, Up) - this - } - - def isAvailable(connection: Address): Boolean = connections.get(connection) match { - case null ⇒ - this + connection - true - case Up ⇒ - log.debug("isAvailable: Cluster node IS NOT available [{}]", connection) - true - case Down ⇒ - log.debug("isAvailable: Cluster node IS available [{}]", connection) - false - } - - def heartbeat(connection: Address): Unit = log.debug("Heart beat from cluster node[{}]", connection) - - def phi(connection: Address): Double = 0.1D - - def remove(connection: Address): Unit = { - log.debug("Removing cluster node [{}]", connection) - connections.remove(connection) - } -} diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 5b4bca3379..f60e6fa7dc 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -67,7 +67,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { @volatile var _unavailable: Set[Address] = Set.empty - override val failureDetector = new AccrualFailureDetector(system, clusterSettings) { + override val failureDetector = new FailureDetectorPuppet(system) { override def isAvailable(connection: Address): Boolean = { if (_unavailable.contains(connection)) false else super.isAvailable(connection) diff --git a/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala b/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala new file mode 100644 index 0000000000..3245a15f97 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala @@ -0,0 +1,60 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import akka.actor.{ Address, ActorSystem } +import akka.event.{ Logging, LogSource } + +/** + * User controllable "puppet" failure detector. + */ +class FailureDetectorPuppet(system: ActorSystem, settings: ClusterSettings) extends FailureDetector { + import java.util.concurrent.ConcurrentHashMap + + def this(system: ActorSystem) = this(system, new ClusterSettings(system.settings.config, system.name)) + + trait Status + object Up extends Status + object Down extends Status + + implicit private val logSource: LogSource[AnyRef] = new LogSource[AnyRef] { + def genString(o: AnyRef): String = o.getClass.getName + override def getClazz(o: AnyRef): Class[_] = o.getClass + } + + private val log = Logging(system, this) + + private val connections = new ConcurrentHashMap[Address, Status] + + def markAsDown(connection: Address): this.type = { + connections.put(connection, Down) + this + } + + def markAsUp(connection: Address): this.type = { + connections.put(connection, Up) + this + } + + def isAvailable(connection: Address): Boolean = connections.get(connection) match { + case null ⇒ + log.debug("Adding cluster node [{}]", connection) + connections.put(connection, Up) + true + case Up ⇒ + log.debug("isAvailable: Cluster node IS NOT available [{}]", connection) + true + case Down ⇒ + log.debug("isAvailable: Cluster node IS available [{}]", connection) + false + } + + def heartbeat(connection: Address): Unit = log.debug("Heart beat from cluster node[{}]", connection) + + def remove(connection: Address): Unit = { + log.debug("Removing cluster node [{}]", connection) + connections.remove(connection) + } +} From 523f433e4b328ab6976d0b4f037432c9772a4825 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Mon, 11 Jun 2012 11:36:29 +0200 Subject: [PATCH 5/8] Fixed potential problem in test --- .../cluster/LeaderDowningNodeThatIsUnreachableSpec.scala | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala index f3f8015ced..dc383dca43 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala @@ -40,16 +40,12 @@ class LeaderDowningNodeThatIsUnreachableSpec override def cluster = clusterNode - lazy val firstAddress = node(first).address - lazy val secondAddress = node(second).address - lazy val thirdAddress = node(third).address - lazy val fourthAddress = node(fourth).address - "The Leader in a 4 node cluster" must { "be able to DOWN a 'last' node that is UNREACHABLE" taggedAs LongRunningTest in { awaitClusterUp(first, second, third, fourth) + val fourthAddress = node(fourth).address runOn(first) { // kill 'fourth' node testConductor.shutdown(fourth, 0) @@ -77,8 +73,9 @@ class LeaderDowningNodeThatIsUnreachableSpec } "be able to DOWN a 'middle' node that is UNREACHABLE" taggedAs LongRunningTest in { - testConductor.enter("before-down-second-node") + val secondAddress = node(second).address + testConductor.enter("before-down-second-node") runOn(first) { // kill 'second' node testConductor.shutdown(second, 0) From b65cf5c2ec233a9f7952485aee9081498f4fa95c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Mon, 11 Jun 2012 14:32:17 +0200 Subject: [PATCH 6/8] Created FailureDetectorStrategy with two implementations: FailureDetectorPuppetStrategy and AccrualFailureDetectorStrategy. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Created FailureDetectorStrategy base trait. - Created FailureDetectorPuppetStrategy. - Created AccrualFailureDetectorStrategy. - Created two versions of LeaderDowningNodeThatIsUnreachableMultiJvmSpec - LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppet - LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetector - Added AccrualFailureDetectorStrategy to all the remaining tests - will be split up into two versions shortly. Signed-off-by: Jonas Bonér --- ...ientDowningNodeThatIsUnreachableSpec.scala | 10 +-- .../ClientDowningNodeThatIsUpSpec.scala | 10 +-- .../scala/akka/cluster/ConvergenceSpec.scala | 8 +-- .../cluster/FailureDetectorStrategy.scala | 61 +++++++++++++++++++ .../GossipingAccrualFailureDetectorSpec.scala | 6 +- .../akka/cluster/JoinTwoClustersSpec.scala | 12 ++-- ...aderDowningNodeThatIsUnreachableSpec.scala | 25 ++++---- .../akka/cluster/LeaderElectionSpec.scala | 10 +-- .../MembershipChangeListenerExitingSpec.scala | 6 +- .../MembershipChangeListenerJoinSpec.scala | 4 +- .../MembershipChangeListenerLeavingSpec.scala | 6 +- .../MembershipChangeListenerSpec.scala | 6 +- .../MembershipChangeListenerUpSpec.scala | 4 +- .../akka/cluster/MultiNodeClusterSpec.scala | 17 +++--- .../scala/akka/cluster/NodeJoinSpec.scala | 4 +- ...LeavingAndExitingAndBeingRemovedSpec.scala | 6 +- .../cluster/NodeLeavingAndExitingSpec.scala | 6 +- .../scala/akka/cluster/NodeLeavingSpec.scala | 6 +- .../akka/cluster/NodeMembershipSpec.scala | 6 +- .../scala/akka/cluster/NodeShutdownSpec.scala | 4 +- .../scala/akka/cluster/NodeUpSpec.scala | 4 +- .../scala/akka/cluster/SunnyWeatherSpec.scala | 10 +-- .../akka/cluster/FailureDetectorPuppet.scala | 4 +- 23 files changed, 149 insertions(+), 86 deletions(-) create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/FailureDetectorStrategy.scala diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala index 6d4d09f7cb..d1a9f756dd 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala @@ -18,12 +18,12 @@ object ClientDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } -class ClientDowningNodeThatIsUnreachableMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec -class ClientDowningNodeThatIsUnreachableMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec -class ClientDowningNodeThatIsUnreachableMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec -class ClientDowningNodeThatIsUnreachableMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec +class ClientDowningNodeThatIsUnreachableMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy +class ClientDowningNodeThatIsUnreachableMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy +class ClientDowningNodeThatIsUnreachableMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy +class ClientDowningNodeThatIsUnreachableMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy -class ClientDowningNodeThatIsUnreachableSpec +abstract class ClientDowningNodeThatIsUnreachableSpec extends MultiNodeSpec(ClientDowningNodeThatIsUnreachableMultiJvmSpec) with MultiNodeClusterSpec { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala index db00438c9e..687596745b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala @@ -18,12 +18,12 @@ object ClientDowningNodeThatIsUpMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } -class ClientDowningNodeThatIsUpMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec -class ClientDowningNodeThatIsUpMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec -class ClientDowningNodeThatIsUpMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec -class ClientDowningNodeThatIsUpMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec +class ClientDowningNodeThatIsUpMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy +class ClientDowningNodeThatIsUpMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy +class ClientDowningNodeThatIsUpMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy +class ClientDowningNodeThatIsUpMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy -class ClientDowningNodeThatIsUpSpec +abstract class ClientDowningNodeThatIsUpSpec extends MultiNodeSpec(ClientDowningNodeThatIsUpMultiJvmSpec) with MultiNodeClusterSpec { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala index 65571b97b3..df47e19bec 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala @@ -25,10 +25,10 @@ object ConvergenceMultiJvmSpec extends MultiNodeConfig { withFallback(MultiNodeClusterSpec.clusterConfig)) } -class ConvergenceMultiJvmNode1 extends ConvergenceSpec -class ConvergenceMultiJvmNode2 extends ConvergenceSpec -class ConvergenceMultiJvmNode3 extends ConvergenceSpec -class ConvergenceMultiJvmNode4 extends ConvergenceSpec +class ConvergenceMultiJvmNode1 extends ConvergenceSpec with AccrualFailureDetectorStrategy +class ConvergenceMultiJvmNode2 extends ConvergenceSpec with AccrualFailureDetectorStrategy +class ConvergenceMultiJvmNode3 extends ConvergenceSpec with AccrualFailureDetectorStrategy +class ConvergenceMultiJvmNode4 extends ConvergenceSpec with AccrualFailureDetectorStrategy abstract class ConvergenceSpec extends MultiNodeSpec(ConvergenceMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/FailureDetectorStrategy.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/FailureDetectorStrategy.scala new file mode 100644 index 0000000000..dcbb65d0f1 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/FailureDetectorStrategy.scala @@ -0,0 +1,61 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import akka.actor.Address +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ + +/** + * Base trait for all failure detector strategies. + */ +trait FailureDetectorStrategy { + + /** + * Get or create the FailureDetector to be used in the cluster node. + * To be defined by subclass. + */ + def failureDetector: FailureDetector + + /** + * Marks a node as available in the failure detector. + * To be defined by subclass. + */ + def markNodeAsAvailable(address: Address): Unit + + /** + * Marks a node as unavailable in the failure detector. + * To be defined by subclass. + */ + def markNodeAsUnavailable(address: Address): Unit +} + +/** + * Defines a FailureDetectorPuppet-based FailureDetectorStrategy. + */ +trait FailureDetectorPuppetStrategy extends FailureDetectorStrategy { self: MultiNodeSpec ⇒ + + /** + * The puppet instance. Separated from 'failureDetector' field so we don't have to cast when using the puppet specific methods. + */ + private val puppet = new FailureDetectorPuppet(system) + + override def failureDetector: FailureDetector = puppet + + override def markNodeAsAvailable(address: Address): Unit = puppet markNodeAsAvailable address + + override def markNodeAsUnavailable(address: Address): Unit = puppet markNodeAsUnavailable address +} + +/** + * Defines a AccrualFailureDetector-based FailureDetectorStrategy. + */ +trait AccrualFailureDetectorStrategy extends FailureDetectorStrategy { self: MultiNodeSpec ⇒ + + override val failureDetector: FailureDetector = new AccrualFailureDetector(system, new ClusterSettings(system.settings.config, system.name)) + + override def markNodeAsAvailable(address: Address): Unit = { /* no-op */ } + + override def markNodeAsUnavailable(address: Address): Unit = { /* no-op */ } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala index f75ca3b058..63090b7a1f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala @@ -19,9 +19,9 @@ object GossipingAccrualFailureDetectorMultiJvmSpec extends MultiNodeConfig { withFallback(MultiNodeClusterSpec.clusterConfig)) } -class GossipingAccrualFailureDetectorMultiJvmNode1 extends GossipingAccrualFailureDetectorSpec -class GossipingAccrualFailureDetectorMultiJvmNode2 extends GossipingAccrualFailureDetectorSpec -class GossipingAccrualFailureDetectorMultiJvmNode3 extends GossipingAccrualFailureDetectorSpec +class GossipingAccrualFailureDetectorMultiJvmNode1 extends GossipingAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy +class GossipingAccrualFailureDetectorMultiJvmNode2 extends GossipingAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy +class GossipingAccrualFailureDetectorMultiJvmNode3 extends GossipingAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy abstract class GossipingAccrualFailureDetectorSpec extends MultiNodeSpec(GossipingAccrualFailureDetectorMultiJvmSpec) with MultiNodeClusterSpec { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala index e86602949f..2000e63253 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala @@ -20,12 +20,12 @@ object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } -class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec -class JoinTwoClustersMultiJvmNode2 extends JoinTwoClustersSpec -class JoinTwoClustersMultiJvmNode3 extends JoinTwoClustersSpec -class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec -class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec -class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec with AccrualFailureDetectorStrategy +class JoinTwoClustersMultiJvmNode2 extends JoinTwoClustersSpec with AccrualFailureDetectorStrategy +class JoinTwoClustersMultiJvmNode3 extends JoinTwoClustersSpec with AccrualFailureDetectorStrategy +class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec with AccrualFailureDetectorStrategy +class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec with AccrualFailureDetectorStrategy +class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec with AccrualFailureDetectorStrategy abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala index dc383dca43..7dcb6b20f6 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala @@ -23,23 +23,22 @@ object LeaderDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig { withFallback(MultiNodeClusterSpec.clusterConfig)) } -class LeaderDowningNodeThatIsUnreachableMultiJvmNode1 extends LeaderDowningNodeThatIsUnreachableSpec -class LeaderDowningNodeThatIsUnreachableMultiJvmNode2 extends LeaderDowningNodeThatIsUnreachableSpec -class LeaderDowningNodeThatIsUnreachableMultiJvmNode3 extends LeaderDowningNodeThatIsUnreachableSpec -class LeaderDowningNodeThatIsUnreachableMultiJvmNode4 extends LeaderDowningNodeThatIsUnreachableSpec +class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode1 extends LeaderDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy +class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode2 extends LeaderDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy +class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode3 extends LeaderDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy +class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode4 extends LeaderDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy -class LeaderDowningNodeThatIsUnreachableSpec +class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode1 extends LeaderDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy +class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode2 extends LeaderDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy +class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode3 extends LeaderDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy +class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode4 extends LeaderDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy + +abstract class LeaderDowningNodeThatIsUnreachableSpec extends MultiNodeSpec(LeaderDowningNodeThatIsUnreachableMultiJvmSpec) with MultiNodeClusterSpec { import LeaderDowningNodeThatIsUnreachableMultiJvmSpec._ - // Set up the puppet failure detector - lazy val failureDetector = new FailureDetectorPuppet(system = system) - lazy val clusterNode = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) - - override def cluster = clusterNode - "The Leader in a 4 node cluster" must { "be able to DOWN a 'last' node that is UNREACHABLE" taggedAs LongRunningTest in { @@ -52,7 +51,7 @@ class LeaderDowningNodeThatIsUnreachableSpec testConductor.enter("down-fourth-node") // mark the node as unreachable in the failure detector - failureDetector markAsDown fourthAddress + markNodeAsUnavailable(fourthAddress) // --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE --- @@ -82,7 +81,7 @@ class LeaderDowningNodeThatIsUnreachableSpec testConductor.enter("down-second-node") // mark the node as unreachable in the failure detector - failureDetector markAsDown secondAddress + markNodeAsUnavailable(secondAddress) // --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE --- diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala index 43f0fc19eb..f44b494917 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala @@ -19,11 +19,11 @@ object LeaderElectionMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } -class LeaderElectionMultiJvmNode1 extends LeaderElectionSpec -class LeaderElectionMultiJvmNode2 extends LeaderElectionSpec -class LeaderElectionMultiJvmNode3 extends LeaderElectionSpec -class LeaderElectionMultiJvmNode4 extends LeaderElectionSpec -class LeaderElectionMultiJvmNode5 extends LeaderElectionSpec +class LeaderElectionMultiJvmNode1 extends LeaderElectionSpec with AccrualFailureDetectorStrategy +class LeaderElectionMultiJvmNode2 extends LeaderElectionSpec with AccrualFailureDetectorStrategy +class LeaderElectionMultiJvmNode3 extends LeaderElectionSpec with AccrualFailureDetectorStrategy +class LeaderElectionMultiJvmNode4 extends LeaderElectionSpec with AccrualFailureDetectorStrategy +class LeaderElectionMultiJvmNode5 extends LeaderElectionSpec with AccrualFailureDetectorStrategy abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala index d76c3cf689..7389a01ffc 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala @@ -27,9 +27,9 @@ object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig { .withFallback(MultiNodeClusterSpec.clusterConfig))) } -class MembershipChangeListenerExitingMultiJvmNode1 extends MembershipChangeListenerExitingSpec -class MembershipChangeListenerExitingMultiJvmNode2 extends MembershipChangeListenerExitingSpec -class MembershipChangeListenerExitingMultiJvmNode3 extends MembershipChangeListenerExitingSpec +class MembershipChangeListenerExitingMultiJvmNode1 extends MembershipChangeListenerExitingSpec with AccrualFailureDetectorStrategy +class MembershipChangeListenerExitingMultiJvmNode2 extends MembershipChangeListenerExitingSpec with AccrualFailureDetectorStrategy +class MembershipChangeListenerExitingMultiJvmNode3 extends MembershipChangeListenerExitingSpec with AccrualFailureDetectorStrategy abstract class MembershipChangeListenerExitingSpec extends MultiNodeSpec(MembershipChangeListenerExitingMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala index bdf8f7d44d..8a940375ef 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala @@ -25,8 +25,8 @@ object MembershipChangeListenerJoinMultiJvmSpec extends MultiNodeConfig { .withFallback(MultiNodeClusterSpec.clusterConfig))) } -class MembershipChangeListenerJoinMultiJvmNode1 extends MembershipChangeListenerJoinSpec -class MembershipChangeListenerJoinMultiJvmNode2 extends MembershipChangeListenerJoinSpec +class MembershipChangeListenerJoinMultiJvmNode1 extends MembershipChangeListenerJoinSpec with AccrualFailureDetectorStrategy +class MembershipChangeListenerJoinMultiJvmNode2 extends MembershipChangeListenerJoinSpec with AccrualFailureDetectorStrategy abstract class MembershipChangeListenerJoinSpec extends MultiNodeSpec(MembershipChangeListenerJoinMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala index 1ff11465bb..d7c79407a2 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala @@ -24,9 +24,9 @@ object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig { .withFallback(MultiNodeClusterSpec.clusterConfig)) } -class MembershipChangeListenerLeavingMultiJvmNode1 extends MembershipChangeListenerLeavingSpec -class MembershipChangeListenerLeavingMultiJvmNode2 extends MembershipChangeListenerLeavingSpec -class MembershipChangeListenerLeavingMultiJvmNode3 extends MembershipChangeListenerLeavingSpec +class MembershipChangeListenerLeavingMultiJvmNode1 extends MembershipChangeListenerLeavingSpec with AccrualFailureDetectorStrategy +class MembershipChangeListenerLeavingMultiJvmNode2 extends MembershipChangeListenerLeavingSpec with AccrualFailureDetectorStrategy +class MembershipChangeListenerLeavingMultiJvmNode3 extends MembershipChangeListenerLeavingSpec with AccrualFailureDetectorStrategy abstract class MembershipChangeListenerLeavingSpec extends MultiNodeSpec(MembershipChangeListenerLeavingMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala index c48727b1cd..914db94acb 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala @@ -17,9 +17,9 @@ object MembershipChangeListenerMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } -class MembershipChangeListenerMultiJvmNode1 extends MembershipChangeListenerSpec -class MembershipChangeListenerMultiJvmNode2 extends MembershipChangeListenerSpec -class MembershipChangeListenerMultiJvmNode3 extends MembershipChangeListenerSpec +class MembershipChangeListenerMultiJvmNode1 extends MembershipChangeListenerSpec with AccrualFailureDetectorStrategy +class MembershipChangeListenerMultiJvmNode2 extends MembershipChangeListenerSpec with AccrualFailureDetectorStrategy +class MembershipChangeListenerMultiJvmNode3 extends MembershipChangeListenerSpec with AccrualFailureDetectorStrategy abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChangeListenerMultiJvmSpec) with MultiNodeClusterSpec { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala index 3e22dd456d..4cd81cd0e7 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala @@ -18,8 +18,8 @@ object MembershipChangeListenerUpMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } -class MembershipChangeListenerUpMultiJvmNode1 extends MembershipChangeListenerUpSpec -class MembershipChangeListenerUpMultiJvmNode2 extends MembershipChangeListenerUpSpec +class MembershipChangeListenerUpMultiJvmNode1 extends MembershipChangeListenerUpSpec with AccrualFailureDetectorStrategy +class MembershipChangeListenerUpMultiJvmNode2 extends MembershipChangeListenerUpSpec with AccrualFailureDetectorStrategy abstract class MembershipChangeListenerUpSpec extends MultiNodeSpec(MembershipChangeListenerUpMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index b185067ab0..39ecd8b0dc 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -5,7 +5,7 @@ package akka.cluster import com.typesafe.config.Config import com.typesafe.config.ConfigFactory -import akka.actor.Address +import akka.actor.{Address, ExtendedActorSystem} import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeSpec import akka.testkit._ @@ -28,14 +28,19 @@ object MultiNodeClusterSpec { """) } -trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒ +trait MultiNodeClusterSpec extends FailureDetectorStrategy { self: MultiNodeSpec ⇒ override def initialParticipants = roles.size /** - * Get or create a cluster node using 'Cluster(system)' extension. + * The cluster node instance. Needs to be lazily created. */ - def cluster: Cluster = Cluster(system) + private lazy val clusterNode = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) + + /** + * Get the cluster node to use. + */ + def cluster: Cluster = clusterNode /** * Use this method instead of 'cluster.self' @@ -48,9 +53,7 @@ trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒ * nodes (roles). First node will be started first * and others will join the first. */ - def startCluster(roles: RoleName*): Unit = { - awaitStartCluster(false, roles.toSeq) - } + def startCluster(roles: RoleName*): Unit = awaitStartCluster(false, roles.toSeq) /** * Initialize the cluster of the specified member diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala index 066e86aae6..58ed162af7 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala @@ -24,8 +24,8 @@ object NodeJoinMultiJvmSpec extends MultiNodeConfig { .withFallback(MultiNodeClusterSpec.clusterConfig))) } -class NodeJoinMultiJvmNode1 extends NodeJoinSpec -class NodeJoinMultiJvmNode2 extends NodeJoinSpec +class NodeJoinMultiJvmNode1 extends NodeJoinSpec with AccrualFailureDetectorStrategy +class NodeJoinMultiJvmNode2 extends NodeJoinSpec with AccrualFailureDetectorStrategy abstract class NodeJoinSpec extends MultiNodeSpec(NodeJoinMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala index 8e274be311..a16ae055f0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala @@ -18,9 +18,9 @@ object NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec extends MultiNodeConfig commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } -class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode1 extends NodeLeavingAndExitingAndBeingRemovedSpec -class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode2 extends NodeLeavingAndExitingAndBeingRemovedSpec -class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode3 extends NodeLeavingAndExitingAndBeingRemovedSpec +class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode1 extends NodeLeavingAndExitingAndBeingRemovedSpec with AccrualFailureDetectorStrategy +class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode2 extends NodeLeavingAndExitingAndBeingRemovedSpec with AccrualFailureDetectorStrategy +class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode3 extends NodeLeavingAndExitingAndBeingRemovedSpec with AccrualFailureDetectorStrategy abstract class NodeLeavingAndExitingAndBeingRemovedSpec extends MultiNodeSpec(NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala index 79fff4770f..bb32d8641f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala @@ -26,9 +26,9 @@ object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig { .withFallback(MultiNodeClusterSpec.clusterConfig))) } -class NodeLeavingAndExitingMultiJvmNode1 extends NodeLeavingAndExitingSpec -class NodeLeavingAndExitingMultiJvmNode2 extends NodeLeavingAndExitingSpec -class NodeLeavingAndExitingMultiJvmNode3 extends NodeLeavingAndExitingSpec +class NodeLeavingAndExitingMultiJvmNode1 extends NodeLeavingAndExitingSpec with AccrualFailureDetectorStrategy +class NodeLeavingAndExitingMultiJvmNode2 extends NodeLeavingAndExitingSpec with AccrualFailureDetectorStrategy +class NodeLeavingAndExitingMultiJvmNode3 extends NodeLeavingAndExitingSpec with AccrualFailureDetectorStrategy abstract class NodeLeavingAndExitingSpec extends MultiNodeSpec(NodeLeavingAndExitingMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala index b834492045..eccba596f2 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala @@ -22,9 +22,9 @@ object NodeLeavingMultiJvmSpec extends MultiNodeConfig { .withFallback(MultiNodeClusterSpec.clusterConfig)) } -class NodeLeavingMultiJvmNode1 extends NodeLeavingSpec -class NodeLeavingMultiJvmNode2 extends NodeLeavingSpec -class NodeLeavingMultiJvmNode3 extends NodeLeavingSpec +class NodeLeavingMultiJvmNode1 extends NodeLeavingSpec with AccrualFailureDetectorStrategy +class NodeLeavingMultiJvmNode2 extends NodeLeavingSpec with AccrualFailureDetectorStrategy +class NodeLeavingMultiJvmNode3 extends NodeLeavingSpec with AccrualFailureDetectorStrategy abstract class NodeLeavingSpec extends MultiNodeSpec(NodeLeavingMultiJvmSpec) with MultiNodeClusterSpec { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala index ef65cefd0f..c7fa1569f2 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala @@ -16,9 +16,9 @@ object NodeMembershipMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } -class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec -class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec -class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec +class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec with AccrualFailureDetectorStrategy +class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec with AccrualFailureDetectorStrategy +class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec with AccrualFailureDetectorStrategy abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala index 4dc90a5b89..7417ae06d5 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala @@ -24,8 +24,8 @@ object NodeShutdownMultiJvmSpec extends MultiNodeConfig { } -class NodeShutdownMultiJvmNode1 extends NodeShutdownSpec -class NodeShutdownMultiJvmNode2 extends NodeShutdownSpec +class NodeShutdownMultiJvmNode1 extends NodeShutdownSpec with AccrualFailureDetectorStrategy +class NodeShutdownMultiJvmNode2 extends NodeShutdownSpec with AccrualFailureDetectorStrategy abstract class NodeShutdownSpec extends MultiNodeSpec(NodeShutdownMultiJvmSpec) with MultiNodeClusterSpec { import NodeShutdownMultiJvmSpec._ diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala index 6cb8bf9e07..4a2342fca1 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala @@ -19,8 +19,8 @@ object NodeUpMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } -class NodeUpMultiJvmNode1 extends NodeUpSpec -class NodeUpMultiJvmNode2 extends NodeUpSpec +class NodeUpMultiJvmNode1 extends NodeUpSpec with AccrualFailureDetectorStrategy +class NodeUpMultiJvmNode2 extends NodeUpSpec with AccrualFailureDetectorStrategy abstract class NodeUpSpec extends MultiNodeSpec(NodeUpMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index fcb1393f8a..cabaf21ab1 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -28,11 +28,11 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig { """)) } -class SunnyWeatherMultiJvmNode1 extends SunnyWeatherSpec -class SunnyWeatherMultiJvmNode2 extends SunnyWeatherSpec -class SunnyWeatherMultiJvmNode3 extends SunnyWeatherSpec -class SunnyWeatherMultiJvmNode4 extends SunnyWeatherSpec -class SunnyWeatherMultiJvmNode5 extends SunnyWeatherSpec +class SunnyWeatherMultiJvmNode1 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy +class SunnyWeatherMultiJvmNode2 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy +class SunnyWeatherMultiJvmNode3 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy +class SunnyWeatherMultiJvmNode4 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy +class SunnyWeatherMultiJvmNode5 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy abstract class SunnyWeatherSpec extends MultiNodeSpec(SunnyWeatherMultiJvmSpec) diff --git a/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala b/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala index 3245a15f97..f35bca381d 100644 --- a/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala +++ b/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala @@ -28,12 +28,12 @@ class FailureDetectorPuppet(system: ActorSystem, settings: ClusterSettings) exte private val connections = new ConcurrentHashMap[Address, Status] - def markAsDown(connection: Address): this.type = { + def markNodeAsUnavailable(connection: Address): this.type = { connections.put(connection, Down) this } - def markAsUp(connection: Address): this.type = { + def markNodeAsAvailable(connection: Address): this.type = { connections.put(connection, Up) this } From 2dcceb58ce688b9fec6174126bfe6d3b774d0f74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Mon, 11 Jun 2012 16:48:19 +0200 Subject: [PATCH 7/8] Split up all tests that are related to failure detection into two versions: Accrual FD and FD Puppet. Also moved all tests that are not failure detection tests to use FD Puppet. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- ...ientDowningNodeThatIsUnreachableSpec.scala | 14 +++- .../ClientDowningNodeThatIsUpSpec.scala | 15 +++- .../scala/akka/cluster/ConvergenceSpec.scala | 21 ++--- .../GossipingAccrualFailureDetectorSpec.scala | 10 ++- .../akka/cluster/JoinTwoClustersSpec.scala | 12 +-- ...aderDowningNodeThatIsUnreachableSpec.scala | 4 +- .../akka/cluster/LeaderElectionSpec.scala | 19 +++-- .../MembershipChangeListenerExitingSpec.scala | 6 +- .../MembershipChangeListenerJoinSpec.scala | 12 +-- .../MembershipChangeListenerLeavingSpec.scala | 6 +- .../MembershipChangeListenerSpec.scala | 77 ------------------- .../MembershipChangeListenerUpSpec.scala | 53 +++++++++---- .../scala/akka/cluster/NodeJoinSpec.scala | 12 +-- .../cluster/NodeLeavingAndExitingSpec.scala | 6 +- .../scala/akka/cluster/NodeLeavingSpec.scala | 14 ++-- .../akka/cluster/NodeMembershipSpec.scala | 6 +- .../scala/akka/cluster/NodeShutdownSpec.scala | 17 +++- .../scala/akka/cluster/NodeUpSpec.scala | 4 +- .../scala/akka/cluster/SunnyWeatherSpec.scala | 12 +-- 19 files changed, 145 insertions(+), 175 deletions(-) delete mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala index d1a9f756dd..343f0c7c17 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala @@ -18,10 +18,15 @@ object ClientDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } -class ClientDowningNodeThatIsUnreachableMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy -class ClientDowningNodeThatIsUnreachableMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy -class ClientDowningNodeThatIsUnreachableMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy -class ClientDowningNodeThatIsUnreachableMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy +class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy +class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy +class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy +class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy + +class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy +class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy +class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy +class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy abstract class ClientDowningNodeThatIsUnreachableSpec extends MultiNodeSpec(ClientDowningNodeThatIsUnreachableMultiJvmSpec) @@ -38,6 +43,7 @@ abstract class ClientDowningNodeThatIsUnreachableSpec runOn(first) { // kill 'third' node testConductor.shutdown(third, 0) + markNodeAsUnavailable(thirdAddress) // mark 'third' node as DOWN cluster.down(thirdAddress) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala index 687596745b..95eeefd982 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala @@ -18,10 +18,15 @@ object ClientDowningNodeThatIsUpMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } -class ClientDowningNodeThatIsUpMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy -class ClientDowningNodeThatIsUpMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy -class ClientDowningNodeThatIsUpMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy -class ClientDowningNodeThatIsUpMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy +class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec with FailureDetectorPuppetStrategy +class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec with FailureDetectorPuppetStrategy +class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec with FailureDetectorPuppetStrategy +class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec with FailureDetectorPuppetStrategy + +class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy +class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy +class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy +class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy abstract class ClientDowningNodeThatIsUpSpec extends MultiNodeSpec(ClientDowningNodeThatIsUpMultiJvmSpec) @@ -40,6 +45,8 @@ abstract class ClientDowningNodeThatIsUpSpec cluster.down(thirdAddress) testConductor.enter("down-third-node") + markNodeAsUnavailable(thirdAddress) + awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress)) cluster.latestGossip.members.exists(_.address == thirdAddress) must be(false) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala index df47e19bec..bdc0a1ae8b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala @@ -17,22 +17,24 @@ object ConvergenceMultiJvmSpec extends MultiNodeConfig { val fourth = role("fourth") commonConfig(debugConfig(on = false). - withFallback(ConfigFactory.parseString(""" - akka.cluster { - failure-detector.threshold = 4 - } - """)). + withFallback(ConfigFactory.parseString("akka.cluster.failure-detector.threshold = 4")). withFallback(MultiNodeClusterSpec.clusterConfig)) } -class ConvergenceMultiJvmNode1 extends ConvergenceSpec with AccrualFailureDetectorStrategy -class ConvergenceMultiJvmNode2 extends ConvergenceSpec with AccrualFailureDetectorStrategy -class ConvergenceMultiJvmNode3 extends ConvergenceSpec with AccrualFailureDetectorStrategy -class ConvergenceMultiJvmNode4 extends ConvergenceSpec with AccrualFailureDetectorStrategy +class ConvergenceWithFailureDetectorPuppetMultiJvmNode1 extends ConvergenceSpec with FailureDetectorPuppetStrategy +class ConvergenceWithFailureDetectorPuppetMultiJvmNode2 extends ConvergenceSpec with FailureDetectorPuppetStrategy +class ConvergenceWithFailureDetectorPuppetMultiJvmNode3 extends ConvergenceSpec with FailureDetectorPuppetStrategy +class ConvergenceWithFailureDetectorPuppetMultiJvmNode4 extends ConvergenceSpec with FailureDetectorPuppetStrategy + +class ConvergenceWithAccrualFailureDetectorMultiJvmNode1 extends ConvergenceSpec with AccrualFailureDetectorStrategy +class ConvergenceWithAccrualFailureDetectorMultiJvmNode2 extends ConvergenceSpec with AccrualFailureDetectorStrategy +class ConvergenceWithAccrualFailureDetectorMultiJvmNode3 extends ConvergenceSpec with AccrualFailureDetectorStrategy +class ConvergenceWithAccrualFailureDetectorMultiJvmNode4 extends ConvergenceSpec with AccrualFailureDetectorStrategy abstract class ConvergenceSpec extends MultiNodeSpec(ConvergenceMultiJvmSpec) with MultiNodeClusterSpec { + import ConvergenceMultiJvmSpec._ "A cluster of 3 members" must { @@ -54,6 +56,7 @@ abstract class ConvergenceSpec runOn(first) { // kill 'third' node testConductor.shutdown(third, 0) + markNodeAsUnavailable(thirdAddress) } runOn(first, second) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala index 63090b7a1f..b14c0d927c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala @@ -19,12 +19,14 @@ object GossipingAccrualFailureDetectorMultiJvmSpec extends MultiNodeConfig { withFallback(MultiNodeClusterSpec.clusterConfig)) } -class GossipingAccrualFailureDetectorMultiJvmNode1 extends GossipingAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy -class GossipingAccrualFailureDetectorMultiJvmNode2 extends GossipingAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy -class GossipingAccrualFailureDetectorMultiJvmNode3 extends GossipingAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy +class GossipingWithAccrualFailureDetectorMultiJvmNode1 extends GossipingAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy +class GossipingWithAccrualFailureDetectorMultiJvmNode2 extends GossipingAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy +class GossipingWithAccrualFailureDetectorMultiJvmNode3 extends GossipingAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy -abstract class GossipingAccrualFailureDetectorSpec extends MultiNodeSpec(GossipingAccrualFailureDetectorMultiJvmSpec) +abstract class GossipingAccrualFailureDetectorSpec + extends MultiNodeSpec(GossipingAccrualFailureDetectorMultiJvmSpec) with MultiNodeClusterSpec { + import GossipingAccrualFailureDetectorMultiJvmSpec._ lazy val firstAddress = node(first).address diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala index 2000e63253..4b64bb6e58 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala @@ -20,12 +20,12 @@ object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } -class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec with AccrualFailureDetectorStrategy -class JoinTwoClustersMultiJvmNode2 extends JoinTwoClustersSpec with AccrualFailureDetectorStrategy -class JoinTwoClustersMultiJvmNode3 extends JoinTwoClustersSpec with AccrualFailureDetectorStrategy -class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec with AccrualFailureDetectorStrategy -class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec with AccrualFailureDetectorStrategy -class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec with AccrualFailureDetectorStrategy +class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy +class JoinTwoClustersMultiJvmNode2 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy +class JoinTwoClustersMultiJvmNode3 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy +class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy +class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy +class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala index 7dcb6b20f6..5e2545394d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala @@ -17,9 +17,7 @@ object LeaderDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig { val fourth = role("fourth") commonConfig(debugConfig(on = false). - withFallback(ConfigFactory.parseString(""" - akka.cluster.auto-down = on - """)). + withFallback(ConfigFactory.parseString("akka.cluster.auto-down = on")). withFallback(MultiNodeClusterSpec.clusterConfig)) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala index f44b494917..e161206ba0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala @@ -19,11 +19,17 @@ object LeaderElectionMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } -class LeaderElectionMultiJvmNode1 extends LeaderElectionSpec with AccrualFailureDetectorStrategy -class LeaderElectionMultiJvmNode2 extends LeaderElectionSpec with AccrualFailureDetectorStrategy -class LeaderElectionMultiJvmNode3 extends LeaderElectionSpec with AccrualFailureDetectorStrategy -class LeaderElectionMultiJvmNode4 extends LeaderElectionSpec with AccrualFailureDetectorStrategy -class LeaderElectionMultiJvmNode5 extends LeaderElectionSpec with AccrualFailureDetectorStrategy +class LeaderElectionWithFailureDetectorPuppetMultiJvmNode1 extends LeaderElectionSpec with FailureDetectorPuppetStrategy +class LeaderElectionWithFailureDetectorPuppetMultiJvmNode2 extends LeaderElectionSpec with FailureDetectorPuppetStrategy +class LeaderElectionWithFailureDetectorPuppetMultiJvmNode3 extends LeaderElectionSpec with FailureDetectorPuppetStrategy +class LeaderElectionWithFailureDetectorPuppetMultiJvmNode4 extends LeaderElectionSpec with FailureDetectorPuppetStrategy +class LeaderElectionWithFailureDetectorPuppetMultiJvmNode5 extends LeaderElectionSpec with FailureDetectorPuppetStrategy + +class LeaderElectionWithAccrualFailureDetectorMultiJvmNode1 extends LeaderElectionSpec with AccrualFailureDetectorStrategy +class LeaderElectionWithAccrualFailureDetectorMultiJvmNode2 extends LeaderElectionSpec with AccrualFailureDetectorStrategy +class LeaderElectionWithAccrualFailureDetectorMultiJvmNode3 extends LeaderElectionSpec with AccrualFailureDetectorStrategy +class LeaderElectionWithAccrualFailureDetectorMultiJvmNode4 extends LeaderElectionSpec with AccrualFailureDetectorStrategy +class LeaderElectionWithAccrualFailureDetectorMultiJvmNode5 extends LeaderElectionSpec with AccrualFailureDetectorStrategy abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSpec) @@ -57,9 +63,11 @@ abstract class LeaderElectionSpec myself match { case `controller` ⇒ + val leaderAddress = node(leader).address testConductor.enter("before-shutdown") testConductor.shutdown(leader, 0) testConductor.enter("after-shutdown", "after-down", "completed") + markNodeAsUnavailable(leaderAddress) case `leader` ⇒ testConductor.enter("before-shutdown", "after-shutdown") @@ -71,6 +79,7 @@ abstract class LeaderElectionSpec // user marks the shutdown leader as DOWN cluster.down(leaderAddress) testConductor.enter("after-down", "completed") + markNodeAsUnavailable(leaderAddress) case _ if remainingRoles.contains(myself) ⇒ // remaining cluster nodes, not shutdown diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala index 7389a01ffc..d9b2c7b876 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala @@ -27,9 +27,9 @@ object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig { .withFallback(MultiNodeClusterSpec.clusterConfig))) } -class MembershipChangeListenerExitingMultiJvmNode1 extends MembershipChangeListenerExitingSpec with AccrualFailureDetectorStrategy -class MembershipChangeListenerExitingMultiJvmNode2 extends MembershipChangeListenerExitingSpec with AccrualFailureDetectorStrategy -class MembershipChangeListenerExitingMultiJvmNode3 extends MembershipChangeListenerExitingSpec with AccrualFailureDetectorStrategy +class MembershipChangeListenerExitingMultiJvmNode1 extends MembershipChangeListenerExitingSpec with FailureDetectorPuppetStrategy +class MembershipChangeListenerExitingMultiJvmNode2 extends MembershipChangeListenerExitingSpec with FailureDetectorPuppetStrategy +class MembershipChangeListenerExitingMultiJvmNode3 extends MembershipChangeListenerExitingSpec with FailureDetectorPuppetStrategy abstract class MembershipChangeListenerExitingSpec extends MultiNodeSpec(MembershipChangeListenerExitingMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala index 8a940375ef..2809ae820b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala @@ -17,16 +17,12 @@ object MembershipChangeListenerJoinMultiJvmSpec extends MultiNodeConfig { commonConfig( debugConfig(on = false) - .withFallback(ConfigFactory.parseString(""" - akka.cluster { - leader-actions-interval = 5 s # increase the leader action task interval to allow time checking for JOIN before leader moves it to UP - } - """) - .withFallback(MultiNodeClusterSpec.clusterConfig))) + .withFallback(ConfigFactory.parseString("akka.cluster.leader-actions-interval = 5 s") // increase the leader action task interval to allow time checking for JOIN before leader moves it to UP + .withFallback(MultiNodeClusterSpec.clusterConfig))) } -class MembershipChangeListenerJoinMultiJvmNode1 extends MembershipChangeListenerJoinSpec with AccrualFailureDetectorStrategy -class MembershipChangeListenerJoinMultiJvmNode2 extends MembershipChangeListenerJoinSpec with AccrualFailureDetectorStrategy +class MembershipChangeListenerJoinMultiJvmNode1 extends MembershipChangeListenerJoinSpec with FailureDetectorPuppetStrategy +class MembershipChangeListenerJoinMultiJvmNode2 extends MembershipChangeListenerJoinSpec with FailureDetectorPuppetStrategy abstract class MembershipChangeListenerJoinSpec extends MultiNodeSpec(MembershipChangeListenerJoinMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala index d7c79407a2..57cec4f389 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala @@ -24,9 +24,9 @@ object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig { .withFallback(MultiNodeClusterSpec.clusterConfig)) } -class MembershipChangeListenerLeavingMultiJvmNode1 extends MembershipChangeListenerLeavingSpec with AccrualFailureDetectorStrategy -class MembershipChangeListenerLeavingMultiJvmNode2 extends MembershipChangeListenerLeavingSpec with AccrualFailureDetectorStrategy -class MembershipChangeListenerLeavingMultiJvmNode3 extends MembershipChangeListenerLeavingSpec with AccrualFailureDetectorStrategy +class MembershipChangeListenerLeavingMultiJvmNode1 extends MembershipChangeListenerLeavingSpec with FailureDetectorPuppetStrategy +class MembershipChangeListenerLeavingMultiJvmNode2 extends MembershipChangeListenerLeavingSpec with FailureDetectorPuppetStrategy +class MembershipChangeListenerLeavingMultiJvmNode3 extends MembershipChangeListenerLeavingSpec with FailureDetectorPuppetStrategy abstract class MembershipChangeListenerLeavingSpec extends MultiNodeSpec(MembershipChangeListenerLeavingMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala deleted file mode 100644 index 914db94acb..0000000000 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.cluster - -import scala.collection.immutable.SortedSet -import com.typesafe.config.ConfigFactory -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.testkit._ - -object MembershipChangeListenerMultiJvmSpec extends MultiNodeConfig { - val first = role("first") - val second = role("second") - val third = role("third") - - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) -} - -class MembershipChangeListenerMultiJvmNode1 extends MembershipChangeListenerSpec with AccrualFailureDetectorStrategy -class MembershipChangeListenerMultiJvmNode2 extends MembershipChangeListenerSpec with AccrualFailureDetectorStrategy -class MembershipChangeListenerMultiJvmNode3 extends MembershipChangeListenerSpec with AccrualFailureDetectorStrategy - -abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChangeListenerMultiJvmSpec) - with MultiNodeClusterSpec { - import MembershipChangeListenerMultiJvmSpec._ - - lazy val firstAddress = node(first).address - lazy val secondAddress = node(second).address - - "A set of connected cluster systems" must { - - "(when two nodes) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in { - - awaitClusterUp(first) - - runOn(first, second) { - val latch = TestLatch() - cluster.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - if (members.size == 2 && members.forall(_.status == MemberStatus.Up)) - latch.countDown() - } - }) - testConductor.enter("listener-1-registered") - cluster.join(firstAddress) - latch.await - } - - runOn(third) { - testConductor.enter("listener-1-registered") - } - - testConductor.enter("after-1") - } - - "(when three nodes) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in { - - val latch = TestLatch() - cluster.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - if (members.size == 3 && members.forall(_.status == MemberStatus.Up)) - latch.countDown() - } - }) - testConductor.enter("listener-2-registered") - - runOn(third) { - cluster.join(firstAddress) - } - - latch.await - - testConductor.enter("after-2") - } - } -} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala index 4cd81cd0e7..c89bbe1f0a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala @@ -5,21 +5,21 @@ package akka.cluster import scala.collection.immutable.SortedSet import com.typesafe.config.ConfigFactory -import org.scalatest.BeforeAndAfter import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ -import akka.util.duration._ object MembershipChangeListenerUpMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") + val third = role("third") commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } -class MembershipChangeListenerUpMultiJvmNode1 extends MembershipChangeListenerUpSpec with AccrualFailureDetectorStrategy -class MembershipChangeListenerUpMultiJvmNode2 extends MembershipChangeListenerUpSpec with AccrualFailureDetectorStrategy +class MembershipChangeListenerUpMultiJvmNode1 extends MembershipChangeListenerUpSpec with FailureDetectorPuppetStrategy +class MembershipChangeListenerUpMultiJvmNode2 extends MembershipChangeListenerUpSpec with FailureDetectorPuppetStrategy +class MembershipChangeListenerUpMultiJvmNode3 extends MembershipChangeListenerUpSpec with FailureDetectorPuppetStrategy abstract class MembershipChangeListenerUpSpec extends MultiNodeSpec(MembershipChangeListenerUpMultiJvmSpec) @@ -30,29 +30,50 @@ abstract class MembershipChangeListenerUpSpec lazy val firstAddress = node(first).address lazy val secondAddress = node(second).address - "A registered MembershipChangeListener" must { - "be notified when new node is marked as UP by the leader" taggedAs LongRunningTest in { + "A set of connected cluster systems" must { - runOn(first) { - val upLatch = TestLatch() + "(when two nodes) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in { + + awaitClusterUp(first) + + runOn(first, second) { + val latch = TestLatch() cluster.registerListener(new MembershipChangeListener { def notify(members: SortedSet[Member]) { if (members.size == 2 && members.forall(_.status == MemberStatus.Up)) - upLatch.countDown() + latch.countDown() } }) - testConductor.enter("registered-listener") - - upLatch.await - awaitUpConvergence(numberOfMembers = 2) + testConductor.enter("listener-1-registered") + cluster.join(firstAddress) + latch.await } - runOn(second) { - testConductor.enter("registered-listener") + runOn(third) { + testConductor.enter("listener-1-registered") + } + + testConductor.enter("after-1") + } + + "(when three nodes) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in { + + val latch = TestLatch() + cluster.registerListener(new MembershipChangeListener { + def notify(members: SortedSet[Member]) { + if (members.size == 3 && members.forall(_.status == MemberStatus.Up)) + latch.countDown() + } + }) + testConductor.enter("listener-2-registered") + + runOn(third) { cluster.join(firstAddress) } - testConductor.enter("after") + latch.await + + testConductor.enter("after-2") } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala index 58ed162af7..6cf5fc220d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala @@ -16,16 +16,12 @@ object NodeJoinMultiJvmSpec extends MultiNodeConfig { commonConfig( debugConfig(on = false) - .withFallback(ConfigFactory.parseString(""" - akka.cluster { - leader-actions-interval = 5 s # increase the leader action task interval - } - """) - .withFallback(MultiNodeClusterSpec.clusterConfig))) + .withFallback(ConfigFactory.parseString("akka.cluster.leader-actions-interval = 5 s") // increase the leader action task interval + .withFallback(MultiNodeClusterSpec.clusterConfig))) } -class NodeJoinMultiJvmNode1 extends NodeJoinSpec with AccrualFailureDetectorStrategy -class NodeJoinMultiJvmNode2 extends NodeJoinSpec with AccrualFailureDetectorStrategy +class NodeJoinMultiJvmNode1 extends NodeJoinSpec with FailureDetectorPuppetStrategy +class NodeJoinMultiJvmNode2 extends NodeJoinSpec with FailureDetectorPuppetStrategy abstract class NodeJoinSpec extends MultiNodeSpec(NodeJoinMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala index bb32d8641f..ef285b5070 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala @@ -26,9 +26,9 @@ object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig { .withFallback(MultiNodeClusterSpec.clusterConfig))) } -class NodeLeavingAndExitingMultiJvmNode1 extends NodeLeavingAndExitingSpec with AccrualFailureDetectorStrategy -class NodeLeavingAndExitingMultiJvmNode2 extends NodeLeavingAndExitingSpec with AccrualFailureDetectorStrategy -class NodeLeavingAndExitingMultiJvmNode3 extends NodeLeavingAndExitingSpec with AccrualFailureDetectorStrategy +class NodeLeavingAndExitingMultiJvmNode1 extends NodeLeavingAndExitingSpec with FailureDetectorPuppetStrategy +class NodeLeavingAndExitingMultiJvmNode2 extends NodeLeavingAndExitingSpec with FailureDetectorPuppetStrategy +class NodeLeavingAndExitingMultiJvmNode3 extends NodeLeavingAndExitingSpec with FailureDetectorPuppetStrategy abstract class NodeLeavingAndExitingSpec extends MultiNodeSpec(NodeLeavingAndExitingMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala index eccba596f2..8f637d87e5 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala @@ -16,18 +16,18 @@ object NodeLeavingMultiJvmSpec extends MultiNodeConfig { commonConfig( debugConfig(on = false) - .withFallback(ConfigFactory.parseString(""" - akka.cluster.unreachable-nodes-reaper-frequency = 30 s - """)) + .withFallback(ConfigFactory.parseString("akka.cluster.unreachable-nodes-reaper-frequency = 30 s")) .withFallback(MultiNodeClusterSpec.clusterConfig)) } -class NodeLeavingMultiJvmNode1 extends NodeLeavingSpec with AccrualFailureDetectorStrategy -class NodeLeavingMultiJvmNode2 extends NodeLeavingSpec with AccrualFailureDetectorStrategy -class NodeLeavingMultiJvmNode3 extends NodeLeavingSpec with AccrualFailureDetectorStrategy +class NodeLeavingMultiJvmNode1 extends NodeLeavingSpec with FailureDetectorPuppetStrategy +class NodeLeavingMultiJvmNode2 extends NodeLeavingSpec with FailureDetectorPuppetStrategy +class NodeLeavingMultiJvmNode3 extends NodeLeavingSpec with FailureDetectorPuppetStrategy -abstract class NodeLeavingSpec extends MultiNodeSpec(NodeLeavingMultiJvmSpec) +abstract class NodeLeavingSpec + extends MultiNodeSpec(NodeLeavingMultiJvmSpec) with MultiNodeClusterSpec { + import NodeLeavingMultiJvmSpec._ lazy val firstAddress = node(first).address diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala index c7fa1569f2..fb0573f77f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala @@ -16,9 +16,9 @@ object NodeMembershipMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } -class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec with AccrualFailureDetectorStrategy -class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec with AccrualFailureDetectorStrategy -class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec with AccrualFailureDetectorStrategy +class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec with FailureDetectorPuppetStrategy +class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec with FailureDetectorPuppetStrategy +class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec with FailureDetectorPuppetStrategy abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala index 7417ae06d5..69b0a43a20 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala @@ -16,7 +16,7 @@ object NodeShutdownMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false). withFallback(ConfigFactory.parseString(""" akka.cluster { - auto-down = on + auto-down = on failure-detector.threshold = 4 } """)). @@ -24,10 +24,16 @@ object NodeShutdownMultiJvmSpec extends MultiNodeConfig { } -class NodeShutdownMultiJvmNode1 extends NodeShutdownSpec with AccrualFailureDetectorStrategy -class NodeShutdownMultiJvmNode2 extends NodeShutdownSpec with AccrualFailureDetectorStrategy +class NodeShutdownWithFailureDetectorPuppetMultiJvmNode1 extends NodeShutdownSpec with FailureDetectorPuppetStrategy +class NodeShutdownWithFailureDetectorPuppetMultiJvmNode2 extends NodeShutdownSpec with FailureDetectorPuppetStrategy + +class NodeShutdownWithAccrualFailureDetectorMultiJvmNode1 extends NodeShutdownSpec with AccrualFailureDetectorStrategy +class NodeShutdownWithAccrualFailureDetectorMultiJvmNode2 extends NodeShutdownSpec with AccrualFailureDetectorStrategy + +abstract class NodeShutdownSpec + extends MultiNodeSpec(NodeShutdownMultiJvmSpec) + with MultiNodeClusterSpec { -abstract class NodeShutdownSpec extends MultiNodeSpec(NodeShutdownMultiJvmSpec) with MultiNodeClusterSpec { import NodeShutdownMultiJvmSpec._ "A cluster of 2 nodes" must { @@ -44,6 +50,9 @@ abstract class NodeShutdownSpec extends MultiNodeSpec(NodeShutdownMultiJvmSpec) runOn(first) { val secondAddress = node(second).address testConductor.shutdown(second, 0) + + markNodeAsUnavailable(secondAddress) + awaitUpConvergence(numberOfMembers = 1, canNotBePartOfMemberRing = Seq(secondAddress), 30.seconds) cluster.isSingletonCluster must be(true) assertLeader(first) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala index 4a2342fca1..0fdc3c89b8 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala @@ -19,8 +19,8 @@ object NodeUpMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } -class NodeUpMultiJvmNode1 extends NodeUpSpec with AccrualFailureDetectorStrategy -class NodeUpMultiJvmNode2 extends NodeUpSpec with AccrualFailureDetectorStrategy +class NodeUpMultiJvmNode1 extends NodeUpSpec with FailureDetectorPuppetStrategy +class NodeUpMultiJvmNode2 extends NodeUpSpec with FailureDetectorPuppetStrategy abstract class NodeUpSpec extends MultiNodeSpec(NodeUpMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index cabaf21ab1..b8486841c6 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -21,18 +21,18 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig { commonConfig(ConfigFactory.parseString(""" akka.cluster { - gossip-interval = 400 ms + gossip-interval = 400 ms nr-of-deputy-nodes = 0 } akka.loglevel = INFO """)) } -class SunnyWeatherMultiJvmNode1 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy -class SunnyWeatherMultiJvmNode2 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy -class SunnyWeatherMultiJvmNode3 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy -class SunnyWeatherMultiJvmNode4 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy -class SunnyWeatherMultiJvmNode5 extends SunnyWeatherSpec with AccrualFailureDetectorStrategy +class SunnyWeatherMultiJvmNode1 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy +class SunnyWeatherMultiJvmNode2 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy +class SunnyWeatherMultiJvmNode3 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy +class SunnyWeatherMultiJvmNode4 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy +class SunnyWeatherMultiJvmNode5 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy abstract class SunnyWeatherSpec extends MultiNodeSpec(SunnyWeatherMultiJvmSpec) From 36b040cfab7164079af75f6cf5ebbd00a279a245 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 11 Jun 2012 18:11:02 +0200 Subject: [PATCH 8/8] Unbreaking master --- akka-cluster/src/main/resources/reference.conf | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 8c905d5b29..cdaf8c729c 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -33,7 +33,6 @@ akka { # how often should the node move nodes, marked as unreachable by the failure detector, out of the membership ring? unreachable-nodes-reaper-interval = 1s - # accrual failure detection config failure-detector { # defines the failure detector threshold @@ -43,6 +42,8 @@ akka { # actual crashes threshold = 8 + implementation-class = "" + max-sample-size = 1000 } }