Typed: Un-nest supervision that overlaps #25128
This commit is contained in:
parent
9b4b47ea8b
commit
2b6997b7a0
2 changed files with 95 additions and 7 deletions
|
|
@ -18,6 +18,7 @@ import org.scalatest.{ Matchers, WordSpec }
|
|||
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.control.NoStackTrace
|
||||
import scala.concurrent.duration._
|
||||
|
||||
object SupervisionSpec {
|
||||
|
||||
|
|
@ -646,5 +647,47 @@ class SupervisionSpec extends ActorTestKit with TypedAkkaSpecWithShutdown {
|
|||
probe.expectMessage(Started)
|
||||
}
|
||||
|
||||
"replace supervision when new returned behavior catches same exception" in {
|
||||
val probe = TestProbe[AnyRef]("probeMcProbeFace")
|
||||
val behv = supervise[String](Behaviors.receiveMessage {
|
||||
case "boom" ⇒ throw TE("boom indeed")
|
||||
case "switch" ⇒
|
||||
supervise[String](
|
||||
supervise[String](
|
||||
supervise[String](
|
||||
supervise[String](
|
||||
supervise[String](
|
||||
Behaviors.receiveMessage {
|
||||
case "boom" ⇒ throw TE("boom indeed")
|
||||
case "ping" ⇒
|
||||
probe.ref ! "pong"
|
||||
Behaviors.same
|
||||
case "give me stacktrace" ⇒
|
||||
probe.ref ! new RuntimeException().getStackTrace.toVector
|
||||
Behaviors.stopped
|
||||
}).onFailure[RuntimeException](SupervisorStrategy.resume)
|
||||
).onFailure[RuntimeException](SupervisorStrategy.restartWithBackoff(1.second, 10.seconds, 23D))
|
||||
).onFailure[RuntimeException](SupervisorStrategy.restartWithLimit(23, 10.seconds))
|
||||
).onFailure[IllegalArgumentException](SupervisorStrategy.restart)
|
||||
).onFailure[RuntimeException](SupervisorStrategy.restart)
|
||||
}).onFailure[RuntimeException](SupervisorStrategy.stop)
|
||||
|
||||
val actor = spawn(behv)
|
||||
actor ! "switch"
|
||||
actor ! "ping"
|
||||
probe.expectMessage("pong")
|
||||
|
||||
EventFilter[RuntimeException](occurrences = 1).intercept {
|
||||
// Should be supervised as resume
|
||||
actor ! "boom"
|
||||
}
|
||||
|
||||
actor ! "give me stacktrace"
|
||||
val stacktrace = probe.expectMessageType[Vector[StackTraceElement]]
|
||||
// supervisor receive is used for every supervision instance, only wrapped in one supervisor for RuntimeException
|
||||
// and then the IllegalArgument one is kept since it has a different throwable
|
||||
stacktrace.count(_.toString.startsWith("akka.actor.typed.internal.Supervisor.receive")) should ===(2)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,8 +11,9 @@ import akka.actor.DeadLetterSuppression
|
|||
import akka.actor.typed.SupervisorStrategy._
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.annotation.InternalApi
|
||||
import akka.util.OptionVal
|
||||
import akka.util.{ OptionVal, PrettyDuration }
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.concurrent.duration.{ Deadline, FiniteDuration }
|
||||
import scala.reflect.ClassTag
|
||||
import scala.util.control.Exception.Catcher
|
||||
|
|
@ -44,6 +45,37 @@ import scala.util.control.NonFatal
|
|||
supervisor.init(ctx)
|
||||
}
|
||||
|
||||
// find supervision that is superflous by having the same exception handled closer to the behavior
|
||||
// and remove those instances. Needs to be called in the wrap method of all supervisor classes
|
||||
def deduplicate[T, Thr <: Throwable: ClassTag](supervisor: Supervisor[T, Thr]): Supervisor[T, Thr] = {
|
||||
|
||||
// side effecting to avoid allocating a tuple per loop
|
||||
var seenSupervised = Set.empty[Class[_]]
|
||||
|
||||
// can't be tailrec, but should be ok since hierarchies shouldn't be _that_ deep given that they
|
||||
// are deduplicated for every wrap
|
||||
def loop(behavior: Behavior[T]): Behavior[T] = {
|
||||
behavior match {
|
||||
case s: Supervisor[T, _] ⇒
|
||||
val inner = loop(s.behavior)
|
||||
if (seenSupervised.contains(s.throwableClass))
|
||||
// the exception this supervision cover is already covered closer to
|
||||
// the actual behavior so s will never be invoked, lets' remove it
|
||||
inner
|
||||
else {
|
||||
seenSupervised += s.throwableClass
|
||||
if (inner eq s.behavior) s
|
||||
else s.wrap(inner, false)
|
||||
}
|
||||
case b ⇒ b
|
||||
}
|
||||
}
|
||||
|
||||
// we know that the result is either the original outermost or another outermost supervision
|
||||
// but the type system doesn't
|
||||
loop(supervisor).asInstanceOf[Supervisor[T, Thr]]
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -51,6 +83,8 @@ import scala.util.control.NonFatal
|
|||
*/
|
||||
@InternalApi private[akka] abstract class Supervisor[T, Thr <: Throwable: ClassTag] extends ExtensibleBehavior[T] {
|
||||
|
||||
private[akka] def throwableClass = implicitly[ClassTag[Thr]].runtimeClass
|
||||
|
||||
protected def loggingEnabled: Boolean
|
||||
|
||||
/**
|
||||
|
|
@ -99,7 +133,7 @@ import scala.util.control.NonFatal
|
|||
|
||||
protected def log(ctx: ActorContext[T], ex: Thr): Unit = {
|
||||
if (loggingEnabled)
|
||||
ctx.asScala.log.error(ex, ex.getMessage)
|
||||
ctx.asScala.log.error(ex, "Supervisor [{}] saw failure: {}", this, ex.getMessage)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -120,8 +154,9 @@ import scala.util.control.NonFatal
|
|||
}
|
||||
|
||||
override protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Supervisor[T, Thr] =
|
||||
new Resumer[T, Thr](nextBehavior, loggingEnabled)
|
||||
Supervisor.deduplicate(new Resumer[T, Thr](nextBehavior, loggingEnabled))
|
||||
|
||||
override def toString = "resume"
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -140,7 +175,9 @@ import scala.util.control.NonFatal
|
|||
}
|
||||
|
||||
override protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Supervisor[T, Thr] =
|
||||
new Stopper[T, Thr](nextBehavior, loggingEnabled)
|
||||
Supervisor.deduplicate(new Stopper[T, Thr](nextBehavior, loggingEnabled))
|
||||
|
||||
override def toString = "stop"
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -162,7 +199,9 @@ import scala.util.control.NonFatal
|
|||
}
|
||||
|
||||
override protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Supervisor[T, Thr] =
|
||||
new Restarter[T, Thr](initialBehavior, nextBehavior, loggingEnabled)
|
||||
Supervisor.deduplicate(new Restarter[T, Thr](initialBehavior, nextBehavior, loggingEnabled))
|
||||
|
||||
override def toString = "restart"
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -200,14 +239,18 @@ import scala.util.control.NonFatal
|
|||
}
|
||||
|
||||
override protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Supervisor[T, Thr] = {
|
||||
if (afterException) {
|
||||
val restarter = if (afterException) {
|
||||
val timeLeft = deadlineHasTimeLeft
|
||||
val newRetries = if (timeLeft) retries + 1 else 1
|
||||
val newDeadline = if (deadline.isDefined && timeLeft) deadline else OptionVal.Some(Deadline.now + strategy.withinTimeRange)
|
||||
new LimitedRestarter[T, Thr](initialBehavior, nextBehavior, strategy, newRetries, newDeadline)
|
||||
} else
|
||||
new LimitedRestarter[T, Thr](initialBehavior, nextBehavior, strategy, retries, deadline)
|
||||
|
||||
Supervisor.deduplicate(restarter)
|
||||
}
|
||||
|
||||
override def toString = s"restartWithLimit(${strategy.maxNrOfRetries}, ${PrettyDuration.format(strategy.withinTimeRange)})"
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -308,7 +351,9 @@ import scala.util.control.NonFatal
|
|||
if (afterException)
|
||||
throw new IllegalStateException("wrap not expected afterException in BackoffRestarter")
|
||||
else
|
||||
new BackoffRestarter[T, Thr](initialBehavior, nextBehavior, strategy, restartCount, blackhole)
|
||||
Supervisor.deduplicate(new BackoffRestarter[T, Thr](initialBehavior, nextBehavior, strategy, restartCount, blackhole))
|
||||
}
|
||||
|
||||
override def toString = s"restartWithBackoff(${PrettyDuration.format(strategy.minBackoff)}, ${PrettyDuration.format(strategy.maxBackoff)}, ${strategy.randomFactor})"
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue