Terminated is now a SystemMessage
This commit is contained in:
parent
2c4c5b0594
commit
7b13ecea25
17 changed files with 82 additions and 69 deletions
|
|
@ -5,7 +5,7 @@
|
|||
package akka.actor
|
||||
|
||||
import language.postfixOps
|
||||
import akka.dispatch.sysmsg.Failed
|
||||
import akka.dispatch.sysmsg.{ DeathWatchNotification, Failed }
|
||||
import akka.pattern.ask
|
||||
import akka.testkit._
|
||||
import scala.concurrent.duration._
|
||||
|
|
@ -176,13 +176,11 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
|
|||
|
||||
"only notify when watching" in {
|
||||
val subject = system.actorOf(Props(new Actor { def receive = Actor.emptyBehavior }))
|
||||
val observer = system.actorOf(Props(new Actor {
|
||||
context.watch(subject)
|
||||
def receive = { case x ⇒ testActor forward x }
|
||||
}))
|
||||
|
||||
subject ! PoisonPill
|
||||
// the testActor is not watching subject and will discard Terminated msg
|
||||
testActor.asInstanceOf[InternalActorRef]
|
||||
.sendSystemMessage(DeathWatchNotification(subject, existenceConfirmed = true, addressTerminated = false))
|
||||
|
||||
// the testActor is not watching subject and will not receive a Terminated msg
|
||||
expectNoMsg
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -345,7 +345,7 @@ object SupervisorHierarchySpec {
|
|||
* Execution happens in phases (which is the reason for FSM):
|
||||
*
|
||||
* Idle:
|
||||
* - upon reception of Init message, construct hierary and go to Init state
|
||||
* - upon reception of Init message, construct hierarchy and go to Init state
|
||||
*
|
||||
* Init:
|
||||
* - receive refs of all contained actors
|
||||
|
|
|
|||
|
|
@ -392,8 +392,14 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
|||
|
||||
override def postRestart(reason: Throwable): Unit = testActor ! "parent restarted"
|
||||
|
||||
// Overriding to disable auto-unwatch
|
||||
override def preRestart(reason: Throwable, msg: Option[Any]): Unit = {
|
||||
context.children foreach context.stop
|
||||
postStop()
|
||||
}
|
||||
|
||||
def receive = {
|
||||
case Terminated(a) if a.path == child.path ⇒ testActor ! "child terminated" // FIXME case t @ Terminated(`child`) ticket #3156
|
||||
case Terminated(a) if a.path == child.path ⇒ testActor ! "child terminated"
|
||||
case l: TestLatch ⇒ child ! l
|
||||
case "test" ⇒ sender ! "green"
|
||||
case "testchild" ⇒ child forward "test"
|
||||
|
|
|
|||
|
|
@ -122,7 +122,6 @@ object SerializationTests {
|
|||
classOf[Resume],
|
||||
classOf[Terminate],
|
||||
classOf[Supervise],
|
||||
classOf[ChildTerminated],
|
||||
classOf[Watch],
|
||||
classOf[Unwatch],
|
||||
classOf[Failed],
|
||||
|
|
@ -350,11 +349,8 @@ class SerializationCompatibilitySpec extends AkkaSpec(SerializationTests.mostlyR
|
|||
"be preserved for the Supervise SystemMessage" in {
|
||||
verify(Supervise(FakeActorRef("child"), true), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001e616b6b612e64697370617463682e7379736d73672e5375706572766973652d0b363f56ab5feb0200025a00056173796e634c00056368696c647400154c616b6b612f6163746f722f4163746f725265663b7870017372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f72526566869e6c3669e60e5f02000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f469402000078707400056368696c647671007e0003")
|
||||
}
|
||||
"be preserved for the ChildTerminated SystemMessage" in {
|
||||
verify(ChildTerminated(FakeActorRef("child")), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720024616b6b612e64697370617463682e7379736d73672e4368696c645465726d696e617465644c84222437ed5db40200014c00056368696c647400154c616b6b612f6163746f722f4163746f725265663b78707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f72526566869e6c3669e60e5f02000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f469402000078707400056368696c647671007e0003")
|
||||
}
|
||||
"be preserved for the Watch SystemMessage" in {
|
||||
verify(Watch(FakeActorRef("watchee"), FakeActorRef("watcher")), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001a616b6b612e64697370617463682e7379736d73672e57617463682e1e65bc74394fc40200024c0007776174636865657400154c616b6b612f6163746f722f4163746f725265663b4c00077761746368657271007e000478707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f72526566869e6c3669e60e5f02000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f46940200007870740007776174636865657371007e0006740007776174636865727671007e0003")
|
||||
verify(Watch(FakeActorRef("watchee"), FakeActorRef("watcher")), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001a616b6b612e64697370617463682e7379736d73672e57617463682e1e65bc74394fc40200024c00077761746368656574001d4c616b6b612f6163746f722f496e7465726e616c4163746f725265663b4c00077761746368657271007e000478707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f72526566869e6c3669e60e5f02000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f46940200007870740007776174636865657371007e0006740007776174636865727671007e0003")
|
||||
}
|
||||
"be preserved for the Unwatch SystemMessage" in {
|
||||
verify(Unwatch(FakeActorRef("watchee"), FakeActorRef("watcher")), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001c616b6b612e64697370617463682e7379736d73672e556e776174636858501f7ee63dc2100200024c0007776174636865657400154c616b6b612f6163746f722f4163746f725265663b4c00077761746368657271007e000478707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f72526566869e6c3669e60e5f02000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f46940200007870740007776174636865657371007e0006740007776174636865727671007e0003")
|
||||
|
|
@ -364,7 +360,7 @@ class SerializationCompatibilitySpec extends AkkaSpec(SerializationTests.mostlyR
|
|||
}
|
||||
"be preserved for the Failed SystemMessage" in {
|
||||
// Using null as the cause to avoid a large serialized message
|
||||
verify(Failed(FakeActorRef("child"), cause = null, uid = 0), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001b616b6b612e64697370617463682e7379736d73672e4661696c656400000000000000030200034900037569644c000563617573657400154c6a6176612f6c616e672f5468726f7761626c653b4c00056368696c647400154c616b6b612f6163746f722f4163746f725265663b787000000000707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f72526566869e6c3669e60e5f02000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f469402000078707400056368696c647671007e0003")
|
||||
verify(Failed(FakeActorRef("child"), cause = null, uid = 0), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001b616b6b612e64697370617463682e7379736d73672e4661696c656400000000000000010200034900037569644c000563617573657400154c6a6176612f6c616e672f5468726f7761626c653b4c00056368696c647400154c616b6b612f6163746f722f4163746f725265663b787000000000707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f72526566869e6c3669e60e5f02000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f469402000078707400056368696c647671007e0003")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -96,7 +96,7 @@ case class ActorIdentity(correlationId: Any, ref: Option[ActorRef]) {
|
|||
@SerialVersionUID(1L)
|
||||
case class Terminated private[akka] (@BeanProperty actor: ActorRef)(
|
||||
@BeanProperty val existenceConfirmed: Boolean,
|
||||
@BeanProperty val addressTerminated: Boolean) extends AutoReceivedMessage with PossiblyHarmful
|
||||
@BeanProperty val addressTerminated: Boolean) extends PossiblyHarmful
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -500,7 +500,10 @@ trait Actor {
|
|||
*/
|
||||
@throws(classOf[Exception])
|
||||
def preRestart(reason: Throwable, message: Option[Any]) {
|
||||
context.children foreach context.stop
|
||||
context.children foreach { child ⇒
|
||||
context.unwatch(child)
|
||||
context.stop(child)
|
||||
}
|
||||
postStop()
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -8,7 +8,6 @@ import akka.actor.dungeon.ChildrenContainer
|
|||
import akka.dispatch.Envelope
|
||||
import akka.dispatch.NullMessage
|
||||
import akka.dispatch.sysmsg._
|
||||
import akka.dispatch.sysmsg.{ Watch, Unwatch, Terminate, SystemMessage, Suspend, Supervise, Resume, Recreate, NoMessage, Create, ChildTerminated }
|
||||
import akka.event.Logging.{ LogEvent, Debug, Error }
|
||||
import akka.japi.Procedure
|
||||
import java.io.{ ObjectOutputStream, NotSerializableException }
|
||||
|
|
@ -424,6 +423,7 @@ private[akka] class ActorCell(
|
|||
message match {
|
||||
case message: SystemMessage if shouldStash(message, currentState) ⇒ stash(message)
|
||||
case f: Failed ⇒ handleFailure(f)
|
||||
case DeathWatchNotification(a, ec, at) ⇒ watchedActorTerminated(a, ec, at)
|
||||
case Create() ⇒ create()
|
||||
case Watch(watchee, watcher) ⇒ addWatcher(watchee, watcher)
|
||||
case Unwatch(watchee, watcher) ⇒ remWatcher(watchee, watcher)
|
||||
|
|
@ -432,7 +432,6 @@ private[akka] class ActorCell(
|
|||
case Resume(inRespToFailure) ⇒ faultResume(inRespToFailure)
|
||||
case Terminate() ⇒ terminate()
|
||||
case Supervise(child, async) ⇒ supervise(child, async)
|
||||
case ChildTerminated(child) ⇒ handleChildTerminated(child)
|
||||
case NoMessage ⇒ // only here to suppress warning
|
||||
}
|
||||
} catch handleNonFatalOrInterruptedException { e ⇒
|
||||
|
|
@ -471,7 +470,6 @@ private[akka] class ActorCell(
|
|||
publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg))
|
||||
|
||||
msg.message match {
|
||||
case t: Terminated ⇒ watchedActorTerminated(t)
|
||||
case AddressTerminated(address) ⇒ addressTerminated(address)
|
||||
case Kill ⇒ throw new ActorKilledException("Kill")
|
||||
case PoisonPill ⇒ self.stop()
|
||||
|
|
|
|||
|
|
@ -495,7 +495,8 @@ private[akka] class EmptyLocalActorRef(override val provider: ActorRefProvider,
|
|||
protected def specialHandle(msg: Any, sender: ActorRef): Boolean = msg match {
|
||||
case w: Watch ⇒
|
||||
if (w.watchee == this && w.watcher != this)
|
||||
w.watcher ! Terminated(w.watchee)(existenceConfirmed = false, addressTerminated = false)
|
||||
w.watcher.sendSystemMessage(
|
||||
DeathWatchNotification(w.watchee, existenceConfirmed = false, addressTerminated = false))
|
||||
true
|
||||
case _: Unwatch ⇒ true // Just ignore
|
||||
case Identify(messageId) ⇒
|
||||
|
|
@ -529,7 +530,8 @@ private[akka] class DeadLetterActorRef(_provider: ActorRefProvider,
|
|||
override protected def specialHandle(msg: Any, sender: ActorRef): Boolean = msg match {
|
||||
case w: Watch ⇒
|
||||
if (w.watchee != this && w.watcher != this)
|
||||
w.watcher ! Terminated(w.watchee)(existenceConfirmed = false, addressTerminated = false)
|
||||
w.watcher.sendSystemMessage(
|
||||
DeathWatchNotification(w.watchee, existenceConfirmed = false, addressTerminated = false))
|
||||
true
|
||||
case w: Unwatch ⇒ true // Just ignore
|
||||
case Identify(messageId) ⇒
|
||||
|
|
|
|||
|
|
@ -433,10 +433,10 @@ private[akka] class LocalActorRefProvider private[akka] (
|
|||
|
||||
override def sendSystemMessage(message: SystemMessage): Unit = stopped ifOff {
|
||||
message match {
|
||||
case Failed(child, ex, _) ⇒ { causeOfTermination = Some(ex); child.asInstanceOf[InternalActorRef].stop() }
|
||||
case Supervise(_, _) ⇒ // TODO register child in some map to keep track of it and enable shutdown after all dead
|
||||
case ChildTerminated(_) ⇒ stop()
|
||||
case _ ⇒ log.error(this + " received unexpected system message [" + message + "]")
|
||||
case Failed(child, ex, _) ⇒ { causeOfTermination = Some(ex); child.asInstanceOf[InternalActorRef].stop() }
|
||||
case _: Supervise ⇒ // TODO register child in some map to keep track of it and enable shutdown after all dead
|
||||
case _: DeathWatchNotification ⇒ stop()
|
||||
case _ ⇒ log.error(this + " received unexpected system message [" + message + "]")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@
|
|||
package akka.actor.dungeon
|
||||
|
||||
import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorRefScope, ActorCell, Actor, Address, AddressTerminated }
|
||||
import akka.dispatch.sysmsg.{ ChildTerminated, Watch, Unwatch }
|
||||
import akka.dispatch.sysmsg.{ DeathWatchNotification, Watch, Unwatch }
|
||||
import akka.event.Logging.{ Warning, Error, Debug }
|
||||
import scala.util.control.NonFatal
|
||||
import akka.actor.MinimalActorRef
|
||||
|
|
@ -41,13 +41,15 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
|
|||
* When this actor is watching the subject of [[akka.actor.Terminated]] message
|
||||
* it will be propagated to user's receive.
|
||||
*/
|
||||
protected def watchedActorTerminated(t: Terminated): Unit =
|
||||
if (watchingContains(t.actor)) {
|
||||
maintainAddressTerminatedSubscription(t.actor) {
|
||||
removeWatching(t.actor)
|
||||
protected def watchedActorTerminated(actor: ActorRef, existenceConfirmed: Boolean, addressTerminated: Boolean): Unit = {
|
||||
if (childrenRefs.getByRef(actor).isDefined) handleChildTerminated(actor)
|
||||
if (watchingContains(actor)) {
|
||||
maintainAddressTerminatedSubscription(actor) {
|
||||
removeWatching(actor)
|
||||
}
|
||||
receiveMessage(t)
|
||||
if (!isTerminating) self.tell(Terminated(actor)(existenceConfirmed, addressTerminated), actor)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO this should be removed and be replaced with `watching.contains(subject)`
|
||||
// when all actor references have uid, i.e. actorFor is removed
|
||||
|
|
@ -66,10 +68,10 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
|
|||
|
||||
protected def tellWatchersWeDied(actor: Actor): Unit = {
|
||||
if (!watchedBy.isEmpty) {
|
||||
val terminated = Terminated(self)(existenceConfirmed = true, addressTerminated = false)
|
||||
try {
|
||||
def sendTerminated(ifLocal: Boolean)(watcher: ActorRef): Unit =
|
||||
if (watcher.asInstanceOf[ActorRefScope].isLocal == ifLocal) watcher.tell(terminated, self)
|
||||
if (watcher.asInstanceOf[ActorRefScope].isLocal == ifLocal) watcher.asInstanceOf[InternalActorRef].sendSystemMessage(
|
||||
DeathWatchNotification(self, existenceConfirmed = true, addressTerminated = false))
|
||||
|
||||
/*
|
||||
* It is important to notify the remote watchers first, otherwise RemoteDaemon might shut down, causing
|
||||
|
|
@ -141,15 +143,14 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
|
|||
for (a ← watchedBy; if a.path.address == address) watchedBy -= a
|
||||
}
|
||||
|
||||
// send Terminated to self for all matching subjects
|
||||
// existenceConfirmed = false because we could have been watching a
|
||||
// send DeathWatchNotification to self for all matching subjects
|
||||
// that are not child with existenceConfirmed = false because we could have been watching a
|
||||
// non-local ActorRef that had never resolved before the other node went down
|
||||
// When a parent is watching a child and it terminates due to AddressTerminated
|
||||
// it is removed by sending ChildTerminated to support immediate creation of child
|
||||
// with same name.
|
||||
// it is removed by sending DeathWatchNotification with existenceConfirmed = true to support
|
||||
// immediate creation of child with same name.
|
||||
for (a ← watching; if a.path.address == address) {
|
||||
childrenRefs.getByRef(a) foreach { _ ⇒ self.sendSystemMessage(ChildTerminated(a)) }
|
||||
self ! Terminated(a)(existenceConfirmed = false, addressTerminated = true)
|
||||
self.sendSystemMessage(DeathWatchNotification(a, existenceConfirmed = childrenRefs.getByRef(a).isDefined, addressTerminated = true))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -8,7 +8,6 @@ import akka.actor.PostRestartException
|
|||
import akka.actor.PreRestartException
|
||||
import akka.actor.{ InternalActorRef, ActorRef, ActorInterruptedException, ActorCell, Actor }
|
||||
import akka.dispatch._
|
||||
import akka.dispatch.sysmsg.ChildTerminated
|
||||
import akka.dispatch.sysmsg._
|
||||
import akka.event.Logging
|
||||
import akka.event.Logging.Debug
|
||||
|
|
@ -202,7 +201,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
|
|||
try if (a ne null) a.postStop()
|
||||
catch handleNonFatalOrInterruptedException { e ⇒ publish(Error(e, self.path.toString, clazz(a), e.getMessage)) }
|
||||
finally try dispatcher.detach(this)
|
||||
finally try parent.sendSystemMessage(ChildTerminated(self))
|
||||
finally try parent.sendSystemMessage(DeathWatchNotification(self, existenceConfirmed = true, addressTerminated = false))
|
||||
finally try parent ! NullMessage // read ScalaDoc of NullMessage to see why
|
||||
finally try tellWatchersWeDied(a)
|
||||
finally try unwatchWatchedActors(a) // stay here as we expect an emergency stop from handleInvokeFailure
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
package akka.dispatch.sysmsg
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import akka.actor.{ ActorRef, PossiblyHarmful }
|
||||
import akka.actor.{ InternalActorRef, ActorRef, PossiblyHarmful }
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -227,16 +227,11 @@ private[akka] case class Terminate() extends SystemMessage // sent to self from
|
|||
*/
|
||||
@SerialVersionUID(3245747602115485675L)
|
||||
private[akka] case class Supervise(child: ActorRef, async: Boolean) extends SystemMessage // sent to supervisor ActorRef from ActorCell.start
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@SerialVersionUID(5513569382760799668L)
|
||||
private[akka] case class ChildTerminated(child: ActorRef) extends SystemMessage // sent to supervisor from ActorCell.doTerminate
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@SerialVersionUID(3323205435124174788L)
|
||||
private[akka] case class Watch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to establish a DeathWatch
|
||||
private[akka] case class Watch(watchee: InternalActorRef, watcher: InternalActorRef) extends SystemMessage // sent to establish a DeathWatch
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
|
|
@ -251,7 +246,13 @@ private[akka] case object NoMessage extends SystemMessage // switched into the m
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@SerialVersionUID(3L)
|
||||
@SerialVersionUID(1L)
|
||||
private[akka] case class Failed(child: ActorRef, cause: Throwable, uid: Int) extends SystemMessage
|
||||
with StashWhenFailed
|
||||
with StashWhenWaitingForChildren
|
||||
with StashWhenWaitingForChildren
|
||||
|
||||
@SerialVersionUID(1L)
|
||||
private[akka] case class DeathWatchNotification(
|
||||
actor: ActorRef,
|
||||
existenceConfirmed: Boolean,
|
||||
addressTerminated: Boolean) extends SystemMessage
|
||||
|
|
|
|||
|
|
@ -206,10 +206,13 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
|
|||
}
|
||||
|
||||
override def sendSystemMessage(message: SystemMessage): Unit = message match {
|
||||
case _: Terminate ⇒ stop()
|
||||
case _: Terminate ⇒ stop()
|
||||
case DeathWatchNotification(a, ec, at) ⇒ this.!(Terminated(a)(existenceConfirmed = ec, addressTerminated = at))
|
||||
case Watch(watchee, watcher) ⇒
|
||||
if (watchee == this && watcher != this) {
|
||||
if (!addWatcher(watcher)) watcher ! Terminated(watchee)(existenceConfirmed = true, addressTerminated = false)
|
||||
if (!addWatcher(watcher))
|
||||
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
|
||||
watcher.sendSystemMessage(DeathWatchNotification(watchee, existenceConfirmed = true, addressTerminated = false))
|
||||
} else System.err.println("BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, this))
|
||||
case Unwatch(watchee, watcher) ⇒
|
||||
if (watchee == this && watcher != this) remWatcher(watcher)
|
||||
|
|
@ -228,8 +231,11 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
|
|||
result tryComplete Failure(new ActorKilledException("Stopped"))
|
||||
val watchers = clearWatchers()
|
||||
if (!watchers.isEmpty) {
|
||||
val termination = Terminated(this)(existenceConfirmed = true, addressTerminated = false)
|
||||
watchers foreach { _.tell(termination, this) }
|
||||
watchers foreach { watcher ⇒
|
||||
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
|
||||
watcher.asInstanceOf[InternalActorRef]
|
||||
.sendSystemMessage(DeathWatchNotification(watcher, existenceConfirmed = true, addressTerminated = false))
|
||||
}
|
||||
}
|
||||
}
|
||||
state match {
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ trait GracefulStopSupport {
|
|||
else {
|
||||
val internalTarget = target.asInstanceOf[InternalActorRef]
|
||||
val ref = PromiseActorRef(internalTarget.provider, Timeout(timeout))
|
||||
internalTarget.sendSystemMessage(Watch(target, ref))
|
||||
internalTarget.sendSystemMessage(Watch(internalTarget, ref))
|
||||
target.tell(stopMessage, Actor.noSender)
|
||||
ref.result.future.transform(
|
||||
{
|
||||
|
|
|
|||
|
|
@ -75,8 +75,8 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo
|
|||
*/
|
||||
|
||||
def applyRoute(sender: ActorRef, message: Any): immutable.Iterable[Destination] = message match {
|
||||
case _: AutoReceivedMessage ⇒ Destination(sender, self) :: Nil
|
||||
case CurrentRoutees ⇒ { sender ! RouterRoutees(_routees); Nil }
|
||||
case _: AutoReceivedMessage | _: Terminated ⇒ Destination(sender, self) :: Nil
|
||||
case CurrentRoutees ⇒ { sender ! RouterRoutees(_routees); Nil }
|
||||
case _ ⇒
|
||||
val payload = (sender, message)
|
||||
if (route isDefinedAt payload) route(payload) else Nil
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ import akka.actor.Props
|
|||
import akka.actor.Scheduler
|
||||
import akka.actor.Scope
|
||||
import akka.actor.Terminated
|
||||
import akka.dispatch.sysmsg.ChildTerminated
|
||||
import akka.dispatch.sysmsg.DeathWatchNotification
|
||||
import akka.event.EventStream
|
||||
import akka.japi.Util.immutableSeq
|
||||
import akka.remote.RemoteActorRefProvider
|
||||
|
|
@ -101,8 +101,8 @@ private[akka] class RemoteDeploymentWatcher extends Actor {
|
|||
context.watch(a)
|
||||
|
||||
case t @ Terminated(a) if supervisors isDefinedAt a ⇒
|
||||
// send extra ChildTerminated to the supervisor so that it will remove the child
|
||||
supervisors(a).sendSystemMessage(ChildTerminated(a))
|
||||
// send extra DeathWatchNotification to the supervisor so that it will remove the child
|
||||
supervisors(a).sendSystemMessage(DeathWatchNotification(a, existenceConfirmed = false, addressTerminated = true))
|
||||
supervisors -= a
|
||||
|
||||
case _: Terminated ⇒
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import scala.annotation.tailrec
|
|||
import scala.util.control.NonFatal
|
||||
import akka.actor.{ VirtualPathContainer, Terminated, Deploy, Props, Nobody, LocalActorRef, InternalActorRef, Address, ActorSystemImpl, ActorRef, ActorPathExtractor, ActorPath, Actor, AddressTerminated }
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.dispatch.sysmsg.Watch
|
||||
import akka.dispatch.sysmsg.{ DeathWatchNotification, SystemMessage, Watch }
|
||||
import akka.actor.ActorRefWithCell
|
||||
import akka.actor.ActorRefScope
|
||||
import akka.util.Switch
|
||||
|
|
@ -80,6 +80,15 @@ private[akka] class RemoteSystemDaemon(
|
|||
}
|
||||
}
|
||||
|
||||
override def sendSystemMessage(message: SystemMessage): Unit = message match {
|
||||
case DeathWatchNotification(child: ActorRefWithCell with ActorRefScope, _, _) if child.isLocal ⇒
|
||||
terminating.locked {
|
||||
removeChild(child.path.elements.drop(1).mkString("/"))
|
||||
terminationHookDoneWhenNoChildren()
|
||||
}
|
||||
case _ ⇒ super.sendSystemMessage(message)
|
||||
}
|
||||
|
||||
override def !(msg: Any)(implicit sender: ActorRef = Actor.noSender): Unit = try msg match {
|
||||
case message: DaemonMsg ⇒
|
||||
log.debug("Received command [{}] to RemoteSystemDaemon on [{}]", message, path.address)
|
||||
|
|
@ -126,12 +135,6 @@ private[akka] class RemoteSystemDaemon(
|
|||
|
||||
case Identify(messageId) ⇒ sender ! ActorIdentity(messageId, Some(this))
|
||||
|
||||
case Terminated(child: ActorRefWithCell) if child.asInstanceOf[ActorRefScope].isLocal ⇒
|
||||
terminating.locked {
|
||||
removeChild(child.path.elements.drop(1).mkString("/"))
|
||||
terminationHookDoneWhenNoChildren()
|
||||
}
|
||||
|
||||
case t: Terminated ⇒
|
||||
|
||||
case TerminationHook ⇒
|
||||
|
|
|
|||
|
|
@ -276,7 +276,7 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
|
|||
else {
|
||||
val internalTarget = target.asInstanceOf[InternalActorRef]
|
||||
val ref = PromiseActorRef(internalTarget.provider, timeout)
|
||||
internalTarget.sendSystemMessage(Watch(target, ref))
|
||||
internalTarget.sendSystemMessage(Watch(internalTarget, ref))
|
||||
target.tell(mode, ref)
|
||||
ref.result.future.transform({
|
||||
case Terminated(t) if t.path == target.path ⇒ SetThrottleAck
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue