diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index 0c48335cce..f16403da4a 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -4,13 +4,12 @@ package akka.pattern import java.util.concurrent.TimeoutException -import akka.actor.{ Terminated, Status, MinimalActorRef, InternalActorRef, ActorRef, ActorPath } import akka.dispatch.{ Promise, Terminate, SystemMessage, Future } import akka.event.DeathWatch -import akka.actor.ActorRefProvider import akka.util.Timeout import annotation.tailrec import akka.util.Unsafe +import akka.actor._ /** * This is what is used to complete a Future that is returned from an ask/? call, @@ -165,64 +164,70 @@ private[akka] final class PromiseActorRef private ( val result: Promise[Any], val deathWatch: DeathWatch) extends MinimalActorRef { import PromiseActorRef._ + import AbstractPromiseActorRef.stateOffset /** * As an optimization for the common (local) case we only register this PromiseActorRef * with the provider when the `path` member is actually queried, which happens during - * serialization (but also during a simple `toString`!). + * serialization (but also during a simple call to `toString`, `equals` or `hashCode`!). * * Defined states: - * null => started, path not yet created - * Registering => currently creating temp path and registering it - * path: ActorPath => path is available and was registered - * RefStopped(path) => stopped, path available - * Stopped => stopped, path not yet created + * null => started, path not yet created + * Registering => currently creating temp path and registering it + * path: ActorPath => path is available and was registered + * StoppedWithPath(path) => stopped, path available + * Stopped => stopped, path not yet created */ @volatile private var _stateDoNotCallMeDirectly: AnyRef = _ @inline - final def state: AnyRef = Unsafe.instance.getObjectVolatile(this, AbstractPromiseActorRef.stateOffset) + private def state: AnyRef = Unsafe.instance.getObjectVolatile(this, stateOffset) + + @inline + private def updateState(oldState: AnyRef, newState: AnyRef): Boolean = + Unsafe.instance.compareAndSwapObject(this, stateOffset, oldState, newState) + + @inline + private def setState(newState: AnyRef) = + Unsafe.instance.putObjectVolatile(this, stateOffset, newState) /** * Contract of this method: - * Must always return a valid path string, which must have + * Must always return the same ActorPath, which must have * been registered if we haven't been stopped yet. */ @tailrec def path: ActorPath = state match { case null ⇒ - if (Unsafe.instance.compareAndSwapObject(this, AbstractPromiseActorRef.stateOffset, null, Registering)) { + if (updateState(null, Registering)) { var p: ActorPath = null try { p = provider.tempPath() provider.registerTempActor(this, p) p - } finally { - Unsafe.instance.putObjectVolatile(this, AbstractPromiseActorRef.stateOffset, p) - } + } finally { setState(p) } } else path - case p: ActorPath ⇒ p - case x: RefStopped ⇒ x.path + case p: ActorPath ⇒ p + case StoppedWithPath(p) ⇒ p case Stopped ⇒ // even if we are already stopped we still need to produce a proper path - Unsafe.instance.compareAndSwapObject(this, AbstractPromiseActorRef.stateOffset, - Stopped, new RefStopped(provider.tempPath())) + updateState(Stopped, StoppedWithPath(provider.tempPath())) path case Registering ⇒ path // spin until registration is completed - case x ⇒ throw new IllegalStateException } override def !(message: Any)(implicit sender: ActorRef = null): Unit = state match { - case Stopped ⇒ - case _: RefStopped ⇒ - case _ ⇒ result.tryComplete { - message match { - case Status.Success(r) ⇒ Right(r) - case Status.Failure(f) ⇒ Left(f) - case other ⇒ Right(other) + case Stopped | _: StoppedWithPath ⇒ provider.deadLetters ! message + case _ ⇒ + val completedJustNow = result.tryComplete { + message match { + case Status.Success(r) ⇒ Right(r) + case Status.Failure(f) ⇒ Left(f) + case other ⇒ Right(other) + } } - } + if (!completedJustNow) provider.deadLetters ! message } override def sendSystemMessage(message: SystemMessage): Unit = message match { @@ -230,33 +235,39 @@ private[akka] final class PromiseActorRef private ( case _ ⇒ } - override def isTerminated = result.isCompleted + override def isTerminated = state match { + case Stopped | _: StoppedWithPath ⇒ true + case _ ⇒ false + } @tailrec - override def stop(): Unit = state match { - case null ⇒ - // if path was never called nobody can possibly be watching us, so we don't have to publish termination either - if (!Unsafe.instance.compareAndSwapObject(this, AbstractPromiseActorRef.stateOffset, null, Stopped)) stop() - case p: ActorPath ⇒ - if (Unsafe.instance.compareAndSwapObject(this, AbstractPromiseActorRef.stateOffset, p, new RefStopped(p))) { - try { - deathWatch.publish(Terminated(this)) - } finally { - provider.unregisterTempActor(p) - } - } else stop() - case Registering ⇒ stop() // spin until registration is completed before stopping - case Stopped ⇒ - case _: RefStopped ⇒ - case x ⇒ throw new IllegalStateException + override def stop(): Unit = { + def ensurePromiseCompleted(): Unit = + if (!result.isCompleted) result.tryComplete(Left(new ActorKilledException("Stopped"))) + state match { + case null ⇒ + // if path was never queried nobody can possibly be watching us, so we don't have to publish termination either + if (updateState(null, Stopped)) ensurePromiseCompleted() + else stop() + case p: ActorPath ⇒ + if (updateState(p, StoppedWithPath(p))) { + try { + deathWatch.publish(Terminated(this)) + ensurePromiseCompleted() + } finally { + provider.unregisterTempActor(p) + } + } else stop() + case Stopped | _: StoppedWithPath ⇒ + case Registering ⇒ stop() // spin until registration is completed before stopping + } } } private[akka] object PromiseActorRef { - private val Registering = new AnyRef - private val Stopped = new AnyRef - - private final class RefStopped(val path: ActorPath) + private case object Registering + private case object Stopped + private case class StoppedWithPath(path: ActorPath) def apply(provider: ActorRefProvider, timeout: Timeout): PromiseActorRef = { val result = Promise[Any]()(provider.dispatcher)