Avoid stuck rebalances during coordinator leaving (#29385)

* Watch all regions as they may shut down after rebalance starts

* Send graceful shutdown to the coordinator selection if no coordinator is found

* mima: add filters for the changed internal signatures

* Add missing new line

* Make log markers consistent for rebalance worker
Christopher Batey 2020-07-17 12:44:12 +01:00 committed by GitHub
parent b5d5dd2d2b
commit 212de410b0
3 changed files with 21 additions and 15 deletions
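
The essence of the fix, condensed from the ShardCoordinator hunks below: the rebalance worker now watches every region it sends BeginHandOff to, not only the regions already known to be shutting down, and a Terminated region counts as an acknowledgement. A region that stops after the rebalance has started therefore no longer leaves the worker stuck until the hand-off timeout. The following is a simplified, self-contained sketch of that pattern, not the actual akka implementation; the class and message names are illustrative stand-ins, and the real worker proceeds with the hand-off once every region has acked or terminated.

import scala.concurrent.duration.FiniteDuration
import akka.actor.{ Actor, ActorLogging, ActorRef, ReceiveTimeout, Terminated, Timers }

// Illustrative stand-ins for the real BeginHandOff / BeginHandOffAck messages.
final case class BeginHandOff(shard: String)
final case class BeginHandOffAck(shard: String)

// Sketch: watch every region up front and treat Terminated like an ack.
class AckOrTerminateWorker(shard: String, regions: Set[ActorRef], handOffTimeout: FiniteDuration)
    extends Actor
    with ActorLogging
    with Timers {

  regions.foreach { region =>
    context.watch(region) // previously only the shutting-down regions were watched
    region ! BeginHandOff(shard)
  }
  private var remaining: Set[ActorRef] = regions

  timers.startSingleTimer("hand-off-timeout", ReceiveTimeout, handOffTimeout)

  def receive: Receive = {
    case BeginHandOffAck(`shard`) => acked(sender())
    case Terminated(region)       => acked(region) // a stopped region can never ack, so count it as done
    case ReceiveTimeout =>
      log.debug("Rebalance of [{}] timed out", shard)
      context.stop(self)
  }

  private def acked(region: ActorRef): Unit = {
    remaining -= region
    if (remaining.isEmpty) {
      log.debug("All regions acked BeginHandOff for [{}]", shard)
      context.stop(self) // the real worker continues with the hand-off here instead of stopping
    }
  }
}

Treating Terminated as an ack is safe because a stopped region can never answer BeginHandOff, so waiting for it would only delay the rebalance until the timeout.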

akka-cluster-sharding MiMa filter file (new file)

@@ -0,0 +1,2 @@
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardCoordinator#RebalanceWorker.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardCoordinator.rebalanceWorkerProps")
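
These two entries tell MiMa (the binary-compatibility checker run by the build) to ignore the changed signatures of the internal RebalanceWorker constructor and the rebalanceWorkerProps factory, which both lose the shuttingDownRegions parameter in this commit. For context only, a sketch of the equivalent inline configuration with sbt-mima-plugin; the previous-artifact version below is illustrative and not taken from this commit:

// build.sbt (sketch, assuming sbt-mima-plugin is enabled for the akka-cluster-sharding project)
import com.typesafe.tools.mima.core._

// Compare the current code against a previously released artifact (version illustrative).
mimaPreviousArtifacts := Set("com.typesafe.akka" %% "akka-cluster-sharding" % "2.6.7")

// Inline equivalent of the two filter-file entries added by this commit.
mimaBinaryIssueFilters ++= Seq(
  ProblemFilters.exclude[DirectMissingMethodProblem](
    "akka.cluster.sharding.ShardCoordinator#RebalanceWorker.this"),
  ProblemFilters.exclude[DirectMissingMethodProblem](
    "akka.cluster.sharding.ShardCoordinator.rebalanceWorkerProps"))

The repository keeps such filters in versioned .excludes files instead of the build definition, which is what the new file above is.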

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala

@@ -460,27 +460,32 @@ object ShardCoordinator {
       shard: String,
       from: ActorRef,
       handOffTimeout: FiniteDuration,
-      regions: Set[ActorRef],
-      shuttingDownRegions: Set[ActorRef])
+      regions: Set[ActorRef])
       extends Actor
       with ActorLogging
       with Timers {
     import Internal._
-    shuttingDownRegions.foreach(context.watch)
-    regions.foreach(_ ! BeginHandOff(shard))
+    regions.foreach { region =>
+      context.watch(region)
+      region ! BeginHandOff(shard)
+    }
     var remaining = regions
+    log.debug("Rebalance [{}] from region [{}]", shard, regions)
     timers.startSingleTimer("hand-off-timeout", ReceiveTimeout, handOffTimeout)

     def receive = {
       case BeginHandOffAck(`shard`) =>
-        log.debug("BeginHandOffAck for shard [{}] received from {}.", shard, sender())
+        log.debug("BeginHandOffAck for shard [{}] received from [{}].", shard, sender())
         acked(sender())
       case Terminated(shardRegion) =>
-        log.debug("ShardRegion {} terminated while waiting for BeginHandOffAck for shard [{}].", shardRegion, shard)
+        log.debug("ShardRegion [{}] terminated while waiting for BeginHandOffAck for shard [{}].", shardRegion, shard)
         acked(shardRegion)
-      case ReceiveTimeout => done(ok = false)
+      case ReceiveTimeout =>
+        log.debug("Rebalance of [{}] from [{}] timed out", shard, from)
+        done(ok = false)
     }

     private def acked(shardRegion: ActorRef) = {
@@ -508,11 +513,8 @@ object ShardCoordinator {
       shard: String,
       from: ActorRef,
       handOffTimeout: FiniteDuration,
-      regions: Set[ActorRef],
-      // Note: must be a subset of regions
-      shuttingDownRegions: Set[ActorRef]): Props = {
-    require(shuttingDownRegions.size <= regions.size, "'shuttingDownRegions' must be a subset of 'regions'.")
-    Props(new RebalanceWorker(shard, from, handOffTimeout, regions, shuttingDownRegions))
+      regions: Set[ActorRef]): Props = {
+    Props(new RebalanceWorker(shard, from, handOffTimeout, regions))
   }
 }
@@ -715,6 +717,7 @@ abstract class ShardCoordinator(
             gracefulShutdownInProgress += region
             continueRebalance(shards.toSet)
           case None =>
+            log.debug("Unknown region requested graceful shutdown [{}]", region)
         }

     case ShardRegion.GetClusterShardingStats(waitMax) =>
@@ -976,8 +979,7 @@ abstract class ShardCoordinator(
               shard,
               rebalanceFromRegion,
               handOffTimeout,
-              state.regions.keySet.union(state.regionProxies),
-              gracefulShutdownInProgress).withDispatcher(context.props.dispatcher))
+              state.regions.keySet.union(state.regionProxies)).withDispatcher(context.props.dispatcher))
         case None =>
           log.debug("Rebalance of non-existing shard [{}] is ignored", shard)
       }

akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala

@@ -1140,7 +1140,9 @@ private[akka] class ShardRegion(
   }

   def sendGracefulShutdownToCoordinator(): Unit = {
-    if (gracefulShutdownInProgress)
+    if (gracefulShutdownInProgress) {
+      log.debug("Sending graceful shutdown to {}", coordinatorSelection)
       coordinatorSelection.foreach(_ ! GracefulShutdownReq(self))
+    }
   }
 }
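
The guarded, now logged, send above is what a region repeats while it is shutting down gracefully; per the commit notes it goes to the coordinator actor selection even when no concrete coordinator ref is known yet. Below is a small self-contained sketch of that retry pattern with illustrative names, assuming the request is simply re-sent on a timer; the real ShardRegion drives the retry from its own registration logic, which is outside this diff.

import scala.concurrent.duration._
import akka.actor.{ Actor, ActorLogging, ActorRef, ActorSelection, Timers }

// Illustrative stand-in for the real GracefulShutdownReq message.
final case class GracefulShutdownReq(region: ActorRef)
case object RetryGracefulShutdown

// Sketch: while a graceful shutdown is in progress, keep re-sending the request to
// whatever coordinator selection is currently known, logging each attempt.
class GracefulShutdownRetry(coordinatorSelection: Option[ActorSelection])
    extends Actor
    with ActorLogging
    with Timers {

  // Cleared once the coordinator confirms the hand-off; not modelled in this sketch.
  private var gracefulShutdownInProgress = true

  timers.startTimerWithFixedDelay("retry-graceful-shutdown", RetryGracefulShutdown, 2.seconds)

  def receive: Receive = {
    case RetryGracefulShutdown => sendGracefulShutdownToCoordinator()
  }

  private def sendGracefulShutdownToCoordinator(): Unit =
    if (gracefulShutdownInProgress) {
      log.debug("Sending graceful shutdown to {}", coordinatorSelection)
      coordinatorSelection.foreach(_ ! GracefulShutdownReq(self))
    }
}

In the real code the selection is re-resolved from cluster state as members change; a fixed Option is used here only to keep the sketch self-contained.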