From 9024f19cff0e2f458f4bfd6330d4449f64597f59 Mon Sep 17 00:00:00 2001 From: Felix Satyaputra Date: Tue, 14 May 2019 14:29:53 +0200 Subject: [PATCH] Reduce DDataShardCoordinator memory usage during unreachability (#25444) * stash GetShardHome requests in a Set, to keep only one per sender * regions repeatedly send GetShardHome requests when they don't receive a response, keeping only one of them is better * other existing tests cover this implicitly --- .../cluster/sharding/ShardCoordinator.scala | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index 4dfab36a78..d9ddf45a18 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -1019,6 +1019,9 @@ class DDataShardCoordinator( if (rememberEntities) Set(CoordinatorStateKey, AllShardsKey) else Set(CoordinatorStateKey) var shards = Set.empty[String] + + var getShardHomeRequests: Set[(ActorRef, GetShardHome)] = Set.empty + if (rememberEntities) replicator ! Subscribe(AllShardsKey, self) @@ -1096,10 +1099,14 @@ class DDataShardCoordinator( // which was scheduled by previous watchStateActors def waitingForStateInitialized: Receive = { case StateInitialized => + unstashGetShardHomeRequests() unstashAll() stateInitialized() activate() + case g: GetShardHome ⇒ + stashGetShardHomeRequest(sender(), g) + case _ => stash() } @@ -1150,9 +1157,9 @@ class DDataShardCoordinator( evt) throw cause - case GetShardHome(shard) => + case g @ GetShardHome(shard) => if (!handleGetShardHome(shard)) - stash() // must wait for update that is in progress + stashGetShardHomeRequest(sender(), g) // must wait for update that is in progress case _ => stash() } @@ -1160,9 +1167,20 @@ class DDataShardCoordinator( private def unbecomeAfterUpdate[E <: DomainEvent](evt: E, afterUpdateCallback: E => Unit): Unit = { context.unbecome() afterUpdateCallback(evt) + unstashGetShardHomeRequests() unstashAll() } + private def stashGetShardHomeRequest(sender: ActorRef, request: GetShardHome): Unit = + getShardHomeRequests += (sender -> request) + + private def unstashGetShardHomeRequests(): Unit = { + getShardHomeRequests.foreach { + case (originalSender, request) ⇒ self.tell(request, sender = originalSender) + } + getShardHomeRequests = Set.empty + } + def activate() = { context.become(active) log.info("ShardCoordinator was moved to the active state {}", state)