diff --git a/akka-actor-tests/src/test/scala/akka/util/RecencyListSpec.scala b/akka-actor-tests/src/test/scala/akka/util/RecencyListSpec.scala index 4d4f6a6958..203f841ad2 100644 --- a/akka-actor-tests/src/test/scala/akka/util/RecencyListSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/util/RecencyListSpec.scala @@ -92,6 +92,18 @@ class RecencyListSpec extends AnyWordSpec with Matchers { clock.tick() // time = 14 recency.removeMostRecentWithin(3.seconds) shouldBe List("r", "k", "q", "p") check(recency, List("n", "o")) + + clock.tick() // time = 15 + recency.update("s").update("t").update("u").update("v").update("w").update("x").update("y").update("z") + check(recency, List("n", "o", "s", "t", "u", "v", "w", "x", "y", "z")) + + clock.tick() // time = 16 + recency.removeLeastRecent(3, skip = 3) shouldBe List("t", "u", "v") + check(recency, List("n", "o", "s", "w", "x", "y", "z")) + + clock.tick() // time = 17 + recency.removeMostRecent(3, skip = 2) shouldBe List("x", "w", "s") + check(recency, List("n", "o", "y", "z")) } } diff --git a/akka-actor/src/main/scala/akka/util/RecencyList.scala b/akka-actor/src/main/scala/akka/util/RecencyList.scala index 574fa0a669..1911df6de2 100644 --- a/akka-actor/src/main/scala/akka/util/RecencyList.scala +++ b/akka-actor/src/main/scala/akka/util/RecencyList.scala @@ -76,11 +76,11 @@ private[akka] class RecencyList[A](clock: RecencyList.Clock) { private val lessRecent: Node[A] => OptionVal[Node[A]] = _.lessRecent private val moreRecent: Node[A] => OptionVal[Node[A]] = _.moreRecent - def removeLeastRecent(n: Int = 1): immutable.Seq[A] = - removeWhile(start = leastRecent, next = moreRecent, limit = n) + def removeLeastRecent(n: Int = 1, skip: Int = 0): immutable.Seq[A] = + removeWhile(start = leastRecent, next = moreRecent, limit = n, skip = skip) - def removeMostRecent(n: Int = 1): immutable.Seq[A] = - removeWhile(start = mostRecent, next = lessRecent, limit = n) + def removeMostRecent(n: Int = 1, skip: Int = 0): immutable.Seq[A] = + removeWhile(start = mostRecent, next = lessRecent, limit = n, skip = skip) def removeLeastRecentOutside(duration: FiniteDuration): immutable.Seq[A] = { val min = clock.earlierTime(duration) @@ -118,15 +118,19 @@ private[akka] class RecencyList[A](clock: RecencyList.Clock) { start: OptionVal[Node[A]], next: Node[A] => OptionVal[Node[A]], continueWhile: Node[A] => Boolean = continueToLimit, - limit: Int = size): immutable.Seq[A] = { + limit: Int = size, + skip: Int = 0): immutable.Seq[A] = { var count = 0 var node = start + val max = limit + skip val values = mutable.ListBuffer.empty[A] - while (node.isDefined && continueWhile(node.get) && (count < limit)) { + while (node.isDefined && continueWhile(node.get) && (count < max)) { count += 1 - removeFromCurrentPosition(node.get) - lookupNode -= node.get.value - values += node.get.value + if (count > skip) { + removeFromCurrentPosition(node.get) + lookupNode -= node.get.value + values += node.get.value + } node = next(node.get) } values.result() diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala index 3c10063b8a..85e077e27d 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala @@ -72,7 +72,8 @@ object ClusterShardingSettings { passivationStrategySettings = new ClassicShardingSettings.PassivationStrategySettings( strategy = settings.passivationStrategySettings.strategy, idleTimeout = settings.passivationStrategySettings.idleTimeout, - leastRecentlyUsedLimit = settings.passivationStrategySettings.leastRecentlyUsedLimit), + leastRecentlyUsedLimit = settings.passivationStrategySettings.leastRecentlyUsedLimit, + mostRecentlyUsedLimit = settings.passivationStrategySettings.mostRecentlyUsedLimit), shardRegionQueryTimeout = settings.shardRegionQueryTimeout, new ClassicShardingSettings.TuningParameters( bufferSize = settings.tuningParameters.bufferSize, @@ -169,13 +170,19 @@ object ClusterShardingSettings { val strategy: String, val idleTimeout: FiniteDuration, val leastRecentlyUsedLimit: Int, + val mostRecentlyUsedLimit: Int, private[akka] val oldSettingUsed: Boolean) { - def this(strategy: String, idleTimeout: FiniteDuration, leastRecentlyUsedLimit: Int) = - this(strategy, idleTimeout, leastRecentlyUsedLimit, oldSettingUsed = false) + def this(strategy: String, idleTimeout: FiniteDuration, leastRecentlyUsedLimit: Int, mostRecentlyUsedLimit: Int) = + this(strategy, idleTimeout, leastRecentlyUsedLimit, mostRecentlyUsedLimit, oldSettingUsed = false) def this(classic: ClassicShardingSettings.PassivationStrategySettings) = - this(classic.strategy, classic.idleTimeout, classic.leastRecentlyUsedLimit, classic.oldSettingUsed) + this( + classic.strategy, + classic.idleTimeout, + classic.leastRecentlyUsedLimit, + classic.mostRecentlyUsedLimit, + classic.oldSettingUsed) def withIdleStrategy(timeout: FiniteDuration): PassivationStrategySettings = copy(strategy = "idle", idleTimeout = timeout) @@ -183,6 +190,9 @@ object ClusterShardingSettings { def withLeastRecentlyUsedStrategy(limit: Int): PassivationStrategySettings = copy(strategy = "least-recently-used", leastRecentlyUsedLimit = limit) + def withMostRecentlyUsedStrategy(limit: Int): PassivationStrategySettings = + copy(strategy = "most-recently-used", mostRecentlyUsedLimit = limit) + private[akka] def withOldIdleStrategy(timeout: FiniteDuration): PassivationStrategySettings = copy(strategy = "idle", idleTimeout = timeout, oldSettingUsed = true) @@ -190,8 +200,14 @@ object ClusterShardingSettings { strategy: String, idleTimeout: FiniteDuration = idleTimeout, leastRecentlyUsedLimit: Int = leastRecentlyUsedLimit, + mostRecentlyUsedLimit: Int = mostRecentlyUsedLimit, oldSettingUsed: Boolean = oldSettingUsed): PassivationStrategySettings = - new PassivationStrategySettings(strategy, idleTimeout, leastRecentlyUsedLimit, oldSettingUsed) + new PassivationStrategySettings( + strategy, + idleTimeout, + leastRecentlyUsedLimit, + mostRecentlyUsedLimit, + oldSettingUsed) } object PassivationStrategySettings { @@ -199,6 +215,7 @@ object ClusterShardingSettings { strategy = "none", idleTimeout = Duration.Zero, leastRecentlyUsedLimit = 0, + mostRecentlyUsedLimit = 0, oldSettingUsed = false) def oldDefault(idleTimeout: FiniteDuration): PassivationStrategySettings = @@ -531,6 +548,9 @@ final class ClusterShardingSettings( def withLeastRecentlyUsedPassivationStrategy(limit: Int): ClusterShardingSettings = copy(passivationStrategySettings = passivationStrategySettings.withLeastRecentlyUsedStrategy(limit)) + def withMostRecentlyUsedPassivationStrategy(limit: Int): ClusterShardingSettings = + copy(passivationStrategySettings = passivationStrategySettings.withMostRecentlyUsedStrategy(limit)) + def withShardRegionQueryTimeout(duration: FiniteDuration): ClusterShardingSettings = copy(shardRegionQueryTimeout = duration) diff --git a/akka-cluster-sharding/src/main/resources/reference.conf b/akka-cluster-sharding/src/main/resources/reference.conf index 36d0eab953..04c9bb2016 100644 --- a/akka-cluster-sharding/src/main/resources/reference.conf +++ b/akka-cluster-sharding/src/main/resources/reference.conf @@ -39,6 +39,7 @@ akka.cluster.sharding { # Passivation strategy to use. Possible values are: # - "idle" # - "least-recently-used" + # - "most-recently-used" # Set to "none" or "off" to disable automatic passivation. # Passivation strategies are always disabled if `remember-entities` is enabled. strategy = "idle" @@ -57,6 +58,14 @@ akka.cluster.sharding { # Limit of active entities in a shard region. limit = 100000 } + + # Most recently used passivation strategy. + # Passivate the most recently used entities when the number of active entities in a shard region + # reaches a limit. The per-region limit is divided evenly among the active shards in a region. + most-recently-used { + # Limit of active entities in a shard region. + limit = 100000 + } } # If the coordinator can't store state changes it will be stopped diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala index 1303b78e2c..ab443e0122 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala @@ -134,10 +134,11 @@ object ClusterShardingSettings { val strategy: String, val idleTimeout: FiniteDuration, val leastRecentlyUsedLimit: Int, + val mostRecentlyUsedLimit: Int, private[akka] val oldSettingUsed: Boolean) { - def this(strategy: String, idleTimeout: FiniteDuration, leastRecentlyUsedLimit: Int) = - this(strategy, idleTimeout, leastRecentlyUsedLimit, oldSettingUsed = false) + def this(strategy: String, idleTimeout: FiniteDuration, leastRecentlyUsedLimit: Int, mostRecentlyUsedLimit: Int) = + this(strategy, idleTimeout, leastRecentlyUsedLimit, mostRecentlyUsedLimit, oldSettingUsed = false) def withIdleStrategy(timeout: FiniteDuration): PassivationStrategySettings = copy(strategy = "idle", idleTimeout = timeout, oldSettingUsed = false) @@ -145,6 +146,9 @@ object ClusterShardingSettings { def withLeastRecentlyUsedStrategy(limit: Int): PassivationStrategySettings = copy(strategy = "least-recently-used", leastRecentlyUsedLimit = limit) + def withMostRecentlyUsedStrategy(limit: Int): PassivationStrategySettings = + copy(strategy = "most-recently-used", mostRecentlyUsedLimit = limit) + private[akka] def withOldIdleStrategy(timeout: FiniteDuration): PassivationStrategySettings = copy(strategy = "idle", idleTimeout = timeout, oldSettingUsed = true) @@ -152,8 +156,14 @@ object ClusterShardingSettings { strategy: String, idleTimeout: FiniteDuration = idleTimeout, leastRecentlyUsedLimit: Int = leastRecentlyUsedLimit, + mostRecentlyUsedLimit: Int = mostRecentlyUsedLimit, oldSettingUsed: Boolean = oldSettingUsed): PassivationStrategySettings = - new PassivationStrategySettings(strategy, idleTimeout, leastRecentlyUsedLimit, oldSettingUsed) + new PassivationStrategySettings( + strategy, + idleTimeout, + leastRecentlyUsedLimit, + mostRecentlyUsedLimit, + oldSettingUsed) } object PassivationStrategySettings { @@ -161,13 +171,15 @@ object ClusterShardingSettings { strategy = "none", idleTimeout = Duration.Zero, leastRecentlyUsedLimit = 0, + mostRecentlyUsedLimit = 0, oldSettingUsed = false) def apply(config: Config): PassivationStrategySettings = { val settings = new PassivationStrategySettings( strategy = toRootLowerCase(config.getString("passivation.strategy")), idleTimeout = config.getDuration("passivation.idle.timeout", MILLISECONDS).millis, - leastRecentlyUsedLimit = config.getInt("passivation.least-recently-used.limit")) + leastRecentlyUsedLimit = config.getInt("passivation.least-recently-used.limit"), + mostRecentlyUsedLimit = config.getInt("passivation.most-recently-used.limit")) // default to old setting if it exists (defined in application.conf), overriding the new settings if (config.hasPath("passivate-idle-entity-after")) { val timeout = @@ -191,6 +203,7 @@ object ClusterShardingSettings { private[akka] case object NoPassivationStrategy extends PassivationStrategy private[akka] case class IdlePassivationStrategy(timeout: FiniteDuration) extends PassivationStrategy private[akka] case class LeastRecentlyUsedPassivationStrategy(limit: Int) extends PassivationStrategy + private[akka] case class MostRecentlyUsedPassivationStrategy(limit: Int) extends PassivationStrategy /** * INTERNAL API @@ -207,6 +220,8 @@ object ClusterShardingSettings { IdlePassivationStrategy(settings.passivationStrategySettings.idleTimeout) case "least-recently-used" => LeastRecentlyUsedPassivationStrategy(settings.passivationStrategySettings.leastRecentlyUsedLimit) + case "most-recently-used" => + MostRecentlyUsedPassivationStrategy(settings.passivationStrategySettings.mostRecentlyUsedLimit) case _ => NoPassivationStrategy } } @@ -619,6 +634,9 @@ final class ClusterShardingSettings( def withLeastRecentlyUsedPassivationStrategy(limit: Int): ClusterShardingSettings = copy(passivationStrategySettings = passivationStrategySettings.withLeastRecentlyUsedStrategy(limit)) + def withMostRecentlyUsedPassivationStrategy(limit: Int): ClusterShardingSettings = + copy(passivationStrategySettings = passivationStrategySettings.withMostRecentlyUsedStrategy(limit)) + def withShardRegionQueryTimeout(duration: FiniteDuration): ClusterShardingSettings = copy(shardRegionQueryTimeout = duration) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EntityPassivationStrategy.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EntityPassivationStrategy.scala index a9e5bf07ab..a750d7fc3f 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EntityPassivationStrategy.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/internal/EntityPassivationStrategy.scala @@ -26,6 +26,8 @@ private[akka] object EntityPassivationStrategy { new IdleEntityPassivationStrategy(timeout) case ClusterShardingSettings.LeastRecentlyUsedPassivationStrategy(limit) => new LeastRecentlyUsedEntityPassivationStrategy(limit) + case ClusterShardingSettings.MostRecentlyUsedPassivationStrategy(limit) => + new MostRecentlyUsedEntityPassivationStrategy(limit) case _ => DisabledEntityPassivationStrategy } } @@ -154,3 +156,40 @@ private[akka] final class LeastRecentlyUsedEntityPassivationStrategy(perRegionLi if (excess > 0) recencyList.removeLeastRecent(excess) else PassivateEntities.none } } + +/** + * Passivate the most recently used entities when the number of active entities in a shard region + * reaches a limit. The per-region limit is divided evenly among the active shards in a region. + * @param perRegionLimit active entity capacity for a shard region + */ +@InternalApi +private[akka] final class MostRecentlyUsedEntityPassivationStrategy(perRegionLimit: Int) + extends EntityPassivationStrategy { + import EntityPassivationStrategy.PassivateEntities + + private var perShardLimit: Int = perRegionLimit + private val recencyList = RecencyList.empty[EntityId] + + override val scheduledInterval: Option[FiniteDuration] = None + + override def shardsUpdated(activeShards: Int): PassivateEntities = { + perShardLimit = perRegionLimit / activeShards + passivateExcessEntities() + } + + override def entityCreated(id: EntityId): PassivateEntities = { + recencyList.update(id) + passivateExcessEntities(skip = 1) // remove most recent before adding this created entity + } + + override def entityTouched(id: EntityId): Unit = recencyList.update(id) + + override def entityTerminated(id: EntityId): Unit = recencyList.remove(id) + + override def intervalPassed(): PassivateEntities = PassivateEntities.none + + private def passivateExcessEntities(skip: Int = 0): PassivateEntities = { + val excess = recencyList.size - perShardLimit + if (excess > 0) recencyList.removeMostRecent(excess, skip) else PassivateEntities.none + } +} diff --git a/akka-cluster-sharding/src/test/resources/arc-trace-database.conf b/akka-cluster-sharding/src/test/resources/arc-trace-database.conf index 6f19c316d4..1d181734f9 100644 --- a/akka-cluster-sharding/src/test/resources/arc-trace-database.conf +++ b/akka-cluster-sharding/src/test/resources/arc-trace-database.conf @@ -18,6 +18,14 @@ # ║ LRU 4M │ 20.24 % │ 43,704,979 │ 34,857,131 │ 30,857,131 ║ # ╟────────┼─────────┼────────────┼─────────────┼──────────────╢ # ║ LRU 8M │ 43.03 % │ 43,704,979 │ 24,896,638 │ 16,896,638 ║ +# ╟────────┼─────────┼────────────┼─────────────┼──────────────╢ +# ║ MRU 1M │ 11.92 % │ 43,704,979 │ 38,493,369 │ 37,493,369 ║ +# ╟────────┼─────────┼────────────┼─────────────┼──────────────╢ +# ║ MRU 2M │ 25.67 % │ 43,704,979 │ 32,486,285 │ 30,486,285 ║ +# ╟────────┼─────────┼────────────┼─────────────┼──────────────╢ +# ║ MRU 4M │ 41.82 % │ 43,704,979 │ 25,429,608 │ 21,429,608 ║ +# ╟────────┼─────────┼────────────┼─────────────┼──────────────╢ +# ║ MRU 8M │ 68.01 % │ 43,704,979 │ 13,980,246 │ 5,980,246 ║ # ╚════════╧═════════╧════════════╧═════════════╧══════════════╝ # @@ -54,7 +62,35 @@ akka.cluster.sharding { regions = 10 pattern = arc-database strategy = lru-800k - } + }, + { + name = "MRU 1M" + shards = 100 + regions = 10 + pattern = arc-database + strategy = mru-100k + }, + { + name = "MRU 2M" + shards = 100 + regions = 10 + pattern = arc-database + strategy = mru-200k + }, + { + name = "MRU 4M" + shards = 100 + regions = 10 + pattern = arc-database + strategy = mru-400k + }, + { + name = "MRU 8M" + shards = 100 + regions = 10 + pattern = arc-database + strategy = mru-800k + }, ] print-detailed-stats = true @@ -94,5 +130,33 @@ akka.cluster.sharding { per-region-limit = 800000 } } + + mru-100k { + strategy = most-recently-used + most-recently-used { + per-region-limit = 100000 + } + } + + mru-200k { + strategy = most-recently-used + most-recently-used { + per-region-limit = 200000 + } + } + + mru-400k { + strategy = most-recently-used + most-recently-used { + per-region-limit = 400000 + } + } + + mru-800k { + strategy = most-recently-used + most-recently-used { + per-region-limit = 800000 + } + } } } diff --git a/akka-cluster-sharding/src/test/resources/arc-trace-search.conf b/akka-cluster-sharding/src/test/resources/arc-trace-search.conf index e6e8868529..a2ee7f1624 100644 --- a/akka-cluster-sharding/src/test/resources/arc-trace-search.conf +++ b/akka-cluster-sharding/src/test/resources/arc-trace-search.conf @@ -16,6 +16,12 @@ # ║ LRU 500k │ 7.53 % │ 37,656,092 │ 34,822,122 │ 34,322,122 ║ # ╟──────────┼─────────┼────────────┼─────────────┼──────────────╢ # ║ LRU 1M │ 25.37 % │ 37,656,092 │ 28,102,287 │ 27,102,287 ║ +# ╟──────────┼─────────┼────────────┼─────────────┼──────────────╢ +# ║ MRU 250k │ 5.25 % │ 37,656,092 │ 35,680,111 │ 35,430,111 ║ +# ╟──────────┼─────────┼────────────┼─────────────┼──────────────╢ +# ║ MRU 500k │ 10.50 % │ 37,656,092 │ 33,702,975 │ 33,202,975 ║ +# ╟──────────┼─────────┼────────────┼─────────────┼──────────────╢ +# ║ MRU 1M │ 20.79 % │ 37,656,092 │ 29,826,997 │ 28,826,997 ║ # ╚══════════╧═════════╧════════════╧═════════════╧══════════════╝ # @@ -45,6 +51,27 @@ akka.cluster.sharding { regions = 10 pattern = arc-search-merged strategy = lru-100k + }, + { + name = "MRU 250k" + shards = 100 + regions = 10 + pattern = arc-search-merged + strategy = mru-25k + }, + { + name = "MRU 500k" + shards = 100 + regions = 10 + pattern = arc-search-merged + strategy = mru-50k + }, + { + name = "MRU 1M" + shards = 100 + regions = 10 + pattern = arc-search-merged + strategy = mru-100k } ] @@ -78,5 +105,26 @@ akka.cluster.sharding { per-region-limit = 100000 } } + + mru-25k { + strategy = most-recently-used + most-recently-used { + per-region-limit = 25000 + } + } + + mru-50k { + strategy = most-recently-used + most-recently-used { + per-region-limit = 50000 + } + } + + mru-100k { + strategy = most-recently-used + most-recently-used { + per-region-limit = 100000 + } + } } } diff --git a/akka-cluster-sharding/src/test/resources/reference.conf b/akka-cluster-sharding/src/test/resources/reference.conf index 73f56a668d..91f78ee4a4 100644 --- a/akka-cluster-sharding/src/test/resources/reference.conf +++ b/akka-cluster-sharding/src/test/resources/reference.conf @@ -21,6 +21,10 @@ akka.cluster.sharding { sequence { start = 1 } + loop { + start = 1 + end = 1000000 + } uniform { min = 1 max = 10000000 diff --git a/akka-cluster-sharding/src/test/resources/synthetic-loop.conf b/akka-cluster-sharding/src/test/resources/synthetic-loop.conf new file mode 100644 index 0000000000..ab8f961103 --- /dev/null +++ b/akka-cluster-sharding/src/test/resources/synthetic-loop.conf @@ -0,0 +1,68 @@ +# +# Run with synthetically generated access events with a looping scan through ids. +# +# > akka-cluster-sharding/Test/runMain akka.cluster.sharding.passivation.simulator.Simulator synthetic-loop +# +# ╔══════════╤═════════╤════════════╤═════════════╤══════════════╗ +# ║ Run │ Active │ Accesses │ Activations │ Passivations ║ +# ╠══════════╪═════════╪════════════╪═════════════╪══════════════╣ +# ║ LRU 500k │ 0.00 % │ 10,000,000 │ 10,000,000 │ 9,500,000 ║ +# ╟──────────┼─────────┼────────────┼─────────────┼──────────────╢ +# ║ MRU 500k │ 45.00 % │ 10,000,000 │ 5,500,000 │ 5,000,000 ║ +# ╚══════════╧═════════╧════════════╧═════════════╧══════════════╝ +# + +akka.cluster.sharding { + passivation.simulator { + runs = [ + { + name = "LRU 500k" + shards = 100 + regions = 10 + pattern = loop-1M + strategy = lru-50k + }, + { + name = "MRU 500k" + shards = 100 + regions = 10 + pattern = loop-1M + strategy = mru-50k + } + ] + + print-detailed-stats = true + + # looping sequence of ids + # generate 10M events over 1M ids + loop-1M { + pattern = synthetic + synthetic { + events = 10000000 + generator = loop + loop { + start = 1 + end = 1000000 + } + } + } + + # LRU strategy with 50k limit in each of 10 regions + # total limit across cluster of 500k (50% of id space) + lru-50k { + strategy = least-recently-used + least-recently-used { + per-region-limit = 50000 + } + } + + # MRU strategy with 50k limit in each of 10 regions + # total limit across cluster of 500k (50% of id space) + mru-50k { + strategy = most-recently-used + most-recently-used { + per-region-limit = 50000 + } + } + } +} diff --git a/akka-cluster-sharding/src/test/resources/synthetic-zipfian.conf b/akka-cluster-sharding/src/test/resources/synthetic-zipfian.conf index 1de3545b23..3f9ce575b3 100644 --- a/akka-cluster-sharding/src/test/resources/synthetic-zipfian.conf +++ b/akka-cluster-sharding/src/test/resources/synthetic-zipfian.conf @@ -7,6 +7,8 @@ # ║ Run │ Active │ Accesses │ Activations │ Passivations ║ # ╠══════════╪═════════╪════════════╪═════════════╪══════════════╣ # ║ LRU 100k │ 40.47 % │ 50,000,000 │ 29,764,380 │ 29,664,380 ║ +# ╟──────────┼─────────┼────────────┼─────────────┼──────────────╢ +# ║ MRU 100k │ 10.08 % │ 50,000,000 │ 44,960,042 │ 44,860,042 ║ # ╚══════════╧═════════╧════════════╧═════════════╧══════════════╝ # @@ -19,6 +21,13 @@ akka.cluster.sharding { regions = 10 pattern = scrambled-zipfian strategy = lru-10k + }, + { + name = "MRU 100k" + shards = 100 + regions = 10 + pattern = scrambled-zipfian + strategy = mru-10k } ] @@ -47,5 +56,14 @@ akka.cluster.sharding { per-region-limit = 10000 } } + + # MRU strategy with 10k limit in each of 10 regions + # total limit across cluster of 100k (1% of id space) + mru-10k { + strategy = most-recently-used + most-recently-used { + per-region-limit = 10000 + } + } } } diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingSettingsSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingSettingsSpec.scala index 1d5ab9ebd9..b3bbf4a855 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingSettingsSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingSettingsSpec.scala @@ -67,6 +67,25 @@ class ClusterShardingSettingsSpec extends AnyWordSpec with Matchers { .passivationStrategy shouldBe ClusterShardingSettings.LeastRecentlyUsedPassivationStrategy(42000) } + "allow most recently used passivation strategy to be configured (via config)" in { + settings(""" + #passivation-most-recently-used + akka.cluster.sharding { + passivation { + strategy = most-recently-used + most-recently-used.limit = 1000000 + } + } + #passivation-most-recently-used + """).passivationStrategy shouldBe ClusterShardingSettings.MostRecentlyUsedPassivationStrategy(1000000) + } + + "allow most recently used passivation strategy to be configured (via factory method)" in { + defaultSettings + .withMostRecentlyUsedPassivationStrategy(limit = 42000) + .passivationStrategy shouldBe ClusterShardingSettings.MostRecentlyUsedPassivationStrategy(42000) + } + "disable automatic passivation if `remember-entities` is enabled (via config)" in { settings(""" akka.cluster.sharding.remember-entities = on diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/EntityPassivationSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/EntityPassivationSpec.scala index 00d3e59390..1fdd33f679 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/EntityPassivationSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/EntityPassivationSpec.scala @@ -10,6 +10,8 @@ import akka.actor.{ Actor, ActorRef, Props } import akka.cluster.Cluster import akka.testkit.WithLogCapturing import akka.testkit.{ AkkaSpec, TestProbe } +import org.scalatest.concurrent.Eventually + import scala.concurrent.duration._ object EntityPassivationSpec { @@ -42,6 +44,15 @@ object EntityPassivationSpec { } """).withFallback(config) + val mostRecentlyUsedConfig = ConfigFactory.parseString(""" + akka.cluster.sharding { + passivation { + strategy = most-recently-used + most-recently-used.limit = 10 + } + } + """).withFallback(config) + val disabledConfig = ConfigFactory.parseString(""" akka.cluster.sharding { passivation { @@ -88,6 +99,7 @@ object EntityPassivationSpec { abstract class AbstractEntityPassivationSpec(config: Config, expectedEntities: Int) extends AkkaSpec(config) + with Eventually with WithLogCapturing { import EntityPassivationSpec._ @@ -98,6 +110,7 @@ abstract class AbstractEntityPassivationSpec(config: Config, expectedEntities: I val probes: Map[Int, TestProbe] = (1 to expectedEntities).map(id => id -> TestProbe()).toMap val probeRefs: Map[String, ActorRef] = probes.map { case (id, probe) => id.toString -> probe.ref } + val stateProbe: TestProbe = TestProbe() def expectReceived(id: Int, message: Any, within: FiniteDuration = patience.timeout): Entity.Received = { val received = probes(id).expectMsgType[Entity.Received](within) @@ -108,6 +121,18 @@ abstract class AbstractEntityPassivationSpec(config: Config, expectedEntities: I def expectNoMessage(id: Int, within: FiniteDuration): Unit = probes(id).expectNoMessage(within) + def getState(region: ActorRef): ShardRegion.CurrentShardRegionState = { + region.tell(ShardRegion.GetShardRegionState, stateProbe.ref) + stateProbe.expectMsgType[ShardRegion.CurrentShardRegionState] + } + + def expectState(region: ActorRef)(expectedShards: (Int, Iterable[Int])*): Unit = + eventually { + getState(region).shards should contain theSameElementsAs expectedShards.map { + case (shardId, entityIds) => ShardRegion.ShardState(shardId.toString, entityIds.map(_.toString).toSet) + } + } + def start(): ActorRef = { // single node cluster Cluster(system).join(Cluster(system).selfAddress) @@ -185,7 +210,7 @@ class LeastRecentlyUsedEntityPassivationSpec if (id > 10) expectReceived(id = id - 10, message = Stop) } - // shard 1 active ids: 11-20 + expectState(region)(1 -> (11 to 20)) // activating a second shard will divide the per-shard limit in two, passivating half of the first shard region ! Envelope(shard = 2, id = 21, message = "B") @@ -194,8 +219,7 @@ class LeastRecentlyUsedEntityPassivationSpec expectReceived(id = id, message = Stop) } - // shard 1 active ids: 16-20 - // shard 2 active ids: 21 + expectState(region)(1 -> (16 to 20), 2 -> Set(21)) // shards now have a limit of 5 entities for (id <- 1 to 20) { @@ -205,8 +229,7 @@ class LeastRecentlyUsedEntityPassivationSpec expectReceived(id = passivatedId, message = Stop) } - // shard 1 active ids: 16-20 - // shard 2 active ids: 21 + expectState(region)(1 -> (16 to 20), 2 -> Set(21)) // shards now have a limit of 5 entities for (id <- 21 to 24) { @@ -214,8 +237,7 @@ class LeastRecentlyUsedEntityPassivationSpec expectReceived(id = id, message = "D") } - // shard 1 active ids: 16-20 - // shard 2 active ids: 21-24 + expectState(region)(1 -> (16 to 20), 2 -> (21 to 24)) // activating a third shard will divide the per-shard limit in three, passivating entities over the new limits region ! Envelope(shard = 3, id = 31, message = "E") @@ -224,9 +246,7 @@ class LeastRecentlyUsedEntityPassivationSpec expectReceived(id = id, message = Stop) } - // shard 1 active ids: 18, 19, 20 - // shard 2 active ids: 22, 23, 24 - // shard 3 active ids: 31 + expectState(region)(1 -> Set(18, 19, 20), 2 -> Set(22, 23, 24), 3 -> Set(31)) // shards now have a limit of 3 entities for (id <- 25 to 30) { @@ -235,9 +255,7 @@ class LeastRecentlyUsedEntityPassivationSpec expectReceived(id = id - 3, message = Stop) } - // shard 1 active ids: 18, 19, 20 - // shard 2 active ids: 28, 29, 30 - // shard 3 active ids: 31 + expectState(region)(1 -> Set(18, 19, 20), 2 -> Set(28, 29, 30), 3 -> Set(31)) // shards now have a limit of 3 entities for (id <- 31 to 40) { @@ -246,9 +264,7 @@ class LeastRecentlyUsedEntityPassivationSpec if (id > 33) expectReceived(id = id - 3, message = Stop) } - // shard 1 active ids: 18, 19, 20 - // shard 2 active ids: 28, 29, 30 - // shard 3 active ids: 38, 39, 40 + expectState(region)(1 -> Set(18, 19, 20), 2 -> Set(28, 29, 30), 3 -> Set(38, 39, 40)) // manually passivate some entities region ! Envelope(shard = 1, id = 19, message = ManuallyPassivate) @@ -261,9 +277,7 @@ class LeastRecentlyUsedEntityPassivationSpec expectReceived(id = 29, message = Stop) expectReceived(id = 39, message = Stop) - // shard 1 active ids: 18, 20 - // shard 2 active ids: 28, 30 - // shard 3 active ids: 38, 40 + expectState(region)(1 -> Set(18, 20), 2 -> Set(28, 30), 3 -> Set(38, 40)) for (i <- 1 to 3) { region ! Envelope(shard = 1, id = 10 + i, message = "H") @@ -283,17 +297,114 @@ class LeastRecentlyUsedEntityPassivationSpec } } - // shard 1 active ids: 11, 12, 13 - // shard 2 active ids: 21, 22, 23 - // shard 3 active ids: 31, 32, 33 + expectState(region)(1 -> Set(11, 12, 13), 2 -> Set(21, 22, 23), 3 -> Set(31, 32, 33)) + } + } +} - val statsProbe = TestProbe() - region.tell(ShardRegion.GetShardRegionState, statsProbe.ref) - val state = statsProbe.expectMsgType[ShardRegion.CurrentShardRegionState] - state.shards shouldBe Set( - ShardRegion.ShardState("1", Set("11", "12", "13")), - ShardRegion.ShardState("2", Set("21", "22", "23")), - ShardRegion.ShardState("3", Set("31", "32", "33"))) +class MostRecentlyUsedEntityPassivationSpec + extends AbstractEntityPassivationSpec(EntityPassivationSpec.mostRecentlyUsedConfig, expectedEntities = 40) { + + import EntityPassivationSpec.Entity.{ Envelope, ManuallyPassivate, Stop } + + "Passivation of most recently used entities" must { + "passivate the most recently used entities when the per-shard entity limit is reached" in { + val region = start() + + // only one active shard at first, most recently used entities passivated once the limit is reached + for (id <- 1 to 20) { + region ! Envelope(shard = 1, id = id, message = "A") + expectReceived(id, message = "A") + if (id > 10) expectReceived(id = id - 1, message = Stop) + } + + expectState(region)(1 -> Set(1, 2, 3, 4, 5, 6, 7, 8, 9, 20)) + + // activating a second shard will divide the per-shard limit in two, passivating half of the first shard + region ! Envelope(shard = 2, id = 21, message = "B") + expectReceived(id = 21, message = "B") + for (id <- Seq(20, 9, 8, 7, 6)) { + expectReceived(id, message = Stop) + } + + expectState(region)(1 -> Set(1, 2, 3, 4, 5), 2 -> Set(21)) + + // shards now have a limit of 5 entities + for (id <- 1 to 20) { + region ! Envelope(shard = 1, id = id, message = "C") + expectReceived(id = id, message = "C") + if (id > 5) expectReceived(id = id - 1, message = Stop) + } + + expectState(region)(1 -> Set(1, 2, 3, 4, 20), 2 -> Set(21)) + + // shards now have a limit of 5 entities + for (id <- 21 to 24) { + region ! Envelope(shard = 2, id = id, message = "D") + expectReceived(id = id, message = "D") + } + + expectState(region)(1 -> Set(1, 2, 3, 4, 20), 2 -> Set(21, 22, 23, 24)) + + // activating a third shard will divide the per-shard limit in three, passivating entities over the new limits + region ! Envelope(shard = 3, id = 31, message = "E") + expectReceived(id = 31, message = "E") + for (id <- Seq(24, 20, 4)) { + expectReceived(id = id, message = Stop) + } + + expectState(region)(1 -> Set(1, 2, 3), 2 -> Set(21, 22, 23), 3 -> Set(31)) + + // shards now have a limit of 3 entities + for (id <- 24 to 30) { + region ! Envelope(shard = 2, id = id, message = "F") + expectReceived(id = id, message = "F") + expectReceived(id = id - 1, message = Stop) + } + + expectState(region)(1 -> Set(1, 2, 3), 2 -> Set(21, 22, 30), 3 -> Set(31)) + + // shards now have a limit of 3 entities + for (id <- 31 to 40) { + region ! Envelope(shard = 3, id = id, message = "G") + expectReceived(id = id, message = "G") + if (id > 33) expectReceived(id = id - 1, message = Stop) + } + + expectState(region)(1 -> Set(1, 2, 3), 2 -> Set(21, 22, 30), 3 -> Set(31, 32, 40)) + + // manually passivate some entities + region ! Envelope(shard = 1, id = 2, message = ManuallyPassivate) + region ! Envelope(shard = 2, id = 22, message = ManuallyPassivate) + region ! Envelope(shard = 3, id = 32, message = ManuallyPassivate) + expectReceived(id = 2, message = ManuallyPassivate) + expectReceived(id = 22, message = ManuallyPassivate) + expectReceived(id = 32, message = ManuallyPassivate) + expectReceived(id = 2, message = Stop) + expectReceived(id = 22, message = Stop) + expectReceived(id = 32, message = Stop) + + expectState(region)(1 -> Set(1, 3), 2 -> Set(21, 30), 3 -> Set(31, 40)) + + for (i <- 1 to 3) { + region ! Envelope(shard = 1, id = 11 + i, message = "H") + region ! Envelope(shard = 2, id = 22 + i, message = "H") + region ! Envelope(shard = 3, id = 33 + i, message = "H") + expectReceived(id = 11 + i, message = "H") + expectReceived(id = 22 + i, message = "H") + expectReceived(id = 33 + i, message = "H") + if (i == 2) { + expectReceived(id = 12, message = Stop) + expectReceived(id = 23, message = Stop) + expectReceived(id = 34, message = Stop) + } else if (i == 3) { + expectReceived(id = 13, message = Stop) + expectReceived(id = 24, message = Stop) + expectReceived(id = 35, message = Stop) + } + } + + expectState(region)(1 -> Set(1, 3, 14), 2 -> Set(21, 30, 25), 3 -> Set(31, 40, 36)) } } } diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/simulator/AccessPattern.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/simulator/AccessPattern.scala index 361503a2c1..9a27d821ad 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/simulator/AccessPattern.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/simulator/AccessPattern.scala @@ -18,10 +18,11 @@ trait AccessPattern { abstract class SyntheticGenerator(events: Int) extends AccessPattern { protected def createGenerator(): site.ycsb.generator.NumberGenerator - override def entityIds: Source[EntityId, NotUsed] = Source.unfold(createGenerator() -> 0) { - case (_, count) if count >= events => None - case (generator, count) => Option(generator.nextValue().longValue.toString).map(generator -> (count + 1) -> _) + private def generateEntityIds: Source[EntityId, NotUsed] = Source.unfold(createGenerator()) { generator => + Option(generator.nextValue()).map(generator -> _.longValue.toString) } + + override def entityIds: Source[EntityId, NotUsed] = generateEntityIds.take(events) } object SyntheticGenerator { @@ -34,6 +35,13 @@ object SyntheticGenerator { override protected def createGenerator(): NumberGenerator = new CounterGenerator(start) } + /** + * Generate a looping sequence of id events. + */ + final class Loop(start: Long, end: Long, events: Int) extends SyntheticGenerator(events) { + override protected def createGenerator(): NumberGenerator = new SequentialGenerator(start, end) + } + /** * Generate id events randomly using a uniform distribution, from the inclusive range min to max. */ diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/simulator/Simulator.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/simulator/Simulator.scala index 9d025e8723..23c690fd5b 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/simulator/Simulator.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/simulator/Simulator.scala @@ -6,7 +6,11 @@ package akka.cluster.sharding.passivation.simulator import akka.NotUsed import akka.actor.ActorSystem -import akka.cluster.sharding.internal.{ EntityPassivationStrategy, LeastRecentlyUsedEntityPassivationStrategy } +import akka.cluster.sharding.internal.{ + EntityPassivationStrategy, + LeastRecentlyUsedEntityPassivationStrategy, + MostRecentlyUsedEntityPassivationStrategy +} import akka.stream.scaladsl.{ Flow, Source } import com.typesafe.config.ConfigFactory @@ -76,6 +80,8 @@ object Simulator { generator match { case SimulatorSettings.PatternSettings.Synthetic.Sequence(start) => new SyntheticGenerator.Sequence(start, events) + case SimulatorSettings.PatternSettings.Synthetic.Loop(start, end) => + new SyntheticGenerator.Loop(start, end, events) case SimulatorSettings.PatternSettings.Synthetic.Uniform(min, max) => new SyntheticGenerator.Uniform(min, max, events) case SimulatorSettings.PatternSettings.Synthetic.Exponential(mean) => @@ -98,6 +104,8 @@ object Simulator { runSettings.strategy match { case SimulatorSettings.StrategySettings.LeastRecentlyUsed(perRegionLimit) => () => new LeastRecentlyUsedEntityPassivationStrategy(perRegionLimit) + case SimulatorSettings.StrategySettings.MostRecentlyUsed(perRegionLimit) => + () => new MostRecentlyUsedEntityPassivationStrategy(perRegionLimit) } } diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/simulator/SimulatorSettings.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/simulator/SimulatorSettings.scala index 0cc6333cef..94a4b2f3aa 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/simulator/SimulatorSettings.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/passivation/simulator/SimulatorSettings.scala @@ -45,11 +45,13 @@ object SimulatorSettings { object StrategySettings { final case class LeastRecentlyUsed(perRegionLimit: Int) extends StrategySettings + final case class MostRecentlyUsed(perRegionLimit: Int) extends StrategySettings def apply(simulatorConfig: Config, strategy: String): StrategySettings = { val config = simulatorConfig.getConfig(strategy).withFallback(simulatorConfig.getConfig("strategy-defaults")) lowerCase(config.getString("strategy")) match { case "least-recently-used" => LeastRecentlyUsed(config.getInt("least-recently-used.per-region-limit")) + case "most-recently-used" => MostRecentlyUsed(config.getInt("most-recently-used.per-region-limit")) case _ => sys.error(s"Unknown strategy for [$strategy]") } } @@ -63,6 +65,7 @@ object SimulatorSettings { object Synthetic { sealed trait Generator final case class Sequence(start: Long) extends Generator + final case class Loop(start: Long, end: Long) extends Generator final case class Uniform(min: Long, max: Long) extends Generator final case class Exponential(mean: Double) extends Generator final case class Hotspot(min: Long, max: Long, hot: Double, rate: Double) extends Generator @@ -74,6 +77,10 @@ object SimulatorSettings { case "sequence" => val start = config.getLong("sequence.start") Sequence(start) + case "loop" => + val start = config.getLong("loop.start") + val end = config.getLong("loop.end") + Loop(start, end) case "uniform" => val min = config.getLong("uniform.min") val max = config.getLong("uniform.max") diff --git a/akka-docs/src/main/paradox/typed/cluster-sharding.md b/akka-docs/src/main/paradox/typed/cluster-sharding.md index 47a1014757..e2c7862fe1 100644 --- a/akka-docs/src/main/paradox/typed/cluster-sharding.md +++ b/akka-docs/src/main/paradox/typed/cluster-sharding.md @@ -325,6 +325,20 @@ passivation strategy, and set the limit for active entities in a shard region: Or enable the least recently used passivation strategy and set the active entity limit using the `withLeastRecentlyUsedPassivationStrategy` method on `ClusterShardingSettings`. +#### Most recently used passivation strategy + +The **most recently used** passivation strategy passivates those entities that have the most recent activity when the +number of active entities passes a specified limit. The configurable limit is for a whole shard region and is divided +evenly among the active shards in each region. This strategy is most useful when the older an entity is, the more +likely that entity will be accessed again; as seen in cyclic access patterns. Configure automatic passivation to use +the most recently used passivation strategy, and set the limit for active entities in a shard region: + +@@snip [passivation most recently used](/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingSettingsSpec.scala) { #passivation-most-recently-used type=conf } + +Or enable the most recently used passivation strategy and set the active entity limit using the +`withMostRecentlyUsedPassivationStrategy` method on `ClusterShardingSettings`. + + ## Sharding State There are two types of state managed: