diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 8d1230a3df..36093f86a0 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -353,14 +353,8 @@ abstract class ActorModelSpec extends AkkaSpec { val stopLatch = new CountDownLatch(num) val waitTime = (30 seconds).dilated.toMillis val boss = actorOf(Props(context ⇒ { - case "run" ⇒ - for (_ ← 1 to num) { - val child = context.actorOf(props) - context.self startsMonitoring child - child ! cachedMessage - } - case Terminated(child) ⇒ - stopLatch.countDown() + case "run" ⇒ for (_ ← 1 to num) (context.self startsMonitoring context.actorOf(props)) ! cachedMessage + case Terminated(child) ⇒ stopLatch.countDown() }).withDispatcher(wavesSupervisorDispatcher(dispatcher))) boss ! "run" assertCountDown(cachedMessage.latch, waitTime, "Counting down from " + num) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index c5ee2f1dfb..ea9cda5d66 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -141,8 +141,8 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn class DefaultMailboxSpec extends MailboxSpec { lazy val name = "The default mailbox implementation" def factory = { - case u: UnboundedMailbox ⇒ u.create(null, null) - case b: BoundedMailbox ⇒ b.create(null, null) + case u: UnboundedMailbox ⇒ u.create(null) + case b: BoundedMailbox ⇒ b.create(null) } } @@ -150,7 +150,7 @@ class PriorityMailboxSpec extends MailboxSpec { val comparator = PriorityGenerator(_.##) lazy val name = "The priority mailbox implementation" def factory = { - case UnboundedMailbox() ⇒ UnboundedPriorityMailbox(comparator).create(null, null) - case BoundedMailbox(capacity, pushTimeOut) ⇒ BoundedPriorityMailbox(comparator, capacity, pushTimeOut).create(null, null) + case UnboundedMailbox() ⇒ UnboundedPriorityMailbox(comparator).create(null) + case BoundedMailbox(capacity, pushTimeOut) ⇒ BoundedPriorityMailbox(comparator, capacity, pushTimeOut).create(null) } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 1ceb357c76..cd993a040c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -212,7 +212,6 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF val deadLetters = new DeadLetterActorRef(this) val deadLetterMailbox = new Mailbox(null) { becomeClosed() - override def dispatcher = null //MessageDispatcher.this override def enqueue(receiver: ActorRef, envelope: Envelope) { deadLetters ! DeadLetter(envelope.message, envelope.sender, receiver) } override def dequeue() = null override def systemEnqueue(receiver: ActorRef, handle: SystemMessage) { deadLetters ! DeadLetter(handle, receiver, receiver) } diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index b64b7ee513..c311067270 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -64,8 +64,6 @@ class BalancingDispatcher( final def numberOfMessages: Int = messageQueue.numberOfMessages final def hasMessages: Boolean = messageQueue.hasMessages - - final val dispatcher = BalancingDispatcher.this } protected[akka] override def register(actor: ActorCell) = { diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index bf12a5cafc..89c1647c5c 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -98,14 +98,15 @@ class Dispatcher( } } - protected[akka] def createMailbox(actor: ActorCell): Mailbox = mailboxType.create(this, actor) + protected[akka] def createMailbox(actor: ActorCell): Mailbox = mailboxType.create(actor) protected[akka] def start {} protected[akka] def shutdown { val old = executorService.getAndSet(new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService)) - if (old ne null) - old.shutdownNow() + if (old ne null) { + old.shutdown() + } } /** diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 007f117bc3..745bade6b8 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -4,12 +4,10 @@ package akka.dispatch import akka.AkkaException -import java.util.{ Comparator, PriorityQueue } +import java.util.{ Comparator, PriorityQueue, Queue } import akka.util._ -import java.util.Queue -import akka.actor.{ ActorContext, ActorCell, ActorRef } +import akka.actor.{ ActorCell, ActorRef } import java.util.concurrent._ -import atomic.{ AtomicInteger, AtomicReferenceFieldUpdater } import annotation.tailrec import akka.event.Logging.Error @@ -31,7 +29,7 @@ object Mailbox { final val Scheduled = 4 // mailbox debugging helper using println (see below) - // TODO take this out before release + // FIXME TODO take this out before release final val debug = false } @@ -175,7 +173,6 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag do { if (debug) println(actor.self + " processing message " + nextMessage) actor invoke nextMessage - processAllSystemMessages() //After we're done, process all system messages nextMessage = if (isActive) { // If we aren't suspended, we need to make sure we're not overstepping our boundaries @@ -210,7 +207,8 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag } } - def dispatcher: MessageDispatcher + @inline + final def dispatcher: MessageDispatcher = actor.dispatcher } trait MessageQueue { @@ -299,17 +297,16 @@ trait QueueBasedMessageQueue extends MessageQueue { * Mailbox configuration. */ trait MailboxType { - def create(dispatcher: MessageDispatcher, receiver: ActorCell): Mailbox + def create(receiver: ActorCell): Mailbox } /** * It's a case class for Java (new UnboundedMailbox) */ case class UnboundedMailbox() extends MailboxType { - override def create(_dispatcher: MessageDispatcher, receiver: ActorCell) = + override def create(receiver: ActorCell) = new Mailbox(receiver) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new ConcurrentLinkedQueue[Envelope]() - final val dispatcher = _dispatcher } } @@ -318,19 +315,17 @@ case class BoundedMailbox( final val capacity: Int, final val pushTimeOut: Durat if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") - override def create(_dispatcher: MessageDispatcher, receiver: ActorCell) = + override def create(receiver: ActorCell) = new Mailbox(receiver) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new LinkedBlockingQueue[Envelope](capacity) final val pushTimeOut = BoundedMailbox.this.pushTimeOut - final val dispatcher = _dispatcher } } case class UnboundedPriorityMailbox( final val cmp: Comparator[Envelope]) extends MailboxType { - override def create(_dispatcher: MessageDispatcher, receiver: ActorCell) = + override def create(receiver: ActorCell) = new Mailbox(receiver) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new PriorityBlockingQueue[Envelope](11, cmp) - final val dispatcher = _dispatcher } } @@ -339,11 +334,10 @@ case class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final va if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") - override def create(_dispatcher: MessageDispatcher, receiver: ActorCell) = + override def create(receiver: ActorCell) = new Mailbox(receiver) with QueueBasedMessageQueue with BoundedMessageQueueSemantics with DefaultSystemMessageQueue { final val queue = new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp)) final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut - final val dispatcher = _dispatcher } } diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 58ad446728..fc01045e79 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -106,7 +106,7 @@ private[testkit] object CallingThreadDispatcher { class CallingThreadDispatcher(_app: ActorSystem, val name: String = "calling-thread") extends MessageDispatcher(_app) { import CallingThreadDispatcher._ - protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(this, actor) + protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(actor) private def getMailbox(actor: ActorCell): Option[CallingThreadMailbox] = actor.mailbox match { case m: CallingThreadMailbox ⇒ Some(m) @@ -257,7 +257,7 @@ class NestingQueue { def isActive = active } -class CallingThreadMailbox(val dispatcher: MessageDispatcher, _receiver: ActorCell) extends Mailbox(_receiver) with DefaultSystemMessageQueue { +class CallingThreadMailbox(_receiver: ActorCell) extends Mailbox(_receiver) with DefaultSystemMessageQueue { private val q = new ThreadLocal[NestingQueue]() { override def initialValue = {