From 8fb7727526f4e5dedd342b53d403d21150f16d16 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 1 Jun 2016 11:56:18 +0200 Subject: [PATCH] make it possible to use external Aeron media driver, #20588 (#20653) * make it possible to use external Aeron media driver, #20588 * on my machine the MaxThroughputSpec maxed out all 8 cores completely, and when using external media driver it is much better and easier to find the actual bottlenecks * aeron.properties for external media driver --- .../remote/artery/MaxThroughputSpec.scala | 6 ++++ akka-remote/src/main/resources/reference.conf | 13 +++++++ .../scala/akka/remote/RemoteSettings.scala | 3 ++ .../akka/remote/artery/ArteryTransport.scala | 35 ++++++++++++------- .../src/test/resources/aeron.properties | 16 +++++++++ 5 files changed, 61 insertions(+), 12 deletions(-) create mode 100644 akka-remote/src/test/resources/aeron.properties diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala index 45282e4d6f..ade8827976 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala @@ -36,6 +36,12 @@ object MaxThroughputSpec extends MultiNodeConfig { } remote.artery { enabled = on + + # for serious measurements when running this test on only one machine + # it is recommended to use external media driver + # See akka-remote-tests/src/test/resources/aeron.properties + #advanced.embedded-media-driver = off + #advanced.aeron-dir = "target/aeron" } } """))) diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index ea692dab5c..09c50044ef 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -105,6 +105,19 @@ akka { # stream, to pass such messages through the large message stream the selections # but must be resolved to ActorRefs first. large-message-destinations = [] + + advanced { + # Controls whether to start the Aeron media driver in the same JVM or use external + # process. Set to 'off' when using external media driver, and then also set the + # 'aeron-dir'. + embedded-media-driver = on + + # Directory used by the Aeron media driver. It's mandatory to define the 'aeron-dir' + # if using external media driver, i.e. when 'embedded-media-driver = off'. + # Embedded media driver will use a this directory, or a temporary directory if this + # property is not defined (empty). + aeron-dir = "" + } } ### General settings diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index 346671a457..a92f44424b 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -26,6 +26,9 @@ final class RemoteSettings(val config: Config) { case "" ⇒ InetAddress.getLocalHost.getHostName case other ⇒ other } + val EmbeddedMediaDriver = getBoolean("akka.remote.artery.advanced.embedded-media-driver") + val AeronDirectoryName = getString("akka.remote.artery.advanced.aeron-dir") requiring (dir ⇒ + EmbeddedMediaDriver || dir.nonEmpty, "aeron-dir must be defined when using external media driver") val LogReceive: Boolean = getBoolean("akka.remote.log-received-messages") diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index fb6e7aa5dc..91d0651f1c 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -222,7 +222,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R @volatile private[this] var materializer: Materializer = _ @volatile private[this] var controlSubject: ControlMessageSubject = _ @volatile private[this] var messageDispatcher: MessageDispatcher = _ - @volatile private[this] var driver: MediaDriver = _ + @volatile private[this] var mediaDriver: Option[MediaDriver] = None @volatile private[this] var aeron: Aeron = _ @volatile private[this] var aeronErrorLogTask: Cancellable = _ @@ -298,13 +298,23 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } private def startMediaDriver(): Unit = { - // TODO also support external media driver - val driverContext = new MediaDriver.Context - // FIXME settings from config - driverContext.clientLivenessTimeoutNs(SECONDS.toNanos(20)) - driverContext.imageLivenessTimeoutNs(SECONDS.toNanos(20)) - driverContext.driverTimeoutMs(SECONDS.toNanos(20)) - driver = MediaDriver.launchEmbedded(driverContext) + if (remoteSettings.EmbeddedMediaDriver) { + val driverContext = new MediaDriver.Context + if (remoteSettings.AeronDirectoryName.nonEmpty) + driverContext.aeronDirectoryName(remoteSettings.AeronDirectoryName) + // FIXME settings from config + driverContext.clientLivenessTimeoutNs(SECONDS.toNanos(20)) + driverContext.imageLivenessTimeoutNs(SECONDS.toNanos(20)) + driverContext.driverTimeoutMs(SECONDS.toNanos(20)) + val driver = MediaDriver.launchEmbedded(driverContext) + log.debug("Started embedded media driver in directory [{}]", driver.aeronDirectoryName) + mediaDriver = Some(driver) + } + } + + private def aeronDir: String = mediaDriver match { + case Some(driver) ⇒ driver.aeronDirectoryName + case None ⇒ remoteSettings.AeronDirectoryName } private def startAeron(): Unit = { @@ -336,12 +346,12 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } }) - ctx.aeronDirectoryName(driver.aeronDirectoryName) + ctx.aeronDirectoryName(aeronDir) aeron = Aeron.connect(ctx) } private def startAeronErrorLog(): Unit = { - val errorLog = new AeronErrorLog(new File(driver.aeronDirectoryName, CncFileDescriptor.CNC_FILE)) + val errorLog = new AeronErrorLog(new File(aeronDir, CncFileDescriptor.CNC_FILE)) val lastTimestamp = new AtomicLong(0L) import system.dispatcher // FIXME perhaps use another dispatcher for this aeronErrorLogTask = system.scheduler.schedule(3.seconds, 5.seconds) { @@ -436,9 +446,10 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R if (taskRunner != null) taskRunner.stop() if (aeronErrorLogTask != null) aeronErrorLogTask.cancel() if (aeron != null) aeron.close() - if (driver != null) { + mediaDriver.foreach { driver ⇒ + // this is only for embedded media driver driver.close() - // FIXME only delete files for embedded media driver, and it should also be configurable + // FIXME it should also be configurable to not delete dir IoUtil.delete(new File(driver.aeronDirectoryName), true) } Future.successful(Done) diff --git a/akka-remote/src/test/resources/aeron.properties b/akka-remote/src/test/resources/aeron.properties new file mode 100644 index 0000000000..db195e1075 --- /dev/null +++ b/akka-remote/src/test/resources/aeron.properties @@ -0,0 +1,16 @@ +# External Aeron Media Driver using this properties file (loaded as classpath resource) +# can be run with: +# sbt "akka-remote/test:runMain io.aeron.driver.MediaDriver aeron.properties" + +aeron.mtu.length=16384 +aeron.socket.so_sndbuf=2097152 +aeron.socket.so_rcvbuf=2097152 +aeron.rcv.buffer.length=16384 +aeron.rcv.initial.window.length=2097152 +agrona.disable.bounds.checks=true + +aeron.threading.mode=SHARED_NETWORK + +# use same director in akka.remote.artery.advanced.aeron-dir config +# of the Akka application +aeron.dir=target/aeron