* test from other branch * wrap exception from unstash * simplify persistence, use the UnstashException there * incorporate feedback
This commit is contained in:
parent
26b234e9df
commit
5a36b6a537
9 changed files with 547 additions and 142 deletions
|
|
@ -39,7 +39,7 @@ object ActorSpecMessages {
|
|||
|
||||
case class StopRef[T](ref: ActorRef[T]) extends Command
|
||||
|
||||
case class GotSignal(signal: Signal) extends Event
|
||||
case class ReceivedSignal(signal: Signal) extends Event
|
||||
|
||||
case class GotChildSignal(signal: Signal) extends Event
|
||||
|
||||
|
|
@ -130,7 +130,7 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit(
|
|||
throw new TestException("Boom")
|
||||
} receiveSignal {
|
||||
case (_, signal) ⇒
|
||||
probe.ref ! GotSignal(signal)
|
||||
probe.ref ! ReceivedSignal(signal)
|
||||
Behaviors.same
|
||||
}).decorate
|
||||
|
||||
|
|
@ -139,7 +139,7 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit(
|
|||
EventFilter[TestException](occurrences = 1).intercept {
|
||||
actor ! Fail
|
||||
}
|
||||
probe.expectMessage(GotSignal(PreRestart))
|
||||
probe.expectMessage(ReceivedSignal(PreRestart))
|
||||
}
|
||||
|
||||
"signal post stop after voluntary termination" in {
|
||||
|
|
@ -150,13 +150,13 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit(
|
|||
case (_, Stop) ⇒ Behaviors.stopped
|
||||
} receiveSignal {
|
||||
case (_, signal) ⇒
|
||||
probe.ref ! GotSignal(signal)
|
||||
probe.ref ! ReceivedSignal(signal)
|
||||
Behaviors.same
|
||||
}).decorate
|
||||
|
||||
val actor = spawn(behavior)
|
||||
actor ! Stop
|
||||
probe.expectMessage(GotSignal(PostStop))
|
||||
probe.expectMessage(ReceivedSignal(PostStop))
|
||||
}
|
||||
|
||||
"restart and stop a child actor" in {
|
||||
|
|
@ -186,7 +186,7 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit(
|
|||
Behavior.same
|
||||
} receiveSignal {
|
||||
case (_, signal) ⇒
|
||||
probe.ref ! GotSignal(signal)
|
||||
probe.ref ! ReceivedSignal(signal)
|
||||
Behavior.stopped
|
||||
}).decorate
|
||||
})
|
||||
|
|
@ -217,7 +217,7 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit(
|
|||
Behaviors.same
|
||||
} receiveSignal {
|
||||
case (_, signal) ⇒
|
||||
probe.ref ! GotSignal(signal)
|
||||
probe.ref ! ReceivedSignal(signal)
|
||||
Behavior.stopped
|
||||
}
|
||||
}).decorate
|
||||
|
|
@ -285,7 +285,7 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit(
|
|||
throw new TestException("boom")
|
||||
} receiveSignal {
|
||||
case (_, PostStop) ⇒
|
||||
probe.ref ! GotSignal(PostStop)
|
||||
probe.ref ! ReceivedSignal(PostStop)
|
||||
Behavior.same
|
||||
}).decorate
|
||||
val actorToWatch = spawn(behavior)
|
||||
|
|
@ -297,7 +297,7 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit(
|
|||
Behavior.same
|
||||
} receiveSignal {
|
||||
case (_, signal) ⇒
|
||||
probe.ref ! GotSignal(signal)
|
||||
probe.ref ! ReceivedSignal(signal)
|
||||
Behavior.same
|
||||
}
|
||||
).decorate)
|
||||
|
|
@ -308,7 +308,7 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit(
|
|||
EventFilter[TestException](occurrences = 1).intercept {
|
||||
actorToWatch ! Fail
|
||||
}
|
||||
probe.expectMessage(GotSignal(PostStop))
|
||||
probe.expectMessage(ReceivedSignal(PostStop))
|
||||
probe.expectTerminated(actorToWatch, timeout.duration)
|
||||
}
|
||||
|
||||
|
|
@ -352,7 +352,7 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit(
|
|||
Behaviors.same
|
||||
} receiveSignal {
|
||||
case (_, signal) ⇒
|
||||
probe.ref ! GotSignal(signal)
|
||||
probe.ref ! ReceivedSignal(signal)
|
||||
Behaviors.same
|
||||
}
|
||||
}).decorate
|
||||
|
|
@ -379,7 +379,7 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit(
|
|||
Behaviors.same
|
||||
} receiveSignal {
|
||||
case (_, signal) ⇒
|
||||
probe.ref ! GotSignal(signal)
|
||||
probe.ref ! ReceivedSignal(signal)
|
||||
Behaviors.same
|
||||
}
|
||||
}).decorate
|
||||
|
|
@ -414,7 +414,7 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit(
|
|||
Behaviors.same
|
||||
} receiveSignal {
|
||||
case (_, signal) ⇒
|
||||
probe.ref ! GotSignal(signal)
|
||||
probe.ref ! ReceivedSignal(signal)
|
||||
Behaviors.same
|
||||
}
|
||||
}).decorate
|
||||
|
|
@ -451,12 +451,12 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit(
|
|||
} receiveSignal {
|
||||
case (_, Terminated(_)) ⇒ Behaviors.unhandled
|
||||
case (_, signal) ⇒
|
||||
probe.ref ! GotSignal(signal)
|
||||
probe.ref ! ReceivedSignal(signal)
|
||||
Behaviors.same
|
||||
}
|
||||
} receiveSignal {
|
||||
case (_, signal) ⇒
|
||||
probe.ref ! GotSignal(signal)
|
||||
probe.ref ! ReceivedSignal(signal)
|
||||
Behaviors.same
|
||||
}
|
||||
}).decorate
|
||||
|
|
@ -467,7 +467,7 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit(
|
|||
EventFilter[DeathPactException](occurrences = 1).intercept {
|
||||
childRef ! Stop
|
||||
probe.expectMessage(GotChildSignal(PostStop))
|
||||
probe.expectMessage(GotSignal(PostStop))
|
||||
probe.expectMessage(ReceivedSignal(PostStop))
|
||||
probe.expectTerminated(actor, timeout.duration)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -48,7 +48,7 @@ object BehaviorSpec {
|
|||
case object Stop extends Command
|
||||
|
||||
sealed trait Event
|
||||
case class GotSignal(signal: Signal) extends Event
|
||||
case class ReceivedSignal(signal: Signal) extends Event
|
||||
case class Self(self: ActorRef[Command]) extends Event
|
||||
case object Missed extends Event
|
||||
case object Ignored extends Event
|
||||
|
|
@ -100,7 +100,7 @@ object BehaviorSpec {
|
|||
implicit class Check(val setup: Setup) {
|
||||
def check(signal: Signal): Setup = {
|
||||
setup.testKit.signal(signal)
|
||||
setup.inbox.receiveAll() should ===(GotSignal(signal) :: Nil)
|
||||
setup.inbox.receiveAll() should ===(ReceivedSignal(signal) :: Nil)
|
||||
checkAux(signal, setup.aux)
|
||||
setup
|
||||
}
|
||||
|
|
@ -166,7 +166,7 @@ object BehaviorSpec {
|
|||
case (_, _) ⇒ SBehaviors.unhandled
|
||||
} receiveSignal {
|
||||
case (_, signal) ⇒
|
||||
monitor ! GotSignal(signal)
|
||||
monitor ! ReceivedSignal(signal)
|
||||
SBehaviors.same
|
||||
}
|
||||
}
|
||||
|
|
@ -372,7 +372,7 @@ class ReceiveBehaviorSpec extends Messages with BecomeWithLifecycle with Stoppab
|
|||
case (_, _: AuxPing) ⇒ SBehaviors.unhandled
|
||||
} receiveSignal {
|
||||
case (_, signal) ⇒
|
||||
monitor ! GotSignal(signal)
|
||||
monitor ! ReceivedSignal(signal)
|
||||
SBehaviors.same
|
||||
}
|
||||
}
|
||||
|
|
@ -409,7 +409,7 @@ class ImmutableWithSignalScalaBehaviorSpec extends Messages with BecomeWithLifec
|
|||
}
|
||||
} receiveSignal {
|
||||
case (_, sig) ⇒
|
||||
monitor ! GotSignal(sig)
|
||||
monitor ! ReceivedSignal(sig)
|
||||
SBehaviors.same
|
||||
}
|
||||
}
|
||||
|
|
@ -559,7 +559,7 @@ class ImmutableWithSignalJavaBehaviorSpec extends Messages with BecomeWithLifecy
|
|||
case _: AuxPing ⇒ SBehaviors.unhandled
|
||||
}),
|
||||
fs((_, sig) ⇒ {
|
||||
monitor ! GotSignal(sig)
|
||||
monitor ! ReceivedSignal(sig)
|
||||
SBehaviors.same
|
||||
}))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ object SupervisionSpec {
|
|||
|
||||
sealed trait Event
|
||||
final case class Pong(n: Int) extends Event
|
||||
final case class GotSignal(signal: Signal) extends Event
|
||||
final case class ReceivedSignal(signal: Signal) extends Event
|
||||
final case class State(n: Int, children: Map[String, ActorRef[Command]]) extends Event
|
||||
case object Started extends Event
|
||||
case object StartFailed extends Event
|
||||
|
|
@ -67,7 +67,7 @@ object SupervisionSpec {
|
|||
case (_, sig) ⇒
|
||||
if (sig == PostStop)
|
||||
slowStop.foreach(latch ⇒ latch.await(10, TimeUnit.SECONDS))
|
||||
monitor ! GotSignal(sig)
|
||||
monitor ! ReceivedSignal(sig)
|
||||
Behaviors.same
|
||||
}
|
||||
|
||||
|
|
@ -102,7 +102,7 @@ class StubbedSupervisionSpec extends WordSpec with Matchers {
|
|||
intercept[Exc3] {
|
||||
testkit.run(Throw(new Exc3))
|
||||
}
|
||||
inbox.receiveMessage() should ===(GotSignal(PostStop))
|
||||
inbox.receiveMessage() should ===(ReceivedSignal(PostStop))
|
||||
}
|
||||
|
||||
"stop when unhandled exception" in {
|
||||
|
|
@ -112,7 +112,7 @@ class StubbedSupervisionSpec extends WordSpec with Matchers {
|
|||
intercept[Exc3] {
|
||||
testkit.run(Throw(new Exc3))
|
||||
}
|
||||
inbox.receiveMessage() should ===(GotSignal(PostStop))
|
||||
inbox.receiveMessage() should ===(ReceivedSignal(PostStop))
|
||||
}
|
||||
|
||||
"restart when handled exception" in {
|
||||
|
|
@ -124,7 +124,7 @@ class StubbedSupervisionSpec extends WordSpec with Matchers {
|
|||
inbox.receiveMessage() should ===(State(1, Map.empty))
|
||||
|
||||
testkit.run(Throw(new Exc2))
|
||||
inbox.receiveMessage() should ===(GotSignal(PreRestart))
|
||||
inbox.receiveMessage() should ===(ReceivedSignal(PreRestart))
|
||||
testkit.run(GetState)
|
||||
inbox.receiveMessage() should ===(State(0, Map.empty))
|
||||
}
|
||||
|
|
@ -162,7 +162,7 @@ class StubbedSupervisionSpec extends WordSpec with Matchers {
|
|||
|
||||
// restart
|
||||
testkit.run(Throw(new Exc3))
|
||||
inbox.receiveMessage() should ===(GotSignal(PreRestart))
|
||||
inbox.receiveMessage() should ===(ReceivedSignal(PreRestart))
|
||||
testkit.run(GetState)
|
||||
inbox.receiveMessage() should ===(State(0, Map.empty))
|
||||
|
||||
|
|
@ -170,7 +170,7 @@ class StubbedSupervisionSpec extends WordSpec with Matchers {
|
|||
intercept[Exc1] {
|
||||
testkit.run(Throw(new Exc1))
|
||||
}
|
||||
inbox.receiveMessage() should ===(GotSignal(PostStop))
|
||||
inbox.receiveMessage() should ===(ReceivedSignal(PostStop))
|
||||
}
|
||||
|
||||
"not catch fatal error" in {
|
||||
|
|
@ -190,13 +190,13 @@ class StubbedSupervisionSpec extends WordSpec with Matchers {
|
|||
val behv = supervise(targetBehavior(inbox.ref)).onFailure[Exc1](strategy)
|
||||
val testkit = BehaviorTestKit(behv)
|
||||
testkit.run(Throw(new Exc1))
|
||||
inbox.receiveMessage() should ===(GotSignal(PreRestart))
|
||||
inbox.receiveMessage() should ===(ReceivedSignal(PreRestart))
|
||||
testkit.run(Throw(new Exc1))
|
||||
inbox.receiveMessage() should ===(GotSignal(PreRestart))
|
||||
inbox.receiveMessage() should ===(ReceivedSignal(PreRestart))
|
||||
intercept[Exc1] {
|
||||
testkit.run(Throw(new Exc1))
|
||||
}
|
||||
inbox.receiveMessage() should ===(GotSignal(PostStop))
|
||||
inbox.receiveMessage() should ===(ReceivedSignal(PostStop))
|
||||
}
|
||||
|
||||
"reset retry limit after withinTimeRange" in {
|
||||
|
|
@ -206,19 +206,19 @@ class StubbedSupervisionSpec extends WordSpec with Matchers {
|
|||
val behv = supervise(targetBehavior(inbox.ref)).onFailure[Exc1](strategy)
|
||||
val testkit = BehaviorTestKit(behv)
|
||||
testkit.run(Throw(new Exc1))
|
||||
inbox.receiveMessage() should ===(GotSignal(PreRestart))
|
||||
inbox.receiveMessage() should ===(ReceivedSignal(PreRestart))
|
||||
testkit.run(Throw(new Exc1))
|
||||
inbox.receiveMessage() should ===(GotSignal(PreRestart))
|
||||
inbox.receiveMessage() should ===(ReceivedSignal(PreRestart))
|
||||
Thread.sleep((2.seconds + 100.millis).toMillis)
|
||||
|
||||
testkit.run(Throw(new Exc1))
|
||||
inbox.receiveMessage() should ===(GotSignal(PreRestart))
|
||||
inbox.receiveMessage() should ===(ReceivedSignal(PreRestart))
|
||||
testkit.run(Throw(new Exc1))
|
||||
inbox.receiveMessage() should ===(GotSignal(PreRestart))
|
||||
inbox.receiveMessage() should ===(ReceivedSignal(PreRestart))
|
||||
intercept[Exc1] {
|
||||
testkit.run(Throw(new Exc1))
|
||||
}
|
||||
inbox.receiveMessage() should ===(GotSignal(PostStop))
|
||||
inbox.receiveMessage() should ===(ReceivedSignal(PostStop))
|
||||
}
|
||||
|
||||
"stop at first exception when restart retries limit is 0" in {
|
||||
|
|
@ -230,7 +230,7 @@ class StubbedSupervisionSpec extends WordSpec with Matchers {
|
|||
intercept[Exc1] {
|
||||
testkit.run(Throw(new Exc1))
|
||||
}
|
||||
inbox.receiveMessage() should ===(GotSignal(PostStop))
|
||||
inbox.receiveMessage() should ===(ReceivedSignal(PostStop))
|
||||
}
|
||||
|
||||
"create underlying deferred behavior immediately" in {
|
||||
|
|
@ -317,7 +317,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
|
|||
val ref = spawn(behv)
|
||||
EventFilter[Exc3](occurrences = 1).intercept {
|
||||
ref ! Throw(new Exc3)
|
||||
probe.expectMessage(GotSignal(PostStop))
|
||||
probe.expectMessage(ReceivedSignal(PostStop))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -345,12 +345,12 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
|
|||
|
||||
EventFilter[IOException](occurrences = 1).intercept {
|
||||
ref ! Throw(new IOException())
|
||||
probe.expectMessage(GotSignal(PreRestart))
|
||||
probe.expectMessage(ReceivedSignal(PreRestart))
|
||||
}
|
||||
|
||||
EventFilter[IllegalArgumentException](occurrences = 1).intercept {
|
||||
ref ! Throw(new IllegalArgumentException("cat"))
|
||||
probe.expectMessage(GotSignal(PostStop))
|
||||
probe.expectMessage(ReceivedSignal(PostStop))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -366,7 +366,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
|
|||
|
||||
EventFilter[Exception](occurrences = 1).intercept {
|
||||
ref ! Throw(new IOException())
|
||||
probe.expectMessage(GotSignal(PreRestart))
|
||||
probe.expectMessage(ReceivedSignal(PreRestart))
|
||||
}
|
||||
// verify that it's still alive and not stopped, IllegalStateException would stop it
|
||||
ref ! Ping(1)
|
||||
|
|
@ -374,7 +374,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
|
|||
|
||||
EventFilter[IllegalArgumentException](occurrences = 1).intercept {
|
||||
ref ! Throw(new IllegalArgumentException("cat"))
|
||||
probe.expectMessage(GotSignal(PreRestart))
|
||||
probe.expectMessage(ReceivedSignal(PreRestart))
|
||||
}
|
||||
|
||||
// verify that it's still alive and not stopped, IllegalStateException would stop it
|
||||
|
|
@ -394,7 +394,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
|
|||
|
||||
EventFilter[Exception](occurrences = 1).intercept {
|
||||
ref ! Throw(new IOException())
|
||||
probe.expectMessage(GotSignal(PreRestart))
|
||||
probe.expectMessage(ReceivedSignal(PreRestart))
|
||||
}
|
||||
// verify that it's still alive and not stopped, IllegalStateException would stop it
|
||||
ref ! Ping(1)
|
||||
|
|
@ -402,7 +402,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
|
|||
|
||||
EventFilter[IllegalArgumentException](occurrences = 1).intercept {
|
||||
ref ! Throw(new IllegalArgumentException("cat"))
|
||||
probe.expectMessage(GotSignal(PreRestart))
|
||||
probe.expectMessage(ReceivedSignal(PreRestart))
|
||||
}
|
||||
|
||||
// verify that it's still alive and not stopped, IllegalStateException would stop it
|
||||
|
|
@ -416,7 +416,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
|
|||
val ref = spawn(behv)
|
||||
EventFilter[Exc3](occurrences = 1).intercept {
|
||||
ref ! Throw(new Exc3)
|
||||
probe.expectMessage(GotSignal(PostStop))
|
||||
probe.expectMessage(ReceivedSignal(PostStop))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -427,7 +427,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
|
|||
val ref = spawn(behv)
|
||||
EventFilter[Exc3](occurrences = 1).intercept {
|
||||
ref ! Throw(new Exc3)
|
||||
probe.expectMessage(GotSignal(PostStop))
|
||||
probe.expectMessage(ReceivedSignal(PostStop))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -442,7 +442,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
|
|||
|
||||
EventFilter[Exc2](occurrences = 1).intercept {
|
||||
ref ! Throw(new Exc2)
|
||||
probe.expectMessage(GotSignal(PreRestart))
|
||||
probe.expectMessage(ReceivedSignal(PreRestart))
|
||||
}
|
||||
ref ! GetState
|
||||
probe.expectMessage(State(0, Map.empty))
|
||||
|
|
@ -460,11 +460,11 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
|
|||
|
||||
EventFilter[Exc2](occurrences = 3).intercept {
|
||||
ref ! Throw(new Exc2)
|
||||
probe.expectMessage(GotSignal(PreRestart))
|
||||
probe.expectMessage(ReceivedSignal(PreRestart))
|
||||
ref ! Throw(new Exc2)
|
||||
probe.expectMessage(GotSignal(PreRestart))
|
||||
probe.expectMessage(ReceivedSignal(PreRestart))
|
||||
ref ! Throw(new Exc2)
|
||||
probe.expectMessage(GotSignal(PostStop))
|
||||
probe.expectMessage(ReceivedSignal(PostStop))
|
||||
}
|
||||
EventFilter.warning(start = "received dead letter", occurrences = 1).intercept {
|
||||
ref ! GetState
|
||||
|
|
@ -484,14 +484,14 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
|
|||
|
||||
EventFilter[Exc2](occurrences = 3).intercept {
|
||||
ref ! Throw(new Exc2)
|
||||
probe.expectMessage(GotSignal(PreRestart))
|
||||
probe.expectMessage(ReceivedSignal(PreRestart))
|
||||
ref ! Throw(new Exc2)
|
||||
probe.expectMessage(GotSignal(PreRestart))
|
||||
probe.expectMessage(ReceivedSignal(PreRestart))
|
||||
|
||||
probe.expectNoMessage(resetTimeout + 50.millis)
|
||||
|
||||
ref ! Throw(new Exc2)
|
||||
probe.expectMessage(GotSignal(PreRestart))
|
||||
probe.expectMessage(ReceivedSignal(PreRestart))
|
||||
}
|
||||
ref ! GetState
|
||||
probe.expectMessage(State(0, Map.empty))
|
||||
|
|
@ -525,7 +525,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
|
|||
|
||||
EventFilter[Exc1](occurrences = 1).intercept {
|
||||
ref ! Throw(new Exc1)
|
||||
parentProbe.expectMessage(GotSignal(PreRestart))
|
||||
parentProbe.expectMessage(ReceivedSignal(PreRestart))
|
||||
ref ! GetState
|
||||
anotherProbe.stop()
|
||||
}
|
||||
|
|
@ -534,11 +534,11 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
|
|||
parentProbe.expectNoMessage()
|
||||
slowStop.countDown()
|
||||
|
||||
childProbe.expectMessage(GotSignal(PostStop))
|
||||
childProbe.expectMessage(GotSignal(PostStop))
|
||||
childProbe.expectMessage(ReceivedSignal(PostStop))
|
||||
childProbe.expectMessage(ReceivedSignal(PostStop))
|
||||
parentProbe.expectMessageType[State].children.keySet should ===(Set.empty)
|
||||
// anotherProbe was stopped, Terminated signal stashed and delivered to new behavior
|
||||
parentProbe.expectMessage(GotSignal(Terminated(anotherProbe.ref)))
|
||||
parentProbe.expectMessage(ReceivedSignal(Terminated(anotherProbe.ref)))
|
||||
}
|
||||
|
||||
"optionally NOT stop children when restarting" in {
|
||||
|
|
@ -564,7 +564,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
|
|||
|
||||
EventFilter[Exc1](occurrences = 1).intercept {
|
||||
ref ! Throw(new Exc1)
|
||||
parentProbe.expectMessage(GotSignal(PreRestart))
|
||||
parentProbe.expectMessage(ReceivedSignal(PreRestart))
|
||||
ref ! GetState
|
||||
}
|
||||
parentProbe.expectMessageType[State].children.keySet should contain(childName)
|
||||
|
|
@ -597,7 +597,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
|
|||
|
||||
EventFilter[Exc1](occurrences = 1).intercept {
|
||||
ref ! Throw(new Exc1)
|
||||
parentProbe.expectMessage(GotSignal(PreRestart))
|
||||
parentProbe.expectMessage(ReceivedSignal(PreRestart))
|
||||
ref ! GetState
|
||||
ref ! CreateChild(targetBehavior(childProbe.ref), child2Name)
|
||||
ref ! GetState
|
||||
|
|
@ -606,13 +606,14 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
|
|||
|
||||
EventFilter[Exc1](occurrences = 1).intercept {
|
||||
slowStop.countDown()
|
||||
childProbe.expectMessage(GotSignal(PostStop)) // child1
|
||||
childProbe.expectMessage(ReceivedSignal(PostStop)) // child1
|
||||
parentProbe.expectMessageType[State].children.keySet should ===(Set.empty)
|
||||
parentProbe.expectMessageType[State].children.keySet should ===(Set(child2Name))
|
||||
// the stashed Throw is causing another restart and stop of child2
|
||||
childProbe.expectMessage(GotSignal(PostStop)) // child2
|
||||
childProbe.expectMessage(ReceivedSignal(PostStop)) // child2
|
||||
}
|
||||
|
||||
parentProbe.expectMessage(ReceivedSignal(PreRestart))
|
||||
ref ! GetState
|
||||
parentProbe.expectMessageType[State].children.keySet should ===(Set.empty)
|
||||
}
|
||||
|
|
@ -650,10 +651,10 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
|
|||
EventFilter[TestException](occurrences = 1).intercept {
|
||||
val ref = spawn(behv)
|
||||
slowStop1.countDown()
|
||||
child1Probe.expectMessage(GotSignal(PostStop))
|
||||
child1Probe.expectMessage(ReceivedSignal(PostStop))
|
||||
throwFromSetup.set(false)
|
||||
slowStop2.countDown()
|
||||
child2Probe.expectMessage(GotSignal(PostStop))
|
||||
child2Probe.expectMessage(ReceivedSignal(PostStop))
|
||||
|
||||
ref ! GetState
|
||||
parentProbe.expectMessageType[State].children.keySet should ===(Set("child1"))
|
||||
|
|
@ -699,16 +700,16 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
|
|||
|
||||
EventFilter[Exc1](occurrences = 1).intercept {
|
||||
ref ! Throw(new Exc1)
|
||||
parentProbe.expectMessage(GotSignal(PreRestart))
|
||||
parentProbe.expectMessage(ReceivedSignal(PreRestart))
|
||||
}
|
||||
|
||||
EventFilter[TestException](occurrences = 1).intercept {
|
||||
slowStop1.countDown()
|
||||
child1Probe.expectMessage(GotSignal(PostStop))
|
||||
child1Probe.expectMessage(GotSignal(PostStop))
|
||||
child1Probe.expectMessage(ReceivedSignal(PostStop))
|
||||
child1Probe.expectMessage(ReceivedSignal(PostStop))
|
||||
throwFromSetup.set(false)
|
||||
slowStop2.countDown()
|
||||
child2Probe.expectMessage(GotSignal(PostStop))
|
||||
child2Probe.expectMessage(ReceivedSignal(PostStop))
|
||||
}
|
||||
|
||||
ref ! GetState
|
||||
|
|
@ -752,7 +753,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
|
|||
// restart
|
||||
EventFilter[Exc3](occurrences = 1).intercept {
|
||||
ref ! Throw(new Exc3)
|
||||
probe.expectMessage(GotSignal(PreRestart))
|
||||
probe.expectMessage(ReceivedSignal(PreRestart))
|
||||
ref ! GetState
|
||||
probe.expectMessage(State(0, Map.empty))
|
||||
}
|
||||
|
|
@ -760,7 +761,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
|
|||
// stop
|
||||
EventFilter[Exc1](occurrences = 1).intercept {
|
||||
ref ! Throw(new Exc1)
|
||||
probe.expectMessage(GotSignal(PostStop))
|
||||
probe.expectMessage(ReceivedSignal(PostStop))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -781,7 +782,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
|
|||
EventFilter[Exc1](occurrences = 1).intercept {
|
||||
startedProbe.expectMessage(Started)
|
||||
ref ! Throw(new Exc1)
|
||||
probe.expectMessage(GotSignal(PreRestart))
|
||||
probe.expectMessage(ReceivedSignal(PreRestart))
|
||||
}
|
||||
ref ! Ping(1)
|
||||
ref ! Ping(2)
|
||||
|
|
@ -811,7 +812,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
|
|||
startedProbe.expectMessage(Started)
|
||||
ref ! IncrementState
|
||||
ref ! Throw(new Exc1)
|
||||
probe.expectMessage(GotSignal(PreRestart))
|
||||
probe.expectMessage(ReceivedSignal(PreRestart))
|
||||
ref ! Ping(1) // dropped due to backoff, no stashing
|
||||
}
|
||||
|
||||
|
|
@ -825,7 +826,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
|
|||
EventFilter[Exc1](occurrences = 1).intercept {
|
||||
ref ! IncrementState
|
||||
ref ! Throw(new Exc1)
|
||||
probe.expectMessage(GotSignal(PreRestart))
|
||||
probe.expectMessage(ReceivedSignal(PreRestart))
|
||||
ref ! Ping(2) // dropped due to backoff, no stashing
|
||||
}
|
||||
|
||||
|
|
@ -878,7 +879,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
|
|||
EventFilter[Exc1](occurrences = 1).intercept {
|
||||
ref ! IncrementState
|
||||
ref ! Throw(new Exc1)
|
||||
probe.expectMessage(GotSignal(PreRestart))
|
||||
probe.expectMessage(ReceivedSignal(PreRestart))
|
||||
ref ! Ping(1) // dropped due to backoff, no stash
|
||||
}
|
||||
|
||||
|
|
@ -891,7 +892,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
|
|||
probe.expectNoMessage(strategy.resetBackoffAfter + 100.millis.dilated)
|
||||
ref ! IncrementState
|
||||
ref ! Throw(new Exc1)
|
||||
probe.expectMessage(GotSignal(PreRestart))
|
||||
probe.expectMessage(ReceivedSignal(PreRestart))
|
||||
ref ! Ping(2) // dropped due to backoff
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -5,11 +5,18 @@
|
|||
package akka.actor.typed
|
||||
package scaladsl
|
||||
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import akka.actor.testkit.typed.TestException
|
||||
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||
import akka.actor.testkit.typed.scaladsl.TestProbe
|
||||
import akka.testkit.EventFilter
|
||||
import org.scalatest.WordSpecLike
|
||||
|
||||
object StashSpec {
|
||||
object AbstractStashSpec {
|
||||
sealed trait Command
|
||||
final case class Msg(s: String) extends Command
|
||||
final case class Unstashed(cmd: Command) extends Command
|
||||
|
|
@ -25,7 +32,7 @@ object StashSpec {
|
|||
val buffer = StashBuffer[Command](capacity = 10)
|
||||
|
||||
def active(processed: Vector[String]): Behavior[Command] =
|
||||
Behaviors.receive { (context, cmd) ⇒
|
||||
Behaviors.receive { (_, cmd) ⇒
|
||||
cmd match {
|
||||
case message: Msg ⇒
|
||||
active(processed :+ message.s)
|
||||
|
|
@ -176,20 +183,20 @@ object StashSpec {
|
|||
|
||||
}
|
||||
|
||||
class ImmutableStashSpec extends StashSpec {
|
||||
import StashSpec._
|
||||
class ImmutableStashSpec extends AbstractStashSpec {
|
||||
import AbstractStashSpec._
|
||||
def testQualifier: String = "immutable behavior"
|
||||
def behaviorUnderTest: Behavior[Command] = immutableStash
|
||||
}
|
||||
|
||||
class MutableStashSpec extends StashSpec {
|
||||
import StashSpec._
|
||||
class MutableStashSpec extends AbstractStashSpec {
|
||||
import AbstractStashSpec._
|
||||
def testQualifier: String = "mutable behavior"
|
||||
def behaviorUnderTest: Behavior[Command] = Behaviors.setup(context ⇒ new MutableStash(context))
|
||||
}
|
||||
|
||||
abstract class StashSpec extends ScalaTestWithActorTestKit with WordSpecLike {
|
||||
import StashSpec._
|
||||
abstract class AbstractStashSpec extends ScalaTestWithActorTestKit with WordSpecLike {
|
||||
import AbstractStashSpec._
|
||||
|
||||
def testQualifier: String
|
||||
def behaviorUnderTest: Behavior[Command]
|
||||
|
|
@ -242,3 +249,340 @@ abstract class StashSpec extends ScalaTestWithActorTestKit with WordSpecLike {
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
class UnstashingSpec extends ScalaTestWithActorTestKit("""
|
||||
akka.loggers = ["akka.testkit.TestEventListener"]
|
||||
""") with WordSpecLike {
|
||||
|
||||
// needed for EventFilter
|
||||
private implicit val untypedSys: akka.actor.ActorSystem = {
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
system.toUntyped
|
||||
}
|
||||
|
||||
private def slowStoppingChild(latch: CountDownLatch): Behavior[String] =
|
||||
Behaviors.receiveSignal {
|
||||
case (_, PostStop) ⇒
|
||||
latch.await(10, TimeUnit.SECONDS)
|
||||
Behaviors.same
|
||||
}
|
||||
|
||||
private def stashingBehavior(
|
||||
probe: ActorRef[String],
|
||||
withSlowStoppingChild: Option[CountDownLatch] = None) = {
|
||||
Behaviors.setup[String] { ctx ⇒
|
||||
|
||||
withSlowStoppingChild.foreach(latch ⇒ ctx.spawnAnonymous(slowStoppingChild(latch)))
|
||||
|
||||
val stash = StashBuffer[String](10)
|
||||
|
||||
def unstashing(n: Int): Behavior[String] =
|
||||
Behaviors.receiveMessage[String] {
|
||||
case "stash" ⇒
|
||||
probe.ref ! s"unstashing-$n"
|
||||
unstashing(n + 1)
|
||||
case "stash-fail" ⇒
|
||||
probe.ref ! s"stash-fail-$n"
|
||||
throw TestException("unstash-fail")
|
||||
case "get-current" ⇒
|
||||
probe.ref ! s"current-$n"
|
||||
Behaviors.same
|
||||
case "get-stash-size" ⇒
|
||||
probe.ref ! s"stash-size-${stash.size}"
|
||||
Behaviors.same
|
||||
case "unstash" ⇒
|
||||
// when testing resume
|
||||
stash.unstashAll(ctx, unstashing(n))
|
||||
}.receiveSignal {
|
||||
case (_, PreRestart) ⇒
|
||||
probe.ref ! s"pre-restart-$n"
|
||||
Behaviors.same
|
||||
case (_, PostStop) ⇒
|
||||
probe.ref ! s"post-stop-$n"
|
||||
Behaviors.same
|
||||
}
|
||||
|
||||
Behaviors.receiveMessage[String] {
|
||||
case msg if msg.startsWith("stash") ⇒
|
||||
stash.stash(msg)
|
||||
Behavior.same
|
||||
case "unstash" ⇒
|
||||
stash.unstashAll(ctx, unstashing(0))
|
||||
case "get-current" ⇒
|
||||
probe.ref ! s"current-00"
|
||||
Behaviors.same
|
||||
case "get-stash-size" ⇒
|
||||
probe.ref ! s"stash-size-${stash.size}"
|
||||
Behaviors.same
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"Unstashing" must {
|
||||
|
||||
"work with initial Behaviors.same" in {
|
||||
// FIXME #26148 unstashAll doesn't support Behavior.same
|
||||
pending
|
||||
|
||||
val probe = TestProbe[String]()
|
||||
// unstashing is inside setup
|
||||
val ref = spawn(Behaviors.receive[String] {
|
||||
case (ctx, "unstash") ⇒
|
||||
val stash = StashBuffer[String](10)
|
||||
stash.stash("one")
|
||||
stash.unstashAll(ctx, Behavior.same)
|
||||
|
||||
case (_, msg) ⇒
|
||||
probe.ref ! msg
|
||||
Behaviors.same
|
||||
})
|
||||
|
||||
ref ! "unstash"
|
||||
probe.expectMessage("one")
|
||||
}
|
||||
|
||||
"work with intermediate Behaviors.same" in {
|
||||
val probe = TestProbe[String]()
|
||||
// unstashing is inside setup
|
||||
val ref = spawn(Behaviors.receivePartial[String] {
|
||||
case (ctx, "unstash") ⇒
|
||||
val stash = StashBuffer[String](10)
|
||||
stash.stash("one")
|
||||
stash.stash("two")
|
||||
stash.unstashAll(ctx, Behaviors.receiveMessage { msg ⇒
|
||||
probe.ref ! msg
|
||||
Behaviors.same
|
||||
})
|
||||
})
|
||||
|
||||
ref ! "unstash"
|
||||
probe.expectMessage("one")
|
||||
probe.expectMessage("two")
|
||||
ref ! "three"
|
||||
probe.expectMessage("three")
|
||||
}
|
||||
|
||||
"work with supervised initial Behaviors.same" in {
|
||||
// FIXME #26148 unstashAll doesn't support Behavior.same
|
||||
pending
|
||||
|
||||
val probe = TestProbe[String]()
|
||||
// unstashing is inside setup
|
||||
val ref = spawn(Behaviors.supervise(Behaviors.receivePartial[String] {
|
||||
case (ctx, "unstash") ⇒
|
||||
val stash = StashBuffer[String](10)
|
||||
stash.stash("one")
|
||||
stash.unstashAll(ctx, Behavior.same)
|
||||
|
||||
case (_, msg) ⇒
|
||||
probe.ref ! msg
|
||||
Behaviors.same
|
||||
}).onFailure[TestException](SupervisorStrategy.stop))
|
||||
|
||||
ref ! "unstash"
|
||||
probe.expectMessage("one")
|
||||
ref ! "two"
|
||||
probe.expectMessage("two")
|
||||
}
|
||||
|
||||
"work with supervised intermediate Behaviors.same" in {
|
||||
val probe = TestProbe[String]()
|
||||
// unstashing is inside setup
|
||||
val ref = spawn(Behaviors.supervise(Behaviors.receivePartial[String] {
|
||||
case (ctx, "unstash") ⇒
|
||||
val stash = StashBuffer[String](10)
|
||||
stash.stash("one")
|
||||
stash.stash("two")
|
||||
stash.unstashAll(ctx, Behaviors.receiveMessage { msg ⇒
|
||||
probe.ref ! msg
|
||||
Behaviors.same
|
||||
})
|
||||
}).onFailure[TestException](SupervisorStrategy.stop))
|
||||
|
||||
ref ! "unstash"
|
||||
probe.expectMessage("one")
|
||||
probe.expectMessage("two")
|
||||
ref ! "three"
|
||||
probe.expectMessage("three")
|
||||
}
|
||||
|
||||
def testPostStop(
|
||||
probe: TestProbe[String],
|
||||
ref: ActorRef[String]
|
||||
): Unit = {
|
||||
ref ! "stash"
|
||||
ref ! "stash"
|
||||
ref ! "stash-fail"
|
||||
ref ! "stash"
|
||||
EventFilter[TestException](start = "unstash-fail", occurrences = 1)
|
||||
.intercept {
|
||||
ref ! "unstash"
|
||||
probe.expectMessage("unstashing-0")
|
||||
probe.expectMessage("unstashing-1")
|
||||
probe.expectMessage("stash-fail-2")
|
||||
probe.expectMessage("post-stop-2")
|
||||
}
|
||||
}
|
||||
|
||||
"signal PostStop to the latest unstashed behavior on failure" in {
|
||||
val probe = TestProbe[String]()
|
||||
val ref = spawn(stashingBehavior(probe.ref))
|
||||
testPostStop(probe, ref)
|
||||
}
|
||||
|
||||
"signal PostStop to the latest unstashed behavior on failure with stop supervision" in {
|
||||
val probe = TestProbe[String]()
|
||||
val ref =
|
||||
spawn(Behaviors.supervise(stashingBehavior(probe.ref))
|
||||
.onFailure[TestException](SupervisorStrategy.stop))
|
||||
testPostStop(probe, ref)
|
||||
}
|
||||
|
||||
def testPreRestart(
|
||||
probe: TestProbe[String],
|
||||
childLatch: Option[CountDownLatch],
|
||||
ref: ActorRef[String]
|
||||
): Unit = {
|
||||
ref ! "stash"
|
||||
ref ! "stash"
|
||||
ref ! "stash-fail"
|
||||
ref ! "stash"
|
||||
EventFilter[TestException](
|
||||
start = "Supervisor RestartSupervisor saw failure: unstash-fail",
|
||||
occurrences = 1
|
||||
).intercept {
|
||||
ref ! "unstash"
|
||||
// when childLatch is defined this be stashed in the internal stash of the RestartSupervisor
|
||||
// because it's waiting for child to stop
|
||||
ref ! "get-current"
|
||||
|
||||
probe.expectMessage("unstashing-0")
|
||||
probe.expectMessage("unstashing-1")
|
||||
probe.expectMessage("stash-fail-2")
|
||||
probe.expectMessage("pre-restart-2")
|
||||
|
||||
childLatch.foreach(_.countDown())
|
||||
probe.expectMessage("current-00")
|
||||
|
||||
ref ! "get-stash-size"
|
||||
probe.expectMessage("stash-size-0")
|
||||
}
|
||||
}
|
||||
|
||||
"signal PreRestart to the latest unstashed behavior on failure with restart supervision" in {
|
||||
val probe = TestProbe[String]()
|
||||
val ref =
|
||||
spawn(Behaviors.supervise(stashingBehavior(probe.ref))
|
||||
.onFailure[TestException](SupervisorStrategy.restart))
|
||||
|
||||
testPreRestart(probe, None, ref)
|
||||
// one more time to ensure that the restart strategy is kept
|
||||
testPreRestart(probe, None, ref)
|
||||
}
|
||||
|
||||
"signal PreRestart to the latest unstashed behavior on failure with restart supervision and slow stopping child" in {
|
||||
val probe = TestProbe[String]()
|
||||
val childLatch = new CountDownLatch(1)
|
||||
val ref =
|
||||
spawn(Behaviors.supervise(stashingBehavior(probe.ref, Some(childLatch)))
|
||||
.onFailure[TestException](SupervisorStrategy.restart))
|
||||
|
||||
testPreRestart(probe, Some(childLatch), ref)
|
||||
}
|
||||
|
||||
"signal PreRestart to the latest unstashed behavior on failure with backoff supervision" in {
|
||||
val probe = TestProbe[String]()
|
||||
val ref =
|
||||
spawn(Behaviors.supervise(stashingBehavior(probe.ref))
|
||||
.onFailure[TestException](SupervisorStrategy.restartWithBackoff(100.millis, 100.millis, 0.0)))
|
||||
|
||||
testPreRestart(probe, None, ref)
|
||||
|
||||
// one more time to ensure that the backoff strategy is kept
|
||||
testPreRestart(probe, None, ref)
|
||||
}
|
||||
|
||||
"signal PreRestart to the latest unstashed behavior on failure with backoff supervision and slow stopping child" in {
|
||||
val probe = TestProbe[String]()
|
||||
val childLatch = new CountDownLatch(1)
|
||||
val ref =
|
||||
spawn(Behaviors.supervise(stashingBehavior(probe.ref, Some(childLatch)))
|
||||
.onFailure[TestException](SupervisorStrategy.restartWithBackoff(100.millis, 100.millis, 0.0)))
|
||||
|
||||
testPreRestart(probe, Some(childLatch), ref)
|
||||
}
|
||||
|
||||
"handle resume correctly on failure unstashing" in {
|
||||
val probe = TestProbe[String]()
|
||||
val ref =
|
||||
spawn(Behaviors.supervise(stashingBehavior(probe.ref))
|
||||
.onFailure[TestException](SupervisorStrategy.resume))
|
||||
|
||||
ref ! "stash"
|
||||
ref ! "stash"
|
||||
ref ! "stash-fail"
|
||||
ref ! "stash"
|
||||
ref ! "stash"
|
||||
ref ! "stash"
|
||||
ref ! "stash-fail"
|
||||
ref ! "stash"
|
||||
EventFilter[TestException](start = "Supervisor ResumeSupervisor saw failure: unstash-fail", occurrences = 1).intercept {
|
||||
ref ! "unstash"
|
||||
ref ! "get-current"
|
||||
|
||||
probe.expectMessage("unstashing-0")
|
||||
probe.expectMessage("unstashing-1")
|
||||
probe.expectMessage("stash-fail-2")
|
||||
probe.expectMessage("current-2")
|
||||
ref ! "get-stash-size"
|
||||
probe.expectMessage("stash-size-5")
|
||||
}
|
||||
|
||||
ref ! "unstash"
|
||||
ref ! "get-current"
|
||||
probe.expectMessage("unstashing-2")
|
||||
probe.expectMessage("unstashing-3")
|
||||
probe.expectMessage("unstashing-4")
|
||||
probe.expectMessage("stash-fail-5")
|
||||
probe.expectMessage("current-5")
|
||||
ref ! "get-stash-size"
|
||||
probe.expectMessage("stash-size-1")
|
||||
|
||||
ref ! "unstash"
|
||||
ref ! "get-current"
|
||||
probe.expectMessage("unstashing-5")
|
||||
probe.expectMessage("current-6")
|
||||
|
||||
ref ! "get-stash-size"
|
||||
probe.expectMessage("stash-size-0")
|
||||
}
|
||||
|
||||
"be possible in combination with setup" in {
|
||||
val probe = TestProbe[String]()
|
||||
val ref = spawn(Behaviors.setup[String] { _ ⇒
|
||||
val stash = StashBuffer[String](10)
|
||||
stash.stash("one")
|
||||
|
||||
// FIXME #26148 using AbstractBehavior because unstashAll doesn't support Behavior.same
|
||||
|
||||
// unstashing is inside setup
|
||||
new AbstractBehavior[String] {
|
||||
override def onMessage(msg: String): Behavior[String] = msg match {
|
||||
case "unstash" ⇒
|
||||
Behaviors.setup[String] { ctx ⇒
|
||||
stash.unstashAll(ctx, this)
|
||||
}
|
||||
case _ ⇒
|
||||
probe.ref ! msg
|
||||
Behavior.same
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
ref ! "unstash"
|
||||
probe.expectMessage("one")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -424,27 +424,5 @@ object Behavior {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*
|
||||
* Execute the behavior with the given messages (or signals).
|
||||
* The returned [[Behavior]] from each processed message is used for the next message.
|
||||
*/
|
||||
@InternalApi private[akka] def interpretMessages[T](behavior: Behavior[T], ctx: TypedActorContext[T], messages: Iterator[T]): Behavior[T] = {
|
||||
@tailrec def interpretOne(b: Behavior[T]): Behavior[T] = {
|
||||
val b2 = Behavior.start(b, ctx)
|
||||
if (!Behavior.isAlive(b2) || !messages.hasNext) b2
|
||||
else {
|
||||
val nextB = messages.next() match {
|
||||
case sig: Signal ⇒ Behavior.interpretSignal(b2, ctx, sig)
|
||||
case msg ⇒ Behavior.interpretMessage(b2, ctx, msg)
|
||||
}
|
||||
interpretOne(Behavior.canonicalize(nextB, b, ctx)) // recursive
|
||||
}
|
||||
}
|
||||
|
||||
interpretOne(Behavior.start(behavior, ctx))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -7,7 +7,12 @@ package akka.actor.typed.internal
|
|||
import java.util.function.Consumer
|
||||
import java.util.function.{ Function ⇒ JFunction }
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.Signal
|
||||
import akka.actor.typed.TypedActorContext
|
||||
import akka.actor.typed.javadsl
|
||||
import akka.actor.typed.scaladsl
|
||||
import akka.annotation.InternalApi
|
||||
|
|
@ -86,7 +91,7 @@ import akka.util.ConstantFun
|
|||
}
|
||||
}
|
||||
|
||||
override def forEach(f: Consumer[T]): Unit = foreach(f.accept(_))
|
||||
override def forEach(f: Consumer[T]): Unit = foreach(f.accept)
|
||||
|
||||
override def unstashAll(ctx: scaladsl.ActorContext[T], behavior: Behavior[T]): Behavior[T] =
|
||||
unstash(ctx, behavior, size, ConstantFun.scalaIdentityFunction[T])
|
||||
|
|
@ -96,11 +101,36 @@ import akka.util.ConstantFun
|
|||
|
||||
override def unstash(ctx: scaladsl.ActorContext[T], behavior: Behavior[T],
|
||||
numberOfMessages: Int, wrap: T ⇒ T): Behavior[T] = {
|
||||
val iter = new Iterator[T] {
|
||||
override def hasNext: Boolean = StashBufferImpl.this.nonEmpty
|
||||
override def next(): T = wrap(StashBufferImpl.this.dropHead())
|
||||
}.take(numberOfMessages)
|
||||
Behavior.interpretMessages[T](behavior, ctx, iter)
|
||||
if (isEmpty)
|
||||
behavior // optimization
|
||||
else {
|
||||
val iter = new Iterator[T] {
|
||||
override def hasNext: Boolean = StashBufferImpl.this.nonEmpty
|
||||
override def next(): T = wrap(StashBufferImpl.this.dropHead())
|
||||
}.take(numberOfMessages)
|
||||
interpretUnstashedMessages(behavior, ctx, iter)
|
||||
}
|
||||
}
|
||||
|
||||
private def interpretUnstashedMessages(behavior: Behavior[T], ctx: TypedActorContext[T], messages: Iterator[T]): Behavior[T] = {
|
||||
@tailrec def interpretOne(b: Behavior[T]): Behavior[T] = {
|
||||
val b2 = Behavior.start(b, ctx)
|
||||
if (!Behavior.isAlive(b2) || !messages.hasNext) b2
|
||||
else {
|
||||
val nextB = try {
|
||||
messages.next() match {
|
||||
case sig: Signal ⇒ Behavior.interpretSignal(b2, ctx, sig)
|
||||
case msg ⇒ Behavior.interpretMessage(b2, ctx, msg)
|
||||
}
|
||||
} catch {
|
||||
case NonFatal(e) ⇒ throw UnstashException(e, b2)
|
||||
}
|
||||
|
||||
interpretOne(Behavior.canonicalize(nextB, b2, ctx)) // recursive
|
||||
}
|
||||
}
|
||||
|
||||
interpretOne(Behavior.start(behavior, ctx))
|
||||
}
|
||||
|
||||
override def unstash(ctx: javadsl.ActorContext[T], behavior: Behavior[T],
|
||||
|
|
@ -111,3 +141,23 @@ import akka.util.ConstantFun
|
|||
s"StashBuffer($size/$capacity)"
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] object UnstashException {
|
||||
def unwrap(t: Throwable): Throwable = t match {
|
||||
case UnstashException(e, _) ⇒ e
|
||||
case _ ⇒ t
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API:
|
||||
*
|
||||
* When unstashing, the exception is wrapped in UnstashException because supervisor strategy
|
||||
* and ActorAdapter need the behavior that threw. It will use the behavior in the `UnstashException`
|
||||
* to emit the PreRestart and PostStop to the right behavior and install the latest behavior for resume strategy.
|
||||
*/
|
||||
@InternalApi private[akka] final case class UnstashException[T](cause: Throwable, behavior: Behavior[T])
|
||||
extends RuntimeException(s"[$cause] when unstashing in [$behavior]", cause)
|
||||
|
|
|
|||
|
|
@ -49,6 +49,9 @@ private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: Supe
|
|||
|
||||
private val throwableClass = implicitly[ClassTag[Thr]].runtimeClass
|
||||
|
||||
protected def isInstanceOfTheThrowableClass(t: Throwable): Boolean =
|
||||
throwableClass.isAssignableFrom(UnstashException.unwrap(t).getClass)
|
||||
|
||||
override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = {
|
||||
other match {
|
||||
case as: AbstractSupervisor[_, _, Thr] if throwableClass == as.throwableClass ⇒ true
|
||||
|
|
@ -70,7 +73,8 @@ private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: Supe
|
|||
|
||||
def log(ctx: TypedActorContext[_], t: Throwable): Unit = {
|
||||
if (strategy.loggingEnabled) {
|
||||
ctx.asScala.log.error(t, "Supervisor {} saw failure: {}", this, t.getMessage)
|
||||
val unwrapped = UnstashException.unwrap(t)
|
||||
ctx.asScala.log.error(unwrapped, "Supervisor {} saw failure: {}", this, unwrapped.getMessage)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -98,7 +102,7 @@ private abstract class SimpleSupervisor[T, Thr <: Throwable: ClassTag](ss: Super
|
|||
}
|
||||
|
||||
protected def handleException(@unused ctx: TypedActorContext[T]): Catcher[Behavior[T]] = {
|
||||
case NonFatal(t: Thr) ⇒
|
||||
case NonFatal(t) if isInstanceOfTheThrowableClass(t) ⇒
|
||||
Behavior.failed(t)
|
||||
}
|
||||
|
||||
|
|
@ -113,8 +117,9 @@ private abstract class SimpleSupervisor[T, Thr <: Throwable: ClassTag](ss: Super
|
|||
|
||||
private class StopSupervisor[T, Thr <: Throwable: ClassTag](@unused initial: Behavior[T], strategy: Stop)
|
||||
extends SimpleSupervisor[T, Thr](strategy) {
|
||||
|
||||
override def handleException(ctx: TypedActorContext[T]): Catcher[Behavior[T]] = {
|
||||
case NonFatal(t: Thr) ⇒
|
||||
case NonFatal(t) if isInstanceOfTheThrowableClass(t) ⇒
|
||||
log(ctx, t)
|
||||
Behavior.failed(t)
|
||||
}
|
||||
|
|
@ -122,9 +127,12 @@ private class StopSupervisor[T, Thr <: Throwable: ClassTag](@unused initial: Beh
|
|||
|
||||
private class ResumeSupervisor[T, Thr <: Throwable: ClassTag](ss: Resume) extends SimpleSupervisor[T, Thr](ss) {
|
||||
override protected def handleException(ctx: TypedActorContext[T]): Catcher[Behavior[T]] = {
|
||||
case NonFatal(t: Thr) ⇒
|
||||
case NonFatal(t) if isInstanceOfTheThrowableClass(t) ⇒
|
||||
log(ctx, t)
|
||||
Behaviors.same
|
||||
t match {
|
||||
case e: UnstashException[T] @unchecked ⇒ e.behavior
|
||||
case _ ⇒ Behaviors.same
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -239,7 +247,7 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav
|
|||
}
|
||||
|
||||
override protected def handleExceptionOnStart(ctx: TypedActorContext[O], @unused target: PreStartTarget[T]): Catcher[Behavior[T]] = {
|
||||
case NonFatal(t: Thr) ⇒
|
||||
case NonFatal(t) if isInstanceOfTheThrowableClass(t) ⇒
|
||||
strategy match {
|
||||
case _: Restart ⇒
|
||||
// if unlimited restarts then don't restart if starting fails as it would likely be an infinite restart loop
|
||||
|
|
@ -255,14 +263,20 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav
|
|||
}
|
||||
|
||||
override protected def handleSignalException(ctx: TypedActorContext[O], target: SignalTarget[T]): Catcher[Behavior[T]] = {
|
||||
handleException(ctx, () ⇒ target(ctx, PreRestart))
|
||||
handleException(ctx, signalRestart = {
|
||||
case e: UnstashException[O] @unchecked ⇒ Behavior.interpretSignal(e.behavior, ctx, PreRestart)
|
||||
case _ ⇒ target(ctx, PreRestart)
|
||||
})
|
||||
}
|
||||
override protected def handleReceiveException(ctx: TypedActorContext[O], target: ReceiveTarget[T]): Catcher[Behavior[T]] = {
|
||||
handleException(ctx, () ⇒ target.signalRestart(ctx))
|
||||
handleException(ctx, signalRestart = {
|
||||
case e: UnstashException[O] @unchecked ⇒ Behavior.interpretSignal(e.behavior, ctx, PreRestart)
|
||||
case _ ⇒ target.signalRestart(ctx)
|
||||
})
|
||||
}
|
||||
|
||||
private def handleException(ctx: TypedActorContext[O], signalRestart: () ⇒ Unit): Catcher[Behavior[T]] = {
|
||||
case NonFatal(t: Thr) ⇒
|
||||
private def handleException(ctx: TypedActorContext[O], signalRestart: Throwable ⇒ Unit): Catcher[Behavior[T]] = {
|
||||
case NonFatal(t) if isInstanceOfTheThrowableClass(t) ⇒
|
||||
if (strategy.maxRestarts != -1 && restartCount >= strategy.maxRestarts && deadlineHasTimeLeft) {
|
||||
strategy match {
|
||||
case _: Restart ⇒ throw t
|
||||
|
|
@ -272,7 +286,7 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav
|
|||
}
|
||||
|
||||
} else {
|
||||
try signalRestart() catch {
|
||||
try signalRestart(t) catch {
|
||||
case NonFatal(ex) ⇒ ctx.asScala.log.error(ex, "failure during PreRestart")
|
||||
}
|
||||
|
||||
|
|
@ -326,9 +340,10 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav
|
|||
stashBuffer.unstashAll(ctx.asScala.asInstanceOf[scaladsl.ActorContext[Any]], newBehavior.unsafeCast)
|
||||
}
|
||||
nextBehavior.narrow
|
||||
} catch handleException(ctx, signalRestart = () ⇒ ())
|
||||
// FIXME signal Restart is not done if unstashAll throws, unstash of each message may return a new behavior and
|
||||
// it's the failing one that should receive the signal
|
||||
} catch handleException(ctx, signalRestart = {
|
||||
case e: UnstashException[O] @unchecked ⇒ Behavior.interpretSignal(e.behavior, ctx, PreRestart)
|
||||
case _ ⇒ ()
|
||||
})
|
||||
}
|
||||
|
||||
private def stopChildren(ctx: TypedActorContext[_], children: Set[ActorRef[Nothing]]): Unit = {
|
||||
|
|
|
|||
|
|
@ -10,11 +10,12 @@ import java.lang.reflect.InvocationTargetException
|
|||
|
||||
import akka.actor.ActorInitializationException
|
||||
import akka.actor.typed.internal.adapter.ActorAdapter.TypedActorFailedException
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.util.Failure
|
||||
import scala.util.Success
|
||||
import scala.util.Try
|
||||
import scala.util.control.Exception.Catcher
|
||||
|
||||
import akka.{ actor ⇒ untyped }
|
||||
import akka.annotation.InternalApi
|
||||
import akka.util.OptionVal
|
||||
|
|
@ -63,9 +64,9 @@ import akka.util.OptionVal
|
|||
failures -= ref
|
||||
ChildFailed(ActorRefAdapter(ref), ex)
|
||||
} else Terminated(ActorRefAdapter(ref))
|
||||
next(Behavior.interpretSignal(behavior, ctx, msg), msg)
|
||||
handleSignal(msg)
|
||||
case untyped.ReceiveTimeout ⇒
|
||||
next(Behavior.interpretMessage(behavior, ctx, ctx.receiveTimeoutMsg), ctx.receiveTimeoutMsg)
|
||||
handleMessage(ctx.receiveTimeoutMsg)
|
||||
case wrapped: AdaptMessage[Any, T] @unchecked ⇒
|
||||
withSafelyAdapted(() ⇒ wrapped.adapt()) {
|
||||
case AdaptWithRegisteredMessageAdapter(msg) ⇒
|
||||
|
|
@ -80,7 +81,27 @@ import akka.util.OptionVal
|
|||
}
|
||||
|
||||
private def handleMessage(msg: T): Unit = {
|
||||
next(Behavior.interpretMessage(behavior, ctx, msg), msg)
|
||||
try {
|
||||
next(Behavior.interpretMessage(behavior, ctx, msg), msg)
|
||||
} catch handleUnstashException
|
||||
}
|
||||
|
||||
private def handleSignal(sig: Signal): Unit = {
|
||||
try {
|
||||
next(Behavior.interpretSignal(behavior, ctx, sig), sig)
|
||||
} catch handleUnstashException
|
||||
}
|
||||
|
||||
private def handleUnstashException: Catcher[Unit] = {
|
||||
case e: UnstashException[T] @unchecked ⇒
|
||||
behavior = e.behavior
|
||||
throw e.cause
|
||||
case TypedActorFailedException(e: UnstashException[T] @unchecked) ⇒
|
||||
behavior = e.behavior
|
||||
throw TypedActorFailedException(e.cause)
|
||||
case ActorInitializationException(actor, message, e: UnstashException[T] @unchecked) ⇒
|
||||
behavior = e.behavior
|
||||
throw ActorInitializationException(actor, message, e.cause)
|
||||
}
|
||||
|
||||
private def next(b: Behavior[T], msg: Any): Unit = {
|
||||
|
|
@ -102,7 +123,7 @@ import akka.util.OptionVal
|
|||
context.stop(self)
|
||||
case f: FailedBehavior ⇒
|
||||
// For the parent untyped supervisor to pick up the exception
|
||||
throw new TypedActorFailedException(f.cause)
|
||||
throw TypedActorFailedException(f.cause)
|
||||
case _ ⇒
|
||||
behavior = Behavior.canonicalize(b, behavior, ctx)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,17 +5,17 @@
|
|||
package akka.persistence.typed.internal
|
||||
|
||||
import scala.util.control.NonFatal
|
||||
import scala.util.control.NoStackTrace
|
||||
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.internal.PoisonPill
|
||||
import akka.actor.typed.Signal
|
||||
import akka.actor.typed.scaladsl.{ AbstractBehavior, Behaviors }
|
||||
import akka.actor.typed.internal.PoisonPill
|
||||
import akka.actor.typed.internal.UnstashException
|
||||
import akka.actor.typed.scaladsl.AbstractBehavior
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.annotation.InternalApi
|
||||
import akka.event.Logging
|
||||
import akka.persistence.JournalProtocol._
|
||||
import akka.persistence._
|
||||
import akka.persistence.typed.internal.ReplayingEvents.FailureWhileUnstashing
|
||||
import akka.persistence.typed.internal.ReplayingEvents.ReplayingState
|
||||
import akka.persistence.typed.internal.Running.WithSeqNrAccessible
|
||||
|
||||
|
|
@ -55,8 +55,6 @@ private[akka] object ReplayingEvents {
|
|||
new ReplayingEvents[C, E, S](setup.setMdc(MDC.ReplayingEvents), state)
|
||||
}
|
||||
|
||||
private final case class FailureWhileUnstashing(cause: Throwable) extends Exception(cause) with NoStackTrace
|
||||
|
||||
}
|
||||
|
||||
@InternalApi
|
||||
|
|
@ -113,7 +111,9 @@ private[akka] final class ReplayingEvents[C, E, S](
|
|||
Behaviors.unhandled
|
||||
}
|
||||
} catch {
|
||||
case FailureWhileUnstashing(ex) ⇒ throw ex
|
||||
case ex: UnstashException[_] ⇒
|
||||
// let supervisor handle it, don't treat it as recovery failure
|
||||
throw ex
|
||||
case NonFatal(cause) ⇒
|
||||
onRecoveryFailure(cause, state.seqNr, None)
|
||||
}
|
||||
|
|
@ -190,11 +190,7 @@ private[akka] final class ReplayingEvents[C, E, S](
|
|||
Running.RunningState[S](state.seqNr, state.state, state.receivedPoisonPill)
|
||||
)
|
||||
|
||||
try {
|
||||
tryUnstashOne(running)
|
||||
} catch {
|
||||
case NonFatal(t) ⇒ throw FailureWhileUnstashing(t)
|
||||
}
|
||||
tryUnstashOne(running)
|
||||
}
|
||||
} finally {
|
||||
setup.cancelRecoveryTimer()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue