add Reason to CoordinatedShutdown, #24048
This commit is contained in:
parent
fa3da328be
commit
e49acb7daa
6 changed files with 116 additions and 28 deletions
|
|
@ -12,6 +12,7 @@ import akka.Done
|
|||
import akka.testkit.{ AkkaSpec, TestKit }
|
||||
import com.typesafe.config.{ Config, ConfigFactory }
|
||||
import akka.actor.CoordinatedShutdown.Phase
|
||||
import akka.actor.CoordinatedShutdown.UnknownReason
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.concurrent.Promise
|
||||
|
|
@ -42,6 +43,8 @@ class CoordinatedShutdownSpec extends AkkaSpec {
|
|||
result
|
||||
}
|
||||
|
||||
case object CustomReason extends CoordinatedShutdown.Reason
|
||||
|
||||
"CoordinatedShutdown" must {
|
||||
|
||||
"sort phases in topolgical order" in {
|
||||
|
|
@ -151,7 +154,7 @@ class CoordinatedShutdownSpec extends AkkaSpec {
|
|||
testActor ! "C"
|
||||
Future.successful(Done)
|
||||
}
|
||||
Await.result(co.run(), remainingOrDefault)
|
||||
Await.result(co.run(UnknownReason), remainingOrDefault)
|
||||
receiveN(4) should ===(List("A", "B", "B", "C"))
|
||||
}
|
||||
|
||||
|
|
@ -174,8 +177,9 @@ class CoordinatedShutdownSpec extends AkkaSpec {
|
|||
testActor ! "C"
|
||||
Future.successful(Done)
|
||||
}
|
||||
Await.result(co.run(Some("b")), remainingOrDefault)
|
||||
Await.result(co.run(CustomReason, Some("b")), remainingOrDefault)
|
||||
receiveN(2) should ===(List("B", "C"))
|
||||
co.shutdownReason() should ===(Some(CustomReason))
|
||||
}
|
||||
|
||||
"only run once" in {
|
||||
|
|
@ -186,11 +190,14 @@ class CoordinatedShutdownSpec extends AkkaSpec {
|
|||
testActor ! "A"
|
||||
Future.successful(Done)
|
||||
}
|
||||
Await.result(co.run(), remainingOrDefault)
|
||||
co.shutdownReason() should ===(None)
|
||||
Await.result(co.run(CustomReason), remainingOrDefault)
|
||||
co.shutdownReason() should ===(Some(CustomReason))
|
||||
expectMsg("A")
|
||||
Await.result(co.run(), remainingOrDefault)
|
||||
Await.result(co.run(UnknownReason), remainingOrDefault)
|
||||
testActor ! "done"
|
||||
expectMsg("done") // no additional A
|
||||
co.shutdownReason() should ===(Some(CustomReason))
|
||||
}
|
||||
|
||||
"continue after timeout or failure" in {
|
||||
|
|
@ -220,7 +227,7 @@ class CoordinatedShutdownSpec extends AkkaSpec {
|
|||
testActor ! "C"
|
||||
Future.successful(Done)
|
||||
}
|
||||
Await.result(co.run(), remainingOrDefault)
|
||||
Await.result(co.run(UnknownReason), remainingOrDefault)
|
||||
expectMsg("A")
|
||||
expectMsg("A")
|
||||
expectMsg("B")
|
||||
|
|
@ -241,7 +248,7 @@ class CoordinatedShutdownSpec extends AkkaSpec {
|
|||
testActor ! "C"
|
||||
Future.successful(Done)
|
||||
}
|
||||
val result = co.run()
|
||||
val result = co.run(UnknownReason)
|
||||
expectMsg("B")
|
||||
intercept[TimeoutException] {
|
||||
Await.result(result, remainingOrDefault)
|
||||
|
|
@ -263,7 +270,7 @@ class CoordinatedShutdownSpec extends AkkaSpec {
|
|||
}
|
||||
Future.successful(Done)
|
||||
}
|
||||
Await.result(co.run(), remainingOrDefault)
|
||||
Await.result(co.run(UnknownReason), remainingOrDefault)
|
||||
expectMsg("A")
|
||||
expectMsg("B")
|
||||
}
|
||||
|
|
@ -291,8 +298,9 @@ class CoordinatedShutdownSpec extends AkkaSpec {
|
|||
|
||||
// this must be the last test, since it terminates the ActorSystem
|
||||
"terminate ActorSystem" in {
|
||||
Await.result(CoordinatedShutdown(system).run(), 10.seconds) should ===(Done)
|
||||
Await.result(CoordinatedShutdown(system).run(CustomReason), 10.seconds) should ===(Done)
|
||||
system.whenTerminated.isCompleted should ===(true)
|
||||
CoordinatedShutdown(system).shutdownReason() === (Some(CustomReason))
|
||||
}
|
||||
|
||||
"add and remove user JVM hooks with run-by-jvm-shutdown-hook = off, terminate-actor-system = off" in new JvmHookTest {
|
||||
|
|
|
|||
|
|
@ -7,7 +7,6 @@ import scala.concurrent.duration._
|
|||
import scala.compat.java8.FutureConverters._
|
||||
import scala.compat.java8.OptionConverters._
|
||||
import java.util.concurrent._
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import java.util.concurrent.TimeUnit.MILLISECONDS
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
|
@ -99,6 +98,54 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi
|
|||
*/
|
||||
val PhaseActorSystemTerminate = "actor-system-terminate"
|
||||
|
||||
/**
|
||||
* Reason for the shutdown, which can be used by tasks in case they need to do
|
||||
* different things depending on what caused the shutdown. There are some
|
||||
* predefined reasons, but external libraries applications may also define
|
||||
* other reasons.
|
||||
*/
|
||||
trait Reason
|
||||
|
||||
/**
|
||||
* Scala API: The reason for the shutdown was unknown. Needed for backwards compatibility.
|
||||
*/
|
||||
case object UnknownReason extends Reason
|
||||
|
||||
/**
|
||||
* Java API: The reason for the shutdown was unknown. Needed for backwards compatibility.
|
||||
*/
|
||||
def unknownReason: Reason = UnknownReason
|
||||
|
||||
/**
|
||||
* Scala API: The shutdown was initiated by a JVM shutdown hook, e.g. triggered by SIGTERM.
|
||||
*/
|
||||
object JvmExitReason extends Reason
|
||||
|
||||
/**
|
||||
* Java API: The shutdown was initiated by a JVM shutdown hook, e.g. triggered by SIGTERM.
|
||||
*/
|
||||
def jvmExitReason: Reason = JvmExitReason
|
||||
|
||||
/**
|
||||
* Scala API: The shutdown was initiated by Cluster downing.
|
||||
*/
|
||||
object ClusterDowningReason extends Reason
|
||||
|
||||
/**
|
||||
* Java API: The shutdown was initiated by Cluster downing.
|
||||
*/
|
||||
def clusterDowningReason: Reason = ClusterDowningReason
|
||||
|
||||
/**
|
||||
* Scala API: The shutdown was initiated by Cluster leaving.
|
||||
*/
|
||||
object ClusterLeavingReason extends Reason
|
||||
|
||||
/**
|
||||
* Java API: The shutdown was initiated by Cluster leaving.
|
||||
*/
|
||||
def clusterLeavingReason: Reason = ClusterLeavingReason
|
||||
|
||||
@volatile private var runningJvmHook = false
|
||||
|
||||
override def get(system: ActorSystem): CoordinatedShutdown = super.get(system)
|
||||
|
|
@ -168,7 +215,7 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi
|
|||
try {
|
||||
// totalTimeout will be 0 when no tasks registered, so at least 3.seconds
|
||||
val totalTimeout = coord.totalTimeout().max(3.seconds)
|
||||
Await.ready(coord.run(), totalTimeout)
|
||||
Await.ready(coord.run(JvmExitReason), totalTimeout)
|
||||
} catch {
|
||||
case NonFatal(e) ⇒
|
||||
coord.log.warning(
|
||||
|
|
@ -246,6 +293,9 @@ final class CoordinatedShutdown private[akka] (
|
|||
system: ExtendedActorSystem,
|
||||
phases: Map[String, CoordinatedShutdown.Phase]) extends Extension {
|
||||
import CoordinatedShutdown.Phase
|
||||
import CoordinatedShutdown.Reason
|
||||
import CoordinatedShutdown.UnknownReason
|
||||
import CoordinatedShutdown.JvmExitReason
|
||||
|
||||
/** INTERNAL API */
|
||||
private[akka] val log = Logging(system, getClass)
|
||||
|
|
@ -253,7 +303,7 @@ final class CoordinatedShutdown private[akka] (
|
|||
/** INTERNAL API */
|
||||
private[akka] val orderedPhases = CoordinatedShutdown.topologicalSort(phases)
|
||||
private val tasks = new ConcurrentHashMap[String, Vector[(String, () ⇒ Future[Done])]]
|
||||
private val runStarted = new AtomicBoolean(false)
|
||||
private val runStarted = new AtomicReference[Option[Reason]](None)
|
||||
private val runPromise = Promise[Done]()
|
||||
|
||||
private var _jvmHooksLatch = new AtomicReference[CountDownLatch](new CountDownLatch(0))
|
||||
|
|
@ -306,33 +356,51 @@ final class CoordinatedShutdown private[akka] (
|
|||
def addTask(phase: String, taskName: String, task: Supplier[CompletionStage[Done]]): Unit =
|
||||
addTask(phase, taskName)(() ⇒ task.get().toScala)
|
||||
|
||||
/**
|
||||
* The `Reason` for the shutdown as passed to the `run` method. `None` if the shutdown
|
||||
* has not been started.
|
||||
*/
|
||||
def shutdownReason(): Option[Reason] = runStarted.get()
|
||||
|
||||
/**
|
||||
* The `Reason` for the shutdown as passed to the `run` method. `Optional.empty` if the shutdown
|
||||
* has not been started.
|
||||
*/
|
||||
def getShutdownReason(): Optional[Reason] = shutdownReason().asJava
|
||||
|
||||
/**
|
||||
* Scala API: Run tasks of all phases. The returned
|
||||
* `Future` is completed when all tasks have been completed,
|
||||
* or there is a failure when recovery is disabled.
|
||||
*
|
||||
* It's safe to call this method multiple times. It will only run the once.
|
||||
* It's safe to call this method multiple times. It will only run the shutdown sequence once.
|
||||
*/
|
||||
def run(): Future[Done] = run(None)
|
||||
def run(reason: Reason): Future[Done] = run(reason, None)
|
||||
|
||||
@deprecated("Use the method with `reason` parameter instead", since = "2.5.8")
|
||||
def run(): Future[Done] = run(UnknownReason)
|
||||
|
||||
/**
|
||||
* Java API: Run tasks of all phases. The returned
|
||||
* `CompletionStage` is completed when all tasks have been completed,
|
||||
* or there is a failure when recovery is disabled.
|
||||
*
|
||||
* It's safe to call this method multiple times. It will only run the once.
|
||||
* It's safe to call this method multiple times. It will only run the shutdown sequence once.
|
||||
*/
|
||||
def runAll(): CompletionStage[Done] = run().toJava
|
||||
def runAll(reason: Reason): CompletionStage[Done] = run(reason).toJava
|
||||
|
||||
@deprecated("Use the method with `reason` parameter instead", since = "2.5.8")
|
||||
def runAll(): CompletionStage[Done] = runAll(UnknownReason)
|
||||
|
||||
/**
|
||||
* Scala API: Run tasks of all phases including and after the given phase.
|
||||
* The returned `Future` is completed when all such tasks have been completed,
|
||||
* or there is a failure when recovery is disabled.
|
||||
*
|
||||
* It's safe to call this method multiple times. It will only run the once.
|
||||
* It's safe to call this method multiple times. It will only run shutdown sequence once.
|
||||
*/
|
||||
def run(fromPhase: Option[String]): Future[Done] = {
|
||||
if (runStarted.compareAndSet(false, true)) {
|
||||
def run(reason: Reason, fromPhase: Option[String]): Future[Done] = {
|
||||
if (runStarted.compareAndSet(None, Some(reason))) {
|
||||
import system.dispatcher
|
||||
val debugEnabled = log.isDebugEnabled
|
||||
def loop(remainingPhases: List[String]): Future[Done] = {
|
||||
|
|
@ -409,15 +477,23 @@ final class CoordinatedShutdown private[akka] (
|
|||
runPromise.future
|
||||
}
|
||||
|
||||
@deprecated("Use the method with `reason` parameter instead", since = "2.5.8")
|
||||
def run(fromPhase: Option[String]): Future[Done] =
|
||||
run(UnknownReason, fromPhase)
|
||||
|
||||
/**
|
||||
* Java API: Run tasks of all phases including and after the given phase.
|
||||
* The returned `CompletionStage` is completed when all such tasks have been completed,
|
||||
* or there is a failure when recovery is disabled.
|
||||
*
|
||||
* It's safe to call this method multiple times. It will only run once.
|
||||
* It's safe to call this method multiple times. It will only run the shutdown sequence once.
|
||||
*/
|
||||
def run(reason: Reason, fromPhase: Optional[String]): CompletionStage[Done] =
|
||||
run(reason, fromPhase.asScala).toJava
|
||||
|
||||
@deprecated("Use the method with `reason` parameter instead", since = "2.5.8")
|
||||
def run(fromPhase: Optional[String]): CompletionStage[Done] =
|
||||
run(fromPhase.asScala).toJava
|
||||
run(UnknownReason, fromPhase)
|
||||
|
||||
/**
|
||||
* The configured timeout for a given `phase`.
|
||||
|
|
@ -462,7 +538,7 @@ final class CoordinatedShutdown private[akka] (
|
|||
* shutdown hooks the standard library JVM shutdown hooks APIs are better suited.
|
||||
*/
|
||||
@tailrec def addCancellableJvmShutdownHook[T](hook: ⇒ T): Cancellable = {
|
||||
if (!runStarted.get) {
|
||||
if (runStarted.get == None) {
|
||||
val currentLatch = _jvmHooksLatch.get
|
||||
val newLatch = new CountDownLatch(currentLatch.getCount.toInt + 1)
|
||||
if (_jvmHooksLatch.compareAndSet(currentLatch, newLatch)) {
|
||||
|
|
|
|||
|
|
@ -190,7 +190,8 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
|
|||
override def postStop(): Unit = {
|
||||
clusterShutdown.trySuccess(Done)
|
||||
if (Cluster(context.system).settings.RunCoordinatedShutdownWhenDown) {
|
||||
coordShutdown.run()
|
||||
// if it was stopped due to leaving CoordinatedShutdown was started earlier
|
||||
coordShutdown.run(CoordinatedShutdown.ClusterDowningReason)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -442,7 +443,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
"shutdown-after-unsuccessful-join-seed-nodes [{}]. Running CoordinatedShutdown.",
|
||||
seedNodes.mkString(", "), ShutdownAfterUnsuccessfulJoinSeedNodes)
|
||||
joinSeedNodesDeadline = None
|
||||
CoordinatedShutdown(context.system).run()
|
||||
CoordinatedShutdown(context.system).run(CoordinatedShutdown.ClusterDowningReason)
|
||||
}
|
||||
|
||||
def becomeUninitialized(): Unit = {
|
||||
|
|
@ -921,7 +922,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
exitingTasksInProgress = true
|
||||
logInfo("Exiting, starting coordinated shutdown")
|
||||
selfExiting.trySuccess(Done)
|
||||
coordShutdown.run()
|
||||
coordShutdown.run(CoordinatedShutdown.ClusterLeavingReason)
|
||||
}
|
||||
|
||||
if (talkback) {
|
||||
|
|
@ -1104,7 +1105,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
|
|||
exitingTasksInProgress = true
|
||||
logInfo("Exiting (leader), starting coordinated shutdown")
|
||||
selfExiting.trySuccess(Done)
|
||||
coordShutdown.run()
|
||||
coordShutdown.run(CoordinatedShutdown.ClusterLeavingReason)
|
||||
}
|
||||
|
||||
exitingConfirmed = exitingConfirmed.filterNot(removedExitingConfirmed)
|
||||
|
|
|
|||
|
|
@ -158,7 +158,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
|
|||
Cluster(sys2).join(Cluster(sys2).selfAddress)
|
||||
probe.expectMsgType[MemberUp]
|
||||
|
||||
CoordinatedShutdown(sys2).run()
|
||||
CoordinatedShutdown(sys2).run(CoordinatedShutdown.UnknownReason)
|
||||
probe.expectMsgType[MemberLeft]
|
||||
probe.expectMsgType[MemberExited]
|
||||
probe.expectMsgType[MemberRemoved]
|
||||
|
|
@ -187,6 +187,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
|
|||
probe.expectMsgType[MemberRemoved]
|
||||
Await.result(sys2.whenTerminated, 10.seconds)
|
||||
Cluster(sys2).isTerminated should ===(true)
|
||||
CoordinatedShutdown(sys2).shutdownReason() should ===(Some(CoordinatedShutdown.ClusterLeavingReason))
|
||||
} finally {
|
||||
shutdown(sys2)
|
||||
}
|
||||
|
|
@ -212,6 +213,7 @@ akka.loglevel=DEBUG
|
|||
probe.expectMsgType[MemberRemoved]
|
||||
Await.result(sys3.whenTerminated, 10.seconds)
|
||||
Cluster(sys3).isTerminated should ===(true)
|
||||
CoordinatedShutdown(sys3).shutdownReason() should ===(Some(CoordinatedShutdown.ClusterDowningReason))
|
||||
} finally {
|
||||
shutdown(sys3)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -832,7 +832,8 @@ public class ActorDocTest extends AbstractJavaTest {
|
|||
// don't run this
|
||||
if (false) {
|
||||
//#coordinated-shutdown-run
|
||||
CompletionStage<Done> done = CoordinatedShutdown.get(system).runAll();
|
||||
CompletionStage<Done> done = CoordinatedShutdown.get(system).runAll(
|
||||
CoordinatedShutdown.unknownReason());
|
||||
//#coordinated-shutdown-run
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -756,7 +756,7 @@ class ActorDocSpec extends AkkaSpec("""
|
|||
// don't run this
|
||||
def dummy(): Unit = {
|
||||
//#coordinated-shutdown-run
|
||||
val done: Future[Done] = CoordinatedShutdown(system).run()
|
||||
val done: Future[Done] = CoordinatedShutdown(system).run(CoordinatedShutdown.UnknownReason)
|
||||
//#coordinated-shutdown-run
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue