From f4b82ce62be9da96defb104a51cfa69e895bf019 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 6 Sep 2016 14:32:42 +0200 Subject: [PATCH] handle fatal Aeron exceptions, #20561 * handle ConductorServiceTimeoutException and DriverTimeoutException * shutdown things properly, in the right order, and without overwhelming the logs with exceptions --- .../akka/remote/artery/ArteryTransport.scala | 71 ++++++--- project/AkkaBuild.scala | 142 +++++++++--------- project/Doc.scala | 3 +- project/Release.scala | 12 +- 4 files changed, 130 insertions(+), 98 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index fb9957a401..41c7c74332 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -70,6 +70,8 @@ import org.agrona.IoUtil import org.agrona.concurrent.BackoffIdleStrategy import akka.stream.scaladsl.BroadcastHub import scala.util.control.NoStackTrace +import io.aeron.exceptions.DriverTimeoutException +import java.util.concurrent.atomic.AtomicBoolean /** * INTERNAL API @@ -245,21 +247,27 @@ private[remote] class FlushOnShutdown(done: Promise[Done], timeout: FiniteDurati override def preStart(): Unit = { // FIXME shall we also try to flush the ordinary message stream, not only control stream? val msg = ActorSystemTerminating(inboundContext.localAddress) - associations.foreach { a ⇒ a.send(msg, OptionVal.Some(self), OptionVal.None) } + try { + associations.foreach { a ⇒ a.send(msg, OptionVal.Some(self), OptionVal.None) } + } catch { + case NonFatal(e) ⇒ + // send may throw + done.tryFailure(e) + throw e + } } - override def postStop(): Unit = + override def postStop(): Unit = { timeoutTask.cancel() + done.trySuccess(Done) + } def receive = { case ActorSystemTerminatingAck(from) ⇒ remaining -= from - if (remaining.isEmpty) { - done.trySuccess(Done) + if (remaining.isEmpty) context.stop(self) - } case FlushOnShutdown.Timeout ⇒ - done.trySuccess(Done) context.stop(self) } } @@ -271,6 +279,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R extends RemoteTransport(_system, _provider) with InboundContext { import FlightRecorderEvents._ import ArteryTransport.ShutdownSignal + import ArteryTransport.AeronTerminated // these vars are initialized once in the start method @volatile private[this] var _localAddress: UniqueAddress = _ @@ -474,6 +483,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private def startAeron(): Unit = { val ctx = new Aeron.Context + ctx.driverTimeoutMs(settings.Advanced.DriverTimeout.toMillis) + ctx.availableImageHandler(new AvailableImageHandler { override def onAvailableImage(img: Image): Unit = { if (log.isDebugEnabled) @@ -487,17 +498,37 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R // FIXME we should call FragmentAssembler.freeSessionBuffer when image is unavailable } }) + ctx.errorHandler(new ErrorHandler { + private val fatalErrorOccured = new AtomicBoolean + override def onError(cause: Throwable): Unit = { cause match { - case e: ConductorServiceTimeoutException ⇒ - // Timeout between service calls - log.error(cause, s"Aeron ServiceTimeoutException, ${cause.getMessage}") - + case e: ConductorServiceTimeoutException ⇒ handleFatalError(e) + case e: DriverTimeoutException ⇒ handleFatalError(e) + case _: AeronTerminated ⇒ // already handled, via handleFatalError case _ ⇒ log.error(cause, s"Aeron error, ${cause.getMessage}") } } + + private def handleFatalError(cause: Throwable): Unit = { + if (fatalErrorOccured.compareAndSet(false, true)) { + if (!isShutdown) { + log.error(cause, "Fatal Aeron error {}. Have to terminate ActorSystem because it lost contact with the " + + "{} Aeron media driver. Possible configuration properties to mitigate the problem are " + + "'client-liveness-timeout' or 'driver-timeout'. {}", + Logging.simpleName(cause), + if (settings.Advanced.EmbeddedMediaDriver) "embedded" else "external", + cause.getMessage) + taskRunner.stop() + aeronErrorLogTask.cancel() + system.terminate() + throw new AeronTerminated(cause) + } + } else + throw new AeronTerminated(cause) + } }) ctx.aeronDirectoryName(aeronDir) @@ -713,10 +744,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R _ ← streamsCompleted } yield { topLevelFREvents.loFreq(Transport_KillSwitchPulled, NoMetaData) - if (taskRunner != null) { - taskRunner.stop() - topLevelFREvents.loFreq(Transport_Stopped, NoMetaData) - } + taskRunner.stop() + topLevelFREvents.loFreq(Transport_Stopped, NoMetaData) + if (aeronErrorLogTask != null) { aeronErrorLogTask.cancel() topLevelFREvents.loFreq(Transport_AeronErrorLogTaskStopped, NoMetaData) @@ -965,12 +995,11 @@ private[remote] object ArteryTransport { val Version = 0 - /** - * Internal API - * - * @return A port that is hopefully available - */ - private[remote] def autoSelectPort(hostname: String): Int = { + class AeronTerminated(e: Throwable) extends RuntimeException(e) + + object ShutdownSignal extends RuntimeException with NoStackTrace + + def autoSelectPort(hostname: String): Int = { val socket = DatagramChannel.open().socket() socket.bind(new InetSocketAddress(hostname, 0)) val port = socket.getLocalPort @@ -978,6 +1007,4 @@ private[remote] object ArteryTransport { port } - object ShutdownSignal extends RuntimeException with NoStackTrace - } diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index e5453761fb..220bdbdd06 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -82,7 +82,7 @@ object AkkaBuild extends Build { protobuf, remote, remoteTests, -// samples, +// samples, // FIXME temporary in artery-dev branch slf4j, stream, streamTestkit, @@ -98,13 +98,14 @@ object AkkaBuild extends Build { aggregate = aggregatedProjects ).settings(rootSettings: _*) - lazy val akkaScalaNightly = Project( - id = "akka-scala-nightly", - base = file("akka-scala-nightly"), - // remove dependencies that we have to build ourselves (Scala STM) - // samples don't work with dbuild right now - aggregate = aggregatedProjects diff List(agent, docs, samples) - ).disablePlugins(ValidatePullRequest, MimaPlugin) +// FIXME temporary in artery-dev branch +// lazy val akkaScalaNightly = Project( +// id = "akka-scala-nightly", +// base = file("akka-scala-nightly"), +// // remove dependencies that we have to build ourselves (Scala STM) +// // samples don't work with dbuild right now +// aggregate = aggregatedProjects diff List(agent, docs, samples) +// ).disablePlugins(ValidatePullRequest, MimaPlugin) lazy val actor = Project( id = "akka-actor", @@ -371,68 +372,69 @@ object AkkaBuild extends Build { lazy val samplesSettings = parentSettings ++ ActivatorDist.settings - lazy val samples = Project( - id = "akka-samples", - base = file("akka-samples"), - // FIXME osgiDiningHakkersSampleMavenTest temporarily removed from aggregate due to #16703 - aggregate = if (!Sample.CliOptions.aggregateSamples) Nil else - Seq(sampleCamelJava, sampleCamelScala, sampleClusterJava, sampleClusterScala, sampleFsmScala, sampleFsmJavaLambda, - sampleMainJava, sampleMainScala, sampleMainJavaLambda, sampleMultiNodeScala, - samplePersistenceJava, samplePersistenceScala, samplePersistenceJavaLambda, - sampleRemoteJava, sampleRemoteScala, sampleSupervisionJavaLambda, - sampleDistributedDataScala, sampleDistributedDataJava) - ) - .settings(samplesSettings: _*) - .disablePlugins(MimaPlugin) - - lazy val sampleCamelJava = Sample.project("akka-sample-camel-java") - lazy val sampleCamelScala = Sample.project("akka-sample-camel-scala") - - lazy val sampleClusterJava = Sample.project("akka-sample-cluster-java") - lazy val sampleClusterScala = Sample.project("akka-sample-cluster-scala") - - lazy val sampleFsmScala = Sample.project("akka-sample-fsm-scala") - lazy val sampleFsmJavaLambda = Sample.project("akka-sample-fsm-java-lambda") - - lazy val sampleMainJava = Sample.project("akka-sample-main-java") - lazy val sampleMainScala = Sample.project("akka-sample-main-scala") - lazy val sampleMainJavaLambda = Sample.project("akka-sample-main-java-lambda") - - lazy val sampleMultiNodeScala = Sample.project("akka-sample-multi-node-scala") - - lazy val samplePersistenceJava = Sample.project("akka-sample-persistence-java") - lazy val samplePersistenceScala = Sample.project("akka-sample-persistence-scala") - lazy val samplePersistenceJavaLambda = Sample.project("akka-sample-persistence-java-lambda") - - lazy val sampleRemoteJava = Sample.project("akka-sample-remote-java") - lazy val sampleRemoteScala = Sample.project("akka-sample-remote-scala") - - lazy val sampleSupervisionJavaLambda = Sample.project("akka-sample-supervision-java-lambda") - - lazy val sampleDistributedDataScala = Sample.project("akka-sample-distributed-data-scala") - lazy val sampleDistributedDataJava = Sample.project("akka-sample-distributed-data-java") - - lazy val osgiDiningHakkersSampleMavenTest = Project( - id = "akka-sample-osgi-dining-hakkers-maven-test", - base = file("akka-samples/akka-sample-osgi-dining-hakkers-maven-test") - ) - .settings( - publishArtifact := false, - // force publication of artifacts to local maven repo, so latest versions can be used when running maven tests - compile in Compile <<= - (publishM2 in actor, publishM2 in testkit, publishM2 in remote, publishM2 in cluster, publishM2 in osgi, - publishM2 in slf4j, publishM2 in persistence, compile in Compile) map - ((_, _, _, _, _, _, _, c) => c), - test in Test ~= { x => { - def executeMvnCommands(failureMessage: String, commands: String*) = { - if ({List("sh", "-c", commands.mkString("cd akka-samples/akka-sample-osgi-dining-hakkers; mvn ", " ", "")) !} != 0) - throw new Exception(failureMessage) - } - executeMvnCommands("Osgi sample Dining hakkers test failed", "clean", "install") - }} - ) - .disablePlugins(ValidatePullRequest, MimaPlugin) - .settings(dontPublishSettings: _*) +// FIXME temporary in artery-dev branch +// lazy val samples = Project( +// id = "akka-samples", +// base = file("akka-samples"), +// // FIXME osgiDiningHakkersSampleMavenTest temporarily removed from aggregate due to #16703 +// aggregate = if (!Sample.CliOptions.aggregateSamples) Nil else +// Seq(sampleCamelJava, sampleCamelScala, sampleClusterJava, sampleClusterScala, sampleFsmScala, sampleFsmJavaLambda, +// sampleMainJava, sampleMainScala, sampleMainJavaLambda, sampleMultiNodeScala, +// samplePersistenceJava, samplePersistenceScala, samplePersistenceJavaLambda, +// sampleRemoteJava, sampleRemoteScala, sampleSupervisionJavaLambda, +// sampleDistributedDataScala, sampleDistributedDataJava) +// ) +// .settings(samplesSettings: _*) +// .disablePlugins(MimaPlugin) +// +// lazy val sampleCamelJava = Sample.project("akka-sample-camel-java") +// lazy val sampleCamelScala = Sample.project("akka-sample-camel-scala") +// +// lazy val sampleClusterJava = Sample.project("akka-sample-cluster-java") +// lazy val sampleClusterScala = Sample.project("akka-sample-cluster-scala") +// +// lazy val sampleFsmScala = Sample.project("akka-sample-fsm-scala") +// lazy val sampleFsmJavaLambda = Sample.project("akka-sample-fsm-java-lambda") +// +// lazy val sampleMainJava = Sample.project("akka-sample-main-java") +// lazy val sampleMainScala = Sample.project("akka-sample-main-scala") +// lazy val sampleMainJavaLambda = Sample.project("akka-sample-main-java-lambda") +// +// lazy val sampleMultiNodeScala = Sample.project("akka-sample-multi-node-scala") +// +// lazy val samplePersistenceJava = Sample.project("akka-sample-persistence-java") +// lazy val samplePersistenceScala = Sample.project("akka-sample-persistence-scala") +// lazy val samplePersistenceJavaLambda = Sample.project("akka-sample-persistence-java-lambda") +// +// lazy val sampleRemoteJava = Sample.project("akka-sample-remote-java") +// lazy val sampleRemoteScala = Sample.project("akka-sample-remote-scala") +// +// lazy val sampleSupervisionJavaLambda = Sample.project("akka-sample-supervision-java-lambda") +// +// lazy val sampleDistributedDataScala = Sample.project("akka-sample-distributed-data-scala") +// lazy val sampleDistributedDataJava = Sample.project("akka-sample-distributed-data-java") +// +// lazy val osgiDiningHakkersSampleMavenTest = Project( +// id = "akka-sample-osgi-dining-hakkers-maven-test", +// base = file("akka-samples/akka-sample-osgi-dining-hakkers-maven-test") +// ) +// .settings( +// publishArtifact := false, +// // force publication of artifacts to local maven repo, so latest versions can be used when running maven tests +// compile in Compile <<= +// (publishM2 in actor, publishM2 in testkit, publishM2 in remote, publishM2 in cluster, publishM2 in osgi, +// publishM2 in slf4j, publishM2 in persistence, compile in Compile) map +// ((_, _, _, _, _, _, _, c) => c), +// test in Test ~= { x => { +// def executeMvnCommands(failureMessage: String, commands: String*) = { +// if ({List("sh", "-c", commands.mkString("cd akka-samples/akka-sample-osgi-dining-hakkers; mvn ", " ", "")) !} != 0) +// throw new Exception(failureMessage) +// } +// executeMvnCommands("Osgi sample Dining hakkers test failed", "clean", "install") +// }} +// ) +// .disablePlugins(ValidatePullRequest, MimaPlugin) +// .settings(dontPublishSettings: _*) val dontPublishSettings = Seq( publishSigned := (), diff --git a/project/Doc.scala b/project/Doc.scala index 2294bd2315..cb8b3b9b74 100644 --- a/project/Doc.scala +++ b/project/Doc.scala @@ -118,9 +118,10 @@ object UnidocRoot extends AutoPlugin { )) } + // FIXME temporary removal of samples in artery-dev branch override lazy val projectSettings = CliOptions.genjavadocEnabled.ifTrue(scalaJavaUnidocSettings).getOrElse(scalaUnidocSettings) ++ - settings(Seq(AkkaBuild.samples), Seq(AkkaBuild.remoteTests, AkkaBuild.benchJmh, AkkaBuild.parsing, AkkaBuild.protobuf, AkkaBuild.osgiDiningHakkersSampleMavenTest, AkkaBuild.akkaScalaNightly)) + settings(Seq(), Seq(AkkaBuild.remoteTests, AkkaBuild.benchJmh, AkkaBuild.parsing, AkkaBuild.protobuf)) } /** diff --git a/project/Release.scala b/project/Release.scala index 33c898faed..dd2d72830e 100644 --- a/project/Release.scala +++ b/project/Release.scala @@ -33,7 +33,8 @@ object Release { val (state2, Seq(api, japi)) = extracted.runTask(unidoc in Compile, state1) val (state3, docs) = extracted.runTask(generate in Sphinx, state2) val (state4, _) = extracted.runTask(Dist.dist, state3) - val (state5, activatorDist) = extracted.runTask(ActivatorDist.activatorDist in LocalProject(AkkaBuild.samples.id), state4) +// FIXME temporary in artery-dev branch +// val (state5, activatorDist) = extracted.runTask(ActivatorDist.activatorDist in LocalProject(AkkaBuild.samples.id), state4) IO.delete(release) IO.createDirectory(release) @@ -47,10 +48,11 @@ object Release { for (f <- (dist * "akka_*.zip").get) IO.copyFile(f, release / "downloads" / f.name) - for (f <- (activatorDist * "*.zip").get) - IO.copyFile(f, release / "downloads" / f.name) - - state5 +// FIXME temporary in artery-dev branch +// for (f <- (activatorDist * "*.zip").get) +// IO.copyFile(f, release / "downloads" / f.name) +// state5 + state4 } def uploadReleaseCommand = Command.command("uploadRelease") { state =>