diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
index 7912dcecd9..864365328c 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
@@ -738,21 +738,28 @@ private[akka] class Shard(
         ackTo.foreach(_ ! ShardRegion.StartEntityAck(entityId, shardId))
       case _: RememberingStart =>
         entities.rememberingStart(entityId, ackTo)
-      case RememberedButNotCreated =>
-        // already remembered, just start it - this will be the normal path for initially remembered entities
-        log.debug("Request to start (already remembered) entity [{}]", entityId)
+      case state @ (RememberedButNotCreated | WaitingForRestart) =>
+        // already remembered or waiting for backoff to restart, just start it -
+        // this is the normal path for initially remembered entities getting started
+        log.debug("Request to start entity [{}] (in state [{}])", entityId, state)
         getOrCreateEntity(entityId)
         touchLastMessageTimestamp(entityId)
         ackTo.foreach(_ ! ShardRegion.StartEntityAck(entityId, shardId))
+      case Passivating(_) =>
+        // since StartEntity is handled in deliverMsg we can buffer a StartEntity to handle when
+        // passivation completes (triggering an immediate restart)
+        messageBuffers.append(entityId, ShardRegion.StartEntity(entityId), ackTo.getOrElse(ActorRef.noSender))
+
+      case RememberingStop =>
+        // optimally, if the stop write is already in progress we would stash, and if the stop is
+        // batched for a later write we would cancel it, but for now we always stash
+        stash()
       case NoState =>
         // started manually from the outside, or the shard id extractor was changed since the entity was remembered
         // we need to store that it was started
         log.debug("Request to start entity [{}] and ack to [{}]", entityId, ackTo)
         entities.rememberingStart(entityId, ackTo)
         rememberUpdate(add = Set(entityId))
-      case other =>
-        // FIXME what do we do here?
-        throw new IllegalStateException(s"Unhandled state when wanting to start $entityId: $other")
     }
   }
 
