From d4a764cfd94b4ccc7a451210a4bb6818deddcecc Mon Sep 17 00:00:00 2001 From: Roland Date: Sat, 10 Dec 2011 16:56:53 +0100 Subject: [PATCH] remove LocalActorRef.underlyingActorInstance - was used only in BalancingDispatcherSpec (was dirty, fixed dirty) and TestActorRef - fixing the latter involved opening up ActorCell slightly for allowing new AutoReceivedMessage types (that trait is now not sealed anymore) - TestActorRef then checks underlying.actor and retrieves it using a message-based request if it is still null --- .../dispatch/BalancingDispatcherSpec.scala | 6 +-- .../src/main/scala/akka/actor/Actor.scala | 2 +- .../src/main/scala/akka/actor/ActorCell.scala | 32 +++++++------- .../src/main/scala/akka/actor/ActorRef.scala | 26 +++++------ .../scala/akka/testkit/TestActorRef.scala | 44 ++++++++++++++++--- 5 files changed, 69 insertions(+), 41 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala index 5ad7794f77..6ebc81409e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/BalancingDispatcherSpec.scala @@ -71,9 +71,9 @@ class BalancingDispatcherSpec extends AkkaSpec { finishedCounter.await(5, TimeUnit.SECONDS) fast.underlying.mailbox.asInstanceOf[Mailbox].hasMessages must be(false) slow.underlying.mailbox.asInstanceOf[Mailbox].hasMessages must be(false) - fast.underlyingActorInstance.asInstanceOf[DelayableActor].invocationCount must be > sentToFast - fast.underlyingActorInstance.asInstanceOf[DelayableActor].invocationCount must be > - (slow.underlyingActorInstance.asInstanceOf[DelayableActor].invocationCount) + fast.underlying.actor.asInstanceOf[DelayableActor].invocationCount must be > sentToFast + fast.underlying.actor.asInstanceOf[DelayableActor].invocationCount must be > + (slow.underlying.actor.asInstanceOf[DelayableActor].invocationCount) slow.stop() fast.stop() } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index ff41b5c2be..b7a08e9946 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -25,7 +25,7 @@ import java.util.regex.Pattern /** * Marker trait to show which Messages are automatically handled by Akka */ -sealed trait AutoReceivedMessage extends Serializable +trait AutoReceivedMessage extends Serializable trait PossiblyHarmful diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 4e0cefa194..fb1a7a5636 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -171,7 +171,7 @@ private[akka] object ActorCell { //ACTORCELL IS 64bytes and should stay that way unless very good reason not to (machine sympathy, cache line fit) //vars don't need volatile since it's protected with the mailbox status //Make sure that they are not read/written outside of a message processing (systemInvoke/invoke) -private[akka] final class ActorCell( +private[akka] class ActorCell( val system: ActorSystemImpl, val self: InternalActorRef, val props: Props, @@ -181,7 +181,7 @@ private[akka] final class ActorCell( import ActorCell._ - def systemImpl = system + final def systemImpl = system protected final def guardian = self @@ -189,9 +189,9 @@ private[akka] final class ActorCell( final def provider = system.provider - override def receiveTimeout: Option[Duration] = if (receiveTimeoutData._1 > 0) Some(Duration(receiveTimeoutData._1, MILLISECONDS)) else None + override final def receiveTimeout: Option[Duration] = if (receiveTimeoutData._1 > 0) Some(Duration(receiveTimeoutData._1, MILLISECONDS)) else None - override def receiveTimeout_=(timeout: Option[Duration]): Unit = { + override final def receiveTimeout_=(timeout: Option[Duration]): Unit = { val timeoutMs = timeout match { case None ⇒ -1L case Some(duration) ⇒ @@ -207,20 +207,20 @@ private[akka] final class ActorCell( /** * In milliseconds */ - var receiveTimeoutData: (Long, Cancellable) = + final var receiveTimeoutData: (Long, Cancellable) = if (_receiveTimeout.isDefined) (_receiveTimeout.get.toMillis, emptyCancellable) else emptyReceiveTimeoutData /** * UntypedActorContext impl */ - def getReceiveTimeout: Option[Duration] = receiveTimeout + final def getReceiveTimeout: Option[Duration] = receiveTimeout /** * UntypedActorContext impl */ - def setReceiveTimeout(timeout: Duration): Unit = receiveTimeout = Some(timeout) + final def setReceiveTimeout(timeout: Duration): Unit = receiveTimeout = Some(timeout) - var childrenRefs: TreeMap[String, ChildRestartStats] = emptyChildrenRefs + final var childrenRefs: TreeMap[String, ChildRestartStats] = emptyChildrenRefs private def _actorOf(props: Props, name: String): ActorRef = { val actor = provider.actorOf(systemImpl, props, guardian, name, false) @@ -238,19 +238,19 @@ private[akka] final class ActorCell( _actorOf(props, name) } - var currentMessage: Envelope = null + final var currentMessage: Envelope = null - var actor: Actor = _ + final var actor: Actor = _ - var stopping = false + final var stopping = false @volatile //This must be volatile since it isn't protected by the mailbox status var mailbox: Mailbox = _ - var nextNameSequence: Long = 0 + final var nextNameSequence: Long = 0 //Not thread safe, so should only be used inside the actor that inhabits this ActorCell - protected def randomName(): String = { + final protected def randomName(): String = { val n = nextNameSequence + 1 nextNameSequence = n Helpers.base64(n) @@ -262,7 +262,7 @@ private[akka] final class ActorCell( /** * UntypedActorContext impl */ - def getDispatcher(): MessageDispatcher = dispatcher + final def getDispatcher(): MessageDispatcher = dispatcher final def isTerminated: Boolean = mailbox.isClosed @@ -282,7 +282,7 @@ private[akka] final class ActorCell( final def resume(): Unit = dispatcher.systemDispatch(this, Resume()) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - private[akka] def stop(): Unit = dispatcher.systemDispatch(this, Terminate()) + final def stop(): Unit = dispatcher.systemDispatch(this, Terminate()) override final def watch(subject: ActorRef): ActorRef = { // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ @@ -301,7 +301,7 @@ private[akka] final class ActorCell( /** * Impl UntypedActorContext */ - def getChildren(): java.lang.Iterable[ActorRef] = { + final def getChildren(): java.lang.Iterable[ActorRef] = { import scala.collection.JavaConverters.asJavaIterableConverter asJavaIterableConverter(children).asJava } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 0c72fc8901..33e501a74b 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -190,7 +190,7 @@ private[akka] case object Nobody extends MinimalActorRef { * @author Jonas Bonér */ private[akka] class LocalActorRef private[akka] ( - system: ActorSystemImpl, + _system: ActorSystemImpl, _props: Props, _supervisor: InternalActorRef, val path: ActorPath, @@ -209,9 +209,18 @@ private[akka] class LocalActorRef private[akka] ( * us to use purely factory methods for creating LocalActorRefs. */ @volatile - private var actorCell = new ActorCell(system, this, _props, _supervisor, _receiveTimeout, _hotswap) + private var actorCell = newActorCell(_system, this, _props, _supervisor, _receiveTimeout, _hotswap) actorCell.start() + protected def newActorCell( + system: ActorSystemImpl, + ref: InternalActorRef, + props: Props, + supervisor: InternalActorRef, + receiveTimeout: Option[Duration], + hotswap: Stack[PartialFunction[Any, Unit]]): ActorCell = + new ActorCell(system, ref, props, supervisor, receiveTimeout, hotswap) + protected def actorContext: ActorContext = actorCell /** @@ -228,13 +237,11 @@ private[akka] class LocalActorRef private[akka] ( * message sends done from the same thread after calling this method will not * be processed until resumed. */ - //FIXME TODO REMOVE THIS, NO REPLACEMENT, ticket #1415 def suspend(): Unit = actorCell.suspend() /** * Resumes a suspended actor. */ - //FIXME TODO REMOVE THIS, NO REPLACEMENT, ticket #1415 def resume(): Unit = actorCell.resume() /** @@ -284,17 +291,6 @@ private[akka] class LocalActorRef private[akka] ( protected[akka] def underlying: ActorCell = actorCell - // FIXME TODO: remove this method. It is used in testkit. - // @deprecated("This method does a spin-lock to block for the actor, which might never be there, do not use this", "2.0") - protected[akka] def underlyingActorInstance: Actor = { - var instance = actorCell.actor - while ((instance eq null) && !actorCell.isTerminated) { - try { Thread.sleep(1) } catch { case i: InterruptedException ⇒ } - instance = actorCell.actor - } - instance - } - def sendSystemMessage(message: SystemMessage) { underlying.dispatcher.systemDispatch(underlying, message) } def !(message: Any)(implicit sender: ActorRef = null): Unit = actorCell.tell(message, sender) diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index ecbfcb315c..c8e956211e 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -5,13 +5,14 @@ package akka.testkit import akka.actor._ -import akka.util.ReflectiveAccess +import akka.util.{ ReflectiveAccess, Duration } import com.eaio.uuid.UUID import akka.actor.Props._ import akka.actor.ActorSystem import java.util.concurrent.atomic.AtomicLong import akka.event.EventStream -import akka.dispatch.{ DefaultDispatcherPrerequisites, DispatcherPrerequisites, Mailbox } +import akka.dispatch.{ DefaultDispatcherPrerequisites, DispatcherPrerequisites, Mailbox, Envelope } +import scala.collection.immutable.Stack /** * This special ActorRef is exclusively for use during unit testing in a single-threaded environment. Therefore, it @@ -27,20 +28,51 @@ class TestActorRef[T <: Actor]( _props: Props, _supervisor: InternalActorRef, name: String) - extends LocalActorRef(_system, _props.withDispatcher(new CallingThreadDispatcher(_prerequisites)), _supervisor, _supervisor.path / name, false) { + extends LocalActorRef( + _system, + _props.withDispatcher(new CallingThreadDispatcher(_prerequisites)), + _supervisor, + _supervisor.path / name, + false) { + + private case object InternalGetActor extends AutoReceivedMessage + + override def newActorCell( + system: ActorSystemImpl, + ref: InternalActorRef, + props: Props, + supervisor: InternalActorRef, + receiveTimeout: Option[Duration], + hotswap: Stack[PartialFunction[Any, Unit]]): ActorCell = + new ActorCell(system, ref, props, supervisor, receiveTimeout, hotswap) { + override def autoReceiveMessage(msg: Envelope) { + msg.message match { + case InternalGetActor ⇒ sender ! actor + case _ ⇒ super.autoReceiveMessage(msg) + } + } + } + /** * Directly inject messages into actor receive behavior. Any exceptions * thrown will be available to you, while still being able to use - * become/unbecome and their message counterparts. + * become/unbecome. */ - def apply(o: Any) { underlyingActorInstance.apply(o) } + def apply(o: Any) { underlyingActor.apply(o) } /** * Retrieve reference to the underlying actor, where the static type matches the factory used inside the * constructor. Beware that this reference is discarded by the ActorRef upon restarting the actor (should this * reference be linked to a supervisor). The old Actor may of course still be used in post-mortem assertions. */ - def underlyingActor: T = underlyingActorInstance.asInstanceOf[T] + def underlyingActor: T = { + // volatile mailbox read to bring in actor field + if (isTerminated) throw new IllegalActorStateException("underlying actor is terminated") + underlying.actor.asInstanceOf[T] match { + case null ⇒ ?(InternalGetActor)(underlying.system.settings.ActorTimeout).get.asInstanceOf[T] + case ref ⇒ ref + } + } /** * Registers this actor to be a death monitor of the provided ActorRef