diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index b382dc3045..a39475ba8d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -194,18 +194,18 @@ object ActorModelSpec { assert(stats.restarts.get() === restarts, "Restarts") } - def await(condition: ⇒ Boolean)(withinMs: Long, intervalMs: Long = 25): Boolean = try { + def await(condition: ⇒ Boolean)(withinMs: Long, intervalMs: Long = 25): Unit = try { val until = System.currentTimeMillis() + withinMs while (System.currentTimeMillis() <= until) { try { - if (condition) return true + if (condition) return Thread.sleep(intervalMs) } catch { case e: InterruptedException ⇒ } } - false + assert(0 === 1, "await failed") } def newTestActor(implicit d: MessageDispatcherInterceptor) = actorOf(Props[DispatcherActor].withDispatcher(d)) diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index bf3a37c257..362bb45382 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -148,6 +148,7 @@ class Dispatcher( protected override def cleanUpMailboxFor(actor: ActorCell) { val m = actor.mailbox + actor.mailbox = deadLetterMailbox //FIXME switch to getAndSet semantics if (m.hasMessages) { var invocation = m.dequeue lazy val exception = new ActorKilledException("Actor has been stopped") diff --git a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala index 006065047a..89282909d8 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala @@ -123,25 +123,17 @@ trait BoundedMessageQueueSemantics { self: BlockingQueueMailbox ⇒ } trait QueueMailbox extends Mailbox { - val queue: Queue[Envelope] + def queue: Queue[Envelope] final def numberOfMessages = queue.size final def hasMessages = !queue.isEmpty } -trait BlockingQueueMailbox extends QueueMailbox { - val queue: BlockingQueue[Envelope] +abstract class NonblockingQueueMailbox(val dispatcher: MessageDispatcher) extends QueueMailbox { + def queue: Queue[Envelope] } -abstract class PriorityBlockingQueueMailbox(cmp: Comparator[Envelope], val dispatcher: MessageDispatcher) extends BlockingQueueMailbox { - override val queue = new PriorityBlockingQueue(11, cmp) // 11 is the default initial capacity in PriorityQueue.java -} - -abstract class ConcurrentLinkedQueueMailbox(val dispatcher: MessageDispatcher) extends QueueMailbox { - override val queue = new ConcurrentLinkedQueue[Envelope] -} - -abstract class LinkedBlockingQueueMailbox(val dispatcher: MessageDispatcher) extends BlockingQueueMailbox { - override val queue = new LinkedBlockingQueue[Envelope] +abstract class BlockingQueueMailbox(val dispatcher: MessageDispatcher) extends QueueMailbox { + def queue: BlockingQueue[Envelope] } /** @@ -152,7 +144,9 @@ trait MailboxType { } case class UnboundedMailbox() extends MailboxType { - override def create(dispatcher: MessageDispatcher) = new ConcurrentLinkedQueueMailbox(dispatcher) with UnboundedMessageQueueSemantics with DefaultSystemMessageImpl + override def create(dispatcher: MessageDispatcher) = new NonblockingQueueMailbox(dispatcher) with UnboundedMessageQueueSemantics with DefaultSystemMessageImpl { + val queue = new ConcurrentLinkedQueue[Envelope]() + } } case class BoundedMailbox( @@ -162,14 +156,16 @@ case class BoundedMailbox( if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") - override def create(dispatcher: MessageDispatcher) = new LinkedBlockingQueueMailbox(dispatcher) with BoundedMessageQueueSemantics with DefaultSystemMessageImpl { - val capacity = BoundedMailbox.this.capacity + override def create(dispatcher: MessageDispatcher) = new BlockingQueueMailbox(dispatcher) with BoundedMessageQueueSemantics with DefaultSystemMessageImpl { + val queue = new LinkedBlockingQueue[Envelope](capacity) val pushTimeOut = BoundedMailbox.this.pushTimeOut } } case class UnboundedPriorityMailbox(cmp: Comparator[Envelope]) extends MailboxType { - override def create(dispatcher: MessageDispatcher) = new PriorityBlockingQueueMailbox(cmp, dispatcher) with UnboundedMessageQueueSemantics with DefaultSystemMessageImpl + override def create(dispatcher: MessageDispatcher) = new BlockingQueueMailbox(dispatcher) with UnboundedMessageQueueSemantics with DefaultSystemMessageImpl { + val queue = new PriorityBlockingQueue[Envelope](11, cmp) + } } case class BoundedPriorityMailbox( @@ -180,8 +176,8 @@ case class BoundedPriorityMailbox( if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") - override def create(dispatcher: MessageDispatcher) = new PriorityBlockingQueueMailbox(cmp, dispatcher) with BoundedMessageQueueSemantics with DefaultSystemMessageImpl { - val capacity = BoundedPriorityMailbox.this.capacity + override def create(dispatcher: MessageDispatcher) = new BlockingQueueMailbox(dispatcher) with BoundedMessageQueueSemantics with DefaultSystemMessageImpl { + val queue = new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp)) val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut } } diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index ba75df13e0..b64541cd17 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -175,7 +175,6 @@ abstract class MessageDispatcher extends Serializable { protected[akka] def unregister(actor: ActorCell) = { if (uuids remove actor.uuid) { cleanUpMailboxFor(actor) - actor.mailbox = deadLetterMailbox if (uuids.isEmpty && _tasks.get == 0) { shutdownSchedule match { case UNSCHEDULED ⇒ diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableDispatcher.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableDispatcher.scala index dc5b0637b7..4d653b557d 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableDispatcher.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableDispatcher.scala @@ -95,8 +95,6 @@ case class DurableDispatcher( throw new IllegalArgumentException("Durable mailboxes do not support Future-based messages from ?") super.dispatch(invocation) } - - protected override def cleanUpMailboxFor(actorRef: LocalActorRef) {} //No need to clean up Futures since we don't support them } /**