make guardian supervisorStrategy configurable, see #2376
This commit is contained in:
parent
a9b1158bde
commit
f05447408c
5 changed files with 64 additions and 74 deletions
|
|
@ -46,7 +46,14 @@ akka {
|
|||
|
||||
actor {
|
||||
|
||||
# FQCN of the ActorRefProvider to be used; the below is the built-in default,
|
||||
# another one is akka.remote.RemoteActorRefProvider in the akka-remote bundle.
|
||||
provider = "akka.actor.LocalActorRefProvider"
|
||||
|
||||
# The guardian "/user" will use this subclass of akka.actor.SupervisorStrategyConfigurator
|
||||
# to obtain its supervisorstrategy. Besides the default there is
|
||||
# akka.actor.StoppingSupervisorStrategy
|
||||
guardian-supervisor-strategy = "akka.actor.DefaultSupervisorStrategy"
|
||||
|
||||
# Timeout for ActorSystem.actorOf
|
||||
creation-timeout = 20s
|
||||
|
|
|
|||
|
|
@ -294,16 +294,6 @@ trait ActorRefFactory {
|
|||
def stop(actor: ActorRef): Unit
|
||||
}
|
||||
|
||||
/**
|
||||
* Internal Akka use only, used in implementation of system.actorOf.
|
||||
*/
|
||||
private[akka] case class CreateChild(props: Props, name: String)
|
||||
|
||||
/**
|
||||
* Internal Akka use only, used in implementation of system.actorOf.
|
||||
*/
|
||||
private[akka] case class CreateRandomNameChild(props: Props)
|
||||
|
||||
/**
|
||||
* Internal Akka use only, used in implementation of system.stop(child).
|
||||
*/
|
||||
|
|
@ -317,6 +307,7 @@ class LocalActorRefProvider(
|
|||
override val settings: ActorSystem.Settings,
|
||||
val eventStream: EventStream,
|
||||
override val scheduler: Scheduler,
|
||||
val dynamicAccess: DynamicAccess,
|
||||
override val deployer: Deployer) extends ActorRefProvider {
|
||||
|
||||
// this is the constructor needed for reflectively instantiating the provider
|
||||
|
|
@ -329,6 +320,7 @@ class LocalActorRefProvider(
|
|||
settings,
|
||||
eventStream,
|
||||
scheduler,
|
||||
dynamicAccess,
|
||||
new Deployer(settings, dynamicAccess))
|
||||
|
||||
override val rootPath: ActorPath = RootActorPath(Address("akka", _systemName))
|
||||
|
|
@ -380,65 +372,12 @@ class LocalActorRefProvider(
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Overridable supervision strategy to be used by the “/user” guardian.
|
||||
*/
|
||||
protected def guardianSupervisionStrategy: SupervisorStrategy = {
|
||||
import akka.actor.SupervisorStrategy._
|
||||
OneForOneStrategy() {
|
||||
case _: ActorKilledException ⇒ Stop
|
||||
case _: ActorInitializationException ⇒ Stop
|
||||
case _: Exception ⇒ Restart
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Guardians can be asked by ActorSystem to create children, i.e. top-level
|
||||
* actors. Therefore these need to answer to these requests, forwarding any
|
||||
* exceptions which might have occurred.
|
||||
*/
|
||||
private class Guardian extends Actor {
|
||||
|
||||
override val supervisorStrategy: SupervisorStrategy = guardianSupervisionStrategy
|
||||
private class Guardian(override val supervisorStrategy: SupervisorStrategy) extends Actor {
|
||||
|
||||
def receive = {
|
||||
case Terminated(_) ⇒ context.stop(self)
|
||||
case CreateChild(child, name) ⇒ sender ! (try context.actorOf(child, name) catch { case NonFatal(e) ⇒ Status.Failure(e) })
|
||||
case CreateRandomNameChild(child) ⇒ sender ! (try context.actorOf(child) catch { case NonFatal(e) ⇒ Status.Failure(e) })
|
||||
case StopChild(child) ⇒ context.stop(child)
|
||||
case m ⇒ deadLetters ! DeadLetter(m, sender, self)
|
||||
}
|
||||
|
||||
// guardian MUST NOT lose its children during restart
|
||||
override def preRestart(cause: Throwable, msg: Option[Any]) {}
|
||||
}
|
||||
|
||||
/**
|
||||
* Overridable supervision strategy to be used by the “/system” guardian.
|
||||
*/
|
||||
protected def systemGuardianSupervisionStrategy: SupervisorStrategy = {
|
||||
import akka.actor.SupervisorStrategy._
|
||||
OneForOneStrategy() {
|
||||
case _: ActorKilledException | _: ActorInitializationException ⇒ Stop
|
||||
case _: Exception ⇒ Restart
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Guardians can be asked by ActorSystem to create children, i.e. top-level
|
||||
* actors. Therefore these need to answer to these requests, forwarding any
|
||||
* exceptions which might have occurred.
|
||||
*/
|
||||
private class SystemGuardian extends Actor {
|
||||
|
||||
override val supervisorStrategy: SupervisorStrategy = systemGuardianSupervisionStrategy
|
||||
|
||||
def receive = {
|
||||
case Terminated(_) ⇒ eventStream.stopDefaultLoggers(); context.stop(self)
|
||||
case CreateChild(child, name) ⇒ sender ! (try context.actorOf(child, name) catch { case NonFatal(e) ⇒ Status.Failure(e) })
|
||||
case CreateRandomNameChild(child) ⇒ sender ! (try context.actorOf(child) catch { case NonFatal(e) ⇒ Status.Failure(e) })
|
||||
case StopChild(child) ⇒ context.stop(child); sender ! "ok"
|
||||
case m ⇒ deadLetters ! DeadLetter(m, sender, self)
|
||||
case Terminated(_) ⇒ if (context.self.path.name == "system") eventStream.stopDefaultLoggers(); context.stop(self)
|
||||
case StopChild(child) ⇒ context.stop(child)
|
||||
case m ⇒ deadLetters ! DeadLetter(m, sender, self)
|
||||
}
|
||||
|
||||
// guardian MUST NOT lose its children during restart
|
||||
|
|
@ -472,10 +411,26 @@ class LocalActorRefProvider(
|
|||
*/
|
||||
def registerExtraNames(_extras: Map[String, InternalActorRef]): Unit = extraNames ++= _extras
|
||||
|
||||
private val guardianProps = Props(new Guardian)
|
||||
private lazy val guardianSupervisorStrategyConfigurator =
|
||||
dynamicAccess.createInstanceFor[SupervisorStrategyConfigurator](settings.SupervisorStrategyClass, Seq()).fold(throw _, x ⇒ x)
|
||||
|
||||
/**
|
||||
* Overridable supervision strategy to be used by the “/user” guardian.
|
||||
*/
|
||||
protected def rootGuardianStrategy: SupervisorStrategy = SupervisorStrategy.stoppingStrategy
|
||||
|
||||
/**
|
||||
* Overridable supervision strategy to be used by the “/user” guardian.
|
||||
*/
|
||||
protected def guardianStrategy: SupervisorStrategy = guardianSupervisorStrategyConfigurator.create()
|
||||
|
||||
/**
|
||||
* Overridable supervision strategy to be used by the “/user” guardian.
|
||||
*/
|
||||
protected def systemGuardianStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy
|
||||
|
||||
lazy val rootGuardian: InternalActorRef =
|
||||
new LocalActorRef(system, guardianProps, theOneWhoWalksTheBubblesOfSpaceTime, rootPath) {
|
||||
new LocalActorRef(system, Props(new Guardian(rootGuardianStrategy)), theOneWhoWalksTheBubblesOfSpaceTime, rootPath) {
|
||||
override def getParent: InternalActorRef = this
|
||||
override def getSingleChild(name: String): InternalActorRef = name match {
|
||||
case "temp" ⇒ tempContainer
|
||||
|
|
@ -483,10 +438,11 @@ class LocalActorRefProvider(
|
|||
}
|
||||
}
|
||||
|
||||
lazy val guardian: LocalActorRef = new LocalActorRef(system, guardianProps, rootGuardian, rootPath / "user")
|
||||
lazy val guardian: LocalActorRef =
|
||||
new LocalActorRef(system, Props(new Guardian(guardianStrategy)), rootGuardian, rootPath / "user")
|
||||
|
||||
lazy val systemGuardian: LocalActorRef =
|
||||
new LocalActorRef(system, guardianProps.withCreator(new SystemGuardian), rootGuardian, rootPath / "system")
|
||||
new LocalActorRef(system, Props(new Guardian(systemGuardianStrategy)), rootGuardian, rootPath / "system")
|
||||
|
||||
lazy val tempContainer = new VirtualPathContainer(system.provider, tempNode, rootGuardian, log)
|
||||
|
||||
|
|
@ -559,4 +515,3 @@ class LocalActorRefProvider(
|
|||
|
||||
def getExternalAddressFor(addr: Address): Option[Address] = if (addr == rootPath.address) Some(addr) else None
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -133,6 +133,7 @@ object ActorSystem {
|
|||
|
||||
final val ConfigVersion = getString("akka.version")
|
||||
final val ProviderClass = getString("akka.actor.provider")
|
||||
final val SupervisorStrategyClass = getString("akka.actor.guardian-supervisor-strategy")
|
||||
final val CreationTimeout = Timeout(Duration(getMilliseconds("akka.actor.creation-timeout"), MILLISECONDS))
|
||||
|
||||
final val SerializeAllMessages = getBoolean("akka.actor.serialize-messages")
|
||||
|
|
|
|||
|
|
@ -61,6 +61,23 @@ case class ChildRestartStats(child: ActorRef, var uid: Int = 0, var maxNrOfRetri
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Implement this interface in order to configure the supervisorStrategy for
|
||||
* the top-level guardian actor (`/user`). An instance of this class must be
|
||||
* instantiable using a no-arg constructor.
|
||||
*/
|
||||
trait SupervisorStrategyConfigurator {
|
||||
def create(): SupervisorStrategy
|
||||
}
|
||||
|
||||
class DefaultSupervisorStrategy extends SupervisorStrategyConfigurator {
|
||||
override def create(): SupervisorStrategy = SupervisorStrategy.defaultStrategy
|
||||
}
|
||||
|
||||
class StoppingSupervisorStrategy extends SupervisorStrategyConfigurator {
|
||||
override def create(): SupervisorStrategy = SupervisorStrategy.stoppingStrategy
|
||||
}
|
||||
|
||||
trait SupervisorStrategyLowPriorityImplicits { this: SupervisorStrategy.type ⇒
|
||||
|
||||
/**
|
||||
|
|
@ -133,11 +150,21 @@ object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
|
|||
case _: ActorInitializationException ⇒ Stop
|
||||
case _: ActorKilledException ⇒ Stop
|
||||
case _: Exception ⇒ Restart
|
||||
case _ ⇒ Escalate
|
||||
}
|
||||
OneForOneStrategy()(defaultDecider)
|
||||
}
|
||||
|
||||
/**
|
||||
* This strategy resembles Erlang in that failing children are always
|
||||
* terminated.
|
||||
*/
|
||||
final val stoppingStrategy: SupervisorStrategy = {
|
||||
def stoppingDecider: Decider = {
|
||||
case _: Exception ⇒ Stop
|
||||
}
|
||||
OneForOneStrategy()(stoppingDecider)
|
||||
}
|
||||
|
||||
/**
|
||||
* Implicit conversion from `Seq` of Throwables to a `Decider`.
|
||||
* This maps the given Throwables to restarts, otherwise escalates.
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ private[akka] class RemoteActorRefProvider(
|
|||
|
||||
val deployer: RemoteDeployer = new RemoteDeployer(settings, dynamicAccess)
|
||||
|
||||
private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, deployer)
|
||||
private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, dynamicAccess, deployer)
|
||||
|
||||
@volatile
|
||||
private var _log = local.log
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue