Merge pull request #24049 from akka/wip-24048-coordinated-downing-patriknw
Run all CoordinatedShutdown phases also when downing, #24048
This commit is contained in:
commit
774bfee074
14 changed files with 331 additions and 53 deletions
|
|
@ -12,6 +12,7 @@ import akka.Done
|
||||||
import akka.testkit.{ AkkaSpec, TestKit }
|
import akka.testkit.{ AkkaSpec, TestKit }
|
||||||
import com.typesafe.config.{ Config, ConfigFactory }
|
import com.typesafe.config.{ Config, ConfigFactory }
|
||||||
import akka.actor.CoordinatedShutdown.Phase
|
import akka.actor.CoordinatedShutdown.Phase
|
||||||
|
import akka.actor.CoordinatedShutdown.UnknownReason
|
||||||
|
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
import scala.concurrent.Promise
|
import scala.concurrent.Promise
|
||||||
|
|
@ -42,6 +43,8 @@ class CoordinatedShutdownSpec extends AkkaSpec {
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case object CustomReason extends CoordinatedShutdown.Reason
|
||||||
|
|
||||||
"CoordinatedShutdown" must {
|
"CoordinatedShutdown" must {
|
||||||
|
|
||||||
"sort phases in topolgical order" in {
|
"sort phases in topolgical order" in {
|
||||||
|
|
@ -151,7 +154,7 @@ class CoordinatedShutdownSpec extends AkkaSpec {
|
||||||
testActor ! "C"
|
testActor ! "C"
|
||||||
Future.successful(Done)
|
Future.successful(Done)
|
||||||
}
|
}
|
||||||
Await.result(co.run(), remainingOrDefault)
|
Await.result(co.run(UnknownReason), remainingOrDefault)
|
||||||
receiveN(4) should ===(List("A", "B", "B", "C"))
|
receiveN(4) should ===(List("A", "B", "B", "C"))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -174,8 +177,9 @@ class CoordinatedShutdownSpec extends AkkaSpec {
|
||||||
testActor ! "C"
|
testActor ! "C"
|
||||||
Future.successful(Done)
|
Future.successful(Done)
|
||||||
}
|
}
|
||||||
Await.result(co.run(Some("b")), remainingOrDefault)
|
Await.result(co.run(CustomReason, Some("b")), remainingOrDefault)
|
||||||
receiveN(2) should ===(List("B", "C"))
|
receiveN(2) should ===(List("B", "C"))
|
||||||
|
co.shutdownReason() should ===(Some(CustomReason))
|
||||||
}
|
}
|
||||||
|
|
||||||
"only run once" in {
|
"only run once" in {
|
||||||
|
|
@ -186,11 +190,14 @@ class CoordinatedShutdownSpec extends AkkaSpec {
|
||||||
testActor ! "A"
|
testActor ! "A"
|
||||||
Future.successful(Done)
|
Future.successful(Done)
|
||||||
}
|
}
|
||||||
Await.result(co.run(), remainingOrDefault)
|
co.shutdownReason() should ===(None)
|
||||||
|
Await.result(co.run(CustomReason), remainingOrDefault)
|
||||||
|
co.shutdownReason() should ===(Some(CustomReason))
|
||||||
expectMsg("A")
|
expectMsg("A")
|
||||||
Await.result(co.run(), remainingOrDefault)
|
Await.result(co.run(UnknownReason), remainingOrDefault)
|
||||||
testActor ! "done"
|
testActor ! "done"
|
||||||
expectMsg("done") // no additional A
|
expectMsg("done") // no additional A
|
||||||
|
co.shutdownReason() should ===(Some(CustomReason))
|
||||||
}
|
}
|
||||||
|
|
||||||
"continue after timeout or failure" in {
|
"continue after timeout or failure" in {
|
||||||
|
|
@ -220,7 +227,7 @@ class CoordinatedShutdownSpec extends AkkaSpec {
|
||||||
testActor ! "C"
|
testActor ! "C"
|
||||||
Future.successful(Done)
|
Future.successful(Done)
|
||||||
}
|
}
|
||||||
Await.result(co.run(), remainingOrDefault)
|
Await.result(co.run(UnknownReason), remainingOrDefault)
|
||||||
expectMsg("A")
|
expectMsg("A")
|
||||||
expectMsg("A")
|
expectMsg("A")
|
||||||
expectMsg("B")
|
expectMsg("B")
|
||||||
|
|
@ -241,7 +248,7 @@ class CoordinatedShutdownSpec extends AkkaSpec {
|
||||||
testActor ! "C"
|
testActor ! "C"
|
||||||
Future.successful(Done)
|
Future.successful(Done)
|
||||||
}
|
}
|
||||||
val result = co.run()
|
val result = co.run(UnknownReason)
|
||||||
expectMsg("B")
|
expectMsg("B")
|
||||||
intercept[TimeoutException] {
|
intercept[TimeoutException] {
|
||||||
Await.result(result, remainingOrDefault)
|
Await.result(result, remainingOrDefault)
|
||||||
|
|
@ -263,7 +270,7 @@ class CoordinatedShutdownSpec extends AkkaSpec {
|
||||||
}
|
}
|
||||||
Future.successful(Done)
|
Future.successful(Done)
|
||||||
}
|
}
|
||||||
Await.result(co.run(), remainingOrDefault)
|
Await.result(co.run(UnknownReason), remainingOrDefault)
|
||||||
expectMsg("A")
|
expectMsg("A")
|
||||||
expectMsg("B")
|
expectMsg("B")
|
||||||
}
|
}
|
||||||
|
|
@ -291,8 +298,9 @@ class CoordinatedShutdownSpec extends AkkaSpec {
|
||||||
|
|
||||||
// this must be the last test, since it terminates the ActorSystem
|
// this must be the last test, since it terminates the ActorSystem
|
||||||
"terminate ActorSystem" in {
|
"terminate ActorSystem" in {
|
||||||
Await.result(CoordinatedShutdown(system).run(), 10.seconds) should ===(Done)
|
Await.result(CoordinatedShutdown(system).run(CustomReason), 10.seconds) should ===(Done)
|
||||||
system.whenTerminated.isCompleted should ===(true)
|
system.whenTerminated.isCompleted should ===(true)
|
||||||
|
CoordinatedShutdown(system).shutdownReason() === (Some(CustomReason))
|
||||||
}
|
}
|
||||||
|
|
||||||
"add and remove user JVM hooks with run-by-jvm-shutdown-hook = off, terminate-actor-system = off" in new JvmHookTest {
|
"add and remove user JVM hooks with run-by-jvm-shutdown-hook = off, terminate-actor-system = off" in new JvmHookTest {
|
||||||
|
|
@ -307,7 +315,7 @@ class CoordinatedShutdownSpec extends AkkaSpec {
|
||||||
val cancellable = CoordinatedShutdown(newSystem).addCancellableJvmShutdownHook(
|
val cancellable = CoordinatedShutdown(newSystem).addCancellableJvmShutdownHook(
|
||||||
println(s"User JVM hook from ${newSystem.name}")
|
println(s"User JVM hook from ${newSystem.name}")
|
||||||
)
|
)
|
||||||
myHooksCount should ===(2) // one user, one from system
|
myHooksCount should ===(1) // one user, none from system
|
||||||
cancellable.cancel()
|
cancellable.cancel()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -345,7 +353,23 @@ class CoordinatedShutdownSpec extends AkkaSpec {
|
||||||
myHooksCount should ===(2) // one user, one from actor system
|
myHooksCount should ===(2) // one user, one from actor system
|
||||||
cancellable.cancel()
|
cancellable.cancel()
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
"add and remove user JVM hooks with run-by-jvm-shutdown-hook = on, akka.jvm-shutdown-hooks = off" in new JvmHookTest {
|
||||||
|
lazy val systemName = s"CoordinatedShutdownSpec-JvmHooks-4-${System.currentTimeMillis()}"
|
||||||
|
lazy val systemConfig = ConfigFactory.parseString(
|
||||||
|
"""
|
||||||
|
akka.jvm-shutdown-hooks = off
|
||||||
|
akka.coordinated-shutdown.run-by-jvm-shutdown-hook = on
|
||||||
|
""")
|
||||||
|
|
||||||
|
def withSystemRunning(newSystem: ActorSystem): Unit = {
|
||||||
|
val cancellable = CoordinatedShutdown(newSystem).addCancellableJvmShutdownHook(
|
||||||
|
println(s"User JVM hook from ${newSystem.name}")
|
||||||
|
)
|
||||||
|
myHooksCount should ===(1) // one user, none from actor system
|
||||||
|
cancellable.cancel()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -358,13 +382,7 @@ class CoordinatedShutdownSpec extends AkkaSpec {
|
||||||
def systemConfig: Config
|
def systemConfig: Config
|
||||||
def withSystemRunning(system: ActorSystem): Unit
|
def withSystemRunning(system: ActorSystem): Unit
|
||||||
|
|
||||||
val newSystem = ActorSystem(
|
val newSystem = ActorSystem(systemName, systemConfig)
|
||||||
systemName,
|
|
||||||
ConfigFactory.parseString(
|
|
||||||
"""
|
|
||||||
akka.coordinated-shutdown.run-by-jvm-shutdown-hook = on
|
|
||||||
akka.coordinated-shutdown.terminate-actor-system = on
|
|
||||||
"""))
|
|
||||||
|
|
||||||
withSystemRunning(newSystem)
|
withSystemRunning(newSystem)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -45,6 +45,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin
|
||||||
|
|
||||||
getBoolean("akka.jvm-exit-on-fatal-error") should ===(true)
|
getBoolean("akka.jvm-exit-on-fatal-error") should ===(true)
|
||||||
settings.JvmExitOnFatalError should ===(true)
|
settings.JvmExitOnFatalError should ===(true)
|
||||||
|
settings.JvmShutdownHooks should ===(true)
|
||||||
|
|
||||||
getInt("akka.actor.deployment.default.virtual-nodes-factor") should ===(10)
|
getInt("akka.actor.deployment.default.virtual-nodes-factor") should ===(10)
|
||||||
settings.DefaultVirtualNodesFactor should ===(10)
|
settings.DefaultVirtualNodesFactor should ===(10)
|
||||||
|
|
|
||||||
|
|
@ -81,6 +81,12 @@ akka {
|
||||||
# such as OutOfMemoryError
|
# such as OutOfMemoryError
|
||||||
jvm-exit-on-fatal-error = on
|
jvm-exit-on-fatal-error = on
|
||||||
|
|
||||||
|
# Akka installs JVM shutdown hooks by default, e.g. in CoordinatedShutdown and Artery.
|
||||||
|
# This property makes it possible to disable all such hooks if the application itself
|
||||||
|
# or a higher level framework such as Play prefers to install the JVM shutdown hook and
|
||||||
|
# terminate the ActorSystem itself, with or without using CoordinatedShutdown.
|
||||||
|
jvm-shutdown-hooks = on
|
||||||
|
|
||||||
actor {
|
actor {
|
||||||
|
|
||||||
# Either one of "local", "remote" or "cluster" or the
|
# Either one of "local", "remote" or "cluster" or the
|
||||||
|
|
|
||||||
|
|
@ -363,6 +363,7 @@ object ActorSystem {
|
||||||
final val SchedulerClass: String = getString("akka.scheduler.implementation")
|
final val SchedulerClass: String = getString("akka.scheduler.implementation")
|
||||||
final val Daemonicity: Boolean = getBoolean("akka.daemonic")
|
final val Daemonicity: Boolean = getBoolean("akka.daemonic")
|
||||||
final val JvmExitOnFatalError: Boolean = getBoolean("akka.jvm-exit-on-fatal-error")
|
final val JvmExitOnFatalError: Boolean = getBoolean("akka.jvm-exit-on-fatal-error")
|
||||||
|
final val JvmShutdownHooks: Boolean = getBoolean("akka.jvm-shutdown-hooks")
|
||||||
|
|
||||||
final val DefaultVirtualNodesFactor: Int = getInt("akka.actor.deployment.default.virtual-nodes-factor")
|
final val DefaultVirtualNodesFactor: Int = getInt("akka.actor.deployment.default.virtual-nodes-factor")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,6 @@ import scala.concurrent.duration._
|
||||||
import scala.compat.java8.FutureConverters._
|
import scala.compat.java8.FutureConverters._
|
||||||
import scala.compat.java8.OptionConverters._
|
import scala.compat.java8.OptionConverters._
|
||||||
import java.util.concurrent._
|
import java.util.concurrent._
|
||||||
import java.util.concurrent.atomic.AtomicBoolean
|
|
||||||
import java.util.concurrent.TimeUnit.MILLISECONDS
|
import java.util.concurrent.TimeUnit.MILLISECONDS
|
||||||
|
|
||||||
import scala.concurrent.Future
|
import scala.concurrent.Future
|
||||||
|
|
@ -99,6 +98,54 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi
|
||||||
*/
|
*/
|
||||||
val PhaseActorSystemTerminate = "actor-system-terminate"
|
val PhaseActorSystemTerminate = "actor-system-terminate"
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reason for the shutdown, which can be used by tasks in case they need to do
|
||||||
|
* different things depending on what caused the shutdown. There are some
|
||||||
|
* predefined reasons, but external libraries applications may also define
|
||||||
|
* other reasons.
|
||||||
|
*/
|
||||||
|
trait Reason
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Scala API: The reason for the shutdown was unknown. Needed for backwards compatibility.
|
||||||
|
*/
|
||||||
|
case object UnknownReason extends Reason
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Java API: The reason for the shutdown was unknown. Needed for backwards compatibility.
|
||||||
|
*/
|
||||||
|
def unknownReason: Reason = UnknownReason
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Scala API: The shutdown was initiated by a JVM shutdown hook, e.g. triggered by SIGTERM.
|
||||||
|
*/
|
||||||
|
object JvmExitReason extends Reason
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Java API: The shutdown was initiated by a JVM shutdown hook, e.g. triggered by SIGTERM.
|
||||||
|
*/
|
||||||
|
def jvmExitReason: Reason = JvmExitReason
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Scala API: The shutdown was initiated by Cluster downing.
|
||||||
|
*/
|
||||||
|
object ClusterDowningReason extends Reason
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Java API: The shutdown was initiated by Cluster downing.
|
||||||
|
*/
|
||||||
|
def clusterDowningReason: Reason = ClusterDowningReason
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Scala API: The shutdown was initiated by Cluster leaving.
|
||||||
|
*/
|
||||||
|
object ClusterLeavingReason extends Reason
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Java API: The shutdown was initiated by Cluster leaving.
|
||||||
|
*/
|
||||||
|
def clusterLeavingReason: Reason = ClusterLeavingReason
|
||||||
|
|
||||||
@volatile private var runningJvmHook = false
|
@volatile private var runningJvmHook = false
|
||||||
|
|
||||||
override def get(system: ActorSystem): CoordinatedShutdown = super.get(system)
|
override def get(system: ActorSystem): CoordinatedShutdown = super.get(system)
|
||||||
|
|
@ -159,7 +206,7 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi
|
||||||
}
|
}
|
||||||
|
|
||||||
private def initJvmHook(system: ActorSystem, conf: Config, coord: CoordinatedShutdown): Unit = {
|
private def initJvmHook(system: ActorSystem, conf: Config, coord: CoordinatedShutdown): Unit = {
|
||||||
val runByJvmShutdownHook = conf.getBoolean("run-by-jvm-shutdown-hook")
|
val runByJvmShutdownHook = system.settings.JvmShutdownHooks && conf.getBoolean("run-by-jvm-shutdown-hook")
|
||||||
if (runByJvmShutdownHook) {
|
if (runByJvmShutdownHook) {
|
||||||
coord.actorSystemJvmHook = OptionVal.Some(coord.addCancellableJvmShutdownHook {
|
coord.actorSystemJvmHook = OptionVal.Some(coord.addCancellableJvmShutdownHook {
|
||||||
runningJvmHook = true // avoid System.exit from PhaseActorSystemTerminate task
|
runningJvmHook = true // avoid System.exit from PhaseActorSystemTerminate task
|
||||||
|
|
@ -168,7 +215,7 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi
|
||||||
try {
|
try {
|
||||||
// totalTimeout will be 0 when no tasks registered, so at least 3.seconds
|
// totalTimeout will be 0 when no tasks registered, so at least 3.seconds
|
||||||
val totalTimeout = coord.totalTimeout().max(3.seconds)
|
val totalTimeout = coord.totalTimeout().max(3.seconds)
|
||||||
Await.ready(coord.run(), totalTimeout)
|
Await.ready(coord.run(JvmExitReason), totalTimeout)
|
||||||
} catch {
|
} catch {
|
||||||
case NonFatal(e) ⇒
|
case NonFatal(e) ⇒
|
||||||
coord.log.warning(
|
coord.log.warning(
|
||||||
|
|
@ -246,6 +293,9 @@ final class CoordinatedShutdown private[akka] (
|
||||||
system: ExtendedActorSystem,
|
system: ExtendedActorSystem,
|
||||||
phases: Map[String, CoordinatedShutdown.Phase]) extends Extension {
|
phases: Map[String, CoordinatedShutdown.Phase]) extends Extension {
|
||||||
import CoordinatedShutdown.Phase
|
import CoordinatedShutdown.Phase
|
||||||
|
import CoordinatedShutdown.Reason
|
||||||
|
import CoordinatedShutdown.UnknownReason
|
||||||
|
import CoordinatedShutdown.JvmExitReason
|
||||||
|
|
||||||
/** INTERNAL API */
|
/** INTERNAL API */
|
||||||
private[akka] val log = Logging(system, getClass)
|
private[akka] val log = Logging(system, getClass)
|
||||||
|
|
@ -253,10 +303,10 @@ final class CoordinatedShutdown private[akka] (
|
||||||
/** INTERNAL API */
|
/** INTERNAL API */
|
||||||
private[akka] val orderedPhases = CoordinatedShutdown.topologicalSort(phases)
|
private[akka] val orderedPhases = CoordinatedShutdown.topologicalSort(phases)
|
||||||
private val tasks = new ConcurrentHashMap[String, Vector[(String, () ⇒ Future[Done])]]
|
private val tasks = new ConcurrentHashMap[String, Vector[(String, () ⇒ Future[Done])]]
|
||||||
private val runStarted = new AtomicBoolean(false)
|
private val runStarted = new AtomicReference[Option[Reason]](None)
|
||||||
private val runPromise = Promise[Done]()
|
private val runPromise = Promise[Done]()
|
||||||
|
|
||||||
private var _jvmHooksLatch = new AtomicReference[CountDownLatch](new CountDownLatch(0))
|
private val _jvmHooksLatch = new AtomicReference[CountDownLatch](new CountDownLatch(0))
|
||||||
@volatile private var actorSystemJvmHook: OptionVal[Cancellable] = OptionVal.None
|
@volatile private var actorSystemJvmHook: OptionVal[Cancellable] = OptionVal.None
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -306,33 +356,51 @@ final class CoordinatedShutdown private[akka] (
|
||||||
def addTask(phase: String, taskName: String, task: Supplier[CompletionStage[Done]]): Unit =
|
def addTask(phase: String, taskName: String, task: Supplier[CompletionStage[Done]]): Unit =
|
||||||
addTask(phase, taskName)(() ⇒ task.get().toScala)
|
addTask(phase, taskName)(() ⇒ task.get().toScala)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The `Reason` for the shutdown as passed to the `run` method. `None` if the shutdown
|
||||||
|
* has not been started.
|
||||||
|
*/
|
||||||
|
def shutdownReason(): Option[Reason] = runStarted.get()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The `Reason` for the shutdown as passed to the `run` method. `Optional.empty` if the shutdown
|
||||||
|
* has not been started.
|
||||||
|
*/
|
||||||
|
def getShutdownReason(): Optional[Reason] = shutdownReason().asJava
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Scala API: Run tasks of all phases. The returned
|
* Scala API: Run tasks of all phases. The returned
|
||||||
* `Future` is completed when all tasks have been completed,
|
* `Future` is completed when all tasks have been completed,
|
||||||
* or there is a failure when recovery is disabled.
|
* or there is a failure when recovery is disabled.
|
||||||
*
|
*
|
||||||
* It's safe to call this method multiple times. It will only run the once.
|
* It's safe to call this method multiple times. It will only run the shutdown sequence once.
|
||||||
*/
|
*/
|
||||||
def run(): Future[Done] = run(None)
|
def run(reason: Reason): Future[Done] = run(reason, None)
|
||||||
|
|
||||||
|
@deprecated("Use the method with `reason` parameter instead", since = "2.5.8")
|
||||||
|
def run(): Future[Done] = run(UnknownReason)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API: Run tasks of all phases. The returned
|
* Java API: Run tasks of all phases. The returned
|
||||||
* `CompletionStage` is completed when all tasks have been completed,
|
* `CompletionStage` is completed when all tasks have been completed,
|
||||||
* or there is a failure when recovery is disabled.
|
* or there is a failure when recovery is disabled.
|
||||||
*
|
*
|
||||||
* It's safe to call this method multiple times. It will only run the once.
|
* It's safe to call this method multiple times. It will only run the shutdown sequence once.
|
||||||
*/
|
*/
|
||||||
def runAll(): CompletionStage[Done] = run().toJava
|
def runAll(reason: Reason): CompletionStage[Done] = run(reason).toJava
|
||||||
|
|
||||||
|
@deprecated("Use the method with `reason` parameter instead", since = "2.5.8")
|
||||||
|
def runAll(): CompletionStage[Done] = runAll(UnknownReason)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Scala API: Run tasks of all phases including and after the given phase.
|
* Scala API: Run tasks of all phases including and after the given phase.
|
||||||
* The returned `Future` is completed when all such tasks have been completed,
|
* The returned `Future` is completed when all such tasks have been completed,
|
||||||
* or there is a failure when recovery is disabled.
|
* or there is a failure when recovery is disabled.
|
||||||
*
|
*
|
||||||
* It's safe to call this method multiple times. It will only run the once.
|
* It's safe to call this method multiple times. It will only run shutdown sequence once.
|
||||||
*/
|
*/
|
||||||
def run(fromPhase: Option[String]): Future[Done] = {
|
def run(reason: Reason, fromPhase: Option[String]): Future[Done] = {
|
||||||
if (runStarted.compareAndSet(false, true)) {
|
if (runStarted.compareAndSet(None, Some(reason))) {
|
||||||
import system.dispatcher
|
import system.dispatcher
|
||||||
val debugEnabled = log.isDebugEnabled
|
val debugEnabled = log.isDebugEnabled
|
||||||
def loop(remainingPhases: List[String]): Future[Done] = {
|
def loop(remainingPhases: List[String]): Future[Done] = {
|
||||||
|
|
@ -409,15 +477,23 @@ final class CoordinatedShutdown private[akka] (
|
||||||
runPromise.future
|
runPromise.future
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@deprecated("Use the method with `reason` parameter instead", since = "2.5.8")
|
||||||
|
def run(fromPhase: Option[String]): Future[Done] =
|
||||||
|
run(UnknownReason, fromPhase)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API: Run tasks of all phases including and after the given phase.
|
* Java API: Run tasks of all phases including and after the given phase.
|
||||||
* The returned `CompletionStage` is completed when all such tasks have been completed,
|
* The returned `CompletionStage` is completed when all such tasks have been completed,
|
||||||
* or there is a failure when recovery is disabled.
|
* or there is a failure when recovery is disabled.
|
||||||
*
|
*
|
||||||
* It's safe to call this method multiple times. It will only run once.
|
* It's safe to call this method multiple times. It will only run the shutdown sequence once.
|
||||||
*/
|
*/
|
||||||
|
def run(reason: Reason, fromPhase: Optional[String]): CompletionStage[Done] =
|
||||||
|
run(reason, fromPhase.asScala).toJava
|
||||||
|
|
||||||
|
@deprecated("Use the method with `reason` parameter instead", since = "2.5.8")
|
||||||
def run(fromPhase: Optional[String]): CompletionStage[Done] =
|
def run(fromPhase: Optional[String]): CompletionStage[Done] =
|
||||||
run(fromPhase.asScala).toJava
|
run(UnknownReason, fromPhase)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The configured timeout for a given `phase`.
|
* The configured timeout for a given `phase`.
|
||||||
|
|
@ -462,7 +538,7 @@ final class CoordinatedShutdown private[akka] (
|
||||||
* shutdown hooks the standard library JVM shutdown hooks APIs are better suited.
|
* shutdown hooks the standard library JVM shutdown hooks APIs are better suited.
|
||||||
*/
|
*/
|
||||||
@tailrec def addCancellableJvmShutdownHook[T](hook: ⇒ T): Cancellable = {
|
@tailrec def addCancellableJvmShutdownHook[T](hook: ⇒ T): Cancellable = {
|
||||||
if (!runStarted.get) {
|
if (runStarted.get == None) {
|
||||||
val currentLatch = _jvmHooksLatch.get
|
val currentLatch = _jvmHooksLatch.get
|
||||||
val newLatch = new CountDownLatch(currentLatch.getCount.toInt + 1)
|
val newLatch = new CountDownLatch(currentLatch.getCount.toInt + 1)
|
||||||
if (_jvmHooksLatch.compareAndSet(currentLatch, newLatch)) {
|
if (_jvmHooksLatch.compareAndSet(currentLatch, newLatch)) {
|
||||||
|
|
|
||||||
|
|
@ -406,8 +406,12 @@ private[akka] class ShardRegion(
|
||||||
CoordinatedShutdown(context.system).addTask(
|
CoordinatedShutdown(context.system).addTask(
|
||||||
CoordinatedShutdown.PhaseClusterShardingShutdownRegion,
|
CoordinatedShutdown.PhaseClusterShardingShutdownRegion,
|
||||||
"region-shutdown") { () ⇒
|
"region-shutdown") { () ⇒
|
||||||
self ! GracefulShutdown
|
if (cluster.isTerminated || cluster.selfMember.status == MemberStatus.Down) {
|
||||||
gracefulShutdownProgress.future
|
Future.successful(Done)
|
||||||
|
} else {
|
||||||
|
self ! GracefulShutdown
|
||||||
|
gracefulShutdownProgress.future
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// subscribe to MemberEvent, re-subscribe when restart
|
// subscribe to MemberEvent, re-subscribe when restart
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,151 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
package akka.cluster.sharding
|
||||||
|
|
||||||
|
import scala.concurrent.Future
|
||||||
|
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
import akka.Done
|
||||||
|
import akka.actor.ActorSystem
|
||||||
|
import akka.actor.CoordinatedShutdown
|
||||||
|
import akka.actor.Props
|
||||||
|
import akka.cluster.Cluster
|
||||||
|
import akka.cluster.MemberStatus
|
||||||
|
import akka.testkit.AkkaSpec
|
||||||
|
import akka.testkit.TestActors.EchoActor
|
||||||
|
import akka.testkit.TestProbe
|
||||||
|
|
||||||
|
object CoordinatedShutdownShardingSpec {
|
||||||
|
val config =
|
||||||
|
"""
|
||||||
|
akka.loglevel = INFO
|
||||||
|
akka.actor.provider = "cluster"
|
||||||
|
akka.remote.netty.tcp.port = 0
|
||||||
|
akka.remote.artery.canonical.port = 0
|
||||||
|
"""
|
||||||
|
|
||||||
|
val extractEntityId: ShardRegion.ExtractEntityId = {
|
||||||
|
case msg: Int ⇒ (msg.toString, msg)
|
||||||
|
}
|
||||||
|
|
||||||
|
val extractShardId: ShardRegion.ExtractShardId = {
|
||||||
|
case msg: Int ⇒ (msg % 10).toString
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class CoordinatedShutdownShardingSpec extends AkkaSpec(CoordinatedShutdownShardingSpec.config) {
|
||||||
|
import CoordinatedShutdownShardingSpec._
|
||||||
|
|
||||||
|
val sys1 = ActorSystem(system.name, system.settings.config)
|
||||||
|
val sys2 = ActorSystem(system.name, system.settings.config)
|
||||||
|
val sys3 = system
|
||||||
|
|
||||||
|
val region1 = ClusterSharding(sys1).start("type1", Props[EchoActor](), ClusterShardingSettings(sys1),
|
||||||
|
extractEntityId, extractShardId)
|
||||||
|
val region2 = ClusterSharding(sys2).start("type1", Props[EchoActor](), ClusterShardingSettings(sys2),
|
||||||
|
extractEntityId, extractShardId)
|
||||||
|
val region3 = ClusterSharding(sys3).start("type1", Props[EchoActor](), ClusterShardingSettings(sys3),
|
||||||
|
extractEntityId, extractShardId)
|
||||||
|
|
||||||
|
val probe1 = TestProbe()(sys1)
|
||||||
|
val probe2 = TestProbe()(sys2)
|
||||||
|
val probe3 = TestProbe()(sys3)
|
||||||
|
|
||||||
|
CoordinatedShutdown(sys1).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "unbind") { () ⇒
|
||||||
|
probe1.ref ! "CS-unbind-1"
|
||||||
|
Future.successful(Done)
|
||||||
|
}
|
||||||
|
CoordinatedShutdown(sys2).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "unbind") { () ⇒
|
||||||
|
probe2.ref ! "CS-unbind-2"
|
||||||
|
Future.successful(Done)
|
||||||
|
}
|
||||||
|
CoordinatedShutdown(sys3).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "unbind") { () ⇒
|
||||||
|
probe1.ref ! "CS-unbind-3"
|
||||||
|
Future.successful(Done)
|
||||||
|
}
|
||||||
|
|
||||||
|
override def beforeTermination(): Unit = {
|
||||||
|
shutdown(sys1)
|
||||||
|
shutdown(sys2)
|
||||||
|
}
|
||||||
|
|
||||||
|
def pingEntities(): Unit = {
|
||||||
|
region3.tell(1, probe3.ref)
|
||||||
|
probe3.expectMsg(10.seconds, 1)
|
||||||
|
region3.tell(2, probe3.ref)
|
||||||
|
probe3.expectMsg(2)
|
||||||
|
region3.tell(3, probe3.ref)
|
||||||
|
probe3.expectMsg(3)
|
||||||
|
}
|
||||||
|
|
||||||
|
"Sharding and CoordinatedShutdown" must {
|
||||||
|
"init cluster" in {
|
||||||
|
Cluster(sys1).join(Cluster(sys1).selfAddress) // coordinator will initially run on sys2
|
||||||
|
awaitAssert(Cluster(sys1).selfMember.status should ===(MemberStatus.Up))
|
||||||
|
|
||||||
|
Cluster(sys2).join(Cluster(sys1).selfAddress)
|
||||||
|
within(10.seconds) {
|
||||||
|
awaitAssert {
|
||||||
|
Cluster(sys1).state.members.size should ===(2)
|
||||||
|
Cluster(sys1).state.members.map(_.status) should ===(Set(MemberStatus.Up))
|
||||||
|
Cluster(sys2).state.members.size should ===(2)
|
||||||
|
Cluster(sys2).state.members.map(_.status) should ===(Set(MemberStatus.Up))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Cluster(sys3).join(Cluster(sys1).selfAddress)
|
||||||
|
within(10.seconds) {
|
||||||
|
awaitAssert {
|
||||||
|
Cluster(sys1).state.members.size should ===(3)
|
||||||
|
Cluster(sys1).state.members.map(_.status) should ===(Set(MemberStatus.Up))
|
||||||
|
Cluster(sys2).state.members.size should ===(3)
|
||||||
|
Cluster(sys2).state.members.map(_.status) should ===(Set(MemberStatus.Up))
|
||||||
|
Cluster(sys3).state.members.size should ===(3)
|
||||||
|
Cluster(sys3).state.members.map(_.status) should ===(Set(MemberStatus.Up))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pingEntities()
|
||||||
|
}
|
||||||
|
|
||||||
|
"run coordinated shutdown when leaving" in {
|
||||||
|
Cluster(sys3).leave(Cluster(sys1).selfAddress)
|
||||||
|
probe1.expectMsg("CS-unbind-1")
|
||||||
|
|
||||||
|
within(10.seconds) {
|
||||||
|
awaitAssert {
|
||||||
|
Cluster(sys2).state.members.size should ===(2)
|
||||||
|
Cluster(sys3).state.members.size should ===(2)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
within(10.seconds) {
|
||||||
|
awaitAssert {
|
||||||
|
Cluster(sys1).isTerminated should ===(true)
|
||||||
|
sys1.whenTerminated.isCompleted should ===(true)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pingEntities()
|
||||||
|
}
|
||||||
|
|
||||||
|
"run coordinated shutdown when downing" in {
|
||||||
|
Cluster(sys3).down(Cluster(sys2).selfAddress)
|
||||||
|
probe2.expectMsg("CS-unbind-2")
|
||||||
|
|
||||||
|
within(10.seconds) {
|
||||||
|
awaitAssert {
|
||||||
|
Cluster(system).state.members.size should ===(1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
within(10.seconds) {
|
||||||
|
awaitAssert {
|
||||||
|
Cluster(sys2).isTerminated should ===(true)
|
||||||
|
sys2.whenTerminated.isCompleted should ===(true)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pingEntities()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -13,6 +13,8 @@ object GetShardTypeNamesSpec {
|
||||||
"""
|
"""
|
||||||
akka.loglevel = INFO
|
akka.loglevel = INFO
|
||||||
akka.actor.provider = "cluster"
|
akka.actor.provider = "cluster"
|
||||||
|
akka.remote.netty.tcp.port = 0
|
||||||
|
akka.remote.artery.canonical.port = 0
|
||||||
"""
|
"""
|
||||||
|
|
||||||
val extractEntityId: ShardRegion.ExtractEntityId = {
|
val extractEntityId: ShardRegion.ExtractEntityId = {
|
||||||
|
|
|
||||||
|
|
@ -5,9 +5,10 @@
|
||||||
package akka.cluster.singleton
|
package akka.cluster.singleton
|
||||||
|
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
|
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import scala.collection.immutable
|
import scala.collection.immutable
|
||||||
|
import scala.concurrent.Future
|
||||||
|
|
||||||
import akka.actor.Actor
|
import akka.actor.Actor
|
||||||
import akka.actor.Deploy
|
import akka.actor.Deploy
|
||||||
import akka.actor.ActorSystem
|
import akka.actor.ActorSystem
|
||||||
|
|
@ -26,8 +27,8 @@ import akka.AkkaException
|
||||||
import akka.actor.NoSerializationVerificationNeeded
|
import akka.actor.NoSerializationVerificationNeeded
|
||||||
import akka.cluster.UniqueAddress
|
import akka.cluster.UniqueAddress
|
||||||
import akka.cluster.ClusterEvent
|
import akka.cluster.ClusterEvent
|
||||||
|
|
||||||
import scala.concurrent.Promise
|
import scala.concurrent.Promise
|
||||||
|
|
||||||
import akka.Done
|
import akka.Done
|
||||||
import akka.actor.CoordinatedShutdown
|
import akka.actor.CoordinatedShutdown
|
||||||
import akka.annotation.DoNotInherit
|
import akka.annotation.DoNotInherit
|
||||||
|
|
@ -254,8 +255,12 @@ object ClusterSingletonManager {
|
||||||
// should preferably complete before stopping the singleton sharding coordinator on same node.
|
// should preferably complete before stopping the singleton sharding coordinator on same node.
|
||||||
val coordShutdown = CoordinatedShutdown(context.system)
|
val coordShutdown = CoordinatedShutdown(context.system)
|
||||||
coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "singleton-exiting-1") { () ⇒
|
coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "singleton-exiting-1") { () ⇒
|
||||||
implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterExiting))
|
if (cluster.isTerminated || cluster.selfMember.status == MemberStatus.Down) {
|
||||||
self.ask(SelfExiting).mapTo[Done]
|
Future.successful(Done)
|
||||||
|
} else {
|
||||||
|
implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterExiting))
|
||||||
|
self.ask(SelfExiting).mapTo[Done]
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
override def postStop(): Unit = cluster.unsubscribe(self)
|
override def postStop(): Unit = cluster.unsubscribe(self)
|
||||||
|
|
@ -468,11 +473,19 @@ class ClusterSingletonManager(
|
||||||
// for CoordinatedShutdown
|
// for CoordinatedShutdown
|
||||||
val coordShutdown = CoordinatedShutdown(context.system)
|
val coordShutdown = CoordinatedShutdown(context.system)
|
||||||
val memberExitingProgress = Promise[Done]()
|
val memberExitingProgress = Promise[Done]()
|
||||||
coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "wait-singleton-exiting")(() ⇒
|
coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "wait-singleton-exiting") { () ⇒
|
||||||
memberExitingProgress.future)
|
if (cluster.isTerminated || cluster.selfMember.status == MemberStatus.Down)
|
||||||
|
Future.successful(Done)
|
||||||
|
else
|
||||||
|
memberExitingProgress.future
|
||||||
|
}
|
||||||
coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "singleton-exiting-2") { () ⇒
|
coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "singleton-exiting-2") { () ⇒
|
||||||
implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterExiting))
|
if (cluster.isTerminated || cluster.selfMember.status == MemberStatus.Down) {
|
||||||
self.ask(SelfExiting).mapTo[Done]
|
Future.successful(Done)
|
||||||
|
} else {
|
||||||
|
implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterExiting))
|
||||||
|
self.ask(SelfExiting).mapTo[Done]
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def logInfo(message: String): Unit =
|
def logInfo(message: String): Unit =
|
||||||
|
|
|
||||||
|
|
@ -176,7 +176,7 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
|
||||||
coordShutdown.addTask(CoordinatedShutdown.PhaseClusterLeave, "leave") {
|
coordShutdown.addTask(CoordinatedShutdown.PhaseClusterLeave, "leave") {
|
||||||
val sys = context.system
|
val sys = context.system
|
||||||
() ⇒
|
() ⇒
|
||||||
if (Cluster(sys).isTerminated)
|
if (Cluster(sys).isTerminated || Cluster(sys).selfMember.status == Down)
|
||||||
Future.successful(Done)
|
Future.successful(Done)
|
||||||
else {
|
else {
|
||||||
implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterLeave))
|
implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterLeave))
|
||||||
|
|
@ -190,8 +190,8 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
|
||||||
override def postStop(): Unit = {
|
override def postStop(): Unit = {
|
||||||
clusterShutdown.trySuccess(Done)
|
clusterShutdown.trySuccess(Done)
|
||||||
if (Cluster(context.system).settings.RunCoordinatedShutdownWhenDown) {
|
if (Cluster(context.system).settings.RunCoordinatedShutdownWhenDown) {
|
||||||
// run the last phases e.g. if node was downed (not leaving)
|
// if it was stopped due to leaving CoordinatedShutdown was started earlier
|
||||||
coordShutdown.run(Some(CoordinatedShutdown.PhaseClusterShutdown))
|
coordShutdown.run(CoordinatedShutdown.ClusterDowningReason)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -325,7 +325,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
||||||
coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExitingDone, "exiting-completed") {
|
coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExitingDone, "exiting-completed") {
|
||||||
val sys = context.system
|
val sys = context.system
|
||||||
() ⇒
|
() ⇒
|
||||||
if (Cluster(sys).isTerminated)
|
if (Cluster(sys).isTerminated || Cluster(sys).selfMember.status == Down)
|
||||||
Future.successful(Done)
|
Future.successful(Done)
|
||||||
else {
|
else {
|
||||||
implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterExitingDone))
|
implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterExitingDone))
|
||||||
|
|
@ -443,7 +443,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
||||||
"shutdown-after-unsuccessful-join-seed-nodes [{}]. Running CoordinatedShutdown.",
|
"shutdown-after-unsuccessful-join-seed-nodes [{}]. Running CoordinatedShutdown.",
|
||||||
seedNodes.mkString(", "), ShutdownAfterUnsuccessfulJoinSeedNodes)
|
seedNodes.mkString(", "), ShutdownAfterUnsuccessfulJoinSeedNodes)
|
||||||
joinSeedNodesDeadline = None
|
joinSeedNodesDeadline = None
|
||||||
CoordinatedShutdown(context.system).run()
|
CoordinatedShutdown(context.system).run(CoordinatedShutdown.ClusterDowningReason)
|
||||||
}
|
}
|
||||||
|
|
||||||
def becomeUninitialized(): Unit = {
|
def becomeUninitialized(): Unit = {
|
||||||
|
|
@ -922,7 +922,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
||||||
exitingTasksInProgress = true
|
exitingTasksInProgress = true
|
||||||
logInfo("Exiting, starting coordinated shutdown")
|
logInfo("Exiting, starting coordinated shutdown")
|
||||||
selfExiting.trySuccess(Done)
|
selfExiting.trySuccess(Done)
|
||||||
coordShutdown.run()
|
coordShutdown.run(CoordinatedShutdown.ClusterLeavingReason)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (talkback) {
|
if (talkback) {
|
||||||
|
|
@ -1105,7 +1105,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
||||||
exitingTasksInProgress = true
|
exitingTasksInProgress = true
|
||||||
logInfo("Exiting (leader), starting coordinated shutdown")
|
logInfo("Exiting (leader), starting coordinated shutdown")
|
||||||
selfExiting.trySuccess(Done)
|
selfExiting.trySuccess(Done)
|
||||||
coordShutdown.run()
|
coordShutdown.run(CoordinatedShutdown.ClusterLeavingReason)
|
||||||
}
|
}
|
||||||
|
|
||||||
exitingConfirmed = exitingConfirmed.filterNot(removedExitingConfirmed)
|
exitingConfirmed = exitingConfirmed.filterNot(removedExitingConfirmed)
|
||||||
|
|
|
||||||
|
|
@ -158,7 +158,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
|
||||||
Cluster(sys2).join(Cluster(sys2).selfAddress)
|
Cluster(sys2).join(Cluster(sys2).selfAddress)
|
||||||
probe.expectMsgType[MemberUp]
|
probe.expectMsgType[MemberUp]
|
||||||
|
|
||||||
CoordinatedShutdown(sys2).run()
|
CoordinatedShutdown(sys2).run(CoordinatedShutdown.UnknownReason)
|
||||||
probe.expectMsgType[MemberLeft]
|
probe.expectMsgType[MemberLeft]
|
||||||
probe.expectMsgType[MemberExited]
|
probe.expectMsgType[MemberExited]
|
||||||
probe.expectMsgType[MemberRemoved]
|
probe.expectMsgType[MemberRemoved]
|
||||||
|
|
@ -187,6 +187,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
|
||||||
probe.expectMsgType[MemberRemoved]
|
probe.expectMsgType[MemberRemoved]
|
||||||
Await.result(sys2.whenTerminated, 10.seconds)
|
Await.result(sys2.whenTerminated, 10.seconds)
|
||||||
Cluster(sys2).isTerminated should ===(true)
|
Cluster(sys2).isTerminated should ===(true)
|
||||||
|
CoordinatedShutdown(sys2).shutdownReason() should ===(Some(CoordinatedShutdown.ClusterLeavingReason))
|
||||||
} finally {
|
} finally {
|
||||||
shutdown(sys2)
|
shutdown(sys2)
|
||||||
}
|
}
|
||||||
|
|
@ -212,6 +213,7 @@ akka.loglevel=DEBUG
|
||||||
probe.expectMsgType[MemberRemoved]
|
probe.expectMsgType[MemberRemoved]
|
||||||
Await.result(sys3.whenTerminated, 10.seconds)
|
Await.result(sys3.whenTerminated, 10.seconds)
|
||||||
Cluster(sys3).isTerminated should ===(true)
|
Cluster(sys3).isTerminated should ===(true)
|
||||||
|
CoordinatedShutdown(sys3).shutdownReason() should ===(Some(CoordinatedShutdown.ClusterDowningReason))
|
||||||
} finally {
|
} finally {
|
||||||
shutdown(sys3)
|
shutdown(sys3)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -832,7 +832,8 @@ public class ActorDocTest extends AbstractJavaTest {
|
||||||
// don't run this
|
// don't run this
|
||||||
if (false) {
|
if (false) {
|
||||||
//#coordinated-shutdown-run
|
//#coordinated-shutdown-run
|
||||||
CompletionStage<Done> done = CoordinatedShutdown.get(system).runAll();
|
CompletionStage<Done> done = CoordinatedShutdown.get(system).runAll(
|
||||||
|
CoordinatedShutdown.unknownReason());
|
||||||
//#coordinated-shutdown-run
|
//#coordinated-shutdown-run
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -756,7 +756,7 @@ class ActorDocSpec extends AkkaSpec("""
|
||||||
// don't run this
|
// don't run this
|
||||||
def dummy(): Unit = {
|
def dummy(): Unit = {
|
||||||
//#coordinated-shutdown-run
|
//#coordinated-shutdown-run
|
||||||
val done: Future[Done] = CoordinatedShutdown(system).run()
|
val done: Future[Done] = CoordinatedShutdown(system).run(CoordinatedShutdown.UnknownReason)
|
||||||
//#coordinated-shutdown-run
|
//#coordinated-shutdown-run
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -407,7 +407,9 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
|
||||||
override def settings: ArterySettings = provider.remoteSettings.Artery
|
override def settings: ArterySettings = provider.remoteSettings.Artery
|
||||||
|
|
||||||
override def start(): Unit = {
|
override def start(): Unit = {
|
||||||
Runtime.getRuntime.addShutdownHook(shutdownHook)
|
if (system.settings.JvmShutdownHooks)
|
||||||
|
Runtime.getRuntime.addShutdownHook(shutdownHook)
|
||||||
|
|
||||||
startMediaDriver()
|
startMediaDriver()
|
||||||
startAeron()
|
startAeron()
|
||||||
topLevelFREvents.loFreq(Transport_AeronStarted, NoMetaData)
|
topLevelFREvents.loFreq(Transport_AeronStarted, NoMetaData)
|
||||||
|
|
@ -877,7 +879,8 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
|
||||||
override def shutdown(): Future[Done] = {
|
override def shutdown(): Future[Done] = {
|
||||||
if (hasBeenShutdown.compareAndSet(false, true)) {
|
if (hasBeenShutdown.compareAndSet(false, true)) {
|
||||||
log.debug("Shutting down [{}]", localAddress)
|
log.debug("Shutting down [{}]", localAddress)
|
||||||
Runtime.getRuntime.removeShutdownHook(shutdownHook)
|
if (system.settings.JvmShutdownHooks)
|
||||||
|
Runtime.getRuntime.removeShutdownHook(shutdownHook)
|
||||||
val allAssociations = associationRegistry.allAssociations
|
val allAssociations = associationRegistry.allAssociations
|
||||||
val flushing: Future[Done] =
|
val flushing: Future[Done] =
|
||||||
if (allAssociations.isEmpty) Future.successful(Done)
|
if (allAssociations.isEmpty) Future.successful(Done)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue