add tests for guardian strategies, see #2376

- this discovered some pretty nice race conditions when creating actors
  synchronously (i.e. system.actorOf) vs. the recent fault-handling fix
  which discards Failed from old incarnations of a certain child
- as a consequence, all actor creation MUST be registered with the
  parent before dispatching the Supervise message
This commit is contained in:
Roland 2012-08-15 15:25:43 +02:00
parent 496fd331db
commit f7ea9bf3dd
10 changed files with 116 additions and 60 deletions

View file

@ -4,7 +4,6 @@
package akka.actor
import language.postfixOps
import akka.testkit._
import org.scalatest.junit.JUnitSuite
import com.typesafe.config.ConfigFactory
@ -12,9 +11,9 @@ import scala.concurrent.Await
import scala.concurrent.util.duration._
import scala.collection.JavaConverters
import java.util.concurrent.{ TimeUnit, RejectedExecutionException, CountDownLatch, ConcurrentLinkedQueue }
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Future
import akka.pattern.ask
class JavaExtensionSpec extends JavaExtension with JUnitSuite
@ -62,6 +61,12 @@ object ActorSystemSpec {
}
}
class Strategy extends SupervisorStrategyConfigurator {
def create() = OneForOneStrategy() {
case _ SupervisorStrategy.Escalate
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -187,6 +192,47 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt
created filter (ref !ref.isTerminated && !ref.asInstanceOf[ActorRefWithCell].underlying.isInstanceOf[UnstartedCell]) must be(Seq())
}
"shut down when /user fails" in {
implicit val system = ActorSystem("Stop", AkkaSpec.testConf)
EventFilter[ActorKilledException]() intercept {
system.actorFor("/user") ! Kill
awaitCond(system.isTerminated)
}
}
"allow configuration of guardian supervisor strategy" in {
implicit val system = ActorSystem("Stop",
ConfigFactory.parseString("akka.actor.guardian-supervisor-strategy=akka.actor.StoppingSupervisorStrategy")
.withFallback(AkkaSpec.testConf))
val a = system.actorOf(Props(new Actor {
def receive = {
case "die" throw new Exception("hello")
}
}))
val probe = TestProbe()
probe.watch(a)
EventFilter[Exception]("hello", occurrences = 1) intercept {
a ! "die"
}
probe.expectMsg(Terminated(a)(true))
}
"shut down when /user escalates" in {
implicit val system = ActorSystem("Stop",
ConfigFactory.parseString("akka.actor.guardian-supervisor-strategy=\"akka.actor.ActorSystemSpec$Strategy\"")
.withFallback(AkkaSpec.testConf))
val a = system.actorOf(Props(new Actor {
def receive = {
case "die" throw new Exception("hello")
}
}))
EventFilter[Exception]("hello") intercept {
Thread.sleep(250)
a ! "die"
awaitCond(system.isTerminated)
}
}
}
}

View file

