diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala index 564f9812f3..f5685c89f8 100755 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala @@ -7,6 +7,7 @@ package akka.cluster.sharding import java.net.URLEncoder import java.util.Optional import java.util.concurrent.ConcurrentHashMap +import java.util.function.{ Function ⇒ JFunc } import scala.collection.JavaConverters._ import scala.collection.immutable @@ -157,6 +158,7 @@ object ClusterSharding extends ExtensionId[ClusterSharding] with ExtensionIdProv * @see [[ClusterSharding$ ClusterSharding companion object]] */ class ClusterSharding(system: ExtendedActorSystem) extends Extension { + import ClusterShardingGuardian._ import ShardCoordinator.LeastShardAllocationStrategy import ShardCoordinator.ShardAllocationStrategy @@ -229,16 +231,18 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { handOffStopMessage: Any): ActorRef = { if (settings.shouldHostShard(cluster)) { + regions.computeIfAbsent(typeName, new JFunc[String, ActorRef] { + def apply(typeName: String): ActorRef = { + implicit val timeout = system.settings.CreationTimeout + val startMsg = Start(typeName, entityProps, settings, + extractEntityId, extractShardId, allocationStrategy, handOffStopMessage) + val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration) + shardRegion + } + }) - implicit val timeout = system.settings.CreationTimeout - val startMsg = Start(typeName, entityProps, settings, - extractEntityId, extractShardId, allocationStrategy, handOffStopMessage) - val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration) - regions.put(typeName, shardRegion) - shardRegion } else { log.debug("Starting Shard Region Proxy [{}] (no actors will be hosted on this node)...", typeName) - startProxy( typeName, settings.role, @@ -413,14 +417,16 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { dataCenter: Option[DataCenter], extractEntityId: ShardRegion.ExtractEntityId, extractShardId: ShardRegion.ExtractShardId): ActorRef = { - - implicit val timeout = system.settings.CreationTimeout - val settings = ClusterShardingSettings(system).withRole(role) - val startMsg = StartProxy(typeName, dataCenter, settings, extractEntityId, extractShardId) - val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration) // it must be possible to start several proxies, one per data center - proxies.put(proxyName(typeName, dataCenter), shardRegion) - shardRegion + proxies.computeIfAbsent(proxyName(typeName, dataCenter), new JFunc[String, ActorRef] { + def apply(name: String): ActorRef = { + implicit val timeout = system.settings.CreationTimeout + val settings = ClusterShardingSettings(system).withRole(role) + val startMsg = StartProxy(typeName, dataCenter, settings, extractEntityId, extractShardId) + val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration) + shardRegion + } + }) } private def proxyName(typeName: String, dataCenter: Option[DataCenter]): String = {