From 01edcab657b7e5e8d9238553c39d734705bffc9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Wed, 15 Jul 2020 08:38:23 +0200 Subject: [PATCH] Allow entities to stop by terminating without remember entities (#29384) * Allow entities to stop by terminating in sharding without remember entities #29383 We missed an allowed transition from running/active to stopped/NoState in shard. when the logic was rewritten. * Add a toggle to opt-in crash shard on illegal state transitions Default is logging an error and not crashing shard and all other entities, our tests have the toggle enabled. * A fix for passivation when not using remember entities fixing #29359 and possibly #27549 --- ...shard-state-transition-regression.excludes | 11 ++ .../src/main/resources/reference.conf | 5 + .../scala/akka/cluster/sharding/Shard.scala | 126 ++++++++++++------ .../MultiNodeClusterShardingConfig.scala | 1 + .../ClusterShardingInternalsSpec.scala | 1 + .../sharding/ClusterShardingLeaseSpec.scala | 1 + .../akka/cluster/sharding/EntitiesSpec.scala | 12 +- .../sharding/EntityTerminationSpec.scala | 25 +++- .../sharding/GetShardTypeNamesSpec.scala | 1 + .../InactiveEntityPassivationSpec.scala | 1 + .../PersistentShardingMigrationSpec.scala | 1 + .../sharding/PersistentStartEntitySpec.scala | 1 + .../cluster/sharding/ProxyShardingSpec.scala | 1 + .../RememberEntitiesBatchedUpdatesSpec.scala | 1 + .../RememberEntitiesFailureSpec.scala | 1 + ...erEntitiesShardIdExtractorChangeSpec.scala | 2 +- .../cluster/sharding/ShardRegionSpec.scala | 1 + .../cluster/sharding/ShardWithLeaseSpec.scala | 1 + .../cluster/sharding/StartEntitySpec.scala | 1 + .../cluster/sharding/SupervisionSpec.scala | 6 +- 20 files changed, 144 insertions(+), 56 deletions(-) create mode 100644 akka-cluster-sharding/src/main/mima-filters/2.6.7.backwards.excludes/shard-state-transition-regression.excludes diff --git a/akka-cluster-sharding/src/main/mima-filters/2.6.7.backwards.excludes/shard-state-transition-regression.excludes b/akka-cluster-sharding/src/main/mima-filters/2.6.7.backwards.excludes/shard-state-transition-regression.excludes new file mode 100644 index 0000000000..bdc6c2f2da --- /dev/null +++ b/akka-cluster-sharding/src/main/mima-filters/2.6.7.backwards.excludes/shard-state-transition-regression.excludes @@ -0,0 +1,11 @@ +# Fixes for state transition in sharding causing a crash, all internals +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard#Passivating.transition") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard#WaitingForRestart.transition") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard#NoState.transition") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard#RememberedButNotCreated.transition") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard#RememberingStart.transition") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard#Active.transition") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard#Entities.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard#RememberingStop.transition") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard#EntityState.transition") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.Shard#EntityState.transition") \ No newline at end of file diff --git a/akka-cluster-sharding/src/main/resources/reference.conf b/akka-cluster-sharding/src/main/resources/reference.conf index 78cde1a77c..519fc8f6eb 100644 --- a/akka-cluster-sharding/src/main/resources/reference.conf +++ b/akka-cluster-sharding/src/main/resources/reference.conf @@ -217,6 +217,11 @@ akka.cluster.sharding { # Provide a higher level of details in the debug logs, often per routed message. Be careful about enabling # in production systems. verbose-debug-logging = off + + # Throw an exception if the internal state machine in the Shard actor does an invalid state transition. + # Mostly for the Akka test suite, if off the invalid transition is logged as a warning instead of throwing and + # crashing the shard. + fail-on-invalid-entity-state-transition = off } # //#sharding-ext-config diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index e398300e51..3867e4f719 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -148,7 +148,19 @@ private[akka] object Shard { * }}} **/ sealed trait EntityState { - def transition(newState: EntityState): EntityState + def transition(newState: EntityState, entities: Entities): EntityState + final def invalidTransition(to: EntityState, entities: Entities): EntityState = { + val exception = new IllegalArgumentException( + s"Transition from $this to $to not allowed, remember entities: ${entities.rememberingEntities}") + if (entities.failOnIllegalTransition) { + // crash shard + throw exception + } else { + // log and ignore + entities.log.error(exception, "Ignoring illegal state transition in shard") + to + } + } } /** @@ -157,11 +169,11 @@ private[akka] object Shard { * and as return value instead of null */ case object NoState extends EntityState { - override def transition(newState: EntityState): EntityState = newState match { - case RememberedButNotCreated => RememberedButNotCreated - case remembering: RememberingStart => remembering - case active: Active => active - case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed") + override def transition(newState: EntityState, entities: Entities): EntityState = newState match { + case RememberedButNotCreated if entities.rememberingEntities => RememberedButNotCreated + case remembering: RememberingStart if entities.rememberingEntities => remembering + case active: Active if !entities.rememberingEntities => active + case _ => invalidTransition(newState, entities) } } @@ -171,10 +183,10 @@ private[akka] object Shard { * remembered entity ids. */ case object RememberedButNotCreated extends EntityState { - override def transition(newState: EntityState): EntityState = newState match { + override def transition(newState: EntityState, entities: Entities): EntityState = newState match { case active: Active => active // started on this shard case RememberingStop => RememberingStop // started on other shard - case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed") + case _ => invalidTransition(newState, entities) } } @@ -194,7 +206,7 @@ private[akka] object Shard { * to be stored in the next batch. */ final case class RememberingStart(ackTo: Set[ActorRef]) extends EntityState { - override def transition(newState: EntityState): EntityState = newState match { + override def transition(newState: EntityState, entities: Entities): EntityState = newState match { case active: Active => active case r: RememberingStart => if (ackTo.isEmpty) { @@ -204,7 +216,7 @@ private[akka] object Shard { if (r.ackTo.isEmpty) this else RememberingStart(ackTo.union(r.ackTo)) } - case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed") + case _ => invalidTransition(newState, entities) } } @@ -214,9 +226,9 @@ private[akka] object Shard { * to be stored in the next batch. */ final case object RememberingStop extends EntityState { - override def transition(newState: EntityState): EntityState = newState match { + override def transition(newState: EntityState, entities: Entities): EntityState = newState match { case NoState => NoState - case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed") + case _ => invalidTransition(newState, entities) } } @@ -224,29 +236,34 @@ private[akka] object Shard { def ref: ActorRef } final case class Active(ref: ActorRef) extends WithRef { - override def transition(newState: EntityState): EntityState = newState match { - case passivating: Passivating => passivating - case WaitingForRestart => WaitingForRestart - case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed") + override def transition(newState: EntityState, entities: Entities): EntityState = newState match { + case passivating: Passivating => passivating + case WaitingForRestart => WaitingForRestart + case NoState if !entities.rememberingEntities => NoState + case _ => invalidTransition(newState, entities) } } final case class Passivating(ref: ActorRef) extends WithRef { - override def transition(newState: EntityState): EntityState = newState match { - case RememberingStop => RememberingStop - case NoState => NoState - case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed") + override def transition(newState: EntityState, entities: Entities): EntityState = newState match { + case RememberingStop => RememberingStop + case NoState if !entities.rememberingEntities => NoState + case _ => invalidTransition(newState, entities) } } case object WaitingForRestart extends EntityState { - override def transition(newState: EntityState): EntityState = newState match { + override def transition(newState: EntityState, entities: Entities): EntityState = newState match { case remembering: RememberingStart => remembering case active: Active => active - case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed") + case _ => invalidTransition(newState, entities) } } - final class Entities(log: LoggingAdapter, rememberingEntities: Boolean, verboseDebug: Boolean) { + final class Entities( + val log: LoggingAdapter, + val rememberingEntities: Boolean, + verboseDebug: Boolean, + val failOnIllegalTransition: Boolean) { private val entities: java.util.Map[EntityId, EntityState] = new util.HashMap[EntityId, EntityState]() // needed to look up entity by ref when a Passivating is received private val byRef = new util.HashMap[ActorRef, EntityId]() @@ -255,13 +272,13 @@ private[akka] object Shard { def alreadyRemembered(set: Set[EntityId]): Unit = { set.foreach { entityId => - val state = entityState(entityId).transition(RememberedButNotCreated) + val state = entityState(entityId).transition(RememberedButNotCreated, this) entities.put(entityId, state) } } def rememberingStart(entityId: EntityId, ackTo: Option[ActorRef]): Unit = { val newState = RememberingStart(ackTo) - val state = entityState(entityId).transition(newState) + val state = entityState(entityId).transition(newState, this) entities.put(entityId, state) if (rememberingEntities) remembering.add(entityId) @@ -269,7 +286,7 @@ private[akka] object Shard { def rememberingStop(entityId: EntityId): Unit = { val state = entityState(entityId) removeRefIfThereIsOne(state) - entities.put(entityId, state.transition(RememberingStop)) + entities.put(entityId, state.transition(RememberingStop, this)) if (rememberingEntities) remembering.add(entityId) } @@ -281,19 +298,19 @@ private[akka] object Shard { case null => NoState case other => other } - entities.put(id, state.transition(WaitingForRestart)) + entities.put(id, state.transition(WaitingForRestart, this)) } def removeEntity(entityId: EntityId): Unit = { val state = entityState(entityId) // just verify transition - state.transition(NoState) + state.transition(NoState, this) removeRefIfThereIsOne(state) entities.remove(entityId) if (rememberingEntities) remembering.remove(entityId) } def addEntity(entityId: EntityId, ref: ActorRef): Unit = { - val state = entityState(entityId).transition(Active(ref)) + val state = entityState(entityId).transition(Active(ref), this) entities.put(entityId, state) byRef.put(ref, entityId) if (rememberingEntities) @@ -322,7 +339,7 @@ private[akka] object Shard { if (verboseDebug) log.debug("[{}] passivating", entityId) entities.get(entityId) match { case wf: WithRef => - val state = entityState(entityId).transition(Passivating(wf.ref)) + val state = entityState(entityId).transition(Passivating(wf.ref), this) entities.put(entityId, state) case other => throw new IllegalStateException( @@ -421,7 +438,11 @@ private[akka] class Shard( private val flightRecorder = ShardingFlightRecorder(context.system) @InternalStableApi - private val entities = new Entities(log, settings.rememberEntities, verboseDebug) + private val entities = { + val failOnInvalidStateTransition = + context.system.settings.config.getBoolean("akka.cluster.sharding.fail-on-invalid-entity-state-transition") + new Entities(log, settings.rememberEntities, verboseDebug, failOnInvalidStateTransition) + } private var lastMessageTimestamp = Map.empty[EntityId, Long] @@ -851,16 +872,30 @@ private[akka] class Shard( } case Passivating(_) => - if (entities.pendingRememberedEntitiesExist()) { - // will go in next batch update - if (verboseDebug) - log.debug( - "Stop of [{}] after passivating, arrived while updating, adding it to batch of pending stops", - entityId) - entities.rememberingStop(entityId) + if (rememberEntitiesStore.isDefined) { + if (entities.pendingRememberedEntitiesExist()) { + // will go in next batch update + if (verboseDebug) + log.debug( + "[{}] terminated after passivating, arrived while updating, adding it to batch of pending stops", + entityId) + entities.rememberingStop(entityId) + } else { + entities.rememberingStop(entityId) + rememberUpdate(remove = Set(entityId)) + } } else { - entities.rememberingStop(entityId) - rememberUpdate(remove = Set(entityId)) + if (messageBuffers.getOrEmpty(entityId).nonEmpty) { + if (verboseDebug) + log.debug("[{}] terminated after passivating, buffered messages found, restarting", entityId) + entities.removeEntity(entityId) + getOrCreateEntity(entityId) + sendMsgBuffer(entityId) + } else { + if (verboseDebug) + log.debug("[{}] terminated after passivating", entityId) + entities.removeEntity(entityId) + } } case unexpected => val ref = entities.entity(entityId) @@ -919,9 +954,14 @@ private[akka] class Shard( if (hasBufferedMessages) { log.debug("Entity stopped after passivation [{}], but will be started again due to buffered messages", entityId) flightRecorder.entityPassivateRestart(entityId) - // trigger start or batch in case we're already writing to the remember store - entities.rememberingStart(entityId, None) - if (!entities.pendingRememberedEntitiesExist()) rememberUpdate(Set(entityId)) + if (rememberEntities) { + // trigger start or batch in case we're already writing to the remember store + entities.rememberingStart(entityId, None) + if (!entities.pendingRememberedEntitiesExist()) rememberUpdate(Set(entityId)) + } else { + getOrCreateEntity(entityId) + sendMsgBuffer(entityId) + } } else { log.debug("Entity stopped after passivation [{}]", entityId) } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingConfig.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingConfig.scala index 8c33ff22db..df984f8197 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingConfig.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingConfig.scala @@ -114,6 +114,7 @@ abstract class MultiNodeClusterShardingConfig( dir = $targetDir/sharding-ddata map-size = 10 MiB } + akka.cluster.sharding.fail-on-invalid-entity-state-transition = on akka.loglevel = $loglevel akka.remote.log-remote-lifecycle-events = off """) diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala index f3b1102340..0499ffe829 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingInternalsSpec.scala @@ -33,6 +33,7 @@ class ClusterShardingInternalsSpec extends AkkaSpec(""" |akka.remote.artery.canonical.port = 0 |akka.loglevel = DEBUG |akka.cluster.sharding.verbose-debug-logging = on + |akka.cluster.sharding.fail-on-invalid-entity-state-transition = on |akka.loggers = ["akka.testkit.SilenceAllTestEventListener"] |""".stripMargin) with WithLogCapturing { import ClusterShardingInternalsSpec._ diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingLeaseSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingLeaseSpec.scala index 070c75cf2c..df4f9680a1 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingLeaseSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ClusterShardingLeaseSpec.scala @@ -29,6 +29,7 @@ object ClusterShardingLeaseSpec { keys = [] } verbose-debug-logging = on + fail-on-invalid-entity-state-transition = on } """).withFallback(TestLease.config) diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/EntitiesSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/EntitiesSpec.scala index 8b0f63408d..6d1fd2a84b 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/EntitiesSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/EntitiesSpec.scala @@ -19,7 +19,11 @@ import org.scalatest.wordspec.AnyWordSpec class EntitiesSpec extends AnyWordSpec with Matchers { private def newEntities(rememberingEntities: Boolean) = - new sharding.Shard.Entities(NoLogging, rememberingEntities = rememberingEntities, false) + new sharding.Shard.Entities( + NoLogging, + rememberingEntities = rememberingEntities, + verboseDebug = false, + failOnIllegalTransition = true) "Entities" should { "start empty" in { @@ -54,6 +58,7 @@ class EntitiesSpec extends AnyWordSpec with Matchers { } "set state to remembering stop" in { val entities = newEntities(rememberingEntities = true) + entities.rememberingStart("a", None) // need to go through remembering start to become active entities.addEntity("a", ActorRef.noSender) // need to go through active to passivate entities.entityPassivating("a") // need to go through passivate to remember stop entities.rememberingStop("a") @@ -71,9 +76,10 @@ class EntitiesSpec extends AnyWordSpec with Matchers { "fully remove an entity" in { val entities = newEntities(rememberingEntities = true) - val ref = ActorRef.noSender - entities.addEntity("a", ref) + entities.rememberingStart("a", None) // need to go through remembering start to become active + entities.addEntity("a", ActorRef.noSender) // need to go through active to passivate entities.entityPassivating("a") // needs to go through passivating to be removed + entities.rememberingStop("a") // need to go through remembering stop to become active entities.removeEntity("a") entities.entityState("a") shouldEqual NoState entities.activeEntities() should be(empty) diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/EntityTerminationSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/EntityTerminationSpec.scala index d88e317416..558cd6c26a 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/EntityTerminationSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/EntityTerminationSpec.scala @@ -33,6 +33,7 @@ object EntityTerminationSpec { # no leaks between test runs thank you akka.cluster.sharding.distributed-data.durable.keys = [] akka.cluster.sharding.verbose-debug-logging = on + akka.cluster.sharding.fail-on-invalid-entity-state-transition = on akka.cluster.sharding.entity-restart-backoff = 250ms """.stripMargin) @@ -40,9 +41,12 @@ object EntityTerminationSpec { def props(): Props = Props(new StoppingActor) } class StoppingActor extends Actor { + private var counter = 0 def receive = { - case "stop" => context.stop(self) - case "ping" => sender() ! "pong" + case "stop" => context.stop(self) + case "ping" => + counter += 1 + sender() ! s"pong-$counter" case "passivate" => context.parent ! ShardRegion.Passivate("stop") } } @@ -79,10 +83,13 @@ class EntityTerminationSpec extends AkkaSpec(EntityTerminationSpec.config) with extractShardId) sharding ! EntityEnvelope("1", "ping") - expectMsg("pong") + expectMsg("pong-1") val entity = lastSender - watch(entity) + sharding ! EntityEnvelope("2", "ping") + expectMsg("pong-1") + + watch(entity) sharding ! EntityEnvelope("1", "stop") expectTerminated(entity) @@ -90,7 +97,11 @@ class EntityTerminationSpec extends AkkaSpec(EntityTerminationSpec.config) with sharding ! ShardRegion.GetShardRegionState val regionState = expectMsgType[ShardRegion.CurrentShardRegionState] regionState.shards should have size (1) - regionState.shards.head.entityIds should be(empty) + regionState.shards.head.entityIds should be(Set("2")) + + // make sure the shard didn't crash (coverage for regression bug #29383) + sharding ! EntityEnvelope("2", "ping") + expectMsg("pong-2") // if it lost state we know it restarted } "automatically restart a terminating entity (not passivating) if remembering entities" in { @@ -102,7 +113,7 @@ class EntityTerminationSpec extends AkkaSpec(EntityTerminationSpec.config) with extractShardId) sharding ! EntityEnvelope("1", "ping") - expectMsg("pong") + expectMsg("pong-1") val entity = lastSender watch(entity) @@ -127,7 +138,7 @@ class EntityTerminationSpec extends AkkaSpec(EntityTerminationSpec.config) with extractShardId) sharding ! EntityEnvelope("1", "ping") - expectMsg("pong") + expectMsg("pong-1") val entity = lastSender watch(entity) diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/GetShardTypeNamesSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/GetShardTypeNamesSpec.scala index 084d193d33..8b0798aaf0 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/GetShardTypeNamesSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/GetShardTypeNamesSpec.scala @@ -18,6 +18,7 @@ object GetShardTypeNamesSpec { akka.actor.provider = "cluster" akka.remote.classic.netty.tcp.port = 0 akka.remote.artery.canonical.port = 0 + akka.cluster.sharding.fail-on-invalid-entity-state-transition = on """ val extractEntityId: ShardRegion.ExtractEntityId = { diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/InactiveEntityPassivationSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/InactiveEntityPassivationSpec.scala index c6b86cccef..eb130c5289 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/InactiveEntityPassivationSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/InactiveEntityPassivationSpec.scala @@ -24,6 +24,7 @@ object InactiveEntityPassivationSpec { akka.remote.classic.netty.tcp.port = 0 akka.remote.artery.canonical.port = 0 akka.cluster.sharding.verbose-debug-logging = on + akka.cluster.sharding.fail-on-invalid-entity-state-transition = on """) val enabledConfig = ConfigFactory.parseString(""" diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentShardingMigrationSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentShardingMigrationSpec.scala index 47cf6ac63b..cf133d3088 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentShardingMigrationSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentShardingMigrationSpec.scala @@ -37,6 +37,7 @@ object PersistentShardingMigrationSpec { snapshot-after = 5 verbose-debug-logging = on + fail-on-invalid-entity-state-transition = on # Lots of sharding setup, make it quicker retry-interval = 500ms diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentStartEntitySpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentStartEntitySpec.scala index f8b2338397..8dfaaf903d 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentStartEntitySpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentStartEntitySpec.scala @@ -47,6 +47,7 @@ object PersistentStartEntitySpec { akka.remote.classic.netty.tcp.port = 0 akka.persistence.journal.plugin = "akka.persistence.journal.inmem" akka.cluster.sharding.verbose-debug-logging = on + akka.cluster.sharding.fail-on-invalid-entity-state-transition = on """.stripMargin) } diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ProxyShardingSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ProxyShardingSpec.scala index 2c94875d4d..72e52d14a0 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ProxyShardingSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ProxyShardingSpec.scala @@ -21,6 +21,7 @@ object ProxyShardingSpec { akka.remote.classic.netty.tcp.port = 0 akka.remote.artery.canonical.port = 0 akka.cluster.sharding.verbose-debug-logging = on + akka.cluster.sharding.fail-on-invalid-entity-state-transition = on """ } diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesBatchedUpdatesSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesBatchedUpdatesSpec.scala index 6c822607d9..b65d1bf514 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesBatchedUpdatesSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesBatchedUpdatesSpec.scala @@ -50,6 +50,7 @@ object RememberEntitiesBatchedUpdatesSpec { # no leaks between test runs thank you akka.cluster.sharding.distributed-data.durable.keys = [] akka.cluster.sharding.verbose-debug-logging = on + akka.cluster.sharding.fail-on-invalid-entity-state-transition = on """.stripMargin) } class RememberEntitiesBatchedUpdatesSpec diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesFailureSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesFailureSpec.scala index b1c60c6a3a..bb47325f98 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesFailureSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesFailureSpec.scala @@ -41,6 +41,7 @@ object RememberEntitiesFailureSpec { akka.cluster.sharding.coordinator-failure-backoff = 1s akka.cluster.sharding.updating-state-timeout = 1s akka.cluster.sharding.verbose-debug-logging = on + akka.cluster.sharding.fail-on-invalid-entity-state-transition = on """) class EntityActor extends Actor with ActorLogging { diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesShardIdExtractorChangeSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesShardIdExtractorChangeSpec.scala index 78dfd950fe..f2cc684354 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesShardIdExtractorChangeSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesShardIdExtractorChangeSpec.scala @@ -34,7 +34,7 @@ object RememberEntitiesShardIdExtractorChangeSpec { remember-entities-store = "eventsourced" state-store-mode = "ddata" } - + akka.cluster.sharding.fail-on-invalid-entity-state-transition = on akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" akka.persistence.snapshot-store.local.dir = "target/RememberEntitiesShardIdExtractorChangeSpec-${UUID diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ShardRegionSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ShardRegionSpec.scala index f7664aadc3..2da47bd1b8 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ShardRegionSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ShardRegionSpec.scala @@ -38,6 +38,7 @@ object ShardRegionSpec { akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning akka.cluster.jmx.enabled = off akka.cluster.sharding.verbose-debug-logging = on + akka.cluster.sharding.fail-on-invalid-entity-state-transition = on """)) val shardTypeName = "Caat" diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ShardWithLeaseSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ShardWithLeaseSpec.scala index 14dbdbc1fe..b4fd43143b 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ShardWithLeaseSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ShardWithLeaseSpec.scala @@ -37,6 +37,7 @@ object ShardWithLeaseSpec { lease-operation-timeout = 3s } akka.cluster.sharding.verbose-debug-logging = on + akka.cluster.sharding.fail-on-invalid-entity-state-transition = on """ class EntityActor extends Actor with ActorLogging { diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/StartEntitySpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/StartEntitySpec.scala index cb7ecc61a9..6f75e69479 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/StartEntitySpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/StartEntitySpec.scala @@ -34,6 +34,7 @@ object StartEntitySpec { # no leaks between test runs thank you akka.cluster.sharding.distributed-data.durable.keys = [] akka.cluster.sharding.verbose-debug-logging = on + akka.cluster.sharding.fail-on-invalid-entity-state-transition = on """.stripMargin) object EntityActor { diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/SupervisionSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/SupervisionSpec.scala index b9b99e21f2..e650c47959 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/SupervisionSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/SupervisionSpec.scala @@ -24,6 +24,7 @@ object SupervisionSpec { akka.loggers = ["akka.testkit.SilenceAllTestEventListener"] akka.loglevel = DEBUG akka.cluster.sharding.verbose-debug-logging = on + akka.cluster.sharding.fail-on-invalid-entity-state-transition = on """) case class Msg(id: Long, msg: Any) @@ -59,6 +60,7 @@ object SupervisionSpec { case "hello" => sender() ! Response(self) case StopMessage => + // note that we never see this because we stop early log.info("Received stop from region") context.parent ! PoisonPill } @@ -71,7 +73,7 @@ class DeprecatedSupervisionSpec extends AkkaSpec(SupervisionSpec.config) with Im "Supervision for a sharded actor (deprecated)" must { - "allow passivation" in { + "allow passivation and early stop" in { val supervisedProps = BackoffOpts @@ -112,7 +114,7 @@ class SupervisionSpec extends AkkaSpec(SupervisionSpec.config) with ImplicitSend "Supervision for a sharded actor" must { - "allow passivation" in { + "allow passivation and early stop" in { val supervisedProps = BackoffSupervisor.props( BackoffOpts