Revert "Avoid ask to sharding guardian if region already cached", #25683

* revert a7656ab8c4a93a3fe8af7a22690cb33f7a280af5
* test for reproducing the issue with computeIfAbsent
* use non-blocking get as optimization for avoiding ask
This commit is contained in:
Patrik Nordwall 2018-09-26 08:39:29 +02:00
parent 4b7f05b280
commit fd0153a090
2 changed files with 98 additions and 12 deletions

View file

@ -7,7 +7,6 @@ package akka.cluster.sharding
import java.net.URLEncoder
import java.util.Optional
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{ Function JFunc }
import scala.collection.JavaConverters._
import scala.collection.immutable
@ -158,7 +157,6 @@ object ClusterSharding extends ExtensionId[ClusterSharding] with ExtensionIdProv
* @see [[ClusterSharding$ ClusterSharding companion object]]
*/
class ClusterSharding(system: ExtendedActorSystem) extends Extension {
import ClusterShardingGuardian._
import ShardCoordinator.LeastShardAllocationStrategy
import ShardCoordinator.ShardAllocationStrategy
@ -231,18 +229,20 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
handOffStopMessage: Any): ActorRef = {
if (settings.shouldHostShard(cluster)) {
regions.computeIfAbsent(typeName, new JFunc[String, ActorRef] {
def apply(typeName: String): ActorRef = {
regions.get(typeName) match {
case null
// it's ok to Start several time, the guardian will deduplicate concurrent requests
implicit val timeout = system.settings.CreationTimeout
val startMsg = Start(typeName, entityProps, settings,
extractEntityId, extractShardId, allocationStrategy, handOffStopMessage)
val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration)
regions.put(typeName, shardRegion)
shardRegion
}
})
case ref ref // already started, use cached ActorRef
}
} else {
log.debug("Starting Shard Region Proxy [{}] (no actors will be hosted on this node)...", typeName)
startProxy(
typeName,
settings.role,
@ -417,16 +417,19 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
dataCenter: Option[DataCenter],
extractEntityId: ShardRegion.ExtractEntityId,
extractShardId: ShardRegion.ExtractShardId): ActorRef = {
// it must be possible to start several proxies, one per data center
proxies.computeIfAbsent(proxyName(typeName, dataCenter), new JFunc[String, ActorRef] {
def apply(name: String): ActorRef = {
proxies.get(proxyName(typeName, dataCenter)) match {
case null
// it's ok to StartProxy several time, the guardian will deduplicate concurrent requests
implicit val timeout = system.settings.CreationTimeout
val settings = ClusterShardingSettings(system).withRole(role)
val startMsg = StartProxy(typeName, dataCenter, settings, extractEntityId, extractShardId)
val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration)
// it must be possible to start several proxies, one per data center
proxies.put(proxyName(typeName, dataCenter), shardRegion)
shardRegion
}
})
case ref ref // already started, use cached ActorRef
}
}
private def proxyName(typeName: String, dataCenter: Option[DataCenter]): String = {

View file

@ -0,0 +1,83 @@
/**
* Copyright (C) 2017-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.sharding
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.MemberStatus
import akka.testkit.AkkaSpec
import akka.testkit.DeadLettersFilter
import akka.testkit.TestEvent.Mute
object ConcurrentStartupShardingSpec {
val config =
"""
akka.actor.provider = "cluster"
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
akka.log-dead-letters = off
akka.log-dead-letters-during-shutdown = off
akka.actor {
default-dispatcher {
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 5
parallelism-max = 5
}
}
}
"""
object Starter {
def props(n: Int, probe: ActorRef): Props =
Props(new Starter(n, probe))
}
class Starter(n: Int, probe: ActorRef) extends Actor {
override def preStart(): Unit = {
val region = ClusterSharding(context.system).start(s"type-$n", Props.empty, ClusterShardingSettings(context.system),
{ case msg (msg.toString, msg) },
_ "1")
probe ! region
}
def receive = {
case _
}
}
}
class ConcurrentStartupShardingSpec extends AkkaSpec(ConcurrentStartupShardingSpec.config) {
import ConcurrentStartupShardingSpec._
// mute logging of deadLetters
if (!log.isDebugEnabled)
system.eventStream.publish(Mute(DeadLettersFilter[Any]))
// The intended usage is to start sharding in one (or a few) places when the the ActorSystem
// is started and not to do it concurrently from many threads. However, we can do our best and when using
// FJP the Await will create additional threads when needed.
"Concurrent Sharding startup" must {
"init cluster" in {
Cluster(system).join(Cluster(system).selfAddress)
awaitAssert(Cluster(system).selfMember.status should ===(MemberStatus.Up))
val total = 20
(1 to total).foreach { n
system.actorOf(Starter.props(n, testActor))
}
receiveN(total, 60.seconds)
}
}
}