add CoordinatedShutdown, #21537

* CoordinatedShutdown that can run tasks for configured phases in order (DAG)
* coordinate handover/shutdown of singleton with cluster exiting/shutdown
* phase config obj with depends-on list
* integrate graceful leaving of sharding in coordinated shutdown
* add timeout and recover
* add some missing artery ports to tests
* leave via CoordinatedShutdown.run
* optionally exit-jvm in last phase
* run via jvm shutdown hook
* send ExitingConfirmed to the leader before shutdown of the Exiting node,
  to avoid waiting for the failure detector to mark it as
  unreachable before removing it
* the unreachable signal is still kept as a safeguard in case the
  message is lost or the leader dies
* PhaseClusterExiting vs MemberExited in ClusterSingletonManager
* terminate ActorSystem when cluster shutdown (via Down)
* add more predefined and custom phases
* reference documentation
* migration guide
* fix problem when the leader order was sys2, sys1, sys3:
  sys3 could not perform its duties and move Leaving sys1 to
  Exiting because it was observing sys1 as unreachable
* exclude Leaving members with exitingConfirmed from the convergence condition
Patrik Nordwall 2016-12-01 18:49:38 +01:00
parent 4a9c753710
commit 84ade6fdc3
69 changed files with 1778 additions and 339 deletions
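
For reference, a minimal usage sketch of the extension added in this commit, using only the API shown in the diff below (addTask and run); the phase choice, task name and log message are illustrative:

import akka.Done
import akka.actor.{ ActorSystem, CoordinatedShutdown }
import scala.concurrent.Future

object CoordinatedShutdownUsage extends App {
  val system = ActorSystem("example")

  // register a cleanup task in one of the pre-defined phases; tasks in the
  // same phase run in parallel and the next phase starts when all of them
  // have completed
  CoordinatedShutdown(system).addTask(
    CoordinatedShutdown.PhaseBeforeServiceUnbind, "log-shutdown") { () =>
    system.log.info("coordinated shutdown is starting")
    Future.successful(Done)
  }

  // run the whole phase DAG; it is also run by the JVM shutdown hook
  // and when the cluster member becomes Exiting
  CoordinatedShutdown(system).run()
}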


@ -0,0 +1,296 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.actor
import scala.concurrent.duration._
import scala.concurrent.Await
import scala.concurrent.Future
import akka.Done
import akka.testkit.AkkaSpec
import com.typesafe.config.ConfigFactory
import akka.actor.CoordinatedShutdown.Phase
import scala.concurrent.Promise
import java.util.concurrent.TimeoutException
class CoordinatedShutdownSpec extends AkkaSpec {
def extSys = system.asInstanceOf[ExtendedActorSystem]
// some convenience to make the test readable
def phase(dependsOn: String*): Phase = Phase(dependsOn.toSet, timeout = 10.seconds, recover = true)
val emptyPhase: Phase = Phase(Set.empty, timeout = 10.seconds, recover = true)
private def checkTopologicalSort(phases: Map[String, Phase]): List[String] = {
val result = CoordinatedShutdown.topologicalSort(phases)
result.zipWithIndex.foreach {
case (phase, i)
phases.get(phase) match {
case Some(Phase(dependsOn, _, _))
dependsOn.foreach { depPhase
withClue(s"phase [$phase] depends on [$depPhase] but was ordered before it in topological sort result $result") {
i should be > result.indexOf(depPhase)
}
}
case None // ok
}
}
result
}
"CoordinatedShutdown" must {
"sort phases in topolgical order" in {
checkTopologicalSort(Map.empty) should ===(Nil)
checkTopologicalSort(Map(
"a" emptyPhase)) should ===(List("a"))
checkTopologicalSort(Map(
"b" phase("a"))) should ===(List("a", "b"))
val result1 = checkTopologicalSort(Map(
"c" phase("a"), "b" phase("a")))
result1.head should ===("a")
// b, c can be in any order
result1.toSet should ===(Set("a", "b", "c"))
checkTopologicalSort(Map(
"b" phase("a"), "c" phase("b"))) should ===(List("a", "b", "c"))
checkTopologicalSort(Map(
"b" phase("a"), "c" phase("a", "b"))) should ===(List("a", "b", "c"))
val result2 = checkTopologicalSort(Map(
"c" phase("a", "b")))
result2.last should ===("c")
// a, b can be in any order
result2.toSet should ===(Set("a", "b", "c"))
checkTopologicalSort(Map(
"b" phase("a"), "c" phase("b"), "d" phase("b", "c"),
"e" phase("d"))) should ===(
List("a", "b", "c", "d", "e"))
val result3 = checkTopologicalSort(Map(
"a2" phase("a1"), "a3" phase("a2"),
"b2" phase("b1"), "b3" phase("b2")))
val (a, b) = result3.partition(_.charAt(0) == 'a')
a should ===(List("a1", "a2", "a3"))
b should ===(List("b1", "b2", "b3"))
}
"detect cycles in phases (non-DAG)" in {
intercept[IllegalArgumentException] {
CoordinatedShutdown.topologicalSort(Map(
"a" phase("a")))
}
intercept[IllegalArgumentException] {
CoordinatedShutdown.topologicalSort(Map(
"b" phase("a"), "a" phase("b")))
}
intercept[IllegalArgumentException] {
CoordinatedShutdown.topologicalSort(Map(
"c" phase("a"), "c" phase("b"), "b" phase("c")))
}
intercept[IllegalArgumentException] {
CoordinatedShutdown.topologicalSort(Map(
"d" phase("a"), "d" phase("c"), "c" phase("b"), "b" phase("d")))
}
}
"have pre-defined phases from config" in {
import CoordinatedShutdown._
CoordinatedShutdown(system).orderedPhases should ===(List(
PhaseBeforeServiceUnbind,
PhaseServiceUnbind,
PhaseServiceRequestsDone,
PhaseServiceStop,
PhaseBeforeClusterShutdown,
PhaseClusterShardingShutdownRegion,
PhaseClusterLeave,
PhaseClusterExiting,
PhaseClusterExitingDone,
PhaseClusterShutdown,
PhaseBeforeActorSystemTerminate,
PhaseActorSystemTerminate))
}
"run ordered phases" in {
import system.dispatcher
val phases = Map(
"a" emptyPhase,
"b" phase("a"),
"c" phase("b", "a"))
val co = new CoordinatedShutdown(extSys, phases)
co.addTask("a", "a1") { ()
testActor ! "A"
Future.successful(Done)
}
co.addTask("b", "b1") { ()
testActor ! "B"
Future.successful(Done)
}
co.addTask("b", "b2") { ()
Future {
// to verify that c is not performed before b
Thread.sleep(100)
testActor ! "B"
Done
}
}
co.addTask("c", "c1") { ()
testActor ! "C"
Future.successful(Done)
}
Await.result(co.run(), remainingOrDefault)
receiveN(4) should ===(List("A", "B", "B", "C"))
}
"run from a given phase" in {
import system.dispatcher
val phases = Map(
"a" emptyPhase,
"b" phase("a"),
"c" phase("b", "a"))
val co = new CoordinatedShutdown(extSys, phases)
co.addTask("a", "a1") { ()
testActor ! "A"
Future.successful(Done)
}
co.addTask("b", "b1") { ()
testActor ! "B"
Future.successful(Done)
}
co.addTask("c", "c1") { ()
testActor ! "C"
Future.successful(Done)
}
Await.result(co.run(Some("b")), remainingOrDefault)
receiveN(2) should ===(List("B", "C"))
}
"only run once" in {
import system.dispatcher
val phases = Map("a" emptyPhase)
val co = new CoordinatedShutdown(extSys, phases)
co.addTask("a", "a1") { ()
testActor ! "A"
Future.successful(Done)
}
Await.result(co.run(), remainingOrDefault)
expectMsg("A")
Await.result(co.run(), remainingOrDefault)
testActor ! "done"
expectMsg("done") // no additional A
}
"continue after timeout or failure" in {
import system.dispatcher
val phases = Map(
"a" emptyPhase,
"b" Phase(dependsOn = Set("a"), timeout = 100.millis, recover = true),
"c" phase("b", "a"))
val co = new CoordinatedShutdown(extSys, phases)
co.addTask("a", "a1") { ()
testActor ! "A"
Future.failed(new RuntimeException("boom"))
}
co.addTask("a", "a2") { ()
Future {
// to verify that b is not performed before a also in case of failure
Thread.sleep(100)
testActor ! "A"
Done
}
}
co.addTask("b", "b1") { ()
testActor ! "B"
Promise[Done]().future // never completed
}
co.addTask("c", "c1") { ()
testActor ! "C"
Future.successful(Done)
}
Await.result(co.run(), remainingOrDefault)
expectMsg("A")
expectMsg("A")
expectMsg("B")
expectMsg("C")
}
"abort if recover=off" in {
import system.dispatcher
val phases = Map(
"b" Phase(dependsOn = Set("a"), timeout = 100.millis, recover = false),
"c" phase("b", "a"))
val co = new CoordinatedShutdown(extSys, phases)
co.addTask("b", "b1") { ()
testActor ! "B"
Promise[Done]().future // never completed
}
co.addTask("c", "c1") { ()
testActor ! "C"
Future.successful(Done)
}
val result = co.run()
expectMsg("B")
intercept[TimeoutException] {
Await.result(result, remainingOrDefault)
}
expectNoMsg(200.millis) // C not run
}
"be possible to add tasks in later phase from task in earlier phase" in {
import system.dispatcher
val phases = Map(
"a" emptyPhase,
"b" phase("a"))
val co = new CoordinatedShutdown(extSys, phases)
co.addTask("a", "a1") { ()
testActor ! "A"
co.addTask("b", "b1") { ()
testActor ! "B"
Future.successful(Done)
}
Future.successful(Done)
}
Await.result(co.run(), remainingOrDefault)
expectMsg("A")
expectMsg("B")
}
"parse phases from config" in {
CoordinatedShutdown.phasesFromConfig(ConfigFactory.parseString("""
default-phase-timeout = 10s
phases {
a = {}
b {
depends-on = [a]
timeout = 15s
}
c {
depends-on = [a, b]
recover = off
}
}
""")) should ===(Map(
"a" Phase(dependsOn = Set.empty, timeout = 10.seconds, recover = true),
"b" Phase(dependsOn = Set("a"), timeout = 15.seconds, recover = true),
"c" Phase(dependsOn = Set("a", "b"), timeout = 10.seconds, recover = false)))
}
// this must be the last test, since it terminates the ActorSystem
"terminate ActorSystem" in {
Await.result(CoordinatedShutdown(system).run(), 10.seconds) should ===(Done)
system.whenTerminated.isCompleted should ===(true)
}
}
}


@ -907,4 +907,115 @@ akka {
}
# CoordinatedShutdown is an extension that will perform registered
# tasks in the order that is defined by the phases. It is started
# by calling CoordinatedShutdown(system).run(). This can be triggered
# by different things, for example:
# - JVM shutdown hook will by default run CoordinatedShutdown
# - Cluster node will automatically run CoordinatedShutdown when it
# sees itself as Exiting
# - A management console or other application specific command can
# run CoordinatedShutdown
coordinated-shutdown {
# The timeout that will be used for a phase if not specified with
# 'timeout' in the phase
default-phase-timeout = 5 s
# Terminate the ActorSystem in the last phase actor-system-terminate.
terminate-actor-system = on
# Exit the JVM (System.exit(0)) in the last phase actor-system-terminate
# if this is set to 'on'. It is done after termination of the
# ActorSystem if terminate-actor-system=on, otherwise it is done
# immediately when the last phase is reached.
exit-jvm = off
# Run the coordinated shutdown when the JVM process exits, e.g.
# via kill SIGTERM signal (SIGINT ctrl-c doesn't work).
run-by-jvm-shutdown-hook = on
#//#coordinated-shutdown-phases
# CoordinatedShutdown will run the tasks that are added to these
# phases. The phases can be ordered as a DAG by defining the
# dependencies between the phases.
# Each phase is defined as a named config section with the
# following optional properties:
# - timeout=15s: Override the default-phase-timeout for this phase.
# - recover=off: If the phase fails, the shutdown is aborted
#   and depending phases will not be executed.
# - depends-on=[]: Run the phase after the given phases
phases {
# The first pre-defined phase that applications can add tasks to.
# Note that more phases can be added in the application's
# configuration by overriding this phase with an additional
# depends-on.
before-service-unbind {
}
# Stop accepting new incoming requests, for example in HTTP.
service-unbind {
depends-on = [before-service-unbind]
}
# Wait for requests that are in progress to be completed.
service-requests-done {
depends-on = [service-unbind]
}
# Final shutdown of service endpoints.
service-stop {
depends-on = [service-requests-done]
}
# Phase for custom application tasks that are to be run
# after service shutdown and before cluster shutdown.
before-cluster-shutdown {
depends-on = [service-stop]
}
# Graceful shutdown of the Cluster Sharding regions.
cluster-sharding-shutdown-region {
timeout = 10 s
depends-on = [before-cluster-shutdown]
}
# Emit the leave command for the node that is shutting down.
cluster-leave {
depends-on = [cluster-sharding-shutdown-region]
}
# Shutdown cluster singletons
cluster-exiting {
timeout = 10 s
depends-on = [cluster-leave]
}
# Wait until exiting has been completed
cluster-exiting-done {
depends-on = [cluster-exiting]
}
# Shutdown the cluster extension
cluster-shutdown {
depends-on = [cluster-exiting-done]
}
# Phase for custom application tasks that are to be run
# after cluster shutdown and before ActorSystem termination.
before-actor-system-terminate {
depends-on = [cluster-shutdown]
}
# Last phase. See terminate-actor-system and exit-jvm above.
# Don't add phases that depend on this phase, because the
# dispatcher and scheduler of the ActorSystem have been shut down.
actor-system-terminate {
timeout = 10 s
depends-on = [before-actor-system-terminate]
}
}
#//#coordinated-shutdown-phases
}
}
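
As a sketch of how an application could extend the configuration above, a custom phase can be declared with a depends-on pointing at an existing phase and then used with addTask; the names my-phase and my-task are hypothetical:

import akka.Done
import akka.actor.{ ActorSystem, CoordinatedShutdown }
import com.typesafe.config.ConfigFactory
import scala.concurrent.Future

object CustomPhaseExample extends App {
  // the custom phase becomes part of the DAG via its depends-on entry
  val config = ConfigFactory.parseString("""
    akka.coordinated-shutdown.phases {
      my-phase {
        depends-on = [before-service-unbind]
        timeout = 10 s
        recover = off
      }
    }
    """).withFallback(ConfigFactory.load())

  val system = ActorSystem("example", config)

  CoordinatedShutdown(system).addTask("my-phase", "my-task") { () =>
    // application specific cleanup goes here
    Future.successful(Done)
  }
}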


@ -0,0 +1,404 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.actor
import scala.concurrent.duration._
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.TimeUnit.MILLISECONDS
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.Promise
import akka.Done
import com.typesafe.config.Config
import scala.concurrent.duration.FiniteDuration
import scala.annotation.tailrec
import com.typesafe.config.ConfigFactory
import akka.pattern.after
import java.util.concurrent.TimeoutException
import scala.util.control.NonFatal
import akka.event.Logging
import akka.dispatch.ExecutionContexts
import java.util.concurrent.Executors
import scala.util.Try
import scala.concurrent.Await
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference
import java.util.function.Supplier
import java.util.concurrent.CompletionStage
import java.util.Optional
object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with ExtensionIdProvider {
val PhaseBeforeServiceUnbind = "before-service-unbind"
val PhaseServiceUnbind = "service-unbind"
val PhaseServiceRequestsDone = "service-requests-done"
val PhaseServiceStop = "service-stop"
val PhaseBeforeClusterShutdown = "before-cluster-shutdown"
val PhaseClusterShardingShutdownRegion = "cluster-sharding-shutdown-region"
val PhaseClusterLeave = "cluster-leave"
val PhaseClusterExiting = "cluster-exiting"
val PhaseClusterExitingDone = "cluster-exiting-done"
val PhaseClusterShutdown = "cluster-shutdown"
val PhaseBeforeActorSystemTerminate = "before-actor-system-terminate"
val PhaseActorSystemTerminate = "actor-system-terminate"
@volatile private var runningJvmHook = false
override def get(system: ActorSystem): CoordinatedShutdown = super.get(system)
override def lookup = CoordinatedShutdown
override def createExtension(system: ExtendedActorSystem): CoordinatedShutdown = {
val conf = system.settings.config.getConfig("akka.coordinated-shutdown")
val phases = phasesFromConfig(conf)
val coord = new CoordinatedShutdown(system, phases)
initPhaseActorSystemTerminate(system, conf, coord)
initJvmHook(system, conf, coord)
coord
}
private def initPhaseActorSystemTerminate(system: ActorSystem, conf: Config, coord: CoordinatedShutdown): Unit = {
val terminateActorSystem = conf.getBoolean("terminate-actor-system")
val exitJvm = conf.getBoolean("exit-jvm")
if (terminateActorSystem || exitJvm) {
coord.addTask(PhaseActorSystemTerminate, "terminate-system") { ()
if (exitJvm && terminateActorSystem) {
// In case ActorSystem shutdown takes longer than the phase timeout,
// exit the JVM forcefully anyway.
// We must spawn a separate thread to not block current thread,
// since that would have blocked the shutdown of the ActorSystem.
val timeout = coord.timeout(PhaseActorSystemTerminate)
val t = new Thread {
override def run(): Unit = {
if (Try(Await.ready(system.whenTerminated, timeout)).isFailure && !runningJvmHook)
System.exit(0)
}
}
t.setName("CoordinatedShutdown-exit")
t.start()
}
if (terminateActorSystem) {
system.terminate().map { _
if (exitJvm && !runningJvmHook) System.exit(0)
Done
}(ExecutionContexts.sameThreadExecutionContext)
} else if (exitJvm) {
System.exit(0)
Future.successful(Done)
} else
Future.successful(Done)
}
}
}
private def initJvmHook(system: ActorSystem, conf: Config, coord: CoordinatedShutdown): Unit = {
val runByJvmShutdownHook = conf.getBoolean("run-by-jvm-shutdown-hook")
if (runByJvmShutdownHook) {
coord.addJvmShutdownHook { ()
runningJvmHook = true // avoid System.exit from PhaseActorSystemTerminate task
if (!system.whenTerminated.isCompleted) {
coord.log.info("Starting coordinated shutdown from JVM shutdown hook")
try
Await.ready(coord.run(), coord.totalTimeout())
catch {
case NonFatal(e)
coord.log.warning(
"CoordinatedShutdown from JVM shutdown failed: {}",
e.getMessage)
}
}
}
}
}
/**
* INTERNAL API
*/
private[akka] final case class Phase(dependsOn: Set[String], timeout: FiniteDuration, recover: Boolean)
/**
* INTERNAL API
*/
private[akka] def phasesFromConfig(conf: Config): Map[String, Phase] = {
import scala.collection.JavaConverters._
val defaultPhaseTimeout = conf.getString("default-phase-timeout")
val phasesConf = conf.getConfig("phases")
val defaultPhaseConfig = ConfigFactory.parseString(s"""
timeout = $defaultPhaseTimeout
recover = true
depends-on = []
""")
phasesConf.root.unwrapped.asScala.toMap.map {
case (k, _: java.util.Map[_, _])
val c = phasesConf.getConfig(k).withFallback(defaultPhaseConfig)
val dependsOn = c.getStringList("depends-on").asScala.toSet
val timeout = c.getDuration("timeout", MILLISECONDS).millis
val recover = c.getBoolean("recover")
k Phase(dependsOn, timeout, recover)
case (k, v)
throw new IllegalArgumentException(s"Expected object value for [$k], got [$v]")
}
}
/**
* INTERNAL API: https://en.wikipedia.org/wiki/Topological_sorting
*/
private[akka] def topologicalSort(phases: Map[String, Phase]): List[String] = {
var result = List.empty[String]
var unmarked = phases.keySet ++ phases.values.flatMap(_.dependsOn) // in case phase is not defined as key
var tempMark = Set.empty[String] // for detecting cycles
while (unmarked.nonEmpty) {
depthFirstSearch(unmarked.head)
}
def depthFirstSearch(u: String): Unit = {
if (tempMark(u))
throw new IllegalArgumentException("Cycle detected in graph of phases. It must be a DAG. " +
s"phase [$u] depends transitively on itself. All dependencies: $phases")
if (unmarked(u)) {
tempMark += u
phases.get(u) match {
case Some(Phase(dependsOn, _, _)) dependsOn.foreach(depthFirstSearch)
case None
}
unmarked -= u // permanent mark
tempMark -= u
result = u :: result
}
}
result.reverse
}
}
final class CoordinatedShutdown private[akka] (
system: ExtendedActorSystem,
phases: Map[String, CoordinatedShutdown.Phase]) extends Extension {
import CoordinatedShutdown.Phase
/** INTERNAL API */
private[akka] val log = Logging(system, getClass)
private val knownPhases = phases.keySet ++ phases.values.flatMap(_.dependsOn)
/** INTERNAL API */
private[akka] val orderedPhases = CoordinatedShutdown.topologicalSort(phases)
private val tasks = new ConcurrentHashMap[String, Vector[(String, () Future[Done])]]
private val runStarted = new AtomicBoolean(false)
private val runPromise = Promise[Done]()
private var _jvmHooksLatch = new AtomicReference[CountDownLatch](new CountDownLatch(0))
/**
* INTERNAL API
*/
private[akka] def jvmHooksLatch: CountDownLatch = _jvmHooksLatch.get
/**
* Scala API: Add a task to a phase. It doesn't remove previously added tasks.
* Tasks added to the same phase are executed in parallel without any
* ordering assumptions. Next phase will not start until all tasks of
* previous phase have been completed.
*
* Tasks should typically be registered as early as possible after system
* startup. When running the coordinated shutdown, tasks that have been registered
* will be performed, but tasks that are added too late will not be run.
* It is possible for a task in an earlier phase to add a task to a later
* phase, and it will be performed.
*/
@tailrec def addTask(phase: String, taskName: String)(task: () Future[Done]): Unit = {
require(
knownPhases(phase),
s"Unknown phase [$phase], known phases [$knownPhases]. " +
"All phases (along with their optional dependencies) must be defined in configuration")
val current = tasks.get(phase)
if (current == null) {
if (tasks.putIfAbsent(phase, Vector(taskName task)) != null)
addTask(phase, taskName)(task) // CAS failed, retry
} else {
if (!tasks.replace(phase, current, current :+ (taskName task)))
addTask(phase, taskName)(task) // CAS failed, retry
}
}
/**
* Java API: Add a task to a phase. It doesn't remove previously added tasks.
* Tasks added to the same phase are executed in parallel without any
* ordering assumptions. Next phase will not start until all tasks of
* previous phase have been completed.
*
* Tasks should typically be registered as early as possible after system
* startup. When running the coordinated shutdown, tasks that have been registered
* will be performed, but tasks that are added too late will not be run.
* It is possible for a task in an earlier phase to add a task to a later
* phase, and it will be performed.
*/
def addTask(phase: String, taskName: String, task: Supplier[CompletionStage[Done]]): Unit =
addTask(phase, taskName)(() task.get().toScala)
/**
* Scala API: Run tasks of all phases. The returned
* `Future` is completed when all tasks have been completed,
* or there is a failure when recovery is disabled.
*
* It's safe to call this method multiple times. It will only run once.
*/
def run(): Future[Done] = run(None)
/**
* Java API: Run tasks of all phases. The returned
* `CompletionStage` is completed when all tasks have been completed,
* or there is a failure when recovery is disabled.
*
* It's safe to call this method multiple times. It will only run once.
*/
def runAll(): CompletionStage[Done] = run().toJava
/**
* Scala API: Run tasks of all phases including and after the given phase.
* The returned `Future` is completed when all such tasks have been completed,
* or there is a failure when recovery is disabled.
*
* It's safe to call this method multiple times. It will only run once.
*/
def run(fromPhase: Option[String]): Future[Done] = {
if (runStarted.compareAndSet(false, true)) {
import system.dispatcher
val debugEnabled = log.isDebugEnabled
def loop(remainingPhases: List[String]): Future[Done] = {
remainingPhases match {
case Nil Future.successful(Done)
case phase :: remaining
val phaseResult = (tasks.get(phase) match {
case null
if (debugEnabled) log.debug("Performing phase [{}] with [0] tasks", phase)
Future.successful(Done)
case tasks
if (debugEnabled) log.debug(
"Performing phase [{}] with [{}] tasks: [{}]",
phase, tasks.size, tasks.map { case (taskName, _) taskName }.mkString(", "))
// note that tasks within same phase are performed in parallel
val recoverEnabled = phases(phase).recover
val result = Future.sequence(tasks.map {
case (taskName, task)
try {
val r = task.apply()
if (recoverEnabled) r.recover {
case NonFatal(e)
log.warning("Task [{}] failed in phase [{}]: {}", taskName, phase, e.getMessage)
Done
}
else r
} catch {
case NonFatal(e)
// in case task.apply throws
if (recoverEnabled) {
log.warning("Task [{}] failed in phase [{}]: {}", taskName, phase, e.getMessage)
Future.successful(Done)
} else
Future.failed(e)
}
}).map(_ Done)(ExecutionContexts.sameThreadExecutionContext)
val timeout = phases(phase).timeout
val deadline = Deadline.now + timeout
val timeoutFut = after(timeout, system.scheduler) {
if (phase == CoordinatedShutdown.PhaseActorSystemTerminate && deadline.hasTimeLeft) {
// too early, i.e. triggered by system termination
result
} else if (result.isCompleted)
Future.successful(Done)
else if (recoverEnabled) {
log.warning("Coordinated shutdown phase [{}] timed out after {}", phase, timeout)
Future.successful(Done)
} else
Future.failed(
new TimeoutException(s"Coordinated shutdown phase [$phase] timed out after $timeout"))
}
Future.firstCompletedOf(List(result, timeoutFut))
})
if (remaining.isEmpty)
phaseResult // avoid flatMap when system terminated in last phase
else
phaseResult.flatMap(_ loop(remaining))
}
}
val remainingPhases = fromPhase match {
case None orderedPhases // all
case Some(p) orderedPhases.dropWhile(_ != p)
}
val done = loop(remainingPhases)
runPromise.completeWith(done)
}
runPromise.future
}
/**
* Java API: Run tasks of all phases including and after the given phase.
* The returned `CompletionStage` is completed when all such tasks have been completed,
* or there is a failure when recovery is disabled.
*
* It's safe to call this method multiple times. It will only run once.
*/
def run(fromPhase: Optional[String]): CompletionStage[Done] =
run(fromPhase.asScala).toJava
/**
* The configured timeout for a given `phase`.
* For example, useful as the timeout when an actor `ask` request
* is used as a task.
*/
def timeout(phase: String): FiniteDuration =
phases.get(phase) match {
case Some(Phase(_, timeout, _)) timeout
case None
throw new IllegalArgumentException(s"Unknown phase [$phase]. All phases must be defined in configuration")
}
/**
* Sum of timeouts of all phases that have some task.
*/
def totalTimeout(): FiniteDuration = {
import scala.collection.JavaConverters._
tasks.keySet.asScala.foldLeft(Duration.Zero) {
case (acc, phase) acc + timeout(phase)
}
}
/**
* Scala API: Add a JVM shutdown hook that will be run when the JVM process
* begins its shutdown sequence. Added hooks may run in any order
* concurrently, but they run before Akka's internal shutdown
* hooks, e.g. those shutting down Artery.
*/
@tailrec def addJvmShutdownHook(hook: () Unit): Unit = {
if (!runStarted.get) {
val currentLatch = _jvmHooksLatch.get
val newLatch = new CountDownLatch(currentLatch.getCount.toInt + 1)
if (_jvmHooksLatch.compareAndSet(currentLatch, newLatch)) {
Runtime.getRuntime.addShutdownHook(new Thread {
override def run(): Unit = {
try hook() finally _jvmHooksLatch.get.countDown()
}
})
} else
addJvmShutdownHook(hook) // lost CAS, retry
}
}
/**
* Java API: Add a JVM shutdown hook that will be run when the JVM process
* begins its shutdown sequence. Added hooks may run in any order
* concurrently, but they run before Akka's internal shutdown
* hooks, e.g. those shutting down Artery.
*/
def addJvmShutdownHook(hook: Runnable): Unit =
addJvmShutdownHook(() hook.run())
}
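
A small sketch of the timeout(phase) accessor in use: the configured phase timeout is reused as the ask timeout inside a task, mirroring what the cluster daemon and singleton manager do elsewhere in this commit; the cleanup actor and its "shutdown" message are hypothetical:

import akka.Done
import akka.actor.{ ActorRef, ActorSystem, CoordinatedShutdown }
import akka.pattern.ask
import akka.util.Timeout

object AskTaskExample {
  def register(system: ActorSystem, cleanupActor: ActorRef): Unit = {
    val coord = CoordinatedShutdown(system)
    coord.addTask(CoordinatedShutdown.PhaseServiceStop, "stop-cleanup-actor") { () =>
      // reuse the configured phase timeout for the ask
      implicit val timeout: Timeout = Timeout(coord.timeout(CoordinatedShutdown.PhaseServiceStop))
      // the actor is expected to reply with Done when it has finished
      (cleanupActor ? "shutdown").mapTo[Done]
    }
  }
}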


@ -5,3 +5,7 @@ akka {
warn-about-java-serializer-usage = off
}
}
akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off
akka.coordinated-shutdown.terminate-actor-system = off
akka.cluster.run-coordinated-shutdown-when-down = off


@ -237,6 +237,7 @@ object ShardCoordinator {
* `ShardRegion` requests full handoff to be able to shutdown gracefully.
*/
@SerialVersionUID(1L) final case class GracefulShutdownReq(shardRegion: ActorRef) extends CoordinatorCommand
with DeadLetterSuppression
// DomainEvents for the persistent state of the event sourced ShardCoordinator
sealed trait DomainEvent extends ClusterShardingSerializable


@ -18,6 +18,8 @@ import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.Future
import scala.reflect.ClassTag
import scala.concurrent.Promise
import akka.Done
/**
* @see [[ClusterSharding$ ClusterSharding extension]]
@ -372,6 +374,15 @@ class ShardRegion(
val retryTask = context.system.scheduler.schedule(retryInterval, retryInterval, self, Retry)
var retryCount = 0
// for CoordinatedShutdown
val gracefulShutdownProgress = Promise[Done]()
CoordinatedShutdown(context.system).addTask(
CoordinatedShutdown.PhaseClusterShardingShutdownRegion,
"region-shutdown") { ()
self ! GracefulShutdown
gracefulShutdownProgress.future
}
// subscribe to MemberEvent, re-subscribe when restart
override def preStart(): Unit = {
cluster.subscribe(self, classOf[MemberEvent])
@ -380,6 +391,7 @@ class ShardRegion(
override def postStop(): Unit = {
super.postStop()
cluster.unsubscribe(self)
gracefulShutdownProgress.trySuccess(Done)
retryTask.cancel()
}
@ -391,6 +403,14 @@ class ShardRegion(
def coordinatorSelection: Option[ActorSelection] =
membersByAge.headOption.map(m context.actorSelection(RootActorPath(m.address) + coordinatorPath))
/**
* When leaving, the coordinator singleton is started rather quickly on the next
* oldest node, and therefore it is good to send the GracefulShutdownReq to
* the likely locations of the coordinator.
*/
def gracefulShutdownCoordinatorSelections: List[ActorSelection] =
membersByAge.take(2).toList.map(m context.actorSelection(RootActorPath(m.address) + coordinatorPath))
var coordinator: Option[ActorRef] = None
def changeMembers(newMembers: immutable.SortedSet[Member]): Unit = {
@ -603,8 +623,9 @@ class ShardRegion(
}
private def tryCompleteGracefulShutdown() =
if (gracefulShutdownInProgress && shards.isEmpty && shardBuffers.isEmpty)
if (gracefulShutdownInProgress && shards.isEmpty && shardBuffers.isEmpty) {
context.stop(self) // all shards have been rebalanced, complete graceful shutdown
}
def register(): Unit = {
coordinatorSelection.foreach(_ ! registrationMessage)
@ -755,7 +776,8 @@ class ShardRegion(
}
}
def sendGracefulShutdownToCoordinator(): Unit =
def sendGracefulShutdownToCoordinator(): Unit = {
if (gracefulShutdownInProgress)
coordinator.foreach(_ ! GracefulShutdownReq(self))
gracefulShutdownCoordinatorSelections.foreach(_ ! GracefulShutdownReq(self))
}
}
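
The region-shutdown task above follows a general pattern: the task sends a message to the actor and returns a Promise that the actor completes, at the latest in postStop, so the phase waits for the actor to stop gracefully instead of running into its timeout. A generic sketch of that pattern (the actor and its "graceful-stop" message are hypothetical, not part of this commit):

import akka.Done
import akka.actor.{ Actor, CoordinatedShutdown }
import scala.concurrent.Promise

class GracefulActor extends Actor {
  private val stopped = Promise[Done]()

  // registered in the constructor, like ShardRegion does above
  CoordinatedShutdown(context.system).addTask(
    CoordinatedShutdown.PhaseBeforeClusterShutdown, "stop-graceful-actor") { () =>
    self ! "graceful-stop"
    stopped.future
  }

  // complete the promise when the actor stops, whatever the reason
  override def postStop(): Unit =
    stopped.trySuccess(Done)

  def receive = {
    case "graceful-stop" => context.stop(self)
  }
}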


@ -38,32 +38,6 @@ object ClusterShardingGracefulShutdownSpec {
case id: Int id.toString
}
//#graceful-shutdown
class IllustrateGracefulShutdown extends Actor {
val system = context.system
val cluster = Cluster(system)
val region = ClusterSharding(system).shardRegion("Entity")
def receive = {
case "leave"
context.watch(region)
region ! ShardRegion.GracefulShutdown
case Terminated(`region`)
cluster.registerOnMemberRemoved(self ! "member-removed")
cluster.leave(cluster.selfAddress)
case "member-removed"
// Let singletons hand over gracefully before stopping the system
import context.dispatcher
system.scheduler.scheduleOnce(10.seconds, self, "stop-system")
case "stop-system"
system.terminate()
}
}
//#graceful-shutdown
}
abstract class ClusterShardingGracefulShutdownSpecConfig(val mode: String) extends MultiNodeConfig {


@ -184,7 +184,11 @@ abstract class ClusterShardingLeavingSpec(config: ClusterShardingLeavingSpecConf
enterBarrier("after-3")
}
"recover after leaving coordinator node" in within(30.seconds) {
"recover after leaving coordinator node" in {
system.actorSelection(node(first) / "user" / "shardLocations") ! GetLocations
val Locations(originalLocations) = expectMsgType[Locations]
val firstAddress = node(first).address
runOn(third) {
cluster.leave(node(first).address)
}
@ -196,18 +200,17 @@ abstract class ClusterShardingLeavingSpec(config: ClusterShardingLeavingSpecConf
enterBarrier("stopped")
runOn(second, third, fourth) {
system.actorSelection(node(first) / "user" / "shardLocations") ! GetLocations
val Locations(locations) = expectMsgType[Locations]
val firstAddress = node(first).address
awaitAssert {
val probe = TestProbe()
locations.foreach {
case (id, ref)
region.tell(Ping(id), probe.ref)
if (ref.path.address == firstAddress)
probe.expectMsgType[ActorRef](1.second) should not be (ref)
else
probe.expectMsg(1.second, ref) // should not move
within(15.seconds) {
awaitAssert {
val probe = TestProbe()
originalLocations.foreach {
case (id, ref)
region.tell(Ping(id), probe.ref)
if (ref.path.address == firstAddress)
probe.expectMsgType[ActorRef](1.second) should not be (ref)
else
probe.expectMsg(1.second, ref) // should not move
}
}
}
}


@ -184,37 +184,6 @@ public class ClusterShardingTest {
//#counter-actor
static//#graceful-shutdown
public class IllustrateGracefulShutdown extends AbstractActor {
public IllustrateGracefulShutdown() {
final ActorSystem system = context().system();
final Cluster cluster = Cluster.get(system);
final ActorRef region = ClusterSharding.get(system).shardRegion("Entity");
receive(ReceiveBuilder.
match(String.class, s -> s.equals("leave"), s -> {
context().watch(region);
region.tell(ShardRegion.gracefulShutdownInstance(), self());
}).
match(Terminated.class, t -> t.actor().equals(region), t -> {
cluster.registerOnMemberRemoved(() ->
self().tell("member-removed", self()));
cluster.leave(cluster.selfAddress());
}).
match(String.class, s -> s.equals("member-removed"), s -> {
// Let singletons hand over gracefully before stopping the system
context().system().scheduler().scheduleOnce(Duration.create(10, SECONDS),
self(), "stop-system", context().dispatcher(), self());
}).
match(String.class, s -> s.equals("stop-system"), s -> {
system.terminate();
}).
build());
}
}
//#graceful-shutdown
static//#supervisor
public class CounterSupervisor extends UntypedActor {


@ -4,4 +4,8 @@ akka {
serialize-messages = on
warn-about-java-serializer-usage = off
}
}
}
akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off
akka.coordinated-shutdown.terminate-actor-system = off
akka.cluster.run-coordinated-shutdown-when-down = off


@ -29,6 +29,7 @@ object RemoveInternalClusterShardingDataSpec {
akka.loglevel = INFO
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.journal.leveldb {
native = off


@ -25,6 +25,11 @@ import akka.AkkaException
import akka.actor.NoSerializationVerificationNeeded
import akka.cluster.UniqueAddress
import akka.cluster.ClusterEvent
import scala.concurrent.Promise
import akka.Done
import akka.actor.CoordinatedShutdown
import akka.pattern.ask
import akka.util.Timeout
object ClusterSingletonManagerSettings {
@ -196,6 +201,7 @@ object ClusterSingletonManager {
final case class StoppingData(singleton: ActorRef) extends Data
case object EndData extends Data
final case class DelayedMemberRemoved(member: Member)
case object SelfExiting
val HandOverRetryTimer = "hand-over-retry"
val TakeOverRetryTimer = "take-over-retry"
@ -236,6 +242,17 @@ object ClusterSingletonManager {
// subscribe to MemberEvent, re-subscribe when restart
override def preStart(): Unit = {
cluster.subscribe(self, classOf[MemberEvent])
// There is a subtle difference between CoordinatedShutdown.PhaseClusterExiting and MemberExited.
// The MemberExited event is published immediately (the leader may have performed that transition on another node),
// and that will trigger the run of CoordinatedShutdown, while PhaseClusterExiting happens later.
// PhaseClusterExiting is used in the singleton because the graceful shutdown of the sharding region
// should preferably complete before stopping the singleton sharding coordinator on the same node.
val coordShutdown = CoordinatedShutdown(context.system)
coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "singleton-exiting-1") { ()
implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterExiting))
self.ask(SelfExiting).mapTo[Done]
}
}
override def postStop(): Unit = cluster.unsubscribe(self)
@ -285,8 +302,12 @@ object ClusterSingletonManager {
def receive = {
case state: CurrentClusterState handleInitial(state)
case MemberUp(m) add(m)
case mEvent: MemberEvent if (mEvent.isInstanceOf[MemberExited] || mEvent.isInstanceOf[MemberRemoved])
remove(mEvent.member)
case MemberRemoved(m, _) remove(m)
case MemberExited(m) if m.uniqueAddress != cluster.selfUniqueAddress
remove(m)
case SelfExiting
remove(cluster.readView.self)
sender() ! Done // reply to ask
case GetNext if changes.isEmpty
context.become(deliverNext, discardOld = false)
case GetNext
@ -301,16 +322,31 @@ object ClusterSingletonManager {
context.unbecome()
case MemberUp(m)
add(m)
if (changes.nonEmpty) {
sendFirstChange()
context.unbecome()
}
case mEvent: MemberEvent if (mEvent.isInstanceOf[MemberExited] || mEvent.isInstanceOf[MemberRemoved])
remove(mEvent.member)
if (changes.nonEmpty) {
sendFirstChange()
context.unbecome()
}
deliverChanges
case MemberRemoved(m, _)
remove(m)
deliverChanges()
case MemberExited(m) if m.uniqueAddress != cluster.selfUniqueAddress
remove(m)
deliverChanges()
case SelfExiting
remove(cluster.readView.self)
deliverChanges()
sender() ! Done // reply to ask
}
def deliverChanges(): Unit = {
if (changes.nonEmpty) {
sendFirstChange()
context.unbecome()
}
}
override def unhandled(msg: Any): Unit = {
msg match {
case _: MemberEvent // ok, silence
case _ super.unhandled(msg)
}
}
}
@ -422,6 +458,16 @@ class ClusterSingletonManager(
removed = removed filter { case (_, deadline) deadline.hasTimeLeft }
}
// for CoordinatedShutdown
val coordShutdown = CoordinatedShutdown(context.system)
val memberExitingProgress = Promise[Done]()
coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "wait-singleton-exiting")(()
memberExitingProgress.future)
coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "singleton-exiting-2") { ()
implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterExiting))
self.ask(SelfExiting).mapTo[Done]
}
def logInfo(message: String): Unit =
if (LogInfo) log.info(message)
@ -436,7 +482,7 @@ class ClusterSingletonManager(
require(!cluster.isTerminated, "Cluster node must not be terminated")
// subscribe to cluster changes, re-subscribe when restart
cluster.subscribe(self, ClusterEvent.InitialStateAsEvents, classOf[MemberExited], classOf[MemberRemoved])
cluster.subscribe(self, ClusterEvent.InitialStateAsEvents, classOf[MemberRemoved])
setTimer(CleanupTimer, Cleanup, 1.minute, repeat = true)
@ -448,6 +494,7 @@ class ClusterSingletonManager(
override def postStop(): Unit = {
cancelTimer(CleanupTimer)
cluster.unsubscribe(self)
memberExitingProgress.trySuccess(Done)
super.postStop()
}
@ -634,11 +681,17 @@ class ClusterSingletonManager(
case Event(Terminated(ref), d @ OldestData(singleton, _)) if ref == singleton
stay using d.copy(singletonTerminated = true)
case Event(SelfExiting, _)
selfMemberExited()
// complete memberExitingProgress when handOverDone
sender() ! Done // reply to ask
stay
}
when(WasOldest) {
case Event(TakeOverRetry(count), WasOldestData(singleton, singletonTerminated, newOldestOption))
if (cluster.isTerminated && (newOldestOption.isEmpty || count > maxTakeOverRetries)) {
if ((cluster.isTerminated || selfExited) && (newOldestOption.isEmpty || count > maxTakeOverRetries)) {
if (singletonTerminated) stop()
else gotoStopping(singleton)
} else if (count <= maxTakeOverRetries) {
@ -663,6 +716,12 @@ class ClusterSingletonManager(
case Event(Terminated(ref), d @ WasOldestData(singleton, _, _)) if ref == singleton
stay using d.copy(singletonTerminated = true)
case Event(SelfExiting, _)
selfMemberExited()
// complete memberExitingProgress when handOverDone
sender() ! Done // reply to ask
stay
}
def gotoHandingOver(singleton: ActorRef, singletonTerminated: Boolean, handOverTo: Option[ActorRef]): State = {
@ -684,12 +743,18 @@ class ClusterSingletonManager(
sender() ! HandOverInProgress
stay
case Event(SelfExiting, _)
selfMemberExited()
// complete memberExitingProgress when handOverDone
sender() ! Done // reply to ask
stay
}
def handOverDone(handOverTo: Option[ActorRef]): State = {
val newOldest = handOverTo.map(_.path.address)
logInfo("Singleton terminated, hand-over done [{} -> {}]", cluster.selfAddress, newOldest)
handOverTo foreach { _ ! HandOverDone }
memberExitingProgress.trySuccess(Done)
if (removed.contains(cluster.selfUniqueAddress)) {
logInfo("Self removed, stopping ClusterSingletonManager")
stop()
@ -715,12 +780,16 @@ class ClusterSingletonManager(
stop()
}
def selfMemberExited(): Unit = {
selfExited = true
logInfo("Exited [{}]", cluster.selfAddress)
}
whenUnhandled {
case Event(MemberExited(m), _)
if (m.uniqueAddress == cluster.selfUniqueAddress) {
selfExited = true
logInfo("Exited [{}]", m.address)
}
case Event(SelfExiting, _)
selfMemberExited()
memberExitingProgress.trySuccess(Done)
sender() ! Done // reply to ask
stay
case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress && !selfExited
logInfo("Self removed, stopping ClusterSingletonManager")


@ -453,8 +453,10 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
val sys2 = ActorSystem(
system.name,
ConfigFactory.parseString(
if (RARP(system).provider.remoteSettings.Artery.Enabled) s"akka.remote.artery.canonical.port=$port"
else s"akka.remote.netty.tcp.port=$port").withFallback(system.settings.config))
s"""
akka.remote.artery.canonical.port=$port
akka.remote.netty.tcp.port=$port
""").withFallback(system.settings.config))
Cluster(sys2).join(Cluster(sys2).selfAddress)
val service2 = sys2.actorOf(Props(classOf[TestService], testActor), "service2")
ClusterClientReceptionist(sys2).registerService(service2)


@ -141,9 +141,10 @@ class DistributedPubSubRestartSpec extends MultiNodeSpec(DistributedPubSubRestar
val newSystem = {
val port = Cluster(system).selfAddress.port.get
val config = ConfigFactory.parseString(
if (RARP(system).provider.remoteSettings.Artery.Enabled) s"akka.remote.artery.canonical.port=$port"
else s"akka.remote.netty.tcp.port=$port"
).withFallback(system.settings.config)
s"""
akka.remote.artery.canonical.port=$port
akka.remote.netty.tcp.port=$port
""").withFallback(system.settings.config)
ActorSystem(system.name, config)
}


@ -130,8 +130,12 @@ class ClusterSingletonManagerLeaveSpec extends MultiNodeSpec(ClusterSingletonMan
}
runOn(first) {
cluster.registerOnMemberRemoved(testActor ! "MemberRemoved")
expectMsg(10.seconds, "stop")
expectMsg("postStop")
// CoordinatedShutdown makes sure that singleton actors are
// stopped before Cluster shutdown
expectMsg("MemberRemoved")
}
enterBarrier("first-stopped")
@ -153,13 +157,12 @@ class ClusterSingletonManagerLeaveSpec extends MultiNodeSpec(ClusterSingletonMan
}
enterBarrier("second-working")
runOn(third) {
cluster.leave(node(second).address)
}
runOn(second) {
cluster.registerOnMemberRemoved(testActor ! "MemberRemoved")
cluster.leave(node(second).address)
expectMsg(15.seconds, "stop")
expectMsg("postStop")
expectMsg("MemberRemoved")
}
enterBarrier("second-stopped")
@ -169,12 +172,11 @@ class ClusterSingletonManagerLeaveSpec extends MultiNodeSpec(ClusterSingletonMan
enterBarrier("third-started")
runOn(third) {
cluster.registerOnMemberRemoved(testActor ! "MemberRemoved")
cluster.leave(node(third).address)
}
runOn(third) {
expectMsg(5.seconds, "stop")
expectMsg("postStop")
expectMsg("MemberRemoved")
}
enterBarrier("third-stopped")


@ -238,19 +238,27 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS
// make sure that the proxy has received membership changes
// and points to the current singleton
val p = TestProbe()
within(5.seconds) {
val oldestAddress = node(oldest).address
within(10.seconds) {
awaitAssert {
system.actorSelection("/user/consumerProxy").tell(Ping, p.ref)
p.expectMsg(1.second, Pong)
val replyFromAddress = p.lastSender.path.address
if (oldest == proxyNode)
replyFromAddress.hasLocalScope should ===(true)
else
replyFromAddress should ===(oldestAddress)
}
}
// then send the real message
system.actorSelection("/user/consumerProxy") ! msg
}
enterBarrier(s"sent-msg-$msg")
// expect a message on the oldest node
runOn(oldest) {
expectMsg(5.seconds, msg)
expectMsg(msg)
}
enterBarrier("after-" + msg + "-proxy-verified")


@ -22,7 +22,8 @@ public class ClusterClientTest extends JUnitSuite {
new AkkaJUnitActorSystemResource("DistributedPubSubMediatorTest",
ConfigFactory.parseString(
"akka.actor.provider = \"cluster\"\n" +
"akka.remote.netty.tcp.port=0"));
"akka.remote.netty.tcp.port=0\n" +
"akka.remote.artery.canonical.port=0"));
private final ActorSystem system = actorSystemResource.getSystem();


@ -26,7 +26,8 @@ public class DistributedPubSubMediatorTest extends JUnitSuite {
new AkkaJUnitActorSystemResource("DistributedPubSubMediatorTest",
ConfigFactory.parseString(
"akka.actor.provider = \"cluster\"\n" +
"akka.remote.netty.tcp.port=0"));
"akka.remote.netty.tcp.port=0\n" +
"akka.remote.artery.canonical.port=0"));
private final ActorSystem system = actorSystemResource.getSystem();


@ -4,4 +4,8 @@ akka {
serialize-messages = on
warn-about-java-serializer-usage = off
}
}
}
akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off
akka.coordinated-shutdown.terminate-actor-system = off
akka.cluster.run-coordinated-shutdown-when-down = off


@ -17,6 +17,7 @@ object DistributedPubSubMediatorRouterSpec {
akka.loglevel = INFO
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port=0
akka.remote.artery.canonical.port=0
akka.remote.log-remote-lifecycle-events = off
akka.cluster.pub-sub.routing-logic = $routingLogic
"""


@ -31,6 +31,7 @@ class ClusterSingletonRestart2Spec extends AkkaSpec("""
akka.loglevel = INFO
akka.cluster.roles = [singleton]
akka.actor.provider = akka.cluster.ClusterActorRefProvider
akka.cluster.auto-down-unreachable-after = 2s
akka.remote {
netty.tcp {
hostname = "127.0.0.1"
@ -59,7 +60,7 @@ class ClusterSingletonRestart2Spec extends AkkaSpec("""
settings = ClusterSingletonManagerSettings(from).withRole("singleton")),
name = "echo")
within(10.seconds) {
within(45.seconds) {
awaitAssert {
Cluster(from) join Cluster(to).selfAddress
Cluster(from).state.members.map(_.uniqueAddress) should contain(Cluster(from).selfUniqueAddress)
@ -98,8 +99,10 @@ class ClusterSingletonRestart2Spec extends AkkaSpec("""
val sys4Config =
ConfigFactory.parseString(
if (RARP(sys1).provider.remoteSettings.Artery.Enabled) s"akka.remote.artery.canonical.port=$sys2port"
else s"akka.remote.netty.tcp.port=$sys2port").withFallback(system.settings.config)
s"""
akka.remote.artery.canonical.port=$sys2port
akka.remote.netty.tcp.port=$sys2port
""").withFallback(system.settings.config)
ActorSystem(system.name, sys4Config)
}


@ -17,6 +17,7 @@ import com.typesafe.config.ConfigFactory
class ClusterSingletonRestartSpec extends AkkaSpec("""
akka.loglevel = INFO
akka.actor.provider = akka.cluster.ClusterActorRefProvider
akka.cluster.auto-down-unreachable-after = 2s
akka.remote {
netty.tcp {
hostname = "127.0.0.1"
@ -73,9 +74,10 @@ class ClusterSingletonRestartSpec extends AkkaSpec("""
val sys3Config =
ConfigFactory.parseString(
if (RARP(sys1).provider.remoteSettings.Artery.Enabled) s"akka.remote.artery.canonical.port=$sys1port"
else s"akka.remote.netty.tcp.port=$sys1port"
).withFallback(system.settings.config)
s"""
akka.remote.artery.canonical.port=$sys1port
akka.remote.netty.tcp.port=$sys1port
""").withFallback(system.settings.config)
ActorSystem(system.name, sys3Config)
}
@ -91,7 +93,7 @@ class ClusterSingletonRestartSpec extends AkkaSpec("""
Cluster(sys2).leave(Cluster(sys2).selfAddress)
within(10.seconds) {
within(15.seconds) {
awaitAssert {
Cluster(sys3).state.members.map(_.uniqueAddress) should ===(Set(Cluster(sys3).selfUniqueAddress))
}


@ -72,6 +72,11 @@ akka {
# routers or other services to distribute work to certain member types,
# e.g. front-end and back-end nodes.
roles = []
# Run the coordinated shutdown from phase 'cluster-shutdown' when the cluster
# is shut down for reasons other than leaving, e.g. when downing. This
# will terminate the ActorSystem when the cluster extension is shut down.
run-coordinated-shutdown-when-down = on
role {
# Minimum required number of members of a certain role before the leader


@ -17,6 +17,11 @@ import scala.collection.breakOut
import akka.remote.QuarantinedEvent
import java.util.ArrayList
import java.util.Collections
import akka.pattern.ask
import akka.util.Timeout
import akka.Done
import scala.concurrent.Future
import scala.concurrent.Promise
/**
* Base trait for all cluster messages. All ClusterMessage's are serializable.
@ -107,6 +112,8 @@ private[cluster] object InternalClusterAction {
@SerialVersionUID(1L)
final case class InitJoinNack(address: Address) extends ClusterMessage with DeadLetterSuppression
final case class ExitingConfirmed(node: UniqueAddress) extends ClusterMessage with DeadLetterSuppression
/**
* Marker interface for periodic tick messages
*/
@ -139,8 +146,10 @@ private[cluster] object InternalClusterAction {
final case class AddOnMemberRemovedListener(callback: Runnable) extends NoSerializationVerificationNeeded
sealed trait SubscriptionMessage
final case class Subscribe(subscriber: ActorRef, initialStateMode: SubscriptionInitialStateMode, to: Set[Class[_]]) extends SubscriptionMessage
final case class Unsubscribe(subscriber: ActorRef, to: Option[Class[_]]) extends SubscriptionMessage
final case class Subscribe(subscriber: ActorRef, initialStateMode: SubscriptionInitialStateMode,
to: Set[Class[_]]) extends SubscriptionMessage
final case class Unsubscribe(subscriber: ActorRef, to: Option[Class[_]])
extends SubscriptionMessage with DeadLetterSuppression
/**
* @param receiver [[akka.cluster.ClusterEvent.CurrentClusterState]] will be sent to the `receiver`
*/
@ -149,6 +158,9 @@ private[cluster] object InternalClusterAction {
sealed trait PublishMessage
final case class PublishChanges(newGossip: Gossip) extends PublishMessage
final case class PublishEvent(event: ClusterDomainEvent) extends PublishMessage
final case object ExitingCompleted
}
/**
@ -165,6 +177,30 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
// Child actors are therefore created when GetClusterCoreRef is received
var coreSupervisor: Option[ActorRef] = None
val clusterShutdown = Promise[Done]()
val coordShutdown = CoordinatedShutdown(context.system)
coordShutdown.addTask(CoordinatedShutdown.PhaseClusterLeave, "leave") {
val sys = context.system
()
if (Cluster(sys).isTerminated)
Future.successful(Done)
else {
implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterLeave))
self.ask(CoordinatedShutdownLeave.LeaveReq).mapTo[Done]
}
}
coordShutdown.addTask(CoordinatedShutdown.PhaseClusterShutdown, "wait-shutdown") { ()
clusterShutdown.future
}
override def postStop(): Unit = {
clusterShutdown.trySuccess(Done)
if (Cluster(context.system).settings.RunCoordinatedShutdownWhenDown) {
// run the last phases e.g. if node was downed (not leaving)
coordShutdown.run(Some(CoordinatedShutdown.PhaseClusterShutdown))
}
}
def createChildren(): Unit = {
coreSupervisor = Some(context.actorOf(Props[ClusterCoreSupervisor].
withDispatcher(context.props.dispatcher), name = "core"))
@ -188,6 +224,10 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
context.actorOf(Props(classOf[ClusterMetricsCollector], publisher).
withDispatcher(context.props.dispatcher), name = "metrics")
}
case CoordinatedShutdownLeave.LeaveReq
val ref = context.actorOf(CoordinatedShutdownLeave.props().withDispatcher(context.props.dispatcher))
// forward the ask request
ref.forward(CoordinatedShutdownLeave.LeaveReq)
}
}
@ -267,6 +307,24 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
var seedNodeProcessCounter = 0 // for unique names
var leaderActionCounter = 0
var exitingTasksInProgress = false
val selfExiting = Promise[Done]()
val coordShutdown = CoordinatedShutdown(context.system)
coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "wait-exiting") { ()
selfExiting.future
}
coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExitingDone, "exiting-completed") {
val sys = context.system
()
if (Cluster(sys).isTerminated)
Future.successful(Done)
else {
implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterExitingDone))
self.ask(ExitingCompleted).mapTo[Done]
}
}
var exitingConfirmed = Set.empty[UniqueAddress]
/**
* Looks up and returns the remote cluster command connection for the specific address.
*/
@ -320,16 +378,17 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
failureDetectorReaperTask.cancel()
leaderActionsTask.cancel()
publishStatsTask foreach { _.cancel() }
selfExiting.trySuccess(Done)
}
def uninitialized: Actor.Receive = {
def uninitialized: Actor.Receive = ({
case InitJoin sender() ! InitJoinNack(selfAddress)
case ClusterUserAction.JoinTo(address) join(address)
case JoinSeedNodes(newSeedNodes) joinSeedNodes(newSeedNodes)
case msg: SubscriptionMessage publisher forward msg
}
}: Actor.Receive).orElse(receiveExitingCompleted)
def tryingToJoin(joinWith: Address, deadline: Option[Deadline]): Actor.Receive = {
def tryingToJoin(joinWith: Address, deadline: Option[Deadline]): Actor.Receive = ({
case Welcome(from, gossip) welcome(joinWith, from, gossip)
case InitJoin sender() ! InitJoinNack(selfAddress)
case ClusterUserAction.JoinTo(address)
@ -346,7 +405,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
if (seedNodes.nonEmpty) joinSeedNodes(seedNodes)
else join(joinWith)
}
}
}: Actor.Receive).orElse(receiveExitingCompleted)
def becomeUninitialized(): Unit = {
// make sure that join process is stopped
@ -364,7 +423,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
context.become(initialized)
}
def initialized: Actor.Receive = {
def initialized: Actor.Receive = ({
case msg: GossipEnvelope receiveGossip(msg)
case msg: GossipStatus receiveGossipStatus(msg)
case GossipTick gossipTick()
@ -385,19 +444,23 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
logInfo(
"Trying to join seed nodes [{}] when already part of a cluster, ignoring",
seedNodes.mkString(", "))
}
case ExitingConfirmed(address) receiveExitingConfirmed(address)
}: Actor.Receive).orElse(receiveExitingCompleted)
def removed: Actor.Receive = {
case msg: SubscriptionMessage publisher forward msg
def receiveExitingCompleted: Actor.Receive = {
case ExitingCompleted
exitingCompleted()
sender() ! Done // reply to ask
}
def receive = uninitialized
override def unhandled(message: Any): Unit = message match {
case _: Tick
case _: GossipEnvelope
case _: GossipStatus
case other super.unhandled(other)
case _: Tick
case _: GossipEnvelope
case _: GossipStatus
case _: ExitingConfirmed
case other super.unhandled(other)
}
def initJoin(): Unit = {
@ -580,6 +643,52 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
}
}
def exitingCompleted() = {
logInfo("Exiting completed")
// ExitingCompleted sent via CoordinatedShutdown to continue the leaving process.
exitingTasksInProgress = false
// mark as seen
latestGossip = latestGossip seen selfUniqueAddress
assertLatestGossip()
publish(latestGossip)
// Let others know (best effort) before shutdown. Otherwise they will not see
// convergence of the Exiting state until they have detected this node as
// unreachable and the required downing has finished. They will still need to detect
// unreachable, but Exiting unreachable will be removed without downing, i.e.
// normally the leaving of a leader will be graceful without the need
// for downing. However, if those final gossip messages never arrive it is
// alright to require the downing, because that is probably caused by a
// network failure anyway.
gossipRandomN(NumberOfGossipsBeforeShutdownWhenLeaderExits)
// send ExitingConfirmed to two potential leaders
val membersWithoutSelf = latestGossip.members.filterNot(_.uniqueAddress == selfUniqueAddress)
latestGossip.leaderOf(membersWithoutSelf, selfUniqueAddress) match {
case Some(node1)
clusterCore(node1.address) ! ExitingConfirmed(selfUniqueAddress)
latestGossip.leaderOf(membersWithoutSelf.filterNot(_.uniqueAddress == node1), selfUniqueAddress) match {
case Some(node2)
clusterCore(node2.address) ! ExitingConfirmed(selfUniqueAddress)
case None // no more potential leader
}
case None // no leader
}
shutdown()
}
def receiveExitingConfirmed(node: UniqueAddress): Unit = {
logInfo("Exiting confirmed [{}]", node.address)
exitingConfirmed += node
}
def cleanupExitingConfirmed(): Unit = {
// in case the actual removal was performed by another leader node
if (exitingConfirmed.nonEmpty)
exitingConfirmed = exitingConfirmed.filter(n latestGossip.members.exists(_.uniqueAddress == n))
}
/**
* This method is called when a member sees itself as Exiting or Down.
*/
@ -694,13 +803,15 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
val (winningGossip, talkback, gossipType) = comparison match {
case VectorClock.Same
// same version
(remoteGossip mergeSeen localGossip, !remoteGossip.seenByNode(selfUniqueAddress), Same)
val talkback = !exitingTasksInProgress && !remoteGossip.seenByNode(selfUniqueAddress)
(remoteGossip mergeSeen localGossip, talkback, Same)
case VectorClock.Before
// local is newer
(localGossip, true, Older)
case VectorClock.After
// remote is newer
(remoteGossip, !remoteGossip.seenByNode(selfUniqueAddress), Newer)
val talkback = !exitingTasksInProgress && !remoteGossip.seenByNode(selfUniqueAddress)
(remoteGossip, talkback, Newer)
case _
// conflicting versions, merge
// We can see that a removal was done when it is not in one of the gossips has status
@ -725,7 +836,13 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
(prunedRemoteGossip merge prunedLocalGossip, true, Merge)
}
latestGossip = winningGossip seen selfUniqueAddress
// Don't mark gossip state as seen while exiting is in progress, e.g.
// shutting down singleton actors. This delays removal of the member until
// the exiting tasks have been completed.
if (exitingTasksInProgress)
latestGossip = winningGossip
else
latestGossip = winningGossip seen selfUniqueAddress
assertLatestGossip()
// for all new joining nodes we remove them from the failure detector
@ -754,9 +871,16 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
publish(latestGossip)
val selfStatus = latestGossip.member(selfUniqueAddress).status
if (selfStatus == Exiting)
shutdown()
else if (talkback) {
if (selfStatus == Exiting && !exitingTasksInProgress) {
// ExitingCompleted will be received via CoordinatedShutdown to continue
// the leaving process. Meanwhile the gossip state is not marked as seen.
exitingTasksInProgress = true
logInfo("Exiting, starting coordinated shutdown")
selfExiting.trySuccess(Done)
coordShutdown.run()
}
if (talkback) {
// send back gossip to sender() when sender() had different view, i.e. merge, or sender() had
// older or sender() had newer
gossipTo(from, sender())
@ -875,7 +999,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
// only run the leader actions if we are the LEADER
val firstNotice = 20
val periodicNotice = 60
if (latestGossip.convergence(selfUniqueAddress)) {
if (latestGossip.convergence(selfUniqueAddress, exitingConfirmed)) {
if (leaderActionCounter >= firstNotice)
logInfo("Leader can perform its duties again")
leaderActionCounter = 0
@ -893,6 +1017,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
s"${m.address} ${m.status} seen=${latestGossip.seenByNode(m.uniqueAddress)}").mkString(", "))
}
}
cleanupExitingConfirmed()
shutdownSelfWhenDown()
}
@ -948,6 +1073,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
if Gossip.removeUnreachableWithMemberStatus(m.status)
} yield m
val removedExitingConfirmed = exitingConfirmed.filter(n localGossip.member(n).status == Exiting)
val changedMembers = localMembers collect {
var upNumber = 0
@ -971,14 +1098,15 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
}
}
if (removedUnreachable.nonEmpty || changedMembers.nonEmpty) {
if (removedUnreachable.nonEmpty || removedExitingConfirmed.nonEmpty || changedMembers.nonEmpty) {
// handle changes
// replace changed members
val newMembers = changedMembers union localMembers diff removedUnreachable
val newMembers = changedMembers.union(localMembers).diff(removedUnreachable)
.filterNot(m removedExitingConfirmed(m.uniqueAddress))
// removing REMOVED nodes from the `seen` table
val removed = removedUnreachable.map(_.uniqueAddress)
val removed = removedUnreachable.map(_.uniqueAddress).union(removedExitingConfirmed)
val newSeen = localSeen diff removed
// removing REMOVED nodes from the `reachability` table
val newReachability = localOverview.reachability.remove(removed)
@ -992,7 +1120,18 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
}
val newGossip = localGossip copy (members = newMembers, overview = newOverview, version = newVersion)
if (!exitingTasksInProgress && newGossip.member(selfUniqueAddress).status == Exiting) {
// Leader is moving itself from Leaving to Exiting.
// ExitingCompleted will be received via CoordinatedShutdown to continue
// the leaving process. Meanwhile the gossip state is not marked as seen.
exitingTasksInProgress = true
logInfo("Exiting (leader), starting coordinated shutdown")
selfExiting.trySuccess(Done)
coordShutdown.run()
}
updateLatestGossip(newGossip)
exitingConfirmed = exitingConfirmed.filterNot(removedExitingConfirmed)
// log status changes
changedMembers foreach { m
@ -1004,23 +1143,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
val status = if (m.status == Exiting) "exiting" else "unreachable"
logInfo("Leader is removing {} node [{}]", status, m.address)
}
publish(latestGossip)
if (latestGossip.member(selfUniqueAddress).status == Exiting) {
// Leader is moving itself from Leaving to Exiting. Let others know (best effort)
// before shutdown. Otherwise they will not see the Exiting state change
// and there will not be convergence until they have detected this node as
// unreachable and the required downing has finished. They will still need to detect
// unreachable, but Exiting unreachable will be removed without downing, i.e.
// normally the leaving of a leader will be graceful without the need
// for downing. However, if those final gossip messages never arrive it is
// alright to require the downing, because that is probably caused by a
// network failure anyway.
gossipRandomN(NumberOfGossipsBeforeShutdownWhenLeaderExits)
shutdown()
removedExitingConfirmed.foreach { n
logInfo("Leader is removing confirmed Exiting node [{}]", n.address)
}
publish(latestGossip)
}
}
@ -1144,10 +1271,18 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
def updateLatestGossip(newGossip: Gossip): Unit = {
// Updating the vclock version for the changes
val versionedGossip = newGossip :+ vclockNode
// Nobody else have seen this gossip but us
val seenVersionedGossip = versionedGossip onlySeen (selfUniqueAddress)
// Update the state with the new gossip
latestGossip = seenVersionedGossip
// Don't mark gossip state as seen while exiting is in progress, e.g.
// shutting down singleton actors. This delays removal of the member until
// the exiting tasks have been completed.
if (exitingTasksInProgress)
latestGossip = versionedGossip.clearSeen()
else {
// Nobody else has seen this gossip but us
val seenVersionedGossip = versionedGossip onlySeen (selfUniqueAddress)
// Update the state with the new gossip
latestGossip = seenVersionedGossip
}
assertLatestGossip()
}

View file

@ -13,6 +13,7 @@ import akka.cluster.ClusterEvent._
import akka.cluster.MemberStatus._
import akka.event.EventStream
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
import akka.actor.DeadLetterSuppression
/**
* Domain events published to the event bus.
@ -199,7 +200,7 @@ object ClusterEvent {
* This event is published when the cluster node is shutting down,
* before the final [[MemberRemoved]] events are published.
*/
final case object ClusterShuttingDown extends ClusterDomainEvent
final case object ClusterShuttingDown extends ClusterDomainEvent with DeadLetterSuppression
/**
* Java API: get the singleton instance of `ClusterShuttingDown` event
@ -335,9 +336,9 @@ object ClusterEvent {
private[cluster] def diffSeen(oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): immutable.Seq[SeenChanged] =
if (newGossip eq oldGossip) Nil
else {
val newConvergence = newGossip.convergence(selfUniqueAddress)
val newConvergence = newGossip.convergence(selfUniqueAddress, Set.empty)
val newSeenBy = newGossip.seenBy
if (newConvergence != oldGossip.convergence(selfUniqueAddress) || newSeenBy != oldGossip.seenBy)
if (newConvergence != oldGossip.convergence(selfUniqueAddress, Set.empty) || newSeenBy != oldGossip.seenBy)
List(SeenChanged(newConvergence, newSeenBy.map(_.address)))
else Nil
}

View file

@ -103,6 +103,7 @@ final class ClusterSettings(val config: Config, val systemName: String) {
case (key, value: ConfigObject) key value.toConfig.getInt("min-nr-of-members")
}.toMap
}
val RunCoordinatedShutdownWhenDown: Boolean = cc.getBoolean("run-coordinated-shutdown-when-down")
val JmxEnabled: Boolean = cc.getBoolean("jmx.enabled")
val UseDispatcher: String = cc.getString("use-dispatcher") match {
case "" Dispatchers.DefaultDispatcherId

View file

@ -0,0 +1,56 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster
import akka.Done
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.cluster.ClusterEvent._
import akka.cluster.MemberStatus._
/**
* INTERNAL API
*/
private[akka] object CoordinatedShutdownLeave {
def props(): Props = Props[CoordinatedShutdownLeave]
case object LeaveReq
}
/**
* INTERNAL API
*/
private[akka] class CoordinatedShutdownLeave extends Actor {
import CoordinatedShutdownLeave.LeaveReq
val cluster = Cluster(context.system)
override def postStop(): Unit = {
cluster.unsubscribe(self)
}
def receive = {
case LeaveReq
// MemberRemoved is needed in case it was downed instead
cluster.leave(cluster.selfAddress)
cluster.subscribe(self, classOf[MemberLeft], classOf[MemberRemoved])
context.become(waitingLeaveCompleted(sender()))
}
def waitingLeaveCompleted(replyTo: ActorRef): Receive = {
case s: CurrentClusterState
if (s.members.exists(m m.uniqueAddress == cluster.selfUniqueAddress &&
(m.status == Leaving || m.status == Exiting || m.status == Down))) {
replyTo ! Done
context.stop(self)
}
case evt: MemberEvent
if (evt.member.uniqueAddress == cluster.selfUniqueAddress) {
replyTo ! Done
context.stop(self)
}
}
}

View file

@ -114,6 +114,13 @@ private[cluster] final case class Gossip(
this copy (overview = overview copy (seen = Set(node)))
}
/**
* Remove all seen entries
*/
def clearSeen(): Gossip = {
this copy (overview = overview copy (seen = Set.empty))
}
/**
* The nodes that have seen the current version of the Gossip.
*/
@ -158,7 +165,7 @@ private[cluster] final case class Gossip(
*
* @return true if convergence has been reached and false if not
*/
def convergence(selfUniqueAddress: UniqueAddress): Boolean = {
def convergence(selfUniqueAddress: UniqueAddress, exitingConfirmed: Set[UniqueAddress]): Boolean = {
// First check that:
// 1. we don't have any members that are unreachable, excluding observations from members
// that have status DOWN, or
@ -167,10 +174,11 @@ private[cluster] final case class Gossip(
// When that is done we check that all members with a convergence
// status are in the seen table, i.e. have seen this version
val unreachable = reachabilityExcludingDownedObservers.allUnreachableOrTerminated.collect {
case node if (node != selfUniqueAddress) member(node)
case node if (node != selfUniqueAddress && !exitingConfirmed(node)) member(node)
}
unreachable.forall(m Gossip.convergenceSkipUnreachableWithMemberStatus(m.status)) &&
!members.exists(m Gossip.convergenceMemberStatus(m.status) && !seenByNode(m.uniqueAddress))
!members.exists(m Gossip.convergenceMemberStatus(m.status) &&
!(seenByNode(m.uniqueAddress) || exitingConfirmed(m.uniqueAddress)))
}
lazy val reachabilityExcludingDownedObservers: Reachability = {
@ -187,7 +195,7 @@ private[cluster] final case class Gossip(
def roleLeader(role: String, selfUniqueAddress: UniqueAddress): Option[UniqueAddress] =
leaderOf(members.filter(_.hasRole(role)), selfUniqueAddress)
private def leaderOf(mbrs: immutable.SortedSet[Member], selfUniqueAddress: UniqueAddress): Option[UniqueAddress] = {
def leaderOf(mbrs: immutable.SortedSet[Member], selfUniqueAddress: UniqueAddress): Option[UniqueAddress] = {
val reachableMembers =
if (overview.reachability.isAllReachable) mbrs.filterNot(_.status == Down)
else mbrs.filter(m m.status != Down &&

View file

@ -19,6 +19,7 @@ import scala.collection.JavaConverters._
import scala.collection.immutable
import scala.concurrent.duration.Deadline
import java.io.NotSerializableException
import akka.cluster.InternalClusterAction.ExitingConfirmed
/**
* Protobuf serializer of cluster messages.
@ -57,6 +58,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri
classOf[InternalClusterAction.InitJoinNack] (bytes InternalClusterAction.InitJoinNack(addressFromBinary(bytes))),
classOf[ClusterHeartbeatSender.Heartbeat] (bytes ClusterHeartbeatSender.Heartbeat(addressFromBinary(bytes))),
classOf[ClusterHeartbeatSender.HeartbeatRsp] (bytes ClusterHeartbeatSender.HeartbeatRsp(uniqueAddressFromBinary(bytes))),
classOf[ExitingConfirmed] (bytes InternalClusterAction.ExitingConfirmed(uniqueAddressFromBinary(bytes))),
classOf[GossipStatus] gossipStatusFromBinary,
classOf[GossipEnvelope] gossipEnvelopeFromBinary,
classOf[MetricsGossipEnvelope] metricsGossipEnvelopeFromBinary)
@ -64,18 +66,19 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri
def includeManifest: Boolean = true
def toBinary(obj: AnyRef): Array[Byte] = obj match {
case ClusterHeartbeatSender.Heartbeat(from) addressToProtoByteArray(from)
case ClusterHeartbeatSender.HeartbeatRsp(from) uniqueAddressToProtoByteArray(from)
case m: GossipEnvelope gossipEnvelopeToProto(m).toByteArray
case m: GossipStatus gossipStatusToProto(m).toByteArray
case m: MetricsGossipEnvelope compress(metricsGossipEnvelopeToProto(m))
case InternalClusterAction.Join(node, roles) joinToProto(node, roles).toByteArray
case InternalClusterAction.Welcome(from, gossip) compress(welcomeToProto(from, gossip))
case ClusterUserAction.Leave(address) addressToProtoByteArray(address)
case ClusterUserAction.Down(address) addressToProtoByteArray(address)
case InternalClusterAction.InitJoin cm.Empty.getDefaultInstance.toByteArray
case InternalClusterAction.InitJoinAck(address) addressToProtoByteArray(address)
case InternalClusterAction.InitJoinNack(address) addressToProtoByteArray(address)
case ClusterHeartbeatSender.Heartbeat(from) addressToProtoByteArray(from)
case ClusterHeartbeatSender.HeartbeatRsp(from) uniqueAddressToProtoByteArray(from)
case m: GossipEnvelope gossipEnvelopeToProto(m).toByteArray
case m: GossipStatus gossipStatusToProto(m).toByteArray
case m: MetricsGossipEnvelope compress(metricsGossipEnvelopeToProto(m))
case InternalClusterAction.Join(node, roles) joinToProto(node, roles).toByteArray
case InternalClusterAction.Welcome(from, gossip) compress(welcomeToProto(from, gossip))
case ClusterUserAction.Leave(address) addressToProtoByteArray(address)
case ClusterUserAction.Down(address) addressToProtoByteArray(address)
case InternalClusterAction.InitJoin cm.Empty.getDefaultInstance.toByteArray
case InternalClusterAction.InitJoinAck(address) addressToProtoByteArray(address)
case InternalClusterAction.InitJoinNack(address) addressToProtoByteArray(address)
case InternalClusterAction.ExitingConfirmed(node) uniqueAddressToProtoByteArray(node)
case _
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass}")
}

View file

@ -15,9 +15,8 @@ object NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec extends MultiNodeConfig
val second = role("second")
val third = role("third")
commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(
"akka.cluster.auto-down-unreachable-after = 0s")).
withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
commonConfig(debugConfig(on = false)
.withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet))
}
class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode1 extends NodeLeavingAndExitingAndBeingRemovedSpec
@ -36,7 +35,7 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec
awaitClusterUp(first, second, third)
within(30.seconds) {
within(15.seconds) {
runOn(first) {
cluster.leave(second)
}
@ -44,7 +43,9 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec
runOn(first, third) {
enterBarrier("second-shutdown")
markNodeAsUnavailable(second)
// this test verifies that the removal is performed via the ExitingCompleted message,
// otherwise we would have needed `markNodeAsUnavailable(second)` to trigger the FailureDetectorPuppet
// verify that the 'second' node is no longer part of the 'members'/'unreachable' set
awaitAssert {
clusterView.members.map(_.address) should not contain (address(second))

View file

@ -69,17 +69,11 @@ abstract class QuickRestartSpec
system.name,
// use the same port
ConfigFactory.parseString(
if (RARP(system).provider.remoteSettings.Artery.Enabled)
s"""
s"""
akka.cluster.roles = [round-$n]
akka.remote.netty.tcp.port = ${Cluster(restartingSystem).selfAddress.port.get}
akka.remote.artery.canonical.port = ${Cluster(restartingSystem).selfAddress.port.get}
"""
else
s"""
akka.cluster.roles = [round-$n]
akka.remote.netty.tcp.port = ${Cluster(restartingSystem).selfAddress.port.get}
"""
).withFallback(system.settings.config))
""").withFallback(system.settings.config))
log.info("Restarting node has address: {}", Cluster(restartingSystem).selfUniqueAddress)
Cluster(restartingSystem).joinSeedNodes(seedNodes)
within(20.seconds) {

View file

@ -55,11 +55,10 @@ abstract class RestartFirstSeedNodeSpec
lazy val restartedSeed1System = ActorSystem(
system.name,
ConfigFactory.parseString(
if (RARP(system).provider.remoteSettings.Artery.Enabled)
"akka.remote.artery.canonical.port=" + seedNodes.head.port.get
else
"akka.remote.netty.tcp.port=" + seedNodes.head.port.get
).withFallback(system.settings.config))
s"""
akka.remote.netty.tcp.port = ${seedNodes.head.port.get}
akka.remote.artery.canonical.port = ${seedNodes.head.port.get}
""").withFallback(system.settings.config))
override def afterAll(): Unit = {
runOn(seed1) {

View file

@ -56,10 +56,10 @@ abstract class RestartNode2SpecSpec
system.name,
ConfigFactory.parseString(
s"""
akka.remote.netty.tcp.port= ${seedNodes.head.port.get}
akka.remote.netty.tcp.port = ${seedNodes.head.port.get}
akka.remote.artery.canonical.port = ${seedNodes.head.port.get}
#akka.remote.retry-gate-closed-for = 1s
""").
withFallback(system.settings.config))
""").withFallback(system.settings.config))
override def afterAll(): Unit = {
runOn(seed1) {

View file

@ -51,11 +51,10 @@ abstract class RestartNode3Spec
lazy val restartedSecondSystem = ActorSystem(
system.name,
ConfigFactory.parseString(
if (RARP(system).provider.remoteSettings.Artery.Enabled)
"akka.remote.artery.canonical.port=" + secondUniqueAddress.address.port.get
else
"akka.remote.netty.tcp.port=" + secondUniqueAddress.address.port.get
).withFallback(system.settings.config))
s"""
akka.remote.artery.canonical.port = ${secondUniqueAddress.address.port.get}
akka.remote.netty.tcp.port = ${secondUniqueAddress.address.port.get}
""").withFallback(system.settings.config))
override def afterAll(): Unit = {
runOn(second) {

View file

@ -71,8 +71,10 @@ abstract class RestartNodeSpec
lazy val restartedSecondSystem = ActorSystem(
system.name,
ConfigFactory.parseString("akka.remote.netty.tcp.port=" + secondUniqueAddress.address.port.get).
withFallback(system.settings.config))
ConfigFactory.parseString(s"""
akka.remote.netty.tcp.port = ${secondUniqueAddress.address.port.get}
akka.remote.artery.canonical.port = ${secondUniqueAddress.address.port.get}
""").withFallback(system.settings.config))
override def afterAll(): Unit = {
runOn(second) {

View file

@ -5,3 +5,8 @@ akka {
warn-about-java-serializer-usage = off
}
}
akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off
akka.coordinated-shutdown.terminate-actor-system = off
akka.cluster.run-coordinated-shutdown-when-down = off

View file

@ -34,6 +34,7 @@ object ClusterDeployerSpec {
}
}
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
""", ConfigParseOptions.defaults)
class RecipeActor extends Actor {

View file

@ -24,6 +24,7 @@ object ClusterDomainEventPublisherSpec {
val config = """
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
"""
}

View file

@ -33,7 +33,7 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
val eJoining = TestMember(Address("akka.tcp", "sys", "e", 2552), Joining, eRoles)
val eUp = TestMember(Address("akka.tcp", "sys", "e", 2552), Up, eRoles)
val eDown = TestMember(Address("akka.tcp", "sys", "e", 2552), Down, eRoles)
val selfDummyAddress = UniqueAddress(Address("akka.tcp", "sys", "selfDummy", 2552), 17)
val selfDummyAddress = UniqueAddress(Address("akka.tcp", "sys", "selfDummy", 2552), 17L)
private[cluster] def converge(gossip: Gossip): (Gossip, Set[UniqueAddress]) =
((gossip, Set.empty[UniqueAddress]) /: gossip.members) { case ((gs, as), m) (gs.seen(m.uniqueAddress), as + m.uniqueAddress) }

View file

@ -40,11 +40,11 @@ object ClusterHeartbeatSenderStateSpec {
class ClusterHeartbeatSenderStateSpec extends WordSpec with Matchers {
import ClusterHeartbeatSenderStateSpec._
val aa = UniqueAddress(Address("akka.tcp", "sys", "aa", 2552), 1)
val bb = UniqueAddress(Address("akka.tcp", "sys", "bb", 2552), 2)
val cc = UniqueAddress(Address("akka.tcp", "sys", "cc", 2552), 3)
val dd = UniqueAddress(Address("akka.tcp", "sys", "dd", 2552), 4)
val ee = UniqueAddress(Address("akka.tcp", "sys", "ee", 2552), 5)
val aa = UniqueAddress(Address("akka.tcp", "sys", "aa", 2552), 1L)
val bb = UniqueAddress(Address("akka.tcp", "sys", "bb", 2552), 2L)
val cc = UniqueAddress(Address("akka.tcp", "sys", "cc", 2552), 3L)
val dd = UniqueAddress(Address("akka.tcp", "sys", "dd", 2552), 4L)
val ee = UniqueAddress(Address("akka.tcp", "sys", "ee", 2552), 5L)
private def emptyState: ClusterHeartbeatSenderState = emptyState(aa)
@ -142,7 +142,8 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with Matchers {
"behave correctly for random operations" in {
val rnd = ThreadLocalRandom.current
val nodes = (1 to rnd.nextInt(10, 200)).map(n UniqueAddress(Address("akka.tcp", "sys", "n" + n, 2552), n)).toVector
val nodes = (1 to rnd.nextInt(10, 200)).map(n
UniqueAddress(Address("akka.tcp", "sys", "n" + n, 2552), n.toLong)).toVector
def rndNode() = nodes(rnd.nextInt(0, nodes.size))
val selfUniqueAddress = rndNode()
var state = emptyState(selfUniqueAddress)

View file

@ -16,6 +16,10 @@ import akka.testkit.TestProbe
import akka.actor.ActorSystem
import akka.actor.Props
import com.typesafe.config.ConfigFactory
import akka.actor.CoordinatedShutdown
import akka.cluster.ClusterEvent.MemberEvent
import akka.cluster.ClusterEvent._
import scala.concurrent.Await
object ClusterSpec {
val config = """
@ -28,7 +32,7 @@ object ClusterSpec {
akka.actor.provider = "cluster"
akka.remote.log-remote-lifecycle-events = off
akka.remote.netty.tcp.port = 0
#akka.loglevel = DEBUG
akka.remote.artery.canonical.port = 0
"""
final case class GossipTo(address: Address)
@ -109,6 +113,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
val sys2 = ActorSystem("ClusterSpec2", ConfigFactory.parseString("""
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
"""))
try {
val ref = sys2.actorOf(Props.empty)
@ -137,5 +142,77 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {
cluster.remotePathOf(testActor).uid should ===(testActor.path.uid)
cluster.remotePathOf(testActor).address should ===(selfAddress)
}
"leave via CoordinatedShutdown.run" in {
val sys2 = ActorSystem("ClusterSpec2", ConfigFactory.parseString("""
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
"""))
try {
val probe = TestProbe()(sys2)
Cluster(sys2).subscribe(probe.ref, classOf[MemberEvent])
probe.expectMsgType[CurrentClusterState]
Cluster(sys2).join(Cluster(sys2).selfAddress)
probe.expectMsgType[MemberUp]
CoordinatedShutdown(sys2).run()
probe.expectMsgType[MemberLeft]
probe.expectMsgType[MemberExited]
probe.expectMsgType[MemberRemoved]
} finally {
shutdown(sys2)
}
}
"terminate ActorSystem via leave (CoordinatedShutdown)" in {
val sys2 = ActorSystem("ClusterSpec2", ConfigFactory.parseString("""
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
akka.coordinated-shutdown.terminate-actor-system = on
"""))
try {
val probe = TestProbe()(sys2)
Cluster(sys2).subscribe(probe.ref, classOf[MemberEvent])
probe.expectMsgType[CurrentClusterState]
Cluster(sys2).join(Cluster(sys2).selfAddress)
probe.expectMsgType[MemberUp]
Cluster(sys2).leave(Cluster(sys2).selfAddress)
probe.expectMsgType[MemberLeft]
probe.expectMsgType[MemberExited]
probe.expectMsgType[MemberRemoved]
Await.result(sys2.whenTerminated, 10.seconds)
Cluster(sys2).isTerminated should ===(true)
} finally {
shutdown(sys2)
}
}
"terminate ActorSystem via down (CoordinatedShutdown)" in {
val sys3 = ActorSystem("ClusterSpec3", ConfigFactory.parseString("""
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
akka.coordinated-shutdown.terminate-actor-system = on
akka.cluster.run-coordinated-shutdown-when-down = on
akka.loglevel=DEBUG
"""))
try {
val probe = TestProbe()(sys3)
Cluster(sys3).subscribe(probe.ref, classOf[MemberEvent])
probe.expectMsgType[CurrentClusterState]
Cluster(sys3).join(Cluster(sys3).selfAddress)
probe.expectMsgType[MemberUp]
Cluster(sys3).down(Cluster(sys3).selfAddress)
probe.expectMsgType[MemberRemoved]
Await.result(sys3.whenTerminated, 10.seconds)
Cluster(sys3).isTerminated should ===(true)
} finally {
shutdown(sys3)
}
}
}
}

View file

@ -28,43 +28,57 @@ class GossipSpec extends WordSpec with Matchers {
"A Gossip" must {
"reach convergence when it's empty" in {
Gossip.empty.convergence(a1.uniqueAddress) should ===(true)
Gossip.empty.convergence(a1.uniqueAddress, Set.empty) should ===(true)
}
"reach convergence for one node" in {
val g1 = (Gossip(members = SortedSet(a1))).seen(a1.uniqueAddress)
g1.convergence(a1.uniqueAddress) should ===(true)
val g1 = Gossip(members = SortedSet(a1)).seen(a1.uniqueAddress)
g1.convergence(a1.uniqueAddress, Set.empty) should ===(true)
}
"not reach convergence until all have seen version" in {
val g1 = (Gossip(members = SortedSet(a1, b1))).seen(a1.uniqueAddress)
g1.convergence(a1.uniqueAddress) should ===(false)
val g1 = Gossip(members = SortedSet(a1, b1)).seen(a1.uniqueAddress)
g1.convergence(a1.uniqueAddress, Set.empty) should ===(false)
}
"reach convergence for two nodes" in {
val g1 = (Gossip(members = SortedSet(a1, b1))).seen(a1.uniqueAddress).seen(b1.uniqueAddress)
g1.convergence(a1.uniqueAddress) should ===(true)
val g1 = Gossip(members = SortedSet(a1, b1)).seen(a1.uniqueAddress).seen(b1.uniqueAddress)
g1.convergence(a1.uniqueAddress, Set.empty) should ===(true)
}
"reach convergence, skipping joining" in {
// e1 is joining
val g1 = (Gossip(members = SortedSet(a1, b1, e1))).seen(a1.uniqueAddress).seen(b1.uniqueAddress)
g1.convergence(a1.uniqueAddress) should ===(true)
val g1 = Gossip(members = SortedSet(a1, b1, e1)).seen(a1.uniqueAddress).seen(b1.uniqueAddress)
g1.convergence(a1.uniqueAddress, Set.empty) should ===(true)
}
"reach convergence, skipping down" in {
// e3 is down
val g1 = (Gossip(members = SortedSet(a1, b1, e3))).seen(a1.uniqueAddress).seen(b1.uniqueAddress)
g1.convergence(a1.uniqueAddress) should ===(true)
val g1 = Gossip(members = SortedSet(a1, b1, e3)).seen(a1.uniqueAddress).seen(b1.uniqueAddress)
g1.convergence(a1.uniqueAddress, Set.empty) should ===(true)
}
"reach convergence, skipping Leaving with exitingConfirmed" in {
// c1 is Leaving
val g1 = Gossip(members = SortedSet(a1, b1, c1)).seen(a1.uniqueAddress).seen(b1.uniqueAddress)
g1.convergence(a1.uniqueAddress, Set(c1.uniqueAddress)) should ===(true)
}
"reach convergence, skipping unreachable Leaving with exitingConfirmed" in {
// c1 is Leaving
val r1 = Reachability.empty.unreachable(b1.uniqueAddress, c1.uniqueAddress)
val g1 = Gossip(members = SortedSet(a1, b1, c1), overview = GossipOverview(reachability = r1))
.seen(a1.uniqueAddress).seen(b1.uniqueAddress)
g1.convergence(a1.uniqueAddress, Set(c1.uniqueAddress)) should ===(true)
}
"not reach convergence when unreachable" in {
val r1 = Reachability.empty.unreachable(b1.uniqueAddress, a1.uniqueAddress)
val g1 = (Gossip(members = SortedSet(a1, b1), overview = GossipOverview(reachability = r1)))
.seen(a1.uniqueAddress).seen(b1.uniqueAddress)
g1.convergence(b1.uniqueAddress) should ===(false)
g1.convergence(b1.uniqueAddress, Set.empty) should ===(false)
// but from a1's point of view (it knows that itself is not unreachable)
g1.convergence(a1.uniqueAddress) should ===(true)
g1.convergence(a1.uniqueAddress, Set.empty) should ===(true)
}
"reach convergence when downed node has observed unreachable" in {
@ -72,7 +86,7 @@ class GossipSpec extends WordSpec with Matchers {
val r1 = Reachability.empty.unreachable(e3.uniqueAddress, a1.uniqueAddress)
val g1 = (Gossip(members = SortedSet(a1, b1, e3), overview = GossipOverview(reachability = r1)))
.seen(a1.uniqueAddress).seen(b1.uniqueAddress).seen(e3.uniqueAddress)
g1.convergence(b1.uniqueAddress) should ===(true)
g1.convergence(b1.uniqueAddress, Set.empty) should ===(true)
}
"merge members by status priority" in {

View file

@ -14,7 +14,7 @@ class HeartbeatNodeRingPerfSpec extends WordSpec with Matchers {
val iterations = sys.props.get("akka.cluster.HeartbeatNodeRingPerfSpec.iterations").getOrElse("1000").toInt
def createHeartbeatNodeRingOfSize(size: Int): HeartbeatNodeRing = {
val nodes = (1 to size).map(n UniqueAddress(Address("akka.tcp", "sys", "node-" + n, 2552), n))
val nodes = (1 to size).map(n UniqueAddress(Address("akka.tcp", "sys", "node-" + n, 2552), n.toLong))
val selfAddress = nodes(size / 2)
HeartbeatNodeRing(selfAddress, nodes.toSet, Set.empty, 5)
}

View file

@ -10,12 +10,12 @@ import akka.actor.Address
class HeartbeatNodeRingSpec extends WordSpec with Matchers {
val aa = UniqueAddress(Address("akka.tcp", "sys", "aa", 2552), 1)
val bb = UniqueAddress(Address("akka.tcp", "sys", "bb", 2552), 2)
val cc = UniqueAddress(Address("akka.tcp", "sys", "cc", 2552), 3)
val dd = UniqueAddress(Address("akka.tcp", "sys", "dd", 2552), 4)
val ee = UniqueAddress(Address("akka.tcp", "sys", "ee", 2552), 5)
val ff = UniqueAddress(Address("akka.tcp", "sys", "ff", 2552), 6)
val aa = UniqueAddress(Address("akka.tcp", "sys", "aa", 2552), 1L)
val bb = UniqueAddress(Address("akka.tcp", "sys", "bb", 2552), 2L)
val cc = UniqueAddress(Address("akka.tcp", "sys", "cc", 2552), 3L)
val dd = UniqueAddress(Address("akka.tcp", "sys", "dd", 2552), 4L)
val ee = UniqueAddress(Address("akka.tcp", "sys", "ee", 2552), 5L)
val ff = UniqueAddress(Address("akka.tcp", "sys", "ff", 2552), 6L)
val nodes = Set(aa, bb, cc, dd, ee, ff)

View file

@ -50,7 +50,7 @@ class MemberOrderingSpec extends WordSpec with Matchers {
"have stable equals and hashCode" in {
val address = Address("akka.tcp", "sys1", "host1", 9000)
val m1 = m(address, Joining)
val m11 = Member(UniqueAddress(address, -3), Set.empty)
val m11 = Member(UniqueAddress(address, -3L), Set.empty)
val m2 = m1.copy(status = Up)
val m22 = m11.copy(status = Up)
val m3 = m(address.copy(port = Some(10000)), Up)
@ -81,7 +81,7 @@ class MemberOrderingSpec extends WordSpec with Matchers {
// different uid
val a = m(address1, Joining)
val b = Member(UniqueAddress(address1, -3), Set.empty)
val b = Member(UniqueAddress(address1, -3L), Set.empty)
Member.ordering.compare(a, b) should ===(1)
Member.ordering.compare(b, a) should ===(-1)

View file

@ -19,9 +19,9 @@ class ReachabilityPerfSpec extends WordSpec with Matchers {
private def createReachabilityOfSize(base: Reachability, size: Int): Reachability =
(base /: (1 to size)) {
case (r, i)
val observer = UniqueAddress(address.copy(host = Some("node-" + i)), i)
val observer = UniqueAddress(address.copy(host = Some("node-" + i)), i.toLong)
val j = if (i == size) 1 else i + 1
val subject = UniqueAddress(address.copy(host = Some("node-" + j)), j)
val subject = UniqueAddress(address.copy(host = Some("node-" + j)), j.toLong)
r.unreachable(observer, subject).reachable(observer, subject)
}

View file

@ -12,11 +12,11 @@ class ReachabilitySpec extends WordSpec with Matchers {
import Reachability.{ Reachable, Unreachable, Terminated, Record }
val nodeA = UniqueAddress(Address("akka.tcp", "sys", "a", 2552), 1)
val nodeB = UniqueAddress(Address("akka.tcp", "sys", "b", 2552), 2)
val nodeC = UniqueAddress(Address("akka.tcp", "sys", "c", 2552), 3)
val nodeD = UniqueAddress(Address("akka.tcp", "sys", "d", 2552), 4)
val nodeE = UniqueAddress(Address("akka.tcp", "sys", "e", 2552), 5)
val nodeA = UniqueAddress(Address("akka.tcp", "sys", "a", 2552), 1L)
val nodeB = UniqueAddress(Address("akka.tcp", "sys", "b", 2552), 2L)
val nodeC = UniqueAddress(Address("akka.tcp", "sys", "c", 2552), 3L)
val nodeD = UniqueAddress(Address("akka.tcp", "sys", "d", 2552), 4L)
val nodeE = UniqueAddress(Address("akka.tcp", "sys", "e", 2552), 5L)
"Reachability table" must {

View file

@ -17,6 +17,7 @@ object StartupWithOneThreadSpec {
akka.actor.provider = "cluster"
akka.actor.creation-timeout = 10s
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
akka.actor.default-dispatcher {
executor = thread-pool-executor

View file

@ -10,5 +10,5 @@ object TestMember {
apply(address, status, Set.empty)
def apply(address: Address, status: MemberStatus, roles: Set[String]): Member =
new Member(UniqueAddress(address, 0), Int.MaxValue, status, roles)
new Member(UniqueAddress(address, 0L), Int.MaxValue, status, roles)
}

View file

@ -43,9 +43,9 @@ class ClusterMessageSerializerSpec extends AkkaSpec(
"be serializable" in {
val address = Address("akka.tcp", "system", "some.host.org", 4711)
val uniqueAddress = UniqueAddress(address, 17)
val uniqueAddress = UniqueAddress(address, 17L)
val address2 = Address("akka.tcp", "system", "other.host.org", 4711)
val uniqueAddress2 = UniqueAddress(address2, 18)
val uniqueAddress2 = UniqueAddress(address2, 18L)
checkSerialization(InternalClusterAction.Join(uniqueAddress, Set("foo", "bar")))
checkSerialization(ClusterUserAction.Leave(address))
checkSerialization(ClusterUserAction.Down(address))
@ -54,6 +54,7 @@ class ClusterMessageSerializerSpec extends AkkaSpec(
checkSerialization(InternalClusterAction.InitJoinNack(address))
checkSerialization(ClusterHeartbeatSender.Heartbeat(address))
checkSerialization(ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress))
checkSerialization(InternalClusterAction.ExitingConfirmed(uniqueAddress))
val node1 = VectorClock.Node("node1")
val node2 = VectorClock.Node("node2")

View file

@ -24,6 +24,7 @@ object ClusterRouterSupervisorSpec {
class ClusterRouterSupervisorSpec extends AkkaSpec("""
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
""") {
import ClusterRouterSupervisorSpec._

View file

@ -15,6 +15,7 @@ import akka.routing.ActorRefRoutee
class WeightedRouteesSpec extends AkkaSpec(ConfigFactory.parseString("""
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
""")) {
val protocol =

View file

@ -3,3 +3,7 @@ akka.actor {
serialize-creators = on
warn-about-java-serializer-usage = off
}
akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off
akka.coordinated-shutdown.terminate-actor-system = off
akka.cluster.run-coordinated-shutdown-when-down = off

View file

@ -293,11 +293,8 @@ to the ``ShardRegion`` actor to handoff all shards that are hosted by that ``Sha
During this period other regions will buffer messages for those shards in the same way as when a rebalance is
triggered by the coordinator. When the shards have been stopped the coordinator will allocate these shards elsewhere.
When the ``ShardRegion`` has terminated you probably want to ``leave`` the cluster, and shut down the ``ActorSystem``.
This is how to do that:
.. includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#graceful-shutdown
This is performed automatically by the :ref:`coordinated-shutdown-lambda` and is therefore part of the
graceful leaving process of a cluster member.
.. _RemoveInternalClusterShardingData-java:

View file

@ -186,11 +186,16 @@ It can also be performed programmatically with:
.. includecode:: code/docs/cluster/ClusterDocTest.java#leave
Note that this command can be issued to any member in the cluster, not necessarily the
one that is leaving. The cluster extension, but not the actor system or JVM, of the
leaving member will be shutdown after the leader has changed status of the member to
`Exiting`. Thereafter the member will be removed from the cluster. Normally this is handled
automatically, but in case of network failures during this process it might still be necessary
to set the nodes status to ``Down`` in order to complete the removal.
one that is leaving.
The :ref:`coordinated-shutdown-lambda` will automatically run when the cluster node sees itself as
``Exiting``, i.e. leaving from another node will trigger the shutdown process on the leaving node.
Tasks for graceful leaving of the cluster, including graceful shutdown of Cluster Singletons and
Cluster Sharding, are added automatically when Akka Cluster is used, i.e. running the shutdown
process will also trigger the graceful leaving if it's not already in progress.
Normally this is handled automatically, but in case of network failures during this process it might still
be necessary to set the node's status to ``Down`` in order to complete the removal.
.. _weakly_up_java:
@ -357,9 +362,7 @@ How To Cleanup when Member is Removed
You can do some cleanup in a ``registerOnMemberRemoved`` callback, which will
be invoked when the current member status is changed to 'Removed' or the cluster has been shut down.
For example, this is how to shut down the ``ActorSystem`` and thereafter exit the JVM:
.. includecode:: ../../../akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialFrontendMain.java#registerOnRemoved
An alternative is to register tasks to the :ref:`coordinated-shutdown-lambda`.
.. note::
Register an OnMemberRemoved callback on a cluster that has been shut down, the callback will be invoked immediately on

View file

@ -12,11 +12,17 @@ import akka.testkit.TestEvent;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import docs.AbstractJavaTest;
import docs.actor.ActorDocTest.FirstActor;
import scala.PartialFunction;
import scala.runtime.BoxedUnit;
import static docs.actorlambda.Messages.Swap.Swap;
import static docs.actorlambda.Messages.*;
import static akka.japi.Util.immutableSeq;
import akka.actor.CoordinatedShutdown;
import static akka.pattern.PatternsCS.ask;
import akka.util.Timeout;
import akka.Done;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
@ -677,5 +683,31 @@ public class ActorDocTest extends AbstractJavaTest {
system.eventStream().publish(new TestEvent.UnMute(immutableSeq(ignoreExceptions)));
}
}
@Test
public void coordinatedShutdown() {
final ActorRef someActor = system.actorOf(Props.create(FirstActor.class));
//#coordinated-shutdown-addTask
CoordinatedShutdown.get(system).addTask(
CoordinatedShutdown.PhaseBeforeServiceUnbind(), "someTaskName",
() -> {
return ask(someActor, "stop", new Timeout(5, TimeUnit.SECONDS))
.thenApply(reply -> Done.getInstance());
});
//#coordinated-shutdown-addTask
//#coordinated-shutdown-jvm-hook
CoordinatedShutdown.get(system).addJvmShutdownHook(() ->
System.out.println("custom JVM shutdown hook...")
);
//#coordinated-shutdown-jvm-hook
// don't run this
if (false) {
//#coordinated-shutdown-run
CompletionStage<Done> done = CoordinatedShutdown.get(system).runAll();
//#coordinated-shutdown-run
}
}
}

View file

@ -764,6 +764,76 @@ before stopping the target actor. Simple cleanup tasks can be handled in ``postS
within a supervisor you control and only in response to a :class:`Terminated`
message, i.e. not for top-level actors.
.. _coordinated-shutdown-lambda:
Coordinated Shutdown
--------------------
There is an extension named ``CoordinatedShutdown`` that will stop certain actors and
services in a specific order and perform registered tasks during the shutdown process.
The order of the shutdown phases is defined in configuration ``akka.coordinated-shutdown.phases``.
The default phases are defined as:
.. includecode:: ../../../akka-actor/src/main/resources/reference.conf#coordinated-shutdown-phases
More phases can be added in the application's configuration if needed by overriding a phase with an
additional ``depends-on``. Especially the phases ``before-service-unbind``, ``before-cluster-shutdown`` and
``before-actor-system-terminate`` are intended for application specific phases or tasks.
The default phases are defined in a single linear order, but the phases can be ordered as a
directed acyclic graph (DAG) by defining the dependencies between the phases.
The phases are ordered with `topological <https://en.wikipedia.org/wiki/Topological_sorting>`_ sort of the DAG.
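As an illustration (a sketch only, not part of the default configuration), an application specific phase
named ``my-app-shutdown`` could be declared in ``application.conf`` with a ``depends-on`` list of the
phases that must complete before it::

  akka.coordinated-shutdown.phases {
    my-app-shutdown {
      # run after all tasks of the predefined before-service-unbind phase
      depends-on = [before-service-unbind]
    }
  }

Tasks registered for ``my-app-shutdown`` would then run once the tasks of ``before-service-unbind`` have
completed (or its timeout has elapsed).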
Tasks can be added to a phase with:
.. includecode:: code/docs/actorlambda/ActorDocTest.java#coordinated-shutdown-addTask
The returned ``CompletionStage<Done>`` should be completed when the task is completed. The task name parameter
is only used for debugging/logging.
Tasks added to the same phase are executed in parallel without any ordering assumptions.
The next phase will not start until all tasks of the previous phase have been completed.
If tasks are not completed within a configured timeout (see :ref:`reference.conf <config-akka-actor>`)
the next phase will be started anyway. It is possible to configure ``recover=off`` for a phase
to abort the rest of the shutdown process if a task fails or is not completed within the timeout.
Tasks should typically be registered as early as possible after system startup. When running
the coordinated shutdown, tasks that have been registered will be performed, but tasks that are
added too late will not be run.
To start the coordinated shutdown process you can invoke ``runAll`` on the ``CoordinatedShutdown``
extension:
.. includecode:: code/docs/actorlambda/ActorDocTest.java#coordinated-shutdown-run
It's safe to call the ``runAll`` method multiple times. It will only run once.
That also means that the ``ActorSystem`` will be terminated in the last phase. By default, the
JVM is not forcefully stopped (it will be stopped if all non-daemon threads have been terminated).
To enable a hard ``System.exit`` as a final action you can configure::
akka.coordinated-shutdown.exit-jvm = on
When using :ref:`Akka Cluster <cluster_usage_java>` the ``CoordinatedShutdown`` will automatically run
when the cluster node sees itself as ``Exiting``, i.e. leaving from another node will trigger
the shutdown process on the leaving node. Tasks for graceful leaving of the cluster, including graceful
shutdown of Cluster Singletons and Cluster Sharding, are added automatically when Akka Cluster is used,
i.e. running the shutdown process will also trigger the graceful leaving if it's not already in progress.
By default, the ``CoordinatedShutdown`` will be run when the JVM process exits, e.g.
via ``kill SIGTERM`` signal (``SIGINT`` ctrl-c doesn't work). This behavior can be disabled with::
akka.coordinated-shutdown.run-by-jvm-shutdown-hook=off
If you have application specific JVM shutdown hooks it's recommended that you register them via the
``CoordinatedShutdown`` so that they run before Akka's internal shutdown hooks, e.g.
those shutting down Akka Remoting (Artery).
.. includecode:: code/docs/actorlambda/ActorDocTest.java#coordinated-shutdown-jvm-hook
.. _actor-hotswap-lambda:
Become/Unbecome

View file

@ -714,6 +714,73 @@ before stopping the target actor. Simple cleanup tasks can be handled in ``postS
returned. In order to guarantee proper deregistration, only reuse names from
within a supervisor you control and only in response to a :class:`Terminated`
message, i.e. not for top-level actors.
Coordinated Shutdown
--------------------
There is an extension named ``CoordinatedShutdown`` that will stop certain actors and
services in a specific order and perform registered tasks during the shutdown process.
The order of the shutdown phases is defined in configuration ``akka.coordinated-shutdown.phases``.
The default phases are defined as:
.. includecode:: ../../../akka-actor/src/main/resources/reference.conf#coordinated-shutdown-phases
More phases can be added in the application's configuration if needed by overriding a phase with an
additional ``depends-on``. Especially the phases ``before-service-unbind``, ``before-cluster-shutdown`` and
``before-actor-system-terminate`` are intended for application specific phases or tasks.
The default phases are defined in a single linear order, but the phases can be ordered as a
directed acyclic graph (DAG) by defining the dependencies between the phases.
The phases are ordered with `topological <https://en.wikipedia.org/wiki/Topological_sorting>`_ sort of the DAG.
Tasks can be added to a phase with:
.. includecode:: code/docs/actorlambda/ActorDocTest.java#coordinated-shutdown-addTask
The returned ``CompletionStage<Done>`` should be completed when the task is completed. The task name parameter
is only used for debugging/logging.
Tasks added to the same phase are executed in parallel without any ordering assumptions.
The next phase will not start until all tasks of the previous phase have been completed.
If tasks are not completed within a configured timeout (see :ref:`reference.conf <config-akka-actor>`)
the next phase will be started anyway. It is possible to configure ``recover=off`` for a phase
to abort the rest of the shutdown process if a task fails or is not completed within the timeout.
Tasks should typically be registered as early as possible after system startup. When running
the coordinated shutdown, tasks that have been registered will be performed, but tasks that are
added too late will not be run.
To start the coordinated shutdown process you can invoke ``runAll`` on the ``CoordinatedShutdown``
extension:
.. includecode:: code/docs/actorlambda/ActorDocTest.java#coordinated-shutdown-run
It's safe to call the ``runAll`` method multiple times. It will only run once.
That also means that the ``ActorSystem`` will be terminated in the last phase. By default, the
JVM is not forcefully stopped (it will be stopped if all non-daemon threads have been terminated).
To enable a hard ``System.exit`` as a final action you can configure::
akka.coordinated-shutdown.exit-jvm = on
When using :ref:`Akka Cluster <cluster_usage_java>` the ``CoordinatedShutdown`` will automatically run
when the cluster node sees itself as ``Exiting``, i.e. leaving from another node will trigger
the shutdown process on the leaving node. Tasks for graceful leaving of the cluster, including graceful
shutdown of Cluster Singletons and Cluster Sharding, are added automatically when Akka Cluster is used,
i.e. running the shutdown process will also trigger the graceful leaving if it's not already in progress.
By default, the ``CoordinatedShutdown`` will be run when the JVM process exits, e.g.
via ``kill SIGTERM`` signal (``SIGINT`` ctrl-c doesn't work). This behavior can be disabled with::
akka.coordinated-shutdown.run-by-jvm-shutdown-hook=off
If you have application specific JVM shutdown hooks it's recommended that you register them via the
``CoordinatedShutdown`` so that they run before Akka's internal shutdown hooks, e.g.
those shutting down Akka Remoting (Artery).
.. includecode:: code/docs/actorlambda/ActorDocTest.java#coordinated-shutdown-jvm-hook
.. _UntypedActor.HotSwap:

View file

@ -4,8 +4,8 @@
Migration Guide 2.4.x to 2.5.x
##############################
Akka Actor
==========
Actor
=====
Actor DSL deprecation
---------------------
@ -13,8 +13,8 @@ Actor DSL deprecation
Actor DSL is a rarely used feature and thus will be deprecated and removed.
Use plain ``system.actorOf`` instead of the DSL to create Actors if you have been using it.
Akka Streams
============
Streams
=======
Removal of StatefulStage, PushPullStage
---------------------------------------
@ -85,6 +85,25 @@ in akka-remote's `reference.conf`_.
Cluster
=======
Coordinated Shutdown
--------------------
There is a new extension named ``CoordinatedShutdown`` that will stop certain actors and
services in a specific order and perform registered tasks during the shutdown process.
When using Akka Cluster, tasks for graceful leaving of the cluster, including graceful
shutdown of Cluster Singletons and Cluster Sharding, are now performed automatically.
Previously it was documented that things like terminating the ``ActorSystem`` should be
done when the cluster member was removed, but this was very difficult to get right.
That is now taken care of automatically. This might result in changed behavior, hopefully
for the better. It might also conflict with your previous shutdown code, so please
read the documentation for the Coordinated Shutdown and revisit your own implementations.
Most likely your implementation will no longer be needed, or it can be simplified.
More information can be found in the :ref:`documentation for Scala <coordinated-shutdown-scala>` or the
:ref:`documentation for Java <coordinated-shutdown-lambda>`.
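As a rough sketch of the kind of change involved (given an ``ActorSystem`` in scope as ``system``), cleanup
that used to be started from a ``registerOnMemberRemoved`` callback can typically be registered as a
shutdown task instead; the phase and task name below are only examples, pick whatever fits your application::

  import akka.Done
  import akka.actor.CoordinatedShutdown
  import scala.concurrent.Future

  CoordinatedShutdown(system).addTask(
    CoordinatedShutdown.PhaseBeforeServiceUnbind, "my-cleanup") { () =>
    // perform application specific cleanup here and complete the Future when done
    Future.successful(Done)
  }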
Cluster Management Command Line Tool
------------------------------------
@ -97,8 +116,8 @@ See documentation of `akka/akka-cluster-management <https://github.com/akka/akka
The command line script for cluster management has been deprecated and is scheduled for removal
in the next major version. Use the HTTP API with `curl <https://curl.haxx.se/>`_ or similar instead.
Akka Persistence
================
Persistence
===========
Removal of PersistentView
-------------------------
@ -123,8 +142,8 @@ non-sharable journal or snapshot store. The proxy is available by setting ``akka
``akka.persistence.snapshot-store.plugin`` to ``akka.persistence.journal.proxy`` or ``akka.persistence.snapshot-store.proxy``,
respectively. The proxy supplants the :ref:`Shared LevelDB journal<shared-leveldb-journal>`.
Akka Persistence Query
======================
Persistence Query
=================
Persistence Query has been promoted to a stable module.
Only slight API changes were made since the module was introduced:

View file

@ -785,6 +785,76 @@ before stopping the target actor. Simple cleanup tasks can be handled in ``postS
within a supervisor you control and only in response to a :class:`Terminated`
message, i.e. not for top-level actors.
.. _coordinated-shutdown-scala:
Coordinated Shutdown
--------------------
There is an extension named ``CoordinatedShutdown`` that will stop certain actors and
services in a specific order and perform registered tasks during the shutdown process.
The order of the shutdown phases is defined in configuration ``akka.coordinated-shutdown.phases``.
The default phases are defined as:
.. includecode:: ../../../akka-actor/src/main/resources/reference.conf#coordinated-shutdown-phases
More phases can be added in the application's configuration if needed by overriding a phase with an
additional ``depends-on``. Especially the phases ``before-service-unbind``, ``before-cluster-shutdown`` and
``before-actor-system-terminate`` are intended for application specific phases or tasks.
The default phases are defined in a single linear order, but the phases can be ordered as a
directed acyclic graph (DAG) by defining the dependencies between the phases.
The phases are ordered with `topological <https://en.wikipedia.org/wiki/Topological_sorting>`_ sort of the DAG.
Tasks can be added to a phase with:
.. includecode:: code/docs/actor/ActorDocSpec.scala#coordinated-shutdown-addTask
The returned ``Future[Done]`` should be completed when the task is completed. The task name parameter
is only used for debugging/logging.
Tasks added to the same phase are executed in parallel without any ordering assumptions.
The next phase will not start until all tasks of the previous phase have been completed.
If tasks are not completed within a configured timeout (see :ref:`reference.conf <config-akka-actor>`)
the next phase will be started anyway. It is possible to configure ``recover=off`` for a phase
to abort the rest of the shutdown process if a task fails or is not completed within the timeout.
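For example, a longer timeout and ``recover=off`` for one of the predefined phases could be configured
roughly like the following sketch in ``application.conf``, assuming the per-phase ``timeout`` and ``recover``
keys used by the default phase definitions (the values are only illustrative)::

  akka.coordinated-shutdown.phases.before-service-unbind {
    timeout = 30 s
    recover = off
  }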
Tasks should typically be registered as early as possible after system startup. When running
the coordinated shutdown, tasks that have been registered will be performed, but tasks that are
added too late will not be run.
To start the coordinated shutdown process you can invoke ``run`` on the ``CoordinatedShutdown``
extension:
.. includecode:: code/docs/actor/ActorDocSpec.scala#coordinated-shutdown-run
It's safe to call the ``run`` method multiple times. It will only run once.
That also means that the ``ActorSystem`` will be terminated in the last phase. By default, the
JVM is not forcefully stopped (it will be stopped if all non-daemon threads have been terminated).
To enable a hard ``System.exit`` as a final action you can configure::
akka.coordinated-shutdown.exit-jvm = on
When using :ref:`Akka Cluster <cluster_usage_scala>` the ``CoordinatedShutdown`` will automatically run
when the cluster node sees itself as ``Exiting``, i.e. leaving from another node will trigger
the shutdown process on the leaving node. Tasks for graceful leaving of the cluster, including graceful
shutdown of Cluster Singletons and Cluster Sharding, are added automatically when Akka Cluster is used,
i.e. running the shutdown process will also trigger the graceful leaving if it's not already in progress.
By default, the ``CoordinatedShutdown`` will be run when the JVM process exits, e.g.
via ``kill SIGTERM`` signal (``SIGINT`` ctrl-c doesn't work). This behavior can be disabled with::
akka.coordinated-shutdown.run-by-jvm-shutdown-hook=off
If you have application specific JVM shutdown hooks it's recommended that you register them via the
``CoordinatedShutdown`` so that they run before Akka's internal shutdown hooks, e.g.
those shutting down Akka Remoting (Artery).
.. includecode:: code/docs/actor/ActorDocSpec.scala#coordinated-shutdown-jvm-hook
.. _Actor.HotSwap:
Become/Unbecome

View file

@ -295,11 +295,8 @@ You can send the message ``ShardRegion.GracefulShutdown`` message to the ``Shard
During this period other regions will buffer messages for those shards in the same way as when a rebalance is
triggered by the coordinator. When the shards have been stopped the coordinator will allocate these shards elsewhere.
When the ``ShardRegion`` has terminated you probably want to ``leave`` the cluster, and shut down the ``ActorSystem``.
This is how to do that:
.. includecode:: ../../../akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala#graceful-shutdown
This is performed automatically by the :ref:`coordinated-shutdown-scala` and is therefore part of the
graceful leaving process of a cluster member.
.. _RemoveInternalClusterShardingData-scala:

View file

@ -182,11 +182,16 @@ It can also be performed programmatically with:
.. includecode:: code/docs/cluster/ClusterDocSpec.scala#leave
Note that this command can be issued to any member in the cluster, not necessarily the
one that is leaving. The cluster extension, but not the actor system or JVM, of the
leaving member will be shutdown after the leader has changed status of the member to
`Exiting`. Thereafter the member will be removed from the cluster. Normally this is handled
automatically, but in case of network failures during this process it might still be necessary
to set the nodes status to ``Down`` in order to complete the removal.
one that is leaving.
The :ref:`coordinated-shutdown-scala` will automatically run when the cluster node sees itself as
``Exiting``, i.e. leaving from another node will trigger the shutdown process on the leaving node.
Tasks for graceful leaving of the cluster, including graceful shutdown of Cluster Singletons and
Cluster Sharding, are added automatically when Akka Cluster is used, i.e. running the shutdown
process will also trigger the graceful leaving if it's not already in progress.
Normally this is handled automatically, but in case of network failures during this process it might still
be necessary to set the node's status to ``Down`` in order to complete the removal.
.. _weakly_up_scala:
@ -353,9 +358,7 @@ How To Cleanup when Member is Removed
You can do some cleanup in a ``registerOnMemberRemoved`` callback, which will
be invoked when the current member status is changed to 'Removed' or the cluster has been shut down.
For example, this is how to shut down the ``ActorSystem`` and thereafter exit the JVM:
.. includecode:: ../../../akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/factorial/FactorialFrontend.scala#registerOnRemoved
An alternative is to register tasks to the :ref:`coordinated-shutdown-scala`.
.. note::
Register an OnMemberRemoved callback on a cluster that has been shut down, the callback will be invoked immediately on

View file

@ -20,6 +20,8 @@ import akka.testkit._
import akka.util._
import scala.concurrent.duration._
import scala.concurrent.Await
import akka.Done
import akka.actor.CoordinatedShutdown
//#my-actor
class MyActor extends Actor {
@ -631,4 +633,30 @@ class ActorDocSpec extends AkkaSpec("""
})
}
"using CoordinatedShutdown" in {
val someActor = system.actorOf(Props(classOf[Replier], this))
//#coordinated-shutdown-addTask
CoordinatedShutdown(system).addTask(
CoordinatedShutdown.PhaseBeforeServiceUnbind, "someTaskName") { () =>
import akka.pattern.ask
import system.dispatcher
implicit val timeout = Timeout(5.seconds)
(someActor ? "stop").map(_ => Done)
}
//#coordinated-shutdown-addTask
//#coordinated-shutdown-jvm-hook
CoordinatedShutdown(system).addJvmShutdownHook { () =>
println("custom JVM shutdown hook...")
}
//#coordinated-shutdown-jvm-hook
// don't run this
def dummy(): Unit = {
//#coordinated-shutdown-run
val done: Future[Done] = CoordinatedShutdown(system).run()
//#coordinated-shutdown-run
}
}
}

View file

@ -33,7 +33,7 @@ object MaxThroughputSpec extends MultiNodeConfig {
commonConfig(debugConfig(on = false).withFallback(
ConfigFactory.parseString(s"""
# for serious measurements you should increase the totalMessagesFactor (20)
akka.test.MaxThroughputSpec.totalMessagesFactor = 1.0
akka.test.MaxThroughputSpec.totalMessagesFactor = 10.0
akka.test.MaxThroughputSpec.real-message = off
akka {
loglevel = INFO

View file

@ -460,7 +460,14 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
private lazy val shutdownHook = new Thread {
override def run(): Unit = {
if (hasBeenShutdown.compareAndSet(false, true)) {
log.debug("Shutting down [{}] via shutdownHook", localAddress)
val coord = CoordinatedShutdown(system)
val totalTimeout = coord.totalTimeout()
if (!coord.jvmHooksLatch.await(totalTimeout.toMillis, TimeUnit.MILLISECONDS))
log.warning(
"CoordinatedShutdown took longer than [{}]. Shutting down [{}] via shutdownHook",
totalTimeout, localAddress)
else
log.debug("Shutting down [{}] via shutdownHook", localAddress)
Await.result(internalShutdown(), settings.Advanced.DriverTimeout + 3.seconds)
}
}

View file

@ -34,39 +34,6 @@ public class FactorialFrontendMain {
});
//#registerOnUp
//#registerOnRemoved
Cluster.get(system).registerOnMemberRemoved(new Runnable() {
@Override
public void run() {
// exit JVM when ActorSystem has been terminated
final Runnable exit = new Runnable() {
@Override public void run() {
System.exit(0);
}
};
system.registerOnTermination(exit);
// shut down ActorSystem
system.terminate();
// In case ActorSystem shutdown takes longer than 10 seconds,
// exit the JVM forcefully anyway.
// We must spawn a separate thread to not block current thread,
// since that would have blocked the shutdown of the ActorSystem.
new Thread() {
@Override public void run(){
try {
Await.ready(system.whenTerminated(), Duration.create(10, TimeUnit.SECONDS));
} catch (Exception e) {
System.exit(-1);
}
}
}.start();
}
});
//#registerOnRemoved
}
}

View file

@ -60,25 +60,5 @@ object FactorialFrontend {
}
//#registerOnUp
//#registerOnRemoved
Cluster(system).registerOnMemberRemoved {
// exit JVM when ActorSystem has been terminated
system.registerOnTermination(System.exit(0))
// shut down ActorSystem
system.terminate()
// In case ActorSystem shutdown takes longer than 10 seconds,
// exit the JVM forcefully anyway.
// We must spawn a separate thread to not block current thread,
// since that would have blocked the shutdown of the ActorSystem.
new Thread {
override def run(): Unit = {
if (Try(Await.ready(system.whenTerminated, 10.seconds)).isFailure)
System.exit(-1)
}
}.start()
}
//#registerOnRemoved
}
}

View file

@ -10,7 +10,7 @@ import akka.stream.impl.StreamLayout.Module
import akka.stream.impl._
import akka.stream.impl.fusing._
import akka.stream.stage._
import org.reactivestreams.{Processor, Publisher, Subscriber, Subscription}
import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription }
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable

View file

@ -82,6 +82,11 @@ object MiMa extends AutoPlugin {
import com.typesafe.tools.mima.core._
val bcIssuesBetween24and25 = Seq(
// #21537 coordinated shutdown
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterCoreDaemon.removed"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Gossip.convergence"),
// #21423 removal of deprecated stages (in 2.5.x)
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.Source.transform"),
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.SubSource.transform"),