diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 7e8d6ae929..5181776db1 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -146,8 +146,8 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn class DefaultMailboxSpec extends MailboxSpec { lazy val name = "The default mailbox implementation" def factory = { - case u: UnboundedMailbox ⇒ u.create(null) - case b: BoundedMailbox ⇒ b.create(null) + case u: UnboundedMailbox ⇒ u.create(null, null) + case b: BoundedMailbox ⇒ b.create(null, null) } } @@ -155,7 +155,7 @@ class PriorityMailboxSpec extends MailboxSpec { val comparator = PriorityGenerator(_.##) lazy val name = "The priority mailbox implementation" def factory = { - case UnboundedMailbox() ⇒ UnboundedPriorityMailbox(comparator).create(null) - case BoundedMailbox(capacity, pushTimeOut) ⇒ BoundedPriorityMailbox(comparator, capacity, pushTimeOut).create(null) + case UnboundedMailbox() ⇒ UnboundedPriorityMailbox(comparator).create(null, null) + case BoundedMailbox(capacity, pushTimeOut) ⇒ BoundedPriorityMailbox(comparator, capacity, pushTimeOut).create(null, null) } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 23014cd884..f010e06045 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -245,7 +245,7 @@ private[akka] class ActorCell( if (props.supervisor.isDefined) { props.supervisor.get match { case l: LocalActorRef ⇒ - l.underlying.dispatcher.systemDispatch(SystemEnvelope(l.underlying, akka.dispatch.Supervise(self), NullChannel)) + l.underlying.dispatcher.systemDispatch(l.underlying, akka.dispatch.Supervise(self)) case other ⇒ throw new UnsupportedOperationException("Supervision failure: " + other + " cannot be a supervisor, only LocalActorRefs can") } } @@ -253,20 +253,20 @@ private[akka] class ActorCell( dispatcher.attach(this) } - def suspend(): Unit = dispatcher.systemDispatch(SystemEnvelope(this, Suspend, NullChannel)) + def suspend(): Unit = dispatcher.systemDispatch(this, Suspend()) - def resume(): Unit = dispatcher.systemDispatch(SystemEnvelope(this, Resume, NullChannel)) + def resume(): Unit = dispatcher.systemDispatch(this, Resume()) private[akka] def stop(): Unit = - dispatcher.systemDispatch(SystemEnvelope(this, Terminate, NullChannel)) + dispatcher.systemDispatch(this, Terminate()) def startsMonitoring(subject: ActorRef): ActorRef = { - dispatcher.systemDispatch(SystemEnvelope(this, Link(subject), NullChannel)) + dispatcher.systemDispatch(this, Link(subject)) subject } def stopsMonitoring(subject: ActorRef): ActorRef = { - dispatcher.systemDispatch(SystemEnvelope(this, Unlink(subject), NullChannel)) + dispatcher.systemDispatch(this, Unlink(subject)) subject } @@ -324,7 +324,7 @@ private[akka] class ActorCell( } } - def systemInvoke(envelope: SystemEnvelope) { + def systemInvoke(message: SystemMessage) { def create(): Unit = try { val created = newActor() @@ -337,7 +337,6 @@ private[akka] class ActorCell( app.eventHandler.error(e, self, "error while creating actor") // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) - envelope.channel.sendException(e) } finally { if (supervisor.isDefined) supervisor.get ! Failed(self, e) else self.stop() } @@ -369,7 +368,6 @@ private[akka] class ActorCell( app.eventHandler.error(e, self, "error while creating actor") // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) - envelope.channel.sendException(e) } finally { if (supervisor.isDefined) supervisor.get ! Failed(self, e) else self.stop() } @@ -424,24 +422,24 @@ private[akka] class ActorCell( try { val isClosed = mailbox.isClosed //Fence plus volatile read if (!isClosed) { - envelope.message match { - case Create ⇒ create() - case Recreate(cause) ⇒ recreate(cause) - case Link(subject) ⇒ + message match { + case Create(_) ⇒ create() + case Recreate(cause, _) ⇒ recreate(cause) + case Link(subject, _) ⇒ app.deathWatch.subscribe(self, subject) if (app.AkkaConfig.DebugLifecycle) app.eventHandler.debug(self, "now monitoring " + subject) - case Unlink(subject) ⇒ + case Unlink(subject, _) ⇒ app.deathWatch.unsubscribe(self, subject) if (app.AkkaConfig.DebugLifecycle) app.eventHandler.debug(self, "stopped monitoring " + subject) - case Suspend ⇒ suspend() - case Resume ⇒ resume() - case Terminate ⇒ terminate() - case Supervise(child) ⇒ supervise(child) + case Suspend(_) ⇒ suspend() + case Resume(_) ⇒ resume() + case Terminate(_) ⇒ terminate() + case Supervise(child, _) ⇒ supervise(child) } } } catch { case e ⇒ //Should we really catch everything here? - app.eventHandler.error(e, self, "error while processing " + envelope.message) + app.eventHandler.error(e, self, "error while processing " + message) //TODO FIXME How should problems here be handled? throw e } @@ -495,7 +493,7 @@ private[akka] class ActorCell( def handleChildTerminated(child: ActorRef): Unit = _children = props.faultHandler.handleChildTerminated(child, _children) - def restart(cause: Throwable): Unit = dispatcher.systemDispatch(SystemEnvelope(this, Recreate(cause), NullChannel)) + def restart(cause: Throwable): Unit = dispatcher.systemDispatch(this, Recreate(cause)) def checkReceiveTimeout() { cancelReceiveTimeout() diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index c7e7fab35a..279543bb23 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -24,25 +24,17 @@ final case class Envelope(val receiver: ActorCell, val message: Any, val channel } } -sealed trait SystemMessage extends PossiblyHarmful -case object Create extends SystemMessage -case class Recreate(cause: Throwable) extends SystemMessage -case object Suspend extends SystemMessage -case object Resume extends SystemMessage -case object Terminate extends SystemMessage -case class Supervise(child: ActorRef) extends SystemMessage -case class Link(subject: ActorRef) extends SystemMessage -case class Unlink(subject: ActorRef) extends SystemMessage - -final case class SystemEnvelope(val receiver: ActorCell, val message: SystemMessage, val channel: UntypedChannel) { - if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null") - /** - * @return whether to proceed with processing other messages - */ - final def invoke() { - receiver systemInvoke this - } +sealed trait SystemMessage extends PossiblyHarmful { + def next: SystemMessage } +case class Create(next: SystemMessage = null) extends SystemMessage +case class Recreate(cause: Throwable, next: SystemMessage = null) extends SystemMessage +case class Suspend(next: SystemMessage = null) extends SystemMessage +case class Resume(next: SystemMessage = null) extends SystemMessage +case class Terminate(next: SystemMessage = null) extends SystemMessage +case class Supervise(child: ActorRef, next: SystemMessage = null) extends SystemMessage +case class Link(subject: ActorRef, next: SystemMessage = null) extends SystemMessage +case class Unlink(subject: ActorRef, next: SystemMessage = null) extends SystemMessage final case class TaskInvocation(app: AkkaApplication, function: () ⇒ Unit, cleanup: () ⇒ Unit) extends Runnable { def run() { @@ -87,13 +79,13 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable */ protected[akka] val deadLetterMailbox: Mailbox = DeadLetterMailbox - object DeadLetterMailbox extends Mailbox { + object DeadLetterMailbox extends Mailbox(null) { becomeClosed() override def dispatcher = null //MessageDispatcher.this override def enqueue(envelope: Envelope) { envelope.channel sendException new ActorKilledException("Actor has been stopped") } override def dequeue() = null - override def systemEnqueue(handle: SystemEnvelope): Unit = () - override def systemDequeue(): SystemEnvelope = null + override def systemEnqueue(handle: SystemMessage): Unit = () + override def systemDequeue(): SystemMessage = null override def hasMessages = false override def hasSystemMessages = false override def numberOfMessages = 0 @@ -174,7 +166,7 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable */ protected[akka] def register(actor: ActorCell) { if (uuids add actor.uuid) { - systemDispatch(SystemEnvelope(actor, Create, NullChannel)) //FIXME should this be here or moved into ActorCell.start perhaps? + systemDispatch(actor, Create()) //FIXME should this be here or moved into ActorCell.start perhaps? } else System.err.println("Couldn't register: " + actor) } @@ -258,7 +250,7 @@ abstract class MessageDispatcher(val app: AkkaApplication) extends Serializable /** * Will be called when the dispatcher is to queue an invocation for execution */ - protected[akka] def systemDispatch(invocation: SystemEnvelope) + protected[akka] def systemDispatch(receiver: ActorCell, invocation: SystemMessage) /** * Will be called when the dispatcher is to queue an invocation for execution diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index e5c5cf198b..551eea21ce 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -54,7 +54,7 @@ class BalancingDispatcher( protected[akka] override def createMailbox(actor: ActorCell): Mailbox = new SharingMailbox(actor) - class SharingMailbox(val actor: ActorCell) extends Mailbox with DefaultSystemMessageQueue { + class SharingMailbox(_actor: ActorCell) extends Mailbox(_actor) with DefaultSystemMessageQueue { final def enqueue(handle: Envelope) = messageQueue.enqueue(handle) final def dequeue(): Envelope = { diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index fd3dd697da..53d28a79a3 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -84,8 +84,8 @@ class Dispatcher( registerForExecution(mbox, true, false) } - protected[akka] def systemDispatch(invocation: SystemEnvelope) = { - val mbox = invocation.receiver.mailbox + protected[akka] def systemDispatch(receiver: ActorCell, invocation: SystemMessage) = { + val mbox = receiver.mailbox mbox systemEnqueue invocation registerForExecution(mbox, false, true) } @@ -100,7 +100,7 @@ class Dispatcher( } } - protected[akka] def createMailbox(actor: ActorCell): Mailbox = mailboxType.create(this) + protected[akka] def createMailbox(actor: ActorCell): Mailbox = mailboxType.create(this, actor) protected[akka] def start {} diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 5e21d5ea87..095a0f9cd5 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -8,7 +8,7 @@ import akka.AkkaException import java.util.{ Comparator, PriorityQueue } import akka.util._ import java.util.Queue -import akka.actor.ActorContext +import akka.actor.{ ActorContext, ActorCell } import java.util.concurrent._ import atomic.{ AtomicInteger, AtomicReferenceFieldUpdater } import annotation.tailrec @@ -34,7 +34,7 @@ private[dispatch] object Mailbox { /** * @author Jonas Bonér */ -abstract class Mailbox extends AbstractMailbox with MessageQueue with SystemMessageQueue with Runnable { +abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with MessageQueue with SystemMessageQueue with Runnable { import Mailbox._ @inline @@ -186,7 +186,7 @@ abstract class Mailbox extends AbstractMailbox with MessageQueue with SystemMess def processAllSystemMessages() { var nextMessage = systemDequeue() while (nextMessage ne null) { - nextMessage.invoke() + actor systemInvoke nextMessage nextMessage = systemDequeue() } } @@ -208,19 +208,20 @@ trait MessageQueue { } trait SystemMessageQueue { - def systemEnqueue(handle: SystemEnvelope): Unit + def systemEnqueue(message: SystemMessage): Unit - def systemDequeue(): SystemEnvelope + def systemDequeue(): SystemMessage def hasSystemMessages: Boolean } trait DefaultSystemMessageQueue { self: SystemMessageQueue ⇒ - final val systemMessages = new ConcurrentLinkedQueue[SystemEnvelope]() - def systemEnqueue(handle: SystemEnvelope): Unit = systemMessages offer handle + final val systemMessages = new ConcurrentLinkedQueue[SystemMessage]() - def systemDequeue(): SystemEnvelope = systemMessages.poll() + def systemEnqueue(message: SystemMessage): Unit = systemMessages offer message + + def systemDequeue(): SystemMessage = systemMessages.poll() def hasSystemMessages: Boolean = !systemMessages.isEmpty } @@ -255,17 +256,18 @@ trait QueueBasedMessageQueue extends MessageQueue { * Mailbox configuration. */ trait MailboxType { - def create(dispatcher: MessageDispatcher): Mailbox + def create(dispatcher: MessageDispatcher, receiver: ActorCell): Mailbox } /** * It's a case class for Java (new UnboundedMailbox) */ case class UnboundedMailbox() extends MailboxType { - override def create(_dispatcher: MessageDispatcher) = new Mailbox with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { - final val queue = new ConcurrentLinkedQueue[Envelope]() - final val dispatcher = _dispatcher - } + override def create(_dispatcher: MessageDispatcher, receiver: ActorCell) = + new Mailbox(receiver) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { + final val queue = new ConcurrentLinkedQueue[Envelope]() + final val dispatcher = _dispatcher + } } case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType { @@ -273,18 +275,20 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") - override def create(_dispatcher: MessageDispatcher) = new Mailbox with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { - final val queue = new LinkedBlockingQueue[Envelope](capacity) - final val pushTimeOut = BoundedMailbox.this.pushTimeOut - final val dispatcher = _dispatcher - } + override def create(_dispatcher: MessageDispatcher, receiver: ActorCell) = + new Mailbox(receiver) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { + final val queue = new LinkedBlockingQueue[Envelope](capacity) + final val pushTimeOut = BoundedMailbox.this.pushTimeOut + final val dispatcher = _dispatcher + } } case class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope]) extends MailboxType { - override def create(_dispatcher: MessageDispatcher) = new Mailbox with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { - final val queue = new PriorityBlockingQueue[Envelope](11, cmp) - final val dispatcher = _dispatcher - } + override def create(_dispatcher: MessageDispatcher, receiver: ActorCell) = + new Mailbox(receiver) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { + final val queue = new PriorityBlockingQueue[Envelope](11, cmp) + final val dispatcher = _dispatcher + } } case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType { @@ -292,10 +296,11 @@ case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final va if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") - override def create(_dispatcher: MessageDispatcher) = new Mailbox with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { - final val queue = new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp)) - final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut - final val dispatcher = _dispatcher - } + override def create(_dispatcher: MessageDispatcher, receiver: ActorCell) = + new Mailbox(receiver) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { + final val queue = new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp)) + final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut + final val dispatcher = _dispatcher + } } diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 9cf9992809..829cce72c3 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -106,7 +106,7 @@ object CallingThreadDispatcher { class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling-thread", val warnings: Boolean = true) extends MessageDispatcher(_app) { import CallingThreadDispatcher._ - protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(this) + protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(this, actor) private def getMailbox(actor: ActorCell) = actor.mailbox.asInstanceOf[CallingThreadMailbox] @@ -140,11 +140,11 @@ class CallingThreadDispatcher(_app: AkkaApplication, val name: String = "calling override def mailboxIsEmpty(actor: ActorCell): Boolean = getMailbox(actor).queue.isEmpty - protected[akka] override def systemDispatch(handle: SystemEnvelope) { - val mbox = getMailbox(handle.receiver) + protected[akka] override def systemDispatch(receiver: ActorCell, message: SystemMessage) { + val mbox = getMailbox(receiver) mbox.lock.lock try { - handle.invoke() + receiver systemInvoke message } finally { mbox.lock.unlock } @@ -241,7 +241,7 @@ class NestingQueue { def isActive = active } -class CallingThreadMailbox(val dispatcher: MessageDispatcher) extends Mailbox with DefaultSystemMessageQueue { +class CallingThreadMailbox(val dispatcher: MessageDispatcher, _receiver: ActorCell) extends Mailbox(_receiver) with DefaultSystemMessageQueue { private val q = new ThreadLocal[NestingQueue]() { override def initialValue = new NestingQueue