split up ActorCell functionality into multiple source files

- created package akka.actor.cell to hold the different traits from
  which the ActorCell cake is made
- split up by topic, but leave the message processing itself within
  ActorCell
- move ChildrenContainer into the akka.actor.cell package
- move AbstractActorCell also
- make members of the behavior traits private/protected to tighten their
  scope as much as possible => make it easier to see what’s going on
This commit is contained in:
Roland 2012-07-08 17:41:22 +02:00
parent c0dd08fa94
commit 36ac4d89de
13 changed files with 714 additions and 565 deletions

View file

@ -229,7 +229,7 @@ object SupervisorHierarchySpec {
var pingChildren = Set.empty[ActorRef]
val nextJob = Iterator.continually(Random.nextFloat match {
case x if x > 0.5
case x if x >= 0.5
// ping one child
val pick = ((x - 0.5) * 2 * idleChildren.size).toInt
val ref = idleChildren(pick)

View file

@ -2,8 +2,9 @@
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor;
package akka.actor.cell;
import akka.actor.ActorCell;
import akka.util.Unsafe;
final class AbstractActorCell {
@ -13,9 +14,9 @@ final class AbstractActorCell {
static {
try {
mailboxOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("_mailboxDoNotCallMeDirectly"));
childrenOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("_childrenRefsDoNotCallMeDirectly"));
nextNameOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("_nextNameDoNotCallMeDirectly"));
mailboxOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("akka$actor$cell$Dispatch$$_mailboxDoNotCallMeDirectly"));
childrenOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("akka$actor$cell$Children$$_childrenRefsDoNotCallMeDirectly"));
nextNameOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("akka$actor$cell$Children$$_nextNameDoNotCallMeDirectly"));
} catch(Throwable t){
throw new ExceptionInInitializerError(t);
}

View file

