From 13aed055fd6e5ea1eabf1e6810db8fe29918f640 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Tue, 2 Jun 2020 14:58:04 +0200 Subject: [PATCH] Publish Dropped message when sharding drops buffered messages (#29163) --- .../main/scala/akka/util/MessageBuffer.scala | 15 ++++++++++++++- .../scala/akka/cluster/sharding/Shard.scala | 17 ++++++++--------- .../akka/cluster/sharding/ShardRegion.scala | 8 +++++++- 3 files changed, 29 insertions(+), 11 deletions(-) diff --git a/akka-actor/src/main/scala/akka/util/MessageBuffer.scala b/akka-actor/src/main/scala/akka/util/MessageBuffer.scala index 6bae55b8b8..24ec0938f2 100644 --- a/akka-actor/src/main/scala/akka/util/MessageBuffer.scala +++ b/akka-actor/src/main/scala/akka/util/MessageBuffer.scala @@ -4,7 +4,7 @@ package akka.util -import akka.actor.ActorRef +import akka.actor.{ ActorRef, Dropped } import akka.annotation.InternalApi import akka.japi.function.Procedure2 @@ -247,6 +247,19 @@ final class MessageBufferMap[I] { bufferMap.remove(id) } + /** + * Remove the buffer for an id, but publish a [[akka.actor.Dropped]] for each dropped buffered message + * @return how many buffered messages were dropped + */ + def drop(id: I, reason: String, deadLetters: ActorRef): Int = { + val entries = bufferMap.get(id) + if (entries.nonEmpty) { + entries.foreach((msg, ref) => deadLetters ! Dropped(msg, reason, ref, ActorRef.noSender)) + } + remove(id) + entries.size + } + /** * Check if the buffer map contains an id. * diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index d0c1c1d6c5..13d743c45f 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -12,6 +12,7 @@ import akka.actor.ActorLogging import akka.actor.ActorRef import akka.actor.DeadLetterSuppression import akka.actor.Deploy +import akka.actor.Dropped import akka.actor.NoSerializationVerificationNeeded import akka.actor.Props import akka.actor.Stash @@ -722,7 +723,7 @@ private[akka] class Shard( passivateCompleted(entityId) case RememberingStop(StartedElsewhere) => // Drop buffered messages if any (to not cause re-ordering) - messageBuffers.remove(entityId) + dropBufferFor(entityId, "Entity started on another node") entities.removeEntity(entityId) case state => throw new IllegalStateException( @@ -961,7 +962,7 @@ private[akka] class Shard( val (entityId, payload) = extractEntityId(msg) if (entityId == null || entityId == "") { log.warning("Id must not be empty, dropping message [{}]", msg.getClass.getName) - context.system.deadLetters ! msg + context.system.deadLetters ! Dropped(msg, "No recipient entity id", snd, self) } else { payload match { case start: ShardRegion.StartEntity => @@ -1042,7 +1043,7 @@ private[akka] class Shard( if (messageBuffers.totalSize >= settings.tuningParameters.bufferSize) { if (log.isDebugEnabled) log.debug("Buffer is full, dropping message of type [{}] for entity [{}]", msg.getClass.getName, id) - context.system.deadLetters ! msg + context.system.deadLetters ! Dropped(msg, s"Buffer for [$id] is full", snd, self) } else { if (log.isDebugEnabled) log.debug("Message of type [{}] for entity [{}] buffered", msg.getClass.getName, id) @@ -1068,13 +1069,11 @@ private[akka] class Shard( } } - def dropBufferFor(entityId: EntityId): Unit = { - if (log.isDebugEnabled) { - val messages = messageBuffers.getOrEmpty(entityId) - if (messages.nonEmpty) - log.debug("Dropping [{}] buffered messages", entityId, messages.size) + def dropBufferFor(entityId: EntityId, reason: String): Unit = { + val count = messageBuffers.drop(entityId, reason, context.system.deadLetters) + if (log.isDebugEnabled && count > 0) { + log.debug("Dropping [{}] buffered messages for [{}] because {}", count, entityId, reason) } - messageBuffers.remove(entityId) } private def rememberEntityStoreCrashed(msg: RememberEntityStoreCrashed): Unit = { diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index 4572cd71a6..59253564b5 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -755,7 +755,13 @@ private[akka] class ShardRegion( // because they might be forwarded from other regions and there // is a risk or message re-ordering otherwise if (shardBuffers.contains(shard)) { - shardBuffers.remove(shard) + val dropped = shardBuffers + .drop(shard, "Avoiding reordering of buffered messages at shard handoff", context.system.deadLetters) + if (dropped > 0) + log.warning( + "Dropping [{}] buffered messages to shard [{}] during hand off to avoid re-ordering", + dropped, + shard) loggedFullBufferWarning = false }