From 8d114f5da5e938e0967a0aeee1201b9ff3d92f8d Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 24 May 2012 16:31:43 +0200 Subject: [PATCH 1/4] Move JoinTwoClustersSpec to multi-jvm. See #2111 --- .../scala/akka/cluster/JoinTwoClustersSpec.scala | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) rename akka-cluster/src/{test => multi-jvm}/scala/akka/cluster/JoinTwoClustersSpec.scala (95%) diff --git a/akka-cluster/src/test/scala/akka/cluster/JoinTwoClustersSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala similarity index 95% rename from akka-cluster/src/test/scala/akka/cluster/JoinTwoClustersSpec.scala rename to akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala index ff12cb6c60..1efd356698 100644 --- a/akka-cluster/src/test/scala/akka/cluster/JoinTwoClustersSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala @@ -4,15 +4,13 @@ package akka.cluster -import akka.testkit._ -import akka.dispatch._ -import akka.actor._ -import akka.remote._ -import akka.util.duration._ - -import com.typesafe.config._ - -import java.net.InetSocketAddress +import akka.actor.ActorSystem +import akka.actor.ActorSystemImpl +import akka.testkit.ImplicitSender +import akka.testkit.LongRunningTest +import akka.testkit.duration2TestDuration +import akka.util.duration.intToDurationInt +import com.typesafe.config.ConfigFactory class JoinTwoClustersSpec extends ClusterSpec("akka.cluster.failure-detector.threshold = 5") with ImplicitSender { val portPrefix = 3 From db16b6c4b38bb80c0b918b52ed082261d9812b3b Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 24 May 2012 17:08:28 +0200 Subject: [PATCH 2/4] Port JoinTwoClustersSpec to MultiNodeSpec. See #2111 --- .../akka/cluster/JoinTwoClustersSpec.scala | 235 +++++++----------- 1 file changed, 87 insertions(+), 148 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala index 1efd356698..9c59beb70e 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala @@ -4,174 +4,113 @@ package akka.cluster -import akka.actor.ActorSystem -import akka.actor.ActorSystemImpl -import akka.testkit.ImplicitSender -import akka.testkit.LongRunningTest -import akka.testkit.duration2TestDuration -import akka.util.duration.intToDurationInt +import org.scalatest.BeforeAndAfter import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit.ImplicitSender -class JoinTwoClustersSpec extends ClusterSpec("akka.cluster.failure-detector.threshold = 5") with ImplicitSender { - val portPrefix = 3 +object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig { + val a1 = role("a1") + val a2 = role("a2") + val b1 = role("b1") + val b2 = role("b2") + val c1 = role("c1") + val c2 = role("c2") - var node1: Cluster = _ - var node2: Cluster = _ - var node3: Cluster = _ - var node4: Cluster = _ - var node5: Cluster = _ - var node6: Cluster = _ + commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" + akka.cluster { + gossip-frequency = 200 ms + leader-actions-frequency = 200 ms + periodic-tasks-initial-delay = 300 ms + } + """))) - var system1: ActorSystemImpl = _ - var system2: ActorSystemImpl = _ - var system3: ActorSystemImpl = _ - var system4: ActorSystemImpl = _ - var system5: ActorSystemImpl = _ - var system6: ActorSystemImpl = _ +} - try { - "Three different clusters (A, B and C)" must { +class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode2 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode3 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec +class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec - // ======= NODE 1 ======== - system1 = ActorSystem("system1", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port = %d551 - }""".format(portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - node1 = Cluster(system1) +abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) with ImplicitSender with BeforeAndAfter { + import JoinTwoClustersMultiJvmSpec._ - // ======= NODE 2 ======== - system2 = ActorSystem("system2", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port = %d552 - cluster.node-to-join = "akka://system1@localhost:%d551" - }""".format(portPrefix, portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - node2 = Cluster(system2) + override def initialParticipants = 6 - // ======= NODE 3 ======== - system3 = ActorSystem("system3", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port = %d553 - }""".format(portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - node3 = Cluster(system3) + def node(): Cluster = Cluster(system) - // ======= NODE 4 ======== - system4 = ActorSystem("system4", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port = %d554 - cluster.node-to-join = "akka://system3@localhost:%d553" - }""".format(portPrefix, portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - node4 = Cluster(system4) + after { + testConductor.enter("after") + } - // ======= NODE 5 ======== - system5 = ActorSystem("system5", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port = %d555 - }""".format(portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - node5 = Cluster(system5) + val a1Address = testConductor.getAddressFor(a1).await + val b1Address = testConductor.getAddressFor(b1).await + val c1Address = testConductor.getAddressFor(c1).await - // ======= NODE 6 ======== - system6 = ActorSystem("system6", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port = %d556 - cluster.node-to-join = "akka://system5@localhost:%d555" - }""".format(portPrefix, portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - node6 = Cluster(system6) + def awaitUpConvergence(numberOfMembers: Int): Unit = { + awaitCond(node().latestGossip.members.size == numberOfMembers) + awaitCond(node().latestGossip.members.forall(_.status == MemberStatus.Up)) + awaitCond(node().convergence.isDefined) + } - "be able to 'elect' a single leader after joining (A -> B)" taggedAs LongRunningTest in { + "Three different clusters (A, B and C)" must { - println("Give the system time to converge...") - awaitConvergence(node1 :: node2 :: node3 :: node4 :: node5 :: node6 :: Nil) + "be able to 'elect' a single leader after joining (A -> B)" in { - // check leader - node1.isLeader must be(true) - node2.isLeader must be(false) - node3.isLeader must be(true) - node4.isLeader must be(false) - node5.isLeader must be(true) - node6.isLeader must be(false) - - // join - node4.join(node1.remoteAddress) - //node1.scheduleNodeJoin(node4.remoteAddress) - - println("Give the system time to converge...") - Thread.sleep(10.seconds.dilated.toMillis) - awaitConvergence(node1 :: node2 :: node3 :: node4 :: node5 :: node6 :: Nil) - - // check leader - node1.isLeader must be(true) - node2.isLeader must be(false) - node3.isLeader must be(false) - node4.isLeader must be(false) - node5.isLeader must be(true) - node6.isLeader must be(false) + runOn(a1, a2) { + node().join(a1Address) + } + runOn(b1, b2) { + node().join(b1Address) + } + runOn(c1, c2) { + node().join(c1Address) } - "be able to 'elect' a single leader after joining (C -> A + B)" taggedAs LongRunningTest in { - // join - node4.join(node5.remoteAddress) - //node5.scheduleNodeJoin(node4.remoteAddress) + awaitUpConvergence(numberOfMembers = 2) - println("Give the system time to converge...") - Thread.sleep(10.seconds.dilated.toMillis) - awaitConvergence(node1 :: node2 :: node3 :: node4 :: node5 :: node6 :: Nil) + runOn(a1, b1, c1) { + node().isLeader must be(true) + } + runOn(a2, b2, c2) { + node().isLeader must be(false) + } - // check leader - node1.isLeader must be(true) - node2.isLeader must be(false) - node3.isLeader must be(false) - node4.isLeader must be(false) - node5.isLeader must be(false) - node6.isLeader must be(false) + runOn(b2) { + node().join(a1Address) + } + + runOn(a1, a2, b1, b2) { + awaitUpConvergence(numberOfMembers = 4) + } + + runOn(a1, c1) { + node().isLeader must be(true) + } + runOn(a2, b1, b2, c2) { + node().isLeader must be(false) + } + + } + + "be able to 'elect' a single leader after joining (C -> A + B)" in { + + runOn(b2) { + node().join(c1Address) + } + + awaitUpConvergence(numberOfMembers = 6) + + runOn(a1) { + node().isLeader must be(true) + } + runOn(a2, b1, b2, c1, c2) { + node().isLeader must be(false) } } - } catch { - case e: Exception ⇒ - e.printStackTrace - fail(e.toString) } - override def atTermination() { - if (node1 ne null) node1.shutdown() - if (system1 ne null) system1.shutdown() - - if (node2 ne null) node2.shutdown() - if (system2 ne null) system2.shutdown() - - if (node3 ne null) node3.shutdown() - if (system3 ne null) system3.shutdown() - - if (node4 ne null) node4.shutdown() - if (system4 ne null) system4.shutdown() - - if (node5 ne null) node5.shutdown() - if (system5 ne null) system5.shutdown() - - if (node6 ne null) node6.shutdown() - if (system6 ne null) system6.shutdown() - } } From 0d51fb2ed5a76be97f679b8a5ee2fac7a0d8dc25 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 24 May 2012 17:36:00 +0200 Subject: [PATCH 3/4] Use ifNode for asserts. See #2111 --- .../akka/cluster/JoinTwoClustersSpec.scala | 21 +++---------------- 1 file changed, 3 insertions(+), 18 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala index 9c59beb70e..b5e764ea23 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala @@ -72,12 +72,7 @@ abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvm awaitUpConvergence(numberOfMembers = 2) - runOn(a1, b1, c1) { - node().isLeader must be(true) - } - runOn(a2, b2, c2) { - node().isLeader must be(false) - } + node().isLeader must be(ifNode(a1, b1, c1)(true)(false)) runOn(b2) { node().join(a1Address) @@ -87,12 +82,7 @@ abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvm awaitUpConvergence(numberOfMembers = 4) } - runOn(a1, c1) { - node().isLeader must be(true) - } - runOn(a2, b1, b2, c2) { - node().isLeader must be(false) - } + node().isLeader must be(ifNode(a1, c1)(true)(false)) } @@ -104,12 +94,7 @@ abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvm awaitUpConvergence(numberOfMembers = 6) - runOn(a1) { - node().isLeader must be(true) - } - runOn(a2, b1, b2, c1, c2) { - node().isLeader must be(false) - } + node().isLeader must be(ifNode(a1)(true)(false)) } } From 0ac7f967dd6d6e70ee713127abbc349e4f66586a Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 25 May 2012 08:32:42 +0200 Subject: [PATCH 4/4] Change node naming. See 2111 --- .../akka/cluster/JoinTwoClustersSpec.scala | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala index b5e764ea23..9ed003944f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/JoinTwoClustersSpec.scala @@ -40,20 +40,20 @@ abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvm override def initialParticipants = 6 - def node(): Cluster = Cluster(system) + def cluster: Cluster = Cluster(system) after { testConductor.enter("after") } - val a1Address = testConductor.getAddressFor(a1).await - val b1Address = testConductor.getAddressFor(b1).await - val c1Address = testConductor.getAddressFor(c1).await + val a1Address = node(a1).address + val b1Address = node(b1).address + val c1Address = node(c1).address def awaitUpConvergence(numberOfMembers: Int): Unit = { - awaitCond(node().latestGossip.members.size == numberOfMembers) - awaitCond(node().latestGossip.members.forall(_.status == MemberStatus.Up)) - awaitCond(node().convergence.isDefined) + awaitCond(cluster.latestGossip.members.size == numberOfMembers) + awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up)) + awaitCond(cluster.convergence.isDefined) } "Three different clusters (A, B and C)" must { @@ -61,40 +61,40 @@ abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvm "be able to 'elect' a single leader after joining (A -> B)" in { runOn(a1, a2) { - node().join(a1Address) + cluster.join(a1Address) } runOn(b1, b2) { - node().join(b1Address) + cluster.join(b1Address) } runOn(c1, c2) { - node().join(c1Address) + cluster.join(c1Address) } awaitUpConvergence(numberOfMembers = 2) - node().isLeader must be(ifNode(a1, b1, c1)(true)(false)) + cluster.isLeader must be(ifNode(a1, b1, c1)(true)(false)) runOn(b2) { - node().join(a1Address) + cluster.join(a1Address) } runOn(a1, a2, b1, b2) { awaitUpConvergence(numberOfMembers = 4) } - node().isLeader must be(ifNode(a1, c1)(true)(false)) + cluster.isLeader must be(ifNode(a1, c1)(true)(false)) } "be able to 'elect' a single leader after joining (C -> A + B)" in { runOn(b2) { - node().join(c1Address) + cluster.join(c1Address) } awaitUpConvergence(numberOfMembers = 6) - node().isLeader must be(ifNode(a1)(true)(false)) + cluster.isLeader must be(ifNode(a1)(true)(false)) } }