diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index 457901a76a..aec14d5eaa 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -847,10 +847,15 @@ private[akka] class Shard( if (verboseDebug) log.debug("Stop of [{}] arrived, already is among the pending stops", entityId) case Active(_) => - log.debug("Entity [{}] stopped without passivating, will restart after backoff", entityId) - entities.waitingForRestart(entityId) - val msg = RestartTerminatedEntity(entityId) - timers.startSingleTimer(msg, msg, entityRestartBackoff) + if (rememberEntitiesStore.isDefined) { + log.debug("Entity [{}] stopped without passivating, will restart after backoff", entityId) + entities.waitingForRestart(entityId) + val msg = RestartTerminatedEntity(entityId) + timers.startSingleTimer(msg, msg, entityRestartBackoff) + } else { + log.debug("Entity [{}] terminated", entityId) + entities.removeEntity(entityId) + } case Passivating(_) => if (entities.pendingRememberedEntitiesExist()) { diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/EntityTerminationSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/EntityTerminationSpec.scala new file mode 100644 index 0000000000..d88e317416 --- /dev/null +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/EntityTerminationSpec.scala @@ -0,0 +1,147 @@ +/* + * Copyright (C) 2020 Lightbend Inc. 
+ */ + +package akka.cluster.sharding + +import akka.actor.Actor +import akka.actor.Props +import akka.cluster.Cluster +import akka.cluster.MemberStatus +import akka.testkit.AkkaSpec +import akka.testkit.ImplicitSender +import akka.testkit.WithLogCapturing +import com.typesafe.config.ConfigFactory + +import scala.concurrent.duration._ + +/** + * Verifies that the automatic restart on terminate/crash that is in place for remember entities does not apply + * when remember entities is not enabled + */ +object EntityTerminationSpec { + + final case class EntityEnvelope(id: String, msg: Any) + + def config = ConfigFactory.parseString(""" + akka.loglevel=DEBUG + akka.loggers = ["akka.testkit.SilenceAllTestEventListener"] + akka.actor.provider = cluster + akka.remote.artery.canonical.port = 0 + akka.remote.classic.netty.tcp.port = 0 + akka.cluster.sharding.state-store-mode = ddata + # no leaks between test runs thank you + akka.cluster.sharding.distributed-data.durable.keys = [] + akka.cluster.sharding.verbose-debug-logging = on + akka.cluster.sharding.entity-restart-backoff = 250ms + """.stripMargin) + + object StoppingActor { + def props(): Props = Props(new StoppingActor) + } + class StoppingActor extends Actor { + def receive = { + case "stop" => context.stop(self) + case "ping" => sender() ! "pong" + case "passivate" => context.parent ! 
ShardRegion.Passivate("stop") + } + } +} + +class EntityTerminationSpec extends AkkaSpec(EntityTerminationSpec.config) with ImplicitSender with WithLogCapturing { + + import EntityTerminationSpec._ + + val extractEntityId: ShardRegion.ExtractEntityId = { + case EntityEnvelope(id, payload) => (id.toString, payload) + } + + val extractShardId: ShardRegion.ExtractShardId = { + case EntityEnvelope(_, _) => "1" // single shard for all entities + case ShardRegion.StartEntity(_) => "1" + } + + override def atStartup(): Unit = { + // Form a one node cluster + val cluster = Cluster(system) + cluster.join(cluster.selfAddress) + awaitAssert(cluster.readView.members.count(_.status == MemberStatus.Up) should ===(1)) + } + + "Sharding, when an entity terminates" must { + + "allow stop without passivation if not remembering entities" in { + val sharding = ClusterSharding(system).start( + "regular", + StoppingActor.props(), + ClusterShardingSettings(system), + extractEntityId, + extractShardId) + + sharding ! EntityEnvelope("1", "ping") + expectMsg("pong") + val entity = lastSender + watch(entity) + + sharding ! EntityEnvelope("1", "stop") + expectTerminated(entity) + + Thread.sleep(400) // restart backoff is 250 ms + sharding ! ShardRegion.GetShardRegionState + val regionState = expectMsgType[ShardRegion.CurrentShardRegionState] + regionState.shards should have size (1) + regionState.shards.head.entityIds should be(empty) + } + + "automatically restart a terminating entity (not passivating) if remembering entities" in { + val sharding = ClusterSharding(system).start( + "remembering", + StoppingActor.props(), + ClusterShardingSettings(system).withRememberEntities(true), + extractEntityId, + extractShardId) + + sharding ! EntityEnvelope("1", "ping") + expectMsg("pong") + val entity = lastSender + watch(entity) + + sharding ! EntityEnvelope("1", "stop") + expectTerminated(entity) + + Thread.sleep(400) // restart backoff is 250 ms + awaitAssert({ + sharding ! 
ShardRegion.GetShardRegionState + val regionState = expectMsgType[ShardRegion.CurrentShardRegionState] + regionState.shards should have size (1) + regionState.shards.head.entityIds should have size (1) + }, 2.seconds) + } + + "allow terminating entity to passivate if remembering entities" in { + val sharding = ClusterSharding(system).start( + "remembering-passivate", + StoppingActor.props(), + ClusterShardingSettings(system).withRememberEntities(true), + extractEntityId, + extractShardId) + + sharding ! EntityEnvelope("1", "ping") + expectMsg("pong") + val entity = lastSender + watch(entity) + + sharding ! EntityEnvelope("1", "passivate") + expectTerminated(entity) + Thread.sleep(400) // restart backoff is 250 ms + + sharding ! ShardRegion.GetShardRegionState + val regionState = expectMsgType[ShardRegion.CurrentShardRegionState] + regionState.shards should have size (1) + regionState.shards.head.entityIds should have size (0) + + } + + } + +}