Handle some corner case states when receiving StartEntity (#29176)

This commit is contained in:
Johan Andrén 2020-06-04 10:52:53 +02:00 committed by GitHub
parent 1254595c7d
commit 078d7bd2fb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 199 additions and 10 deletions

View file

@ -738,21 +738,28 @@ private[akka] class Shard(
ackTo.foreach(_ ! ShardRegion.StartEntityAck(entityId, shardId)) ackTo.foreach(_ ! ShardRegion.StartEntityAck(entityId, shardId))
case _: RememberingStart => case _: RememberingStart =>
entities.rememberingStart(entityId, ackTo) entities.rememberingStart(entityId, ackTo)
case RememberedButNotCreated => case state @ (RememberedButNotCreated | WaitingForRestart) =>
// already remembered, just start it - this will be the normal path for initially remembered entities // already remembered or waiting for backoff to restart, just start it -
log.debug("Request to start (already remembered) entity [{}]", entityId) // this is the normal path for initially remembered entities getting started
log.debug("Request to start entity [{}] (in state [{}])", entityId, state)
getOrCreateEntity(entityId) getOrCreateEntity(entityId)
touchLastMessageTimestamp(entityId) touchLastMessageTimestamp(entityId)
ackTo.foreach(_ ! ShardRegion.StartEntityAck(entityId, shardId)) ackTo.foreach(_ ! ShardRegion.StartEntityAck(entityId, shardId))
case Passivating(_) =>
// since StartEntity is handled in deliverMsg we can buffer a StartEntity to handle when
// passivation completes (triggering an immediate restart)
messageBuffers.append(entityId, ShardRegion.StartEntity(entityId), ackTo.getOrElse(ActorRef.noSender))
case RememberingStop =>
// Optimally: if stop is already write in progress, we want to stash, if it is batched for later write we'd want to cancel
// but for now
stash()
case NoState => case NoState =>
// started manually from the outside, or the shard id extractor was changed since the entity was remembered // started manually from the outside, or the shard id extractor was changed since the entity was remembered
// we need to store that it was started // we need to store that it was started
log.debug("Request to start entity [{}] and ack to [{}]", entityId, ackTo) log.debug("Request to start entity [{}] and ack to [{}]", entityId, ackTo)
entities.rememberingStart(entityId, ackTo) entities.rememberingStart(entityId, ackTo)
rememberUpdate(add = Set(entityId)) rememberUpdate(add = Set(entityId))
case other =>
// FIXME what do we do here?
throw new IllegalStateException(s"Unhandled state when wanting to start $entityId: $other")
} }
} }
@ -858,8 +865,7 @@ private[akka] class Shard(
if (entities.isPassivating(id)) { if (entities.isPassivating(id)) {
log.debug("Passivation already in progress for [{}]. Not sending stopMessage back to entity", id) log.debug("Passivation already in progress for [{}]. Not sending stopMessage back to entity", id)
} else if (messageBuffers.getOrEmpty(id).nonEmpty) { } else if (messageBuffers.getOrEmpty(id).nonEmpty) {
log.debug("Passivation when there are buffered messages for [{}], ignoring", id) log.debug("Passivation when there are buffered messages for [{}], ignoring passivation", id)
// FIXME should we buffer the stop message then?
} else { } else {
if (verboseDebug) if (verboseDebug)
log.debug("Passivation started for [{}]", id) log.debug("Passivation started for [{}]", id)
@ -924,7 +930,7 @@ private[akka] class Shard(
} else { } else {
if (verboseDebug) if (verboseDebug)
log.debug("StartEntity({}) from [{}], starting", start.entityId, snd) log.debug("StartEntity({}) from [{}], starting", start.entityId, snd)
startEntity(start.entityId, Some(sender())) startEntity(start.entityId, Some(snd))
} }
case _ => case _ =>
entities.entityState(entityId) match { entities.entityState(entityId) match {
@ -1010,6 +1016,7 @@ private[akka] class Shard(
// Now there is no deliveryBuffer we can try to redeliver // Now there is no deliveryBuffer we can try to redeliver
// and as the child exists, the message will be directly forwarded // and as the child exists, the message will be directly forwarded
messages.foreach { messages.foreach {
case (ShardRegion.StartEntity(entityId), snd) => startEntity(entityId, Some(snd))
case (msg, snd) => deliverMessage(msg, snd) case (msg, snd) => deliverMessage(msg, snd)
} }
touchLastMessageTimestamp(entityId) touchLastMessageTimestamp(entityId)

View file

@ -0,0 +1,182 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.MemberStatus
import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender
import akka.testkit.WithLogCapturing
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
/**
* Covers some corner cases around sending triggering an entity with StartEntity
*/
object StartEntitySpec {
final case class EntityEnvelope(id: String, msg: Any)
def config = ConfigFactory.parseString("""
akka.loglevel=DEBUG
akka.loggers = ["akka.testkit.SilenceAllTestEventListener"]
akka.actor.provider = cluster
akka.remote.artery.canonical.port = 0
akka.remote.classic.netty.tcp.port = 0
akka.cluster.sharding.state-store-mode = ddata
akka.cluster.sharding.remember-entities = on
# no leaks between test runs thank you
akka.cluster.sharding.distributed-data.durable.keys = []
akka.cluster.sharding.verbose-debug-logging = on
""".stripMargin)
object EntityActor {
def props(): Props = Props(new EntityActor)
}
class EntityActor extends Actor {
private var waitingForPassivateAck: Option[ActorRef] = None
override def receive: Receive = {
case "ping" =>
sender() ! "pong"
case "passivate" =>
context.parent ! ShardRegion.Passivate("complete-passivation")
waitingForPassivateAck = Some(sender())
case "simulate-slow-passivate" =>
context.parent ! ShardRegion.Passivate("slow-passivate-stop")
waitingForPassivateAck = Some(sender())
case "slow-passivate-stop" =>
// actually, we just don't stop, keeping the passivation state forever for this test
waitingForPassivateAck.foreach(_ ! "slow-passivate-ack")
waitingForPassivateAck = None
case "complete-passivation" | "just-stop" =>
context.stop(self)
}
}
}
class StartEntitySpec extends AkkaSpec(StartEntitySpec.config) with ImplicitSender with WithLogCapturing {
import StartEntitySpec._
val extractEntityId: ShardRegion.ExtractEntityId = {
case EntityEnvelope(id, payload) => (id.toString, payload)
}
val extractShardId: ShardRegion.ExtractShardId = {
case EntityEnvelope(_, _) => "1" // single shard for all entities
case ShardRegion.StartEntity(_) => "1"
}
override def atStartup(): Unit = {
// Form a one node cluster
val cluster = Cluster(system)
cluster.join(cluster.selfAddress)
awaitAssert(cluster.readView.members.count(_.status == MemberStatus.Up) should ===(1))
}
"StartEntity while entity is passivating" should {
"start it again when the entity terminates" in {
val sharding = ClusterSharding(system).start(
"start-entity-1",
EntityActor.props(),
ClusterShardingSettings(system),
extractEntityId,
extractShardId)
sharding ! EntityEnvelope("1", "ping")
expectMsg("pong")
val entity = lastSender
sharding ! EntityEnvelope("1", "simulate-slow-passivate")
expectMsg("slow-passivate-ack")
// entity is now in passivating state in shard
// bypass region and send start entity directly to shard
system.actorSelection(entity.path.parent) ! ShardRegion.StartEntity("1")
// bypass sharding and tell entity to complete passivation
entity ! "complete-passivation"
// should trigger start of entity again, and an ack
expectMsg(ShardRegion.StartEntityAck("1", "1"))
awaitAssert({
sharding ! ShardRegion.GetShardRegionState
val state = expectMsgType[ShardRegion.CurrentShardRegionState]
state.shards should have size (1)
state.shards.head.entityIds should ===(Set("1"))
})
}
}
// entity crashed and before restart-backoff hit we sent it a StartEntity
"StartEntity while the entity is waiting for restart" should {
"restart it immediately" in {
val sharding = ClusterSharding(system).start(
"start-entity-2",
EntityActor.props(),
ClusterShardingSettings(system),
extractEntityId,
extractShardId)
sharding ! EntityEnvelope("1", "ping")
expectMsg("pong")
val entity = lastSender
watch(entity)
// stop without passivation
entity ! "just-stop"
expectTerminated(entity)
// the backoff is 10s by default, so plenty time to
// bypass region and send start entity directly to shard
system.actorSelection(entity.path.parent) ! ShardRegion.StartEntity("1")
expectMsg(ShardRegion.StartEntityAck("1", "1"))
awaitAssert({
sharding ! ShardRegion.GetShardRegionState
val state = expectMsgType[ShardRegion.CurrentShardRegionState]
state.shards should have size (1)
state.shards.head.entityIds should ===(Set("1"))
})
}
}
"StartEntity while the entity is queued remember stop" should {
"start it again when that is done" in {
// this is hard to do deterministically
val sharding = ClusterSharding(system).start(
"start-entity-3",
EntityActor.props(),
ClusterShardingSettings(system),
extractEntityId,
extractShardId)
sharding ! EntityEnvelope("1", "ping")
expectMsg("pong")
val entity = lastSender
watch(entity)
// resolve before passivation to save some time
val shard = system.actorSelection(entity.path.parent).resolveOne(3.seconds).futureValue
// stop passivation
entity ! "passivate"
// store of stop happens after passivation when entity has terminated
expectTerminated(entity)
shard ! ShardRegion.StartEntity("1") // if we are lucky this happens while remember stop is in progress
// regardless we should get an ack and the entity should be alive
expectMsg(ShardRegion.StartEntityAck("1", "1"))
awaitAssert({
sharding ! ShardRegion.GetShardRegionState
val state = expectMsgType[ShardRegion.CurrentShardRegionState]
state.shards should have size (1)
state.shards.head.entityIds should ===(Set("1"))
})
}
}
}