=act #3733 Re-receive unstashed Terminated messages

This commit is contained in:
Martin Krasser 2013-11-26 09:01:55 +01:00
parent c35988de2c
commit f3f69f55d4
3 changed files with 50 additions and 5 deletions

View file

@ -66,6 +66,27 @@ object ActorWithStashSpec {
}
}
class WatchedActor extends Actor {
def receive = Actor.emptyBehavior
}
class TerminatedMessageStashingActor(probe: ActorRef) extends Actor with Stash {
val watched = context.watch(context.actorOf(Props[WatchedActor]))
var stashed = false
context.stop(watched)
def receive = {
case Terminated(`watched`)
if (!stashed) {
stash()
stashed = true
unstashAll()
}
probe ! "terminated"
}
}
object state {
@volatile
var s: String = ""
@ -156,6 +177,12 @@ class ActorWithStashSpec extends AkkaSpec(ActorWithStashSpec.testConf) with Defa
Await.ready(restartLatch, 10 seconds)
Await.ready(hasMsgLatch, 10 seconds)
}
"re-receive unstashed Terminated messages" in {
system.actorOf(Props(classOf[TerminatedMessageStashingActor], testActor))
expectMsg("terminated")
expectMsg("terminated")
}
}
"An ActWithStash" must {

View file

@ -119,6 +119,8 @@ private[akka] trait StashSupport {
*/
private var theStash = Vector.empty[Envelope]
private def actorCell = context.asInstanceOf[ActorCell]
/* The capacity of the stash. Configured in the actor's mailbox or dispatcher config.
*/
private val capacity: Int = {
@ -137,7 +139,7 @@ private[akka] trait StashSupport {
* `mailbox.queue` is the underlying `Deque`.
*/
private[akka] val mailbox: DequeBasedMessageQueueSemantics = {
context.asInstanceOf[ActorCell].mailbox.messageQueue match {
actorCell.mailbox.messageQueue match {
case queue: DequeBasedMessageQueueSemantics queue
case other throw ActorInitializationException(self, s"DequeBasedMailbox required, got: ${other.getClass.getName}\n" +
"""An (unbounded) deque-based mailbox can be configured as follows:
@ -156,7 +158,7 @@ private[akka] trait StashSupport {
* @throws IllegalStateException if the same message is stashed more than once
*/
def stash(): Unit = {
val currMsg = context.asInstanceOf[ActorCell].currentMessage
val currMsg = actorCell.currentMessage
if (theStash.nonEmpty && (currMsg eq theStash.last))
throw new IllegalStateException("Can't stash the same message " + currMsg + " more than once")
if (capacity <= 0 || theStash.size < capacity) theStash :+= currMsg
@ -182,7 +184,7 @@ private[akka] trait StashSupport {
* if the `unstash()` call successfully returns or throws an exception.
*/
private[akka] def unstash(): Unit = if (theStash.nonEmpty) try {
mailbox.enqueueFirst(self, theStash.head)
enqueueFirst(theStash.head)
} finally {
theStash = theStash.tail
}
@ -216,7 +218,7 @@ private[akka] trait StashSupport {
private[akka] def unstashAll(filterPredicate: Any Boolean): Unit = {
try {
val i = theStash.reverseIterator.filter(envelope filterPredicate(envelope.message))
while (i.hasNext) mailbox.enqueueFirst(self, i.next())
while (i.hasNext) enqueueFirst(i.next())
} finally {
theStash = Vector.empty[Envelope]
}
@ -232,6 +234,19 @@ private[akka] trait StashSupport {
theStash = Vector.empty[Envelope]
stashed
}
/**
* Enqueues `envelope` at the first position in the mailbox. If the message contained in
* the envelope is a `Terminated` message, it will be ensured that it can be re-received
* by the actor.
*/
private def enqueueFirst(envelope: Envelope): Unit = {
mailbox.enqueueFirst(self, envelope)
envelope.message match {
case Terminated(ref) actorCell.terminatedQueuedFor(ref)
case _
}
}
}
/**

View file

@ -57,11 +57,14 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
}
if (!isTerminating) {
self.tell(Terminated(actor)(existenceConfirmed, addressTerminated), actor)
terminatedQueued += actor
terminatedQueuedFor(actor)
}
}
}
private[akka] def terminatedQueuedFor(subject: ActorRef): Unit =
terminatedQueued += subject
// TODO this should be removed and be replaced with `watching.contains(subject)`
// when all actor references have uid, i.e. actorFor is removed
private def watchingContains(subject: ActorRef): Boolean =