diff --git a/akka-cluster-sharding/src/main/mima-filters/2.6.9.backwards.excludes/29549-distinct-shutdown-and-rebalance.excludes b/akka-cluster-sharding/src/main/mima-filters/2.6.9.backwards.excludes/29549-distinct-shutdown-and-rebalance.excludes
new file mode 100644
index 0000000000..4b10fbceb0
--- /dev/null
+++ b/akka-cluster-sharding/src/main/mima-filters/2.6.9.backwards.excludes/29549-distinct-shutdown-and-rebalance.excludes
@@ -0,0 +1,6 @@
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardCoordinator#RebalanceWorker.this")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardCoordinator.rebalanceWorkerProps")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.ShardCoordinator#RebalanceWorker.this")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.cluster.sharding.ShardCoordinator.rebalanceWorkerProps")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion.requestShardBufferHomes")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.ShardRegion.sendGracefulShutdownToCoordinator")
\ No newline at end of file
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
index 8bbbe2b2be..cad2e0c19f 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
@@ -30,8 +30,7 @@ import akka.cluster.sharding.internal.{
   RememberEntitiesProvider
 }
 import akka.dispatch.ExecutionContexts
-import akka.event.BusLogging
-import akka.event.Logging
+import akka.event.{ BusLogging, Logging }
 import akka.pattern.{ pipe, AskTimeoutException }
 import akka.persistence._
 import akka.util.PrettyDuration._
@@ -130,9 +129,10 @@ object ShardCoordinator {
     /**
      * Invoked when the location of a new shard is to be decided.
-     * @param requester actor reference to the [[ShardRegion]] that requested the location of the
-     *                  shard, can be returned if preference should be given to the node where the shard was first accessed
-     * @param shardId the id of the shard to allocate
+     *
+     * @param requester actor reference to the [[ShardRegion]] that requested the location of the
+     *                  shard, can be returned if preference should be given to the node where the shard was first accessed
+     * @param shardId the id of the shard to allocate
      * @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards,
      *                                in the order they were allocated
      * @return a `Future` of the actor ref of the [[ShardRegion]] that is to be responsible for the shard, must be one of
@@ -145,10 +145,11 @@ object ShardCoordinator {
     /**
      * Invoked periodically to decide which shards to rebalance to another location.
+     *
      * @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards,
      *                                in the order they were allocated
-     * @param rebalanceInProgress set of shards that are currently being rebalanced, i.e.
-     *                            you should not include these in the returned set
+     * @param rebalanceInProgress set of shards that are currently being rebalanced, i.e.
+     *                            you should not include these in the returned set
      * @return a `Future` of the shards to be migrated, may be empty to skip rebalance in this round
      */
     def rebalance(
@@ -214,9 +215,10 @@ object ShardCoordinator {
     /**
      * Invoked when the location of a new shard is to be decided.
-     * @param requester actor reference to the [[ShardRegion]] that requested the location of the
-     *                  shard, can be returned if preference should be given to the node where the shard was first accessed
-     * @param shardId the id of the shard to allocate
+     *
+     * @param requester actor reference to the [[ShardRegion]] that requested the location of the
+     *                  shard, can be returned if preference should be given to the node where the shard was first accessed
+     * @param shardId the id of the shard to allocate
      * @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards,
      *                                in the order they were allocated
      * @return a `Future` of the actor ref of the [[ShardRegion]] that is to be responsible for the shard, must be one of
@@ -229,10 +231,11 @@ object ShardCoordinator {
     /**
      * Invoked periodically to decide which shards to rebalance to another location.
+     *
      * @param currentShardAllocations all actor refs to `ShardRegion` and their current allocated shards,
      *                                in the order they were allocated
-     * @param rebalanceInProgress set of shards that are currently being rebalanced, i.e.
-     *                            you should not include these in the returned set
+     * @param rebalanceInProgress set of shards that are currently being rebalanced, i.e.
+     *                            you should not include these in the returned set
      * @return a `Future` of the shards to be migrated, may be empty to skip rebalance in this round
      */
     def rebalance(
@@ -539,10 +542,12 @@ object ShardCoordinator {
       shard: String,
       shardRegionFrom: ActorRef,
       handOffTimeout: FiniteDuration,
-      regions: Set[ActorRef])
+      regions: Set[ActorRef],
+      isRebalance: Boolean)
       extends Actor
       with ActorLogging
       with Timers {
+
     import Internal._

     regions.foreach { region =>
@@ -550,7 +555,14 @@
     }
     var remaining = regions

-    log.debug("Rebalance [{}] from [{}] regions", regions.size, shard)
+    if (isRebalance)
+      log.debug("Rebalance [{}] from [{}] regions", shard, regions.size)
+    else
+      log.debug(
+        "Shutting down shard [{}] from region [{}]. Asking [{}] region(s) to hand-off shard",
+        shard,
+        shardRegionFrom,
+        regions.size)

     timers.startSingleTimer("hand-off-timeout", ReceiveTimeout, handOffTimeout)
@@ -559,10 +571,16 @@
       log.debug("BeginHandOffAck for shard [{}] received from [{}].", shard, sender())
       acked(sender())
     case RebalanceWorker.ShardRegionTerminated(shardRegion) =>
-      log.debug("ShardRegion [{}] terminated while waiting for BeginHandOffAck for shard [{}].", shardRegion, shard)
-      acked(shardRegion)
+      if (remaining.contains(shardRegion)) {
+        log.debug("ShardRegion [{}] terminated while waiting for BeginHandOffAck for shard [{}].", shardRegion, shard)
+        acked(shardRegion)
+      }
     case ReceiveTimeout =>
-      log.debug("Rebalance of shard [{}] from [{}] timed out", shard, shardRegionFrom)
+      if (isRebalance)
+        log.debug("Rebalance of [{}] from [{}] timed out", shard, shardRegionFrom)
+      else
+        log.debug("Shutting down shard [{}] from [{}] timed out", shard, shardRegionFrom)
+
       done(ok = false)
   }
@@ -595,9 +613,11 @@ object ShardCoordinator {
       shard: String,
       shardRegionFrom: ActorRef,
       handOffTimeout: FiniteDuration,
-      regions: Set[ActorRef]): Props = {
-    Props(new RebalanceWorker(shard, shardRegionFrom, handOffTimeout, regions))
+      regions: Set[ActorRef],
+      isRebalance: Boolean): Props = {
+    Props(new RebalanceWorker(shard, shardRegionFrom, handOffTimeout, regions, isRebalance))
   }
+
 }

 /**
@@ -609,6 +629,7 @@ abstract class ShardCoordinator(
     settings: ClusterShardingSettings,
     allocationStrategy: ShardCoordinator.ShardAllocationStrategy)
     extends Actor {
+
   import ShardCoordinator._
   import ShardCoordinator.Internal._
   import ShardRegion.ShardId
@@ -639,6 +660,7 @@ abstract class ShardCoordinator(
   var regionTerminationInProgress = Set.empty[ActorRef]

   import context.dispatcher
+
   val rebalanceTask =
     context.system.scheduler.scheduleWithFixedDelay(rebalanceInterval, rebalanceInterval, self, RebalanceTick)
@@ -775,26 +797,32 @@ abstract class ShardCoordinator(
     case RebalanceDone(shard, ok) =>
       rebalanceWorkers -= sender()
-      if (ok)
-        log.debug("Rebalance shard [{}] completed successfully.", shard)
-      else
-        log.warning("Rebalance shard [{}] didn't complete within [{}].", shard, handOffTimeout.pretty)
-      // The shard could have been removed by ShardRegionTerminated
-      if (state.shards.contains(shard)) {
-        if (ok) {
+
+      if (ok) {
+        log.debug("Shard [{}] deallocation completed successfully.", shard)
+
+        // The shard could have been removed by ShardRegionTerminated
+        if (state.shards.contains(shard)) {
           update(ShardHomeDeallocated(shard)) { evt =>
-            log.debug("Shard [{}] deallocated after rebalance", shard)
+            log.debug("Shard [{}] deallocated after successful hand-off", shard)
             state = state.updated(evt)
             clearRebalanceInProgress(shard)
             allocateShardHomesForRememberEntities()
             self.tell(GetShardHome(shard), ignoreRef)
           }
         } else {
-          // rebalance not completed, graceful shutdown will be retried
-          gracefulShutdownInProgress -= state.shards(shard)
           clearRebalanceInProgress(shard)
         }
+      } else {
+        log.warning("Shard [{}] deallocation didn't complete within [{}].", shard, handOffTimeout.pretty)
+
+        // was that due to a graceful region shutdown?
+        // if so, consider the region as still alive and let it retry the graceful shutdown later
+        state.shards.get(shard).foreach { region =>
+          gracefulShutdownInProgress -= region
+        }
+        clearRebalanceInProgress(shard)
       }
@@ -813,7 +841,8 @@ abstract class ShardCoordinator(
           log.debug("Graceful shutdown of region [{}] with [{}] shards", region, shards.size)
         }
         gracefulShutdownInProgress += region
-        continueRebalance(shards.toSet)
+        shutdownShards(region, shards.toSet)
+
       case None =>
         log.debug("Unknown region requested graceful shutdown [{}]", region)
     }
@@ -1039,7 +1068,7 @@ abstract class ShardCoordinator(
       state.shards.get(shard) match {
         case Some(ref) => getShardHomeSender ! ShardHome(shard, ref)
         case None =>
-          if (state.regions.contains(region) && !gracefulShutdownInProgress.contains(region) && !regionTerminationInProgress
-              .contains(region)) {
+          if (state.regions.contains(region) && !gracefulShutdownInProgress(region) && !regionTerminationInProgress
+              .contains(region)) {
             update(ShardHomeAllocated(shard, region)) { evt =>
               state = state.updated(evt)
@@ -1073,6 +1102,27 @@ abstract class ShardCoordinator(
     else region.path.address
   }

+  /**
+   * Start a RebalanceWorker to manage the shard rebalance.
+   * Does nothing if the shard is already in the process of being rebalanced.
+   */
+  private def startShardRebalanceIfNeeded(
+      shard: String,
+      from: ActorRef,
+      handOffTimeout: FiniteDuration,
+      isRebalance: Boolean): Unit = {
+    if (!rebalanceInProgress.contains(shard)) {
+      rebalanceInProgress = rebalanceInProgress.updated(shard, Set.empty)
+      rebalanceWorkers += context.actorOf(
+        rebalanceWorkerProps(
+          shard,
+          from,
+          handOffTimeout,
+          state.regions.keySet.union(state.regionProxies),
+          isRebalance = isRebalance).withDispatcher(context.props.dispatcher))
+    }
+  }
+
   def continueRebalance(shards: Set[ShardId]): Unit = {
     if ((log: BusLogging).isInfoEnabled && (shards.nonEmpty || rebalanceInProgress.nonEmpty)) {
       log.info(
@@ -1081,25 +1131,27 @@ abstract class ShardCoordinator(
         rebalanceInProgress.keySet.mkString(","))
     }
     shards.foreach { shard =>
+      // optimisation: check if not already in progress before fetching region
       if (!rebalanceInProgress.contains(shard)) {
         state.shards.get(shard) match {
           case Some(rebalanceFromRegion) =>
-            rebalanceInProgress = rebalanceInProgress.updated(shard, Set.empty)
             log.debug("Rebalance shard [{}] from [{}]", shard, rebalanceFromRegion)
-            rebalanceWorkers += context.actorOf(
-              rebalanceWorkerProps(
-                shard,
-                rebalanceFromRegion,
-                handOffTimeout,
-                state.regions.keySet.union(state.regionProxies)).withDispatcher(context.props.dispatcher))
+            startShardRebalanceIfNeeded(shard, rebalanceFromRegion, handOffTimeout, isRebalance = true)
           case None => log.debug("Rebalance of non-existing shard [{}] is ignored", shard)
         }
-      }
     }
   }

+  def shutdownShards(shuttingDownRegion: ActorRef, shards: Set[ShardId]): Unit = {
+    if ((log: BusLogging).isInfoEnabled && shards.nonEmpty) {
+      log.info("Starting shutdown of shards [{}] due to region shutting down.", shards.mkString(","))
+    }
+    shards.foreach { shard =>
+      startShardRebalanceIfNeeded(shard, shuttingDownRegion, handOffTimeout, isRebalance = false)
+    }
+  }
 }

 /**
@@ -1117,6 +1169,7 @@ class PersistentShardCoordinator(
     settings: ClusterShardingSettings,
     allocationStrategy: ShardCoordinator.ShardAllocationStrategy)
     extends ShardCoordinator(settings, allocationStrategy)
     with PersistentActor {
+
   import ShardCoordinator.Internal._
   import settings.tuningParameters._
@@ -1229,9 +1282,13 @@ class PersistentShardCoordinator(
  */
 @InternalApi
 private[akka] object DDataShardCoordinator {
+  private case object RememberEntitiesStoreStopped
+  private case class RememberEntitiesTimeout(shardId: ShardId)
+  private case object RememberEntitiesLoadTimeout
+  private val RememberEntitiesTimeoutKey = "RememberEntityTimeout"
 }
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
index b2bc5a7f5e..862224a41a 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
@@ -441,8 +441,23 @@ object ShardRegion {
   @InternalApi
   private[akka] final class ShardRegionStatus(val typeName: String, val registeredWithCoordinator: Boolean)

+  /**
+   * Periodic tick to run some house-keeping.
+   * This message is continuously sent to `self` using a timer configured with `retryInterval`.
+   */
   private case object Retry extends ShardRegionCommand

+  /**
+   * Similar to Retry, but used only when the ShardRegion is starting and when we detect that
+   * the coordinator is moving.
+   *
+   * This is to ensure that a ShardRegion can register as soon as possible while the
+   * ShardCoordinator is in the process of recovering its state.
+   *
+   * This message is sent to `self` using an interval lower than [[Retry]] (i.e. at a higher frequency).
+   * The interval increases exponentially until it equals `retryInterval`, at which point
+   * we stop scheduling it and let [[Retry]] take over.
+   */
   private case object RegisterRetry extends ShardRegionCommand

   /**
@@ -755,7 +770,7 @@ private[akka] class ShardRegion(
       context.watch(coord)
       coordinator = Some(coord)
       finishRegistration()
-      requestShardBufferHomes()
+      tryRequestShardBufferHomes()

     case BeginHandOff(shard) =>
       log.debug("{}: BeginHandOff shard [{}]", typeName, shard)
@@ -796,18 +811,26 @@ private[akka] class ShardRegion(
   }

   def receiveCommand(cmd: ShardRegionCommand): Unit = cmd match {
-    case Retry =>
-      sendGracefulShutdownToCoordinator()
-      if (shardBuffers.nonEmpty)
-        retryCount += 1
-      if (coordinator.isEmpty)
-        register()
+    case Retry =>
+      // retryCount is used to avoid flooding the logs
+      // it's used inside register() whenever shardBuffers.nonEmpty
+      // therefore we update it if needed on each Retry msg
+      // the reason it's updated here is that we don't want to increase it on each RegisterRetry, only on Retry
+      if (shardBuffers.nonEmpty) retryCount += 1
+
+      // we depend on the coordinator each time: if it is empty we need to register,
+      // otherwise we can try to deliver some buffered messages
+      if (coordinator.isEmpty) register()
       else {
-        requestShardBufferHomes()
+        // Note: we do try to deliver buffered messages even in the middle of
+        // a graceful shutdown; every message that we manage to deliver is a win
+        tryRequestShardBufferHomes()
       }
-      tryCompleteGracefulShutdown()
+      // finally, also re-trigger a graceful shutdown if one is in progress
+      sendGracefulShutdownToCoordinatorIfInProgress()
+      tryCompleteGracefulShutdownIfInProgress()

     case RegisterRetry =>
       if (coordinator.isEmpty) {
@@ -818,8 +841,8 @@ private[akka] class ShardRegion(
     case GracefulShutdown =>
       log.debug("{}: Starting graceful shutdown of region and all its shards", typeName)
       gracefulShutdownInProgress = true
-      sendGracefulShutdownToCoordinator()
-      tryCompleteGracefulShutdown()
+      sendGracefulShutdownToCoordinatorIfInProgress()
+      tryCompleteGracefulShutdownIfInProgress()

     case _ => unhandled(cmd)
   }
@@ -882,7 +905,9 @@ private[akka] class ShardRegion(
       }
     }

-    tryCompleteGracefulShutdown()
+    // did this shard get removed because the ShardRegion is shutting down?
+    // If so, we can try to speed up the region shutdown. We don't need to wait for the next tick.
+    tryCompleteGracefulShutdownIfInProgress()
   }
 }
@@ -931,7 +956,7 @@ private[akka] class ShardRegion(
       case Failure(_) => Success(Left(shardId))
     }

-  private def tryCompleteGracefulShutdown() =
+  private def tryCompleteGracefulShutdownIfInProgress(): Unit =
     if (gracefulShutdownInProgress && shards.isEmpty && shardBuffers.isEmpty) {
       context.stop(self) // all shards have been rebalanced, complete graceful shutdown
     }
@@ -1007,32 +1032,36 @@ private[akka] class ShardRegion(
   def registrationMessage: Any =
     if (entityProps.isDefined) Register(self) else RegisterProxy(self)

-  def requestShardBufferHomes(): Unit = {
-    // Have to use vars because MessageBufferMap has no map, only foreach
-    var totalBuffered = 0
-    var shards = List.empty[String]
-    shardBuffers.foreach {
-      case (shard, buf) =>
-        coordinator.foreach { c =>
+  /**
+   * Send GetShardHome for all shards with buffered messages.
+   * If the coordinator is empty, nothing happens.
+   */
+  def tryRequestShardBufferHomes(): Unit = {
+    coordinator.foreach { coord =>
+      // Have to use vars because MessageBufferMap has no map, only foreach
+      var totalBuffered = 0
+      var shards = List.empty[String]
+      shardBuffers.foreach {
+        case (shard, buf) =>
           totalBuffered += buf.size
           shards ::= shard
           log.debug(
-            "{}: Retry request for shard [{}] homes from coordinator at [{}]. [{}] buffered messages.",
+            "{}: Requesting shard home for [{}] from coordinator at [{}]. [{}] buffered messages.",
             typeName,
             shard,
-            c,
+            coord,
             buf.size)
-          c ! GetShardHome(shard)
-        }
-    }
+          coord ! GetShardHome(shard)
+      }

-    if (retryCount >= 5 && retryCount % 5 == 0 && log.isWarningEnabled) {
-      log.warning(
-        "{}: Retry request for shards [{}] homes from coordinator. [{}] total buffered messages. Coordinator [{}]",
-        typeName,
-        shards.sorted.mkString(","),
-        totalBuffered,
-        coordinator)
+      if (retryCount >= 5 && retryCount % 5 == 0 && log.isWarningEnabled) {
+        log.warning(
+          "{}: Requested shard homes [{}] from coordinator at [{}]. [{}] total buffered messages.",
+          typeName,
+          shards.sorted.mkString(","),
+          coord,
+          totalBuffered)
+      }
     }
   }
@@ -1189,10 +1218,11 @@ private[akka] class ShardRegion(
     }
   }

-  def sendGracefulShutdownToCoordinator(): Unit = {
+  def sendGracefulShutdownToCoordinatorIfInProgress(): Unit = {
     if (gracefulShutdownInProgress) {
-      log.debug("Sending graceful shutdown to {}", coordinatorSelection)
-      coordinatorSelection.foreach(_ ! GracefulShutdownReq(self))
+      val actorSelections = coordinatorSelection
+      log.debug("Sending graceful shutdown to {}", actorSelections)
+      actorSelections.foreach(_ ! GracefulShutdownReq(self))
     }
   }
 }
diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingSpec.scala
index 5a12553dff..1d623da558 100644
--- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingSpec.scala
+++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingSpec.scala
@@ -189,7 +189,7 @@ abstract class MultiNodeClusterShardingSpec(val config: MultiNodeClusterSharding
   /**
    * {{{
-   * startPersistenceIfNotDdataMode(startOn = first, setStoreOn = Seq(first, second, third))
+   * startPersistenceIfNeeded(startOn = first, setStoreOn = Seq(first, second, third))
    * }}}
    *
    * @param startOn the node to start the `SharedLeveldbStore` store on
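For reviewers who want to exercise the new shutdown hand-off path from user code: a graceful region shutdown is what drives the coordinator's new `GracefulShutdownReq`/`shutdownShards` handling, and it is typically triggered by sending `ShardRegion.GracefulShutdown` to the region actor. A minimal sketch, not part of this patch — the `"Counter"` type name and the timeouts are hypothetical:

```scala
import scala.concurrent.Await
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.cluster.sharding.{ ClusterSharding, ShardRegion }
import akka.pattern.gracefulStop

object GracefulRegionShutdown {
  // Ask the local "Counter" region (hypothetical entity type) to hand off all of its
  // shards and wait until the region actor has stopped. Internally the region sends
  // GracefulShutdownReq to the coordinator, which now starts the dedicated shutdown
  // hand-off (RebalanceWorker with isRebalance = false) instead of the rebalance path.
  def shutdownRegion(system: ActorSystem): Unit = {
    val region = ClusterSharding(system).shardRegion("Counter")
    val stopped = gracefulStop(region, 10.seconds, ShardRegion.GracefulShutdown)
    Await.result(stopped, 11.seconds)
  }
}
```

On each Retry tick the region re-sends `GracefulShutdownReq` via `sendGracefulShutdownToCoordinatorIfInProgress()` until all of its shards have been handed off, so a hand-off that times out on the coordinator side is simply retried later.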