diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index c896e721cc..7d1997649d 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -284,18 +284,24 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto // keep the latestGossip to be sent to new subscribers latestGossip = newGossip // first publish the diffUnreachable between the last two gossips - diffUnreachable(oldGossip, newGossip) foreach { event ⇒ - publish(event) - // notify DeathWatch about unreachable node - publish(AddressTerminated(event.member.address)) - } + diffUnreachable(oldGossip, newGossip) foreach publish // buffer up the MemberEvents waiting for convergence memberEvents ++= diffMemberEvents(oldGossip, newGossip) // if we have convergence then publish the MemberEvents and possibly a LeaderChanged if (newGossip.convergence) { val previousConvergedGossip = latestConvergedGossip latestConvergedGossip = newGossip - memberEvents foreach publish + memberEvents foreach { event ⇒ + event match { + case m @ (MemberDowned(_) | MemberRemoved(_)) ⇒ + // TODO MemberDowned match should probably be covered by MemberRemoved, see ticket #2788 + // but right now we don't change Downed to Removed + publish(event) + // notify DeathWatch about downed node + publish(AddressTerminated(m.member.address)) + case _ ⇒ publish(event) + } + } memberEvents = immutable.Seq.empty diffLeader(previousConvergedGossip, latestConvergedGossip) foreach publish } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index 3ada580bb2..94e4e9b9f9 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -116,6 +116,7 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg case HeartbeatTick ⇒ heartbeat() case s: CurrentClusterState ⇒ reset(s) case UnreachableMember(m) ⇒ removeMember(m) + case MemberDowned(m) ⇒ removeMember(m) case MemberRemoved(m) ⇒ removeMember(m) case e: MemberEvent ⇒ addMember(e.member) case JoinInProgress(a, d) ⇒ addJoinInProgress(a, d) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala index ae023263c8..ec0c38a944 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterJmx.scala @@ -140,7 +140,6 @@ private[akka] class ClusterJmx(cluster: Cluster, log: LoggingAdapter) { * Unregisters the cluster JMX MBean from MBean server. 
*/ def unregisterMBean(): Unit = { - clusterView.close() try { mBeanServer.unregisterMBean(clusterMBeanName) } catch { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala index bf1009b472..a10e941c3f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUnreachableSpec.scala @@ -52,7 +52,7 @@ abstract class ClientDowningNodeThatIsUnreachableSpec(multiNodeConfig: ClientDow cluster.down(thirdAddress) enterBarrier("down-third-node") - awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = List(thirdAddress)) + awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Set(thirdAddress)) clusterView.members.exists(_.address == thirdAddress) must be(false) } @@ -63,7 +63,7 @@ abstract class ClientDowningNodeThatIsUnreachableSpec(multiNodeConfig: ClientDow runOn(second, fourth) { enterBarrier("down-third-node") - awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = List(thirdAddress)) + awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Set(thirdAddress)) } enterBarrier("await-completion") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala index 2a0af15997..a8c8bdb736 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClientDowningNodeThatIsUpSpec.scala @@ -50,7 +50,7 @@ abstract class ClientDowningNodeThatIsUpSpec(multiNodeConfig: ClientDowningNodeT markNodeAsUnavailable(thirdAddress) - awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = List(thirdAddress)) + awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Set(thirdAddress)) clusterView.members.exists(_.address == thirdAddress) must be(false) } @@ -61,7 +61,7 @@ abstract class ClientDowningNodeThatIsUpSpec(multiNodeConfig: ClientDowningNodeT runOn(second, fourth) { enterBarrier("down-third-node") - awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = List(thirdAddress)) + awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Set(thirdAddress)) } enterBarrier("await-completion") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala index 279e32ab66..6a917be56e 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/LeaderDowningNodeThatIsUnreachableSpec.scala @@ -60,7 +60,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDow // --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE --- - awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = List(fourthAddress), 30.seconds) + awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Set(fourthAddress), 30.seconds) } runOn(fourth) { @@ -70,7 +70,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDow runOn(second, third) { enterBarrier("down-fourth-node") - awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = List(fourthAddress), 30.seconds) + 
awaitUpConvergence(numberOfMembers = 3, canNotBePartOfMemberRing = Set(fourthAddress), 30.seconds) } enterBarrier("await-completion-1") @@ -90,7 +90,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDow // --- HERE THE LEADER SHOULD DETECT FAILURE AND AUTO-DOWN THE UNREACHABLE NODE --- - awaitUpConvergence(numberOfMembers = 2, canNotBePartOfMemberRing = List(secondAddress), 30.seconds) + awaitUpConvergence(numberOfMembers = 2, canNotBePartOfMemberRing = Set(secondAddress), 30.seconds) } runOn(second) { @@ -100,7 +100,7 @@ abstract class LeaderDowningNodeThatIsUnreachableSpec(multiNodeConfig: LeaderDow runOn(third) { enterBarrier("down-second-node") - awaitUpConvergence(numberOfMembers = 2, canNotBePartOfMemberRing = List(secondAddress), 30 seconds) + awaitUpConvergence(numberOfMembers = 2, canNotBePartOfMemberRing = Set(secondAddress), 30 seconds) } enterBarrier("await-completion-2") diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala index e6d83f881e..5d55afb00f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MBeanSpec.scala @@ -115,7 +115,7 @@ abstract class MBeanSpec enterBarrier("fourth-down") runOn(first, second, third) { - awaitUpConvergence(3, canNotBePartOfMemberRing = List(fourthAddress)) + awaitUpConvergence(3, canNotBePartOfMemberRing = Set(fourthAddress)) assertMembers(clusterView.members, first, second, third) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index fd2714005b..9d7447e6c9 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -218,17 +218,17 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS */ def awaitUpConvergence( numberOfMembers: Int, - canNotBePartOfMemberRing: immutable.Seq[Address] = Nil, + canNotBePartOfMemberRing: Set[Address] = Set.empty, timeout: FiniteDuration = 20.seconds): Unit = { within(timeout) { + if (!canNotBePartOfMemberRing.isEmpty) // don't run this on an empty set + awaitCond( + canNotBePartOfMemberRing forall (address ⇒ !(clusterView.members exists (_.address == address)))) awaitCond(clusterView.members.size == numberOfMembers) awaitCond(clusterView.members.forall(_.status == MemberStatus.Up)) // clusterView.leader is updated by LeaderChanged, await that to be updated also val expectedLeader = clusterView.members.headOption.map(_.address) awaitCond(clusterView.leader == expectedLeader) - if (!canNotBePartOfMemberRing.isEmpty) // don't run this on an empty set - awaitCond( - canNotBePartOfMemberRing forall (address ⇒ !(clusterView.members exists (_.address == address)))) } } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala index 33ce67ecb5..d1e7098f9b 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SingletonClusterSpec.scala @@ -66,7 +66,7 @@ abstract class SingletonClusterSpec(multiNodeConfig: SingletonClusterMultiNodeCo markNodeAsUnavailable(secondAddress) - awaitUpConvergence(numberOfMembers = 1, canNotBePartOfMemberRing = List(secondAddress), 30.seconds) + 
awaitUpConvergence(numberOfMembers = 1, canNotBePartOfMemberRing = Set(secondAddress), 30.seconds) clusterView.isSingletonCluster must be(true) awaitCond(clusterView.isLeader) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala index 95b362e6b3..0633969319 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SplitBrainSpec.scala @@ -102,7 +102,7 @@ abstract class SplitBrainSpec(multiNodeConfig: SplitBrainMultiNodeConfig) // auto-down = on awaitCond(clusterView.unreachableMembers.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds) clusterView.unreachableMembers.map(_.address) must be(side2.toSet map address) - awaitUpConvergence(side1.size, side2 map address) + awaitUpConvergence(side1.size, side2.toSet map address) assertLeader(side1: _*) } @@ -110,7 +110,7 @@ abstract class SplitBrainSpec(multiNodeConfig: SplitBrainMultiNodeConfig) // auto-down = on awaitCond(clusterView.unreachableMembers.forall(m ⇒ m.status == MemberStatus.Down), 15 seconds) clusterView.unreachableMembers.map(_.address) must be(side1.toSet map address) - awaitUpConvergence(side2.size, side1 map address) + awaitUpConvergence(side2.size, side1.toSet map address) assertLeader(side2: _*) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala new file mode 100644 index 0000000000..707e96610f --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -0,0 +1,1082 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import language.postfixOps +import scala.annotation.tailrec +import scala.collection.immutable +import scala.concurrent.duration._ +import scala.concurrent.forkjoin.ThreadLocalRandom +import org.scalatest.BeforeAndAfterEach +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.Address +import akka.actor.Deploy +import akka.actor.OneForOneStrategy +import akka.actor.Props +import akka.actor.RootActorPath +import akka.actor.SupervisorStrategy._ +import akka.actor.Terminated +import akka.cluster.ClusterEvent.ClusterMetricsChanged +import akka.cluster.ClusterEvent.CurrentClusterState +import akka.cluster.ClusterEvent.MemberEvent +import akka.cluster.StandardMetrics.Cpu +import akka.cluster.StandardMetrics.HeapMemory +import akka.remote.RemoteScope +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.routing.FromConfig +import akka.testkit._ +import akka.testkit.TestEvent._ + +/** + * This test is intended to be used as long running stress test + * of cluster related features. Number of nodes and duration of + * the test steps can be configured. The test scenario is organized as + * follows: + * 1. join nodes in various ways up to the configured total number of nodes + * 2 while nodes are joining a few cluster aware routers are also working + * 3. exercise concurrent joining and shutdown of nodes repeatedly + * 4. exercise cluster aware routers, including high throughput + * 5. exercise many actors in a tree structure + * 6. exercise remote supervision + * 7. leave and shutdown nodes in various ways + * 8. while nodes are removed remote death watch is also exercised + * 9. 
while nodes are removed a few cluster aware routers are also working + */ +object StressMultiJvmSpec extends MultiNodeConfig { + + // Note that this test uses default configuration, + // not MultiNodeClusterSpec.clusterConfig + commonConfig(ConfigFactory.parseString(""" + akka.test.cluster-stress-spec { + # scale the nr-of-nodes* settings with this factor + nr-of-nodes-factor = 1 + nr-of-nodes = 13 + # not scaled + nr-of-seed-nodes = 3 + nr-of-nodes-joining-to-seed-initally = 2 + nr-of-nodes-joining-one-by-one-small = 2 + nr-of-nodes-joining-one-by-one-large = 2 + nr-of-nodes-joining-to-one = 2 + nr-of-nodes-joining-to-seed = 2 + nr-of-nodes-leaving-one-by-one-small = 1 + nr-of-nodes-leaving-one-by-one-large = 2 + nr-of-nodes-leaving = 2 + nr-of-nodes-shutdown-one-by-one-small = 1 + nr-of-nodes-shutdown-one-by-one-large = 2 + nr-of-nodes-shutdown = 2 + nr-of-nodes-join-remove = 2 + # not scaled + join-remove-duration = 90s + work-batch-size = 100 + work-batch-interval = 2s + payload-size = 1000 + # scale the *-duration settings with this factor + duration-factor = 1 + normal-throughput-duration = 30s + high-throughput-duration = 10s + supervision-duration = 10s + # actors are created in a tree structure defined + # by tree-width (number of children for each actor) and + # tree-levels, total number of actors can be calculated by + # (width * math.pow(width, levels) - 1) / (width - 1) + tree-width = 5 + tree-levels = 4 + report-metrics-interval = 10s + # scale convergence within timeouts with this factor + convergence-within-factor = 1.0 + } + + akka.actor.provider = akka.cluster.ClusterActorRefProvider + akka.cluster { + auto-join = off + auto-down = on + publish-stats-interval = 0 s # always, when it happens + } + akka.event-handlers = ["akka.testkit.TestEventListener"] + akka.loglevel = INFO + akka.remote.log-remote-lifecycle-events = off + + akka.actor.deployment { + /master-node-1/workers { + router = round-robin + nr-of-instances = 100 + cluster { + enabled = on + max-nr-of-instances-per-node = 1 + allow-local-routees = off + } + } + /master-node-2/workers { + router = round-robin + nr-of-instances = 100 + cluster { + enabled = on + routees-path = "/user/worker" + allow-local-routees = off + } + } + /master-node-3/workers = { + router = adaptive + nr-of-instances = 100 + cluster { + enabled = on + max-nr-of-instances-per-node = 1 + allow-local-routees = off + } + } + } + """)) + + class Settings(conf: Config) { + private val testConfig = conf.getConfig("akka.test.cluster-stress-spec") + import testConfig._ + + private def getDuration(name: String): FiniteDuration = Duration(getMilliseconds(name), MILLISECONDS) + + val nFactor = getInt("nr-of-nodes-factor") + val totalNumberOfNodes = getInt("nr-of-nodes") * nFactor ensuring ( + _ >= 10, "nr-of-nodes must be >= 10") + val numberOfSeedNodes = getInt("nr-of-seed-nodes") // not scaled by nodes factor + val numberOfNodesJoiningToSeedNodesInitially = getInt("nr-of-nodes-joining-to-seed-initally") * nFactor + val numberOfNodesJoiningOneByOneSmall = getInt("nr-of-nodes-joining-one-by-one-small") * nFactor + val numberOfNodesJoiningOneByOneLarge = getInt("nr-of-nodes-joining-one-by-one-large") * nFactor + val numberOfNodesJoiningToOneNode = getInt("nr-of-nodes-joining-to-one") * nFactor + val numberOfNodesJoiningToSeedNodes = getInt("nr-of-nodes-joining-to-seed") * nFactor + val numberOfNodesLeavingOneByOneSmall = getInt("nr-of-nodes-leaving-one-by-one-small") * nFactor + val numberOfNodesLeavingOneByOneLarge = 
getInt("nr-of-nodes-leaving-one-by-one-large") * nFactor + val numberOfNodesLeaving = getInt("nr-of-nodes-leaving") * nFactor + val numberOfNodesShutdownOneByOneSmall = getInt("nr-of-nodes-shutdown-one-by-one-small") * nFactor + val numberOfNodesShutdownOneByOneLarge = getInt("nr-of-nodes-shutdown-one-by-one-large") * nFactor + val numberOfNodesShutdown = getInt("nr-of-nodes-shutdown") * nFactor + val numberOfNodesJoinRemove = getInt("nr-of-nodes-join-remove") // not scaled by nodes factor + + val workBatchSize = getInt("work-batch-size") + val workBatchInterval = Duration(getMilliseconds("work-batch-interval"), MILLISECONDS) + val payloadSize = getInt("payload-size") + val dFactor = getInt("duration-factor") + val joinRemoveDuration = getDuration("join-remove-duration") * dFactor + val normalThroughputDuration = getDuration("normal-throughput-duration") * dFactor + val highThroughputDuration = getDuration("high-throughput-duration") * dFactor + val supervisionDuration = getDuration("supervision-duration") * dFactor + val treeWidth = getInt("tree-width") + val treeLevels = getInt("tree-levels") + val reportMetricsInterval = getDuration("report-metrics-interval") + val convergenceWithinFactor = getDouble("convergence-within-factor") + + require(numberOfSeedNodes + numberOfNodesJoiningToSeedNodesInitially + numberOfNodesJoiningOneByOneSmall + + numberOfNodesJoiningOneByOneLarge + numberOfNodesJoiningToOneNode + numberOfNodesJoiningToSeedNodes <= totalNumberOfNodes, + s"specified number of joining nodes <= ${totalNumberOfNodes}") + + // don't shutdown the 3 nodes hosting the master actors + require(numberOfNodesLeavingOneByOneSmall + numberOfNodesLeavingOneByOneLarge + numberOfNodesLeaving + + numberOfNodesShutdownOneByOneSmall + numberOfNodesShutdownOneByOneLarge + numberOfNodesShutdown <= totalNumberOfNodes - 3, + s"specified number of leaving/shutdown nodes <= ${totalNumberOfNodes - 3}") + + require(numberOfNodesJoinRemove <= totalNumberOfNodes, s"nr-of-nodes-join-remove must be <= ${totalNumberOfNodes}") + } + + // FIXME configurable number of nodes + for (n ← 1 to 13) role("node-" + n) + + implicit class FormattedDouble(val d: Double) extends AnyVal { + def form: String = d.formatted("%.2f") + } + + case class ClusterResult( + address: Address, + duration: Duration, + clusterStats: ClusterStats) + + case class AggregatedClusterResult(title: String, duration: Duration, clusterStats: ClusterStats) + + /** + * Central aggregator of cluster statistics and metrics. + * Reports the result via log periodically and when all + * expected results has been collected. It shuts down + * itself when expected results has been collected. 
+ */ + class ClusterResultAggregator(title: String, expectedResults: Int, reportMetricsInterval: FiniteDuration) extends Actor with ActorLogging { + val cluster = Cluster(context.system) + var reportTo: Option[ActorRef] = None + var results = Vector.empty[ClusterResult] + var nodeMetrics = Set.empty[NodeMetrics] + var phiValuesObservedByNode = { + import akka.cluster.Member.addressOrdering + immutable.SortedMap.empty[Address, Set[PhiValue]] + } + + import context.dispatcher + val reportMetricsTask = context.system.scheduler.schedule( + reportMetricsInterval, reportMetricsInterval, self, ReportTick) + + // subscribe to ClusterMetricsChanged, re-subscribe when restart + override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterMetricsChanged]) + override def postStop(): Unit = { + cluster.unsubscribe(self) + reportMetricsTask.cancel() + super.postStop() + } + + def receive = { + case ClusterMetricsChanged(clusterMetrics) ⇒ nodeMetrics = clusterMetrics + case PhiResult(from, phiValues) ⇒ phiValuesObservedByNode += from -> phiValues + case ReportTick ⇒ + log.info(s"[${title}] in progress\n${formatMetrics}\n\n${formatPhi}") + case r: ClusterResult ⇒ + results :+= r + if (results.size == expectedResults) { + val aggregated = AggregatedClusterResult(title, maxDuration, totalClusterStats) + log.info(s"[${title}] completed in [${aggregated.duration.toMillis}] ms\n${aggregated.clusterStats}\n${formatMetrics}\n\n${formatPhi}") + reportTo foreach { _ ! aggregated } + context stop self + } + case _: CurrentClusterState ⇒ + case ReportTo(ref) ⇒ reportTo = ref + } + + def maxDuration = results.map(_.duration).max + + def totalClusterStats = results.map(_.clusterStats).foldLeft(ClusterStats()) { (acc, s) ⇒ + ClusterStats( + receivedGossipCount = acc.receivedGossipCount + s.receivedGossipCount, + mergeConflictCount = acc.mergeConflictCount + s.mergeConflictCount, + mergeCount = acc.mergeCount + s.mergeCount, + mergeDetectedCount = acc.mergeDetectedCount + s.mergeDetectedCount) + } + + def formatMetrics: String = { + import akka.cluster.Member.addressOrdering + (formatMetricsHeader +: (nodeMetrics.toSeq.sortBy(_.address) map formatMetricsLine)).mkString("\n") + } + + def formatMetricsHeader: String = "Node\tHeap (MB)\tCPU (%)\tLoad" + + def formatMetricsLine(nodeMetrics: NodeMetrics): String = { + val heap = nodeMetrics match { + case HeapMemory(address, timestamp, used, committed, max) ⇒ + (used.doubleValue / 1024 / 1024).form + case _ ⇒ "" + } + val cpuAndLoad = nodeMetrics match { + case Cpu(address, timestamp, loadOption, cpuOption, processors) ⇒ + format(cpuOption) + "\t" + format(loadOption) + case _ ⇒ "N/A\tN/A" + } + s"${nodeMetrics.address}\t${heap}\t${cpuAndLoad}" + } + + def format(opt: Option[Double]) = opt match { + case None ⇒ "N/A" + case Some(x) ⇒ x.form + } + + def formatPhi: String = { + if (phiValuesObservedByNode.isEmpty) "" + else { + import akka.cluster.Member.addressOrdering + val lines = + for { + (monitor, phiValues) ← phiValuesObservedByNode.toSeq + phi ← phiValues.toSeq.sortBy(_.address) + } yield formatPhiLine(monitor, phi.address, phi) + + (formatPhiHeader +: lines).mkString("\n") + } + } + + def formatPhiHeader: String = "Monitor\tSubject\tcount\tcount phi > 1.0\tmax phi" + + def formatPhiLine(monitor: Address, subject: Address, phi: PhiValue): String = + s"${monitor}\t${subject}\t${phi.count}\t${phi.countAboveOne}\t${phi.max.form}" + + } + + /** + * Keeps cluster statistics and metrics reported by + * ClusterResultAggregator. 
Logs the list of historical + * results when a new AggregatedClusterResult is received. + */ + class ClusterResultHistory extends Actor with ActorLogging { + var history = Vector.empty[AggregatedClusterResult] + + def receive = { + case result: AggregatedClusterResult ⇒ + history :+= result + log.info("Cluster result history\n" + formatHistory) + } + + def formatHistory: String = + (formatHistoryHeader +: (history map formatHistoryLine)).mkString("\n") + + def formatHistoryHeader: String = "title\tduration (ms)\tgossip count\tmerge count" + + def formatHistoryLine(result: AggregatedClusterResult): String = + s"${result.title}\t${result.duration.toMillis}\t${result.clusterStats.receivedGossipCount}\t${result.clusterStats.mergeCount}" + + } + + /** + * Collect phi values of the failure detector and report to the + * central ClusterResultAggregator. + */ + class PhiObserver extends Actor with ActorLogging { + val cluster = Cluster(context.system) + val fd = cluster.failureDetector.asInstanceOf[AccrualFailureDetector] + var reportTo: Option[ActorRef] = None + val emptyPhiByNode = Map.empty[Address, PhiValue].withDefault(address ⇒ PhiValue(address, 0, 0, 0.0)) + var phiByNode = emptyPhiByNode + var nodes = Set.empty[Address] + + import context.dispatcher + val checkPhiTask = context.system.scheduler.schedule( + 1.second, 1.second, self, PhiTick) + + // subscribe to MemberEvent, re-subscribe when restart + override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent]) + override def postStop(): Unit = { + cluster.unsubscribe(self) + checkPhiTask.cancel() + super.postStop() + } + + def receive = { + case PhiTick ⇒ + nodes foreach { node ⇒ + val previous = phiByNode(node) + val φ = fd.phi(node) + if (φ > 0) { + val aboveOne = if (!φ.isInfinite && φ > 1.0) 1 else 0 + phiByNode += node -> PhiValue(node, previous.countAboveOne + aboveOne, previous.count + 1, + math.max(previous.max, φ)) + } + } + reportTo foreach { _ ! PhiResult(cluster.selfAddress, phiByNode.values.toSet) } + case state: CurrentClusterState ⇒ nodes = state.members.map(_.address) + case memberEvent: MemberEvent ⇒ nodes += memberEvent.member.address + case ReportTo(ref) ⇒ reportTo = ref + case Reset ⇒ + phiByNode = emptyPhiByNode + nodes = Set.empty[Address] + cluster.unsubscribe(self) + cluster.subscribe(self, classOf[MemberEvent]) + + } + } + + /** + * Master of routers + * + * Flow control, to not flood the consumers, is handled by scheduling a + * batch of messages to be sent to the router when half of the number + * of outstanding messages remains. + * + * It uses a simple message retry mechanism. If an ack of a sent message + * is not received within a timeout, that message will be resent to the router, + * infinite number of times. + * + * When it receives the `End` command it will stop sending messages to the router, + * resends continuous, until all outstanding acks have been received, and then + * finally it replies with `WorkResult` to the sender of the `End` command, and stops + * itself. 
+ */ + class Master(settings: StressMultiJvmSpec.Settings, batchInterval: FiniteDuration, tree: Boolean) extends Actor { + val workers = context.actorOf(Props[Worker].withRouter(FromConfig), "workers") + val payload = Array.fill(settings.payloadSize)(ThreadLocalRandom.current.nextInt(127).toByte) + val retryTimeout = 5.seconds.dilated(context.system) + val idCounter = Iterator from 0 + var sendCounter = 0L + var ackCounter = 0L + var outstanding = Map.empty[JobId, JobState] + var startTime = 0L + + import context.dispatcher + val resendTask = context.system.scheduler.schedule(3.seconds, 3.seconds, self, RetryTick) + + override def postStop(): Unit = { + resendTask.cancel() + super.postStop() + } + + def receive = { + case Begin ⇒ + startTime = System.nanoTime + self ! SendBatch + context.become(working) + case RetryTick ⇒ + } + + def working: Receive = { + case Ack(id) ⇒ + outstanding -= id + ackCounter += 1 + if (outstanding.size == settings.workBatchSize / 2) + if (batchInterval == Duration.Zero) self ! SendBatch + else context.system.scheduler.scheduleOnce(batchInterval, self, SendBatch) + case SendBatch ⇒ sendJobs() + case RetryTick ⇒ resend() + case End ⇒ + done(sender) + context.become(ending(sender)) + } + + def ending(replyTo: ActorRef): Receive = { + case Ack(id) ⇒ + outstanding -= id + ackCounter += 1 + done(replyTo) + case SendBatch ⇒ + case RetryTick ⇒ resend() + } + + def done(replyTo: ActorRef): Unit = if (outstanding.isEmpty) { + val duration = (System.nanoTime - startTime).nanos + replyTo ! WorkResult(duration, sendCounter, ackCounter) + context stop self + } + + def sendJobs(): Unit = { + 0 until settings.workBatchSize foreach { _ ⇒ + send(createJob()) + } + } + + def createJob(): Job = { + if (tree) TreeJob(idCounter.next(), payload, ThreadLocalRandom.current.nextInt(settings.treeWidth), + settings.treeLevels, settings.treeWidth) + else SimpleJob(idCounter.next(), payload) + } + + def resend(): Unit = { + outstanding.values foreach { jobState ⇒ + if (jobState.deadline.isOverdue) + send(jobState.job) + } + } + + def send(job: Job): Unit = { + outstanding += job.id -> JobState(Deadline.now + retryTimeout, job) + sendCounter += 1 + workers ! job + } + } + + /** + * Used by Master as routee + */ + class Worker extends Actor with ActorLogging { + def receive = { + case SimpleJob(id, payload) ⇒ sender ! Ack(id) + case TreeJob(id, payload, idx, levels, width) ⇒ + // create the actors when first TreeJob message is received + val totalActors = ((width * math.pow(width, levels) - 1) / (width - 1)).toInt + log.info("Creating [{}] actors in a tree structure of [{}] levels and each actor has [{}] children", + totalActors, levels, width) + val tree = context.actorOf(Props(new TreeNode(levels, width)), "tree") + tree forward (idx, SimpleJob(id, payload)) + context.become(treeWorker(tree)) + } + + def treeWorker(tree: ActorRef): Receive = { + case SimpleJob(id, payload) ⇒ sender ! Ack(id) + case TreeJob(id, payload, idx, _, _) ⇒ + tree forward (idx, SimpleJob(id, payload)) + } + } + + class TreeNode(level: Int, width: Int) extends Actor { + require(level >= 1) + def createChild(): Actor = if (level == 1) new Leaf else new TreeNode(level - 1, width) + val indexedChildren = + 0 until width map { i ⇒ context.actorOf(Props(createChild()), name = i.toString) } toVector + + def receive = { + case (idx: Int, job: SimpleJob) if idx < width ⇒ indexedChildren(idx) forward (idx, job) + } + } + + class Leaf extends Actor { + def receive = { + case (_: Int, job: SimpleJob) ⇒ sender ! 
Ack(job.id) + } + } + + /** + * Used for remote death watch testing + */ + class Watchee extends Actor { + def receive = Actor.emptyBehavior + } + + /** + * Used for remote supervision testing + */ + class Supervisor extends Actor { + + var restartCount = 0 + + override val supervisorStrategy = + OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 1 minute) { + case _: Exception ⇒ + restartCount += 1 + Restart + } + + def receive = { + case props: Props ⇒ context.actorOf(props) + case e: Exception ⇒ context.children foreach { _ ! e } + case GetChildrenCount ⇒ sender ! ChildrenCount(context.children.size, restartCount) + case Reset ⇒ + require(context.children.isEmpty, + s"ResetChildrenCount not allowed when children exists, [${context.children.size}]") + restartCount = 0 + } + } + + /** + * Child of Supervisor for remote supervision testing + */ + class RemoteChild extends Actor { + def receive = { + case e: Exception ⇒ throw e + } + } + + case object Begin + case object End + case object RetryTick + case object ReportTick + case object PhiTick + case class PhiResult(from: Address, phiValues: Set[PhiValue]) + case class PhiValue(address: Address, countAboveOne: Int, count: Int, max: Double) + case class ReportTo(ref: Option[ActorRef]) + + type JobId = Int + trait Job { def id: JobId } + case class SimpleJob(id: JobId, payload: Any) extends Job + case class TreeJob(id: JobId, payload: Any, idx: Int, levels: Int, width: Int) extends Job + case class Ack(id: JobId) + case class JobState(deadline: Deadline, job: Job) + case class WorkResult(duration: Duration, sendCount: Long, ackCount: Long) { + def retryCount: Long = sendCount - ackCount + def jobsPerSecond: Double = ackCount * 1000.0 / duration.toMillis + } + case object SendBatch + case class CreateTree(levels: Int, width: Int) + + case object GetChildrenCount + case class ChildrenCount(numberOfChildren: Int, numberOfChildRestarts: Int) + case object Reset + +} + +class StressMultiJvmNode1 extends StressSpec +class StressMultiJvmNode2 extends StressSpec +class StressMultiJvmNode3 extends StressSpec +class StressMultiJvmNode4 extends StressSpec +class StressMultiJvmNode5 extends StressSpec +class StressMultiJvmNode6 extends StressSpec +class StressMultiJvmNode7 extends StressSpec +class StressMultiJvmNode8 extends StressSpec +class StressMultiJvmNode9 extends StressSpec +class StressMultiJvmNode10 extends StressSpec +class StressMultiJvmNode11 extends StressSpec +class StressMultiJvmNode12 extends StressSpec +class StressMultiJvmNode13 extends StressSpec + +abstract class StressSpec + extends MultiNodeSpec(StressMultiJvmSpec) + with MultiNodeClusterSpec with BeforeAndAfterEach with ImplicitSender { + + import StressMultiJvmSpec._ + import ClusterEvent._ + + val settings = new Settings(system.settings.config) + import settings._ + + var step = 0 + var nbrUsedRoles = 0 + + override def beforeEach(): Unit = { step += 1 } + + override def muteLog(sys: ActorSystem = system): Unit = { + super.muteLog(sys) + sys.eventStream.publish(Mute(EventFilter[RuntimeException](pattern = ".*Simulated exception.*"))) + sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*PhiResult.*"))) + sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*SendBatch.*"))) + } + + val seedNodes = roles.take(numberOfSeedNodes) + + override def cluster: Cluster = { + createWorker + super.cluster + } + + // always create one worker when the cluster is started + lazy val createWorker: Unit = + system.actorOf(Props[Worker], "worker") + + def 
createResultAggregator(title: String, expectedResults: Int, includeInHistory: Boolean): Unit = { + runOn(roles.head) { + val aggregator = system.actorOf(Props(new ClusterResultAggregator(title, expectedResults, reportMetricsInterval)), + name = "result" + step) + if (includeInHistory) aggregator ! ReportTo(Some(clusterResultHistory)) + else aggregator ! ReportTo(None) + } + enterBarrier("result-aggregator-created-" + step) + runOn(roles.take(nbrUsedRoles): _*) { + phiObserver ! ReportTo(Some(clusterResultAggregator)) + } + } + + def clusterResultAggregator: ActorRef = system.actorFor(node(roles.head) / "user" / ("result" + step)) + + lazy val clusterResultHistory = system.actorOf(Props[ClusterResultHistory], "resultHistory") + + lazy val phiObserver = system.actorOf(Props[PhiObserver], "phiObserver") + + def awaitClusterResult: Unit = { + runOn(roles.head) { + val r = clusterResultAggregator + watch(r) + expectMsgPF(remaining) { case Terminated(`r`) ⇒ true } + } + enterBarrier("cluster-result-done-" + step) + } + + def joinOneByOne(numberOfNodes: Int): Unit = { + 0 until numberOfNodes foreach { _ ⇒ + joinOne() + nbrUsedRoles += 1 + step += 1 + } + } + + def convergenceWithin(base: FiniteDuration, nodes: Int): FiniteDuration = + (base.toMillis * convergenceWithinFactor * nodes).millis + + def joinOne(): Unit = within(5.seconds + convergenceWithin(2.seconds, nbrUsedRoles + 1)) { + val currentRoles = roles.take(nbrUsedRoles + 1) + val title = s"join one to ${nbrUsedRoles} nodes cluster" + createResultAggregator(title, expectedResults = currentRoles.size, includeInHistory = true) + runOn(currentRoles: _*) { + reportResult { + runOn(currentRoles.last) { + cluster.join(roles.head) + } + awaitUpConvergence(currentRoles.size, timeout = remaining) + } + + } + awaitClusterResult + enterBarrier("join-one-" + step) + } + + def joinSeveral(numberOfNodes: Int, toSeedNodes: Boolean): Unit = + within(10.seconds + convergenceWithin(3.seconds, nbrUsedRoles + numberOfNodes)) { + val currentRoles = roles.take(nbrUsedRoles + numberOfNodes) + val joiningRoles = currentRoles.takeRight(numberOfNodes) + val title = s"join ${numberOfNodes} to ${if (toSeedNodes) "seed nodes" else "one node"}, in ${nbrUsedRoles} nodes cluster" + createResultAggregator(title, expectedResults = currentRoles.size, includeInHistory = true) + runOn(currentRoles: _*) { + reportResult { + runOn(joiningRoles: _*) { + if (toSeedNodes) cluster.joinSeedNodes(seedNodes.toIndexedSeq map address) + else cluster.join(roles.head) + } + awaitUpConvergence(currentRoles.size, timeout = remaining) + } + + } + awaitClusterResult + enterBarrier("join-several-" + step) + } + + def removeOneByOne(numberOfNodes: Int, shutdown: Boolean): Unit = { + 0 until numberOfNodes foreach { _ ⇒ + removeOne(shutdown) + nbrUsedRoles -= 1 + step += 1 + } + } + + def removeOne(shutdown: Boolean): Unit = within(10.seconds + convergenceWithin(3.seconds, nbrUsedRoles - 1)) { + val currentRoles = roles.take(nbrUsedRoles - 1) + val title = s"${if (shutdown) "shutdown" else "remove"} one from ${nbrUsedRoles} nodes cluster" + createResultAggregator(title, expectedResults = currentRoles.size, includeInHistory = true) + val removeRole = roles(nbrUsedRoles - 1) + val removeAddress = address(removeRole) + runOn(removeRole) { + system.actorOf(Props[Watchee], "watchee") + if (!shutdown) cluster.leave(myself) + } + enterBarrier("watchee-created-" + step) + runOn(roles.head) { + watch(system.actorFor(node(removeRole) / "user" / "watchee")) + } + enterBarrier("watch-estabilished-" + step) 
+ + runOn(currentRoles: _*) { + reportResult { + runOn(roles.head) { + if (shutdown) { + log.info("Shutting down [{}]", removeAddress) + testConductor.shutdown(removeRole, 0).await + } + } + awaitUpConvergence(currentRoles.size, timeout = remaining) + } + } + + runOn(roles.head) { + val expectedRef = system.actorFor(RootActorPath(removeAddress) / "user" / "watchee") + expectMsgPF(remaining) { + case Terminated(`expectedRef`) ⇒ true + } + } + enterBarrier("watch-verified-" + step) + + awaitClusterResult + enterBarrier("remove-one-" + step) + } + + def removeSeveral(numberOfNodes: Int, shutdown: Boolean): Unit = + within(10.seconds + convergenceWithin(5.seconds, nbrUsedRoles - numberOfNodes)) { + val currentRoles = roles.take(nbrUsedRoles - numberOfNodes) + val removeRoles = roles.slice(currentRoles.size, nbrUsedRoles) + val title = s"${if (shutdown) "shutdown" else "leave"} ${numberOfNodes} in ${nbrUsedRoles} nodes cluster" + createResultAggregator(title, expectedResults = currentRoles.size, includeInHistory = true) + runOn(removeRoles: _*) { + if (!shutdown) cluster.leave(myself) + } + runOn(currentRoles: _*) { + reportResult { + runOn(roles.head) { + if (shutdown) removeRoles.foreach { r ⇒ + log.info("Shutting down [{}]", address(r)) + testConductor.shutdown(r, 0).await + } + } + awaitUpConvergence(currentRoles.size, timeout = remaining) + } + } + awaitClusterResult + enterBarrier("remove-several-" + step) + } + + def reportResult[T](thunk: ⇒ T): T = { + val startTime = System.nanoTime + val startStats = clusterView.latestStats + + val returnValue = thunk + + val duration = (System.nanoTime - startTime).nanos + val latestStats = clusterView.latestStats + val clusterStats = ClusterStats( + receivedGossipCount = latestStats.receivedGossipCount - startStats.receivedGossipCount, + mergeConflictCount = latestStats.mergeConflictCount - startStats.mergeConflictCount, + mergeCount = latestStats.mergeCount - startStats.mergeCount, + mergeDetectedCount = latestStats.mergeDetectedCount - startStats.mergeDetectedCount) + clusterResultAggregator ! ClusterResult(cluster.selfAddress, duration, clusterStats) + returnValue + } + + def exerciseJoinRemove(title: String, duration: FiniteDuration): Unit = { + val activeRoles = roles.take(numberOfNodesJoinRemove) + val loopDuration = 10.seconds + convergenceWithin(4.seconds, nbrUsedRoles + activeRoles.size) + val deadline = Deadline.now + duration - loopDuration + val usedRoles = roles.take(nbrUsedRoles) + val usedAddresses = usedRoles.map(address(_)).toSet + + @tailrec def loop(counter: Int, previousAS: Option[ActorSystem], allPreviousAddresses: Set[Address]): Option[ActorSystem] = { + if (deadline.isOverdue) previousAS + else { + val t = title + " round " + counter + runOn(usedRoles: _*) { + phiObserver ! 
Reset + } + createResultAggregator(t, expectedResults = nbrUsedRoles, includeInHistory = true) + val (nextAS, nextAddresses) = within(loopDuration) { + reportResult { + val nextAS = + if (activeRoles contains myself) { + previousAS foreach { _.shutdown() } + val sys = ActorSystem(system.name, system.settings.config) + muteLog(sys) + Cluster(sys).joinSeedNodes(seedNodes.toIndexedSeq map address) + Some(sys) + } else previousAS + runOn(usedRoles: _*) { + awaitUpConvergence( + nbrUsedRoles + activeRoles.size, + canNotBePartOfMemberRing = allPreviousAddresses, + timeout = remaining) + } + val nextAddresses = clusterView.members.map(_.address) -- usedAddresses + runOn(usedRoles: _*) { + nextAddresses.size must be(numberOfNodesJoinRemove) + } + + enterBarrier("join-remove-" + step) + (nextAS, nextAddresses) + } + } + awaitClusterResult + + step += 1 + loop(counter + 1, nextAS, nextAddresses) + + } + } + + loop(1, None, Set.empty) foreach { _.shutdown } + within(loopDuration) { + runOn(usedRoles: _*) { + awaitUpConvergence(nbrUsedRoles, timeout = remaining) + phiObserver ! Reset + } + } + enterBarrier("join-remove-shutdown-" + step) + + } + + def master: ActorRef = system.actorFor("/user/master-" + myself.name) + + def exerciseRouters(title: String, duration: FiniteDuration, batchInterval: FiniteDuration, + expectDroppedMessages: Boolean, tree: Boolean): Unit = + within(duration + 10.seconds) { + createResultAggregator(title, expectedResults = nbrUsedRoles, includeInHistory = false) + + val (masterRoles, otherRoles) = roles.take(nbrUsedRoles).splitAt(3) + runOn(masterRoles: _*) { + reportResult { + val m = system.actorOf(Props(new Master(settings, batchInterval, tree)), + name = "master-" + myself.name) + m ! Begin + import system.dispatcher + system.scheduler.scheduleOnce(highThroughputDuration) { + m.tell(End, testActor) + } + val workResult = awaitWorkResult + workResult.sendCount must be > (0L) + workResult.ackCount must be > (0L) + if (!expectDroppedMessages) + workResult.retryCount must be(0) + + enterBarrier("routers-done-" + step) + } + } + runOn(otherRoles: _*) { + reportResult { + enterBarrier("routers-done-" + step) + } + } + + awaitClusterResult + } + + def awaitWorkResult: WorkResult = { + val m = master + val workResult = expectMsgType[WorkResult] + log.info("{} result, [{}] jobs/s, retried [{}] of [{}] msg", m.path.name, + workResult.jobsPerSecond.form, + workResult.retryCount, workResult.sendCount) + watch(m) + expectMsgPF(remaining) { case Terminated(`m`) ⇒ true } + workResult + } + + def exerciseSupervision(title: String, duration: FiniteDuration): Unit = + within(duration + 10.seconds) { + val supervisor = system.actorOf(Props[Supervisor], "supervisor") + while (remaining > 10.seconds) { + createResultAggregator(title, expectedResults = nbrUsedRoles, includeInHistory = false) + + reportResult { + roles.take(nbrUsedRoles) foreach { r ⇒ + supervisor ! Props[RemoteChild].withDeploy(Deploy(scope = RemoteScope(address(r)))) + } + supervisor ! GetChildrenCount + expectMsgType[ChildrenCount] must be(ChildrenCount(nbrUsedRoles, 0)) + + 1 to 5 foreach { _ ⇒ supervisor ! new RuntimeException("Simulated exception") } + awaitCond { + supervisor ! GetChildrenCount + val c = expectMsgType[ChildrenCount] + c == ChildrenCount(nbrUsedRoles, 5 * nbrUsedRoles) + } + + // after 5 restart attempts the children should be stopped + supervisor ! new RuntimeException("Simulated exception") + awaitCond { + supervisor ! 
GetChildrenCount + val c = expectMsgType[ChildrenCount] + // zero children + c == ChildrenCount(0, 6 * nbrUsedRoles) + } + supervisor ! Reset + + } + + awaitClusterResult + } + } + + "A cluster under stress" must { + + "join seed nodes" taggedAs LongRunningTest in { + + val otherNodesJoiningSeedNodes = roles.slice(numberOfSeedNodes, numberOfSeedNodes + numberOfNodesJoiningToSeedNodesInitially) + val size = seedNodes.size + otherNodesJoiningSeedNodes.size + + createResultAggregator("join seed nodes", expectedResults = size, includeInHistory = true) + + runOn((seedNodes ++ otherNodesJoiningSeedNodes): _*) { + reportResult { + cluster.joinSeedNodes(seedNodes.toIndexedSeq map address) + awaitUpConvergence(size) + } + } + + awaitClusterResult + + nbrUsedRoles += size + enterBarrier("after-" + step) + } + + "start routers that are running while nodes are joining" taggedAs LongRunningTest in { + runOn(roles.take(3): _*) { + system.actorOf(Props(new Master(settings, settings.workBatchInterval, tree = false)), + name = "master-" + myself.name) ! Begin + } + enterBarrier("after-" + step) + } + + "join nodes one-by-one to small cluster" taggedAs LongRunningTest in { + joinOneByOne(numberOfNodesJoiningOneByOneSmall) + enterBarrier("after-" + step) + } + + "join several nodes to one node" taggedAs LongRunningTest in { + joinSeveral(numberOfNodesJoiningToOneNode, toSeedNodes = false) + nbrUsedRoles += numberOfNodesJoiningToOneNode + enterBarrier("after-" + step) + } + + "join several nodes to seed nodes" taggedAs LongRunningTest in { + joinSeveral(numberOfNodesJoiningToOneNode, toSeedNodes = true) + nbrUsedRoles += numberOfNodesJoiningToSeedNodes + enterBarrier("after-" + step) + } + + "join nodes one-by-one to large cluster" taggedAs LongRunningTest in { + joinOneByOne(numberOfNodesJoiningOneByOneLarge) + enterBarrier("after-" + step) + } + + "end routers that are running while nodes are joining" taggedAs LongRunningTest in within(30.seconds) { + runOn(roles.take(3): _*) { + val m = master + m.tell(End, testActor) + val workResult = awaitWorkResult + workResult.retryCount must be(0) + workResult.sendCount must be > (0L) + workResult.ackCount must be > (0L) + } + enterBarrier("after-" + step) + } + + "use routers with normal throughput" taggedAs LongRunningTest in { + exerciseRouters("use routers with normal throughput", normalThroughputDuration, + batchInterval = workBatchInterval, expectDroppedMessages = false, tree = false) + enterBarrier("after-" + step) + } + + "use routers with high throughput" taggedAs LongRunningTest in { + exerciseRouters("use routers with high throughput", highThroughputDuration, + batchInterval = Duration.Zero, expectDroppedMessages = false, tree = false) + enterBarrier("after-" + step) + } + + "use many actors with normal throughput" taggedAs LongRunningTest in { + exerciseRouters("use many actors with normal throughput", normalThroughputDuration, + batchInterval = workBatchInterval, expectDroppedMessages = false, tree = true) + enterBarrier("after-" + step) + } + + "use many actors with high throughput" taggedAs LongRunningTest in { + exerciseRouters("use many actors with high throughput", highThroughputDuration, + batchInterval = Duration.Zero, expectDroppedMessages = false, tree = true) + enterBarrier("after-" + step) + } + + "excercise join/remove/join/remove" taggedAs LongRunningTest in { + exerciseJoinRemove("excercise join/remove", joinRemoveDuration) + enterBarrier("after-" + step) + } + + "excercise supervision" taggedAs LongRunningTest in { + 
exerciseSupervision("excercise supervision", supervisionDuration) + enterBarrier("after-" + step) + } + + "start routers that are running while nodes are removed" taggedAs LongRunningTest in { + runOn(roles.take(3): _*) { + system.actorOf(Props(new Master(settings, settings.workBatchInterval, tree = false)), + name = "master-" + myself.name) ! Begin + } + enterBarrier("after-" + step) + } + + "leave nodes one-by-one from large cluster" taggedAs LongRunningTest in { + removeOneByOne(numberOfNodesLeavingOneByOneLarge, shutdown = false) + enterBarrier("after-" + step) + } + + "shutdown nodes one-by-one from large cluster" taggedAs LongRunningTest in { + removeOneByOne(numberOfNodesShutdownOneByOneLarge, shutdown = true) + enterBarrier("after-" + step) + } + + "leave several nodes" taggedAs LongRunningTest in { + removeSeveral(numberOfNodesLeaving, shutdown = false) + nbrUsedRoles -= numberOfNodesLeaving + enterBarrier("after-" + step) + } + + "shutdown several nodes" taggedAs LongRunningTest in { + removeSeveral(numberOfNodesShutdown, shutdown = true) + nbrUsedRoles -= numberOfNodesShutdown + enterBarrier("after-" + step) + } + + "leave nodes one-by-one from small cluster" taggedAs LongRunningTest in { + removeOneByOne(numberOfNodesLeavingOneByOneSmall, shutdown = false) + enterBarrier("after-" + step) + } + + "shutdown nodes one-by-one from small cluster" taggedAs LongRunningTest in { + removeOneByOne(numberOfNodesShutdownOneByOneSmall, shutdown = true) + enterBarrier("after-" + step) + } + + "end routers that are running while nodes are removed" taggedAs LongRunningTest in within(30.seconds) { + runOn(roles.take(3): _*) { + val m = master + m.tell(End, testActor) + val workResult = awaitWorkResult + workResult.sendCount must be > (0L) + workResult.ackCount must be > (0L) + } + enterBarrier("after-" + step) + } + + } +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala index 99b076b8d6..a33ca8146e 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/UnreachableNodeRejoinsClusterSpec.scala @@ -130,7 +130,7 @@ abstract class UnreachableNodeRejoinsClusterSpec(multiNodeConfig: UnreachableNod } runOn(allBut(victim): _*) { - awaitUpConvergence(roles.size - 1, List(victim)) + awaitUpConvergence(roles.size - 1, Set(victim)) } endBarrier
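A minimal sketch, not part of the patch above, of a cluster-event subscriber that follows the changed ClusterDomainEventPublisher behaviour: AddressTerminated is now derived from MemberDowned/MemberRemoved after gossip convergence, while UnreachableMember by itself no longer drives DeathWatch. The class name MemberLifecycleListener is hypothetical; the event types and subscribe/unsubscribe calls are the ones already used elsewhere in this diff.

import akka.actor.{ Actor, ActorLogging }
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._

class MemberLifecycleListener extends Actor with ActorLogging {
  val cluster = Cluster(context.system)

  // subscribe to member events, re-subscribe when restarted (same pattern as PhiObserver above)
  override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent])
  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case state: CurrentClusterState ⇒
      log.info("Current members: {}", state.members.map(_.address).mkString(", "))
    case UnreachableMember(m) ⇒
      // unreachable is no longer treated as terminated; the node may become reachable again
      log.info("Unreachable: {}", m.address)
    case MemberDowned(m) ⇒
      // terminal: the publisher also emits AddressTerminated for this address at convergence
      log.info("Downed: {}", m.address)
    case MemberRemoved(m) ⇒
      // terminal: the publisher also emits AddressTerminated for this address at convergence
      log.info("Removed: {}", m.address)
    case _: MemberEvent ⇒ // other membership transitions are not interesting here
  }
}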
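A worked check, not part of the patch, of the actor-tree sizing formula quoted in the StressSpec configuration comment, assuming its default tree-width = 5 and tree-levels = 4; the object name TreeSizeCheck is hypothetical.

object TreeSizeCheck extends App {
  val width = 5  // tree-width: number of children per actor
  val levels = 4 // tree-levels
  // (width * width^levels - 1) / (width - 1) is the geometric series 1 + width + ... + width^levels
  val totalActors = ((width * math.pow(width, levels) - 1) / (width - 1)).toInt
  println(s"total actors in the tree: $totalActors") // 781 with the defaults above
}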