handle fatal Aeron exceptions, #20561

* handle ConductorServiceTimeoutException and
  DriverTimeoutException
* shut down things properly, in the right order, and
  without overwhelming the logs with exceptions
This commit is contained in:
Patrik Nordwall 2016-09-06 14:32:42 +02:00
parent 85be571af7
commit f4b82ce62b
4 changed files with 130 additions and 98 deletions

View file

@ -70,6 +70,8 @@ import org.agrona.IoUtil
import org.agrona.concurrent.BackoffIdleStrategy
import akka.stream.scaladsl.BroadcastHub
import scala.util.control.NoStackTrace
import io.aeron.exceptions.DriverTimeoutException
import java.util.concurrent.atomic.AtomicBoolean
/**
* INTERNAL API
@ -245,21 +247,27 @@ private[remote] class FlushOnShutdown(done: Promise[Done], timeout: FiniteDurati
override def preStart(): Unit = {
// FIXME shall we also try to flush the ordinary message stream, not only control stream?
val msg = ActorSystemTerminating(inboundContext.localAddress)
associations.foreach { a a.send(msg, OptionVal.Some(self), OptionVal.None) }
try {
associations.foreach { a a.send(msg, OptionVal.Some(self), OptionVal.None) }
} catch {
case NonFatal(e)
// send may throw
done.tryFailure(e)
throw e
}
}
override def postStop(): Unit =
override def postStop(): Unit = {
timeoutTask.cancel()
done.trySuccess(Done)
}
def receive = {
case ActorSystemTerminatingAck(from)
remaining -= from
if (remaining.isEmpty) {
done.trySuccess(Done)
if (remaining.isEmpty)
context.stop(self)
}
case FlushOnShutdown.Timeout
done.trySuccess(Done)
context.stop(self)
}
}
@ -271,6 +279,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
extends RemoteTransport(_system, _provider) with InboundContext {
import FlightRecorderEvents._
import ArteryTransport.ShutdownSignal
import ArteryTransport.AeronTerminated
// these vars are initialized once in the start method
@volatile private[this] var _localAddress: UniqueAddress = _
@ -474,6 +483,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
private def startAeron(): Unit = {
val ctx = new Aeron.Context
ctx.driverTimeoutMs(settings.Advanced.DriverTimeout.toMillis)
ctx.availableImageHandler(new AvailableImageHandler {
override def onAvailableImage(img: Image): Unit = {
if (log.isDebugEnabled)
@ -487,17 +498,37 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
// FIXME we should call FragmentAssembler.freeSessionBuffer when image is unavailable
}
})
ctx.errorHandler(new ErrorHandler {
private val fatalErrorOccured = new AtomicBoolean
override def onError(cause: Throwable): Unit = {
cause match {
case e: ConductorServiceTimeoutException
// Timeout between service calls
log.error(cause, s"Aeron ServiceTimeoutException, ${cause.getMessage}")
case e: ConductorServiceTimeoutException handleFatalError(e)
case e: DriverTimeoutException handleFatalError(e)
case _: AeronTerminated // already handled, via handleFatalError
case _
log.error(cause, s"Aeron error, ${cause.getMessage}")
}
}
// Reacts to an Aeron error that is treated as fatal; onError routes
// ConductorServiceTimeoutException and DriverTimeoutException here.
//
// The first invocation flips `fatalErrorOccured`. If the transport is not already shut down
// it logs the error once, stops the task runner, cancels the Aeron error log task,
// terminates the ActorSystem and then throws AeronTerminated wrapping `cause`.
// Every later invocation only throws AeronTerminated, so the log entry and the shutdown
// sequence are performed at most once.
private def handleFatalError(cause: Throwable): Unit = {
// compareAndSet succeeds only for the first caller; all others end up in the else branch
if (fatalErrorOccured.compareAndSet(false, true)) {
// when the transport is already shut down nothing is logged, stopped or thrown
if (!isShutdown) {
log.error(cause, "Fatal Aeron error {}. Have to terminate ActorSystem because it lost contact with the " +
"{} Aeron media driver. Possible configuration properties to mitigate the problem are " +
"'client-liveness-timeout' or 'driver-timeout'. {}",
Logging.simpleName(cause),
if (settings.Advanced.EmbeddedMediaDriver) "embedded" else "external",
cause.getMessage)
// stop the transport's own background work before terminating the ActorSystem
taskRunner.stop()
aeronErrorLogTask.cancel()
system.terminate()
// onError matches AeronTerminated separately and does not log it again
throw new AeronTerminated(cause)
}
} else
// a fatal error was already recorded: skip logging and shutdown, only signal termination
throw new AeronTerminated(cause)
}
})
ctx.aeronDirectoryName(aeronDir)
@ -713,10 +744,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
_ streamsCompleted
} yield {
topLevelFREvents.loFreq(Transport_KillSwitchPulled, NoMetaData)
if (taskRunner != null) {
taskRunner.stop()
topLevelFREvents.loFreq(Transport_Stopped, NoMetaData)
}
taskRunner.stop()
topLevelFREvents.loFreq(Transport_Stopped, NoMetaData)
if (aeronErrorLogTask != null) {
aeronErrorLogTask.cancel()
topLevelFREvents.loFreq(Transport_AeronErrorLogTaskStopped, NoMetaData)
@ -965,12 +995,11 @@ private[remote] object ArteryTransport {
val Version = 0
/**
* Internal API
*
* @return A port that is hopefully available
*/
private[remote] def autoSelectPort(hostname: String): Int = {
class AeronTerminated(e: Throwable) extends RuntimeException(e)
object ShutdownSignal extends RuntimeException with NoStackTrace
def autoSelectPort(hostname: String): Int = {
val socket = DatagramChannel.open().socket()
socket.bind(new InetSocketAddress(hostname, 0))
val port = socket.getLocalPort
@ -978,6 +1007,4 @@ private[remote] object ArteryTransport {
port
}
object ShutdownSignal extends RuntimeException with NoStackTrace
}

View file

@ -82,7 +82,7 @@ object AkkaBuild extends Build {
protobuf,
remote,
remoteTests,
// samples,
// samples, // FIXME temporary in artery-dev branch
slf4j,
stream,
streamTestkit,
@ -98,13 +98,14 @@ object AkkaBuild extends Build {
aggregate = aggregatedProjects
).settings(rootSettings: _*)
lazy val akkaScalaNightly = Project(
id = "akka-scala-nightly",
base = file("akka-scala-nightly"),
// remove dependencies that we have to build ourselves (Scala STM)
// samples don't work with dbuild right now
aggregate = aggregatedProjects diff List(agent, docs, samples)
).disablePlugins(ValidatePullRequest, MimaPlugin)
// FIXME temporary in artery-dev branch
// lazy val akkaScalaNightly = Project(
// id = "akka-scala-nightly",
// base = file("akka-scala-nightly"),
// // remove dependencies that we have to build ourselves (Scala STM)
// // samples don't work with dbuild right now
// aggregate = aggregatedProjects diff List(agent, docs, samples)
// ).disablePlugins(ValidatePullRequest, MimaPlugin)
lazy val actor = Project(
id = "akka-actor",
@ -371,68 +372,69 @@ object AkkaBuild extends Build {
lazy val samplesSettings = parentSettings ++ ActivatorDist.settings
lazy val samples = Project(
id = "akka-samples",
base = file("akka-samples"),
// FIXME osgiDiningHakkersSampleMavenTest temporarily removed from aggregate due to #16703
aggregate = if (!Sample.CliOptions.aggregateSamples) Nil else
Seq(sampleCamelJava, sampleCamelScala, sampleClusterJava, sampleClusterScala, sampleFsmScala, sampleFsmJavaLambda,
sampleMainJava, sampleMainScala, sampleMainJavaLambda, sampleMultiNodeScala,
samplePersistenceJava, samplePersistenceScala, samplePersistenceJavaLambda,
sampleRemoteJava, sampleRemoteScala, sampleSupervisionJavaLambda,
sampleDistributedDataScala, sampleDistributedDataJava)
)
.settings(samplesSettings: _*)
.disablePlugins(MimaPlugin)
lazy val sampleCamelJava = Sample.project("akka-sample-camel-java")
lazy val sampleCamelScala = Sample.project("akka-sample-camel-scala")
lazy val sampleClusterJava = Sample.project("akka-sample-cluster-java")
lazy val sampleClusterScala = Sample.project("akka-sample-cluster-scala")
lazy val sampleFsmScala = Sample.project("akka-sample-fsm-scala")
lazy val sampleFsmJavaLambda = Sample.project("akka-sample-fsm-java-lambda")
lazy val sampleMainJava = Sample.project("akka-sample-main-java")
lazy val sampleMainScala = Sample.project("akka-sample-main-scala")
lazy val sampleMainJavaLambda = Sample.project("akka-sample-main-java-lambda")
lazy val sampleMultiNodeScala = Sample.project("akka-sample-multi-node-scala")
lazy val samplePersistenceJava = Sample.project("akka-sample-persistence-java")
lazy val samplePersistenceScala = Sample.project("akka-sample-persistence-scala")
lazy val samplePersistenceJavaLambda = Sample.project("akka-sample-persistence-java-lambda")
lazy val sampleRemoteJava = Sample.project("akka-sample-remote-java")
lazy val sampleRemoteScala = Sample.project("akka-sample-remote-scala")
lazy val sampleSupervisionJavaLambda = Sample.project("akka-sample-supervision-java-lambda")
lazy val sampleDistributedDataScala = Sample.project("akka-sample-distributed-data-scala")
lazy val sampleDistributedDataJava = Sample.project("akka-sample-distributed-data-java")
lazy val osgiDiningHakkersSampleMavenTest = Project(
id = "akka-sample-osgi-dining-hakkers-maven-test",
base = file("akka-samples/akka-sample-osgi-dining-hakkers-maven-test")
)
.settings(
publishArtifact := false,
// force publication of artifacts to local maven repo, so latest versions can be used when running maven tests
compile in Compile <<=
(publishM2 in actor, publishM2 in testkit, publishM2 in remote, publishM2 in cluster, publishM2 in osgi,
publishM2 in slf4j, publishM2 in persistence, compile in Compile) map
((_, _, _, _, _, _, _, c) => c),
test in Test ~= { x => {
def executeMvnCommands(failureMessage: String, commands: String*) = {
if ({List("sh", "-c", commands.mkString("cd akka-samples/akka-sample-osgi-dining-hakkers; mvn ", " ", "")) !} != 0)
throw new Exception(failureMessage)
}
executeMvnCommands("Osgi sample Dining hakkers test failed", "clean", "install")
}}
)
.disablePlugins(ValidatePullRequest, MimaPlugin)
.settings(dontPublishSettings: _*)
// FIXME temporary in artery-dev branch
// lazy val samples = Project(
// id = "akka-samples",
// base = file("akka-samples"),
// // FIXME osgiDiningHakkersSampleMavenTest temporarily removed from aggregate due to #16703
// aggregate = if (!Sample.CliOptions.aggregateSamples) Nil else
// Seq(sampleCamelJava, sampleCamelScala, sampleClusterJava, sampleClusterScala, sampleFsmScala, sampleFsmJavaLambda,
// sampleMainJava, sampleMainScala, sampleMainJavaLambda, sampleMultiNodeScala,
// samplePersistenceJava, samplePersistenceScala, samplePersistenceJavaLambda,
// sampleRemoteJava, sampleRemoteScala, sampleSupervisionJavaLambda,
// sampleDistributedDataScala, sampleDistributedDataJava)
// )
// .settings(samplesSettings: _*)
// .disablePlugins(MimaPlugin)
//
// lazy val sampleCamelJava = Sample.project("akka-sample-camel-java")
// lazy val sampleCamelScala = Sample.project("akka-sample-camel-scala")
//
// lazy val sampleClusterJava = Sample.project("akka-sample-cluster-java")
// lazy val sampleClusterScala = Sample.project("akka-sample-cluster-scala")
//
// lazy val sampleFsmScala = Sample.project("akka-sample-fsm-scala")
// lazy val sampleFsmJavaLambda = Sample.project("akka-sample-fsm-java-lambda")
//
// lazy val sampleMainJava = Sample.project("akka-sample-main-java")
// lazy val sampleMainScala = Sample.project("akka-sample-main-scala")
// lazy val sampleMainJavaLambda = Sample.project("akka-sample-main-java-lambda")
//
// lazy val sampleMultiNodeScala = Sample.project("akka-sample-multi-node-scala")
//
// lazy val samplePersistenceJava = Sample.project("akka-sample-persistence-java")
// lazy val samplePersistenceScala = Sample.project("akka-sample-persistence-scala")
// lazy val samplePersistenceJavaLambda = Sample.project("akka-sample-persistence-java-lambda")
//
// lazy val sampleRemoteJava = Sample.project("akka-sample-remote-java")
// lazy val sampleRemoteScala = Sample.project("akka-sample-remote-scala")
//
// lazy val sampleSupervisionJavaLambda = Sample.project("akka-sample-supervision-java-lambda")
//
// lazy val sampleDistributedDataScala = Sample.project("akka-sample-distributed-data-scala")
// lazy val sampleDistributedDataJava = Sample.project("akka-sample-distributed-data-java")
//
// lazy val osgiDiningHakkersSampleMavenTest = Project(
// id = "akka-sample-osgi-dining-hakkers-maven-test",
// base = file("akka-samples/akka-sample-osgi-dining-hakkers-maven-test")
// )
// .settings(
// publishArtifact := false,
// // force publication of artifacts to local maven repo, so latest versions can be used when running maven tests
// compile in Compile <<=
// (publishM2 in actor, publishM2 in testkit, publishM2 in remote, publishM2 in cluster, publishM2 in osgi,
// publishM2 in slf4j, publishM2 in persistence, compile in Compile) map
// ((_, _, _, _, _, _, _, c) => c),
// test in Test ~= { x => {
// def executeMvnCommands(failureMessage: String, commands: String*) = {
// if ({List("sh", "-c", commands.mkString("cd akka-samples/akka-sample-osgi-dining-hakkers; mvn ", " ", "")) !} != 0)
// throw new Exception(failureMessage)
// }
// executeMvnCommands("Osgi sample Dining hakkers test failed", "clean", "install")
// }}
// )
// .disablePlugins(ValidatePullRequest, MimaPlugin)
// .settings(dontPublishSettings: _*)
val dontPublishSettings = Seq(
publishSigned := (),

View file

@ -118,9 +118,10 @@ object UnidocRoot extends AutoPlugin {
))
}
// FIXME temporary removal of samples in artery-dev branch
override lazy val projectSettings =
CliOptions.genjavadocEnabled.ifTrue(scalaJavaUnidocSettings).getOrElse(scalaUnidocSettings) ++
settings(Seq(AkkaBuild.samples), Seq(AkkaBuild.remoteTests, AkkaBuild.benchJmh, AkkaBuild.parsing, AkkaBuild.protobuf, AkkaBuild.osgiDiningHakkersSampleMavenTest, AkkaBuild.akkaScalaNightly))
settings(Seq(), Seq(AkkaBuild.remoteTests, AkkaBuild.benchJmh, AkkaBuild.parsing, AkkaBuild.protobuf))
}
/**

View file

@ -33,7 +33,8 @@ object Release {
val (state2, Seq(api, japi)) = extracted.runTask(unidoc in Compile, state1)
val (state3, docs) = extracted.runTask(generate in Sphinx, state2)
val (state4, _) = extracted.runTask(Dist.dist, state3)
val (state5, activatorDist) = extracted.runTask(ActivatorDist.activatorDist in LocalProject(AkkaBuild.samples.id), state4)
// FIXME temporary in artery-dev branch
// val (state5, activatorDist) = extracted.runTask(ActivatorDist.activatorDist in LocalProject(AkkaBuild.samples.id), state4)
IO.delete(release)
IO.createDirectory(release)
@ -47,10 +48,11 @@ object Release {
for (f <- (dist * "akka_*.zip").get)
IO.copyFile(f, release / "downloads" / f.name)
for (f <- (activatorDist * "*.zip").get)
IO.copyFile(f, release / "downloads" / f.name)
state5
// FIXME temporary in artery-dev branch
// for (f <- (activatorDist * "*.zip").get)
// IO.copyFile(f, release / "downloads" / f.name)
// state5
state4
}
def uploadReleaseCommand = Command.command("uploadRelease") { state =>