Port JoinTwoClustersSpec to MultiNodeSpec. See #2111
This commit is contained in:
parent
8d114f5da5
commit
db16b6c4b3
1 changed files with 87 additions and 148 deletions
|
|
@ -4,174 +4,113 @@
|
||||||
|
|
||||||
package akka.cluster
|
package akka.cluster
|
||||||
|
|
||||||
import akka.actor.ActorSystem
|
import org.scalatest.BeforeAndAfter
|
||||||
import akka.actor.ActorSystemImpl
|
|
||||||
import akka.testkit.ImplicitSender
|
|
||||||
import akka.testkit.LongRunningTest
|
|
||||||
import akka.testkit.duration2TestDuration
|
|
||||||
import akka.util.duration.intToDurationInt
|
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
|
import akka.remote.testkit.MultiNodeConfig
|
||||||
|
import akka.remote.testkit.MultiNodeSpec
|
||||||
|
import akka.testkit.ImplicitSender
|
||||||
|
|
||||||
class JoinTwoClustersSpec extends ClusterSpec("akka.cluster.failure-detector.threshold = 5") with ImplicitSender {
|
object JoinTwoClustersMultiJvmSpec extends MultiNodeConfig {
|
||||||
val portPrefix = 3
|
val a1 = role("a1")
|
||||||
|
val a2 = role("a2")
|
||||||
|
val b1 = role("b1")
|
||||||
|
val b2 = role("b2")
|
||||||
|
val c1 = role("c1")
|
||||||
|
val c2 = role("c2")
|
||||||
|
|
||||||
var node1: Cluster = _
|
commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString("""
|
||||||
var node2: Cluster = _
|
akka.cluster {
|
||||||
var node3: Cluster = _
|
gossip-frequency = 200 ms
|
||||||
var node4: Cluster = _
|
leader-actions-frequency = 200 ms
|
||||||
var node5: Cluster = _
|
periodic-tasks-initial-delay = 300 ms
|
||||||
var node6: Cluster = _
|
}
|
||||||
|
""")))
|
||||||
|
|
||||||
var system1: ActorSystemImpl = _
|
}
|
||||||
var system2: ActorSystemImpl = _
|
|
||||||
var system3: ActorSystemImpl = _
|
class JoinTwoClustersMultiJvmNode1 extends JoinTwoClustersSpec
|
||||||
var system4: ActorSystemImpl = _
|
class JoinTwoClustersMultiJvmNode2 extends JoinTwoClustersSpec
|
||||||
var system5: ActorSystemImpl = _
|
class JoinTwoClustersMultiJvmNode3 extends JoinTwoClustersSpec
|
||||||
var system6: ActorSystemImpl = _
|
class JoinTwoClustersMultiJvmNode4 extends JoinTwoClustersSpec
|
||||||
|
class JoinTwoClustersMultiJvmNode5 extends JoinTwoClustersSpec
|
||||||
|
class JoinTwoClustersMultiJvmNode6 extends JoinTwoClustersSpec
|
||||||
|
|
||||||
|
abstract class JoinTwoClustersSpec extends MultiNodeSpec(JoinTwoClustersMultiJvmSpec) with ImplicitSender with BeforeAndAfter {
|
||||||
|
import JoinTwoClustersMultiJvmSpec._
|
||||||
|
|
||||||
|
override def initialParticipants = 6
|
||||||
|
|
||||||
|
def node(): Cluster = Cluster(system)
|
||||||
|
|
||||||
|
after {
|
||||||
|
testConductor.enter("after")
|
||||||
|
}
|
||||||
|
|
||||||
|
val a1Address = testConductor.getAddressFor(a1).await
|
||||||
|
val b1Address = testConductor.getAddressFor(b1).await
|
||||||
|
val c1Address = testConductor.getAddressFor(c1).await
|
||||||
|
|
||||||
|
def awaitUpConvergence(numberOfMembers: Int): Unit = {
|
||||||
|
awaitCond(node().latestGossip.members.size == numberOfMembers)
|
||||||
|
awaitCond(node().latestGossip.members.forall(_.status == MemberStatus.Up))
|
||||||
|
awaitCond(node().convergence.isDefined)
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
|
||||||
"Three different clusters (A, B and C)" must {
|
"Three different clusters (A, B and C)" must {
|
||||||
|
|
||||||
// ======= NODE 1 ========
|
"be able to 'elect' a single leader after joining (A -> B)" in {
|
||||||
system1 = ActorSystem("system1", ConfigFactory
|
|
||||||
.parseString("""
|
|
||||||
akka {
|
|
||||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
|
||||||
remote.netty.port = %d551
|
|
||||||
}""".format(portPrefix))
|
|
||||||
.withFallback(system.settings.config))
|
|
||||||
.asInstanceOf[ActorSystemImpl]
|
|
||||||
node1 = Cluster(system1)
|
|
||||||
|
|
||||||
// ======= NODE 2 ========
|
runOn(a1, a2) {
|
||||||
system2 = ActorSystem("system2", ConfigFactory
|
node().join(a1Address)
|
||||||
.parseString("""
|
}
|
||||||
akka {
|
runOn(b1, b2) {
|
||||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
node().join(b1Address)
|
||||||
remote.netty.port = %d552
|
}
|
||||||
cluster.node-to-join = "akka://system1@localhost:%d551"
|
runOn(c1, c2) {
|
||||||
}""".format(portPrefix, portPrefix))
|
node().join(c1Address)
|
||||||
.withFallback(system.settings.config))
|
|
||||||
.asInstanceOf[ActorSystemImpl]
|
|
||||||
node2 = Cluster(system2)
|
|
||||||
|
|
||||||
// ======= NODE 3 ========
|
|
||||||
system3 = ActorSystem("system3", ConfigFactory
|
|
||||||
.parseString("""
|
|
||||||
akka {
|
|
||||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
|
||||||
remote.netty.port = %d553
|
|
||||||
}""".format(portPrefix))
|
|
||||||
.withFallback(system.settings.config))
|
|
||||||
.asInstanceOf[ActorSystemImpl]
|
|
||||||
node3 = Cluster(system3)
|
|
||||||
|
|
||||||
// ======= NODE 4 ========
|
|
||||||
system4 = ActorSystem("system4", ConfigFactory
|
|
||||||
.parseString("""
|
|
||||||
akka {
|
|
||||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
|
||||||
remote.netty.port = %d554
|
|
||||||
cluster.node-to-join = "akka://system3@localhost:%d553"
|
|
||||||
}""".format(portPrefix, portPrefix))
|
|
||||||
.withFallback(system.settings.config))
|
|
||||||
.asInstanceOf[ActorSystemImpl]
|
|
||||||
node4 = Cluster(system4)
|
|
||||||
|
|
||||||
// ======= NODE 5 ========
|
|
||||||
system5 = ActorSystem("system5", ConfigFactory
|
|
||||||
.parseString("""
|
|
||||||
akka {
|
|
||||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
|
||||||
remote.netty.port = %d555
|
|
||||||
}""".format(portPrefix))
|
|
||||||
.withFallback(system.settings.config))
|
|
||||||
.asInstanceOf[ActorSystemImpl]
|
|
||||||
node5 = Cluster(system5)
|
|
||||||
|
|
||||||
// ======= NODE 6 ========
|
|
||||||
system6 = ActorSystem("system6", ConfigFactory
|
|
||||||
.parseString("""
|
|
||||||
akka {
|
|
||||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
|
||||||
remote.netty.port = %d556
|
|
||||||
cluster.node-to-join = "akka://system5@localhost:%d555"
|
|
||||||
}""".format(portPrefix, portPrefix))
|
|
||||||
.withFallback(system.settings.config))
|
|
||||||
.asInstanceOf[ActorSystemImpl]
|
|
||||||
node6 = Cluster(system6)
|
|
||||||
|
|
||||||
"be able to 'elect' a single leader after joining (A -> B)" taggedAs LongRunningTest in {
|
|
||||||
|
|
||||||
println("Give the system time to converge...")
|
|
||||||
awaitConvergence(node1 :: node2 :: node3 :: node4 :: node5 :: node6 :: Nil)
|
|
||||||
|
|
||||||
// check leader
|
|
||||||
node1.isLeader must be(true)
|
|
||||||
node2.isLeader must be(false)
|
|
||||||
node3.isLeader must be(true)
|
|
||||||
node4.isLeader must be(false)
|
|
||||||
node5.isLeader must be(true)
|
|
||||||
node6.isLeader must be(false)
|
|
||||||
|
|
||||||
// join
|
|
||||||
node4.join(node1.remoteAddress)
|
|
||||||
//node1.scheduleNodeJoin(node4.remoteAddress)
|
|
||||||
|
|
||||||
println("Give the system time to converge...")
|
|
||||||
Thread.sleep(10.seconds.dilated.toMillis)
|
|
||||||
awaitConvergence(node1 :: node2 :: node3 :: node4 :: node5 :: node6 :: Nil)
|
|
||||||
|
|
||||||
// check leader
|
|
||||||
node1.isLeader must be(true)
|
|
||||||
node2.isLeader must be(false)
|
|
||||||
node3.isLeader must be(false)
|
|
||||||
node4.isLeader must be(false)
|
|
||||||
node5.isLeader must be(true)
|
|
||||||
node6.isLeader must be(false)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"be able to 'elect' a single leader after joining (C -> A + B)" taggedAs LongRunningTest in {
|
awaitUpConvergence(numberOfMembers = 2)
|
||||||
// join
|
|
||||||
node4.join(node5.remoteAddress)
|
|
||||||
//node5.scheduleNodeJoin(node4.remoteAddress)
|
|
||||||
|
|
||||||
println("Give the system time to converge...")
|
runOn(a1, b1, c1) {
|
||||||
Thread.sleep(10.seconds.dilated.toMillis)
|
node().isLeader must be(true)
|
||||||
awaitConvergence(node1 :: node2 :: node3 :: node4 :: node5 :: node6 :: Nil)
|
|
||||||
|
|
||||||
// check leader
|
|
||||||
node1.isLeader must be(true)
|
|
||||||
node2.isLeader must be(false)
|
|
||||||
node3.isLeader must be(false)
|
|
||||||
node4.isLeader must be(false)
|
|
||||||
node5.isLeader must be(false)
|
|
||||||
node6.isLeader must be(false)
|
|
||||||
}
|
}
|
||||||
}
|
runOn(a2, b2, c2) {
|
||||||
} catch {
|
node().isLeader must be(false)
|
||||||
case e: Exception ⇒
|
|
||||||
e.printStackTrace
|
|
||||||
fail(e.toString)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override def atTermination() {
|
runOn(b2) {
|
||||||
if (node1 ne null) node1.shutdown()
|
node().join(a1Address)
|
||||||
if (system1 ne null) system1.shutdown()
|
}
|
||||||
|
|
||||||
if (node2 ne null) node2.shutdown()
|
runOn(a1, a2, b1, b2) {
|
||||||
if (system2 ne null) system2.shutdown()
|
awaitUpConvergence(numberOfMembers = 4)
|
||||||
|
}
|
||||||
|
|
||||||
if (node3 ne null) node3.shutdown()
|
runOn(a1, c1) {
|
||||||
if (system3 ne null) system3.shutdown()
|
node().isLeader must be(true)
|
||||||
|
}
|
||||||
|
runOn(a2, b1, b2, c2) {
|
||||||
|
node().isLeader must be(false)
|
||||||
|
}
|
||||||
|
|
||||||
if (node4 ne null) node4.shutdown()
|
}
|
||||||
if (system4 ne null) system4.shutdown()
|
|
||||||
|
|
||||||
if (node5 ne null) node5.shutdown()
|
"be able to 'elect' a single leader after joining (C -> A + B)" in {
|
||||||
if (system5 ne null) system5.shutdown()
|
|
||||||
|
|
||||||
if (node6 ne null) node6.shutdown()
|
runOn(b2) {
|
||||||
if (system6 ne null) system6.shutdown()
|
node().join(c1Address)
|
||||||
|
}
|
||||||
|
|
||||||
|
awaitUpConvergence(numberOfMembers = 6)
|
||||||
|
|
||||||
|
runOn(a1) {
|
||||||
|
node().isLeader must be(true)
|
||||||
|
}
|
||||||
|
runOn(a2, b1, b2, c1, c2) {
|
||||||
|
node().isLeader must be(false)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue