Merge pull request #480 from akka/wip-2109-port-cluster-test-jboner
Rewritten old in-memory ClientDowningSpec into multi-node specs.
This commit is contained in:
commit
7de010bb8b
5 changed files with 171 additions and 149 deletions
|
|
@ -806,6 +806,7 @@ class Cluster(system: ExtendedActorSystem) extends Extension { clusterNode ⇒
|
|||
* @return 'true' if it gossiped to a "deputy" member.
|
||||
*/
|
||||
private def gossipToRandomNodeOf(addresses: Iterable[Address]): Boolean = {
|
||||
log.debug("Cluster Node [{}] - Selecting random node to gossip to [{}]", remoteAddress, addresses.mkString(", "))
|
||||
if (addresses.isEmpty) false
|
||||
else {
|
||||
val peers = addresses filter (_ != remoteAddress) // filter out myself
|
||||
|
|
|
|||
|
|
@ -0,0 +1,81 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.cluster
|
||||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.BeforeAndAfter
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.testkit._
|
||||
import akka.actor.Address
|
||||
|
||||
object ClientDowningNodeThatIsUnreachableMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
val third = role("third")
|
||||
val fourth = role("fourth")
|
||||
|
||||
commonConfig(debugConfig(on = false).
|
||||
withFallback(ConfigFactory.parseString("akka.cluster.auto-down = off")).
|
||||
withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
}
|
||||
|
||||
class ClientDowningNodeThatIsUnreachableMultiJvmNode1 extends ClientDowningNodeThatIsUnreachableSpec
|
||||
class ClientDowningNodeThatIsUnreachableMultiJvmNode2 extends ClientDowningNodeThatIsUnreachableSpec
|
||||
class ClientDowningNodeThatIsUnreachableMultiJvmNode3 extends ClientDowningNodeThatIsUnreachableSpec
|
||||
class ClientDowningNodeThatIsUnreachableMultiJvmNode4 extends ClientDowningNodeThatIsUnreachableSpec
|
||||
|
||||
class ClientDowningNodeThatIsUnreachableSpec
|
||||
extends MultiNodeSpec(ClientDowningNodeThatIsUnreachableMultiJvmSpec)
|
||||
with MultiNodeClusterSpec
|
||||
with ImplicitSender with BeforeAndAfter {
|
||||
import ClientDowningNodeThatIsUnreachableMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 4
|
||||
|
||||
"Client of a 4 node cluster" must {
|
||||
|
||||
"be able to DOWN a node that is UNREACHABLE (killed)" taggedAs LongRunningTest in {
|
||||
runOn(first) {
|
||||
cluster.self
|
||||
awaitUpConvergence(nrOfMembers = 4)
|
||||
|
||||
val thirdAddress = node(third).address
|
||||
testConductor.enter("all-up")
|
||||
|
||||
// kill 'third' node
|
||||
testConductor.shutdown(third, 0)
|
||||
testConductor.removeNode(third)
|
||||
|
||||
// mark 'third' node as DOWN
|
||||
cluster.down(thirdAddress)
|
||||
testConductor.enter("down-third-node")
|
||||
|
||||
awaitUpConvergence(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress))
|
||||
cluster.latestGossip.members.exists(_.address == thirdAddress) must be(false)
|
||||
testConductor.enter("await-completion")
|
||||
}
|
||||
|
||||
runOn(third) {
|
||||
cluster.join(node(first).address)
|
||||
|
||||
awaitUpConvergence(nrOfMembers = 4)
|
||||
testConductor.enter("all-up")
|
||||
}
|
||||
|
||||
runOn(second, fourth) {
|
||||
cluster.join(node(first).address)
|
||||
awaitUpConvergence(nrOfMembers = 4)
|
||||
|
||||
val thirdAddress = node(third).address
|
||||
testConductor.enter("all-up")
|
||||
|
||||
testConductor.enter("down-third-node")
|
||||
|
||||
awaitUpConvergence(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress))
|
||||
testConductor.enter("await-completion")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,77 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.cluster
|
||||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.BeforeAndAfter
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.testkit._
|
||||
import akka.actor.Address
|
||||
|
||||
object ClientDowningNodeThatIsUpMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
val third = role("third")
|
||||
val fourth = role("fourth")
|
||||
|
||||
commonConfig(debugConfig(on = false).
|
||||
withFallback(ConfigFactory.parseString("akka.cluster.auto-down = off")).
|
||||
withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
}
|
||||
|
||||
class ClientDowningNodeThatIsUpMultiJvmNode1 extends ClientDowningNodeThatIsUpSpec
|
||||
class ClientDowningNodeThatIsUpMultiJvmNode2 extends ClientDowningNodeThatIsUpSpec
|
||||
class ClientDowningNodeThatIsUpMultiJvmNode3 extends ClientDowningNodeThatIsUpSpec
|
||||
class ClientDowningNodeThatIsUpMultiJvmNode4 extends ClientDowningNodeThatIsUpSpec
|
||||
|
||||
class ClientDowningNodeThatIsUpSpec
|
||||
extends MultiNodeSpec(ClientDowningNodeThatIsUpMultiJvmSpec)
|
||||
with MultiNodeClusterSpec
|
||||
with ImplicitSender with BeforeAndAfter {
|
||||
import ClientDowningNodeThatIsUpMultiJvmSpec._
|
||||
|
||||
override def initialParticipants = 4
|
||||
|
||||
"Client of a 4 node cluster" must {
|
||||
|
||||
"be able to DOWN a node that is UP (healthy and available)" taggedAs LongRunningTest in {
|
||||
runOn(first) {
|
||||
cluster.self
|
||||
awaitUpConvergence(nrOfMembers = 4)
|
||||
|
||||
val thirdAddress = node(third).address
|
||||
testConductor.enter("all-up")
|
||||
|
||||
// mark 'third' node as DOWN
|
||||
testConductor.removeNode(third)
|
||||
cluster.down(thirdAddress)
|
||||
testConductor.enter("down-third-node")
|
||||
|
||||
awaitUpConvergence(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress))
|
||||
cluster.latestGossip.members.exists(_.address == thirdAddress) must be(false)
|
||||
testConductor.enter("await-completion")
|
||||
}
|
||||
|
||||
runOn(third) {
|
||||
cluster.join(node(first).address)
|
||||
awaitUpConvergence(nrOfMembers = 4)
|
||||
testConductor.enter("all-up")
|
||||
}
|
||||
|
||||
runOn(second, fourth) {
|
||||
cluster.join(node(first).address)
|
||||
awaitUpConvergence(nrOfMembers = 4)
|
||||
|
||||
val thirdAddress = node(third).address
|
||||
testConductor.enter("all-up")
|
||||
|
||||
testConductor.enter("down-third-node")
|
||||
|
||||
awaitUpConvergence(nrOfMembers = 3, canNotBePartOfRing = Seq(thirdAddress))
|
||||
testConductor.enter("await-completion")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -50,8 +50,7 @@ trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒
|
|||
}
|
||||
|
||||
/**
|
||||
* Wait until the expected number of members has status Up
|
||||
* and convergence has been reached.
|
||||
* Wait until the expected number of members has status Up and convergence has been reached.
|
||||
*/
|
||||
def awaitUpConvergence(numberOfMembers: Int): Unit = {
|
||||
awaitCond(cluster.latestGossip.members.size == numberOfMembers)
|
||||
|
|
@ -59,6 +58,16 @@ trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒
|
|||
awaitCond(cluster.convergence.isDefined, 10 seconds)
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait until the expected number of members has status Up and convergence has been reached.
|
||||
* Also asserts that nodes in the 'canNotBePartOfRing' are *not* part of the cluster ring.
|
||||
*/
|
||||
def awaitUpConvergence(nrOfMembers: Int, canNotBePartOfRing: Seq[Address] = Seq.empty[Address]): Unit = {
|
||||
awaitCond(cluster.latestGossip.members.size == nrOfMembers)
|
||||
awaitCond(cluster.latestGossip.members.forall(_.status == MemberStatus.Up))
|
||||
awaitCond(canNotBePartOfRing forall (address ⇒ !(cluster.latestGossip.members exists (_.address == address))))
|
||||
}
|
||||
|
||||
def roleOfLeader(nodesInCluster: Seq[RoleName]): RoleName = {
|
||||
nodesInCluster.length must not be (0)
|
||||
nodesInCluster.sorted.head
|
||||
|
|
@ -75,5 +84,4 @@ trait MultiNodeClusterSpec { self: MultiNodeSpec ⇒
|
|||
def roleName(address: Address): Option[RoleName] = {
|
||||
testConductor.getNodes.await.find(node(_).address == address)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,145 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.cluster
|
||||
|
||||
import akka.testkit._
|
||||
import akka.dispatch._
|
||||
import akka.actor._
|
||||
import akka.remote._
|
||||
import akka.util.duration._
|
||||
|
||||
import com.typesafe.config._
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
class ClientDowningSpec extends ClusterSpec("akka.cluster.auto-down = off") with ImplicitSender {
|
||||
val portPrefix = 1
|
||||
|
||||
var node1: Cluster = _
|
||||
var node2: Cluster = _
|
||||
var node3: Cluster = _
|
||||
var node4: Cluster = _
|
||||
|
||||
var system1: ActorSystemImpl = _
|
||||
var system2: ActorSystemImpl = _
|
||||
var system3: ActorSystemImpl = _
|
||||
var system4: ActorSystemImpl = _
|
||||
|
||||
try {
|
||||
"Client of a 4 node cluster" must {
|
||||
|
||||
// ======= NODE 1 ========
|
||||
system1 = ActorSystem("system1", ConfigFactory
|
||||
.parseString("""
|
||||
akka {
|
||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||
remote.netty.port = %d550
|
||||
}""".format(portPrefix))
|
||||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
|
||||
node1 = Cluster(system1)
|
||||
val fd1 = node1.failureDetector
|
||||
val address1 = node1.remoteAddress
|
||||
|
||||
// ======= NODE 2 ========
|
||||
system2 = ActorSystem("system2", ConfigFactory
|
||||
.parseString("""
|
||||
akka {
|
||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||
remote.netty.port = %d551
|
||||
cluster.node-to-join = "akka://system1@localhost:%d550"
|
||||
}""".format(portPrefix, portPrefix))
|
||||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider]
|
||||
node2 = Cluster(system2)
|
||||
val fd2 = node2.failureDetector
|
||||
val address2 = node2.remoteAddress
|
||||
|
||||
// ======= NODE 3 ========
|
||||
system3 = ActorSystem("system3", ConfigFactory
|
||||
.parseString("""
|
||||
akka {
|
||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||
remote.netty.port = %d552
|
||||
cluster.node-to-join = "akka://system1@localhost:%d550"
|
||||
}""".format(portPrefix, portPrefix))
|
||||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
val remote3 = system3.provider.asInstanceOf[RemoteActorRefProvider]
|
||||
node3 = Cluster(system3)
|
||||
val fd3 = node3.failureDetector
|
||||
val address3 = node3.remoteAddress
|
||||
|
||||
// ======= NODE 4 ========
|
||||
system4 = ActorSystem("system4", ConfigFactory
|
||||
.parseString("""
|
||||
akka {
|
||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||
remote.netty.port = %d553
|
||||
cluster.node-to-join = "akka://system1@localhost:%d550"
|
||||
}""".format(portPrefix, portPrefix))
|
||||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
val remote4 = system4.provider.asInstanceOf[RemoteActorRefProvider]
|
||||
node4 = Cluster(system4)
|
||||
val fd4 = node4.failureDetector
|
||||
val address4 = node4.remoteAddress
|
||||
|
||||
"be able to DOWN a node that is UP" taggedAs LongRunningTest in {
|
||||
println("Give the system time to converge...")
|
||||
awaitConvergence(node1 :: node2 :: node3 :: node4 :: Nil)
|
||||
|
||||
node3.shutdown()
|
||||
system3.shutdown()
|
||||
|
||||
// client marks node3 as DOWN
|
||||
node1.down(address3)
|
||||
|
||||
println("Give the system time to converge...")
|
||||
Thread.sleep(10.seconds.dilated.toMillis)
|
||||
awaitConvergence(node1 :: node2 :: node4 :: Nil)
|
||||
|
||||
node1.latestGossip.members.size must be(3)
|
||||
node1.latestGossip.members.exists(_.address == address3) must be(false)
|
||||
}
|
||||
|
||||
"be able to DOWN a node that is UNREACHABLE" taggedAs LongRunningTest in {
|
||||
node4.shutdown()
|
||||
system4.shutdown()
|
||||
|
||||
// clien marks node4 as DOWN
|
||||
node2.down(address4)
|
||||
|
||||
println("Give the system time to converge...")
|
||||
Thread.sleep(10.seconds.dilated.toMillis)
|
||||
awaitConvergence(node1 :: node2 :: Nil)
|
||||
|
||||
node1.latestGossip.members.size must be(2)
|
||||
node1.latestGossip.members.exists(_.address == address4) must be(false)
|
||||
node1.latestGossip.members.exists(_.address == address3) must be(false)
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
e.printStackTrace
|
||||
fail(e.toString)
|
||||
}
|
||||
|
||||
override def atTermination() {
|
||||
if (node1 ne null) node1.shutdown()
|
||||
if (system1 ne null) system1.shutdown()
|
||||
|
||||
if (node2 ne null) node2.shutdown()
|
||||
if (system2 ne null) system2.shutdown()
|
||||
|
||||
if (node3 ne null) node3.shutdown()
|
||||
if (system3 ne null) system3.shutdown()
|
||||
|
||||
if (node4 ne null) node4.shutdown()
|
||||
if (system4 ne null) system4.shutdown()
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue