Merge pull request #21459 from akka/wip-21458-StressSpec-patriknw

make cluster.StressSpec pass with Artery, #21458
Patrik Nordwall authored on 2016-09-16 12:59:17 +02:00, committed by GitHub
commit 9b1b6a9a65
6 changed files with 344 additions and 211 deletions


@@ -83,12 +83,14 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro
private val cachedAddresses = new ConcurrentHashMap[RoleName, Address]
override def atStartup(): Unit = {
override protected def atStartup(): Unit = {
startCoroner()
muteLog()
self.atStartup()
}
override def afterTermination(): Unit = {
override protected def afterTermination(): Unit = {
self.afterTermination()
stopCoroner()
}
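These hooks change from public to protected, which compiles only because the hooks they override in the base spec are (as of this commit) protected as well: Scala rejects an override that is less accessible than the member it overrides, while widening access is allowed. A minimal illustration with stand-in class names:

// Scala's access rule for overrides, shown with hypothetical classes.
class Base {
  protected def atStartup(): Unit = ()
}

class SameAccess extends Base {
  override protected def atStartup(): Unit = () // fine: same visibility as the base
}

class Widened extends Base {
  override def atStartup(): Unit = ()           // fine: widening protected -> public is allowed
}

// The reverse (overriding a public member with a protected one) does not
// compile, which is why base hooks and mixin overrides change together here.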


@@ -0,0 +1,105 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster
import java.io.File
import java.util.concurrent.atomic.AtomicReference
import java.util.function.Consumer
import scala.annotation.tailrec
import scala.util.control.NonFatal
import akka.remote.RemoteSettings
import akka.remote.artery.ArterySettings
import akka.remote.artery.TaskRunner
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import com.typesafe.config.ConfigFactory
import io.aeron.driver.MediaDriver
import io.aeron.driver.ThreadingMode
import org.agrona.IoUtil
object SharedMediaDriverSupport {
private val mediaDriver = new AtomicReference[Option[MediaDriver]](None)
def loadArterySettings(config: MultiNodeConfig): ArterySettings =
(new RemoteSettings(ConfigFactory.load(config.config))).Artery
def startMediaDriver(config: MultiNodeConfig): Unit = {
val arterySettings = loadArterySettings(config)
if (arterySettings.Enabled) {
val aeronDir = arterySettings.Advanced.AeronDirectoryName
require(aeronDir.nonEmpty, "aeron-dir must be defined")
val driverContext = new MediaDriver.Context
driverContext.aeronDirectoryName(aeronDir)
driverContext.clientLivenessTimeoutNs(arterySettings.Advanced.ClientLivenessTimeout.toNanos)
driverContext.imageLivenessTimeoutNs(arterySettings.Advanced.ImageLivenessTimeout.toNanos)
driverContext.driverTimeoutMs(arterySettings.Advanced.DriverTimeout.toMillis)
val idleCpuLevel = arterySettings.Advanced.IdleCpuLevel
driverContext
.threadingMode(ThreadingMode.SHARED)
.sharedIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel))
// Check if the media driver is already started by another multi-node JVM.
// It checks more than once, with a sleep in between. The number of checks
// depends on the multi-node index (i).
@tailrec def isDriverInactive(i: Int): Boolean = {
if (i < 0) true
else {
val active = driverContext.isDriverActive(5000, new Consumer[String] {
override def accept(msg: String): Unit = {
println(msg)
}
})
if (active) false
else {
Thread.sleep(500)
isDriverInactive(i - 1)
}
}
}
try {
if (isDriverInactive(MultiNodeSpec.selfIndex)) {
val driver = MediaDriver.launchEmbedded(driverContext)
println(s"Started media driver in directory [${driver.aeronDirectoryName}]")
if (!mediaDriver.compareAndSet(None, Some(driver))) {
throw new IllegalStateException("media driver started more than once")
}
}
} catch {
case NonFatal(e) ⇒
println(s"Failed to start media driver in [${aeronDir}]: ${e.getMessage}")
}
}
}
def isMediaDriverRunningByThisNode: Boolean = mediaDriver.get.isDefined
def stopMediaDriver(config: MultiNodeConfig): Unit = {
val maybeDriver = mediaDriver.getAndSet(None)
maybeDriver.foreach { driver ⇒
val arterySettings = loadArterySettings(config)
// let other nodes shut down first
Thread.sleep(5000)
driver.close()
try {
if (arterySettings.Advanced.DeleteAeronDirectory) {
IoUtil.delete(new File(driver.aeronDirectoryName), false)
}
} catch {
case NonFatal(e) ⇒
println(
s"Couldn't delete Aeron embedded media driver files in [${driver.aeronDirectoryName}] " +
s"due to [${e.getMessage}]")
}
}
}
}
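The intended call pattern, which the StressSpec changes below follow, is to start the driver inside the expression passed to the MultiNodeSpec constructor (so it runs before the ActorSystem is created) and to stop it in afterTermination. A condensed sketch, with a hypothetical MyMultiJvmSpec config object standing in for a real MultiNodeConfig:

import akka.cluster.SharedMediaDriverSupport
import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }

// Hypothetical config object; a real one would define roles and commonConfig.
object MyMultiJvmSpec extends MultiNodeConfig

abstract class MySpec extends MultiNodeSpec({
  // evaluated before the MultiNodeSpec (and ActorSystem) constructor runs
  SharedMediaDriverSupport.startMediaDriver(MyMultiJvmSpec)
  MyMultiJvmSpec
}) {
  override protected def afterTermination(): Unit = {
    SharedMediaDriverSupport.stopMediaDriver(MyMultiJvmSpec)
    super.afterTermination()
  }
}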


@@ -45,6 +45,7 @@ import akka.actor.ActorIdentity
import akka.util.Helpers.ConfigOps
import akka.util.Helpers.Requiring
import java.lang.management.ManagementFactory
import akka.remote.RARP
/**
* This test is intended to be used as a long-running stress test
@@ -134,6 +135,12 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
akka.loglevel = INFO
akka.remote.log-remote-lifecycle-events = off
akka.remote.artery.advanced {
idle-cpu-level = 1
embedded-media-driver = off
aeron-dir = "target/aeron-StressSpec"
}
akka.actor.default-dispatcher.fork-join-executor {
parallelism-min = 8
parallelism-max = 8
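These three settings carry the whole scheme: each JVM's embedded driver is turned off and every node points at the same aeron-dir, which SharedMediaDriverSupport then populates once per machine. A small sketch of how they read back through ArterySettings, mirroring loadArterySettings above; the Advanced field names are assumptions inferred from the config keys, and the printlns are only for illustration:

import akka.remote.RemoteSettings
import com.typesafe.config.ConfigFactory

// Illustrative: layer the StressSpec fragment over the reference config
// and read back the values the shared-driver support relies on.
val config = ConfigFactory.parseString("""
  akka.remote.artery.advanced {
    idle-cpu-level = 1
    embedded-media-driver = off
    aeron-dir = "target/aeron-StressSpec"
  }
""").withFallback(ConfigFactory.load())

val artery = new RemoteSettings(config).Artery
println(artery.Advanced.EmbeddedMediaDriver) // false: use the shared external driver
println(artery.Advanced.AeronDirectoryName)  // target/aeron-StressSpec
println(artery.Advanced.IdleCpuLevel)        // 1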
@@ -699,8 +706,11 @@ class StressMultiJvmNode12 extends StressSpec
class StressMultiJvmNode13 extends StressSpec
abstract class StressSpec
extends MultiNodeSpec(StressMultiJvmSpec)
with MultiNodeClusterSpec with BeforeAndAfterEach with ImplicitSender {
extends MultiNodeSpec({
// Aeron media driver must be started before ActorSystem
SharedMediaDriverSupport.startMediaDriver(StressMultiJvmSpec)
StressMultiJvmSpec
}) with MultiNodeClusterSpec with BeforeAndAfterEach with ImplicitSender {
import StressMultiJvmSpec._
import ClusterEvent._
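The rewritten class header relies on plain argument evaluation order: the block passed to MultiNodeSpec is evaluated before the superclass constructor body runs, so the media driver is up before the ActorSystem starts. A toy sketch of the idiom, with stand-in names:

// Stand-in classes demonstrating the ordering guarantee used above.
class Base(name: String) {
  println(s"Base constructor for [$name]") // stands in for ActorSystem startup
}

object Setup {
  def start(): Unit = println("external resource started") // stands in for startMediaDriver
}

class Spec extends Base({
  Setup.start() // runs first: the argument is evaluated before Base's body
  "stress-spec"
})

// new Spec() prints "external resource started", then "Base constructor for [stress-spec]"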
@@ -726,6 +736,20 @@ abstract class StressSpec
classOf[StatsResult], classOf[PhiResult], RetryTick.getClass)(sys)
}
override protected def afterTermination(): Unit = {
SharedMediaDriverSupport.stopMediaDriver(StressMultiJvmSpec)
super.afterTermination()
}
Runtime.getRuntime.addShutdownHook(new Thread {
override def run(): Unit = {
if (SharedMediaDriverSupport.isMediaDriverRunningByThisNode)
println("Abrupt exit of JVM without closing media driver. This should not happen and may cause test failure.")
}
})
def isArteryEnabled: Boolean = RARP(system).provider.remoteSettings.Artery.Enabled
def jvmInfo(): String = {
val runtime = ManagementFactory.getRuntimeMXBean
val os = ManagementFactory.getOperatingSystemMXBean
@@ -1129,200 +1153,199 @@ abstract class StressSpec
"A cluster under stress" must {
"TODO work with artery" in (pending)
// "log settings" taggedAs LongRunningTest in {
// if (infolog) {
// log.info("StressSpec JVM:\n{}", jvmInfo)
// runOn(roles.head) {
// log.info("StressSpec settings:\n{}", settings)
// }
// }
// enterBarrier("after-" + step)
// }
//
// "join seed nodes" taggedAs LongRunningTest in within(30 seconds) {
//
// val otherNodesJoiningSeedNodes = roles.slice(numberOfSeedNodes, numberOfSeedNodes + numberOfNodesJoiningToSeedNodesInitially)
// val size = seedNodes.size + otherNodesJoiningSeedNodes.size
//
// createResultAggregator("join seed nodes", expectedResults = size, includeInHistory = true)
//
// runOn((seedNodes ++ otherNodesJoiningSeedNodes): _*) {
// reportResult {
// cluster.joinSeedNodes(seedNodes.toIndexedSeq map address)
// awaitMembersUp(size, timeout = remainingOrDefault)
// }
// }
//
// awaitClusterResult()
//
// nbrUsedRoles += size
// enterBarrier("after-" + step)
// }
//
// "start routers that are running while nodes are joining" taggedAs LongRunningTest in {
// runOn(roles.take(3): _*) {
// system.actorOf(
// Props(classOf[Master], settings, settings.workBatchInterval, false).withDeploy(Deploy.local),
// name = masterName) ! Begin
// }
// }
//
// "join nodes one-by-one to small cluster" taggedAs LongRunningTest in {
// joinOneByOne(numberOfNodesJoiningOneByOneSmall)
// enterBarrier("after-" + step)
// }
//
// "join several nodes to one node" taggedAs LongRunningTest in {
// joinSeveral(numberOfNodesJoiningToOneNode, toSeedNodes = false)
// nbrUsedRoles += numberOfNodesJoiningToOneNode
// enterBarrier("after-" + step)
// }
//
// "join several nodes to seed nodes" taggedAs LongRunningTest in {
// if (numberOfNodesJoiningToSeedNodes > 0) {
// joinSeveral(numberOfNodesJoiningToSeedNodes, toSeedNodes = true)
// nbrUsedRoles += numberOfNodesJoiningToSeedNodes
// }
// enterBarrier("after-" + step)
// }
//
// "join nodes one-by-one to large cluster" taggedAs LongRunningTest in {
// joinOneByOne(numberOfNodesJoiningOneByOneLarge)
// enterBarrier("after-" + step)
// }
//
// "end routers that are running while nodes are joining" taggedAs LongRunningTest in within(30.seconds) {
// if (exerciseActors) {
// runOn(roles.take(3): _*) {
// master match {
// case Some(m) ⇒
// m.tell(End, testActor)
// val workResult = awaitWorkResult(m)
// workResult.retryCount should ===(0)
// workResult.sendCount should be > (0L)
// workResult.ackCount should be > (0L)
// case None ⇒ fail("master not running")
// }
// }
// }
// enterBarrier("after-" + step)
// }
//
// "use routers with normal throughput" taggedAs LongRunningTest in {
// if (exerciseActors) {
// exerciseRouters("use routers with normal throughput", normalThroughputDuration,
// batchInterval = workBatchInterval, expectDroppedMessages = false, tree = false)
// }
// enterBarrier("after-" + step)
// }
//
// "use routers with high throughput" taggedAs LongRunningTest in {
// if (exerciseActors) {
// exerciseRouters("use routers with high throughput", highThroughputDuration,
// batchInterval = Duration.Zero, expectDroppedMessages = false, tree = false)
// }
// enterBarrier("after-" + step)
// }
//
// "use many actors with normal throughput" taggedAs LongRunningTest in {
// if (exerciseActors) {
// exerciseRouters("use many actors with normal throughput", normalThroughputDuration,
// batchInterval = workBatchInterval, expectDroppedMessages = false, tree = true)
// }
// enterBarrier("after-" + step)
// }
//
// "use many actors with high throughput" taggedAs LongRunningTest in {
// if (exerciseActors) {
// exerciseRouters("use many actors with high throughput", highThroughputDuration,
// batchInterval = Duration.Zero, expectDroppedMessages = false, tree = true)
// }
// enterBarrier("after-" + step)
// }
//
// "exercise join/remove/join/remove" taggedAs LongRunningTest in {
// exerciseJoinRemove("exercise join/remove", joinRemoveDuration)
// enterBarrier("after-" + step)
// }
//
// "exercise supervision" taggedAs LongRunningTest in {
// if (exerciseActors) {
// exerciseSupervision("exercise supervision", supervisionDuration, supervisionOneIteration)
// }
// enterBarrier("after-" + step)
// }
//
// "gossip when idle" taggedAs LongRunningTest in {
// idleGossip("idle gossip")
// enterBarrier("after-" + step)
// }
//
// "start routers that are running while nodes are removed" taggedAs LongRunningTest in {
// if (exerciseActors) {
// runOn(roles.take(3): _*) {
// system.actorOf(
// Props(classOf[Master], settings, settings.workBatchInterval, false).withDeploy(Deploy.local),
// name = masterName) ! Begin
// }
// }
// enterBarrier("after-" + step)
// }
//
// "leave nodes one-by-one from large cluster" taggedAs LongRunningTest in {
// removeOneByOne(numberOfNodesLeavingOneByOneLarge, shutdown = false)
// enterBarrier("after-" + step)
// }
//
// "shutdown nodes one-by-one from large cluster" taggedAs LongRunningTest in {
// removeOneByOne(numberOfNodesShutdownOneByOneLarge, shutdown = true)
// enterBarrier("after-" + step)
// }
//
// "leave several nodes" taggedAs LongRunningTest in {
// removeSeveral(numberOfNodesLeaving, shutdown = false)
// nbrUsedRoles -= numberOfNodesLeaving
// enterBarrier("after-" + step)
// }
//
// "shutdown several nodes" taggedAs LongRunningTest in {
// removeSeveral(numberOfNodesShutdown, shutdown = true)
// nbrUsedRoles -= numberOfNodesShutdown
// enterBarrier("after-" + step)
// }
//
// "shutdown nodes one-by-one from small cluster" taggedAs LongRunningTest in {
// removeOneByOne(numberOfNodesShutdownOneByOneSmall, shutdown = true)
// enterBarrier("after-" + step)
// }
//
// "leave nodes one-by-one from small cluster" taggedAs LongRunningTest in {
// removeOneByOne(numberOfNodesLeavingOneByOneSmall, shutdown = false)
// enterBarrier("after-" + step)
// }
//
// "end routers that are running while nodes are removed" taggedAs LongRunningTest in within(30.seconds) {
// if (exerciseActors) {
// runOn(roles.take(3): _*) {
// master match {
// case Some(m) ⇒
// m.tell(End, testActor)
// val workResult = awaitWorkResult(m)
// workResult.sendCount should be > (0L)
// workResult.ackCount should be > (0L)
// case None ⇒ fail("master not running")
// }
// }
// }
// enterBarrier("after-" + step)
// }
//
// "log jvm info" taggedAs LongRunningTest in {
// if (infolog) {
// log.info("StressSpec JVM:\n{}", jvmInfo)
// }
// enterBarrier("after-" + step)
// }
"log settings" taggedAs LongRunningTest in {
if (infolog) {
log.info("StressSpec JVM:\n{}", jvmInfo)
runOn(roles.head) {
log.info("StressSpec settings:\n{}", settings)
}
}
enterBarrier("after-" + step)
}
"join seed nodes" taggedAs LongRunningTest in within(30 seconds) {
val otherNodesJoiningSeedNodes = roles.slice(numberOfSeedNodes, numberOfSeedNodes + numberOfNodesJoiningToSeedNodesInitially)
val size = seedNodes.size + otherNodesJoiningSeedNodes.size
createResultAggregator("join seed nodes", expectedResults = size, includeInHistory = true)
runOn((seedNodes ++ otherNodesJoiningSeedNodes): _*) {
reportResult {
cluster.joinSeedNodes(seedNodes.toIndexedSeq map address)
awaitMembersUp(size, timeout = remainingOrDefault)
}
}
awaitClusterResult()
nbrUsedRoles += size
enterBarrier("after-" + step)
}
"start routers that are running while nodes are joining" taggedAs LongRunningTest in {
runOn(roles.take(3): _*) {
system.actorOf(
Props(classOf[Master], settings, settings.workBatchInterval, false).withDeploy(Deploy.local),
name = masterName) ! Begin
}
}
"join nodes one-by-one to small cluster" taggedAs LongRunningTest in {
joinOneByOne(numberOfNodesJoiningOneByOneSmall)
enterBarrier("after-" + step)
}
"join several nodes to one node" taggedAs LongRunningTest in {
joinSeveral(numberOfNodesJoiningToOneNode, toSeedNodes = false)
nbrUsedRoles += numberOfNodesJoiningToOneNode
enterBarrier("after-" + step)
}
"join several nodes to seed nodes" taggedAs LongRunningTest in {
if (numberOfNodesJoiningToSeedNodes > 0) {
joinSeveral(numberOfNodesJoiningToSeedNodes, toSeedNodes = true)
nbrUsedRoles += numberOfNodesJoiningToSeedNodes
}
enterBarrier("after-" + step)
}
"join nodes one-by-one to large cluster" taggedAs LongRunningTest in {
joinOneByOne(numberOfNodesJoiningOneByOneLarge)
enterBarrier("after-" + step)
}
"end routers that are running while nodes are joining" taggedAs LongRunningTest in within(30.seconds) {
if (exerciseActors) {
runOn(roles.take(3): _*) {
master match {
case Some(m) ⇒
m.tell(End, testActor)
val workResult = awaitWorkResult(m)
workResult.retryCount should ===(0)
workResult.sendCount should be > (0L)
workResult.ackCount should be > (0L)
case None ⇒ fail("master not running")
}
}
}
enterBarrier("after-" + step)
}
"use routers with normal throughput" taggedAs LongRunningTest in {
if (exerciseActors) {
exerciseRouters("use routers with normal throughput", normalThroughputDuration,
batchInterval = workBatchInterval, expectDroppedMessages = false, tree = false)
}
enterBarrier("after-" + step)
}
"use routers with high throughput" taggedAs LongRunningTest in {
if (exerciseActors) {
exerciseRouters("use routers with high throughput", highThroughputDuration,
batchInterval = Duration.Zero, expectDroppedMessages = false, tree = false)
}
enterBarrier("after-" + step)
}
"use many actors with normal throughput" taggedAs LongRunningTest in {
if (exerciseActors) {
exerciseRouters("use many actors with normal throughput", normalThroughputDuration,
batchInterval = workBatchInterval, expectDroppedMessages = false, tree = true)
}
enterBarrier("after-" + step)
}
"use many actors with high throughput" taggedAs LongRunningTest in {
if (exerciseActors) {
exerciseRouters("use many actors with high throughput", highThroughputDuration,
batchInterval = Duration.Zero, expectDroppedMessages = false, tree = true)
}
enterBarrier("after-" + step)
}
"exercise join/remove/join/remove" taggedAs LongRunningTest in {
exerciseJoinRemove("exercise join/remove", joinRemoveDuration)
enterBarrier("after-" + step)
}
"exercise supervision" taggedAs LongRunningTest in {
if (exerciseActors) {
exerciseSupervision("exercise supervision", supervisionDuration, supervisionOneIteration)
}
enterBarrier("after-" + step)
}
"gossip when idle" taggedAs LongRunningTest in {
idleGossip("idle gossip")
enterBarrier("after-" + step)
}
"start routers that are running while nodes are removed" taggedAs LongRunningTest in {
if (exerciseActors) {
runOn(roles.take(3): _*) {
system.actorOf(
Props(classOf[Master], settings, settings.workBatchInterval, false).withDeploy(Deploy.local),
name = masterName) ! Begin
}
}
enterBarrier("after-" + step)
}
"leave nodes one-by-one from large cluster" taggedAs LongRunningTest in {
removeOneByOne(numberOfNodesLeavingOneByOneLarge, shutdown = false)
enterBarrier("after-" + step)
}
"shutdown nodes one-by-one from large cluster" taggedAs LongRunningTest in {
removeOneByOne(numberOfNodesShutdownOneByOneLarge, shutdown = true)
enterBarrier("after-" + step)
}
"leave several nodes" taggedAs LongRunningTest in {
removeSeveral(numberOfNodesLeaving, shutdown = false)
nbrUsedRoles -= numberOfNodesLeaving
enterBarrier("after-" + step)
}
"shutdown several nodes" taggedAs LongRunningTest in {
removeSeveral(numberOfNodesShutdown, shutdown = true)
nbrUsedRoles -= numberOfNodesShutdown
enterBarrier("after-" + step)
}
"shutdown nodes one-by-one from small cluster" taggedAs LongRunningTest in {
removeOneByOne(numberOfNodesShutdownOneByOneSmall, shutdown = true)
enterBarrier("after-" + step)
}
"leave nodes one-by-one from small cluster" taggedAs LongRunningTest in {
removeOneByOne(numberOfNodesLeavingOneByOneSmall, shutdown = false)
enterBarrier("after-" + step)
}
"end routers that are running while nodes are removed" taggedAs LongRunningTest in within(30.seconds) {
if (exerciseActors) {
runOn(roles.take(3): _*) {
master match {
case Some(m) ⇒
m.tell(End, testActor)
val workResult = awaitWorkResult(m)
workResult.sendCount should be > (0L)
workResult.ackCount should be > (0L)
case None ⇒ fail("master not running")
}
}
}
enterBarrier("after-" + step)
}
"log jvm info" taggedAs LongRunningTest in {
if (infolog) {
log.info("StressSpec JVM:\n{}", jvmInfo)
}
enterBarrier("after-" + step)
}
}
}


@@ -97,7 +97,7 @@ abstract class MultiNodeConfig {
_roles(MultiNodeSpec.selfIndex)
}
private[testkit] def config: Config = {
private[akka] def config: Config = {
val transportConfig =
if (_testTransport) ConfigFactory.parseString(
"""


@@ -462,7 +462,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
}
val driver = MediaDriver.launchEmbedded(driverContext)
log.debug("Started embedded media driver in directory [{}]", driver.aeronDirectoryName)
log.info("Started embedded media driver in directory [{}]", driver.aeronDirectoryName)
topLevelFREvents.loFreq(Transport_MediaDriverStarted, driver.aeronDirectoryName().getBytes("US-ASCII"))
Runtime.getRuntime.addShutdownHook(stopMediaDriverShutdownHook)
if (!mediaDriver.compareAndSet(None, Some(driver))) {


@@ -4,16 +4,20 @@
package akka.remote.artery
import java.util.concurrent.TimeUnit.MICROSECONDS
import scala.util.control.NonFatal
import akka.actor.ExtendedActorSystem
import akka.dispatch.AbstractNodeQueue
import akka.event.Logging
import org.agrona.concurrent.BackoffIdleStrategy
import java.util.concurrent.TimeUnit.MILLISECONDS
import scala.annotation.tailrec
import scala.reflect.ClassTag
import org.agrona.concurrent.IdleStrategy
import org.agrona.concurrent.BusySpinIdleStrategy
import scala.util.control.NonFatal
import akka.actor.ExtendedActorSystem
import akka.dispatch.AbstractNodeQueue
import akka.dispatch.MonitorableThreadFactory
import akka.event.Logging
import org.agrona.concurrent.BackoffIdleStrategy
import org.agrona.concurrent.BusySpinIdleStrategy
import org.agrona.concurrent.IdleStrategy
import org.agrona.concurrent.SleepingIdleStrategy
/**
* INTERNAL API
@@ -82,10 +86,9 @@ private[akka] object TaskRunner {
}
def createIdleStrategy(idleCpuLevel: Int): IdleStrategy = {
if (idleCpuLevel == 1) {
val maxParkMicros = 400
new BackoffIdleStrategy(100, 1, MICROSECONDS.toNanos(1), MICROSECONDS.toNanos(maxParkMicros))
} else if (idleCpuLevel == 10)
if (idleCpuLevel == 1)
new SleepingIdleStrategy(MILLISECONDS.toNanos(1))
else if (idleCpuLevel == 10)
new BusySpinIdleStrategy
else {
// spin between 100 to 10000 depending on idleCpuLevel
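For orientation, the branch this hunk cuts off ends in a BackoffIdleStrategy whose spin count scales with idleCpuLevel. A self-contained sketch of the full three-tier selection after this commit; the constants in the middle branch are illustrative placeholders, not Akka's actual values:

import java.util.concurrent.TimeUnit.{ MICROSECONDS, MILLISECONDS }
import org.agrona.concurrent.{ BackoffIdleStrategy, BusySpinIdleStrategy, IdleStrategy, SleepingIdleStrategy }

def createIdleStrategy(idleCpuLevel: Int): IdleStrategy =
  if (idleCpuLevel == 1)
    new SleepingIdleStrategy(MILLISECONDS.toNanos(1)) // lowest CPU use: sleep 1 ms when idle
  else if (idleCpuLevel == 10)
    new BusySpinIdleStrategy                          // highest CPU use: pure busy-spin
  else {
    // Spin count grows with the level (roughly 100..10000, per the comment
    // above); the yield count and park periods here are placeholders.
    val maxSpins = 100 * idleCpuLevel * idleCpuLevel
    new BackoffIdleStrategy(maxSpins, 100, MICROSECONDS.toNanos(1), MICROSECONDS.toNanos(100))
  }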