diff --git a/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala index df67112230..7c3f3a341f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala @@ -9,16 +9,18 @@ import java.util import scala.concurrent.duration._ import scala.concurrent.Await import scala.concurrent.Future + import akka.Done import akka.testkit.{ AkkaSpec, EventFilter, TestKit } import com.typesafe.config.{ Config, ConfigFactory } import akka.actor.CoordinatedShutdown.Phase import akka.actor.CoordinatedShutdown.UnknownReason - import scala.collection.JavaConverters._ import scala.concurrent.Promise import java.util.concurrent.TimeoutException +import akka.ConfigurationException + class CoordinatedShutdownSpec extends AkkaSpec(ConfigFactory.parseString(""" akka.loglevel=INFO @@ -320,11 +322,53 @@ class CoordinatedShutdownSpec confWithOverrides.getInt("exit-code") should ===(-1) } - // this must be the last test, since it terminates the ActorSystem "terminate ActorSystem" in { - Await.result(CoordinatedShutdown(system).run(CustomReason), 10.seconds) should ===(Done) - system.whenTerminated.isCompleted should ===(true) - CoordinatedShutdown(system).shutdownReason() === (Some(CustomReason)) + val sys = ActorSystem(system.name, system.settings.config) + try { + Await.result(CoordinatedShutdown(sys).run(CustomReason), 10.seconds) should ===(Done) + sys.whenTerminated.isCompleted should ===(true) + CoordinatedShutdown(sys).shutdownReason() should ===(Some(CustomReason)) + } finally { + shutdown(sys) + } + } + + "be run by ActorSystem.terminate" in { + val sys = ActorSystem(system.name, system.settings.config) + try { + Await.result(sys.terminate(), 10.seconds) + sys.whenTerminated.isCompleted should ===(true) + CoordinatedShutdown(sys).shutdownReason() should ===(Some(CoordinatedShutdown.ActorSystemTerminateReason)) + } finally { + shutdown(sys) + } + } + + "not be run by ActorSystem.terminate when run-by-actor-system-terminate=off" in { + val sys = ActorSystem( + system.name, + ConfigFactory + .parseString("akka.coordinated-shutdown.run-by-actor-system-terminate = off") + .withFallback(system.settings.config)) + try { + Await.result(sys.terminate(), 10.seconds) + sys.whenTerminated.isCompleted should ===(true) + CoordinatedShutdown(sys).shutdownReason() should ===(None) + } finally { + shutdown(sys) + } + } + + "not allow terminate-actor-system=off && run-by-actor-system-terminate=on" in { + intercept[ConfigurationException] { + val sys = ActorSystem( + system.name, + ConfigFactory + .parseString("akka.coordinated-shutdown.terminate-actor-system = off") + .withFallback(system.settings.config)) + // will only get here if test is failing + shutdown(sys) + } } "add and remove user JVM hooks with run-by-jvm-shutdown-hook = off, terminate-actor-system = off" in new JvmHookTest { @@ -332,6 +376,7 @@ class CoordinatedShutdownSpec lazy val systemConfig = ConfigFactory.parseString(""" akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off akka.coordinated-shutdown.terminate-actor-system = off + akka.coordinated-shutdown.run-by-actor-system-terminate = off """) override def withSystemRunning(newSystem: ActorSystem): Unit = { @@ -347,6 +392,7 @@ class CoordinatedShutdownSpec lazy val systemConfig = ConfigFactory.parseString(""" akka.coordinated-shutdown.run-by-jvm-shutdown-hook = on akka.coordinated-shutdown.terminate-actor-system = off + akka.coordinated-shutdown.run-by-actor-system-terminate = off """) override def withSystemRunning(newSystem: ActorSystem): Unit = { diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index 593be6b6f9..f9cf6fee43 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -67,6 +67,12 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin getBoolean("akka.log-dead-letters-during-shutdown") should ===(true) settings.LogDeadLettersDuringShutdown should ===(true) + + getBoolean("akka.coordinated-shutdown.terminate-actor-system") should ===(true) + settings.CoordinatedShutdownTerminateActorSystem should ===(true) + + getBoolean("akka.coordinated-shutdown.run-by-actor-system-terminate") should ===(true) + settings.CoordinatedShutdownRunByActorSystemTerminate should ===(true) } { diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorSystemSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorSystemSpec.scala index d13d40dd6e..8b1407c813 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorSystemSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/internal/ActorSystemSpec.scala @@ -11,9 +11,11 @@ import scala.concurrent.duration._ import scala.util.control.NonFatal import akka.Done +import akka.actor.CoordinatedShutdown import akka.actor.InvalidMessageException import akka.actor.testkit.typed.scaladsl.TestInbox import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.adapter._ import org.scalatest._ import org.scalatest.concurrent.Eventually import org.scalatest.concurrent.ScalaFutures @@ -55,6 +57,8 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with } inbox.receiveAll() should ===("hello" :: Nil) sys.whenTerminated.futureValue + CoordinatedShutdown(sys.toUntyped).shutdownReason() should ===( + Some(CoordinatedShutdown.ActorSystemTerminateReason)) } } @@ -89,6 +93,8 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with // now we know that the guardian has started, and should receive PostStop sys.terminate() sys.whenTerminated.futureValue + CoordinatedShutdown(sys.toUntyped).shutdownReason() should ===( + Some(CoordinatedShutdown.ActorSystemTerminateReason)) inbox.receiveAll() should ===("done" :: Nil) } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala index 3946027c1c..93e06100a8 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/ActorSystem.scala @@ -104,8 +104,15 @@ abstract class ActorSystem[-T] extends ActorRef[T] with Extensions { this: Inter implicit def executionContext: ExecutionContextExecutor /** - * Terminates this actor system. This will stop the guardian actor, which in turn - * will recursively stop all its child actors, then the system guardian + * Terminates this actor system by running [[akka.actor.CoordinatedShutdown]] with reason + * [[akka.actor.CoordinatedShutdown.ActorSystemTerminateReason]]. + * + * If `akka.coordinated-shutdown.run-by-actor-system-terminate` is configured to `off` + * it will not run `CoordinatedShutdown`, but the `ActorSystem` and its actors + * will still be terminated. + * + * This will stop the guardian actor, which in turn + * will recursively stop all its child actors, and finally the system guardian * (below which the logging actors reside). * * This is an asynchronous operation and completion of the termination can diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/GuardianStartupBehavior.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/GuardianStartupBehavior.scala index 35960b7902..29cb359924 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/GuardianStartupBehavior.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/GuardianStartupBehavior.scala @@ -5,6 +5,9 @@ package akka.actor.typed.internal.adapter import akka.actor.typed.Behavior +import akka.actor.typed.BehaviorInterceptor +import akka.actor.typed.Signal +import akka.actor.typed.TypedActorContext import akka.actor.typed.scaladsl.AbstractBehavior import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.Behaviors @@ -37,10 +40,50 @@ private[akka] final class GuardianStartupBehavior[T](val guardianBehavior: Behav msg match { case Start => // ctx is not available initially so we cannot use it until here - Behaviors.setup(ctx => stash.unstashAll(ctx.asInstanceOf[ActorContext[T]], guardianBehavior).unsafeCast[Any]) + Behaviors.setup( + ctx => + stash + .unstashAll( + ctx.asInstanceOf[ActorContext[T]], + Behaviors.intercept(new GuardianStopInterceptor[T])(guardianBehavior)) + .unsafeCast[Any]) case other => stash.stash(other.asInstanceOf[T]) this } } + +/** + * INTERNAL API + * + * When the user guardian is stopped the ActorSystem is terminated, but to run CoordinatedShutdown + * as part of that we must intercept when the guardian is stopped and call ActorSystem.terminate() + * explicitly. + */ +@InternalApi private[akka] final class GuardianStopInterceptor[T] extends BehaviorInterceptor[T, T] { + override def aroundReceive( + ctx: TypedActorContext[T], + msg: T, + target: BehaviorInterceptor.ReceiveTarget[T]): Behavior[T] = { + val next = target(ctx, msg) + interceptStopped(ctx, next) + } + + override def aroundSignal( + ctx: TypedActorContext[T], + signal: Signal, + target: BehaviorInterceptor.SignalTarget[T]): Behavior[T] = { + val next = target(ctx, signal) + interceptStopped(ctx, next) + } + + private def interceptStopped(ctx: TypedActorContext[T], next: Behavior[T]): Behavior[T] = { + if (Behavior.isAlive(next)) + next + else { + ctx.asScala.system.terminate() + Behaviors.ignore + } + } +} diff --git a/akka-actor/src/main/mima-filters/2.5.x.backwards.excludes b/akka-actor/src/main/mima-filters/2.5.x.backwards.excludes index 5a85a3b507..fbad9af603 100644 --- a/akka-actor/src/main/mima-filters/2.5.x.backwards.excludes +++ b/akka-actor/src/main/mima-filters/2.5.x.backwards.excludes @@ -7,6 +7,9 @@ ProblemFilters.exclude[MissingClassProblem]("akka.actor.ActorDSL") ProblemFilters.exclude[MissingClassProblem]("akka.actor.ActorDSL$*") ProblemFilters.exclude[MissingClassProblem]("akka.actor.dsl.*") +# #25213 CoordinatedShutdown from ActorSystem.terminate +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.ExtendedActorSystem.finalTerminate") + # #26190 remove actorFor ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.ActorCell.actorFor") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.ActorRefProvider.actorFor") diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index f2b3fa5be9..0e6e09fdbb 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -1114,6 +1114,11 @@ akka { # This property is related to `akka.jvm-shutdown-hooks` above. run-by-jvm-shutdown-hook = on + # Run the coordinated shutdown when ActorSystem.terminate is called. + # Enabling this and disabling terminate-actor-system is not a supported + # combination (will throw ConfigurationException at startup). + run-by-actor-system-terminate = on + # When Coordinated Shutdown is triggered an instance of `Reason` is # required. That value can be used to override the default settings. # Only 'exit-jvm', 'exit-code' and 'terminate-actor-system' may be diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 823618deeb..047dfcd9a4 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -9,13 +9,13 @@ import java.util.concurrent._ import java.util.concurrent.atomic.AtomicReference import com.typesafe.config.{ Config, ConfigFactory } +import akka.ConfigurationException import akka.event._ import akka.dispatch._ import akka.japi.Util.immutableSeq import akka.actor.dungeon.ChildrenContainer import akka.util._ import akka.util.Helpers.toRootLowerCase - import scala.annotation.tailrec import scala.collection.immutable import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor, Future, Promise } @@ -25,7 +25,6 @@ import java.util.Optional import akka.actor.setup.{ ActorSystemSetup, Setup } import akka.annotation.InternalApi - import scala.compat.java8.FutureConverters import scala.compat.java8.OptionConverters._ @@ -387,6 +386,15 @@ object ActorSystem { final val JvmExitOnFatalError: Boolean = getBoolean("akka.jvm-exit-on-fatal-error") final val JvmShutdownHooks: Boolean = getBoolean("akka.jvm-shutdown-hooks") + final val CoordinatedShutdownTerminateActorSystem: Boolean = getBoolean( + "akka.coordinated-shutdown.terminate-actor-system") + final val CoordinatedShutdownRunByActorSystemTerminate: Boolean = getBoolean( + "akka.coordinated-shutdown.run-by-actor-system-terminate") + if (CoordinatedShutdownRunByActorSystemTerminate && !CoordinatedShutdownTerminateActorSystem) + throw new ConfigurationException( + "akka.coordinated-shutdown.run-by-actor-system-terminate=on and " + + "akka.coordinated-shutdown.terminate-actor-system=off is not a supported configuration combination.") + final val DefaultVirtualNodesFactor: Int = getInt("akka.actor.deployment.default.virtual-nodes-factor") if (ConfigVersion != Version) @@ -571,12 +579,19 @@ abstract class ActorSystem extends ActorRefFactory { def registerOnTermination(code: Runnable): Unit /** - * Terminates this actor system. This will stop the guardian actor, which in turn - * will recursively stop all its child actors, the system guardian + * Terminates this actor system by running [[CoordinatedShutdown]] with reason + * [[CoordinatedShutdown.ActorSystemTerminateReason]]. + * + * If `akka.coordinated-shutdown.run-by-actor-system-terminate` is configured to `off` + * it will not run `CoordinatedShutdown`, but the `ActorSystem` and its actors + * will still be terminated. + * + * This will stop the guardian actor, which in turn + * will recursively stop all its child actors, and finally the system guardian * (below which the logging actors reside) and then execute all registered * termination handlers (see [[ActorSystem#registerOnTermination]]). * Be careful to not schedule any operations on completion of the returned future - * using the `dispatcher` of this actor system as it will have been shut down before the + * using the dispatcher of this actor system as it will have been shut down before the * future completes. */ def terminate(): Future[Terminated] @@ -682,6 +697,11 @@ abstract class ExtendedActorSystem extends ActorSystem { */ private[akka] def printTree: String + /** + * INTERNAL API: final step of `terminate()` + */ + @InternalApi private[akka] def finalTerminate(): Unit + } /** @@ -936,9 +956,25 @@ private[akka] class ActorSystemImpl( def registerOnTermination(code: Runnable): Unit = { terminationCallbacks.add(code) } override def terminate(): Future[Terminated] = { + if (settings.CoordinatedShutdownRunByActorSystemTerminate && !aborting) { + // Note that the combination CoordinatedShutdownRunByActorSystemTerminate==true && + // CoordinatedShutdownTerminateActorSystem==false is disallowed, checked in Settings. + // It's not a combination that is valuable to support and it would be complicated to + // protect against concurrency race conditions between calls to ActorSystem.terminate() + // and CoordinateShutdown.run() + + // it will call finalTerminate() at the end + CoordinatedShutdown(this).run(CoordinatedShutdown.ActorSystemTerminateReason) + } else { + finalTerminate() + } + whenTerminated + } + + override private[akka] def finalTerminate(): Unit = { + // these actions are idempotent if (!settings.LogDeadLettersDuringShutdown) logDeadLetterListener.foreach(stop) guardian.stop() - whenTerminated } @volatile var aborting = false @@ -946,8 +982,8 @@ private[akka] class ActorSystemImpl( /** * This kind of shutdown attempts to bring the system down and release its * resources more forcefully than plain shutdown. For example it will not - * wait for remote-deployed child actors to terminate before terminating their - * parents. + * run CoordinatedShutdown and not wait for remote-deployed child actors to + * terminate before terminating their parents. */ def abort(): Unit = { aborting = true diff --git a/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala b/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala index 6248345acb..372cc7cce3 100644 --- a/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala +++ b/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala @@ -120,10 +120,20 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi */ def unknownReason: Reason = UnknownReason + /** + * Scala API: The shutdown was initiated by ActorSystem.terminate. + */ + case object ActorSystemTerminateReason extends Reason + + /** + * Java API: The shutdown was initiated by ActorSystem.terminate. + */ + def actorSystemTerminateReason: Reason = ActorSystemTerminateReason + /** * Scala API: The shutdown was initiated by a JVM shutdown hook, e.g. triggered by SIGTERM. */ - object JvmExitReason extends Reason + case object JvmExitReason extends Reason /** * Java API: The shutdown was initiated by a JVM shutdown hook, e.g. triggered by SIGTERM. @@ -133,7 +143,7 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi /** * Scala API: The shutdown was initiated by Cluster downing. */ - object ClusterDowningReason extends Reason + case object ClusterDowningReason extends Reason /** * Java API: The shutdown was initiated by Cluster downing. @@ -143,7 +153,7 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi /** * Scala API: The shutdown was initiated by a failure to join a seed node. */ - object ClusterJoinUnsuccessfulReason extends Reason + case object ClusterJoinUnsuccessfulReason extends Reason /** * Java API: The shutdown was initiated by a failure to join a seed node. @@ -163,7 +173,7 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi /** * Scala API: The shutdown was initiated by Cluster leaving. */ - object ClusterLeavingReason extends Reason + case object ClusterLeavingReason extends Reason /** * Java API: The shutdown was initiated by Cluster leaving. @@ -211,7 +221,10 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi .getOrElse(conf) } - private def initPhaseActorSystemTerminate(system: ActorSystem, conf: Config, coord: CoordinatedShutdown): Unit = { + private def initPhaseActorSystemTerminate( + system: ExtendedActorSystem, + conf: Config, + coord: CoordinatedShutdown): Unit = { coord.addTask(PhaseActorSystemTerminate, "terminate-system") { () => val confForReason = confWithOverrides(conf, coord.shutdownReason()) val terminateActorSystem = confForReason.getBoolean("terminate-actor-system") @@ -235,12 +248,11 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi } if (terminateActorSystem) { - system - .terminate() - .map { _ => - if (exitJvm && !runningJvmHook) System.exit(exitCode) - Done - }(ExecutionContexts.sameThreadExecutionContext) + system.finalTerminate() + system.whenTerminated.map { _ => + if (exitJvm && !runningJvmHook) System.exit(exitCode) + Done + }(ExecutionContexts.sameThreadExecutionContext) } else if (exitJvm) { System.exit(exitCode) Future.successful(Done) @@ -456,6 +468,7 @@ final class CoordinatedShutdown private[akka] ( if (runStarted.compareAndSet(None, Some(reason))) { implicit val ec = system.dispatchers.internalDispatcher val debugEnabled = log.isDebugEnabled + log.debug("Running CoordinatedShutdown with reason [{}]", reason) def loop(remainingPhases: List[String]): Future[Done] = { remainingPhases match { case Nil => Future.successful(Done) @@ -602,7 +615,7 @@ final class CoordinatedShutdown private[akka] ( * shutdown hooks the standard library JVM shutdown hooks APIs are better suited. */ @tailrec def addCancellableJvmShutdownHook[T](hook: => T): Cancellable = { - if (runStarted.get == None) { + if (runStarted.get.isEmpty) { val currentLatch = _jvmHooksLatch.get val newLatch = new CountDownLatch(currentLatch.getCount.toInt + 1) if (_jvmHooksLatch.compareAndSet(currentLatch, newLatch)) { diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala index 1956ea27d0..1e1be8cae3 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala @@ -47,6 +47,7 @@ object ClusterShardingSpec { akka.cluster.sharding.number-of-shards = 10 akka.coordinated-shutdown.terminate-actor-system = off + akka.coordinated-shutdown.run-by-actor-system-terminate = off akka.actor { serialize-messages = off diff --git a/akka-cluster-typed/src/test/java/akka/cluster/typed/ClusterApiTest.java b/akka-cluster-typed/src/test/java/akka/cluster/typed/ClusterApiTest.java index ecfc434267..c8f18a3d07 100644 --- a/akka-cluster-typed/src/test/java/akka/cluster/typed/ClusterApiTest.java +++ b/akka-cluster-typed/src/test/java/akka/cluster/typed/ClusterApiTest.java @@ -26,6 +26,7 @@ public class ClusterApiTest extends JUnitSuite { + "akka.remote.artery.canonical.hostname = 127.0.0.1 \n" + "akka.cluster.jmx.multi-mbeans-in-same-jvm = on \n" + "akka.coordinated-shutdown.terminate-actor-system = off \n" + + "akka.coordinated-shutdown.run-by-actor-system-terminate = off \n" + "akka.actor { \n" + " serialize-messages = off \n" + " allow-java-serialization = off \n" diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ActorSystemSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ActorSystemSpec.scala index a58db1503c..3ddf16850e 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ActorSystemSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ActorSystemSpec.scala @@ -10,21 +10,25 @@ import scala.concurrent.duration._ import scala.util.control.NonFatal import akka.Done +import akka.actor.CoordinatedShutdown import akka.actor.InvalidMessageException import akka.actor.testkit.typed.scaladsl.TestInbox import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior import akka.actor.typed.PostStop +import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.Behaviors import com.typesafe.config.ConfigFactory import org.scalatest._ import org.scalatest.concurrent.Eventually import org.scalatest.concurrent.ScalaFutures +import org.scalatest.time.Span class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with ScalaFutures with Eventually { - override implicit val patienceConfig = PatienceConfig(1.second) + implicit val patience: PatienceConfig = PatienceConfig(3.seconds, Span(100, org.scalatest.time.Millis)) + val config = ConfigFactory.parseString(""" akka.actor.provider = cluster akka.remote.classic.netty.tcp.port = 0 @@ -65,6 +69,8 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with } inbox.receiveAll() should ===("hello" :: Nil) sys.whenTerminated.futureValue + CoordinatedShutdown(sys.toUntyped).shutdownReason() should ===( + Some(CoordinatedShutdown.ActorSystemTerminateReason)) } } @@ -99,6 +105,8 @@ class ActorSystemSpec extends WordSpec with Matchers with BeforeAndAfterAll with // now we know that the guardian has started, and should receive PostStop sys.terminate() sys.whenTerminated.futureValue + CoordinatedShutdown(sys.toUntyped).shutdownReason() should ===( + Some(CoordinatedShutdown.ActorSystemTerminateReason)) inbox.receiveAll() should ===("done" :: Nil) } diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterApiSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterApiSpec.scala index e404c981ec..1e0a386a9c 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterApiSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterApiSpec.scala @@ -23,6 +23,7 @@ object ClusterApiSpec { akka.remote.artery.canonical.hostname = 127.0.0.1 akka.cluster.jmx.multi-mbeans-in-same-jvm = on akka.coordinated-shutdown.terminate-actor-system = off + akka.coordinated-shutdown.run-by-actor-system-terminate = off akka.actor { serialize-messages = off allow-java-serialization = off diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala index 85796bfe8e..f35bfab4c3 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala @@ -20,6 +20,7 @@ object ClusterSingletonPersistenceSpec { akka.remote.artery.canonical.hostname = 127.0.0.1 akka.coordinated-shutdown.terminate-actor-system = off + akka.coordinated-shutdown.run-by-actor-system-terminate = off akka.actor { serialize-messages = off diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/InitialMembersOfNewDcSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/InitialMembersOfNewDcSpec.scala index 59706f5b93..91b518cc70 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/InitialMembersOfNewDcSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/InitialMembersOfNewDcSpec.scala @@ -14,7 +14,6 @@ object InitialMembersOfNewDcSpec extends MultiNodeConfig { commonConfig(ConfigFactory.parseString(s""" akka.actor.provider = cluster akka.actor.warn-about-java-serializer-usage = off - akka.coordinated-shutdown.terminate-actor-system = off akka.cluster { jmx.enabled = off debug.verbose-gossip-logging = on diff --git a/akka-docs/src/main/paradox/actors.md b/akka-docs/src/main/paradox/actors.md index c2f5642240..68e7e4ee0f 100644 --- a/akka-docs/src/main/paradox/actors.md +++ b/akka-docs/src/main/paradox/actors.md @@ -1095,6 +1095,8 @@ To enable a hard `System.exit` as a final action you can configure: akka.coordinated-shutdown.exit-jvm = on ``` +The coordinated shutdown process can also be started by calling `ActorSystem.terminate()`. + When using @ref:[Akka Cluster](cluster-usage.md) the `CoordinatedShutdown` will automatically run when the cluster node sees itself as `Exiting`, i.e. leaving from another node will trigger the shutdown process on the leaving node. Tasks for graceful leaving of cluster including graceful @@ -1125,6 +1127,7 @@ used in the test: ``` # Don't terminate ActorSystem via CoordinatedShutdown in tests akka.coordinated-shutdown.terminate-actor-system = off +akka.coordinated-shutdown.run-by-actor-system-terminate = off akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off akka.cluster.run-coordinated-shutdown-when-down = off ``` diff --git a/akka-docs/src/main/paradox/general/actor-systems.md b/akka-docs/src/main/paradox/general/actor-systems.md index 80537bcba9..83a1533064 100644 --- a/akka-docs/src/main/paradox/general/actor-systems.md +++ b/akka-docs/src/main/paradox/general/actor-systems.md @@ -115,9 +115,9 @@ while Akka does the heavy lifting under the hood. ## Terminating ActorSystem When you know everything is done for your application, you can call the -`terminate` method of `ActorSystem`. That will stop the guardian -actor, which in turn will recursively stop all its child actors, the system -guardian. +`terminate` method of `ActorSystem`. That will run @ref:[`CoordinatedShutdown`](../actors.md#coordinated-shutdown) +followed by stopping the guardian actor, which in turn will recursively stop all its child actors, +and finally the system guardian. If you want to execute some operations while terminating `ActorSystem`, look at @ref:[`CoordinatedShutdown`](../actors.md#coordinated-shutdown). diff --git a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md index f5d184b4f5..783d657f40 100644 --- a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md +++ b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md @@ -153,3 +153,18 @@ The materialized value for `StreamRefs.sinkRef` and `StreamRefs.sourceRef` is no The configuration `akka.cluster.sharding.passivate-idle-entity-after` is now enabled by default. Sharding will passivate entities when they have not received any messages after this duration. Set + +## CoordinatedShutdown is run from ActorSystem.terminate + +No migration is needed but it is mentioned here because it is a change in behavior. + +When `ActorSystem.terminate()` is called, @ref:[`CoordinatedShutdown`](../actors.md#coordinated-shutdown) +will be run in Akka 2.6.x, which wasn't the case in 2.5.x. For example, if using Akka Cluster this means that +member will attempt to leave the cluster gracefully. + +If this is not desired behavior, for example in tests, you can disable this feature with the following configuration +and then it will behave as in Akka 2.5.x: + +``` +akka.coordinated-shutdown.run-by-actor-system-terminate = off +``` diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala index 80d923b1b2..9f4028f32d 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -225,6 +225,7 @@ object MultiNodeSpec { loglevel = "WARNING" stdout-loglevel = "WARNING" coordinated-shutdown.terminate-actor-system = off + coordinated-shutdown.run-by-actor-system-terminate = off coordinated-shutdown.run-by-jvm-shutdown-hook = off actor { default-dispatcher {