parent
22f31b4d69
commit
5fce4bea63
5 changed files with 78 additions and 49 deletions
|
|
@ -24,8 +24,8 @@ akka.cluster.sharding {
|
|||
remember-entities = off
|
||||
|
||||
# Set this to a time duration to have sharding passivate entities when they have not
|
||||
# gotten any message in this long time. Set to 'off' to disable.
|
||||
passivate-idle-entity-after = off
|
||||
# received any message in this length of time. Set to 'off' to disable.
|
||||
passivate-idle-entity-after = 120s
|
||||
|
||||
# If the coordinator can't store state changes it will be stopped
|
||||
# and started again after this duration, with an exponential back-off
|
||||
|
|
|
|||
|
|
@ -4,24 +4,32 @@
|
|||
|
||||
package akka.cluster.sharding
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import akka.actor.{ Actor, ActorRef, Props }
|
||||
import akka.cluster.Cluster
|
||||
import akka.cluster.sharding.InactiveEntityPassivationSpec.Entity.GotIt
|
||||
import akka.testkit.{ AkkaSpec, TestProbe }
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import com.typesafe.config.Config
|
||||
|
||||
object InactiveEntityPassivationSpec {
|
||||
|
||||
val config = ConfigFactory.parseString("""
|
||||
akka.loglevel = INFO
|
||||
akka.actor.provider = "cluster"
|
||||
akka.remote.netty.tcp.port = 0
|
||||
akka.remote.artery.canonical.port = 0
|
||||
akka.cluster.sharding.passivate-idle-entity-after = 3 s
|
||||
akka.actor.serialize-messages = off
|
||||
""")
|
||||
|
||||
val enabledConfig = ConfigFactory.parseString("""
|
||||
akka.cluster.sharding.passivate-idle-entity-after = 3 s
|
||||
""").withFallback(config)
|
||||
|
||||
val disabledConfig =
|
||||
ConfigFactory.parseString("""akka.cluster.sharding.passivate-idle-entity-after = off""").withFallback(config)
|
||||
|
||||
object Passivate
|
||||
object Entity {
|
||||
def props(probe: ActorRef) = Props(new Entity(probe))
|
||||
|
|
@ -33,11 +41,9 @@ object InactiveEntityPassivationSpec {
|
|||
|
||||
def receive = {
|
||||
case Passivate =>
|
||||
probe ! id + " passivating"
|
||||
context.stop(self)
|
||||
case msg => probe ! GotIt(id, msg, System.nanoTime())
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
val extractEntityId: ShardRegion.ExtractEntityId = {
|
||||
|
|
@ -47,54 +53,71 @@ object InactiveEntityPassivationSpec {
|
|||
val extractShardId: ShardRegion.ExtractShardId = {
|
||||
case msg: Int => (msg % 10).toString
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class InactiveEntityPassivationSpec extends AkkaSpec(InactiveEntityPassivationSpec.config) {
|
||||
abstract class AbstractInactiveEntityPassivationSpec(c: Config) extends AkkaSpec(c) {
|
||||
import InactiveEntityPassivationSpec._
|
||||
|
||||
val smallTolerance = 300.millis
|
||||
private val smallTolerance = 300.millis
|
||||
|
||||
private val settings = ClusterShardingSettings(system)
|
||||
|
||||
def start(probe: TestProbe): ActorRef = {
|
||||
// single node cluster
|
||||
Cluster(system).join(Cluster(system).selfAddress)
|
||||
ClusterSharding(system).start(
|
||||
"myType",
|
||||
InactiveEntityPassivationSpec.Entity.props(probe.ref),
|
||||
settings,
|
||||
extractEntityId,
|
||||
extractShardId,
|
||||
ClusterSharding(system).defaultShardAllocationStrategy(settings),
|
||||
Passivate)
|
||||
}
|
||||
|
||||
def timeUntilPassivate(region: ActorRef, probe: TestProbe): FiniteDuration = {
|
||||
region ! 1
|
||||
region ! 2
|
||||
val responses = Set(probe.expectMsgType[GotIt], probe.expectMsgType[GotIt])
|
||||
responses.map(_.id) should ===(Set("1", "2"))
|
||||
val timeOneSawMessage = responses.find(_.id == "1").get.when
|
||||
Thread.sleep(1000)
|
||||
region ! 2
|
||||
probe.expectMsgType[GotIt].id should ===("2")
|
||||
Thread.sleep(1000)
|
||||
region ! 2
|
||||
probe.expectMsgType[GotIt].id should ===("2")
|
||||
|
||||
val timeSinceOneSawAMessage = (System.nanoTime() - timeOneSawMessage).nanos
|
||||
(settings.passivateIdleEntityAfter - timeSinceOneSawAMessage) + smallTolerance
|
||||
}
|
||||
}
|
||||
|
||||
class InactiveEntityPassivationSpec
|
||||
extends AbstractInactiveEntityPassivationSpec(InactiveEntityPassivationSpec.enabledConfig) {
|
||||
"Passivation of inactive entities" must {
|
||||
|
||||
"passivate entities when they haven't seen messages for the configured duration" in {
|
||||
// single node cluster
|
||||
Cluster(system).join(Cluster(system).selfAddress)
|
||||
val probe = TestProbe()
|
||||
val settings = ClusterShardingSettings(system)
|
||||
val region = ClusterSharding(system).start(
|
||||
"myType",
|
||||
InactiveEntityPassivationSpec.Entity.props(probe.ref),
|
||||
settings,
|
||||
extractEntityId,
|
||||
extractShardId,
|
||||
ClusterSharding(system).defaultShardAllocationStrategy(settings),
|
||||
Passivate)
|
||||
|
||||
region ! 1
|
||||
region ! 2
|
||||
val responses = Set(probe.expectMsgType[GotIt], probe.expectMsgType[GotIt])
|
||||
responses.map(_.id) should ===(Set("1", "2"))
|
||||
val timeOneSawMessage = responses.find(_.id == "1").get.when
|
||||
Thread.sleep(1000)
|
||||
region ! 2
|
||||
probe.expectMsgType[GotIt].id should ===("2")
|
||||
Thread.sleep(1000)
|
||||
region ! 2
|
||||
probe.expectMsgType[GotIt].id should ===("2")
|
||||
val region = start(probe)
|
||||
|
||||
// make sure "1" hasn't seen a message in 3 seconds and passivates
|
||||
val timeSinceOneSawAMessage = (System.nanoTime() - timeOneSawMessage).nanos
|
||||
val timeUntilPassivate: FiniteDuration = (3.seconds - timeSinceOneSawAMessage) - smallTolerance
|
||||
probe.expectNoMessage(timeUntilPassivate)
|
||||
probe.expectMsg("1 passivating")
|
||||
probe.expectNoMessage(timeUntilPassivate(region, probe))
|
||||
|
||||
// but it can be re activated just fine:
|
||||
// but it can be re activated
|
||||
region ! 1
|
||||
region ! 2
|
||||
Set(probe.expectMsgType[GotIt], probe.expectMsgType[GotIt]).map(_.id) should ===(Set("1", "2"))
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class DisabledInactiveEntityPassivationSpec
|
||||
extends AbstractInactiveEntityPassivationSpec(InactiveEntityPassivationSpec.disabledConfig) {
|
||||
"Passivation of inactive entities" must {
|
||||
"not passivate when passivation is disabled" in {
|
||||
val probe = TestProbe()
|
||||
val region = start(probe)
|
||||
probe.expectNoMessage(timeUntilPassivate(region, probe))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -344,12 +344,12 @@ are thereafter delivered to a new incarnation of the entity.
|
|||
|
||||
### Automatic Passivation
|
||||
|
||||
The entities can be configured to be automatically passivated if they haven't received
|
||||
a message for a while using the `akka.cluster.sharding.passivate-idle-entity-after` setting,
|
||||
The entities are automatically passivated if they haven't received a message within the duration configured in
|
||||
`akka.cluster.sharding.passivate-idle-entity-after`
|
||||
or by explicitly setting `ClusterShardingSettings.passivateIdleEntityAfter` to a suitable
|
||||
time to keep the actor alive. Note that only messages sent through sharding are counted, so direct messages
|
||||
to the `ActorRef` of the actor or messages that it sends to itself are not counted as activity.
|
||||
By default automatic passivation is disabled.
|
||||
to the `ActorRef` or messages that the actor sends to itself are not counted in this activity.
|
||||
Passivation can be disabled by setting `akka.cluster.sharding.passivate-idle-entity-after = off`.
|
||||
|
||||
<a id="cluster-sharding-remembering"></a>
|
||||
## Remembering Entities
|
||||
|
|
|
|||
|
|
@ -24,3 +24,9 @@ If you are still using Scala 2.11 then you must upgrade to 2.12 or 2.13
|
|||
Actor DSL is a rarely used feature and has been deprecated since `2.5.0`.
|
||||
Use plain `system.actorOf` instead of the DSL to create Actors if you have been using it.
|
||||
|
||||
## Cluster Sharding
|
||||
|
||||
### Passivate idle entity
|
||||
The configuration `akka.cluster.sharding.passivate-idle-entity-after` is now enabled by default.
|
||||
Sharding will passivate entities when they have not received any messages after this duration.
|
||||
Set to `off` to disable.
|
||||
|
|
@ -123,9 +123,9 @@ message if the entity needs to perform some asynchronous cleanup or interactions
|
|||
|
||||
### Automatic Passivation
|
||||
|
||||
The entities can be configured to be automatically passivated if they haven't received
|
||||
a message for a while using the `akka.cluster.sharding.passivate-idle-entity-after` setting,
|
||||
The entities are automatically passivated if they haven't received a message within the duration configured in
|
||||
`akka.cluster.sharding.passivate-idle-entity-after`
|
||||
or by explicitly setting `ClusterShardingSettings.passivateIdleEntityAfter` to a suitable
|
||||
time to keep the actor alive. Note that only messages sent through sharding are counted, so direct messages
|
||||
to the `ActorRef` of the actor or messages that it sends to itself are not counted as activity.
|
||||
By default automatic passivation is disabled.
|
||||
to the `ActorRef` or messages that the actor sends to itself are not counted in this activity.
|
||||
Passivation can be disabled by setting `akka.cluster.sharding.passivate-idle-entity-after = off`.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue