diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala index 783560ec09..8e3fb04ec6 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeMembershipSpec.scala @@ -3,131 +3,122 @@ */ package akka.cluster -import akka.actor.ActorSystem -import akka.actor.ActorSystemImpl -import akka.remote.RemoteActorRefProvider -import akka.testkit.ImplicitSender -import akka.testkit.LongRunningTest import com.typesafe.config.ConfigFactory +import org.scalatest.BeforeAndAfter +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit._ -class NodeMembershipSpec extends ClusterSpec with ImplicitSender { - val portPrefix = 7 +object NodeMembershipMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") - var node0: Cluster = _ - var node1: Cluster = _ - var node2: Cluster = _ - - var system0: ActorSystemImpl = _ - var system1: ActorSystemImpl = _ - var system2: ActorSystemImpl = _ - - try { - "A set of connected cluster systems" must { - "(when two systems) start gossiping to each other so that both systems gets the same gossip info" taggedAs LongRunningTest in { - - // ======= NODE 0 ======== - system0 = ActorSystem("system0", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port = %d550 - }""".format(portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - val remote0 = system0.provider.asInstanceOf[RemoteActorRefProvider] - node0 = Cluster(system0) - - // ======= NODE 1 ======== - system1 = ActorSystem("system1", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port = %d551 - cluster.node-to-join = "akka://system0@localhost:%d550" - }""".format(portPrefix, portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider] - node1 = Cluster(system1) - - // check cluster convergence - awaitConvergence(node0 :: node1 :: Nil) - - val members0 = node0.latestGossip.members.toArray - members0.size must be(2) - members0(0).address.port.get must be(550.withPortPrefix) - members0(0).status must be(MemberStatus.Up) - members0(1).address.port.get must be(551.withPortPrefix) - members0(1).status must be(MemberStatus.Up) - - val members1 = node1.latestGossip.members.toArray - members1.size must be(2) - members1(0).address.port.get must be(550.withPortPrefix) - members1(0).status must be(MemberStatus.Up) - members1(1).address.port.get must be(551.withPortPrefix) - members1(1).status must be(MemberStatus.Up) - } - - "(when three systems) start gossiping to each other so that both systems gets the same gossip info" taggedAs LongRunningTest ignore { - - // ======= NODE 2 ======== - system2 = ActorSystem("system2", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port = %d552 - cluster.node-to-join = "akka://system0@localhost:%d550" - }""".format(portPrefix, portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider] - node2 = Cluster(system2) - - awaitConvergence(node0 :: node1 :: node2 :: Nil) - - val members0 = node0.latestGossip.members.toArray - val version = node0.latestGossip.version - members0.size must be(3) - members0(0).address.port.get must be(550.withPortPrefix) - members0(0).status must be(MemberStatus.Up) - members0(1).address.port.get must be(551.withPortPrefix) - members0(1).status must be(MemberStatus.Up) - members0(2).address.port.get must be(552.withPortPrefix) - members0(2).status must be(MemberStatus.Up) - - val members1 = node1.latestGossip.members.toArray - members1.size must be(3) - members1(0).address.port.get must be(550.withPortPrefix) - members1(0).status must be(MemberStatus.Up) - members1(1).address.port.get must be(551.withPortPrefix) - members1(1).status must be(MemberStatus.Up) - members1(2).address.port.get must be(552.withPortPrefix) - members1(2).status must be(MemberStatus.Up) - - val members2 = node2.latestGossip.members.toArray - members2.size must be(3) - members2(0).address.port.get must be(550.withPortPrefix) - members2(0).status must be(MemberStatus.Up) - members2(1).address.port.get must be(551.withPortPrefix) - members2(1).status must be(MemberStatus.Up) - members2(2).address.port.get must be(552.withPortPrefix) - members2(2).status must be(MemberStatus.Up) - } + commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" + akka.cluster { + gossip-frequency = 200 ms + leader-actions-frequency = 200 ms + periodic-tasks-initial-delay = 300 ms + # FIXME get rid of this hardcoded host:port + node-to-join = "akka://MultiNodeSpec@localhost:2602" } - } catch { - case e: Exception ⇒ - e.printStackTrace - fail(e.toString) - } + """))) - override def atTermination() { - if (node0 ne null) node0.shutdown() - if (system0 ne null) system0.shutdown() + nodeConfig(first, ConfigFactory.parseString(""" + # FIXME get rid of this hardcoded port + akka.remote.netty.port=2602 + """)) + +} + +class NodeMembershipMultiJvmNode1 extends NodeMembershipSpec { + override var node: Cluster = _ +} +class NodeMembershipMultiJvmNode2 extends NodeMembershipSpec { + override var node: Cluster = _ +} +class NodeMembershipMultiJvmNode3 extends NodeMembershipSpec { + override var node: Cluster = _ +} + +abstract class NodeMembershipSpec extends MultiNodeSpec(NodeMembershipMultiJvmSpec) with ImplicitSender with BeforeAndAfter { + import NodeMembershipMultiJvmSpec._ + + override def initialParticipants = 3 + + var node: Cluster + + after { + testConductor.enter("after") + } + + "A set of connected cluster systems" must { + + val firstAddress = testConductor.getAddressFor(first).await + val secondAddress = testConductor.getAddressFor(second).await + val thirdAddress = testConductor.getAddressFor(third).await + + "(when two systems) start gossiping to each other so that both systems gets the same gossip info" in { + + def assertMembers: Unit = { + val members = node.latestGossip.members.toIndexedSeq + members.size must be(2) + members(0).address must be(firstAddress) + members(1).address must be(secondAddress) + awaitCond { + node.latestGossip.members.forall(_.status == MemberStatus.Up) + } + } + + runOn(first) { + node = Cluster(system) + awaitCond(node.latestGossip.members.size == 2) + assertMembers + node.convergence.isDefined + } + + runOn(second) { + node = Cluster(system) + awaitCond(node.latestGossip.members.size == 2) + assertMembers + node.convergence.isDefined + } + + } + + "(when three systems) start gossiping to each other so that both systems gets the same gossip info" in { + + def assertMembers: Unit = { + val members = node.latestGossip.members.toIndexedSeq + members.size must be(3) + members(0).address must be(firstAddress) + members(1).address must be(secondAddress) + members(2).address must be(thirdAddress) + awaitCond { + node.latestGossip.members.forall(_.status == MemberStatus.Up) + } + } + + runOn(third) { + node = Cluster(system) + awaitCond(node.latestGossip.members.size == 3) + awaitCond(node.convergence.isDefined) + assertMembers + } + + runOn(first) { + awaitCond(node.latestGossip.members.size == 3) + assertMembers + node.convergence.isDefined + } + + runOn(second) { + awaitCond(node.latestGossip.members.size == 3) + assertMembers + node.convergence.isDefined + } + + } + } - if (node1 ne null) node1.shutdown() - if (system1 ne null) system1.shutdown() - - if (node2 ne null) node2.shutdown() - if (system2 ne null) system2.shutdown() - } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala index 694d4ac57d..3a5eeb846d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeStartupSpec.scala @@ -13,7 +13,13 @@ object NodeStartupMultiJvmSpec extends MultiNodeConfig { val first = role("first") val second = role("second") - commonConfig(debugConfig(on = false)) + commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" + akka.cluster { + gossip-frequency = 200 ms + leader-actions-frequency = 200 ms + periodic-tasks-initial-delay = 300 ms + } + """))) nodeConfig(first, ConfigFactory.parseString(""" # FIXME get rid of this hardcoded port @@ -27,29 +33,33 @@ object NodeStartupMultiJvmSpec extends MultiNodeConfig { } -class NodeStartupMultiJvmNode1 extends NodeStartupSpec -class NodeStartupMultiJvmNode2 extends NodeStartupSpec +class NodeStartupMultiJvmNode1 extends NodeStartupSpec { + override var node: Cluster = _ +} +class NodeStartupMultiJvmNode2 extends NodeStartupSpec { + override var node: Cluster = _ +} -class NodeStartupSpec extends MultiNodeSpec(NodeStartupMultiJvmSpec) with ImplicitSender with BeforeAndAfter { +abstract class NodeStartupSpec extends MultiNodeSpec(NodeStartupMultiJvmSpec) with ImplicitSender with BeforeAndAfter { import NodeStartupMultiJvmSpec._ override def initialParticipants = 2 - var firstNode: Cluster = _ + var node: Cluster after { testConductor.enter("after") } runOn(first) { - firstNode = Cluster(system) + node = Cluster(system) } "A first cluster node with a 'node-to-join' config set to empty string (singleton cluster)" must { "be a singleton cluster when started up" in { runOn(first) { - awaitCond(firstNode.isSingletonCluster) + awaitCond(node.isSingletonCluster) // FIXME #2117 singletonCluster should reach convergence //awaitCond(firstNode.convergence.isDefined) } @@ -57,7 +67,7 @@ class NodeStartupSpec extends MultiNodeSpec(NodeStartupMultiJvmSpec) with Implic "be in 'Joining' phase when started up" in { runOn(first) { - val members = firstNode.latestGossip.members + val members = node.latestGossip.members members.size must be(1) val firstAddress = testConductor.getAddressFor(first).await val joiningMember = members find (_.address == firstAddress) @@ -69,21 +79,25 @@ class NodeStartupSpec extends MultiNodeSpec(NodeStartupMultiJvmSpec) with Implic "A second cluster node with a 'node-to-join' config defined" must { "join the other node cluster when sending a Join command" in { + val secondAddress = testConductor.getAddressFor(second).await + + def awaitSecondUp = awaitCond { + node.latestGossip.members.exists { member ⇒ + member.address == secondAddress && member.status == MemberStatus.Up + } + } + runOn(second) { // start cluster on second node, and join - val secondNode = Cluster(system) - awaitCond(secondNode.convergence.isDefined) + node = Cluster(system) + awaitSecondUp + node.convergence.isDefined } runOn(first) { - val secondAddress = testConductor.getAddressFor(second).await - awaitCond { - firstNode.latestGossip.members.exists { member ⇒ - member.address == secondAddress && member.status == MemberStatus.Up - } - } - firstNode.latestGossip.members.size must be(2) - awaitCond(firstNode.convergence.isDefined) + awaitSecondUp + node.latestGossip.members.size must be(2) + node.convergence.isDefined } } }