parent
273192f2a9
commit
e21f20b592
5 changed files with 85 additions and 7 deletions
|
|
@ -1292,6 +1292,41 @@ class SupervisionSpec extends ScalaTestWithActorTestKit("""
|
|||
}
|
||||
}
|
||||
|
||||
"log at critical level after specified restartWithBackoff attempts" in {
|
||||
val probe = TestProbe[Event]("evt")
|
||||
val behv = Behaviors
|
||||
.supervise(targetBehavior(probe.ref))
|
||||
.onFailure[Exc1](
|
||||
SupervisorStrategy
|
||||
.restartWithBackoff(1.millis, 1.millis, 0.0)
|
||||
.withResetBackoffAfter(2.seconds)
|
||||
.withLogLevel(Level.INFO)
|
||||
.withCriticalLogLevel(Level.ERROR, 3))
|
||||
val ref = spawn(behv)
|
||||
LoggingTestKit.info("exc-1").withOccurrences(3).expect {
|
||||
ref ! Throw(new Exc1)
|
||||
probe.expectMessage(ReceivedSignal(PreRestart))
|
||||
ref ! Throw(new Exc1)
|
||||
probe.expectMessage(ReceivedSignal(PreRestart))
|
||||
ref ! Throw(new Exc1)
|
||||
probe.expectMessage(ReceivedSignal(PreRestart))
|
||||
}
|
||||
LoggingTestKit.error("exc-1").withOccurrences(2).expect {
|
||||
ref ! Throw(new Exc1)
|
||||
probe.expectMessage(ReceivedSignal(PreRestart))
|
||||
ref ! Throw(new Exc1)
|
||||
probe.expectMessage(ReceivedSignal(PreRestart))
|
||||
}
|
||||
|
||||
// reset backoff 2 seconds
|
||||
probe.expectNoMessage(2100.millis)
|
||||
LoggingTestKit.info("exc-1").expect {
|
||||
ref ! Throw(new Exc1)
|
||||
probe.expectMessage(ReceivedSignal(PreRestart))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
"handle exceptions from different message type" in {
|
||||
val probe = TestProbe[Event]("evt")
|
||||
|
||||
|
|
|
|||
|
|
@ -484,7 +484,7 @@ class UnstashingSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with
|
|||
ref ! "stash"
|
||||
LoggingTestKit
|
||||
.error[TestException]
|
||||
.withMessageContains("Supervisor RestartSupervisor saw failure: unstash-fail")
|
||||
.withMessageRegex("Supervisor RestartSupervisor saw failure.*: unstash-fail")
|
||||
.expect {
|
||||
ref ! "unstash"
|
||||
// when childLatch is defined this be stashed in the internal stash of the RestartSupervisor
|
||||
|
|
|
|||
|
|
@ -0,0 +1,4 @@
|
|||
# #30445 Backoff supervision with critical log level
|
||||
ProblemFilters.exclude[Problem]("akka.actor.typed.SupervisorStrategy#Backoff*")
|
||||
ProblemFilters.exclude[MissingTypesProblem]("akka.actor.typed.SupervisorStrategy$Backoff$")
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.BackoffSupervisorStrategy.withCriticalLogLevel")
|
||||
|
|
@ -9,6 +9,7 @@ import scala.concurrent.duration.FiniteDuration
|
|||
|
||||
import org.slf4j.event.Level
|
||||
|
||||
import akka.annotation.DoNotInherit
|
||||
import akka.annotation.InternalApi
|
||||
import akka.util.JavaDurationConverters._
|
||||
|
||||
|
|
@ -180,6 +181,8 @@ object SupervisorStrategy {
|
|||
resetBackoffAfter: FiniteDuration,
|
||||
loggingEnabled: Boolean = true,
|
||||
logLevel: Level = Level.ERROR,
|
||||
criticalLogLevel: Level = Level.ERROR,
|
||||
criticalLogLevelAfter: Int = Int.MaxValue,
|
||||
maxRestarts: Int = -1,
|
||||
stopChildren: Boolean = true,
|
||||
stashCapacity: Int = -1)
|
||||
|
|
@ -207,11 +210,18 @@ object SupervisorStrategy {
|
|||
copy(loggingEnabled = enabled)
|
||||
|
||||
override def withLogLevel(level: Level): BackoffSupervisorStrategy =
|
||||
copy(logLevel = logLevel)
|
||||
copy(logLevel = level)
|
||||
|
||||
override def withCriticalLogLevel(criticalLevel: Level, afterErrors: Int): BackoffSupervisorStrategy =
|
||||
copy(criticalLogLevel = criticalLevel, criticalLogLevelAfter = afterErrors)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Not for user extension
|
||||
*/
|
||||
@DoNotInherit
|
||||
sealed abstract class SupervisorStrategy {
|
||||
def loggingEnabled: Boolean
|
||||
def logLevel: Level
|
||||
|
|
@ -222,6 +232,10 @@ sealed abstract class SupervisorStrategy {
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* Not for user extension
|
||||
*/
|
||||
@DoNotInherit
|
||||
sealed abstract class RestartSupervisorStrategy extends SupervisorStrategy {
|
||||
|
||||
/**
|
||||
|
|
@ -277,6 +291,10 @@ sealed abstract class RestartSupervisorStrategy extends SupervisorStrategy {
|
|||
|
||||
}
|
||||
|
||||
/**
|
||||
* Not for user extension
|
||||
*/
|
||||
@DoNotInherit
|
||||
sealed abstract class BackoffSupervisorStrategy extends SupervisorStrategy {
|
||||
def resetBackoffAfter: FiniteDuration
|
||||
|
||||
|
|
@ -323,4 +341,14 @@ sealed abstract class BackoffSupervisorStrategy extends SupervisorStrategy {
|
|||
|
||||
override def withLogLevel(level: Level): BackoffSupervisorStrategy
|
||||
|
||||
/**
|
||||
* Possibility to use another log level after a given number of errors.
|
||||
* The initial errors are logged at the level defined with [[BackoffSupervisorStrategy.withLogLevel]].
|
||||
* For example, the first 3 errors can be logged at INFO level and thereafter at ERROR level.
|
||||
*
|
||||
* The counter (and log level) is reset after the [[BackoffSupervisorStrategy.withResetBackoffAfter]]
|
||||
* duration.
|
||||
*/
|
||||
def withCriticalLogLevel(criticalLevel: Level, afterErrors: Int): BackoffSupervisorStrategy
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -84,12 +84,20 @@ private abstract class AbstractSupervisor[I, Thr <: Throwable](strategy: Supervi
|
|||
} catch handleSignalException(ctx, target)
|
||||
}
|
||||
|
||||
def log(ctx: TypedActorContext[_], t: Throwable): Unit = {
|
||||
def log(ctx: TypedActorContext[_], t: Throwable): Unit =
|
||||
log(ctx, t, errorCount = -1)
|
||||
|
||||
def log(ctx: TypedActorContext[_], t: Throwable, errorCount: Int): Unit = {
|
||||
if (strategy.loggingEnabled) {
|
||||
val unwrapped = UnstashException.unwrap(t)
|
||||
val logMessage = s"Supervisor $this saw failure: ${unwrapped.getMessage}"
|
||||
val errorCountStr = if (errorCount >= 0) s" [$errorCount]" else ""
|
||||
val logMessage = s"Supervisor $this saw failure$errorCountStr: ${unwrapped.getMessage}"
|
||||
val logger = ctx.asScala.log
|
||||
strategy.logLevel match {
|
||||
val logLevel = strategy match {
|
||||
case b: Backoff => if (errorCount > b.criticalLogLevelAfter) b.criticalLogLevel else strategy.logLevel
|
||||
case _ => strategy.logLevel
|
||||
}
|
||||
logLevel match {
|
||||
case Level.ERROR => logger.error(logMessage, unwrapped)
|
||||
case Level.WARN => logger.warn(logMessage, unwrapped)
|
||||
case Level.INFO => logger.info(logMessage, unwrapped)
|
||||
|
|
@ -313,7 +321,7 @@ private class RestartSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior
|
|||
strategy match {
|
||||
case _: Restart => throw t
|
||||
case _: Backoff =>
|
||||
log(ctx, t)
|
||||
log(ctx, t, restartCount + 1)
|
||||
BehaviorImpl.failed(t)
|
||||
}
|
||||
|
||||
|
|
@ -328,7 +336,10 @@ private class RestartSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior
|
|||
}
|
||||
|
||||
private def prepareRestart(ctx: TypedActorContext[Any], reason: Throwable): Behavior[T] = {
|
||||
log(ctx, reason)
|
||||
strategy match {
|
||||
case _: Backoff => log(ctx, reason, restartCount + 1)
|
||||
case _: Restart => log(ctx, reason)
|
||||
}
|
||||
|
||||
val currentRestartCount = restartCount
|
||||
updateRestartCount()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue