From 1ef65091f62cb05ad4a58131ea0f3763b85c5602 Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Mon, 20 May 2019 08:37:55 +0200
Subject: [PATCH] Deliver buffered messages after passivation, #26957

* This problem was introduced by the optimization in PR #26878, and the
  regression has not been released.
* While waiting for the ddata update response the Shard buffers messages
  for the entity that is being stopped or started. In the case of
  passivation those buffered messages were not delivered afterwards,
  and therefore the test failed when waiting for the expected response.
---
 .../main/scala/akka/cluster/sharding/Shard.scala  | 13 +++++++++++--
 .../akka/cluster/sharding/ShardCoordinator.scala  |  8 +++++++-
 .../sharding/ClusterShardingFailureSpec.scala     | 12 +++++++++---
 3 files changed, 27 insertions(+), 6 deletions(-)

diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
index 4385b18105..d58808431e 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala
@@ -430,9 +430,18 @@ private[akka] class Shard(
 
   // EntityStopped handler
   def passivateCompleted(event: EntityStopped): Unit = {
-    log.debug("Entity stopped after passivation [{}]", event.entityId)
+    val hasBufferedMessages = messageBuffers.getOrEmpty(event.entityId).nonEmpty
     state = state.copy(state.entities - event.entityId)
-    messageBuffers.remove(event.entityId)
+    if (hasBufferedMessages) {
+      log.debug(
+        "Entity stopped after passivation [{}], but will be started again due to buffered messages.",
+        event.entityId)
+      processChange(EntityStarted(event.entityId))(sendMsgBuffer)
+    } else {
+      log.debug("Entity stopped after passivation [{}]", event.entityId)
+      messageBuffers.remove(event.entityId)
+    }
+
   }
 
   // EntityStarted handler
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
index d9ddf45a18..417043dac4 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
@@ -1171,8 +1171,14 @@ class DDataShardCoordinator(
     unstashAll()
   }
 
-  private def stashGetShardHomeRequest(sender: ActorRef, request: GetShardHome): Unit =
+  private def stashGetShardHomeRequest(sender: ActorRef, request: GetShardHome): Unit = {
+    log.debug(
+      "GetShardHome [{}] request from [{}] stashed, because waiting for initial state or update of state. " +
+      "It will be handled afterwards.",
+      request.shard,
+      sender)
     getShardHomeRequests += (sender -> request)
+  }
 
   private def unstashGetShardHomeRequests(): Unit = {
     getShardHomeRequests.foreach {
diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala
index aaea2e3e56..8d6e98989f 100644
--- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala
+++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala
@@ -238,10 +238,16 @@ abstract class ClusterShardingFailureSpec(config: ClusterShardingFailureSpecConf
 
       //Test the Shard passivate works after a journal failure
       shard2.tell(Passivate(PoisonPill), entity21)
-      awaitCond({
+      awaitAssert {
+        // Note that the order between this Get message to 21 and the above Passivate to 21 is undefined.
+        // If this Get arrives first the reply will be Value("21", 3) and then it is retried by the
+        // awaitAssert.
+        // Also note that there is no timeout parameter on the expectMsg below because messages should
+        // not be lost here. They should be buffered and delivered also after the Passivate completed.
         region ! Get("21")
-        expectMsgType[Value] == Value("21", 0)
-      }, message = "Passivating did not reset Value down to 0")
+        // counter reset to 0 when started again
+        expectMsg(Value("21", 0))
+      }
 
       region ! Add("21", 1)