Implementing death watch for PromiseActorRef

This commit is contained in:
Viktor Klang 2012-05-30 13:24:38 +02:00
parent a413a9394b
commit 8bdb870246
4 changed files with 62 additions and 31 deletions

View file

@ -8,10 +8,12 @@ import akka.util.Unsafe;
final class AbstractPromiseActorRef {
final static long stateOffset;
final static long watchedByOffset;
static {
try {
stateOffset = Unsafe.instance.objectFieldOffset(PromiseActorRef.class.getDeclaredField("_stateDoNotCallMeDirectly"));
watchedByOffset = Unsafe.instance.objectFieldOffset(PromiseActorRef.class.getDeclaredField("_watchedByDoNotCallMeDirectly"));
} catch(Throwable t){
throw new ExceptionInInitializerError(t);
}

View file

@ -8,8 +8,8 @@ import java.util.concurrent.atomic.AtomicLong
import akka.dispatch._
import akka.routing._
import akka.AkkaException
import akka.util.{ Switch, Helpers }
import akka.event._
import akka.util.{ NonFatal, Switch, Helpers }
/**
* Interface for all ActorRef providers to implement.
@ -373,8 +373,8 @@ class LocalActorRefProvider(
override def sendSystemMessage(message: SystemMessage): Unit = stopped ifOff {
message match {
case Supervise(child) // TODO register child in some map to keep track of it and enable shutdown after all dead
case ChildTerminated(child) stop()
case Supervise(_) // TODO register child in some map to keep track of it and enable shutdown after all dead
case ChildTerminated(_) stop()
case _ log.error(this + " received unexpected system message [" + message + "]")
}
}
@ -403,8 +403,8 @@ class LocalActorRefProvider(
def receive = {
case Terminated(_) context.stop(self)
case CreateChild(child, name) sender ! (try context.actorOf(child, name) catch { case e: Exception e }) // FIXME shouldn't this use NonFatal & Status.Failure?
case CreateRandomNameChild(child) sender ! (try context.actorOf(child) catch { case e: Exception e }) // FIXME shouldn't this use NonFatal & Status.Failure?
case CreateChild(child, name) sender ! (try context.actorOf(child, name) catch { case NonFatal(e) e }) // FIXME shouldn't this use NonFatal & Status.Failure?
case CreateRandomNameChild(child) sender ! (try context.actorOf(child) catch { case NonFatal(e) e }) // FIXME shouldn't this use NonFatal & Status.Failure?
case StopChild(child) context.stop(child); sender ! "ok"
case m deadLetters ! DeadLetter(m, sender, self)
}

View file

@ -4,11 +4,10 @@
package akka.pattern
import java.util.concurrent.TimeoutException
import akka.util.Timeout
import annotation.tailrec
import akka.util.Unsafe
import akka.actor._
import akka.dispatch._
import akka.util.{ NonFatal, Timeout, Unsafe }
/**
* This is what is used to complete a Future that is returned from an ask/? call,
@ -163,6 +162,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
extends MinimalActorRef {
import PromiseActorRef._
import AbstractPromiseActorRef.stateOffset
import AbstractPromiseActorRef.watchedByOffset
/**
* As an optimization for the common (local) case we only register this PromiseActorRef
@ -179,14 +179,43 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
@volatile
private[this] var _stateDoNotCallMeDirectly: AnyRef = _
@inline
private def state: AnyRef = Unsafe.instance.getObjectVolatile(this, stateOffset)
@volatile
private[this] var _watchedByDoNotCallMeDirectly: Set[ActorRef] = ActorCell.emptyActorRefSet
@inline
private def updateState(oldState: AnyRef, newState: AnyRef): Boolean = Unsafe.instance.compareAndSwapObject(this, stateOffset, oldState, newState)
private[this] def watchedBy: Set[ActorRef] = Unsafe.instance.getObjectVolatile(this, watchedByOffset).asInstanceOf[Set[ActorRef]]
@inline
private def setState(newState: AnyRef): Unit = Unsafe.instance.putObjectVolatile(this, stateOffset, newState)
private[this] def updateWatchedBy(oldWatchedBy: Set[ActorRef], newWatchedBy: Set[ActorRef]): Boolean =
Unsafe.instance.compareAndSwapObject(this, watchedByOffset, oldWatchedBy, newWatchedBy)
@tailrec // Returns false if the Promise is already completed
private[this] final def addWatcher(watcher: ActorRef): Boolean = watchedBy match {
case null false
case other if (updateWatchedBy(other, other + watcher)) true else addWatcher(watcher)
}
@tailrec
private[this] final def remWatcher(watcher: ActorRef): Unit = watchedBy match {
case null ()
case other if (!updateWatchedBy(other, other - watcher)) remWatcher(watcher)
}
@tailrec
private[this] final def clearWatchers(): Set[ActorRef] = watchedBy match {
case null ActorCell.emptyActorRefSet
case other if (!updateWatchedBy(other, null)) clearWatchers() else other
}
@inline
private[this] def state: AnyRef = Unsafe.instance.getObjectVolatile(this, stateOffset)
@inline
private[this] def updateState(oldState: AnyRef, newState: AnyRef): Boolean =
Unsafe.instance.compareAndSwapObject(this, stateOffset, oldState, newState)
@inline
private[this] def setState(newState: AnyRef): Unit = Unsafe.instance.putObjectVolatile(this, stateOffset, newState)
override def getParent: InternalActorRef = provider.tempContainer
@ -230,8 +259,8 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
override def sendSystemMessage(message: SystemMessage): Unit = message match {
case _: Terminate stop()
case Watch(watchee, watcher) //FIXME IMPLEMENT
case Unwatch(watchee, watcher) //FIXME IMPLEMENT
case Watch(watchee, watcher) if (watchee == this && watcher != this && !addWatcher(watcher)) watcher ! Terminated(watchee)(stopped = true)
case Unwatch(watchee, watcher) if (watchee == this && watcher != this) remWatcher(watcher)
case _
}
@ -242,20 +271,20 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide
@tailrec
override def stop(): Unit = {
def ensureCompleted(): Unit = if (!result.isCompleted) result.tryComplete(Left(new ActorKilledException("Stopped")))
def ensureCompleted(): Unit = {
if (!result.isCompleted) result.tryComplete(Left(new ActorKilledException("Stopped")))
val watchers = clearWatchers()
if (!watchers.isEmpty) {
val termination = Terminated(this)(stopped = true)
watchers foreach { w try w.tell(termination, this) catch { case NonFatal(t) /* FIXME LOG THIS */ } }
}
}
state match {
case null // if path was never queried nobody can possibly be watching us, so we don't have to publish termination either
if (updateState(null, Stopped)) ensureCompleted() else stop()
case p: ActorPath
if (updateState(p, StoppedWithPath(p))) {
try {
ensureCompleted()
val termination = Terminated(this)(stopped = true)
// FIXME watchedBy foreach { w => w.tell(termination) }
// FIXME watching foreach { w.sendSystemMessage(Unwatch(w, self)) }
} finally {
provider.unregisterTempActor(p)
}
try ensureCompleted() finally provider.unregisterTempActor(p)
} else stop()
case Stopped | _: StoppedWithPath // already stopped
case Registering stop() // spin until registration is completed before stopping

View file

@ -34,18 +34,18 @@ trait GracefulStopSupport {
* is completed with failure [[akka.pattern.AskTimeoutException]].
*/
def gracefulStop(target: ActorRef, timeout: Duration)(implicit system: ActorSystem): Future[Boolean] = {
if (target.isTerminated) {
Promise.successful(true)
} else system match {
if (target.isTerminated) Promise.successful(true)
else system match {
case e: ExtendedActorSystem
val internalTarget = target.asInstanceOf[InternalActorRef]
val ref = PromiseActorRef(e.provider, Timeout(timeout))
ref.sendSystemMessage(Watch(target, ref))
ref.result onComplete {
case Right(Terminated(`target`)) () // Ignore
case _ ref.sendSystemMessage(Unwatch(target, ref)) // Just making sure we're not leaking here
internalTarget.sendSystemMessage(Watch(target, ref))
val result = ref.result map {
case Terminated(`target`) true
case _ internalTarget.sendSystemMessage(Unwatch(target, ref)); false // Just making sure we're not leaking here
}
target ! PoisonPill
ref.result map { case Terminated(`target`) true }
result
case s throw new IllegalArgumentException("Unknown ActorSystem implementation: '" + s + "'")
}
}