Merge pull request #17868 from akka/wip-17866-systemActorOf-patriknw

!clt, cls #17866 Use systemActorOf for extension actors
Patrik Nordwall 2015-06-30 16:38:02 +02:00
commit 202e64722c
10 changed files with 65 additions and 20 deletions

@@ -10,8 +10,8 @@
 # Settings for the ClusterShardingExtension
 akka.cluster.sharding {
-  # The extension creates a top level actor with this name in top level user scope,
-  # e.g. '/user/sharding'
+  # The extension creates a top level actor with this name in top level system scope,
+  # e.g. '/system/sharding'
   guardian-name = sharding

   # Specifies that entities run on cluster nodes with a specific role.

@@ -82,5 +82,12 @@ akka.cluster.sharding {
   # Settings for the coordinator singleton. Same layout as akka.cluster.singleton.
   coordinator-singleton = ${akka.cluster.singleton}
+
+  # The id of the dispatcher to use for ClusterSharding actors.
+  # If not specified, the default dispatcher is used.
+  # If specified, you need to define the settings of that dispatcher.
+  # The dispatcher for the entity actors is defined by the user provided
+  # Props, i.e. this dispatcher is not used for the entity actors.
+  use-dispatcher = ""
 }
 # //#sharding-ext-config
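The new use-dispatcher setting is empty by default, which selects the default dispatcher. A minimal sketch of opting the sharding internals onto a dedicated dispatcher; the name "sharding-dispatcher" and the executor settings are illustrative, not part of this commit:

    import akka.actor.ActorSystem
    import com.typesafe.config.ConfigFactory

    object ShardingDispatcherExample extends App {
      // Illustrative: "sharding-dispatcher" is an assumed name. As the comment
      // in reference.conf notes, the dispatcher must be defined when the
      // setting is non-empty, and entity actors are NOT run on it.
      val config = ConfigFactory.parseString("""
        akka.cluster.sharding.use-dispatcher = "sharding-dispatcher"
        sharding-dispatcher {
          type = Dispatcher
          executor = "fork-join-executor"
          fork-join-executor {
            parallelism-min = 2
            parallelism-max = 8
          }
        }
        """).withFallback(ConfigFactory.load())

      val system = ActorSystem("ShardingSystem", config)
    }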

@@ -23,6 +23,7 @@ import akka.cluster.singleton.ClusterSingletonManager
 import akka.persistence.BackoffSupervisor
 import akka.util.ByteString
 import akka.pattern.ask
+import akka.dispatch.Dispatchers

 /**
  * This extension provides sharding functionality of actors in a cluster.

@@ -160,7 +161,11 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
   private val regions: ConcurrentHashMap[String, ActorRef] = new ConcurrentHashMap

   private lazy val guardian = {
     val guardianName: String = system.settings.config.getString("akka.cluster.sharding.guardian-name")
-    system.actorOf(Props[ClusterShardingGuardian], guardianName)
+    val dispatcher = system.settings.config.getString("akka.cluster.sharding.use-dispatcher") match {
+      case "" ⇒ Dispatchers.DefaultDispatcherId
+      case id ⇒ id
+    }
+    system.systemActorOf(Props[ClusterShardingGuardian].withDispatcher(dispatcher), guardianName)
   }

   private[akka] def requireClusterRole(role: Option[String]): Unit =

@@ -437,7 +442,7 @@ private[akka] class ClusterShardingGuardian extends Actor {
       context.actorOf(ClusterSingletonManager.props(
         singletonProps,
         terminationMessage = PoisonPill,
-        singletonSettings),
+        singletonSettings).withDispatcher(context.props.dispatcher),
         name = cName)
     }

@@ -448,7 +453,7 @@ private[akka] class ClusterShardingGuardian extends Actor {
         coordinatorPath = cPath,
         extractEntityId = extractEntityId,
         extractShardId = extractShardId,
-        handOffStopMessage = handOffStopMessage),
+        handOffStopMessage = handOffStopMessage).withDispatcher(context.props.dispatcher),
         name = encName)
     }
     sender() ! Started(shardRegion)

@@ -463,7 +468,7 @@ private[akka] class ClusterShardingGuardian extends Actor {
         settings = settings,
         coordinatorPath = cPath,
         extractEntityId = extractEntityId,
-        extractShardId = extractShardId),
+        extractShardId = extractShardId).withDispatcher(context.props.dispatcher),
         name = encName)
     }
     sender() ! Started(shardRegion)
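All of the actorOf calls above apply .withDispatcher(context.props.dispatcher): children of the guardian reuse the dispatcher id the guardian itself was deployed with, so the configured use-dispatcher propagates through the hierarchy. A self-contained sketch of that idiom, with made-up Parent/Child actors:

    import akka.actor.{ Actor, ActorSystem, Props }

    // Hypothetical actors illustrating the propagation pattern in this commit.
    class Child extends Actor {
      def receive = { case msg ⇒ sender() ! msg }
    }

    class Parent extends Actor {
      // context.props.dispatcher is the dispatcher id this actor was created
      // with; passing it on keeps the child on the same dispatcher.
      val child = context.actorOf(
        Props[Child].withDispatcher(context.props.dispatcher), "child")
      def receive = { case msg ⇒ child forward msg }
    }

    object Demo extends App {
      val system = ActorSystem("Demo")
      // Deploy the parent on a dispatcher id; the child follows it.
      system.actorOf(
        Props[Parent].withDispatcher("akka.actor.default-dispatcher"), "parent")
    }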

@@ -660,7 +660,8 @@ class ShardCoordinator(typeName: String, settings: ClusterShardingSettings,
           rebalanceInProgress += shard
           log.debug("Rebalance shard [{}] from [{}]", shard, rebalanceFromRegion)
           context.actorOf(rebalanceWorkerProps(shard, rebalanceFromRegion, handOffTimeout,
-            persistentState.regions.keySet ++ persistentState.regionProxies))
+            persistentState.regions.keySet ++ persistentState.regionProxies)
+            .withDispatcher(context.props.dispatcher))
         case None ⇒
           log.debug("Rebalance of non-existing shard [{}] is ignored", shard)
       }

@@ -524,7 +524,7 @@ class ShardRegion(
             settings,
             extractEntityId,
             extractShardId,
-            handOffStopMessage),
+            handOffStopMessage).withDispatcher(context.props.dispatcher),
             name))
           shards = shards.updated(id, shard)
           shardsByRef = shardsByRef.updated(shard, id)

@@ -185,12 +185,12 @@ class ClusterShardingCustomShardAllocationSpec extends MultiNodeSpec(ClusterShar
         lastSender.path should be(region.path / "2" / "2")
       }
       runOn(second) {
-        lastSender.path should be(node(first) / "user" / "sharding" / "Entity" / "2" / "2")
+        lastSender.path should be(node(first) / "system" / "sharding" / "Entity" / "2" / "2")
       }
       enterBarrier("second-started")
       runOn(first) {
-        system.actorSelection(node(second) / "user" / "sharding" / "Entity") ! Identify(None)
+        system.actorSelection(node(second) / "system" / "sharding" / "Entity") ! Identify(None)
         val secondRegion = expectMsgType[ActorIdentity].ref.get
         allocator ! UseRegion(secondRegion)
         expectMsg(UseRegionAck)

@@ -203,7 +203,7 @@ class ClusterShardingCustomShardAllocationSpec extends MultiNodeSpec(ClusterShar
         lastSender.path should be(region.path / "3" / "3")
       }
       runOn(first) {
-        lastSender.path should be(node(second) / "user" / "sharding" / "Entity" / "3" / "3")
+        lastSender.path should be(node(second) / "system" / "sharding" / "Entity" / "3" / "3")
       }
       enterBarrier("after-2")

@@ -218,7 +218,7 @@ class ClusterShardingCustomShardAllocationSpec extends MultiNodeSpec(ClusterShar
         val p = TestProbe()
         region.tell(2, p.ref)
         p.expectMsg(2.second, 2)
-        p.lastSender.path should be(node(second) / "user" / "sharding" / "Entity" / "2" / "2")
+        p.lastSender.path should be(node(second) / "system" / "sharding" / "Entity" / "2" / "2")
       }
       region ! 1

@@ -8,7 +8,7 @@
 # //#pub-sub-ext-config
 # Settings for the DistributedPubSub extension
 akka.cluster.pub-sub {
-  # Actor name of the mediator actor, /user/distributedPubSubMediator
+  # Actor name of the mediator actor, /system/distributedPubSubMediator
   name = distributedPubSubMediator

   # Start the mediator on members tagged with this role.

@@ -28,6 +28,11 @@ akka.cluster.pub-sub {
   # Maximum number of elements to transfer in one message when synchronizing the registries.
   # The next chunk will be transferred in the next round of gossip.
   max-delta-elements = 3000
+
+  # The id of the dispatcher to use for DistributedPubSubMediator actors.
+  # If not specified, the default dispatcher is used.
+  # If specified, you need to define the settings of that dispatcher.
+  use-dispatcher = ""
 }
 # //#pub-sub-ext-config

@@ -49,7 +54,7 @@ akka.actor {
 # //#receptionist-ext-config
 # Settings for the ClusterClientReceptionist extension
 akka.cluster.client.receptionist {
-  # Actor name of the ClusterReceptionist actor, /user/receptionist
+  # Actor name of the ClusterReceptionist actor, /system/receptionist
   name = receptionist

   # Start the receptionist on members tagged with this role.

@@ -62,6 +67,11 @@ akka.cluster.client.receptionist {
   # The actor that tunnels response messages to the client will be stopped
   # after this time of inactivity.
   response-tunnel-receive-timeout = 30s
+
+  # The id of the dispatcher to use for ClusterReceptionist actors.
+  # If not specified, the default dispatcher is used.
+  # If specified, you need to define the settings of that dispatcher.
+  use-dispatcher = ""
 }
 # //#receptionist-ext-config

@@ -71,7 +81,7 @@ akka.cluster.client {
   # that the client will try to contact initially. It is mandatory to specify
   # at least one initial contact.
   # Comma separated full actor paths of the form
-  # "akka.tcp://system@hostname:port/user/receptionist"
+  # "akka.tcp://system@hostname:port/system/receptionist"
   initial-contacts = []

   # Interval at which the client retries to establish contact with one of
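As with sharding, these settings default to the system's default dispatcher. A hedged configuration sketch, assuming an illustrative dispatcher name "cluster-tools-dispatcher", that puts the mediator and the receptionist on a dedicated dispatcher:

    import com.typesafe.config.ConfigFactory

    // Illustrative name; the referenced dispatcher must be defined, per the
    // comments in reference.conf above.
    val toolsConfig = ConfigFactory.parseString("""
      akka.cluster.pub-sub.use-dispatcher = "cluster-tools-dispatcher"
      akka.cluster.client.receptionist.use-dispatcher = "cluster-tools-dispatcher"
      cluster-tools-dispatcher {
        type = Dispatcher
        executor = "fork-join-executor"
      }
      """)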

@@ -36,6 +36,7 @@ import akka.routing.MurmurHash
 import com.typesafe.config.Config
 import akka.actor.DeadLetterSuppression
 import akka.remote.DeadlineFailureDetector
+import akka.dispatch.Dispatchers

 object ClusterClientSettings {
   /**

@@ -77,6 +78,9 @@ object ClusterClientSettings {
 /**
  * @param initialContacts Actor paths of the `ClusterReceptionist` actors on
  *   the servers (cluster nodes) that the client will try to contact initially.
+ *   It is mandatory to specify at least one initial contact. The path of the
+ *   default receptionist is
+ *   "akka.tcp://system@hostname:port/system/receptionist"
  * @param establishingGetContactsInterval Interval at which the client retries
  *   to establish contact with one of ClusterReceptionist on the servers (cluster nodes)
  * @param refreshContactsInterval Interval at which the client will ask the

@@ -391,9 +395,14 @@ final class ClusterClientReceptionist(system: ExtendedActorSystem) extends Exten
       system.deadLetters
     else {
       val name = config.getString("name")
+      val dispatcher = config.getString("use-dispatcher") match {
+        case "" ⇒ Dispatchers.DefaultDispatcherId
+        case id ⇒ id
+      }
       // important to use val mediator here to activate it outside of ClusterReceptionist constructor
       val mediator = pubSubMediator
-      system.actorOf(ClusterReceptionist.props(mediator, ClusterReceptionistSettings(config)), name)
+      system.systemActorOf(ClusterReceptionist.props(mediator, ClusterReceptionistSettings(config))
+        .withDispatcher(dispatcher), name)
     }
   }
 }

@@ -40,6 +40,7 @@ import scala.collection.immutable.TreeMap
 import com.typesafe.config.Config
 import akka.actor.NoSerializationVerificationNeeded
 import akka.actor.Deploy
+import akka.dispatch.Dispatchers

 object DistributedPubSubSettings {
   /**

@@ -739,7 +740,11 @@ class DistributedPubSub(system: ExtendedActorSystem) extends Extension {
       system.deadLetters
     else {
       val name = system.settings.config.getString("akka.cluster.pub-sub.name")
-      system.actorOf(DistributedPubSubMediator.props(settings), name)
+      val dispatcher = system.settings.config.getString("akka.cluster.pub-sub.use-dispatcher") match {
+        case "" ⇒ Dispatchers.DefaultDispatcherId
+        case id ⇒ id
+      }
+      system.systemActorOf(DistributedPubSubMediator.props(settings).withDispatcher(dispatcher), name)
     }
   }
 }
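Because the mediator is always obtained through the extension rather than by path, the move from /user to /system is transparent to callers. A usage sketch, assuming the Akka 2.4 cluster-tools package layout and an illustrative topic name "content":

    import akka.actor.Actor
    import akka.cluster.pubsub.DistributedPubSub
    import akka.cluster.pubsub.DistributedPubSubMediator.{ Subscribe, SubscribeAck }

    // Hypothetical subscriber: the mediator ref comes from the extension, so
    // its actual path (now under /system) never appears in user code.
    class Subscriber extends Actor {
      val mediator = DistributedPubSub(context.system).mediator
      mediator ! Subscribe("content", self)

      def receive = {
        case SubscribeAck(_) ⇒ // subscribed
        case msg             ⇒ // handle published messages
      }
    }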

@@ -7,6 +7,7 @@ import language.postfixOps
 import scala.concurrent.duration._
 import com.typesafe.config.ConfigFactory
 import akka.actor.Actor
+import akka.actor.ActorPath
 import akka.actor.ActorRef
 import akka.actor.Props
 import akka.cluster.Cluster

@@ -87,7 +88,7 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
   def roleName(addr: Address): Option[RoleName] = remainingServerRoleNames.find(node(_).address == addr)

   def initialContacts = (remainingServerRoleNames - first - fourth).map { r ⇒
-    node(r) / "user" / "receptionist"
+    node(r) / "system" / "receptionist"
   }

   "A ClusterClient" must {

@@ -162,8 +163,10 @@ class ClusterClientSpec extends MultiNodeSpec(ClusterClientSpec) with STMultiNod
     lazy val docOnly = { //not used, only demo
       //#initialContacts
       val initialContacts = Set(
-        system.actorSelection("akka.tcp://OtherSys@host1:2552/user/receptionist"),
-        system.actorSelection("akka.tcp://OtherSys@host2:2552/user/receptionist"))
+        ActorPath.fromString("akka.tcp://OtherSys@host1:2552/system/receptionist"),
+        ActorPath.fromString("akka.tcp://OtherSys@host2:2552/system/receptionist"))
+      val settings = ClusterClientSettings(system)
+        .withInitialContacts(initialContacts)
       //#initialContacts
     }
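With the settings built in the doc snippet above, a client is then started as an ordinary user-space actor; only the receptionist it contacts moved to /system. A sketch continuing that snippet, where the actor name "client" and the service path "/user/serviceA" are illustrative:

    import akka.cluster.client.{ ClusterClient, ClusterClientSettings }

    // settings comes from the snippet above.
    val client = system.actorOf(ClusterClient.props(settings), "client")
    // Send a message to a service registered at the receptionist.
    client ! ClusterClient.Send("/user/serviceA", "hello", localAffinity = true)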

@@ -282,6 +282,11 @@ The parameters of the ``Props`` factory methods in the ``ClusterReceptionist`` c
 has been moved to settings object ``ClusterReceptionistSettings``. This can be created from
 system configuration properties and also amended with API as needed.

+The ``ClusterReceptionist`` actor that is started by the ``ClusterReceptionistExtension``
+is now started as a ``system`` actor instead of a ``user`` actor, i.e. the default path for
+the ``ClusterClient`` initial contacts has changed to
+``"akka.tcp://system@hostname:port/system/receptionist"``.

 Asynchronous ShardAllocationStrategy
 ====================================