Enable usage of MultiJvm nrOfNodes in cluster StressSpec, see #2787

* Adjustments to StressSpec for testing large clusters
* Performance improvement of mute deadLetters
Author: Patrik Nordwall, 2013-04-23 15:05:27 +02:00
Parent: 6b51b4d824
Commit: 33a8808a6d
9 changed files with 302 additions and 130 deletions


@@ -72,7 +72,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
   implicit val ec = system.dispatcher
   import akka.routing.RoutingSpec._

-  muteDeadLetters("DeathWatchNotification.*")()
+  muteDeadLetters(classOf[akka.dispatch.sysmsg.DeathWatchNotification])()

   "routers in general" must {


@@ -105,16 +105,18 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro
     }
     muteDeadLetters(
-      "Heartbeat.*",
-      "GossipEnvelope.*",
-      "ClusterMetricsChanged.*",
-      "Disassociated.*",
-      "DisassociateUnderlying.*",
-      "HandleListenerRegistered.*",
-      "PoisonPill.*",
-      "DeathWatchNotification.*",
-      "NullMessage.*",
-      "InboundPayload.*")(sys)
+      classOf[ClusterHeartbeatReceiver.Heartbeat],
+      classOf[ClusterHeartbeatReceiver.EndHeartbeat],
+      classOf[GossipEnvelope],
+      classOf[MetricsGossipEnvelope],
+      classOf[ClusterEvent.ClusterMetricsChanged],
+      classOf[InternalClusterAction.Tick],
+      classOf[akka.actor.PoisonPill],
+      classOf[akka.dispatch.sysmsg.DeathWatchNotification],
+      akka.dispatch.NullMessage.getClass,
+      akka.remote.transport.AssociationHandle.Disassociated.getClass,
+      akka.remote.transport.ActorTransportAdapter.DisassociateUnderlying.getClass,
+      classOf[akka.remote.transport.AssociationHandle.InboundPayload])(sys)
   }
 }

@@ -290,6 +292,8 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro
     }
   }

+  def awaitAllReachable(): Unit = awaitAssert(clusterView.unreachableMembers.isEmpty)
+
   /**
    * Wait until the specified nodes have seen the same gossip overview.
    */


@@ -38,6 +38,8 @@ import akka.testkit._
 import akka.testkit.TestEvent._
 import akka.actor.Identify
 import akka.actor.ActorIdentity
+import akka.util.Helpers.Requiring
+import java.lang.management.ManagementFactory

 /**
  * This test is intended to be used as long running stress test
@@ -50,27 +52,38 @@ import akka.actor.ActorIdentity
  * 4. exercise cluster aware routers, including high throughput
  * 5. exercise many actors in a tree structure
  * 6. exercise remote supervision
- * 7. leave and shutdown nodes in various ways
- * 8. while nodes are removed remote death watch is also exercised
- * 9. while nodes are removed a few cluster aware routers are also working
+ * 7. gossip without any changes to the membership
+ * 8. leave and shutdown nodes in various ways
+ * 9. while nodes are removed remote death watch is also exercised
+ * 10. while nodes are removed a few cluster aware routers are also working
+ *
+ * By default it uses 13 nodes.
+ * Example of sbt command line parameters to double that:
+ * `-DMultiJvm.akka.cluster.Stress.nrOfNodes=26 -Dmultinode.Dakka.test.cluster-stress-spec.nr-of-nodes-factor=2`
  */
 private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {

+  val totalNumberOfNodes =
+    System.getProperty("MultiJvm.akka.cluster.Stress.nrOfNodes") match {
+      case null  ⇒ 13
+      case value ⇒ value.toInt requiring (_ >= 10, "nrOfNodes must be >= 10")
+    }
+
+  for (n ← 1 to totalNumberOfNodes) role("node-" + n)
+
   // Note that this test uses default configuration,
   // not MultiNodeClusterSpec.clusterConfig
   commonConfig(ConfigFactory.parseString("""
     akka.test.cluster-stress-spec {
-      log-stats = off
+      infolog = off
       # scale the nr-of-nodes* settings with this factor
       nr-of-nodes-factor = 1
-      nr-of-nodes = 13
       # not scaled
       nr-of-seed-nodes = 3
       nr-of-nodes-joining-to-seed-initally = 2
       nr-of-nodes-joining-one-by-one-small = 2
       nr-of-nodes-joining-one-by-one-large = 2
       nr-of-nodes-joining-to-one = 2
-      nr-of-nodes-joining-to-seed = 2
       nr-of-nodes-leaving-one-by-one-small = 1
       nr-of-nodes-leaving-one-by-one-large = 2
       nr-of-nodes-leaving = 2
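
`requiring`, imported above via akka.util.Helpers.Requiring, validates a value inline and returns it, so it can sit in the middle of an expression. A minimal sketch of its semantics (an illustration, not the actual Akka source):

    implicit final class Requiring[A](val value: A) extends AnyVal {
      // Return the value unchanged when the predicate holds,
      // otherwise fail with IllegalArgumentException via require.
      def requiring(predicate: A ⇒ Boolean, msg: ⇒ Any): A = {
        require(predicate(value), msg)
        value
      }
    }

    // As used above: yields 26, or throws if fewer than 10 nodes are requested.
    val n = "26".toInt requiring (_ >= 10, "nrOfNodes must be >= 10")
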
@@ -89,6 +102,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
       high-throughput-duration = 10s
       supervision-duration = 10s
       supervision-one-iteration = 2.5s
+      idle-gossip-duration = 10s
       expected-test-duration = 600s
       # actors are created in a tree structure defined
       # by tree-width (number of children for each actor) and
@@ -99,6 +113,8 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
       report-metrics-interval = 10s
       # scale convergence within timeouts with this factor
       convergence-within-factor = 1.0
+      # set to off to only test cluster membership
+      exercise-actors = on
     }

     akka.actor.provider = akka.cluster.ClusterActorRefProvider
@@ -111,6 +127,12 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
     akka.loglevel = INFO
     akka.remote.log-remote-lifecycle-events = off
+    #akka.scheduler.tick-duration = 33 ms
+    akka.actor.default-dispatcher.fork-join-executor {
+      parallelism-min = 8
+      parallelism-max = 8
+    }
+
     akka.actor.deployment {
       /master-node-1/workers {
         router = round-robin
@@ -148,16 +170,18 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
     private def getDuration(name: String): FiniteDuration = Duration(getMilliseconds(name), MILLISECONDS)

-    val logStats = getBoolean("log-stats")
+    val infolog = getBoolean("infolog")
     val nFactor = getInt("nr-of-nodes-factor")
-    val totalNumberOfNodes = getInt("nr-of-nodes") * nFactor ensuring (
-      _ >= 10, "nr-of-nodes must be >= 10")
     val numberOfSeedNodes = getInt("nr-of-seed-nodes") // not scaled by nodes factor
     val numberOfNodesJoiningToSeedNodesInitially = getInt("nr-of-nodes-joining-to-seed-initally") * nFactor
     val numberOfNodesJoiningOneByOneSmall = getInt("nr-of-nodes-joining-one-by-one-small") * nFactor
     val numberOfNodesJoiningOneByOneLarge = getInt("nr-of-nodes-joining-one-by-one-large") * nFactor
     val numberOfNodesJoiningToOneNode = getInt("nr-of-nodes-joining-to-one") * nFactor
-    val numberOfNodesJoiningToSeedNodes = getInt("nr-of-nodes-joining-to-seed") * nFactor
+    // remaining will join to seed nodes
+    val numberOfNodesJoiningToSeedNodes = (totalNumberOfNodes - numberOfSeedNodes -
+      numberOfNodesJoiningToSeedNodesInitially - numberOfNodesJoiningOneByOneSmall -
+      numberOfNodesJoiningOneByOneLarge - numberOfNodesJoiningToOneNode) requiring (_ >= 0,
+      s"too many configured nr-of-nodes-joining-*, total must be <= ${totalNumberOfNodes}")
     val numberOfNodesLeavingOneByOneSmall = getInt("nr-of-nodes-leaving-one-by-one-small") * nFactor
     val numberOfNodesLeavingOneByOneLarge = getInt("nr-of-nodes-leaving-one-by-one-large") * nFactor
     val numberOfNodesLeaving = getInt("nr-of-nodes-leaving") * nFactor
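
With the defaults above (13 nodes, factor 1) the derived value works out to 13 − 3 − 2 − 2 − 2 − 2 = 2 nodes joining via the seed nodes, so every role is accounted for and the old nr-of-nodes-joining-to-seed setting becomes redundant.
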
@@ -175,11 +199,13 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
     val highThroughputDuration = getDuration("high-throughput-duration") * dFactor
     val supervisionDuration = getDuration("supervision-duration") * dFactor
     val supervisionOneIteration = getDuration("supervision-one-iteration") * dFactor
+    val idleGossipDuration = getDuration("idle-gossip-duration") * dFactor
     val expectedTestDuration = getDuration("expected-test-duration") * dFactor
     val treeWidth = getInt("tree-width")
     val treeLevels = getInt("tree-levels")
     val reportMetricsInterval = getDuration("report-metrics-interval")
     val convergenceWithinFactor = getDouble("convergence-within-factor")
+    val exerciseActors = getBoolean("exercise-actors")

     require(numberOfSeedNodes + numberOfNodesJoiningToSeedNodesInitially + numberOfNodesJoiningOneByOneSmall +
       numberOfNodesJoiningOneByOneLarge + numberOfNodesJoiningToOneNode + numberOfNodesJoiningToSeedNodes <= totalNumberOfNodes,
@@ -191,10 +217,11 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
       s"specified number of leaving/shutdown nodes <= ${totalNumberOfNodes - 3}")
     require(numberOfNodesJoinRemove <= totalNumberOfNodes, s"nr-of-nodes-join-remove must be <= ${totalNumberOfNodes}")
-  }

-  // FIXME configurable number of nodes
-  for (n ← 1 to 13) role("node-" + n)
+    override def toString: String = {
+      testConfig.withFallback(ConfigFactory.parseString(s"nrOfNodes=${totalNumberOfNodes}")).root.render
+    }
+  }

   implicit class FormattedDouble(val d: Double) extends AnyVal {
     def form: String = d.formatted("%.2f")
@@ -215,7 +242,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
   */
  class ClusterResultAggregator(title: String, expectedResults: Int, settings: Settings) extends Actor with ActorLogging {
    import settings.reportMetricsInterval
-   import settings.logStats
+   import settings.infolog
    val cluster = Cluster(context.system)
    var reportTo: Option[ActorRef] = None
    var results = Vector.empty[ClusterResult]
@@ -246,13 +273,13 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
      case PhiResult(from, phiValues) ⇒ phiValuesObservedByNode += from -> phiValues
      case StatsResult(from, stats)   ⇒ clusterStatsObservedByNode += from -> stats
      case ReportTick ⇒
-       if (logStats)
+       if (infolog)
          log.info(s"[${title}] in progress\n${formatMetrics}\n\n${formatPhi}\n\n${formatStats}")
      case r: ClusterResult ⇒
        results :+= r
        if (results.size == expectedResults) {
          val aggregated = AggregatedClusterResult(title, maxDuration, totalClusterStats)
-         if (logStats)
+         if (infolog)
            log.info(s"[${title}] completed in [${aggregated.duration.toMillis}] ms\n${aggregated.clusterStats}\n${formatMetrics}\n\n${formatPhi}\n\n${formatStats}")
          reportTo foreach { _ ! aggregated }
          context stop self
@@ -270,7 +297,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
      (formatMetricsHeader +: (nodeMetrics.toSeq.sortBy(_.address) map formatMetricsLine)).mkString("\n")
    }

-   def formatMetricsHeader: String = "Node\tHeap (MB)\tCPU (%)\tLoad"
+   def formatMetricsHeader: String = "[Node]\t[Heap (MB)]\t[CPU (%)]\t[Load]"

    def formatMetricsLine(nodeMetrics: NodeMetrics): String = {
      val heap = nodeMetrics match {
@@ -305,14 +332,14 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
      }
    }

-   def formatPhiHeader: String = "Monitor\tSubject\tcount\tcount phi > 1.0\tmax phi"
+   def formatPhiHeader: String = "[Monitor]\t[Subject]\t[count]\t[count phi > 1.0]\t[max phi]"

    def formatPhiLine(monitor: Address, subject: Address, phi: PhiValue): String =
      s"${monitor}\t${subject}\t${phi.count}\t${phi.countAboveOne}\t${phi.max.form}"

    def formatStats: String =
      (clusterStatsObservedByNode map { case (monitor, stats) ⇒ s"${monitor}\t${stats}" }).
-       mkString("ClusterStats\n", "\n", "")
+       mkString("ClusterStats(gossip, merge, same, newer, older)\n", "\n", "")
  }

  /**
@@ -332,7 +359,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
    def formatHistory: String =
      (formatHistoryHeader +: (history map formatHistoryLine)).mkString("\n")

-   def formatHistoryHeader: String = "title\tduration (ms)\tcluster stats"
+   def formatHistoryHeader: String = "[Title]\t[Duration (ms)]\t[ClusterStats(gossip, merge, same, newer, older)]"

    def formatHistoryLine(result: AggregatedClusterResult): String =
      s"${result.title}\t${result.duration.toMillis}\t${result.clusterStats}"
@@ -385,7 +412,15 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
        reportTo foreach { _ ! PhiResult(cluster.selfAddress, phiSet) }
      case state: CurrentClusterState ⇒ nodes = state.members.map(_.address)
      case memberEvent: MemberEvent   ⇒ nodes += memberEvent.member.address
-     case ReportTo(ref)              ⇒ reportTo = ref
+     case ReportTo(ref) ⇒
+       reportTo foreach context.unwatch
+       reportTo = ref
+       reportTo foreach context.watch
+     case Terminated(ref) ⇒
+       reportTo match {
+         case Some(`ref`) ⇒ reportTo = None
+         case _           ⇒
+       }
      case Reset ⇒
        phiByNode = emptyPhiByNode
        nodes = Set.empty[Address]
@@ -414,7 +449,14 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
        val res = StatsResult(cluster.selfAddress, cluster.readView.latestStats :- startStats)
        reportTo foreach { _ ! res }
      case ReportTo(ref) ⇒
+       reportTo foreach context.unwatch
        reportTo = ref
+       reportTo foreach context.watch
+     case Terminated(ref) ⇒
+       reportTo match {
+         case Some(`ref`) ⇒ reportTo = None
+         case _           ⇒
+       }
      case Reset ⇒
        startStats = cluster.readView.latestStats
    }
@@ -527,7 +569,7 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
      case TreeJob(id, payload, idx, levels, width) ⇒
        // create the actors when first TreeJob message is received
        val totalActors = ((width * math.pow(width, levels) - 1) / (width - 1)).toInt
-       log.info("Creating [{}] actors in a tree structure of [{}] levels and each actor has [{}] children",
+       log.debug("Creating [{}] actors in a tree structure of [{}] levels and each actor has [{}] children",
          totalActors, levels, width)
        val tree = context.actorOf(Props(classOf[TreeNode], levels, width), "tree")
        tree forward ((idx, SimpleJob(id, payload)))
@@ -665,13 +707,48 @@ abstract class StressSpec
   override def expectedTestDuration = settings.expectedTestDuration

+  override def shutdownTimeout: FiniteDuration = 30.seconds.dilated
+
   override def muteLog(sys: ActorSystem = system): Unit = {
     super.muteLog(sys)
     sys.eventStream.publish(Mute(EventFilter[RuntimeException](pattern = ".*Simulated exception.*")))
-    sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*PhiResult.*")))
-    sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*SendBatch.*")))
-    sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*ClusterStats.*")))
-    muteDeadLetters("SimpleJob.*", "Tick.*", "AggregatedClusterResult.*")(sys)
+    muteDeadLetters(classOf[SimpleJob], classOf[AggregatedClusterResult], SendBatch.getClass,
+      classOf[StatsResult], classOf[PhiResult], RetryTick.getClass)(sys)
+  }
+
+  def jvmInfo(): String = {
+    val runtime = ManagementFactory.getRuntimeMXBean
+    val os = ManagementFactory.getOperatingSystemMXBean
+    val threads = ManagementFactory.getThreadMXBean
+    val mem = ManagementFactory.getMemoryMXBean
+    val heap = mem.getHeapMemoryUsage
+
+    val sb = new StringBuilder
+    sb.append("Operating system: ").append(os.getName).append(", ").append(os.getArch).append(", ").append(os.getVersion)
+    sb.append("\n")
+    sb.append("JVM: ").append(runtime.getVmName).append(" ").append(runtime.getVmVendor).
+      append(" ").append(runtime.getVmVersion)
+    sb.append("\n")
+    sb.append("Processors: ").append(os.getAvailableProcessors)
+    sb.append("\n")
+    sb.append("Load average: ").append(os.getSystemLoadAverage)
+    sb.append("\n")
+    sb.append("Thread count: ").append(threads.getThreadCount).append(" (").append(threads.getPeakThreadCount).append(")")
+    sb.append("\n")
+    sb.append("Heap: ").append((heap.getUsed.toDouble / 1024 / 1024).form).
+      append(" (").append((heap.getInit.toDouble / 1024 / 1024).form).
+      append(" - ").
+      append((heap.getMax.toDouble / 1024 / 1024).form).
+      append(")").append(" MB")
+    sb.append("\n")
+
+    import scala.collection.JavaConverters._
+    val args = runtime.getInputArguments.asScala.filterNot(_.contains("classpath")).mkString("\n  ")
+    sb.append("Args:\n  ").append(args)
+    sb.append("\n")
+
+    sb.toString
   }

   val seedNodes = roles.take(numberOfSeedNodes)
@@ -689,7 +766,7 @@ abstract class StressSpec
     runOn(roles.head) {
       val aggregator = system.actorOf(Props(classOf[ClusterResultAggregator], title, expectedResults, settings),
         name = "result" + step)
-      if (includeInHistory) aggregator ! ReportTo(Some(clusterResultHistory))
+      if (includeInHistory && infolog) aggregator ! ReportTo(Some(clusterResultHistory))
       else aggregator ! ReportTo(None)
     }
     enterBarrier("result-aggregator-created-" + step)
@@ -706,7 +783,7 @@ abstract class StressSpec
   }

   lazy val clusterResultHistory =
-    if (settings.logStats) system.actorOf(Props[ClusterResultHistory], "resultHistory")
+    if (settings.infolog) system.actorOf(Props[ClusterResultHistory], "resultHistory")
     else system.deadLetters

   lazy val phiObserver = system.actorOf(Props[PhiObserver], "phiObserver")
@@ -803,11 +880,13 @@ abstract class StressSpec
       reportResult {
         runOn(roles.head) {
           if (shutdown) {
-            log.info("Shutting down [{}]", removeAddress)
+            if (infolog)
+              log.info("Shutting down [{}]", removeAddress)
             testConductor.exit(removeRole, 0).await
           }
         }
         awaitMembersUp(currentRoles.size, timeout = remaining)
+        awaitAllReachable()
       }
     }
@@ -836,11 +915,13 @@ abstract class StressSpec
       reportResult {
         runOn(roles.head) {
           if (shutdown) removeRoles.foreach { r ⇒
-            log.info("Shutting down [{}]", address(r))
+            if (infolog)
+              log.info("Shutting down [{}]", address(r))
             testConductor.exit(r, 0).await
           }
         }
         awaitMembersUp(currentRoles.size, timeout = remaining)
+        awaitAllReachable()
       }
     }
     awaitClusterResult()
@@ -891,6 +972,7 @@ abstract class StressSpec
           nbrUsedRoles + activeRoles.size,
           canNotBePartOfMemberRing = allPreviousAddresses,
           timeout = remaining)
+        awaitAllReachable()
       }
       val nextAddresses = clusterView.members.map(_.address) -- usedAddresses
       runOn(usedRoles: _*) {
@@ -912,6 +994,7 @@ abstract class StressSpec
     within(loopDuration) {
       runOn(usedRoles: _*) {
         awaitMembersUp(nbrUsedRoles, timeout = remaining)
+        awaitAllReachable()
         phiObserver ! Reset
         statsObserver ! Reset
       }
@@ -930,6 +1013,7 @@ abstract class StressSpec
   def exerciseRouters(title: String, duration: FiniteDuration, batchInterval: FiniteDuration,
                       expectDroppedMessages: Boolean, tree: Boolean): Unit =
     within(duration + 10.seconds) {
+      nbrUsedRoles must be(totalNumberOfNodes)
       createResultAggregator(title, expectedResults = nbrUsedRoles, includeInHistory = false)

       val (masterRoles, otherRoles) = roles.take(nbrUsedRoles).splitAt(3)
@@ -962,7 +1046,7 @@ abstract class StressSpec
   def awaitWorkResult: WorkResult = {
     val workResult = expectMsgType[WorkResult]
-    if (settings.logStats)
+    if (settings.infolog)
       log.info("{} result, [{}] jobs/s, retried [{}] of [{}] msg", masterName,
         workResult.jobsPerSecond.form,
         workResult.retryCount, workResult.sendCount)
@@ -982,30 +1066,40 @@ abstract class StressSpec
       for (count ← 0 until rounds) {
         createResultAggregator(title, expectedResults = nbrUsedRoles, includeInHistory = false)
-        reportResult {
-          roles.take(nbrUsedRoles) foreach { r ⇒
-            supervisor ! Props[RemoteChild].withDeploy(Deploy(scope = RemoteScope(address(r))))
-          }
-          supervisor ! GetChildrenCount
-          expectMsgType[ChildrenCount] must be(ChildrenCount(nbrUsedRoles, 0))
-
-          1 to 5 foreach { _ ⇒ supervisor ! new RuntimeException("Simulated exception") }
-          awaitAssert {
-            supervisor ! GetChildrenCount
-            val c = expectMsgType[ChildrenCount]
-            c must be(ChildrenCount(nbrUsedRoles, 5 * nbrUsedRoles))
-          }
-
-          // after 5 restart attempts the children should be stopped
-          supervisor ! new RuntimeException("Simulated exception")
-          awaitAssert {
-            supervisor ! GetChildrenCount
-            val c = expectMsgType[ChildrenCount]
-            // zero children
-            c must be(ChildrenCount(0, 6 * nbrUsedRoles))
-          }
-          supervisor ! Reset
+        val (masterRoles, otherRoles) = roles.take(nbrUsedRoles).splitAt(3)
+        runOn(masterRoles: _*) {
+          reportResult {
+            roles.take(nbrUsedRoles) foreach { r ⇒
+              supervisor ! Props[RemoteChild].withDeploy(Deploy(scope = RemoteScope(address(r))))
+            }
+            supervisor ! GetChildrenCount
+            expectMsgType[ChildrenCount] must be(ChildrenCount(nbrUsedRoles, 0))
+
+            1 to 5 foreach { _ ⇒ supervisor ! new RuntimeException("Simulated exception") }
+            awaitAssert {
+              supervisor ! GetChildrenCount
+              val c = expectMsgType[ChildrenCount]
+              c must be(ChildrenCount(nbrUsedRoles, 5 * nbrUsedRoles))
+            }
+
+            // after 5 restart attempts the children should be stopped
+            supervisor ! new RuntimeException("Simulated exception")
+            awaitAssert {
+              supervisor ! GetChildrenCount
+              val c = expectMsgType[ChildrenCount]
+              // zero children
+              c must be(ChildrenCount(0, 6 * nbrUsedRoles))
+            }
+            supervisor ! Reset
+          }
+          enterBarrier("supervision-done-" + step)
+        }
+        runOn(otherRoles: _*) {
+          reportResult {
+            enterBarrier("supervision-done-" + step)
+          }
         }

         awaitClusterResult()
@@ -1013,8 +1107,28 @@ abstract class StressSpec
     }
   }

+  def idleGossip(title: String): Unit = {
+    createResultAggregator(title, expectedResults = nbrUsedRoles, includeInHistory = true)
+    reportResult {
+      clusterView.members.size must be(nbrUsedRoles)
+      Thread.sleep(idleGossipDuration.toMillis)
+      clusterView.members.size must be(nbrUsedRoles)
+    }
+    awaitClusterResult()
+  }
+
   "A cluster under stress" must {

+    "log settings" taggedAs LongRunningTest in {
+      if (infolog) {
+        log.info("StressSpec JVM:\n{}", jvmInfo)
+        runOn(roles.head) {
+          log.info("StressSpec settings:\n{}", settings)
+        }
+      }
+      enterBarrier("after-" + step)
+    }
+
     "join seed nodes" taggedAs LongRunningTest in within(30 seconds) {

       val otherNodesJoiningSeedNodes = roles.slice(numberOfSeedNodes, numberOfSeedNodes + numberOfNodesJoiningToSeedNodesInitially)
@@ -1040,7 +1154,6 @@ abstract class StressSpec
         system.actorOf(Props(classOf[Master], settings, settings.workBatchInterval, false),
           name = "master-" + myself.name) ! Begin
       }
-      enterBarrier("after-" + step)
     }

     "join nodes one-by-one to small cluster" taggedAs LongRunningTest in {
@@ -1055,8 +1168,10 @@ abstract class StressSpec
     }

     "join several nodes to seed nodes" taggedAs LongRunningTest in {
-      joinSeveral(numberOfNodesJoiningToOneNode, toSeedNodes = true)
-      nbrUsedRoles += numberOfNodesJoiningToSeedNodes
+      if (numberOfNodesJoiningToSeedNodes > 0) {
+        joinSeveral(numberOfNodesJoiningToSeedNodes, toSeedNodes = true)
+        nbrUsedRoles += numberOfNodesJoiningToSeedNodes
+      }
       enterBarrier("after-" + step)
     }
@@ -1066,40 +1181,50 @@ abstract class StressSpec
     }

     "end routers that are running while nodes are joining" taggedAs LongRunningTest in within(30.seconds) {
-      runOn(roles.take(3): _*) {
-        val m = master
-        m must not be (None)
-        m.get.tell(End, testActor)
-        val workResult = awaitWorkResult
-        workResult.retryCount must be(0)
-        workResult.sendCount must be > (0L)
-        workResult.ackCount must be > (0L)
+      if (exerciseActors) {
+        runOn(roles.take(3): _*) {
+          val m = master
+          m must not be (None)
+          m.get.tell(End, testActor)
+          val workResult = awaitWorkResult
+          workResult.retryCount must be(0)
+          workResult.sendCount must be > (0L)
+          workResult.ackCount must be > (0L)
+        }
+        enterBarrier("after-" + step)
       }
-      enterBarrier("after-" + step)
     }

     "use routers with normal throughput" taggedAs LongRunningTest in {
-      exerciseRouters("use routers with normal throughput", normalThroughputDuration,
-        batchInterval = workBatchInterval, expectDroppedMessages = false, tree = false)
-      enterBarrier("after-" + step)
+      if (exerciseActors) {
+        exerciseRouters("use routers with normal throughput", normalThroughputDuration,
+          batchInterval = workBatchInterval, expectDroppedMessages = false, tree = false)
+        enterBarrier("after-" + step)
+      }
     }

     "use routers with high throughput" taggedAs LongRunningTest in {
-      exerciseRouters("use routers with high throughput", highThroughputDuration,
-        batchInterval = Duration.Zero, expectDroppedMessages = false, tree = false)
-      enterBarrier("after-" + step)
+      if (exerciseActors) {
+        exerciseRouters("use routers with high throughput", highThroughputDuration,
+          batchInterval = Duration.Zero, expectDroppedMessages = false, tree = false)
+        enterBarrier("after-" + step)
+      }
     }

     "use many actors with normal throughput" taggedAs LongRunningTest in {
-      exerciseRouters("use many actors with normal throughput", normalThroughputDuration,
-        batchInterval = workBatchInterval, expectDroppedMessages = false, tree = true)
-      enterBarrier("after-" + step)
+      if (exerciseActors) {
+        exerciseRouters("use many actors with normal throughput", normalThroughputDuration,
+          batchInterval = workBatchInterval, expectDroppedMessages = false, tree = true)
+        enterBarrier("after-" + step)
+      }
     }

     "use many actors with high throughput" taggedAs LongRunningTest in {
-      exerciseRouters("use many actors with high throughput", highThroughputDuration,
-        batchInterval = Duration.Zero, expectDroppedMessages = false, tree = true)
-      enterBarrier("after-" + step)
+      if (exerciseActors) {
+        exerciseRouters("use many actors with high throughput", highThroughputDuration,
+          batchInterval = Duration.Zero, expectDroppedMessages = false, tree = true)
+        enterBarrier("after-" + step)
+      }
     }

     "exercise join/remove/join/remove" taggedAs LongRunningTest in {
@@ -1108,16 +1233,25 @@ abstract class StressSpec
     }

     "exercise supervision" taggedAs LongRunningTest in {
-      exerciseSupervision("exercise supervision", supervisionDuration, supervisionOneIteration)
+      if (exerciseActors) {
+        exerciseSupervision("exercise supervision", supervisionDuration, supervisionOneIteration)
+        enterBarrier("after-" + step)
+      }
+    }
+
+    "gossip when idle" taggedAs LongRunningTest in {
+      idleGossip("idle gossip")
       enterBarrier("after-" + step)
     }

     "start routers that are running while nodes are removed" taggedAs LongRunningTest in {
-      runOn(roles.take(3): _*) {
-        system.actorOf(Props(classOf[Master], settings, settings.workBatchInterval, false),
-          name = "master-" + myself.name) ! Begin
+      if (exerciseActors) {
+        runOn(roles.take(3): _*) {
+          system.actorOf(Props(classOf[Master], settings, settings.workBatchInterval, false),
+            name = "master-" + myself.name) ! Begin
+        }
+        enterBarrier("after-" + step)
       }
-      enterBarrier("after-" + step)
     }

     "leave nodes one-by-one from large cluster" taggedAs LongRunningTest in {
@@ -1142,27 +1276,36 @@ abstract class StressSpec
       enterBarrier("after-" + step)
     }

-    "leave nodes one-by-one from small cluster" taggedAs LongRunningTest in {
-      removeOneByOne(numberOfNodesLeavingOneByOneSmall, shutdown = false)
-      enterBarrier("after-" + step)
-    }
-
     "shutdown nodes one-by-one from small cluster" taggedAs LongRunningTest in {
       removeOneByOne(numberOfNodesShutdownOneByOneSmall, shutdown = true)
       enterBarrier("after-" + step)
     }

-    "end routers that are running while nodes are removed" taggedAs LongRunningTest in within(30.seconds) {
-      runOn(roles.take(3): _*) {
-        val m = master
-        m must not be (None)
-        m.get.tell(End, testActor)
-        val workResult = awaitWorkResult
-        workResult.sendCount must be > (0L)
-        workResult.ackCount must be > (0L)
-      }
+    "leave nodes one-by-one from small cluster" taggedAs LongRunningTest in {
+      removeOneByOne(numberOfNodesLeavingOneByOneSmall, shutdown = false)
       enterBarrier("after-" + step)
     }
+
+    "end routers that are running while nodes are removed" taggedAs LongRunningTest in within(30.seconds) {
+      if (exerciseActors) {
+        runOn(roles.take(3): _*) {
+          val m = master
+          m must not be (None)
+          m.get.tell(End, testActor)
+          val workResult = awaitWorkResult
+          workResult.sendCount must be > (0L)
+          workResult.ackCount must be > (0L)
+        }
+        enterBarrier("after-" + step)
+      }
+    }
+
+    "log jvm info" taggedAs LongRunningTest in {
+      if (infolog) {
+        log.info("StressSpec JVM:\n{}", jvmInfo)
+        enterBarrier("after-" + step)
+      }
+    }
   }
 }


@@ -269,7 +269,6 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles:
       }
     }
     system.shutdown()
-    val shutdownTimeout = 5.seconds.dilated
     try system.awaitTermination(shutdownTimeout) catch {
       case _: TimeoutException ⇒
         val msg = "Failed to stop [%s] within [%s] \n%s".format(system.name, shutdownTimeout,
@@ -280,6 +279,8 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles:
     afterTermination()
   }

+  def shutdownTimeout: FiniteDuration = 5.seconds.dilated
+
   /**
    * Override this and return `true` to assert that the
    * shutdown of the `ActorSystem` was done properly.
@@ -359,12 +360,12 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles:
    */
   def node(role: RoleName): ActorPath = RootActorPath(testConductor.getAddressFor(role).await)

-  def muteDeadLetters(endPatterns: String*)(sys: ActorSystem = system): Unit =
+  def muteDeadLetters(messageClasses: Class[_]*)(sys: ActorSystem = system): Unit =
     if (!sys.log.isDebugEnabled) {
-      def mute(suffix: String): Unit =
-        sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead.*" + suffix)))
-      if (endPatterns.isEmpty) mute("")
-      else endPatterns foreach mute
+      def mute(clazz: Class[_]): Unit =
+        sys.eventStream.publish(Mute(DeadLettersFilter(clazz)(occurrences = Int.MaxValue)))
+      if (messageClasses.isEmpty) mute(classOf[AnyRef])
+      else messageClasses foreach mute
     }

   /**


@@ -91,7 +91,10 @@ class RemoteWatcherSpec extends AkkaSpec(
   val remoteAddress = remoteSystem.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
   def remoteAddressUid = AddressUidExtension(remoteSystem).addressUid

-  Seq(system, remoteSystem).foreach(muteDeadLetters("Disassociated.*", "DisassociateUnderlying.*")(_))
+  Seq(system, remoteSystem).foreach(muteDeadLetters(
+    akka.remote.transport.AssociationHandle.Disassociated.getClass,
+    akka.remote.transport.ActorTransportAdapter.DisassociateUnderlying.getClass,
+    akka.dispatch.NullMessage.getClass)(_))

   override def afterTermination() {
     remoteSystem.shutdown()


@@ -4,7 +4,6 @@
 package akka.testkit

-import language.existentials
 import scala.util.matching.Regex
 import scala.collection.immutable
 import scala.concurrent.duration.Duration
@@ -16,6 +15,7 @@ import akka.event.Logging
 import akka.actor.NoSerializationVerificationNeeded
 import akka.japi.Util.immutableSeq
 import java.lang.{ Iterable ⇒ JIterable }
+import akka.util.BoxedType

 /**
  * Implementation helpers of the EventFilter facilities: send `Mute`
@@ -445,6 +445,25 @@ case class CustomEventFilter(test: PartialFunction[LogEvent, Boolean])(occurrenc
   }
 }

+object DeadLettersFilter {
+  def apply[T](implicit t: ClassTag[T]): DeadLettersFilter =
+    new DeadLettersFilter(t.runtimeClass.asInstanceOf[Class[T]])(Int.MaxValue)
+}
+
+/**
+ * Filter which matches DeadLetter events, if the wrapped message conforms to the
+ * given type.
+ */
+case class DeadLettersFilter(val messageClass: Class[_])(occurrences: Int) extends EventFilter(occurrences) {
+
+  def matches(event: LogEvent) = {
+    event match {
+      case Warning(_, _, msg) ⇒ BoxedType(messageClass) isInstance msg
+      case _                  ⇒ false
+    }
+  }
+}
+
 /**
  * EventListener for running tests, which allows selectively filtering out
  * expected messages. To use it, include something like this into
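
Since DeadLettersFilter is an ordinary EventFilter, a test can also publish it directly instead of going through muteDeadLetters. A sketch (MyMessage is a placeholder type, not part of the commit):

    import akka.testkit.TestEvent.Mute

    case class MyMessage(payload: String) // hypothetical message type

    // Mute dead-letter warnings whose payload is a MyMessage, for any number of occurrences:
    system.eventStream.publish(Mute(DeadLettersFilter(classOf[MyMessage])(occurrences = Int.MaxValue)))
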
@@ -469,15 +488,16 @@ class TestEventListener extends Logging.DefaultLogger {
     case Mute(filters)   ⇒ filters foreach addFilter
     case UnMute(filters) ⇒ filters foreach removeFilter
     case event: LogEvent ⇒ if (!filter(event)) print(event)
-    case DeadLetter(msg: SystemMessage, _, rcp) ⇒
-      if (!msg.isInstanceOf[Terminate]) {
-        val event = Warning(rcp.path.toString, rcp.getClass, "received dead system message: " + msg)
-        if (!filter(event)) print(event)
-      }
     case DeadLetter(msg, snd, rcp) ⇒
-      if (!msg.isInstanceOf[Terminated]) {
-        val event = Warning(rcp.path.toString, rcp.getClass, "received dead letter from " + snd + ": " + msg)
-        if (!filter(event)) print(event)
+      if (!msg.isInstanceOf[Terminate]) {
+        val event = Warning(rcp.path.toString, rcp.getClass, msg)
+        if (!filter(event)) {
+          val msgStr =
+            if (msg.isInstanceOf[SystemMessage]) "received dead system message: " + msg
+            else "received dead letter from " + snd + ": " + msg
+          val event2 = Warning(rcp.path.toString, rcp.getClass, msgStr)
+          if (!filter(event2)) print(event2)
+        }
       }
     case UnhandledMessage(msg, sender, rcp) ⇒
       val event = Warning(rcp.path.toString, rcp.getClass, "unhandled message from " + sender + ": " + msg)
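
This rewrite is where the dead-letter performance improvement lands: the listener first builds a Warning that carries the raw message, so a DeadLettersFilter can accept or reject it with a single isInstance check, and the string concatenation for the human-readable warning is only paid when no filter matched. Previously every dead letter was formatted into a string up front and then matched against the muted regex patterns.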


@@ -238,7 +238,7 @@ trait TestKitBase {
    * Note that the timeout is scaled using Duration.dilated,
    * which uses the configuration entry "akka.test.timefactor".
    */
-  def awaitAssert(a: Any, max: Duration = Duration.Undefined, interval: Duration = 100.millis) {
+  def awaitAssert(a: Any, max: Duration = Duration.Undefined, interval: Duration = 800.millis) {
     val _max = remainingOrDilated(max)
     val stop = now + _max
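
Raising the default poll interval from 100 ms to 800 ms reduces the assertion-polling load across the many JVMs of a large stress run. A test that needs tighter polling can still pass the interval explicitly; a sketch with illustrative values:

    // Poll every 100 ms for up to 5 seconds, overriding the new 800 ms default:
    awaitAssert(clusterView.unreachableMembers.isEmpty, max = 5.seconds, interval = 100.millis)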


@@ -94,12 +94,12 @@ abstract class AkkaSpec(_system: ActorSystem)

   override def expectedTestDuration: FiniteDuration = 60 seconds

-  def muteDeadLetters(endPatterns: String*)(sys: ActorSystem = system): Unit =
+  def muteDeadLetters(messageClasses: Class[_]*)(sys: ActorSystem = system): Unit =
     if (!sys.log.isDebugEnabled) {
-      def mute(suffix: String): Unit =
-        sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead.*" + suffix)))
-      if (endPatterns.isEmpty) mute("")
-      else endPatterns foreach mute
+      def mute(clazz: Class[_]): Unit =
+        sys.eventStream.publish(Mute(DeadLettersFilter(clazz)(occurrences = Int.MaxValue)))
+      if (messageClasses.isEmpty) mute(classOf[AnyRef])
+      else messageClasses foreach mute
     }
 }


@@ -580,13 +580,14 @@ object AkkaBuild extends Build {
       // multinode.D= and multinode.X= makes it possible to pass arbitrary
       // -D or -X arguments to the forked jvm, e.g.
      // -Dmultinode.Djava.net.preferIPv4Stack=true -Dmultinode.Xmx512m -Dmultinode.XX:MaxPermSize=256M
+      // -DMultiJvm.akka.cluster.Stress.nrOfNodes=15
       val MultinodeJvmArgs = "multinode\\.(D|X)(.*)".r
+      val knownPrefix = Set("multnode.", "akka.", "MultiJvm.")
       val akkaProperties = System.getProperties.propertyNames.asScala.toList.collect {
         case MultinodeJvmArgs(a, b) =>
           val value = System.getProperty("multinode." + a + b)
           "-" + a + b + (if (value == "") "" else "=" + value)
-        case key: String if key.startsWith("multinode.") => "-D" + key + "=" + System.getProperty(key)
-        case key: String if key.startsWith("akka.") => "-D" + key + "=" + System.getProperty(key)
+        case key: String if knownPrefix.exists(pre => key.startsWith(pre)) => "-D" + key + "=" + System.getProperty(key)
       }
       "-Xmx256m" :: akkaProperties :::