make cpu vs latency configurable, #20625
* the actual default values will be measured and tuned later
This commit is contained in:
parent
39f6a9dcf3
commit
3eceb241e1
12 changed files with 75 additions and 17 deletions
|
|
@ -62,8 +62,9 @@ abstract class AeronStreamConsistencySpec
|
|||
Aeron.connect(ctx)
|
||||
}
|
||||
|
||||
val idleCpuLevel = system.settings.config.getInt("akka.remote.artery.advanced.idle-cpu-level")
|
||||
val taskRunner = {
|
||||
val r = new TaskRunner(system.asInstanceOf[ExtendedActorSystem])
|
||||
val r = new TaskRunner(system.asInstanceOf[ExtendedActorSystem], idleCpuLevel)
|
||||
r.start()
|
||||
r
|
||||
}
|
||||
|
|
|
|||
|
|
@ -52,7 +52,10 @@ object AeronStreamLatencySpec extends MultiNodeConfig {
|
|||
serialize-creators = false
|
||||
serialize-messages = false
|
||||
}
|
||||
remote.artery.enabled = off
|
||||
remote.artery {
|
||||
enabled = off
|
||||
advanced.idle-cpu-level=8
|
||||
}
|
||||
}
|
||||
""")))
|
||||
|
||||
|
|
@ -91,8 +94,9 @@ abstract class AeronStreamLatencySpec
|
|||
Aeron.connect(ctx)
|
||||
}
|
||||
|
||||
val idleCpuLevel = system.settings.config.getInt("akka.remote.artery.advanced.idle-cpu-level")
|
||||
val taskRunner = {
|
||||
val r = new TaskRunner(system.asInstanceOf[ExtendedActorSystem])
|
||||
val r = new TaskRunner(system.asInstanceOf[ExtendedActorSystem], idleCpuLevel)
|
||||
r.start()
|
||||
r
|
||||
}
|
||||
|
|
|
|||
|
|
@ -95,8 +95,9 @@ abstract class AeronStreamMaxThroughputSpec
|
|||
Aeron.connect(ctx)
|
||||
}
|
||||
|
||||
val idleCpuLevel = system.settings.config.getInt("akka.remote.artery.advanced.idle-cpu-level")
|
||||
val taskRunner = {
|
||||
val r = new TaskRunner(system.asInstanceOf[ExtendedActorSystem])
|
||||
val r = new TaskRunner(system.asInstanceOf[ExtendedActorSystem], idleCpuLevel)
|
||||
r.start()
|
||||
r
|
||||
}
|
||||
|
|
|
|||
|
|
@ -53,6 +53,7 @@ object LatencySpec extends MultiNodeConfig {
|
|||
}
|
||||
remote.artery {
|
||||
enabled = on
|
||||
advanced.idle-cpu-level=8
|
||||
}
|
||||
}
|
||||
""")))
|
||||
|
|
|
|||
|
|
@ -126,6 +126,14 @@ akka {
|
|||
# Embedded media driver will use a this directory, or a temporary directory if this
|
||||
# property is not defined (empty).
|
||||
aeron-dir = ""
|
||||
|
||||
# Level of CPU time used, on a scale between 1 and 10, during backoff/idle.
|
||||
# The tradeoff is that to have low latency more CPU time must be used to be
|
||||
# able to react quickly on incoming messages or send as fast as possible after
|
||||
# backoff backpressure.
|
||||
# Level 1 strongly prefer low CPU consumption over low latency.
|
||||
# Level 10 strongly prefer low latency over low CPU consumption.
|
||||
idle-cpu-level = 5
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -30,6 +30,8 @@ final class RemoteSettings(val config: Config) {
|
|||
val AeronDirectoryName = getString("akka.remote.artery.advanced.aeron-dir") requiring (dir ⇒
|
||||
EmbeddedMediaDriver || dir.nonEmpty, "aeron-dir must be defined when using external media driver")
|
||||
val TestMode: Boolean = getBoolean("akka.remote.artery.advanced.test-mode")
|
||||
val IdleCpuLevel: Int = getInt("akka.remote.artery.advanced.idle-cpu-level").requiring(level ⇒
|
||||
1 <= level && level <= 10, "idle-cpu-level must be between 1 and 10")
|
||||
|
||||
val LogReceive: Boolean = getBoolean("akka.remote.log-received-messages")
|
||||
|
||||
|
|
|
|||
|
|
@ -77,8 +77,7 @@ class AeronSink(
|
|||
taskRunner: TaskRunner,
|
||||
pool: EnvelopeBufferPool,
|
||||
giveUpSendAfter: Duration,
|
||||
flightRecorder: EventSink
|
||||
)
|
||||
flightRecorder: EventSink)
|
||||
extends GraphStageWithMaterializedValue[SinkShape[EnvelopeBuffer], Future[Done]] {
|
||||
import AeronSink._
|
||||
import TaskRunner._
|
||||
|
|
@ -96,6 +95,7 @@ class AeronSink(
|
|||
|
||||
private var completedValue: Try[Done] = Success(Done)
|
||||
|
||||
// FIXME measure and adjust with IdleCpuLevel
|
||||
private val spinning = 1000
|
||||
private var backoffCount = spinning
|
||||
private var lastMsgSize = 0
|
||||
|
|
|
|||
|
|
@ -67,8 +67,7 @@ class AeronSource(
|
|||
aeron: Aeron,
|
||||
taskRunner: TaskRunner,
|
||||
pool: EnvelopeBufferPool,
|
||||
flightRecorder: EventSink
|
||||
)
|
||||
flightRecorder: EventSink)
|
||||
extends GraphStage[SourceShape[EnvelopeBuffer]] {
|
||||
import AeronSource._
|
||||
import TaskRunner._
|
||||
|
|
@ -81,6 +80,7 @@ class AeronSource(
|
|||
new GraphStageLogic(shape) with OutHandler {
|
||||
|
||||
private val sub = aeron.addSubscription(channel, streamId)
|
||||
// FIXME measure and adjust with IdleCpuLevel
|
||||
private val spinning = 1000
|
||||
private val yielding = 0
|
||||
private val parking = 0
|
||||
|
|
|
|||
|
|
@ -73,6 +73,9 @@ import scala.collection.JavaConverters._
|
|||
import akka.stream.ActorMaterializerSettings
|
||||
import scala.annotation.tailrec
|
||||
import akka.util.OptionVal
|
||||
import io.aeron.driver.ThreadingMode
|
||||
import org.agrona.concurrent.BackoffIdleStrategy
|
||||
import org.agrona.concurrent.BusySpinIdleStrategy
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -362,7 +365,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
|
|||
private val controlStreamId = 1
|
||||
private val ordinaryStreamId = 3
|
||||
private val largeStreamId = 4
|
||||
private val taskRunner = new TaskRunner(system)
|
||||
private val taskRunner = new TaskRunner(system, remoteSettings.IdleCpuLevel)
|
||||
|
||||
private val restartTimeout: FiniteDuration = 5.seconds // FIXME config
|
||||
private val maxRestarts = 5 // FIXME config
|
||||
|
|
@ -431,9 +434,27 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
|
|||
if (remoteSettings.AeronDirectoryName.nonEmpty)
|
||||
driverContext.aeronDirectoryName(remoteSettings.AeronDirectoryName)
|
||||
// FIXME settings from config
|
||||
driverContext.conductorIdleStrategy()
|
||||
driverContext.clientLivenessTimeoutNs(SECONDS.toNanos(20))
|
||||
driverContext.imageLivenessTimeoutNs(SECONDS.toNanos(20))
|
||||
driverContext.driverTimeoutMs(SECONDS.toNanos(20))
|
||||
|
||||
if (remoteSettings.IdleCpuLevel == 10) {
|
||||
driverContext
|
||||
.threadingMode(ThreadingMode.DEDICATED)
|
||||
.conductorIdleStrategy(new BackoffIdleStrategy(1, 1, 1, 1))
|
||||
.receiverIdleStrategy(new BusySpinIdleStrategy)
|
||||
.senderIdleStrategy(new BusySpinIdleStrategy);
|
||||
} else if (remoteSettings.IdleCpuLevel == 1) {
|
||||
driverContext
|
||||
.threadingMode(ThreadingMode.SHARED)
|
||||
//FIXME measure: .sharedIdleStrategy(new BackoffIdleStrategy(20, 50, 1, 200))
|
||||
} else if (remoteSettings.IdleCpuLevel <= 5) {
|
||||
driverContext
|
||||
.threadingMode(ThreadingMode.SHARED_NETWORK)
|
||||
//FIXME measure: .sharedNetworkIdleStrategy(new BackoffIdleStrategy(20, 50, 1, 20 * (11 - remoteSettings.IdleCpuLevel)))
|
||||
}
|
||||
|
||||
val driver = MediaDriver.launchEmbedded(driverContext)
|
||||
log.debug("Started embedded media driver in directory [{}]", driver.aeronDirectoryName)
|
||||
topLevelFREvents.loFreq(Transport_MediaDriverStarted, driver.aeronDirectoryName().getBytes("US-ASCII"))
|
||||
|
|
|
|||
|
|
@ -11,6 +11,9 @@ import akka.event.Logging
|
|||
import org.agrona.concurrent.BackoffIdleStrategy
|
||||
import scala.annotation.tailrec
|
||||
import scala.reflect.ClassTag
|
||||
import org.agrona.concurrent.IdleStrategy
|
||||
import org.agrona.concurrent.BusySpinIdleStrategy
|
||||
import akka.dispatch.MonitorableThreadFactory
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -82,7 +85,7 @@ private[akka] object TaskRunner {
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] class TaskRunner(system: ExtendedActorSystem) extends Runnable {
|
||||
private[akka] class TaskRunner(system: ExtendedActorSystem, idleCpuLevel: Int) extends Runnable {
|
||||
import TaskRunner._
|
||||
|
||||
private val log = Logging(system, getClass)
|
||||
|
|
@ -91,14 +94,29 @@ private[akka] class TaskRunner(system: ExtendedActorSystem) extends Runnable {
|
|||
private[this] val tasks = new ArrayBag[Task]
|
||||
|
||||
// TODO the backoff strategy should be measured and tuned
|
||||
private val spinning = 2000000
|
||||
private val yielding = 0
|
||||
private val idleStrategy = new BackoffIdleStrategy(
|
||||
spinning, yielding, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MICROSECONDS.toNanos(100))
|
||||
private val idleStrategy: IdleStrategy = {
|
||||
if (idleCpuLevel == 1) {
|
||||
val maxParkMicros = 400
|
||||
new BackoffIdleStrategy(1, 1, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MICROSECONDS.toNanos(maxParkMicros))
|
||||
} else if (idleCpuLevel == 10)
|
||||
new BusySpinIdleStrategy
|
||||
else {
|
||||
val spinning = 100000 * idleCpuLevel
|
||||
val yielding = 2 * idleCpuLevel
|
||||
val maxParkMicros = 40 * (11 - idleCpuLevel)
|
||||
new BackoffIdleStrategy(
|
||||
spinning, yielding, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MICROSECONDS.toNanos(maxParkMicros))
|
||||
}
|
||||
}
|
||||
private var reset = false
|
||||
|
||||
def start(): Unit = {
|
||||
val thread = system.threadFactory.newThread(this)
|
||||
val tf = system.threadFactory match {
|
||||
case m: MonitorableThreadFactory ⇒
|
||||
m.withName(m.name + "-taskrunner")
|
||||
case other ⇒ other
|
||||
}
|
||||
val thread = tf.newThread(this)
|
||||
thread.start()
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -32,8 +32,9 @@ class AeronSinkSpec extends AkkaSpec with ImplicitSender {
|
|||
Aeron.connect(ctx)
|
||||
}
|
||||
|
||||
val idleCpuLevel = 5
|
||||
val taskRunner = {
|
||||
val r = new TaskRunner(system.asInstanceOf[ExtendedActorSystem])
|
||||
val r = new TaskRunner(system.asInstanceOf[ExtendedActorSystem], idleCpuLevel)
|
||||
r.start()
|
||||
r
|
||||
}
|
||||
|
|
|
|||
|
|
@ -78,8 +78,9 @@ object AeronStreamsApp {
|
|||
lazy val system = ActorSystem("AeronStreams")
|
||||
lazy implicit val mat = ActorMaterializer()(system)
|
||||
|
||||
val idleCpuLevel = 5
|
||||
lazy val taskRunner = {
|
||||
val r = new TaskRunner(system.asInstanceOf[ExtendedActorSystem])
|
||||
val r = new TaskRunner(system.asInstanceOf[ExtendedActorSystem], idleCpuLevel)
|
||||
r.start()
|
||||
r
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue