diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 8e1bc597d9..c5ee2f1dfb 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -39,13 +39,13 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn val exampleMessage = createMessageInvocation("test") - for (i ← 1 to config.capacity) q.enqueue(exampleMessage) + for (i ← 1 to config.capacity) q.enqueue(null, exampleMessage) q.numberOfMessages must be === config.capacity q.hasMessages must be === true intercept[MessageQueueAppendFailedException] { - q.enqueue(exampleMessage) + q.enqueue(null, exampleMessage) } q.dequeue must be === exampleMessage @@ -103,7 +103,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn def createProducer(fromNum: Int, toNum: Int): Future[Vector[Envelope]] = spawn { val messages = Vector() ++ (for (i ← fromNum to toNum) yield createMessageInvocation(i)) - for (i ← messages) q.enqueue(i) + for (i ← messages) q.enqueue(null, i) messages } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 3c8dd0bacc..26a3ef7eed 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -364,7 +364,7 @@ trait MinimalActorRef extends ActorRef with ScalaActorRef { throw new UnsupportedOperationException("Not supported for %s".format(getClass.getName)) } -case class DeadLetter(message: Any, sender: ActorRef) +case class DeadLetter(message: Any, sender: ActorRef, recipient: ActorRef) object DeadLetterActorRef { class SerializedDeadLetterActorRef extends Serializable { //TODO implement as Protobuf for performance? @@ -387,11 +387,13 @@ class DeadLetterActorRef(val app: ActorSystem) extends MinimalActorRef { override def isShutdown(): Boolean = true - override def tell(msg: Any, sender: ActorRef): Unit = - app.eventStream.publish(DeadLetter(msg, sender)) + override def tell(msg: Any, sender: ActorRef): Unit = msg match { + case d: DeadLetter ⇒ app.eventStream.publish(d) + case _ ⇒ app.eventStream.publish(DeadLetter(msg, sender, this)) + } override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = { - app.eventStream.publish(DeadLetter(message, this)) + app.eventStream.publish(DeadLetter(message, app.provider.dummyAskSender, this)) brokenPromise } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 7e1c881359..7a9fbf0a38 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -49,6 +49,7 @@ trait ActorRefProvider { private[akka] def terminationFuture: Future[ActorSystem.ExitStatus] + private[akka] def dummyAskSender: ActorRef } /** @@ -277,6 +278,8 @@ class LocalActorRefProvider(val app: ActorSystem) extends ActorRefProvider { a.result } } + + private[akka] val dummyAskSender = new DeadLetterActorRef(app) } class LocalDeathWatch extends DeathWatch with ActorClassification { diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 521e3d3d7e..4e18d07c38 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -9,7 +9,7 @@ import akka.event._ import akka.util.duration._ import java.net.InetAddress import com.eaio.uuid.UUID -import akka.dispatch.{ Dispatchers, Future } +import akka.dispatch.{ Dispatchers, Future, Mailbox, Envelope, SystemMessage } import akka.util.Duration import akka.util.ReflectiveAccess import akka.serialization.Serialization @@ -210,6 +210,17 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF // TODO think about memory consistency effects when doing funky stuff inside constructor val deadLetters = new DeadLetterActorRef(this) + val deadLetterMailbox = new Mailbox(null) { + becomeClosed() + override def dispatcher = null //MessageDispatcher.this + override def enqueue(receiver: ActorRef, envelope: Envelope) { deadLetters ! DeadLetter(envelope.message, envelope.sender, receiver) } + override def dequeue() = null + override def systemEnqueue(receiver: ActorRef, handle: SystemMessage) { deadLetters ! DeadLetter(handle, receiver, receiver) } + override def systemDrain(): SystemMessage = null + override def hasMessages = false + override def hasSystemMessages = false + override def numberOfMessages = 0 + } val deathWatch = provider.createDeathWatch() diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 291b8e1803..3359251658 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -100,21 +100,9 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable { protected[akka] def createMailbox(actor: ActorCell): Mailbox /** - * Create a blackhole mailbox for the purpose of replacing the real one upon actor termination + * a blackhole mailbox for the purpose of replacing the real one upon actor termination */ - protected[akka] val deadLetterMailbox: Mailbox = DeadLetterMailbox - - object DeadLetterMailbox extends Mailbox(null) { - becomeClosed() - override def dispatcher = null //MessageDispatcher.this - override def enqueue(envelope: Envelope) = () - override def dequeue() = null - override def systemEnqueue(handle: SystemMessage): Unit = () - override def systemDrain(): SystemMessage = null - override def hasMessages = false - override def hasSystemMessages = false - override def numberOfMessages = 0 - } + import app.deadLetterMailbox /** * Name of this dispatcher. @@ -225,7 +213,7 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable { // message must be “virgin” before being able to systemEnqueue again val next = message.next message.next = null - deadLetterMailbox.systemEnqueue(message) + deadLetterMailbox.systemEnqueue(actor.self, message) message = next } } @@ -233,7 +221,7 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable { if (mailBox.hasMessages) { var envelope = mailBox.dequeue while (envelope ne null) { - deadLetterMailbox.enqueue(envelope) + deadLetterMailbox.enqueue(actor.self, envelope) envelope = mailBox.dequeue } } diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 33545d551f..b64b7ee513 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -5,7 +5,7 @@ package akka.dispatch import util.DynamicVariable -import akka.actor.{ ActorCell, Actor, IllegalActorStateException } +import akka.actor.{ ActorCell, Actor, IllegalActorStateException, ActorRef } import java.util.concurrent.{ LinkedBlockingQueue, ConcurrentLinkedQueue, ConcurrentSkipListSet } import java.util.{ Comparator, Queue } import annotation.tailrec @@ -37,6 +37,8 @@ class BalancingDispatcher( _timeoutMs: Long) extends Dispatcher(_app, _name, throughput, throughputDeadlineTime, mailboxType, config, _timeoutMs) { + import app.deadLetterMailbox + private val buddies = new ConcurrentSkipListSet[ActorCell](akka.util.Helpers.IdentityHashComparator) protected val messageQueue: MessageQueue = mailboxType match { @@ -55,7 +57,7 @@ class BalancingDispatcher( protected[akka] override def createMailbox(actor: ActorCell): Mailbox = new SharingMailbox(actor) class SharingMailbox(_actor: ActorCell) extends Mailbox(_actor) with DefaultSystemMessageQueue { - final def enqueue(handle: Envelope) = messageQueue.enqueue(handle) + final def enqueue(receiver: ActorRef, handle: Envelope) = messageQueue.enqueue(receiver, handle) final def dequeue(): Envelope = messageQueue.dequeue() @@ -86,7 +88,7 @@ class BalancingDispatcher( if (mailBox.hasSystemMessages) { var messages = mailBox.systemDrain() while (messages ne null) { - deadLetterMailbox.systemEnqueue(messages) //Send to dead letter queue + deadLetterMailbox.systemEnqueue(actor.self, messages) //Send to dead letter queue messages = messages.next if (messages eq null) //Make sure that any system messages received after the current drain are also sent to the dead letter mbox messages = mailBox.systemDrain() @@ -112,7 +114,7 @@ class BalancingDispatcher( } override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { - messageQueue enqueue invocation + messageQueue.enqueue(receiver.self, invocation) intoTheFray(except = receiver) diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 065c2b4528..bf12a5cafc 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -78,13 +78,13 @@ class Dispatcher( protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { val mbox = receiver.mailbox - mbox enqueue invocation + mbox.enqueue(receiver.self, invocation) registerForExecution(mbox, true, false) } protected[akka] def systemDispatch(receiver: ActorCell, invocation: SystemMessage) = { val mbox = receiver.mailbox - mbox systemEnqueue invocation + mbox.systemEnqueue(receiver.self, invocation) registerForExecution(mbox, false, true) } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 34729914c2..741e6546e1 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -7,7 +7,7 @@ import akka.AkkaException import java.util.{ Comparator, PriorityQueue } import akka.util._ import java.util.Queue -import akka.actor.{ ActorContext, ActorCell } +import akka.actor.{ ActorContext, ActorCell, ActorRef } import java.util.concurrent._ import atomic.{ AtomicInteger, AtomicReferenceFieldUpdater } import annotation.tailrec @@ -217,7 +217,7 @@ trait MessageQueue { /* * These method need to be implemented in subclasses; they should not rely on the internal stuff above. */ - def enqueue(handle: Envelope) + def enqueue(receiver: ActorRef, handle: Envelope) def dequeue(): Envelope @@ -230,7 +230,7 @@ trait SystemMessageQueue { /** * Enqueue a new system message, e.g. by prepending atomically as new head of a single-linked list. */ - def systemEnqueue(message: SystemMessage): Unit + def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit /** * Dequeue all messages from system queue and return them as single-linked list. @@ -243,7 +243,7 @@ trait SystemMessageQueue { trait DefaultSystemMessageQueue { self: Mailbox ⇒ @tailrec - final def systemEnqueue(message: SystemMessage): Unit = { + final def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit = { assert(message.next eq null) if (Mailbox.debug) println(actor + " having enqueued " + message) val head = systemQueueGet @@ -256,7 +256,7 @@ trait DefaultSystemMessageQueue { self: Mailbox ⇒ message.next = head if (!systemQueuePut(head, message)) { message.next = null - systemEnqueue(message) + systemEnqueue(receiver, message) } } @@ -270,7 +270,7 @@ trait DefaultSystemMessageQueue { self: Mailbox ⇒ } trait UnboundedMessageQueueSemantics extends QueueBasedMessageQueue { - final def enqueue(handle: Envelope): Unit = queue add handle + final def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle final def dequeue(): Envelope = queue.poll() } @@ -278,7 +278,7 @@ trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue { def pushTimeOut: Duration override def queue: BlockingQueue[Envelope] - final def enqueue(handle: Envelope) { + final def enqueue(receiver: ActorRef, handle: Envelope) { if (pushTimeOut.length > 0) { queue.offer(handle, pushTimeOut.length, pushTimeOut.unit) || { throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 60906bacc6..e501373fef 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -241,6 +241,8 @@ class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider { private[akka] def createDeathWatch(): DeathWatch = local.createDeathWatch() //FIXME Implement Remote DeathWatch private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = local.ask(message, recipient, within) + + private[akka] def dummyAskSender = local.dummyAskSender } /** diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index cb0d86099f..781289edea 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -10,9 +10,8 @@ import java.util.concurrent.RejectedExecutionException import akka.util.Switch import java.lang.ref.WeakReference import scala.annotation.tailrec -import akka.actor.ActorCell +import akka.actor.{ ActorCell, ActorRef, ActorSystem } import akka.dispatch._ -import akka.actor.ActorSystem /* * Locking rules: @@ -129,16 +128,17 @@ class CallingThreadDispatcher(_app: ActorSystem, val name: String = "calling-thr } override def resume(actor: ActorCell) { - val mboxopt = getMailbox(actor) - if (mboxopt.isEmpty) return - val mbox = mboxopt.get - val queue = mbox.queue - val wasActive = queue.isActive - val switched = mbox.suspendSwitch.switchOff { - gatherFromAllOtherQueues(mbox, queue) - } - if (switched && !wasActive) { - runQueue(mbox, queue) + actor.mailbox match { + case mbox: CallingThreadMailbox ⇒ + val queue = mbox.queue + val wasActive = queue.isActive + val switched = mbox.suspendSwitch.switchOff { + gatherFromAllOtherQueues(mbox, queue) + } + if (switched && !wasActive) { + runQueue(mbox, queue) + } + case m ⇒ m.systemEnqueue(actor.self, Resume()) } } @@ -147,35 +147,37 @@ class CallingThreadDispatcher(_app: ActorSystem, val name: String = "calling-thr override def mailboxIsEmpty(actor: ActorCell): Boolean = getMailbox(actor) map (_.queue.isEmpty) getOrElse true protected[akka] override def systemDispatch(receiver: ActorCell, message: SystemMessage) { - val mboxopt = getMailbox(receiver) - if (mboxopt.isEmpty) return - val mbox = mboxopt.get - mbox.systemEnqueue(message) - val queue = mbox.queue - if (!queue.isActive) { - queue.enter - runQueue(mbox, queue) + receiver.mailbox match { + case mbox: CallingThreadMailbox ⇒ + mbox.systemEnqueue(receiver.self, message) + val queue = mbox.queue + if (!queue.isActive) { + queue.enter + runQueue(mbox, queue) + } + case m ⇒ m.systemEnqueue(receiver.self, message) } } protected[akka] override def dispatch(receiver: ActorCell, handle: Envelope) { - val mboxopt = getMailbox(receiver) - if (mboxopt.isEmpty) return - val mbox = mboxopt.get - val queue = mbox.queue - val execute = mbox.suspendSwitch.fold { - queue.push(handle) - false - } { - queue.push(handle) - if (queue.isActive) - false - else { - queue.enter - true - } + receiver.mailbox match { + case mbox: CallingThreadMailbox ⇒ + val queue = mbox.queue + val execute = mbox.suspendSwitch.fold { + queue.push(handle) + false + } { + queue.push(handle) + if (queue.isActive) + false + else { + queue.enter + true + } + } + if (execute) runQueue(mbox, queue) + case m ⇒ m.enqueue(receiver.self, handle) } - if (execute) runQueue(mbox, queue) } protected[akka] override def executeTask(invocation: TaskInvocation) { invocation.run } @@ -270,7 +272,7 @@ class CallingThreadMailbox(val dispatcher: MessageDispatcher, _receiver: ActorCe val lock = new ReentrantLock val suspendSwitch = new Switch - override def enqueue(msg: Envelope) {} + override def enqueue(receiver: ActorRef, msg: Envelope) {} override def dequeue() = null override def hasMessages = true override def numberOfMessages = 0 diff --git a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala index 70536deb90..2e6c58ce81 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala @@ -3,13 +3,15 @@ */ package akka.testkit +import scala.annotation.tailrec import scala.util.matching.Regex -import akka.actor.Actor -import akka.event.Logging._ +import akka.actor.{ DeadLetter, ActorSystem } +import akka.dispatch.SystemMessage +import akka.event.Logging.{ Warning, LogEvent, InitializeLogger, Info, Error, Debug } import akka.event.Logging +import akka.testkit.TestEvent.{ UnMute, Mute } import akka.util.Duration -import akka.actor.ActorSystem /** * Implementation helpers of the EventFilter facilities: send `Mute` @@ -443,10 +445,16 @@ class TestEventListener extends Logging.DefaultLogger { var filters: List[EventFilter] = Nil override def receive = { - case InitializeLogger(bus) ⇒ Seq(classOf[Mute], classOf[UnMute]) foreach (bus.subscribe(context.self, _)) + case InitializeLogger(bus) ⇒ Seq(classOf[Mute], classOf[UnMute], classOf[DeadLetter]) foreach (bus.subscribe(context.self, _)) case Mute(filters) ⇒ filters foreach addFilter case UnMute(filters) ⇒ filters foreach removeFilter case event: LogEvent ⇒ if (!filter(event)) print(event) + case DeadLetter(msg: SystemMessage, null, rcp) ⇒ + val event = Warning(rcp, "received dead system message: " + msg) + if (!filter(event)) print(event) + case DeadLetter(msg, snd, rcp) ⇒ + val event = Warning(rcp, "received dead letter from " + snd + ": " + msg) + if (!filter(event)) print(event) } def filter(event: LogEvent): Boolean = filters exists (f ⇒ try { f(event) } catch { case e: Exception ⇒ false })