diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala index bdd82c4061..3521768011 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamConcistencySpec.scala @@ -62,8 +62,9 @@ abstract class AeronStreamConsistencySpec Aeron.connect(ctx) } + val idleCpuLevel = system.settings.config.getInt("akka.remote.artery.advanced.idle-cpu-level") val taskRunner = { - val r = new TaskRunner(system.asInstanceOf[ExtendedActorSystem]) + val r = new TaskRunner(system.asInstanceOf[ExtendedActorSystem], idleCpuLevel) r.start() r } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala index 8c9c504fe5..9dcf1c7167 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala @@ -52,7 +52,10 @@ object AeronStreamLatencySpec extends MultiNodeConfig { serialize-creators = false serialize-messages = false } - remote.artery.enabled = off + remote.artery { + enabled = off + advanced.idle-cpu-level=8 + } } """))) @@ -91,8 +94,9 @@ abstract class AeronStreamLatencySpec Aeron.connect(ctx) } + val idleCpuLevel = system.settings.config.getInt("akka.remote.artery.advanced.idle-cpu-level") val taskRunner = { - val r = new TaskRunner(system.asInstanceOf[ExtendedActorSystem]) + val r = new TaskRunner(system.asInstanceOf[ExtendedActorSystem], idleCpuLevel) r.start() r } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala index eca9bf7af4..27689d59d3 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala @@ -95,8 +95,9 @@ abstract class AeronStreamMaxThroughputSpec Aeron.connect(ctx) } + val idleCpuLevel = system.settings.config.getInt("akka.remote.artery.advanced.idle-cpu-level") val taskRunner = { - val r = new TaskRunner(system.asInstanceOf[ExtendedActorSystem]) + val r = new TaskRunner(system.asInstanceOf[ExtendedActorSystem], idleCpuLevel) r.start() r } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala index 1ee34ee518..a6030f71e1 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala @@ -53,6 +53,7 @@ object LatencySpec extends MultiNodeConfig { } remote.artery { enabled = on + advanced.idle-cpu-level=8 } } """))) diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 3c88823a5e..7ed83e7c06 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -126,6 +126,14 @@ akka { # Embedded media driver will use a this directory, or a temporary directory if this # property is not defined (empty). aeron-dir = "" + + # Level of CPU time used, on a scale between 1 and 10, during backoff/idle. + # The tradeoff is that to have low latency more CPU time must be used to be + # able to react quickly on incoming messages or send as fast as possible after + # backoff backpressure. + # Level 1 strongly prefer low CPU consumption over low latency. + # Level 10 strongly prefer low latency over low CPU consumption. + idle-cpu-level = 5 } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index 3eb9efdb80..9493dcb7df 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -30,6 +30,8 @@ final class RemoteSettings(val config: Config) { val AeronDirectoryName = getString("akka.remote.artery.advanced.aeron-dir") requiring (dir ⇒ EmbeddedMediaDriver || dir.nonEmpty, "aeron-dir must be defined when using external media driver") val TestMode: Boolean = getBoolean("akka.remote.artery.advanced.test-mode") + val IdleCpuLevel: Int = getInt("akka.remote.artery.advanced.idle-cpu-level").requiring(level ⇒ + 1 <= level && level <= 10, "idle-cpu-level must be between 1 and 10") val LogReceive: Boolean = getBoolean("akka.remote.log-received-messages") diff --git a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala index ee7adaa1e2..a580e39834 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/AeronSink.scala @@ -77,8 +77,7 @@ class AeronSink( taskRunner: TaskRunner, pool: EnvelopeBufferPool, giveUpSendAfter: Duration, - flightRecorder: EventSink -) + flightRecorder: EventSink) extends GraphStageWithMaterializedValue[SinkShape[EnvelopeBuffer], Future[Done]] { import AeronSink._ import TaskRunner._ @@ -96,6 +95,7 @@ class AeronSink( private var completedValue: Try[Done] = Success(Done) + // FIXME measure and adjust with IdleCpuLevel private val spinning = 1000 private var backoffCount = spinning private var lastMsgSize = 0 diff --git a/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala b/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala index 4f2947bc5e..93f1dbbe23 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala @@ -67,8 +67,7 @@ class AeronSource( aeron: Aeron, taskRunner: TaskRunner, pool: EnvelopeBufferPool, - flightRecorder: EventSink -) + flightRecorder: EventSink) extends GraphStage[SourceShape[EnvelopeBuffer]] { import AeronSource._ import TaskRunner._ @@ -81,6 +80,7 @@ class AeronSource( new GraphStageLogic(shape) with OutHandler { private val sub = aeron.addSubscription(channel, streamId) + // FIXME measure and adjust with IdleCpuLevel private val spinning = 1000 private val yielding = 0 private val parking = 0 diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index fdeafef12d..a58f88ab2d 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -73,6 +73,9 @@ import scala.collection.JavaConverters._ import akka.stream.ActorMaterializerSettings import scala.annotation.tailrec import akka.util.OptionVal +import io.aeron.driver.ThreadingMode +import org.agrona.concurrent.BackoffIdleStrategy +import org.agrona.concurrent.BusySpinIdleStrategy /** * INTERNAL API @@ -362,7 +365,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private val controlStreamId = 1 private val ordinaryStreamId = 3 private val largeStreamId = 4 - private val taskRunner = new TaskRunner(system) + private val taskRunner = new TaskRunner(system, remoteSettings.IdleCpuLevel) private val restartTimeout: FiniteDuration = 5.seconds // FIXME config private val maxRestarts = 5 // FIXME config @@ -431,9 +434,27 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R if (remoteSettings.AeronDirectoryName.nonEmpty) driverContext.aeronDirectoryName(remoteSettings.AeronDirectoryName) // FIXME settings from config + driverContext.conductorIdleStrategy() driverContext.clientLivenessTimeoutNs(SECONDS.toNanos(20)) driverContext.imageLivenessTimeoutNs(SECONDS.toNanos(20)) driverContext.driverTimeoutMs(SECONDS.toNanos(20)) + + if (remoteSettings.IdleCpuLevel == 10) { + driverContext + .threadingMode(ThreadingMode.DEDICATED) + .conductorIdleStrategy(new BackoffIdleStrategy(1, 1, 1, 1)) + .receiverIdleStrategy(new BusySpinIdleStrategy) + .senderIdleStrategy(new BusySpinIdleStrategy); + } else if (remoteSettings.IdleCpuLevel == 1) { + driverContext + .threadingMode(ThreadingMode.SHARED) + //FIXME measure: .sharedIdleStrategy(new BackoffIdleStrategy(20, 50, 1, 200)) + } else if (remoteSettings.IdleCpuLevel <= 5) { + driverContext + .threadingMode(ThreadingMode.SHARED_NETWORK) + //FIXME measure: .sharedNetworkIdleStrategy(new BackoffIdleStrategy(20, 50, 1, 20 * (11 - remoteSettings.IdleCpuLevel))) + } + val driver = MediaDriver.launchEmbedded(driverContext) log.debug("Started embedded media driver in directory [{}]", driver.aeronDirectoryName) topLevelFREvents.loFreq(Transport_MediaDriverStarted, driver.aeronDirectoryName().getBytes("US-ASCII")) diff --git a/akka-remote/src/main/scala/akka/remote/artery/TaskRunner.scala b/akka-remote/src/main/scala/akka/remote/artery/TaskRunner.scala index 307399d23c..6778260759 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/TaskRunner.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/TaskRunner.scala @@ -11,6 +11,9 @@ import akka.event.Logging import org.agrona.concurrent.BackoffIdleStrategy import scala.annotation.tailrec import scala.reflect.ClassTag +import org.agrona.concurrent.IdleStrategy +import org.agrona.concurrent.BusySpinIdleStrategy +import akka.dispatch.MonitorableThreadFactory /** * INTERNAL API @@ -82,7 +85,7 @@ private[akka] object TaskRunner { /** * INTERNAL API */ -private[akka] class TaskRunner(system: ExtendedActorSystem) extends Runnable { +private[akka] class TaskRunner(system: ExtendedActorSystem, idleCpuLevel: Int) extends Runnable { import TaskRunner._ private val log = Logging(system, getClass) @@ -91,14 +94,29 @@ private[akka] class TaskRunner(system: ExtendedActorSystem) extends Runnable { private[this] val tasks = new ArrayBag[Task] // TODO the backoff strategy should be measured and tuned - private val spinning = 2000000 - private val yielding = 0 - private val idleStrategy = new BackoffIdleStrategy( - spinning, yielding, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MICROSECONDS.toNanos(100)) + private val idleStrategy: IdleStrategy = { + if (idleCpuLevel == 1) { + val maxParkMicros = 400 + new BackoffIdleStrategy(1, 1, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MICROSECONDS.toNanos(maxParkMicros)) + } else if (idleCpuLevel == 10) + new BusySpinIdleStrategy + else { + val spinning = 100000 * idleCpuLevel + val yielding = 2 * idleCpuLevel + val maxParkMicros = 40 * (11 - idleCpuLevel) + new BackoffIdleStrategy( + spinning, yielding, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MICROSECONDS.toNanos(maxParkMicros)) + } + } private var reset = false def start(): Unit = { - val thread = system.threadFactory.newThread(this) + val tf = system.threadFactory match { + case m: MonitorableThreadFactory ⇒ + m.withName(m.name + "-taskrunner") + case other ⇒ other + } + val thread = tf.newThread(this) thread.start() } diff --git a/akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala index 0500299727..368537ee55 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/AeronSinkSpec.scala @@ -32,8 +32,9 @@ class AeronSinkSpec extends AkkaSpec with ImplicitSender { Aeron.connect(ctx) } + val idleCpuLevel = 5 val taskRunner = { - val r = new TaskRunner(system.asInstanceOf[ExtendedActorSystem]) + val r = new TaskRunner(system.asInstanceOf[ExtendedActorSystem], idleCpuLevel) r.start() r } diff --git a/akka-remote/src/test/scala/akka/remote/artery/AeronStreamsApp.scala b/akka-remote/src/test/scala/akka/remote/artery/AeronStreamsApp.scala index c9f94b99ce..6a8bf49089 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/AeronStreamsApp.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/AeronStreamsApp.scala @@ -78,8 +78,9 @@ object AeronStreamsApp { lazy val system = ActorSystem("AeronStreams") lazy implicit val mat = ActorMaterializer()(system) + val idleCpuLevel = 5 lazy val taskRunner = { - val r = new TaskRunner(system.asInstanceOf[ExtendedActorSystem]) + val r = new TaskRunner(system.asInstanceOf[ExtendedActorSystem], idleCpuLevel) r.start() r }