diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index 4385b18105..d58808431e 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -430,9 +430,18 @@ private[akka] class Shard( // EntityStopped handler def passivateCompleted(event: EntityStopped): Unit = { - log.debug("Entity stopped after passivation [{}]", event.entityId) + val hasBufferedMessages = messageBuffers.getOrEmpty(event.entityId).nonEmpty state = state.copy(state.entities - event.entityId) - messageBuffers.remove(event.entityId) + if (hasBufferedMessages) { + log.debug( + "Entity stopped after passivation [{}], but will be started again due to buffered messages.", + event.entityId) + processChange(EntityStarted(event.entityId))(sendMsgBuffer) + } else { + log.debug("Entity stopped after passivation [{}]", event.entityId) + messageBuffers.remove(event.entityId) + } + } // EntityStarted handler diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index d9ddf45a18..417043dac4 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -1171,8 +1171,14 @@ class DDataShardCoordinator( unstashAll() } - private def stashGetShardHomeRequest(sender: ActorRef, request: GetShardHome): Unit = + private def stashGetShardHomeRequest(sender: ActorRef, request: GetShardHome): Unit = { + log.debug( + "GetShardHome [{}] request from [{}] stashed, because waiting for initial state or update of state. " + + "It will be handled afterwards.", + request.shard, + sender) getShardHomeRequests += (sender -> request) + } private def unstashGetShardHomeRequests(): Unit = { getShardHomeRequests.foreach { diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala index aaea2e3e56..8d6e98989f 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingFailureSpec.scala @@ -238,10 +238,16 @@ abstract class ClusterShardingFailureSpec(config: ClusterShardingFailureSpecConf //Test the Shard passivate works after a journal failure shard2.tell(Passivate(PoisonPill), entity21) - awaitCond({ + awaitAssert { + // Note that the order between this Get message to 21 and the above Passivate to 21 is undefined. + // If this Get arrives first the reply will be Value("21", 3) and then it is retried by the + // awaitAssert. + // Also note that there is no timeout parameter on below expectMsg because messages should not + // be lost here. They should be buffered and delivered also after Passivate completed. region ! Get("21") - expectMsgType[Value] == Value("21", 0) - }, message = "Passivating did not reset Value down to 0") + // counter reset to 0 when started again + expectMsg(Value("21", 0)) + } region ! Add("21", 1)