Merge branch 'master' into wip-2214-heartbeats-patriknw

Conflicts:
	akka-cluster/src/main/scala/akka/cluster/AccrualFailureDetector.scala
	akka-cluster/src/main/scala/akka/cluster/Cluster.scala
This commit is contained in:
Patrik Nordwall 2012-06-11 22:27:08 +02:00
commit a7d2be10eb
31 changed files with 384 additions and 230 deletions

View file

@ -36,7 +36,6 @@ akka {
# how often should the node move nodes, marked as unreachable by the failure detector, out of the membership ring? # how often should the node move nodes, marked as unreachable by the failure detector, out of the membership ring?
unreachable-nodes-reaper-interval = 1s unreachable-nodes-reaper-interval = 1s
# accrual failure detection config
failure-detector { failure-detector {
# defines the failure detector threshold # defines the failure detector threshold
@ -46,6 +45,8 @@ akka {
# actual crashes # actual crashes
threshold = 8 threshold = 8
implementation-class = ""
max-sample-size = 1000 max-sample-size = 1000
} }
} }

View file

@ -4,7 +4,8 @@
package akka.cluster package akka.cluster
import akka.actor.{ ActorSystem, Address } import akka.actor.{ ActorSystem, Address, ExtendedActorSystem }
import akka.remote.RemoteActorRefProvider
import akka.event.Logging import akka.event.Logging
import scala.collection.immutable.Map import scala.collection.immutable.Map
@ -23,11 +24,20 @@ import java.util.concurrent.atomic.AtomicReference
* Default threshold is 8, but can be configured in the Akka config. * Default threshold is 8, but can be configured in the Akka config.
*/ */
class AccrualFailureDetector( class AccrualFailureDetector(
system: ActorSystem, val system: ActorSystem,
address: Address,
val threshold: Int = 8, val threshold: Int = 8,
val maxSampleSize: Int = 1000, val maxSampleSize: Int = 1000,
val timeMachine: () Long = System.currentTimeMillis) { val timeMachine: () Long = System.currentTimeMillis) extends FailureDetector {
def this(
system: ActorSystem,
settings: ClusterSettings,
timeMachine: () Long = System.currentTimeMillis) =
this(
system,
settings.FailureDetectorThreshold,
settings.FailureDetectorMaxSampleSize,
timeMachine)
private final val PhiFactor = 1.0 / math.log(10.0) private final val PhiFactor = 1.0 / math.log(10.0)
@ -66,8 +76,7 @@ class AccrualFailureDetector(
*/ */
@tailrec @tailrec
final def heartbeat(connection: Address) { final def heartbeat(connection: Address) {
// FIXME change to debug log level, when failure detector is stable log.debug("Heartbeat from connection [{}] ", connection)
log.info("Node [{}] - Heartbeat from connection [{}] ", address, connection)
val oldState = state.get val oldState = state.get
val latestTimestamp = oldState.timestamps.get(connection) val latestTimestamp = oldState.timestamps.get(connection)
@ -157,7 +166,7 @@ class AccrualFailureDetector(
} }
// FIXME change to debug log level, when failure detector is stable // FIXME change to debug log level, when failure detector is stable
log.info("Node [{}] - Phi value [{}] and threshold [{}] for connection [{}] ", address, phi, threshold, connection) log.info("Phi value [{}] and threshold [{}] for connection [{}] ", phi, threshold, connection)
phi phi
} }
@ -165,8 +174,8 @@ class AccrualFailureDetector(
* Removes the heartbeat management for a connection. * Removes the heartbeat management for a connection.
*/ */
@tailrec @tailrec
final def remove(connection: Address) { final def remove(connection: Address): Unit = {
log.debug("Node [{}] - Remove connection [{}] ", address, connection) log.debug("Remove connection [{}] ", connection)
val oldState = state.get val oldState = state.get
if (oldState.failureStats.contains(connection)) { if (oldState.failureStats.contains(connection)) {

View file

@ -317,7 +317,21 @@ object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider {
override def lookup = Cluster override def lookup = Cluster
override def createExtension(system: ExtendedActorSystem): Cluster = new Cluster(system) override def createExtension(system: ExtendedActorSystem): Cluster = {
val clusterSettings = new ClusterSettings(system.settings.config, system.name)
val failureDetector = clusterSettings.FailureDetectorImplementationClass match {
case None new AccrualFailureDetector(system, clusterSettings)
case Some(fqcn)
system.dynamicAccess.createInstanceFor[FailureDetector](
fqcn, Seq((classOf[ActorSystem], system), (classOf[ClusterSettings], clusterSettings))) match {
case Right(fd) fd
case Left(e) throw new ConfigurationException("Could not create custom failure detector [" + fqcn + "] due to:" + e.toString)
}
}
new Cluster(system, failureDetector)
}
} }
/** /**
@ -360,7 +374,7 @@ trait ClusterNodeMBean {
* if (Cluster(system).isLeader) { ... } * if (Cluster(system).isLeader) { ... }
* }}} * }}}
*/ */
class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector) extends Extension { clusterNode
/** /**
* Represents the state for this Cluster. Implemented using optimistic lockless concurrency. * Represents the state for this Cluster. Implemented using optimistic lockless concurrency.
@ -382,9 +396,6 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
val selfAddress = remote.transport.address val selfAddress = remote.transport.address
private val selfHeartbeat = Heartbeat(selfAddress) private val selfHeartbeat = Heartbeat(selfAddress)
val failureDetector = new AccrualFailureDetector(
system, selfAddress, FailureDetectorThreshold, FailureDetectorMaxSampleSize)
private val vclockNode = VectorClock.Node(selfAddress.toString) private val vclockNode = VectorClock.Node(selfAddress.toString)
implicit private val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout) implicit private val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout)

View file

@ -15,6 +15,10 @@ class ClusterSettings(val config: Config, val systemName: String) {
import config._ import config._
val FailureDetectorThreshold = getInt("akka.cluster.failure-detector.threshold") val FailureDetectorThreshold = getInt("akka.cluster.failure-detector.threshold")
val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size") val FailureDetectorMaxSampleSize = getInt("akka.cluster.failure-detector.max-sample-size")
val FailureDetectorImplementationClass: Option[String] = getString("akka.cluster.failure-detector.implementation-class") match {
case "" None
case fqcn Some(fqcn)
}
val NodeToJoin: Option[Address] = getString("akka.cluster.node-to-join") match { val NodeToJoin: Option[Address] = getString("akka.cluster.node-to-join") match {
case "" None case "" None
case AddressFromURIString(addr) Some(addr) case AddressFromURIString(addr) Some(addr)

View file

@ -0,0 +1,28 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import akka.actor.Address
/**
* Interface for Akka failure detectors.
*/
trait FailureDetector {
/**
* Returns true if the connection is considered to be up and healthy and returns false otherwise.
*/
def isAvailable(connection: Address): Boolean
/**
* Records a heartbeat for a connection.
*/
def heartbeat(connection: Address): Unit
/**
* Removes the heartbeat management for a connection.
*/
def remove(connection: Address): Unit
}

View file

@ -18,12 +18,17 @@ object ClientDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig {
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
} }
class ClientDowningNodeThatIsUnreachableMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy
class ClientDowningNodeThatIsUnreachableMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy
class ClientDowningNodeThatIsUnreachableMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy
class ClientDowningNodeThatIsUnreachableMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec class ClientDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy
class ClientDowningNodeThatIsUnreachableSpec class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy
class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy
class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy
class ClientDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy
abstract class ClientDowningNodeThatIsUnreachableSpec
extends MultiNodeSpec(ClientDowningNodeThatIsUnreachableMultiJvmSpec) extends MultiNodeSpec(ClientDowningNodeThatIsUnreachableMultiJvmSpec)
with MultiNodeClusterSpec { with MultiNodeClusterSpec {
@ -38,6 +43,7 @@ class ClientDowningNodeThatIsUnreachableSpec
runOn(first) { runOn(first) {
// kill 'third' node // kill 'third' node
testConductor.shutdown(third, 0) testConductor.shutdown(third, 0)
markNodeAsUnavailable(thirdAddress)
// mark 'third' node as DOWN // mark 'third' node as DOWN
cluster.down(thirdAddress) cluster.down(thirdAddress)

View file

@ -18,12 +18,17 @@ object ClientDowningNodeThatIsUpMultiJvmSpec extends MultiNodeConfig {
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
} }
class ClientDowningNodeThatIsUpMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec with FailureDetectorPuppetStrategy
class ClientDowningNodeThatIsUpMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec with FailureDetectorPuppetStrategy
class ClientDowningNodeThatIsUpMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec with FailureDetectorPuppetStrategy
class ClientDowningNodeThatIsUpMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec class ClientDowningNodeThatIsUpWithFailureDetectorPuppetMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec with FailureDetectorPuppetStrategy
class ClientDowningNodeThatIsUpSpec class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy
class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy
class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy
class ClientDowningNodeThatIsUpWithAccrualFailureDetectorMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec with AccrualFailureDetectorStrategy
abstract class ClientDowningNodeThatIsUpSpec
extends MultiNodeSpec(ClientDowningNodeThatIsUpMultiJvmSpec) extends MultiNodeSpec(ClientDowningNodeThatIsUpMultiJvmSpec)
with MultiNodeClusterSpec { with MultiNodeClusterSpec {
@ -40,6 +45,8 @@ class ClientDowningNodeThatIsUpSpec
cluster.down(thirdAddress) cluster.down(thirdAddress)
testConductor.enter("down-third-node") testConductor.enter("down-third-node")
markNodeAsUnavailable(thirdAddress)
awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress)) awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(thirdAddress))
cluster.latestGossip.members.exists(_.address == thirdAddress) must be(false) cluster.latestGossip.members.exists(_.address == thirdAddress) must be(false)
} }

View file

@ -17,27 +17,29 @@ object ConvergenceMultiJvmSpec extends MultiNodeConfig {
val fourth = role("fourth") val fourth = role("fourth")
commonConfig(debugConfig(on = false). commonConfig(debugConfig(on = false).
withFallback(ConfigFactory.parseString(""" withFallback(ConfigFactory.parseString("akka.cluster.failure-detector.threshold = 4")).
akka.cluster {
failure-detector.threshold = 4
}
""")).
withFallback(MultiNodeClusterSpec.clusterConfig)) withFallback(MultiNodeClusterSpec.clusterConfig))
} }
class ConvergenceMultiJvmNode1 extends ConvergenceSpec class ConvergenceWithFailureDetectorPuppetMultiJvmNode1 extends ConvergenceSpec with FailureDetectorPuppetStrategy
class ConvergenceMultiJvmNode2 extends ConvergenceSpec class ConvergenceWithFailureDetectorPuppetMultiJvmNode2 extends ConvergenceSpec with FailureDetectorPuppetStrategy
class ConvergenceMultiJvmNode3 extends ConvergenceSpec class ConvergenceWithFailureDetectorPuppetMultiJvmNode3 extends ConvergenceSpec with FailureDetectorPuppetStrategy
class ConvergenceMultiJvmNode4 extends ConvergenceSpec class ConvergenceWithFailureDetectorPuppetMultiJvmNode4 extends ConvergenceSpec with FailureDetectorPuppetStrategy
class ConvergenceWithAccrualFailureDetectorMultiJvmNode1 extends ConvergenceSpec with AccrualFailureDetectorStrategy
class ConvergenceWithAccrualFailureDetectorMultiJvmNode2 extends ConvergenceSpec with AccrualFailureDetectorStrategy
class ConvergenceWithAccrualFailureDetectorMultiJvmNode3 extends ConvergenceSpec with AccrualFailureDetectorStrategy
class ConvergenceWithAccrualFailureDetectorMultiJvmNode4 extends ConvergenceSpec with AccrualFailureDetectorStrategy
abstract class ConvergenceSpec abstract class ConvergenceSpec
extends MultiNodeSpec(ConvergenceMultiJvmSpec) extends MultiNodeSpec(ConvergenceMultiJvmSpec)
with MultiNodeClusterSpec { with MultiNodeClusterSpec {
import ConvergenceMultiJvmSpec._ import ConvergenceMultiJvmSpec._
"A cluster of 3 members" must { "A cluster of 3 members" must {
"reach initial convergence" taggedAs LongRunningTest in { "reach initial convergence" taggedAs LongRunningTest ignore {
awaitClusterUp(first, second, third) awaitClusterUp(first, second, third)
runOn(fourth) { runOn(fourth) {
@ -47,13 +49,14 @@ abstract class ConvergenceSpec
testConductor.enter("after-1") testConductor.enter("after-1")
} }
"not reach convergence while any nodes are unreachable" taggedAs LongRunningTest in { "not reach convergence while any nodes are unreachable" taggedAs LongRunningTest ignore {
val thirdAddress = node(third).address val thirdAddress = node(third).address
testConductor.enter("before-shutdown") testConductor.enter("before-shutdown")
runOn(first) { runOn(first) {
// kill 'third' node // kill 'third' node
testConductor.shutdown(third, 0) testConductor.shutdown(third, 0)
markNodeAsUnavailable(thirdAddress)
} }
runOn(first, second) { runOn(first, second) {
@ -78,7 +81,7 @@ abstract class ConvergenceSpec
testConductor.enter("after-2") testConductor.enter("after-2")
} }
"not move a new joining node to Up while there is no convergence" taggedAs LongRunningTest in { "not move a new joining node to Up while there is no convergence" taggedAs LongRunningTest ignore {
runOn(fourth) { runOn(fourth) {
// try to join // try to join
cluster.join(node(first).address) cluster.join(node(first).address)

View file

@ -0,0 +1,61 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import akka.actor.Address
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
/**
* Base trait for all failure detector strategies.
*/
trait FailureDetectorStrategy {
/**
* Get or create the FailureDetector to be used in the cluster node.
* To be defined by subclass.
*/
def failureDetector: FailureDetector
/**
* Marks a node as available in the failure detector.
* To be defined by subclass.
*/
def markNodeAsAvailable(address: Address): Unit
/**
* Marks a node as unavailable in the failure detector.
* To be defined by subclass.
*/
def markNodeAsUnavailable(address: Address): Unit
}
/**
* Defines a FailureDetectorPuppet-based FailureDetectorStrategy.
*/
trait FailureDetectorPuppetStrategy extends FailureDetectorStrategy { self: MultiNodeSpec
/**
* The puppet instance. Separated from 'failureDetector' field so we don't have to cast when using the puppet specific methods.
*/
private val puppet = new FailureDetectorPuppet(system)
override def failureDetector: FailureDetector = puppet
override def markNodeAsAvailable(address: Address): Unit = puppet markNodeAsAvailable address
override def markNodeAsUnavailable(address: Address): Unit = puppet markNodeAsUnavailable address
}
/**
* Defines a AccrualFailureDetector-based FailureDetectorStrategy.
*/
trait AccrualFailureDetectorStrategy extends FailureDetectorStrategy { self: MultiNodeSpec
override val failureDetector: FailureDetector = new AccrualFailureDetector(system, new ClusterSettings(system.settings.config, system.name))
override def markNodeAsAvailable(address: Address): Unit = { /* no-op */ }
override def markNodeAsUnavailable(address: Address): Unit = { /* no-op */ }
}

View file

@ -19,12 +19,14 @@ object GossipingAccrualFailureDetectorMultiJvmSpec extends MultiNodeConfig {
withFallback(MultiNodeClusterSpec.clusterConfig)) withFallback(MultiNodeClusterSpec.clusterConfig))
} }
class GossipingAccrualFailureDetectorMultiJvmNode1 extends GossipingAccrualFailureDetectorSpec class GossipingWithAccrualFailureDetectorMultiJvmNode1 extends GossipingAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy
class GossipingAccrualFailureDetectorMultiJvmNode2 extends GossipingAccrualFailureDetectorSpec class GossipingWithAccrualFailureDetectorMultiJvmNode2 extends GossipingAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy
class GossipingAccrualFailureDetectorMultiJvmNode3 extends GossipingAccrualFailureDetectorSpec class GossipingWithAccrualFailureDetectorMultiJvmNode3 extends GossipingAccrualFailureDetectorSpec with AccrualFailureDetectorStrategy
abstract class GossipingAccrualFailureDetectorSpec extends MultiNodeSpec(GossipingAccrualFailureDetectorMultiJvmSpec) abstract class GossipingAccrualFailureDetectorSpec
extends MultiNodeSpec(GossipingAccrualFailureDetectorMultiJvmSpec)
with MultiNodeClusterSpec { with MultiNodeClusterSpec {
import GossipingAccrualFailureDetectorMultiJvmSpec._ import GossipingAccrualFailureDetectorMultiJvmSpec._
lazy val firstAddress = node(first).address lazy val firstAddress = node(first).address

View file

@ -20,12 +20,12 @@ object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig {
commonConfig(debugConfig(on = true).withFallback(MultiNodeClusterSpec.clusterConfig)) commonConfig(debugConfig(on = true).withFallback(MultiNodeClusterSpec.clusterConfig))
} }
class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy
class JoinTwoClustersMultiJvmNode2 extends JoinTwoClustersSpec class JoinTwoClustersMultiJvmNode2 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy
class JoinTwoClustersMultiJvmNode3 extends JoinTwoClustersSpec class JoinTwoClustersMultiJvmNode3 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy
class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy
class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy
class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec with FailureDetectorPuppetStrategy
abstract class JoinTwoClustersSpec abstract class JoinTwoClustersSpec
extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec)

View file

@ -7,7 +7,7 @@ import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.MultiNodeSpec
import akka.testkit._ import akka.testkit._
import akka.actor.Address import akka.actor._
import akka.util.duration._ import akka.util.duration._
object LeaderDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig { object LeaderDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig {
@ -16,22 +16,22 @@ object LeaderDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig {
val third = role("third") val third = role("third")
val fourth = role("fourth") val fourth = role("fourth")
commonConfig(debugConfig(on = true). commonConfig(debugConfig(on = false).
withFallback(ConfigFactory.parseString(""" withFallback(ConfigFactory.parseString("akka.cluster.auto-down = on")).
akka.cluster {
auto-down = on
failure-detector.threshold = 4
}
""")).
withFallback(MultiNodeClusterSpec.clusterConfig)) withFallback(MultiNodeClusterSpec.clusterConfig))
} }
class LeaderDowningNodeThatIsUnreachableMultiJvmNode1 extends LeaderDowningNodeThatIsUnreachableSpec class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode1 extends LeaderDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy
class LeaderDowningNodeThatIsUnreachableMultiJvmNode2 extends LeaderDowningNodeThatIsUnreachableSpec class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode2 extends LeaderDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy
class LeaderDowningNodeThatIsUnreachableMultiJvmNode3 extends LeaderDowningNodeThatIsUnreachableSpec class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode3 extends LeaderDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy
class LeaderDowningNodeThatIsUnreachableMultiJvmNode4 extends LeaderDowningNodeThatIsUnreachableSpec class LeaderDowningNodeThatIsUnreachableWithFailureDetectorPuppetMultiJvmNode4 extends LeaderDowningNodeThatIsUnreachableSpec with FailureDetectorPuppetStrategy
class LeaderDowningNodeThatIsUnreachableSpec class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode1 extends LeaderDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy
class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode2 extends LeaderDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy
class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode3 extends LeaderDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy
class LeaderDowningNodeThatIsUnreachableWithAccrualFailureDetectorMultiJvmNode4 extends LeaderDowningNodeThatIsUnreachableSpec with AccrualFailureDetectorStrategy
abstract class LeaderDowningNodeThatIsUnreachableSpec
extends MultiNodeSpec(LeaderDowningNodeThatIsUnreachableMultiJvmSpec) extends MultiNodeSpec(LeaderDowningNodeThatIsUnreachableMultiJvmSpec)
with MultiNodeClusterSpec { with MultiNodeClusterSpec {
@ -40,14 +40,17 @@ class LeaderDowningNodeThatIsUnreachableSpec
"The Leader in a 4 node cluster" must { "The Leader in a 4 node cluster" must {
"be able to DOWN a 'last' node that is UNREACHABLE" taggedAs LongRunningTest in { "be able to DOWN a 'last' node that is UNREACHABLE" taggedAs LongRunningTest in {
val fourthAddress = node(fourth).address
awaitClusterUp(first, second, third, fourth) awaitClusterUp(first, second, third, fourth)
val fourthAddress = node(fourth).address
runOn(first) { runOn(first) {
// kill 'fourth' node // kill 'fourth' node
testConductor.shutdown(fourth, 0) testConductor.shutdown(fourth, 0)
testConductor.enter("down-fourth-node") testConductor.enter("down-fourth-node")
// mark the node as unreachable in the failure detector
markNodeAsUnavailable(fourthAddress)
// --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE --- // --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE ---
awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(fourthAddress), 30.seconds) awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Seq(fourthAddress), 30.seconds)
@ -68,13 +71,16 @@ class LeaderDowningNodeThatIsUnreachableSpec
"be able to DOWN a 'middle' node that is UNREACHABLE" taggedAs LongRunningTest in { "be able to DOWN a 'middle' node that is UNREACHABLE" taggedAs LongRunningTest in {
val secondAddress = node(second).address val secondAddress = node(second).address
testConductor.enter("before-down-second-node")
testConductor.enter("before-down-second-node")
runOn(first) { runOn(first) {
// kill 'second' node // kill 'second' node
testConductor.shutdown(second, 0) testConductor.shutdown(second, 0)
testConductor.enter("down-second-node") testConductor.enter("down-second-node")
// mark the node as unreachable in the failure detector
markNodeAsUnavailable(secondAddress)
// --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE --- // --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE ---
awaitUpConvergence(numberOfMembers = 2, canNotBePartOfMemberRing = Seq(secondAddress), 30.seconds) awaitUpConvergence(numberOfMembers = 2, canNotBePartOfMemberRing = Seq(secondAddress), 30.seconds)

View file

@ -19,11 +19,17 @@ object LeaderElectionMultiJvmSpec extends MultiNodeConfig {
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
} }
class LeaderElectionMultiJvmNode1 extends LeaderElectionSpec class LeaderElectionWithFailureDetectorPuppetMultiJvmNode1 extends LeaderElectionSpec with FailureDetectorPuppetStrategy
class LeaderElectionMultiJvmNode2 extends LeaderElectionSpec class LeaderElectionWithFailureDetectorPuppetMultiJvmNode2 extends LeaderElectionSpec with FailureDetectorPuppetStrategy
class LeaderElectionMultiJvmNode3 extends LeaderElectionSpec class LeaderElectionWithFailureDetectorPuppetMultiJvmNode3 extends LeaderElectionSpec with FailureDetectorPuppetStrategy
class LeaderElectionMultiJvmNode4 extends LeaderElectionSpec class LeaderElectionWithFailureDetectorPuppetMultiJvmNode4 extends LeaderElectionSpec with FailureDetectorPuppetStrategy
class LeaderElectionMultiJvmNode5 extends LeaderElectionSpec class LeaderElectionWithFailureDetectorPuppetMultiJvmNode5 extends LeaderElectionSpec with FailureDetectorPuppetStrategy
class LeaderElectionWithAccrualFailureDetectorMultiJvmNode1 extends LeaderElectionSpec with AccrualFailureDetectorStrategy
class LeaderElectionWithAccrualFailureDetectorMultiJvmNode2 extends LeaderElectionSpec with AccrualFailureDetectorStrategy
class LeaderElectionWithAccrualFailureDetectorMultiJvmNode3 extends LeaderElectionSpec with AccrualFailureDetectorStrategy
class LeaderElectionWithAccrualFailureDetectorMultiJvmNode4 extends LeaderElectionSpec with AccrualFailureDetectorStrategy
class LeaderElectionWithAccrualFailureDetectorMultiJvmNode5 extends LeaderElectionSpec with AccrualFailureDetectorStrategy
abstract class LeaderElectionSpec abstract class LeaderElectionSpec
extends MultiNodeSpec(LeaderElectionMultiJvmSpec) extends MultiNodeSpec(LeaderElectionMultiJvmSpec)
@ -57,9 +63,11 @@ abstract class LeaderElectionSpec
myself match { myself match {
case `controller` case `controller`
val leaderAddress = node(leader).address
testConductor.enter("before-shutdown") testConductor.enter("before-shutdown")
testConductor.shutdown(leader, 0) testConductor.shutdown(leader, 0)
testConductor.enter("after-shutdown", "after-down", "completed") testConductor.enter("after-shutdown", "after-down", "completed")
markNodeAsUnavailable(leaderAddress)
case `leader` case `leader`
testConductor.enter("before-shutdown", "after-shutdown") testConductor.enter("before-shutdown", "after-shutdown")
@ -71,6 +79,7 @@ abstract class LeaderElectionSpec
// user marks the shutdown leader as DOWN // user marks the shutdown leader as DOWN
cluster.down(leaderAddress) cluster.down(leaderAddress)
testConductor.enter("after-down", "completed") testConductor.enter("after-down", "completed")
markNodeAsUnavailable(leaderAddress)
case _ if remainingRoles.contains(myself) case _ if remainingRoles.contains(myself)
// remaining cluster nodes, not shutdown // remaining cluster nodes, not shutdown

View file

@ -27,9 +27,9 @@ object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig {
.withFallback(MultiNodeClusterSpec.clusterConfig))) .withFallback(MultiNodeClusterSpec.clusterConfig)))
} }
class MembershipChangeListenerExitingMultiJvmNode1 extends MembershipChangeListenerExitingSpec class MembershipChangeListenerExitingMultiJvmNode1 extends MembershipChangeListenerExitingSpec with FailureDetectorPuppetStrategy
class MembershipChangeListenerExitingMultiJvmNode2 extends MembershipChangeListenerExitingSpec class MembershipChangeListenerExitingMultiJvmNode2 extends MembershipChangeListenerExitingSpec with FailureDetectorPuppetStrategy
class MembershipChangeListenerExitingMultiJvmNode3 extends MembershipChangeListenerExitingSpec class MembershipChangeListenerExitingMultiJvmNode3 extends MembershipChangeListenerExitingSpec with FailureDetectorPuppetStrategy
abstract class MembershipChangeListenerExitingSpec abstract class MembershipChangeListenerExitingSpec
extends MultiNodeSpec(MembershipChangeListenerExitingMultiJvmSpec) extends MultiNodeSpec(MembershipChangeListenerExitingMultiJvmSpec)

View file

@ -17,16 +17,12 @@ object MembershipChangeListenerJoinMultiJvmSpec extends MultiNodeConfig {
commonConfig( commonConfig(
debugConfig(on = false) debugConfig(on = false)
.withFallback(ConfigFactory.parseString(""" .withFallback(ConfigFactory.parseString("akka.cluster.leader-actions-interval = 5 s") // increase the leader action task interval to allow time checking for JOIN before leader moves it to UP
akka.cluster { .withFallback(MultiNodeClusterSpec.clusterConfig)))
leader-actions-interval = 5 s # increase the leader action task interval to allow time checking for JOIN before leader moves it to UP
}
""")
.withFallback(MultiNodeClusterSpec.clusterConfig)))
} }
class MembershipChangeListenerJoinMultiJvmNode1 extends MembershipChangeListenerJoinSpec class MembershipChangeListenerJoinMultiJvmNode1 extends MembershipChangeListenerJoinSpec with FailureDetectorPuppetStrategy
class MembershipChangeListenerJoinMultiJvmNode2 extends MembershipChangeListenerJoinSpec class MembershipChangeListenerJoinMultiJvmNode2 extends MembershipChangeListenerJoinSpec with FailureDetectorPuppetStrategy
abstract class MembershipChangeListenerJoinSpec abstract class MembershipChangeListenerJoinSpec
extends MultiNodeSpec(MembershipChangeListenerJoinMultiJvmSpec) extends MultiNodeSpec(MembershipChangeListenerJoinMultiJvmSpec)

View file

@ -24,9 +24,9 @@ object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig {
.withFallback(MultiNodeClusterSpec.clusterConfig)) .withFallback(MultiNodeClusterSpec.clusterConfig))
} }
class MembershipChangeListenerLeavingMultiJvmNode1 extends MembershipChangeListenerLeavingSpec class MembershipChangeListenerLeavingMultiJvmNode1 extends MembershipChangeListenerLeavingSpec with FailureDetectorPuppetStrategy
class MembershipChangeListenerLeavingMultiJvmNode2 extends MembershipChangeListenerLeavingSpec class MembershipChangeListenerLeavingMultiJvmNode2 extends MembershipChangeListenerLeavingSpec with FailureDetectorPuppetStrategy
class MembershipChangeListenerLeavingMultiJvmNode3 extends MembershipChangeListenerLeavingSpec class MembershipChangeListenerLeavingMultiJvmNode3 extends MembershipChangeListenerLeavingSpec with FailureDetectorPuppetStrategy
abstract class MembershipChangeListenerLeavingSpec abstract class MembershipChangeListenerLeavingSpec
extends MultiNodeSpec(MembershipChangeListenerLeavingMultiJvmSpec) extends MultiNodeSpec(MembershipChangeListenerLeavingMultiJvmSpec)

View file

@ -1,77 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import scala.collection.immutable.SortedSet
import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
object MembershipChangeListenerMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
}
class MembershipChangeListenerMultiJvmNode1 extends MembershipChangeListenerSpec
class MembershipChangeListenerMultiJvmNode2 extends MembershipChangeListenerSpec
class MembershipChangeListenerMultiJvmNode3 extends MembershipChangeListenerSpec
abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChangeListenerMultiJvmSpec)
with MultiNodeClusterSpec {
import MembershipChangeListenerMultiJvmSpec._
lazy val firstAddress = node(first).address
lazy val secondAddress = node(second).address
"A set of connected cluster systems" must {
"(when two nodes) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in {
awaitClusterUp(first)
runOn(first, second) {
val latch = TestLatch()
cluster.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
if (members.size == 2 && members.forall(_.status == MemberStatus.Up))
latch.countDown()
}
})
testConductor.enter("listener-1-registered")
cluster.join(firstAddress)
latch.await
}
runOn(third) {
testConductor.enter("listener-1-registered")
}
testConductor.enter("after-1")
}
"(when three nodes) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in {
val latch = TestLatch()
cluster.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
if (members.size == 3 && members.forall(_.status == MemberStatus.Up))
latch.countDown()
}
})
testConductor.enter("listener-2-registered")
runOn(third) {
cluster.join(firstAddress)
}
latch.await
testConductor.enter("after-2")
}
}
}

View file

@ -5,21 +5,21 @@ package akka.cluster
import scala.collection.immutable.SortedSet import scala.collection.immutable.SortedSet
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import org.scalatest.BeforeAndAfter
import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.MultiNodeSpec
import akka.testkit._ import akka.testkit._
import akka.util.duration._
object MembershipChangeListenerUpMultiJvmSpec extends MultiNodeConfig { object MembershipChangeListenerUpMultiJvmSpec extends MultiNodeConfig {
val first = role("first") val first = role("first")
val second = role("second") val second = role("second")
val third = role("third")
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
} }
class MembershipChangeListenerUpMultiJvmNode1 extends MembershipChangeListenerUpSpec class MembershipChangeListenerUpMultiJvmNode1 extends MembershipChangeListenerUpSpec with FailureDetectorPuppetStrategy
class MembershipChangeListenerUpMultiJvmNode2 extends MembershipChangeListenerUpSpec class MembershipChangeListenerUpMultiJvmNode2 extends MembershipChangeListenerUpSpec with FailureDetectorPuppetStrategy
class MembershipChangeListenerUpMultiJvmNode3 extends MembershipChangeListenerUpSpec with FailureDetectorPuppetStrategy
abstract class MembershipChangeListenerUpSpec abstract class MembershipChangeListenerUpSpec
extends MultiNodeSpec(MembershipChangeListenerUpMultiJvmSpec) extends MultiNodeSpec(MembershipChangeListenerUpMultiJvmSpec)
@ -30,29 +30,50 @@ abstract class MembershipChangeListenerUpSpec
lazy val firstAddress = node(first).address lazy val firstAddress = node(first).address
lazy val secondAddress = node(second).address lazy val secondAddress = node(second).address
"A registered MembershipChangeListener" must { "A set of connected cluster systems" must {
"be notified when new node is marked as UP by the leader" taggedAs LongRunningTest in {
runOn(first) { "(when two nodes) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in {
val upLatch = TestLatch()
awaitClusterUp(first)
runOn(first, second) {
val latch = TestLatch()
cluster.registerListener(new MembershipChangeListener { cluster.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) { def notify(members: SortedSet[Member]) {
if (members.size == 2 && members.forall(_.status == MemberStatus.Up)) if (members.size == 2 && members.forall(_.status == MemberStatus.Up))
upLatch.countDown() latch.countDown()
} }
}) })
testConductor.enter("registered-listener") testConductor.enter("listener-1-registered")
cluster.join(firstAddress)
upLatch.await latch.await
awaitUpConvergence(numberOfMembers = 2)
} }
runOn(second) { runOn(third) {
testConductor.enter("registered-listener") testConductor.enter("listener-1-registered")
}
testConductor.enter("after-1")
}
"(when three nodes) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in {
val latch = TestLatch()
cluster.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
if (members.size == 3 && members.forall(_.status == MemberStatus.Up))
latch.countDown()
}
})
testConductor.enter("listener-2-registered")
runOn(third) {
cluster.join(firstAddress) cluster.join(firstAddress)
} }
testConductor.enter("after") latch.await
testConductor.enter("after-2")
} }
} }
} }

