Avoid ask to sharding guardian if region already cached (#25407)

This commit is contained in:
Christopher Batey 2018-07-30 15:48:16 +01:00 committed by Konrad `ktoso` Malawski
parent 4ddacb724d
commit 90c1f2f79b

View file

@ -7,6 +7,7 @@ package akka.cluster.sharding
import java.net.URLEncoder import java.net.URLEncoder
import java.util.Optional import java.util.Optional
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.function.{ Function JFunc }
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.immutable import scala.collection.immutable
@ -157,6 +158,7 @@ object ClusterSharding extends ExtensionId[ClusterSharding] with ExtensionIdProv
* @see [[ClusterSharding$ ClusterSharding companion object]] * @see [[ClusterSharding$ ClusterSharding companion object]]
*/ */
class ClusterSharding(system: ExtendedActorSystem) extends Extension { class ClusterSharding(system: ExtendedActorSystem) extends Extension {
import ClusterShardingGuardian._ import ClusterShardingGuardian._
import ShardCoordinator.LeastShardAllocationStrategy import ShardCoordinator.LeastShardAllocationStrategy
import ShardCoordinator.ShardAllocationStrategy import ShardCoordinator.ShardAllocationStrategy
@ -229,16 +231,18 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
handOffStopMessage: Any): ActorRef = { handOffStopMessage: Any): ActorRef = {
if (settings.shouldHostShard(cluster)) { if (settings.shouldHostShard(cluster)) {
regions.computeIfAbsent(typeName, new JFunc[String, ActorRef] {
def apply(typeName: String): ActorRef = {
implicit val timeout = system.settings.CreationTimeout
val startMsg = Start(typeName, entityProps, settings,
extractEntityId, extractShardId, allocationStrategy, handOffStopMessage)
val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration)
shardRegion
}
})
implicit val timeout = system.settings.CreationTimeout
val startMsg = Start(typeName, entityProps, settings,
extractEntityId, extractShardId, allocationStrategy, handOffStopMessage)
val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration)
regions.put(typeName, shardRegion)
shardRegion
} else { } else {
log.debug("Starting Shard Region Proxy [{}] (no actors will be hosted on this node)...", typeName) log.debug("Starting Shard Region Proxy [{}] (no actors will be hosted on this node)...", typeName)
startProxy( startProxy(
typeName, typeName,
settings.role, settings.role,
@ -413,14 +417,16 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
dataCenter: Option[DataCenter], dataCenter: Option[DataCenter],
extractEntityId: ShardRegion.ExtractEntityId, extractEntityId: ShardRegion.ExtractEntityId,
extractShardId: ShardRegion.ExtractShardId): ActorRef = { extractShardId: ShardRegion.ExtractShardId): ActorRef = {
implicit val timeout = system.settings.CreationTimeout
val settings = ClusterShardingSettings(system).withRole(role)
val startMsg = StartProxy(typeName, dataCenter, settings, extractEntityId, extractShardId)
val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration)
// it must be possible to start several proxies, one per data center // it must be possible to start several proxies, one per data center
proxies.put(proxyName(typeName, dataCenter), shardRegion) proxies.computeIfAbsent(proxyName(typeName, dataCenter), new JFunc[String, ActorRef] {
shardRegion def apply(name: String): ActorRef = {
implicit val timeout = system.settings.CreationTimeout
val settings = ClusterShardingSettings(system).withRole(role)
val startMsg = StartProxy(typeName, dataCenter, settings, extractEntityId, extractShardId)
val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration)
shardRegion
}
})
} }
private def proxyName(typeName: String, dataCenter: Option[DataCenter]): String = { private def proxyName(typeName: String, dataCenter: Option[DataCenter]): String = {