From f3f69f55d4904344681a9c56beed3bc2a705fa2e Mon Sep 17 00:00:00 2001 From: Martin Krasser Date: Tue, 26 Nov 2013 09:01:55 +0100 Subject: [PATCH] =act #3733 Re-receive unstashed Terminated messages --- .../scala/akka/actor/ActorWithStashSpec.scala | 27 +++++++++++++++++++ .../src/main/scala/akka/actor/Stash.scala | 23 +++++++++++++--- .../scala/akka/actor/dungeon/DeathWatch.scala | 5 +++- 3 files changed, 50 insertions(+), 5 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorWithStashSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorWithStashSpec.scala index 4071d7969a..4662c34328 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorWithStashSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorWithStashSpec.scala @@ -66,6 +66,27 @@ object ActorWithStashSpec { } } + class WatchedActor extends Actor { + def receive = Actor.emptyBehavior + } + + class TerminatedMessageStashingActor(probe: ActorRef) extends Actor with Stash { + val watched = context.watch(context.actorOf(Props[WatchedActor])) + var stashed = false + + context.stop(watched) + + def receive = { + case Terminated(`watched`) ⇒ + if (!stashed) { + stash() + stashed = true + unstashAll() + } + probe ! "terminated" + } + } + object state { @volatile var s: String = "" @@ -156,6 +177,12 @@ class ActorWithStashSpec extends AkkaSpec(ActorWithStashSpec.testConf) with Defa Await.ready(restartLatch, 10 seconds) Await.ready(hasMsgLatch, 10 seconds) } + + "re-receive unstashed Terminated messages" in { + system.actorOf(Props(classOf[TerminatedMessageStashingActor], testActor)) + expectMsg("terminated") + expectMsg("terminated") + } } "An ActWithStash" must { diff --git a/akka-actor/src/main/scala/akka/actor/Stash.scala b/akka-actor/src/main/scala/akka/actor/Stash.scala index e770db5d19..724d7a20b5 100644 --- a/akka-actor/src/main/scala/akka/actor/Stash.scala +++ b/akka-actor/src/main/scala/akka/actor/Stash.scala @@ -119,6 +119,8 @@ private[akka] trait StashSupport { */ private var theStash = Vector.empty[Envelope] + private def actorCell = context.asInstanceOf[ActorCell] + /* The capacity of the stash. Configured in the actor's mailbox or dispatcher config. */ private val capacity: Int = { @@ -137,7 +139,7 @@ private[akka] trait StashSupport { * `mailbox.queue` is the underlying `Deque`. */ private[akka] val mailbox: DequeBasedMessageQueueSemantics = { - context.asInstanceOf[ActorCell].mailbox.messageQueue match { + actorCell.mailbox.messageQueue match { case queue: DequeBasedMessageQueueSemantics ⇒ queue case other ⇒ throw ActorInitializationException(self, s"DequeBasedMailbox required, got: ${other.getClass.getName}\n" + """An (unbounded) deque-based mailbox can be configured as follows: @@ -156,7 +158,7 @@ private[akka] trait StashSupport { * @throws IllegalStateException if the same message is stashed more than once */ def stash(): Unit = { - val currMsg = context.asInstanceOf[ActorCell].currentMessage + val currMsg = actorCell.currentMessage if (theStash.nonEmpty && (currMsg eq theStash.last)) throw new IllegalStateException("Can't stash the same message " + currMsg + " more than once") if (capacity <= 0 || theStash.size < capacity) theStash :+= currMsg @@ -182,7 +184,7 @@ private[akka] trait StashSupport { * if the `unstash()` call successfully returns or throws an exception. */ private[akka] def unstash(): Unit = if (theStash.nonEmpty) try { - mailbox.enqueueFirst(self, theStash.head) + enqueueFirst(theStash.head) } finally { theStash = theStash.tail } @@ -216,7 +218,7 @@ private[akka] trait StashSupport { private[akka] def unstashAll(filterPredicate: Any ⇒ Boolean): Unit = { try { val i = theStash.reverseIterator.filter(envelope ⇒ filterPredicate(envelope.message)) - while (i.hasNext) mailbox.enqueueFirst(self, i.next()) + while (i.hasNext) enqueueFirst(i.next()) } finally { theStash = Vector.empty[Envelope] } @@ -232,6 +234,19 @@ private[akka] trait StashSupport { theStash = Vector.empty[Envelope] stashed } + + /** + * Enqueues `envelope` at the first position in the mailbox. If the message contained in + * the envelope is a `Terminated` message, it will be ensured that it can be re-received + * by the actor. + */ + private def enqueueFirst(envelope: Envelope): Unit = { + mailbox.enqueueFirst(self, envelope) + envelope.message match { + case Terminated(ref) ⇒ actorCell.terminatedQueuedFor(ref) + case _ ⇒ + } + } } /** diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala index ef15064f98..56a541d327 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala @@ -57,11 +57,14 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ } if (!isTerminating) { self.tell(Terminated(actor)(existenceConfirmed, addressTerminated), actor) - terminatedQueued += actor + terminatedQueuedFor(actor) } } } + private[akka] def terminatedQueuedFor(subject: ActorRef): Unit = + terminatedQueued += subject + // TODO this should be removed and be replaced with `watching.contains(subject)` // when all actor references have uid, i.e. actorFor is removed private def watchingContains(subject: ActorRef): Boolean =