From ea090bdfd2686dad41550420b069f42c4c254a26 Mon Sep 17 00:00:00 2001 From: Roland Date: Thu, 24 May 2012 13:56:50 +0200 Subject: [PATCH 01/13] get better auto-generated actor system names in tests, see #2122 --- .../src/test/scala/akka/cluster/ClusterSpec.scala | 4 ++-- .../testconductor/NetworkFailureInjector.scala | 2 +- .../scala/akka/remote/testkit/MultiNodeSpec.scala | 2 +- .../src/test/scala/akka/testkit/AkkaSpec.scala | 12 ++++++++---- 4 files changed, 12 insertions(+), 8 deletions(-) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 38017ad00c..854d9e5584 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -47,11 +47,11 @@ abstract class ClusterSpec(_system: ActorSystem) extends AkkaSpec(_system) { def portPrefix: Int - def this(config: Config) = this(ActorSystem(AkkaSpec.getCallerName, config.withFallback(ClusterSpec.testConf))) + def this(config: Config) = this(ActorSystem(AkkaSpec.getCallerName(classOf[ClusterSpec]), config.withFallback(ClusterSpec.testConf))) def this(s: String) = this(ConfigFactory.parseString(s)) - def this() = this(ActorSystem(AkkaSpec.getCallerName, ClusterSpec.testConf)) + def this() = this(ActorSystem(AkkaSpec.getCallerName(classOf[ClusterSpec]), ClusterSpec.testConf)) def awaitConvergence(nodes: Iterable[Cluster], maxWaitTime: Duration = 60 seconds) { val deadline = maxWaitTime.fromNow diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/NetworkFailureInjector.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/NetworkFailureInjector.scala index ba8f8d1285..b425518044 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/NetworkFailureInjector.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/NetworkFailureInjector.scala @@ -135,7 +135,7 @@ private[akka] object NetworkFailureInjector { } /** - * Brief overview: all network traffic passes through the `sender`/`receiver` FSMs managed + * Brief overview: all network traffic passes through the `sender`/`receiver` FSMs managed * by the FailureInjector of the TestConductor extension. These can * pass through requests immediately, drop them or throttle to a desired rate. The FSMs are * registered in the TestConductorExt.failureInjector so that settings can be applied from diff --git a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala index 3822a1f529..e6a1ca6dac 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -119,7 +119,7 @@ abstract class MultiNodeSpec(val mySelf: RoleName, _system: ActorSystem) extends import MultiNodeSpec._ - def this(config: MultiNodeConfig) = this(config.mySelf, ActorSystem(AkkaSpec.getCallerName, config.config)) + def this(config: MultiNodeConfig) = this(config.mySelf, ActorSystem(AkkaSpec.getCallerName(classOf[MultiNodeSpec]), config.config)) /* * Test Class Interface diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index f24ea49b8c..c7000f2cf7 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -46,9 +46,13 @@ object AkkaSpec { ConfigFactory.parseMap(map.asJava) } - def getCallerName: String = { + def getCallerName(clazz: Class[_]): String = { val s = Thread.currentThread.getStackTrace map (_.getClassName) drop 1 dropWhile (_ matches ".*AkkaSpec.?$") - s.head.replaceFirst(""".*\.""", "").replaceAll("[^a-zA-Z_0-9]", "_") + val reduced = s.lastIndexWhere(_ == clazz.getName) match { + case -1 ⇒ s + case z ⇒ s drop (z + 1) + } + reduced.head.replaceFirst(""".*\.""", "").replaceAll("[^a-zA-Z_0-9]", "_") } } @@ -56,13 +60,13 @@ object AkkaSpec { abstract class AkkaSpec(_system: ActorSystem) extends TestKit(_system) with WordSpec with MustMatchers with BeforeAndAfterAll { - def this(config: Config) = this(ActorSystem(AkkaSpec.getCallerName, config.withFallback(AkkaSpec.testConf))) + def this(config: Config) = this(ActorSystem(AkkaSpec.getCallerName(getClass), config.withFallback(AkkaSpec.testConf))) def this(s: String) = this(ConfigFactory.parseString(s)) def this(configMap: Map[String, _]) = this(AkkaSpec.mapToConfig(configMap)) - def this() = this(ActorSystem(AkkaSpec.getCallerName, AkkaSpec.testConf)) + def this() = this(ActorSystem(AkkaSpec.getCallerName(getClass), AkkaSpec.testConf)) val log: LoggingAdapter = Logging(system, this.getClass) From e0fbf2f3246bf25d859557605b2366dc7a13de1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Fri, 1 Jun 2012 18:06:43 +0200 Subject: [PATCH 02/13] Renamed the NodeStartupSpec to NodeJoinAndUpSpec and added tests for both JOINING and UP. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- .../akka/cluster/NodeJoinAndUpSpec.scala | 76 +++++++++++++++++++ .../scala/akka/cluster/NodeStartupSpec.scala | 63 --------------- 2 files changed, 76 insertions(+), 63 deletions(-) create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinAndUpSpec.scala delete mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinAndUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinAndUpSpec.scala new file mode 100644 index 0000000000..5415df1b4a --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinAndUpSpec.scala @@ -0,0 +1,76 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ + +object NodeJoinAndUpMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig( + debugConfig(on = false) + .withFallback(ConfigFactory.parseString(""" + akka.cluster { + gossip-frequency = 1000 ms + leader-actions-frequency = 5000 ms # increase the leader action task frequency + } + """) + .withFallback(MultiNodeClusterSpec.clusterConfig))) +} + +class NodeJoinAndUpMultiJvmNode1 extends NodeJoinAndUpSpec +class NodeJoinAndUpMultiJvmNode2 extends NodeJoinAndUpSpec + +abstract class NodeJoinAndUpSpec + extends MultiNodeSpec(NodeJoinAndUpMultiJvmSpec) + with MultiNodeClusterSpec + with ImplicitSender + with BeforeAndAfter { + + import NodeJoinAndUpMultiJvmSpec._ + + override def initialParticipants = 2 + + after { + testConductor.enter("after") + } + + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + + "A first cluster node with a 'node-to-join' config set to empty string (singleton cluster)" must { + + "be a singleton cluster when started up" taggedAs LongRunningTest in { + runOn(first) { + awaitCond(cluster.isSingletonCluster) + awaitUpConvergence(numberOfMembers = 1) + cluster.isLeader must be(true) + } + } + } + + "A second cluster node" must { + "join the cluster as JOINING - when sending a 'Join' command - and then be moved to UP by the leader" taggedAs LongRunningTest in { + + runOn(second) { + cluster.join(firstAddress) + } + + awaitCond(cluster.latestGossip.members.exists { member ⇒ member.address == secondAddress && member.status == MemberStatus.Joining }) + + awaitCond( + cluster.latestGossip.members.exists { member ⇒ member.address == secondAddress && member.status == MemberStatus.Up }, + 30.seconds.dilated) // waiting for the leader to move from JOINING -> UP (frequency set to 5 sec in config) + + cluster.latestGossip.members.size must be(2) + awaitCond(cluster.convergence.isDefined) + } + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala deleted file mode 100644 index 44682b81f7..0000000000 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.cluster - -import com.typesafe.config.ConfigFactory -import org.scalatest.BeforeAndAfter -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.testkit._ - -object NodeStartupMultiJvmSpec extends MultiNodeConfig { - val first = role("first") - val second = role("second") - - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) - -} - -class NodeStartupMultiJvmNode1 extends NodeStartupSpec -class NodeStartupMultiJvmNode2 extends NodeStartupSpec - -abstract class NodeStartupSpec extends MultiNodeSpec(NodeStartupMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { - import NodeStartupMultiJvmSpec._ - - override def initialParticipants = 2 - - after { - testConductor.enter("after") - } - - lazy val firstAddress = node(first).address - lazy val secondAddress = node(second).address - - "A first cluster node with a 'node-to-join' config set to empty string (singleton cluster)" must { - - "be a singleton cluster when started up" taggedAs LongRunningTest in { - runOn(first) { - awaitCond(cluster.isSingletonCluster) - awaitUpConvergence(numberOfMembers = 1) - cluster.isLeader must be(true) - } - } - } - - "A second cluster node" must { - "join the other node cluster when sending a Join command" taggedAs LongRunningTest in { - - runOn(second) { - cluster.join(firstAddress) - } - - awaitCond { - cluster.latestGossip.members.exists { member ⇒ - member.address == secondAddress && member.status == MemberStatus.Up - } - } - cluster.latestGossip.members.size must be(2) - awaitCond(cluster.convergence.isDefined) - } - } - -} From d1fb1b925259bf52024be294593bfcc75f55feb0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Sat, 2 Jun 2012 17:36:10 +0200 Subject: [PATCH 03/13] Changed name of test files to end with *Spec. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- .../MembershipChangeListenerSpec.scala | 83 ------------------- ...eavingAndExitingAndBeingRemovedSpec.scala} | 0 ....scala => NodeLeavingAndExitingSpec.scala} | 0 ...odeLeaving.scala => NodeLeavingSpec.scala} | 3 +- 4 files changed, 2 insertions(+), 84 deletions(-) delete mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala rename akka-cluster/src/multi-jvm/scala/akka/cluster/{NodeLeavingAndExitingAndBeingRemoved.scala => NodeLeavingAndExitingAndBeingRemovedSpec.scala} (100%) rename akka-cluster/src/multi-jvm/scala/akka/cluster/{NodeLeavingAndExiting.scala => NodeLeavingAndExitingSpec.scala} (100%) rename akka-cluster/src/multi-jvm/scala/akka/cluster/{NodeLeaving.scala => NodeLeavingSpec.scala} (94%) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala deleted file mode 100644 index f818c97744..0000000000 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala +++ /dev/null @@ -1,83 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.cluster - -import scala.collection.immutable.SortedSet -import org.scalatest.BeforeAndAfter -import com.typesafe.config.ConfigFactory -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.testkit._ - -object MembershipChangeListenerMultiJvmSpec extends MultiNodeConfig { - val first = role("first") - val second = role("second") - val third = role("third") - - commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) - -} - -class MembershipChangeListenerMultiJvmNode1 extends MembershipChangeListenerSpec -class MembershipChangeListenerMultiJvmNode2 extends MembershipChangeListenerSpec -class MembershipChangeListenerMultiJvmNode3 extends MembershipChangeListenerSpec - -abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChangeListenerMultiJvmSpec) - with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { - import MembershipChangeListenerMultiJvmSpec._ - - override def initialParticipants = 3 - - after { - testConductor.enter("after") - } - - lazy val firstAddress = node(first).address - lazy val secondAddress = node(second).address - - "A set of connected cluster systems" must { - - "(when two systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in { - - // make sure that the node-to-join is started before other join - runOn(first) { - cluster.self - } - testConductor.enter("first-started") - - runOn(first, second) { - cluster.join(firstAddress) - val latch = TestLatch() - cluster.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - if (members.size == 2 && members.forall(_.status == MemberStatus.Up)) - latch.countDown() - } - }) - latch.await - cluster.convergence.isDefined must be(true) - } - - } - - "(when three systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in { - - runOn(third) { - cluster.join(firstAddress) - } - - val latch = TestLatch() - cluster.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - if (members.size == 3 && members.forall(_.status == MemberStatus.Up)) - latch.countDown() - } - }) - latch.await - cluster.convergence.isDefined must be(true) - - } - } - -} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemoved.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala similarity index 100% rename from akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemoved.scala rename to akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExiting.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala similarity index 100% rename from akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExiting.scala rename to akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeaving.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala similarity index 94% rename from akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeaving.scala rename to akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala index a6ddccb806..c4cf3fc12c 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeaving.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala @@ -18,7 +18,8 @@ object NodeLeavingMultiJvmSpec extends MultiNodeConfig { commonConfig( debugConfig(on = false) .withFallback(ConfigFactory.parseString(""" - akka.cluster.unreachable-nodes-reaper-frequency = 30000 # turn "off" reaping to unreachable node set + akka.cluster.leader-actions-frequency = 5000 ms + akka.cluster.unreachable-nodes-reaper-frequency = 30000 ms # turn "off" reaping to unreachable node set """)) .withFallback(MultiNodeClusterSpec.clusterConfig)) } From bcc6e4c11f4fe9fc98d9c8a6ab2893c15552aafd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Sat, 2 Jun 2012 17:37:13 +0200 Subject: [PATCH 04/13] Added test for testing that MemberChangeListener is triggered by node EXITING event. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- .../MembershipChangeListenerExitingSpec.scala | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala new file mode 100644 index 0000000000..0145628bd5 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala @@ -0,0 +1,76 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import scala.collection.immutable.SortedSet +import org.scalatest.BeforeAndAfter +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ + +object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig( + debugConfig(on = false) + .withFallback(ConfigFactory.parseString(""" + akka.cluster { + leader-actions-frequency = 5000 ms # increase the leader action task frequency + unreachable-nodes-reaper-frequency = 30000 ms # turn "off" reaping to unreachable node set + } + """) + .withFallback(MultiNodeClusterSpec.clusterConfig))) +} + +class MembershipChangeListenerExitingMultiJvmNode1 extends MembershipChangeListenerExitingSpec +class MembershipChangeListenerExitingMultiJvmNode2 extends MembershipChangeListenerExitingSpec +class MembershipChangeListenerExitingMultiJvmNode3 extends MembershipChangeListenerExitingSpec + +abstract class MembershipChangeListenerExitingSpec extends MultiNodeSpec(MembershipChangeListenerExitingMultiJvmSpec) + with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { + import MembershipChangeListenerExitingMultiJvmSpec._ + + override def initialParticipants = 3 + + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + lazy val thirdAddress = node(third).address + + "A registered MembershipChangeListener" must { + "be notified when new node is EXITING" taggedAs LongRunningTest in { + + runOn(first) { + cluster.self + } + testConductor.enter("first-started") + + runOn(second, third) { + cluster.join(firstAddress) + } + awaitUpConvergence(numberOfMembers = 3) + testConductor.enter("rest-started") + + runOn(third) { + val exitingLatch = TestLatch() + cluster.registerListener(new MembershipChangeListener { + def notify(members: SortedSet[Member]) { + if (members.size == 3 && members.exists(_.status == MemberStatus.Exiting)) + exitingLatch.countDown() + } + }) + exitingLatch.await + } + + runOn(first) { + cluster.leave(secondAddress) + } + + testConductor.enter("finished") + } + } +} From 5dc039b0f1b9479e674d843fa8c28a443bbc85eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Sat, 2 Jun 2012 17:37:29 +0200 Subject: [PATCH 05/13] Added test for testing that MemberChangeListener is triggered by node JOINING and UP events. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- ...embershipChangeListenerJoinAndUpSpec.scala | 85 +++++++++++++++++++ 1 file changed, 85 insertions(+) create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinAndUpSpec.scala diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinAndUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinAndUpSpec.scala new file mode 100644 index 0000000000..81e32d1491 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinAndUpSpec.scala @@ -0,0 +1,85 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import scala.collection.immutable.SortedSet +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ + +object MembershipChangeListenerJoinAndUpMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig( + debugConfig(on = false) + .withFallback(ConfigFactory.parseString(""" + akka.cluster { + gossip-frequency = 1000 ms + leader-actions-frequency = 5000 ms # increase the leader action task frequency + } + """) + .withFallback(MultiNodeClusterSpec.clusterConfig))) +} + +class MembershipChangeListenerJoinAndUpMultiJvmNode1 extends MembershipChangeListenerJoinAndUpSpec +class MembershipChangeListenerJoinAndUpMultiJvmNode2 extends MembershipChangeListenerJoinAndUpSpec + +abstract class MembershipChangeListenerJoinAndUpSpec + extends MultiNodeSpec(MembershipChangeListenerJoinAndUpMultiJvmSpec) + with MultiNodeClusterSpec + with ImplicitSender + with BeforeAndAfter { + + import MembershipChangeListenerJoinAndUpMultiJvmSpec._ + + override def initialParticipants = 2 + + after { + testConductor.enter("after") + } + + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + + "A registered MembershipChangeListener" must { + "be notified when new node is JOINING and node is marked as UP by the leader" taggedAs LongRunningTest in { + + runOn(first) { + cluster.self + } + + runOn(second) { + cluster.join(firstAddress) + } + + runOn(first) { + // JOINING + val joinLatch = TestLatch() + cluster.registerListener(new MembershipChangeListener { + def notify(members: SortedSet[Member]) { + if (members.size == 2 && members.exists(_.status == MemberStatus.Joining)) // second node is not part of node ring anymore + joinLatch.countDown() + } + }) + joinLatch.await + cluster.convergence.isDefined must be(true) + + // UP + val upLatch = TestLatch() + cluster.registerListener(new MembershipChangeListener { + def notify(members: SortedSet[Member]) { + if (members.size == 2 && members.forall(_.status == MemberStatus.Up)) + upLatch.countDown() + } + }) + upLatch.await + awaitCond(cluster.convergence.isDefined) + } + } + } +} From ead5bf8695c26b207066ec32b5ea1e4fbc5a1b42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Sat, 2 Jun 2012 17:37:41 +0200 Subject: [PATCH 06/13] Added test for testing that MemberChangeListener is triggered by node LEAVING event. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- .../MembershipChangeListenerLeavingSpec.scala | 73 +++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala new file mode 100644 index 0000000000..f8b083c4d8 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala @@ -0,0 +1,73 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import scala.collection.immutable.SortedSet +import org.scalatest.BeforeAndAfter +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ + +object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig( + debugConfig(on = false) + .withFallback(ConfigFactory.parseString(""" + akka.cluster.leader-actions-frequency = 5000 ms + akka.cluster.unreachable-nodes-reaper-frequency = 30000 ms # turn "off" reaping to unreachable node set + """)) + .withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class MembershipChangeListenerLeavingMultiJvmNode1 extends MembershipChangeListenerLeavingSpec +class MembershipChangeListenerLeavingMultiJvmNode2 extends MembershipChangeListenerLeavingSpec +class MembershipChangeListenerLeavingMultiJvmNode3 extends MembershipChangeListenerLeavingSpec + +abstract class MembershipChangeListenerLeavingSpec extends MultiNodeSpec(MembershipChangeListenerLeavingMultiJvmSpec) + with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { + import MembershipChangeListenerLeavingMultiJvmSpec._ + + override def initialParticipants = 3 + + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + lazy val thirdAddress = node(third).address + + "A registered MembershipChangeListener" must { + "be notified when new node is LEAVING" taggedAs LongRunningTest in { + + runOn(first) { + cluster.self + } + testConductor.enter("first-started") + + runOn(second, third) { + cluster.join(firstAddress) + } + awaitUpConvergence(numberOfMembers = 3) + testConductor.enter("rest-started") + + runOn(third) { + val latch = TestLatch() + cluster.registerListener(new MembershipChangeListener { + def notify(members: SortedSet[Member]) { + if (members.size == 3 && members.exists(_.status == MemberStatus.Leaving)) + latch.countDown() + } + }) + latch.await + } + + runOn(first) { + cluster.leave(secondAddress) + } + + testConductor.enter("finished") + } + } +} From de59444795e257fb9a310e202204b2c8159168ac Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 4 Jun 2012 10:03:41 +0200 Subject: [PATCH 07/13] offer TestKitBase trait, see #2174 --- .../code/docs/testkit/TestkitDocSpec.scala | 20 +++++++++++++++++++ akka-docs/scala/testing.rst | 14 +++++++++++++ .../src/main/scala/akka/testkit/TestKit.scala | 6 ++++-- 3 files changed, 38 insertions(+), 2 deletions(-) diff --git a/akka-docs/scala/code/docs/testkit/TestkitDocSpec.scala b/akka-docs/scala/code/docs/testkit/TestkitDocSpec.scala index ddb3eeaf1d..96c7857990 100644 --- a/akka-docs/scala/code/docs/testkit/TestkitDocSpec.scala +++ b/akka-docs/scala/code/docs/testkit/TestkitDocSpec.scala @@ -14,6 +14,8 @@ import akka.dispatch.Futures import akka.testkit.AkkaSpec import akka.testkit.DefaultTimeout import akka.testkit.ImplicitSender +import akka.util.NonFatal + object TestkitDocSpec { case object Say42 case object Unknown @@ -251,5 +253,23 @@ class TestkitDocSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { } //#event-filter } + + "demonstrate TestKitBase" in { + //#test-kit-base + import akka.testkit.TestKitBase + + class MyTest extends TestKitBase { + implicit lazy val system = ActorSystem() + + //#put-your-test-code-here + val probe = TestProbe() + probe.send(testActor, "hello") + try expectMsg("hello") catch { case NonFatal(e) => system.shutdown(); throw e } + //#put-your-test-code-here + + system.shutdown() + } + //#test-kit-base + } } diff --git a/akka-docs/scala/testing.rst b/akka-docs/scala/testing.rst index a98ee14917..d2875ed62a 100644 --- a/akka-docs/scala/testing.rst +++ b/akka-docs/scala/testing.rst @@ -671,6 +671,20 @@ This section contains a collection of known gotchas with some other frameworks, which is by no means exhaustive and does not imply endorsement or special support. +When you need it to be a trait +------------------------------ + +If for some reason it is a problem to inherit from :class:`TestKit` due to it +being a concrete class instead of a trait, there’s :class:`TestKitBase`: + +.. includecode:: code/docs/testkit/TestkitDocSpec.scala + :include: test-kit-base + :exclude: put-your-test-code-here + +The ``implicit lazy val system`` must be declared exactly like that (you can of +course pass arguments to the actor system factory as needed) because trait +:class:`TestKitBase` needs the system during its construction. + Specs2 ------ diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index 156a9d8612..6d8f73e7b8 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -101,11 +101,11 @@ class TestActor(queue: BlockingDeque[TestActor.Message]) extends Actor { * @author Roland Kuhn * @since 1.1 */ -class TestKit(_system: ActorSystem) { +trait TestKitBase { import TestActor.{ Message, RealMessage, NullMessage } - implicit val system = _system + implicit val system: ActorSystem val testKitSettings = TestKitExtension(system) private val queue = new LinkedBlockingDeque[Message]() @@ -579,6 +579,8 @@ class TestKit(_system: ActorSystem) { private def format(u: TimeUnit, d: Duration) = "%.3f %s".format(d.toUnit(u), u.toString.toLowerCase) } +class TestKit(_system: ActorSystem) extends { implicit val system = _system } with TestKitBase + object TestKit { private[testkit] val testActorId = new AtomicInteger(0) From df479a0bf09c0cda9b646a341f9903674cd23f7a Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 4 Jun 2012 11:29:56 +0200 Subject: [PATCH 08/13] add back TestProbe.reply, see #2172 --- .../scala/code/docs/testkit/TestkitDocSpec.scala | 12 ++++++------ .../src/main/scala/akka/testkit/TestKit.scala | 13 +++++++------ 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/akka-docs/scala/code/docs/testkit/TestkitDocSpec.scala b/akka-docs/scala/code/docs/testkit/TestkitDocSpec.scala index 96c7857990..564b7929ce 100644 --- a/akka-docs/scala/code/docs/testkit/TestkitDocSpec.scala +++ b/akka-docs/scala/code/docs/testkit/TestkitDocSpec.scala @@ -210,7 +210,7 @@ class TestkitDocSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { val probe = TestProbe() val future = probe.ref ? "hello" probe.expectMsg(0 millis, "hello") // TestActor runs on CallingThreadDispatcher - probe.sender ! "world" + probe.reply("world") assert(future.isCompleted && future.value == Some(Right("world"))) //#test-probe-reply } @@ -253,20 +253,20 @@ class TestkitDocSpec extends AkkaSpec with DefaultTimeout with ImplicitSender { } //#event-filter } - + "demonstrate TestKitBase" in { //#test-kit-base import akka.testkit.TestKitBase - + class MyTest extends TestKitBase { implicit lazy val system = ActorSystem() - + //#put-your-test-code-here val probe = TestProbe() probe.send(testActor, "hello") - try expectMsg("hello") catch { case NonFatal(e) => system.shutdown(); throw e } + try expectMsg("hello") catch { case NonFatal(e) ⇒ system.shutdown(); throw e } //#put-your-test-code-here - + system.shutdown() } //#test-kit-base diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index 6d8f73e7b8..9dfa40a5ee 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -642,22 +642,23 @@ class TestProbe(_application: ActorSystem) extends TestKit(_application) { * Replies will be available for inspection with all of TestKit's assertion * methods. */ - def send(actor: ActorRef, msg: AnyRef) = { - actor.!(msg)(testActor) - } + def send(actor: ActorRef, msg: Any): Unit = actor.!(msg)(testActor) /** * Forward this message as if in the TestActor's receive method with self.forward. */ - def forward(actor: ActorRef, msg: AnyRef = lastMessage.msg) { - actor.!(msg)(lastMessage.sender) - } + def forward(actor: ActorRef, msg: Any = lastMessage.msg): Unit = actor.!(msg)(lastMessage.sender) /** * Get sender of last received message. */ def sender = lastMessage.sender + /** + * Send message to the sender of the last dequeued message. + */ + def reply(msg: Any): Unit = sender.!(msg)(ref) + } object TestProbe { From 54febffb283129cf84a1de3dffba5b36691f24a0 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 4 Jun 2012 17:07:44 +0200 Subject: [PATCH 09/13] #2093 - Adding support for setting the sender when using TestActorRef.receive --- .../src/main/scala/akka/testkit/TestActorRef.scala | 12 +++++++++++- .../test/scala/akka/testkit/TestActorRefSpec.scala | 9 ++++++++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index 0a5d6163e8..279c728e80 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -56,7 +56,17 @@ class TestActorRef[T <: Actor]( * thrown will be available to you, while still being able to use * become/unbecome. */ - def receive(o: Any): Unit = underlying.receiveMessage(o) + def receive(o: Any): Unit = receive(o, underlying.system.deadLetters) + + /** + * Directly inject messages into actor receive behavior. Any exceptions + * thrown will be available to you, while still being able to use + * become/unbecome. + */ + def receive(o: Any, sender: ActorRef): Unit = try { + underlying.currentMessage = Envelope(o, if (sender eq null) underlying.system.deadLetters else sender)(underlying.system) + underlying.receiveMessage(o) + } finally underlying.currentMessage = null /** * Retrieve reference to the underlying actor, where the static type matches the factory used inside the diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 7c977884fc..492c44408c 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -246,11 +246,18 @@ class TestActorRefSpec extends AkkaSpec("disp1.type=Dispatcher") with BeforeAndA a.underlying.dispatcher.getClass must be(classOf[Dispatcher]) } - "proxy receive for the underlying actor" in { + "proxy receive for the underlying actor without sender" in { val ref = TestActorRef[WorkerActor] ref.receive("work") ref.isTerminated must be(true) } + "proxy receive for the underlying actor with sender" in { + val ref = TestActorRef[WorkerActor] + ref.receive("work", testActor) + ref.isTerminated must be(true) + expectMsg("workDone") + } + } } From b840624b7844ff4a8427a4e069b9cd8bdc3a5447 Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 4 Jun 2012 19:28:58 +0200 Subject: [PATCH 10/13] warn against using TestKitBase trait --- akka-docs/scala/testing.rst | 5 ++ .../src/main/scala/akka/testkit/TestKit.scala | 90 +++++++++++-------- 2 files changed, 59 insertions(+), 36 deletions(-) diff --git a/akka-docs/scala/testing.rst b/akka-docs/scala/testing.rst index d2875ed62a..d19a1ab753 100644 --- a/akka-docs/scala/testing.rst +++ b/akka-docs/scala/testing.rst @@ -685,6 +685,11 @@ The ``implicit lazy val system`` must be declared exactly like that (you can course pass arguments to the actor system factory as needed) because trait :class:`TestKitBase` needs the system during its construction. +.. warning:: + + Use of the trait is discouraged because of potential issues with binary + backwards compatibility in the future, use at own risk. + Specs2 ------ diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index 9dfa40a5ee..373f4c1fff 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -62,44 +62,22 @@ class TestActor(queue: BlockingDeque[TestActor.Message]) extends Actor { } /** - * Test kit for testing actors. Inheriting from this trait enables reception of - * replies from actors, which are queued by an internal actor and can be - * examined using the `expectMsg...` methods. Assertions and bounds concerning - * timing are available in the form of `within` blocks. + * Implementation trait behind the [[akka.testkit.TestKit]] class: you may use + * this if inheriting from a concrete class is not possible. * - *
- * class Test extends TestKit(ActorSystem()) {
- *     try {
- *
- *       val test = system.actorOf(Props[SomeActor]
- *
- *       within (1 second) {
- *         test ! SomeWork
- *         expectMsg(Result1) // bounded to 1 second
- *         expectMsg(Result2) // bounded to the remainder of the 1 second
- *       }
- *
- *     } finally {
- *       system.shutdown()
- *     }
+ * Use of the trait is discouraged because of potential issues with binary 
+ * backwards compatibility in the future, use at own risk.
+ * 
+ * This trait requires the concrete class mixing it in to provide an 
+ * [[akka.actor.ActorSystem]] which is available before this traits’s
+ * constructor is run. The recommended way is this:
+ * 
+ * {{{
+ * class MyTest extends TestKitBase {
+ *   implicit lazy val system = ActorSystem() // may add arguments here
+ *   ...
  * }
- * 
- * - * Beware of two points: - * - * - the ActorSystem passed into the constructor needs to be shutdown, - * otherwise thread pools and memory will be leaked - * - this trait is not thread-safe (only one actor with one queue, one stack - * of `within` blocks); it is expected that the code is executed from a - * constructor as shown above, which makes this a non-issue, otherwise take - * care not to run tests within a single test class instance in parallel. - * - * It should be noted that for CI servers and the like all maximum Durations - * are scaled using their Duration.dilated method, which uses the - * TestKitExtension.Settings.TestTimeFactor settable via akka.conf entry "akka.test.timefactor". - * - * @author Roland Kuhn - * @since 1.1 + * }}} */ trait TestKitBase { @@ -579,6 +557,46 @@ trait TestKitBase { private def format(u: TimeUnit, d: Duration) = "%.3f %s".format(d.toUnit(u), u.toString.toLowerCase) } +/** + * Test kit for testing actors. Inheriting from this trait enables reception of + * replies from actors, which are queued by an internal actor and can be + * examined using the `expectMsg...` methods. Assertions and bounds concerning + * timing are available in the form of `within` blocks. + * + *
+ * class Test extends TestKit(ActorSystem()) {
+ *     try {
+ *
+ *       val test = system.actorOf(Props[SomeActor]
+ *
+ *       within (1 second) {
+ *         test ! SomeWork
+ *         expectMsg(Result1) // bounded to 1 second
+ *         expectMsg(Result2) // bounded to the remainder of the 1 second
+ *       }
+ *
+ *     } finally {
+ *       system.shutdown()
+ *     }
+ * }
+ * 
+ * + * Beware of two points: + * + * - the ActorSystem passed into the constructor needs to be shutdown, + * otherwise thread pools and memory will be leaked + * - this trait is not thread-safe (only one actor with one queue, one stack + * of `within` blocks); it is expected that the code is executed from a + * constructor as shown above, which makes this a non-issue, otherwise take + * care not to run tests within a single test class instance in parallel. + * + * It should be noted that for CI servers and the like all maximum Durations + * are scaled using their Duration.dilated method, which uses the + * TestKitExtension.Settings.TestTimeFactor settable via akka.conf entry "akka.test.timefactor". + * + * @author Roland Kuhn + * @since 1.1 + */ class TestKit(_system: ActorSystem) extends { implicit val system = _system } with TestKitBase object TestKit { From b98fb0e37a132b2b2a29278f3d5ae47abf2919dd Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 4 Jun 2012 23:10:03 +0200 Subject: [PATCH 11/13] clarify deployment using anonymous factories --- akka-docs/java/remoting.rst | 8 ++++++++ akka-docs/scala/remoting.rst | 8 ++++++++ 2 files changed, 16 insertions(+) diff --git a/akka-docs/java/remoting.rst b/akka-docs/java/remoting.rst index ae2ac9c246..910ec5fbb2 100644 --- a/akka-docs/java/remoting.rst +++ b/akka-docs/java/remoting.rst @@ -92,6 +92,14 @@ As you can see from the example above the following pattern is used to find an ` akka://@:/ +.. note:: + + In order to ensure serializability of ``Props`` when passing constructor + arguments to the actor being created, do not make the factory a non-static + inner class: this will inherently capture a reference to its enclosing + object, which in most cases is not serializable. It is best to make a static + inner class which implements :class:`UntypedActorFactory`. + Programmatic Remote Deployment ------------------------------ diff --git a/akka-docs/scala/remoting.rst b/akka-docs/scala/remoting.rst index 0f55ccdff4..0863d80b55 100644 --- a/akka-docs/scala/remoting.rst +++ b/akka-docs/scala/remoting.rst @@ -105,6 +105,14 @@ Once you have configured the properties above you would do the following in code ``SampleActor`` has to be available to the runtimes using it, i.e. the classloader of the actor systems has to have a JAR containing the class. +.. note:: + + In order to ensure serializability of ``Props`` when passing constructor + arguments to the actor being created, do not make the factory an inner class: + this will inherently capture a reference to its enclosing object, which in + most cases is not serializable. It is best to create a factory method in the + companion object of the actor’s class. + Programmatic Remote Deployment ------------------------------ From 391fed65941c29aa7d139011b0a97fb7c37f768e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Mon, 4 Jun 2012 23:21:28 +0200 Subject: [PATCH 12/13] Misc changes, fixes and improvements after review. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Renamed all 'frequency' to 'interval' - Split up NodeJoinAndUpSpec and into NodeJoinSpec and NodeUpSpec. - Split up MembershipChangeListenerJoinAndUpSpec and into MembershipChangeListenerJoinSpec and MembershipChangeListenerUpSpec. - Added utility method 'startClusterNode()' - Fixed race in register listener and telling node to leave - Removed 'after' blocks - Cleaned up unused code - Improved comments Signed-off-by: Jonas Bonér --- .../src/main/resources/reference.conf | 6 +- .../src/main/scala/akka/cluster/Cluster.scala | 12 +-- .../scala/akka/cluster/ClusterSettings.scala | 6 +- ...ientDowningNodeThatIsUnreachableSpec.scala | 6 +- .../ClientDowningNodeThatIsUpSpec.scala | 6 +- .../GossipingAccrualFailureDetectorSpec.scala | 2 +- .../akka/cluster/JoinTwoClustersSpec.scala | 7 +- ...aderDowningNodeThatIsUnreachableSpec.scala | 6 +- .../akka/cluster/LeaderElectionSpec.scala | 7 +- .../MembershipChangeListenerExitingSpec.scala | 28 ++++--- ...=> MembershipChangeListenerJoinSpec.scala} | 44 ++++------- .../MembershipChangeListenerLeavingSpec.scala | 28 ++++--- .../MembershipChangeListenerUpSpec.scala | 64 ++++++++++++++++ .../akka/cluster/MultiNodeClusterSpec.scala | 18 +++-- .../akka/cluster/NodeJoinAndUpSpec.scala | 76 ------------------- .../scala/akka/cluster/NodeJoinSpec.scala | 57 ++++++++++++++ ...LeavingAndExitingAndBeingRemovedSpec.scala | 2 +- .../cluster/NodeLeavingAndExitingSpec.scala | 14 ++-- .../scala/akka/cluster/NodeLeavingSpec.scala | 6 +- .../akka/cluster/NodeMembershipSpec.scala | 8 +- .../scala/akka/cluster/NodeShutdownSpec.scala | 4 +- .../scala/akka/cluster/NodeUpSpec.scala | 50 ++++++++++++ .../akka/cluster/ClusterConfigSpec.scala | 6 +- 23 files changed, 289 insertions(+), 174 deletions(-) rename akka-cluster/src/multi-jvm/scala/akka/cluster/{MembershipChangeListenerJoinAndUpSpec.scala => MembershipChangeListenerJoinSpec.scala} (52%) create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala delete mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinAndUpSpec.scala create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index 7dd511e34a..8c905d5b29 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -25,13 +25,13 @@ akka { periodic-tasks-initial-delay = 1s # how often should the node send out gossip information? - gossip-frequency = 1s + gossip-interval = 1s # how often should the leader perform maintenance tasks? - leader-actions-frequency = 1s + leader-actions-interval = 1s # how often should the node move nodes, marked as unreachable by the failure detector, out of the membership ring? - unreachable-nodes-reaper-frequency = 1s + unreachable-nodes-reaper-interval = 1s # accrual failure detection config failure-detector { diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index c5ad773989..8beb7f4164 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -380,9 +380,9 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ private val vclockNode = VectorClock.Node(selfAddress.toString) private val periodicTasksInitialDelay = clusterSettings.PeriodicTasksInitialDelay - private val gossipFrequency = clusterSettings.GossipFrequency - private val leaderActionsFrequency = clusterSettings.LeaderActionsFrequency - private val unreachableNodesReaperFrequency = clusterSettings.UnreachableNodesReaperFrequency + private val gossipInterval = clusterSettings.GossipInterval + private val leaderActionsInterval = clusterSettings.LeaderActionsInterval + private val unreachableNodesReaperInterval = clusterSettings.UnreachableNodesReaperInterval implicit private val defaultTimeout = Timeout(remoteSettings.RemoteSystemDaemonAckTimeout) @@ -424,17 +424,17 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒ // ======================================================== // start periodic gossip to random nodes in cluster - private val gossipCanceller = system.scheduler.schedule(periodicTasksInitialDelay, gossipFrequency) { + private val gossipCanceller = system.scheduler.schedule(periodicTasksInitialDelay, gossipInterval) { gossip() } // start periodic cluster failure detector reaping (moving nodes condemned by the failure detector to unreachable list) - private val failureDetectorReaperCanceller = system.scheduler.schedule(periodicTasksInitialDelay, unreachableNodesReaperFrequency) { + private val failureDetectorReaperCanceller = system.scheduler.schedule(periodicTasksInitialDelay, unreachableNodesReaperInterval) { reapUnreachableMembers() } // start periodic leader action management (only applies for the current leader) - private val leaderActionsCanceller = system.scheduler.schedule(periodicTasksInitialDelay, leaderActionsFrequency) { + private val leaderActionsCanceller = system.scheduler.schedule(periodicTasksInitialDelay, leaderActionsInterval) { leaderActions() } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 8e9b9c770d..0e7dac06ab 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -20,9 +20,9 @@ class ClusterSettings(val config: Config, val systemName: String) { case AddressFromURIString(addr) ⇒ Some(addr) } val PeriodicTasksInitialDelay = Duration(getMilliseconds("akka.cluster.periodic-tasks-initial-delay"), MILLISECONDS) - val GossipFrequency = Duration(getMilliseconds("akka.cluster.gossip-frequency"), MILLISECONDS) - val LeaderActionsFrequency = Duration(getMilliseconds("akka.cluster.leader-actions-frequency"), MILLISECONDS) - val UnreachableNodesReaperFrequency = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-frequency"), MILLISECONDS) + val GossipInterval = Duration(getMilliseconds("akka.cluster.gossip-interval"), MILLISECONDS) + val LeaderActionsInterval = Duration(getMilliseconds("akka.cluster.leader-actions-interval"), MILLISECONDS) + val UnreachableNodesReaperInterval = Duration(getMilliseconds("akka.cluster.unreachable-nodes-reaper-interval"), MILLISECONDS) val NrOfGossipDaemons = getInt("akka.cluster.nr-of-gossip-daemons") val NrOfDeputyNodes = getInt("akka.cluster.nr-of-deputy-nodes") val AutoDown = getBoolean("akka.cluster.auto-down") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala index 6ab4d1a39e..ba34c9b0be 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala @@ -26,8 +26,8 @@ class ClientDowningNodeThatIsUnreachableMultiJvmNode4 extends ClientDowningNodeT class ClientDowningNodeThatIsUnreachableSpec extends MultiNodeSpec(ClientDowningNodeThatIsUnreachableMultiJvmSpec) - with MultiNodeClusterSpec - with ImplicitSender with BeforeAndAfter { + with MultiNodeClusterSpec { + import ClientDowningNodeThatIsUnreachableMultiJvmSpec._ override def initialParticipants = 4 @@ -36,7 +36,7 @@ class ClientDowningNodeThatIsUnreachableSpec "be able to DOWN a node that is UNREACHABLE (killed)" taggedAs LongRunningTest in { runOn(first) { - cluster.self + startClusterNode() awaitUpConvergence(numberOfMembers = 4) val thirdAddress = node(third).address diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala index 6b0bbae22e..ac1d68c8af 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala @@ -26,8 +26,8 @@ class ClientDowningNodeThatIsUpMultiJvmNode4 extends ClientDowningNodeThatIsUpSp class ClientDowningNodeThatIsUpSpec extends MultiNodeSpec(ClientDowningNodeThatIsUpMultiJvmSpec) - with MultiNodeClusterSpec - with ImplicitSender with BeforeAndAfter { + with MultiNodeClusterSpec { + import ClientDowningNodeThatIsUpMultiJvmSpec._ override def initialParticipants = 4 @@ -36,7 +36,7 @@ class ClientDowningNodeThatIsUpSpec "be able to DOWN a node that is UP (healthy and available)" taggedAs LongRunningTest in { runOn(first) { - cluster.self + startClusterNode() awaitUpConvergence(numberOfMembers = 4) val thirdAddress = node(third).address diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala index 9d388622db..cec99e9af9 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala @@ -43,7 +43,7 @@ abstract class GossipingAccrualFailureDetectorSpec extends MultiNodeSpec(Gossipi "receive gossip heartbeats so that all member nodes in the cluster are marked 'available'" taggedAs LongRunningTest in { // make sure that the node-to-join is started before other join runOn(first) { - cluster.self + startClusterNode() } testConductor.enter("first-started") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala index 9f1395b5dd..7b7263bbe0 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala @@ -27,7 +27,10 @@ class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec -abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender { +abstract class JoinTwoClustersSpec + extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) + with MultiNodeClusterSpec { + import JoinTwoClustersMultiJvmSpec._ override def initialParticipants = 6 @@ -41,7 +44,7 @@ abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvm "be able to 'elect' a single leader after joining (A -> B)" taggedAs LongRunningTest in { // make sure that the node-to-join is started before other join runOn(a1, b1, c1) { - cluster.self + startClusterNode() } testConductor.enter("first-started") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala index 63665d3c57..7b2536d9d2 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala @@ -34,8 +34,8 @@ class LeaderDowningNodeThatIsUnreachableMultiJvmNode4 extends LeaderDowningNodeT class LeaderDowningNodeThatIsUnreachableSpec extends MultiNodeSpec(LeaderDowningNodeThatIsUnreachableMultiJvmSpec) - with MultiNodeClusterSpec - with ImplicitSender with BeforeAndAfter { + with MultiNodeClusterSpec { + import LeaderDowningNodeThatIsUnreachableMultiJvmSpec._ override def initialParticipants = 4 @@ -44,7 +44,7 @@ class LeaderDowningNodeThatIsUnreachableSpec "be able to DOWN a 'last' node that is UNREACHABLE" taggedAs LongRunningTest in { runOn(first) { - cluster.self + startClusterNode() awaitUpConvergence(numberOfMembers = 4) val fourthAddress = node(fourth).address diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala index ba0471bedb..bf60b6b4ac 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderElectionSpec.scala @@ -26,7 +26,10 @@ class LeaderElectionMultiJvmNode3 extends LeaderElectionSpec class LeaderElectionMultiJvmNode4 extends LeaderElectionSpec class LeaderElectionMultiJvmNode5 extends LeaderElectionSpec -abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSpec) with MultiNodeClusterSpec { +abstract class LeaderElectionSpec + extends MultiNodeSpec(LeaderElectionMultiJvmSpec) + with MultiNodeClusterSpec { + import LeaderElectionMultiJvmSpec._ override def initialParticipants = 5 @@ -41,7 +44,7 @@ abstract class LeaderElectionSpec extends MultiNodeSpec(LeaderElectionMultiJvmSp "be able to 'elect' a single leader" taggedAs LongRunningTest in { // make sure that the node-to-join is started before other join runOn(first) { - cluster.self + startClusterNode() } testConductor.enter("first-started") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala index 0145628bd5..8932eed6ee 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerExitingSpec.scala @@ -20,8 +20,8 @@ object MembershipChangeListenerExitingMultiJvmSpec extends MultiNodeConfig { debugConfig(on = false) .withFallback(ConfigFactory.parseString(""" akka.cluster { - leader-actions-frequency = 5000 ms # increase the leader action task frequency - unreachable-nodes-reaper-frequency = 30000 ms # turn "off" reaping to unreachable node set + leader-actions-interval = 5 s # increase the leader action task interval + unreachable-nodes-reaper-interval = 30 s # turn "off" reaping to unreachable node set } """) .withFallback(MultiNodeClusterSpec.clusterConfig))) @@ -31,8 +31,10 @@ class MembershipChangeListenerExitingMultiJvmNode1 extends MembershipChangeListe class MembershipChangeListenerExitingMultiJvmNode2 extends MembershipChangeListenerExitingSpec class MembershipChangeListenerExitingMultiJvmNode3 extends MembershipChangeListenerExitingSpec -abstract class MembershipChangeListenerExitingSpec extends MultiNodeSpec(MembershipChangeListenerExitingMultiJvmSpec) - with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { +abstract class MembershipChangeListenerExitingSpec + extends MultiNodeSpec(MembershipChangeListenerExitingMultiJvmSpec) + with MultiNodeClusterSpec { + import MembershipChangeListenerExitingMultiJvmSpec._ override def initialParticipants = 3 @@ -45,7 +47,7 @@ abstract class MembershipChangeListenerExitingSpec extends MultiNodeSpec(Members "be notified when new node is EXITING" taggedAs LongRunningTest in { runOn(first) { - cluster.self + startClusterNode() } testConductor.enter("first-started") @@ -55,21 +57,27 @@ abstract class MembershipChangeListenerExitingSpec extends MultiNodeSpec(Members awaitUpConvergence(numberOfMembers = 3) testConductor.enter("rest-started") + runOn(first) { + testConductor.enter("registered-listener") + cluster.leave(secondAddress) + } + + runOn(second) { + testConductor.enter("registered-listener") + } + runOn(third) { val exitingLatch = TestLatch() cluster.registerListener(new MembershipChangeListener { def notify(members: SortedSet[Member]) { - if (members.size == 3 && members.exists(_.status == MemberStatus.Exiting)) + if (members.size == 3 && members.exists( m => m.address == secondAddress && m.status == MemberStatus.Exiting)) exitingLatch.countDown() } }) + testConductor.enter("registered-listener") exitingLatch.await } - runOn(first) { - cluster.leave(secondAddress) - } - testConductor.enter("finished") } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinAndUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala similarity index 52% rename from akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinAndUpSpec.scala rename to akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala index 81e32d1491..2f82e12506 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinAndUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerJoinSpec.scala @@ -11,7 +11,7 @@ import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import akka.util.duration._ -object MembershipChangeListenerJoinAndUpMultiJvmSpec extends MultiNodeConfig { +object MembershipChangeListenerJoinMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") @@ -19,46 +19,39 @@ object MembershipChangeListenerJoinAndUpMultiJvmSpec extends MultiNodeConfig { debugConfig(on = false) .withFallback(ConfigFactory.parseString(""" akka.cluster { - gossip-frequency = 1000 ms - leader-actions-frequency = 5000 ms # increase the leader action task frequency + leader-actions-interval = 5 s # increase the leader action task interval to allow time checking for JOIN before leader moves it to UP } """) .withFallback(MultiNodeClusterSpec.clusterConfig))) } -class MembershipChangeListenerJoinAndUpMultiJvmNode1 extends MembershipChangeListenerJoinAndUpSpec -class MembershipChangeListenerJoinAndUpMultiJvmNode2 extends MembershipChangeListenerJoinAndUpSpec +class MembershipChangeListenerJoinMultiJvmNode1 extends MembershipChangeListenerJoinSpec +class MembershipChangeListenerJoinMultiJvmNode2 extends MembershipChangeListenerJoinSpec -abstract class MembershipChangeListenerJoinAndUpSpec - extends MultiNodeSpec(MembershipChangeListenerJoinAndUpMultiJvmSpec) - with MultiNodeClusterSpec - with ImplicitSender - with BeforeAndAfter { +abstract class MembershipChangeListenerJoinSpec + extends MultiNodeSpec(MembershipChangeListenerJoinMultiJvmSpec) + with MultiNodeClusterSpec { - import MembershipChangeListenerJoinAndUpMultiJvmSpec._ + import MembershipChangeListenerJoinMultiJvmSpec._ override def initialParticipants = 2 - after { - testConductor.enter("after") - } - lazy val firstAddress = node(first).address lazy val secondAddress = node(second).address "A registered MembershipChangeListener" must { - "be notified when new node is JOINING and node is marked as UP by the leader" taggedAs LongRunningTest in { + "be notified when new node is JOINING" taggedAs LongRunningTest in { runOn(first) { - cluster.self + startClusterNode() } runOn(second) { + testConductor.enter("registered-listener") cluster.join(firstAddress) } runOn(first) { - // JOINING val joinLatch = TestLatch() cluster.registerListener(new MembershipChangeListener { def notify(members: SortedSet[Member]) { @@ -66,20 +59,13 @@ abstract class MembershipChangeListenerJoinAndUpSpec joinLatch.countDown() } }) + testConductor.enter("registered-listener") + joinLatch.await cluster.convergence.isDefined must be(true) - - // UP - val upLatch = TestLatch() - cluster.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - if (members.size == 2 && members.forall(_.status == MemberStatus.Up)) - upLatch.countDown() - } - }) - upLatch.await - awaitCond(cluster.convergence.isDefined) } + + testConductor.enter("after") } } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala index f8b083c4d8..089f241849 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerLeavingSpec.scala @@ -18,8 +18,8 @@ object MembershipChangeListenerLeavingMultiJvmSpec extends MultiNodeConfig { commonConfig( debugConfig(on = false) .withFallback(ConfigFactory.parseString(""" - akka.cluster.leader-actions-frequency = 5000 ms - akka.cluster.unreachable-nodes-reaper-frequency = 30000 ms # turn "off" reaping to unreachable node set + akka.cluster.leader-actions-interval = 5 s + akka.cluster.unreachable-nodes-reaper-interval = 30 s """)) .withFallback(MultiNodeClusterSpec.clusterConfig)) } @@ -28,8 +28,10 @@ class MembershipChangeListenerLeavingMultiJvmNode1 extends MembershipChangeListe class MembershipChangeListenerLeavingMultiJvmNode2 extends MembershipChangeListenerLeavingSpec class MembershipChangeListenerLeavingMultiJvmNode3 extends MembershipChangeListenerLeavingSpec -abstract class MembershipChangeListenerLeavingSpec extends MultiNodeSpec(MembershipChangeListenerLeavingMultiJvmSpec) - with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { +abstract class MembershipChangeListenerLeavingSpec + extends MultiNodeSpec(MembershipChangeListenerLeavingMultiJvmSpec) + with MultiNodeClusterSpec { + import MembershipChangeListenerLeavingMultiJvmSpec._ override def initialParticipants = 3 @@ -42,7 +44,7 @@ abstract class MembershipChangeListenerLeavingSpec extends MultiNodeSpec(Members "be notified when new node is LEAVING" taggedAs LongRunningTest in { runOn(first) { - cluster.self + startClusterNode() } testConductor.enter("first-started") @@ -52,21 +54,27 @@ abstract class MembershipChangeListenerLeavingSpec extends MultiNodeSpec(Members awaitUpConvergence(numberOfMembers = 3) testConductor.enter("rest-started") + runOn(first) { + testConductor.enter("registered-listener") + cluster.leave(secondAddress) + } + + runOn(second) { + testConductor.enter("registered-listener") + } + runOn(third) { val latch = TestLatch() cluster.registerListener(new MembershipChangeListener { def notify(members: SortedSet[Member]) { - if (members.size == 3 && members.exists(_.status == MemberStatus.Leaving)) + if (members.size == 3 && members.exists( m => m.address == secondAddress && m.status == MemberStatus.Leaving)) latch.countDown() } }) + testConductor.enter("registered-listener") latch.await } - runOn(first) { - cluster.leave(secondAddress) - } - testConductor.enter("finished") } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala new file mode 100644 index 0000000000..3df6b876f9 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerUpSpec.scala @@ -0,0 +1,64 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import scala.collection.immutable.SortedSet +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ + +object MembershipChangeListenerUpMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class MembershipChangeListenerUpMultiJvmNode1 extends MembershipChangeListenerUpSpec +class MembershipChangeListenerUpMultiJvmNode2 extends MembershipChangeListenerUpSpec + +abstract class MembershipChangeListenerUpSpec + extends MultiNodeSpec(MembershipChangeListenerUpMultiJvmSpec) + with MultiNodeClusterSpec { + + import MembershipChangeListenerUpMultiJvmSpec._ + + override def initialParticipants = 2 + + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + + "A registered MembershipChangeListener" must { + "be notified when new node is marked as UP by the leader" taggedAs LongRunningTest in { + + runOn(first) { + startClusterNode() + } + + runOn(second) { + testConductor.enter("registered-listener") + cluster.join(firstAddress) + } + + runOn(first) { + val upLatch = TestLatch() + cluster.registerListener(new MembershipChangeListener { + def notify(members: SortedSet[Member]) { + if (members.size == 2 && members.forall(_.status == MemberStatus.Up)) + upLatch.countDown() + } + }) + testConductor.enter("registered-listener") + + upLatch.await + awaitUpConvergence(numberOfMembers = 2) + } + + testConductor.enter("after") + } + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index 4d0c7f4720..dd57b4b13f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -15,11 +15,11 @@ import akka.util.Duration object MultiNodeClusterSpec { def clusterConfig: Config = ConfigFactory.parseString(""" akka.cluster { - auto-down = off - gossip-frequency = 200 ms - leader-actions-frequency = 200 ms - unreachable-nodes-reaper-frequency = 200 ms - periodic-tasks-initial-delay = 300 ms + auto-down = off + gossip-interval = 200 ms + leader-actions-interval = 200 ms + unreachable-nodes-reaper-interval = 200 ms + periodic-tasks-initial-delay = 300 ms } akka.test { single-expect-default = 5 s @@ -29,8 +29,16 @@ object MultiNodeClusterSpec { trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒ + /** + * Create a cluster node using 'Cluster(system)'. + */ def cluster: Cluster = Cluster(system) + /** + * Use this method instead of 'cluster.self'. + */ + def startClusterNode(): Unit = cluster.self + /** * Assert that the member addresses match the expected addresses in the * sort order used by the cluster. diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinAndUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinAndUpSpec.scala deleted file mode 100644 index 5415df1b4a..0000000000 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinAndUpSpec.scala +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ -package akka.cluster - -import com.typesafe.config.ConfigFactory -import org.scalatest.BeforeAndAfter -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.testkit._ -import akka.util.duration._ - -object NodeJoinAndUpMultiJvmSpec extends MultiNodeConfig { - val first = role("first") - val second = role("second") - - commonConfig( - debugConfig(on = false) - .withFallback(ConfigFactory.parseString(""" - akka.cluster { - gossip-frequency = 1000 ms - leader-actions-frequency = 5000 ms # increase the leader action task frequency - } - """) - .withFallback(MultiNodeClusterSpec.clusterConfig))) -} - -class NodeJoinAndUpMultiJvmNode1 extends NodeJoinAndUpSpec -class NodeJoinAndUpMultiJvmNode2 extends NodeJoinAndUpSpec - -abstract class NodeJoinAndUpSpec - extends MultiNodeSpec(NodeJoinAndUpMultiJvmSpec) - with MultiNodeClusterSpec - with ImplicitSender - with BeforeAndAfter { - - import NodeJoinAndUpMultiJvmSpec._ - - override def initialParticipants = 2 - - after { - testConductor.enter("after") - } - - lazy val firstAddress = node(first).address - lazy val secondAddress = node(second).address - - "A first cluster node with a 'node-to-join' config set to empty string (singleton cluster)" must { - - "be a singleton cluster when started up" taggedAs LongRunningTest in { - runOn(first) { - awaitCond(cluster.isSingletonCluster) - awaitUpConvergence(numberOfMembers = 1) - cluster.isLeader must be(true) - } - } - } - - "A second cluster node" must { - "join the cluster as JOINING - when sending a 'Join' command - and then be moved to UP by the leader" taggedAs LongRunningTest in { - - runOn(second) { - cluster.join(firstAddress) - } - - awaitCond(cluster.latestGossip.members.exists { member ⇒ member.address == secondAddress && member.status == MemberStatus.Joining }) - - awaitCond( - cluster.latestGossip.members.exists { member ⇒ member.address == secondAddress && member.status == MemberStatus.Up }, - 30.seconds.dilated) // waiting for the leader to move from JOINING -> UP (frequency set to 5 sec in config) - - cluster.latestGossip.members.size must be(2) - awaitCond(cluster.convergence.isDefined) - } - } -} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala new file mode 100644 index 0000000000..99116ecb25 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeJoinSpec.scala @@ -0,0 +1,57 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ + +object NodeJoinMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig( + debugConfig(on = false) + .withFallback(ConfigFactory.parseString(""" + akka.cluster { + leader-actions-interval = 5 s # increase the leader action task interval + } + """) + .withFallback(MultiNodeClusterSpec.clusterConfig))) +} + +class NodeJoinMultiJvmNode1 extends NodeJoinSpec +class NodeJoinMultiJvmNode2 extends NodeJoinSpec + +abstract class NodeJoinSpec + extends MultiNodeSpec(NodeJoinMultiJvmSpec) + with MultiNodeClusterSpec { + + import NodeJoinMultiJvmSpec._ + + override def initialParticipants = 2 + + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + + "A cluster node" must { + "join another cluster and get status JOINING - when sending a 'Join' command" taggedAs LongRunningTest in { + + runOn(first) { + startClusterNode() + } + + runOn(second) { + cluster.join(firstAddress) + } + + awaitCond(cluster.latestGossip.members.exists { member ⇒ member.address == secondAddress && member.status == MemberStatus.Joining }) + + testConductor.enter("after") + } + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala index 7c1037a624..da500323aa 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala @@ -40,7 +40,7 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec extends MultiNodeSpec(No "be moved to EXITING and then to REMOVED by the reaper" taggedAs LongRunningTest in { runOn(first) { - cluster.self + startClusterNode() } testConductor.enter("first-started") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala index 3fe9e220f6..189cb4c9c6 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingSpec.scala @@ -20,8 +20,8 @@ object NodeLeavingAndExitingMultiJvmSpec extends MultiNodeConfig { debugConfig(on = false) .withFallback(ConfigFactory.parseString(""" akka.cluster { - leader-actions-frequency = 5 s # increase the leader action task frequency to make sure we get a chance to test the LEAVING state - unreachable-nodes-reaper-frequency = 30 s # turn "off" reaping to unreachable node set + leader-actions-interval = 5 s # increase the leader action task frequency to make sure we get a chance to test the LEAVING state + unreachable-nodes-reaper-interval = 30 s # turn "off" reaping to unreachable node set } """) .withFallback(MultiNodeClusterSpec.clusterConfig))) @@ -31,8 +31,10 @@ class NodeLeavingAndExitingMultiJvmNode1 extends NodeLeavingAndExitingSpec class NodeLeavingAndExitingMultiJvmNode2 extends NodeLeavingAndExitingSpec class NodeLeavingAndExitingMultiJvmNode3 extends NodeLeavingAndExitingSpec -abstract class NodeLeavingAndExitingSpec extends MultiNodeSpec(NodeLeavingAndExitingMultiJvmSpec) - with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { +abstract class NodeLeavingAndExitingSpec + extends MultiNodeSpec(NodeLeavingAndExitingMultiJvmSpec) + with MultiNodeClusterSpec { + import NodeLeavingAndExitingMultiJvmSpec._ override def initialParticipants = 3 @@ -46,7 +48,7 @@ abstract class NodeLeavingAndExitingSpec extends MultiNodeSpec(NodeLeavingAndExi "be moved to EXITING by the leader" taggedAs LongRunningTest in { runOn(first) { - cluster.self + startClusterNode() } testConductor.enter("first-started") @@ -64,7 +66,7 @@ abstract class NodeLeavingAndExitingSpec extends MultiNodeSpec(NodeLeavingAndExi runOn(first, third) { // 1. Verify that 'second' node is set to LEAVING - // We have set the 'leader-actions-frequency' to 5 seconds to make sure that we get a + // We have set the 'leader-actions-interval' to 5 seconds to make sure that we get a // chance to test the LEAVING state before the leader moves the node to EXITING awaitCond(cluster.latestGossip.members.exists(_.status == MemberStatus.Leaving)) // wait on LEAVING val hasLeft = cluster.latestGossip.members.find(_.status == MemberStatus.Leaving) // verify node that left diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala index 300afdea20..ad445b4c42 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingSpec.scala @@ -18,8 +18,8 @@ object NodeLeavingMultiJvmSpec extends MultiNodeConfig { commonConfig( debugConfig(on = false) .withFallback(ConfigFactory.parseString(""" - akka.cluster.leader-actions-frequency = 5 s - akka.cluster.unreachable-nodes-reaper-frequency = 30 s # turn "off" reaping to unreachable node set + akka.cluster.leader-actions-interval = 5 s + akka.cluster.unreachable-nodes-reaper-interval = 30 s # turn "off" reaping to unreachable node set """)) .withFallback(MultiNodeClusterSpec.clusterConfig)) } @@ -43,7 +43,7 @@ abstract class NodeLeavingSpec extends MultiNodeSpec(NodeLeavingMultiJvmSpec) "be marked as LEAVING in the converged membership table" taggedAs LongRunningTest in { runOn(first) { - cluster.self + startClusterNode() } testConductor.enter("first-started") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala index fecb53c898..369dcf56ad 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala @@ -22,7 +22,11 @@ class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec -abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { +abstract class NodeMembershipSpec + extends MultiNodeSpec(NodeMembershipMultiJvmSpec) + with MultiNodeClusterSpec + with ImplicitSender with BeforeAndAfter { + import NodeMembershipMultiJvmSpec._ override def initialParticipants = 3 @@ -41,7 +45,7 @@ abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSp // make sure that the node-to-join is started before other join runOn(first) { - cluster.self + startClusterNode() } testConductor.enter("first-started") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala index c0c12f4582..a9a5ee3233 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeShutdownSpec.scala @@ -42,7 +42,7 @@ abstract class NodeShutdownSpec extends MultiNodeSpec(NodeShutdownMultiJvmSpec) "not be singleton cluster when joined" taggedAs LongRunningTest in { // make sure that the node-to-join is started before other join runOn(first) { - cluster.self + startClusterNode() } testConductor.enter("first-started") @@ -63,8 +63,6 @@ abstract class NodeShutdownSpec extends MultiNodeSpec(NodeShutdownMultiJvmSpec) cluster.isSingletonCluster must be(true) assertLeader(first) } - } } - } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala new file mode 100644 index 0000000000..7931ce48f1 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeUpSpec.scala @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ +import akka.util.duration._ + +object NodeUpMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) +} + +class NodeUpMultiJvmNode1 extends NodeUpSpec +class NodeUpMultiJvmNode2 extends NodeUpSpec + +abstract class NodeUpSpec + extends MultiNodeSpec(NodeUpMultiJvmSpec) + with MultiNodeClusterSpec { + + import NodeUpMultiJvmSpec._ + + override def initialParticipants = 2 + + lazy val firstAddress = node(first).address + lazy val secondAddress = node(second).address + + "A cluster node that is joining another cluster" must { + "be moved to UP by the leader after a convergence" taggedAs LongRunningTest in { + + runOn(first) { + startClusterNode() + } + + runOn(second) { + cluster.join(firstAddress) + } + + awaitUpConvergence(numberOfMembers = 2) + + testConductor.enter("after") + } + } +} diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala index 45b0a35521..6b2ff1962c 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterConfigSpec.scala @@ -20,9 +20,9 @@ class ClusterConfigSpec extends AkkaSpec { FailureDetectorMaxSampleSize must be(1000) NodeToJoin must be(None) PeriodicTasksInitialDelay must be(1 seconds) - GossipFrequency must be(1 second) - LeaderActionsFrequency must be(1 second) - UnreachableNodesReaperFrequency must be(1 second) + GossipInterval must be(1 second) + LeaderActionsInterval must be(1 second) + UnreachableNodesReaperInterval must be(1 second) NrOfGossipDaemons must be(4) NrOfDeputyNodes must be(3) AutoDown must be(true) From 0a011ee50ea7bd235b4c612968fad163f4f9c6b3 Mon Sep 17 00:00:00 2001 From: Roland Date: Mon, 4 Jun 2012 23:35:52 +0200 Subject: [PATCH 13/13] =?UTF-8?q?fix=20a=20few=20doubled=20the=E2=80=99s?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- akka-docs/java/fault-tolerance-sample.rst | 2 +- akka-docs/java/logging.rst | 2 +- akka-docs/java/untyped-actors.rst | 2 +- akka-docs/scala/actors.rst | 2 +- akka-docs/scala/fault-tolerance-sample.rst | 2 +- akka-docs/scala/logging.rst | 2 +- akka-docs/scala/testing.rst | 8 ++++---- 7 files changed, 10 insertions(+), 10 deletions(-) diff --git a/akka-docs/java/fault-tolerance-sample.rst b/akka-docs/java/fault-tolerance-sample.rst index cb7e1e774d..749cf7ef95 100644 --- a/akka-docs/java/fault-tolerance-sample.rst +++ b/akka-docs/java/fault-tolerance-sample.rst @@ -43,7 +43,7 @@ Step Description 9, 10, 11 and tells the ``Counter`` that there is no ``Storage``. 12 The ``CounterService`` schedules a ``Reconnect`` message to itself. 13, 14 When it receives the ``Reconnect`` message it creates a new ``Storage`` ... -15, 16 and tells the the ``Counter`` to use the new ``Storage`` +15, 16 and tells the ``Counter`` to use the new ``Storage`` =========== ================================================================================== Full Source Code of the Fault Tolerance Sample (Java) diff --git a/akka-docs/java/logging.rst b/akka-docs/java/logging.rst index 0f6f4479e5..03de58de5b 100644 --- a/akka-docs/java/logging.rst +++ b/akka-docs/java/logging.rst @@ -211,7 +211,7 @@ the first case and ``LoggerFactory.getLogger(String s)`` in the second). .. note:: - Beware that the the actor system’s name is appended to a :class:`String` log + Beware that the actor system’s name is appended to a :class:`String` log source if the LoggingAdapter was created giving an :class:`ActorSystem` to the factory. If this is not intended, give a :class:`LoggingBus` instead as shown below: diff --git a/akka-docs/java/untyped-actors.rst b/akka-docs/java/untyped-actors.rst index 31a0df9674..ac911fd216 100644 --- a/akka-docs/java/untyped-actors.rst +++ b/akka-docs/java/untyped-actors.rst @@ -586,7 +586,7 @@ What happens to the Message --------------------------- If an exception is thrown while a message is being processed (so taken of his -mailbox and handed over the the receive), then this message will be lost. It is +mailbox and handed over to the receive), then this message will be lost. It is important to understand that it is not put back on the mailbox. So if you want to retry processing of a message, you need to deal with it yourself by catching the exception and retry your flow. Make sure that you put a bound on the number diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index 4a556cf6c2..9b2cb9a7e5 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -651,7 +651,7 @@ What happens to the Message --------------------------- If an exception is thrown while a message is being processed (so taken of his -mailbox and handed over the the receive), then this message will be lost. It is +mailbox and handed over to the receive), then this message will be lost. It is important to understand that it is not put back on the mailbox. So if you want to retry processing of a message, you need to deal with it yourself by catching the exception and retry your flow. Make sure that you put a bound on the number diff --git a/akka-docs/scala/fault-tolerance-sample.rst b/akka-docs/scala/fault-tolerance-sample.rst index 56ac838b1f..12621e968b 100644 --- a/akka-docs/scala/fault-tolerance-sample.rst +++ b/akka-docs/scala/fault-tolerance-sample.rst @@ -45,7 +45,7 @@ Step Description 9, 10, 11 and tells the ``Counter`` that there is no ``Storage``. 12 The ``CounterService`` schedules a ``Reconnect`` message to itself. 13, 14 When it receives the ``Reconnect`` message it creates a new ``Storage`` ... -15, 16 and tells the the ``Counter`` to use the new ``Storage`` +15, 16 and tells the ``Counter`` to use the new ``Storage`` =========== ================================================================================== Full Source Code of the Fault Tolerance Sample (Scala) diff --git a/akka-docs/scala/logging.rst b/akka-docs/scala/logging.rst index 66cc6ae398..4ea96722e5 100644 --- a/akka-docs/scala/logging.rst +++ b/akka-docs/scala/logging.rst @@ -253,7 +253,7 @@ the first case and ``LoggerFactory.getLogger(s: String)`` in the second). .. note:: - Beware that the the actor system’s name is appended to a :class:`String` log + Beware that the actor system’s name is appended to a :class:`String` log source if the LoggingAdapter was created giving an :class:`ActorSystem` to the factory. If this is not intended, give a :class:`LoggingBus` instead as shown below: diff --git a/akka-docs/scala/testing.rst b/akka-docs/scala/testing.rst index d19a1ab753..0835db18e7 100644 --- a/akka-docs/scala/testing.rst +++ b/akka-docs/scala/testing.rst @@ -194,10 +194,10 @@ is a whole set of examination methods, e.g. receiving all consecutive messages matching certain criteria, receiving a whole sequence of fixed messages or classes, receiving nothing for some time, etc. -The ActorSystem passed in to the constructor of TestKit is accessible with -the the :obj:`system` member. -Remember to shut down the actor system after the test is finished (also in case -of failure) so that all actors—including the test actor—are stopped. +The ActorSystem passed in to the constructor of TestKit is accessible via the +:obj:`system` member. Remember to shut down the actor system after the test is +finished (also in case of failure) so that all actors—including the test +actor—are stopped. Built-In Assertions -------------------