Misc changes, fixes and improvements after review.
- Renamed all 'frequency' to 'interval' - Split up NodeJoinAndUpSpec and into NodeJoinSpec and NodeUpSpec. - Split up MembershipChangeListenerJoinAndUpSpec and into MembershipChangeListenerJoinSpec and MembershipChangeListenerUpSpec. - Added utility method 'startClusterNode()' - Fixed race in register listener and telling node to leave - Removed 'after' blocks - Cleaned up unused code - Improved comments Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
parent
20be83d0eb
commit
391fed6594
23 changed files with 289 additions and 174 deletions
|
|
@ -25,13 +25,13 @@ akka {
|
|||
periodic-tasks-initial-delay = 1s
|
||||
|
||||
# how often should the node send out gossip information?
|
||||
gossip-frequency = 1s
|
||||
gossip-interval = 1s
|
||||
|
||||
# how often should the leader perform maintenance tasks?
|
||||
leader-actions-frequency = 1s
|
||||
leader-actions-interval = 1s
|
||||
|
||||
# how often should the node move nodes, marked as unreachable by the failure detector, out of the membership ring?
|
||||
unreachable-nodes-reaper-frequency = 1s
|
||||
unreachable-nodes-reaper-interval = 1s
|
||||
|
||||
# accrual failure detection config
|
||||
failure-detector {
|
||||
|
|
|
|||
|
|
@ -380,9 +380,9 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
|||
private val vclockNode = VectorClock.Node(selfAddress.toString)
|
||||
|
||||
private val periodicTasksInitialDelay = clusterSettings.PeriodicTasksInitialDelay
|
||||
private val gossipFrequency = clusterSettings.GossipFrequency
|
||||
private val leaderActionsFrequency = clusterSettings.LeaderActionsFrequency
|
||||
private val unreachableNodesReaperFrequency = clusterSettings.UnreachableNodesReaperFrequency
|
||||
private val gossipInterval = clusterSettings.GossipInterval
|
||||
private val leaderActionsInterval = clusterSettings.LeaderActionsInterval
|
||||
private val unreachableNodesReaperInterval = clusterSettings.UnreachableNodesReaperInterval
|
||||
|
||||
implicit private val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout)
|
||||
|
||||
|
|
@ -424,17 +424,17 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
|||
// ========================================================
|
||||
|
||||
// start periodic gossip to random nodes in cluster
|
||||
private val gossipCanceller = system.scheduler.schedule(periodicTasksInitialDelay, gossipFrequency) {
|
||||
private val gossipCanceller = system.scheduler.schedule(periodicTasksInitialDelay, gossipInterval) {
|
||||
gossip()
|
||||
}
|
||||
|
||||
// start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list)
|
||||
private val failureDetectorReaperCanceller = system.scheduler.schedule(periodicTasksInitialDelay, unreachableNodesReaperFrequency) {
|
||||
private val failureDetectorReaperCanceller = system.scheduler.schedule(periodicTasksInitialDelay, unreachableNodesReaperInterval) {
|
||||
reapUnreachableMembers()
|
||||
}
|
||||
|
||||
// start periodic leader action management (only applies for the current leader)
|
||||
private val leaderActionsCanceller = system.scheduler.schedule(periodicTasksInitialDelay, leaderActionsFrequency) {
|
||||
private val leaderActionsCanceller = system.scheduler.schedule(periodicTasksInitialDelay, leaderActionsInterval) {
|
||||
leaderActions()
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -20,9 +20,9 @@ class ClusterSettings(val config: Config, val systemName: String) {
|
|||
case AddressFromURIString(addr) ⇒ Some(addr)
|
||||
}
|
||||
val PeriodicTasksInitialDelay = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS)
|
||||
val GossipFrequency = Duration(getMilliseconds("akka.cluster.gossip-frequency"), MILLISECONDS)
|
||||
val LeaderActionsFrequency = Duration(getMilliseconds("akka.cluster.leader-actions-frequency"), MILLISECONDS)
|
||||
val UnreachableNodesReaperFrequency = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-frequency"), MILLISECONDS)
|
||||
val GossipInterval = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS)
|
||||
val LeaderActionsInterval = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS)
|
||||
val UnreachableNodesReaperInterval = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS)
|
||||
val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons")
|
||||
val NrOfDeputyNodes = getInt("akka.cluster.nr-of-deputy-nodes")
|
||||
val AutoDown = getBoolean("akka.cluster.auto-down")
|
||||
|
|
|
|||
|
|
@ -26,8 +26,8 @@ class ClientDowningNodeThatIsUnreachableMultiJvmNode4 extends ClientDowningNodeT
|
|||
|
||||
class ClientDowningNodeThatIsUnreachableSpec
|
||||
extends MultiNodeSpec(ClientDowningNodeThatIsUnreachableMultiJvmSpec)
|
||||
with MultiNodeClusterSpec
|
||||
with ImplicitSender with BeforeAndAfter {
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import ClientDowningNodeThatIsUnreachableMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 4
|
||||
|
|
@ -36,7 +36,7 @@ class ClientDowningNodeThatIsUnreachableSpec
|
|||
|
||||
"be able to DOWN a node that is UNREACHABLE (killed)" taggedAs LongRunningTest in {
|
||||
runOn(first) {
|
||||
cluster.self
|
||||
startClusterNode()
|
||||
awaitUpConvergence(numberOfMembers = 4)
|
||||
|
||||
val thirdAddress = node(third).address
|
||||
|
|
|
|||
|
|
@ -26,8 +26,8 @@ class ClientDowningNodeThatIsUpMultiJvmNode4 extends ClientDowningNodeThatIsUpSp
|
|||
|
||||
class ClientDowningNodeThatIsUpSpec
|
||||
extends MultiNodeSpec(ClientDowningNodeThatIsUpMultiJvmSpec)
|
||||
with MultiNodeClusterSpec
|
||||
with ImplicitSender with BeforeAndAfter {
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import ClientDowningNodeThatIsUpMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 4
|
||||
|
|
@ -36,7 +36,7 @@ class ClientDowningNodeThatIsUpSpec
|
|||
|
||||
"be able to DOWN a node that is UP (healthy and available)" taggedAs LongRunningTest in {
|
||||
runOn(first) {
|
||||
cluster.self
|
||||
startClusterNode()
|
||||
awaitUpConvergence(numberOfMembers = 4)
|
||||
|
||||
val thirdAddress = node(third).address
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ abstract class GossipingAccrualFailureDetectorSpec extends MultiNodeSpec(Gossipi
|
|||
"receive gossip heartbeats so that all member nodes in the cluster are marked 'available'" taggedAs LongRunningTest in {
|
||||
// make sure that the node-to-join is started before other join
|
||||
runOn(first) {
|
||||
cluster.self
|
||||
startClusterNode()
|
||||
}
|
||||
testConductor.enter("first-started")
|
||||
|
||||
|
|
|
|||
|
|
@ -27,7 +27,10 @@ class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec
|
|||
class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec
|
||||
class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec
|
||||
|
||||
abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender {
|
||||
abstract class JoinTwoClustersSpec
|
||||
extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import JoinTwoClustersMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 6
|
||||
|
|
@ -41,7 +44,7 @@ abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvm
|
|||
"be able to 'elect' a single leader after joining (A -> B)" taggedAs LongRunningTest in {
|
||||
// make sure that the node-to-join is started before other join
|
||||
runOn(a1, b1, c1) {
|
||||
cluster.self
|
||||
startClusterNode()
|
||||
}
|
||||
testConductor.enter("first-started")
|
||||
|
||||
|
|
|
|||
|
|
@ -34,8 +34,8 @@ class LeaderDowningNodeThatIsUnreachableMultiJvmNode4 extends LeaderDowningNodeT
|
|||
|
||||
class LeaderDowningNodeThatIsUnreachableSpec
|
||||
extends MultiNodeSpec(LeaderDowningNodeThatIsUnreachableMultiJvmSpec)
|
||||
with MultiNodeClusterSpec
|
||||
with ImplicitSender with BeforeAndAfter {
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import LeaderDowningNodeThatIsUnreachableMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 4
|
||||
|
|
@ -44,7 +44,7 @@ class LeaderDowningNodeThatIsUnreachableSpec
|
|||
|
||||
"be able to DOWN a 'last' node that is UNREACHABLE" taggedAs LongRunningTest in {
|
||||
runOn(first) {
|
||||
cluster.self
|
||||
startClusterNode()
|
||||
awaitUpConvergence(numberOfMembers = 4)
|
||||
|
||||
val fourthAddress = node(fourth).address
|
||||
|
|
|
|||
|
|
@ -26,7 +26,10 @@ class LeaderElectionMultiJvmNode3 extends LeaderElectionSpec
|
|||
class LeaderElectionMultiJvmNode4 extends LeaderElectionSpec
|
||||
class LeaderElectionMultiJvmNode5 extends LeaderElectionSpec
|
||||
|
||||
abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSpec) with MultiNodeClusterSpec {
|
||||
abstract class LeaderElectionSpec
|
||||
extends MultiNodeSpec(LeaderElectionMultiJvmSpec)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import LeaderElectionMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 5
|
||||
|
|
@ -41,7 +44,7 @@ abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSp
|
|||
"be able to 'elect' a single leader" taggedAs LongRunningTest in {
|
||||
// make sure that the node-to-join is started before other join
|
||||
runOn(first) {
|
||||
cluster.self
|
||||
startClusterNode()
|
||||
}
|
||||
testConductor.enter("first-started")
|
||||
|
||||
|
|
|
|||
|
|
@ -20,8 +20,8 @@ object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig {
|
|||
debugConfig(on = false)
|
||||
.withFallback(ConfigFactory.parseString("""
|
||||
akka.cluster {
|
||||
leader-actions-frequency = 5000 ms # increase the leader action task frequency
|
||||
unreachable-nodes-reaper-frequency = 30000 ms # turn "off" reaping to unreachable node set
|
||||
leader-actions-interval = 5 s # increase the leader action task interval
|
||||
unreachable-nodes-reaper-interval = 30 s # turn "off" reaping to unreachable node set
|
||||
}
|
||||
""")
|
||||
.withFallback(MultiNodeClusterSpec.clusterConfig)))
|
||||
|
|
@ -31,8 +31,10 @@ class MembershipChangeListenerExitingMultiJvmNode1 extends MembershipChangeListe
|
|||
class MembershipChangeListenerExitingMultiJvmNode2 extends MembershipChangeListenerExitingSpec
|
||||
class MembershipChangeListenerExitingMultiJvmNode3 extends MembershipChangeListenerExitingSpec
|
||||
|
||||
abstract class MembershipChangeListenerExitingSpec extends MultiNodeSpec(MembershipChangeListenerExitingMultiJvmSpec)
|
||||
with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter {
|
||||
abstract class MembershipChangeListenerExitingSpec
|
||||
extends MultiNodeSpec(MembershipChangeListenerExitingMultiJvmSpec)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import MembershipChangeListenerExitingMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 3
|
||||
|
|
@ -45,7 +47,7 @@ abstract class MembershipChangeListenerExitingSpec extends MultiNodeSpec(Members
|
|||
"be notified when new node is EXITING" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(first) {
|
||||
cluster.self
|
||||
startClusterNode()
|
||||
}
|
||||
testConductor.enter("first-started")
|
||||
|
||||
|
|
@ -55,21 +57,27 @@ abstract class MembershipChangeListenerExitingSpec extends MultiNodeSpec(Members
|
|||
awaitUpConvergence(numberOfMembers = 3)
|
||||
testConductor.enter("rest-started")
|
||||
|
||||
runOn(first) {
|
||||
testConductor.enter("registered-listener")
|
||||
cluster.leave(secondAddress)
|
||||
}
|
||||
|
||||
runOn(second) {
|
||||
testConductor.enter("registered-listener")
|
||||
}
|
||||
|
||||
runOn(third) {
|
||||
val exitingLatch = TestLatch()
|
||||
cluster.registerListener(new MembershipChangeListener {
|
||||
def notify(members: SortedSet[Member]) {
|
||||
if (members.size == 3 && members.exists(_.status == MemberStatus.Exiting))
|
||||
if (members.size == 3 && members.exists( m => m.address == secondAddress && m.status == MemberStatus.Exiting))
|
||||
exitingLatch.countDown()
|
||||
}
|
||||
})
|
||||
testConductor.enter("registered-listener")
|
||||
exitingLatch.await
|
||||
}
|
||||
|
||||
runOn(first) {
|
||||
cluster.leave(secondAddress)
|
||||
}
|
||||
|
||||
testConductor.enter("finished")
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ import akka.remote.testkit.MultiNodeSpec
|
|||
import akka.testkit._
|
||||
import akka.util.duration._
|
||||
|
||||
object MembershipChangeListenerJoinAndUpMultiJvmSpec extends MultiNodeConfig {
|
||||
object MembershipChangeListenerJoinMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
|
||||
|
|
@ -19,46 +19,39 @@ object MembershipChangeListenerJoinAndUpMultiJvmSpec extends MultiNodeConfig {
|
|||
debugConfig(on = false)
|
||||
.withFallback(ConfigFactory.parseString("""
|
||||
akka.cluster {
|
||||
gossip-frequency = 1000 ms
|
||||
leader-actions-frequency = 5000 ms # increase the leader action task frequency
|
||||
leader-actions-interval = 5 s # increase the leader action task interval to allow time checking for JOIN before leader moves it to UP
|
||||
}
|
||||
""")
|
||||
.withFallback(MultiNodeClusterSpec.clusterConfig)))
|
||||
}
|
||||
|
||||
class MembershipChangeListenerJoinAndUpMultiJvmNode1 extends MembershipChangeListenerJoinAndUpSpec
|
||||
class MembershipChangeListenerJoinAndUpMultiJvmNode2 extends MembershipChangeListenerJoinAndUpSpec
|
||||
class MembershipChangeListenerJoinMultiJvmNode1 extends MembershipChangeListenerJoinSpec
|
||||
class MembershipChangeListenerJoinMultiJvmNode2 extends MembershipChangeListenerJoinSpec
|
||||
|
||||
abstract class MembershipChangeListenerJoinAndUpSpec
|
||||
extends MultiNodeSpec(MembershipChangeListenerJoinAndUpMultiJvmSpec)
|
||||
with MultiNodeClusterSpec
|
||||
with ImplicitSender
|
||||
with BeforeAndAfter {
|
||||
abstract class MembershipChangeListenerJoinSpec
|
||||
extends MultiNodeSpec(MembershipChangeListenerJoinMultiJvmSpec)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import MembershipChangeListenerJoinAndUpMultiJvmSpec._
|
||||
import MembershipChangeListenerJoinMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 2
|
||||
|
||||
after {
|
||||
testConductor.enter("after")
|
||||
}
|
||||
|
||||
lazy val firstAddress = node(first).address
|
||||
lazy val secondAddress = node(second).address
|
||||
|
||||
"A registered MembershipChangeListener" must {
|
||||
"be notified when new node is JOINING and node is marked as UP by the leader" taggedAs LongRunningTest in {
|
||||
"be notified when new node is JOINING" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(first) {
|
||||
cluster.self
|
||||
startClusterNode()
|
||||
}
|
||||
|
||||
runOn(second) {
|
||||
testConductor.enter("registered-listener")
|
||||
cluster.join(firstAddress)
|
||||
}
|
||||
|
||||
runOn(first) {
|
||||
// JOINING
|
||||
val joinLatch = TestLatch()
|
||||
cluster.registerListener(new MembershipChangeListener {
|
||||
def notify(members: SortedSet[Member]) {
|
||||
|
|
@ -66,20 +59,13 @@ abstract class MembershipChangeListenerJoinAndUpSpec
|
|||
joinLatch.countDown()
|
||||
}
|
||||
})
|
||||
testConductor.enter("registered-listener")
|
||||
|
||||
joinLatch.await
|
||||
cluster.convergence.isDefined must be(true)
|
||||
}
|
||||
|
||||
// UP
|
||||
val upLatch = TestLatch()
|
||||
cluster.registerListener(new MembershipChangeListener {
|
||||
def notify(members: SortedSet[Member]) {
|
||||
if (members.size == 2 && members.forall(_.status == MemberStatus.Up))
|
||||
upLatch.countDown()
|
||||
}
|
||||
})
|
||||
upLatch.await
|
||||
awaitCond(cluster.convergence.isDefined)
|
||||
}
|
||||
testConductor.enter("after")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -18,8 +18,8 @@ object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig {
|
|||
commonConfig(
|
||||
debugConfig(on = false)
|
||||
.withFallback(ConfigFactory.parseString("""
|
||||
akka.cluster.leader-actions-frequency = 5000 ms
|
||||
akka.cluster.unreachable-nodes-reaper-frequency = 30000 ms # turn "off" reaping to unreachable node set
|
||||
akka.cluster.leader-actions-interval = 5 s
|
||||
akka.cluster.unreachable-nodes-reaper-interval = 30 s
|
||||
"""))
|
||||
.withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
}
|
||||
|
|
@ -28,8 +28,10 @@ class MembershipChangeListenerLeavingMultiJvmNode1 extends MembershipChangeListe
|
|||
class MembershipChangeListenerLeavingMultiJvmNode2 extends MembershipChangeListenerLeavingSpec
|
||||
class MembershipChangeListenerLeavingMultiJvmNode3 extends MembershipChangeListenerLeavingSpec
|
||||
|
||||
abstract class MembershipChangeListenerLeavingSpec extends MultiNodeSpec(MembershipChangeListenerLeavingMultiJvmSpec)
|
||||
with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter {
|
||||
abstract class MembershipChangeListenerLeavingSpec
|
||||
extends MultiNodeSpec(MembershipChangeListenerLeavingMultiJvmSpec)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import MembershipChangeListenerLeavingMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 3
|
||||
|
|
@ -42,7 +44,7 @@ abstract class MembershipChangeListenerLeavingSpec extends MultiNodeSpec(Members
|
|||
"be notified when new node is LEAVING" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(first) {
|
||||
cluster.self
|
||||
startClusterNode()
|
||||
}
|
||||
testConductor.enter("first-started")
|
||||
|
||||
|
|
@ -52,21 +54,27 @@ abstract class MembershipChangeListenerLeavingSpec extends MultiNodeSpec(Members
|
|||
awaitUpConvergence(numberOfMembers = 3)
|
||||
testConductor.enter("rest-started")
|
||||
|
||||
runOn(first) {
|
||||
testConductor.enter("registered-listener")
|
||||
cluster.leave(secondAddress)
|
||||
}
|
||||
|
||||
runOn(second) {
|
||||
testConductor.enter("registered-listener")
|
||||
}
|
||||
|
||||
runOn(third) {
|
||||
val latch = TestLatch()
|
||||
cluster.registerListener(new MembershipChangeListener {
|
||||
def notify(members: SortedSet[Member]) {
|
||||
if (members.size == 3 && members.exists(_.status == MemberStatus.Leaving))
|
||||
if (members.size == 3 && members.exists( m => m.address == secondAddress && m.status == MemberStatus.Leaving))
|
||||
latch.countDown()
|
||||
}
|
||||
})
|
||||
testConductor.enter("registered-listener")
|
||||
latch.await
|
||||
}
|
||||
|
||||
runOn(first) {
|
||||
cluster.leave(secondAddress)
|
||||
}
|
||||
|
||||
testConductor.enter("finished")
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,64 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.cluster
|
||||
|
||||
import scala.collection.immutable.SortedSet
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.BeforeAndAfter
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.testkit._
|
||||
import akka.util.duration._
|
||||
|
||||
object MembershipChangeListenerUpMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
|
||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
}
|
||||
|
||||
class MembershipChangeListenerUpMultiJvmNode1 extends MembershipChangeListenerUpSpec
|
||||
class MembershipChangeListenerUpMultiJvmNode2 extends MembershipChangeListenerUpSpec
|
||||
|
||||
abstract class MembershipChangeListenerUpSpec
|
||||
extends MultiNodeSpec(MembershipChangeListenerUpMultiJvmSpec)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import MembershipChangeListenerUpMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 2
|
||||
|
||||
lazy val firstAddress = node(first).address
|
||||
lazy val secondAddress = node(second).address
|
||||
|
||||
"A registered MembershipChangeListener" must {
|
||||
"be notified when new node is marked as UP by the leader" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(first) {
|
||||
startClusterNode()
|
||||
}
|
||||
|
||||
runOn(second) {
|
||||
testConductor.enter("registered-listener")
|
||||
cluster.join(firstAddress)
|
||||
}
|
||||
|
||||
runOn(first) {
|
||||
val upLatch = TestLatch()
|
||||
cluster.registerListener(new MembershipChangeListener {
|
||||
def notify(members: SortedSet[Member]) {
|
||||
if (members.size == 2 && members.forall(_.status == MemberStatus.Up))
|
||||
upLatch.countDown()
|
||||
}
|
||||
})
|
||||
testConductor.enter("registered-listener")
|
||||
|
||||
upLatch.await
|
||||
awaitUpConvergence(numberOfMembers = 2)
|
||||
}
|
||||
|
||||
testConductor.enter("after")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -16,9 +16,9 @@ object MultiNodeClusterSpec {
|
|||
def clusterConfig: Config = ConfigFactory.parseString("""
|
||||
akka.cluster {
|
||||
auto-down = off
|
||||
gossip-frequency = 200 ms
|
||||
leader-actions-frequency = 200 ms
|
||||
unreachable-nodes-reaper-frequency = 200 ms
|
||||
gossip-interval = 200 ms
|
||||
leader-actions-interval = 200 ms
|
||||
unreachable-nodes-reaper-interval = 200 ms
|
||||
periodic-tasks-initial-delay = 300 ms
|
||||
}
|
||||
akka.test {
|
||||
|
|
@ -29,8 +29,16 @@ object MultiNodeClusterSpec {
|
|||
|
||||
trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒
|
||||
|
||||
/**
|
||||
* Create a cluster node using 'Cluster(system)'.
|
||||
*/
|
||||
def cluster: Cluster = Cluster(system)
|
||||
|
||||
/**
|
||||
* Use this method instead of 'cluster.self'.
|
||||
*/
|
||||
def startClusterNode(): Unit = cluster.self
|
||||
|
||||
/**
|
||||
* Assert that the member addresses match the expected addresses in the
|
||||
* sort order used by the cluster.
|
||||
|
|
|
|||
|
|
@ -1,76 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.cluster
|
||||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.BeforeAndAfter
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.testkit._
|
||||
import akka.util.duration._
|
||||
|
||||
object NodeJoinAndUpMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
|
||||
commonConfig(
|
||||
debugConfig(on = false)
|
||||
.withFallback(ConfigFactory.parseString("""
|
||||
akka.cluster {
|
||||
gossip-frequency = 1000 ms
|
||||
leader-actions-frequency = 5000 ms # increase the leader action task frequency
|
||||
}
|
||||
""")
|
||||
.withFallback(MultiNodeClusterSpec.clusterConfig)))
|
||||
}
|
||||
|
||||
class NodeJoinAndUpMultiJvmNode1 extends NodeJoinAndUpSpec
|
||||
class NodeJoinAndUpMultiJvmNode2 extends NodeJoinAndUpSpec
|
||||
|
||||
abstract class NodeJoinAndUpSpec
|
||||
extends MultiNodeSpec(NodeJoinAndUpMultiJvmSpec)
|
||||
with MultiNodeClusterSpec
|
||||
with ImplicitSender
|
||||
with BeforeAndAfter {
|
||||
|
||||
import NodeJoinAndUpMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 2
|
||||
|
||||
after {
|
||||
testConductor.enter("after")
|
||||
}
|
||||
|
||||
lazy val firstAddress = node(first).address
|
||||
lazy val secondAddress = node(second).address
|
||||
|
||||
"A first cluster node with a 'node-to-join' config set to empty string (singleton cluster)" must {
|
||||
|
||||
"be a singleton cluster when started up" taggedAs LongRunningTest in {
|
||||
runOn(first) {
|
||||
awaitCond(cluster.isSingletonCluster)
|
||||
awaitUpConvergence(numberOfMembers = 1)
|
||||
cluster.isLeader must be(true)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"A second cluster node" must {
|
||||
"join the cluster as JOINING - when sending a 'Join' command - and then be moved to UP by the leader" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(second) {
|
||||
cluster.join(firstAddress)
|
||||
}
|
||||
|
||||
awaitCond(cluster.latestGossip.members.exists { member ⇒ member.address == secondAddress && member.status == MemberStatus.Joining })
|
||||
|
||||
awaitCond(
|
||||
cluster.latestGossip.members.exists { member ⇒ member.address == secondAddress && member.status == MemberStatus.Up },
|
||||
30.seconds.dilated) // waiting for the leader to move from JOINING -> UP (frequency set to 5 sec in config)
|
||||
|
||||
cluster.latestGossip.members.size must be(2)
|
||||
awaitCond(cluster.convergence.isDefined)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,57 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.cluster
|
||||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.BeforeAndAfter
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.testkit._
|
||||
import akka.util.duration._
|
||||
|
||||
object NodeJoinMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
|
||||
commonConfig(
|
||||
debugConfig(on = false)
|
||||
.withFallback(ConfigFactory.parseString("""
|
||||
akka.cluster {
|
||||
leader-actions-interval = 5 s # increase the leader action task interval
|
||||
}
|
||||
""")
|
||||
.withFallback(MultiNodeClusterSpec.clusterConfig)))
|
||||
}
|
||||
|
||||
class NodeJoinMultiJvmNode1 extends NodeJoinSpec
|
||||
class NodeJoinMultiJvmNode2 extends NodeJoinSpec
|
||||
|
||||
abstract class NodeJoinSpec
|
||||
extends MultiNodeSpec(NodeJoinMultiJvmSpec)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import NodeJoinMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 2
|
||||
|
||||
lazy val firstAddress = node(first).address
|
||||
lazy val secondAddress = node(second).address
|
||||
|
||||
"A cluster node" must {
|
||||
"join another cluster and get status JOINING - when sending a 'Join' command" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(first) {
|
||||
startClusterNode()
|
||||
}
|
||||
|
||||
runOn(second) {
|
||||
cluster.join(firstAddress)
|
||||
}
|
||||
|
||||
awaitCond(cluster.latestGossip.members.exists { member ⇒ member.address == secondAddress && member.status == MemberStatus.Joining })
|
||||
|
||||
testConductor.enter("after")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -40,7 +40,7 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec extends MultiNodeSpec(No
|
|||
"be moved to EXITING and then to REMOVED by the reaper" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(first) {
|
||||
cluster.self
|
||||
startClusterNode()
|
||||
}
|
||||
testConductor.enter("first-started")
|
||||
|
||||
|
|
|
|||
|
|
@ -20,8 +20,8 @@ object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig {
|
|||
debugConfig(on = false)
|
||||
.withFallback(ConfigFactory.parseString("""
|
||||
akka.cluster {
|
||||
leader-actions-frequency = 5 s # increase the leader action task frequency to make sure we get a chance to test the LEAVING state
|
||||
unreachable-nodes-reaper-frequency = 30 s # turn "off" reaping to unreachable node set
|
||||
leader-actions-interval = 5 s # increase the leader action task frequency to make sure we get a chance to test the LEAVING state
|
||||
unreachable-nodes-reaper-interval = 30 s # turn "off" reaping to unreachable node set
|
||||
}
|
||||
""")
|
||||
.withFallback(MultiNodeClusterSpec.clusterConfig)))
|
||||
|
|
@ -31,8 +31,10 @@ class NodeLeavingAndExitingMultiJvmNode1 extends NodeLeavingAndExitingSpec
|
|||
class NodeLeavingAndExitingMultiJvmNode2 extends NodeLeavingAndExitingSpec
|
||||
class NodeLeavingAndExitingMultiJvmNode3 extends NodeLeavingAndExitingSpec
|
||||
|
||||
abstract class NodeLeavingAndExitingSpec extends MultiNodeSpec(NodeLeavingAndExitingMultiJvmSpec)
|
||||
with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter {
|
||||
abstract class NodeLeavingAndExitingSpec
|
||||
extends MultiNodeSpec(NodeLeavingAndExitingMultiJvmSpec)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import NodeLeavingAndExitingMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 3
|
||||
|
|
@ -46,7 +48,7 @@ abstract class NodeLeavingAndExitingSpec extends MultiNodeSpec(NodeLeavingAndExi
|
|||
"be moved to EXITING by the leader" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(first) {
|
||||
cluster.self
|
||||
startClusterNode()
|
||||
}
|
||||
testConductor.enter("first-started")
|
||||
|
||||
|
|
@ -64,7 +66,7 @@ abstract class NodeLeavingAndExitingSpec extends MultiNodeSpec(NodeLeavingAndExi
|
|||
runOn(first, third) {
|
||||
|
||||
// 1. Verify that 'second' node is set to LEAVING
|
||||
// We have set the 'leader-actions-frequency' to 5 seconds to make sure that we get a
|
||||
// We have set the 'leader-actions-interval' to 5 seconds to make sure that we get a
|
||||
// chance to test the LEAVING state before the leader moves the node to EXITING
|
||||
awaitCond(cluster.latestGossip.members.exists(_.status == MemberStatus.Leaving)) // wait on LEAVING
|
||||
val hasLeft = cluster.latestGossip.members.find(_.status == MemberStatus.Leaving) // verify node that left
|
||||
|
|
|
|||
|
|
@ -18,8 +18,8 @@ object NodeLeavingMultiJvmSpec extends MultiNodeConfig {
|
|||
commonConfig(
|
||||
debugConfig(on = false)
|
||||
.withFallback(ConfigFactory.parseString("""
|
||||
akka.cluster.leader-actions-frequency = 5 s
|
||||
akka.cluster.unreachable-nodes-reaper-frequency = 30 s # turn "off" reaping to unreachable node set
|
||||
akka.cluster.leader-actions-interval = 5 s
|
||||
akka.cluster.unreachable-nodes-reaper-interval = 30 s # turn "off" reaping to unreachable node set
|
||||
"""))
|
||||
.withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
}
|
||||
|
|
@ -43,7 +43,7 @@ abstract class NodeLeavingSpec extends MultiNodeSpec(NodeLeavingMultiJvmSpec)
|
|||
"be marked as LEAVING in the converged membership table" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(first) {
|
||||
cluster.self
|
||||
startClusterNode()
|
||||
}
|
||||
testConductor.enter("first-started")
|
||||
|
||||
|
|
|
|||
|
|
@ -22,7 +22,11 @@ class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec
|
|||
class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec
|
||||
class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec
|
||||
|
||||
abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter {
|
||||
abstract class NodeMembershipSpec
|
||||
extends MultiNodeSpec(NodeMembershipMultiJvmSpec)
|
||||
with MultiNodeClusterSpec
|
||||
with ImplicitSender with BeforeAndAfter {
|
||||
|
||||
import NodeMembershipMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 3
|
||||
|
|
@ -41,7 +45,7 @@ abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSp
|
|||
|
||||
// make sure that the node-to-join is started before other join
|
||||
runOn(first) {
|
||||
cluster.self
|
||||
startClusterNode()
|
||||
}
|
||||
testConductor.enter("first-started")
|
||||
|
||||
|
|
|
|||
|
|
@ -42,7 +42,7 @@ abstract class NodeShutdownSpec extends MultiNodeSpec(NodeShutdownMultiJvmSpec)
|
|||
"not be singleton cluster when joined" taggedAs LongRunningTest in {
|
||||
// make sure that the node-to-join is started before other join
|
||||
runOn(first) {
|
||||
cluster.self
|
||||
startClusterNode()
|
||||
}
|
||||
testConductor.enter("first-started")
|
||||
|
||||
|
|
@ -63,8 +63,6 @@ abstract class NodeShutdownSpec extends MultiNodeSpec(NodeShutdownMultiJvmSpec)
|
|||
cluster.isSingletonCluster must be(true)
|
||||
assertLeader(first)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,50 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.cluster
|
||||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.BeforeAndAfter
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.testkit._
|
||||
import akka.util.duration._
|
||||
|
||||
object NodeUpMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
|
||||
commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
}
|
||||
|
||||
class NodeUpMultiJvmNode1 extends NodeUpSpec
|
||||
class NodeUpMultiJvmNode2 extends NodeUpSpec
|
||||
|
||||
abstract class NodeUpSpec
|
||||
extends MultiNodeSpec(NodeUpMultiJvmSpec)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import NodeUpMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 2
|
||||
|
||||
lazy val firstAddress = node(first).address
|
||||
lazy val secondAddress = node(second).address
|
||||
|
||||
"A cluster node that is joining another cluster" must {
|
||||
"be moved to UP by the leader after a convergence" taggedAs LongRunningTest in {
|
||||
|
||||
runOn(first) {
|
||||
startClusterNode()
|
||||
}
|
||||
|
||||
runOn(second) {
|
||||
cluster.join(firstAddress)
|
||||
}
|
||||
|
||||
awaitUpConvergence(numberOfMembers = 2)
|
||||
|
||||
testConductor.enter("after")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -20,9 +20,9 @@ class ClusterConfigSpec extends AkkaSpec {
|
|||
FailureDetectorMaxSampleSize must be(1000)
|
||||
NodeToJoin must be(None)
|
||||
PeriodicTasksInitialDelay must be(1 seconds)
|
||||
GossipFrequency must be(1 second)
|
||||
LeaderActionsFrequency must be(1 second)
|
||||
UnreachableNodesReaperFrequency must be(1 second)
|
||||
GossipInterval must be(1 second)
|
||||
LeaderActionsInterval must be(1 second)
|
||||
UnreachableNodesReaperInterval must be(1 second)
|
||||
NrOfGossipDaemons must be(4)
|
||||
NrOfDeputyNodes must be(3)
|
||||
AutoDown must be(true)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue