diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala index b849796434..40907e74a0 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorLifeCycleSpec.scala @@ -118,6 +118,30 @@ class ActorLifeCycleSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitS expectNoMsg(1 seconds) system.stop(supervisor) } + + "clear the behavior stack upon restart" in { + case class Become(recv: ActorContext ⇒ Receive) + val a = system.actorOf(Props(new Actor { + def receive = { + case Become(beh) ⇒ context.become(beh(context), discardOld = false); sender ! "ok" + case x ⇒ sender ! 42 + } + })) + a ! "hello" + expectMsg(42) + a ! Become(ctx ⇒ { + case "fail" ⇒ throw new RuntimeException("buh") + case x ⇒ ctx.sender ! 43 + }) + expectMsg("ok") + a ! "hello" + expectMsg(43) + EventFilter[RuntimeException]("buh", occurrences = 1) intercept { + a ! "fail" + } + a ! "hello" + expectMsg(42) + } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala index 21c059ddba..1fa9a1ff03 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/FSMTimingSpec.scala @@ -147,7 +147,7 @@ object FSMTimingSpec { } def resume(actorRef: ActorRef): Unit = actorRef match { - case l: ActorRefWithCell ⇒ l.resume() + case l: ActorRefWithCell ⇒ l.resume(inResponseToFailure = false) case _ ⇒ } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index e932fd61e2..c0c7fa3c45 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -4,14 +4,24 @@ package akka.actor -import language.postfixOps - -import akka.testkit._ import java.util.concurrent.{ TimeUnit, CountDownLatch } + import scala.concurrent.Await -import akka.pattern.ask import scala.concurrent.util.Duration -import scala.concurrent.util.duration._ +import scala.concurrent.util.duration.intToDurationInt +import scala.math.BigInt.int2bigInt +import scala.util.Random +import scala.util.control.NoStackTrace + +import com.typesafe.config.{ ConfigFactory, Config } + +import SupervisorStrategy.{ Resume, Restart, Directive } +import akka.actor.SupervisorStrategy.seqThrowable2Decider +import akka.dispatch.{ MessageDispatcher, DispatcherPrerequisites, DispatcherConfigurator, Dispatcher } +import akka.pattern.ask +import akka.testkit.{ ImplicitSender, EventFilter, DefaultTimeout, AkkaSpec } +import akka.testkit.{ filterException, duration2TestDuration, TestLatch } +import akka.testkit.TestEvent.Mute object SupervisorHierarchySpec { class FireWorkerException(msg: String) extends Exception(msg) @@ -31,10 +41,361 @@ object SupervisorHierarchySpec { countDown.countDown() } } + + class Resumer extends Actor { + override def supervisorStrategy = OneForOneStrategy() { case _ ⇒ SupervisorStrategy.Resume } + def receive = { + case "spawn" ⇒ sender ! context.actorOf(Props[Resumer]) + case "fail" ⇒ throw new Exception("expected") + case "ping" ⇒ sender ! "pong" + } + } + + case class Event(msg: Any) { val time: Long = System.nanoTime } + case class ErrorLog(msg: String, log: Vector[Event]) + case class Failure(directive: Directive, log: Vector[Event]) extends RuntimeException with NoStackTrace { + override def toString = "Failure(" + directive + ")" + } + val strategy = OneForOneStrategy() { case Failure(directive, _) ⇒ directive } + + val config = ConfigFactory.parseString(""" + hierarchy { + type = "akka.actor.SupervisorHierarchySpec$MyDispatcherConfigurator" + } + akka.loglevel = INFO + akka.actor.debug.fsm = on + """) + + class MyDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites) + extends DispatcherConfigurator(config, prerequisites) { + + private val instance: MessageDispatcher = + new Dispatcher(prerequisites, + config.getString("id"), + config.getInt("throughput"), + Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), + mailboxType, + configureExecutor(), + Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) { + + override def suspend(cell: ActorCell): Unit = { + val a = cell.actor.asInstanceOf[Hierarchy] + a.log :+= Event("suspended") + super.suspend(cell) + } + + override def resume(cell: ActorCell): Unit = { + val a = cell.actor.asInstanceOf[Hierarchy] + a.log :+= Event("resumed") + super.resume(cell) + } + + } + + override def dispatcher(): MessageDispatcher = instance + } + + class Hierarchy(depth: Int, breadth: Int, listener: ActorRef) extends Actor { + + override def preStart { + if (depth > 1) + for (_ ← 1 to breadth) + context.watch(context.actorOf(Props(new Hierarchy(depth - 1, breadth, listener)).withDispatcher("hierarchy"))) + listener ! self + } + override def postRestart(cause: Throwable) { + cause match { + case Failure(_, l) ⇒ log = l + } + log :+= Event("restarted") + } + + override def supervisorStrategy = strategy + override def preRestart(cause: Throwable, msg: Option[Any]): Unit = { + // do not scrap children + } + + override def postStop { + if (failed || suspended) { + listener ! ErrorLog("not resumed (" + failed + ", " + suspended + ")", log) + } + } + + var failed = false + var suspended = false + var log = Vector.empty[Event] + def check(msg: Any) = { + suspended = false + log :+= Event(msg) + if (failed) { + listener ! ErrorLog("processing message while failed", log) + failed = false + context stop self + } + } + + def receive = new Receive { + val handler: Receive = { + case f @ Failure(Resume, _) ⇒ suspended = true; throw f.copy(log = log) + case f: Failure ⇒ failed = true; throw f.copy(log = log) + case "ping" ⇒ Thread.sleep((Random.nextFloat * 1.03).toLong); sender ! "pong" + case Terminated(_) ⇒ listener ! ErrorLog("terminating", log); context stop self + } + override def isDefinedAt(msg: Any) = handler.isDefinedAt(msg) + override def apply(msg: Any) = { check(msg); handler(msg) } + } + } + + case class Work(n: Int) + + sealed trait Action + case class Ping(ref: ActorRef) extends Action + case class Fail(ref: ActorRef, directive: Directive) extends Action + + sealed trait State + case object Idle extends State + case object Init extends State + case object Stress extends State + case object Finishing extends State + case object LastPing extends State + case object Stopping extends State + case object Failed extends State + + /* + * This stress test will construct a supervision hierarchy of configurable + * depth and breadth and then randomly fail and check its actors. The actors + * perform certain checks internally (verifying that they do not run when + * suspended, for example), and they are checked for health by the test + * procedure. + * + * Execution happens in phases (which is the reason for FSM): + * + * Idle: + * - upon reception of Init message, construct hierary and go to Init state + * + * Init: + * - receive refs of all contained actors + * + * Stress: + * - deal out actions (Fail or "ping"), keeping the hierarchy busy + * - whenever all actors are in the "pinged" list (i.e. have not yet + * answered with a "pong"), delay processing of the next Work() by + * 100 millis + * - when receiving a Work() while all actors are "pinged", stop the + * hierarchy and go to the Stopping state + * + * Finishing: + * - after dealing out the last action, wait for the outstanding "pong" + * messages + * - when last "pong" is received, goto LastPing state + * - upon state timeout, stop the hierarchy and go to the Failed state + * + * LastPing: + * - upon entering this state, send a "ping" to all actors + * - when last "pong" is received, goto Stopping state + * - upon state timeout, stop the hierarchy and go to the Failed state + * + * Stopping: + * - upon entering this state, stop the hierarchy + * - upon termination of the hierarchy send back successful result + * + * Whenever an ErrorLog is received, goto Failed state + * + * Failed: + * - accumulate ErrorLog messages + * - upon termination of the hierarchy send back failed result and print + * the logs, merged and in chronological order. + * + * TODO RK: also test Stop directive, and keep a complete list of all + * actors ever created, then verify after stop()ping the hierarchy that + * all are terminated, transfer them to a WeakHashMap and verify that + * they are indeed GCed + * + * TODO RK: make hierarchy construction stochastic so that it includes + * different breadth (including the degenerate breadth-1 case). + * + * TODO RK: also test Escalate by adding an exception with a `var depth` + * which gets decremented within the supervisor and gets handled when zero + * is reached (Restart resolution) + * + * TODO RK: also test exceptions during recreate + * + * TODO RK: also test recreate including terminating children + * + * TODO RK: also verify that preRestart is not called more than once per instance + */ + + class StressTest(testActor: ActorRef, depth: Int, breadth: Int) extends Actor with LoggingFSM[State, Null] { + import context.system + + override def supervisorStrategy = strategy + + var children = Vector.empty[ActorRef] + var idleChildren = Vector.empty[ActorRef] + var pingChildren = Set.empty[ActorRef] + + val nextJob = Iterator.continually(Random.nextFloat match { + case x if x >= 0.5 ⇒ + // ping one child + val pick = ((x - 0.5) * 2 * idleChildren.size).toInt + val ref = idleChildren(pick) + idleChildren = idleChildren.take(pick) ++ idleChildren.drop(pick + 1) + pingChildren += ref + Ping(ref) + case x ⇒ + // fail one child + val pick = ((if (x > 0.25) x - 0.25 else x) * 4 * children.size).toInt + Fail(children(pick), if (x > 0.25) Restart else Resume) + }) + + val familySize = ((1 - BigInt(breadth).pow(depth)) / (1 - breadth)).toInt + var hierarchy: ActorRef = _ + + override def preRestart(cause: Throwable, msg: Option[Any]) { + throw new ActorKilledException("I want to DIE") + } + + override def postRestart(cause: Throwable) { + throw new ActorKilledException("I said I wanted to DIE, dammit!") + } + + override def postStop { + testActor ! "stressTestStopped" + } + + startWith(Idle, null) + + when(Idle) { + case Event(Init, _) ⇒ + hierarchy = context.watch(context.actorOf(Props(new Hierarchy(depth, breadth, self)).withDispatcher("hierarchy"))) + setTimer("phase", StateTimeout, 5 seconds, false) + goto(Init) + } + + when(Init) { + case Event(ref: ActorRef, _) ⇒ + if (idleChildren.nonEmpty || pingChildren.nonEmpty) + throw new IllegalStateException("received unexpected child " + children.size) + children :+= ref + if (children.size == familySize) { + idleChildren = children + goto(Stress) + } else stay + case Event(StateTimeout, _) ⇒ + testActor ! "only got %d out of %d refs".format(children.size, familySize) + stop() + } + + onTransition { + case Init -> Stress ⇒ + self ! Work(familySize * 1000) + // set timeout for completion of the whole test (i.e. including Finishing and Stopping) + setTimer("phase", StateTimeout, 60 seconds, false) + } + + val workSchedule = 250.millis + + when(Stress) { + case Event(w: Work, _) if idleChildren.isEmpty ⇒ + context stop hierarchy + goto(Failed) + case Event(Work(x), _) if x > 0 ⇒ + nextJob.next match { + case Ping(ref) ⇒ ref ! "ping" + case Fail(ref, dir) ⇒ ref ! Failure(dir, Vector.empty) + } + if (idleChildren.nonEmpty) self ! Work(x - 1) + else context.system.scheduler.scheduleOnce(workSchedule, self, Work(x - 1)) + stay + case Event(Work(_), _) ⇒ if (pingChildren.isEmpty) goto(LastPing) else goto(Finishing) + case Event("pong", _) ⇒ + pingChildren -= sender + idleChildren :+= sender + stay + } + + when(Finishing) { + case Event("pong", _) ⇒ + pingChildren -= sender + idleChildren :+= sender + if (pingChildren.isEmpty) goto(LastPing) else stay + } + + onTransition { + case _ -> LastPing ⇒ + idleChildren foreach (_ ! "ping") + pingChildren ++= idleChildren + idleChildren = Vector.empty + } + + when(LastPing) { + case Event("pong", _) ⇒ + pingChildren -= sender + idleChildren :+= sender + if (pingChildren.isEmpty) goto(Stopping) else stay + } + + onTransition { + case _ -> Stopping ⇒ context stop hierarchy + } + + when(Stopping, stateTimeout = 5 seconds) { + case Event(Terminated(r), _) if r == hierarchy ⇒ + testActor ! "stressTestSuccessful" + stop + case Event(StateTimeout, _) ⇒ + testActor ! "timeout in Stopping" + stop + } + + var errors = Vector.empty[(ActorRef, ErrorLog)] + + when(Failed, stateTimeout = 5 seconds) { + case Event(e: ErrorLog, _) ⇒ + errors :+= sender -> e + stay + case Event(Terminated(r), _) if r == hierarchy ⇒ + printErrors() + testActor ! "stressTestFailed" + stop + case Event(StateTimeout, _) ⇒ + printErrors() + testActor ! "timeout in Failed" + stop + case Event("pong", _) ⇒ stay // don’t care? + } + + def printErrors(): Unit = { + val merged = errors flatMap { + case (ref, ErrorLog(msg, log)) ⇒ + println(ref + " " + msg) + log map (l ⇒ (l.time, ref, l.msg.toString)) + } + merged.sorted foreach println + } + + whenUnhandled { + case Event(e: ErrorLog, _) ⇒ + errors :+= sender -> e + // don’t stop the hierarchy, that is going to happen all by itself and in the right order + goto(Failed) + case Event(StateTimeout, _) ⇒ + println("pingChildren:\n" + pingChildren.mkString("\n")) + context stop hierarchy + goto(Failed) + case Event(msg, _) ⇒ + testActor ! ("received unexpected msg: " + msg) + stop + } + + initialize + + } + } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout { +class SupervisorHierarchySpec extends AkkaSpec(SupervisorHierarchySpec.config) with DefaultTimeout with ImplicitSender { import SupervisorHierarchySpec._ "A Supervisor Hierarchy" must { @@ -83,6 +444,67 @@ class SupervisorHierarchySpec extends AkkaSpec with DefaultTimeout { assert(countDownMax.await(2, TimeUnit.SECONDS)) } } + + "resume children after Resume" in { + val boss = system.actorOf(Props[Resumer], "resumer") + boss ! "spawn" + val middle = expectMsgType[ActorRef] + middle ! "spawn" + val worker = expectMsgType[ActorRef] + worker ! "ping" + expectMsg("pong") + EventFilter[Exception]("expected", occurrences = 1) intercept { + middle ! "fail" + } + middle ! "ping" + expectMsg("pong") + worker ! "ping" + expectMsg("pong") + } + + "suspend children while failing" in { + val latch = TestLatch() + val slowResumer = system.actorOf(Props(new Actor { + override def supervisorStrategy = OneForOneStrategy() { case _ ⇒ Await.ready(latch, 4.seconds.dilated); SupervisorStrategy.Resume } + def receive = { + case "spawn" ⇒ sender ! context.actorOf(Props[Resumer]) + } + }), "slowResumer") + slowResumer ! "spawn" + val boss = expectMsgType[ActorRef] + boss ! "spawn" + val middle = expectMsgType[ActorRef] + middle ! "spawn" + val worker = expectMsgType[ActorRef] + worker ! "ping" + expectMsg("pong") + EventFilter[Exception]("expected", occurrences = 1) intercept { + boss ! "fail" + } + worker ! "ping" + expectNoMsg(2 seconds) + latch.countDown() + expectMsg("pong") + } + + "survive being stressed" in { + system.eventStream.publish(Mute(EventFilter[Failure]())) + system.eventStream.publish(Mute(EventFilter.warning(start = "received dead letter"))) + + val fsm = system.actorOf(Props(new StressTest(testActor, 6, 3)), "stressTest") + + fsm ! FSM.SubscribeTransitionCallBack(system.actorOf(Props(new Actor { + def receive = { + case s: FSM.CurrentState[_] ⇒ log.info("{}", s) + case t: FSM.Transition[_] ⇒ log.info("{}", t) + } + }))) + + fsm ! Init + + expectMsg(70 seconds, "stressTestSuccessful") + expectMsg("stressTestStopped") + } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala index 54f531752a..b13457338c 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorMiscSpec.scala @@ -137,6 +137,9 @@ class SupervisorMiscSpec extends AkkaSpec(SupervisorMiscSpec.config) with Defaul })) parent ! "engage" expectMsg("green") + EventFilter[IllegalStateException]("handleChildTerminated failed", occurrences = 1) intercept { + system.stop(parent) + } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index aa21459935..5362ad4153 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -341,7 +341,12 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 10 seconds)(classOf[Exception] :: Nil)))) val dyingProps = Props(new Actor { - if (inits.incrementAndGet % 2 == 0) throw new IllegalStateException("Don't wanna!") + val init = inits.getAndIncrement() + if (init % 3 == 1) throw new IllegalStateException("Don't wanna!") + + override def preRestart(cause: Throwable, msg: Option[Any]) { + if (init % 3 == 0) throw new IllegalStateException("Don't wanna!") + } def receive = { case Ping ⇒ sender ! PongMessage @@ -351,16 +356,20 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende throw e } }) - val dyingActor = Await.result((supervisor ? dyingProps).mapTo[ActorRef], timeout.duration) + supervisor ! dyingProps + val dyingActor = expectMsgType[ActorRef] - filterEvents(EventFilter[RuntimeException]("Expected", occurrences = 1), - EventFilter[IllegalStateException]("error while creating actor", occurrences = 1)) { + filterEvents( + EventFilter[RuntimeException]("Expected", occurrences = 1), + EventFilter[PreRestartException]("Don't wanna!", occurrences = 1), + EventFilter[PostRestartException]("Don't wanna!", occurrences = 1)) { intercept[RuntimeException] { Await.result(dyingActor.?(DieReply)(DilatedTimeout), DilatedTimeout) } } - Await.result(dyingActor.?(Ping)(DilatedTimeout), DilatedTimeout) must be === PongMessage + dyingActor ! Ping + expectMsg(PongMessage) inits.get must be(3) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 030f46801d..b5d284d7af 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -348,7 +348,7 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa assertNoCountDown(done, 1000, "Should not process messages while suspended") assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, suspensions = 1) - a.resume + a.resume(inResponseToFailure = false) assertCountDown(done, 3.seconds.dilated.toMillis, "Should resume processing of messages when resumed") assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1, suspensions = 1, resumes = 1) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala index ca57f28384..1d0f3ec416 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala @@ -65,7 +65,7 @@ class PriorityDispatcherSpec extends AkkaSpec(PriorityDispatcherSpec.config) wit val msgs = (1 to 100).toList for (m ← msgs) actor ! m - actor.resume //Signal the actor to start treating it's message backlog + actor.resume(inResponseToFailure = false) //Signal the actor to start treating it's message backlog Await.result(actor.?('Result).mapTo[List[Int]], timeout.duration) must be === msgs.reverse } diff --git a/akka-actor/src/main/java/akka/actor/AbstractActorCell.java b/akka-actor/src/main/java/akka/actor/cell/AbstractActorCell.java similarity index 63% rename from akka-actor/src/main/java/akka/actor/AbstractActorCell.java rename to akka-actor/src/main/java/akka/actor/cell/AbstractActorCell.java index 95fb7368bc..2d8c4fbc1e 100644 --- a/akka-actor/src/main/java/akka/actor/AbstractActorCell.java +++ b/akka-actor/src/main/java/akka/actor/cell/AbstractActorCell.java @@ -2,8 +2,9 @@ * Copyright (C) 2009-2012 Typesafe Inc. */ -package akka.actor; +package akka.actor.cell; +import akka.actor.ActorCell; import akka.util.Unsafe; final class AbstractActorCell { @@ -13,9 +14,9 @@ final class AbstractActorCell { static { try { - mailboxOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("_mailboxDoNotCallMeDirectly")); - childrenOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("_childrenRefsDoNotCallMeDirectly")); - nextNameOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("_nextNameDoNotCallMeDirectly")); + mailboxOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("akka$actor$cell$Dispatch$$_mailboxDoNotCallMeDirectly")); + childrenOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("akka$actor$cell$Children$$_childrenRefsDoNotCallMeDirectly")); + nextNameOffset = Unsafe.instance.objectFieldOffset(ActorCell.class.getDeclaredField("akka$actor$cell$Children$$_nextNameDoNotCallMeDirectly")); } catch(Throwable t){ throw new ExceptionInInitializerError(t); } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 8b9476efe9..c1ae9c57bf 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -122,12 +122,36 @@ class InvalidActorNameException(message: String) extends AkkaException(message) /** * An ActorInitializationException is thrown when the the initialization logic for an Actor fails. */ -class ActorInitializationException private[akka] (actor: ActorRef, message: String, cause: Throwable) - extends AkkaException(message, cause) /*with NoStackTrace*/ { +class ActorInitializationException private[akka] (val actor: ActorRef, message: String, cause: Throwable) + extends AkkaException(message, cause) { def this(msg: String) = this(null, msg, null) def this(actor: ActorRef, msg: String) = this(actor, msg, null) } +/** + * A PreRestartException is thrown when the preRestart() method failed. + * + * @param actor is the actor whose preRestart() hook failed + * @param cause is the exception thrown by that actor within preRestart() + * @param origCause is the exception which caused the restart in the first place + * @param msg is the message which was optionally passed into preRestart() + */ +class PreRestartException private[akka] (actor: ActorRef, cause: Throwable, val origCause: Throwable, val msg: Option[Any]) + extends ActorInitializationException(actor, "exception in preRestart(" + origCause.getClass + ", " + msg.map(_.getClass) + ")", cause) { +} + +/** + * A PostRestartException is thrown when constructor or postRestart() method + * fails during a restart attempt. + * + * @param actor is the actor whose constructor or postRestart() hook failed + * @param cause is the exception thrown by that actor within preRestart() + * @param origCause is the exception which caused the restart in the first place + */ +class PostRestartException private[akka] (actor: ActorRef, cause: Throwable, val origCause: Throwable) + extends ActorInitializationException(actor, "exception post restart (" + origCause.getClass + ")", cause) { +} + /** * InvalidMessageException is thrown when an invalid message is sent to an Actor. * Technically it's only "null" which is an InvalidMessageException but who knows, diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 4b120a969e..125ae22133 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -4,23 +4,17 @@ package akka.actor -import language.existentials +import java.io.{ ObjectOutputStream, NotSerializableException } -import akka.dispatch._ import scala.annotation.tailrec -import java.util.concurrent.TimeUnit -import java.util.concurrent.TimeUnit.MILLISECONDS -import akka.event.Logging.{ Debug, Warning, Error } -import akka.japi.Procedure -import java.io.{ NotSerializableException, ObjectOutputStream } -import akka.serialization.SerializationExtension -import akka.event.Logging.LogEventException -import scala.collection.immutable.{ TreeSet, TreeMap } -import akka.util.{ Unsafe, Helpers } -import scala.util.control.NonFatal +import scala.collection.immutable.TreeSet import scala.concurrent.util.Duration +import scala.util.control.NonFatal -//TODO: everything here for current compatibility - could be limited more +import akka.actor.cell.ChildrenContainer +import akka.dispatch.{ Watch, Unwatch, Terminate, SystemMessage, Suspend, Supervise, Resume, Recreate, NoMessage, MessageDispatcher, Envelope, Create, ChildTerminated } +import akka.event.Logging.{ LogEvent, Debug } +import akka.japi.Procedure /** * The actor context - the view of the actor cell from the actor. @@ -96,9 +90,9 @@ trait ActorContext extends ActorRefFactory { def sender: ActorRef /** - * Returns all supervised children; this method returns a view onto the - * internal collection of children. Targeted lookups should be using - * `actorFor` instead for performance reasons: + * Returns all supervised children; this method returns a view (i.e. a lazy + * collection) onto the internal collection of children. Targeted lookups + * should be using `actorFor` instead for performance reasons: * * {{{ * val badLookup = context.children find (_.path.name == "kid") @@ -194,7 +188,7 @@ private[akka] trait Cell { /** * Recursively resume this actor and all its children. */ - def resume(): Unit + def resume(inResponseToFailure: Boolean): Unit /** * Restart this actor (will recursively restart or stop all children). */ @@ -215,7 +209,11 @@ private[akka] trait Cell { /** * All children of this actor, including only reserved-names. */ - def childrenRefs: ActorCell.ChildrenContainer + def childrenRefs: ChildrenContainer + /** + * Get the stats for the named child, if that exists. + */ + def getChildByName(name: String): Option[ChildRestartStats] /** * Enqueue a message to be sent to the actor; may or may not actually * schedule the actor to run, depending on which type of cell it is. @@ -258,436 +256,102 @@ private[akka] object ActorCell { def cancel() {} } - final val emptyReceiveTimeoutData: (Duration, Cancellable) = (Duration.Undefined, emptyCancellable) - final val emptyBehaviorStack: List[Actor.Receive] = Nil final val emptyActorRefSet: Set[ActorRef] = TreeSet.empty - - sealed trait SuspendReason - case object UserRequest extends SuspendReason - case class Recreation(cause: Throwable) extends SuspendReason - case object Termination extends SuspendReason - - trait ChildrenContainer { - def add(child: ActorRef): ChildrenContainer - def remove(child: ActorRef): ChildrenContainer - def getByName(name: String): Option[ChildRestartStats] - def getByRef(actor: ActorRef): Option[ChildRestartStats] - def children: Iterable[ActorRef] - def stats: Iterable[ChildRestartStats] - def shallDie(actor: ActorRef): ChildrenContainer - /** - * reserve that name or throw an exception - */ - def reserve(name: String): ChildrenContainer - /** - * cancel a reservation - */ - def unreserve(name: String): ChildrenContainer - } - - trait EmptyChildrenContainer extends ChildrenContainer { - val emptyStats = TreeMap.empty[String, ChildStats] - def add(child: ActorRef): ChildrenContainer = - new NormalChildrenContainer(emptyStats.updated(child.path.name, ChildRestartStats(child))) - def remove(child: ActorRef): ChildrenContainer = this - def getByName(name: String): Option[ChildRestartStats] = None - def getByRef(actor: ActorRef): Option[ChildRestartStats] = None - def children: Iterable[ActorRef] = Nil - def stats: Iterable[ChildRestartStats] = Nil - def shallDie(actor: ActorRef): ChildrenContainer = this - def reserve(name: String): ChildrenContainer = new NormalChildrenContainer(emptyStats.updated(name, ChildNameReserved)) - def unreserve(name: String): ChildrenContainer = this - override def toString = "no children" - } - - /** - * This is the empty container, shared among all leaf actors. - */ - object EmptyChildrenContainer extends EmptyChildrenContainer - - /** - * This is the empty container which is installed after the last child has - * terminated while stopping; it is necessary to distinguish from the normal - * empty state while calling handleChildTerminated() for the last time. - */ - object TerminatedChildrenContainer extends EmptyChildrenContainer { - override def add(child: ActorRef): ChildrenContainer = this - override def reserve(name: String): ChildrenContainer = - throw new IllegalStateException("cannot reserve actor name '" + name + "': already terminated") - } - - /** - * Normal children container: we do have at least one child, but none of our - * children are currently terminating (which is the time period between - * calling context.stop(child) and processing the ChildTerminated() system - * message). - */ - class NormalChildrenContainer(c: TreeMap[String, ChildStats]) extends ChildrenContainer { - - def add(child: ActorRef): ChildrenContainer = - new NormalChildrenContainer(c.updated(child.path.name, ChildRestartStats(child))) - - def remove(child: ActorRef): ChildrenContainer = NormalChildrenContainer(c - child.path.name) - - def getByName(name: String): Option[ChildRestartStats] = c.get(name) match { - case s @ Some(_: ChildRestartStats) ⇒ s.asInstanceOf[Option[ChildRestartStats]] - case _ ⇒ None - } - - def getByRef(actor: ActorRef): Option[ChildRestartStats] = c.get(actor.path.name) match { - case c @ Some(crs: ChildRestartStats) if (crs.child == actor) ⇒ c.asInstanceOf[Option[ChildRestartStats]] - case _ ⇒ None - } - - def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _) ⇒ child } - - def stats: Iterable[ChildRestartStats] = c.values.collect { case c: ChildRestartStats ⇒ c } - - def shallDie(actor: ActorRef): ChildrenContainer = TerminatingChildrenContainer(c, Set(actor), UserRequest) - - def reserve(name: String): ChildrenContainer = - if (c contains name) - throw new InvalidActorNameException("actor name " + name + " is not unique!") - else new NormalChildrenContainer(c.updated(name, ChildNameReserved)) - - def unreserve(name: String): ChildrenContainer = c.get(name) match { - case Some(ChildNameReserved) ⇒ NormalChildrenContainer(c - name) - case _ ⇒ this - } - - override def toString = - if (c.size > 20) c.size + " children" - else c.mkString("children:\n ", "\n ", "") - } - - object NormalChildrenContainer { - def apply(c: TreeMap[String, ChildStats]): ChildrenContainer = - if (c.isEmpty) EmptyChildrenContainer - else new NormalChildrenContainer(c) - } - - /** - * Waiting state: there are outstanding termination requests (i.e. context.stop(child) - * was called but the corresponding ChildTerminated() system message has not yet been - * processed). There could be no specific reason (UserRequested), we could be Restarting - * or Terminating. - * - * Removing the last child which was supposed to be terminating will return a different - * type of container, depending on whether or not children are left and whether or not - * the reason was “Terminating”. - */ - case class TerminatingChildrenContainer(c: TreeMap[String, ChildStats], toDie: Set[ActorRef], reason: SuspendReason) - extends ChildrenContainer { - - def add(child: ActorRef): ChildrenContainer = copy(c.updated(child.path.name, ChildRestartStats(child))) - - def remove(child: ActorRef): ChildrenContainer = { - val t = toDie - child - if (t.isEmpty) reason match { - case Termination ⇒ TerminatedChildrenContainer - case _ ⇒ NormalChildrenContainer(c - child.path.name) - } - else copy(c - child.path.name, t) - } - - def getByName(name: String): Option[ChildRestartStats] = c.get(name) match { - case s @ Some(_: ChildRestartStats) ⇒ s.asInstanceOf[Option[ChildRestartStats]] - case _ ⇒ None - } - - def getByRef(actor: ActorRef): Option[ChildRestartStats] = c.get(actor.path.name) match { - case c @ Some(crs: ChildRestartStats) if (crs.child == actor) ⇒ c.asInstanceOf[Option[ChildRestartStats]] - case _ ⇒ None - } - - def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _) ⇒ child } - - def stats: Iterable[ChildRestartStats] = c.values.collect { case c: ChildRestartStats ⇒ c } - - def shallDie(actor: ActorRef): ChildrenContainer = copy(toDie = toDie + actor) - - def reserve(name: String): ChildrenContainer = reason match { - case Termination ⇒ throw new IllegalStateException("cannot reserve actor name '" + name + "': terminating") - case _ ⇒ - if (c contains name) - throw new InvalidActorNameException("actor name " + name + " is not unique!") - else copy(c = c.updated(name, ChildNameReserved)) - } - - def unreserve(name: String): ChildrenContainer = c.get(name) match { - case Some(ChildNameReserved) ⇒ copy(c = c - name) - case _ ⇒ this - } - - override def toString = - if (c.size > 20) c.size + " children" - else c.mkString("children (" + toDie.size + " terminating):\n ", "\n ", "\n") + toDie - } } //ACTORCELL IS 64bytes and should stay that way unless very good reason not to (machine sympathy, cache line fit) //vars don't need volatile since it's protected with the mailbox status //Make sure that they are not read/written outside of a message processing (systemInvoke/invoke) +/** + * Everything in here is completely Akka PRIVATE. You will not find any + * supported APIs in this place. This is not the API you were looking + * for! (waves hand) + */ private[akka] class ActorCell( val system: ActorSystemImpl, val self: InternalActorRef, val props: Props, - @volatile var parent: InternalActorRef) extends UntypedActorContext with Cell { + val parent: InternalActorRef) + extends UntypedActorContext with Cell + with cell.ReceiveTimeout + with cell.Children + with cell.Dispatch + with cell.DeathWatch + with cell.FaultHandling { - import AbstractActorCell.{ mailboxOffset, childrenOffset, nextNameOffset } import ActorCell._ final def isLocal = true final def systemImpl = system - protected final def guardian = self - protected final def lookupRoot = self - final def provider = system.provider - override final def receiveTimeout: Option[Duration] = receiveTimeoutData._1 match { - case Duration.Undefined ⇒ None - case duration ⇒ Some(duration) - } - - final def setReceiveTimeout(timeout: Option[Duration]): Unit = setReceiveTimeout(timeout.getOrElse(Duration.Undefined)) - - override final def setReceiveTimeout(timeout: Duration): Unit = - receiveTimeoutData = ( - if (Duration.Undefined == timeout || timeout.toMillis < 1) Duration.Undefined else timeout, - receiveTimeoutData._2) - - final override def resetReceiveTimeout(): Unit = setReceiveTimeout(None) - - /** - * In milliseconds - */ - var receiveTimeoutData: (Duration, Cancellable) = emptyReceiveTimeoutData - - @volatile - private var _childrenRefsDoNotCallMeDirectly: ChildrenContainer = EmptyChildrenContainer - - def childrenRefs: ChildrenContainer = Unsafe.instance.getObjectVolatile(this, childrenOffset).asInstanceOf[ChildrenContainer] - - private def swapChildrenRefs(oldChildren: ChildrenContainer, newChildren: ChildrenContainer): Boolean = - Unsafe.instance.compareAndSwapObject(this, childrenOffset, oldChildren, newChildren) - - @tailrec private def reserveChild(name: String): Boolean = { - val c = childrenRefs - swapChildrenRefs(c, c.reserve(name)) || reserveChild(name) - } - - @tailrec private def unreserveChild(name: String): Boolean = { - val c = childrenRefs - swapChildrenRefs(c, c.unreserve(name)) || unreserveChild(name) - } - - @tailrec private def addChild(ref: ActorRef): Boolean = { - val c = childrenRefs - swapChildrenRefs(c, c.add(ref)) || addChild(ref) - } - - @tailrec private def shallDie(ref: ActorRef): Boolean = { - val c = childrenRefs - swapChildrenRefs(c, c.shallDie(ref)) || shallDie(ref) - } - - @tailrec private def removeChild(ref: ActorRef): ChildrenContainer = { - val c = childrenRefs - val n = c.remove(ref) - if (swapChildrenRefs(c, n)) n - else removeChild(ref) - } - - @tailrec private def setChildrenTerminationReason(reason: SuspendReason): Boolean = { - childrenRefs match { - case c: TerminatingChildrenContainer ⇒ swapChildrenRefs(c, c.copy(reason = reason)) || setChildrenTerminationReason(reason) - case _ ⇒ false - } - } - - private def isTerminating = childrenRefs match { - case TerminatingChildrenContainer(_, _, Termination) ⇒ true - case TerminatedChildrenContainer ⇒ true - case _ ⇒ false - } - private def isNormal = childrenRefs match { - case TerminatingChildrenContainer(_, _, Termination | _: Recreation) ⇒ false - case _ ⇒ true - } - - private def _actorOf(props: Props, name: String, async: Boolean): ActorRef = { - if (system.settings.SerializeAllCreators && !props.creator.isInstanceOf[NoSerializationVerificationNeeded]) { - val ser = SerializationExtension(system) - ser.serialize(props.creator) match { - case Left(t) ⇒ throw t - case Right(bytes) ⇒ ser.deserialize(bytes, props.creator.getClass) match { - case Left(t) ⇒ throw t - case _ ⇒ //All good - } - } - } - /* - * in case we are currently terminating, fail external attachChild requests - * (internal calls cannot happen anyway because we are suspended) - */ - if (isTerminating) throw new IllegalStateException("cannot create children while terminating or terminated") - else { - reserveChild(name) - // this name will either be unreserved or overwritten with a real child below - val actor = - try { - provider.actorOf(systemImpl, props, self, self.path / name, - systemService = false, deploy = None, lookupDeploy = true, async = async) - } catch { - case NonFatal(e) ⇒ - unreserveChild(name) - throw e - } - addChild(actor) - actor - } - } - - def actorOf(props: Props): ActorRef = _actorOf(props, randomName(), async = false) - - def actorOf(props: Props, name: String): ActorRef = _actorOf(props, checkName(name), async = false) - - private def checkName(name: String): String = { - import ActorPath.ElementRegex - name match { - case null ⇒ throw new InvalidActorNameException("actor name must not be null") - case "" ⇒ throw new InvalidActorNameException("actor name must not be empty") - case ElementRegex() ⇒ name - case _ ⇒ throw new InvalidActorNameException("illegal actor name '" + name + "', must conform to " + ElementRegex) - } - } - - private[akka] def attachChild(props: Props, name: String): ActorRef = - _actorOf(props, checkName(name), async = true) - - private[akka] def attachChild(props: Props): ActorRef = - _actorOf(props, randomName(), async = true) - - final def stop(actor: ActorRef): Unit = { - val started = actor match { - case r: RepointableRef ⇒ r.isStarted - case _ ⇒ true - } - if (childrenRefs.getByRef(actor).isDefined && started) shallDie(actor) - actor.asInstanceOf[InternalActorRef].stop() - } - + private[this] var _actor: Actor = _ + def actor: Actor = _actor + protected def actor_=(a: Actor): Unit = _actor = a var currentMessage: Envelope = _ - var actor: Actor = _ private var behaviorStack: List[Actor.Receive] = emptyBehaviorStack - var watching: Set[ActorRef] = emptyActorRefSet - var watchedBy: Set[ActorRef] = emptyActorRefSet - @volatile private var _nextNameDoNotCallMeDirectly = 0L - final protected def randomName(): String = { - @tailrec def inc(): Long = { - val current = Unsafe.instance.getLongVolatile(this, nextNameOffset) - if (Unsafe.instance.compareAndSwapLong(this, nextNameOffset, current, current + 1)) current - else inc() + /* + * MESSAGE PROCESSING + */ + //Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status + final def systemInvoke(message: SystemMessage): Unit = try { + message match { + case Create() ⇒ create() + case Recreate(cause) ⇒ faultRecreate(cause) + case Watch(watchee, watcher) ⇒ addWatcher(watchee, watcher) + case Unwatch(watchee, watcher) ⇒ remWatcher(watchee, watcher) + case Suspend() ⇒ faultSuspend() + case Resume(inRespToFailure) ⇒ faultResume(inRespToFailure) + case Terminate() ⇒ terminate() + case Supervise(child) ⇒ supervise(child) + case ChildTerminated(child) ⇒ handleChildTerminated(child) + case NoMessage ⇒ // only here to suppress warning } - Helpers.base64(inc()) + } catch { + case e @ (_: InterruptedException | NonFatal(_)) ⇒ handleInvokeFailure(e, "error while processing " + message) } - @volatile private var _mailboxDoNotCallMeDirectly: Mailbox = _ //This must be volatile since it isn't protected by the mailbox status + //Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status + final def invoke(messageHandle: Envelope): Unit = try { + currentMessage = messageHandle + cancelReceiveTimeout() // FIXME: leave this here??? + messageHandle.message match { + case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle) + case msg ⇒ receiveMessage(msg) + } + currentMessage = null // reset current message after successful invocation + } catch { + case e @ (_: InterruptedException | NonFatal(_)) ⇒ handleInvokeFailure(e, e.getMessage) + } finally { + checkReceiveTimeout // Reschedule receive timeout + } - /** - * INTERNAL API - * - * Returns a reference to the current mailbox + def autoReceiveMessage(msg: Envelope): Unit = { + if (system.settings.DebugAutoReceive) + publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg)) + + msg.message match { + case Failed(cause) ⇒ handleFailure(sender, cause) + case t: Terminated ⇒ watchedActorTerminated(t.actor); receiveMessage(t) + case Kill ⇒ throw new ActorKilledException("Kill") + case PoisonPill ⇒ self.stop() + case SelectParent(m) ⇒ parent.tell(m, msg.sender) + case SelectChildName(name, m) ⇒ for (c ← getChildByName(name)) c.child.tell(m, msg.sender) + case SelectChildPattern(p, m) ⇒ for (c ← children if p.matcher(c.path.name).matches) c.tell(m, msg.sender) + } + } + + final def receiveMessage(msg: Any): Unit = behaviorStack.head.applyOrElse(msg, actor.unhandled) + + /* + * ACTOR CONTEXT IMPLEMENTATION */ - @inline final def mailbox: Mailbox = Unsafe.instance.getObjectVolatile(this, mailboxOffset).asInstanceOf[Mailbox] - - /** - * INTERNAL API - * - * replaces the current mailbox using getAndSet semantics - */ - @tailrec final def swapMailbox(newMailbox: Mailbox): Mailbox = { - val oldMailbox = mailbox - if (!Unsafe.instance.compareAndSwapObject(this, mailboxOffset, oldMailbox, newMailbox)) swapMailbox(newMailbox) - else oldMailbox - } - - final def hasMessages: Boolean = mailbox.hasMessages - - final def numberOfMessages: Int = mailbox.numberOfMessages - - val dispatcher: MessageDispatcher = system.dispatchers.lookup(props.dispatcher) - - /** - * UntypedActorContext impl - */ - final def getDispatcher(): MessageDispatcher = dispatcher - - final def isTerminated: Boolean = mailbox.isClosed - - final def start(): this.type = { - - /* - * Create the mailbox and enqueue the Create() message to ensure that - * this is processed before anything else. - */ - swapMailbox(dispatcher.createMailbox(this)) - mailbox.setActor(this) - - // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - mailbox.systemEnqueue(self, Create()) - - // This call is expected to start off the actor by scheduling its mailbox. - dispatcher.attach(this) - - this - } - - // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - final def suspend(): Unit = dispatcher.systemDispatch(this, Suspend()) - - // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - final def resume(): Unit = dispatcher.systemDispatch(this, Resume()) - - // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - final def stop(): Unit = dispatcher.systemDispatch(this, Terminate()) - - override final def watch(subject: ActorRef): ActorRef = subject match { - case a: InternalActorRef ⇒ - if (a != self && !watching.contains(a)) { - a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - watching += a - } - a - } - - override final def unwatch(subject: ActorRef): ActorRef = subject match { - case a: InternalActorRef ⇒ - if (a != self && watching.contains(a)) { - a.sendSystemMessage(Unwatch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - watching -= a - } - a - } - - final def children: Iterable[ActorRef] = childrenRefs.children - - /** - * Impl UntypedActorContext - */ - final def getChildren(): java.lang.Iterable[ActorRef] = - scala.collection.JavaConverters.asJavaIterableConverter(children).asJava - - def tell(message: Any, sender: ActorRef): Unit = - dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender, system)) - - override def sendSystemMessage(message: SystemMessage): Unit = dispatcher.systemDispatch(this, message) final def sender: ActorRef = currentMessage match { case null ⇒ system.deadLetters @@ -695,6 +359,25 @@ private[akka] class ActorCell( case _ ⇒ system.deadLetters } + def become(behavior: Actor.Receive, discardOld: Boolean = true): Unit = + behaviorStack = behavior :: (if (discardOld && behaviorStack.nonEmpty) behaviorStack.tail else behaviorStack) + + def become(behavior: Procedure[Any]): Unit = become(behavior, false) + + def become(behavior: Procedure[Any], discardOld: Boolean): Unit = + become({ case msg ⇒ behavior.apply(msg) }: Actor.Receive, discardOld) + + def unbecome(): Unit = { + val original = behaviorStack + behaviorStack = + if (original.isEmpty || original.tail.isEmpty) actor.receive :: emptyBehaviorStack + else original.tail + } + + /* + * ACTOR INSTANCE HANDLING + */ + //This method is in charge of setting up the contextStack and create a new instance of the Actor protected def newActor(): Actor = { contextStack.set(this :: contextStack.get) @@ -715,321 +398,43 @@ private[akka] class ActorCell( } } - //Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status - final def systemInvoke(message: SystemMessage) { - - def create(): Unit = if (isNormal) { - try { - val created = newActor() - actor = created - created.preStart() - checkReceiveTimeout - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(created), "started (" + created + ")")) - } catch { - case NonFatal(i: InstantiationException) ⇒ - throw new ActorInitializationException(self, - """exception during creation, this problem is likely to occur because the class of the Actor you tried to create is either, + private def create(): Unit = if (isNormal) { + try { + val created = newActor() + actor = created + created.preStart() + checkReceiveTimeout + if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(created), "started (" + created + ")")) + } catch { + case NonFatal(i: InstantiationException) ⇒ + throw new ActorInitializationException(self, + """exception during creation, this problem is likely to occur because the class of the Actor you tried to create is either, a non-static inner class (in which case make it a static inner class or use Props(new ...) or Props( new UntypedActorFactory ... ) or is missing an appropriate, reachable no-args constructor. """, i.getCause) - case NonFatal(e) ⇒ throw new ActorInitializationException(self, "exception during creation", e) - } - } - - def recreate(cause: Throwable): Unit = if (isNormal) { - try { - val failedActor = actor - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(failedActor), "restarting")) - if (failedActor ne null) { - val c = currentMessage //One read only plz - try { - if (failedActor.context ne null) failedActor.preRestart(cause, if (c ne null) Some(c.message) else None) - } finally { - clearActorFields(failedActor) - } - } - childrenRefs match { - case ct: TerminatingChildrenContainer ⇒ - setChildrenTerminationReason(Recreation(cause)) - dispatcher suspend this - case _ ⇒ - doRecreate(cause, failedActor) - } - } catch { - case NonFatal(e) ⇒ throw new ActorInitializationException(self, "exception during creation", e match { - case i: InstantiationException ⇒ i.getCause - case other ⇒ other - }) - } - } - - def suspend(): Unit = if (isNormal) dispatcher suspend this - - def resume(): Unit = if (isNormal) dispatcher resume this - - def addWatcher(watchee: ActorRef, watcher: ActorRef): Unit = { - val watcheeSelf = watchee == self - val watcherSelf = watcher == self - - if (watcheeSelf && !watcherSelf) { - if (!watchedBy.contains(watcher)) { - watchedBy += watcher - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now monitoring " + watcher)) - } - } else if (!watcheeSelf && watcherSelf) { - watch(watchee) - } else { - system.eventStream.publish(Warning(self.path.toString, clazz(actor), "BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, self))) - } - } - - def remWatcher(watchee: ActorRef, watcher: ActorRef): Unit = { - val watcheeSelf = watchee == self - val watcherSelf = watcher == self - - if (watcheeSelf && !watcherSelf) { - if (watchedBy.contains(watcher)) { - watchedBy -= watcher - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + watcher)) - } - } else if (!watcheeSelf && watcherSelf) { - unwatch(watchee) - } else { - system.eventStream.publish(Warning(self.path.toString, clazz(actor), "BUG: illegal Unwatch(%s,%s) for %s".format(watchee, watcher, self))) - } - } - - def terminate() { - setReceiveTimeout(None) - cancelReceiveTimeout - - // stop all children, which will turn childrenRefs into TerminatingChildrenContainer (if there are children) - children foreach stop - - childrenRefs match { - case ct: TerminatingChildrenContainer ⇒ - setChildrenTerminationReason(Termination) - // do not process normal messages while waiting for all children to terminate - dispatcher suspend this - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopping")) - case _ ⇒ doTerminate() - } - } - - def supervise(child: ActorRef): Unit = if (!isTerminating) { - if (childrenRefs.getByRef(child).isEmpty) addChild(child) - handleSupervise(child) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) - } - - try { - message match { - case Create() ⇒ create() - case Recreate(cause) ⇒ recreate(cause) - case Watch(watchee, watcher) ⇒ addWatcher(watchee, watcher) - case Unwatch(watchee, watcher) ⇒ remWatcher(watchee, watcher) - case Suspend() ⇒ suspend() - case Resume() ⇒ resume() - case Terminate() ⇒ terminate() - case Supervise(child) ⇒ supervise(child) - case ChildTerminated(child) ⇒ handleChildTerminated(child) - case NoMessage ⇒ // only here to suppress warning - } - } catch { - case e @ (_: InterruptedException | NonFatal(_)) ⇒ handleInvokeFailure(e, "error while processing " + message) + case NonFatal(e) ⇒ throw new ActorInitializationException(self, "exception during creation", e) } } - //Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status - final def invoke(messageHandle: Envelope): Unit = try { - currentMessage = messageHandle - cancelReceiveTimeout() // FIXME: leave this here??? - messageHandle.message match { - case msg: AutoReceivedMessage ⇒ autoReceiveMessage(messageHandle) - case msg ⇒ receiveMessage(msg) - } - currentMessage = null // reset current message after successful invocation - } catch { - case e @ (_: InterruptedException | NonFatal(_)) ⇒ handleInvokeFailure(e, e.getMessage) - } finally { - checkReceiveTimeout // Reschedule receive timeout - } - - final def handleInvokeFailure(t: Throwable, message: String): Unit = try { - dispatcher.reportFailure(new LogEventException(Error(t, self.path.toString, clazz(actor), message), t)) - // prevent any further messages to be processed until the actor has been restarted - dispatcher.suspend(this) - if (actor ne null) actor.supervisorStrategy.handleSupervisorFailing(self, children) - } finally { - t match { // Wrap InterruptedExceptions and rethrow - case _: InterruptedException ⇒ parent.tell(Failed(new ActorInterruptedException(t)), self); throw t - case _ ⇒ parent.tell(Failed(t), self) - } - } - - def become(behavior: Actor.Receive, discardOld: Boolean = true): Unit = - behaviorStack = behavior :: (if (discardOld && behaviorStack.nonEmpty) behaviorStack.tail else behaviorStack) - - /** - * UntypedActorContext impl - */ - def become(behavior: Procedure[Any]): Unit = become(behavior, false) - - /* - * UntypedActorContext impl - */ - def become(behavior: Procedure[Any], discardOld: Boolean): Unit = - become({ case msg ⇒ behavior.apply(msg) }: Actor.Receive, discardOld) - - def unbecome(): Unit = { - val original = behaviorStack - behaviorStack = - if (original.isEmpty || original.tail.isEmpty) actor.receive :: emptyBehaviorStack - else original.tail - } - - def autoReceiveMessage(msg: Envelope): Unit = { - if (system.settings.DebugAutoReceive) - system.eventStream.publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg)) - - msg.message match { - case Failed(cause) ⇒ handleFailure(sender, cause) - case t: Terminated ⇒ watching -= t.actor; receiveMessage(t) - case Kill ⇒ throw new ActorKilledException("Kill") - case PoisonPill ⇒ self.stop() - case SelectParent(m) ⇒ parent.tell(m, msg.sender) - case SelectChildName(name, m) ⇒ for (c ← childrenRefs getByName name) c.child.tell(m, msg.sender) - case SelectChildPattern(p, m) ⇒ for (c ← children if p.matcher(c.path.name).matches) c.tell(m, msg.sender) - } - } - - final def receiveMessage(msg: Any): Unit = behaviorStack.head.applyOrElse(msg, actor.unhandled) - - private def doTerminate() { - val a = actor - try { - try { - if (a ne null) a.postStop() - } finally { - dispatcher.detach(this) - } - } finally { - try { - parent.sendSystemMessage(ChildTerminated(self)) - - if (!watchedBy.isEmpty) { - val terminated = Terminated(self)(existenceConfirmed = true) - try { - watchedBy foreach { - watcher ⇒ - try watcher.tell(terminated, self) catch { - case NonFatal(t) ⇒ system.eventStream.publish(Error(t, self.path.toString, clazz(a), "deathwatch")) - } - } - } finally watchedBy = emptyActorRefSet - } - - if (!watching.isEmpty) { - try { - watching foreach { // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - case watchee: InternalActorRef ⇒ try watchee.sendSystemMessage(Unwatch(watchee, self)) catch { - case NonFatal(t) ⇒ system.eventStream.publish(Error(t, self.path.toString, clazz(a), "deathwatch")) - } - } - } finally watching = emptyActorRefSet - } - if (system.settings.DebugLifecycle) - system.eventStream.publish(Debug(self.path.toString, clazz(a), "stopped")) - } finally { - behaviorStack = emptyBehaviorStack - clearActorFields(a) - actor = null - } - } - } - - private def doRecreate(cause: Throwable, failedActor: Actor): Unit = try { - // after all killed children have terminated, recreate the rest, then go on to start the new instance - actor.supervisorStrategy.handleSupervisorRestarted(cause, self, children) - val freshActor = newActor() - actor = freshActor // this must happen before postRestart has a chance to fail - if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields. - - freshActor.postRestart(cause) - if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(freshActor), "restarted")) - - dispatcher.resume(this) - } catch { - case NonFatal(e) ⇒ try { - dispatcher.reportFailure(new LogEventException(Error(e, self.path.toString, clazz(actor), "error while creating actor"), e)) - // prevent any further messages to be processed until the actor has been restarted - dispatcher.suspend(this) - actor.supervisorStrategy.handleSupervisorFailing(self, children) // FIXME Should this be called on actor or failedActor? - clearActorFields(actor) // If this fails, we need to ensure that preRestart isn't called. - } finally { - parent.tell(Failed(new ActorInitializationException(self, "exception during re-creation", e)), self) - } - } - - final def handleFailure(child: ActorRef, cause: Throwable): Unit = childrenRefs.getByRef(child) match { - case Some(stats) ⇒ if (!actor.supervisorStrategy.handleFailure(this, child, cause, stats, childrenRefs.stats)) throw cause - case None ⇒ system.eventStream.publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child)) - } - - final def handleChildTerminated(child: ActorRef): Unit = try { - childrenRefs match { - case tc @ TerminatingChildrenContainer(_, _, reason) ⇒ - val n = removeChild(child) - actor.supervisorStrategy.handleChildTerminated(this, child, children) - if (!n.isInstanceOf[TerminatingChildrenContainer]) reason match { - case Recreation(cause) ⇒ doRecreate(cause, actor) // doRecreate since this is the continuation of "recreate" - case Termination ⇒ doTerminate() - case _ ⇒ - } - case _ ⇒ - removeChild(child) - actor.supervisorStrategy.handleChildTerminated(this, child, children) - } - } catch { - case NonFatal(e) ⇒ - try { - dispatcher suspend this - actor.supervisorStrategy.handleSupervisorFailing(self, children) - } finally { - parent.tell(Failed(e), self) - } + private def supervise(child: ActorRef): Unit = if (!isTerminating) { + addChild(child) + handleSupervise(child) + if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) } + // future extension point protected def handleSupervise(child: ActorRef): Unit = child match { case r: RepointableActorRef ⇒ r.activate() case _ ⇒ } - // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - final def restart(cause: Throwable): Unit = dispatcher.systemDispatch(this, Recreate(cause)) - - final def checkReceiveTimeout() { - val recvtimeout = receiveTimeoutData - if (Duration.Undefined != recvtimeout._1 && !mailbox.hasMessages) { - recvtimeout._2.cancel() //Cancel any ongoing future - //Only reschedule if desired and there are currently no more messages to be processed - receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(recvtimeout._1, self, ReceiveTimeout)) - } else cancelReceiveTimeout() - - } - - final def cancelReceiveTimeout(): Unit = - if (receiveTimeoutData._2 ne emptyCancellable) { - receiveTimeoutData._2.cancel() - receiveTimeoutData = (receiveTimeoutData._1, emptyCancellable) - } - - final def clearActorFields(actorInstance: Actor): Unit = { + final protected def clearActorFields(actorInstance: Actor): Unit = { setActorFields(actorInstance, context = null, self = system.deadLetters) currentMessage = null + behaviorStack = emptyBehaviorStack } - final def setActorFields(actorInstance: Actor, context: ActorContext, self: ActorRef) { + final protected def setActorFields(actorInstance: Actor, context: ActorContext, self: ActorRef) { @tailrec def lookupAndSetField(clazz: Class[_], actor: Actor, name: String, value: Any): Boolean = { val success = try { @@ -1054,6 +459,9 @@ private[akka] class ActorCell( } } - private final def clazz(o: AnyRef): Class[_] = if (o eq null) this.getClass else o.getClass + // logging is not the main purpose, and if it fails there’s nothing we can do + protected final def publish(e: LogEvent): Unit = try system.eventStream.publish(e) catch { case NonFatal(_) ⇒ } + + protected final def clazz(o: AnyRef): Class[_] = if (o eq null) this.getClass else o.getClass } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 8d42714b00..00a84f956a 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -191,7 +191,7 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe /* * Actor life-cycle management, invoked only internally (in response to user requests via ActorContext). */ - def resume(): Unit + def resume(inResponseToFailure: Boolean): Unit def suspend(): Unit def restart(cause: Throwable): Unit def stop(): Unit @@ -262,10 +262,7 @@ private[akka] class LocalActorRef private[akka] ( * that is reached). */ private val actorCell: ActorCell = newActorCell(_system, this, _props, _supervisor) - actorCell.start() - - // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - _supervisor.sendSystemMessage(akka.dispatch.Supervise(this)) + actorCell.start(sendSupervise = true) protected def newActorCell(system: ActorSystemImpl, ref: InternalActorRef, props: Props, supervisor: InternalActorRef): ActorCell = new ActorCell(system, ref, props, supervisor) @@ -291,7 +288,7 @@ private[akka] class LocalActorRef private[akka] ( /** * Resumes a suspended actor. */ - override def resume(): Unit = actorCell.resume() + override def resume(inResponseToFailure: Boolean): Unit = actorCell.resume(inResponseToFailure) /** * Shuts down the actor and its message queue @@ -307,7 +304,7 @@ private[akka] class LocalActorRef private[akka] ( * to inject “synthetic” actor paths like “/temp”. */ protected def getSingleChild(name: String): InternalActorRef = - actorCell.childrenRefs.getByName(name) match { + actorCell.getChildByName(name) match { case Some(crs) ⇒ crs.child.asInstanceOf[InternalActorRef] case None ⇒ Nobody } @@ -391,7 +388,7 @@ private[akka] trait MinimalActorRef extends InternalActorRef with LocalRef { override def getChild(names: Iterator[String]): InternalActorRef = if (names.forall(_.isEmpty)) this else Nobody override def suspend(): Unit = () - override def resume(): Unit = () + override def resume(inResponseToFailure: Boolean): Unit = () override def stop(): Unit = () override def isTerminated = false diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index c31c451c44..5bc2708e8c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -17,6 +17,7 @@ import akka.util._ import akka.util.internal.{ HashedWheelTimer, ConcurrentIdentityHashMap } import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException } import java.util.concurrent.TimeUnit.MILLISECONDS +import akka.actor.cell.ChildrenContainer object ActorSystem { @@ -701,19 +702,30 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, node match { case wc: ActorRefWithCell ⇒ val cell = wc.underlying - indent + "-> " + node.path.name + " " + Logging.simpleName(node) + " " + + (if (indent.isEmpty) "-> " else indent.dropRight(1) + "⌊-> ") + + node.path.name + " " + Logging.simpleName(node) + " " + (cell match { case real: ActorCell ⇒ if (real.actor ne null) real.actor.getClass else "null" case _ ⇒ Logging.simpleName(cell) }) + + (cell match { + case real: ActorCell ⇒ " status=" + real.mailbox.status + case _ ⇒ "" + }) + " " + (cell.childrenRefs match { - case ActorCell.TerminatingChildrenContainer(_, toDie, reason) ⇒ + case ChildrenContainer.TerminatingChildrenContainer(_, toDie, reason) ⇒ "Terminating(" + reason + ")" + - (toDie.toSeq.sorted mkString ("\n" + indent + " toDie: ", "\n" + indent + " ", "")) + (toDie.toSeq.sorted mkString ("\n" + indent + " | toDie: ", "\n" + indent + " | ", "")) + case x @ (ChildrenContainer.TerminatedChildrenContainer | ChildrenContainer.EmptyChildrenContainer) ⇒ x.toString + case n: ChildrenContainer.NormalChildrenContainer ⇒ n.c.size + " children" case x ⇒ Logging.simpleName(x) }) + (if (cell.childrenRefs.children.isEmpty) "" else "\n") + - (cell.childrenRefs.children.toSeq.sorted map (printNode(_, indent + " |")) mkString ("\n")) + ({ + val children = cell.childrenRefs.children.toSeq.sorted + val bulk = children.dropRight(1) map (printNode(_, indent + " |")) + bulk ++ (children.lastOption map (printNode(_, indent + " "))) + } mkString ("\n")) case _ ⇒ indent + node.path.name + " " + Logging.simpleName(node) } diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index 803c46ed26..bdeec7a8bb 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -208,19 +208,31 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits { } /** - * An Akka SupervisorStrategy is the policy to apply for crashing children + * An Akka SupervisorStrategy is the policy to apply for crashing children. + * + * IMPORTANT: + * + * You should not normally need to create new subclasses, instead use the + * existing [[akka.actor.OneForOneStrategy]] or [[akka.actor.AllForOneStrategy]], + * but if you do, please read the docs of the methods below carefully, as + * incorrect implementations may lead to “blocked” actor systems (i.e. + * permanently suspended actors). */ abstract class SupervisorStrategy { import SupervisorStrategy._ /** - * Returns the Decider that is associated with this SupervisorStrategy + * Returns the Decider that is associated with this SupervisorStrategy. + * The Decider is invoked by the default implementation of `handleFailure` + * to obtain the Directive to be applied. */ def decider: Decider /** * This method is called after the child has been removed from the set of children. + * It does not need to do anything special. Exceptions thrown from this method + * do NOT make the actor fail if this happens during termination. */ def handleChildTerminated(context: ActorContext, child: ActorRef, children: Iterable[ActorRef]): Unit @@ -229,27 +241,48 @@ abstract class SupervisorStrategy { */ def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit - //FIXME docs - def handleSupervisorFailing(supervisor: ActorRef, children: Iterable[ActorRef]): Unit = - if (children.nonEmpty) children.foreach(_.asInstanceOf[InternalActorRef].suspend()) - - //FIXME docs - def handleSupervisorRestarted(cause: Throwable, supervisor: ActorRef, children: Iterable[ActorRef]): Unit = - if (children.nonEmpty) children.foreach(_.asInstanceOf[InternalActorRef].restart(cause)) - /** - * Returns whether it processed the failure or not + * This is the main entry point: in case of a child’s failure, this method + * must try to handle the failure by resuming, restarting or stopping the + * child (and returning `true`), or it returns `false` to escalate the + * failure, which will lead to this actor re-throwing the exception which + * caused the failure. The exception will not be wrapped. + * + * @param children is a lazy collection (a view) */ def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Boolean = { val directive = if (decider.isDefinedAt(cause)) decider(cause) else Escalate //FIXME applyOrElse in Scala 2.10 directive match { - case Resume ⇒ child.asInstanceOf[InternalActorRef].resume(); true + case Resume ⇒ resumeChild(child); true case Restart ⇒ processFailure(context, true, child, cause, stats, children); true case Stop ⇒ processFailure(context, false, child, cause, stats, children); true case Escalate ⇒ false } } + /** + * Resume the previously failed child: do never apply this to a child which + * is not the currently failing child. Suspend/resume needs to be done in + * matching pairs, otherwise actors will wake up too soon or never at all. + */ + final def resumeChild(child: ActorRef): Unit = child.asInstanceOf[InternalActorRef].resume(inResponseToFailure = true) + + /** + * Restart the given child, possibly suspending it first. + * + * IMPORTANT: + * + * If the child is the currently failing one, it will already have been + * suspended, hence `suspendFirst` is false. If the child is not the + * currently failing one, then it did not request this treatment and is + * therefore not prepared to be resumed without prior suspend. + */ + final def restartChild(child: ActorRef, cause: Throwable, suspendFirst: Boolean): Unit = { + val c = child.asInstanceOf[InternalActorRef] + if (suspendFirst) c.suspend() + c.restart(cause) + } + } /** @@ -288,7 +321,7 @@ case class AllForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = { if (children.nonEmpty) { if (restart && children.forall(_.requestRestartPermission(retriesWindow))) - children.foreach(_.child.asInstanceOf[InternalActorRef].restart(cause)) + children foreach (crs ⇒ restartChild(crs.child, cause, suspendFirst = (crs.child != child))) else for (c ← children) context.stop(c.child) } @@ -330,7 +363,7 @@ case class OneForOneStrategy(maxNrOfRetries: Int = -1, withinTimeRange: Duration def processFailure(context: ActorContext, restart: Boolean, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]): Unit = { if (restart && stats.requestRestartPermission(retriesWindow)) - child.asInstanceOf[InternalActorRef].restart(cause) + restartChild(child, cause, suspendFirst = false) else context.stop(child) //TODO optimization to drop child here already? } diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index ad9a7cb0c4..caad67503a 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -16,6 +16,7 @@ import akka.dispatch.MessageDispatcher import java.util.concurrent.locks.ReentrantLock import akka.event.Logging.Warning import scala.collection.mutable.Queue +import akka.actor.cell.ChildrenContainer /** * This actor ref starts out with some dummy cell (by default just enqueuing @@ -76,11 +77,11 @@ private[akka] class RepointableActorRef( * This is called by activate() to obtain the cell which is to replace the * unstarted cell. The cell must be fully functional. */ - def newCell(): Cell = new ActorCell(system, this, props, supervisor).start() + def newCell(): Cell = new ActorCell(system, this, props, supervisor).start(sendSupervise = false) def suspend(): Unit = underlying.suspend() - def resume(): Unit = underlying.resume() + def resume(inResponseToFailure: Boolean): Unit = underlying.resume(inResponseToFailure) def stop(): Unit = underlying.stop() @@ -102,7 +103,7 @@ private[akka] class RepointableActorRef( case ".." ⇒ getParent.getChild(name) case "" ⇒ getChild(name) case other ⇒ - underlying.childrenRefs.getByName(other) match { + underlying.getChildByName(other) match { case Some(crs) ⇒ crs.child.asInstanceOf[InternalActorRef].getChild(name) case None ⇒ Nobody } @@ -129,6 +130,7 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep // use Envelope to keep on-send checks in the same place val queue: Queue[Envelope] = Queue() val systemQueue: Queue[SystemMessage] = Queue() + var suspendCount = 0 def replaceWith(cell: Cell): Unit = { lock.lock() @@ -161,18 +163,21 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep if (interrupted) throw new InterruptedException } finally try self.swapCell(cell) + finally try + for (_ ← 1 to suspendCount) cell.suspend() finally lock.unlock() } def system: ActorSystem = systemImpl - def suspend(): Unit = {} - def resume(): Unit = {} - def restart(cause: Throwable): Unit = {} + def suspend(): Unit = { lock.lock(); try suspendCount += 1 finally lock.unlock() } + def resume(inResponseToFailure: Boolean): Unit = { lock.lock(); try suspendCount -= 1 finally lock.unlock() } + def restart(cause: Throwable): Unit = { lock.lock(); try suspendCount -= 1 finally lock.unlock() } def stop(): Unit = sendSystemMessage(Terminate()) def isTerminated: Boolean = false def parent: InternalActorRef = supervisor - def childrenRefs: ActorCell.ChildrenContainer = ActorCell.EmptyChildrenContainer + def childrenRefs: ChildrenContainer = ChildrenContainer.EmptyChildrenContainer + def getChildByName(name: String): Option[ChildRestartStats] = None def tell(message: Any, sender: ActorRef): Unit = { lock.lock() try { diff --git a/akka-actor/src/main/scala/akka/actor/cell/Children.scala b/akka-actor/src/main/scala/akka/actor/cell/Children.scala new file mode 100644 index 0000000000..eea7ed7508 --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/cell/Children.scala @@ -0,0 +1,188 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.actor.cell + +import scala.annotation.tailrec +import scala.collection.JavaConverters.asJavaIterableConverter +import scala.util.control.NonFatal + +import akka.actor.{ RepointableRef, Props, NoSerializationVerificationNeeded, InvalidActorNameException, InternalActorRef, ChildRestartStats, ActorRef } +import akka.actor.ActorCell +import akka.actor.ActorPath.ElementRegex +import akka.serialization.SerializationExtension +import akka.util.{ Unsafe, Helpers } + +private[akka] trait Children { this: ActorCell ⇒ + + import ChildrenContainer._ + + @volatile + private var _childrenRefsDoNotCallMeDirectly: ChildrenContainer = EmptyChildrenContainer + + def childrenRefs: ChildrenContainer = + Unsafe.instance.getObjectVolatile(this, AbstractActorCell.childrenOffset).asInstanceOf[ChildrenContainer] + + final def children: Iterable[ActorRef] = childrenRefs.children + final def getChildren(): java.lang.Iterable[ActorRef] = children.asJava + + def actorOf(props: Props): ActorRef = makeChild(this, props, randomName(), async = false) + def actorOf(props: Props, name: String): ActorRef = makeChild(this, props, checkName(name), async = false) + private[akka] def attachChild(props: Props): ActorRef = makeChild(this, props, randomName(), async = true) + private[akka] def attachChild(props: Props, name: String): ActorRef = makeChild(this, props, checkName(name), async = true) + + @volatile private var _nextNameDoNotCallMeDirectly = 0L + final protected def randomName(): String = { + @tailrec def inc(): Long = { + val current = Unsafe.instance.getLongVolatile(this, AbstractActorCell.nextNameOffset) + if (Unsafe.instance.compareAndSwapLong(this, AbstractActorCell.nextNameOffset, current, current + 1)) current + else inc() + } + Helpers.base64(inc()) + } + + final def stop(actor: ActorRef): Unit = { + val started = actor match { + case r: RepointableRef ⇒ r.isStarted + case _ ⇒ true + } + if (childrenRefs.getByRef(actor).isDefined && started) shallDie(actor) + actor.asInstanceOf[InternalActorRef].stop() + } + + /* + * low level CAS helpers + */ + + @inline private def swapChildrenRefs(oldChildren: ChildrenContainer, newChildren: ChildrenContainer): Boolean = + Unsafe.instance.compareAndSwapObject(this, AbstractActorCell.childrenOffset, oldChildren, newChildren) + + @tailrec final protected def reserveChild(name: String): Boolean = { + val c = childrenRefs + swapChildrenRefs(c, c.reserve(name)) || reserveChild(name) + } + + @tailrec final protected def unreserveChild(name: String): Boolean = { + val c = childrenRefs + swapChildrenRefs(c, c.unreserve(name)) || unreserveChild(name) + } + + final protected def addChild(ref: ActorRef): Boolean = { + @tailrec def rec(): Boolean = { + val c = childrenRefs + swapChildrenRefs(c, c.add(ref)) || rec() + } + /* + * This does not need to check getByRef every tailcall, because the change + * cannot happen in that direction as a race: the only entity removing a + * child is the actor itself, and the only entity which could be racing is + * somebody who calls attachChild, and there we are guaranteed that that + * child cannot yet have died (since it has not yet been created). + */ + if (childrenRefs.getByRef(ref).isEmpty) rec() else false + } + + @tailrec final protected def shallDie(ref: ActorRef): Boolean = { + val c = childrenRefs + swapChildrenRefs(c, c.shallDie(ref)) || shallDie(ref) + } + + @tailrec final private def removeChild(ref: ActorRef): ChildrenContainer = { + val c = childrenRefs + val n = c.remove(ref) + if (swapChildrenRefs(c, n)) n + else removeChild(ref) + } + + @tailrec final protected def setChildrenTerminationReason(reason: ChildrenContainer.SuspendReason): Boolean = { + childrenRefs match { + case c: ChildrenContainer.TerminatingChildrenContainer ⇒ + swapChildrenRefs(c, c.copy(reason = reason)) || setChildrenTerminationReason(reason) + case _ ⇒ false + } + } + + final protected def setTerminated(): Unit = Unsafe.instance.putObjectVolatile(this, AbstractActorCell.childrenOffset, TerminatedChildrenContainer) + + /* + * ActorCell-internal API + */ + + protected def isNormal = childrenRefs.isNormal + + protected def isTerminating = childrenRefs.isTerminating + + protected def suspendChildren(skip: Set[ActorRef] = Set.empty): Unit = + childrenRefs.stats foreach { + case ChildRestartStats(child, _, _) if !(skip contains child) ⇒ child.asInstanceOf[InternalActorRef].suspend() + case _ ⇒ + } + + protected def resumeChildren(): Unit = + childrenRefs.stats foreach (_.child.asInstanceOf[InternalActorRef].resume(inResponseToFailure = false)) + + def getChildByName(name: String): Option[ChildRestartStats] = childrenRefs.getByName(name) + + protected def getChildByRef(ref: ActorRef): Option[ChildRestartStats] = childrenRefs.getByRef(ref) + + protected def getAllChildStats: Iterable[ChildRestartStats] = childrenRefs.stats + + protected def removeChildAndGetStateChange(child: ActorRef): Option[SuspendReason] = { + childrenRefs match { + case TerminatingChildrenContainer(_, _, reason) ⇒ + val newContainer = removeChild(child) + if (!newContainer.isInstanceOf[TerminatingChildrenContainer]) Some(reason) else None + case _ ⇒ + removeChild(child) + None + } + } + + /* + * Private helpers + */ + + private def checkName(name: String): String = { + name match { + case null ⇒ throw new InvalidActorNameException("actor name must not be null") + case "" ⇒ throw new InvalidActorNameException("actor name must not be empty") + case ElementRegex() ⇒ name + case _ ⇒ throw new InvalidActorNameException("illegal actor name '" + name + "', must conform to " + ElementRegex) + } + } + + private def makeChild(cell: ActorCell, props: Props, name: String, async: Boolean): ActorRef = { + if (cell.system.settings.SerializeAllCreators && !props.creator.isInstanceOf[NoSerializationVerificationNeeded]) { + val ser = SerializationExtension(cell.system) + ser.serialize(props.creator) match { + case Left(t) ⇒ throw t + case Right(bytes) ⇒ ser.deserialize(bytes, props.creator.getClass) match { + case Left(t) ⇒ throw t + case _ ⇒ //All good + } + } + } + /* + * in case we are currently terminating, fail external attachChild requests + * (internal calls cannot happen anyway because we are suspended) + */ + if (cell.childrenRefs.isTerminating) throw new IllegalStateException("cannot create children while terminating or terminated") + else { + reserveChild(name) + // this name will either be unreserved or overwritten with a real child below + val actor = + try { + cell.provider.actorOf(cell.systemImpl, props, cell.self, cell.self.path / name, + systemService = false, deploy = None, lookupDeploy = true, async = async) + } catch { + case NonFatal(e) ⇒ + unreserveChild(name) + throw e + } + addChild(actor) + actor + } + } + +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/cell/ChildrenContainer.scala b/akka-actor/src/main/scala/akka/actor/cell/ChildrenContainer.scala new file mode 100644 index 0000000000..98679862ba --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/cell/ChildrenContainer.scala @@ -0,0 +1,195 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.actor.cell + +import scala.collection.immutable.TreeMap + +import akka.actor.{ InvalidActorNameException, ChildStats, ChildRestartStats, ChildNameReserved, ActorRef } + +/** + * INTERNAL API + */ +private[akka] trait ChildrenContainer { + + def add(child: ActorRef): ChildrenContainer + def remove(child: ActorRef): ChildrenContainer + + def getByName(name: String): Option[ChildRestartStats] + def getByRef(actor: ActorRef): Option[ChildRestartStats] + + def children: Iterable[ActorRef] + def stats: Iterable[ChildRestartStats] + + def shallDie(actor: ActorRef): ChildrenContainer + + // reserve that name or throw an exception + def reserve(name: String): ChildrenContainer + // cancel a reservation + def unreserve(name: String): ChildrenContainer + + def isTerminating: Boolean = false + def isNormal: Boolean = true +} + +/** + * INTERNAL API + * + * This object holds the classes performing the logic of managing the children + * of an actor, hence they are intimately tied to ActorCell. + */ +private[akka] object ChildrenContainer { + + sealed trait SuspendReason + case object UserRequest extends SuspendReason + case class Recreation(cause: Throwable) extends SuspendReason + case object Termination extends SuspendReason + + trait EmptyChildrenContainer extends ChildrenContainer { + val emptyStats = TreeMap.empty[String, ChildStats] + override def add(child: ActorRef): ChildrenContainer = + new NormalChildrenContainer(emptyStats.updated(child.path.name, ChildRestartStats(child))) + override def remove(child: ActorRef): ChildrenContainer = this + override def getByName(name: String): Option[ChildRestartStats] = None + override def getByRef(actor: ActorRef): Option[ChildRestartStats] = None + override def children: Iterable[ActorRef] = Nil + override def stats: Iterable[ChildRestartStats] = Nil + override def shallDie(actor: ActorRef): ChildrenContainer = this + override def reserve(name: String): ChildrenContainer = new NormalChildrenContainer(emptyStats.updated(name, ChildNameReserved)) + override def unreserve(name: String): ChildrenContainer = this + } + + /** + * This is the empty container, shared among all leaf actors. + */ + object EmptyChildrenContainer extends EmptyChildrenContainer { + override def toString = "no children" + } + + /** + * This is the empty container which is installed after the last child has + * terminated while stopping; it is necessary to distinguish from the normal + * empty state while calling handleChildTerminated() for the last time. + */ + object TerminatedChildrenContainer extends EmptyChildrenContainer { + override def add(child: ActorRef): ChildrenContainer = this + override def reserve(name: String): ChildrenContainer = + throw new IllegalStateException("cannot reserve actor name '" + name + "': already terminated") + override def isTerminating: Boolean = true + override def isNormal: Boolean = false + override def toString = "terminated" + } + + /** + * Normal children container: we do have at least one child, but none of our + * children are currently terminating (which is the time period between + * calling context.stop(child) and processing the ChildTerminated() system + * message). + */ + class NormalChildrenContainer(val c: TreeMap[String, ChildStats]) extends ChildrenContainer { + + override def add(child: ActorRef): ChildrenContainer = + new NormalChildrenContainer(c.updated(child.path.name, ChildRestartStats(child))) + + override def remove(child: ActorRef): ChildrenContainer = NormalChildrenContainer(c - child.path.name) + + override def getByName(name: String): Option[ChildRestartStats] = c.get(name) match { + case s @ Some(_: ChildRestartStats) ⇒ s.asInstanceOf[Option[ChildRestartStats]] + case _ ⇒ None + } + + override def getByRef(actor: ActorRef): Option[ChildRestartStats] = c.get(actor.path.name) match { + case c @ Some(crs: ChildRestartStats) if (crs.child == actor) ⇒ c.asInstanceOf[Option[ChildRestartStats]] + case _ ⇒ None + } + + override def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _) ⇒ child } + + override def stats: Iterable[ChildRestartStats] = c.values.view.collect { case c: ChildRestartStats ⇒ c } + + override def shallDie(actor: ActorRef): ChildrenContainer = TerminatingChildrenContainer(c, Set(actor), UserRequest) + + override def reserve(name: String): ChildrenContainer = + if (c contains name) + throw new InvalidActorNameException("actor name " + name + " is not unique!") + else new NormalChildrenContainer(c.updated(name, ChildNameReserved)) + + override def unreserve(name: String): ChildrenContainer = c.get(name) match { + case Some(ChildNameReserved) ⇒ NormalChildrenContainer(c - name) + case _ ⇒ this + } + + override def toString = + if (c.size > 20) c.size + " children" + else c.mkString("children:\n ", "\n ", "") + } + + object NormalChildrenContainer { + def apply(c: TreeMap[String, ChildStats]): ChildrenContainer = + if (c.isEmpty) EmptyChildrenContainer + else new NormalChildrenContainer(c) + } + + /** + * Waiting state: there are outstanding termination requests (i.e. context.stop(child) + * was called but the corresponding ChildTerminated() system message has not yet been + * processed). There could be no specific reason (UserRequested), we could be Restarting + * or Terminating. + * + * Removing the last child which was supposed to be terminating will return a different + * type of container, depending on whether or not children are left and whether or not + * the reason was “Terminating”. + */ + case class TerminatingChildrenContainer(c: TreeMap[String, ChildStats], toDie: Set[ActorRef], reason: SuspendReason) + extends ChildrenContainer { + + override def add(child: ActorRef): ChildrenContainer = copy(c.updated(child.path.name, ChildRestartStats(child))) + + override def remove(child: ActorRef): ChildrenContainer = { + val t = toDie - child + if (t.isEmpty) reason match { + case Termination ⇒ TerminatedChildrenContainer + case _ ⇒ NormalChildrenContainer(c - child.path.name) + } + else copy(c - child.path.name, t) + } + + override def getByName(name: String): Option[ChildRestartStats] = c.get(name) match { + case s @ Some(_: ChildRestartStats) ⇒ s.asInstanceOf[Option[ChildRestartStats]] + case _ ⇒ None + } + + override def getByRef(actor: ActorRef): Option[ChildRestartStats] = c.get(actor.path.name) match { + case c @ Some(crs: ChildRestartStats) if (crs.child == actor) ⇒ c.asInstanceOf[Option[ChildRestartStats]] + case _ ⇒ None + } + + override def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _) ⇒ child } + + override def stats: Iterable[ChildRestartStats] = c.values.view.collect { case c: ChildRestartStats ⇒ c } + + override def shallDie(actor: ActorRef): ChildrenContainer = copy(toDie = toDie + actor) + + override def reserve(name: String): ChildrenContainer = reason match { + case Termination ⇒ throw new IllegalStateException("cannot reserve actor name '" + name + "': terminating") + case _ ⇒ + if (c contains name) + throw new InvalidActorNameException("actor name " + name + " is not unique!") + else copy(c = c.updated(name, ChildNameReserved)) + } + + override def unreserve(name: String): ChildrenContainer = c.get(name) match { + case Some(ChildNameReserved) ⇒ copy(c = c - name) + case _ ⇒ this + } + + override def isTerminating: Boolean = reason == Termination + override def isNormal: Boolean = reason == UserRequest + + override def toString = + if (c.size > 20) c.size + " children" + else c.mkString("children (" + toDie.size + " terminating):\n ", "\n ", "\n") + toDie + } + +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala new file mode 100644 index 0000000000..031019f3f6 --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/cell/DeathWatch.scala @@ -0,0 +1,95 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.actor.cell + +import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorCell, Actor } +import akka.dispatch.{ Watch, Unwatch } +import akka.event.Logging.{ Warning, Error, Debug } +import scala.util.control.NonFatal + +private[akka] trait DeathWatch { this: ActorCell ⇒ + + private var watching: Set[ActorRef] = ActorCell.emptyActorRefSet + private var watchedBy: Set[ActorRef] = ActorCell.emptyActorRefSet + + override final def watch(subject: ActorRef): ActorRef = subject match { + case a: InternalActorRef ⇒ + if (a != self && !watching.contains(a)) { + a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + watching += a + } + a + } + + override final def unwatch(subject: ActorRef): ActorRef = subject match { + case a: InternalActorRef ⇒ + if (a != self && watching.contains(a)) { + a.sendSystemMessage(Unwatch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + watching -= a + } + a + } + + protected def watchedActorTerminated(ref: ActorRef): Unit = watching -= ref + + protected def tellWatchersWeDied(actor: Actor): Unit = { + if (!watchedBy.isEmpty) { + val terminated = Terminated(self)(existenceConfirmed = true) + try { + watchedBy foreach { + watcher ⇒ + try watcher.tell(terminated, self) catch { + case NonFatal(t) ⇒ publish(Error(t, self.path.toString, clazz(actor), "deathwatch")) + } + } + } finally watchedBy = ActorCell.emptyActorRefSet + } + } + + protected def unwatchWatchedActors(actor: Actor): Unit = { + if (!watching.isEmpty) { + try { + watching foreach { // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + case watchee: InternalActorRef ⇒ try watchee.sendSystemMessage(Unwatch(watchee, self)) catch { + case NonFatal(t) ⇒ publish(Error(t, self.path.toString, clazz(actor), "deathwatch")) + } + } + } finally watching = ActorCell.emptyActorRefSet + } + } + + protected def addWatcher(watchee: ActorRef, watcher: ActorRef): Unit = { + val watcheeSelf = watchee == self + val watcherSelf = watcher == self + + if (watcheeSelf && !watcherSelf) { + if (!watchedBy.contains(watcher)) { + watchedBy += watcher + if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now monitoring " + watcher)) + } + } else if (!watcheeSelf && watcherSelf) { + watch(watchee) + } else { + publish(Warning(self.path.toString, clazz(actor), "BUG: illegal Watch(%s,%s) for %s".format(watchee, watcher, self))) + } + } + + protected def remWatcher(watchee: ActorRef, watcher: ActorRef): Unit = { + val watcheeSelf = watchee == self + val watcherSelf = watcher == self + + if (watcheeSelf && !watcherSelf) { + if (watchedBy.contains(watcher)) { + watchedBy -= watcher + if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "stopped monitoring " + watcher)) + } + } else if (!watcheeSelf && watcherSelf) { + unwatch(watchee) + } else { + publish(Warning(self.path.toString, clazz(actor), "BUG: illegal Unwatch(%s,%s) for %s".format(watchee, watcher, self))) + } + } + +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala b/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala new file mode 100644 index 0000000000..8c849366d8 --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/cell/Dispatch.scala @@ -0,0 +1,78 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.actor.cell + +import scala.annotation.tailrec + +import akka.actor.{ ActorRef, ActorCell } +import akka.dispatch.{ Terminate, SystemMessage, Suspend, Resume, Recreate, MessageDispatcher, Mailbox, Envelope, Create } +import akka.util.Unsafe + +private[akka] trait Dispatch { this: ActorCell ⇒ + + @volatile private var _mailboxDoNotCallMeDirectly: Mailbox = _ //This must be volatile since it isn't protected by the mailbox status + + @inline final def mailbox: Mailbox = Unsafe.instance.getObjectVolatile(this, AbstractActorCell.mailboxOffset).asInstanceOf[Mailbox] + + @tailrec final def swapMailbox(newMailbox: Mailbox): Mailbox = { + val oldMailbox = mailbox + if (!Unsafe.instance.compareAndSwapObject(this, AbstractActorCell.mailboxOffset, oldMailbox, newMailbox)) swapMailbox(newMailbox) + else oldMailbox + } + + final def hasMessages: Boolean = mailbox.hasMessages + + final def numberOfMessages: Int = mailbox.numberOfMessages + + val dispatcher: MessageDispatcher = system.dispatchers.lookup(props.dispatcher) + + /** + * UntypedActorContext impl + */ + final def getDispatcher(): MessageDispatcher = dispatcher + + final def isTerminated: Boolean = mailbox.isClosed + + final def start(sendSupervise: Boolean): this.type = { + + /* + * Create the mailbox and enqueue the Create() message to ensure that + * this is processed before anything else. + */ + swapMailbox(dispatcher.createMailbox(this)) + mailbox.setActor(this) + + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + mailbox.systemEnqueue(self, Create()) + + if (sendSupervise) { + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + parent.sendSystemMessage(akka.dispatch.Supervise(self)) + } + + // This call is expected to start off the actor by scheduling its mailbox. + dispatcher.attach(this) + + this + } + + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + final def suspend(): Unit = dispatcher.systemDispatch(this, Suspend()) + + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + final def resume(inResponseToFailure: Boolean): Unit = dispatcher.systemDispatch(this, Resume(inResponseToFailure)) + + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + final def restart(cause: Throwable): Unit = dispatcher.systemDispatch(this, Recreate(cause)) + + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + final def stop(): Unit = dispatcher.systemDispatch(this, Terminate()) + + def tell(message: Any, sender: ActorRef): Unit = + dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender, system)) + + override def sendSystemMessage(message: SystemMessage): Unit = dispatcher.systemDispatch(this, message) + +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala new file mode 100644 index 0000000000..32786b7bb7 --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala @@ -0,0 +1,209 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.actor.cell + +import scala.annotation.tailrec + +import akka.actor.{ PreRestartException, PostRestartException, InternalActorRef, Failed, ActorRef, ActorInterruptedException, ActorCell, Actor } +import akka.dispatch.{ Envelope, ChildTerminated } +import akka.event.Logging.{ Warning, Error, Debug } +import scala.util.control.NonFatal + +private[akka] trait FaultHandling { this: ActorCell ⇒ + + /* ================= + * T H E R U L E S + * ================= + * + * Actors can be suspended for two reasons: + * - they fail + * - their supervisor gets suspended + * + * In particular they are not suspended multiple times because of cascading + * own failures, i.e. while currentlyFailed() they do not fail again. In case + * of a restart, failures in constructor/preStart count as new failures. + */ + + private def suspendNonRecursive(): Unit = dispatcher suspend this + + private def resumeNonRecursive(): Unit = dispatcher resume this + + /* + * have we told our supervisor that we Failed() and have not yet heard back? + * (actually: we might have heard back but not yet acted upon it, in case of + * a restart with dying children) + * might well be replaced by ref to a Cancellable in the future (see #2299) + */ + private var _failed = false + private def isFailed: Boolean = _failed + private def setFailed(): Unit = _failed = true + private def clearFailed(): Unit = _failed = false + + /** + * Do re-create the actor in response to a failure. + */ + protected def faultRecreate(cause: Throwable): Unit = + if (isNormal) { + val failedActor = actor + if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(failedActor), "restarting")) + if (failedActor ne null) { + try { + // if the actor fails in preRestart, we can do nothing but log it: it’s best-effort + if (failedActor.context ne null) failedActor.preRestart(cause, Option(currentMessage)) + } catch { + case NonFatal(e) ⇒ + val ex = new PreRestartException(self, e, cause, Option(currentMessage)) + publish(Error(ex, self.path.toString, clazz(failedActor), e.getMessage)) + } finally { + clearActorFields(failedActor) + } + } + assert(mailbox.isSuspended, "mailbox must be suspended during restart, status=" + mailbox.status) + if (!setChildrenTerminationReason(ChildrenContainer.Recreation(cause))) finishRecreate(cause, failedActor) + } else { + // need to keep that suspend counter balanced + faultResume(inResponseToFailure = false) + } + + /** + * Do suspend the actor in response to a failure of a parent (i.e. the + * “recursive suspend” feature). + */ + protected def faultSuspend(): Unit = { + // done always to keep that suspend counter balanced + suspendNonRecursive() + suspendChildren() + } + + /** + * Do resume the actor in response to a failure. + * + * @param inResponseToFailure signifies if it was our own failure which + * prompted this action. + */ + protected def faultResume(inResponseToFailure: Boolean): Unit = { + // done always to keep that suspend counter balanced + // must happen “atomically” + try resumeNonRecursive() + finally if (inResponseToFailure) clearFailed() + resumeChildren() + } + + protected def terminate() { + setReceiveTimeout(None) + cancelReceiveTimeout + + // stop all children, which will turn childrenRefs into TerminatingChildrenContainer (if there are children) + children foreach stop + + val wasTerminating = isTerminating + + if (setChildrenTerminationReason(ChildrenContainer.Termination)) { + if (!wasTerminating) { + // do not process normal messages while waiting for all children to terminate + suspendNonRecursive() + // do not propagate failures during shutdown to the supervisor + setFailed() + if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "stopping")) + } + } else { + setTerminated() + finishTerminate() + } + } + + final def handleInvokeFailure(t: Throwable, message: String): Unit = { + publish(Error(t, self.path.toString, clazz(actor), message)) + // prevent any further messages to be processed until the actor has been restarted + if (!isFailed) try { + suspendNonRecursive() + setFailed() + // suspend children + val skip: Set[ActorRef] = currentMessage match { + case Envelope(Failed(`t`), child) ⇒ Set(child) + case _ ⇒ Set.empty + } + suspendChildren(skip) + // tell supervisor + t match { // Wrap InterruptedExceptions and rethrow + case _: InterruptedException ⇒ parent.tell(Failed(new ActorInterruptedException(t)), self); throw t + case _ ⇒ parent.tell(Failed(t), self) + } + } catch { + case NonFatal(e) ⇒ + publish(Error(e, self.path.toString, clazz(actor), "emergency stop: exception in failure handling")) + try children foreach stop + finally finishTerminate() + } + } + + private def finishTerminate() { + val a = actor + try if (a ne null) a.postStop() + finally try dispatcher.detach(this) + finally try parent.sendSystemMessage(ChildTerminated(self)) + finally try tellWatchersWeDied(a) + finally try unwatchWatchedActors(a) + finally { + if (system.settings.DebugLifecycle) + publish(Debug(self.path.toString, clazz(a), "stopped")) + clearActorFields(a) + actor = null + } + } + + private def finishRecreate(cause: Throwable, failedActor: Actor): Unit = try { + try resumeNonRecursive() + finally clearFailed() // must happen in any case, so that failure is propagated + + // need to keep a snapshot of the surviving children before the new actor instance creates new ones + val survivors = children + + val freshActor = newActor() + actor = freshActor // this must happen before postRestart has a chance to fail + if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields. + + freshActor.postRestart(cause) + if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(freshActor), "restarted")) + + // only after parent is up and running again do restart the children which were not stopped + survivors foreach (child ⇒ + try child.asInstanceOf[InternalActorRef].restart(cause) + catch { + case NonFatal(e) ⇒ publish(Error(e, self.path.toString, clazz(freshActor), "restarting " + child)) + }) + } catch { + case NonFatal(e) ⇒ + clearActorFields(actor) // in order to prevent preRestart() from happening again + handleInvokeFailure(new PostRestartException(self, e, cause), e.getMessage) + } + + final protected def handleFailure(child: ActorRef, cause: Throwable): Unit = getChildByRef(child) match { + case Some(stats) ⇒ if (!actor.supervisorStrategy.handleFailure(this, child, cause, stats, getAllChildStats)) throw cause + case None ⇒ publish(Warning(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child)) + } + + final protected def handleChildTerminated(child: ActorRef): Unit = { + val status = removeChildAndGetStateChange(child) + /* + * if this fails, we do nothing in case of terminating/restarting state, + * otherwise tell the supervisor etc. (in that second case, the match + * below will hit the empty default case, too) + */ + try actor.supervisorStrategy.handleChildTerminated(this, child, children) + catch { + case NonFatal(e) ⇒ handleInvokeFailure(e, "handleChildTerminated failed") + } + /* + * if the removal changed the state of the (terminating) children container, + * then we are continuing the previously suspended recreate/terminate action + */ + status match { + case Some(ChildrenContainer.Recreation(cause)) ⇒ finishRecreate(cause, actor) + case Some(ChildrenContainer.Termination) ⇒ finishTerminate() + case _ ⇒ + } + } +} diff --git a/akka-actor/src/main/scala/akka/actor/cell/ReceiveTimeout.scala b/akka-actor/src/main/scala/akka/actor/cell/ReceiveTimeout.scala new file mode 100644 index 0000000000..4fd46413ce --- /dev/null +++ b/akka-actor/src/main/scala/akka/actor/cell/ReceiveTimeout.scala @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ + +package akka.actor.cell + +import ReceiveTimeout.emptyReceiveTimeoutData +import akka.actor.ActorCell +import akka.actor.ActorCell.emptyCancellable +import akka.actor.Cancellable +import scala.concurrent.util.Duration + +private[akka] object ReceiveTimeout { + final val emptyReceiveTimeoutData: (Duration, Cancellable) = (Duration.Undefined, ActorCell.emptyCancellable) +} + +private[akka] trait ReceiveTimeout { this: ActorCell ⇒ + + import ReceiveTimeout._ + import ActorCell._ + + private var receiveTimeoutData: (Duration, Cancellable) = emptyReceiveTimeoutData + + final def receiveTimeout: Option[Duration] = receiveTimeoutData._1 match { + case Duration.Undefined ⇒ None + case duration ⇒ Some(duration) + } + + final def setReceiveTimeout(timeout: Option[Duration]): Unit = setReceiveTimeout(timeout.getOrElse(Duration.Undefined)) + + final def setReceiveTimeout(timeout: Duration): Unit = + receiveTimeoutData = ( + if (Duration.Undefined == timeout || timeout.toMillis < 1) Duration.Undefined else timeout, + receiveTimeoutData._2) + + final def resetReceiveTimeout(): Unit = setReceiveTimeout(None) + + final def checkReceiveTimeout() { + val recvtimeout = receiveTimeoutData + if (Duration.Undefined != recvtimeout._1 && !mailbox.hasMessages) { + recvtimeout._2.cancel() //Cancel any ongoing future + //Only reschedule if desired and there are currently no more messages to be processed + receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(recvtimeout._1, self, akka.actor.ReceiveTimeout)) + } else cancelReceiveTimeout() + + } + + final def cancelReceiveTimeout(): Unit = + if (receiveTimeoutData._2 ne emptyCancellable) { + receiveTimeoutData._2.cancel() + receiveTimeoutData = (receiveTimeoutData._1, emptyCancellable) + } + +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 9a3aacba4b..fff56a3776 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -90,7 +90,7 @@ private[akka] case class Suspend() extends SystemMessage // sent to self from Ac /** * INTERNAL API */ -private[akka] case class Resume() extends SystemMessage // sent to self from ActorCell.resume +private[akka] case class Resume(inResponseToFailure: Boolean) extends SystemMessage // sent to self from ActorCell.resume /** * INTERNAL API */ @@ -306,7 +306,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext def suspend(actor: ActorCell): Unit = { val mbox = actor.mailbox if ((mbox.actor eq actor) && (mbox.dispatcher eq this)) - mbox.becomeSuspended() + mbox.suspend() } /* @@ -314,7 +314,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext */ def resume(actor: ActorCell): Unit = { val mbox = actor.mailbox - if ((mbox.actor eq actor) && (mbox.dispatcher eq this) && mbox.becomeOpen()) + if ((mbox.actor eq actor) && (mbox.dispatcher eq this) && mbox.resume()) registerForExecution(mbox, false, false) } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 1affa34a07..baf7b682c2 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -25,12 +25,16 @@ private[akka] object Mailbox { * the following assigned numbers CANNOT be changed without looking at the code which uses them! */ - // primary status: only first three + // primary status final val Open = 0 // _status is not initialized in AbstractMailbox, so default must be zero! Deliberately without type ascription to make it a compile-time constant - final val Suspended = 1 // Deliberately without type ascription to make it a compile-time constant - final val Closed = 2 // Deliberately without type ascription to make it a compile-time constant + final val Closed = 1 // Deliberately without type ascription to make it a compile-time constant // secondary status: Scheduled bit may be added to Open/Suspended - final val Scheduled = 4 // Deliberately without type ascription to make it a compile-time constant + final val Scheduled = 2 // Deliberately without type ascription to make it a compile-time constant + // shifted by 2: the suspend count! + final val shouldScheduleMask = 3 + final val shouldNotProcessMask = ~2 + final val suspendMask = ~3 + final val suspendUnit = 4 // mailbox debugging helper using println (see below) // since this is a compile-time constant, scalac will elide code behind if (Mailbox.debug) (RK checked with 2.9.1) @@ -101,10 +105,10 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) final def status: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset) @inline - final def shouldProcessMessage: Boolean = (status & 3) == Open + final def shouldProcessMessage: Boolean = (status & shouldNotProcessMask) == 0 @inline - final def isSuspended: Boolean = (status & 3) == Suspended + final def isSuspended: Boolean = (status & suspendMask) != 0 @inline final def isClosed: Boolean = status == Closed @@ -121,23 +125,32 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) Unsafe.instance.putIntVolatile(this, AbstractMailbox.mailboxStatusOffset, newStatus) /** - * set new primary status Open. Caller does not need to worry about whether + * Reduce the suspend count by one. Caller does not need to worry about whether * status was Scheduled or not. + * + * @returns true if the suspend count reached zero */ @tailrec - final def becomeOpen(): Boolean = status match { + final def resume(): Boolean = status match { case Closed ⇒ setStatus(Closed); false - case s ⇒ updateStatus(s, Open | s & Scheduled) || becomeOpen() + case s ⇒ + val next = if (s < suspendUnit) s else s - suspendUnit + if (updateStatus(s, next)) next < suspendUnit + else resume() } /** - * set new primary status Suspended. Caller does not need to worry about whether + * Increment the suspend count by one. Caller does not need to worry about whether * status was Scheduled or not. + * + * @returns true if the previous suspend count was zero */ @tailrec - final def becomeSuspended(): Boolean = status match { + final def suspend(): Boolean = status match { case Closed ⇒ setStatus(Closed); false - case s ⇒ updateStatus(s, Suspended | s & Scheduled) || becomeSuspended() + case s ⇒ + if (updateStatus(s, s + suspendUnit)) s < suspendUnit + else suspend() } /** @@ -158,11 +171,10 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) val s = status /* * only try to add Scheduled bit if pure Open/Suspended, not Closed or with - * Scheduled bit already set (this is one of the reasons why the numbers - * cannot be changed in object Mailbox above) + * Scheduled bit already set */ - if (s <= Suspended) updateStatus(s, s | Scheduled) || setAsScheduled() - else false + if ((s & shouldScheduleMask) != Open) false + else updateStatus(s, s | Scheduled) || setAsScheduled() } /** @@ -171,12 +183,6 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) @tailrec final def setAsIdle(): Boolean = { val s = status - /* - * only try to remove Scheduled bit if currently Scheduled, not Closed or - * without Scheduled bit set (this is one of the reasons why the numbers - * cannot be changed in object Mailbox above) - */ - updateStatus(s, s & ~Scheduled) || setAsIdle() } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 4667da129a..cca90d6543 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -73,7 +73,7 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo if (routerConfig.resizer.isEmpty && _routees.isEmpty) throw new ActorInitializationException("router " + routerConfig + " did not register routees!") - start() + start(sendSupervise = false) /* * end of construction diff --git a/akka-agent/src/main/scala/akka/agent/Agent.scala b/akka-agent/src/main/scala/akka/agent/Agent.scala index cb2150d1c5..bd77c1d9ff 100644 --- a/akka-agent/src/main/scala/akka/agent/Agent.scala +++ b/akka-agent/src/main/scala/akka/agent/Agent.scala @@ -214,7 +214,7 @@ class Agent[T](initialValue: T, system: ActorSystem) { /** * Resumes processing of `send` actions for the agent. */ - def resume(): Unit = updater.resume() + def resume(): Unit = updater.resume(inResponseToFailure = false) /** * Closes the agents and makes it eligible for garbage collection. diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index eb8fb7a204..cff9dda274 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -235,7 +235,7 @@ private[akka] class RemoteActorRef private[akka] ( def suspend(): Unit = sendSystemMessage(Suspend()) - def resume(): Unit = sendSystemMessage(Resume()) + def resume(inResponseToFailure: Boolean): Unit = sendSystemMessage(Resume(inResponseToFailure)) def stop(): Unit = sendSystemMessage(Terminate()) diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index c0143498e3..26efaef84d 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -154,7 +154,7 @@ class CallingThreadDispatcher( override def suspend(actor: ActorCell) { actor.mailbox match { - case m: CallingThreadMailbox ⇒ m.suspendSwitch.switchOn + case m: CallingThreadMailbox ⇒ m.suspendSwitch.switchOn; m.suspend() case m ⇒ m.systemEnqueue(actor.self, Suspend()) } } @@ -166,11 +166,12 @@ class CallingThreadDispatcher( val wasActive = queue.isActive val switched = mbox.suspendSwitch.switchOff { CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(mbox, queue) + mbox.resume() } if (switched && !wasActive) { runQueue(mbox, queue) } - case m ⇒ m.systemEnqueue(actor.self, Resume()) + case m ⇒ m.systemEnqueue(actor.self, Resume(false)) } } diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index 215c4006e9..67478b35e3 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -15,6 +15,7 @@ import com.typesafe.config.{ Config, ConfigFactory } import java.util.concurrent.TimeoutException import akka.dispatch.{ MessageDispatcher, Dispatchers } import akka.pattern.ask +import akka.actor.ActorSystemImpl object TimingTest extends Tag("timing") object LongRunningTest extends Tag("long-running") @@ -76,7 +77,9 @@ abstract class AkkaSpec(_system: ActorSystem) beforeShutdown() system.shutdown() try system.awaitTermination(5 seconds) catch { - case _: TimeoutException ⇒ system.log.warning("Failed to stop [{}] within 5 seconds", system.name) + case _: TimeoutException ⇒ + system.log.warning("Failed to stop [{}] within 5 seconds", system.name) + println(system.asInstanceOf[ActorSystemImpl].printTree) } atTermination() }