From 36ac4d89de1ce91ba1f4e317026ef1c74394b658 Mon Sep 17 00:00:00 2001 From: Roland Date: Sun, 8 Jul 2012 17:41:22 +0200 Subject: [PATCH] split up ActorCell functionality into multiple source files MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - created package akka.actor.cell to hold the different traits from which the ActorCell cake is made - split up by topic, but leave the message processing itself within ActorCell - move ChildrenContainer into the akka.actor.cell package - move AbstractActorCell also - make members of the behavior traits private/protected to tighten their scope as much as possible => make it easier to see what’s going on --- .../akka/actor/SupervisorHierarchySpec.scala | 2 +- .../actor/{ => cell}/AbstractActorCell.java | 9 +- .../src/main/scala/akka/actor/ActorCell.scala | 563 +++--------------- .../src/main/scala/akka/actor/ActorRef.scala | 2 +- .../main/scala/akka/actor/ActorSystem.scala | 1 + .../main/scala/akka/actor/FaultHandling.scala | 3 +- .../akka/actor/RepointableActorRef.scala | 4 +- .../main/scala/akka/actor/cell/Children.scala | 181 ++++++ .../actor/{ => cell}/ChildrenContainer.scala | 102 +--- .../scala/akka/actor/cell/DeathWatch.scala | 93 +++ .../main/scala/akka/actor/cell/Dispatch.scala | 81 +++ .../scala/akka/actor/cell/FaultHandling.scala | 184 ++++++ .../akka/actor/cell/ReceiveTimeout.scala | 54 ++ 13 files changed, 714 insertions(+), 565 deletions(-) rename akka-actor/src/main/java/akka/actor/{ => cell}/AbstractActorCell.java (63%) create mode 100644 akka-actor/src/main/scala/akka/actor/cell/Children.scala rename akka-actor/src/main/scala/akka/actor/{ => cell}/ChildrenContainer.scala (67%) create mode 100644 akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala create mode 100644 akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala create mode 100644 akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala create mode 100644 akka-actor/src/main/scala/akka/actor/cell/ReceiveTimeout.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index 42eccf2a81..31b046486d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -229,7 +229,7 @@ object SupervisorHierarchySpec { var pingChildren = Set.empty[ActorRef] val nextJob = Iterator.continually(Random.nextFloat match { - case x if x > 0.5 ⇒ + case x if x >= 0.5 ⇒ // ping one child val pick = ((x - 0.5) * 2 * idleChildren.size).toInt val ref = idleChildren(pick) diff --git a/akka-actor/src/main/java/akka/actor/AbstractActorCell.java b/akka-actor/src/main/java/akka/actor/cell/AbstractActorCell.java similarity index 63% rename from akka-actor/src/main/java/akka/actor/AbstractActorCell.java rename to akka-actor/src/main/java/akka/actor/cell/AbstractActorCell.java index 95fb7368bc..2d8c4fbc1e 100644 --- a/akka-actor/src/main/java/akka/actor/AbstractActorCell.java +++ b/akka-actor/src/main/java/akka/actor/cell/AbstractActorCell.java @@ -2,8 +2,9 @@ * Copyright (C) 2009-2012 Typesafe Inc. */ -package akka.actor; +package akka.actor.cell; +import akka.actor.ActorCell; import akka.util.Unsafe; final class AbstractActorCell { @@ -13,9 +14,9 @@ final class AbstractActorCell { static { try { - mailboxOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("_mailboxDoNotCallMeDirectly")); - childrenOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("_childrenRefsDoNotCallMeDirectly")); - nextNameOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("_nextNameDoNotCallMeDirectly")); + mailboxOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("akka$actor$cell$Dispatch$$_mailboxDoNotCallMeDirectly")); + childrenOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("akka$actor$cell$Children$$_childrenRefsDoNotCallMeDirectly")); + nextNameOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("akka$actor$cell$Children$$_nextNameDoNotCallMeDirectly")); } catch(Throwable t){ throw new ExceptionInInitializerError(t); } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 266aeff4b2..5aac54dce1 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -17,6 +17,7 @@ import collection.immutable.{ TreeSet, TreeMap } import akka.util.{ Unsafe, Duration, Helpers, NonFatal } import java.util.concurrent.atomic.AtomicLong import scala.collection.JavaConverters.asJavaIterableConverter +import akka.actor.cell.ChildrenContainer //TODO: everything here for current compatibility - could be limited more @@ -214,6 +215,10 @@ private[akka] trait Cell { * All children of this actor, including only reserved-names. */ def childrenRefs: ChildrenContainer + /** + * Get the stats for the named child, if that exists. + */ + def getChildByName(name: String): Option[ChildRestartStats] /** * Enqueue a message to be sent to the actor; may or may not actually * schedule the actor to run, depending on which type of cell it is. @@ -256,8 +261,6 @@ private[akka] object ActorCell { def cancel() {} } - final val emptyReceiveTimeoutData: (Duration, Cancellable) = (Duration.Undefined, emptyCancellable) - final val emptyBehaviorStack: List[Actor.Receive] = Nil final val emptyActorRefSet: Set[ActorRef] = TreeSet.empty @@ -266,174 +269,96 @@ private[akka] object ActorCell { //ACTORCELL IS 64bytes and should stay that way unless very good reason not to (machine sympathy, cache line fit) //vars don't need volatile since it's protected with the mailbox status //Make sure that they are not read/written outside of a message processing (systemInvoke/invoke) +/** + * Everything in here is completely Akka PRIVATE. You will not find any + * supported APIs in this place. This is not the API you were looking + * for! (waves hand) + */ private[akka] class ActorCell( val system: ActorSystemImpl, val self: InternalActorRef, val props: Props, - @volatile var parent: InternalActorRef) extends UntypedActorContext with Cell { + @volatile var parent: InternalActorRef) + extends UntypedActorContext with Cell + with cell.ReceiveTimeout + with cell.Children + with cell.Dispatch + with cell.DeathWatch + with cell.FaultHandling { - import AbstractActorCell.{ childrenOffset, mailboxOffset, nextNameOffset } import ActorCell._ - import ChildrenContainer._ final def isLocal = true final def systemImpl = system - protected final def guardian = self - protected final def lookupRoot = self - final def provider = system.provider - /* - * RECEIVE TIMEOUT - */ - - var receiveTimeoutData: (Duration, Cancellable) = emptyReceiveTimeoutData - - override final def receiveTimeout: Option[Duration] = receiveTimeoutData._1 match { - case Duration.Undefined ⇒ None - case duration ⇒ Some(duration) - } - - final def setReceiveTimeout(timeout: Option[Duration]): Unit = setReceiveTimeout(timeout.getOrElse(Duration.Undefined)) - - override final def setReceiveTimeout(timeout: Duration): Unit = - receiveTimeoutData = ( - if (Duration.Undefined == timeout || timeout.toMillis < 1) Duration.Undefined else timeout, - receiveTimeoutData._2) - - final override def resetReceiveTimeout(): Unit = setReceiveTimeout(None) - - /* - * CHILDREN - */ - - @volatile - private var _childrenRefsDoNotCallMeDirectly: ChildrenContainer = EmptyChildrenContainer - - def childrenRefs: ChildrenContainer = Unsafe.instance.getObjectVolatile(this, childrenOffset).asInstanceOf[ChildrenContainer] - - final def children: Iterable[ActorRef] = childrenRefs.children - final def getChildren(): java.lang.Iterable[ActorRef] = asJavaIterableConverter(children).asJava - - def actorOf(props: Props): ActorRef = makeChild(this, props, randomName(), async = false) - def actorOf(props: Props, name: String): ActorRef = makeChild(this, props, checkName(name), async = false) - private[akka] def attachChild(props: Props): ActorRef = makeChild(this, props, randomName(), async = true) - private[akka] def attachChild(props: Props, name: String): ActorRef = makeChild(this, props, checkName(name), async = true) - - @volatile private var _nextNameDoNotCallMeDirectly = 0L - final protected def randomName(): String = { - @tailrec def inc(): Long = { - val current = Unsafe.instance.getLongVolatile(this, nextNameOffset) - if (Unsafe.instance.compareAndSwapLong(this, nextNameOffset, current, current + 1)) current - else inc() - } - Helpers.base64(inc()) - } - - final def stop(actor: ActorRef): Unit = { - val started = actor match { - case r: RepointableRef ⇒ r.isStarted - case _ ⇒ true - } - if (childrenRefs.getByRef(actor).isDefined && started) shallDie(this, actor) - actor.asInstanceOf[InternalActorRef].stop() - } - - /* - * ACTOR STATE - */ - - var currentMessage: Envelope = _ var actor: Actor = _ + var currentMessage: Envelope = _ private var behaviorStack: List[Actor.Receive] = emptyBehaviorStack - var watching: Set[ActorRef] = emptyActorRefSet - var watchedBy: Set[ActorRef] = emptyActorRefSet - override final def watch(subject: ActorRef): ActorRef = subject match { - case a: InternalActorRef ⇒ - if (a != self && !watching.contains(a)) { - a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - watching += a - } - a + /* + * MESSAGE PROCESSING + */ + + //Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status + final def systemInvoke(message: SystemMessage): Unit = try { + message match { + case Create() ⇒ create() + case Recreate(cause) ⇒ faultRecreate(cause) + case Watch(watchee, watcher) ⇒ addWatcher(watchee, watcher) + case Unwatch(watchee, watcher) ⇒ remWatcher(watchee, watcher) + case Suspend() ⇒ faultSuspend() + case Resume(inRespToFailure) ⇒ faultResume(inRespToFailure) + case Terminate() ⇒ terminate() + case Supervise(child) ⇒ supervise(child) + case ChildTerminated(child) ⇒ handleChildTerminated(child) + case NoMessage ⇒ // only here to suppress warning + } + } catch { + case e @ (_: InterruptedException | NonFatal(_)) ⇒ handleInvokeFailure(e, "error while processing " + message) } - override final def unwatch(subject: ActorRef): ActorRef = subject match { - case a: InternalActorRef ⇒ - if (a != self && watching.contains(a)) { - a.sendSystemMessage(Unwatch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - watching -= a - } - a + //Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status + final def invoke(messageHandle: Envelope): Unit = try { + currentMessage = messageHandle + cancelReceiveTimeout() // FIXME: leave this here??? + messageHandle.message match { + case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle) + case msg ⇒ receiveMessage(msg) + } + currentMessage = null // reset current message after successful invocation + } catch { + case e @ (_: InterruptedException | NonFatal(_)) ⇒ handleInvokeFailure(e, e.getMessage) + } finally { + checkReceiveTimeout // Reschedule receive timeout + } + + def autoReceiveMessage(msg: Envelope): Unit = { + if (system.settings.DebugAutoReceive) + publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg)) + + msg.message match { + case Failed(cause) ⇒ handleFailure(sender, cause) + case t: Terminated ⇒ watchedActorTerminated(t.actor); receiveMessage(t) + case Kill ⇒ throw new ActorKilledException("Kill") + case PoisonPill ⇒ self.stop() + case SelectParent(m) ⇒ parent.tell(m, msg.sender) + case SelectChildName(name, m) ⇒ for (c ← getChildByName(name)) c.child.tell(m, msg.sender) + case SelectChildPattern(p, m) ⇒ for (c ← children if p.matcher(c.path.name).matches) c.tell(m, msg.sender) + } + } + + final def receiveMessage(msg: Any): Unit = { + //FIXME replace with behaviorStack.head.applyOrElse(msg, unhandled) + "-optimize" + val head = behaviorStack.head + if (head.isDefinedAt(msg)) head.apply(msg) else actor.unhandled(msg) } /* - * MAILBOX and DISPATCHER - */ - - @volatile private var _mailboxDoNotCallMeDirectly: Mailbox = _ //This must be volatile since it isn't protected by the mailbox status - - @inline final def mailbox: Mailbox = Unsafe.instance.getObjectVolatile(this, mailboxOffset).asInstanceOf[Mailbox] - - @tailrec final def swapMailbox(newMailbox: Mailbox): Mailbox = { - val oldMailbox = mailbox - if (!Unsafe.instance.compareAndSwapObject(this, mailboxOffset, oldMailbox, newMailbox)) swapMailbox(newMailbox) - else oldMailbox - } - - final def hasMessages: Boolean = mailbox.hasMessages - - final def numberOfMessages: Int = mailbox.numberOfMessages - - val dispatcher: MessageDispatcher = system.dispatchers.lookup(props.dispatcher) - - /** - * UntypedActorContext impl - */ - final def getDispatcher(): MessageDispatcher = dispatcher - - final def isTerminated: Boolean = mailbox.isClosed - - final def start(): this.type = { - - /* - * Create the mailbox and enqueue the Create() message to ensure that - * this is processed before anything else. - */ - swapMailbox(dispatcher.createMailbox(this)) - mailbox.setActor(this) - - // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - mailbox.systemEnqueue(self, Create()) - - // This call is expected to start off the actor by scheduling its mailbox. - dispatcher.attach(this) - - this - } - - // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - final def suspend(): Unit = dispatcher.systemDispatch(this, Suspend()) - - // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - final def resume(inResponseToFailure: Boolean): Unit = dispatcher.systemDispatch(this, Resume(inResponseToFailure)) - - // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - final def restart(cause: Throwable): Unit = dispatcher.systemDispatch(this, Recreate(cause)) - - // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - final def stop(): Unit = dispatcher.systemDispatch(this, Terminate()) - - def tell(message: Any, sender: ActorRef): Unit = - dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender, system)) - - override def sendSystemMessage(message: SystemMessage): Unit = dispatcher.systemDispatch(this, message) - - /* - * ACTOR CONTEXT IMPL + * ACTOR CONTEXT IMPLEMENTATION */ final def sender: ActorRef = currentMessage match { @@ -458,37 +383,9 @@ private[akka] class ActorCell( } /* - * FAILURE HANDLING + * ACTOR INSTANCE HANDLING */ - /* ================= - * T H E R U L E S - * ================= - * - * Actors can be suspended for two reasons: - * - they fail - * - their supervisor gets suspended - * - * In particular they are not suspended multiple times because of cascading - * own failures, i.e. while currentlyFailed() they do not fail again. In case - * of a restart, failures in constructor/preStart count as new failures. - */ - - private def suspendNonRecursive(): Unit = dispatcher suspend this - - private def resumeNonRecursive(): Unit = dispatcher resume this - - /* - * have we told our supervisor that we Failed() and have not yet heard back? - * (actually: we might have heard back but not yet acted upon it, in case of - * a restart with dying children) - * might well be replaced by ref to a Cancellable in the future (see #2299) - */ - private var _failed = false - def currentlyFailed: Boolean = _failed - def setFailed(): Unit = _failed = true - def setNotFailed(): Unit = _failed = false - //This method is in charge of setting up the contextStack and create a new instance of the Actor protected def newActor(): Actor = { contextStack.set(this :: contextStack.get) @@ -509,311 +406,43 @@ private[akka] class ActorCell( } } - //Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status - final def systemInvoke(message: SystemMessage) { - - def create(): Unit = if (childrenRefs.isNormal) { - try { - val created = newActor() - actor = created - created.preStart() - checkReceiveTimeout - if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(created), "started (" + created + ")")) - } catch { - case NonFatal(i: InstantiationException) ⇒ - throw new ActorInitializationException(self, - """exception during creation, this problem is likely to occur because the class of the Actor you tried to create is either, + private def create(): Unit = if (isNormal) { + try { + val created = newActor() + actor = created + created.preStart() + checkReceiveTimeout + if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(created), "started (" + created + ")")) + } catch { + case NonFatal(i: InstantiationException) ⇒ + throw new ActorInitializationException(self, + """exception during creation, this problem is likely to occur because the class of the Actor you tried to create is either, a non-static inner class (in which case make it a static inner class or use Props(new ...) or Props( new UntypedActorFactory ... ) or is missing an appropriate, reachable no-args constructor. """, i.getCause) - case NonFatal(e) ⇒ throw new ActorInitializationException(self, "exception during creation", e) - } - } - - def recreate(cause: Throwable): Unit = - if (childrenRefs.isNormal) { - val failedActor = actor - if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(failedActor), "restarting")) - if (failedActor ne null) { - try { - // if the actor fails in preRestart, we can do nothing but log it: it’s best-effort - if (failedActor.context ne null) failedActor.preRestart(cause, Option(currentMessage)) - } catch { - case NonFatal(e) ⇒ - val ex = new PreRestartException(self, e, cause, Option(currentMessage)) - publish(Error(ex, self.path.toString, clazz(failedActor), e.getMessage)) - } finally { - clearActorFields(failedActor) - } - } - assert(mailbox.isSuspended, "mailbox must be suspended during restart, status=" + mailbox.status) - childrenRefs match { - case ct: TerminatingChildrenContainer ⇒ - setChildrenTerminationReason(this, Recreation(cause)) - case _ ⇒ - doRecreate(cause, failedActor) - } - } else { - // need to keep that suspend counter balanced - doResume(inResponseToFailure = false) - } - - def doSuspend(): Unit = { - // done always to keep that suspend counter balanced - suspendNonRecursive() - childrenRefs.suspendChildren() - } - - def doResume(inResponseToFailure: Boolean): Unit = { - // done always to keep that suspend counter balanced - // must happen “atomically” - try resumeNonRecursive() - finally if (inResponseToFailure) setNotFailed() - childrenRefs.resumeChildren() - } - - def addWatcher(watchee: ActorRef, watcher: ActorRef): Unit = { - val watcheeSelf = watchee == self - val watcherSelf = watcher == self - - if (watcheeSelf && !watcherSelf) { - if (!watchedBy.contains(watcher)) { - watchedBy += watcher - if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now monitoring " + watcher)) - } - } else if (!watcheeSelf && watcherSelf) { - watch(watchee) - } else { - publish(Warning(self.path.toString, clazz(actor), "BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, self))) - } - } - - def remWatcher(watchee: ActorRef, watcher: ActorRef): Unit = { - val watcheeSelf = watchee == self - val watcherSelf = watcher == self - - if (watcheeSelf && !watcherSelf) { - if (watchedBy.contains(watcher)) { - watchedBy -= watcher - if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + watcher)) - } - } else if (!watcheeSelf && watcherSelf) { - unwatch(watchee) - } else { - publish(Warning(self.path.toString, clazz(actor), "BUG: illegal Unwatch(%s,%s) for %s".format(watchee, watcher, self))) - } - } - - def terminate() { - setReceiveTimeout(None) - cancelReceiveTimeout - - // stop all children, which will turn childrenRefs into TerminatingChildrenContainer (if there are children) - children foreach stop - - childrenRefs match { - case ct: TerminatingChildrenContainer ⇒ - setChildrenTerminationReason(this, Termination) - // do not process normal messages while waiting for all children to terminate - suspendNonRecursive() - if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "stopping")) - case _ ⇒ doTerminate() - } - } - - def supervise(child: ActorRef): Unit = if (!childrenRefs.isTerminating) { - if (childrenRefs.getByRef(child).isEmpty) addChild(this, child) - handleSupervise(child) - if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) - } - - try { - message match { - case Create() ⇒ create() - case Recreate(cause) ⇒ recreate(cause) - case Watch(watchee, watcher) ⇒ addWatcher(watchee, watcher) - case Unwatch(watchee, watcher) ⇒ remWatcher(watchee, watcher) - case Suspend() ⇒ doSuspend() - case Resume(inRespToFailure) ⇒ doResume(inRespToFailure) - case Terminate() ⇒ terminate() - case Supervise(child) ⇒ supervise(child) - case ChildTerminated(child) ⇒ handleChildTerminated(child) - case NoMessage ⇒ // only here to suppress warning - } - } catch { - case e @ (_: InterruptedException | NonFatal(_)) ⇒ handleInvokeFailure(e, "error while processing " + message) + case NonFatal(e) ⇒ throw new ActorInitializationException(self, "exception during creation", e) } } - //Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status - final def invoke(messageHandle: Envelope): Unit = try { - currentMessage = messageHandle - cancelReceiveTimeout() // FIXME: leave this here??? - messageHandle.message match { - case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle) - case msg ⇒ receiveMessage(msg) - } - currentMessage = null // reset current message after successful invocation - } catch { - case e @ (_: InterruptedException | NonFatal(_)) ⇒ handleInvokeFailure(e, e.getMessage) - } finally { - checkReceiveTimeout // Reschedule receive timeout - } - - final def handleInvokeFailure(t: Throwable, message: String): Unit = { - publish(Error(t, self.path.toString, clazz(actor), message)) - // prevent any further messages to be processed until the actor has been restarted - if (!currentlyFailed) { - // suspend self; these two must happen “atomically” - try suspendNonRecursive() - finally setFailed() - // suspend children - val skip: Set[ActorRef] = currentMessage match { - case Envelope(Failed(`t`), child) ⇒ Set(child) - case _ ⇒ Set.empty - } - childrenRefs.suspendChildren(skip) - // tell supervisor - t match { // Wrap InterruptedExceptions and rethrow - case _: InterruptedException ⇒ parent.tell(Failed(new ActorInterruptedException(t)), self); throw t - case _ ⇒ parent.tell(Failed(t), self) - } - } - } - - def autoReceiveMessage(msg: Envelope): Unit = { - if (system.settings.DebugAutoReceive) - publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg)) - - msg.message match { - case Failed(cause) ⇒ handleFailure(sender, cause) - case t: Terminated ⇒ watching -= t.actor; receiveMessage(t) - case Kill ⇒ throw new ActorKilledException("Kill") - case PoisonPill ⇒ self.stop() - case SelectParent(m) ⇒ parent.tell(m, msg.sender) - case SelectChildName(name, m) ⇒ for (c ← childrenRefs getByName name) c.child.tell(m, msg.sender) - case SelectChildPattern(p, m) ⇒ for (c ← children if p.matcher(c.path.name).matches) c.tell(m, msg.sender) - } - } - - final def receiveMessage(msg: Any): Unit = { - //FIXME replace with behaviorStack.head.applyOrElse(msg, unhandled) + "-optimize" - val head = behaviorStack.head - if (head.isDefinedAt(msg)) head.apply(msg) else actor.unhandled(msg) - } - - private def doTerminate() { - val a = actor - try if (a ne null) a.postStop() - finally try dispatcher.detach(this) - finally try parent.sendSystemMessage(ChildTerminated(self)) - finally try - if (!watchedBy.isEmpty) { - val terminated = Terminated(self)(existenceConfirmed = true) - try { - watchedBy foreach { - watcher ⇒ - try watcher.tell(terminated, self) catch { - case NonFatal(t) ⇒ publish(Error(t, self.path.toString, clazz(a), "deathwatch")) - } - } - } finally watchedBy = emptyActorRefSet - } - finally try - if (!watching.isEmpty) { - try { - watching foreach { // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - case watchee: InternalActorRef ⇒ try watchee.sendSystemMessage(Unwatch(watchee, self)) catch { - case NonFatal(t) ⇒ publish(Error(t, self.path.toString, clazz(a), "deathwatch")) - } - } - } finally watching = emptyActorRefSet - } - finally { - if (system.settings.DebugLifecycle) - publish(Debug(self.path.toString, clazz(a), "stopped")) - behaviorStack = emptyBehaviorStack - clearActorFields(a) - actor = null - } - } - - private def doRecreate(cause: Throwable, failedActor: Actor): Unit = try { - // must happen “atomically” - try resumeNonRecursive() - finally setNotFailed() - - val survivors = children - - val freshActor = newActor() - actor = freshActor // this must happen before postRestart has a chance to fail - if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields. - - freshActor.postRestart(cause) - if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(freshActor), "restarted")) - - // only after parent is up and running again do restart the children which were not stopped - survivors foreach (child ⇒ - try child.asInstanceOf[InternalActorRef].restart(cause) - catch { - case NonFatal(e) ⇒ publish(Error(e, self.path.toString, clazz(freshActor), "restarting " + child)) - }) - } catch { - case NonFatal(e) ⇒ - clearActorFields(actor) // in order to prevent preRestart() from happening again - handleInvokeFailure(new PostRestartException(self, e, cause), e.getMessage) - } - - final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenRefs.getByRef(child) match { - case Some(stats) ⇒ if (!actor.supervisorStrategy.handleFailure(this, child, cause, stats, childrenRefs.stats)) throw cause - case None ⇒ publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child)) - } - - final def handleChildTerminated(child: ActorRef): Unit = try { - childrenRefs match { - case TerminatingChildrenContainer(_, _, reason) ⇒ - val n = removeChild(this, child) - actor.supervisorStrategy.handleChildTerminated(this, child, children) - if (!n.isInstanceOf[TerminatingChildrenContainer]) reason match { - case Recreation(cause) ⇒ doRecreate(cause, actor) // doRecreate since this is the continuation of "recreate" - case Termination ⇒ doTerminate() - case _ ⇒ - } - case _ ⇒ - removeChild(this, child) - actor.supervisorStrategy.handleChildTerminated(this, child, children) - } - } catch { - case NonFatal(e) ⇒ handleInvokeFailure(e, "handleChildTerminated failed") + private def supervise(child: ActorRef): Unit = if (!isTerminating) { + addChild(child) + handleSupervise(child) + if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) } + // future extension point protected def handleSupervise(child: ActorRef): Unit = child match { case r: RepointableActorRef ⇒ r.activate() case _ ⇒ } - final def checkReceiveTimeout() { - val recvtimeout = receiveTimeoutData - if (Duration.Undefined != recvtimeout._1 && !mailbox.hasMessages) { - recvtimeout._2.cancel() //Cancel any ongoing future - //Only reschedule if desired and there are currently no more messages to be processed - receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(recvtimeout._1, self, ReceiveTimeout)) - } else cancelReceiveTimeout() - - } - - final def cancelReceiveTimeout(): Unit = - if (receiveTimeoutData._2 ne emptyCancellable) { - receiveTimeoutData._2.cancel() - receiveTimeoutData = (receiveTimeoutData._1, emptyCancellable) - } - - final def clearActorFields(actorInstance: Actor): Unit = { + final protected def clearActorFields(actorInstance: Actor): Unit = { setActorFields(actorInstance, context = null, self = system.deadLetters) currentMessage = null + behaviorStack = emptyBehaviorStack } - final def setActorFields(actorInstance: Actor, context: ActorContext, self: ActorRef) { + final protected def setActorFields(actorInstance: Actor, context: ActorContext, self: ActorRef) { @tailrec def lookupAndSetField(clazz: Class[_], actor: Actor, name: String, value: Any): Boolean = { val success = try { @@ -839,8 +468,8 @@ private[akka] class ActorCell( } // logging is not the main purpose, and if it fails there’s nothing we can do - private final def publish(e: LogEvent): Unit = try system.eventStream.publish(e) catch { case NonFatal(_) ⇒ } + protected final def publish(e: LogEvent): Unit = try system.eventStream.publish(e) catch { case NonFatal(_) ⇒ } - private final def clazz(o: AnyRef): Class[_] = if (o eq null) this.getClass else o.getClass + protected final def clazz(o: AnyRef): Class[_] = if (o eq null) this.getClass else o.getClass } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 0d360f4559..6cbd821af4 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -307,7 +307,7 @@ private[akka] class LocalActorRef private[akka] ( * to inject “synthetic” actor paths like “/temp”. */ protected def getSingleChild(name: String): InternalActorRef = - actorCell.childrenRefs.getByName(name) match { + actorCell.getChildByName(name) match { case Some(crs) ⇒ crs.child.asInstanceOf[InternalActorRef] case None ⇒ Nobody } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 4f5eeb41a7..c155f9d092 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -15,6 +15,7 @@ import akka.util._ import akka.util.internal.{ HashedWheelTimer, ConcurrentIdentityHashMap } import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException } import java.util.concurrent.TimeUnit.MILLISECONDS +import akka.actor.cell.ChildrenContainer object ActorSystem { diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index 0be24eee51..61a28f3ccb 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -230,7 +230,8 @@ abstract class SupervisorStrategy { /** * This method is called after the child has been removed from the set of children. - * It does not need to do anything special. + * It does not need to do anything special. Exceptions thrown from this method + * do NOT make the actor fail if this happens during termination. */ def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index d7493e42dc..b08c7adbee 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -16,6 +16,7 @@ import akka.dispatch.MessageDispatcher import java.util.concurrent.locks.ReentrantLock import akka.event.Logging.Warning import scala.collection.mutable.Queue +import akka.actor.cell.ChildrenContainer /** * This actor ref starts out with some dummy cell (by default just enqueuing @@ -102,7 +103,7 @@ private[akka] class RepointableActorRef( case ".." ⇒ getParent.getChild(name) case "" ⇒ getChild(name) case other ⇒ - underlying.childrenRefs.getByName(other) match { + underlying.getChildByName(other) match { case Some(crs) ⇒ crs.child.asInstanceOf[InternalActorRef].getChild(name) case None ⇒ Nobody } @@ -176,6 +177,7 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep def isTerminated: Boolean = false def parent: InternalActorRef = supervisor def childrenRefs: ChildrenContainer = ChildrenContainer.EmptyChildrenContainer + def getChildByName(name: String): Option[ChildRestartStats] = None def tell(message: Any, sender: ActorRef): Unit = { lock.lock() try { diff --git a/akka-actor/src/main/scala/akka/actor/cell/Children.scala b/akka-actor/src/main/scala/akka/actor/cell/Children.scala new file mode 100644 index 0000000000..07871ed6d2 --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/cell/Children.scala @@ -0,0 +1,181 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.actor.cell + +import scala.annotation.tailrec +import scala.collection.JavaConverters.asJavaIterableConverter +import akka.actor.{ RepointableRef, Props, InternalActorRef, ActorRef, ActorCell } +import akka.util.{ Unsafe, Helpers } +import akka.actor.InvalidActorNameException +import akka.serialization.SerializationExtension +import akka.util.NonFatal +import akka.actor.NoSerializationVerificationNeeded +import akka.actor.ActorPath +import akka.actor.ChildRestartStats +import akka.actor.ChildRestartStats +import akka.actor.ChildRestartStats +import akka.actor.ChildRestartStats + +trait Children { this: ActorCell ⇒ + + import ChildrenContainer._ + + @volatile + private var _childrenRefsDoNotCallMeDirectly: ChildrenContainer = EmptyChildrenContainer + + def childrenRefs: ChildrenContainer = + Unsafe.instance.getObjectVolatile(this, AbstractActorCell.childrenOffset).asInstanceOf[ChildrenContainer] + + final def children: Iterable[ActorRef] = childrenRefs.children + final def getChildren(): java.lang.Iterable[ActorRef] = children.asJava + + def actorOf(props: Props): ActorRef = makeChild(this, props, randomName(), async = false) + def actorOf(props: Props, name: String): ActorRef = makeChild(this, props, checkName(name), async = false) + private[akka] def attachChild(props: Props): ActorRef = makeChild(this, props, randomName(), async = true) + private[akka] def attachChild(props: Props, name: String): ActorRef = makeChild(this, props, checkName(name), async = true) + + @volatile private var _nextNameDoNotCallMeDirectly = 0L + final protected def randomName(): String = { + @tailrec def inc(): Long = { + val current = Unsafe.instance.getLongVolatile(this, AbstractActorCell.nextNameOffset) + if (Unsafe.instance.compareAndSwapLong(this, AbstractActorCell.nextNameOffset, current, current + 1)) current + else inc() + } + Helpers.base64(inc()) + } + + final def stop(actor: ActorRef): Unit = { + val started = actor match { + case r: RepointableRef ⇒ r.isStarted + case _ ⇒ true + } + if (childrenRefs.getByRef(actor).isDefined && started) shallDie(actor) + actor.asInstanceOf[InternalActorRef].stop() + } + + /* + * low level CAS helpers + */ + + @inline private def swapChildrenRefs(oldChildren: ChildrenContainer, newChildren: ChildrenContainer): Boolean = + Unsafe.instance.compareAndSwapObject(this, AbstractActorCell.childrenOffset, oldChildren, newChildren) + + @tailrec final protected def reserveChild(name: String): Boolean = { + val c = childrenRefs + swapChildrenRefs(c, c.reserve(name)) || reserveChild(name) + } + + @tailrec final protected def unreserveChild(name: String): Boolean = { + val c = childrenRefs + swapChildrenRefs(c, c.unreserve(name)) || unreserveChild(name) + } + + final protected def addChild(ref: ActorRef): Boolean = { + @tailrec def rec(): Boolean = { + val c = childrenRefs + swapChildrenRefs(c, c.add(ref)) || rec() + } + if (childrenRefs.getByRef(ref).isEmpty) rec() else false + } + + @tailrec final protected def shallDie(ref: ActorRef): Boolean = { + val c = childrenRefs + swapChildrenRefs(c, c.shallDie(ref)) || shallDie(ref) + } + + @tailrec final private def removeChild(ref: ActorRef): ChildrenContainer = { + val c = childrenRefs + val n = c.remove(ref) + if (swapChildrenRefs(c, n)) n + else removeChild(ref) + } + + @tailrec final protected def setChildrenTerminationReason(reason: ChildrenContainer.SuspendReason): Boolean = { + childrenRefs match { + case c: ChildrenContainer.TerminatingChildrenContainer ⇒ + swapChildrenRefs(c, c.copy(reason = reason)) || setChildrenTerminationReason(reason) + case _ ⇒ false + } + } + + /* + * ActorCell-internal API + */ + + protected def isNormal = childrenRefs.isNormal + + protected def isTerminating = childrenRefs.isTerminating + + protected def suspendChildren(skip: Set[ActorRef] = Set.empty): Unit = + childrenRefs.stats collect { + case ChildRestartStats(child, _, _) if !(skip contains child) ⇒ child + } foreach (_.asInstanceOf[InternalActorRef].suspend()) + + protected def resumeChildren(): Unit = + childrenRefs.stats foreach (_.child.asInstanceOf[InternalActorRef].resume(inResponseToFailure = false)) + + def getChildByName(name: String): Option[ChildRestartStats] = childrenRefs.getByName(name) + + protected def getChildByRef(ref: ActorRef): Option[ChildRestartStats] = childrenRefs.getByRef(ref) + + protected def getAllChildStats: Iterable[ChildRestartStats] = childrenRefs.stats + + protected def removeChildAndGetStateChange(child: ActorRef): Option[SuspendReason] = { + childrenRefs match { + case TerminatingChildrenContainer(_, _, reason) ⇒ + val n = removeChild(child) + if (!n.isInstanceOf[TerminatingChildrenContainer]) Some(reason) else None + case _ ⇒ removeChild(child); None + } + } + + /* + * Private helpers + */ + + private def checkName(name: String): String = { + import ActorPath.ElementRegex + name match { + case null ⇒ throw new InvalidActorNameException("actor name must not be null") + case "" ⇒ throw new InvalidActorNameException("actor name must not be empty") + case ElementRegex() ⇒ name + case _ ⇒ throw new InvalidActorNameException("illegal actor name '" + name + "', must conform to " + ElementRegex) + } + } + + private def makeChild(cell: ActorCell, props: Props, name: String, async: Boolean): ActorRef = { + if (cell.system.settings.SerializeAllCreators && !props.creator.isInstanceOf[NoSerializationVerificationNeeded]) { + val ser = SerializationExtension(cell.system) + ser.serialize(props.creator) match { + case Left(t) ⇒ throw t + case Right(bytes) ⇒ ser.deserialize(bytes, props.creator.getClass) match { + case Left(t) ⇒ throw t + case _ ⇒ //All good + } + } + } + /* + * in case we are currently terminating, fail external attachChild requests + * (internal calls cannot happen anyway because we are suspended) + */ + if (cell.childrenRefs.isTerminating) throw new IllegalStateException("cannot create children while terminating or terminated") + else { + reserveChild(name) + // this name will either be unreserved or overwritten with a real child below + val actor = + try { + cell.provider.actorOf(cell.systemImpl, props, cell.self, cell.self.path / name, + systemService = false, deploy = None, lookupDeploy = true, async = async) + } catch { + case NonFatal(e) ⇒ + unreserveChild(name) + throw e + } + addChild(actor) + actor + } + } + +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/ChildrenContainer.scala b/akka-actor/src/main/scala/akka/actor/cell/ChildrenContainer.scala similarity index 67% rename from akka-actor/src/main/scala/akka/actor/ChildrenContainer.scala rename to akka-actor/src/main/scala/akka/actor/cell/ChildrenContainer.scala index 692dd3ba07..641be552e5 100644 --- a/akka-actor/src/main/scala/akka/actor/ChildrenContainer.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/ChildrenContainer.scala @@ -2,13 +2,24 @@ * Copyright (C) 2009-2012 Typesafe Inc. */ -package akka.actor +package akka.actor.cell import scala.collection.immutable.TreeMap import scala.annotation.tailrec import akka.util.Unsafe import akka.serialization.SerializationExtension import akka.util.NonFatal +import akka.actor.ActorPath.ElementRegex +import akka.actor.ActorCell +import akka.actor.ActorRef +import akka.actor.ChildNameReserved +import akka.actor.ChildRestartStats +import akka.actor.ChildStats +import akka.actor.InternalActorRef +import akka.actor.InvalidActorNameException +import akka.actor.NoSerializationVerificationNeeded +import akka.actor.Props +import scala.annotation.tailrec /** * INTERNAL API @@ -33,13 +44,6 @@ private[akka] trait ChildrenContainer { def isTerminating: Boolean = false def isNormal: Boolean = true - - def suspendChildren(skip: Set[ActorRef] = Set.empty): Unit = - stats collect { - case ChildRestartStats(child, _, _) if !(skip contains child) ⇒ child - } foreach (_.asInstanceOf[InternalActorRef].suspend()) - - def resumeChildren(): Unit = stats foreach (_.child.asInstanceOf[InternalActorRef].suspend()) } /** @@ -50,88 +54,6 @@ private[akka] trait ChildrenContainer { */ private[akka] object ChildrenContainer { - // low level CAS helpers - @inline private def swapChildrenRefs(cell: ActorCell, oldChildren: ChildrenContainer, newChildren: ChildrenContainer): Boolean = - Unsafe.instance.compareAndSwapObject(cell, AbstractActorCell.childrenOffset, oldChildren, newChildren) - - @tailrec final def reserveChild(cell: ActorCell, name: String): Boolean = { - val c = cell.childrenRefs - swapChildrenRefs(cell, c, c.reserve(name)) || reserveChild(cell, name) - } - - @tailrec final def unreserveChild(cell: ActorCell, name: String): Boolean = { - val c = cell.childrenRefs - swapChildrenRefs(cell, c, c.unreserve(name)) || unreserveChild(cell, name) - } - - @tailrec final def addChild(cell: ActorCell, ref: ActorRef): Boolean = { - val c = cell.childrenRefs - swapChildrenRefs(cell, c, c.add(ref)) || addChild(cell, ref) - } - - @tailrec final def shallDie(cell: ActorCell, ref: ActorRef): Boolean = { - val c = cell.childrenRefs - swapChildrenRefs(cell, c, c.shallDie(ref)) || shallDie(cell, ref) - } - - @tailrec final def removeChild(cell: ActorCell, ref: ActorRef): ChildrenContainer = { - val c = cell.childrenRefs - val n = c.remove(ref) - if (swapChildrenRefs(cell, c, n)) n - else removeChild(cell, ref) - } - - @tailrec final def setChildrenTerminationReason(cell: ActorCell, reason: ChildrenContainer.SuspendReason): Boolean = { - cell.childrenRefs match { - case c: ChildrenContainer.TerminatingChildrenContainer ⇒ - swapChildrenRefs(cell, c, c.copy(reason = reason)) || setChildrenTerminationReason(cell, reason) - case _ ⇒ false - } - } - - def checkName(name: String): String = { - import ActorPath.ElementRegex - name match { - case null ⇒ throw new InvalidActorNameException("actor name must not be null") - case "" ⇒ throw new InvalidActorNameException("actor name must not be empty") - case ElementRegex() ⇒ name - case _ ⇒ throw new InvalidActorNameException("illegal actor name '" + name + "', must conform to " + ElementRegex) - } - } - - def makeChild(cell: ActorCell, props: Props, name: String, async: Boolean): ActorRef = { - if (cell.system.settings.SerializeAllCreators && !props.creator.isInstanceOf[NoSerializationVerificationNeeded]) { - val ser = SerializationExtension(cell.system) - ser.serialize(props.creator) match { - case Left(t) ⇒ throw t - case Right(bytes) ⇒ ser.deserialize(bytes, props.creator.getClass) match { - case Left(t) ⇒ throw t - case _ ⇒ //All good - } - } - } - /* - * in case we are currently terminating, fail external attachChild requests - * (internal calls cannot happen anyway because we are suspended) - */ - if (cell.childrenRefs.isTerminating) throw new IllegalStateException("cannot create children while terminating or terminated") - else { - reserveChild(cell, name) - // this name will either be unreserved or overwritten with a real child below - val actor = - try { - cell.provider.actorOf(cell.systemImpl, props, cell.self, cell.self.path / name, - systemService = false, deploy = None, lookupDeploy = true, async = async) - } catch { - case NonFatal(e) ⇒ - unreserveChild(cell, name) - throw e - } - addChild(cell, actor) - actor - } - } - sealed trait SuspendReason case object UserRequest extends SuspendReason case class Recreation(cause: Throwable) extends SuspendReason diff --git a/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala new file mode 100644 index 0000000000..e65cd134fe --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala @@ -0,0 +1,93 @@ +package akka.actor.cell + +import akka.actor.{ InternalActorRef, ActorRef, ActorCell } +import akka.dispatch.{ Watch, Unwatch } +import akka.event.Logging._ +import akka.util.NonFatal +import akka.actor.Terminated +import akka.actor.Actor + +trait DeathWatch { this: ActorCell ⇒ + + private var watching: Set[ActorRef] = ActorCell.emptyActorRefSet + private var watchedBy: Set[ActorRef] = ActorCell.emptyActorRefSet + + override final def watch(subject: ActorRef): ActorRef = subject match { + case a: InternalActorRef ⇒ + if (a != self && !watching.contains(a)) { + a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + watching += a + } + a + } + + override final def unwatch(subject: ActorRef): ActorRef = subject match { + case a: InternalActorRef ⇒ + if (a != self && watching.contains(a)) { + a.sendSystemMessage(Unwatch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + watching -= a + } + a + } + + protected def watchedActorTerminated(ref: ActorRef): Unit = watching -= ref + + protected def tellWatchersWeDied(actor: Actor): Unit = { + if (!watchedBy.isEmpty) { + val terminated = Terminated(self)(existenceConfirmed = true) + try { + watchedBy foreach { + watcher ⇒ + try watcher.tell(terminated, self) catch { + case NonFatal(t) ⇒ publish(Error(t, self.path.toString, clazz(actor), "deathwatch")) + } + } + } finally watchedBy = ActorCell.emptyActorRefSet + } + } + + protected def unwatchWatchedActors(actor: Actor): Unit = { + if (!watching.isEmpty) { + try { + watching foreach { // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + case watchee: InternalActorRef ⇒ try watchee.sendSystemMessage(Unwatch(watchee, self)) catch { + case NonFatal(t) ⇒ publish(Error(t, self.path.toString, clazz(actor), "deathwatch")) + } + } + } finally watching = ActorCell.emptyActorRefSet + } + } + + protected def addWatcher(watchee: ActorRef, watcher: ActorRef): Unit = { + val watcheeSelf = watchee == self + val watcherSelf = watcher == self + + if (watcheeSelf && !watcherSelf) { + if (!watchedBy.contains(watcher)) { + watchedBy += watcher + if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now monitoring " + watcher)) + } + } else if (!watcheeSelf && watcherSelf) { + watch(watchee) + } else { + publish(Warning(self.path.toString, clazz(actor), "BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, self))) + } + } + + protected def remWatcher(watchee: ActorRef, watcher: ActorRef): Unit = { + val watcheeSelf = watchee == self + val watcherSelf = watcher == self + + if (watcheeSelf && !watcherSelf) { + if (watchedBy.contains(watcher)) { + watchedBy -= watcher + if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + watcher)) + } + } else if (!watcheeSelf && watcherSelf) { + unwatch(watchee) + } else { + publish(Warning(self.path.toString, clazz(actor), "BUG: illegal Unwatch(%s,%s) for %s".format(watchee, watcher, self))) + } + } + +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala b/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala new file mode 100644 index 0000000000..7cffefba2e --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala @@ -0,0 +1,81 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.actor.cell + +import scala.annotation.tailrec +import akka.actor.ActorRef +import akka.dispatch.SystemMessage +import akka.util.Unsafe +import akka.dispatch.MessageDispatcher +import akka.dispatch.Suspend +import akka.dispatch.Recreate +import akka.actor.ActorCell +import akka.dispatch.Terminate +import akka.dispatch.Envelope +import akka.dispatch.Resume +import akka.dispatch.Mailbox +import akka.dispatch.Create + +trait Dispatch { this: ActorCell ⇒ + + @volatile private var _mailboxDoNotCallMeDirectly: Mailbox = _ //This must be volatile since it isn't protected by the mailbox status + + @inline final def mailbox: Mailbox = Unsafe.instance.getObjectVolatile(this, AbstractActorCell.mailboxOffset).asInstanceOf[Mailbox] + + @tailrec final def swapMailbox(newMailbox: Mailbox): Mailbox = { + val oldMailbox = mailbox + if (!Unsafe.instance.compareAndSwapObject(this, AbstractActorCell.mailboxOffset, oldMailbox, newMailbox)) swapMailbox(newMailbox) + else oldMailbox + } + + final def hasMessages: Boolean = mailbox.hasMessages + + final def numberOfMessages: Int = mailbox.numberOfMessages + + val dispatcher: MessageDispatcher = system.dispatchers.lookup(props.dispatcher) + + /** + * UntypedActorContext impl + */ + final def getDispatcher(): MessageDispatcher = dispatcher + + final def isTerminated: Boolean = mailbox.isClosed + + final def start(): this.type = { + + /* + * Create the mailbox and enqueue the Create() message to ensure that + * this is processed before anything else. + */ + swapMailbox(dispatcher.createMailbox(this)) + mailbox.setActor(this) + + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + mailbox.systemEnqueue(self, Create()) + + // This call is expected to start off the actor by scheduling its mailbox. + dispatcher.attach(this) + + this + } + + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + final def suspend(): Unit = dispatcher.systemDispatch(this, Suspend()) + + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + final def resume(inResponseToFailure: Boolean): Unit = dispatcher.systemDispatch(this, Resume(inResponseToFailure)) + + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + final def restart(cause: Throwable): Unit = dispatcher.systemDispatch(this, Recreate(cause)) + + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + final def stop(): Unit = dispatcher.systemDispatch(this, Terminate()) + + def tell(message: Any, sender: ActorRef): Unit = + dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender, system)) + + override def sendSystemMessage(message: SystemMessage): Unit = dispatcher.systemDispatch(this, message) + +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala new file mode 100644 index 0000000000..1ef83575ce --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala @@ -0,0 +1,184 @@ +package akka.actor.cell + +import akka.actor.PreRestartException +import akka.actor.ActorCell +import akka.util.NonFatal +import akka.actor.ActorRef +import akka.actor.PostRestartException +import akka.actor.Actor +import akka.dispatch.Envelope +import akka.dispatch.ChildTerminated +import akka.actor.Failed +import akka.actor.InternalActorRef +import akka.event.Logging._ +import akka.actor.ActorInterruptedException + +trait FaultHandling { this: ActorCell ⇒ + + /* ================= + * T H E R U L E S + * ================= + * + * Actors can be suspended for two reasons: + * - they fail + * - their supervisor gets suspended + * + * In particular they are not suspended multiple times because of cascading + * own failures, i.e. while currentlyFailed() they do not fail again. In case + * of a restart, failures in constructor/preStart count as new failures. + */ + + private def suspendNonRecursive(): Unit = dispatcher suspend this + + private def resumeNonRecursive(): Unit = dispatcher resume this + + /* + * have we told our supervisor that we Failed() and have not yet heard back? + * (actually: we might have heard back but not yet acted upon it, in case of + * a restart with dying children) + * might well be replaced by ref to a Cancellable in the future (see #2299) + */ + private var _failed = false + private def currentlyFailed: Boolean = _failed + private def setFailed(): Unit = _failed = true + private def setNotFailed(): Unit = _failed = false + + protected def faultRecreate(cause: Throwable): Unit = + if (isNormal) { + val failedActor = actor + if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(failedActor), "restarting")) + if (failedActor ne null) { + try { + // if the actor fails in preRestart, we can do nothing but log it: it’s best-effort + if (failedActor.context ne null) failedActor.preRestart(cause, Option(currentMessage)) + } catch { + case NonFatal(e) ⇒ + val ex = new PreRestartException(self, e, cause, Option(currentMessage)) + publish(Error(ex, self.path.toString, clazz(failedActor), e.getMessage)) + } finally { + clearActorFields(failedActor) + } + } + assert(mailbox.isSuspended, "mailbox must be suspended during restart, status=" + mailbox.status) + if (!setChildrenTerminationReason(ChildrenContainer.Recreation(cause))) finishRecreate(cause, failedActor) + } else { + // need to keep that suspend counter balanced + faultResume(inResponseToFailure = false) + } + + protected def faultSuspend(): Unit = { + // done always to keep that suspend counter balanced + suspendNonRecursive() + suspendChildren() + } + + protected def faultResume(inResponseToFailure: Boolean): Unit = { + // done always to keep that suspend counter balanced + // must happen “atomically” + try resumeNonRecursive() + finally if (inResponseToFailure) setNotFailed() + resumeChildren() + } + + protected def terminate() { + setReceiveTimeout(None) + cancelReceiveTimeout + + // stop all children, which will turn childrenRefs into TerminatingChildrenContainer (if there are children) + children foreach stop + + if (setChildrenTerminationReason(ChildrenContainer.Termination)) { + // do not process normal messages while waiting for all children to terminate + suspendNonRecursive() + // do not propagate failures during shutdown to the supervisor + setFailed() + if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "stopping")) + } else finishTerminate() + } + + final def handleInvokeFailure(t: Throwable, message: String): Unit = { + publish(Error(t, self.path.toString, clazz(actor), message)) + // prevent any further messages to be processed until the actor has been restarted + if (!currentlyFailed) try { + suspendNonRecursive() + setFailed() + // suspend children + val skip: Set[ActorRef] = currentMessage match { + case Envelope(Failed(`t`), child) ⇒ Set(child) + case _ ⇒ Set.empty + } + suspendChildren(skip) + // tell supervisor + t match { // Wrap InterruptedExceptions and rethrow + case _: InterruptedException ⇒ parent.tell(Failed(new ActorInterruptedException(t)), self); throw t + case _ ⇒ parent.tell(Failed(t), self) + } + } catch { + case NonFatal(e) ⇒ + publish(Error(e, self.path.toString, clazz(actor), "emergency stop: exception in failure handling")) + try children foreach stop + finally finishTerminate() + } + } + + private def finishTerminate() { + val a = actor + try if (a ne null) a.postStop() + finally try dispatcher.detach(this) + finally try parent.sendSystemMessage(ChildTerminated(self)) + finally try tellWatchersWeDied(a) + finally try unwatchWatchedActors(a) + finally { + if (system.settings.DebugLifecycle) + publish(Debug(self.path.toString, clazz(a), "stopped")) + clearActorFields(a) + actor = null + } + } + + private def finishRecreate(cause: Throwable, failedActor: Actor): Unit = try { + try resumeNonRecursive() + finally setNotFailed() // must happen in any case, so that failure is propagated + + val survivors = children + + val freshActor = newActor() + actor = freshActor // this must happen before postRestart has a chance to fail + if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields. + + freshActor.postRestart(cause) + if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(freshActor), "restarted")) + + // only after parent is up and running again do restart the children which were not stopped + survivors foreach (child ⇒ + try child.asInstanceOf[InternalActorRef].restart(cause) + catch { + case NonFatal(e) ⇒ publish(Error(e, self.path.toString, clazz(freshActor), "restarting " + child)) + }) + } catch { + case NonFatal(e) ⇒ + clearActorFields(actor) // in order to prevent preRestart() from happening again + handleInvokeFailure(new PostRestartException(self, e, cause), e.getMessage) + } + + final protected def handleFailure(child: ActorRef, cause: Throwable): Unit = getChildByRef(child) match { + case Some(stats) ⇒ if (!actor.supervisorStrategy.handleFailure(this, child, cause, stats, getAllChildStats)) throw cause + case None ⇒ publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child)) + } + + final protected def handleChildTerminated(child: ActorRef): Unit = try { + val status = removeChildAndGetStateChange(child) + actor.supervisorStrategy.handleChildTerminated(this, child, children) + /* + * if the removal changed the state of the (terminating) children container, + * then we are continuing the previously suspended recreate/terminate action + */ + status match { + case Some(ChildrenContainer.Recreation(cause)) ⇒ finishRecreate(cause, actor) + case Some(ChildrenContainer.Termination) ⇒ finishTerminate() + case _ ⇒ + } + } catch { + case NonFatal(e) ⇒ handleInvokeFailure(e, "handleChildTerminated failed") + } +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/cell/ReceiveTimeout.scala b/akka-actor/src/main/scala/akka/actor/cell/ReceiveTimeout.scala new file mode 100644 index 0000000000..960372b488 --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/cell/ReceiveTimeout.scala @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.actor.cell + +import ReceiveTimeout.emptyReceiveTimeoutData +import akka.actor.ActorCell +import akka.actor.ActorCell.emptyCancellable +import akka.actor.Cancellable +import akka.util.Duration + +object ReceiveTimeout { + final val emptyReceiveTimeoutData: (Duration, Cancellable) = (Duration.Undefined, ActorCell.emptyCancellable) +} + +trait ReceiveTimeout { this: ActorCell ⇒ + + import ReceiveTimeout._ + import ActorCell._ + + private var receiveTimeoutData: (Duration, Cancellable) = emptyReceiveTimeoutData + + final def receiveTimeout: Option[Duration] = receiveTimeoutData._1 match { + case Duration.Undefined ⇒ None + case duration ⇒ Some(duration) + } + + final def setReceiveTimeout(timeout: Option[Duration]): Unit = setReceiveTimeout(timeout.getOrElse(Duration.Undefined)) + + final def setReceiveTimeout(timeout: Duration): Unit = + receiveTimeoutData = ( + if (Duration.Undefined == timeout || timeout.toMillis < 1) Duration.Undefined else timeout, + receiveTimeoutData._2) + + final def resetReceiveTimeout(): Unit = setReceiveTimeout(None) + + final def checkReceiveTimeout() { + val recvtimeout = receiveTimeoutData + if (Duration.Undefined != recvtimeout._1 && !mailbox.hasMessages) { + recvtimeout._2.cancel() //Cancel any ongoing future + //Only reschedule if desired and there are currently no more messages to be processed + receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(recvtimeout._1, self, akka.actor.ReceiveTimeout)) + } else cancelReceiveTimeout() + + } + + final def cancelReceiveTimeout(): Unit = + if (receiveTimeoutData._2 ne emptyCancellable) { + receiveTimeoutData._2.cancel() + receiveTimeoutData = (receiveTimeoutData._1, emptyCancellable) + } + +} \ No newline at end of file