Merge pull request #2052 from akka/wip-3900-systemActorOf-∂π
+act #3900 make systemActorOf available to Extensions
This commit is contained in:
commit
96be914979
11 changed files with 22 additions and 18 deletions
|
|
@ -88,7 +88,7 @@ object ActorDSL extends dsl.Inbox with dsl.Creators {
|
|||
|
||||
protected class Extension(val system: ExtendedActorSystem) extends akka.actor.Extension with InboxExtension {
|
||||
|
||||
val boss = system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(
|
||||
val boss = system.systemActorOf(Props(
|
||||
new Actor {
|
||||
def receive = { case any ⇒ sender() ! any }
|
||||
}), "dsl").asInstanceOf[RepointableActorRef]
|
||||
|
|
|
|||
|
|
@ -463,6 +463,12 @@ abstract class ExtendedActorSystem extends ActorSystem {
|
|||
*/
|
||||
def systemGuardian: InternalActorRef
|
||||
|
||||
/**
|
||||
* Create an actor in the "/system" namespace. This actor will be shut down
|
||||
* during system shutdown only after all user actors have terminated.
|
||||
*/
|
||||
def systemActorOf(props: Props, name: String): ActorRef
|
||||
|
||||
/**
|
||||
* A ThreadFactory that can be used if the transport needs to create any Threads
|
||||
*/
|
||||
|
|
@ -541,7 +547,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
|
|||
|
||||
protected def systemImpl: ActorSystemImpl = this
|
||||
|
||||
private[akka] def systemActorOf(props: Props, name: String): ActorRef = systemGuardian.underlying.attachChild(props, name, systemService = true)
|
||||
def systemActorOf(props: Props, name: String): ActorRef = systemGuardian.underlying.attachChild(props, name, systemService = true)
|
||||
|
||||
def actorOf(props: Props, name: String): ActorRef = guardian.underlying.attachChild(props, name, systemService = false)
|
||||
|
||||
|
|
|
|||
|
|
@ -556,7 +556,7 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension {
|
|||
*
|
||||
*/
|
||||
val manager: ActorRef = {
|
||||
system.asInstanceOf[ActorSystemImpl].systemActorOf(
|
||||
system.systemActorOf(
|
||||
props = Props(classOf[TcpManager], this).withDispatcher(Settings.ManagementDispatcher).withDeploy(Deploy.local),
|
||||
name = "IO-TCP")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -212,7 +212,7 @@ class UdpExt(system: ExtendedActorSystem) extends IO.Extension {
|
|||
val settings: UdpSettings = new UdpSettings(system.settings.config.getConfig("akka.io.udp"))
|
||||
|
||||
val manager: ActorRef = {
|
||||
system.asInstanceOf[ActorSystemImpl].systemActorOf(
|
||||
system.systemActorOf(
|
||||
props = Props(classOf[UdpManager], this).withDeploy(Deploy.local),
|
||||
name = "IO-UDP-FF")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -149,7 +149,7 @@ class UdpConnectedExt(system: ExtendedActorSystem) extends IO.Extension {
|
|||
val settings: UdpSettings = new UdpSettings(system.settings.config.getConfig("akka.io.udp-connected"))
|
||||
|
||||
val manager: ActorRef = {
|
||||
system.asInstanceOf[ActorSystemImpl].systemActorOf(
|
||||
system.systemActorOf(
|
||||
props = Props(classOf[UdpConnectedManager], this).withDeploy(Deploy.local),
|
||||
name = "IO-UDP-CONN")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -157,7 +157,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
|
|||
|
||||
// create supervisor for daemons under path "/system/cluster"
|
||||
private val clusterDaemons: ActorRef = {
|
||||
system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(classOf[ClusterDaemon], settings).
|
||||
system.systemActorOf(Props(classOf[ClusterDaemon], settings).
|
||||
withDispatcher(UseDispatcher).withDeploy(Deploy.local), name = "cluster")
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -45,7 +45,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
|
|||
|
||||
// create actor that subscribes to the cluster eventBus to update current read view state
|
||||
private val eventBusListener: ActorRef = {
|
||||
cluster.system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
|
||||
cluster.system.systemActorOf(Props(new Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
|
||||
override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterDomainEvent])
|
||||
override def postStop(): Unit = cluster.unsubscribe(self)
|
||||
|
||||
|
|
|
|||
|
|
@ -200,7 +200,6 @@ trait ConsumerBehavior {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
class Producer extends Actor with ProducerBehavior {
|
||||
def receive = producerBehavior
|
||||
}
|
||||
|
|
@ -209,10 +208,9 @@ class Consumer extends Actor with ActorLogging with ConsumerBehavior {
|
|||
def receive = consumerBehavior
|
||||
}
|
||||
|
||||
|
||||
class ProducerConsumer extends Actor with ActorLogging
|
||||
with ProducerBehavior with ConsumerBehavior {
|
||||
|
||||
|
||||
def receive = producerBehavior orElse consumerBehavior
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -93,11 +93,11 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
|
|||
else DefaultPluginDispatcherId
|
||||
}
|
||||
|
||||
private val confirmationBatchLayer = system.asInstanceOf[ActorSystemImpl]
|
||||
.systemActorOf(Props(classOf[DeliveredByChannelBatching], journal, settings), "confirmation-batch-layer")
|
||||
private val confirmationBatchLayer = system.systemActorOf(
|
||||
Props(classOf[DeliveredByChannelBatching], journal, settings), "confirmation-batch-layer")
|
||||
|
||||
private val deletionBatchLayer = system.asInstanceOf[ActorSystemImpl]
|
||||
.systemActorOf(Props(classOf[DeliveredByPersistentChannelBatching], journal, settings), "deletion-batch-layer")
|
||||
private val deletionBatchLayer = system.systemActorOf(
|
||||
Props(classOf[DeliveredByPersistentChannelBatching], journal, settings), "deletion-batch-layer")
|
||||
|
||||
/**
|
||||
* Creates a canonical processor id from a processor actor ref.
|
||||
|
|
@ -145,7 +145,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
|
|||
val pluginClassName = pluginConfig.getString("class")
|
||||
val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get
|
||||
val pluginDispatcherId = if (pluginConfig.hasPath("plugin-dispatcher")) pluginConfig.getString("plugin-dispatcher") else dispatcherSelector(pluginClass)
|
||||
system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(pluginClass).withDispatcher(pluginDispatcherId), pluginType)
|
||||
system.systemActorOf(Props(pluginClass).withDispatcher(pluginDispatcherId), pluginType)
|
||||
}
|
||||
|
||||
private def id(ref: ActorRef) = ref.path.toStringWithoutAddress
|
||||
|
|
|
|||
|
|
@ -114,7 +114,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
|
|||
|
||||
import provider.remoteSettings._
|
||||
|
||||
val transportSupervisor = system.asInstanceOf[ActorSystemImpl].systemActorOf(
|
||||
val transportSupervisor = system.systemActorOf(
|
||||
configureDispatcher(Props[TransportSupervisor]),
|
||||
"transports")
|
||||
|
||||
|
|
@ -159,7 +159,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
|
|||
endpointManager match {
|
||||
case None ⇒
|
||||
log.info("Starting remoting")
|
||||
val manager: ActorRef = system.asInstanceOf[ActorSystemImpl].systemActorOf(
|
||||
val manager: ActorRef = system.systemActorOf(
|
||||
configureDispatcher(Props(classOf[EndpointManager], provider.remoteSettings.config, log)).withDeploy(Deploy.local),
|
||||
Remoting.EndpointManagerName)
|
||||
endpointManager = Some(manager)
|
||||
|
|
|
|||
|
|
@ -120,7 +120,7 @@ trait TestKitBase {
|
|||
* registration as message target.
|
||||
*/
|
||||
val testActor: ActorRef = {
|
||||
val impl = system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559
|
||||
val impl = system.asInstanceOf[ExtendedActorSystem]
|
||||
val ref = impl.systemActorOf(TestActor.props(queue)
|
||||
.withDispatcher(CallingThreadDispatcher.Id),
|
||||
"testActor" + TestKit.testActorId.incrementAndGet)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue