Run all CoordinatedShutdown phases also when downing, #24048

This commit is contained in:
Patrik Nordwall 2017-11-23 08:48:38 +01:00
parent cbc1c9a4f0
commit fa3da328be
5 changed files with 183 additions and 14 deletions

View file

@ -406,9 +406,13 @@ private[akka] class ShardRegion(
CoordinatedShutdown(context.system).addTask(
CoordinatedShutdown.PhaseClusterShardingShutdownRegion,
"region-shutdown") { ()
if (cluster.isTerminated || cluster.selfMember.status == MemberStatus.Down) {
Future.successful(Done)
} else {
self ! GracefulShutdown
gracefulShutdownProgress.future
}
}
// subscribe to MemberEvent, re-subscribe when restart
override def preStart(): Unit = {

View file

@ -0,0 +1,151 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster.sharding
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.Done
import akka.actor.ActorSystem
import akka.actor.CoordinatedShutdown
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.MemberStatus
import akka.testkit.AkkaSpec
import akka.testkit.TestActors.EchoActor
import akka.testkit.TestProbe
/**
 * Shared fixtures for the `CoordinatedShutdownShardingSpec` test class: the ActorSystem
 * configuration and the functions that route `Int` messages to entities and shards.
 */
object CoordinatedShutdownShardingSpec {

  // Cluster provider with port 0 for both remoting transports; the spec starts several
  // ActorSystems in one JVM (presumably port 0 avoids clashes on a fixed port).
  val config =
    """
akka.loglevel = INFO
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
"""

  /** An `Int` message is its own entity: the entity id is the number as a string, the payload is the number itself. */
  val extractEntityId: ShardRegion.ExtractEntityId = {
    case msg: Int => (msg.toString, msg)
  }

  /** Shard id is the message modulo 10, rendered as a string. */
  val extractShardId: ShardRegion.ExtractShardId = {
    case msg: Int => (msg % 10).toString
  }
}
/**
 * Checks that CoordinatedShutdown tasks are run both when a cluster member leaves and when a
 * member is downed, and that sharded entities stay reachable from the surviving node afterwards.
 *
 * Three ActorSystems take part: `sys1` and `sys2` are created by this spec, `sys3` is the
 * `system` supplied by `AkkaSpec`. All three start a shard region for the entity type "type1".
 */
class CoordinatedShutdownShardingSpec extends AkkaSpec(CoordinatedShutdownShardingSpec.config) {
  import CoordinatedShutdownShardingSpec._

  // Extra systems share the name and config of the AkkaSpec system so they can form one cluster.
  val sys1 = ActorSystem(system.name, system.settings.config)
  val sys2 = ActorSystem(system.name, system.settings.config)
  val sys3 = system

  val region1 = ClusterSharding(sys1).start("type1", Props[EchoActor](), ClusterShardingSettings(sys1),
    extractEntityId, extractShardId)
  val region2 = ClusterSharding(sys2).start("type1", Props[EchoActor](), ClusterShardingSettings(sys2),
    extractEntityId, extractShardId)
  val region3 = ClusterSharding(sys3).start("type1", Props[EchoActor](), ClusterShardingSettings(sys3),
    extractEntityId, extractShardId)

  val probe1 = TestProbe()(sys1)
  val probe2 = TestProbe()(sys2)
  val probe3 = TestProbe()(sys3)

  // Each system registers an early-phase task that notifies the probe living in that same
  // system, so a test can observe that the CoordinatedShutdown of that system was run.
  CoordinatedShutdown(sys1).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "unbind") { () =>
    probe1.ref ! "CS-unbind-1"
    Future.successful(Done)
  }
  CoordinatedShutdown(sys2).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "unbind") { () =>
    probe2.ref ! "CS-unbind-2"
    Future.successful(Done)
  }
  CoordinatedShutdown(sys3).addTask(CoordinatedShutdown.PhaseBeforeServiceUnbind, "unbind") { () =>
    probe3.ref ! "CS-unbind-3"
    Future.successful(Done)
  }

  /** Shuts down the two systems created by this spec; `sys3` is the AkkaSpec `system` and is not shut down here. */
  override def beforeTermination(): Unit = {
    shutdown(sys1)
    shutdown(sys2)
  }

  /**
   * Sends the messages 1, 2 and 3 through the region on `sys3` and expects each one to be
   * echoed back. The first reply gets a longer timeout (10 seconds) than the following ones.
   */
  def pingEntities(): Unit = {
    region3.tell(1, probe3.ref)
    probe3.expectMsg(10.seconds, 1)
    region3.tell(2, probe3.ref)
    probe3.expectMsg(2)
    region3.tell(3, probe3.ref)
    probe3.expectMsg(3)
  }

  "Sharding and CoordinatedShutdown" must {

    "init cluster" in {
      // sys1 forms the cluster by joining itself, so the coordinator will initially run on sys1
      Cluster(sys1).join(Cluster(sys1).selfAddress)
      awaitAssert(Cluster(sys1).selfMember.status should ===(MemberStatus.Up))

      Cluster(sys2).join(Cluster(sys1).selfAddress)
      within(10.seconds) {
        awaitAssert {
          Cluster(sys1).state.members.size should ===(2)
          Cluster(sys1).state.members.map(_.status) should ===(Set(MemberStatus.Up))
          Cluster(sys2).state.members.size should ===(2)
          Cluster(sys2).state.members.map(_.status) should ===(Set(MemberStatus.Up))
        }
      }

      Cluster(sys3).join(Cluster(sys1).selfAddress)
      within(10.seconds) {
        awaitAssert {
          Cluster(sys1).state.members.size should ===(3)
          Cluster(sys1).state.members.map(_.status) should ===(Set(MemberStatus.Up))
          Cluster(sys2).state.members.size should ===(3)
          Cluster(sys2).state.members.map(_.status) should ===(Set(MemberStatus.Up))
          Cluster(sys3).state.members.size should ===(3)
          Cluster(sys3).state.members.map(_.status) should ===(Set(MemberStatus.Up))
        }
      }

      pingEntities()
    }

    "run coordinated shutdown when leaving" in {
      // The leave of sys1 is issued from sys3; the "unbind" task of sys1 must be run
      Cluster(sys3).leave(Cluster(sys1).selfAddress)
      probe1.expectMsg("CS-unbind-1")

      within(10.seconds) {
        awaitAssert {
          Cluster(sys2).state.members.size should ===(2)
          Cluster(sys3).state.members.size should ===(2)
        }
      }

      // Both the Cluster extension and the ActorSystem of sys1 must end up terminated
      within(10.seconds) {
        awaitAssert {
          Cluster(sys1).isTerminated should ===(true)
          sys1.whenTerminated.isCompleted should ===(true)
        }
      }

      pingEntities()
    }

    "run coordinated shutdown when downing" in {
      // sys2 is downed from sys3; the "unbind" task of sys2 must be run in this case too
      Cluster(sys3).down(Cluster(sys2).selfAddress)
      probe2.expectMsg("CS-unbind-2")

      within(10.seconds) {
        awaitAssert {
          Cluster(system).state.members.size should ===(1)
        }
      }

      // Both the Cluster extension and the ActorSystem of sys2 must end up terminated
      within(10.seconds) {
        awaitAssert {
          Cluster(sys2).isTerminated should ===(true)
          sys2.whenTerminated.isCompleted should ===(true)
        }
      }

      pingEntities()
    }
  }
}

View file

@ -13,6 +13,8 @@ object GetShardTypeNamesSpec {
"""
akka.loglevel = INFO
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
"""
val extractEntityId: ShardRegion.ExtractEntityId = {

View file

@ -5,9 +5,10 @@
package akka.cluster.singleton
import com.typesafe.config.Config
import scala.concurrent.duration._
import scala.collection.immutable
import scala.concurrent.Future
import akka.actor.Actor
import akka.actor.Deploy
import akka.actor.ActorSystem
@ -26,8 +27,8 @@ import akka.AkkaException
import akka.actor.NoSerializationVerificationNeeded
import akka.cluster.UniqueAddress
import akka.cluster.ClusterEvent
import scala.concurrent.Promise
import akka.Done
import akka.actor.CoordinatedShutdown
import akka.annotation.DoNotInherit
@ -254,10 +255,14 @@ object ClusterSingletonManager {
// should preferably complete before stopping the singleton sharding coordinator on same node.
val coordShutdown = CoordinatedShutdown(context.system)
coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "singleton-exiting-1") { ()
if (cluster.isTerminated || cluster.selfMember.status == MemberStatus.Down) {
Future.successful(Done)
} else {
implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterExiting))
self.ask(SelfExiting).mapTo[Done]
}
}
}
override def postStop(): Unit = cluster.unsubscribe(self)
private val selfDc = ClusterSettings.DcRolePrefix + cluster.settings.SelfDataCenter
@ -468,12 +473,20 @@ class ClusterSingletonManager(
// for CoordinatedShutdown
val coordShutdown = CoordinatedShutdown(context.system)
val memberExitingProgress = Promise[Done]()
coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "wait-singleton-exiting")(()
memberExitingProgress.future)
coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "wait-singleton-exiting") { ()
if (cluster.isTerminated || cluster.selfMember.status == MemberStatus.Down)
Future.successful(Done)
else
memberExitingProgress.future
}
coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExiting, "singleton-exiting-2") { ()
if (cluster.isTerminated || cluster.selfMember.status == MemberStatus.Down) {
Future.successful(Done)
} else {
implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterExiting))
self.ask(SelfExiting).mapTo[Done]
}
}
def logInfo(message: String): Unit =
if (LogInfo) log.info(message)

View file

@ -176,7 +176,7 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
coordShutdown.addTask(CoordinatedShutdown.PhaseClusterLeave, "leave") {
val sys = context.system
()
if (Cluster(sys).isTerminated)
if (Cluster(sys).isTerminated || Cluster(sys).selfMember.status == Down)
Future.successful(Done)
else {
implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterLeave))
@ -190,8 +190,7 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac
override def postStop(): Unit = {
clusterShutdown.trySuccess(Done)
if (Cluster(context.system).settings.RunCoordinatedShutdownWhenDown) {
// run the last phases e.g. if node was downed (not leaving)
coordShutdown.run(Some(CoordinatedShutdown.PhaseClusterShutdown))
coordShutdown.run()
}
}
@ -325,7 +324,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
coordShutdown.addTask(CoordinatedShutdown.PhaseClusterExitingDone, "exiting-completed") {
val sys = context.system
()
if (Cluster(sys).isTerminated)
if (Cluster(sys).isTerminated || Cluster(sys).selfMember.status == Down)
Future.successful(Done)
else {
implicit val timeout = Timeout(coordShutdown.timeout(CoordinatedShutdown.PhaseClusterExitingDone))