From 1ba168774fa0828d2f009511d91acd68fac22aa8 Mon Sep 17 00:00:00 2001 From: Roland Date: Sat, 12 Nov 2011 10:57:28 +0100 Subject: [PATCH] improve DeadLetter reporting MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit (since I know now what’s causing these Jenkins failures ;-) ) - include recipient in DeadLetter - include recipient in calls to enqueue/systemEnqueue - move DeadLetterMailbox to ActorSystem (saves some space, too) - hook up DeadLetterMailbox so it sends DeadLetters to app.deadLetters, which publishes them on the eventStream - subscribe TestEventListener to DeadLetter and turn it into Warning The generated warnings about ChildTerminated are very much correct, they remind us that we still need to fix supervisor.stop() to await all children’s death before actually committing suicide. --- .../akka/dispatch/MailboxConfigSpec.scala | 6 +- .../src/main/scala/akka/actor/ActorRef.scala | 10 ++- .../scala/akka/actor/ActorRefProvider.scala | 3 + .../main/scala/akka/actor/ActorSystem.scala | 13 +++- .../akka/dispatch/AbstractDispatcher.scala | 20 +---- .../akka/dispatch/BalancingDispatcher.scala | 10 ++- .../main/scala/akka/dispatch/Dispatcher.scala | 4 +- .../main/scala/akka/dispatch/Mailbox.scala | 14 ++-- .../akka/remote/RemoteActorRefProvider.scala | 2 + .../testkit/CallingThreadDispatcher.scala | 76 ++++++++++--------- .../akka/testkit/TestEventListener.scala | 16 +++- 11 files changed, 96 insertions(+), 78 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 8e1bc597d9..c5ee2f1dfb 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -39,13 +39,13 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn val exampleMessage = createMessageInvocation("test") - for (i ← 1 to config.capacity) q.enqueue(exampleMessage) + for (i ← 1 to config.capacity) q.enqueue(null, exampleMessage) q.numberOfMessages must be === config.capacity q.hasMessages must be === true intercept[MessageQueueAppendFailedException] { - q.enqueue(exampleMessage) + q.enqueue(null, exampleMessage) } q.dequeue must be === exampleMessage @@ -103,7 +103,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn def createProducer(fromNum: Int, toNum: Int): Future[Vector[Envelope]] = spawn { val messages = Vector() ++ (for (i ← fromNum to toNum) yield createMessageInvocation(i)) - for (i ← messages) q.enqueue(i) + for (i ← messages) q.enqueue(null, i) messages } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 3c8dd0bacc..26a3ef7eed 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -364,7 +364,7 @@ trait MinimalActorRef extends ActorRef with ScalaActorRef { throw new UnsupportedOperationException("Not supported for %s".format(getClass.getName)) } -case class DeadLetter(message: Any, sender: ActorRef) +case class DeadLetter(message: Any, sender: ActorRef, recipient: ActorRef) object DeadLetterActorRef { class SerializedDeadLetterActorRef extends Serializable { //TODO implement as Protobuf for performance? @@ -387,11 +387,13 @@ class DeadLetterActorRef(val app: ActorSystem) extends MinimalActorRef { override def isShutdown(): Boolean = true - override def tell(msg: Any, sender: ActorRef): Unit = - app.eventStream.publish(DeadLetter(msg, sender)) + override def tell(msg: Any, sender: ActorRef): Unit = msg match { + case d: DeadLetter ⇒ app.eventStream.publish(d) + case _ ⇒ app.eventStream.publish(DeadLetter(msg, sender, this)) + } override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = { - app.eventStream.publish(DeadLetter(message, this)) + app.eventStream.publish(DeadLetter(message, app.provider.dummyAskSender, this)) brokenPromise } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 7e1c881359..7a9fbf0a38 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -49,6 +49,7 @@ trait ActorRefProvider { private[akka] def terminationFuture: Future[ActorSystem.ExitStatus] + private[akka] def dummyAskSender: ActorRef } /** @@ -277,6 +278,8 @@ class LocalActorRefProvider(val app: ActorSystem) extends ActorRefProvider { a.result } } + + private[akka] val dummyAskSender = new DeadLetterActorRef(app) } class LocalDeathWatch extends DeathWatch with ActorClassification { diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 521e3d3d7e..4e18d07c38 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -9,7 +9,7 @@ import akka.event._ import akka.util.duration._ import java.net.InetAddress import com.eaio.uuid.UUID -import akka.dispatch.{ Dispatchers, Future } +import akka.dispatch.{ Dispatchers, Future, Mailbox, Envelope, SystemMessage } import akka.util.Duration import akka.util.ReflectiveAccess import akka.serialization.Serialization @@ -210,6 +210,17 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF // TODO think about memory consistency effects when doing funky stuff inside constructor val deadLetters = new DeadLetterActorRef(this) + val deadLetterMailbox = new Mailbox(null) { + becomeClosed() + override def dispatcher = null //MessageDispatcher.this + override def enqueue(receiver: ActorRef, envelope: Envelope) { deadLetters ! DeadLetter(envelope.message, envelope.sender, receiver) } + override def dequeue() = null + override def systemEnqueue(receiver: ActorRef, handle: SystemMessage) { deadLetters ! DeadLetter(handle, receiver, receiver) } + override def systemDrain(): SystemMessage = null + override def hasMessages = false + override def hasSystemMessages = false + override def numberOfMessages = 0 + } val deathWatch = provider.createDeathWatch() diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 291b8e1803..3359251658 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -100,21 +100,9 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable { protected[akka] def createMailbox(actor: ActorCell): Mailbox /** - * Create a blackhole mailbox for the purpose of replacing the real one upon actor termination + * a blackhole mailbox for the purpose of replacing the real one upon actor termination */ - protected[akka] val deadLetterMailbox: Mailbox = DeadLetterMailbox - - object DeadLetterMailbox extends Mailbox(null) { - becomeClosed() - override def dispatcher = null //MessageDispatcher.this - override def enqueue(envelope: Envelope) = () - override def dequeue() = null - override def systemEnqueue(handle: SystemMessage): Unit = () - override def systemDrain(): SystemMessage = null - override def hasMessages = false - override def hasSystemMessages = false - override def numberOfMessages = 0 - } + import app.deadLetterMailbox /** * Name of this dispatcher. @@ -225,7 +213,7 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable { // message must be “virgin” before being able to systemEnqueue again val next = message.next message.next = null - deadLetterMailbox.systemEnqueue(message) + deadLetterMailbox.systemEnqueue(actor.self, message) message = next } } @@ -233,7 +221,7 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable { if (mailBox.hasMessages) { var envelope = mailBox.dequeue while (envelope ne null) { - deadLetterMailbox.enqueue(envelope) + deadLetterMailbox.enqueue(actor.self, envelope) envelope = mailBox.dequeue } } diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 33545d551f..b64b7ee513 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -5,7 +5,7 @@ package akka.dispatch import util.DynamicVariable -import akka.actor.{ ActorCell, Actor, IllegalActorStateException } +import akka.actor.{ ActorCell, Actor, IllegalActorStateException, ActorRef } import java.util.concurrent.{ LinkedBlockingQueue, ConcurrentLinkedQueue, ConcurrentSkipListSet } import java.util.{ Comparator, Queue } import annotation.tailrec @@ -37,6 +37,8 @@ class BalancingDispatcher( _timeoutMs: Long) extends Dispatcher(_app, _name, throughput, throughputDeadlineTime, mailboxType, config, _timeoutMs) { + import app.deadLetterMailbox + private val buddies = new ConcurrentSkipListSet[ActorCell](akka.util.Helpers.IdentityHashComparator) protected val messageQueue: MessageQueue = mailboxType match { @@ -55,7 +57,7 @@ class BalancingDispatcher( protected[akka] override def createMailbox(actor: ActorCell): Mailbox = new SharingMailbox(actor) class SharingMailbox(_actor: ActorCell) extends Mailbox(_actor) with DefaultSystemMessageQueue { - final def enqueue(handle: Envelope) = messageQueue.enqueue(handle) + final def enqueue(receiver: ActorRef, handle: Envelope) = messageQueue.enqueue(receiver, handle) final def dequeue(): Envelope = messageQueue.dequeue() @@ -86,7 +88,7 @@ class BalancingDispatcher( if (mailBox.hasSystemMessages) { var messages = mailBox.systemDrain() while (messages ne null) { - deadLetterMailbox.systemEnqueue(messages) //Send to dead letter queue + deadLetterMailbox.systemEnqueue(actor.self, messages) //Send to dead letter queue messages = messages.next if (messages eq null) //Make sure that any system messages received after the current drain are also sent to the dead letter mbox messages = mailBox.systemDrain() @@ -112,7 +114,7 @@ class BalancingDispatcher( } override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { - messageQueue enqueue invocation + messageQueue.enqueue(receiver.self, invocation) intoTheFray(except = receiver) diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 065c2b4528..bf12a5cafc 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -78,13 +78,13 @@ class Dispatcher( protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = { val mbox = receiver.mailbox - mbox enqueue invocation + mbox.enqueue(receiver.self, invocation) registerForExecution(mbox, true, false) } protected[akka] def systemDispatch(receiver: ActorCell, invocation: SystemMessage) = { val mbox = receiver.mailbox - mbox systemEnqueue invocation + mbox.systemEnqueue(receiver.self, invocation) registerForExecution(mbox, false, true) } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 34729914c2..741e6546e1 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -7,7 +7,7 @@ import akka.AkkaException import java.util.{ Comparator, PriorityQueue } import akka.util._ import java.util.Queue -import akka.actor.{ ActorContext, ActorCell } +import akka.actor.{ ActorContext, ActorCell, ActorRef } import java.util.concurrent._ import atomic.{ AtomicInteger, AtomicReferenceFieldUpdater } import annotation.tailrec @@ -217,7 +217,7 @@ trait MessageQueue { /* * These method need to be implemented in subclasses; they should not rely on the internal stuff above. */ - def enqueue(handle: Envelope) + def enqueue(receiver: ActorRef, handle: Envelope) def dequeue(): Envelope @@ -230,7 +230,7 @@ trait SystemMessageQueue { /** * Enqueue a new system message, e.g. by prepending atomically as new head of a single-linked list. */ - def systemEnqueue(message: SystemMessage): Unit + def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit /** * Dequeue all messages from system queue and return them as single-linked list. @@ -243,7 +243,7 @@ trait SystemMessageQueue { trait DefaultSystemMessageQueue { self: Mailbox ⇒ @tailrec - final def systemEnqueue(message: SystemMessage): Unit = { + final def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit = { assert(message.next eq null) if (Mailbox.debug) println(actor + " having enqueued " + message) val head = systemQueueGet @@ -256,7 +256,7 @@ trait DefaultSystemMessageQueue { self: Mailbox ⇒ message.next = head if (!systemQueuePut(head, message)) { message.next = null - systemEnqueue(message) + systemEnqueue(receiver, message) } } @@ -270,7 +270,7 @@ trait DefaultSystemMessageQueue { self: Mailbox ⇒ } trait UnboundedMessageQueueSemantics extends QueueBasedMessageQueue { - final def enqueue(handle: Envelope): Unit = queue add handle + final def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue add handle final def dequeue(): Envelope = queue.poll() } @@ -278,7 +278,7 @@ trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue { def pushTimeOut: Duration override def queue: BlockingQueue[Envelope] - final def enqueue(handle: Envelope) { + final def enqueue(receiver: ActorRef, handle: Envelope) { if (pushTimeOut.length > 0) { queue.offer(handle, pushTimeOut.length, pushTimeOut.unit) || { throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 60906bacc6..e501373fef 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -241,6 +241,8 @@ class RemoteActorRefProvider(val app: ActorSystem) extends ActorRefProvider { private[akka] def createDeathWatch(): DeathWatch = local.createDeathWatch() //FIXME Implement Remote DeathWatch private[akka] def ask(message: Any, recipient: ActorRef, within: Timeout): Future[Any] = local.ask(message, recipient, within) + + private[akka] def dummyAskSender = local.dummyAskSender } /** diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index cb0d86099f..781289edea 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -10,9 +10,8 @@ import java.util.concurrent.RejectedExecutionException import akka.util.Switch import java.lang.ref.WeakReference import scala.annotation.tailrec -import akka.actor.ActorCell +import akka.actor.{ ActorCell, ActorRef, ActorSystem } import akka.dispatch._ -import akka.actor.ActorSystem /* * Locking rules: @@ -129,16 +128,17 @@ class CallingThreadDispatcher(_app: ActorSystem, val name: String = "calling-thr } override def resume(actor: ActorCell) { - val mboxopt = getMailbox(actor) - if (mboxopt.isEmpty) return - val mbox = mboxopt.get - val queue = mbox.queue - val wasActive = queue.isActive - val switched = mbox.suspendSwitch.switchOff { - gatherFromAllOtherQueues(mbox, queue) - } - if (switched && !wasActive) { - runQueue(mbox, queue) + actor.mailbox match { + case mbox: CallingThreadMailbox ⇒ + val queue = mbox.queue + val wasActive = queue.isActive + val switched = mbox.suspendSwitch.switchOff { + gatherFromAllOtherQueues(mbox, queue) + } + if (switched && !wasActive) { + runQueue(mbox, queue) + } + case m ⇒ m.systemEnqueue(actor.self, Resume()) } } @@ -147,35 +147,37 @@ class CallingThreadDispatcher(_app: ActorSystem, val name: String = "calling-thr override def mailboxIsEmpty(actor: ActorCell): Boolean = getMailbox(actor) map (_.queue.isEmpty) getOrElse true protected[akka] override def systemDispatch(receiver: ActorCell, message: SystemMessage) { - val mboxopt = getMailbox(receiver) - if (mboxopt.isEmpty) return - val mbox = mboxopt.get - mbox.systemEnqueue(message) - val queue = mbox.queue - if (!queue.isActive) { - queue.enter - runQueue(mbox, queue) + receiver.mailbox match { + case mbox: CallingThreadMailbox ⇒ + mbox.systemEnqueue(receiver.self, message) + val queue = mbox.queue + if (!queue.isActive) { + queue.enter + runQueue(mbox, queue) + } + case m ⇒ m.systemEnqueue(receiver.self, message) } } protected[akka] override def dispatch(receiver: ActorCell, handle: Envelope) { - val mboxopt = getMailbox(receiver) - if (mboxopt.isEmpty) return - val mbox = mboxopt.get - val queue = mbox.queue - val execute = mbox.suspendSwitch.fold { - queue.push(handle) - false - } { - queue.push(handle) - if (queue.isActive) - false - else { - queue.enter - true - } + receiver.mailbox match { + case mbox: CallingThreadMailbox ⇒ + val queue = mbox.queue + val execute = mbox.suspendSwitch.fold { + queue.push(handle) + false + } { + queue.push(handle) + if (queue.isActive) + false + else { + queue.enter + true + } + } + if (execute) runQueue(mbox, queue) + case m ⇒ m.enqueue(receiver.self, handle) } - if (execute) runQueue(mbox, queue) } protected[akka] override def executeTask(invocation: TaskInvocation) { invocation.run } @@ -270,7 +272,7 @@ class CallingThreadMailbox(val dispatcher: MessageDispatcher, _receiver: ActorCe val lock = new ReentrantLock val suspendSwitch = new Switch - override def enqueue(msg: Envelope) {} + override def enqueue(receiver: ActorRef, msg: Envelope) {} override def dequeue() = null override def hasMessages = true override def numberOfMessages = 0 diff --git a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala index 70536deb90..2e6c58ce81 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala @@ -3,13 +3,15 @@ */ package akka.testkit +import scala.annotation.tailrec import scala.util.matching.Regex -import akka.actor.Actor -import akka.event.Logging._ +import akka.actor.{ DeadLetter, ActorSystem } +import akka.dispatch.SystemMessage +import akka.event.Logging.{ Warning, LogEvent, InitializeLogger, Info, Error, Debug } import akka.event.Logging +import akka.testkit.TestEvent.{ UnMute, Mute } import akka.util.Duration -import akka.actor.ActorSystem /** * Implementation helpers of the EventFilter facilities: send `Mute` @@ -443,10 +445,16 @@ class TestEventListener extends Logging.DefaultLogger { var filters: List[EventFilter] = Nil override def receive = { - case InitializeLogger(bus) ⇒ Seq(classOf[Mute], classOf[UnMute]) foreach (bus.subscribe(context.self, _)) + case InitializeLogger(bus) ⇒ Seq(classOf[Mute], classOf[UnMute], classOf[DeadLetter]) foreach (bus.subscribe(context.self, _)) case Mute(filters) ⇒ filters foreach addFilter case UnMute(filters) ⇒ filters foreach removeFilter case event: LogEvent ⇒ if (!filter(event)) print(event) + case DeadLetter(msg: SystemMessage, null, rcp) ⇒ + val event = Warning(rcp, "received dead system message: " + msg) + if (!filter(event)) print(event) + case DeadLetter(msg, snd, rcp) ⇒ + val event = Warning(rcp, "received dead letter from " + snd + ": " + msg) + if (!filter(event)) print(event) } def filter(event: LogEvent): Boolean = filters exists (f ⇒ try { f(event) } catch { case e: Exception ⇒ false })