diff --git a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala index 545901d3f3..159d611c28 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/RestartStrategySpec.scala @@ -12,13 +12,13 @@ import java.util.concurrent.{ TimeUnit, CountDownLatch } import org.multiverse.api.latches.StandardLatch import akka.testkit.AkkaSpec -class RestartStrategySpec extends AkkaSpec with BeforeAndAfterAll { +class RestartStrategySpec extends AkkaSpec { - override def beforeAll() { + override def atStartup() { app.eventHandler.notify(Mute(EventFilter[Exception]("Crashing..."))) } - override def afterAll() { + override def atTermination() { app.eventHandler.notify(UnMuteAll) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index b802583340..24c104debe 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -63,7 +63,7 @@ object SupervisorSpec { } } -class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfterAll { +class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach { import SupervisorSpec._ @@ -118,13 +118,13 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte (pingpong1, pingpong2, pingpong3, topSupervisor) } - override def beforeAll() = { + override def atStartup() = { app.eventHandler notify Mute(EventFilter[Exception]("Die"), EventFilter[IllegalStateException]("Don't wanna!"), EventFilter[RuntimeException]("Expected")) } - override def afterAll() = { + override def atTermination() = { app.eventHandler notify UnMuteAll } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala index 85cbda16a2..897ddc06b0 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorTreeSpec.scala @@ -20,7 +20,12 @@ class SupervisorTreeSpec extends AkkaSpec with ImplicitSender { "be able to kill the middle actor and see itself and its child restarted" in { filterException[ActorKilledException] { within(5 seconds) { - val p = Props[Supervisor].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 3, 1000)) + val p = Props(new Actor { + def receive = { + case p: Props ⇒ this reply context.actorOf(p) + } + override def preRestart(cause: Throwable, msg: Option[Any]) { testActor ! self.address } + }).withFaultHandler(OneForOneStrategy(List(classOf[Exception]), 3, 1000)) val headActor = actorOf(p) val middleActor = (headActor ? p).as[ActorRef].get val lastActor = (middleActor ? p).as[ActorRef].get diff --git a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala index 6620aeebf3..b1f56963b5 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/Ticket669Spec.scala @@ -13,7 +13,10 @@ import akka.testkit.ImplicitSender class Ticket669Spec extends AkkaSpec with BeforeAndAfterAll with ImplicitSender { import Ticket669Spec._ - override def beforeAll = Thread.interrupted() //remove interrupted status. + // TODO: does this really make sense? + override def atStartup() { + Thread.interrupted() //remove interrupted status. + } "A supervised actor with lifecycle PERMANENT" should { "be able to reply on failure during preRestart" in { diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index b8a672aaf1..c9efc0a0fd 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -14,6 +14,7 @@ import org.junit.{ After, Test } import akka.actor._ import util.control.NoStackTrace import akka.AkkaApplication +import akka.util.duration._ object ActorModelSpec { @@ -346,20 +347,6 @@ abstract class ActorModelSpec extends AkkaSpec { assertRefDefaultZero(b)(registers = 1, unregisters = 1, msgsReceived = 1, msgsProcessed = 1) } - "suspend and resume a failing non supervised permanent actor" in { - filterEvents(EventFilter[Exception]("Restart")) { - implicit val dispatcher = newInterceptedDispatcher - val a = newTestActor(dispatcher) - val done = new CountDownLatch(1) - a ! Restart - a ! CountDown(done) - assertCountDown(done, Testing.testTime(3000), "Should be suspended+resumed and done with next message within 3 seconds") - a.stop() - assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 2, - msgsProcessed = 2, suspensions = 1, resumes = 1) - } - } - "not process messages for a suspended actor" in { implicit val dispatcher = newInterceptedDispatcher val a = newTestActor(dispatcher).asInstanceOf[LocalActorRef] @@ -380,13 +367,15 @@ abstract class ActorModelSpec extends AkkaSpec { } "handle waves of actors" in { - implicit val dispatcher = newInterceptedDispatcher + val dispatcher = newInterceptedDispatcher + val props = Props[DispatcherActor].withDispatcher(dispatcher) def flood(num: Int) { val cachedMessage = CountDownNStop(new CountDownLatch(num)) - (1 to num) foreach { _ ⇒ - newTestActor(dispatcher) ! cachedMessage - } + val boss = actorOf(Props(context ⇒ { + case "run" ⇒ + for (_ ← 1 to num) context.actorOf(props) ! cachedMessage + })) ! "run" try { assertCountDown(cachedMessage.latch, Testing.testTime(10000), "Should process " + num + " countdowns") } catch { @@ -421,8 +410,9 @@ abstract class ActorModelSpec extends AkkaSpec { } "continue to process messages when a thread gets interrupted" in { - filterEvents(EventFilter[InterruptedException]("Ping!"), EventFilter[akka.event.EventHandler.EventHandlerException]) { + filterEvents(EventFilter[InterruptedException], EventFilter[akka.event.EventHandler.EventHandlerException]) { implicit val dispatcher = newInterceptedDispatcher + implicit val timeout = Timeout(5 seconds) val a = newTestActor(dispatcher) val f1 = a ? Reply("foo") val f2 = a ? Reply("bar") @@ -433,11 +423,11 @@ abstract class ActorModelSpec extends AkkaSpec { assert(f1.get === "foo") assert(f2.get === "bar") - assert((intercept[InterruptedException] { + assert((intercept[ActorInterruptedException] { f3.get }).getMessage === "Ping!") assert(f4.get === "foo2") - assert((intercept[InterruptedException] { + assert((intercept[ActorInterruptedException] { f5.get }).getMessage === "Ping!") assert(f6.get === "bar2") diff --git a/akka-actor/src/main/scala/akka/AkkaApplication.scala b/akka-actor/src/main/scala/akka/AkkaApplication.scala index 7a35872c90..129e9a8aac 100644 --- a/akka-actor/src/main/scala/akka/AkkaApplication.scala +++ b/akka-actor/src/main/scala/akka/AkkaApplication.scala @@ -7,14 +7,14 @@ import akka.config._ import akka.actor._ import java.net.InetAddress import com.eaio.uuid.UUID -import dispatch.{ Dispatcher, Dispatchers } +import akka.dispatch.{ Dispatcher, Dispatchers, Future, DefaultPromise } import akka.util.Duration -import util.ReflectiveAccess +import akka.util.ReflectiveAccess import java.util.concurrent.TimeUnit import akka.dispatch.BoundedMailbox import akka.dispatch.UnboundedMailbox import akka.routing.Routing -import remote.RemoteSupport +import akka.remote.RemoteSupport import akka.serialization.Serialization import akka.event.EventHandler import akka.event.EventHandlerLogging @@ -71,6 +71,10 @@ object AkkaApplication { def apply(): AkkaApplication = new AkkaApplication() + sealed trait ExitStatus + case object Stopped extends ExitStatus + case class Failed(cause: Throwable) extends ExitStatus + } class AkkaApplication(val name: String, val config: Configuration) extends ActorRefFactory { @@ -160,9 +164,12 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor implicit val dispatcher = dispatcherFactory.defaultGlobalDispatcher + def terminationFuture: Future[ExitStatus] = provider.terminationFuture + + // TODO think about memory consistency effects when doing funky stuff inside constructor val reflective = new ReflectiveAccess(this) - // TODO think about memory consistency effects when doing funky stuff inside an ActorRefProvider's constructor + // TODO think about memory consistency effects when doing funky stuff inside constructor val provider: ActorRefProvider = reflective.createProvider // TODO make this configurable @@ -178,18 +185,29 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor true) } + // TODO think about memory consistency effects when doing funky stuff inside constructor val eventHandler = new EventHandler(this) + // TODO think about memory consistency effects when doing funky stuff inside constructor val log: Logging = new EventHandlerLogging(eventHandler, this) - // TODO think about memory consistency effects when doing funky stuff inside an ActorRefProvider's constructor + // TODO think about memory consistency effects when doing funky stuff inside constructor val deployer = new Deployer(this) val deathWatch = provider.createDeathWatch() + // TODO think about memory consistency effects when doing funky stuff inside constructor val typedActor = new TypedActor(this) + // TODO think about memory consistency effects when doing funky stuff inside constructor val serialization = new Serialization(this) val scheduler = new DefaultScheduler + terminationFuture.onComplete(_ ⇒ scheduler.shutdown()) + + // TODO shutdown all that other stuff, whatever that may be + def stop(): Unit = { + guardian.stop() + } + } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index d80d1285c7..d3ef4a04c9 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -99,6 +99,9 @@ case class DeathPactException private[akka] (dead: ActorRef, cause: Throwable) extends AkkaException("monitored actor " + dead + " terminated", cause) with NoStackTrace +// must not pass InterruptedException to other threads +case class ActorInterruptedException private[akka] (cause: Throwable) extends AkkaException(cause.getMessage, cause) with NoStackTrace + /** * This message is thrown by default when an Actors behavior doesn't match a message */ diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 3afde89403..586f2c9d81 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -7,7 +7,7 @@ package akka.actor import akka.dispatch._ import akka.util._ import scala.annotation.tailrec -import scala.collection.immutable.Stack +import scala.collection.immutable.{ Stack, TreeMap } import scala.collection.JavaConverters import java.util.concurrent.{ ScheduledFuture, TimeUnit } import java.util.{ Collection ⇒ JCollection, Collections ⇒ JCollections } @@ -76,7 +76,7 @@ private[akka] class ActorCell( var futureTimeout: Option[ScheduledFuture[AnyRef]] = None //FIXME TODO Doesn't need to be volatile either, since it will only ever be accessed when a message is processed - var _children: Vector[ChildRestartStats] = Vector.empty + var _children = TreeMap[ActorRef, ChildRestartStats]() var currentMessage: Envelope = null @@ -122,7 +122,7 @@ private[akka] class ActorCell( subject } - def children: Iterable[ActorRef] = _children.map(_.child) + def children: Iterable[ActorRef] = _children.keys def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = dispatcher dispatch Envelope(this, message, channel) @@ -211,7 +211,7 @@ private[akka] class ActorCell( dispatcher.resume(this) //FIXME should this be moved down? - props.faultHandler.handleSupervisorRestarted(cause, self, _children) + props.faultHandler.handleSupervisorRestarted(cause, self, children) } catch { case e ⇒ try { app.eventHandler.error(e, self, "error while creating actor") @@ -239,14 +239,15 @@ private[akka] class ActorCell( if (a ne null) a.postStop() } finally { //Stop supervised actors - val links = _children - if (links.nonEmpty) { - _children = Vector.empty - links.foreach(_.child.stop()) + val c = children + if (c.nonEmpty) { + _children = TreeMap.empty + for (child ← c) child.stop() } } } finally { try { + // when changing this, remember to update the match in the BubbleWalker val cause = new ActorKilledException("Stopped") //FIXME TODO make this an object, can be reused everywhere supervisor ! ChildTerminated(self, cause) app.deathWatch.publish(Terminated(self, cause)) @@ -259,8 +260,8 @@ private[akka] class ActorCell( def supervise(child: ActorRef): Unit = { val links = _children - if (!links.exists(_.child == child)) { - _children = links :+ ChildRestartStats(child) + if (!links.contains(child)) { + _children = _children.updated(child, ChildRestartStats()) if (app.AkkaConfig.DebugLifecycle) app.eventHandler.debug(self, "now supervising " + child) } else app.eventHandler.warning(self, "Already supervising " + child) } @@ -309,12 +310,18 @@ private[akka] class ActorCell( // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) - channel.sendException(e) - - props.faultHandler.handleSupervisorFailing(self, _children) - supervisor ! Failed(self, e) - - if (e.isInstanceOf[InterruptedException]) throw e //Re-throw InterruptedExceptions as expected + // make sure that InterruptedException does not leave this thread + if (e.isInstanceOf[InterruptedException]) { + val ex = ActorInterruptedException(e) + channel.sendException(ex) + props.faultHandler.handleSupervisorFailing(self, children) + supervisor ! Failed(self, ex) + throw e //Re-throw InterruptedExceptions as expected + } else { + channel.sendException(e) + props.faultHandler.handleSupervisorFailing(self, children) + supervisor ! Failed(self, e) + } } finally { checkReceiveTimeout // Reschedule receive timeout } @@ -330,11 +337,15 @@ private[akka] class ActorCell( } } - def handleFailure(fail: Failed): Unit = if (!props.faultHandler.handleFailure(fail, _children)) { - throw fail.cause + def handleFailure(fail: Failed): Unit = _children.get(fail.actor) match { + case Some(stats) ⇒ if (!props.faultHandler.handleFailure(fail, stats, _children)) throw fail.cause + case None ⇒ app.eventHandler.warning(self, "dropping " + fail + " from unknown child") } - def handleChildTerminated(child: ActorRef): Unit = _children = props.faultHandler.handleChildTerminated(child, _children) + def handleChildTerminated(child: ActorRef): Unit = { + _children -= child + props.faultHandler.handleChildTerminated(child, children) + } // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ def restart(cause: Throwable): Unit = dispatcher.systemDispatch(this, Recreate(cause)) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 50d9c243f5..4b5c0ebf32 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -34,6 +34,8 @@ trait ActorRefProvider { private[akka] def createDeathWatch(): DeathWatch private[akka] def theOneWhoWalksTheBubblesOfSpaceTime: ActorRef + + private[akka] def terminationFuture: Future[AkkaApplication.ExitStatus] } /** @@ -85,6 +87,8 @@ class ActorRefProviderException(message: String) extends AkkaException(message) */ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { + val terminationFuture = new DefaultPromise[AkkaApplication.ExitStatus](Timeout.never)(app.dispatcher) + /** * Top-level anchor for the supervision hierarchy of this actor system. Will * receive only Supervise/ChildTerminated system messages or Failure message. @@ -96,9 +100,12 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider { protected[akka] override def postMessageToMailbox(msg: Any, channel: UntypedChannel) { msg match { - case Failed(child, ex) ⇒ child.stop() - case ChildTerminated(child, ex) ⇒ // TODO execute any installed termination handlers - case _ ⇒ app.eventHandler.error(this, this + " received unexpected message " + msg) + case Failed(child, ex) ⇒ child.stop() + case ChildTerminated(child, ex) ⇒ ex match { + case a: ActorKilledException if a.getMessage == "Stopped" ⇒ terminationFuture.completeWithResult(AkkaApplication.Stopped) + case x ⇒ terminationFuture.completeWithResult(AkkaApplication.Failed(x)) + } + case _ ⇒ app.eventHandler.error(this, this + " received unexpected message " + msg) } } diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index d654539e5d..873a9f1451 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -9,7 +9,7 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConversions._ import java.lang.{ Iterable ⇒ JIterable } -case class ChildRestartStats(val child: ActorRef, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L) { +case class ChildRestartStats(var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L) { def requestRestartPermission(retriesWindow: (Option[Int], Option[Int])): Boolean = retriesWindow match { @@ -51,8 +51,8 @@ object FaultHandlingStrategy { case object Stop extends Action case object Escalate extends Action - type Decider = PartialFunction[Class[_ <: Throwable], Action] - type JDecider = akka.japi.Function[Class[_ <: Throwable], Action] + type Decider = PartialFunction[Throwable, Action] + type JDecider = akka.japi.Function[Throwable, Action] type CauseAction = (Class[_ <: Throwable], Action) /** @@ -60,14 +60,14 @@ object FaultHandlingStrategy { * the given Throwables matches the cause and restarts, otherwise escalates. */ def makeDecider(trapExit: Array[Class[_ <: Throwable]]): Decider = - { case x ⇒ if (trapExit exists (_ isAssignableFrom x)) Restart else Escalate } + { case x ⇒ if (trapExit exists (_ isInstance x)) Restart else Escalate } /** * Backwards compatible Decider builder which just checks whether one of * the given Throwables matches the cause and restarts, otherwise escalates. */ def makeDecider(trapExit: List[Class[_ <: Throwable]]): Decider = - { case x ⇒ if (trapExit exists (_ isAssignableFrom x)) Restart else Escalate } + { case x ⇒ if (trapExit exists (_ isInstance x)) Restart else Escalate } /** * Backwards compatible Decider builder which just checks whether one of @@ -83,7 +83,7 @@ object FaultHandlingStrategy { */ def makeDecider(flat: Iterable[CauseAction]): Decider = { val actions = sort(flat) - return { case x ⇒ actions find (_._1 isAssignableFrom x) map (_._2) getOrElse Escalate } + return { case x ⇒ actions find (_._1 isInstance x) map (_._2) getOrElse Escalate } } def makeDecider(func: JDecider): Decider = { @@ -110,30 +110,36 @@ abstract class FaultHandlingStrategy { def decider: Decider - def handleChildTerminated(child: ActorRef, children: Vector[ChildRestartStats]): Vector[ChildRestartStats] + /** + * This method is called after the child has been removed from the set of children. + */ + def handleChildTerminated(child: ActorRef, children: Iterable[ActorRef]): Unit - def processFailure(restart: Boolean, fail: Failed, children: Vector[ChildRestartStats]): Unit + /** + * This method is called to act on the failure of a child: restart if the flag is true, stop otherwise. + */ + def processFailure(restart: Boolean, fail: Failed, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]): Unit - def handleSupervisorFailing(supervisor: ActorRef, children: Vector[ChildRestartStats]): Unit = { + def handleSupervisorFailing(supervisor: ActorRef, children: Iterable[ActorRef]): Unit = { if (children.nonEmpty) - children.foreach(_.child.suspend()) + children.foreach(_.suspend()) } - def handleSupervisorRestarted(cause: Throwable, supervisor: ActorRef, children: Vector[ChildRestartStats]): Unit = { + def handleSupervisorRestarted(cause: Throwable, supervisor: ActorRef, children: Iterable[ActorRef]): Unit = { if (children.nonEmpty) - children.foreach(_.child.restart(cause)) + children.foreach(_.restart(cause)) } /** * Returns whether it processed the failure or not */ - final def handleFailure(fail: Failed, children: Vector[ChildRestartStats]): Boolean = { - val cause = fail.cause.getClass + final def handleFailure(fail: Failed, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]): Boolean = { + val cause = fail.cause val action = if (decider.isDefinedAt(cause)) decider(cause) else Escalate action match { case Resume ⇒ fail.actor.resume(); true - case Restart ⇒ processFailure(true, fail, children); true - case Stop ⇒ processFailure(false, fail, children); true + case Restart ⇒ processFailure(true, fail, stats, children); true + case Stop ⇒ processFailure(false, fail, stats, children); true case Escalate ⇒ false } } @@ -181,18 +187,17 @@ case class AllForOneStrategy(decider: FaultHandlingStrategy.Decider, */ val retriesWindow = (maxNrOfRetries, withinTimeRange) - def handleChildTerminated(child: ActorRef, children: Vector[ChildRestartStats]): Vector[ChildRestartStats] = { - children collect { - case stats if stats.child != child ⇒ stats.child.stop(); stats //2 birds with one stone: remove the child + stop the other children - } //TODO optimization to drop all children here already? + def handleChildTerminated(child: ActorRef, children: Iterable[ActorRef]): Unit = { + children foreach (_.stop()) + //TODO optimization to drop all children here already? } - def processFailure(restart: Boolean, fail: Failed, children: Vector[ChildRestartStats]): Unit = { + def processFailure(restart: Boolean, fail: Failed, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]): Unit = { if (children.nonEmpty) { - if (restart && children.forall(_.requestRestartPermission(retriesWindow))) - children.foreach(_.child.restart(fail.cause)) + if (restart && children.forall(_._2.requestRestartPermission(retriesWindow))) + children.foreach(_._1.restart(fail.cause)) else - children.foreach(_.child.stop()) + children.foreach(_._1.stop()) } } } @@ -239,18 +244,13 @@ case class OneForOneStrategy(decider: FaultHandlingStrategy.Decider, */ val retriesWindow = (maxNrOfRetries, withinTimeRange) - def handleChildTerminated(child: ActorRef, children: Vector[ChildRestartStats]): Vector[ChildRestartStats] = - children.filterNot(_.child == child) // TODO: check: I think this copies the whole vector in addition to allocating a closure ... + def handleChildTerminated(child: ActorRef, children: Iterable[ActorRef]): Unit = {} - def processFailure(restart: Boolean, fail: Failed, children: Vector[ChildRestartStats]): Unit = { - children.find(_.child == fail.actor) match { - case Some(stats) ⇒ - if (restart && stats.requestRestartPermission(retriesWindow)) - fail.actor.restart(fail.cause) - else - fail.actor.stop() //TODO optimization to drop child here already? - case None ⇒ throw new AssertionError("Got Failure from non-child: " + fail) - } + def processFailure(restart: Boolean, fail: Failed, stats: ChildRestartStats, children: Iterable[(ActorRef, ChildRestartStats)]): Unit = { + if (restart && stats.requestRestartPermission(retriesWindow)) + fail.actor.restart(fail.cause) + else + fail.actor.stop() //TODO optimization to drop child here already? } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index df730efd8c..0885839b3e 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -29,6 +29,9 @@ private[dispatch] object Mailbox { final val Closed = 2 // secondary status: Scheduled bit may be added to Open/Suspended final val Scheduled = 4 + + // static constant for enabling println debugging of message processing (for hardcore bugs) + final val debug = false } /** @@ -147,9 +150,8 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag } final def run = { - try { processMailbox() } catch { - case ie: InterruptedException ⇒ Thread.currentThread().interrupt() //Restore interrupt - } finally { + try processMailbox() + finally { setAsIdle() dispatcher.registerForExecution(this, false, false) } @@ -170,6 +172,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag var processedMessages = 0 val deadlineNs = if (dispatcher.isThroughputDeadlineTimeDefined) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime) else 0 do { + if (debug) println(actor + " processing message " + nextMessage.message + " from " + nextMessage.channel) nextMessage.invoke processAllSystemMessages() //After we're done, process all system messages @@ -193,11 +196,13 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag var nextMessage = systemDrain() try { while (nextMessage ne null) { + if (debug) println(actor + " processing system message " + nextMessage) actor systemInvoke nextMessage nextMessage = nextMessage.next // don’t ever execute normal message when system message present! if (nextMessage eq null) nextMessage = systemDrain() } + if (debug) println(actor + " has finished processing system messages") } catch { case e ⇒ actor.app.eventHandler.error(e, this, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!") @@ -239,6 +244,7 @@ trait DefaultSystemMessageQueue { self: Mailbox ⇒ @tailrec final def systemEnqueue(message: SystemMessage): Unit = { + if (Mailbox.debug) println(actor + " having system message enqueued: " + message) val head = systemQueueGet /* * this write is safely published by the compareAndSet contained within diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index e08f3b7d9a..b4341e949d 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -4,14 +4,27 @@ package akka.testkit import akka.config.Configuration -import org.scalatest.WordSpec +import org.scalatest.{ WordSpec, BeforeAndAfterAll } import org.scalatest.matchers.MustMatchers import akka.AkkaApplication import akka.actor.{ Actor, ActorRef, Props } import akka.dispatch.MessageDispatcher abstract class AkkaSpec(_application: AkkaApplication = AkkaApplication()) - extends TestKit(_application) with WordSpec with MustMatchers { + extends TestKit(_application) with WordSpec with MustMatchers with BeforeAndAfterAll { + + final override def beforeAll { + atStartup() + } + + final override def afterAll { + app.stop() + atTermination() + } + + protected def atStartup() {} + + protected def atTermination() {} def this(config: Configuration) = this(new AkkaApplication(getClass.getSimpleName, AkkaApplication.defaultConfig ++ config))