From 9f36e8e647048c36595d857ff89ea5c8d315bcba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Mon, 18 Mar 2019 12:45:16 +0100 Subject: [PATCH] Expected behavior on Terminated with orElse signal handling #26518 --- .../scala/akka/actor/typed/OrElseSpec.scala | 127 +++++++++++++----- .../scala/akka/actor/typed/Behavior.scala | 2 +- .../actor/typed/internal/BehaviorImpl.scala | 12 +- .../typed/internal/adapter/ActorAdapter.scala | 12 +- 4 files changed, 114 insertions(+), 39 deletions(-) diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/OrElseSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/OrElseSpec.scala index a92b6fcfb0..dc1dd0003e 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/OrElseSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/OrElseSpec.scala @@ -4,9 +4,12 @@ package akka.actor.typed +import akka.actor import akka.actor.typed.scaladsl.Behaviors import akka.actor.testkit.typed.scaladsl._ -import org.scalatest.{ Matchers, WordSpec, WordSpecLike } +import akka.testkit.EventFilter +import akka.actor.typed.scaladsl.adapter._ +import org.scalatest.{Matchers, WordSpec, WordSpecLike} object OrElseStubbedSpec { @@ -78,10 +81,15 @@ class OrElseStubbedSpec extends WordSpec with Matchers { } -class OrElseSpec extends ScalaTestWithActorTestKit with WordSpecLike { +class OrElseSpec extends ScalaTestWithActorTestKit(""" + akka.loglevel = DEBUG # test verifies debug + akka.loggers = ["akka.testkit.TestEventListener"] + """) with WordSpecLike { import OrElseStubbedSpec._ + implicit val untyped: actor.ActorSystem = system.toUntyped + "Behavior.orElse" must { "work for deferred behavior on the left" in { val orElseDeferred = Behaviors @@ -105,6 +113,7 @@ class OrElseSpec extends ScalaTestWithActorTestKit with WordSpecLike { case PingInfinite(replyTo) => replyTo ! Pong(-1) Behaviors.same + case _ => Behaviors.unhandled } }) @@ -113,49 +122,99 @@ class OrElseSpec extends ScalaTestWithActorTestKit with WordSpecLike { p ! PingInfinite(probe.ref) probe.expectMessage(Pong(-1)) } - } - "handle nested OrElse" in { + "handle nested OrElse" in { - sealed trait Parent - final case class Add(o: Any) extends Parent - final case class Remove(o: Any) extends Parent - final case class Stack(s: ActorRef[Array[StackTraceElement]]) extends Parent - final case class Get(s: ActorRef[Set[Any]]) extends Parent + sealed trait Parent + final case class Add(o: Any) extends Parent + final case class Remove(o: Any) extends Parent + final case class Stack(s: ActorRef[Array[StackTraceElement]]) extends Parent + final case class Get(s: ActorRef[Set[Any]]) extends Parent - def dealer(set: Set[Any]): Behavior[Parent] = { - val add = Behaviors.receiveMessage[Parent] { - case Add(o) => dealer(set + o) - case _ => Behaviors.unhandled + def dealer(set: Set[Any]): Behavior[Parent] = { + val add = Behaviors.receiveMessage[Parent] { + case Add(o) => dealer(set + o) + case _ => Behaviors.unhandled + } + val remove = Behaviors.receiveMessage[Parent] { + case Remove(o) => dealer(set - o) + case _ => Behaviors.unhandled + } + val getStack = Behaviors.receiveMessagePartial[Parent] { + case Stack(sender) => + sender ! Thread.currentThread().getStackTrace + Behaviors.same + } + val getSet = Behaviors.receiveMessagePartial[Parent] { + case Get(sender) => + sender ! set + Behaviors.same + } + add.orElse(remove).orElse(getStack).orElse(getSet) } - val remove = Behaviors.receiveMessage[Parent] { - case Remove(o) => dealer(set - o) - case _ => Behaviors.unhandled + + val y = spawn(dealer(Set.empty)) + + (0 to 10000).foreach { i => + y ! Add(i) } - val getStack = Behaviors.receiveMessagePartial[Parent] { - case Stack(sender) => - sender ! Thread.currentThread().getStackTrace - Behaviors.same + (0 to 9999).foreach { i => + y ! Remove(i) } - val getSet = Behaviors.receiveMessagePartial[Parent] { - case Get(sender) => - sender ! set - Behaviors.same - } - add.orElse(remove).orElse(getStack).orElse(getSet) + val probe = TestProbe[Set[Any]] + y ! Get(probe.ref) + probe.expectMessage(Set[Any](10000)) + } - val y = spawn(dealer(Set.empty)) - (0 to 10000).foreach { i => - y ! Add(i) + "pass unhandled Terminated along" in { + val probe = TestProbe[String]() + spawn(Behaviors.setup[String] { ctx => + + // arrange with a deathwatch triggering + ctx.watch(ctx.spawnAnonymous(Behavior.stopped[String])) + + Behaviors.receiveSignal[String] { + case (_, PreRestart) => + Behaviors.same + }.orElse(Behaviors.receiveSignal { + case (_, Terminated(_)) => + probe.ref ! "orElse saw it" + Behaviors.same + }) + }) + + probe.expectMessage("orElse saw it") } - (0 to 9999).foreach { i => - y ! Remove(i) + + "pass unhandled Terminated along and fail if alternative doesn't handle either" in { + val probe = TestProbe[String]() + + val ref = + EventFilter[DeathPactException](occurrences = 1).intercept { + spawn(Behaviors.setup[String] { ctx => + + // arrange with a deathwatch triggering + ctx.watch(ctx.spawnAnonymous(Behavior.stopped[String])) + + Behaviors.receiveSignal[String] { + case (_, Terminated(_)) => + probe.ref ! "first handler saw it" + Behavior.unhandled + }.orElse(Behaviors.receiveSignal { + case (_, Terminated(_)) => + probe.ref ! "second handler saw it" + Behavior.unhandled + }) + }) + } + + probe.expectMessage("first handler saw it") + probe.expectMessage("second handler saw it") + probe.expectTerminated(ref) } - val probe = TestProbe[Set[Any]] - y ! Get(probe.ref) - probe.expectMessage(Set[Any](10000)) + } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala index edc7e4cc06..82df786756 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala @@ -59,7 +59,7 @@ abstract class Behavior[T] { behavior => @InternalApi private[akka] final def unsafeCast[U]: Behavior[U] = this.asInstanceOf[Behavior[U]] /** - * Composes this `Behavior with a fallback `Behavior` which + * Composes this `Behavior` with a fallback `Behavior` which * is used when this `Behavior` doesn't handle the message or signal, i.e. * when `unhandled` is returned. * diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala index e2ff7cc899..dc82ee8561 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala @@ -80,7 +80,17 @@ import akka.actor.typed.scaladsl.{ ActorContext => SAC } } override def receiveSignal(ctx: AC[T], msg: Signal): Behavior[T] = { - Behavior.interpretSignal(first, ctx, msg) match { + val result: Behavior[T] = try { + Behavior.interpretSignal(first, ctx, msg) + } catch { + case _: DeathPactException => + // since we don't know what kind of concrete Behavior `first` is, if it is intercepted etc. + // the only way we can fallback to second behavior if Terminated wasn't handled is to + // catch the DeathPact here and pretend like it was just `unhandled` + Behavior.unhandled + } + + result match { case _: UnhandledBehavior.type => Behavior.interpretSignal(second, ctx, msg) case handled => handled } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala index 80fc5b3e22..ec8290da1f 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala @@ -150,9 +150,15 @@ import akka.annotation.InternalApi } override def unhandled(msg: Any): Unit = msg match { - case Terminated(ref) => throw DeathPactException(ref) - case _: Signal => // that's ok - case other => super.unhandled(other) + + case Terminated(ref) => + // this should never get here, because if it did, the death pact could + // not be supervised - interpretSignal is where this actually happens + throw DeathPactException(ref) + case _: Signal => + // that's ok + case other => + super.unhandled(other) } override val supervisorStrategy = untyped.OneForOneStrategy(loggingEnabled = false) {