Another round of improvements to PromiseActorRef

This commit is contained in:
Mathias 2012-03-24 23:02:37 +01:00
parent 8fc66ce044
commit 791ec0f611

View file

@ -4,13 +4,12 @@
package akka.pattern
import java.util.concurrent.TimeoutException
import akka.actor.{ Terminated, Status, MinimalActorRef, InternalActorRef, ActorRef, ActorPath }
import akka.dispatch.{ Promise, Terminate, SystemMessage, Future }
import akka.event.DeathWatch
import akka.actor.ActorRefProvider
import akka.util.Timeout
import annotation.tailrec
import akka.util.Unsafe
import akka.actor._
/**
* This is what is used to complete a Future that is returned from an ask/? call,
@ -165,64 +164,70 @@ private[akka] final class PromiseActorRef private (
val result: Promise[Any],
val deathWatch: DeathWatch) extends MinimalActorRef {
import PromiseActorRef._
import AbstractPromiseActorRef.stateOffset
/**
* As an optimization for the common (local) case we only register this PromiseActorRef
* with the provider when the `path` member is actually queried, which happens during
* serialization (but also during a simple `toString`!).
* serialization (but also during a simple call to `toString`, `equals` or `hashCode`!).
*
* Defined states:
* null => started, path not yet created
* Registering => currently creating temp path and registering it
* path: ActorPath => path is available and was registered
* RefStopped(path) => stopped, path available
* Stopped => stopped, path not yet created
* null => started, path not yet created
* Registering => currently creating temp path and registering it
* path: ActorPath => path is available and was registered
* StoppedWithPath(path) => stopped, path available
* Stopped => stopped, path not yet created
*/
@volatile
private var _stateDoNotCallMeDirectly: AnyRef = _
@inline
final def state: AnyRef = Unsafe.instance.getObjectVolatile(this, AbstractPromiseActorRef.stateOffset)
private def state: AnyRef = Unsafe.instance.getObjectVolatile(this, stateOffset)
@inline
private def updateState(oldState: AnyRef, newState: AnyRef): Boolean =
Unsafe.instance.compareAndSwapObject(this, stateOffset, oldState, newState)
@inline
private def setState(newState: AnyRef) =
Unsafe.instance.putObjectVolatile(this, stateOffset, newState)
/**
* Contract of this method:
* Must always return a valid path string, which must have
* Must always return the same ActorPath, which must have
* been registered if we haven't been stopped yet.
*/
@tailrec
def path: ActorPath = state match {
case null
if (Unsafe.instance.compareAndSwapObject(this, AbstractPromiseActorRef.stateOffset, null, Registering)) {
if (updateState(null, Registering)) {
var p: ActorPath = null
try {
p = provider.tempPath()
provider.registerTempActor(this, p)
p
} finally {
Unsafe.instance.putObjectVolatile(this, AbstractPromiseActorRef.stateOffset, p)
}
} finally { setState(p) }
} else path
case p: ActorPath p
case x: RefStopped x.path
case p: ActorPath p
case StoppedWithPath(p) p
case Stopped
// even if we are already stopped we still need to produce a proper path
Unsafe.instance.compareAndSwapObject(this, AbstractPromiseActorRef.stateOffset,
Stopped, new RefStopped(provider.tempPath()))
updateState(Stopped, StoppedWithPath(provider.tempPath()))
path
case Registering path // spin until registration is completed
case x throw new IllegalStateException
}
override def !(message: Any)(implicit sender: ActorRef = null): Unit = state match {
case Stopped
case _: RefStopped
case _ result.tryComplete {
message match {
case Status.Success(r) Right(r)
case Status.Failure(f) Left(f)
case other Right(other)
case Stopped | _: StoppedWithPath provider.deadLetters ! message
case _
val completedJustNow = result.tryComplete {
message match {
case Status.Success(r) Right(r)
case Status.Failure(f) Left(f)
case other Right(other)
}
}
}
if (!completedJustNow) provider.deadLetters ! message
}
override def sendSystemMessage(message: SystemMessage): Unit = message match {
@ -230,33 +235,39 @@ private[akka] final class PromiseActorRef private (
case _
}
override def isTerminated = result.isCompleted
override def isTerminated = state match {
case Stopped | _: StoppedWithPath true
case _ false
}
@tailrec
override def stop(): Unit = state match {
case null
// if path was never called nobody can possibly be watching us, so we don't have to publish termination either
if (!Unsafe.instance.compareAndSwapObject(this, AbstractPromiseActorRef.stateOffset, null, Stopped)) stop()
case p: ActorPath
if (Unsafe.instance.compareAndSwapObject(this, AbstractPromiseActorRef.stateOffset, p, new RefStopped(p))) {
try {
deathWatch.publish(Terminated(this))
} finally {
provider.unregisterTempActor(p)
}
} else stop()
case Registering stop() // spin until registration is completed before stopping
case Stopped
case _: RefStopped
case x throw new IllegalStateException
override def stop(): Unit = {
def ensurePromiseCompleted(): Unit =
if (!result.isCompleted) result.tryComplete(Left(new ActorKilledException("Stopped")))
state match {
case null
// if path was never queried nobody can possibly be watching us, so we don't have to publish termination either
if (updateState(null, Stopped)) ensurePromiseCompleted()
else stop()
case p: ActorPath
if (updateState(p, StoppedWithPath(p))) {
try {
deathWatch.publish(Terminated(this))
ensurePromiseCompleted()
} finally {
provider.unregisterTempActor(p)
}
} else stop()
case Stopped | _: StoppedWithPath
case Registering stop() // spin until registration is completed before stopping
}
}
}
private[akka] object PromiseActorRef {
private val Registering = new AnyRef
private val Stopped = new AnyRef
private final class RefStopped(val path: ActorPath)
private case object Registering
private case object Stopped
private case class StoppedWithPath(path: ActorPath)
def apply(provider: ActorRefProvider, timeout: Timeout): PromiseActorRef = {
val result = Promise[Any]()(provider.dispatcher)