diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index 5fa63fc0f8..e7ce5a59a2 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -5,7 +5,7 @@ package akka.actor import language.postfixOps -import akka.dispatch.sysmsg.Failed +import akka.dispatch.sysmsg.{ DeathWatchNotification, Failed } import akka.pattern.ask import akka.testkit._ import scala.concurrent.duration._ @@ -176,13 +176,11 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout "only notify when watching" in { val subject = system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior })) - val observer = system.actorOf(Props(new Actor { - context.watch(subject) - def receive = { case x ⇒ testActor forward x } - })) - subject ! PoisonPill - // the testActor is not watching subject and will discard Terminated msg + testActor.asInstanceOf[InternalActorRef] + .sendSystemMessage(DeathWatchNotification(subject, existenceConfirmed = true, addressTerminated = false)) + + // the testActor is not watching subject and will not receive a Terminated msg expectNoMsg } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index b4e8840b0e..75bd0a2e3d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -345,7 +345,7 @@ object SupervisorHierarchySpec { * Execution happens in phases (which is the reason for FSM): * * Idle: - * - upon reception of Init message, construct hierary and go to Init state + * - upon reception of Init message, construct hierarchy and go to Init state * * Init: * - receive refs of all contained actors diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index 6a0bc9abbb..8b14918c79 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -392,8 +392,14 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende override def postRestart(reason: Throwable): Unit = testActor ! "parent restarted" + // Overriding to disable auto-unwatch + override def preRestart(reason: Throwable, msg: Option[Any]): Unit = { + context.children foreach context.stop + postStop() + } + def receive = { - case Terminated(a) if a.path == child.path ⇒ testActor ! "child terminated" // FIXME case t @ Terminated(`child`) ticket #3156 + case Terminated(a) if a.path == child.path ⇒ testActor ! "child terminated" case l: TestLatch ⇒ child ! l case "test" ⇒ sender ! "green" case "testchild" ⇒ child forward "test" diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index 07b4065e3c..9ab451e677 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -122,7 +122,6 @@ object SerializationTests { classOf[Resume], classOf[Terminate], classOf[Supervise], - classOf[ChildTerminated], classOf[Watch], classOf[Unwatch], classOf[Failed], @@ -350,11 +349,8 @@ class SerializationCompatibilitySpec extends AkkaSpec(SerializationTests.mostlyR "be preserved for the Supervise SystemMessage" in { verify(Supervise(FakeActorRef("child"), true), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001e616b6b612e64697370617463682e7379736d73672e5375706572766973652d0b363f56ab5feb0200025a00056173796e634c00056368696c647400154c616b6b612f6163746f722f4163746f725265663b7870017372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f72526566869e6c3669e60e5f02000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f469402000078707400056368696c647671007e0003") } - "be preserved for the ChildTerminated SystemMessage" in { - verify(ChildTerminated(FakeActorRef("child")), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720024616b6b612e64697370617463682e7379736d73672e4368696c645465726d696e617465644c84222437ed5db40200014c00056368696c647400154c616b6b612f6163746f722f4163746f725265663b78707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f72526566869e6c3669e60e5f02000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f469402000078707400056368696c647671007e0003") - } "be preserved for the Watch SystemMessage" in { - verify(Watch(FakeActorRef("watchee"), FakeActorRef("watcher")), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001a616b6b612e64697370617463682e7379736d73672e57617463682e1e65bc74394fc40200024c0007776174636865657400154c616b6b612f6163746f722f4163746f725265663b4c00077761746368657271007e000478707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f72526566869e6c3669e60e5f02000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f46940200007870740007776174636865657371007e0006740007776174636865727671007e0003") + verify(Watch(FakeActorRef("watchee"), FakeActorRef("watcher")), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001a616b6b612e64697370617463682e7379736d73672e57617463682e1e65bc74394fc40200024c00077761746368656574001d4c616b6b612f6163746f722f496e7465726e616c4163746f725265663b4c00077761746368657271007e000478707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f72526566869e6c3669e60e5f02000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f46940200007870740007776174636865657371007e0006740007776174636865727671007e0003") } "be preserved for the Unwatch SystemMessage" in { verify(Unwatch(FakeActorRef("watchee"), FakeActorRef("watcher")), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001c616b6b612e64697370617463682e7379736d73672e556e776174636858501f7ee63dc2100200024c0007776174636865657400154c616b6b612f6163746f722f4163746f725265663b4c00077761746368657271007e000478707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f72526566869e6c3669e60e5f02000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f46940200007870740007776174636865657371007e0006740007776174636865727671007e0003") @@ -364,7 +360,7 @@ class SerializationCompatibilitySpec extends AkkaSpec(SerializationTests.mostlyR } "be preserved for the Failed SystemMessage" in { // Using null as the cause to avoid a large serialized message - verify(Failed(FakeActorRef("child"), cause = null, uid = 0), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001b616b6b612e64697370617463682e7379736d73672e4661696c656400000000000000030200034900037569644c000563617573657400154c6a6176612f6c616e672f5468726f7761626c653b4c00056368696c647400154c616b6b612f6163746f722f4163746f725265663b787000000000707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f72526566869e6c3669e60e5f02000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f469402000078707400056368696c647671007e0003") + verify(Failed(FakeActorRef("child"), cause = null, uid = 0), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001b616b6b612e64697370617463682e7379736d73672e4661696c656400000000000000010200034900037569644c000563617573657400154c6a6176612f6c616e672f5468726f7761626c653b4c00056368696c647400154c616b6b612f6163746f722f4163746f725265663b787000000000707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f72526566869e6c3669e60e5f02000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f469402000078707400056368696c647671007e0003") } } } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 096b810e84..288045804c 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -96,7 +96,7 @@ case class ActorIdentity(correlationId: Any, ref: Option[ActorRef]) { @SerialVersionUID(1L) case class Terminated private[akka] (@BeanProperty actor: ActorRef)( @BeanProperty val existenceConfirmed: Boolean, - @BeanProperty val addressTerminated: Boolean) extends AutoReceivedMessage with PossiblyHarmful + @BeanProperty val addressTerminated: Boolean) extends PossiblyHarmful /** * INTERNAL API @@ -500,7 +500,10 @@ trait Actor { */ @throws(classOf[Exception]) def preRestart(reason: Throwable, message: Option[Any]) { - context.children foreach context.stop + context.children foreach { child ⇒ + context.unwatch(child) + context.stop(child) + } postStop() } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 9154eaec14..cdc699fe63 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -8,7 +8,6 @@ import akka.actor.dungeon.ChildrenContainer import akka.dispatch.Envelope import akka.dispatch.NullMessage import akka.dispatch.sysmsg._ -import akka.dispatch.sysmsg.{ Watch, Unwatch, Terminate, SystemMessage, Suspend, Supervise, Resume, Recreate, NoMessage, Create, ChildTerminated } import akka.event.Logging.{ LogEvent, Debug, Error } import akka.japi.Procedure import java.io.{ ObjectOutputStream, NotSerializableException } @@ -424,6 +423,7 @@ private[akka] class ActorCell( message match { case message: SystemMessage if shouldStash(message, currentState) ⇒ stash(message) case f: Failed ⇒ handleFailure(f) + case DeathWatchNotification(a, ec, at) ⇒ watchedActorTerminated(a, ec, at) case Create() ⇒ create() case Watch(watchee, watcher) ⇒ addWatcher(watchee, watcher) case Unwatch(watchee, watcher) ⇒ remWatcher(watchee, watcher) @@ -432,7 +432,6 @@ private[akka] class ActorCell( case Resume(inRespToFailure) ⇒ faultResume(inRespToFailure) case Terminate() ⇒ terminate() case Supervise(child, async) ⇒ supervise(child, async) - case ChildTerminated(child) ⇒ handleChildTerminated(child) case NoMessage ⇒ // only here to suppress warning } } catch handleNonFatalOrInterruptedException { e ⇒ @@ -471,7 +470,6 @@ private[akka] class ActorCell( publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg)) msg.message match { - case t: Terminated ⇒ watchedActorTerminated(t) case AddressTerminated(address) ⇒ addressTerminated(address) case Kill ⇒ throw new ActorKilledException("Kill") case PoisonPill ⇒ self.stop() diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index c48c304f87..08a7e84421 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -495,7 +495,8 @@ private[akka] class EmptyLocalActorRef(override val provider: ActorRefProvider, protected def specialHandle(msg: Any, sender: ActorRef): Boolean = msg match { case w: Watch ⇒ if (w.watchee == this && w.watcher != this) - w.watcher ! Terminated(w.watchee)(existenceConfirmed = false, addressTerminated = false) + w.watcher.sendSystemMessage( + DeathWatchNotification(w.watchee, existenceConfirmed = false, addressTerminated = false)) true case _: Unwatch ⇒ true // Just ignore case Identify(messageId) ⇒ @@ -529,7 +530,8 @@ private[akka] class DeadLetterActorRef(_provider: ActorRefProvider, override protected def specialHandle(msg: Any, sender: ActorRef): Boolean = msg match { case w: Watch ⇒ if (w.watchee != this && w.watcher != this) - w.watcher ! Terminated(w.watchee)(existenceConfirmed = false, addressTerminated = false) + w.watcher.sendSystemMessage( + DeathWatchNotification(w.watchee, existenceConfirmed = false, addressTerminated = false)) true case w: Unwatch ⇒ true // Just ignore case Identify(messageId) ⇒ diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index a8a0b07778..18a3077e22 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -433,10 +433,10 @@ private[akka] class LocalActorRefProvider private[akka] ( override def sendSystemMessage(message: SystemMessage): Unit = stopped ifOff { message match { - case Failed(child, ex, _) ⇒ { causeOfTermination = Some(ex); child.asInstanceOf[InternalActorRef].stop() } - case Supervise(_, _) ⇒ // TODO register child in some map to keep track of it and enable shutdown after all dead - case ChildTerminated(_) ⇒ stop() - case _ ⇒ log.error(this + " received unexpected system message [" + message + "]") + case Failed(child, ex, _) ⇒ { causeOfTermination = Some(ex); child.asInstanceOf[InternalActorRef].stop() } + case _: Supervise ⇒ // TODO register child in some map to keep track of it and enable shutdown after all dead + case _: DeathWatchNotification ⇒ stop() + case _ ⇒ log.error(this + " received unexpected system message [" + message + "]") } } } diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala index 6dbebb806b..ce12a7ca78 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala @@ -5,7 +5,7 @@ package akka.actor.dungeon import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorRefScope, ActorCell, Actor, Address, AddressTerminated } -import akka.dispatch.sysmsg.{ ChildTerminated, Watch, Unwatch } +import akka.dispatch.sysmsg.{ DeathWatchNotification, Watch, Unwatch } import akka.event.Logging.{ Warning, Error, Debug } import scala.util.control.NonFatal import akka.actor.MinimalActorRef @@ -41,13 +41,15 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ * When this actor is watching the subject of [[akka.actor.Terminated]] message * it will be propagated to user's receive. */ - protected def watchedActorTerminated(t: Terminated): Unit = - if (watchingContains(t.actor)) { - maintainAddressTerminatedSubscription(t.actor) { - removeWatching(t.actor) + protected def watchedActorTerminated(actor: ActorRef, existenceConfirmed: Boolean, addressTerminated: Boolean): Unit = { + if (childrenRefs.getByRef(actor).isDefined) handleChildTerminated(actor) + if (watchingContains(actor)) { + maintainAddressTerminatedSubscription(actor) { + removeWatching(actor) } - receiveMessage(t) + if (!isTerminating) self.tell(Terminated(actor)(existenceConfirmed, addressTerminated), actor) } + } // TODO this should be removed and be replaced with `watching.contains(subject)` // when all actor references have uid, i.e. actorFor is removed @@ -66,10 +68,10 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ protected def tellWatchersWeDied(actor: Actor): Unit = { if (!watchedBy.isEmpty) { - val terminated = Terminated(self)(existenceConfirmed = true, addressTerminated = false) try { def sendTerminated(ifLocal: Boolean)(watcher: ActorRef): Unit = - if (watcher.asInstanceOf[ActorRefScope].isLocal == ifLocal) watcher.tell(terminated, self) + if (watcher.asInstanceOf[ActorRefScope].isLocal == ifLocal) watcher.asInstanceOf[InternalActorRef].sendSystemMessage( + DeathWatchNotification(self, existenceConfirmed = true, addressTerminated = false)) /* * It is important to notify the remote watchers first, otherwise RemoteDaemon might shut down, causing @@ -141,15 +143,14 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ for (a ← watchedBy; if a.path.address == address) watchedBy -= a } - // send Terminated to self for all matching subjects - // existenceConfirmed = false because we could have been watching a + // send DeathWatchNotification to self for all matching subjects + // that are not child with existenceConfirmed = false because we could have been watching a // non-local ActorRef that had never resolved before the other node went down // When a parent is watching a child and it terminates due to AddressTerminated - // it is removed by sending ChildTerminated to support immediate creation of child - // with same name. + // it is removed by sending DeathWatchNotification with existenceConfirmed = true to support + // immediate creation of child with same name. for (a ← watching; if a.path.address == address) { - childrenRefs.getByRef(a) foreach { _ ⇒ self.sendSystemMessage(ChildTerminated(a)) } - self ! Terminated(a)(existenceConfirmed = false, addressTerminated = true) + self.sendSystemMessage(DeathWatchNotification(a, existenceConfirmed = childrenRefs.getByRef(a).isDefined, addressTerminated = true)) } } diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala index 5badb8c180..baad10ed8a 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala @@ -8,7 +8,6 @@ import akka.actor.PostRestartException import akka.actor.PreRestartException import akka.actor.{ InternalActorRef, ActorRef, ActorInterruptedException, ActorCell, Actor } import akka.dispatch._ -import akka.dispatch.sysmsg.ChildTerminated import akka.dispatch.sysmsg._ import akka.event.Logging import akka.event.Logging.Debug @@ -202,7 +201,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ try if (a ne null) a.postStop() catch handleNonFatalOrInterruptedException { e ⇒ publish(Error(e, self.path.toString, clazz(a), e.getMessage)) } finally try dispatcher.detach(this) - finally try parent.sendSystemMessage(ChildTerminated(self)) + finally try parent.sendSystemMessage(DeathWatchNotification(self, existenceConfirmed = true, addressTerminated = false)) finally try parent ! NullMessage // read ScalaDoc of NullMessage to see why finally try tellWatchersWeDied(a) finally try unwatchWatchedActors(a) // stay here as we expect an emergency stop from handleInvokeFailure diff --git a/akka-actor/src/main/scala/akka/dispatch/sysmsg/SystemMessage.scala b/akka-actor/src/main/scala/akka/dispatch/sysmsg/SystemMessage.scala index 7f043ebd3d..e7f127ed59 100644 --- a/akka-actor/src/main/scala/akka/dispatch/sysmsg/SystemMessage.scala +++ b/akka-actor/src/main/scala/akka/dispatch/sysmsg/SystemMessage.scala @@ -4,7 +4,7 @@ package akka.dispatch.sysmsg import scala.annotation.tailrec -import akka.actor.{ ActorRef, PossiblyHarmful } +import akka.actor.{ InternalActorRef, ActorRef, PossiblyHarmful } /** * INTERNAL API @@ -227,16 +227,11 @@ private[akka] case class Terminate() extends SystemMessage // sent to self from */ @SerialVersionUID(3245747602115485675L) private[akka] case class Supervise(child: ActorRef, async: Boolean) extends SystemMessage // sent to supervisor ActorRef from ActorCell.start -/** - * INTERNAL API - */ -@SerialVersionUID(5513569382760799668L) -private[akka] case class ChildTerminated(child: ActorRef) extends SystemMessage // sent to supervisor from ActorCell.doTerminate /** * INTERNAL API */ @SerialVersionUID(3323205435124174788L) -private[akka] case class Watch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to establish a DeathWatch +private[akka] case class Watch(watchee: InternalActorRef, watcher: InternalActorRef) extends SystemMessage // sent to establish a DeathWatch /** * INTERNAL API */ @@ -251,7 +246,13 @@ private[akka] case object NoMessage extends SystemMessage // switched into the m /** * INTERNAL API */ -@SerialVersionUID(3L) +@SerialVersionUID(1L) private[akka] case class Failed(child: ActorRef, cause: Throwable, uid: Int) extends SystemMessage with StashWhenFailed - with StashWhenWaitingForChildren \ No newline at end of file + with StashWhenWaitingForChildren + +@SerialVersionUID(1L) +private[akka] case class DeathWatchNotification( + actor: ActorRef, + existenceConfirmed: Boolean, + addressTerminated: Boolean) extends SystemMessage diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index 085b7ad231..0a26eca0ca 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -206,10 +206,13 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide } override def sendSystemMessage(message: SystemMessage): Unit = message match { - case _: Terminate ⇒ stop() + case _: Terminate ⇒ stop() + case DeathWatchNotification(a, ec, at) ⇒ this.!(Terminated(a)(existenceConfirmed = ec, addressTerminated = at)) case Watch(watchee, watcher) ⇒ if (watchee == this && watcher != this) { - if (!addWatcher(watcher)) watcher ! Terminated(watchee)(existenceConfirmed = true, addressTerminated = false) + if (!addWatcher(watcher)) + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + watcher.sendSystemMessage(DeathWatchNotification(watchee, existenceConfirmed = true, addressTerminated = false)) } else System.err.println("BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, this)) case Unwatch(watchee, watcher) ⇒ if (watchee == this && watcher != this) remWatcher(watcher) @@ -228,8 +231,11 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide result tryComplete Failure(new ActorKilledException("Stopped")) val watchers = clearWatchers() if (!watchers.isEmpty) { - val termination = Terminated(this)(existenceConfirmed = true, addressTerminated = false) - watchers foreach { _.tell(termination, this) } + watchers foreach { watcher ⇒ + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + watcher.asInstanceOf[InternalActorRef] + .sendSystemMessage(DeathWatchNotification(watcher, existenceConfirmed = true, addressTerminated = false)) + } } } state match { diff --git a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala index 2fa9f0876e..0cdc5df377 100644 --- a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala @@ -50,7 +50,7 @@ trait GracefulStopSupport { else { val internalTarget = target.asInstanceOf[InternalActorRef] val ref = PromiseActorRef(internalTarget.provider, Timeout(timeout)) - internalTarget.sendSystemMessage(Watch(target, ref)) + internalTarget.sendSystemMessage(Watch(internalTarget, ref)) target.tell(stopMessage, Actor.noSender) ref.result.future.transform( { diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index c43e80871f..69f41fcbbf 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -75,8 +75,8 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo */ def applyRoute(sender: ActorRef, message: Any): immutable.Iterable[Destination] = message match { - case _: AutoReceivedMessage ⇒ Destination(sender, self) :: Nil - case CurrentRoutees ⇒ { sender ! RouterRoutees(_routees); Nil } + case _: AutoReceivedMessage | _: Terminated ⇒ Destination(sender, self) :: Nil + case CurrentRoutees ⇒ { sender ! RouterRoutees(_routees); Nil } case _ ⇒ val payload = (sender, message) if (route isDefinedAt payload) route(payload) else Nil diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala index af6affaed5..89c29503f7 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala @@ -18,7 +18,7 @@ import akka.actor.Props import akka.actor.Scheduler import akka.actor.Scope import akka.actor.Terminated -import akka.dispatch.sysmsg.ChildTerminated +import akka.dispatch.sysmsg.DeathWatchNotification import akka.event.EventStream import akka.japi.Util.immutableSeq import akka.remote.RemoteActorRefProvider @@ -101,8 +101,8 @@ private[akka] class RemoteDeploymentWatcher extends Actor { context.watch(a) case t @ Terminated(a) if supervisors isDefinedAt a ⇒ - // send extra ChildTerminated to the supervisor so that it will remove the child - supervisors(a).sendSystemMessage(ChildTerminated(a)) + // send extra DeathWatchNotification to the supervisor so that it will remove the child + supervisors(a).sendSystemMessage(DeathWatchNotification(a, existenceConfirmed = false, addressTerminated = true)) supervisors -= a case _: Terminated ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index 47cce0d3d9..c7844b5350 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -8,7 +8,7 @@ import scala.annotation.tailrec import scala.util.control.NonFatal import akka.actor.{ VirtualPathContainer, Terminated, Deploy, Props, Nobody, LocalActorRef, InternalActorRef, Address, ActorSystemImpl, ActorRef, ActorPathExtractor, ActorPath, Actor, AddressTerminated } import akka.event.LoggingAdapter -import akka.dispatch.sysmsg.Watch +import akka.dispatch.sysmsg.{ DeathWatchNotification, SystemMessage, Watch } import akka.actor.ActorRefWithCell import akka.actor.ActorRefScope import akka.util.Switch @@ -80,6 +80,15 @@ private[akka] class RemoteSystemDaemon( } } + override def sendSystemMessage(message: SystemMessage): Unit = message match { + case DeathWatchNotification(child: ActorRefWithCell with ActorRefScope, _, _) if child.isLocal ⇒ + terminating.locked { + removeChild(child.path.elements.drop(1).mkString("/")) + terminationHookDoneWhenNoChildren() + } + case _ ⇒ super.sendSystemMessage(message) + } + override def !(msg: Any)(implicit sender: ActorRef = Actor.noSender): Unit = try msg match { case message: DaemonMsg ⇒ log.debug("Received command [{}] to RemoteSystemDaemon on [{}]", message, path.address) @@ -126,12 +135,6 @@ private[akka] class RemoteSystemDaemon( case Identify(messageId) ⇒ sender ! ActorIdentity(messageId, Some(this)) - case Terminated(child: ActorRefWithCell) if child.asInstanceOf[ActorRefScope].isLocal ⇒ - terminating.locked { - removeChild(child.path.elements.drop(1).mkString("/")) - terminationHookDoneWhenNoChildren() - } - case t: Terminated ⇒ case TerminationHook ⇒ diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index 8ce3aff700..40102ee7c3 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -276,7 +276,7 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A else { val internalTarget = target.asInstanceOf[InternalActorRef] val ref = PromiseActorRef(internalTarget.provider, timeout) - internalTarget.sendSystemMessage(Watch(target, ref)) + internalTarget.sendSystemMessage(Watch(internalTarget, ref)) target.tell(mode, ref) ref.result.future.transform({ case Terminated(t) if t.path == target.path ⇒ SetThrottleAck