diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 00b8dcf386..e6d470111e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -12,6 +12,7 @@ import akka.serialization.Serialization import java.net.InetSocketAddress import akka.remote.RemoteAddress import java.util.concurrent.TimeUnit +import akka.event.EventStream /** * ActorRef is an immutable and serializable handle to an Actor. @@ -384,25 +385,22 @@ object DeadLetterActorRef { val serialized = new SerializedDeadLetterActorRef } -class DeadLetterActorRef(val app: ActorSystem) extends MinimalActorRef { - val brokenPromise = new KeptPromise[Any](Left(new ActorKilledException("In DeadLetterActorRef, promises are always broken.")))(app.dispatcher) +class DeadLetterActorRef(val eventStream: EventStream, val path: ActorPath, val dispatcher: MessageDispatcher) extends MinimalActorRef { + val brokenPromise = new KeptPromise[Any](Left(new ActorKilledException("In DeadLetterActorRef, promises are always broken.")))(dispatcher) override val name: String = "dead-letter" - // FIXME (actor path): put this under the sys guardian supervisor - val path: ActorPath = app.root / "sys" / name - - def address: String = app.address + path.toString + def address: String = path.toString override def isShutdown(): Boolean = true override def tell(msg: Any, sender: ActorRef): Unit = msg match { - case d: DeadLetter ⇒ app.eventStream.publish(d) - case _ ⇒ app.eventStream.publish(DeadLetter(msg, sender, this)) + case d: DeadLetter ⇒ eventStream.publish(d) + case _ ⇒ eventStream.publish(DeadLetter(msg, sender, this)) } override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = { - app.eventStream.publish(DeadLetter(message, app.provider.dummyAskSender, this)) + eventStream.publish(DeadLetter(message, this, this)) brokenPromise } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 0bc355cf69..ae26fda28d 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -5,17 +5,13 @@ package akka.actor import java.util.concurrent.atomic.AtomicLong -import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.{ TimeUnit, Executors } - +import java.util.concurrent.{ ConcurrentHashMap, TimeUnit } import scala.annotation.tailrec - import org.jboss.netty.akka.util.{ TimerTask, HashedWheelTimer } - import akka.actor.Timeout.intToTimeout import akka.config.ConfigurationException -import akka.dispatch.{ SystemMessage, Supervise, Promise, MessageDispatcher, Future, DefaultPromise } -import akka.event.{ Logging, DeathWatch, ActorClassification } +import akka.dispatch.{ SystemMessage, Supervise, Promise, MessageDispatcher, Future, DefaultPromise, Dispatcher, Mailbox, Envelope } +import akka.event.{ Logging, DeathWatch, ActorClassification, EventStream } import akka.routing.{ ScatterGatherFirstCompletedRouter, Routing, RouterType, Router, RoutedProps, RoutedActorRef, RoundRobinRouter, RandomRouter, LocalConnectionManager, DirectRouter } import akka.util.Helpers import akka.AkkaException @@ -29,6 +25,16 @@ trait ActorRefProvider { def actorFor(path: Iterable[String]): Option[ActorRef] + def guardian: ActorRef + + def systemGuardian: ActorRef + + def deathWatch: DeathWatch + + def deadLetters: ActorRef + + def deadLetterMailbox: Mailbox + /** * What deployer will be used to resolve deployment configuration? */ @@ -54,8 +60,6 @@ trait ActorRefProvider { private[akka] def terminationFuture: Future[ActorSystem.ExitStatus] - private[akka] def dummyAskSender: ActorRef - private[akka] def tempPath: String } @@ -114,18 +118,43 @@ class ActorRefProviderException(message: String) extends AkkaException(message) /** * Local ActorRef provider. */ -class LocalActorRefProvider(val app: ActorSystem) extends ActorRefProvider { +class LocalActorRefProvider( + private val app: ActorSystem, + val root: ActorPath, + val eventStream: EventStream, + val dispatcher: MessageDispatcher, + val scheduler: Scheduler) extends ActorRefProvider { - val log = Logging(app.eventStream, this) + val log = Logging(eventStream, this) private[akka] val deployer: Deployer = new Deployer(app) val terminationFuture = new DefaultPromise[ActorSystem.ExitStatus](Timeout.never)(app.dispatcher) - private[akka] val scheduler: Scheduler = { //TODO FIXME Make this configurable - val s = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, 100, TimeUnit.MILLISECONDS, 512)) - terminationFuture.onComplete(_ ⇒ s.stop()) - s + /* + * generate name for temporary actor refs + */ + private val tempNumber = new AtomicLong + def tempPath = { + val l = tempNumber.getAndIncrement() + "$_" + Helpers.base64(l) + } + + // FIXME (actor path): this could become a cache for the new tree traversal actorFor + // currently still used for tmp actors (e.g. ask actor refs) + private val actors = new ConcurrentHashMap[String, AnyRef] + + val deadLetters = new DeadLetterActorRef(eventStream, root / "nul", dispatcher) + val deadLetterMailbox = new Mailbox(null) { + becomeClosed() + override def dispatcher = null //MessageDispatcher.this + override def enqueue(receiver: ActorRef, envelope: Envelope) { deadLetters ! DeadLetter(envelope.message, envelope.sender, receiver) } + override def dequeue() = null + override def systemEnqueue(receiver: ActorRef, handle: SystemMessage) { deadLetters ! DeadLetter(handle, receiver, receiver) } + override def systemDrain(): SystemMessage = null + override def hasMessages = false + override def hasSystemMessages = false + override def numberOfMessages = 0 } /** @@ -163,9 +192,39 @@ class LocalActorRefProvider(val app: ActorSystem) extends ActorRefProvider { } } - // FIXME (actor path): this could become a cache for the new tree traversal actorFor - // currently still used for tmp actors (e.g. ask actor refs) - private val actors = new ConcurrentHashMap[String, AnyRef] + private class Guardian extends Actor { + def receive = { + case Terminated(_) ⇒ context.self.stop() + } + } + private class SystemGuardian extends Actor { + def receive = { + case Terminated(_) ⇒ + eventStream.stopDefaultLoggers() + context.self.stop() + } + } + private val guardianFaultHandlingStrategy = { + import akka.actor.FaultHandlingStrategy._ + OneForOneStrategy { + case _: ActorKilledException ⇒ Stop + case _: ActorInitializationException ⇒ Stop + case _: Exception ⇒ Restart + } + } + private val guardianProps = Props(new Guardian).withFaultHandler(guardianFaultHandlingStrategy) + + private val rootGuardian: ActorRef = actorOf(guardianProps, theOneWhoWalksTheBubblesOfSpaceTime, root, true) + + val guardian: ActorRef = actorOf(guardianProps, rootGuardian, "app", true) + + val systemGuardian: ActorRef = actorOf(guardianProps.withCreator(new SystemGuardian), rootGuardian, "sys", true) + + val deathWatch = createDeathWatch() + + // chain death watchers so that killing guardian stops the application + deathWatch.subscribe(systemGuardian, guardian) + deathWatch.subscribe(rootGuardian, systemGuardian) // FIXME (actor path): should start at the new root guardian, and not use the tail (just to avoid the expected "app" name for now) def actorFor(path: Iterable[String]): Option[ActorRef] = findInCache(ActorPath.join(path)) orElse findInTree(Some(app.guardian), path.tail) @@ -286,14 +345,6 @@ class LocalActorRefProvider(val app: ActorSystem) extends ActorRefProvider { a.result } } - - private[akka] val dummyAskSender = new DeadLetterActorRef(app) - - private val tempNumber = new AtomicLong - def tempPath = { - val l = tempNumber.getAndIncrement() - "$_" + Helpers.base64(l) - } } class LocalDeathWatch extends DeathWatch with ActorClassification { diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 1ceb357c76..b9cd41bace 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -6,14 +6,17 @@ package akka.actor import akka.config._ import akka.actor._ import akka.event._ +import akka.dispatch._ import akka.util.duration._ import java.net.InetAddress import com.eaio.uuid.UUID -import akka.dispatch.{ Dispatchers, Future, Mailbox, Envelope, SystemMessage } import akka.util.Duration import akka.util.ReflectiveAccess import akka.serialization.Serialization import akka.remote.RemoteAddress +import org.jboss.netty.akka.util.HashedWheelTimer +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit object ActorSystem { @@ -162,10 +165,7 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF implicit val dispatcher = dispatcherFactory.defaultGlobalDispatcher - def scheduler = provider.scheduler - - // TODO think about memory consistency effects when doing funky stuff inside constructor - val reflective = new ReflectiveAccess(this) + val scheduler = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, 100, TimeUnit.MILLISECONDS, 512)) /** * The root actor path for this application. @@ -173,60 +173,29 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF val root: ActorPath = new RootActorPath(this) // TODO think about memory consistency effects when doing funky stuff inside constructor - val provider: ActorRefProvider = reflective.createProvider + val provider: ActorRefProvider = { + val providerClass = ReflectiveAccess.getClassFor(ProviderClass) match { + case Left(e) ⇒ throw e + case Right(b) ⇒ b + } + val params: Array[Class[_]] = Array(classOf[ActorSystem], classOf[ActorPath], classOf[EventStream], classOf[MessageDispatcher], classOf[Scheduler]) + val args: Array[AnyRef] = Array(this, root, eventStream, dispatcher, scheduler) + + ReflectiveAccess.createInstance[ActorRefProvider](providerClass, params, args) match { + case Left(e) ⇒ throw e + case Right(p) ⇒ p + } + } def terminationFuture: Future[ExitStatus] = provider.terminationFuture + def guardian: ActorRef = provider.guardian + def systemGuardian: ActorRef = provider.systemGuardian + def deathWatch: DeathWatch = provider.deathWatch + def deadLetters: ActorRef = provider.deadLetters + def deadLetterMailbox: Mailbox = provider.deadLetterMailbox - private class Guardian extends Actor { - def receive = { - case Terminated(_) ⇒ context.self.stop() - } - } - private class SystemGuardian extends Actor { - def receive = { - case Terminated(_) ⇒ - eventStream.stopDefaultLoggers() - context.self.stop() - } - } - private val guardianFaultHandlingStrategy = { - import akka.actor.FaultHandlingStrategy._ - OneForOneStrategy { - case _: ActorKilledException ⇒ Stop - case _: ActorInitializationException ⇒ Stop - case _: Exception ⇒ Restart - } - } - private val guardianProps = Props(new Guardian).withFaultHandler(guardianFaultHandlingStrategy) - - private val rootGuardian: ActorRef = - provider.actorOf(guardianProps, provider.theOneWhoWalksTheBubblesOfSpaceTime, root, true) - - protected[akka] val guardian: ActorRef = - provider.actorOf(guardianProps, rootGuardian, "app", true) - - protected[akka] val systemGuardian: ActorRef = - provider.actorOf(guardianProps.withCreator(new SystemGuardian), rootGuardian, "sys", true) - - // TODO think about memory consistency effects when doing funky stuff inside constructor - val deadLetters = new DeadLetterActorRef(this) - val deadLetterMailbox = new Mailbox(null) { - becomeClosed() - override def dispatcher = null //MessageDispatcher.this - override def enqueue(receiver: ActorRef, envelope: Envelope) { deadLetters ! DeadLetter(envelope.message, envelope.sender, receiver) } - override def dequeue() = null - override def systemEnqueue(receiver: ActorRef, handle: SystemMessage) { deadLetters ! DeadLetter(handle, receiver, receiver) } - override def systemDrain(): SystemMessage = null - override def hasMessages = false - override def hasSystemMessages = false - override def numberOfMessages = 0 - } - - val deathWatch = provider.createDeathWatch() - - // chain death watchers so that killing guardian stops the application - deathWatch.subscribe(systemGuardian, guardian) - deathWatch.subscribe(rootGuardian, systemGuardian) + terminationFuture.onComplete(_ ⇒ scheduler.stop()) + terminationFuture.onComplete(_ ⇒ dispatcher.shutdown()) // this starts the reaper actor and the user-configured logging subscribers, which are also actors eventStream.start(this) @@ -251,5 +220,4 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF guardian.stop() } - terminationFuture.onComplete(_ ⇒ dispatcher.shutdown()) } diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index 285cdaeeb9..d331190de6 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -112,29 +112,3 @@ object ReflectiveAccess { } -/** - * Helper class for reflective access to different modules in order to allow optional loading of modules. - * - * @author Jonas Bonér - */ -class ReflectiveAccess(val app: ActorSystem) { - - import ReflectiveAccess._ - - def providerClass: Class[_] = { - getClassFor(app.AkkaConfig.ProviderClass) match { - case Left(e) ⇒ throw e - case Right(b) ⇒ b - } - } - - def createProvider: ActorRefProvider = { - val params: Array[Class[_]] = Array(classOf[ActorSystem]) - val args: Array[AnyRef] = Array(app) - - createInstance[ActorRefProvider](providerClass, params, args) match { - case Right(p) ⇒ p - case Left(e) ⇒ throw e - } - } -} diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 1eeeb2ccf5..5f0223176e 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -17,26 +17,36 @@ import akka.serialization.{ Serialization, Serializer, Compression } import akka.serialization.Compression.LZF import akka.remote.RemoteProtocol._ import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._ - import java.net.InetSocketAddress import java.util.concurrent.ConcurrentHashMap - import com.google.protobuf.ByteString import java.util.concurrent.atomic.AtomicBoolean +import akka.event.EventStream /** * Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it. * * @author Jonas Bonér */ -class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider { +class RemoteActorRefProvider( + val app: ActorSystem, + val root: ActorPath, + val eventStream: EventStream, + val dispatcher: MessageDispatcher, + val scheduler: Scheduler) extends ActorRefProvider { val log = Logging(app, this) import java.util.concurrent.ConcurrentHashMap import akka.dispatch.Promise - val local = new LocalActorRefProvider(app) + val local = new LocalActorRefProvider(app, root, eventStream, dispatcher, scheduler) + def deadLetterMailbox = local.deadLetterMailbox + def deadLetters = local.deadLetters + def deathWatch = local.deathWatch + def guardian = local.guardian + def systemGuardian = local.systemGuardian + val remote = new Remote(app) private val actors = new ConcurrentHashMap[String, AnyRef] @@ -51,8 +61,6 @@ class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider { def defaultDispatcher = app.dispatcher def defaultTimeout = app.AkkaConfig.ActorTimeout - def scheduler: Scheduler = local.scheduler - private[akka] def actorOf(props: Props, supervisor: ActorRef, name: String, systemService: Boolean): ActorRef = actorOf(props, supervisor, supervisor.path / name, systemService) @@ -242,8 +250,6 @@ class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider { private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = local.ask(message, recipient, within) - private[akka] def dummyAskSender = local.dummyAskSender - private[akka] def tempPath = local.tempPath }