diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala index 44dae83d58..5f15928af8 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala @@ -4,16 +4,19 @@ package akka.actor.typed import java.io.IOException +import java.util.concurrent.atomic.AtomicInteger +import akka.actor.ActorInitializationException import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.Behaviors._ +import akka.testkit.EventFilter +import akka.testkit.typed.scaladsl._ +import akka.testkit.typed._ +import com.typesafe.config.ConfigFactory +import org.scalatest.{ Matchers, WordSpec } import scala.concurrent.duration._ -import akka.actor.typed.scaladsl.Behaviors._ -import akka.testkit.typed.{ BehaviorTestkit, TestInbox, TestKit, TestKitSettings } - import scala.util.control.NoStackTrace -import akka.testkit.typed.scaladsl._ -import org.scalatest.{ Matchers, WordSpec, fixture } object SupervisionSpec { @@ -29,6 +32,7 @@ object SupervisionSpec { case class GotSignal(signal: Signal) extends Event case class State(n: Int, children: Map[String, ActorRef[Command]]) extends Event case object Started extends Event + case object StartFailed extends Event class Exc1(msg: String = "exc-1") extends RuntimeException(msg) with NoStackTrace class Exc2 extends Exc1("exc-2") @@ -236,13 +240,55 @@ class StubbedSupervisionSpec extends WordSpec with Matchers { } } -class SupervisionSpec extends TestKit with TypedAkkaSpecWithShutdown { +class SupervisionSpec extends TestKit("SupervisionSpec", ConfigFactory.parseString( + """ + akka.loggers = [akka.testkit.TestEventListener] + """)) with TypedAkkaSpecWithShutdown { import SupervisionSpec._ private val nameCounter = Iterator.from(0) private def nextName(prefix: String = "a"): String = s"$prefix-${nameCounter.next()}" - implicit val testSettings = TestKitSettings(system) + // FIXME eventfilter support in typed testkit + import scaladsl.adapter._ + implicit val untypedSystem = system.toUntyped + + class FailingConstructorTestSetup(failCount: Int) { + val failCounter = new AtomicInteger(0) + class FailingConstructor(monitor: ActorRef[Event]) extends MutableBehavior[Command] { + monitor ! Started + if (failCounter.getAndIncrement() < failCount) { + throw TE("simulated exc from constructor") + } + override def onMessage(msg: Command): Behavior[Command] = { + monitor ! Pong + Behaviors.same + } + } + } + + class FailingDeferredTestSetup(failCount: Int, strategy: SupervisorStrategy) { + val probe = TestProbe[AnyRef]("evt") + val failCounter = new AtomicInteger(0) + def behv = supervise(deferred[Command] { _ ⇒ + val count = failCounter.getAndIncrement() + if (count < failCount) { + probe.ref ! StartFailed + throw TE(s"construction ${count} failed") + } else { + probe.ref ! Started + Behaviors.empty + } + }).onFailure[TE](strategy) + } + + class FailingUnhandledTestSetup(strategy: SupervisorStrategy) { + val probe = TestProbe[AnyRef]("evt") + def behv = supervise(deferred[Command] { _ ⇒ + probe.ref ! StartFailed + throw new TE("construction failed") + }).onFailure[IllegalArgumentException](strategy) + } "A supervised actor" must { "receive message" in { @@ -259,8 +305,10 @@ class SupervisionSpec extends TestKit with TypedAkkaSpecWithShutdown { val behv = Behaviors.supervise(targetBehavior(probe.ref)) .onFailure[Throwable](SupervisorStrategy.stop) val ref = spawn(behv) - ref ! Throw(new Exc3) - probe.expectMessage(GotSignal(PostStop)) + EventFilter[Exc3](occurrences = 1).intercept { + ref ! Throw(new Exc3) + probe.expectMessage(GotSignal(PostStop)) + } } "support nesting exceptions with different strategies" in { @@ -273,19 +321,25 @@ class SupervisionSpec extends TestKit with TypedAkkaSpecWithShutdown { val ref = spawn(behv) - ref ! Throw(new IOException()) - probe.expectMessage(GotSignal(PreRestart)) + EventFilter[IOException](occurrences = 1).intercept { + ref ! Throw(new IOException()) + probe.expectMessage(GotSignal(PreRestart)) + } - ref ! Throw(new IllegalArgumentException("cat")) - probe.expectMessage(GotSignal(PostStop)) + EventFilter[IllegalArgumentException](occurrences = 1).intercept { + ref ! Throw(new IllegalArgumentException("cat")) + probe.expectMessage(GotSignal(PostStop)) + } } "stop when not supervised" in { val probe = TestProbe[Event]("evt") val behv = targetBehavior(probe.ref) val ref = spawn(behv) - ref ! Throw(new Exc3) - probe.expectMessage(GotSignal(PostStop)) + EventFilter[Exc3](occurrences = 1).intercept { + ref ! Throw(new Exc3) + probe.expectMessage(GotSignal(PostStop)) + } } "stop when unhandled exception" in { @@ -293,8 +347,10 @@ class SupervisionSpec extends TestKit with TypedAkkaSpecWithShutdown { val behv = Behaviors.supervise(targetBehavior(probe.ref)) .onFailure[Exc1](SupervisorStrategy.restart) val ref = spawn(behv) - ref ! Throw(new Exc3) - probe.expectMessage(GotSignal(PostStop)) + EventFilter[Exc3](occurrences = 1).intercept { + ref ! Throw(new Exc3) + probe.expectMessage(GotSignal(PostStop)) + } } "restart when handled exception" in { @@ -306,8 +362,10 @@ class SupervisionSpec extends TestKit with TypedAkkaSpecWithShutdown { ref ! GetState probe.expectMessage(State(1, Map.empty)) - ref ! Throw(new Exc2) - probe.expectMessage(GotSignal(PreRestart)) + EventFilter[Exc2](occurrences = 1).intercept { + ref ! Throw(new Exc2) + probe.expectMessage(GotSignal(PreRestart)) + } ref ! GetState probe.expectMessage(State(0, Map.empty)) } @@ -324,9 +382,11 @@ class SupervisionSpec extends TestKit with TypedAkkaSpecWithShutdown { ref ! GetState parentProbe.expectMessageType[State].children.keySet should contain(childName) - ref ! Throw(new Exc1) - parentProbe.expectMessage(GotSignal(PreRestart)) - ref ! GetState + EventFilter[Exc1](occurrences = 1).intercept { + ref ! Throw(new Exc1) + parentProbe.expectMessage(GotSignal(PreRestart)) + ref ! GetState + } // TODO document this difference compared to classic actors, and that // children can be stopped if needed in PreRestart parentProbe.expectMessageType[State].children.keySet should contain(childName) @@ -341,9 +401,11 @@ class SupervisionSpec extends TestKit with TypedAkkaSpecWithShutdown { ref ! GetState probe.expectMessage(State(1, Map.empty)) - ref ! Throw(new Exc2) - ref ! GetState - probe.expectMessage(State(1, Map.empty)) + EventFilter[Exc2](occurrences = 1).intercept { + ref ! Throw(new Exc2) + ref ! GetState + probe.expectMessage(State(1, Map.empty)) + } } "support nesting to handle different exceptions" in { @@ -358,20 +420,26 @@ class SupervisionSpec extends TestKit with TypedAkkaSpecWithShutdown { probe.expectMessage(State(1, Map.empty)) // resume - ref ! Throw(new Exc2) - probe.expectNoMessage() - ref ! GetState - probe.expectMessage(State(1, Map.empty)) + EventFilter[Exc2](occurrences = 1).intercept { + ref ! Throw(new Exc2) + probe.expectNoMessage() + ref ! GetState + probe.expectMessage(State(1, Map.empty)) + } // restart - ref ! Throw(new Exc3) - probe.expectMessage(GotSignal(PreRestart)) - ref ! GetState - probe.expectMessage(State(0, Map.empty)) + EventFilter[Exc3](occurrences = 1).intercept { + ref ! Throw(new Exc3) + probe.expectMessage(GotSignal(PreRestart)) + ref ! GetState + probe.expectMessage(State(0, Map.empty)) + } // stop - ref ! Throw(new Exc1) - probe.expectMessage(GotSignal(PostStop)) + EventFilter[Exc1](occurrences = 1).intercept { + ref ! Throw(new Exc1) + probe.expectMessage(GotSignal(PostStop)) + } } "restart after exponential backoff" in { @@ -387,11 +455,13 @@ class SupervisionSpec extends TestKit with TypedAkkaSpecWithShutdown { }).onFailure[Exception](strategy) val ref = spawn(behv) - startedProbe.expectMessage(Started) - ref ! IncrementState - ref ! Throw(new Exc1) - probe.expectMessage(GotSignal(PreRestart)) - ref ! Ping // dropped due to backoff + EventFilter[Exc1](occurrences = 1).intercept { + startedProbe.expectMessage(Started) + ref ! IncrementState + ref ! Throw(new Exc1) + probe.expectMessage(GotSignal(PreRestart)) + ref ! Ping // dropped due to backoff + } startedProbe.expectNoMessage(minBackoff - 100.millis) probe.expectNoMessage(minBackoff + 100.millis) @@ -400,10 +470,12 @@ class SupervisionSpec extends TestKit with TypedAkkaSpecWithShutdown { probe.expectMessage(State(0, Map.empty)) // one more time - ref ! IncrementState - ref ! Throw(new Exc1) - probe.expectMessage(GotSignal(PreRestart)) - ref ! Ping // dropped due to backoff + EventFilter[Exc1](occurrences = 1).intercept { + ref ! IncrementState + ref ! Throw(new Exc1) + probe.expectMessage(GotSignal(PreRestart)) + ref ! Ping // dropped due to backoff + } startedProbe.expectNoMessage((minBackoff * 2) - 100.millis) probe.expectNoMessage((minBackoff * 2) + 100.millis) @@ -420,21 +492,25 @@ class SupervisionSpec extends TestKit with TypedAkkaSpecWithShutdown { val behv = supervise(targetBehavior(probe.ref)).onFailure[Exc1](strategy) val ref = spawn(behv) - ref ! IncrementState - ref ! Throw(new Exc1) - probe.expectMessage(GotSignal(PreRestart)) - ref ! Ping // dropped due to backoff + EventFilter[Exc1](occurrences = 1).intercept { + ref ! IncrementState + ref ! Throw(new Exc1) + probe.expectMessage(GotSignal(PreRestart)) + ref ! Ping // dropped due to backoff + } probe.expectNoMessage(minBackoff + 100.millis.dilated) ref ! GetState probe.expectMessage(State(0, Map.empty)) // one more time after the reset timeout - probe.expectNoMessage(strategy.resetBackoffAfter + 100.millis.dilated) - ref ! IncrementState - ref ! Throw(new Exc1) - probe.expectMessage(GotSignal(PreRestart)) - ref ! Ping // dropped due to backoff + EventFilter[Exc1](occurrences = 1).intercept { + probe.expectNoMessage(strategy.resetBackoffAfter + 100.millis.dilated) + ref ! IncrementState + ref ! Throw(new Exc1) + probe.expectMessage(GotSignal(PreRestart)) + ref ! Ping // dropped due to backoff + } // backoff was reset, so restarted after the minBackoff probe.expectNoMessage(minBackoff + 100.millis.dilated) @@ -454,14 +530,118 @@ class SupervisionSpec extends TestKit with TypedAkkaSpecWithShutdown { probe.expectMessage(Started) } - "stop when exception from MutableBehavior constructor" in { + "fail instead of restart when deferred factory throws" in new FailingDeferredTestSetup( + failCount = 1, strategy = SupervisorStrategy.restart) { + + EventFilter[ActorInitializationException](occurrences = 1).intercept { + spawn(behv) + } + } + + "fail to restart when deferred factory throws unhandled" in new FailingUnhandledTestSetup( + strategy = SupervisorStrategy.restart) { + + EventFilter[ActorInitializationException](occurrences = 1).intercept { + spawn(behv) + } + } + + "fail to resume when deferred factory throws" in new FailingDeferredTestSetup( + failCount = 1, + strategy = SupervisorStrategy.resume + ) { + EventFilter[ActorInitializationException](occurrences = 1).intercept { + spawn(behv) + } + } + + "restart with exponential backoff when deferred factory throws" in new FailingDeferredTestSetup( + failCount = 1, + strategy = SupervisorStrategy.restartWithBackoff(minBackoff = 100.millis.dilated, maxBackoff = 1.second, 0) + ) { + + EventFilter[TE](occurrences = 1).intercept { + spawn(behv) + + probe.expectMessage(StartFailed) + // restarted after a delay when first start failed + probe.expectNoMessage(100.millis) + probe.expectMessage(Started) + } + } + + "fail instead of restart with exponential backoff when deferred factory throws unhandled" in new FailingUnhandledTestSetup( + strategy = SupervisorStrategy.restartWithBackoff(minBackoff = 100.millis.dilated, maxBackoff = 1.second, 0)) { + + EventFilter[ActorInitializationException](occurrences = 1).intercept { + spawn(behv) + probe.expectMessage(StartFailed) + } + } + + "restartWithLimit when deferred factory throws" in new FailingDeferredTestSetup( + failCount = 1, + strategy = SupervisorStrategy.restartWithLimit(3, 1.second) + ) { + + EventFilter[TE](occurrences = 1).intercept { + spawn(behv) + + probe.expectMessage(StartFailed) + probe.expectMessage(Started) + } + } + + "fail after more than limit in restartWithLimit when deferred factory throws" in new FailingDeferredTestSetup( + failCount = 3, + strategy = SupervisorStrategy.restartWithLimit(2, 1.second) + ) { + + EventFilter[ActorInitializationException](occurrences = 1).intercept { + EventFilter[TE](occurrences = 2).intercept { + spawn(behv) + + // restarted 2 times before it gave up + probe.expectMessage(StartFailed) + probe.expectMessage(StartFailed) + probe.expectNoMessage(100.millis) + } + } + } + + "fail instead of restart with limit when deferred factory throws unhandled" in new FailingUnhandledTestSetup( + strategy = SupervisorStrategy.restartWithLimit(3, 1.second)) { + + EventFilter[ActorInitializationException](occurrences = 1).intercept { + spawn(behv) + probe.expectMessage(StartFailed) + } + } + + "fail when exception from MutableBehavior constructor" in new FailingConstructorTestSetup(failCount = 1) { val probe = TestProbe[Event]("evt") val behv = supervise(mutable[Command](_ ⇒ new FailingConstructor(probe.ref))) .onFailure[Exception](SupervisorStrategy.restart) - val ref = spawn(behv) - probe.expectMessage(Started) - ref ! Ping - probe.expectNoMessage() + + EventFilter[ActorInitializationException](occurrences = 1).intercept { + val ref = spawn(behv) + probe.expectMessage(Started) // first one before failure + } } + + "work with nested supervisions and defers" in { + val strategy = SupervisorStrategy.restartWithLimit(3, 1.second) + val probe = TestProbe[AnyRef]("p") + val beh = supervise[String](deferred(ctx ⇒ + supervise[String](deferred { ctx ⇒ + probe.ref ! Started + scaladsl.Behaviors.empty[String] + }).onFailure[RuntimeException](strategy) + )).onFailure[Exception](strategy) + + spawn(beh) + probe.expectMessage(Started) + } + } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/SupervisorStrategy.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/SupervisorStrategy.scala index 437a8f0b97..8b5a5d55b4 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/SupervisorStrategy.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/SupervisorStrategy.scala @@ -12,11 +12,17 @@ object SupervisorStrategy { /** * Resume means keeping the same state as before the exception was * thrown and is thus less safe than `restart`. + * + * If the actor behavior is deferred and throws an exception on startup the actor is stopped + * (restarting would be dangerous as it could lead to an infinite restart-loop) */ val resume: SupervisorStrategy = Resume(loggingEnabled = true) /** * Restart immediately without any limit on number of restart retries. + * + * If the actor behavior is deferred and throws an exception on startup the actor is stopped + * (restarting would be dangerous as it could lead to an infinite restart-loop) */ val restart: SupervisorStrategy = Restart(-1, Duration.Zero, loggingEnabled = true) @@ -31,6 +37,9 @@ object SupervisorStrategy { * within a time range (`withinTimeRange`). When the time window has elapsed without reaching * `maxNrOfRetries` the restart count is reset. * + * The strategy is applied also if the actor behavior is deferred and throws an exception during + * startup. + * * @param maxNrOfRetries the number of times a child actor is allowed to be restarted, * if the limit is exceeded the child actor is stopped * @param withinTimeRange duration of the time window for maxNrOfRetries @@ -55,6 +64,9 @@ object SupervisorStrategy { * If no new exception occurs within the `minBackoff` duration the exponentially * increased back-off timeout is reset. * + * The strategy is applied also if the actor behavior is deferred and throws an exception during + * startup. + * * @param minBackoff minimum (initial) duration until the child actor will * started again, if it is terminated * @param maxBackoff the exponential back-off is capped to this duration diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala index 6497bb6d47..e0e3e662f3 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala @@ -6,19 +6,17 @@ package internal import java.util.concurrent.ThreadLocalRandom -import scala.annotation.tailrec -import scala.concurrent.duration.Deadline -import scala.concurrent.duration.FiniteDuration +import akka.actor.DeadLetterSuppression +import akka.actor.typed.SupervisorStrategy._ +import akka.actor.typed.scaladsl.Behaviors +import akka.annotation.InternalApi +import akka.event.Logging +import akka.util.OptionVal + +import scala.concurrent.duration.{ Deadline, FiniteDuration } import scala.reflect.ClassTag import scala.util.control.Exception.Catcher import scala.util.control.NonFatal -import akka.actor.DeadLetterSuppression -import akka.annotation.InternalApi -import akka.event.Logging -import akka.actor.typed.Behavior.DeferredBehavior -import akka.actor.typed.SupervisorStrategy._ -import akka.util.OptionVal -import akka.actor.typed.scaladsl.Behaviors /** * INTERNAL API @@ -27,26 +25,26 @@ import akka.actor.typed.scaladsl.Behaviors def apply[T, Thr <: Throwable: ClassTag](initialBehavior: Behavior[T], strategy: SupervisorStrategy): Behavior[T] = Behaviors.deferred[T] { ctx ⇒ val c = ctx.asInstanceOf[akka.actor.typed.ActorContext[T]] - val startedBehavior = initialUndefer(c, initialBehavior) - strategy match { + val supervisor: Supervisor[T, Thr] = strategy match { case Restart(-1, _, loggingEnabled) ⇒ - new Restarter(initialBehavior, startedBehavior, loggingEnabled) + new Restarter(initialBehavior, initialBehavior, loggingEnabled) case r: Restart ⇒ - new LimitedRestarter(initialBehavior, startedBehavior, r, retries = 0, deadline = OptionVal.None) - case Resume(loggingEnabled) ⇒ new Resumer(startedBehavior, loggingEnabled) - case Stop(loggingEnabled) ⇒ new Stopper(startedBehavior, loggingEnabled) + new LimitedRestarter(initialBehavior, initialBehavior, r, retries = 0, deadline = OptionVal.None) + case Resume(loggingEnabled) ⇒ new Resumer(initialBehavior, loggingEnabled) + case Stop(loggingEnabled) ⇒ new Stopper(initialBehavior, loggingEnabled) case b: Backoff ⇒ val backoffRestarter = new BackoffRestarter( initialBehavior.asInstanceOf[Behavior[Any]], - startedBehavior.asInstanceOf[Behavior[Any]], + initialBehavior.asInstanceOf[Behavior[Any]], b, restartCount = 0, blackhole = false) - backoffRestarter.asInstanceOf[Behavior[T]] + backoffRestarter + .asInstanceOf[Supervisor[T, Thr]] } + + supervisor.init(c) } - def initialUndefer[T](ctx: ActorContext[T], initialBehavior: Behavior[T]): Behavior[T] = - Behavior.validateAsInitial(Behavior.undefer(initialBehavior, ctx)) } /** @@ -56,6 +54,14 @@ import akka.actor.typed.scaladsl.Behaviors protected def loggingEnabled: Boolean + /** + * Invoked when the actor is created (or re-created on restart) this is where a restarter implementation + * can provide logic for dealing with exceptions thrown when running any actor initialization logic (undeferring). + * + * @return The initial behavior of the actor after undeferring if needed + */ + def init(ctx: ActorContext[T]): Supervisor[T, Thr] + /** * Current behavior */ @@ -72,8 +78,7 @@ import akka.actor.typed.scaladsl.Behaviors try Behavior.interpretSignal(startedBehavior, ctx, PreRestart) catch { case NonFatal(ex) ⇒ ctx.asScala.log.error(ex, "failure during PreRestart") } - // no need to canonicalize, it's done in the calling methods - wrap(Supervisor.initialUndefer(ctx, initialBehavior), afterException = true) + wrap(initialBehavior, afterException = true).init(ctx) } protected final def supervise(nextBehavior: Behavior[T], ctx: ActorContext[T]): Behavior[T] = @@ -105,6 +110,10 @@ import akka.actor.typed.scaladsl.Behaviors @InternalApi private[akka] final class Resumer[T, Thr <: Throwable: ClassTag]( override val behavior: Behavior[T], override val loggingEnabled: Boolean) extends Supervisor[T, Thr] { + def init(ctx: ActorContext[T]) = + // no handling of errors for Resume as that could lead to infinite restart-loop + wrap(Behavior.validateAsInitial(Behavior.undefer(behavior, ctx)), afterException = false) + override def handleException(ctx: ActorContext[T], startedBehavior: Behavior[T]): Catcher[Supervisor[T, Thr]] = { case NonFatal(ex: Thr) ⇒ log(ctx, ex) @@ -122,6 +131,9 @@ import akka.actor.typed.scaladsl.Behaviors @InternalApi private[akka] final class Stopper[T, Thr <: Throwable: ClassTag]( override val behavior: Behavior[T], override val loggingEnabled: Boolean) extends Supervisor[T, Thr] { + def init(ctx: ActorContext[T]): Supervisor[T, Thr] = + wrap(Behavior.validateAsInitial(Behavior.undefer(behavior, ctx)), false) + override def handleException(ctx: ActorContext[T], startedBehavior: Behavior[T]): Catcher[Behavior[T]] = { case NonFatal(ex: Thr) ⇒ log(ctx, ex) @@ -130,6 +142,7 @@ import akka.actor.typed.scaladsl.Behaviors override protected def wrap(nextBehavior: Behavior[T], afterException: Boolean): Supervisor[T, Thr] = new Stopper[T, Thr](nextBehavior, loggingEnabled) + } /** @@ -139,6 +152,10 @@ import akka.actor.typed.scaladsl.Behaviors initialBehavior: Behavior[T], override val behavior: Behavior[T], override val loggingEnabled: Boolean) extends Supervisor[T, Thr] { + override def init(ctx: ActorContext[T]) = + // no handling of errors for Restart as that could lead to infinite restart-loop + wrap(Behavior.validateAsInitial(Behavior.undefer(behavior, ctx)), afterException = false) + override def handleException(ctx: ActorContext[T], startedBehavior: Behavior[T]): Catcher[Supervisor[T, Thr]] = { case NonFatal(ex: Thr) ⇒ log(ctx, ex) @@ -158,6 +175,17 @@ import akka.actor.typed.scaladsl.Behaviors override def loggingEnabled: Boolean = strategy.loggingEnabled + override def init(ctx: ActorContext[T]) = + try { + wrap(Behavior.validateAsInitial(Behavior.undefer(behavior, ctx)), afterException = false) + } catch { + case NonFatal(ex: Thr) ⇒ + log(ctx, ex) + // we haven't actually wrapped and increased retries yet, so need to compare with +1 + if (deadlineHasTimeLeft && (retries + 1) >= strategy.maxNrOfRetries) throw ex + else wrap(initialBehavior, afterException = true).init(ctx) + } + private def deadlineHasTimeLeft: Boolean = deadline match { case OptionVal.None ⇒ true case OptionVal.Some(d) ⇒ d.hasTimeLeft @@ -222,6 +250,18 @@ import akka.actor.typed.scaladsl.Behaviors override def loggingEnabled: Boolean = strategy.loggingEnabled + def init(ctx: ActorContext[Any]): Supervisor[Any, Thr] = + try { + val startedBehavior = Behavior.validateAsInitial(Behavior.undefer(initialBehavior, ctx)) + new BackoffRestarter(initialBehavior, startedBehavior, strategy, restartCount, blackhole) + } catch { + case NonFatal(ex: Thr) ⇒ + log(ctx, ex) + val restartDelay = calculateDelay(restartCount, strategy.minBackoff, strategy.maxBackoff, strategy.randomFactor) + ctx.asScala.schedule(restartDelay, ctx.asScala.self, ScheduledRestart) + new BackoffRestarter[T, Thr](initialBehavior, initialBehavior, strategy, restartCount + 1, blackhole = true) + } + override def receiveSignal(ctx: ActorContext[Any], signal: Signal): Behavior[Any] = { if (blackhole) { import scaladsl.adapter._ @@ -236,9 +276,8 @@ import akka.actor.typed.scaladsl.Behaviors msg match { case ScheduledRestart ⇒ // actual restart after scheduled backoff delay - val restartedBehavior = Supervisor.initialUndefer(ctx, initialBehavior) ctx.asScala.schedule(strategy.resetBackoffAfter, ctx.asScala.self, ResetRestartCount(restartCount)) - new BackoffRestarter[T, Thr](initialBehavior, restartedBehavior, strategy, restartCount, blackhole = false) + new BackoffRestarter[T, Thr](initialBehavior, initialBehavior, strategy, restartCount, blackhole = false).init(ctx) case ResetRestartCount(current) ⇒ if (current == restartCount) new BackoffRestarter[T, Thr](initialBehavior, behavior, strategy, restartCount = 0, blackhole) diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestEventListener.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestEventListener.scala new file mode 100644 index 0000000000..e69de29bb2 diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala index 1d3729cc2e..ff720fb57a 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala @@ -10,6 +10,12 @@ import com.typesafe.config.Config import scala.concurrent.duration._ import scala.concurrent.{ Await, TimeoutException } +import scala.util.control.NoStackTrace + +/** + * Exception without stack trace to use for verifying exceptions in tests + */ +case class TE(message: String) extends RuntimeException(message) with NoStackTrace object TestKit {