diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index 5ad6da271f..98d3e71384 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -73,7 +73,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with watch(router) watch(c2) system.stop(c2) - expectMsg(Terminated(c2)) + expectMsg(Terminated(c2)(stopped = true)) // it might take a while until the Router has actually processed the Terminated message awaitCond { router ! "" @@ -84,7 +84,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with res == Seq(c1, c1) } system.stop(c1) - expectMsg(Terminated(router)) + expectMsg(Terminated(router)(stopped = true)) } "be able to send their routees" in { diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 3d93e52a54..8906dcd60e 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -59,7 +59,7 @@ case object Kill extends Kill { /** * When Death Watch is used, the watcher will receive a Terminated(watched) message when watched is terminated. */ -case class Terminated(@BeanProperty actor: ActorRef) extends PossiblyHarmful +case class Terminated(@BeanProperty actor: ActorRef)(@BeanProperty stopped: Boolean) extends PossiblyHarmful abstract class ReceiveTimeout extends PossiblyHarmful diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 0955595640..6d49045099 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -6,7 +6,6 @@ package akka.actor import akka.dispatch._ import scala.annotation.tailrec -import scala.collection.immutable.{ Stack, TreeMap } import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit.MILLISECONDS import akka.event.Logging.{ Debug, Warning, Error } @@ -16,6 +15,7 @@ import java.io.{ NotSerializableException, ObjectOutputStream } import akka.serialization.SerializationExtension import akka.util.NonFatal import akka.event.Logging.LogEventException +import collection.immutable.{ TreeSet, Stack, TreeMap } //TODO: everything here for current compatibility - could be limited more @@ -187,6 +187,8 @@ private[akka] object ActorCell { final val behaviorStackPlaceHolder: Stack[Actor.Receive] = Stack.empty.push(Actor.emptyBehavior) + final val emptyActorRefSet: Set[ActorRef] = TreeSet.empty + sealed trait SuspendReason case object UserRequest extends SuspendReason case class Recreation(cause: Throwable) extends SuspendReason @@ -407,16 +409,14 @@ private[akka] class ActorCell( actor.asInstanceOf[InternalActorRef].stop() } - var currentMessage: Envelope = null - + var currentMessage: Envelope = _ var actor: Actor = _ - private var behaviorStack: Stack[Actor.Receive] = Stack.empty - @volatile //This must be volatile since it isn't protected by the mailbox status var mailbox: Mailbox = _ - var nextNameSequence: Long = 0 + var watching: Set[ActorRef] = emptyActorRefSet + var watchedBy: Set[ActorRef] = emptyActorRefSet //Not thread safe, so should only be used inside the actor that inhabits this ActorCell final protected def randomName(): String = { @@ -462,13 +462,25 @@ private[akka] class ActorCell( override final def watch(subject: ActorRef): ActorRef = { // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - dispatcher.systemDispatch(this, Link(subject)) + subject match { + case a: InternalActorRef ⇒ + if (!watching.contains(a)) { + watching += a + a.sendSystemMessage(Watch(a, self)) + } + } subject } override final def unwatch(subject: ActorRef): ActorRef = { // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - dispatcher.systemDispatch(this, Unlink(subject)) + subject match { + case a: InternalActorRef ⇒ + if (watching.contains(a)) { + watching -= a + a.sendSystemMessage(Unwatch(a, self)) + } + } subject } @@ -567,15 +579,17 @@ private[akka] class ActorCell( def resume(): Unit = if (isNormal) dispatcher resume this - def link(subject: ActorRef): Unit = if (!isTerminating) { - if (system.deathWatch.subscribe(self, subject)) { - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now monitoring " + subject)) + def addWatcher(watcher: ActorRef): Unit = if (!isTerminating) { + if (!watchedBy.contains(watcher)) { + watchedBy += watcher + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), self + " watched by " + watcher)) } } - def unlink(subject: ActorRef): Unit = if (!isTerminating) { - if (system.deathWatch.unsubscribe(self, subject)) { - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + subject)) + def remWatcher(watcher: ActorRef): Unit = if (!isTerminating) { + if (watchedBy.contains(watcher)) { + watchedBy -= watcher + if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), self + " unwatched by " + watcher)) } } @@ -603,15 +617,17 @@ private[akka] class ActorCell( try { message match { - case Create() ⇒ create() - case Recreate(cause) ⇒ recreate(cause) - case Link(subject) ⇒ link(subject) - case Unlink(subject) ⇒ unlink(subject) - case Suspend() ⇒ suspend() - case Resume() ⇒ resume() - case Terminate() ⇒ terminate() - case Supervise(child) ⇒ supervise(child) - case ChildTerminated(child) ⇒ handleChildTerminated(child) + case Create() ⇒ create() + case Recreate(cause) ⇒ recreate(cause) + case Watch(`self`, watcher) ⇒ addWatcher(watcher) + case Watch(watchee, `self`) ⇒ watch(watchee) + case Unwatch(`self`, watcher) ⇒ remWatcher(watcher) + case Unwatch(watchee, `self`) ⇒ unwatch(watchee) + case Suspend() ⇒ suspend() + case Resume() ⇒ resume() + case Terminate() ⇒ terminate() + case Supervise(child) ⇒ supervise(child) + case ChildTerminated(child) ⇒ handleChildTerminated(child) } } catch { case e @ (_: InterruptedException | NonFatal(_)) ⇒ handleInvokeFailure(e, "error while processing " + message) @@ -698,7 +714,23 @@ private[akka] class ActorCell( } finally { try { parent.sendSystemMessage(ChildTerminated(self)) - system.deathWatch.publish(Terminated(self)) + if (!watchedBy.isEmpty) { + val terminated = Terminated(self)(stopped = true) + watchedBy foreach { + watcher ⇒ + try watcher.tell(terminated) catch { + case NonFatal(t) ⇒ system.eventStream.publish(Error(t, self.path.toString, clazz(a), "deathwatch")) + } + } + } + if (!watching.isEmpty) { + watching foreach { + watchee ⇒ + try watchee.tell(Unwatch(watchee, self)) catch { + case NonFatal(t) ⇒ system.eventStream.publish(Error(t, self.path.toString, clazz(a), "deathwatch")) + } + } + } if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped")) } finally { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 41473e7f7c..ca971de40e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -38,11 +38,6 @@ trait ActorRefProvider { */ def deadLetters: ActorRef - /** - * Reference to the death watch service. - */ - def deathWatch: DeathWatch - /** * The root path for all actors within this actor system, including remote * address if enabled. @@ -162,10 +157,11 @@ trait ActorRefFactory { * INTERNAL USE ONLY */ protected def provider: ActorRefProvider + /** - * INTERNAL USE ONLY + * Returns the default MessageDispatcher associated with this ActorRefFactory */ - protected def dispatcher: MessageDispatcher + implicit def dispatcher: MessageDispatcher /** * Father of all children created by this interface. @@ -339,8 +335,6 @@ class LocalActorRefProvider( override val deadLetters: InternalActorRef = new DeadLetterActorRef(this, rootPath / "deadLetters", eventStream) - override val deathWatch: DeathWatch = new LocalDeathWatch(1024) //TODO make configrable - /* * generate name for temporary actor refs */ @@ -516,8 +510,8 @@ class LocalActorRefProvider( def init(_system: ActorSystemImpl) { system = _system // chain death watchers so that killing guardian stops the application - deathWatch.subscribe(systemGuardian, guardian) - deathWatch.subscribe(rootGuardian, systemGuardian) + guardian.sendSystemMessage(Watch(systemGuardian, guardian)) + rootGuardian.sendSystemMessage(Watch(rootGuardian, systemGuardian)) eventStream.startDefaultLoggers(_system) } @@ -566,19 +560,3 @@ class LocalActorRefProvider( def getExternalAddressFor(addr: Address): Option[Address] = if (addr == rootPath.address) Some(addr) else None } - -class LocalDeathWatch(val mapSize: Int) extends DeathWatch with ActorClassification { - - override def publish(event: Event): Unit = { - val monitors = dissociate(classify(event)) - if (monitors.nonEmpty) monitors.foreach(_ ! event) - } - - override def subscribe(subscriber: Subscriber, to: Classifier): Boolean = { - if (!super.subscribe(subscriber, to)) { - subscriber ! Terminated(to) - false - } else true - } -} - diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index c5595212c2..94ee24336a 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -408,11 +408,6 @@ abstract class ExtendedActorSystem extends ActorSystem { */ def systemGuardian: InternalActorRef - /** - * Implementation of the mechanism which is used for watch()/unwatch(). - */ - def deathWatch: DeathWatch - /** * A ThreadFactory that can be used if the transport needs to create any Threads */ @@ -570,7 +565,6 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, def lookupRoot: InternalActorRef = provider.rootGuardian def guardian: InternalActorRef = provider.guardian def systemGuardian: InternalActorRef = provider.systemGuardian - def deathWatch: DeathWatch = provider.deathWatch def /(actorName: String): ActorPath = guardian.path / actorName def /(path: Iterable[String]): ActorPath = guardian.path / path diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 682e6ba4bf..8e160276e8 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -102,11 +102,11 @@ private[akka] case class ChildTerminated(child: ActorRef) extends SystemMessage /** * INTERNAL API */ -private[akka] case class Link(subject: ActorRef) extends SystemMessage // sent to self from ActorCell.watch +private[akka] case class Watch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to self from ActorCell.watch /** * INTERNAL API */ -private[akka] case class Unlink(subject: ActorRef) extends SystemMessage // sent to self from ActorCell.unwatch +private[akka] case class Unwatch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to self from ActorCell.unwatch final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () ⇒ Unit) extends Runnable { def run(): Unit = diff --git a/akka-actor/src/main/scala/akka/event/DeathWatch.scala b/akka-actor/src/main/scala/akka/event/DeathWatch.scala deleted file mode 100644 index 8bf6935619..0000000000 --- a/akka-actor/src/main/scala/akka/event/DeathWatch.scala +++ /dev/null @@ -1,19 +0,0 @@ -/** - * Copyright (C) 2009-2012 Typesafe Inc. - */ - -package akka.event - -import akka.actor._ - -/** - * The contract of DeathWatch is not properly expressed using the type system - * Whenever there is a publish, all listeners to the Terminated Actor should be atomically removed - * A failed subscribe should also only mean that the Classifier (ActorRef) that is listened to is already shut down - * See LocalDeathWatch for semantics - */ -abstract class DeathWatch extends ActorEventBus with ActorClassifier { - type Event = Terminated - - protected final def classify(event: Event): Classifier = event.actor -} diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index a20baaf533..634299248d 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -4,12 +4,11 @@ package akka.pattern import java.util.concurrent.TimeoutException -import akka.dispatch.{ Promise, Terminate, SystemMessage, Future } -import akka.event.DeathWatch import akka.util.Timeout import annotation.tailrec import akka.util.Unsafe import akka.actor._ +import akka.dispatch._ /** * This is what is used to complete a Future that is returned from an ask/? call, @@ -229,9 +228,14 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide if (!completedJustNow) provider.deadLetters ! message } - override def sendSystemMessage(message: SystemMessage): Unit = message match { - case _: Terminate ⇒ stop() - case _ ⇒ + override def sendSystemMessage(message: SystemMessage): Unit = { + val self = this + message match { + case _: Terminate ⇒ stop() + case Watch(`self`, watcher) ⇒ //FIXME IMPLEMENT + case Unwatch(`self`, watcher) ⇒ //FIXME IMPLEMENT + case _ ⇒ + } } override def isTerminated: Boolean = state match { @@ -241,23 +245,22 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide @tailrec override def stop(): Unit = { - def ensurePromiseCompleted(): Unit = - if (!result.isCompleted) result.tryComplete(Left(new ActorKilledException("Stopped"))) + def ensureCompleted(): Unit = if (!result.isCompleted) result.tryComplete(Left(new ActorKilledException("Stopped"))) state match { - case null ⇒ - // if path was never queried nobody can possibly be watching us, so we don't have to publish termination either - if (updateState(null, Stopped)) ensurePromiseCompleted() - else stop() + case null ⇒ // if path was never queried nobody can possibly be watching us, so we don't have to publish termination either + if (updateState(null, Stopped)) ensureCompleted() else stop() case p: ActorPath ⇒ if (updateState(p, StoppedWithPath(p))) { try { - ensurePromiseCompleted() - provider.deathWatch.publish(Terminated(this)) + ensureCompleted() + val termination = Terminated(this)(stopped = true) + // watchedBy foreach { w => w.tell(termination) } + // watching foreach { w.sendSystemMessage(Unwatch(w, self)) } } finally { provider.unregisterTempActor(p) } } else stop() - case Stopped | _: StoppedWithPath ⇒ + case Stopped | _: StoppedWithPath ⇒ // already stopped case Registering ⇒ stop() // spin until registration is completed before stopping } } diff --git a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala index adcbe53f0b..d1e7fab327 100644 --- a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala @@ -4,9 +4,9 @@ package akka.pattern -import akka.dispatch.{ Promise, Future } import akka.actor._ import akka.util.{ Timeout, Duration } +import akka.dispatch.{ Unwatch, Watch, Promise, Future } trait GracefulStopSupport { /** @@ -39,11 +39,11 @@ trait GracefulStopSupport { } else system match { case e: ExtendedActorSystem ⇒ val ref = PromiseActorRef(e.provider, Timeout(timeout)) - e.deathWatch.subscribe(ref, target) + ref.sendSystemMessage(Watch(target, ref)) ref.result onComplete { case Right(Terminated(`target`)) ⇒ () // Ignore - case _ ⇒ e.deathWatch.unsubscribe(ref, target) - } // Just making sure we're not leaking here + case _ ⇒ ref.sendSystemMessage(Unwatch(target, ref)) // Just making sure we're not leaking here + } target ! PoisonPill ref.result map { case Terminated(`target`) ⇒ true } case s ⇒ throw new IllegalArgumentException("Unknown ActorSystem implementation: '" + s + "'") diff --git a/akka-docs/java/code/docs/actor/FaultHandlingTestBase.java b/akka-docs/java/code/docs/actor/FaultHandlingTestBase.java index bcc4705948..2d40071fe8 100644 --- a/akka-docs/java/code/docs/actor/FaultHandlingTestBase.java +++ b/akka-docs/java/code/docs/actor/FaultHandlingTestBase.java @@ -182,7 +182,7 @@ public class FaultHandlingTestBase { final TestProbe probe = new TestProbe(system); probe.watch(child); child.tell(new IllegalArgumentException()); - probe.expectMsg(new Terminated(child)); + probe.expectMsg(new Terminated(child, true)); //#stop //#escalate-kill @@ -190,7 +190,7 @@ public class FaultHandlingTestBase { probe.watch(child); assert Await.result(ask(child, "get", 5000), timeout).equals(0); child.tell(new Exception()); - probe.expectMsg(new Terminated(child)); + probe.expectMsg(new Terminated(child, true)); //#escalate-kill //#escalate-restart diff --git a/akka-docs/scala/code/docs/actor/FaultHandlingDocSpec.scala b/akka-docs/scala/code/docs/actor/FaultHandlingDocSpec.scala index 8ce16f1021..4e0fdc5ee5 100644 --- a/akka-docs/scala/code/docs/actor/FaultHandlingDocSpec.scala +++ b/akka-docs/scala/code/docs/actor/FaultHandlingDocSpec.scala @@ -111,7 +111,7 @@ class FaultHandlingDocSpec extends AkkaSpec with ImplicitSender { //#stop watch(child) // have testActor watch “child” child ! new IllegalArgumentException // break it - expectMsg(Terminated(child)) + expectMsg(Terminated(child)(stopped = true)) child.isTerminated must be(true) //#stop } @@ -125,7 +125,7 @@ class FaultHandlingDocSpec extends AkkaSpec with ImplicitSender { expectMsg(0) child2 ! new Exception("CRASH") // escalate failure - expectMsg(Terminated(child2)) + expectMsg(Terminated(child2)(stopped = true)) //#escalate-kill //#escalate-restart val supervisor2 = system.actorOf(Props[Supervisor2], "supervisor2") diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index a12c5f5578..eaecf67792 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -6,7 +6,7 @@ package akka.remote import akka.actor._ import akka.dispatch._ -import akka.event.{ DeathWatch, Logging, LoggingAdapter } +import akka.event.{ Logging, LoggingAdapter } import akka.event.EventStream import akka.serialization.Serialization import akka.serialization.SerializationExtension @@ -34,8 +34,6 @@ private[akka] class RemoteActorRefProvider( override def rootPath: ActorPath = local.rootPath override def deadLetters: InternalActorRef = local.deadLetters - override val deathWatch: DeathWatch = new RemoteDeathWatch(local.deathWatch, this) - // these are only available after init() override def rootGuardian: InternalActorRef = local.rootGuardian override def guardian: InternalActorRef = local.guardian @@ -246,25 +244,4 @@ private[akka] class RemoteActorRef private[akka] ( @throws(classOf[java.io.ObjectStreamException]) private def writeReplace(): AnyRef = SerializedActorRef(path) -} - -private[akka] class RemoteDeathWatch(val local: DeathWatch, val provider: RemoteActorRefProvider) extends DeathWatch { - - override def subscribe(watcher: ActorRef, watched: ActorRef): Boolean = watched match { - case r: RemoteRef ⇒ - val ret = local.subscribe(watcher, watched) - provider.actorFor(r.path.root / "remote") ! DaemonMsgWatch(watcher, watched) - ret - case l: LocalRef ⇒ - local.subscribe(watcher, watched) - case _ ⇒ - provider.log.error("unknown ActorRef type {} as DeathWatch target", watched.getClass) - false - } - - override def unsubscribe(watcher: ActorRef, watched: ActorRef): Boolean = local.unsubscribe(watcher, watched) - - override def unsubscribe(watcher: ActorRef): Unit = local.unsubscribe(watcher) - - override def publish(event: Terminated): Unit = local.publish(event) -} +} \ No newline at end of file diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index 7e4beecc7d..1e81cfaac6 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -8,6 +8,7 @@ import scala.annotation.tailrec import akka.actor.{ VirtualPathContainer, Terminated, Deploy, Props, Nobody, LocalActorRef, InternalActorRef, Address, ActorSystemImpl, ActorRef, ActorPathExtractor, ActorPath, Actor } import akka.event.LoggingAdapter +import akka.dispatch.Watch private[akka] sealed trait DaemonMsg private[akka] case class DaemonMsgCreate(props: Props, deploy: Deploy, path: String, supervisor: ActorRef) extends DaemonMsg @@ -62,18 +63,19 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath val actor = system.provider.actorOf(system, props, supervisor.asInstanceOf[InternalActorRef], path, false, Some(deploy), true) addChild(subpath.mkString("/"), actor) - system.deathWatch.subscribe(this, actor) + this.sendSystemMessage(Watch(actor, this)) case _ ⇒ log.error("remote path does not match path from message [{}]", message) } case DaemonMsgWatch(watcher, watched) ⇒ - val other = system.actorFor(watcher.path.root / "remote") - system.deathWatch.subscribe(other, watched) + system.actorFor(watcher.path.root / "remote") match { + case a: InternalActorRef ⇒ a.sendSystemMessage(Watch(watched, a)) + } } case Terminated(child: LocalActorRef) ⇒ removeChild(child.path.elements.drop(1).mkString("/")) - case t: Terminated ⇒ system.deathWatch.publish(t) + case t: Terminated ⇒ //FIXME system.deathWatch.publish(t) case unknown ⇒ log.warning("Unknown message {} received by {}", unknown, this) }