RebalanceWorker should watch shard regions (#27261)

* RebalanceWorker should watch shard regions

Fixes #27259.

The RebalanceWorker actor needs to watch the shard regions from which it
expects a BeginHandOffAck message, in case a ShardRegion shuts down before
it can receive the BeginHandOff message, which would prevent the hand-off
from completing. This can happen when two nodes are shut down at about the
same time.
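
A minimal sketch of the pattern (illustrative names, not the actual Akka
internals): the worker deathwatches every region that might stop before
replying, and treats Terminated as an implicit ack so the hand-off can
still complete.

import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }

object HandOffWorker {
  final case class BeginHandOff(shard: String)
  final case class BeginHandOffAck(shard: String)

  def props(shard: String, regions: Set[ActorRef], shuttingDown: Set[ActorRef]): Props =
    Props(new HandOffWorker(shard, regions, shuttingDown))
}

// Hypothetical, simplified stand-in for the real RebalanceWorker.
class HandOffWorker(shard: String, regions: Set[ActorRef], shuttingDown: Set[ActorRef])
    extends Actor
    with ActorLogging {
  import HandOffWorker._

  // Watch the regions that may terminate before they can ack.
  shuttingDown.foreach(context.watch)
  regions.foreach(_ ! BeginHandOff(shard))
  private var remaining = regions

  def receive: Receive = {
    case BeginHandOffAck(`shard`) => acked(sender())
    case Terminated(region)       => acked(region) // a dead region counts as acked
  }

  private def acked(region: ActorRef): Unit = {
    context.unwatch(region)
    remaining -= region
    if (remaining.isEmpty) {
      log.debug("All regions acked for shard [{}]", shard)
      context.stop(self) // the real worker proceeds with HandOff here
    }
  }
}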
Author: James Roper, 2019-08-16 00:36:02 +10:00
Committed by: Patrik Nordwall
Parent: c1eb0719da
Commit: bbff92ade6
2 changed files with 34 additions and 11 deletions


@@ -4,6 +4,11 @@ ProblemFilters.exclude[Problem]("akka.cluster.sharding.Shard.*")
 # #25191
 ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion.retryTask")
+# Internal API change https://github.com/akka/akka/pull/27261
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardCoordinator#RebalanceWorker.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardCoordinator.rebalanceWorkerProps")
 # #27100 Productionize: GetShardRegionStats returns empty shard set on ask timeout
 # askAllShards, an internal function, was renamed and changed to query all or a subset of shards to try failures only
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion.askAllShards")
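
(For context: these ProblemFilters entries are for MiMa, the binary-compatibility
checker run in the Akka build. Adding the shuttingDownRegions constructor
parameter changes the signatures of RebalanceWorker's constructor and of
rebalanceWorkerProps; since both are internal API, as the comment above notes,
the resulting incompatibilities can safely be excluded.)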


@@ -424,9 +424,13 @@ object ShardCoordinator {
       shard: String,
       from: ActorRef,
       handOffTimeout: FiniteDuration,
-      regions: Set[ActorRef])
-      extends Actor {
+      regions: Set[ActorRef],
+      shuttingDownRegions: Set[ActorRef])
+      extends Actor
+      with ActorLogging {
     import Internal._
+    shuttingDownRegions.foreach(context.watch)
     regions.foreach(_ ! BeginHandOff(shard))
     var remaining = regions
@@ -435,14 +439,24 @@
     def receive = {
       case BeginHandOffAck(`shard`) =>
-        remaining -= sender()
-        if (remaining.isEmpty) {
-          from ! HandOff(shard)
-          context.become(stoppingShard, discardOld = true)
-        }
+        log.debug("BeginHandOffAck for shard [{}] received from {}.", shard, sender())
+        acked(sender())
+      case Terminated(shardRegion) =>
+        log.debug("ShardRegion {} terminated while waiting for BeginHandOffAck for shard [{}].", shardRegion, shard)
+        acked(shardRegion)
       case ReceiveTimeout => done(ok = false)
     }
+
+    private def acked(shardRegion: ActorRef) = {
+      context.unwatch(shardRegion)
+      remaining -= shardRegion
+      if (remaining.isEmpty) {
+        log.debug("All shard regions acked, handing off shard [{}].", shard)
+        from ! HandOff(shard)
+        context.become(stoppingShard, discardOld = true)
+      }
+    }
+
     def stoppingShard: Receive = {
       case ShardStopped(`shard`) => done(ok = true)
       case ReceiveTimeout        => done(ok = false)
@@ -458,9 +472,12 @@
       shard: String,
       from: ActorRef,
       handOffTimeout: FiniteDuration,
-      regions: Set[ActorRef]): Props =
-    Props(new RebalanceWorker(shard, from, handOffTimeout, regions))
+      regions: Set[ActorRef],
+      // Note: must be a subset of regions
+      shuttingDownRegions: Set[ActorRef]): Props = {
+    require(shuttingDownRegions.size <= regions.size, "'shuttingDownRegions' must be a subset of 'regions'.")
+    Props(new RebalanceWorker(shard, from, handOffTimeout, regions, shuttingDownRegions))
+  }
 }

 /**
@@ -886,7 +903,8 @@ abstract class ShardCoordinator(
               shard,
               rebalanceFromRegion,
               handOffTimeout,
-              state.regions.keySet.union(state.regionProxies)).withDispatcher(context.props.dispatcher))
+              state.regions.keySet.union(state.regionProxies),
+              gracefulShutdownInProgress).withDispatcher(context.props.dispatcher))
         case None =>
           log.debug("Rebalance of non-existing shard [{}] is ignored", shard)
       }
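
Two details worth noting. First, the require guard in rebalanceWorkerProps only
compares set sizes, a cheap sanity check; the caller is trusted to uphold the
"must be a subset of regions" contract stated in the comment (a full check
would be shuttingDownRegions.subsetOf(regions)). Second, the fix relies on
Akka's deathwatch guarantee that Terminated is delivered even when the watched
actor is already stopped at the time context.watch is called, so there is no
race between a region terminating and the worker starting to watch it. A
standalone sketch of that guarantee (illustrative demo, not part of the
commit):

import akka.actor.{ Actor, ActorSystem, PoisonPill, Props, Terminated }

object WatchDemo extends App {
  val system = ActorSystem("demo")
  val region = system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior }), "region")
  region ! PoisonPill // the region stops before anyone watches it

  system.actorOf(Props(new Actor {
    context.watch(region) // watching an already-dead actor still delivers Terminated
    def receive = {
      case Terminated(`region`) =>
        println("Terminated received, hand-off can proceed")
        context.system.terminate()
    }
  }))
}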