Consolidated passivation check on settings used in region and shard (#27738)
This commit is contained in:
parent
a84aa0095c
commit
ef09dca732
5 changed files with 63 additions and 11 deletions
|
|
@ -311,6 +311,11 @@ final class ClusterShardingSettings(
|
|||
private[akka] def shouldHostShard(cluster: Cluster): Boolean =
|
||||
role.forall(cluster.selfMember.roles.contains)
|
||||
|
||||
/** If true, idle entities should be passivated if they have not received any message by this interval, otherwise it is not enabled. */
|
||||
@InternalApi
|
||||
private[akka] val shouldPassivateIdleEntities: Boolean =
|
||||
passivateIdleEntityAfter > Duration.Zero && !rememberEntities
|
||||
|
||||
def withRole(role: String): ClusterShardingSettings = copy(role = ClusterShardingSettings.roleOption(role))
|
||||
|
||||
def withRole(role: Option[String]): ClusterShardingSettings = copy(role = role)
|
||||
|
|
|
|||
|
|
@ -198,7 +198,7 @@ private[akka] class Shard(
|
|||
private var handOffStopper: Option[ActorRef] = None
|
||||
|
||||
import context.dispatcher
|
||||
val passivateIdleTask = if (settings.passivateIdleEntityAfter > Duration.Zero && !settings.rememberEntities) {
|
||||
val passivateIdleTask = if (settings.shouldPassivateIdleEntities) {
|
||||
val idleInterval = settings.passivateIdleEntityAfter / 2
|
||||
Some(context.system.scheduler.scheduleWithFixedDelay(idleInterval, idleInterval, self, PassivateIdleTick))
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -535,11 +535,7 @@ private[akka] class ShardRegion(
|
|||
cluster.subscribe(self, classOf[MemberEvent])
|
||||
timers.startTimerWithFixedDelay(Retry, Retry, retryInterval)
|
||||
startRegistration()
|
||||
if (settings.passivateIdleEntityAfter > Duration.Zero && !settings.rememberEntities)
|
||||
log.info(
|
||||
"{}: Idle entities will be passivated after [{}]",
|
||||
typeName,
|
||||
PrettyDuration.format(settings.passivateIdleEntityAfter))
|
||||
logPassivateIdleEntities()
|
||||
}
|
||||
|
||||
override def postStop(): Unit = {
|
||||
|
|
@ -548,6 +544,17 @@ private[akka] class ShardRegion(
|
|||
gracefulShutdownProgress.trySuccess(Done)
|
||||
}
|
||||
|
||||
private def logPassivateIdleEntities(): Unit = {
|
||||
if (settings.shouldPassivateIdleEntities)
|
||||
log.info(
|
||||
"{}: Idle entities will be passivated after [{}]",
|
||||
typeName,
|
||||
PrettyDuration.format(settings.passivateIdleEntityAfter))
|
||||
|
||||
if (settings.rememberEntities)
|
||||
log.debug("Idle entities will not be passivated because 'rememberEntities' is enabled.")
|
||||
}
|
||||
|
||||
// when using proxy the data center can be different from the own data center
|
||||
private val targetDcRole = dataCenter match {
|
||||
case Some(t) => ClusterSettings.DcRolePrefix + t
|
||||
|
|
|
|||
|
|
@ -32,16 +32,13 @@ class ClusterShardingInternalsSpec extends AkkaSpec("""
|
|||
|akka.actor.provider = cluster
|
||||
|akka.remote.classic.netty.tcp.port = 0
|
||||
|akka.remote.artery.canonical.port = 0
|
||||
|akka.cluster.sharding.shard-region-query-timeout = 10 s
|
||||
|""".stripMargin) with MockitoSugar {
|
||||
import ClusterShardingInternalsSpec._
|
||||
|
||||
val clusterSharding = spy(new ClusterSharding(system.asInstanceOf[ExtendedActorSystem]))
|
||||
|
||||
"ClusterSharding" must {
|
||||
"have a configurable shard region query timeout" in {
|
||||
ClusterShardingSettings(system).shardRegionQueryTimeout shouldEqual 10.seconds
|
||||
}
|
||||
|
||||
"start a region in proxy mode in case of node role mismatch" in {
|
||||
|
||||
val settingsWithRole = ClusterShardingSettings(system).withRole("nonExistingRole")
|
||||
|
|
@ -66,7 +63,7 @@ class ClusterShardingInternalsSpec extends AkkaSpec("""
|
|||
ArgumentMatchers.eq(extractShardId))
|
||||
}
|
||||
|
||||
"HandOffStopper must stop the entity even if the entity doesn't handle handOffStopMessage" in {
|
||||
"stop entities from HandOffStopper even if the entity doesn't handle handOffStopMessage" in {
|
||||
val probe = TestProbe()
|
||||
val shardName = "test"
|
||||
val emptyHandlerActor = system.actorOf(Props(new EmptyHandlerActor))
|
||||
|
|
@ -77,6 +74,7 @@ class ClusterShardingInternalsSpec extends AkkaSpec("""
|
|||
expectTerminated(emptyHandlerActor, 1.seconds)
|
||||
|
||||
probe.expectMsg(1.seconds, ShardStopped(shardName))
|
||||
probe.lastSender shouldEqual handOffStopper
|
||||
|
||||
watch(handOffStopper)
|
||||
expectTerminated(handOffStopper, 1.seconds)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.cluster.sharding
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import akka.testkit.AkkaSpec
|
||||
|
||||
class ClusterShardingSettingsSpec extends AkkaSpec(s"""
|
||||
akka.actor.provider = cluster
|
||||
akka.remote.classic.netty.tcp.port = 0
|
||||
akka.remote.artery.canonical.port = 0
|
||||
""") {
|
||||
|
||||
"ClusterShardingSettings" must {
|
||||
|
||||
"passivate idle entities if `remember-entities` and `passivate-idle-entity-after` are the defaults" in {
|
||||
ClusterShardingSettings(system).shouldPassivateIdleEntities shouldEqual true
|
||||
}
|
||||
|
||||
"disable passivation if `remember-entities` is enabled" in {
|
||||
ClusterShardingSettings(system).withRememberEntities(true).shouldPassivateIdleEntities shouldEqual false
|
||||
}
|
||||
|
||||
"disable passivation if `remember-entities` is enabled and `passivate-idle-entity-after` is 0 or 'off'" in {
|
||||
ClusterShardingSettings(system)
|
||||
.withRememberEntities(true)
|
||||
.withPassivateIdleAfter(Duration.Zero)
|
||||
.shouldPassivateIdleEntities shouldEqual false
|
||||
}
|
||||
|
||||
"disable passivation if `remember-entities` is the default and `passivate-idle-entity-after` is 0 or 'off'" in {
|
||||
ClusterShardingSettings(system)
|
||||
.withPassivateIdleAfter(Duration.Zero)
|
||||
.shouldPassivateIdleEntities shouldEqual false
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue