Future proofing ActorRefProvider

This commit is contained in:
Viktor Klang 2012-05-15 16:26:08 +02:00
parent 6c2bee0533
commit c6d60e1089
2 changed files with 50 additions and 35 deletions

View file

@ -49,8 +49,12 @@ trait ActorRefProvider {
*/
def rootPath: ActorPath
/**
* The Settings associated with this ActorRefProvider
*/
def settings: ActorSystem.Settings
//FIXME WHY IS THIS HERE?
def dispatcher: MessageDispatcher
/**
@ -61,8 +65,12 @@ trait ActorRefProvider {
*/
def init(system: ActorSystemImpl): Unit
/**
* The Deployer associated with this ActorRefProvider
*/
def deployer: Deployer
//FIXME WHY IS THIS HERE?
def scheduler: Scheduler
/**
@ -131,6 +139,7 @@ trait ActorRefProvider {
*/
def terminationFuture: Future[Unit]
//FIXME I PROPOSE TO REMOVE THIS IN 2.1 -
/**
* Obtain the address which is to be used within sender references when
* sending to the given other address or none if the other address cannot be
@ -141,22 +150,33 @@ trait ActorRefProvider {
}
/**
* Interface implemented by ActorSystem and AkkaContext, the only two places
* Interface implemented by ActorSystem and ActorContext, the only two places
* from which you can get fresh actors.
*/
trait ActorRefFactory {
/**
* INTERNAL USE ONLY
*/
protected def systemImpl: ActorSystemImpl
/**
* INTERNAL USE ONLY
*/
protected def provider: ActorRefProvider
/**
* INTERNAL USE ONLY
*/
protected def dispatcher: MessageDispatcher
/**
* Father of all children created by this interface.
*
* INTERNAL USE ONLY
*/
protected def guardian: InternalActorRef
/**
* INTERNAL USE ONLY
*/
protected def lookupRoot: InternalActorRef
/**
@ -276,8 +296,6 @@ trait ActorRefFactory {
def stop(actor: ActorRef): Unit
}
class ActorRefProviderException(message: String) extends AkkaException(message)
/**
* Internal Akka use only, used in implementation of system.actorOf.
*/
@ -298,10 +316,10 @@ private[akka] case class StopChild(child: ActorRef)
*/
class LocalActorRefProvider(
_systemName: String,
val settings: ActorSystem.Settings,
override val settings: ActorSystem.Settings,
val eventStream: EventStream,
val scheduler: Scheduler,
val deployer: Deployer) extends ActorRefProvider {
override val scheduler: Scheduler,
override val deployer: Deployer) extends ActorRefProvider {
// this is the constructor needed for reflectively instantiating the provider
def this(_systemName: String,
@ -315,13 +333,13 @@ class LocalActorRefProvider(
scheduler,
new Deployer(settings, dynamicAccess))
val rootPath: ActorPath = RootActorPath(Address("akka", _systemName))
override val rootPath: ActorPath = RootActorPath(Address("akka", _systemName))
val log = Logging(eventStream, "LocalActorRefProvider(" + rootPath.address + ")")
private[akka] val log: LoggingAdapter = Logging(eventStream, "LocalActorRefProvider(" + rootPath.address + ")")
val deadLetters = new DeadLetterActorRef(this, rootPath / "deadLetters", eventStream)
override val deadLetters: InternalActorRef = new DeadLetterActorRef(this, rootPath / "deadLetters", eventStream)
val deathWatch = new LocalDeathWatch(1024) //TODO make configrable
override val deathWatch: DeathWatch = new LocalDeathWatch(1024) //TODO make configrable
/*
* generate name for temporary actor refs
@ -332,7 +350,7 @@ class LocalActorRefProvider(
private val tempNode = rootPath / "temp"
def tempPath() = tempNode / tempName()
override def tempPath() = tempNode / tempName()
/**
* Top-level anchor for the supervision hierarchy of this actor system. Will
@ -348,11 +366,11 @@ class LocalActorRefProvider(
def provider: ActorRefProvider = LocalActorRefProvider.this
override def stop() = stopped switchOn {
override def stop(): Unit = stopped switchOn {
terminationFuture.complete(causeOfTermination.toLeft(()))
}
override def isTerminated = stopped.isOn
override def isTerminated: Boolean = stopped.isOn
override def !(message: Any)(implicit sender: ActorRef = null): Unit = stopped.ifOff(message match {
case Failed(ex) if sender ne null causeOfTermination = Some(ex); sender.asInstanceOf[InternalActorRef].stop()
@ -371,7 +389,7 @@ class LocalActorRefProvider(
/**
* Overridable supervision strategy to be used by the /user guardian.
*/
protected def guardianSupervisionStrategy = {
protected def guardianSupervisionStrategy: SupervisorStrategy = {
import akka.actor.SupervisorStrategy._
OneForOneStrategy() {
case _: ActorKilledException Stop
@ -387,12 +405,12 @@ class LocalActorRefProvider(
*/
private class Guardian extends Actor {
override val supervisorStrategy = guardianSupervisionStrategy
override val supervisorStrategy: SupervisorStrategy = guardianSupervisionStrategy
def receive = {
case Terminated(_) context.stop(self)
case CreateChild(child, name) sender ! (try context.actorOf(child, name) catch { case e: Exception e })
case CreateRandomNameChild(child) sender ! (try context.actorOf(child) catch { case e: Exception e })
case CreateChild(child, name) sender ! (try context.actorOf(child, name) catch { case e: Exception e }) // FIXME shouldn't this use NonFatal & Status.Failure?
case CreateRandomNameChild(child) sender ! (try context.actorOf(child) catch { case e: Exception e }) // FIXME shouldn't this use NonFatal & Status.Failure?
case StopChild(child) context.stop(child); sender ! "ok"
case m deadLetters ! DeadLetter(m, sender, self)
}
@ -404,11 +422,10 @@ class LocalActorRefProvider(
/**
* Overridable supervision strategy to be used by the /system guardian.
*/
protected def systemGuardianSupervisionStrategy = {
protected def systemGuardianSupervisionStrategy: SupervisorStrategy = {
import akka.actor.SupervisorStrategy._
OneForOneStrategy() {
case _: ActorKilledException Stop
case _: ActorInitializationException Stop
case _: ActorKilledException | _: ActorInitializationException Stop
case _: Exception Restart
}
}
@ -420,14 +437,12 @@ class LocalActorRefProvider(
*/
private class SystemGuardian extends Actor {
override val supervisorStrategy = systemGuardianSupervisionStrategy
override val supervisorStrategy: SupervisorStrategy = systemGuardianSupervisionStrategy
def receive = {
case Terminated(_)
eventStream.stopDefaultLoggers()
context.stop(self)
case CreateChild(child, name) sender ! (try context.actorOf(child, name) catch { case e: Exception e })
case CreateRandomNameChild(child) sender ! (try context.actorOf(child) catch { case e: Exception e })
case Terminated(_) eventStream.stopDefaultLoggers(); context.stop(self)
case CreateChild(child, name) sender ! (try context.actorOf(child, name) catch { case e: Exception e }) // FIXME shouldn't this use NonFatal & Status.Failure?
case CreateRandomNameChild(child) sender ! (try context.actorOf(child) catch { case e: Exception e }) // FIXME shouldn't this use NonFatal & Status.Failure?
case StopChild(child) context.stop(child); sender ! "ok"
case m deadLetters ! DeadLetter(m, sender, self)
}

View file

@ -256,9 +256,9 @@ private[akka] class RemoteActorRef private[akka] (
private def writeReplace(): AnyRef = SerializedActorRef(path)
}
class RemoteDeathWatch(val local: LocalDeathWatch, val provider: RemoteActorRefProvider) extends DeathWatch {
class RemoteDeathWatch(val local: DeathWatch, val provider: RemoteActorRefProvider) extends DeathWatch {
def subscribe(watcher: ActorRef, watched: ActorRef): Boolean = watched match {
override def subscribe(watcher: ActorRef, watched: ActorRef): Boolean = watched match {
case r: RemoteRef
val ret = local.subscribe(watcher, watched)
provider.actorFor(r.path.root / "remote") ! DaemonMsgWatch(watcher, watched)
@ -270,10 +270,10 @@ class RemoteDeathWatch(val local: LocalDeathWatch, val provider: RemoteActorRefP
false
}
def unsubscribe(watcher: ActorRef, watched: ActorRef): Boolean = local.unsubscribe(watcher, watched)
override def unsubscribe(watcher: ActorRef, watched: ActorRef): Boolean = local.unsubscribe(watcher, watched)
def unsubscribe(watcher: ActorRef): Unit = local.unsubscribe(watcher)
override def unsubscribe(watcher: ActorRef): Unit = local.unsubscribe(watcher)
def publish(event: Terminated): Unit = local.publish(event)
override def publish(event: Terminated): Unit = local.publish(event)
}