Run CoordinatedShutdown from ActorSystem.terminate #25213 (#26830)

This commit is contained in:
Patrik Nordwall 2019-05-03 10:47:47 +02:00 committed by GitHub
parent 81b1e2ef9b
commit 2bbf13f707
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 228 additions and 33 deletions

View file

@ -9,16 +9,18 @@ import java.util
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.Future import scala.concurrent.Future
import akka.Done import akka.Done
import akka.testkit.{ AkkaSpec, EventFilter, TestKit } import akka.testkit.{ AkkaSpec, EventFilter, TestKit }
import com.typesafe.config.{ Config, ConfigFactory } import com.typesafe.config.{ Config, ConfigFactory }
import akka.actor.CoordinatedShutdown.Phase import akka.actor.CoordinatedShutdown.Phase
import akka.actor.CoordinatedShutdown.UnknownReason import akka.actor.CoordinatedShutdown.UnknownReason
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.concurrent.Promise import scala.concurrent.Promise
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import akka.ConfigurationException
class CoordinatedShutdownSpec class CoordinatedShutdownSpec
extends AkkaSpec(ConfigFactory.parseString(""" extends AkkaSpec(ConfigFactory.parseString("""
akka.loglevel=INFO akka.loglevel=INFO
@ -320,11 +322,53 @@ class CoordinatedShutdownSpec
confWithOverrides.getInt("exit-code") should ===(-1) confWithOverrides.getInt("exit-code") should ===(-1)
} }
// this must be the last test, since it terminates the ActorSystem
"terminate ActorSystem" in { "terminate ActorSystem" in {
Await.result(CoordinatedShutdown(system).run(CustomReason), 10.seconds) should ===(Done) val sys = ActorSystem(system.name, system.settings.config)
system.whenTerminated.isCompleted should ===(true) try {
CoordinatedShutdown(system).shutdownReason() === (Some(CustomReason)) Await.result(CoordinatedShutdown(sys).run(CustomReason), 10.seconds) should ===(Done)
sys.whenTerminated.isCompleted should ===(true)
CoordinatedShutdown(sys).shutdownReason() should ===(Some(CustomReason))
} finally {
shutdown(sys)
}
}
"be run by ActorSystem.terminate" in {
val sys = ActorSystem(system.name, system.settings.config)
try {
Await.result(sys.terminate(), 10.seconds)
sys.whenTerminated.isCompleted should ===(true)
CoordinatedShutdown(sys).shutdownReason() should ===(Some(CoordinatedShutdown.ActorSystemTerminateReason))
} finally {
shutdown(sys)
}
}
"not be run by ActorSystem.terminate when run-by-actor-system-terminate=off" in {
val sys = ActorSystem(
system.name,
ConfigFactory
.parseString("akka.coordinated-shutdown.run-by-actor-system-terminate = off")
.withFallback(system.settings.config))
try {
Await.result(sys.terminate(), 10.seconds)
sys.whenTerminated.isCompleted should ===(true)
CoordinatedShutdown(sys).shutdownReason() should ===(None)
} finally {
shutdown(sys)
}
}
"not allow terminate-actor-system=off && run-by-actor-system-terminate=on" in {
intercept[ConfigurationException] {
val sys = ActorSystem(
system.name,
ConfigFactory
.parseString("akka.coordinated-shutdown.terminate-actor-system = off")
.withFallback(system.settings.config))
// will only get here if test is failing
shutdown(sys)
}
} }
"add and remove user JVM hooks with run-by-jvm-shutdown-hook = off, terminate-actor-system = off" in new JvmHookTest { "add and remove user JVM hooks with run-by-jvm-shutdown-hook = off, terminate-actor-system = off" in new JvmHookTest {
@ -332,6 +376,7 @@ class CoordinatedShutdownSpec
lazy val systemConfig = ConfigFactory.parseString(""" lazy val systemConfig = ConfigFactory.parseString("""
akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off
akka.coordinated-shutdown.terminate-actor-system = off akka.coordinated-shutdown.terminate-actor-system = off
akka.coordinated-shutdown.run-by-actor-system-terminate = off
""") """)
override def withSystemRunning(newSystem: ActorSystem): Unit = { override def withSystemRunning(newSystem: ActorSystem): Unit = {
@ -347,6 +392,7 @@ class CoordinatedShutdownSpec
lazy val systemConfig = ConfigFactory.parseString(""" lazy val systemConfig = ConfigFactory.parseString("""
akka.coordinated-shutdown.run-by-jvm-shutdown-hook = on akka.coordinated-shutdown.run-by-jvm-shutdown-hook = on
akka.coordinated-shutdown.terminate-actor-system = off akka.coordinated-shutdown.terminate-actor-system = off
akka.coordinated-shutdown.run-by-actor-system-terminate = off
""") """)
override def withSystemRunning(newSystem: ActorSystem): Unit = { override def withSystemRunning(newSystem: ActorSystem): Unit = {

View file

@ -67,6 +67,12 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin
getBoolean("akka.log-dead-letters-during-shutdown") should ===(true) getBoolean("akka.log-dead-letters-during-shutdown") should ===(true)
settings.LogDeadLettersDuringShutdown should ===(true) settings.LogDeadLettersDuringShutdown should ===(true)
getBoolean("akka.coordinated-shutdown.terminate-actor-system") should ===(true)
settings.CoordinatedShutdownTerminateActorSystem should ===(true)
getBoolean("akka.coordinated-shutdown.run-by-actor-system-terminate") should ===(true)
settings.CoordinatedShutdownRunByActorSystemTerminate should ===(true)
} }
{ {

View file

@ -11,9 +11,11 @@ import scala.concurrent.duration._
import scala.util.control.NonFatal import scala.util.control.NonFatal
import akka.Done import akka.Done
import akka.actor.CoordinatedShutdown
import akka.actor.InvalidMessageException import akka.actor.InvalidMessageException
import akka.actor.testkit.typed.scaladsl.TestInbox import akka.actor.testkit.typed.scaladsl.TestInbox
import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import org.scalatest._ import org.scalatest._
import org.scalatest.concurrent.Eventually import org.scalatest.concurrent.Eventually
import org.scalatest.concurrent.ScalaFutures import org.scalatest.concurrent.ScalaFutures
@ -55,6 +57,8 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with
} }
inbox.receiveAll() should ===("hello" :: Nil) inbox.receiveAll() should ===("hello" :: Nil)
sys.whenTerminated.futureValue sys.whenTerminated.futureValue
CoordinatedShutdown(sys.toUntyped).shutdownReason() should ===(
Some(CoordinatedShutdown.ActorSystemTerminateReason))
} }
} }
@ -89,6 +93,8 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with
// now we know that the guardian has started, and should receive PostStop // now we know that the guardian has started, and should receive PostStop
sys.terminate() sys.terminate()
sys.whenTerminated.futureValue sys.whenTerminated.futureValue
CoordinatedShutdown(sys.toUntyped).shutdownReason() should ===(
Some(CoordinatedShutdown.ActorSystemTerminateReason))
inbox.receiveAll() should ===("done" :: Nil) inbox.receiveAll() should ===("done" :: Nil)
} }

