diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index 357d610ed5..eb1c2c08fb 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -28,6 +28,7 @@ import MemberStatus._
 import scala.annotation.tailrec
 import scala.collection.immutable.{ Map, SortedSet }
 import scala.collection.GenTraversableOnce
+import java.util.concurrent.atomic.AtomicLong
 
 /**
  * Interface for membership change listener.
@@ -948,6 +949,13 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
     }
   }
 
+  // Can be removed when gossip has been optimized
+  private val _receivedGossipCount = new AtomicLong
+  /**
+   * INTERNAL API.
+   */
+  private[cluster] def receivedGossipCount: Long = _receivedGossipCount.get
+
   /**
    * INTERNAL API.
    *
@@ -995,6 +1003,7 @@ class Cluster(system: ExtendedActorSystem, val failureDetector: FailureDetector)
       if (!state.compareAndSet(localState, newState)) receiveGossip(from, remoteGossip) // recur if we fail the update
       else {
         log.debug("Cluster Node [{}] - Receiving gossip from [{}]", selfAddress, from)
+        _receivedGossipCount.incrementAndGet()
         notifyMembershipChangeListeners(localState, newState)
       }
     }
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala
new file mode 100644
index 0000000000..a12fc90ff9
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LargeClusterSpec.scala
@@ -0,0 +1,267 @@
+/**
+ * Copyright (C) 2009-2012 Typesafe Inc.
+ */
+package akka.cluster
+
+import com.typesafe.config.ConfigFactory
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.testkit._
+import akka.util.duration._
+import akka.actor.ActorSystem
+import akka.util.Deadline
+import java.util.concurrent.TimeoutException
+import scala.collection.immutable.SortedSet
+import akka.dispatch.Await
+import akka.util.Duration
+import java.util.concurrent.TimeUnit
+import akka.remote.testconductor.RoleName
+
+object LargeClusterMultiJvmSpec extends MultiNodeConfig {
+  // each jvm simulates a datacenter with many nodes
+  val firstDatacenter = role("first-datacenter")
+  val secondDatacenter = role("second-datacenter")
+  val thirdDatacenter = role("third-datacenter")
+  val fourthDatacenter = role("fourth-datacenter")
+  val fifthDatacenter = role("fifth-datacenter")
+
+  // Note that this test uses default configuration,
+  // not MultiNodeClusterSpec.clusterConfig
+  commonConfig(ConfigFactory.parseString("""
+    # Number of ActorSystems in each jvm, can be specified as
+    # a system property when running real tests. Many nodes
+    # will take a long time.
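+    # (e.g. as -Dakka.test.large-cluster-spec.nodes-per-datacenter=<N> on the forked test JVMs)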
+    akka.test.large-cluster-spec.nodes-per-datacenter = 2
+    akka.cluster {
+      gossip-interval = 500 ms
+      auto-join = off
+      failure-detector.threshold = 4
+    }
+    akka.loglevel = INFO
+    akka.actor.default-dispatcher.fork-join-executor.parallelism-max = 2
+    akka.scheduler.tick-duration = 33 ms
+    akka.remote.netty.execution-pool-size = 1
+    """))
+}
+
+class LargeClusterMultiJvmNode1 extends LargeClusterSpec with AccrualFailureDetectorStrategy
+class LargeClusterMultiJvmNode2 extends LargeClusterSpec with AccrualFailureDetectorStrategy
+class LargeClusterMultiJvmNode3 extends LargeClusterSpec with AccrualFailureDetectorStrategy
+class LargeClusterMultiJvmNode4 extends LargeClusterSpec with AccrualFailureDetectorStrategy
+class LargeClusterMultiJvmNode5 extends LargeClusterSpec with AccrualFailureDetectorStrategy
+
+abstract class LargeClusterSpec
+  extends MultiNodeSpec(LargeClusterMultiJvmSpec)
+  with MultiNodeClusterSpec {
+
+  import LargeClusterMultiJvmSpec._
+
+  var systems: IndexedSeq[ActorSystem] = IndexedSeq(system)
+  val nodesPerDatacenter = system.settings.config.getInt(
+    "akka.test.large-cluster-spec.nodes-per-datacenter")
+
+  /**
+   * Since we start some ActorSystems/Clusters outside of the
+   * MultiNodeClusterSpec control we can't use the mechanism
+   * defined in MultiNodeClusterSpec to inject the failure detector etc.
+   * Use the ordinary Cluster extension with the default AccrualFailureDetector.
+   */
+  override def cluster: Cluster = Cluster(system)
+
+  override def atTermination(): Unit = {
+    systems foreach { _.shutdown }
+    val shutdownTimeout = 20.seconds
+    val deadline = Deadline.now + shutdownTimeout
+    systems.foreach { sys ⇒
+      if (sys.isTerminated)
+        () // already done
+      else if (deadline.isOverdue)
+        sys.log.warning("Failed to shutdown [{}] within [{}]", sys.name, shutdownTimeout)
+      else {
+        try sys.awaitTermination(deadline.timeLeft) catch {
+          case _: TimeoutException ⇒ sys.log.warning("Failed to shutdown [{}] within [{}]", sys.name, shutdownTimeout)
+        }
+      }
+    }
+  }
+
+  def startupSystems(): Unit = {
+    // one system is already started by the multi-node test
+    for (n ← 2 to nodesPerDatacenter)
+      systems :+= ActorSystem(myself.name + "-" + n, system.settings.config)
+
+    // Initialize the Cluster extensions, i.e. start up the clusters
+    systems foreach { Cluster(_) }
+  }
+
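+  // Generous upper bound used with within(): a fixed base plus a per-node allowance.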
+  def expectedMaxDuration(totalNodes: Int): Duration =
+    5.seconds + (2.seconds * totalNodes)
+
+  def joinAll(from: RoleName, to: RoleName, totalNodes: Int, runOnRoles: RoleName*): Unit = {
+    val joiningClusters = systems.map(Cluster(_)).toSet
+    join(joiningClusters, from, to, totalNodes, runOnRoles: _*)
+  }
+
+  def join(joiningClusterNodes: Set[Cluster], from: RoleName, to: RoleName, totalNodes: Int, runOnRoles: RoleName*): Unit = {
+    runOnRoles must contain(from)
+    runOnRoles must contain(to)
+
+    runOn(runOnRoles: _*) {
+      systems.size must be(nodesPerDatacenter) // make sure it is initialized
+
+      val clusterNodes = ifNode(from)(joiningClusterNodes)(systems.map(Cluster(_)).toSet)
+      val startGossipCounts = Map.empty[Cluster, Long] ++
+        clusterNodes.map(c ⇒ (c -> c.receivedGossipCount))
+      def gossipCount(c: Cluster): Long = c.receivedGossipCount - startGossipCounts(c)
+      val startTime = System.nanoTime
+      def tookMillis: String = TimeUnit.NANOSECONDS.toMillis(System.nanoTime - startTime) + " ms"
+
+      val latch = TestLatch(clusterNodes.size)
+      clusterNodes foreach { c ⇒
+        c.registerListener(new MembershipChangeListener {
+          override def notify(members: SortedSet[Member]): Unit = {
+            if (!latch.isOpen && members.size == totalNodes && members.forall(_.status == MemberStatus.Up)) {
+              log.debug("All [{}] nodes Up in [{}], it took [{}], received [{}] gossip messages",
+                totalNodes, c.selfAddress, tookMillis, gossipCount(c))
+              latch.countDown()
+            }
+          }
+        })
+      }
+
+      runOn(from) {
+        clusterNodes foreach { _ join to }
+      }
+
+      Await.ready(latch, remaining)
+
+      awaitCond(clusterNodes.forall(_.convergence.isDefined))
+      val counts = clusterNodes.map(gossipCount(_))
+      val formattedStats = "mean=%s min=%s max=%s".format(counts.sum / clusterNodes.size, counts.min, counts.max)
+      log.info("Convergence of [{}] nodes reached, it took [{}], received [{}] gossip messages per node",
+        totalNodes, tookMillis, formattedStats)
+
+    }
+  }
+
+  "A large cluster" must {
+
+    "join all nodes in first-datacenter to first-datacenter" taggedAs LongRunningTest in {
+      runOn(firstDatacenter) {
+        startupSystems()
+        startClusterNode()
+      }
+      enterBarrier("first-datacenter-started")
+
+      val totalNodes = nodesPerDatacenter
+      within(expectedMaxDuration(totalNodes)) {
+        joinAll(from = firstDatacenter, to = firstDatacenter, totalNodes, runOnRoles = firstDatacenter)
+        enterBarrier("first-datacenter-joined")
+      }
+    }
+
+    "join all nodes in second-datacenter to first-datacenter" taggedAs LongRunningTest in {
+      runOn(secondDatacenter) {
+        startupSystems()
+      }
+      enterBarrier("second-datacenter-started")
+
+      val totalNodes = nodesPerDatacenter * 2
+      within(expectedMaxDuration(totalNodes)) {
+        joinAll(from = secondDatacenter, to = firstDatacenter, totalNodes, runOnRoles = firstDatacenter, secondDatacenter)
+        enterBarrier("second-datacenter-joined")
+      }
+    }
+
+    "join all nodes in third-datacenter to first-datacenter" taggedAs LongRunningTest in {
+      runOn(thirdDatacenter) {
+        startupSystems()
+      }
+      enterBarrier("third-datacenter-started")
+
+      val totalNodes = nodesPerDatacenter * 3
+      within(expectedMaxDuration(totalNodes)) {
+        joinAll(from = thirdDatacenter, to = firstDatacenter, totalNodes,
+          runOnRoles = firstDatacenter, secondDatacenter, thirdDatacenter)
+        enterBarrier("third-datacenter-joined")
+      }
+    }
+
+    "join all nodes in fourth-datacenter to first-datacenter" taggedAs LongRunningTest in {
+      runOn(fourthDatacenter) {
+        startupSystems()
+      }
+      enterBarrier("fourth-datacenter-started")
+
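+      // The first four datacenters are now running, with nodesPerDatacenter ActorSystems each.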
+      val totalNodes = nodesPerDatacenter * 4
+      within(expectedMaxDuration(totalNodes)) {
+        joinAll(from = fourthDatacenter, to = firstDatacenter, totalNodes,
+          runOnRoles = firstDatacenter, secondDatacenter, thirdDatacenter, fourthDatacenter)
+        enterBarrier("fourth-datacenter-joined")
+      }
+    }
+
+    "join nodes one by one from fifth-datacenter to first-datacenter" taggedAs LongRunningTest in {
+      runOn(fifthDatacenter) {
+        startupSystems()
+      }
+      enterBarrier("fifth-datacenter-started")
+
+      for (i ← 0 until nodesPerDatacenter) {
+        val totalNodes = nodesPerDatacenter * 4 + i + 1
+        within(expectedMaxDuration(totalNodes)) {
+          val joiningClusters = ifNode(fifthDatacenter)(Set(Cluster(systems(i))))(Set.empty)
+          join(joiningClusters, from = fifthDatacenter, to = firstDatacenter, totalNodes,
+            runOnRoles = firstDatacenter, secondDatacenter, thirdDatacenter, fourthDatacenter, fifthDatacenter)
+          enterBarrier("fifth-datacenter-joined-" + i)
+        }
+      }
+    }
+
+    // FIXME sometimes this fails: the failure detector marks nodes other than those in second-datacenter as unavailable
+    "detect failure and auto-down crashed nodes in second-datacenter" taggedAs LongRunningTest ignore {
+      val unreachableNodes = nodesPerDatacenter
+      val liveNodes = nodesPerDatacenter * 4
+
+      within(20.seconds + expectedMaxDuration(liveNodes)) {
+        val startGossipCounts = Map.empty[Cluster, Long] ++
+          systems.map(sys ⇒ (Cluster(sys) -> Cluster(sys).receivedGossipCount))
+        def gossipCount(c: Cluster): Long = c.receivedGossipCount - startGossipCounts(c)
+        val startTime = System.nanoTime
+        def tookMillis: String = TimeUnit.NANOSECONDS.toMillis(System.nanoTime - startTime) + " ms"
+
+        val latch = TestLatch(nodesPerDatacenter)
+        systems foreach { sys ⇒
+          Cluster(sys).registerListener(new MembershipChangeListener {
+            override def notify(members: SortedSet[Member]): Unit = {
+              if (!latch.isOpen && members.size == liveNodes && Cluster(sys).latestGossip.overview.unreachable.size == unreachableNodes) {
+                log.info("Detected [{}] unreachable nodes in [{}], it took [{}], received [{}] gossip messages",
+                  unreachableNodes, Cluster(sys).selfAddress, tookMillis, gossipCount(Cluster(sys)))
+                latch.countDown()
+              }
+            }
+          })
+        }
+
+        runOn(firstDatacenter) {
+          testConductor.shutdown(secondDatacenter, 0)
+        }
+
+        enterBarrier("second-datacenter-shutdown")
+
+        runOn(firstDatacenter, thirdDatacenter, fourthDatacenter, fifthDatacenter) {
+          Await.ready(latch, remaining)
+          awaitCond(systems.forall(Cluster(_).convergence.isDefined))
+          val counts = systems.map(sys ⇒ gossipCount(Cluster(sys)))
+          val formattedStats = "mean=%s min=%s max=%s".format(counts.sum / nodesPerDatacenter, counts.min, counts.max)
+          log.info("Convergence of [{}] nodes reached after failure, it took [{}], received [{}] gossip messages per node",
+            liveNodes, tookMillis, formattedStats)
+        }
+
+        enterBarrier("after-6")
+      }
+
+    }
+
+  }
+}
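Side note, not part of the patch: the join and failure-detection tests above repeat the same gossip bookkeeping inline: snapshot receivedGossipCount per node, then log the per-node delta and the elapsed time once the MembershipChangeListener latch opens. The sketch below factors that pattern into a small helper; the GossipStats name is hypothetical, and it is placed in package akka.cluster only because receivedGossipCount is private[cluster].

package akka.cluster

import java.util.concurrent.TimeUnit

// Hypothetical helper (a sketch, not part of the patch) that captures the
// bookkeeping the spec performs inline in both tests.
class GossipStats(nodes: Set[Cluster]) {
  // snapshot of per-node gossip counters and the start time, taken at construction
  private val startCounts: Map[Cluster, Long] =
    Map.empty[Cluster, Long] ++ nodes.map(c ⇒ (c, c.receivedGossipCount))
  private val startTime = System.nanoTime

  // gossip messages received by this node since the snapshot
  def gossipCount(c: Cluster): Long = c.receivedGossipCount - startCounts(c)

  // elapsed time since the snapshot, formatted like the spec's log output
  def tookMillis: String = TimeUnit.NANOSECONDS.toMillis(System.nanoTime - startTime) + " ms"

  // summary over all nodes, e.g. "mean=12 min=8 max=17"
  def formattedStats: String = {
    val counts = nodes.toSeq.map(gossipCount)
    "mean=%s min=%s max=%s".format(counts.sum / nodes.size, counts.min, counts.max)
  }
}

With such a helper, a new GossipStats(clusterNodes) created at the start of join could supply tookMillis and formattedStats for the log lines instead of the locally defined counters in each test.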