From 3d3a3528bf37f07930cd275f3f629178b64039b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Fri, 2 Sep 2016 16:40:38 +0200 Subject: [PATCH] Eliminated JVM crashing race on transport stop (#21337) * Eliminated JVM crashing race on transport stop * Fail explicitly if started more than once --- .../akka/remote/artery/ArteryTransport.scala | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 3aa3c6bfd6..16f82393c2 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -6,10 +6,9 @@ package akka.remote.artery import java.io.File import java.net.InetSocketAddress import java.nio.channels.{ DatagramChannel, FileChannel } - import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.atomic.{ AtomicLong, AtomicReference } import scala.annotation.tailrec import scala.collection.JavaConverters._ @@ -20,7 +19,6 @@ import scala.util.Failure import scala.util.Success import scala.util.Try import scala.util.control.NonFatal - import akka.Done import akka.NotUsed import akka.actor._ @@ -276,7 +274,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R @volatile private[this] var materializer: Materializer = _ @volatile private[this] var controlSubject: ControlMessageSubject = _ @volatile private[this] var messageDispatcher: MessageDispatcher = _ - @volatile private[this] var mediaDriver: Option[MediaDriver] = None + private[this] val mediaDriver = new AtomicReference[Option[MediaDriver]](None) @volatile private[this] var aeron: Aeron = _ @volatile private[this] var aeronErrorLogTask: Cancellable = _ @@ -438,19 +436,24 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R log.debug("Started embedded media driver in directory [{}]", driver.aeronDirectoryName) topLevelFREvents.loFreq(Transport_MediaDriverStarted, driver.aeronDirectoryName().getBytes("US-ASCII")) Runtime.getRuntime.addShutdownHook(stopMediaDriverShutdownHook) - mediaDriver = Some(driver) + if (!mediaDriver.compareAndSet(None, Some(driver))) { + throw new IllegalStateException("media driver started more than once") + } } } - private def aeronDir: String = mediaDriver match { + private def aeronDir: String = mediaDriver.get match { case Some(driver) ⇒ driver.aeronDirectoryName case None ⇒ settings.Advanced.AeronDirectoryName } private def stopMediaDriver(): Unit = { - mediaDriver.foreach { driver ⇒ + // make sure we only close the driver once or we will crash the JVM + val maybeDriver = mediaDriver.getAndSet(None) + maybeDriver.foreach { driver ⇒ // this is only for embedded media driver driver.close() + try { if (settings.Advanced.DeleteAeronDirectory) { IoUtil.delete(new File(driver.aeronDirectoryName), false) @@ -461,8 +464,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R "Couldn't delete Aeron embedded media driver files in [{}] due to [{}]", driver.aeronDirectoryName, e.getMessage) } + Try(Runtime.getRuntime.removeShutdownHook(stopMediaDriverShutdownHook)) } - Try(Runtime.getRuntime.removeShutdownHook(stopMediaDriverShutdownHook)) } // TODO: Add FR events @@ -683,7 +686,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R topLevelFREvents.loFreq(Transport_AeronErrorLogTaskStopped, NoMetaData) } if (aeron != null) aeron.close() - if (mediaDriver.isDefined) { + if (mediaDriver.get.isDefined) { stopMediaDriver() topLevelFREvents.loFreq(Transport_MediaFileDeleted, NoMetaData) }