diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala index d59602c696..d24d7b4d84 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala @@ -18,6 +18,7 @@ import org.scalatest.{ Matchers, WordSpec } import scala.concurrent.duration._ import scala.util.control.NoStackTrace +import scala.concurrent.duration._ object SupervisionSpec { @@ -646,5 +647,47 @@ class SupervisionSpec extends ActorTestKit with TypedAkkaSpecWithShutdown { probe.expectMessage(Started) } + "replace supervision when new returned behavior catches same exception" in { + val probe = TestProbe[AnyRef]("probeMcProbeFace") + val behv = supervise[String](Behaviors.receiveMessage { + case "boom" ⇒ throw TE("boom indeed") + case "switch" ⇒ + supervise[String]( + supervise[String]( + supervise[String]( + supervise[String]( + supervise[String]( + Behaviors.receiveMessage { + case "boom" ⇒ throw TE("boom indeed") + case "ping" ⇒ + probe.ref ! "pong" + Behaviors.same + case "give me stacktrace" ⇒ + probe.ref ! new RuntimeException().getStackTrace.toVector + Behaviors.stopped + }).onFailure[RuntimeException](SupervisorStrategy.resume) + ).onFailure[RuntimeException](SupervisorStrategy.restartWithBackoff(1.second, 10.seconds, 23D)) + ).onFailure[RuntimeException](SupervisorStrategy.restartWithLimit(23, 10.seconds)) + ).onFailure[IllegalArgumentException](SupervisorStrategy.restart) + ).onFailure[RuntimeException](SupervisorStrategy.restart) + }).onFailure[RuntimeException](SupervisorStrategy.stop) + + val actor = spawn(behv) + actor ! "switch" + actor ! "ping" + probe.expectMessage("pong") + + EventFilter[RuntimeException](occurrences = 1).intercept { + // Should be supervised as resume + actor ! "boom" + } + + actor ! "give me stacktrace" + val stacktrace = probe.expectMessageType[Vector[StackTraceElement]] + // supervisor receive is used for every supervision instance, only wrapped in one supervisor for RuntimeException + // and then the IllegalArgument one is kept since it has a different throwable + stacktrace.count(_.toString.startsWith("akka.actor.typed.internal.Supervisor.receive")) should ===(2) + } + } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala index 226b67e503..d0dfcf6426 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala @@ -11,8 +11,9 @@ import akka.actor.DeadLetterSuppression import akka.actor.typed.SupervisorStrategy._ import akka.actor.typed.scaladsl.Behaviors import akka.annotation.InternalApi -import akka.util.OptionVal +import akka.util.{ OptionVal, PrettyDuration } +import scala.annotation.tailrec import scala.concurrent.duration.{ Deadline, FiniteDuration } import scala.reflect.ClassTag import scala.util.control.Exception.Catcher @@ -44,6 +45,37 @@ import scala.util.control.NonFatal supervisor.init(ctx) } + // find supervision that is superflous by having the same exception handled closer to the behavior + // and remove those instances. Needs to be called in the wrap method of all supervisor classes + def deduplicate[T, Thr <: Throwable: ClassTag](supervisor: Supervisor[T, Thr]): Supervisor[T, Thr] = { + + // side effecting to avoid allocating a tuple per loop + var seenSupervised = Set.empty[Class[_]] + + // can't be tailrec, but should be ok since hierarchies shouldn't be _that_ deep given that they + // are deduplicated for every wrap + def loop(behavior: Behavior[T]): Behavior[T] = { + behavior match { + case s: Supervisor[T, _] ⇒ + val inner = loop(s.behavior) + if (seenSupervised.contains(s.throwableClass)) + // the exception this supervision cover is already covered closer to + // the actual behavior so s will never be invoked, lets' remove it + inner + else { + seenSupervised += s.throwableClass + if (inner eq s.behavior) s + else s.wrap(inner, false) + } + case b ⇒ b + } + } + + // we know that the result is either the original outermost or another outermost supervision + // but the type system doesn't + loop(supervisor).asInstanceOf[Supervisor[T, Thr]] + } + } /** @@ -51,6 +83,8 @@ import scala.util.control.NonFatal */ @InternalApi private[akka] abstract class Supervisor[T, Thr <: Throwable: ClassTag] extends ExtensibleBehavior[T] { + private[akka] def throwableClass = implicitly[ClassTag[Thr]].runtimeClass + protected def loggingEnabled: Boolean /** @@ -99,7 +133,7 @@ import scala.util.control.NonFatal protected def log(ctx: ActorContext[T], ex: Thr): Unit = { if (loggingEnabled) - ctx.asScala.log.error(ex, ex.getMessage) + ctx.asScala.log.error(ex, "Supervisor [{}] saw failure: {}", this, ex.getMessage) } } @@ -120,8 +154,9 @@ import scala.util.control.NonFatal } override protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Supervisor[T, Thr] = - new Resumer[T, Thr](nextBehavior, loggingEnabled) + Supervisor.deduplicate(new Resumer[T, Thr](nextBehavior, loggingEnabled)) + override def toString = "resume" } /** @@ -140,7 +175,9 @@ import scala.util.control.NonFatal } override protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Supervisor[T, Thr] = - new Stopper[T, Thr](nextBehavior, loggingEnabled) + Supervisor.deduplicate(new Stopper[T, Thr](nextBehavior, loggingEnabled)) + + override def toString = "stop" } @@ -162,7 +199,9 @@ import scala.util.control.NonFatal } override protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Supervisor[T, Thr] = - new Restarter[T, Thr](initialBehavior, nextBehavior, loggingEnabled) + Supervisor.deduplicate(new Restarter[T, Thr](initialBehavior, nextBehavior, loggingEnabled)) + + override def toString = "restart" } /** @@ -200,14 +239,18 @@ import scala.util.control.NonFatal } override protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Supervisor[T, Thr] = { - if (afterException) { + val restarter = if (afterException) { val timeLeft = deadlineHasTimeLeft val newRetries = if (timeLeft) retries + 1 else 1 val newDeadline = if (deadline.isDefined && timeLeft) deadline else OptionVal.Some(Deadline.now + strategy.withinTimeRange) new LimitedRestarter[T, Thr](initialBehavior, nextBehavior, strategy, newRetries, newDeadline) } else new LimitedRestarter[T, Thr](initialBehavior, nextBehavior, strategy, retries, deadline) + + Supervisor.deduplicate(restarter) } + + override def toString = s"restartWithLimit(${strategy.maxNrOfRetries}, ${PrettyDuration.format(strategy.withinTimeRange)})" } /** @@ -308,7 +351,9 @@ import scala.util.control.NonFatal if (afterException) throw new IllegalStateException("wrap not expected afterException in BackoffRestarter") else - new BackoffRestarter[T, Thr](initialBehavior, nextBehavior, strategy, restartCount, blackhole) + Supervisor.deduplicate(new BackoffRestarter[T, Thr](initialBehavior, nextBehavior, strategy, restartCount, blackhole)) } + + override def toString = s"restartWithBackoff(${PrettyDuration.format(strategy.minBackoff)}, ${PrettyDuration.format(strategy.maxBackoff)}, ${strategy.randomFactor})" }