Extends supervision strategy to define custom log levels #30267 (#30268)

* Adds logLevel in Directive
* Extends supervision logging documentation
* Adds sealed and marks as internal
This commit is contained in:
Nicolas Vollmar 2021-05-31 13:56:42 +02:00 committed by GitHub
parent 3986b1c3bb
commit 622d8af0ef
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 78 additions and 29 deletions

View file

@ -0,0 +1,2 @@
# sealed trait
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.SupervisorStrategy#Directive.logLevel")

View file

@ -4,24 +4,22 @@
package akka.actor
import java.lang.{ Iterable => JIterable }
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.TimeUnit
import scala.collection.immutable
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal
import language.implicitConversions
import akka.event.Logging.Error
import akka.event.Logging.LogEvent
import akka.event.Logging.Warning
import akka.annotation.InternalApi
import akka.event.Logging
import akka.event.Logging.{ Error, LogEvent, LogLevel }
import akka.japi.Util.immutableSeq
import akka.util.JavaDurationConverters._
import akka.util.ccompat._
import java.lang.reflect.InvocationTargetException
import java.lang.{ Iterable => JIterable }
import java.util.concurrent.TimeUnit
import scala.collection.immutable
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration.Duration
import scala.language.implicitConversions
import scala.util.control.NonFatal
/**
* INTERNAL API
*/
@ -49,7 +47,7 @@ final case class ChildRestartStats(
def requestRestartPermission(retriesWindow: (Option[Int], Option[Int])): Boolean =
retriesWindow match {
case (Some(retries), _) if retries < 1 => false
case (Some(retries), None) => { maxNrOfRetriesCount += 1; maxNrOfRetriesCount <= retries }
case (Some(retries), None) => maxNrOfRetriesCount += 1; maxNrOfRetriesCount <= retries
case (x, Some(window)) => retriesInWindowOkay(if (x.isDefined) x.get else 1, window)
case (None, _) => true
}
@ -106,53 +104,97 @@ trait SupervisorStrategyLowPriorityImplicits { this: SupervisorStrategy.type =>
}
object SupervisorStrategy extends SupervisorStrategyLowPriorityImplicits {
sealed trait Directive
sealed trait Directive {
/** INTERNAL API */
@InternalApi
private[akka] def logLevel: LogLevel
}
/**
* Resumes message processing for the failed Actor
*/
case object Resume extends Directive
case object Resume extends Resume(Logging.WarningLevel)
/**
* INTERNAL API
*/
@InternalApi
private[akka] sealed class Resume(private[akka] val logLevel: LogLevel) extends Directive
/**
* Discards the old Actor instance and replaces it with a new,
* then resumes message processing.
*/
case object Restart extends Directive
case object Restart extends Restart(Logging.ErrorLevel)
/**
* INTERNAL API
*/
@InternalApi
private[akka] sealed class Restart(private[akka] val logLevel: LogLevel) extends Directive
/**
* Stops the Actor
*/
case object Stop extends Directive
case object Stop extends Stop(Logging.ErrorLevel)
@InternalApi
private[akka] sealed class Stop(private[akka] val logLevel: LogLevel) extends Directive
/**
* Escalates the failure to the supervisor of the supervisor,
* by rethrowing the cause of the failure, i.e. the supervisor fails with
* the same exception as the child.
*/
case object Escalate extends Directive
case object Escalate extends Directive {
override private[akka] def logLevel = throw new IllegalStateException("Escalate is not logged")
}
/**
* Java API: Returning this directive resumes message processing for the failed Actor
*/
def resume = Resume
def resume = Resume // switch to return type `Directive` on next binary incompatible release
/**
* Returning this directive resumes message processing for the failed Actor.
*
* @param logLevel Log level which will be used to log the failure
*/
def resume(logLevel: LogLevel): Directive = new Resume(logLevel)
/**
* Java API: Returning this directive discards the old Actor instance and replaces it with a new,
* then resumes message processing.
*/
def restart = Restart
def restart = Restart // switch to return type `Directive` on next binary incompatible release
/**
* Returning this directive discards the old Actor instance and replaces it with a new,
* then resumes message processing.
*
* @param logLevel Log level which will be used to log the failure
*/
def restart(logLevel: LogLevel): Directive = new Restart(logLevel)
/**
* Java API: Returning this directive stops the Actor
*/
def stop = Stop
def stop = Stop // switch to return type `Directive` on next binary incompatible release
/**
* Returning this directive stops the Actor
*
* @param logLevel Log level which will be used to log the failure
*/
def stop(logLevel: LogLevel): Directive = new Stop(logLevel)
/**
* Java API: Returning this directive escalates the failure to the supervisor of the supervisor,
* by rethrowing the cause of the failure, i.e. the supervisor fails with
* the same exception as the child.
*/
def escalate = Escalate
def escalate = Escalate // switch to return type `Directive` on next binary incompatible release
/**
* When supervisorStrategy is not specified for an actor this
@ -317,15 +359,15 @@ abstract class SupervisorStrategy {
children: Iterable[ChildRestartStats]): Boolean = {
val directive = decider.applyOrElse(cause, escalateDefault)
directive match {
case Resume =>
case _: Resume =>
logFailure(context, child, cause, directive)
resumeChild(child, cause)
true
case Restart =>
case _: Restart =>
logFailure(context, child, cause, directive)
processFailure(context, true, child, cause, stats, children)
true
case Stop =>
case _: Stop =>
logFailure(context, child, cause, directive)
processFailure(context, false, child, cause, stats, children)
true
@ -359,9 +401,12 @@ abstract class SupervisorStrategy {
case e => e.getMessage
}
decision match {
case Resume => publish(context, Warning(child.path.toString, getClass, logMessage))
case Escalate => // don't log here
case _ => publish(context, Error(cause, child.path.toString, getClass, logMessage))
case d =>
if (d.logLevel == Logging.ErrorLevel)
publish(context, Error(cause, child.path.toString, getClass, logMessage))
else
publish(context, LogEvent(d.logLevel, child.path.toString, getClass, logMessage))
}
}

View file

@ -119,6 +119,8 @@ By default the `SupervisorStrategy` logs failures unless they are escalated.
Escalated failures are supposed to be handled, and potentially logged, at a level
higher in the hierarchy.
Log levels can be controlled by providing a `Decider` and using the appropriate decision methods accepting a `LogLevel` on `SupervisorStrategy`.
You can mute the default logging of a `SupervisorStrategy` by setting
`loggingEnabled` to `false` when instantiating it. Customized logging
can be done inside the `Decider`. Note that the reference to the currently