From 2832dd55c5fc4099b093fdae776d32cb3a03e73a Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Tue, 30 Jun 2015 09:28:30 +0200
Subject: [PATCH] !clt, cls #17866 Use systemActorOf for extension actors

* ClusterSharding
* ClusterClientReceptionist
* dispatcher config, since deployment config can't be used for system actors
---
 .../src/main/resources/reference.conf            | 11 +++++++++--
 .../akka/cluster/sharding/ClusterSharding.scala  | 13 +++++++++----
 .../akka/cluster/sharding/ShardCoordinator.scala |  3 ++-
 .../akka/cluster/sharding/ShardRegion.scala      |  2 +-
 ...lusterShardingCustomShardAllocationSpec.scala |  8 ++++----
 .../src/main/resources/reference.conf            | 16 +++++++++++++---
 .../akka/cluster/client/ClusterClient.scala      | 11 ++++++++++-
 .../pubsub/DistributedPubSubMediator.scala       |  7 ++++++-
 .../akka/cluster/client/ClusterClientSpec.scala  |  9 ++++++---
 .../rst/project/migration-guide-2.3.x-2.4.x.rst  |  5 +++++
 10 files changed, 65 insertions(+), 20 deletions(-)

diff --git a/akka-cluster-sharding/src/main/resources/reference.conf b/akka-cluster-sharding/src/main/resources/reference.conf
index e11b6f0c2d..79a35c52ba 100644
--- a/akka-cluster-sharding/src/main/resources/reference.conf
+++ b/akka-cluster-sharding/src/main/resources/reference.conf
@@ -10,8 +10,8 @@
 # Settings for the ClusterShardingExtension
 akka.cluster.sharding {
 
-  # The extension creates a top level actor with this name in top level user scope,
-  # e.g. '/user/sharding'
+  # The extension creates a top level actor with this name in top level system scope,
+  # e.g. '/system/sharding'
   guardian-name = sharding
 
   # Specifies that entities runs on cluster nodes with a specific role.
@@ -82,5 +82,12 @@ akka.cluster.sharding {
 
   # Settings for the coordinator singleton. Same layout as akka.cluster.singleton.
   coordinator-singleton = ${akka.cluster.singleton}
+
+  # The id of the dispatcher to use for ClusterSharding actors.
+  # If not specified default dispatcher is used.
+  # If specified you need to define the settings of the actual dispatcher.
+  # This dispatcher for the entity actors is defined by the user provided
+  # Props, i.e. this dispatcher is not used for the entity actors.
+  use-dispatcher = ""
 }
 # //#sharding-ext-config
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala
index 429a977744..390c327062 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala
@@ -23,6 +23,7 @@ import akka.cluster.singleton.ClusterSingletonManager
 import akka.persistence.BackoffSupervisor
 import akka.util.ByteString
 import akka.pattern.ask
+import akka.dispatch.Dispatchers
 
 /**
  * This extension provides sharding functionality of actors in a cluster.
@@ -160,7 +161,11 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
   private val regions: ConcurrentHashMap[String, ActorRef] = new ConcurrentHashMap
   private lazy val guardian = {
     val guardianName: String = system.settings.config.getString("akka.cluster.sharding.guardian-name")
-    system.actorOf(Props[ClusterShardingGuardian], guardianName)
+    val dispatcher = system.settings.config.getString("akka.cluster.sharding.use-dispatcher") match {
+      case "" ⇒ Dispatchers.DefaultDispatcherId
+      case id ⇒ id
+    }
+    system.systemActorOf(Props[ClusterShardingGuardian].withDispatcher(dispatcher), guardianName)
   }
 
   private[akka] def requireClusterRole(role: Option[String]): Unit =
@@ -437,7 +442,7 @@ private[akka] class ClusterShardingGuardian extends Actor {
         context.actorOf(ClusterSingletonManager.props(
           singletonProps,
           terminationMessage = PoisonPill,
-          singletonSettings),
+          singletonSettings).withDispatcher(context.props.dispatcher),
           name = cName)
       }
 
@@ -448,7 +453,7 @@ private[akka] class ClusterShardingGuardian extends Actor {
           coordinatorPath = cPath,
           extractEntityId = extractEntityId,
           extractShardId = extractShardId,
-          handOffStopMessage = handOffStopMessage),
+          handOffStopMessage = handOffStopMessage).withDispatcher(context.props.dispatcher),
           name = encName)
       }
       sender() ! Started(shardRegion)
@@ -463,7 +468,7 @@ private[akka] class ClusterShardingGuardian extends Actor {
          settings = settings,
          coordinatorPath = cPath,
          extractEntityId = extractEntityId,
-         extractShardId = extractShardId),
+         extractShardId = extractShardId).withDispatcher(context.props.dispatcher),
          name = encName)
       }
       sender() ! Started(shardRegion)
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
index 9a90b99b8d..7867a8c621 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala
@@ -660,7 +660,8 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
           rebalanceInProgress += shard
           log.debug("Rebalance shard [{}] from [{}]", shard, rebalanceFromRegion)
           context.actorOf(rebalanceWorkerProps(shard, rebalanceFromRegion, handOffTimeout,
-            persistentState.regions.keySet ++ persistentState.regionProxies))
+            persistentState.regions.keySet ++ persistentState.regionProxies)
+            .withDispatcher(context.props.dispatcher))
         case None ⇒
           log.debug("Rebalance of non-existing shard [{}] is ignored", shard)
       }
diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
index a4bc32f314..0ad8718cf7 100644
--- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
+++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala
@@ -524,7 +524,7 @@ class ShardRegion(
           settings,
           extractEntityId,
           extractShardId,
-          handOffStopMessage),
+          handOffStopMessage).withDispatcher(context.props.dispatcher),
         name))
         shards = shards.updated(id, shard)
         shardsByRef = shardsByRef.updated(shard, id)
diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala
index 92c992183c..1c45ad0698 100644
--- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala
+++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala
@@ -185,12 +185,12 @@ class ClusterShardingCustomShardAllocationSpec extends MultiNodeSpec(ClusterShar
         lastSender.path should be(region.path / "2" / "2")
       }
       runOn(second) {
-        lastSender.path should be(node(first) / "user" / "sharding" / "Entity" / "2" / "2")
+        lastSender.path should be(node(first) / "system" / "sharding" / "Entity" / "2" / "2")
       }
       enterBarrier("second-started")
 
       runOn(first) {
-        system.actorSelection(node(second) / "user" / "sharding" / "Entity") ! Identify(None)
+        system.actorSelection(node(second) / "system" / "sharding" / "Entity") ! Identify(None)
         val secondRegion = expectMsgType[ActorIdentity].ref.get
         allocator ! UseRegion(secondRegion)
         expectMsg(UseRegionAck)
@@ -203,7 +203,7 @@ class ClusterShardingCustomShardAllocationSpec extends MultiNodeSpec(ClusterShar
         lastSender.path should be(region.path / "3" / "3")
       }
       runOn(first) {
-        lastSender.path should be(node(second) / "user" / "sharding" / "Entity" / "3" / "3")
+        lastSender.path should be(node(second) / "system" / "sharding" / "Entity" / "3" / "3")
       }
 
       enterBarrier("after-2")
@@ -218,7 +218,7 @@ class ClusterShardingCustomShardAllocationSpec extends MultiNodeSpec(ClusterShar
         val p = TestProbe()
         region.tell(2, p.ref)
         p.expectMsg(2.second, 2)
-        p.lastSender.path should be(node(second) / "user" / "sharding" / "Entity" / "2" / "2")
+        p.lastSender.path should be(node(second) / "system" / "sharding" / "Entity" / "2" / "2")
       }
 
       region ! 1
diff --git a/akka-cluster-tools/src/main/resources/reference.conf b/akka-cluster-tools/src/main/resources/reference.conf
index 6b8a34cf14..99e1509a8d 100644
--- a/akka-cluster-tools/src/main/resources/reference.conf
+++ b/akka-cluster-tools/src/main/resources/reference.conf
@@ -8,7 +8,7 @@
 # //#pub-sub-ext-config
 # Settings for the DistributedPubSub extension
 akka.cluster.pub-sub {
-  # Actor name of the mediator actor, /user/distributedPubSubMediator
+  # Actor name of the mediator actor, /system/distributedPubSubMediator
   name = distributedPubSubMediator
 
   # Start the mediator on members tagged with this role.
@@ -28,6 +28,11 @@ akka.cluster.pub-sub {
 
   # Maximum number of elements to transfer in one message when synchronizing the registries.
   # Next chunk will be transferred in next round of gossip.
   max-delta-elements = 3000
+
+  # The id of the dispatcher to use for DistributedPubSubMediator actors.
+  # If not specified default dispatcher is used.
+  # If specified you need to define the settings of the actual dispatcher.
+  use-dispatcher = ""
 }
 # //#pub-sub-ext-config
@@ -49,7 +54,7 @@ akka.actor {
 # //#receptionist-ext-config
 # Settings for the ClusterClientReceptionist extension
 akka.cluster.client.receptionist {
-  # Actor name of the ClusterReceptionist actor, /user/receptionist
+  # Actor name of the ClusterReceptionist actor, /system/receptionist
   name = receptionist
 
   # Start the receptionist on members tagged with this role.
@@ -62,6 +67,11 @@ akka.cluster.client.receptionist {
 
   # The actor that tunnel response messages to the client will be stopped
   # after this time of inactivity.
   response-tunnel-receive-timeout = 30s
+
+  # The id of the dispatcher to use for ClusterReceptionist actors.
+  # If not specified default dispatcher is used.
+  # If specified you need to define the settings of the actual dispatcher.
+  use-dispatcher = ""
 }
 # //#receptionist-ext-config
@@ -71,7 +81,7 @@ akka.cluster.client {
   # that the client will try to contact initially. It is mandatory to specify
   # at least one initial contact.
   # Comma separated full actor paths defined by a string on the form of
-  # "akka.tcp://system@hostname:port/user/receptionist"
+  # "akka.tcp://system@hostname:port/system/receptionist"
   initial-contacts = []
 
   # Interval at which the client retries to establish contact with one of
diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala b/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala
index 0f8bb673d3..30818975cd 100644
--- a/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala
+++ b/akka-cluster-tools/src/main/scala/akka/cluster/client/ClusterClient.scala
@@ -36,6 +36,7 @@ import akka.routing.MurmurHash
 import com.typesafe.config.Config
 import akka.actor.DeadLetterSuppression
 import akka.remote.DeadlineFailureDetector
+import akka.dispatch.Dispatchers
 
 object ClusterClientSettings {
   /**
@@ -77,6 +78,9 @@ object ClusterClientSettings {
 /**
  * @param initialContacts Actor paths of the `ClusterReceptionist` actors on
  * the servers (cluster nodes) that the client will try to contact initially.
+ * It is mandatory to specify at least one initial contact. The path of the
+ * default receptionist is
+ * "akka.tcp://system@hostname:port/system/receptionist"
  * @param establishingGetContactsInterval Interval at which the client retries
  * to establish contact with one of ClusterReceptionist on the servers (cluster nodes)
  * @param refreshContactsInterval Interval at which the client will ask the
@@ -391,9 +395,14 @@ final class ClusterClientReceptionist(system: ExtendedActorSystem) extends Exten
       system.deadLetters
     else {
       val name = config.getString("name")
+      val dispatcher = config.getString("use-dispatcher") match {
+        case "" ⇒ Dispatchers.DefaultDispatcherId
+        case id ⇒ id
+      }
       // important to use val mediator here to activate it outside of ClusterReceptionist constructor
       val mediator = pubSubMediator
-      system.actorOf(ClusterReceptionist.props(mediator, ClusterReceptionistSettings(config)), name)
+      system.systemActorOf(ClusterReceptionist.props(mediator, ClusterReceptionistSettings(config))
+        .withDispatcher(dispatcher), name)
     }
   }
 }
diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/DistributedPubSubMediator.scala b/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/DistributedPubSubMediator.scala
index 86790a14dc..f3608339dd 100644
--- a/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/DistributedPubSubMediator.scala
+++ b/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/DistributedPubSubMediator.scala
@@ -40,6 +40,7 @@ import scala.collection.immutable.TreeMap
 import com.typesafe.config.Config
 import akka.actor.NoSerializationVerificationNeeded
 import akka.actor.Deploy
+import akka.dispatch.Dispatchers
 
 object DistributedPubSubSettings {
   /**
@@ -739,7 +740,11 @@ class DistributedPubSub(system: ExtendedActorSystem) extends Extension {
       system.deadLetters
     else {
       val name = system.settings.config.getString("akka.cluster.pub-sub.name")
-      system.actorOf(DistributedPubSubMediator.props(settings), name)
+      val dispatcher = system.settings.config.getString("akka.cluster.pub-sub.use-dispatcher") match {
+        case "" ⇒ Dispatchers.DefaultDispatcherId
+        case id ⇒ id
+      }
+      system.systemActorOf(DistributedPubSubMediator.props(settings).withDispatcher(dispatcher), name)
     }
   }
 }
diff --git a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala
index 74d362d8be..2a4fcfa447 100644
--- a/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala
+++ b/akka-cluster-tools/src/multi-jvm/scala/akka/cluster/client/ClusterClientSpec.scala
@@ -7,6 +7,7 @@ import language.postfixOps
 import scala.concurrent.duration._
 import com.typesafe.config.ConfigFactory
 import akka.actor.Actor
+import akka.actor.ActorPath
 import akka.actor.ActorRef
 import akka.actor.Props
 import akka.cluster.Cluster
@@ -87,7 +88,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
   def roleName(addr: Address): Option[RoleName] = remainingServerRoleNames.find(node(_).address == addr)
 
   def initialContacts = (remainingServerRoleNames - first - fourth).map { r ⇒
-    node(r) / "user" / "receptionist"
+    node(r) / "system" / "receptionist"
   }
 
   "A ClusterClient" must {
@@ -162,8 +163,10 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
     lazy val docOnly = { //not used, only demo
       //#initialContacts
       val initialContacts = Set(
-        system.actorSelection("akka.tcp://OtherSys@host1:2552/user/receptionist"),
-        system.actorSelection("akka.tcp://OtherSys@host2:2552/user/receptionist"))
+        ActorPath.fromString("akka.tcp://OtherSys@host1:2552/system/receptionist"),
+        ActorPath.fromString("akka.tcp://OtherSys@host2:2552/system/receptionist"))
+      val settings = ClusterClientSettings(system)
+        .withInitialContacts(initialContacts)
       //#initialContacts
     }
 
diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst
index cb8bfe0170..003a2bbbe9 100644
--- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst
+++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst
@@ -282,6 +282,11 @@ The parameters of the ``Props`` factory methods in the ``ClusterReceptionist`` c
 has been moved to settings object ``ClusterReceptionistSettings``.
 This can be created from system configuration properties and also amended with API as needed.
 
+The ``ClusterReceptionist`` actor that is started by the ``ClusterReceptionistExtension``
+is now started as a ``system`` actor instead of a ``user`` actor, i.e. the default path for
+the ``ClusterClient`` initial contacts has changed to
+``"akka.tcp://system@hostname:port/system/receptionist"``.
+
 Asynchronous ShardAllocationStrategy
 ====================================
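
Usage note (not part of the patch): a minimal sketch of how the new use-dispatcher settings
introduced above could be wired up from an application.conf. The dispatcher name
cluster-extensions-dispatcher and its tuning values are illustrative only; any dispatcher
defined in the configuration can be referenced by its config path.

    cluster-extensions-dispatcher {
      type = Dispatcher
      executor = "fork-join-executor"
      fork-join-executor {
        parallelism-min = 2
        parallelism-max = 4
      }
    }

    akka.cluster.sharding.use-dispatcher = cluster-extensions-dispatcher
    akka.cluster.pub-sub.use-dispatcher = cluster-extensions-dispatcher
    akka.cluster.client.receptionist.use-dispatcher = cluster-extensions-dispatcher

When use-dispatcher is left empty (the default), the extension actors run on the default
dispatcher, as implemented by the Dispatchers.DefaultDispatcherId fallback in the diff above.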