make system.actorOf() non-blocking (and working), see #2031

- introducing RepointableActorRef, which starts out with an
  UnstartedActorCell which can cheaply be created; the Supervise()
  message will trigger child.activate() in the supervisor, which means
  that the actual creation (now with normal ActorCell) happens exactly
  in the right place and with the right semantics. Messages which were
  enqueued to the dummy cell are transferred atomically into the
  ActorCell (using normal .tell()), so message sends keep working
  exactly as they used to
- this enables getting rid of the brittle synchronization around
  RoutedActorRef by replacing that one with a RepointableActorRef
  subclass which creates RoutedActorCells upon activate(), with the nice
  benefit that there is no hurry then to get it right because the new
  cell is constructed “on the side”

misc fixes:

- InvalidMessageException is now actually enforced when trying to send
  “null”
- Mailboxes may be created without having an ActorCell, which can come
  in handy later, because the cell is only needed when this mailbox is
  going to be scheduled on some executor
- remove occurrences of Props(), which is equivalent to Props[Nothing],
  which is equivalent to «bug»
- add test case which verifies that context.actorOf is still synchronous
- plus all the stuff I have forgotten.
This commit is contained in:
Roland 2012-06-13 17:57:56 +02:00
parent 8cd11550fa
commit b60210362e
37 changed files with 772 additions and 310 deletions

View file

