diff --git a/akka-cluster-sharding-typed/src/multi-jvm/resources/logback-test.xml b/akka-cluster-sharding-typed/src/multi-jvm/resources/logback-test.xml
index 51541ba6f6..e36652a969 100644
--- a/akka-cluster-sharding-typed/src/multi-jvm/resources/logback-test.xml
+++ b/akka-cluster-sharding-typed/src/multi-jvm/resources/logback-test.xml
@@ -8,7 +8,7 @@
%date{ISO8601} %-5level %logger %marker - %msg MDC: {%mdc}%n
-
+
diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingPreparingForShutdownSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingPreparingForShutdownSpec.scala
new file mode 100644
index 0000000000..31e1e3c410
--- /dev/null
+++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingPreparingForShutdownSpec.scala
@@ -0,0 +1,139 @@
+/*
+ * Copyright (C) 2020 Lightbend Inc.
+ */
+
+package akka.cluster.sharding.typed
+
+import akka.actor.testkit.typed.scaladsl.TestProbe
+import akka.actor.typed.ActorRef
+import akka.actor.typed.Behavior
+import akka.util.ccompat._
+import akka.actor.typed.scaladsl.Behaviors
+import akka.cluster.MemberStatus
+import akka.cluster.MemberStatus.Removed
+import akka.cluster.sharding.typed.ClusterShardingPreparingForShutdownSpec.Pinger.Command
+import akka.cluster.sharding.typed.scaladsl.ClusterSharding
+import akka.cluster.sharding.typed.scaladsl.Entity
+import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
+import akka.cluster.typed.Leave
+import akka.cluster.typed.MultiNodeTypedClusterSpec
+import akka.cluster.typed.PrepareForFullClusterShutdown
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.serialization.jackson.CborSerializable
+import com.typesafe.config.ConfigFactory
+
+import scala.concurrent.duration._
+
+object ClusterShardingPreparingForShutdownSpec extends MultiNodeConfig {
+ val first = role("first")
+ val second = role("second")
+ val third = role("third")
+
+ commonConfig(ConfigFactory.parseString("""
+ akka.loglevel = DEBUG
+ akka.actor.provider = "cluster"
+ akka.remote.log-remote-lifecycle-events = off
+ akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning
+ akka.cluster.testkit.auto-down-unreachable-after = off
+ akka.cluster.leader-actions-interval = 100ms
+ """))
+
+ object Pinger {
+ sealed trait Command extends CborSerializable
+ case class Ping(id: Int, ref: ActorRef[Pong]) extends Command
+ case class Pong(id: Int) extends CborSerializable
+
+ def apply(): Behavior[Command] = Behaviors.setup { _ =>
+ Behaviors.receiveMessage[Command] {
+ case Ping(id: Int, ref) =>
+ ref ! Pong(id)
+ Behaviors.same
+ }
+ }
+
+ }
+
+ val typeKey = EntityTypeKey[Command]("ping")
+}
+
+class ClusterShardingPreparingForShutdownMultiJvmNode1 extends ClusterShardingPreparingForShutdownSpec
+class ClusterShardingPreparingForShutdownMultiJvmNode2 extends ClusterShardingPreparingForShutdownSpec
+class ClusterShardingPreparingForShutdownMultiJvmNode3 extends ClusterShardingPreparingForShutdownSpec
+
+@ccompatUsedUntil213
+class ClusterShardingPreparingForShutdownSpec
+ extends MultiNodeSpec(ClusterShardingPreparingForShutdownSpec)
+ with MultiNodeTypedClusterSpec {
+ import ClusterShardingPreparingForShutdownSpec._
+ import ClusterShardingPreparingForShutdownSpec.Pinger._
+
+ override def initialParticipants = roles.size
+
+ private val sharding = ClusterSharding(typedSystem)
+
+  "ClusterSharding preparing for full cluster shutdown" must {
+
+ "form cluster" in {
+ formCluster(first, second, third)
+ }
+
+    "not rebalance but still work while preparing for shutdown" in {
+
+ val shardRegion: ActorRef[ShardingEnvelope[Command]] =
+ sharding.init(Entity(typeKey)(_ => Pinger()))
+
+ val probe = TestProbe[Pong]()
+ shardRegion ! ShardingEnvelope("id1", Pinger.Ping(1, probe.ref))
+ probe.expectMessage(Pong(1))
+
+ runOn(second) {
+ cluster.manager ! PrepareForFullClusterShutdown
+
+ }
+ awaitAssert({
+ withClue("members: " + cluster.state.members) {
+ cluster.selfMember.status shouldEqual MemberStatus.ReadyForShutdown
+ cluster.state.members.unsorted.map(_.status) shouldEqual Set(MemberStatus.ReadyForShutdown)
+ }
+ }, 10.seconds)
+ enterBarrier("preparation-complete")
+
+ shardRegion ! ShardingEnvelope("id2", Pinger.Ping(2, probe.ref))
+ probe.expectMessage(Pong(2))
+
+ runOn(second) {
+ cluster.manager ! Leave(address(second))
+ }
+ awaitAssert({
+ runOn(first, third) {
+ withClue("members: " + cluster.state.members) {
+ cluster.state.members.size shouldEqual 2
+ }
+ }
+ runOn(second) {
+ withClue("self member: " + cluster.selfMember) {
+ cluster.selfMember.status shouldEqual MemberStatus.Removed
+ }
+ }
+ }, 5.seconds) // keep this lower than coordinated shutdown timeout
+
+ runOn(first, third) {
+ shardRegion ! ShardingEnvelope("id3", Pinger.Ping(3, probe.ref))
+ probe.expectMessage(Pong(3))
+ }
+ enterBarrier("new-shards-verified")
+
+ runOn(third) {
+ cluster.manager ! Leave(address(first))
+ cluster.manager ! Leave(address(third))
+ }
+ awaitAssert({
+ withClue("self member: " + cluster.selfMember) {
+ cluster.selfMember.status shouldEqual Removed
+ }
+ }, 15.seconds)
+ enterBarrier("done")
+ }
+ }
+}
diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingStatsSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingStatsSpec.scala
index 3366559432..7931811acd 100644
--- a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingStatsSpec.scala
+++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingStatsSpec.scala
@@ -63,7 +63,6 @@ abstract class ClusterShardingStatsSpec
import Pinger._
private val typeKey = EntityTypeKey[Command]("ping")
-
private val sharding = ClusterSharding(typedSystem)
private val settings = ClusterShardingSettings(typedSystem)
private val queryTimeout = settings.shardRegionQueryTimeout * roles.size.toLong //numeric widening y'all
diff --git a/akka-cluster-sharding/src/main/mima-filters/2.6.12.backwards.excludes/full-cluster-shutdown.excludes b/akka-cluster-sharding/src/main/mima-filters/2.6.12.backwards.excludes/full-cluster-shutdown.excludes
new file mode 100644
index 0000000000..6a743b2f22
--- /dev/null
+++ b/akka-cluster-sharding/src/main/mima-filters/2.6.12.backwards.excludes/full-cluster-shutdown.excludes
@@ -0,0 +1,2 @@
+# private[akka] class
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardCoordinator.rebalanceTask")
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
index a0d71c9399..5dc9af7d21 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
@@ -20,6 +20,10 @@ import akka.actor.Terminated
import akka.actor.Timers
import akka.annotation.InternalStableApi
import akka.cluster.Cluster
+import akka.cluster.ClusterEvent.InitialStateAsEvents
+import akka.cluster.ClusterEvent.MemberEvent
+import akka.cluster.ClusterEvent.MemberPreparingForShutdown
+import akka.cluster.ClusterEvent.MemberReadyForShutdown
import akka.cluster.sharding.internal.RememberEntitiesShardStore
import akka.cluster.sharding.internal.RememberEntitiesShardStore.GetEntities
import akka.cluster.sharding.internal.RememberEntitiesProvider
@@ -453,6 +457,7 @@ private[akka] class Shard(
private val messageBuffers = new MessageBufferMap[EntityId]
private var handOffStopper: Option[ActorRef] = None
+ private var preparingForShutdown = false
import context.dispatcher
private val passivateIdleTask = if (settings.shouldPassivateIdleEntities) {
@@ -479,6 +484,11 @@ private[akka] class Shard(
}
override def preStart(): Unit = {
+ Cluster(context.system).subscribe(
+ self,
+ InitialStateAsEvents,
+ classOf[MemberPreparingForShutdown],
+ classOf[MemberReadyForShutdown])
acquireLeaseIfNeeded()
}
@@ -509,6 +519,8 @@ private[akka] class Shard(
tryGetLease(lease.get)
case ll: LeaseLost =>
receiveLeaseLost(ll)
+ case me: MemberEvent =>
+ receiveMemberEvent(me)
case msg =>
if (verboseDebug)
log.debug(
@@ -519,6 +531,15 @@ private[akka] class Shard(
stash()
}
+ private def receiveMemberEvent(event: MemberEvent): Unit = event match {
+ case _: MemberReadyForShutdown | _: MemberPreparingForShutdown =>
+ if (!preparingForShutdown) {
+ log.info("{}: Preparing for shutdown", typeName)
+ preparingForShutdown = true
+ }
+ case _ =>
+ }
+
private def tryGetLease(l: Lease): Unit = {
log.info("{}: Acquiring lease {}", typeName, l.settings)
pipe(l.acquire(reason => self ! LeaseLost(reason)).map(r => LeaseAcquireResult(r, None)).recover {
@@ -548,6 +569,8 @@ private[akka] class Shard(
onEntitiesRemembered(entityIds)
case RememberEntityTimeout(GetEntities) =>
loadingEntityIdsFailed()
+ case me: MemberEvent =>
+ receiveMemberEvent(me)
case msg =>
if (verboseDebug)
log.debug(
@@ -590,6 +613,7 @@ private[akka] class Shard(
// when not remembering entities, we stay in this state all the time
def idle: Receive = {
case Terminated(ref) => receiveTerminated(ref)
+ case me: MemberEvent => receiveMemberEvent(me)
case EntityTerminated(ref) => entityTerminated(ref)
case msg: CoordinatorMessage => receiveCoordinatorMessage(msg)
case msg: RememberEntityCommand => receiveRememberEntityCommand(msg)
@@ -659,6 +683,7 @@ private[akka] class Shard(
throw new RuntimeException(
s"Async write timed out after ${settings.tuningParameters.updatingStateTimeout.pretty}")
case ShardRegion.StartEntity(entityId) => startEntity(entityId, Some(sender()))
+ case me: MemberEvent => receiveMemberEvent(me)
case Terminated(ref) => receiveTerminated(ref)
case EntityTerminated(ref) => entityTerminated(ref)
case _: CoordinatorMessage => stash()
@@ -814,7 +839,8 @@ private[akka] class Shard(
}
private def receiveCoordinatorMessage(msg: CoordinatorMessage): Unit = msg match {
- case HandOff(`shardId`) => handOff(sender())
+ case HandOff(`shardId`) =>
+ handOff(sender())
case HandOff(shard) =>
log.warning("{}: Shard [{}] can not hand off for another Shard [{}]", typeName, shardId, shard)
case _ => unhandled(msg)
@@ -839,7 +865,12 @@ private[akka] class Shard(
// does conversion so only do once
val activeEntities = entities.activeEntities()
- if (activeEntities.nonEmpty) {
+ if (preparingForShutdown) {
+ log.info("{}: HandOff shard [{}] while preparing for shutdown. Stopping right away.", typeName, shardId)
+ activeEntities.foreach { _ ! handOffStopMessage }
+ replyTo ! ShardStopped(shardId)
+ context.stop(self)
+    } else if (activeEntities.nonEmpty) {
val entityHandOffTimeout = (settings.tuningParameters.handOffTimeout - 5.seconds).max(1.seconds)
log.debug(
"{}: Starting HandOffStopper for shard [{}] to terminate [{}] entities.",
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
index e7b25db59e..c53c78ded0 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
@@ -639,7 +639,8 @@ object ShardCoordinator {
abstract class ShardCoordinator(
settings: ClusterShardingSettings,
allocationStrategy: ShardCoordinator.ShardAllocationStrategy)
- extends Actor {
+ extends Actor
+ with Timers {
import ShardCoordinator._
import ShardCoordinator.Internal._
@@ -661,6 +662,7 @@ abstract class ShardCoordinator(
var allRegionsRegistered = false
var state = State.empty.withRememberEntities(settings.rememberEntities)
+ var preparingForShutdown = false
// rebalanceInProgress for the ShardId keys, pending GetShardHome requests by the ActorRef values
var rebalanceInProgress = Map.empty[ShardId, Set[ActorRef]]
var rebalanceWorkers: Set[ActorRef] = Set.empty
@@ -672,14 +674,17 @@ abstract class ShardCoordinator(
import context.dispatcher
- val rebalanceTask =
- context.system.scheduler.scheduleWithFixedDelay(rebalanceInterval, rebalanceInterval, self, RebalanceTick)
-
- cluster.subscribe(self, initialStateMode = InitialStateAsEvents, ClusterShuttingDown.getClass)
+ cluster.subscribe(
+ self,
+ initialStateMode = InitialStateAsEvents,
+ ClusterShuttingDown.getClass,
+ classOf[MemberReadyForShutdown],
+ classOf[MemberPreparingForShutdown])
protected def typeName: String
override def preStart(): Unit = {
+ timers.startTimerWithFixedDelay(RebalanceTick, RebalanceTick, rebalanceInterval)
allocationStrategy match {
case strategy: StartableAllocationStrategy =>
strategy.start()
@@ -691,7 +696,6 @@ abstract class ShardCoordinator(
override def postStop(): Unit = {
super.postStop()
- rebalanceTask.cancel()
cluster.unsubscribe(self)
}
@@ -786,7 +790,7 @@ abstract class ShardCoordinator(
}
case RebalanceTick =>
- if (state.regions.nonEmpty) {
+ if (state.regions.nonEmpty && !preparingForShutdown) {
val shardsFuture = allocationStrategy.rebalance(state.regions, rebalanceInProgress.keySet)
shardsFuture.value match {
case Some(Success(shards)) =>
@@ -896,6 +900,15 @@ abstract class ShardCoordinator(
// it will soon be stopped when singleton is stopped
context.become(shuttingDown)
+ case _: MemberPreparingForShutdown | _: MemberReadyForShutdown =>
+ if (!preparingForShutdown) {
+ log.info(
+          "{}: Shard coordinator detected preparation for full cluster shutdown. No new rebalances will take place.",
+ typeName)
+ timers.cancel(RebalanceTick)
+ preparingForShutdown = true
+ }
+
case ShardRegion.GetCurrentRegions =>
val reply = ShardRegion.CurrentRegions(state.regions.keySet.map { ref =>
if (ref.path.address.host.isEmpty) cluster.selfAddress
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
index e948cc7187..f311786dbd 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
@@ -626,6 +626,7 @@ private[akka] class ShardRegion(
var startingShards = Set.empty[ShardId]
var handingOff = Set.empty[ActorRef]
var gracefulShutdownInProgress = false
+ var preparingForShutdown = false
import context.dispatcher
var retryCount = 0
@@ -759,6 +760,12 @@ private[akka] class ShardRegion(
context.stop(self)
}
+ case _: MemberReadyForShutdown | _: MemberPreparingForShutdown =>
+ if (!preparingForShutdown) {
+        log.info("{}: Preparing for shutdown", typeName)
+ }
+ preparingForShutdown = true
+
case _: MemberEvent => // these are expected, no need to warn about them
case _ => unhandled(evt)
@@ -819,14 +826,18 @@ private[akka] class ShardRegion(
case BeginHandOff(shard) =>
log.debug("{}: BeginHandOff shard [{}]", typeName, shard)
- if (regionByShard.contains(shard)) {
- val regionRef = regionByShard(shard)
- val updatedShards = regions(regionRef) - shard
- if (updatedShards.isEmpty) regions -= regionRef
- else regions = regions.updated(regionRef, updatedShards)
- regionByShard -= shard
+ if (!preparingForShutdown) {
+ if (regionByShard.contains(shard)) {
+ val regionRef = regionByShard(shard)
+ val updatedShards = regions(regionRef) - shard
+ if (updatedShards.isEmpty) regions -= regionRef
+ else regions = regions.updated(regionRef, updatedShards)
+ regionByShard -= shard
+ }
+ sender() ! BeginHandOffAck(shard)
+ } else {
+        log.debug("{}: Ignoring BeginHandOff as preparing for shutdown", typeName)
}
- sender() ! BeginHandOffAck(shard)
case msg @ HandOff(shard) =>
log.debug("{}: HandOff shard [{}]", typeName, shard)
@@ -885,20 +896,28 @@ private[akka] class ShardRegion(
}
case GracefulShutdown =>
- log.debug("{}: Starting graceful shutdown of region and all its shards", typeName)
+ if (preparingForShutdown) {
+ log.debug(
+ "{}: Skipping graceful shutdown of region and all its shards as cluster is preparing for shutdown",
+ typeName)
+ gracefulShutdownProgress.trySuccess(Done)
+ context.stop(self)
+ } else {
+ log.debug("{}: Starting graceful shutdown of region and all its shards", typeName)
- val coordShutdown = CoordinatedShutdown(context.system)
- if (coordShutdown.getShutdownReason().isPresent) {
- // use a shorter timeout than the coordinated shutdown phase to be able to log better reason for the timeout
- val timeout = coordShutdown.timeout(CoordinatedShutdown.PhaseClusterShardingShutdownRegion) - 1.second
- if (timeout > Duration.Zero) {
- timers.startSingleTimer(GracefulShutdownTimeout, GracefulShutdownTimeout, timeout)
+ val coordShutdown = CoordinatedShutdown(context.system)
+ if (coordShutdown.getShutdownReason().isPresent) {
+ // use a shorter timeout than the coordinated shutdown phase to be able to log better reason for the timeout
+ val timeout = coordShutdown.timeout(CoordinatedShutdown.PhaseClusterShardingShutdownRegion) - 1.second
+ if (timeout > Duration.Zero) {
+ timers.startSingleTimer(GracefulShutdownTimeout, GracefulShutdownTimeout, timeout)
+ }
}
- }
- gracefulShutdownInProgress = true
- sendGracefulShutdownToCoordinatorIfInProgress()
- tryCompleteGracefulShutdownIfInProgress()
+ gracefulShutdownInProgress = true
+ sendGracefulShutdownToCoordinatorIfInProgress()
+ tryCompleteGracefulShutdownIfInProgress()
+ }
case GracefulShutdownTimeout =>
log.warning(
@@ -1258,9 +1277,7 @@ private[akka] class ShardRegion(
.orElse(entityProps match {
case Some(props) if !shardsByRef.values.exists(_ == id) =>
log.debug(ShardingLogMarker.shardStarted(typeName, id), "{}: Starting shard [{}] in region", typeName, id)
-
val name = URLEncoder.encode(id, "utf-8")
-
val shard = context.watch(
context.actorOf(
Shard
diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala
index 477ea0a4e8..e2167b5ef9 100644
--- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala
+++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala
@@ -9,9 +9,7 @@ import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.control.NonFatal
-
import com.typesafe.config.Config
-
import akka.AkkaException
import akka.Done
import akka.actor.Actor
@@ -526,6 +524,7 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
var oldestChangedBuffer: ActorRef = _
// Previous GetNext request delivered event and new GetNext is to be sent
var oldestChangedReceived = true
+ var preparingForFullShutdown = false
var selfExited = false
@@ -583,7 +582,13 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
require(!cluster.isTerminated, "Cluster node must not be terminated")
// subscribe to cluster changes, re-subscribe when restart
- cluster.subscribe(self, ClusterEvent.InitialStateAsEvents, classOf[MemberRemoved], classOf[MemberDowned])
+ cluster.subscribe(
+ self,
+ ClusterEvent.InitialStateAsEvents,
+ classOf[MemberRemoved],
+ classOf[MemberDowned],
+ classOf[MemberPreparingForShutdown],
+ classOf[MemberReadyForShutdown])
startTimerWithFixedDelay(CleanupTimer, Cleanup, 1.minute)
@@ -630,6 +635,28 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
case Event(HandOverToMe, _) =>
// nothing to hand over in start
stay()
+
+ case Event(event: MemberEvent, _) =>
+ handleMemberEvent(event)
+ }
+
+ def handleMemberEvent(event: MemberEvent): State = {
+ event match {
+ case _: MemberRemoved if event.member.uniqueAddress == cluster.selfUniqueAddress =>
+ logInfo("Self removed, stopping ClusterSingletonManager")
+ stop()
+ case _: MemberDowned if event.member.uniqueAddress == cluster.selfUniqueAddress =>
+ logInfo("Self downed, stopping ClusterSingletonManager")
+ stop()
+ case _: MemberReadyForShutdown | _: MemberPreparingForShutdown =>
+ if (!preparingForFullShutdown) {
+          logInfo("Preparing for shutdown, disabling expensive actions")
+ preparingForFullShutdown = true
+ }
+ stay()
+ case _ =>
+ stay()
+ }
}
when(Younger) {
@@ -640,7 +667,9 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
if (previousOldest.forall(removed.contains))
tryGotoOldest()
else {
- peer(previousOldest.head.address) ! HandOverToMe
+ if (!preparingForFullShutdown) {
+ peer(previousOldest.head.address) ! HandOverToMe
+ }
goto(BecomingOldest).using(BecomingOldestData(previousOldest))
}
} else {
@@ -656,18 +685,13 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
stay().using(YoungerData(newPreviousOldest))
}
- case Event(MemberDowned(m), _) if m.uniqueAddress == cluster.selfUniqueAddress =>
- logInfo("Self downed, stopping ClusterSingletonManager")
- stop()
-
- case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress =>
- logInfo("Self removed, stopping ClusterSingletonManager")
- stop()
-
- case Event(MemberRemoved(m, _), _) =>
+ case Event(event @ MemberRemoved(m, _), _) if event.member.uniqueAddress != cluster.selfUniqueAddress =>
scheduleDelayedMemberRemoved(m)
stay()
+ case Event(event: MemberEvent, _) =>
+ handleMemberEvent(event)
+
case Event(DelayedMemberRemoved(m), YoungerData(previousOldest)) =>
if (!selfExited)
logInfo("Member removed [{}]", m.address)
@@ -682,7 +706,9 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
else {
// this node was probably quickly restarted with same hostname:port,
// confirm that the old singleton instance has been stopped
- sender() ! HandOverDone
+ if (!preparingForFullShutdown) {
+ sender() ! HandOverDone
+ }
}
stay()
@@ -713,18 +739,13 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
stay()
}
- case Event(MemberDowned(m), _) if m.uniqueAddress == cluster.selfUniqueAddress =>
- logInfo("Self downed, stopping ClusterSingletonManager")
- stop()
-
- case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress =>
- logInfo("Self removed, stopping ClusterSingletonManager")
- stop()
-
- case Event(MemberRemoved(m, _), _) =>
+ case Event(event @ MemberRemoved(m, _), _) if event.member.uniqueAddress != cluster.selfUniqueAddress =>
scheduleDelayedMemberRemoved(m)
stay()
+ case Event(event: MemberEvent, _) if event.member.uniqueAddress == cluster.selfUniqueAddress =>
+ handleMemberEvent(event)
+
case Event(DelayedMemberRemoved(m), BecomingOldestData(previousOldest)) =>
if (!selfExited)
logInfo("Member removed [{}], previous oldest [{}]", m.address, previousOldest.map(_.address).mkString(", "))
@@ -750,25 +771,31 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
case Some(senderUniqueAddress) =>
previousOldest.headOption match {
case Some(oldest) =>
- if (oldest == senderUniqueAddress)
- sender() ! HandOverToMe
- else
+ if (oldest == senderUniqueAddress) {
+ if (!preparingForFullShutdown) {
+ sender() ! HandOverToMe
+ }
+ } else
logInfo(
"Ignoring TakeOver request in BecomingOldest from [{}]. Expected previous oldest [{}]",
sender().path.address,
oldest.address)
stay()
case None =>
- sender() ! HandOverToMe
+ if (!preparingForFullShutdown) {
+ sender() ! HandOverToMe
+ }
stay().using(BecomingOldestData(senderUniqueAddress :: previousOldest))
}
}
case Event(HandOverRetry(count), BecomingOldestData(previousOldest)) =>
if (count <= maxHandOverRetries) {
- logInfo("Retry [{}], sending HandOverToMe to [{}]", count, previousOldest.headOption.map(_.address))
- previousOldest.headOption.foreach(node => peer(node.address) ! HandOverToMe)
- startSingleTimer(HandOverRetryTimer, HandOverRetry(count + 1), handOverRetryInterval)
+ if (!preparingForFullShutdown) {
+ logInfo("Retry [{}], sending HandOverToMe to [{}]", count, previousOldest.headOption.map(_.address))
+ previousOldest.headOption.foreach(node => peer(node.address) ! HandOverToMe)
+ startSingleTimer(HandOverRetryTimer, HandOverRetry(count + 1), handOverRetryInterval)
+ }
stay()
} else if (previousOldest.forall(removed.contains)) {
// can't send HandOverToMe, previousOldest unknown for new node (or restart)
@@ -792,9 +819,12 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
def tryAcquireLease() = {
import context.dispatcher
- pipe(lease.get.acquire(reason => self ! LeaseLost(reason)).map[Any](AcquireLeaseResult).recover {
- case NonFatal(t) => AcquireLeaseFailure(t)
- }).to(self)
+
+ if (!preparingForFullShutdown) {
+ pipe(lease.get.acquire(reason => self ! LeaseLost(reason)).map[Any](AcquireLeaseResult).recover {
+ case NonFatal(t) => AcquireLeaseFailure(t)
+ }).to(self)
+ }
goto(AcquiringLease).using(AcquiringLeaseData(leaseRequestInProgress = true, None))
}
@@ -847,19 +877,29 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
// complete memberExitingProgress when handOverDone
sender() ! Done // reply to ask
stay()
- case Event(MemberDowned(m), _) if m.uniqueAddress == cluster.selfUniqueAddress =>
- logInfo("Self downed, stopping ClusterSingletonManager")
- stop()
+
+ case Event(event: MemberEvent, _) =>
+ handleMemberEvent(event)
+
}
@InternalStableApi
def gotoOldest(): State = {
- logInfo(
- ClusterLogMarker.singletonStarted,
- "Singleton manager starting singleton actor [{}]",
- self.path / singletonName)
- val singleton = context.watch(context.actorOf(singletonProps, singletonName))
- goto(Oldest).using(OldestData(Some(singleton)))
+ if (preparingForFullShutdown) {
+ logInfo(
+ ClusterLogMarker.singletonStarted,
+ "Singleton manager NOT starting singleton actor [{}] as cluster is preparing to shutdown",
+ self.path / singletonName)
+ goto(Oldest).using(OldestData(None))
+ } else {
+ logInfo(
+ ClusterLogMarker.singletonStarted,
+ "Singleton manager starting singleton actor [{}]",
+ self.path / singletonName)
+ val singleton = context.watch(context.actorOf(singletonProps, singletonName))
+ goto(Oldest).using(OldestData(Some(singleton)))
+ }
+
}
def handleOldestChanged(singleton: Option[ActorRef], oldestOption: Option[UniqueAddress]) = {
@@ -876,12 +916,16 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
gotoHandingOver(singleton, None)
case Some(a) =>
// send TakeOver request in case the new oldest doesn't know previous oldest
- peer(a.address) ! TakeOverFromMe
- startSingleTimer(TakeOverRetryTimer, TakeOverRetry(1), handOverRetryInterval)
+ if (!preparingForFullShutdown) {
+ peer(a.address) ! TakeOverFromMe
+ startSingleTimer(TakeOverRetryTimer, TakeOverRetry(1), handOverRetryInterval)
+ }
goto(WasOldest).using(WasOldestData(singleton, newOldestOption = Some(a)))
case None =>
// new oldest will initiate the hand-over
- startSingleTimer(TakeOverRetryTimer, TakeOverRetry(1), handOverRetryInterval)
+ if (!preparingForFullShutdown) {
+ startSingleTimer(TakeOverRetryTimer, TakeOverRetry(1), handOverRetryInterval)
+ }
goto(WasOldest).using(WasOldestData(singleton, newOldestOption = None))
}
}
@@ -916,6 +960,10 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
stop()
}
+    // MemberDowned is handled differently in this state, so keep this generic MemberEvent case below it
+ case Event(event: MemberEvent, _) =>
+ handleMemberEvent(event)
+
case Event(LeaseLost(reason), OldestData(singleton)) =>
log.warning("Lease has been lost. Reason: {}. Terminating singleton and trying to re-acquire lease", reason)
singleton match {
@@ -939,14 +987,18 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newOldestOption.map(_.address))
else
log.debug("Retry [{}], sending TakeOverFromMe to [{}]", count, newOldestOption.map(_.address))
- newOldestOption.foreach(node => peer(node.address) ! TakeOverFromMe)
- startSingleTimer(TakeOverRetryTimer, TakeOverRetry(count + 1), handOverRetryInterval)
+
+ if (!preparingForFullShutdown) {
+ newOldestOption.foreach(node => peer(node.address) ! TakeOverFromMe)
+ startSingleTimer(TakeOverRetryTimer, TakeOverRetry(count + 1), handOverRetryInterval)
+ }
stay()
} else
throw new ClusterSingletonManagerIsStuck(s"Expected hand-over to [$newOldestOption] never occurred")
case Event(HandOverToMe, WasOldestData(singleton, _)) =>
gotoHandingOver(singleton, Some(sender()))
+
case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress && !selfExited =>
logInfo("Self removed, stopping ClusterSingletonManager")
stop()
@@ -975,6 +1027,10 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
logInfo("Self downed, stopping")
gotoStopping(s)
}
+
+ case Event(event: MemberEvent, _) =>
+ handleMemberEvent(event)
+
}
def gotoHandingOver(singleton: Option[ActorRef], handOverTo: Option[ActorRef]): State = {
@@ -995,7 +1051,9 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
case Event(HandOverToMe, HandingOverData(_, handOverTo)) if handOverTo.contains(sender()) =>
// retry
- sender() ! HandOverInProgress
+ if (!preparingForFullShutdown) {
+ sender() ! HandOverInProgress
+ }
stay()
case Event(SelfExiting, _) =>
@@ -1003,6 +1061,15 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
// complete memberExitingProgress when handOverDone
sender() ! Done // reply to ask
stay()
+
+ case Event(MemberReadyForShutdown(m), _) if m.uniqueAddress == cluster.selfUniqueAddress =>
+ logInfo("Ready for shutdown when handing over. Giving up on handover.")
+ stop()
+
+ case Event(MemberPreparingForShutdown(m), _) if m.uniqueAddress == cluster.selfUniqueAddress =>
+ logInfo("Preparing for shutdown when handing over. Giving up on handover.")
+ stop()
+
}
def handOverDone(handOverTo: Option[ActorRef]): State = {
@@ -1033,6 +1100,15 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
case Event(Terminated(ref), StoppingData(singleton)) if ref == singleton =>
logInfo(ClusterLogMarker.singletonTerminated, "Singleton actor [{}] was terminated", singleton.path)
stop()
+
+ case Event(MemberReadyForShutdown(m), _) if m.uniqueAddress == cluster.selfUniqueAddress =>
+      logInfo("Ready for shutdown when stopping. Not waiting for user actor to shut down")
+ stop()
+
+ case Event(MemberPreparingForShutdown(m), _) if m.uniqueAddress == cluster.selfUniqueAddress =>
+      logInfo("Preparing for shutdown when stopping. Not waiting for user actor to shut down")
+ stop()
+
}
when(End) {
@@ -1046,7 +1122,14 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
def selfMemberExited(): Unit = {
selfExited = true
- logInfo("Exited [{}]", cluster.selfAddress)
+ logInfo(
+ "Exited [{}].{}",
+ cluster.selfAddress,
+      if (preparingForFullShutdown) " While preparing for full cluster shutdown" else "")
+ // handover won't be done so just complete right away
+ if (preparingForFullShutdown) {
+ memberExitingProgress.trySuccess(Done)
+ }
}
whenUnhandled {
@@ -1090,6 +1173,8 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
"Failed to release lease. Singleton may not be able to run on another node until lease timeout occurs")
}
stay()
+ case Event(_: MemberEvent, _) =>
+ stay() // silence
}
onTransition {
diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerPreparingForShutdownSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerPreparingForShutdownSpec.scala
new file mode 100644
index 0000000000..a818406142
--- /dev/null
+++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerPreparingForShutdownSpec.scala
@@ -0,0 +1,166 @@
+/*
+ * Copyright (C) 2009-2020 Lightbend Inc.
+ */
+
+package akka.cluster.singleton
+
+import akka.actor.Actor
+import akka.actor.ActorLogging
+import akka.actor.ActorRef
+import akka.actor.Props
+import akka.cluster.Cluster
+import akka.cluster.MemberStatus
+import akka.cluster.MemberStatus.Removed
+import akka.cluster.MultiNodeClusterSpec
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.remote.testkit.STMultiNodeSpec
+import akka.testkit._
+import com.typesafe.config.ConfigFactory
+
+import scala.concurrent.duration._
+
+object ClusterSingletonManagerPreparingForShutdownSpec extends MultiNodeConfig {
+ val first = role("first")
+ val second = role("second")
+ val third = role("third")
+
+ commonConfig(ConfigFactory.parseString("""
+ akka.loglevel = INFO
+ akka.actor.provider = "cluster"
+ akka.remote.log-remote-lifecycle-events = off
+ akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning
+ akka.cluster.testkit.auto-down-unreachable-after = off
+ akka.cluster.leader-actions-interval = 100ms
+ """))
+
+ case object EchoStarted
+
+ /**
+ * The singleton actor
+ */
+ class Echo(testActor: ActorRef) extends Actor with ActorLogging {
+ override def preStart(): Unit = {
+ log.info("Singleton starting on {}", Cluster(context.system).selfUniqueAddress)
+ testActor ! "preStart"
+ }
+ override def postStop(): Unit = {
+ testActor ! "postStop"
+ }
+
+ def receive = {
+ case "stop" =>
+ testActor ! "stop"
+ context.stop(self)
+ case _ =>
+ sender() ! self
+ }
+ }
+}
+
+class ClusterSingletonManagerPreparingForShutdownMultiJvmNode1 extends ClusterSingletonManagerPreparingForShutdownSpec
+class ClusterSingletonManagerPreparingForShutdownMultiJvmNode2 extends ClusterSingletonManagerPreparingForShutdownSpec
+class ClusterSingletonManagerPreparingForShutdownMultiJvmNode3 extends ClusterSingletonManagerPreparingForShutdownSpec
+
+class ClusterSingletonManagerPreparingForShutdownSpec
+ extends MultiNodeSpec(ClusterSingletonManagerPreparingForShutdownSpec)
+ with MultiNodeClusterSpec
+ with STMultiNodeSpec
+ with ImplicitSender {
+ import ClusterSingletonManagerPreparingForShutdownSpec._
+
+ override def initialParticipants = roles.size
+
+ def createSingleton(): ActorRef = {
+ system.actorOf(
+ ClusterSingletonManager.props(
+ singletonProps = Props(classOf[Echo], testActor),
+ terminationMessage = "stop",
+ settings = ClusterSingletonManagerSettings(system)),
+ name = "echo")
+ }
+
+ val echoProxyTerminatedProbe = TestProbe()
+
+ lazy val echoProxy: ActorRef = {
+ echoProxyTerminatedProbe.watch(
+ system.actorOf(
+ ClusterSingletonProxy
+ .props(singletonManagerPath = "/user/echo", settings = ClusterSingletonProxySettings(system)),
+ name = "echoProxy"))
+ }
+
+  "ClusterSingletonManager preparing for full cluster shutdown" must {
+
+ "form cluster" in {
+ awaitClusterUp(first, second, third)
+ }
+
+    "not hand over when ready for shutdown" in {
+
+ createSingleton()
+ runOn(first) {
+ within(10.seconds) {
+ expectMsg("preStart")
+ echoProxy ! "hello"
+ expectMsgType[ActorRef]
+ expectNoMessage(2.seconds)
+ }
+ }
+ enterBarrier("singleton-active")
+
+ runOn(first) {
+ Cluster(system).prepareForFullClusterShutdown()
+ }
+ awaitAssert({
+ withClue("members: " + Cluster(system).readView.members) {
+ Cluster(system).selfMember.status shouldEqual MemberStatus.ReadyForShutdown
+ }
+ }, 10.seconds)
+ enterBarrier("preparation-complete")
+
+ runOn(first) {
+ Cluster(system).leave(address(first))
+ }
+ awaitAssert(
+ {
+ runOn(second, third) {
+ withClue("members: " + Cluster(system).readView.members) {
+ Cluster(system).readView.members.size shouldEqual 2
+ }
+ }
+ runOn(first) {
+ withClue("self member: " + Cluster(system).selfMember) {
+ Cluster(system).selfMember.status shouldEqual MemberStatus.Removed
+ }
+ }
+ },
+      8.seconds) // this timeout must be lower than the coordinated shutdown timeout, otherwise the test could pass only because the timeout kicked in and continued with the cluster exit,
+    // whereas this is testing that shutdown happens right away when the cluster is preparing for shutdown
+ enterBarrier("initial-singleton-removed")
+
+      // even though the handover isn't completed, the new oldest node will start the singleton after a timeout
+ // make sure this isn't the case
+ runOn(second) {
+ echoProxy ! "hello"
+ expectNoMessage(5.seconds)
+ }
+
+ enterBarrier("no-singleton-running")
+ }
+
+ "last nodes should shut down" in {
+ runOn(second) {
+ Cluster(system).leave(address(second))
+ Cluster(system).leave(address(third))
+ }
+ awaitAssert({
+ withClue("self member: " + Cluster(system).selfMember) {
+ Cluster(system).selfMember.status shouldEqual Removed
+ }
+ }, 10.seconds)
+ enterBarrier("done")
+ }
+
+ }
+}
diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/Cluster.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/Cluster.scala
index d0ec98d80c..c38a7d2b39 100644
--- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/Cluster.scala
+++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/Cluster.scala
@@ -150,6 +150,26 @@ object Leave {
*/
final case class Down(address: Address) extends ClusterCommand
+/**
+ * Initiate a full cluster shutdown. This stops:
+ * - New members joining the cluster
+ * - New rebalances in Cluster Sharding
+ * - Singleton handovers
+ *
+ * However, it does not stop the nodes. That is expected to be signalled externally.
+ *
+ * Not for user extension
+ */
+@DoNotInherit sealed trait PrepareForFullClusterShutdown extends ClusterCommand
+
+case object PrepareForFullClusterShutdown extends PrepareForFullClusterShutdown {
+
+ /**
+ * Java API
+ */
+ def prepareForFullClusterShutdown(): PrepareForFullClusterShutdown = this
+}
+
/**
* Akka Typed Cluster API entry point
*/
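The typed command added above is what applications are expected to send to the cluster manager; the sharding multi-jvm spec earlier in this patch does exactly that. A minimal sketch of the intended call site follows (the object and method names are illustrative only, not part of this change):

import akka.actor.typed.ActorSystem
import akka.cluster.typed.{ Cluster, PrepareForFullClusterShutdown }

object PrepareForShutdownExample {
  // Ask every member to move to PreparingForShutdown; once all members have seen it,
  // they converge to ReadyForShutdown and the processes can be stopped externally.
  def prepare(system: ActorSystem[_]): Unit = {
    val cluster = Cluster(system)
    cluster.manager ! PrepareForFullClusterShutdown
    // callers typically wait until cluster.selfMember.status is MemberStatus.ReadyForShutdown
    // before stopping the nodes, as the spec does with awaitAssert
  }
}
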
diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AdaptedClusterImpl.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AdaptedClusterImpl.scala
index a290faf3b3..f094853293 100644
--- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AdaptedClusterImpl.scala
+++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AdaptedClusterImpl.scala
@@ -14,6 +14,7 @@ import akka.actor.typed.scaladsl.adapter._
import akka.annotation.InternalApi
import akka.cluster.{ ClusterEvent, Member, MemberStatus }
import akka.cluster.ClusterEvent.MemberEvent
+import akka.cluster.typed.PrepareForFullClusterShutdown
import akka.cluster.typed._
/**
@@ -129,6 +130,10 @@ private[akka] object AdapterClusterImpl {
adaptedCluster.joinSeedNodes(addresses)
Behaviors.same
+ case PrepareForFullClusterShutdown =>
+ adaptedCluster.prepareForFullClusterShutdown()
+ Behaviors.same
+
}
}
diff --git a/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/BasicClusterExampleTest.java b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/BasicClusterExampleTest.java
index f06a8d40ac..d036fa5098 100644
--- a/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/BasicClusterExampleTest.java
+++ b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/BasicClusterExampleTest.java
@@ -67,6 +67,12 @@ public class BasicClusterExampleTest { // extends JUnitSuite {
// TODO wait for/verify cluster to form
+ // #prepare
+ PrepareForFullClusterShutdown msg =
+ PrepareForFullClusterShutdown.prepareForFullClusterShutdown();
+ cluster2.manager().tell(msg);
+ // #prepare
+
// #cluster-leave
cluster2.manager().tell(Leave.create(cluster2.selfMember().address()));
// #cluster-leave
diff --git a/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java b/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java
index a927db41e5..0098da699d 100644
--- a/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java
+++ b/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java
@@ -166,6 +166,14 @@ public final class ClusterMessages {
* WeaklyUp = 6;
*/
WeaklyUp(6),
+ /**
+ * PreparingForShutdown = 7;
+ */
+ PreparingForShutdown(7),
+ /**
+ * ReadyForShutdown = 8;
+ */
+ ReadyForShutdown(8),
;
/**
@@ -196,6 +204,14 @@ public final class ClusterMessages {
* WeaklyUp = 6;
*/
public static final int WeaklyUp_VALUE = 6;
+ /**
+ * PreparingForShutdown = 7;
+ */
+ public static final int PreparingForShutdown_VALUE = 7;
+ /**
+ * ReadyForShutdown = 8;
+ */
+ public static final int ReadyForShutdown_VALUE = 8;
public final int getNumber() {
@@ -225,6 +241,8 @@ public final class ClusterMessages {
case 4: return Down;
case 5: return Removed;
case 6: return WeaklyUp;
+ case 7: return PreparingForShutdown;
+ case 8: return ReadyForShutdown;
default: return null;
}
}
@@ -22446,10 +22464,11 @@ public final class ClusterMessages {
"\002(\r\022\031\n\021allowLocalRoutees\030\003 \002(\010\022\017\n\007useRol" +
"e\030\004 \001(\t\022\020\n\010useRoles\030\005 \003(\t*D\n\022Reachabilit" +
"yStatus\022\r\n\tReachable\020\000\022\017\n\013Unreachable\020\001\022" +
- "\016\n\nTerminated\020\002*b\n\014MemberStatus\022\013\n\007Joini" +
- "ng\020\000\022\006\n\002Up\020\001\022\013\n\007Leaving\020\002\022\013\n\007Exiting\020\003\022\010" +
- "\n\004Down\020\004\022\013\n\007Removed\020\005\022\014\n\010WeaklyUp\020\006B\035\n\031a" +
- "kka.cluster.protobuf.msgH\001"
+ "\016\n\nTerminated\020\002*\222\001\n\014MemberStatus\022\013\n\007Join" +
+ "ing\020\000\022\006\n\002Up\020\001\022\013\n\007Leaving\020\002\022\013\n\007Exiting\020\003\022" +
+ "\010\n\004Down\020\004\022\013\n\007Removed\020\005\022\014\n\010WeaklyUp\020\006\022\030\n\024" +
+ "PreparingForShutdown\020\007\022\024\n\020ReadyForShutdo" +
+ "wn\020\010B\035\n\031akka.cluster.protobuf.msgH\001"
};
descriptor = akka.protobufv3.internal.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
diff --git a/akka-cluster/src/main/protobuf/ClusterMessages.proto b/akka-cluster/src/main/protobuf/ClusterMessages.proto
index 65b323429c..acb19aab13 100644
--- a/akka-cluster/src/main/protobuf/ClusterMessages.proto
+++ b/akka-cluster/src/main/protobuf/ClusterMessages.proto
@@ -198,6 +198,8 @@ enum MemberStatus {
Down = 4;
Removed = 5;
WeaklyUp = 6;
+ PreparingForShutdown = 7;
+ ReadyForShutdown = 8;
}
/**
diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index adf2cce188..ecb7160833 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -320,6 +320,13 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
clusterCore ! ClusterUserAction.JoinTo(fillLocal(address))
}
+ /**
+ * Change the state of every member in preparation for a full cluster shutdown.
+ */
+ def prepareForFullClusterShutdown(): Unit = {
+ clusterCore ! ClusterUserAction.PrepareForShutdown
+ }
+
private def fillLocal(address: Address): Address = {
// local address might be used if grabbed from actorRef.path.address
if (address.hasLocalScope && address.system == selfAddress.system) selfAddress
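The classic extension gains the corresponding method above; the singleton multi-jvm spec earlier in this patch calls it directly. A minimal sketch of the expected sequence on the classic API (object and method names are illustrative):

import akka.actor.ActorSystem
import akka.cluster.Cluster

object ClassicPrepareForShutdownExample {
  // Triggers the PreparingForShutdown -> ReadyForShutdown transition for every member.
  // The nodes themselves are stopped externally once ReadyForShutdown has been reached.
  def prepare(system: ActorSystem): Unit = {
    val cluster = Cluster(system)
    cluster.prepareForFullClusterShutdown()
    // optionally leave afterwards, as the specs do:
    // cluster.leave(cluster.selfAddress)
  }
}
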
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
index 5eb9e7ece5..4225818ad7 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
@@ -61,6 +61,11 @@ private[cluster] object ClusterUserAction {
@SerialVersionUID(1L)
final case class Down(address: Address) extends ClusterMessage
+ /**
+   * Command to mark all nodes as preparing for full cluster shutdown
+ */
+ @SerialVersionUID(1L)
+ case object PrepareForShutdown extends ClusterMessage
}
/**
@@ -356,6 +361,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
var joinSeedNodesDeadline: Option[Deadline] = None
var leaderActionCounter = 0
var selfDownCounter = 0
+ var preparingForShutdown = false
var exitingTasksInProgress = false
val selfExiting = Promise[Done]()
@@ -555,6 +561,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
case Join(node, roles, appVersion) => joining(node, roles, appVersion)
case ClusterUserAction.Down(address) => downing(address)
case ClusterUserAction.Leave(address) => leaving(address)
+ case ClusterUserAction.PrepareForShutdown => startPrepareForShutdown()
case SendGossipTo(address) => sendGossipTo(address)
case msg: SubscriptionMessage => publisher.forward(msg)
case QuarantinedEvent(ua) => quarantined(UniqueAddress(ua))
@@ -728,83 +735,90 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
* current gossip state, including the new joining member.
*/
def joining(joiningNode: UniqueAddress, roles: Set[String], appVersion: Version): Unit = {
- val selfStatus = latestGossip.member(selfUniqueAddress).status
- if (joiningNode.address.protocol != selfAddress.protocol)
- logWarning(
- "Member with wrong protocol tried to join, but was ignored, expected [{}] but was [{}]",
- selfAddress.protocol,
- joiningNode.address.protocol)
- else if (joiningNode.address.system != selfAddress.system)
- logWarning(
- "Member with wrong ActorSystem name tried to join, but was ignored, expected [{}] but was [{}]",
- selfAddress.system,
- joiningNode.address.system)
- else if (removeUnreachableWithMemberStatus.contains(selfStatus))
- logInfo("Trying to join [{}] to [{}] member, ignoring. Use a member that is Up instead.", joiningNode, selfStatus)
- else {
- val localMembers = latestGossip.members
+ if (!preparingForShutdown) {
+ val selfStatus = latestGossip.member(selfUniqueAddress).status
+ if (joiningNode.address.protocol != selfAddress.protocol)
+ logWarning(
+ "Member with wrong protocol tried to join, but was ignored, expected [{}] but was [{}]",
+ selfAddress.protocol,
+ joiningNode.address.protocol)
+ else if (joiningNode.address.system != selfAddress.system)
+ logWarning(
+ "Member with wrong ActorSystem name tried to join, but was ignored, expected [{}] but was [{}]",
+ selfAddress.system,
+ joiningNode.address.system)
+ else if (removeUnreachableWithMemberStatus.contains(selfStatus))
+ logInfo(
+ "Trying to join [{}] to [{}] member, ignoring. Use a member that is Up instead.",
+ joiningNode,
+ selfStatus)
+ else {
+ val localMembers = latestGossip.members
+
+ // check by address without uid to make sure that node with same host:port is not allowed
+ // to join until previous node with that host:port has been removed from the cluster
+ localMembers.find(_.address == joiningNode.address) match {
+ case Some(m) if m.uniqueAddress == joiningNode =>
+ // node retried join attempt, probably due to lost Welcome message
+ logInfo("Existing member [{}] is joining again.", m)
+ if (joiningNode != selfUniqueAddress)
+ sender() ! Welcome(selfUniqueAddress, latestGossip)
+ case Some(m) =>
+ // node restarted, same host:port as existing member, but with different uid
+ // safe to down and later remove existing member
+ // new node will retry join
+ logInfo(
+ "New incarnation of existing member [{}] is trying to join. " +
+ "Existing will be removed from the cluster and then new member will be allowed to join.",
+ m)
+ if (m.status != Down) {
+ // we can confirm it as terminated/unreachable immediately
+ val newReachability = latestGossip.overview.reachability.terminated(selfUniqueAddress, m.uniqueAddress)
+ val newOverview = latestGossip.overview.copy(reachability = newReachability)
+ val newGossip = latestGossip.copy(overview = newOverview)
+ updateLatestGossip(newGossip)
+
+ downing(m.address)
+ }
+ case None =>
+ // remove the node from the failure detector
+ failureDetector.remove(joiningNode.address)
+ crossDcFailureDetector.remove(joiningNode.address)
+
+ // add joining node as Joining
+ // add self in case someone else joins before self has joined (Set discards duplicates)
+ val newMembers = localMembers + Member(joiningNode, roles, appVersion) + Member(
+ selfUniqueAddress,
+ cluster.selfRoles,
+ cluster.settings.AppVersion)
+ val newGossip = latestGossip.copy(members = newMembers)
- // check by address without uid to make sure that node with same host:port is not allowed
- // to join until previous node with that host:port has been removed from the cluster
- localMembers.find(_.address == joiningNode.address) match {
- case Some(m) if m.uniqueAddress == joiningNode =>
- // node retried join attempt, probably due to lost Welcome message
- logInfo("Existing member [{}] is joining again.", m)
- if (joiningNode != selfUniqueAddress)
- sender() ! Welcome(selfUniqueAddress, latestGossip)
- case Some(m) =>
- // node restarted, same host:port as existing member, but with different uid
- // safe to down and later remove existing member
- // new node will retry join
- logInfo(
- "New incarnation of existing member [{}] is trying to join. " +
- "Existing will be removed from the cluster and then new member will be allowed to join.",
- m)
- if (m.status != Down) {
- // we can confirm it as terminated/unreachable immediately
- val newReachability = latestGossip.overview.reachability.terminated(selfUniqueAddress, m.uniqueAddress)
- val newOverview = latestGossip.overview.copy(reachability = newReachability)
- val newGossip = latestGossip.copy(overview = newOverview)
updateLatestGossip(newGossip)
- downing(m.address)
- }
- case None =>
- // remove the node from the failure detector
- failureDetector.remove(joiningNode.address)
- crossDcFailureDetector.remove(joiningNode.address)
+ if (joiningNode == selfUniqueAddress) {
+ logInfo(
+ ClusterLogMarker.memberChanged(joiningNode, MemberStatus.Joining),
+ "Node [{}] is JOINING itself (with roles [{}], version [{}]) and forming new cluster",
+ joiningNode.address,
+ roles.mkString(", "),
+ appVersion)
+ if (localMembers.isEmpty)
+ leaderActions() // important for deterministic oldest when bootstrapping
+ } else {
+ logInfo(
+ ClusterLogMarker.memberChanged(joiningNode, MemberStatus.Joining),
+ "Node [{}] is JOINING, roles [{}], version [{}]",
+ joiningNode.address,
+ roles.mkString(", "),
+ appVersion)
+ sender() ! Welcome(selfUniqueAddress, latestGossip)
+ }
- // add joining node as Joining
- // add self in case someone else joins before self has joined (Set discards duplicates)
- val newMembers = localMembers + Member(joiningNode, roles, appVersion) + Member(
- selfUniqueAddress,
- cluster.selfRoles,
- cluster.settings.AppVersion)
- val newGossip = latestGossip.copy(members = newMembers)
-
- updateLatestGossip(newGossip)
-
- if (joiningNode == selfUniqueAddress) {
- logInfo(
- ClusterLogMarker.memberChanged(joiningNode, MemberStatus.Joining),
- "Node [{}] is JOINING itself (with roles [{}], version [{}]) and forming new cluster",
- joiningNode.address,
- roles.mkString(", "),
- appVersion)
- if (localMembers.isEmpty)
- leaderActions() // important for deterministic oldest when bootstrapping
- } else {
- logInfo(
- ClusterLogMarker.memberChanged(joiningNode, MemberStatus.Joining),
- "Node [{}] is JOINING, roles [{}], version [{}]",
- joiningNode.address,
- roles.mkString(", "),
- appVersion)
- sender() ! Welcome(selfUniqueAddress, latestGossip)
- }
-
- publishMembershipState()
+ publishMembershipState()
+ }
}
+ } else {
+ logInfo("Ignoring join request from [{}] as preparing for shutdown", joiningNode)
}
}
@@ -826,6 +840,27 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
}
}
+ def startPrepareForShutdown(): Unit = {
+    if (!preparingForShutdown) {
+      preparingForShutdown = true
+ val changedMembers = latestGossip.members.collect {
+ case m if MembershipState.allowedToPrepareToShutdown(m.status) =>
+ m.copy(status = PreparingForShutdown)
+ }
+ val newGossip = latestGossip.update(changedMembers)
+ updateLatestGossip(newGossip)
+ changedMembers.foreach { member =>
+ logInfo(
+ ClusterLogMarker.memberChanged(member.uniqueAddress, MemberStatus.PreparingForShutdown),
+ "Preparing for shutdown [{}] as [{}]",
+ member.address,
+ PreparingForShutdown)
+ }
+ publishMembershipState()
+ gossip()
+ }
+ }
+
/**
* State transition to LEAVING.
* The node will eventually be removed by the leader, after hand-off in EXITING, and only after
@@ -834,7 +869,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
def leaving(address: Address): Unit = {
// only try to update if the node is available (in the member ring)
latestGossip.members.find(_.address == address).foreach { existingMember =>
- if (existingMember.status == Joining || existingMember.status == WeaklyUp || existingMember.status == Up) {
+      if (existingMember.status == Joining || existingMember.status == WeaklyUp || existingMember.status == Up ||
+          existingMember.status == PreparingForShutdown || existingMember.status == ReadyForShutdown) {
// mark node as LEAVING
val newMembers = latestGossip.members - existingMember + existingMember.copy(status = Leaving)
val newGossip = latestGossip.copy(members = newMembers)
@@ -1211,7 +1246,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
} else {
leaderActionCounter += 1
import cluster.settings.{ AllowWeaklyUpMembers, LeaderActionsInterval, WeaklyUpAfter }
- if (AllowWeaklyUpMembers && LeaderActionsInterval * leaderActionCounter >= WeaklyUpAfter)
+ if (AllowWeaklyUpMembers && LeaderActionsInterval * leaderActionCounter >= WeaklyUpAfter && !preparingForShutdown)
moveJoiningToWeaklyUp()
if (leaderActionCounter == firstNotice || leaderActionCounter % periodicNotice == 0)
@@ -1231,9 +1266,18 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
isCurrentlyLeader = false
}
cleanupExitingConfirmed()
+ checkForPrepareForShutdown()
shutdownSelfWhenDown()
}
+ def checkForPrepareForShutdown(): Unit = {
+ if (MembershipState.allowedToPrepareToShutdown(latestGossip.member(selfUniqueAddress).status) && latestGossip.members
+ .exists(m => MembershipState.prepareForShutdownStates(m.status))) {
+ logDebug("Detected full cluster shutdown")
+ self ! ClusterUserAction.PrepareForShutdown
+ }
+ }
+
def shutdownSelfWhenDown(): Unit = {
if (latestGossip.member(selfUniqueAddress).status == Down) {
// When all reachable have seen the state this member will shutdown itself when it has
@@ -1290,7 +1334,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
val removedOtherDc =
if (latestGossip.isMultiDc) {
latestGossip.members.filter { m =>
- (m.dataCenter != selfDc && removeUnreachableWithMemberStatus(m.status))
+ m.dataCenter != selfDc && removeUnreachableWithMemberStatus(m.status)
}
} else
Set.empty[Member]
@@ -1303,9 +1347,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
var upNumber = 0
{
- case m if m.dataCenter == selfDc && isJoiningToUp(m) =>
+ case m if m.dataCenter == selfDc && isJoiningToUp(m) && !preparingForShutdown =>
// Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. we have a convergence)
// and minimum number of nodes have joined the cluster
+ // don't move members to up when preparing for shutdown
if (upNumber == 0) {
// It is alright to use same upNumber as already used by a removed member, since the upNumber
// is only used for comparing age of current cluster members (Member.isOlderThan)
@@ -1319,6 +1364,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
case m if m.dataCenter == selfDc && m.status == Leaving =>
// Move LEAVING => EXITING (once we have a convergence on LEAVING)
m.copy(status = Exiting)
+
+        case m if m.dataCenter == selfDc && m.status == PreparingForShutdown =>
+ // Move PreparingForShutdown => ReadyForShutdown (once we have a convergence on PreparingForShutdown)
+ m.copy(status = ReadyForShutdown)
}
}
}
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala
index f8461f8dbb..235b9791e0 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala
@@ -318,6 +318,16 @@ object ClusterEvent {
if (member.status != Leaving) throw new IllegalArgumentException("Expected Leaving status, got: " + member)
}
+ final case class MemberPreparingForShutdown(member: Member) extends MemberEvent {
+ if (member.status != PreparingForShutdown)
+ throw new IllegalArgumentException("Expected PreparingForShutdown status, got: " + member)
+ }
+
+ final case class MemberReadyForShutdown(member: Member) extends MemberEvent {
+ if (member.status != ReadyForShutdown)
+ throw new IllegalArgumentException("Expected ReadyForShutdown status, got: " + member)
+ }
+
/**
* Member status changed to `MemberStatus.Exiting` and will be removed
* when all members have seen the `Exiting` status.
@@ -554,12 +564,14 @@ object ClusterEvent {
newMember
}
val memberEvents = (newMembers ++ changedMembers).unsorted.collect {
- case m if m.status == Joining => MemberJoined(m)
- case m if m.status == WeaklyUp => MemberWeaklyUp(m)
- case m if m.status == Up => MemberUp(m)
- case m if m.status == Leaving => MemberLeft(m)
- case m if m.status == Exiting => MemberExited(m)
- case m if m.status == Down => MemberDowned(m)
+ case m if m.status == Joining => MemberJoined(m)
+ case m if m.status == WeaklyUp => MemberWeaklyUp(m)
+ case m if m.status == Up => MemberUp(m)
+ case m if m.status == Leaving => MemberLeft(m)
+ case m if m.status == Exiting => MemberExited(m)
+ case m if m.status == Down => MemberDowned(m)
+ case m if m.status == PreparingForShutdown => MemberPreparingForShutdown(m)
+ case m if m.status == ReadyForShutdown => MemberReadyForShutdown(m)
// no events for other transitions
}
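The two new events are ordinary `ClusterEvent.MemberEvent`s, so they can be consumed with the usual subscription mechanism. A minimal sketch with the classic API follows; the listener name is illustrative:

```scala
import akka.actor.{ Actor, ActorLogging }
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._

class ShutdownListener extends Actor with ActorLogging {
  private val cluster = Cluster(context.system)

  // register for just the two new events; InitialStateAsEvents replays the current state as events
  override def preStart(): Unit =
    cluster.subscribe(
      self,
      initialStateMode = InitialStateAsEvents,
      classOf[MemberPreparingForShutdown],
      classOf[MemberReadyForShutdown])

  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive: Receive = {
    case MemberPreparingForShutdown(member) =>
      log.info("{} is preparing for full cluster shutdown", member)
    case MemberReadyForShutdown(member) =>
      log.info("{} is ready for full cluster shutdown", member)
  }
}
```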
diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala
index 95a712f99e..e7d346abb7 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Member.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala
@@ -195,6 +195,7 @@ object Member {
/**
* Picks the Member with the highest "priority" MemberStatus.
+ * The highest priority is the status that is furthest along in the membership state machine
*/
def highestPriorityOf(m1: Member, m2: Member): Member = {
if (m1.status == m2.status)
@@ -202,19 +203,23 @@ object Member {
if (m1.isOlderThan(m2)) m1 else m2
else
(m1.status, m2.status) match {
- case (Removed, _) => m1
- case (_, Removed) => m2
- case (Down, _) => m1
- case (_, Down) => m2
- case (Exiting, _) => m1
- case (_, Exiting) => m2
- case (Leaving, _) => m1
- case (_, Leaving) => m2
- case (Joining, _) => m2
- case (_, Joining) => m1
- case (WeaklyUp, _) => m2
- case (_, WeaklyUp) => m1
- case (Up, Up) => m1
+ case (Removed, _) => m1
+ case (_, Removed) => m2
+ case (ReadyForShutdown, _) => m1
+ case (_, ReadyForShutdown) => m2
+ case (Down, _) => m1
+ case (_, Down) => m2
+ case (Exiting, _) => m1
+ case (_, Exiting) => m2
+ case (Leaving, _) => m1
+ case (_, Leaving) => m2
+ case (Joining, _) => m2
+ case (_, Joining) => m1
+ case (WeaklyUp, _) => m2
+ case (_, WeaklyUp) => m1
+ case (PreparingForShutdown, _) => m1
+ case (_, PreparingForShutdown) => m2
+ case (Up, Up) => m1
}
}
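Read top to bottom, the reordered match implies the following precedence when merging two views of the same member. This is an illustrative list only, not an Akka API:

```scala
import akka.cluster.MemberStatus
import akka.cluster.MemberStatus._

object StatusPrecedence {
  // from highest to lowest priority when merging two views of the same member
  val highestToLowest: List[MemberStatus] =
    List(Removed, ReadyForShutdown, Down, Exiting, Leaving, PreparingForShutdown, Up, WeaklyUp, Joining)
}
```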
@@ -235,6 +240,8 @@ object MemberStatus {
@SerialVersionUID(1L) case object Exiting extends MemberStatus
@SerialVersionUID(1L) case object Down extends MemberStatus
@SerialVersionUID(1L) case object Removed extends MemberStatus
+ @SerialVersionUID(1L) case object PreparingForShutdown extends MemberStatus
+ @SerialVersionUID(1L) case object ReadyForShutdown extends MemberStatus
/**
* Java API: retrieve the `Joining` status singleton
@@ -271,6 +278,16 @@ object MemberStatus {
*/
def removed: MemberStatus = Removed
+ /**
+ * Java API: retrieve the `PreparingForShutdown` status singleton
+ */
+ def shuttingDown: MemberStatus = PreparingForShutdown
+
+ /**
+ * Java API: retrieve the `ReadyForShutdown` status singleton
+ */
+ def shutDown: MemberStatus = ReadyForShutdown
+
/**
* INTERNAL API
*/
@@ -278,10 +295,12 @@ object MemberStatus {
Map(
Joining -> Set(WeaklyUp, Up, Leaving, Down, Removed),
WeaklyUp -> Set(Up, Leaving, Down, Removed),
- Up -> Set(Leaving, Down, Removed),
+ Up -> Set(Leaving, Down, Removed, PreparingForShutdown),
Leaving -> Set(Exiting, Down, Removed),
Down -> Set(Removed),
Exiting -> Set(Removed, Down),
+ PreparingForShutdown -> Set(ReadyForShutdown, Removed, Leaving, Down),
+ ReadyForShutdown -> Set(Removed, Leaving, Down),
Removed -> Set.empty[MemberStatus])
}
diff --git a/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala b/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala
index 181d271c16..4ca5b4d9bb 100644
--- a/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala
@@ -22,10 +22,13 @@ import akka.util.ccompat._
@ccompatUsedUntil213
@InternalApi private[akka] object MembershipState {
import MemberStatus._
- private val leaderMemberStatus = Set[MemberStatus](Up, Leaving)
- private val convergenceMemberStatus = Set[MemberStatus](Up, Leaving)
+ private val leaderMemberStatus = Set[MemberStatus](Up, Leaving, PreparingForShutdown, ReadyForShutdown)
+ private val convergenceMemberStatus = Set[MemberStatus](Up, Leaving, PreparingForShutdown, ReadyForShutdown)
val convergenceSkipUnreachableWithMemberStatus = Set[MemberStatus](Down, Exiting)
val removeUnreachableWithMemberStatus = Set[MemberStatus](Down, Exiting)
+ // If a member hasn't joined yet or has already started leaving, don't mark it as shutting down
+ val allowedToPrepareToShutdown = Set[MemberStatus](Up)
+ val prepareForShutdownStates = Set[MemberStatus](PreparingForShutdown, ReadyForShutdown)
}
/**
diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala
index df1a3ce372..d58241139e 100644
--- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala
@@ -54,6 +54,7 @@ private[akka] object ClusterMessageSerializer {
val WelcomeManifest = "W"
val LeaveManifest = "L"
val DownManifest = "D"
+ val PrepareForShutdownManifest = "PS"
val InitJoinManifest = "IJ"
val InitJoinAckManifest = "IJA"
val InitJoinNackManifest = "IJN"
@@ -93,6 +94,7 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem)
case _: GossipStatus => GossipStatusManifest
case _: GossipEnvelope => GossipEnvelopeManifest
case _: ClusterRouterPool => ClusterRouterPoolManifest
+ case ClusterUserAction.PrepareForShutdown => PrepareForShutdownManifest
case _ =>
throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]")
}
@@ -372,7 +374,9 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem)
MemberStatus.Exiting -> cm.MemberStatus.Exiting_VALUE,
MemberStatus.Down -> cm.MemberStatus.Down_VALUE,
MemberStatus.Removed -> cm.MemberStatus.Removed_VALUE,
- MemberStatus.WeaklyUp -> cm.MemberStatus.WeaklyUp_VALUE)
+ MemberStatus.WeaklyUp -> cm.MemberStatus.WeaklyUp_VALUE,
+ MemberStatus.PreparingForShutdown -> cm.MemberStatus.PreparingForShutdown_VALUE,
+ MemberStatus.ReadyForShutdown -> cm.MemberStatus.ReadyForShutdown_VALUE)
private val memberStatusFromInt = memberStatusToInt.map { case (a, b) => (b, a) }
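A round-trip check for the new manifest might look like the sketch below. It assumes `ClusterUserAction.PrepareForShutdown` is bound to `ClusterMessageSerializer` like the other cluster messages and that the corresponding `fromBinary` case exists elsewhere in this patch; the helper object and method names are illustrative:

```scala
package akka.cluster.protobuf

import akka.actor.ActorSystem
import akka.cluster.ClusterUserAction
import akka.serialization.SerializationExtension

object PrepareForShutdownSerializationCheck {
  def roundTrip(system: ActorSystem): Unit = {
    val serializer = SerializationExtension(system)
      .findSerializerFor(ClusterUserAction.PrepareForShutdown)
      .asInstanceOf[ClusterMessageSerializer]

    val manifest = serializer.manifest(ClusterUserAction.PrepareForShutdown) // expected "PS"
    val bytes = serializer.toBinary(ClusterUserAction.PrepareForShutdown)
    require(serializer.fromBinary(bytes, manifest) == ClusterUserAction.PrepareForShutdown)
  }
}
```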
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterShutdownSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterShutdownSpec.scala
new file mode 100644
index 0000000000..37309a3e4b
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterShutdownSpec.scala
@@ -0,0 +1,105 @@
+/*
+ * Copyright (C) 2020 Lightbend Inc.
+ */
+
+package akka.cluster
+
+import akka.cluster.MemberStatus.Removed
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.util.ccompat._
+import com.typesafe.config.ConfigFactory
+import org.scalatest.concurrent.Eventually
+
+import scala.concurrent.duration._
+
+object ClusterShutdownSpec extends MultiNodeConfig {
+ val first = role("first")
+ val second = role("second")
+ val third = role("third")
+ val fourth = role("fourth")
+
+ commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString("""
+ # important config
+ """).withFallback(MultiNodeClusterSpec.clusterConfig)))
+}
+
+class ClusterShutdownSpecMultiJvmNode1 extends ClusterShutdownSpec
+class ClusterShutdownSpecMultiJvmNode2 extends ClusterShutdownSpec
+class ClusterShutdownSpecMultiJvmNode3 extends ClusterShutdownSpec
+class ClusterShutdownSpecMultiJvmNode4 extends ClusterShutdownSpec
+
+@ccompatUsedUntil213
+abstract class ClusterShutdownSpec
+ extends MultiNodeSpec(ClusterShutdownSpec)
+ with MultiNodeClusterSpec
+ with Eventually {
+
+ import ClusterShutdownSpec._
+
+ "Cluster shutdown" should {
+ "form cluster" in {
+ awaitClusterUp(first, second, third)
+ }
+ enterBarrier("cluster-up")
+ "prepare for shutdown" in {
+ runOn(first) {
+ Cluster(system).prepareForFullClusterShutdown()
+ }
+
+ runOn(first, second, third) {
+ awaitAssert({
+ withClue("members: " + Cluster(system).readView.members) {
+ Cluster(system).selfMember.status shouldEqual MemberStatus.ReadyForShutdown
+ }
+ }, 10.seconds)
+ }
+ }
+ "spread around the cluster" in {
+ runOn(first, second, third) {
+ awaitAssert {
+ Cluster(system).readView.members.unsorted.map(_.status) shouldEqual Set(MemberStatus.ReadyForShutdown)
+ }
+ }
+ enterBarrier("propagation finished")
+ }
+ "not allow new members to join" in {
+ runOn(fourth) {
+ cluster.join(address(first))
+ Thread.sleep(3000)
+ // should not be allowed to join the cluster even after some time
+ awaitAssert {
+ cluster.selfMember.status shouldBe MemberStatus.Removed
+ }
+
+ }
+ enterBarrier("not-allowed-to-join")
+ }
+ "be allowed to leave" in {
+ runOn(first) {
+ Cluster(system).leave(address(first))
+ }
+ awaitAssert({
+ withClue("members: " + Cluster(system).readView.members) {
+ runOn(second, third) {
+ Cluster(system).readView.members.size shouldEqual 2
+ }
+ runOn(first) {
+ Cluster(system).selfMember.status shouldEqual Removed
+ }
+ }
+ }, 10.seconds)
+ enterBarrier("first-gone")
+ runOn(second) {
+ Cluster(system).leave(address(second))
+ Cluster(system).leave(address(third))
+ }
+ awaitAssert({
+ withClue("self member: " + Cluster(system).selfMember) {
+ Cluster(system).selfMember.status shouldEqual Removed
+ }
+ }, 10.seconds)
+ enterBarrier("all-gone")
+ }
+ }
+}
diff --git a/akka-docs/src/main/paradox/typed/cluster-membership.md b/akka-docs/src/main/paradox/typed/cluster-membership.md
index dbd53512cc..79b1c514ef 100644
--- a/akka-docs/src/main/paradox/typed/cluster-membership.md
+++ b/akka-docs/src/main/paradox/typed/cluster-membership.md
@@ -38,6 +38,8 @@ merged and converge to the same end result.
* **weakly up** - transient state while network split (only if `akka.cluster.allow-weakly-up-members=on`)
* **up** - normal operating state
+
+ * **preparing for shutdown** / **ready for shutdown** - optional states a member can be moved to before doing a full cluster shutdown
* **leaving** / **exiting** - states during graceful removal
@@ -58,6 +60,8 @@ Note that the node might already have been shutdown when this event is published
of at least one other node.
* `ClusterEvent.ReachableMember` - A member is considered as reachable again, after having been unreachable.
All nodes that previously detected it as unreachable has detected it as reachable again.
+ * `ClusterEvent.MemberPreparingForShutdown` - A member is preparing for a full cluster shutdown
+ * `ClusterEvent.MemberReadyForShutdown` - A member is ready for a full cluster shutdown
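With the typed API the same events can be received through the cluster subscriptions ref. A sketch follows; the listener name is illustrative:

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.ClusterEvent
import akka.cluster.ClusterEvent.{ MemberPreparingForShutdown, MemberReadyForShutdown }
import akka.cluster.typed.{ Cluster, Subscribe }

object ShutdownEventListener {
  def apply(): Behavior[ClusterEvent.MemberEvent] = Behaviors.setup { ctx =>
    // register for the two new events; other MemberEvent subtypes are not delivered
    Cluster(ctx.system).subscriptions ! Subscribe(ctx.self, classOf[MemberPreparingForShutdown])
    Cluster(ctx.system).subscriptions ! Subscribe(ctx.self, classOf[MemberReadyForShutdown])

    Behaviors.receiveMessage {
      case MemberPreparingForShutdown(member) =>
        ctx.log.info("{} is preparing for full cluster shutdown", member)
        Behaviors.same
      case MemberReadyForShutdown(member) =>
        ctx.log.info("{} is ready for full cluster shutdown", member)
        Behaviors.same
      case _ =>
        Behaviors.same
    }
  }
}
```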
## Membership Lifecycle
@@ -126,6 +130,27 @@ that are in this state, but you should be aware of that members on the other
side of a network partition have no knowledge about the existence of the
new members. You should for example not count `WeaklyUp` members in quorum decisions.
+## Full cluster shutdown
+
+In some rare cases it may be desirable to do a full cluster shutdown rather than a rolling deploy,
+for example a protocol change that is simpler to roll out by restarting the cluster than by making it
+backward compatible.
+
+As of Akka `2.6.13` you can signal that a full cluster shutdown is about to happen, so that expensive actions such as:
+
+* Cluster sharding rebalances
+* Moving of Cluster singletons
+
+won't happen. That way the shutdown is as quick as possible and a new version can be started up without delay.
+
+If the cluster isn't going to be restarted right away, there is no need to prepare it for shutdown.
+
+To use this feature, call `Cluster(system).prepareForFullClusterShutdown()` in classic or use @apidoc[PrepareForFullClusterShutdown] in typed.
+
+Wait for all `Up` members to become `ReadyForShutdown`, after which all nodes can be shut down and restarted.
+Members that aren't `Up` yet will remain in the `Joining` or `WeaklyUp` state. Any node that is already leaving
+the cluster, i.e. in the `Leaving` or `Exiting` state, will continue to leave the cluster via the normal path.
+
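For reference, a minimal sketch of triggering the prepare phase from both APIs. The classic call is the one named above; the typed variant assumes @apidoc[PrepareForFullClusterShutdown] is a command accepted by `Cluster(system).manager`:

```scala
import akka.actor.ActorSystem
import akka.actor.typed.scaladsl.adapter._
import akka.cluster.Cluster
import akka.cluster.typed.{ Cluster => TypedCluster, PrepareForFullClusterShutdown }

object FullClusterShutdownExample extends App {
  val system = ActorSystem("cluster")

  // classic API: either of the two calls below is sufficient; both are shown only for illustration
  Cluster(system).prepareForFullClusterShutdown()

  // typed API (assumption: the command is sent to the cluster manager)
  TypedCluster(system.toTyped).manager ! PrepareForFullClusterShutdown
}
```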
## State Diagrams
### State Diagram for the Member States