Port MembershipChangeListenerSpec to MultiNodeSpec. See #2114

This commit is contained in:
Patrik Nordwall 2012-05-24 14:46:35 +02:00
parent ad4725aa70
commit 7322ff3ef0
2 changed files with 88 additions and 250 deletions

View file

@ -1,134 +0,0 @@
// package akka.cluster
// import akka.actor.Actor
// import akka.remote._
// import akka.routing._
// import akka.routing.Routing.Broadcast
// object GossipMembershipMultiJvmSpec {
// val NrOfNodes = 4
// class SomeActor extends Actor with Serializable {
// def receive = {
// case "hit" sender ! system.nodename
// case "end" self.stop()
// }
// }
// import com.typesafe.config.ConfigFactory
// val commonConfig = ConfigFactory.parseString("""
// akka {
// loglevel = "WARNING"
// cluster {
// seed-nodes = ["localhost:9991"]
// }
// remote.server.hostname = "localhost"
// }""")
// val node1Config = ConfigFactory.parseString("""
// akka {
// remote.server.port = "9991"
// cluster.nodename = "node1"
// }""") withFallback commonConfig
// val node2Config = ConfigFactory.parseString("""
// akka {
// remote.server.port = "9992"
// cluster.nodename = "node2"
// }""") withFallback commonConfig
// val node3Config = ConfigFactory.parseString("""
// akka {
// remote.server.port = "9993"
// cluster.nodename = "node3"
// }""") withFallback commonConfig
// val node4Config = ConfigFactory.parseString("""
// akka {
// remote.server.port = "9994"
// cluster.nodename = "node4"
// }""") withFallback commonConfig
// }
// class GossipMembershipMultiJvmNode1 extends AkkaRemoteSpec(GossipMembershipMultiJvmSpec.node1Config) {
// import GossipMembershipMultiJvmSpec._
// val nodes = NrOfNodes
// "A cluster" must {
// "allow new node to join and should reach convergence with new membership table" in {
// barrier("setup")
// remote.start()
// barrier("start")
// val actor = system.actorOf(Props[SomeActor]("service-hello")
// actor.isInstanceOf[RoutedActorRef] must be(true)
// val connectionCount = NrOfNodes - 1
// val iterationCount = 10
// var replies = Map(
// "node1" -> 0,
// "node2" -> 0,
// "node3" -> 0)
// for (i 0 until iterationCount) {
// for (k 0 until connectionCount) {
// val nodeName = (actor ? "hit").as[String].getOrElse(fail("No id returned by actor"))
// replies = replies + (nodeName -> (replies(nodeName) + 1))
// }
// }
// barrier("broadcast-end")
// actor ! Broadcast("end")
// barrier("end")
// replies.values foreach { _ must be > (0) }
// barrier("done")
// }
// }
// }
// class GossipMembershipMultiJvmNode2 extends AkkaRemoteSpec(GossipMembershipMultiJvmSpec.node2Config) {
// import GossipMembershipMultiJvmSpec._
// val nodes = NrOfNodes
// "___" must {
// "___" in {
// barrier("setup")
// remote.start()
// barrier("start")
// barrier("broadcast-end")
// barrier("end")
// barrier("done")
// }
// }
// }
// class GossipMembershipMultiJvmNode3 extends AkkaRemoteSpec(GossipMembershipMultiJvmSpec.node3Config) {
// import GossipMembershipMultiJvmSpec._
// val nodes = NrOfNodes
// "___" must {
// "___" in {
// barrier("setup")
// remote.start()
// barrier("start")
// barrier("broadcast-end")
// barrier("end")
// barrier("done")
// }
// }
// }
// class GossipMembershipMultiJvmNode4 extends AkkaRemoteSpec(GossipMembershipMultiJvmSpec.node4Config) {
// import GossipMembershipMultiJvmSpec._
// val nodes = NrOfNodes
// "___" must {
// "___" in {
// barrier("setup")
// remote.start()
// barrier("start")
// barrier("broadcast-end")
// barrier("end")
// barrier("done")
// }
// }
// }

View file

