Initial stab at DeathWatch 2.0, tests don't pass just yet

This commit is contained in:
Viktor Klang 2012-05-28 16:49:49 +02:00
parent 2e8d6a8458
commit 96f264e842
13 changed files with 99 additions and 132 deletions

View file

@ -73,7 +73,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
watch(router)
watch(c2)
system.stop(c2)
expectMsg(Terminated(c2))
expectMsg(Terminated(c2)(stopped = true))
// it might take a while until the Router has actually processed the Terminated message
awaitCond {
router ! ""
@ -84,7 +84,7 @@ class RoutingSpec extends AkkaSpec(RoutingSpec.config) with DefaultTimeout with
res == Seq(c1, c1)
}
system.stop(c1)
expectMsg(Terminated(router))
expectMsg(Terminated(router)(stopped = true))
}
"be able to send their routees" in {

View file

@ -59,7 +59,7 @@ case object Kill extends Kill {
/**
* When Death Watch is used, the watcher will receive a Terminated(watched) message when watched is terminated.
*/
case class Terminated(@BeanProperty actor: ActorRef) extends PossiblyHarmful
case class Terminated(@BeanProperty actor: ActorRef)(@BeanProperty stopped: Boolean) extends PossiblyHarmful
abstract class ReceiveTimeout extends PossiblyHarmful

View file

@ -6,7 +6,6 @@ package akka.actor
import akka.dispatch._
import scala.annotation.tailrec
import scala.collection.immutable.{ Stack, TreeMap }
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.event.Logging.{ Debug, Warning, Error }
@ -16,6 +15,7 @@ import java.io.{ NotSerializableException, ObjectOutputStream }
import akka.serialization.SerializationExtension
import akka.util.NonFatal
import akka.event.Logging.LogEventException
import collection.immutable.{ TreeSet, Stack, TreeMap }
//TODO: everything here for current compatibility - could be limited more
@ -187,6 +187,8 @@ private[akka] object ActorCell {
final val behaviorStackPlaceHolder: Stack[Actor.Receive] = Stack.empty.push(Actor.emptyBehavior)
final val emptyActorRefSet: Set[ActorRef] = TreeSet.empty
sealed trait SuspendReason
case object UserRequest extends SuspendReason
case class Recreation(cause: Throwable) extends SuspendReason
@ -407,16 +409,14 @@ private[akka] class ActorCell(
actor.asInstanceOf[InternalActorRef].stop()
}
var currentMessage: Envelope = null
var currentMessage: Envelope = _
var actor: Actor = _
private var behaviorStack: Stack[Actor.Receive] = Stack.empty
@volatile //This must be volatile since it isn't protected by the mailbox status
var mailbox: Mailbox = _
var nextNameSequence: Long = 0
var watching: Set[ActorRef] = emptyActorRefSet
var watchedBy: Set[ActorRef] = emptyActorRefSet
//Not thread safe, so should only be used inside the actor that inhabits this ActorCell
final protected def randomName(): String = {
@ -462,13 +462,25 @@ private[akka] class ActorCell(
override final def watch(subject: ActorRef): ActorRef = {
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
dispatcher.systemDispatch(this, Link(subject))
subject match {
case a: InternalActorRef
if (!watching.contains(a)) {
watching += a
a.sendSystemMessage(Watch(a, self))
}
}
subject
}
override final def unwatch(subject: ActorRef): ActorRef = {
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
dispatcher.systemDispatch(this, Unlink(subject))
subject match {
case a: InternalActorRef
if (watching.contains(a)) {
watching -= a
a.sendSystemMessage(Unwatch(a, self))
}
}
subject
}
@ -567,15 +579,17 @@ private[akka] class ActorCell(
def resume(): Unit = if (isNormal) dispatcher resume this
def link(subject: ActorRef): Unit = if (!isTerminating) {
if (system.deathWatch.subscribe(self, subject)) {
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now monitoring " + subject))
def addWatcher(watcher: ActorRef): Unit = if (!isTerminating) {
if (!watchedBy.contains(watcher)) {
watchedBy += watcher
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), self + " watched by " + watcher))
}
}
def unlink(subject: ActorRef): Unit = if (!isTerminating) {
if (system.deathWatch.unsubscribe(self, subject)) {
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + subject))
def remWatcher(watcher: ActorRef): Unit = if (!isTerminating) {
if (watchedBy.contains(watcher)) {
watchedBy -= watcher
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), self + " unwatched by " + watcher))
}
}
@ -603,15 +617,17 @@ private[akka] class ActorCell(
try {
message match {
case Create() create()
case Recreate(cause) recreate(cause)
case Link(subject) link(subject)
case Unlink(subject) unlink(subject)
case Suspend() suspend()
case Resume() resume()
case Terminate() terminate()
case Supervise(child) supervise(child)
case ChildTerminated(child) handleChildTerminated(child)
case Create() create()
case Recreate(cause) recreate(cause)
case Watch(`self`, watcher) addWatcher(watcher)
case Watch(watchee, `self`) watch(watchee)
case Unwatch(`self`, watcher) remWatcher(watcher)
case Unwatch(watchee, `self`) unwatch(watchee)
case Suspend() suspend()
case Resume() resume()
case Terminate() terminate()
case Supervise(child) supervise(child)
case ChildTerminated(child) handleChildTerminated(child)
}
} catch {
case e @ (_: InterruptedException | NonFatal(_)) handleInvokeFailure(e, "error while processing " + message)
@ -698,7 +714,23 @@ private[akka] class ActorCell(
} finally {
try {
parent.sendSystemMessage(ChildTerminated(self))
system.deathWatch.publish(Terminated(self))
if (!watchedBy.isEmpty) {
val terminated = Terminated(self)(stopped = true)
watchedBy foreach {
watcher
try watcher.tell(terminated) catch {
case NonFatal(t) system.eventStream.publish(Error(t, self.path.toString, clazz(a), "deathwatch"))
}
}
}
if (!watching.isEmpty) {
watching foreach {
watchee
try watchee.tell(Unwatch(watchee, self)) catch {
case NonFatal(t) system.eventStream.publish(Error(t, self.path.toString, clazz(a), "deathwatch"))
}
}
}
if (system.settings.DebugLifecycle)
system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped"))
} finally {

View file

@ -38,11 +38,6 @@ trait ActorRefProvider {
*/
def deadLetters: ActorRef
/**
* Reference to the death watch service.
*/
def deathWatch: DeathWatch
/**
* The root path for all actors within this actor system, including remote
* address if enabled.
@ -162,10 +157,11 @@ trait ActorRefFactory {
* INTERNAL USE ONLY
*/
protected def provider: ActorRefProvider
/**
* INTERNAL USE ONLY
* Returns the default MessageDispatcher associated with this ActorRefFactory
*/
protected def dispatcher: MessageDispatcher
implicit def dispatcher: MessageDispatcher
/**
* Father of all children created by this interface.
@ -339,8 +335,6 @@ class LocalActorRefProvider(
override val deadLetters: InternalActorRef = new DeadLetterActorRef(this, rootPath / "deadLetters", eventStream)
override val deathWatch: DeathWatch = new LocalDeathWatch(1024) //TODO make configrable
/*
* generate name for temporary actor refs
*/
@ -516,8 +510,8 @@ class LocalActorRefProvider(
def init(_system: ActorSystemImpl) {
system = _system
// chain death watchers so that killing guardian stops the application
deathWatch.subscribe(systemGuardian, guardian)
deathWatch.subscribe(rootGuardian, systemGuardian)
guardian.sendSystemMessage(Watch(systemGuardian, guardian))
rootGuardian.sendSystemMessage(Watch(rootGuardian, systemGuardian))
eventStream.startDefaultLoggers(_system)
}
@ -566,19 +560,3 @@ class LocalActorRefProvider(
def getExternalAddressFor(addr: Address): Option[Address] = if (addr == rootPath.address) Some(addr) else None
}
class LocalDeathWatch(val mapSize: Int) extends DeathWatch with ActorClassification {
override def publish(event: Event): Unit = {
val monitors = dissociate(classify(event))
if (monitors.nonEmpty) monitors.foreach(_ ! event)
}
override def subscribe(subscriber: Subscriber, to: Classifier): Boolean = {
if (!super.subscribe(subscriber, to)) {
subscriber ! Terminated(to)
false
} else true
}
}

View file

@ -408,11 +408,6 @@ abstract class ExtendedActorSystem extends ActorSystem {
*/
def systemGuardian: InternalActorRef
/**
* Implementation of the mechanism which is used for watch()/unwatch().
*/
def deathWatch: DeathWatch
/**
* A ThreadFactory that can be used if the transport needs to create any Threads
*/
@ -570,7 +565,6 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
def lookupRoot: InternalActorRef = provider.rootGuardian
def guardian: InternalActorRef = provider.guardian
def systemGuardian: InternalActorRef = provider.systemGuardian
def deathWatch: DeathWatch = provider.deathWatch
def /(actorName: String): ActorPath = guardian.path / actorName
def /(path: Iterable[String]): ActorPath = guardian.path / path

View file

@ -102,11 +102,11 @@ private[akka] case class ChildTerminated(child: ActorRef) extends SystemMessage
/**
* INTERNAL API
*/
private[akka] case class Link(subject: ActorRef) extends SystemMessage // sent to self from ActorCell.watch
private[akka] case class Watch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to self from ActorCell.watch
/**
* INTERNAL API
*/
private[akka] case class Unlink(subject: ActorRef) extends SystemMessage // sent to self from ActorCell.unwatch
private[akka] case class Unwatch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to self from ActorCell.unwatch
final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () Unit) extends Runnable {
def run(): Unit =

View file

@ -1,19 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.event
import akka.actor._
/**
* The contract of DeathWatch is not properly expressed using the type system
* Whenever there is a publish, all listeners to the Terminated Actor should be atomically removed
* A failed subscribe should also only mean that the Classifier (ActorRef) that is listened to is already shut down
* See LocalDeathWatch for semantics
*/
abstract class DeathWatch extends ActorEventBus with ActorClassifier {
type Event = Terminated
protected final def classify(event: Event): Classifier = event.actor
}

View file

@ -4,12 +4,11 @@
package akka.pattern
import java.util.concurrent.TimeoutException
import akka.dispatch.{ Promise, Terminate, SystemMessage, Future }
import akka.event.DeathWatch
import akka.util.Timeout
import annotation.tailrec
import akka.util.Unsafe
import akka.actor._
import akka.dispatch._
/**
* This is what is used to complete a Future that is returned from an ask/? call,
@ -229,9 +228,14 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
if (!completedJustNow) provider.deadLetters ! message
}
override def sendSystemMessage(message: SystemMessage): Unit = message match {
case _: Terminate stop()
case _
override def sendSystemMessage(message: SystemMessage): Unit = {
val self = this
message match {
case _: Terminate stop()
case Watch(`self`, watcher) //FIXME IMPLEMENT
case Unwatch(`self`, watcher) //FIXME IMPLEMENT
case _
}
}
override def isTerminated: Boolean = state match {
@ -241,23 +245,22 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
@tailrec
override def stop(): Unit = {
def ensurePromiseCompleted(): Unit =
if (!result.isCompleted) result.tryComplete(Left(new ActorKilledException("Stopped")))
def ensureCompleted(): Unit = if (!result.isCompleted) result.tryComplete(Left(new ActorKilledException("Stopped")))
state match {
case null
// if path was never queried nobody can possibly be watching us, so we don't have to publish termination either
if (updateState(null, Stopped)) ensurePromiseCompleted()
else stop()
case null // if path was never queried nobody can possibly be watching us, so we don't have to publish termination either
if (updateState(null, Stopped)) ensureCompleted() else stop()
case p: ActorPath
if (updateState(p, StoppedWithPath(p))) {
try {
ensurePromiseCompleted()
provider.deathWatch.publish(Terminated(this))
ensureCompleted()
val termination = Terminated(this)(stopped = true)
// watchedBy foreach { w => w.tell(termination) }
// watching foreach { w.sendSystemMessage(Unwatch(w, self)) }
} finally {
provider.unregisterTempActor(p)
}
} else stop()
case Stopped | _: StoppedWithPath
case Stopped | _: StoppedWithPath // already stopped
case Registering stop() // spin until registration is completed before stopping
}
}

View file

@ -4,9 +4,9 @@
package akka.pattern
import akka.dispatch.{ Promise, Future }
import akka.actor._
import akka.util.{ Timeout, Duration }
import akka.dispatch.{ Unwatch, Watch, Promise, Future }
trait GracefulStopSupport {
/**
@ -39,11 +39,11 @@ trait GracefulStopSupport {
} else system match {
case e: ExtendedActorSystem
val ref = PromiseActorRef(e.provider, Timeout(timeout))
e.deathWatch.subscribe(ref, target)
ref.sendSystemMessage(Watch(target, ref))
ref.result onComplete {
case Right(Terminated(`target`)) () // Ignore
case _ e.deathWatch.unsubscribe(ref, target)
} // Just making sure we're not leaking here
case _ ref.sendSystemMessage(Unwatch(target, ref)) // Just making sure we're not leaking here
}
target ! PoisonPill
ref.result map { case Terminated(`target`) true }
case s throw new IllegalArgumentException("Unknown ActorSystem implementation: '" + s + "'")

View file

@ -182,7 +182,7 @@ public class FaultHandlingTestBase {
final TestProbe probe = new TestProbe(system);
probe.watch(child);
child.tell(new IllegalArgumentException());
probe.expectMsg(new Terminated(child));
probe.expectMsg(new Terminated(child, true));
//#stop
//#escalate-kill
@ -190,7 +190,7 @@ public class FaultHandlingTestBase {
probe.watch(child);
assert Await.result(ask(child, "get", 5000), timeout).equals(0);
child.tell(new Exception());
probe.expectMsg(new Terminated(child));
probe.expectMsg(new Terminated(child, true));
//#escalate-kill
//#escalate-restart

View file

@ -111,7 +111,7 @@ class FaultHandlingDocSpec extends AkkaSpec with ImplicitSender {
//#stop
watch(child) // have testActor watch child
child ! new IllegalArgumentException // break it
expectMsg(Terminated(child))
expectMsg(Terminated(child)(stopped = true))
child.isTerminated must be(true)
//#stop
}
@ -125,7 +125,7 @@ class FaultHandlingDocSpec extends AkkaSpec with ImplicitSender {
expectMsg(0)
child2 ! new Exception("CRASH") // escalate failure
expectMsg(Terminated(child2))
expectMsg(Terminated(child2)(stopped = true))
//#escalate-kill
//#escalate-restart
val supervisor2 = system.actorOf(Props[Supervisor2], "supervisor2")

View file

@ -6,7 +6,7 @@ package akka.remote
import akka.actor._
import akka.dispatch._
import akka.event.{ DeathWatch, Logging, LoggingAdapter }
import akka.event.{ Logging, LoggingAdapter }
import akka.event.EventStream
import akka.serialization.Serialization
import akka.serialization.SerializationExtension
@ -34,8 +34,6 @@ private[akka] class RemoteActorRefProvider(
override def rootPath: ActorPath = local.rootPath
override def deadLetters: InternalActorRef = local.deadLetters
override val deathWatch: DeathWatch = new RemoteDeathWatch(local.deathWatch, this)
// these are only available after init()
override def rootGuardian: InternalActorRef = local.rootGuardian
override def guardian: InternalActorRef = local.guardian
@ -246,25 +244,4 @@ private[akka] class RemoteActorRef private[akka] (
@throws(classOf[java.io.ObjectStreamException])
private def writeReplace(): AnyRef = SerializedActorRef(path)
}
private[akka] class RemoteDeathWatch(val local: DeathWatch, val provider: RemoteActorRefProvider) extends DeathWatch {
override def subscribe(watcher: ActorRef, watched: ActorRef): Boolean = watched match {
case r: RemoteRef
val ret = local.subscribe(watcher, watched)
provider.actorFor(r.path.root / "remote") ! DaemonMsgWatch(watcher, watched)
ret
case l: LocalRef
local.subscribe(watcher, watched)
case _
provider.log.error("unknown ActorRef type {} as DeathWatch target", watched.getClass)
false
}
override def unsubscribe(watcher: ActorRef, watched: ActorRef): Boolean = local.unsubscribe(watcher, watched)
override def unsubscribe(watcher: ActorRef): Unit = local.unsubscribe(watcher)
override def publish(event: Terminated): Unit = local.publish(event)
}
}

View file

@ -8,6 +8,7 @@ import scala.annotation.tailrec
import akka.actor.{ VirtualPathContainer, Terminated, Deploy, Props, Nobody, LocalActorRef, InternalActorRef, Address, ActorSystemImpl, ActorRef, ActorPathExtractor, ActorPath, Actor }
import akka.event.LoggingAdapter
import akka.dispatch.Watch
private[akka] sealed trait DaemonMsg
private[akka] case class DaemonMsgCreate(props: Props, deploy: Deploy, path: String, supervisor: ActorRef) extends DaemonMsg
@ -62,18 +63,19 @@ private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath
val actor = system.provider.actorOf(system, props, supervisor.asInstanceOf[InternalActorRef],
path, false, Some(deploy), true)
addChild(subpath.mkString("/"), actor)
system.deathWatch.subscribe(this, actor)
this.sendSystemMessage(Watch(actor, this))
case _
log.error("remote path does not match path from message [{}]", message)
}
case DaemonMsgWatch(watcher, watched)
val other = system.actorFor(watcher.path.root / "remote")
system.deathWatch.subscribe(other, watched)
system.actorFor(watcher.path.root / "remote") match {
case a: InternalActorRef a.sendSystemMessage(Watch(watched, a))
}
}
case Terminated(child: LocalActorRef) removeChild(child.path.elements.drop(1).mkString("/"))
case t: Terminated system.deathWatch.publish(t)
case t: Terminated //FIXME system.deathWatch.publish(t)
case unknown log.warning("Unknown message {} received by {}", unknown, this)
}