Expected behavior on Terminated with orElse signal handling #26518
This commit is contained in:
parent
ef3a19b5d0
commit
9f36e8e647
4 changed files with 114 additions and 39 deletions
|
|
@ -4,9 +4,12 @@
|
|||
|
||||
package akka.actor.typed
|
||||
|
||||
import akka.actor
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import akka.actor.testkit.typed.scaladsl._
|
||||
import org.scalatest.{ Matchers, WordSpec, WordSpecLike }
|
||||
import akka.testkit.EventFilter
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
import org.scalatest.{Matchers, WordSpec, WordSpecLike}
|
||||
|
||||
object OrElseStubbedSpec {
|
||||
|
||||
|
|
@ -78,10 +81,15 @@ class OrElseStubbedSpec extends WordSpec with Matchers {
|
|||
|
||||
}
|
||||
|
||||
class OrElseSpec extends ScalaTestWithActorTestKit with WordSpecLike {
|
||||
class OrElseSpec extends ScalaTestWithActorTestKit("""
|
||||
akka.loglevel = DEBUG # test verifies debug
|
||||
akka.loggers = ["akka.testkit.TestEventListener"]
|
||||
""") with WordSpecLike {
|
||||
|
||||
import OrElseStubbedSpec._
|
||||
|
||||
implicit val untyped: actor.ActorSystem = system.toUntyped
|
||||
|
||||
"Behavior.orElse" must {
|
||||
"work for deferred behavior on the left" in {
|
||||
val orElseDeferred = Behaviors
|
||||
|
|
@ -105,6 +113,7 @@ class OrElseSpec extends ScalaTestWithActorTestKit with WordSpecLike {
|
|||
case PingInfinite(replyTo) =>
|
||||
replyTo ! Pong(-1)
|
||||
Behaviors.same
|
||||
case _ => Behaviors.unhandled
|
||||
}
|
||||
})
|
||||
|
||||
|
|
@ -113,49 +122,99 @@ class OrElseSpec extends ScalaTestWithActorTestKit with WordSpecLike {
|
|||
p ! PingInfinite(probe.ref)
|
||||
probe.expectMessage(Pong(-1))
|
||||
}
|
||||
}
|
||||
|
||||
"handle nested OrElse" in {
|
||||
"handle nested OrElse" in {
|
||||
|
||||
sealed trait Parent
|
||||
final case class Add(o: Any) extends Parent
|
||||
final case class Remove(o: Any) extends Parent
|
||||
final case class Stack(s: ActorRef[Array[StackTraceElement]]) extends Parent
|
||||
final case class Get(s: ActorRef[Set[Any]]) extends Parent
|
||||
sealed trait Parent
|
||||
final case class Add(o: Any) extends Parent
|
||||
final case class Remove(o: Any) extends Parent
|
||||
final case class Stack(s: ActorRef[Array[StackTraceElement]]) extends Parent
|
||||
final case class Get(s: ActorRef[Set[Any]]) extends Parent
|
||||
|
||||
def dealer(set: Set[Any]): Behavior[Parent] = {
|
||||
val add = Behaviors.receiveMessage[Parent] {
|
||||
case Add(o) => dealer(set + o)
|
||||
case _ => Behaviors.unhandled
|
||||
def dealer(set: Set[Any]): Behavior[Parent] = {
|
||||
val add = Behaviors.receiveMessage[Parent] {
|
||||
case Add(o) => dealer(set + o)
|
||||
case _ => Behaviors.unhandled
|
||||
}
|
||||
val remove = Behaviors.receiveMessage[Parent] {
|
||||
case Remove(o) => dealer(set - o)
|
||||
case _ => Behaviors.unhandled
|
||||
}
|
||||
val getStack = Behaviors.receiveMessagePartial[Parent] {
|
||||
case Stack(sender) =>
|
||||
sender ! Thread.currentThread().getStackTrace
|
||||
Behaviors.same
|
||||
}
|
||||
val getSet = Behaviors.receiveMessagePartial[Parent] {
|
||||
case Get(sender) =>
|
||||
sender ! set
|
||||
Behaviors.same
|
||||
}
|
||||
add.orElse(remove).orElse(getStack).orElse(getSet)
|
||||
}
|
||||
val remove = Behaviors.receiveMessage[Parent] {
|
||||
case Remove(o) => dealer(set - o)
|
||||
case _ => Behaviors.unhandled
|
||||
|
||||
val y = spawn(dealer(Set.empty))
|
||||
|
||||
(0 to 10000).foreach { i =>
|
||||
y ! Add(i)
|
||||
}
|
||||
val getStack = Behaviors.receiveMessagePartial[Parent] {
|
||||
case Stack(sender) =>
|
||||
sender ! Thread.currentThread().getStackTrace
|
||||
Behaviors.same
|
||||
(0 to 9999).foreach { i =>
|
||||
y ! Remove(i)
|
||||
}
|
||||
val getSet = Behaviors.receiveMessagePartial[Parent] {
|
||||
case Get(sender) =>
|
||||
sender ! set
|
||||
Behaviors.same
|
||||
}
|
||||
add.orElse(remove).orElse(getStack).orElse(getSet)
|
||||
val probe = TestProbe[Set[Any]]
|
||||
y ! Get(probe.ref)
|
||||
probe.expectMessage(Set[Any](10000))
|
||||
|
||||
}
|
||||
|
||||
val y = spawn(dealer(Set.empty))
|
||||
|
||||
(0 to 10000).foreach { i =>
|
||||
y ! Add(i)
|
||||
"pass unhandled Terminated along" in {
|
||||
val probe = TestProbe[String]()
|
||||
spawn(Behaviors.setup[String] { ctx =>
|
||||
|
||||
// arrange with a deathwatch triggering
|
||||
ctx.watch(ctx.spawnAnonymous(Behavior.stopped[String]))
|
||||
|
||||
Behaviors.receiveSignal[String] {
|
||||
case (_, PreRestart) =>
|
||||
Behaviors.same
|
||||
}.orElse(Behaviors.receiveSignal {
|
||||
case (_, Terminated(_)) =>
|
||||
probe.ref ! "orElse saw it"
|
||||
Behaviors.same
|
||||
})
|
||||
})
|
||||
|
||||
probe.expectMessage("orElse saw it")
|
||||
}
|
||||
(0 to 9999).foreach { i =>
|
||||
y ! Remove(i)
|
||||
|
||||
"pass unhandled Terminated along and fail if alternative doesn't handle either" in {
|
||||
val probe = TestProbe[String]()
|
||||
|
||||
val ref =
|
||||
EventFilter[DeathPactException](occurrences = 1).intercept {
|
||||
spawn(Behaviors.setup[String] { ctx =>
|
||||
|
||||
// arrange with a deathwatch triggering
|
||||
ctx.watch(ctx.spawnAnonymous(Behavior.stopped[String]))
|
||||
|
||||
Behaviors.receiveSignal[String] {
|
||||
case (_, Terminated(_)) =>
|
||||
probe.ref ! "first handler saw it"
|
||||
Behavior.unhandled
|
||||
}.orElse(Behaviors.receiveSignal {
|
||||
case (_, Terminated(_)) =>
|
||||
probe.ref ! "second handler saw it"
|
||||
Behavior.unhandled
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
probe.expectMessage("first handler saw it")
|
||||
probe.expectMessage("second handler saw it")
|
||||
probe.expectTerminated(ref)
|
||||
}
|
||||
val probe = TestProbe[Set[Any]]
|
||||
y ! Get(probe.ref)
|
||||
probe.expectMessage(Set[Any](10000))
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -59,7 +59,7 @@ abstract class Behavior[T] { behavior =>
|
|||
@InternalApi private[akka] final def unsafeCast[U]: Behavior[U] = this.asInstanceOf[Behavior[U]]
|
||||
|
||||
/**
|
||||
* Composes this `Behavior with a fallback `Behavior` which
|
||||
* Composes this `Behavior` with a fallback `Behavior` which
|
||||
* is used when this `Behavior` doesn't handle the message or signal, i.e.
|
||||
* when `unhandled` is returned.
|
||||
*
|
||||
|
|
|
|||
|
|
@ -80,7 +80,17 @@ import akka.actor.typed.scaladsl.{ ActorContext => SAC }
|
|||
}
|
||||
|
||||
override def receiveSignal(ctx: AC[T], msg: Signal): Behavior[T] = {
|
||||
Behavior.interpretSignal(first, ctx, msg) match {
|
||||
val result: Behavior[T] = try {
|
||||
Behavior.interpretSignal(first, ctx, msg)
|
||||
} catch {
|
||||
case _: DeathPactException =>
|
||||
// since we don't know what kind of concrete Behavior `first` is, if it is intercepted etc.
|
||||
// the only way we can fallback to second behavior if Terminated wasn't handled is to
|
||||
// catch the DeathPact here and pretend like it was just `unhandled`
|
||||
Behavior.unhandled
|
||||
}
|
||||
|
||||
result match {
|
||||
case _: UnhandledBehavior.type => Behavior.interpretSignal(second, ctx, msg)
|
||||
case handled => handled
|
||||
}
|
||||
|
|
|
|||
|
|
@ -150,9 +150,15 @@ import akka.annotation.InternalApi
|
|||
}
|
||||
|
||||
override def unhandled(msg: Any): Unit = msg match {
|
||||
case Terminated(ref) => throw DeathPactException(ref)
|
||||
case _: Signal => // that's ok
|
||||
case other => super.unhandled(other)
|
||||
|
||||
case Terminated(ref) =>
|
||||
// this should never get here, because if it did, the death pact could
|
||||
// not be supervised - interpretSignal is where this actually happens
|
||||
throw DeathPactException(ref)
|
||||
case _: Signal =>
|
||||
// that's ok
|
||||
case other =>
|
||||
super.unhandled(other)
|
||||
}
|
||||
|
||||
override val supervisorStrategy = untyped.OneForOneStrategy(loggingEnabled = false) {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue