diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala
index 9882a6bc3c..ed50afb403 100755
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala
@@ -7,7 +7,6 @@ package akka.cluster.sharding
 import java.net.URLEncoder
 import java.util.Optional
 import java.util.concurrent.ConcurrentHashMap
-import java.util.function.{ Function ⇒ JFunc }
 
 import scala.collection.JavaConverters._
 import scala.collection.immutable
@@ -158,7 +157,6 @@ object ClusterSharding extends ExtensionId[ClusterSharding] with ExtensionIdProv
  * @see [[ClusterSharding$ ClusterSharding companion object]]
  */
 class ClusterSharding(system: ExtendedActorSystem) extends Extension {
-  import ClusterShardingGuardian._
   import ShardCoordinator.LeastShardAllocationStrategy
   import ShardCoordinator.ShardAllocationStrategy
 
@@ -231,18 +229,20 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
     handOffStopMessage: Any): ActorRef = {
 
     if (settings.shouldHostShard(cluster)) {
-      regions.computeIfAbsent(typeName, new JFunc[String, ActorRef] {
-        def apply(typeName: String): ActorRef = {
+      regions.get(typeName) match {
+        case null ⇒
+          // it's ok to Start several times, the guardian will deduplicate concurrent requests
           implicit val timeout = system.settings.CreationTimeout
           val startMsg = Start(typeName, entityProps, settings, extractEntityId, extractShardId,
             allocationStrategy, handOffStopMessage)
           val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration)
+          regions.put(typeName, shardRegion)
           shardRegion
-        }
-      })
-
+        case ref ⇒ ref // already started, use cached ActorRef
+      }
     } else {
       log.debug("Starting Shard Region Proxy [{}] (no actors will be hosted on this node)...", typeName)
+
       startProxy(
         typeName,
         settings.role,
@@ -417,16 +417,19 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
     dataCenter: Option[DataCenter],
     extractEntityId: ShardRegion.ExtractEntityId,
     extractShardId: ShardRegion.ExtractShardId): ActorRef = {
-    // it must be possible to start several proxies, one per data center
-    proxies.computeIfAbsent(proxyName(typeName, dataCenter), new JFunc[String, ActorRef] {
-      def apply(name: String): ActorRef = {
+
+    proxies.get(proxyName(typeName, dataCenter)) match {
+      case null ⇒
+        // it's ok to StartProxy several times, the guardian will deduplicate concurrent requests
        implicit val timeout = system.settings.CreationTimeout
        val settings = ClusterShardingSettings(system).withRole(role)
        val startMsg = StartProxy(typeName, dataCenter, settings, extractEntityId, extractShardId)
        val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration)
+        // it must be possible to start several proxies, one per data center
+        proxies.put(proxyName(typeName, dataCenter), shardRegion)
        shardRegion
-      }
-    })
+      case ref ⇒ ref // already started, use cached ActorRef
+    }
   }
 
   private def proxyName(typeName: String, dataCenter: Option[DataCenter]): String = {
diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ConcurrentStartupShardingSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ConcurrentStartupShardingSpec.scala
new file mode 100644
index 0000000000..2ab66fb088
--- /dev/null
+++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/ConcurrentStartupShardingSpec.scala
@@ -0,0 +1,83 @@
+/**
+ * Copyright (C) 2017-2018 Lightbend Inc.
+ */
+
+package akka.cluster.sharding
+
+import scala.concurrent.duration._
+
+import akka.actor.Actor
+import akka.actor.ActorRef
+import akka.actor.Props
+import akka.cluster.Cluster
+import akka.cluster.MemberStatus
+import akka.testkit.AkkaSpec
+import akka.testkit.DeadLettersFilter
+import akka.testkit.TestEvent.Mute
+
+object ConcurrentStartupShardingSpec {
+
+  val config =
+    """
+    akka.actor.provider = "cluster"
+    akka.remote.netty.tcp.port = 0
+    akka.remote.artery.canonical.port = 0
+    akka.log-dead-letters = off
+    akka.log-dead-letters-during-shutdown = off
+
+    akka.actor {
+      default-dispatcher {
+        executor = "fork-join-executor"
+        fork-join-executor {
+          parallelism-min = 5
+          parallelism-max = 5
+        }
+      }
+    }
+    """
+
+  object Starter {
+    def props(n: Int, probe: ActorRef): Props =
+      Props(new Starter(n, probe))
+  }
+
+  class Starter(n: Int, probe: ActorRef) extends Actor {
+
+    override def preStart(): Unit = {
+      val region = ClusterSharding(context.system).start(s"type-$n", Props.empty, ClusterShardingSettings(context.system),
+        { case msg ⇒ (msg.toString, msg) },
+        _ ⇒ "1")
+      probe ! region
+    }
+
+    def receive = {
+      case _ ⇒
+    }
+  }
+}
+
+class ConcurrentStartupShardingSpec extends AkkaSpec(ConcurrentStartupShardingSpec.config) {
+  import ConcurrentStartupShardingSpec._
+
+  // mute logging of deadLetters
+  if (!log.isDebugEnabled)
+    system.eventStream.publish(Mute(DeadLettersFilter[Any]))
+
+  // The intended usage is to start sharding in one (or a few) places when the ActorSystem
+  // is started and not to do it concurrently from many threads. However, we do our best to
+  // support that too, and when using FJP the Await will create additional threads when needed.
+  "Concurrent Sharding startup" must {
+    "init cluster" in {
+      Cluster(system).join(Cluster(system).selfAddress)
+      awaitAssert(Cluster(system).selfMember.status should ===(MemberStatus.Up))
+
+      val total = 20
+      (1 to total).foreach { n ⇒
+        system.actorOf(Starter.props(n, testActor))
+      }
+
+      receiveN(total, 60.seconds)
+    }
+  }
+
+}
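
The change above avoids blocking inside ConcurrentHashMap.computeIfAbsent: the mapping
function runs while the map's bin lock is held, and other callers hitting the same bin then
block on a plain monitor (no managed blocking), which can exhaust a small fork-join pool
before the guardian ever gets a thread to answer the Start ask. The get/put-on-miss pattern
releases all map locks before blocking and accepts that the ask may be sent more than once
for the same typeName; the guardian deduplicates those requests. A minimal sketch of the
same pattern follows; CachedLookup and expensiveBlockingLookup are hypothetical stand-ins
for the regions/proxies map and the blocking ask to the guardian.

    import java.util.concurrent.ConcurrentHashMap

    // Sketch only: CachedLookup and expensiveBlockingLookup are hypothetical
    // stand-ins for the regions/proxies map and the Await on the guardian ask.
    object CachedLookup {
      private val cache = new ConcurrentHashMap[String, String]()

      // Stands in for a blocking call such as Await.result(guardian ? Start(...), timeout).
      // It may run more than once for the same key; the downstream authority must tolerate that.
      private def expensiveBlockingLookup(key: String): String = {
        Thread.sleep(100)
        s"value-for-$key"
      }

      def get(key: String): String =
        cache.get(key) match {
          case null ⇒
            // no map lock is held while we block, unlike computeIfAbsent
            val value = expensiveBlockingLookup(key)
            cache.put(key, value)
            value
          case cached ⇒ cached // already computed, use cached value
        }
    }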