diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index ad4e243a96..7128fc1c07 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -353,7 +353,7 @@ private[akka] class Shard( settings.tuningParameters.updatingStateTimeout) context.become { - case RememberEntitiesShardStore.UpdateDone(entityId) => + case RememberEntitiesShardStore.UpdateDone(`entityId`) => if (VerboseDebug) log.debug("Update of [{}] {} done", entityId, command) timers.cancel(RememberEntityTimeoutKey) whenDone(entityId) @@ -550,7 +550,7 @@ private[akka] class Shard( val hasBufferedMessages = messageBuffers.getOrEmpty(entityId).nonEmpty entityIds = entityIds - entityId if (hasBufferedMessages) { - log.debug("Entity stopped after passivation [{}], but will be started again due to buffered messages.", entityId) + log.debug("Entity stopped after passivation [{}], but will be started again due to buffered messages", entityId) waitForAsyncWrite(entityId, RememberEntitiesShardStore.AddEntity(entityId))(sendMsgBuffer) } else { log.debug("Entity stopped after passivation [{}]", entityId) @@ -614,23 +614,35 @@ private[akka] class Shard( actor.tell(payload, snd) } else { - if (entityIdWaitingForWrite.isEmpty) { - // No actor and id is unknown, start actor and deliver message when started - // Note; we only do this if remembering, otherwise the buffer is an overhead - if (VerboseDebug) - log.debug("Buffering message [{}] to [{}] and starting actor", payload.getClass, id) - appendToMessageBuffer(id, msg, snd) - waitForAsyncWrite(id, RememberEntitiesShardStore.AddEntity(id))(sendMsgBuffer) - } else { - // we'd need to start the entity but a start/stop write is already in progress - // see waitForAsyncWrite for unstash - if (VerboseDebug) - log.debug( - "Stashing message [{}] to [{}] because of write in progress for [{}]", - payload.getClass, - id, - entityIdWaitingForWrite.get) - stash() + entityIdWaitingForWrite match { + case OptionVal.None => + // No actor running and no write in progress, start actor and deliver message when started + // Note; we only do this if remembering, otherwise the buffer is an overhead + if (VerboseDebug) + log.debug("Buffering message [{}] to [{}] and starting actor", payload.getClass, id) + appendToMessageBuffer(id, msg, snd) + waitForAsyncWrite(id, RememberEntitiesShardStore.AddEntity(id))(sendMsgBuffer) + + case OptionVal.Some(`id`) => + // No actor running and write in progress for this particular id, buffer message for deliver when + // write completes + if (VerboseDebug) + log.debug( + "Buffering message [{}] to [{}] because of write in progress for it", + payload.getClass, + id) + appendToMessageBuffer(id, msg, snd) + + case OptionVal.Some(otherId) => + // No actor running and write in progress for some other entity id, stash message for deliver when + // unstash happens on async write complete + if (VerboseDebug) + log.debug( + "Stashing message [{}] to [{}] because of write in progress for [{}]", + payload.getClass, + id, + otherId) + stash() } } } diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingSpec.scala index b8559b4668..218aed25ab 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/MultiNodeClusterShardingSpec.scala @@ -94,6 +94,8 @@ abstract class MultiNodeClusterShardingSpec(val config: MultiNodeClusterSharding protected lazy val storageLocations = List( new File(system.settings.config.getString("akka.cluster.sharding.distributed-data.durable.lmdb.dir")).getParentFile) + override def expectedTestDuration = 120.seconds + override protected def atStartup(): Unit = { storageLocations.foreach(dir => if (dir.exists) FileUtils.deleteQuietly(dir)) enterBarrier("startup")