From 422cf386c8ba5fefd2c17f144386a8fbd964083f Mon Sep 17 00:00:00 2001 From: Roland Date: Tue, 19 Jun 2012 14:52:02 +0200 Subject: [PATCH] incorporate review comments, add docs, see #2031 also add Java sample for creating custom MailboxType --- .../src/main/scala/akka/actor/ActorCell.scala | 12 +++--- .../src/main/scala/akka/actor/ActorRef.scala | 16 ++++++++ .../scala/akka/actor/ActorRefProvider.scala | 13 +++--- .../main/scala/akka/actor/ActorSystem.scala | 29 ++++--------- .../akka/actor/RepointableActorRef.scala | 27 ++++++++---- .../dispatcher/DispatcherDocTestBase.java | 37 +++++++++++++++++ akka-docs/java/dispatchers.rst | 41 +++++++++++++++++++ akka-docs/java/untyped-actors.rst | 7 ---- akka-docs/scala/actors.rst | 7 ---- .../docs/dispatcher/DispatcherDocSpec.scala | 4 +- akka-docs/scala/dispatchers.rst | 20 +++++++++ .../akka/remote/RemoteActorRefProvider.scala | 4 +- 12 files changed, 157 insertions(+), 60 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 5a1269b5fe..c0d15d07bb 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -328,12 +328,12 @@ private[akka] object ActorCell { def remove(child: ActorRef): ChildrenContainer = NormalChildrenContainer(c - child.path.name) - def getByName(name: String): Option[ChildRestartStats] = c get name match { + def getByName(name: String): Option[ChildRestartStats] = c.get(name) match { case s @ Some(_: ChildRestartStats) ⇒ s.asInstanceOf[Option[ChildRestartStats]] case _ ⇒ None } - def getByRef(actor: ActorRef): Option[ChildRestartStats] = c get actor.path.name match { + def getByRef(actor: ActorRef): Option[ChildRestartStats] = c.get(actor.path.name) match { case c @ Some(crs: ChildRestartStats) if (crs.child == actor) ⇒ c.asInstanceOf[Option[ChildRestartStats]] case _ ⇒ None } @@ -349,7 +349,7 @@ private[akka] object ActorCell { throw new InvalidActorNameException("actor name " + name + " is not unique!") else new NormalChildrenContainer(c.updated(name, ChildNameReserved)) - def unreserve(name: String): ChildrenContainer = c get name match { + def unreserve(name: String): ChildrenContainer = c.get(name) match { case Some(ChildNameReserved) ⇒ NormalChildrenContainer(c - name) case _ ⇒ this } @@ -389,12 +389,12 @@ private[akka] object ActorCell { else copy(c - child.path.name, t) } - def getByName(name: String): Option[ChildRestartStats] = c get name match { + def getByName(name: String): Option[ChildRestartStats] = c.get(name) match { case s @ Some(_: ChildRestartStats) ⇒ s.asInstanceOf[Option[ChildRestartStats]] case _ ⇒ None } - def getByRef(actor: ActorRef): Option[ChildRestartStats] = c get actor.path.name match { + def getByRef(actor: ActorRef): Option[ChildRestartStats] = c.get(actor.path.name) match { case c @ Some(crs: ChildRestartStats) if (crs.child == actor) ⇒ c.asInstanceOf[Option[ChildRestartStats]] case _ ⇒ None } @@ -413,7 +413,7 @@ private[akka] object ActorCell { else copy(c = c.updated(name, ChildNameReserved)) } - def unreserve(name: String): ChildrenContainer = c get name match { + def unreserve(name: String): ChildrenContainer = c.get(name) match { case Some(ChildNameReserved) ⇒ copy(c = c - name) case _ ⇒ this } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index bde2a2194c..8d42714b00 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -163,10 +163,20 @@ private[akka] trait ActorRefScope { def isLocal: Boolean } +/** + * Refs which are statically known to be local inherit from this Scope + */ private[akka] trait LocalRef extends ActorRefScope { final def isLocal = true } +/** + * RepointableActorRef (and potentially others) may change their locality at + * runtime, meaning that isLocal might not be stable. RepointableActorRef has + * the feature that it starts out “not fully started” (but you can send to it), + * which is why `isStarted` features here; it is not improbable that cluster + * actor refs will have the same behavior. + */ private[akka] trait RepointableRef extends ActorRefScope { def isStarted: Boolean } @@ -214,6 +224,12 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe def isLocal: Boolean } +/** + * Common trait of all actor refs which actually have a Cell, most notably + * LocalActorRef and RepointableActorRef. The former specializes the return + * type of `underlying` so that follow-up calls can use invokevirtual instead + * of invokeinterface. + */ private[akka] abstract class ActorRefWithCell extends InternalActorRef { this: ActorRefScope ⇒ def underlying: Cell } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 8195aea64c..bbb84144c5 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -26,12 +26,12 @@ trait ActorRefProvider { /** * Reference to the supervisor used for all top-level user actors. */ - def guardian: InternalActorRef + def guardian: LocalActorRef /** * Reference to the supervisor used for all top-level system actors. */ - def systemGuardian: InternalActorRef + def systemGuardian: LocalActorRef /** * Dead letter destination for this provider. @@ -482,13 +482,10 @@ class LocalActorRefProvider( } } - lazy val guardian: InternalActorRef = - actorOf(system, guardianProps, rootGuardian, rootPath / "user", - systemService = true, deploy = None, lookupDeploy = false, async = false) + lazy val guardian: LocalActorRef = new LocalActorRef(system, guardianProps, rootGuardian, rootPath / "user") - lazy val systemGuardian: InternalActorRef = - actorOf(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, rootPath / "system", - systemService = true, deploy = None, lookupDeploy = false, async = false) + lazy val systemGuardian: LocalActorRef = + new LocalActorRef(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, rootPath / "system") lazy val tempContainer = new VirtualPathContainer(system.provider, tempNode, rootGuardian, log) diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 030fa4a8b5..e45363253a 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -426,8 +426,10 @@ abstract class ExtendedActorSystem extends ActorSystem { /** * For debugging: traverse actor hierarchy and make string representation. + * Careful, this may OOM on large actor systems, and it is only meant for + * helping debugging in case something already went terminally wrong. */ - def printTree: String + private[akka] def printTree: String } private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, classLoader: ClassLoader) extends ExtendedActorSystem { @@ -485,26 +487,11 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, protected def systemImpl: ActorSystemImpl = this - private[akka] def systemActorOf(props: Props, name: String): ActorRef = { - systemGuardian match { - case g: LocalActorRef ⇒ g.underlying.attachChild(props, name) - case s ⇒ throw new UnsupportedOperationException("unknown systemGuardian type " + s.getClass) - } - } + private[akka] def systemActorOf(props: Props, name: String): ActorRef = systemGuardian.underlying.attachChild(props, name) - def actorOf(props: Props, name: String): ActorRef = { - guardian match { - case g: LocalActorRef ⇒ g.underlying.attachChild(props, name) - case s ⇒ throw new UnsupportedOperationException("unknown guardian type " + s.getClass) - } - } + def actorOf(props: Props, name: String): ActorRef = guardian.underlying.attachChild(props, name) - def actorOf(props: Props): ActorRef = { - guardian match { - case g: LocalActorRef ⇒ g.underlying.attachChild(props) - case s ⇒ throw new UnsupportedOperationException("unknown guardian type " + s.getClass) - } - } + def actorOf(props: Props): ActorRef = guardian.underlying.attachChild(props) def stop(actor: ActorRef): Unit = { implicit val timeout = settings.CreationTimeout @@ -569,8 +556,8 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, def terminationFuture: Future[Unit] = provider.terminationFuture def lookupRoot: InternalActorRef = provider.rootGuardian - def guardian: InternalActorRef = provider.guardian - def systemGuardian: InternalActorRef = provider.systemGuardian + def guardian: LocalActorRef = provider.guardian + def systemGuardian: LocalActorRef = provider.systemGuardian def /(actorName: String): ActorPath = guardian.path / actorName def /(path: Iterable[String]): ActorPath = guardian.path / path diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index 1344735993..ad9a7cb0c4 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -43,11 +43,13 @@ private[akka] class RepointableActorRef( if (Unsafe.instance.compareAndSwapObject(this, cellOffset, old, next)) old else swapCell(next) } - /* + /** * Initialize: make a dummy cell which holds just a mailbox, then tell our - * supervisor that we exist so that he can create the real Cell in + * supervisor that we exist so that he can create the real Cell in * handleSupervise(). - * + * + * Call twice on your own peril! + * * This is protected so that others can have different initialization. */ def initialize(): this.type = { @@ -57,9 +59,10 @@ private[akka] class RepointableActorRef( } /** - * This method is supposedly called by the supervisor in handleSupervise() + * This method is supposed to be called by the supervisor in handleSupervise() * to replace the UnstartedCell with the real one. It assumes no concurrent - * modification of the underlying Cell. + * modification of the `underlying` field, though it is safe to send messages + * at any time. */ def activate(): this.type = { underlying match { @@ -69,6 +72,10 @@ private[akka] class RepointableActorRef( this } + /** + * This is called by activate() to obtain the cell which is to replace the + * unstarted cell. The cell must be fully functional. + */ def newCell(): Cell = new ActorCell(system, this, props, supervisor).start() def suspend(): Unit = underlying.suspend() @@ -138,11 +145,17 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep while (systemQueue.nonEmpty || queue.nonEmpty) { while (systemQueue.nonEmpty) { val msg = systemQueue.dequeue() - try cell sendSystemMessage msg catch { case _: InterruptedException ⇒ interrupted = true } + try cell.sendSystemMessage(msg) + catch { + case _: InterruptedException ⇒ interrupted = true + } } if (queue.nonEmpty) { val envelope = queue.dequeue() - try cell tell (envelope.message, envelope.sender) catch { case _: InterruptedException ⇒ interrupted = true } + try cell.tell(envelope.message, envelope.sender) + catch { + case _: InterruptedException ⇒ interrupted = true + } } } if (interrupted) throw new InterruptedException diff --git a/akka-docs/java/code/docs/dispatcher/DispatcherDocTestBase.java b/akka-docs/java/code/docs/dispatcher/DispatcherDocTestBase.java index 94e4b38121..ca5569657e 100644 --- a/akka-docs/java/code/docs/dispatcher/DispatcherDocTestBase.java +++ b/akka-docs/java/code/docs/dispatcher/DispatcherDocTestBase.java @@ -24,6 +24,15 @@ import com.typesafe.config.Config; //#imports-prio-mailbox +//#imports-custom +import akka.dispatch.Envelope; +import akka.dispatch.MessageQueue; +import akka.dispatch.MailboxType; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +//#imports-custom + import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -136,4 +145,32 @@ public class DispatcherDocTestBase { } } //#prio-mailbox + + //#mailbox-implementation-example + class MyUnboundedMailbox implements MailboxType { + + // This constructor signature must exist, it will be called by Akka + public MyUnboundedMailbox(ActorSystem.Settings settings, Config config) { + // put your initialization code here + } + + // The create method is called to create the MessageQueue + public MessageQueue create(Option owner, Option system) { + return new MessageQueue() { + private final Queue queue = new ConcurrentLinkedQueue(); + + // these must be implemented; queue used as example + public void enqueue(ActorRef receiver, Envelope handle) { queue.offer(handle); } + public Envelope dequeue() { return queue.poll(); } + public int numberOfMessages() { return queue.size(); } + public boolean hasMessages() { return !queue.isEmpty(); } + public void cleanUp(ActorRef owner, MessageQueue deadLetters) { + for (Envelope handle: queue) { + deadLetters.enqueue(owner, handle); + } + } + }; + } + } + //#mailbox-implementation-example } diff --git a/akka-docs/java/dispatchers.rst b/akka-docs/java/dispatchers.rst index 2723883e9c..27716275c0 100644 --- a/akka-docs/java/dispatchers.rst +++ b/akka-docs/java/dispatchers.rst @@ -183,3 +183,44 @@ And then an example on how you would use it: the configuration which describes the dispatcher using this mailbox type; the mailbox type will be instantiated once for each dispatcher using it. +Creating your own Mailbox type +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +An example is worth a thousand quacks: + +.. includecode:: code/docs/dispatcher/DispatcherDocTestBase.java#imports-custom + +.. includecode:: code/docs/dispatcher/DispatcherDocTestBase.java#mailbox-implementation-example + +And then you just specify the FQCN of your MailboxType as the value of the "mailbox-type" in the dispatcher configuration. + +.. note:: + + Make sure to include a constructor which takes + ``akka.actor.ActorSystem.Settings`` and ``com.typesafe.config.Config`` + arguments, as this constructor is invoked reflectively to construct your + mailbox type. The config passed in as second argument is that section from + the configuration which describes the dispatcher using this mailbox type; the + mailbox type will be instantiated once for each dispatcher using it. + + +Special Semantics of ``system.actorOf`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +In order to make ``system.actorOf`` both synchronous and non-blocking while +keeping the return type :class:`ActorRef` (and the semantics that the returned +ref is fully functional), special handling takes place for this case. Behind +the scenes, a hollow kind of actor reference is constructed, which is sent to +the system’s guardian actor who actually creates the actor and its context and +puts those inside the reference. Until that has happened, messages sent to the +:class:`ActorRef` will be queued locally, and only upon swapping the real +filling in will they be transferred into the real mailbox. Thus, + +.. code-block:: scala + + system.actorOf(...).tell("bang"); + assert(bangIsInMyCustomMailbx); + +will probably fail; you will have to allow for some time to pass and retry the +check à la :meth:`TestKit.awaitCond`. + diff --git a/akka-docs/java/untyped-actors.rst b/akka-docs/java/untyped-actors.rst index ac911fd216..57dbaa5604 100644 --- a/akka-docs/java/untyped-actors.rst +++ b/akka-docs/java/untyped-actors.rst @@ -82,13 +82,6 @@ that is used in log messages and for identifying actors. The name must not be em or start with ``$``. If the given name is already in use by another child to the same parent actor an `InvalidActorNameException` is thrown. -.. warning:: - - Creating top-level actors with ``system.actorOf`` is a blocking operation, - hence it may dead-lock due to starvation if the default dispatcher is - overloaded. To avoid problems, do not call this method from within actors or - futures which run on the default dispatcher. - Actors are automatically started asynchronously when created. When you create the ``UntypedActor`` then it will automatically call the ``preStart`` callback method on the ``UntypedActor`` class. This is an excellent place to diff --git a/akka-docs/scala/actors.rst b/akka-docs/scala/actors.rst index 9b2cb9a7e5..47a2318e53 100644 --- a/akka-docs/scala/actors.rst +++ b/akka-docs/scala/actors.rst @@ -76,13 +76,6 @@ that is used in log messages and for identifying actors. The name must not be em or start with ``$``. If the given name is already in use by another child to the same parent actor an `InvalidActorNameException` is thrown. -.. warning:: - - Creating top-level actors with ``system.actorOf`` is a blocking operation, - hence it may dead-lock due to starvation if the default dispatcher is - overloaded. To avoid problems, do not call this method from within actors or - futures which run on the default dispatcher. - Actors are automatically started asynchronously when created. When you create the ``Actor`` then it will automatically call the ``preStart`` callback method on the ``Actor`` trait. This is an excellent place to diff --git a/akka-docs/scala/code/docs/dispatcher/DispatcherDocSpec.scala b/akka-docs/scala/code/docs/dispatcher/DispatcherDocSpec.scala index 09a2f810bf..7fdd0cd9bf 100644 --- a/akka-docs/scala/code/docs/dispatcher/DispatcherDocSpec.scala +++ b/akka-docs/scala/code/docs/dispatcher/DispatcherDocSpec.scala @@ -134,7 +134,7 @@ object DispatcherDocSpec { } //#mailbox-implementation-example - case class MyUnboundedMailbox() extends akka.dispatch.MailboxType { + class MyUnboundedMailbox extends akka.dispatch.MailboxType { import akka.actor.{ ActorRef, ActorSystem } import com.typesafe.config.Config import java.util.concurrent.ConcurrentLinkedQueue @@ -153,8 +153,8 @@ object DispatcherDocSpec { new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { final val queue = new ConcurrentLinkedQueue[Envelope]() } - //#mailbox-implementation-example } + //#mailbox-implementation-example } class DispatcherDocSpec extends AkkaSpec(DispatcherDocSpec.config) { diff --git a/akka-docs/scala/dispatchers.rst b/akka-docs/scala/dispatchers.rst index cea9ee6e0a..5be19ad799 100644 --- a/akka-docs/scala/dispatchers.rst +++ b/akka-docs/scala/dispatchers.rst @@ -198,3 +198,23 @@ And then you just specify the FQCN of your MailboxType as the value of the "mail the configuration which describes the dispatcher using this mailbox type; the mailbox type will be instantiated once for each dispatcher using it. +Special Semantics of ``system.actorOf`` +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +In order to make ``system.actorOf`` both synchronous and non-blocking while +keeping the return type :class:`ActorRef` (and the semantics that the returned +ref is fully functional), special handling takes place for this case. Behind +the scenes, a hollow kind of actor reference is constructed, which is sent to +the system’s guardian actor who actually creates the actor and its context and +puts those inside the reference. Until that has happened, messages sent to the +:class:`ActorRef` will be queued locally, and only upon swapping the real +filling in will they be transferred into the real mailbox. Thus, + +.. code-block:: scala + + system.actorOf(...) ! "bang" + assert(bangIsInMyCustomMailbx) + +will probably fail; you will have to allow for some time to pass and retry the +check à la :meth:`TestKit.awaitCond`. + diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index dfdf25759b..cdf9ad9d70 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -36,8 +36,8 @@ private[akka] class RemoteActorRefProvider( // these are only available after init() override def rootGuardian: InternalActorRef = local.rootGuardian - override def guardian: InternalActorRef = local.guardian - override def systemGuardian: InternalActorRef = local.systemGuardian + override def guardian: LocalActorRef = local.guardian + override def systemGuardian: LocalActorRef = local.systemGuardian override def terminationFuture: Promise[Unit] = local.terminationFuture override def dispatcher: MessageDispatcher = local.dispatcher override def registerTempActor(actorRef: InternalActorRef, path: ActorPath): Unit = local.registerTempActor(actorRef, path)