@ -3,127 +3,99 @@
*/
package akka.cluster
import akka.actor.ActorSystem
import akka.actor.ActorSystemImpl
import akka.remote.RemoteActorRefProvider
import akka.testkit.ImplicitSender
import akka.testkit.LongRunningTest
import akka.testkit.duration2TestDuration
import akka.util.duration.intToDurationInt
import com.typesafe.config.ConfigFactory
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import scala.annotation.tailrec
import scala.collection.immutable.SortedSet
import org.scalatest.BeforeAndAfter
import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit.ImplicitSender
import akka.testkit.TestLatch
class MembershipChangeListenerSpec extends ClusterSpec with ImplicitSender {
val portPrefix = 6
object MembershipChangeListenerMultiJvmSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
var node0: Cluster = _
var node1: Cluster = _
var node2: Cluster = _
var system0: ActorSystemImpl = _
var system1: ActorSystemImpl = _
var system2: ActorSystemImpl = _
try {
"A set of connected cluster systems" must {
"(when two systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in {
system0 = ActorSystem("system0", ConfigFactory
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port = %d550
}""".format(portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote0 = system0.provider.asInstanceOf[RemoteActorRefProvider]
node0 = Cluster(system0)
system1 = ActorSystem("system1", ConfigFactory
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port=%d551
cluster.node-to-join = "akka://system0@localhost:%d550"
}""".format(portPrefix, portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
node1 = Cluster(system1)
val latch = new CountDownLatch(2)
node0.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
latch.countDown()
}
})
node1.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
latch.countDown()
}
})
latch.await(10.seconds.dilated.toMillis, TimeUnit.MILLISECONDS)
Thread.sleep(10.seconds.dilated.toMillis)
// check cluster convergence
node0.convergence must be('defined)
node1.convergence must be('defined)
}
"(when three systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in {
// ======= NODE 2 ========
system2 = ActorSystem("system2", ConfigFactory
.parseString("""
akka {
actor.provider = "akka.remote.RemoteActorRefProvider"
remote.netty.port=%d552
cluster.node-to-join = "akka://system0@localhost:%d550"
}""".format(portPrefix, portPrefix))
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider]
node2 = Cluster(system2)
val latch = new CountDownLatch(3)
node0.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
latch.countDown()
}
})
node1.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
latch.countDown()
}
})
node2.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
latch.countDown()
}
})
latch.await(30.seconds.dilated.toMillis, TimeUnit.MILLISECONDS)
}
commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString("""
akka.cluster {
gossip-frequency = 200 ms
leader-actions-frequency = 200 ms
periodic-tasks-initial-delay = 300 ms
}
} catch {
case e: Exception
e.printStackTrace
fail(e.toString)
}
""")))
override def atTermination() {
if (node0 ne null) node0.shutdown()
if (system0 ne null) system0.shutdown()
nodeConfig(first, ConfigFactory.parseString("""
# FIXME get rid of this hardcoded port
akka.remote.netty.port=2603
"""))
if (node1 ne null) node1.shutdown()
if (system1 ne null) system1.shutdown()
nodeConfig(second, ConfigFactory.parseString("""
# FIXME get rid of this hardcoded host:port
akka.cluster.node-to-join = "akka://MultiNodeSpec@localhost:2603"
"""))
nodeConfig(third, ConfigFactory.parseString("""
# FIXME get rid of this hardcoded host:port
akka.cluster.node-to-join = "akka://MultiNodeSpec@localhost:2603"
"""))
}
class MembershipChangeListenerMultiJvmNode1 extends MembershipChangeListenerSpec
class MembershipChangeListenerMultiJvmNode2 extends MembershipChangeListenerSpec
class MembershipChangeListenerMultiJvmNode3 extends MembershipChangeListenerSpec
abstract class MembershipChangeListenerSpec extends MultiNodeSpec(MembershipChangeListenerMultiJvmSpec) with ImplicitSender with BeforeAndAfter {
import MembershipChangeListenerMultiJvmSpec._
override def initialParticipants = 3
var node: Cluster = _
after {
testConductor.enter("after")
}
"A set of connected cluster systems" must {
val firstAddress = testConductor.getAddressFor(first).await
val secondAddress = testConductor.getAddressFor(second).await
"(when two systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" in {
runOn(first, second) {
node = Cluster(system)
val latch = TestLatch()
node.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
if (members.size == 2 && members.forall(_.status == MemberStatus.Up))
latch.countDown()
}
})
latch.await
node.convergence.isDefined must be(true)
}
}
"(when three systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" in {
runOn(third) {
node = Cluster(system)
}
// runOn all
val latch = TestLatch()
node.registerListener(new MembershipChangeListener {
def notify(members: SortedSet[Member]) {
if (members.size == 3 && members.forall(_.status == MemberStatus.Up))
latch.countDown()
}
})
latch.await
node.convergence.isDefined must be(true)
}
}
if (node2 ne null) node2.shutdown()
if (system2 ne null) system2.shutdown()
}
}