/** * Copyright (C) 2009-2012 Typesafe Inc. */ package akka.cluster import com.typesafe.config.ConfigFactory import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.testkit._ import akka.util.duration._ import akka.actor.ActorSystem import akka.util.Deadline import java.util.concurrent.TimeoutException import scala.collection.immutable.SortedSet import akka.dispatch.Await import akka.util.Duration import java.util.concurrent.TimeUnit import akka.remote.testconductor.RoleName object LargeClusterMultiJvmSpec extends MultiNodeConfig { // each jvm simulates a datacenter with many nodes val firstDatacenter = role("first-datacenter") val secondDatacenter = role("second-datacenter") val thirdDatacenter = role("third-datacenter") val fourthDatacenter = role("fourth-datacenter") val fifthDatacenter = role("fifth-datacenter") // Note that this test uses default configuration, // not MultiNodeClusterSpec.clusterConfig commonConfig(ConfigFactory.parseString(""" # Number of ActorSystems in each jvm, can be specified as # system property when running real tests. Many nodes # will take long time and consume many threads. # 10 => 50 nodes is possible to run on one machine. akka.test.large-cluster-spec.nodes-per-datacenter = 2 akka.cluster { gossip-interval = 500 ms auto-join = off failure-detector.threshold = 4 } akka.loglevel = INFO akka.actor.default-dispatcher.fork-join-executor.parallelism-max = 2 akka.scheduler.tick-duration = 33 ms akka.remote.netty.execution-pool-size = 0 # don't use testconductor transport in this test, especially not # when using use-dispatcher-for-io akka.remote.transport = "akka.remote.netty.NettyRemoteTransport" # Using a separate dispatcher for netty io doesn't reduce number # of needed threads # akka.remote.netty.use-dispatcher-for-io=akka.test.io-dispatcher # akka.test.io-dispatcher.fork-join-executor { # parallelism-min = 100 # parallelism-max = 100 # } """)) } class LargeClusterMultiJvmNode1 extends LargeClusterSpec with AccrualFailureDetectorStrategy class LargeClusterMultiJvmNode2 extends LargeClusterSpec with AccrualFailureDetectorStrategy class LargeClusterMultiJvmNode3 extends LargeClusterSpec with AccrualFailureDetectorStrategy class LargeClusterMultiJvmNode4 extends LargeClusterSpec with AccrualFailureDetectorStrategy class LargeClusterMultiJvmNode5 extends LargeClusterSpec with AccrualFailureDetectorStrategy abstract class LargeClusterSpec extends MultiNodeSpec(LargeClusterMultiJvmSpec) with MultiNodeClusterSpec { import LargeClusterMultiJvmSpec._ var systems: IndexedSeq[ActorSystem] = IndexedSeq(system) val nodesPerDatacenter = system.settings.config.getInt( "akka.test.large-cluster-spec.nodes-per-datacenter") /** * Since we start some ActorSystems/Clusters outside of the * MultiNodeClusterSpec control we can't use use the mechanism * defined in MultiNodeClusterSpec to inject failure detector etc. * Use ordinary Cluster extension with default AccrualFailureDetector. */ override def cluster: Cluster = Cluster(system) override def atTermination(): Unit = { systems foreach { _.shutdown } val shutdownTimeout = 20.seconds val deadline = Deadline.now + shutdownTimeout systems.foreach { sys ⇒ if (sys.isTerminated) () // already done else if (deadline.isOverdue) sys.log.warning("Failed to shutdown [{}] within [{}]", sys.name, shutdownTimeout) else { try sys.awaitTermination(deadline.timeLeft) catch { case _: TimeoutException ⇒ sys.log.warning("Failed to shutdown [{}] within [{}]", sys.name, shutdownTimeout) } } } } def startupSystems(): Unit = { // one system is already started by the multi-node test for (n ← 2 to nodesPerDatacenter) systems :+= ActorSystem(myself.name + "-" + n, system.settings.config) // Initialize the Cluster extensions, i.e. startup the clusters systems foreach { Cluster(_) } } def expectedMaxDuration(totalNodes: Int): Duration = 5.seconds + (2.seconds * totalNodes) def joinAll(from: RoleName, to: RoleName, totalNodes: Int, runOnRoles: RoleName*): Unit = { val joiningClusters = systems.map(Cluster(_)).toSet join(joiningClusters, from, to, totalNodes, runOnRoles: _*) } def join(joiningClusterNodes: Set[Cluster], from: RoleName, to: RoleName, totalNodes: Int, runOnRoles: RoleName*): Unit = { runOnRoles must contain(from) runOnRoles must contain(to) runOn(runOnRoles: _*) { systems.size must be(nodesPerDatacenter) // make sure it is initialized val clusterNodes = ifNode(from)(joiningClusterNodes)(systems.map(Cluster(_)).toSet) val startGossipCounts = Map.empty[Cluster, Long] ++ clusterNodes.map(c ⇒ (c -> c.receivedGossipCount)) def gossipCount(c: Cluster): Long = c.receivedGossipCount - startGossipCounts(c) val startTime = System.nanoTime def tookMillis: String = TimeUnit.NANOSECONDS.toMillis(System.nanoTime - startTime) + " ms" val latch = TestLatch(clusterNodes.size) clusterNodes foreach { c ⇒ c.registerListener(new MembershipChangeListener { override def notify(members: SortedSet[Member]): Unit = { if (!latch.isOpen && members.size == totalNodes && members.forall(_.status == MemberStatus.Up)) { log.debug("All [{}] nodes Up in [{}], it took [{}], received [{}] gossip messages", totalNodes, c.selfAddress, tookMillis, gossipCount(c)) latch.countDown() } } }) } runOn(from) { clusterNodes foreach { _ join to } } Await.ready(latch, remaining) awaitCond(clusterNodes.forall(_.convergence.isDefined)) val counts = clusterNodes.map(gossipCount(_)) val formattedStats = "mean=%s min=%s max=%s".format(counts.sum / clusterNodes.size, counts.min, counts.max) log.info("Convergence of [{}] nodes reached, it took [{}], received [{}] gossip messages per node", totalNodes, tookMillis, formattedStats) } } "A large cluster" must { "join all nodes in first-datacenter to first-datacenter" taggedAs LongRunningTest in { runOn(firstDatacenter) { startupSystems() startClusterNode() } enterBarrier("first-datacenter-started") val totalNodes = nodesPerDatacenter within(expectedMaxDuration(totalNodes)) { joinAll(from = firstDatacenter, to = firstDatacenter, totalNodes, runOnRoles = firstDatacenter) enterBarrier("first-datacenter-joined") } } "join all nodes in second-datacenter to first-datacenter" taggedAs LongRunningTest in { runOn(secondDatacenter) { startupSystems() } enterBarrier("second-datacenter-started") val totalNodes = nodesPerDatacenter * 2 within(expectedMaxDuration(totalNodes)) { joinAll(from = secondDatacenter, to = firstDatacenter, totalNodes, runOnRoles = firstDatacenter, secondDatacenter) enterBarrier("second-datacenter-joined") } } "join all nodes in third-datacenter to first-datacenter" taggedAs LongRunningTest in { runOn(thirdDatacenter) { startupSystems() } enterBarrier("third-datacenter-started") val totalNodes = nodesPerDatacenter * 3 within(expectedMaxDuration(totalNodes)) { joinAll(from = thirdDatacenter, to = firstDatacenter, totalNodes, runOnRoles = firstDatacenter, secondDatacenter, thirdDatacenter) enterBarrier("third-datacenter-joined") } } "join all nodes in fourth-datacenter to first-datacenter" taggedAs LongRunningTest in { runOn(fourthDatacenter) { startupSystems() } enterBarrier("fourth-datacenter-started") val totalNodes = nodesPerDatacenter * 4 within(expectedMaxDuration(totalNodes)) { joinAll(from = fourthDatacenter, to = firstDatacenter, totalNodes, runOnRoles = firstDatacenter, secondDatacenter, thirdDatacenter, fourthDatacenter) enterBarrier("fourth-datacenter-joined") } } "join nodes one by one from fifth-datacenter to first-datacenter" taggedAs LongRunningTest in { runOn(fifthDatacenter) { startupSystems() } enterBarrier("fifth-datacenter-started") // enough to join a few one-by-one (takes too long time otherwise) val (bulk, oneByOne) = systems.splitAt(systems.size - 3) if (bulk.nonEmpty) { val totalNodes = nodesPerDatacenter * 4 + bulk.size within(expectedMaxDuration(totalNodes)) { val joiningClusters = ifNode(fifthDatacenter)(bulk.map(Cluster(_)).toSet)(Set.empty) join(joiningClusters, from = fifthDatacenter, to = firstDatacenter, totalNodes, runOnRoles = firstDatacenter, secondDatacenter, thirdDatacenter, fourthDatacenter, fifthDatacenter) enterBarrier("fifth-datacenter-joined-" + bulk.size) } } for (i ← 0 until oneByOne.size) { val totalNodes = nodesPerDatacenter * 4 + bulk.size + i + 1 within(expectedMaxDuration(totalNodes)) { val joiningClusters = ifNode(fifthDatacenter)(Set(Cluster(oneByOne(i))))(Set.empty) join(joiningClusters, from = fifthDatacenter, to = firstDatacenter, totalNodes, runOnRoles = firstDatacenter, secondDatacenter, thirdDatacenter, fourthDatacenter, fifthDatacenter) enterBarrier("fifth-datacenter-joined-" + (bulk.size + i)) } } } // FIXME sometimes this fails, FD marks nodes from other than second-datacenter as unavailable "detect failure and auto-down crashed nodes in second-datacenter" taggedAs LongRunningTest ignore { val unreachableNodes = nodesPerDatacenter val liveNodes = nodesPerDatacenter * 4 within(20.seconds + expectedMaxDuration(liveNodes)) { val startGossipCounts = Map.empty[Cluster, Long] ++ systems.map(sys ⇒ (Cluster(sys) -> Cluster(sys).receivedGossipCount)) def gossipCount(c: Cluster): Long = c.receivedGossipCount - startGossipCounts(c) val startTime = System.nanoTime def tookMillis: String = TimeUnit.NANOSECONDS.toMillis(System.nanoTime - startTime) + " ms" val latch = TestLatch(nodesPerDatacenter) systems foreach { sys ⇒ Cluster(sys).registerListener(new MembershipChangeListener { override def notify(members: SortedSet[Member]): Unit = { if (!latch.isOpen && members.size == liveNodes && Cluster(sys).latestGossip.overview.unreachable.size == unreachableNodes) { log.info("Detected [{}] unreachable nodes in [{}], it took [{}], received [{}] gossip messages", unreachableNodes, Cluster(sys).selfAddress, tookMillis, gossipCount(Cluster(sys))) latch.countDown() } } }) } runOn(firstDatacenter) { testConductor.shutdown(secondDatacenter, 0) } enterBarrier("second-datacenter-shutdown") runOn(firstDatacenter, thirdDatacenter, fourthDatacenter, fifthDatacenter) { Await.ready(latch, remaining) awaitCond(systems.forall(Cluster(_).convergence.isDefined)) val counts = systems.map(sys ⇒ gossipCount(Cluster(sys))) val formattedStats = "mean=%s min=%s max=%s".format(counts.sum / nodesPerDatacenter, counts.min, counts.max) log.info("Convergence of [{}] nodes reached after failure, it took [{}], received [{}] gossip messages per node", liveNodes, tookMillis, formattedStats) } enterBarrier("after-6") } } } }