Multiple remember entities fixes (#29124)

* Logging cleanup
* Cherry-pick of Chbatey's fix for watch/termination of entity
* When a write is in progress, or the message is in the batch queue, the message needs to go into the buffer (if it doesn't, delivery can be re-ordered)
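The last bullet is the behavioural fix: a message for an entity whose remember-entities write is still in flight is appended to the shard's per-entity message buffer instead of stashed, since mixing stash() with per-entity buffering can deliver messages for the same entity out of arrival order. A minimal sketch of the idea, assuming akka.util.MessageBufferMap's append/getOrEmpty/remove; the surrounding names (EntityBuffers, deliverTo) are invented for illustration and are not the shard's actual API:

import akka.actor.ActorRef
import akka.util.MessageBufferMap

// Illustrative per-entity FIFO buffering. While a write is in flight for
// an entity, its incoming messages are appended here; once the entity is
// started they are flushed in arrival order, preserving their original
// senders. (EntityBuffers/deliverTo are invented names for this sketch.)
final class EntityBuffers {
  private val buffers = new MessageBufferMap[String]

  def append(entityId: String, msg: Any, snd: ActorRef): Unit =
    buffers.append(entityId, msg, snd)

  def deliverTo(entityId: String, entity: ActorRef): Unit = {
    buffers.getOrEmpty(entityId).foreach((msg, snd) => entity.tell(msg, snd))
    buffers.remove(entityId)
  }
}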
Johan Andrén committed 2020-05-26 09:03:56 +02:00 (via GitHub)
parent bee6f3cbf0
commit 0baf31cef7
4 changed files with 69 additions and 48 deletions

@@ -453,7 +453,7 @@ private[akka] class Shard(
         whenDone()
       case Some(store) =>
-        if (VerboseDebug) log.debug("Update of [{}] [{}] triggered", entityIds, command)
+        if (VerboseDebug) log.debug("Update of [{}] [{}] triggered", entityIds.mkString(", "), command)
         entityIds.foreach(entities.remembering)
         store ! command
@@ -469,7 +469,7 @@ private[akka] class Shard(
   def waitingForUpdate(pendingStarts: Map[EntityId, Option[ActorRef]]): Receive = {
     // none of the current impls will send back a partial update, yet!
     case RememberEntitiesShardStore.UpdateDone(ids) =>
-      if (VerboseDebug) log.debug("Update done for ids {}", ids)
+      if (VerboseDebug) log.debug("Update done for ids [{}]", ids.mkString(", "))
       timers.cancel(RememberEntityTimeoutKey)
       whenDone()
       if (pendingStarts.isEmpty) {
@@ -477,7 +477,10 @@ private[akka] class Shard(
         context.become(idle)
         unstashAll()
       } else {
-        if (VerboseDebug) log.debug("New entities encountered while waiting starting those: {}", pendingStarts)
+        if (VerboseDebug)
+          log.debug(
+            "New entities encountered while waiting starting those: [{}]",
+            pendingStarts.keys.mkString(", "))
         startEntities(pendingStarts)
       }
     case RememberEntityTimeout(`command`) =>
@@ -486,39 +489,40 @@ private[akka] class Shard(
     case msg: ShardRegion.StartEntity =>
       if (VerboseDebug)
         log.debug(
-          "Start entity while a write already in progress. Pending writes {}. Writes in progress {}",
-          pendingStarts,
-          entityIds)
+          "Start entity while a write already in progress. Pending writes [{}]. Writes in progress [{}]",
+          pendingStarts.keys.mkString(", "),
+          entityIds.mkString(", "))
       if (!entities.entityIdExists(msg.entityId))
         context.become(waitingForUpdate(pendingStarts + (msg.entityId -> Some(sender()))))
     // below cases should handle same messages as in idle
-    case _: Terminated => stash()
-    case _: EntityTerminated => stash()
-    case _: CoordinatorMessage => stash()
-    case _: RememberEntityCommand => stash()
-    case _: ShardRegion.StartEntityAck => stash()
-    case _: ShardRegionCommand => stash()
-    case msg: ShardQuery => receiveShardQuery(msg)
-    case PassivateIdleTick => stash()
-    case msg: LeaseLost => receiveLeaseLost(msg)
-    case msg: RememberEntityStoreCrashed => rememberEntityStoreCrashed(msg)
+    case _: Terminated                   => stash()
+    case _: EntityTerminated             => stash()
+    case _: CoordinatorMessage           => stash()
+    case _: RememberEntityCommand        => stash()
+    case _: ShardRegion.StartEntityAck   => stash()
+    case _: ShardRegionCommand           => stash()
+    case msg: ShardQuery                 => receiveShardQuery(msg)
+    case PassivateIdleTick               => stash()
+    case msg: LeaseLost                  => receiveLeaseLost(msg)
+    case msg: RememberEntityStoreCrashed => rememberEntityStoreCrashed(msg)
     case msg if extractEntityId.isDefinedAt(msg) =>
       // FIXME now the delivery logic is again spread out across two places, is this needed over what is in deliverMessage?
       val (id, _) = extractEntityId(msg)
       if (entities.entityIdExists(id)) {
-        if (VerboseDebug) log.debug("Entity already known about. Try and deliver. {}", id)
+        if (VerboseDebug) log.debug("Entity already known about. Try and deliver. [{}]", id)
         deliverMessage(msg, sender(), OptionVal.Some(entityIds))
       } else {
-        if (VerboseDebug) log.debug("New entity, add it to batch of pending starts. {}", id)
-        stash()
+        if (VerboseDebug) log.debug("New entity, add it to batch of pending starts. [{}]", id)
+        appendToMessageBuffer(id, msg, sender())
         context.become(waitingForUpdate(pendingStarts + (id -> None)))
       }
     case msg =>
       // shouldn't be any other message types, but just in case
       log.warning(
-        "Stashing unexpected message [{}] while waiting for remember entities update of {}",
+        "Stashing unexpected message [{}] while waiting for remember entities update of [{}]",
         msg.getClass,
-        entityIds)
+        entityIds.mkString(", "))
       stash()
   }
 }
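The waitingForUpdate behaviour above is an accumulate-while-writing state machine: entity ids that show up mid-write are folded into pendingStarts by re-becoming with an extended map, and are only started once UpdateDone arrives. A stripped-down sketch of that pattern, with an invented protocol (NewEntity, WriteDone) standing in for the shard's real messages:

import akka.actor.{ Actor, ActorRef }

// Invented protocol for this sketch; the real shard reacts to
// ShardRegion.StartEntity, extracted entity messages and UpdateDone.
final case class NewEntity(id: String, replyTo: Option[ActorRef])
case object WriteDone

class AccumulatingWriter extends Actor {
  def receive: Receive = idle

  def idle: Receive = {
    case NewEntity(id, replyTo) =>
      // trigger a remember-entities write here, then wait for it
      context.become(waitingForUpdate(Map(id -> replyTo)))
  }

  // pending: entities first seen while the current write is in flight
  def waitingForUpdate(pending: Map[String, Option[ActorRef]]): Receive = {
    case NewEntity(id, replyTo) =>
      // don't start it yet; fold it into the state and keep waiting
      context.become(waitingForUpdate(pending + (id -> replyTo)))
    case WriteDone =>
      // flush everything that accumulated, then go back to idle
      pending.keys.foreach(startEntity)
      context.become(idle)
  }

  private def startEntity(id: String): Unit = () // stand-in for the real start
}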
@@ -553,12 +557,12 @@ private[akka] class Shard(
   private def startEntities(entities: Map[EntityId, Option[ActorRef]]): Unit = {
     val alreadyStarted = entities.filterKeys(entity => this.entities.entityIdExists(entity))
     val needStarting = entities -- alreadyStarted.keySet
-    log.debug(
-      "Request to start entities {}. Already started {}. Need starting {}",
-      entities,
-      alreadyStarted,
-      needStarting)
+    if (log.isDebugEnabled) {
+      log.debug(
+        "Request to start entities. Already started [{}]. Need starting [{}]",
+        alreadyStarted.keys.mkString(", "),
+        needStarting.keys.mkString(", "))
+    }
     alreadyStarted.foreach {
       case (entityId, requestor) =>
         getOrCreateEntity(entityId)
@@ -632,6 +636,13 @@ private[akka] class Shard(
   private def receiveTerminated(ref: ActorRef): Unit = {
     if (handOffStopper.contains(ref))
       context.stop(self)
+    else {
+      // workaround for watchWith not working with stash #29101
+      entities.entityId(ref) match {
+        case OptionVal.Some(id) => entityTerminated(ref, id)
+        case _                  =>
+      }
+    }
   }

   @InternalStableApi
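The added else branch above is the cherry-picked workaround: instead of relying on watchWith's custom EntityTerminated message, which per the inline comment did not work together with stash() (#29101), the shard uses a plain context.watch (see the last hunk below) and resolves the entity id from its own ref-to-id bookkeeping when Terminated arrives. A self-contained sketch of that pattern, with invented names (WatchingParent, onEntityTerminated):

import akka.actor.{ Actor, ActorRef, Props, Terminated }

// Illustrative only: plain watch plus a ref->id map, mirroring the
// workaround above. The shard's comment says watchWith's custom
// termination message did not work together with stash() (#29101),
// so the entity id is resolved manually when Terminated arrives.
class WatchingParent(childProps: String => Props) extends Actor {
  private var byRef = Map.empty[ActorRef, String]

  def startEntity(id: String): ActorRef = {
    val ref = context.actorOf(childProps(id), id)
    context.watch(ref) // no custom termination message
    byRef += ref -> id
    ref
  }

  def receive: Receive = {
    case Terminated(ref) =>
      // resolve the entity id ourselves, as watchWith would have done
      byRef.get(ref).foreach { id =>
        byRef -= ref
        onEntityTerminated(ref, id)
      }
  }

  def onEntityTerminated(ref: ActorRef, id: String): Unit = () // stand-in
}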
@@ -758,11 +769,11 @@ private[akka] class Shard(
           // unstash happens on async write complete
           if (VerboseDebug)
             log.debug(
-              "Stashing message [{}] to [{}] because of write in progress for [{}]",
+              "Buffer message [{}] to [{}] (which is not started) because of write in progress for [{}]",
               payload.getClass,
               id,
               entityIdsWaitingForWrite.get)
-          stash()
+          appendToMessageBuffer(id, msg, snd)
         }
       }
     }
@@ -775,7 +786,7 @@ private[akka] class Shard(
     case OptionVal.None =>
       val name = URLEncoder.encode(id, "utf-8")
       val a = context.actorOf(entityProps(id), name)
-      context.watchWith(a, EntityTerminated(id, a))
+      context.watch(a)
       log.debug("Started entity [{}] with entity id [{}] in shard [{}]", a, id, shardId)
       entities.addEntity(id, a)
       touchLastMessageTimestamp(id)