Merge pull request #473 from akka/wip-2114-MembershipChangeListenerSpec-patriknw

Port MembershipChangeListenerSpec to MultiNodeSpec. See #2114
This commit is contained in:
patriknw 2012-05-25 00:01:20 -07:00
commit 8a454358d5
3 changed files with 85 additions and 263 deletions

View file

@ -1,134 +0,0 @@
// package akka.cluster
// import akka.actor.Actor
// import akka.remote._
// import akka.routing._
// import akka.routing.Routing.Broadcast
// object GossipMembershipMultiJvmSpec {
// val NrOfNodes = 4
// class SomeActor extends Actor with Serializable {
// def receive = {
// case "hit" sender ! system.nodename
// case "end" self.stop()
// }
// }
// import com.typesafe.config.ConfigFactory
// val commonConfig = ConfigFactory.parseString("""
// akka {
// loglevel = "WARNING"
// cluster {
// seed-nodes = ["localhost:9991"]
// }
// remote.server.hostname = "localhost"
// }""")
// val node1Config = ConfigFactory.parseString("""
// akka {
// remote.server.port = "9991"
// cluster.nodename = "node1"
// }""") withFallback commonConfig
// val node2Config = ConfigFactory.parseString("""
// akka {
// remote.server.port = "9992"
// cluster.nodename = "node2"
// }""") withFallback commonConfig
// val node3Config = ConfigFactory.parseString("""
// akka {
// remote.server.port = "9993"
// cluster.nodename = "node3"
// }""") withFallback commonConfig
// val node4Config = ConfigFactory.parseString("""
// akka {
// remote.server.port = "9994"
// cluster.nodename = "node4"
// }""") withFallback commonConfig
// }
// class GossipMembershipMultiJvmNode1 extends AkkaRemoteSpec(GossipMembershipMultiJvmSpec.node1Config) {
// import GossipMembershipMultiJvmSpec._
// val nodes = NrOfNodes
// "A cluster" must {
// "allow new node to join and should reach convergence with new membership table" in {
// barrier("setup")
// remote.start()
// barrier("start")
// val actor = system.actorOf(Props[SomeActor]("service-hello")
// actor.isInstanceOf[RoutedActorRef] must be(true)
// val connectionCount = NrOfNodes - 1
// val iterationCount = 10
// var replies = Map(
// "node1" -> 0,
// "node2" -> 0,
// "node3" -> 0)
// for (i 0 until iterationCount) {
// for (k 0 until connectionCount) {
// val nodeName = (actor ? "hit").as[String].getOrElse(fail("No id returned by actor"))
// replies = replies + (nodeName -> (replies(nodeName) + 1))
// }
// }
// barrier("broadcast-end")
// actor ! Broadcast("end")
// barrier("end")
// replies.values foreach { _ must be > (0) }
// barrier("done")
// }
// }
// }
// class GossipMembershipMultiJvmNode2 extends AkkaRemoteSpec(GossipMembershipMultiJvmSpec.node2Config) {
// import GossipMembershipMultiJvmSpec._
// val nodes = NrOfNodes
// "___" must {
// "___" in {
// barrier("setup")
// remote.start()
// barrier("start")
// barrier("broadcast-end")
// barrier("end")
// barrier("done")
// }
// }
// }
// class GossipMembershipMultiJvmNode3 extends AkkaRemoteSpec(GossipMembershipMultiJvmSpec.node3Config) {
// import GossipMembershipMultiJvmSpec._
// val nodes = NrOfNodes
// "___" must {
// "___" in {
// barrier("setup")
// remote.start()
// barrier("start")
// barrier("broadcast-end")
// barrier("end")
// barrier("done")
// }
// }
// }
// class GossipMembershipMultiJvmNode4 extends AkkaRemoteSpec(GossipMembershipMultiJvmSpec.node4Config) {
// import GossipMembershipMultiJvmSpec._
// val nodes = NrOfNodes
// "___" must {
// "___" in {
// barrier("setup")
// remote.start()
// barrier("start")
// barrier("broadcast-end")
// barrier("end")
// barrier("done")
// }
// }
// }

