Handle ShardsUpdated when waiting for remember entities (#31092)
Also, don't send updates if passivation is disabled
This commit is contained in:
parent
03886e28b6
commit
cea42b2a4d
2 changed files with 8 additions and 3 deletions
|
|
@ -6,7 +6,6 @@ package akka.cluster.sharding
|
|||
|
||||
import java.net.URLEncoder
|
||||
import java.util
|
||||
|
||||
import akka.actor.Actor
|
||||
import akka.actor.ActorLogging
|
||||
import akka.actor.ActorRef
|
||||
|
|
@ -24,6 +23,7 @@ import akka.cluster.ClusterEvent.InitialStateAsEvents
|
|||
import akka.cluster.ClusterEvent.MemberEvent
|
||||
import akka.cluster.ClusterEvent.MemberPreparingForShutdown
|
||||
import akka.cluster.ClusterEvent.MemberReadyForShutdown
|
||||
import akka.cluster.sharding.ShardRegion.ShardsUpdated
|
||||
import akka.cluster.sharding.internal.EntityPassivationStrategy
|
||||
import akka.cluster.sharding.internal.RememberEntitiesShardStore
|
||||
import akka.cluster.sharding.internal.RememberEntitiesShardStore.GetEntities
|
||||
|
|
@ -700,6 +700,7 @@ private[akka] class Shard(
|
|||
case msg: ShardQuery => receiveShardQuery(msg)
|
||||
case PassivateIntervalTick => stash()
|
||||
case msg: RememberEntityStoreCrashed => rememberEntityStoreCrashed(msg)
|
||||
case msg: ShardsUpdated => shardsUpdated(msg)
|
||||
case msg if extractEntityId.isDefinedAt(msg) =>
|
||||
deliverMessage(msg, sender())
|
||||
case msg =>
|
||||
|
|
|
|||
|
|
@ -1025,7 +1025,9 @@ private[akka] class ShardRegion(
|
|||
shardsByRef = shardsByRef - ref
|
||||
shards = shards - shardId
|
||||
startingShards -= shardId
|
||||
shards.values.foreach(_ ! ShardsUpdated(shards.size))
|
||||
if (settings.passivationStrategy != ClusterShardingSettings.NoPassivationStrategy) {
|
||||
shards.values.foreach(_ ! ShardsUpdated(shards.size))
|
||||
}
|
||||
if (handingOff.contains(ref)) {
|
||||
handingOff = handingOff - ref
|
||||
log.debug("{}: Shard [{}] handoff complete", typeName, shardId)
|
||||
|
|
@ -1367,7 +1369,9 @@ private[akka] class ShardRegion(
|
|||
shardsByRef = shardsByRef.updated(shard, id)
|
||||
shards = shards.updated(id, shard)
|
||||
startingShards += id
|
||||
shards.values.foreach(_ ! ShardsUpdated(shards.size))
|
||||
if (settings.passivationStrategy != ClusterShardingSettings.NoPassivationStrategy) {
|
||||
shards.values.foreach(_ ! ShardsUpdated(shards.size))
|
||||
}
|
||||
None
|
||||
case Some(_) =>
|
||||
None
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue