diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 5da67fb01b..37d46fe743 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -72,7 +72,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with implicit val ec = system.dispatcher import akka.routing.RoutingSpec._ - muteDeadLetters("DeathWatchNotification.*")() + muteDeadLetters(classOf[akka.dispatch.sysmsg.DeathWatchNotification])() "routers in general" must { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index cc369d1897..102682ed5f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -105,16 +105,18 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro } muteDeadLetters( - "Heartbeat.*", - "GossipEnvelope.*", - "ClusterMetricsChanged.*", - "Disassociated.*", - "DisassociateUnderlying.*", - "HandleListenerRegistered.*", - "PoisonPill.*", - "DeathWatchNotification.*", - "NullMessage.*", - "InboundPayload.*")(sys) + classOf[ClusterHeartbeatReceiver.Heartbeat], + classOf[ClusterHeartbeatReceiver.EndHeartbeat], + classOf[GossipEnvelope], + classOf[MetricsGossipEnvelope], + classOf[ClusterEvent.ClusterMetricsChanged], + classOf[InternalClusterAction.Tick], + classOf[akka.actor.PoisonPill], + classOf[akka.dispatch.sysmsg.DeathWatchNotification], + akka.dispatch.NullMessage.getClass, + akka.remote.transport.AssociationHandle.Disassociated.getClass, + akka.remote.transport.ActorTransportAdapter.DisassociateUnderlying.getClass, + classOf[akka.remote.transport.AssociationHandle.InboundPayload])(sys) } } @@ -290,6 +292,8 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro } } + def awaitAllReachable(): Unit = awaitAssert(clusterView.unreachableMembers.isEmpty) + /** * Wait until the specified nodes have seen the same gossip overview. */ diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala index 91c840b5a1..d220339fb5 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -38,6 +38,8 @@ import akka.testkit._ import akka.testkit.TestEvent._ import akka.actor.Identify import akka.actor.ActorIdentity +import akka.util.Helpers.Requiring +import java.lang.management.ManagementFactory /** * This test is intended to be used as long running stress test @@ -50,27 +52,38 @@ import akka.actor.ActorIdentity * 4. exercise cluster aware routers, including high throughput * 5. exercise many actors in a tree structure * 6. exercise remote supervision - * 7. leave and shutdown nodes in various ways - * 8. while nodes are removed remote death watch is also exercised - * 9. while nodes are removed a few cluster aware routers are also working + * 7. gossip without any changes to the membership + * 8. leave and shutdown nodes in various ways + * 9. while nodes are removed remote death watch is also exercised + * 10. while nodes are removed a few cluster aware routers are also working + * + * By default it uses 13 nodes. 
+ * Example of sbt command line parameters to double that: + * `-DMultiJvm.akka.cluster.Stress.nrOfNodes=26 -Dmultinode.Dakka.test.cluster-stress-spec.nr-of-nodes-factor=2` */ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { + val totalNumberOfNodes = + System.getProperty("MultiJvm.akka.cluster.Stress.nrOfNodes") match { + case null ⇒ 13 + case value ⇒ value.toInt requiring (_ >= 10, "nrOfNodes must be >= 10") + } + + for (n ← 1 to totalNumberOfNodes) role("node-" + n) + // Note that this test uses default configuration, // not MultiNodeClusterSpec.clusterConfig commonConfig(ConfigFactory.parseString(""" akka.test.cluster-stress-spec { - log-stats = off + infolog = off # scale the nr-of-nodes* settings with this factor nr-of-nodes-factor = 1 - nr-of-nodes = 13 # not scaled nr-of-seed-nodes = 3 nr-of-nodes-joining-to-seed-initally = 2 nr-of-nodes-joining-one-by-one-small = 2 nr-of-nodes-joining-one-by-one-large = 2 nr-of-nodes-joining-to-one = 2 - nr-of-nodes-joining-to-seed = 2 nr-of-nodes-leaving-one-by-one-small = 1 nr-of-nodes-leaving-one-by-one-large = 2 nr-of-nodes-leaving = 2 @@ -89,6 +102,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { high-throughput-duration = 10s supervision-duration = 10s supervision-one-iteration = 2.5s + idle-gossip-duration = 10s expected-test-duration = 600s # actors are created in a tree structure defined # by tree-width (number of children for each actor) and @@ -99,6 +113,8 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { report-metrics-interval = 10s # scale convergence within timeouts with this factor convergence-within-factor = 1.0 + # set to off to only test cluster membership + exercise-actors = on } akka.actor.provider = akka.cluster.ClusterActorRefProvider @@ -111,6 +127,12 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { akka.loglevel = INFO akka.remote.log-remote-lifecycle-events = off + #akka.scheduler.tick-duration = 33 ms + akka.actor.default-dispatcher.fork-join-executor { + parallelism-min = 8 + parallelism-max = 8 + } + akka.actor.deployment { /master-node-1/workers { router = round-robin @@ -148,16 +170,18 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { private def getDuration(name: String): FiniteDuration = Duration(getMilliseconds(name), MILLISECONDS) - val logStats = getBoolean("log-stats") + val infolog = getBoolean("infolog") val nFactor = getInt("nr-of-nodes-factor") - val totalNumberOfNodes = getInt("nr-of-nodes") * nFactor ensuring ( - _ >= 10, "nr-of-nodes must be >= 10") val numberOfSeedNodes = getInt("nr-of-seed-nodes") // not scaled by nodes factor val numberOfNodesJoiningToSeedNodesInitially = getInt("nr-of-nodes-joining-to-seed-initally") * nFactor val numberOfNodesJoiningOneByOneSmall = getInt("nr-of-nodes-joining-one-by-one-small") * nFactor val numberOfNodesJoiningOneByOneLarge = getInt("nr-of-nodes-joining-one-by-one-large") * nFactor val numberOfNodesJoiningToOneNode = getInt("nr-of-nodes-joining-to-one") * nFactor - val numberOfNodesJoiningToSeedNodes = getInt("nr-of-nodes-joining-to-seed") * nFactor + // remaining will join to seed nodes + val numberOfNodesJoiningToSeedNodes = (totalNumberOfNodes - numberOfSeedNodes - + numberOfNodesJoiningToSeedNodesInitially - numberOfNodesJoiningOneByOneSmall - + numberOfNodesJoiningOneByOneLarge - numberOfNodesJoiningToOneNode) requiring (_ >= 0, + s"too many configured nr-of-nodes-joining-*, total must be <= ${totalNumberOfNodes}") val 
numberOfNodesLeavingOneByOneSmall = getInt("nr-of-nodes-leaving-one-by-one-small") * nFactor val numberOfNodesLeavingOneByOneLarge = getInt("nr-of-nodes-leaving-one-by-one-large") * nFactor val numberOfNodesLeaving = getInt("nr-of-nodes-leaving") * nFactor @@ -175,11 +199,13 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { val highThroughputDuration = getDuration("high-throughput-duration") * dFactor val supervisionDuration = getDuration("supervision-duration") * dFactor val supervisionOneIteration = getDuration("supervision-one-iteration") * dFactor + val idleGossipDuration = getDuration("idle-gossip-duration") * dFactor val expectedTestDuration = getDuration("expected-test-duration") * dFactor val treeWidth = getInt("tree-width") val treeLevels = getInt("tree-levels") val reportMetricsInterval = getDuration("report-metrics-interval") val convergenceWithinFactor = getDouble("convergence-within-factor") + val exerciseActors = getBoolean("exercise-actors") require(numberOfSeedNodes + numberOfNodesJoiningToSeedNodesInitially + numberOfNodesJoiningOneByOneSmall + numberOfNodesJoiningOneByOneLarge + numberOfNodesJoiningToOneNode + numberOfNodesJoiningToSeedNodes <= totalNumberOfNodes, @@ -191,10 +217,11 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { s"specified number of leaving/shutdown nodes <= ${totalNumberOfNodes - 3}") require(numberOfNodesJoinRemove <= totalNumberOfNodes, s"nr-of-nodes-join-remove must be <= ${totalNumberOfNodes}") - } - // FIXME configurable number of nodes - for (n ← 1 to 13) role("node-" + n) + override def toString: String = { + testConfig.withFallback(ConfigFactory.parseString(s"nrOfNodes=${totalNumberOfNodes}")).root.render + } + } implicit class FormattedDouble(val d: Double) extends AnyVal { def form: String = d.formatted("%.2f") @@ -215,7 +242,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { */ class ClusterResultAggregator(title: String, expectedResults: Int, settings: Settings) extends Actor with ActorLogging { import settings.reportMetricsInterval - import settings.logStats + import settings.infolog val cluster = Cluster(context.system) var reportTo: Option[ActorRef] = None var results = Vector.empty[ClusterResult] @@ -246,13 +273,13 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { case PhiResult(from, phiValues) ⇒ phiValuesObservedByNode += from -> phiValues case StatsResult(from, stats) ⇒ clusterStatsObservedByNode += from -> stats case ReportTick ⇒ - if (logStats) + if (infolog) log.info(s"[${title}] in progress\n${formatMetrics}\n\n${formatPhi}\n\n${formatStats}") case r: ClusterResult ⇒ results :+= r if (results.size == expectedResults) { val aggregated = AggregatedClusterResult(title, maxDuration, totalClusterStats) - if (logStats) + if (infolog) log.info(s"[${title}] completed in [${aggregated.duration.toMillis}] ms\n${aggregated.clusterStats}\n${formatMetrics}\n\n${formatPhi}\n\n${formatStats}") reportTo foreach { _ ! 
aggregated } context stop self @@ -270,7 +297,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { (formatMetricsHeader +: (nodeMetrics.toSeq.sortBy(_.address) map formatMetricsLine)).mkString("\n") } - def formatMetricsHeader: String = "Node\tHeap (MB)\tCPU (%)\tLoad" + def formatMetricsHeader: String = "[Node]\t[Heap (MB)]\t[CPU (%)]\t[Load]" def formatMetricsLine(nodeMetrics: NodeMetrics): String = { val heap = nodeMetrics match { @@ -305,14 +332,14 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { } } - def formatPhiHeader: String = "Monitor\tSubject\tcount\tcount phi > 1.0\tmax phi" + def formatPhiHeader: String = "[Monitor]\t[Subject]\t[count]\t[count phi > 1.0]\t[max phi]" def formatPhiLine(monitor: Address, subject: Address, phi: PhiValue): String = s"${monitor}\t${subject}\t${phi.count}\t${phi.countAboveOne}\t${phi.max.form}" def formatStats: String = (clusterStatsObservedByNode map { case (monitor, stats) ⇒ s"${monitor}\t${stats}" }). - mkString("ClusterStats\n", "\n", "") + mkString("ClusterStats(gossip, merge, same, newer, older)\n", "\n", "") } /** @@ -332,7 +359,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { def formatHistory: String = (formatHistoryHeader +: (history map formatHistoryLine)).mkString("\n") - def formatHistoryHeader: String = "title\tduration (ms)\tcluster stats" + def formatHistoryHeader: String = "[Title]\t[Duration (ms)]\t[ClusterStats(gossip, merge, same, newer, older)]" def formatHistoryLine(result: AggregatedClusterResult): String = s"${result.title}\t${result.duration.toMillis}\t${result.clusterStats}" @@ -385,7 +412,15 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { reportTo foreach { _ ! PhiResult(cluster.selfAddress, phiSet) } case state: CurrentClusterState ⇒ nodes = state.members.map(_.address) case memberEvent: MemberEvent ⇒ nodes += memberEvent.member.address - case ReportTo(ref) ⇒ reportTo = ref + case ReportTo(ref) ⇒ + reportTo foreach context.unwatch + reportTo = ref + reportTo foreach context.watch + case Terminated(ref) ⇒ + reportTo match { + case Some(`ref`) ⇒ reportTo = None + case _ ⇒ + } case Reset ⇒ phiByNode = emptyPhiByNode nodes = Set.empty[Address] @@ -414,7 +449,14 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { val res = StatsResult(cluster.selfAddress, cluster.readView.latestStats :- startStats) reportTo foreach { _ ! 
res } case ReportTo(ref) ⇒ + reportTo foreach context.unwatch reportTo = ref + reportTo foreach context.watch + case Terminated(ref) ⇒ + reportTo match { + case Some(`ref`) ⇒ reportTo = None + case _ ⇒ + } case Reset ⇒ startStats = cluster.readView.latestStats } @@ -527,7 +569,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { case TreeJob(id, payload, idx, levels, width) ⇒ // create the actors when first TreeJob message is received val totalActors = ((width * math.pow(width, levels) - 1) / (width - 1)).toInt - log.info("Creating [{}] actors in a tree structure of [{}] levels and each actor has [{}] children", + log.debug("Creating [{}] actors in a tree structure of [{}] levels and each actor has [{}] children", totalActors, levels, width) val tree = context.actorOf(Props(classOf[TreeNode], levels, width), "tree") tree forward ((idx, SimpleJob(id, payload))) @@ -665,13 +707,48 @@ abstract class StressSpec override def expectedTestDuration = settings.expectedTestDuration + override def shutdownTimeout: FiniteDuration = 30.seconds.dilated + override def muteLog(sys: ActorSystem = system): Unit = { super.muteLog(sys) sys.eventStream.publish(Mute(EventFilter[RuntimeException](pattern = ".*Simulated exception.*"))) - sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*PhiResult.*"))) - sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*SendBatch.*"))) - sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*ClusterStats.*"))) - muteDeadLetters("SimpleJob.*", "Tick.*", "AggregatedClusterResult.*")(sys) + muteDeadLetters(classOf[SimpleJob], classOf[AggregatedClusterResult], SendBatch.getClass, + classOf[StatsResult], classOf[PhiResult], RetryTick.getClass)(sys) + } + + def jvmInfo(): String = { + val runtime = ManagementFactory.getRuntimeMXBean + val os = ManagementFactory.getOperatingSystemMXBean + val threads = ManagementFactory.getThreadMXBean + val mem = ManagementFactory.getMemoryMXBean + val heap = mem.getHeapMemoryUsage + + val sb = new StringBuilder + + sb.append("Operating system: ").append(os.getName).append(", ").append(os.getArch).append(", ").append(os.getVersion) + sb.append("\n") + sb.append("JVM: ").append(runtime.getVmName).append(" ").append(runtime.getVmVendor). + append(" ").append(runtime.getVmVersion) + sb.append("\n") + sb.append("Processors: ").append(os.getAvailableProcessors) + sb.append("\n") + sb.append("Load average: ").append(os.getSystemLoadAverage) + sb.append("\n") + sb.append("Thread count: ").append(threads.getThreadCount).append(" (").append(threads.getPeakThreadCount).append(")") + sb.append("\n") + sb.append("Heap: ").append((heap.getUsed.toDouble / 1024 / 1024).form). + append(" (").append((heap.getInit.toDouble / 1024 / 1024).form). + append(" - "). + append((heap.getMax.toDouble / 1024 / 1024).form). + append(")").append(" MB") + sb.append("\n") + + import scala.collection.JavaConverters._ + val args = runtime.getInputArguments.asScala.filterNot(_.contains("classpath")).mkString("\n ") + sb.append("Args:\n ").append(args) + sb.append("\n") + + sb.toString } val seedNodes = roles.take(numberOfSeedNodes) @@ -689,7 +766,7 @@ abstract class StressSpec runOn(roles.head) { val aggregator = system.actorOf(Props(classOf[ClusterResultAggregator], title, expectedResults, settings), name = "result" + step) - if (includeInHistory) aggregator ! ReportTo(Some(clusterResultHistory)) + if (includeInHistory && infolog) aggregator ! ReportTo(Some(clusterResultHistory)) else aggregator ! 
ReportTo(None) } enterBarrier("result-aggregator-created-" + step) @@ -706,7 +783,7 @@ abstract class StressSpec } lazy val clusterResultHistory = - if (settings.logStats) system.actorOf(Props[ClusterResultHistory], "resultHistory") + if (settings.infolog) system.actorOf(Props[ClusterResultHistory], "resultHistory") else system.deadLetters lazy val phiObserver = system.actorOf(Props[PhiObserver], "phiObserver") @@ -803,11 +880,13 @@ abstract class StressSpec reportResult { runOn(roles.head) { if (shutdown) { - log.info("Shutting down [{}]", removeAddress) + if (infolog) + log.info("Shutting down [{}]", removeAddress) testConductor.exit(removeRole, 0).await } } awaitMembersUp(currentRoles.size, timeout = remaining) + awaitAllReachable() } } @@ -836,11 +915,13 @@ abstract class StressSpec reportResult { runOn(roles.head) { if (shutdown) removeRoles.foreach { r ⇒ - log.info("Shutting down [{}]", address(r)) + if (infolog) + log.info("Shutting down [{}]", address(r)) testConductor.exit(r, 0).await } } awaitMembersUp(currentRoles.size, timeout = remaining) + awaitAllReachable() } } awaitClusterResult() @@ -891,6 +972,7 @@ abstract class StressSpec nbrUsedRoles + activeRoles.size, canNotBePartOfMemberRing = allPreviousAddresses, timeout = remaining) + awaitAllReachable() } val nextAddresses = clusterView.members.map(_.address) -- usedAddresses runOn(usedRoles: _*) { @@ -912,6 +994,7 @@ abstract class StressSpec within(loopDuration) { runOn(usedRoles: _*) { awaitMembersUp(nbrUsedRoles, timeout = remaining) + awaitAllReachable() phiObserver ! Reset statsObserver ! Reset } @@ -930,6 +1013,7 @@ abstract class StressSpec def exerciseRouters(title: String, duration: FiniteDuration, batchInterval: FiniteDuration, expectDroppedMessages: Boolean, tree: Boolean): Unit = within(duration + 10.seconds) { + nbrUsedRoles must be(totalNumberOfNodes) createResultAggregator(title, expectedResults = nbrUsedRoles, includeInHistory = false) val (masterRoles, otherRoles) = roles.take(nbrUsedRoles).splitAt(3) @@ -962,7 +1046,7 @@ abstract class StressSpec def awaitWorkResult: WorkResult = { val workResult = expectMsgType[WorkResult] - if (settings.logStats) + if (settings.infolog) log.info("{} result, [{}] jobs/s, retried [{}] of [{}] msg", masterName, workResult.jobsPerSecond.form, workResult.retryCount, workResult.sendCount) @@ -982,30 +1066,40 @@ abstract class StressSpec for (count ← 0 until rounds) { createResultAggregator(title, expectedResults = nbrUsedRoles, includeInHistory = false) - reportResult { - roles.take(nbrUsedRoles) foreach { r ⇒ - supervisor ! Props[RemoteChild].withDeploy(Deploy(scope = RemoteScope(address(r)))) - } - supervisor ! GetChildrenCount - expectMsgType[ChildrenCount] must be(ChildrenCount(nbrUsedRoles, 0)) - - 1 to 5 foreach { _ ⇒ supervisor ! new RuntimeException("Simulated exception") } - awaitAssert { + val (masterRoles, otherRoles) = roles.take(nbrUsedRoles).splitAt(3) + runOn(masterRoles: _*) { + reportResult { + roles.take(nbrUsedRoles) foreach { r ⇒ + supervisor ! Props[RemoteChild].withDeploy(Deploy(scope = RemoteScope(address(r)))) + } supervisor ! GetChildrenCount - val c = expectMsgType[ChildrenCount] - c must be(ChildrenCount(nbrUsedRoles, 5 * nbrUsedRoles)) - } + expectMsgType[ChildrenCount] must be(ChildrenCount(nbrUsedRoles, 0)) - // after 5 restart attempts the children should be stopped - supervisor ! new RuntimeException("Simulated exception") - awaitAssert { - supervisor ! 
GetChildrenCount - val c = expectMsgType[ChildrenCount] - // zero children - c must be(ChildrenCount(0, 6 * nbrUsedRoles)) - } - supervisor ! Reset + 1 to 5 foreach { _ ⇒ supervisor ! new RuntimeException("Simulated exception") } + awaitAssert { + supervisor ! GetChildrenCount + val c = expectMsgType[ChildrenCount] + c must be(ChildrenCount(nbrUsedRoles, 5 * nbrUsedRoles)) + } + // after 5 restart attempts the children should be stopped + supervisor ! new RuntimeException("Simulated exception") + awaitAssert { + supervisor ! GetChildrenCount + val c = expectMsgType[ChildrenCount] + // zero children + c must be(ChildrenCount(0, 6 * nbrUsedRoles)) + } + supervisor ! Reset + + } + enterBarrier("supervision-done-" + step) + } + + runOn(otherRoles: _*) { + reportResult { + enterBarrier("supervision-done-" + step) + } } awaitClusterResult() @@ -1013,8 +1107,28 @@ abstract class StressSpec } } + def idleGossip(title: String): Unit = { + createResultAggregator(title, expectedResults = nbrUsedRoles, includeInHistory = true) + reportResult { + clusterView.members.size must be(nbrUsedRoles) + Thread.sleep(idleGossipDuration.toMillis) + clusterView.members.size must be(nbrUsedRoles) + } + awaitClusterResult() + } + "A cluster under stress" must { + "log settings" taggedAs LongRunningTest in { + if (infolog) { + log.info("StressSpec JVM:\n{}", jvmInfo) + runOn(roles.head) { + log.info("StressSpec settings:\n{}", settings) + } + } + enterBarrier("after-" + step) + } + "join seed nodes" taggedAs LongRunningTest in within(30 seconds) { val otherNodesJoiningSeedNodes = roles.slice(numberOfSeedNodes, numberOfSeedNodes + numberOfNodesJoiningToSeedNodesInitially) @@ -1040,7 +1154,6 @@ abstract class StressSpec system.actorOf(Props(classOf[Master], settings, settings.workBatchInterval, false), name = "master-" + myself.name) ! 
Begin } - enterBarrier("after-" + step) } "join nodes one-by-one to small cluster" taggedAs LongRunningTest in { @@ -1055,8 +1168,10 @@ abstract class StressSpec } "join several nodes to seed nodes" taggedAs LongRunningTest in { - joinSeveral(numberOfNodesJoiningToOneNode, toSeedNodes = true) - nbrUsedRoles += numberOfNodesJoiningToSeedNodes + if (numberOfNodesJoiningToSeedNodes > 0) { + joinSeveral(numberOfNodesJoiningToSeedNodes, toSeedNodes = true) + nbrUsedRoles += numberOfNodesJoiningToSeedNodes + } enterBarrier("after-" + step) } @@ -1066,40 +1181,50 @@ abstract class StressSpec } "end routers that are running while nodes are joining" taggedAs LongRunningTest in within(30.seconds) { - runOn(roles.take(3): _*) { - val m = master - m must not be (None) - m.get.tell(End, testActor) - val workResult = awaitWorkResult - workResult.retryCount must be(0) - workResult.sendCount must be > (0L) - workResult.ackCount must be > (0L) + if (exerciseActors) { + runOn(roles.take(3): _*) { + val m = master + m must not be (None) + m.get.tell(End, testActor) + val workResult = awaitWorkResult + workResult.retryCount must be(0) + workResult.sendCount must be > (0L) + workResult.ackCount must be > (0L) + } + enterBarrier("after-" + step) } - enterBarrier("after-" + step) } "use routers with normal throughput" taggedAs LongRunningTest in { - exerciseRouters("use routers with normal throughput", normalThroughputDuration, - batchInterval = workBatchInterval, expectDroppedMessages = false, tree = false) - enterBarrier("after-" + step) + if (exerciseActors) { + exerciseRouters("use routers with normal throughput", normalThroughputDuration, + batchInterval = workBatchInterval, expectDroppedMessages = false, tree = false) + enterBarrier("after-" + step) + } } "use routers with high throughput" taggedAs LongRunningTest in { - exerciseRouters("use routers with high throughput", highThroughputDuration, - batchInterval = Duration.Zero, expectDroppedMessages = false, tree = false) - enterBarrier("after-" + step) + if (exerciseActors) { + exerciseRouters("use routers with high throughput", highThroughputDuration, + batchInterval = Duration.Zero, expectDroppedMessages = false, tree = false) + enterBarrier("after-" + step) + } } "use many actors with normal throughput" taggedAs LongRunningTest in { - exerciseRouters("use many actors with normal throughput", normalThroughputDuration, - batchInterval = workBatchInterval, expectDroppedMessages = false, tree = true) - enterBarrier("after-" + step) + if (exerciseActors) { + exerciseRouters("use many actors with normal throughput", normalThroughputDuration, + batchInterval = workBatchInterval, expectDroppedMessages = false, tree = true) + enterBarrier("after-" + step) + } } "use many actors with high throughput" taggedAs LongRunningTest in { - exerciseRouters("use many actors with high throughput", highThroughputDuration, - batchInterval = Duration.Zero, expectDroppedMessages = false, tree = true) - enterBarrier("after-" + step) + if (exerciseActors) { + exerciseRouters("use many actors with high throughput", highThroughputDuration, + batchInterval = Duration.Zero, expectDroppedMessages = false, tree = true) + enterBarrier("after-" + step) + } } "exercise join/remove/join/remove" taggedAs LongRunningTest in { @@ -1108,16 +1233,25 @@ abstract class StressSpec } "exercise supervision" taggedAs LongRunningTest in { - exerciseSupervision("exercise supervision", supervisionDuration, supervisionOneIteration) + if (exerciseActors) { + exerciseSupervision("exercise supervision", 
supervisionDuration, supervisionOneIteration) + enterBarrier("after-" + step) + } + } + + "gossip when idle" taggedAs LongRunningTest in { + idleGossip("idle gossip") enterBarrier("after-" + step) } "start routers that are running while nodes are removed" taggedAs LongRunningTest in { - runOn(roles.take(3): _*) { - system.actorOf(Props(classOf[Master], settings, settings.workBatchInterval, false), - name = "master-" + myself.name) ! Begin + if (exerciseActors) { + runOn(roles.take(3): _*) { + system.actorOf(Props(classOf[Master], settings, settings.workBatchInterval, false), + name = "master-" + myself.name) ! Begin + } + enterBarrier("after-" + step) } - enterBarrier("after-" + step) } "leave nodes one-by-one from large cluster" taggedAs LongRunningTest in { @@ -1142,27 +1276,36 @@ abstract class StressSpec enterBarrier("after-" + step) } - "leave nodes one-by-one from small cluster" taggedAs LongRunningTest in { - removeOneByOne(numberOfNodesLeavingOneByOneSmall, shutdown = false) - enterBarrier("after-" + step) - } - "shutdown nodes one-by-one from small cluster" taggedAs LongRunningTest in { removeOneByOne(numberOfNodesShutdownOneByOneSmall, shutdown = true) enterBarrier("after-" + step) } - "end routers that are running while nodes are removed" taggedAs LongRunningTest in within(30.seconds) { - runOn(roles.take(3): _*) { - val m = master - m must not be (None) - m.get.tell(End, testActor) - val workResult = awaitWorkResult - workResult.sendCount must be > (0L) - workResult.ackCount must be > (0L) - } + "leave nodes one-by-one from small cluster" taggedAs LongRunningTest in { + removeOneByOne(numberOfNodesLeavingOneByOneSmall, shutdown = false) enterBarrier("after-" + step) } + "end routers that are running while nodes are removed" taggedAs LongRunningTest in within(30.seconds) { + if (exerciseActors) { + runOn(roles.take(3): _*) { + val m = master + m must not be (None) + m.get.tell(End, testActor) + val workResult = awaitWorkResult + workResult.sendCount must be > (0L) + workResult.ackCount must be > (0L) + } + enterBarrier("after-" + step) + } + } + + "log jvm info" taggedAs LongRunningTest in { + if (infolog) { + log.info("StressSpec JVM:\n{}", jvmInfo) + enterBarrier("after-" + step) + } + } + } } diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala index 086d4f2f6d..c75e052993 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -269,7 +269,6 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: } } system.shutdown() - val shutdownTimeout = 5.seconds.dilated try system.awaitTermination(shutdownTimeout) catch { case _: TimeoutException ⇒ val msg = "Failed to stop [%s] within [%s] \n%s".format(system.name, shutdownTimeout, @@ -280,6 +279,8 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: afterTermination() } + def shutdownTimeout: FiniteDuration = 5.seconds.dilated + /** * Override this and return `true` to assert that the * shutdown of the `ActorSystem` was done properly. 
@@ -359,12 +360,12 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: */ def node(role: RoleName): ActorPath = RootActorPath(testConductor.getAddressFor(role).await) - def muteDeadLetters(endPatterns: String*)(sys: ActorSystem = system): Unit = + def muteDeadLetters(messageClasses: Class[_]*)(sys: ActorSystem = system): Unit = if (!sys.log.isDebugEnabled) { - def mute(suffix: String): Unit = - sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead.*" + suffix))) - if (endPatterns.isEmpty) mute("") - else endPatterns foreach mute + def mute(clazz: Class[_]): Unit = + sys.eventStream.publish(Mute(DeadLettersFilter(clazz)(occurrences = Int.MaxValue))) + if (messageClasses.isEmpty) mute(classOf[AnyRef]) + else messageClasses foreach mute } /** diff --git a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala index 2c52df1c45..53732e78a1 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteWatcherSpec.scala @@ -91,7 +91,10 @@ class RemoteWatcherSpec extends AkkaSpec( val remoteAddress = remoteSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress def remoteAddressUid = AddressUidExtension(remoteSystem).addressUid - Seq(system, remoteSystem).foreach(muteDeadLetters("Disassociated.*", "DisassociateUnderlying.*")(_)) + Seq(system, remoteSystem).foreach(muteDeadLetters( + akka.remote.transport.AssociationHandle.Disassociated.getClass, + akka.remote.transport.ActorTransportAdapter.DisassociateUnderlying.getClass, + akka.dispatch.NullMessage.getClass)(_)) override def afterTermination() { remoteSystem.shutdown() diff --git a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala index 51541814d7..0d1101cd9c 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala @@ -4,7 +4,6 @@ package akka.testkit import language.existentials - import scala.util.matching.Regex import scala.collection.immutable import scala.concurrent.duration.Duration @@ -16,6 +15,7 @@ import akka.event.Logging import akka.actor.NoSerializationVerificationNeeded import akka.japi.Util.immutableSeq import java.lang.{ Iterable ⇒ JIterable } +import akka.util.BoxedType /** * Implementation helpers of the EventFilter facilities: send `Mute` @@ -445,6 +445,25 @@ case class CustomEventFilter(test: PartialFunction[LogEvent, Boolean])(occurrenc } } +object DeadLettersFilter { + def apply[T](implicit t: ClassTag[T]): DeadLettersFilter = + new DeadLettersFilter(t.runtimeClass.asInstanceOf[Class[T]])(Int.MaxValue) +} +/** + * Filter which matches DeadLetter events, if the wrapped message conforms to the + * given type. + */ +case class DeadLettersFilter(val messageClass: Class[_])(occurrences: Int) extends EventFilter(occurrences) { + + def matches(event: LogEvent) = { + event match { + case Warning(_, _, msg) ⇒ BoxedType(messageClass) isInstance msg + case _ ⇒ false + } + } + +} + /** * EventListener for running tests, which allows selectively filtering out * expected messages. 
To use it, include something like this into @@ -469,15 +488,16 @@ class TestEventListener extends Logging.DefaultLogger { case Mute(filters) ⇒ filters foreach addFilter case UnMute(filters) ⇒ filters foreach removeFilter case event: LogEvent ⇒ if (!filter(event)) print(event) - case DeadLetter(msg: SystemMessage, _, rcp) ⇒ - if (!msg.isInstanceOf[Terminate]) { - val event = Warning(rcp.path.toString, rcp.getClass, "received dead system message: " + msg) - if (!filter(event)) print(event) - } case DeadLetter(msg, snd, rcp) ⇒ - if (!msg.isInstanceOf[Terminated]) { - val event = Warning(rcp.path.toString, rcp.getClass, "received dead letter from " + snd + ": " + msg) - if (!filter(event)) print(event) + if (!msg.isInstanceOf[Terminate]) { + val event = Warning(rcp.path.toString, rcp.getClass, msg) + if (!filter(event)) { + val msgStr = + if (msg.isInstanceOf[SystemMessage]) "received dead system message: " + msg + else "received dead letter from " + snd + ": " + msg + val event2 = Warning(rcp.path.toString, rcp.getClass, msgStr) + if (!filter(event2)) print(event2) + } } case UnhandledMessage(msg, sender, rcp) ⇒ val event = Warning(rcp.path.toString, rcp.getClass, "unhandled message from " + sender + ": " + msg) diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index b771d8a1d5..123e55470d 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -238,7 +238,7 @@ trait TestKitBase { * Note that the timeout is scaled using Duration.dilated, * which uses the configuration entry "akka.test.timefactor". */ - def awaitAssert(a: ⇒ Any, max: Duration = Duration.Undefined, interval: Duration = 100.millis) { + def awaitAssert(a: ⇒ Any, max: Duration = Duration.Undefined, interval: Duration = 800.millis) { val _max = remainingOrDilated(max) val stop = now + _max diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index e5e9e1b67c..be193fe17c 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -94,12 +94,12 @@ abstract class AkkaSpec(_system: ActorSystem) override def expectedTestDuration: FiniteDuration = 60 seconds - def muteDeadLetters(endPatterns: String*)(sys: ActorSystem = system): Unit = + def muteDeadLetters(messageClasses: Class[_]*)(sys: ActorSystem = system): Unit = if (!sys.log.isDebugEnabled) { - def mute(suffix: String): Unit = - sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead.*" + suffix))) - if (endPatterns.isEmpty) mute("") - else endPatterns foreach mute + def mute(clazz: Class[_]): Unit = + sys.eventStream.publish(Mute(DeadLettersFilter(clazz)(occurrences = Int.MaxValue))) + if (messageClasses.isEmpty) mute(classOf[AnyRef]) + else messageClasses foreach mute } } diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 9b551910fc..6687e0ac2d 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -580,13 +580,14 @@ object AkkaBuild extends Build { // multinode.D= and multinode.X= makes it possible to pass arbitrary // -D or -X arguments to the forked jvm, e.g. 
// -Dmultinode.Djava.net.preferIPv4Stack=true -Dmultinode.Xmx512m -Dmultinode.XX:MaxPermSize=256M
+    // -DMultiJvm.akka.cluster.Stress.nrOfNodes=15
     val MultinodeJvmArgs = "multinode\\.(D|X)(.*)".r
+    val knownPrefix = Set("multinode.", "akka.", "MultiJvm.")
     val akkaProperties = System.getProperties.propertyNames.asScala.toList.collect {
       case MultinodeJvmArgs(a, b) =>
         val value = System.getProperty("multinode." + a + b)
         "-" + a + b + (if (value == "") "" else "=" + value)
-      case key: String if key.startsWith("multinode.") => "-D" + key + "=" + System.getProperty(key)
-      case key: String if key.startsWith("akka.") => "-D" + key + "=" + System.getProperty(key)
+      case key: String if knownPrefix.exists(pre => key.startsWith(pre)) => "-D" + key + "=" + System.getProperty(key)
     }
     "-Xmx256m" :: akkaProperties :::
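
Note on the dead-letter muting change above: `muteDeadLetters` now takes the message classes themselves instead of `toString` regex patterns, and the new `DeadLettersFilter` mutes a dead letter when the (boxed) message conforms to the given class, so subclasses are covered too. A minimal sketch of how a test would call the new signature — the spec name and the `WorkerResult` message are hypothetical; only `muteDeadLetters` and `PoisonPill` come from this diff:

```scala
import akka.actor.PoisonPill
import akka.testkit.AkkaSpec

class MuteDeadLettersSampleSpec extends AkkaSpec {
  // hypothetical application message, used only for illustration
  case class WorkerResult(value: Int)

  // mute dead letters whose message is a WorkerResult or a PoisonPill;
  // the second parameter list defaults to this spec's ActorSystem
  muteDeadLetters(classOf[WorkerResult], classOf[PoisonPill])()

  // with no classes given, all dead letters are muted (falls back to classOf[AnyRef])
  // muteDeadLetters()()
}
```

Matching on the message class rather than on rendered strings is what makes the muting robust against changes in `toString` output.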
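The node count for the stress test is now driven by a system property and validated inline with `requiring` from `akka.util.Helpers` (imported at the top of `StressSpec.scala`). A simplified sketch of that combinator, assuming the real implementation has this shape — essentially a chainable `require` that returns the validated value:

```scala
object RequiringSketch {
  // simplified stand-in for akka.util.Helpers.Requiring:
  // checks the predicate and hands the value back, so the validation
  // can sit inline in a val definition
  implicit class Requiring[A](val value: A) extends AnyVal {
    def requiring(cond: A => Boolean, msg: => Any): A = {
      require(cond(value), msg)
      value
    }
  }

  // mirrors the diff: default to 13 nodes, reject anything below 10
  val totalNumberOfNodes: Int =
    System.getProperty("MultiJvm.akka.cluster.Stress.nrOfNodes") match {
      case null  => 13
      case value => value.toInt requiring (_ >= 10, "nrOfNodes must be >= 10")
    }
}
```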