View file

@ -5,7 +5,7 @@ package akka.cluster
import com.typesafe.config.Config import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import akka.actor.Address import akka.actor.{Address, ExtendedActorSystem}
import akka.remote.testconductor.RoleName import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.MultiNodeSpec
import akka.testkit._ import akka.testkit._
@ -29,14 +29,19 @@ object MultiNodeClusterSpec {
""") """)
} }
trait MultiNodeClusterSpec { self: MultiNodeSpec trait MultiNodeClusterSpec extends FailureDetectorStrategy { self: MultiNodeSpec
override def initialParticipants = roles.size override def initialParticipants = roles.size
/** /**
* Get or create a cluster node using 'Cluster(system)' extension. * The cluster node instance. Needs to be lazily created.
*/ */
def cluster: Cluster = Cluster(system) private lazy val clusterNode = new Cluster(system.asInstanceOf[ExtendedActorSystem], failureDetector)
/**
* Get the cluster node to use.
*/
def cluster: Cluster = clusterNode
/** /**
* Use this method instead of 'cluster.self' * Use this method instead of 'cluster.self'
@ -49,9 +54,7 @@ trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒
* nodes (roles). First node will be started first * nodes (roles). First node will be started first
* and others will join the first. * and others will join the first.
*/ */
def startCluster(roles: RoleName*): Unit = { def startCluster(roles: RoleName*): Unit = awaitStartCluster(false, roles.toSeq)
awaitStartCluster(false, roles.toSeq)
}
/** /**
* Initialize the cluster of the specified member * Initialize the cluster of the specified member

View file

@ -16,16 +16,12 @@ object NodeJoinMultiJvmSpec extends MultiNodeConfig {
commonConfig( commonConfig(
debugConfig(on = false) debugConfig(on = false)
.withFallback(ConfigFactory.parseString(""" .withFallback(ConfigFactory.parseString("akka.cluster.leader-actions-interval = 5 s") // increase the leader action task interval
akka.cluster { .withFallback(MultiNodeClusterSpec.clusterConfig)))
leader-actions-interval = 5 s # increase the leader action task interval
}
""")
.withFallback(MultiNodeClusterSpec.clusterConfig)))
} }
class NodeJoinMultiJvmNode1 extends NodeJoinSpec class NodeJoinMultiJvmNode1 extends NodeJoinSpec with FailureDetectorPuppetStrategy
class NodeJoinMultiJvmNode2 extends NodeJoinSpec class NodeJoinMultiJvmNode2 extends NodeJoinSpec with FailureDetectorPuppetStrategy
abstract class NodeJoinSpec abstract class NodeJoinSpec
extends MultiNodeSpec(NodeJoinMultiJvmSpec) extends MultiNodeSpec(NodeJoinMultiJvmSpec)

View file

@ -18,9 +18,9 @@ object NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec extends MultiNodeConfig
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
} }
class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode1 extends NodeLeavingAndExitingAndBeingRemovedSpec class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode1 extends NodeLeavingAndExitingAndBeingRemovedSpec with AccrualFailureDetectorStrategy
class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode2 extends NodeLeavingAndExitingAndBeingRemovedSpec class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode2 extends NodeLeavingAndExitingAndBeingRemovedSpec with AccrualFailureDetectorStrategy
class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode3 extends NodeLeavingAndExitingAndBeingRemovedSpec class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode3 extends NodeLeavingAndExitingAndBeingRemovedSpec with AccrualFailureDetectorStrategy
abstract class NodeLeavingAndExitingAndBeingRemovedSpec abstract class NodeLeavingAndExitingAndBeingRemovedSpec
extends MultiNodeSpec(NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec) extends MultiNodeSpec(NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec)

View file

@ -26,9 +26,9 @@ object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig {
.withFallback(MultiNodeClusterSpec.clusterConfig))) .withFallback(MultiNodeClusterSpec.clusterConfig)))
} }
class NodeLeavingAndExitingMultiJvmNode1 extends NodeLeavingAndExitingSpec class NodeLeavingAndExitingMultiJvmNode1 extends NodeLeavingAndExitingSpec with FailureDetectorPuppetStrategy
class NodeLeavingAndExitingMultiJvmNode2 extends NodeLeavingAndExitingSpec class NodeLeavingAndExitingMultiJvmNode2 extends NodeLeavingAndExitingSpec with FailureDetectorPuppetStrategy
class NodeLeavingAndExitingMultiJvmNode3 extends NodeLeavingAndExitingSpec class NodeLeavingAndExitingMultiJvmNode3 extends NodeLeavingAndExitingSpec with FailureDetectorPuppetStrategy
abstract class NodeLeavingAndExitingSpec abstract class NodeLeavingAndExitingSpec
extends MultiNodeSpec(NodeLeavingAndExitingMultiJvmSpec) extends MultiNodeSpec(NodeLeavingAndExitingMultiJvmSpec)

View file

@ -16,18 +16,18 @@ object NodeLeavingMultiJvmSpec extends MultiNodeConfig {
commonConfig( commonConfig(
debugConfig(on = false) debugConfig(on = false)
.withFallback(ConfigFactory.parseString(""" .withFallback(ConfigFactory.parseString("akka.cluster.unreachable-nodes-reaper-frequency = 30 s"))
akka.cluster.unreachable-nodes-reaper-frequency = 30 s
"""))
.withFallback(MultiNodeClusterSpec.clusterConfig)) .withFallback(MultiNodeClusterSpec.clusterConfig))
} }
class NodeLeavingMultiJvmNode1 extends NodeLeavingSpec class NodeLeavingMultiJvmNode1 extends NodeLeavingSpec with FailureDetectorPuppetStrategy
class NodeLeavingMultiJvmNode2 extends NodeLeavingSpec class NodeLeavingMultiJvmNode2 extends NodeLeavingSpec with FailureDetectorPuppetStrategy
class NodeLeavingMultiJvmNode3 extends NodeLeavingSpec class NodeLeavingMultiJvmNode3 extends NodeLeavingSpec with FailureDetectorPuppetStrategy
abstract class NodeLeavingSpec extends MultiNodeSpec(NodeLeavingMultiJvmSpec) abstract class NodeLeavingSpec
extends MultiNodeSpec(NodeLeavingMultiJvmSpec)
with MultiNodeClusterSpec { with MultiNodeClusterSpec {
import NodeLeavingMultiJvmSpec._ import NodeLeavingMultiJvmSpec._
lazy val firstAddress = node(first).address lazy val firstAddress = node(first).address

View file

@ -16,9 +16,9 @@ object NodeMembershipMultiJvmSpec extends MultiNodeConfig {
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
} }
class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec with FailureDetectorPuppetStrategy
class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec with FailureDetectorPuppetStrategy
class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec with FailureDetectorPuppetStrategy
abstract class NodeMembershipSpec abstract class NodeMembershipSpec
extends MultiNodeSpec(NodeMembershipMultiJvmSpec) extends MultiNodeSpec(NodeMembershipMultiJvmSpec)

View file

@ -19,8 +19,8 @@ object NodeUpMultiJvmSpec extends MultiNodeConfig {
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
} }
class NodeUpMultiJvmNode1 extends NodeUpSpec class NodeUpMultiJvmNode1 extends NodeUpSpec with FailureDetectorPuppetStrategy
class NodeUpMultiJvmNode2 extends NodeUpSpec class NodeUpMultiJvmNode2 extends NodeUpSpec with FailureDetectorPuppetStrategy
abstract class NodeUpSpec abstract class NodeUpSpec
extends MultiNodeSpec(NodeUpMultiJvmSpec) extends MultiNodeSpec(NodeUpMultiJvmSpec)

View file

@ -16,7 +16,7 @@ object SingletonClusterMultiJvmSpec extends MultiNodeConfig {
commonConfig(debugConfig(on = false). commonConfig(debugConfig(on = false).
withFallback(ConfigFactory.parseString(""" withFallback(ConfigFactory.parseString("""
akka.cluster { akka.cluster {
auto-down = on auto-down = on
failure-detector.threshold = 4 failure-detector.threshold = 4
} }
""")). """)).
@ -24,10 +24,16 @@ object SingletonClusterMultiJvmSpec extends MultiNodeConfig {
} }
class SingletonClusterMultiJvmNode1 extends SingletonClusterSpec class SingletonClusterWithFailureDetectorPuppetMultiJvmNode1 extends SingletonClusterSpec with FailureDetectorPuppetStrategy
class SingletonClusterMultiJvmNode2 extends SingletonClusterSpec class SingletonClusterWithFailureDetectorPuppetMultiJvmNode2 extends SingletonClusterSpec with FailureDetectorPuppetStrategy
class SingletonClusterWithAccrualFailureDetectorMultiJvmNode1 extends SingletonClusterSpec with AccrualFailureDetectorStrategy
class SingletonClusterWithAccrualFailureDetectorMultiJvmNode2 extends SingletonClusterSpec with AccrualFailureDetectorStrategy
abstract class SingletonClusterSpec
extends MultiNodeSpec(SingletonClusterMultiJvmSpec)
with MultiNodeClusterSpec {
abstract class SingletonClusterSpec extends MultiNodeSpec(SingletonClusterMultiJvmSpec) with MultiNodeClusterSpec {
import SingletonClusterMultiJvmSpec._ import SingletonClusterMultiJvmSpec._
"A cluster of 2 nodes" must { "A cluster of 2 nodes" must {
@ -44,6 +50,9 @@ abstract class SingletonClusterSpec extends MultiNodeSpec(SingletonClusterMultiJ
runOn(first) { runOn(first) {
val secondAddress = node(second).address val secondAddress = node(second).address
testConductor.shutdown(second, 0) testConductor.shutdown(second, 0)
markNodeAsUnavailable(secondAddress)
awaitUpConvergence(numberOfMembers = 1, canNotBePartOfMemberRing = Seq(secondAddress), 30.seconds) awaitUpConvergence(numberOfMembers = 1, canNotBePartOfMemberRing = Seq(secondAddress), 30.seconds)
cluster.isSingletonCluster must be(true) cluster.isSingletonCluster must be(true)
assertLeader(first) assertLeader(first)

View file

@ -21,18 +21,18 @@ object SunnyWeatherMultiJvmSpec extends MultiNodeConfig {
commonConfig(ConfigFactory.parseString(""" commonConfig(ConfigFactory.parseString("""
akka.cluster { akka.cluster {
gossip-interval = 400 ms gossip-interval = 400 ms
nr-of-deputy-nodes = 0 nr-of-deputy-nodes = 0
} }
akka.loglevel = INFO akka.loglevel = INFO
""")) """))
} }
class SunnyWeatherMultiJvmNode1 extends SunnyWeatherSpec class SunnyWeatherMultiJvmNode1 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy
class SunnyWeatherMultiJvmNode2 extends SunnyWeatherSpec class SunnyWeatherMultiJvmNode2 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy
class SunnyWeatherMultiJvmNode3 extends SunnyWeatherSpec class SunnyWeatherMultiJvmNode3 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy
class SunnyWeatherMultiJvmNode4 extends SunnyWeatherSpec class SunnyWeatherMultiJvmNode4 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy
class SunnyWeatherMultiJvmNode5 extends SunnyWeatherSpec class SunnyWeatherMultiJvmNode5 extends SunnyWeatherSpec with FailureDetectorPuppetStrategy
abstract class SunnyWeatherSpec abstract class SunnyWeatherSpec
extends MultiNodeSpec(SunnyWeatherMultiJvmSpec) extends MultiNodeSpec(SunnyWeatherMultiJvmSpec)

View file

@ -28,7 +28,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
} }
"return phi value of 0.0 on startup for each address, when no heartbeats" in { "return phi value of 0.0 on startup for each address, when no heartbeats" in {
val fd = new AccrualFailureDetector(system, conn) val fd = new AccrualFailureDetector(system)
fd.phi(conn) must be(0.0) fd.phi(conn) must be(0.0)
fd.phi(conn2) must be(0.0) fd.phi(conn2) must be(0.0)
} }
@ -36,7 +36,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
"return phi based on guess when only one heartbeat" in { "return phi based on guess when only one heartbeat" in {
// 1 second ticks // 1 second ticks
val timeInterval = Vector.fill(30)(1000L) val timeInterval = Vector.fill(30)(1000L)
val fd = new AccrualFailureDetector(system, conn, val fd = new AccrualFailureDetector(system,
timeMachine = fakeTimeGenerator(timeInterval)) timeMachine = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn) fd.heartbeat(conn)
@ -52,7 +52,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
"return phi value using first interval after second heartbeat" in { "return phi value using first interval after second heartbeat" in {
val timeInterval = List[Long](0, 100, 100, 100) val timeInterval = List[Long](0, 100, 100, 100)
val fd = new AccrualFailureDetector(system, conn, val fd = new AccrualFailureDetector(system,
timeMachine = fakeTimeGenerator(timeInterval)) timeMachine = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn) fd.heartbeat(conn)
@ -63,7 +63,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
"mark node as available after a series of successful heartbeats" in { "mark node as available after a series of successful heartbeats" in {
val timeInterval = List[Long](0, 1000, 100, 100) val timeInterval = List[Long](0, 1000, 100, 100)
val fd = new AccrualFailureDetector(system, conn, val fd = new AccrualFailureDetector(system,
timeMachine = fakeTimeGenerator(timeInterval)) timeMachine = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn) fd.heartbeat(conn)
@ -75,7 +75,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
"mark node as dead after explicit removal of connection" in { "mark node as dead after explicit removal of connection" in {
val timeInterval = List[Long](0, 1000, 100, 100, 100) val timeInterval = List[Long](0, 1000, 100, 100, 100)
val fd = new AccrualFailureDetector(system, conn, val fd = new AccrualFailureDetector(system,
timeMachine = fakeTimeGenerator(timeInterval)) timeMachine = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn) fd.heartbeat(conn)
@ -89,7 +89,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
"mark node as available after explicit removal of connection and receiving heartbeat again" in { "mark node as available after explicit removal of connection and receiving heartbeat again" in {
val timeInterval = List[Long](0, 1000, 100, 1100, 1100, 1100, 1100, 1100, 100) val timeInterval = List[Long](0, 1000, 100, 1100, 1100, 1100, 1100, 1100, 100)
val fd = new AccrualFailureDetector(system, conn, val fd = new AccrualFailureDetector(system,
timeMachine = fakeTimeGenerator(timeInterval)) timeMachine = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn) //0 fd.heartbeat(conn) //0
@ -114,7 +114,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
"mark node as dead if heartbeat are missed" in { "mark node as dead if heartbeat are missed" in {
val timeInterval = List[Long](0, 1000, 100, 100, 5000) val timeInterval = List[Long](0, 1000, 100, 100, 5000)
val ft = fakeTimeGenerator(timeInterval) val ft = fakeTimeGenerator(timeInterval)
val fd = new AccrualFailureDetector(system, conn, threshold = 3, val fd = new AccrualFailureDetector(system, threshold = 3,
timeMachine = fakeTimeGenerator(timeInterval)) timeMachine = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn) //0 fd.heartbeat(conn) //0
@ -127,7 +127,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
"mark node as available if it starts heartbeat again after being marked dead due to detection of failure" in { "mark node as available if it starts heartbeat again after being marked dead due to detection of failure" in {
val timeInterval = List[Long](0, 1000, 100, 1100, 5000, 100, 1000, 100, 100) val timeInterval = List[Long](0, 1000, 100, 1100, 5000, 100, 1000, 100, 100)
val fd = new AccrualFailureDetector(system, conn, threshold = 3, val fd = new AccrualFailureDetector(system, threshold = 3,
timeMachine = fakeTimeGenerator(timeInterval)) timeMachine = fakeTimeGenerator(timeInterval))
fd.heartbeat(conn) //0 fd.heartbeat(conn) //0
@ -144,7 +144,7 @@ class AccrualFailureDetectorSpec extends AkkaSpec("""
"use maxSampleSize heartbeats" in { "use maxSampleSize heartbeats" in {
val timeInterval = List[Long](0, 100, 100, 100, 100, 600, 1000, 1000, 1000, 1000, 1000) val timeInterval = List[Long](0, 100, 100, 100, 100, 600, 1000, 1000, 1000, 1000, 1000)
val fd = new AccrualFailureDetector(system, conn, maxSampleSize = 3, val fd = new AccrualFailureDetector(system, maxSampleSize = 3,
timeMachine = fakeTimeGenerator(timeInterval)) timeMachine = fakeTimeGenerator(timeInterval))
// 100 ms interval // 100 ms interval

View file

@ -18,6 +18,7 @@ class ClusterConfigSpec extends AkkaSpec {
import settings._ import settings._
FailureDetectorThreshold must be(8) FailureDetectorThreshold must be(8)
FailureDetectorMaxSampleSize must be(1000) FailureDetectorMaxSampleSize must be(1000)
FailureDetectorImplementationClass must be(None)
NodeToJoin must be(None) NodeToJoin must be(None)
PeriodicTasksInitialDelay must be(1 seconds) PeriodicTasksInitialDelay must be(1 seconds)
GossipInterval must be(1 second) GossipInterval must be(1 second)

View file

@ -33,7 +33,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {
val deterministicRandom = new AtomicInteger val deterministicRandom = new AtomicInteger
val cluster = new Cluster(system.asInstanceOf[ExtendedActorSystem]) { val cluster = new Cluster(system.asInstanceOf[ExtendedActorSystem], new FailureDetectorPuppet(system)) {
override def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] = { override def selectRandomNode(addresses: IndexedSeq[Address]): Option[Address] = {
if (addresses.isEmpty) None if (addresses.isEmpty) None
@ -67,9 +67,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with BeforeAndAfter {
@volatile @volatile
var _unavailable: Set[Address] = Set.empty var _unavailable: Set[Address] = Set.empty
override val failureDetector = new AccrualFailureDetector( override val failureDetector = new FailureDetectorPuppet(system) {
system, selfAddress, clusterSettings.FailureDetectorThreshold, clusterSettings.FailureDetectorMaxSampleSize) {
override def isAvailable(connection: Address): Boolean = { override def isAvailable(connection: Address): Boolean = {
if (_unavailable.contains(connection)) false if (_unavailable.contains(connection)) false
else super.isAvailable(connection) else super.isAvailable(connection)

View file

@ -0,0 +1,60 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import akka.actor.{ Address, ActorSystem }
import akka.event.{ Logging, LogSource }
/**
* User controllable "puppet" failure detector.
*/
class FailureDetectorPuppet(system: ActorSystem, settings: ClusterSettings) extends FailureDetector {
import java.util.concurrent.ConcurrentHashMap
def this(system: ActorSystem) = this(system, new ClusterSettings(system.settings.config, system.name))
trait Status
object Up extends Status
object Down extends Status
implicit private val logSource: LogSource[AnyRef] = new LogSource[AnyRef] {
def genString(o: AnyRef): String = o.getClass.getName
override def getClazz(o: AnyRef): Class[_] = o.getClass
}
private val log = Logging(system, this)
private val connections = new ConcurrentHashMap[Address, Status]
def markNodeAsUnavailable(connection: Address): this.type = {
connections.put(connection, Down)
this
}
def markNodeAsAvailable(connection: Address): this.type = {
connections.put(connection, Up)
this
}
def isAvailable(connection: Address): Boolean = connections.get(connection) match {
case null
log.debug("Adding cluster node [{}]", connection)
connections.put(connection, Up)
true
case Up
log.debug("isAvailable: Cluster node IS NOT available [{}]", connection)
true
case Down
log.debug("isAvailable: Cluster node IS available [{}]", connection)
false
}
def heartbeat(connection: Address): Unit = log.debug("Heart beat from cluster node[{}]", connection)
def remove(connection: Address): Unit = {
log.debug("Removing cluster node [{}]", connection)
connections.remove(connection)
}
}