Merge pull request #512 from akka/wip-2162-membership-change-listener-spec-jboner
Ticket #2162 - Tests for trigging of MemberChangeListener on life-cycle state changes
This commit is contained in:
commit
2d2e19578c
24 changed files with 472 additions and 117 deletions
|
|
@ -25,13 +25,13 @@ akka {
|
|||
periodic-tasks-initial-delay = 1s
|
||||
|
||||
# how often should the node send out gossip information?
|
||||
gossip-frequency = 1s
|
||||
gossip-interval = 1s
|
||||
|
||||
# how often should the leader perform maintenance tasks?
|
||||
leader-actions-frequency = 1s
|
||||
leader-actions-interval = 1s
|
||||
|
||||
# how often should the node move nodes, marked as unreachable by the failure detector, out of the membership ring?
|
||||
unreachable-nodes-reaper-frequency = 1s
|
||||
unreachable-nodes-reaper-interval = 1s
|
||||
|
||||
# accrual failure detection config
|
||||
failure-detector {
|
||||
|
|
|
|||
|
|
@ -380,9 +380,9 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
|||
private val vclockNode = VectorClock.Node(selfAddress.toString)
|
||||
|
||||
private val periodicTasksInitialDelay = clusterSettings.PeriodicTasksInitialDelay
|
||||
private val gossipFrequency = clusterSettings.GossipFrequency
|
||||
private val leaderActionsFrequency = clusterSettings.LeaderActionsFrequency
|
||||
private val unreachableNodesReaperFrequency = clusterSettings.UnreachableNodesReaperFrequency
|
||||
private val gossipInterval = clusterSettings.GossipInterval
|
||||
private val leaderActionsInterval = clusterSettings.LeaderActionsInterval
|
||||
private val unreachableNodesReaperInterval = clusterSettings.UnreachableNodesReaperInterval
|
||||
|
||||
implicit private val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout)
|
||||
|
||||
|
|
@ -424,17 +424,17 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
|||
// ========================================================
|
||||
|
||||
// start periodic gossip to random nodes in cluster
|
||||
private val gossipCanceller = system.scheduler.schedule(periodicTasksInitialDelay, gossipFrequency) {
|
||||
private val gossipCanceller = system.scheduler.schedule(periodicTasksInitialDelay, gossipInterval) {
|
||||
gossip()
|
||||
}
|
||||
|
||||
// start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list)
|
||||
private val failureDetectorReaperCanceller = system.scheduler.schedule(periodicTasksInitialDelay, unreachableNodesReaperFrequency) {
|
||||
private val failureDetectorReaperCanceller = system.scheduler.schedule(periodicTasksInitialDelay, unreachableNodesReaperInterval) {
|
||||
reapUnreachableMembers()
|
||||
}
|
||||
|
||||
// start periodic leader action management (only applies for the current leader)
|
||||
private val leaderActionsCanceller = system.scheduler.schedule(periodicTasksInitialDelay, leaderActionsFrequency) {
|
||||
private val leaderActionsCanceller = system.scheduler.schedule(periodicTasksInitialDelay, leaderActionsInterval) {
|
||||
leaderActions()
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -20,9 +20,9 @@ class ClusterSettings(val config: Config, val systemName: String) {
|
|||
case AddressFromURIString(addr) ⇒ Some(addr)
|
||||
}
|
||||
val PeriodicTasksInitialDelay = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS)
|
||||
val GossipFrequency = Duration(getMilliseconds("akka.cluster.gossip-frequency"), MILLISECONDS)
|
||||
val LeaderActionsFrequency = Duration(getMilliseconds("akka.cluster.leader-actions-frequency"), MILLISECONDS)
|
||||
val UnreachableNodesReaperFrequency = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-frequency"), MILLISECONDS)
|
||||
val GossipInterval = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS)
|
||||
val LeaderActionsInterval = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS)
|
||||
val UnreachableNodesReaperInterval = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS)
|
||||
val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons")
|
||||
val NrOfDeputyNodes = getInt("akka.cluster.nr-of-deputy-nodes")
|
||||
val AutoDown = getBoolean("akka.cluster.auto-down")
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ class ClientDowningNodeThatIsUnreachableMultiJvmNode4 extends ClientDowningNodeT
|
|||
class ClientDowningNodeThatIsUnreachableSpec
|
||||
extends MultiNodeSpec(ClientDowningNodeThatIsUnreachableMultiJvmSpec)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import ClientDowningNodeThatIsUnreachableMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 4
|
||||
|
|
@ -34,7 +35,7 @@ class ClientDowningNodeThatIsUnreachableSpec
|
|||
|
||||
"be able to DOWN a node that is UNREACHABLE (killed)" taggedAs LongRunningTest in {
|
||||
runOn(first) {
|
||||
cluster.self
|
||||
startClusterNode()
|
||||
awaitUpConvergence(numberOfMembers = 4)
|
||||
|
||||
val thirdAddress = node(third).address
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@ class ClientDowningNodeThatIsUpMultiJvmNode4 extends ClientDowningNodeThatIsUpSp
|
|||
class ClientDowningNodeThatIsUpSpec
|
||||
extends MultiNodeSpec(ClientDowningNodeThatIsUpMultiJvmSpec)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import ClientDowningNodeThatIsUpMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 4
|
||||
|
|
@ -34,7 +35,7 @@ class ClientDowningNodeThatIsUpSpec
|
|||
|
||||
"be able to DOWN a node that is UP (healthy and available)" taggedAs LongRunningTest in {
|
||||
runOn(first) {
|
||||
cluster.self
|
||||
startClusterNode()
|
||||
awaitUpConvergence(numberOfMembers = 4)
|
||||
|
||||
val thirdAddress = node(third).address
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ object GossipingAccrualFailureDetectorMultiJvmSpec extends MultiNodeConfig {
|
|||
val third = role("third")
|
||||
|
||||
commonConfig(debugConfig(on = false).
|
||||
withFallback(ConfigFactory.parseString("akka.cluster.failure-detector.threshold=4")).
|
||||
withFallback(ConfigFactory.parseString("akka.cluster.failure-detector.threshold = 4")).
|
||||
withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
}
|
||||
|
||||
|
|
@ -38,7 +38,7 @@ abstract class GossipingAccrualFailureDetectorSpec extends MultiNodeSpec(Gossipi
|
|||
"receive gossip heartbeats so that all member nodes in the cluster are marked 'available'" taggedAs LongRunningTest in {
|
||||
// make sure that the node-to-join is started before other join
|
||||
runOn(first) {
|
||||
cluster.self
|
||||
startClusterNode()
|
||||
}
|
||||
testConductor.enter("first-started")
|
||||
|
||||
|
|
@ -68,5 +68,4 @@ abstract class GossipingAccrualFailureDetectorSpec extends MultiNodeSpec(Gossipi
|
|||
testConductor.enter("after-2")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -27,7 +27,10 @@ class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec
|
|||
class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec
|
||||
class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec
|
||||
|
||||
abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender {
|
||||
abstract class JoinTwoClustersSpec
|
||||
extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import JoinTwoClustersMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 6
|
||||
|
|
@ -41,7 +44,7 @@ abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvm
|
|||
"be able to 'elect' a single leader after joining (A -> B)" taggedAs LongRunningTest in {
|
||||
// make sure that the node-to-join is started before other join
|
||||
runOn(a1, b1, c1) {
|
||||
cluster.self
|
||||
startClusterNode()
|
||||
}
|
||||
testConductor.enter("first-started")
|
||||
|
||||
|
|
@ -75,7 +78,6 @@ abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvm
|
|||
assertLeader(c1, c2)
|
||||
|
||||
testConductor.enter("four-members")
|
||||
|
||||
}
|
||||
|
||||
"be able to 'elect' a single leader after joining (C -> A + B)" taggedAs LongRunningTest in {
|
||||
|
|
@ -91,5 +93,4 @@ abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvm
|
|||
testConductor.enter("six-members")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -34,6 +34,7 @@ class LeaderDowningNodeThatIsUnreachableMultiJvmNode4 extends LeaderDowningNodeT
|
|||
class LeaderDowningNodeThatIsUnreachableSpec
|
||||
extends MultiNodeSpec(LeaderDowningNodeThatIsUnreachableMultiJvmSpec)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import LeaderDowningNodeThatIsUnreachableMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 4
|
||||
|
|
@ -42,7 +43,7 @@ class LeaderDowningNodeThatIsUnreachableSpec
|
|||
|
||||
"be able to DOWN a 'last' node that is UNREACHABLE" taggedAs LongRunningTest in {
|
||||
runOn(first) {
|
||||
cluster.self
|
||||
startClusterNode()
|
||||
awaitUpConvergence(numberOfMembers = 4)
|
||||
|
||||
val fourthAddress = node(fourth).address
|
||||
|
|
|
|||
|
|
@ -17,7 +17,6 @@ object LeaderElectionMultiJvmSpec extends MultiNodeConfig {
|
|||
val fourth = role("fourth")
|
||||
|
||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
|
||||
}
|
||||
|
||||
class LeaderElectionMultiJvmNode1 extends LeaderElectionSpec
|
||||
|
|
@ -26,7 +25,10 @@ class LeaderElectionMultiJvmNode3 extends LeaderElectionSpec
|
|||
class LeaderElectionMultiJvmNode4 extends LeaderElectionSpec
|
||||
class LeaderElectionMultiJvmNode5 extends LeaderElectionSpec
|
||||
|
||||
abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSpec) with MultiNodeClusterSpec {
|
||||
abstract class LeaderElectionSpec
|
||||
extends MultiNodeSpec(LeaderElectionMultiJvmSpec)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import LeaderElectionMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 5
|
||||
|
|
@ -41,7 +43,7 @@ abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSp
|
|||
"be able to 'elect' a single leader" taggedAs LongRunningTest in {
|
||||
// make sure that the node-to-join is started before other join
|
||||
runOn(first) {
|
||||
cluster.self
|
||||
startClusterNode()
|
||||
}
|
||||
testConductor.enter("first-started")
|
||||
|
||||
|
|
@ -91,7 +93,6 @@ abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSp
|
|||
testConductor.enter("completed")
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
"be able to 're-elect' a single leader after leader has left" taggedAs LongRunningTest in {
|
||||
|
|
@ -102,5 +103,4 @@ abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSp
|
|||
shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 1)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,84 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.cluster
|
||||
|
||||
import scala.collection.immutable.SortedSet
|
||||
import org.scalatest.BeforeAndAfter
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.testkit._
|
||||
import akka.util.duration._
|
||||
|
||||
object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
val third = role("third")
|
||||
|
||||
commonConfig(
|
||||
debugConfig(on = false)
|
||||
.withFallback(ConfigFactory.parseString("""
|
||||
akka.cluster {
|
||||
leader-actions-interval = 5 s # increase the leader action task interval
|
||||
unreachable-nodes-reaper-interval = 30 s # turn "off" reaping to unreachable node set
|
||||
}
|
||||
""")
|
||||
.withFallback(MultiNodeClusterSpec.clusterConfig)))
|
||||
}
|
||||
|
||||
class MembershipChangeListenerExitingMultiJvmNode1 extends MembershipChangeListenerExitingSpec
|
||||
class MembershipChangeListenerExitingMultiJvmNode2 extends MembershipChangeListenerExitingSpec
|
||||
class MembershipChangeListenerExitingMultiJvmNode3 extends MembershipChangeListenerExitingSpec
|
||||
|
||||
abstract class MembershipChangeListenerExitingSpec
|
||||
extends MultiNodeSpec(MembershipChangeListenerExitingMultiJvmSpec)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import MembershipChangeListenerExitingMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 3
|
||||
|
||||
lazy val firstAddress = node(first).address
|
||||
lazy val secondAddress = node(second).address
|
||||
lazy val thirdAddress = node(third).address
|
||||
|
||||
"A registered MembershipChangeListener" must {
|
||||
"be notified when new node is EXITING" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(first) {
|
||||
startClusterNode()
|
||||
}
|
||||
testConductor.enter("first-started")
|
||||
|
||||
runOn(second, third) {
|
||||
cluster.join(firstAddress)
|
||||
}
|
||||
awaitUpConvergence(numberOfMembers = 3)
|
||||
testConductor.enter("rest-started")
|
||||
|
||||
runOn(first) {
|
||||
testConductor.enter("registered-listener")
|
||||
cluster.leave(secondAddress)
|
||||
}
|
||||
|
||||
runOn(second) {
|
||||
testConductor.enter("registered-listener")
|
||||
}
|
||||
|
||||
runOn(third) {
|
||||
val exitingLatch = TestLatch()
|
||||
cluster.registerListener(new MembershipChangeListener {
|
||||
def notify(members: SortedSet[Member]) {
|
||||
if (members.size == 3 && members.exists( m => m.address == secondAddress && m.status == MemberStatus.Exiting))
|
||||
exitingLatch.countDown()
|
||||
}
|
||||
})
|
||||
testConductor.enter("registered-listener")
|
||||
exitingLatch.await
|
||||
}
|
||||
|
||||
testConductor.enter("finished")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,71 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.cluster
|
||||
|
||||
import scala.collection.immutable.SortedSet
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.BeforeAndAfter
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.testkit._
|
||||
import akka.util.duration._
|
||||
|
||||
object MembershipChangeListenerJoinMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
|
||||
commonConfig(
|
||||
debugConfig(on = false)
|
||||
.withFallback(ConfigFactory.parseString("""
|
||||
akka.cluster {
|
||||
leader-actions-interval = 5 s # increase the leader action task interval to allow time checking for JOIN before leader moves it to UP
|
||||
}
|
||||
""")
|
||||
.withFallback(MultiNodeClusterSpec.clusterConfig)))
|
||||
}
|
||||
|
||||
class MembershipChangeListenerJoinMultiJvmNode1 extends MembershipChangeListenerJoinSpec
|
||||
class MembershipChangeListenerJoinMultiJvmNode2 extends MembershipChangeListenerJoinSpec
|
||||
|
||||
abstract class MembershipChangeListenerJoinSpec
|
||||
extends MultiNodeSpec(MembershipChangeListenerJoinMultiJvmSpec)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import MembershipChangeListenerJoinMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 2
|
||||
|
||||
lazy val firstAddress = node(first).address
|
||||
lazy val secondAddress = node(second).address
|
||||
|
||||
"A registered MembershipChangeListener" must {
|
||||
"be notified when new node is JOINING" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(first) {
|
||||
startClusterNode()
|
||||
}
|
||||
|
||||
runOn(second) {
|
||||
testConductor.enter("registered-listener")
|
||||
cluster.join(firstAddress)
|
||||
}
|
||||
|
||||
runOn(first) {
|
||||
val joinLatch = TestLatch()
|
||||
cluster.registerListener(new MembershipChangeListener {
|
||||
def notify(members: SortedSet[Member]) {
|
||||
if (members.size == 2 && members.exists(_.status == MemberStatus.Joining)) // second node is not part of node ring anymore
|
||||
joinLatch.countDown()
|
||||
}
|
||||
})
|
||||
testConductor.enter("registered-listener")
|
||||
|
||||
joinLatch.await
|
||||
cluster.convergence.isDefined must be(true)
|
||||
}
|
||||
|
||||
testConductor.enter("after")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,81 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.cluster
|
||||
|
||||
import scala.collection.immutable.SortedSet
|
||||
import org.scalatest.BeforeAndAfter
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.testkit._
|
||||
|
||||
object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
val third = role("third")
|
||||
|
||||
commonConfig(
|
||||
debugConfig(on = false)
|
||||
.withFallback(ConfigFactory.parseString("""
|
||||
akka.cluster.leader-actions-interval = 5 s
|
||||
akka.cluster.unreachable-nodes-reaper-interval = 30 s
|
||||
"""))
|
||||
.withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
}
|
||||
|
||||
class MembershipChangeListenerLeavingMultiJvmNode1 extends MembershipChangeListenerLeavingSpec
|
||||
class MembershipChangeListenerLeavingMultiJvmNode2 extends MembershipChangeListenerLeavingSpec
|
||||
class MembershipChangeListenerLeavingMultiJvmNode3 extends MembershipChangeListenerLeavingSpec
|
||||
|
||||
abstract class MembershipChangeListenerLeavingSpec
|
||||
extends MultiNodeSpec(MembershipChangeListenerLeavingMultiJvmSpec)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import MembershipChangeListenerLeavingMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 3
|
||||
|
||||
lazy val firstAddress = node(first).address
|
||||
lazy val secondAddress = node(second).address
|
||||
lazy val thirdAddress = node(third).address
|
||||
|
||||
"A registered MembershipChangeListener" must {
|
||||
"be notified when new node is LEAVING" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(first) {
|
||||
startClusterNode()
|
||||
}
|
||||
testConductor.enter("first-started")
|
||||
|
||||
runOn(second, third) {
|
||||
cluster.join(firstAddress)
|
||||
}
|
||||
awaitUpConvergence(numberOfMembers = 3)
|
||||
testConductor.enter("rest-started")
|
||||
|
||||
runOn(first) {
|
||||
testConductor.enter("registered-listener")
|
||||
cluster.leave(secondAddress)
|
||||
}
|
||||
|
||||
runOn(second) {
|
||||
testConductor.enter("registered-listener")
|
||||
}
|
||||
|
||||
runOn(third) {
|
||||
val latch = TestLatch()
|
||||
cluster.registerListener(new MembershipChangeListener {
|
||||
def notify(members: SortedSet[Member]) {
|
||||
if (members.size == 3 && members.exists( m => m.address == secondAddress && m.status == MemberStatus.Leaving))
|
||||
latch.countDown()
|
||||
}
|
||||
})
|
||||
testConductor.enter("registered-listener")
|
||||
latch.await
|
||||
}
|
||||
|
||||
testConductor.enter("finished")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -15,7 +15,6 @@ object MembershipChangeListenerMultiJvmSpec extends MultiNodeConfig {
|
|||
val third = role("third")
|
||||
|
||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
|
||||
}
|
||||
|
||||
class MembershipChangeListenerMultiJvmNode1 extends MembershipChangeListenerSpec
|
||||
|
|
@ -55,7 +54,6 @@ abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChan
|
|||
}
|
||||
|
||||
testConductor.enter("after-1")
|
||||
|
||||
}
|
||||
|
||||
"(when three systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in {
|
||||
|
|
@ -75,8 +73,6 @@ abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChan
|
|||
cluster.convergence.isDefined must be(true)
|
||||
|
||||
testConductor.enter("after-2")
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,64 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.cluster
|
||||
|
||||
import scala.collection.immutable.SortedSet
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.BeforeAndAfter
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.testkit._
|
||||
import akka.util.duration._
|
||||
|
||||
object MembershipChangeListenerUpMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
|
||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
}
|
||||
|
||||
class MembershipChangeListenerUpMultiJvmNode1 extends MembershipChangeListenerUpSpec
|
||||
class MembershipChangeListenerUpMultiJvmNode2 extends MembershipChangeListenerUpSpec
|
||||
|
||||
abstract class MembershipChangeListenerUpSpec
|
||||
extends MultiNodeSpec(MembershipChangeListenerUpMultiJvmSpec)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import MembershipChangeListenerUpMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 2
|
||||
|
||||
lazy val firstAddress = node(first).address
|
||||
lazy val secondAddress = node(second).address
|
||||
|
||||
"A registered MembershipChangeListener" must {
|
||||
"be notified when new node is marked as UP by the leader" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(first) {
|
||||
startClusterNode()
|
||||
}
|
||||
|
||||
runOn(second) {
|
||||
testConductor.enter("registered-listener")
|
||||
cluster.join(firstAddress)
|
||||
}
|
||||
|
||||
runOn(first) {
|
||||
val upLatch = TestLatch()
|
||||
cluster.registerListener(new MembershipChangeListener {
|
||||
def notify(members: SortedSet[Member]) {
|
||||
if (members.size == 2 && members.forall(_.status == MemberStatus.Up))
|
||||
upLatch.countDown()
|
||||
}
|
||||
})
|
||||
testConductor.enter("registered-listener")
|
||||
|
||||
upLatch.await
|
||||
awaitUpConvergence(numberOfMembers = 2)
|
||||
}
|
||||
|
||||
testConductor.enter("after")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -15,11 +15,11 @@ import akka.util.Duration
|
|||
object MultiNodeClusterSpec {
|
||||
def clusterConfig: Config = ConfigFactory.parseString("""
|
||||
akka.cluster {
|
||||
auto-down = off
|
||||
gossip-frequency = 200 ms
|
||||
leader-actions-frequency = 200 ms
|
||||
unreachable-nodes-reaper-frequency = 200 ms
|
||||
periodic-tasks-initial-delay = 300 ms
|
||||
auto-down = off
|
||||
gossip-interval = 200 ms
|
||||
leader-actions-interval = 200 ms
|
||||
unreachable-nodes-reaper-interval = 200 ms
|
||||
periodic-tasks-initial-delay = 300 ms
|
||||
}
|
||||
akka.test {
|
||||
single-expect-default = 5 s
|
||||
|
|
@ -29,8 +29,16 @@ object MultiNodeClusterSpec {
|
|||
|
||||
trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒
|
||||
|
||||
/**
|
||||
* Create a cluster node using 'Cluster(system)'.
|
||||
*/
|
||||
def cluster: Cluster = Cluster(system)
|
||||
|
||||
/**
|
||||
* Use this method instead of 'cluster.self'.
|
||||
*/
|
||||
def startClusterNode(): Unit = cluster.self
|
||||
|
||||
/**
|
||||
* Assert that the member addresses match the expected addresses in the
|
||||
* sort order used by the cluster.
|
||||
|
|
|
|||
|
|
@ -0,0 +1,57 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.cluster
|
||||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.BeforeAndAfter
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.testkit._
|
||||
import akka.util.duration._
|
||||
|
||||
object NodeJoinMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
|
||||
commonConfig(
|
||||
debugConfig(on = false)
|
||||
.withFallback(ConfigFactory.parseString("""
|
||||
akka.cluster {
|
||||
leader-actions-interval = 5 s # increase the leader action task interval
|
||||
}
|
||||
""")
|
||||
.withFallback(MultiNodeClusterSpec.clusterConfig)))
|
||||
}
|
||||
|
||||
class NodeJoinMultiJvmNode1 extends NodeJoinSpec
|
||||
class NodeJoinMultiJvmNode2 extends NodeJoinSpec
|
||||
|
||||
abstract class NodeJoinSpec
|
||||
extends MultiNodeSpec(NodeJoinMultiJvmSpec)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import NodeJoinMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 2
|
||||
|
||||
lazy val firstAddress = node(first).address
|
||||
lazy val secondAddress = node(second).address
|
||||
|
||||
"A cluster node" must {
|
||||
"join another cluster and get status JOINING - when sending a 'Join' command" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(first) {
|
||||
startClusterNode()
|
||||
}
|
||||
|
||||
runOn(second) {
|
||||
cluster.join(firstAddress)
|
||||
}
|
||||
|
||||
awaitCond(cluster.latestGossip.members.exists { member ⇒ member.address == secondAddress && member.status == MemberStatus.Joining })
|
||||
|
||||
testConductor.enter("after")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -22,8 +22,10 @@ class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode1 extends NodeLeavingAndEx
|
|||
class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode2 extends NodeLeavingAndExitingAndBeingRemovedSpec
|
||||
class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode3 extends NodeLeavingAndExitingAndBeingRemovedSpec
|
||||
|
||||
abstract class NodeLeavingAndExitingAndBeingRemovedSpec extends MultiNodeSpec(NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec)
|
||||
abstract class NodeLeavingAndExitingAndBeingRemovedSpec
|
||||
extends MultiNodeSpec(NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 3
|
||||
|
|
@ -39,7 +41,7 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec extends MultiNodeSpec(No
|
|||
"be moved to EXITING and then to REMOVED by the reaper" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(first) {
|
||||
cluster.self
|
||||
startClusterNode()
|
||||
}
|
||||
testConductor.enter("first-started")
|
||||
|
||||
|
|
@ -19,8 +19,8 @@ object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig {
|
|||
debugConfig(on = false)
|
||||
.withFallback(ConfigFactory.parseString("""
|
||||
akka.cluster {
|
||||
leader-actions-frequency = 5 s # increase the leader action task frequency to make sure we get a chance to test the LEAVING state
|
||||
unreachable-nodes-reaper-frequency = 30 s # turn "off" reaping to unreachable node set
|
||||
leader-actions-interval = 5 s # increase the leader action task frequency to make sure we get a chance to test the LEAVING state
|
||||
unreachable-nodes-reaper-interval = 30 s
|
||||
}
|
||||
""")
|
||||
.withFallback(MultiNodeClusterSpec.clusterConfig)))
|
||||
|
|
@ -30,8 +30,10 @@ class NodeLeavingAndExitingMultiJvmNode1 extends NodeLeavingAndExitingSpec
|
|||
class NodeLeavingAndExitingMultiJvmNode2 extends NodeLeavingAndExitingSpec
|
||||
class NodeLeavingAndExitingMultiJvmNode3 extends NodeLeavingAndExitingSpec
|
||||
|
||||
abstract class NodeLeavingAndExitingSpec extends MultiNodeSpec(NodeLeavingAndExitingMultiJvmSpec)
|
||||
abstract class NodeLeavingAndExitingSpec
|
||||
extends MultiNodeSpec(NodeLeavingAndExitingMultiJvmSpec)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import NodeLeavingAndExitingMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 3
|
||||
|
|
@ -45,7 +47,7 @@ abstract class NodeLeavingAndExitingSpec extends MultiNodeSpec(NodeLeavingAndExi
|
|||
"be moved to EXITING by the leader" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(first) {
|
||||
cluster.self
|
||||
startClusterNode()
|
||||
}
|
||||
testConductor.enter("first-started")
|
||||
|
||||
|
|
@ -63,7 +65,7 @@ abstract class NodeLeavingAndExitingSpec extends MultiNodeSpec(NodeLeavingAndExi
|
|||
runOn(first, third) {
|
||||
|
||||
// 1. Verify that 'second' node is set to LEAVING
|
||||
// We have set the 'leader-actions-frequency' to 5 seconds to make sure that we get a
|
||||
// We have set the 'leader-actions-interval' to 5 seconds to make sure that we get a
|
||||
// chance to test the LEAVING state before the leader moves the node to EXITING
|
||||
awaitCond(cluster.latestGossip.members.exists(_.status == MemberStatus.Leaving)) // wait on LEAVING
|
||||
val hasLeft = cluster.latestGossip.members.find(_.status == MemberStatus.Leaving) // verify node that left
|
||||
|
|
@ -17,7 +17,7 @@ object NodeLeavingMultiJvmSpec extends MultiNodeConfig {
|
|||
commonConfig(
|
||||
debugConfig(on = false)
|
||||
.withFallback(ConfigFactory.parseString("""
|
||||
akka.cluster.unreachable-nodes-reaper-frequency = 30 s # turn "off" reaping to unreachable node set
|
||||
akka.cluster.unreachable-nodes-reaper-frequency = 30 s
|
||||
"""))
|
||||
.withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
}
|
||||
|
|
@ -41,7 +41,7 @@ abstract class NodeLeavingSpec extends MultiNodeSpec(NodeLeavingMultiJvmSpec)
|
|||
"be marked as LEAVING in the converged membership table" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(first) {
|
||||
cluster.self
|
||||
startClusterNode()
|
||||
}
|
||||
testConductor.enter("first-started")
|
||||
|
||||
|
|
@ -14,14 +14,16 @@ object NodeMembershipMultiJvmSpec extends MultiNodeConfig {
|
|||
val third = role("third")
|
||||
|
||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
|
||||
}
|
||||
|
||||
class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec
|
||||
class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec
|
||||
class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec
|
||||
|
||||
abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSpec) with MultiNodeClusterSpec {
|
||||
abstract class NodeMembershipSpec
|
||||
extends MultiNodeSpec(NodeMembershipMultiJvmSpec)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import NodeMembershipMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 3
|
||||
|
|
@ -36,7 +38,7 @@ abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSp
|
|||
|
||||
// make sure that the node-to-join is started before other join
|
||||
runOn(first) {
|
||||
cluster.self
|
||||
startClusterNode()
|
||||
}
|
||||
testConductor.enter("first-started")
|
||||
|
||||
|
|
@ -51,7 +53,6 @@ abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSp
|
|||
}
|
||||
|
||||
testConductor.enter("after-1")
|
||||
|
||||
}
|
||||
|
||||
"(when three nodes) start gossiping to each other so that all nodes gets the same gossip info" taggedAs LongRunningTest in {
|
||||
|
|
@ -68,8 +69,6 @@ abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSp
|
|||
awaitCond(cluster.convergence.isDefined)
|
||||
|
||||
testConductor.enter("after-2")
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -37,7 +37,7 @@ abstract class NodeShutdownSpec extends MultiNodeSpec(NodeShutdownMultiJvmSpec)
|
|||
"not be singleton cluster when joined" taggedAs LongRunningTest in {
|
||||
// make sure that the node-to-join is started before other join
|
||||
runOn(first) {
|
||||
cluster.self
|
||||
startClusterNode()
|
||||
}
|
||||
testConductor.enter("first-started")
|
||||
|
||||
|
|
@ -63,5 +63,4 @@ abstract class NodeShutdownSpec extends MultiNodeSpec(NodeShutdownMultiJvmSpec)
|
|||
testConductor.enter("after-2")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,61 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.cluster
|
||||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.testkit._
|
||||
|
||||
object NodeStartupMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
|
||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
|
||||
}
|
||||
|
||||
class NodeStartupMultiJvmNode1 extends NodeStartupSpec
|
||||
class NodeStartupMultiJvmNode2 extends NodeStartupSpec
|
||||
|
||||
abstract class NodeStartupSpec extends MultiNodeSpec(NodeStartupMultiJvmSpec) with MultiNodeClusterSpec {
|
||||
import NodeStartupMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 2
|
||||
|
||||
lazy val firstAddress = node(first).address
|
||||
lazy val secondAddress = node(second).address
|
||||
|
||||
"A first cluster node with a 'node-to-join' config set to empty string (singleton cluster)" must {
|
||||
|
||||
"be a singleton cluster when started up" taggedAs LongRunningTest in {
|
||||
runOn(first) {
|
||||
awaitCond(cluster.isSingletonCluster)
|
||||
awaitUpConvergence(numberOfMembers = 1)
|
||||
assertLeader(first)
|
||||
}
|
||||
testConductor.enter("after-1")
|
||||
}
|
||||
}
|
||||
|
||||
"A second cluster node" must {
|
||||
"join the other node cluster when sending a Join command" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(second) {
|
||||
cluster.join(firstAddress)
|
||||
}
|
||||
|
||||
awaitCond {
|
||||
cluster.latestGossip.members.exists { member ⇒
|
||||
member.address == secondAddress && member.status == MemberStatus.Up
|
||||
}
|
||||
}
|
||||
cluster.latestGossip.members.size must be(2)
|
||||
awaitCond(cluster.convergence.isDefined)
|
||||
assertLeader(first, second)
|
||||
testConductor.enter("after-2")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,50 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.cluster
|
||||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.BeforeAndAfter
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.testkit._
|
||||
import akka.util.duration._
|
||||
|
||||
object NodeUpMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
|
||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
}
|
||||
|
||||
class NodeUpMultiJvmNode1 extends NodeUpSpec
|
||||
class NodeUpMultiJvmNode2 extends NodeUpSpec
|
||||
|
||||
abstract class NodeUpSpec
|
||||
extends MultiNodeSpec(NodeUpMultiJvmSpec)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import NodeUpMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 2
|
||||
|
||||
lazy val firstAddress = node(first).address
|
||||
lazy val secondAddress = node(second).address
|
||||
|
||||
"A cluster node that is joining another cluster" must {
|
||||
"be moved to UP by the leader after a convergence" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(first) {
|
||||
startClusterNode()
|
||||
}
|
||||
|
||||
runOn(second) {
|
||||
cluster.join(firstAddress)
|
||||
}
|
||||
|
||||
awaitUpConvergence(numberOfMembers = 2)
|
||||
|
||||
testConductor.enter("after")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -20,9 +20,9 @@ class ClusterConfigSpec extends AkkaSpec {
|
|||
FailureDetectorMaxSampleSize must be(1000)
|
||||
NodeToJoin must be(None)
|
||||
PeriodicTasksInitialDelay must be(1 seconds)
|
||||
GossipFrequency must be(1 second)
|
||||
LeaderActionsFrequency must be(1 second)
|
||||
UnreachableNodesReaperFrequency must be(1 second)
|
||||
GossipInterval must be(1 second)
|
||||
LeaderActionsInterval must be(1 second)
|
||||
UnreachableNodesReaperInterval must be(1 second)
|
||||
NrOfGossipDaemons must be(4)
|
||||
NrOfDeputyNodes must be(3)
|
||||
AutoDown must be(true)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue