Merge branch 'wip-2212-recursive-resume-∂π' into wip-scala210M5-√

only merged so that it compiles, will clean up warnings in the following
commits
This commit is contained in:
Roland 2012-07-23 14:17:45 +02:00
commit dca8d15c68
27 changed files with 1568 additions and 801 deletions

View file

@ -118,6 +118,30 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS
expectNoMsg(1 seconds)
system.stop(supervisor)
}
"clear the behavior stack upon restart" in {
case class Become(recv: ActorContext Receive)
val a = system.actorOf(Props(new Actor {
def receive = {
case Become(beh) context.become(beh(context), discardOld = false); sender ! "ok"
case x sender ! 42
}
}))
a ! "hello"
expectMsg(42)
a ! Become(ctx {
case "fail" throw new RuntimeException("buh")
case x ctx.sender ! 43
})
expectMsg("ok")
a ! "hello"
expectMsg(43)
EventFilter[RuntimeException]("buh", occurrences = 1) intercept {
a ! "fail"
}
a ! "hello"
expectMsg(42)
}
}
}

View file

@ -147,7 +147,7 @@ object FSMTimingSpec {
}
def resume(actorRef: ActorRef): Unit = actorRef match {
case l: ActorRefWithCell l.resume()
case l: ActorRefWithCell l.resume(inResponseToFailure = false)
case _
}

View file

@ -4,14 +4,24 @@
package akka.actor
import language.postfixOps
import akka.testkit._
import java.util.concurrent.{ TimeUnit, CountDownLatch }
import scala.concurrent.Await
import akka.pattern.ask
import scala.concurrent.util.Duration
import scala.concurrent.util.duration._
import scala.concurrent.util.duration.intToDurationInt
import scala.math.BigInt.int2bigInt
import scala.util.Random
import scala.util.control.NoStackTrace
import com.typesafe.config.{ ConfigFactory, Config }
import SupervisorStrategy.{ Resume, Restart, Directive }
import akka.actor.SupervisorStrategy.seqThrowable2Decider
import akka.dispatch.{ MessageDispatcher, DispatcherPrerequisites, DispatcherConfigurator, Dispatcher }
import akka.pattern.ask
import akka.testkit.{ ImplicitSender, EventFilter, DefaultTimeout, AkkaSpec }
import akka.testkit.{ filterException, duration2TestDuration, TestLatch }
import akka.testkit.TestEvent.Mute
object SupervisorHierarchySpec {
class FireWorkerException(msg: String) extends Exception(msg)
@ -31,10 +41,361 @@ object SupervisorHierarchySpec {
countDown.countDown()
}
}
class Resumer extends Actor {
override def supervisorStrategy = OneForOneStrategy() { case _ SupervisorStrategy.Resume }
def receive = {
case "spawn" sender ! context.actorOf(Props[Resumer])
case "fail" throw new Exception("expected")
case "ping" sender ! "pong"
}
}
case class Event(msg: Any) { val time: Long = System.nanoTime }
case class ErrorLog(msg: String, log: Vector[Event])
case class Failure(directive: Directive, log: Vector[Event]) extends RuntimeException with NoStackTrace {
override def toString = "Failure(" + directive + ")"
}
val strategy = OneForOneStrategy() { case Failure(directive, _) directive }
val config = ConfigFactory.parseString("""
hierarchy {
type = "akka.actor.SupervisorHierarchySpec$MyDispatcherConfigurator"
}
akka.loglevel = INFO
akka.actor.debug.fsm = on
""")
class MyDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends DispatcherConfigurator(config, prerequisites) {
private val instance: MessageDispatcher =
new Dispatcher(prerequisites,
config.getString("id"),
config.getInt("throughput"),
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
mailboxType,
configureExecutor(),
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) {
override def suspend(cell: ActorCell): Unit = {
val a = cell.actor.asInstanceOf[Hierarchy]
a.log :+= Event("suspended")
super.suspend(cell)
}
override def resume(cell: ActorCell): Unit = {
val a = cell.actor.asInstanceOf[Hierarchy]
a.log :+= Event("resumed")
super.resume(cell)
}
}
override def dispatcher(): MessageDispatcher = instance
}
class Hierarchy(depth: Int, breadth: Int, listener: ActorRef) extends Actor {
override def preStart {
if (depth > 1)
for (_ 1 to breadth)
context.watch(context.actorOf(Props(new Hierarchy(depth - 1, breadth, listener)).withDispatcher("hierarchy")))
listener ! self
}
override def postRestart(cause: Throwable) {
cause match {
case Failure(_, l) log = l
}
log :+= Event("restarted")
}
override def supervisorStrategy = strategy
override def preRestart(cause: Throwable, msg: Option[Any]): Unit = {
// do not scrap children
}
override def postStop {
if (failed || suspended) {
listener ! ErrorLog("not resumed (" + failed + ", " + suspended + ")", log)
}
}
var failed = false
var suspended = false
var log = Vector.empty[Event]
def check(msg: Any) = {
suspended = false
log :+= Event(msg)
if (failed) {
listener ! ErrorLog("processing message while failed", log)
failed = false
context stop self
}
}
def receive = new Receive {
val handler: Receive = {
case f @ Failure(Resume, _) suspended = true; throw f.copy(log = log)
case f: Failure failed = true; throw f.copy(log = log)
case "ping" Thread.sleep((Random.nextFloat * 1.03).toLong); sender ! "pong"
case Terminated(_) listener ! ErrorLog("terminating", log); context stop self
}
override def isDefinedAt(msg: Any) = handler.isDefinedAt(msg)
override def apply(msg: Any) = { check(msg); handler(msg) }
}
}
case class Work(n: Int)
sealed trait Action
case class Ping(ref: ActorRef) extends Action
case class Fail(ref: ActorRef, directive: Directive) extends Action
sealed trait State
case object Idle extends State
case object Init extends State
case object Stress extends State
case object Finishing extends State
case object LastPing extends State
case object Stopping extends State
case object Failed extends State
/*
* This stress test will construct a supervision hierarchy of configurable
* depth and breadth and then randomly fail and check its actors. The actors
* perform certain checks internally (verifying that they do not run when
* suspended, for example), and they are checked for health by the test
* procedure.
*
* Execution happens in phases (which is the reason for FSM):
*
* Idle:
* - upon reception of Init message, construct hierary and go to Init state
*
* Init:
* - receive refs of all contained actors
*
* Stress:
* - deal out actions (Fail or "ping"), keeping the hierarchy busy
* - whenever all actors are in the "pinged" list (i.e. have not yet
* answered with a "pong"), delay processing of the next Work() by
* 100 millis
* - when receiving a Work() while all actors are "pinged", stop the
* hierarchy and go to the Stopping state
*
* Finishing:
* - after dealing out the last action, wait for the outstanding "pong"
* messages
* - when last "pong" is received, goto LastPing state
* - upon state timeout, stop the hierarchy and go to the Failed state
*
* LastPing:
* - upon entering this state, send a "ping" to all actors
* - when last "pong" is received, goto Stopping state
* - upon state timeout, stop the hierarchy and go to the Failed state
*
* Stopping:
* - upon entering this state, stop the hierarchy
* - upon termination of the hierarchy send back successful result
*
* Whenever an ErrorLog is received, goto Failed state
*
* Failed:
* - accumulate ErrorLog messages
* - upon termination of the hierarchy send back failed result and print
* the logs, merged and in chronological order.
*
* TODO RK: also test Stop directive, and keep a complete list of all
* actors ever created, then verify after stop()ping the hierarchy that
* all are terminated, transfer them to a WeakHashMap and verify that
* they are indeed GCed
*
* TODO RK: make hierarchy construction stochastic so that it includes
* different breadth (including the degenerate breadth-1 case).
*
* TODO RK: also test Escalate by adding an exception with a `var depth`
* which gets decremented within the supervisor and gets handled when zero
* is reached (Restart resolution)
*
* TODO RK: also test exceptions during recreate
*
* TODO RK: also test recreate including terminating children
*
* TODO RK: also verify that preRestart is not called more than once per instance
*/
class StressTest(testActor: ActorRef, depth: Int, breadth: Int) extends Actor with LoggingFSM[State, Null] {
import context.system
override def supervisorStrategy = strategy
var children = Vector.empty[ActorRef]
var idleChildren = Vector.empty[ActorRef]
var pingChildren = Set.empty[ActorRef]
val nextJob = Iterator.continually(Random.nextFloat match {
case x if x >= 0.5
// ping one child
val pick = ((x - 0.5) * 2 * idleChildren.size).toInt
val ref = idleChildren(pick)
idleChildren = idleChildren.take(pick) ++ idleChildren.drop(pick + 1)
pingChildren += ref
Ping(ref)
case x
// fail one child
val pick = ((if (x > 0.25) x - 0.25 else x) * 4 * children.size).toInt
Fail(children(pick), if (x > 0.25) Restart else Resume)
})
val familySize = ((1 - BigInt(breadth).pow(depth)) / (1 - breadth)).toInt
var hierarchy: ActorRef = _
override def preRestart(cause: Throwable, msg: Option[Any]) {
throw new ActorKilledException("I want to DIE")
}
override def postRestart(cause: Throwable) {
throw new ActorKilledException("I said I wanted to DIE, dammit!")
}
override def postStop {
testActor ! "stressTestStopped"
}
startWith(Idle, null)
when(Idle) {
case Event(Init, _)
hierarchy = context.watch(context.actorOf(Props(new Hierarchy(depth, breadth, self)).withDispatcher("hierarchy")))
setTimer("phase", StateTimeout, 5 seconds, false)
goto(Init)
}
when(Init) {
case Event(ref: ActorRef, _)
if (idleChildren.nonEmpty || pingChildren.nonEmpty)
throw new IllegalStateException("received unexpected child " + children.size)
children :+= ref
if (children.size == familySize) {
idleChildren = children
goto(Stress)
} else stay
case Event(StateTimeout, _)
testActor ! "only got %d out of %d refs".format(children.size, familySize)
stop()
}
onTransition {
case Init -> Stress
self ! Work(familySize * 1000)
// set timeout for completion of the whole test (i.e. including Finishing and Stopping)
setTimer("phase", StateTimeout, 60 seconds, false)
}
val workSchedule = 250.millis
when(Stress) {
case Event(w: Work, _) if idleChildren.isEmpty
context stop hierarchy
goto(Failed)
case Event(Work(x), _) if x > 0
nextJob.next match {
case Ping(ref) ref ! "ping"
case Fail(ref, dir) ref ! Failure(dir, Vector.empty)
}
if (idleChildren.nonEmpty) self ! Work(x - 1)
else context.system.scheduler.scheduleOnce(workSchedule, self, Work(x - 1))
stay
case Event(Work(_), _) if (pingChildren.isEmpty) goto(LastPing) else goto(Finishing)
case Event("pong", _)
pingChildren -= sender
idleChildren :+= sender
stay
}
when(Finishing) {
case Event("pong", _)
pingChildren -= sender
idleChildren :+= sender
if (pingChildren.isEmpty) goto(LastPing) else stay
}
onTransition {
case _ -> LastPing
idleChildren foreach (_ ! "ping")
pingChildren ++= idleChildren
idleChildren = Vector.empty
}
when(LastPing) {
case Event("pong", _)
pingChildren -= sender
idleChildren :+= sender
if (pingChildren.isEmpty) goto(Stopping) else stay
}
onTransition {
case _ -> Stopping context stop hierarchy
}
when(Stopping, stateTimeout = 5 seconds) {
case Event(Terminated(r), _) if r == hierarchy
testActor ! "stressTestSuccessful"
stop
case Event(StateTimeout, _)
testActor ! "timeout in Stopping"
stop
}
var errors = Vector.empty[(ActorRef, ErrorLog)]
when(Failed, stateTimeout = 5 seconds) {
case Event(e: ErrorLog, _)
errors :+= sender -> e
stay
case Event(Terminated(r), _) if r == hierarchy
printErrors()
testActor ! "stressTestFailed"
stop
case Event(StateTimeout, _)
printErrors()
testActor ! "timeout in Failed"
stop
case Event("pong", _) stay // dont care?
}
def printErrors(): Unit = {
val merged = errors flatMap {
case (ref, ErrorLog(msg, log))
println(ref + " " + msg)
log map (l (l.time, ref, l.msg.toString))
}
merged.sorted foreach println
}
whenUnhandled {
case Event(e: ErrorLog, _)
errors :+= sender -> e
// dont stop the hierarchy, that is going to happen all by itself and in the right order
goto(Failed)
case Event(StateTimeout, _)
println("pingChildren:\n" + pingChildren.mkString("\n"))
context stop hierarchy
goto(Failed)
case Event(msg, _)
testActor ! ("received unexpected msg: " + msg)
stop
}
initialize
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout {
class SupervisorHierarchySpec extends AkkaSpec(SupervisorHierarchySpec.config) with DefaultTimeout with ImplicitSender {
import SupervisorHierarchySpec._
"A Supervisor Hierarchy" must {
@ -83,6 +444,67 @@ class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout {
assert(countDownMax.await(2, TimeUnit.SECONDS))
}
}
"resume children after Resume" in {
val boss = system.actorOf(Props[Resumer], "resumer")
boss ! "spawn"
val middle = expectMsgType[ActorRef]
middle ! "spawn"
val worker = expectMsgType[ActorRef]
worker ! "ping"
expectMsg("pong")
EventFilter[Exception]("expected", occurrences = 1) intercept {
middle ! "fail"
}
middle ! "ping"
expectMsg("pong")
worker ! "ping"
expectMsg("pong")
}
"suspend children while failing" in {
val latch = TestLatch()
val slowResumer = system.actorOf(Props(new Actor {
override def supervisorStrategy = OneForOneStrategy() { case _ Await.ready(latch, 4.seconds.dilated); SupervisorStrategy.Resume }
def receive = {
case "spawn" sender ! context.actorOf(Props[Resumer])
}
}), "slowResumer")
slowResumer ! "spawn"
val boss = expectMsgType[ActorRef]
boss ! "spawn"
val middle = expectMsgType[ActorRef]
middle ! "spawn"
val worker = expectMsgType[ActorRef]
worker ! "ping"
expectMsg("pong")
EventFilter[Exception]("expected", occurrences = 1) intercept {
boss ! "fail"
}
worker ! "ping"
expectNoMsg(2 seconds)
latch.countDown()
expectMsg("pong")
}
"survive being stressed" in {
system.eventStream.publish(Mute(EventFilter[Failure]()))
system.eventStream.publish(Mute(EventFilter.warning(start = "received dead letter")))
val fsm = system.actorOf(Props(new StressTest(testActor, 6, 3)), "stressTest")
fsm ! FSM.SubscribeTransitionCallBack(system.actorOf(Props(new Actor {
def receive = {
case s: FSM.CurrentState[_] log.info("{}", s)
case t: FSM.Transition[_] log.info("{}", t)
}
})))
fsm ! Init
expectMsg(70 seconds, "stressTestSuccessful")
expectMsg("stressTestStopped")
}
}
}

View file

@ -137,6 +137,9 @@ class SupervisorMiscSpec extends AkkaSpec(SupervisorMiscSpec.config) with Defaul
}))
parent ! "engage"
expectMsg("green")
EventFilter[IllegalStateException]("handleChildTerminated failed", occurrences = 1) intercept {
system.stop(parent)
}
}
}

View file

@ -341,7 +341,12 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 10 seconds)(classOf[Exception] :: Nil))))
val dyingProps = Props(new Actor {
if (inits.incrementAndGet % 2 == 0) throw new IllegalStateException("Don't wanna!")
val init = inits.getAndIncrement()
if (init % 3 == 1) throw new IllegalStateException("Don't wanna!")
override def preRestart(cause: Throwable, msg: Option[Any]) {
if (init % 3 == 0) throw new IllegalStateException("Don't wanna!")
}
def receive = {
case Ping sender ! PongMessage
@ -351,16 +356,20 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
throw e
}
})
val dyingActor = Await.result((supervisor ? dyingProps).mapTo[ActorRef], timeout.duration)
supervisor ! dyingProps
val dyingActor = expectMsgType[ActorRef]
filterEvents(EventFilter[RuntimeException]("Expected", occurrences = 1),
EventFilter[IllegalStateException]("error while creating actor", occurrences = 1)) {
filterEvents(
EventFilter[RuntimeException]("Expected", occurrences = 1),
EventFilter[PreRestartException]("Don't wanna!", occurrences = 1),
EventFilter[PostRestartException]("Don't wanna!", occurrences = 1)) {
intercept[RuntimeException] {
Await.result(dyingActor.?(DieReply)(DilatedTimeout), DilatedTimeout)
}
}
Await.result(dyingActor.?(Ping)(DilatedTimeout), DilatedTimeout) must be === PongMessage
dyingActor ! Ping
expectMsg(PongMessage)
inits.get must be(3)

View file

@ -348,7 +348,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
assertNoCountDown(done, 1000, "Should not process messages while suspended")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, suspensions = 1)
a.resume
a.resume(inResponseToFailure = false)
assertCountDown(done, 3.seconds.dilated.toMillis, "Should resume processing of messages when resumed")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1,
suspensions = 1, resumes = 1)

View file

@ -65,7 +65,7 @@ class PriorityDispatcherSpec extends AkkaSpec(PriorityDispatcherSpec.config) wit
val msgs = (1 to 100).toList
for (m msgs) actor ! m
actor.resume //Signal the actor to start treating it's message backlog
actor.resume(inResponseToFailure = false) //Signal the actor to start treating it's message backlog
Await.result(actor.?('Result).mapTo[List[Int]], timeout.duration) must be === msgs.reverse
}

View file

@ -2,8 +2,9 @@
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor;
package akka.actor.cell;
import akka.actor.ActorCell;
import akka.util.Unsafe;
final class AbstractActorCell {
@ -13,9 +14,9 @@ final class AbstractActorCell {
static {
try {
mailboxOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("_mailboxDoNotCallMeDirectly"));
childrenOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("_childrenRefsDoNotCallMeDirectly"));
nextNameOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("_nextNameDoNotCallMeDirectly"));
mailboxOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("akka$actor$cell$Dispatch$$_mailboxDoNotCallMeDirectly"));
childrenOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("akka$actor$cell$Children$$_childrenRefsDoNotCallMeDirectly"));
nextNameOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("akka$actor$cell$Children$$_nextNameDoNotCallMeDirectly"));
} catch(Throwable t){
throw new ExceptionInInitializerError(t);
}

View file

@ -122,12 +122,36 @@ class InvalidActorNameException(message: String) extends AkkaException(message)
/**
* An ActorInitializationException is thrown when the the initialization logic for an Actor fails.
*/
class ActorInitializationException private[akka] (actor: ActorRef, message: String, cause: Throwable)
extends AkkaException(message, cause) /*with NoStackTrace*/ {
class ActorInitializationException private[akka] (val actor: ActorRef, message: String, cause: Throwable)
extends AkkaException(message, cause) {
def this(msg: String) = this(null, msg, null)
def this(actor: ActorRef, msg: String) = this(actor, msg, null)
}
/**
* A PreRestartException is thrown when the preRestart() method failed.
*
* @param actor is the actor whose preRestart() hook failed
* @param cause is the exception thrown by that actor within preRestart()
* @param origCause is the exception which caused the restart in the first place
* @param msg is the message which was optionally passed into preRestart()
*/
class PreRestartException private[akka] (actor: ActorRef, cause: Throwable, val origCause: Throwable, val msg: Option[Any])
extends ActorInitializationException(actor, "exception in preRestart(" + origCause.getClass + ", " + msg.map(_.getClass) + ")", cause) {
}
/**
* A PostRestartException is thrown when constructor or postRestart() method
* fails during a restart attempt.
*
* @param actor is the actor whose constructor or postRestart() hook failed
* @param cause is the exception thrown by that actor within preRestart()
* @param origCause is the exception which caused the restart in the first place
*/
class PostRestartException private[akka] (actor: ActorRef, cause: Throwable, val origCause: Throwable)
extends ActorInitializationException(actor, "exception post restart (" + origCause.getClass + ")", cause) {
}
/**
* InvalidMessageException is thrown when an invalid message is sent to an Actor.
* Technically it's only "null" which is an InvalidMessageException but who knows,

View file

@ -4,23 +4,17 @@
package akka.actor
import language.existentials
import java.io.{ ObjectOutputStream, NotSerializableException }
import akka.dispatch._
import scala.annotation.tailrec
import java.util.concurrent.TimeUnit
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.event.Logging.{ Debug, Warning, Error }
import akka.japi.Procedure
import java.io.{ NotSerializableException, ObjectOutputStream }
import akka.serialization.SerializationExtension
import akka.event.Logging.LogEventException
import scala.collection.immutable.{ TreeSet, TreeMap }
import akka.util.{ Unsafe, Helpers }
import scala.util.control.NonFatal
import scala.collection.immutable.TreeSet
import scala.concurrent.util.Duration
import scala.util.control.NonFatal
//TODO: everything here for current compatibility - could be limited more
import akka.actor.cell.ChildrenContainer
import akka.dispatch.{ Watch, Unwatch, Terminate, SystemMessage, Suspend, Supervise, Resume, Recreate, NoMessage, MessageDispatcher, Envelope, Create, ChildTerminated }
import akka.event.Logging.{ LogEvent, Debug }
import akka.japi.Procedure
/**
* The actor context - the view of the actor cell from the actor.
@ -96,9 +90,9 @@ trait ActorContext extends ActorRefFactory {
def sender: ActorRef
/**
* Returns all supervised children; this method returns a view onto the
* internal collection of children. Targeted lookups should be using
* `actorFor` instead for performance reasons:
* Returns all supervised children; this method returns a view (i.e. a lazy
* collection) onto the internal collection of children. Targeted lookups
* should be using `actorFor` instead for performance reasons:
*
* {{{
* val badLookup = context.children find (_.path.name == "kid")
@ -194,7 +188,7 @@ private[akka] trait Cell {
/**
* Recursively resume this actor and all its children.
*/
def resume(): Unit
def resume(inResponseToFailure: Boolean): Unit
/**
* Restart this actor (will recursively restart or stop all children).
*/
@ -215,7 +209,11 @@ private[akka] trait Cell {
/**
* All children of this actor, including only reserved-names.
*/
def childrenRefs: ActorCell.ChildrenContainer
def childrenRefs: ChildrenContainer
/**
* Get the stats for the named child, if that exists.
*/
def getChildByName(name: String): Option[ChildRestartStats]
/**
* Enqueue a message to be sent to the actor; may or may not actually
* schedule the actor to run, depending on which type of cell it is.
@ -258,436 +256,102 @@ private[akka] object ActorCell {
def cancel() {}
}
final val emptyReceiveTimeoutData: (Duration, Cancellable) = (Duration.Undefined, emptyCancellable)
final val emptyBehaviorStack: List[Actor.Receive] = Nil
final val emptyActorRefSet: Set[ActorRef] = TreeSet.empty
sealed trait SuspendReason
case object UserRequest extends SuspendReason
case class Recreation(cause: Throwable) extends SuspendReason
case object Termination extends SuspendReason
trait ChildrenContainer {
def add(child: ActorRef): ChildrenContainer
def remove(child: ActorRef): ChildrenContainer
def getByName(name: String): Option[ChildRestartStats]
def getByRef(actor: ActorRef): Option[ChildRestartStats]
def children: Iterable[ActorRef]
def stats: Iterable[ChildRestartStats]
def shallDie(actor: ActorRef): ChildrenContainer
/**
* reserve that name or throw an exception
*/
def reserve(name: String): ChildrenContainer
/**
* cancel a reservation
*/
def unreserve(name: String): ChildrenContainer
}
trait EmptyChildrenContainer extends ChildrenContainer {
val emptyStats = TreeMap.empty[String, ChildStats]
def add(child: ActorRef): ChildrenContainer =
new NormalChildrenContainer(emptyStats.updated(child.path.name, ChildRestartStats(child)))
def remove(child: ActorRef): ChildrenContainer = this
def getByName(name: String): Option[ChildRestartStats] = None
def getByRef(actor: ActorRef): Option[ChildRestartStats] = None
def children: Iterable[ActorRef] = Nil
def stats: Iterable[ChildRestartStats] = Nil
def shallDie(actor: ActorRef): ChildrenContainer = this
def reserve(name: String): ChildrenContainer = new NormalChildrenContainer(emptyStats.updated(name, ChildNameReserved))
def unreserve(name: String): ChildrenContainer = this
override def toString = "no children"
}
/**
* This is the empty container, shared among all leaf actors.
*/
object EmptyChildrenContainer extends EmptyChildrenContainer
/**
* This is the empty container which is installed after the last child has
* terminated while stopping; it is necessary to distinguish from the normal
* empty state while calling handleChildTerminated() for the last time.
*/
object TerminatedChildrenContainer extends EmptyChildrenContainer {
override def add(child: ActorRef): ChildrenContainer = this
override def reserve(name: String): ChildrenContainer =
throw new IllegalStateException("cannot reserve actor name '" + name + "': already terminated")
}
/**
* Normal children container: we do have at least one child, but none of our
* children are currently terminating (which is the time period between
* calling context.stop(child) and processing the ChildTerminated() system
* message).
*/
class NormalChildrenContainer(c: TreeMap[String, ChildStats]) extends ChildrenContainer {
def add(child: ActorRef): ChildrenContainer =
new NormalChildrenContainer(c.updated(child.path.name, ChildRestartStats(child)))
def remove(child: ActorRef): ChildrenContainer = NormalChildrenContainer(c - child.path.name)
def getByName(name: String): Option[ChildRestartStats] = c.get(name) match {
case s @ Some(_: ChildRestartStats) s.asInstanceOf[Option[ChildRestartStats]]
case _ None
}
def getByRef(actor: ActorRef): Option[ChildRestartStats] = c.get(actor.path.name) match {
case c @ Some(crs: ChildRestartStats) if (crs.child == actor) c.asInstanceOf[Option[ChildRestartStats]]
case _ None
}
def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _) child }
def stats: Iterable[ChildRestartStats] = c.values.collect { case c: ChildRestartStats c }
def shallDie(actor: ActorRef): ChildrenContainer = TerminatingChildrenContainer(c, Set(actor), UserRequest)
def reserve(name: String): ChildrenContainer =
if (c contains name)
throw new InvalidActorNameException("actor name " + name + " is not unique!")
else new NormalChildrenContainer(c.updated(name, ChildNameReserved))
def unreserve(name: String): ChildrenContainer = c.get(name) match {
case Some(ChildNameReserved) NormalChildrenContainer(c - name)
case _ this
}
override def toString =
if (c.size > 20) c.size + " children"
else c.mkString("children:\n ", "\n ", "")
}
object NormalChildrenContainer {
def apply(c: TreeMap[String, ChildStats]): ChildrenContainer =
if (c.isEmpty) EmptyChildrenContainer
else new NormalChildrenContainer(c)
}
/**
* Waiting state: there are outstanding termination requests (i.e. context.stop(child)
* was called but the corresponding ChildTerminated() system message has not yet been
* processed). There could be no specific reason (UserRequested), we could be Restarting
* or Terminating.
*
* Removing the last child which was supposed to be terminating will return a different
* type of container, depending on whether or not children are left and whether or not
* the reason was Terminating.
*/
case class TerminatingChildrenContainer(c: TreeMap[String, ChildStats], toDie: Set[ActorRef], reason: SuspendReason)
extends ChildrenContainer {
def add(child: ActorRef): ChildrenContainer = copy(c.updated(child.path.name, ChildRestartStats(child)))
def remove(child: ActorRef): ChildrenContainer = {
val t = toDie - child
if (t.isEmpty) reason match {
case Termination TerminatedChildrenContainer
case _ NormalChildrenContainer(c - child.path.name)
}
else copy(c - child.path.name, t)
}
def getByName(name: String): Option[ChildRestartStats] = c.get(name) match {
case s @ Some(_: ChildRestartStats) s.asInstanceOf[Option[ChildRestartStats]]
case _ None
}
def getByRef(actor: ActorRef): Option[ChildRestartStats] = c.get(actor.path.name) match {
case c @ Some(crs: ChildRestartStats) if (crs.child == actor) c.asInstanceOf[Option[ChildRestartStats]]
case _ None
}
def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _) child }
def stats: Iterable[ChildRestartStats] = c.values.collect { case c: ChildRestartStats c }
def shallDie(actor: ActorRef): ChildrenContainer = copy(toDie = toDie + actor)
def reserve(name: String): ChildrenContainer = reason match {
case Termination throw new IllegalStateException("cannot reserve actor name '" + name + "': terminating")
case _
if (c contains name)
throw new InvalidActorNameException("actor name " + name + " is not unique!")
else copy(c = c.updated(name, ChildNameReserved))
}
def unreserve(name: String): ChildrenContainer = c.get(name) match {
case Some(ChildNameReserved) copy(c = c - name)
case _ this
}
override def toString =
if (c.size > 20) c.size + " children"
else c.mkString("children (" + toDie.size + " terminating):\n ", "\n ", "\n") + toDie
}
}
//ACTORCELL IS 64bytes and should stay that way unless very good reason not to (machine sympathy, cache line fit)
//vars don't need volatile since it's protected with the mailbox status
//Make sure that they are not read/written outside of a message processing (systemInvoke/invoke)
/**
* Everything in here is completely Akka PRIVATE. You will not find any
* supported APIs in this place. This is not the API you were looking
* for! (waves hand)
*/
private[akka] class ActorCell(
val system: ActorSystemImpl,
val self: InternalActorRef,
val props: Props,
@volatile var parent: InternalActorRef) extends UntypedActorContext with Cell {
val parent: InternalActorRef)
extends UntypedActorContext with Cell
with cell.ReceiveTimeout
with cell.Children
with cell.Dispatch
with cell.DeathWatch
with cell.FaultHandling {
import AbstractActorCell.{ mailboxOffset, childrenOffset, nextNameOffset }
import ActorCell._
final def isLocal = true
final def systemImpl = system
protected final def guardian = self
protected final def lookupRoot = self
final def provider = system.provider
override final def receiveTimeout: Option[Duration] = receiveTimeoutData._1 match {
case Duration.Undefined None
case duration Some(duration)
}
final def setReceiveTimeout(timeout: Option[Duration]): Unit = setReceiveTimeout(timeout.getOrElse(Duration.Undefined))
override final def setReceiveTimeout(timeout: Duration): Unit =
receiveTimeoutData = (
if (Duration.Undefined == timeout || timeout.toMillis < 1) Duration.Undefined else timeout,
receiveTimeoutData._2)
final override def resetReceiveTimeout(): Unit = setReceiveTimeout(None)
/**
* In milliseconds
*/
var receiveTimeoutData: (Duration, Cancellable) = emptyReceiveTimeoutData
@volatile
private var _childrenRefsDoNotCallMeDirectly: ChildrenContainer = EmptyChildrenContainer
def childrenRefs: ChildrenContainer = Unsafe.instance.getObjectVolatile(this, childrenOffset).asInstanceOf[ChildrenContainer]
private def swapChildrenRefs(oldChildren: ChildrenContainer, newChildren: ChildrenContainer): Boolean =
Unsafe.instance.compareAndSwapObject(this, childrenOffset, oldChildren, newChildren)
@tailrec private def reserveChild(name: String): Boolean = {
val c = childrenRefs
swapChildrenRefs(c, c.reserve(name)) || reserveChild(name)
}
@tailrec private def unreserveChild(name: String): Boolean = {
val c = childrenRefs
swapChildrenRefs(c, c.unreserve(name)) || unreserveChild(name)
}
@tailrec private def addChild(ref: ActorRef): Boolean = {
val c = childrenRefs
swapChildrenRefs(c, c.add(ref)) || addChild(ref)
}
@tailrec private def shallDie(ref: ActorRef): Boolean = {
val c = childrenRefs
swapChildrenRefs(c, c.shallDie(ref)) || shallDie(ref)
}
@tailrec private def removeChild(ref: ActorRef): ChildrenContainer = {
val c = childrenRefs
val n = c.remove(ref)
if (swapChildrenRefs(c, n)) n
else removeChild(ref)
}
@tailrec private def setChildrenTerminationReason(reason: SuspendReason): Boolean = {
childrenRefs match {
case c: TerminatingChildrenContainer swapChildrenRefs(c, c.copy(reason = reason)) || setChildrenTerminationReason(reason)
case _ false
}
}
private def isTerminating = childrenRefs match {
case TerminatingChildrenContainer(_, _, Termination) true
case TerminatedChildrenContainer true
case _ false
}
private def isNormal = childrenRefs match {
case TerminatingChildrenContainer(_, _, Termination | _: Recreation) false
case _ true
}
private def _actorOf(props: Props, name: String, async: Boolean): ActorRef = {
if (system.settings.SerializeAllCreators && !props.creator.isInstanceOf[NoSerializationVerificationNeeded]) {
val ser = SerializationExtension(system)
ser.serialize(props.creator) match {
case Left(t) throw t
case Right(bytes) ser.deserialize(bytes, props.creator.getClass) match {
case Left(t) throw t
case _ //All good
}
}
}
/*
* in case we are currently terminating, fail external attachChild requests
* (internal calls cannot happen anyway because we are suspended)
*/
if (isTerminating) throw new IllegalStateException("cannot create children while terminating or terminated")
else {
reserveChild(name)
// this name will either be unreserved or overwritten with a real child below
val actor =
try {
provider.actorOf(systemImpl, props, self, self.path / name,
systemService = false, deploy = None, lookupDeploy = true, async = async)
} catch {
case NonFatal(e)
unreserveChild(name)
throw e
}
addChild(actor)
actor
}
}
def actorOf(props: Props): ActorRef = _actorOf(props, randomName(), async = false)
def actorOf(props: Props, name: String): ActorRef = _actorOf(props, checkName(name), async = false)
private def checkName(name: String): String = {
import ActorPath.ElementRegex
name match {
case null throw new InvalidActorNameException("actor name must not be null")
case "" throw new InvalidActorNameException("actor name must not be empty")
case ElementRegex() name
case _ throw new InvalidActorNameException("illegal actor name '" + name + "', must conform to " + ElementRegex)
}
}
private[akka] def attachChild(props: Props, name: String): ActorRef =
_actorOf(props, checkName(name), async = true)
private[akka] def attachChild(props: Props): ActorRef =
_actorOf(props, randomName(), async = true)
final def stop(actor: ActorRef): Unit = {
val started = actor match {
case r: RepointableRef r.isStarted
case _ true
}
if (childrenRefs.getByRef(actor).isDefined && started) shallDie(actor)
actor.asInstanceOf[InternalActorRef].stop()
}
private[this] var _actor: Actor = _
def actor: Actor = _actor
protected def actor_=(a: Actor): Unit = _actor = a
var currentMessage: Envelope = _
var actor: Actor = _
private var behaviorStack: List[Actor.Receive] = emptyBehaviorStack
var watching: Set[ActorRef] = emptyActorRefSet
var watchedBy: Set[ActorRef] = emptyActorRefSet
@volatile private var _nextNameDoNotCallMeDirectly = 0L
final protected def randomName(): String = {
@tailrec def inc(): Long = {
val current = Unsafe.instance.getLongVolatile(this, nextNameOffset)
if (Unsafe.instance.compareAndSwapLong(this, nextNameOffset, current, current + 1)) current
else inc()
/*
* MESSAGE PROCESSING
*/
//Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status
final def systemInvoke(message: SystemMessage): Unit = try {
message match {
case Create() create()
case Recreate(cause) faultRecreate(cause)
case Watch(watchee, watcher) addWatcher(watchee, watcher)
case Unwatch(watchee, watcher) remWatcher(watchee, watcher)
case Suspend() faultSuspend()
case Resume(inRespToFailure) faultResume(inRespToFailure)
case Terminate() terminate()
case Supervise(child) supervise(child)
case ChildTerminated(child) handleChildTerminated(child)
case NoMessage // only here to suppress warning
}
Helpers.base64(inc())
} catch {
case e @ (_: InterruptedException | NonFatal(_)) handleInvokeFailure(e, "error while processing " + message)
}
@volatile private var _mailboxDoNotCallMeDirectly: Mailbox = _ //This must be volatile since it isn't protected by the mailbox status
//Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status
final def invoke(messageHandle: Envelope): Unit = try {
currentMessage = messageHandle
cancelReceiveTimeout() // FIXME: leave this here???
messageHandle.message match {
case msg: AutoReceivedMessage autoReceiveMessage(messageHandle)
case msg receiveMessage(msg)
}
currentMessage = null // reset current message after successful invocation
} catch {
case e @ (_: InterruptedException | NonFatal(_)) handleInvokeFailure(e, e.getMessage)
} finally {
checkReceiveTimeout // Reschedule receive timeout
}
/**
* INTERNAL API
*
* Returns a reference to the current mailbox
def autoReceiveMessage(msg: Envelope): Unit = {
if (system.settings.DebugAutoReceive)
publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg))
msg.message match {
case Failed(cause) handleFailure(sender, cause)
case t: Terminated watchedActorTerminated(t.actor); receiveMessage(t)
case Kill throw new ActorKilledException("Kill")
case PoisonPill self.stop()
case SelectParent(m) parent.tell(m, msg.sender)
case SelectChildName(name, m) for (c getChildByName(name)) c.child.tell(m, msg.sender)
case SelectChildPattern(p, m) for (c children if p.matcher(c.path.name).matches) c.tell(m, msg.sender)
}
}
final def receiveMessage(msg: Any): Unit = behaviorStack.head.applyOrElse(msg, actor.unhandled)
/*
* ACTOR CONTEXT IMPLEMENTATION
*/
@inline final def mailbox: Mailbox = Unsafe.instance.getObjectVolatile(this, mailboxOffset).asInstanceOf[Mailbox]
/**
* INTERNAL API
*
* replaces the current mailbox using getAndSet semantics
*/
@tailrec final def swapMailbox(newMailbox: Mailbox): Mailbox = {
val oldMailbox = mailbox
if (!Unsafe.instance.compareAndSwapObject(this, mailboxOffset, oldMailbox, newMailbox)) swapMailbox(newMailbox)
else oldMailbox
}
final def hasMessages: Boolean = mailbox.hasMessages
final def numberOfMessages: Int = mailbox.numberOfMessages
val dispatcher: MessageDispatcher = system.dispatchers.lookup(props.dispatcher)
/**
* UntypedActorContext impl
*/
final def getDispatcher(): MessageDispatcher = dispatcher
final def isTerminated: Boolean = mailbox.isClosed
final def start(): this.type = {
/*
* Create the mailbox and enqueue the Create() message to ensure that
* this is processed before anything else.
*/
swapMailbox(dispatcher.createMailbox(this))
mailbox.setActor(this)
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
mailbox.systemEnqueue(self, Create())
// This call is expected to start off the actor by scheduling its mailbox.
dispatcher.attach(this)
this
}
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def suspend(): Unit = dispatcher.systemDispatch(this, Suspend())
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def resume(): Unit = dispatcher.systemDispatch(this, Resume())
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def stop(): Unit = dispatcher.systemDispatch(this, Terminate())
override final def watch(subject: ActorRef): ActorRef = subject match {
case a: InternalActorRef
if (a != self && !watching.contains(a)) {
a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
watching += a
}
a
}
override final def unwatch(subject: ActorRef): ActorRef = subject match {
case a: InternalActorRef
if (a != self && watching.contains(a)) {
a.sendSystemMessage(Unwatch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
watching -= a
}
a
}
final def children: Iterable[ActorRef] = childrenRefs.children
/**
* Impl UntypedActorContext
*/
final def getChildren(): java.lang.Iterable[ActorRef] =
scala.collection.JavaConverters.asJavaIterableConverter(children).asJava
def tell(message: Any, sender: ActorRef): Unit =
dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender, system))
override def sendSystemMessage(message: SystemMessage): Unit = dispatcher.systemDispatch(this, message)
final def sender: ActorRef = currentMessage match {
case null system.deadLetters
@ -695,6 +359,25 @@ private[akka] class ActorCell(
case _ system.deadLetters
}
def become(behavior: Actor.Receive, discardOld: Boolean = true): Unit =
behaviorStack = behavior :: (if (discardOld && behaviorStack.nonEmpty) behaviorStack.tail else behaviorStack)
def become(behavior: Procedure[Any]): Unit = become(behavior, false)
def become(behavior: Procedure[Any], discardOld: Boolean): Unit =
become({ case msg behavior.apply(msg) }: Actor.Receive, discardOld)
def unbecome(): Unit = {
val original = behaviorStack
behaviorStack =
if (original.isEmpty || original.tail.isEmpty) actor.receive :: emptyBehaviorStack
else original.tail
}
/*
* ACTOR INSTANCE HANDLING
*/
//This method is in charge of setting up the contextStack and create a new instance of the Actor
protected def newActor(): Actor = {
contextStack.set(this :: contextStack.get)
@ -715,321 +398,43 @@ private[akka] class ActorCell(
}
}
//Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status
final def systemInvoke(message: SystemMessage) {
def create(): Unit = if (isNormal) {
try {
val created = newActor()
actor = created
created.preStart()
checkReceiveTimeout
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(created), "started (" + created + ")"))
} catch {
case NonFatal(i: InstantiationException)
throw new ActorInitializationException(self,
"""exception during creation, this problem is likely to occur because the class of the Actor you tried to create is either,
private def create(): Unit = if (isNormal) {
try {
val created = newActor()
actor = created
created.preStart()
checkReceiveTimeout
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(created), "started (" + created + ")"))
} catch {
case NonFatal(i: InstantiationException)
throw new ActorInitializationException(self,
"""exception during creation, this problem is likely to occur because the class of the Actor you tried to create is either,
a non-static inner class (in which case make it a static inner class or use Props(new ...) or Props( new UntypedActorFactory ... )
or is missing an appropriate, reachable no-args constructor.
""", i.getCause)
case NonFatal(e) throw new ActorInitializationException(self, "exception during creation", e)
}
}
def recreate(cause: Throwable): Unit = if (isNormal) {
try {
val failedActor = actor
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(failedActor), "restarting"))
if (failedActor ne null) {
val c = currentMessage //One read only plz
try {
if (failedActor.context ne null) failedActor.preRestart(cause, if (c ne null) Some(c.message) else None)
} finally {
clearActorFields(failedActor)
}
}
childrenRefs match {
case ct: TerminatingChildrenContainer
setChildrenTerminationReason(Recreation(cause))
dispatcher suspend this
case _
doRecreate(cause, failedActor)
}
} catch {
case NonFatal(e) throw new ActorInitializationException(self, "exception during creation", e match {
case i: InstantiationException i.getCause
case other other
})
}
}
def suspend(): Unit = if (isNormal) dispatcher suspend this
def resume(): Unit = if (isNormal) dispatcher resume this
def addWatcher(watchee: ActorRef, watcher: ActorRef): Unit = {
val watcheeSelf = watchee == self
val watcherSelf = watcher == self
if (watcheeSelf && !watcherSelf) {
if (!watchedBy.contains(watcher)) {
watchedBy += watcher
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now monitoring " + watcher))
}
} else if (!watcheeSelf && watcherSelf) {
watch(watchee)
} else {
system.eventStream.publish(Warning(self.path.toString, clazz(actor), "BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, self)))
}
}
def remWatcher(watchee: ActorRef, watcher: ActorRef): Unit = {
val watcheeSelf = watchee == self
val watcherSelf = watcher == self
if (watcheeSelf && !watcherSelf) {
if (watchedBy.contains(watcher)) {
watchedBy -= watcher
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + watcher))
}
} else if (!watcheeSelf && watcherSelf) {
unwatch(watchee)
} else {
system.eventStream.publish(Warning(self.path.toString, clazz(actor), "BUG: illegal Unwatch(%s,%s) for %s".format(watchee, watcher, self)))
}
}
def terminate() {
setReceiveTimeout(None)
cancelReceiveTimeout
// stop all children, which will turn childrenRefs into TerminatingChildrenContainer (if there are children)
children foreach stop
childrenRefs match {
case ct: TerminatingChildrenContainer
setChildrenTerminationReason(Termination)
// do not process normal messages while waiting for all children to terminate
dispatcher suspend this
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopping"))
case _ doTerminate()
}
}
def supervise(child: ActorRef): Unit = if (!isTerminating) {
if (childrenRefs.getByRef(child).isEmpty) addChild(child)
handleSupervise(child)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now supervising " + child))
}
try {
message match {
case Create() create()
case Recreate(cause) recreate(cause)
case Watch(watchee, watcher) addWatcher(watchee, watcher)
case Unwatch(watchee, watcher) remWatcher(watchee, watcher)
case Suspend() suspend()
case Resume() resume()
case Terminate() terminate()
case Supervise(child) supervise(child)
case ChildTerminated(child) handleChildTerminated(child)
case NoMessage // only here to suppress warning
}
} catch {
case e @ (_: InterruptedException | NonFatal(_)) handleInvokeFailure(e, "error while processing " + message)
case NonFatal(e) throw new ActorInitializationException(self, "exception during creation", e)
}
}
//Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status
final def invoke(messageHandle: Envelope): Unit = try {
currentMessage = messageHandle
cancelReceiveTimeout() // FIXME: leave this here???
messageHandle.message match {
case msg: AutoReceivedMessage autoReceiveMessage(messageHandle)
case msg receiveMessage(msg)
}
currentMessage = null // reset current message after successful invocation
} catch {
case e @ (_: InterruptedException | NonFatal(_)) handleInvokeFailure(e, e.getMessage)
} finally {
checkReceiveTimeout // Reschedule receive timeout
}
final def handleInvokeFailure(t: Throwable, message: String): Unit = try {
dispatcher.reportFailure(new LogEventException(Error(t, self.path.toString, clazz(actor), message), t))
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
if (actor ne null) actor.supervisorStrategy.handleSupervisorFailing(self, children)
} finally {
t match { // Wrap InterruptedExceptions and rethrow
case _: InterruptedException parent.tell(Failed(new ActorInterruptedException(t)), self); throw t
case _ parent.tell(Failed(t), self)
}
}
def become(behavior: Actor.Receive, discardOld: Boolean = true): Unit =
behaviorStack = behavior :: (if (discardOld && behaviorStack.nonEmpty) behaviorStack.tail else behaviorStack)
/**
* UntypedActorContext impl
*/
def become(behavior: Procedure[Any]): Unit = become(behavior, false)
/*
* UntypedActorContext impl
*/
def become(behavior: Procedure[Any], discardOld: Boolean): Unit =
become({ case msg behavior.apply(msg) }: Actor.Receive, discardOld)
def unbecome(): Unit = {
val original = behaviorStack
behaviorStack =
if (original.isEmpty || original.tail.isEmpty) actor.receive :: emptyBehaviorStack
else original.tail
}
def autoReceiveMessage(msg: Envelope): Unit = {
if (system.settings.DebugAutoReceive)
system.eventStream.publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg))
msg.message match {
case Failed(cause) handleFailure(sender, cause)
case t: Terminated watching -= t.actor; receiveMessage(t)
case Kill throw new ActorKilledException("Kill")
case PoisonPill self.stop()
case SelectParent(m) parent.tell(m, msg.sender)
case SelectChildName(name, m) for (c childrenRefs getByName name) c.child.tell(m, msg.sender)
case SelectChildPattern(p, m) for (c children if p.matcher(c.path.name).matches) c.tell(m, msg.sender)
}
}
final def receiveMessage(msg: Any): Unit = behaviorStack.head.applyOrElse(msg, actor.unhandled)
private def doTerminate() {
val a = actor
try {
try {
if (a ne null) a.postStop()
} finally {
dispatcher.detach(this)
}
} finally {
try {
parent.sendSystemMessage(ChildTerminated(self))
if (!watchedBy.isEmpty) {
val terminated = Terminated(self)(existenceConfirmed = true)
try {
watchedBy foreach {
watcher
try watcher.tell(terminated, self) catch {
case NonFatal(t) system.eventStream.publish(Error(t, self.path.toString, clazz(a), "deathwatch"))
}
}
} finally watchedBy = emptyActorRefSet
}
if (!watching.isEmpty) {
try {
watching foreach { // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
case watchee: InternalActorRef try watchee.sendSystemMessage(Unwatch(watchee, self)) catch {
case NonFatal(t) system.eventStream.publish(Error(t, self.path.toString, clazz(a), "deathwatch"))
}
}
} finally watching = emptyActorRefSet
}
if (system.settings.DebugLifecycle)
system.eventStream.publish(Debug(self.path.toString, clazz(a), "stopped"))
} finally {
behaviorStack = emptyBehaviorStack
clearActorFields(a)
actor = null
}
}
}
private def doRecreate(cause: Throwable, failedActor: Actor): Unit = try {
// after all killed children have terminated, recreate the rest, then go on to start the new instance
actor.supervisorStrategy.handleSupervisorRestarted(cause, self, children)
val freshActor = newActor()
actor = freshActor // this must happen before postRestart has a chance to fail
if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields.
freshActor.postRestart(cause)
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(freshActor), "restarted"))
dispatcher.resume(this)
} catch {
case NonFatal(e) try {
dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while creating actor"), e))
// prevent any further messages to be processed until the actor has been restarted
dispatcher.suspend(this)
actor.supervisorStrategy.handleSupervisorFailing(self, children) // FIXME Should this be called on actor or failedActor?
clearActorFields(actor) // If this fails, we need to ensure that preRestart isn't called.
} finally {
parent.tell(Failed(new ActorInitializationException(self, "exception during re-creation", e)), self)
}
}
final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenRefs.getByRef(child) match {
case Some(stats) if (!actor.supervisorStrategy.handleFailure(this, child, cause, stats, childrenRefs.stats)) throw cause
case None system.eventStream.publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child))
}
final def handleChildTerminated(child: ActorRef): Unit = try {
childrenRefs match {
case tc @ TerminatingChildrenContainer(_, _, reason)
val n = removeChild(child)
actor.supervisorStrategy.handleChildTerminated(this, child, children)
if (!n.isInstanceOf[TerminatingChildrenContainer]) reason match {
case Recreation(cause) doRecreate(cause, actor) // doRecreate since this is the continuation of "recreate"
case Termination doTerminate()
case _
}
case _
removeChild(child)
actor.supervisorStrategy.handleChildTerminated(this, child, children)
}
} catch {
case NonFatal(e)
try {
dispatcher suspend this
actor.supervisorStrategy.handleSupervisorFailing(self, children)
} finally {
parent.tell(Failed(e), self)
}
private def supervise(child: ActorRef): Unit = if (!isTerminating) {
addChild(child)
handleSupervise(child)
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child))
}
// future extension point
protected def handleSupervise(child: ActorRef): Unit = child match {
case r: RepointableActorRef r.activate()
case _
}
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def restart(cause: Throwable): Unit = dispatcher.systemDispatch(this, Recreate(cause))
final def checkReceiveTimeout() {
val recvtimeout = receiveTimeoutData
if (Duration.Undefined != recvtimeout._1 && !mailbox.hasMessages) {
recvtimeout._2.cancel() //Cancel any ongoing future
//Only reschedule if desired and there are currently no more messages to be processed
receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(recvtimeout._1, self, ReceiveTimeout))
} else cancelReceiveTimeout()
}
final def cancelReceiveTimeout(): Unit =
if (receiveTimeoutData._2 ne emptyCancellable) {
receiveTimeoutData._2.cancel()
receiveTimeoutData = (receiveTimeoutData._1, emptyCancellable)
}
final def clearActorFields(actorInstance: Actor): Unit = {
final protected def clearActorFields(actorInstance: Actor): Unit = {
setActorFields(actorInstance, context = null, self = system.deadLetters)
currentMessage = null
behaviorStack = emptyBehaviorStack
}
final def setActorFields(actorInstance: Actor, context: ActorContext, self: ActorRef) {
final protected def setActorFields(actorInstance: Actor, context: ActorContext, self: ActorRef) {
@tailrec
def lookupAndSetField(clazz: Class[_], actor: Actor, name: String, value: Any): Boolean = {
val success = try {
@ -1054,6 +459,9 @@ private[akka] class ActorCell(
}
}
private final def clazz(o: AnyRef): Class[_] = if (o eq null) this.getClass else o.getClass
// logging is not the main purpose, and if it fails theres nothing we can do
protected final def publish(e: LogEvent): Unit = try system.eventStream.publish(e) catch { case NonFatal(_) }
protected final def clazz(o: AnyRef): Class[_] = if (o eq null) this.getClass else o.getClass
}

View file

@ -191,7 +191,7 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe
/*
* Actor life-cycle management, invoked only internally (in response to user requests via ActorContext).
*/
def resume(): Unit
def resume(inResponseToFailure: Boolean): Unit
def suspend(): Unit
def restart(cause: Throwable): Unit
def stop(): Unit
@ -262,10 +262,7 @@ private[akka] class LocalActorRef private[akka] (
* that is reached).
*/
private val actorCell: ActorCell = newActorCell(_system, this, _props, _supervisor)
actorCell.start()
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
_supervisor.sendSystemMessage(akka.dispatch.Supervise(this))
actorCell.start(sendSupervise = true)
protected def newActorCell(system: ActorSystemImpl, ref: InternalActorRef, props: Props, supervisor: InternalActorRef): ActorCell =
new ActorCell(system, ref, props, supervisor)
@ -291,7 +288,7 @@ private[akka] class LocalActorRef private[akka] (
/**
* Resumes a suspended actor.
*/
override def resume(): Unit = actorCell.resume()
override def resume(inResponseToFailure: Boolean): Unit = actorCell.resume(inResponseToFailure)
/**
* Shuts down the actor and its message queue
@ -307,7 +304,7 @@ private[akka] class LocalActorRef private[akka] (
* to inject synthetic actor paths like /temp.
*/
protected def getSingleChild(name: String): InternalActorRef =
actorCell.childrenRefs.getByName(name) match {
actorCell.getChildByName(name) match {
case Some(crs) crs.child.asInstanceOf[InternalActorRef]
case None Nobody
}
@ -391,7 +388,7 @@ private[akka] trait MinimalActorRef extends InternalActorRef with LocalRef {
override def getChild(names: Iterator[String]): InternalActorRef = if (names.forall(_.isEmpty)) this else Nobody
override def suspend(): Unit = ()
override def resume(): Unit = ()
override def resume(inResponseToFailure: Boolean): Unit = ()
override def stop(): Unit = ()
override def isTerminated = false

View file

@ -17,6 +17,7 @@ import akka.util._
import akka.util.internal.{ HashedWheelTimer, ConcurrentIdentityHashMap }
import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException }
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.cell.ChildrenContainer
object ActorSystem {
@ -701,19 +702,30 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
node match {
case wc: ActorRefWithCell
val cell = wc.underlying
indent + "-> " + node.path.name + " " + Logging.simpleName(node) + " " +
(if (indent.isEmpty) "-> " else indent.dropRight(1) + "⌊-> ") +
node.path.name + " " + Logging.simpleName(node) + " " +
(cell match {
case real: ActorCell if (real.actor ne null) real.actor.getClass else "null"
case _ Logging.simpleName(cell)
}) +
(cell match {
case real: ActorCell " status=" + real.mailbox.status
case _ ""
}) +
" " + (cell.childrenRefs match {
case ActorCell.TerminatingChildrenContainer(_, toDie, reason)
case ChildrenContainer.TerminatingChildrenContainer(_, toDie, reason)
"Terminating(" + reason + ")" +
(toDie.toSeq.sorted mkString ("\n" + indent + " toDie: ", "\n" + indent + " ", ""))
(toDie.toSeq.sorted mkString ("\n" + indent + " | toDie: ", "\n" + indent + " | ", ""))
case x @ (ChildrenContainer.TerminatedChildrenContainer | ChildrenContainer.EmptyChildrenContainer) x.toString
case n: ChildrenContainer.NormalChildrenContainer n.c.size + " children"
case x Logging.simpleName(x)
}) +
(if (cell.childrenRefs.children.isEmpty) "" else "\n") +
(cell.childrenRefs.children.toSeq.sorted map (printNode(_, indent + " |")) mkString ("\n"))
({
val children = cell.childrenRefs.children.toSeq.sorted
val bulk = children.dropRight(1) map (printNode(_, indent + " |"))
bulk ++ (children.lastOption map (printNode(_, indent + " ")))
} mkString ("\n"))
case _
indent + node.path.name + " " + Logging.simpleName(node)
}

View file

@ -208,19 +208,31 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
}
/**
* An Akka SupervisorStrategy is the policy to apply for crashing children
* An Akka SupervisorStrategy is the policy to apply for crashing children.
*
* <b>IMPORTANT:</b>
*
* You should not normally need to create new subclasses, instead use the
* existing [[akka.actor.OneForOneStrategy]] or [[akka.actor.AllForOneStrategy]],
* but if you do, please read the docs of the methods below carefully, as
* incorrect implementations may lead to blocked actor systems (i.e.
* permanently suspended actors).
*/
abstract class SupervisorStrategy {
import SupervisorStrategy._
/**
* Returns the Decider that is associated with this SupervisorStrategy
* Returns the Decider that is associated with this SupervisorStrategy.
* The Decider is invoked by the default implementation of `handleFailure`
* to obtain the Directive to be applied.
*/
def decider: Decider
/**
* This method is called after the child has been removed from the set of children.
* It does not need to do anything special. Exceptions thrown from this method
* do NOT make the actor fail if this happens during termination.
*/
def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit
@ -229,27 +241,48 @@ abstract class SupervisorStrategy {
*/
def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit
//FIXME docs
def handleSupervisorFailing(supervisor: ActorRef, children: Iterable[ActorRef]): Unit =
if (children.nonEmpty) children.foreach(_.asInstanceOf[InternalActorRef].suspend())
//FIXME docs
def handleSupervisorRestarted(cause: Throwable, supervisor: ActorRef, children: Iterable[ActorRef]): Unit =
if (children.nonEmpty) children.foreach(_.asInstanceOf[InternalActorRef].restart(cause))
/**
* Returns whether it processed the failure or not
* This is the main entry point: in case of a childs failure, this method
* must try to handle the failure by resuming, restarting or stopping the
* child (and returning `true`), or it returns `false` to escalate the
* failure, which will lead to this actor re-throwing the exception which
* caused the failure. The exception will not be wrapped.
*
* @param children is a lazy collection (a view)
*/
def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = {
val directive = if (decider.isDefinedAt(cause)) decider(cause) else Escalate //FIXME applyOrElse in Scala 2.10
directive match {
case Resume child.asInstanceOf[InternalActorRef].resume(); true
case Resume resumeChild(child); true
case Restart processFailure(context, true, child, cause, stats, children); true
case Stop processFailure(context, false, child, cause, stats, children); true
case Escalate false
}
}
/**
* Resume the previously failed child: <b>do never apply this to a child which
* is not the currently failing child</b>. Suspend/resume needs to be done in
* matching pairs, otherwise actors will wake up too soon or never at all.
*/
final def resumeChild(child: ActorRef): Unit = child.asInstanceOf[InternalActorRef].resume(inResponseToFailure = true)
/**
* Restart the given child, possibly suspending it first.
*
* <b>IMPORTANT:</b>
*
* If the child is the currently failing one, it will already have been
* suspended, hence `suspendFirst` is false. If the child is not the
* currently failing one, then it did not request this treatment and is
* therefore not prepared to be resumed without prior suspend.
*/
final def restartChild(child: ActorRef, cause: Throwable, suspendFirst: Boolean): Unit = {
val c = child.asInstanceOf[InternalActorRef]
if (suspendFirst) c.suspend()
c.restart(cause)
}
}
/**
@ -288,7 +321,7 @@ case class AllForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration
def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = {
if (children.nonEmpty) {
if (restart && children.forall(_.requestRestartPermission(retriesWindow)))
children.foreach(_.child.asInstanceOf[InternalActorRef].restart(cause))
children foreach (crs restartChild(crs.child, cause, suspendFirst = (crs.child != child)))
else
for (c children) context.stop(c.child)
}
@ -330,7 +363,7 @@ case class OneForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration
def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = {
if (restart && stats.requestRestartPermission(retriesWindow))
child.asInstanceOf[InternalActorRef].restart(cause)
restartChild(child, cause, suspendFirst = false)
else
context.stop(child) //TODO optimization to drop child here already?
}

View file

@ -16,6 +16,7 @@ import akka.dispatch.MessageDispatcher
import java.util.concurrent.locks.ReentrantLock
import akka.event.Logging.Warning
import scala.collection.mutable.Queue
import akka.actor.cell.ChildrenContainer
/**
* This actor ref starts out with some dummy cell (by default just enqueuing
@ -76,11 +77,11 @@ private[akka] class RepointableActorRef(
* This is called by activate() to obtain the cell which is to replace the
* unstarted cell. The cell must be fully functional.
*/
def newCell(): Cell = new ActorCell(system, this, props, supervisor).start()
def newCell(): Cell = new ActorCell(system, this, props, supervisor).start(sendSupervise = false)
def suspend(): Unit = underlying.suspend()
def resume(): Unit = underlying.resume()
def resume(inResponseToFailure: Boolean): Unit = underlying.resume(inResponseToFailure)
def stop(): Unit = underlying.stop()
@ -102,7 +103,7 @@ private[akka] class RepointableActorRef(
case ".." getParent.getChild(name)
case "" getChild(name)
case other
underlying.childrenRefs.getByName(other) match {
underlying.getChildByName(other) match {
case Some(crs) crs.child.asInstanceOf[InternalActorRef].getChild(name)
case None Nobody
}
@ -129,6 +130,7 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep
// use Envelope to keep on-send checks in the same place
val queue: Queue[Envelope] = Queue()
val systemQueue: Queue[SystemMessage] = Queue()
var suspendCount = 0
def replaceWith(cell: Cell): Unit = {
lock.lock()
@ -161,18 +163,21 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep
if (interrupted) throw new InterruptedException
} finally try
self.swapCell(cell)
finally try
for (_ 1 to suspendCount) cell.suspend()
finally
lock.unlock()
}
def system: ActorSystem = systemImpl
def suspend(): Unit = {}
def resume(): Unit = {}
def restart(cause: Throwable): Unit = {}
def suspend(): Unit = { lock.lock(); try suspendCount += 1 finally lock.unlock() }
def resume(inResponseToFailure: Boolean): Unit = { lock.lock(); try suspendCount -= 1 finally lock.unlock() }
def restart(cause: Throwable): Unit = { lock.lock(); try suspendCount -= 1 finally lock.unlock() }
def stop(): Unit = sendSystemMessage(Terminate())
def isTerminated: Boolean = false
def parent: InternalActorRef = supervisor
def childrenRefs: ActorCell.ChildrenContainer = ActorCell.EmptyChildrenContainer
def childrenRefs: ChildrenContainer = ChildrenContainer.EmptyChildrenContainer
def getChildByName(name: String): Option[ChildRestartStats] = None
def tell(message: Any, sender: ActorRef): Unit = {
lock.lock()
try {

View file

@ -0,0 +1,188 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.cell
import scala.annotation.tailrec
import scala.collection.JavaConverters.asJavaIterableConverter
import scala.util.control.NonFatal
import akka.actor.{ RepointableRef, Props, NoSerializationVerificationNeeded, InvalidActorNameException, InternalActorRef, ChildRestartStats, ActorRef }
import akka.actor.ActorCell
import akka.actor.ActorPath.ElementRegex
import akka.serialization.SerializationExtension
import akka.util.{ Unsafe, Helpers }
private[akka] trait Children { this: ActorCell
import ChildrenContainer._
@volatile
private var _childrenRefsDoNotCallMeDirectly: ChildrenContainer = EmptyChildrenContainer
def childrenRefs: ChildrenContainer =
Unsafe.instance.getObjectVolatile(this, AbstractActorCell.childrenOffset).asInstanceOf[ChildrenContainer]
final def children: Iterable[ActorRef] = childrenRefs.children
final def getChildren(): java.lang.Iterable[ActorRef] = children.asJava
def actorOf(props: Props): ActorRef = makeChild(this, props, randomName(), async = false)
def actorOf(props: Props, name: String): ActorRef = makeChild(this, props, checkName(name), async = false)
private[akka] def attachChild(props: Props): ActorRef = makeChild(this, props, randomName(), async = true)
private[akka] def attachChild(props: Props, name: String): ActorRef = makeChild(this, props, checkName(name), async = true)
@volatile private var _nextNameDoNotCallMeDirectly = 0L
final protected def randomName(): String = {
@tailrec def inc(): Long = {
val current = Unsafe.instance.getLongVolatile(this, AbstractActorCell.nextNameOffset)
if (Unsafe.instance.compareAndSwapLong(this, AbstractActorCell.nextNameOffset, current, current + 1)) current
else inc()
}
Helpers.base64(inc())
}
final def stop(actor: ActorRef): Unit = {
val started = actor match {
case r: RepointableRef r.isStarted
case _ true
}
if (childrenRefs.getByRef(actor).isDefined && started) shallDie(actor)
actor.asInstanceOf[InternalActorRef].stop()
}
/*
* low level CAS helpers
*/
@inline private def swapChildrenRefs(oldChildren: ChildrenContainer, newChildren: ChildrenContainer): Boolean =
Unsafe.instance.compareAndSwapObject(this, AbstractActorCell.childrenOffset, oldChildren, newChildren)
@tailrec final protected def reserveChild(name: String): Boolean = {
val c = childrenRefs
swapChildrenRefs(c, c.reserve(name)) || reserveChild(name)
}
@tailrec final protected def unreserveChild(name: String): Boolean = {
val c = childrenRefs
swapChildrenRefs(c, c.unreserve(name)) || unreserveChild(name)
}
final protected def addChild(ref: ActorRef): Boolean = {
@tailrec def rec(): Boolean = {
val c = childrenRefs
swapChildrenRefs(c, c.add(ref)) || rec()
}
/*
* This does not need to check getByRef every tailcall, because the change
* cannot happen in that direction as a race: the only entity removing a
* child is the actor itself, and the only entity which could be racing is
* somebody who calls attachChild, and there we are guaranteed that that
* child cannot yet have died (since it has not yet been created).
*/
if (childrenRefs.getByRef(ref).isEmpty) rec() else false
}
@tailrec final protected def shallDie(ref: ActorRef): Boolean = {
val c = childrenRefs
swapChildrenRefs(c, c.shallDie(ref)) || shallDie(ref)
}
@tailrec final private def removeChild(ref: ActorRef): ChildrenContainer = {
val c = childrenRefs
val n = c.remove(ref)
if (swapChildrenRefs(c, n)) n
else removeChild(ref)
}
@tailrec final protected def setChildrenTerminationReason(reason: ChildrenContainer.SuspendReason): Boolean = {
childrenRefs match {
case c: ChildrenContainer.TerminatingChildrenContainer
swapChildrenRefs(c, c.copy(reason = reason)) || setChildrenTerminationReason(reason)
case _ false
}
}
final protected def setTerminated(): Unit = Unsafe.instance.putObjectVolatile(this, AbstractActorCell.childrenOffset, TerminatedChildrenContainer)
/*
* ActorCell-internal API
*/
protected def isNormal = childrenRefs.isNormal
protected def isTerminating = childrenRefs.isTerminating
protected def suspendChildren(skip: Set[ActorRef] = Set.empty): Unit =
childrenRefs.stats foreach {
case ChildRestartStats(child, _, _) if !(skip contains child) child.asInstanceOf[InternalActorRef].suspend()
case _
}
protected def resumeChildren(): Unit =
childrenRefs.stats foreach (_.child.asInstanceOf[InternalActorRef].resume(inResponseToFailure = false))
def getChildByName(name: String): Option[ChildRestartStats] = childrenRefs.getByName(name)
protected def getChildByRef(ref: ActorRef): Option[ChildRestartStats] = childrenRefs.getByRef(ref)
protected def getAllChildStats: Iterable[ChildRestartStats] = childrenRefs.stats
protected def removeChildAndGetStateChange(child: ActorRef): Option[SuspendReason] = {
childrenRefs match {
case TerminatingChildrenContainer(_, _, reason)
val newContainer = removeChild(child)
if (!newContainer.isInstanceOf[TerminatingChildrenContainer]) Some(reason) else None
case _
removeChild(child)
None
}
}
/*
* Private helpers
*/
private def checkName(name: String): String = {
name match {
case null throw new InvalidActorNameException("actor name must not be null")
case "" throw new InvalidActorNameException("actor name must not be empty")
case ElementRegex() name
case _ throw new InvalidActorNameException("illegal actor name '" + name + "', must conform to " + ElementRegex)
}
}
private def makeChild(cell: ActorCell, props: Props, name: String, async: Boolean): ActorRef = {
if (cell.system.settings.SerializeAllCreators && !props.creator.isInstanceOf[NoSerializationVerificationNeeded]) {
val ser = SerializationExtension(cell.system)
ser.serialize(props.creator) match {
case Left(t) throw t
case Right(bytes) ser.deserialize(bytes, props.creator.getClass) match {
case Left(t) throw t
case _ //All good
}
}
}
/*
* in case we are currently terminating, fail external attachChild requests
* (internal calls cannot happen anyway because we are suspended)
*/
if (cell.childrenRefs.isTerminating) throw new IllegalStateException("cannot create children while terminating or terminated")
else {
reserveChild(name)
// this name will either be unreserved or overwritten with a real child below
val actor =
try {
cell.provider.actorOf(cell.systemImpl, props, cell.self, cell.self.path / name,
systemService = false, deploy = None, lookupDeploy = true, async = async)
} catch {
case NonFatal(e)
unreserveChild(name)
throw e
}
addChild(actor)
actor
}
}
}

View file

@ -0,0 +1,195 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.cell
import scala.collection.immutable.TreeMap
import akka.actor.{ InvalidActorNameException, ChildStats, ChildRestartStats, ChildNameReserved, ActorRef }
/**
* INTERNAL API
*/
private[akka] trait ChildrenContainer {
def add(child: ActorRef): ChildrenContainer
def remove(child: ActorRef): ChildrenContainer
def getByName(name: String): Option[ChildRestartStats]
def getByRef(actor: ActorRef): Option[ChildRestartStats]
def children: Iterable[ActorRef]
def stats: Iterable[ChildRestartStats]
def shallDie(actor: ActorRef): ChildrenContainer
// reserve that name or throw an exception
def reserve(name: String): ChildrenContainer
// cancel a reservation
def unreserve(name: String): ChildrenContainer
def isTerminating: Boolean = false
def isNormal: Boolean = true
}
/**
* INTERNAL API
*
* This object holds the classes performing the logic of managing the children
* of an actor, hence they are intimately tied to ActorCell.
*/
private[akka] object ChildrenContainer {
sealed trait SuspendReason
case object UserRequest extends SuspendReason
case class Recreation(cause: Throwable) extends SuspendReason
case object Termination extends SuspendReason
trait EmptyChildrenContainer extends ChildrenContainer {
val emptyStats = TreeMap.empty[String, ChildStats]
override def add(child: ActorRef): ChildrenContainer =
new NormalChildrenContainer(emptyStats.updated(child.path.name, ChildRestartStats(child)))
override def remove(child: ActorRef): ChildrenContainer = this
override def getByName(name: String): Option[ChildRestartStats] = None
override def getByRef(actor: ActorRef): Option[ChildRestartStats] = None
override def children: Iterable[ActorRef] = Nil
override def stats: Iterable[ChildRestartStats] = Nil
override def shallDie(actor: ActorRef): ChildrenContainer = this
override def reserve(name: String): ChildrenContainer = new NormalChildrenContainer(emptyStats.updated(name, ChildNameReserved))
override def unreserve(name: String): ChildrenContainer = this
}
/**
* This is the empty container, shared among all leaf actors.
*/
object EmptyChildrenContainer extends EmptyChildrenContainer {
override def toString = "no children"
}
/**
* This is the empty container which is installed after the last child has
* terminated while stopping; it is necessary to distinguish from the normal
* empty state while calling handleChildTerminated() for the last time.
*/
object TerminatedChildrenContainer extends EmptyChildrenContainer {
override def add(child: ActorRef): ChildrenContainer = this
override def reserve(name: String): ChildrenContainer =
throw new IllegalStateException("cannot reserve actor name '" + name + "': already terminated")
override def isTerminating: Boolean = true
override def isNormal: Boolean = false
override def toString = "terminated"
}
/**
* Normal children container: we do have at least one child, but none of our
* children are currently terminating (which is the time period between
* calling context.stop(child) and processing the ChildTerminated() system
* message).
*/
class NormalChildrenContainer(val c: TreeMap[String, ChildStats]) extends ChildrenContainer {
override def add(child: ActorRef): ChildrenContainer =
new NormalChildrenContainer(c.updated(child.path.name, ChildRestartStats(child)))
override def remove(child: ActorRef): ChildrenContainer = NormalChildrenContainer(c - child.path.name)
override def getByName(name: String): Option[ChildRestartStats] = c.get(name) match {
case s @ Some(_: ChildRestartStats) s.asInstanceOf[Option[ChildRestartStats]]
case _ None
}
override def getByRef(actor: ActorRef): Option[ChildRestartStats] = c.get(actor.path.name) match {
case c @ Some(crs: ChildRestartStats) if (crs.child == actor) c.asInstanceOf[Option[ChildRestartStats]]
case _ None
}
override def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _) child }
override def stats: Iterable[ChildRestartStats] = c.values.view.collect { case c: ChildRestartStats c }
override def shallDie(actor: ActorRef): ChildrenContainer = TerminatingChildrenContainer(c, Set(actor), UserRequest)
override def reserve(name: String): ChildrenContainer =
if (c contains name)
throw new InvalidActorNameException("actor name " + name + " is not unique!")
else new NormalChildrenContainer(c.updated(name, ChildNameReserved))
override def unreserve(name: String): ChildrenContainer = c.get(name) match {
case Some(ChildNameReserved) NormalChildrenContainer(c - name)
case _ this
}
override def toString =
if (c.size > 20) c.size + " children"
else c.mkString("children:\n ", "\n ", "")
}
object NormalChildrenContainer {
def apply(c: TreeMap[String, ChildStats]): ChildrenContainer =
if (c.isEmpty) EmptyChildrenContainer
else new NormalChildrenContainer(c)
}
/**
* Waiting state: there are outstanding termination requests (i.e. context.stop(child)
* was called but the corresponding ChildTerminated() system message has not yet been
* processed). There could be no specific reason (UserRequested), we could be Restarting
* or Terminating.
*
* Removing the last child which was supposed to be terminating will return a different
* type of container, depending on whether or not children are left and whether or not
* the reason was Terminating.
*/
case class TerminatingChildrenContainer(c: TreeMap[String, ChildStats], toDie: Set[ActorRef], reason: SuspendReason)
extends ChildrenContainer {
override def add(child: ActorRef): ChildrenContainer = copy(c.updated(child.path.name, ChildRestartStats(child)))
override def remove(child: ActorRef): ChildrenContainer = {
val t = toDie - child
if (t.isEmpty) reason match {
case Termination TerminatedChildrenContainer
case _ NormalChildrenContainer(c - child.path.name)
}
else copy(c - child.path.name, t)
}
override def getByName(name: String): Option[ChildRestartStats] = c.get(name) match {
case s @ Some(_: ChildRestartStats) s.asInstanceOf[Option[ChildRestartStats]]
case _ None
}
override def getByRef(actor: ActorRef): Option[ChildRestartStats] = c.get(actor.path.name) match {
case c @ Some(crs: ChildRestartStats) if (crs.child == actor) c.asInstanceOf[Option[ChildRestartStats]]
case _ None
}
override def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _) child }
override def stats: Iterable[ChildRestartStats] = c.values.view.collect { case c: ChildRestartStats c }
override def shallDie(actor: ActorRef): ChildrenContainer = copy(toDie = toDie + actor)
override def reserve(name: String): ChildrenContainer = reason match {
case Termination throw new IllegalStateException("cannot reserve actor name '" + name + "': terminating")
case _
if (c contains name)
throw new InvalidActorNameException("actor name " + name + " is not unique!")
else copy(c = c.updated(name, ChildNameReserved))
}
override def unreserve(name: String): ChildrenContainer = c.get(name) match {
case Some(ChildNameReserved) copy(c = c - name)
case _ this
}
override def isTerminating: Boolean = reason == Termination
override def isNormal: Boolean = reason == UserRequest
override def toString =
if (c.size > 20) c.size + " children"
else c.mkString("children (" + toDie.size + " terminating):\n ", "\n ", "\n") + toDie
}
}

View file

@ -0,0 +1,95 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.cell
import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorCell, Actor }
import akka.dispatch.{ Watch, Unwatch }
import akka.event.Logging.{ Warning, Error, Debug }
import scala.util.control.NonFatal
private[akka] trait DeathWatch { this: ActorCell
private var watching: Set[ActorRef] = ActorCell.emptyActorRefSet
private var watchedBy: Set[ActorRef] = ActorCell.emptyActorRefSet
override final def watch(subject: ActorRef): ActorRef = subject match {
case a: InternalActorRef
if (a != self && !watching.contains(a)) {
a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
watching += a
}
a
}
override final def unwatch(subject: ActorRef): ActorRef = subject match {
case a: InternalActorRef
if (a != self && watching.contains(a)) {
a.sendSystemMessage(Unwatch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
watching -= a
}
a
}
protected def watchedActorTerminated(ref: ActorRef): Unit = watching -= ref
protected def tellWatchersWeDied(actor: Actor): Unit = {
if (!watchedBy.isEmpty) {
val terminated = Terminated(self)(existenceConfirmed = true)
try {
watchedBy foreach {
watcher
try watcher.tell(terminated, self) catch {
case NonFatal(t) publish(Error(t, self.path.toString, clazz(actor), "deathwatch"))
}
}
} finally watchedBy = ActorCell.emptyActorRefSet
}
}
protected def unwatchWatchedActors(actor: Actor): Unit = {
if (!watching.isEmpty) {
try {
watching foreach { // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
case watchee: InternalActorRef try watchee.sendSystemMessage(Unwatch(watchee, self)) catch {
case NonFatal(t) publish(Error(t, self.path.toString, clazz(actor), "deathwatch"))
}
}
} finally watching = ActorCell.emptyActorRefSet
}
}
protected def addWatcher(watchee: ActorRef, watcher: ActorRef): Unit = {
val watcheeSelf = watchee == self
val watcherSelf = watcher == self
if (watcheeSelf && !watcherSelf) {
if (!watchedBy.contains(watcher)) {
watchedBy += watcher
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now monitoring " + watcher))
}
} else if (!watcheeSelf && watcherSelf) {
watch(watchee)
} else {
publish(Warning(self.path.toString, clazz(actor), "BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, self)))
}
}
protected def remWatcher(watchee: ActorRef, watcher: ActorRef): Unit = {
val watcheeSelf = watchee == self
val watcherSelf = watcher == self
if (watcheeSelf && !watcherSelf) {
if (watchedBy.contains(watcher)) {
watchedBy -= watcher
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + watcher))
}
} else if (!watcheeSelf && watcherSelf) {
unwatch(watchee)
} else {
publish(Warning(self.path.toString, clazz(actor), "BUG: illegal Unwatch(%s,%s) for %s".format(watchee, watcher, self)))
}
}
}

View file

@ -0,0 +1,78 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.cell
import scala.annotation.tailrec
import akka.actor.{ ActorRef, ActorCell }
import akka.dispatch.{ Terminate, SystemMessage, Suspend, Resume, Recreate, MessageDispatcher, Mailbox, Envelope, Create }
import akka.util.Unsafe
private[akka] trait Dispatch { this: ActorCell
@volatile private var _mailboxDoNotCallMeDirectly: Mailbox = _ //This must be volatile since it isn't protected by the mailbox status
@inline final def mailbox: Mailbox = Unsafe.instance.getObjectVolatile(this, AbstractActorCell.mailboxOffset).asInstanceOf[Mailbox]
@tailrec final def swapMailbox(newMailbox: Mailbox): Mailbox = {
val oldMailbox = mailbox
if (!Unsafe.instance.compareAndSwapObject(this, AbstractActorCell.mailboxOffset, oldMailbox, newMailbox)) swapMailbox(newMailbox)
else oldMailbox
}
final def hasMessages: Boolean = mailbox.hasMessages
final def numberOfMessages: Int = mailbox.numberOfMessages
val dispatcher: MessageDispatcher = system.dispatchers.lookup(props.dispatcher)
/**
* UntypedActorContext impl
*/
final def getDispatcher(): MessageDispatcher = dispatcher
final def isTerminated: Boolean = mailbox.isClosed
final def start(sendSupervise: Boolean): this.type = {
/*
* Create the mailbox and enqueue the Create() message to ensure that
* this is processed before anything else.
*/
swapMailbox(dispatcher.createMailbox(this))
mailbox.setActor(this)
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
mailbox.systemEnqueue(self, Create())
if (sendSupervise) {
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
parent.sendSystemMessage(akka.dispatch.Supervise(self))
}
// This call is expected to start off the actor by scheduling its mailbox.
dispatcher.attach(this)
this
}
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def suspend(): Unit = dispatcher.systemDispatch(this, Suspend())
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def resume(inResponseToFailure: Boolean): Unit = dispatcher.systemDispatch(this, Resume(inResponseToFailure))
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def restart(cause: Throwable): Unit = dispatcher.systemDispatch(this, Recreate(cause))
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def stop(): Unit = dispatcher.systemDispatch(this, Terminate())
def tell(message: Any, sender: ActorRef): Unit =
dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender, system))
override def sendSystemMessage(message: SystemMessage): Unit = dispatcher.systemDispatch(this, message)
}

View file

@ -0,0 +1,209 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.cell
import scala.annotation.tailrec
import akka.actor.{ PreRestartException, PostRestartException, InternalActorRef, Failed, ActorRef, ActorInterruptedException, ActorCell, Actor }
import akka.dispatch.{ Envelope, ChildTerminated }
import akka.event.Logging.{ Warning, Error, Debug }
import scala.util.control.NonFatal
private[akka] trait FaultHandling { this: ActorCell
/* =================
* T H E R U L E S
* =================
*
* Actors can be suspended for two reasons:
* - they fail
* - their supervisor gets suspended
*
* In particular they are not suspended multiple times because of cascading
* own failures, i.e. while currentlyFailed() they do not fail again. In case
* of a restart, failures in constructor/preStart count as new failures.
*/
private def suspendNonRecursive(): Unit = dispatcher suspend this
private def resumeNonRecursive(): Unit = dispatcher resume this
/*
* have we told our supervisor that we Failed() and have not yet heard back?
* (actually: we might have heard back but not yet acted upon it, in case of
* a restart with dying children)
* might well be replaced by ref to a Cancellable in the future (see #2299)
*/
private var _failed = false
private def isFailed: Boolean = _failed
private def setFailed(): Unit = _failed = true
private def clearFailed(): Unit = _failed = false
/**
* Do re-create the actor in response to a failure.
*/
protected def faultRecreate(cause: Throwable): Unit =
if (isNormal) {
val failedActor = actor
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(failedActor), "restarting"))
if (failedActor ne null) {
try {
// if the actor fails in preRestart, we can do nothing but log it: its best-effort
if (failedActor.context ne null) failedActor.preRestart(cause, Option(currentMessage))
} catch {
case NonFatal(e)
val ex = new PreRestartException(self, e, cause, Option(currentMessage))
publish(Error(ex, self.path.toString, clazz(failedActor), e.getMessage))
} finally {
clearActorFields(failedActor)
}
}
assert(mailbox.isSuspended, "mailbox must be suspended during restart, status=" + mailbox.status)
if (!setChildrenTerminationReason(ChildrenContainer.Recreation(cause))) finishRecreate(cause, failedActor)
} else {
// need to keep that suspend counter balanced
faultResume(inResponseToFailure = false)
}
/**
* Do suspend the actor in response to a failure of a parent (i.e. the
* recursive suspend feature).
*/
protected def faultSuspend(): Unit = {
// done always to keep that suspend counter balanced
suspendNonRecursive()
suspendChildren()
}
/**
* Do resume the actor in response to a failure.
*
* @param inResponseToFailure signifies if it was our own failure which
* prompted this action.
*/
protected def faultResume(inResponseToFailure: Boolean): Unit = {
// done always to keep that suspend counter balanced
// must happen atomically
try resumeNonRecursive()
finally if (inResponseToFailure) clearFailed()
resumeChildren()
}
protected def terminate() {
setReceiveTimeout(None)
cancelReceiveTimeout
// stop all children, which will turn childrenRefs into TerminatingChildrenContainer (if there are children)
children foreach stop
val wasTerminating = isTerminating
if (setChildrenTerminationReason(ChildrenContainer.Termination)) {
if (!wasTerminating) {
// do not process normal messages while waiting for all children to terminate
suspendNonRecursive()
// do not propagate failures during shutdown to the supervisor
setFailed()
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "stopping"))
}
} else {
setTerminated()
finishTerminate()
}
}
final def handleInvokeFailure(t: Throwable, message: String): Unit = {
publish(Error(t, self.path.toString, clazz(actor), message))
// prevent any further messages to be processed until the actor has been restarted
if (!isFailed) try {
suspendNonRecursive()
setFailed()
// suspend children
val skip: Set[ActorRef] = currentMessage match {
case Envelope(Failed(`t`), child) Set(child)
case _ Set.empty
}
suspendChildren(skip)
// tell supervisor
t match { // Wrap InterruptedExceptions and rethrow
case _: InterruptedException parent.tell(Failed(new ActorInterruptedException(t)), self); throw t
case _ parent.tell(Failed(t), self)
}
} catch {
case NonFatal(e)
publish(Error(e, self.path.toString, clazz(actor), "emergency stop: exception in failure handling"))
try children foreach stop
finally finishTerminate()
}
}
private def finishTerminate() {
val a = actor
try if (a ne null) a.postStop()
finally try dispatcher.detach(this)
finally try parent.sendSystemMessage(ChildTerminated(self))
finally try tellWatchersWeDied(a)
finally try unwatchWatchedActors(a)
finally {
if (system.settings.DebugLifecycle)
publish(Debug(self.path.toString, clazz(a), "stopped"))
clearActorFields(a)
actor = null
}
}
private def finishRecreate(cause: Throwable, failedActor: Actor): Unit = try {
try resumeNonRecursive()
finally clearFailed() // must happen in any case, so that failure is propagated
// need to keep a snapshot of the surviving children before the new actor instance creates new ones
val survivors = children
val freshActor = newActor()
actor = freshActor // this must happen before postRestart has a chance to fail
if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields.
freshActor.postRestart(cause)
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(freshActor), "restarted"))
// only after parent is up and running again do restart the children which were not stopped
survivors foreach (child
try child.asInstanceOf[InternalActorRef].restart(cause)
catch {
case NonFatal(e) publish(Error(e, self.path.toString, clazz(freshActor), "restarting " + child))
})
} catch {
case NonFatal(e)
clearActorFields(actor) // in order to prevent preRestart() from happening again
handleInvokeFailure(new PostRestartException(self, e, cause), e.getMessage)
}
final protected def handleFailure(child: ActorRef, cause: Throwable): Unit = getChildByRef(child) match {
case Some(stats) if (!actor.supervisorStrategy.handleFailure(this, child, cause, stats, getAllChildStats)) throw cause
case None publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child))
}
final protected def handleChildTerminated(child: ActorRef): Unit = {
val status = removeChildAndGetStateChange(child)
/*
* if this fails, we do nothing in case of terminating/restarting state,
* otherwise tell the supervisor etc. (in that second case, the match
* below will hit the empty default case, too)
*/
try actor.supervisorStrategy.handleChildTerminated(this, child, children)
catch {
case NonFatal(e) handleInvokeFailure(e, "handleChildTerminated failed")
}
/*
* if the removal changed the state of the (terminating) children container,
* then we are continuing the previously suspended recreate/terminate action
*/
status match {
case Some(ChildrenContainer.Recreation(cause)) finishRecreate(cause, actor)
case Some(ChildrenContainer.Termination) finishTerminate()
case _
}
}
}

View file

@ -0,0 +1,54 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor.cell
import ReceiveTimeout.emptyReceiveTimeoutData
import akka.actor.ActorCell
import akka.actor.ActorCell.emptyCancellable
import akka.actor.Cancellable
import scala.concurrent.util.Duration
private[akka] object ReceiveTimeout {
final val emptyReceiveTimeoutData: (Duration, Cancellable) = (Duration.Undefined, ActorCell.emptyCancellable)
}
private[akka] trait ReceiveTimeout { this: ActorCell
import ReceiveTimeout._
import ActorCell._
private var receiveTimeoutData: (Duration, Cancellable) = emptyReceiveTimeoutData
final def receiveTimeout: Option[Duration] = receiveTimeoutData._1 match {
case Duration.Undefined None
case duration Some(duration)
}
final def setReceiveTimeout(timeout: Option[Duration]): Unit = setReceiveTimeout(timeout.getOrElse(Duration.Undefined))
final def setReceiveTimeout(timeout: Duration): Unit =
receiveTimeoutData = (
if (Duration.Undefined == timeout || timeout.toMillis < 1) Duration.Undefined else timeout,
receiveTimeoutData._2)
final def resetReceiveTimeout(): Unit = setReceiveTimeout(None)
final def checkReceiveTimeout() {
val recvtimeout = receiveTimeoutData
if (Duration.Undefined != recvtimeout._1 && !mailbox.hasMessages) {
recvtimeout._2.cancel() //Cancel any ongoing future
//Only reschedule if desired and there are currently no more messages to be processed
receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(recvtimeout._1, self, akka.actor.ReceiveTimeout))
} else cancelReceiveTimeout()
}
final def cancelReceiveTimeout(): Unit =
if (receiveTimeoutData._2 ne emptyCancellable) {
receiveTimeoutData._2.cancel()
receiveTimeoutData = (receiveTimeoutData._1, emptyCancellable)
}
}

View file

@ -90,7 +90,7 @@ private[akka] case class Suspend() extends SystemMessage // sent to self from Ac
/**
* INTERNAL API
*/
private[akka] case class Resume() extends SystemMessage // sent to self from ActorCell.resume
private[akka] case class Resume(inResponseToFailure: Boolean) extends SystemMessage // sent to self from ActorCell.resume
/**
* INTERNAL API
*/
@ -306,7 +306,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
def suspend(actor: ActorCell): Unit = {
val mbox = actor.mailbox
if ((mbox.actor eq actor) && (mbox.dispatcher eq this))
mbox.becomeSuspended()
mbox.suspend()
}
/*
@ -314,7 +314,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
*/
def resume(actor: ActorCell): Unit = {
val mbox = actor.mailbox
if ((mbox.actor eq actor) && (mbox.dispatcher eq this) && mbox.becomeOpen())
if ((mbox.actor eq actor) && (mbox.dispatcher eq this) && mbox.resume())
registerForExecution(mbox, false, false)
}

View file

@ -25,12 +25,16 @@ private[akka] object Mailbox {
* the following assigned numbers CANNOT be changed without looking at the code which uses them!
*/
// primary status: only first three
// primary status
final val Open = 0 // _status is not initialized in AbstractMailbox, so default must be zero! Deliberately without type ascription to make it a compile-time constant
final val Suspended = 1 // Deliberately without type ascription to make it a compile-time constant
final val Closed = 2 // Deliberately without type ascription to make it a compile-time constant
final val Closed = 1 // Deliberately without type ascription to make it a compile-time constant
// secondary status: Scheduled bit may be added to Open/Suspended
final val Scheduled = 4 // Deliberately without type ascription to make it a compile-time constant
final val Scheduled = 2 // Deliberately without type ascription to make it a compile-time constant
// shifted by 2: the suspend count!
final val shouldScheduleMask = 3
final val shouldNotProcessMask = ~2
final val suspendMask = ~3
final val suspendUnit = 4
// mailbox debugging helper using println (see below)
// since this is a compile-time constant, scalac will elide code behind if (Mailbox.debug) (RK checked with 2.9.1)
@ -101,10 +105,10 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
final def status: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset)
@inline
final def shouldProcessMessage: Boolean = (status & 3) == Open
final def shouldProcessMessage: Boolean = (status & shouldNotProcessMask) == 0
@inline
final def isSuspended: Boolean = (status & 3) == Suspended
final def isSuspended: Boolean = (status & suspendMask) != 0
@inline
final def isClosed: Boolean = status == Closed
@ -121,23 +125,32 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
Unsafe.instance.putIntVolatile(this, AbstractMailbox.mailboxStatusOffset, newStatus)
/**
* set new primary status Open. Caller does not need to worry about whether
* Reduce the suspend count by one. Caller does not need to worry about whether
* status was Scheduled or not.
*
* @returns true if the suspend count reached zero
*/
@tailrec
final def becomeOpen(): Boolean = status match {
final def resume(): Boolean = status match {
case Closed setStatus(Closed); false
case s updateStatus(s, Open | s & Scheduled) || becomeOpen()
case s
val next = if (s < suspendUnit) s else s - suspendUnit
if (updateStatus(s, next)) next < suspendUnit
else resume()
}
/**
* set new primary status Suspended. Caller does not need to worry about whether
* Increment the suspend count by one. Caller does not need to worry about whether
* status was Scheduled or not.
*
* @returns true if the previous suspend count was zero
*/
@tailrec
final def becomeSuspended(): Boolean = status match {
final def suspend(): Boolean = status match {
case Closed setStatus(Closed); false
case s updateStatus(s, Suspended | s & Scheduled) || becomeSuspended()
case s
if (updateStatus(s, s + suspendUnit)) s < suspendUnit
else suspend()
}
/**
@ -158,11 +171,10 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
val s = status
/*
* only try to add Scheduled bit if pure Open/Suspended, not Closed or with
* Scheduled bit already set (this is one of the reasons why the numbers
* cannot be changed in object Mailbox above)
* Scheduled bit already set
*/
if (s <= Suspended) updateStatus(s, s | Scheduled) || setAsScheduled()
else false
if ((s & shouldScheduleMask) != Open) false
else updateStatus(s, s | Scheduled) || setAsScheduled()
}
/**
@ -171,12 +183,6 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
@tailrec
final def setAsIdle(): Boolean = {
val s = status
/*
* only try to remove Scheduled bit if currently Scheduled, not Closed or
* without Scheduled bit set (this is one of the reasons why the numbers
* cannot be changed in object Mailbox above)
*/
updateStatus(s, s & ~Scheduled) || setAsIdle()
}

View file

@ -73,7 +73,7 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo
if (routerConfig.resizer.isEmpty && _routees.isEmpty)
throw new ActorInitializationException("router " + routerConfig + " did not register routees!")
start()
start(sendSupervise = false)
/*
* end of construction

View file

@ -214,7 +214,7 @@ class Agent[T](initialValue: T, system: ActorSystem) {
/**
* Resumes processing of `send` actions for the agent.
*/
def resume(): Unit = updater.resume()
def resume(): Unit = updater.resume(inResponseToFailure = false)
/**
* Closes the agents and makes it eligible for garbage collection.

View file

@ -235,7 +235,7 @@ private[akka] class RemoteActorRef private[akka] (
def suspend(): Unit = sendSystemMessage(Suspend())
def resume(): Unit = sendSystemMessage(Resume())
def resume(inResponseToFailure: Boolean): Unit = sendSystemMessage(Resume(inResponseToFailure))
def stop(): Unit = sendSystemMessage(Terminate())

View file

@ -154,7 +154,7 @@ class CallingThreadDispatcher(
override def suspend(actor: ActorCell) {
actor.mailbox match {
case m: CallingThreadMailbox m.suspendSwitch.switchOn
case m: CallingThreadMailbox m.suspendSwitch.switchOn; m.suspend()
case m m.systemEnqueue(actor.self, Suspend())
}
}
@ -166,11 +166,12 @@ class CallingThreadDispatcher(
val wasActive = queue.isActive
val switched = mbox.suspendSwitch.switchOff {
CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(mbox, queue)
mbox.resume()
}
if (switched && !wasActive) {
runQueue(mbox, queue)
}
case m m.systemEnqueue(actor.self, Resume())
case m m.systemEnqueue(actor.self, Resume(false))
}
}

View file

@ -15,6 +15,7 @@ import com.typesafe.config.{ Config, ConfigFactory }
import java.util.concurrent.TimeoutException
import akka.dispatch.{ MessageDispatcher, Dispatchers }
import akka.pattern.ask
import akka.actor.ActorSystemImpl
object TimingTest extends Tag("timing")
object LongRunningTest extends Tag("long-running")
@ -76,7 +77,9 @@ abstract class AkkaSpec(_system: ActorSystem)
beforeShutdown()
system.shutdown()
try system.awaitTermination(5 seconds) catch {
case _: TimeoutException system.log.warning("Failed to stop [{}] within 5 seconds", system.name)
case _: TimeoutException
system.log.warning("Failed to stop [{}] within 5 seconds", system.name)
println(system.asInstanceOf[ActorSystemImpl].printTree)
}
atTermination()
}