View file

@ -104,8 +104,15 @@ abstract class ActorSystem[-T] extends ActorRef[T] with Extensions { this: Inter
implicit def executionContext: ExecutionContextExecutor implicit def executionContext: ExecutionContextExecutor
/** /**
* Terminates this actor system. This will stop the guardian actor, which in turn * Terminates this actor system by running [[akka.actor.CoordinatedShutdown]] with reason
* will recursively stop all its child actors, then the system guardian * [[akka.actor.CoordinatedShutdown.ActorSystemTerminateReason]].
*
* If `akka.coordinated-shutdown.run-by-actor-system-terminate` is configured to `off`
* it will not run `CoordinatedShutdown`, but the `ActorSystem` and its actors
* will still be terminated.
*
* This will stop the guardian actor, which in turn
* will recursively stop all its child actors, and finally the system guardian
* (below which the logging actors reside). * (below which the logging actors reside).
* *
* This is an asynchronous operation and completion of the termination can * This is an asynchronous operation and completion of the termination can

View file

@ -5,6 +5,9 @@
package akka.actor.typed.internal.adapter package akka.actor.typed.internal.adapter
import akka.actor.typed.Behavior import akka.actor.typed.Behavior
import akka.actor.typed.BehaviorInterceptor
import akka.actor.typed.Signal
import akka.actor.typed.TypedActorContext
import akka.actor.typed.scaladsl.AbstractBehavior import akka.actor.typed.scaladsl.AbstractBehavior
import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors
@ -37,10 +40,50 @@ private[akka] final class GuardianStartupBehavior[T](val guardianBehavior: Behav
msg match { msg match {
case Start => case Start =>
// ctx is not available initially so we cannot use it until here // ctx is not available initially so we cannot use it until here
Behaviors.setup(ctx => stash.unstashAll(ctx.asInstanceOf[ActorContext[T]], guardianBehavior).unsafeCast[Any]) Behaviors.setup(
ctx =>
stash
.unstashAll(
ctx.asInstanceOf[ActorContext[T]],
Behaviors.intercept(new GuardianStopInterceptor[T])(guardianBehavior))
.unsafeCast[Any])
case other => case other =>
stash.stash(other.asInstanceOf[T]) stash.stash(other.asInstanceOf[T])
this this
} }
} }
/**
* INTERNAL API
*
* When the user guardian is stopped the ActorSystem is terminated, but to run CoordinatedShutdown
* as part of that we must intercept when the guardian is stopped and call ActorSystem.terminate()
* explicitly.
*/
@InternalApi private[akka] final class GuardianStopInterceptor[T] extends BehaviorInterceptor[T, T] {
override def aroundReceive(
ctx: TypedActorContext[T],
msg: T,
target: BehaviorInterceptor.ReceiveTarget[T]): Behavior[T] = {
val next = target(ctx, msg)
interceptStopped(ctx, next)
}
override def aroundSignal(
ctx: TypedActorContext[T],
signal: Signal,
target: BehaviorInterceptor.SignalTarget[T]): Behavior[T] = {
val next = target(ctx, signal)
interceptStopped(ctx, next)
}
private def interceptStopped(ctx: TypedActorContext[T], next: Behavior[T]): Behavior[T] = {
if (Behavior.isAlive(next))
next
else {
ctx.asScala.system.terminate()
Behaviors.ignore
}
}
}

View file

@ -7,6 +7,9 @@ ProblemFilters.exclude[MissingClassProblem]("akka.actor.ActorDSL")
ProblemFilters.exclude[MissingClassProblem]("akka.actor.ActorDSL$*") ProblemFilters.exclude[MissingClassProblem]("akka.actor.ActorDSL$*")
ProblemFilters.exclude[MissingClassProblem]("akka.actor.dsl.*") ProblemFilters.exclude[MissingClassProblem]("akka.actor.dsl.*")
# #25213 CoordinatedShutdown from ActorSystem.terminate
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.ExtendedActorSystem.finalTerminate")
# #26190 remove actorFor # #26190 remove actorFor
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.ActorCell.actorFor") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.ActorCell.actorFor")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.ActorRefProvider.actorFor") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.ActorRefProvider.actorFor")

