diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 51a54abdfc..689d1443f5 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -408,7 +408,7 @@ class LocalActorRefProvider private[akka] ( def receive = { case Terminated(_) ⇒ context.stop(self) case StopChild(child) ⇒ context.stop(child) - case m ⇒ deadLetters ! DeadLetter(m, sender, self) + case m ⇒ deadLetters forward DeadLetter(m, sender, self) } // guardian MUST NOT lose its children during restart @@ -439,13 +439,13 @@ class LocalActorRefProvider private[akka] ( case RegisterTerminationHook if sender != context.system.deadLetters ⇒ terminationHooks += sender context watch sender - case m ⇒ deadLetters ! DeadLetter(m, sender, self) + case m ⇒ deadLetters forward DeadLetter(m, sender, self) } def terminating: Receive = { case Terminated(a) ⇒ stopWhenAllTerminationHooksDone(a) case TerminationHookDone ⇒ stopWhenAllTerminationHooksDone(sender) - case m ⇒ deadLetters ! DeadLetter(m, sender, self) + case m ⇒ deadLetters forward DeadLetter(m, sender, self) } def stopWhenAllTerminationHooksDone(remove: ActorRef): Unit = { diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index d0139af91d..75a8ba280f 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -546,7 +546,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, //FIXME Why do we need this at all? val deadLetterQueue: MessageQueue = new MessageQueue { def enqueue(receiver: ActorRef, envelope: Envelope): Unit = - deadLetters ! DeadLetter(envelope.message, envelope.sender, receiver) + deadLetters.tell(DeadLetter(envelope.message, envelope.sender, receiver), envelope.sender) def dequeue() = null def hasMessages = false def numberOfMessages = 0 diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index fec648be06..5b265a6055 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -37,7 +37,7 @@ private[akka] class RepointableActorRef( /* * H E R E B E D R A G O N S ! - * + * * There are two main functions of a Cell: message queueing and child lookup. * When switching out the UnstartedCell for its real replacement, the former * must be switched after all messages have been drained from the temporary @@ -92,8 +92,8 @@ private[akka] class RepointableActorRef( underlying match { case u: UnstartedCell ⇒ /* - * The problem here was that if the real actor (which will start running - * at cell.start()) creates children in its constructor, then this may + * The problem here was that if the real actor (which will start running + * at cell.start()) creates children in its constructor, then this may * happen before the swapCell in u.replaceWith, meaning that those * children cannot be looked up immediately, e.g. if they shall become * routees. @@ -166,7 +166,7 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val uid: Int) extends Cell { /* - * This lock protects all accesses to this cell’s queues. It also ensures + * This lock protects all accesses to this cell’s queues. It also ensures * safe switching to the started ActorCell. */ private[this] final val lock = new ReentrantLock @@ -211,12 +211,12 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, cell.sendMessage(msg) } else if (!queue.offer(msg)) { system.eventStream.publish(Warning(self.path.toString, getClass, "dropping message of type " + msg.message.getClass + " due to enqueue failure")) - system.deadLetters ! DeadLetter(msg.message, msg.sender, self) + system.deadLetters.tell(DeadLetter(msg.message, msg.sender, self), msg.sender) } else if (Mailbox.debug) println(s"$self temp queueing ${msg.message} from ${msg.sender}") } finally lock.unlock() } else { system.eventStream.publish(Warning(self.path.toString, getClass, "dropping message of type" + msg.message.getClass + " due to lock timeout")) - system.deadLetters ! DeadLetter(msg.message, msg.sender, self) + system.deadLetters.tell(DeadLetter(msg.message, msg.sender, self), msg.sender) } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index e47f869a3e..8f5f376788 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -57,11 +57,11 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) * This is needed for actually executing the mailbox, i.e. invoking the * ActorCell. There are situations (e.g. RepointableActorRef) where a Mailbox * is constructed but we know that we will not execute it, in which case this - * will be null. It must be a var to support switching into an “active” + * will be null. It must be a var to support switching into an “active” * mailbox, should the owning ActorRef turn local. - * + * * ANOTHER THING, IMPORTANT: - * + * * actorCell.start() publishes actorCell & self to the dispatcher, which * means that messages may be processed theoretically before self’s constructor * ends. The JMM guarantees visibility for final fields only after the end @@ -136,7 +136,8 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) */ @tailrec final def resume(): Boolean = status match { - case Closed ⇒ setStatus(Closed); false + case Closed ⇒ + setStatus(Closed); false case s ⇒ val next = if (s < suspendUnit) s else s - suspendUnit if (updateStatus(s, next)) next < suspendUnit @@ -151,7 +152,8 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) */ @tailrec final def suspend(): Boolean = status match { - case Closed ⇒ setStatus(Closed); false + case Closed ⇒ + setStatus(Closed); false case s ⇒ if (updateStatus(s, s + suspendUnit)) s < suspendUnit else suspend() @@ -163,8 +165,9 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) */ @tailrec final def becomeClosed(): Boolean = status match { - case Closed ⇒ setStatus(Closed); false - case s ⇒ updateStatus(s, Closed) || becomeClosed() + case Closed ⇒ + setStatus(Closed); false + case s ⇒ updateStatus(s, Closed) || becomeClosed() } /** @@ -263,7 +266,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) } /* * if we closed the mailbox, we must dump the remaining system messages - * to deadLetters (this is essential for DeathWatch) + * to deadLetters (this is essential for DeathWatch) */ val dlm = actor.systemImpl.deadLetterMailbox while (nextMessage ne null) { @@ -435,7 +438,8 @@ trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue { def enqueue(receiver: ActorRef, handle: Envelope): Unit = if (pushTimeOut.length > 0) { if (!queue.offer(handle, pushTimeOut.length, pushTimeOut.unit)) - receiver.asInstanceOf[InternalActorRef].provider.deadLetters ! DeadLetter(handle.message, handle.sender, receiver) + receiver.asInstanceOf[InternalActorRef].provider.deadLetters.tell( + DeadLetter(handle.message, handle.sender, receiver), handle.sender) } else queue put handle def dequeue(): Envelope = queue.poll() @@ -470,13 +474,15 @@ trait BoundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue { def enqueue(receiver: ActorRef, handle: Envelope): Unit = if (pushTimeOut.length > 0) { if (!queue.offer(handle, pushTimeOut.length, pushTimeOut.unit)) - receiver.asInstanceOf[InternalActorRef].provider.deadLetters ! DeadLetter(handle.message, handle.sender, receiver) + receiver.asInstanceOf[InternalActorRef].provider.deadLetters.tell( + DeadLetter(handle.message, handle.sender, receiver), handle.sender) } else queue put handle def enqueueFirst(receiver: ActorRef, handle: Envelope): Unit = if (pushTimeOut.length > 0) { if (!queue.offerFirst(handle, pushTimeOut.length, pushTimeOut.unit)) - receiver.asInstanceOf[InternalActorRef].provider.deadLetters ! DeadLetter(handle.message, handle.sender, receiver) + receiver.asInstanceOf[InternalActorRef].provider.deadLetters.tell( + DeadLetter(handle.message, handle.sender, receiver), handle.sender) } else queue putFirst handle def dequeue(): Envelope = queue.poll() diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index e78af62b27..1cc08772aa 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -446,7 +446,10 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends case Terminated(_) ⇒ // why should we care now? } - private def forwardToDeadLetters(s: Send): Unit = extendedSystem.deadLetters.tell(s.message, s.senderOption.orNull) + private def forwardToDeadLetters(s: Send): Unit = { + val sender = s.senderOption.getOrElse(extendedSystem.deadLetters) + extendedSystem.deadLetters.tell(DeadLetter(s.message, sender, s.recipient), sender) + } private def listens: Future[Seq[(Transport, Address, Promise[AssociationEventListener])]] = { /* diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index 6dcd6450ef..a07c235abd 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -76,7 +76,7 @@ class TestActor(queue: BlockingDeque[TestActor.Message]) extends Actor { override def postStop() = { import scala.collection.JavaConverters._ - queue.asScala foreach { m ⇒ context.system.deadLetters ! DeadLetter(m.msg, m.sender, self) } + queue.asScala foreach { m ⇒ context.system.deadLetters.tell(DeadLetter(m.msg, m.sender, self), m.sender) } } }