From 78df739ef679d0c76cd6e0c7ab2ad5200dc77858 Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Mon, 20 Dec 2021 21:09:14 +1300 Subject: [PATCH] Add adjustable active entity limit per shard region (#31004) --- .../scala/akka/cluster/sharding/Shard.scala | 7 + .../akka/cluster/sharding/ShardRegion.scala | 13 ++ .../internal/EntityPassivationStrategy.scala | 98 ++++++++------ .../passivation/LeastFrequentlyUsedSpec.scala | 62 +++++++++ .../passivation/LeastRecentlyUsedSpec.scala | 128 ++++++++++++++++++ .../passivation/MostRecentlyUsedSpec.scala | 61 +++++++++ 6 files changed, 331 insertions(+), 38 deletions(-) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index 8689a64cdf..0b4b1cce84 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -616,6 +616,7 @@ private[akka] class Shard( case msg: CoordinatorMessage => receiveCoordinatorMessage(msg) case msg: RememberEntityCommand => receiveRememberEntityCommand(msg) case msg: ShardRegion.StartEntity => startEntity(msg.entityId, Some(sender())) + case msg: ShardRegion.SetActiveEntityLimit => activeEntityLimitUpdated(msg) case msg: ShardRegion.ShardsUpdated => shardsUpdated(msg) case Passivate(stopMessage) => passivate(sender(), stopMessage) case PassivateIntervalTick => passivateEntitiesAfterInterval() @@ -624,6 +625,7 @@ private[akka] class Shard( case msg: RememberEntityStoreCrashed => rememberEntityStoreCrashed(msg) case msg if extractEntityId.isDefinedAt(msg) => deliverMessage(msg, sender()) } + def rememberUpdate(add: Set[EntityId] = Set.empty, remove: Set[EntityId] = Set.empty): Unit = { rememberEntitiesStore match { case None => @@ -981,6 +983,11 @@ private[akka] class Shard( } } + private def activeEntityLimitUpdated(updated: ShardRegion.SetActiveEntityLimit): Unit = { + val entitiesToPassivate = passivationStrategy.limitUpdated(updated.perRegionLimit) + passivateEntities(entitiesToPassivate) + } + private def shardsUpdated(updated: ShardRegion.ShardsUpdated): Unit = { val entitiesToPassivate = passivationStrategy.shardsUpdated(updated.activeShards) passivateEntities(entitiesToPassivate) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index 9b5a9675a1..be0eacd47a 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -15,6 +15,7 @@ import scala.runtime.AbstractFunction1 import scala.util.{ Failure, Success } import akka.Done import akka.actor._ +import akka.annotation.ApiMayChange import akka.annotation.{ InternalApi, InternalStableApi } import akka.cluster.Cluster import akka.cluster.ClusterEvent._ @@ -488,6 +489,14 @@ object ShardRegion { extends ClusterShardingSerializable with DeadLetterSuppression + /** + * API MAY CHANGE: Messages for passivation strategies may change after additional testing and feedback. + * + * When limit-based automatic passivation is enabled, set a new active entity limit for a shard region. + */ + @ApiMayChange + final case class SetActiveEntityLimit(perRegionLimit: Int) + /** * INTERNAL API * @@ -774,6 +783,7 @@ private[akka] class ShardRegion( case query: ShardRegionQuery => receiveQuery(query) case msg: RestartShard => deliverMessage(msg, sender()) case msg: StartEntity => deliverStartEntity(msg, sender()) + case msg: SetActiveEntityLimit => deliverToAllShards(msg, sender()) case msg if extractEntityId.isDefinedAt(msg) => deliverMessage(msg, sender()) case unknownMsg => log.warning("{}: Message does not have an extractor defined in shard so it was ignored: {}", typeName, unknownMsg) @@ -1277,6 +1287,9 @@ private[akka] class ShardRegion( } } + def deliverToAllShards(msg: Any, snd: ActorRef): Unit = + shards.values.foreach(_.tell(msg, snd)) + def deliverMessage(msg: Any, snd: ActorRef): Unit = msg match { case RestartShard(shardId) => diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EntityPassivationStrategy.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EntityPassivationStrategy.scala index 058e20347e..2b6f1acadc 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EntityPassivationStrategy.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EntityPassivationStrategy.scala @@ -49,6 +49,13 @@ private[akka] object EntityPassivationStrategy { private[akka] sealed abstract class EntityPassivationStrategy { import EntityPassivationStrategy.PassivateEntities + /** + * The per-region active entity limit has been updated, which can trigger passivation. + * @param newLimit the new per-region active entity limit + * @return entities to passivate in the associated shard + */ + def limitUpdated(newLimit: Int): PassivateEntities + /** * Active shards in this region have been updated, which can trigger passivation. * @param activeShards updated number of active shards @@ -89,6 +96,7 @@ private[akka] sealed abstract class EntityPassivationStrategy { private[akka] object DisabledEntityPassivationStrategy extends EntityPassivationStrategy { import EntityPassivationStrategy.PassivateEntities + override def limitUpdated(newLimit: Int): PassivateEntities = PassivateEntities.none override def shardsUpdated(activeShards: Int): PassivateEntities = PassivateEntities.none override def entityTouched(id: EntityId): PassivateEntities = PassivateEntities.none override def entityTerminated(id: EntityId): Unit = () @@ -115,6 +123,8 @@ private[akka] final class IdleEntityPassivationStrategy(idleCheck: IdleCheck) ex override val scheduledInterval: Option[FiniteDuration] = Some(idleCheck.interval) + override def limitUpdated(newLimit: Int): PassivateEntities = PassivateEntities.none + override def shardsUpdated(activeShards: Int): PassivateEntities = PassivateEntities.none override def entityTouched(id: EntityId): PassivateEntities = { @@ -127,28 +137,49 @@ private[akka] final class IdleEntityPassivationStrategy(idleCheck: IdleCheck) ex override def intervalPassed(): PassivateEntities = recencyList.removeLeastRecentOutside(idleCheck.timeout) } +/** + * INTERNAL API: Shared base class for limit-based passivation strategies. + * @param initialLimit initial active entity capacity for a shard region + */ +@InternalApi +private[akka] abstract class LimitBasedEntityPassivationStrategy(initialLimit: Int) extends EntityPassivationStrategy { + import EntityPassivationStrategy.PassivateEntities + + protected var activeShards: Int = 1 + protected var perRegionLimit: Int = initialLimit + protected var perShardLimit: Int = perRegionLimit + + override def limitUpdated(newPerRegionLimit: Int): PassivateEntities = { + perRegionLimit = newPerRegionLimit + perShardLimit = perRegionLimit / activeShards + passivateEntitiesOnLimitUpdate() + } + + override def shardsUpdated(newActiveShards: Int): PassivateEntities = { + activeShards = newActiveShards + perShardLimit = perRegionLimit / activeShards + passivateEntitiesOnLimitUpdate() + } + + protected def passivateEntitiesOnLimitUpdate(): PassivateEntities +} + /** * INTERNAL API: Passivate the least recently used entities when the number of active entities in a shard region * reaches a limit. The per-region limit is divided evenly among the active shards in a region. - * @param perRegionLimit active entity capacity for a shard region + * @param initialLimit initial active entity capacity for a shard region * @param idleCheck optionally passivate idle entities after the given timeout, checking every interval */ @InternalApi -private[akka] final class LeastRecentlyUsedEntityPassivationStrategy(perRegionLimit: Int, idleCheck: Option[IdleCheck]) - extends EntityPassivationStrategy { +private[akka] final class LeastRecentlyUsedEntityPassivationStrategy(initialLimit: Int, idleCheck: Option[IdleCheck]) + extends LimitBasedEntityPassivationStrategy(initialLimit) { import EntityPassivationStrategy.PassivateEntities - private var perShardLimit: Int = perRegionLimit private val recencyList = RecencyList.empty[EntityId] override val scheduledInterval: Option[FiniteDuration] = idleCheck.map(_.interval) - override def shardsUpdated(activeShards: Int): PassivateEntities = { - perShardLimit = perRegionLimit / activeShards - passivateExcessEntities() - } - override def entityTouched(id: EntityId): PassivateEntities = { recencyList.update(id) passivateExcessEntities() @@ -160,6 +191,8 @@ private[akka] final class LeastRecentlyUsedEntityPassivationStrategy(perRegionLi recencyList.removeLeastRecentOutside(idle.timeout) } + override protected def passivateEntitiesOnLimitUpdate(): PassivateEntities = passivateExcessEntities() + private def passivateExcessEntities(): PassivateEntities = { val excess = recencyList.size - perShardLimit if (excess > 0) recencyList.removeLeastRecent(excess) else PassivateEntities.none @@ -174,21 +207,19 @@ private[akka] final class LeastRecentlyUsedEntityPassivationStrategy(perRegionLi * Active entities are tracked in multiple recency lists, where entities are promoted to higher-level * segments on subsequent accesses, and demoted through levels when segments become full. * The proportions of the segmented levels can be configured as fractions of the overall limit. - * @param perRegionLimit active entity capacity for a shard region + * @param initialLimit initial active entity capacity for a shard region * @param proportions proportions of the segmented levels * @param idleCheck optionally passivate idle entities after the given timeout, checking every interval */ @InternalApi private[akka] final class SegmentedLeastRecentlyUsedEntityPassivationStrategy( - perRegionLimit: Int, + initialLimit: Int, proportions: immutable.Seq[Double], idleCheck: Option[IdleCheck]) - extends EntityPassivationStrategy { + extends LimitBasedEntityPassivationStrategy(initialLimit) { import EntityPassivationStrategy.PassivateEntities - private var perShardLimit: Int = perRegionLimit - private def limits: immutable.Seq[Int] = proportions.map(p => (p * perShardLimit).toInt) private val segmentedRecencyList = @@ -197,12 +228,6 @@ private[akka] final class SegmentedLeastRecentlyUsedEntityPassivationStrategy( override val scheduledInterval: Option[FiniteDuration] = idleCheck.map(_.interval) - override def shardsUpdated(activeShards: Int): PassivateEntities = { - perShardLimit = perRegionLimit / activeShards - segmentedRecencyList.updateLimits(limits) - passivateExcessEntities() - } - override def entityTouched(id: EntityId): PassivateEntities = { segmentedRecencyList.update(id) passivateExcessEntities() @@ -214,6 +239,11 @@ private[akka] final class SegmentedLeastRecentlyUsedEntityPassivationStrategy( segmentedRecencyList.removeOverallLeastRecentOutside(idle.timeout) } + override protected def passivateEntitiesOnLimitUpdate(): PassivateEntities = { + segmentedRecencyList.updateLimits(limits) + passivateExcessEntities() + } + private def passivateExcessEntities(): PassivateEntities = segmentedRecencyList.removeLeastRecentOverLimit() } @@ -222,25 +252,19 @@ private[akka] final class SegmentedLeastRecentlyUsedEntityPassivationStrategy( * * Passivate the most recently used entities when the number of active entities in a shard region * reaches a limit. The per-region limit is divided evenly among the active shards in a region. - * @param perRegionLimit active entity capacity for a shard region + * @param initialLimit initial active entity capacity for a shard region * @param idleCheck optionally passivate idle entities after the given timeout, checking every interval */ @InternalApi -private[akka] final class MostRecentlyUsedEntityPassivationStrategy(perRegionLimit: Int, idleCheck: Option[IdleCheck]) - extends EntityPassivationStrategy { +private[akka] final class MostRecentlyUsedEntityPassivationStrategy(initialLimit: Int, idleCheck: Option[IdleCheck]) + extends LimitBasedEntityPassivationStrategy(initialLimit) { import EntityPassivationStrategy.PassivateEntities - private var perShardLimit: Int = perRegionLimit private val recencyList = RecencyList.empty[EntityId] override val scheduledInterval: Option[FiniteDuration] = idleCheck.map(_.interval) - override def shardsUpdated(activeShards: Int): PassivateEntities = { - perShardLimit = perRegionLimit / activeShards - passivateExcessEntities() - } - override def entityTouched(id: EntityId): PassivateEntities = { recencyList.update(id) passivateExcessEntities(skip = 1) // remove most recent before adding this created entity @@ -252,6 +276,8 @@ private[akka] final class MostRecentlyUsedEntityPassivationStrategy(perRegionLim recencyList.removeLeastRecentOutside(idle.timeout) } + override protected def passivateEntitiesOnLimitUpdate(): PassivateEntities = passivateExcessEntities() + private def passivateExcessEntities(skip: Int = 0): PassivateEntities = { val excess = recencyList.size - perShardLimit if (excess > 0) recencyList.removeMostRecent(excess, skip) else PassivateEntities.none @@ -263,31 +289,25 @@ private[akka] final class MostRecentlyUsedEntityPassivationStrategy(perRegionLim * * Passivate the least frequently used entities when the number of active entities in a shard region * reaches a limit. The per-region limit is divided evenly among the active shards in a region. - * @param perRegionLimit active entity capacity for a shard region + * @param initialLimit initial active entity capacity for a shard region * @param dynamicAging whether to apply "dynamic aging" as entities are passivated * @param idleCheck optionally passivate idle entities after the given timeout, checking every interval */ @InternalApi private[akka] final class LeastFrequentlyUsedEntityPassivationStrategy( - perRegionLimit: Int, + initialLimit: Int, dynamicAging: Boolean, idleCheck: Option[IdleCheck]) - extends EntityPassivationStrategy { + extends LimitBasedEntityPassivationStrategy(initialLimit) { import EntityPassivationStrategy.PassivateEntities - private var perShardLimit: Int = perRegionLimit private val frequencyList = if (idleCheck.isDefined) FrequencyList.withOverallRecency.empty[EntityId](dynamicAging) else FrequencyList.empty[EntityId](dynamicAging) override val scheduledInterval: Option[FiniteDuration] = idleCheck.map(_.interval) - override def shardsUpdated(activeShards: Int): PassivateEntities = { - perShardLimit = perRegionLimit / activeShards - passivateExcessEntities() - } - override def entityTouched(id: EntityId): PassivateEntities = { // first remove excess entities so that dynamic aging is updated // and the adjusted age is applied to any new entities on update @@ -304,6 +324,8 @@ private[akka] final class LeastFrequentlyUsedEntityPassivationStrategy( frequencyList.removeOverallLeastRecentOutside(idle.timeout) } + override protected def passivateEntitiesOnLimitUpdate(): PassivateEntities = passivateExcessEntities() + private def passivateExcessEntities(adjustment: Int = 0): PassivateEntities = { val excess = frequencyList.size - perShardLimit + adjustment if (excess > 0) frequencyList.removeLeastFrequent(excess) else PassivateEntities.none diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/LeastFrequentlyUsedSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/LeastFrequentlyUsedSpec.scala index 3d4a39eaba..134695605a 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/LeastFrequentlyUsedSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/LeastFrequentlyUsedSpec.scala @@ -4,6 +4,7 @@ package akka.cluster.sharding.passivation +import akka.cluster.sharding.ShardRegion import com.typesafe.config.Config import com.typesafe.config.ConfigFactory @@ -324,3 +325,64 @@ class LeastFrequentlyUsedWithIdleSpec } } } + +class LeastFrequentlyUsedLimitAdjustmentSpec + extends AbstractEntityPassivationSpec(LeastFrequentlyUsedSpec.config, expectedEntities = 21) { + + import EntityPassivationSpec.Entity.Envelope + import EntityPassivationSpec.Entity.Stop + + "Passivation of least frequently used entities" must { + "adjust per-shard entity limits when the per-region limit is dynamically adjusted" in { + val region = start() + + // only one active shard at first, initial per-shard limit of 10 + for (id <- 1 to 20) { + region ! Envelope(shard = 1, id = id, message = "A") + expectReceived(id = id, message = "A") + if (id > 10) expectReceived(id = id - 10, message = Stop) + } + + expectState(region)(1 -> (11 to 20)) + + // activating a second shard will divide the per-shard limit in two, passivating half of the first shard + region ! Envelope(shard = 2, id = 21, message = "B") + expectReceived(id = 21, message = "B") + for (id <- 11 to 15) { + expectReceived(id = id, message = Stop) + } + + expectState(region)(1 -> (16 to 20), 2 -> Set(21)) + + // reduce the per-region limit from 10 to 6, per-shard limit becomes 3 + region ! ShardRegion.SetActiveEntityLimit(6) + for (id <- 16 to 17) { // passivate entities over new limit + expectReceived(id = id, message = Stop) + } + + expectState(region)(1 -> (18 to 20), 2 -> Set(21)) + + for (id <- 1 to 10) { + region ! Envelope(shard = 1, id = id, message = "C") + expectReceived(id = id, message = "C") + val passivated = if (id < 4) id + 17 else id - 3 + expectReceived(id = passivated, message = Stop) + } + + expectState(region)(1 -> (8 to 10), 2 -> Set(21)) + + // increase the per-region limit from 6 to 12, per-shard limit becomes 6 + region ! ShardRegion.SetActiveEntityLimit(12) + + for (id <- 11 to 20) { + region ! Envelope(shard = 1, id = id, message = "D") + expectReceived(id = id, message = "D") + if (id > 13) { // start passivating at new higher limit of 6 + expectReceived(id = id - 6, message = Stop) + } + } + + expectState(region)(1 -> (15 to 20), 2 -> Set(21)) + } + } +} diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/LeastRecentlyUsedSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/LeastRecentlyUsedSpec.scala index d70165190a..0c39051067 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/LeastRecentlyUsedSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/LeastRecentlyUsedSpec.scala @@ -4,6 +4,7 @@ package akka.cluster.sharding.passivation +import akka.cluster.sharding.ShardRegion import com.typesafe.config.Config import com.typesafe.config.ConfigFactory @@ -43,6 +44,11 @@ object LeastRecentlyUsedSpec { } """).withFallback(EntityPassivationSpec.config) + val segmentedInitialLimitConfig: Config = + ConfigFactory.parseString(""" + akka.cluster.sharding.passivation.slru.active-entity-limit = 20 + """).withFallback(segmentedConfig) + val idleConfig: Config = ConfigFactory.parseString(""" akka.cluster.sharding { passivation { @@ -279,3 +285,125 @@ class LeastRecentlyUsedWithIdleSpec } } } + +class LeastRecentlyUsedLimitAdjustmentSpec + extends AbstractEntityPassivationSpec(LeastRecentlyUsedSpec.config, expectedEntities = 21) { + + import EntityPassivationSpec.Entity.Envelope + import EntityPassivationSpec.Entity.Stop + + "Passivation of least recently used entities" must { + "adjust per-shard entity limits when the per-region limit is dynamically adjusted" in { + val region = start() + + // only one active shard at first, initial per-shard limit of 10 + for (id <- 1 to 20) { + region ! Envelope(shard = 1, id = id, message = "A") + expectReceived(id = id, message = "A") + if (id > 10) expectReceived(id = id - 10, message = Stop) + } + + expectState(region)(1 -> (11 to 20)) + + // activating a second shard will divide the per-shard limit in two, passivating half of the first shard + region ! Envelope(shard = 2, id = 21, message = "B") + expectReceived(id = 21, message = "B") + for (id <- 11 to 15) { + expectReceived(id = id, message = Stop) + } + + expectState(region)(1 -> (16 to 20), 2 -> Set(21)) + + // reduce the per-region limit from 10 to 6, per-shard limit becomes 3 + region ! ShardRegion.SetActiveEntityLimit(6) + for (id <- 16 to 17) { // passivate entities over new limit + expectReceived(id = id, message = Stop) + } + + expectState(region)(1 -> (18 to 20), 2 -> Set(21)) + + for (id <- 1 to 10) { + region ! Envelope(shard = 1, id = id, message = "C") + expectReceived(id = id, message = "C") + val passivated = if (id < 4) id + 17 else id - 3 + expectReceived(id = passivated, message = Stop) + } + + expectState(region)(1 -> (8 to 10), 2 -> Set(21)) + + // increase the per-region limit from 6 to 12, per-shard limit becomes 6 + region ! ShardRegion.SetActiveEntityLimit(12) + + for (id <- 11 to 20) { + region ! Envelope(shard = 1, id = id, message = "D") + expectReceived(id = id, message = "D") + if (id > 13) { // start passivating at new higher limit of 6 + expectReceived(id = id - 6, message = Stop) + } + } + + expectState(region)(1 -> (15 to 20), 2 -> Set(21)) + } + } +} + +class SegmentedLeastRecentlyUsedLimitAdjustmentSpec + extends AbstractEntityPassivationSpec(LeastRecentlyUsedSpec.segmentedInitialLimitConfig, expectedEntities = 31) { + + import EntityPassivationSpec.Entity.Envelope + import EntityPassivationSpec.Entity.Stop + + "Passivation of segmented least recently used entities" must { + "adjust per-shard entity limits when the per-region limit is dynamically adjusted" in { + val region = start() + + // only one active shard at first, initial per-shard limit of 20 + for (id <- 1 to 30) { + region ! Envelope(shard = 1, id = id, message = "A") + expectReceived(id = id, message = "A") + if (id > 20) expectReceived(id = id - 20, message = Stop) + } + + expectState(region)(1 -> (11 to 30)) + + // activating a second shard will divide the per-shard limit in two, passivating half of the first shard + region ! Envelope(shard = 2, id = 31, message = "B") + expectReceived(id = 31, message = "B") + for (id <- 11 to 20) { + expectReceived(id = id, message = Stop) + } + + expectState(region)(1 -> (21 to 30), 2 -> Set(31)) + + // reduce the per-region limit from 20 to 10, per-shard limit becomes 5 + region ! ShardRegion.SetActiveEntityLimit(10) + for (id <- 21 to 25) { // passivate entities over new limit + expectReceived(id = id, message = Stop) + } + + expectState(region)(1 -> (26 to 30), 2 -> Set(31)) + + for (id <- 1 to 10) { + region ! Envelope(shard = 1, id = id, message = "C") + expectReceived(id = id, message = "C") + val passivated = if (id < 6) id + 25 else id - 5 + expectReceived(id = passivated, message = Stop) + } + + expectState(region)(1 -> (6 to 10), 2 -> Set(31)) + + // increase the per-region limit from 10 to 30, per-shard limit becomes 15 + region ! ShardRegion.SetActiveEntityLimit(30) + + for (id <- 11 to 30) { + region ! Envelope(shard = 1, id = id, message = "D") + expectReceived(id = id, message = "D") + if (id > 20) { // start passivating at new higher limit of 15 + expectReceived(id = id - 15, message = Stop) + } + } + + expectState(region)(1 -> (16 to 30), 2 -> Set(31)) + } + } +} diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/MostRecentlyUsedSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/MostRecentlyUsedSpec.scala index 09df025f89..15269e49e1 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/MostRecentlyUsedSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/MostRecentlyUsedSpec.scala @@ -4,6 +4,7 @@ package akka.cluster.sharding.passivation +import akka.cluster.sharding.ShardRegion import com.typesafe.config.Config import com.typesafe.config.ConfigFactory @@ -187,3 +188,63 @@ class MostRecentlyUsedWithIdleSpec } } } + +class MostRecentlyUsedLimitAdjustmentSpec + extends AbstractEntityPassivationSpec(MostRecentlyUsedSpec.config, expectedEntities = 21) { + + import EntityPassivationSpec.Entity.Envelope + import EntityPassivationSpec.Entity.Stop + + "Passivation of most recently used entities" must { + "adjust per-shard entity limits when the per-region limit is dynamically adjusted" in { + val region = start() + + // only one active shard at first, initial per-shard limit of 10 + for (id <- 1 to 20) { + region ! Envelope(shard = 1, id = id, message = "A") + expectReceived(id = id, message = "A") + if (id > 10) expectReceived(id = id - 1, message = Stop) + } + + expectState(region)(1 -> Set(1, 2, 3, 4, 5, 6, 7, 8, 9, 20)) + + // activating a second shard will divide the per-shard limit in two, passivating half of the first shard + region ! Envelope(shard = 2, id = 21, message = "B") + expectReceived(id = 21, message = "B") + for (id <- Seq(20, 9, 8, 7, 6)) { + expectReceived(id, message = Stop) + } + + expectState(region)(1 -> Set(1, 2, 3, 4, 5), 2 -> Set(21)) + + // reduce the per-region limit from 10 to 6, per-shard limit becomes 3 + region ! ShardRegion.SetActiveEntityLimit(6) + for (id <- Seq(5, 4)) { // passivate entities over new limit + expectReceived(id = id, message = Stop) + } + + expectState(region)(1 -> Set(1, 2, 3), 2 -> Set(21)) + + for (id <- 1 to 10) { + region ! Envelope(shard = 1, id = id, message = "C") + expectReceived(id = id, message = "C") + if (id > 5) expectReceived(id = id - 1, message = Stop) + } + + expectState(region)(1 -> Set(1, 2, 10), 2 -> Set(21)) + + // increase the per-region limit from 6 to 12, per-shard limit becomes 6 + region ! ShardRegion.SetActiveEntityLimit(12) + + for (id <- 11 to 20) { + region ! Envelope(shard = 1, id = id, message = "D") + expectReceived(id = id, message = "D") + if (id > 13) { // start passivating at new higher limit of 6 + expectReceived(id = id - 1, message = Stop) + } + } + + expectState(region)(1 -> Set(1, 2, 10, 11, 12, 20), 2 -> Set(21)) + } + } +}