diff --git a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala index 774f479cf5..2f162016ce 100644 --- a/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala +++ b/akka-cluster-sharding-typed/src/main/scala/akka/cluster/sharding/typed/ClusterShardingSettings.scala @@ -4,8 +4,7 @@ package akka.cluster.sharding.typed -import scala.concurrent.duration.FiniteDuration - +import scala.concurrent.duration.{ Duration, FiniteDuration } import akka.actor.NoSerializationVerificationNeeded import akka.actor.typed.ActorSystem import akka.annotation.InternalApi @@ -42,6 +41,7 @@ object ClusterShardingSettings { rememberEntities = untypedSettings.rememberEntities, journalPluginId = untypedSettings.journalPluginId, snapshotPluginId = untypedSettings.snapshotPluginId, + passivateIdleEntityAfter = untypedSettings.passivateIdleEntityAfter, stateStoreMode = StateStoreMode.byName(untypedSettings.stateStoreMode), new TuningParameters(untypedSettings.tuningParameters), new ClusterSingletonManagerSettings( @@ -61,6 +61,7 @@ object ClusterShardingSettings { journalPluginId = settings.journalPluginId, snapshotPluginId = settings.snapshotPluginId, stateStoreMode = settings.stateStoreMode.name, + passivateIdleEntityAfter = settings.passivateIdleEntityAfter, new UntypedShardingSettings.TuningParameters( bufferSize = settings.tuningParameters.bufferSize, coordinatorFailureBackoff = settings.tuningParameters.coordinatorFailureBackoff, @@ -233,6 +234,10 @@ object ClusterShardingSettings { * be used for the internal persistence of ClusterSharding. If not defined the default * journal plugin is used. Note that this is not related to persistence used by the entity * actors. + * @param passivateIdleEntityAfter Passivate entities that have not received any message in this interval. + * Note that only messages sent through sharding are counted, so direct messages + * to the `ActorRef` of the actor or messages that it sends to itself are not counted as activity. + * Use 0 to disable automatic passivation. * @param snapshotPluginId Absolute path to the snapshot plugin configuration entity that is to * be used for the internal persistence of ClusterSharding. If not defined the default * snapshot plugin is used. Note that this is not related to persistence used by the entity @@ -246,6 +251,7 @@ final class ClusterShardingSettings( val rememberEntities: Boolean, val journalPluginId: String, val snapshotPluginId: String, + val passivateIdleEntityAfter: FiniteDuration, val stateStoreMode: ClusterShardingSettings.StateStoreMode, val tuningParameters: ClusterShardingSettings.TuningParameters, val coordinatorSingletonSettings: ClusterSingletonManagerSettings) extends NoSerializationVerificationNeeded { @@ -290,6 +296,12 @@ final class ClusterShardingSettings( def withStateStoreMode(stateStoreMode: ClusterShardingSettings.StateStoreMode): ClusterShardingSettings = copy(stateStoreMode = stateStoreMode) + def withPassivateIdleEntitiesAfter(duration: FiniteDuration): ClusterShardingSettings = + copy(passivateIdleEntityAfter = duration) + + def withPassivateIdleEntityAfter(duration: java.time.Duration): ClusterShardingSettings = + copy(passivateIdleEntityAfter = duration.asScala) + /** * The `role` of the `ClusterSingletonManagerSettings` is not used. The `role` of the * coordinator singleton will be the same as the `role` of `ClusterShardingSettings`. @@ -305,7 +317,8 @@ final class ClusterShardingSettings( snapshotPluginId: String = snapshotPluginId, stateStoreMode: ClusterShardingSettings.StateStoreMode = stateStoreMode, tuningParameters: ClusterShardingSettings.TuningParameters = tuningParameters, - coordinatorSingletonSettings: ClusterSingletonManagerSettings = coordinatorSingletonSettings): ClusterShardingSettings = + coordinatorSingletonSettings: ClusterSingletonManagerSettings = coordinatorSingletonSettings, + passivateIdleEntityAfter: FiniteDuration = passivateIdleEntityAfter): ClusterShardingSettings = new ClusterShardingSettings( numberOfShards, role, @@ -313,6 +326,7 @@ final class ClusterShardingSettings( rememberEntities, journalPluginId, snapshotPluginId, + passivateIdleEntityAfter, stateStoreMode, tuningParameters, coordinatorSingletonSettings) diff --git a/akka-cluster-sharding/src/main/resources/reference.conf b/akka-cluster-sharding/src/main/resources/reference.conf index 4dba0a92e3..4cf2117278 100644 --- a/akka-cluster-sharding/src/main/resources/reference.conf +++ b/akka-cluster-sharding/src/main/resources/reference.conf @@ -23,6 +23,10 @@ akka.cluster.sharding { # due to rebalance or crash. remember-entities = off + # Set this to a time duration to have sharding passivate entities when they have not + # gotten any message in this long time. Set to 'off' to disable. + passivate-idle-entity-after = off + # If the coordinator can't store state changes it will be stopped # and started again after this duration, with an exponential back-off # of up to 5 times this duration. diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala index 17fea285a6..25f5fba600 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterShardingSettings.scala @@ -12,6 +12,7 @@ import akka.annotation.InternalApi import com.typesafe.config.Config import akka.cluster.Cluster import akka.cluster.singleton.ClusterSingletonManagerSettings +import akka.util.JavaDurationConverters._ object ClusterShardingSettings { @@ -53,12 +54,17 @@ object ClusterShardingSettings { val coordinatorSingletonSettings = ClusterSingletonManagerSettings(config.getConfig("coordinator-singleton")) + val passivateIdleAfter = + if (config.getString("passivate-idle-entity-after").toLowerCase == "off") Duration.Zero + else config.getDuration("passivate-idle-entity-after", MILLISECONDS).millis + new ClusterShardingSettings( role = roleOption(config.getString("role")), rememberEntities = config.getBoolean("remember-entities"), journalPluginId = config.getString("journal-plugin-id"), snapshotPluginId = config.getString("snapshot-plugin-id"), stateStoreMode = config.getString("state-store-mode"), + passivateIdleEntityAfter = passivateIdleAfter, tuningParameters, coordinatorSingletonSettings) } @@ -175,6 +181,7 @@ object ClusterShardingSettings { 100.milliseconds, 5) } + } } @@ -191,6 +198,10 @@ object ClusterShardingSettings { * be used for the internal persistence of ClusterSharding. If not defined the default * snapshot plugin is used. Note that this is not related to persistence used by the entity * actors. + * @param passivateIdleEntityAfter Passivate entities that have not received any message in this interval. + * Note that only messages sent through sharding are counted, so direct messages + * to the `ActorRef` of the actor or messages that it sends to itself are not counted as activity. + * Use 0 to disable automatic passivation. * @param tuningParameters additional tuning parameters, see descriptions in reference.conf */ final class ClusterShardingSettings( @@ -199,9 +210,22 @@ final class ClusterShardingSettings( val journalPluginId: String, val snapshotPluginId: String, val stateStoreMode: String, + val passivateIdleEntityAfter: FiniteDuration, val tuningParameters: ClusterShardingSettings.TuningParameters, val coordinatorSingletonSettings: ClusterSingletonManagerSettings) extends NoSerializationVerificationNeeded { + // included for binary compatibility reasons + @deprecated("Use the ClusterShardingSettings factory methods or the constructor including passivateIdleEntityAfter instead", "2.5.18") + def this( + role: Option[String], + rememberEntities: Boolean, + journalPluginId: String, + snapshotPluginId: String, + stateStoreMode: String, + tuningParameters: ClusterShardingSettings.TuningParameters, + coordinatorSingletonSettings: ClusterSingletonManagerSettings) = + this(role, rememberEntities, journalPluginId, snapshotPluginId, stateStoreMode, Duration.Zero, tuningParameters, coordinatorSingletonSettings) + import ClusterShardingSettings.{ StateStoreModePersistence, StateStoreModeDData } require( stateStoreMode == StateStoreModePersistence || stateStoreMode == StateStoreModeDData, @@ -231,6 +255,12 @@ final class ClusterShardingSettings( def withStateStoreMode(stateStoreMode: String): ClusterShardingSettings = copy(stateStoreMode = stateStoreMode) + def withPassivateIdleAfter(duration: FiniteDuration): ClusterShardingSettings = + copy(passivateIdleAfter = duration) + + def withPassivateIdleAfter(duration: java.time.Duration): ClusterShardingSettings = + copy(passivateIdleAfter = duration.asScala) + /** * The `role` of the `ClusterSingletonManagerSettings` is not used. The `role` of the * coordinator singleton will be the same as the `role` of `ClusterShardingSettings`. @@ -244,14 +274,17 @@ final class ClusterShardingSettings( journalPluginId: String = journalPluginId, snapshotPluginId: String = snapshotPluginId, stateStoreMode: String = stateStoreMode, + passivateIdleAfter: FiniteDuration = passivateIdleEntityAfter, tuningParameters: ClusterShardingSettings.TuningParameters = tuningParameters, coordinatorSingletonSettings: ClusterSingletonManagerSettings = coordinatorSingletonSettings): ClusterShardingSettings = + new ClusterShardingSettings( role, rememberEntities, journalPluginId, snapshotPluginId, stateStoreMode, + passivateIdleAfter, tuningParameters, coordinatorSingletonSettings) } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index 0c5fd92642..dc7d12830c 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -12,7 +12,6 @@ import akka.actor.ActorSystem import akka.actor.Deploy import akka.actor.Props import akka.actor.Terminated -import akka.cluster.sharding.Shard.ShardCommand import akka.actor.Actor import akka.util.MessageBufferMap @@ -26,6 +25,8 @@ import akka.actor.Stash import akka.persistence._ import akka.actor.NoSerializationVerificationNeeded +import scala.concurrent.duration._ + /** * INTERNAL API * @see [[ClusterSharding$ ClusterSharding extension]] @@ -115,6 +116,9 @@ private[akka] object Shard { Props(new Shard(typeName, shardId, entityProps, settings, extractEntityId, extractShardId, handOffStopMessage)) .withDeploy(Deploy.local) } + + private case object PassivateIdleTick extends NoSerializationVerificationNeeded + } /** @@ -144,11 +148,20 @@ private[akka] class Shard( var state = State.Empty var idByRef = Map.empty[ActorRef, EntityId] var refById = Map.empty[EntityId, ActorRef] + var lastMessageTimestamp = Map.empty[EntityId, Long] var passivating = Set.empty[ActorRef] val messageBuffers = new MessageBufferMap[EntityId] var handOffStopper: Option[ActorRef] = None + import context.dispatcher + val passivateIdleTask = if (settings.passivateIdleEntityAfter > Duration.Zero) { + val idleInterval = settings.passivateIdleEntityAfter / 2 + Some(context.system.scheduler.schedule(idleInterval, idleInterval, self, PassivateIdleTick)) + } else { + None + } + initialized() def initialized(): Unit = context.parent ! ShardInitialized(shardId) @@ -166,6 +179,7 @@ private[akka] class Shard( case msg: ShardRegion.StartEntityAck ⇒ receiveStartEntityAck(msg) case msg: ShardRegionCommand ⇒ receiveShardRegionCommand(msg) case msg: ShardQuery ⇒ receiveShardQuery(msg) + case PassivateIdleTick ⇒ passivateIdleEntities() case msg if extractEntityId.isDefinedAt(msg) ⇒ deliverMessage(msg, sender()) } @@ -176,6 +190,9 @@ private[akka] class Shard( def receiveStartEntity(start: ShardRegion.StartEntity): Unit = { log.debug("Got a request from [{}] to start entity [{}] in shard [{}]", sender(), start.entityId, shardId) + if (passivateIdleTask.isDefined) { + lastMessageTimestamp = lastMessageTimestamp.updated(start.entityId, System.nanoTime()) + } getEntity(start.entityId) sender() ! ShardRegion.StartEntityAck(start.entityId, shardId) } @@ -240,6 +257,9 @@ private[akka] class Shard( val id = idByRef(ref) idByRef -= ref refById -= id + if (passivateIdleTask.isDefined) { + lastMessageTimestamp -= id + } if (messageBuffers.getOrEmpty(id).nonEmpty) { log.debug("Starting entity [{}] again, there are buffered messages for it", id) sendMsgBuffer(EntityStarted(id)) @@ -253,8 +273,6 @@ private[akka] class Shard( def passivate(entity: ActorRef, stopMessage: Any): Unit = { idByRef.get(entity) match { case Some(id) ⇒ if (!messageBuffers.contains(id)) { - log.debug("Passivating started on entity {}", id) - passivating = passivating + entity messageBuffers.add(id) entity ! stopMessage @@ -265,6 +283,17 @@ private[akka] class Shard( } } + def passivateIdleEntities(): Unit = { + val deadline = System.nanoTime() - settings.passivateIdleEntityAfter.toNanos + val refsToPassivate = lastMessageTimestamp.collect { + case (entityId, lastMessageTimestamp) if lastMessageTimestamp < deadline ⇒ refById(entityId) + } + if (refsToPassivate.nonEmpty) { + log.debug("Passivating [{}] idle entities", refsToPassivate.size) + refsToPassivate.foreach(passivate(_, handOffStopMessage)) + } + } + // EntityStopped handler def passivateCompleted(event: EntityStopped): Unit = { log.debug("Entity stopped after passivation [{}]", event.entityId) @@ -295,25 +324,31 @@ private[akka] class Shard( if (id == null || id == "") { log.warning("Id must not be empty, dropping message [{}]", msg.getClass.getName) context.system.deadLetters ! msg - } else if (payload.isInstanceOf[ShardRegion.StartEntity]) { - // in case it was wrapped, used in Typed - receiveStartEntity(payload.asInstanceOf[ShardRegion.StartEntity]) } else { - messageBuffers.contains(id) match { - case false ⇒ deliverTo(id, msg, payload, snd) + payload match { + case start: ShardRegion.StartEntity ⇒ + // in case it was wrapped, used in Typed + receiveStartEntity(start) + case _ ⇒ + messageBuffers.contains(id) match { + case false ⇒ deliverTo(id, msg, payload, snd) - case true if messageBuffers.totalSize >= bufferSize ⇒ - log.debug("Buffer is full, dropping message for entity [{}]", id) - context.system.deadLetters ! msg + case true if messageBuffers.totalSize >= bufferSize ⇒ + log.debug("Buffer is full, dropping message for entity [{}]", id) + context.system.deadLetters ! msg - case true ⇒ - log.debug("Message for entity [{}] buffered", id) - messageBuffers.append(id, msg, snd) + case true ⇒ + log.debug("Message for entity [{}] buffered", id) + messageBuffers.append(id, msg, snd) + } } } } def deliverTo(id: EntityId, msg: Any, payload: Msg, snd: ActorRef): Unit = { + if (passivateIdleTask.isDefined) { + lastMessageTimestamp = lastMessageTimestamp.updated(id, System.nanoTime()) + } val name = URLEncoder.encode(id, "utf-8") context.child(name) match { case Some(actor) ⇒ actor.tell(payload, snd) @@ -329,10 +364,17 @@ private[akka] class Shard( val a = context.watch(context.actorOf(entityProps(id), name)) idByRef = idByRef.updated(a, id) refById = refById.updated(id, a) + if (passivateIdleTask.isDefined) { + lastMessageTimestamp += (id -> System.nanoTime()) + } state = state.copy(state.entities + id) a } } + + override def postStop(): Unit = { + passivateIdleTask.foreach(_.cancel()) + } } private[akka] object RememberEntityStarter { @@ -425,6 +467,9 @@ private[akka] trait RememberingShard { selfType: Shard ⇒ val id = idByRef(ref) idByRef -= ref refById -= id + if (passivateIdleTask.isDefined) { + lastMessageTimestamp -= id + } if (messageBuffers.getOrEmpty(id).nonEmpty) { //Note; because we're not persisting the EntityStopped, we don't need // to persist the EntityStarted either. diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index fe5df78113..bac89f11c1 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -7,7 +7,7 @@ package akka.cluster.sharding import java.net.URLEncoder import akka.pattern.AskTimeoutException -import akka.util.{ MessageBufferMap, Timeout } +import akka.util.{ MessageBufferMap, PrettyDuration, Timeout } import akka.pattern.{ ask, pipe } import akka.actor._ import akka.cluster.Cluster @@ -418,6 +418,8 @@ private[akka] class ShardRegion( // subscribe to MemberEvent, re-subscribe when restart override def preStart(): Unit = { cluster.subscribe(self, classOf[MemberEvent]) + if (settings.passivateIdleEntityAfter > Duration.Zero) + log.info("Idle entities will be passivated after [{}]", PrettyDuration.format(settings.passivateIdleEntityAfter)) } override def postStop(): Unit = { diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/InactiveEntityPassivationSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/InactiveEntityPassivationSpec.scala new file mode 100644 index 0000000000..5763325969 --- /dev/null +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/InactiveEntityPassivationSpec.scala @@ -0,0 +1,102 @@ +/* + * Copyright (C) 2009-2018 Lightbend Inc. + */ + +package akka.cluster.sharding + +import akka.actor.{ Actor, ActorRef, Props } +import akka.cluster.Cluster +import akka.cluster.sharding.InactiveEntityPassivationSpec.Entity.GotIt +import akka.testkit.{ AkkaSpec, TestProbe } +import com.typesafe.config.ConfigFactory + +import scala.concurrent.duration._ + +object InactiveEntityPassivationSpec { + val config = ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.actor.provider = "cluster" + akka.remote.netty.tcp.port = 0 + akka.remote.artery.canonical.port = 0 + akka.cluster.sharding.passivate-idle-entity-after = 3 s + akka.actor.serialize-messages = off + """) + + object Passivate + object Entity { + def props(probe: ActorRef) = Props(new Entity(probe)) + case class GotIt(id: String, msg: Any, when: Long) + } + class Entity(probe: ActorRef) extends Actor { + + def id = context.self.path.name + + def receive = { + case Passivate ⇒ + probe ! id + " passivating" + context.stop(self) + case msg ⇒ probe ! GotIt(id, msg, System.nanoTime()) + } + + } + + val extractEntityId: ShardRegion.ExtractEntityId = { + case msg: Int ⇒ (msg.toString, msg) + } + + val extractShardId: ShardRegion.ExtractShardId = { + case msg: Int ⇒ (msg % 10).toString + } + +} + +class InactiveEntityPassivationSpec extends AkkaSpec(InactiveEntityPassivationSpec.config) { + import InactiveEntityPassivationSpec._ + + "Passivation of inactive entities" must { + + "passivate entities when they haven't seen messages for the configured duration" in { + // single node cluster + Cluster(system).join(Cluster(system).selfAddress) + val probe = TestProbe() + val settings = ClusterShardingSettings(system) + val region = ClusterSharding(system).start( + "myType", + InactiveEntityPassivationSpec.Entity.props(probe.ref), + settings, + extractEntityId, + extractShardId, + ClusterSharding(system).defaultShardAllocationStrategy(settings), + Passivate + ) + + region ! 1 + region ! 2 + val responses = Set( + probe.expectMsgType[GotIt], + probe.expectMsgType[GotIt]) + responses.map(_.id) should ===(Set("1", "2")) + val timeOneSawMessage = responses.find(_.id == "1").get.when + Thread.sleep(1000) + region ! 2 + probe.expectMsgType[GotIt].id should ===("2") + Thread.sleep(1000) + region ! 2 + probe.expectMsgType[GotIt].id should ===("2") + + // make sure "1" hasn't seen a message in 3 seconds and passivates + val timeSinceOneSawAMessage = (System.nanoTime() - timeOneSawMessage).nanos + probe.expectNoMessage(3.seconds - timeSinceOneSawAMessage) + probe.expectMsg("1 passivating") + + // but it can be re activated just fine: + region ! 1 + region ! 2 + Set( + probe.expectMsgType[GotIt], + probe.expectMsgType[GotIt]).map(_.id) should ===(Set("1", "2")) + + } + } + +} diff --git a/akka-docs/src/main/paradox/cluster-sharding.md b/akka-docs/src/main/paradox/cluster-sharding.md index b4e23ca319..6a390ef5cb 100644 --- a/akka-docs/src/main/paradox/cluster-sharding.md +++ b/akka-docs/src/main/paradox/cluster-sharding.md @@ -334,6 +334,15 @@ then supposed to stop itself. Incoming messages will be buffered by the `Shard` between reception of `Passivate` and termination of the entity. Such buffered messages are thereafter delivered to a new incarnation of the entity. +### Automatic Passivation + +The entities can be configured to be automatically passivated if they haven't received +a message for a while using the `akka.cluster.sharding.passivate-idle-entity-after` setting, +or by explicitly setting `ClusterShardingSettings.passivateIdleEntityAfter` to a suitable +time to keep the actor alive. Note that only messages sent through sharding are counted, so direct messages +to the `ActorRef` of the actor or messages that it sends to itself are not counted as activity. +By default automatic passivation is disabled. + ## Remembering Entities diff --git a/akka-docs/src/main/paradox/typed/cluster-sharding.md b/akka-docs/src/main/paradox/typed/cluster-sharding.md index c7b346540b..3e1e8834f1 100644 --- a/akka-docs/src/main/paradox/typed/cluster-sharding.md +++ b/akka-docs/src/main/paradox/typed/cluster-sharding.md @@ -116,3 +116,12 @@ Scala Java : @@snip [ShardingCompileOnlyTest.java](/akka-cluster-sharding-typed/src/test/java/jdocs/akka/cluster/sharding/typed/ShardingCompileOnlyTest.java) { #counter-messages #counter-passivate #counter-passivate-start } + +### Automatic Passivation + +The entities can be configured to be automatically passivated if they haven't received +a message for a while using the `akka.cluster.sharding.passivate-idle-entity-after` setting, +or by explicitly setting `ClusterShardingSettings.passivateIdleEntityAfter` to a suitable +time to keep the actor alive. Note that only messages sent through sharding are counted, so direct messages +to the `ActorRef` of the actor or messages that it sends to itself are not counted as activity. +By default automatic passivation is disabled. \ No newline at end of file