From c5f16dcee1c1b213c3db01993d6e0fa08e9f4555 Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Fri, 12 Feb 2021 09:59:20 +0000 Subject: [PATCH] Full cluster shutdown (#29838) * member information for full cluster shutdown * Cluster singleton: dont hand over when in ready for shutdown * Noop everything in shard coordinator * Set all members to preparing for shutdown * Don't allow a node to join after prepare for shutdown * Review feedbac: singleton listen to all member chagnes * Java API * More better * Keep sharding working while ready for shutdown * Mima * Revert DEBUG logging * gs * Fix api doc link * Missed review feedback * Review feedback --- .../src/multi-jvm/resources/logback-test.xml | 2 +- ...sterShardingPreparingForShutdownSpec.scala | 139 ++++++++++++ .../typed/ClusterShardingStatsSpec.scala | 1 - .../full-cluster-shutdown.excludes | 2 + .../scala/akka/cluster/sharding/Shard.scala | 35 ++- .../cluster/sharding/ShardCoordinator.scala | 27 ++- .../akka/cluster/sharding/ShardRegion.scala | 57 +++-- .../singleton/ClusterSingletonManager.scala | 183 +++++++++++----- ...letonManagerPreparingForShutdownSpec.scala | 166 +++++++++++++++ .../scala/akka/cluster/typed/Cluster.scala | 20 ++ .../typed/internal/AdaptedClusterImpl.scala | 5 + .../typed/BasicClusterExampleTest.java | 6 + .../cluster/protobuf/msg/ClusterMessages.java | 27 ++- .../src/main/protobuf/ClusterMessages.proto | 2 + .../src/main/scala/akka/cluster/Cluster.scala | 7 + .../scala/akka/cluster/ClusterDaemon.scala | 201 +++++++++++------- .../scala/akka/cluster/ClusterEvent.scala | 24 ++- .../src/main/scala/akka/cluster/Member.scala | 47 ++-- .../scala/akka/cluster/MembershipState.scala | 7 +- .../protobuf/ClusterMessageSerializer.scala | 6 +- .../akka/cluster/ClusterShutdownSpec.scala | 105 +++++++++ .../main/paradox/typed/cluster-membership.md | 25 +++ 22 files changed, 911 insertions(+), 183 deletions(-) create mode 100644 akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingPreparingForShutdownSpec.scala create mode 100644 akka-cluster-sharding/src/main/mima-filters/2.6.12.backwards.excludes/full-cluster-shutdown.excludes create mode 100644 akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerPreparingForShutdownSpec.scala create mode 100644 akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterShutdownSpec.scala diff --git a/akka-cluster-sharding-typed/src/multi-jvm/resources/logback-test.xml b/akka-cluster-sharding-typed/src/multi-jvm/resources/logback-test.xml index 51541ba6f6..e36652a969 100644 --- a/akka-cluster-sharding-typed/src/multi-jvm/resources/logback-test.xml +++ b/akka-cluster-sharding-typed/src/multi-jvm/resources/logback-test.xml @@ -8,7 +8,7 @@ %date{ISO8601} %-5level %logger %marker - %msg MDC: {%mdc}%n - + diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingPreparingForShutdownSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingPreparingForShutdownSpec.scala new file mode 100644 index 0000000000..31e1e3c410 --- /dev/null +++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingPreparingForShutdownSpec.scala @@ -0,0 +1,139 @@ +/* + * Copyright (C) 2020 Lightbend Inc. 
+ */ + +package akka.cluster.sharding.typed + +import akka.actor.testkit.typed.scaladsl.TestProbe +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.util.ccompat._ +import akka.actor.typed.scaladsl.Behaviors +import akka.cluster.MemberStatus +import akka.cluster.MemberStatus.Removed +import akka.cluster.sharding.typed.ClusterShardingPreparingForShutdownSpec.Pinger.Command +import akka.cluster.sharding.typed.scaladsl.ClusterSharding +import akka.cluster.sharding.typed.scaladsl.Entity +import akka.cluster.sharding.typed.scaladsl.EntityTypeKey +import akka.cluster.typed.Leave +import akka.cluster.typed.MultiNodeTypedClusterSpec +import akka.cluster.typed.PrepareForFullClusterShutdown +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.serialization.jackson.CborSerializable +import com.typesafe.config.ConfigFactory + +import scala.concurrent.duration._ + +object ClusterShardingPreparingForShutdownSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(ConfigFactory.parseString(""" + akka.loglevel = DEBUG + akka.actor.provider = "cluster" + akka.remote.log-remote-lifecycle-events = off + akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning + akka.cluster.testkit.auto-down-unreachable-after = off + akka.cluster.leader-actions-interval = 100ms + """)) + + object Pinger { + sealed trait Command extends CborSerializable + case class Ping(id: Int, ref: ActorRef[Pong]) extends Command + case class Pong(id: Int) extends CborSerializable + + def apply(): Behavior[Command] = Behaviors.setup { _ => + Behaviors.receiveMessage[Command] { + case Ping(id: Int, ref) => + ref ! Pong(id) + Behaviors.same + } + } + + } + + val typeKey = EntityTypeKey[Command]("ping") +} + +class ClusterShardingPreparingForShutdownMultiJvmNode1 extends ClusterShardingPreparingForShutdownSpec +class ClusterShardingPreparingForShutdownMultiJvmNode2 extends ClusterShardingPreparingForShutdownSpec +class ClusterShardingPreparingForShutdownMultiJvmNode3 extends ClusterShardingPreparingForShutdownSpec + +@ccompatUsedUntil213 +class ClusterShardingPreparingForShutdownSpec + extends MultiNodeSpec(ClusterShardingPreparingForShutdownSpec) + with MultiNodeTypedClusterSpec { + import ClusterShardingPreparingForShutdownSpec._ + import ClusterShardingPreparingForShutdownSpec.Pinger._ + + override def initialParticipants = roles.size + + private val sharding = ClusterSharding(typedSystem) + + "Preparing for shut down ClusterSharding" must { + + "form cluster" in { + formCluster(first, second, third) + } + + "not rebalance but should still work preparing for shutdown" in { + + val shardRegion: ActorRef[ShardingEnvelope[Command]] = + sharding.init(Entity(typeKey)(_ => Pinger())) + + val probe = TestProbe[Pong]() + shardRegion ! ShardingEnvelope("id1", Pinger.Ping(1, probe.ref)) + probe.expectMessage(Pong(1)) + + runOn(second) { + cluster.manager ! PrepareForFullClusterShutdown + + } + awaitAssert({ + withClue("members: " + cluster.state.members) { + cluster.selfMember.status shouldEqual MemberStatus.ReadyForShutdown + cluster.state.members.unsorted.map(_.status) shouldEqual Set(MemberStatus.ReadyForShutdown) + } + }, 10.seconds) + enterBarrier("preparation-complete") + + shardRegion ! ShardingEnvelope("id2", Pinger.Ping(2, probe.ref)) + probe.expectMessage(Pong(2)) + + runOn(second) { + cluster.manager ! 
Leave(address(second)) + } + awaitAssert({ + runOn(first, third) { + withClue("members: " + cluster.state.members) { + cluster.state.members.size shouldEqual 2 + } + } + runOn(second) { + withClue("self member: " + cluster.selfMember) { + cluster.selfMember.status shouldEqual MemberStatus.Removed + } + } + }, 5.seconds) // keep this lower than coordinated shutdown timeout + + runOn(first, third) { + shardRegion ! ShardingEnvelope("id3", Pinger.Ping(3, probe.ref)) + probe.expectMessage(Pong(3)) + } + enterBarrier("new-shards-verified") + + runOn(third) { + cluster.manager ! Leave(address(first)) + cluster.manager ! Leave(address(third)) + } + awaitAssert({ + withClue("self member: " + cluster.selfMember) { + cluster.selfMember.status shouldEqual Removed + } + }, 15.seconds) + enterBarrier("done") + } + } +} diff --git a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingStatsSpec.scala b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingStatsSpec.scala index 3366559432..7931811acd 100644 --- a/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingStatsSpec.scala +++ b/akka-cluster-sharding-typed/src/multi-jvm/scala/akka/cluster/sharding/typed/ClusterShardingStatsSpec.scala @@ -63,7 +63,6 @@ abstract class ClusterShardingStatsSpec import Pinger._ private val typeKey = EntityTypeKey[Command]("ping") - private val sharding = ClusterSharding(typedSystem) private val settings = ClusterShardingSettings(typedSystem) private val queryTimeout = settings.shardRegionQueryTimeout * roles.size.toLong //numeric widening y'all diff --git a/akka-cluster-sharding/src/main/mima-filters/2.6.12.backwards.excludes/full-cluster-shutdown.excludes b/akka-cluster-sharding/src/main/mima-filters/2.6.12.backwards.excludes/full-cluster-shutdown.excludes new file mode 100644 index 0000000000..6a743b2f22 --- /dev/null +++ b/akka-cluster-sharding/src/main/mima-filters/2.6.12.backwards.excludes/full-cluster-shutdown.excludes @@ -0,0 +1,2 @@ +# private[akka] class +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardCoordinator.rebalanceTask") diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index a0d71c9399..5dc9af7d21 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -20,6 +20,10 @@ import akka.actor.Terminated import akka.actor.Timers import akka.annotation.InternalStableApi import akka.cluster.Cluster +import akka.cluster.ClusterEvent.InitialStateAsEvents +import akka.cluster.ClusterEvent.MemberEvent +import akka.cluster.ClusterEvent.MemberPreparingForShutdown +import akka.cluster.ClusterEvent.MemberReadyForShutdown import akka.cluster.sharding.internal.RememberEntitiesShardStore import akka.cluster.sharding.internal.RememberEntitiesShardStore.GetEntities import akka.cluster.sharding.internal.RememberEntitiesProvider @@ -453,6 +457,7 @@ private[akka] class Shard( private val messageBuffers = new MessageBufferMap[EntityId] private var handOffStopper: Option[ActorRef] = None + private var preparingForShutdown = false import context.dispatcher private val passivateIdleTask = if (settings.shouldPassivateIdleEntities) { @@ -479,6 +484,11 @@ private[akka] class Shard( } override def preStart(): Unit = { + Cluster(context.system).subscribe( + self, + 
InitialStateAsEvents, + classOf[MemberPreparingForShutdown], + classOf[MemberReadyForShutdown]) acquireLeaseIfNeeded() } @@ -509,6 +519,8 @@ private[akka] class Shard( tryGetLease(lease.get) case ll: LeaseLost => receiveLeaseLost(ll) + case me: MemberEvent => + receiveMemberEvent(me) case msg => if (verboseDebug) log.debug( @@ -519,6 +531,15 @@ private[akka] class Shard( stash() } + private def receiveMemberEvent(event: MemberEvent): Unit = event match { + case _: MemberReadyForShutdown | _: MemberPreparingForShutdown => + if (!preparingForShutdown) { + log.info("{}: Preparing for shutdown", typeName) + preparingForShutdown = true + } + case _ => + } + private def tryGetLease(l: Lease): Unit = { log.info("{}: Acquiring lease {}", typeName, l.settings) pipe(l.acquire(reason => self ! LeaseLost(reason)).map(r => LeaseAcquireResult(r, None)).recover { @@ -548,6 +569,8 @@ private[akka] class Shard( onEntitiesRemembered(entityIds) case RememberEntityTimeout(GetEntities) => loadingEntityIdsFailed() + case me: MemberEvent => + receiveMemberEvent(me) case msg => if (verboseDebug) log.debug( @@ -590,6 +613,7 @@ private[akka] class Shard( // when not remembering entities, we stay in this state all the time def idle: Receive = { case Terminated(ref) => receiveTerminated(ref) + case me: MemberEvent => receiveMemberEvent(me) case EntityTerminated(ref) => entityTerminated(ref) case msg: CoordinatorMessage => receiveCoordinatorMessage(msg) case msg: RememberEntityCommand => receiveRememberEntityCommand(msg) @@ -659,6 +683,7 @@ private[akka] class Shard( throw new RuntimeException( s"Async write timed out after ${settings.tuningParameters.updatingStateTimeout.pretty}") case ShardRegion.StartEntity(entityId) => startEntity(entityId, Some(sender())) + case me: MemberEvent => receiveMemberEvent(me) case Terminated(ref) => receiveTerminated(ref) case EntityTerminated(ref) => entityTerminated(ref) case _: CoordinatorMessage => stash() @@ -814,7 +839,8 @@ private[akka] class Shard( } private def receiveCoordinatorMessage(msg: CoordinatorMessage): Unit = msg match { - case HandOff(`shardId`) => handOff(sender()) + case HandOff(`shardId`) => + handOff(sender()) case HandOff(shard) => log.warning("{}: Shard [{}] can not hand off for another Shard [{}]", typeName, shardId, shard) case _ => unhandled(msg) @@ -839,7 +865,12 @@ private[akka] class Shard( // does conversion so only do once val activeEntities = entities.activeEntities() - if (activeEntities.nonEmpty) { + if (preparingForShutdown) { + log.info("{}: HandOff shard [{}] while preparing for shutdown. Stopping right away.", typeName, shardId) + activeEntities.foreach { _ ! handOffStopMessage } + replyTo ! 
ShardStopped(shardId) + context.stop(self) + } else if (activeEntities.nonEmpty && !preparingForShutdown) { val entityHandOffTimeout = (settings.tuningParameters.handOffTimeout - 5.seconds).max(1.seconds) log.debug( "{}: Starting HandOffStopper for shard [{}] to terminate [{}] entities.", diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index e7b25db59e..c53c78ded0 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -639,7 +639,8 @@ object ShardCoordinator { abstract class ShardCoordinator( settings: ClusterShardingSettings, allocationStrategy: ShardCoordinator.ShardAllocationStrategy) - extends Actor { + extends Actor + with Timers { import ShardCoordinator._ import ShardCoordinator.Internal._ @@ -661,6 +662,7 @@ abstract class ShardCoordinator( var allRegionsRegistered = false var state = State.empty.withRememberEntities(settings.rememberEntities) + var preparingForShutdown = false // rebalanceInProgress for the ShardId keys, pending GetShardHome requests by the ActorRef values var rebalanceInProgress = Map.empty[ShardId, Set[ActorRef]] var rebalanceWorkers: Set[ActorRef] = Set.empty @@ -672,14 +674,17 @@ abstract class ShardCoordinator( import context.dispatcher - val rebalanceTask = - context.system.scheduler.scheduleWithFixedDelay(rebalanceInterval, rebalanceInterval, self, RebalanceTick) - - cluster.subscribe(self, initialStateMode = InitialStateAsEvents, ClusterShuttingDown.getClass) + cluster.subscribe( + self, + initialStateMode = InitialStateAsEvents, + ClusterShuttingDown.getClass, + classOf[MemberReadyForShutdown], + classOf[MemberPreparingForShutdown]) protected def typeName: String override def preStart(): Unit = { + timers.startTimerWithFixedDelay(RebalanceTick, RebalanceTick, rebalanceInterval) allocationStrategy match { case strategy: StartableAllocationStrategy => strategy.start() @@ -691,7 +696,6 @@ abstract class ShardCoordinator( override def postStop(): Unit = { super.postStop() - rebalanceTask.cancel() cluster.unsubscribe(self) } @@ -786,7 +790,7 @@ abstract class ShardCoordinator( } case RebalanceTick => - if (state.regions.nonEmpty) { + if (state.regions.nonEmpty && !preparingForShutdown) { val shardsFuture = allocationStrategy.rebalance(state.regions, rebalanceInProgress.keySet) shardsFuture.value match { case Some(Success(shards)) => @@ -896,6 +900,15 @@ abstract class ShardCoordinator( // it will soon be stopped when singleton is stopped context.become(shuttingDown) + case _: MemberPreparingForShutdown | _: MemberReadyForShutdown => + if (!preparingForShutdown) { + log.info( + "{}: Shard coordinator detected prepare for full cluster shutdown. 
No new rebalances will take place.", + typeName) + timers.cancel(RebalanceTick) + preparingForShutdown = true + } + case ShardRegion.GetCurrentRegions => val reply = ShardRegion.CurrentRegions(state.regions.keySet.map { ref => if (ref.path.address.host.isEmpty) cluster.selfAddress diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index e948cc7187..f311786dbd 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -626,6 +626,7 @@ private[akka] class ShardRegion( var startingShards = Set.empty[ShardId] var handingOff = Set.empty[ActorRef] var gracefulShutdownInProgress = false + var preparingForShutdown = false import context.dispatcher var retryCount = 0 @@ -759,6 +760,12 @@ private[akka] class ShardRegion( context.stop(self) } + case _: MemberReadyForShutdown | _: MemberPreparingForShutdown => + if (!preparingForShutdown) { + log.info("{}. preparing for shutdown", typeName) + } + preparingForShutdown = true + case _: MemberEvent => // these are expected, no need to warn about them case _ => unhandled(evt) @@ -819,14 +826,18 @@ private[akka] class ShardRegion( case BeginHandOff(shard) => log.debug("{}: BeginHandOff shard [{}]", typeName, shard) - if (regionByShard.contains(shard)) { - val regionRef = regionByShard(shard) - val updatedShards = regions(regionRef) - shard - if (updatedShards.isEmpty) regions -= regionRef - else regions = regions.updated(regionRef, updatedShards) - regionByShard -= shard + if (!preparingForShutdown) { + if (regionByShard.contains(shard)) { + val regionRef = regionByShard(shard) + val updatedShards = regions(regionRef) - shard + if (updatedShards.isEmpty) regions -= regionRef + else regions = regions.updated(regionRef, updatedShards) + regionByShard -= shard + } + sender() ! BeginHandOffAck(shard) + } else { + log.debug("{}: Ignoring begin handoff as preparing to shutdown", typeName) } - sender() ! 
BeginHandOffAck(shard) case msg @ HandOff(shard) => log.debug("{}: HandOff shard [{}]", typeName, shard) @@ -885,20 +896,28 @@ private[akka] class ShardRegion( } case GracefulShutdown => - log.debug("{}: Starting graceful shutdown of region and all its shards", typeName) + if (preparingForShutdown) { + log.debug( + "{}: Skipping graceful shutdown of region and all its shards as cluster is preparing for shutdown", + typeName) + gracefulShutdownProgress.trySuccess(Done) + context.stop(self) + } else { + log.debug("{}: Starting graceful shutdown of region and all its shards", typeName) - val coordShutdown = CoordinatedShutdown(context.system) - if (coordShutdown.getShutdownReason().isPresent) { - // use a shorter timeout than the coordinated shutdown phase to be able to log better reason for the timeout - val timeout = coordShutdown.timeout(CoordinatedShutdown.PhaseClusterShardingShutdownRegion) - 1.second - if (timeout > Duration.Zero) { - timers.startSingleTimer(GracefulShutdownTimeout, GracefulShutdownTimeout, timeout) + val coordShutdown = CoordinatedShutdown(context.system) + if (coordShutdown.getShutdownReason().isPresent) { + // use a shorter timeout than the coordinated shutdown phase to be able to log better reason for the timeout + val timeout = coordShutdown.timeout(CoordinatedShutdown.PhaseClusterShardingShutdownRegion) - 1.second + if (timeout > Duration.Zero) { + timers.startSingleTimer(GracefulShutdownTimeout, GracefulShutdownTimeout, timeout) + } } - } - gracefulShutdownInProgress = true - sendGracefulShutdownToCoordinatorIfInProgress() - tryCompleteGracefulShutdownIfInProgress() + gracefulShutdownInProgress = true + sendGracefulShutdownToCoordinatorIfInProgress() + tryCompleteGracefulShutdownIfInProgress() + } case GracefulShutdownTimeout => log.warning( @@ -1258,9 +1277,7 @@ private[akka] class ShardRegion( .orElse(entityProps match { case Some(props) if !shardsByRef.values.exists(_ == id) => log.debug(ShardingLogMarker.shardStarted(typeName, id), "{}: Starting shard [{}] in region", typeName, id) - val name = URLEncoder.encode(id, "utf-8") - val shard = context.watch( context.actorOf( Shard diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala index 477ea0a4e8..e2167b5ef9 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/singleton/ClusterSingletonManager.scala @@ -9,9 +9,7 @@ import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration._ import scala.util.control.NonFatal - import com.typesafe.config.Config - import akka.AkkaException import akka.Done import akka.actor.Actor @@ -526,6 +524,7 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se var oldestChangedBuffer: ActorRef = _ // Previous GetNext request delivered event and new GetNext is to be sent var oldestChangedReceived = true + var preparingForFullShutdown = false var selfExited = false @@ -583,7 +582,13 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se require(!cluster.isTerminated, "Cluster node must not be terminated") // subscribe to cluster changes, re-subscribe when restart - cluster.subscribe(self, ClusterEvent.InitialStateAsEvents, classOf[MemberRemoved], classOf[MemberDowned]) + cluster.subscribe( + self, + ClusterEvent.InitialStateAsEvents, + classOf[MemberRemoved], + 
classOf[MemberDowned], + classOf[MemberPreparingForShutdown], + classOf[MemberReadyForShutdown]) startTimerWithFixedDelay(CleanupTimer, Cleanup, 1.minute) @@ -630,6 +635,28 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se case Event(HandOverToMe, _) => // nothing to hand over in start stay() + + case Event(event: MemberEvent, _) => + handleMemberEvent(event) + } + + def handleMemberEvent(event: MemberEvent): State = { + event match { + case _: MemberRemoved if event.member.uniqueAddress == cluster.selfUniqueAddress => + logInfo("Self removed, stopping ClusterSingletonManager") + stop() + case _: MemberDowned if event.member.uniqueAddress == cluster.selfUniqueAddress => + logInfo("Self downed, stopping ClusterSingletonManager") + stop() + case _: MemberReadyForShutdown | _: MemberPreparingForShutdown => + if (!preparingForFullShutdown) { + logInfo("Preparing for shut down, disabling expensive actions") + preparingForFullShutdown = true + } + stay() + case _ => + stay() + } } when(Younger) { @@ -640,7 +667,9 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se if (previousOldest.forall(removed.contains)) tryGotoOldest() else { - peer(previousOldest.head.address) ! HandOverToMe + if (!preparingForFullShutdown) { + peer(previousOldest.head.address) ! HandOverToMe + } goto(BecomingOldest).using(BecomingOldestData(previousOldest)) } } else { @@ -656,18 +685,13 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se stay().using(YoungerData(newPreviousOldest)) } - case Event(MemberDowned(m), _) if m.uniqueAddress == cluster.selfUniqueAddress => - logInfo("Self downed, stopping ClusterSingletonManager") - stop() - - case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress => - logInfo("Self removed, stopping ClusterSingletonManager") - stop() - - case Event(MemberRemoved(m, _), _) => + case Event(event @ MemberRemoved(m, _), _) if event.member.uniqueAddress != cluster.selfUniqueAddress => scheduleDelayedMemberRemoved(m) stay() + case Event(event: MemberEvent, _) => + handleMemberEvent(event) + case Event(DelayedMemberRemoved(m), YoungerData(previousOldest)) => if (!selfExited) logInfo("Member removed [{}]", m.address) @@ -682,7 +706,9 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se else { // this node was probably quickly restarted with same hostname:port, // confirm that the old singleton instance has been stopped - sender() ! HandOverDone + if (!preparingForFullShutdown) { + sender() ! 
HandOverDone + } } stay() @@ -713,18 +739,13 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se stay() } - case Event(MemberDowned(m), _) if m.uniqueAddress == cluster.selfUniqueAddress => - logInfo("Self downed, stopping ClusterSingletonManager") - stop() - - case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress => - logInfo("Self removed, stopping ClusterSingletonManager") - stop() - - case Event(MemberRemoved(m, _), _) => + case Event(event @ MemberRemoved(m, _), _) if event.member.uniqueAddress != cluster.selfUniqueAddress => scheduleDelayedMemberRemoved(m) stay() + case Event(event: MemberEvent, _) if event.member.uniqueAddress == cluster.selfUniqueAddress => + handleMemberEvent(event) + case Event(DelayedMemberRemoved(m), BecomingOldestData(previousOldest)) => if (!selfExited) logInfo("Member removed [{}], previous oldest [{}]", m.address, previousOldest.map(_.address).mkString(", ")) @@ -750,25 +771,31 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se case Some(senderUniqueAddress) => previousOldest.headOption match { case Some(oldest) => - if (oldest == senderUniqueAddress) - sender() ! HandOverToMe - else + if (oldest == senderUniqueAddress) { + if (!preparingForFullShutdown) { + sender() ! HandOverToMe + } + } else logInfo( "Ignoring TakeOver request in BecomingOldest from [{}]. Expected previous oldest [{}]", sender().path.address, oldest.address) stay() case None => - sender() ! HandOverToMe + if (!preparingForFullShutdown) { + sender() ! HandOverToMe + } stay().using(BecomingOldestData(senderUniqueAddress :: previousOldest)) } } case Event(HandOverRetry(count), BecomingOldestData(previousOldest)) => if (count <= maxHandOverRetries) { - logInfo("Retry [{}], sending HandOverToMe to [{}]", count, previousOldest.headOption.map(_.address)) - previousOldest.headOption.foreach(node => peer(node.address) ! HandOverToMe) - startSingleTimer(HandOverRetryTimer, HandOverRetry(count + 1), handOverRetryInterval) + if (!preparingForFullShutdown) { + logInfo("Retry [{}], sending HandOverToMe to [{}]", count, previousOldest.headOption.map(_.address)) + previousOldest.headOption.foreach(node => peer(node.address) ! HandOverToMe) + startSingleTimer(HandOverRetryTimer, HandOverRetry(count + 1), handOverRetryInterval) + } stay() } else if (previousOldest.forall(removed.contains)) { // can't send HandOverToMe, previousOldest unknown for new node (or restart) @@ -792,9 +819,12 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se def tryAcquireLease() = { import context.dispatcher - pipe(lease.get.acquire(reason => self ! LeaseLost(reason)).map[Any](AcquireLeaseResult).recover { - case NonFatal(t) => AcquireLeaseFailure(t) - }).to(self) + + if (!preparingForFullShutdown) { + pipe(lease.get.acquire(reason => self ! LeaseLost(reason)).map[Any](AcquireLeaseResult).recover { + case NonFatal(t) => AcquireLeaseFailure(t) + }).to(self) + } goto(AcquiringLease).using(AcquiringLeaseData(leaseRequestInProgress = true, None)) } @@ -847,19 +877,29 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se // complete memberExitingProgress when handOverDone sender() ! 
Done // reply to ask stay() - case Event(MemberDowned(m), _) if m.uniqueAddress == cluster.selfUniqueAddress => - logInfo("Self downed, stopping ClusterSingletonManager") - stop() + + case Event(event: MemberEvent, _) => + handleMemberEvent(event) + } @InternalStableApi def gotoOldest(): State = { - logInfo( - ClusterLogMarker.singletonStarted, - "Singleton manager starting singleton actor [{}]", - self.path / singletonName) - val singleton = context.watch(context.actorOf(singletonProps, singletonName)) - goto(Oldest).using(OldestData(Some(singleton))) + if (preparingForFullShutdown) { + logInfo( + ClusterLogMarker.singletonStarted, + "Singleton manager NOT starting singleton actor [{}] as cluster is preparing to shutdown", + self.path / singletonName) + goto(Oldest).using(OldestData(None)) + } else { + logInfo( + ClusterLogMarker.singletonStarted, + "Singleton manager starting singleton actor [{}]", + self.path / singletonName) + val singleton = context.watch(context.actorOf(singletonProps, singletonName)) + goto(Oldest).using(OldestData(Some(singleton))) + } + } def handleOldestChanged(singleton: Option[ActorRef], oldestOption: Option[UniqueAddress]) = { @@ -876,12 +916,16 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se gotoHandingOver(singleton, None) case Some(a) => // send TakeOver request in case the new oldest doesn't know previous oldest - peer(a.address) ! TakeOverFromMe - startSingleTimer(TakeOverRetryTimer, TakeOverRetry(1), handOverRetryInterval) + if (!preparingForFullShutdown) { + peer(a.address) ! TakeOverFromMe + startSingleTimer(TakeOverRetryTimer, TakeOverRetry(1), handOverRetryInterval) + } goto(WasOldest).using(WasOldestData(singleton, newOldestOption = Some(a))) case None => // new oldest will initiate the hand-over - startSingleTimer(TakeOverRetryTimer, TakeOverRetry(1), handOverRetryInterval) + if (!preparingForFullShutdown) { + startSingleTimer(TakeOverRetryTimer, TakeOverRetry(1), handOverRetryInterval) + } goto(WasOldest).using(WasOldestData(singleton, newOldestOption = None)) } } @@ -916,6 +960,10 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se stop() } + // Downed in this case is handled differently so keep this below it + case Event(event: MemberEvent, _) => + handleMemberEvent(event) + case Event(LeaseLost(reason), OldestData(singleton)) => log.warning("Lease has been lost. Reason: {}. Terminating singleton and trying to re-acquire lease", reason) singleton match { @@ -939,14 +987,18 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newOldestOption.map(_.address)) else log.debug("Retry [{}], sending TakeOverFromMe to [{}]", count, newOldestOption.map(_.address)) - newOldestOption.foreach(node => peer(node.address) ! TakeOverFromMe) - startSingleTimer(TakeOverRetryTimer, TakeOverRetry(count + 1), handOverRetryInterval) + + if (!preparingForFullShutdown) { + newOldestOption.foreach(node => peer(node.address) ! 
TakeOverFromMe) + startSingleTimer(TakeOverRetryTimer, TakeOverRetry(count + 1), handOverRetryInterval) + } stay() } else throw new ClusterSingletonManagerIsStuck(s"Expected hand-over to [$newOldestOption] never occurred") case Event(HandOverToMe, WasOldestData(singleton, _)) => gotoHandingOver(singleton, Some(sender())) + case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress && !selfExited => logInfo("Self removed, stopping ClusterSingletonManager") stop() @@ -975,6 +1027,10 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se logInfo("Self downed, stopping") gotoStopping(s) } + + case Event(event: MemberEvent, _) => + handleMemberEvent(event) + } def gotoHandingOver(singleton: Option[ActorRef], handOverTo: Option[ActorRef]): State = { @@ -995,7 +1051,9 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se case Event(HandOverToMe, HandingOverData(_, handOverTo)) if handOverTo.contains(sender()) => // retry - sender() ! HandOverInProgress + if (!preparingForFullShutdown) { + sender() ! HandOverInProgress + } stay() case Event(SelfExiting, _) => @@ -1003,6 +1061,15 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se // complete memberExitingProgress when handOverDone sender() ! Done // reply to ask stay() + + case Event(MemberReadyForShutdown(m), _) if m.uniqueAddress == cluster.selfUniqueAddress => + logInfo("Ready for shutdown when handing over. Giving up on handover.") + stop() + + case Event(MemberPreparingForShutdown(m), _) if m.uniqueAddress == cluster.selfUniqueAddress => + logInfo("Preparing for shutdown when handing over. Giving up on handover.") + stop() + } def handOverDone(handOverTo: Option[ActorRef]): State = { @@ -1033,6 +1100,15 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se case Event(Terminated(ref), StoppingData(singleton)) if ref == singleton => logInfo(ClusterLogMarker.singletonTerminated, "Singleton actor [{}] was terminated", singleton.path) stop() + + case Event(MemberReadyForShutdown(m), _) if m.uniqueAddress == cluster.selfUniqueAddress => + logInfo("Ready for shutdown when stopping. Not waiting for user actor to shutdown") + stop() + + case Event(MemberPreparingForShutdown(m), _) if m.uniqueAddress == cluster.selfUniqueAddress => + logInfo("Preparing for shutdown when stopping. Not waiting for user actor to shutdown") + stop() + } when(End) { @@ -1046,7 +1122,14 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se def selfMemberExited(): Unit = { selfExited = true - logInfo("Exited [{}]", cluster.selfAddress) + logInfo( + "Exited [{}].{}", + cluster.selfAddress, + if (preparingForFullShutdown) " From preparing from shutdown" else "") + // handover won't be done so just complete right away + if (preparingForFullShutdown) { + memberExitingProgress.trySuccess(Done) + } } whenUnhandled { @@ -1090,6 +1173,8 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se "Failed to release lease. 
Singleton may not be able to run on another node until lease timeout occurs") } stay() + case Event(_: MemberEvent, _) => + stay() // silence } onTransition { diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerPreparingForShutdownSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerPreparingForShutdownSpec.scala new file mode 100644 index 0000000000..a818406142 --- /dev/null +++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/singleton/ClusterSingletonManagerPreparingForShutdownSpec.scala @@ -0,0 +1,166 @@ +/* + * Copyright (C) 2009-2020 Lightbend Inc. + */ + +package akka.cluster.singleton + +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.ActorRef +import akka.actor.Props +import akka.cluster.Cluster +import akka.cluster.MemberStatus +import akka.cluster.MemberStatus.Removed +import akka.cluster.MultiNodeClusterSpec +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.testkit._ +import com.typesafe.config.ConfigFactory + +import scala.concurrent.duration._ + +object ClusterSingletonManagerPreparingForShutdownSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + + commonConfig(ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.actor.provider = "cluster" + akka.remote.log-remote-lifecycle-events = off + akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning + akka.cluster.testkit.auto-down-unreachable-after = off + akka.cluster.leader-actions-interval = 100ms + """)) + + case object EchoStarted + + /** + * The singleton actor + */ + class Echo(testActor: ActorRef) extends Actor with ActorLogging { + override def preStart(): Unit = { + log.info("Singleton starting on {}", Cluster(context.system).selfUniqueAddress) + testActor ! "preStart" + } + override def postStop(): Unit = { + testActor ! "postStop" + } + + def receive = { + case "stop" => + testActor ! "stop" + context.stop(self) + case _ => + sender() ! 
self + } + } +} + +class ClusterSingletonManagerPreparingForShutdownMultiJvmNode1 extends ClusterSingletonManagerPreparingForShutdownSpec +class ClusterSingletonManagerPreparingForShutdownMultiJvmNode2 extends ClusterSingletonManagerPreparingForShutdownSpec +class ClusterSingletonManagerPreparingForShutdownMultiJvmNode3 extends ClusterSingletonManagerPreparingForShutdownSpec + +class ClusterSingletonManagerPreparingForShutdownSpec + extends MultiNodeSpec(ClusterSingletonManagerPreparingForShutdownSpec) + with MultiNodeClusterSpec + with STMultiNodeSpec + with ImplicitSender { + import ClusterSingletonManagerPreparingForShutdownSpec._ + + override def initialParticipants = roles.size + + def createSingleton(): ActorRef = { + system.actorOf( + ClusterSingletonManager.props( + singletonProps = Props(classOf[Echo], testActor), + terminationMessage = "stop", + settings = ClusterSingletonManagerSettings(system)), + name = "echo") + } + + val echoProxyTerminatedProbe = TestProbe() + + lazy val echoProxy: ActorRef = { + echoProxyTerminatedProbe.watch( + system.actorOf( + ClusterSingletonProxy + .props(singletonManagerPath = "/user/echo", settings = ClusterSingletonProxySettings(system)), + name = "echoProxy")) + } + + "Preparing for shut down ClusterSingletonManager" must { + + "form cluster" in { + awaitClusterUp(first, second, third) + } + + "not handover when ready for shutdown" in { + + createSingleton() + runOn(first) { + within(10.seconds) { + expectMsg("preStart") + echoProxy ! "hello" + expectMsgType[ActorRef] + expectNoMessage(2.seconds) + } + } + enterBarrier("singleton-active") + + runOn(first) { + Cluster(system).prepareForFullClusterShutdown() + } + awaitAssert({ + withClue("members: " + Cluster(system).readView.members) { + Cluster(system).selfMember.status shouldEqual MemberStatus.ReadyForShutdown + } + }, 10.seconds) + enterBarrier("preparation-complete") + + runOn(first) { + Cluster(system).leave(address(first)) + } + awaitAssert( + { + runOn(second, third) { + withClue("members: " + Cluster(system).readView.members) { + Cluster(system).readView.members.size shouldEqual 2 + } + } + runOn(first) { + withClue("self member: " + Cluster(system).selfMember) { + Cluster(system).selfMember.status shouldEqual MemberStatus.Removed + } + } + }, + 8.seconds) // this timeout must be lower than coordinated shutdown timeout otherwise it could pass due to the timeout continuing with the cluster exit + // where as this is testing that shutdown happens right away when a cluster is in preparing to shutdown mode + enterBarrier("initial-singleton-removed") + + // even tho the handover isn't completed the new oldest node will start it after a timeout + // make sure this isn't the case + runOn(second) { + echoProxy ! 
"hello" + expectNoMessage(5.seconds) + } + + enterBarrier("no-singleton-running") + } + + "last nodes should shut down" in { + runOn(second) { + Cluster(system).leave(address(second)) + Cluster(system).leave(address(third)) + } + awaitAssert({ + withClue("self member: " + Cluster(system).selfMember) { + Cluster(system).selfMember.status shouldEqual Removed + } + }, 10.seconds) + enterBarrier("done") + } + + } +} diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/Cluster.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/Cluster.scala index d0ec98d80c..c38a7d2b39 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/Cluster.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/Cluster.scala @@ -150,6 +150,26 @@ object Leave { */ final case class Down(address: Address) extends ClusterCommand +/** + * Initiate a full cluster shutdown. This stops: + * - New members joining the cluster + * - New rebalances in Cluster Sharding + * - Singleton handovers + * + * However, it does not stop the nodes. That is expected to be signalled externally. + * + * Not for user extension + */ +@DoNotInherit sealed trait PrepareForFullClusterShutdown extends ClusterCommand + +case object PrepareForFullClusterShutdown extends PrepareForFullClusterShutdown { + + /** + * Java API + */ + def prepareForFullClusterShutdown(): PrepareForFullClusterShutdown = this +} + /** * Akka Typed Cluster API entry point */ diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AdaptedClusterImpl.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AdaptedClusterImpl.scala index a290faf3b3..f094853293 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AdaptedClusterImpl.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AdaptedClusterImpl.scala @@ -14,6 +14,7 @@ import akka.actor.typed.scaladsl.adapter._ import akka.annotation.InternalApi import akka.cluster.{ ClusterEvent, Member, MemberStatus } import akka.cluster.ClusterEvent.MemberEvent +import akka.cluster.typed.PrepareForFullClusterShutdown import akka.cluster.typed._ /** @@ -129,6 +130,10 @@ private[akka] object AdapterClusterImpl { adaptedCluster.joinSeedNodes(addresses) Behaviors.same + case PrepareForFullClusterShutdown => + adaptedCluster.prepareForFullClusterShutdown() + Behaviors.same + } } diff --git a/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/BasicClusterExampleTest.java b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/BasicClusterExampleTest.java index f06a8d40ac..d036fa5098 100644 --- a/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/BasicClusterExampleTest.java +++ b/akka-cluster-typed/src/test/java/jdocs/akka/cluster/typed/BasicClusterExampleTest.java @@ -67,6 +67,12 @@ public class BasicClusterExampleTest { // extends JUnitSuite { // TODO wait for/verify cluster to form + // #prepare + PrepareForFullClusterShutdown msg = + PrepareForFullClusterShutdown.prepareForFullClusterShutdown(); + cluster2.manager().tell(msg); + // #prepare + // #cluster-leave cluster2.manager().tell(Leave.create(cluster2.selfMember().address())); // #cluster-leave diff --git a/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java b/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java index a927db41e5..0098da699d 100644 --- a/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java +++ b/akka-cluster/src/main/java/akka/cluster/protobuf/msg/ClusterMessages.java @@ 
-166,6 +166,14 @@ public final class ClusterMessages { * WeaklyUp = 6; */ WeaklyUp(6), + /** + * PreparingForShutdown = 7; + */ + PreparingForShutdown(7), + /** + * ReadyForShutdown = 8; + */ + ReadyForShutdown(8), ; /** @@ -196,6 +204,14 @@ public final class ClusterMessages { * WeaklyUp = 6; */ public static final int WeaklyUp_VALUE = 6; + /** + * PreparingForShutdown = 7; + */ + public static final int PreparingForShutdown_VALUE = 7; + /** + * ReadyForShutdown = 8; + */ + public static final int ReadyForShutdown_VALUE = 8; public final int getNumber() { @@ -225,6 +241,8 @@ public final class ClusterMessages { case 4: return Down; case 5: return Removed; case 6: return WeaklyUp; + case 7: return PreparingForShutdown; + case 8: return ReadyForShutdown; default: return null; } } @@ -22446,10 +22464,11 @@ public final class ClusterMessages { "\002(\r\022\031\n\021allowLocalRoutees\030\003 \002(\010\022\017\n\007useRol" + "e\030\004 \001(\t\022\020\n\010useRoles\030\005 \003(\t*D\n\022Reachabilit" + "yStatus\022\r\n\tReachable\020\000\022\017\n\013Unreachable\020\001\022" + - "\016\n\nTerminated\020\002*b\n\014MemberStatus\022\013\n\007Joini" + - "ng\020\000\022\006\n\002Up\020\001\022\013\n\007Leaving\020\002\022\013\n\007Exiting\020\003\022\010" + - "\n\004Down\020\004\022\013\n\007Removed\020\005\022\014\n\010WeaklyUp\020\006B\035\n\031a" + - "kka.cluster.protobuf.msgH\001" + "\016\n\nTerminated\020\002*\222\001\n\014MemberStatus\022\013\n\007Join" + + "ing\020\000\022\006\n\002Up\020\001\022\013\n\007Leaving\020\002\022\013\n\007Exiting\020\003\022" + + "\010\n\004Down\020\004\022\013\n\007Removed\020\005\022\014\n\010WeaklyUp\020\006\022\030\n\024" + + "PreparingForShutdown\020\007\022\024\n\020ReadyForShutdo" + + "wn\020\010B\035\n\031akka.cluster.protobuf.msgH\001" }; descriptor = akka.protobufv3.internal.Descriptors.FileDescriptor .internalBuildGeneratedFileFrom(descriptorData, diff --git a/akka-cluster/src/main/protobuf/ClusterMessages.proto b/akka-cluster/src/main/protobuf/ClusterMessages.proto index 65b323429c..acb19aab13 100644 --- a/akka-cluster/src/main/protobuf/ClusterMessages.proto +++ b/akka-cluster/src/main/protobuf/ClusterMessages.proto @@ -198,6 +198,8 @@ enum MemberStatus { Down = 4; Removed = 5; WeaklyUp = 6; + PreparingForShutdown = 7; + ReadyForShutdown = 8; } /** diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index adf2cce188..ecb7160833 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -320,6 +320,13 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { clusterCore ! ClusterUserAction.JoinTo(fillLocal(address)) } + /** + * Change the state of every member in preparation for a full cluster shutdown. + */ + def prepareForFullClusterShutdown(): Unit = { + clusterCore ! 
ClusterUserAction.PrepareForShutdown + } + private def fillLocal(address: Address): Address = { // local address might be used if grabbed from actorRef.path.address if (address.hasLocalScope && address.system == selfAddress.system) selfAddress diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 5eb9e7ece5..4225818ad7 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -61,6 +61,11 @@ private[cluster] object ClusterUserAction { @SerialVersionUID(1L) final case class Down(address: Address) extends ClusterMessage + /** + * Command to mark all nodes as shutting down + */ + @SerialVersionUID(1L) + case object PrepareForShutdown extends ClusterMessage } /** @@ -356,6 +361,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh var joinSeedNodesDeadline: Option[Deadline] = None var leaderActionCounter = 0 var selfDownCounter = 0 + var preparingForShutdown = false var exitingTasksInProgress = false val selfExiting = Promise[Done]() @@ -555,6 +561,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh case Join(node, roles, appVersion) => joining(node, roles, appVersion) case ClusterUserAction.Down(address) => downing(address) case ClusterUserAction.Leave(address) => leaving(address) + case ClusterUserAction.PrepareForShutdown => startPrepareForShutdown() case SendGossipTo(address) => sendGossipTo(address) case msg: SubscriptionMessage => publisher.forward(msg) case QuarantinedEvent(ua) => quarantined(UniqueAddress(ua)) @@ -728,83 +735,90 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh * current gossip state, including the new joining member. */ def joining(joiningNode: UniqueAddress, roles: Set[String], appVersion: Version): Unit = { - val selfStatus = latestGossip.member(selfUniqueAddress).status - if (joiningNode.address.protocol != selfAddress.protocol) - logWarning( - "Member with wrong protocol tried to join, but was ignored, expected [{}] but was [{}]", - selfAddress.protocol, - joiningNode.address.protocol) - else if (joiningNode.address.system != selfAddress.system) - logWarning( - "Member with wrong ActorSystem name tried to join, but was ignored, expected [{}] but was [{}]", - selfAddress.system, - joiningNode.address.system) - else if (removeUnreachableWithMemberStatus.contains(selfStatus)) - logInfo("Trying to join [{}] to [{}] member, ignoring. Use a member that is Up instead.", joiningNode, selfStatus) - else { - val localMembers = latestGossip.members + if (!preparingForShutdown) { + val selfStatus = latestGossip.member(selfUniqueAddress).status + if (joiningNode.address.protocol != selfAddress.protocol) + logWarning( + "Member with wrong protocol tried to join, but was ignored, expected [{}] but was [{}]", + selfAddress.protocol, + joiningNode.address.protocol) + else if (joiningNode.address.system != selfAddress.system) + logWarning( + "Member with wrong ActorSystem name tried to join, but was ignored, expected [{}] but was [{}]", + selfAddress.system, + joiningNode.address.system) + else if (removeUnreachableWithMemberStatus.contains(selfStatus)) + logInfo( + "Trying to join [{}] to [{}] member, ignoring. 
Use a member that is Up instead.", + joiningNode, + selfStatus) + else { + val localMembers = latestGossip.members + + // check by address without uid to make sure that node with same host:port is not allowed + // to join until previous node with that host:port has been removed from the cluster + localMembers.find(_.address == joiningNode.address) match { + case Some(m) if m.uniqueAddress == joiningNode => + // node retried join attempt, probably due to lost Welcome message + logInfo("Existing member [{}] is joining again.", m) + if (joiningNode != selfUniqueAddress) + sender() ! Welcome(selfUniqueAddress, latestGossip) + case Some(m) => + // node restarted, same host:port as existing member, but with different uid + // safe to down and later remove existing member + // new node will retry join + logInfo( + "New incarnation of existing member [{}] is trying to join. " + + "Existing will be removed from the cluster and then new member will be allowed to join.", + m) + if (m.status != Down) { + // we can confirm it as terminated/unreachable immediately + val newReachability = latestGossip.overview.reachability.terminated(selfUniqueAddress, m.uniqueAddress) + val newOverview = latestGossip.overview.copy(reachability = newReachability) + val newGossip = latestGossip.copy(overview = newOverview) + updateLatestGossip(newGossip) + + downing(m.address) + } + case None => + // remove the node from the failure detector + failureDetector.remove(joiningNode.address) + crossDcFailureDetector.remove(joiningNode.address) + + // add joining node as Joining + // add self in case someone else joins before self has joined (Set discards duplicates) + val newMembers = localMembers + Member(joiningNode, roles, appVersion) + Member( + selfUniqueAddress, + cluster.selfRoles, + cluster.settings.AppVersion) + val newGossip = latestGossip.copy(members = newMembers) - // check by address without uid to make sure that node with same host:port is not allowed - // to join until previous node with that host:port has been removed from the cluster - localMembers.find(_.address == joiningNode.address) match { - case Some(m) if m.uniqueAddress == joiningNode => - // node retried join attempt, probably due to lost Welcome message - logInfo("Existing member [{}] is joining again.", m) - if (joiningNode != selfUniqueAddress) - sender() ! Welcome(selfUniqueAddress, latestGossip) - case Some(m) => - // node restarted, same host:port as existing member, but with different uid - // safe to down and later remove existing member - // new node will retry join - logInfo( - "New incarnation of existing member [{}] is trying to join. 
" + - "Existing will be removed from the cluster and then new member will be allowed to join.", - m) - if (m.status != Down) { - // we can confirm it as terminated/unreachable immediately - val newReachability = latestGossip.overview.reachability.terminated(selfUniqueAddress, m.uniqueAddress) - val newOverview = latestGossip.overview.copy(reachability = newReachability) - val newGossip = latestGossip.copy(overview = newOverview) updateLatestGossip(newGossip) - downing(m.address) - } - case None => - // remove the node from the failure detector - failureDetector.remove(joiningNode.address) - crossDcFailureDetector.remove(joiningNode.address) + if (joiningNode == selfUniqueAddress) { + logInfo( + ClusterLogMarker.memberChanged(joiningNode, MemberStatus.Joining), + "Node [{}] is JOINING itself (with roles [{}], version [{}]) and forming new cluster", + joiningNode.address, + roles.mkString(", "), + appVersion) + if (localMembers.isEmpty) + leaderActions() // important for deterministic oldest when bootstrapping + } else { + logInfo( + ClusterLogMarker.memberChanged(joiningNode, MemberStatus.Joining), + "Node [{}] is JOINING, roles [{}], version [{}]", + joiningNode.address, + roles.mkString(", "), + appVersion) + sender() ! Welcome(selfUniqueAddress, latestGossip) + } - // add joining node as Joining - // add self in case someone else joins before self has joined (Set discards duplicates) - val newMembers = localMembers + Member(joiningNode, roles, appVersion) + Member( - selfUniqueAddress, - cluster.selfRoles, - cluster.settings.AppVersion) - val newGossip = latestGossip.copy(members = newMembers) - - updateLatestGossip(newGossip) - - if (joiningNode == selfUniqueAddress) { - logInfo( - ClusterLogMarker.memberChanged(joiningNode, MemberStatus.Joining), - "Node [{}] is JOINING itself (with roles [{}], version [{}]) and forming new cluster", - joiningNode.address, - roles.mkString(", "), - appVersion) - if (localMembers.isEmpty) - leaderActions() // important for deterministic oldest when bootstrapping - } else { - logInfo( - ClusterLogMarker.memberChanged(joiningNode, MemberStatus.Joining), - "Node [{}] is JOINING, roles [{}], version [{}]", - joiningNode.address, - roles.mkString(", "), - appVersion) - sender() ! Welcome(selfUniqueAddress, latestGossip) - } - - publishMembershipState() + publishMembershipState() + } } + } else { + logInfo("Ignoring join request from [{}] as preparing for shutdown", joiningNode) } } @@ -826,6 +840,27 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh } } + def startPrepareForShutdown(): Unit = { + preparingForShutdown = true + if (!preparingForShutdown) { + val changedMembers = latestGossip.members.collect { + case m if MembershipState.allowedToPrepareToShutdown(m.status) => + m.copy(status = PreparingForShutdown) + } + val newGossip = latestGossip.update(changedMembers) + updateLatestGossip(newGossip) + changedMembers.foreach { member => + logInfo( + ClusterLogMarker.memberChanged(member.uniqueAddress, MemberStatus.PreparingForShutdown), + "Preparing for shutdown [{}] as [{}]", + member.address, + PreparingForShutdown) + } + publishMembershipState() + gossip() + } + } + /** * State transition to LEAVING. 
* The node will eventually be removed by the leader, after hand-off in EXITING, and only after @@ -834,7 +869,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh def leaving(address: Address): Unit = { // only try to update if the node is available (in the member ring) latestGossip.members.find(_.address == address).foreach { existingMember => - if (existingMember.status == Joining || existingMember.status == WeaklyUp || existingMember.status == Up) { + if (existingMember.status == Joining || existingMember.status == WeaklyUp || existingMember.status == Up || existingMember.status == PreparingForShutdown || existingMember.status == ReadyForShutdown) { // mark node as LEAVING val newMembers = latestGossip.members - existingMember + existingMember.copy(status = Leaving) val newGossip = latestGossip.copy(members = newMembers) @@ -1211,7 +1246,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh } else { leaderActionCounter += 1 import cluster.settings.{ AllowWeaklyUpMembers, LeaderActionsInterval, WeaklyUpAfter } - if (AllowWeaklyUpMembers && LeaderActionsInterval * leaderActionCounter >= WeaklyUpAfter) + if (AllowWeaklyUpMembers && LeaderActionsInterval * leaderActionCounter >= WeaklyUpAfter && !preparingForShutdown) moveJoiningToWeaklyUp() if (leaderActionCounter == firstNotice || leaderActionCounter % periodicNotice == 0) @@ -1231,9 +1266,18 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh isCurrentlyLeader = false } cleanupExitingConfirmed() + checkForPrepareForShutdown() shutdownSelfWhenDown() } + def checkForPrepareForShutdown(): Unit = { + if (MembershipState.allowedToPrepareToShutdown(latestGossip.member(selfUniqueAddress).status) && latestGossip.members + .exists(m => MembershipState.prepareForShutdownStates(m.status))) { + logDebug("Detected full cluster shutdown") + self ! ClusterUserAction.PrepareForShutdown + } + } + def shutdownSelfWhenDown(): Unit = { if (latestGossip.member(selfUniqueAddress).status == Down) { // When all reachable have seen the state this member will shutdown itself when it has @@ -1290,7 +1334,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh val removedOtherDc = if (latestGossip.isMultiDc) { latestGossip.members.filter { m => - (m.dataCenter != selfDc && removeUnreachableWithMemberStatus(m.status)) + m.dataCenter != selfDc && removeUnreachableWithMemberStatus(m.status) } } else Set.empty[Member] @@ -1303,9 +1347,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh var upNumber = 0 { - case m if m.dataCenter == selfDc && isJoiningToUp(m) => + case m if m.dataCenter == selfDc && isJoiningToUp(m) && !preparingForShutdown => // Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. 
we have a convergence)
// and minimum number of nodes have joined the cluster
+ // don't move members to up when preparing for shutdown
if (upNumber == 0) {
// It is alright to use same upNumber as already used by a removed member, since the upNumber
// is only used for comparing age of current cluster members (Member.isOlderThan)
@@ -1319,6 +1364,10 @@
case m if m.dataCenter == selfDc && m.status == Leaving =>
// Move LEAVING => EXITING (once we have a convergence on LEAVING)
m.copy(status = Exiting)
+
+ case m if m.dataCenter == selfDc && m.status == PreparingForShutdown =>
+ // Move PreparingForShutdown => ReadyForShutdown (once we have a convergence on PreparingForShutdown)
+ m.copy(status = ReadyForShutdown)
}
}
}
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala
index f8461f8dbb..235b9791e0 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala
@@ -318,6 +318,16 @@ object ClusterEvent {
if (member.status != Leaving)
throw new IllegalArgumentException("Expected Leaving status, got: " + member)
}
+ final case class MemberPreparingForShutdown(member: Member) extends MemberEvent {
+ if (member.status != PreparingForShutdown)
+ throw new IllegalArgumentException("Expected PreparingForShutdown status, got: " + member)
+ }
+
+ final case class MemberReadyForShutdown(member: Member) extends MemberEvent {
+ if (member.status != ReadyForShutdown)
+ throw new IllegalArgumentException("Expected ReadyForShutdown status, got: " + member)
+ }
+
/**
* Member status changed to `MemberStatus.Exiting` and will be removed
* when all members have seen the `Exiting` status.
@@ -554,12 +564,14 @@ object ClusterEvent {
newMember
}
val memberEvents = (newMembers ++ changedMembers).unsorted.collect {
- case m if m.status == Joining => MemberJoined(m)
- case m if m.status == WeaklyUp => MemberWeaklyUp(m)
- case m if m.status == Up => MemberUp(m)
- case m if m.status == Leaving => MemberLeft(m)
- case m if m.status == Exiting => MemberExited(m)
- case m if m.status == Down => MemberDowned(m)
+ case m if m.status == Joining => MemberJoined(m)
+ case m if m.status == WeaklyUp => MemberWeaklyUp(m)
+ case m if m.status == Up => MemberUp(m)
+ case m if m.status == Leaving => MemberLeft(m)
+ case m if m.status == Exiting => MemberExited(m)
+ case m if m.status == Down => MemberDowned(m)
+ case m if m.status == PreparingForShutdown => MemberPreparingForShutdown(m)
+ case m if m.status == ReadyForShutdown => MemberReadyForShutdown(m)
// no events for other transitions
}
diff --git a/akka-cluster/src/main/scala/akka/cluster/Member.scala b/akka-cluster/src/main/scala/akka/cluster/Member.scala
index 95a712f99e..e7d346abb7 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Member.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Member.scala
@@ -195,6 +195,7 @@ object Member {
/**
* Picks the Member with the highest "priority" MemberStatus.
+ * Where the highest priority is the status furthest along in the membership state machine.
*/
def highestPriorityOf(m1: Member, m2: Member): Member = {
if (m1.status == m2.status)
@@ -202,19 +203,23 @@
if (m1.isOlderThan(m2)) m1 else m2
else
(m1.status, m2.status) match {
- case (Removed, _) => m1
- case (_, Removed) => m2
- case (Down, _) => m1
- case (_, Down) => m2
- case (Exiting, _) => m1
- case (_, Exiting) => m2
- case (Leaving, _) => m1
- case (_, Leaving) => m2
- case (Joining, _) => m2
- case (_, Joining) => m1
- case (WeaklyUp, _) => m2
- case (_, WeaklyUp) => m1
- case (Up, Up) => m1
+ case (Removed, _) => m1
+ case (_, Removed) => m2
+ case (ReadyForShutdown, _) => m1
+ case (_, ReadyForShutdown) => m2
+ case (Down, _) => m1
+ case (_, Down) => m2
+ case (Exiting, _) => m1
+ case (_, Exiting) => m2
+ case (Leaving, _) => m1
+ case (_, Leaving) => m2
+ case (Joining, _) => m2
+ case (_, Joining) => m1
+ case (WeaklyUp, _) => m2
+ case (_, WeaklyUp) => m1
+ case (PreparingForShutdown, _) => m1
+ case (_, PreparingForShutdown) => m2
+ case (Up, Up) => m1
}
}
@@ -235,6 +240,8 @@
@SerialVersionUID(1L) case object Exiting extends MemberStatus
@SerialVersionUID(1L) case object Down extends MemberStatus
@SerialVersionUID(1L) case object Removed extends MemberStatus
+ @SerialVersionUID(1L) case object PreparingForShutdown extends MemberStatus
+ @SerialVersionUID(1L) case object ReadyForShutdown extends MemberStatus
/**
* Java API: retrieve the `Joining` status singleton
@@ -271,6 +278,16 @@
*/
def removed: MemberStatus = Removed
+ /**
+ * Java API: retrieve the `PreparingForShutdown` status singleton
+ */
+ def shuttingDown: MemberStatus = PreparingForShutdown
+
+ /**
+ * Java API: retrieve the `ReadyForShutdown` status singleton
+ */
+ def shutDown: MemberStatus = ReadyForShutdown
+
/**
* INTERNAL API
*/
@@ -278,10 +295,12 @@
Map(
Joining -> Set(WeaklyUp, Up, Leaving, Down, Removed),
WeaklyUp -> Set(Up, Leaving, Down, Removed),
- Up -> Set(Leaving, Down, Removed),
+ Up -> Set(Leaving, Down, Removed, PreparingForShutdown),
Leaving -> Set(Exiting, Down, Removed),
Down -> Set(Removed),
Exiting -> Set(Removed, Down),
+ PreparingForShutdown -> Set(ReadyForShutdown, Removed, Leaving, Down),
+ ReadyForShutdown -> Set(Removed, Leaving, Down),
Removed -> Set.empty[MemberStatus])
}
diff --git a/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala b/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala
index 181d271c16..4ca5b4d9bb 100644
--- a/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/MembershipState.scala
@@ -22,10 +22,13 @@ import akka.util.ccompat._
@ccompatUsedUntil213
@InternalApi private[akka] object MembershipState {
import MemberStatus._
- private val leaderMemberStatus = Set[MemberStatus](Up, Leaving)
- private val convergenceMemberStatus = Set[MemberStatus](Up, Leaving)
+ private val leaderMemberStatus = Set[MemberStatus](Up, Leaving, PreparingForShutdown, ReadyForShutdown)
+ private val convergenceMemberStatus = Set[MemberStatus](Up, Leaving, PreparingForShutdown, ReadyForShutdown)
val convergenceSkipUnreachableWithMemberStatus = Set[MemberStatus](Down, Exiting)
val removeUnreachableWithMemberStatus = Set[MemberStatus](Down, Exiting)
+ // If a member hasn't joined yet, or has already started leaving, don't mark it as shutting down
+ val allowedToPrepareToShutdown = Set[MemberStatus](Up)
+ val prepareForShutdownStates =
Set[MemberStatus](PreparingForShutdown, ReadyForShutdown) } /** diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index df1a3ce372..d58241139e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -54,6 +54,7 @@ private[akka] object ClusterMessageSerializer { val WelcomeManifest = "W" val LeaveManifest = "L" val DownManifest = "D" + val PrepareForShutdownManifest = "PS" val InitJoinManifest = "IJ" val InitJoinAckManifest = "IJA" val InitJoinNackManifest = "IJN" @@ -93,6 +94,7 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem) case _: GossipStatus => GossipStatusManifest case _: GossipEnvelope => GossipEnvelopeManifest case _: ClusterRouterPool => ClusterRouterPoolManifest + case ClusterUserAction.PrepareForShutdown => PrepareForShutdownManifest case _ => throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]") } @@ -372,7 +374,9 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem) MemberStatus.Exiting -> cm.MemberStatus.Exiting_VALUE, MemberStatus.Down -> cm.MemberStatus.Down_VALUE, MemberStatus.Removed -> cm.MemberStatus.Removed_VALUE, - MemberStatus.WeaklyUp -> cm.MemberStatus.WeaklyUp_VALUE) + MemberStatus.WeaklyUp -> cm.MemberStatus.WeaklyUp_VALUE, + MemberStatus.PreparingForShutdown -> cm.MemberStatus.PreparingForShutdown_VALUE, + MemberStatus.ReadyForShutdown -> cm.MemberStatus.ReadyForShutdown_VALUE) private val memberStatusFromInt = memberStatusToInt.map { case (a, b) => (b, a) } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterShutdownSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterShutdownSpec.scala new file mode 100644 index 0000000000..37309a3e4b --- /dev/null +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterShutdownSpec.scala @@ -0,0 +1,105 @@ +/* + * Copyright (C) 2020 Lightbend Inc. 
+ */ + +package akka.cluster + +import akka.cluster.MemberStatus.Removed +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.util.ccompat._ +import com.typesafe.config.ConfigFactory +import org.scalatest.concurrent.Eventually + +import scala.concurrent.duration._ + +object ClusterShutdownSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + val third = role("third") + val forth = role("forth") + + commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString(""" + # important config + """).withFallback(MultiNodeClusterSpec.clusterConfig))) +} + +class ClusterShutdownSpecMultiJvmNode1 extends ClusterShutdownSpec +class ClusterShutdownSpecMultiJvmNode2 extends ClusterShutdownSpec +class ClusterShutdownSpecMultiJvmNode3 extends ClusterShutdownSpec +class ClusterShutdownSpecMultiJvmNode4 extends ClusterShutdownSpec + +@ccompatUsedUntil213 +abstract class ClusterShutdownSpec + extends MultiNodeSpec(ClusterShutdownSpec) + with MultiNodeClusterSpec + with Eventually { + + import ClusterShutdownSpec._ + + "Cluster shutdown" should { + "form cluster" in { + awaitClusterUp(first, second, third) + } + enterBarrier("cluster-up") + "prepare for shutdown" in { + runOn(first) { + Cluster(system).prepareForFullClusterShutdown() + } + + runOn(first, second, third) { + awaitAssert({ + withClue("members: " + Cluster(system).readView.members) { + Cluster(system).selfMember.status shouldEqual MemberStatus.ReadyForShutdown + } + }, 10.seconds) + } + } + "spread around the cluster" in { + runOn(first, second, third) { + awaitAssert { + Cluster(system).readView.members.unsorted.map(_.status) shouldEqual Set(MemberStatus.ReadyForShutdown) + } + } + enterBarrier("propagation finished") + } + "not allow new members to join" in { + runOn(forth) { + cluster.join(address(first)) + Thread.sleep(3000) + // should not be allowed to join the cluster even after some time + awaitAssert { + cluster.selfMember.status shouldBe MemberStatus.Removed + } + + } + enterBarrier("not-allowed-to-join") + } + "be allowed to leave" in { + runOn(first) { + Cluster(system).leave(address(first)) + } + awaitAssert({ + withClue("members: " + Cluster(system).readView.members) { + runOn(second, third) { + Cluster(system).readView.members.size shouldEqual 2 + } + runOn(first) { + Cluster(system).selfMember.status shouldEqual Removed + } + } + }, 10.seconds) + enterBarrier("first-gone") + runOn(second) { + Cluster(system).leave(address(second)) + Cluster(system).leave(address(third)) + } + awaitAssert({ + withClue("self member: " + Cluster(system).selfMember) { + Cluster(system).selfMember.status shouldEqual Removed + } + }, 10.seconds) + enterBarrier("all-gone") + } + } +} diff --git a/akka-docs/src/main/paradox/typed/cluster-membership.md b/akka-docs/src/main/paradox/typed/cluster-membership.md index dbd53512cc..79b1c514ef 100644 --- a/akka-docs/src/main/paradox/typed/cluster-membership.md +++ b/akka-docs/src/main/paradox/typed/cluster-membership.md @@ -38,6 +38,8 @@ merged and converge to the same end result. 
* **weakly up** - transient state while network split (only if `akka.cluster.allow-weakly-up-members=on`)
* **up** - normal operating state
+
+ * **preparing for shutdown** / **ready for shutdown** - optional states that members can be moved to before doing a full cluster shutdown
* **leaving** / **exiting** - states during graceful removal
@@ -58,6 +60,8 @@ Note that the node might already have been shutdown when this event is published of at least one other node.
* `ClusterEvent.ReachableMember` - A member is considered as reachable again, after having been unreachable. All nodes that previously detected it as unreachable has detected it as reachable again.
+ * `ClusterEvent.MemberPreparingForShutdown` - A member is preparing for a full cluster shutdown.
+ * `ClusterEvent.MemberReadyForShutdown` - A member is ready for a full cluster shutdown.
## Membership Lifecycle
@@ -126,6 +130,27 @@ that are in this state, but you should be aware of that members on the other
side of a network partition have no knowledge about the existence of the
new members. You should for example not count `WeaklyUp` members in quorum decisions.
+## Full cluster shutdown
+
+In some rare cases it may be desirable to do a full cluster shutdown rather than a rolling deploy,
+for example for a protocol change that is simpler to make by restarting the cluster than by making the change
+backward compatible.
+
+As of Akka `2.6.13` it can be signalled that a full cluster shutdown is about to happen, so that expensive actions such as:
+
+* Cluster sharding rebalances
+* Movement of Cluster singletons
+
+won't happen. That way the shutdown will be as quick as possible and a new version can be started up without delay.
+
+If a cluster isn't going to be restarted right away there is no need to prepare it for shutdown.
+
+To use this feature, call `Cluster(system).prepareForFullClusterShutdown()` with the classic API or send @apidoc[PrepareForFullClusterShutdown] to the cluster manager with the typed API.
+
+Wait for all `Up` members to become `ReadyForShutdown`; then all nodes can be shut down and restarted.
+Members that aren't `Up` yet will remain in the `Joining` or `WeaklyUp` state. Any node that is already leaving
+the cluster, i.e. in the `Leaving` or `Exiting` state, will continue to leave the cluster via the normal path.
+
## State Diagrams
### State Diagram for the Member States
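For illustration only (not part of the change above), here is a minimal sketch of how a classic-API application could react to the new member events and stop itself once the whole cluster is `ReadyForShutdown`. The `FullShutdownWatcher` actor, the system name `example` and the choice to simply terminate the `ActorSystem` are assumptions made for this sketch; `prepareForFullClusterShutdown()`, `MemberReadyForShutdown` and `MemberStatus.ReadyForShutdown` are the additions described in the docs above.

```scala
import akka.actor.{ Actor, ActorSystem, Props }
import akka.cluster.{ Cluster, MemberStatus }
import akka.cluster.ClusterEvent.{ InitialStateAsEvents, MemberEvent, MemberReadyForShutdown }

// Hypothetical watcher: terminates the local ActorSystem once every member
// of the cluster has reached ReadyForShutdown.
class FullShutdownWatcher extends Actor {
  private val cluster = Cluster(context.system)

  override def preStart(): Unit =
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent])

  override def postStop(): Unit =
    cluster.unsubscribe(self)

  def receive: Receive = {
    case _: MemberReadyForShutdown =>
      // cluster.state is the latest locally known snapshot of the membership
      val members = cluster.state.members
      if (members.nonEmpty && members.forall(_.status == MemberStatus.ReadyForShutdown))
        context.system.terminate() // run this watcher on every node so each node stops itself
    case _: MemberEvent => // other membership transitions are not interesting here
  }
}

object FullClusterShutdownExample {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("example")
    system.actorOf(Props[FullShutdownWatcher](), "full-shutdown-watcher")
    // Signal the whole cluster that a full shutdown is coming; calling it on one node is enough.
    Cluster(system).prepareForFullClusterShutdown()
  }
}
```

With the typed API the equivalent trigger is sending the new message to the cluster manager, `Cluster(system).manager ! PrepareForFullClusterShutdown`, which is how the multi-jvm specs added in this patch drive it.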