diff --git a/akka-actor/src/main/java/akka/pattern/AbstractPromiseActorRef.java b/akka-actor/src/main/java/akka/pattern/AbstractPromiseActorRef.java index e21d58204e..bb0f03861c 100644 --- a/akka-actor/src/main/java/akka/pattern/AbstractPromiseActorRef.java +++ b/akka-actor/src/main/java/akka/pattern/AbstractPromiseActorRef.java @@ -8,10 +8,12 @@ import akka.util.Unsafe; final class AbstractPromiseActorRef { final static long stateOffset; + final static long watchedByOffset; static { try { stateOffset = Unsafe.instance.objectFieldOffset(PromiseActorRef.class.getDeclaredField("_stateDoNotCallMeDirectly")); + watchedByOffset = Unsafe.instance.objectFieldOffset(PromiseActorRef.class.getDeclaredField("_watchedByDoNotCallMeDirectly")); } catch(Throwable t){ throw new ExceptionInInitializerError(t); } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index ca971de40e..d4e9595f62 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -8,8 +8,8 @@ import java.util.concurrent.atomic.AtomicLong import akka.dispatch._ import akka.routing._ import akka.AkkaException -import akka.util.{ Switch, Helpers } import akka.event._ +import akka.util.{ NonFatal, Switch, Helpers } /** * Interface for all ActorRef providers to implement. @@ -373,9 +373,9 @@ class LocalActorRefProvider( override def sendSystemMessage(message: SystemMessage): Unit = stopped ifOff { message match { - case Supervise(child) ⇒ // TODO register child in some map to keep track of it and enable shutdown after all dead - case ChildTerminated(child) ⇒ stop() - case _ ⇒ log.error(this + " received unexpected system message [" + message + "]") + case Supervise(_) ⇒ // TODO register child in some map to keep track of it and enable shutdown after all dead + case ChildTerminated(_) ⇒ stop() + case _ ⇒ log.error(this + " received unexpected system message [" + message + "]") } } } @@ -403,8 +403,8 @@ class LocalActorRefProvider( def receive = { case Terminated(_) ⇒ context.stop(self) - case CreateChild(child, name) ⇒ sender ! (try context.actorOf(child, name) catch { case e: Exception ⇒ e }) // FIXME shouldn't this use NonFatal & Status.Failure? - case CreateRandomNameChild(child) ⇒ sender ! (try context.actorOf(child) catch { case e: Exception ⇒ e }) // FIXME shouldn't this use NonFatal & Status.Failure? + case CreateChild(child, name) ⇒ sender ! (try context.actorOf(child, name) catch { case NonFatal(e) ⇒ e }) // FIXME shouldn't this use NonFatal & Status.Failure? + case CreateRandomNameChild(child) ⇒ sender ! (try context.actorOf(child) catch { case NonFatal(e) ⇒ e }) // FIXME shouldn't this use NonFatal & Status.Failure? case StopChild(child) ⇒ context.stop(child); sender ! "ok" case m ⇒ deadLetters ! DeadLetter(m, sender, self) } diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index 2837bd6546..47154f7853 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -4,11 +4,10 @@ package akka.pattern import java.util.concurrent.TimeoutException -import akka.util.Timeout import annotation.tailrec -import akka.util.Unsafe import akka.actor._ import akka.dispatch._ +import akka.util.{ NonFatal, Timeout, Unsafe } /** * This is what is used to complete a Future that is returned from an ask/? call, @@ -163,6 +162,7 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide extends MinimalActorRef { import PromiseActorRef._ import AbstractPromiseActorRef.stateOffset + import AbstractPromiseActorRef.watchedByOffset /** * As an optimization for the common (local) case we only register this PromiseActorRef @@ -179,14 +179,43 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide @volatile private[this] var _stateDoNotCallMeDirectly: AnyRef = _ - @inline - private def state: AnyRef = Unsafe.instance.getObjectVolatile(this, stateOffset) + @volatile + private[this] var _watchedByDoNotCallMeDirectly: Set[ActorRef] = ActorCell.emptyActorRefSet @inline - private def updateState(oldState: AnyRef, newState: AnyRef): Boolean = Unsafe.instance.compareAndSwapObject(this, stateOffset, oldState, newState) + private[this] def watchedBy: Set[ActorRef] = Unsafe.instance.getObjectVolatile(this, watchedByOffset).asInstanceOf[Set[ActorRef]] @inline - private def setState(newState: AnyRef): Unit = Unsafe.instance.putObjectVolatile(this, stateOffset, newState) + private[this] def updateWatchedBy(oldWatchedBy: Set[ActorRef], newWatchedBy: Set[ActorRef]): Boolean = + Unsafe.instance.compareAndSwapObject(this, watchedByOffset, oldWatchedBy, newWatchedBy) + + @tailrec // Returns false if the Promise is already completed + private[this] final def addWatcher(watcher: ActorRef): Boolean = watchedBy match { + case null ⇒ false + case other ⇒ if (updateWatchedBy(other, other + watcher)) true else addWatcher(watcher) + } + + @tailrec + private[this] final def remWatcher(watcher: ActorRef): Unit = watchedBy match { + case null ⇒ () + case other ⇒ if (!updateWatchedBy(other, other - watcher)) remWatcher(watcher) + } + + @tailrec + private[this] final def clearWatchers(): Set[ActorRef] = watchedBy match { + case null ⇒ ActorCell.emptyActorRefSet + case other ⇒ if (!updateWatchedBy(other, null)) clearWatchers() else other + } + + @inline + private[this] def state: AnyRef = Unsafe.instance.getObjectVolatile(this, stateOffset) + + @inline + private[this] def updateState(oldState: AnyRef, newState: AnyRef): Boolean = + Unsafe.instance.compareAndSwapObject(this, stateOffset, oldState, newState) + + @inline + private[this] def setState(newState: AnyRef): Unit = Unsafe.instance.putObjectVolatile(this, stateOffset, newState) override def getParent: InternalActorRef = provider.tempContainer @@ -230,8 +259,8 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide override def sendSystemMessage(message: SystemMessage): Unit = message match { case _: Terminate ⇒ stop() - case Watch(watchee, watcher) ⇒ //FIXME IMPLEMENT - case Unwatch(watchee, watcher) ⇒ //FIXME IMPLEMENT + case Watch(watchee, watcher) ⇒ if (watchee == this && watcher != this && !addWatcher(watcher)) watcher ! Terminated(watchee)(stopped = true) + case Unwatch(watchee, watcher) ⇒ if (watchee == this && watcher != this) remWatcher(watcher) case _ ⇒ } @@ -242,20 +271,20 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide @tailrec override def stop(): Unit = { - def ensureCompleted(): Unit = if (!result.isCompleted) result.tryComplete(Left(new ActorKilledException("Stopped"))) + def ensureCompleted(): Unit = { + if (!result.isCompleted) result.tryComplete(Left(new ActorKilledException("Stopped"))) + val watchers = clearWatchers() + if (!watchers.isEmpty) { + val termination = Terminated(this)(stopped = true) + watchers foreach { w ⇒ try w.tell(termination, this) catch { case NonFatal(t) ⇒ /* FIXME LOG THIS */ } } + } + } state match { case null ⇒ // if path was never queried nobody can possibly be watching us, so we don't have to publish termination either if (updateState(null, Stopped)) ensureCompleted() else stop() case p: ActorPath ⇒ if (updateState(p, StoppedWithPath(p))) { - try { - ensureCompleted() - val termination = Terminated(this)(stopped = true) - // FIXME watchedBy foreach { w => w.tell(termination) } - // FIXME watching foreach { w.sendSystemMessage(Unwatch(w, self)) } - } finally { - provider.unregisterTempActor(p) - } + try ensureCompleted() finally provider.unregisterTempActor(p) } else stop() case Stopped | _: StoppedWithPath ⇒ // already stopped case Registering ⇒ stop() // spin until registration is completed before stopping diff --git a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala index d1e7fab327..35004e637d 100644 --- a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala @@ -34,18 +34,18 @@ trait GracefulStopSupport { * is completed with failure [[akka.pattern.AskTimeoutException]]. */ def gracefulStop(target: ActorRef, timeout: Duration)(implicit system: ActorSystem): Future[Boolean] = { - if (target.isTerminated) { - Promise.successful(true) - } else system match { + if (target.isTerminated) Promise.successful(true) + else system match { case e: ExtendedActorSystem ⇒ + val internalTarget = target.asInstanceOf[InternalActorRef] val ref = PromiseActorRef(e.provider, Timeout(timeout)) - ref.sendSystemMessage(Watch(target, ref)) - ref.result onComplete { - case Right(Terminated(`target`)) ⇒ () // Ignore - case _ ⇒ ref.sendSystemMessage(Unwatch(target, ref)) // Just making sure we're not leaking here + internalTarget.sendSystemMessage(Watch(target, ref)) + val result = ref.result map { + case Terminated(`target`) ⇒ true + case _ ⇒ internalTarget.sendSystemMessage(Unwatch(target, ref)); false // Just making sure we're not leaking here } target ! PoisonPill - ref.result map { case Terminated(`target`) ⇒ true } + result case s ⇒ throw new IllegalArgumentException("Unknown ActorSystem implementation: '" + s + "'") } }