diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala index 592180adcf..649c06c983 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/RestartSpec.scala @@ -5,12 +5,15 @@ package akka.stream.scaladsl import java.util.concurrent.atomic.AtomicInteger + import scala.concurrent.{ Await, Promise } import scala.concurrent.duration._ import scala.util.Failure import scala.util.Success + import akka.Done import akka.NotUsed +import akka.event.Logging import akka.stream.Attributes.Name import akka.stream.scaladsl.AttributesSpec.{ whateverAttribute, @@ -29,9 +32,12 @@ import akka.stream.testkit.scaladsl.StreamTestKit._ import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSource import akka.testkit.DefaultTimeout +import akka.testkit.EventFilter import akka.testkit.TestDuration -class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "10s")) with DefaultTimeout { +class RestartSpec + extends StreamSpec(Map("akka.test.single-expect-default" -> "10s", "akka.loglevel" -> "INFO")) + with DefaultTimeout { import system.dispatcher @@ -40,8 +46,11 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1 private val minBackoff = 1.second.dilated private val maxBackoff = 3.seconds.dilated - private val shortRestartSettings = RestartSettings(shortMinBackoff, shortMaxBackoff, 0) - private val restartSettings = RestartSettings(minBackoff, maxBackoff, 0) + private val logSettings = RestartSettings.LogSettings(Logging.InfoLevel).withCriticalLogLevel(Logging.WarningLevel, 2) + private val shortRestartSettings = + RestartSettings(shortMinBackoff, shortMaxBackoff, 0).withLogSettings(logSettings) + private val restartSettings = + RestartSettings(minBackoff, maxBackoff, 0).withLogSettings(logSettings) "A restart with backoff source" should { "run normally" in assertAllStagesStopped { @@ -73,11 +82,13 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1 } .runWith(TestSink.probe) - probe.requestNext("a") - probe.requestNext("b") - probe.requestNext("a") - probe.requestNext("b") - probe.requestNext("a") + EventFilter.info(start = "Restarting stream due to completion", occurrences = 2).intercept { + probe.requestNext("a") + probe.requestNext("b") + probe.requestNext("a") + probe.requestNext("b") + probe.requestNext("a") + } created.get() should ===(3) @@ -96,13 +107,21 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1 } .runWith(TestSink.probe) - probe.requestNext("a") - probe.requestNext("b") - probe.requestNext("a") - probe.requestNext("b") - probe.requestNext("a") + EventFilter.info(start = "Restarting stream due to failure", occurrences = 2).intercept { + probe.requestNext("a") + probe.requestNext("b") + probe.requestNext("a") + probe.requestNext("b") + probe.requestNext("a") + } - created.get() should ===(3) + // after 2, use critical level + EventFilter.warning(start = "Restarting stream due to failure [3]", occurrences = 1).intercept { + probe.requestNext("b") + probe.requestNext("a") + } + + created.get() should ===(4) probe.cancel() } @@ -622,7 +641,9 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1 .viaMat( RestartFlowFactory( onlyOnFailures, - RestartSettings(minBackoff, maxBackoff, 0).withMaxRestarts(maxRestarts, minBackoff)) { () => + RestartSettings(minBackoff, maxBackoff, 0) + .withMaxRestarts(maxRestarts, minBackoff) + .withLogSettings(logSettings)) { () => created.incrementAndGet() Flow.fromSinkAndSource( Flow[String] @@ -705,12 +726,14 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1 flowOutProbe.sendNext("b") sink.requestNext("b") - sink.request(1) - flowOutProbe.sendNext("complete") + EventFilter.info(start = "Restarting stream due to completion", occurrences = 1).intercept { + sink.request(1) + flowOutProbe.sendNext("complete") - // This will complete the flow in probe and cancel the flow out probe - flowInProbe.request(2) - Seq(flowInProbe.expectNext(), flowInProbe.expectNext()) should contain.only("in complete", "out complete") + // This will complete the flow in probe and cancel the flow out probe + flowInProbe.request(2) + Seq(flowInProbe.expectNext(), flowInProbe.expectNext()) should contain.only("in complete", "out complete") + } // and it should restart source.sendNext("c") @@ -729,11 +752,13 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1 flowOutProbe.sendNext("b") sink.requestNext("b") - sink.request(1) - flowOutProbe.sendNext("error") + EventFilter.info(start = "Restarting stream due to failure", occurrences = 1).intercept { + sink.request(1) + flowOutProbe.sendNext("error") - // This should complete the in probe - flowInProbe.requestNext("in complete") + // This should complete the in probe + flowInProbe.requestNext("in complete") + } // and it should restart source.sendNext("c") @@ -909,7 +934,8 @@ class RestartSpec extends StreamSpec(Map("akka.test.single-expect-default" -> "1 val restartOnFailures = RestartFlow - .onFailuresWithBackoff(RestartSettings(1.second, 2.seconds, 0.2).withMaxRestarts(2, 1.second))(() => { + .onFailuresWithBackoff( + RestartSettings(1.second, 2.seconds, 0.2).withMaxRestarts(2, 1.second).withLogSettings(logSettings))(() => { flowCreations.incrementAndGet() failsSomeTimes }) diff --git a/akka-stream/src/main/mima-filters/2.6.16.backwards.excludes/context-propagation.backwards.excludes b/akka-stream/src/main/mima-filters/2.6.15.backwards.excludes/context-propagation.excludes similarity index 100% rename from akka-stream/src/main/mima-filters/2.6.16.backwards.excludes/context-propagation.backwards.excludes rename to akka-stream/src/main/mima-filters/2.6.15.backwards.excludes/context-propagation.excludes diff --git a/akka-stream/src/main/mima-filters/2.6.15.backwards.excludes/issue-30445-RestartSettings.excludes b/akka-stream/src/main/mima-filters/2.6.15.backwards.excludes/issue-30445-RestartSettings.excludes new file mode 100644 index 0000000000..2715f7592f --- /dev/null +++ b/akka-stream/src/main/mima-filters/2.6.15.backwards.excludes/issue-30445-RestartSettings.excludes @@ -0,0 +1,3 @@ +# #30445 Log settings for restart stream stages +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.RestartSettings.this") + diff --git a/akka-stream/src/main/scala/akka/stream/RestartSettings.scala b/akka-stream/src/main/scala/akka/stream/RestartSettings.scala index b6453a7de1..2994716bf3 100644 --- a/akka-stream/src/main/scala/akka/stream/RestartSettings.scala +++ b/akka-stream/src/main/scala/akka/stream/RestartSettings.scala @@ -6,6 +6,8 @@ package akka.stream import scala.concurrent.duration.FiniteDuration +import akka.event.Logging +import akka.event.Logging.LogLevel import akka.util.JavaDurationConverters._ final class RestartSettings private ( @@ -13,7 +15,8 @@ final class RestartSettings private ( val maxBackoff: FiniteDuration, val randomFactor: Double, val maxRestarts: Int, - val maxRestartsWithin: FiniteDuration) { + val maxRestartsWithin: FiniteDuration, + val logSettings: RestartSettings.LogSettings) { /** Scala API: minimum (initial) duration until the child actor will started again, if it is terminated */ def withMinBackoff(value: FiniteDuration): RestartSettings = copy(minBackoff = value) @@ -41,6 +44,9 @@ final class RestartSettings private ( def withMaxRestarts(count: Int, within: java.time.Duration): RestartSettings = copy(maxRestarts = count, maxRestartsWithin = within.asScala) + def withLogSettings(newLogSettings: RestartSettings.LogSettings): RestartSettings = + copy(logSettings = newLogSettings) + override def toString: String = "RestartSettings(" + s"minBackoff=$minBackoff," + @@ -54,8 +60,9 @@ final class RestartSettings private ( maxBackoff: FiniteDuration = maxBackoff, randomFactor: Double = randomFactor, maxRestarts: Int = maxRestarts, - maxRestartsWithin: FiniteDuration = maxRestartsWithin): RestartSettings = - new RestartSettings(minBackoff, maxBackoff, randomFactor, maxRestarts, maxRestartsWithin) + maxRestartsWithin: FiniteDuration = maxRestartsWithin, + logSettings: RestartSettings.LogSettings = logSettings): RestartSettings = + new RestartSettings(minBackoff, maxBackoff, randomFactor, maxRestarts, maxRestartsWithin, logSettings) } @@ -68,7 +75,8 @@ object RestartSettings { maxBackoff = maxBackoff, randomFactor = randomFactor, maxRestarts = Int.MaxValue, - maxRestartsWithin = minBackoff) + maxRestartsWithin = minBackoff, + logSettings = LogSettings.defaultSettings) /** Java API */ def create(minBackoff: java.time.Duration, maxBackoff: java.time.Duration, randomFactor: Double): RestartSettings = @@ -77,5 +85,44 @@ object RestartSettings { maxBackoff = maxBackoff.asScala, randomFactor = randomFactor, maxRestarts = Int.MaxValue, - maxRestartsWithin = minBackoff.asScala) + maxRestartsWithin = minBackoff.asScala, + logSettings = LogSettings.defaultSettings) + + /** Java API */ + def createLogSettings(logLevel: LogLevel): LogSettings = + LogSettings(logLevel) + + object LogSettings { + private[akka] val defaultSettings = + new LogSettings(Logging.WarningLevel, Logging.ErrorLevel, criticalLogLevelAfter = Int.MaxValue) + + def apply(logLevel: LogLevel): LogSettings = defaultSettings.copy(logLevel = logLevel) + + } + + final class LogSettings(val logLevel: LogLevel, val criticalLogLevel: LogLevel, val criticalLogLevelAfter: Int) { + + def withLogLevel(level: LogLevel): LogSettings = + copy(logLevel = level) + + /** + * Possibility to use another log level after a given number of errors. + * The initial errors are logged at the level defined with [[LogSettings.withLogLevel]]. + * For example, the first 3 errors can be logged at INFO level and thereafter at ERROR level. + * + * The counter (and log level) is reset after the [[RestartSettings.maxRestartsWithin]] + * duration. + */ + def withCriticalLogLevel(criticalLevel: LogLevel, afterErrors: Int): LogSettings = + copy(criticalLogLevel = criticalLevel, criticalLogLevelAfter = afterErrors) + + private def copy( + logLevel: LogLevel = logLevel, + criticalLogLevel: LogLevel = criticalLogLevel, + criticalLogLevelAfter: Int = criticalLogLevelAfter): LogSettings = + new LogSettings( + logLevel = logLevel, + criticalLogLevel = criticalLogLevel, + criticalLogLevelAfter = criticalLogLevelAfter) + } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala index ae68cf5edb..93a0969e3a 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/RestartFlow.scala @@ -5,6 +5,7 @@ package akka.stream.scaladsl import scala.concurrent.duration._ +import scala.util.control.NoStackTrace import akka.NotUsed import akka.event.Logging @@ -281,6 +282,10 @@ private abstract class RestartWithBackoffLogic[S <: Shape]( if (finishing || maxRestartsReached() || onlyOnFailures) { complete(out) } else { + logIt( + s"Restarting stream due to completion [${restartCount + 1}]", + OptionVal.None, + minLogLevel = Logging.InfoLevel) scheduleRestartTimer() } } @@ -292,8 +297,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape]( if (finishing || maxRestartsReached()) { fail(out, ex) } else { - if (loggingEnabled) - log.warning("Restarting graph due to failure. stack_trace: {}", Logging.stackTraceFor(ex)) + logIt(s"Restarting stream due to failure [${restartCount + 1}]: $ex", OptionVal.Some(ex)) scheduleRestartTimer() } } @@ -309,6 +313,39 @@ private abstract class RestartWithBackoffLogic[S <: Shape]( sinkIn } + private def logLevel(minLogLevel: Logging.LogLevel): Logging.LogLevel = { + val level = + if (restartCount >= logSettings.criticalLogLevelAfter) logSettings.criticalLogLevel else logSettings.logLevel + if (level >= minLogLevel || level == Logging.OffLevel) level else minLogLevel + } + + private def logIt( + message: String, + exc: OptionVal[Throwable], + minLogLevel: Logging.LogLevel = Logging.ErrorLevel): Unit = { + if (loggingEnabled) { + logLevel(minLogLevel) match { + case Logging.ErrorLevel => + exc match { + case OptionVal.Some(e) => log.error(e, message) + case _ => log.error(message) + } + case Logging.WarningLevel => + if (log.isWarningEnabled) { + exc match { + case OptionVal.Some(e) if !e.isInstanceOf[NoStackTrace] => + log.warning(message + s"${Logging.stackTraceFor(e)}") + case _ => + log.warning(message) + } + } + case Logging.InfoLevel => log.info(message) + case Logging.DebugLevel => log.debug(message) + case _ => // off + } + } + } + /** * @param in The permanent inlet for this operator * @return Temporary SubSourceOutlet for this "restart" @@ -363,7 +400,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape]( protected final def maxRestartsReached(): Boolean = { // Check if the last start attempt was more than the reset deadline if (resetDeadline.isOverdue()) { - log.debug("Last restart attempt was more than {} ago, resetting restart count", maxRestartsWithin) + log.debug("Last restart attempt was more than {} ago, resetting restart count", maxRestartsWithin.toCoarsest) restartCount = 0 } restartCount == maxRestarts @@ -372,7 +409,7 @@ private abstract class RestartWithBackoffLogic[S <: Shape]( // Set a timer to restart after the calculated delay protected final def scheduleRestartTimer(): Unit = { val restartDelay = BackoffSupervisor.calculateDelay(restartCount, minBackoff, maxBackoff, randomFactor) - log.debug("Restarting graph in {}", restartDelay) + log.debug("Restarting graph in {}", restartDelay.toCoarsest) scheduleOnce("RestartTimer", restartDelay) restartCount += 1 // And while we wait, we go into backoff mode