remove LocalActorRef.underlyingActorInstance
- was used only in BalancingDispatcherSpec (was dirty, fixed dirty) and TestActorRef - fixing the latter involved opening up ActorCell slightly for allowing new AutoReceivedMessage types (that trait is now not sealed anymore) - TestActorRef then checks underlying.actor and retrieves it using a message-based request if it is still null
This commit is contained in:
parent
57d88594b6
commit
d4a764cfd9
5 changed files with 69 additions and 41 deletions
|
|
@ -71,9 +71,9 @@ class BalancingDispatcherSpec extends AkkaSpec {
|
|||
finishedCounter.await(5, TimeUnit.SECONDS)
|
||||
fast.underlying.mailbox.asInstanceOf[Mailbox].hasMessages must be(false)
|
||||
slow.underlying.mailbox.asInstanceOf[Mailbox].hasMessages must be(false)
|
||||
fast.underlyingActorInstance.asInstanceOf[DelayableActor].invocationCount must be > sentToFast
|
||||
fast.underlyingActorInstance.asInstanceOf[DelayableActor].invocationCount must be >
|
||||
(slow.underlyingActorInstance.asInstanceOf[DelayableActor].invocationCount)
|
||||
fast.underlying.actor.asInstanceOf[DelayableActor].invocationCount must be > sentToFast
|
||||
fast.underlying.actor.asInstanceOf[DelayableActor].invocationCount must be >
|
||||
(slow.underlying.actor.asInstanceOf[DelayableActor].invocationCount)
|
||||
slow.stop()
|
||||
fast.stop()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ import java.util.regex.Pattern
|
|||
/**
|
||||
* Marker trait to show which Messages are automatically handled by Akka
|
||||
*/
|
||||
sealed trait AutoReceivedMessage extends Serializable
|
||||
trait AutoReceivedMessage extends Serializable
|
||||
|
||||
trait PossiblyHarmful
|
||||
|
||||
|
|
|
|||
|
|
@ -171,7 +171,7 @@ private[akka] object ActorCell {
|
|||
//ACTORCELL IS 64bytes and should stay that way unless very good reason not to (machine sympathy, cache line fit)
|
||||
//vars don't need volatile since it's protected with the mailbox status
|
||||
//Make sure that they are not read/written outside of a message processing (systemInvoke/invoke)
|
||||
private[akka] final class ActorCell(
|
||||
private[akka] class ActorCell(
|
||||
val system: ActorSystemImpl,
|
||||
val self: InternalActorRef,
|
||||
val props: Props,
|
||||
|
|
@ -181,7 +181,7 @@ private[akka] final class ActorCell(
|
|||
|
||||
import ActorCell._
|
||||
|
||||
def systemImpl = system
|
||||
final def systemImpl = system
|
||||
|
||||
protected final def guardian = self
|
||||
|
||||
|
|
@ -189,9 +189,9 @@ private[akka] final class ActorCell(
|
|||
|
||||
final def provider = system.provider
|
||||
|
||||
override def receiveTimeout: Option[Duration] = if (receiveTimeoutData._1 > 0) Some(Duration(receiveTimeoutData._1, MILLISECONDS)) else None
|
||||
override final def receiveTimeout: Option[Duration] = if (receiveTimeoutData._1 > 0) Some(Duration(receiveTimeoutData._1, MILLISECONDS)) else None
|
||||
|
||||
override def receiveTimeout_=(timeout: Option[Duration]): Unit = {
|
||||
override final def receiveTimeout_=(timeout: Option[Duration]): Unit = {
|
||||
val timeoutMs = timeout match {
|
||||
case None ⇒ -1L
|
||||
case Some(duration) ⇒
|
||||
|
|
@ -207,20 +207,20 @@ private[akka] final class ActorCell(
|
|||
/**
|
||||
* In milliseconds
|
||||
*/
|
||||
var receiveTimeoutData: (Long, Cancellable) =
|
||||
final var receiveTimeoutData: (Long, Cancellable) =
|
||||
if (_receiveTimeout.isDefined) (_receiveTimeout.get.toMillis, emptyCancellable) else emptyReceiveTimeoutData
|
||||
|
||||
/**
|
||||
* UntypedActorContext impl
|
||||
*/
|
||||
def getReceiveTimeout: Option[Duration] = receiveTimeout
|
||||
final def getReceiveTimeout: Option[Duration] = receiveTimeout
|
||||
|
||||
/**
|
||||
* UntypedActorContext impl
|
||||
*/
|
||||
def setReceiveTimeout(timeout: Duration): Unit = receiveTimeout = Some(timeout)
|
||||
final def setReceiveTimeout(timeout: Duration): Unit = receiveTimeout = Some(timeout)
|
||||
|
||||
var childrenRefs: TreeMap[String, ChildRestartStats] = emptyChildrenRefs
|
||||
final var childrenRefs: TreeMap[String, ChildRestartStats] = emptyChildrenRefs
|
||||
|
||||
private def _actorOf(props: Props, name: String): ActorRef = {
|
||||
val actor = provider.actorOf(systemImpl, props, guardian, name, false)
|
||||
|
|
@ -238,19 +238,19 @@ private[akka] final class ActorCell(
|
|||
_actorOf(props, name)
|
||||
}
|
||||
|
||||
var currentMessage: Envelope = null
|
||||
final var currentMessage: Envelope = null
|
||||
|
||||
var actor: Actor = _
|
||||
final var actor: Actor = _
|
||||
|
||||
var stopping = false
|
||||
final var stopping = false
|
||||
|
||||
@volatile //This must be volatile since it isn't protected by the mailbox status
|
||||
var mailbox: Mailbox = _
|
||||
|
||||
var nextNameSequence: Long = 0
|
||||
final var nextNameSequence: Long = 0
|
||||
|
||||
//Not thread safe, so should only be used inside the actor that inhabits this ActorCell
|
||||
protected def randomName(): String = {
|
||||
final protected def randomName(): String = {
|
||||
val n = nextNameSequence + 1
|
||||
nextNameSequence = n
|
||||
Helpers.base64(n)
|
||||
|
|
@ -262,7 +262,7 @@ private[akka] final class ActorCell(
|
|||
/**
|
||||
* UntypedActorContext impl
|
||||
*/
|
||||
def getDispatcher(): MessageDispatcher = dispatcher
|
||||
final def getDispatcher(): MessageDispatcher = dispatcher
|
||||
|
||||
final def isTerminated: Boolean = mailbox.isClosed
|
||||
|
||||
|
|
@ -282,7 +282,7 @@ private[akka] final class ActorCell(
|
|||
final def resume(): Unit = dispatcher.systemDispatch(this, Resume())
|
||||
|
||||
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
|
||||
private[akka] def stop(): Unit = dispatcher.systemDispatch(this, Terminate())
|
||||
final def stop(): Unit = dispatcher.systemDispatch(this, Terminate())
|
||||
|
||||
override final def watch(subject: ActorRef): ActorRef = {
|
||||
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
|
||||
|
|
@ -301,7 +301,7 @@ private[akka] final class ActorCell(
|
|||
/**
|
||||
* Impl UntypedActorContext
|
||||
*/
|
||||
def getChildren(): java.lang.Iterable[ActorRef] = {
|
||||
final def getChildren(): java.lang.Iterable[ActorRef] = {
|
||||
import scala.collection.JavaConverters.asJavaIterableConverter
|
||||
asJavaIterableConverter(children).asJava
|
||||
}
|
||||
|
|
|
|||
|
|
@ -190,7 +190,7 @@ private[akka] case object Nobody extends MinimalActorRef {
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
private[akka] class LocalActorRef private[akka] (
|
||||
system: ActorSystemImpl,
|
||||
_system: ActorSystemImpl,
|
||||
_props: Props,
|
||||
_supervisor: InternalActorRef,
|
||||
val path: ActorPath,
|
||||
|
|
@ -209,9 +209,18 @@ private[akka] class LocalActorRef private[akka] (
|
|||
* us to use purely factory methods for creating LocalActorRefs.
|
||||
*/
|
||||
@volatile
|
||||
private var actorCell = new ActorCell(system, this, _props, _supervisor, _receiveTimeout, _hotswap)
|
||||
private var actorCell = newActorCell(_system, this, _props, _supervisor, _receiveTimeout, _hotswap)
|
||||
actorCell.start()
|
||||
|
||||
protected def newActorCell(
|
||||
system: ActorSystemImpl,
|
||||
ref: InternalActorRef,
|
||||
props: Props,
|
||||
supervisor: InternalActorRef,
|
||||
receiveTimeout: Option[Duration],
|
||||
hotswap: Stack[PartialFunction[Any, Unit]]): ActorCell =
|
||||
new ActorCell(system, ref, props, supervisor, receiveTimeout, hotswap)
|
||||
|
||||
protected def actorContext: ActorContext = actorCell
|
||||
|
||||
/**
|
||||
|
|
@ -228,13 +237,11 @@ private[akka] class LocalActorRef private[akka] (
|
|||
* message sends done from the same thread after calling this method will not
|
||||
* be processed until resumed.
|
||||
*/
|
||||
//FIXME TODO REMOVE THIS, NO REPLACEMENT, ticket #1415
|
||||
def suspend(): Unit = actorCell.suspend()
|
||||
|
||||
/**
|
||||
* Resumes a suspended actor.
|
||||
*/
|
||||
//FIXME TODO REMOVE THIS, NO REPLACEMENT, ticket #1415
|
||||
def resume(): Unit = actorCell.resume()
|
||||
|
||||
/**
|
||||
|
|
@ -284,17 +291,6 @@ private[akka] class LocalActorRef private[akka] (
|
|||
|
||||
protected[akka] def underlying: ActorCell = actorCell
|
||||
|
||||
// FIXME TODO: remove this method. It is used in testkit.
|
||||
// @deprecated("This method does a spin-lock to block for the actor, which might never be there, do not use this", "2.0")
|
||||
protected[akka] def underlyingActorInstance: Actor = {
|
||||
var instance = actorCell.actor
|
||||
while ((instance eq null) && !actorCell.isTerminated) {
|
||||
try { Thread.sleep(1) } catch { case i: InterruptedException ⇒ }
|
||||
instance = actorCell.actor
|
||||
}
|
||||
instance
|
||||
}
|
||||
|
||||
def sendSystemMessage(message: SystemMessage) { underlying.dispatcher.systemDispatch(underlying, message) }
|
||||
|
||||
def !(message: Any)(implicit sender: ActorRef = null): Unit = actorCell.tell(message, sender)
|
||||
|
|
|
|||
|
|
@ -5,13 +5,14 @@
|
|||
package akka.testkit
|
||||
|
||||
import akka.actor._
|
||||
import akka.util.ReflectiveAccess
|
||||
import akka.util.{ ReflectiveAccess, Duration }
|
||||
import com.eaio.uuid.UUID
|
||||
import akka.actor.Props._
|
||||
import akka.actor.ActorSystem
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
import akka.event.EventStream
|
||||
import akka.dispatch.{ DefaultDispatcherPrerequisites, DispatcherPrerequisites, Mailbox }
|
||||
import akka.dispatch.{ DefaultDispatcherPrerequisites, DispatcherPrerequisites, Mailbox, Envelope }
|
||||
import scala.collection.immutable.Stack
|
||||
|
||||
/**
|
||||
* This special ActorRef is exclusively for use during unit testing in a single-threaded environment. Therefore, it
|
||||
|
|
@ -27,20 +28,51 @@ class TestActorRef[T <: Actor](
|
|||
_props: Props,
|
||||
_supervisor: InternalActorRef,
|
||||
name: String)
|
||||
extends LocalActorRef(_system, _props.withDispatcher(new CallingThreadDispatcher(_prerequisites)), _supervisor, _supervisor.path / name, false) {
|
||||
extends LocalActorRef(
|
||||
_system,
|
||||
_props.withDispatcher(new CallingThreadDispatcher(_prerequisites)),
|
||||
_supervisor,
|
||||
_supervisor.path / name,
|
||||
false) {
|
||||
|
||||
private case object InternalGetActor extends AutoReceivedMessage
|
||||
|
||||
override def newActorCell(
|
||||
system: ActorSystemImpl,
|
||||
ref: InternalActorRef,
|
||||
props: Props,
|
||||
supervisor: InternalActorRef,
|
||||
receiveTimeout: Option[Duration],
|
||||
hotswap: Stack[PartialFunction[Any, Unit]]): ActorCell =
|
||||
new ActorCell(system, ref, props, supervisor, receiveTimeout, hotswap) {
|
||||
override def autoReceiveMessage(msg: Envelope) {
|
||||
msg.message match {
|
||||
case InternalGetActor ⇒ sender ! actor
|
||||
case _ ⇒ super.autoReceiveMessage(msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Directly inject messages into actor receive behavior. Any exceptions
|
||||
* thrown will be available to you, while still being able to use
|
||||
* become/unbecome and their message counterparts.
|
||||
* become/unbecome.
|
||||
*/
|
||||
def apply(o: Any) { underlyingActorInstance.apply(o) }
|
||||
def apply(o: Any) { underlyingActor.apply(o) }
|
||||
|
||||
/**
|
||||
* Retrieve reference to the underlying actor, where the static type matches the factory used inside the
|
||||
* constructor. Beware that this reference is discarded by the ActorRef upon restarting the actor (should this
|
||||
* reference be linked to a supervisor). The old Actor may of course still be used in post-mortem assertions.
|
||||
*/
|
||||
def underlyingActor: T = underlyingActorInstance.asInstanceOf[T]
|
||||
def underlyingActor: T = {
|
||||
// volatile mailbox read to bring in actor field
|
||||
if (isTerminated) throw new IllegalActorStateException("underlying actor is terminated")
|
||||
underlying.actor.asInstanceOf[T] match {
|
||||
case null ⇒ ?(InternalGetActor)(underlying.system.settings.ActorTimeout).get.asInstanceOf[T]
|
||||
case ref ⇒ ref
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers this actor to be a death monitor of the provided ActorRef
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue