A stash that should have been buffer #28957 (#28987)

This commit is contained in:
Johan Andrén 2020-05-05 10:45:49 +02:00 committed by GitHub
parent 29d288a4ba
commit bc2671757f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 33 additions and 19 deletions

View file

@ -353,7 +353,7 @@ private[akka] class Shard(
settings.tuningParameters.updatingStateTimeout)
context.become {
case RememberEntitiesShardStore.UpdateDone(entityId) =>
case RememberEntitiesShardStore.UpdateDone(`entityId`) =>
if (VerboseDebug) log.debug("Update of [{}] {} done", entityId, command)
timers.cancel(RememberEntityTimeoutKey)
whenDone(entityId)
@ -550,7 +550,7 @@ private[akka] class Shard(
val hasBufferedMessages = messageBuffers.getOrEmpty(entityId).nonEmpty
entityIds = entityIds - entityId
if (hasBufferedMessages) {
log.debug("Entity stopped after passivation [{}], but will be started again due to buffered messages.", entityId)
log.debug("Entity stopped after passivation [{}], but will be started again due to buffered messages", entityId)
waitForAsyncWrite(entityId, RememberEntitiesShardStore.AddEntity(entityId))(sendMsgBuffer)
} else {
log.debug("Entity stopped after passivation [{}]", entityId)
@ -614,23 +614,35 @@ private[akka] class Shard(
actor.tell(payload, snd)
} else {
if (entityIdWaitingForWrite.isEmpty) {
// No actor and id is unknown, start actor and deliver message when started
// Note; we only do this if remembering, otherwise the buffer is an overhead
if (VerboseDebug)
log.debug("Buffering message [{}] to [{}] and starting actor", payload.getClass, id)
appendToMessageBuffer(id, msg, snd)
waitForAsyncWrite(id, RememberEntitiesShardStore.AddEntity(id))(sendMsgBuffer)
} else {
// we'd need to start the entity but a start/stop write is already in progress
// see waitForAsyncWrite for unstash
if (VerboseDebug)
log.debug(
"Stashing message [{}] to [{}] because of write in progress for [{}]",
payload.getClass,
id,
entityIdWaitingForWrite.get)
stash()
entityIdWaitingForWrite match {
case OptionVal.None =>
// No actor running and no write in progress, start actor and deliver message when started
// Note; we only do this if remembering, otherwise the buffer is an overhead
if (VerboseDebug)
log.debug("Buffering message [{}] to [{}] and starting actor", payload.getClass, id)
appendToMessageBuffer(id, msg, snd)
waitForAsyncWrite(id, RememberEntitiesShardStore.AddEntity(id))(sendMsgBuffer)
case OptionVal.Some(`id`) =>
// No actor running and write in progress for this particular id, buffer message for deliver when
// write completes
if (VerboseDebug)
log.debug(
"Buffering message [{}] to [{}] because of write in progress for it",
payload.getClass,
id)
appendToMessageBuffer(id, msg, snd)
case OptionVal.Some(otherId) =>
// No actor running and write in progress for some other entity id, stash message for deliver when
// unstash happens on async write complete
if (VerboseDebug)
log.debug(
"Stashing message [{}] to [{}] because of write in progress for [{}]",
payload.getClass,
id,
otherId)
stash()
}
}
}