Log settings for restart stream stages, #30445 (#30529)

* to be able to specify log level
* and possibility to use critical log level after a
  given number of errors
This commit is contained in:
Patrik Nordwall 2021-08-18 16:18:58 +02:00 committed by GitHub
parent 0305a5f05a
commit 9f83f437f9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 147 additions and 34 deletions

View file

@ -5,12 +5,15 @@
package akka.stream.scaladsl
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ Await, Promise }
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
import akka.Done
import akka.NotUsed
import akka.event.Logging
import akka.stream.Attributes.Name
import akka.stream.scaladsl.AttributesSpec.{
whateverAttribute,
@ -29,9 +32,12 @@ import akka.stream.testkit.scaladsl.StreamTestKit._
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.scaladsl.TestSource
import akka.testkit.DefaultTimeout
import akka.testkit.EventFilter
import akka.testkit.TestDuration
class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "10s")) with DefaultTimeout {
class RestartSpec
extends StreamSpec(Map("akka.test.single-expect-default" -> "10s", "akka.loglevel" -> "INFO"))
with DefaultTimeout {
import system.dispatcher
@ -40,8 +46,11 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
private val minBackoff = 1.second.dilated
private val maxBackoff = 3.seconds.dilated
private val shortRestartSettings = RestartSettings(shortMinBackoff, shortMaxBackoff, 0)
private val restartSettings = RestartSettings(minBackoff, maxBackoff, 0)
private val logSettings = RestartSettings.LogSettings(Logging.InfoLevel).withCriticalLogLevel(Logging.WarningLevel, 2)
private val shortRestartSettings =
RestartSettings(shortMinBackoff, shortMaxBackoff, 0).withLogSettings(logSettings)
private val restartSettings =
RestartSettings(minBackoff, maxBackoff, 0).withLogSettings(logSettings)
"A restart with backoff source" should {
"run normally" in assertAllStagesStopped {
@ -73,11 +82,13 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
}
.runWith(TestSink.probe)
probe.requestNext("a")
probe.requestNext("b")
probe.requestNext("a")
probe.requestNext("b")
probe.requestNext("a")
EventFilter.info(start = "Restarting stream due to completion", occurrences = 2).intercept {
probe.requestNext("a")
probe.requestNext("b")
probe.requestNext("a")
probe.requestNext("b")
probe.requestNext("a")
}
created.get() should ===(3)
@ -96,13 +107,21 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
}
.runWith(TestSink.probe)
probe.requestNext("a")
probe.requestNext("b")
probe.requestNext("a")
probe.requestNext("b")
probe.requestNext("a")
EventFilter.info(start = "Restarting stream due to failure", occurrences = 2).intercept {
probe.requestNext("a")
probe.requestNext("b")
probe.requestNext("a")
probe.requestNext("b")
probe.requestNext("a")
}
created.get() should ===(3)
// after 2, use critical level
EventFilter.warning(start = "Restarting stream due to failure [3]", occurrences = 1).intercept {
probe.requestNext("b")
probe.requestNext("a")
}
created.get() should ===(4)
probe.cancel()
}
@ -622,7 +641,9 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
.viaMat(
RestartFlowFactory(
onlyOnFailures,
RestartSettings(minBackoff, maxBackoff, 0).withMaxRestarts(maxRestarts, minBackoff)) { () =>
RestartSettings(minBackoff, maxBackoff, 0)
.withMaxRestarts(maxRestarts, minBackoff)
.withLogSettings(logSettings)) { () =>
created.incrementAndGet()
Flow.fromSinkAndSource(
Flow[String]
@ -705,12 +726,14 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
flowOutProbe.sendNext("b")
sink.requestNext("b")
sink.request(1)
flowOutProbe.sendNext("complete")
EventFilter.info(start = "Restarting stream due to completion", occurrences = 1).intercept {
sink.request(1)
flowOutProbe.sendNext("complete")
// This will complete the flow in probe and cancel the flow out probe
flowInProbe.request(2)
Seq(flowInProbe.expectNext(), flowInProbe.expectNext()) should contain.only("in complete", "out complete")
// This will complete the flow in probe and cancel the flow out probe
flowInProbe.request(2)
Seq(flowInProbe.expectNext(), flowInProbe.expectNext()) should contain.only("in complete", "out complete")
}
// and it should restart
source.sendNext("c")
@ -729,11 +752,13 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
flowOutProbe.sendNext("b")
sink.requestNext("b")
sink.request(1)
flowOutProbe.sendNext("error")
EventFilter.info(start = "Restarting stream due to failure", occurrences = 1).intercept {
sink.request(1)
flowOutProbe.sendNext("error")
// This should complete the in probe
flowInProbe.requestNext("in complete")
// This should complete the in probe
flowInProbe.requestNext("in complete")
}
// and it should restart
source.sendNext("c")
@ -909,7 +934,8 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1
val restartOnFailures =
RestartFlow
.onFailuresWithBackoff(RestartSettings(1.second, 2.seconds, 0.2).withMaxRestarts(2, 1.second))(() => {
.onFailuresWithBackoff(
RestartSettings(1.second, 2.seconds, 0.2).withMaxRestarts(2, 1.second).withLogSettings(logSettings))(() => {
flowCreations.incrementAndGet()
failsSomeTimes
})

View file

@ -0,0 +1,3 @@
# #30445 Log settings for restart stream stages
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.RestartSettings.this")

View file

@ -6,6 +6,8 @@ package akka.stream
import scala.concurrent.duration.FiniteDuration
import akka.event.Logging
import akka.event.Logging.LogLevel
import akka.util.JavaDurationConverters._
final class RestartSettings private (
@ -13,7 +15,8 @@ final class RestartSettings private (
val maxBackoff: FiniteDuration,
val randomFactor: Double,
val maxRestarts: Int,
val maxRestartsWithin: FiniteDuration) {
val maxRestartsWithin: FiniteDuration,
val logSettings: RestartSettings.LogSettings) {
/** Scala API: minimum (initial) duration until the child actor will started again, if it is terminated */
def withMinBackoff(value: FiniteDuration): RestartSettings = copy(minBackoff = value)
@ -41,6 +44,9 @@ final class RestartSettings private (
def withMaxRestarts(count: Int, within: java.time.Duration): RestartSettings =
copy(maxRestarts = count, maxRestartsWithin = within.asScala)
def withLogSettings(newLogSettings: RestartSettings.LogSettings): RestartSettings =
copy(logSettings = newLogSettings)
override def toString: String =
"RestartSettings(" +
s"minBackoff=$minBackoff," +
@ -54,8 +60,9 @@ final class RestartSettings private (
maxBackoff: FiniteDuration = maxBackoff,
randomFactor: Double = randomFactor,
maxRestarts: Int = maxRestarts,
maxRestartsWithin: FiniteDuration = maxRestartsWithin): RestartSettings =
new RestartSettings(minBackoff, maxBackoff, randomFactor, maxRestarts, maxRestartsWithin)
maxRestartsWithin: FiniteDuration = maxRestartsWithin,
logSettings: RestartSettings.LogSettings = logSettings): RestartSettings =
new RestartSettings(minBackoff, maxBackoff, randomFactor, maxRestarts, maxRestartsWithin, logSettings)
}
@ -68,7 +75,8 @@ object RestartSettings {
maxBackoff = maxBackoff,
randomFactor = randomFactor,
maxRestarts = Int.MaxValue,
maxRestartsWithin = minBackoff)
maxRestartsWithin = minBackoff,
logSettings = LogSettings.defaultSettings)
/** Java API */
def create(minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double): RestartSettings =
@ -77,5 +85,44 @@ object RestartSettings {
maxBackoff = maxBackoff.asScala,
randomFactor = randomFactor,
maxRestarts = Int.MaxValue,
maxRestartsWithin = minBackoff.asScala)
maxRestartsWithin = minBackoff.asScala,
logSettings = LogSettings.defaultSettings)
/** Java API */
def createLogSettings(logLevel: LogLevel): LogSettings =
LogSettings(logLevel)
object LogSettings {
private[akka] val defaultSettings =
new LogSettings(Logging.WarningLevel, Logging.ErrorLevel, criticalLogLevelAfter = Int.MaxValue)
def apply(logLevel: LogLevel): LogSettings = defaultSettings.copy(logLevel = logLevel)
}
final class LogSettings(val logLevel: LogLevel, val criticalLogLevel: LogLevel, val criticalLogLevelAfter: Int) {
def withLogLevel(level: LogLevel): LogSettings =
copy(logLevel = level)
/**
* Possibility to use another log level after a given number of errors.
* The initial errors are logged at the level defined with [[LogSettings.withLogLevel]].
* For example, the first 3 errors can be logged at INFO level and thereafter at ERROR level.
*
* The counter (and log level) is reset after the [[RestartSettings.maxRestartsWithin]]
* duration.
*/
def withCriticalLogLevel(criticalLevel: LogLevel, afterErrors: Int): LogSettings =
copy(criticalLogLevel = criticalLevel, criticalLogLevelAfter = afterErrors)
private def copy(
logLevel: LogLevel = logLevel,
criticalLogLevel: LogLevel = criticalLogLevel,
criticalLogLevelAfter: Int = criticalLogLevelAfter): LogSettings =
new LogSettings(
logLevel = logLevel,
criticalLogLevel = criticalLogLevel,
criticalLogLevelAfter = criticalLogLevelAfter)
}
}

View file

@ -5,6 +5,7 @@
package akka.stream.scaladsl
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.NotUsed
import akka.event.Logging
@ -281,6 +282,10 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
if (finishing || maxRestartsReached() || onlyOnFailures) {
complete(out)
} else {
logIt(
s"Restarting stream due to completion [${restartCount + 1}]",
OptionVal.None,
minLogLevel = Logging.InfoLevel)
scheduleRestartTimer()
}
}
@ -292,8 +297,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
if (finishing || maxRestartsReached()) {
fail(out, ex)
} else {
if (loggingEnabled)
log.warning("Restarting graph due to failure. stack_trace: {}", Logging.stackTraceFor(ex))
logIt(s"Restarting stream due to failure [${restartCount + 1}]: $ex", OptionVal.Some(ex))
scheduleRestartTimer()
}
}
@ -309,6 +313,39 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
sinkIn
}
private def logLevel(minLogLevel: Logging.LogLevel): Logging.LogLevel = {
val level =
if (restartCount >= logSettings.criticalLogLevelAfter) logSettings.criticalLogLevel else logSettings.logLevel
if (level >= minLogLevel || level == Logging.OffLevel) level else minLogLevel
}
private def logIt(
message: String,
exc: OptionVal[Throwable],
minLogLevel: Logging.LogLevel = Logging.ErrorLevel): Unit = {
if (loggingEnabled) {
logLevel(minLogLevel) match {
case Logging.ErrorLevel =>
exc match {
case OptionVal.Some(e) => log.error(e, message)
case _ => log.error(message)
}
case Logging.WarningLevel =>
if (log.isWarningEnabled) {
exc match {
case OptionVal.Some(e) if !e.isInstanceOf[NoStackTrace] =>
log.warning(message + s"${Logging.stackTraceFor(e)}")
case _ =>
log.warning(message)
}
}
case Logging.InfoLevel => log.info(message)
case Logging.DebugLevel => log.debug(message)
case _ => // off
}
}
}
/**
* @param in The permanent inlet for this operator
* @return Temporary SubSourceOutlet for this "restart"
@ -363,7 +400,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
protected final def maxRestartsReached(): Boolean = {
// Check if the last start attempt was more than the reset deadline
if (resetDeadline.isOverdue()) {
log.debug("Last restart attempt was more than {} ago, resetting restart count", maxRestartsWithin)
log.debug("Last restart attempt was more than {} ago, resetting restart count", maxRestartsWithin.toCoarsest)
restartCount = 0
}
restartCount == maxRestarts
@ -372,7 +409,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape](
// Set a timer to restart after the calculated delay
protected final def scheduleRestartTimer(): Unit = {
val restartDelay = BackoffSupervisor.calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor)
log.debug("Restarting graph in {}", restartDelay)
log.debug("Restarting graph in {}", restartDelay.toCoarsest)
scheduleOnce("RestartTimer", restartDelay)
restartCount += 1
// And while we wait, we go into backoff mode