Merge pull request #1335 from akka/wip-3222-DeathPact-∂π

clean up DeathWatch semantics, see #3222
This commit is contained in:
Roland Kuhn 2013-04-16 02:18:52 -07:00
commit 3f76e9298d
8 changed files with 101 additions and 18 deletions

View file

@ -127,7 +127,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
"fail a monitor which does not handle Terminated()" in {
filterEvents(EventFilter[ActorKilledException](), EventFilter[DeathPactException]()) {
case class FF(fail: Failed)
val strategy = new OneForOneStrategy(maxNrOfRetries = 0)(SupervisorStrategy.makeDecider(List(classOf[Exception]))) {
val strategy = new OneForOneStrategy()(SupervisorStrategy.defaultStrategy.decider) {
override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = {
testActor.tell(FF(Failed(child, cause, 0)), child)
super.handleFailure(context, child, cause, stats, children)
@ -183,6 +183,43 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
// the testActor is not watching subject and will not receive a Terminated msg
expectNoMsg
}
"discard Terminated when unwatched between sysmsg and processing" in {
case class W(ref: ActorRef)
case class U(ref: ActorRef)
class Watcher extends Actor {
def receive = {
case W(ref) context watch ref
case U(ref) context unwatch ref
case (t1: TestLatch, t2: TestLatch)
t1.countDown()
Await.ready(t2, 3.seconds)
}
}
val t1, t2 = TestLatch()
val w = system.actorOf(Props(new Watcher), "myDearWatcher")
val p = TestProbe()
w ! W(p.ref)
w ! ((t1, t2))
Await.ready(t1, 3.seconds)
watch(p.ref)
system stop p.ref
expectTerminated(p.ref)
w ! U(p.ref)
t2.countDown()
/*
* now the Watcher will
* - process the DeathWatchNotification and enqueue Terminated
* - process the unwatch command
* - process the Terminated
* If it receives the Terminated it will die, which in fact it should not
*/
w ! Identify(())
expectMsg(ActorIdentity((), Some(w)))
w ! Identify(())
expectMsg(ActorIdentity((), Some(w)))
}
}
}

View file

@ -96,7 +96,7 @@ case class ActorIdentity(correlationId: Any, ref: Option[ActorRef]) {
@SerialVersionUID(1L)
case class Terminated private[akka] (@BeanProperty actor: ActorRef)(
@BeanProperty val existenceConfirmed: Boolean,
@BeanProperty val addressTerminated: Boolean) extends PossiblyHarmful
@BeanProperty val addressTerminated: Boolean) extends AutoReceivedMessage with PossiblyHarmful
/**
* INTERNAL API

View file

@ -470,6 +470,7 @@ private[akka] class ActorCell(
publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg))
msg.message match {
case t: Terminated receivedTerminated(t)
case AddressTerminated(address) addressTerminated(address)
case Kill throw new ActorKilledException("Kill")
case PoisonPill self.stop()

View file

@ -154,6 +154,7 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
def defaultDecider: Decider = {
case _: ActorInitializationException Stop
case _: ActorKilledException Stop
case _: DeathPactException Stop
case _: Exception Restart
}
OneForOneStrategy()(defaultDecider)

View file

@ -14,6 +14,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
private var watching: Set[ActorRef] = ActorCell.emptyActorRefSet
private var watchedBy: Set[ActorRef] = ActorCell.emptyActorRefSet
private var terminatedQueued: Set[ActorRef] = ActorCell.emptyActorRefSet
override final def watch(subject: ActorRef): ActorRef = subject match {
case a: InternalActorRef
@ -29,14 +30,22 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
override final def unwatch(subject: ActorRef): ActorRef = subject match {
case a: InternalActorRef
if (a != self && watchingContains(a)) {
a.sendSystemMessage(Unwatch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
maintainAddressTerminatedSubscription(a) {
a.sendSystemMessage(Unwatch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
removeWatching(a)
watching = removeFromSet(a, watching)
}
}
terminatedQueued = removeFromSet(a, terminatedQueued)
a
}
protected def receivedTerminated(t: Terminated): Unit = {
if (terminatedQueued(t.actor)) {
terminatedQueued -= t.actor // here we know that it is the SAME ref which was put in
receiveMessage(t)
}
}
/**
* When this actor is watching the subject of [[akka.actor.Terminated]] message
* it will be propagated to user's receive.
@ -45,9 +54,12 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
if (childrenRefs.getByRef(actor).isDefined) handleChildTerminated(actor)
if (watchingContains(actor)) {
maintainAddressTerminatedSubscription(actor) {
removeWatching(actor)
watching = removeFromSet(actor, watching)
}
if (!isTerminating) {
self.tell(Terminated(actor)(existenceConfirmed, addressTerminated), actor)
terminatedQueued += actor
}
if (!isTerminating) self.tell(Terminated(actor)(existenceConfirmed, addressTerminated), actor)
}
}
@ -58,12 +70,13 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
watching.contains(new UndefinedUidActorRef(subject)))
}
// TODO this should be removed and be replaced with `watching -= subject`
// TODO this should be removed and be replaced with `set - subject`
// when all actor references have uid, i.e. actorFor is removed
private def removeWatching(subject: ActorRef): Unit = {
watching -= subject
private def removeFromSet(subject: ActorRef, set: Set[ActorRef]): Set[ActorRef] = {
val removed = if (set(subject)) set - subject else set
if (subject.path.uid != ActorCell.undefinedUid)
watching -= new UndefinedUidActorRef(subject)
removed - new UndefinedUidActorRef(subject)
else removed filterNot (_.path == subject.path)
}
protected def tellWatchersWeDied(actor: Actor): Unit = {
@ -100,6 +113,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
}
} finally {
watching = ActorCell.emptyActorRefSet
terminatedQueued = ActorCell.emptyActorRefSet
unsubscribeAddressTerminated()
}
}

View file

@ -196,9 +196,10 @@ monitoring of an already terminated actor leads to the immediate generation of
the :class:`Terminated` message.
It is also possible to deregister from watching another actors liveliness
using ``context.unwatch(target)``, but obviously this cannot guarantee
non-reception of the :class:`Terminated` message because that may already have
been queued.
using ``getContext().unwatch(target)``. This works even if the
:class:`Terminated` message has already been enqueued in the mailbox; after
calling :meth:`unwatch` no :class:`Terminated` message for that actor will be
processed anymore.
Start Hook
----------

View file

@ -236,10 +236,39 @@ Instead, use actorSelection followed by identify request, and watch the verified
case Terminated(`ref`) => // ...
}
Use ``watch`` instead of ``isTerminated``
=========================================
``ActorRef.isTerminated`` is deprecated in favor of ``ActorContext.watch`` because
``isTerminated`` behaves differently for local and remote actors.
DeathWatch Semantics are Simplified
===================================
DeathPactException is now Fatal
-------------------------------
Previously an unhandled :class:`Terminated` message which led to a
:class:`DeathPactException` to the thrown would be answered with a ``Restart``
directive by the default supervisor strategy. This is not intuitive given the
name of the exception and the Erlang linking feature by which it was inspired.
The default strategy has thus be changed to return ``Stop`` in this case.
It can be argued that previously the actor would likely run into a restart loop
because watching a terminated actor would lead to a :class:`DeathPactException`
immediately again.
Unwatching now Prevents Reception of Terminated
-----------------------------------------------
Previously calling :meth:`ActorContext.unwatch` would unregister lifecycle
monitoring interest, but if the target actor had terminated already the
:class:`Terminated` message had already been enqueued and would be received
later—possibly leading to a :class:`DeathPactException`. This behavior has been
modified such that the :class:`Terminated` message will be silently discarded
if :meth:`unwatch` is called before processing the :class:`Terminated`
message. Therefore the following is now safe::
context.stop(target)
context.unwatch(target)

View file

@ -310,9 +310,9 @@ monitoring of an already terminated actor leads to the immediate generation of
the :class:`Terminated` message.
It is also possible to deregister from watching another actors liveliness
using ``context.unwatch(target)``, but obviously this cannot guarantee
non-reception of the :class:`Terminated` message because that may already have
been queued.
using ``context.unwatch(target)``. This works even if the :class:`Terminated`
message has already been enqueued in the mailbox; after calling :meth:`unwatch`
no :class:`Terminated` message for that actor will be processed anymore.
Start Hook
----------