Merge branch 'wip-2376-guardianStrategy-∂π'

This commit is contained in:
Roland 2012-08-17 14:35:39 +02:00
commit 48b68c650d
17 changed files with 248 additions and 164 deletions

View file

@ -4,7 +4,6 @@
package akka.actor package akka.actor
import language.postfixOps import language.postfixOps
import akka.testkit._ import akka.testkit._
import org.scalatest.junit.JUnitSuite import org.scalatest.junit.JUnitSuite
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
@ -12,9 +11,9 @@ import scala.concurrent.Await
import scala.concurrent.util.duration._ import scala.concurrent.util.duration._
import scala.collection.JavaConverters import scala.collection.JavaConverters
import java.util.concurrent.{ TimeUnit, RejectedExecutionException, CountDownLatch, ConcurrentLinkedQueue } import java.util.concurrent.{ TimeUnit, RejectedExecutionException, CountDownLatch, ConcurrentLinkedQueue }
import akka.pattern.ask
import akka.util.Timeout import akka.util.Timeout
import scala.concurrent.Future import scala.concurrent.Future
import akka.pattern.ask
class JavaExtensionSpec extends JavaExtension with JUnitSuite class JavaExtensionSpec extends JavaExtension with JUnitSuite
@ -62,6 +61,12 @@ object ActorSystemSpec {
} }
} }
class Strategy extends SupervisorStrategyConfigurator {
def create() = OneForOneStrategy() {
case _ SupervisorStrategy.Escalate
}
}
} }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -187,6 +192,46 @@ class ActorSystemSpec extends AkkaSpec("""akka.extensions = ["akka.actor.TestExt
created filter (ref !ref.isTerminated && !ref.asInstanceOf[ActorRefWithCell].underlying.isInstanceOf[UnstartedCell]) must be(Seq()) created filter (ref !ref.isTerminated && !ref.asInstanceOf[ActorRefWithCell].underlying.isInstanceOf[UnstartedCell]) must be(Seq())
} }
"shut down when /user fails" in {
implicit val system = ActorSystem("Stop", AkkaSpec.testConf)
EventFilter[ActorKilledException]() intercept {
system.actorFor("/user") ! Kill
awaitCond(system.isTerminated)
}
}
"allow configuration of guardian supervisor strategy" in {
implicit val system = ActorSystem("Stop",
ConfigFactory.parseString("akka.actor.guardian-supervisor-strategy=akka.actor.StoppingSupervisorStrategy")
.withFallback(AkkaSpec.testConf))
val a = system.actorOf(Props(new Actor {
def receive = {
case "die" throw new Exception("hello")
}
}))
val probe = TestProbe()
probe.watch(a)
EventFilter[Exception]("hello", occurrences = 1) intercept {
a ! "die"
}
probe.expectMsg(Terminated(a)(true))
}
"shut down when /user escalates" in {
implicit val system = ActorSystem("Stop",
ConfigFactory.parseString("akka.actor.guardian-supervisor-strategy=\"akka.actor.ActorSystemSpec$Strategy\"")
.withFallback(AkkaSpec.testConf))
val a = system.actorOf(Props(new Actor {
def receive = {
case "die" throw new Exception("hello")
}
}))
EventFilter[Exception]("hello") intercept {
a ! "die"
awaitCond(system.isTerminated)
}
}
} }
} }

View file

@ -269,8 +269,8 @@ class VerifySerializabilitySpec extends AkkaSpec(VerifySerializabilitySpec.conf)
val a = system.actorOf(Props[FooActor]) val a = system.actorOf(Props[FooActor])
Await.result(a ? "pigdog", timeout.duration) must be("pigdog") Await.result(a ? "pigdog", timeout.duration) must be("pigdog")
intercept[NotSerializableException] { EventFilter[NotSerializableException](occurrences = 1) intercept {
Await.result(a ? new AnyRef, timeout.duration) a ! (new AnyRef)
} }
system stop a system stop a
} }

View file

@ -46,7 +46,14 @@ akka {
actor { actor {
# FQCN of the ActorRefProvider to be used; the below is the built-in default,
# another one is akka.remote.RemoteActorRefProvider in the akka-remote bundle.
provider = "akka.actor.LocalActorRefProvider" provider = "akka.actor.LocalActorRefProvider"
# The guardian "/user" will use this subclass of akka.actor.SupervisorStrategyConfigurator
# to obtain its supervisorStrategy. Besides the default there is
# akka.actor.StoppingSupervisorStrategy
guardian-supervisor-strategy = "akka.actor.DefaultSupervisorStrategy"
# Timeout for ActorSystem.actorOf # Timeout for ActorSystem.actorOf
creation-timeout = 20s creation-timeout = 20s

View file

@ -217,7 +217,7 @@ case class DeathPactException private[akka] (dead: ActorRef)
* avoid cascading interrupts to other threads than the originally interrupted one. * avoid cascading interrupts to other threads than the originally interrupted one.
*/ */
@SerialVersionUID(1L) @SerialVersionUID(1L)
class ActorInterruptedException private[akka] (cause: Throwable) extends AkkaException(cause.getMessage, cause) with NoStackTrace class ActorInterruptedException private[akka] (cause: Throwable) extends AkkaException(cause.getMessage, cause)
/** /**
* This message is published to the EventStream whenever an Actor receives a message it doesn't understand * This message is published to the EventStream whenever an Actor receives a message it doesn't understand

View file

@ -13,7 +13,7 @@ import scala.util.control.NonFatal
import akka.actor.cell.ChildrenContainer import akka.actor.cell.ChildrenContainer
import akka.dispatch.{ Watch, Unwatch, Terminate, SystemMessage, Suspend, Supervise, Resume, Recreate, NoMessage, MessageDispatcher, Envelope, Create, ChildTerminated } import akka.dispatch.{ Watch, Unwatch, Terminate, SystemMessage, Suspend, Supervise, Resume, Recreate, NoMessage, MessageDispatcher, Envelope, Create, ChildTerminated }
import akka.event.Logging.{ LogEvent, Debug } import akka.event.Logging.{ LogEvent, Debug, Error }
import akka.japi.Procedure import akka.japi.Procedure
/** /**
@ -182,19 +182,19 @@ private[akka] trait Cell {
*/ */
def systemImpl: ActorSystemImpl def systemImpl: ActorSystemImpl
/** /**
* Recursively suspend this actor and all its children. * Recursively suspend this actor and all its children. Must not throw exceptions.
*/ */
def suspend(): Unit def suspend(): Unit
/** /**
* Recursively resume this actor and all its children. * Recursively resume this actor and all its children. Must not throw exceptions.
*/ */
def resume(causedByFailure: Throwable): Unit def resume(causedByFailure: Throwable): Unit
/** /**
* Restart this actor (will recursively restart or stop all children). * Restart this actor (will recursively restart or stop all children). Must not throw exceptions.
*/ */
def restart(cause: Throwable): Unit def restart(cause: Throwable): Unit
/** /**
* Recursively terminate this actor and all its children. * Recursively terminate this actor and all its children. Must not throw exceptions.
*/ */
def stop(): Unit def stop(): Unit
/** /**
@ -213,15 +213,17 @@ private[akka] trait Cell {
/** /**
* Get the stats for the named child, if that exists. * Get the stats for the named child, if that exists.
*/ */
def getChildByName(name: String): Option[ChildRestartStats] def getChildByName(name: String): Option[ChildStats]
/** /**
* Enqueue a message to be sent to the actor; may or may not actually * Enqueue a message to be sent to the actor; may or may not actually
* schedule the actor to run, depending on which type of cell it is. * schedule the actor to run, depending on which type of cell it is.
* Must not throw exceptions.
*/ */
def tell(message: Any, sender: ActorRef): Unit def tell(message: Any, sender: ActorRef): Unit
/** /**
* Enqueue a message to be sent to the actor; may or may not actually * Enqueue a message to be sent to the actor; may or may not actually
* schedule the actor to run, depending on which type of cell it is. * schedule the actor to run, depending on which type of cell it is.
* Must not throw exceptions.
*/ */
def sendSystemMessage(msg: SystemMessage): Unit def sendSystemMessage(msg: SystemMessage): Unit
/** /**
@ -368,7 +370,7 @@ private[akka] class ActorCell(
case Kill throw new ActorKilledException("Kill") case Kill throw new ActorKilledException("Kill")
case PoisonPill self.stop() case PoisonPill self.stop()
case SelectParent(m) parent.tell(m, msg.sender) case SelectParent(m) parent.tell(m, msg.sender)
case SelectChildName(name, m) for (c getChildByName(name)) c.child.tell(m, msg.sender) case SelectChildName(name, m) getChildByName(name) match { case Some(c: ChildRestartStats) c.child.tell(m, msg.sender); case _ }
case SelectChildPattern(p, m) for (c children if p.matcher(c.path.name).matches) c.tell(m, msg.sender) case SelectChildPattern(p, m) for (c children if p.matcher(c.path.name).matches) c.tell(m, msg.sender)
} }
} }
@ -444,9 +446,13 @@ private[akka] class ActorCell(
private def supervise(child: ActorRef, uid: Int): Unit = if (!isTerminating) { private def supervise(child: ActorRef, uid: Int): Unit = if (!isTerminating) {
// Supervise is the first thing we get from a new child, so store away the UID for later use in handleFailure() // Supervise is the first thing we get from a new child, so store away the UID for later use in handleFailure()
addChild(child).uid = uid initChild(child) match {
handleSupervise(child) case Some(crs)
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) crs.uid = uid
handleSupervise(child)
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child))
case None publish(Error(self.path.toString, clazz(actor), "received Supervise from unregistered child " + child + ", this will not end well"))
}
} }
// future extension point // future extension point

View file

@ -306,8 +306,8 @@ private[akka] class LocalActorRef private[akka] (
*/ */
protected def getSingleChild(name: String): InternalActorRef = protected def getSingleChild(name: String): InternalActorRef =
actorCell.getChildByName(name) match { actorCell.getChildByName(name) match {
case Some(crs) crs.child.asInstanceOf[InternalActorRef] case Some(crs: ChildRestartStats) crs.child.asInstanceOf[InternalActorRef]
case None Nobody case _ Nobody
} }
override def getChild(names: Iterator[String]): InternalActorRef = { override def getChild(names: Iterator[String]): InternalActorRef = {

View file

@ -294,16 +294,6 @@ trait ActorRefFactory {
def stop(actor: ActorRef): Unit def stop(actor: ActorRef): Unit
} }
/**
* Internal Akka use only, used in implementation of system.actorOf.
*/
private[akka] case class CreateChild(props: Props, name: String)
/**
* Internal Akka use only, used in implementation of system.actorOf.
*/
private[akka] case class CreateRandomNameChild(props: Props)
/** /**
* Internal Akka use only, used in implementation of system.stop(child). * Internal Akka use only, used in implementation of system.stop(child).
*/ */
@ -317,6 +307,7 @@ class LocalActorRefProvider(
override val settings: ActorSystem.Settings, override val settings: ActorSystem.Settings,
val eventStream: EventStream, val eventStream: EventStream,
override val scheduler: Scheduler, override val scheduler: Scheduler,
val dynamicAccess: DynamicAccess,
override val deployer: Deployer) extends ActorRefProvider { override val deployer: Deployer) extends ActorRefProvider {
// this is the constructor needed for reflectively instantiating the provider // this is the constructor needed for reflectively instantiating the provider
@ -329,6 +320,7 @@ class LocalActorRefProvider(
settings, settings,
eventStream, eventStream,
scheduler, scheduler,
dynamicAccess,
new Deployer(settings, dynamicAccess)) new Deployer(settings, dynamicAccess))
override val rootPath: ActorPath = RootActorPath(Address("akka", _systemName)) override val rootPath: ActorPath = RootActorPath(Address("akka", _systemName))
@ -380,65 +372,12 @@ class LocalActorRefProvider(
} }
} }
/** private class Guardian(override val supervisorStrategy: SupervisorStrategy, isSystem: Boolean) extends Actor {
* Overridable supervision strategy to be used by the /user guardian.
*/
protected def guardianSupervisionStrategy: SupervisorStrategy = {
import akka.actor.SupervisorStrategy._
OneForOneStrategy() {
case _: ActorKilledException Stop
case _: ActorInitializationException Stop
case _: Exception Restart
}
}
/*
* Guardians can be asked by ActorSystem to create children, i.e. top-level
* actors. Therefore these need to answer to these requests, forwarding any
* exceptions which might have occurred.
*/
private class Guardian extends Actor {
override val supervisorStrategy: SupervisorStrategy = guardianSupervisionStrategy
def receive = { def receive = {
case Terminated(_) context.stop(self) case Terminated(_) if (isSystem) eventStream.stopDefaultLoggers(); context.stop(self)
case CreateChild(child, name) sender ! (try context.actorOf(child, name) catch { case NonFatal(e) Status.Failure(e) }) case StopChild(child) context.stop(child)
case CreateRandomNameChild(child) sender ! (try context.actorOf(child) catch { case NonFatal(e) Status.Failure(e) }) case m deadLetters ! DeadLetter(m, sender, self)
case StopChild(child) context.stop(child)
case m deadLetters ! DeadLetter(m, sender, self)
}
// guardian MUST NOT lose its children during restart
override def preRestart(cause: Throwable, msg: Option[Any]) {}
}
/**
* Overridable supervision strategy to be used by the /system guardian.
*/
protected def systemGuardianSupervisionStrategy: SupervisorStrategy = {
import akka.actor.SupervisorStrategy._
OneForOneStrategy() {
case _: ActorKilledException | _: ActorInitializationException Stop
case _: Exception Restart
}
}
/*
* Guardians can be asked by ActorSystem to create children, i.e. top-level
* actors. Therefore these need to answer to these requests, forwarding any
* exceptions which might have occurred.
*/
private class SystemGuardian extends Actor {
override val supervisorStrategy: SupervisorStrategy = systemGuardianSupervisionStrategy
def receive = {
case Terminated(_) eventStream.stopDefaultLoggers(); context.stop(self)
case CreateChild(child, name) sender ! (try context.actorOf(child, name) catch { case NonFatal(e) Status.Failure(e) })
case CreateRandomNameChild(child) sender ! (try context.actorOf(child) catch { case NonFatal(e) Status.Failure(e) })
case StopChild(child) context.stop(child); sender ! "ok"
case m deadLetters ! DeadLetter(m, sender, self)
} }
// guardian MUST NOT lose its children during restart // guardian MUST NOT lose its children during restart
@ -472,10 +411,30 @@ class LocalActorRefProvider(
*/ */
def registerExtraNames(_extras: Map[String, InternalActorRef]): Unit = extraNames ++= _extras def registerExtraNames(_extras: Map[String, InternalActorRef]): Unit = extraNames ++= _extras
private val guardianProps = Props(new Guardian) private def guardianSupervisorStrategyConfigurator =
dynamicAccess.createInstanceFor[SupervisorStrategyConfigurator](settings.SupervisorStrategyClass, Seq()).fold(throw _, x x)
lazy val rootGuardian: InternalActorRef = /**
new LocalActorRef(system, guardianProps, theOneWhoWalksTheBubblesOfSpaceTime, rootPath) { * Overridable supervision strategy to be used by the /user guardian.
*/
protected def rootGuardianStrategy: SupervisorStrategy = OneForOneStrategy() {
case ex
log.error(ex, "guardian failed, shutting down system")
SupervisorStrategy.Stop
}
/**
* Overridable supervision strategy to be used by the /user guardian.
*/
protected def guardianStrategy: SupervisorStrategy = guardianSupervisorStrategyConfigurator.create()
/**
* Overridable supervision strategy to be used by the /user guardian.
*/
protected def systemGuardianStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy
lazy val rootGuardian: LocalActorRef =
new LocalActorRef(system, Props(new Guardian(rootGuardianStrategy, isSystem = false)), theOneWhoWalksTheBubblesOfSpaceTime, rootPath) {
override def getParent: InternalActorRef = this override def getParent: InternalActorRef = this
override def getSingleChild(name: String): InternalActorRef = name match { override def getSingleChild(name: String): InternalActorRef = name match {
case "temp" tempContainer case "temp" tempContainer
@ -483,10 +442,15 @@ class LocalActorRefProvider(
} }
} }
lazy val guardian: LocalActorRef = new LocalActorRef(system, guardianProps, rootGuardian, rootPath / "user") lazy val guardian: LocalActorRef = {
rootGuardian.underlying.reserveChild("user")
new LocalActorRef(system, Props(new Guardian(guardianStrategy, isSystem = false)), rootGuardian, rootPath / "user")
}
lazy val systemGuardian: LocalActorRef = lazy val systemGuardian: LocalActorRef = {
new LocalActorRef(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, rootPath / "system") rootGuardian.underlying.reserveChild("system")
new LocalActorRef(system, Props(new Guardian(systemGuardianStrategy, isSystem = true)), rootGuardian, rootPath / "system")
}
lazy val tempContainer = new VirtualPathContainer(system.provider, tempNode, rootGuardian, log) lazy val tempContainer = new VirtualPathContainer(system.provider, tempNode, rootGuardian, log)
@ -559,4 +523,3 @@ class LocalActorRefProvider(
def getExternalAddressFor(addr: Address): Option[Address] = if (addr == rootPath.address) Some(addr) else None def getExternalAddressFor(addr: Address): Option[Address] = if (addr == rootPath.address) Some(addr) else None
} }

View file

@ -133,6 +133,7 @@ object ActorSystem {
final val ConfigVersion = getString("akka.version") final val ConfigVersion = getString("akka.version")
final val ProviderClass = getString("akka.actor.provider") final val ProviderClass = getString("akka.actor.provider")
final val SupervisorStrategyClass = getString("akka.actor.guardian-supervisor-strategy")
final val CreationTimeout = Timeout(Duration(getMilliseconds("akka.actor.creation-timeout"), MILLISECONDS)) final val CreationTimeout = Timeout(Duration(getMilliseconds("akka.actor.creation-timeout"), MILLISECONDS))
final val SerializeAllMessages = getBoolean("akka.actor.serialize-messages") final val SerializeAllMessages = getBoolean("akka.actor.serialize-messages")

View file

@ -24,9 +24,11 @@ private[akka] case object ChildNameReserved extends ChildStats
* ChildRestartStats is the statistics kept by every parent Actor for every child Actor * ChildRestartStats is the statistics kept by every parent Actor for every child Actor
* and is used for SupervisorStrategies to know how to deal with problems that occur for the children. * and is used for SupervisorStrategies to know how to deal with problems that occur for the children.
*/ */
case class ChildRestartStats(child: ActorRef, var uid: Int = 0, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L) case class ChildRestartStats(child: ActorRef, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L)
extends ChildStats { extends ChildStats {
var uid: Int = 0
//FIXME How about making ChildRestartStats immutable and then move these methods into the actual supervisor strategies? //FIXME How about making ChildRestartStats immutable and then move these methods into the actual supervisor strategies?
def requestRestartPermission(retriesWindow: (Option[Int], Option[Int])): Boolean = def requestRestartPermission(retriesWindow: (Option[Int], Option[Int])): Boolean =
retriesWindow match { retriesWindow match {
@ -61,6 +63,23 @@ case class ChildRestartStats(child: ActorRef, var uid: Int = 0, var maxNrOfRetri
} }
} }
/**
* Implement this interface in order to configure the supervisorStrategy for
* the top-level guardian actor (`/user`). An instance of this class must be
* instantiable using a no-arg constructor.
*/
trait SupervisorStrategyConfigurator {
def create(): SupervisorStrategy
}
final class DefaultSupervisorStrategy extends SupervisorStrategyConfigurator {
override def create(): SupervisorStrategy = SupervisorStrategy.defaultStrategy
}
final class StoppingSupervisorStrategy extends SupervisorStrategyConfigurator {
override def create(): SupervisorStrategy = SupervisorStrategy.stoppingStrategy
}
trait SupervisorStrategyLowPriorityImplicits { this: SupervisorStrategy.type trait SupervisorStrategyLowPriorityImplicits { this: SupervisorStrategy.type
/** /**
@ -133,11 +152,21 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
case _: ActorInitializationException Stop case _: ActorInitializationException Stop
case _: ActorKilledException Stop case _: ActorKilledException Stop
case _: Exception Restart case _: Exception Restart
case _ Escalate
} }
OneForOneStrategy()(defaultDecider) OneForOneStrategy()(defaultDecider)
} }
/**
* This strategy resembles Erlang in that failing children are always
* terminated (one-for-one).
*/
final val stoppingStrategy: SupervisorStrategy = {
def stoppingDecider: Decider = {
case _: Exception Stop
}
OneForOneStrategy()(stoppingDecider)
}
/** /**
* Implicit conversion from `Seq` of Throwables to a `Decider`. * Implicit conversion from `Seq` of Throwables to a `Decider`.
* This maps the given Throwables to restarts, otherwise escalates. * This maps the given Throwables to restarts, otherwise escalates.

View file

@ -108,8 +108,8 @@ private[akka] class RepointableActorRef(
case "" getChild(name) case "" getChild(name)
case other case other
underlying.getChildByName(other) match { underlying.getChildByName(other) match {
case Some(crs) crs.child.asInstanceOf[InternalActorRef].getChild(name) case Some(crs: ChildRestartStats) crs.child.asInstanceOf[InternalActorRef].getChild(name)
case None Nobody case _ Nobody
} }
} }
} else this } else this
@ -147,27 +147,16 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep
* lock, double-tap (well, N-tap, really); concurrent modification is * lock, double-tap (well, N-tap, really); concurrent modification is
* still not possible because were the only thread accessing the queues. * still not possible because were the only thread accessing the queues.
*/ */
var interrupted = false
while (systemQueue.nonEmpty || queue.nonEmpty) { while (systemQueue.nonEmpty || queue.nonEmpty) {
while (systemQueue.nonEmpty) { while (systemQueue.nonEmpty) {
val msg = systemQueue.dequeue() val msg = systemQueue.dequeue()
try cell.sendSystemMessage(msg) cell.sendSystemMessage(msg)
catch {
case _: InterruptedException interrupted = true
}
} }
if (queue.nonEmpty) { if (queue.nonEmpty) {
val envelope = queue.dequeue() val envelope = queue.dequeue()
try cell.tell(envelope.message, envelope.sender) cell.tell(envelope.message, envelope.sender)
catch {
case _: InterruptedException interrupted = true
}
} }
} }
if (interrupted) {
Thread.interrupted() // clear interrupted flag before throwing according to java convention
throw new InterruptedException
}
} finally try } finally try
self.swapCell(cell) self.swapCell(cell)
finally try finally try
@ -223,4 +212,4 @@ private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: Rep
} }
} }
} }

View file

@ -7,11 +7,12 @@ package akka.actor.cell
import scala.annotation.tailrec import scala.annotation.tailrec
import scala.collection.JavaConverters.asJavaIterableConverter import scala.collection.JavaConverters.asJavaIterableConverter
import scala.util.control.NonFatal import scala.util.control.NonFatal
import akka.actor.{ RepointableRef, Props, NoSerializationVerificationNeeded, InvalidActorNameException, InternalActorRef, ChildRestartStats, ActorRef } import akka.actor._
import akka.actor.ActorCell import akka.actor.ActorCell
import akka.actor.ActorPath.ElementRegex import akka.actor.ActorPath.ElementRegex
import akka.serialization.SerializationExtension import akka.serialization.SerializationExtension
import akka.util.{ Unsafe, Helpers } import akka.util.{ Unsafe, Helpers }
import akka.actor.ChildNameReserved
private[akka] trait Children { this: ActorCell private[akka] trait Children { this: ActorCell
@ -61,7 +62,7 @@ private[akka] trait Children { this: ActorCell ⇒
@inline private def swapChildrenRefs(oldChildren: ChildrenContainer, newChildren: ChildrenContainer): Boolean = @inline private def swapChildrenRefs(oldChildren: ChildrenContainer, newChildren: ChildrenContainer): Boolean =
Unsafe.instance.compareAndSwapObject(this, AbstractActorCell.childrenOffset, oldChildren, newChildren) Unsafe.instance.compareAndSwapObject(this, AbstractActorCell.childrenOffset, oldChildren, newChildren)
@tailrec final protected def reserveChild(name: String): Boolean = { @tailrec final def reserveChild(name: String): Boolean = {
val c = childrenRefs val c = childrenRefs
swapChildrenRefs(c, c.reserve(name)) || reserveChild(name) swapChildrenRefs(c, c.reserve(name)) || reserveChild(name)
} }
@ -71,24 +72,16 @@ private[akka] trait Children { this: ActorCell ⇒
swapChildrenRefs(c, c.unreserve(name)) || unreserveChild(name) swapChildrenRefs(c, c.unreserve(name)) || unreserveChild(name)
} }
final protected def addChild(ref: ActorRef): ChildRestartStats = { @tailrec final protected def initChild(ref: ActorRef): Option[ChildRestartStats] =
@tailrec def rec(): ChildRestartStats = { childrenRefs.getByName(ref.path.name) match {
val c = childrenRefs case old @ Some(_: ChildRestartStats) old.asInstanceOf[Option[ChildRestartStats]]
val nc = c.add(ref) case Some(ChildNameReserved)
if (swapChildrenRefs(c, nc)) nc.getByName(ref.path.name).get else rec() val crs = ChildRestartStats(ref)
val name = ref.path.name
val c = childrenRefs
if (swapChildrenRefs(c, c.add(name, crs))) Some(crs) else initChild(ref)
case None None
} }
/*
* This does not need to check getByRef every tailcall, because the change
* cannot happen in that direction as a race: the only entity removing a
* child is the actor itself, and the only entity which could be racing is
* somebody who calls attachChild, and there we are guaranteed that that
* child cannot yet have died (since it has not yet been created).
*/
childrenRefs.getByRef(ref) match {
case Some(old) old
case None rec()
}
}
@tailrec final protected def shallDie(ref: ActorRef): Boolean = { @tailrec final protected def shallDie(ref: ActorRef): Boolean = {
val c = childrenRefs val c = childrenRefs
@ -127,17 +120,17 @@ private[akka] trait Children { this: ActorCell ⇒
protected def suspendChildren(exceptFor: Set[ActorRef] = Set.empty): Unit = protected def suspendChildren(exceptFor: Set[ActorRef] = Set.empty): Unit =
childrenRefs.stats foreach { childrenRefs.stats foreach {
case ChildRestartStats(child, _, _, _) if !(exceptFor contains child) child.asInstanceOf[InternalActorRef].suspend() case ChildRestartStats(child, _, _) if !(exceptFor contains child) child.asInstanceOf[InternalActorRef].suspend()
case _ case _
} }
protected def resumeChildren(causedByFailure: Throwable, perp: ActorRef): Unit = protected def resumeChildren(causedByFailure: Throwable, perp: ActorRef): Unit =
childrenRefs.stats foreach { childrenRefs.stats foreach {
case ChildRestartStats(child: InternalActorRef, _, _, _) case ChildRestartStats(child: InternalActorRef, _, _)
child.resume(if (perp == child) causedByFailure else null) child.resume(if (perp == child) causedByFailure else null)
} }
def getChildByName(name: String): Option[ChildRestartStats] = childrenRefs.getByName(name) def getChildByName(name: String): Option[ChildStats] = childrenRefs.getByName(name)
protected def getChildByRef(ref: ActorRef): Option[ChildRestartStats] = childrenRefs.getByRef(ref) protected def getChildByRef(ref: ActorRef): Option[ChildRestartStats] = childrenRefs.getByRef(ref)
@ -197,7 +190,7 @@ private[akka] trait Children { this: ActorCell ⇒
} }
// mailbox==null during RoutedActorCell constructor, where suspends are queued otherwise // mailbox==null during RoutedActorCell constructor, where suspends are queued otherwise
if (mailbox ne null) for (_ 1 to mailbox.suspendCount) actor.suspend() if (mailbox ne null) for (_ 1 to mailbox.suspendCount) actor.suspend()
addChild(actor) initChild(actor)
actor actor
} }
} }

View file

@ -14,10 +14,10 @@ import akka.dispatch.SystemMessage
*/ */
private[akka] trait ChildrenContainer { private[akka] trait ChildrenContainer {
def add(child: ActorRef): ChildrenContainer def add(name: String, stats: ChildRestartStats): ChildrenContainer
def remove(child: ActorRef): ChildrenContainer def remove(child: ActorRef): ChildrenContainer
def getByName(name: String): Option[ChildRestartStats] def getByName(name: String): Option[ChildStats]
def getByRef(actor: ActorRef): Option[ChildRestartStats] def getByRef(actor: ActorRef): Option[ChildRestartStats]
def children: Iterable[ActorRef] def children: Iterable[ActorRef]
@ -50,8 +50,7 @@ private[akka] object ChildrenContainer {
trait EmptyChildrenContainer extends ChildrenContainer { trait EmptyChildrenContainer extends ChildrenContainer {
val emptyStats = TreeMap.empty[String, ChildStats] val emptyStats = TreeMap.empty[String, ChildStats]
override def add(child: ActorRef): ChildrenContainer = override def add(name: String, stats: ChildRestartStats): ChildrenContainer = new NormalChildrenContainer(emptyStats.updated(name, stats))
new NormalChildrenContainer(emptyStats.updated(child.path.name, ChildRestartStats(child)))
override def remove(child: ActorRef): ChildrenContainer = this override def remove(child: ActorRef): ChildrenContainer = this
override def getByName(name: String): Option[ChildRestartStats] = None override def getByName(name: String): Option[ChildRestartStats] = None
override def getByRef(actor: ActorRef): Option[ChildRestartStats] = None override def getByRef(actor: ActorRef): Option[ChildRestartStats] = None
@ -75,7 +74,7 @@ private[akka] object ChildrenContainer {
* empty state while calling handleChildTerminated() for the last time. * empty state while calling handleChildTerminated() for the last time.
*/ */
object TerminatedChildrenContainer extends EmptyChildrenContainer { object TerminatedChildrenContainer extends EmptyChildrenContainer {
override def add(child: ActorRef): ChildrenContainer = this override def add(name: String, stats: ChildRestartStats): ChildrenContainer = this
override def reserve(name: String): ChildrenContainer = override def reserve(name: String): ChildrenContainer =
throw new IllegalStateException("cannot reserve actor name '" + name + "': already terminated") throw new IllegalStateException("cannot reserve actor name '" + name + "': already terminated")
override def isTerminating: Boolean = true override def isTerminating: Boolean = true
@ -91,22 +90,18 @@ private[akka] object ChildrenContainer {
*/ */
class NormalChildrenContainer(val c: TreeMap[String, ChildStats]) extends ChildrenContainer { class NormalChildrenContainer(val c: TreeMap[String, ChildStats]) extends ChildrenContainer {
override def add(child: ActorRef): ChildrenContainer = override def add(name: String, stats: ChildRestartStats): ChildrenContainer = new NormalChildrenContainer(c.updated(name, stats))
new NormalChildrenContainer(c.updated(child.path.name, ChildRestartStats(child)))
override def remove(child: ActorRef): ChildrenContainer = NormalChildrenContainer(c - child.path.name) override def remove(child: ActorRef): ChildrenContainer = NormalChildrenContainer(c - child.path.name)
override def getByName(name: String): Option[ChildRestartStats] = c.get(name) match { override def getByName(name: String): Option[ChildStats] = c.get(name)
case s @ Some(_: ChildRestartStats) s.asInstanceOf[Option[ChildRestartStats]]
case _ None
}
override def getByRef(actor: ActorRef): Option[ChildRestartStats] = c.get(actor.path.name) match { override def getByRef(actor: ActorRef): Option[ChildRestartStats] = c.get(actor.path.name) match {
case c @ Some(crs: ChildRestartStats) if (crs.child == actor) c.asInstanceOf[Option[ChildRestartStats]] case c @ Some(crs: ChildRestartStats) if (crs.child == actor) c.asInstanceOf[Option[ChildRestartStats]]
case _ None case _ None
} }
override def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _, _) child } override def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _) child }
override def stats: Iterable[ChildRestartStats] = c.values.view.collect { case c: ChildRestartStats c } override def stats: Iterable[ChildRestartStats] = c.values.view.collect { case c: ChildRestartStats c }
@ -146,7 +141,7 @@ private[akka] object ChildrenContainer {
case class TerminatingChildrenContainer(c: TreeMap[String, ChildStats], toDie: Set[ActorRef], reason: SuspendReason) case class TerminatingChildrenContainer(c: TreeMap[String, ChildStats], toDie: Set[ActorRef], reason: SuspendReason)
extends ChildrenContainer { extends ChildrenContainer {
override def add(child: ActorRef): ChildrenContainer = copy(c.updated(child.path.name, ChildRestartStats(child))) override def add(name: String, stats: ChildRestartStats): ChildrenContainer = copy(c.updated(name, stats))
override def remove(child: ActorRef): ChildrenContainer = { override def remove(child: ActorRef): ChildrenContainer = {
val t = toDie - child val t = toDie - child
@ -157,17 +152,14 @@ private[akka] object ChildrenContainer {
else copy(c - child.path.name, t) else copy(c - child.path.name, t)
} }
override def getByName(name: String): Option[ChildRestartStats] = c.get(name) match { override def getByName(name: String): Option[ChildStats] = c.get(name)
case s @ Some(_: ChildRestartStats) s.asInstanceOf[Option[ChildRestartStats]]
case _ None
}
override def getByRef(actor: ActorRef): Option[ChildRestartStats] = c.get(actor.path.name) match { override def getByRef(actor: ActorRef): Option[ChildRestartStats] = c.get(actor.path.name) match {
case c @ Some(crs: ChildRestartStats) if (crs.child == actor) c.asInstanceOf[Option[ChildRestartStats]] case c @ Some(crs: ChildRestartStats) if (crs.child == actor) c.asInstanceOf[Option[ChildRestartStats]]
case _ None case _ None
} }
override def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _, _) child } override def children: Iterable[ActorRef] = c.values.view.collect { case ChildRestartStats(child, _, _) child }
override def stats: Iterable[ChildRestartStats] = c.values.view.collect { case c: ChildRestartStats c } override def stats: Iterable[ChildRestartStats] = c.values.view.collect { case c: ChildRestartStats c }

View file

@ -7,7 +7,9 @@ package akka.actor.cell
import scala.annotation.tailrec import scala.annotation.tailrec
import akka.actor.{ ActorRef, ActorCell } import akka.actor.{ ActorRef, ActorCell }
import akka.dispatch.{ Terminate, SystemMessage, Suspend, Resume, Recreate, MessageDispatcher, Mailbox, Envelope, Create } import akka.dispatch.{ Terminate, SystemMessage, Suspend, Resume, Recreate, MessageDispatcher, Mailbox, Envelope, Create }
import akka.event.Logging.Error
import akka.util.Unsafe import akka.util.Unsafe
import scala.util.control.NonFatal
private[akka] trait Dispatch { this: ActorCell private[akka] trait Dispatch { this: ActorCell
@ -63,20 +65,49 @@ private[akka] trait Dispatch { this: ActorCell ⇒
} }
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def suspend(): Unit = dispatcher.systemDispatch(this, Suspend()) final def suspend(): Unit =
try dispatcher.systemDispatch(this, Suspend())
catch {
case e @ (_: InterruptedException | NonFatal(_))
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send"))
}
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def resume(causedByFailure: Throwable): Unit = dispatcher.systemDispatch(this, Resume(causedByFailure)) final def resume(causedByFailure: Throwable): Unit =
try dispatcher.systemDispatch(this, Resume(causedByFailure))
catch {
case e @ (_: InterruptedException | NonFatal(_))
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send"))
}
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def restart(cause: Throwable): Unit = dispatcher.systemDispatch(this, Recreate(cause)) final def restart(cause: Throwable): Unit =
try dispatcher.systemDispatch(this, Recreate(cause))
catch {
case e @ (_: InterruptedException | NonFatal(_))
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send"))
}
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
final def stop(): Unit = dispatcher.systemDispatch(this, Terminate()) final def stop(): Unit =
try dispatcher.systemDispatch(this, Terminate())
catch {
case e @ (_: InterruptedException | NonFatal(_))
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send"))
}
def tell(message: Any, sender: ActorRef): Unit = def tell(message: Any, sender: ActorRef): Unit =
dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender, system)) try dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender, system))
catch {
case e @ (_: InterruptedException | NonFatal(_))
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send"))
}
override def sendSystemMessage(message: SystemMessage): Unit = dispatcher.systemDispatch(this, message) override def sendSystemMessage(message: SystemMessage): Unit =
try dispatcher.systemDispatch(this, message)
catch {
case e @ (_: InterruptedException | NonFatal(_))
system.eventStream.publish(Error(e, self.path.toString, clazz(actor), "swallowing exception during message send"))
}
} }

View file

@ -204,7 +204,11 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
*/ */
case Some(stats) if stats.uid == uid case Some(stats) if stats.uid == uid
if (!actor.supervisorStrategy.handleFailure(this, child, cause, stats, getAllChildStats)) throw cause if (!actor.supervisorStrategy.handleFailure(this, child, cause, stats, getAllChildStats)) throw cause
case _ publish(Debug(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child)) case Some(stats)
publish(Debug(self.path.toString, clazz(actor),
"dropping Failed(" + cause + ") from old child " + child + " (uid=" + stats.uid + " != " + uid + ")"))
case None
publish(Debug(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child))
} }
final protected def handleChildTerminated(child: ActorRef): SystemMessage = { final protected def handleChildTerminated(child: ActorRef): SystemMessage = {

View file

@ -7,8 +7,10 @@ package akka.remote
import akka.actor._ import akka.actor._
import akka.dispatch._ import akka.dispatch._
import akka.event.{ Logging, LoggingAdapter, EventStream } import akka.event.{ Logging, LoggingAdapter, EventStream }
import akka.event.Logging.Error
import akka.serialization.{ Serialization, SerializationExtension } import akka.serialization.{ Serialization, SerializationExtension }
import scala.concurrent.Future import scala.concurrent.Future
import scala.util.control.NonFatal
/** /**
* Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it. * Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it.
@ -24,7 +26,7 @@ class RemoteActorRefProvider(
val deployer: RemoteDeployer = new RemoteDeployer(settings, dynamicAccess) val deployer: RemoteDeployer = new RemoteDeployer(settings, dynamicAccess)
private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, deployer) private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, dynamicAccess, deployer)
@volatile @volatile
private var _log = local.log private var _log = local.log
@ -229,9 +231,19 @@ private[akka] class RemoteActorRef private[akka] (
def isTerminated: Boolean = !running def isTerminated: Boolean = !running
def sendSystemMessage(message: SystemMessage): Unit = remote.send(message, None, this) def sendSystemMessage(message: SystemMessage): Unit =
try remote.send(message, None, this)
catch {
case e @ (_: InterruptedException | NonFatal(_))
remote.system.eventStream.publish(Error(e, path.toString, classOf[RemoteActorRef], "swallowing exception during message send"))
}
override def !(message: Any)(implicit sender: ActorRef = null): Unit = remote.send(message, Option(sender), this) override def !(message: Any)(implicit sender: ActorRef = null): Unit =
try remote.send(message, Option(sender), this)
catch {
case e @ (_: InterruptedException | NonFatal(_))
remote.system.eventStream.publish(Error(e, path.toString, classOf[RemoteActorRef], "swallowing exception during message send"))
}
def suspend(): Unit = sendSystemMessage(Suspend()) def suspend(): Unit = sendSystemMessage(Suspend())

View file

@ -25,7 +25,17 @@ class TestActorRef[T <: Actor](
_props: Props, _props: Props,
_supervisor: InternalActorRef, _supervisor: InternalActorRef,
name: String) name: String)
extends LocalActorRef( extends {
private val disregard = _supervisor match {
case l: LocalActorRef l.underlying.reserveChild(name)
case r: RepointableActorRef r.underlying match {
case u: UnstartedCell throw new IllegalStateException("cannot attach a TestActor to an unstarted top-level actor, ensure that it is started by sending a message and observing the reply")
case c: ActorCell c.reserveChild(name)
case o _system.log.error("trying to attach child {} to unknown type of supervisor cell {}, this is not going to end well", name, o.getClass)
}
case s _system.log.error("trying to attach child {} to unknown type of supervisor {}, this is not going to end well", name, s.getClass)
}
} with LocalActorRef(
_system, _system,
_props.withDispatcher( _props.withDispatcher(
if (_props.dispatcher == Dispatchers.DefaultDispatcherId) CallingThreadDispatcher.Id if (_props.dispatcher == Dispatchers.DefaultDispatcherId) CallingThreadDispatcher.Id
@ -119,8 +129,9 @@ object TestActorRef {
def apply[T <: Actor](props: Props, name: String)(implicit system: ActorSystem): TestActorRef[T] = def apply[T <: Actor](props: Props, name: String)(implicit system: ActorSystem): TestActorRef[T] =
apply[T](props, system.asInstanceOf[ActorSystemImpl].guardian, name) apply[T](props, system.asInstanceOf[ActorSystemImpl].guardian, name)
def apply[T <: Actor](props: Props, supervisor: ActorRef, name: String)(implicit system: ActorSystem): TestActorRef[T] = def apply[T <: Actor](props: Props, supervisor: ActorRef, name: String)(implicit system: ActorSystem): TestActorRef[T] = {
new TestActorRef(system.asInstanceOf[ActorSystemImpl], system.dispatchers.prerequisites, props, supervisor.asInstanceOf[InternalActorRef], name) new TestActorRef(system.asInstanceOf[ActorSystemImpl], system.dispatchers.prerequisites, props, supervisor.asInstanceOf[InternalActorRef], name)
}
def apply[T <: Actor](implicit t: ClassTag[T], system: ActorSystem): TestActorRef[T] = apply[T](randomName) def apply[T <: Actor](implicit t: ClassTag[T], system: ActorSystem): TestActorRef[T] = apply[T](randomName)

View file

@ -14,6 +14,7 @@ import java.lang.{ Iterable ⇒ JIterable }
import scala.collection.JavaConverters import scala.collection.JavaConverters
import scala.concurrent.util.Duration import scala.concurrent.util.Duration
import scala.reflect.ClassTag import scala.reflect.ClassTag
import akka.actor.NoSerializationVerificationNeeded
/** /**
* Implementation helpers of the EventFilter facilities: send `Mute` * Implementation helpers of the EventFilter facilities: send `Mute`
@ -39,7 +40,7 @@ object TestEvent {
object Mute { object Mute {
def apply(filter: EventFilter, filters: EventFilter*): Mute = new Mute(filter +: filters.toSeq) def apply(filter: EventFilter, filters: EventFilter*): Mute = new Mute(filter +: filters.toSeq)
} }
case class Mute(filters: Seq[EventFilter]) extends TestEvent { case class Mute(filters: Seq[EventFilter]) extends TestEvent with NoSerializationVerificationNeeded {
/** /**
* Java API * Java API
*/ */
@ -48,7 +49,7 @@ object TestEvent {
object UnMute { object UnMute {
def apply(filter: EventFilter, filters: EventFilter*): UnMute = new UnMute(filter +: filters.toSeq) def apply(filter: EventFilter, filters: EventFilter*): UnMute = new UnMute(filter +: filters.toSeq)
} }
case class UnMute(filters: Seq[EventFilter]) extends TestEvent { case class UnMute(filters: Seq[EventFilter]) extends TestEvent with NoSerializationVerificationNeeded {
/** /**
* Java API * Java API
*/ */