start clean-up of ActorSystem structure vs. initialization

- move guardians, dead letters and deathWatch info provider
- move scheduler into ActorSystem
- keep forwarding methods in the first step to let the tests run
This commit is contained in:
Roland 2011-11-14 14:21:53 +01:00
parent c3521a799d
commit fc4598d711
5 changed files with 123 additions and 126 deletions

View file

@ -12,6 +12,7 @@ import akka.serialization.Serialization
import java.net.InetSocketAddress
import akka.remote.RemoteAddress
import java.util.concurrent.TimeUnit
import akka.event.EventStream
/**
* ActorRef is an immutable and serializable handle to an Actor.
@ -384,25 +385,22 @@ object DeadLetterActorRef {
val serialized = new SerializedDeadLetterActorRef
}
class DeadLetterActorRef(val app: ActorSystem) extends MinimalActorRef {
val brokenPromise = new KeptPromise[Any](Left(new ActorKilledException("In DeadLetterActorRef, promises are always broken.")))(app.dispatcher)
class DeadLetterActorRef(val eventStream: EventStream, val path: ActorPath, val dispatcher: MessageDispatcher) extends MinimalActorRef {
val brokenPromise = new KeptPromise[Any](Left(new ActorKilledException("In DeadLetterActorRef, promises are always broken.")))(dispatcher)
override val name: String = "dead-letter"
// FIXME (actor path): put this under the sys guardian supervisor
val path: ActorPath = app.root / "sys" / name
def address: String = app.address + path.toString
def address: String = path.toString
override def isShutdown(): Boolean = true
override def tell(msg: Any, sender: ActorRef): Unit = msg match {
case d: DeadLetter app.eventStream.publish(d)
case _ app.eventStream.publish(DeadLetter(msg, sender, this))
case d: DeadLetter eventStream.publish(d)
case _ eventStream.publish(DeadLetter(msg, sender, this))
}
override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = {
app.eventStream.publish(DeadLetter(message, app.provider.dummyAskSender, this))
eventStream.publish(DeadLetter(message, this, this))
brokenPromise
}

View file

@ -5,17 +5,13 @@
package akka.actor
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.{ TimeUnit, Executors }
import java.util.concurrent.{ ConcurrentHashMap, TimeUnit }
import scala.annotation.tailrec
import org.jboss.netty.akka.util.{ TimerTask, HashedWheelTimer }
import akka.actor.Timeout.intToTimeout
import akka.config.ConfigurationException
import akka.dispatch.{ SystemMessage, Supervise, Promise, MessageDispatcher, Future, DefaultPromise }
import akka.event.{ Logging, DeathWatch, ActorClassification }
import akka.dispatch.{ SystemMessage, Supervise, Promise, MessageDispatcher, Future, DefaultPromise, Dispatcher, Mailbox, Envelope }
import akka.event.{ Logging, DeathWatch, ActorClassification, EventStream }
import akka.routing.{ ScatterGatherFirstCompletedRouter, Routing, RouterType, Router, RoutedProps, RoutedActorRef, RoundRobinRouter, RandomRouter, LocalConnectionManager, DirectRouter }
import akka.util.Helpers
import akka.AkkaException
@ -29,6 +25,16 @@ trait ActorRefProvider {
def actorFor(path: Iterable[String]): Option[ActorRef]
def guardian: ActorRef
def systemGuardian: ActorRef
def deathWatch: DeathWatch
def deadLetters: ActorRef
def deadLetterMailbox: Mailbox
/**
* What deployer will be used to resolve deployment configuration?
*/
@ -54,8 +60,6 @@ trait ActorRefProvider {
private[akka] def terminationFuture: Future[ActorSystem.ExitStatus]
private[akka] def dummyAskSender: ActorRef
private[akka] def tempPath: String
}
@ -114,18 +118,43 @@ class ActorRefProviderException(message: String) extends AkkaException(message)
/**
* Local ActorRef provider.
*/
class LocalActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
class LocalActorRefProvider(
private val app: ActorSystem,
val root: ActorPath,
val eventStream: EventStream,
val dispatcher: MessageDispatcher,
val scheduler: Scheduler) extends ActorRefProvider {
val log = Logging(app.eventStream, this)
val log = Logging(eventStream, this)
private[akka] val deployer: Deployer = new Deployer(app)
val terminationFuture = new DefaultPromise[ActorSystem.ExitStatus](Timeout.never)(app.dispatcher)
private[akka] val scheduler: Scheduler = { //TODO FIXME Make this configurable
val s = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, 100, TimeUnit.MILLISECONDS, 512))
terminationFuture.onComplete(_ s.stop())
s
/*
* generate name for temporary actor refs
*/
private val tempNumber = new AtomicLong
def tempPath = {
val l = tempNumber.getAndIncrement()
"$_" + Helpers.base64(l)
}
// FIXME (actor path): this could become a cache for the new tree traversal actorFor
// currently still used for tmp actors (e.g. ask actor refs)
private val actors = new ConcurrentHashMap[String, AnyRef]
val deadLetters = new DeadLetterActorRef(eventStream, root / "nul", dispatcher)
val deadLetterMailbox = new Mailbox(null) {
becomeClosed()
override def dispatcher = null //MessageDispatcher.this
override def enqueue(receiver: ActorRef, envelope: Envelope) { deadLetters ! DeadLetter(envelope.message, envelope.sender, receiver) }
override def dequeue() = null
override def systemEnqueue(receiver: ActorRef, handle: SystemMessage) { deadLetters ! DeadLetter(handle, receiver, receiver) }
override def systemDrain(): SystemMessage = null
override def hasMessages = false
override def hasSystemMessages = false
override def numberOfMessages = 0
}
/**
@ -163,9 +192,39 @@ class LocalActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
}
}
// FIXME (actor path): this could become a cache for the new tree traversal actorFor
// currently still used for tmp actors (e.g. ask actor refs)
private val actors = new ConcurrentHashMap[String, AnyRef]
private class Guardian extends Actor {
def receive = {
case Terminated(_) context.self.stop()
}
}
private class SystemGuardian extends Actor {
def receive = {
case Terminated(_)
eventStream.stopDefaultLoggers()
context.self.stop()
}
}
private val guardianFaultHandlingStrategy = {
import akka.actor.FaultHandlingStrategy._
OneForOneStrategy {
case _: ActorKilledException Stop
case _: ActorInitializationException Stop
case _: Exception Restart
}
}
private val guardianProps = Props(new Guardian).withFaultHandler(guardianFaultHandlingStrategy)
private val rootGuardian: ActorRef = actorOf(guardianProps, theOneWhoWalksTheBubblesOfSpaceTime, root, true)
val guardian: ActorRef = actorOf(guardianProps, rootGuardian, "app", true)
val systemGuardian: ActorRef = actorOf(guardianProps.withCreator(new SystemGuardian), rootGuardian, "sys", true)
val deathWatch = createDeathWatch()
// chain death watchers so that killing guardian stops the application
deathWatch.subscribe(systemGuardian, guardian)
deathWatch.subscribe(rootGuardian, systemGuardian)
// FIXME (actor path): should start at the new root guardian, and not use the tail (just to avoid the expected "app" name for now)
def actorFor(path: Iterable[String]): Option[ActorRef] = findInCache(ActorPath.join(path)) orElse findInTree(Some(app.guardian), path.tail)
@ -286,14 +345,6 @@ class LocalActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
a.result
}
}
private[akka] val dummyAskSender = new DeadLetterActorRef(app)
private val tempNumber = new AtomicLong
def tempPath = {
val l = tempNumber.getAndIncrement()
"$_" + Helpers.base64(l)
}
}
class LocalDeathWatch extends DeathWatch with ActorClassification {

View file

@ -6,14 +6,17 @@ package akka.actor
import akka.config._
import akka.actor._
import akka.event._
import akka.dispatch._
import akka.util.duration._
import java.net.InetAddress
import com.eaio.uuid.UUID
import akka.dispatch.{ Dispatchers, Future, Mailbox, Envelope, SystemMessage }
import akka.util.Duration
import akka.util.ReflectiveAccess
import akka.serialization.Serialization
import akka.remote.RemoteAddress
import org.jboss.netty.akka.util.HashedWheelTimer
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
object ActorSystem {
@ -162,10 +165,7 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF
implicit val dispatcher = dispatcherFactory.defaultGlobalDispatcher
def scheduler = provider.scheduler
// TODO think about memory consistency effects when doing funky stuff inside constructor
val reflective = new ReflectiveAccess(this)
val scheduler = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, 100, TimeUnit.MILLISECONDS, 512))
/**
* The root actor path for this application.
@ -173,60 +173,29 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF
val root: ActorPath = new RootActorPath(this)
// TODO think about memory consistency effects when doing funky stuff inside constructor
val provider: ActorRefProvider = reflective.createProvider
val provider: ActorRefProvider = {
val providerClass = ReflectiveAccess.getClassFor(ProviderClass) match {
case Left(e) throw e
case Right(b) b
}
val params: Array[Class[_]] = Array(classOf[ActorSystem], classOf[ActorPath], classOf[EventStream], classOf[MessageDispatcher], classOf[Scheduler])
val args: Array[AnyRef] = Array(this, root, eventStream, dispatcher, scheduler)
ReflectiveAccess.createInstance[ActorRefProvider](providerClass, params, args) match {
case Left(e) throw e
case Right(p) p
}
}
def terminationFuture: Future[ExitStatus] = provider.terminationFuture
def guardian: ActorRef = provider.guardian
def systemGuardian: ActorRef = provider.systemGuardian
def deathWatch: DeathWatch = provider.deathWatch
def deadLetters: ActorRef = provider.deadLetters
def deadLetterMailbox: Mailbox = provider.deadLetterMailbox
private class Guardian extends Actor {
def receive = {
case Terminated(_) context.self.stop()
}
}
private class SystemGuardian extends Actor {
def receive = {
case Terminated(_)
eventStream.stopDefaultLoggers()
context.self.stop()
}
}
private val guardianFaultHandlingStrategy = {
import akka.actor.FaultHandlingStrategy._
OneForOneStrategy {
case _: ActorKilledException Stop
case _: ActorInitializationException Stop
case _: Exception Restart
}
}
private val guardianProps = Props(new Guardian).withFaultHandler(guardianFaultHandlingStrategy)
private val rootGuardian: ActorRef =
provider.actorOf(guardianProps, provider.theOneWhoWalksTheBubblesOfSpaceTime, root, true)
protected[akka] val guardian: ActorRef =
provider.actorOf(guardianProps, rootGuardian, "app", true)
protected[akka] val systemGuardian: ActorRef =
provider.actorOf(guardianProps.withCreator(new SystemGuardian), rootGuardian, "sys", true)
// TODO think about memory consistency effects when doing funky stuff inside constructor
val deadLetters = new DeadLetterActorRef(this)
val deadLetterMailbox = new Mailbox(null) {
becomeClosed()
override def dispatcher = null //MessageDispatcher.this
override def enqueue(receiver: ActorRef, envelope: Envelope) { deadLetters ! DeadLetter(envelope.message, envelope.sender, receiver) }
override def dequeue() = null
override def systemEnqueue(receiver: ActorRef, handle: SystemMessage) { deadLetters ! DeadLetter(handle, receiver, receiver) }
override def systemDrain(): SystemMessage = null
override def hasMessages = false
override def hasSystemMessages = false
override def numberOfMessages = 0
}
val deathWatch = provider.createDeathWatch()
// chain death watchers so that killing guardian stops the application
deathWatch.subscribe(systemGuardian, guardian)
deathWatch.subscribe(rootGuardian, systemGuardian)
terminationFuture.onComplete(_ scheduler.stop())
terminationFuture.onComplete(_ dispatcher.shutdown())
// this starts the reaper actor and the user-configured logging subscribers, which are also actors
eventStream.start(this)
@ -251,5 +220,4 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF
guardian.stop()
}
terminationFuture.onComplete(_ dispatcher.shutdown())
}

View file

@ -112,29 +112,3 @@ object ReflectiveAccess {
}
/**
* Helper class for reflective access to different modules in order to allow optional loading of modules.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ReflectiveAccess(val app: ActorSystem) {
import ReflectiveAccess._
def providerClass: Class[_] = {
getClassFor(app.AkkaConfig.ProviderClass) match {
case Left(e) throw e
case Right(b) b
}
}
def createProvider: ActorRefProvider = {
val params: Array[Class[_]] = Array(classOf[ActorSystem])
val args: Array[AnyRef] = Array(app)
createInstance[ActorRefProvider](providerClass, params, args) match {
case Right(p) p
case Left(e) throw e
}
}
}

View file

@ -17,26 +17,36 @@ import akka.serialization.{ Serialization, Serializer, Compression }
import akka.serialization.Compression.LZF
import akka.remote.RemoteProtocol._
import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
import java.net.InetSocketAddress
import java.util.concurrent.ConcurrentHashMap
import com.google.protobuf.ByteString
import java.util.concurrent.atomic.AtomicBoolean
import akka.event.EventStream
/**
* Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
class RemoteActorRefProvider(
val app: ActorSystem,
val root: ActorPath,
val eventStream: EventStream,
val dispatcher: MessageDispatcher,
val scheduler: Scheduler) extends ActorRefProvider {
val log = Logging(app, this)
import java.util.concurrent.ConcurrentHashMap
import akka.dispatch.Promise
val local = new LocalActorRefProvider(app)
val local = new LocalActorRefProvider(app, root, eventStream, dispatcher, scheduler)
def deadLetterMailbox = local.deadLetterMailbox
def deadLetters = local.deadLetters
def deathWatch = local.deathWatch
def guardian = local.guardian
def systemGuardian = local.systemGuardian
val remote = new Remote(app)
private val actors = new ConcurrentHashMap[String, AnyRef]
@ -51,8 +61,6 @@ class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
def defaultDispatcher = app.dispatcher
def defaultTimeout = app.AkkaConfig.ActorTimeout
def scheduler: Scheduler = local.scheduler
private[akka] def actorOf(props: Props, supervisor: ActorRef, name: String, systemService: Boolean): ActorRef =
actorOf(props, supervisor, supervisor.path / name, systemService)
@ -242,8 +250,6 @@ class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider {
private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = local.ask(message, recipient, within)
private[akka] def dummyAskSender = local.dummyAskSender
private[akka] def tempPath = local.tempPath
}