diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index b5bb7ea8d4..b5341accd1 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -103,6 +103,8 @@ private[akka] object Shard { case object PassivateIdleTick extends NoSerializationVerificationNeeded + private final case class EntityTerminated(ref: ActorRef) + private final case class RememberedEntityIds(ids: Set[EntityId]) private final case class RememberEntityStoreCrashed(store: ActorRef) @@ -378,7 +380,6 @@ private[akka] object Shard { * * @see [[ClusterSharding$ ClusterSharding extension]] */ -// FIXME I broke bin comp here @InternalStableApi private[akka] class Shard( typeName: String, @@ -413,6 +414,8 @@ private[akka] class Shard( store } + private val rememberEntities: Boolean = rememberEntitiesProvider.isDefined + private val flightRecorder = ShardingFlightRecorder(context.system) private val entities = new Entities(log, settings.rememberEntities, verboseDebug) @@ -560,6 +563,7 @@ private[akka] class Shard( // when not remembering entities, we stay in this state all the time def idle: Receive = { case Terminated(ref) => receiveTerminated(ref) + case EntityTerminated(ref) => entityTerminated(ref) case msg: CoordinatorMessage => receiveCoordinatorMessage(msg) case msg: RememberEntityCommand => receiveRememberEntityCommand(msg) case msg: ShardRegion.StartEntity => startEntity(msg.entityId, Some(sender())) @@ -570,7 +574,6 @@ private[akka] class Shard( case msg: RememberEntityStoreCrashed => rememberEntityStoreCrashed(msg) case msg if extractEntityId.isDefinedAt(msg) => deliverMessage(msg, sender()) } - def rememberUpdate(add: Set[EntityId] = Set.empty, remove: Set[EntityId] = Set.empty): Unit = { rememberEntitiesStore match { case None => @@ -628,6 +631,7 @@ private[akka] class Shard( s"Async write timed out after ${settings.tuningParameters.updatingStateTimeout.pretty}") case ShardRegion.StartEntity(entityId) => startEntity(entityId, Some(sender())) case Terminated(ref) => receiveTerminated(ref) + case EntityTerminated(ref) => entityTerminated(ref) case _: CoordinatorMessage => stash() case cmd: RememberEntityCommand => receiveRememberEntityCommand(cmd) case l: LeaseLost => receiveLeaseLost(l) @@ -798,7 +802,7 @@ private[akka] class Shard( if (activeEntities.nonEmpty) { val entityHandOffTimeout = (settings.tuningParameters.handOffTimeout - 5.seconds).max(1.seconds) log.debug("Starting HandOffStopper for shard [{}] to terminate [{}] entities.", shardId, activeEntities.size) - activeEntities.foreach(context.unwatch(_)) + activeEntities.foreach(context.unwatch) handOffStopper = Some( context.watch( context.actorOf( @@ -818,55 +822,53 @@ private[akka] class Shard( private def receiveTerminated(ref: ActorRef): Unit = { if (handOffStopper.contains(ref)) context.stop(self) - else { - // workaround for watchWith not working with stash #29101 - entities.entityId(ref) match { - case OptionVal.Some(id) => entityTerminated(id) - case _ => - } - } } @InternalStableApi - def entityTerminated(entityId: EntityId): Unit = { + def entityTerminated(ref: ActorRef): Unit = { import settings.tuningParameters._ - if (passivateIdleTask.isDefined) { - lastMessageTimestamp -= entityId - } - entities.entityState(entityId) match { - case RememberingStop => - if (verboseDebug) - log.debug("Stop of [{}] arrived, already is among the pending stops", entityId) - case Active(_) => - if (rememberEntitiesStore.isDefined) { - log.debug("Entity [{}] stopped without passivating, will restart after backoff", entityId) - entities.waitingForRestart(entityId) - val msg = RestartTerminatedEntity(entityId) - timers.startSingleTimer(msg, msg, entityRestartBackoff) - } else { - log.debug("Entity [{}] terminated", entityId) - entities.removeEntity(entityId) + entities.entityId(ref) match { + case OptionVal.Some(entityId) => + if (passivateIdleTask.isDefined) { + lastMessageTimestamp -= entityId } + entities.entityState(entityId) match { + case RememberingStop => + if (verboseDebug) + log.debug("Stop of [{}] arrived, already is among the pending stops", entityId) + case Active(_) => + if (rememberEntitiesStore.isDefined) { + log.debug("Entity [{}] stopped without passivating, will restart after backoff", entityId) + entities.waitingForRestart(entityId) + val msg = RestartTerminatedEntity(entityId) + timers.startSingleTimer(msg, msg, entityRestartBackoff) + } else { + log.debug("Entity [{}] terminated", entityId) + entities.removeEntity(entityId) + } - case Passivating(_) => - if (entities.pendingRememberedEntitiesExist()) { - // will go in next batch update - if (verboseDebug) - log.debug( - "Stop of [{}] after passivating, arrived while updating, adding it to batch of pending stops", - entityId) - entities.rememberingStop(entityId) - } else { - entities.rememberingStop(entityId) - rememberUpdate(remove = Set(entityId)) + case Passivating(_) => + if (entities.pendingRememberedEntitiesExist()) { + // will go in next batch update + if (verboseDebug) + log.debug( + "Stop of [{}] after passivating, arrived while updating, adding it to batch of pending stops", + entityId) + entities.rememberingStop(entityId) + } else { + entities.rememberingStop(entityId) + rememberUpdate(remove = Set(entityId)) + } + case unexpected => + val ref = entities.entity(entityId) + log.warning( + "Got a terminated for [{}], entityId [{}] which is in unexpected state [{}]", + ref, + entityId, + unexpected) } - case unexpected => - val ref = entities.entity(entityId) - log.warning( - "Got a terminated for [{}], entityId [{}] which is in unexpected state [{}]", - ref, - entityId, - unexpected) + case OptionVal.None => + log.warning("Unexpected entity terminated: {}", ref) } } @@ -963,25 +965,30 @@ private[akka] class Shard( touchLastMessageTimestamp(entityId) actor.tell(payload, snd) case NoState => - if (entities.pendingRememberedEntitiesExist()) { - // No actor running and write in progress for some other entity id (can only happen with remember entities enabled) - if (verboseDebug) - log.debug( - "Buffer message [{}] to [{}] (which is not started) because of write in progress for [{}]", - payload.getClass, - entityId, - entities.pendingRememberEntities()) - appendToMessageBuffer(entityId, msg, snd) - entities.rememberingStart(entityId, ackTo = None) + if (!rememberEntities) { + // don't buffer if remember entities not enabled + getOrCreateEntity(entityId).tell(payload, snd) + touchLastMessageTimestamp(entityId) } else { - // No actor running and no write in progress, start actor and deliver message when started - if (verboseDebug) - log.debug("Buffering message [{}] to [{}] and starting actor", payload.getClass, entityId) - appendToMessageBuffer(entityId, msg, snd) - entities.rememberingStart(entityId, ackTo = None) - rememberUpdate(add = Set(entityId)) + if (entities.pendingRememberedEntitiesExist()) { + // No actor running and write in progress for some other entity id (can only happen with remember entities enabled) + if (verboseDebug) + log.debug( + "Buffer message [{}] to [{}] (which is not started) because of write in progress for [{}]", + payload.getClass, + entityId, + entities.pendingRememberEntities()) + appendToMessageBuffer(entityId, msg, snd) + entities.rememberingStart(entityId, ackTo = None) + } else { + // No actor running and no write in progress, start actor and deliver message when started + if (verboseDebug) + log.debug("Buffering message [{}] to [{}] and starting actor", payload.getClass, entityId) + appendToMessageBuffer(entityId, msg, snd) + entities.rememberingStart(entityId, ackTo = None) + rememberUpdate(add = Set(entityId)) + } } - } } } @@ -994,14 +1001,22 @@ private[akka] class Shard( case OptionVal.None => val name = URLEncoder.encode(id, "utf-8") val a = context.actorOf(entityProps(id), name) - context.watch(a) + context.watchWith(a, EntityTerminated(a)) log.debug("Started entity [{}] with entity id [{}] in shard [{}]", a, id, shardId) entities.addEntity(id, a) touchLastMessageTimestamp(id) + entityCreated(id) a } } + /** + * Called when an entity has been created. Returning the number + * of active entities. + */ + @InternalStableApi + def entityCreated(@unused id: EntityId): Int = entities.activeEntities().size + // ===== buffering while busy saving a start or stop when remembering entities ===== def appendToMessageBuffer(id: EntityId, msg: Any, snd: ActorRef): Unit = { if (messageBuffers.totalSize >= settings.tuningParameters.bufferSize) { diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index eefea65243..2e45332175 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -11,7 +11,7 @@ import scala.util.Success import com.github.ghik.silencer.silent import akka.actor._ import akka.actor.DeadLetterSuppression -import akka.annotation.InternalApi +import akka.annotation.{ InternalApi, InternalStableApi } import akka.cluster.Cluster import akka.cluster.ClusterEvent import akka.cluster.ClusterEvent._ @@ -56,6 +56,7 @@ object ShardCoordinator { * INTERNAL API * Factory method for the [[akka.actor.Props]] of the [[ShardCoordinator]] actor with state based on ddata. */ + @InternalStableApi private[akka] def props( typeName: String, settings: ClusterShardingSettings, diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index c75021992e..66507dad6a 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -13,10 +13,9 @@ import scala.concurrent.duration._ import scala.reflect.ClassTag import scala.runtime.AbstractFunction1 import scala.util.{ Failure, Success } - import akka.Done import akka.actor._ -import akka.annotation.InternalApi +import akka.annotation.{ InternalApi, InternalStableApi } import akka.cluster.Cluster import akka.cluster.ClusterEvent._ import akka.cluster.ClusterSettings @@ -516,6 +515,7 @@ object ShardRegion { * * @see [[ClusterSharding$ ClusterSharding extension]] */ +@InternalStableApi private[akka] class ShardRegion( typeName: String, entityProps: Option[String => Props],