From ad4725aa708a11171493a74f347b4032a220f2a4 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 24 May 2012 13:57:04 +0200 Subject: [PATCH 1/5] Moved MembershipChangeListenerSpec to multi-jvm. See #2114 --- .../MembershipChangeListenerSpec.scala | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) rename akka-cluster/src/{test => multi-jvm}/scala/akka/cluster/MembershipChangeListenerSpec.scala (90%) diff --git a/akka-cluster/src/test/scala/akka/cluster/MembershipChangeListenerSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala similarity index 90% rename from akka-cluster/src/test/scala/akka/cluster/MembershipChangeListenerSpec.scala rename to akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala index 17a7c6ed7a..6ff97fe483 100644 --- a/akka-cluster/src/test/scala/akka/cluster/MembershipChangeListenerSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala @@ -3,19 +3,19 @@ */ package akka.cluster -import akka.testkit._ -import akka.dispatch._ -import akka.actor._ -import akka.remote._ -import akka.util.duration._ - -import java.net.InetSocketAddress -import java.util.concurrent.{ CountDownLatch, TimeUnit } - +import akka.actor.ActorSystem +import akka.actor.ActorSystemImpl +import akka.remote.RemoteActorRefProvider +import akka.testkit.ImplicitSender +import akka.testkit.LongRunningTest +import akka.testkit.duration2TestDuration +import akka.util.duration.intToDurationInt +import com.typesafe.config.ConfigFactory +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import scala.annotation.tailrec import scala.collection.immutable.SortedSet -import com.typesafe.config._ - class MembershipChangeListenerSpec extends ClusterSpec with ImplicitSender { val portPrefix = 6 From 7322ff3ef0801eaa6cc95126f29d20cc19bd1ab4 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 24 May 2012 14:46:35 +0200 Subject: [PATCH 2/5] Port MembershipChangeListenerSpec to MultiNodeSpec. See #2114 --- .../GossipMembershipMultiJvmSpec.scala | 134 ------------ .../MembershipChangeListenerSpec.scala | 204 ++++++++---------- 2 files changed, 88 insertions(+), 250 deletions(-) delete mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/GossipMembershipMultiJvmSpec.scala diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipMembershipMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipMembershipMultiJvmSpec.scala deleted file mode 100644 index c380d3e5eb..0000000000 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/GossipMembershipMultiJvmSpec.scala +++ /dev/null @@ -1,134 +0,0 @@ -// package akka.cluster - -// import akka.actor.Actor -// import akka.remote._ -// import akka.routing._ -// import akka.routing.Routing.Broadcast - -// object GossipMembershipMultiJvmSpec { -// val NrOfNodes = 4 -// class SomeActor extends Actor with Serializable { -// def receive = { -// case "hit" ⇒ sender ! system.nodename -// case "end" ⇒ self.stop() -// } -// } - -// import com.typesafe.config.ConfigFactory -// val commonConfig = ConfigFactory.parseString(""" -// akka { -// loglevel = "WARNING" -// cluster { -// seed-nodes = ["localhost:9991"] -// } -// remote.server.hostname = "localhost" -// }""") - -// val node1Config = ConfigFactory.parseString(""" -// akka { -// remote.server.port = "9991" -// cluster.nodename = "node1" -// }""") withFallback commonConfig - -// val node2Config = ConfigFactory.parseString(""" -// akka { -// remote.server.port = "9992" -// cluster.nodename = "node2" -// }""") withFallback commonConfig - -// val node3Config = ConfigFactory.parseString(""" -// akka { -// remote.server.port = "9993" -// cluster.nodename = "node3" -// }""") withFallback commonConfig - -// val node4Config = ConfigFactory.parseString(""" -// akka { -// remote.server.port = "9994" -// cluster.nodename = "node4" -// }""") withFallback commonConfig -// } - -// class GossipMembershipMultiJvmNode1 extends AkkaRemoteSpec(GossipMembershipMultiJvmSpec.node1Config) { -// import GossipMembershipMultiJvmSpec._ -// val nodes = NrOfNodes -// "A cluster" must { -// "allow new node to join and should reach convergence with new membership table" in { - -// barrier("setup") -// remote.start() - -// barrier("start") -// val actor = system.actorOf(Props[SomeActor]("service-hello") -// actor.isInstanceOf[RoutedActorRef] must be(true) - -// val connectionCount = NrOfNodes - 1 -// val iterationCount = 10 - -// var replies = Map( -// "node1" -> 0, -// "node2" -> 0, -// "node3" -> 0) - -// for (i ← 0 until iterationCount) { -// for (k ← 0 until connectionCount) { -// val nodeName = (actor ? "hit").as[String].getOrElse(fail("No id returned by actor")) -// replies = replies + (nodeName -> (replies(nodeName) + 1)) -// } -// } - -// barrier("broadcast-end") -// actor ! Broadcast("end") - -// barrier("end") -// replies.values foreach { _ must be > (0) } - -// barrier("done") -// } -// } -// } - -// class GossipMembershipMultiJvmNode2 extends AkkaRemoteSpec(GossipMembershipMultiJvmSpec.node2Config) { -// import GossipMembershipMultiJvmSpec._ -// val nodes = NrOfNodes -// "___" must { -// "___" in { -// barrier("setup") -// remote.start() -// barrier("start") -// barrier("broadcast-end") -// barrier("end") -// barrier("done") -// } -// } -// } - -// class GossipMembershipMultiJvmNode3 extends AkkaRemoteSpec(GossipMembershipMultiJvmSpec.node3Config) { -// import GossipMembershipMultiJvmSpec._ -// val nodes = NrOfNodes -// "___" must { -// "___" in { -// barrier("setup") -// remote.start() -// barrier("start") -// barrier("broadcast-end") -// barrier("end") -// barrier("done") -// } -// } -// } - -// class GossipMembershipMultiJvmNode4 extends AkkaRemoteSpec(GossipMembershipMultiJvmSpec.node4Config) { -// import GossipMembershipMultiJvmSpec._ -// val nodes = NrOfNodes -// "___" must { -// "___" in { -// barrier("setup") -// remote.start() -// barrier("start") -// barrier("broadcast-end") -// barrier("end") -// barrier("done") -// } -// } -// } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala index 6ff97fe483..e81c52ad36 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala @@ -3,127 +3,99 @@ */ package akka.cluster -import akka.actor.ActorSystem -import akka.actor.ActorSystemImpl -import akka.remote.RemoteActorRefProvider -import akka.testkit.ImplicitSender -import akka.testkit.LongRunningTest -import akka.testkit.duration2TestDuration -import akka.util.duration.intToDurationInt -import com.typesafe.config.ConfigFactory -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit -import scala.annotation.tailrec import scala.collection.immutable.SortedSet +import org.scalatest.BeforeAndAfter +import com.typesafe.config.ConfigFactory +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.testkit.ImplicitSender +import akka.testkit.TestLatch -class MembershipChangeListenerSpec extends ClusterSpec with ImplicitSender { - val portPrefix = 6 +object MembershipChangeListenerMultiJvmSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") - var node0: Cluster = _ - var node1: Cluster = _ - var node2: Cluster = _ - - var system0: ActorSystemImpl = _ - var system1: ActorSystemImpl = _ - var system2: ActorSystemImpl = _ - - try { - "A set of connected cluster systems" must { - "(when two systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in { - system0 = ActorSystem("system0", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port = %d550 - }""".format(portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - val remote0 = system0.provider.asInstanceOf[RemoteActorRefProvider] - node0 = Cluster(system0) - - system1 = ActorSystem("system1", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port=%d551 - cluster.node-to-join = "akka://system0@localhost:%d550" - }""".format(portPrefix, portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider] - node1 = Cluster(system1) - - val latch = new CountDownLatch(2) - - node0.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - latch.countDown() - } - }) - node1.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - latch.countDown() - } - }) - - latch.await(10.seconds.dilated.toMillis, TimeUnit.MILLISECONDS) - - Thread.sleep(10.seconds.dilated.toMillis) - - // check cluster convergence - node0.convergence must be('defined) - node1.convergence must be('defined) - } - - "(when three systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in { - - // ======= NODE 2 ======== - system2 = ActorSystem("system2", ConfigFactory - .parseString(""" - akka { - actor.provider = "akka.remote.RemoteActorRefProvider" - remote.netty.port=%d552 - cluster.node-to-join = "akka://system0@localhost:%d550" - }""".format(portPrefix, portPrefix)) - .withFallback(system.settings.config)) - .asInstanceOf[ActorSystemImpl] - val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider] - node2 = Cluster(system2) - - val latch = new CountDownLatch(3) - node0.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - latch.countDown() - } - }) - node1.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - latch.countDown() - } - }) - node2.registerListener(new MembershipChangeListener { - def notify(members: SortedSet[Member]) { - latch.countDown() - } - }) - - latch.await(30.seconds.dilated.toMillis, TimeUnit.MILLISECONDS) - } + commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" + akka.cluster { + gossip-frequency = 200 ms + leader-actions-frequency = 200 ms + periodic-tasks-initial-delay = 300 ms } - } catch { - case e: Exception ⇒ - e.printStackTrace - fail(e.toString) - } + """))) - override def atTermination() { - if (node0 ne null) node0.shutdown() - if (system0 ne null) system0.shutdown() + nodeConfig(first, ConfigFactory.parseString(""" + # FIXME get rid of this hardcoded port + akka.remote.netty.port=2603 + """)) - if (node1 ne null) node1.shutdown() - if (system1 ne null) system1.shutdown() + nodeConfig(second, ConfigFactory.parseString(""" + # FIXME get rid of this hardcoded host:port + akka.cluster.node-to-join = "akka://MultiNodeSpec@localhost:2603" + """)) + + nodeConfig(third, ConfigFactory.parseString(""" + # FIXME get rid of this hardcoded host:port + akka.cluster.node-to-join = "akka://MultiNodeSpec@localhost:2603" + """)) + +} + +class MembershipChangeListenerMultiJvmNode1 extends MembershipChangeListenerSpec +class MembershipChangeListenerMultiJvmNode2 extends MembershipChangeListenerSpec +class MembershipChangeListenerMultiJvmNode3 extends MembershipChangeListenerSpec + +abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChangeListenerMultiJvmSpec) with ImplicitSender with BeforeAndAfter { + import MembershipChangeListenerMultiJvmSpec._ + + override def initialParticipants = 3 + + var node: Cluster = _ + + after { + testConductor.enter("after") + } + + "A set of connected cluster systems" must { + + val firstAddress = testConductor.getAddressFor(first).await + val secondAddress = testConductor.getAddressFor(second).await + + "(when two systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" in { + + runOn(first, second) { + node = Cluster(system) + val latch = TestLatch() + node.registerListener(new MembershipChangeListener { + def notify(members: SortedSet[Member]) { + if (members.size == 2 && members.forall(_.status == MemberStatus.Up)) + latch.countDown() + } + }) + latch.await + node.convergence.isDefined must be(true) + } + + } + + "(when three systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" in { + + runOn(third) { + node = Cluster(system) + } + + // runOn all + val latch = TestLatch() + node.registerListener(new MembershipChangeListener { + def notify(members: SortedSet[Member]) { + if (members.size == 3 && members.forall(_.status == MemberStatus.Up)) + latch.countDown() + } + }) + latch.await + node.convergence.isDefined must be(true) + + } + } - if (node2 ne null) node2.shutdown() - if (system2 ne null) system2.shutdown() - } } From e63d5b26d1047c777645311118b46302538e23d9 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 24 May 2012 15:42:34 +0200 Subject: [PATCH 3/5] Remove node var. See #2114 --- .../cluster/MembershipChangeListenerSpec.scala | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala index e81c52ad36..3411228d5a 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala @@ -50,7 +50,7 @@ abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChan override def initialParticipants = 3 - var node: Cluster = _ + def node(): Cluster = Cluster(system) after { testConductor.enter("after") @@ -64,36 +64,31 @@ abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChan "(when two systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" in { runOn(first, second) { - node = Cluster(system) val latch = TestLatch() - node.registerListener(new MembershipChangeListener { + node().registerListener(new MembershipChangeListener { def notify(members: SortedSet[Member]) { if (members.size == 2 && members.forall(_.status == MemberStatus.Up)) latch.countDown() } }) latch.await - node.convergence.isDefined must be(true) + node().convergence.isDefined must be(true) } } "(when three systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" in { - runOn(third) { - node = Cluster(system) - } - // runOn all val latch = TestLatch() - node.registerListener(new MembershipChangeListener { + node().registerListener(new MembershipChangeListener { def notify(members: SortedSet[Member]) { if (members.size == 3 && members.forall(_.status == MemberStatus.Up)) latch.countDown() } }) latch.await - node.convergence.isDefined must be(true) + node().convergence.isDefined must be(true) } } From 60b85e7da6ad03adba4f36c65187af143ec1e285 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 24 May 2012 16:17:53 +0200 Subject: [PATCH 4/5] Remove hardcode host/port for node-to-join. See #2114 --- .../MembershipChangeListenerSpec.scala | 20 +++++-------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala index 3411228d5a..c648cdf631 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala @@ -24,21 +24,6 @@ object MembershipChangeListenerMultiJvmSpec extends MultiNodeConfig { } """))) - nodeConfig(first, ConfigFactory.parseString(""" - # FIXME get rid of this hardcoded port - akka.remote.netty.port=2603 - """)) - - nodeConfig(second, ConfigFactory.parseString(""" - # FIXME get rid of this hardcoded host:port - akka.cluster.node-to-join = "akka://MultiNodeSpec@localhost:2603" - """)) - - nodeConfig(third, ConfigFactory.parseString(""" - # FIXME get rid of this hardcoded host:port - akka.cluster.node-to-join = "akka://MultiNodeSpec@localhost:2603" - """)) - } class MembershipChangeListenerMultiJvmNode1 extends MembershipChangeListenerSpec @@ -64,6 +49,7 @@ abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChan "(when two systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" in { runOn(first, second) { + node().join(firstAddress) val latch = TestLatch() node().registerListener(new MembershipChangeListener { def notify(members: SortedSet[Member]) { @@ -79,6 +65,10 @@ abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChan "(when three systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" in { + runOn(third) { + node().join(firstAddress) + } + // runOn all val latch = TestLatch() node().registerListener(new MembershipChangeListener { From 8d1dea1750073ff68555158971fe67e2a37a1c50 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 25 May 2012 08:59:13 +0200 Subject: [PATCH 5/5] Change node naming. See 2114 --- .../MembershipChangeListenerSpec.scala | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala index c648cdf631..c5ae2f11e7 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MembershipChangeListenerSpec.scala @@ -35,7 +35,7 @@ abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChan override def initialParticipants = 3 - def node(): Cluster = Cluster(system) + def cluster: Cluster = Cluster(system) after { testConductor.enter("after") @@ -43,22 +43,22 @@ abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChan "A set of connected cluster systems" must { - val firstAddress = testConductor.getAddressFor(first).await - val secondAddress = testConductor.getAddressFor(second).await + val firstAddress = node(first).address + val secondAddress = node(second).address "(when two systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" in { runOn(first, second) { - node().join(firstAddress) + cluster.join(firstAddress) val latch = TestLatch() - node().registerListener(new MembershipChangeListener { + cluster.registerListener(new MembershipChangeListener { def notify(members: SortedSet[Member]) { if (members.size == 2 && members.forall(_.status == MemberStatus.Up)) latch.countDown() } }) latch.await - node().convergence.isDefined must be(true) + cluster.convergence.isDefined must be(true) } } @@ -66,19 +66,18 @@ abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChan "(when three systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" in { runOn(third) { - node().join(firstAddress) + cluster.join(firstAddress) } - // runOn all val latch = TestLatch() - node().registerListener(new MembershipChangeListener { + cluster.registerListener(new MembershipChangeListener { def notify(members: SortedSet[Member]) { if (members.size == 3 && members.forall(_.status == MemberStatus.Up)) latch.countDown() } }) latch.await - node().convergence.isDefined must be(true) + cluster.convergence.isDefined must be(true) } }