From 5960924afa2fbca353351244bb300b770f150522 Mon Sep 17 00:00:00 2001 From: filipMatusak Date: Tue, 16 Apr 2019 13:55:08 +0200 Subject: [PATCH] ClusterSharding: increasing retry interval for registration call, #25191 --- .../mima-filters/2.5.22.backwards.excludes | 4 ++ .../akka/cluster/sharding/ShardRegion.scala | 44 ++++++++++++++++--- 2 files changed, 42 insertions(+), 6 deletions(-) diff --git a/akka-cluster-sharding/src/main/mima-filters/2.5.22.backwards.excludes b/akka-cluster-sharding/src/main/mima-filters/2.5.22.backwards.excludes index ce07e26479..ca5a132e91 100644 --- a/akka-cluster-sharding/src/main/mima-filters/2.5.22.backwards.excludes +++ b/akka-cluster-sharding/src/main/mima-filters/2.5.22.backwards.excludes @@ -3,3 +3,7 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Rememb ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardCoordinator.this") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.RememberEntityStarter.props") ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.RemoveInternalClusterShardingData.remove") + +# #25191 +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion.retryTask") + diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index 648e0d15df..cc0818b07f 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -347,6 +347,8 @@ object ShardRegion { private case object Retry extends ShardRegionCommand + private case object RegisterRetry extends ShardRegionCommand + /** * When an remembering entities and the shard stops unexpected (e.g. persist failure), we * restart it after a back off using this message. 
@@ -442,7 +444,8 @@ private[akka] class ShardRegion( replicator: ActorRef, majorityMinCap: Int) extends Actor - with ActorLogging { + with ActorLogging + with Timers { import ShardCoordinator.Internal._ import ShardRegion._ @@ -466,8 +469,9 @@ private[akka] class ShardRegion( var gracefulShutdownInProgress = false import context.dispatcher - val retryTask = context.system.scheduler.schedule(retryInterval, retryInterval, self, Retry) var retryCount = 0 + val initRegistrationDelay: FiniteDuration = 100.millis.max(retryInterval / 2 / 2 / 2) + var nextRegistrationDelay: FiniteDuration = initRegistrationDelay // for CoordinatedShutdown val gracefulShutdownProgress = Promise[Done]() @@ -484,6 +488,8 @@ private[akka] class ShardRegion( // subscribe to MemberEvent, re-subscribe when restart override def preStart(): Unit = { cluster.subscribe(self, classOf[MemberEvent]) + timers.startPeriodicTimer(Retry, Retry, retryInterval) + startRegistration() if (settings.passivateIdleEntityAfter > Duration.Zero) log.info( "{}: Idle entities will be passivated after [{}]", @@ -495,7 +501,6 @@ private[akka] class ShardRegion( super.postStop() cluster.unsubscribe(self) gracefulShutdownProgress.trySuccess(Done) - retryTask.cancel() } // when using proxy the data center can be different from the own data center @@ -532,7 +537,7 @@ private[akka] class ShardRegion( before.map(_.address).getOrElse(""), after.map(_.address).getOrElse("")) coordinator = None - register() + startRegistration() } } @@ -614,6 +619,7 @@ private[akka] class ShardRegion( case RegisterAck(coord) => context.watch(coord) coordinator = Some(coord) + finishRegistration() requestShardBufferHomes() case BeginHandOff(shard) => @@ -662,6 +668,12 @@ private[akka] class ShardRegion( tryCompleteGracefulShutdown() + case RegisterRetry => + if (coordinator.isEmpty) { + register() + scheduleNextRegistration() + } + case GracefulShutdown => log.debug("{}: Starting graceful shutdown of region and all its shards", typeName) 
gracefulShutdownInProgress = true @@ -691,9 +703,10 @@ private[akka] class ShardRegion( } def receiveTerminated(ref: ActorRef): Unit = { - if (coordinator.contains(ref)) + if (coordinator.contains(ref)) { coordinator = None - else if (regions.contains(ref)) { + startRegistration() + } else if (regions.contains(ref)) { val shards = regions(ref) regionByShard --= shards regions -= ref @@ -758,6 +771,25 @@ private[akka] class ShardRegion( context.stop(self) // all shards have been rebalanced, complete graceful shutdown } + def startRegistration(): Unit = { + nextRegistrationDelay = initRegistrationDelay + + register() + scheduleNextRegistration() + } + + def scheduleNextRegistration(): Unit = { + if (nextRegistrationDelay < retryInterval) { + timers.startSingleTimer(RegisterRetry, RegisterRetry, nextRegistrationDelay) + // exponentially increasing retry interval until reaching the normal retryInterval + nextRegistrationDelay *= 2 + } + } + + def finishRegistration(): Unit = { + timers.cancel(RegisterRetry) + } + def register(): Unit = { coordinatorSelection.foreach(_ ! registrationMessage) if (shardBuffers.nonEmpty && retryCount >= 5) coordinatorSelection match {