Publish Dropped message when sharding drops buffered messages (#29163)
This commit is contained in:
parent
f70d33145c
commit
13aed055fd
3 changed files with 29 additions and 11 deletions
|
|
@ -4,7 +4,7 @@
|
||||||
|
|
||||||
package akka.util
|
package akka.util
|
||||||
|
|
||||||
import akka.actor.ActorRef
|
import akka.actor.{ ActorRef, Dropped }
|
||||||
import akka.annotation.InternalApi
|
import akka.annotation.InternalApi
|
||||||
import akka.japi.function.Procedure2
|
import akka.japi.function.Procedure2
|
||||||
|
|
||||||
|
|
@ -247,6 +247,19 @@ final class MessageBufferMap[I] {
|
||||||
bufferMap.remove(id)
|
bufferMap.remove(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove the buffer for an id, but publish a [[akka.actor.Dropped]] for each dropped buffered message
|
||||||
|
* @return how many buffered messages were dropped
|
||||||
|
*/
|
||||||
|
def drop(id: I, reason: String, deadLetters: ActorRef): Int = {
|
||||||
|
val entries = bufferMap.get(id)
|
||||||
|
if (entries.nonEmpty) {
|
||||||
|
entries.foreach((msg, ref) => deadLetters ! Dropped(msg, reason, ref, ActorRef.noSender))
|
||||||
|
}
|
||||||
|
remove(id)
|
||||||
|
entries.size
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if the buffer map contains an id.
|
* Check if the buffer map contains an id.
|
||||||
*
|
*
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,7 @@ import akka.actor.ActorLogging
|
||||||
import akka.actor.ActorRef
|
import akka.actor.ActorRef
|
||||||
import akka.actor.DeadLetterSuppression
|
import akka.actor.DeadLetterSuppression
|
||||||
import akka.actor.Deploy
|
import akka.actor.Deploy
|
||||||
|
import akka.actor.Dropped
|
||||||
import akka.actor.NoSerializationVerificationNeeded
|
import akka.actor.NoSerializationVerificationNeeded
|
||||||
import akka.actor.Props
|
import akka.actor.Props
|
||||||
import akka.actor.Stash
|
import akka.actor.Stash
|
||||||
|
|
@ -722,7 +723,7 @@ private[akka] class Shard(
|
||||||
passivateCompleted(entityId)
|
passivateCompleted(entityId)
|
||||||
case RememberingStop(StartedElsewhere) =>
|
case RememberingStop(StartedElsewhere) =>
|
||||||
// Drop buffered messages if any (to not cause re-ordering)
|
// Drop buffered messages if any (to not cause re-ordering)
|
||||||
messageBuffers.remove(entityId)
|
dropBufferFor(entityId, "Entity started on another node")
|
||||||
entities.removeEntity(entityId)
|
entities.removeEntity(entityId)
|
||||||
case state =>
|
case state =>
|
||||||
throw new IllegalStateException(
|
throw new IllegalStateException(
|
||||||
|
|
@ -961,7 +962,7 @@ private[akka] class Shard(
|
||||||
val (entityId, payload) = extractEntityId(msg)
|
val (entityId, payload) = extractEntityId(msg)
|
||||||
if (entityId == null || entityId == "") {
|
if (entityId == null || entityId == "") {
|
||||||
log.warning("Id must not be empty, dropping message [{}]", msg.getClass.getName)
|
log.warning("Id must not be empty, dropping message [{}]", msg.getClass.getName)
|
||||||
context.system.deadLetters ! msg
|
context.system.deadLetters ! Dropped(msg, "No recipient entity id", snd, self)
|
||||||
} else {
|
} else {
|
||||||
payload match {
|
payload match {
|
||||||
case start: ShardRegion.StartEntity =>
|
case start: ShardRegion.StartEntity =>
|
||||||
|
|
@ -1042,7 +1043,7 @@ private[akka] class Shard(
|
||||||
if (messageBuffers.totalSize >= settings.tuningParameters.bufferSize) {
|
if (messageBuffers.totalSize >= settings.tuningParameters.bufferSize) {
|
||||||
if (log.isDebugEnabled)
|
if (log.isDebugEnabled)
|
||||||
log.debug("Buffer is full, dropping message of type [{}] for entity [{}]", msg.getClass.getName, id)
|
log.debug("Buffer is full, dropping message of type [{}] for entity [{}]", msg.getClass.getName, id)
|
||||||
context.system.deadLetters ! msg
|
context.system.deadLetters ! Dropped(msg, s"Buffer for [$id] is full", snd, self)
|
||||||
} else {
|
} else {
|
||||||
if (log.isDebugEnabled)
|
if (log.isDebugEnabled)
|
||||||
log.debug("Message of type [{}] for entity [{}] buffered", msg.getClass.getName, id)
|
log.debug("Message of type [{}] for entity [{}] buffered", msg.getClass.getName, id)
|
||||||
|
|
@ -1068,13 +1069,11 @@ private[akka] class Shard(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def dropBufferFor(entityId: EntityId): Unit = {
|
def dropBufferFor(entityId: EntityId, reason: String): Unit = {
|
||||||
if (log.isDebugEnabled) {
|
val count = messageBuffers.drop(entityId, reason, context.system.deadLetters)
|
||||||
val messages = messageBuffers.getOrEmpty(entityId)
|
if (log.isDebugEnabled && count > 0) {
|
||||||
if (messages.nonEmpty)
|
log.debug("Dropping [{}] buffered messages for [{}] because {}", count, entityId, reason)
|
||||||
log.debug("Dropping [{}] buffered messages", entityId, messages.size)
|
|
||||||
}
|
}
|
||||||
messageBuffers.remove(entityId)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private def rememberEntityStoreCrashed(msg: RememberEntityStoreCrashed): Unit = {
|
private def rememberEntityStoreCrashed(msg: RememberEntityStoreCrashed): Unit = {
|
||||||
|
|
|
||||||
|
|
@ -755,7 +755,13 @@ private[akka] class ShardRegion(
|
||||||
// because they might be forwarded from other regions and there
|
// because they might be forwarded from other regions and there
|
||||||
// is a risk or message re-ordering otherwise
|
// is a risk or message re-ordering otherwise
|
||||||
if (shardBuffers.contains(shard)) {
|
if (shardBuffers.contains(shard)) {
|
||||||
shardBuffers.remove(shard)
|
val dropped = shardBuffers
|
||||||
|
.drop(shard, "Avoiding reordering of buffered messages at shard handoff", context.system.deadLetters)
|
||||||
|
if (dropped > 0)
|
||||||
|
log.warning(
|
||||||
|
"Dropping [{}] buffered messages to shard [{}] during hand off to avoid re-ordering",
|
||||||
|
dropped,
|
||||||
|
shard)
|
||||||
loggedFullBufferWarning = false
|
loggedFullBufferWarning = false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue