diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 7dd511e34a..8c905d5b29 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -25,13 +25,13 @@ akka { periodic-tasks-initial-delay = 1s # how often should the node send out gossip information? - gossip-frequency = 1s + gossip-interval = 1s # how often should the leader perform maintenance tasks? - leader-actions-frequency = 1s + leader-actions-interval = 1s # how often should the node move nodes, marked as unreachable by the failure detector, out of the membership ring? - unreachable-nodes-reaper-frequency = 1s + unreachable-nodes-reaper-interval = 1s # accrual failure detection config failure-detector { diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index bb8b385781..0b2b3919f7 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -380,9 +380,9 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ private val vclockNode = VectorClock.Node(selfAddress.toString) private val periodicTasksInitialDelay = clusterSettings.PeriodicTasksInitialDelay - private val gossipFrequency = clusterSettings.GossipFrequency - private val leaderActionsFrequency = clusterSettings.LeaderActionsFrequency - private val unreachableNodesReaperFrequency = clusterSettings.UnreachableNodesReaperFrequency + private val gossipInterval = clusterSettings.GossipInterval + private val leaderActionsInterval = clusterSettings.LeaderActionsInterval + private val unreachableNodesReaperInterval = clusterSettings.UnreachableNodesReaperInterval implicit private val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout) @@ -424,17 +424,17 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ // ======================================================== // start periodic gossip to random nodes in cluster - private val gossipCanceller = system.scheduler.schedule(periodicTasksInitialDelay, gossipFrequency) { + private val gossipCanceller = system.scheduler.schedule(periodicTasksInitialDelay, gossipInterval) { gossip() } // start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list) - private val failureDetectorReaperCanceller = system.scheduler.schedule(periodicTasksInitialDelay, unreachableNodesReaperFrequency) { + private val failureDetectorReaperCanceller = system.scheduler.schedule(periodicTasksInitialDelay, unreachableNodesReaperInterval) { reapUnreachableMembers() } // start periodic leader action management (only applies for the current leader) - private val leaderActionsCanceller = system.scheduler.schedule(periodicTasksInitialDelay, leaderActionsFrequency) { + private val leaderActionsCanceller = system.scheduler.schedule(periodicTasksInitialDelay, leaderActionsInterval) { leaderActions() } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 8e9b9c770d..0e7dac06ab 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -20,9 +20,9 @@ class ClusterSettings(val config: Config, val systemName: String) { case AddressFromURIString(addr) ⇒ Some(addr) } val PeriodicTasksInitialDelay = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS) - val GossipFrequency = Duration(getMilliseconds("akka.cluster.gossip-frequency"), MILLISECONDS) - val LeaderActionsFrequency = Duration(getMilliseconds("akka.cluster.leader-actions-frequency"), MILLISECONDS) - val UnreachableNodesReaperFrequency = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-frequency"), MILLISECONDS) + val GossipInterval = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS) + val LeaderActionsInterval = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS) + val UnreachableNodesReaperInterval = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS) val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons") val NrOfDeputyNodes = getInt("akka.cluster.nr-of-deputy-nodes") val AutoDown = getBoolean("akka.cluster.auto-down") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala index a5ce2d4258..b241899ad6 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala @@ -26,6 +26,7 @@ class ClientDowningNodeThatIsUnreachableMultiJvmNode4 extends ClientDowningNodeT class ClientDowningNodeThatIsUnreachableSpec extends MultiNodeSpec(ClientDowningNodeThatIsUnreachableMultiJvmSpec) with MultiNodeClusterSpec { + import ClientDowningNodeThatIsUnreachableMultiJvmSpec._ override def initialParticipants = 4 @@ -34,7 +35,7 @@ class ClientDowningNodeThatIsUnreachableSpec "be able to DOWN a node that is UNREACHABLE (killed)" taggedAs LongRunningTest in { runOn(first) { - cluster.self + startClusterNode() awaitUpConvergence(numberOfMembers = 4) val thirdAddress = node(third).address diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala index 666c3e207a..ff048a2eda 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala @@ -26,6 +26,7 @@ class ClientDowningNodeThatIsUpMultiJvmNode4 extends ClientDowningNodeThatIsUpSp class ClientDowningNodeThatIsUpSpec extends MultiNodeSpec(ClientDowningNodeThatIsUpMultiJvmSpec) with MultiNodeClusterSpec { + import ClientDowningNodeThatIsUpMultiJvmSpec._ override def initialParticipants = 4 @@ -34,7 +35,7 @@ class ClientDowningNodeThatIsUpSpec "be able to DOWN a node that is UP (healthy and available)" taggedAs LongRunningTest in { runOn(first) { - cluster.self + startClusterNode() awaitUpConvergence(numberOfMembers = 4) val thirdAddress = node(third).address diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala index afaeac747b..27a012d32e 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala @@ -15,7 +15,7 @@ object GossipingAccrualFailureDetectorMultiJvmSpec extends MultiNodeConfig { val third = role("third") commonConfig(debugConfig(on = false). - withFallback(ConfigFactory.parseString("akka.cluster.failure-detector.threshold=4")). + withFallback(ConfigFactory.parseString("akka.cluster.failure-detector.threshold = 4")). withFallback(MultiNodeClusterSpec.clusterConfig)) } @@ -38,7 +38,7 @@ abstract class GossipingAccrualFailureDetectorSpec extends MultiNodeSpec(Gossipi "receive gossip heartbeats so that all member nodes in the cluster are marked 'available'" taggedAs LongRunningTest in { // make sure that the node-to-join is started before other join runOn(first) { - cluster.self + startClusterNode() } testConductor.enter("first-started") @@ -68,5 +68,4 @@ abstract class GossipingAccrualFailureDetectorSpec extends MultiNodeSpec(Gossipi testConductor.enter("after-2") } } - } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala index 9f1395b5dd..e01839684a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala @@ -27,7 +27,10 @@ class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec -abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender { +abstract class JoinTwoClustersSpec + extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) + with MultiNodeClusterSpec { + import JoinTwoClustersMultiJvmSpec._ override def initialParticipants = 6 @@ -41,7 +44,7 @@ abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvm "be able to 'elect' a single leader after joining (A -> B)" taggedAs LongRunningTest in { // make sure that the node-to-join is started before other join runOn(a1, b1, c1) { - cluster.self + startClusterNode() } testConductor.enter("first-started") @@ -75,7 +78,6 @@ abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvm assertLeader(c1, c2) testConductor.enter("four-members") - } "be able to 'elect' a single leader after joining (C -> A + B)" taggedAs LongRunningTest in { @@ -91,5 +93,4 @@ abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvm testConductor.enter("six-members") } } - } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala index dfd8dde310..e8b956e87b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala @@ -34,6 +34,7 @@ class LeaderDowningNodeThatIsUnreachableMultiJvmNode4 extends LeaderDowningNodeT class LeaderDowningNodeThatIsUnreachableSpec extends MultiNodeSpec(LeaderDowningNodeThatIsUnreachableMultiJvmSpec) with MultiNodeClusterSpec { + import LeaderDowningNodeThatIsUnreachableMultiJvmSpec._ override def initialParticipants = 4 @@ -42,7 +43,7 @@ class LeaderDowningNodeThatIsUnreachableSpec "be able to DOWN a 'last' node that is UNREACHABLE" taggedAs LongRunningTest in { runOn(first) { - cluster.self + startClusterNode() awaitUpConvergence(numberOfMembers = 4) val fourthAddress = node(fourth).address diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala index e5972b7d7c..5a155fc195 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala @@ -17,7 +17,6 @@ object LeaderElectionMultiJvmSpec extends MultiNodeConfig { val fourth = role("fourth") commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) - } class LeaderElectionMultiJvmNode1 extends LeaderElectionSpec @@ -26,7 +25,10 @@ class LeaderElectionMultiJvmNode3 extends LeaderElectionSpec class LeaderElectionMultiJvmNode4 extends LeaderElectionSpec class LeaderElectionMultiJvmNode5 extends LeaderElectionSpec -abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSpec) with MultiNodeClusterSpec { +abstract class LeaderElectionSpec + extends MultiNodeSpec(LeaderElectionMultiJvmSpec) + with MultiNodeClusterSpec { + import LeaderElectionMultiJvmSpec._ override def initialParticipants = 5 @@ -41,7 +43,7 @@ abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSp "be able to 'elect' a single leader" taggedAs LongRunningTest in { // make sure that the node-to-join is started before other join runOn(first) { - cluster.self + startClusterNode() } testConductor.enter("first-started") @@ -91,7 +93,6 @@ abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSp testConductor.enter("completed") } - } "be able to 're-elect' a single leader after leader has left" taggedAs LongRunningTest in { @@ -102,5 +103,4 @@ abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSp shutdownLeaderAndVerifyNewLeader(alreadyShutdown = 1) } } - } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala new file mode 100644 index 0000000000..8932eed6ee --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala @@ -0,0 +1,84 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import scala.collection.immutable.SortedSet +import org.scalatest.BeforeAndAfter +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ + +object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig( + debugConfig(on = false) + .withFallback(ConfigFactory.parseString(""" + akka.cluster { + leader-actions-interval = 5 s # increase the leader action task interval + unreachable-nodes-reaper-interval = 30 s # turn "off" reaping to unreachable node set + } + """) + .withFallback(MultiNodeClusterSpec.clusterConfig))) +} + +class MembershipChangeListenerExitingMultiJvmNode1 extends MembershipChangeListenerExitingSpec +class MembershipChangeListenerExitingMultiJvmNode2 extends MembershipChangeListenerExitingSpec +class MembershipChangeListenerExitingMultiJvmNode3 extends MembershipChangeListenerExitingSpec + +abstract class MembershipChangeListenerExitingSpec + extends MultiNodeSpec(MembershipChangeListenerExitingMultiJvmSpec) + with MultiNodeClusterSpec { + + import MembershipChangeListenerExitingMultiJvmSpec._ + + override def initialParticipants = 3 + + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + lazy val thirdAddress = node(third).address + + "A registered MembershipChangeListener" must { + "be notified when new node is EXITING" taggedAs LongRunningTest in { + + runOn(first) { + startClusterNode() + } + testConductor.enter("first-started") + + runOn(second, third) { + cluster.join(firstAddress) + } + awaitUpConvergence(numberOfMembers = 3) + testConductor.enter("rest-started") + + runOn(first) { + testConductor.enter("registered-listener") + cluster.leave(secondAddress) + } + + runOn(second) { + testConductor.enter("registered-listener") + } + + runOn(third) { + val exitingLatch = TestLatch() + cluster.registerListener(new MembershipChangeListener { + def notify(members: SortedSet[Member]) { + if (members.size == 3 && members.exists( m => m.address == secondAddress && m.status == MemberStatus.Exiting)) + exitingLatch.countDown() + } + }) + testConductor.enter("registered-listener") + exitingLatch.await + } + + testConductor.enter("finished") + } + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala new file mode 100644 index 0000000000..2f82e12506 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala @@ -0,0 +1,71 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import scala.collection.immutable.SortedSet +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ + +object MembershipChangeListenerJoinMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig( + debugConfig(on = false) + .withFallback(ConfigFactory.parseString(""" + akka.cluster { + leader-actions-interval = 5 s # increase the leader action task interval to allow time checking for JOIN before leader moves it to UP + } + """) + .withFallback(MultiNodeClusterSpec.clusterConfig))) +} + +class MembershipChangeListenerJoinMultiJvmNode1 extends MembershipChangeListenerJoinSpec +class MembershipChangeListenerJoinMultiJvmNode2 extends MembershipChangeListenerJoinSpec + +abstract class MembershipChangeListenerJoinSpec + extends MultiNodeSpec(MembershipChangeListenerJoinMultiJvmSpec) + with MultiNodeClusterSpec { + + import MembershipChangeListenerJoinMultiJvmSpec._ + + override def initialParticipants = 2 + + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + + "A registered MembershipChangeListener" must { + "be notified when new node is JOINING" taggedAs LongRunningTest in { + + runOn(first) { + startClusterNode() + } + + runOn(second) { + testConductor.enter("registered-listener") + cluster.join(firstAddress) + } + + runOn(first) { + val joinLatch = TestLatch() + cluster.registerListener(new MembershipChangeListener { + def notify(members: SortedSet[Member]) { + if (members.size == 2 && members.exists(_.status == MemberStatus.Joining)) // second node is not part of node ring anymore + joinLatch.countDown() + } + }) + testConductor.enter("registered-listener") + + joinLatch.await + cluster.convergence.isDefined must be(true) + } + + testConductor.enter("after") + } + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala new file mode 100644 index 0000000000..089f241849 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import scala.collection.immutable.SortedSet +import org.scalatest.BeforeAndAfter +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ + +object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig( + debugConfig(on = false) + .withFallback(ConfigFactory.parseString(""" + akka.cluster.leader-actions-interval = 5 s + akka.cluster.unreachable-nodes-reaper-interval = 30 s + """)) + .withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class MembershipChangeListenerLeavingMultiJvmNode1 extends MembershipChangeListenerLeavingSpec +class MembershipChangeListenerLeavingMultiJvmNode2 extends MembershipChangeListenerLeavingSpec +class MembershipChangeListenerLeavingMultiJvmNode3 extends MembershipChangeListenerLeavingSpec + +abstract class MembershipChangeListenerLeavingSpec + extends MultiNodeSpec(MembershipChangeListenerLeavingMultiJvmSpec) + with MultiNodeClusterSpec { + + import MembershipChangeListenerLeavingMultiJvmSpec._ + + override def initialParticipants = 3 + + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + lazy val thirdAddress = node(third).address + + "A registered MembershipChangeListener" must { + "be notified when new node is LEAVING" taggedAs LongRunningTest in { + + runOn(first) { + startClusterNode() + } + testConductor.enter("first-started") + + runOn(second, third) { + cluster.join(firstAddress) + } + awaitUpConvergence(numberOfMembers = 3) + testConductor.enter("rest-started") + + runOn(first) { + testConductor.enter("registered-listener") + cluster.leave(secondAddress) + } + + runOn(second) { + testConductor.enter("registered-listener") + } + + runOn(third) { + val latch = TestLatch() + cluster.registerListener(new MembershipChangeListener { + def notify(members: SortedSet[Member]) { + if (members.size == 3 && members.exists( m => m.address == secondAddress && m.status == MemberStatus.Leaving)) + latch.countDown() + } + }) + testConductor.enter("registered-listener") + latch.await + } + + testConductor.enter("finished") + } + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala index 070fb80553..352f9de1a4 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala @@ -15,7 +15,6 @@ object MembershipChangeListenerMultiJvmSpec extends MultiNodeConfig { val third = role("third") commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) - } class MembershipChangeListenerMultiJvmNode1 extends MembershipChangeListenerSpec @@ -55,7 +54,6 @@ abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChan } testConductor.enter("after-1") - } "(when three systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in { @@ -75,8 +73,6 @@ abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChan cluster.convergence.isDefined must be(true) testConductor.enter("after-2") - } } - } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala new file mode 100644 index 0000000000..3df6b876f9 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala @@ -0,0 +1,64 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import scala.collection.immutable.SortedSet +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ + +object MembershipChangeListenerUpMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class MembershipChangeListenerUpMultiJvmNode1 extends MembershipChangeListenerUpSpec +class MembershipChangeListenerUpMultiJvmNode2 extends MembershipChangeListenerUpSpec + +abstract class MembershipChangeListenerUpSpec + extends MultiNodeSpec(MembershipChangeListenerUpMultiJvmSpec) + with MultiNodeClusterSpec { + + import MembershipChangeListenerUpMultiJvmSpec._ + + override def initialParticipants = 2 + + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + + "A registered MembershipChangeListener" must { + "be notified when new node is marked as UP by the leader" taggedAs LongRunningTest in { + + runOn(first) { + startClusterNode() + } + + runOn(second) { + testConductor.enter("registered-listener") + cluster.join(firstAddress) + } + + runOn(first) { + val upLatch = TestLatch() + cluster.registerListener(new MembershipChangeListener { + def notify(members: SortedSet[Member]) { + if (members.size == 2 && members.forall(_.status == MemberStatus.Up)) + upLatch.countDown() + } + }) + testConductor.enter("registered-listener") + + upLatch.await + awaitUpConvergence(numberOfMembers = 2) + } + + testConductor.enter("after") + } + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index a341d7d22c..bf431f74f6 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -15,11 +15,11 @@ import akka.util.Duration object MultiNodeClusterSpec { def clusterConfig: Config = ConfigFactory.parseString(""" akka.cluster { - auto-down = off - gossip-frequency = 200 ms - leader-actions-frequency = 200 ms - unreachable-nodes-reaper-frequency = 200 ms - periodic-tasks-initial-delay = 300 ms + auto-down = off + gossip-interval = 200 ms + leader-actions-interval = 200 ms + unreachable-nodes-reaper-interval = 200 ms + periodic-tasks-initial-delay = 300 ms } akka.test { single-expect-default = 5 s @@ -29,8 +29,16 @@ object MultiNodeClusterSpec { trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒ + /** + * Create a cluster node using 'Cluster(system)'. + */ def cluster: Cluster = Cluster(system) + /** + * Use this method instead of 'cluster.self'. + */ + def startClusterNode(): Unit = cluster.self + /** * Assert that the member addresses match the expected addresses in the * sort order used by the cluster. diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala new file mode 100644 index 0000000000..99116ecb25 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala @@ -0,0 +1,57 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ + +object NodeJoinMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig( + debugConfig(on = false) + .withFallback(ConfigFactory.parseString(""" + akka.cluster { + leader-actions-interval = 5 s # increase the leader action task interval + } + """) + .withFallback(MultiNodeClusterSpec.clusterConfig))) +} + +class NodeJoinMultiJvmNode1 extends NodeJoinSpec +class NodeJoinMultiJvmNode2 extends NodeJoinSpec + +abstract class NodeJoinSpec + extends MultiNodeSpec(NodeJoinMultiJvmSpec) + with MultiNodeClusterSpec { + + import NodeJoinMultiJvmSpec._ + + override def initialParticipants = 2 + + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + + "A cluster node" must { + "join another cluster and get status JOINING - when sending a 'Join' command" taggedAs LongRunningTest in { + + runOn(first) { + startClusterNode() + } + + runOn(second) { + cluster.join(firstAddress) + } + + awaitCond(cluster.latestGossip.members.exists { member ⇒ member.address == secondAddress && member.status == MemberStatus.Joining }) + + testConductor.enter("after") + } + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemoved.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala similarity index 93% rename from akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemoved.scala rename to akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala index 8ea16dfa8a..ebab4f6ba3 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemoved.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala @@ -22,8 +22,10 @@ class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode1 extends NodeLeavingAndEx class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode2 extends NodeLeavingAndExitingAndBeingRemovedSpec class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode3 extends NodeLeavingAndExitingAndBeingRemovedSpec -abstract class NodeLeavingAndExitingAndBeingRemovedSpec extends MultiNodeSpec(NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec) +abstract class NodeLeavingAndExitingAndBeingRemovedSpec + extends MultiNodeSpec(NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec) with MultiNodeClusterSpec { + import NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec._ override def initialParticipants = 3 @@ -39,7 +41,7 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec extends MultiNodeSpec(No "be moved to EXITING and then to REMOVED by the reaper" taggedAs LongRunningTest in { runOn(first) { - cluster.self + startClusterNode() } testConductor.enter("first-started") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExiting.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala similarity index 84% rename from akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExiting.scala rename to akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala index 448d57d6e7..31630f934c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExiting.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala @@ -19,8 +19,8 @@ object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig { debugConfig(on = false) .withFallback(ConfigFactory.parseString(""" akka.cluster { - leader-actions-frequency = 5 s # increase the leader action task frequency to make sure we get a chance to test the LEAVING state - unreachable-nodes-reaper-frequency = 30 s # turn "off" reaping to unreachable node set + leader-actions-interval = 5 s # increase the leader action task frequency to make sure we get a chance to test the LEAVING state + unreachable-nodes-reaper-interval = 30 s } """) .withFallback(MultiNodeClusterSpec.clusterConfig))) @@ -30,8 +30,10 @@ class NodeLeavingAndExitingMultiJvmNode1 extends NodeLeavingAndExitingSpec class NodeLeavingAndExitingMultiJvmNode2 extends NodeLeavingAndExitingSpec class NodeLeavingAndExitingMultiJvmNode3 extends NodeLeavingAndExitingSpec -abstract class NodeLeavingAndExitingSpec extends MultiNodeSpec(NodeLeavingAndExitingMultiJvmSpec) +abstract class NodeLeavingAndExitingSpec + extends MultiNodeSpec(NodeLeavingAndExitingMultiJvmSpec) with MultiNodeClusterSpec { + import NodeLeavingAndExitingMultiJvmSpec._ override def initialParticipants = 3 @@ -45,7 +47,7 @@ abstract class NodeLeavingAndExitingSpec extends MultiNodeSpec(NodeLeavingAndExi "be moved to EXITING by the leader" taggedAs LongRunningTest in { runOn(first) { - cluster.self + startClusterNode() } testConductor.enter("first-started") @@ -63,7 +65,7 @@ abstract class NodeLeavingAndExitingSpec extends MultiNodeSpec(NodeLeavingAndExi runOn(first, third) { // 1. Verify that 'second' node is set to LEAVING - // We have set the 'leader-actions-frequency' to 5 seconds to make sure that we get a + // We have set the 'leader-actions-interval' to 5 seconds to make sure that we get a // chance to test the LEAVING state before the leader moves the node to EXITING awaitCond(cluster.latestGossip.members.exists(_.status == MemberStatus.Leaving)) // wait on LEAVING val hasLeft = cluster.latestGossip.members.find(_.status == MemberStatus.Leaving) // verify node that left diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeaving.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala similarity index 93% rename from akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeaving.scala rename to akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala index 39fee8acfa..17db90c880 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeaving.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala @@ -17,7 +17,7 @@ object NodeLeavingMultiJvmSpec extends MultiNodeConfig { commonConfig( debugConfig(on = false) .withFallback(ConfigFactory.parseString(""" - akka.cluster.unreachable-nodes-reaper-frequency = 30 s # turn "off" reaping to unreachable node set + akka.cluster.unreachable-nodes-reaper-frequency = 30 s """)) .withFallback(MultiNodeClusterSpec.clusterConfig)) } @@ -41,7 +41,7 @@ abstract class NodeLeavingSpec extends MultiNodeSpec(NodeLeavingMultiJvmSpec) "be marked as LEAVING in the converged membership table" taggedAs LongRunningTest in { runOn(first) { - cluster.self + startClusterNode() } testConductor.enter("first-started") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala index cf6839dd83..edd3e44121 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala @@ -14,14 +14,16 @@ object NodeMembershipMultiJvmSpec extends MultiNodeConfig { val third = role("third") commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) - } class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec -abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSpec) with MultiNodeClusterSpec { +abstract class NodeMembershipSpec + extends MultiNodeSpec(NodeMembershipMultiJvmSpec) + with MultiNodeClusterSpec { + import NodeMembershipMultiJvmSpec._ override def initialParticipants = 3 @@ -36,7 +38,7 @@ abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSp // make sure that the node-to-join is started before other join runOn(first) { - cluster.self + startClusterNode() } testConductor.enter("first-started") @@ -51,7 +53,6 @@ abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSp } testConductor.enter("after-1") - } "(when three nodes) start gossiping to each other so that all nodes gets the same gossip info" taggedAs LongRunningTest in { @@ -68,8 +69,6 @@ abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSp awaitCond(cluster.convergence.isDefined) testConductor.enter("after-2") - } } - } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala index 1179f89d76..c0ac1ee22b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala @@ -37,7 +37,7 @@ abstract class NodeShutdownSpec extends MultiNodeSpec(NodeShutdownMultiJvmSpec) "not be singleton cluster when joined" taggedAs LongRunningTest in { // make sure that the node-to-join is started before other join runOn(first) { - cluster.self + startClusterNode() } testConductor.enter("first-started") @@ -63,5 +63,4 @@ abstract class NodeShutdownSpec extends MultiNodeSpec(NodeShutdownMultiJvmSpec) testConductor.enter("after-2") } } - } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala deleted file mode 100644 index 7e3fdb3323..0000000000 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.cluster - -import com.typesafe.config.ConfigFactory -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.testkit._ - -object NodeStartupMultiJvmSpec extends MultiNodeConfig { - val first = role("first") - val second = role("second") - - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) - -} - -class NodeStartupMultiJvmNode1 extends NodeStartupSpec -class NodeStartupMultiJvmNode2 extends NodeStartupSpec - -abstract class NodeStartupSpec extends MultiNodeSpec(NodeStartupMultiJvmSpec) with MultiNodeClusterSpec { - import NodeStartupMultiJvmSpec._ - - override def initialParticipants = 2 - - lazy val firstAddress = node(first).address - lazy val secondAddress = node(second).address - - "A first cluster node with a 'node-to-join' config set to empty string (singleton cluster)" must { - - "be a singleton cluster when started up" taggedAs LongRunningTest in { - runOn(first) { - awaitCond(cluster.isSingletonCluster) - awaitUpConvergence(numberOfMembers = 1) - assertLeader(first) - } - testConductor.enter("after-1") - } - } - - "A second cluster node" must { - "join the other node cluster when sending a Join command" taggedAs LongRunningTest in { - - runOn(second) { - cluster.join(firstAddress) - } - - awaitCond { - cluster.latestGossip.members.exists { member ⇒ - member.address == secondAddress && member.status == MemberStatus.Up - } - } - cluster.latestGossip.members.size must be(2) - awaitCond(cluster.convergence.isDefined) - assertLeader(first, second) - testConductor.enter("after-2") - } - } - -} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala new file mode 100644 index 0000000000..7931ce48f1 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ + +object NodeUpMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class NodeUpMultiJvmNode1 extends NodeUpSpec +class NodeUpMultiJvmNode2 extends NodeUpSpec + +abstract class NodeUpSpec + extends MultiNodeSpec(NodeUpMultiJvmSpec) + with MultiNodeClusterSpec { + + import NodeUpMultiJvmSpec._ + + override def initialParticipants = 2 + + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + + "A cluster node that is joining another cluster" must { + "be moved to UP by the leader after a convergence" taggedAs LongRunningTest in { + + runOn(first) { + startClusterNode() + } + + runOn(second) { + cluster.join(firstAddress) + } + + awaitUpConvergence(numberOfMembers = 2) + + testConductor.enter("after") + } + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 45b0a35521..6b2ff1962c 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -20,9 +20,9 @@ class ClusterConfigSpec extends AkkaSpec { FailureDetectorMaxSampleSize must be(1000) NodeToJoin must be(None) PeriodicTasksInitialDelay must be(1 seconds) - GossipFrequency must be(1 second) - LeaderActionsFrequency must be(1 second) - UnreachableNodesReaperFrequency must be(1 second) + GossipInterval must be(1 second) + LeaderActionsInterval must be(1 second) + UnreachableNodesReaperInterval must be(1 second) NrOfGossipDaemons must be(4) NrOfDeputyNodes must be(3) AutoDown must be(true)