diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index 173934c281..e05a51f286 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -267,8 +267,8 @@ class ActorRefSpec extends WordSpec with MustMatchers { val inetAddress = ReflectiveAccess.RemoteModule.configDefaultAddress val expectedSerializedRepresentation = SerializedActorRef( - newUuid, - "nonsense", + a.uuid, + a.address, inetAddress.getAddress.getHostAddress, inetAddress.getPort, a.timeout) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 00ff5c7817..b382dc3045 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -317,8 +317,12 @@ abstract class ActorModelSpec extends JUnitSuite { assertCountDown(bParallel, Testing.testTime(3000), "Should process other actors in parallel") aStop.countDown() - a.stop() - b.stop() + + a.stop + b.stop + + while (a.isRunning && b.isRunning) {} //Busy wait for termination + assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1) assertRefDefaultZero(b)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala index 9e697c3af4..b90af71f74 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala @@ -80,8 +80,8 @@ class BalancingDispatcherSpec extends JUnitSuite with MustMatchers { } finishedCounter.await(5, TimeUnit.SECONDS) - fast.underlying.mailbox.asInstanceOf[Mailbox].isEmpty must be(true) - slow.underlying.mailbox.asInstanceOf[Mailbox].isEmpty must be(true) + fast.underlying.mailbox.asInstanceOf[Mailbox].hasMessages must be(false) + slow.underlying.mailbox.asInstanceOf[Mailbox].hasMessages must be(false) fast.underlyingActorInstance.asInstanceOf[DelayableActor].invocationCount must be > sentToFast fast.underlyingActorInstance.asInstanceOf[DelayableActor].invocationCount must be > (slow.underlyingActorInstance.asInstanceOf[DelayableActor].invocationCount) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index a4625fe648..130881b3a0 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -41,16 +41,16 @@ abstract class MailboxSpec extends WordSpec with MustMatchers with BeforeAndAfte for (i ← 1 to config.capacity) q.enqueue(exampleMessage) - q.size must be === config.capacity - q.isEmpty must be === false + q.numberOfMessages must be === config.capacity + q.hasMessages must be === true intercept[MessageQueueAppendFailedException] { q.enqueue(exampleMessage) } q.dequeue must be === exampleMessage - q.size must be(config.capacity - 1) - q.isEmpty must be === false + q.numberOfMessages must be(config.capacity - 1) + q.hasMessages must be === true } "dequeue what was enqueued properly for unbounded mailboxes" in { @@ -97,8 +97,8 @@ abstract class MailboxSpec extends WordSpec with MustMatchers with BeforeAndAfte } case _ ⇒ } - q.size must be === 0 - q.isEmpty must be === true + q.numberOfMessages must be === 0 + q.hasMessages must be === false } def testEnqueueDequeue(config: MailboxType) { @@ -119,7 +119,7 @@ abstract class MailboxSpec extends WordSpec with MustMatchers with BeforeAndAfte def createConsumer: Future[Vector[Envelope]] = spawn { var r = Vector[Envelope]() - while (producers.exists(_.isCompleted == false) || !q.isEmpty) { + while (producers.exists(_.isCompleted == false) || q.hasMessages) { q.dequeue match { case null ⇒ case message ⇒ r = r :+ message diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index ba09de903b..5cc7d5efe2 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -264,7 +264,7 @@ class LocalActorRef private[akka] ( protected[akka] def underlyingActorInstance: Actor = { var instance = actorCell.actor.get - while (instance eq null) { + while ((instance eq null) && actorCell.isRunning) { try { Thread.sleep(1) } catch { case i: InterruptedException ⇒ } instance = actorCell.actor.get } diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 88b7aeb184..c81b7eb510 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -65,7 +65,7 @@ class BalancingDispatcher( override protected[akka] def dispatch(invocation: Envelope) = { val mbox = invocation.receiver.mailbox - if (donationInProgress.value == false && (!mbox.isEmpty || mbox.dispatcherLock.locked) && attemptDonationOf(invocation, mbox)) { + if (donationInProgress.value == false && (mbox.hasMessages || mbox.dispatcherLock.locked) && attemptDonationOf(invocation, mbox)) { //We were busy and we got to donate the message to some other lucky guy, we're done here } else { mbox enqueue invocation @@ -79,7 +79,7 @@ class BalancingDispatcher( while (donateFrom(mbox)) {} //When we reregister, first donate messages to another actor } finally { donationInProgress.value = false } - if (!mbox.isEmpty) //If we still have messages left to process, reschedule for execution + if (mbox.hasMessages) //If we still have messages left to process, reschedule for execution super.reRegisterForExecution(mbox) } @@ -131,10 +131,11 @@ class BalancingDispatcher( while ((i < prSz) && (recipient eq null)) { val actor = potentialRecipients((i + startIndex) % prSz) //Wrap-around, one full lap - val mbox = actor.mailbox - - if ((mbox ne donorMbox) && mbox.isEmpty) { //Don't donate to yourself - recipient = actor //Found! + actor.mailbox match { + case `donorMbox` | `deadLetterMailbox` ⇒ //Not interesting + case mbox ⇒ + if (!mbox.hasMessages) //Don't donate to yourself + recipient = actor //Found! } i += 1 diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 9f7ee83c06..bf3a37c257 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -128,7 +128,7 @@ class Dispatcher( protected[akka] def registerForExecution(mbox: Mailbox): Unit = { if (mbox.dispatcherLock.tryLock()) { - if (active.isOn && (!mbox.suspended.locked || !mbox.systemMessages.isEmpty)) { //If the dispatcher is active and the actor not suspended + if (active.isOn && (!mbox.suspended.locked || mbox.hasSystemMessages)) { //If the dispatcher is active and the actor not suspended try { executorService.get() execute mbox } catch { @@ -148,7 +148,7 @@ class Dispatcher( protected override def cleanUpMailboxFor(actor: ActorCell) { val m = actor.mailbox - if (!m.isEmpty) { + if (m.hasMessages) { var invocation = m.dequeue lazy val exception = new ActorKilledException("Actor has been stopped") while (invocation ne null) { @@ -156,6 +156,9 @@ class Dispatcher( invocation = m.dequeue } } + while (m.systemDequeue() ne null) { + //Empty the system messages + } } override val toString = getClass.getSimpleName + "[" + name + "]" diff --git a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala index edb04c9a4d..006065047a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MailboxHandling.scala @@ -6,10 +6,10 @@ package akka.dispatch import akka.AkkaException import java.util.{ Comparator, PriorityQueue } -import java.util.concurrent._ import akka.util._ import java.util.Queue import akka.actor.ActorContext +import java.util.concurrent._ class MessageQueueAppendFailedException(message: String, cause: Throwable = null) extends AkkaException(message, cause) @@ -22,19 +22,13 @@ trait Mailbox extends Runnable { */ final val dispatcherLock = new SimpleLock(startLocked = false) final val suspended = new SimpleLock(startLocked = false) //(startLocked = true) - final val systemMessages = new ConcurrentLinkedQueue[SystemEnvelope]() - - final def systemEnqueue(handle: SystemEnvelope): Unit = systemMessages.offer(handle) - final def systemDequeue(): SystemEnvelope = systemMessages.poll() - - def dispatcher: MessageDispatcher final def run = { try { processMailbox() } catch { case ie: InterruptedException ⇒ Thread.currentThread().interrupt() //Restore interrupt } finally { dispatcherLock.unlock() - if (!isEmpty || !systemMessages.isEmpty) + if (hasMessages || hasSystemMessages) dispatcher.reRegisterForExecution(this) } } @@ -87,8 +81,24 @@ trait Mailbox extends Runnable { def enqueue(handle: Envelope) def dequeue(): Envelope - def size: Int - def isEmpty: Boolean + def numberOfMessages: Int + + def systemEnqueue(handle: SystemEnvelope): Unit + def systemDequeue(): SystemEnvelope + + def hasMessages: Boolean + def hasSystemMessages: Boolean + + def dispatcher: MessageDispatcher +} + +trait DefaultSystemMessageImpl { self: Mailbox ⇒ + val systemMessages = new ConcurrentLinkedQueue[SystemEnvelope]() + + def systemEnqueue(handle: SystemEnvelope): Unit = systemMessages offer handle + def systemDequeue(): SystemEnvelope = systemMessages.poll() + + def hasSystemMessages: Boolean = !systemMessages.isEmpty } trait UnboundedMessageQueueSemantics { self: QueueMailbox ⇒ @@ -114,8 +124,8 @@ trait BoundedMessageQueueSemantics { self: BlockingQueueMailbox ⇒ trait QueueMailbox extends Mailbox { val queue: Queue[Envelope] - final def size = queue.size - final def isEmpty = queue.isEmpty + final def numberOfMessages = queue.size + final def hasMessages = !queue.isEmpty } trait BlockingQueueMailbox extends QueueMailbox { @@ -142,7 +152,7 @@ trait MailboxType { } case class UnboundedMailbox() extends MailboxType { - override def create(dispatcher: MessageDispatcher) = new ConcurrentLinkedQueueMailbox(dispatcher) with UnboundedMessageQueueSemantics + override def create(dispatcher: MessageDispatcher) = new ConcurrentLinkedQueueMailbox(dispatcher) with UnboundedMessageQueueSemantics with DefaultSystemMessageImpl } case class BoundedMailbox( @@ -152,14 +162,14 @@ case class BoundedMailbox( if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") - override def create(dispatcher: MessageDispatcher) = new LinkedBlockingQueueMailbox(dispatcher) with BoundedMessageQueueSemantics { + override def create(dispatcher: MessageDispatcher) = new LinkedBlockingQueueMailbox(dispatcher) with BoundedMessageQueueSemantics with DefaultSystemMessageImpl { val capacity = BoundedMailbox.this.capacity val pushTimeOut = BoundedMailbox.this.pushTimeOut } } case class UnboundedPriorityMailbox(cmp: Comparator[Envelope]) extends MailboxType { - override def create(dispatcher: MessageDispatcher) = new PriorityBlockingQueueMailbox(cmp, dispatcher) with UnboundedMessageQueueSemantics + override def create(dispatcher: MessageDispatcher) = new PriorityBlockingQueueMailbox(cmp, dispatcher) with UnboundedMessageQueueSemantics with DefaultSystemMessageImpl } case class BoundedPriorityMailbox( @@ -170,7 +180,7 @@ case class BoundedPriorityMailbox( if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") - override def create(dispatcher: MessageDispatcher) = new PriorityBlockingQueueMailbox(cmp, dispatcher) with BoundedMessageQueueSemantics { + override def create(dispatcher: MessageDispatcher) = new PriorityBlockingQueueMailbox(cmp, dispatcher) with BoundedMessageQueueSemantics with DefaultSystemMessageImpl { val capacity = BoundedPriorityMailbox.this.capacity val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut } diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index 2586df3ec1..372ff9a5f2 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -78,12 +78,17 @@ abstract class MessageDispatcher extends Serializable { /** * Create a blackhole mailbox for the purpose of replacing the real one upon actor termination */ - protected[akka] def createDeadletterMailbox = new Mailbox { - override def dispatcher = MessageDispatcher.this - override def enqueue(envelope: Envelope) {} + protected[akka] val deadLetterMailbox = new Mailbox { + override def dispatcher = null //MessageDispatcher.this + dispatcherLock.tryLock() + + override def enqueue(envelope: Envelope) { envelope.channel sendException new ActorKilledException("Actor has been stopped") } override def dequeue() = null - override def isEmpty = true - override def size = 0 + override def systemEnqueue(handle: SystemEnvelope): Unit = () + override def systemDequeue(): SystemEnvelope = null + override def hasMessages = false + override def hasSystemMessages = false + override def numberOfMessages = 0 } /** @@ -170,7 +175,7 @@ abstract class MessageDispatcher extends Serializable { protected[akka] def unregister(actor: ActorCell) = { if (uuids remove actor.uuid) { cleanUpMailboxFor(actor) - actor.mailbox = createDeadletterMailbox + actor.mailbox = deadLetterMailbox if (uuids.isEmpty && _tasks.get == 0) { shutdownSchedule match { case UNSCHEDULED ⇒ @@ -274,12 +279,12 @@ abstract class MessageDispatcher extends Serializable { /** * Returns the size of the mailbox for the specified actor */ - def mailboxSize(actor: ActorCell): Int = actor.mailbox.size + def mailboxSize(actor: ActorCell): Int = actor.mailbox.numberOfMessages /** * Returns the "current" emptiness status of the mailbox for the specified actor */ - def mailboxIsEmpty(actor: ActorCell): Boolean = actor.mailbox.isEmpty + def mailboxIsEmpty(actor: ActorCell): Boolean = actor.mailbox.hasMessages /** * Returns the amount of tasks queued for execution diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index e5c75a1940..aca3852e66 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -243,7 +243,7 @@ class NestingQueue { def isActive = active } -class CallingThreadMailbox(val dispatcher: MessageDispatcher) extends Mailbox { +class CallingThreadMailbox(val dispatcher: MessageDispatcher) extends Mailbox with DefaultSystemMessageImpl { private val q = new ThreadLocal[NestingQueue]() { override def initialValue = new NestingQueue @@ -256,6 +256,6 @@ class CallingThreadMailbox(val dispatcher: MessageDispatcher) extends Mailbox { override def enqueue(msg: Envelope) {} override def dequeue() = null - override def isEmpty = true - override def size = 0 + override def hasMessages = true + override def numberOfMessages = 0 }