diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index e7ce5a59a2..7c676c4ec0 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -127,7 +127,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout "fail a monitor which does not handle Terminated()" in { filterEvents(EventFilter[ActorKilledException](), EventFilter[DeathPactException]()) { case class FF(fail: Failed) - val strategy = new OneForOneStrategy(maxNrOfRetries = 0)(SupervisorStrategy.makeDecider(List(classOf[Exception]))) { + val strategy = new OneForOneStrategy()(SupervisorStrategy.defaultStrategy.decider) { override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = { testActor.tell(FF(Failed(child, cause, 0)), child) super.handleFailure(context, child, cause, stats, children) @@ -183,6 +183,43 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout // the testActor is not watching subject and will not receive a Terminated msg expectNoMsg } + + "discard Terminated when unwatched between sysmsg and processing" in { + case class W(ref: ActorRef) + case class U(ref: ActorRef) + class Watcher extends Actor { + def receive = { + case W(ref) ⇒ context watch ref + case U(ref) ⇒ context unwatch ref + case (t1: TestLatch, t2: TestLatch) ⇒ + t1.countDown() + Await.ready(t2, 3.seconds) + } + } + + val t1, t2 = TestLatch() + val w = system.actorOf(Props(new Watcher), "myDearWatcher") + val p = TestProbe() + w ! W(p.ref) + w ! ((t1, t2)) + Await.ready(t1, 3.seconds) + watch(p.ref) + system stop p.ref + expectTerminated(p.ref) + w ! U(p.ref) + t2.countDown() + /* + * now the Watcher will + * - process the DeathWatchNotification and enqueue Terminated + * - process the unwatch command + * - process the Terminated + * If it receives the Terminated it will die, which in fact it should not + */ + w ! Identify(()) + expectMsg(ActorIdentity((), Some(w))) + w ! Identify(()) + expectMsg(ActorIdentity((), Some(w))) + } } } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index d6cbc54051..01ae610a65 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -96,7 +96,7 @@ case class ActorIdentity(correlationId: Any, ref: Option[ActorRef]) { @SerialVersionUID(1L) case class Terminated private[akka] (@BeanProperty actor: ActorRef)( @BeanProperty val existenceConfirmed: Boolean, - @BeanProperty val addressTerminated: Boolean) extends PossiblyHarmful + @BeanProperty val addressTerminated: Boolean) extends AutoReceivedMessage with PossiblyHarmful /** * INTERNAL API diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 86a4b37cec..242897ad30 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -470,6 +470,7 @@ private[akka] class ActorCell( publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg)) msg.message match { + case t: Terminated ⇒ receivedTerminated(t) case AddressTerminated(address) ⇒ addressTerminated(address) case Kill ⇒ throw new ActorKilledException("Kill") case PoisonPill ⇒ self.stop() diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index 2bfa1c60ea..93dc032440 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -154,6 +154,7 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits { def defaultDecider: Decider = { case _: ActorInitializationException ⇒ Stop case _: ActorKilledException ⇒ Stop + case _: DeathPactException ⇒ Stop case _: Exception ⇒ Restart } OneForOneStrategy()(defaultDecider) diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala index ce12a7ca78..9c96d890dd 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala @@ -14,6 +14,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ private var watching: Set[ActorRef] = ActorCell.emptyActorRefSet private var watchedBy: Set[ActorRef] = ActorCell.emptyActorRefSet + private var terminatedQueued: Set[ActorRef] = ActorCell.emptyActorRefSet override final def watch(subject: ActorRef): ActorRef = subject match { case a: InternalActorRef ⇒ @@ -29,14 +30,22 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ override final def unwatch(subject: ActorRef): ActorRef = subject match { case a: InternalActorRef ⇒ if (a != self && watchingContains(a)) { + a.sendSystemMessage(Unwatch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ maintainAddressTerminatedSubscription(a) { - a.sendSystemMessage(Unwatch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - removeWatching(a) + watching = removeFromSet(a, watching) } } + terminatedQueued = removeFromSet(a, terminatedQueued) a } + protected def receivedTerminated(t: Terminated): Unit = { + if (terminatedQueued(t.actor)) { + terminatedQueued -= t.actor // here we know that it is the SAME ref which was put in + receiveMessage(t) + } + } + /** * When this actor is watching the subject of [[akka.actor.Terminated]] message * it will be propagated to user's receive. @@ -45,9 +54,12 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ if (childrenRefs.getByRef(actor).isDefined) handleChildTerminated(actor) if (watchingContains(actor)) { maintainAddressTerminatedSubscription(actor) { - removeWatching(actor) + watching = removeFromSet(actor, watching) + } + if (!isTerminating) { + self.tell(Terminated(actor)(existenceConfirmed, addressTerminated), actor) + terminatedQueued += actor } - if (!isTerminating) self.tell(Terminated(actor)(existenceConfirmed, addressTerminated), actor) } } @@ -58,12 +70,13 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ watching.contains(new UndefinedUidActorRef(subject))) } - // TODO this should be removed and be replaced with `watching -= subject` + // TODO this should be removed and be replaced with `set - subject` // when all actor references have uid, i.e. actorFor is removed - private def removeWatching(subject: ActorRef): Unit = { - watching -= subject + private def removeFromSet(subject: ActorRef, set: Set[ActorRef]): Set[ActorRef] = { + val removed = if (set(subject)) set - subject else set if (subject.path.uid != ActorCell.undefinedUid) - watching -= new UndefinedUidActorRef(subject) + removed - new UndefinedUidActorRef(subject) + else removed filterNot (_.path == subject.path) } protected def tellWatchersWeDied(actor: Actor): Unit = { @@ -100,6 +113,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ } } finally { watching = ActorCell.emptyActorRefSet + terminatedQueued = ActorCell.emptyActorRefSet unsubscribeAddressTerminated() } } diff --git a/akka-docs/rst/java/untyped-actors.rst b/akka-docs/rst/java/untyped-actors.rst index ef9bccf0a0..2dea93805f 100644 --- a/akka-docs/rst/java/untyped-actors.rst +++ b/akka-docs/rst/java/untyped-actors.rst @@ -196,9 +196,10 @@ monitoring of an already terminated actor leads to the immediate generation of the :class:`Terminated` message. It is also possible to deregister from watching another actor’s liveliness -using ``context.unwatch(target)``, but obviously this cannot guarantee -non-reception of the :class:`Terminated` message because that may already have -been queued. +using ``getContext().unwatch(target)``. This works even if the +:class:`Terminated` message has already been enqueued in the mailbox; after +calling :meth:`unwatch` no :class:`Terminated` message for that actor will be +processed anymore. Start Hook ---------- diff --git a/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst b/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst index a2c1f6c2f6..2133be9e34 100644 --- a/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst +++ b/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst @@ -236,10 +236,39 @@ Instead, use actorSelection followed by identify request, and watch the verified case Terminated(`ref`) => // ... } - - Use ``watch`` instead of ``isTerminated`` ========================================= ``ActorRef.isTerminated`` is deprecated in favor of ``ActorContext.watch`` because ``isTerminated`` behaves differently for local and remote actors. + +DeathWatch Semantics are Simplified +=================================== + +DeathPactException is now Fatal +------------------------------- + +Previously an unhandled :class:`Terminated` message which led to a +:class:`DeathPactException` to the thrown would be answered with a ``Restart`` +directive by the default supervisor strategy. This is not intuitive given the +name of the exception and the Erlang linking feature by which it was inspired. +The default strategy has thus be changed to return ``Stop`` in this case. + +It can be argued that previously the actor would likely run into a restart loop +because watching a terminated actor would lead to a :class:`DeathPactException` +immediately again. + +Unwatching now Prevents Reception of Terminated +----------------------------------------------- + +Previously calling :meth:`ActorContext.unwatch` would unregister lifecycle +monitoring interest, but if the target actor had terminated already the +:class:`Terminated` message had already been enqueued and would be received +later—possibly leading to a :class:`DeathPactException`. This behavior has been +modified such that the :class:`Terminated` message will be silently discarded +if :meth:`unwatch` is called before processing the :class:`Terminated` +message. Therefore the following is now safe:: + + context.stop(target) + context.unwatch(target) + diff --git a/akka-docs/rst/scala/actors.rst b/akka-docs/rst/scala/actors.rst index f9fdcec3ce..6bd14235fe 100644 --- a/akka-docs/rst/scala/actors.rst +++ b/akka-docs/rst/scala/actors.rst @@ -310,9 +310,9 @@ monitoring of an already terminated actor leads to the immediate generation of the :class:`Terminated` message. It is also possible to deregister from watching another actor’s liveliness -using ``context.unwatch(target)``, but obviously this cannot guarantee -non-reception of the :class:`Terminated` message because that may already have -been queued. +using ``context.unwatch(target)``. This works even if the :class:`Terminated` +message has already been enqueued in the mailbox; after calling :meth:`unwatch` +no :class:`Terminated` message for that actor will be processed anymore. Start Hook ----------