diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index 0936e5acde..086ecbe3db 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -366,6 +366,7 @@ class ShardRegion( var loggedFullBufferWarning = false var shards = Map.empty[ShardId, ActorRef] var shardsByRef = Map.empty[ActorRef, ShardId] + var startingShards = Set.empty[ShardId] var handingOff = Set.empty[ActorRef] var gracefulShutdownInProgress = false @@ -560,6 +561,7 @@ class ShardRegion( shardsByRef = shardsByRef - ref shards = shards - shardId + startingShards -= shardId if (handingOff.contains(ref)) { handingOff = handingOff - ref log.debug("Shard [{}] handoff complete", shardId) @@ -632,7 +634,7 @@ class ShardRegion( def initializeShard(id: ShardId, shard: ActorRef): Unit = { log.debug("Shard was initialized {}", id) - shards = shards.updated(id, shard) + startingShards -= id deliverBufferedMessages(id, shard) } @@ -722,29 +724,38 @@ class ShardRegion( } } - def getShard(id: ShardId): Option[ActorRef] = shards.get(id).orElse( - entityProps match { - case Some(props) if !shardsByRef.values.exists(_ == id) ⇒ - log.debug("Starting shard [{}] in region", id) + def getShard(id: ShardId): Option[ActorRef] = { + if (startingShards.contains(id)) + None + else { + shards.get(id).orElse( + entityProps match { + case Some(props) if !shardsByRef.values.exists(_ == id) ⇒ + log.debug("Starting shard [{}] in region", id) - val name = URLEncoder.encode(id, "utf-8") - val shard = context.watch(context.actorOf( - Shard.props( - typeName, - id, - props, - settings, - extractEntityId, - extractShardId, - handOffStopMessage).withDispatcher(context.props.dispatcher), - name)) - shardsByRef = shardsByRef.updated(shard, id) - None - case Some(props) ⇒ - None - case None ⇒ - throw new IllegalStateException("Shard must not be allocated to a proxy only ShardRegion") - }) + val name = URLEncoder.encode(id, "utf-8") + val shard = context.watch(context.actorOf( + Shard.props( + typeName, + id, + props, + settings, + extractEntityId, + extractShardId, + handOffStopMessage).withDispatcher(context.props.dispatcher), + name)) + shardsByRef = shardsByRef.updated(shard, id) + shards = shards.updated(id, shard) + startingShards += id + None + case Some(props) ⇒ + None + case None ⇒ + throw new IllegalStateException("Shard must not be allocated to a proxy only ShardRegion") + } + ) + } + } def sendGracefulShutdownToCoordinator(): Unit = if (gracefulShutdownInProgress)