diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala index d615ce9e64..7e867b5b0e 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala @@ -88,7 +88,7 @@ abstract class AeronStreamLatencySpec val pool = new EnvelopeBufferPool(1024 * 1024, 128) - val cncByteBuffer = IoUtil.mapExistingFile(new File(driver.aeronDirectoryName, CncFileDescriptor.CNC_FILE), "cnc"); + val cncByteBuffer = IoUtil.mapExistingFile(new File(driver.aeronDirectoryName, CncFileDescriptor.CNC_FILE), "cnc") val stats = new AeronStat(AeronStat.mapCounters(cncByteBuffer)) @@ -106,7 +106,6 @@ abstract class AeronStreamLatencySpec } lazy implicit val mat = ActorMaterializer()(system) - import system.dispatcher override def initialParticipants = roles.size diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala index 28c5c0ab2a..4032d69b12 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala @@ -3,7 +3,6 @@ */ package akka.remote.artery -import java.net.InetAddress import java.util.concurrent.Executors import scala.collection.AbstractIterator diff --git a/akka-remote-tests/src/test/scala/akka/remote/artery/ArteryFailedToBindSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/artery/ArteryFailedToBindSpec.scala new file mode 100644 index 0000000000..0e9224dabf --- /dev/null +++ b/akka-remote-tests/src/test/scala/akka/remote/artery/ArteryFailedToBindSpec.scala @@ -0,0 +1,40 @@ +package akka.remote.artery + +import akka.actor.ActorSystem +import akka.testkit.SocketUtil +import com.typesafe.config.ConfigFactory +import org.scalatest.{ Matchers, WordSpec } + +class ArteryFailedToBindSpec extends WordSpec with Matchers { + + "an ActorSystem" must { + "not start if port is taken" in { + val port = SocketUtil.temporaryLocalPort(true) + val config = ConfigFactory.parseString( + s""" + |akka { + | actor { + | provider = remote + | } + | remote { + | artery { + | enabled = on + | canonical.hostname = "127.0.0.1" + | canonical.port = $port + | log-aeron-counters = on + | } + | } + |} + """.stripMargin) + val as = ActorSystem("BindTest1", config) + try { + val ex = intercept[RuntimeException] { + ActorSystem("BindTest2", config) + } + ex.getMessage should equal("Inbound Aeron channel is in errored state. See Aeron logs for details.") + } finally { + as.terminate() + } + } + } +} diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 3a8dfc554c..bbaff88a73 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -755,8 +755,14 @@ akka { # "" InetAddress.getLocalHost.getHostName # hostname = "" + + # Time to wait for Aeron to bind + bind-timeout = 3s } + # Periodically log out all Aeron counters. See https://github.com/real-logic/aeron/wiki/Monitoring-and-Debugging#counters + log-aeron-counters = false + # Actor paths to use the large message stream for when a message # is sent to them over remoting. The large message stream dedicated # is separate from "normal" and system messages so that sending a diff --git a/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala b/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala index f008204ea1..9868c63c47 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/AeronSource.scala @@ -3,23 +3,17 @@ */ package akka.remote.artery -import java.util.concurrent.TimeUnit import scala.annotation.tailrec -import scala.concurrent.duration._ import akka.stream.Attributes import akka.stream.Outlet import akka.stream.SourceShape import akka.stream.stage.AsyncCallback -import akka.stream.stage.GraphStage import akka.stream.stage.GraphStageLogic import akka.stream.stage.OutHandler -import io.aeron.Aeron -import io.aeron.FragmentAssembler -import io.aeron.Subscription +import io.aeron.{ Aeron, FragmentAssembler, Subscription } import io.aeron.logbuffer.FragmentHandler import io.aeron.logbuffer.Header import org.agrona.DirectBuffer -import org.agrona.concurrent.BackoffIdleStrategy import org.agrona.hints.ThreadHints import akka.stream.stage.GraphStageWithMaterializedValue import scala.util.control.NonFatal diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala index 5ff4b6dd3a..25d05a8494 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala @@ -54,6 +54,8 @@ private[akka] final class ArterySettings private (config: Config) { case "" ⇒ Canonical.Hostname case other ⇒ other } + + val BindTimeout = getDuration("bind-timeout").requiring(!_.isNegative, "bind-timeout can not be negative") } val LargeMessageDestinations = @@ -67,6 +69,7 @@ private[akka] final class ArterySettings private (config: Config) { val LogReceive: Boolean = getBoolean("log-received-messages") val LogSend: Boolean = getBoolean("log-sent-messages") + val LogAeronCounters: Boolean = config.getBoolean("log-aeron-counters") object Advanced { val config = getConfig("advanced") diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index c02b0ffc9c..c5dc10e485 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -60,10 +60,12 @@ import io.aeron.driver.MediaDriver import io.aeron.driver.ThreadingMode import io.aeron.exceptions.ConductorServiceTimeoutException import io.aeron.exceptions.DriverTimeoutException -import org.agrona.ErrorHandler -import org.agrona.IoUtil +import org.agrona.{ DirectBuffer, ErrorHandler, IoUtil } import org.agrona.concurrent.BackoffIdleStrategy import akka.remote.artery.Decoder.InboundCompressionAccess +import io.aeron.driver.status.ChannelEndpointStatus +import org.agrona.collections.IntObjConsumer +import org.agrona.concurrent.status.CountersReader.MetaData /** * INTERNAL API @@ -304,6 +306,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private[this] val mediaDriver = new AtomicReference[Option[MediaDriver]](None) @volatile private[this] var aeron: Aeron = _ @volatile private[this] var aeronErrorLogTask: Cancellable = _ + @volatile private[this] var aeronCounterTask: Cancellable = _ @volatile private[this] var areonErrorLog: AeronErrorLog = _ override val log: LoggingAdapter = Logging(system, getClass.getName) @@ -404,7 +407,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R priorityMessageDestinations, outboundEnvelopePool)) - override def settings = provider.remoteSettings.Artery + override def settings: ArterySettings = provider.remoteSettings.Artery override def start(): Unit = { Runtime.getRuntime.addShutdownHook(shutdownHook) @@ -413,6 +416,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R topLevelFREvents.loFreq(Transport_AeronStarted, NoMetaData) startAeronErrorLog() topLevelFREvents.loFreq(Transport_AeronErrorLogStarted, NoMetaData) + if (settings.LogAeronCounters) { + startAeronCounterLog() + } taskRunner.start() topLevelFREvents.loFreq(Transport_TaskRunnerStarted, NoMetaData) @@ -448,6 +454,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R topLevelFREvents.loFreq(Transport_MaterializerStarted, NoMetaData) runInboundStreams() + blockUntilChannelActive() topLevelFREvents.loFreq(Transport_StartupFinished, NoMetaData) log.info("Remoting started; listening on address: [{}] with UID [{}]", localAddress.address, localAddress.uid) @@ -592,6 +599,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R cause.getMessage) taskRunner.stop() aeronErrorLogTask.cancel() + if (settings.LogAeronCounters) aeronCounterTask.cancel() system.terminate() throw new AeronTerminated(cause) } @@ -604,6 +612,44 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R aeron = Aeron.connect(ctx) } + private def blockUntilChannelActive(): Unit = { + val counterIdForInboundChannel = findCounterId(s"rcv-channel: $inboundChannel") + val waitInterval = 200 + val retries = math.max(1, settings.Bind.BindTimeout.toMillis / waitInterval) + retry(retries) + + @tailrec def retry(retries: Long): Unit = { + val status = aeron.countersReader().getCounterValue(counterIdForInboundChannel) + if (status == ChannelEndpointStatus.ACTIVE) { + log.debug("Inbound channel is now active") + } else if (status == ChannelEndpointStatus.ERRORED) { + areonErrorLog.logErrors(log, 0L) + throw new RuntimeException("Inbound Aeron channel is in errored state. See Aeron logs for details.") + } else if (status == ChannelEndpointStatus.INITIALIZING && retries > 0) { + Thread.sleep(waitInterval) + retry(retries - 1) + } else { + areonErrorLog.logErrors(log, 0L) + throw new RuntimeException("Timed out waiting for Aeron transport to bind. See Aeoron logs.") + } + } + } + + private def findCounterId(label: String): Int = { + var counterId = -1 + aeron.countersReader().forEach(new IntObjConsumer[String] { + def accept(i: Int, l: String): Unit = { + if (label == l) + counterId = i + } + }) + if (counterId == -1) { + throw new RuntimeException(s"Unable to found counterId for label: $label") + } else { + counterId + } + } + // TODO Add FR Events private def startAeronErrorLog(): Unit = { areonErrorLog = new AeronErrorLog(new File(aeronDir, CncFileDescriptor.CNC_FILE), log) @@ -617,6 +663,20 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R } } + private def startAeronCounterLog(): Unit = { + import system.dispatcher + aeronCounterTask = system.scheduler.schedule(5.seconds, 5.seconds) { + if (!isShutdown && log.isDebugEnabled) { + aeron.countersReader.forEach(new MetaData() { + def accept(counterId: Int, typeId: Int, keyBuffer: DirectBuffer, label: String): Unit = { + val value = aeron.countersReader().getCounterValue(counterId) + log.debug("Aeron Counter {}: {} {}]", counterId, value, label) + } + }) + } + } + } + private def runInboundStreams(): Unit = { runInboundControlStream() runInboundOrdinaryMessagesStream()