diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 1e7c0e4c08..7fb930eaef 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -36,7 +36,6 @@ akka { # how often should the node move nodes, marked as unreachable by the failure detector, out of the membership ring? unreachable-nodes-reaper-interval = 1s - # accrual failure detection config failure-detector { # defines the failure detector threshold @@ -46,6 +45,8 @@ akka { # actual crashes threshold = 8 + implementation-class = "" + max-sample-size = 1000 } } diff --git a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala index 76c773f759..6632111f00 100644 --- a/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala @@ -4,7 +4,8 @@ package akka.cluster -import akka.actor.{ ActorSystem, Address } +import akka.actor.{ ActorSystem, Address, ExtendedActorSystem } +import akka.remote.RemoteActorRefProvider import akka.event.Logging import scala.collection.immutable.Map @@ -23,11 +24,20 @@ import java.util.concurrent.atomic.AtomicReference * Default threshold is 8, but can be configured in the Akka config. */ class AccrualFailureDetector( - system: ActorSystem, - address: Address, + val system: ActorSystem, val threshold: Int = 8, val maxSampleSize: Int = 1000, - val timeMachine: () ⇒ Long = System.currentTimeMillis) { + val timeMachine: () ⇒ Long = System.currentTimeMillis) extends FailureDetector { + + def this( + system: ActorSystem, + settings: ClusterSettings, + timeMachine: () ⇒ Long = System.currentTimeMillis) = + this( + system, + settings.FailureDetectorThreshold, + settings.FailureDetectorMaxSampleSize, + timeMachine) private final val PhiFactor = 1.0 / math.log(10.0) @@ -66,8 +76,7 @@ class AccrualFailureDetector( */ @tailrec final def heartbeat(connection: Address) { - // FIXME change to debug log level, when failure detector is stable - log.info("Node [{}] - Heartbeat from connection [{}] ", address, connection) + log.debug("Heartbeat from connection [{}] ", connection) val oldState = state.get val latestTimestamp = oldState.timestamps.get(connection) @@ -157,7 +166,7 @@ class AccrualFailureDetector( } // FIXME change to debug log level, when failure detector is stable - log.info("Node [{}] - Phi value [{}] and threshold [{}] for connection [{}] ", address, phi, threshold, connection) + log.info("Phi value [{}] and threshold [{}] for connection [{}] ", phi, threshold, connection) phi } @@ -165,8 +174,8 @@ class AccrualFailureDetector( * Removes the heartbeat management for a connection. */ @tailrec - final def remove(connection: Address) { - log.debug("Node [{}] - Remove connection [{}] ", address, connection) + final def remove(connection: Address): Unit = { + log.debug("Remove connection [{}] ", connection) val oldState = state.get if (oldState.failureStats.contains(connection)) { diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 8be6b21d25..46c6919cc1 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -317,7 +317,21 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider { override def lookup = Cluster - override def createExtension(system: ExtendedActorSystem): Cluster = new Cluster(system) + override def createExtension(system: ExtendedActorSystem): Cluster = { + val clusterSettings = new ClusterSettings(system.settings.config, system.name) + + val failureDetector = clusterSettings.FailureDetectorImplementationClass match { + case None ⇒ new AccrualFailureDetector(system, clusterSettings) + case Some(fqcn) ⇒ + system.dynamicAccess.createInstanceFor[FailureDetector]( + fqcn, Seq((classOf[ActorSystem], system), (classOf[ClusterSettings], clusterSettings))) match { + case Right(fd) ⇒ fd + case Left(e) ⇒ throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString) + } + } + + new Cluster(system, failureDetector) + } } /** @@ -360,7 +374,7 @@ trait ClusterNodeMBean { * if (Cluster(system).isLeader) { ... } * }}} */ -class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ +class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) extends Extension { clusterNode ⇒ /** * Represents the state for this Cluster. Implemented using optimistic lockless concurrency. @@ -382,9 +396,6 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ val selfAddress = remote.transport.address private val selfHeartbeat = Heartbeat(selfAddress) - val failureDetector = new AccrualFailureDetector( - system, selfAddress, FailureDetectorThreshold, FailureDetectorMaxSampleSize) - private val vclockNode = VectorClock.Node(selfAddress.toString) implicit private val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 90831db2e6..9a17f2a0eb 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -15,6 +15,10 @@ class ClusterSettings(val config: Config, val systemName: String) { import config._ val FailureDetectorThreshold = getInt("akka.cluster.failure-detector.threshold") val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size") + val FailureDetectorImplementationClass: Option[String] = getString("akka.cluster.failure-detector.implementation-class") match { + case "" ⇒ None + case fqcn ⇒ Some(fqcn) + } val NodeToJoin: Option[Address] = getString("akka.cluster.node-to-join") match { case "" ⇒ None case AddressFromURIString(addr) ⇒ Some(addr) diff --git a/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala b/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala new file mode 100644 index 0000000000..60af0a1c41 --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/FailureDetector.scala @@ -0,0 +1,28 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import akka.actor.Address + +/** + * Interface for Akka failure detectors. + */ +trait FailureDetector { + + /** + * Returns true if the connection is considered to be up and healthy and returns false otherwise. + */ + def isAvailable(connection: Address): Boolean + + /** + * Records a heartbeat for a connection. + */ + def heartbeat(connection: Address): Unit + + /** + * Removes the heartbeat management for a connection. + */ + def remove(connection: Address): Unit +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala index 6d4d09f7cb..343f0c7c17 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala @@ -18,12 +18,17 @@ object ClientDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } -class ClientDowningNodeThatIsUnreachableMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec -class ClientDowningNodeThatIsUnreachableMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec -class ClientDowningNodeThatIsUnreachableMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec -class ClientDowningNodeThatIsUnreachableMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec +class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy +class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy +class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy +class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy -class ClientDowningNodeThatIsUnreachableSpec +class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy +class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy +class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy +class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy + +abstract class ClientDowningNodeThatIsUnreachableSpec extends MultiNodeSpec(ClientDowningNodeThatIsUnreachableMultiJvmSpec) with MultiNodeClusterSpec { @@ -38,6 +43,7 @@ class ClientDowningNodeThatIsUnreachableSpec runOn(first) { // kill 'third' node testConductor.shutdown(third, 0) + markNodeAsUnavailable(thirdAddress) // mark 'third' node as DOWN cluster.down(thirdAddress) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala index db00438c9e..95eeefd982 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala @@ -18,12 +18,17 @@ object ClientDowningNodeThatIsUpMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } -class ClientDowningNodeThatIsUpMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec -class ClientDowningNodeThatIsUpMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec -class ClientDowningNodeThatIsUpMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec -class ClientDowningNodeThatIsUpMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec +class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec with FailureDetectorPuppetStrategy +class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec with FailureDetectorPuppetStrategy +class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec with FailureDetectorPuppetStrategy +class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec with FailureDetectorPuppetStrategy -class ClientDowningNodeThatIsUpSpec +class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy +class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy +class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy +class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy + +abstract class ClientDowningNodeThatIsUpSpec extends MultiNodeSpec(ClientDowningNodeThatIsUpMultiJvmSpec) with MultiNodeClusterSpec { @@ -40,6 +45,8 @@ class ClientDowningNodeThatIsUpSpec cluster.down(thirdAddress) testConductor.enter("down-third-node") + markNodeAsUnavailable(thirdAddress) + awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress)) cluster.latestGossip.members.exists(_.address == thirdAddress) must be(false) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala index 9963903b90..bdc0a1ae8b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ConvergenceSpec.scala @@ -17,27 +17,29 @@ object ConvergenceMultiJvmSpec extends MultiNodeConfig { val fourth = role("fourth") commonConfig(debugConfig(on = false). - withFallback(ConfigFactory.parseString(""" - akka.cluster { - failure-detector.threshold = 4 - } - """)). + withFallback(ConfigFactory.parseString("akka.cluster.failure-detector.threshold = 4")). withFallback(MultiNodeClusterSpec.clusterConfig)) } -class ConvergenceMultiJvmNode1 extends ConvergenceSpec -class ConvergenceMultiJvmNode2 extends ConvergenceSpec -class ConvergenceMultiJvmNode3 extends ConvergenceSpec -class ConvergenceMultiJvmNode4 extends ConvergenceSpec +class ConvergenceWithFailureDetectorPuppetMultiJvmNode1 extends ConvergenceSpec with FailureDetectorPuppetStrategy +class ConvergenceWithFailureDetectorPuppetMultiJvmNode2 extends ConvergenceSpec with FailureDetectorPuppetStrategy +class ConvergenceWithFailureDetectorPuppetMultiJvmNode3 extends ConvergenceSpec with FailureDetectorPuppetStrategy +class ConvergenceWithFailureDetectorPuppetMultiJvmNode4 extends ConvergenceSpec with FailureDetectorPuppetStrategy + +class ConvergenceWithAccrualFailureDetectorMultiJvmNode1 extends ConvergenceSpec with AccrualFailureDetectorStrategy +class ConvergenceWithAccrualFailureDetectorMultiJvmNode2 extends ConvergenceSpec with AccrualFailureDetectorStrategy +class ConvergenceWithAccrualFailureDetectorMultiJvmNode3 extends ConvergenceSpec with AccrualFailureDetectorStrategy +class ConvergenceWithAccrualFailureDetectorMultiJvmNode4 extends ConvergenceSpec with AccrualFailureDetectorStrategy abstract class ConvergenceSpec extends MultiNodeSpec(ConvergenceMultiJvmSpec) with MultiNodeClusterSpec { + import ConvergenceMultiJvmSpec._ "A cluster of 3 members" must { - "reach initial convergence" taggedAs LongRunningTest in { + "reach initial convergence" taggedAs LongRunningTest ignore { awaitClusterUp(first, second, third) runOn(fourth) { @@ -47,13 +49,14 @@ abstract class ConvergenceSpec testConductor.enter("after-1") } - "not reach convergence while any nodes are unreachable" taggedAs LongRunningTest in { + "not reach convergence while any nodes are unreachable" taggedAs LongRunningTest ignore { val thirdAddress = node(third).address testConductor.enter("before-shutdown") runOn(first) { // kill 'third' node testConductor.shutdown(third, 0) + markNodeAsUnavailable(thirdAddress) } runOn(first, second) { @@ -78,7 +81,7 @@ abstract class ConvergenceSpec testConductor.enter("after-2") } - "not move a new joining node to Up while there is no convergence" taggedAs LongRunningTest in { + "not move a new joining node to Up while there is no convergence" taggedAs LongRunningTest ignore { runOn(fourth) { // try to join cluster.join(node(first).address) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/FailureDetectorStrategy.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/FailureDetectorStrategy.scala new file mode 100644 index 0000000000..dcbb65d0f1 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/FailureDetectorStrategy.scala @@ -0,0 +1,61 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import akka.actor.Address +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ + +/** + * Base trait for all failure detector strategies. + */ +trait FailureDetectorStrategy { + + /** + * Get or create the FailureDetector to be used in the cluster node. + * To be defined by subclass. + */ + def failureDetector: FailureDetector + + /** + * Marks a node as available in the failure detector. + * To be defined by subclass. + */ + def markNodeAsAvailable(address: Address): Unit + + /** + * Marks a node as unavailable in the failure detector. + * To be defined by subclass. + */ + def markNodeAsUnavailable(address: Address): Unit +} + +/** + * Defines a FailureDetectorPuppet-based FailureDetectorStrategy. + */ +trait FailureDetectorPuppetStrategy extends FailureDetectorStrategy { self: MultiNodeSpec ⇒ + + /** + * The puppet instance. Separated from 'failureDetector' field so we don't have to cast when using the puppet specific methods. + */ + private val puppet = new FailureDetectorPuppet(system) + + override def failureDetector: FailureDetector = puppet + + override def markNodeAsAvailable(address: Address): Unit = puppet markNodeAsAvailable address + + override def markNodeAsUnavailable(address: Address): Unit = puppet markNodeAsUnavailable address +} + +/** + * Defines a AccrualFailureDetector-based FailureDetectorStrategy. + */ +trait AccrualFailureDetectorStrategy extends FailureDetectorStrategy { self: MultiNodeSpec ⇒ + + override val failureDetector: FailureDetector = new AccrualFailureDetector(system, new ClusterSettings(system.settings.config, system.name)) + + override def markNodeAsAvailable(address: Address): Unit = { /* no-op */ } + + override def markNodeAsUnavailable(address: Address): Unit = { /* no-op */ } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala index f75ca3b058..b14c0d927c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala @@ -19,12 +19,14 @@ object GossipingAccrualFailureDetectorMultiJvmSpec extends MultiNodeConfig { withFallback(MultiNodeClusterSpec.clusterConfig)) } -class GossipingAccrualFailureDetectorMultiJvmNode1 extends GossipingAccrualFailureDetectorSpec -class GossipingAccrualFailureDetectorMultiJvmNode2 extends GossipingAccrualFailureDetectorSpec -class GossipingAccrualFailureDetectorMultiJvmNode3 extends GossipingAccrualFailureDetectorSpec +class GossipingWithAccrualFailureDetectorMultiJvmNode1 extends GossipingAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy +class GossipingWithAccrualFailureDetectorMultiJvmNode2 extends GossipingAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy +class GossipingWithAccrualFailureDetectorMultiJvmNode3 extends GossipingAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy -abstract class GossipingAccrualFailureDetectorSpec extends MultiNodeSpec(GossipingAccrualFailureDetectorMultiJvmSpec) +abstract class GossipingAccrualFailureDetectorSpec + extends MultiNodeSpec(GossipingAccrualFailureDetectorMultiJvmSpec) with MultiNodeClusterSpec { + import GossipingAccrualFailureDetectorMultiJvmSpec._ lazy val firstAddress = node(first).address diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala index 2e27f4c3bd..f4ea161b2a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala @@ -20,12 +20,12 @@ object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = true).withFallback(MultiNodeClusterSpec.clusterConfig)) } -class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec -class JoinTwoClustersMultiJvmNode2 extends JoinTwoClustersSpec -class JoinTwoClustersMultiJvmNode3 extends JoinTwoClustersSpec -class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec -class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec -class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy +class JoinTwoClustersMultiJvmNode2 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy +class JoinTwoClustersMultiJvmNode3 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy +class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy +class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy +class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala index 616c412556..5e2545394d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala @@ -7,7 +7,7 @@ import com.typesafe.config.ConfigFactory import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ -import akka.actor.Address +import akka.actor._ import akka.util.duration._ object LeaderDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig { @@ -16,22 +16,22 @@ object LeaderDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig { val third = role("third") val fourth = role("fourth") - commonConfig(debugConfig(on = true). - withFallback(ConfigFactory.parseString(""" - akka.cluster { - auto-down = on - failure-detector.threshold = 4 - } - """)). + commonConfig(debugConfig(on = false). + withFallback(ConfigFactory.parseString("akka.cluster.auto-down = on")). withFallback(MultiNodeClusterSpec.clusterConfig)) } -class LeaderDowningNodeThatIsUnreachableMultiJvmNode1 extends LeaderDowningNodeThatIsUnreachableSpec -class LeaderDowningNodeThatIsUnreachableMultiJvmNode2 extends LeaderDowningNodeThatIsUnreachableSpec -class LeaderDowningNodeThatIsUnreachableMultiJvmNode3 extends LeaderDowningNodeThatIsUnreachableSpec -class LeaderDowningNodeThatIsUnreachableMultiJvmNode4 extends LeaderDowningNodeThatIsUnreachableSpec +class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode1 extends LeaderDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy +class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode2 extends LeaderDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy +class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode3 extends LeaderDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy +class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode4 extends LeaderDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy -class LeaderDowningNodeThatIsUnreachableSpec +class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode1 extends LeaderDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy +class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode2 extends LeaderDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy +class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode3 extends LeaderDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy +class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode4 extends LeaderDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy + +abstract class LeaderDowningNodeThatIsUnreachableSpec extends MultiNodeSpec(LeaderDowningNodeThatIsUnreachableMultiJvmSpec) with MultiNodeClusterSpec { @@ -40,14 +40,17 @@ class LeaderDowningNodeThatIsUnreachableSpec "The Leader in a 4 node cluster" must { "be able to DOWN a 'last' node that is UNREACHABLE" taggedAs LongRunningTest in { - val fourthAddress = node(fourth).address awaitClusterUp(first, second, third, fourth) + val fourthAddress = node(fourth).address runOn(first) { // kill 'fourth' node testConductor.shutdown(fourth, 0) testConductor.enter("down-fourth-node") + // mark the node as unreachable in the failure detector + markNodeAsUnavailable(fourthAddress) + // --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE --- awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(fourthAddress), 30.seconds) @@ -68,13 +71,16 @@ class LeaderDowningNodeThatIsUnreachableSpec "be able to DOWN a 'middle' node that is UNREACHABLE" taggedAs LongRunningTest in { val secondAddress = node(second).address - testConductor.enter("before-down-second-node") + testConductor.enter("before-down-second-node") runOn(first) { // kill 'second' node testConductor.shutdown(second, 0) testConductor.enter("down-second-node") + // mark the node as unreachable in the failure detector + markNodeAsUnavailable(secondAddress) + // --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE --- awaitUpConvergence(numberOfMembers = 2, canNotBePartOfMemberRing = Seq(secondAddress), 30.seconds) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala index 43f0fc19eb..e161206ba0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala @@ -19,11 +19,17 @@ object LeaderElectionMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } -class LeaderElectionMultiJvmNode1 extends LeaderElectionSpec -class LeaderElectionMultiJvmNode2 extends LeaderElectionSpec -class LeaderElectionMultiJvmNode3 extends LeaderElectionSpec -class LeaderElectionMultiJvmNode4 extends LeaderElectionSpec -class LeaderElectionMultiJvmNode5 extends LeaderElectionSpec +class LeaderElectionWithFailureDetectorPuppetMultiJvmNode1 extends LeaderElectionSpec with FailureDetectorPuppetStrategy +class LeaderElectionWithFailureDetectorPuppetMultiJvmNode2 extends LeaderElectionSpec with FailureDetectorPuppetStrategy +class LeaderElectionWithFailureDetectorPuppetMultiJvmNode3 extends LeaderElectionSpec with FailureDetectorPuppetStrategy +class LeaderElectionWithFailureDetectorPuppetMultiJvmNode4 extends LeaderElectionSpec with FailureDetectorPuppetStrategy +class LeaderElectionWithFailureDetectorPuppetMultiJvmNode5 extends LeaderElectionSpec with FailureDetectorPuppetStrategy + +class LeaderElectionWithAccrualFailureDetectorMultiJvmNode1 extends LeaderElectionSpec with AccrualFailureDetectorStrategy +class LeaderElectionWithAccrualFailureDetectorMultiJvmNode2 extends LeaderElectionSpec with AccrualFailureDetectorStrategy +class LeaderElectionWithAccrualFailureDetectorMultiJvmNode3 extends LeaderElectionSpec with AccrualFailureDetectorStrategy +class LeaderElectionWithAccrualFailureDetectorMultiJvmNode4 extends LeaderElectionSpec with AccrualFailureDetectorStrategy +class LeaderElectionWithAccrualFailureDetectorMultiJvmNode5 extends LeaderElectionSpec with AccrualFailureDetectorStrategy abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSpec) @@ -57,9 +63,11 @@ abstract class LeaderElectionSpec myself match { case `controller` ⇒ + val leaderAddress = node(leader).address testConductor.enter("before-shutdown") testConductor.shutdown(leader, 0) testConductor.enter("after-shutdown", "after-down", "completed") + markNodeAsUnavailable(leaderAddress) case `leader` ⇒ testConductor.enter("before-shutdown", "after-shutdown") @@ -71,6 +79,7 @@ abstract class LeaderElectionSpec // user marks the shutdown leader as DOWN cluster.down(leaderAddress) testConductor.enter("after-down", "completed") + markNodeAsUnavailable(leaderAddress) case _ if remainingRoles.contains(myself) ⇒ // remaining cluster nodes, not shutdown diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala index d76c3cf689..d9b2c7b876 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala @@ -27,9 +27,9 @@ object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig { .withFallback(MultiNodeClusterSpec.clusterConfig))) } -class MembershipChangeListenerExitingMultiJvmNode1 extends MembershipChangeListenerExitingSpec -class MembershipChangeListenerExitingMultiJvmNode2 extends MembershipChangeListenerExitingSpec -class MembershipChangeListenerExitingMultiJvmNode3 extends MembershipChangeListenerExitingSpec +class MembershipChangeListenerExitingMultiJvmNode1 extends MembershipChangeListenerExitingSpec with FailureDetectorPuppetStrategy +class MembershipChangeListenerExitingMultiJvmNode2 extends MembershipChangeListenerExitingSpec with FailureDetectorPuppetStrategy +class MembershipChangeListenerExitingMultiJvmNode3 extends MembershipChangeListenerExitingSpec with FailureDetectorPuppetStrategy abstract class MembershipChangeListenerExitingSpec extends MultiNodeSpec(MembershipChangeListenerExitingMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala index bdf8f7d44d..2809ae820b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala @@ -17,16 +17,12 @@ object MembershipChangeListenerJoinMultiJvmSpec extends MultiNodeConfig { commonConfig( debugConfig(on = false) - .withFallback(ConfigFactory.parseString(""" - akka.cluster { - leader-actions-interval = 5 s # increase the leader action task interval to allow time checking for JOIN before leader moves it to UP - } - """) - .withFallback(MultiNodeClusterSpec.clusterConfig))) + .withFallback(ConfigFactory.parseString("akka.cluster.leader-actions-interval = 5 s") // increase the leader action task interval to allow time checking for JOIN before leader moves it to UP + .withFallback(MultiNodeClusterSpec.clusterConfig))) } -class MembershipChangeListenerJoinMultiJvmNode1 extends MembershipChangeListenerJoinSpec -class MembershipChangeListenerJoinMultiJvmNode2 extends MembershipChangeListenerJoinSpec +class MembershipChangeListenerJoinMultiJvmNode1 extends MembershipChangeListenerJoinSpec with FailureDetectorPuppetStrategy +class MembershipChangeListenerJoinMultiJvmNode2 extends MembershipChangeListenerJoinSpec with FailureDetectorPuppetStrategy abstract class MembershipChangeListenerJoinSpec extends MultiNodeSpec(MembershipChangeListenerJoinMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala index 1ff11465bb..57cec4f389 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala @@ -24,9 +24,9 @@ object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig { .withFallback(MultiNodeClusterSpec.clusterConfig)) } -class MembershipChangeListenerLeavingMultiJvmNode1 extends MembershipChangeListenerLeavingSpec -class MembershipChangeListenerLeavingMultiJvmNode2 extends MembershipChangeListenerLeavingSpec -class MembershipChangeListenerLeavingMultiJvmNode3 extends MembershipChangeListenerLeavingSpec +class MembershipChangeListenerLeavingMultiJvmNode1 extends MembershipChangeListenerLeavingSpec with FailureDetectorPuppetStrategy +class MembershipChangeListenerLeavingMultiJvmNode2 extends MembershipChangeListenerLeavingSpec with FailureDetectorPuppetStrategy +class MembershipChangeListenerLeavingMultiJvmNode3 extends MembershipChangeListenerLeavingSpec with FailureDetectorPuppetStrategy abstract class MembershipChangeListenerLeavingSpec extends MultiNodeSpec(MembershipChangeListenerLeavingMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala deleted file mode 100644 index c48727b1cd..0000000000 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.cluster - -import scala.collection.immutable.SortedSet -import com.typesafe.config.ConfigFactory -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.testkit._ - -object MembershipChangeListenerMultiJvmSpec extends MultiNodeConfig { - val first = role("first") - val second = role("second") - val third = role("third") - - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) -} - -class MembershipChangeListenerMultiJvmNode1 extends MembershipChangeListenerSpec -class MembershipChangeListenerMultiJvmNode2 extends MembershipChangeListenerSpec -class MembershipChangeListenerMultiJvmNode3 extends MembershipChangeListenerSpec - -abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChangeListenerMultiJvmSpec) - with MultiNodeClusterSpec { - import MembershipChangeListenerMultiJvmSpec._ - - lazy val firstAddress = node(first).address - lazy val secondAddress = node(second).address - - "A set of connected cluster systems" must { - - "(when two nodes) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in { - - awaitClusterUp(first) - - runOn(first, second) { - val latch = TestLatch() - cluster.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - if (members.size == 2 && members.forall(_.status == MemberStatus.Up)) - latch.countDown() - } - }) - testConductor.enter("listener-1-registered") - cluster.join(firstAddress) - latch.await - } - - runOn(third) { - testConductor.enter("listener-1-registered") - } - - testConductor.enter("after-1") - } - - "(when three nodes) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in { - - val latch = TestLatch() - cluster.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - if (members.size == 3 && members.forall(_.status == MemberStatus.Up)) - latch.countDown() - } - }) - testConductor.enter("listener-2-registered") - - runOn(third) { - cluster.join(firstAddress) - } - - latch.await - - testConductor.enter("after-2") - } - } -} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala index 3e22dd456d..c89bbe1f0a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala @@ -5,21 +5,21 @@ package akka.cluster import scala.collection.immutable.SortedSet import com.typesafe.config.ConfigFactory -import org.scalatest.BeforeAndAfter import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ -import akka.util.duration._ object MembershipChangeListenerUpMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") + val third = role("third") commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } -class MembershipChangeListenerUpMultiJvmNode1 extends MembershipChangeListenerUpSpec -class MembershipChangeListenerUpMultiJvmNode2 extends MembershipChangeListenerUpSpec +class MembershipChangeListenerUpMultiJvmNode1 extends MembershipChangeListenerUpSpec with FailureDetectorPuppetStrategy +class MembershipChangeListenerUpMultiJvmNode2 extends MembershipChangeListenerUpSpec with FailureDetectorPuppetStrategy +class MembershipChangeListenerUpMultiJvmNode3 extends MembershipChangeListenerUpSpec with FailureDetectorPuppetStrategy abstract class MembershipChangeListenerUpSpec extends MultiNodeSpec(MembershipChangeListenerUpMultiJvmSpec) @@ -30,29 +30,50 @@ abstract class MembershipChangeListenerUpSpec lazy val firstAddress = node(first).address lazy val secondAddress = node(second).address - "A registered MembershipChangeListener" must { - "be notified when new node is marked as UP by the leader" taggedAs LongRunningTest in { + "A set of connected cluster systems" must { - runOn(first) { - val upLatch = TestLatch() + "(when two nodes) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in { + + awaitClusterUp(first) + + runOn(first, second) { + val latch = TestLatch() cluster.registerListener(new MembershipChangeListener { def notify(members: SortedSet[Member]) { if (members.size == 2 && members.forall(_.status == MemberStatus.Up)) - upLatch.countDown() + latch.countDown() } }) - testConductor.enter("registered-listener") - - upLatch.await - awaitUpConvergence(numberOfMembers = 2) + testConductor.enter("listener-1-registered") + cluster.join(firstAddress) + latch.await } - runOn(second) { - testConductor.enter("registered-listener") + runOn(third) { + testConductor.enter("listener-1-registered") + } + + testConductor.enter("after-1") + } + + "(when three nodes) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in { + + val latch = TestLatch() + cluster.registerListener(new MembershipChangeListener { + def notify(members: SortedSet[Member]) { + if (members.size == 3 && members.forall(_.status == MemberStatus.Up)) + latch.countDown() + } + }) + testConductor.enter("listener-2-registered") + + runOn(third) { cluster.join(firstAddress) } - testConductor.enter("after") + latch.await + + testConductor.enter("after-2") } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 729923699d..b4532f7efc 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -5,7 +5,7 @@ package akka.cluster import com.typesafe.config.Config import com.typesafe.config.ConfigFactory -import akka.actor.Address +import akka.actor.{Address, ExtendedActorSystem} import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeSpec import akka.testkit._ @@ -29,14 +29,19 @@ object MultiNodeClusterSpec { """) } -trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒ +trait MultiNodeClusterSpec extends FailureDetectorStrategy { self: MultiNodeSpec ⇒ override def initialParticipants = roles.size /** - * Get or create a cluster node using 'Cluster(system)' extension. + * The cluster node instance. Needs to be lazily created. */ - def cluster: Cluster = Cluster(system) + private lazy val clusterNode = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector) + + /** + * Get the cluster node to use. + */ + def cluster: Cluster = clusterNode /** * Use this method instead of 'cluster.self' @@ -49,9 +54,7 @@ trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒ * nodes (roles). First node will be started first * and others will join the first. */ - def startCluster(roles: RoleName*): Unit = { - awaitStartCluster(false, roles.toSeq) - } + def startCluster(roles: RoleName*): Unit = awaitStartCluster(false, roles.toSeq) /** * Initialize the cluster of the specified member diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala index 066e86aae6..6cf5fc220d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala @@ -16,16 +16,12 @@ object NodeJoinMultiJvmSpec extends MultiNodeConfig { commonConfig( debugConfig(on = false) - .withFallback(ConfigFactory.parseString(""" - akka.cluster { - leader-actions-interval = 5 s # increase the leader action task interval - } - """) - .withFallback(MultiNodeClusterSpec.clusterConfig))) + .withFallback(ConfigFactory.parseString("akka.cluster.leader-actions-interval = 5 s") // increase the leader action task interval + .withFallback(MultiNodeClusterSpec.clusterConfig))) } -class NodeJoinMultiJvmNode1 extends NodeJoinSpec -class NodeJoinMultiJvmNode2 extends NodeJoinSpec +class NodeJoinMultiJvmNode1 extends NodeJoinSpec with FailureDetectorPuppetStrategy +class NodeJoinMultiJvmNode2 extends NodeJoinSpec with FailureDetectorPuppetStrategy abstract class NodeJoinSpec extends MultiNodeSpec(NodeJoinMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala index d85016c714..01e5f8aa74 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala @@ -18,9 +18,9 @@ object NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec extends MultiNodeConfig commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } -class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode1 extends NodeLeavingAndExitingAndBeingRemovedSpec -class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode2 extends NodeLeavingAndExitingAndBeingRemovedSpec -class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode3 extends NodeLeavingAndExitingAndBeingRemovedSpec +class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode1 extends NodeLeavingAndExitingAndBeingRemovedSpec with AccrualFailureDetectorStrategy +class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode2 extends NodeLeavingAndExitingAndBeingRemovedSpec with AccrualFailureDetectorStrategy +class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode3 extends NodeLeavingAndExitingAndBeingRemovedSpec with AccrualFailureDetectorStrategy abstract class NodeLeavingAndExitingAndBeingRemovedSpec extends MultiNodeSpec(NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala index 2909362fa7..6378a74040 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala @@ -26,9 +26,9 @@ object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig { .withFallback(MultiNodeClusterSpec.clusterConfig))) } -class NodeLeavingAndExitingMultiJvmNode1 extends NodeLeavingAndExitingSpec -class NodeLeavingAndExitingMultiJvmNode2 extends NodeLeavingAndExitingSpec -class NodeLeavingAndExitingMultiJvmNode3 extends NodeLeavingAndExitingSpec +class NodeLeavingAndExitingMultiJvmNode1 extends NodeLeavingAndExitingSpec with FailureDetectorPuppetStrategy +class NodeLeavingAndExitingMultiJvmNode2 extends NodeLeavingAndExitingSpec with FailureDetectorPuppetStrategy +class NodeLeavingAndExitingMultiJvmNode3 extends NodeLeavingAndExitingSpec with FailureDetectorPuppetStrategy abstract class NodeLeavingAndExitingSpec extends MultiNodeSpec(NodeLeavingAndExitingMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala index 27bc36a3bf..8ea21e9380 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala @@ -16,18 +16,18 @@ object NodeLeavingMultiJvmSpec extends MultiNodeConfig { commonConfig( debugConfig(on = false) - .withFallback(ConfigFactory.parseString(""" - akka.cluster.unreachable-nodes-reaper-frequency = 30 s - """)) + .withFallback(ConfigFactory.parseString("akka.cluster.unreachable-nodes-reaper-frequency = 30 s")) .withFallback(MultiNodeClusterSpec.clusterConfig)) } -class NodeLeavingMultiJvmNode1 extends NodeLeavingSpec -class NodeLeavingMultiJvmNode2 extends NodeLeavingSpec -class NodeLeavingMultiJvmNode3 extends NodeLeavingSpec +class NodeLeavingMultiJvmNode1 extends NodeLeavingSpec with FailureDetectorPuppetStrategy +class NodeLeavingMultiJvmNode2 extends NodeLeavingSpec with FailureDetectorPuppetStrategy +class NodeLeavingMultiJvmNode3 extends NodeLeavingSpec with FailureDetectorPuppetStrategy -abstract class NodeLeavingSpec extends MultiNodeSpec(NodeLeavingMultiJvmSpec) +abstract class NodeLeavingSpec + extends MultiNodeSpec(NodeLeavingMultiJvmSpec) with MultiNodeClusterSpec { + import NodeLeavingMultiJvmSpec._ lazy val firstAddress = node(first).address diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala index ef65cefd0f..fb0573f77f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala @@ -16,9 +16,9 @@ object NodeMembershipMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } -class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec -class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec -class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec +class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec with FailureDetectorPuppetStrategy +class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec with FailureDetectorPuppetStrategy +class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec with FailureDetectorPuppetStrategy abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala index 6cb8bf9e07..0fdc3c89b8 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala @@ -19,8 +19,8 @@ object NodeUpMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } -class NodeUpMultiJvmNode1 extends NodeUpSpec -class NodeUpMultiJvmNode2 extends NodeUpSpec +class NodeUpMultiJvmNode1 extends NodeUpSpec with FailureDetectorPuppetStrategy +class NodeUpMultiJvmNode2 extends NodeUpSpec with FailureDetectorPuppetStrategy abstract class NodeUpSpec extends MultiNodeSpec(NodeUpMultiJvmSpec) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala index 68d20012f5..cada29e210 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala @@ -16,7 +16,7 @@ object SingletonClusterMultiJvmSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false). withFallback(ConfigFactory.parseString(""" akka.cluster { - auto-down = on + auto-down = on failure-detector.threshold = 4 } """)). @@ -24,10 +24,16 @@ object SingletonClusterMultiJvmSpec extends MultiNodeConfig { } -class SingletonClusterMultiJvmNode1 extends SingletonClusterSpec -class SingletonClusterMultiJvmNode2 extends SingletonClusterSpec +class SingletonClusterWithFailureDetectorPuppetMultiJvmNode1 extends SingletonClusterSpec with FailureDetectorPuppetStrategy +class SingletonClusterWithFailureDetectorPuppetMultiJvmNode2 extends SingletonClusterSpec with FailureDetectorPuppetStrategy + +class SingletonClusterWithAccrualFailureDetectorMultiJvmNode1 extends SingletonClusterSpec with AccrualFailureDetectorStrategy +class SingletonClusterWithAccrualFailureDetectorMultiJvmNode2 extends SingletonClusterSpec with AccrualFailureDetectorStrategy + +abstract class SingletonClusterSpec + extends MultiNodeSpec(SingletonClusterMultiJvmSpec) + with MultiNodeClusterSpec { -abstract class SingletonClusterSpec extends MultiNodeSpec(SingletonClusterMultiJvmSpec) with MultiNodeClusterSpec { import SingletonClusterMultiJvmSpec._ "A cluster of 2 nodes" must { @@ -44,6 +50,9 @@ abstract class SingletonClusterSpec extends MultiNodeSpec(SingletonClusterMultiJ runOn(first) { val secondAddress = node(second).address testConductor.shutdown(second, 0) + + markNodeAsUnavailable(secondAddress) + awaitUpConvergence(numberOfMembers = 1, canNotBePartOfMemberRing = Seq(secondAddress), 30.seconds) cluster.isSingletonCluster must be(true) assertLeader(first) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala index fcb1393f8a..b8486841c6 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SunnyWeatherSpec.scala @@ -21,18 +21,18 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig { commonConfig(ConfigFactory.parseString(""" akka.cluster { - gossip-interval = 400 ms + gossip-interval = 400 ms nr-of-deputy-nodes = 0 } akka.loglevel = INFO """)) } -class SunnyWeatherMultiJvmNode1 extends SunnyWeatherSpec -class SunnyWeatherMultiJvmNode2 extends SunnyWeatherSpec -class SunnyWeatherMultiJvmNode3 extends SunnyWeatherSpec -class SunnyWeatherMultiJvmNode4 extends SunnyWeatherSpec -class SunnyWeatherMultiJvmNode5 extends SunnyWeatherSpec +class SunnyWeatherMultiJvmNode1 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy +class SunnyWeatherMultiJvmNode2 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy +class SunnyWeatherMultiJvmNode3 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy +class SunnyWeatherMultiJvmNode4 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy +class SunnyWeatherMultiJvmNode5 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy abstract class SunnyWeatherSpec extends MultiNodeSpec(SunnyWeatherMultiJvmSpec) diff --git a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala index 1cf62daf1c..bd4d5d2c52 100644 --- a/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/AccrualFailureDetectorSpec.scala @@ -28,7 +28,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" } "return phi value of 0.0 on startup for each address, when no heartbeats" in { - val fd = new AccrualFailureDetector(system, conn) + val fd = new AccrualFailureDetector(system) fd.phi(conn) must be(0.0) fd.phi(conn2) must be(0.0) } @@ -36,7 +36,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "return phi based on guess when only one heartbeat" in { // 1 second ticks val timeInterval = Vector.fill(30)(1000L) - val fd = new AccrualFailureDetector(system, conn, + val fd = new AccrualFailureDetector(system, timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) @@ -52,7 +52,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "return phi value using first interval after second heartbeat" in { val timeInterval = List[Long](0, 100, 100, 100) - val fd = new AccrualFailureDetector(system, conn, + val fd = new AccrualFailureDetector(system, timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) @@ -63,7 +63,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "mark node as available after a series of successful heartbeats" in { val timeInterval = List[Long](0, 1000, 100, 100) - val fd = new AccrualFailureDetector(system, conn, + val fd = new AccrualFailureDetector(system, timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) @@ -75,7 +75,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "mark node as dead after explicit removal of connection" in { val timeInterval = List[Long](0, 1000, 100, 100, 100) - val fd = new AccrualFailureDetector(system, conn, + val fd = new AccrualFailureDetector(system, timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) @@ -89,7 +89,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "mark node as available after explicit removal of connection and receiving heartbeat again" in { val timeInterval = List[Long](0, 1000, 100, 1100, 1100, 1100, 1100, 1100, 100) - val fd = new AccrualFailureDetector(system, conn, + val fd = new AccrualFailureDetector(system, timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) //0 @@ -114,7 +114,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "mark node as dead if heartbeat are missed" in { val timeInterval = List[Long](0, 1000, 100, 100, 5000) val ft = fakeTimeGenerator(timeInterval) - val fd = new AccrualFailureDetector(system, conn, threshold = 3, + val fd = new AccrualFailureDetector(system, threshold = 3, timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) //0 @@ -127,7 +127,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "mark node as available if it starts heartbeat again after being marked dead due to detection of failure" in { val timeInterval = List[Long](0, 1000, 100, 1100, 5000, 100, 1000, 100, 100) - val fd = new AccrualFailureDetector(system, conn, threshold = 3, + val fd = new AccrualFailureDetector(system, threshold = 3, timeMachine = fakeTimeGenerator(timeInterval)) fd.heartbeat(conn) //0 @@ -144,7 +144,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec(""" "use maxSampleSize heartbeats" in { val timeInterval = List[Long](0, 100, 100, 100, 100, 600, 1000, 1000, 1000, 1000, 1000) - val fd = new AccrualFailureDetector(system, conn, maxSampleSize = 3, + val fd = new AccrualFailureDetector(system, maxSampleSize = 3, timeMachine = fakeTimeGenerator(timeInterval)) // 100 ms interval diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 3a96451466..6c9023d410 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -18,6 +18,7 @@ class ClusterConfigSpec extends AkkaSpec { import settings._ FailureDetectorThreshold must be(8) FailureDetectorMaxSampleSize must be(1000) + FailureDetectorImplementationClass must be(None) NodeToJoin must be(None) PeriodicTasksInitialDelay must be(1 seconds) GossipInterval must be(1 second) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 9b1a9706af..112da9d0c0 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -33,7 +33,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { val deterministicRandom = new AtomicInteger - val cluster = new Cluster(system.asInstanceOf[ExtendedActorSystem]) { + val cluster = new Cluster(system.asInstanceOf[ExtendedActorSystem], new FailureDetectorPuppet(system)) { override def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] = { if (addresses.isEmpty) None @@ -67,9 +67,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter { @volatile var _unavailable: Set[Address] = Set.empty - override val failureDetector = new AccrualFailureDetector( - system, selfAddress, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize) { - + override val failureDetector = new FailureDetectorPuppet(system) { override def isAvailable(connection: Address): Boolean = { if (_unavailable.contains(connection)) false else super.isAvailable(connection) diff --git a/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala b/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala new file mode 100644 index 0000000000..f35bca381d --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/FailureDetectorPuppet.scala @@ -0,0 +1,60 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.cluster + +import akka.actor.{ Address, ActorSystem } +import akka.event.{ Logging, LogSource } + +/** + * User controllable "puppet" failure detector. + */ +class FailureDetectorPuppet(system: ActorSystem, settings: ClusterSettings) extends FailureDetector { + import java.util.concurrent.ConcurrentHashMap + + def this(system: ActorSystem) = this(system, new ClusterSettings(system.settings.config, system.name)) + + trait Status + object Up extends Status + object Down extends Status + + implicit private val logSource: LogSource[AnyRef] = new LogSource[AnyRef] { + def genString(o: AnyRef): String = o.getClass.getName + override def getClazz(o: AnyRef): Class[_] = o.getClass + } + + private val log = Logging(system, this) + + private val connections = new ConcurrentHashMap[Address, Status] + + def markNodeAsUnavailable(connection: Address): this.type = { + connections.put(connection, Down) + this + } + + def markNodeAsAvailable(connection: Address): this.type = { + connections.put(connection, Up) + this + } + + def isAvailable(connection: Address): Boolean = connections.get(connection) match { + case null ⇒ + log.debug("Adding cluster node [{}]", connection) + connections.put(connection, Up) + true + case Up ⇒ + log.debug("isAvailable: Cluster node IS NOT available [{}]", connection) + true + case Down ⇒ + log.debug("isAvailable: Cluster node IS available [{}]", connection) + false + } + + def heartbeat(connection: Address): Unit = log.debug("Heart beat from cluster node[{}]", connection) + + def remove(connection: Address): Unit = { + log.debug("Removing cluster node [{}]", connection) + connections.remove(connection) + } +}