diff --git a/akka-actor/src/main/java/akka/pattern/AbstractPromiseActorRef.java b/akka-actor/src/main/java/akka/pattern/AbstractPromiseActorRef.java new file mode 100644 index 0000000000..e21d58204e --- /dev/null +++ b/akka-actor/src/main/java/akka/pattern/AbstractPromiseActorRef.java @@ -0,0 +1,19 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.pattern; + +import akka.util.Unsafe; + +final class AbstractPromiseActorRef { + final static long stateOffset; + + static { + try { + stateOffset = Unsafe.instance.objectFieldOffset(PromiseActorRef.class.getDeclaredField("_stateDoNotCallMeDirectly")); + } catch(Throwable t){ + throw new ExceptionInInitializerError(t); + } + } +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index 4dc8e69016..0c48335cce 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -3,7 +3,6 @@ */ package akka.pattern -import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.TimeoutException import akka.actor.{ Terminated, Status, MinimalActorRef, InternalActorRef, ActorRef, ActorPath } import akka.dispatch.{ Promise, Terminate, SystemMessage, Future } @@ -11,6 +10,7 @@ import akka.event.DeathWatch import akka.actor.ActorRefProvider import akka.util.Timeout import annotation.tailrec +import akka.util.Unsafe /** * This is what is used to complete a Future that is returned from an ask/? call, @@ -81,7 +81,7 @@ trait AskSupport { actorRef.tell(message) Promise.failed(new AskTimeoutException("not asking with negative timeout"))(provider.dispatcher) } else { - val a = createAsker(provider, timeout) + val a = PromiseActorRef(provider, timeout) actorRef.tell(message, a) a.result } @@ -153,101 +153,112 @@ trait AskSupport { */ def ?(message: Any)(implicit timeout: Timeout): Future[Any] = akka.pattern.ask(actorRef, message)(timeout) } +} + +/** + * Akka private optimized representation of the temporary actor spawned to + * receive the reply to an "ask" operation. + */ +private[akka] final class PromiseActorRef private ( + val provider: ActorRefProvider, + override val getParent: InternalActorRef, + val result: Promise[Any], + val deathWatch: DeathWatch) extends MinimalActorRef { + import PromiseActorRef._ /** - * Akka private optimized representation of the temporary actor spawned to - * receive the reply to an "ask" operation. + * As an optimization for the common (local) case we only register this PromiseActorRef + * with the provider when the `path` member is actually queried, which happens during + * serialization (but also during a simple `toString`!). + * + * Defined states: + * null => started, path not yet created + * Registering => currently creating temp path and registering it + * path: ActorPath => path is available and was registered + * RefStopped(path) => stopped, path available + * Stopped => stopped, path not yet created */ - private[akka] final class PromiseActorRef( - val provider: ActorRefProvider, - override val getParent: InternalActorRef, - val result: Promise[Any], - val deathWatch: DeathWatch) extends MinimalActorRef { + @volatile + private var _stateDoNotCallMeDirectly: AnyRef = _ - /** - * As an optimization for the common (local) case we only register this PromiseActorRef - * with the provider when the `path` member is actually queried, which happens during - * serialization (but also during a simple `toString`!). - * - * Defined states: - * null => started, path not yet created - * path: ActorPath => path is available and was registered - * RefStopped(path) => stopped, path available - * 'stopped => stopped, path not yet created - * 'busy => another thread is in a state change - */ - final val state = new AtomicReference[Any] + @inline + final def state: AnyRef = Unsafe.instance.getObjectVolatile(this, AbstractPromiseActorRef.stateOffset) - /** - * Contract of this method: - * Must always return a valid path string, which must have - * been registered if we haven't been stopped yet. - */ - @tailrec - def path: ActorPath = state.get match { - case null ⇒ - if (state.compareAndSet(null, 'busy)) { - // TODO: are there any exceptions "expected" from the following two lines that we need to take care of? - val p = provider.tempPath() + /** + * Contract of this method: + * Must always return a valid path string, which must have + * been registered if we haven't been stopped yet. + */ + @tailrec + def path: ActorPath = state match { + case null ⇒ + if (Unsafe.instance.compareAndSwapObject(this, AbstractPromiseActorRef.stateOffset, null, Registering)) { + var p: ActorPath = null + try { + p = provider.tempPath() provider.registerTempActor(this, p) - state.set(p) p - } else path - case p: ActorPath ⇒ p - case x: RefStopped ⇒ x.path - case 'stopped ⇒ - if (state.compareAndSet('stopped, 'busy)) { - // TODO: are there any exceptions "expected" from the `tempPath` call that we need to take care of? - state.set(new RefStopped(provider.tempPath())) + } finally { + Unsafe.instance.putObjectVolatile(this, AbstractPromiseActorRef.stateOffset, p) } - path - case 'busy ⇒ path - case x ⇒ throw new IllegalStateException(x.toString) - } + } else path + case p: ActorPath ⇒ p + case x: RefStopped ⇒ x.path + case Stopped ⇒ + // even if we are already stopped we still need to produce a proper path + Unsafe.instance.compareAndSwapObject(this, AbstractPromiseActorRef.stateOffset, + Stopped, new RefStopped(provider.tempPath())) + path + case Registering ⇒ path // spin until registration is completed + case x ⇒ throw new IllegalStateException + } - override def !(message: Any)(implicit sender: ActorRef = null): Unit = { - try { - message match { - case Status.Success(r) ⇒ result.success(r) - case Status.Failure(f) ⇒ result.failure(f) - case other ⇒ result.success(other) - } - } catch { - case _: IllegalStateException ⇒ // drop "already completed" error + override def !(message: Any)(implicit sender: ActorRef = null): Unit = state match { + case Stopped ⇒ + case _: RefStopped ⇒ + case _ ⇒ result.tryComplete { + message match { + case Status.Success(r) ⇒ Right(r) + case Status.Failure(f) ⇒ Left(f) + case other ⇒ Right(other) } } - - override def sendSystemMessage(message: SystemMessage): Unit = message match { - case _: Terminate ⇒ stop() - case _ ⇒ - } - - override def isTerminated = result.isCompleted - - @tailrec - override def stop(): Unit = state.get match { - case null ⇒ - if (state.compareAndSet(null, 'busy)) { - state.set('stopped) - // TODO: are there any exceptions "expected" from the following line that we need to take care of? - deathWatch.publish(Terminated(this)) - } else stop() - case p: ActorPath ⇒ - if (state.compareAndSet(p, 'busy)) { - state.set(new RefStopped(p)) - // TODO: are there any exceptions "expected" from the following two lines that we need to take care of? - deathWatch.publish(Terminated(this)) - provider.unregisterTempActor(p) - } else stop() - case 'busy ⇒ stop() - case _ ⇒ // 'stopped and RefStopped do not require any action here - } } - /** - * INTERNAL AKKA USE ONLY - */ - private[akka] def createAsker(provider: ActorRefProvider, timeout: Timeout): PromiseActorRef = { + override def sendSystemMessage(message: SystemMessage): Unit = message match { + case _: Terminate ⇒ stop() + case _ ⇒ + } + + override def isTerminated = result.isCompleted + + @tailrec + override def stop(): Unit = state match { + case null ⇒ + // if path was never called nobody can possibly be watching us, so we don't have to publish termination either + if (!Unsafe.instance.compareAndSwapObject(this, AbstractPromiseActorRef.stateOffset, null, Stopped)) stop() + case p: ActorPath ⇒ + if (Unsafe.instance.compareAndSwapObject(this, AbstractPromiseActorRef.stateOffset, p, new RefStopped(p))) { + try { + deathWatch.publish(Terminated(this)) + } finally { + provider.unregisterTempActor(p) + } + } else stop() + case Registering ⇒ stop() // spin until registration is completed before stopping + case Stopped ⇒ + case _: RefStopped ⇒ + case x ⇒ throw new IllegalStateException + } +} + +private[akka] object PromiseActorRef { + private val Registering = new AnyRef + private val Stopped = new AnyRef + + private final class RefStopped(val path: ActorPath) + + def apply(provider: ActorRefProvider, timeout: Timeout): PromiseActorRef = { val result = Promise[Any]()(provider.dispatcher) val a = new PromiseActorRef(provider, provider.tempContainer, result, provider.deathWatch) val f = provider.scheduler.scheduleOnce(timeout.duration) { result.tryComplete(Left(new AskTimeoutException("Timed out"))) } @@ -256,6 +267,4 @@ trait AskSupport { } a } -} - -private[akka] final class RefStopped(val path: ActorPath) \ No newline at end of file +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index e0b205fff0..97ac930b78 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -1066,7 +1066,7 @@ trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒ { case (sender, message) ⇒ val provider: ActorRefProvider = routeeProvider.context.asInstanceOf[ActorCell].systemImpl.provider - val asker = akka.pattern.createAsker(provider, within) + val asker = akka.pattern.PromiseActorRef(provider, within) asker.result.pipeTo(sender) toAll(asker, routeeProvider.routees) }