Initial breakout of ActorInstance. See #1195

This commit is contained in:
Peter Vlugter 2011-09-15 08:12:07 +02:00
parent b94d1cee36
commit b96f3d9260
30 changed files with 838 additions and 762 deletions

View file

@ -129,7 +129,7 @@ class ActorRefSpec extends WordSpec with MustMatchers {
})
}
def refStackMustBeEmpty = Actor.actorRefInCreation.get.headOption must be === None
def refStackMustBeEmpty = ActorInstance.refStack.get.headOption must be === None
refStackMustBeEmpty

View file

@ -62,9 +62,9 @@ class ActorRegistrySpec extends JUnitSuite with BeforeAndAfterAll {
def shouldFindThingsFromLocalActorRegistry {
Actor.registry.local.shutdownAll
val actor = actorOf[TestActor]("test-actor-1")
val found: Option[LocalActorRef] = Actor.registry.local.find({ case a: LocalActorRef if a.actorInstance.get().isInstanceOf[TestActor] a })
val found: Option[LocalActorRef] = Actor.registry.local.find({ case a: LocalActorRef if a.underlyingActorInstance.isInstanceOf[TestActor] a })
assert(found.isDefined)
assert(found.get.actorInstance.get().isInstanceOf[TestActor])
assert(found.get.underlyingActorInstance.isInstanceOf[TestActor])
assert(found.get.address === "test-actor-1")
actor.stop
}
@ -76,8 +76,8 @@ class ActorRegistrySpec extends JUnitSuite with BeforeAndAfterAll {
val actor2 = actorOf[TestActor]("test-actor-2")
val actors = Actor.registry.local.actors
assert(actors.size === 2)
assert(actors.find(_.address == "test-actor-2").get.asInstanceOf[LocalActorRef].actorInstance.get().isInstanceOf[TestActor])
assert(actors.find(_.address == "test-actor-1").get.asInstanceOf[LocalActorRef].actorInstance.get().isInstanceOf[TestActor])
assert(actors.find(_.address == "test-actor-2").get.asInstanceOf[LocalActorRef].underlyingActorInstance.isInstanceOf[TestActor])
assert(actors.find(_.address == "test-actor-1").get.asInstanceOf[LocalActorRef].underlyingActorInstance.isInstanceOf[TestActor])
actor1.stop
actor2.stop
}

View file

@ -120,12 +120,12 @@ class FSMTimingSpec extends WordSpec with MustMatchers with TestKit {
object FSMTimingSpec {
def suspend(actorRef: ActorRef): Unit = actorRef match {
case l: LocalActorRef l.dispatcher.suspend(l)
case l: LocalActorRef l.suspend()
case _
}
def resume(actorRef: ActorRef): Unit = actorRef match {
case l: LocalActorRef l.dispatcher.resume(l)
case l: LocalActorRef l.resume()
case _
}

View file

@ -103,28 +103,28 @@ object ActorModelSpec {
stats.get(actorRef)
}
abstract override def suspend(actorRef: LocalActorRef) {
super.suspend(actorRef)
getStats(actorRef).suspensions.incrementAndGet()
abstract override def suspend(actor: ActorInstance) {
super.suspend(actor)
getStats(actor.ref).suspensions.incrementAndGet()
}
abstract override def resume(actorRef: LocalActorRef) {
super.resume(actorRef)
getStats(actorRef).resumes.incrementAndGet()
abstract override def resume(actor: ActorInstance) {
super.resume(actor)
getStats(actor.ref).resumes.incrementAndGet()
}
protected[akka] abstract override def register(actorRef: LocalActorRef) {
super.register(actorRef)
getStats(actorRef).registers.incrementAndGet()
protected[akka] abstract override def register(actor: ActorInstance) {
super.register(actor)
getStats(actor.ref).registers.incrementAndGet()
}
protected[akka] abstract override def unregister(actorRef: LocalActorRef) {
super.unregister(actorRef)
getStats(actorRef).unregisters.incrementAndGet()
protected[akka] abstract override def unregister(actor: ActorInstance) {
super.unregister(actor)
getStats(actor.ref).unregisters.incrementAndGet()
}
protected[akka] abstract override def dispatch(invocation: MessageInvocation) {
getStats(invocation.receiver).msgsReceived.incrementAndGet()
getStats(invocation.receiver.ref).msgsReceived.incrementAndGet()
super.dispatch(invocation)
}
@ -342,12 +342,12 @@ abstract class ActorModelSpec extends JUnitSuite {
implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor.asInstanceOf[LocalActorRef]
val done = new CountDownLatch(1)
dispatcher.suspend(a)
a.suspend
a ! CountDown(done)
assertNoCountDown(done, 1000, "Should not process messages while suspended")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, suspensions = 1)
dispatcher.resume(a)
a.resume
assertCountDown(done, Testing.testTime(3000), "Should resume processing of messages when resumed")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1,
suspensions = 1, resumes = 1)
@ -379,11 +379,11 @@ abstract class ActorModelSpec extends JUnitSuite {
def dispatcherShouldCompleteAllUncompletedSenderFuturesOnDeregister {
implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor.asInstanceOf[LocalActorRef]
dispatcher.suspend(a)
a.suspend
val f1: Future[String] = a ? Reply("foo") mapTo manifest[String]
val stopped = a ? PoisonPill
val shouldBeCompleted = for (i 1 to 10) yield a ? Reply(i)
dispatcher.resume(a)
a.resume
assert(f1.get === "foo")
stopped.await
for (each shouldBeCompleted)

View file

@ -80,11 +80,11 @@ class BalancingDispatcherSpec extends JUnitSuite with MustMatchers {
}
finishedCounter.await(5, TimeUnit.SECONDS)
fast.mailbox.asInstanceOf[MessageQueue].isEmpty must be(true)
slow.mailbox.asInstanceOf[MessageQueue].isEmpty must be(true)
fast.actorInstance.get().asInstanceOf[DelayableActor].invocationCount must be > sentToFast
fast.actorInstance.get().asInstanceOf[DelayableActor].invocationCount must be >
(slow.actorInstance.get().asInstanceOf[DelayableActor].invocationCount)
fast.underlying.mailbox.asInstanceOf[MessageQueue].isEmpty must be(true)
slow.underlying.mailbox.asInstanceOf[MessageQueue].isEmpty must be(true)
fast.underlyingActorInstance.asInstanceOf[DelayableActor].invocationCount must be > sentToFast
fast.underlyingActorInstance.asInstanceOf[DelayableActor].invocationCount must be >
(slow.underlyingActorInstance.asInstanceOf[DelayableActor].invocationCount)
slow.stop()
fast.stop()
}

View file

@ -84,7 +84,7 @@ abstract class MailboxSpec extends WordSpec with MustMatchers with BeforeAndAfte
new MessageInvocation(
actorOf(new Actor { //Dummy actor
def receive = { case _ }
}).asInstanceOf[LocalActorRef], msg, NullChannel)
}).asInstanceOf[LocalActorRef].underlying, msg, NullChannel)
}
def ensureInitialMailboxState(config: MailboxType, q: MessageQueue) {

View file

@ -35,12 +35,12 @@ class PriorityDispatcherSpec extends WordSpec with MustMatchers {
}
}).withDispatcher(dispatcher)).asInstanceOf[LocalActorRef]
dispatcher.suspend(actor) //Make sure the actor isn't treating any messages, let it buffer the incoming messages
actor.suspend //Make sure the actor isn't treating any messages, let it buffer the incoming messages
val msgs = (1 to 100).toList
for (m msgs) actor ! m
dispatcher.resume(actor) //Signal the actor to start treating it's message backlog
actor.resume //Signal the actor to start treating it's message backlog
actor.?('Result).as[List[Int]].get must be === (msgs.reverse)
}

View file

@ -19,7 +19,6 @@ import akka.event.EventHandler
import akka.experimental
import akka.AkkaException
import scala.collection.immutable.Stack
import scala.reflect.BeanProperty
import com.eaio.uuid.UUID
@ -174,10 +173,6 @@ object Actor {
*/
type Receive = PartialFunction[Any, Unit]
private[actor] val actorRefInCreation = new ThreadLocal[Stack[ScalaActorRef with SelfActorRef]] {
override def initialValue = Stack[ScalaActorRef with SelfActorRef]()
}
private[akka] val TIMEOUT = Duration(config.getInt("akka.actor.timeout", 5), TIME_UNIT).toMillis
private[akka] val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false)
@ -467,7 +462,7 @@ trait Actor {
*/
@transient
val someSelf: Some[ScalaActorRef with SelfActorRef] = {
val refStack = Actor.actorRefInCreation.get
val refStack = ActorInstance.refStack.get
if (refStack.isEmpty) throw new ActorInitializationException(
"\n\tYou can not create an instance of an " + getClass.getName + " explicitly using 'new MyActor'." +
"\n\tYou have to use one of the factory methods in the 'Actor' object to create a new actor." +
@ -481,7 +476,7 @@ trait Actor {
throw new ActorInitializationException("Trying to create an instance of " + getClass.getName + " outside of a wrapping 'actorOf'")
else {
// Push a null marker so any subsequent calls to new Actor doesn't reuse this actor ref
Actor.actorRefInCreation.set(refStack.push(null))
ActorInstance.refStack.set(refStack.push(null))
Some(ref)
}
}

View file

@ -0,0 +1,433 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import akka.event.EventHandler
import akka.config.Supervision._
import akka.dispatch._
import akka.util._
import java.util.{ Collection JCollection }
import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit }
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.collection.immutable.Stack
private[akka] object ActorInstance {
sealed trait Status
object Status {
object Unstarted extends Status
object Running extends Status
object BeingRestarted extends Status
object Shutdown extends Status
}
val refStack = new ThreadLocal[Stack[ScalaActorRef with SelfActorRef]] {
override def initialValue = Stack[ScalaActorRef with SelfActorRef]()
}
}
private[akka] class ActorInstance(props: Props, self: LocalActorRef) {
import ActorInstance._
val guard = new ReentrantGuard // TODO: remove this last synchronization point
@volatile
var status: Status = Status.Unstarted
@volatile
var mailbox: AnyRef = _
@volatile
var futureTimeout: Option[ScheduledFuture[AnyRef]] = None
@volatile
var _supervisor: Option[ActorRef] = None
@volatile
var maxNrOfRetriesCount: Int = 0
@volatile
var restartTimeWindowStartNanos: Long = 0L
@volatile
lazy val _linkedActors = new ConcurrentHashMap[Uuid, ActorRef]
val actor: AtomicReference[Actor] = new AtomicReference[Actor]()
def ref: ActorRef = self
def uuid: Uuid = self.uuid
def actorClass: Class[_] = actor.get.getClass
def dispatcher: MessageDispatcher = props.dispatcher
def isRunning: Boolean = status match {
case Status.BeingRestarted | Status.Running true
case _ false
}
def isShutdown: Boolean = status == Status.Shutdown
def start(): Unit = guard.withGuard {
if (isShutdown) throw new ActorStartException("Can't start an actor that has been stopped")
if (!isRunning) {
if (props.supervisor.isDefined) props.supervisor.get.link(self)
actor.set(newActor)
dispatcher.attach(this)
status = Status.Running
try {
val a = actor.get
if (Actor.debugLifecycle) EventHandler.debug(a, "started")
a.preStart()
Actor.registry.register(self)
checkReceiveTimeout // schedule the initial receive timeout
} catch {
case e
status = Status.Unstarted
throw e
}
}
}
def newActor: Actor = {
val stackBefore = refStack.get
refStack.set(stackBefore.push(self))
try {
if (status == Status.BeingRestarted) {
val a = actor.get()
val fresh = try a.freshInstance catch {
case e
EventHandler.error(e, a, "freshInstance() failed, falling back to initial actor factory")
None
}
fresh match {
case Some(actor) actor
case None props.creator()
}
} else {
props.creator()
}
} finally {
val stackAfter = refStack.get
if (stackAfter.nonEmpty)
refStack.set(if (stackAfter.head eq null) stackAfter.pop.pop else stackAfter.pop) //pop null marker plus self
}
} match {
case null throw new ActorInitializationException("Actor instance passed to actorOf can't be 'null'")
case valid valid
}
def suspend(): Unit = dispatcher.suspend(this)
def resume(): Unit = dispatcher.resume(this)
def stop(): Unit = guard.withGuard {
if (isRunning) {
self.receiveTimeout = None
cancelReceiveTimeout
Actor.registry.unregister(self)
status = Status.Shutdown
dispatcher.detach(this)
try {
val a = actor.get
if (Actor.debugLifecycle) EventHandler.debug(a, "stopping")
a.postStop()
stopSupervisedActors()
} finally {
self.currentMessage = null
setActorSelf(null)
}
}
}
def stopSupervisedActors(): Unit = guard.withGuard {
val i = _linkedActors.values.iterator
while (i.hasNext) {
i.next.stop()
i.remove()
}
}
def link(actorRef: ActorRef): ActorRef = {
guard.withGuard {
val actorRefSupervisor = actorRef.supervisor
val hasSupervisorAlready = actorRefSupervisor.isDefined
if (hasSupervisorAlready && actorRefSupervisor.get.uuid == self.uuid) return actorRef // we already supervise this guy
else if (hasSupervisorAlready) throw new IllegalActorStateException(
"Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails")
else {
_linkedActors.put(actorRef.uuid, actorRef)
actorRef.supervisor = Some(self)
}
}
if (Actor.debugLifecycle) EventHandler.debug(actor.get(), "now supervising " + actorRef)
actorRef
}
def unlink(actorRef: ActorRef): ActorRef = {
guard.withGuard {
if (_linkedActors.remove(actorRef.uuid) eq null)
throw new IllegalActorStateException("Actor [" + actorRef + "] is not a linked actor, can't unlink")
actorRef.supervisor = None
if (Actor.debugLifecycle) EventHandler.debug(actor.get(), "stopped supervising " + actorRef)
}
actorRef
}
def linkedActors: JCollection[ActorRef] = java.util.Collections.unmodifiableCollection(_linkedActors.values)
def supervisor: Option[ActorRef] = _supervisor
def supervisor_=(sup: Option[ActorRef]): Unit = _supervisor = sup
def sender: Option[ActorRef] = {
val msg = self.currentMessage
if (msg eq null) None
else msg.channel match {
case ref: ActorRef Some(ref)
case _ None
}
}
def senderFuture(): Option[Promise[Any]] = {
val msg = self.currentMessage
if (msg eq null) None
else msg.channel match {
case f: ActorPromise Some(f)
case _ None
}
}
def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit =
if (isRunning) dispatcher dispatchMessage new MessageInvocation(this, message, channel)
else throw new ActorInitializationException("Actor has not been started")
def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Timeout,
channel: UntypedChannel): Future[Any] = if (isRunning) {
val future = channel match {
case f: ActorPromise f
case _ new ActorPromise(timeout)(dispatcher)
}
dispatcher dispatchMessage new MessageInvocation(this, message, future)
future
} else throw new ActorInitializationException("Actor has not been started")
def invoke(messageHandle: MessageInvocation): Unit = {
guard.lock.lock()
try {
if (!isShutdown) {
self.currentMessage = messageHandle
try {
try {
cancelReceiveTimeout() // FIXME: leave this here?
actor.get().apply(messageHandle.message)
self.currentMessage = null // reset current message after successful invocation
} catch {
case e
EventHandler.error(e, self, e.getMessage)
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
self.channel.sendException(e)
if (supervisor.isDefined) supervisor.get ! Death(self, e, true) else dispatcher.resume(this)
if (e.isInstanceOf[InterruptedException]) throw e //Re-throw InterruptedExceptions as expected
} finally {
checkReceiveTimeout // Reschedule receive timeout
}
} catch {
case e
EventHandler.error(e, actor.get(), e.getMessage)
throw e
}
} else {
// throwing away message if actor is shut down, no use throwing an exception in receiving actor's thread, isShutdown is enforced on caller side
}
} finally {
guard.lock.unlock()
}
}
def handleDeath(death: Death) {
props.faultHandler match {
case AllForOnePermanentStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(death.cause.getClass))
restartLinkedActors(death.cause, maxRetries, within)
case AllForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(death.cause.getClass))
restartLinkedActors(death.cause, None, None)
case OneForOnePermanentStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(death.cause.getClass))
death.deceased.restart(death.cause, maxRetries, within)
case OneForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(death.cause.getClass))
unlink(death.deceased)
death.deceased.stop()
self ! MaximumNumberOfRestartsWithinTimeRangeReached(death.deceased, None, None, death.cause)
case _
if (_supervisor.isDefined) throw death.cause else death.deceased.stop() //Escalate problem if not handled here
}
}
def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
def performRestart() {
val failedActor = actor.get
if (Actor.debugLifecycle) EventHandler.debug(failedActor, "restarting")
val message = if (self.currentMessage ne null) Some(self.currentMessage.message) else None
failedActor.preRestart(reason, message)
val freshActor = newActor
setActorSelf(null) // only null out the references if we could instantiate the new actor
actor.set(freshActor) // assign it here so if preStart fails, we can null out the sef-refs next call
freshActor.postRestart(reason)
if (Actor.debugLifecycle) EventHandler.debug(freshActor, "restarted")
}
@tailrec
def attemptRestart() {
val success = if (requestRestartPermission(maxNrOfRetries, withinTimeRange)) {
guard.withGuard[Boolean] {
status = Status.BeingRestarted
val success =
try {
performRestart()
true
} catch {
case e
EventHandler.error(e, self, "Exception in restart of Actor [%s]".format(toString))
false // an error or exception here should trigger a retry
} finally {
self.currentMessage = null
}
if (success) {
status = Status.Running
dispatcher.resume(this)
restartLinkedActors(reason, maxNrOfRetries, withinTimeRange)
}
success
}
} else {
// tooManyRestarts
if (supervisor.isDefined)
supervisor.get ! MaximumNumberOfRestartsWithinTimeRangeReached(self, maxNrOfRetries, withinTimeRange, reason)
stop()
true // done
}
if (success) () // alles gut
else attemptRestart()
}
attemptRestart() // recur
}
def requestRestartPermission(maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Boolean = {
val denied = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) {
// immortal
false
} else if (withinTimeRange.isEmpty) {
// restrict number of restarts
val retries = maxNrOfRetriesCount + 1
maxNrOfRetriesCount = retries //Increment number of retries
retries > maxNrOfRetries.get
} else {
// cannot restart more than N within M timerange
val retries = maxNrOfRetriesCount + 1
val windowStart = restartTimeWindowStartNanos
val now = System.nanoTime
// we are within the time window if it isn't the first restart, or if the window hasn't closed
val insideWindow = if (windowStart == 0) true else (now - windowStart) <= TimeUnit.MILLISECONDS.toNanos(withinTimeRange.get)
if (windowStart == 0 || !insideWindow) //(Re-)set the start of the window
restartTimeWindowStartNanos = now
// reset number of restarts if window has expired, otherwise, increment it
maxNrOfRetriesCount = if (windowStart != 0 && !insideWindow) 1 else retries // increment number of retries
val restartCountLimit = if (maxNrOfRetries.isDefined) maxNrOfRetries.get else 1
// the actor is dead if it dies X times within the window of restart
insideWindow && retries > restartCountLimit
}
denied == false // if we weren't denied, we have a go
}
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = {
props.faultHandler.lifeCycle match {
case Temporary
val i = _linkedActors.values.iterator
while (i.hasNext) {
val actorRef = i.next()
i.remove()
actorRef.stop()
// when this comes down through the handleDeath path, we get here when the temp actor is restarted
if (supervisor.isDefined) {
supervisor.get ! MaximumNumberOfRestartsWithinTimeRangeReached(actorRef, Some(0), None, reason)
//FIXME if last temporary actor is gone, then unlink me from supervisor <-- should this exist?
if (!i.hasNext)
supervisor.get ! UnlinkAndStop(self)
}
}
case Permanent
val i = _linkedActors.values.iterator
while (i.hasNext) i.next().restart(reason, maxNrOfRetries, withinTimeRange)
}
}
def checkReceiveTimeout() {
cancelReceiveTimeout()
val recvtimeout = self.receiveTimeout
if (recvtimeout.isDefined && dispatcher.mailboxIsEmpty(this)) {
//Only reschedule if desired and there are currently no more messages to be processed
futureTimeout = Some(Scheduler.scheduleOnce(self, ReceiveTimeout, recvtimeout.get, TimeUnit.MILLISECONDS))
}
}
def cancelReceiveTimeout() {
if (futureTimeout.isDefined) {
futureTimeout.get.cancel(true)
futureTimeout = None
}
}
def setActorSelf(value: ActorRef): Unit = {
@tailrec
def lookupAndSetSelfFields(clazz: Class[_], actor: Actor, value: ActorRef): Boolean = {
val success = try {
val selfField = clazz.getDeclaredField("self")
val someSelfField = clazz.getDeclaredField("someSelf")
selfField.setAccessible(true)
someSelfField.setAccessible(true)
selfField.set(actor, value)
someSelfField.set(actor, if (value ne null) Some(value) else null)
true
} catch {
case e: NoSuchFieldException false
}
if (success) true
else {
val parent = clazz.getSuperclass
if (parent eq null)
throw new IllegalActorStateException(toString + " is not an Actor since it have not mixed in the 'Actor' trait")
lookupAndSetSelfFields(parent, actor, value)
}
}
lookupAndSetSelfFields(actor.get.getClass, actor.get, value)
}
}

View file

@ -4,169 +4,16 @@
package akka.actor
import akka.event.EventHandler
import akka.dispatch._
import akka.config._
import akka.config.Supervision._
import akka.util._
import akka.serialization.{ Serializer, Serialization }
import ReflectiveAccess._
import ClusterModule._
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ ScheduledFuture, ConcurrentHashMap, TimeUnit }
import java.util.{ Collection JCollection }
import scala.collection.immutable.Stack
import scala.annotation.tailrec
import java.lang.{ UnsupportedOperationException, IllegalStateException }
import akka.japi.Creator
private[akka] object ActorRefInternals {
/**
* LifeCycles for ActorRefs.
*/
private[akka] sealed trait StatusType
object UNSTARTED extends StatusType
object RUNNING extends StatusType
object BEING_RESTARTED extends StatusType
object SHUTDOWN extends StatusType
}
/**
* ActorRef configuration object, this is threadsafe and fully sharable
*
* Props() returns default configuration
* FIXME document me
*/
object Props {
final val defaultCreator: () Actor = () throw new UnsupportedOperationException("No actor creator specified!")
final val defaultDeployId: String = ""
final val defaultDispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher
final val defaultTimeout: Timeout = Timeout(Duration(Actor.TIMEOUT, "millis"))
final val defaultFaultHandler: FaultHandlingStrategy = AllForOnePermanentStrategy(classOf[Exception] :: Nil, None, None)
final val defaultSupervisor: Option[ActorRef] = None
/**
* The default Props instance, uses the settings from the Props object starting with default*
*/
final val default = new Props()
/**
* Returns a cached default implementation of Props
*/
def apply(): Props = default
/**
* Returns a Props that has default values except for "creator" which will be a function that creates an instance
* of the supplied type using the default constructor
*/
def apply[T <: Actor: ClassManifest]: Props =
default.withCreator(implicitly[ClassManifest[T]].erasure.asInstanceOf[Class[_ <: Actor]].newInstance)
/**
* Returns a Props that has default values except for "creator" which will be a function that creates an instance
* of the supplied class using the default constructor
*/
def apply(actorClass: Class[_ <: Actor]): Props =
default.withCreator(actorClass.newInstance)
/**
* Returns a Props that has default values except for "creator" which will be a function that creates an instance
* using the supplied thunk
*/
def apply(creator: Actor): Props = default.withCreator(creator)
/**
* Returns a Props that has default values except for "creator" which will be a function that creates an instance
* using the supplied thunk
*/
def apply(creator: Creator[_ <: Actor]): Props = default.withCreator(creator.create)
def apply(behavior: (ScalaActorRef with SelfActorRef) Actor.Receive): Props = apply(new Actor { def receive = behavior(self) })
}
/**
* ActorRef configuration object, this is thread safe and fully sharable
*/
case class Props(creator: () Actor = Props.defaultCreator,
deployId: String = Props.defaultDeployId,
@transient dispatcher: MessageDispatcher = Props.defaultDispatcher,
timeout: Timeout = Props.defaultTimeout,
faultHandler: FaultHandlingStrategy = Props.defaultFaultHandler,
supervisor: Option[ActorRef] = Props.defaultSupervisor) {
/**
* No-args constructor that sets all the default values
* Java API
*/
def this() = this(
creator = Props.defaultCreator,
deployId = Props.defaultDeployId,
dispatcher = Props.defaultDispatcher,
timeout = Props.defaultTimeout,
faultHandler = Props.defaultFaultHandler,
supervisor = Props.defaultSupervisor)
/**
* Returns a new Props with the specified creator set
* Scala API
*/
def withCreator(c: Actor) = copy(creator = () c)
/**
* Returns a new Props with the specified creator set
* Java API
*/
def withCreator(c: Creator[Actor]) = copy(creator = () c.create)
/**
* Returns a new Props with the specified deployId set
* Java and Scala API
*/
def withDeployId(id: String) = copy(deployId = if (id eq null) "" else id)
/**
* Returns a new Props with the specified dispatcher set
* Java API
*/
def withDispatcher(d: MessageDispatcher) = copy(dispatcher = d)
/**
* Returns a new Props with the specified timeout set
* Java API
*/
def withTimeout(t: Timeout) = copy(timeout = t)
/**
* Returns a new Props with the specified faulthandler set
* Java API
*/
def withFaultHandler(f: FaultHandlingStrategy) = copy(faultHandler = f)
/**
* Returns a new Props with the specified supervisor set, if null, it's equivalent to withSupervisor(Option.none())
* Java API
*/
def withSupervisor(s: ActorRef) = copy(supervisor = Option(s))
/**
* Returns a new Props with the specified supervisor set
* Java API
*/
def withSupervisor(s: akka.japi.Option[ActorRef]) = copy(supervisor = s.asScala)
/**
* Returns a new Props with the specified supervisor set
* Scala API
*/
def withSupervisor(s: scala.Option[ActorRef]) = copy(supervisor = s)
}
/**
* ActorRef is an immutable and serializable handle to an Actor.
@ -204,17 +51,8 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha
private[akka] val uuid = newUuid
@volatile
protected[this] var _status: ActorRefInternals.StatusType = ActorRefInternals.UNSTARTED
def address: String
/**
* This is a reference to the message currently being processed by the actor
*/
@volatile
protected[akka] var currentMessage: MessageInvocation = null
/**
* Comparison only takes address into account.
*/
@ -266,13 +104,21 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha
else forward(message)(sender)
}
/**
* Suspends the actor. It will not process messages while suspended.
*/
def suspend(): Unit
/**
* Resumes a suspended actor.
*/
def resume(): Unit
/**
* Shuts down the actor its dispatcher and message queue.
* Alias for 'stop'.
*/
def exit() {
stop()
}
def exit(): Unit = stop()
/**
* Shuts down the actor its dispatcher and message queue.
@ -282,15 +128,12 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha
/**
* Is the actor running?
*/
def isRunning: Boolean = _status match { //TODO Remove this method
case ActorRefInternals.BEING_RESTARTED | ActorRefInternals.RUNNING true
case _ false
}
def isRunning: Boolean // TODO remove this method
/**
* Is the actor shut down?
*/
def isShutdown: Boolean = _status == ActorRefInternals.SHUTDOWN
def isShutdown: Boolean
/**
* Links an other actor to this actor. Links are unidirectional and means that a the linking actor will
@ -347,6 +190,12 @@ abstract class SelfActorRef extends ActorRef with ForwardableChannel { self: Loc
@volatile
protected[akka] var hotswap = Stack[PartialFunction[Any, Unit]]()
/**
* This is a reference to the message currently being processed by the actor
*/
@volatile
protected[akka] var currentMessage: MessageInvocation = null
/**
* User overridable callback/setting.
* <p/>
@ -480,65 +329,6 @@ class LocalActorRef private[akka] (
override private[akka] val uuid: Uuid = newUuid)
extends SelfActorRef with ScalaActorRef {
protected[akka] val guard = new ReentrantGuard //TODO FIXME remove the last synchronization point
@volatile
protected[akka] var _futureTimeout: Option[ScheduledFuture[AnyRef]] = None
@volatile
private[akka] lazy val _linkedActors = new ConcurrentHashMap[Uuid, ActorRef]
@volatile
private[akka] var _supervisor: Option[ActorRef] = None
@volatile
private var maxNrOfRetriesCount: Int = 0
@volatile
private var restartTimeWindowStartNanos: Long = 0L
@volatile
protected[akka] var mailbox: AnyRef = _
protected[akka] val actorInstance = guard.withGuard {
new AtomicReference[Actor]({
if (props.supervisor.isDefined) props.supervisor.get.link(this)
newActor
})
} //TODO Why is the guard needed here?
protected[akka] override def timeout: Long = props.timeout.duration.toMillis //TODO remove this if possible
private def serializer: Serializer = //TODO Is this used or needed?
try { Serialization.serializerFor(this.getClass) } catch {
case e: Exception throw new akka.config.ConfigurationException(
"Could not create Serializer object for [" + this.getClass.getName + "]")
}
private lazy val hasReplicationStorage: Boolean = if (!systemService) {
import DeploymentConfig._
isReplicated(replicationSchemeFor(Deployer.deploymentFor(address)).getOrElse(Transient))
} else false
private lazy val replicationStorage: Option[TransactionLog] = if (!systemService) {
import DeploymentConfig._
val replicationScheme = replicationSchemeFor(Deployer.deploymentFor(address)).getOrElse(Transient)
if (isReplicated(replicationScheme)) {
if (isReplicatedWithTransactionLog(replicationScheme)) {
EventHandler.debug(this, "Creating a transaction log for Actor [%s] with replication strategy [%s]".format(address, replicationScheme))
Some(transactionLog.newLogFor(uuid.toString, isWriteBehindReplication(replicationScheme), replicationScheme)) //TODO FIXME @jboner shouldn't this be address?
} else if (isReplicatedWithDataGrid(replicationScheme)) {
throw new ConfigurationException("Replication storage type \"data-grid\" is not yet supported")
} else {
throw new ConfigurationException("Unknown replication storage type [" + replicationScheme + "]")
}
} else None
} else None
// If it was started inside "newActor", initialize it
if (isRunning) initializeActorInstance
// used only for deserialization
private[akka] def this(
__uuid: Uuid,
@ -551,10 +341,22 @@ class LocalActorRef private[akka] (
hotswap = __hotswap
receiveTimeout = __receiveTimeout
setActorSelfFields(actorInstance.get(), this) //TODO Why is this needed?
actorInstance.setActorSelf(this) // TODO: why is this needed?
}
// ========= PUBLIC FUNCTIONS =========
private[this] val actorInstance = new ActorInstance(props, this)
actorInstance.start()
/**
* Is the actor running?
*/
def isRunning: Boolean = actorInstance.isRunning
/**
* Is the actor shut down?
*/
def isShutdown: Boolean = actorInstance.isShutdown
/**
* Returns the dispatcher (MessageDispatcher) that is used for this Actor
@ -562,61 +364,19 @@ class LocalActorRef private[akka] (
def dispatcher: MessageDispatcher = props.dispatcher
/**
* Starts up the actor and its message queue.
* Suspends the actor. It will not process messages while suspended.
*/
protected[akka] def startInternal(): this.type = guard.withGuard[this.type] {
if (isShutdown) throw new ActorStartException("Can't restart an actor that has been shut down with 'stop' or 'exit'")
if (!isRunning) {
dispatcher.attach(this)
_status = ActorRefInternals.RUNNING
try {
// If we are not currently creating this ActorRef instance
if ((actorInstance ne null) && (actorInstance.get ne null))
initializeActorInstance
checkReceiveTimeout //Schedule the initial Receive timeout
} catch {
case e
_status = ActorRefInternals.UNSTARTED
throw e
}
}
this
}
def suspend(): Unit = actorInstance.suspend()
/**
* Shuts down the actor its dispatcher and message queue.
* Resumes a suspended actor.
*/
def stop() {
guard.withGuard {
if (isRunning) {
receiveTimeout = None
cancelReceiveTimeout
Actor.registry.unregister(this)
def resume(): Unit = actorInstance.resume()
// This lines can trigger cluster start which makes cluster ZK client hang trying to reconnect indefinitely
//if (ClusterModule.isEnabled) Actor.remote.unregister(this)
_status = ActorRefInternals.SHUTDOWN
dispatcher.detach(this)
try {
val a = actorInstance.get()
if (Actor.debugLifecycle) EventHandler.debug(a, "stopping")
a.postStop()
} finally {
currentMessage = null
try { //When a supervisor is stopped, it's linked actors should also be stopped
val i = _linkedActors.values.iterator
while (i.hasNext) {
i.next.stop()
i.remove
}
} finally { setActorSelfFields(actorInstance.get, null) }
}
}
if (hasReplicationStorage) replicationStorage.get.delete() //TODO shouldn't this be inside the if (isRunning?)
}
}
/**
* Shuts down the actor and its message queue
*/
def stop(): Unit = actorInstance.stop()
/**
* Links an other actor to this actor. Links are unidirectional and means that a the linking actor will
@ -629,21 +389,7 @@ class LocalActorRef private[akka] (
* To be invoked from within the actor itself.
* Returns the ref that was passed into it
*/
def link(actorRef: ActorRef): ActorRef = {
guard.withGuard {
val actorRefSupervisor = actorRef.supervisor
val hasSupervisorAlready = actorRefSupervisor.isDefined
if (hasSupervisorAlready && actorRefSupervisor.get.uuid == uuid) return actorRef // we already supervise this guy
else if (hasSupervisorAlready) throw new IllegalActorStateException(
"Actor can only have one supervisor [" + actorRef + "], e.g. link(actor) fails")
else {
_linkedActors.put(actorRef.uuid, actorRef)
actorRef.supervisor = Some(this)
}
}
if (Actor.debugLifecycle) EventHandler.debug(actorInstance.get(), "now supervising " + actorRef)
actorRef
}
def link(actorRef: ActorRef): ActorRef = actorInstance.link(actorRef)
/**
* Unlink the actor.
@ -651,50 +397,69 @@ class LocalActorRef private[akka] (
* To be invoked from within the actor itself.
* Returns the ref that was passed into it
*/
def unlink(actorRef: ActorRef): ActorRef = {
guard.withGuard {
if (_linkedActors.remove(actorRef.uuid) eq null)
throw new IllegalActorStateException("Actor [" + actorRef + "] is not a linked actor, can't unlink")
actorRef.supervisor = None
if (Actor.debugLifecycle) EventHandler.debug(actorInstance.get(), "stopped supervising " + actorRef)
}
actorRef
}
def unlink(actorRef: ActorRef): ActorRef = actorInstance.unlink(actorRef)
/**
* Returns an unmodifiable Java Collection containing the linked actors
*/
def linkedActors: JCollection[ActorRef] = actorInstance.linkedActors
/**
* Returns the supervisor, if there is one.
*/
def supervisor: Option[ActorRef] = actorInstance.supervisor
/**
* The reference sender Actor of the last received message.
* Is defined if the message was sent from another Actor, else None.
*/
@deprecated("will be removed in 2.0, use channel instead", "1.2")
def sender: Option[ActorRef] = {
val msg = currentMessage
if (msg eq null) None
else msg.channel match {
case ref: ActorRef Some(ref)
case _ None
}
}
def sender: Option[ActorRef] = actorInstance.sender
/**
* The reference sender future of the last received message.
* Is defined if the message was sent with sent with '?'/'ask', else None.
*/
@deprecated("will be removed in 2.0, use channel instead", "1.2")
def senderFuture(): Option[Promise[Any]] = {
val msg = currentMessage
if (msg eq null) None
else msg.channel match {
case f: ActorPromise Some(f)
case _ None
}
}
/**
* Returns the supervisor, if there is one.
*/
def supervisor: Option[ActorRef] = _supervisor
def senderFuture(): Option[Promise[Any]] = actorInstance.senderFuture
// ========= AKKA PROTECTED FUNCTIONS =========
protected[akka] def actorClass: Class[_] = actorInstance.actorClass
protected[akka] def underlying: ActorInstance = actorInstance
protected[akka] def underlyingActorInstance: Actor = actorInstance.actor.get
protected[akka] override def timeout: Long = props.timeout.duration.toMillis // TODO: remove this if possible
protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = {
actorInstance.supervisor = sup
}
protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = {
actorInstance.postMessageToMailbox(message, channel)
}
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Timeout,
channel: UntypedChannel): Future[Any] = {
actorInstance.postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, channel)
}
protected[akka] def handleDeath(death: Death): Unit = actorInstance.handleDeath(death)
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = {
actorInstance.restart(reason, maxNrOfRetries, withinTimeRange)
}
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = {
actorInstance.restartLinkedActors(reason, maxNrOfRetries, withinTimeRange)
}
// ========= PRIVATE FUNCTIONS =========
@throws(classOf[java.io.ObjectStreamException])
private def writeReplace(): AnyRef = {
val inetaddr =
@ -702,286 +467,6 @@ class LocalActorRef private[akka] (
else ReflectiveAccess.RemoteModule.configDefaultAddress
SerializedActorRef(uuid, address, inetaddr.getAddress.getHostAddress, inetaddr.getPort, timeout)
}
protected[akka] def supervisor_=(sup: Option[ActorRef]) {
_supervisor = sup
}
protected[akka] def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit =
if (isRunning) dispatcher dispatchMessage new MessageInvocation(this, message, channel)
else throw new ActorInitializationException("Actor has not been started, you need to invoke 'actor' before using it")
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Timeout,
channel: UntypedChannel): Future[Any] = if (isRunning) {
val future = channel match {
case f: ActorPromise f
case _ new ActorPromise(timeout)(dispatcher)
}
dispatcher dispatchMessage new MessageInvocation(this, message, future)
future
} else throw new ActorInitializationException("Actor has not been started, you need to invoke 'actor' before using it")
/**
* Callback for the dispatcher. This is the single entry point to the user Actor implementation.
*/
protected[akka] def invoke(messageHandle: MessageInvocation) {
guard.lock.lock()
try {
if (!isShutdown) {
currentMessage = messageHandle
try {
try {
cancelReceiveTimeout() // FIXME: leave this here?
actorInstance.get().apply(messageHandle.message)
currentMessage = null // reset current message after successful invocation
} catch {
case e
EventHandler.error(e, this, e.getMessage)
//Prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
channel.sendException(e)
if (supervisor.isDefined) supervisor.get ! Death(this, e, true) else dispatcher.resume(this)
if (e.isInstanceOf[InterruptedException]) throw e //Re-throw InterruptedExceptions as expected
} finally {
checkReceiveTimeout // Reschedule receive timeout
}
} catch {
case e
EventHandler.error(e, actorInstance.get(), e.getMessage)
throw e
}
} else {
// throwing away message if actor is shut down, no use throwing an exception in receiving actor's thread, isShutdown is enforced on caller side
}
} finally {
guard.lock.unlock()
if (hasReplicationStorage) replicationStorage.get.recordEntry(messageHandle, this)
}
}
protected[akka] def handleDeath(death: Death) {
props.faultHandler match {
case AllForOnePermanentStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(death.cause.getClass))
restartLinkedActors(death.cause, maxRetries, within)
case AllForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(death.cause.getClass))
restartLinkedActors(death.cause, None, None)
case OneForOnePermanentStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(death.cause.getClass))
death.deceased.restart(death.cause, maxRetries, within)
case OneForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(death.cause.getClass))
unlink(death.deceased)
death.deceased.stop()
this ! MaximumNumberOfRestartsWithinTimeRangeReached(death.deceased, None, None, death.cause)
case _
if (_supervisor.isDefined) throw death.cause else death.deceased.stop() //Escalate problem if not handled here
}
}
private def requestRestartPermission(maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Boolean = {
val denied = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) {
//Immortal
false
} else if (withinTimeRange.isEmpty) {
// restrict number of restarts
val retries = maxNrOfRetriesCount + 1
maxNrOfRetriesCount = retries //Increment number of retries
retries > maxNrOfRetries.get
} else {
// cannot restart more than N within M timerange
val retries = maxNrOfRetriesCount + 1
val windowStart = restartTimeWindowStartNanos
val now = System.nanoTime
//We are within the time window if it isn't the first restart, or if the window hasn't closed
val insideWindow = if (windowStart == 0) true else (now - windowStart) <= TimeUnit.MILLISECONDS.toNanos(withinTimeRange.get)
if (windowStart == 0 || !insideWindow) //(Re-)set the start of the window
restartTimeWindowStartNanos = now
//Reset number of restarts if window has expired, otherwise, increment it
maxNrOfRetriesCount = if (windowStart != 0 && !insideWindow) 1 else retries //Increment number of retries
val restartCountLimit = if (maxNrOfRetries.isDefined) maxNrOfRetries.get else 1
//The actor is dead if it dies X times within the window of restart
insideWindow && retries > restartCountLimit
}
denied == false //If we weren't denied, we have a go
}
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
def performRestart() {
val failedActor = actorInstance.get
if (Actor.debugLifecycle) EventHandler.debug(failedActor, "restarting")
val message = if (currentMessage ne null) Some(currentMessage.message) else None
failedActor.preRestart(reason, message)
val freshActor = newActor
setActorSelfFields(failedActor, null) // Only null out the references if we could instantiate the new actor
actorInstance.set(freshActor) // Assign it here so if preStart fails, we can null out the sef-refs next call
freshActor.postRestart(reason)
if (Actor.debugLifecycle) EventHandler.debug(freshActor, "restarted")
}
@tailrec
def attemptRestart() {
val success = if (requestRestartPermission(maxNrOfRetries, withinTimeRange)) {
guard.withGuard[Boolean] {
_status = ActorRefInternals.BEING_RESTARTED
val success =
try {
performRestart()
true
} catch {
case e
EventHandler.error(e, this, "Exception in restart of Actor [%s]".format(toString))
false // an error or exception here should trigger a retry
} finally {
currentMessage = null
}
if (success) {
_status = ActorRefInternals.RUNNING
dispatcher.resume(this)
restartLinkedActors(reason, maxNrOfRetries, withinTimeRange)
}
success
}
} else {
// tooManyRestarts
if (supervisor.isDefined)
supervisor.get ! MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason)
stop()
true // done
}
if (success) () // alles gut
else attemptRestart()
}
attemptRestart() // recur
}
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) =
props.faultHandler.lifeCycle match {
case Temporary
val i = _linkedActors.values.iterator
while (i.hasNext) {
val actorRef = i.next()
i.remove()
actorRef.stop()
// when this comes down through the handleDeath path, we get here when the temp actor is restarted
if (supervisor.isDefined) {
supervisor.get ! MaximumNumberOfRestartsWithinTimeRangeReached(actorRef, Some(0), None, reason)
//FIXME if last temporary actor is gone, then unlink me from supervisor <-- should this exist?
if (!i.hasNext)
supervisor.get ! UnlinkAndStop(this)
}
}
case Permanent
val i = _linkedActors.values.iterator
while (i.hasNext) i.next().restart(reason, maxNrOfRetries, withinTimeRange)
}
def linkedActors: JCollection[ActorRef] = java.util.Collections.unmodifiableCollection(_linkedActors.values)
// ========= PRIVATE FUNCTIONS =========
private[this] def newActor: Actor = {
import Actor.{ actorRefInCreation refStack }
val stackBefore = refStack.get
refStack.set(stackBefore.push(this))
try {
if (_status == ActorRefInternals.BEING_RESTARTED) {
val a = actorInstance.get()
val fresh = try a.freshInstance catch {
case e
EventHandler.error(e, a, "freshInstance() failed, falling back to initial actor factory")
None
}
fresh match {
case Some(ref) ref
case None props.creator()
}
} else {
props.creator()
}
} finally {
val stackAfter = refStack.get
if (stackAfter.nonEmpty)
refStack.set(if (stackAfter.head eq null) stackAfter.pop.pop else stackAfter.pop) //pop null marker plus self
}
} match {
case null throw new ActorInitializationException("Actor instance passed to ActorRef can not be 'null'")
case valid valid
}
private def setActorSelfFields(actor: Actor, value: ActorRef) {
@tailrec
def lookupAndSetSelfFields(clazz: Class[_], actor: Actor, value: ActorRef): Boolean = {
val success = try {
val selfField = clazz.getDeclaredField("self")
val someSelfField = clazz.getDeclaredField("someSelf")
selfField.setAccessible(true)
someSelfField.setAccessible(true)
selfField.set(actor, value)
someSelfField.set(actor, if (value ne null) Some(value) else null)
true
} catch {
case e: NoSuchFieldException false
}
if (success) true
else {
val parent = clazz.getSuperclass
if (parent eq null)
throw new IllegalActorStateException(toString + " is not an Actor since it have not mixed in the 'Actor' trait")
lookupAndSetSelfFields(parent, actor, value)
}
}
lookupAndSetSelfFields(actorInstance.get().getClass, actorInstance.get(), value)
}
private def initializeActorInstance() {
val a = actorInstance.get()
if (Actor.debugLifecycle) EventHandler.debug(a, "started")
a.preStart() // run actor preStart
Actor.registry.register(this)
}
protected[akka] def checkReceiveTimeout() {
cancelReceiveTimeout()
val recvtimeout = receiveTimeout
if (recvtimeout.isDefined && dispatcher.mailboxIsEmpty(this)) {
//Only reschedule if desired and there are currently no more messages to be processed
_futureTimeout = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, recvtimeout.get, TimeUnit.MILLISECONDS))
}
}
protected[akka] def cancelReceiveTimeout() {
if (_futureTimeout.isDefined) {
_futureTimeout.get.cancel(true)
_futureTimeout = None
}
}
startInternal()
}
/**
@ -1006,6 +491,13 @@ private[akka] case class RemoteActorRef private[akka] (
loader: Option[ClassLoader])
extends ActorRef with ScalaActorRef {
@volatile
private var running: Boolean = true
def isRunning: Boolean = running
def isShutdown: Boolean = !running
ClusterModule.ensureEnabled()
protected[akka] override def timeout: Long = _timeout
@ -1028,10 +520,14 @@ private[akka] case class RemoteActorRef private[akka] (
else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
}
def suspend(): Unit = unsupported
def resume(): Unit = unsupported
def stop() {
synchronized {
if (_status == ActorRefInternals.RUNNING) {
_status = ActorRefInternals.SHUTDOWN
if (running) {
running = false
postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
}
}
@ -1057,10 +553,6 @@ private[akka] case class RemoteActorRef private[akka] (
}
private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef")
/* If you start me up... */
if (_status == ActorRefInternals.UNSTARTED)
_status = ActorRefInternals.RUNNING
}
/**
@ -1156,6 +648,10 @@ trait UnsupportedActorRef extends ActorRef with ScalaActorRef {
unsupported
}
def suspend(): Unit = unsupported
def resume(): Unit = unsupported
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
unsupported
}

View file

@ -200,7 +200,7 @@ trait IO {
if (!reinvoked && (_next eq Idle) && _messages.nonEmpty) {
try {
reinvoked = true
while ((_next eq Idle) && _messages.nonEmpty) self.asInstanceOf[LocalActorRef] invoke _messages.dequeue
while ((_next eq Idle) && _messages.nonEmpty) self.asInstanceOf[LocalActorRef].underlying invoke _messages.dequeue
} finally {
reinvoked = false
}

View file

@ -0,0 +1,139 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import akka.config.Supervision._
import akka.dispatch._
import akka.japi.Creator
import akka.util._
/**
* ActorRef configuration object, this is threadsafe and fully sharable
*
* Props() returns default configuration
* FIXME document me
*/
object Props {
final val defaultCreator: () Actor = () throw new UnsupportedOperationException("No actor creator specified!")
final val defaultDeployId: String = ""
final val defaultDispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher
final val defaultTimeout: Timeout = Timeout(Duration(Actor.TIMEOUT, "millis"))
final val defaultFaultHandler: FaultHandlingStrategy = AllForOnePermanentStrategy(classOf[Exception] :: Nil, None, None)
final val defaultSupervisor: Option[ActorRef] = None
/**
* The default Props instance, uses the settings from the Props object starting with default*
*/
final val default = new Props()
/**
* Returns a cached default implementation of Props
*/
def apply(): Props = default
/**
* Returns a Props that has default values except for "creator" which will be a function that creates an instance
* of the supplied type using the default constructor
*/
def apply[T <: Actor: ClassManifest]: Props =
default.withCreator(implicitly[ClassManifest[T]].erasure.asInstanceOf[Class[_ <: Actor]].newInstance)
/**
* Returns a Props that has default values except for "creator" which will be a function that creates an instance
* of the supplied class using the default constructor
*/
def apply(actorClass: Class[_ <: Actor]): Props =
default.withCreator(actorClass.newInstance)
/**
* Returns a Props that has default values except for "creator" which will be a function that creates an instance
* using the supplied thunk
*/
def apply(creator: Actor): Props = default.withCreator(creator)
/**
* Returns a Props that has default values except for "creator" which will be a function that creates an instance
* using the supplied thunk
*/
def apply(creator: Creator[_ <: Actor]): Props = default.withCreator(creator.create)
def apply(behavior: (ScalaActorRef with SelfActorRef) Actor.Receive): Props = apply(new Actor { def receive = behavior(self) })
}
/**
* ActorRef configuration object, this is thread safe and fully sharable
*/
case class Props(creator: () Actor = Props.defaultCreator,
deployId: String = Props.defaultDeployId,
@transient dispatcher: MessageDispatcher = Props.defaultDispatcher,
timeout: Timeout = Props.defaultTimeout,
faultHandler: FaultHandlingStrategy = Props.defaultFaultHandler,
supervisor: Option[ActorRef] = Props.defaultSupervisor) {
/**
* No-args constructor that sets all the default values
* Java API
*/
def this() = this(
creator = Props.defaultCreator,
deployId = Props.defaultDeployId,
dispatcher = Props.defaultDispatcher,
timeout = Props.defaultTimeout,
faultHandler = Props.defaultFaultHandler,
supervisor = Props.defaultSupervisor)
/**
* Returns a new Props with the specified creator set
* Scala API
*/
def withCreator(c: Actor) = copy(creator = () c)
/**
* Returns a new Props with the specified creator set
* Java API
*/
def withCreator(c: Creator[Actor]) = copy(creator = () c.create)
/**
* Returns a new Props with the specified deployId set
* Java and Scala API
*/
def withDeployId(id: String) = copy(deployId = if (id eq null) "" else id)
/**
* Returns a new Props with the specified dispatcher set
* Java API
*/
def withDispatcher(d: MessageDispatcher) = copy(dispatcher = d)
/**
* Returns a new Props with the specified timeout set
* Java API
*/
def withTimeout(t: Timeout) = copy(timeout = t)
/**
* Returns a new Props with the specified faulthandler set
* Java API
*/
def withFaultHandler(f: FaultHandlingStrategy) = copy(faultHandler = f)
/**
* Returns a new Props with the specified supervisor set, if null, it's equivalent to withSupervisor(Option.none())
* Java API
*/
def withSupervisor(s: ActorRef) = copy(supervisor = Option(s))
/**
* Returns a new Props with the specified supervisor set
* Java API
*/
def withSupervisor(s: akka.japi.Option[ActorRef]) = copy(supervisor = s.asScala)
/**
* Returns a new Props with the specified supervisor set
* Scala API
*/
def withSupervisor(s: scala.Option[ActorRef]) = copy(supervisor = s)
}

