diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala index 4a60ccc710..fbbd5cf9c0 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala @@ -311,6 +311,11 @@ final class ClusterShardingSettings( private[akka] def shouldHostShard(cluster: Cluster): Boolean = role.forall(cluster.selfMember.roles.contains) + /** If true, idle entities should be passivated if they have not received any message by this interval, otherwise it is not enabled. */ + @InternalApi + private[akka] val shouldPassivateIdleEntities: Boolean = + passivateIdleEntityAfter > Duration.Zero && !rememberEntities + def withRole(role: String): ClusterShardingSettings = copy(role = ClusterShardingSettings.roleOption(role)) def withRole(role: Option[String]): ClusterShardingSettings = copy(role = role) diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index 5be4791513..553a4bda31 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -198,7 +198,7 @@ private[akka] class Shard( private var handOffStopper: Option[ActorRef] = None import context.dispatcher - val passivateIdleTask = if (settings.passivateIdleEntityAfter > Duration.Zero && !settings.rememberEntities) { + val passivateIdleTask = if (settings.shouldPassivateIdleEntities) { val idleInterval = settings.passivateIdleEntityAfter / 2 Some(context.system.scheduler.scheduleWithFixedDelay(idleInterval, idleInterval, self, PassivateIdleTick)) } else { diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index d30ed877e5..118a69b6c3 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -535,11 +535,7 @@ private[akka] class ShardRegion( cluster.subscribe(self, classOf[MemberEvent]) timers.startTimerWithFixedDelay(Retry, Retry, retryInterval) startRegistration() - if (settings.passivateIdleEntityAfter > Duration.Zero && !settings.rememberEntities) - log.info( - "{}: Idle entities will be passivated after [{}]", - typeName, - PrettyDuration.format(settings.passivateIdleEntityAfter)) + logPassivateIdleEntities() } override def postStop(): Unit = { @@ -548,6 +544,17 @@ private[akka] class ShardRegion( gracefulShutdownProgress.trySuccess(Done) } + private def logPassivateIdleEntities(): Unit = { + if (settings.shouldPassivateIdleEntities) + log.info( + "{}: Idle entities will be passivated after [{}]", + typeName, + PrettyDuration.format(settings.passivateIdleEntityAfter)) + + if (settings.rememberEntities) + log.debug("Idle entities will not be passivated because 'rememberEntities' is enabled.") + } + // when using proxy the data center can be different from the own data center private val targetDcRole = dataCenter match { case Some(t) => ClusterSettings.DcRolePrefix + t diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala index b60838ba59..232aa80585 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala @@ -32,16 +32,13 @@ class ClusterShardingInternalsSpec extends AkkaSpec(""" |akka.actor.provider = cluster |akka.remote.classic.netty.tcp.port = 0 |akka.remote.artery.canonical.port = 0 - |akka.cluster.sharding.shard-region-query-timeout = 10 s |""".stripMargin) with MockitoSugar { import ClusterShardingInternalsSpec._ val clusterSharding = spy(new ClusterSharding(system.asInstanceOf[ExtendedActorSystem])) "ClusterSharding" must { - "have a configurable shard region query timeout" in { - ClusterShardingSettings(system).shardRegionQueryTimeout shouldEqual 10.seconds - } + "start a region in proxy mode in case of node role mismatch" in { val settingsWithRole = ClusterShardingSettings(system).withRole("nonExistingRole") @@ -66,7 +63,7 @@ class ClusterShardingInternalsSpec extends AkkaSpec(""" ArgumentMatchers.eq(extractShardId)) } - "HandOffStopper must stop the entity even if the entity doesn't handle handOffStopMessage" in { + "stop entities from HandOffStopper even if the entity doesn't handle handOffStopMessage" in { val probe = TestProbe() val shardName = "test" val emptyHandlerActor = system.actorOf(Props(new EmptyHandlerActor)) @@ -77,6 +74,7 @@ class ClusterShardingInternalsSpec extends AkkaSpec(""" expectTerminated(emptyHandlerActor, 1.seconds) probe.expectMsg(1.seconds, ShardStopped(shardName)) + probe.lastSender shouldEqual handOffStopper watch(handOffStopper) expectTerminated(handOffStopper, 1.seconds) diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingSettingsSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingSettingsSpec.scala new file mode 100644 index 0000000000..73de46811d --- /dev/null +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingSettingsSpec.scala @@ -0,0 +1,42 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.cluster.sharding + +import scala.concurrent.duration._ + +import akka.testkit.AkkaSpec + +class ClusterShardingSettingsSpec extends AkkaSpec(s""" + akka.actor.provider = cluster + akka.remote.classic.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 + """) { + + "ClusterShardingSettings" must { + + "passivate idle entities if `remember-entities` and `passivate-idle-entity-after` are the defaults" in { + ClusterShardingSettings(system).shouldPassivateIdleEntities shouldEqual true + } + + "disable passivation if `remember-entities` is enabled" in { + ClusterShardingSettings(system).withRememberEntities(true).shouldPassivateIdleEntities shouldEqual false + } + + "disable passivation if `remember-entities` is enabled and `passivate-idle-entity-after` is 0 or 'off'" in { + ClusterShardingSettings(system) + .withRememberEntities(true) + .withPassivateIdleAfter(Duration.Zero) + .shouldPassivateIdleEntities shouldEqual false + } + + "disable passivation if `remember-entities` is the default and `passivate-idle-entity-after` is 0 or 'off'" in { + ClusterShardingSettings(system) + .withPassivateIdleAfter(Duration.Zero) + .shouldPassivateIdleEntities shouldEqual false + } + + } + +}