Merge pull request #23768 from chbatey/artery-startup

Block ActorSystem creation until Artery is bound
This commit is contained in:
Christopher Batey 2017-10-13 14:51:23 +01:00 committed by GitHub
commit cd8ec9f981
7 changed files with 114 additions and 13 deletions

View file

@ -88,7 +88,7 @@ abstract class AeronStreamLatencySpec
val pool = new EnvelopeBufferPool(1024 * 1024, 128)
val cncByteBuffer = IoUtil.mapExistingFile(new File(driver.aeronDirectoryName, CncFileDescriptor.CNC_FILE), "cnc");
val cncByteBuffer = IoUtil.mapExistingFile(new File(driver.aeronDirectoryName, CncFileDescriptor.CNC_FILE), "cnc")
val stats =
new AeronStat(AeronStat.mapCounters(cncByteBuffer))
@ -106,7 +106,6 @@ abstract class AeronStreamLatencySpec
}
lazy implicit val mat = ActorMaterializer()(system)
import system.dispatcher
override def initialParticipants = roles.size

View file

@ -3,7 +3,6 @@
*/
package akka.remote.artery
import java.net.InetAddress
import java.util.concurrent.Executors
import scala.collection.AbstractIterator

View file

@ -0,0 +1,40 @@
package akka.remote.artery
import akka.actor.ActorSystem
import akka.testkit.SocketUtil
import com.typesafe.config.ConfigFactory
import org.scalatest.{ Matchers, WordSpec }
class ArteryFailedToBindSpec extends WordSpec with Matchers {
"an ActorSystem" must {
"not start if port is taken" in {
val port = SocketUtil.temporaryLocalPort(true)
val config = ConfigFactory.parseString(
s"""
|akka {
| actor {
| provider = remote
| }
| remote {
| artery {
| enabled = on
| canonical.hostname = "127.0.0.1"
| canonical.port = $port
| log-aeron-counters = on
| }
| }
|}
""".stripMargin)
val as = ActorSystem("BindTest1", config)
try {
val ex = intercept[RuntimeException] {
ActorSystem("BindTest2", config)
}
ex.getMessage should equal("Inbound Aeron channel is in errored state. See Aeron logs for details.")
} finally {
as.terminate()
}
}
}
}

View file

@ -755,8 +755,14 @@ akka {
# "<getHostName>" InetAddress.getLocalHost.getHostName
#
hostname = ""
# Time to wait for Aeron to bind
bind-timeout = 3s
}
# Periodically log out all Aeron counters. See https://github.com/real-logic/aeron/wiki/Monitoring-and-Debugging#counters
log-aeron-counters = false
# Actor paths to use the large message stream for when a message
# is sent to them over remoting. The large message stream dedicated
# is separate from "normal" and system messages so that sending a

View file

@ -3,23 +3,17 @@
*/
package akka.remote.artery
import java.util.concurrent.TimeUnit
import scala.annotation.tailrec
import scala.concurrent.duration._
import akka.stream.Attributes
import akka.stream.Outlet
import akka.stream.SourceShape
import akka.stream.stage.AsyncCallback
import akka.stream.stage.GraphStage
import akka.stream.stage.GraphStageLogic
import akka.stream.stage.OutHandler
import io.aeron.Aeron
import io.aeron.FragmentAssembler
import io.aeron.Subscription
import io.aeron.{ Aeron, FragmentAssembler, Subscription }
import io.aeron.logbuffer.FragmentHandler
import io.aeron.logbuffer.Header
import org.agrona.DirectBuffer
import org.agrona.concurrent.BackoffIdleStrategy
import org.agrona.hints.ThreadHints
import akka.stream.stage.GraphStageWithMaterializedValue
import scala.util.control.NonFatal

View file

@ -54,6 +54,8 @@ private[akka] final class ArterySettings private (config: Config) {
case "" Canonical.Hostname
case other other
}
val BindTimeout = getDuration("bind-timeout").requiring(!_.isNegative, "bind-timeout can not be negative")
}
val LargeMessageDestinations =
@ -67,6 +69,7 @@ private[akka] final class ArterySettings private (config: Config) {
val LogReceive: Boolean = getBoolean("log-received-messages")
val LogSend: Boolean = getBoolean("log-sent-messages")
val LogAeronCounters: Boolean = config.getBoolean("log-aeron-counters")
object Advanced {
val config = getConfig("advanced")

View file

@ -60,10 +60,12 @@ import io.aeron.driver.MediaDriver
import io.aeron.driver.ThreadingMode
import io.aeron.exceptions.ConductorServiceTimeoutException
import io.aeron.exceptions.DriverTimeoutException
import org.agrona.ErrorHandler
import org.agrona.IoUtil
import org.agrona.{ DirectBuffer, ErrorHandler, IoUtil }
import org.agrona.concurrent.BackoffIdleStrategy
import akka.remote.artery.Decoder.InboundCompressionAccess
import io.aeron.driver.status.ChannelEndpointStatus
import org.agrona.collections.IntObjConsumer
import org.agrona.concurrent.status.CountersReader.MetaData
/**
* INTERNAL API
@ -304,6 +306,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
private[this] val mediaDriver = new AtomicReference[Option[MediaDriver]](None)
@volatile private[this] var aeron: Aeron = _
@volatile private[this] var aeronErrorLogTask: Cancellable = _
@volatile private[this] var aeronCounterTask: Cancellable = _
@volatile private[this] var areonErrorLog: AeronErrorLog = _
override val log: LoggingAdapter = Logging(system, getClass.getName)
@ -404,7 +407,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
priorityMessageDestinations,
outboundEnvelopePool))
override def settings = provider.remoteSettings.Artery
override def settings: ArterySettings = provider.remoteSettings.Artery
override def start(): Unit = {
Runtime.getRuntime.addShutdownHook(shutdownHook)
@ -413,6 +416,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
topLevelFREvents.loFreq(Transport_AeronStarted, NoMetaData)
startAeronErrorLog()
topLevelFREvents.loFreq(Transport_AeronErrorLogStarted, NoMetaData)
if (settings.LogAeronCounters) {
startAeronCounterLog()
}
taskRunner.start()
topLevelFREvents.loFreq(Transport_TaskRunnerStarted, NoMetaData)
@ -448,6 +454,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
topLevelFREvents.loFreq(Transport_MaterializerStarted, NoMetaData)
runInboundStreams()
blockUntilChannelActive()
topLevelFREvents.loFreq(Transport_StartupFinished, NoMetaData)
log.info("Remoting started; listening on address: [{}] with UID [{}]", localAddress.address, localAddress.uid)
@ -592,6 +599,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
cause.getMessage)
taskRunner.stop()
aeronErrorLogTask.cancel()
if (settings.LogAeronCounters) aeronCounterTask.cancel()
system.terminate()
throw new AeronTerminated(cause)
}
@ -604,6 +612,44 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
aeron = Aeron.connect(ctx)
}
private def blockUntilChannelActive(): Unit = {
val counterIdForInboundChannel = findCounterId(s"rcv-channel: $inboundChannel")
val waitInterval = 200
val retries = math.max(1, settings.Bind.BindTimeout.toMillis / waitInterval)
retry(retries)
@tailrec def retry(retries: Long): Unit = {
val status = aeron.countersReader().getCounterValue(counterIdForInboundChannel)
if (status == ChannelEndpointStatus.ACTIVE) {
log.debug("Inbound channel is now active")
} else if (status == ChannelEndpointStatus.ERRORED) {
areonErrorLog.logErrors(log, 0L)
throw new RuntimeException("Inbound Aeron channel is in errored state. See Aeron logs for details.")
} else if (status == ChannelEndpointStatus.INITIALIZING && retries > 0) {
Thread.sleep(waitInterval)
retry(retries - 1)
} else {
areonErrorLog.logErrors(log, 0L)
throw new RuntimeException("Timed out waiting for Aeron transport to bind. See Aeoron logs.")
}
}
}
private def findCounterId(label: String): Int = {
var counterId = -1
aeron.countersReader().forEach(new IntObjConsumer[String] {
def accept(i: Int, l: String): Unit = {
if (label == l)
counterId = i
}
})
if (counterId == -1) {
throw new RuntimeException(s"Unable to found counterId for label: $label")
} else {
counterId
}
}
// TODO Add FR Events
private def startAeronErrorLog(): Unit = {
areonErrorLog = new AeronErrorLog(new File(aeronDir, CncFileDescriptor.CNC_FILE), log)
@ -617,6 +663,20 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
}
}
private def startAeronCounterLog(): Unit = {
import system.dispatcher
aeronCounterTask = system.scheduler.schedule(5.seconds, 5.seconds) {
if (!isShutdown && log.isDebugEnabled) {
aeron.countersReader.forEach(new MetaData() {
def accept(counterId: Int, typeId: Int, keyBuffer: DirectBuffer, label: String): Unit = {
val value = aeron.countersReader().getCounterValue(counterId)
log.debug("Aeron Counter {}: {} {}]", counterId, value, label)
}
})
}
}
}
private def runInboundStreams(): Unit = {
runInboundControlStream()
runInboundOrdinaryMessagesStream()