Full cluster shutdown (#29838)

* member information for full cluster shutdown

* Cluster singleton: don't hand over when ready for shutdown

* No-op everything in shard coordinator

* Set all members to preparing for shutdown

* Don't allow a node to join after prepare for shutdown

* Review feedback: singleton listens to all member changes

* Java API

* More better

* Keep sharding working while ready for shutdown

* Mima

* Revert DEBUG logging

* gs

* Fix api doc link

* Missed review feedback

* Review feedback
Christopher Batey 2021-02-12 09:59:20 +00:00 committed by GitHub
parent 278a36d036
commit c5f16dcee1
22 changed files with 911 additions and 183 deletions
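
Taken together, these changes add a PrepareForFullClusterShutdown command: all members move to PreparingForShutdown and then ReadyForShutdown, new joins are rejected, shard rebalances and singleton hand-overs are suppressed, while sharding keeps serving requests until the nodes are stopped externally. The following is a rough sketch of the intended flow with the typed API, modelled on the multi-node specs added below; the system name, the empty guardian behavior and the polling loop are illustrative only and not part of this commit.

import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.MemberStatus
import akka.cluster.typed.{ Cluster, Leave, PrepareForFullClusterShutdown }

object FullClusterShutdownSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative system; in practice this is the application's already running,
    // cluster-enabled ActorSystem (akka.actor.provider = cluster).
    val system = ActorSystem(Behaviors.empty[Nothing], "example")
    val cluster = Cluster(system)

    // 1. Any node asks the cluster to prepare for a full shutdown.
    cluster.manager ! PrepareForFullClusterShutdown

    // 2. Wait until the members report ReadyForShutdown
    //    (a test would use awaitAssert instead of polling).
    while (cluster.selfMember.status != MemberStatus.ReadyForShutdown)
      Thread.sleep(100)

    // 3. Leave and stop the nodes; no rebalances or singleton hand-overs are triggered.
    cluster.manager ! Leave(cluster.selfMember.address)
    system.terminate()
  }
}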


@@ -8,7 +8,7 @@
      <pattern>%date{ISO8601} %-5level %logger %marker - %msg MDC: {%mdc}%n</pattern>
    </encoder>
  </appender>
  <root level="INFO">
    <appender-ref ref="STDOUT"/>
  </root>


@@ -0,0 +1,139 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding.typed
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.util.ccompat._
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.MemberStatus
import akka.cluster.MemberStatus.Removed
import akka.cluster.sharding.typed.ClusterShardingPreparingForShutdownSpec.Pinger.Command
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.cluster.typed.Leave
import akka.cluster.typed.MultiNodeTypedClusterSpec
import akka.cluster.typed.PrepareForFullClusterShutdown
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.serialization.jackson.CborSerializable
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
object ClusterShardingPreparingForShutdownSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = DEBUG
akka.actor.provider = "cluster"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning
akka.cluster.testkit.auto-down-unreachable-after = off
akka.cluster.leader-actions-interval = 100ms
"""))
object Pinger {
sealed trait Command extends CborSerializable
case class Ping(id: Int, ref: ActorRef[Pong]) extends Command
case class Pong(id: Int) extends CborSerializable
def apply(): Behavior[Command] = Behaviors.setup { _ =>
Behaviors.receiveMessage[Command] {
case Ping(id: Int, ref) =>
ref ! Pong(id)
Behaviors.same
}
}
}
val typeKey = EntityTypeKey[Command]("ping")
}
class ClusterShardingPreparingForShutdownMultiJvmNode1 extends ClusterShardingPreparingForShutdownSpec
class ClusterShardingPreparingForShutdownMultiJvmNode2 extends ClusterShardingPreparingForShutdownSpec
class ClusterShardingPreparingForShutdownMultiJvmNode3 extends ClusterShardingPreparingForShutdownSpec
@ccompatUsedUntil213
class ClusterShardingPreparingForShutdownSpec
extends MultiNodeSpec(ClusterShardingPreparingForShutdownSpec)
with MultiNodeTypedClusterSpec {
import ClusterShardingPreparingForShutdownSpec._
import ClusterShardingPreparingForShutdownSpec.Pinger._
override def initialParticipants = roles.size
private val sharding = ClusterSharding(typedSystem)
"Preparing for shut down ClusterSharding" must {
"form cluster" in {
formCluster(first, second, third)
}
"not rebalance but should still work preparing for shutdown" in {
val shardRegion: ActorRef[ShardingEnvelope[Command]] =
sharding.init(Entity(typeKey)(_ => Pinger()))
val probe = TestProbe[Pong]()
shardRegion ! ShardingEnvelope("id1", Pinger.Ping(1, probe.ref))
probe.expectMessage(Pong(1))
runOn(second) {
cluster.manager ! PrepareForFullClusterShutdown
}
awaitAssert({
withClue("members: " + cluster.state.members) {
cluster.selfMember.status shouldEqual MemberStatus.ReadyForShutdown
cluster.state.members.unsorted.map(_.status) shouldEqual Set(MemberStatus.ReadyForShutdown)
}
}, 10.seconds)
enterBarrier("preparation-complete")
shardRegion ! ShardingEnvelope("id2", Pinger.Ping(2, probe.ref))
probe.expectMessage(Pong(2))
runOn(second) {
cluster.manager ! Leave(address(second))
}
awaitAssert({
runOn(first, third) {
withClue("members: " + cluster.state.members) {
cluster.state.members.size shouldEqual 2
}
}
runOn(second) {
withClue("self member: " + cluster.selfMember) {
cluster.selfMember.status shouldEqual MemberStatus.Removed
}
}
}, 5.seconds) // keep this lower than coordinated shutdown timeout
runOn(first, third) {
shardRegion ! ShardingEnvelope("id3", Pinger.Ping(3, probe.ref))
probe.expectMessage(Pong(3))
}
enterBarrier("new-shards-verified")
runOn(third) {
cluster.manager ! Leave(address(first))
cluster.manager ! Leave(address(third))
}
awaitAssert({
withClue("self member: " + cluster.selfMember) {
cluster.selfMember.status shouldEqual Removed
}
}, 15.seconds)
enterBarrier("done")
}
}
}


@@ -63,7 +63,6 @@ abstract class ClusterShardingStatsSpec
  import Pinger._
  private val typeKey = EntityTypeKey[Command]("ping")
  private val sharding = ClusterSharding(typedSystem)
  private val settings = ClusterShardingSettings(typedSystem)
  private val queryTimeout = settings.shardRegionQueryTimeout * roles.size.toLong //numeric widening y'all


@@ -0,0 +1,2 @@
# private[akka] class
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardCoordinator.rebalanceTask")


@@ -20,6 +20,10 @@ import akka.actor.Terminated
import akka.actor.Timers
import akka.annotation.InternalStableApi
import akka.cluster.Cluster
+import akka.cluster.ClusterEvent.InitialStateAsEvents
+import akka.cluster.ClusterEvent.MemberEvent
+import akka.cluster.ClusterEvent.MemberPreparingForShutdown
+import akka.cluster.ClusterEvent.MemberReadyForShutdown
import akka.cluster.sharding.internal.RememberEntitiesShardStore
import akka.cluster.sharding.internal.RememberEntitiesShardStore.GetEntities
import akka.cluster.sharding.internal.RememberEntitiesProvider
@@ -453,6 +457,7 @@ private[akka] class Shard(
  private val messageBuffers = new MessageBufferMap[EntityId]
  private var handOffStopper: Option[ActorRef] = None
+  private var preparingForShutdown = false

  import context.dispatcher
  private val passivateIdleTask = if (settings.shouldPassivateIdleEntities) {
@@ -479,6 +484,11 @@
  }

  override def preStart(): Unit = {
+    Cluster(context.system).subscribe(
+      self,
+      InitialStateAsEvents,
+      classOf[MemberPreparingForShutdown],
+      classOf[MemberReadyForShutdown])
    acquireLeaseIfNeeded()
  }
@@ -509,6 +519,8 @@
      tryGetLease(lease.get)
    case ll: LeaseLost =>
      receiveLeaseLost(ll)
+    case me: MemberEvent =>
+      receiveMemberEvent(me)
    case msg =>
      if (verboseDebug)
        log.debug(
@@ -519,6 +531,15 @@
      stash()
  }

+  private def receiveMemberEvent(event: MemberEvent): Unit = event match {
+    case _: MemberReadyForShutdown | _: MemberPreparingForShutdown =>
+      if (!preparingForShutdown) {
+        log.info("{}: Preparing for shutdown", typeName)
+        preparingForShutdown = true
+      }
+    case _ =>
+  }
+
  private def tryGetLease(l: Lease): Unit = {
    log.info("{}: Acquiring lease {}", typeName, l.settings)
    pipe(l.acquire(reason => self ! LeaseLost(reason)).map(r => LeaseAcquireResult(r, None)).recover {
@@ -548,6 +569,8 @@
      onEntitiesRemembered(entityIds)
    case RememberEntityTimeout(GetEntities) =>
      loadingEntityIdsFailed()
+    case me: MemberEvent =>
+      receiveMemberEvent(me)
    case msg =>
      if (verboseDebug)
        log.debug(
@@ -590,6 +613,7 @@
  // when not remembering entities, we stay in this state all the time
  def idle: Receive = {
    case Terminated(ref) => receiveTerminated(ref)
+    case me: MemberEvent => receiveMemberEvent(me)
    case EntityTerminated(ref) => entityTerminated(ref)
    case msg: CoordinatorMessage => receiveCoordinatorMessage(msg)
    case msg: RememberEntityCommand => receiveRememberEntityCommand(msg)
@@ -659,6 +683,7 @@
      throw new RuntimeException(
        s"Async write timed out after ${settings.tuningParameters.updatingStateTimeout.pretty}")
    case ShardRegion.StartEntity(entityId) => startEntity(entityId, Some(sender()))
+    case me: MemberEvent => receiveMemberEvent(me)
    case Terminated(ref) => receiveTerminated(ref)
    case EntityTerminated(ref) => entityTerminated(ref)
    case _: CoordinatorMessage => stash()
@@ -814,7 +839,8 @@
  }

  private def receiveCoordinatorMessage(msg: CoordinatorMessage): Unit = msg match {
-    case HandOff(`shardId`) => handOff(sender())
+    case HandOff(`shardId`) =>
+      handOff(sender())
    case HandOff(shard) =>
      log.warning("{}: Shard [{}] can not hand off for another Shard [{}]", typeName, shardId, shard)
    case _ => unhandled(msg)
@@ -839,7 +865,12 @@
    // does conversion so only do once
    val activeEntities = entities.activeEntities()
-    if (activeEntities.nonEmpty) {
+    if (preparingForShutdown) {
+      log.info("{}: HandOff shard [{}] while preparing for shutdown. Stopping right away.", typeName, shardId)
+      activeEntities.foreach { _ ! handOffStopMessage }
+      replyTo ! ShardStopped(shardId)
+      context.stop(self)
+    } else if (activeEntities.nonEmpty && !preparingForShutdown) {
      val entityHandOffTimeout = (settings.tuningParameters.handOffTimeout - 5.seconds).max(1.seconds)
      log.debug(
        "{}: Starting HandOffStopper for shard [{}] to terminate [{}] entities.",


@@ -639,7 +639,8 @@ object ShardCoordinator {
abstract class ShardCoordinator(
    settings: ClusterShardingSettings,
    allocationStrategy: ShardCoordinator.ShardAllocationStrategy)
-    extends Actor {
+    extends Actor
+    with Timers {
  import ShardCoordinator._
  import ShardCoordinator.Internal._
@@ -661,6 +662,7 @@ abstract class ShardCoordinator(
  var allRegionsRegistered = false

  var state = State.empty.withRememberEntities(settings.rememberEntities)
+  var preparingForShutdown = false
  // rebalanceInProgress for the ShardId keys, pending GetShardHome requests by the ActorRef values
  var rebalanceInProgress = Map.empty[ShardId, Set[ActorRef]]
  var rebalanceWorkers: Set[ActorRef] = Set.empty
@@ -672,14 +674,17 @@
  import context.dispatcher

-  val rebalanceTask =
-    context.system.scheduler.scheduleWithFixedDelay(rebalanceInterval, rebalanceInterval, self, RebalanceTick)
-  cluster.subscribe(self, initialStateMode = InitialStateAsEvents, ClusterShuttingDown.getClass)
+  cluster.subscribe(
+    self,
+    initialStateMode = InitialStateAsEvents,
+    ClusterShuttingDown.getClass,
+    classOf[MemberReadyForShutdown],
+    classOf[MemberPreparingForShutdown])

  protected def typeName: String

  override def preStart(): Unit = {
+    timers.startTimerWithFixedDelay(RebalanceTick, RebalanceTick, rebalanceInterval)
    allocationStrategy match {
      case strategy: StartableAllocationStrategy =>
        strategy.start()
@@ -691,7 +696,6 @@
  override def postStop(): Unit = {
    super.postStop()
-    rebalanceTask.cancel()
    cluster.unsubscribe(self)
  }
@@ -786,7 +790,7 @@
      }

    case RebalanceTick =>
-      if (state.regions.nonEmpty) {
+      if (state.regions.nonEmpty && !preparingForShutdown) {
        val shardsFuture = allocationStrategy.rebalance(state.regions, rebalanceInProgress.keySet)
        shardsFuture.value match {
          case Some(Success(shards)) =>
@@ -896,6 +900,15 @@
      // it will soon be stopped when singleton is stopped
      context.become(shuttingDown)

+    case _: MemberPreparingForShutdown | _: MemberReadyForShutdown =>
+      if (!preparingForShutdown) {
+        log.info(
+          "{}: Shard coordinator detected prepare for full cluster shutdown. No new rebalances will take place.",
+          typeName)
+        timers.cancel(RebalanceTick)
+        preparingForShutdown = true
+      }
+
    case ShardRegion.GetCurrentRegions =>
      val reply = ShardRegion.CurrentRegions(state.regions.keySet.map { ref =>
        if (ref.path.address.host.isEmpty) cluster.selfAddress


@@ -626,6 +626,7 @@ private[akka] class ShardRegion(
  var startingShards = Set.empty[ShardId]
  var handingOff = Set.empty[ActorRef]
  var gracefulShutdownInProgress = false
+  var preparingForShutdown = false

  import context.dispatcher
  var retryCount = 0
@@ -759,6 +760,12 @@
        context.stop(self)
      }

+    case _: MemberReadyForShutdown | _: MemberPreparingForShutdown =>
+      if (!preparingForShutdown) {
+        log.info("{}. preparing for shutdown", typeName)
+      }
+      preparingForShutdown = true
+
    case _: MemberEvent => // these are expected, no need to warn about them

    case _ => unhandled(evt)
@@ -819,14 +826,18 @@
    case BeginHandOff(shard) =>
      log.debug("{}: BeginHandOff shard [{}]", typeName, shard)
-      if (regionByShard.contains(shard)) {
-        val regionRef = regionByShard(shard)
-        val updatedShards = regions(regionRef) - shard
-        if (updatedShards.isEmpty) regions -= regionRef
-        else regions = regions.updated(regionRef, updatedShards)
-        regionByShard -= shard
-      }
-      sender() ! BeginHandOffAck(shard)
+      if (!preparingForShutdown) {
+        if (regionByShard.contains(shard)) {
+          val regionRef = regionByShard(shard)
+          val updatedShards = regions(regionRef) - shard
+          if (updatedShards.isEmpty) regions -= regionRef
+          else regions = regions.updated(regionRef, updatedShards)
+          regionByShard -= shard
+        }
+        sender() ! BeginHandOffAck(shard)
+      } else {
+        log.debug("{}: Ignoring begin handoff as preparing to shutdown", typeName)
+      }

    case msg @ HandOff(shard) =>
      log.debug("{}: HandOff shard [{}]", typeName, shard)
@@ -885,20 +896,28 @@
      }

    case GracefulShutdown =>
-      log.debug("{}: Starting graceful shutdown of region and all its shards", typeName)
+      if (preparingForShutdown) {
+        log.debug(
+          "{}: Skipping graceful shutdown of region and all its shards as cluster is preparing for shutdown",
+          typeName)
+        gracefulShutdownProgress.trySuccess(Done)
+        context.stop(self)
+      } else {
+        log.debug("{}: Starting graceful shutdown of region and all its shards", typeName)
        val coordShutdown = CoordinatedShutdown(context.system)
        if (coordShutdown.getShutdownReason().isPresent) {
          // use a shorter timeout than the coordinated shutdown phase to be able to log better reason for the timeout
          val timeout = coordShutdown.timeout(CoordinatedShutdown.PhaseClusterShardingShutdownRegion) - 1.second
          if (timeout > Duration.Zero) {
            timers.startSingleTimer(GracefulShutdownTimeout, GracefulShutdownTimeout, timeout)
          }
        }
        gracefulShutdownInProgress = true
        sendGracefulShutdownToCoordinatorIfInProgress()
        tryCompleteGracefulShutdownIfInProgress()
+      }

    case GracefulShutdownTimeout =>
      log.warning(
@@ -1258,9 +1277,7 @@
      .orElse(entityProps match {
        case Some(props) if !shardsByRef.values.exists(_ == id) =>
          log.debug(ShardingLogMarker.shardStarted(typeName, id), "{}: Starting shard [{}] in region", typeName, id)
          val name = URLEncoder.encode(id, "utf-8")
          val shard = context.watch(
            context.actorOf(
              Shard


@@ -9,9 +9,7 @@ import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration._
import scala.util.control.NonFatal
import com.typesafe.config.Config
import akka.AkkaException
import akka.Done
import akka.actor.Actor
@@ -526,6 +524,7 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
  var oldestChangedBuffer: ActorRef = _
  // Previous GetNext request delivered event and new GetNext is to be sent
  var oldestChangedReceived = true
+  var preparingForFullShutdown = false

  var selfExited = false
@@ -583,7 +582,13 @@
    require(!cluster.isTerminated, "Cluster node must not be terminated")

    // subscribe to cluster changes, re-subscribe when restart
-    cluster.subscribe(self, ClusterEvent.InitialStateAsEvents, classOf[MemberRemoved], classOf[MemberDowned])
+    cluster.subscribe(
+      self,
+      ClusterEvent.InitialStateAsEvents,
+      classOf[MemberRemoved],
+      classOf[MemberDowned],
+      classOf[MemberPreparingForShutdown],
+      classOf[MemberReadyForShutdown])

    startTimerWithFixedDelay(CleanupTimer, Cleanup, 1.minute)
@@ -630,6 +635,28 @@
    case Event(HandOverToMe, _) =>
      // nothing to hand over in start
      stay()
+
+    case Event(event: MemberEvent, _) =>
+      handleMemberEvent(event)
+  }
+
+  def handleMemberEvent(event: MemberEvent): State = {
+    event match {
+      case _: MemberRemoved if event.member.uniqueAddress == cluster.selfUniqueAddress =>
+        logInfo("Self removed, stopping ClusterSingletonManager")
+        stop()
+      case _: MemberDowned if event.member.uniqueAddress == cluster.selfUniqueAddress =>
+        logInfo("Self downed, stopping ClusterSingletonManager")
+        stop()
+      case _: MemberReadyForShutdown | _: MemberPreparingForShutdown =>
+        if (!preparingForFullShutdown) {
+          logInfo("Preparing for shut down, disabling expensive actions")
+          preparingForFullShutdown = true
+        }
+        stay()
+      case _ =>
+        stay()
+    }
  }

  when(Younger) {
@@ -640,7 +667,9 @@
        if (previousOldest.forall(removed.contains))
          tryGotoOldest()
        else {
-          peer(previousOldest.head.address) ! HandOverToMe
+          if (!preparingForFullShutdown) {
+            peer(previousOldest.head.address) ! HandOverToMe
+          }
          goto(BecomingOldest).using(BecomingOldestData(previousOldest))
        }
      } else {
@@ -656,18 +685,13 @@
        stay().using(YoungerData(newPreviousOldest))
      }

-    case Event(MemberDowned(m), _) if m.uniqueAddress == cluster.selfUniqueAddress =>
-      logInfo("Self downed, stopping ClusterSingletonManager")
-      stop()
-
-    case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress =>
-      logInfo("Self removed, stopping ClusterSingletonManager")
-      stop()
-
-    case Event(MemberRemoved(m, _), _) =>
+    case Event(event @ MemberRemoved(m, _), _) if event.member.uniqueAddress != cluster.selfUniqueAddress =>
      scheduleDelayedMemberRemoved(m)
      stay()

+    case Event(event: MemberEvent, _) =>
+      handleMemberEvent(event)
+
    case Event(DelayedMemberRemoved(m), YoungerData(previousOldest)) =>
      if (!selfExited)
        logInfo("Member removed [{}]", m.address)
@@ -682,7 +706,9 @@
      else {
        // this node was probably quickly restarted with same hostname:port,
        // confirm that the old singleton instance has been stopped
-        sender() ! HandOverDone
+        if (!preparingForFullShutdown) {
+          sender() ! HandOverDone
+        }
      }
      stay()
@@ -713,18 +739,13 @@
        stay()
      }

-    case Event(MemberDowned(m), _) if m.uniqueAddress == cluster.selfUniqueAddress =>
-      logInfo("Self downed, stopping ClusterSingletonManager")
-      stop()
-
-    case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress =>
-      logInfo("Self removed, stopping ClusterSingletonManager")
-      stop()
-
-    case Event(MemberRemoved(m, _), _) =>
+    case Event(event @ MemberRemoved(m, _), _) if event.member.uniqueAddress != cluster.selfUniqueAddress =>
      scheduleDelayedMemberRemoved(m)
      stay()

+    case Event(event: MemberEvent, _) if event.member.uniqueAddress == cluster.selfUniqueAddress =>
+      handleMemberEvent(event)
+
    case Event(DelayedMemberRemoved(m), BecomingOldestData(previousOldest)) =>
      if (!selfExited)
        logInfo("Member removed [{}], previous oldest [{}]", m.address, previousOldest.map(_.address).mkString(", "))
@@ -750,25 +771,31 @@
        case Some(senderUniqueAddress) =>
          previousOldest.headOption match {
            case Some(oldest) =>
-              if (oldest == senderUniqueAddress)
-                sender() ! HandOverToMe
-              else
+              if (oldest == senderUniqueAddress) {
+                if (!preparingForFullShutdown) {
+                  sender() ! HandOverToMe
+                }
+              } else
                logInfo(
                  "Ignoring TakeOver request in BecomingOldest from [{}]. Expected previous oldest [{}]",
                  sender().path.address,
                  oldest.address)
              stay()
            case None =>
-              sender() ! HandOverToMe
+              if (!preparingForFullShutdown) {
+                sender() ! HandOverToMe
+              }
              stay().using(BecomingOldestData(senderUniqueAddress :: previousOldest))
          }
      }

    case Event(HandOverRetry(count), BecomingOldestData(previousOldest)) =>
      if (count <= maxHandOverRetries) {
-        logInfo("Retry [{}], sending HandOverToMe to [{}]", count, previousOldest.headOption.map(_.address))
-        previousOldest.headOption.foreach(node => peer(node.address) ! HandOverToMe)
-        startSingleTimer(HandOverRetryTimer, HandOverRetry(count + 1), handOverRetryInterval)
+        if (!preparingForFullShutdown) {
+          logInfo("Retry [{}], sending HandOverToMe to [{}]", count, previousOldest.headOption.map(_.address))
+          previousOldest.headOption.foreach(node => peer(node.address) ! HandOverToMe)
+          startSingleTimer(HandOverRetryTimer, HandOverRetry(count + 1), handOverRetryInterval)
+        }
        stay()
      } else if (previousOldest.forall(removed.contains)) {
        // can't send HandOverToMe, previousOldest unknown for new node (or restart)
@@ -792,9 +819,12 @@
  def tryAcquireLease() = {
    import context.dispatcher
-    pipe(lease.get.acquire(reason => self ! LeaseLost(reason)).map[Any](AcquireLeaseResult).recover {
-      case NonFatal(t) => AcquireLeaseFailure(t)
-    }).to(self)
+    if (!preparingForFullShutdown) {
+      pipe(lease.get.acquire(reason => self ! LeaseLost(reason)).map[Any](AcquireLeaseResult).recover {
+        case NonFatal(t) => AcquireLeaseFailure(t)
+      }).to(self)
+    }
    goto(AcquiringLease).using(AcquiringLeaseData(leaseRequestInProgress = true, None))
  }
@@ -847,19 +877,29 @@
      // complete memberExitingProgress when handOverDone
      sender() ! Done // reply to ask
      stay()
-    case Event(MemberDowned(m), _) if m.uniqueAddress == cluster.selfUniqueAddress =>
-      logInfo("Self downed, stopping ClusterSingletonManager")
-      stop()
+
+    case Event(event: MemberEvent, _) =>
+      handleMemberEvent(event)
  }

  @InternalStableApi
  def gotoOldest(): State = {
-    logInfo(
-      ClusterLogMarker.singletonStarted,
-      "Singleton manager starting singleton actor [{}]",
-      self.path / singletonName)
-    val singleton = context.watch(context.actorOf(singletonProps, singletonName))
-    goto(Oldest).using(OldestData(Some(singleton)))
+    if (preparingForFullShutdown) {
+      logInfo(
+        ClusterLogMarker.singletonStarted,
+        "Singleton manager NOT starting singleton actor [{}] as cluster is preparing to shutdown",
+        self.path / singletonName)
+      goto(Oldest).using(OldestData(None))
+    } else {
+      logInfo(
+        ClusterLogMarker.singletonStarted,
+        "Singleton manager starting singleton actor [{}]",
+        self.path / singletonName)
+      val singleton = context.watch(context.actorOf(singletonProps, singletonName))
+      goto(Oldest).using(OldestData(Some(singleton)))
+    }
  }

  def handleOldestChanged(singleton: Option[ActorRef], oldestOption: Option[UniqueAddress]) = {
@@ -876,12 +916,16 @@
          gotoHandingOver(singleton, None)
        case Some(a) =>
          // send TakeOver request in case the new oldest doesn't know previous oldest
-          peer(a.address) ! TakeOverFromMe
-          startSingleTimer(TakeOverRetryTimer, TakeOverRetry(1), handOverRetryInterval)
+          if (!preparingForFullShutdown) {
+            peer(a.address) ! TakeOverFromMe
+            startSingleTimer(TakeOverRetryTimer, TakeOverRetry(1), handOverRetryInterval)
+          }
          goto(WasOldest).using(WasOldestData(singleton, newOldestOption = Some(a)))
        case None =>
          // new oldest will initiate the hand-over
-          startSingleTimer(TakeOverRetryTimer, TakeOverRetry(1), handOverRetryInterval)
+          if (!preparingForFullShutdown) {
+            startSingleTimer(TakeOverRetryTimer, TakeOverRetry(1), handOverRetryInterval)
+          }
          goto(WasOldest).using(WasOldestData(singleton, newOldestOption = None))
      }
  }
@@ -916,6 +960,10 @@
      stop()
    }

+    // Downed in this case is handled differently so keep this below it
+    case Event(event: MemberEvent, _) =>
+      handleMemberEvent(event)
+
    case Event(LeaseLost(reason), OldestData(singleton)) =>
      log.warning("Lease has been lost. Reason: {}. Terminating singleton and trying to re-acquire lease", reason)
      singleton match {
@@ -939,14 +987,18 @@
        logInfo("Retry [{}], sending TakeOverFromMe to [{}]", count, newOldestOption.map(_.address))
      else
        log.debug("Retry [{}], sending TakeOverFromMe to [{}]", count, newOldestOption.map(_.address))
-      newOldestOption.foreach(node => peer(node.address) ! TakeOverFromMe)
-      startSingleTimer(TakeOverRetryTimer, TakeOverRetry(count + 1), handOverRetryInterval)
+
+      if (!preparingForFullShutdown) {
+        newOldestOption.foreach(node => peer(node.address) ! TakeOverFromMe)
+        startSingleTimer(TakeOverRetryTimer, TakeOverRetry(count + 1), handOverRetryInterval)
+      }
      stay()
    } else
      throw new ClusterSingletonManagerIsStuck(s"Expected hand-over to [$newOldestOption] never occurred")

    case Event(HandOverToMe, WasOldestData(singleton, _)) =>
      gotoHandingOver(singleton, Some(sender()))

    case Event(MemberRemoved(m, _), _) if m.uniqueAddress == cluster.selfUniqueAddress && !selfExited =>
      logInfo("Self removed, stopping ClusterSingletonManager")
      stop()
@@ -975,6 +1027,10 @@
      logInfo("Self downed, stopping")
      gotoStopping(s)
    }
+
+    case Event(event: MemberEvent, _) =>
+      handleMemberEvent(event)
  }

  def gotoHandingOver(singleton: Option[ActorRef], handOverTo: Option[ActorRef]): State = {
@@ -995,7 +1051,9 @@
    case Event(HandOverToMe, HandingOverData(_, handOverTo)) if handOverTo.contains(sender()) =>
      // retry
-      sender() ! HandOverInProgress
+      if (!preparingForFullShutdown) {
+        sender() ! HandOverInProgress
+      }
      stay()

    case Event(SelfExiting, _) =>
@@ -1003,6 +1061,15 @@
      // complete memberExitingProgress when handOverDone
      sender() ! Done // reply to ask
      stay()
+
+    case Event(MemberReadyForShutdown(m), _) if m.uniqueAddress == cluster.selfUniqueAddress =>
+      logInfo("Ready for shutdown when handing over. Giving up on handover.")
+      stop()
+
+    case Event(MemberPreparingForShutdown(m), _) if m.uniqueAddress == cluster.selfUniqueAddress =>
+      logInfo("Preparing for shutdown when handing over. Giving up on handover.")
+      stop()
  }

  def handOverDone(handOverTo: Option[ActorRef]): State = {
@@ -1033,6 +1100,15 @@
    case Event(Terminated(ref), StoppingData(singleton)) if ref == singleton =>
      logInfo(ClusterLogMarker.singletonTerminated, "Singleton actor [{}] was terminated", singleton.path)
      stop()
+
+    case Event(MemberReadyForShutdown(m), _) if m.uniqueAddress == cluster.selfUniqueAddress =>
+      logInfo("Ready for shutdown when stopping. Not waiting for user actor to shutdown")
+      stop()
+
+    case Event(MemberPreparingForShutdown(m), _) if m.uniqueAddress == cluster.selfUniqueAddress =>
+      logInfo("Preparing for shutdown when stopping. Not waiting for user actor to shutdown")
+      stop()
  }

  when(End) {
@@ -1046,7 +1122,14 @@
  def selfMemberExited(): Unit = {
    selfExited = true
-    logInfo("Exited [{}]", cluster.selfAddress)
+    logInfo(
+      "Exited [{}].{}",
+      cluster.selfAddress,
+      if (preparingForFullShutdown) " From preparing from shutdown" else "")
+    // handover won't be done so just complete right away
+    if (preparingForFullShutdown) {
+      memberExitingProgress.trySuccess(Done)
+    }
  }

  whenUnhandled {
@@ -1090,6 +1173,8 @@
          "Failed to release lease. Singleton may not be able to run on another node until lease timeout occurs")
      }
      stay()
+    case Event(_: MemberEvent, _) =>
+      stay() // silence
  }

  onTransition {


@@ -0,0 +1,166 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.singleton
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.MemberStatus
import akka.cluster.MemberStatus.Removed
import akka.cluster.MultiNodeClusterSpec
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.testkit._
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
object ClusterSingletonManagerPreparingForShutdownSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "cluster"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning
akka.cluster.testkit.auto-down-unreachable-after = off
akka.cluster.leader-actions-interval = 100ms
"""))
case object EchoStarted
/**
* The singleton actor
*/
class Echo(testActor: ActorRef) extends Actor with ActorLogging {
override def preStart(): Unit = {
log.info("Singleton starting on {}", Cluster(context.system).selfUniqueAddress)
testActor ! "preStart"
}
override def postStop(): Unit = {
testActor ! "postStop"
}
def receive = {
case "stop" =>
testActor ! "stop"
context.stop(self)
case _ =>
sender() ! self
}
}
}
class ClusterSingletonManagerPreparingForShutdownMultiJvmNode1 extends ClusterSingletonManagerPreparingForShutdownSpec
class ClusterSingletonManagerPreparingForShutdownMultiJvmNode2 extends ClusterSingletonManagerPreparingForShutdownSpec
class ClusterSingletonManagerPreparingForShutdownMultiJvmNode3 extends ClusterSingletonManagerPreparingForShutdownSpec
class ClusterSingletonManagerPreparingForShutdownSpec
extends MultiNodeSpec(ClusterSingletonManagerPreparingForShutdownSpec)
with MultiNodeClusterSpec
with STMultiNodeSpec
with ImplicitSender {
import ClusterSingletonManagerPreparingForShutdownSpec._
override def initialParticipants = roles.size
def createSingleton(): ActorRef = {
system.actorOf(
ClusterSingletonManager.props(
singletonProps = Props(classOf[Echo], testActor),
terminationMessage = "stop",
settings = ClusterSingletonManagerSettings(system)),
name = "echo")
}
val echoProxyTerminatedProbe = TestProbe()
lazy val echoProxy: ActorRef = {
echoProxyTerminatedProbe.watch(
system.actorOf(
ClusterSingletonProxy
.props(singletonManagerPath = "/user/echo", settings = ClusterSingletonProxySettings(system)),
name = "echoProxy"))
}
"Preparing for shut down ClusterSingletonManager" must {
"form cluster" in {
awaitClusterUp(first, second, third)
}
"not handover when ready for shutdown" in {
createSingleton()
runOn(first) {
within(10.seconds) {
expectMsg("preStart")
echoProxy ! "hello"
expectMsgType[ActorRef]
expectNoMessage(2.seconds)
}
}
enterBarrier("singleton-active")
runOn(first) {
Cluster(system).prepareForFullClusterShutdown()
}
awaitAssert({
withClue("members: " + Cluster(system).readView.members) {
Cluster(system).selfMember.status shouldEqual MemberStatus.ReadyForShutdown
}
}, 10.seconds)
enterBarrier("preparation-complete")
runOn(first) {
Cluster(system).leave(address(first))
}
awaitAssert(
{
runOn(second, third) {
withClue("members: " + Cluster(system).readView.members) {
Cluster(system).readView.members.size shouldEqual 2
}
}
runOn(first) {
withClue("self member: " + Cluster(system).selfMember) {
Cluster(system).selfMember.status shouldEqual MemberStatus.Removed
}
}
},
8.seconds) // this timeout must be lower than coordinated shutdown timeout otherwise it could pass due to the timeout continuing with the cluster exit
// where as this is testing that shutdown happens right away when a cluster is in preparing to shutdown mode
enterBarrier("initial-singleton-removed")
// even tho the handover isn't completed the new oldest node will start it after a timeout
// make sure this isn't the case
runOn(second) {
echoProxy ! "hello"
expectNoMessage(5.seconds)
}
enterBarrier("no-singleton-running")
}
"last nodes should shut down" in {
runOn(second) {
Cluster(system).leave(address(second))
Cluster(system).leave(address(third))
}
awaitAssert({
withClue("self member: " + Cluster(system).selfMember) {
Cluster(system).selfMember.status shouldEqual Removed
}
}, 10.seconds)
enterBarrier("done")
}
}
}


@@ -150,6 +150,26 @@ object Leave {
 */
final case class Down(address: Address) extends ClusterCommand

+/**
+ * Initiate a full cluster shutdown. This stops:
+ * - New members joining the cluster
+ * - New rebalances in Cluster Sharding
+ * - Singleton handovers
+ *
+ * However, it does not stop the nodes. That is expected to be signalled externally.
+ *
+ * Not for user extension
+ */
+@DoNotInherit sealed trait PrepareForFullClusterShutdown extends ClusterCommand
+
+case object PrepareForFullClusterShutdown extends PrepareForFullClusterShutdown {
+
+  /**
+   * Java API
+   */
+  def prepareForFullClusterShutdown(): PrepareForFullClusterShutdown = this
+}
+
/**
 * Akka Typed Cluster API entry point
 */


@@ -14,6 +14,7 @@ import akka.actor.typed.scaladsl.adapter._
import akka.annotation.InternalApi
import akka.cluster.{ ClusterEvent, Member, MemberStatus }
import akka.cluster.ClusterEvent.MemberEvent
+import akka.cluster.typed.PrepareForFullClusterShutdown
import akka.cluster.typed._

/**
@@ -129,6 +130,10 @@
          adaptedCluster.joinSeedNodes(addresses)
          Behaviors.same

+        case PrepareForFullClusterShutdown =>
+          adaptedCluster.prepareForFullClusterShutdown()
+          Behaviors.same
      }
    }


@@ -67,6 +67,12 @@ public class BasicClusterExampleTest { // extends JUnitSuite {
      // TODO wait for/verify cluster to form

+      // #prepare
+      PrepareForFullClusterShutdown msg =
+          PrepareForFullClusterShutdown.prepareForFullClusterShutdown();
+      cluster2.manager().tell(msg);
+      // #prepare
+
      // #cluster-leave
      cluster2.manager().tell(Leave.create(cluster2.selfMember().address()));
      // #cluster-leave


@@ -166,6 +166,8 @@ public final class ClusterMessages {
     * <code>WeaklyUp = 6;</code>
     */
    WeaklyUp(6),
+    /**
+     * <code>PreparingForShutdown = 7;</code>
+     */
+    PreparingForShutdown(7),
+    /**
+     * <code>ReadyForShutdown = 8;</code>
+     */
+    ReadyForShutdown(8),
    ;

    /**
@@ -196,6 +204,14 @@
     * <code>WeaklyUp = 6;</code>
     */
    public static final int WeaklyUp_VALUE = 6;
+    /**
+     * <code>PreparingForShutdown = 7;</code>
+     */
+    public static final int PreparingForShutdown_VALUE = 7;
+    /**
+     * <code>ReadyForShutdown = 8;</code>
+     */
+    public static final int ReadyForShutdown_VALUE = 8;

    public final int getNumber() {
@@ -225,6 +241,8 @@
        case 4: return Down;
        case 5: return Removed;
        case 6: return WeaklyUp;
+        case 7: return PreparingForShutdown;
+        case 8: return ReadyForShutdown;
        default: return null;
      }
    }
@@ -22446,10 +22464,11 @@
      "\002(\r\022\031\n\021allowLocalRoutees\030\003 \002(\010\022\017\n\007useRol" +
      "e\030\004 \001(\t\022\020\n\010useRoles\030\005 \003(\t*D\n\022Reachabilit" +
      "yStatus\022\r\n\tReachable\020\000\022\017\n\013Unreachable\020\001\022" +
-     "\016\n\nTerminated\020\002*b\n\014MemberStatus\022\013\n\007Joini" +
-     "ng\020\000\022\006\n\002Up\020\001\022\013\n\007Leaving\020\002\022\013\n\007Exiting\020\003\022\010" +
-     "\n\004Down\020\004\022\013\n\007Removed\020\005\022\014\n\010WeaklyUp\020\006B\035\n\031a" +
-     "kka.cluster.protobuf.msgH\001"
+     "\016\n\nTerminated\020\002*\222\001\n\014MemberStatus\022\013\n\007Join" +
+     "ing\020\000\022\006\n\002Up\020\001\022\013\n\007Leaving\020\002\022\013\n\007Exiting\020\003\022" +
+     "\010\n\004Down\020\004\022\013\n\007Removed\020\005\022\014\n\010WeaklyUp\020\006\022\030\n\024" +
+     "PreparingForShutdown\020\007\022\024\n\020ReadyForShutdo" +
+     "wn\020\010B\035\n\031akka.cluster.protobuf.msgH\001"
    };
    descriptor = akka.protobufv3.internal.Descriptors.FileDescriptor
        .internalBuildGeneratedFileFrom(descriptorData,


@@ -198,6 +198,8 @@ enum MemberStatus {
  Down = 4;
  Removed = 5;
  WeaklyUp = 6;
+  PreparingForShutdown = 7;
+  ReadyForShutdown = 8;
}

/**


@@ -320,6 +320,13 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
    clusterCore ! ClusterUserAction.JoinTo(fillLocal(address))
  }

+  /**
+   * Change the state of every member in preparation for a full cluster shutdown.
+   */
+  def prepareForFullClusterShutdown(): Unit = {
+    clusterCore ! ClusterUserAction.PrepareForShutdown
+  }
+
  private def fillLocal(address: Address): Address = {
    // local address might be used if grabbed from actorRef.path.address
    if (address.hasLocalScope && address.system == selfAddress.system) selfAddress
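
For the classic akka.cluster.Cluster extension the same flow looks roughly as follows. This is a sketch assuming an ActorSystem that is already configured as a cluster node and has joined (not shown here), mirroring the ClusterSingletonManagerPreparingForShutdownSpec added in this commit.

import akka.actor.ActorSystem
import akka.cluster.{ Cluster, MemberStatus }

object ClassicFullClusterShutdownSketch {
  def main(args: Array[String]): Unit = {
    // Assumed: the system is configured with the cluster provider and has joined a cluster.
    val system = ActorSystem("example")
    val cluster = Cluster(system)

    // Move every member towards PreparingForShutdown / ReadyForShutdown.
    cluster.prepareForFullClusterShutdown()

    // Wait until the self member reports ReadyForShutdown
    // (polling here for brevity; subscribing to member events is the usual approach).
    while (cluster.selfMember.status != MemberStatus.ReadyForShutdown)
      Thread.sleep(100)

    // Leaving now does not trigger shard rebalances or singleton hand-overs;
    // stopping the JVMs is still signalled externally.
    cluster.leave(cluster.selfAddress)
  }
}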


@@ -61,6 +61,11 @@ private[cluster] object ClusterUserAction {
  @SerialVersionUID(1L)
  final case class Down(address: Address) extends ClusterMessage

+  /**
+   * Command to mark all nodes as shutting down
+   */
+  @SerialVersionUID(1L)
+  case object PrepareForShutdown extends ClusterMessage
}

/**
@@ -356,6 +361,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
  var joinSeedNodesDeadline: Option[Deadline] = None
  var leaderActionCounter = 0
  var selfDownCounter = 0
+  var preparingForShutdown = false

  var exitingTasksInProgress = false
  val selfExiting = Promise[Done]()
@@ -555,6 +561,7 @@
    case Join(node, roles, appVersion) => joining(node, roles, appVersion)
    case ClusterUserAction.Down(address) => downing(address)
    case ClusterUserAction.Leave(address) => leaving(address)
+    case ClusterUserAction.PrepareForShutdown => startPrepareForShutdown()
    case SendGossipTo(address) => sendGossipTo(address)
    case msg: SubscriptionMessage => publisher.forward(msg)
    case QuarantinedEvent(ua) => quarantined(UniqueAddress(ua))
@@ -728,83 +735,90 @@
   * current gossip state, including the new joining member.
   */
  def joining(joiningNode: UniqueAddress, roles: Set[String], appVersion: Version): Unit = {
+    if (!preparingForShutdown) {
      val selfStatus = latestGossip.member(selfUniqueAddress).status
      if (joiningNode.address.protocol != selfAddress.protocol)
        logWarning(
          "Member with wrong protocol tried to join, but was ignored, expected [{}] but was [{}]",
          selfAddress.protocol,
          joiningNode.address.protocol)
      else if (joiningNode.address.system != selfAddress.system)
        logWarning(
          "Member with wrong ActorSystem name tried to join, but was ignored, expected [{}] but was [{}]",
          selfAddress.system,
          joiningNode.address.system)
      else if (removeUnreachableWithMemberStatus.contains(selfStatus))
        logInfo(
          "Trying to join [{}] to [{}] member, ignoring. Use a member that is Up instead.",
          joiningNode,
          selfStatus)
      else {
        val localMembers = latestGossip.members

        // check by address without uid to make sure that node with same host:port is not allowed
        // to join until previous node with that host:port has been removed from the cluster
        localMembers.find(_.address == joiningNode.address) match {
          case Some(m) if m.uniqueAddress == joiningNode =>
            // node retried join attempt, probably due to lost Welcome message
            logInfo("Existing member [{}] is joining again.", m)
            if (joiningNode != selfUniqueAddress)
              sender() ! Welcome(selfUniqueAddress, latestGossip)
          case Some(m) =>
            // node restarted, same host:port as existing member, but with different uid
            // safe to down and later remove existing member
            // new node will retry join
            logInfo(
              "New incarnation of existing member [{}] is trying to join. " +
              "Existing will be removed from the cluster and then new member will be allowed to join.",
              m)
            if (m.status != Down) {
              // we can confirm it as terminated/unreachable immediately
              val newReachability = latestGossip.overview.reachability.terminated(selfUniqueAddress, m.uniqueAddress)
              val newOverview = latestGossip.overview.copy(reachability = newReachability)
              val newGossip = latestGossip.copy(overview = newOverview)
              updateLatestGossip(newGossip)

              downing(m.address)
            }
          case None =>
            // remove the node from the failure detector
            failureDetector.remove(joiningNode.address)
            crossDcFailureDetector.remove(joiningNode.address)

            // add joining node as Joining
            // add self in case someone else joins before self has joined (Set discards duplicates)
            val newMembers = localMembers + Member(joiningNode, roles, appVersion) + Member(
              selfUniqueAddress,
              cluster.selfRoles,
              cluster.settings.AppVersion)
            val newGossip = latestGossip.copy(members = newMembers)

            updateLatestGossip(newGossip)

            if (joiningNode == selfUniqueAddress) {
              logInfo(
                ClusterLogMarker.memberChanged(joiningNode, MemberStatus.Joining),
                "Node [{}] is JOINING itself (with roles [{}], version [{}]) and forming new cluster",
                joiningNode.address,
                roles.mkString(", "),
                appVersion)
              if (localMembers.isEmpty)
                leaderActions() // important for deterministic oldest when bootstrapping
            } else {
              logInfo(
                ClusterLogMarker.memberChanged(joiningNode, MemberStatus.Joining),
                "Node [{}] is JOINING, roles [{}], version [{}]",
                joiningNode.address,
                roles.mkString(", "),
                appVersion)
              sender() ! Welcome(selfUniqueAddress, latestGossip)
            }

            publishMembershipState()
        }
      }
+    } else {
+      logInfo("Ignoring join request from [{}] as preparing for shutdown", joiningNode)
+    }
  }
@@ -826,6 +840,27 @@
    }
  }

+  def startPrepareForShutdown(): Unit = {
+    if (!preparingForShutdown) {
+      preparingForShutdown = true
+      val changedMembers = latestGossip.members.collect {
+        case m if MembershipState.allowedToPrepareToShutdown(m.status) =>
+          m.copy(status = PreparingForShutdown)
+      }
+      val newGossip = latestGossip.update(changedMembers)
+      updateLatestGossip(newGossip)
+      changedMembers.foreach { member =>
+        logInfo(
+          ClusterLogMarker.memberChanged(member.uniqueAddress, MemberStatus.PreparingForShutdown),
+          "Preparing for shutdown [{}] as [{}]",
+          member.address,
+          PreparingForShutdown)
+      }
+      publishMembershipState()
+      gossip()
+    }
+  }
+
  /**
   * State transition to LEAVING.
   * The node will eventually be removed by the leader, after hand-off in EXITING, and only after
@ -834,7 +869,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
def leaving(address: Address): Unit = { def leaving(address: Address): Unit = {
// only try to update if the node is available (in the member ring) // only try to update if the node is available (in the member ring)
latestGossip.members.find(_.address == address).foreach { existingMember => latestGossip.members.find(_.address == address).foreach { existingMember =>
if (existingMember.status == Joining || existingMember.status == WeaklyUp || existingMember.status == Up) { if (existingMember.status == Joining || existingMember.status == WeaklyUp || existingMember.status == Up || existingMember.status == PreparingForShutdown || existingMember.status == ReadyForShutdown) {
// mark node as LEAVING // mark node as LEAVING
val newMembers = latestGossip.members - existingMember + existingMember.copy(status = Leaving) val newMembers = latestGossip.members - existingMember + existingMember.copy(status = Leaving)
val newGossip = latestGossip.copy(members = newMembers) val newGossip = latestGossip.copy(members = newMembers)
@@ -1211,7 +1246,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
    } else {
      leaderActionCounter += 1
      import cluster.settings.{ AllowWeaklyUpMembers, LeaderActionsInterval, WeaklyUpAfter }
      if (AllowWeaklyUpMembers && LeaderActionsInterval * leaderActionCounter >= WeaklyUpAfter && !preparingForShutdown)
        moveJoiningToWeaklyUp()

      if (leaderActionCounter == firstNotice || leaderActionCounter % periodicNotice == 0)
@@ -1231,9 +1266,18 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
      isCurrentlyLeader = false
    }
    cleanupExitingConfirmed()
    checkForPrepareForShutdown()
    shutdownSelfWhenDown()
  }

  def checkForPrepareForShutdown(): Unit = {
    if (MembershipState.allowedToPrepareToShutdown(latestGossip.member(selfUniqueAddress).status) && latestGossip.members
          .exists(m => MembershipState.prepareForShutdownStates(m.status))) {
      logDebug("Detected full cluster shutdown")
      self ! ClusterUserAction.PrepareForShutdown
    }
  }
  def shutdownSelfWhenDown(): Unit = {
    if (latestGossip.member(selfUniqueAddress).status == Down) {
      // When all reachable have seen the state this member will shutdown itself when it has
@ -1290,7 +1334,7 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
val removedOtherDc = val removedOtherDc =
if (latestGossip.isMultiDc) { if (latestGossip.isMultiDc) {
latestGossip.members.filter { m => latestGossip.members.filter { m =>
(m.dataCenter != selfDc && removeUnreachableWithMemberStatus(m.status)) m.dataCenter != selfDc && removeUnreachableWithMemberStatus(m.status)
} }
} else } else
Set.empty[Member] Set.empty[Member]
@@ -1303,9 +1347,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
      var upNumber = 0

      {
        case m if m.dataCenter == selfDc && isJoiningToUp(m) && !preparingForShutdown =>
          // Move JOINING => UP (once all nodes have seen that this node is JOINING, i.e. we have a convergence)
          // and minimum number of nodes have joined the cluster
          // don't move members to up when preparing for shutdown
          if (upNumber == 0) {
            // It is alright to use same upNumber as already used by a removed member, since the upNumber
            // is only used for comparing age of current cluster members (Member.isOlderThan)
@@ -1319,6 +1364,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
        case m if m.dataCenter == selfDc && m.status == Leaving =>
          // Move LEAVING => EXITING (once we have a convergence on LEAVING)
          m.copy(status = Exiting)

        case m if m.dataCenter == selfDc && m.status == PreparingForShutdown =>
          // Move PreparingForShutdown => ReadyForShutdown (once we have a convergence on PreparingForShutdown)
          m.copy(status = ReadyForShutdown)
      }
    }
  }


@@ -318,6 +318,16 @@ object ClusterEvent {
    if (member.status != Leaving) throw new IllegalArgumentException("Expected Leaving status, got: " + member)
  }

  final case class MemberPreparingForShutdown(member: Member) extends MemberEvent {
    if (member.status != PreparingForShutdown)
      throw new IllegalArgumentException("Expected PreparingForShutdown status, got: " + member)
  }

  final case class MemberReadyForShutdown(member: Member) extends MemberEvent {
    if (member.status != ReadyForShutdown)
      throw new IllegalArgumentException("Expected ReadyForShutdown status, got: " + member)
  }

  /**
   * Member status changed to `MemberStatus.Exiting` and will be removed
   * when all members have seen the `Exiting` status.
@@ -554,12 +564,14 @@ object ClusterEvent {
          newMember
      }
      val memberEvents = (newMembers ++ changedMembers).unsorted.collect {
        case m if m.status == Joining              => MemberJoined(m)
        case m if m.status == WeaklyUp             => MemberWeaklyUp(m)
        case m if m.status == Up                   => MemberUp(m)
        case m if m.status == Leaving              => MemberLeft(m)
        case m if m.status == Exiting              => MemberExited(m)
        case m if m.status == Down                 => MemberDowned(m)
        case m if m.status == PreparingForShutdown => MemberPreparingForShutdown(m)
        case m if m.status == ReadyForShutdown     => MemberReadyForShutdown(m)
        // no events for other transitions
      }


@@ -195,6 +195,7 @@ object Member {

  /**
   * Picks the Member with the highest "priority" MemberStatus.
   * Where highest priority is furthest along the membership state machine
   */
  def highestPriorityOf(m1: Member, m2: Member): Member = {
    if (m1.status == m2.status)
@@ -202,19 +203,23 @@
      if (m1.isOlderThan(m2)) m1 else m2
    else
      (m1.status, m2.status) match {
        case (Removed, _)              => m1
        case (_, Removed)              => m2
        case (ReadyForShutdown, _)     => m1
        case (_, ReadyForShutdown)     => m2
        case (Down, _)                 => m1
        case (_, Down)                 => m2
        case (Exiting, _)              => m1
        case (_, Exiting)              => m2
        case (Leaving, _)              => m1
        case (_, Leaving)              => m2
        case (Joining, _)              => m2
        case (_, Joining)              => m1
        case (WeaklyUp, _)             => m2
        case (_, WeaklyUp)             => m1
        case (PreparingForShutdown, _) => m1
        case (_, PreparingForShutdown) => m2
        case (Up, Up)                  => m1
      }
  }
@@ -235,6 +240,8 @@ object MemberStatus {
  @SerialVersionUID(1L) case object Exiting extends MemberStatus
  @SerialVersionUID(1L) case object Down extends MemberStatus
  @SerialVersionUID(1L) case object Removed extends MemberStatus
  @SerialVersionUID(1L) case object PreparingForShutdown extends MemberStatus
  @SerialVersionUID(1L) case object ReadyForShutdown extends MemberStatus

  /**
   * Java API: retrieve the `Joining` status singleton
@@ -271,6 +278,16 @@
   */
  def removed: MemberStatus = Removed

  /**
   * Java API: retrieve the `PreparingForShutdown` status singleton
   */
  def shuttingDown: MemberStatus = PreparingForShutdown

  /**
   * Java API: retrieve the `ReadyForShutdown` status singleton
   */
  def shutDown: MemberStatus = ReadyForShutdown
  /**
   * INTERNAL API
   */
@@ -278,10 +295,12 @@
    Map(
      Joining -> Set(WeaklyUp, Up, Leaving, Down, Removed),
      WeaklyUp -> Set(Up, Leaving, Down, Removed),
      Up -> Set(Leaving, Down, Removed, PreparingForShutdown),
      Leaving -> Set(Exiting, Down, Removed),
      Down -> Set(Removed),
      Exiting -> Set(Removed, Down),
      PreparingForShutdown -> Set(ReadyForShutdown, Removed, Leaving, Down),
      ReadyForShutdown -> Set(Removed, Leaving, Down),
      Removed -> Set.empty[MemberStatus])
}


@@ -22,10 +22,13 @@ import akka.util.ccompat._
@ccompatUsedUntil213
@InternalApi private[akka] object MembershipState {
  import MemberStatus._

  private val leaderMemberStatus = Set[MemberStatus](Up, Leaving, PreparingForShutdown, ReadyForShutdown)
  private val convergenceMemberStatus = Set[MemberStatus](Up, Leaving, PreparingForShutdown, ReadyForShutdown)
  val convergenceSkipUnreachableWithMemberStatus = Set[MemberStatus](Down, Exiting)
  val removeUnreachableWithMemberStatus = Set[MemberStatus](Down, Exiting)

  // If a member hasn't joined yet or has already started leaving, don't mark it as shutting down
  val allowedToPrepareToShutdown = Set[MemberStatus](Up)
  val prepareForShutdownStates = Set[MemberStatus](PreparingForShutdown, ReadyForShutdown)
}
/**


@@ -54,6 +54,7 @@ private[akka] object ClusterMessageSerializer {
  val WelcomeManifest = "W"
  val LeaveManifest = "L"
  val DownManifest = "D"
  val PrepareForShutdownManifest = "PS"
  val InitJoinManifest = "IJ"
  val InitJoinAckManifest = "IJA"
  val InitJoinNackManifest = "IJN"
@@ -93,6 +94,7 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem)
    case _: GossipStatus                       => GossipStatusManifest
    case _: GossipEnvelope                     => GossipEnvelopeManifest
    case _: ClusterRouterPool                  => ClusterRouterPoolManifest
    case ClusterUserAction.PrepareForShutdown  => PrepareForShutdownManifest
    case _ =>
      throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]")
  }
@@ -372,7 +374,9 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem)
    MemberStatus.Exiting -> cm.MemberStatus.Exiting_VALUE,
    MemberStatus.Down -> cm.MemberStatus.Down_VALUE,
    MemberStatus.Removed -> cm.MemberStatus.Removed_VALUE,
    MemberStatus.WeaklyUp -> cm.MemberStatus.WeaklyUp_VALUE,
    MemberStatus.PreparingForShutdown -> cm.MemberStatus.PreparingForShutdown_VALUE,
    MemberStatus.ReadyForShutdown -> cm.MemberStatus.ReadyForShutdown_VALUE)
  private val memberStatusFromInt = memberStatusToInt.map { case (a, b) => (b, a) }


@@ -0,0 +1,105 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster
import akka.cluster.MemberStatus.Removed
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.util.ccompat._
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.Eventually
import scala.concurrent.duration._
object ClusterShutdownSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
val forth = role("forth")
commonConfig(debugConfig(on = false).withFallback(ConfigFactory.parseString("""
# important config
""").withFallback(MultiNodeClusterSpec.clusterConfig)))
}
class ClusterShutdownSpecMultiJvmNode1 extends ClusterShutdownSpec
class ClusterShutdownSpecMultiJvmNode2 extends ClusterShutdownSpec
class ClusterShutdownSpecMultiJvmNode3 extends ClusterShutdownSpec
class ClusterShutdownSpecMultiJvmNode4 extends ClusterShutdownSpec
@ccompatUsedUntil213
abstract class ClusterShutdownSpec
extends MultiNodeSpec(ClusterShutdownSpec)
with MultiNodeClusterSpec
with Eventually {
import ClusterShutdownSpec._
"Cluster shutdown" should {
"form cluster" in {
awaitClusterUp(first, second, third)
enterBarrier("cluster-up")
}
"prepare for shutdown" in {
runOn(first) {
Cluster(system).prepareForFullClusterShutdown()
}
runOn(first, second, third) {
awaitAssert({
withClue("members: " + Cluster(system).readView.members) {
Cluster(system).selfMember.status shouldEqual MemberStatus.ReadyForShutdown
}
}, 10.seconds)
}
}
"spread around the cluster" in {
runOn(first, second, third) {
awaitAssert {
Cluster(system).readView.members.unsorted.map(_.status) shouldEqual Set(MemberStatus.ReadyForShutdown)
}
}
enterBarrier("propagation finished")
}
"not allow new members to join" in {
runOn(forth) {
cluster.join(address(first))
Thread.sleep(3000)
// should not be allowed to join the cluster even after some time
awaitAssert {
cluster.selfMember.status shouldBe MemberStatus.Removed
}
}
enterBarrier("not-allowed-to-join")
}
"be allowed to leave" in {
runOn(first) {
Cluster(system).leave(address(first))
}
awaitAssert({
withClue("members: " + Cluster(system).readView.members) {
runOn(second, third) {
Cluster(system).readView.members.size shouldEqual 2
}
runOn(first) {
Cluster(system).selfMember.status shouldEqual Removed
}
}
}, 10.seconds)
enterBarrier("first-gone")
runOn(second) {
Cluster(system).leave(address(second))
Cluster(system).leave(address(third))
}
awaitAssert({
withClue("self member: " + Cluster(system).selfMember) {
Cluster(system).selfMember.status shouldEqual Removed
}
}, 10.seconds)
enterBarrier("all-gone")
}
}
}


@@ -38,6 +38,8 @@ merged and converge to the same end result.

* **weakly up** - transient state while network split (only if `akka.cluster.allow-weakly-up-members=on`)
* **up** - normal operating state
* **preparing for shutdown** / **ready for shutdown** - optional states that can be moved to before doing a full cluster shutdown
* **leaving** / **exiting** - states during graceful removal

@@ -58,6 +60,8 @@ Note that the node might already have been shutdown when this event is published
of at least one other node.
* `ClusterEvent.ReachableMember` - A member is considered as reachable again, after having been unreachable.
  All nodes that previously detected it as unreachable have detected it as reachable again.
* `ClusterEvent.MemberPreparingForShutdown` - A member is preparing for a full cluster shutdown
* `ClusterEvent.MemberReadyForShutdown` - A member is ready for a full cluster shutdown
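As a rough illustration (not part of this commit), the two new events can be consumed like any other `MemberEvent` with a classic subscriber; the `ShutdownStateListener` below is a hypothetical sketch:

```scala
import akka.actor.Actor
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.{ MemberEvent, MemberPreparingForShutdown, MemberReadyForShutdown }

// Hypothetical listener: logs the two new shutdown-related member events.
class ShutdownStateListener extends Actor {

  private val cluster = Cluster(context.system)

  // subscribing to MemberEvent also delivers MemberPreparingForShutdown/MemberReadyForShutdown
  override def preStart(): Unit = cluster.subscribe(self, classOf[MemberEvent])
  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive: Receive = {
    case MemberPreparingForShutdown(member) =>
      context.system.log.info("Member [{}] is preparing for full cluster shutdown", member.address)
    case MemberReadyForShutdown(member) =>
      context.system.log.info("Member [{}] is ready for full cluster shutdown", member.address)
    case _ => // ignore CurrentClusterState and other member events
  }
}
```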
## Membership Lifecycle

@@ -126,6 +130,27 @@ that are in this state, but you should be aware of that members on the other
side of a network partition have no knowledge about the existence of the
new members. You should for example not count `WeaklyUp` members in quorum decisions.
## Full cluster shutdown
In some rare cases it may be desirable to do a full cluster shutdown rather than a rolling deploy.
For example, a protocol change where it is simpler to restart the cluster than to make the protocol change
backward compatible.
As of Akka `2.6.13` it can be signalled that a full cluster shutdown is about to happen, so that expensive actions such as:

* Cluster sharding rebalances
* Moving of Cluster singletons

won't be started. That way the shutdown will be as quick as possible and a new version can be started up without delay.
If the cluster isn't going to be restarted right away then there is no need to prepare it for shutdown.

To use this feature, call `Cluster(system).prepareForFullClusterShutdown()` in classic or use @apidoc[PrepareForFullClusterShutdown] in typed.
Wait for all `Up` members to become `ReadyForShutdown`; then all nodes can be shut down and restarted.
Members that aren't `Up` yet will remain in the `Joining` or `WeaklyUp` states. Any node that is already leaving
the cluster, i.e. in the `Leaving` or `Exiting` state, will continue to leave the cluster via the normal path.
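Below is a minimal sketch of that classic workflow, assuming a cluster that has already formed; `FullClusterShutdownExample` and its polling loop are illustrative only, not an Akka API:

```scala
// Hypothetical helper: prepare the cluster for a full shutdown and terminate
// this node once it reports ReadyForShutdown.
import akka.actor.ActorSystem
import akka.cluster.{ Cluster, MemberStatus }

import scala.concurrent.duration._

object FullClusterShutdownExample {

  def shutdownWhenReady(system: ActorSystem): Unit = {
    val cluster = Cluster(system)

    // signal every Up member that a full cluster shutdown is about to happen
    cluster.prepareForFullClusterShutdown()

    import system.dispatcher
    // polling for brevity; a real application would subscribe to
    // ClusterEvent.MemberReadyForShutdown instead
    system.scheduler.scheduleWithFixedDelay(1.second, 1.second) { () =>
      if (cluster.selfMember.status == MemberStatus.ReadyForShutdown)
        system.terminate() // all Up members converged, safe to stop and redeploy
    }
  }
}
```

In typed, @apidoc[PrepareForFullClusterShutdown] is a message rather than a method call, but the overall sequence (prepare, wait for `ReadyForShutdown`, terminate) is the same.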
## State Diagrams

### State Diagram for the Member States