Test gossip in large cluster, see #2239
This commit is contained in:
parent
a900052f68
commit
aca66de732
2 changed files with 276 additions and 0 deletions
|
|
@ -0,0 +1,267 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.cluster
|
||||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.testkit._
|
||||
import akka.util.duration._
|
||||
import akka.actor.ActorSystem
|
||||
import akka.util.Deadline
|
||||
import java.util.concurrent.TimeoutException
|
||||
import scala.collection.immutable.SortedSet
|
||||
import akka.dispatch.Await
|
||||
import akka.util.Duration
|
||||
import java.util.concurrent.TimeUnit
|
||||
import akka.remote.testconductor.RoleName
|
||||
|
||||
object LargeClusterMultiJvmSpec extends MultiNodeConfig {
|
||||
// each jvm simulates a datacenter with many nodes
|
||||
val firstDatacenter = role("first-datacenter")
|
||||
val secondDatacenter = role("second-datacenter")
|
||||
val thirdDatacenter = role("third-datacenter")
|
||||
val fourthDatacenter = role("fourth-datacenter")
|
||||
val fifthDatacenter = role("fifth-datacenter")
|
||||
|
||||
// Note that this test uses default configuration,
|
||||
// not MultiNodeClusterSpec.clusterConfig
|
||||
commonConfig(ConfigFactory.parseString("""
|
||||
# Number of ActorSystems in each jvm, can be specified as
|
||||
# system property when running real tests. Many nodes
|
||||
# will take long time.
|
||||
akka.test.large-cluster-spec.nodes-per-datacenter = 2
|
||||
akka.cluster {
|
||||
gossip-interval = 500 ms
|
||||
auto-join = off
|
||||
failure-detector.threshold = 4
|
||||
}
|
||||
akka.loglevel = INFO
|
||||
akka.actor.default-dispatcher.fork-join-executor.parallelism-max = 2
|
||||
akka.scheduler.tick-duration = 33 ms
|
||||
akka.remote.netty.execution-pool-size = 1
|
||||
"""))
|
||||
}
|
||||
|
||||
class LargeClusterMultiJvmNode1 extends LargeClusterSpec with AccrualFailureDetectorStrategy
|
||||
class LargeClusterMultiJvmNode2 extends LargeClusterSpec with AccrualFailureDetectorStrategy
|
||||
class LargeClusterMultiJvmNode3 extends LargeClusterSpec with AccrualFailureDetectorStrategy
|
||||
class LargeClusterMultiJvmNode4 extends LargeClusterSpec with AccrualFailureDetectorStrategy
|
||||
class LargeClusterMultiJvmNode5 extends LargeClusterSpec with AccrualFailureDetectorStrategy
|
||||
|
||||
abstract class LargeClusterSpec
|
||||
extends MultiNodeSpec(LargeClusterMultiJvmSpec)
|
||||
with MultiNodeClusterSpec {
|
||||
|
||||
import LargeClusterMultiJvmSpec._
|
||||
|
||||
var systems: IndexedSeq[ActorSystem] = IndexedSeq(system)
|
||||
val nodesPerDatacenter = system.settings.config.getInt(
|
||||
"akka.test.large-cluster-spec.nodes-per-datacenter")
|
||||
|
||||
/**
|
||||
* Since we start some ActorSystems/Clusters outside of the
|
||||
* MultiNodeClusterSpec control we can't use use the mechanism
|
||||
* defined in MultiNodeClusterSpec to inject failure detector etc.
|
||||
* Use ordinary Cluster extension with default AccrualFailureDetector.
|
||||
*/
|
||||
override def cluster: Cluster = Cluster(system)
|
||||
|
||||
override def atTermination(): Unit = {
|
||||
systems foreach { _.shutdown }
|
||||
val shutdownTimeout = 20.seconds
|
||||
val deadline = Deadline.now + shutdownTimeout
|
||||
systems.foreach { sys ⇒
|
||||
if (sys.isTerminated)
|
||||
() // already done
|
||||
else if (deadline.isOverdue)
|
||||
sys.log.warning("Failed to shutdown [{}] within [{}]", sys.name, shutdownTimeout)
|
||||
else {
|
||||
try sys.awaitTermination(deadline.timeLeft) catch {
|
||||
case _: TimeoutException ⇒ sys.log.warning("Failed to shutdown [{}] within [{}]", sys.name, shutdownTimeout)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def startupSystems(): Unit = {
|
||||
// one system is already started by the multi-node test
|
||||
for (n ← 2 to nodesPerDatacenter)
|
||||
systems :+= ActorSystem(myself.name + "-" + n, system.settings.config)
|
||||
|
||||
// Initialize the Cluster extensions, i.e. startup the clusters
|
||||
systems foreach { Cluster(_) }
|
||||
}
|
||||
|
||||
def expectedMaxDuration(totalNodes: Int): Duration =
|
||||
5.seconds + (2.seconds * totalNodes)
|
||||
|
||||
def joinAll(from: RoleName, to: RoleName, totalNodes: Int, runOnRoles: RoleName*): Unit = {
|
||||
val joiningClusters = systems.map(Cluster(_)).toSet
|
||||
join(joiningClusters, from, to, totalNodes, runOnRoles: _*)
|
||||
}
|
||||
|
||||
def join(joiningClusterNodes: Set[Cluster], from: RoleName, to: RoleName, totalNodes: Int, runOnRoles: RoleName*): Unit = {
|
||||
runOnRoles must contain(from)
|
||||
runOnRoles must contain(to)
|
||||
|
||||
runOn(runOnRoles: _*) {
|
||||
systems.size must be(nodesPerDatacenter) // make sure it is initialized
|
||||
|
||||
val clusterNodes = ifNode(from)(joiningClusterNodes)(systems.map(Cluster(_)).toSet)
|
||||
val startGossipCounts = Map.empty[Cluster, Long] ++
|
||||
clusterNodes.map(c ⇒ (c -> c.receivedGossipCount))
|
||||
def gossipCount(c: Cluster): Long = c.receivedGossipCount - startGossipCounts(c)
|
||||
val startTime = System.nanoTime
|
||||
def tookMillis: String = TimeUnit.NANOSECONDS.toMillis(System.nanoTime - startTime) + " ms"
|
||||
|
||||
val latch = TestLatch(clusterNodes.size)
|
||||
clusterNodes foreach { c ⇒
|
||||
c.registerListener(new MembershipChangeListener {
|
||||
override def notify(members: SortedSet[Member]): Unit = {
|
||||
if (!latch.isOpen && members.size == totalNodes && members.forall(_.status == MemberStatus.Up)) {
|
||||
log.debug("All [{}] nodes Up in [{}], it took [{}], received [{}] gossip messages",
|
||||
totalNodes, c.selfAddress, tookMillis, gossipCount(c))
|
||||
latch.countDown()
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
runOn(from) {
|
||||
clusterNodes foreach { _ join to }
|
||||
}
|
||||
|
||||
Await.ready(latch, remaining)
|
||||
|
||||
awaitCond(clusterNodes.forall(_.convergence.isDefined))
|
||||
val counts = clusterNodes.map(gossipCount(_))
|
||||
val formattedStats = "mean=%s min=%s max=%s".format(counts.sum / clusterNodes.size, counts.min, counts.max)
|
||||
log.info("Convergence of [{}] nodes reached, it took [{}], received [{}] gossip messages per node",
|
||||
totalNodes, tookMillis, formattedStats)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
"A large cluster" must {
|
||||
|
||||
"join all nodes in first-datacenter to first-datacenter" taggedAs LongRunningTest in {
|
||||
runOn(firstDatacenter) {
|
||||
startupSystems()
|
||||
startClusterNode()
|
||||
}
|
||||
enterBarrier("first-datacenter-started")
|
||||
|
||||
val totalNodes = nodesPerDatacenter
|
||||
within(expectedMaxDuration(totalNodes)) {
|
||||
joinAll(from = firstDatacenter, to = firstDatacenter, totalNodes, runOnRoles = firstDatacenter)
|
||||
enterBarrier("first-datacenter-joined")
|
||||
}
|
||||
}
|
||||
|
||||
"join all nodes in second-datacenter to first-datacenter" taggedAs LongRunningTest in {
|
||||
runOn(secondDatacenter) {
|
||||
startupSystems()
|
||||
}
|
||||
enterBarrier("second-datacenter-started")
|
||||
|
||||
val totalNodes = nodesPerDatacenter * 2
|
||||
within(expectedMaxDuration(totalNodes)) {
|
||||
joinAll(from = secondDatacenter, to = firstDatacenter, totalNodes, runOnRoles = firstDatacenter, secondDatacenter)
|
||||
enterBarrier("second-datacenter-joined")
|
||||
}
|
||||
}
|
||||
|
||||
"join all nodes in third-datacenter to first-datacenter" taggedAs LongRunningTest in {
|
||||
runOn(thirdDatacenter) {
|
||||
startupSystems()
|
||||
}
|
||||
enterBarrier("third-datacenter-started")
|
||||
|
||||
val totalNodes = nodesPerDatacenter * 3
|
||||
within(expectedMaxDuration(totalNodes)) {
|
||||
joinAll(from = thirdDatacenter, to = firstDatacenter, totalNodes,
|
||||
runOnRoles = firstDatacenter, secondDatacenter, thirdDatacenter)
|
||||
enterBarrier("third-datacenter-joined")
|
||||
}
|
||||
}
|
||||
|
||||
"join all nodes in fourth-datacenter to first-datacenter" taggedAs LongRunningTest in {
|
||||
runOn(fourthDatacenter) {
|
||||
startupSystems()
|
||||
}
|
||||
enterBarrier("fourth-datacenter-started")
|
||||
|
||||
val totalNodes = nodesPerDatacenter * 4
|
||||
within(expectedMaxDuration(totalNodes)) {
|
||||
joinAll(from = fourthDatacenter, to = firstDatacenter, totalNodes,
|
||||
runOnRoles = firstDatacenter, secondDatacenter, thirdDatacenter, fourthDatacenter)
|
||||
enterBarrier("fourth-datacenter-joined")
|
||||
}
|
||||
}
|
||||
|
||||
"join nodes one by one from fifth-datacenter to first-datacenter" taggedAs LongRunningTest in {
|
||||
runOn(fifthDatacenter) {
|
||||
startupSystems()
|
||||
}
|
||||
enterBarrier("fifth-datacenter-started")
|
||||
|
||||
for (i ← 0 until nodesPerDatacenter) {
|
||||
val totalNodes = nodesPerDatacenter * 4 + i + 1
|
||||
within(expectedMaxDuration(totalNodes)) {
|
||||
val joiningClusters = ifNode(fifthDatacenter)(Set(Cluster(systems(i))))(Set.empty)
|
||||
join(joiningClusters, from = fifthDatacenter, to = firstDatacenter, totalNodes,
|
||||
runOnRoles = firstDatacenter, secondDatacenter, thirdDatacenter, fourthDatacenter, fifthDatacenter)
|
||||
enterBarrier("fifth-datacenter-joined-" + i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// FIXME sometimes this fails, FD marks nodes from other than second-datacenter as unavailable
|
||||
"detect failure and auto-down crashed nodes in second-datacenter" taggedAs LongRunningTest ignore {
|
||||
val unreachableNodes = nodesPerDatacenter
|
||||
val liveNodes = nodesPerDatacenter * 4
|
||||
|
||||
within(20.seconds + expectedMaxDuration(liveNodes)) {
|
||||
val startGossipCounts = Map.empty[Cluster, Long] ++
|
||||
systems.map(sys ⇒ (Cluster(sys) -> Cluster(sys).receivedGossipCount))
|
||||
def gossipCount(c: Cluster): Long = c.receivedGossipCount - startGossipCounts(c)
|
||||
val startTime = System.nanoTime
|
||||
def tookMillis: String = TimeUnit.NANOSECONDS.toMillis(System.nanoTime - startTime) + " ms"
|
||||
|
||||
val latch = TestLatch(nodesPerDatacenter)
|
||||
systems foreach { sys ⇒
|
||||
Cluster(sys).registerListener(new MembershipChangeListener {
|
||||
override def notify(members: SortedSet[Member]): Unit = {
|
||||
if (!latch.isOpen && members.size == liveNodes && Cluster(sys).latestGossip.overview.unreachable.size == unreachableNodes) {
|
||||
log.info("Detected [{}] unreachable nodes in [{}], it took [{}], received [{}] gossip messages",
|
||||
unreachableNodes, Cluster(sys).selfAddress, tookMillis, gossipCount(Cluster(sys)))
|
||||
latch.countDown()
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
runOn(firstDatacenter) {
|
||||
testConductor.shutdown(secondDatacenter, 0)
|
||||
}
|
||||
|
||||
enterBarrier("second-datacenter-shutdown")
|
||||
|
||||
runOn(firstDatacenter, thirdDatacenter, fourthDatacenter, fifthDatacenter) {
|
||||
Await.ready(latch, remaining)
|
||||
awaitCond(systems.forall(Cluster(_).convergence.isDefined))
|
||||
val counts = systems.map(sys ⇒ gossipCount(Cluster(sys)))
|
||||
val formattedStats = "mean=%s min=%s max=%s".format(counts.sum / nodesPerDatacenter, counts.min, counts.max)
|
||||
log.info("Convergence of [{}] nodes reached after failure, it took [{}], received [{}] gossip messages per node",
|
||||
liveNodes, tookMillis, formattedStats)
|
||||
}
|
||||
|
||||
enterBarrier("after-6")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue