From cb7271f9741742f0b41b08ca01729bf7a1bda7f9 Mon Sep 17 00:00:00 2001 From: Mathias Date: Fri, 23 Mar 2012 15:52:36 +0100 Subject: [PATCH] Make provider registration of PromiseActorRefs lazy By only registering a PromiseActorRef on access to `path` we can shave off about 30% of the overhead involved in the common, purely local `ask` use case. Unfortunately `path` is accessed not only for ActorRef serialization but also during `toString`, `equals` and `hashCode`, so one should be careful how to handle PromiseActorRefs in hot paths. --- .../main/scala/akka/pattern/AskSupport.scala | 73 ++++++++++++++++--- 1 file changed, 63 insertions(+), 10 deletions(-) diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index c96262fdbc..4dc8e69016 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -3,13 +3,14 @@ */ package akka.pattern -import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.TimeoutException import akka.actor.{ Terminated, Status, MinimalActorRef, InternalActorRef, ActorRef, ActorPath } import akka.dispatch.{ Promise, Terminate, SystemMessage, Future } import akka.event.DeathWatch import akka.actor.ActorRefProvider import akka.util.Timeout +import annotation.tailrec /** * This is what is used to complete a Future that is returned from an ask/? call, @@ -159,12 +160,50 @@ trait AskSupport { */ private[akka] final class PromiseActorRef( val provider: ActorRefProvider, - val path: ActorPath, override val getParent: InternalActorRef, val result: Promise[Any], val deathWatch: DeathWatch) extends MinimalActorRef { - final val running = new AtomicBoolean(true) + /** + * As an optimization for the common (local) case we only register this PromiseActorRef + * with the provider when the `path` member is actually queried, which happens during + * serialization (but also during a simple `toString`!). + * + * Defined states: + * null => started, path not yet created + * path: ActorPath => path is available and was registered + * RefStopped(path) => stopped, path available + * 'stopped => stopped, path not yet created + * 'busy => another thread is in a state change + */ + final val state = new AtomicReference[Any] + + /** + * Contract of this method: + * Must always return a valid path string, which must have + * been registered if we haven't been stopped yet. + */ + @tailrec + def path: ActorPath = state.get match { + case null ⇒ + if (state.compareAndSet(null, 'busy)) { + // TODO: are there any exceptions "expected" from the following two lines that we need to take care of? + val p = provider.tempPath() + provider.registerTempActor(this, p) + state.set(p) + p + } else path + case p: ActorPath ⇒ p + case x: RefStopped ⇒ x.path + case 'stopped ⇒ + if (state.compareAndSet('stopped, 'busy)) { + // TODO: are there any exceptions "expected" from the `tempPath` call that we need to take care of? + state.set(new RefStopped(provider.tempPath())) + } + path + case 'busy ⇒ path + case x ⇒ throw new IllegalStateException(x.toString) + } override def !(message: Any)(implicit sender: ActorRef = null): Unit = { try { @@ -185,8 +224,23 @@ trait AskSupport { override def isTerminated = result.isCompleted - override def stop(): Unit = if (running.getAndSet(false)) { - deathWatch.publish(Terminated(this)) + @tailrec + override def stop(): Unit = state.get match { + case null ⇒ + if (state.compareAndSet(null, 'busy)) { + state.set('stopped) + // TODO: are there any exceptions "expected" from the following line that we need to take care of? + deathWatch.publish(Terminated(this)) + } else stop() + case p: ActorPath ⇒ + if (state.compareAndSet(p, 'busy)) { + state.set(new RefStopped(p)) + // TODO: are there any exceptions "expected" from the following two lines that we need to take care of? + deathWatch.publish(Terminated(this)) + provider.unregisterTempActor(p) + } else stop() + case 'busy ⇒ stop() + case _ ⇒ // 'stopped and RefStopped do not require any action here } } @@ -194,15 +248,14 @@ trait AskSupport { * INTERNAL AKKA USE ONLY */ private[akka] def createAsker(provider: ActorRefProvider, timeout: Timeout): PromiseActorRef = { - val path = provider.tempPath() val result = Promise[Any]()(provider.dispatcher) - val a = new PromiseActorRef(provider, path, provider.tempContainer, result, provider.deathWatch) - provider.registerTempActor(a, path) + val a = new PromiseActorRef(provider, provider.tempContainer, result, provider.deathWatch) val f = provider.scheduler.scheduleOnce(timeout.duration) { result.tryComplete(Left(new AskTimeoutException("Timed out"))) } result onComplete { _ ⇒ - try { try a.stop() finally f.cancel() } - finally { provider.unregisterTempActor(path) } + try a.stop() finally f.cancel() } a } } + +private[akka] final class RefStopped(val path: ActorPath) \ No newline at end of file