View file

@ -5,7 +5,7 @@
package akka.dispatch
import util.DynamicVariable
import akka.actor.{ LocalActorRef, ActorRef, Actor, IllegalActorStateException }
import akka.actor.{ ActorInstance, Actor, IllegalActorStateException }
/**
* An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
@ -52,27 +52,27 @@ class BalancingDispatcher(
@volatile
private var actorType: Option[Class[_]] = None
@volatile
private var members = Vector[LocalActorRef]()
private var members = Vector[ActorInstance]()
private val donationInProgress = new DynamicVariable(false)
protected[akka] override def register(actorRef: LocalActorRef) = {
protected[akka] override def register(actor: ActorInstance) = {
//Verify actor type conformity
actorType match {
case None actorType = Some(actorRef.actorInstance.get().getClass)
case None actorType = Some(actor.actorClass)
case Some(aType)
if (aType != actorRef.actorInstance.get().getClass)
if (aType != actor.actorClass)
throw new IllegalActorStateException(String.format(
"Can't register actor %s in a work stealing dispatcher which already knows actors of type %s",
actorRef, aType))
actor, aType))
}
members :+= actorRef //Update members, doesn't need synchronized, is guarded in attach
super.register(actorRef)
members :+= actor //Update members, doesn't need synchronized, is guarded in attach
super.register(actor)
}
protected[akka] override def unregister(actorRef: LocalActorRef) = {
members = members.filterNot(actorRef eq) //Update members, doesn't need synchronized, is guarded in detach
super.unregister(actorRef)
protected[akka] override def unregister(actor: ActorInstance) = {
members = members.filterNot(actor eq) //Update members, doesn't need synchronized, is guarded in detach
super.unregister(actor)
}
override protected[akka] def dispatch(invocation: MessageInvocation) = {
@ -126,7 +126,7 @@ class BalancingDispatcher(
* Rewrites the message and adds that message to the recipients mailbox
* returns true if the message is non-null
*/
protected def donate(organ: MessageInvocation, recipient: ActorRef): Boolean = {
protected def donate(organ: MessageInvocation, recipient: ActorInstance): Boolean = {
if (organ ne null) {
recipient.postMessageToMailbox(organ.message, organ.channel)
true
@ -136,10 +136,10 @@ class BalancingDispatcher(
/**
* Returns an available recipient for the message, if any
*/
protected def doFindDonorRecipient(donorMbox: MessageQueue with ExecutableMailbox, potentialRecipients: Vector[LocalActorRef], startIndex: Int): ActorRef = {
protected def doFindDonorRecipient(donorMbox: MessageQueue with ExecutableMailbox, potentialRecipients: Vector[ActorInstance], startIndex: Int): ActorInstance = {
val prSz = potentialRecipients.size
var i = 0
var recipient: ActorRef = null
var recipient: ActorInstance = null
while ((i < prSz) && (recipient eq null)) {
val actor = potentialRecipients((i + startIndex) % prSz) //Wrap-around, one full lap

View file

@ -7,7 +7,7 @@ package akka.dispatch
import akka.event.EventHandler
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue }
import akka.actor.{ LocalActorRef, ActorKilledException, ActorRef }
import akka.actor.{ ActorInstance, ActorKilledException }
/**
* Default settings are:
@ -108,13 +108,13 @@ class Dispatcher(
/**
* @return the mailbox associated with the actor
*/
protected def getMailbox(receiver: LocalActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox]
protected def getMailbox(receiver: ActorInstance) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox]
override def mailboxIsEmpty(actorRef: LocalActorRef): Boolean = getMailbox(actorRef).isEmpty
override def mailboxIsEmpty(actor: ActorInstance): Boolean = getMailbox(actor).isEmpty
override def mailboxSize(actorRef: LocalActorRef): Int = getMailbox(actorRef).size
override def mailboxSize(actor: ActorInstance): Int = getMailbox(actor).size
def createMailbox(actorRef: LocalActorRef): AnyRef = mailboxType match {
def createMailbox(actor: ActorInstance): AnyRef = mailboxType match {
case b: UnboundedMailbox
new ConcurrentLinkedQueue[MessageInvocation] with MessageQueue with ExecutableMailbox {
@inline
@ -160,8 +160,8 @@ class Dispatcher(
protected[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit =
registerForExecution(mbox)
protected override def cleanUpMailboxFor(actorRef: LocalActorRef) {
val m = getMailbox(actorRef)
protected override def cleanUpMailboxFor(actor: ActorInstance) {
val m = getMailbox(actor)
if (!m.isEmpty) {
var invocation = m.dequeue
lazy val exception = new ActorKilledException("Actor has been stopped")
@ -174,11 +174,11 @@ class Dispatcher(
override val toString = getClass.getSimpleName + "[" + name + "]"
def suspend(actorRef: LocalActorRef): Unit =
getMailbox(actorRef).suspended.tryLock
def suspend(actor: ActorInstance): Unit =
getMailbox(actor).suspended.tryLock
def resume(actorRef: LocalActorRef): Unit = {
val mbox = getMailbox(actorRef)
def resume(actor: ActorInstance): Unit = {
val mbox = getMailbox(actor)
mbox.suspended.tryUnlock
reRegisterForExecution(mbox)
}
@ -296,7 +296,7 @@ class PriorityDispatcher(
trait PriorityMailbox { self: Dispatcher
def comparator: java.util.Comparator[MessageInvocation]
override def createMailbox(actorRef: LocalActorRef): AnyRef = self.mailboxType match {
override def createMailbox(actor: ActorInstance): AnyRef = self.mailboxType match {
case b: UnboundedMailbox
new UnboundedPriorityMessageQueue(comparator) with ExecutableMailbox {
@inline

View file

@ -4,7 +4,7 @@
package akka.dispatch
import akka.actor.ActorRef
import akka.actor.LocalActorRef
import akka.actor.newUuid
import akka.config.Config._
import akka.util.{ Duration, ReflectiveAccess }
@ -65,9 +65,9 @@ object Dispatchers {
* <p/>
* E.g. each actor consumes its own thread.
*/
def newPinnedDispatcher(actor: ActorRef) = actor match {
def newPinnedDispatcher(actor: LocalActorRef) = actor match {
case null new PinnedDispatcher()
case some new PinnedDispatcher(some)
case some new PinnedDispatcher(some.underlying)
}
/**
@ -76,9 +76,9 @@ object Dispatchers {
* <p/>
* E.g. each actor consumes its own thread.
*/
def newPinnedDispatcher(actor: ActorRef, mailboxType: MailboxType) = actor match {
def newPinnedDispatcher(actor: LocalActorRef, mailboxType: MailboxType) = actor match {
case null new PinnedDispatcher(mailboxType)
case some new PinnedDispatcher(some, mailboxType)
case some new PinnedDispatcher(some.underlying, mailboxType)
}
/**

View file

@ -16,7 +16,7 @@ import akka.actor._
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
final case class MessageInvocation(val receiver: LocalActorRef,
final case class MessageInvocation(val receiver: ActorInstance,
val message: Any,
val channel: UntypedChannel) {
if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null")
@ -68,7 +68,7 @@ abstract class MessageDispatcher extends Serializable {
/**
* Creates and returns a mailbox for the given actor.
*/
protected[akka] def createMailbox(actorRef: LocalActorRef): AnyRef
protected[akka] def createMailbox(actor: ActorInstance): AnyRef
/**
* Name of this dispatcher.
@ -76,20 +76,20 @@ abstract class MessageDispatcher extends Serializable {
def name: String
/**
* Attaches the specified actorRef to this dispatcher
* Attaches the specified actor instance to this dispatcher
*/
final def attach(actorRef: LocalActorRef) {
final def attach(actor: ActorInstance) {
guard withGuard {
register(actorRef)
register(actor)
}
}
/**
* Detaches the specified actorRef from this dispatcher
* Detaches the specified actor instance from this dispatcher
*/
final def detach(actorRef: LocalActorRef) {
final def detach(actor: ActorInstance) {
guard withGuard {
unregister(actorRef)
unregister(actor)
}
}
@ -132,11 +132,11 @@ abstract class MessageDispatcher extends Serializable {
* Only "private[akka] for the sake of intercepting calls, DO NOT CALL THIS OUTSIDE OF THE DISPATCHER,
* and only call it under the dispatcher-guard, see "attach" for the only invocation
*/
protected[akka] def register(actorRef: LocalActorRef) {
if (actorRef.mailbox eq null)
actorRef.mailbox = createMailbox(actorRef)
protected[akka] def register(actor: ActorInstance) {
if (actor.mailbox eq null)
actor.mailbox = createMailbox(actor)
uuids add actorRef.uuid
uuids add actor.uuid
if (active.isOff) {
active.switchOn {
start()
@ -148,10 +148,10 @@ abstract class MessageDispatcher extends Serializable {
* Only "private[akka] for the sake of intercepting calls, DO NOT CALL THIS OUTSIDE OF THE DISPATCHER,
* and only call it under the dispatcher-guard, see "detach" for the only invocation
*/
protected[akka] def unregister(actorRef: LocalActorRef) = {
if (uuids remove actorRef.uuid) {
cleanUpMailboxFor(actorRef)
actorRef.mailbox = null
protected[akka] def unregister(actor: ActorInstance) = {
if (uuids remove actor.uuid) {
cleanUpMailboxFor(actor)
actor.mailbox = null
if (uuids.isEmpty && _tasks.get == 0) {
shutdownSchedule match {
case UNSCHEDULED
@ -169,7 +169,7 @@ abstract class MessageDispatcher extends Serializable {
* Overridable callback to clean up the mailbox for a given actor,
* called when an actor is unregistered.
*/
protected def cleanUpMailboxFor(actorRef: LocalActorRef) {}
protected def cleanUpMailboxFor(actor: ActorInstance) {}
/**
* Traverses the list of actors (uuids) currently being attached to this dispatcher and stops those actors
@ -214,12 +214,12 @@ abstract class MessageDispatcher extends Serializable {
/**
* After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference
*/
def suspend(actorRef: LocalActorRef)
def suspend(actor: ActorInstance)
/*
* After the call to this method, the dispatcher must begin any new message processing for the specified reference
*/
def resume(actorRef: LocalActorRef)
def resume(actor: ActorInstance)
/**
* Will be called when the dispatcher is to queue an invocation for execution
@ -241,12 +241,12 @@ abstract class MessageDispatcher extends Serializable {
/**
* Returns the size of the mailbox for the specified actor
*/
def mailboxSize(actorRef: LocalActorRef): Int
def mailboxSize(actor: ActorInstance): Int
/**
* Returns the "current" emptiness status of the mailbox for the specified actor
*/
def mailboxIsEmpty(actorRef: LocalActorRef): Boolean
def mailboxIsEmpty(actor: ActorInstance): Boolean
/**
* Returns the amount of tasks queued for execution

View file

@ -5,44 +5,44 @@
package akka.dispatch
import java.util.concurrent.atomic.AtomicReference
import akka.actor.{ LocalActorRef, ActorRef }
import akka.actor.ActorInstance
/**
* Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class PinnedDispatcher(_actor: ActorRef, _name: String, _mailboxType: MailboxType)
class PinnedDispatcher(_actor: ActorInstance, _name: String, _mailboxType: MailboxType)
extends Dispatcher(
_name, Dispatchers.THROUGHPUT, -1, _mailboxType, PinnedDispatcher.oneThread) {
def this(_name: String, _mailboxType: MailboxType) = this(null, _name, _mailboxType)
def this(_actor: ActorRef, _name: String) = this(_actor, _name, Dispatchers.MAILBOX_TYPE)
def this(_actor: ActorInstance, _name: String) = this(_actor, _name, Dispatchers.MAILBOX_TYPE)
def this(_name: String) = this(null, _name, Dispatchers.MAILBOX_TYPE)
def this(_mailboxType: MailboxType) = this(null, "anon", _mailboxType)
def this(_actor: ActorRef, _mailboxType: MailboxType) = this(_actor, _actor.uuid.toString, _mailboxType)
def this(_actor: ActorInstance, _mailboxType: MailboxType) = this(_actor, _actor.uuid.toString, _mailboxType)
def this(_actor: ActorRef) = this(_actor, _actor.uuid.toString, Dispatchers.MAILBOX_TYPE)
def this(_actor: ActorInstance) = this(_actor, _actor.uuid.toString, Dispatchers.MAILBOX_TYPE)
def this() = this(Dispatchers.MAILBOX_TYPE)
protected[akka] val owner = new AtomicReference[ActorRef](_actor)
protected[akka] val owner = new AtomicReference[ActorInstance](_actor)
//Relies on an external lock provided by MessageDispatcher.attach
protected[akka] override def register(actorRef: LocalActorRef) = {
protected[akka] override def register(actorInstance: ActorInstance) = {
val actor = owner.get()
if ((actor ne null) && actorRef != actor) throw new IllegalArgumentException("Cannot register to anyone but " + actor)
owner.compareAndSet(null, actorRef) //Register if unregistered
super.register(actorRef)
if ((actor ne null) && actorInstance != actor) throw new IllegalArgumentException("Cannot register to anyone but " + actor)
owner.compareAndSet(null, actorInstance) //Register if unregistered
super.register(actorInstance)
}
//Relies on an external lock provided by MessageDispatcher.detach
protected[akka] override def unregister(actorRef: LocalActorRef) = {
super.unregister(actorRef)
owner.compareAndSet(actorRef, null) //Unregister (prevent memory leak)
protected[akka] override def unregister(actor: ActorInstance) = {
super.unregister(actor)
owner.compareAndSet(actor, null) //Unregister (prevent memory leak)
}
}

View file

@ -152,7 +152,7 @@ trait SmallestMailboxSelector {
var take = if (partialFill) math.min(selectionCount, delegates.length) else selectionCount
def mailboxSize(a: ActorRef): Int = a match {
case l: LocalActorRef l.dispatcher.mailboxSize(l)
case l: LocalActorRef l.dispatcher.mailboxSize(l.underlying)
case _ Int.MaxValue //Non-local actors mailbox size is unknown, so consider them lowest priority
}
@ -238,7 +238,7 @@ trait MailboxPressureCapacitor {
def pressureThreshold: Int
def pressure(delegates: Seq[ActorRef]): Int =
delegates count {
case a: LocalActorRef a.dispatcher.mailboxSize(a) > pressureThreshold
case a: LocalActorRef a.dispatcher.mailboxSize(a.underlying) > pressureThreshold
case _ false
}
}

View file

@ -227,20 +227,23 @@ abstract private[akka] class AbstractRoutedActorRef(val props: RoutedProps) exte
*/
private[akka] class RoutedActorRef(val routedProps: RoutedProps) extends AbstractRoutedActorRef(routedProps) {
router.init(new RemoveConnectionOnFirstFailureLocalFailureDetector(routedProps.connections))
@volatile
private var running: Boolean = true
def isRunning: Boolean = running
def isShutdown: Boolean = !running
def stop() {
synchronized {
if (_status == ActorRefInternals.RUNNING) {
_status = ActorRefInternals.SHUTDOWN
if (running) {
running = false
postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
}
}
}
/*If you start me up*/
if (_status == ActorRefInternals.UNSTARTED)
_status = ActorRefInternals.RUNNING
router.init(new RemoveConnectionOnFirstFailureLocalFailureDetector(routedProps.connections))
}
/**

View file

@ -44,7 +44,7 @@ private[camel] object TypedConsumer {
private def withConsumeAnnotatedMethodsonImplClass[T](tc: AnyRef, actorRef: ActorRef, f: (AnyRef, Method) T): List[T] = actorRef match {
case l: LocalActorRef
val implClass = l.actorInstance.get().asInstanceOf[TypedActor.TypedActor[AnyRef, AnyRef]].me.getClass
val implClass = l.underlyingActorInstance.asInstanceOf[TypedActor.TypedActor[AnyRef, AnyRef]].me.getClass
for (m implClass.getDeclaredMethods.toList; if (m.isAnnotationPresent(classOf[consume]))) yield f(tc, m)
case _ Nil
}

View file

@ -141,7 +141,7 @@ private[camel] object Consumer {
*/
def withConsumer[T](actorRef: ActorRef)(f: Consumer T): Option[T] = actorRef match {
case l: LocalActorRef
l.actorInstance.get() match {
l.underlyingActorInstance match {
case c: Consumer Some(f(c))
case _ None
}

View file

@ -265,14 +265,24 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall
val address = exchange.getExchangeId
@volatile
private var running: Boolean = false
def isRunning: Boolean = running
def isShutdown: Boolean = !running
def start = {
if (_status == ActorRefInternals.UNSTARTED)
_status = ActorRefInternals.RUNNING
running = true
this
}
def stop() = {
_status = ActorRefInternals.SHUTDOWN
def suspend(): Unit = ()
def resume(): Unit = ()
def stop(): Unit = {
running = false
}
/**

View file

@ -39,7 +39,7 @@ class ConsumerPublishRequestorTest extends JUnitSuite {
requestor ! ActorRegistered(consumer.address, consumer, None)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert((publisher ? GetRetainedMessage).get ===
ConsumerActorRegistered(consumer, consumer.actorInstance.get.asInstanceOf[Consumer]))
ConsumerActorRegistered(consumer, consumer.underlyingActorInstance.asInstanceOf[Consumer]))
}
@Test
@ -48,7 +48,7 @@ class ConsumerPublishRequestorTest extends JUnitSuite {
requestor ! ActorUnregistered(consumer.address, consumer, None)
assert(latch.await(5000, TimeUnit.MILLISECONDS))
assert((publisher ? GetRetainedMessage).get ===
ConsumerActorUnregistered(consumer, consumer.actorInstance.get.asInstanceOf[Consumer]))
ConsumerActorUnregistered(consumer, consumer.underlyingActorInstance.asInstanceOf[Consumer]))
}
}

View file

@ -49,7 +49,7 @@ class ConsumerRegisteredTest extends JUnitSuite {
}
private def consumerOf(ref: ActorRef) = ref match {
case l: LocalActorRef l.actorInstance.get.asInstanceOf[Consumer]
case l: LocalActorRef l.underlyingActorInstance.asInstanceOf[Consumer]
case _ null: Consumer
}
}

View file

@ -79,7 +79,7 @@ object ActorSerialization {
.setTimeout(actorRef.timeout)
if (localRef.isDefined)
builder.setActorClassname(localRef.get.actorInstance.get.getClass.getName) //TODO FIXME Why is the classname needed anymore?
builder.setActorClassname(localRef.get.actorClass.getName) //TODO FIXME Why is the classname needed anymore?
replicationScheme match {
case _: Transient | Transient
@ -104,7 +104,7 @@ object ActorSerialization {
localRef foreach { l
if (serializeMailBox) {
l.mailbox match {
l.underlying.mailbox match {
case null throw new IllegalActorStateException("Can't serialize an actor that has not been started.")
case q: java.util.Queue[_]
val l = new scala.collection.mutable.ListBuffer[MessageInvocation]
@ -113,7 +113,7 @@ object ActorSerialization {
l map { m
RemoteActorSerialization.createRemoteMessageProtocolBuilder(
Option(m.receiver),
Option(m.receiver.ref),
Left(actorRef.uuid),
actorRef.address,
actorRef.timeout,
@ -130,7 +130,7 @@ object ActorSerialization {
}
l.receiveTimeout.foreach(builder.setReceiveTimeout(_))
val actorInstance = l.actorInstance.get
val actorInstance = l.underlyingActorInstance
Serialization.serialize(actorInstance.asInstanceOf[T]) match {
case Right(bytes) builder.setActorInstance(ByteString.copyFrom(bytes))
case Left(exception) throw new Exception("Error serializing : " + actorInstance.getClass.getName)

View file

@ -39,15 +39,15 @@ class ActorSerializeSpec extends Spec with ShouldMatchers with BeforeAndAfterAll
val actor1 = new LocalActorRef(Props[MyStatelessActorWithMessagesInMailbox], newUuid.toString, systemService = true)
for (i 1 to 10) actor1 ! "hello"
actor1.getDispatcher.mailboxSize(actor1) should be > (0)
actor1.getDispatcher.mailboxSize(actor1.underlying) should be > (0)
val actor2 = fromBinary(toBinary(actor1)).asInstanceOf[LocalActorRef]
Thread.sleep(1000)
actor2.getDispatcher.mailboxSize(actor1) should be > (0)
actor2.getDispatcher.mailboxSize(actor1.underlying) should be > (0)
(actor2 ? "hello-reply").get should equal("world")
val actor3 = fromBinary(toBinary(actor1, false)).asInstanceOf[LocalActorRef]
Thread.sleep(1000)
actor3.getDispatcher.mailboxSize(actor1) should equal(0)
actor3.getDispatcher.mailboxSize(actor1.underlying) should equal(0)
(actor3 ? "hello-reply").get should equal("world")
}
@ -65,15 +65,15 @@ class ActorSerializeSpec extends Spec with ShouldMatchers with BeforeAndAfterAll
(actor1 ! p1)
(actor1 ! p1)
(actor1 ! p1)
actor1.getDispatcher.mailboxSize(actor1) should be > (0)
actor1.getDispatcher.mailboxSize(actor1.underlying) should be > (0)
val actor2 = fromBinary(toBinary(actor1)).asInstanceOf[LocalActorRef]
Thread.sleep(1000)
actor2.getDispatcher.mailboxSize(actor1) should be > (0)
actor2.getDispatcher.mailboxSize(actor1.underlying) should be > (0)
(actor2 ? "hello-reply").get should equal("hello")
val actor3 = fromBinary(toBinary(actor1, false)).asInstanceOf[LocalActorRef]
Thread.sleep(1000)
actor3.getDispatcher.mailboxSize(actor1) should equal(0)
actor3.getDispatcher.mailboxSize(actor1.underlying) should equal(0)
(actor3 ? "hello-reply").get should equal("hello")
}
}
@ -102,15 +102,15 @@ class ActorSerializeSpec extends Spec with ShouldMatchers with BeforeAndAfterAll
val msg = MyMessage(123, "debasish ghosh", true)
val b = ProtobufProtocol.MyMessage.newBuilder.setId(msg.id).setName(msg.name).setStatus(msg.status).build
for (i 1 to 10) actor1 ! b
actor1.getDispatcher.mailboxSize(actor1) should be > (0)
actor1.getDispatcher.mailboxSize(actor1.underlying) should be > (0)
val actor2 = fromBinary(toBinary(actor1)).asInstanceOf[LocalActorRef]
Thread.sleep(1000)
actor2.getDispatcher.mailboxSize(actor1) should be > (0)
actor2.getDispatcher.mailboxSize(actor1.underlying) should be > (0)
(actor2 ? "hello-reply").get should equal("world")
val actor3 = fromBinary(toBinary(actor1, false)).asInstanceOf[LocalActorRef]
Thread.sleep(1000)
actor3.getDispatcher.mailboxSize(actor1) should equal(0)
actor3.getDispatcher.mailboxSize(actor1.underlying) should equal(0)
(actor3 ? "hello-reply").get should equal("world")
}
}

View file

@ -151,7 +151,7 @@ class Agent[T](initialValue: T) {
* still be executed in order.
*/
def sendOff(f: T T): Unit = send((value: T) {
suspend
suspend()
val threadBased = actorOf(Props(new ThreadBasedAgentUpdater(this)).withDispatcher(new PinnedDispatcher()))
threadBased ! Update(f)
value
@ -167,7 +167,7 @@ class Agent[T](initialValue: T) {
def alterOff(f: T T)(timeout: Long): Future[T] = {
val result = new DefaultPromise[T](timeout)
send((value: T) {
suspend
suspend()
val threadBased = Actor.actorOf(new ThreadBasedAgentUpdater(this))
result completeWith threadBased.?(Update(f), timeout).asInstanceOf[Future[T]]
value
@ -206,12 +206,12 @@ class Agent[T](initialValue: T) {
/**
* Suspends processing of `send` actions for the agent.
*/
def suspend() = updater.dispatcher.suspend(updater)
def suspend() = updater.suspend()
/**
* Resumes processing of `send` actions for the agent.
*/
def resume() = updater.dispatcher.resume(updater)
def resume() = updater.resume()
/**
* Closes the agents and makes it eligible for garbage collection.
@ -300,7 +300,7 @@ class ThreadBasedAgentUpdater[T](agent: Agent[T]) extends Actor {
case update: Update[_] try {
self.tryReply(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T T] })
} finally {
agent.resume
agent.resume()
self.stop()
}
case _ self.stop()

View file

@ -11,7 +11,7 @@ import java.util.concurrent.RejectedExecutionException
import akka.util.Switch
import java.lang.ref.WeakReference
import scala.annotation.tailrec
import akka.actor.{ LocalActorRef, ActorRef }
import akka.actor.ActorInstance
/*
* Locking rules:
@ -107,9 +107,9 @@ object CallingThreadDispatcher {
class CallingThreadDispatcher(val name: String = "calling-thread", val warnings: Boolean = true) extends MessageDispatcher {
import CallingThreadDispatcher._
protected[akka] override def createMailbox(actor: LocalActorRef) = new CallingThreadMailbox
protected[akka] override def createMailbox(actor: ActorInstance) = new CallingThreadMailbox
private def getMailbox(actor: LocalActorRef) = actor.mailbox.asInstanceOf[CallingThreadMailbox]
private def getMailbox(actor: ActorInstance) = actor.mailbox.asInstanceOf[CallingThreadMailbox]
protected[akka] override def start() {}
@ -117,11 +117,11 @@ class CallingThreadDispatcher(val name: String = "calling-thread", val warnings:
protected[akka] override def timeoutMs = 100L
override def suspend(actor: LocalActorRef) {
override def suspend(actor: ActorInstance) {
getMailbox(actor).suspended.switchOn
}
override def resume(actor: LocalActorRef) {
override def resume(actor: ActorInstance) {
val mbox = getMailbox(actor)
val queue = mbox.queue
val wasActive = queue.isActive
@ -133,9 +133,9 @@ class CallingThreadDispatcher(val name: String = "calling-thread", val warnings:
}
}
override def mailboxSize(actor: LocalActorRef) = getMailbox(actor).queue.size
override def mailboxSize(actor: ActorInstance) = getMailbox(actor).queue.size
override def mailboxIsEmpty(actorRef: LocalActorRef): Boolean = getMailbox(actorRef).queue.isEmpty
override def mailboxIsEmpty(actor: ActorInstance): Boolean = getMailbox(actor).queue.isEmpty
protected[akka] override def dispatch(handle: MessageInvocation) {
val mbox = getMailbox(handle.receiver)

View file

@ -25,14 +25,14 @@ class TestActorRef[T <: Actor](props: Props, address: String) extends LocalActor
* thrown will be available to you, while still being able to use
* become/unbecome and their message counterparts.
*/
def apply(o: Any) { actorInstance.get().apply(o) }
def apply(o: Any) { underlyingActorInstance.apply(o) }
/**
* Retrieve reference to the underlying actor, where the static type matches the factory used inside the
* constructor. Beware that this reference is discarded by the ActorRef upon restarting the actor (should this
* reference be linked to a supervisor). The old Actor may of course still be used in post-mortem assertions.
*/
def underlyingActor: T = actorInstance.get().asInstanceOf[T]
def underlyingActor: T = underlyingActorInstance.asInstanceOf[T]
override def toString = "TestActor[" + address + ":" + uuid + "]"