diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala index 8a5062118c..382ae5e56e 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/Shard.scala @@ -182,24 +182,34 @@ private[akka] object Shard { } } + object RememberingStart { + def apply(ackTo: Option[ActorRef]): RememberingStart = + ackTo match { + case None => RememberingStartNoAck + case Some(ackTo) => RememberingStart(Set(ackTo)) + } + } + /** * When remember entities is enabled an entity is in this state while * its existence is being recorded in the remember entities store, or while the stop is queued up * to be stored in the next batch. */ - final case class RememberingStart(ackTo: Option[ActorRef]) extends EntityState { + final case class RememberingStart(ackTo: Set[ActorRef]) extends EntityState { override def transition(newState: EntityState): EntityState = newState match { case active: Active => active case r: RememberingStart => - ackTo match { - case None => r - case Some(_) => - r // FIXME is this really fine, replace ackTo with another one or None + if (ackTo.isEmpty) { + if (r.ackTo.isEmpty) RememberingStartNoAck + else newState + } else { + if (r.ackTo.isEmpty) this + else RememberingStart(ackTo ++ r.ackTo) } case _ => throw new IllegalArgumentException(s"Transition from $this to $newState not allowed") } } - private val RememberingStartNoAck = RememberingStart(None) + private val RememberingStartNoAck = new RememberingStart(Set.empty) sealed trait StopReason final case object PassivationComplete extends StopReason @@ -256,11 +266,7 @@ private[akka] object Shard { } } def rememberingStart(entityId: EntityId, ackTo: Option[ActorRef]): Unit = { - // FIXME we queue these without bounds, is that ok? - val newState: RememberingStart = ackTo match { - case None => RememberingStartNoAck // small optimization avoiding alloc for regular start - case some => RememberingStart(some) - } + val newState = RememberingStart(ackTo) val state = entityState(entityId).transition(newState) entities.put(entityId, state) if (rememberingEntities) @@ -673,8 +679,8 @@ private[akka] class Shard( getOrCreateEntity(entityId) sendMsgBuffer(entityId) stateBeforeStart match { - case RememberingStart(Some(ackTo)) => ackTo ! ShardRegion.StartEntityAck(entityId, shardId) - case _ => + case RememberingStart(ackTo) => ackTo.foreach(_ ! ShardRegion.StartEntityAck(entityId, shardId)) + case _ => } touchLastMessageTimestamp(entityId) } @@ -751,7 +757,6 @@ private[akka] class Shard( touchLastMessageTimestamp(entityId) ackTo.foreach(_ ! ShardRegion.StartEntityAck(entityId, shardId)) case _: RememberingStart => - // FIXME safe to replace ackTo? entities.rememberingStart(entityId, ackTo) case RememberedButNotCreated => // already remembered, just start it - this will be the normal path for initially remembered entities