@ -17,6 +17,7 @@ import collection.immutable.{ TreeSet, TreeMap }
import akka.util.{ Unsafe, Duration, Helpers, NonFatal }
import java.util.concurrent.atomic.AtomicLong
import scala.collection.JavaConverters.asJavaIterableConverter
import akka.actor.cell.ChildrenContainer
//TODO: everything here for current compatibility - could be limited more
@ -214,6 +215,10 @@ private[akka] trait Cell {
* All children of this actor, including only reserved-names.
*/
def childrenRefs: ChildrenContainer
/**
* Get the stats for the named child, if that exists.
*/
def getChildByName(name: String): Option[ChildRestartStats]
/**
* Enqueue a message to be sent to the actor; may or may not actually
* schedule the actor to run, depending on which type of cell it is.
@ -256,8 +261,6 @@ private[akka] object ActorCell {
def cancel() {}
}
final val emptyReceiveTimeoutData: (Duration, Cancellable) = (Duration.Undefined, emptyCancellable)
final val emptyBehaviorStack: List[Actor.Receive] = Nil
final val emptyActorRefSet: Set[ActorRef] = TreeSet.empty
@ -266,174 +269,96 @@ private[akka] object ActorCell {
//ACTORCELL IS 64bytes and should stay that way unless very good reason not to (machine sympathy, cache line fit)
//vars don't need volatile since it's protected with the mailbox status
//Make sure that they are not read/written outside of a message processing (systemInvoke/invoke)
/**
* Everything in here is completely Akka PRIVATE. You will not find any
* supported APIs in this place. This is not the API you were looking
* for! (waves hand)
*/
private[akka] class ActorCell(
val system: ActorSystemImpl,
val self: InternalActorRef,
val props: Props,
@volatile var parent: InternalActorRef) extends UntypedActorContext with Cell {
@volatile var parent: InternalActorRef)
extends UntypedActorContext with Cell
with cell.ReceiveTimeout
with cell.Children
with cell.Dispatch
with cell.DeathWatch
with cell.FaultHandling {
import AbstractActorCell.{ childrenOffset, mailboxOffset, nextNameOffset }
import ActorCell._
import ChildrenContainer._
final def isLocal = true
final def systemImpl = system
protected final def guardian = self
protected final def lookupRoot = self
final def provider = system.provider
/*
* RECEIVE TIMEOUT
*/
var receiveTimeoutData: (Duration, Cancellable) = emptyReceiveTimeoutData
override final def receiveTimeout: Option[Duration] = receiveTimeoutData._1 match {
case Duration.Undefined None
case duration Some(duration)
}
final def setReceiveTimeout(timeout: Option[Duration]): Unit = setReceiveTimeout(timeout.getOrElse(Duration.Undefined))
override final def setReceiveTimeout(timeout: Duration): Unit =
receiveTimeoutData = (
if (Duration.Undefined == timeout || timeout.toMillis < 1) Duration.Undefined else timeout,
receiveTimeoutData._2)
final override def resetReceiveTimeout(): Unit = setReceiveTimeout(None)
/*
* CHILDREN
*/
@volatile
private var _childrenRefsDoNotCallMeDirectly: ChildrenContainer = EmptyChildrenContainer
def childrenRefs: ChildrenContainer = Unsafe.instance.getObjectVolatile(this, childrenOffset).asInstanceOf[ChildrenContainer]
final def children: Iterable[ActorRef] = childrenRefs.children
final def getChildren(): java.lang.Iterable[ActorRef] = asJavaIterableConverter(children).asJava
def actorOf(props: Props): ActorRef = makeChild(this, props, randomName(), async = false)
def actorOf(props: Props, name: String): ActorRef = makeChild(this, props, checkName(name), async = false)
private[akka] def attachChild(props: Props): ActorRef = makeChild(this, props, randomName(), async = true)
private[akka] def attachChild(props: Props, name: String): ActorRef = makeChild(this, props, checkName(name), async = true)
@volatile private var _nextNameDoNotCallMeDirectly = 0L
final protected def randomName(): String = {
@tailrec def inc(): Long = {
val current = Unsafe.instance.getLongVolatile(this, nextNameOffset)
if (Unsafe.instance.compareAndSwapLong(this, nextNameOffset, current, current + 1)) current
else inc()
}
Helpers.base64(inc())
}
final def stop(actor: ActorRef): Unit = {
val started = actor match {
case r: RepointableRef r.isStarted
case _ true
}
if (childrenRefs.getByRef(actor).isDefined && started) shallDie(this, actor)
actor.asInstanceOf[InternalActorRef].stop()
}
/*
* ACTOR STATE
*/
var currentMessage: Envelope = _
var actor: Actor = _
var currentMessage: Envelope = _
private var behaviorStack: List[Actor.Receive] = emptyBehaviorStack
var watching: Set[ActorRef] = emptyActorRefSet
var watchedBy: Set[ActorRef] = emptyActorRefSet
override final def watch(subject: ActorRef): ActorRef = subject match {
case a: InternalActorRef
if (a != self && !watching.contains(a)) {
a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
watching += a
}
a
/*
* MESSAGE PROCESSING
*/
//Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status
final def systemInvoke(message: SystemMessage): Unit = try {
message match {
case Create() create()
case Recreate(cause) faultRecreate(cause)
case Watch(watchee, watcher) addWatcher(watchee, watcher)
case Unwatch(watchee, watcher) remWatcher(watchee, watcher)
case Suspend() faultSuspend()
case Resume(inRespToFailure) faultResume(inRespToFailure)
case Terminate() terminate()
case Supervise(child) supervise(child)
case ChildTerminated(child) handleChildTerminated(child)
case NoMessage // only here to suppress warning
}
} catch {
case e @ (_: InterruptedException | NonFatal(_)) handleInvokeFailure(e, "error while processing " + message)
}
override final def unwatch(subject: ActorRef): ActorRef = subject match {
case a: InternalActorRef
if (a != self && watching.contains(a)) {
a.sendSystemMessage(Unwatch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
watching -= a
}
a
//Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status
final def invoke(messageHandle: Envelope): Unit = try {
currentMessage = messageHandle
cancelReceiveTimeout() // FIXME: leave this here???
messageHandle.message match {
case msg: AutoReceivedMessage autoReceiveMessage(messageHandle)
case msg receiveMessage(msg)
}
currentMessage = null // reset current message after successful invocation
} catch {
case e @ (_: InterruptedException | NonFatal(_)) handleInvokeFailure(e, e.getMessage)
} finally {
checkReceiveTimeout // Reschedule receive timeout
}
def autoReceiveMessage(msg: Envelope): Unit = {
if (system.settings.DebugAutoReceive)
publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg))
msg.message match {
case Failed(cause) handleFailure(sender, cause)
case t: Terminated watchedActorTerminated(t.actor); receiveMessage(t)
case Kill throw new ActorKilledException("Kill")
case PoisonPill self.stop()
case SelectParent(m) parent.tell(m, msg.sender)
case SelectChildName(name, m) for (c getChildByName(name)) c.child.tell(m, msg.sender)
case SelectChildPattern(p, m) for (c children if p.matcher(c.path.name).matches) c.tell(m, msg.sender)
}
}
final def receiveMessage(msg: Any): Unit = {
//FIXME replace with behaviorStack.head.applyOrElse(msg, unhandled) + "-optimize"
val head = behaviorStack.head
if (head.isDefinedAt(msg)) head.apply(msg) else actor.unhandled(msg)
}
/*
* MAILBOX and DISPATCHER
*/
@volatile private var _mailboxDoNotCallMeDirectly: Mailbox = _ //This must be volatile since it isn't protected by the mailbox status
@inline final def mailbox: Mailbox = Unsafe.instance.getObjectVolatile(this, mailboxOffset).asInstanceOf[Mailbox]
@tailrec final def swapMailbox(newMailbox: Mailbox): Mailbox = {
val oldMailbox = mailbox
if (!Unsafe.instance.compareAndSwapObject(this, mailboxOffset, oldMailbox, newMailbox)) swapMailbox(newMailbox)
else oldMailbox
}
final def hasMessages: Boolean = mailbox.hasMessages
final def numberOfMessages: Int = mailbox.numberOfMessages
val dispatcher: MessageDispatcher = system.dispatchers.lookup(props.dispatcher)
/**
* UntypedActorContext impl
*/
final def getDispatcher(): MessageDispatcher = dispatcher
final def isTerminated: Boolean = mailbox.isClosed
final def start(): this.type = {
/*
* Create the mailbox and enqueue the Create() message to ensure that
* this is processed before anything else.
*/
swapMailbox(dispatcher.createMailbox(this))
mailbox.setActor(this)
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
mailbox.systemEnqueue(self, Create())
// This call is expected to start off the actor by scheduling its mailbox.
dispatcher.attach(this)
this
}
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def suspend(): Unit = dispatcher.systemDispatch(this, Suspend())
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def resume(inResponseToFailure: Boolean): Unit = dispatcher.systemDispatch(this, Resume(inResponseToFailure))
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def restart(cause: Throwable): Unit = dispatcher.systemDispatch(this, Recreate(cause))
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def stop(): Unit = dispatcher.systemDispatch(this, Terminate())
def tell(message: Any, sender: ActorRef): Unit =
dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender, system))
override def sendSystemMessage(message: SystemMessage): Unit = dispatcher.systemDispatch(this, message)
/*
* ACTOR CONTEXT IMPL
* ACTOR CONTEXT IMPLEMENTATION
*/
final def sender: ActorRef = currentMessage match {
@ -458,37 +383,9 @@ private[akka] class ActorCell(
}
/*
* FAILURE HANDLING
* ACTOR INSTANCE HANDLING
*/
/* =================
* T H E R U L E S
* =================
*
* Actors can be suspended for two reasons:
* - they fail
* - their supervisor gets suspended
*
* In particular they are not suspended multiple times because of cascading
* own failures, i.e. while currentlyFailed() they do not fail again. In case
* of a restart, failures in constructor/preStart count as new failures.
*/
private def suspendNonRecursive(): Unit = dispatcher suspend this
private def resumeNonRecursive(): Unit = dispatcher resume this
/*
* have we told our supervisor that we Failed() and have not yet heard back?
* (actually: we might have heard back but not yet acted upon it, in case of
* a restart with dying children)
* might well be replaced by ref to a Cancellable in the future (see #2299)
*/
private var _failed = false
def currentlyFailed: Boolean = _failed
def setFailed(): Unit = _failed = true
def setNotFailed(): Unit = _failed = false
//This method is in charge of setting up the contextStack and create a new instance of the Actor
protected def newActor(): Actor = {
contextStack.set(this :: contextStack.get)
@ -509,311 +406,43 @@ private[akka] class ActorCell(
}
}
//Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status
final def systemInvoke(message: SystemMessage) {
def create(): Unit = if (childrenRefs.isNormal) {
try {
val created = newActor()
actor = created
created.preStart()
checkReceiveTimeout
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(created), "started (" + created + ")"))
} catch {
case NonFatal(i: InstantiationException)
throw new ActorInitializationException(self,
"""exception during creation, this problem is likely to occur because the class of the Actor you tried to create is either,
private def create(): Unit = if (isNormal) {
try {
val created = newActor()
actor = created
created.preStart()
checkReceiveTimeout
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(created), "started (" + created + ")"))
} catch {
case NonFatal(i: InstantiationException)
throw new ActorInitializationException(self,
"""exception during creation, this problem is likely to occur because the class of the Actor you tried to create is either,
a non-static inner class (in which case make it a static inner class or use Props(new ...) or Props( new UntypedActorFactory ... )
or is missing an appropriate, reachable no-args constructor.
""", i.getCause)
case NonFatal(e) throw new ActorInitializationException(self, "exception during creation", e)
}
}
def recreate(cause: Throwable): Unit =
if (childrenRefs.isNormal) {
val failedActor = actor
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(failedActor), "restarting"))
if (failedActor ne null) {
try {
// if the actor fails in preRestart, we can do nothing but log it: its best-effort
if (failedActor.context ne null) failedActor.preRestart(cause, Option(currentMessage))
} catch {
case NonFatal(e)
val ex = new PreRestartException(self, e, cause, Option(currentMessage))
publish(Error(ex, self.path.toString, clazz(failedActor), e.getMessage))
} finally {
clearActorFields(failedActor)
}
}
assert(mailbox.isSuspended, "mailbox must be suspended during restart, status=" + mailbox.status)
childrenRefs match {
case ct: TerminatingChildrenContainer
setChildrenTerminationReason(this, Recreation(cause))
case _
doRecreate(cause, failedActor)
}
} else {
// need to keep that suspend counter balanced
doResume(inResponseToFailure = false)
}
def doSuspend(): Unit = {
// done always to keep that suspend counter balanced
suspendNonRecursive()
childrenRefs.suspendChildren()
}
def doResume(inResponseToFailure: Boolean): Unit = {
// done always to keep that suspend counter balanced
// must happen atomically
try resumeNonRecursive()
finally if (inResponseToFailure) setNotFailed()
childrenRefs.resumeChildren()
}
def addWatcher(watchee: ActorRef, watcher: ActorRef): Unit = {
val watcheeSelf = watchee == self
val watcherSelf = watcher == self
if (watcheeSelf && !watcherSelf) {
if (!watchedBy.contains(watcher)) {
watchedBy += watcher
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now monitoring " + watcher))
}
} else if (!watcheeSelf && watcherSelf) {
watch(watchee)
} else {
publish(Warning(self.path.toString, clazz(actor), "BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, self)))
}
}
def remWatcher(watchee: ActorRef, watcher: ActorRef): Unit = {
val watcheeSelf = watchee == self
val watcherSelf = watcher == self
if (watcheeSelf && !watcherSelf) {
if (watchedBy.contains(watcher)) {
watchedBy -= watcher
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + watcher))
}
} else if (!watcheeSelf && watcherSelf) {
unwatch(watchee)
} else {
publish(Warning(self.path.toString, clazz(actor), "BUG: illegal Unwatch(%s,%s) for %s".format(watchee, watcher, self)))
}
}
def terminate() {
setReceiveTimeout(None)
cancelReceiveTimeout
// stop all children, which will turn childrenRefs into TerminatingChildrenContainer (if there are children)
children foreach stop
childrenRefs match {
case ct: TerminatingChildrenContainer
setChildrenTerminationReason(this, Termination)
// do not process normal messages while waiting for all children to terminate
suspendNonRecursive()
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "stopping"))
case _ doTerminate()
}
}
def supervise(child: ActorRef): Unit = if (!childrenRefs.isTerminating) {
if (childrenRefs.getByRef(child).isEmpty) addChild(this, child)
handleSupervise(child)
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child))
}
try {
message match {
case Create() create()
case Recreate(cause) recreate(cause)
case Watch(watchee, watcher) addWatcher(watchee, watcher)
case Unwatch(watchee, watcher) remWatcher(watchee, watcher)
case Suspend() doSuspend()
case Resume(inRespToFailure) doResume(inRespToFailure)
case Terminate() terminate()
case Supervise(child) supervise(child)
case ChildTerminated(child) handleChildTerminated(child)
case NoMessage // only here to suppress warning
}
} catch {
case e @ (_: InterruptedException | NonFatal(_)) handleInvokeFailure(e, "error while processing " + message)
case NonFatal(e) throw new ActorInitializationException(self, "exception during creation", e)
}
}
//Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status
final def invoke(messageHandle: Envelope): Unit = try {
currentMessage = messageHandle
cancelReceiveTimeout() // FIXME: leave this here???
messageHandle.message match {
case msg: AutoReceivedMessage autoReceiveMessage(messageHandle)
case msg receiveMessage(msg)
}
currentMessage = null // reset current message after successful invocation
} catch {
case e @ (_: InterruptedException | NonFatal(_)) handleInvokeFailure(e, e.getMessage)
} finally {
checkReceiveTimeout // Reschedule receive timeout
}
final def handleInvokeFailure(t: Throwable, message: String): Unit = {
publish(Error(t, self.path.toString, clazz(actor), message))
// prevent any further messages to be processed until the actor has been restarted
if (!currentlyFailed) {
// suspend self; these two must happen atomically
try suspendNonRecursive()
finally setFailed()
// suspend children
val skip: Set[ActorRef] = currentMessage match {
case Envelope(Failed(`t`), child) Set(child)
case _ Set.empty
}
childrenRefs.suspendChildren(skip)
// tell supervisor
t match { // Wrap InterruptedExceptions and rethrow
case _: InterruptedException parent.tell(Failed(new ActorInterruptedException(t)), self); throw t
case _ parent.tell(Failed(t), self)
}
}
}
def autoReceiveMessage(msg: Envelope): Unit = {
if (system.settings.DebugAutoReceive)
publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg))
msg.message match {
case Failed(cause) handleFailure(sender, cause)
case t: Terminated watching -= t.actor; receiveMessage(t)
case Kill throw new ActorKilledException("Kill")
case PoisonPill self.stop()
case SelectParent(m) parent.tell(m, msg.sender)
case SelectChildName(name, m) for (c childrenRefs getByName name) c.child.tell(m, msg.sender)
case SelectChildPattern(p, m) for (c children if p.matcher(c.path.name).matches) c.tell(m, msg.sender)
}
}
final def receiveMessage(msg: Any): Unit = {
//FIXME replace with behaviorStack.head.applyOrElse(msg, unhandled) + "-optimize"
val head = behaviorStack.head
if (head.isDefinedAt(msg)) head.apply(msg) else actor.unhandled(msg)
}
private def doTerminate() {
val a = actor
try if (a ne null) a.postStop()
finally try dispatcher.detach(this)
finally try parent.sendSystemMessage(ChildTerminated(self))
finally try
if (!watchedBy.isEmpty) {
val terminated = Terminated(self)(existenceConfirmed = true)
try {
watchedBy foreach {
watcher
try watcher.tell(terminated, self) catch {
case NonFatal(t) publish(Error(t, self.path.toString, clazz(a), "deathwatch"))
}
}
} finally watchedBy = emptyActorRefSet
}
finally try
if (!watching.isEmpty) {
try {
watching foreach { // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
case watchee: InternalActorRef try watchee.sendSystemMessage(Unwatch(watchee, self)) catch {
case NonFatal(t) publish(Error(t, self.path.toString, clazz(a), "deathwatch"))
}
}
} finally watching = emptyActorRefSet
}
finally {
if (system.settings.DebugLifecycle)
publish(Debug(self.path.toString, clazz(a), "stopped"))
behaviorStack = emptyBehaviorStack
clearActorFields(a)
actor = null
}
}
private def doRecreate(cause: Throwable, failedActor: Actor): Unit = try {
// must happen atomically
try resumeNonRecursive()
finally setNotFailed()
val survivors = children
val freshActor = newActor()
actor = freshActor // this must happen before postRestart has a chance to fail
if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields.
freshActor.postRestart(cause)
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(freshActor), "restarted"))
// only after parent is up and running again do restart the children which were not stopped
survivors foreach (child
try child.asInstanceOf[InternalActorRef].restart(cause)
catch {
case NonFatal(e) publish(Error(e, self.path.toString, clazz(freshActor), "restarting " + child))
})
} catch {
case NonFatal(e)
clearActorFields(actor) // in order to prevent preRestart() from happening again
handleInvokeFailure(new PostRestartException(self, e, cause), e.getMessage)
}
final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenRefs.getByRef(child) match {
case Some(stats) if (!actor.supervisorStrategy.handleFailure(this, child, cause, stats, childrenRefs.stats)) throw cause
case None publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child))
}
final def handleChildTerminated(child: ActorRef): Unit = try {
childrenRefs match {
case TerminatingChildrenContainer(_, _, reason)
val n = removeChild(this, child)
actor.supervisorStrategy.handleChildTerminated(this, child, children)
if (!n.isInstanceOf[TerminatingChildrenContainer]) reason match {
case Recreation(cause) doRecreate(cause, actor) // doRecreate since this is the continuation of "recreate"
case Termination doTerminate()
case _
}
case _
removeChild(this, child)
actor.supervisorStrategy.handleChildTerminated(this, child, children)
}
} catch {
case NonFatal(e) handleInvokeFailure(e, "handleChildTerminated failed")
private def supervise(child: ActorRef): Unit = if (!isTerminating) {
addChild(child)
handleSupervise(child)
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child))
}
// future extension point
protected def handleSupervise(child: ActorRef): Unit = child match {
case r: RepointableActorRef r.activate()
case _
}
final def checkReceiveTimeout() {
val recvtimeout = receiveTimeoutData
if (Duration.Undefined != recvtimeout._1 && !mailbox.hasMessages) {
recvtimeout._2.cancel() //Cancel any ongoing future
//Only reschedule if desired and there are currently no more messages to be processed
receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(recvtimeout._1, self, ReceiveTimeout))
} else cancelReceiveTimeout()
}
final def cancelReceiveTimeout(): Unit =
if (receiveTimeoutData._2 ne emptyCancellable) {
receiveTimeoutData._2.cancel()
receiveTimeoutData = (receiveTimeoutData._1, emptyCancellable)
}
final def clearActorFields(actorInstance: Actor): Unit = {
final protected def clearActorFields(actorInstance: Actor): Unit = {
setActorFields(actorInstance, context = null, self = system.deadLetters)
currentMessage = null
behaviorStack = emptyBehaviorStack
}
final def setActorFields(actorInstance: Actor, context: ActorContext, self: ActorRef) {
final protected def setActorFields(actorInstance: Actor, context: ActorContext, self: ActorRef) {
@tailrec
def lookupAndSetField(clazz: Class[_], actor: Actor, name: String, value: Any): Boolean = {
val success = try {
@ -839,8 +468,8 @@ private[akka] class ActorCell(
}
// logging is not the main purpose, and if it fails theres nothing we can do
private final def publish(e: LogEvent): Unit = try system.eventStream.publish(e) catch { case NonFatal(_) }
protected final def publish(e: LogEvent): Unit = try system.eventStream.publish(e) catch { case NonFatal(_) }
private final def clazz(o: AnyRef): Class[_] = if (o eq null) this.getClass else o.getClass
protected final def clazz(o: AnyRef): Class[_] = if (o eq null) this.getClass else o.getClass
}

View file

@ -307,7 +307,7 @@ private[akka] class LocalActorRef private[akka] (
* to inject synthetic actor paths like /temp.
*/
protected def getSingleChild(name: String): InternalActorRef =
actorCell.childrenRefs.getByName(name) match {
actorCell.getChildByName(name) match {
case Some(crs) crs.child.asInstanceOf[InternalActorRef]
case None Nobody
}

View file

@ -15,6 +15,7 @@ import akka.util._
import akka.util.internal.{ HashedWheelTimer, ConcurrentIdentityHashMap }
import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException }
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.cell.ChildrenContainer
object ActorSystem {

View file

@ -230,7 +230,8 @@ abstract class SupervisorStrategy {
/**
* This method is called after the child has been removed from the set of children.
* It does not need to do anything special.
* It does not need to do anything special. Exceptions thrown from this method
* do NOT make the actor fail if this happens during termination.
*/
def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit

View file

@ -16,6 +16,7 @@ import akka.dispatch.MessageDispatcher
import java.util.concurrent.locks.ReentrantLock
import akka.event.Logging.Warning
import scala.collection.mutable.Queue
import akka.actor.cell.ChildrenContainer
/**
* This actor ref starts out with some dummy cell (by default just enqueuing
@ -102,7 +103,7 @@ private[akka] class RepointableActorRef(
case ".." getParent.getChild(name)
case "" getChild(name)
case other
underlying.childrenRefs.getByName(other) match {
underlying.getChildByName(other) match {
case Some(crs) crs.child.asInstanceOf[InternalActorRef].getChild(name)
case None Nobody
}
@ -176,6 +177,7 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep
def isTerminated: Boolean = false
def parent: InternalActorRef = supervisor
def childrenRefs: ChildrenContainer = ChildrenContainer.EmptyChildrenContainer
def getChildByName(name: String): Option[ChildRestartStats] = None
def tell(message: Any, sender: ActorRef): Unit = {
lock.lock()
try {

View file

@ -0,0 +1,181 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.cell
import scala.annotation.tailrec
import scala.collection.JavaConverters.asJavaIterableConverter
import akka.actor.{ RepointableRef, Props, InternalActorRef, ActorRef, ActorCell }
import akka.util.{ Unsafe, Helpers }
import akka.actor.InvalidActorNameException
import akka.serialization.SerializationExtension
import akka.util.NonFatal
import akka.actor.NoSerializationVerificationNeeded
import akka.actor.ActorPath
import akka.actor.ChildRestartStats
import akka.actor.ChildRestartStats
import akka.actor.ChildRestartStats
import akka.actor.ChildRestartStats
trait Children { this: ActorCell
import ChildrenContainer._
@volatile
private var _childrenRefsDoNotCallMeDirectly: ChildrenContainer = EmptyChildrenContainer
def childrenRefs: ChildrenContainer =
Unsafe.instance.getObjectVolatile(this, AbstractActorCell.childrenOffset).asInstanceOf[ChildrenContainer]
final def children: Iterable[ActorRef] = childrenRefs.children
final def getChildren(): java.lang.Iterable[ActorRef] = children.asJava
def actorOf(props: Props): ActorRef = makeChild(this, props, randomName(), async = false)
def actorOf(props: Props, name: String): ActorRef = makeChild(this, props, checkName(name), async = false)
private[akka] def attachChild(props: Props): ActorRef = makeChild(this, props, randomName(), async = true)
private[akka] def attachChild(props: Props, name: String): ActorRef = makeChild(this, props, checkName(name), async = true)
@volatile private var _nextNameDoNotCallMeDirectly = 0L
final protected def randomName(): String = {
@tailrec def inc(): Long = {
val current = Unsafe.instance.getLongVolatile(this, AbstractActorCell.nextNameOffset)
if (Unsafe.instance.compareAndSwapLong(this, AbstractActorCell.nextNameOffset, current, current + 1)) current
else inc()
}
Helpers.base64(inc())
}
final def stop(actor: ActorRef): Unit = {
val started = actor match {
case r: RepointableRef r.isStarted
case _ true
}
if (childrenRefs.getByRef(actor).isDefined && started) shallDie(actor)
actor.asInstanceOf[InternalActorRef].stop()
}
/*
* low level CAS helpers
*/
@inline private def swapChildrenRefs(oldChildren: ChildrenContainer, newChildren: ChildrenContainer): Boolean =
Unsafe.instance.compareAndSwapObject(this, AbstractActorCell.childrenOffset, oldChildren, newChildren)
@tailrec final protected def reserveChild(name: String): Boolean = {
val c = childrenRefs
swapChildrenRefs(c, c.reserve(name)) || reserveChild(name)
}
@tailrec final protected def unreserveChild(name: String): Boolean = {
val c = childrenRefs
swapChildrenRefs(c, c.unreserve(name)) || unreserveChild(name)
}
final protected def addChild(ref: ActorRef): Boolean = {
@tailrec def rec(): Boolean = {
val c = childrenRefs
swapChildrenRefs(c, c.add(ref)) || rec()
}
if (childrenRefs.getByRef(ref).isEmpty) rec() else false
}
@tailrec final protected def shallDie(ref: ActorRef): Boolean = {
val c = childrenRefs
swapChildrenRefs(c, c.shallDie(ref)) || shallDie(ref)
}
@tailrec final private def removeChild(ref: ActorRef): ChildrenContainer = {
val c = childrenRefs
val n = c.remove(ref)
if (swapChildrenRefs(c, n)) n
else removeChild(ref)
}
@tailrec final protected def setChildrenTerminationReason(reason: ChildrenContainer.SuspendReason): Boolean = {
childrenRefs match {
case c: ChildrenContainer.TerminatingChildrenContainer
swapChildrenRefs(c, c.copy(reason = reason)) || setChildrenTerminationReason(reason)
case _ false
}
}
/*
* ActorCell-internal API
*/
protected def isNormal = childrenRefs.isNormal
protected def isTerminating = childrenRefs.isTerminating
protected def suspendChildren(skip: Set[ActorRef] = Set.empty): Unit =
childrenRefs.stats collect {
case ChildRestartStats(child, _, _) if !(skip contains child) child
} foreach (_.asInstanceOf[InternalActorRef].suspend())
protected def resumeChildren(): Unit =
childrenRefs.stats foreach (_.child.asInstanceOf[InternalActorRef].resume(inResponseToFailure = false))
def getChildByName(name: String): Option[ChildRestartStats] = childrenRefs.getByName(name)
protected def getChildByRef(ref: ActorRef): Option[ChildRestartStats] = childrenRefs.getByRef(ref)
protected def getAllChildStats: Iterable[ChildRestartStats] = childrenRefs.stats
protected def removeChildAndGetStateChange(child: ActorRef): Option[SuspendReason] = {
childrenRefs match {
case TerminatingChildrenContainer(_, _, reason)
val n = removeChild(child)
if (!n.isInstanceOf[TerminatingChildrenContainer]) Some(reason) else None
case _ removeChild(child); None
}
}
/*
* Private helpers
*/
private def checkName(name: String): String = {
import ActorPath.ElementRegex
name match {
case null throw new InvalidActorNameException("actor name must not be null")
case "" throw new InvalidActorNameException("actor name must not be empty")
case ElementRegex() name
case _ throw new InvalidActorNameException("illegal actor name '" + name + "', must conform to " + ElementRegex)
}
}
private def makeChild(cell: ActorCell, props: Props, name: String, async: Boolean): ActorRef = {
if (cell.system.settings.SerializeAllCreators && !props.creator.isInstanceOf[NoSerializationVerificationNeeded]) {
val ser = SerializationExtension(cell.system)
ser.serialize(props.creator) match {
case Left(t) throw t
case Right(bytes) ser.deserialize(bytes, props.creator.getClass) match {
case Left(t) throw t
case _ //All good
}
}
}
/*
* in case we are currently terminating, fail external attachChild requests
* (internal calls cannot happen anyway because we are suspended)
*/
if (cell.childrenRefs.isTerminating) throw new IllegalStateException("cannot create children while terminating or terminated")
else {
reserveChild(name)
// this name will either be unreserved or overwritten with a real child below
val actor =
try {
cell.provider.actorOf(cell.systemImpl, props, cell.self, cell.self.path / name,
systemService = false, deploy = None, lookupDeploy = true, async = async)
} catch {
case NonFatal(e)
unreserveChild(name)
throw e
}
addChild(actor)
actor
}
}
}

View file

@ -2,13 +2,24 @@
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
package akka.actor.cell
import scala.collection.immutable.TreeMap
import scala.annotation.tailrec
import akka.util.Unsafe
import akka.serialization.SerializationExtension
import akka.util.NonFatal
import akka.actor.ActorPath.ElementRegex
import akka.actor.ActorCell
import akka.actor.ActorRef
import akka.actor.ChildNameReserved
import akka.actor.ChildRestartStats
import akka.actor.ChildStats
import akka.actor.InternalActorRef
import akka.actor.InvalidActorNameException
import akka.actor.NoSerializationVerificationNeeded
import akka.actor.Props
import scala.annotation.tailrec
/**
* INTERNAL API
@ -33,13 +44,6 @@ private[akka] trait ChildrenContainer {
def isTerminating: Boolean = false
def isNormal: Boolean = true
def suspendChildren(skip: Set[ActorRef] = Set.empty): Unit =
stats collect {
case ChildRestartStats(child, _, _) if !(skip contains child) child
} foreach (_.asInstanceOf[InternalActorRef].suspend())
def resumeChildren(): Unit = stats foreach (_.child.asInstanceOf[InternalActorRef].suspend())
}
/**
@ -50,88 +54,6 @@ private[akka] trait ChildrenContainer {
*/
private[akka] object ChildrenContainer {
// low level CAS helpers
@inline private def swapChildrenRefs(cell: ActorCell, oldChildren: ChildrenContainer, newChildren: ChildrenContainer): Boolean =
Unsafe.instance.compareAndSwapObject(cell, AbstractActorCell.childrenOffset, oldChildren, newChildren)
@tailrec final def reserveChild(cell: ActorCell, name: String): Boolean = {
val c = cell.childrenRefs
swapChildrenRefs(cell, c, c.reserve(name)) || reserveChild(cell, name)
}
@tailrec final def unreserveChild(cell: ActorCell, name: String): Boolean = {
val c = cell.childrenRefs
swapChildrenRefs(cell, c, c.unreserve(name)) || unreserveChild(cell, name)
}
@tailrec final def addChild(cell: ActorCell, ref: ActorRef): Boolean = {
val c = cell.childrenRefs
swapChildrenRefs(cell, c, c.add(ref)) || addChild(cell, ref)
}
@tailrec final def shallDie(cell: ActorCell, ref: ActorRef): Boolean = {
val c = cell.childrenRefs
swapChildrenRefs(cell, c, c.shallDie(ref)) || shallDie(cell, ref)
}
@tailrec final def removeChild(cell: ActorCell, ref: ActorRef): ChildrenContainer = {
val c = cell.childrenRefs
val n = c.remove(ref)
if (swapChildrenRefs(cell, c, n)) n
else removeChild(cell, ref)
}
@tailrec final def setChildrenTerminationReason(cell: ActorCell, reason: ChildrenContainer.SuspendReason): Boolean = {
cell.childrenRefs match {
case c: ChildrenContainer.TerminatingChildrenContainer
swapChildrenRefs(cell, c, c.copy(reason = reason)) || setChildrenTerminationReason(cell, reason)
case _ false
}
}
def checkName(name: String): String = {
import ActorPath.ElementRegex
name match {
case null throw new InvalidActorNameException("actor name must not be null")
case "" throw new InvalidActorNameException("actor name must not be empty")
case ElementRegex() name
case _ throw new InvalidActorNameException("illegal actor name '" + name + "', must conform to " + ElementRegex)
}
}
def makeChild(cell: ActorCell, props: Props, name: String, async: Boolean): ActorRef = {
if (cell.system.settings.SerializeAllCreators && !props.creator.isInstanceOf[NoSerializationVerificationNeeded]) {
val ser = SerializationExtension(cell.system)
ser.serialize(props.creator) match {
case Left(t) throw t
case Right(bytes) ser.deserialize(bytes, props.creator.getClass) match {
case Left(t) throw t
case _ //All good
}
}
}
/*
* in case we are currently terminating, fail external attachChild requests
* (internal calls cannot happen anyway because we are suspended)
*/
if (cell.childrenRefs.isTerminating) throw new IllegalStateException("cannot create children while terminating or terminated")
else {
reserveChild(cell, name)
// this name will either be unreserved or overwritten with a real child below
val actor =
try {
cell.provider.actorOf(cell.systemImpl, props, cell.self, cell.self.path / name,
systemService = false, deploy = None, lookupDeploy = true, async = async)
} catch {
case NonFatal(e)
unreserveChild(cell, name)
throw e
}
addChild(cell, actor)
actor
}
}
sealed trait SuspendReason
case object UserRequest extends SuspendReason
case class Recreation(cause: Throwable) extends SuspendReason

View file

@ -0,0 +1,93 @@
package akka.actor.cell
import akka.actor.{ InternalActorRef, ActorRef, ActorCell }
import akka.dispatch.{ Watch, Unwatch }
import akka.event.Logging._
import akka.util.NonFatal
import akka.actor.Terminated
import akka.actor.Actor
trait DeathWatch { this: ActorCell
private var watching: Set[ActorRef] = ActorCell.emptyActorRefSet
private var watchedBy: Set[ActorRef] = ActorCell.emptyActorRefSet
override final def watch(subject: ActorRef): ActorRef = subject match {
case a: InternalActorRef
if (a != self && !watching.contains(a)) {
a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
watching += a
}
a
}
override final def unwatch(subject: ActorRef): ActorRef = subject match {
case a: InternalActorRef
if (a != self && watching.contains(a)) {
a.sendSystemMessage(Unwatch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
watching -= a
}
a
}
protected def watchedActorTerminated(ref: ActorRef): Unit = watching -= ref
protected def tellWatchersWeDied(actor: Actor): Unit = {
if (!watchedBy.isEmpty) {
val terminated = Terminated(self)(existenceConfirmed = true)
try {
watchedBy foreach {
watcher
try watcher.tell(terminated, self) catch {
case NonFatal(t) publish(Error(t, self.path.toString, clazz(actor), "deathwatch"))
}
}
} finally watchedBy = ActorCell.emptyActorRefSet
}
}
protected def unwatchWatchedActors(actor: Actor): Unit = {
if (!watching.isEmpty) {
try {
watching foreach { // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
case watchee: InternalActorRef try watchee.sendSystemMessage(Unwatch(watchee, self)) catch {
case NonFatal(t) publish(Error(t, self.path.toString, clazz(actor), "deathwatch"))
}
}
} finally watching = ActorCell.emptyActorRefSet
}
}
protected def addWatcher(watchee: ActorRef, watcher: ActorRef): Unit = {
val watcheeSelf = watchee == self
val watcherSelf = watcher == self
if (watcheeSelf && !watcherSelf) {
if (!watchedBy.contains(watcher)) {
watchedBy += watcher
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now monitoring " + watcher))
}
} else if (!watcheeSelf && watcherSelf) {
watch(watchee)
} else {
publish(Warning(self.path.toString, clazz(actor), "BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, self)))
}
}
protected def remWatcher(watchee: ActorRef, watcher: ActorRef): Unit = {
val watcheeSelf = watchee == self
val watcherSelf = watcher == self
if (watcheeSelf && !watcherSelf) {
if (watchedBy.contains(watcher)) {
watchedBy -= watcher
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + watcher))
}
} else if (!watcheeSelf && watcherSelf) {
unwatch(watchee)
} else {
publish(Warning(self.path.toString, clazz(actor), "BUG: illegal Unwatch(%s,%s) for %s".format(watchee, watcher, self)))
}
}
}

View file

@ -0,0 +1,81 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.cell
import scala.annotation.tailrec
import akka.actor.ActorRef
import akka.dispatch.SystemMessage
import akka.util.Unsafe
import akka.dispatch.MessageDispatcher
import akka.dispatch.Suspend
import akka.dispatch.Recreate
import akka.actor.ActorCell
import akka.dispatch.Terminate
import akka.dispatch.Envelope
import akka.dispatch.Resume
import akka.dispatch.Mailbox
import akka.dispatch.Create
trait Dispatch { this: ActorCell
@volatile private var _mailboxDoNotCallMeDirectly: Mailbox = _ //This must be volatile since it isn't protected by the mailbox status
@inline final def mailbox: Mailbox = Unsafe.instance.getObjectVolatile(this, AbstractActorCell.mailboxOffset).asInstanceOf[Mailbox]
@tailrec final def swapMailbox(newMailbox: Mailbox): Mailbox = {
val oldMailbox = mailbox
if (!Unsafe.instance.compareAndSwapObject(this, AbstractActorCell.mailboxOffset, oldMailbox, newMailbox)) swapMailbox(newMailbox)
else oldMailbox
}
final def hasMessages: Boolean = mailbox.hasMessages
final def numberOfMessages: Int = mailbox.numberOfMessages
val dispatcher: MessageDispatcher = system.dispatchers.lookup(props.dispatcher)
/**
* UntypedActorContext impl
*/
final def getDispatcher(): MessageDispatcher = dispatcher
final def isTerminated: Boolean = mailbox.isClosed
final def start(): this.type = {
/*
* Create the mailbox and enqueue the Create() message to ensure that
* this is processed before anything else.
*/
swapMailbox(dispatcher.createMailbox(this))
mailbox.setActor(this)
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
mailbox.systemEnqueue(self, Create())
// This call is expected to start off the actor by scheduling its mailbox.
dispatcher.attach(this)
this
}
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def suspend(): Unit = dispatcher.systemDispatch(this, Suspend())
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def resume(inResponseToFailure: Boolean): Unit = dispatcher.systemDispatch(this, Resume(inResponseToFailure))
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def restart(cause: Throwable): Unit = dispatcher.systemDispatch(this, Recreate(cause))
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def stop(): Unit = dispatcher.systemDispatch(this, Terminate())
def tell(message: Any, sender: ActorRef): Unit =
dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender, system))
override def sendSystemMessage(message: SystemMessage): Unit = dispatcher.systemDispatch(this, message)
}

View file

@ -0,0 +1,184 @@
package akka.actor.cell
import akka.actor.PreRestartException
import akka.actor.ActorCell
import akka.util.NonFatal
import akka.actor.ActorRef
import akka.actor.PostRestartException
import akka.actor.Actor
import akka.dispatch.Envelope
import akka.dispatch.ChildTerminated
import akka.actor.Failed
import akka.actor.InternalActorRef
import akka.event.Logging._
import akka.actor.ActorInterruptedException
trait FaultHandling { this: ActorCell
/* =================
* T H E R U L E S
* =================
*
* Actors can be suspended for two reasons:
* - they fail
* - their supervisor gets suspended
*
* In particular they are not suspended multiple times because of cascading
* own failures, i.e. while currentlyFailed() they do not fail again. In case
* of a restart, failures in constructor/preStart count as new failures.
*/
private def suspendNonRecursive(): Unit = dispatcher suspend this
private def resumeNonRecursive(): Unit = dispatcher resume this
/*
* have we told our supervisor that we Failed() and have not yet heard back?
* (actually: we might have heard back but not yet acted upon it, in case of
* a restart with dying children)
* might well be replaced by ref to a Cancellable in the future (see #2299)
*/
private var _failed = false
private def currentlyFailed: Boolean = _failed
private def setFailed(): Unit = _failed = true
private def setNotFailed(): Unit = _failed = false
protected def faultRecreate(cause: Throwable): Unit =
if (isNormal) {
val failedActor = actor
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(failedActor), "restarting"))
if (failedActor ne null) {
try {
// if the actor fails in preRestart, we can do nothing but log it: its best-effort
if (failedActor.context ne null) failedActor.preRestart(cause, Option(currentMessage))
} catch {
case NonFatal(e)
val ex = new PreRestartException(self, e, cause, Option(currentMessage))
publish(Error(ex, self.path.toString, clazz(failedActor), e.getMessage))
} finally {
clearActorFields(failedActor)
}
}
assert(mailbox.isSuspended, "mailbox must be suspended during restart, status=" + mailbox.status)
if (!setChildrenTerminationReason(ChildrenContainer.Recreation(cause))) finishRecreate(cause, failedActor)
} else {
// need to keep that suspend counter balanced
faultResume(inResponseToFailure = false)
}
protected def faultSuspend(): Unit = {
// done always to keep that suspend counter balanced
suspendNonRecursive()
suspendChildren()
}
protected def faultResume(inResponseToFailure: Boolean): Unit = {
// done always to keep that suspend counter balanced
// must happen atomically
try resumeNonRecursive()
finally if (inResponseToFailure) setNotFailed()
resumeChildren()
}
protected def terminate() {
setReceiveTimeout(None)
cancelReceiveTimeout
// stop all children, which will turn childrenRefs into TerminatingChildrenContainer (if there are children)
children foreach stop
if (setChildrenTerminationReason(ChildrenContainer.Termination)) {
// do not process normal messages while waiting for all children to terminate
suspendNonRecursive()
// do not propagate failures during shutdown to the supervisor
setFailed()
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "stopping"))
} else finishTerminate()
}
final def handleInvokeFailure(t: Throwable, message: String): Unit = {
publish(Error(t, self.path.toString, clazz(actor), message))
// prevent any further messages to be processed until the actor has been restarted
if (!currentlyFailed) try {
suspendNonRecursive()
setFailed()
// suspend children
val skip: Set[ActorRef] = currentMessage match {
case Envelope(Failed(`t`), child) Set(child)
case _ Set.empty
}
suspendChildren(skip)
// tell supervisor
t match { // Wrap InterruptedExceptions and rethrow
case _: InterruptedException parent.tell(Failed(new ActorInterruptedException(t)), self); throw t
case _ parent.tell(Failed(t), self)
}
} catch {
case NonFatal(e)
publish(Error(e, self.path.toString, clazz(actor), "emergency stop: exception in failure handling"))
try children foreach stop
finally finishTerminate()
}
}
private def finishTerminate() {
val a = actor
try if (a ne null) a.postStop()
finally try dispatcher.detach(this)
finally try parent.sendSystemMessage(ChildTerminated(self))
finally try tellWatchersWeDied(a)
finally try unwatchWatchedActors(a)
finally {
if (system.settings.DebugLifecycle)
publish(Debug(self.path.toString, clazz(a), "stopped"))
clearActorFields(a)
actor = null
}
}
private def finishRecreate(cause: Throwable, failedActor: Actor): Unit = try {
try resumeNonRecursive()
finally setNotFailed() // must happen in any case, so that failure is propagated
val survivors = children
val freshActor = newActor()
actor = freshActor // this must happen before postRestart has a chance to fail
if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields.
freshActor.postRestart(cause)
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(freshActor), "restarted"))
// only after parent is up and running again do restart the children which were not stopped
survivors foreach (child
try child.asInstanceOf[InternalActorRef].restart(cause)
catch {
case NonFatal(e) publish(Error(e, self.path.toString, clazz(freshActor), "restarting " + child))
})
} catch {
case NonFatal(e)
clearActorFields(actor) // in order to prevent preRestart() from happening again
handleInvokeFailure(new PostRestartException(self, e, cause), e.getMessage)
}
final protected def handleFailure(child: ActorRef, cause: Throwable): Unit = getChildByRef(child) match {
case Some(stats) if (!actor.supervisorStrategy.handleFailure(this, child, cause, stats, getAllChildStats)) throw cause
case None publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child))
}
final protected def handleChildTerminated(child: ActorRef): Unit = try {
val status = removeChildAndGetStateChange(child)
actor.supervisorStrategy.handleChildTerminated(this, child, children)
/*
* if the removal changed the state of the (terminating) children container,
* then we are continuing the previously suspended recreate/terminate action
*/
status match {
case Some(ChildrenContainer.Recreation(cause)) finishRecreate(cause, actor)
case Some(ChildrenContainer.Termination) finishTerminate()
case _
}
} catch {
case NonFatal(e) handleInvokeFailure(e, "handleChildTerminated failed")
}
}

View file

@ -0,0 +1,54 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.cell
import ReceiveTimeout.emptyReceiveTimeoutData
import akka.actor.ActorCell
import akka.actor.ActorCell.emptyCancellable
import akka.actor.Cancellable
import akka.util.Duration
object ReceiveTimeout {
final val emptyReceiveTimeoutData: (Duration, Cancellable) = (Duration.Undefined, ActorCell.emptyCancellable)
}
trait ReceiveTimeout { this: ActorCell
import ReceiveTimeout._
import ActorCell._
private var receiveTimeoutData: (Duration, Cancellable) = emptyReceiveTimeoutData
final def receiveTimeout: Option[Duration] = receiveTimeoutData._1 match {
case Duration.Undefined None
case duration Some(duration)
}
final def setReceiveTimeout(timeout: Option[Duration]): Unit = setReceiveTimeout(timeout.getOrElse(Duration.Undefined))
final def setReceiveTimeout(timeout: Duration): Unit =
receiveTimeoutData = (
if (Duration.Undefined == timeout || timeout.toMillis < 1) Duration.Undefined else timeout,
receiveTimeoutData._2)
final def resetReceiveTimeout(): Unit = setReceiveTimeout(None)
final def checkReceiveTimeout() {
val recvtimeout = receiveTimeoutData
if (Duration.Undefined != recvtimeout._1 && !mailbox.hasMessages) {
recvtimeout._2.cancel() //Cancel any ongoing future
//Only reschedule if desired and there are currently no more messages to be processed
receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(recvtimeout._1, self, akka.actor.ReceiveTimeout))
} else cancelReceiveTimeout()
}
final def cancelReceiveTimeout(): Unit =
if (receiveTimeoutData._2 ne emptyCancellable) {
receiveTimeoutData._2.cancel()
receiveTimeoutData = (receiveTimeoutData._1, emptyCancellable)
}
}