Unstash GetShardHome requests one-by-one, #29742 (#29748)

* because it's likely that the first GetShardHome request will result in
  allocation update and then all are stashed again
* rename to unstashOneGetShardHomeRequest
This commit is contained in:
Patrik Nordwall 2020-10-19 12:36:02 +02:00 committed by GitHub
parent bd39e9bd21
commit 330893e2c1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 22 additions and 8 deletions

View file

@ -0,0 +1,2 @@
# #29742 Unstash GetShardHome requests one-by-one
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.ShardCoordinator.unstashOneGetShardHomeRequest")

View file

@ -951,6 +951,7 @@ abstract class ShardCoordinator(
def handleGetShardHome(shard: ShardId): Boolean = {
if (rebalanceInProgress.contains(shard)) {
deferGetShardHomeRequest(shard, sender())
unstashOneGetShardHomeRequest() // continue unstashing
true
} else if (!hasAllRegionsRegistered()) {
log.debug(
@ -970,6 +971,8 @@ abstract class ShardCoordinator(
shardRegionRef)
else
sender() ! ShardHome(shard, shardRegionRef)
unstashOneGetShardHomeRequest() // continue unstashing
true
case None =>
false // location not known, yet, caller will handle allocation
@ -1124,6 +1127,8 @@ abstract class ShardCoordinator(
}
}
protected def unstashOneGetShardHomeRequest(): Unit
private def regionAddress(region: ActorRef): Address = {
if (region.path.address.host.isEmpty) cluster.selfAddress
else region.path.address
@ -1310,6 +1315,8 @@ class PersistentShardCoordinator(
saveSnapshot(state)
}
}
override protected def unstashOneGetShardHomeRequest(): Unit = ()
}
/**
@ -1463,7 +1470,7 @@ private[akka] class DDataShardCoordinator(
// which was scheduled by previous watchStateActors
def waitingForStateInitialized: Receive = {
case StateInitialized =>
unstashGetShardHomeRequests()
unstashOneGetShardHomeRequest()
unstashAll()
stateInitialized()
activate()
@ -1625,7 +1632,7 @@ private[akka] class DDataShardCoordinator(
afterUpdateCallback(evt)
if (verboseDebug)
log.debug("{}: New coordinator state after [{}]: [{}]", typeName, evt, state)
unstashGetShardHomeRequests()
unstashOneGetShardHomeRequest()
unstashAll()
}
@ -1639,16 +1646,21 @@ private[akka] class DDataShardCoordinator(
getShardHomeRequests += (sender -> request)
}
private def unstashGetShardHomeRequests(): Unit = {
getShardHomeRequests.foreach {
case (originalSender, request) => self.tell(request, sender = originalSender)
override protected def unstashOneGetShardHomeRequest(): Unit = {
if (getShardHomeRequests.nonEmpty) {
// unstash one, will continue unstash of next after receive GetShardHome or update completed
val requestTuple = getShardHomeRequests.head
val (originalSender, request) = requestTuple
self.tell(request, sender = originalSender)
getShardHomeRequests -= requestTuple
}
getShardHomeRequests = Set.empty
}
def activate() = {
def activate(): Unit = {
context.become(active.orElse(receiveLateRememberedEntities))
log.info("{}: ShardCoordinator was moved to the active state {}", typeName, state)
log.info("{}: ShardCoordinator was moved to the active state with [{}] shards", typeName, state.shards.size)
if (verboseDebug)
log.debug("{}: Full ShardCoordinator initial state {}", typeName, state)
}
// only used once the coordinator is initialized