Make a distinction between hand-offs for rebalance and region shutdown (#29579)

Renato Cavalcanti 2020-09-29 12:54:54 +02:00 committed by GitHub
parent 294c534d15
commit 289f665445
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 168 additions and 75 deletions

Changed file: MiMa problem filters (new file)

@@ -0,0 +1,6 @@
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardCoordinator#RebalanceWorker.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardCoordinator.rebalanceWorkerProps")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.ShardCoordinator#RebalanceWorker.this")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.ShardCoordinator.rebalanceWorkerProps")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion.requestShardBufferHomes")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion.sendGracefulShutdownToCoordinator")

Changed file: ShardCoordinator.scala

@@ -30,8 +30,7 @@ import akka.cluster.sharding.internal.{
   RememberEntitiesProvider
 }
 import akka.dispatch.ExecutionContexts
-import akka.event.BusLogging
-import akka.event.Logging
+import akka.event.{ BusLogging, Logging }
 import akka.pattern.{ pipe, AskTimeoutException }
 import akka.persistence._
 import akka.util.PrettyDuration._
@@ -130,9 +129,10 @@ object ShardCoordinator {
     /**
      * Invoked when the location of a new shard is to be decided.
-     * @param requester actor reference to the [[ShardRegion]] that requested the location of the
-     *   shard, can be returned if preference should be given to the node where the shard was first accessed
-     * @param shardId the id of the shard to allocate
+     *
+     * @param requester actor reference to the [[ShardRegion]] that requested the location of the
+     *   shard, can be returned if preference should be given to the node where the shard was first accessed
+     * @param shardId the id of the shard to allocate
      * @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards,
      *   in the order they were allocated
      * @return a `Future` of the actor ref of the [[ShardRegion]] that is to be responsible for the shard, must be one of
@@ -145,10 +145,11 @@ object ShardCoordinator {
     /**
      * Invoked periodically to decide which shards to rebalance to another location.
+     *
      * @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards,
      *   in the order they were allocated
      * @param rebalanceInProgress set of shards that are currently being rebalanced, i.e.
      *   you should not include these in the returned set
      * @return a `Future` of the shards to be migrated, may be empty to skip rebalance in this round
      */
     def rebalance(
@@ -214,9 +215,10 @@ object ShardCoordinator {
     /**
      * Invoked when the location of a new shard is to be decided.
-     * @param requester actor reference to the [[ShardRegion]] that requested the location of the
-     *   shard, can be returned if preference should be given to the node where the shard was first accessed
-     * @param shardId the id of the shard to allocate
+     *
+     * @param requester actor reference to the [[ShardRegion]] that requested the location of the
+     *   shard, can be returned if preference should be given to the node where the shard was first accessed
+     * @param shardId the id of the shard to allocate
      * @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards,
      *   in the order they were allocated
      * @return a `Future` of the actor ref of the [[ShardRegion]] that is to be responsible for the shard, must be one of
@@ -229,10 +231,11 @@ object ShardCoordinator {
     /**
      * Invoked periodically to decide which shards to rebalance to another location.
+     *
      * @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards,
      *   in the order they were allocated
      * @param rebalanceInProgress set of shards that are currently being rebalanced, i.e.
      *   you should not include these in the returned set
      * @return a `Future` of the shards to be migrated, may be empty to skip rebalance in this round
      */
     def rebalance(
@@ -539,10 +542,12 @@ object ShardCoordinator {
       shard: String,
       shardRegionFrom: ActorRef,
       handOffTimeout: FiniteDuration,
-      regions: Set[ActorRef])
+      regions: Set[ActorRef],
+      isRebalance: Boolean)
       extends Actor
       with ActorLogging
       with Timers {
+
     import Internal._

     regions.foreach { region =>
@@ -550,7 +555,14 @@ object ShardCoordinator {
     }
     var remaining = regions

-    log.debug("Rebalance [{}] from [{}] regions", regions.size, shard)
+    if (isRebalance)
+      log.debug("Rebalance [{}] from [{}] regions", shard, regions.size)
+    else
+      log.debug(
+        "Shutting down shard [{}] from region [{}]. Asking [{}] region(s) to hand-off shard",
+        shard,
+        shardRegionFrom,
+        regions.size)

     timers.startSingleTimer("hand-off-timeout", ReceiveTimeout, handOffTimeout)
@@ -559,10 +571,16 @@ object ShardCoordinator {
         log.debug("BeginHandOffAck for shard [{}] received from [{}].", shard, sender())
         acked(sender())
       case RebalanceWorker.ShardRegionTerminated(shardRegion) =>
-        log.debug("ShardRegion [{}] terminated while waiting for BeginHandOffAck for shard [{}].", shardRegion, shard)
-        acked(shardRegion)
+        if (remaining.contains(shardRegion)) {
+          log.debug("ShardRegion [{}] terminated while waiting for BeginHandOffAck for shard [{}].", shardRegion, shard)
+          acked(shardRegion)
+        }
       case ReceiveTimeout =>
-        log.debug("Rebalance of shard [{}] from [{}] timed out", shard, shardRegionFrom)
+        if (isRebalance)
+          log.debug("Rebalance of [{}] from [{}] timed out", shard, shardRegionFrom)
+        else
+          log.debug("Shutting down [{}] shard from [{}] timed out", shard, shardRegionFrom)
         done(ok = false)
     }
@@ -595,9 +613,11 @@ object ShardCoordinator {
       shard: String,
       shardRegionFrom: ActorRef,
       handOffTimeout: FiniteDuration,
-      regions: Set[ActorRef]): Props = {
-    Props(new RebalanceWorker(shard, shardRegionFrom, handOffTimeout, regions))
+      regions: Set[ActorRef],
+      isRebalance: Boolean): Props = {
+    Props(new RebalanceWorker(shard, shardRegionFrom, handOffTimeout, regions, isRebalance))
   }
 }

 /**
@@ -609,6 +629,7 @@ abstract class ShardCoordinator(
     settings: ClusterShardingSettings,
     allocationStrategy: ShardCoordinator.ShardAllocationStrategy)
     extends Actor {
+
   import ShardCoordinator._
   import ShardCoordinator.Internal._
   import ShardRegion.ShardId
@@ -639,6 +660,7 @@ abstract class ShardCoordinator(
   var regionTerminationInProgress = Set.empty[ActorRef]

   import context.dispatcher
+
   val rebalanceTask =
     context.system.scheduler.scheduleWithFixedDelay(rebalanceInterval, rebalanceInterval, self, RebalanceTick)
@@ -775,26 +797,32 @@ abstract class ShardCoordinator(
     case RebalanceDone(shard, ok) =>
       rebalanceWorkers -= sender()
-      if (ok)
-        log.debug("Rebalance shard [{}] completed successfully.", shard)
-      else
-        log.warning("Rebalance shard [{}] didn't complete within [{}].", shard, handOffTimeout.pretty)
-      // The shard could have been removed by ShardRegionTerminated
-      if (state.shards.contains(shard)) {
-        if (ok) {
+
+      if (ok) {
+        log.debug("Shard [{}] deallocation completed successfully.", shard)
+
+        // The shard could have been removed by ShardRegionTerminated
+        if (state.shards.contains(shard)) {
           update(ShardHomeDeallocated(shard)) { evt =>
-            log.debug("Shard [{}] deallocated after rebalance", shard)
+            log.debug("Shard [{}] deallocated after", shard)
             state = state.updated(evt)
             clearRebalanceInProgress(shard)
             allocateShardHomesForRememberEntities()
             self.tell(GetShardHome(shard), ignoreRef)
           }
         } else {
-          // rebalance not completed, graceful shutdown will be retried
-          gracefulShutdownInProgress -= state.shards(shard)
           clearRebalanceInProgress(shard)
         }
       } else {
+        log.warning("Shard [{}] deallocation didn't complete within [{}].", shard, handOffTimeout.pretty)
+        // was that due to a graceful region shutdown?
+        // if so, consider the region as still alive and let it retry to gracefully shutdown later
+        state.shards.get(shard).foreach { region =>
+          gracefulShutdownInProgress -= region
+        }
         clearRebalanceInProgress(shard)
       }
@@ -813,7 +841,8 @@ abstract class ShardCoordinator(
           log.debug("Graceful shutdown of region [{}] with [{}] shards", region, shards.size)
         }
         gracefulShutdownInProgress += region
-        continueRebalance(shards.toSet)
+        shutdownShards(region, shards.toSet)
+
       case None =>
         log.debug("Unknown region requested graceful shutdown [{}]", region)
     }
@@ -1039,7 +1068,7 @@ abstract class ShardCoordinator(
       state.shards.get(shard) match {
         case Some(ref) => getShardHomeSender ! ShardHome(shard, ref)
         case None =>
-          if (state.regions.contains(region) && !gracefulShutdownInProgress.contains(region) && !regionTerminationInProgress
+          if (state.regions.contains(region) && !gracefulShutdownInProgress(region) && !regionTerminationInProgress
                 .contains(region)) {
             update(ShardHomeAllocated(shard, region)) { evt =>
               state = state.updated(evt)
@@ -1073,6 +1102,27 @@ abstract class ShardCoordinator(
     else region.path.address
   }

+  /**
+   * Start a RebalanceWorker to manage the shard rebalance.
+   * Does nothing if the shard is already in the process of being rebalanced.
+   */
+  private def startShardRebalanceIfNeeded(
+      shard: String,
+      from: ActorRef,
+      handOffTimeout: FiniteDuration,
+      isRebalance: Boolean): Unit = {
+    if (!rebalanceInProgress.contains(shard)) {
+      rebalanceInProgress = rebalanceInProgress.updated(shard, Set.empty)
+      rebalanceWorkers += context.actorOf(
+        rebalanceWorkerProps(
+          shard,
+          from,
+          handOffTimeout,
+          state.regions.keySet.union(state.regionProxies),
+          isRebalance = isRebalance).withDispatcher(context.props.dispatcher))
+    }
+  }
+
   def continueRebalance(shards: Set[ShardId]): Unit = {
     if ((log: BusLogging).isInfoEnabled && (shards.nonEmpty || rebalanceInProgress.nonEmpty)) {
       log.info(
@@ -1081,25 +1131,27 @@ abstract class ShardCoordinator(
         rebalanceInProgress.keySet.mkString(","))
     }
     shards.foreach { shard =>
+      // optimisation: check if not already in progress before fetching region
       if (!rebalanceInProgress.contains(shard)) {
         state.shards.get(shard) match {
           case Some(rebalanceFromRegion) =>
-            rebalanceInProgress = rebalanceInProgress.updated(shard, Set.empty)
             log.debug("Rebalance shard [{}] from [{}]", shard, rebalanceFromRegion)
-            rebalanceWorkers += context.actorOf(
-              rebalanceWorkerProps(
-                shard,
-                rebalanceFromRegion,
-                handOffTimeout,
-                state.regions.keySet.union(state.regionProxies)).withDispatcher(context.props.dispatcher))
+            startShardRebalanceIfNeeded(shard, rebalanceFromRegion, handOffTimeout, isRebalance = true)
           case None =>
             log.debug("Rebalance of non-existing shard [{}] is ignored", shard)
         }
       }
     }
   }

+  def shutdownShards(shuttingDownRegion: ActorRef, shards: Set[ShardId]): Unit = {
+    if ((log: BusLogging).isInfoEnabled && (shards.nonEmpty)) {
+      log.info("Starting shutting down shards [{}] due to region shutting down.", shards.mkString(","))
+    }
+    shards.foreach { shard =>
+      startShardRebalanceIfNeeded(shard, shuttingDownRegion, handOffTimeout, isRebalance = false)
+    }
+  }
+
 }

 /**
@@ -1117,6 +1169,7 @@ class PersistentShardCoordinator(
     allocationStrategy: ShardCoordinator.ShardAllocationStrategy)
     extends ShardCoordinator(settings, allocationStrategy)
     with PersistentActor {
+
   import ShardCoordinator.Internal._
   import settings.tuningParameters._
@@ -1229,9 +1282,13 @@ class PersistentShardCoordinator(
  */
 @InternalApi
 private[akka] object DDataShardCoordinator {
+
   private case object RememberEntitiesStoreStopped
+
   private case class RememberEntitiesTimeout(shardId: ShardId)
+
   private case object RememberEntitiesLoadTimeout
+
   private val RememberEntitiesTimeoutKey = "RememberEntityTimeout"
 }
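
The allocateShard and rebalance contracts documented in the hunks above are what pluggable allocation strategies implement. A minimal sketch against that contract (illustrative only, not part of this commit): it allocates each new shard to the region that currently hosts the fewest shards and never asks for a rebalance.

    import scala.collection.immutable
    import scala.concurrent.Future
    import akka.actor.ActorRef
    import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
    import akka.cluster.sharding.ShardRegion.ShardId

    // Sketch: pick the least-loaded region for new shards, skip rebalancing entirely.
    class LeastShardsAllocationSketch extends ShardAllocationStrategy {

      override def allocateShard(
          requester: ActorRef,
          shardId: ShardId,
          currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]]): Future[ActorRef] = {
        // Assumes at least one region is registered (minBy throws on an empty map).
        val (leastLoadedRegion, _) = currentShardAllocations.minBy { case (_, shards) => shards.size }
        Future.successful(leastLoadedRegion)
      }

      override def rebalance(
          currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
          rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] =
        // Returning an empty set skips rebalance for this round.
        Future.successful(Set.empty)
    }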

Changed file: ShardRegion.scala

@@ -441,8 +441,23 @@ object ShardRegion {
   @InternalApi
   private[akka] final class ShardRegionStatus(val typeName: String, val registeredWithCoordinator: Boolean)

+  /**
+   * Periodic tick to run some house-keeping.
+   * This message is continuously sent to `self` using a timer configured with `retryInterval`.
+   */
   private case object Retry extends ShardRegionCommand

+  /**
+   * Similar to Retry, but used only when the ShardRegion is starting and when we detect that
+   * the coordinator is moving.
+   *
+   * This is to ensure that a ShardRegion can register as soon as possible while the
+   * ShardCoordinator is in the process of recovering its state.
+   *
+   * This message is sent to `self` at an interval lower than [[Retry]] (higher frequency).
+   * The interval increases exponentially until it equals `retryInterval`, at which point
+   * we stop scheduling it and let [[Retry]] take over.
+   */
   private case object RegisterRetry extends ShardRegionCommand

   /**
@@ -755,7 +770,7 @@ private[akka] class ShardRegion(
       context.watch(coord)
       coordinator = Some(coord)
       finishRegistration()
-      requestShardBufferHomes()
+      tryRequestShardBufferHomes()

     case BeginHandOff(shard) =>
       log.debug("{}: BeginHandOff shard [{}]", typeName, shard)
@@ -796,18 +811,26 @@ private[akka] class ShardRegion(
   }

   def receiveCommand(cmd: ShardRegionCommand): Unit = cmd match {
     case Retry =>
-      sendGracefulShutdownToCoordinator()
-      if (shardBuffers.nonEmpty)
-        retryCount += 1
-      if (coordinator.isEmpty)
-        register()
+      // retryCount is used to avoid flooding the logs
+      // it's used inside register() whenever shardBuffers.nonEmpty
+      // therefore we update it if needed on each Retry msg
+      // the reason why it's updated here is because we don't want to increase it on each RegisterRetry, only on Retry
+      if (shardBuffers.nonEmpty) retryCount += 1
+
+      // we depend on the coordinator each time, if empty we need to register
+      // otherwise we can try to deliver some buffered messages
+      if (coordinator.isEmpty) register()
       else {
-        requestShardBufferHomes()
+        // Note: we do try to deliver buffered messages even in the middle of
+        // a graceful shutdown; every message that we manage to deliver is a win
+        tryRequestShardBufferHomes()
       }
-      tryCompleteGracefulShutdown()
+
+      // eventually, also re-trigger a graceful shutdown if one is in progress
+      sendGracefulShutdownToCoordinatorIfInProgress()
+      tryCompleteGracefulShutdownIfInProgress()

     case RegisterRetry =>
       if (coordinator.isEmpty) {
@@ -818,8 +841,8 @@ private[akka] class ShardRegion(
     case GracefulShutdown =>
       log.debug("{}: Starting graceful shutdown of region and all its shards", typeName)
       gracefulShutdownInProgress = true
-      sendGracefulShutdownToCoordinator()
-      tryCompleteGracefulShutdown()
+      sendGracefulShutdownToCoordinatorIfInProgress()
+      tryCompleteGracefulShutdownIfInProgress()

     case _ => unhandled(cmd)
   }
@@ -882,7 +905,9 @@ private[akka] class ShardRegion(
         }
       }

-      tryCompleteGracefulShutdown()
+      // did this shard get removed because the ShardRegion is shutting down?
+      // If so, we can try to speed-up the region shutdown. We don't need to wait for the next tick.
+      tryCompleteGracefulShutdownIfInProgress()
     }
   }
@@ -931,7 +956,7 @@ private[akka] class ShardRegion(
       case Failure(_) => Success(Left(shardId))
     }

-  private def tryCompleteGracefulShutdown() =
+  private def tryCompleteGracefulShutdownIfInProgress(): Unit =
     if (gracefulShutdownInProgress && shards.isEmpty && shardBuffers.isEmpty) {
       context.stop(self) // all shards have been rebalanced, complete graceful shutdown
     }
@@ -1007,32 +1032,36 @@ private[akka] class ShardRegion(
   def registrationMessage: Any =
     if (entityProps.isDefined) Register(self) else RegisterProxy(self)

-  def requestShardBufferHomes(): Unit = {
-    // Have to use vars because MessageBufferMap has no map, only foreach
-    var totalBuffered = 0
-    var shards = List.empty[String]
-    shardBuffers.foreach {
-      case (shard, buf) =>
-        coordinator.foreach { c =>
+  /**
+   * Send GetShardHome for all shards with buffered messages
+   * If coordinator is empty, nothing happens
+   */
+  def tryRequestShardBufferHomes(): Unit = {
+    coordinator.foreach { coord =>
+      // Have to use vars because MessageBufferMap has no map, only foreach
+      var totalBuffered = 0
+      var shards = List.empty[String]
+      shardBuffers.foreach {
+        case (shard, buf) =>
           totalBuffered += buf.size
           shards ::= shard
           log.debug(
-            "{}: Retry request for shard [{}] homes from coordinator at [{}]. [{}] buffered messages.",
+            "{}: Requesting shard home for [{}] from coordinator at [{}]. [{}] buffered messages.",
             typeName,
             shard,
-            c,
+            coord,
             buf.size)
-          c ! GetShardHome(shard)
-        }
-    }
+          coord ! GetShardHome(shard)
+      }

       if (retryCount >= 5 && retryCount % 5 == 0 && log.isWarningEnabled) {
         log.warning(
-          "{}: Retry request for shards [{}] homes from coordinator. [{}] total buffered messages. Coordinator [{}]",
+          "{}: Requested shard homes [{}] from coordinator at [{}]. [{}] total buffered messages.",
           typeName,
           shards.sorted.mkString(","),
-          totalBuffered,
-          coordinator)
+          coord,
+          totalBuffered)
       }
+    }
   }
@@ -1189,10 +1218,11 @@ private[akka] class ShardRegion(
     }
   }

-  def sendGracefulShutdownToCoordinator(): Unit = {
+  def sendGracefulShutdownToCoordinatorIfInProgress(): Unit = {
     if (gracefulShutdownInProgress) {
-      log.debug("Sending graceful shutdown to {}", coordinatorSelection)
-      coordinatorSelection.foreach(_ ! GracefulShutdownReq(self))
+      val actorSelections = coordinatorSelection
+      log.debug("Sending graceful shutdown to {}", actorSelections)
+      actorSelections.foreach(_ ! GracefulShutdownReq(self))
     }
   }
 }
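
The region-shutdown path that the renamed helpers above support is normally triggered by sending ShardRegion.GracefulShutdown to a running region (Akka's coordinated shutdown does this as part of leaving the cluster). A minimal sketch, assuming a shard region for an entity type named "Counter" has already been started on this node:

    import akka.actor.ActorSystem
    import akka.cluster.sharding.{ ClusterSharding, ShardRegion }

    object GracefulRegionShutdownSketch {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem("sharding")
        // Assumes the "Counter" region was already started with ClusterSharding(system).start(...)
        val counterRegion = ClusterSharding(system).shardRegion("Counter")
        // Ask the region to hand off all of its shards and stop itself once they are gone.
        counterRegion ! ShardRegion.GracefulShutdown
      }
    }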

Changed file: MultiNodeClusterShardingSpec.scala

@@ -189,7 +189,7 @@ abstract class MultiNodeClusterShardingSpec(val config: MultiNodeClusterSharding
   /**
    * {{{
-   * startPersistenceIfNotDdataMode(startOn = first, setStoreOn = Seq(first, second, third))
+   * startPersistenceIfNeeded(startOn = first, setStoreOn = Seq(first, second, third))
    * }}}
    *
    * @param startOn the node to start the `SharedLeveldbStore` store on