@ -6,7 +6,7 @@ package akka.dispatch
import akka.AkkaException
import java.util.{ Comparator, PriorityQueue, Queue, Deque }
import akka.util._
import akka.actor.{ ActorCell, ActorRef }
import akka.actor.{ ActorCell, ActorRef, Cell }
import java.util.concurrent._
import annotation.tailrec
import akka.event.Logging.Error
@ -48,11 +48,32 @@ private[akka] object Mailbox {
*
* INTERNAL API
*/
private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: MessageQueue)
private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
extends SystemMessageQueue with Runnable {
import Mailbox._
/*
* This is needed for actually executing the mailbox, i.e. invoking the
* ActorCell. There are situations (e.g. RepointableActorRef) where a Mailbox
* is constructed but we know that we will not execute it, in which case this
* will be null. It must be a var to support switching into an active
* mailbox, should the owning ActorRef turn local.
*
* ANOTHER THING, IMPORTANT:
*
* actorCell.start() publishes actorCell & self to the dispatcher, which
* means that messages may be processed theoretically before selfs constructor
* ends. The JMM guarantees visibility for final fields only after the end
* of the constructor, so safe publication requires that THIS WRITE BELOW
* stay as it is.
*/
@volatile
var actor: ActorCell = _
def setActor(cell: ActorCell): Unit = actor = cell
def dispatcher: MessageDispatcher = actor.dispatcher
/**
* Try to enqueue the message to this queue, or throw an exception.
*/
@ -237,11 +258,12 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes
* if we closed the mailbox, we must dump the remaining system messages
* to deadLetters (this is essential for DeathWatch)
*/
val dlm = actor.systemImpl.deadLetterMailbox
while (nextMessage ne null) {
val msg = nextMessage
nextMessage = nextMessage.next
msg.next = null
try actor.systemImpl.deadLetterMailbox.systemEnqueue(actor.self, msg)
try dlm.systemEnqueue(actor.self, msg)
catch {
case NonFatal(e) actor.system.eventStream.publish(
Error(e, actor.self.path.toString, this.getClass, "error while enqueuing " + msg + " to deadLetters: " + e.getMessage))
@ -251,9 +273,6 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes
if (failure ne null) actor.handleInvokeFailure(failure, failure.getMessage)
}
@inline
final def dispatcher: MessageDispatcher = actor.dispatcher
/**
* Overridable callback to clean up the mailbox,
* called when an actor is unregistered.
@ -272,7 +291,7 @@ private[akka] abstract class Mailbox(val actor: ActorCell, val messageQueue: Mes
}
if (messageQueue ne null) // needed for CallingThreadDispatcher, which never calls Mailbox.run()
messageQueue.cleanUp(actor, actor.systemImpl.deadLetterQueue)
messageQueue.cleanUp(actor.self, actor.systemImpl.deadLetterQueue)
}
}
@ -310,7 +329,7 @@ trait MessageQueue {
* which is passed in. The owner of this MessageQueue is passed in if
* available (e.g. for creating DeadLetters()), /deadletters otherwise.
*/
def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit
def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit
}
/**
@ -338,10 +357,11 @@ private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒
@tailrec
final def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit = {
assert(message.next eq null)
if (Mailbox.debug) println(actor.self + " having enqueued " + message)
if (Mailbox.debug) println(receiver + " having enqueued " + message)
val head = systemQueueGet
if (head == NoMessage) actor.system.deadLetterMailbox.systemEnqueue(receiver, message)
else {
if (head == NoMessage) {
if (actor ne null) actor.systemImpl.deadLetterMailbox.systemEnqueue(receiver, message)
} else {
/*
* this write is safely published by the compareAndSet contained within
* systemQueuePut; Intra-Thread Semantics on page 12 of the JSR133 spec
@ -373,11 +393,11 @@ trait QueueBasedMessageQueue extends MessageQueue {
def queue: Queue[Envelope]
def numberOfMessages = queue.size
def hasMessages = !queue.isEmpty
def cleanUp(owner: ActorContext, deadLetters: MessageQueue): Unit = {
def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = {
if (hasMessages) {
var envelope = dequeue
while (envelope ne null) {
deadLetters.enqueue(owner.self, envelope)
deadLetters.enqueue(owner, envelope)
envelope = dequeue
}
}
@ -459,7 +479,7 @@ trait BoundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue {
* MailboxType is a factory to create MessageQueues for an optionally provided ActorContext
*/
trait MailboxType {
def create(owner: Option[ActorContext]): MessageQueue
def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue
}
/**
@ -469,7 +489,7 @@ case class UnboundedMailbox() extends MailboxType {
def this(settings: ActorSystem.Settings, config: Config) = this()
final override def create(owner: Option[ActorContext]): MessageQueue =
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new ConcurrentLinkedQueue[Envelope]() with QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final def queue: Queue[Envelope] = this
}
@ -486,7 +506,7 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
final override def create(owner: Option[ActorContext]): MessageQueue =
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new LinkedBlockingQueue[Envelope](capacity) with QueueBasedMessageQueue with BoundedMessageQueueSemantics {
final def queue: BlockingQueue[Envelope] = this
final val pushTimeOut = BoundedMailbox.this.pushTimeOut
@ -499,7 +519,7 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat
*/
class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope], final val initialCapacity: Int) extends MailboxType {
def this(cmp: Comparator[Envelope]) = this(cmp, 11)
final override def create(owner: Option[ActorContext]): MessageQueue =
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new PriorityBlockingQueue[Envelope](initialCapacity, cmp) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final def queue: Queue[Envelope] = this
}
@ -514,7 +534,7 @@ class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val cap
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
final override def create(owner: Option[ActorContext]): MessageQueue =
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp)) with QueueBasedMessageQueue with BoundedMessageQueueSemantics {
final def queue: BlockingQueue[Envelope] = this
final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut
@ -528,7 +548,7 @@ case class UnboundedDequeBasedMailbox() extends MailboxType {
def this(settings: ActorSystem.Settings, config: Config) = this()
final override def create(owner: Option[ActorContext]): MessageQueue =
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new LinkedBlockingDeque[Envelope]() with DequeBasedMessageQueue with UnboundedDequeBasedMessageQueueSemantics {
final val queue = this
}
@ -545,7 +565,7 @@ case class BoundedDequeBasedMailbox( final val capacity: Int, final val pushTime
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedDequeBasedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedDequeBasedMailbox can not be null")
final override def create(owner: Option[ActorContext]): MessageQueue =
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new LinkedBlockingDeque[Envelope](capacity) with DequeBasedMessageQueue with BoundedDequeBasedMessageQueueSemantics {
final val queue = this
final val pushTimeOut = BoundedDequeBasedMailbox.this.pushTimeOut