Reduce DDataShardCoordinator memory usage during unreachability (#25444)
* stash GetShardHome requests in a Set, to keep only one per sender * regions repeatedly send GetShardHome requests when they don't receive a response, keeping only one of them is better * other existing tests cover this implicitly
This commit is contained in:
parent
1852649d0d
commit
9024f19cff
1 changed files with 20 additions and 2 deletions
|
|
@ -1019,6 +1019,9 @@ class DDataShardCoordinator(
|
|||
if (rememberEntities) Set(CoordinatorStateKey, AllShardsKey) else Set(CoordinatorStateKey)
|
||||
|
||||
var shards = Set.empty[String]
|
||||
|
||||
var getShardHomeRequests: Set[(ActorRef, GetShardHome)] = Set.empty
|
||||
|
||||
if (rememberEntities)
|
||||
replicator ! Subscribe(AllShardsKey, self)
|
||||
|
||||
|
|
@ -1096,10 +1099,14 @@ class DDataShardCoordinator(
|
|||
// which was scheduled by previous watchStateActors
|
||||
def waitingForStateInitialized: Receive = {
|
||||
case StateInitialized =>
|
||||
unstashGetShardHomeRequests()
|
||||
unstashAll()
|
||||
stateInitialized()
|
||||
activate()
|
||||
|
||||
case g: GetShardHome ⇒
|
||||
stashGetShardHomeRequest(sender(), g)
|
||||
|
||||
case _ => stash()
|
||||
}
|
||||
|
||||
|
|
@ -1150,9 +1157,9 @@ class DDataShardCoordinator(
|
|||
evt)
|
||||
throw cause
|
||||
|
||||
case GetShardHome(shard) =>
|
||||
case g @ GetShardHome(shard) =>
|
||||
if (!handleGetShardHome(shard))
|
||||
stash() // must wait for update that is in progress
|
||||
stashGetShardHomeRequest(sender(), g) // must wait for update that is in progress
|
||||
|
||||
case _ => stash()
|
||||
}
|
||||
|
|
@ -1160,9 +1167,20 @@ class DDataShardCoordinator(
|
|||
private def unbecomeAfterUpdate[E <: DomainEvent](evt: E, afterUpdateCallback: E => Unit): Unit = {
|
||||
context.unbecome()
|
||||
afterUpdateCallback(evt)
|
||||
unstashGetShardHomeRequests()
|
||||
unstashAll()
|
||||
}
|
||||
|
||||
private def stashGetShardHomeRequest(sender: ActorRef, request: GetShardHome): Unit =
|
||||
getShardHomeRequests += (sender -> request)
|
||||
|
||||
private def unstashGetShardHomeRequests(): Unit = {
|
||||
getShardHomeRequests.foreach {
|
||||
case (originalSender, request) ⇒ self.tell(request, sender = originalSender)
|
||||
}
|
||||
getShardHomeRequests = Set.empty
|
||||
}
|
||||
|
||||
def activate() = {
|
||||
context.become(active)
|
||||
log.info("ShardCoordinator was moved to the active state {}", state)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue