Merge pull request #29551 from akka/wip-29543-alloc-after-rebalance-patriknw

Allocate rebalanced shards immediately, #29543
This commit is contained in:
Patrik Nordwall 2020-09-09 18:45:46 +02:00 committed by GitHub
commit 7e768fa732
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -535,6 +535,7 @@ abstract class ShardCoordinator(
import settings.tuningParameters._ import settings.tuningParameters._
val log = Logging.withMarker(context.system, this) val log = Logging.withMarker(context.system, this)
private val ignoreRef = context.system.asInstanceOf[ExtendedActorSystem].provider.ignoreRef
val cluster = Cluster(context.system) val cluster = Cluster(context.system)
val removalMargin = cluster.downingProvider.downRemovalMargin val removalMargin = cluster.downingProvider.downRemovalMargin
@ -703,6 +704,7 @@ abstract class ShardCoordinator(
state = state.updated(evt) state = state.updated(evt)
clearRebalanceInProgress(shard) clearRebalanceInProgress(shard)
allocateShardHomesForRememberEntities() allocateShardHomesForRememberEntities()
self.tell(GetShardHome(shard), ignoreRef)
} }
} else { } else {
// rebalance not completed, graceful shutdown will be retried // rebalance not completed, graceful shutdown will be retried
@ -750,10 +752,6 @@ abstract class ShardCoordinator(
} }
.pipeTo(sender()) .pipeTo(sender())
case ShardHome(_, _) =>
//On rebalance, we send ourselves a GetShardHome message to reallocate a
// shard. This receive handles the "response" from that message. i.e. ignores it.
case ClusterShuttingDown => case ClusterShuttingDown =>
log.debug("Shutting down ShardCoordinator") log.debug("Shutting down ShardCoordinator")
// can't stop because supervisor will start it again, // can't stop because supervisor will start it again,
@ -897,7 +895,7 @@ abstract class ShardCoordinator(
log.debug("ShardRegion terminated: [{}]", ref) log.debug("ShardRegion terminated: [{}]", ref)
regionTerminationInProgress += ref regionTerminationInProgress += ref
state.regions(ref).foreach { s => state.regions(ref).foreach { s =>
self ! GetShardHome(s) self.tell(GetShardHome(s), ignoreRef)
} }
update(ShardRegionTerminated(ref)) { evt => update(ShardRegionTerminated(ref)) { evt =>
@ -930,7 +928,9 @@ abstract class ShardCoordinator(
def allocateShardHomesForRememberEntities(): Unit = { def allocateShardHomesForRememberEntities(): Unit = {
if (settings.rememberEntities && state.unallocatedShards.nonEmpty) if (settings.rememberEntities && state.unallocatedShards.nonEmpty)
state.unallocatedShards.foreach { self ! GetShardHome(_) } state.unallocatedShards.foreach { shard =>
self.tell(GetShardHome(shard), ignoreRef)
}
} }
def continueGetShardHome(shard: ShardId, region: ActorRef, getShardHomeSender: ActorRef): Unit = def continueGetShardHome(shard: ShardId, region: ActorRef, getShardHomeSender: ActorRef): Unit =