Add most recently used entity passivation strategy (#30897)

This commit is contained in:
Peter Vlugter 2021-11-17 21:54:02 +13:00 committed by GitHub
parent 3ffcdcc28c
commit c173f8ded9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 523 additions and 52 deletions

View file

@ -92,6 +92,18 @@ class RecencyListSpec extends AnyWordSpec with Matchers {
clock.tick() // time = 14
recency.removeMostRecentWithin(3.seconds) shouldBe List("r", "k", "q", "p")
check(recency, List("n", "o"))
clock.tick() // time = 15
recency.update("s").update("t").update("u").update("v").update("w").update("x").update("y").update("z")
check(recency, List("n", "o", "s", "t", "u", "v", "w", "x", "y", "z"))
clock.tick() // time = 16
recency.removeLeastRecent(3, skip = 3) shouldBe List("t", "u", "v")
check(recency, List("n", "o", "s", "w", "x", "y", "z"))
clock.tick() // time = 17
recency.removeMostRecent(3, skip = 2) shouldBe List("x", "w", "s")
check(recency, List("n", "o", "y", "z"))
}
}

View file

@ -76,11 +76,11 @@ private[akka] class RecencyList[A](clock: RecencyList.Clock) {
private val lessRecent: Node[A] => OptionVal[Node[A]] = _.lessRecent
private val moreRecent: Node[A] => OptionVal[Node[A]] = _.moreRecent
def removeLeastRecent(n: Int = 1): immutable.Seq[A] =
removeWhile(start = leastRecent, next = moreRecent, limit = n)
def removeLeastRecent(n: Int = 1, skip: Int = 0): immutable.Seq[A] =
removeWhile(start = leastRecent, next = moreRecent, limit = n, skip = skip)
def removeMostRecent(n: Int = 1): immutable.Seq[A] =
removeWhile(start = mostRecent, next = lessRecent, limit = n)
def removeMostRecent(n: Int = 1, skip: Int = 0): immutable.Seq[A] =
removeWhile(start = mostRecent, next = lessRecent, limit = n, skip = skip)
def removeLeastRecentOutside(duration: FiniteDuration): immutable.Seq[A] = {
val min = clock.earlierTime(duration)
@ -118,15 +118,19 @@ private[akka] class RecencyList[A](clock: RecencyList.Clock) {
start: OptionVal[Node[A]],
next: Node[A] => OptionVal[Node[A]],
continueWhile: Node[A] => Boolean = continueToLimit,
limit: Int = size): immutable.Seq[A] = {
limit: Int = size,
skip: Int = 0): immutable.Seq[A] = {
var count = 0
var node = start
val max = limit + skip
val values = mutable.ListBuffer.empty[A]
while (node.isDefined && continueWhile(node.get) && (count < limit)) {
while (node.isDefined && continueWhile(node.get) && (count < max)) {
count += 1
removeFromCurrentPosition(node.get)
lookupNode -= node.get.value
values += node.get.value
if (count > skip) {
removeFromCurrentPosition(node.get)
lookupNode -= node.get.value
values += node.get.value
}
node = next(node.get)
}
values.result()

View file

@ -72,7 +72,8 @@ object ClusterShardingSettings {
passivationStrategySettings = new ClassicShardingSettings.PassivationStrategySettings(
strategy = settings.passivationStrategySettings.strategy,
idleTimeout = settings.passivationStrategySettings.idleTimeout,
leastRecentlyUsedLimit = settings.passivationStrategySettings.leastRecentlyUsedLimit),
leastRecentlyUsedLimit = settings.passivationStrategySettings.leastRecentlyUsedLimit,
mostRecentlyUsedLimit = settings.passivationStrategySettings.mostRecentlyUsedLimit),
shardRegionQueryTimeout = settings.shardRegionQueryTimeout,
new ClassicShardingSettings.TuningParameters(
bufferSize = settings.tuningParameters.bufferSize,
@ -169,13 +170,19 @@ object ClusterShardingSettings {
val strategy: String,
val idleTimeout: FiniteDuration,
val leastRecentlyUsedLimit: Int,
val mostRecentlyUsedLimit: Int,
private[akka] val oldSettingUsed: Boolean) {
def this(strategy: String, idleTimeout: FiniteDuration, leastRecentlyUsedLimit: Int) =
this(strategy, idleTimeout, leastRecentlyUsedLimit, oldSettingUsed = false)
def this(strategy: String, idleTimeout: FiniteDuration, leastRecentlyUsedLimit: Int, mostRecentlyUsedLimit: Int) =
this(strategy, idleTimeout, leastRecentlyUsedLimit, mostRecentlyUsedLimit, oldSettingUsed = false)
def this(classic: ClassicShardingSettings.PassivationStrategySettings) =
this(classic.strategy, classic.idleTimeout, classic.leastRecentlyUsedLimit, classic.oldSettingUsed)
this(
classic.strategy,
classic.idleTimeout,
classic.leastRecentlyUsedLimit,
classic.mostRecentlyUsedLimit,
classic.oldSettingUsed)
def withIdleStrategy(timeout: FiniteDuration): PassivationStrategySettings =
copy(strategy = "idle", idleTimeout = timeout)
@ -183,6 +190,9 @@ object ClusterShardingSettings {
def withLeastRecentlyUsedStrategy(limit: Int): PassivationStrategySettings =
copy(strategy = "least-recently-used", leastRecentlyUsedLimit = limit)
def withMostRecentlyUsedStrategy(limit: Int): PassivationStrategySettings =
copy(strategy = "most-recently-used", mostRecentlyUsedLimit = limit)
private[akka] def withOldIdleStrategy(timeout: FiniteDuration): PassivationStrategySettings =
copy(strategy = "idle", idleTimeout = timeout, oldSettingUsed = true)
@ -190,8 +200,14 @@ object ClusterShardingSettings {
strategy: String,
idleTimeout: FiniteDuration = idleTimeout,
leastRecentlyUsedLimit: Int = leastRecentlyUsedLimit,
mostRecentlyUsedLimit: Int = mostRecentlyUsedLimit,
oldSettingUsed: Boolean = oldSettingUsed): PassivationStrategySettings =
new PassivationStrategySettings(strategy, idleTimeout, leastRecentlyUsedLimit, oldSettingUsed)
new PassivationStrategySettings(
strategy,
idleTimeout,
leastRecentlyUsedLimit,
mostRecentlyUsedLimit,
oldSettingUsed)
}
object PassivationStrategySettings {
@ -199,6 +215,7 @@ object ClusterShardingSettings {
strategy = "none",
idleTimeout = Duration.Zero,
leastRecentlyUsedLimit = 0,
mostRecentlyUsedLimit = 0,
oldSettingUsed = false)
def oldDefault(idleTimeout: FiniteDuration): PassivationStrategySettings =
@ -531,6 +548,9 @@ final class ClusterShardingSettings(
def withLeastRecentlyUsedPassivationStrategy(limit: Int): ClusterShardingSettings =
copy(passivationStrategySettings = passivationStrategySettings.withLeastRecentlyUsedStrategy(limit))
def withMostRecentlyUsedPassivationStrategy(limit: Int): ClusterShardingSettings =
copy(passivationStrategySettings = passivationStrategySettings.withMostRecentlyUsedStrategy(limit))
def withShardRegionQueryTimeout(duration: FiniteDuration): ClusterShardingSettings =
copy(shardRegionQueryTimeout = duration)

View file

@ -39,6 +39,7 @@ akka.cluster.sharding {
# Passivation strategy to use. Possible values are:
# - "idle"
# - "least-recently-used"
# - "most-recently-used"
# Set to "none" or "off" to disable automatic passivation.
# Passivation strategies are always disabled if `remember-entities` is enabled.
strategy = "idle"
@ -57,6 +58,14 @@ akka.cluster.sharding {
# Limit of active entities in a shard region.
limit = 100000
}
# Most recently used passivation strategy.
# Passivate the most recently used entities when the number of active entities in a shard region
# reaches a limit. The per-region limit is divided evenly among the active shards in a region.
most-recently-used {
# Limit of active entities in a shard region.
limit = 100000
}
}
# If the coordinator can't store state changes it will be stopped

View file

@ -134,10 +134,11 @@ object ClusterShardingSettings {
val strategy: String,
val idleTimeout: FiniteDuration,
val leastRecentlyUsedLimit: Int,
val mostRecentlyUsedLimit: Int,
private[akka] val oldSettingUsed: Boolean) {
def this(strategy: String, idleTimeout: FiniteDuration, leastRecentlyUsedLimit: Int) =
this(strategy, idleTimeout, leastRecentlyUsedLimit, oldSettingUsed = false)
def this(strategy: String, idleTimeout: FiniteDuration, leastRecentlyUsedLimit: Int, mostRecentlyUsedLimit: Int) =
this(strategy, idleTimeout, leastRecentlyUsedLimit, mostRecentlyUsedLimit, oldSettingUsed = false)
def withIdleStrategy(timeout: FiniteDuration): PassivationStrategySettings =
copy(strategy = "idle", idleTimeout = timeout, oldSettingUsed = false)
@ -145,6 +146,9 @@ object ClusterShardingSettings {
def withLeastRecentlyUsedStrategy(limit: Int): PassivationStrategySettings =
copy(strategy = "least-recently-used", leastRecentlyUsedLimit = limit)
def withMostRecentlyUsedStrategy(limit: Int): PassivationStrategySettings =
copy(strategy = "most-recently-used", mostRecentlyUsedLimit = limit)
private[akka] def withOldIdleStrategy(timeout: FiniteDuration): PassivationStrategySettings =
copy(strategy = "idle", idleTimeout = timeout, oldSettingUsed = true)
@ -152,8 +156,14 @@ object ClusterShardingSettings {
strategy: String,
idleTimeout: FiniteDuration = idleTimeout,
leastRecentlyUsedLimit: Int = leastRecentlyUsedLimit,
mostRecentlyUsedLimit: Int = mostRecentlyUsedLimit,
oldSettingUsed: Boolean = oldSettingUsed): PassivationStrategySettings =
new PassivationStrategySettings(strategy, idleTimeout, leastRecentlyUsedLimit, oldSettingUsed)
new PassivationStrategySettings(
strategy,
idleTimeout,
leastRecentlyUsedLimit,
mostRecentlyUsedLimit,
oldSettingUsed)
}
object PassivationStrategySettings {
@ -161,13 +171,15 @@ object ClusterShardingSettings {
strategy = "none",
idleTimeout = Duration.Zero,
leastRecentlyUsedLimit = 0,
mostRecentlyUsedLimit = 0,
oldSettingUsed = false)
def apply(config: Config): PassivationStrategySettings = {
val settings = new PassivationStrategySettings(
strategy = toRootLowerCase(config.getString("passivation.strategy")),
idleTimeout = config.getDuration("passivation.idle.timeout", MILLISECONDS).millis,
leastRecentlyUsedLimit = config.getInt("passivation.least-recently-used.limit"))
leastRecentlyUsedLimit = config.getInt("passivation.least-recently-used.limit"),
mostRecentlyUsedLimit = config.getInt("passivation.most-recently-used.limit"))
// default to old setting if it exists (defined in application.conf), overriding the new settings
if (config.hasPath("passivate-idle-entity-after")) {
val timeout =
@ -191,6 +203,7 @@ object ClusterShardingSettings {
private[akka] case object NoPassivationStrategy extends PassivationStrategy
private[akka] case class IdlePassivationStrategy(timeout: FiniteDuration) extends PassivationStrategy
private[akka] case class LeastRecentlyUsedPassivationStrategy(limit: Int) extends PassivationStrategy
private[akka] case class MostRecentlyUsedPassivationStrategy(limit: Int) extends PassivationStrategy
/**
* INTERNAL API
@ -207,6 +220,8 @@ object ClusterShardingSettings {
IdlePassivationStrategy(settings.passivationStrategySettings.idleTimeout)
case "least-recently-used" =>
LeastRecentlyUsedPassivationStrategy(settings.passivationStrategySettings.leastRecentlyUsedLimit)
case "most-recently-used" =>
MostRecentlyUsedPassivationStrategy(settings.passivationStrategySettings.mostRecentlyUsedLimit)
case _ => NoPassivationStrategy
}
}
@ -619,6 +634,9 @@ final class ClusterShardingSettings(
def withLeastRecentlyUsedPassivationStrategy(limit: Int): ClusterShardingSettings =
copy(passivationStrategySettings = passivationStrategySettings.withLeastRecentlyUsedStrategy(limit))
def withMostRecentlyUsedPassivationStrategy(limit: Int): ClusterShardingSettings =
copy(passivationStrategySettings = passivationStrategySettings.withMostRecentlyUsedStrategy(limit))
def withShardRegionQueryTimeout(duration: FiniteDuration): ClusterShardingSettings =
copy(shardRegionQueryTimeout = duration)

View file

@ -26,6 +26,8 @@ private[akka] object EntityPassivationStrategy {
new IdleEntityPassivationStrategy(timeout)
case ClusterShardingSettings.LeastRecentlyUsedPassivationStrategy(limit) =>
new LeastRecentlyUsedEntityPassivationStrategy(limit)
case ClusterShardingSettings.MostRecentlyUsedPassivationStrategy(limit) =>
new MostRecentlyUsedEntityPassivationStrategy(limit)
case _ => DisabledEntityPassivationStrategy
}
}
@ -154,3 +156,40 @@ private[akka] final class LeastRecentlyUsedEntityPassivationStrategy(perRegionLi
if (excess > 0) recencyList.removeLeastRecent(excess) else PassivateEntities.none
}
}
/**
* Passivate the most recently used entities when the number of active entities in a shard region
* reaches a limit. The per-region limit is divided evenly among the active shards in a region.
* @param perRegionLimit active entity capacity for a shard region
*/
@InternalApi
private[akka] final class MostRecentlyUsedEntityPassivationStrategy(perRegionLimit: Int)
extends EntityPassivationStrategy {
import EntityPassivationStrategy.PassivateEntities
private var perShardLimit: Int = perRegionLimit
private val recencyList = RecencyList.empty[EntityId]
override val scheduledInterval: Option[FiniteDuration] = None
override def shardsUpdated(activeShards: Int): PassivateEntities = {
perShardLimit = perRegionLimit / activeShards
passivateExcessEntities()
}
override def entityCreated(id: EntityId): PassivateEntities = {
recencyList.update(id)
passivateExcessEntities(skip = 1) // remove most recent before adding this created entity
}
override def entityTouched(id: EntityId): Unit = recencyList.update(id)
override def entityTerminated(id: EntityId): Unit = recencyList.remove(id)
override def intervalPassed(): PassivateEntities = PassivateEntities.none
private def passivateExcessEntities(skip: Int = 0): PassivateEntities = {
val excess = recencyList.size - perShardLimit
if (excess > 0) recencyList.removeMostRecent(excess, skip) else PassivateEntities.none
}
}

View file

@ -18,6 +18,14 @@
# ║ LRU 4M │ 20.24 % │ 43,704,979 │ 34,857,131 │ 30,857,131 ║
# ╟────────┼─────────┼────────────┼─────────────┼──────────────╢
# ║ LRU 8M │ 43.03 % │ 43,704,979 │ 24,896,638 │ 16,896,638 ║
# ╟────────┼─────────┼────────────┼─────────────┼──────────────╢
# ║ MRU 1M │ 11.92 % │ 43,704,979 │ 38,493,369 │ 37,493,369 ║
# ╟────────┼─────────┼────────────┼─────────────┼──────────────╢
# ║ MRU 2M │ 25.67 % │ 43,704,979 │ 32,486,285 │ 30,486,285 ║
# ╟────────┼─────────┼────────────┼─────────────┼──────────────╢
# ║ MRU 4M │ 41.82 % │ 43,704,979 │ 25,429,608 │ 21,429,608 ║
# ╟────────┼─────────┼────────────┼─────────────┼──────────────╢
# ║ MRU 8M │ 68.01 % │ 43,704,979 │ 13,980,246 │ 5,980,246 ║
# ╚════════╧═════════╧════════════╧═════════════╧══════════════╝
#
@ -54,7 +62,35 @@ akka.cluster.sharding {
regions = 10
pattern = arc-database
strategy = lru-800k
}
},
{
name = "MRU 1M"
shards = 100
regions = 10
pattern = arc-database
strategy = mru-100k
},
{
name = "MRU 2M"
shards = 100
regions = 10
pattern = arc-database
strategy = mru-200k
},
{
name = "MRU 4M"
shards = 100
regions = 10
pattern = arc-database
strategy = mru-400k
},
{
name = "MRU 8M"
shards = 100
regions = 10
pattern = arc-database
strategy = mru-800k
},
]
print-detailed-stats = true
@ -94,5 +130,33 @@ akka.cluster.sharding {
per-region-limit = 800000
}
}
mru-100k {
strategy = most-recently-used
most-recently-used {
per-region-limit = 100000
}
}
mru-200k {
strategy = most-recently-used
most-recently-used {
per-region-limit = 200000
}
}
mru-400k {
strategy = most-recently-used
most-recently-used {
per-region-limit = 400000
}
}
mru-800k {
strategy = most-recently-used
most-recently-used {
per-region-limit = 800000
}
}
}
}

View file

@ -16,6 +16,12 @@
# ║ LRU 500k │ 7.53 % │ 37,656,092 │ 34,822,122 │ 34,322,122 ║
# ╟──────────┼─────────┼────────────┼─────────────┼──────────────╢
# ║ LRU 1M │ 25.37 % │ 37,656,092 │ 28,102,287 │ 27,102,287 ║
# ╟──────────┼─────────┼────────────┼─────────────┼──────────────╢
# ║ MRU 250k │ 5.25 % │ 37,656,092 │ 35,680,111 │ 35,430,111 ║
# ╟──────────┼─────────┼────────────┼─────────────┼──────────────╢
# ║ MRU 500k │ 10.50 % │ 37,656,092 │ 33,702,975 │ 33,202,975 ║
# ╟──────────┼─────────┼────────────┼─────────────┼──────────────╢
# ║ MRU 1M │ 20.79 % │ 37,656,092 │ 29,826,997 │ 28,826,997 ║
# ╚══════════╧═════════╧════════════╧═════════════╧══════════════╝
#
@ -45,6 +51,27 @@ akka.cluster.sharding {
regions = 10
pattern = arc-search-merged
strategy = lru-100k
},
{
name = "MRU 250k"
shards = 100
regions = 10
pattern = arc-search-merged
strategy = mru-25k
},
{
name = "MRU 500k"
shards = 100
regions = 10
pattern = arc-search-merged
strategy = mru-50k
},
{
name = "MRU 1M"
shards = 100
regions = 10
pattern = arc-search-merged
strategy = mru-100k
}
]
@ -78,5 +105,26 @@ akka.cluster.sharding {
per-region-limit = 100000
}
}
mru-25k {
strategy = most-recently-used
most-recently-used {
per-region-limit = 25000
}
}
mru-50k {
strategy = most-recently-used
most-recently-used {
per-region-limit = 50000
}
}
mru-100k {
strategy = most-recently-used
most-recently-used {
per-region-limit = 100000
}
}
}
}

View file

@ -21,6 +21,10 @@ akka.cluster.sharding {
sequence {
start = 1
}
loop {
start = 1
end = 1000000
}
uniform {
min = 1
max = 10000000

View file

@ -0,0 +1,68 @@
#
# Run with synthetically generated access events with a looping scan through ids.
#
# > akka-cluster-sharding/Test/runMain akka.cluster.sharding.passivation.simulator.Simulator synthetic-loop
#
# ╔══════════╤═════════╤════════════╤═════════════╤══════════════╗
# ║ Run │ Active │ Accesses │ Activations │ Passivations ║
# ╠══════════╪═════════╪════════════╪═════════════╪══════════════╣
# ║ LRU 500k │ 0.00 % │ 10,000,000 │ 10,000,000 │ 9,500,000 ║
# ╟──────────┼─────────┼────────────┼─────────────┼──────────────╢
# ║ MRU 500k │ 45.00 % │ 10,000,000 │ 5,500,000 │ 5,000,000 ║
# ╚══════════╧═════════╧════════════╧═════════════╧══════════════╝
#
akka.cluster.sharding {
passivation.simulator {
runs = [
{
name = "LRU 500k"
shards = 100
regions = 10
pattern = loop-1M
strategy = lru-50k
},
{
name = "MRU 500k"
shards = 100
regions = 10
pattern = loop-1M
strategy = mru-50k
}
]
print-detailed-stats = true
# looping sequence of ids
# generate 10M events over 1M ids
loop-1M {
pattern = synthetic
synthetic {
events = 10000000
generator = loop
loop {
start = 1
end = 1000000
}
}
}
# LRU strategy with 50k limit in each of 10 regions
# total limit across cluster of 500k (50% of id space)
lru-50k {
strategy = least-recently-used
least-recently-used {
per-region-limit = 50000
}
}
# MRU strategy with 50k limit in each of 10 regions
# total limit across cluster of 500k (50% of id space)
mru-50k {
strategy = most-recently-used
most-recently-used {
per-region-limit = 50000
}
}
}
}

View file

@ -7,6 +7,8 @@
# ║ Run │ Active │ Accesses │ Activations │ Passivations ║
# ╠══════════╪═════════╪════════════╪═════════════╪══════════════╣
# ║ LRU 100k │ 40.47 % │ 50,000,000 │ 29,764,380 │ 29,664,380 ║
# ╟──────────┼─────────┼────────────┼─────────────┼──────────────╢
# ║ MRU 100k │ 10.08 % │ 50,000,000 │ 44,960,042 │ 44,860,042 ║
# ╚══════════╧═════════╧════════════╧═════════════╧══════════════╝
#
@ -19,6 +21,13 @@ akka.cluster.sharding {
regions = 10
pattern = scrambled-zipfian
strategy = lru-10k
},
{
name = "MRU 100k"
shards = 100
regions = 10
pattern = scrambled-zipfian
strategy = mru-10k
}
]
@ -47,5 +56,14 @@ akka.cluster.sharding {
per-region-limit = 10000
}
}
# MRU strategy with 10k limit in each of 10 regions
# total limit across cluster of 100k (1% of id space)
mru-10k {
strategy = most-recently-used
most-recently-used {
per-region-limit = 10000
}
}
}
}

View file

@ -67,6 +67,25 @@ class ClusterShardingSettingsSpec extends AnyWordSpec with Matchers {
.passivationStrategy shouldBe ClusterShardingSettings.LeastRecentlyUsedPassivationStrategy(42000)
}
"allow most recently used passivation strategy to be configured (via config)" in {
settings("""
#passivation-most-recently-used
akka.cluster.sharding {
passivation {
strategy = most-recently-used
most-recently-used.limit = 1000000
}
}
#passivation-most-recently-used
""").passivationStrategy shouldBe ClusterShardingSettings.MostRecentlyUsedPassivationStrategy(1000000)
}
"allow most recently used passivation strategy to be configured (via factory method)" in {
defaultSettings
.withMostRecentlyUsedPassivationStrategy(limit = 42000)
.passivationStrategy shouldBe ClusterShardingSettings.MostRecentlyUsedPassivationStrategy(42000)
}
"disable automatic passivation if `remember-entities` is enabled (via config)" in {
settings("""
akka.cluster.sharding.remember-entities = on

View file

@ -10,6 +10,8 @@ import akka.actor.{ Actor, ActorRef, Props }
import akka.cluster.Cluster
import akka.testkit.WithLogCapturing
import akka.testkit.{ AkkaSpec, TestProbe }
import org.scalatest.concurrent.Eventually
import scala.concurrent.duration._
object EntityPassivationSpec {
@ -42,6 +44,15 @@ object EntityPassivationSpec {
}
""").withFallback(config)
val mostRecentlyUsedConfig = ConfigFactory.parseString("""
akka.cluster.sharding {
passivation {
strategy = most-recently-used
most-recently-used.limit = 10
}
}
""").withFallback(config)
val disabledConfig = ConfigFactory.parseString("""
akka.cluster.sharding {
passivation {
@ -88,6 +99,7 @@ object EntityPassivationSpec {
abstract class AbstractEntityPassivationSpec(config: Config, expectedEntities: Int)
extends AkkaSpec(config)
with Eventually
with WithLogCapturing {
import EntityPassivationSpec._
@ -98,6 +110,7 @@ abstract class AbstractEntityPassivationSpec(config: Config, expectedEntities: I
val probes: Map[Int, TestProbe] = (1 to expectedEntities).map(id => id -> TestProbe()).toMap
val probeRefs: Map[String, ActorRef] = probes.map { case (id, probe) => id.toString -> probe.ref }
val stateProbe: TestProbe = TestProbe()
def expectReceived(id: Int, message: Any, within: FiniteDuration = patience.timeout): Entity.Received = {
val received = probes(id).expectMsgType[Entity.Received](within)
@ -108,6 +121,18 @@ abstract class AbstractEntityPassivationSpec(config: Config, expectedEntities: I
def expectNoMessage(id: Int, within: FiniteDuration): Unit =
probes(id).expectNoMessage(within)
def getState(region: ActorRef): ShardRegion.CurrentShardRegionState = {
region.tell(ShardRegion.GetShardRegionState, stateProbe.ref)
stateProbe.expectMsgType[ShardRegion.CurrentShardRegionState]
}
def expectState(region: ActorRef)(expectedShards: (Int, Iterable[Int])*): Unit =
eventually {
getState(region).shards should contain theSameElementsAs expectedShards.map {
case (shardId, entityIds) => ShardRegion.ShardState(shardId.toString, entityIds.map(_.toString).toSet)
}
}
def start(): ActorRef = {
// single node cluster
Cluster(system).join(Cluster(system).selfAddress)
@ -185,7 +210,7 @@ class LeastRecentlyUsedEntityPassivationSpec
if (id > 10) expectReceived(id = id - 10, message = Stop)
}
// shard 1 active ids: 11-20
expectState(region)(1 -> (11 to 20))
// activating a second shard will divide the per-shard limit in two, passivating half of the first shard
region ! Envelope(shard = 2, id = 21, message = "B")
@ -194,8 +219,7 @@ class LeastRecentlyUsedEntityPassivationSpec
expectReceived(id = id, message = Stop)
}
// shard 1 active ids: 16-20
// shard 2 active ids: 21
expectState(region)(1 -> (16 to 20), 2 -> Set(21))
// shards now have a limit of 5 entities
for (id <- 1 to 20) {
@ -205,8 +229,7 @@ class LeastRecentlyUsedEntityPassivationSpec
expectReceived(id = passivatedId, message = Stop)
}
// shard 1 active ids: 16-20
// shard 2 active ids: 21
expectState(region)(1 -> (16 to 20), 2 -> Set(21))
// shards now have a limit of 5 entities
for (id <- 21 to 24) {
@ -214,8 +237,7 @@ class LeastRecentlyUsedEntityPassivationSpec
expectReceived(id = id, message = "D")
}
// shard 1 active ids: 16-20
// shard 2 active ids: 21-24
expectState(region)(1 -> (16 to 20), 2 -> (21 to 24))
// activating a third shard will divide the per-shard limit in three, passivating entities over the new limits
region ! Envelope(shard = 3, id = 31, message = "E")
@ -224,9 +246,7 @@ class LeastRecentlyUsedEntityPassivationSpec
expectReceived(id = id, message = Stop)
}
// shard 1 active ids: 18, 19, 20
// shard 2 active ids: 22, 23, 24
// shard 3 active ids: 31
expectState(region)(1 -> Set(18, 19, 20), 2 -> Set(22, 23, 24), 3 -> Set(31))
// shards now have a limit of 3 entities
for (id <- 25 to 30) {
@ -235,9 +255,7 @@ class LeastRecentlyUsedEntityPassivationSpec
expectReceived(id = id - 3, message = Stop)
}
// shard 1 active ids: 18, 19, 20
// shard 2 active ids: 28, 29, 30
// shard 3 active ids: 31
expectState(region)(1 -> Set(18, 19, 20), 2 -> Set(28, 29, 30), 3 -> Set(31))
// shards now have a limit of 3 entities
for (id <- 31 to 40) {
@ -246,9 +264,7 @@ class LeastRecentlyUsedEntityPassivationSpec
if (id > 33) expectReceived(id = id - 3, message = Stop)
}
// shard 1 active ids: 18, 19, 20
// shard 2 active ids: 28, 29, 30
// shard 3 active ids: 38, 39, 40
expectState(region)(1 -> Set(18, 19, 20), 2 -> Set(28, 29, 30), 3 -> Set(38, 39, 40))
// manually passivate some entities
region ! Envelope(shard = 1, id = 19, message = ManuallyPassivate)
@ -261,9 +277,7 @@ class LeastRecentlyUsedEntityPassivationSpec
expectReceived(id = 29, message = Stop)
expectReceived(id = 39, message = Stop)
// shard 1 active ids: 18, 20
// shard 2 active ids: 28, 30
// shard 3 active ids: 38, 40
expectState(region)(1 -> Set(18, 20), 2 -> Set(28, 30), 3 -> Set(38, 40))
for (i <- 1 to 3) {
region ! Envelope(shard = 1, id = 10 + i, message = "H")
@ -283,17 +297,114 @@ class LeastRecentlyUsedEntityPassivationSpec
}
}
// shard 1 active ids: 11, 12, 13
// shard 2 active ids: 21, 22, 23
// shard 3 active ids: 31, 32, 33
expectState(region)(1 -> Set(11, 12, 13), 2 -> Set(21, 22, 23), 3 -> Set(31, 32, 33))
}
}
}
val statsProbe = TestProbe()
region.tell(ShardRegion.GetShardRegionState, statsProbe.ref)
val state = statsProbe.expectMsgType[ShardRegion.CurrentShardRegionState]
state.shards shouldBe Set(
ShardRegion.ShardState("1", Set("11", "12", "13")),
ShardRegion.ShardState("2", Set("21", "22", "23")),
ShardRegion.ShardState("3", Set("31", "32", "33")))
class MostRecentlyUsedEntityPassivationSpec
extends AbstractEntityPassivationSpec(EntityPassivationSpec.mostRecentlyUsedConfig, expectedEntities = 40) {
import EntityPassivationSpec.Entity.{ Envelope, ManuallyPassivate, Stop }
"Passivation of most recently used entities" must {
"passivate the most recently used entities when the per-shard entity limit is reached" in {
val region = start()
// only one active shard at first, most recently used entities passivated once the limit is reached
for (id <- 1 to 20) {
region ! Envelope(shard = 1, id = id, message = "A")
expectReceived(id, message = "A")
if (id > 10) expectReceived(id = id - 1, message = Stop)
}
expectState(region)(1 -> Set(1, 2, 3, 4, 5, 6, 7, 8, 9, 20))
// activating a second shard will divide the per-shard limit in two, passivating half of the first shard
region ! Envelope(shard = 2, id = 21, message = "B")
expectReceived(id = 21, message = "B")
for (id <- Seq(20, 9, 8, 7, 6)) {
expectReceived(id, message = Stop)
}
expectState(region)(1 -> Set(1, 2, 3, 4, 5), 2 -> Set(21))
// shards now have a limit of 5 entities
for (id <- 1 to 20) {
region ! Envelope(shard = 1, id = id, message = "C")
expectReceived(id = id, message = "C")
if (id > 5) expectReceived(id = id - 1, message = Stop)
}
expectState(region)(1 -> Set(1, 2, 3, 4, 20), 2 -> Set(21))
// shards now have a limit of 5 entities
for (id <- 21 to 24) {
region ! Envelope(shard = 2, id = id, message = "D")
expectReceived(id = id, message = "D")
}
expectState(region)(1 -> Set(1, 2, 3, 4, 20), 2 -> Set(21, 22, 23, 24))
// activating a third shard will divide the per-shard limit in three, passivating entities over the new limits
region ! Envelope(shard = 3, id = 31, message = "E")
expectReceived(id = 31, message = "E")
for (id <- Seq(24, 20, 4)) {
expectReceived(id = id, message = Stop)
}
expectState(region)(1 -> Set(1, 2, 3), 2 -> Set(21, 22, 23), 3 -> Set(31))
// shards now have a limit of 3 entities
for (id <- 24 to 30) {
region ! Envelope(shard = 2, id = id, message = "F")
expectReceived(id = id, message = "F")
expectReceived(id = id - 1, message = Stop)
}
expectState(region)(1 -> Set(1, 2, 3), 2 -> Set(21, 22, 30), 3 -> Set(31))
// shards now have a limit of 3 entities
for (id <- 31 to 40) {
region ! Envelope(shard = 3, id = id, message = "G")
expectReceived(id = id, message = "G")
if (id > 33) expectReceived(id = id - 1, message = Stop)
}
expectState(region)(1 -> Set(1, 2, 3), 2 -> Set(21, 22, 30), 3 -> Set(31, 32, 40))
// manually passivate some entities
region ! Envelope(shard = 1, id = 2, message = ManuallyPassivate)
region ! Envelope(shard = 2, id = 22, message = ManuallyPassivate)
region ! Envelope(shard = 3, id = 32, message = ManuallyPassivate)
expectReceived(id = 2, message = ManuallyPassivate)
expectReceived(id = 22, message = ManuallyPassivate)
expectReceived(id = 32, message = ManuallyPassivate)
expectReceived(id = 2, message = Stop)
expectReceived(id = 22, message = Stop)
expectReceived(id = 32, message = Stop)
expectState(region)(1 -> Set(1, 3), 2 -> Set(21, 30), 3 -> Set(31, 40))
for (i <- 1 to 3) {
region ! Envelope(shard = 1, id = 11 + i, message = "H")
region ! Envelope(shard = 2, id = 22 + i, message = "H")
region ! Envelope(shard = 3, id = 33 + i, message = "H")
expectReceived(id = 11 + i, message = "H")
expectReceived(id = 22 + i, message = "H")
expectReceived(id = 33 + i, message = "H")
if (i == 2) {
expectReceived(id = 12, message = Stop)
expectReceived(id = 23, message = Stop)
expectReceived(id = 34, message = Stop)
} else if (i == 3) {
expectReceived(id = 13, message = Stop)
expectReceived(id = 24, message = Stop)
expectReceived(id = 35, message = Stop)
}
}
expectState(region)(1 -> Set(1, 3, 14), 2 -> Set(21, 30, 25), 3 -> Set(31, 40, 36))
}
}
}

View file

@ -18,10 +18,11 @@ trait AccessPattern {
abstract class SyntheticGenerator(events: Int) extends AccessPattern {
protected def createGenerator(): site.ycsb.generator.NumberGenerator
override def entityIds: Source[EntityId, NotUsed] = Source.unfold(createGenerator() -> 0) {
case (_, count) if count >= events => None
case (generator, count) => Option(generator.nextValue().longValue.toString).map(generator -> (count + 1) -> _)
private def generateEntityIds: Source[EntityId, NotUsed] = Source.unfold(createGenerator()) { generator =>
Option(generator.nextValue()).map(generator -> _.longValue.toString)
}
override def entityIds: Source[EntityId, NotUsed] = generateEntityIds.take(events)
}
object SyntheticGenerator {
@ -34,6 +35,13 @@ object SyntheticGenerator {
override protected def createGenerator(): NumberGenerator = new CounterGenerator(start)
}
/**
* Generate a looping sequence of id events.
*/
final class Loop(start: Long, end: Long, events: Int) extends SyntheticGenerator(events) {
override protected def createGenerator(): NumberGenerator = new SequentialGenerator(start, end)
}
/**
* Generate id events randomly using a uniform distribution, from the inclusive range min to max.
*/

View file

@ -6,7 +6,11 @@ package akka.cluster.sharding.passivation.simulator
import akka.NotUsed
import akka.actor.ActorSystem
import akka.cluster.sharding.internal.{ EntityPassivationStrategy, LeastRecentlyUsedEntityPassivationStrategy }
import akka.cluster.sharding.internal.{
EntityPassivationStrategy,
LeastRecentlyUsedEntityPassivationStrategy,
MostRecentlyUsedEntityPassivationStrategy
}
import akka.stream.scaladsl.{ Flow, Source }
import com.typesafe.config.ConfigFactory
@ -76,6 +80,8 @@ object Simulator {
generator match {
case SimulatorSettings.PatternSettings.Synthetic.Sequence(start) =>
new SyntheticGenerator.Sequence(start, events)
case SimulatorSettings.PatternSettings.Synthetic.Loop(start, end) =>
new SyntheticGenerator.Loop(start, end, events)
case SimulatorSettings.PatternSettings.Synthetic.Uniform(min, max) =>
new SyntheticGenerator.Uniform(min, max, events)
case SimulatorSettings.PatternSettings.Synthetic.Exponential(mean) =>
@ -98,6 +104,8 @@ object Simulator {
runSettings.strategy match {
case SimulatorSettings.StrategySettings.LeastRecentlyUsed(perRegionLimit) =>
() => new LeastRecentlyUsedEntityPassivationStrategy(perRegionLimit)
case SimulatorSettings.StrategySettings.MostRecentlyUsed(perRegionLimit) =>
() => new MostRecentlyUsedEntityPassivationStrategy(perRegionLimit)
}
}

View file

@ -45,11 +45,13 @@ object SimulatorSettings {
object StrategySettings {
final case class LeastRecentlyUsed(perRegionLimit: Int) extends StrategySettings
final case class MostRecentlyUsed(perRegionLimit: Int) extends StrategySettings
def apply(simulatorConfig: Config, strategy: String): StrategySettings = {
val config = simulatorConfig.getConfig(strategy).withFallback(simulatorConfig.getConfig("strategy-defaults"))
lowerCase(config.getString("strategy")) match {
case "least-recently-used" => LeastRecentlyUsed(config.getInt("least-recently-used.per-region-limit"))
case "most-recently-used" => MostRecentlyUsed(config.getInt("most-recently-used.per-region-limit"))
case _ => sys.error(s"Unknown strategy for [$strategy]")
}
}
@ -63,6 +65,7 @@ object SimulatorSettings {
object Synthetic {
sealed trait Generator
final case class Sequence(start: Long) extends Generator
final case class Loop(start: Long, end: Long) extends Generator
final case class Uniform(min: Long, max: Long) extends Generator
final case class Exponential(mean: Double) extends Generator
final case class Hotspot(min: Long, max: Long, hot: Double, rate: Double) extends Generator
@ -74,6 +77,10 @@ object SimulatorSettings {
case "sequence" =>
val start = config.getLong("sequence.start")
Sequence(start)
case "loop" =>
val start = config.getLong("loop.start")
val end = config.getLong("loop.end")
Loop(start, end)
case "uniform" =>
val min = config.getLong("uniform.min")
val max = config.getLong("uniform.max")

View file

@ -325,6 +325,20 @@ passivation strategy, and set the limit for active entities in a shard region:
Or enable the least recently used passivation strategy and set the active entity limit using the
`withLeastRecentlyUsedPassivationStrategy` method on `ClusterShardingSettings`.
#### Most recently used passivation strategy
The **most recently used** passivation strategy passivates those entities that have the most recent activity when the
number of active entities passes a specified limit. The configurable limit is for a whole shard region and is divided
evenly among the active shards in each region. This strategy is most useful when the older an entity is, the more
likely that entity will be accessed again; as seen in cyclic access patterns. Configure automatic passivation to use
the most recently used passivation strategy, and set the limit for active entities in a shard region:
@@snip [passivation most recently used](/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingSettingsSpec.scala) { #passivation-most-recently-used type=conf }
Or enable the most recently used passivation strategy and set the active entity limit using the
`withMostRecentlyUsedPassivationStrategy` method on `ClusterShardingSettings`.
## Sharding State
There are two types of state managed: