diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index ad3f5e2870..06fbc99dfd 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -4,7 +4,6 @@ package akka.actor import language.postfixOps - import akka.testkit._ import org.scalatest.junit.JUnitSuite import com.typesafe.config.ConfigFactory @@ -12,9 +11,9 @@ import scala.concurrent.Await import scala.concurrent.util.duration._ import scala.collection.JavaConverters import java.util.concurrent.{ TimeUnit, RejectedExecutionException, CountDownLatch, ConcurrentLinkedQueue } -import akka.pattern.ask import akka.util.Timeout import scala.concurrent.Future +import akka.pattern.ask class JavaExtensionSpec extends JavaExtension with JUnitSuite @@ -62,6 +61,12 @@ object ActorSystemSpec { } } + class Strategy extends SupervisorStrategyConfigurator { + def create() = OneForOneStrategy() { + case _ ⇒ SupervisorStrategy.Escalate + } + } + } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -187,6 +192,47 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt created filter (ref ⇒ !ref.isTerminated && !ref.asInstanceOf[ActorRefWithCell].underlying.isInstanceOf[UnstartedCell]) must be(Seq()) } + "shut down when /user fails" in { + implicit val system = ActorSystem("Stop", AkkaSpec.testConf) + EventFilter[ActorKilledException]() intercept { + system.actorFor("/user") ! Kill + awaitCond(system.isTerminated) + } + } + + "allow configuration of guardian supervisor strategy" in { + implicit val system = ActorSystem("Stop", + ConfigFactory.parseString("akka.actor.guardian-supervisor-strategy=akka.actor.StoppingSupervisorStrategy") + .withFallback(AkkaSpec.testConf)) + val a = system.actorOf(Props(new Actor { + def receive = { + case "die" ⇒ throw new Exception("hello") + } + })) + val probe = TestProbe() + probe.watch(a) + EventFilter[Exception]("hello", occurrences = 1) intercept { + a ! "die" + } + probe.expectMsg(Terminated(a)(true)) + } + + "shut down when /user escalates" in { + implicit val system = ActorSystem("Stop", + ConfigFactory.parseString("akka.actor.guardian-supervisor-strategy=\"akka.actor.ActorSystemSpec$Strategy\"") + .withFallback(AkkaSpec.testConf)) + val a = system.actorOf(Props(new Actor { + def receive = { + case "die" ⇒ throw new Exception("hello") + } + })) + EventFilter[Exception]("hello") intercept { + Thread.sleep(250) + a ! "die" + awaitCond(system.isTerminated) + } + } + } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 40938ec809..209ed70478 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -13,7 +13,7 @@ import scala.util.control.NonFatal import akka.actor.cell.ChildrenContainer import akka.dispatch.{ Watch, Unwatch, Terminate, SystemMessage, Suspend, Supervise, Resume, Recreate, NoMessage, MessageDispatcher, Envelope, Create, ChildTerminated } -import akka.event.Logging.{ LogEvent, Debug } +import akka.event.Logging.{ LogEvent, Debug, Error } import akka.japi.Procedure /** @@ -213,7 +213,7 @@ private[akka] trait Cell { /** * Get the stats for the named child, if that exists. */ - def getChildByName(name: String): Option[ChildRestartStats] + def getChildByName(name: String): Option[ChildStats] /** * Enqueue a message to be sent to the actor; may or may not actually * schedule the actor to run, depending on which type of cell it is. @@ -368,7 +368,7 @@ private[akka] class ActorCell( case Kill ⇒ throw new ActorKilledException("Kill") case PoisonPill ⇒ self.stop() case SelectParent(m) ⇒ parent.tell(m, msg.sender) - case SelectChildName(name, m) ⇒ for (c ← getChildByName(name)) c.child.tell(m, msg.sender) + case SelectChildName(name, m) ⇒ getChildByName(name) match { case Some(c: ChildRestartStats) ⇒ c.child.tell(m, msg.sender); case _ ⇒ } case SelectChildPattern(p, m) ⇒ for (c ← children if p.matcher(c.path.name).matches) c.tell(m, msg.sender) } } @@ -444,9 +444,13 @@ private[akka] class ActorCell( private def supervise(child: ActorRef, uid: Int): Unit = if (!isTerminating) { // Supervise is the first thing we get from a new child, so store away the UID for later use in handleFailure() - addChild(child).uid = uid - handleSupervise(child) - if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) + initChild(child) match { + case Some(crs) ⇒ + crs.uid = uid + handleSupervise(child) + if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) + case None ⇒ publish(Error(self.path.toString, clazz(actor), "received Supervise from unregistered child " + child + ", this will not end well")) + } } // future extension point diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 5ad951b89e..30aacac0d8 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -306,8 +306,8 @@ private[akka] class LocalActorRef private[akka] ( */ protected def getSingleChild(name: String): InternalActorRef = actorCell.getChildByName(name) match { - case Some(crs) ⇒ crs.child.asInstanceOf[InternalActorRef] - case None ⇒ Nobody + case Some(crs: ChildRestartStats) ⇒ crs.child.asInstanceOf[InternalActorRef] + case _ ⇒ Nobody } override def getChild(names: Iterator[String]): InternalActorRef = { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 52cab34e63..422636c1ed 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -429,7 +429,7 @@ class LocalActorRefProvider( */ protected def systemGuardianStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy - lazy val rootGuardian: InternalActorRef = + lazy val rootGuardian: LocalActorRef = new LocalActorRef(system, Props(new Guardian(rootGuardianStrategy)), theOneWhoWalksTheBubblesOfSpaceTime, rootPath) { override def getParent: InternalActorRef = this override def getSingleChild(name: String): InternalActorRef = name match { @@ -438,11 +438,15 @@ class LocalActorRefProvider( } } - lazy val guardian: LocalActorRef = + lazy val guardian: LocalActorRef = { + rootGuardian.underlying.reserveChild("user") new LocalActorRef(system, Props(new Guardian(guardianStrategy)), rootGuardian, rootPath / "user") + } - lazy val systemGuardian: LocalActorRef = + lazy val systemGuardian: LocalActorRef = { + rootGuardian.underlying.reserveChild("system") new LocalActorRef(system, Props(new Guardian(systemGuardianStrategy)), rootGuardian, rootPath / "system") + } lazy val tempContainer = new VirtualPathContainer(system.provider, tempNode, rootGuardian, log) diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index 9623c7eef7..58c9a62967 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -24,9 +24,11 @@ private[akka] case object ChildNameReserved extends ChildStats * ChildRestartStats is the statistics kept by every parent Actor for every child Actor * and is used for SupervisorStrategies to know how to deal with problems that occur for the children. */ -case class ChildRestartStats(child: ActorRef, var uid: Int = 0, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L) +case class ChildRestartStats(child: ActorRef, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L) extends ChildStats { + var uid = 0 + //FIXME How about making ChildRestartStats immutable and then move these methods into the actual supervisor strategies? def requestRestartPermission(retriesWindow: (Option[Int], Option[Int])): Boolean = retriesWindow match { diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index 450a7afc34..7c38c0cff5 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -108,8 +108,8 @@ private[akka] class RepointableActorRef( case "" ⇒ getChild(name) case other ⇒ underlying.getChildByName(other) match { - case Some(crs) ⇒ crs.child.asInstanceOf[InternalActorRef].getChild(name) - case None ⇒ Nobody + case Some(crs: ChildRestartStats) ⇒ crs.child.asInstanceOf[InternalActorRef].getChild(name) + case _ ⇒ Nobody } } } else this diff --git a/akka-actor/src/main/scala/akka/actor/cell/Children.scala b/akka-actor/src/main/scala/akka/actor/cell/Children.scala index b2761519fe..eac359b8ab 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/Children.scala @@ -7,11 +7,12 @@ package akka.actor.cell import scala.annotation.tailrec import scala.collection.JavaConverters.asJavaIterableConverter import scala.util.control.NonFatal -import akka.actor.{ RepointableRef, Props, NoSerializationVerificationNeeded, InvalidActorNameException, InternalActorRef, ChildRestartStats, ActorRef } +import akka.actor._ import akka.actor.ActorCell import akka.actor.ActorPath.ElementRegex import akka.serialization.SerializationExtension import akka.util.{ Unsafe, Helpers } +import akka.actor.ChildNameReserved private[akka] trait Children { this: ActorCell ⇒ @@ -57,7 +58,7 @@ private[akka] trait Children { this: ActorCell ⇒ @inline private def swapChildrenRefs(oldChildren: ChildrenContainer, newChildren: ChildrenContainer): Boolean = Unsafe.instance.compareAndSwapObject(this, AbstractActorCell.childrenOffset, oldChildren, newChildren) - @tailrec final protected def reserveChild(name: String): Boolean = { + @tailrec final def reserveChild(name: String): Boolean = { val c = childrenRefs swapChildrenRefs(c, c.reserve(name)) || reserveChild(name) } @@ -67,24 +68,16 @@ private[akka] trait Children { this: ActorCell ⇒ swapChildrenRefs(c, c.unreserve(name)) || unreserveChild(name) } - final protected def addChild(ref: ActorRef): ChildRestartStats = { - @tailrec def rec(): ChildRestartStats = { - val c = childrenRefs - val nc = c.add(ref) - if (swapChildrenRefs(c, nc)) nc.getByName(ref.path.name).get else rec() + @tailrec final protected def initChild(ref: ActorRef): Option[ChildRestartStats] = + childrenRefs.getByName(ref.path.name) match { + case old @ Some(_: ChildRestartStats) ⇒ old.asInstanceOf[Option[ChildRestartStats]] + case Some(ChildNameReserved) ⇒ + val crs = ChildRestartStats(ref) + val name = ref.path.name + val c = childrenRefs + if (swapChildrenRefs(c, c.add(name, crs))) Some(crs) else initChild(ref) + case None ⇒ None } - /* - * This does not need to check getByRef every tailcall, because the change - * cannot happen in that direction as a race: the only entity removing a - * child is the actor itself, and the only entity which could be racing is - * somebody who calls attachChild, and there we are guaranteed that that - * child cannot yet have died (since it has not yet been created). - */ - childrenRefs.getByRef(ref) match { - case Some(old) ⇒ old - case None ⇒ rec() - } - } @tailrec final protected def shallDie(ref: ActorRef): Boolean = { val c = childrenRefs @@ -123,17 +116,17 @@ private[akka] trait Children { this: ActorCell ⇒ protected def suspendChildren(exceptFor: Set[ActorRef] = Set.empty): Unit = childrenRefs.stats foreach { - case ChildRestartStats(child, _, _, _) if !(exceptFor contains child) ⇒ child.asInstanceOf[InternalActorRef].suspend() + case ChildRestartStats(child, _, _) if !(exceptFor contains child) ⇒ child.asInstanceOf[InternalActorRef].suspend() case _ ⇒ } protected def resumeChildren(causedByFailure: Throwable, perp: ActorRef): Unit = childrenRefs.stats foreach { - case ChildRestartStats(child: InternalActorRef, _, _, _) ⇒ + case ChildRestartStats(child: InternalActorRef, _, _) ⇒ child.resume(if (perp == child) causedByFailure else null) } - def getChildByName(name: String): Option[ChildRestartStats] = childrenRefs.getByName(name) + def getChildByName(name: String): Option[ChildStats] = childrenRefs.getByName(name) protected def getChildByRef(ref: ActorRef): Option[ChildRestartStats] = childrenRefs.getByRef(ref) @@ -193,7 +186,7 @@ private[akka] trait Children { this: ActorCell ⇒ } // mailbox==null during RoutedActorCell constructor, where suspends are queued otherwise if (mailbox ne null) for (_ ← 1 to mailbox.suspendCount) actor.suspend() - addChild(actor) + initChild(actor) actor } } diff --git a/akka-actor/src/main/scala/akka/actor/cell/ChildrenContainer.scala b/akka-actor/src/main/scala/akka/actor/cell/ChildrenContainer.scala index 38bd239db6..e7b5fcc441 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/ChildrenContainer.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/ChildrenContainer.scala @@ -14,10 +14,10 @@ import akka.dispatch.SystemMessage */ private[akka] trait ChildrenContainer { - def add(child: ActorRef): ChildrenContainer + def add(name: String, stats: ChildRestartStats): ChildrenContainer def remove(child: ActorRef): ChildrenContainer - def getByName(name: String): Option[ChildRestartStats] + def getByName(name: String): Option[ChildStats] def getByRef(actor: ActorRef): Option[ChildRestartStats] def children: Iterable[ActorRef] @@ -50,8 +50,7 @@ private[akka] object ChildrenContainer { trait EmptyChildrenContainer extends ChildrenContainer { val emptyStats = TreeMap.empty[String, ChildStats] - override def add(child: ActorRef): ChildrenContainer = - new NormalChildrenContainer(emptyStats.updated(child.path.name, ChildRestartStats(child))) + override def add(name: String, stats: ChildRestartStats): ChildrenContainer = new NormalChildrenContainer(emptyStats.updated(name, stats)) override def remove(child: ActorRef): ChildrenContainer = this override def getByName(name: String): Option[ChildRestartStats] = None override def getByRef(actor: ActorRef): Option[ChildRestartStats] = None @@ -75,7 +74,7 @@ private[akka] object ChildrenContainer { * empty state while calling handleChildTerminated() for the last time. */ object TerminatedChildrenContainer extends EmptyChildrenContainer { - override def add(child: ActorRef): ChildrenContainer = this + override def add(name: String, stats: ChildRestartStats): ChildrenContainer = this override def reserve(name: String): ChildrenContainer = throw new IllegalStateException("cannot reserve actor name '" + name + "': already terminated") override def isTerminating: Boolean = true @@ -91,22 +90,18 @@ private[akka] object ChildrenContainer { */ class NormalChildrenContainer(val c: TreeMap[String, ChildStats]) extends ChildrenContainer { - override def add(child: ActorRef): ChildrenContainer = - new NormalChildrenContainer(c.updated(child.path.name, ChildRestartStats(child))) + override def add(name: String, stats: ChildRestartStats): ChildrenContainer = new NormalChildrenContainer(c.updated(name, stats)) override def remove(child: ActorRef): ChildrenContainer = NormalChildrenContainer(c - child.path.name) - override def getByName(name: String): Option[ChildRestartStats] = c.get(name) match { - case s @ Some(_: ChildRestartStats) ⇒ s.asInstanceOf[Option[ChildRestartStats]] - case _ ⇒ None - } + override def getByName(name: String): Option[ChildStats] = c.get(name) override def getByRef(actor: ActorRef): Option[ChildRestartStats] = c.get(actor.path.name) match { case c @ Some(crs: ChildRestartStats) if (crs.child == actor) ⇒ c.asInstanceOf[Option[ChildRestartStats]] case _ ⇒ None } - override def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _, _) ⇒ child } + override def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _) ⇒ child } override def stats: Iterable[ChildRestartStats] = c.values.view.collect { case c: ChildRestartStats ⇒ c } @@ -146,7 +141,7 @@ private[akka] object ChildrenContainer { case class TerminatingChildrenContainer(c: TreeMap[String, ChildStats], toDie: Set[ActorRef], reason: SuspendReason) extends ChildrenContainer { - override def add(child: ActorRef): ChildrenContainer = copy(c.updated(child.path.name, ChildRestartStats(child))) + override def add(name: String, stats: ChildRestartStats): ChildrenContainer = copy(c.updated(name, stats)) override def remove(child: ActorRef): ChildrenContainer = { val t = toDie - child @@ -157,17 +152,14 @@ private[akka] object ChildrenContainer { else copy(c - child.path.name, t) } - override def getByName(name: String): Option[ChildRestartStats] = c.get(name) match { - case s @ Some(_: ChildRestartStats) ⇒ s.asInstanceOf[Option[ChildRestartStats]] - case _ ⇒ None - } + override def getByName(name: String): Option[ChildStats] = c.get(name) override def getByRef(actor: ActorRef): Option[ChildRestartStats] = c.get(actor.path.name) match { case c @ Some(crs: ChildRestartStats) if (crs.child == actor) ⇒ c.asInstanceOf[Option[ChildRestartStats]] case _ ⇒ None } - override def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _, _) ⇒ child } + override def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _) ⇒ child } override def stats: Iterable[ChildRestartStats] = c.values.view.collect { case c: ChildRestartStats ⇒ c } diff --git a/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala index 1d51777db1..3eaea06a22 100644 --- a/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/cell/FaultHandling.scala @@ -201,7 +201,11 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ */ case Some(stats) if stats.uid == uid ⇒ if (!actor.supervisorStrategy.handleFailure(this, child, cause, stats, getAllChildStats)) throw cause - case _ ⇒ publish(Debug(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child)) + case Some(stats) ⇒ + publish(Debug(self.path.toString, clazz(actor), + "dropping Failed(" + cause + ") from old child " + child + " (uid=" + stats.uid + " != " + uid + ")")) + case None ⇒ + publish(Debug(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child)) } final protected def handleChildTerminated(child: ActorRef): SystemMessage = { diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index d961a75fff..33559cd56c 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -25,7 +25,17 @@ class TestActorRef[T <: Actor]( _props: Props, _supervisor: InternalActorRef, name: String) - extends LocalActorRef( + extends { + private val disregard = _supervisor match { + case l: LocalActorRef ⇒ l.underlying.reserveChild(name) + case r: RepointableActorRef ⇒ r.underlying match { + case u: UnstartedCell ⇒ throw new IllegalStateException("cannot attach a TestActor to an unstarted top-level actor, ensure that it is started by sending a message and observing the reply") + case c: ActorCell ⇒ c.reserveChild(name) + case o ⇒ _system.log.error("trying to attach child {} to unknown type of supervisor cell {}, this is not going to end well", name, o.getClass) + } + case s ⇒ _system.log.error("trying to attach child {} to unknown type of supervisor {}, this is not going to end well", name, s.getClass) + } + } with LocalActorRef( _system, _props.withDispatcher( if (_props.dispatcher == Dispatchers.DefaultDispatcherId) CallingThreadDispatcher.Id @@ -119,8 +129,9 @@ object TestActorRef { def apply[T <: Actor](props: Props, name: String)(implicit system: ActorSystem): TestActorRef[T] = apply[T](props, system.asInstanceOf[ActorSystemImpl].guardian, name) - def apply[T <: Actor](props: Props, supervisor: ActorRef, name: String)(implicit system: ActorSystem): TestActorRef[T] = + def apply[T <: Actor](props: Props, supervisor: ActorRef, name: String)(implicit system: ActorSystem): TestActorRef[T] = { new TestActorRef(system.asInstanceOf[ActorSystemImpl], system.dispatchers.prerequisites, props, supervisor.asInstanceOf[InternalActorRef], name) + } def apply[T <: Actor](implicit t: ClassTag[T], system: ActorSystem): TestActorRef[T] = apply[T](randomName)