Accumulate and ack all requests to start entity (#29165)

This commit is contained in:
Johan Andrén 2020-06-02 16:30:16 +02:00 committed by GitHub
parent 5643f7e194
commit aa50c63965
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -182,24 +182,34 @@ private[akka] object Shard {
} }
} }
object RememberingStart {
def apply(ackTo: Option[ActorRef]): RememberingStart =
ackTo match {
case None => RememberingStartNoAck
case Some(ackTo) => RememberingStart(Set(ackTo))
}
}
/** /**
* When remember entities is enabled an entity is in this state while * When remember entities is enabled an entity is in this state while
* its existence is being recorded in the remember entities store, or while the stop is queued up * its existence is being recorded in the remember entities store, or while the stop is queued up
* to be stored in the next batch. * to be stored in the next batch.
*/ */
final case class RememberingStart(ackTo: Option[ActorRef]) extends EntityState { final case class RememberingStart(ackTo: Set[ActorRef]) extends EntityState {
override def transition(newState: EntityState): EntityState = newState match { override def transition(newState: EntityState): EntityState = newState match {
case active: Active => active case active: Active => active
case r: RememberingStart => case r: RememberingStart =>
ackTo match { if (ackTo.isEmpty) {
case None => r if (r.ackTo.isEmpty) RememberingStartNoAck
case Some(_) => else newState
r // FIXME is this really fine, replace ackTo with another one or None } else {
if (r.ackTo.isEmpty) this
else RememberingStart(ackTo ++ r.ackTo)
} }
case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed") case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed")
} }
} }
private val RememberingStartNoAck = RememberingStart(None) private val RememberingStartNoAck = new RememberingStart(Set.empty)
sealed trait StopReason sealed trait StopReason
final case object PassivationComplete extends StopReason final case object PassivationComplete extends StopReason
@ -256,11 +266,7 @@ private[akka] object Shard {
} }
} }
def rememberingStart(entityId: EntityId, ackTo: Option[ActorRef]): Unit = { def rememberingStart(entityId: EntityId, ackTo: Option[ActorRef]): Unit = {
// FIXME we queue these without bounds, is that ok? val newState = RememberingStart(ackTo)
val newState: RememberingStart = ackTo match {
case None => RememberingStartNoAck // small optimization avoiding alloc for regular start
case some => RememberingStart(some)
}
val state = entityState(entityId).transition(newState) val state = entityState(entityId).transition(newState)
entities.put(entityId, state) entities.put(entityId, state)
if (rememberingEntities) if (rememberingEntities)
@ -673,8 +679,8 @@ private[akka] class Shard(
getOrCreateEntity(entityId) getOrCreateEntity(entityId)
sendMsgBuffer(entityId) sendMsgBuffer(entityId)
stateBeforeStart match { stateBeforeStart match {
case RememberingStart(Some(ackTo)) => ackTo ! ShardRegion.StartEntityAck(entityId, shardId) case RememberingStart(ackTo) => ackTo.foreach(_ ! ShardRegion.StartEntityAck(entityId, shardId))
case _ => case _ =>
} }
touchLastMessageTimestamp(entityId) touchLastMessageTimestamp(entityId)
} }
@ -751,7 +757,6 @@ private[akka] class Shard(
touchLastMessageTimestamp(entityId) touchLastMessageTimestamp(entityId)
ackTo.foreach(_ ! ShardRegion.StartEntityAck(entityId, shardId)) ackTo.foreach(_ ! ShardRegion.StartEntityAck(entityId, shardId))
case _: RememberingStart => case _: RememberingStart =>
// FIXME safe to replace ackTo?
entities.rememberingStart(entityId, ackTo) entities.rememberingStart(entityId, ackTo)
case RememberedButNotCreated => case RememberedButNotCreated =>
// already remembered, just start it - this will be the normal path for initially remembered entities // already remembered, just start it - this will be the normal path for initially remembered entities