Merge pull request #25593 from akka/wip-25592-terminated-patriknw

hardening of CoordinatedShutdown init, #25592
This commit is contained in:
Patrik Nordwall 2018-10-03 16:54:55 +02:00 committed by GitHub
commit 272cceb12b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 42 additions and 3 deletions

View file

@ -4,6 +4,7 @@
package akka.actor package akka.actor
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{ ConcurrentLinkedQueue, RejectedExecutionException } import java.util.concurrent.{ ConcurrentLinkedQueue, RejectedExecutionException }
import akka.actor.setup.ActorSystemSetup import akka.actor.setup.ActorSystemSetup
@ -15,7 +16,6 @@ import akka.testkit.TestKit
import akka.util.Helpers.ConfigOps import akka.util.Helpers.ConfigOps
import akka.util.{ Switch, Timeout } import akka.util.{ Switch, Timeout }
import com.typesafe.config.{ Config, ConfigFactory } import com.typesafe.config.{ Config, ConfigFactory }
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.{ Await, ExecutionContext, Future } import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.language.postfixOps import scala.language.postfixOps
@ -236,7 +236,7 @@ class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSend
} }
"throw RejectedExecutionException when shutdown" in { "throw RejectedExecutionException when shutdown" in {
val system2 = ActorSystem("AwaitTermination", AkkaSpec.testConf) val system2 = ActorSystem("RejectedExecution-1", AkkaSpec.testConf)
Await.ready(system2.terminate(), 10 seconds) Await.ready(system2.terminate(), 10 seconds)
intercept[RejectedExecutionException] { intercept[RejectedExecutionException] {
@ -244,6 +244,22 @@ class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSend
}.getMessage should ===("ActorSystem already terminated.") }.getMessage should ===("ActorSystem already terminated.")
} }
"throw RejectedExecutionException or run termination callback when shutting down" in {
val system2 = ActorSystem("RejectedExecution-2", AkkaSpec.testConf)
// using counter to detect double calls
val count = new AtomicInteger
try {
system2.terminate()
system2.registerOnTermination { count.incrementAndGet() }
} catch {
case _: RejectedExecutionException count.incrementAndGet()
}
Await.ready(system2.whenTerminated, 10.seconds)
count.get() should ===(1)
}
"reliably create waves of actors" in { "reliably create waves of actors" in {
import system.dispatcher import system.dispatcher
implicit val timeout = Timeout((20 seconds).dilated) implicit val timeout = Timeout((20 seconds).dilated)

View file

@ -410,6 +410,22 @@ class CoordinatedShutdownSpec extends AkkaSpec(ConfigFactory.parseString(
cancellable.cancel() cancellable.cancel()
} }
} }
"access extension after system termination" in new JvmHookTest {
lazy val systemName = s"CoordinatedShutdownSpec-terminated-${System.currentTimeMillis()}"
lazy val systemConfig = ConfigFactory.parseString(
"""
akka.coordinated-shutdown.run-by-jvm-shutdown-hook = on
akka.coordinated-shutdown.terminate-actor-system = on
""")
def withSystemRunning(newSystem: ActorSystem): Unit = {
TestKit.shutdownActorSystem(newSystem)
CoordinatedShutdown(newSystem)
}
}
} }
abstract class JvmHookTest { abstract class JvmHookTest {

View file

@ -161,7 +161,10 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi
initPhaseActorSystemTerminate(system, conf, coord) initPhaseActorSystemTerminate(system, conf, coord)
initJvmHook(system, conf, coord) initJvmHook(system, conf, coord)
// Avoid leaking actor system references when system is terminated before JVM is #23384 // Avoid leaking actor system references when system is terminated before JVM is #23384
system.registerOnTermination { // Catching RejectedExecutionException in case extension is accessed first time when
// system is already terminated, see #25592. The extension is eagerly loaded when ActorSystem
// is started but it might be a race between (failing?) startup and shutdown.
def cleanupActorSystemJvmHook(): Unit = {
coord.actorSystemJvmHook match { coord.actorSystemJvmHook match {
case OptionVal.Some(cancellable) if !runningJvmHook && !cancellable.isCancelled case OptionVal.Some(cancellable) if !runningJvmHook && !cancellable.isCancelled
cancellable.cancel() cancellable.cancel()
@ -169,6 +172,10 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi
case _ case _
} }
} }
try system.registerOnTermination(cleanupActorSystemJvmHook())
catch {
case _: RejectedExecutionException cleanupActorSystemJvmHook()
}
coord coord
} }