Use watchWith for entity terminated (#29230)

Also, internal API changes for Cinnamon.
This commit is contained in:
Christopher Batey 2020-06-16 08:13:52 +01:00 committed by GitHub
parent 988ead1ef6
commit 7f48be9ef8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 82 additions and 66 deletions

View file

@ -103,6 +103,8 @@ private[akka] object Shard {
case object PassivateIdleTick extends NoSerializationVerificationNeeded case object PassivateIdleTick extends NoSerializationVerificationNeeded
private final case class EntityTerminated(ref: ActorRef)
private final case class RememberedEntityIds(ids: Set[EntityId]) private final case class RememberedEntityIds(ids: Set[EntityId])
private final case class RememberEntityStoreCrashed(store: ActorRef) private final case class RememberEntityStoreCrashed(store: ActorRef)
@ -378,7 +380,6 @@ private[akka] object Shard {
* *
* @see [[ClusterSharding$ ClusterSharding extension]] * @see [[ClusterSharding$ ClusterSharding extension]]
*/ */
// FIXME I broke bin comp here
@InternalStableApi @InternalStableApi
private[akka] class Shard( private[akka] class Shard(
typeName: String, typeName: String,
@ -413,6 +414,8 @@ private[akka] class Shard(
store store
} }
private val rememberEntities: Boolean = rememberEntitiesProvider.isDefined
private val flightRecorder = ShardingFlightRecorder(context.system) private val flightRecorder = ShardingFlightRecorder(context.system)
private val entities = new Entities(log, settings.rememberEntities, verboseDebug) private val entities = new Entities(log, settings.rememberEntities, verboseDebug)
@ -560,6 +563,7 @@ private[akka] class Shard(
// when not remembering entities, we stay in this state all the time // when not remembering entities, we stay in this state all the time
def idle: Receive = { def idle: Receive = {
case Terminated(ref) => receiveTerminated(ref) case Terminated(ref) => receiveTerminated(ref)
case EntityTerminated(ref) => entityTerminated(ref)
case msg: CoordinatorMessage => receiveCoordinatorMessage(msg) case msg: CoordinatorMessage => receiveCoordinatorMessage(msg)
case msg: RememberEntityCommand => receiveRememberEntityCommand(msg) case msg: RememberEntityCommand => receiveRememberEntityCommand(msg)
case msg: ShardRegion.StartEntity => startEntity(msg.entityId, Some(sender())) case msg: ShardRegion.StartEntity => startEntity(msg.entityId, Some(sender()))
@ -570,7 +574,6 @@ private[akka] class Shard(
case msg: RememberEntityStoreCrashed => rememberEntityStoreCrashed(msg) case msg: RememberEntityStoreCrashed => rememberEntityStoreCrashed(msg)
case msg if extractEntityId.isDefinedAt(msg) => deliverMessage(msg, sender()) case msg if extractEntityId.isDefinedAt(msg) => deliverMessage(msg, sender())
} }
def rememberUpdate(add: Set[EntityId] = Set.empty, remove: Set[EntityId] = Set.empty): Unit = { def rememberUpdate(add: Set[EntityId] = Set.empty, remove: Set[EntityId] = Set.empty): Unit = {
rememberEntitiesStore match { rememberEntitiesStore match {
case None => case None =>
@ -628,6 +631,7 @@ private[akka] class Shard(
s"Async write timed out after ${settings.tuningParameters.updatingStateTimeout.pretty}") s"Async write timed out after ${settings.tuningParameters.updatingStateTimeout.pretty}")
case ShardRegion.StartEntity(entityId) => startEntity(entityId, Some(sender())) case ShardRegion.StartEntity(entityId) => startEntity(entityId, Some(sender()))
case Terminated(ref) => receiveTerminated(ref) case Terminated(ref) => receiveTerminated(ref)
case EntityTerminated(ref) => entityTerminated(ref)
case _: CoordinatorMessage => stash() case _: CoordinatorMessage => stash()
case cmd: RememberEntityCommand => receiveRememberEntityCommand(cmd) case cmd: RememberEntityCommand => receiveRememberEntityCommand(cmd)
case l: LeaseLost => receiveLeaseLost(l) case l: LeaseLost => receiveLeaseLost(l)
@ -798,7 +802,7 @@ private[akka] class Shard(
if (activeEntities.nonEmpty) { if (activeEntities.nonEmpty) {
val entityHandOffTimeout = (settings.tuningParameters.handOffTimeout - 5.seconds).max(1.seconds) val entityHandOffTimeout = (settings.tuningParameters.handOffTimeout - 5.seconds).max(1.seconds)
log.debug("Starting HandOffStopper for shard [{}] to terminate [{}] entities.", shardId, activeEntities.size) log.debug("Starting HandOffStopper for shard [{}] to terminate [{}] entities.", shardId, activeEntities.size)
activeEntities.foreach(context.unwatch(_)) activeEntities.foreach(context.unwatch)
handOffStopper = Some( handOffStopper = Some(
context.watch( context.watch(
context.actorOf( context.actorOf(
@ -818,55 +822,53 @@ private[akka] class Shard(
private def receiveTerminated(ref: ActorRef): Unit = { private def receiveTerminated(ref: ActorRef): Unit = {
if (handOffStopper.contains(ref)) if (handOffStopper.contains(ref))
context.stop(self) context.stop(self)
else {
// workaround for watchWith not working with stash #29101
entities.entityId(ref) match {
case OptionVal.Some(id) => entityTerminated(id)
case _ =>
}
}
} }
@InternalStableApi @InternalStableApi
def entityTerminated(entityId: EntityId): Unit = { def entityTerminated(ref: ActorRef): Unit = {
import settings.tuningParameters._ import settings.tuningParameters._
if (passivateIdleTask.isDefined) { entities.entityId(ref) match {
lastMessageTimestamp -= entityId case OptionVal.Some(entityId) =>
} if (passivateIdleTask.isDefined) {
entities.entityState(entityId) match { lastMessageTimestamp -= entityId
case RememberingStop =>
if (verboseDebug)
log.debug("Stop of [{}] arrived, already is among the pending stops", entityId)
case Active(_) =>
if (rememberEntitiesStore.isDefined) {
log.debug("Entity [{}] stopped without passivating, will restart after backoff", entityId)
entities.waitingForRestart(entityId)
val msg = RestartTerminatedEntity(entityId)
timers.startSingleTimer(msg, msg, entityRestartBackoff)
} else {
log.debug("Entity [{}] terminated", entityId)
entities.removeEntity(entityId)
} }
entities.entityState(entityId) match {
case RememberingStop =>
if (verboseDebug)
log.debug("Stop of [{}] arrived, already is among the pending stops", entityId)
case Active(_) =>
if (rememberEntitiesStore.isDefined) {
log.debug("Entity [{}] stopped without passivating, will restart after backoff", entityId)
entities.waitingForRestart(entityId)
val msg = RestartTerminatedEntity(entityId)
timers.startSingleTimer(msg, msg, entityRestartBackoff)
} else {
log.debug("Entity [{}] terminated", entityId)
entities.removeEntity(entityId)
}
case Passivating(_) => case Passivating(_) =>
if (entities.pendingRememberedEntitiesExist()) { if (entities.pendingRememberedEntitiesExist()) {
// will go in next batch update // will go in next batch update
if (verboseDebug) if (verboseDebug)
log.debug( log.debug(
"Stop of [{}] after passivating, arrived while updating, adding it to batch of pending stops", "Stop of [{}] after passivating, arrived while updating, adding it to batch of pending stops",
entityId) entityId)
entities.rememberingStop(entityId) entities.rememberingStop(entityId)
} else { } else {
entities.rememberingStop(entityId) entities.rememberingStop(entityId)
rememberUpdate(remove = Set(entityId)) rememberUpdate(remove = Set(entityId))
}
case unexpected =>
val ref = entities.entity(entityId)
log.warning(
"Got a terminated for [{}], entityId [{}] which is in unexpected state [{}]",
ref,
entityId,
unexpected)
} }
case unexpected => case OptionVal.None =>
val ref = entities.entity(entityId) log.warning("Unexpected entity terminated: {}", ref)
log.warning(
"Got a terminated for [{}], entityId [{}] which is in unexpected state [{}]",
ref,
entityId,
unexpected)
} }
} }
@ -963,25 +965,30 @@ private[akka] class Shard(
touchLastMessageTimestamp(entityId) touchLastMessageTimestamp(entityId)
actor.tell(payload, snd) actor.tell(payload, snd)
case NoState => case NoState =>
if (entities.pendingRememberedEntitiesExist()) { if (!rememberEntities) {
// No actor running and write in progress for some other entity id (can only happen with remember entities enabled) // don't buffer if remember entities not enabled
if (verboseDebug) getOrCreateEntity(entityId).tell(payload, snd)
log.debug( touchLastMessageTimestamp(entityId)
"Buffer message [{}] to [{}] (which is not started) because of write in progress for [{}]",
payload.getClass,
entityId,
entities.pendingRememberEntities())
appendToMessageBuffer(entityId, msg, snd)
entities.rememberingStart(entityId, ackTo = None)
} else { } else {
// No actor running and no write in progress, start actor and deliver message when started if (entities.pendingRememberedEntitiesExist()) {
if (verboseDebug) // No actor running and write in progress for some other entity id (can only happen with remember entities enabled)
log.debug("Buffering message [{}] to [{}] and starting actor", payload.getClass, entityId) if (verboseDebug)
appendToMessageBuffer(entityId, msg, snd) log.debug(
entities.rememberingStart(entityId, ackTo = None) "Buffer message [{}] to [{}] (which is not started) because of write in progress for [{}]",
rememberUpdate(add = Set(entityId)) payload.getClass,
entityId,
entities.pendingRememberEntities())
appendToMessageBuffer(entityId, msg, snd)
entities.rememberingStart(entityId, ackTo = None)
} else {
// No actor running and no write in progress, start actor and deliver message when started
if (verboseDebug)
log.debug("Buffering message [{}] to [{}] and starting actor", payload.getClass, entityId)
appendToMessageBuffer(entityId, msg, snd)
entities.rememberingStart(entityId, ackTo = None)
rememberUpdate(add = Set(entityId))
}
} }
} }
} }
} }
@ -994,14 +1001,22 @@ private[akka] class Shard(
case OptionVal.None => case OptionVal.None =>
val name = URLEncoder.encode(id, "utf-8") val name = URLEncoder.encode(id, "utf-8")
val a = context.actorOf(entityProps(id), name) val a = context.actorOf(entityProps(id), name)
context.watch(a) context.watchWith(a, EntityTerminated(a))
log.debug("Started entity [{}] with entity id [{}] in shard [{}]", a, id, shardId) log.debug("Started entity [{}] with entity id [{}] in shard [{}]", a, id, shardId)
entities.addEntity(id, a) entities.addEntity(id, a)
touchLastMessageTimestamp(id) touchLastMessageTimestamp(id)
entityCreated(id)
a a
} }
} }
/**
* Called when an entity has been created. Returning the number
* of active entities.
*/
@InternalStableApi
def entityCreated(@unused id: EntityId): Int = entities.activeEntities().size
// ===== buffering while busy saving a start or stop when remembering entities ===== // ===== buffering while busy saving a start or stop when remembering entities =====
def appendToMessageBuffer(id: EntityId, msg: Any, snd: ActorRef): Unit = { def appendToMessageBuffer(id: EntityId, msg: Any, snd: ActorRef): Unit = {
if (messageBuffers.totalSize >= settings.tuningParameters.bufferSize) { if (messageBuffers.totalSize >= settings.tuningParameters.bufferSize) {

View file

@ -11,7 +11,7 @@ import scala.util.Success
import com.github.ghik.silencer.silent import com.github.ghik.silencer.silent
import akka.actor._ import akka.actor._
import akka.actor.DeadLetterSuppression import akka.actor.DeadLetterSuppression
import akka.annotation.InternalApi import akka.annotation.{ InternalApi, InternalStableApi }
import akka.cluster.Cluster import akka.cluster.Cluster
import akka.cluster.ClusterEvent import akka.cluster.ClusterEvent
import akka.cluster.ClusterEvent._ import akka.cluster.ClusterEvent._
@ -56,6 +56,7 @@ object ShardCoordinator {
* INTERNAL API * INTERNAL API
* Factory method for the [[akka.actor.Props]] of the [[ShardCoordinator]] actor with state based on ddata. * Factory method for the [[akka.actor.Props]] of the [[ShardCoordinator]] actor with state based on ddata.
*/ */
@InternalStableApi
private[akka] def props( private[akka] def props(
typeName: String, typeName: String,
settings: ClusterShardingSettings, settings: ClusterShardingSettings,

View file

@ -13,10 +13,9 @@ import scala.concurrent.duration._
import scala.reflect.ClassTag import scala.reflect.ClassTag
import scala.runtime.AbstractFunction1 import scala.runtime.AbstractFunction1
import scala.util.{ Failure, Success } import scala.util.{ Failure, Success }
import akka.Done import akka.Done
import akka.actor._ import akka.actor._
import akka.annotation.InternalApi import akka.annotation.{ InternalApi, InternalStableApi }
import akka.cluster.Cluster import akka.cluster.Cluster
import akka.cluster.ClusterEvent._ import akka.cluster.ClusterEvent._
import akka.cluster.ClusterSettings import akka.cluster.ClusterSettings
@ -516,6 +515,7 @@ object ShardRegion {
* *
* @see [[ClusterSharding$ ClusterSharding extension]] * @see [[ClusterSharding$ ClusterSharding extension]]
*/ */
@InternalStableApi
private[akka] class ShardRegion( private[akka] class ShardRegion(
typeName: String, typeName: String,
entityProps: Option[String => Props], entityProps: Option[String => Props],