Add adjustable active entity limit per shard region (#31004)

This commit is contained in:
Peter Vlugter 2021-12-20 21:09:14 +13:00 committed by GitHub
parent cc446477ee
commit 78df739ef6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 331 additions and 38 deletions

View file

@ -616,6 +616,7 @@ private[akka] class Shard(
case msg: CoordinatorMessage => receiveCoordinatorMessage(msg)
case msg: RememberEntityCommand => receiveRememberEntityCommand(msg)
case msg: ShardRegion.StartEntity => startEntity(msg.entityId, Some(sender()))
case msg: ShardRegion.SetActiveEntityLimit => activeEntityLimitUpdated(msg)
case msg: ShardRegion.ShardsUpdated => shardsUpdated(msg)
case Passivate(stopMessage) => passivate(sender(), stopMessage)
case PassivateIntervalTick => passivateEntitiesAfterInterval()
@ -624,6 +625,7 @@ private[akka] class Shard(
case msg: RememberEntityStoreCrashed => rememberEntityStoreCrashed(msg)
case msg if extractEntityId.isDefinedAt(msg) => deliverMessage(msg, sender())
}
def rememberUpdate(add: Set[EntityId] = Set.empty, remove: Set[EntityId] = Set.empty): Unit = {
rememberEntitiesStore match {
case None =>
@ -981,6 +983,11 @@ private[akka] class Shard(
}
}
private def activeEntityLimitUpdated(updated: ShardRegion.SetActiveEntityLimit): Unit = {
val entitiesToPassivate = passivationStrategy.limitUpdated(updated.perRegionLimit)
passivateEntities(entitiesToPassivate)
}
private def shardsUpdated(updated: ShardRegion.ShardsUpdated): Unit = {
val entitiesToPassivate = passivationStrategy.shardsUpdated(updated.activeShards)
passivateEntities(entitiesToPassivate)

View file

@ -15,6 +15,7 @@ import scala.runtime.AbstractFunction1
import scala.util.{ Failure, Success }
import akka.Done
import akka.actor._
import akka.annotation.ApiMayChange
import akka.annotation.{ InternalApi, InternalStableApi }
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
@ -488,6 +489,14 @@ object ShardRegion {
extends ClusterShardingSerializable
with DeadLetterSuppression
/**
* API MAY CHANGE: Messages for passivation strategies may change after additional testing and feedback.
*
* When limit-based automatic passivation is enabled, set a new active entity limit for a shard region.
*/
@ApiMayChange
final case class SetActiveEntityLimit(perRegionLimit: Int)
/**
* INTERNAL API
*
@ -774,6 +783,7 @@ private[akka] class ShardRegion(
case query: ShardRegionQuery => receiveQuery(query)
case msg: RestartShard => deliverMessage(msg, sender())
case msg: StartEntity => deliverStartEntity(msg, sender())
case msg: SetActiveEntityLimit => deliverToAllShards(msg, sender())
case msg if extractEntityId.isDefinedAt(msg) => deliverMessage(msg, sender())
case unknownMsg =>
log.warning("{}: Message does not have an extractor defined in shard so it was ignored: {}", typeName, unknownMsg)
@ -1277,6 +1287,9 @@ private[akka] class ShardRegion(
}
}
def deliverToAllShards(msg: Any, snd: ActorRef): Unit =
shards.values.foreach(_.tell(msg, snd))
def deliverMessage(msg: Any, snd: ActorRef): Unit =
msg match {
case RestartShard(shardId) =>

View file

@ -49,6 +49,13 @@ private[akka] object EntityPassivationStrategy {
private[akka] sealed abstract class EntityPassivationStrategy {
import EntityPassivationStrategy.PassivateEntities
/**
* The per-region active entity limit has been updated, which can trigger passivation.
* @param newLimit the new per-region active entity limit
* @return entities to passivate in the associated shard
*/
def limitUpdated(newLimit: Int): PassivateEntities
/**
* Active shards in this region have been updated, which can trigger passivation.
* @param activeShards updated number of active shards
@ -89,6 +96,7 @@ private[akka] sealed abstract class EntityPassivationStrategy {
private[akka] object DisabledEntityPassivationStrategy extends EntityPassivationStrategy {
import EntityPassivationStrategy.PassivateEntities
override def limitUpdated(newLimit: Int): PassivateEntities = PassivateEntities.none
override def shardsUpdated(activeShards: Int): PassivateEntities = PassivateEntities.none
override def entityTouched(id: EntityId): PassivateEntities = PassivateEntities.none
override def entityTerminated(id: EntityId): Unit = ()
@ -115,6 +123,8 @@ private[akka] final class IdleEntityPassivationStrategy(idleCheck: IdleCheck) ex
override val scheduledInterval: Option[FiniteDuration] = Some(idleCheck.interval)
override def limitUpdated(newLimit: Int): PassivateEntities = PassivateEntities.none
override def shardsUpdated(activeShards: Int): PassivateEntities = PassivateEntities.none
override def entityTouched(id: EntityId): PassivateEntities = {
@ -127,28 +137,49 @@ private[akka] final class IdleEntityPassivationStrategy(idleCheck: IdleCheck) ex
override def intervalPassed(): PassivateEntities = recencyList.removeLeastRecentOutside(idleCheck.timeout)
}
/**
* INTERNAL API: Shared base class for limit-based passivation strategies.
* @param initialLimit initial active entity capacity for a shard region
*/
@InternalApi
private[akka] abstract class LimitBasedEntityPassivationStrategy(initialLimit: Int) extends EntityPassivationStrategy {
import EntityPassivationStrategy.PassivateEntities
protected var activeShards: Int = 1
protected var perRegionLimit: Int = initialLimit
protected var perShardLimit: Int = perRegionLimit
override def limitUpdated(newPerRegionLimit: Int): PassivateEntities = {
perRegionLimit = newPerRegionLimit
perShardLimit = perRegionLimit / activeShards
passivateEntitiesOnLimitUpdate()
}
override def shardsUpdated(newActiveShards: Int): PassivateEntities = {
activeShards = newActiveShards
perShardLimit = perRegionLimit / activeShards
passivateEntitiesOnLimitUpdate()
}
protected def passivateEntitiesOnLimitUpdate(): PassivateEntities
}
/**
* INTERNAL API: Passivate the least recently used entities when the number of active entities in a shard region
* reaches a limit. The per-region limit is divided evenly among the active shards in a region.
* @param perRegionLimit active entity capacity for a shard region
* @param initialLimit initial active entity capacity for a shard region
* @param idleCheck optionally passivate idle entities after the given timeout, checking every interval
*/
@InternalApi
private[akka] final class LeastRecentlyUsedEntityPassivationStrategy(perRegionLimit: Int, idleCheck: Option[IdleCheck])
extends EntityPassivationStrategy {
private[akka] final class LeastRecentlyUsedEntityPassivationStrategy(initialLimit: Int, idleCheck: Option[IdleCheck])
extends LimitBasedEntityPassivationStrategy(initialLimit) {
import EntityPassivationStrategy.PassivateEntities
private var perShardLimit: Int = perRegionLimit
private val recencyList = RecencyList.empty[EntityId]
override val scheduledInterval: Option[FiniteDuration] = idleCheck.map(_.interval)
override def shardsUpdated(activeShards: Int): PassivateEntities = {
perShardLimit = perRegionLimit / activeShards
passivateExcessEntities()
}
override def entityTouched(id: EntityId): PassivateEntities = {
recencyList.update(id)
passivateExcessEntities()
@ -160,6 +191,8 @@ private[akka] final class LeastRecentlyUsedEntityPassivationStrategy(perRegionLi
recencyList.removeLeastRecentOutside(idle.timeout)
}
override protected def passivateEntitiesOnLimitUpdate(): PassivateEntities = passivateExcessEntities()
private def passivateExcessEntities(): PassivateEntities = {
val excess = recencyList.size - perShardLimit
if (excess > 0) recencyList.removeLeastRecent(excess) else PassivateEntities.none
@ -174,21 +207,19 @@ private[akka] final class LeastRecentlyUsedEntityPassivationStrategy(perRegionLi
* Active entities are tracked in multiple recency lists, where entities are promoted to higher-level
* segments on subsequent accesses, and demoted through levels when segments become full.
* The proportions of the segmented levels can be configured as fractions of the overall limit.
* @param perRegionLimit active entity capacity for a shard region
* @param initialLimit initial active entity capacity for a shard region
* @param proportions proportions of the segmented levels
* @param idleCheck optionally passivate idle entities after the given timeout, checking every interval
*/
@InternalApi
private[akka] final class SegmentedLeastRecentlyUsedEntityPassivationStrategy(
perRegionLimit: Int,
initialLimit: Int,
proportions: immutable.Seq[Double],
idleCheck: Option[IdleCheck])
extends EntityPassivationStrategy {
extends LimitBasedEntityPassivationStrategy(initialLimit) {
import EntityPassivationStrategy.PassivateEntities
private var perShardLimit: Int = perRegionLimit
private def limits: immutable.Seq[Int] = proportions.map(p => (p * perShardLimit).toInt)
private val segmentedRecencyList =
@ -197,12 +228,6 @@ private[akka] final class SegmentedLeastRecentlyUsedEntityPassivationStrategy(
override val scheduledInterval: Option[FiniteDuration] = idleCheck.map(_.interval)
override def shardsUpdated(activeShards: Int): PassivateEntities = {
perShardLimit = perRegionLimit / activeShards
segmentedRecencyList.updateLimits(limits)
passivateExcessEntities()
}
override def entityTouched(id: EntityId): PassivateEntities = {
segmentedRecencyList.update(id)
passivateExcessEntities()
@ -214,6 +239,11 @@ private[akka] final class SegmentedLeastRecentlyUsedEntityPassivationStrategy(
segmentedRecencyList.removeOverallLeastRecentOutside(idle.timeout)
}
override protected def passivateEntitiesOnLimitUpdate(): PassivateEntities = {
segmentedRecencyList.updateLimits(limits)
passivateExcessEntities()
}
private def passivateExcessEntities(): PassivateEntities = segmentedRecencyList.removeLeastRecentOverLimit()
}
@ -222,25 +252,19 @@ private[akka] final class SegmentedLeastRecentlyUsedEntityPassivationStrategy(
*
* Passivate the most recently used entities when the number of active entities in a shard region
* reaches a limit. The per-region limit is divided evenly among the active shards in a region.
* @param perRegionLimit active entity capacity for a shard region
* @param initialLimit initial active entity capacity for a shard region
* @param idleCheck optionally passivate idle entities after the given timeout, checking every interval
*/
@InternalApi
private[akka] final class MostRecentlyUsedEntityPassivationStrategy(perRegionLimit: Int, idleCheck: Option[IdleCheck])
extends EntityPassivationStrategy {
private[akka] final class MostRecentlyUsedEntityPassivationStrategy(initialLimit: Int, idleCheck: Option[IdleCheck])
extends LimitBasedEntityPassivationStrategy(initialLimit) {
import EntityPassivationStrategy.PassivateEntities
private var perShardLimit: Int = perRegionLimit
private val recencyList = RecencyList.empty[EntityId]
override val scheduledInterval: Option[FiniteDuration] = idleCheck.map(_.interval)
override def shardsUpdated(activeShards: Int): PassivateEntities = {
perShardLimit = perRegionLimit / activeShards
passivateExcessEntities()
}
override def entityTouched(id: EntityId): PassivateEntities = {
recencyList.update(id)
passivateExcessEntities(skip = 1) // remove most recent before adding this created entity
@ -252,6 +276,8 @@ private[akka] final class MostRecentlyUsedEntityPassivationStrategy(perRegionLim
recencyList.removeLeastRecentOutside(idle.timeout)
}
override protected def passivateEntitiesOnLimitUpdate(): PassivateEntities = passivateExcessEntities()
private def passivateExcessEntities(skip: Int = 0): PassivateEntities = {
val excess = recencyList.size - perShardLimit
if (excess > 0) recencyList.removeMostRecent(excess, skip) else PassivateEntities.none
@ -263,31 +289,25 @@ private[akka] final class MostRecentlyUsedEntityPassivationStrategy(perRegionLim
*
* Passivate the least frequently used entities when the number of active entities in a shard region
* reaches a limit. The per-region limit is divided evenly among the active shards in a region.
* @param perRegionLimit active entity capacity for a shard region
* @param initialLimit initial active entity capacity for a shard region
* @param dynamicAging whether to apply "dynamic aging" as entities are passivated
* @param idleCheck optionally passivate idle entities after the given timeout, checking every interval
*/
@InternalApi
private[akka] final class LeastFrequentlyUsedEntityPassivationStrategy(
perRegionLimit: Int,
initialLimit: Int,
dynamicAging: Boolean,
idleCheck: Option[IdleCheck])
extends EntityPassivationStrategy {
extends LimitBasedEntityPassivationStrategy(initialLimit) {
import EntityPassivationStrategy.PassivateEntities
private var perShardLimit: Int = perRegionLimit
private val frequencyList =
if (idleCheck.isDefined) FrequencyList.withOverallRecency.empty[EntityId](dynamicAging)
else FrequencyList.empty[EntityId](dynamicAging)
override val scheduledInterval: Option[FiniteDuration] = idleCheck.map(_.interval)
override def shardsUpdated(activeShards: Int): PassivateEntities = {
perShardLimit = perRegionLimit / activeShards
passivateExcessEntities()
}
override def entityTouched(id: EntityId): PassivateEntities = {
// first remove excess entities so that dynamic aging is updated
// and the adjusted age is applied to any new entities on update
@ -304,6 +324,8 @@ private[akka] final class LeastFrequentlyUsedEntityPassivationStrategy(
frequencyList.removeOverallLeastRecentOutside(idle.timeout)
}
override protected def passivateEntitiesOnLimitUpdate(): PassivateEntities = passivateExcessEntities()
private def passivateExcessEntities(adjustment: Int = 0): PassivateEntities = {
val excess = frequencyList.size - perShardLimit + adjustment
if (excess > 0) frequencyList.removeLeastFrequent(excess) else PassivateEntities.none

View file

@ -4,6 +4,7 @@
package akka.cluster.sharding.passivation
import akka.cluster.sharding.ShardRegion
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
@ -324,3 +325,64 @@ class LeastFrequentlyUsedWithIdleSpec
}
}
}
class LeastFrequentlyUsedLimitAdjustmentSpec
extends AbstractEntityPassivationSpec(LeastFrequentlyUsedSpec.config, expectedEntities = 21) {
import EntityPassivationSpec.Entity.Envelope
import EntityPassivationSpec.Entity.Stop
"Passivation of least frequently used entities" must {
"adjust per-shard entity limits when the per-region limit is dynamically adjusted" in {
val region = start()
// only one active shard at first, initial per-shard limit of 10
for (id <- 1 to 20) {
region ! Envelope(shard = 1, id = id, message = "A")
expectReceived(id = id, message = "A")
if (id > 10) expectReceived(id = id - 10, message = Stop)
}
expectState(region)(1 -> (11 to 20))
// activating a second shard will divide the per-shard limit in two, passivating half of the first shard
region ! Envelope(shard = 2, id = 21, message = "B")
expectReceived(id = 21, message = "B")
for (id <- 11 to 15) {
expectReceived(id = id, message = Stop)
}
expectState(region)(1 -> (16 to 20), 2 -> Set(21))
// reduce the per-region limit from 10 to 6, per-shard limit becomes 3
region ! ShardRegion.SetActiveEntityLimit(6)
for (id <- 16 to 17) { // passivate entities over new limit
expectReceived(id = id, message = Stop)
}
expectState(region)(1 -> (18 to 20), 2 -> Set(21))
for (id <- 1 to 10) {
region ! Envelope(shard = 1, id = id, message = "C")
expectReceived(id = id, message = "C")
val passivated = if (id < 4) id + 17 else id - 3
expectReceived(id = passivated, message = Stop)
}
expectState(region)(1 -> (8 to 10), 2 -> Set(21))
// increase the per-region limit from 6 to 12, per-shard limit becomes 6
region ! ShardRegion.SetActiveEntityLimit(12)
for (id <- 11 to 20) {
region ! Envelope(shard = 1, id = id, message = "D")
expectReceived(id = id, message = "D")
if (id > 13) { // start passivating at new higher limit of 6
expectReceived(id = id - 6, message = Stop)
}
}
expectState(region)(1 -> (15 to 20), 2 -> Set(21))
}
}
}

View file

@ -4,6 +4,7 @@
package akka.cluster.sharding.passivation
import akka.cluster.sharding.ShardRegion
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
@ -43,6 +44,11 @@ object LeastRecentlyUsedSpec {
}
""").withFallback(EntityPassivationSpec.config)
val segmentedInitialLimitConfig: Config =
ConfigFactory.parseString("""
akka.cluster.sharding.passivation.slru.active-entity-limit = 20
""").withFallback(segmentedConfig)
val idleConfig: Config = ConfigFactory.parseString("""
akka.cluster.sharding {
passivation {
@ -279,3 +285,125 @@ class LeastRecentlyUsedWithIdleSpec
}
}
}
class LeastRecentlyUsedLimitAdjustmentSpec
extends AbstractEntityPassivationSpec(LeastRecentlyUsedSpec.config, expectedEntities = 21) {
import EntityPassivationSpec.Entity.Envelope
import EntityPassivationSpec.Entity.Stop
"Passivation of least recently used entities" must {
"adjust per-shard entity limits when the per-region limit is dynamically adjusted" in {
val region = start()
// only one active shard at first, initial per-shard limit of 10
for (id <- 1 to 20) {
region ! Envelope(shard = 1, id = id, message = "A")
expectReceived(id = id, message = "A")
if (id > 10) expectReceived(id = id - 10, message = Stop)
}
expectState(region)(1 -> (11 to 20))
// activating a second shard will divide the per-shard limit in two, passivating half of the first shard
region ! Envelope(shard = 2, id = 21, message = "B")
expectReceived(id = 21, message = "B")
for (id <- 11 to 15) {
expectReceived(id = id, message = Stop)
}
expectState(region)(1 -> (16 to 20), 2 -> Set(21))
// reduce the per-region limit from 10 to 6, per-shard limit becomes 3
region ! ShardRegion.SetActiveEntityLimit(6)
for (id <- 16 to 17) { // passivate entities over new limit
expectReceived(id = id, message = Stop)
}
expectState(region)(1 -> (18 to 20), 2 -> Set(21))
for (id <- 1 to 10) {
region ! Envelope(shard = 1, id = id, message = "C")
expectReceived(id = id, message = "C")
val passivated = if (id < 4) id + 17 else id - 3
expectReceived(id = passivated, message = Stop)
}
expectState(region)(1 -> (8 to 10), 2 -> Set(21))
// increase the per-region limit from 6 to 12, per-shard limit becomes 6
region ! ShardRegion.SetActiveEntityLimit(12)
for (id <- 11 to 20) {
region ! Envelope(shard = 1, id = id, message = "D")
expectReceived(id = id, message = "D")
if (id > 13) { // start passivating at new higher limit of 6
expectReceived(id = id - 6, message = Stop)
}
}
expectState(region)(1 -> (15 to 20), 2 -> Set(21))
}
}
}
class SegmentedLeastRecentlyUsedLimitAdjustmentSpec
extends AbstractEntityPassivationSpec(LeastRecentlyUsedSpec.segmentedInitialLimitConfig, expectedEntities = 31) {
import EntityPassivationSpec.Entity.Envelope
import EntityPassivationSpec.Entity.Stop
"Passivation of segmented least recently used entities" must {
"adjust per-shard entity limits when the per-region limit is dynamically adjusted" in {
val region = start()
// only one active shard at first, initial per-shard limit of 20
for (id <- 1 to 30) {
region ! Envelope(shard = 1, id = id, message = "A")
expectReceived(id = id, message = "A")
if (id > 20) expectReceived(id = id - 20, message = Stop)
}
expectState(region)(1 -> (11 to 30))
// activating a second shard will divide the per-shard limit in two, passivating half of the first shard
region ! Envelope(shard = 2, id = 31, message = "B")
expectReceived(id = 31, message = "B")
for (id <- 11 to 20) {
expectReceived(id = id, message = Stop)
}
expectState(region)(1 -> (21 to 30), 2 -> Set(31))
// reduce the per-region limit from 20 to 10, per-shard limit becomes 5
region ! ShardRegion.SetActiveEntityLimit(10)
for (id <- 21 to 25) { // passivate entities over new limit
expectReceived(id = id, message = Stop)
}
expectState(region)(1 -> (26 to 30), 2 -> Set(31))
for (id <- 1 to 10) {
region ! Envelope(shard = 1, id = id, message = "C")
expectReceived(id = id, message = "C")
val passivated = if (id < 6) id + 25 else id - 5
expectReceived(id = passivated, message = Stop)
}
expectState(region)(1 -> (6 to 10), 2 -> Set(31))
// increase the per-region limit from 10 to 30, per-shard limit becomes 15
region ! ShardRegion.SetActiveEntityLimit(30)
for (id <- 11 to 30) {
region ! Envelope(shard = 1, id = id, message = "D")
expectReceived(id = id, message = "D")
if (id > 20) { // start passivating at new higher limit of 15
expectReceived(id = id - 15, message = Stop)
}
}
expectState(region)(1 -> (16 to 30), 2 -> Set(31))
}
}
}

View file

@ -4,6 +4,7 @@
package akka.cluster.sharding.passivation
import akka.cluster.sharding.ShardRegion
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
@ -187,3 +188,63 @@ class MostRecentlyUsedWithIdleSpec
}
}
}
class MostRecentlyUsedLimitAdjustmentSpec
extends AbstractEntityPassivationSpec(MostRecentlyUsedSpec.config, expectedEntities = 21) {
import EntityPassivationSpec.Entity.Envelope
import EntityPassivationSpec.Entity.Stop
"Passivation of most recently used entities" must {
"adjust per-shard entity limits when the per-region limit is dynamically adjusted" in {
val region = start()
// only one active shard at first, initial per-shard limit of 10
for (id <- 1 to 20) {
region ! Envelope(shard = 1, id = id, message = "A")
expectReceived(id = id, message = "A")
if (id > 10) expectReceived(id = id - 1, message = Stop)
}
expectState(region)(1 -> Set(1, 2, 3, 4, 5, 6, 7, 8, 9, 20))
// activating a second shard will divide the per-shard limit in two, passivating half of the first shard
region ! Envelope(shard = 2, id = 21, message = "B")
expectReceived(id = 21, message = "B")
for (id <- Seq(20, 9, 8, 7, 6)) {
expectReceived(id, message = Stop)
}
expectState(region)(1 -> Set(1, 2, 3, 4, 5), 2 -> Set(21))
// reduce the per-region limit from 10 to 6, per-shard limit becomes 3
region ! ShardRegion.SetActiveEntityLimit(6)
for (id <- Seq(5, 4)) { // passivate entities over new limit
expectReceived(id = id, message = Stop)
}
expectState(region)(1 -> Set(1, 2, 3), 2 -> Set(21))
for (id <- 1 to 10) {
region ! Envelope(shard = 1, id = id, message = "C")
expectReceived(id = id, message = "C")
if (id > 5) expectReceived(id = id - 1, message = Stop)
}
expectState(region)(1 -> Set(1, 2, 10), 2 -> Set(21))
// increase the per-region limit from 6 to 12, per-shard limit becomes 6
region ! ShardRegion.SetActiveEntityLimit(12)
for (id <- 11 to 20) {
region ! Envelope(shard = 1, id = id, message = "D")
expectReceived(id = id, message = "D")
if (id > 13) { // start passivating at new higher limit of 6
expectReceived(id = id - 1, message = Stop)
}
}
expectState(region)(1 -> Set(1, 2, 10, 11, 12, 20), 2 -> Set(21))
}
}
}