@ -13,7 +13,7 @@ import scala.util.control.NonFatal
import akka.actor.cell.ChildrenContainer
import akka.dispatch.{ Watch, Unwatch, Terminate, SystemMessage, Suspend, Supervise, Resume, Recreate, NoMessage, MessageDispatcher, Envelope, Create, ChildTerminated }
import akka.event.Logging.{ LogEvent, Debug }
import akka.event.Logging.{ LogEvent, Debug, Error }
import akka.japi.Procedure
/**
@ -213,7 +213,7 @@ private[akka] trait Cell {
/**
* Get the stats for the named child, if that exists.
*/
def getChildByName(name: String): Option[ChildRestartStats]
def getChildByName(name: String): Option[ChildStats]
/**
* Enqueue a message to be sent to the actor; may or may not actually
* schedule the actor to run, depending on which type of cell it is.
@ -368,7 +368,7 @@ private[akka] class ActorCell(
case Kill throw new ActorKilledException("Kill")
case PoisonPill self.stop()
case SelectParent(m) parent.tell(m, msg.sender)
case SelectChildName(name, m) for (c getChildByName(name)) c.child.tell(m, msg.sender)
case SelectChildName(name, m) getChildByName(name) match { case Some(c: ChildRestartStats) c.child.tell(m, msg.sender); case _ }
case SelectChildPattern(p, m) for (c children if p.matcher(c.path.name).matches) c.tell(m, msg.sender)
}
}
@ -444,9 +444,13 @@ private[akka] class ActorCell(
private def supervise(child: ActorRef, uid: Int): Unit = if (!isTerminating) {
// Supervise is the first thing we get from a new child, so store away the UID for later use in handleFailure()
addChild(child).uid = uid
handleSupervise(child)
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child))
initChild(child) match {
case Some(crs)
crs.uid = uid
handleSupervise(child)
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child))
case None publish(Error(self.path.toString, clazz(actor), "received Supervise from unregistered child " + child + ", this will not end well"))
}
}
// future extension point

View file

@ -306,8 +306,8 @@ private[akka] class LocalActorRef private[akka] (
*/
protected def getSingleChild(name: String): InternalActorRef =
actorCell.getChildByName(name) match {
case Some(crs) crs.child.asInstanceOf[InternalActorRef]
case None Nobody
case Some(crs: ChildRestartStats) crs.child.asInstanceOf[InternalActorRef]
case _ Nobody
}
override def getChild(names: Iterator[String]): InternalActorRef = {

View file

@ -429,7 +429,7 @@ class LocalActorRefProvider(
*/
protected def systemGuardianStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy
lazy val rootGuardian: InternalActorRef =
lazy val rootGuardian: LocalActorRef =
new LocalActorRef(system, Props(new Guardian(rootGuardianStrategy)), theOneWhoWalksTheBubblesOfSpaceTime, rootPath) {
override def getParent: InternalActorRef = this
override def getSingleChild(name: String): InternalActorRef = name match {
@ -438,11 +438,15 @@ class LocalActorRefProvider(
}
}
lazy val guardian: LocalActorRef =
lazy val guardian: LocalActorRef = {
rootGuardian.underlying.reserveChild("user")
new LocalActorRef(system, Props(new Guardian(guardianStrategy)), rootGuardian, rootPath / "user")
}
lazy val systemGuardian: LocalActorRef =
lazy val systemGuardian: LocalActorRef = {
rootGuardian.underlying.reserveChild("system")
new LocalActorRef(system, Props(new Guardian(systemGuardianStrategy)), rootGuardian, rootPath / "system")
}
lazy val tempContainer = new VirtualPathContainer(system.provider, tempNode, rootGuardian, log)

View file

@ -24,9 +24,11 @@ private[akka] case object ChildNameReserved extends ChildStats
* ChildRestartStats is the statistics kept by every parent Actor for every child Actor
* and is used for SupervisorStrategies to know how to deal with problems that occur for the children.
*/
case class ChildRestartStats(child: ActorRef, var uid: Int = 0, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L)
case class ChildRestartStats(child: ActorRef, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L)
extends ChildStats {
var uid = 0
//FIXME How about making ChildRestartStats immutable and then move these methods into the actual supervisor strategies?
def requestRestartPermission(retriesWindow: (Option[Int], Option[Int])): Boolean =
retriesWindow match {

View file

@ -108,8 +108,8 @@ private[akka] class RepointableActorRef(
case "" getChild(name)
case other
underlying.getChildByName(other) match {
case Some(crs) crs.child.asInstanceOf[InternalActorRef].getChild(name)
case None Nobody
case Some(crs: ChildRestartStats) crs.child.asInstanceOf[InternalActorRef].getChild(name)
case _ Nobody
}
}
} else this

View file

@ -7,11 +7,12 @@ package akka.actor.cell
import scala.annotation.tailrec
import scala.collection.JavaConverters.asJavaIterableConverter
import scala.util.control.NonFatal
import akka.actor.{ RepointableRef, Props, NoSerializationVerificationNeeded, InvalidActorNameException, InternalActorRef, ChildRestartStats, ActorRef }
import akka.actor._
import akka.actor.ActorCell
import akka.actor.ActorPath.ElementRegex
import akka.serialization.SerializationExtension
import akka.util.{ Unsafe, Helpers }
import akka.actor.ChildNameReserved
private[akka] trait Children { this: ActorCell
@ -57,7 +58,7 @@ private[akka] trait Children { this: ActorCell ⇒
@inline private def swapChildrenRefs(oldChildren: ChildrenContainer, newChildren: ChildrenContainer): Boolean =
Unsafe.instance.compareAndSwapObject(this, AbstractActorCell.childrenOffset, oldChildren, newChildren)
@tailrec final protected def reserveChild(name: String): Boolean = {
@tailrec final def reserveChild(name: String): Boolean = {
val c = childrenRefs
swapChildrenRefs(c, c.reserve(name)) || reserveChild(name)
}
@ -67,24 +68,16 @@ private[akka] trait Children { this: ActorCell ⇒
swapChildrenRefs(c, c.unreserve(name)) || unreserveChild(name)
}
final protected def addChild(ref: ActorRef): ChildRestartStats = {
@tailrec def rec(): ChildRestartStats = {
val c = childrenRefs
val nc = c.add(ref)
if (swapChildrenRefs(c, nc)) nc.getByName(ref.path.name).get else rec()
@tailrec final protected def initChild(ref: ActorRef): Option[ChildRestartStats] =
childrenRefs.getByName(ref.path.name) match {
case old @ Some(_: ChildRestartStats) old.asInstanceOf[Option[ChildRestartStats]]
case Some(ChildNameReserved)
val crs = ChildRestartStats(ref)
val name = ref.path.name
val c = childrenRefs
if (swapChildrenRefs(c, c.add(name, crs))) Some(crs) else initChild(ref)
case None None
}
/*
* This does not need to check getByRef every tailcall, because the change
* cannot happen in that direction as a race: the only entity removing a
* child is the actor itself, and the only entity which could be racing is
* somebody who calls attachChild, and there we are guaranteed that that
* child cannot yet have died (since it has not yet been created).
*/
childrenRefs.getByRef(ref) match {
case Some(old) old
case None rec()
}
}
@tailrec final protected def shallDie(ref: ActorRef): Boolean = {
val c = childrenRefs
@ -123,17 +116,17 @@ private[akka] trait Children { this: ActorCell ⇒
protected def suspendChildren(exceptFor: Set[ActorRef] = Set.empty): Unit =
childrenRefs.stats foreach {
case ChildRestartStats(child, _, _, _) if !(exceptFor contains child) child.asInstanceOf[InternalActorRef].suspend()
case ChildRestartStats(child, _, _) if !(exceptFor contains child) child.asInstanceOf[InternalActorRef].suspend()
case _
}
protected def resumeChildren(causedByFailure: Throwable, perp: ActorRef): Unit =
childrenRefs.stats foreach {
case ChildRestartStats(child: InternalActorRef, _, _, _)
case ChildRestartStats(child: InternalActorRef, _, _)
child.resume(if (perp == child) causedByFailure else null)
}
def getChildByName(name: String): Option[ChildRestartStats] = childrenRefs.getByName(name)
def getChildByName(name: String): Option[ChildStats] = childrenRefs.getByName(name)
protected def getChildByRef(ref: ActorRef): Option[ChildRestartStats] = childrenRefs.getByRef(ref)
@ -193,7 +186,7 @@ private[akka] trait Children { this: ActorCell ⇒
}
// mailbox==null during RoutedActorCell constructor, where suspends are queued otherwise
if (mailbox ne null) for (_ 1 to mailbox.suspendCount) actor.suspend()
addChild(actor)
initChild(actor)
actor
}
}

View file

@ -14,10 +14,10 @@ import akka.dispatch.SystemMessage
*/
private[akka] trait ChildrenContainer {
def add(child: ActorRef): ChildrenContainer
def add(name: String, stats: ChildRestartStats): ChildrenContainer
def remove(child: ActorRef): ChildrenContainer
def getByName(name: String): Option[ChildRestartStats]
def getByName(name: String): Option[ChildStats]
def getByRef(actor: ActorRef): Option[ChildRestartStats]
def children: Iterable[ActorRef]
@ -50,8 +50,7 @@ private[akka] object ChildrenContainer {
trait EmptyChildrenContainer extends ChildrenContainer {
val emptyStats = TreeMap.empty[String, ChildStats]
override def add(child: ActorRef): ChildrenContainer =
new NormalChildrenContainer(emptyStats.updated(child.path.name, ChildRestartStats(child)))
override def add(name: String, stats: ChildRestartStats): ChildrenContainer = new NormalChildrenContainer(emptyStats.updated(name, stats))
override def remove(child: ActorRef): ChildrenContainer = this
override def getByName(name: String): Option[ChildRestartStats] = None
override def getByRef(actor: ActorRef): Option[ChildRestartStats] = None
@ -75,7 +74,7 @@ private[akka] object ChildrenContainer {
* empty state while calling handleChildTerminated() for the last time.
*/
object TerminatedChildrenContainer extends EmptyChildrenContainer {
override def add(child: ActorRef): ChildrenContainer = this
override def add(name: String, stats: ChildRestartStats): ChildrenContainer = this
override def reserve(name: String): ChildrenContainer =
throw new IllegalStateException("cannot reserve actor name '" + name + "': already terminated")
override def isTerminating: Boolean = true
@ -91,22 +90,18 @@ private[akka] object ChildrenContainer {
*/
class NormalChildrenContainer(val c: TreeMap[String, ChildStats]) extends ChildrenContainer {
override def add(child: ActorRef): ChildrenContainer =
new NormalChildrenContainer(c.updated(child.path.name, ChildRestartStats(child)))
override def add(name: String, stats: ChildRestartStats): ChildrenContainer = new NormalChildrenContainer(c.updated(name, stats))
override def remove(child: ActorRef): ChildrenContainer = NormalChildrenContainer(c - child.path.name)
override def getByName(name: String): Option[ChildRestartStats] = c.get(name) match {
case s @ Some(_: ChildRestartStats) s.asInstanceOf[Option[ChildRestartStats]]
case _ None
}
override def getByName(name: String): Option[ChildStats] = c.get(name)
override def getByRef(actor: ActorRef): Option[ChildRestartStats] = c.get(actor.path.name) match {
case c @ Some(crs: ChildRestartStats) if (crs.child == actor) c.asInstanceOf[Option[ChildRestartStats]]
case _ None
}
override def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _, _) child }
override def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _) child }
override def stats: Iterable[ChildRestartStats] = c.values.view.collect { case c: ChildRestartStats c }
@ -146,7 +141,7 @@ private[akka] object ChildrenContainer {
case class TerminatingChildrenContainer(c: TreeMap[String, ChildStats], toDie: Set[ActorRef], reason: SuspendReason)
extends ChildrenContainer {
override def add(child: ActorRef): ChildrenContainer = copy(c.updated(child.path.name, ChildRestartStats(child)))
override def add(name: String, stats: ChildRestartStats): ChildrenContainer = copy(c.updated(name, stats))
override def remove(child: ActorRef): ChildrenContainer = {
val t = toDie - child
@ -157,17 +152,14 @@ private[akka] object ChildrenContainer {
else copy(c - child.path.name, t)
}
override def getByName(name: String): Option[ChildRestartStats] = c.get(name) match {
case s @ Some(_: ChildRestartStats) s.asInstanceOf[Option[ChildRestartStats]]
case _ None
}
override def getByName(name: String): Option[ChildStats] = c.get(name)
override def getByRef(actor: ActorRef): Option[ChildRestartStats] = c.get(actor.path.name) match {
case c @ Some(crs: ChildRestartStats) if (crs.child == actor) c.asInstanceOf[Option[ChildRestartStats]]
case _ None
}
override def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _, _) child }
override def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _) child }
override def stats: Iterable[ChildRestartStats] = c.values.view.collect { case c: ChildRestartStats c }

View file

@ -201,7 +201,11 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
*/
case Some(stats) if stats.uid == uid
if (!actor.supervisorStrategy.handleFailure(this, child, cause, stats, getAllChildStats)) throw cause
case _ publish(Debug(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child))
case Some(stats)
publish(Debug(self.path.toString, clazz(actor),
"dropping Failed(" + cause + ") from old child " + child + " (uid=" + stats.uid + " != " + uid + ")"))
case None
publish(Debug(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child))
}
final protected def handleChildTerminated(child: ActorRef): SystemMessage = {

View file

@ -25,7 +25,17 @@ class TestActorRef[T <: Actor](
_props: Props,
_supervisor: InternalActorRef,
name: String)
extends LocalActorRef(
extends {
private val disregard = _supervisor match {
case l: LocalActorRef l.underlying.reserveChild(name)
case r: RepointableActorRef r.underlying match {
case u: UnstartedCell throw new IllegalStateException("cannot attach a TestActor to an unstarted top-level actor, ensure that it is started by sending a message and observing the reply")
case c: ActorCell c.reserveChild(name)
case o _system.log.error("trying to attach child {} to unknown type of supervisor cell {}, this is not going to end well", name, o.getClass)
}
case s _system.log.error("trying to attach child {} to unknown type of supervisor {}, this is not going to end well", name, s.getClass)
}
} with LocalActorRef(
_system,
_props.withDispatcher(
if (_props.dispatcher == Dispatchers.DefaultDispatcherId) CallingThreadDispatcher.Id
@ -119,8 +129,9 @@ object TestActorRef {
def apply[T <: Actor](props: Props, name: String)(implicit system: ActorSystem): TestActorRef[T] =
apply[T](props, system.asInstanceOf[ActorSystemImpl].guardian, name)
def apply[T <: Actor](props: Props, supervisor: ActorRef, name: String)(implicit system: ActorSystem): TestActorRef[T] =
def apply[T <: Actor](props: Props, supervisor: ActorRef, name: String)(implicit system: ActorSystem): TestActorRef[T] = {
new TestActorRef(system.asInstanceOf[ActorSystemImpl], system.dispatchers.prerequisites, props, supervisor.asInstanceOf[InternalActorRef], name)
}
def apply[T <: Actor](implicit t: ClassTag[T], system: ActorSystem): TestActorRef[T] = apply[T](randomName)