Allow entities to stop by terminating without remember entities (#29384)

* Allow entities to stop by terminating in sharding without remember entities #29383
  We missed an allowed transition from running/active to stopped/NoState in shard. 
  when the logic was rewritten.
* Add a toggle to opt-in crash shard on illegal state transitions
  Default is logging an error and not crashing shard and all other entities, our tests have the toggle enabled.
* A fix for passivation when not using remember entities fixing #29359 and possibly #27549
This commit is contained in:
Johan Andrén 2020-07-15 08:38:23 +02:00 committed by GitHub
parent 9739e6f44d
commit 01edcab657
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 144 additions and 56 deletions

View file

@ -0,0 +1,11 @@
# Fixes for state transition in sharding causing a crash, all internals
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard#Passivating.transition")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard#WaitingForRestart.transition")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard#NoState.transition")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard#RememberedButNotCreated.transition")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard#RememberingStart.transition")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard#Active.transition")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard#Entities.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard#RememberingStop.transition")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.sharding.Shard#EntityState.transition")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.sharding.Shard#EntityState.transition")

View file

@ -217,6 +217,11 @@ akka.cluster.sharding {
# Provide a higher level of details in the debug logs, often per routed message. Be careful about enabling
# in production systems.
verbose-debug-logging = off
# Throw an exception if the internal state machine in the Shard actor does an invalid state transition.
# Mostly for the Akka test suite, if off the invalid transition is logged as a warning instead of throwing and
# crashing the shard.
fail-on-invalid-entity-state-transition = off
}
# //#sharding-ext-config

View file

@ -148,7 +148,19 @@ private[akka] object Shard {
* }}}
**/
sealed trait EntityState {
def transition(newState: EntityState): EntityState
def transition(newState: EntityState, entities: Entities): EntityState
final def invalidTransition(to: EntityState, entities: Entities): EntityState = {
val exception = new IllegalArgumentException(
s"Transition from $this to $to not allowed, remember entities: ${entities.rememberingEntities}")
if (entities.failOnIllegalTransition) {
// crash shard
throw exception
} else {
// log and ignore
entities.log.error(exception, "Ignoring illegal state transition in shard")
to
}
}
}
/**
@ -157,11 +169,11 @@ private[akka] object Shard {
* and as return value instead of null
*/
case object NoState extends EntityState {
override def transition(newState: EntityState): EntityState = newState match {
case RememberedButNotCreated => RememberedButNotCreated
case remembering: RememberingStart => remembering
case active: Active => active
case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed")
override def transition(newState: EntityState, entities: Entities): EntityState = newState match {
case RememberedButNotCreated if entities.rememberingEntities => RememberedButNotCreated
case remembering: RememberingStart if entities.rememberingEntities => remembering
case active: Active if !entities.rememberingEntities => active
case _ => invalidTransition(newState, entities)
}
}
@ -171,10 +183,10 @@ private[akka] object Shard {
* remembered entity ids.
*/
case object RememberedButNotCreated extends EntityState {
override def transition(newState: EntityState): EntityState = newState match {
override def transition(newState: EntityState, entities: Entities): EntityState = newState match {
case active: Active => active // started on this shard
case RememberingStop => RememberingStop // started on other shard
case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed")
case _ => invalidTransition(newState, entities)
}
}
@ -194,7 +206,7 @@ private[akka] object Shard {
* to be stored in the next batch.
*/
final case class RememberingStart(ackTo: Set[ActorRef]) extends EntityState {
override def transition(newState: EntityState): EntityState = newState match {
override def transition(newState: EntityState, entities: Entities): EntityState = newState match {
case active: Active => active
case r: RememberingStart =>
if (ackTo.isEmpty) {
@ -204,7 +216,7 @@ private[akka] object Shard {
if (r.ackTo.isEmpty) this
else RememberingStart(ackTo.union(r.ackTo))
}
case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed")
case _ => invalidTransition(newState, entities)
}
}
@ -214,9 +226,9 @@ private[akka] object Shard {
* to be stored in the next batch.
*/
final case object RememberingStop extends EntityState {
override def transition(newState: EntityState): EntityState = newState match {
override def transition(newState: EntityState, entities: Entities): EntityState = newState match {
case NoState => NoState
case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed")
case _ => invalidTransition(newState, entities)
}
}
@ -224,29 +236,34 @@ private[akka] object Shard {
def ref: ActorRef
}
final case class Active(ref: ActorRef) extends WithRef {
override def transition(newState: EntityState): EntityState = newState match {
case passivating: Passivating => passivating
case WaitingForRestart => WaitingForRestart
case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed")
override def transition(newState: EntityState, entities: Entities): EntityState = newState match {
case passivating: Passivating => passivating
case WaitingForRestart => WaitingForRestart
case NoState if !entities.rememberingEntities => NoState
case _ => invalidTransition(newState, entities)
}
}
final case class Passivating(ref: ActorRef) extends WithRef {
override def transition(newState: EntityState): EntityState = newState match {
case RememberingStop => RememberingStop
case NoState => NoState
case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed")
override def transition(newState: EntityState, entities: Entities): EntityState = newState match {
case RememberingStop => RememberingStop
case NoState if !entities.rememberingEntities => NoState
case _ => invalidTransition(newState, entities)
}
}
case object WaitingForRestart extends EntityState {
override def transition(newState: EntityState): EntityState = newState match {
override def transition(newState: EntityState, entities: Entities): EntityState = newState match {
case remembering: RememberingStart => remembering
case active: Active => active
case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed")
case _ => invalidTransition(newState, entities)
}
}
final class Entities(log: LoggingAdapter, rememberingEntities: Boolean, verboseDebug: Boolean) {
final class Entities(
val log: LoggingAdapter,
val rememberingEntities: Boolean,
verboseDebug: Boolean,
val failOnIllegalTransition: Boolean) {
private val entities: java.util.Map[EntityId, EntityState] = new util.HashMap[EntityId, EntityState]()
// needed to look up entity by ref when a Passivating is received
private val byRef = new util.HashMap[ActorRef, EntityId]()
@ -255,13 +272,13 @@ private[akka] object Shard {
def alreadyRemembered(set: Set[EntityId]): Unit = {
set.foreach { entityId =>
val state = entityState(entityId).transition(RememberedButNotCreated)
val state = entityState(entityId).transition(RememberedButNotCreated, this)
entities.put(entityId, state)
}
}
def rememberingStart(entityId: EntityId, ackTo: Option[ActorRef]): Unit = {
val newState = RememberingStart(ackTo)
val state = entityState(entityId).transition(newState)
val state = entityState(entityId).transition(newState, this)
entities.put(entityId, state)
if (rememberingEntities)
remembering.add(entityId)
@ -269,7 +286,7 @@ private[akka] object Shard {
def rememberingStop(entityId: EntityId): Unit = {
val state = entityState(entityId)
removeRefIfThereIsOne(state)
entities.put(entityId, state.transition(RememberingStop))
entities.put(entityId, state.transition(RememberingStop, this))
if (rememberingEntities)
remembering.add(entityId)
}
@ -281,19 +298,19 @@ private[akka] object Shard {
case null => NoState
case other => other
}
entities.put(id, state.transition(WaitingForRestart))
entities.put(id, state.transition(WaitingForRestart, this))
}
def removeEntity(entityId: EntityId): Unit = {
val state = entityState(entityId)
// just verify transition
state.transition(NoState)
state.transition(NoState, this)
removeRefIfThereIsOne(state)
entities.remove(entityId)
if (rememberingEntities)
remembering.remove(entityId)
}
def addEntity(entityId: EntityId, ref: ActorRef): Unit = {
val state = entityState(entityId).transition(Active(ref))
val state = entityState(entityId).transition(Active(ref), this)
entities.put(entityId, state)
byRef.put(ref, entityId)
if (rememberingEntities)
@ -322,7 +339,7 @@ private[akka] object Shard {
if (verboseDebug) log.debug("[{}] passivating", entityId)
entities.get(entityId) match {
case wf: WithRef =>
val state = entityState(entityId).transition(Passivating(wf.ref))
val state = entityState(entityId).transition(Passivating(wf.ref), this)
entities.put(entityId, state)
case other =>
throw new IllegalStateException(
@ -421,7 +438,11 @@ private[akka] class Shard(
private val flightRecorder = ShardingFlightRecorder(context.system)
@InternalStableApi
private val entities = new Entities(log, settings.rememberEntities, verboseDebug)
private val entities = {
val failOnInvalidStateTransition =
context.system.settings.config.getBoolean("akka.cluster.sharding.fail-on-invalid-entity-state-transition")
new Entities(log, settings.rememberEntities, verboseDebug, failOnInvalidStateTransition)
}
private var lastMessageTimestamp = Map.empty[EntityId, Long]
@ -851,16 +872,30 @@ private[akka] class Shard(
}
case Passivating(_) =>
if (entities.pendingRememberedEntitiesExist()) {
// will go in next batch update
if (verboseDebug)
log.debug(
"Stop of [{}] after passivating, arrived while updating, adding it to batch of pending stops",
entityId)
entities.rememberingStop(entityId)
if (rememberEntitiesStore.isDefined) {
if (entities.pendingRememberedEntitiesExist()) {
// will go in next batch update
if (verboseDebug)
log.debug(
"[{}] terminated after passivating, arrived while updating, adding it to batch of pending stops",
entityId)
entities.rememberingStop(entityId)
} else {
entities.rememberingStop(entityId)
rememberUpdate(remove = Set(entityId))
}
} else {
entities.rememberingStop(entityId)
rememberUpdate(remove = Set(entityId))
if (messageBuffers.getOrEmpty(entityId).nonEmpty) {
if (verboseDebug)
log.debug("[{}] terminated after passivating, buffered messages found, restarting", entityId)
entities.removeEntity(entityId)
getOrCreateEntity(entityId)
sendMsgBuffer(entityId)
} else {
if (verboseDebug)
log.debug("[{}] terminated after passivating", entityId)
entities.removeEntity(entityId)
}
}
case unexpected =>
val ref = entities.entity(entityId)
@ -919,9 +954,14 @@ private[akka] class Shard(
if (hasBufferedMessages) {
log.debug("Entity stopped after passivation [{}], but will be started again due to buffered messages", entityId)
flightRecorder.entityPassivateRestart(entityId)
// trigger start or batch in case we're already writing to the remember store
entities.rememberingStart(entityId, None)
if (!entities.pendingRememberedEntitiesExist()) rememberUpdate(Set(entityId))
if (rememberEntities) {
// trigger start or batch in case we're already writing to the remember store
entities.rememberingStart(entityId, None)
if (!entities.pendingRememberedEntitiesExist()) rememberUpdate(Set(entityId))
} else {
getOrCreateEntity(entityId)
sendMsgBuffer(entityId)
}
} else {
log.debug("Entity stopped after passivation [{}]", entityId)
}

View file

@ -114,6 +114,7 @@ abstract class MultiNodeClusterShardingConfig(
dir = $targetDir/sharding-ddata
map-size = 10 MiB
}
akka.cluster.sharding.fail-on-invalid-entity-state-transition = on
akka.loglevel = $loglevel
akka.remote.log-remote-lifecycle-events = off
""")

View file

@ -33,6 +33,7 @@ class ClusterShardingInternalsSpec extends AkkaSpec("""
|akka.remote.artery.canonical.port = 0
|akka.loglevel = DEBUG
|akka.cluster.sharding.verbose-debug-logging = on
|akka.cluster.sharding.fail-on-invalid-entity-state-transition = on
|akka.loggers = ["akka.testkit.SilenceAllTestEventListener"]
|""".stripMargin) with WithLogCapturing {
import ClusterShardingInternalsSpec._

View file

@ -29,6 +29,7 @@ object ClusterShardingLeaseSpec {
keys = []
}
verbose-debug-logging = on
fail-on-invalid-entity-state-transition = on
}
""").withFallback(TestLease.config)

View file

@ -19,7 +19,11 @@ import org.scalatest.wordspec.AnyWordSpec
class EntitiesSpec extends AnyWordSpec with Matchers {
private def newEntities(rememberingEntities: Boolean) =
new sharding.Shard.Entities(NoLogging, rememberingEntities = rememberingEntities, false)
new sharding.Shard.Entities(
NoLogging,
rememberingEntities = rememberingEntities,
verboseDebug = false,
failOnIllegalTransition = true)
"Entities" should {
"start empty" in {
@ -54,6 +58,7 @@ class EntitiesSpec extends AnyWordSpec with Matchers {
}
"set state to remembering stop" in {
val entities = newEntities(rememberingEntities = true)
entities.rememberingStart("a", None) // need to go through remembering start to become active
entities.addEntity("a", ActorRef.noSender) // need to go through active to passivate
entities.entityPassivating("a") // need to go through passivate to remember stop
entities.rememberingStop("a")
@ -71,9 +76,10 @@ class EntitiesSpec extends AnyWordSpec with Matchers {
"fully remove an entity" in {
val entities = newEntities(rememberingEntities = true)
val ref = ActorRef.noSender
entities.addEntity("a", ref)
entities.rememberingStart("a", None) // need to go through remembering start to become active
entities.addEntity("a", ActorRef.noSender) // need to go through active to passivate
entities.entityPassivating("a") // needs to go through passivating to be removed
entities.rememberingStop("a") // need to go through remembering stop to become active
entities.removeEntity("a")
entities.entityState("a") shouldEqual NoState
entities.activeEntities() should be(empty)

View file

@ -33,6 +33,7 @@ object EntityTerminationSpec {
# no leaks between test runs thank you
akka.cluster.sharding.distributed-data.durable.keys = []
akka.cluster.sharding.verbose-debug-logging = on
akka.cluster.sharding.fail-on-invalid-entity-state-transition = on
akka.cluster.sharding.entity-restart-backoff = 250ms
""".stripMargin)
@ -40,9 +41,12 @@ object EntityTerminationSpec {
def props(): Props = Props(new StoppingActor)
}
class StoppingActor extends Actor {
private var counter = 0
def receive = {
case "stop" => context.stop(self)
case "ping" => sender() ! "pong"
case "stop" => context.stop(self)
case "ping" =>
counter += 1
sender() ! s"pong-$counter"
case "passivate" => context.parent ! ShardRegion.Passivate("stop")
}
}
@ -79,10 +83,13 @@ class EntityTerminationSpec extends AkkaSpec(EntityTerminationSpec.config) with
extractShardId)
sharding ! EntityEnvelope("1", "ping")
expectMsg("pong")
expectMsg("pong-1")
val entity = lastSender
watch(entity)
sharding ! EntityEnvelope("2", "ping")
expectMsg("pong-1")
watch(entity)
sharding ! EntityEnvelope("1", "stop")
expectTerminated(entity)
@ -90,7 +97,11 @@ class EntityTerminationSpec extends AkkaSpec(EntityTerminationSpec.config) with
sharding ! ShardRegion.GetShardRegionState
val regionState = expectMsgType[ShardRegion.CurrentShardRegionState]
regionState.shards should have size (1)
regionState.shards.head.entityIds should be(empty)
regionState.shards.head.entityIds should be(Set("2"))
// make sure the shard didn't crash (coverage for regression bug #29383)
sharding ! EntityEnvelope("2", "ping")
expectMsg("pong-2") // if it lost state we know it restarted
}
"automatically restart a terminating entity (not passivating) if remembering entities" in {
@ -102,7 +113,7 @@ class EntityTerminationSpec extends AkkaSpec(EntityTerminationSpec.config) with
extractShardId)
sharding ! EntityEnvelope("1", "ping")
expectMsg("pong")
expectMsg("pong-1")
val entity = lastSender
watch(entity)
@ -127,7 +138,7 @@ class EntityTerminationSpec extends AkkaSpec(EntityTerminationSpec.config) with
extractShardId)
sharding ! EntityEnvelope("1", "ping")
expectMsg("pong")
expectMsg("pong-1")
val entity = lastSender
watch(entity)

View file

@ -18,6 +18,7 @@ object GetShardTypeNamesSpec {
akka.actor.provider = "cluster"
akka.remote.classic.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
akka.cluster.sharding.fail-on-invalid-entity-state-transition = on
"""
val extractEntityId: ShardRegion.ExtractEntityId = {

View file

@ -24,6 +24,7 @@ object InactiveEntityPassivationSpec {
akka.remote.classic.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
akka.cluster.sharding.verbose-debug-logging = on
akka.cluster.sharding.fail-on-invalid-entity-state-transition = on
""")
val enabledConfig = ConfigFactory.parseString("""

View file

@ -37,6 +37,7 @@ object PersistentShardingMigrationSpec {
snapshot-after = 5
verbose-debug-logging = on
fail-on-invalid-entity-state-transition = on
# Lots of sharding setup, make it quicker
retry-interval = 500ms

View file

@ -47,6 +47,7 @@ object PersistentStartEntitySpec {
akka.remote.classic.netty.tcp.port = 0
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
akka.cluster.sharding.verbose-debug-logging = on
akka.cluster.sharding.fail-on-invalid-entity-state-transition = on
""".stripMargin)
}

View file

@ -21,6 +21,7 @@ object ProxyShardingSpec {
akka.remote.classic.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
akka.cluster.sharding.verbose-debug-logging = on
akka.cluster.sharding.fail-on-invalid-entity-state-transition = on
"""
}

View file

@ -50,6 +50,7 @@ object RememberEntitiesBatchedUpdatesSpec {
# no leaks between test runs thank you
akka.cluster.sharding.distributed-data.durable.keys = []
akka.cluster.sharding.verbose-debug-logging = on
akka.cluster.sharding.fail-on-invalid-entity-state-transition = on
""".stripMargin)
}
class RememberEntitiesBatchedUpdatesSpec

View file

@ -41,6 +41,7 @@ object RememberEntitiesFailureSpec {
akka.cluster.sharding.coordinator-failure-backoff = 1s
akka.cluster.sharding.updating-state-timeout = 1s
akka.cluster.sharding.verbose-debug-logging = on
akka.cluster.sharding.fail-on-invalid-entity-state-transition = on
""")
class EntityActor extends Actor with ActorLogging {

View file

@ -34,7 +34,7 @@ object RememberEntitiesShardIdExtractorChangeSpec {
remember-entities-store = "eventsourced"
state-store-mode = "ddata"
}
akka.cluster.sharding.fail-on-invalid-entity-state-transition = on
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/RememberEntitiesShardIdExtractorChangeSpec-${UUID

View file

@ -38,6 +38,7 @@ object ShardRegionSpec {
akka.cluster.downing-provider-class = akka.cluster.testkit.AutoDowning
akka.cluster.jmx.enabled = off
akka.cluster.sharding.verbose-debug-logging = on
akka.cluster.sharding.fail-on-invalid-entity-state-transition = on
"""))
val shardTypeName = "Caat"

View file

@ -37,6 +37,7 @@ object ShardWithLeaseSpec {
lease-operation-timeout = 3s
}
akka.cluster.sharding.verbose-debug-logging = on
akka.cluster.sharding.fail-on-invalid-entity-state-transition = on
"""
class EntityActor extends Actor with ActorLogging {

View file

@ -34,6 +34,7 @@ object StartEntitySpec {
# no leaks between test runs thank you
akka.cluster.sharding.distributed-data.durable.keys = []
akka.cluster.sharding.verbose-debug-logging = on
akka.cluster.sharding.fail-on-invalid-entity-state-transition = on
""".stripMargin)
object EntityActor {

View file

@ -24,6 +24,7 @@ object SupervisionSpec {
akka.loggers = ["akka.testkit.SilenceAllTestEventListener"]
akka.loglevel = DEBUG
akka.cluster.sharding.verbose-debug-logging = on
akka.cluster.sharding.fail-on-invalid-entity-state-transition = on
""")
case class Msg(id: Long, msg: Any)
@ -59,6 +60,7 @@ object SupervisionSpec {
case "hello" =>
sender() ! Response(self)
case StopMessage =>
// note that we never see this because we stop early
log.info("Received stop from region")
context.parent ! PoisonPill
}
@ -71,7 +73,7 @@ class DeprecatedSupervisionSpec extends AkkaSpec(SupervisionSpec.config) with Im
"Supervision for a sharded actor (deprecated)" must {
"allow passivation" in {
"allow passivation and early stop" in {
val supervisedProps =
BackoffOpts
@ -112,7 +114,7 @@ class SupervisionSpec extends AkkaSpec(SupervisionSpec.config) with ImplicitSend
"Supervision for a sharded actor" must {
"allow passivation" in {
"allow passivation and early stop" in {
val supervisedProps = BackoffSupervisor.props(
BackoffOpts