diff --git a/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala new file mode 100644 index 0000000000..5ef3f7789f --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala @@ -0,0 +1,296 @@ +/** + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.actor + +import scala.concurrent.duration._ +import scala.concurrent.Await +import scala.concurrent.Future + +import akka.Done +import akka.testkit.AkkaSpec +import com.typesafe.config.ConfigFactory +import akka.actor.CoordinatedShutdown.Phase +import scala.concurrent.Promise +import java.util.concurrent.TimeoutException + +class CoordinatedShutdownSpec extends AkkaSpec { + + def extSys = system.asInstanceOf[ExtendedActorSystem] + + // some convenience to make the test readable + def phase(dependsOn: String*): Phase = Phase(dependsOn.toSet, timeout = 10.seconds, recover = true) + val emptyPhase: Phase = Phase(Set.empty, timeout = 10.seconds, recover = true) + + private def checkTopologicalSort(phases: Map[String, Phase]): List[String] = { + val result = CoordinatedShutdown.topologicalSort(phases) + result.zipWithIndex.foreach { + case (phase, i) ⇒ + phases.get(phase) match { + case Some(Phase(dependsOn, _, _)) ⇒ + dependsOn.foreach { depPhase ⇒ + withClue(s"phase [$phase] depends on [$depPhase] but was ordered before it in topological sort result $result") { + i should be > result.indexOf(depPhase) + } + } + case None ⇒ // ok + } + } + result + } + + "CoordinatedShutdown" must { + + "sort phases in topolgical order" in { + checkTopologicalSort(Map.empty) should ===(Nil) + + checkTopologicalSort(Map( + "a" → emptyPhase)) should ===(List("a")) + + checkTopologicalSort(Map( + "b" → phase("a"))) should ===(List("a", "b")) + + val result1 = checkTopologicalSort(Map( + "c" → phase("a"), "b" → phase("a"))) + result1.head should ===("a") + // b, c can be in any order + result1.toSet should 
===(Set("a", "b", "c")) + + checkTopologicalSort(Map( + "b" → phase("a"), "c" → phase("b"))) should ===(List("a", "b", "c")) + + checkTopologicalSort(Map( + "b" → phase("a"), "c" → phase("a", "b"))) should ===(List("a", "b", "c")) + + val result2 = checkTopologicalSort(Map( + "c" → phase("a", "b"))) + result2.last should ===("c") + // a, b can be in any order + result2.toSet should ===(Set("a", "b", "c")) + + checkTopologicalSort(Map( + "b" → phase("a"), "c" → phase("b"), "d" → phase("b", "c"), + "e" → phase("d"))) should ===( + List("a", "b", "c", "d", "e")) + + val result3 = checkTopologicalSort(Map( + "a2" → phase("a1"), "a3" → phase("a2"), + "b2" → phase("b1"), "b3" → phase("b2"))) + val (a, b) = result3.partition(_.charAt(0) == 'a') + a should ===(List("a1", "a2", "a3")) + b should ===(List("b1", "b2", "b3")) + } + + "detect cycles in phases (non-DAG)" in { + intercept[IllegalArgumentException] { + CoordinatedShutdown.topologicalSort(Map( + "a" → phase("a"))) + } + + intercept[IllegalArgumentException] { + CoordinatedShutdown.topologicalSort(Map( + "b" → phase("a"), "a" → phase("b"))) + } + + intercept[IllegalArgumentException] { + CoordinatedShutdown.topologicalSort(Map( + "c" → phase("a"), "c" → phase("b"), "b" → phase("c"))) + } + + intercept[IllegalArgumentException] { + CoordinatedShutdown.topologicalSort(Map( + "d" → phase("a"), "d" → phase("c"), "c" → phase("b"), "b" → phase("d"))) + } + + } + + "have pre-defined phases from config" in { + import CoordinatedShutdown._ + CoordinatedShutdown(system).orderedPhases should ===(List( + PhaseBeforeServiceUnbind, + PhaseServiceUnbind, + PhaseServiceRequestsDone, + PhaseServiceStop, + PhaseBeforeClusterShutdown, + PhaseClusterShardingShutdownRegion, + PhaseClusterLeave, + PhaseClusterExiting, + PhaseClusterExitingDone, + PhaseClusterShutdown, + PhaseBeforeActorSystemTerminate, + PhaseActorSystemTerminate)) + } + + "run ordered phases" in { + import system.dispatcher + val phases = Map( + "a" → emptyPhase, + "b" 
→ phase("a"), + "c" → phase("b", "a")) + val co = new CoordinatedShutdown(extSys, phases) + co.addTask("a", "a1") { () ⇒ + testActor ! "A" + Future.successful(Done) + } + co.addTask("b", "b1") { () ⇒ + testActor ! "B" + Future.successful(Done) + } + co.addTask("b", "b2") { () ⇒ + Future { + // to verify that c is not performed before b + Thread.sleep(100) + testActor ! "B" + Done + } + } + co.addTask("c", "c1") { () ⇒ + testActor ! "C" + Future.successful(Done) + } + Await.result(co.run(), remainingOrDefault) + receiveN(4) should ===(List("A", "B", "B", "C")) + } + + "run from a given phase" in { + import system.dispatcher + val phases = Map( + "a" → emptyPhase, + "b" → phase("a"), + "c" → phase("b", "a")) + val co = new CoordinatedShutdown(extSys, phases) + co.addTask("a", "a1") { () ⇒ + testActor ! "A" + Future.successful(Done) + } + co.addTask("b", "b1") { () ⇒ + testActor ! "B" + Future.successful(Done) + } + co.addTask("c", "c1") { () ⇒ + testActor ! "C" + Future.successful(Done) + } + Await.result(co.run(Some("b")), remainingOrDefault) + receiveN(2) should ===(List("B", "C")) + } + + "only run once" in { + import system.dispatcher + val phases = Map("a" → emptyPhase) + val co = new CoordinatedShutdown(extSys, phases) + co.addTask("a", "a1") { () ⇒ + testActor ! "A" + Future.successful(Done) + } + Await.result(co.run(), remainingOrDefault) + expectMsg("A") + Await.result(co.run(), remainingOrDefault) + testActor ! "done" + expectMsg("done") // no additional A + } + + "continue after timeout or failure" in { + import system.dispatcher + val phases = Map( + "a" → emptyPhase, + "b" → Phase(dependsOn = Set("a"), timeout = 100.millis, recover = true), + "c" → phase("b", "a")) + val co = new CoordinatedShutdown(extSys, phases) + co.addTask("a", "a1") { () ⇒ + testActor ! 
"A" + Future.failed(new RuntimeException("boom")) + } + co.addTask("a", "a2") { () ⇒ + Future { + // to verify that b is not performed before a also in case of failure + Thread.sleep(100) + testActor ! "A" + Done + } + } + co.addTask("b", "b1") { () ⇒ + testActor ! "B" + Promise[Done]().future // never completed + } + co.addTask("c", "c1") { () ⇒ + testActor ! "C" + Future.successful(Done) + } + Await.result(co.run(), remainingOrDefault) + expectMsg("A") + expectMsg("A") + expectMsg("B") + expectMsg("C") + } + + "abort if recover=off" in { + import system.dispatcher + val phases = Map( + "b" → Phase(dependsOn = Set("a"), timeout = 100.millis, recover = false), + "c" → phase("b", "a")) + val co = new CoordinatedShutdown(extSys, phases) + co.addTask("b", "b1") { () ⇒ + testActor ! "B" + Promise[Done]().future // never completed + } + co.addTask("c", "c1") { () ⇒ + testActor ! "C" + Future.successful(Done) + } + val result = co.run() + expectMsg("B") + intercept[TimeoutException] { + Await.result(result, remainingOrDefault) + } + expectNoMsg(200.millis) // C not run + } + + "be possible to add tasks in later phase from task in earlier phase" in { + import system.dispatcher + val phases = Map( + "a" → emptyPhase, + "b" → phase("a")) + val co = new CoordinatedShutdown(extSys, phases) + co.addTask("a", "a1") { () ⇒ + testActor ! "A" + co.addTask("b", "b1") { () ⇒ + testActor ! 
"B" + Future.successful(Done) + } + Future.successful(Done) + } + Await.result(co.run(), remainingOrDefault) + expectMsg("A") + expectMsg("B") + } + + "parse phases from config" in { + CoordinatedShutdown.phasesFromConfig(ConfigFactory.parseString(""" + default-phase-timeout = 10s + phases { + a = {} + b { + depends-on = [a] + timeout = 15s + } + c { + depends-on = [a, b] + recover = off + } + } + """)) should ===(Map( + "a" → Phase(dependsOn = Set.empty, timeout = 10.seconds, recover = true), + "b" → Phase(dependsOn = Set("a"), timeout = 15.seconds, recover = true), + "c" → Phase(dependsOn = Set("a", "b"), timeout = 10.seconds, recover = false))) + } + + // this must be the last test, since it terminates the ActorSystem + "terminate ActorSystem" in { + Await.result(CoordinatedShutdown(system).run(), 10.seconds) should ===(Done) + system.whenTerminated.isCompleted should ===(true) + } + + } + +} diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 004829290a..0704cccc03 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -907,4 +907,115 @@ akka { } + # CoordinatedShutdown is an extension that will perform registered + # tasks in the order that is defined by the phases. It is started + # by calling CoordinatedShutdown(system).run(). This can be triggered + # by different things, for example: + # - JVM shutdown hook will by default run CoordinatedShutdown + # - Cluster node will automatically run CoordinatedShutdown when it + # sees itself as Exiting + # - A management console or other application specific command can + # run CoordinatedShutdown + coordinated-shutdown { + # The timeout that will be used for a phase if not specified with + # 'timeout' in the phase + default-phase-timeout = 5 s + + # Terminate the ActorSystem in the last phase actor-system-terminate. 
+ terminate-actor-system = on + + # Exit the JVM (System.exit(0)) in the last phase actor-system-terminate + # if this is set to 'on'. It is done after termination of the + # ActorSystem if terminate-actor-system=on, otherwise it is done + # immediately when the last phase is reached. + exit-jvm = off + + # Run the coordinated shutdown when the JVM process exits, e.g. + # via kill SIGTERM signal (SIGINT ctrl-c doesn't work). + run-by-jvm-shutdown-hook = on + + #//#coordinated-shutdown-phases + # CoordinatedShutdown will run the tasks that are added to these + # phases. The phases can be ordered as a DAG by defining the + # dependencies between the phases. + # Each phase is defined as a named config section with the + # following optional properties: + # - timeout=15s: Override the default-phase-timeout for this phase. + # - recover=off: If the phase fails the shutdown is aborted + # and depending phases will not be executed. + # - depends-on=[]: Run the phase after the given phases + phases { + + # The first pre-defined phase that applications can add tasks to. + # Note that more phases can be added in the application's + # configuration by overriding this phase with an additional + # depends-on. + before-service-unbind { + } + + # Stop accepting new incoming requests, for example in HTTP. + service-unbind { + depends-on = [before-service-unbind] + } + + # Wait for requests that are in progress to be completed. + service-requests-done { + depends-on = [service-unbind] + } + + # Final shutdown of service endpoints. + service-stop { + depends-on = [service-requests-done] + } + + # Phase for custom application tasks that are to be run + # after service shutdown and before cluster shutdown. + before-cluster-shutdown { + depends-on = [service-stop] + } + + # Graceful shutdown of the Cluster Sharding regions. + cluster-sharding-shutdown-region { + timeout = 10 s + depends-on = [before-cluster-shutdown] + } + + # Emit the leave command for the node that is shutting down. 
+ cluster-leave { + depends-on = [cluster-sharding-shutdown-region] + } + + # Shut down cluster singletons + cluster-exiting { + timeout = 10 s + depends-on = [cluster-leave] + } + + # Wait until exiting has been completed + cluster-exiting-done { + depends-on = [cluster-exiting] + } + + # Shut down the cluster extension + cluster-shutdown { + depends-on = [cluster-exiting-done] + } + + # Phase for custom application tasks that are to be run + # after cluster shutdown and before ActorSystem termination. + before-actor-system-terminate { + depends-on = [cluster-shutdown] + } + + # Last phase. See terminate-actor-system and exit-jvm above. + # Don't add phases that depend on this phase because the + # dispatcher and scheduler of the ActorSystem have been shut down. + actor-system-terminate { + timeout = 10 s + depends-on = [before-actor-system-terminate] + } + } + #//#coordinated-shutdown-phases + } + } diff --git a/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala b/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala new file mode 100644 index 0000000000..fcb30d7f25 --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala @@ -0,0 +1,404 @@ +/** + * Copyright (C) 2016 Lightbend Inc. 
+ */ +package akka.actor + +import scala.concurrent.duration._ +import scala.compat.java8.FutureConverters._ +import scala.compat.java8.OptionConverters._ +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.TimeUnit.MILLISECONDS + +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.concurrent.Promise + +import akka.Done +import com.typesafe.config.Config +import scala.concurrent.duration.FiniteDuration +import scala.annotation.tailrec +import com.typesafe.config.ConfigFactory +import akka.pattern.after +import java.util.concurrent.TimeoutException +import scala.util.control.NonFatal +import akka.event.Logging +import akka.dispatch.ExecutionContexts +import java.util.concurrent.Executors +import scala.util.Try +import scala.concurrent.Await +import java.util.concurrent.CountDownLatch +import java.util.concurrent.atomic.AtomicReference +import java.util.function.Supplier +import java.util.concurrent.CompletionStage +import java.util.Optional + +object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with ExtensionIdProvider { + val PhaseBeforeServiceUnbind = "before-service-unbind" + val PhaseServiceUnbind = "service-unbind" + val PhaseServiceRequestsDone = "service-requests-done" + val PhaseServiceStop = "service-stop" + val PhaseBeforeClusterShutdown = "before-cluster-shutdown" + val PhaseClusterShardingShutdownRegion = "cluster-sharding-shutdown-region" + val PhaseClusterLeave = "cluster-leave" + val PhaseClusterExiting = "cluster-exiting" + val PhaseClusterExitingDone = "cluster-exiting-done" + val PhaseClusterShutdown = "cluster-shutdown" + val PhaseBeforeActorSystemTerminate = "before-actor-system-terminate" + val PhaseActorSystemTerminate = "actor-system-terminate" + + @volatile private var runningJvmHook = false + + override def get(system: ActorSystem): CoordinatedShutdown = super.get(system) + + override def lookup = CoordinatedShutdown 
+ + override def createExtension(system: ExtendedActorSystem): CoordinatedShutdown = { + val conf = system.settings.config.getConfig("akka.coordinated-shutdown") + val phases = phasesFromConfig(conf) + val coord = new CoordinatedShutdown(system, phases) + initPhaseActorSystemTerminate(system, conf, coord) + initJvmHook(system, conf, coord) + coord + } + + private def initPhaseActorSystemTerminate(system: ActorSystem, conf: Config, coord: CoordinatedShutdown): Unit = { + val terminateActorSystem = conf.getBoolean("terminate-actor-system") + val exitJvm = conf.getBoolean("exit-jvm") + if (terminateActorSystem || exitJvm) { + coord.addTask(PhaseActorSystemTerminate, "terminate-system") { () ⇒ + if (exitJvm && terminateActorSystem) { + // In case ActorSystem shutdown takes longer than the phase timeout, + // exit the JVM forcefully anyway. + // We must spawn a separate thread to not block current thread, + // since that would have blocked the shutdown of the ActorSystem. + val timeout = coord.timeout(PhaseActorSystemTerminate) + val t = new Thread { + override def run(): Unit = { + if (Try(Await.ready(system.whenTerminated, timeout)).isFailure && !runningJvmHook) + System.exit(0) + } + } + t.setName("CoordinatedShutdown-exit") + t.start() + } + + if (terminateActorSystem) { + system.terminate().map { _ ⇒ + if (exitJvm && !runningJvmHook) System.exit(0) + Done + }(ExecutionContexts.sameThreadExecutionContext) + } else if (exitJvm) { + System.exit(0) + Future.successful(Done) + } else + Future.successful(Done) + } + } + } + + private def initJvmHook(system: ActorSystem, conf: Config, coord: CoordinatedShutdown): Unit = { + val runByJvmShutdownHook = conf.getBoolean("run-by-jvm-shutdown-hook") + if (runByJvmShutdownHook) { + coord.addJvmShutdownHook { () ⇒ + runningJvmHook = true // avoid System.exit from PhaseActorSystemTerminate task + if (!system.whenTerminated.isCompleted) { + coord.log.info("Starting coordinated shutdown from JVM shutdown hook") + try + 
Await.ready(coord.run(), coord.totalTimeout()) + catch { + case NonFatal(e) ⇒ + coord.log.warning( + "CoordinatedShutdown from JVM shutdown failed: {}", + e.getMessage) + } + } + } + } + } + + /** + * INTERNAL API + */ + private[akka] final case class Phase(dependsOn: Set[String], timeout: FiniteDuration, recover: Boolean) + + /** + * INTERNAL API + */ + private[akka] def phasesFromConfig(conf: Config): Map[String, Phase] = { + import scala.collection.JavaConverters._ + val defaultPhaseTimeout = conf.getString("default-phase-timeout") + val phasesConf = conf.getConfig("phases") + val defaultPhaseConfig = ConfigFactory.parseString(s""" + timeout = $defaultPhaseTimeout + recover = true + depends-on = [] + """) + phasesConf.root.unwrapped.asScala.toMap.map { + case (k, _: java.util.Map[_, _]) ⇒ + val c = phasesConf.getConfig(k).withFallback(defaultPhaseConfig) + val dependsOn = c.getStringList("depends-on").asScala.toSet + val timeout = c.getDuration("timeout", MILLISECONDS).millis + val recover = c.getBoolean("recover") + k → Phase(dependsOn, timeout, recover) + case (k, v) ⇒ + throw new IllegalArgumentException(s"Expected object value for [$k], got [$v]") + } + } + + /** + * INTERNAL API: https://en.wikipedia.org/wiki/Topological_sorting + */ + private[akka] def topologicalSort(phases: Map[String, Phase]): List[String] = { + var result = List.empty[String] + var unmarked = phases.keySet ++ phases.values.flatMap(_.dependsOn) // in case phase is not defined as key + var tempMark = Set.empty[String] // for detecting cycles + + while (unmarked.nonEmpty) { + depthFirstSearch(unmarked.head) + } + + def depthFirstSearch(u: String): Unit = { + if (tempMark(u)) + throw new IllegalArgumentException("Cycle detected in graph of phases. It must be a DAG. " + + s"phase [$u] depends transitively on itself. 
All dependencies: $phases") + if (unmarked(u)) { + tempMark += u + phases.get(u) match { + case Some(Phase(dependsOn, _, _)) ⇒ dependsOn.foreach(depthFirstSearch) + case None ⇒ + } + unmarked -= u // permanent mark + tempMark -= u + result = u :: result + } + } + + result.reverse + } + +} + +final class CoordinatedShutdown private[akka] ( + system: ExtendedActorSystem, + phases: Map[String, CoordinatedShutdown.Phase]) extends Extension { + import CoordinatedShutdown.Phase + + /** INTERNAL API */ + private[akka] val log = Logging(system, getClass) + private val knownPhases = phases.keySet ++ phases.values.flatMap(_.dependsOn) + /** INTERNAL API */ + private[akka] val orderedPhases = CoordinatedShutdown.topologicalSort(phases) + private val tasks = new ConcurrentHashMap[String, Vector[(String, () ⇒ Future[Done])]] + private val runStarted = new AtomicBoolean(false) + private val runPromise = Promise[Done]() + + private val _jvmHooksLatch = new AtomicReference[CountDownLatch](new CountDownLatch(0)) // latch is swapped via compareAndSet on this reference; the field itself is never reassigned + + /** + * INTERNAL API + */ + private[akka] def jvmHooksLatch: CountDownLatch = _jvmHooksLatch.get + + /** + * Scala API: Add a task to a phase. It doesn't remove previously added tasks. + * Tasks added to the same phase are executed in parallel without any + * ordering assumptions. Next phase will not start until all tasks of + * previous phase have been completed. + * + * Tasks should typically be registered as early as possible after system + * startup. When running the coordinated shutdown tasks that have been registered + * will be performed but tasks that are added too late will not be run. + * It is possible to add a task to a later phase by a task in an earlier phase + * and it will be performed. + */ + @tailrec def addTask(phase: String, taskName: String)(task: () ⇒ Future[Done]): Unit = { + require( + knownPhases(phase), + s"Unknown phase [$phase], known phases [$knownPhases]. 
" + + "All phases (along with their optional dependencies) must be defined in configuration") + val current = tasks.get(phase) + if (current == null) { + if (tasks.putIfAbsent(phase, Vector(taskName → task)) != null) + addTask(phase, taskName)(task) // CAS failed, retry + } else { + if (!tasks.replace(phase, current, current :+ (taskName → task))) + addTask(phase, taskName)(task) // CAS failed, retry + } + } + + /** + * Java API: Add a task to a phase. It doesn't remove previously added tasks. + * Tasks added to the same phase are executed in parallel without any + * ordering assumptions. Next phase will not start until all tasks of + * previous phase have been completed. + * + * Tasks should typically be registered as early as possible after system + * startup. When running the coordinated shutdown tasks that have been registered + * will be performed but tasks that are added too late will not be run. + * It is possible to add a task to a later phase by a task in an earlier phase + * and it will be performed. + */ + def addTask(phase: String, taskName: String, task: Supplier[CompletionStage[Done]]): Unit = + addTask(phase, taskName)(() ⇒ task.get().toScala) + + /** + * Scala API: Run tasks of all phases. The returned + * `Future` is completed when all tasks have been completed, + * or there is a failure when recovery is disabled. + * + * It's safe to call this method multiple times. It will only run once. + */ + def run(): Future[Done] = run(None) + + /** + * Java API: Run tasks of all phases. The returned + * `CompletionStage` is completed when all tasks have been completed, + * or there is a failure when recovery is disabled. + * + * It's safe to call this method multiple times. It will only run once. + */ + def runAll(): CompletionStage[Done] = run().toJava + + /** + * Scala API: Run tasks of all phases including and after the given phase. 
+ * The returned `Future` is completed when all such tasks have been completed, + * or there is a failure when recovery is disabled. + * + * It's safe to call this method multiple times. It will only run once. + */ + def run(fromPhase: Option[String]): Future[Done] = { + if (runStarted.compareAndSet(false, true)) { + import system.dispatcher + val debugEnabled = log.isDebugEnabled + def loop(remainingPhases: List[String]): Future[Done] = { + remainingPhases match { + case Nil ⇒ Future.successful(Done) + case phase :: remaining ⇒ + val phaseResult = (tasks.get(phase) match { + case null ⇒ + if (debugEnabled) log.debug("Performing phase [{}] with [0] tasks", phase) + Future.successful(Done) + case tasks ⇒ + if (debugEnabled) log.debug( + "Performing phase [{}] with [{}] tasks: [{}]", + phase, tasks.size, tasks.map { case (taskName, _) ⇒ taskName }.mkString(", ")) + // note that tasks within same phase are performed in parallel + val recoverEnabled = phases(phase).recover + val result = Future.sequence(tasks.map { + case (taskName, task) ⇒ + try { + val r = task.apply() + if (recoverEnabled) r.recover { + case NonFatal(e) ⇒ + log.warning("Task [{}] failed in phase [{}]: {}", taskName, phase, e.getMessage) + Done + } + else r + } catch { + case NonFatal(e) ⇒ + // in case task.apply throws + if (recoverEnabled) { + log.warning("Task [{}] failed in phase [{}]: {}", taskName, phase, e.getMessage) + Future.successful(Done) + } else + Future.failed(e) + } + }).map(_ ⇒ Done)(ExecutionContexts.sameThreadExecutionContext) + val timeout = phases(phase).timeout + val deadline = Deadline.now + timeout + val timeoutFut = after(timeout, system.scheduler) { + if (phase == CoordinatedShutdown.PhaseActorSystemTerminate && deadline.hasTimeLeft) { + // too early, i.e. 
triggered by system termination + result + } else if (result.isCompleted) + Future.successful(Done) + else if (recoverEnabled) { + log.warning("Coordinated shutdown phase [{}] timed out after {}", phase, timeout) + Future.successful(Done) + } else + Future.failed( + new TimeoutException(s"Coordinated shutdown phase [$phase] timed out after $timeout")) + } + Future.firstCompletedOf(List(result, timeoutFut)) + }) + if (remaining.isEmpty) + phaseResult // avoid flatMap when system terminated in last phase + else + phaseResult.flatMap(_ ⇒ loop(remaining)) + } + } + + val remainingPhases = fromPhase match { + case None ⇒ orderedPhases // all + case Some(p) ⇒ orderedPhases.dropWhile(_ != p) + } + val done = loop(remainingPhases) + runPromise.completeWith(done) + } + runPromise.future + } + + /** + * Java API: Run tasks of all phases including and after the given phase. + * The returned `CompletionStage` is completed when all such tasks have been completed, + * or there is a failure when recovery is disabled. + * + * It's safe to call this method multiple times. It will only run once. + */ + def run(fromPhase: Optional[String]): CompletionStage[Done] = + run(fromPhase.asScala).toJava + + /** + * The configured timeout for a given `phase`. + * For example useful as timeout when actor `ask` requests + * is used as a task. + */ + def timeout(phase: String): FiniteDuration = + phases.get(phase) match { + case Some(Phase(_, timeout, _)) ⇒ timeout + case None ⇒ + throw new IllegalArgumentException(s"Unknown phase [$phase]. All phases must be defined in configuration") + } + + /** + * Sum of timeouts of all phases that have some task. + */ + def totalTimeout(): FiniteDuration = { + import scala.collection.JavaConverters._ + tasks.keySet.asScala.foldLeft(Duration.Zero) { + case (acc, phase) ⇒ acc + timeout(phase) + } + } + + /** + * Scala API: Add a JVM shutdown hook that will be run when the JVM process + * begins its shutdown sequence. 
Added hooks may run in any order + * concurrently, but they are running before Akka internal shutdown + * hooks, e.g. those shutting down Artery. + */ + @tailrec def addJvmShutdownHook(hook: () ⇒ Unit): Unit = { + if (!runStarted.get) { + val currentLatch = _jvmHooksLatch.get + val newLatch = new CountDownLatch(currentLatch.getCount.toInt + 1) + if (_jvmHooksLatch.compareAndSet(currentLatch, newLatch)) { + Runtime.getRuntime.addShutdownHook(new Thread { + override def run(): Unit = { + try hook() finally _jvmHooksLatch.get.countDown() + } + }) + } else + addJvmShutdownHook(hook) // lost CAS, retry + } + } + + /** + * Java API: Add a JVM shutdown hook that will be run when the JVM process + * begins its shutdown sequence. Added hooks may run in any order + * concurrently, but they are running before Akka internal shutdown + * hooks, e.g. those shutting down Artery. + */ + def addJvmShutdownHook(hook: Runnable): Unit = + addJvmShutdownHook(() ⇒ hook.run()) + +} diff --git a/akka-cluster-metrics/src/test/resources/reference.conf b/akka-cluster-metrics/src/test/resources/reference.conf index 1eca761d06..eebfbdd0a2 100644 --- a/akka-cluster-metrics/src/test/resources/reference.conf +++ b/akka-cluster-metrics/src/test/resources/reference.conf @@ -5,3 +5,7 @@ akka { warn-about-java-serializer-usage = off } } + +akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off +akka.coordinated-shutdown.terminate-actor-system = off +akka.cluster.run-coordinated-shutdown-when-down = off diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index b009ebd99f..e8ce32f3e6 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -237,6 +237,7 @@ object ShardCoordinator { * `ShardRegion` requests full handoff to be able to shutdown 
gracefully. */ @SerialVersionUID(1L) final case class GracefulShutdownReq(shardRegion: ActorRef) extends CoordinatorCommand + with DeadLetterSuppression // DomainEvents for the persistent state of the event sourced ShardCoordinator sealed trait DomainEvent extends ClusterShardingSerializable diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index f878580641..c5b2982f8d 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -18,6 +18,8 @@ import scala.collection.immutable import scala.concurrent.duration._ import scala.concurrent.Future import scala.reflect.ClassTag +import scala.concurrent.Promise +import akka.Done /** * @see [[ClusterSharding$ ClusterSharding extension]] @@ -372,6 +374,15 @@ class ShardRegion( val retryTask = context.system.scheduler.schedule(retryInterval, retryInterval, self, Retry) var retryCount = 0 + // for CoordinatedShutdown + val gracefulShutdownProgress = Promise[Done]() + CoordinatedShutdown(context.system).addTask( + CoordinatedShutdown.PhaseClusterShardingShutdownRegion, + "region-shutdown") { () ⇒ + self ! 
GracefulShutdown + gracefulShutdownProgress.future + } + // subscribe to MemberEvent, re-subscribe when restart override def preStart(): Unit = { cluster.subscribe(self, classOf[MemberEvent]) @@ -380,6 +391,7 @@ class ShardRegion( override def postStop(): Unit = { super.postStop() cluster.unsubscribe(self) + gracefulShutdownProgress.trySuccess(Done) retryTask.cancel() } @@ -391,6 +403,14 @@ class ShardRegion( def coordinatorSelection: Option[ActorSelection] = membersByAge.headOption.map(m ⇒ context.actorSelection(RootActorPath(m.address) + coordinatorPath)) + /** + * When leaving the coordinator singleton is started rather quickly on next + * oldest node and therefore it is good to send the GracefulShutdownReq to + * the likely locations of the coordinator. + */ + def gracefulShutdownCoordinatorSelections: List[ActorSelection] = + membersByAge.take(2).toList.map(m ⇒ context.actorSelection(RootActorPath(m.address) + coordinatorPath)) + var coordinator: Option[ActorRef] = None def changeMembers(newMembers: immutable.SortedSet[Member]): Unit = { @@ -603,8 +623,9 @@ class ShardRegion( } private def tryCompleteGracefulShutdown() = - if (gracefulShutdownInProgress && shards.isEmpty && shardBuffers.isEmpty) + if (gracefulShutdownInProgress && shards.isEmpty && shardBuffers.isEmpty) { context.stop(self) // all shards have been rebalanced, complete graceful shutdown + } def register(): Unit = { coordinatorSelection.foreach(_ ! registrationMessage) @@ -755,7 +776,8 @@ class ShardRegion( } } - def sendGracefulShutdownToCoordinator(): Unit = + def sendGracefulShutdownToCoordinator(): Unit = { if (gracefulShutdownInProgress) - coordinator.foreach(_ ! GracefulShutdownReq(self)) + gracefulShutdownCoordinatorSelections.foreach(_ ! 
GracefulShutdownReq(self)) + } } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala index a115146568..006bc62f29 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala @@ -38,32 +38,6 @@ object ClusterShardingGracefulShutdownSpec { case id: Int ⇒ id.toString } - //#graceful-shutdown - class IllustrateGracefulShutdown extends Actor { - val system = context.system - val cluster = Cluster(system) - val region = ClusterSharding(system).shardRegion("Entity") - - def receive = { - case "leave" ⇒ - context.watch(region) - region ! ShardRegion.GracefulShutdown - - case Terminated(`region`) ⇒ - cluster.registerOnMemberRemoved(self ! "member-removed") - cluster.leave(cluster.selfAddress) - - case "member-removed" ⇒ - // Let singletons hand over gracefully before stopping the system - import context.dispatcher - system.scheduler.scheduleOnce(10.seconds, self, "stop-system") - - case "stop-system" ⇒ - system.terminate() - } - } - //#graceful-shutdown - } abstract class ClusterShardingGracefulShutdownSpecConfig(val mode: String) extends MultiNodeConfig { diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala index 30db523f82..01d6a3be5d 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingLeavingSpec.scala @@ -184,7 +184,11 @@ abstract class ClusterShardingLeavingSpec(config: ClusterShardingLeavingSpecConf enterBarrier("after-3") } - "recover 
after leaving coordinator node" in within(30.seconds) { + "recover after leaving coordinator node" in { + system.actorSelection(node(first) / "user" / "shardLocations") ! GetLocations + val Locations(originalLocations) = expectMsgType[Locations] + val firstAddress = node(first).address + runOn(third) { cluster.leave(node(first).address) } @@ -196,18 +200,17 @@ abstract class ClusterShardingLeavingSpec(config: ClusterShardingLeavingSpecConf enterBarrier("stopped") runOn(second, third, fourth) { - system.actorSelection(node(first) / "user" / "shardLocations") ! GetLocations - val Locations(locations) = expectMsgType[Locations] - val firstAddress = node(first).address - awaitAssert { - val probe = TestProbe() - locations.foreach { - case (id, ref) ⇒ - region.tell(Ping(id), probe.ref) - if (ref.path.address == firstAddress) - probe.expectMsgType[ActorRef](1.second) should not be (ref) - else - probe.expectMsg(1.second, ref) // should not move + within(15.seconds) { + awaitAssert { + val probe = TestProbe() + originalLocations.foreach { + case (id, ref) ⇒ + region.tell(Ping(id), probe.ref) + if (ref.path.address == firstAddress) + probe.expectMsgType[ActorRef](1.second) should not be (ref) + else + probe.expectMsg(1.second, ref) // should not move + } } } } diff --git a/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java b/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java index f4ea9e8ce4..bf93ee3c3f 100644 --- a/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java +++ b/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java @@ -184,37 +184,6 @@ public class ClusterShardingTest { //#counter-actor - static//#graceful-shutdown - public class IllustrateGracefulShutdown extends AbstractActor { - - public IllustrateGracefulShutdown() { - final ActorSystem system = context().system(); - final Cluster cluster = Cluster.get(system); - final ActorRef region = 
ClusterSharding.get(system).shardRegion("Entity"); - - receive(ReceiveBuilder. - match(String.class, s -> s.equals("leave"), s -> { - context().watch(region); - region.tell(ShardRegion.gracefulShutdownInstance(), self()); - }). - match(Terminated.class, t -> t.actor().equals(region), t -> { - cluster.registerOnMemberRemoved(() -> - self().tell("member-removed", self())); - cluster.leave(cluster.selfAddress()); - }). - match(String.class, s -> s.equals("member-removed"), s -> { - // Let singletons hand over gracefully before stopping the system - context().system().scheduler().scheduleOnce(Duration.create(10, SECONDS), - self(), "stop-system", context().dispatcher(), self()); - }). - match(String.class, s -> s.equals("stop-system"), s -> { - system.terminate(); - }). - build()); - } - } - //#graceful-shutdown - static//#supervisor public class CounterSupervisor extends UntypedActor { diff --git a/akka-cluster-sharding/src/test/resources/reference.conf b/akka-cluster-sharding/src/test/resources/reference.conf index 609cc78f1a..eebfbdd0a2 100644 --- a/akka-cluster-sharding/src/test/resources/reference.conf +++ b/akka-cluster-sharding/src/test/resources/reference.conf @@ -4,4 +4,8 @@ akka { serialize-messages = on warn-about-java-serializer-usage = off } -} \ No newline at end of file +} + +akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off +akka.coordinated-shutdown.terminate-actor-system = off +akka.cluster.run-coordinated-shutdown-when-down = off diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RemoveInternalClusterShardingDataSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RemoveInternalClusterShardingDataSpec.scala index e90c9eaa6c..2cfd35c2a6 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RemoveInternalClusterShardingDataSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RemoveInternalClusterShardingDataSpec.scala @@ -29,6 +29,7 @@ object 
RemoveInternalClusterShardingDataSpec { akka.loglevel = INFO akka.actor.provider = "cluster" akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" akka.persistence.journal.leveldb { native = off diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala index 6b4a1a338c..1e5f81c304 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala @@ -25,6 +25,11 @@ import akka.AkkaException import akka.actor.NoSerializationVerificationNeeded import akka.cluster.UniqueAddress import akka.cluster.ClusterEvent +import scala.concurrent.Promise +import akka.Done +import akka.actor.CoordinatedShutdown +import akka.pattern.ask +import akka.util.Timeout object ClusterSingletonManagerSettings { @@ -196,6 +201,7 @@ object ClusterSingletonManager { final case class StoppingData(singleton: ActorRef) extends Data case object EndData extends Data final case class DelayedMemberRemoved(member: Member) + case object SelfExiting val HandOverRetryTimer = "hand-over-retry" val TakeOverRetryTimer = "take-over-retry" @@ -236,6 +242,17 @@ object ClusterSingletonManager { // subscribe to MemberEvent, re-subscribe when restart override def preStart(): Unit = { cluster.subscribe(self, classOf[MemberEvent]) + + // It's a delicate difference between CoordinatedShutdown.PhaseClusterExiting and MemberExited. + // MemberExited event is published immediately (leader may have performed that transition on other node), + // and that will trigger run of CoordinatedShutdown, while PhaseClusterExiting will happen later. 
+ // Using PhaseClusterExiting in the singleton because the graceful shutdown of sharding region + // should preferably complete before stopping the singleton sharding coordinator on same node. + val coordShutdown = CoordinatedShutdown(context.system) + coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "singleton-exiting-1") { () ⇒ + implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterExiting)) + self.ask(SelfExiting).mapTo[Done] + } } override def postStop(): Unit = cluster.unsubscribe(self) @@ -285,8 +302,12 @@ object ClusterSingletonManager { def receive = { case state: CurrentClusterState ⇒ handleInitial(state) case MemberUp(m) ⇒ add(m) - case mEvent: MemberEvent if (mEvent.isInstanceOf[MemberExited] || mEvent.isInstanceOf[MemberRemoved]) ⇒ - remove(mEvent.member) + case MemberRemoved(m, _) ⇒ remove(m) + case MemberExited(m) if m.uniqueAddress != cluster.selfUniqueAddress ⇒ + remove(m) + case SelfExiting ⇒ + remove(cluster.readView.self) + sender() ! Done // reply to ask case GetNext if changes.isEmpty ⇒ context.become(deliverNext, discardOld = false) case GetNext ⇒ @@ -301,16 +322,31 @@ object ClusterSingletonManager { context.unbecome() case MemberUp(m) ⇒ add(m) - if (changes.nonEmpty) { - sendFirstChange() - context.unbecome() - } - case mEvent: MemberEvent if (mEvent.isInstanceOf[MemberExited] || mEvent.isInstanceOf[MemberRemoved]) ⇒ - remove(mEvent.member) - if (changes.nonEmpty) { - sendFirstChange() - context.unbecome() - } + deliverChanges + case MemberRemoved(m, _) ⇒ + remove(m) + deliverChanges() + case MemberExited(m) if m.uniqueAddress != cluster.selfUniqueAddress ⇒ + remove(m) + deliverChanges() + case SelfExiting ⇒ + remove(cluster.readView.self) + deliverChanges() + sender() ! 
Done // reply to ask + } + + def deliverChanges(): Unit = { + if (changes.nonEmpty) { + sendFirstChange() + context.unbecome() + } + } + + override def unhandled(msg: Any): Unit = { + msg match { + case _: MemberEvent ⇒ // ok, silence + case _ ⇒ super.unhandled(msg) + } } } @@ -422,6 +458,16 @@ class ClusterSingletonManager( removed = removed filter { case (_, deadline) ⇒ deadline.hasTimeLeft } } + // for CoordinatedShutdown + val coordShutdown = CoordinatedShutdown(context.system) + val memberExitingProgress = Promise[Done]() + coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "wait-singleton-exiting")(() ⇒ + memberExitingProgress.future) + coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "singleton-exiting-2") { () ⇒ + implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterExiting)) + self.ask(SelfExiting).mapTo[Done] + } + def logInfo(message: String): Unit = if (LogInfo) log.info(message) @@ -436,7 +482,7 @@ class ClusterSingletonManager( require(!cluster.isTerminated, "Cluster node must not be terminated") // subscribe to cluster changes, re-subscribe when restart - cluster.subscribe(self, ClusterEvent.InitialStateAsEvents, classOf[MemberExited], classOf[MemberRemoved]) + cluster.subscribe(self, ClusterEvent.InitialStateAsEvents, classOf[MemberRemoved]) setTimer(CleanupTimer, Cleanup, 1.minute, repeat = true) @@ -448,6 +494,7 @@ class ClusterSingletonManager( override def postStop(): Unit = { cancelTimer(CleanupTimer) cluster.unsubscribe(self) + memberExitingProgress.trySuccess(Done) super.postStop() } @@ -634,11 +681,17 @@ class ClusterSingletonManager( case Event(Terminated(ref), d @ OldestData(singleton, _)) if ref == singleton ⇒ stay using d.copy(singletonTerminated = true) + + case Event(SelfExiting, _) ⇒ + selfMemberExited() + // complete memberExitingProgress when handOverDone + sender() ! 
Done // reply to ask + stay } when(WasOldest) { case Event(TakeOverRetry(count), WasOldestData(singleton, singletonTerminated, newOldestOption)) ⇒ - if (cluster.isTerminated && (newOldestOption.isEmpty || count > maxTakeOverRetries)) { + if ((cluster.isTerminated || selfExited) && (newOldestOption.isEmpty || count > maxTakeOverRetries)) { if (singletonTerminated) stop() else gotoStopping(singleton) } else if (count <= maxTakeOverRetries) { @@ -663,6 +716,12 @@ class ClusterSingletonManager( case Event(Terminated(ref), d @ WasOldestData(singleton, _, _)) if ref == singleton ⇒ stay using d.copy(singletonTerminated = true) + case Event(SelfExiting, _) ⇒ + selfMemberExited() + // complete memberExitingProgress when handOverDone + sender() ! Done // reply to ask + stay + } def gotoHandingOver(singleton: ActorRef, singletonTerminated: Boolean, handOverTo: Option[ActorRef]): State = { @@ -684,12 +743,18 @@ class ClusterSingletonManager( sender() ! HandOverInProgress stay + case Event(SelfExiting, _) ⇒ + selfMemberExited() + // complete memberExitingProgress when handOverDone + sender() ! Done // reply to ask + stay } def handOverDone(handOverTo: Option[ActorRef]): State = { val newOldest = handOverTo.map(_.path.address) logInfo("Singleton terminated, hand-over done [{} -> {}]", cluster.selfAddress, newOldest) handOverTo foreach { _ ! HandOverDone } + memberExitingProgress.trySuccess(Done) if (removed.contains(cluster.selfUniqueAddress)) { logInfo("Self removed, stopping ClusterSingletonManager") stop() @@ -715,12 +780,16 @@ class ClusterSingletonManager( stop() } + def selfMemberExited(): Unit = { + selfExited = true + logInfo("Exited [{}]", cluster.selfAddress) + } + whenUnhandled { - case Event(MemberExited(m), _) ⇒ - if (m.uniqueAddress == cluster.selfUniqueAddress) { - selfExited = true - logInfo("Exited [{}]", m.address) - } + case Event(SelfExiting, _) ⇒ + selfMemberExited() + memberExitingProgress.trySuccess(Done) + sender() ! 
Done // reply to ask stay case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress && !selfExited ⇒ logInfo("Self removed, stopping ClusterSingletonManager") diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala index 6b5f26e9f6..e8ba308bac 100644 --- a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala @@ -453,8 +453,10 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod val sys2 = ActorSystem( system.name, ConfigFactory.parseString( - if (RARP(system).provider.remoteSettings.Artery.Enabled) s"akka.remote.artery.canonical.port=$port" - else s"akka.remote.netty.tcp.port=$port").withFallback(system.settings.config)) + s""" + akka.remote.artery.canonical.port=$port + akka.remote.netty.tcp.port=$port + """).withFallback(system.settings.config)) Cluster(sys2).join(Cluster(sys2).selfAddress) val service2 = sys2.actorOf(Props(classOf[TestService], testActor), "service2") ClusterClientReceptionist(sys2).registerService(service2) diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/pubsub/DistributedPubSubRestartSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/pubsub/DistributedPubSubRestartSpec.scala index 74beb030f9..96108a5437 100644 --- a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/pubsub/DistributedPubSubRestartSpec.scala +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/pubsub/DistributedPubSubRestartSpec.scala @@ -141,9 +141,10 @@ class DistributedPubSubRestartSpec extends MultiNodeSpec(DistributedPubSubRestar val newSystem = { val port = Cluster(system).selfAddress.port.get val config = ConfigFactory.parseString( - if (RARP(system).provider.remoteSettings.Artery.Enabled) s"akka.remote.artery.canonical.port=$port" - else 
s"akka.remote.netty.tcp.port=$port" - ).withFallback(system.settings.config) + s""" + akka.remote.artery.canonical.port=$port + akka.remote.netty.tcp.port=$port + """).withFallback(system.settings.config) ActorSystem(system.name, config) } diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeaveSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeaveSpec.scala index 49812bfbb2..0aa88140cb 100644 --- a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeaveSpec.scala +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerLeaveSpec.scala @@ -130,8 +130,12 @@ class ClusterSingletonManagerLeaveSpec extends MultiNodeSpec(ClusterSingletonMan } runOn(first) { + cluster.registerOnMemberRemoved(testActor ! "MemberRemoved") expectMsg(10.seconds, "stop") expectMsg("postStop") + // CoordinatedShutdown makes sure that singleton actors are + // stopped before Cluster shutdown + expectMsg("MemberRemoved") } enterBarrier("first-stopped") @@ -153,13 +157,12 @@ class ClusterSingletonManagerLeaveSpec extends MultiNodeSpec(ClusterSingletonMan } enterBarrier("second-working") - runOn(third) { - cluster.leave(node(second).address) - } - runOn(second) { + cluster.registerOnMemberRemoved(testActor ! "MemberRemoved") + cluster.leave(node(second).address) expectMsg(15.seconds, "stop") expectMsg("postStop") + expectMsg("MemberRemoved") } enterBarrier("second-stopped") @@ -169,12 +172,11 @@ class ClusterSingletonManagerLeaveSpec extends MultiNodeSpec(ClusterSingletonMan enterBarrier("third-started") runOn(third) { + cluster.registerOnMemberRemoved(testActor ! 
"MemberRemoved") cluster.leave(node(third).address) - } - - runOn(third) { expectMsg(5.seconds, "stop") expectMsg("postStop") + expectMsg("MemberRemoved") } enterBarrier("third-stopped") diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerSpec.scala index e7bfee8e89..259949b10b 100644 --- a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerSpec.scala +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerSpec.scala @@ -238,19 +238,27 @@ class ClusterSingletonManagerSpec extends MultiNodeSpec(ClusterSingletonManagerS // make sure that the proxy has received membership changes // and points to the current singleton val p = TestProbe() - within(5.seconds) { + val oldestAddress = node(oldest).address + within(10.seconds) { awaitAssert { system.actorSelection("/user/consumerProxy").tell(Ping, p.ref) p.expectMsg(1.second, Pong) + val replyFromAddress = p.lastSender.path.address + if (oldest == proxyNode) + replyFromAddress.hasLocalScope should ===(true) + else + replyFromAddress should ===(oldestAddress) } } // then send the real message system.actorSelection("/user/consumerProxy") ! 
msg } + enterBarrier(s"sent-msg-$msg") + // expect a message on the oldest node runOn(oldest) { - expectMsg(5.seconds, msg) + expectMsg(msg) } enterBarrier("after-" + msg + "-proxy-verified") diff --git a/akka-cluster-tools/src/test/java/akka/cluster/client/ClusterClientTest.java b/akka-cluster-tools/src/test/java/akka/cluster/client/ClusterClientTest.java index 897941fca0..dd8e03d3ec 100644 --- a/akka-cluster-tools/src/test/java/akka/cluster/client/ClusterClientTest.java +++ b/akka-cluster-tools/src/test/java/akka/cluster/client/ClusterClientTest.java @@ -22,7 +22,8 @@ public class ClusterClientTest extends JUnitSuite { new AkkaJUnitActorSystemResource("DistributedPubSubMediatorTest", ConfigFactory.parseString( "akka.actor.provider = \"cluster\"\n" + - "akka.remote.netty.tcp.port=0")); + "akka.remote.netty.tcp.port=0\n" + + "akka.remote.artery.canonical.port=0")); private final ActorSystem system = actorSystemResource.getSystem(); diff --git a/akka-cluster-tools/src/test/java/akka/cluster/pubsub/DistributedPubSubMediatorTest.java b/akka-cluster-tools/src/test/java/akka/cluster/pubsub/DistributedPubSubMediatorTest.java index b2f2a651fb..dc1515da4a 100644 --- a/akka-cluster-tools/src/test/java/akka/cluster/pubsub/DistributedPubSubMediatorTest.java +++ b/akka-cluster-tools/src/test/java/akka/cluster/pubsub/DistributedPubSubMediatorTest.java @@ -26,7 +26,8 @@ public class DistributedPubSubMediatorTest extends JUnitSuite { new AkkaJUnitActorSystemResource("DistributedPubSubMediatorTest", ConfigFactory.parseString( "akka.actor.provider = \"cluster\"\n" + - "akka.remote.netty.tcp.port=0")); + "akka.remote.netty.tcp.port=0\n" + + "akka.remote.artery.canonical.port=0")); private final ActorSystem system = actorSystemResource.getSystem(); diff --git a/akka-cluster-tools/src/test/resources/reference.conf b/akka-cluster-tools/src/test/resources/reference.conf index 609cc78f1a..eebfbdd0a2 100644 --- a/akka-cluster-tools/src/test/resources/reference.conf +++ 
b/akka-cluster-tools/src/test/resources/reference.conf @@ -4,4 +4,8 @@ akka { serialize-messages = on warn-about-java-serializer-usage = off } -} \ No newline at end of file +} + +akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off +akka.coordinated-shutdown.terminate-actor-system = off +akka.cluster.run-coordinated-shutdown-when-down = off diff --git a/akka-cluster-tools/src/test/scala/akka/cluster/pubsub/DistributedPubSubMediatorRouterSpec.scala b/akka-cluster-tools/src/test/scala/akka/cluster/pubsub/DistributedPubSubMediatorRouterSpec.scala index 52bf506cfb..8332a433b6 100644 --- a/akka-cluster-tools/src/test/scala/akka/cluster/pubsub/DistributedPubSubMediatorRouterSpec.scala +++ b/akka-cluster-tools/src/test/scala/akka/cluster/pubsub/DistributedPubSubMediatorRouterSpec.scala @@ -17,6 +17,7 @@ object DistributedPubSubMediatorRouterSpec { akka.loglevel = INFO akka.actor.provider = "cluster" akka.remote.netty.tcp.port=0 + akka.remote.artery.canonical.port=0 akka.remote.log-remote-lifecycle-events = off akka.cluster.pub-sub.routing-logic = $routingLogic """ diff --git a/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonRestart2Spec.scala b/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonRestart2Spec.scala index f19887d036..ade67675e3 100644 --- a/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonRestart2Spec.scala +++ b/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonRestart2Spec.scala @@ -31,6 +31,7 @@ class ClusterSingletonRestart2Spec extends AkkaSpec(""" akka.loglevel = INFO akka.cluster.roles = [singleton] akka.actor.provider = akka.cluster.ClusterActorRefProvider + akka.cluster.auto-down-unreachable-after = 2s akka.remote { netty.tcp { hostname = "127.0.0.1" @@ -59,7 +60,7 @@ class ClusterSingletonRestart2Spec extends AkkaSpec(""" settings = ClusterSingletonManagerSettings(from).withRole("singleton")), name = "echo") - within(10.seconds) { + within(45.seconds) 
{ awaitAssert { Cluster(from) join Cluster(to).selfAddress Cluster(from).state.members.map(_.uniqueAddress) should contain(Cluster(from).selfUniqueAddress) @@ -98,8 +99,10 @@ class ClusterSingletonRestart2Spec extends AkkaSpec(""" val sys4Config = ConfigFactory.parseString( - if (RARP(sys1).provider.remoteSettings.Artery.Enabled) s"akka.remote.artery.canonical.port=$sys2port" - else s"akka.remote.netty.tcp.port=$sys2port").withFallback(system.settings.config) + s""" + akka.remote.artery.canonical.port=$sys2port + akka.remote.netty.tcp.port=$sys2port + """).withFallback(system.settings.config) ActorSystem(system.name, sys4Config) } diff --git a/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonRestartSpec.scala b/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonRestartSpec.scala index 9d34bf8acd..0951364086 100644 --- a/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonRestartSpec.scala +++ b/akka-cluster-tools/src/test/scala/akka/cluster/singleton/ClusterSingletonRestartSpec.scala @@ -17,6 +17,7 @@ import com.typesafe.config.ConfigFactory class ClusterSingletonRestartSpec extends AkkaSpec(""" akka.loglevel = INFO akka.actor.provider = akka.cluster.ClusterActorRefProvider + akka.cluster.auto-down-unreachable-after = 2s akka.remote { netty.tcp { hostname = "127.0.0.1" @@ -73,9 +74,10 @@ class ClusterSingletonRestartSpec extends AkkaSpec(""" val sys3Config = ConfigFactory.parseString( - if (RARP(sys1).provider.remoteSettings.Artery.Enabled) s"akka.remote.artery.canonical.port=$sys1port" - else s"akka.remote.netty.tcp.port=$sys1port" - ).withFallback(system.settings.config) + s""" + akka.remote.artery.canonical.port=$sys1port + akka.remote.netty.tcp.port=$sys1port + """).withFallback(system.settings.config) ActorSystem(system.name, sys3Config) } @@ -91,7 +93,7 @@ class ClusterSingletonRestartSpec extends AkkaSpec(""" Cluster(sys2).leave(Cluster(sys2).selfAddress) - within(10.seconds) { + 
within(15.seconds) { awaitAssert { Cluster(sys3).state.members.map(_.uniqueAddress) should ===(Set(Cluster(sys3).selfUniqueAddress)) } diff --git a/akka-cluster/src/main/resources/reference.conf b/akka-cluster/src/main/resources/reference.conf index a24dd0faa9..d749f10bc8 100644 --- a/akka-cluster/src/main/resources/reference.conf +++ b/akka-cluster/src/main/resources/reference.conf @@ -72,6 +72,11 @@ akka { # routers or other services to distribute work to certain member types, # e.g. front-end and back-end nodes. roles = [] + + # Run the coordinated shutdown from phase 'cluster-shutdown' when the cluster + # is shutdown for other reasons than when leaving, e.g. when downing. This + # will terminate the ActorSystem when the cluster extension is shutdown. + run-coordinated-shutdown-when-down = on role { # Minimum required number of members of a certain role before the leader diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index c102fa1605..f1a33ee7a3 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -17,6 +17,11 @@ import scala.collection.breakOut import akka.remote.QuarantinedEvent import java.util.ArrayList import java.util.Collections +import akka.pattern.ask +import akka.util.Timeout +import akka.Done +import scala.concurrent.Future +import scala.concurrent.Promise /** * Base trait for all cluster messages. All ClusterMessage's are serializable. 
@@ -107,6 +112,8 @@ private[cluster] object InternalClusterAction { @SerialVersionUID(1L) final case class InitJoinNack(address: Address) extends ClusterMessage with DeadLetterSuppression + final case class ExitingConfirmed(node: UniqueAddress) extends ClusterMessage with DeadLetterSuppression + /** * Marker interface for periodic tick messages */ @@ -139,8 +146,10 @@ private[cluster] object InternalClusterAction { final case class AddOnMemberRemovedListener(callback: Runnable) extends NoSerializationVerificationNeeded sealed trait SubscriptionMessage - final case class Subscribe(subscriber: ActorRef, initialStateMode: SubscriptionInitialStateMode, to: Set[Class[_]]) extends SubscriptionMessage - final case class Unsubscribe(subscriber: ActorRef, to: Option[Class[_]]) extends SubscriptionMessage + final case class Subscribe(subscriber: ActorRef, initialStateMode: SubscriptionInitialStateMode, + to: Set[Class[_]]) extends SubscriptionMessage + final case class Unsubscribe(subscriber: ActorRef, to: Option[Class[_]]) + extends SubscriptionMessage with DeadLetterSuppression /** * @param receiver [[akka.cluster.ClusterEvent.CurrentClusterState]] will be sent to the `receiver` */ @@ -149,6 +158,9 @@ private[cluster] object InternalClusterAction { sealed trait PublishMessage final case class PublishChanges(newGossip: Gossip) extends PublishMessage final case class PublishEvent(event: ClusterDomainEvent) extends PublishMessage + + final case object ExitingCompleted + } /** @@ -165,6 +177,30 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac // Child actors are therefore created when GetClusterCoreRef is received var coreSupervisor: Option[ActorRef] = None + val clusterShutdown = Promise[Done]() + val coordShutdown = CoordinatedShutdown(context.system) + coordShutdown.addTask(CoordinatedShutdown.PhaseClusterLeave, "leave") { + val sys = context.system + () ⇒ + if (Cluster(sys).isTerminated) + Future.successful(Done) + else { + implicit val 
timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterLeave)) + self.ask(CoordinatedShutdownLeave.LeaveReq).mapTo[Done] + } + } + coordShutdown.addTask(CoordinatedShutdown.PhaseClusterShutdown, "wait-shutdown") { () ⇒ + clusterShutdown.future + } + + override def postStop(): Unit = { + clusterShutdown.trySuccess(Done) + if (Cluster(context.system).settings.RunCoordinatedShutdownWhenDown) { + // run the last phases e.g. if node was downed (not leaving) + coordShutdown.run(Some(CoordinatedShutdown.PhaseClusterShutdown)) + } + } + def createChildren(): Unit = { coreSupervisor = Some(context.actorOf(Props[ClusterCoreSupervisor]. withDispatcher(context.props.dispatcher), name = "core")) @@ -188,6 +224,10 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac context.actorOf(Props(classOf[ClusterMetricsCollector], publisher). withDispatcher(context.props.dispatcher), name = "metrics") } + case CoordinatedShutdownLeave.LeaveReq ⇒ + val ref = context.actorOf(CoordinatedShutdownLeave.props().withDispatcher(context.props.dispatcher)) + // forward the ask request + ref.forward(CoordinatedShutdownLeave.LeaveReq) } } @@ -267,6 +307,24 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with var seedNodeProcessCounter = 0 // for unique names var leaderActionCounter = 0 + var exitingTasksInProgress = false + val selfExiting = Promise[Done]() + val coordShutdown = CoordinatedShutdown(context.system) + coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "wait-exiting") { () ⇒ + selfExiting.future + } + coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExitingDone, "exiting-completed") { + val sys = context.system + () ⇒ + if (Cluster(sys).isTerminated) + Future.successful(Done) + else { + implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterExitingDone)) + self.ask(ExitingCompleted).mapTo[Done] + } + } + var exitingConfirmed = Set.empty[UniqueAddress] + /** * 
Looks up and returns the remote cluster command connection for the specific address. */ @@ -320,16 +378,17 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with failureDetectorReaperTask.cancel() leaderActionsTask.cancel() publishStatsTask foreach { _.cancel() } + selfExiting.trySuccess(Done) } - def uninitialized: Actor.Receive = { + def uninitialized: Actor.Receive = ({ case InitJoin ⇒ sender() ! InitJoinNack(selfAddress) case ClusterUserAction.JoinTo(address) ⇒ join(address) case JoinSeedNodes(newSeedNodes) ⇒ joinSeedNodes(newSeedNodes) case msg: SubscriptionMessage ⇒ publisher forward msg - } + }: Actor.Receive).orElse(receiveExitingCompleted) - def tryingToJoin(joinWith: Address, deadline: Option[Deadline]): Actor.Receive = { + def tryingToJoin(joinWith: Address, deadline: Option[Deadline]): Actor.Receive = ({ case Welcome(from, gossip) ⇒ welcome(joinWith, from, gossip) case InitJoin ⇒ sender() ! InitJoinNack(selfAddress) case ClusterUserAction.JoinTo(address) ⇒ @@ -346,7 +405,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with if (seedNodes.nonEmpty) joinSeedNodes(seedNodes) else join(joinWith) } - } + }: Actor.Receive).orElse(receiveExitingCompleted) def becomeUninitialized(): Unit = { // make sure that join process is stopped @@ -364,7 +423,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with context.become(initialized) } - def initialized: Actor.Receive = { + def initialized: Actor.Receive = ({ case msg: GossipEnvelope ⇒ receiveGossip(msg) case msg: GossipStatus ⇒ receiveGossipStatus(msg) case GossipTick ⇒ gossipTick() @@ -385,19 +444,23 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with logInfo( "Trying to join seed nodes [{}] when already part of a cluster, ignoring", seedNodes.mkString(", ")) - } + case ExitingConfirmed(address) ⇒ receiveExitingConfirmed(address) + }: Actor.Receive).orElse(receiveExitingCompleted) - def 
removed: Actor.Receive = { - case msg: SubscriptionMessage ⇒ publisher forward msg + def receiveExitingCompleted: Actor.Receive = { + case ExitingCompleted ⇒ + exitingCompleted() + sender() ! Done // reply to ask } def receive = uninitialized override def unhandled(message: Any): Unit = message match { - case _: Tick ⇒ - case _: GossipEnvelope ⇒ - case _: GossipStatus ⇒ - case other ⇒ super.unhandled(other) + case _: Tick ⇒ + case _: GossipEnvelope ⇒ + case _: GossipStatus ⇒ + case _: ExitingConfirmed ⇒ + case other ⇒ super.unhandled(other) } def initJoin(): Unit = { @@ -580,6 +643,52 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with } } + def exitingCompleted() = { + logInfo("Exiting completed") + // ExitingCompleted sent via CoordinatedShutdown to continue the leaving process. + exitingTasksInProgress = false + // mark as seen + latestGossip = latestGossip seen selfUniqueAddress + assertLatestGossip() + publish(latestGossip) + + // Let others know (best effort) before shutdown. Otherwise they will not see + // convergence of the Exiting state until they have detected this node as + // unreachable and the required downing has finished. They will still need to detect + // unreachable, but Exiting unreachable will be removed without downing, i.e. + // normally the leaving of a leader will be graceful without the need + // for downing. However, if those final gossip messages never arrive it is + // alright to require the downing, because that is probably caused by a + // network failure anyway. + gossipRandomN(NumberOfGossipsBeforeShutdownWhenLeaderExits) + + // send ExitingConfirmed to two potential leaders + val membersWithoutSelf = latestGossip.members.filterNot(_.uniqueAddress == selfUniqueAddress) + latestGossip.leaderOf(membersWithoutSelf, selfUniqueAddress) match { + case Some(node1) ⇒ + clusterCore(node1.address) ! 
ExitingConfirmed(selfUniqueAddress) + latestGossip.leaderOf(membersWithoutSelf.filterNot(_.uniqueAddress == node1), selfUniqueAddress) match { + case Some(node2) ⇒ + clusterCore(node2.address) ! ExitingConfirmed(selfUniqueAddress) + case None ⇒ // no more potential leader + } + case None ⇒ // no leader + } + + shutdown() + } + + def receiveExitingConfirmed(node: UniqueAddress): Unit = { + logInfo("Exiting confirmed [{}]", node.address) + exitingConfirmed += node + } + + def cleanupExitingConfirmed(): Unit = { + // in case the actual removal was performed by another leader node we forget the confirmations of nodes that are no longer members + if (exitingConfirmed.nonEmpty) + exitingConfirmed = exitingConfirmed.filter(n ⇒ latestGossip.members.exists(_.uniqueAddress == n)) + } + /** * This method is called when a member sees itself as Exiting or Down. */ @@ -694,13 +803,15 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with val (winningGossip, talkback, gossipType) = comparison match { case VectorClock.Same ⇒ // same version - (remoteGossip mergeSeen localGossip, !remoteGossip.seenByNode(selfUniqueAddress), Same) + val talkback = !exitingTasksInProgress && !remoteGossip.seenByNode(selfUniqueAddress) + (remoteGossip mergeSeen localGossip, talkback, Same) case VectorClock.Before ⇒ // local is newer (localGossip, true, Older) case VectorClock.After ⇒ // remote is newer - (remoteGossip, !remoteGossip.seenByNode(selfUniqueAddress), Newer) + val talkback = !exitingTasksInProgress && !remoteGossip.seenByNode(selfUniqueAddress) + (remoteGossip, talkback, Newer) case _ ⇒ // conflicting versions, merge // We can see that a removal was done when it is not in one of the gossips has status @@ -725,7 +836,13 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with (prunedRemoteGossip merge prunedLocalGossip, true, Merge) } - latestGossip = winningGossip seen selfUniqueAddress + // Don't mark gossip state as seen while exiting is in progress, e.g. + // shutting down singleton actors.
This delays removal of the member until + // the exiting tasks have been completed. + if (exitingTasksInProgress) + latestGossip = winningGossip + else + latestGossip = winningGossip seen selfUniqueAddress assertLatestGossip() // for all new joining nodes we remove them from the failure detector @@ -754,9 +871,16 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with publish(latestGossip) val selfStatus = latestGossip.member(selfUniqueAddress).status - if (selfStatus == Exiting) - shutdown() - else if (talkback) { + if (selfStatus == Exiting && !exitingTasksInProgress) { + // ExitingCompleted will be received via CoordinatedShutdown to continue + // the leaving process. Meanwhile the gossip state is not marked as seen. + exitingTasksInProgress = true + logInfo("Exiting, starting coordinated shutdown") + selfExiting.trySuccess(Done) + coordShutdown.run() + } + + if (talkback) { // send back gossip to sender() when sender() had different view, i.e. merge, or sender() had // older or sender() had newer gossipTo(from, sender()) @@ -875,7 +999,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with // only run the leader actions if we are the LEADER val firstNotice = 20 val periodicNotice = 60 - if (latestGossip.convergence(selfUniqueAddress)) { + if (latestGossip.convergence(selfUniqueAddress, exitingConfirmed)) { if (leaderActionCounter >= firstNotice) logInfo("Leader can perform its duties again") leaderActionCounter = 0 @@ -893,6 +1017,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with s"${m.address} ${m.status} seen=${latestGossip.seenByNode(m.uniqueAddress)}").mkString(", ")) } } + cleanupExitingConfirmed() shutdownSelfWhenDown() } @@ -948,6 +1073,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with if Gossip.removeUnreachableWithMemberStatus(m.status) } yield m + val removedExitingConfirmed = exitingConfirmed.filter(n ⇒ 
localGossip.member(n).status == Exiting) + val changedMembers = localMembers collect { var upNumber = 0 @@ -971,14 +1098,15 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with } } - if (removedUnreachable.nonEmpty || changedMembers.nonEmpty) { + if (removedUnreachable.nonEmpty || removedExitingConfirmed.nonEmpty || changedMembers.nonEmpty) { // handle changes // replace changed members - val newMembers = changedMembers union localMembers diff removedUnreachable + val newMembers = changedMembers.union(localMembers).diff(removedUnreachable) + .filterNot(m ⇒ removedExitingConfirmed(m.uniqueAddress)) // removing REMOVED nodes from the `seen` table - val removed = removedUnreachable.map(_.uniqueAddress) + val removed = removedUnreachable.map(_.uniqueAddress).union(removedExitingConfirmed) val newSeen = localSeen diff removed // removing REMOVED nodes from the `reachability` table val newReachability = localOverview.reachability.remove(removed) @@ -992,7 +1120,18 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with } val newGossip = localGossip copy (members = newMembers, overview = newOverview, version = newVersion) + if (!exitingTasksInProgress && newGossip.member(selfUniqueAddress).status == Exiting) { + // Leader is moving itself from Leaving to Exiting. + // ExitingCompleted will be received via CoordinatedShutdown to continue + // the leaving process. Meanwhile the gossip state is not marked as seen. 
+ exitingTasksInProgress = true + logInfo("Exiting (leader), starting coordinated shutdown") + selfExiting.trySuccess(Done) + coordShutdown.run() + } + updateLatestGossip(newGossip) + exitingConfirmed = exitingConfirmed.filterNot(removedExitingConfirmed) // log status changes changedMembers foreach { m ⇒ @@ -1004,23 +1143,11 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with val status = if (m.status == Exiting) "exiting" else "unreachable" logInfo("Leader is removing {} node [{}]", status, m.address) } - - publish(latestGossip) - - if (latestGossip.member(selfUniqueAddress).status == Exiting) { - // Leader is moving itself from Leaving to Exiting. Let others know (best effort) - // before shutdown. Otherwise they will not see the Exiting state change - // and there will not be convergence until they have detected this node as - // unreachable and the required downing has finished. They will still need to detect - // unreachable, but Exiting unreachable will be removed without downing, i.e. - // normally the leaving of a leader will be graceful without the need - // for downing. However, if those final gossip messages never arrive it is - // alright to require the downing, because that is probably caused by a - // network failure anyway. 
- gossipRandomN(NumberOfGossipsBeforeShutdownWhenLeaderExits) - shutdown() + removedExitingConfirmed.foreach { n ⇒ + logInfo("Leader is removing confirmed Exiting node [{}]", n.address) } + publish(latestGossip) } } @@ -1144,10 +1271,18 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with def updateLatestGossip(newGossip: Gossip): Unit = { // Updating the vclock version for the changes val versionedGossip = newGossip :+ vclockNode - // Nobody else have seen this gossip but us - val seenVersionedGossip = versionedGossip onlySeen (selfUniqueAddress) - // Update the state with the new gossip - latestGossip = seenVersionedGossip + + // Don't mark gossip state as seen while exiting is in progress, e.g. + // shutting down singleton actors. This delays removal of the member until + // the exiting tasks have been completed. + if (exitingTasksInProgress) + latestGossip = versionedGossip.clearSeen() + else { + // Nobody else has seen this gossip but us + val seenVersionedGossip = versionedGossip onlySeen (selfUniqueAddress) + // Update the state with the new gossip + latestGossip = seenVersionedGossip + } assertLatestGossip() } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index 29c6c5c6ab..0bda0b293c 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -13,6 +13,7 @@ import akka.cluster.ClusterEvent._ import akka.cluster.MemberStatus._ import akka.event.EventStream import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } +import akka.actor.DeadLetterSuppression /** * Domain events published to the event bus. @@ -199,7 +200,7 @@ object ClusterEvent { * This event is published when the cluster node is shutting down, * before the final [[MemberRemoved]] events are published. 
*/ - final case object ClusterShuttingDown extends ClusterDomainEvent + final case object ClusterShuttingDown extends ClusterDomainEvent with DeadLetterSuppression /** * Java API: get the singleton instance of `ClusterShuttingDown` event @@ -335,9 +336,9 @@ object ClusterEvent { private[cluster] def diffSeen(oldGossip: Gossip, newGossip: Gossip, selfUniqueAddress: UniqueAddress): immutable.Seq[SeenChanged] = if (newGossip eq oldGossip) Nil else { - val newConvergence = newGossip.convergence(selfUniqueAddress) + val newConvergence = newGossip.convergence(selfUniqueAddress, Set.empty) val newSeenBy = newGossip.seenBy - if (newConvergence != oldGossip.convergence(selfUniqueAddress) || newSeenBy != oldGossip.seenBy) + if (newConvergence != oldGossip.convergence(selfUniqueAddress, Set.empty) || newSeenBy != oldGossip.seenBy) List(SeenChanged(newConvergence, newSeenBy.map(_.address))) else Nil } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 70cb27fd54..55abb5d05c 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -103,6 +103,7 @@ final class ClusterSettings(val config: Config, val systemName: String) { case (key, value: ConfigObject) ⇒ key → value.toConfig.getInt("min-nr-of-members") }.toMap } + val RunCoordinatedShutdownWhenDown: Boolean = cc.getBoolean("run-coordinated-shutdown-when-down") val JmxEnabled: Boolean = cc.getBoolean("jmx.enabled") val UseDispatcher: String = cc.getString("use-dispatcher") match { case "" ⇒ Dispatchers.DefaultDispatcherId diff --git a/akka-cluster/src/main/scala/akka/cluster/CoordinatedShutdownLeave.scala b/akka-cluster/src/main/scala/akka/cluster/CoordinatedShutdownLeave.scala new file mode 100644 index 0000000000..f26166316a --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/CoordinatedShutdownLeave.scala @@ -0,0 +1,56 @@ +/** + * 
Copyright (C) 2016 Lightbend Inc. + */ +package akka.cluster + +import akka.Done +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Props +import akka.cluster.ClusterEvent._ +import akka.cluster.MemberStatus._ + +/** + * INTERNAL API + */ +private[akka] object CoordinatedShutdownLeave { + def props(): Props = Props[CoordinatedShutdownLeave] + + case object LeaveReq +} + +/** + * INTERNAL API + */ +private[akka] class CoordinatedShutdownLeave extends Actor { + import CoordinatedShutdownLeave.LeaveReq + + val cluster = Cluster(context.system) + + override def postStop(): Unit = { + cluster.unsubscribe(self) + } + + def receive = { + case LeaveReq ⇒ + // MemberRemoved is needed in case it was downed instead + cluster.leave(cluster.selfAddress) + cluster.subscribe(self, classOf[MemberLeft], classOf[MemberRemoved]) + context.become(waitingLeaveCompleted(sender())) + } + + def waitingLeaveCompleted(replyTo: ActorRef): Receive = { + case s: CurrentClusterState ⇒ + if (s.members.exists(m ⇒ m.uniqueAddress == cluster.selfUniqueAddress && + (m.status == Leaving || m.status == Exiting || m.status == Down))) { + replyTo ! Done + context.stop(self) + } + case evt: MemberEvent ⇒ + if (evt.member.uniqueAddress == cluster.selfUniqueAddress) { + replyTo ! Done + context.stop(self) + } + } + +} diff --git a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala index e2b78657b4..745d6d2440 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Gossip.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Gossip.scala @@ -114,6 +114,13 @@ private[cluster] final case class Gossip( this copy (overview = overview copy (seen = Set(node))) } + /** + * Remove all seen entries + */ + def clearSeen(): Gossip = { + this copy (overview = overview copy (seen = Set.empty)) + } + /** * The nodes that have seen the current version of the Gossip. 
*/ @@ -158,7 +165,7 @@ private[cluster] final case class Gossip( * * @return true if convergence have been reached and false if not */ - def convergence(selfUniqueAddress: UniqueAddress): Boolean = { + def convergence(selfUniqueAddress: UniqueAddress, exitingConfirmed: Set[UniqueAddress]): Boolean = { // First check that: // 1. we don't have any members that are unreachable, excluding observations from members // that have status DOWN, or @@ -167,10 +174,11 @@ private[cluster] final case class Gossip( // When that is done we check that all members with a convergence // status is in the seen table, i.e. has seen this version val unreachable = reachabilityExcludingDownedObservers.allUnreachableOrTerminated.collect { - case node if (node != selfUniqueAddress) ⇒ member(node) + case node if (node != selfUniqueAddress && !exitingConfirmed(node)) ⇒ member(node) } unreachable.forall(m ⇒ Gossip.convergenceSkipUnreachableWithMemberStatus(m.status)) && - !members.exists(m ⇒ Gossip.convergenceMemberStatus(m.status) && !seenByNode(m.uniqueAddress)) + !members.exists(m ⇒ Gossip.convergenceMemberStatus(m.status) && + !(seenByNode(m.uniqueAddress) || exitingConfirmed(m.uniqueAddress))) } lazy val reachabilityExcludingDownedObservers: Reachability = { @@ -187,7 +195,7 @@ private[cluster] final case class Gossip( def roleLeader(role: String, selfUniqueAddress: UniqueAddress): Option[UniqueAddress] = leaderOf(members.filter(_.hasRole(role)), selfUniqueAddress) - private def leaderOf(mbrs: immutable.SortedSet[Member], selfUniqueAddress: UniqueAddress): Option[UniqueAddress] = { + def leaderOf(mbrs: immutable.SortedSet[Member], selfUniqueAddress: UniqueAddress): Option[UniqueAddress] = { val reachableMembers = if (overview.reachability.isAllReachable) mbrs.filterNot(_.status == Down) else mbrs.filter(m ⇒ m.status != Down && diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala 
b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index d64cd0a4f7..29c3174610 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -19,6 +19,7 @@ import scala.collection.JavaConverters._ import scala.collection.immutable import scala.concurrent.duration.Deadline import java.io.NotSerializableException +import akka.cluster.InternalClusterAction.ExitingConfirmed /** * Protobuf serializer of cluster messages. @@ -57,6 +58,7 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri classOf[InternalClusterAction.InitJoinNack] → (bytes ⇒ InternalClusterAction.InitJoinNack(addressFromBinary(bytes))), classOf[ClusterHeartbeatSender.Heartbeat] → (bytes ⇒ ClusterHeartbeatSender.Heartbeat(addressFromBinary(bytes))), classOf[ClusterHeartbeatSender.HeartbeatRsp] → (bytes ⇒ ClusterHeartbeatSender.HeartbeatRsp(uniqueAddressFromBinary(bytes))), + classOf[ExitingConfirmed] → (bytes ⇒ InternalClusterAction.ExitingConfirmed(uniqueAddressFromBinary(bytes))), classOf[GossipStatus] → gossipStatusFromBinary, classOf[GossipEnvelope] → gossipEnvelopeFromBinary, classOf[MetricsGossipEnvelope] → metricsGossipEnvelopeFromBinary) @@ -64,18 +66,19 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri def includeManifest: Boolean = true def toBinary(obj: AnyRef): Array[Byte] = obj match { - case ClusterHeartbeatSender.Heartbeat(from) ⇒ addressToProtoByteArray(from) - case ClusterHeartbeatSender.HeartbeatRsp(from) ⇒ uniqueAddressToProtoByteArray(from) - case m: GossipEnvelope ⇒ gossipEnvelopeToProto(m).toByteArray - case m: GossipStatus ⇒ gossipStatusToProto(m).toByteArray - case m: MetricsGossipEnvelope ⇒ compress(metricsGossipEnvelopeToProto(m)) - case InternalClusterAction.Join(node, roles) ⇒ joinToProto(node, roles).toByteArray - case InternalClusterAction.Welcome(from, 
gossip) ⇒ compress(welcomeToProto(from, gossip)) - case ClusterUserAction.Leave(address) ⇒ addressToProtoByteArray(address) - case ClusterUserAction.Down(address) ⇒ addressToProtoByteArray(address) - case InternalClusterAction.InitJoin ⇒ cm.Empty.getDefaultInstance.toByteArray - case InternalClusterAction.InitJoinAck(address) ⇒ addressToProtoByteArray(address) - case InternalClusterAction.InitJoinNack(address) ⇒ addressToProtoByteArray(address) + case ClusterHeartbeatSender.Heartbeat(from) ⇒ addressToProtoByteArray(from) + case ClusterHeartbeatSender.HeartbeatRsp(from) ⇒ uniqueAddressToProtoByteArray(from) + case m: GossipEnvelope ⇒ gossipEnvelopeToProto(m).toByteArray + case m: GossipStatus ⇒ gossipStatusToProto(m).toByteArray + case m: MetricsGossipEnvelope ⇒ compress(metricsGossipEnvelopeToProto(m)) + case InternalClusterAction.Join(node, roles) ⇒ joinToProto(node, roles).toByteArray + case InternalClusterAction.Welcome(from, gossip) ⇒ compress(welcomeToProto(from, gossip)) + case ClusterUserAction.Leave(address) ⇒ addressToProtoByteArray(address) + case ClusterUserAction.Down(address) ⇒ addressToProtoByteArray(address) + case InternalClusterAction.InitJoin ⇒ cm.Empty.getDefaultInstance.toByteArray + case InternalClusterAction.InitJoinAck(address) ⇒ addressToProtoByteArray(address) + case InternalClusterAction.InitJoinNack(address) ⇒ addressToProtoByteArray(address) + case InternalClusterAction.ExitingConfirmed(node) ⇒ uniqueAddressToProtoByteArray(node) case _ ⇒ throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass}") } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala index 760c4594a3..9e8c850e9d 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala +++ 
b/akka-cluster/src/multi-jvm/scala/akka/cluster/NodeLeavingAndExitingAndBeingRemovedSpec.scala @@ -15,9 +15,8 @@ object NodeLeavingAndExitingAndBeingRemovedMultiJvmSpec extends MultiNodeConfig val second = role("second") val third = role("third") - commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString( - "akka.cluster.auto-down-unreachable-after = 0s")). - withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) + commonConfig(debugConfig(on = false) + .withFallback(MultiNodeClusterSpec.clusterConfigWithFailureDetectorPuppet)) } class NodeLeavingAndExitingAndBeingRemovedMultiJvmNode1 extends NodeLeavingAndExitingAndBeingRemovedSpec @@ -36,7 +35,7 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec awaitClusterUp(first, second, third) - within(30.seconds) { + within(15.seconds) { runOn(first) { cluster.leave(second) } @@ -44,7 +43,9 @@ abstract class NodeLeavingAndExitingAndBeingRemovedSpec runOn(first, third) { enterBarrier("second-shutdown") - markNodeAsUnavailable(second) + // this test verifies that the removal is performed via the ExitingCompleted message, + // otherwise we would have `markNodeAsUnavailable(second)` to trigger the FailureDetectorPuppet + // verify that the 'second' node is no longer part of the 'members'/'unreachable' set awaitAssert { clusterView.members.map(_.address) should not contain (address(second)) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/QuickRestartSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/QuickRestartSpec.scala index d82c6f3752..bdd7143a47 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/QuickRestartSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/QuickRestartSpec.scala @@ -69,17 +69,11 @@ abstract class QuickRestartSpec system.name, // use the same port ConfigFactory.parseString( - if (RARP(system).provider.remoteSettings.Artery.Enabled) - s""" + s""" akka.cluster.roles = [round-$n] + akka.remote.netty.tcp.port = 
${Cluster(restartingSystem).selfAddress.port.get} akka.remote.artery.canonical.port = ${Cluster(restartingSystem).selfAddress.port.get} - """ - else - s""" - akka.cluster.roles = [round-$n] - akka.remote.netty.tcp.port = ${Cluster(restartingSystem).selfAddress.port.get} - """ - ).withFallback(system.settings.config)) + """).withFallback(system.settings.config)) log.info("Restarting node has address: {}", Cluster(restartingSystem).selfUniqueAddress) Cluster(restartingSystem).joinSeedNodes(seedNodes) within(20.seconds) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala index 594076dfeb..96286a8d2f 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartFirstSeedNodeSpec.scala @@ -55,11 +55,10 @@ abstract class RestartFirstSeedNodeSpec lazy val restartedSeed1System = ActorSystem( system.name, ConfigFactory.parseString( - if (RARP(system).provider.remoteSettings.Artery.Enabled) - "akka.remote.artery.canonical.port=" + seedNodes.head.port.get - else - "akka.remote.netty.tcp.port=" + seedNodes.head.port.get - ).withFallback(system.settings.config)) + s""" + akka.remote.netty.tcp.port = ${seedNodes.head.port.get} + akka.remote.artery.canonical.port = ${seedNodes.head.port.get} + """).withFallback(system.settings.config)) override def afterAll(): Unit = { runOn(seed1) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode2Spec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode2Spec.scala index 61eaad43b2..b171058c02 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode2Spec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode2Spec.scala @@ -56,10 +56,10 @@ abstract class RestartNode2SpecSpec system.name, ConfigFactory.parseString( s""" - akka.remote.netty.tcp.port= ${seedNodes.head.port.get} + 
akka.remote.netty.tcp.port = ${seedNodes.head.port.get} + akka.remote.artery.canonical.port = ${seedNodes.head.port.get} #akka.remote.retry-gate-closed-for = 1s - """). - withFallback(system.settings.config)) + """).withFallback(system.settings.config)) override def afterAll(): Unit = { runOn(seed1) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode3Spec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode3Spec.scala index 5eafb5edf7..e1eb6e1b73 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode3Spec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNode3Spec.scala @@ -51,11 +51,10 @@ abstract class RestartNode3Spec lazy val restartedSecondSystem = ActorSystem( system.name, ConfigFactory.parseString( - if (RARP(system).provider.remoteSettings.Artery.Enabled) - "akka.remote.artery.canonical.port=" + secondUniqueAddress.address.port.get - else - "akka.remote.netty.tcp.port=" + secondUniqueAddress.address.port.get - ).withFallback(system.settings.config)) + s""" + akka.remote.artery.canonical.port = ${secondUniqueAddress.address.port.get} + akka.remote.netty.tcp.port = ${secondUniqueAddress.address.port.get} + """).withFallback(system.settings.config)) override def afterAll(): Unit = { runOn(second) { diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNodeSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNodeSpec.scala index 18ddba0a28..0fe5927e01 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNodeSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/RestartNodeSpec.scala @@ -71,8 +71,10 @@ abstract class RestartNodeSpec lazy val restartedSecondSystem = ActorSystem( system.name, - ConfigFactory.parseString("akka.remote.netty.tcp.port=" + secondUniqueAddress.address.port.get). 
- withFallback(system.settings.config)) + ConfigFactory.parseString(s""" + akka.remote.netty.tcp.port = ${secondUniqueAddress.address.port.get} + akka.remote.artery.canonical.port = ${secondUniqueAddress.address.port.get} + """).withFallback(system.settings.config)) override def afterAll(): Unit = { runOn(second) { diff --git a/akka-cluster/src/test/resources/reference.conf b/akka-cluster/src/test/resources/reference.conf index 1eca761d06..44cc294c18 100644 --- a/akka-cluster/src/test/resources/reference.conf +++ b/akka-cluster/src/test/resources/reference.conf @@ -5,3 +5,8 @@ akka { warn-about-java-serializer-usage = off } } + +akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off +akka.coordinated-shutdown.terminate-actor-system = off +akka.cluster.run-coordinated-shutdown-when-down = off + diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala index 15ae3c4a6f..de4f313984 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala @@ -34,6 +34,7 @@ object ClusterDeployerSpec { } } akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 """, ConfigParseOptions.defaults) class RecipeActor extends Actor { diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala index 99a24e6467..e9d1897fce 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala @@ -24,6 +24,7 @@ object ClusterDomainEventPublisherSpec { val config = """ akka.actor.provider = "cluster" akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 """ } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala 
b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala index b696c475c1..e474340b81 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventSpec.scala @@ -33,7 +33,7 @@ class ClusterDomainEventSpec extends WordSpec with Matchers { val eJoining = TestMember(Address("akka.tcp", "sys", "e", 2552), Joining, eRoles) val eUp = TestMember(Address("akka.tcp", "sys", "e", 2552), Up, eRoles) val eDown = TestMember(Address("akka.tcp", "sys", "e", 2552), Down, eRoles) - val selfDummyAddress = UniqueAddress(Address("akka.tcp", "sys", "selfDummy", 2552), 17) + val selfDummyAddress = UniqueAddress(Address("akka.tcp", "sys", "selfDummy", 2552), 17L) private[cluster] def converge(gossip: Gossip): (Gossip, Set[UniqueAddress]) = ((gossip, Set.empty[UniqueAddress]) /: gossip.members) { case ((gs, as), m) ⇒ (gs.seen(m.uniqueAddress), as + m.uniqueAddress) } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala index de8c1853ea..f4a39aa829 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterHeartbeatSenderStateSpec.scala @@ -40,11 +40,11 @@ object ClusterHeartbeatSenderStateSpec { class ClusterHeartbeatSenderStateSpec extends WordSpec with Matchers { import ClusterHeartbeatSenderStateSpec._ - val aa = UniqueAddress(Address("akka.tcp", "sys", "aa", 2552), 1) - val bb = UniqueAddress(Address("akka.tcp", "sys", "bb", 2552), 2) - val cc = UniqueAddress(Address("akka.tcp", "sys", "cc", 2552), 3) - val dd = UniqueAddress(Address("akka.tcp", "sys", "dd", 2552), 4) - val ee = UniqueAddress(Address("akka.tcp", "sys", "ee", 2552), 5) + val aa = UniqueAddress(Address("akka.tcp", "sys", "aa", 2552), 1L) + val bb = UniqueAddress(Address("akka.tcp", "sys", "bb", 2552), 2L) + val cc = 
UniqueAddress(Address("akka.tcp", "sys", "cc", 2552), 3L) + val dd = UniqueAddress(Address("akka.tcp", "sys", "dd", 2552), 4L) + val ee = UniqueAddress(Address("akka.tcp", "sys", "ee", 2552), 5L) private def emptyState: ClusterHeartbeatSenderState = emptyState(aa) @@ -142,7 +142,8 @@ class ClusterHeartbeatSenderStateSpec extends WordSpec with Matchers { "behave correctly for random operations" in { val rnd = ThreadLocalRandom.current - val nodes = (1 to rnd.nextInt(10, 200)).map(n ⇒ UniqueAddress(Address("akka.tcp", "sys", "n" + n, 2552), n)).toVector + val nodes = (1 to rnd.nextInt(10, 200)).map(n ⇒ + UniqueAddress(Address("akka.tcp", "sys", "n" + n, 2552), n.toLong)).toVector def rndNode() = nodes(rnd.nextInt(0, nodes.size)) val selfUniqueAddress = rndNode() var state = emptyState(selfUniqueAddress) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 742b2a788d..481e44a6f3 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -16,6 +16,10 @@ import akka.testkit.TestProbe import akka.actor.ActorSystem import akka.actor.Props import com.typesafe.config.ConfigFactory +import akka.actor.CoordinatedShutdown +import akka.cluster.ClusterEvent.MemberEvent +import akka.cluster.ClusterEvent._ +import scala.concurrent.Await object ClusterSpec { val config = """ @@ -28,7 +32,7 @@ object ClusterSpec { akka.actor.provider = "cluster" akka.remote.log-remote-lifecycle-events = off akka.remote.netty.tcp.port = 0 - #akka.loglevel = DEBUG + akka.remote.artery.canonical.port = 0 """ final case class GossipTo(address: Address) @@ -109,6 +113,7 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { val sys2 = ActorSystem("ClusterSpec2", ConfigFactory.parseString(""" akka.actor.provider = "cluster" akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 """)) try { val ref = 
sys2.actorOf(Props.empty) @@ -137,5 +142,77 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { cluster.remotePathOf(testActor).uid should ===(testActor.path.uid) cluster.remotePathOf(testActor).address should ===(selfAddress) } + + "leave via CoordinatedShutdown.run" in { + val sys2 = ActorSystem("ClusterSpec2", ConfigFactory.parseString(""" + akka.actor.provider = "cluster" + akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 + """)) + try { + val probe = TestProbe()(sys2) + Cluster(sys2).subscribe(probe.ref, classOf[MemberEvent]) + probe.expectMsgType[CurrentClusterState] + Cluster(sys2).join(Cluster(sys2).selfAddress) + probe.expectMsgType[MemberUp] + + CoordinatedShutdown(sys2).run() + probe.expectMsgType[MemberLeft] + probe.expectMsgType[MemberExited] + probe.expectMsgType[MemberRemoved] + } finally { + shutdown(sys2) + } + } + + "terminate ActorSystem via leave (CoordinatedShutdown)" in { + val sys2 = ActorSystem("ClusterSpec2", ConfigFactory.parseString(""" + akka.actor.provider = "cluster" + akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 + akka.coordinated-shutdown.terminate-actor-system = on + """)) + try { + val probe = TestProbe()(sys2) + Cluster(sys2).subscribe(probe.ref, classOf[MemberEvent]) + probe.expectMsgType[CurrentClusterState] + Cluster(sys2).join(Cluster(sys2).selfAddress) + probe.expectMsgType[MemberUp] + + Cluster(sys2).leave(Cluster(sys2).selfAddress) + probe.expectMsgType[MemberLeft] + probe.expectMsgType[MemberExited] + probe.expectMsgType[MemberRemoved] + Await.result(sys2.whenTerminated, 10.seconds) + Cluster(sys2).isTerminated should ===(true) + } finally { + shutdown(sys2) + } + } + + "terminate ActorSystem via down (CoordinatedShutdown)" in { + val sys3 = ActorSystem("ClusterSpec3", ConfigFactory.parseString(""" + akka.actor.provider = "cluster" + akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 + 
akka.coordinated-shutdown.terminate-actor-system = on + akka.cluster.run-coordinated-shutdown-when-down = on +akka.loglevel=DEBUG + """)) + try { + val probe = TestProbe()(sys3) + Cluster(sys3).subscribe(probe.ref, classOf[MemberEvent]) + probe.expectMsgType[CurrentClusterState] + Cluster(sys3).join(Cluster(sys3).selfAddress) + probe.expectMsgType[MemberUp] + + Cluster(sys3).down(Cluster(sys3).selfAddress) + probe.expectMsgType[MemberRemoved] + Await.result(sys3.whenTerminated, 10.seconds) + Cluster(sys3).isTerminated should ===(true) + } finally { + shutdown(sys3) + } + } } } diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala index 8a913ba538..956814d9df 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipSpec.scala @@ -28,43 +28,57 @@ class GossipSpec extends WordSpec with Matchers { "A Gossip" must { "reach convergence when it's empty" in { - Gossip.empty.convergence(a1.uniqueAddress) should ===(true) + Gossip.empty.convergence(a1.uniqueAddress, Set.empty) should ===(true) } "reach convergence for one node" in { - val g1 = (Gossip(members = SortedSet(a1))).seen(a1.uniqueAddress) - g1.convergence(a1.uniqueAddress) should ===(true) + val g1 = Gossip(members = SortedSet(a1)).seen(a1.uniqueAddress) + g1.convergence(a1.uniqueAddress, Set.empty) should ===(true) } "not reach convergence until all have seen version" in { - val g1 = (Gossip(members = SortedSet(a1, b1))).seen(a1.uniqueAddress) - g1.convergence(a1.uniqueAddress) should ===(false) + val g1 = Gossip(members = SortedSet(a1, b1)).seen(a1.uniqueAddress) + g1.convergence(a1.uniqueAddress, Set.empty) should ===(false) } "reach convergence for two nodes" in { - val g1 = (Gossip(members = SortedSet(a1, b1))).seen(a1.uniqueAddress).seen(b1.uniqueAddress) - g1.convergence(a1.uniqueAddress) should ===(true) + val g1 = Gossip(members = SortedSet(a1, 
b1)).seen(a1.uniqueAddress).seen(b1.uniqueAddress) + g1.convergence(a1.uniqueAddress, Set.empty) should ===(true) } "reach convergence, skipping joining" in { // e1 is joining - val g1 = (Gossip(members = SortedSet(a1, b1, e1))).seen(a1.uniqueAddress).seen(b1.uniqueAddress) - g1.convergence(a1.uniqueAddress) should ===(true) + val g1 = Gossip(members = SortedSet(a1, b1, e1)).seen(a1.uniqueAddress).seen(b1.uniqueAddress) + g1.convergence(a1.uniqueAddress, Set.empty) should ===(true) } "reach convergence, skipping down" in { // e3 is down - val g1 = (Gossip(members = SortedSet(a1, b1, e3))).seen(a1.uniqueAddress).seen(b1.uniqueAddress) - g1.convergence(a1.uniqueAddress) should ===(true) + val g1 = Gossip(members = SortedSet(a1, b1, e3)).seen(a1.uniqueAddress).seen(b1.uniqueAddress) + g1.convergence(a1.uniqueAddress, Set.empty) should ===(true) + } + + "reach convergence, skipping Leaving with exitingConfirmed" in { + // c1 is Leaving + val g1 = Gossip(members = SortedSet(a1, b1, c1)).seen(a1.uniqueAddress).seen(b1.uniqueAddress) + g1.convergence(a1.uniqueAddress, Set(c1.uniqueAddress)) should ===(true) + } + + "reach convergence, skipping unreachable Leaving with exitingConfirmed" in { + // c1 is Leaving + val r1 = Reachability.empty.unreachable(b1.uniqueAddress, c1.uniqueAddress) + val g1 = Gossip(members = SortedSet(a1, b1, c1), overview = GossipOverview(reachability = r1)) + .seen(a1.uniqueAddress).seen(b1.uniqueAddress) + g1.convergence(a1.uniqueAddress, Set(c1.uniqueAddress)) should ===(true) } "not reach convergence when unreachable" in { val r1 = Reachability.empty.unreachable(b1.uniqueAddress, a1.uniqueAddress) val g1 = (Gossip(members = SortedSet(a1, b1), overview = GossipOverview(reachability = r1))) .seen(a1.uniqueAddress).seen(b1.uniqueAddress) - g1.convergence(b1.uniqueAddress) should ===(false) + g1.convergence(b1.uniqueAddress, Set.empty) should ===(false) // but from a1's point of view (it knows that itself is not unreachable) - 
g1.convergence(a1.uniqueAddress) should ===(true) + g1.convergence(a1.uniqueAddress, Set.empty) should ===(true) } "reach convergence when downed node has observed unreachable" in { @@ -72,7 +86,7 @@ class GossipSpec extends WordSpec with Matchers { val r1 = Reachability.empty.unreachable(e3.uniqueAddress, a1.uniqueAddress) val g1 = (Gossip(members = SortedSet(a1, b1, e3), overview = GossipOverview(reachability = r1))) .seen(a1.uniqueAddress).seen(b1.uniqueAddress).seen(e3.uniqueAddress) - g1.convergence(b1.uniqueAddress) should ===(true) + g1.convergence(b1.uniqueAddress, Set.empty) should ===(true) } "merge members by status priority" in { diff --git a/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingPerfSpec.scala b/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingPerfSpec.scala index 37f78e2244..73c8b057be 100644 --- a/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingPerfSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingPerfSpec.scala @@ -14,7 +14,7 @@ class HeartbeatNodeRingPerfSpec extends WordSpec with Matchers { val iterations = sys.props.get("akka.cluster.HeartbeatNodeRingPerfSpec.iterations").getOrElse("1000").toInt def createHeartbeatNodeRingOfSize(size: Int): HeartbeatNodeRing = { - val nodes = (1 to size).map(n ⇒ UniqueAddress(Address("akka.tcp", "sys", "node-" + n, 2552), n)) + val nodes = (1 to size).map(n ⇒ UniqueAddress(Address("akka.tcp", "sys", "node-" + n, 2552), n.toLong)) val selfAddress = nodes(size / 2) HeartbeatNodeRing(selfAddress, nodes.toSet, Set.empty, 5) } diff --git a/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingSpec.scala b/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingSpec.scala index 44a930eb3a..4affa49671 100644 --- a/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/HeartbeatNodeRingSpec.scala @@ -10,12 +10,12 @@ import akka.actor.Address class HeartbeatNodeRingSpec extends WordSpec with 
Matchers { - val aa = UniqueAddress(Address("akka.tcp", "sys", "aa", 2552), 1) - val bb = UniqueAddress(Address("akka.tcp", "sys", "bb", 2552), 2) - val cc = UniqueAddress(Address("akka.tcp", "sys", "cc", 2552), 3) - val dd = UniqueAddress(Address("akka.tcp", "sys", "dd", 2552), 4) - val ee = UniqueAddress(Address("akka.tcp", "sys", "ee", 2552), 5) - val ff = UniqueAddress(Address("akka.tcp", "sys", "ff", 2552), 6) + val aa = UniqueAddress(Address("akka.tcp", "sys", "aa", 2552), 1L) + val bb = UniqueAddress(Address("akka.tcp", "sys", "bb", 2552), 2L) + val cc = UniqueAddress(Address("akka.tcp", "sys", "cc", 2552), 3L) + val dd = UniqueAddress(Address("akka.tcp", "sys", "dd", 2552), 4L) + val ee = UniqueAddress(Address("akka.tcp", "sys", "ee", 2552), 5L) + val ff = UniqueAddress(Address("akka.tcp", "sys", "ff", 2552), 6L) val nodes = Set(aa, bb, cc, dd, ee, ff) diff --git a/akka-cluster/src/test/scala/akka/cluster/MemberOrderingSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MemberOrderingSpec.scala index 999baa7166..80654dadb1 100644 --- a/akka-cluster/src/test/scala/akka/cluster/MemberOrderingSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/MemberOrderingSpec.scala @@ -50,7 +50,7 @@ class MemberOrderingSpec extends WordSpec with Matchers { "have stable equals and hashCode" in { val address = Address("akka.tcp", "sys1", "host1", 9000) val m1 = m(address, Joining) - val m11 = Member(UniqueAddress(address, -3), Set.empty) + val m11 = Member(UniqueAddress(address, -3L), Set.empty) val m2 = m1.copy(status = Up) val m22 = m11.copy(status = Up) val m3 = m(address.copy(port = Some(10000)), Up) @@ -81,7 +81,7 @@ class MemberOrderingSpec extends WordSpec with Matchers { // different uid val a = m(address1, Joining) - val b = Member(UniqueAddress(address1, -3), Set.empty) + val b = Member(UniqueAddress(address1, -3L), Set.empty) Member.ordering.compare(a, b) should ===(1) Member.ordering.compare(b, a) should ===(-1) diff --git 
a/akka-cluster/src/test/scala/akka/cluster/ReachabilityPerfSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ReachabilityPerfSpec.scala index 4ad8af3ce7..b340a0a28d 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ReachabilityPerfSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ReachabilityPerfSpec.scala @@ -19,9 +19,9 @@ class ReachabilityPerfSpec extends WordSpec with Matchers { private def createReachabilityOfSize(base: Reachability, size: Int): Reachability = (base /: (1 to size)) { case (r, i) ⇒ - val observer = UniqueAddress(address.copy(host = Some("node-" + i)), i) + val observer = UniqueAddress(address.copy(host = Some("node-" + i)), i.toLong) val j = if (i == size) 1 else i + 1 - val subject = UniqueAddress(address.copy(host = Some("node-" + j)), j) + val subject = UniqueAddress(address.copy(host = Some("node-" + j)), j.toLong) r.unreachable(observer, subject).reachable(observer, subject) } diff --git a/akka-cluster/src/test/scala/akka/cluster/ReachabilitySpec.scala b/akka-cluster/src/test/scala/akka/cluster/ReachabilitySpec.scala index 3391cd2a21..58c7a18dc4 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ReachabilitySpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ReachabilitySpec.scala @@ -12,11 +12,11 @@ class ReachabilitySpec extends WordSpec with Matchers { import Reachability.{ Reachable, Unreachable, Terminated, Record } - val nodeA = UniqueAddress(Address("akka.tcp", "sys", "a", 2552), 1) - val nodeB = UniqueAddress(Address("akka.tcp", "sys", "b", 2552), 2) - val nodeC = UniqueAddress(Address("akka.tcp", "sys", "c", 2552), 3) - val nodeD = UniqueAddress(Address("akka.tcp", "sys", "d", 2552), 4) - val nodeE = UniqueAddress(Address("akka.tcp", "sys", "e", 2552), 5) + val nodeA = UniqueAddress(Address("akka.tcp", "sys", "a", 2552), 1L) + val nodeB = UniqueAddress(Address("akka.tcp", "sys", "b", 2552), 2L) + val nodeC = UniqueAddress(Address("akka.tcp", "sys", "c", 2552), 3L) + val nodeD = 
UniqueAddress(Address("akka.tcp", "sys", "d", 2552), 4L) + val nodeE = UniqueAddress(Address("akka.tcp", "sys", "e", 2552), 5L) "Reachability table" must { diff --git a/akka-cluster/src/test/scala/akka/cluster/StartupWithOneThreadSpec.scala b/akka-cluster/src/test/scala/akka/cluster/StartupWithOneThreadSpec.scala index 4251cfbdd2..14b4afb067 100644 --- a/akka-cluster/src/test/scala/akka/cluster/StartupWithOneThreadSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/StartupWithOneThreadSpec.scala @@ -17,6 +17,7 @@ object StartupWithOneThreadSpec { akka.actor.provider = "cluster" akka.actor.creation-timeout = 10s akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 akka.actor.default-dispatcher { executor = thread-pool-executor diff --git a/akka-cluster/src/test/scala/akka/cluster/TestMember.scala b/akka-cluster/src/test/scala/akka/cluster/TestMember.scala index 55f54cbf64..ef5299fa8a 100644 --- a/akka-cluster/src/test/scala/akka/cluster/TestMember.scala +++ b/akka-cluster/src/test/scala/akka/cluster/TestMember.scala @@ -10,5 +10,5 @@ object TestMember { apply(address, status, Set.empty) def apply(address: Address, status: MemberStatus, roles: Set[String]): Member = - new Member(UniqueAddress(address, 0), Int.MaxValue, status, roles) + new Member(UniqueAddress(address, 0L), Int.MaxValue, status, roles) } diff --git a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala index 71855e7ce0..fa0359e532 100644 --- a/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/protobuf/ClusterMessageSerializerSpec.scala @@ -43,9 +43,9 @@ class ClusterMessageSerializerSpec extends AkkaSpec( "be serializable" in { val address = Address("akka.tcp", "system", "some.host.org", 4711) - val uniqueAddress = UniqueAddress(address, 17) + val uniqueAddress = 
UniqueAddress(address, 17L) val address2 = Address("akka.tcp", "system", "other.host.org", 4711) - val uniqueAddress2 = UniqueAddress(address2, 18) + val uniqueAddress2 = UniqueAddress(address2, 18L) checkSerialization(InternalClusterAction.Join(uniqueAddress, Set("foo", "bar"))) checkSerialization(ClusterUserAction.Leave(address)) checkSerialization(ClusterUserAction.Down(address)) @@ -54,6 +54,7 @@ class ClusterMessageSerializerSpec extends AkkaSpec( checkSerialization(InternalClusterAction.InitJoinNack(address)) checkSerialization(ClusterHeartbeatSender.Heartbeat(address)) checkSerialization(ClusterHeartbeatSender.HeartbeatRsp(uniqueAddress)) + checkSerialization(InternalClusterAction.ExitingConfirmed(uniqueAddress)) val node1 = VectorClock.Node("node1") val node2 = VectorClock.Node("node2") diff --git a/akka-cluster/src/test/scala/akka/cluster/routing/ClusterRouterSupervisorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/routing/ClusterRouterSupervisorSpec.scala index f444b8d2cc..6f36ed573f 100644 --- a/akka-cluster/src/test/scala/akka/cluster/routing/ClusterRouterSupervisorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/routing/ClusterRouterSupervisorSpec.scala @@ -24,6 +24,7 @@ object ClusterRouterSupervisorSpec { class ClusterRouterSupervisorSpec extends AkkaSpec(""" akka.actor.provider = "cluster" akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 """) { import ClusterRouterSupervisorSpec._ diff --git a/akka-cluster/src/test/scala/akka/cluster/routing/WeightedRouteesSpec.scala b/akka-cluster/src/test/scala/akka/cluster/routing/WeightedRouteesSpec.scala index b0252517b9..edddde0521 100644 --- a/akka-cluster/src/test/scala/akka/cluster/routing/WeightedRouteesSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/routing/WeightedRouteesSpec.scala @@ -15,6 +15,7 @@ import akka.routing.ActorRefRoutee class WeightedRouteesSpec extends AkkaSpec(ConfigFactory.parseString(""" akka.actor.provider = "cluster" 
akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 """)) { val protocol = diff --git a/akka-distributed-data/src/test/resources/reference.conf b/akka-distributed-data/src/test/resources/reference.conf index 69122cf472..4345bd355e 100644 --- a/akka-distributed-data/src/test/resources/reference.conf +++ b/akka-distributed-data/src/test/resources/reference.conf @@ -3,3 +3,7 @@ akka.actor { serialize-creators = on warn-about-java-serializer-usage = off } + +akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off +akka.coordinated-shutdown.terminate-actor-system = off +akka.cluster.run-coordinated-shutdown-when-down = off diff --git a/akka-docs/rst/java/cluster-sharding.rst b/akka-docs/rst/java/cluster-sharding.rst index 0d6ac5bca6..45063fc957 100644 --- a/akka-docs/rst/java/cluster-sharding.rst +++ b/akka-docs/rst/java/cluster-sharding.rst @@ -293,11 +293,8 @@ to the ``ShardRegion`` actor to handoff all shards that are hosted by that ``Sha During this period other regions will buffer messages for those shards in the same way as when a rebalance is triggered by the coordinator. When the shards have been stopped the coordinator will allocate these shards elsewhere. -When the ``ShardRegion`` has terminated you probably want to ``leave`` the cluster, and shut down the ``ActorSystem``. - -This is how to do that: - -.. includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#graceful-shutdown +This is performed automatically by the :ref:`coordinated-shutdown-lambda` and is therefore part of the +graceful leaving process of a cluster member. .. _RemoveInternalClusterShardingData-java: diff --git a/akka-docs/rst/java/cluster-usage.rst b/akka-docs/rst/java/cluster-usage.rst index ef4dac22bd..2183592f72 100644 --- a/akka-docs/rst/java/cluster-usage.rst +++ b/akka-docs/rst/java/cluster-usage.rst @@ -186,11 +186,16 @@ It can also be performed programmatically with: .. 
includecode:: code/docs/cluster/ClusterDocTest.java#leave Note that this command can be issued to any member in the cluster, not necessarily the -one that is leaving. The cluster extension, but not the actor system or JVM, of the -leaving member will be shutdown after the leader has changed status of the member to -`Exiting`. Thereafter the member will be removed from the cluster. Normally this is handled -automatically, but in case of network failures during this process it might still be necessary -to set the node’s status to ``Down`` in order to complete the removal. +one that is leaving. + +The :ref:`coordinated-shutdown-lambda` will automatically run when the cluster node sees itself as +``Exiting``, i.e. leaving from another node will trigger the shutdown process on the leaving node. +Tasks for graceful leaving of cluster including graceful shutdown of Cluster Singletons and +Cluster Sharding are added automatically when Akka Cluster is used, i.e. running the shutdown +process will also trigger the graceful leaving if it's not already in progress. + +Normally this is handled automatically, but in case of network failures during this process it might still +be necessary to set the node’s status to ``Down`` in order to complete the removal. .. _weakly_up_java: @@ -357,9 +362,7 @@ How To Cleanup when Member is Removed You can do some clean up in a ``registerOnMemberRemoved`` callback, which will be invoked when the current member status is changed to 'Removed' or the cluster have been shutdown. -For example, this is how to shut down the ``ActorSystem`` and thereafter exit the JVM: - -.. includecode:: ../../../akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialFrontendMain.java#registerOnRemoved +An alternative is to register tasks to the :ref:`coordinated-shutdown-lambda`. .. 
note:: Register a OnMemberRemoved callback on a cluster that have been shutdown, the callback will be invoked immediately on diff --git a/akka-docs/rst/java/code/docs/actorlambda/ActorDocTest.java b/akka-docs/rst/java/code/docs/actorlambda/ActorDocTest.java index 18ae5730d0..67cc354909 100644 --- a/akka-docs/rst/java/code/docs/actorlambda/ActorDocTest.java +++ b/akka-docs/rst/java/code/docs/actorlambda/ActorDocTest.java @@ -12,11 +12,17 @@ import akka.testkit.TestEvent; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import docs.AbstractJavaTest; +import docs.actor.ActorDocTest.FirstActor; import scala.PartialFunction; import scala.runtime.BoxedUnit; import static docs.actorlambda.Messages.Swap.Swap; import static docs.actorlambda.Messages.*; import static akka.japi.Util.immutableSeq; +import akka.actor.CoordinatedShutdown; +import static akka.pattern.PatternsCS.ask; +import akka.util.Timeout; +import akka.Done; +import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; @@ -677,5 +683,31 @@ public class ActorDocTest extends AbstractJavaTest { system.eventStream().publish(new TestEvent.UnMute(immutableSeq(ignoreExceptions))); } } + + @Test + public void coordinatedShutdown() { + final ActorRef someActor = system.actorOf(Props.create(FirstActor.class)); + //#coordinated-shutdown-addTask + CoordinatedShutdown.get(system).addTask( + CoordinatedShutdown.PhaseBeforeServiceUnbind(), "someTaskName", + () -> { + return ask(someActor, "stop", new Timeout(5, TimeUnit.SECONDS)) + .thenApply(reply -> Done.getInstance()); + }); + //#coordinated-shutdown-addTask + + //#coordinated-shutdown-jvm-hook + CoordinatedShutdown.get(system).addJvmShutdownHook(() -> + System.out.println("custom JVM shutdown hook...") + ); + //#coordinated-shutdown-jvm-hook + + // don't run this + if (false) { + //#coordinated-shutdown-run + CompletionStage done = CoordinatedShutdown.get(system).runAll(); + //#coordinated-shutdown-run + } + } } diff 
--git a/akka-docs/rst/java/lambda-actors.rst b/akka-docs/rst/java/lambda-actors.rst index d24480aa54..db4cbc6251 100644 --- a/akka-docs/rst/java/lambda-actors.rst +++ b/akka-docs/rst/java/lambda-actors.rst @@ -764,6 +764,76 @@ before stopping the target actor. Simple cleanup tasks can be handled in ``postS within a supervisor you control and only in response to a :class:`Terminated` message, i.e. not for top-level actors. +.. _coordinated-shutdown-lambda: + +Coordinated Shutdown +-------------------- + +There is an extension named ``CoordinatedShutdown`` that will stop certain actors and +services in a specific order and perform registered tasks during the shutdown process. + +The order of the shutdown phases is defined in configuration ``akka.coordinated-shutdown.phases``. +The default phases are defined as: + +.. includecode:: ../../../akka-actor/src/main/resources/reference.conf#coordinated-shutdown-phases + +More phases can be added in the application's configuration if needed by overriding a phase with an +additional ``depends-on``. Especially the phases ``before-service-unbind``, ``before-cluster-shutdown`` and +``before-actor-system-terminate`` are intended for application specific phases or tasks. + +The default phases are defined in a single linear order, but the phases can be ordered as a +directed acyclic graph (DAG) by defining the dependencies between the phases. +The phases are ordered with `topological <https://en.wikipedia.org/wiki/Topological_sorting>`_ sort of the DAG. + +Tasks can be added to a phase with: + +.. includecode:: code/docs/actorlambda/ActorDocTest.java#coordinated-shutdown-addTask + +The returned ``CompletionStage`` should be completed when the task is completed. The task name parameter +is only used for debugging/logging. + +Tasks added to the same phase are executed in parallel without any ordering assumptions. +Next phase will not start until all tasks of previous phase have been completed.
+ +If tasks are not completed within a configured timeout (see :ref:`reference.conf <config-akka-actor>`) +the next phase will be started anyway. It is possible to configure ``recover=off`` for a phase +to abort the rest of the shutdown process if a task fails or is not completed within the timeout. + +Tasks should typically be registered as early as possible after system startup. When running +the coordinated shutdown, tasks that have been registered will be performed but tasks that are +added too late will not be run. + +To start the coordinated shutdown process you can invoke ``runAll`` on the ``CoordinatedShutdown`` +extension: + +.. includecode:: code/docs/actorlambda/ActorDocTest.java#coordinated-shutdown-run + +It's safe to call the ``runAll`` method multiple times. It will only run once. + +That also means that the ``ActorSystem`` will be terminated in the last phase. By default, the +JVM is not forcefully stopped (it will be stopped if all non-daemon threads have been terminated). +To enable a hard ``System.exit`` as a final action you can configure:: + + akka.coordinated-shutdown.exit-jvm = on + +When using :ref:`Akka Cluster <cluster_usage_java>` the ``CoordinatedShutdown`` will automatically run +when the cluster node sees itself as ``Exiting``, i.e. leaving from another node will trigger +the shutdown process on the leaving node. Tasks for graceful leaving of cluster including graceful +shutdown of Cluster Singletons and Cluster Sharding are added automatically when Akka Cluster is used, +i.e. running the shutdown process will also trigger the graceful leaving if it's not already in progress. + +By default, the ``CoordinatedShutdown`` will be run when the JVM process exits, e.g. +via ``kill SIGTERM`` signal (``SIGINT`` ctrl-c doesn't work).
This behavior can be disabled with:: + + akka.coordinated-shutdown.run-by-jvm-shutdown-hook=off + +If you have application specific JVM shutdown hooks it's recommended that you register them via the +``CoordinatedShutdown`` so that they are running before Akka internal shutdown hooks, e.g. +those shutting down Akka Remoting (Artery). + +.. includecode:: code/docs/actorlambda/ActorDocTest.java#coordinated-shutdown-jvm-hook + + .. _actor-hotswap-lambda: Become/Unbecome diff --git a/akka-docs/rst/java/untyped-actors.rst b/akka-docs/rst/java/untyped-actors.rst index b9da5bc02f..7f22f40026 100644 --- a/akka-docs/rst/java/untyped-actors.rst +++ b/akka-docs/rst/java/untyped-actors.rst @@ -714,6 +714,73 @@ before stopping the target actor. Simple cleanup tasks can be handled in ``postS returned. In order to guarantee proper deregistration, only reuse names from within a supervisor you control and only in response to a :class:`Terminated` message, i.e. not for top-level actors. + +Coordinated Shutdown +-------------------- + +There is an extension named ``CoordinatedShutdown`` that will stop certain actors and +services in a specific order and perform registered tasks during the shutdown process. + +The order of the shutdown phases is defined in configuration ``akka.coordinated-shutdown.phases``. +The default phases are defined as: + +.. includecode:: ../../../akka-actor/src/main/resources/reference.conf#coordinated-shutdown-phases + +More phases can be added in the application's configuration if needed by overriding a phase with an +additional ``depends-on``. Especially the phases ``before-service-unbind``, ``before-cluster-shutdown`` and +``before-actor-system-terminate`` are intended for application specific phases or tasks. + +The default phases are defined in a single linear order, but the phases can be ordered as a +directed acyclic graph (DAG) by defining the dependencies between the phases. +The phases are ordered with `topological <https://en.wikipedia.org/wiki/Topological_sorting>`_ sort of the DAG.
+ +Tasks can be added to a phase with: + +.. includecode:: code/docs/actorlambda/ActorDocTest.java#coordinated-shutdown-addTask + +The returned ``CompletionStage`` should be completed when the task is completed. The task name parameter +is only used for debugging/logging. + +Tasks added to the same phase are executed in parallel without any ordering assumptions. +Next phase will not start until all tasks of previous phase have been completed. + +If tasks are not completed within a configured timeout (see :ref:`reference.conf `) +the next phase will be started anyway. It is possible to configure ``recover=off`` for a phase +to abort the rest of the shutdown process if a task fails or is not completed within the timeout. + +Tasks should typically be registered as early as possible after system startup. When running +the coordinated shutdown tasks that have been registered will be performed but tasks that are +added too late will not be run. + +To start the coordinated shutdown process you can invoke ``runAll`` on the ``CoordinatedShutdown`` +extension: + +.. includecode:: code/docs/actorlambda/ActorDocTest.java#coordinated-shutdown-run + +It's safe to call the ``runAll`` method multiple times. It will only run once. + +That also means that the ``ActorSystem`` will be terminated in the last phase. By default, the +JVM is not forcefully stopped (it will be stopped if all non-daemon threads have been terminated). +To enable a hard ``System.exit`` as a final action you can configure:: + + akka.coordinated-shutdown.exit-jvm = on + +When using :ref:`Akka Cluster ` the ``CoordinatedShutdown`` will automatically run +when the cluster node sees itself as ``Exiting``, i.e. leaving from another node will trigger +the shutdown process on the leaving node. Tasks for graceful leaving of cluster including graceful +shutdown of Cluster Singletons and Cluster Sharding are added automatically when Akka Cluster is used, +i.e. 
running the shutdown process will also trigger the graceful leaving if it's not already in progress. + +By default, the ``CoordinatedShutdown`` will be run when the JVM process exits, e.g. +via ``kill SIGTERM`` signal (``SIGINT`` ctrl-c doesn't work). This behavior can be disabled with:: + + akka.coordinated-shutdown.run-by-jvm-shutdown-hook=off + +If you have application specific JVM shutdown hooks it's recommended that you register them via the +``CoordinatedShutdown`` so that they are running before Akka internal shutdown hooks, e.g. +those shutting down Akka Remoting (Artery). + +.. includecode:: code/docs/actorlambda/ActorDocTest.java#coordinated-shutdown-jvm-hook .. _UntypedActor.HotSwap: diff --git a/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst b/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst index 8e17736b39..3bf5d6c271 100644 --- a/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst +++ b/akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst @@ -4,8 +4,8 @@ Migration Guide 2.4.x to 2.5.x ############################## -Akka Actor -========== +Actor +===== Actor DSL deprecation --------------------- @@ -13,8 +13,8 @@ Actor DSL deprecation Actor DSL is a rarely used feature and thus will be deprecated and removed. Use plain ``system.actorOf`` instead of the DSL to create Actors if you have been using it. -Akka Streams -============ +Streams +======= Removal of StatefulStage, PushPullStage --------------------------------------- @@ -85,6 +85,25 @@ in akka-remote's `reference.conf`_. Cluster ======= +Coordinated Shutdown +-------------------- + +There is a new extension named ``CoordinatedShutdown`` that will stop certain actors and +services in a specific order and perform registered tasks during the shutdown process. + +When using Akka Cluster, tasks for graceful leaving of cluster including graceful +shutdown of Cluster Singletons and Cluster Sharding are now performed automatically. 
+ +Previously it was documented that things like terminating the ``ActorSystem`` should be +done when the cluster member was removed, but this was very difficult to get right. +That is now taken care of automatically. This might result in changed behavior, hopefully +for the better. It might also be in conflict with your previous shutdown code so please +read the documentation for the Coordinated Shutdown and revisit your own implementations. +Most likely your implementation will not be needed any more or it can be simplified. + +More information can be found in the :ref:`documentation for Scala <coordinated-shutdown-scala>` or +:ref:`documentation for Java <coordinated-shutdown-lambda>`. + Cluster Management Command Line Tool ------------------------------------ @@ -97,8 +116,8 @@ See documentation of `akka/akka-cluster-management `_ or similar instead. -Akka Persistence -================ +Persistence +=========== Removal of PersistentView ------------------------- @@ -123,8 +142,8 @@ non-sharable journal or snapshot store. The proxy is available by setting ``akka ``akka.persistence.snapshot-store.plugin`` to ``akka.persistence.journal.proxy`` or ``akka.persistence.snapshot-store.proxy``, respectively. The proxy supplants the :ref:`Shared LevelDB journal`. -Akka Persistence Query -====================== +Persistence Query +================= Persistence Query has been promoted to a stable module. Only slight API changes were made since the module was introduced: diff --git a/akka-docs/rst/scala/actors.rst b/akka-docs/rst/scala/actors.rst index f24cc6d490..33bd4c404a 100644 --- a/akka-docs/rst/scala/actors.rst +++ b/akka-docs/rst/scala/actors.rst @@ -785,6 +785,76 @@ before stopping the target actor. Simple cleanup tasks can be handled in ``postS within a supervisor you control and only in response to a :class:`Terminated` message, i.e. not for top-level actors. +..
_coordinated-shutdown-scala: + +Coordinated Shutdown +-------------------- + +There is an extension named ``CoordinatedShutdown`` that will stop certain actors and +services in a specific order and perform registered tasks during the shutdown process. + +The order of the shutdown phases is defined in configuration ``akka.coordinated-shutdown.phases``. +The default phases are defined as: + +.. includecode:: ../../../akka-actor/src/main/resources/reference.conf#coordinated-shutdown-phases + +More phases can be added in the application's configuration if needed by overriding a phase with an +additional ``depends-on``. Especially the phases ``before-service-unbind``, ``before-cluster-shutdown`` and +``before-actor-system-terminate`` are intended for application specific phases or tasks. + +The default phases are defined in a single linear order, but the phases can be ordered as a +directed acyclic graph (DAG) by defining the dependencies between the phases. +The phases are ordered with `topological <https://en.wikipedia.org/wiki/Topological_sorting>`_ sort of the DAG. + +Tasks can be added to a phase with: + +.. includecode:: code/docs/actor/ActorDocSpec.scala#coordinated-shutdown-addTask + +The returned ``Future[Done]`` should be completed when the task is completed. The task name parameter +is only used for debugging/logging. + +Tasks added to the same phase are executed in parallel without any ordering assumptions. +Next phase will not start until all tasks of previous phase have been completed. + +If tasks are not completed within a configured timeout (see :ref:`reference.conf <config-akka-actor>`) +the next phase will be started anyway. It is possible to configure ``recover=off`` for a phase +to abort the rest of the shutdown process if a task fails or is not completed within the timeout. + +Tasks should typically be registered as early as possible after system startup. When running +the coordinated shutdown, tasks that have been registered will be performed but tasks that are +added too late will not be run.
+ +To start the coordinated shutdown process you can invoke ``run`` on the ``CoordinatedShutdown`` +extension: + +.. includecode:: code/docs/actor/ActorDocSpec.scala#coordinated-shutdown-run + +It's safe to call the ``run`` method multiple times. It will only run once. + +That also means that the ``ActorSystem`` will be terminated in the last phase. By default, the +JVM is not forcefully stopped (it will be stopped if all non-daemon threads have been terminated). +To enable a hard ``System.exit`` as a final action you can configure:: + + akka.coordinated-shutdown.exit-jvm = on + +When using :ref:`Akka Cluster ` the ``CoordinatedShutdown`` will automatically run +when the cluster node sees itself as ``Exiting``, i.e. leaving from another node will trigger +the shutdown process on the leaving node. Tasks for graceful leaving of cluster including graceful +shutdown of Cluster Singletons and Cluster Sharding are added automatically when Akka Cluster is used, +i.e. running the shutdown process will also trigger the graceful leaving if it's not already in progress. + +By default, the ``CoordinatedShutdown`` will be run when the JVM process exits, e.g. +via ``kill SIGTERM`` signal (``SIGINT`` ctrl-c doesn't work). This behavior can be disabled with:: + + akka.coordinated-shutdown.run-by-jvm-shutdown-hook=off + +If you have application specific JVM shutdown hooks it's recommended that you register them via the +``CoordinatedShutdown`` so that they are running before Akka internal shutdown hooks, e.g. +those shutting down Akka Remoting (Artery). + +.. includecode:: code/docs/actor/ActorDocSpec.scala#coordinated-shutdown-jvm-hook + + .. 
_Actor.HotSwap: Become/Unbecome diff --git a/akka-docs/rst/scala/cluster-sharding.rst b/akka-docs/rst/scala/cluster-sharding.rst index b52755417d..7c83c57065 100644 --- a/akka-docs/rst/scala/cluster-sharding.rst +++ b/akka-docs/rst/scala/cluster-sharding.rst @@ -295,11 +295,8 @@ You can send the message ``ShardRegion.GracefulShutdown`` message to the ``Shard During this period other regions will buffer messages for those shards in the same way as when a rebalance is triggered by the coordinator. When the shards have been stopped the coordinator will allocate these shards elsewhere. -When the ``ShardRegion`` has terminated you probably want to ``leave`` the cluster, and shut down the ``ActorSystem``. - -This is how to do that: - -.. includecode:: ../../../akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala#graceful-shutdown +This is performed automatically by the :ref:`coordinated-shutdown-scala` and is therefore part of the +graceful leaving process of a cluster member. .. _RemoveInternalClusterShardingData-scala: diff --git a/akka-docs/rst/scala/cluster-usage.rst b/akka-docs/rst/scala/cluster-usage.rst index b9315efbbc..32faa240b8 100644 --- a/akka-docs/rst/scala/cluster-usage.rst +++ b/akka-docs/rst/scala/cluster-usage.rst @@ -182,11 +182,16 @@ It can also be performed programmatically with: .. includecode:: code/docs/cluster/ClusterDocSpec.scala#leave Note that this command can be issued to any member in the cluster, not necessarily the -one that is leaving. The cluster extension, but not the actor system or JVM, of the -leaving member will be shutdown after the leader has changed status of the member to -`Exiting`. Thereafter the member will be removed from the cluster. Normally this is handled -automatically, but in case of network failures during this process it might still be necessary -to set the node’s status to ``Down`` in order to complete the removal. +one that is leaving. 
+ +The :ref:`coordinated-shutdown-scala` will automatically run when the cluster node sees itself as +``Exiting``, i.e. leaving from another node will trigger the shutdown process on the leaving node. +Tasks for graceful leaving of cluster including graceful shutdown of Cluster Singletons and +Cluster Sharding are added automatically when Akka Cluster is used, i.e. running the shutdown +process will also trigger the graceful leaving if it's not already in progress. + +Normally this is handled automatically, but in case of network failures during this process it might still +be necessary to set the node’s status to ``Down`` in order to complete the removal. .. _weakly_up_scala: @@ -353,9 +358,7 @@ How To Cleanup when Member is Removed You can do some clean up in a ``registerOnMemberRemoved`` callback, which will be invoked when the current member status is changed to 'Removed' or the cluster have been shutdown. -For example, this is how to shut down the ``ActorSystem`` and thereafter exit the JVM: - -.. includecode:: ../../../akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/factorial/FactorialFrontend.scala#registerOnRemoved +An alternative is to register tasks to the :ref:`coordinated-shutdown-scala`. .. 
note:: Register a OnMemberRemoved callback on a cluster that have been shutdown, the callback will be invoked immediately on diff --git a/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala b/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala index f5a6072b19..bcdd62620a 100644 --- a/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala @@ -20,6 +20,8 @@ import akka.testkit._ import akka.util._ import scala.concurrent.duration._ import scala.concurrent.Await +import akka.Done +import akka.actor.CoordinatedShutdown //#my-actor class MyActor extends Actor { @@ -631,4 +633,30 @@ class ActorDocSpec extends AkkaSpec(""" }) } + "using CoordinatedShutdown" in { + val someActor = system.actorOf(Props(classOf[Replier], this)) + //#coordinated-shutdown-addTask + CoordinatedShutdown(system).addTask( + CoordinatedShutdown.PhaseBeforeServiceUnbind, "someTaskName") { () => + import akka.pattern.ask + import system.dispatcher + implicit val timeout = Timeout(5.seconds) + (someActor ? 
"stop").map(_ => Done) + } + //#coordinated-shutdown-addTask + + //#coordinated-shutdown-jvm-hook + CoordinatedShutdown(system).addJvmShutdownHook { () => + println("custom JVM shutdown hook...") + } + //#coordinated-shutdown-jvm-hook + + // don't run this + def dummy(): Unit = { + //#coordinated-shutdown-run + val done: Future[Done] = CoordinatedShutdown(system).run() + //#coordinated-shutdown-run + } + } + } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala index 398ff811d8..de628d7a31 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala @@ -33,7 +33,7 @@ object MaxThroughputSpec extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback( ConfigFactory.parseString(s""" # for serious measurements you should increase the totalMessagesFactor (20) - akka.test.MaxThroughputSpec.totalMessagesFactor = 1.0 + akka.test.MaxThroughputSpec.totalMessagesFactor = 10.0 akka.test.MaxThroughputSpec.real-message = off akka { loglevel = INFO diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 47a459c881..48e26b3e3f 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -460,7 +460,14 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private lazy val shutdownHook = new Thread { override def run(): Unit = { if (hasBeenShutdown.compareAndSet(false, true)) { - log.debug("Shutting down [{}] via shutdownHook", localAddress) + val coord = CoordinatedShutdown(system) + val totalTimeout = coord.totalTimeout() + if (!coord.jvmHooksLatch.await(totalTimeout.toMillis, 
TimeUnit.MILLISECONDS)) + log.warning( + "CoordinatedShutdown took longer than [{}]. Shutting down [{}] via shutdownHook", + totalTimeout, localAddress) + else + log.debug("Shutting down [{}] via shutdownHook", localAddress) Await.result(internalShutdown(), settings.Advanced.DriverTimeout + 3.seconds) } } diff --git a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialFrontendMain.java b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialFrontendMain.java index 8a6317ef96..f25a355be1 100644 --- a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialFrontendMain.java +++ b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialFrontendMain.java @@ -34,39 +34,6 @@ public class FactorialFrontendMain { }); //#registerOnUp - //#registerOnRemoved - Cluster.get(system).registerOnMemberRemoved(new Runnable() { - @Override - public void run() { - // exit JVM when ActorSystem has been terminated - final Runnable exit = new Runnable() { - @Override public void run() { - System.exit(0); - } - }; - system.registerOnTermination(exit); - - // shut down ActorSystem - system.terminate(); - - // In case ActorSystem shutdown takes longer than 10 seconds, - // exit the JVM forcefully anyway. - // We must spawn a separate thread to not block current thread, - // since that would have blocked the shutdown of the ActorSystem. 
- new Thread() { - @Override public void run(){ - try { - Await.ready(system.whenTerminated(), Duration.create(10, TimeUnit.SECONDS)); - } catch (Exception e) { - System.exit(-1); - } - - } - }.start(); - } - }); - //#registerOnRemoved - } } diff --git a/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/factorial/FactorialFrontend.scala b/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/factorial/FactorialFrontend.scala index 16e49a9250..61bc5c7dc0 100644 --- a/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/factorial/FactorialFrontend.scala +++ b/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/factorial/FactorialFrontend.scala @@ -60,25 +60,5 @@ object FactorialFrontend { } //#registerOnUp - //#registerOnRemoved - Cluster(system).registerOnMemberRemoved { - // exit JVM when ActorSystem has been terminated - system.registerOnTermination(System.exit(0)) - // shut down ActorSystem - system.terminate() - - // In case ActorSystem shutdown takes longer than 10 seconds, - // exit the JVM forcefully anyway. - // We must spawn a separate thread to not block current thread, - // since that would have blocked the shutdown of the ActorSystem. 
- new Thread { - override def run(): Unit = { - if (Try(Await.ready(system.whenTerminated, 10.seconds)).isFailure) - System.exit(-1) - } - }.start() - } - //#registerOnRemoved - } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 40d7fd8079..d20bbffc73 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -10,7 +10,7 @@ import akka.stream.impl.StreamLayout.Module import akka.stream.impl._ import akka.stream.impl.fusing._ import akka.stream.stage._ -import org.reactivestreams.{Processor, Publisher, Subscriber, Subscription} +import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable diff --git a/project/MiMa.scala b/project/MiMa.scala index 014e78dad4..4eee635b4a 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -82,6 +82,11 @@ object MiMa extends AutoPlugin { import com.typesafe.tools.mima.core._ val bcIssuesBetween24and25 = Seq( + + // #21537 coordinated shutdown + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ClusterCoreDaemon.removed"), + ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.Gossip.convergence"), + // #21423 removal of deprecated stages (in 2.5.x) ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.Source.transform"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.SubSource.transform"),