Preserve sender when sending to deadLetters, see #3009

This commit is contained in:
Patrik Nordwall 2013-02-04 12:41:58 +01:00
parent c6f08fb935
commit 939893ef5f
6 changed files with 32 additions and 23 deletions

View file

@ -57,11 +57,11 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
* This is needed for actually executing the mailbox, i.e. invoking the
* ActorCell. There are situations (e.g. RepointableActorRef) where a Mailbox
* is constructed but we know that we will not execute it, in which case this
* will be null. It must be a var to support switching into an active
* will be null. It must be a var to support switching into an active
* mailbox, should the owning ActorRef turn local.
*
*
* ANOTHER THING, IMPORTANT:
*
*
* actorCell.start() publishes actorCell & self to the dispatcher, which
* means that messages may be processed theoretically before selfs constructor
* ends. The JMM guarantees visibility for final fields only after the end
@ -136,7 +136,8 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
*/
@tailrec
final def resume(): Boolean = status match {
case Closed setStatus(Closed); false
case Closed
setStatus(Closed); false
case s
val next = if (s < suspendUnit) s else s - suspendUnit
if (updateStatus(s, next)) next < suspendUnit
@ -151,7 +152,8 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
*/
@tailrec
final def suspend(): Boolean = status match {
case Closed setStatus(Closed); false
case Closed
setStatus(Closed); false
case s
if (updateStatus(s, s + suspendUnit)) s < suspendUnit
else suspend()
@ -163,8 +165,9 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
*/
@tailrec
final def becomeClosed(): Boolean = status match {
case Closed setStatus(Closed); false
case s updateStatus(s, Closed) || becomeClosed()
case Closed
setStatus(Closed); false
case s updateStatus(s, Closed) || becomeClosed()
}
/**
@ -263,7 +266,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
}
/*
* if we closed the mailbox, we must dump the remaining system messages
* to deadLetters (this is essential for DeathWatch)
* to deadLetters (this is essential for DeathWatch)
*/
val dlm = actor.systemImpl.deadLetterMailbox
while (nextMessage ne null) {
@ -435,7 +438,8 @@ trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue {
def enqueue(receiver: ActorRef, handle: Envelope): Unit =
if (pushTimeOut.length > 0) {
if (!queue.offer(handle, pushTimeOut.length, pushTimeOut.unit))
receiver.asInstanceOf[InternalActorRef].provider.deadLetters ! DeadLetter(handle.message, handle.sender, receiver)
receiver.asInstanceOf[InternalActorRef].provider.deadLetters.tell(
DeadLetter(handle.message, handle.sender, receiver), handle.sender)
} else queue put handle
def dequeue(): Envelope = queue.poll()
@ -470,13 +474,15 @@ trait BoundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue {
def enqueue(receiver: ActorRef, handle: Envelope): Unit =
if (pushTimeOut.length > 0) {
if (!queue.offer(handle, pushTimeOut.length, pushTimeOut.unit))
receiver.asInstanceOf[InternalActorRef].provider.deadLetters ! DeadLetter(handle.message, handle.sender, receiver)
receiver.asInstanceOf[InternalActorRef].provider.deadLetters.tell(
DeadLetter(handle.message, handle.sender, receiver), handle.sender)
} else queue put handle
def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit =
if (pushTimeOut.length > 0) {
if (!queue.offerFirst(handle, pushTimeOut.length, pushTimeOut.unit))
receiver.asInstanceOf[InternalActorRef].provider.deadLetters ! DeadLetter(handle.message, handle.sender, receiver)
receiver.asInstanceOf[InternalActorRef].provider.deadLetters.tell(
DeadLetter(handle.message, handle.sender, receiver), handle.sender)
} else queue putFirst handle
def dequeue(): Envelope = queue.poll()