Incorporating Roland's feedback

This commit is contained in:
Viktor Klang 2012-06-02 14:49:28 +02:00
parent e461e1490d
commit 17ee47079a
11 changed files with 37 additions and 40 deletions

View file

@ -73,7 +73,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
watch(router) watch(router)
watch(c2) watch(c2)
system.stop(c2) system.stop(c2)
expectMsg(Terminated(c2)(stopped = true)) expectMsg(Terminated(c2)(existenceConfirmed = true))
// it might take a while until the Router has actually processed the Terminated message // it might take a while until the Router has actually processed the Terminated message
awaitCond { awaitCond {
router ! "" router ! ""
@ -84,7 +84,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
res == Seq(c1, c1) res == Seq(c1, c1)
} }
system.stop(c1) system.stop(c1)
expectMsg(Terminated(router)(stopped = true)) expectMsg(Terminated(router)(existenceConfirmed = true))
} }
"be able to send their routees" in { "be able to send their routees" in {

View file

@ -59,7 +59,7 @@ case object Kill extends Kill {
/** /**
* When Death Watch is used, the watcher will receive a Terminated(watched) message when watched is terminated. * When Death Watch is used, the watcher will receive a Terminated(watched) message when watched is terminated.
*/ */
case class Terminated(@BeanProperty actor: ActorRef)(@BeanProperty stopped: Boolean) case class Terminated(@BeanProperty actor: ActorRef)(@BeanProperty existenceConfirmed: Boolean)
abstract class ReceiveTimeout extends PossiblyHarmful abstract class ReceiveTimeout extends PossiblyHarmful

View file

@ -724,19 +724,20 @@ private[akka] class ActorCell(
parent.sendSystemMessage(ChildTerminated(self)) parent.sendSystemMessage(ChildTerminated(self))
if (!watchedBy.isEmpty) { if (!watchedBy.isEmpty) {
val terminated = Terminated(self)(stopped = true) val terminated = Terminated(self)(existenceConfirmed = true)
try { try {
watchedBy foreach { watchedBy foreach {
watcher try watcher.tell(terminated, self) catch { watcher
case NonFatal(t) system.eventStream.publish(Error(t, self.path.toString, clazz(a), "deathwatch")) try watcher.tell(terminated, self) catch {
} case NonFatal(t) system.eventStream.publish(Error(t, self.path.toString, clazz(a), "deathwatch"))
}
} }
} finally watchedBy = emptyActorRefSet } finally watchedBy = emptyActorRefSet
} }
if (!watching.isEmpty) { if (!watching.isEmpty) {
try { try {
watching foreach { watching foreach { // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
case watchee: InternalActorRef try watchee.sendSystemMessage(Unwatch(watchee, self)) catch { case watchee: InternalActorRef try watchee.sendSystemMessage(Unwatch(watchee, self)) catch {
case NonFatal(t) system.eventStream.publish(Error(t, self.path.toString, clazz(a), "deathwatch")) case NonFatal(t) system.eventStream.publish(Error(t, self.path.toString, clazz(a), "deathwatch"))
} }

View file

@ -424,10 +424,11 @@ private[akka] class EmptyLocalActorRef(override val provider: ActorRefProvider,
protected def specialHandle(msg: Any): Boolean = msg match { protected def specialHandle(msg: Any): Boolean = msg match {
case w: Watch case w: Watch
if (w.watchee == this && w.watcher != this) w.watcher ! Terminated(w.watchee)(stopped = false) if (w.watchee == this && w.watcher != this)
w.watcher ! Terminated(w.watchee)(existenceConfirmed = false)
true true
case w: Unwatch true // Just ignore case _: Unwatch true // Just ignore
case _ false case _ false
} }
} }
@ -449,7 +450,7 @@ private[akka] class DeadLetterActorRef(_provider: ActorRefProvider,
override protected def specialHandle(msg: Any): Boolean = msg match { override protected def specialHandle(msg: Any): Boolean = msg match {
case w: Watch case w: Watch
if (w.watchee != this && w.watcher != this) w.watcher ! Terminated(w.watchee)(stopped = false) if (w.watchee != this && w.watcher != this) w.watcher ! Terminated(w.watchee)(existenceConfirmed = false)
true true
case w: Unwatch true // Just ignore case w: Unwatch true // Just ignore

View file

@ -403,8 +403,8 @@ class LocalActorRefProvider(
def receive = { def receive = {
case Terminated(_) context.stop(self) case Terminated(_) context.stop(self)
case CreateChild(child, name) sender ! (try context.actorOf(child, name) catch { case NonFatal(e) e }) // FIXME shouldn't this use NonFatal & Status.Failure? case CreateChild(child, name) sender ! (try context.actorOf(child, name) catch { case NonFatal(e) Status.Failure(e) })
case CreateRandomNameChild(child) sender ! (try context.actorOf(child) catch { case NonFatal(e) e }) // FIXME shouldn't this use NonFatal & Status.Failure? case CreateRandomNameChild(child) sender ! (try context.actorOf(child) catch { case NonFatal(e) Status.Failure(e) })
case StopChild(child) context.stop(child); sender ! "ok" case StopChild(child) context.stop(child); sender ! "ok"
case m deadLetters ! DeadLetter(m, sender, self) case m deadLetters ! DeadLetter(m, sender, self)
} }
@ -435,8 +435,8 @@ class LocalActorRefProvider(
def receive = { def receive = {
case Terminated(_) eventStream.stopDefaultLoggers(); context.stop(self) case Terminated(_) eventStream.stopDefaultLoggers(); context.stop(self)
case CreateChild(child, name) sender ! (try context.actorOf(child, name) catch { case e: Exception e }) // FIXME shouldn't this use NonFatal & Status.Failure? case CreateChild(child, name) sender ! (try context.actorOf(child, name) catch { case NonFatal(e) Status.Failure(e) })
case CreateRandomNameChild(child) sender ! (try context.actorOf(child) catch { case e: Exception e }) // FIXME shouldn't this use NonFatal & Status.Failure? case CreateRandomNameChild(child) sender ! (try context.actorOf(child) catch { case NonFatal(e) Status.Failure(e) })
case StopChild(child) context.stop(child); sender ! "ok" case StopChild(child) context.stop(child); sender ! "ok"
case m deadLetters ! DeadLetter(m, sender, self) case m deadLetters ! DeadLetter(m, sender, self)
} }
@ -502,8 +502,10 @@ class LocalActorRefProvider(
def init(_system: ActorSystemImpl) { def init(_system: ActorSystemImpl) {
system = _system system = _system
// chain death watchers so that killing guardian stops the application // chain death watchers so that killing guardian stops the application
guardian.sendSystemMessage(Watch(guardian, systemGuardian)) systemGuardian.sendSystemMessage(Watch(guardian, systemGuardian))
rootGuardian.sendSystemMessage(Watch(systemGuardian, rootGuardian)) rootGuardian.sendSystemMessage(Watch(systemGuardian, rootGuardian))
//guardian.sendSystemMessage(Watch(guardian, systemGuardian))
//rootGuardian.sendSystemMessage(Watch(systemGuardian, rootGuardian))
eventStream.startDefaultLoggers(_system) eventStream.startDefaultLoggers(_system)
} }

View file

@ -480,26 +480,17 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
private[akka] def systemActorOf(props: Props, name: String): ActorRef = { private[akka] def systemActorOf(props: Props, name: String): ActorRef = {
implicit val timeout = settings.CreationTimeout implicit val timeout = settings.CreationTimeout
Await.result(systemGuardian ? CreateChild(props, name), timeout.duration) match { Await.result((systemGuardian ? CreateChild(props, name)).mapTo[ActorRef], timeout.duration)
case ref: ActorRef ref
case ex: Exception throw ex
}
} }
def actorOf(props: Props, name: String): ActorRef = { def actorOf(props: Props, name: String): ActorRef = {
implicit val timeout = settings.CreationTimeout implicit val timeout = settings.CreationTimeout
Await.result(guardian ? CreateChild(props, name), timeout.duration) match { Await.result((guardian ? CreateChild(props, name)).mapTo[ActorRef], timeout.duration)
case ref: ActorRef ref
case ex: Exception throw ex
}
} }
def actorOf(props: Props): ActorRef = { def actorOf(props: Props): ActorRef = {
implicit val timeout = settings.CreationTimeout implicit val timeout = settings.CreationTimeout
Await.result(guardian ? CreateRandomNameChild(props), timeout.duration) match { Await.result((guardian ? CreateRandomNameChild(props)).mapTo[ActorRef], timeout.duration)
case ref: ActorRef ref
case ex: Exception throw ex
}
} }
def stop(actor: ActorRef): Unit = { def stop(actor: ActorRef): Unit = {

View file

@ -102,11 +102,11 @@ private[akka] case class ChildTerminated(child: ActorRef) extends SystemMessage
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] case class Watch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to self from ActorCell.watch private[akka] case class Watch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to establish a DeathWatch
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] case class Unwatch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to self from ActorCell.unwatch private[akka] case class Unwatch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to tear down a DeathWatch
final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () Unit) extends Runnable { final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () Unit) extends Runnable {
def run(): Unit = def run(): Unit =

View file

@ -192,7 +192,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
@tailrec // Returns false if the Promise is already completed @tailrec // Returns false if the Promise is already completed
private[this] final def addWatcher(watcher: ActorRef): Boolean = watchedBy match { private[this] final def addWatcher(watcher: ActorRef): Boolean = watchedBy match {
case null false case null false
case other if (updateWatchedBy(other, other + watcher)) true else addWatcher(watcher) case other updateWatchedBy(other, other + watcher) || addWatcher(watcher)
} }
@tailrec @tailrec
@ -259,7 +259,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
case _: Terminate stop() case _: Terminate stop()
case Watch(watchee, watcher) case Watch(watchee, watcher)
if (watchee == this && watcher != this) { if (watchee == this && watcher != this) {
if (!addWatcher(watcher)) watcher ! Terminated(watchee)(stopped = true) if (!addWatcher(watcher)) watcher ! Terminated(watchee)(existenceConfirmed = true)
} else System.err.println("BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, this)) } else System.err.println("BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, this))
case Unwatch(watchee, watcher) case Unwatch(watchee, watcher)
if (watchee == this && watcher != this) remWatcher(watcher) if (watchee == this && watcher != this) remWatcher(watcher)
@ -278,7 +278,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
if (!result.isCompleted) result.tryComplete(Left(new ActorKilledException("Stopped"))) if (!result.isCompleted) result.tryComplete(Left(new ActorKilledException("Stopped")))
val watchers = clearWatchers() val watchers = clearWatchers()
if (!watchers.isEmpty) { if (!watchers.isEmpty) {
val termination = Terminated(this)(stopped = true) val termination = Terminated(this)(existenceConfirmed = true)
watchers foreach { w try w.tell(termination, this) catch { case NonFatal(t) /* FIXME LOG THIS */ } } watchers foreach { w try w.tell(termination, this) catch { case NonFatal(t) /* FIXME LOG THIS */ } }
} }
} }

View file

@ -40,12 +40,15 @@ trait GracefulStopSupport {
val internalTarget = target.asInstanceOf[InternalActorRef] val internalTarget = target.asInstanceOf[InternalActorRef]
val ref = PromiseActorRef(e.provider, Timeout(timeout)) val ref = PromiseActorRef(e.provider, Timeout(timeout))
internalTarget.sendSystemMessage(Watch(target, ref)) internalTarget.sendSystemMessage(Watch(target, ref))
val result = ref.result map { ref.result onComplete { // Just making sure we're not leaking here
case Terminated(`target`) true case Right(Terminated(`target`)) ()
case _ internalTarget.sendSystemMessage(Unwatch(target, ref)); false // Just making sure we're not leaking here case _ internalTarget.sendSystemMessage(Unwatch(target, ref))
} }
target ! PoisonPill target ! PoisonPill
result ref.result map {
case Terminated(`target`) true
case _ false
}
case s throw new IllegalArgumentException("Unknown ActorSystem implementation: '" + s + "'") case s throw new IllegalArgumentException("Unknown ActorSystem implementation: '" + s + "'")
} }
} }

View file

@ -111,7 +111,7 @@ class FaultHandlingDocSpec extends AkkaSpec with ImplicitSender {
//#stop //#stop
watch(child) // have testActor watch child watch(child) // have testActor watch child
child ! new IllegalArgumentException // break it child ! new IllegalArgumentException // break it
expectMsg(Terminated(child)(stopped = true)) expectMsg(Terminated(child)(existenceConfirmed = true))
child.isTerminated must be(true) child.isTerminated must be(true)
//#stop //#stop
} }
@ -125,7 +125,7 @@ class FaultHandlingDocSpec extends AkkaSpec with ImplicitSender {
expectMsg(0) expectMsg(0)
child2 ! new Exception("CRASH") // escalate failure child2 ! new Exception("CRASH") // escalate failure
expectMsg(Terminated(child2)(stopped = true)) expectMsg(Terminated(child2)(existenceConfirmed = true))
//#escalate-kill //#escalate-kill
//#escalate-restart //#escalate-restart
val supervisor2 = system.actorOf(Props[Supervisor2], "supervisor2") val supervisor2 = system.actorOf(Props[Supervisor2], "supervisor2")

View file

@ -12,7 +12,6 @@ import akka.util.duration._
import com.typesafe.config.Config import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import akka.actor.PoisonPill import akka.actor.PoisonPill
import akka.actor.CreateChild
import akka.actor.DeadLetter import akka.actor.DeadLetter
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import akka.dispatch.{ Await, MessageDispatcher } import akka.dispatch.{ Await, MessageDispatcher }