diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala index 9ed003944f..c15af5e651 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala @@ -18,13 +18,7 @@ object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig { val c1 = role("c1") val c2 = role("c2") - commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" - akka.cluster { - gossip-frequency = 200 ms - leader-actions-frequency = 200 ms - periodic-tasks-initial-delay = 300 ms - } - """))) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } @@ -35,13 +29,11 @@ class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec -abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) with ImplicitSender with BeforeAndAfter { +abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { import JoinTwoClustersMultiJvmSpec._ override def initialParticipants = 6 - def cluster: Cluster = Cluster(system) - after { testConductor.enter("after") } @@ -50,12 +42,6 @@ abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvm val b1Address = node(b1).address val c1Address = node(c1).address - def awaitUpConvergence(numberOfMembers: Int): Unit = { - awaitCond(cluster.latestGossip.members.size == numberOfMembers) - awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up)) - awaitCond(cluster.convergence.isDefined) - } - "Three different clusters (A, B and C)" must { "be able to 'elect' a single leader after joining (A -> B)" in { @@ -72,7 +58,9 @@ abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvm awaitUpConvergence(numberOfMembers = 2) - cluster.isLeader must be(ifNode(a1, b1, c1)(true)(false)) + assertLeader(a1, a2) + assertLeader(b1, b2) + assertLeader(c1, c2) runOn(b2) { cluster.join(a1Address) @@ -82,7 +70,8 @@ abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvm awaitUpConvergence(numberOfMembers = 4) } - cluster.isLeader must be(ifNode(a1, c1)(true)(false)) + assertLeader(a1, a2, b1, b2) + assertLeader(c1, c2) } @@ -94,7 +83,7 @@ abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvm awaitUpConvergence(numberOfMembers = 6) - cluster.isLeader must be(ifNode(a1)(true)(false)) + assertLeader(a1, a2, b1, b2, c1, c2) } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala index c5ae2f11e7..844d2803a1 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala @@ -16,13 +16,7 @@ object MembershipChangeListenerMultiJvmSpec extends MultiNodeConfig { val second = role("second") val third = role("third") - commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" - akka.cluster { - gossip-frequency = 200 ms - leader-actions-frequency = 200 ms - periodic-tasks-initial-delay = 300 ms - } - """))) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } @@ -30,13 +24,12 @@ class MembershipChangeListenerMultiJvmNode1 extends MembershipChangeListenerSpec class MembershipChangeListenerMultiJvmNode2 extends MembershipChangeListenerSpec class MembershipChangeListenerMultiJvmNode3 extends MembershipChangeListenerSpec -abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChangeListenerMultiJvmSpec) with ImplicitSender with BeforeAndAfter { +abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChangeListenerMultiJvmSpec) + with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { import MembershipChangeListenerMultiJvmSpec._ override def initialParticipants = 3 - def cluster: Cluster = Cluster(system) - after { testConductor.enter("after") } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala new file mode 100644 index 0000000000..873d819dbb --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -0,0 +1,63 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import akka.actor.Address +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeSpec +import akka.util.duration._ + +object MultiNodeClusterSpec { + def clusterConfig: Config = ConfigFactory.parseString(""" + akka.cluster { + gossip-frequency = 200 ms + leader-actions-frequency = 200 ms + periodic-tasks-initial-delay = 300 ms + } + akka.test { + single-expect-default = 5 s + } + """) +} + +trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒ + + def cluster: Cluster = Cluster(system) + + /** + * Assert that the member addresses match the expected addresses in the + * sort order used by the cluster. + */ + def assertMembers(gotMembers: Iterable[Member], expectedAddresses: Address*): Unit = { + import Member.addressOrdering + val members = gotMembers.toIndexedSeq + members.size must be(expectedAddresses.length) + expectedAddresses.sorted.zipWithIndex.foreach { case (a, i) ⇒ members(i).address must be(a) } + } + + /** + * Assert that the cluster has elected the correct leader + * out of all nodes in the cluster. First + * member in the cluster ring is expected leader. + */ + def assertLeader(nodesInCluster: RoleName*): Unit = if (nodesInCluster.contains(mySelf)) { + nodesInCluster.length must not be (0) + import Member.addressOrdering + val expectedLeader = nodesInCluster.map(role ⇒ (role, node(role).address)).sortBy(_._2).head._1 + cluster.isLeader must be(ifNode(expectedLeader)(true)(false)) + } + + /** + * Wait until the expected number of members has status Up + * and convergence has been reached. + */ + def awaitUpConvergence(numberOfMembers: Int): Unit = { + awaitCond(cluster.latestGossip.members.size == numberOfMembers) + awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up)) + awaitCond(cluster.convergence.isDefined, 10 seconds) + } + +} \ No newline at end of file diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala index a8af644fe0..b5dc5d4d42 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala @@ -14,13 +14,7 @@ object NodeMembershipMultiJvmSpec extends MultiNodeConfig { val second = role("second") val third = role("third") - commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" - akka.cluster { - gossip-frequency = 200 ms - leader-actions-frequency = 200 ms - periodic-tasks-initial-delay = 300 ms - } - """))) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } @@ -28,13 +22,11 @@ class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec -abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSpec) with ImplicitSender with BeforeAndAfter { +abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { import NodeMembershipMultiJvmSpec._ override def initialParticipants = 3 - def cluster: Cluster = Cluster(system) - after { testConductor.enter("after") } @@ -50,11 +42,7 @@ abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSp runOn(first, second) { cluster.join(firstAddress) awaitCond(cluster.latestGossip.members.size == 2) - val members = cluster.latestGossip.members.toIndexedSeq - members.size must be(2) - val sortedAddresses = IndexedSeq(firstAddress, secondAddress).sortBy(_.toString) - members(0).address must be(sortedAddresses(0)) - members(1).address must be(sortedAddresses(1)) + assertMembers(cluster.latestGossip.members, firstAddress, secondAddress) awaitCond { cluster.latestGossip.members.forall(_.status == MemberStatus.Up) } @@ -69,14 +57,8 @@ abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSp cluster.join(firstAddress) } - // runOn all awaitCond(cluster.latestGossip.members.size == 3) - val members = cluster.latestGossip.members.toIndexedSeq - members.size must be(3) - val sortedAddresses = IndexedSeq(firstAddress, secondAddress, thirdAddress).sortBy(_.toString) - members(0).address must be(sortedAddresses(0)) - members(1).address must be(sortedAddresses(1)) - members(2).address must be(sortedAddresses(2)) + assertMembers(cluster.latestGossip.members, firstAddress, secondAddress, thirdAddress) awaitCond { cluster.latestGossip.members.forall(_.status == MemberStatus.Up) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala index f2206f8d89..55a0b15f63 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala @@ -13,26 +13,18 @@ object NodeStartupMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") - commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" - akka.cluster { - gossip-frequency = 200 ms - leader-actions-frequency = 200 ms - periodic-tasks-initial-delay = 300 ms - } - """))) + commonConfig(debugConfig(on = false).withFallback(MultiNodeClusterSpec.clusterConfig)) } class NodeStartupMultiJvmNode1 extends NodeStartupSpec class NodeStartupMultiJvmNode2 extends NodeStartupSpec -abstract class NodeStartupSpec extends MultiNodeSpec(NodeStartupMultiJvmSpec) with ImplicitSender with BeforeAndAfter { +abstract class NodeStartupSpec extends MultiNodeSpec(NodeStartupMultiJvmSpec) with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter { import NodeStartupMultiJvmSpec._ override def initialParticipants = 2 - def cluster: Cluster = Cluster(system) - after { testConductor.enter("after") }