@@ -858,8 +865,7 @@ private[akka] class Shard(
     if (entities.isPassivating(id)) {
       log.debug("Passivation already in progress for [{}]. Not sending stopMessage back to entity", id)
     } else if (messageBuffers.getOrEmpty(id).nonEmpty) {
-      log.debug("Passivation when there are buffered messages for [{}], ignoring", id)
-      // FIXME should we buffer the stop message then?
+ log.debug("Passivation when there are buffered messages for [{}], ignoring passivation", id) } else { if (verboseDebug) log.debug("Passivation started for [{}]", id) @@ -924,7 +930,7 @@ private[akka] class Shard( } else { if (verboseDebug) log.debug("StartEntity({}) from [{}], starting", start.entityId, snd) - startEntity(start.entityId, Some(sender())) + startEntity(start.entityId, Some(snd)) } case _ => entities.entityState(entityId) match { @@ -1010,7 +1016,8 @@ private[akka] class Shard( // Now there is no deliveryBuffer we can try to redeliver // and as the child exists, the message will be directly forwarded messages.foreach { - case (msg, snd) => deliverMessage(msg, snd) + case (ShardRegion.StartEntity(entityId), snd) => startEntity(entityId, Some(snd)) + case (msg, snd) => deliverMessage(msg, snd) } touchLastMessageTimestamp(entityId) } diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/StartEntitySpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/StartEntitySpec.scala new file mode 100644 index 0000000000..cb7ecc61a9 --- /dev/null +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/StartEntitySpec.scala @@ -0,0 +1,182 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.cluster.sharding + +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Props +import akka.cluster.Cluster +import akka.cluster.MemberStatus +import akka.testkit.AkkaSpec +import akka.testkit.ImplicitSender +import akka.testkit.WithLogCapturing +import com.typesafe.config.ConfigFactory + +import scala.concurrent.duration._ + +/** + * Covers some corner cases around sending triggering an entity with StartEntity + */ +object StartEntitySpec { + + final case class EntityEnvelope(id: String, msg: Any) + + def config = ConfigFactory.parseString(""" + akka.loglevel=DEBUG + akka.loggers = ["akka.testkit.SilenceAllTestEventListener"] + akka.actor.provider = cluster + akka.remote.artery.canonical.port = 0 + akka.remote.classic.netty.tcp.port = 0 + akka.cluster.sharding.state-store-mode = ddata + akka.cluster.sharding.remember-entities = on + # no leaks between test runs thank you + akka.cluster.sharding.distributed-data.durable.keys = [] + akka.cluster.sharding.verbose-debug-logging = on + """.stripMargin) + + object EntityActor { + def props(): Props = Props(new EntityActor) + } + class EntityActor extends Actor { + private var waitingForPassivateAck: Option[ActorRef] = None + override def receive: Receive = { + case "ping" => + sender() ! "pong" + case "passivate" => + context.parent ! ShardRegion.Passivate("complete-passivation") + waitingForPassivateAck = Some(sender()) + case "simulate-slow-passivate" => + context.parent ! ShardRegion.Passivate("slow-passivate-stop") + waitingForPassivateAck = Some(sender()) + case "slow-passivate-stop" => + // actually, we just don't stop, keeping the passivation state forever for this test + waitingForPassivateAck.foreach(_ ! 
"slow-passivate-ack") + waitingForPassivateAck = None + case "complete-passivation" | "just-stop" => + context.stop(self) + } + } + +} + +class StartEntitySpec extends AkkaSpec(StartEntitySpec.config) with ImplicitSender with WithLogCapturing { + import StartEntitySpec._ + + val extractEntityId: ShardRegion.ExtractEntityId = { + case EntityEnvelope(id, payload) => (id.toString, payload) + } + + val extractShardId: ShardRegion.ExtractShardId = { + case EntityEnvelope(_, _) => "1" // single shard for all entities + case ShardRegion.StartEntity(_) => "1" + } + + override def atStartup(): Unit = { + // Form a one node cluster + val cluster = Cluster(system) + cluster.join(cluster.selfAddress) + awaitAssert(cluster.readView.members.count(_.status == MemberStatus.Up) should ===(1)) + } + + "StartEntity while entity is passivating" should { + "start it again when the entity terminates" in { + val sharding = ClusterSharding(system).start( + "start-entity-1", + EntityActor.props(), + ClusterShardingSettings(system), + extractEntityId, + extractShardId) + + sharding ! EntityEnvelope("1", "ping") + expectMsg("pong") + val entity = lastSender + + sharding ! EntityEnvelope("1", "simulate-slow-passivate") + expectMsg("slow-passivate-ack") + + // entity is now in passivating state in shard + // bypass region and send start entity directly to shard + system.actorSelection(entity.path.parent) ! ShardRegion.StartEntity("1") + // bypass sharding and tell entity to complete passivation + entity ! "complete-passivation" + + // should trigger start of entity again, and an ack + expectMsg(ShardRegion.StartEntityAck("1", "1")) + awaitAssert({ + sharding ! ShardRegion.GetShardRegionState + val state = expectMsgType[ShardRegion.CurrentShardRegionState] + state.shards should have size (1) + state.shards.head.entityIds should ===(Set("1")) + }) + } + } + + // entity crashed and before restart-backoff hit we sent it a StartEntity + "StartEntity while the entity is waiting for restart" should { + "restart it immediately" in { + val sharding = ClusterSharding(system).start( + "start-entity-2", + EntityActor.props(), + ClusterShardingSettings(system), + extractEntityId, + extractShardId) + sharding ! EntityEnvelope("1", "ping") + expectMsg("pong") + val entity = lastSender + watch(entity) + + // stop without passivation + entity ! "just-stop" + expectTerminated(entity) + + // the backoff is 10s by default, so plenty time to + // bypass region and send start entity directly to shard + system.actorSelection(entity.path.parent) ! ShardRegion.StartEntity("1") + expectMsg(ShardRegion.StartEntityAck("1", "1")) + awaitAssert({ + sharding ! ShardRegion.GetShardRegionState + val state = expectMsgType[ShardRegion.CurrentShardRegionState] + state.shards should have size (1) + state.shards.head.entityIds should ===(Set("1")) + }) + } + } + + "StartEntity while the entity is queued remember stop" should { + "start it again when that is done" in { + // this is hard to do deterministically + val sharding = ClusterSharding(system).start( + "start-entity-3", + EntityActor.props(), + ClusterShardingSettings(system), + extractEntityId, + extractShardId) + sharding ! EntityEnvelope("1", "ping") + expectMsg("pong") + val entity = lastSender + watch(entity) + + // resolve before passivation to save some time + val shard = system.actorSelection(entity.path.parent).resolveOne(3.seconds).futureValue + + // stop passivation + entity ! 
"passivate" + // store of stop happens after passivation when entity has terminated + expectTerminated(entity) + shard ! ShardRegion.StartEntity("1") // if we are lucky this happens while remember stop is in progress + + // regardless we should get an ack and the entity should be alive + expectMsg(ShardRegion.StartEntityAck("1", "1")) + awaitAssert({ + sharding ! ShardRegion.GetShardRegionState + val state = expectMsgType[ShardRegion.CurrentShardRegionState] + state.shards should have size (1) + state.shards.head.entityIds should ===(Set("1")) + }) + + } + } + +}