View file

@ -1114,6 +1114,11 @@ akka {
# This property is related to `akka.jvm-shutdown-hooks` above. # This property is related to `akka.jvm-shutdown-hooks` above.
run-by-jvm-shutdown-hook = on run-by-jvm-shutdown-hook = on
# Run the coordinated shutdown when ActorSystem.terminate is called.
# Enabling this and disabling terminate-actor-system is not a supported
# combination (will throw ConfigurationException at startup).
run-by-actor-system-terminate = on
# When Coordinated Shutdown is triggered an instance of `Reason` is # When Coordinated Shutdown is triggered an instance of `Reason` is
# required. That value can be used to override the default settings. # required. That value can be used to override the default settings.
# Only 'exit-jvm', 'exit-code' and 'terminate-actor-system' may be # Only 'exit-jvm', 'exit-code' and 'terminate-actor-system' may be

View file

@ -9,13 +9,13 @@ import java.util.concurrent._
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import com.typesafe.config.{ Config, ConfigFactory } import com.typesafe.config.{ Config, ConfigFactory }
import akka.ConfigurationException
import akka.event._ import akka.event._
import akka.dispatch._ import akka.dispatch._
import akka.japi.Util.immutableSeq import akka.japi.Util.immutableSeq
import akka.actor.dungeon.ChildrenContainer import akka.actor.dungeon.ChildrenContainer
import akka.util._ import akka.util._
import akka.util.Helpers.toRootLowerCase import akka.util.Helpers.toRootLowerCase
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor, Future, Promise } import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor, Future, Promise }
@ -25,7 +25,6 @@ import java.util.Optional
import akka.actor.setup.{ ActorSystemSetup, Setup } import akka.actor.setup.{ ActorSystemSetup, Setup }
import akka.annotation.InternalApi import akka.annotation.InternalApi
import scala.compat.java8.FutureConverters import scala.compat.java8.FutureConverters
import scala.compat.java8.OptionConverters._ import scala.compat.java8.OptionConverters._
@ -387,6 +386,15 @@ object ActorSystem {
final val JvmExitOnFatalError: Boolean = getBoolean("akka.jvm-exit-on-fatal-error") final val JvmExitOnFatalError: Boolean = getBoolean("akka.jvm-exit-on-fatal-error")
final val JvmShutdownHooks: Boolean = getBoolean("akka.jvm-shutdown-hooks") final val JvmShutdownHooks: Boolean = getBoolean("akka.jvm-shutdown-hooks")
final val CoordinatedShutdownTerminateActorSystem: Boolean = getBoolean(
"akka.coordinated-shutdown.terminate-actor-system")
final val CoordinatedShutdownRunByActorSystemTerminate: Boolean = getBoolean(
"akka.coordinated-shutdown.run-by-actor-system-terminate")
if (CoordinatedShutdownRunByActorSystemTerminate && !CoordinatedShutdownTerminateActorSystem)
throw new ConfigurationException(
"akka.coordinated-shutdown.run-by-actor-system-terminate=on and " +
"akka.coordinated-shutdown.terminate-actor-system=off is not a supported configuration combination.")
final val DefaultVirtualNodesFactor: Int = getInt("akka.actor.deployment.default.virtual-nodes-factor") final val DefaultVirtualNodesFactor: Int = getInt("akka.actor.deployment.default.virtual-nodes-factor")
if (ConfigVersion != Version) if (ConfigVersion != Version)
@ -571,12 +579,19 @@ abstract class ActorSystem extends ActorRefFactory {
def registerOnTermination(code: Runnable): Unit def registerOnTermination(code: Runnable): Unit
/** /**
* Terminates this actor system. This will stop the guardian actor, which in turn * Terminates this actor system by running [[CoordinatedShutdown]] with reason
* will recursively stop all its child actors, the system guardian * [[CoordinatedShutdown.ActorSystemTerminateReason]].
*
* If `akka.coordinated-shutdown.run-by-actor-system-terminate` is configured to `off`
* it will not run `CoordinatedShutdown`, but the `ActorSystem` and its actors
* will still be terminated.
*
* This will stop the guardian actor, which in turn
* will recursively stop all its child actors, and finally the system guardian
* (below which the logging actors reside) and then execute all registered * (below which the logging actors reside) and then execute all registered
* termination handlers (see [[ActorSystem#registerOnTermination]]). * termination handlers (see [[ActorSystem#registerOnTermination]]).
* Be careful to not schedule any operations on completion of the returned future * Be careful to not schedule any operations on completion of the returned future
* using the `dispatcher` of this actor system as it will have been shut down before the * using the dispatcher of this actor system as it will have been shut down before the
* future completes. * future completes.
*/ */
def terminate(): Future[Terminated] def terminate(): Future[Terminated]
@ -682,6 +697,11 @@ abstract class ExtendedActorSystem extends ActorSystem {
*/ */
private[akka] def printTree: String private[akka] def printTree: String
/**
* INTERNAL API: final step of `terminate()`
*/
@InternalApi private[akka] def finalTerminate(): Unit
} }
/** /**
@ -936,9 +956,25 @@ private[akka] class ActorSystemImpl(
def registerOnTermination(code: Runnable): Unit = { terminationCallbacks.add(code) } def registerOnTermination(code: Runnable): Unit = { terminationCallbacks.add(code) }
override def terminate(): Future[Terminated] = { override def terminate(): Future[Terminated] = {
if (settings.CoordinatedShutdownRunByActorSystemTerminate && !aborting) {
// Note that the combination CoordinatedShutdownRunByActorSystemTerminate==true &&
// CoordinatedShutdownTerminateActorSystem==false is disallowed, checked in Settings.
// It's not a combination that is valuable to support and it would be complicated to
// protect against concurrency race conditions between calls to ActorSystem.terminate()
// and CoordinateShutdown.run()
// it will call finalTerminate() at the end
CoordinatedShutdown(this).run(CoordinatedShutdown.ActorSystemTerminateReason)
} else {
finalTerminate()
}
whenTerminated
}
override private[akka] def finalTerminate(): Unit = {
// these actions are idempotent
if (!settings.LogDeadLettersDuringShutdown) logDeadLetterListener.foreach(stop) if (!settings.LogDeadLettersDuringShutdown) logDeadLetterListener.foreach(stop)
guardian.stop() guardian.stop()
whenTerminated
} }
@volatile var aborting = false @volatile var aborting = false
@ -946,8 +982,8 @@ private[akka] class ActorSystemImpl(
/** /**
* This kind of shutdown attempts to bring the system down and release its * This kind of shutdown attempts to bring the system down and release its
* resources more forcefully than plain shutdown. For example it will not * resources more forcefully than plain shutdown. For example it will not
* wait for remote-deployed child actors to terminate before terminating their * run CoordinatedShutdown and not wait for remote-deployed child actors to
* parents. * terminate before terminating their parents.
*/ */
def abort(): Unit = { def abort(): Unit = {
aborting = true aborting = true

View file

@ -120,10 +120,20 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi
*/ */
def unknownReason: Reason = UnknownReason def unknownReason: Reason = UnknownReason
/**
* Scala API: The shutdown was initiated by ActorSystem.terminate.
*/
case object ActorSystemTerminateReason extends Reason
/**
* Java API: The shutdown was initiated by ActorSystem.terminate.
*/
def actorSystemTerminateReason: Reason = ActorSystemTerminateReason
/** /**
* Scala API: The shutdown was initiated by a JVM shutdown hook, e.g. triggered by SIGTERM. * Scala API: The shutdown was initiated by a JVM shutdown hook, e.g. triggered by SIGTERM.
*/ */
object JvmExitReason extends Reason case object JvmExitReason extends Reason
/** /**
* Java API: The shutdown was initiated by a JVM shutdown hook, e.g. triggered by SIGTERM. * Java API: The shutdown was initiated by a JVM shutdown hook, e.g. triggered by SIGTERM.
@ -133,7 +143,7 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi
/** /**
* Scala API: The shutdown was initiated by Cluster downing. * Scala API: The shutdown was initiated by Cluster downing.
*/ */
object ClusterDowningReason extends Reason case object ClusterDowningReason extends Reason
/** /**
* Java API: The shutdown was initiated by Cluster downing. * Java API: The shutdown was initiated by Cluster downing.
@ -143,7 +153,7 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi
/** /**
* Scala API: The shutdown was initiated by a failure to join a seed node. * Scala API: The shutdown was initiated by a failure to join a seed node.
*/ */
object ClusterJoinUnsuccessfulReason extends Reason case object ClusterJoinUnsuccessfulReason extends Reason
/** /**
* Java API: The shutdown was initiated by a failure to join a seed node. * Java API: The shutdown was initiated by a failure to join a seed node.
@ -163,7 +173,7 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi
/** /**
* Scala API: The shutdown was initiated by Cluster leaving. * Scala API: The shutdown was initiated by Cluster leaving.
*/ */
object ClusterLeavingReason extends Reason case object ClusterLeavingReason extends Reason
/** /**
* Java API: The shutdown was initiated by Cluster leaving. * Java API: The shutdown was initiated by Cluster leaving.
@ -211,7 +221,10 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi
.getOrElse(conf) .getOrElse(conf)
} }
private def initPhaseActorSystemTerminate(system: ActorSystem, conf: Config, coord: CoordinatedShutdown): Unit = { private def initPhaseActorSystemTerminate(
system: ExtendedActorSystem,
conf: Config,
coord: CoordinatedShutdown): Unit = {
coord.addTask(PhaseActorSystemTerminate, "terminate-system") { () => coord.addTask(PhaseActorSystemTerminate, "terminate-system") { () =>
val confForReason = confWithOverrides(conf, coord.shutdownReason()) val confForReason = confWithOverrides(conf, coord.shutdownReason())
val terminateActorSystem = confForReason.getBoolean("terminate-actor-system") val terminateActorSystem = confForReason.getBoolean("terminate-actor-system")
@ -235,9 +248,8 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi
} }
if (terminateActorSystem) { if (terminateActorSystem) {
system system.finalTerminate()
.terminate() system.whenTerminated.map { _ =>
.map { _ =>
if (exitJvm && !runningJvmHook) System.exit(exitCode) if (exitJvm && !runningJvmHook) System.exit(exitCode)
Done Done
}(ExecutionContexts.sameThreadExecutionContext) }(ExecutionContexts.sameThreadExecutionContext)
@ -456,6 +468,7 @@ final class CoordinatedShutdown private[akka] (
if (runStarted.compareAndSet(None, Some(reason))) { if (runStarted.compareAndSet(None, Some(reason))) {
implicit val ec = system.dispatchers.internalDispatcher implicit val ec = system.dispatchers.internalDispatcher
val debugEnabled = log.isDebugEnabled val debugEnabled = log.isDebugEnabled
log.debug("Running CoordinatedShutdown with reason [{}]", reason)
def loop(remainingPhases: List[String]): Future[Done] = { def loop(remainingPhases: List[String]): Future[Done] = {
remainingPhases match { remainingPhases match {
case Nil => Future.successful(Done) case Nil => Future.successful(Done)
@ -602,7 +615,7 @@ final class CoordinatedShutdown private[akka] (
* shutdown hooks the standard library JVM shutdown hooks APIs are better suited. * shutdown hooks the standard library JVM shutdown hooks APIs are better suited.
*/ */
@tailrec def addCancellableJvmShutdownHook[T](hook: => T): Cancellable = { @tailrec def addCancellableJvmShutdownHook[T](hook: => T): Cancellable = {
if (runStarted.get == None) { if (runStarted.get.isEmpty) {
val currentLatch = _jvmHooksLatch.get val currentLatch = _jvmHooksLatch.get
val newLatch = new CountDownLatch(currentLatch.getCount.toInt + 1) val newLatch = new CountDownLatch(currentLatch.getCount.toInt + 1)
if (_jvmHooksLatch.compareAndSet(currentLatch, newLatch)) { if (_jvmHooksLatch.compareAndSet(currentLatch, newLatch)) {

View file

@ -47,6 +47,7 @@ object ClusterShardingSpec {
akka.cluster.sharding.number-of-shards = 10 akka.cluster.sharding.number-of-shards = 10
akka.coordinated-shutdown.terminate-actor-system = off akka.coordinated-shutdown.terminate-actor-system = off
akka.coordinated-shutdown.run-by-actor-system-terminate = off
akka.actor { akka.actor {
serialize-messages = off serialize-messages = off

View file

@ -26,6 +26,7 @@ public class ClusterApiTest extends JUnitSuite {
+ "akka.remote.artery.canonical.hostname = 127.0.0.1 \n" + "akka.remote.artery.canonical.hostname = 127.0.0.1 \n"
+ "akka.cluster.jmx.multi-mbeans-in-same-jvm = on \n" + "akka.cluster.jmx.multi-mbeans-in-same-jvm = on \n"
+ "akka.coordinated-shutdown.terminate-actor-system = off \n" + "akka.coordinated-shutdown.terminate-actor-system = off \n"
+ "akka.coordinated-shutdown.run-by-actor-system-terminate = off \n"
+ "akka.actor { \n" + "akka.actor { \n"
+ " serialize-messages = off \n" + " serialize-messages = off \n"
+ " allow-java-serialization = off \n" + " allow-java-serialization = off \n"

View file

@ -10,21 +10,25 @@ import scala.concurrent.duration._
import scala.util.control.NonFatal import scala.util.control.NonFatal
import akka.Done import akka.Done
import akka.actor.CoordinatedShutdown
import akka.actor.InvalidMessageException import akka.actor.InvalidMessageException
import akka.actor.testkit.typed.scaladsl.TestInbox import akka.actor.testkit.typed.scaladsl.TestInbox
import akka.actor.typed.ActorRef import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior import akka.actor.typed.Behavior
import akka.actor.typed.PostStop import akka.actor.typed.PostStop
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import org.scalatest._ import org.scalatest._
import org.scalatest.concurrent.Eventually import org.scalatest.concurrent.Eventually
import org.scalatest.concurrent.ScalaFutures import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.Span
class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with ScalaFutures with Eventually { class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with ScalaFutures with Eventually {
override implicit val patienceConfig = PatienceConfig(1.second) implicit val patience: PatienceConfig = PatienceConfig(3.seconds, Span(100, org.scalatest.time.Millis))
val config = ConfigFactory.parseString(""" val config = ConfigFactory.parseString("""
akka.actor.provider = cluster akka.actor.provider = cluster
akka.remote.classic.netty.tcp.port = 0 akka.remote.classic.netty.tcp.port = 0
@ -65,6 +69,8 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with
} }
inbox.receiveAll() should ===("hello" :: Nil) inbox.receiveAll() should ===("hello" :: Nil)
sys.whenTerminated.futureValue sys.whenTerminated.futureValue
CoordinatedShutdown(sys.toUntyped).shutdownReason() should ===(
Some(CoordinatedShutdown.ActorSystemTerminateReason))
} }
} }
@ -99,6 +105,8 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with
// now we know that the guardian has started, and should receive PostStop // now we know that the guardian has started, and should receive PostStop
sys.terminate() sys.terminate()
sys.whenTerminated.futureValue sys.whenTerminated.futureValue
CoordinatedShutdown(sys.toUntyped).shutdownReason() should ===(
Some(CoordinatedShutdown.ActorSystemTerminateReason))
inbox.receiveAll() should ===("done" :: Nil) inbox.receiveAll() should ===("done" :: Nil)
} }

View file

@ -23,6 +23,7 @@ object ClusterApiSpec {
akka.remote.artery.canonical.hostname = 127.0.0.1 akka.remote.artery.canonical.hostname = 127.0.0.1
akka.cluster.jmx.multi-mbeans-in-same-jvm = on akka.cluster.jmx.multi-mbeans-in-same-jvm = on
akka.coordinated-shutdown.terminate-actor-system = off akka.coordinated-shutdown.terminate-actor-system = off
akka.coordinated-shutdown.run-by-actor-system-terminate = off
akka.actor { akka.actor {
serialize-messages = off serialize-messages = off
allow-java-serialization = off allow-java-serialization = off

View file

@ -20,6 +20,7 @@ object ClusterSingletonPersistenceSpec {
akka.remote.artery.canonical.hostname = 127.0.0.1 akka.remote.artery.canonical.hostname = 127.0.0.1
akka.coordinated-shutdown.terminate-actor-system = off akka.coordinated-shutdown.terminate-actor-system = off
akka.coordinated-shutdown.run-by-actor-system-terminate = off
akka.actor { akka.actor {
serialize-messages = off serialize-messages = off

View file

@ -14,7 +14,6 @@ object InitialMembersOfNewDcSpec extends MultiNodeConfig {
commonConfig(ConfigFactory.parseString(s""" commonConfig(ConfigFactory.parseString(s"""
akka.actor.provider = cluster akka.actor.provider = cluster
akka.actor.warn-about-java-serializer-usage = off akka.actor.warn-about-java-serializer-usage = off
akka.coordinated-shutdown.terminate-actor-system = off
akka.cluster { akka.cluster {
jmx.enabled = off jmx.enabled = off
debug.verbose-gossip-logging = on debug.verbose-gossip-logging = on

View file

@ -1095,6 +1095,8 @@ To enable a hard `System.exit` as a final action you can configure:
akka.coordinated-shutdown.exit-jvm = on akka.coordinated-shutdown.exit-jvm = on
``` ```
The coordinated shutdown process can also be started by calling `ActorSystem.terminate()`.
When using @ref:[Akka Cluster](cluster-usage.md) the `CoordinatedShutdown` will automatically run When using @ref:[Akka Cluster](cluster-usage.md) the `CoordinatedShutdown` will automatically run
when the cluster node sees itself as `Exiting`, i.e. leaving from another node will trigger when the cluster node sees itself as `Exiting`, i.e. leaving from another node will trigger
the shutdown process on the leaving node. Tasks for graceful leaving of cluster including graceful the shutdown process on the leaving node. Tasks for graceful leaving of cluster including graceful
@ -1125,6 +1127,7 @@ used in the test:
``` ```
# Don't terminate ActorSystem via CoordinatedShutdown in tests # Don't terminate ActorSystem via CoordinatedShutdown in tests
akka.coordinated-shutdown.terminate-actor-system = off akka.coordinated-shutdown.terminate-actor-system = off
akka.coordinated-shutdown.run-by-actor-system-terminate = off
akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off
akka.cluster.run-coordinated-shutdown-when-down = off akka.cluster.run-coordinated-shutdown-when-down = off
``` ```

View file

@ -115,9 +115,9 @@ while Akka does the heavy lifting under the hood.
## Terminating ActorSystem ## Terminating ActorSystem
When you know everything is done for your application, you can call the When you know everything is done for your application, you can call the
`terminate` method of `ActorSystem`. That will stop the guardian `terminate` method of `ActorSystem`. That will run @ref:[`CoordinatedShutdown`](../actors.md#coordinated-shutdown)
actor, which in turn will recursively stop all its child actors, the system followed by stopping the guardian actor, which in turn will recursively stop all its child actors,
guardian. and finally the system guardian.
If you want to execute some operations while terminating `ActorSystem`, If you want to execute some operations while terminating `ActorSystem`,
look at @ref:[`CoordinatedShutdown`](../actors.md#coordinated-shutdown). look at @ref:[`CoordinatedShutdown`](../actors.md#coordinated-shutdown).

View file

@ -153,3 +153,18 @@ The materialized value for `StreamRefs.sinkRef` and `StreamRefs.sourceRef` is no
The configuration `akka.cluster.sharding.passivate-idle-entity-after` is now enabled by default. The configuration `akka.cluster.sharding.passivate-idle-entity-after` is now enabled by default.
Sharding will passivate entities when they have not received any messages after this duration. Sharding will passivate entities when they have not received any messages after this duration.
Set Set
## CoordinatedShutdown is run from ActorSystem.terminate
No migration is needed but it is mentioned here because it is a change in behavior.
When `ActorSystem.terminate()` is called, @ref:[`CoordinatedShutdown`](../actors.md#coordinated-shutdown)
will be run in Akka 2.6.x, which wasn't the case in 2.5.x. For example, if using Akka Cluster this means that
member will attempt to leave the cluster gracefully.
If this is not desired behavior, for example in tests, you can disable this feature with the following configuration
and then it will behave as in Akka 2.5.x:
```
akka.coordinated-shutdown.run-by-actor-system-terminate = off
```

View file

@ -225,6 +225,7 @@ object MultiNodeSpec {
loglevel = "WARNING" loglevel = "WARNING"
stdout-loglevel = "WARNING" stdout-loglevel = "WARNING"
coordinated-shutdown.terminate-actor-system = off coordinated-shutdown.terminate-actor-system = off
coordinated-shutdown.run-by-actor-system-terminate = off
coordinated-shutdown.run-by-jvm-shutdown-hook = off coordinated-shutdown.run-by-jvm-shutdown-hook = off
actor { actor {
default-dispatcher { default-dispatcher {