ClusterSharding: increasing retry interval for registration call, #25191

Authored by filipMatusak, 2019-04-16 13:55:08 +02:00; committed by Patrik Nordwall
parent 97756361f3
commit 5960924afa
2 changed files with 42 additions and 6 deletions


@@ -3,3 +3,7 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Rememb
 ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardCoordinator.this")
 ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.RememberEntityStarter.props")
 ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.RemoveInternalClusterShardingData.remove")
+# #25191
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion.retryTask")


@@ -347,6 +347,8 @@ object ShardRegion {
   private case object Retry extends ShardRegionCommand

+  private case object RegisterRetry extends ShardRegionCommand
+
   /**
    * When remembering entities is enabled and a shard stops unexpectedly (e.g. persist failure), we
    * restart it after a back off using this message.
@@ -442,7 +444,8 @@ private[akka] class ShardRegion(
     replicator: ActorRef,
     majorityMinCap: Int)
     extends Actor
-    with ActorLogging {
+    with ActorLogging
+    with Timers {

   import ShardCoordinator.Internal._
   import ShardRegion._
@@ -466,8 +469,9 @@ private[akka] class ShardRegion(
   var gracefulShutdownInProgress = false

   import context.dispatcher
-  val retryTask = context.system.scheduler.schedule(retryInterval, retryInterval, self, Retry)
   var retryCount = 0
+  val initRegistrationDelay: FiniteDuration = 100.millis.max(retryInterval / 2 / 2 / 2)
+  var nextRegistrationDelay: FiniteDuration = initRegistrationDelay

   // for CoordinatedShutdown
   val gracefulShutdownProgress = Promise[Done]()
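
The initial registration delay is deliberately short: an eighth of the configured retry interval, floored at 100 ms. A minimal sketch of that arithmetic, assuming the stock akka.cluster.sharding.retry-interval of 2 seconds (the default is not shown in this diff):

import scala.concurrent.duration._

// Same formula as initRegistrationDelay above; the 2-second value
// is an assumed default, not taken from this commit.
val retryInterval: FiniteDuration = 2.seconds
val initRegistrationDelay: FiniteDuration = 100.millis.max(retryInterval / 2 / 2 / 2)
// retryInterval / 8 = 250.millis, above the 100.millis floor,
// so the first RegisterRetry fires 250 ms after the initial register().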
@@ -484,6 +488,8 @@ private[akka] class ShardRegion(
   // subscribe to MemberEvent, re-subscribe when restart
   override def preStart(): Unit = {
     cluster.subscribe(self, classOf[MemberEvent])
+    timers.startPeriodicTimer(Retry, Retry, retryInterval)
+    startRegistration()
     if (settings.passivateIdleEntityAfter > Duration.Zero)
       log.info(
         "{}: Idle entities will be passivated after [{}]",
@@ -495,7 +501,6 @@ private[akka] class ShardRegion(
     super.postStop()
     cluster.unsubscribe(self)
     gracefulShutdownProgress.trySuccess(Done)
-    retryTask.cancel()
   }

   // when using proxy the data center can be different from the own data center
@@ -532,7 +537,7 @@ private[akka] class ShardRegion(
           before.map(_.address).getOrElse(""),
           after.map(_.address).getOrElse(""))
         coordinator = None
-        register()
+        startRegistration()
       }
   }
@@ -614,6 +619,7 @@ private[akka] class ShardRegion(
     case RegisterAck(coord) =>
       context.watch(coord)
       coordinator = Some(coord)
+      finishRegistration()
       requestShardBufferHomes()

     case BeginHandOff(shard) =>
@@ -662,6 +668,12 @@ private[akka] class ShardRegion(
       tryCompleteGracefulShutdown()

+    case RegisterRetry =>
+      if (coordinator.isEmpty) {
+        register()
+        scheduleNextRegistration()
+      }
+
     case GracefulShutdown =>
       log.debug("{}: Starting graceful shutdown of region and all its shards", typeName)
       gracefulShutdownInProgress = true
@@ -691,9 +703,10 @@ private[akka] class ShardRegion(
   }

   def receiveTerminated(ref: ActorRef): Unit = {
-    if (coordinator.contains(ref))
+    if (coordinator.contains(ref)) {
       coordinator = None
-    else if (regions.contains(ref)) {
+      startRegistration()
+    } else if (regions.contains(ref)) {
       val shards = regions(ref)
       regionByShard --= shards
       regions -= ref
@@ -758,6 +771,25 @@ private[akka] class ShardRegion(
       context.stop(self) // all shards have been rebalanced, complete graceful shutdown
   }

+  def startRegistration(): Unit = {
+    nextRegistrationDelay = initRegistrationDelay
+
+    register()
+    scheduleNextRegistration()
+  }
+
+  def scheduleNextRegistration(): Unit = {
+    if (nextRegistrationDelay < retryInterval) {
+      timers.startSingleTimer(RegisterRetry, RegisterRetry, nextRegistrationDelay)
+      // exponentially increasing retry interval until reaching the normal retryInterval
+      nextRegistrationDelay *= 2
+    }
+  }
+
+  def finishRegistration(): Unit = {
+    timers.cancel(RegisterRetry)
+  }
+
   def register(): Unit = {
     coordinatorSelection.foreach(_ ! registrationMessage)
     if (shardBuffers.nonEmpty && retryCount >= 5) coordinatorSelection match {
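
Taken together, these helpers form an exponential backoff for coordinator registration: startRegistration resets the delay and registers immediately, each RegisterRetry re-registers and doubles the delay, and finishRegistration cancels the timer once RegisterAck arrives. A standalone sketch of the resulting schedule, not the actor code itself, again assuming a 2-second retryInterval:

import scala.concurrent.duration._

// Model of the schedule produced by startRegistration/scheduleNextRegistration:
// delays double from the initial value and stop once they reach retryInterval,
// at which point the periodic Retry timer is the only retry mechanism left.
def registrationDelays(retryInterval: FiniteDuration): List[FiniteDuration] = {
  val init = 100.millis.max(retryInterval / 2 / 2 / 2)
  Iterator.iterate(init)(_ * 2).takeWhile(_ < retryInterval).toList
}

registrationDelays(2.seconds)
// => List(250 milliseconds, 500 milliseconds, 1 second)
// i.e. RegisterRetry fires after 250 ms, 500 ms and 1 s, then no further
// single timers are scheduled.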