Port GossipingAccrualFailureDetectorSpec to MultiNodeSpec, see #2110
This commit is contained in:
parent
478cd96639
commit
bffb14b022
1 changed files with 52 additions and 90 deletions
|
|
@ -3,111 +3,73 @@
|
|||
*/
|
||||
package akka.cluster
|
||||
|
||||
import akka.testkit._
|
||||
import akka.dispatch._
|
||||
import akka.actor._
|
||||
import akka.remote._
|
||||
import org.scalatest.BeforeAndAfter
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.util.duration._
|
||||
import akka.testkit._
|
||||
|
||||
import com.typesafe.config._
|
||||
object GossipingAccrualFailureDetectorMultiJvmSpec extends MultiNodeConfig {
|
||||
val first = role("first")
|
||||
val second = role("second")
|
||||
val third = role("third")
|
||||
|
||||
import java.net.InetSocketAddress
|
||||
commonConfig(debugConfig(on = false).
|
||||
withFallback(ConfigFactory.parseString("akka.cluster.failure-detector.threshold=5")).
|
||||
withFallback(MultiNodeClusterSpec.clusterConfig))
|
||||
}
|
||||
|
||||
class GossipingAccrualFailureDetectorSpec extends ClusterSpec with ImplicitSender {
|
||||
val portPrefix = 2
|
||||
class GossipingAccrualFailureDetectorMultiJvmNode1 extends GossipingAccrualFailureDetectorSpec
|
||||
class GossipingAccrualFailureDetectorMultiJvmNode2 extends GossipingAccrualFailureDetectorSpec
|
||||
class GossipingAccrualFailureDetectorMultiJvmNode3 extends GossipingAccrualFailureDetectorSpec
|
||||
|
||||
var node1: Cluster = _
|
||||
var node2: Cluster = _
|
||||
var node3: Cluster = _
|
||||
abstract class GossipingAccrualFailureDetectorSpec extends MultiNodeSpec(GossipingAccrualFailureDetectorMultiJvmSpec)
|
||||
with MultiNodeClusterSpec with ImplicitSender with BeforeAndAfter {
|
||||
import GossipingAccrualFailureDetectorMultiJvmSpec._
|
||||
|
||||
var system1: ActorSystemImpl = _
|
||||
var system2: ActorSystemImpl = _
|
||||
var system3: ActorSystemImpl = _
|
||||
override def initialParticipants = 3
|
||||
|
||||
try {
|
||||
"A Gossip-driven Failure Detector" must {
|
||||
lazy val firstAddress = node(first).address
|
||||
lazy val secondAddress = node(second).address
|
||||
lazy val thirdAddress = node(third).address
|
||||
|
||||
// ======= NODE 1 ========
|
||||
system1 = ActorSystem("system1", ConfigFactory
|
||||
.parseString("""
|
||||
akka {
|
||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||
remote.netty.port=%d550
|
||||
}""".format(portPrefix))
|
||||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
|
||||
node1 = Cluster(system1)
|
||||
val fd1 = node1.failureDetector
|
||||
val address1 = node1.remoteAddress
|
||||
after {
|
||||
testConductor.enter("after")
|
||||
}
|
||||
|
||||
// ======= NODE 2 ========
|
||||
system2 = ActorSystem("system2", ConfigFactory
|
||||
.parseString("""
|
||||
akka {
|
||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||
remote.netty.port=%d551
|
||||
cluster.node-to-join = "akka://system1@localhost:%d550"
|
||||
}""".format(portPrefix, portPrefix))
|
||||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider]
|
||||
node2 = Cluster(system2)
|
||||
val fd2 = node2.failureDetector
|
||||
val address2 = node2.remoteAddress
|
||||
"A Gossip-driven Failure Detector" must {
|
||||
|
||||
// ======= NODE 3 ========
|
||||
system3 = ActorSystem("system3", ConfigFactory
|
||||
.parseString("""
|
||||
akka {
|
||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||
remote.netty.port=%d552
|
||||
cluster.node-to-join = "akka://system1@localhost:%d550"
|
||||
}""".format(portPrefix, portPrefix))
|
||||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
val remote3 = system3.provider.asInstanceOf[RemoteActorRefProvider]
|
||||
node3 = Cluster(system3)
|
||||
val fd3 = node3.failureDetector
|
||||
val address3 = node3.remoteAddress
|
||||
"receive gossip heartbeats so that all healthy systems in the cluster are marked 'available'" taggedAs LongRunningTest in {
|
||||
// make sure that the node-to-join is started before other join
|
||||
runOn(first) {
|
||||
cluster.self
|
||||
}
|
||||
testConductor.enter("first-started")
|
||||
|
||||
"receive gossip heartbeats so that all healthy systems in the cluster are marked 'available'" taggedAs LongRunningTest in {
|
||||
println("Let the systems gossip for a while...")
|
||||
Thread.sleep(30.seconds.dilated.toMillis) // let them gossip for 30 seconds
|
||||
fd1.isAvailable(address2) must be(true)
|
||||
fd1.isAvailable(address3) must be(true)
|
||||
fd2.isAvailable(address1) must be(true)
|
||||
fd2.isAvailable(address3) must be(true)
|
||||
fd3.isAvailable(address1) must be(true)
|
||||
fd3.isAvailable(address2) must be(true)
|
||||
cluster.join(firstAddress)
|
||||
|
||||
log.info("Let the systems gossip for a while...")
|
||||
10.seconds.dilated.sleep // let them gossip
|
||||
cluster.failureDetector.isAvailable(firstAddress) must be(true)
|
||||
cluster.failureDetector.isAvailable(secondAddress) must be(true)
|
||||
cluster.failureDetector.isAvailable(thirdAddress) must be(true)
|
||||
}
|
||||
|
||||
"mark system as 'unavailable' if a system in the cluster is shut down (and its heartbeats stops)" taggedAs LongRunningTest in {
|
||||
runOn(first) {
|
||||
testConductor.shutdown(third, 0)
|
||||
testConductor.removeNode(third)
|
||||
}
|
||||
|
||||
"mark system as 'unavailable' if a system in the cluster is shut down (and its heartbeats stops)" taggedAs LongRunningTest in {
|
||||
// shut down system3
|
||||
node3.shutdown()
|
||||
system3.shutdown()
|
||||
println("Give the remaning systems time to detect failure...")
|
||||
Thread.sleep(30.seconds.dilated.toMillis) // give them 30 seconds to detect failure of system3
|
||||
fd1.isAvailable(address2) must be(true)
|
||||
fd1.isAvailable(address3) must be(false)
|
||||
fd2.isAvailable(address1) must be(true)
|
||||
fd2.isAvailable(address3) must be(false)
|
||||
runOn(first, second) {
|
||||
log.info("Give the remaning systems time to detect failure...")
|
||||
15.seconds.dilated.sleep // give them time to detect failure
|
||||
cluster.failureDetector.isAvailable(firstAddress) must be(true)
|
||||
cluster.failureDetector.isAvailable(secondAddress) must be(true)
|
||||
cluster.failureDetector.isAvailable(thirdAddress) must be(false)
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
e.printStackTrace
|
||||
fail(e.toString)
|
||||
}
|
||||
|
||||
override def atTermination() {
|
||||
if (node1 ne null) node1.shutdown()
|
||||
if (system1 ne null) system1.shutdown()
|
||||
|
||||
if (node2 ne null) node2.shutdown()
|
||||
if (system2 ne null) system2.shutdown()
|
||||
|
||||
if (node3 ne null) node3.shutdown()
|
||||
if (system3 ne null) system3.shutdown()
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue