diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 98d3e71384..f1952b8f79 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -73,7 +73,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with watch(router) watch(c2) system.stop(c2) - expectMsg(Terminated(c2)(stopped = true)) + expectMsg(Terminated(c2)(existenceConfirmed = true)) // it might take a while until the Router has actually processed the Terminated message awaitCond { router ! "" @@ -84,7 +84,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with res == Seq(c1, c1) } system.stop(c1) - expectMsg(Terminated(router)(stopped = true)) + expectMsg(Terminated(router)(existenceConfirmed = true)) } "be able to send their routees" in { diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index c8962e819f..c795534cdf 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -59,7 +59,7 @@ case object Kill extends Kill { /** * When Death Watch is used, the watcher will receive a Terminated(watched) message when watched is terminated. */ -case class Terminated(@BeanProperty actor: ActorRef)(@BeanProperty stopped: Boolean) +case class Terminated(@BeanProperty actor: ActorRef)(@BeanProperty existenceConfirmed: Boolean) abstract class ReceiveTimeout extends PossiblyHarmful diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 3db70d5735..736e004c6e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -724,19 +724,20 @@ private[akka] class ActorCell( parent.sendSystemMessage(ChildTerminated(self)) if (!watchedBy.isEmpty) { - val terminated = Terminated(self)(stopped = true) + val terminated = Terminated(self)(existenceConfirmed = true) try { watchedBy foreach { - watcher ⇒ try watcher.tell(terminated, self) catch { - case NonFatal(t) ⇒ system.eventStream.publish(Error(t, self.path.toString, clazz(a), "deathwatch")) - } + watcher ⇒ + try watcher.tell(terminated, self) catch { + case NonFatal(t) ⇒ system.eventStream.publish(Error(t, self.path.toString, clazz(a), "deathwatch")) + } } } finally watchedBy = emptyActorRefSet } if (!watching.isEmpty) { try { - watching foreach { + watching foreach { // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ case watchee: InternalActorRef ⇒ try watchee.sendSystemMessage(Unwatch(watchee, self)) catch { case NonFatal(t) ⇒ system.eventStream.publish(Error(t, self.path.toString, clazz(a), "deathwatch")) } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index a713a61ddc..7368ae434a 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -424,10 +424,11 @@ private[akka] class EmptyLocalActorRef(override val provider: ActorRefProvider, protected def specialHandle(msg: Any): Boolean = msg match { case w: Watch ⇒ - if (w.watchee == this && w.watcher != this) w.watcher ! Terminated(w.watchee)(stopped = false) + if (w.watchee == this && w.watcher != this) + w.watcher ! Terminated(w.watchee)(existenceConfirmed = false) true - case w: Unwatch ⇒ true // Just ignore + case _: Unwatch ⇒ true // Just ignore case _ ⇒ false } } @@ -449,7 +450,7 @@ private[akka] class DeadLetterActorRef(_provider: ActorRefProvider, override protected def specialHandle(msg: Any): Boolean = msg match { case w: Watch ⇒ - if (w.watchee != this && w.watcher != this) w.watcher ! Terminated(w.watchee)(stopped = false) + if (w.watchee != this && w.watcher != this) w.watcher ! Terminated(w.watchee)(existenceConfirmed = false) true case w: Unwatch ⇒ true // Just ignore diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index eede9e1bef..6807e34c55 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -403,8 +403,8 @@ class LocalActorRefProvider( def receive = { case Terminated(_) ⇒ context.stop(self) - case CreateChild(child, name) ⇒ sender ! (try context.actorOf(child, name) catch { case NonFatal(e) ⇒ e }) // FIXME shouldn't this use NonFatal & Status.Failure? - case CreateRandomNameChild(child) ⇒ sender ! (try context.actorOf(child) catch { case NonFatal(e) ⇒ e }) // FIXME shouldn't this use NonFatal & Status.Failure? + case CreateChild(child, name) ⇒ sender ! (try context.actorOf(child, name) catch { case NonFatal(e) ⇒ Status.Failure(e) }) + case CreateRandomNameChild(child) ⇒ sender ! (try context.actorOf(child) catch { case NonFatal(e) ⇒ Status.Failure(e) }) case StopChild(child) ⇒ context.stop(child); sender ! "ok" case m ⇒ deadLetters ! DeadLetter(m, sender, self) } @@ -435,8 +435,8 @@ class LocalActorRefProvider( def receive = { case Terminated(_) ⇒ eventStream.stopDefaultLoggers(); context.stop(self) - case CreateChild(child, name) ⇒ sender ! (try context.actorOf(child, name) catch { case e: Exception ⇒ e }) // FIXME shouldn't this use NonFatal & Status.Failure? - case CreateRandomNameChild(child) ⇒ sender ! (try context.actorOf(child) catch { case e: Exception ⇒ e }) // FIXME shouldn't this use NonFatal & Status.Failure? + case CreateChild(child, name) ⇒ sender ! (try context.actorOf(child, name) catch { case NonFatal(e) ⇒ Status.Failure(e) }) + case CreateRandomNameChild(child) ⇒ sender ! (try context.actorOf(child) catch { case NonFatal(e) ⇒ Status.Failure(e) }) case StopChild(child) ⇒ context.stop(child); sender ! "ok" case m ⇒ deadLetters ! DeadLetter(m, sender, self) } @@ -502,8 +502,10 @@ class LocalActorRefProvider( def init(_system: ActorSystemImpl) { system = _system // chain death watchers so that killing guardian stops the application - guardian.sendSystemMessage(Watch(guardian, systemGuardian)) + systemGuardian.sendSystemMessage(Watch(guardian, systemGuardian)) rootGuardian.sendSystemMessage(Watch(systemGuardian, rootGuardian)) + //guardian.sendSystemMessage(Watch(guardian, systemGuardian)) + //rootGuardian.sendSystemMessage(Watch(systemGuardian, rootGuardian)) eventStream.startDefaultLoggers(_system) } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 993e7e98e4..008610c333 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -480,26 +480,17 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, private[akka] def systemActorOf(props: Props, name: String): ActorRef = { implicit val timeout = settings.CreationTimeout - Await.result(systemGuardian ? CreateChild(props, name), timeout.duration) match { - case ref: ActorRef ⇒ ref - case ex: Exception ⇒ throw ex - } + Await.result((systemGuardian ? CreateChild(props, name)).mapTo[ActorRef], timeout.duration) } def actorOf(props: Props, name: String): ActorRef = { implicit val timeout = settings.CreationTimeout - Await.result(guardian ? CreateChild(props, name), timeout.duration) match { - case ref: ActorRef ⇒ ref - case ex: Exception ⇒ throw ex - } + Await.result((guardian ? CreateChild(props, name)).mapTo[ActorRef], timeout.duration) } def actorOf(props: Props): ActorRef = { implicit val timeout = settings.CreationTimeout - Await.result(guardian ? CreateRandomNameChild(props), timeout.duration) match { - case ref: ActorRef ⇒ ref - case ex: Exception ⇒ throw ex - } + Await.result((guardian ? CreateRandomNameChild(props)).mapTo[ActorRef], timeout.duration) } def stop(actor: ActorRef): Unit = { diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 8e160276e8..9517a59b7c 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -102,11 +102,11 @@ private[akka] case class ChildTerminated(child: ActorRef) extends SystemMessage /** * INTERNAL API */ -private[akka] case class Watch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to self from ActorCell.watch +private[akka] case class Watch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to establish a DeathWatch /** * INTERNAL API */ -private[akka] case class Unwatch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to self from ActorCell.unwatch +private[akka] case class Unwatch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to tear down a DeathWatch final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () ⇒ Unit) extends Runnable { def run(): Unit = diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index 42154ff522..c66fa4178d 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -192,7 +192,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide @tailrec // Returns false if the Promise is already completed private[this] final def addWatcher(watcher: ActorRef): Boolean = watchedBy match { case null ⇒ false - case other ⇒ if (updateWatchedBy(other, other + watcher)) true else addWatcher(watcher) + case other ⇒ updateWatchedBy(other, other + watcher) || addWatcher(watcher) } @tailrec @@ -259,7 +259,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide case _: Terminate ⇒ stop() case Watch(watchee, watcher) ⇒ if (watchee == this && watcher != this) { - if (!addWatcher(watcher)) watcher ! Terminated(watchee)(stopped = true) + if (!addWatcher(watcher)) watcher ! Terminated(watchee)(existenceConfirmed = true) } else System.err.println("BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, this)) case Unwatch(watchee, watcher) ⇒ if (watchee == this && watcher != this) remWatcher(watcher) @@ -278,7 +278,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide if (!result.isCompleted) result.tryComplete(Left(new ActorKilledException("Stopped"))) val watchers = clearWatchers() if (!watchers.isEmpty) { - val termination = Terminated(this)(stopped = true) + val termination = Terminated(this)(existenceConfirmed = true) watchers foreach { w ⇒ try w.tell(termination, this) catch { case NonFatal(t) ⇒ /* FIXME LOG THIS */ } } } } diff --git a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala index 35004e637d..91293cb0d1 100644 --- a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala @@ -40,12 +40,15 @@ trait GracefulStopSupport { val internalTarget = target.asInstanceOf[InternalActorRef] val ref = PromiseActorRef(e.provider, Timeout(timeout)) internalTarget.sendSystemMessage(Watch(target, ref)) - val result = ref.result map { - case Terminated(`target`) ⇒ true - case _ ⇒ internalTarget.sendSystemMessage(Unwatch(target, ref)); false // Just making sure we're not leaking here + ref.result onComplete { // Just making sure we're not leaking here + case Right(Terminated(`target`)) ⇒ () + case _ ⇒ internalTarget.sendSystemMessage(Unwatch(target, ref)) } target ! PoisonPill - result + ref.result map { + case Terminated(`target`) ⇒ true + case _ ⇒ false + } case s ⇒ throw new IllegalArgumentException("Unknown ActorSystem implementation: '" + s + "'") } } diff --git a/akka-docs/scala/code/docs/actor/FaultHandlingDocSpec.scala b/akka-docs/scala/code/docs/actor/FaultHandlingDocSpec.scala index 4e0fdc5ee5..65e03bd2ea 100644 --- a/akka-docs/scala/code/docs/actor/FaultHandlingDocSpec.scala +++ b/akka-docs/scala/code/docs/actor/FaultHandlingDocSpec.scala @@ -111,7 +111,7 @@ class FaultHandlingDocSpec extends AkkaSpec with ImplicitSender { //#stop watch(child) // have testActor watch “child” child ! new IllegalArgumentException // break it - expectMsg(Terminated(child)(stopped = true)) + expectMsg(Terminated(child)(existenceConfirmed = true)) child.isTerminated must be(true) //#stop } @@ -125,7 +125,7 @@ class FaultHandlingDocSpec extends AkkaSpec with ImplicitSender { expectMsg(0) child2 ! new Exception("CRASH") // escalate failure - expectMsg(Terminated(child2)(stopped = true)) + expectMsg(Terminated(child2)(existenceConfirmed = true)) //#escalate-kill //#escalate-restart val supervisor2 = system.actorOf(Props[Supervisor2], "supervisor2") diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index d2eeeee776..5eb0c0538a 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -12,7 +12,6 @@ import akka.util.duration._ import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import akka.actor.PoisonPill -import akka.actor.CreateChild import akka.actor.DeadLetter import java.util.concurrent.TimeoutException import akka.dispatch.{ Await, MessageDispatcher }