Start entity ack not needed in shard (#29164)

* The only sender of StartEntityAck to shard is RememberEntitiesStarter

That is not really needed, because the shard already knows if it started an entity, so we can drop that logic

* Separate command for the case where ids has been relocated by a new shard id extractor algo

* Store that entities were moved on shard id extractor change and cover with test

* Do'h. Can't mark entity as being remembered before checking if there are any entities being remembered

* Comment explaining test
This commit is contained in:
Johan Andrén 2020-06-03 10:40:33 +02:00 committed by GitHub
parent 224fb1592d
commit 85e11c8941
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 235 additions and 113 deletions

View file

@ -63,6 +63,12 @@ private[akka] object Shard {
*/
final case class RestartRememberedEntities(entity: Set[EntityId]) extends RememberEntityCommand
/**
* If the shard id extractor is changed, remembered entities will start in a different shard
* and this message is sent to the shard to not leak `entityId -> RememberedButNotStarted` entries
*/
final case class EntitiesMovedToOtherShard(ids: Set[ShardRegion.ShardId]) extends RememberEntityCommand
/**
* A query for information about the shard
*/
@ -114,33 +120,36 @@ private[akka] object Shard {
/**
* State machine for an entity:
* {{{
* Entity id remembered on shard start +-------------------------+ restart (via region) StartEntity
* +--------------------------------->| RememberedButNotCreated |------------------------------+
* | +-------------------------+ |
* | | |
* | | early message for entity |
* | v |
* | Remember entities entity start +-------------------+ start stored and entity started |
* | +-----------------------> | RememberingStart |-------------+ v
* No state for id | | +-------------------+ | +------------+
* +---+ | | +-------------> | Active |
* | |--------|--------|------------------------------------------------------------ +------------+
* +---+ | Non remember entities entity start |
* ^ | |
* | | |
* | | restart after backoff entity terminated |
* | | or message for entity +-------------------+ without passivation | passivation initiated +-------------+
* | +<------------------------------| WaitingForRestart |<-----------------+-----------+----------------------------> | Passivating |
* | | +-------------------+ | +-------------+
* | | | |
* | | | entity terminated +--------------+
* | | | v |
* | | There were buffered messages for entity | +-------------------+ |
* | +<---------------------------------------------------------------------+ | RememberingStop | |
* | +-------------------+ |
* | | |
* | | |
* +-------------------------------------------------------------------------------------------------------------------------------------------+<--------------
* Started on another shard bc. shard id extractor changed (we need to store that)
* +------------------------------------------------------------------+
* | |
* Entity id remembered on shard start +-------------------------+ StartEntity or early message for entity |
* +--------------------------------->| RememberedButNotCreated |------------------------------+ |
* | +-------------------------+ | |
* | | |
* | | |
* | Remember entities | |
* | message or StartEntity +-------------------+ start stored and entity started | |
* | +-----------------------> | RememberingStart |-------------+ v |
* No state for id | | +-------------------+ | +------------+ |
* +---+ | | +-------------> | Active | |
* | |--------|--------+-----------------------------------------------------------+ +------------+ |
* +---+ | Non remember entities message or StartEntity | |
* ^ | | |
* | | entity terminated | |
* | | restart after backoff without passivation | passivation |
* | | or message for entity +-------------------+ remember ent. | initiated \ +-------------+
* | +<------------------------------| WaitingForRestart |<---+-------------+-----------+--------------------|-------> | Passivating |
* | | +-------------------+ | | / +-------------+
* | | | | remember entities | entity |
* | | | | not used | terminated +--------------+
* | | | | | v |
* | | There were buffered messages for entity | | | +-------------------+ |
* | +<-------------------------------------------------------+ | +----> | RememberingStop | | remember entities
* | | +-------------------+ | not used
* | | | |
* | v | |
* +------------------------------------------------------------------------------------------+------------------------------------------------+<-------------+
* stop stored/passivation complete
* }}}
**/
@ -165,16 +174,13 @@ private[akka] object Shard {
/**
* In this state we know the entity has been stored in the remember sore but
* it hasn't been created yet. E.g. on restart when first getting all the
* remembered entity ids
* remembered entity ids.
*/
// FIXME: since remember entities on start has a hop via region this could be a (small) resource leak
// if the shard extractor has changed, the entities will stay in the map forever as RememberedButNotCreated
// do we really need to track it?
case object RememberedButNotCreated extends EntityState {
override def transition(newState: EntityState): EntityState = newState match {
case NoState => RememberedButNotCreated
case active: Active => active
case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed")
case active: Active => active // started on this shard
case RememberingStop => RememberingStop // started on other shard
case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed")
}
}
@ -207,16 +213,12 @@ private[akka] object Shard {
}
private val RememberingStartNoAck = new RememberingStart(Set.empty)
sealed trait StopReason
final case object PassivationComplete extends StopReason
final case object StartedElsewhere extends StopReason
/**
* When remember entities is enabled an entity is in this state while
* its stop is being recorded in the remember entities store, or while the stop is queued up
* to be stored in the next batch.
*/
final case class RememberingStop(stopReason: StopReason) extends EntityState {
final case object RememberingStop extends EntityState {
override def transition(newState: EntityState): EntityState = newState match {
case NoState => NoState
case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed")
@ -235,9 +237,9 @@ private[akka] object Shard {
}
final case class Passivating(ref: ActorRef) extends WithRef {
override def transition(newState: EntityState): EntityState = newState match {
case r: RememberingStop => r
case NoState => NoState
case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed")
case RememberingStop => RememberingStop
case NoState => NoState
case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed")
}
}
@ -268,10 +270,10 @@ private[akka] object Shard {
if (rememberingEntities)
remembering.add(entityId)
}
def rememberingStop(entityId: EntityId, reason: StopReason): Unit = {
def rememberingStop(entityId: EntityId): Unit = {
val state = entityState(entityId)
removeRefIfThereIsOne(state)
entities.put(entityId, state.transition(RememberingStop(reason)))
entities.put(entityId, state.transition(RememberingStop))
if (rememberingEntities)
remembering.add(entityId)
}
@ -356,7 +358,7 @@ private[akka] object Shard {
remembering.forEach(entityId =>
entityState(entityId) match {
case r: RememberingStart => starts += (entityId -> r)
case _: RememberingStop => stops += entityId
case RememberingStop => stops += entityId
case wat => throw new IllegalStateException(s"$entityId was in the remembering set but has state $wat")
})
(starts.result(), stops.result())
@ -565,7 +567,9 @@ private[akka] class Shard(
def restartEntities(ids: Set[EntityId]): Unit = {
log.debug("Restarting set of [{}] entities", ids.size)
context.actorOf(RememberEntityStarter.props(context.parent, ids, settings, sender()))
context.actorOf(
RememberEntityStarter.props(context.parent, self, shardId, ids, settings),
"RememberEntitiesStarter")
}
// ===== shard up and running =====
@ -576,7 +580,6 @@ private[akka] class Shard(
case msg: CoordinatorMessage => receiveCoordinatorMessage(msg)
case msg: RememberEntityCommand => receiveRememberEntityCommand(msg)
case msg: ShardRegion.StartEntity => startEntity(msg.entityId, Some(sender()))
case msg: ShardRegion.StartEntityAck => receiveStartEntityAck(msg)
case Passivate(stopMessage) => passivate(sender(), stopMessage)
case msg: ShardQuery => receiveShardQuery(msg)
case PassivateIdleTick => passivateIdleEntities()
@ -643,7 +646,6 @@ private[akka] class Shard(
case _: CoordinatorMessage => stash()
case cmd: RememberEntityCommand => receiveRememberEntityCommand(cmd)
case l: LeaseLost => receiveLeaseLost(l)
case ack: ShardRegion.StartEntityAck => receiveStartEntityAck(ack)
case ShardRegion.Passivate(stopMessage) =>
if (verboseDebug)
log.debug(
@ -682,13 +684,9 @@ private[akka] class Shard(
}
stops.foreach { entityId =>
entities.entityState(entityId) match {
case RememberingStop(PassivationComplete) =>
case RememberingStop =>
// this updates entity state
passivateCompleted(entityId)
case RememberingStop(StartedElsewhere) =>
// Drop buffered messages if any (to not cause re-ordering)
dropBufferFor(entityId, "Entity started on another node")
entities.removeEntity(entityId)
case state =>
throw new IllegalStateException(
s"Unexpected state [$state] when storing stop completed for entity id [$entityId]")
@ -742,6 +740,19 @@ private[akka] class Shard(
}
case RestartRememberedEntities(ids) => restartEntities(ids)
case EntitiesMovedToOtherShard(movedEntityIds) =>
log.info(
"Clearing [{}] remembered entities started elsewhere because of changed shard id extractor",
movedEntityIds.size)
movedEntityIds.foreach { entityId =>
entities.entityState(entityId) match {
case RememberedButNotCreated =>
entities.rememberingStop(entityId)
case other =>
throw new IllegalStateException(s"Unexpected state for [$entityId] when getting ShardIdsMoved: [$other]")
}
}
rememberUpdate(remove = movedEntityIds)
}
// this could be because of a start message or due to a new message for the entity
@ -761,6 +772,8 @@ private[akka] class Shard(
touchLastMessageTimestamp(entityId)
ackTo.foreach(_ ! ShardRegion.StartEntityAck(entityId, shardId))
case NoState =>
// started manually from the outside, or the shard id extractor was changed since the entity was remembered
// we need to store that it was started
log.debug("Request to start entity [{}] and ack to [{}]", entityId, ackTo)
entities.rememberingStart(entityId, ackTo)
rememberUpdate(add = Set(entityId))
@ -770,40 +783,6 @@ private[akka] class Shard(
}
}
// FIXME in what scenario do we get this to the shard?
private def receiveStartEntityAck(ack: ShardRegion.StartEntityAck): Unit = {
if (ack.shardId != shardId) {
entities.entityState(shardId) match {
case RememberingStart(_) | RememberingStop(_) =>
log.debug(
"Entity [{}] previously owned by shard [{}] started in shard [{}] while waiting to be written, stashing for later handling",
ack.entityId,
shardId,
ack.shardId)
stash()
case Active(ref) =>
log.debug(
"Entity [{}] previously owned by shard [{}] started in shard [{}]",
ack.entityId,
shardId,
ack.shardId)
entities.entityPassivating(ack.entityId)
ref ! handOffStopMessage
case RememberedButNotCreated =>
// FIXME could be pending with the starting strategy though, what happens when that arrives?
log.debug(
"Entity [{}] (remembered but not yet created) previously owned by shard [{}] started in shard [{}]",
ack.entityId,
shardId,
ack.shardId)
entities.removeEntity(ack.entityId)
case other =>
throw new IllegalStateException(
s"Unexpected state [$other] when start entity ack of entity [${ack.entityId}] was seen from shard [${ack.shardId}]")
}
}
}
private def receiveCoordinatorMessage(msg: CoordinatorMessage): Unit = msg match {
case HandOff(`shardId`) => handOff(sender())
case HandOff(shard) => log.warning("Shard [{}] can not hand off for another Shard [{}]", shardId, shard)
@ -830,8 +809,10 @@ private[akka] class Shard(
log.debug("Starting HandOffStopper for shard [{}] to terminate [{}] entities.", shardId, activeEntities.size)
activeEntities.foreach(context.unwatch(_))
handOffStopper = Some(
context.watch(context.actorOf(
handOffStopperProps(shardId, replyTo, activeEntities, handOffStopMessage, entityHandOffTimeout))))
context.watch(
context.actorOf(
handOffStopperProps(shardId, replyTo, activeEntities, handOffStopMessage, entityHandOffTimeout),
"HandOffStopper")))
//During hand off we only care about watching for termination of the hand off stopper
context.become {
@ -862,7 +843,7 @@ private[akka] class Shard(
lastMessageTimestamp -= entityId
}
entities.entityState(entityId) match {
case RememberingStop(_) =>
case RememberingStop =>
if (verboseDebug)
log.debug("Stop of [{}] arrived, already is among the pending stops", entityId)
case Active(_) =>
@ -878,9 +859,9 @@ private[akka] class Shard(
log.debug(
"Stop of [{}] after passivating, arrived while updating, adding it to batch of pending stops",
entityId)
entities.rememberingStop(entityId, PassivationComplete)
entities.rememberingStop(entityId)
} else {
entities.rememberingStop(entityId, PassivationComplete)
entities.rememberingStop(entityId)
rememberUpdate(remove = Set(entityId))
}
case unexpected =>
@ -974,7 +955,7 @@ private[akka] class Shard(
log.debug("Delivering message of type [{}] to [{}]", payload.getClass, entityId)
touchLastMessageTimestamp(entityId)
ref.tell(payload, snd)
case RememberingStart(_) | RememberingStop(_) | Passivating(_) =>
case RememberingStart(_) | RememberingStop | Passivating(_) =>
appendToMessageBuffer(entityId, msg, snd)
case state @ (WaitingForRestart | RememberedButNotCreated) =>
if (verboseDebug)