From e0fbf2f3246bf25d859557605b2366dc7a13de1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Fri, 1 Jun 2012 18:06:43 +0200 Subject: [PATCH 1/6] Renamed the NodeStartupSpec to NodeJoinAndUpSpec and added tests for both JOINING and UP. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- .../akka/cluster/NodeJoinAndUpSpec.scala | 76 +++++++++++++++++++ .../scala/akka/cluster/NodeStartupSpec.scala | 63 --------------- 2 files changed, 76 insertions(+), 63 deletions(-) create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinAndUpSpec.scala delete mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinAndUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinAndUpSpec.scala new file mode 100644 index 0000000000..5415df1b4a --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinAndUpSpec.scala @@ -0,0 +1,76 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ + +object NodeJoinAndUpMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig( + debugConfig(on = false) + .withFallback(ConfigFactory.parseString(""" + akka.cluster { + gossip-frequency = 1000 ms + leader-actions-frequency = 5000 ms # increase the leader action task frequency + } + """) + .withFallback(MultiNodeClusterSpec.clusterConfig))) +} + +class NodeJoinAndUpMultiJvmNode1 extends NodeJoinAndUpSpec +class NodeJoinAndUpMultiJvmNode2 extends NodeJoinAndUpSpec + +abstract class NodeJoinAndUpSpec + extends MultiNodeSpec(NodeJoinAndUpMultiJvmSpec) + with MultiNodeClusterSpec + with ImplicitSender + with BeforeAndAfter { + + import NodeJoinAndUpMultiJvmSpec._ + + override def initialParticipants = 2 + + after { + testConductor.enter("after") + } + + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + + "A first cluster node with a 'node-to-join' config set to empty string (singleton cluster)" must { + + "be a singleton cluster when started up" taggedAs LongRunningTest in { + runOn(first) { + awaitCond(cluster.isSingletonCluster) + awaitUpConvergence(numberOfMembers = 1) + cluster.isLeader must be(true) + } + } + } + + "A second cluster node" must { + "join the cluster as JOINING - when sending a 'Join' command - and then be moved to UP by the leader" taggedAs LongRunningTest in { + + runOn(second) { + cluster.join(firstAddress) + } + + awaitCond(cluster.latestGossip.members.exists { member ⇒ member.address == secondAddress && member.status == MemberStatus.Joining }) + + awaitCond( + cluster.latestGossip.members.exists { member ⇒ member.address == secondAddress && member.status == MemberStatus.Up }, + 30.seconds.dilated) // waiting for the leader to move from JOINING -> UP (frequency set to 5 sec in config) + + cluster.latestGossip.members.size must be(2) + awaitCond(cluster.convergence.isDefined) + } + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala deleted file mode 100644 index 44682b81f7..0000000000 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.cluster - -import com.typesafe.config.ConfigFactory -import org.scalatest.BeforeAndAfter -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.testkit._ - -object NodeStartupMultiJvmSpec extends MultiNodeConfig { - val first = role("first") - val second = role("second") - - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) - -} - -class NodeStartupMultiJvmNode1 extends NodeStartupSpec -class NodeStartupMultiJvmNode2 extends NodeStartupSpec - -abstract class NodeStartupSpec extends MultiNodeSpec(NodeStartupMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { - import NodeStartupMultiJvmSpec._ - - override def initialParticipants = 2 - - after { - testConductor.enter("after") - } - - lazy val firstAddress = node(first).address - lazy val secondAddress = node(second).address - - "A first cluster node with a 'node-to-join' config set to empty string (singleton cluster)" must { - - "be a singleton cluster when started up" taggedAs LongRunningTest in { - runOn(first) { - awaitCond(cluster.isSingletonCluster) - awaitUpConvergence(numberOfMembers = 1) - cluster.isLeader must be(true) - } - } - } - - "A second cluster node" must { - "join the other node cluster when sending a Join command" taggedAs LongRunningTest in { - - runOn(second) { - cluster.join(firstAddress) - } - - awaitCond { - cluster.latestGossip.members.exists { member ⇒ - member.address == secondAddress && member.status == MemberStatus.Up - } - } - cluster.latestGossip.members.size must be(2) - awaitCond(cluster.convergence.isDefined) - } - } - -} From d1fb1b925259bf52024be294593bfcc75f55feb0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Sat, 2 Jun 2012 17:36:10 +0200 Subject: [PATCH 2/6] Changed name of test files to end with *Spec. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- .../MembershipChangeListenerSpec.scala | 83 ------------------- ...eavingAndExitingAndBeingRemovedSpec.scala} | 0 ....scala => NodeLeavingAndExitingSpec.scala} | 0 ...odeLeaving.scala => NodeLeavingSpec.scala} | 3 +- 4 files changed, 2 insertions(+), 84 deletions(-) delete mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala rename akka-cluster/src/multi-jvm/scala/akka/cluster/{NodeLeavingAndExitingAndBeingRemoved.scala => NodeLeavingAndExitingAndBeingRemovedSpec.scala} (100%) rename akka-cluster/src/multi-jvm/scala/akka/cluster/{NodeLeavingAndExiting.scala => NodeLeavingAndExitingSpec.scala} (100%) rename akka-cluster/src/multi-jvm/scala/akka/cluster/{NodeLeaving.scala => NodeLeavingSpec.scala} (94%) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala deleted file mode 100644 index f818c97744..0000000000 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.cluster - -import scala.collection.immutable.SortedSet -import org.scalatest.BeforeAndAfter -import com.typesafe.config.ConfigFactory -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.testkit._ - -object MembershipChangeListenerMultiJvmSpec extends MultiNodeConfig { - val first = role("first") - val second = role("second") - val third = role("third") - - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) - -} - -class MembershipChangeListenerMultiJvmNode1 extends MembershipChangeListenerSpec -class MembershipChangeListenerMultiJvmNode2 extends MembershipChangeListenerSpec -class MembershipChangeListenerMultiJvmNode3 extends MembershipChangeListenerSpec - -abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChangeListenerMultiJvmSpec) - with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { - import MembershipChangeListenerMultiJvmSpec._ - - override def initialParticipants = 3 - - after { - testConductor.enter("after") - } - - lazy val firstAddress = node(first).address - lazy val secondAddress = node(second).address - - "A set of connected cluster systems" must { - - "(when two systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in { - - // make sure that the node-to-join is started before other join - runOn(first) { - cluster.self - } - testConductor.enter("first-started") - - runOn(first, second) { - cluster.join(firstAddress) - val latch = TestLatch() - cluster.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - if (members.size == 2 && members.forall(_.status == MemberStatus.Up)) - latch.countDown() - } - }) - latch.await - cluster.convergence.isDefined must be(true) - } - - } - - "(when three systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in { - - runOn(third) { - cluster.join(firstAddress) - } - - val latch = TestLatch() - cluster.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - if (members.size == 3 && members.forall(_.status == MemberStatus.Up)) - latch.countDown() - } - }) - latch.await - cluster.convergence.isDefined must be(true) - - } - } - -} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemoved.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala similarity index 100% rename from akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemoved.scala rename to akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExiting.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala similarity index 100% rename from akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExiting.scala rename to akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeaving.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala similarity index 94% rename from akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeaving.scala rename to akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala index a6ddccb806..c4cf3fc12c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeaving.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala @@ -18,7 +18,8 @@ object NodeLeavingMultiJvmSpec extends MultiNodeConfig { commonConfig( debugConfig(on = false) .withFallback(ConfigFactory.parseString(""" - akka.cluster.unreachable-nodes-reaper-frequency = 30000 # turn "off" reaping to unreachable node set + akka.cluster.leader-actions-frequency = 5000 ms + akka.cluster.unreachable-nodes-reaper-frequency = 30000 ms # turn "off" reaping to unreachable node set """)) .withFallback(MultiNodeClusterSpec.clusterConfig)) } From bcc6e4c11f4fe9fc98d9c8a6ab2893c15552aafd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Sat, 2 Jun 2012 17:37:13 +0200 Subject: [PATCH 3/6] Added test for testing that MemberChangeListener is triggered by node EXITING event. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- .../MembershipChangeListenerExitingSpec.scala | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala new file mode 100644 index 0000000000..0145628bd5 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala @@ -0,0 +1,76 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import scala.collection.immutable.SortedSet +import org.scalatest.BeforeAndAfter +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ + +object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig( + debugConfig(on = false) + .withFallback(ConfigFactory.parseString(""" + akka.cluster { + leader-actions-frequency = 5000 ms # increase the leader action task frequency + unreachable-nodes-reaper-frequency = 30000 ms # turn "off" reaping to unreachable node set + } + """) + .withFallback(MultiNodeClusterSpec.clusterConfig))) +} + +class MembershipChangeListenerExitingMultiJvmNode1 extends MembershipChangeListenerExitingSpec +class MembershipChangeListenerExitingMultiJvmNode2 extends MembershipChangeListenerExitingSpec +class MembershipChangeListenerExitingMultiJvmNode3 extends MembershipChangeListenerExitingSpec + +abstract class MembershipChangeListenerExitingSpec extends MultiNodeSpec(MembershipChangeListenerExitingMultiJvmSpec) + with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { + import MembershipChangeListenerExitingMultiJvmSpec._ + + override def initialParticipants = 3 + + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + lazy val thirdAddress = node(third).address + + "A registered MembershipChangeListener" must { + "be notified when new node is EXITING" taggedAs LongRunningTest in { + + runOn(first) { + cluster.self + } + testConductor.enter("first-started") + + runOn(second, third) { + cluster.join(firstAddress) + } + awaitUpConvergence(numberOfMembers = 3) + testConductor.enter("rest-started") + + runOn(third) { + val exitingLatch = TestLatch() + cluster.registerListener(new MembershipChangeListener { + def notify(members: SortedSet[Member]) { + if (members.size == 3 && members.exists(_.status == MemberStatus.Exiting)) + exitingLatch.countDown() + } + }) + exitingLatch.await + } + + runOn(first) { + cluster.leave(secondAddress) + } + + testConductor.enter("finished") + } + } +} From 5dc039b0f1b9479e674d843fa8c28a443bbc85eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Sat, 2 Jun 2012 17:37:29 +0200 Subject: [PATCH 4/6] Added test for testing that MemberChangeListener is triggered by node JOINING and UP events. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- ...embershipChangeListenerJoinAndUpSpec.scala | 85 +++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinAndUpSpec.scala diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinAndUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinAndUpSpec.scala new file mode 100644 index 0000000000..81e32d1491 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinAndUpSpec.scala @@ -0,0 +1,85 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import scala.collection.immutable.SortedSet +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ + +object MembershipChangeListenerJoinAndUpMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig( + debugConfig(on = false) + .withFallback(ConfigFactory.parseString(""" + akka.cluster { + gossip-frequency = 1000 ms + leader-actions-frequency = 5000 ms # increase the leader action task frequency + } + """) + .withFallback(MultiNodeClusterSpec.clusterConfig))) +} + +class MembershipChangeListenerJoinAndUpMultiJvmNode1 extends MembershipChangeListenerJoinAndUpSpec +class MembershipChangeListenerJoinAndUpMultiJvmNode2 extends MembershipChangeListenerJoinAndUpSpec + +abstract class MembershipChangeListenerJoinAndUpSpec + extends MultiNodeSpec(MembershipChangeListenerJoinAndUpMultiJvmSpec) + with MultiNodeClusterSpec + with ImplicitSender + with BeforeAndAfter { + + import MembershipChangeListenerJoinAndUpMultiJvmSpec._ + + override def initialParticipants = 2 + + after { + testConductor.enter("after") + } + + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + + "A registered MembershipChangeListener" must { + "be notified when new node is JOINING and node is marked as UP by the leader" taggedAs LongRunningTest in { + + runOn(first) { + cluster.self + } + + runOn(second) { + cluster.join(firstAddress) + } + + runOn(first) { + // JOINING + val joinLatch = TestLatch() + cluster.registerListener(new MembershipChangeListener { + def notify(members: SortedSet[Member]) { + if (members.size == 2 && members.exists(_.status == MemberStatus.Joining)) // second node is not part of node ring anymore + joinLatch.countDown() + } + }) + joinLatch.await + cluster.convergence.isDefined must be(true) + + // UP + val upLatch = TestLatch() + cluster.registerListener(new MembershipChangeListener { + def notify(members: SortedSet[Member]) { + if (members.size == 2 && members.forall(_.status == MemberStatus.Up)) + upLatch.countDown() + } + }) + upLatch.await + awaitCond(cluster.convergence.isDefined) + } + } + } +} From ead5bf8695c26b207066ec32b5ea1e4fbc5a1b42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Sat, 2 Jun 2012 17:37:41 +0200 Subject: [PATCH 5/6] Added test for testing that MemberChangeListener is triggered by node LEAVING event. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- .../MembershipChangeListenerLeavingSpec.scala | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala new file mode 100644 index 0000000000..f8b083c4d8 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala @@ -0,0 +1,73 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import scala.collection.immutable.SortedSet +import org.scalatest.BeforeAndAfter +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ + +object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig( + debugConfig(on = false) + .withFallback(ConfigFactory.parseString(""" + akka.cluster.leader-actions-frequency = 5000 ms + akka.cluster.unreachable-nodes-reaper-frequency = 30000 ms # turn "off" reaping to unreachable node set + """)) + .withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class MembershipChangeListenerLeavingMultiJvmNode1 extends MembershipChangeListenerLeavingSpec +class MembershipChangeListenerLeavingMultiJvmNode2 extends MembershipChangeListenerLeavingSpec +class MembershipChangeListenerLeavingMultiJvmNode3 extends MembershipChangeListenerLeavingSpec + +abstract class MembershipChangeListenerLeavingSpec extends MultiNodeSpec(MembershipChangeListenerLeavingMultiJvmSpec) + with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { + import MembershipChangeListenerLeavingMultiJvmSpec._ + + override def initialParticipants = 3 + + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + lazy val thirdAddress = node(third).address + + "A registered MembershipChangeListener" must { + "be notified when new node is LEAVING" taggedAs LongRunningTest in { + + runOn(first) { + cluster.self + } + testConductor.enter("first-started") + + runOn(second, third) { + cluster.join(firstAddress) + } + awaitUpConvergence(numberOfMembers = 3) + testConductor.enter("rest-started") + + runOn(third) { + val latch = TestLatch() + cluster.registerListener(new MembershipChangeListener { + def notify(members: SortedSet[Member]) { + if (members.size == 3 && members.exists(_.status == MemberStatus.Leaving)) + latch.countDown() + } + }) + latch.await + } + + runOn(first) { + cluster.leave(secondAddress) + } + + testConductor.enter("finished") + } + } +} From 391fed65941c29aa7d139011b0a97fb7c37f768e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Mon, 4 Jun 2012 23:21:28 +0200 Subject: [PATCH 6/6] Misc changes, fixes and improvements after review. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Renamed all 'frequency' to 'interval' - Split up NodeJoinAndUpSpec and into NodeJoinSpec and NodeUpSpec. - Split up MembershipChangeListenerJoinAndUpSpec and into MembershipChangeListenerJoinSpec and MembershipChangeListenerUpSpec. - Added utility method 'startClusterNode()' - Fixed race in register listener and telling node to leave - Removed 'after' blocks - Cleaned up unused code - Improved comments Signed-off-by: Jonas Bonér --- .../src/main/resources/reference.conf | 6 +- .../src/main/scala/akka/cluster/Cluster.scala | 12 +-- .../scala/akka/cluster/ClusterSettings.scala | 6 +- ...ientDowningNodeThatIsUnreachableSpec.scala | 6 +- .../ClientDowningNodeThatIsUpSpec.scala | 6 +- .../GossipingAccrualFailureDetectorSpec.scala | 2 +- .../akka/cluster/JoinTwoClustersSpec.scala | 7 +- ...aderDowningNodeThatIsUnreachableSpec.scala | 6 +- .../akka/cluster/LeaderElectionSpec.scala | 7 +- .../MembershipChangeListenerExitingSpec.scala | 28 ++++--- ...=> MembershipChangeListenerJoinSpec.scala} | 44 ++++------- .../MembershipChangeListenerLeavingSpec.scala | 28 ++++--- .../MembershipChangeListenerUpSpec.scala | 64 ++++++++++++++++ .../akka/cluster/MultiNodeClusterSpec.scala | 18 +++-- .../akka/cluster/NodeJoinAndUpSpec.scala | 76 ------------------- .../scala/akka/cluster/NodeJoinSpec.scala | 57 ++++++++++++++ ...LeavingAndExitingAndBeingRemovedSpec.scala | 2 +- .../cluster/NodeLeavingAndExitingSpec.scala | 14 ++-- .../scala/akka/cluster/NodeLeavingSpec.scala | 6 +- .../akka/cluster/NodeMembershipSpec.scala | 8 +- .../scala/akka/cluster/NodeShutdownSpec.scala | 4 +- .../scala/akka/cluster/NodeUpSpec.scala | 50 ++++++++++++ .../akka/cluster/ClusterConfigSpec.scala | 6 +- 23 files changed, 289 insertions(+), 174 deletions(-) rename akka-cluster/src/multi-jvm/scala/akka/cluster/{MembershipChangeListenerJoinAndUpSpec.scala => MembershipChangeListenerJoinSpec.scala} (52%) create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala delete mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinAndUpSpec.scala create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 7dd511e34a..8c905d5b29 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -25,13 +25,13 @@ akka { periodic-tasks-initial-delay = 1s # how often should the node send out gossip information? - gossip-frequency = 1s + gossip-interval = 1s # how often should the leader perform maintenance tasks? - leader-actions-frequency = 1s + leader-actions-interval = 1s # how often should the node move nodes, marked as unreachable by the failure detector, out of the membership ring? - unreachable-nodes-reaper-frequency = 1s + unreachable-nodes-reaper-interval = 1s # accrual failure detection config failure-detector { diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index c5ad773989..8beb7f4164 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -380,9 +380,9 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ private val vclockNode = VectorClock.Node(selfAddress.toString) private val periodicTasksInitialDelay = clusterSettings.PeriodicTasksInitialDelay - private val gossipFrequency = clusterSettings.GossipFrequency - private val leaderActionsFrequency = clusterSettings.LeaderActionsFrequency - private val unreachableNodesReaperFrequency = clusterSettings.UnreachableNodesReaperFrequency + private val gossipInterval = clusterSettings.GossipInterval + private val leaderActionsInterval = clusterSettings.LeaderActionsInterval + private val unreachableNodesReaperInterval = clusterSettings.UnreachableNodesReaperInterval implicit private val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout) @@ -424,17 +424,17 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ // ======================================================== // start periodic gossip to random nodes in cluster - private val gossipCanceller = system.scheduler.schedule(periodicTasksInitialDelay, gossipFrequency) { + private val gossipCanceller = system.scheduler.schedule(periodicTasksInitialDelay, gossipInterval) { gossip() } // start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list) - private val failureDetectorReaperCanceller = system.scheduler.schedule(periodicTasksInitialDelay, unreachableNodesReaperFrequency) { + private val failureDetectorReaperCanceller = system.scheduler.schedule(periodicTasksInitialDelay, unreachableNodesReaperInterval) { reapUnreachableMembers() } // start periodic leader action management (only applies for the current leader) - private val leaderActionsCanceller = system.scheduler.schedule(periodicTasksInitialDelay, leaderActionsFrequency) { + private val leaderActionsCanceller = system.scheduler.schedule(periodicTasksInitialDelay, leaderActionsInterval) { leaderActions() } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 8e9b9c770d..0e7dac06ab 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -20,9 +20,9 @@ class ClusterSettings(val config: Config, val systemName: String) { case AddressFromURIString(addr) ⇒ Some(addr) } val PeriodicTasksInitialDelay = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS) - val GossipFrequency = Duration(getMilliseconds("akka.cluster.gossip-frequency"), MILLISECONDS) - val LeaderActionsFrequency = Duration(getMilliseconds("akka.cluster.leader-actions-frequency"), MILLISECONDS) - val UnreachableNodesReaperFrequency = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-frequency"), MILLISECONDS) + val GossipInterval = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS) + val LeaderActionsInterval = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS) + val UnreachableNodesReaperInterval = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS) val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons") val NrOfDeputyNodes = getInt("akka.cluster.nr-of-deputy-nodes") val AutoDown = getBoolean("akka.cluster.auto-down") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala index 6ab4d1a39e..ba34c9b0be 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala @@ -26,8 +26,8 @@ class ClientDowningNodeThatIsUnreachableMultiJvmNode4 extends ClientDowningNodeT class ClientDowningNodeThatIsUnreachableSpec extends MultiNodeSpec(ClientDowningNodeThatIsUnreachableMultiJvmSpec) - with MultiNodeClusterSpec - with ImplicitSender with BeforeAndAfter { + with MultiNodeClusterSpec { + import ClientDowningNodeThatIsUnreachableMultiJvmSpec._ override def initialParticipants = 4 @@ -36,7 +36,7 @@ class ClientDowningNodeThatIsUnreachableSpec "be able to DOWN a node that is UNREACHABLE (killed)" taggedAs LongRunningTest in { runOn(first) { - cluster.self + startClusterNode() awaitUpConvergence(numberOfMembers = 4) val thirdAddress = node(third).address diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala index 6b0bbae22e..ac1d68c8af 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala @@ -26,8 +26,8 @@ class ClientDowningNodeThatIsUpMultiJvmNode4 extends ClientDowningNodeThatIsUpSp class ClientDowningNodeThatIsUpSpec extends MultiNodeSpec(ClientDowningNodeThatIsUpMultiJvmSpec) - with MultiNodeClusterSpec - with ImplicitSender with BeforeAndAfter { + with MultiNodeClusterSpec { + import ClientDowningNodeThatIsUpMultiJvmSpec._ override def initialParticipants = 4 @@ -36,7 +36,7 @@ class ClientDowningNodeThatIsUpSpec "be able to DOWN a node that is UP (healthy and available)" taggedAs LongRunningTest in { runOn(first) { - cluster.self + startClusterNode() awaitUpConvergence(numberOfMembers = 4) val thirdAddress = node(third).address diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala index 9d388622db..cec99e9af9 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala @@ -43,7 +43,7 @@ abstract class GossipingAccrualFailureDetectorSpec extends MultiNodeSpec(Gossipi "receive gossip heartbeats so that all member nodes in the cluster are marked 'available'" taggedAs LongRunningTest in { // make sure that the node-to-join is started before other join runOn(first) { - cluster.self + startClusterNode() } testConductor.enter("first-started") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala index 9f1395b5dd..7b7263bbe0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala @@ -27,7 +27,10 @@ class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec -abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender { +abstract class JoinTwoClustersSpec + extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) + with MultiNodeClusterSpec { + import JoinTwoClustersMultiJvmSpec._ override def initialParticipants = 6 @@ -41,7 +44,7 @@ abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvm "be able to 'elect' a single leader after joining (A -> B)" taggedAs LongRunningTest in { // make sure that the node-to-join is started before other join runOn(a1, b1, c1) { - cluster.self + startClusterNode() } testConductor.enter("first-started") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala index 63665d3c57..7b2536d9d2 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala @@ -34,8 +34,8 @@ class LeaderDowningNodeThatIsUnreachableMultiJvmNode4 extends LeaderDowningNodeT class LeaderDowningNodeThatIsUnreachableSpec extends MultiNodeSpec(LeaderDowningNodeThatIsUnreachableMultiJvmSpec) - with MultiNodeClusterSpec - with ImplicitSender with BeforeAndAfter { + with MultiNodeClusterSpec { + import LeaderDowningNodeThatIsUnreachableMultiJvmSpec._ override def initialParticipants = 4 @@ -44,7 +44,7 @@ class LeaderDowningNodeThatIsUnreachableSpec "be able to DOWN a 'last' node that is UNREACHABLE" taggedAs LongRunningTest in { runOn(first) { - cluster.self + startClusterNode() awaitUpConvergence(numberOfMembers = 4) val fourthAddress = node(fourth).address diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala index ba0471bedb..bf60b6b4ac 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala @@ -26,7 +26,10 @@ class LeaderElectionMultiJvmNode3 extends LeaderElectionSpec class LeaderElectionMultiJvmNode4 extends LeaderElectionSpec class LeaderElectionMultiJvmNode5 extends LeaderElectionSpec -abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSpec) with MultiNodeClusterSpec { +abstract class LeaderElectionSpec + extends MultiNodeSpec(LeaderElectionMultiJvmSpec) + with MultiNodeClusterSpec { + import LeaderElectionMultiJvmSpec._ override def initialParticipants = 5 @@ -41,7 +44,7 @@ abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSp "be able to 'elect' a single leader" taggedAs LongRunningTest in { // make sure that the node-to-join is started before other join runOn(first) { - cluster.self + startClusterNode() } testConductor.enter("first-started") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala index 0145628bd5..8932eed6ee 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala @@ -20,8 +20,8 @@ object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig { debugConfig(on = false) .withFallback(ConfigFactory.parseString(""" akka.cluster { - leader-actions-frequency = 5000 ms # increase the leader action task frequency - unreachable-nodes-reaper-frequency = 30000 ms # turn "off" reaping to unreachable node set + leader-actions-interval = 5 s # increase the leader action task interval + unreachable-nodes-reaper-interval = 30 s # turn "off" reaping to unreachable node set } """) .withFallback(MultiNodeClusterSpec.clusterConfig))) @@ -31,8 +31,10 @@ class MembershipChangeListenerExitingMultiJvmNode1 extends MembershipChangeListe class MembershipChangeListenerExitingMultiJvmNode2 extends MembershipChangeListenerExitingSpec class MembershipChangeListenerExitingMultiJvmNode3 extends MembershipChangeListenerExitingSpec -abstract class MembershipChangeListenerExitingSpec extends MultiNodeSpec(MembershipChangeListenerExitingMultiJvmSpec) - with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { +abstract class MembershipChangeListenerExitingSpec + extends MultiNodeSpec(MembershipChangeListenerExitingMultiJvmSpec) + with MultiNodeClusterSpec { + import MembershipChangeListenerExitingMultiJvmSpec._ override def initialParticipants = 3 @@ -45,7 +47,7 @@ abstract class MembershipChangeListenerExitingSpec extends MultiNodeSpec(Members "be notified when new node is EXITING" taggedAs LongRunningTest in { runOn(first) { - cluster.self + startClusterNode() } testConductor.enter("first-started") @@ -55,21 +57,27 @@ abstract class MembershipChangeListenerExitingSpec extends MultiNodeSpec(Members awaitUpConvergence(numberOfMembers = 3) testConductor.enter("rest-started") + runOn(first) { + testConductor.enter("registered-listener") + cluster.leave(secondAddress) + } + + runOn(second) { + testConductor.enter("registered-listener") + } + runOn(third) { val exitingLatch = TestLatch() cluster.registerListener(new MembershipChangeListener { def notify(members: SortedSet[Member]) { - if (members.size == 3 && members.exists(_.status == MemberStatus.Exiting)) + if (members.size == 3 && members.exists( m => m.address == secondAddress && m.status == MemberStatus.Exiting)) exitingLatch.countDown() } }) + testConductor.enter("registered-listener") exitingLatch.await } - runOn(first) { - cluster.leave(secondAddress) - } - testConductor.enter("finished") } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinAndUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala similarity index 52% rename from akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinAndUpSpec.scala rename to akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala index 81e32d1491..2f82e12506 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinAndUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala @@ -11,7 +11,7 @@ import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import akka.util.duration._ -object MembershipChangeListenerJoinAndUpMultiJvmSpec extends MultiNodeConfig { +object MembershipChangeListenerJoinMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") @@ -19,46 +19,39 @@ object MembershipChangeListenerJoinAndUpMultiJvmSpec extends MultiNodeConfig { debugConfig(on = false) .withFallback(ConfigFactory.parseString(""" akka.cluster { - gossip-frequency = 1000 ms - leader-actions-frequency = 5000 ms # increase the leader action task frequency + leader-actions-interval = 5 s # increase the leader action task interval to allow time checking for JOIN before leader moves it to UP } """) .withFallback(MultiNodeClusterSpec.clusterConfig))) } -class MembershipChangeListenerJoinAndUpMultiJvmNode1 extends MembershipChangeListenerJoinAndUpSpec -class MembershipChangeListenerJoinAndUpMultiJvmNode2 extends MembershipChangeListenerJoinAndUpSpec +class MembershipChangeListenerJoinMultiJvmNode1 extends MembershipChangeListenerJoinSpec +class MembershipChangeListenerJoinMultiJvmNode2 extends MembershipChangeListenerJoinSpec -abstract class MembershipChangeListenerJoinAndUpSpec - extends MultiNodeSpec(MembershipChangeListenerJoinAndUpMultiJvmSpec) - with MultiNodeClusterSpec - with ImplicitSender - with BeforeAndAfter { +abstract class MembershipChangeListenerJoinSpec + extends MultiNodeSpec(MembershipChangeListenerJoinMultiJvmSpec) + with MultiNodeClusterSpec { - import MembershipChangeListenerJoinAndUpMultiJvmSpec._ + import MembershipChangeListenerJoinMultiJvmSpec._ override def initialParticipants = 2 - after { - testConductor.enter("after") - } - lazy val firstAddress = node(first).address lazy val secondAddress = node(second).address "A registered MembershipChangeListener" must { - "be notified when new node is JOINING and node is marked as UP by the leader" taggedAs LongRunningTest in { + "be notified when new node is JOINING" taggedAs LongRunningTest in { runOn(first) { - cluster.self + startClusterNode() } runOn(second) { + testConductor.enter("registered-listener") cluster.join(firstAddress) } runOn(first) { - // JOINING val joinLatch = TestLatch() cluster.registerListener(new MembershipChangeListener { def notify(members: SortedSet[Member]) { @@ -66,20 +59,13 @@ abstract class MembershipChangeListenerJoinAndUpSpec joinLatch.countDown() } }) + testConductor.enter("registered-listener") + joinLatch.await cluster.convergence.isDefined must be(true) - - // UP - val upLatch = TestLatch() - cluster.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - if (members.size == 2 && members.forall(_.status == MemberStatus.Up)) - upLatch.countDown() - } - }) - upLatch.await - awaitCond(cluster.convergence.isDefined) } + + testConductor.enter("after") } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala index f8b083c4d8..089f241849 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala @@ -18,8 +18,8 @@ object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig { commonConfig( debugConfig(on = false) .withFallback(ConfigFactory.parseString(""" - akka.cluster.leader-actions-frequency = 5000 ms - akka.cluster.unreachable-nodes-reaper-frequency = 30000 ms # turn "off" reaping to unreachable node set + akka.cluster.leader-actions-interval = 5 s + akka.cluster.unreachable-nodes-reaper-interval = 30 s """)) .withFallback(MultiNodeClusterSpec.clusterConfig)) } @@ -28,8 +28,10 @@ class MembershipChangeListenerLeavingMultiJvmNode1 extends MembershipChangeListe class MembershipChangeListenerLeavingMultiJvmNode2 extends MembershipChangeListenerLeavingSpec class MembershipChangeListenerLeavingMultiJvmNode3 extends MembershipChangeListenerLeavingSpec -abstract class MembershipChangeListenerLeavingSpec extends MultiNodeSpec(MembershipChangeListenerLeavingMultiJvmSpec) - with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { +abstract class MembershipChangeListenerLeavingSpec + extends MultiNodeSpec(MembershipChangeListenerLeavingMultiJvmSpec) + with MultiNodeClusterSpec { + import MembershipChangeListenerLeavingMultiJvmSpec._ override def initialParticipants = 3 @@ -42,7 +44,7 @@ abstract class MembershipChangeListenerLeavingSpec extends MultiNodeSpec(Members "be notified when new node is LEAVING" taggedAs LongRunningTest in { runOn(first) { - cluster.self + startClusterNode() } testConductor.enter("first-started") @@ -52,21 +54,27 @@ abstract class MembershipChangeListenerLeavingSpec extends MultiNodeSpec(Members awaitUpConvergence(numberOfMembers = 3) testConductor.enter("rest-started") + runOn(first) { + testConductor.enter("registered-listener") + cluster.leave(secondAddress) + } + + runOn(second) { + testConductor.enter("registered-listener") + } + runOn(third) { val latch = TestLatch() cluster.registerListener(new MembershipChangeListener { def notify(members: SortedSet[Member]) { - if (members.size == 3 && members.exists(_.status == MemberStatus.Leaving)) + if (members.size == 3 && members.exists( m => m.address == secondAddress && m.status == MemberStatus.Leaving)) latch.countDown() } }) + testConductor.enter("registered-listener") latch.await } - runOn(first) { - cluster.leave(secondAddress) - } - testConductor.enter("finished") } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala new file mode 100644 index 0000000000..3df6b876f9 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala @@ -0,0 +1,64 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import scala.collection.immutable.SortedSet +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ + +object MembershipChangeListenerUpMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class MembershipChangeListenerUpMultiJvmNode1 extends MembershipChangeListenerUpSpec +class MembershipChangeListenerUpMultiJvmNode2 extends MembershipChangeListenerUpSpec + +abstract class MembershipChangeListenerUpSpec + extends MultiNodeSpec(MembershipChangeListenerUpMultiJvmSpec) + with MultiNodeClusterSpec { + + import MembershipChangeListenerUpMultiJvmSpec._ + + override def initialParticipants = 2 + + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + + "A registered MembershipChangeListener" must { + "be notified when new node is marked as UP by the leader" taggedAs LongRunningTest in { + + runOn(first) { + startClusterNode() + } + + runOn(second) { + testConductor.enter("registered-listener") + cluster.join(firstAddress) + } + + runOn(first) { + val upLatch = TestLatch() + cluster.registerListener(new MembershipChangeListener { + def notify(members: SortedSet[Member]) { + if (members.size == 2 && members.forall(_.status == MemberStatus.Up)) + upLatch.countDown() + } + }) + testConductor.enter("registered-listener") + + upLatch.await + awaitUpConvergence(numberOfMembers = 2) + } + + testConductor.enter("after") + } + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 4d0c7f4720..dd57b4b13f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -15,11 +15,11 @@ import akka.util.Duration object MultiNodeClusterSpec { def clusterConfig: Config = ConfigFactory.parseString(""" akka.cluster { - auto-down = off - gossip-frequency = 200 ms - leader-actions-frequency = 200 ms - unreachable-nodes-reaper-frequency = 200 ms - periodic-tasks-initial-delay = 300 ms + auto-down = off + gossip-interval = 200 ms + leader-actions-interval = 200 ms + unreachable-nodes-reaper-interval = 200 ms + periodic-tasks-initial-delay = 300 ms } akka.test { single-expect-default = 5 s @@ -29,8 +29,16 @@ object MultiNodeClusterSpec { trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒ + /** + * Create a cluster node using 'Cluster(system)'. + */ def cluster: Cluster = Cluster(system) + /** + * Use this method instead of 'cluster.self'. + */ + def startClusterNode(): Unit = cluster.self + /** * Assert that the member addresses match the expected addresses in the * sort order used by the cluster. diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinAndUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinAndUpSpec.scala deleted file mode 100644 index 5415df1b4a..0000000000 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinAndUpSpec.scala +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.cluster - -import com.typesafe.config.ConfigFactory -import org.scalatest.BeforeAndAfter -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.testkit._ -import akka.util.duration._ - -object NodeJoinAndUpMultiJvmSpec extends MultiNodeConfig { - val first = role("first") - val second = role("second") - - commonConfig( - debugConfig(on = false) - .withFallback(ConfigFactory.parseString(""" - akka.cluster { - gossip-frequency = 1000 ms - leader-actions-frequency = 5000 ms # increase the leader action task frequency - } - """) - .withFallback(MultiNodeClusterSpec.clusterConfig))) -} - -class NodeJoinAndUpMultiJvmNode1 extends NodeJoinAndUpSpec -class NodeJoinAndUpMultiJvmNode2 extends NodeJoinAndUpSpec - -abstract class NodeJoinAndUpSpec - extends MultiNodeSpec(NodeJoinAndUpMultiJvmSpec) - with MultiNodeClusterSpec - with ImplicitSender - with BeforeAndAfter { - - import NodeJoinAndUpMultiJvmSpec._ - - override def initialParticipants = 2 - - after { - testConductor.enter("after") - } - - lazy val firstAddress = node(first).address - lazy val secondAddress = node(second).address - - "A first cluster node with a 'node-to-join' config set to empty string (singleton cluster)" must { - - "be a singleton cluster when started up" taggedAs LongRunningTest in { - runOn(first) { - awaitCond(cluster.isSingletonCluster) - awaitUpConvergence(numberOfMembers = 1) - cluster.isLeader must be(true) - } - } - } - - "A second cluster node" must { - "join the cluster as JOINING - when sending a 'Join' command - and then be moved to UP by the leader" taggedAs LongRunningTest in { - - runOn(second) { - cluster.join(firstAddress) - } - - awaitCond(cluster.latestGossip.members.exists { member ⇒ member.address == secondAddress && member.status == MemberStatus.Joining }) - - awaitCond( - cluster.latestGossip.members.exists { member ⇒ member.address == secondAddress && member.status == MemberStatus.Up }, - 30.seconds.dilated) // waiting for the leader to move from JOINING -> UP (frequency set to 5 sec in config) - - cluster.latestGossip.members.size must be(2) - awaitCond(cluster.convergence.isDefined) - } - } -} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala new file mode 100644 index 0000000000..99116ecb25 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala @@ -0,0 +1,57 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ + +object NodeJoinMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig( + debugConfig(on = false) + .withFallback(ConfigFactory.parseString(""" + akka.cluster { + leader-actions-interval = 5 s # increase the leader action task interval + } + """) + .withFallback(MultiNodeClusterSpec.clusterConfig))) +} + +class NodeJoinMultiJvmNode1 extends NodeJoinSpec +class NodeJoinMultiJvmNode2 extends NodeJoinSpec + +abstract class NodeJoinSpec + extends MultiNodeSpec(NodeJoinMultiJvmSpec) + with MultiNodeClusterSpec { + + import NodeJoinMultiJvmSpec._ + + override def initialParticipants = 2 + + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + + "A cluster node" must { + "join another cluster and get status JOINING - when sending a 'Join' command" taggedAs LongRunningTest in { + + runOn(first) { + startClusterNode() + } + + runOn(second) { + cluster.join(firstAddress) + } + + awaitCond(cluster.latestGossip.members.exists { member ⇒ member.address == secondAddress && member.status == MemberStatus.Joining }) + + testConductor.enter("after") + } + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala index 7c1037a624..da500323aa 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala @@ -40,7 +40,7 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec extends MultiNodeSpec(No "be moved to EXITING and then to REMOVED by the reaper" taggedAs LongRunningTest in { runOn(first) { - cluster.self + startClusterNode() } testConductor.enter("first-started") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala index 3fe9e220f6..189cb4c9c6 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala @@ -20,8 +20,8 @@ object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig { debugConfig(on = false) .withFallback(ConfigFactory.parseString(""" akka.cluster { - leader-actions-frequency = 5 s # increase the leader action task frequency to make sure we get a chance to test the LEAVING state - unreachable-nodes-reaper-frequency = 30 s # turn "off" reaping to unreachable node set + leader-actions-interval = 5 s # increase the leader action task frequency to make sure we get a chance to test the LEAVING state + unreachable-nodes-reaper-interval = 30 s # turn "off" reaping to unreachable node set } """) .withFallback(MultiNodeClusterSpec.clusterConfig))) @@ -31,8 +31,10 @@ class NodeLeavingAndExitingMultiJvmNode1 extends NodeLeavingAndExitingSpec class NodeLeavingAndExitingMultiJvmNode2 extends NodeLeavingAndExitingSpec class NodeLeavingAndExitingMultiJvmNode3 extends NodeLeavingAndExitingSpec -abstract class NodeLeavingAndExitingSpec extends MultiNodeSpec(NodeLeavingAndExitingMultiJvmSpec) - with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { +abstract class NodeLeavingAndExitingSpec + extends MultiNodeSpec(NodeLeavingAndExitingMultiJvmSpec) + with MultiNodeClusterSpec { + import NodeLeavingAndExitingMultiJvmSpec._ override def initialParticipants = 3 @@ -46,7 +48,7 @@ abstract class NodeLeavingAndExitingSpec extends MultiNodeSpec(NodeLeavingAndExi "be moved to EXITING by the leader" taggedAs LongRunningTest in { runOn(first) { - cluster.self + startClusterNode() } testConductor.enter("first-started") @@ -64,7 +66,7 @@ abstract class NodeLeavingAndExitingSpec extends MultiNodeSpec(NodeLeavingAndExi runOn(first, third) { // 1. Verify that 'second' node is set to LEAVING - // We have set the 'leader-actions-frequency' to 5 seconds to make sure that we get a + // We have set the 'leader-actions-interval' to 5 seconds to make sure that we get a // chance to test the LEAVING state before the leader moves the node to EXITING awaitCond(cluster.latestGossip.members.exists(_.status == MemberStatus.Leaving)) // wait on LEAVING val hasLeft = cluster.latestGossip.members.find(_.status == MemberStatus.Leaving) // verify node that left diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala index 300afdea20..ad445b4c42 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala @@ -18,8 +18,8 @@ object NodeLeavingMultiJvmSpec extends MultiNodeConfig { commonConfig( debugConfig(on = false) .withFallback(ConfigFactory.parseString(""" - akka.cluster.leader-actions-frequency = 5 s - akka.cluster.unreachable-nodes-reaper-frequency = 30 s # turn "off" reaping to unreachable node set + akka.cluster.leader-actions-interval = 5 s + akka.cluster.unreachable-nodes-reaper-interval = 30 s # turn "off" reaping to unreachable node set """)) .withFallback(MultiNodeClusterSpec.clusterConfig)) } @@ -43,7 +43,7 @@ abstract class NodeLeavingSpec extends MultiNodeSpec(NodeLeavingMultiJvmSpec) "be marked as LEAVING in the converged membership table" taggedAs LongRunningTest in { runOn(first) { - cluster.self + startClusterNode() } testConductor.enter("first-started") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala index fecb53c898..369dcf56ad 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala @@ -22,7 +22,11 @@ class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec -abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { +abstract class NodeMembershipSpec + extends MultiNodeSpec(NodeMembershipMultiJvmSpec) + with MultiNodeClusterSpec + with ImplicitSender with BeforeAndAfter { + import NodeMembershipMultiJvmSpec._ override def initialParticipants = 3 @@ -41,7 +45,7 @@ abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSp // make sure that the node-to-join is started before other join runOn(first) { - cluster.self + startClusterNode() } testConductor.enter("first-started") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala index c0c12f4582..a9a5ee3233 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala @@ -42,7 +42,7 @@ abstract class NodeShutdownSpec extends MultiNodeSpec(NodeShutdownMultiJvmSpec) "not be singleton cluster when joined" taggedAs LongRunningTest in { // make sure that the node-to-join is started before other join runOn(first) { - cluster.self + startClusterNode() } testConductor.enter("first-started") @@ -63,8 +63,6 @@ abstract class NodeShutdownSpec extends MultiNodeSpec(NodeShutdownMultiJvmSpec) cluster.isSingletonCluster must be(true) assertLeader(first) } - } } - } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala new file mode 100644 index 0000000000..7931ce48f1 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ + +object NodeUpMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class NodeUpMultiJvmNode1 extends NodeUpSpec +class NodeUpMultiJvmNode2 extends NodeUpSpec + +abstract class NodeUpSpec + extends MultiNodeSpec(NodeUpMultiJvmSpec) + with MultiNodeClusterSpec { + + import NodeUpMultiJvmSpec._ + + override def initialParticipants = 2 + + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + + "A cluster node that is joining another cluster" must { + "be moved to UP by the leader after a convergence" taggedAs LongRunningTest in { + + runOn(first) { + startClusterNode() + } + + runOn(second) { + cluster.join(firstAddress) + } + + awaitUpConvergence(numberOfMembers = 2) + + testConductor.enter("after") + } + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 45b0a35521..6b2ff1962c 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -20,9 +20,9 @@ class ClusterConfigSpec extends AkkaSpec { FailureDetectorMaxSampleSize must be(1000) NodeToJoin must be(None) PeriodicTasksInitialDelay must be(1 seconds) - GossipFrequency must be(1 second) - LeaderActionsFrequency must be(1 second) - UnreachableNodesReaperFrequency must be(1 second) + GossipInterval must be(1 second) + LeaderActionsInterval must be(1 second) + UnreachableNodesReaperInterval must be(1 second) NrOfGossipDaemons must be(4) NrOfDeputyNodes must be(3) AutoDown must be(true)