diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala index 521ea8e1f7..ae235ac90d 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala @@ -39,7 +39,7 @@ object ActorSpecMessages { case class StopRef[T](ref: ActorRef[T]) extends Command - case class GotSignal(signal: Signal) extends Event + case class ReceivedSignal(signal: Signal) extends Event case class GotChildSignal(signal: Signal) extends Event @@ -130,7 +130,7 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit( throw new TestException("Boom") } receiveSignal { case (_, signal) ⇒ - probe.ref ! GotSignal(signal) + probe.ref ! ReceivedSignal(signal) Behaviors.same }).decorate @@ -139,7 +139,7 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit( EventFilter[TestException](occurrences = 1).intercept { actor ! Fail } - probe.expectMessage(GotSignal(PreRestart)) + probe.expectMessage(ReceivedSignal(PreRestart)) } "signal post stop after voluntary termination" in { @@ -150,13 +150,13 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit( case (_, Stop) ⇒ Behaviors.stopped } receiveSignal { case (_, signal) ⇒ - probe.ref ! GotSignal(signal) + probe.ref ! ReceivedSignal(signal) Behaviors.same }).decorate val actor = spawn(behavior) actor ! Stop - probe.expectMessage(GotSignal(PostStop)) + probe.expectMessage(ReceivedSignal(PostStop)) } "restart and stop a child actor" in { @@ -186,7 +186,7 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit( Behavior.same } receiveSignal { case (_, signal) ⇒ - probe.ref ! GotSignal(signal) + probe.ref ! ReceivedSignal(signal) Behavior.stopped }).decorate }) @@ -217,7 +217,7 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit( Behaviors.same } receiveSignal { case (_, signal) ⇒ - probe.ref ! GotSignal(signal) + probe.ref ! ReceivedSignal(signal) Behavior.stopped } }).decorate @@ -285,7 +285,7 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit( throw new TestException("boom") } receiveSignal { case (_, PostStop) ⇒ - probe.ref ! GotSignal(PostStop) + probe.ref ! ReceivedSignal(PostStop) Behavior.same }).decorate val actorToWatch = spawn(behavior) @@ -297,7 +297,7 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit( Behavior.same } receiveSignal { case (_, signal) ⇒ - probe.ref ! GotSignal(signal) + probe.ref ! ReceivedSignal(signal) Behavior.same } ).decorate) @@ -308,7 +308,7 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit( EventFilter[TestException](occurrences = 1).intercept { actorToWatch ! Fail } - probe.expectMessage(GotSignal(PostStop)) + probe.expectMessage(ReceivedSignal(PostStop)) probe.expectTerminated(actorToWatch, timeout.duration) } @@ -352,7 +352,7 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit( Behaviors.same } receiveSignal { case (_, signal) ⇒ - probe.ref ! GotSignal(signal) + probe.ref ! ReceivedSignal(signal) Behaviors.same } }).decorate @@ -379,7 +379,7 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit( Behaviors.same } receiveSignal { case (_, signal) ⇒ - probe.ref ! GotSignal(signal) + probe.ref ! ReceivedSignal(signal) Behaviors.same } }).decorate @@ -414,7 +414,7 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit( Behaviors.same } receiveSignal { case (_, signal) ⇒ - probe.ref ! GotSignal(signal) + probe.ref ! ReceivedSignal(signal) Behaviors.same } }).decorate @@ -451,12 +451,12 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit( } receiveSignal { case (_, Terminated(_)) ⇒ Behaviors.unhandled case (_, signal) ⇒ - probe.ref ! GotSignal(signal) + probe.ref ! ReceivedSignal(signal) Behaviors.same } } receiveSignal { case (_, signal) ⇒ - probe.ref ! GotSignal(signal) + probe.ref ! ReceivedSignal(signal) Behaviors.same } }).decorate @@ -467,7 +467,7 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit( EventFilter[DeathPactException](occurrences = 1).intercept { childRef ! Stop probe.expectMessage(GotChildSignal(PostStop)) - probe.expectMessage(GotSignal(PostStop)) + probe.expectMessage(ReceivedSignal(PostStop)) probe.expectTerminated(actor, timeout.duration) } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala index 51dbc7801b..4c2e5cd069 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala @@ -48,7 +48,7 @@ object BehaviorSpec { case object Stop extends Command sealed trait Event - case class GotSignal(signal: Signal) extends Event + case class ReceivedSignal(signal: Signal) extends Event case class Self(self: ActorRef[Command]) extends Event case object Missed extends Event case object Ignored extends Event @@ -100,7 +100,7 @@ object BehaviorSpec { implicit class Check(val setup: Setup) { def check(signal: Signal): Setup = { setup.testKit.signal(signal) - setup.inbox.receiveAll() should ===(GotSignal(signal) :: Nil) + setup.inbox.receiveAll() should ===(ReceivedSignal(signal) :: Nil) checkAux(signal, setup.aux) setup } @@ -166,7 +166,7 @@ object BehaviorSpec { case (_, _) ⇒ SBehaviors.unhandled } receiveSignal { case (_, signal) ⇒ - monitor ! GotSignal(signal) + monitor ! ReceivedSignal(signal) SBehaviors.same } } @@ -372,7 +372,7 @@ class ReceiveBehaviorSpec extends Messages with BecomeWithLifecycle with Stoppab case (_, _: AuxPing) ⇒ SBehaviors.unhandled } receiveSignal { case (_, signal) ⇒ - monitor ! GotSignal(signal) + monitor ! ReceivedSignal(signal) SBehaviors.same } } @@ -409,7 +409,7 @@ class ImmutableWithSignalScalaBehaviorSpec extends Messages with BecomeWithLifec } } receiveSignal { case (_, sig) ⇒ - monitor ! GotSignal(sig) + monitor ! ReceivedSignal(sig) SBehaviors.same } } @@ -559,7 +559,7 @@ class ImmutableWithSignalJavaBehaviorSpec extends Messages with BecomeWithLifecy case _: AuxPing ⇒ SBehaviors.unhandled }), fs((_, sig) ⇒ { - monitor ! GotSignal(sig) + monitor ! ReceivedSignal(sig) SBehaviors.same })) } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala index 9afedb1e25..b4081943d8 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala @@ -33,7 +33,7 @@ object SupervisionSpec { sealed trait Event final case class Pong(n: Int) extends Event - final case class GotSignal(signal: Signal) extends Event + final case class ReceivedSignal(signal: Signal) extends Event final case class State(n: Int, children: Map[String, ActorRef[Command]]) extends Event case object Started extends Event case object StartFailed extends Event @@ -67,7 +67,7 @@ object SupervisionSpec { case (_, sig) ⇒ if (sig == PostStop) slowStop.foreach(latch ⇒ latch.await(10, TimeUnit.SECONDS)) - monitor ! GotSignal(sig) + monitor ! ReceivedSignal(sig) Behaviors.same } @@ -102,7 +102,7 @@ class StubbedSupervisionSpec extends WordSpec with Matchers { intercept[Exc3] { testkit.run(Throw(new Exc3)) } - inbox.receiveMessage() should ===(GotSignal(PostStop)) + inbox.receiveMessage() should ===(ReceivedSignal(PostStop)) } "stop when unhandled exception" in { @@ -112,7 +112,7 @@ class StubbedSupervisionSpec extends WordSpec with Matchers { intercept[Exc3] { testkit.run(Throw(new Exc3)) } - inbox.receiveMessage() should ===(GotSignal(PostStop)) + inbox.receiveMessage() should ===(ReceivedSignal(PostStop)) } "restart when handled exception" in { @@ -124,7 +124,7 @@ class StubbedSupervisionSpec extends WordSpec with Matchers { inbox.receiveMessage() should ===(State(1, Map.empty)) testkit.run(Throw(new Exc2)) - inbox.receiveMessage() should ===(GotSignal(PreRestart)) + inbox.receiveMessage() should ===(ReceivedSignal(PreRestart)) testkit.run(GetState) inbox.receiveMessage() should ===(State(0, Map.empty)) } @@ -162,7 +162,7 @@ class StubbedSupervisionSpec extends WordSpec with Matchers { // restart testkit.run(Throw(new Exc3)) - inbox.receiveMessage() should ===(GotSignal(PreRestart)) + inbox.receiveMessage() should ===(ReceivedSignal(PreRestart)) testkit.run(GetState) inbox.receiveMessage() should ===(State(0, Map.empty)) @@ -170,7 +170,7 @@ class StubbedSupervisionSpec extends WordSpec with Matchers { intercept[Exc1] { testkit.run(Throw(new Exc1)) } - inbox.receiveMessage() should ===(GotSignal(PostStop)) + inbox.receiveMessage() should ===(ReceivedSignal(PostStop)) } "not catch fatal error" in { @@ -190,13 +190,13 @@ class StubbedSupervisionSpec extends WordSpec with Matchers { val behv = supervise(targetBehavior(inbox.ref)).onFailure[Exc1](strategy) val testkit = BehaviorTestKit(behv) testkit.run(Throw(new Exc1)) - inbox.receiveMessage() should ===(GotSignal(PreRestart)) + inbox.receiveMessage() should ===(ReceivedSignal(PreRestart)) testkit.run(Throw(new Exc1)) - inbox.receiveMessage() should ===(GotSignal(PreRestart)) + inbox.receiveMessage() should ===(ReceivedSignal(PreRestart)) intercept[Exc1] { testkit.run(Throw(new Exc1)) } - inbox.receiveMessage() should ===(GotSignal(PostStop)) + inbox.receiveMessage() should ===(ReceivedSignal(PostStop)) } "reset retry limit after withinTimeRange" in { @@ -206,19 +206,19 @@ class StubbedSupervisionSpec extends WordSpec with Matchers { val behv = supervise(targetBehavior(inbox.ref)).onFailure[Exc1](strategy) val testkit = BehaviorTestKit(behv) testkit.run(Throw(new Exc1)) - inbox.receiveMessage() should ===(GotSignal(PreRestart)) + inbox.receiveMessage() should ===(ReceivedSignal(PreRestart)) testkit.run(Throw(new Exc1)) - inbox.receiveMessage() should ===(GotSignal(PreRestart)) + inbox.receiveMessage() should ===(ReceivedSignal(PreRestart)) Thread.sleep((2.seconds + 100.millis).toMillis) testkit.run(Throw(new Exc1)) - inbox.receiveMessage() should ===(GotSignal(PreRestart)) + inbox.receiveMessage() should ===(ReceivedSignal(PreRestart)) testkit.run(Throw(new Exc1)) - inbox.receiveMessage() should ===(GotSignal(PreRestart)) + inbox.receiveMessage() should ===(ReceivedSignal(PreRestart)) intercept[Exc1] { testkit.run(Throw(new Exc1)) } - inbox.receiveMessage() should ===(GotSignal(PostStop)) + inbox.receiveMessage() should ===(ReceivedSignal(PostStop)) } "stop at first exception when restart retries limit is 0" in { @@ -230,7 +230,7 @@ class StubbedSupervisionSpec extends WordSpec with Matchers { intercept[Exc1] { testkit.run(Throw(new Exc1)) } - inbox.receiveMessage() should ===(GotSignal(PostStop)) + inbox.receiveMessage() should ===(ReceivedSignal(PostStop)) } "create underlying deferred behavior immediately" in { @@ -317,7 +317,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( val ref = spawn(behv) EventFilter[Exc3](occurrences = 1).intercept { ref ! Throw(new Exc3) - probe.expectMessage(GotSignal(PostStop)) + probe.expectMessage(ReceivedSignal(PostStop)) } } @@ -345,12 +345,12 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( EventFilter[IOException](occurrences = 1).intercept { ref ! Throw(new IOException()) - probe.expectMessage(GotSignal(PreRestart)) + probe.expectMessage(ReceivedSignal(PreRestart)) } EventFilter[IllegalArgumentException](occurrences = 1).intercept { ref ! Throw(new IllegalArgumentException("cat")) - probe.expectMessage(GotSignal(PostStop)) + probe.expectMessage(ReceivedSignal(PostStop)) } } @@ -366,7 +366,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( EventFilter[Exception](occurrences = 1).intercept { ref ! Throw(new IOException()) - probe.expectMessage(GotSignal(PreRestart)) + probe.expectMessage(ReceivedSignal(PreRestart)) } // verify that it's still alive and not stopped, IllegalStateException would stop it ref ! Ping(1) @@ -374,7 +374,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( EventFilter[IllegalArgumentException](occurrences = 1).intercept { ref ! Throw(new IllegalArgumentException("cat")) - probe.expectMessage(GotSignal(PreRestart)) + probe.expectMessage(ReceivedSignal(PreRestart)) } // verify that it's still alive and not stopped, IllegalStateException would stop it @@ -394,7 +394,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( EventFilter[Exception](occurrences = 1).intercept { ref ! Throw(new IOException()) - probe.expectMessage(GotSignal(PreRestart)) + probe.expectMessage(ReceivedSignal(PreRestart)) } // verify that it's still alive and not stopped, IllegalStateException would stop it ref ! Ping(1) @@ -402,7 +402,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( EventFilter[IllegalArgumentException](occurrences = 1).intercept { ref ! Throw(new IllegalArgumentException("cat")) - probe.expectMessage(GotSignal(PreRestart)) + probe.expectMessage(ReceivedSignal(PreRestart)) } // verify that it's still alive and not stopped, IllegalStateException would stop it @@ -416,7 +416,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( val ref = spawn(behv) EventFilter[Exc3](occurrences = 1).intercept { ref ! Throw(new Exc3) - probe.expectMessage(GotSignal(PostStop)) + probe.expectMessage(ReceivedSignal(PostStop)) } } @@ -427,7 +427,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( val ref = spawn(behv) EventFilter[Exc3](occurrences = 1).intercept { ref ! Throw(new Exc3) - probe.expectMessage(GotSignal(PostStop)) + probe.expectMessage(ReceivedSignal(PostStop)) } } @@ -442,7 +442,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( EventFilter[Exc2](occurrences = 1).intercept { ref ! Throw(new Exc2) - probe.expectMessage(GotSignal(PreRestart)) + probe.expectMessage(ReceivedSignal(PreRestart)) } ref ! GetState probe.expectMessage(State(0, Map.empty)) @@ -460,11 +460,11 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( EventFilter[Exc2](occurrences = 3).intercept { ref ! Throw(new Exc2) - probe.expectMessage(GotSignal(PreRestart)) + probe.expectMessage(ReceivedSignal(PreRestart)) ref ! Throw(new Exc2) - probe.expectMessage(GotSignal(PreRestart)) + probe.expectMessage(ReceivedSignal(PreRestart)) ref ! Throw(new Exc2) - probe.expectMessage(GotSignal(PostStop)) + probe.expectMessage(ReceivedSignal(PostStop)) } EventFilter.warning(start = "received dead letter", occurrences = 1).intercept { ref ! GetState @@ -484,14 +484,14 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( EventFilter[Exc2](occurrences = 3).intercept { ref ! Throw(new Exc2) - probe.expectMessage(GotSignal(PreRestart)) + probe.expectMessage(ReceivedSignal(PreRestart)) ref ! Throw(new Exc2) - probe.expectMessage(GotSignal(PreRestart)) + probe.expectMessage(ReceivedSignal(PreRestart)) probe.expectNoMessage(resetTimeout + 50.millis) ref ! Throw(new Exc2) - probe.expectMessage(GotSignal(PreRestart)) + probe.expectMessage(ReceivedSignal(PreRestart)) } ref ! GetState probe.expectMessage(State(0, Map.empty)) @@ -525,7 +525,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( EventFilter[Exc1](occurrences = 1).intercept { ref ! Throw(new Exc1) - parentProbe.expectMessage(GotSignal(PreRestart)) + parentProbe.expectMessage(ReceivedSignal(PreRestart)) ref ! GetState anotherProbe.stop() } @@ -534,11 +534,11 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( parentProbe.expectNoMessage() slowStop.countDown() - childProbe.expectMessage(GotSignal(PostStop)) - childProbe.expectMessage(GotSignal(PostStop)) + childProbe.expectMessage(ReceivedSignal(PostStop)) + childProbe.expectMessage(ReceivedSignal(PostStop)) parentProbe.expectMessageType[State].children.keySet should ===(Set.empty) // anotherProbe was stopped, Terminated signal stashed and delivered to new behavior - parentProbe.expectMessage(GotSignal(Terminated(anotherProbe.ref))) + parentProbe.expectMessage(ReceivedSignal(Terminated(anotherProbe.ref))) } "optionally NOT stop children when restarting" in { @@ -564,7 +564,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( EventFilter[Exc1](occurrences = 1).intercept { ref ! Throw(new Exc1) - parentProbe.expectMessage(GotSignal(PreRestart)) + parentProbe.expectMessage(ReceivedSignal(PreRestart)) ref ! GetState } parentProbe.expectMessageType[State].children.keySet should contain(childName) @@ -597,7 +597,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( EventFilter[Exc1](occurrences = 1).intercept { ref ! Throw(new Exc1) - parentProbe.expectMessage(GotSignal(PreRestart)) + parentProbe.expectMessage(ReceivedSignal(PreRestart)) ref ! GetState ref ! CreateChild(targetBehavior(childProbe.ref), child2Name) ref ! GetState @@ -606,13 +606,14 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( EventFilter[Exc1](occurrences = 1).intercept { slowStop.countDown() - childProbe.expectMessage(GotSignal(PostStop)) // child1 + childProbe.expectMessage(ReceivedSignal(PostStop)) // child1 parentProbe.expectMessageType[State].children.keySet should ===(Set.empty) parentProbe.expectMessageType[State].children.keySet should ===(Set(child2Name)) // the stashed Throw is causing another restart and stop of child2 - childProbe.expectMessage(GotSignal(PostStop)) // child2 + childProbe.expectMessage(ReceivedSignal(PostStop)) // child2 } + parentProbe.expectMessage(ReceivedSignal(PreRestart)) ref ! GetState parentProbe.expectMessageType[State].children.keySet should ===(Set.empty) } @@ -650,10 +651,10 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( EventFilter[TestException](occurrences = 1).intercept { val ref = spawn(behv) slowStop1.countDown() - child1Probe.expectMessage(GotSignal(PostStop)) + child1Probe.expectMessage(ReceivedSignal(PostStop)) throwFromSetup.set(false) slowStop2.countDown() - child2Probe.expectMessage(GotSignal(PostStop)) + child2Probe.expectMessage(ReceivedSignal(PostStop)) ref ! GetState parentProbe.expectMessageType[State].children.keySet should ===(Set("child1")) @@ -699,16 +700,16 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( EventFilter[Exc1](occurrences = 1).intercept { ref ! Throw(new Exc1) - parentProbe.expectMessage(GotSignal(PreRestart)) + parentProbe.expectMessage(ReceivedSignal(PreRestart)) } EventFilter[TestException](occurrences = 1).intercept { slowStop1.countDown() - child1Probe.expectMessage(GotSignal(PostStop)) - child1Probe.expectMessage(GotSignal(PostStop)) + child1Probe.expectMessage(ReceivedSignal(PostStop)) + child1Probe.expectMessage(ReceivedSignal(PostStop)) throwFromSetup.set(false) slowStop2.countDown() - child2Probe.expectMessage(GotSignal(PostStop)) + child2Probe.expectMessage(ReceivedSignal(PostStop)) } ref ! GetState @@ -752,7 +753,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( // restart EventFilter[Exc3](occurrences = 1).intercept { ref ! Throw(new Exc3) - probe.expectMessage(GotSignal(PreRestart)) + probe.expectMessage(ReceivedSignal(PreRestart)) ref ! GetState probe.expectMessage(State(0, Map.empty)) } @@ -760,7 +761,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( // stop EventFilter[Exc1](occurrences = 1).intercept { ref ! Throw(new Exc1) - probe.expectMessage(GotSignal(PostStop)) + probe.expectMessage(ReceivedSignal(PostStop)) } } @@ -781,7 +782,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( EventFilter[Exc1](occurrences = 1).intercept { startedProbe.expectMessage(Started) ref ! Throw(new Exc1) - probe.expectMessage(GotSignal(PreRestart)) + probe.expectMessage(ReceivedSignal(PreRestart)) } ref ! Ping(1) ref ! Ping(2) @@ -811,7 +812,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( startedProbe.expectMessage(Started) ref ! IncrementState ref ! Throw(new Exc1) - probe.expectMessage(GotSignal(PreRestart)) + probe.expectMessage(ReceivedSignal(PreRestart)) ref ! Ping(1) // dropped due to backoff, no stashing } @@ -825,7 +826,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( EventFilter[Exc1](occurrences = 1).intercept { ref ! IncrementState ref ! Throw(new Exc1) - probe.expectMessage(GotSignal(PreRestart)) + probe.expectMessage(ReceivedSignal(PreRestart)) ref ! Ping(2) // dropped due to backoff, no stashing } @@ -878,7 +879,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( EventFilter[Exc1](occurrences = 1).intercept { ref ! IncrementState ref ! Throw(new Exc1) - probe.expectMessage(GotSignal(PreRestart)) + probe.expectMessage(ReceivedSignal(PreRestart)) ref ! Ping(1) // dropped due to backoff, no stash } @@ -891,7 +892,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit( probe.expectNoMessage(strategy.resetBackoffAfter + 100.millis.dilated) ref ! IncrementState ref ! Throw(new Exc1) - probe.expectMessage(GotSignal(PreRestart)) + probe.expectMessage(ReceivedSignal(PreRestart)) ref ! Ping(2) // dropped due to backoff } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashSpec.scala index 4675a77953..d80c681499 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StashSpec.scala @@ -5,11 +5,18 @@ package akka.actor.typed package scaladsl +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit + +import scala.concurrent.duration._ + +import akka.actor.testkit.typed.TestException import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.testkit.typed.scaladsl.TestProbe +import akka.testkit.EventFilter import org.scalatest.WordSpecLike -object StashSpec { +object AbstractStashSpec { sealed trait Command final case class Msg(s: String) extends Command final case class Unstashed(cmd: Command) extends Command @@ -25,7 +32,7 @@ object StashSpec { val buffer = StashBuffer[Command](capacity = 10) def active(processed: Vector[String]): Behavior[Command] = - Behaviors.receive { (context, cmd) ⇒ + Behaviors.receive { (_, cmd) ⇒ cmd match { case message: Msg ⇒ active(processed :+ message.s) @@ -176,20 +183,20 @@ object StashSpec { } -class ImmutableStashSpec extends StashSpec { - import StashSpec._ +class ImmutableStashSpec extends AbstractStashSpec { + import AbstractStashSpec._ def testQualifier: String = "immutable behavior" def behaviorUnderTest: Behavior[Command] = immutableStash } -class MutableStashSpec extends StashSpec { - import StashSpec._ +class MutableStashSpec extends AbstractStashSpec { + import AbstractStashSpec._ def testQualifier: String = "mutable behavior" def behaviorUnderTest: Behavior[Command] = Behaviors.setup(context ⇒ new MutableStash(context)) } -abstract class StashSpec extends ScalaTestWithActorTestKit with WordSpecLike { - import StashSpec._ +abstract class AbstractStashSpec extends ScalaTestWithActorTestKit with WordSpecLike { + import AbstractStashSpec._ def testQualifier: String def behaviorUnderTest: Behavior[Command] @@ -242,3 +249,340 @@ abstract class StashSpec extends ScalaTestWithActorTestKit with WordSpecLike { } } + +class UnstashingSpec extends ScalaTestWithActorTestKit(""" + akka.loggers = ["akka.testkit.TestEventListener"] + """) with WordSpecLike { + + // needed for EventFilter + private implicit val untypedSys: akka.actor.ActorSystem = { + import akka.actor.typed.scaladsl.adapter._ + system.toUntyped + } + + private def slowStoppingChild(latch: CountDownLatch): Behavior[String] = + Behaviors.receiveSignal { + case (_, PostStop) ⇒ + latch.await(10, TimeUnit.SECONDS) + Behaviors.same + } + + private def stashingBehavior( + probe: ActorRef[String], + withSlowStoppingChild: Option[CountDownLatch] = None) = { + Behaviors.setup[String] { ctx ⇒ + + withSlowStoppingChild.foreach(latch ⇒ ctx.spawnAnonymous(slowStoppingChild(latch))) + + val stash = StashBuffer[String](10) + + def unstashing(n: Int): Behavior[String] = + Behaviors.receiveMessage[String] { + case "stash" ⇒ + probe.ref ! s"unstashing-$n" + unstashing(n + 1) + case "stash-fail" ⇒ + probe.ref ! s"stash-fail-$n" + throw TestException("unstash-fail") + case "get-current" ⇒ + probe.ref ! s"current-$n" + Behaviors.same + case "get-stash-size" ⇒ + probe.ref ! s"stash-size-${stash.size}" + Behaviors.same + case "unstash" ⇒ + // when testing resume + stash.unstashAll(ctx, unstashing(n)) + }.receiveSignal { + case (_, PreRestart) ⇒ + probe.ref ! s"pre-restart-$n" + Behaviors.same + case (_, PostStop) ⇒ + probe.ref ! s"post-stop-$n" + Behaviors.same + } + + Behaviors.receiveMessage[String] { + case msg if msg.startsWith("stash") ⇒ + stash.stash(msg) + Behavior.same + case "unstash" ⇒ + stash.unstashAll(ctx, unstashing(0)) + case "get-current" ⇒ + probe.ref ! s"current-00" + Behaviors.same + case "get-stash-size" ⇒ + probe.ref ! s"stash-size-${stash.size}" + Behaviors.same + } + } + } + + "Unstashing" must { + + "work with initial Behaviors.same" in { + // FIXME #26148 unstashAll doesn't support Behavior.same + pending + + val probe = TestProbe[String]() + // unstashing is inside setup + val ref = spawn(Behaviors.receive[String] { + case (ctx, "unstash") ⇒ + val stash = StashBuffer[String](10) + stash.stash("one") + stash.unstashAll(ctx, Behavior.same) + + case (_, msg) ⇒ + probe.ref ! msg + Behaviors.same + }) + + ref ! "unstash" + probe.expectMessage("one") + } + + "work with intermediate Behaviors.same" in { + val probe = TestProbe[String]() + // unstashing is inside setup + val ref = spawn(Behaviors.receivePartial[String] { + case (ctx, "unstash") ⇒ + val stash = StashBuffer[String](10) + stash.stash("one") + stash.stash("two") + stash.unstashAll(ctx, Behaviors.receiveMessage { msg ⇒ + probe.ref ! msg + Behaviors.same + }) + }) + + ref ! "unstash" + probe.expectMessage("one") + probe.expectMessage("two") + ref ! "three" + probe.expectMessage("three") + } + + "work with supervised initial Behaviors.same" in { + // FIXME #26148 unstashAll doesn't support Behavior.same + pending + + val probe = TestProbe[String]() + // unstashing is inside setup + val ref = spawn(Behaviors.supervise(Behaviors.receivePartial[String] { + case (ctx, "unstash") ⇒ + val stash = StashBuffer[String](10) + stash.stash("one") + stash.unstashAll(ctx, Behavior.same) + + case (_, msg) ⇒ + probe.ref ! msg + Behaviors.same + }).onFailure[TestException](SupervisorStrategy.stop)) + + ref ! "unstash" + probe.expectMessage("one") + ref ! "two" + probe.expectMessage("two") + } + + "work with supervised intermediate Behaviors.same" in { + val probe = TestProbe[String]() + // unstashing is inside setup + val ref = spawn(Behaviors.supervise(Behaviors.receivePartial[String] { + case (ctx, "unstash") ⇒ + val stash = StashBuffer[String](10) + stash.stash("one") + stash.stash("two") + stash.unstashAll(ctx, Behaviors.receiveMessage { msg ⇒ + probe.ref ! msg + Behaviors.same + }) + }).onFailure[TestException](SupervisorStrategy.stop)) + + ref ! "unstash" + probe.expectMessage("one") + probe.expectMessage("two") + ref ! "three" + probe.expectMessage("three") + } + + def testPostStop( + probe: TestProbe[String], + ref: ActorRef[String] + ): Unit = { + ref ! "stash" + ref ! "stash" + ref ! "stash-fail" + ref ! "stash" + EventFilter[TestException](start = "unstash-fail", occurrences = 1) + .intercept { + ref ! "unstash" + probe.expectMessage("unstashing-0") + probe.expectMessage("unstashing-1") + probe.expectMessage("stash-fail-2") + probe.expectMessage("post-stop-2") + } + } + + "signal PostStop to the latest unstashed behavior on failure" in { + val probe = TestProbe[String]() + val ref = spawn(stashingBehavior(probe.ref)) + testPostStop(probe, ref) + } + + "signal PostStop to the latest unstashed behavior on failure with stop supervision" in { + val probe = TestProbe[String]() + val ref = + spawn(Behaviors.supervise(stashingBehavior(probe.ref)) + .onFailure[TestException](SupervisorStrategy.stop)) + testPostStop(probe, ref) + } + + def testPreRestart( + probe: TestProbe[String], + childLatch: Option[CountDownLatch], + ref: ActorRef[String] + ): Unit = { + ref ! "stash" + ref ! "stash" + ref ! "stash-fail" + ref ! "stash" + EventFilter[TestException]( + start = "Supervisor RestartSupervisor saw failure: unstash-fail", + occurrences = 1 + ).intercept { + ref ! "unstash" + // when childLatch is defined this be stashed in the internal stash of the RestartSupervisor + // because it's waiting for child to stop + ref ! "get-current" + + probe.expectMessage("unstashing-0") + probe.expectMessage("unstashing-1") + probe.expectMessage("stash-fail-2") + probe.expectMessage("pre-restart-2") + + childLatch.foreach(_.countDown()) + probe.expectMessage("current-00") + + ref ! "get-stash-size" + probe.expectMessage("stash-size-0") + } + } + + "signal PreRestart to the latest unstashed behavior on failure with restart supervision" in { + val probe = TestProbe[String]() + val ref = + spawn(Behaviors.supervise(stashingBehavior(probe.ref)) + .onFailure[TestException](SupervisorStrategy.restart)) + + testPreRestart(probe, None, ref) + // one more time to ensure that the restart strategy is kept + testPreRestart(probe, None, ref) + } + + "signal PreRestart to the latest unstashed behavior on failure with restart supervision and slow stopping child" in { + val probe = TestProbe[String]() + val childLatch = new CountDownLatch(1) + val ref = + spawn(Behaviors.supervise(stashingBehavior(probe.ref, Some(childLatch))) + .onFailure[TestException](SupervisorStrategy.restart)) + + testPreRestart(probe, Some(childLatch), ref) + } + + "signal PreRestart to the latest unstashed behavior on failure with backoff supervision" in { + val probe = TestProbe[String]() + val ref = + spawn(Behaviors.supervise(stashingBehavior(probe.ref)) + .onFailure[TestException](SupervisorStrategy.restartWithBackoff(100.millis, 100.millis, 0.0))) + + testPreRestart(probe, None, ref) + + // one more time to ensure that the backoff strategy is kept + testPreRestart(probe, None, ref) + } + + "signal PreRestart to the latest unstashed behavior on failure with backoff supervision and slow stopping child" in { + val probe = TestProbe[String]() + val childLatch = new CountDownLatch(1) + val ref = + spawn(Behaviors.supervise(stashingBehavior(probe.ref, Some(childLatch))) + .onFailure[TestException](SupervisorStrategy.restartWithBackoff(100.millis, 100.millis, 0.0))) + + testPreRestart(probe, Some(childLatch), ref) + } + + "handle resume correctly on failure unstashing" in { + val probe = TestProbe[String]() + val ref = + spawn(Behaviors.supervise(stashingBehavior(probe.ref)) + .onFailure[TestException](SupervisorStrategy.resume)) + + ref ! "stash" + ref ! "stash" + ref ! "stash-fail" + ref ! "stash" + ref ! "stash" + ref ! "stash" + ref ! "stash-fail" + ref ! "stash" + EventFilter[TestException](start = "Supervisor ResumeSupervisor saw failure: unstash-fail", occurrences = 1).intercept { + ref ! "unstash" + ref ! "get-current" + + probe.expectMessage("unstashing-0") + probe.expectMessage("unstashing-1") + probe.expectMessage("stash-fail-2") + probe.expectMessage("current-2") + ref ! "get-stash-size" + probe.expectMessage("stash-size-5") + } + + ref ! "unstash" + ref ! "get-current" + probe.expectMessage("unstashing-2") + probe.expectMessage("unstashing-3") + probe.expectMessage("unstashing-4") + probe.expectMessage("stash-fail-5") + probe.expectMessage("current-5") + ref ! "get-stash-size" + probe.expectMessage("stash-size-1") + + ref ! "unstash" + ref ! "get-current" + probe.expectMessage("unstashing-5") + probe.expectMessage("current-6") + + ref ! "get-stash-size" + probe.expectMessage("stash-size-0") + } + + "be possible in combination with setup" in { + val probe = TestProbe[String]() + val ref = spawn(Behaviors.setup[String] { _ ⇒ + val stash = StashBuffer[String](10) + stash.stash("one") + + // FIXME #26148 using AbstractBehavior because unstashAll doesn't support Behavior.same + + // unstashing is inside setup + new AbstractBehavior[String] { + override def onMessage(msg: String): Behavior[String] = msg match { + case "unstash" ⇒ + Behaviors.setup[String] { ctx ⇒ + stash.unstashAll(ctx, this) + } + case _ ⇒ + probe.ref ! msg + Behavior.same + } + } + }) + + ref ! "unstash" + probe.expectMessage("one") + } + + } + +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala index 6455982a83..69c6ec64fc 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala @@ -424,27 +424,5 @@ object Behavior { } } - /** - * INTERNAL API - * - * Execute the behavior with the given messages (or signals). - * The returned [[Behavior]] from each processed message is used for the next message. - */ - @InternalApi private[akka] def interpretMessages[T](behavior: Behavior[T], ctx: TypedActorContext[T], messages: Iterator[T]): Behavior[T] = { - @tailrec def interpretOne(b: Behavior[T]): Behavior[T] = { - val b2 = Behavior.start(b, ctx) - if (!Behavior.isAlive(b2) || !messages.hasNext) b2 - else { - val nextB = messages.next() match { - case sig: Signal ⇒ Behavior.interpretSignal(b2, ctx, sig) - case msg ⇒ Behavior.interpretMessage(b2, ctx, msg) - } - interpretOne(Behavior.canonicalize(nextB, b, ctx)) // recursive - } - } - - interpretOne(Behavior.start(behavior, ctx)) - } - } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala index 3ed3a0c02b..82d63e07c0 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/StashBufferImpl.scala @@ -7,7 +7,12 @@ package akka.actor.typed.internal import java.util.function.Consumer import java.util.function.{ Function ⇒ JFunction } +import scala.annotation.tailrec +import scala.util.control.NonFatal + import akka.actor.typed.Behavior +import akka.actor.typed.Signal +import akka.actor.typed.TypedActorContext import akka.actor.typed.javadsl import akka.actor.typed.scaladsl import akka.annotation.InternalApi @@ -86,7 +91,7 @@ import akka.util.ConstantFun } } - override def forEach(f: Consumer[T]): Unit = foreach(f.accept(_)) + override def forEach(f: Consumer[T]): Unit = foreach(f.accept) override def unstashAll(ctx: scaladsl.ActorContext[T], behavior: Behavior[T]): Behavior[T] = unstash(ctx, behavior, size, ConstantFun.scalaIdentityFunction[T]) @@ -96,11 +101,36 @@ import akka.util.ConstantFun override def unstash(ctx: scaladsl.ActorContext[T], behavior: Behavior[T], numberOfMessages: Int, wrap: T ⇒ T): Behavior[T] = { - val iter = new Iterator[T] { - override def hasNext: Boolean = StashBufferImpl.this.nonEmpty - override def next(): T = wrap(StashBufferImpl.this.dropHead()) - }.take(numberOfMessages) - Behavior.interpretMessages[T](behavior, ctx, iter) + if (isEmpty) + behavior // optimization + else { + val iter = new Iterator[T] { + override def hasNext: Boolean = StashBufferImpl.this.nonEmpty + override def next(): T = wrap(StashBufferImpl.this.dropHead()) + }.take(numberOfMessages) + interpretUnstashedMessages(behavior, ctx, iter) + } + } + + private def interpretUnstashedMessages(behavior: Behavior[T], ctx: TypedActorContext[T], messages: Iterator[T]): Behavior[T] = { + @tailrec def interpretOne(b: Behavior[T]): Behavior[T] = { + val b2 = Behavior.start(b, ctx) + if (!Behavior.isAlive(b2) || !messages.hasNext) b2 + else { + val nextB = try { + messages.next() match { + case sig: Signal ⇒ Behavior.interpretSignal(b2, ctx, sig) + case msg ⇒ Behavior.interpretMessage(b2, ctx, msg) + } + } catch { + case NonFatal(e) ⇒ throw UnstashException(e, b2) + } + + interpretOne(Behavior.canonicalize(nextB, b2, ctx)) // recursive + } + } + + interpretOne(Behavior.start(behavior, ctx)) } override def unstash(ctx: javadsl.ActorContext[T], behavior: Behavior[T], @@ -111,3 +141,23 @@ import akka.util.ConstantFun s"StashBuffer($size/$capacity)" } +/** + * INTERNAL API + */ +@InternalApi private[akka] object UnstashException { + def unwrap(t: Throwable): Throwable = t match { + case UnstashException(e, _) ⇒ e + case _ ⇒ t + } + +} + +/** + * INTERNAL API: + * + * When unstashing, the exception is wrapped in UnstashException because supervisor strategy + * and ActorAdapter need the behavior that threw. It will use the behavior in the `UnstashException` + * to emit the PreRestart and PostStop to the right behavior and install the latest behavior for resume strategy. + */ +@InternalApi private[akka] final case class UnstashException[T](cause: Throwable, behavior: Behavior[T]) + extends RuntimeException(s"[$cause] when unstashing in [$behavior]", cause) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala index 9e2d70ba9c..acf9a2a735 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala @@ -49,6 +49,9 @@ private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: Supe private val throwableClass = implicitly[ClassTag[Thr]].runtimeClass + protected def isInstanceOfTheThrowableClass(t: Throwable): Boolean = + throwableClass.isAssignableFrom(UnstashException.unwrap(t).getClass) + override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = { other match { case as: AbstractSupervisor[_, _, Thr] if throwableClass == as.throwableClass ⇒ true @@ -70,7 +73,8 @@ private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: Supe def log(ctx: TypedActorContext[_], t: Throwable): Unit = { if (strategy.loggingEnabled) { - ctx.asScala.log.error(t, "Supervisor {} saw failure: {}", this, t.getMessage) + val unwrapped = UnstashException.unwrap(t) + ctx.asScala.log.error(unwrapped, "Supervisor {} saw failure: {}", this, unwrapped.getMessage) } } @@ -98,7 +102,7 @@ private abstract class SimpleSupervisor[T, Thr <: Throwable: ClassTag](ss: Super } protected def handleException(@unused ctx: TypedActorContext[T]): Catcher[Behavior[T]] = { - case NonFatal(t: Thr) ⇒ + case NonFatal(t) if isInstanceOfTheThrowableClass(t) ⇒ Behavior.failed(t) } @@ -113,8 +117,9 @@ private abstract class SimpleSupervisor[T, Thr <: Throwable: ClassTag](ss: Super private class StopSupervisor[T, Thr <: Throwable: ClassTag](@unused initial: Behavior[T], strategy: Stop) extends SimpleSupervisor[T, Thr](strategy) { + override def handleException(ctx: TypedActorContext[T]): Catcher[Behavior[T]] = { - case NonFatal(t: Thr) ⇒ + case NonFatal(t) if isInstanceOfTheThrowableClass(t) ⇒ log(ctx, t) Behavior.failed(t) } @@ -122,9 +127,12 @@ private class StopSupervisor[T, Thr <: Throwable: ClassTag](@unused initial: Beh private class ResumeSupervisor[T, Thr <: Throwable: ClassTag](ss: Resume) extends SimpleSupervisor[T, Thr](ss) { override protected def handleException(ctx: TypedActorContext[T]): Catcher[Behavior[T]] = { - case NonFatal(t: Thr) ⇒ + case NonFatal(t) if isInstanceOfTheThrowableClass(t) ⇒ log(ctx, t) - Behaviors.same + t match { + case e: UnstashException[T] @unchecked ⇒ e.behavior + case _ ⇒ Behaviors.same + } } } @@ -239,7 +247,7 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav } override protected def handleExceptionOnStart(ctx: TypedActorContext[O], @unused target: PreStartTarget[T]): Catcher[Behavior[T]] = { - case NonFatal(t: Thr) ⇒ + case NonFatal(t) if isInstanceOfTheThrowableClass(t) ⇒ strategy match { case _: Restart ⇒ // if unlimited restarts then don't restart if starting fails as it would likely be an infinite restart loop @@ -255,14 +263,20 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav } override protected def handleSignalException(ctx: TypedActorContext[O], target: SignalTarget[T]): Catcher[Behavior[T]] = { - handleException(ctx, () ⇒ target(ctx, PreRestart)) + handleException(ctx, signalRestart = { + case e: UnstashException[O] @unchecked ⇒ Behavior.interpretSignal(e.behavior, ctx, PreRestart) + case _ ⇒ target(ctx, PreRestart) + }) } override protected def handleReceiveException(ctx: TypedActorContext[O], target: ReceiveTarget[T]): Catcher[Behavior[T]] = { - handleException(ctx, () ⇒ target.signalRestart(ctx)) + handleException(ctx, signalRestart = { + case e: UnstashException[O] @unchecked ⇒ Behavior.interpretSignal(e.behavior, ctx, PreRestart) + case _ ⇒ target.signalRestart(ctx) + }) } - private def handleException(ctx: TypedActorContext[O], signalRestart: () ⇒ Unit): Catcher[Behavior[T]] = { - case NonFatal(t: Thr) ⇒ + private def handleException(ctx: TypedActorContext[O], signalRestart: Throwable ⇒ Unit): Catcher[Behavior[T]] = { + case NonFatal(t) if isInstanceOfTheThrowableClass(t) ⇒ if (strategy.maxRestarts != -1 && restartCount >= strategy.maxRestarts && deadlineHasTimeLeft) { strategy match { case _: Restart ⇒ throw t @@ -272,7 +286,7 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav } } else { - try signalRestart() catch { + try signalRestart(t) catch { case NonFatal(ex) ⇒ ctx.asScala.log.error(ex, "failure during PreRestart") } @@ -326,9 +340,10 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav stashBuffer.unstashAll(ctx.asScala.asInstanceOf[scaladsl.ActorContext[Any]], newBehavior.unsafeCast) } nextBehavior.narrow - } catch handleException(ctx, signalRestart = () ⇒ ()) - // FIXME signal Restart is not done if unstashAll throws, unstash of each message may return a new behavior and - // it's the failing one that should receive the signal + } catch handleException(ctx, signalRestart = { + case e: UnstashException[O] @unchecked ⇒ Behavior.interpretSignal(e.behavior, ctx, PreRestart) + case _ ⇒ () + }) } private def stopChildren(ctx: TypedActorContext[_], children: Set[ActorRef[Nothing]]): Unit = { diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala index d1ec21aa84..ec07ebf4af 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala @@ -10,11 +10,12 @@ import java.lang.reflect.InvocationTargetException import akka.actor.ActorInitializationException import akka.actor.typed.internal.adapter.ActorAdapter.TypedActorFailedException - import scala.annotation.tailrec import scala.util.Failure import scala.util.Success import scala.util.Try +import scala.util.control.Exception.Catcher + import akka.{ actor ⇒ untyped } import akka.annotation.InternalApi import akka.util.OptionVal @@ -63,9 +64,9 @@ import akka.util.OptionVal failures -= ref ChildFailed(ActorRefAdapter(ref), ex) } else Terminated(ActorRefAdapter(ref)) - next(Behavior.interpretSignal(behavior, ctx, msg), msg) + handleSignal(msg) case untyped.ReceiveTimeout ⇒ - next(Behavior.interpretMessage(behavior, ctx, ctx.receiveTimeoutMsg), ctx.receiveTimeoutMsg) + handleMessage(ctx.receiveTimeoutMsg) case wrapped: AdaptMessage[Any, T] @unchecked ⇒ withSafelyAdapted(() ⇒ wrapped.adapt()) { case AdaptWithRegisteredMessageAdapter(msg) ⇒ @@ -80,7 +81,27 @@ import akka.util.OptionVal } private def handleMessage(msg: T): Unit = { - next(Behavior.interpretMessage(behavior, ctx, msg), msg) + try { + next(Behavior.interpretMessage(behavior, ctx, msg), msg) + } catch handleUnstashException + } + + private def handleSignal(sig: Signal): Unit = { + try { + next(Behavior.interpretSignal(behavior, ctx, sig), sig) + } catch handleUnstashException + } + + private def handleUnstashException: Catcher[Unit] = { + case e: UnstashException[T] @unchecked ⇒ + behavior = e.behavior + throw e.cause + case TypedActorFailedException(e: UnstashException[T] @unchecked) ⇒ + behavior = e.behavior + throw TypedActorFailedException(e.cause) + case ActorInitializationException(actor, message, e: UnstashException[T] @unchecked) ⇒ + behavior = e.behavior + throw ActorInitializationException(actor, message, e.cause) } private def next(b: Behavior[T], msg: Any): Unit = { @@ -102,7 +123,7 @@ import akka.util.OptionVal context.stop(self) case f: FailedBehavior ⇒ // For the parent untyped supervisor to pick up the exception - throw new TypedActorFailedException(f.cause) + throw TypedActorFailedException(f.cause) case _ ⇒ behavior = Behavior.canonicalize(b, behavior, ctx) } diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala index 621e5a4255..76b3969fdb 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/ReplayingEvents.scala @@ -5,17 +5,17 @@ package akka.persistence.typed.internal import scala.util.control.NonFatal -import scala.util.control.NoStackTrace import akka.actor.typed.Behavior -import akka.actor.typed.internal.PoisonPill import akka.actor.typed.Signal -import akka.actor.typed.scaladsl.{ AbstractBehavior, Behaviors } +import akka.actor.typed.internal.PoisonPill +import akka.actor.typed.internal.UnstashException +import akka.actor.typed.scaladsl.AbstractBehavior +import akka.actor.typed.scaladsl.Behaviors import akka.annotation.InternalApi import akka.event.Logging import akka.persistence.JournalProtocol._ import akka.persistence._ -import akka.persistence.typed.internal.ReplayingEvents.FailureWhileUnstashing import akka.persistence.typed.internal.ReplayingEvents.ReplayingState import akka.persistence.typed.internal.Running.WithSeqNrAccessible @@ -55,8 +55,6 @@ private[akka] object ReplayingEvents { new ReplayingEvents[C, E, S](setup.setMdc(MDC.ReplayingEvents), state) } - private final case class FailureWhileUnstashing(cause: Throwable) extends Exception(cause) with NoStackTrace - } @InternalApi @@ -113,7 +111,9 @@ private[akka] final class ReplayingEvents[C, E, S]( Behaviors.unhandled } } catch { - case FailureWhileUnstashing(ex) ⇒ throw ex + case ex: UnstashException[_] ⇒ + // let supervisor handle it, don't treat it as recovery failure + throw ex case NonFatal(cause) ⇒ onRecoveryFailure(cause, state.seqNr, None) } @@ -190,11 +190,7 @@ private[akka] final class ReplayingEvents[C, E, S]( Running.RunningState[S](state.seqNr, state.state, state.receivedPoisonPill) ) - try { - tryUnstashOne(running) - } catch { - case NonFatal(t) ⇒ throw FailureWhileUnstashing(t) - } + tryUnstashOne(running) } } finally { setup.cancelRecoveryTimer()