From 260ad313e250cc71eb9378ad577fc5a554f14013 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 11 Sep 2018 11:30:42 +0200 Subject: [PATCH] hardening of CoordinatedShutdown init, #25592 --- .../scala/akka/actor/ActorSystemSpec.scala | 20 +++++++++++++++++-- .../akka/actor/CoordinatedShutdownSpec.scala | 16 +++++++++++++++ .../akka/actor/CoordinatedShutdown.scala | 9 ++++++++- 3 files changed, 42 insertions(+), 3 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index cddbd3aede..e0952d3684 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -4,6 +4,7 @@ package akka.actor +import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{ ConcurrentLinkedQueue, RejectedExecutionException } import akka.actor.setup.ActorSystemSetup @@ -15,7 +16,6 @@ import akka.testkit.TestKit import akka.util.Helpers.ConfigOps import akka.util.{ Switch, Timeout } import com.typesafe.config.{ Config, ConfigFactory } - import scala.concurrent.duration._ import scala.concurrent.{ Await, ExecutionContext, Future } import scala.language.postfixOps @@ -236,7 +236,7 @@ class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSend } "throw RejectedExecutionException when shutdown" in { - val system2 = ActorSystem("AwaitTermination", AkkaSpec.testConf) + val system2 = ActorSystem("RejectedExecution-1", AkkaSpec.testConf) Await.ready(system2.terminate(), 10 seconds) intercept[RejectedExecutionException] { @@ -244,6 +244,22 @@ class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSend }.getMessage should ===("ActorSystem already terminated.") } + "throw RejectedExecutionException or run termination callback when shutting down" in { + val system2 = ActorSystem("RejectedExecution-2", AkkaSpec.testConf) + // using counter to detect double calls + val count = new AtomicInteger + + try { + system2.terminate() + system2.registerOnTermination { count.incrementAndGet() } + } catch { + case _: RejectedExecutionException ⇒ count.incrementAndGet() + } + + Await.ready(system2.whenTerminated, 10.seconds) + count.get() should ===(1) + } + "reliably create waves of actors" in { import system.dispatcher implicit val timeout = Timeout((20 seconds).dilated) diff --git a/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala index 2b91704538..77bf08672c 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala @@ -410,6 +410,22 @@ class CoordinatedShutdownSpec extends AkkaSpec(ConfigFactory.parseString( cancellable.cancel() } } + + "access extension after system termination" in new JvmHookTest { + lazy val systemName = s"CoordinatedShutdownSpec-terminated-${System.currentTimeMillis()}" + lazy val systemConfig = ConfigFactory.parseString( + """ + akka.coordinated-shutdown.run-by-jvm-shutdown-hook = on + akka.coordinated-shutdown.terminate-actor-system = on + """) + + def withSystemRunning(newSystem: ActorSystem): Unit = { + TestKit.shutdownActorSystem(newSystem) + CoordinatedShutdown(newSystem) + + } + } + } abstract class JvmHookTest { diff --git a/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala b/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala index 751722c690..e53857ea6b 100644 --- a/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala +++ b/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala @@ -161,7 +161,10 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi initPhaseActorSystemTerminate(system, conf, coord) initJvmHook(system, conf, coord) // Avoid leaking actor system references when system is terminated before JVM is #23384 - system.registerOnTermination { + // Catching RejectedExecutionException in case extension is accessed first time when + // system is already terminated, see #25592. The extension is eagerly loaded when ActorSystem + // is started but it might be a race between (failing?) startup and shutdown. + def cleanupActorSystemJvmHook(): Unit = { coord.actorSystemJvmHook match { case OptionVal.Some(cancellable) if !runningJvmHook && !cancellable.isCancelled ⇒ cancellable.cancel() @@ -169,6 +172,10 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi case _ ⇒ } } + try system.registerOnTermination(cleanupActorSystemJvmHook()) + catch { + case _: RejectedExecutionException ⇒ cleanupActorSystemJvmHook() + } coord }