View file

@ -0,0 +1,85 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import scala.collection.immutable.SortedSet
import org.scalatest.BeforeAndAfter
import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit.ImplicitSender
import akka.testkit.TestLatch
object MembershipChangeListenerMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString("""
akka.cluster {
gossip-frequency = 200 ms
leader-actions-frequency = 200 ms
periodic-tasks-initial-delay = 300 ms
}
""")))
}
class MembershipChangeListenerMultiJvmNode1 extends MembershipChangeListenerSpec
class MembershipChangeListenerMultiJvmNode2 extends MembershipChangeListenerSpec
class MembershipChangeListenerMultiJvmNode3 extends MembershipChangeListenerSpec
abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChangeListenerMultiJvmSpec) with ImplicitSender with BeforeAndAfter {
import MembershipChangeListenerMultiJvmSpec._
override def initialParticipants = 3
def cluster: Cluster = Cluster(system)
after {
testConductor.enter("after")
}
"A set of connected cluster systems" must {
val firstAddress = node(first).address
val secondAddress = node(second).address
"(when two systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" in {
runOn(first, second) {
cluster.join(firstAddress)
val latch = TestLatch()
cluster.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
if (members.size == 2 && members.forall(_.status == MemberStatus.Up))
latch.countDown()
}
})
latch.await
cluster.convergence.isDefined must be(true)
}
}
"(when three systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" in {
runOn(third) {
cluster.join(firstAddress)
}
val latch = TestLatch()
cluster.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
if (members.size == 3 && members.forall(_.status == MemberStatus.Up))
latch.countDown()
}
})
latch.await
cluster.convergence.isDefined must be(true)
}
}
}

View file

@ -1,129 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import akka.testkit._
import akka.dispatch._
import akka.actor._
import akka.remote._
import akka.util.duration._
import java.net.InetSocketAddress
import java.util.concurrent.{ CountDownLatch, TimeUnit }
import scala.collection.immutable.SortedSet
import com.typesafe.config._
class MembershipChangeListenerSpec extends ClusterSpec with ImplicitSender {
val portPrefix = 6
var node0: Cluster = _
var node1: Cluster = _
var node2: Cluster = _
var system0: ActorSystemImpl = _
var system1: ActorSystemImpl = _
var system2: ActorSystemImpl = _
try {
"A set of connected cluster systems" must {
"(when two systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in {
system0 = ActorSystem("system0", ConfigFactory
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port = %d550
}""".format(portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote0 = system0.provider.asInstanceOf[RemoteActorRefProvider]
node0 = Cluster(system0)
system1 = ActorSystem("system1", ConfigFactory
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port=%d551
cluster.node-to-join = "akka://system0@localhost:%d550"
}""".format(portPrefix, portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
node1 = Cluster(system1)
val latch = new CountDownLatch(2)
node0.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
latch.countDown()
}
})
node1.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
latch.countDown()
}
})
latch.await(10.seconds.dilated.toMillis, TimeUnit.MILLISECONDS)
Thread.sleep(10.seconds.dilated.toMillis)
// check cluster convergence
node0.convergence must be('defined)
node1.convergence must be('defined)
}
"(when three systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in {
// ======= NODE 2 ========
system2 = ActorSystem("system2", ConfigFactory
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port=%d552
cluster.node-to-join = "akka://system0@localhost:%d550"
}""".format(portPrefix, portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider]
node2 = Cluster(system2)
val latch = new CountDownLatch(3)
node0.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
latch.countDown()
}
})
node1.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
latch.countDown()
}
})
node2.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
latch.countDown()
}
})
latch.await(30.seconds.dilated.toMillis, TimeUnit.MILLISECONDS)
}
}
} catch {
case e: Exception
e.printStackTrace
fail(e.toString)
}
override def atTermination() {
if (node0 ne null) node0.shutdown()
if (system0 ne null) system0.shutdown()
if (node1 ne null) node1.shutdown()
if (system1 ne null) system1.shutdown()
if (node2 ne null) node2.shutdown()
if (system2 ne null) system2.shutdown()
}
}