From 835125de3d250fabbbf3596f1b3c92047e23d8dd Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 14 Sep 2016 09:57:44 +0200 Subject: [PATCH] make cluster.StressSpec pass with Artery, #21458 * need to use a shared media driver to get the cpu usage at a reasonable level * also changed to SleepingIdleStrategy(1 ms) when cpu-level=1 not needed for the test to pass, but can be good to make level 1 more extreme --- .../akka/cluster/MultiNodeClusterSpec.scala | 6 +- .../cluster/SharedMediaDriverSupport.scala | 105 +++++ .../scala/akka/cluster/StressSpec.scala | 415 +++++++++--------- .../akka/remote/testkit/MultiNodeSpec.scala | 2 +- .../akka/remote/artery/ArteryTransport.scala | 2 +- .../scala/akka/remote/artery/TaskRunner.scala | 25 +- 6 files changed, 344 insertions(+), 211 deletions(-) create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/SharedMediaDriverSupport.scala diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index ed5ac063ff..a819090154 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -83,12 +83,14 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro private val cachedAddresses = new ConcurrentHashMap[RoleName, Address] - override def atStartup(): Unit = { + override protected def atStartup(): Unit = { startCoroner() muteLog() + self.atStartup() } - override def afterTermination(): Unit = { + override protected def afterTermination(): Unit = { + self.afterTermination() stopCoroner() } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/SharedMediaDriverSupport.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/SharedMediaDriverSupport.scala new file mode 100644 index 0000000000..925bfbc4d5 --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/SharedMediaDriverSupport.scala @@ -0,0 +1,105 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.cluster + +import java.io.File +import java.util.concurrent.atomic.AtomicReference +import java.util.function.Consumer + +import scala.annotation.tailrec +import scala.util.control.NonFatal + +import akka.remote.RemoteSettings +import akka.remote.artery.ArterySettings +import akka.remote.artery.TaskRunner +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import com.typesafe.config.ConfigFactory +import io.aeron.driver.MediaDriver +import io.aeron.driver.ThreadingMode +import org.agrona.IoUtil + +object SharedMediaDriverSupport { + + private val mediaDriver = new AtomicReference[Option[MediaDriver]](None) + + def loadArterySettings(config: MultiNodeConfig): ArterySettings = + (new RemoteSettings(ConfigFactory.load(config.config))).Artery + + def startMediaDriver(config: MultiNodeConfig): Unit = { + val arterySettings = loadArterySettings(config) + if (arterySettings.Enabled) { + val aeronDir = arterySettings.Advanced.AeronDirectoryName + require(aeronDir.nonEmpty, "aeron-dir must be defined") + val driverContext = new MediaDriver.Context + driverContext.aeronDirectoryName(aeronDir) + driverContext.clientLivenessTimeoutNs(arterySettings.Advanced.ClientLivenessTimeout.toNanos) + driverContext.imageLivenessTimeoutNs(arterySettings.Advanced.ImageLivenessTimeoutNs.toNanos) + driverContext.driverTimeoutMs(arterySettings.Advanced.DriverTimeout.toMillis) + + val idleCpuLevel = arterySettings.Advanced.IdleCpuLevel + driverContext + .threadingMode(ThreadingMode.SHARED) + .sharedIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel)) + + // Check if the media driver is already started by another multi-node jvm. + // It checks more than one time with a sleep inbetween. The number of checks + // depends on the multi-node index (i). + @tailrec def isDriverInactive(i: Int): Boolean = { + if (i < 0) true + else { + val active = driverContext.isDriverActive(5000, new Consumer[String] { + override def accept(msg: String): Unit = { + println(msg) + } + }) + if (active) false + else { + Thread.sleep(500) + isDriverInactive(i - 1) + } + } + } + + try { + if (isDriverInactive(MultiNodeSpec.selfIndex)) { + val driver = MediaDriver.launchEmbedded(driverContext) + println(s"Started media driver in directory [${driver.aeronDirectoryName}]") + if (!mediaDriver.compareAndSet(None, Some(driver))) { + throw new IllegalStateException("media driver started more than once") + } + } + } catch { + case NonFatal(e) ⇒ + println(s"Failed to start media driver in [${aeronDir}]: ${e.getMessage}") + } + } + } + + def isMediaDriverRunningByThisNode: Boolean = mediaDriver.get.isDefined + + def stopMediaDriver(config: MultiNodeConfig): Unit = { + val maybeDriver = mediaDriver.getAndSet(None) + maybeDriver.foreach { driver ⇒ + val arterySettings = loadArterySettings(config) + + // let other nodes shutdown first + Thread.sleep(5000) + + driver.close() + + try { + if (arterySettings.Advanced.DeleteAeronDirectory) { + IoUtil.delete(new File(driver.aeronDirectoryName), false) + } + } catch { + case NonFatal(e) ⇒ + println( + s"Couldn't delete Aeron embedded media driver files in [${driver.aeronDirectoryName}] " + + s"due to [${e.getMessage}]") + } + } + } + +} diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala index 47d2fff641..1cbe452bcf 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -45,6 +45,7 @@ import akka.actor.ActorIdentity import akka.util.Helpers.ConfigOps import akka.util.Helpers.Requiring import java.lang.management.ManagementFactory +import akka.remote.RARP /** * This test is intended to be used as long running stress test @@ -134,6 +135,12 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig { akka.loglevel = INFO akka.remote.log-remote-lifecycle-events = off + akka.remote.artery.advanced { + idle-cpu-level = 1 + embedded-media-driver = off + aeron-dir = "target/aeron-StressSpec" + } + akka.actor.default-dispatcher.fork-join-executor { parallelism-min = 8 parallelism-max = 8 @@ -699,8 +706,11 @@ class StressMultiJvmNode12 extends StressSpec class StressMultiJvmNode13 extends StressSpec abstract class StressSpec - extends MultiNodeSpec(StressMultiJvmSpec) - with MultiNodeClusterSpec with BeforeAndAfterEach with ImplicitSender { + extends MultiNodeSpec({ + // Aeron media driver must be started before ActorSystem + SharedMediaDriverSupport.startMediaDriver(StressMultiJvmSpec) + StressMultiJvmSpec + }) with MultiNodeClusterSpec with BeforeAndAfterEach with ImplicitSender { import StressMultiJvmSpec._ import ClusterEvent._ @@ -726,6 +736,20 @@ abstract class StressSpec classOf[StatsResult], classOf[PhiResult], RetryTick.getClass)(sys) } + override protected def afterTermination(): Unit = { + SharedMediaDriverSupport.stopMediaDriver(StressMultiJvmSpec) + super.afterTermination() + } + + Runtime.getRuntime.addShutdownHook(new Thread { + override def run(): Unit = { + if (SharedMediaDriverSupport.isMediaDriverRunningByThisNode) + println("Abrupt exit of JVM without closing media driver. This should not happen and may cause test failure.") + } + }) + + def isArteryEnabled: Boolean = RARP(system).provider.remoteSettings.Artery.Enabled + def jvmInfo(): String = { val runtime = ManagementFactory.getRuntimeMXBean val os = ManagementFactory.getOperatingSystemMXBean @@ -1129,200 +1153,199 @@ abstract class StressSpec "A cluster under stress" must { - "TODO work with artery" in (pending) - // "log settings" taggedAs LongRunningTest in { - // if (infolog) { - // log.info("StressSpec JVM:\n{}", jvmInfo) - // runOn(roles.head) { - // log.info("StressSpec settings:\n{}", settings) - // } - // } - // enterBarrier("after-" + step) - // } - // - // "join seed nodes" taggedAs LongRunningTest in within(30 seconds) { - // - // val otherNodesJoiningSeedNodes = roles.slice(numberOfSeedNodes, numberOfSeedNodes + numberOfNodesJoiningToSeedNodesInitially) - // val size = seedNodes.size + otherNodesJoiningSeedNodes.size - // - // createResultAggregator("join seed nodes", expectedResults = size, includeInHistory = true) - // - // runOn((seedNodes ++ otherNodesJoiningSeedNodes): _*) { - // reportResult { - // cluster.joinSeedNodes(seedNodes.toIndexedSeq map address) - // awaitMembersUp(size, timeout = remainingOrDefault) - // } - // } - // - // awaitClusterResult() - // - // nbrUsedRoles += size - // enterBarrier("after-" + step) - // } - // - // "start routers that are running while nodes are joining" taggedAs LongRunningTest in { - // runOn(roles.take(3): _*) { - // system.actorOf( - // Props(classOf[Master], settings, settings.workBatchInterval, false).withDeploy(Deploy.local), - // name = masterName) ! Begin - // } - // } - // - // "join nodes one-by-one to small cluster" taggedAs LongRunningTest in { - // joinOneByOne(numberOfNodesJoiningOneByOneSmall) - // enterBarrier("after-" + step) - // } - // - // "join several nodes to one node" taggedAs LongRunningTest in { - // joinSeveral(numberOfNodesJoiningToOneNode, toSeedNodes = false) - // nbrUsedRoles += numberOfNodesJoiningToOneNode - // enterBarrier("after-" + step) - // } - // - // "join several nodes to seed nodes" taggedAs LongRunningTest in { - // if (numberOfNodesJoiningToSeedNodes > 0) { - // joinSeveral(numberOfNodesJoiningToSeedNodes, toSeedNodes = true) - // nbrUsedRoles += numberOfNodesJoiningToSeedNodes - // } - // enterBarrier("after-" + step) - // } - // - // "join nodes one-by-one to large cluster" taggedAs LongRunningTest in { - // joinOneByOne(numberOfNodesJoiningOneByOneLarge) - // enterBarrier("after-" + step) - // } - // - // "end routers that are running while nodes are joining" taggedAs LongRunningTest in within(30.seconds) { - // if (exerciseActors) { - // runOn(roles.take(3): _*) { - // master match { - // case Some(m) ⇒ - // m.tell(End, testActor) - // val workResult = awaitWorkResult(m) - // workResult.retryCount should ===(0) - // workResult.sendCount should be > (0L) - // workResult.ackCount should be > (0L) - // case None ⇒ fail("master not running") - // } - // } - // } - // enterBarrier("after-" + step) - // } - // - // "use routers with normal throughput" taggedAs LongRunningTest in { - // if (exerciseActors) { - // exerciseRouters("use routers with normal throughput", normalThroughputDuration, - // batchInterval = workBatchInterval, expectDroppedMessages = false, tree = false) - // } - // enterBarrier("after-" + step) - // } - // - // "use routers with high throughput" taggedAs LongRunningTest in { - // if (exerciseActors) { - // exerciseRouters("use routers with high throughput", highThroughputDuration, - // batchInterval = Duration.Zero, expectDroppedMessages = false, tree = false) - // } - // enterBarrier("after-" + step) - // } - // - // "use many actors with normal throughput" taggedAs LongRunningTest in { - // if (exerciseActors) { - // exerciseRouters("use many actors with normal throughput", normalThroughputDuration, - // batchInterval = workBatchInterval, expectDroppedMessages = false, tree = true) - // } - // enterBarrier("after-" + step) - // } - // - // "use many actors with high throughput" taggedAs LongRunningTest in { - // if (exerciseActors) { - // exerciseRouters("use many actors with high throughput", highThroughputDuration, - // batchInterval = Duration.Zero, expectDroppedMessages = false, tree = true) - // } - // enterBarrier("after-" + step) - // } - // - // "exercise join/remove/join/remove" taggedAs LongRunningTest in { - // exerciseJoinRemove("exercise join/remove", joinRemoveDuration) - // enterBarrier("after-" + step) - // } - // - // "exercise supervision" taggedAs LongRunningTest in { - // if (exerciseActors) { - // exerciseSupervision("exercise supervision", supervisionDuration, supervisionOneIteration) - // } - // enterBarrier("after-" + step) - // } - // - // "gossip when idle" taggedAs LongRunningTest in { - // idleGossip("idle gossip") - // enterBarrier("after-" + step) - // } - // - // "start routers that are running while nodes are removed" taggedAs LongRunningTest in { - // if (exerciseActors) { - // runOn(roles.take(3): _*) { - // system.actorOf( - // Props(classOf[Master], settings, settings.workBatchInterval, false).withDeploy(Deploy.local), - // name = masterName) ! Begin - // } - // } - // enterBarrier("after-" + step) - // } - // - // "leave nodes one-by-one from large cluster" taggedAs LongRunningTest in { - // removeOneByOne(numberOfNodesLeavingOneByOneLarge, shutdown = false) - // enterBarrier("after-" + step) - // } - // - // "shutdown nodes one-by-one from large cluster" taggedAs LongRunningTest in { - // removeOneByOne(numberOfNodesShutdownOneByOneLarge, shutdown = true) - // enterBarrier("after-" + step) - // } - // - // "leave several nodes" taggedAs LongRunningTest in { - // removeSeveral(numberOfNodesLeaving, shutdown = false) - // nbrUsedRoles -= numberOfNodesLeaving - // enterBarrier("after-" + step) - // } - // - // "shutdown several nodes" taggedAs LongRunningTest in { - // removeSeveral(numberOfNodesShutdown, shutdown = true) - // nbrUsedRoles -= numberOfNodesShutdown - // enterBarrier("after-" + step) - // } - // - // "shutdown nodes one-by-one from small cluster" taggedAs LongRunningTest in { - // removeOneByOne(numberOfNodesShutdownOneByOneSmall, shutdown = true) - // enterBarrier("after-" + step) - // } - // - // "leave nodes one-by-one from small cluster" taggedAs LongRunningTest in { - // removeOneByOne(numberOfNodesLeavingOneByOneSmall, shutdown = false) - // enterBarrier("after-" + step) - // } - // - // "end routers that are running while nodes are removed" taggedAs LongRunningTest in within(30.seconds) { - // if (exerciseActors) { - // runOn(roles.take(3): _*) { - // master match { - // case Some(m) ⇒ - // m.tell(End, testActor) - // val workResult = awaitWorkResult(m) - // workResult.sendCount should be > (0L) - // workResult.ackCount should be > (0L) - // case None ⇒ fail("master not running") - // } - // } - // } - // enterBarrier("after-" + step) - // } - // - // "log jvm info" taggedAs LongRunningTest in { - // if (infolog) { - // log.info("StressSpec JVM:\n{}", jvmInfo) - // } - // enterBarrier("after-" + step) - // } + "log settings" taggedAs LongRunningTest in { + if (infolog) { + log.info("StressSpec JVM:\n{}", jvmInfo) + runOn(roles.head) { + log.info("StressSpec settings:\n{}", settings) + } + } + enterBarrier("after-" + step) + } + + "join seed nodes" taggedAs LongRunningTest in within(30 seconds) { + + val otherNodesJoiningSeedNodes = roles.slice(numberOfSeedNodes, numberOfSeedNodes + numberOfNodesJoiningToSeedNodesInitially) + val size = seedNodes.size + otherNodesJoiningSeedNodes.size + + createResultAggregator("join seed nodes", expectedResults = size, includeInHistory = true) + + runOn((seedNodes ++ otherNodesJoiningSeedNodes): _*) { + reportResult { + cluster.joinSeedNodes(seedNodes.toIndexedSeq map address) + awaitMembersUp(size, timeout = remainingOrDefault) + } + } + + awaitClusterResult() + + nbrUsedRoles += size + enterBarrier("after-" + step) + } + + "start routers that are running while nodes are joining" taggedAs LongRunningTest in { + runOn(roles.take(3): _*) { + system.actorOf( + Props(classOf[Master], settings, settings.workBatchInterval, false).withDeploy(Deploy.local), + name = masterName) ! Begin + } + } + + "join nodes one-by-one to small cluster" taggedAs LongRunningTest in { + joinOneByOne(numberOfNodesJoiningOneByOneSmall) + enterBarrier("after-" + step) + } + + "join several nodes to one node" taggedAs LongRunningTest in { + joinSeveral(numberOfNodesJoiningToOneNode, toSeedNodes = false) + nbrUsedRoles += numberOfNodesJoiningToOneNode + enterBarrier("after-" + step) + } + + "join several nodes to seed nodes" taggedAs LongRunningTest in { + if (numberOfNodesJoiningToSeedNodes > 0) { + joinSeveral(numberOfNodesJoiningToSeedNodes, toSeedNodes = true) + nbrUsedRoles += numberOfNodesJoiningToSeedNodes + } + enterBarrier("after-" + step) + } + + "join nodes one-by-one to large cluster" taggedAs LongRunningTest in { + joinOneByOne(numberOfNodesJoiningOneByOneLarge) + enterBarrier("after-" + step) + } + + "end routers that are running while nodes are joining" taggedAs LongRunningTest in within(30.seconds) { + if (exerciseActors) { + runOn(roles.take(3): _*) { + master match { + case Some(m) ⇒ + m.tell(End, testActor) + val workResult = awaitWorkResult(m) + workResult.retryCount should ===(0) + workResult.sendCount should be > (0L) + workResult.ackCount should be > (0L) + case None ⇒ fail("master not running") + } + } + } + enterBarrier("after-" + step) + } + + "use routers with normal throughput" taggedAs LongRunningTest in { + if (exerciseActors) { + exerciseRouters("use routers with normal throughput", normalThroughputDuration, + batchInterval = workBatchInterval, expectDroppedMessages = false, tree = false) + } + enterBarrier("after-" + step) + } + + "use routers with high throughput" taggedAs LongRunningTest in { + if (exerciseActors) { + exerciseRouters("use routers with high throughput", highThroughputDuration, + batchInterval = Duration.Zero, expectDroppedMessages = false, tree = false) + } + enterBarrier("after-" + step) + } + + "use many actors with normal throughput" taggedAs LongRunningTest in { + if (exerciseActors) { + exerciseRouters("use many actors with normal throughput", normalThroughputDuration, + batchInterval = workBatchInterval, expectDroppedMessages = false, tree = true) + } + enterBarrier("after-" + step) + } + + "use many actors with high throughput" taggedAs LongRunningTest in { + if (exerciseActors) { + exerciseRouters("use many actors with high throughput", highThroughputDuration, + batchInterval = Duration.Zero, expectDroppedMessages = false, tree = true) + } + enterBarrier("after-" + step) + } + + "exercise join/remove/join/remove" taggedAs LongRunningTest in { + exerciseJoinRemove("exercise join/remove", joinRemoveDuration) + enterBarrier("after-" + step) + } + + "exercise supervision" taggedAs LongRunningTest in { + if (exerciseActors) { + exerciseSupervision("exercise supervision", supervisionDuration, supervisionOneIteration) + } + enterBarrier("after-" + step) + } + + "gossip when idle" taggedAs LongRunningTest in { + idleGossip("idle gossip") + enterBarrier("after-" + step) + } + + "start routers that are running while nodes are removed" taggedAs LongRunningTest in { + if (exerciseActors) { + runOn(roles.take(3): _*) { + system.actorOf( + Props(classOf[Master], settings, settings.workBatchInterval, false).withDeploy(Deploy.local), + name = masterName) ! Begin + } + } + enterBarrier("after-" + step) + } + + "leave nodes one-by-one from large cluster" taggedAs LongRunningTest in { + removeOneByOne(numberOfNodesLeavingOneByOneLarge, shutdown = false) + enterBarrier("after-" + step) + } + + "shutdown nodes one-by-one from large cluster" taggedAs LongRunningTest in { + removeOneByOne(numberOfNodesShutdownOneByOneLarge, shutdown = true) + enterBarrier("after-" + step) + } + + "leave several nodes" taggedAs LongRunningTest in { + removeSeveral(numberOfNodesLeaving, shutdown = false) + nbrUsedRoles -= numberOfNodesLeaving + enterBarrier("after-" + step) + } + + "shutdown several nodes" taggedAs LongRunningTest in { + removeSeveral(numberOfNodesShutdown, shutdown = true) + nbrUsedRoles -= numberOfNodesShutdown + enterBarrier("after-" + step) + } + + "shutdown nodes one-by-one from small cluster" taggedAs LongRunningTest in { + removeOneByOne(numberOfNodesShutdownOneByOneSmall, shutdown = true) + enterBarrier("after-" + step) + } + + "leave nodes one-by-one from small cluster" taggedAs LongRunningTest in { + removeOneByOne(numberOfNodesLeavingOneByOneSmall, shutdown = false) + enterBarrier("after-" + step) + } + + "end routers that are running while nodes are removed" taggedAs LongRunningTest in within(30.seconds) { + if (exerciseActors) { + runOn(roles.take(3): _*) { + master match { + case Some(m) ⇒ + m.tell(End, testActor) + val workResult = awaitWorkResult(m) + workResult.sendCount should be > (0L) + workResult.ackCount should be > (0L) + case None ⇒ fail("master not running") + } + } + } + enterBarrier("after-" + step) + } + + "log jvm info" taggedAs LongRunningTest in { + if (infolog) { + log.info("StressSpec JVM:\n{}", jvmInfo) + } + enterBarrier("after-" + step) + } } } diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala index 2ba21c2207..079de2f5f1 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -97,7 +97,7 @@ abstract class MultiNodeConfig { _roles(MultiNodeSpec.selfIndex) } - private[testkit] def config: Config = { + private[akka] def config: Config = { val transportConfig = if (_testTransport) ConfigFactory.parseString( """ diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 4e6ce2efa0..d9be34a281 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -462,7 +462,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } val driver = MediaDriver.launchEmbedded(driverContext) - log.debug("Started embedded media driver in directory [{}]", driver.aeronDirectoryName) + log.info("Started embedded media driver in directory [{}]", driver.aeronDirectoryName) topLevelFREvents.loFreq(Transport_MediaDriverStarted, driver.aeronDirectoryName().getBytes("US-ASCII")) Runtime.getRuntime.addShutdownHook(stopMediaDriverShutdownHook) if (!mediaDriver.compareAndSet(None, Some(driver))) { diff --git a/akka-remote/src/main/scala/akka/remote/artery/TaskRunner.scala b/akka-remote/src/main/scala/akka/remote/artery/TaskRunner.scala index a8e1d67750..635c7ee517 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/TaskRunner.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/TaskRunner.scala @@ -4,16 +4,20 @@ package akka.remote.artery import java.util.concurrent.TimeUnit.MICROSECONDS -import scala.util.control.NonFatal -import akka.actor.ExtendedActorSystem -import akka.dispatch.AbstractNodeQueue -import akka.event.Logging -import org.agrona.concurrent.BackoffIdleStrategy +import java.util.concurrent.TimeUnit.MILLISECONDS + import scala.annotation.tailrec import scala.reflect.ClassTag -import org.agrona.concurrent.IdleStrategy -import org.agrona.concurrent.BusySpinIdleStrategy +import scala.util.control.NonFatal + +import akka.actor.ExtendedActorSystem +import akka.dispatch.AbstractNodeQueue import akka.dispatch.MonitorableThreadFactory +import akka.event.Logging +import org.agrona.concurrent.BackoffIdleStrategy +import org.agrona.concurrent.BusySpinIdleStrategy +import org.agrona.concurrent.IdleStrategy +import org.agrona.concurrent.SleepingIdleStrategy /** * INTERNAL API @@ -82,10 +86,9 @@ private[akka] object TaskRunner { } def createIdleStrategy(idleCpuLevel: Int): IdleStrategy = { - if (idleCpuLevel == 1) { - val maxParkMicros = 400 - new BackoffIdleStrategy(100, 1, MICROSECONDS.toNanos(1), MICROSECONDS.toNanos(maxParkMicros)) - } else if (idleCpuLevel == 10) + if (idleCpuLevel == 1) + new SleepingIdleStrategy(MILLISECONDS.toNanos(1)) + else if (idleCpuLevel == 10) new BusySpinIdleStrategy else { // spin between 100 to 10000 depending on idleCpuLevel