+act #3900 make systemActorOf available to Extensions

This commit is contained in:
Roland Kuhn 2014-03-03 12:00:25 +01:00
parent 8396e923cf
commit b5eeb08fde
11 changed files with 22 additions and 18 deletions

View file

@ -88,7 +88,7 @@ object ActorDSL extends dsl.Inbox with dsl.Creators {
protected class Extension(val system: ExtendedActorSystem) extends akka.actor.Extension with InboxExtension { protected class Extension(val system: ExtendedActorSystem) extends akka.actor.Extension with InboxExtension {
val boss = system.asInstanceOf[ActorSystemImpl].systemActorOf(Props( val boss = system.systemActorOf(Props(
new Actor { new Actor {
def receive = { case any sender() ! any } def receive = { case any sender() ! any }
}), "dsl").asInstanceOf[RepointableActorRef] }), "dsl").asInstanceOf[RepointableActorRef]

View file

@ -463,6 +463,12 @@ abstract class ExtendedActorSystem extends ActorSystem {
*/ */
def systemGuardian: InternalActorRef def systemGuardian: InternalActorRef
/**
* Create an actor in the "/system" namespace. This actor will be shut down
* during system shutdown only after all user actors have terminated.
*/
def systemActorOf(props: Props, name: String): ActorRef
/** /**
* A ThreadFactory that can be used if the transport needs to create any Threads * A ThreadFactory that can be used if the transport needs to create any Threads
*/ */
@ -541,7 +547,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
protected def systemImpl: ActorSystemImpl = this protected def systemImpl: ActorSystemImpl = this
private[akka] def systemActorOf(props: Props, name: String): ActorRef = systemGuardian.underlying.attachChild(props, name, systemService = true) def systemActorOf(props: Props, name: String): ActorRef = systemGuardian.underlying.attachChild(props, name, systemService = true)
def actorOf(props: Props, name: String): ActorRef = guardian.underlying.attachChild(props, name, systemService = false) def actorOf(props: Props, name: String): ActorRef = guardian.underlying.attachChild(props, name, systemService = false)

View file

@ -556,7 +556,7 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension {
* *
*/ */
val manager: ActorRef = { val manager: ActorRef = {
system.asInstanceOf[ActorSystemImpl].systemActorOf( system.systemActorOf(
props = Props(classOf[TcpManager], this).withDispatcher(Settings.ManagementDispatcher).withDeploy(Deploy.local), props = Props(classOf[TcpManager], this).withDispatcher(Settings.ManagementDispatcher).withDeploy(Deploy.local),
name = "IO-TCP") name = "IO-TCP")
} }

View file

@ -212,7 +212,7 @@ class UdpExt(system: ExtendedActorSystem) extends IO.Extension {
val settings: UdpSettings = new UdpSettings(system.settings.config.getConfig("akka.io.udp")) val settings: UdpSettings = new UdpSettings(system.settings.config.getConfig("akka.io.udp"))
val manager: ActorRef = { val manager: ActorRef = {
system.asInstanceOf[ActorSystemImpl].systemActorOf( system.systemActorOf(
props = Props(classOf[UdpManager], this).withDeploy(Deploy.local), props = Props(classOf[UdpManager], this).withDeploy(Deploy.local),
name = "IO-UDP-FF") name = "IO-UDP-FF")
} }

View file

@ -149,7 +149,7 @@ class UdpConnectedExt(system: ExtendedActorSystem) extends IO.Extension {
val settings: UdpSettings = new UdpSettings(system.settings.config.getConfig("akka.io.udp-connected")) val settings: UdpSettings = new UdpSettings(system.settings.config.getConfig("akka.io.udp-connected"))
val manager: ActorRef = { val manager: ActorRef = {
system.asInstanceOf[ActorSystemImpl].systemActorOf( system.systemActorOf(
props = Props(classOf[UdpConnectedManager], this).withDeploy(Deploy.local), props = Props(classOf[UdpConnectedManager], this).withDeploy(Deploy.local),
name = "IO-UDP-CONN") name = "IO-UDP-CONN")
} }

View file

@ -157,7 +157,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
// create supervisor for daemons under path "/system/cluster" // create supervisor for daemons under path "/system/cluster"
private val clusterDaemons: ActorRef = { private val clusterDaemons: ActorRef = {
system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(classOf[ClusterDaemon], settings). system.systemActorOf(Props(classOf[ClusterDaemon], settings).
withDispatcher(UseDispatcher).withDeploy(Deploy.local), name = "cluster") withDispatcher(UseDispatcher).withDeploy(Deploy.local), name = "cluster")
} }

View file

@ -45,7 +45,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
// create actor that subscribes to the cluster eventBus to update current read view state // create actor that subscribes to the cluster eventBus to update current read view state
private val eventBusListener: ActorRef = { private val eventBusListener: ActorRef = {
cluster.system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] { cluster.system.systemActorOf(Props(new Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] {
override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterDomainEvent]) override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterDomainEvent])
override def postStop(): Unit = cluster.unsubscribe(self) override def postStop(): Unit = cluster.unsubscribe(self)

View file

@ -217,7 +217,6 @@ trait ConsumerBehavior {
} }
} }
class Producer extends Actor with ProducerBehavior { class Producer extends Actor with ProducerBehavior {
def receive = producerBehavior def receive = producerBehavior
} }
@ -226,10 +225,9 @@ class Consumer extends Actor with ActorLogging with ConsumerBehavior {
def receive = consumerBehavior def receive = consumerBehavior
} }
class ProducerConsumer extends Actor with ActorLogging class ProducerConsumer extends Actor with ActorLogging
with ProducerBehavior with ConsumerBehavior { with ProducerBehavior with ConsumerBehavior {
def receive = producerBehavior orElse consumerBehavior def receive = producerBehavior orElse consumerBehavior
} }

View file

@ -93,11 +93,11 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
else DefaultPluginDispatcherId else DefaultPluginDispatcherId
} }
private val confirmationBatchLayer = system.asInstanceOf[ActorSystemImpl] private val confirmationBatchLayer = system.systemActorOf(
.systemActorOf(Props(classOf[DeliveredByChannelBatching], journal, settings), "confirmation-batch-layer") Props(classOf[DeliveredByChannelBatching], journal, settings), "confirmation-batch-layer")
private val deletionBatchLayer = system.asInstanceOf[ActorSystemImpl] private val deletionBatchLayer = system.systemActorOf(
.systemActorOf(Props(classOf[DeliveredByPersistentChannelBatching], journal, settings), "deletion-batch-layer") Props(classOf[DeliveredByPersistentChannelBatching], journal, settings), "deletion-batch-layer")
/** /**
* Creates a canonical processor id from a processor actor ref. * Creates a canonical processor id from a processor actor ref.
@ -145,7 +145,7 @@ class Persistence(val system: ExtendedActorSystem) extends Extension {
val pluginClassName = pluginConfig.getString("class") val pluginClassName = pluginConfig.getString("class")
val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get
val pluginDispatcherId = if (pluginConfig.hasPath("plugin-dispatcher")) pluginConfig.getString("plugin-dispatcher") else dispatcherSelector(pluginClass) val pluginDispatcherId = if (pluginConfig.hasPath("plugin-dispatcher")) pluginConfig.getString("plugin-dispatcher") else dispatcherSelector(pluginClass)
system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(pluginClass).withDispatcher(pluginDispatcherId), pluginType) system.systemActorOf(Props(pluginClass).withDispatcher(pluginDispatcherId), pluginType)
} }
private def id(ref: ActorRef) = ref.path.toStringWithoutAddress private def id(ref: ActorRef) = ref.path.toStringWithoutAddress

View file

@ -114,7 +114,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
import provider.remoteSettings._ import provider.remoteSettings._
val transportSupervisor = system.asInstanceOf[ActorSystemImpl].systemActorOf( val transportSupervisor = system.systemActorOf(
configureDispatcher(Props[TransportSupervisor]), configureDispatcher(Props[TransportSupervisor]),
"transports") "transports")
@ -159,7 +159,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
endpointManager match { endpointManager match {
case None case None
log.info("Starting remoting") log.info("Starting remoting")
val manager: ActorRef = system.asInstanceOf[ActorSystemImpl].systemActorOf( val manager: ActorRef = system.systemActorOf(
configureDispatcher(Props(classOf[EndpointManager], provider.remoteSettings.config, log)).withDeploy(Deploy.local), configureDispatcher(Props(classOf[EndpointManager], provider.remoteSettings.config, log)).withDeploy(Deploy.local),
Remoting.EndpointManagerName) Remoting.EndpointManagerName)
endpointManager = Some(manager) endpointManager = Some(manager)

View file

@ -120,7 +120,7 @@ trait TestKitBase {
* registration as message target. * registration as message target.
*/ */
val testActor: ActorRef = { val testActor: ActorRef = {
val impl = system.asInstanceOf[ActorSystemImpl] //TODO ticket #1559 val impl = system.asInstanceOf[ExtendedActorSystem]
val ref = impl.systemActorOf(TestActor.props(queue) val ref = impl.systemActorOf(TestActor.props(queue)
.withDispatcher(CallingThreadDispatcher.Id), .withDispatcher(CallingThreadDispatcher.Id),
"testActor" + TestKit.testActorId.incrementAndGet) "testActor" + TestKit.testActorId.incrementAndGet)