diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/MessageAdapterSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/MessageAdapterSpec.scala index 412e2db50e..ebc9339b22 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/MessageAdapterSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/MessageAdapterSpec.scala @@ -6,8 +6,10 @@ package akka.actor.typed.scaladsl import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior import akka.actor.typed.PostStop import akka.actor.typed.Props +import akka.actor.typed.SupervisorStrategy import akka.actor.typed.TestException import akka.actor.typed.TypedAkkaSpecWithShutdown import akka.testkit.EventFilter @@ -51,7 +53,7 @@ class MessageAdapterSpec extends ActorTestKit with TypedAkkaSpecWithShutdown { val probe = TestProbe[AnotherPong]() - val snitch = Behaviors.setup[AnotherPong] { (ctx) ⇒ + val snitch = Behaviors.setup[AnotherPong] { ctx ⇒ val replyTo = ctx.messageAdapter[Response](_ ⇒ AnotherPong(ctx.self.path.name, Thread.currentThread().getName)) @@ -62,10 +64,9 @@ class MessageAdapterSpec extends ActorTestKit with TypedAkkaSpecWithShutdown { AnotherPong(ctx.self.path.name, Thread.currentThread().getName)) pingPong ! Ping(replyTo2) - Behaviors.receive { - case (_, anotherPong: AnotherPong) ⇒ - probe.ref ! anotherPong - Behaviors.same + Behaviors.receiveMessage { anotherPong ⇒ + probe.ref ! anotherPong + Behaviors.same } } @@ -91,20 +92,18 @@ class MessageAdapterSpec extends ActorTestKit with TypedAkkaSpecWithShutdown { case class Wrapped(qualifier: String, response: Response) - val pingPong = spawn(Behaviors.receive[Ping] { (_, msg) ⇒ - msg match { - case Ping1(sender) ⇒ - sender ! Pong1("hello-1") - Behaviors.same - case Ping2(sender) ⇒ - sender ! Pong2("hello-2") - Behaviors.same - } + val pingPong = spawn(Behaviors.receiveMessage[Ping] { + case Ping1(sender) ⇒ + sender ! Pong1("hello-1") + Behaviors.same + case Ping2(sender) ⇒ + sender ! Pong2("hello-2") + Behaviors.same }) val probe = TestProbe[Wrapped]() - val snitch = Behaviors.setup[Wrapped] { (ctx) ⇒ + val snitch = Behaviors.setup[Wrapped] { ctx ⇒ ctx.messageAdapter[Response](pong ⇒ Wrapped(qualifier = "wrong", pong)) // this is replaced val replyTo1: ActorRef[Response] = ctx.messageAdapter(pong ⇒ Wrapped(qualifier = "1", pong)) @@ -112,10 +111,9 @@ class MessageAdapterSpec extends ActorTestKit with TypedAkkaSpecWithShutdown { pingPong ! Ping1(replyTo1) pingPong ! Ping2(replyTo2) - Behaviors.receive { - case (_, wrapped) ⇒ - probe.ref ! wrapped - Behaviors.same + Behaviors.receiveMessage { wrapped ⇒ + probe.ref ! wrapped + Behaviors.same } } @@ -135,21 +133,19 @@ class MessageAdapterSpec extends ActorTestKit with TypedAkkaSpecWithShutdown { case class Wrapped(qualifier: String, response: Response) - val pingPong = spawn(Behaviors.receive[Ping] { (_, msg) ⇒ - msg match { - case Ping1(sender) ⇒ - sender ! Pong1("hello-1") - Behaviors.same - case Ping2(sender) ⇒ - // doing something terribly wrong - sender ! Pong2("hello-2") - Behaviors.same - } + val pingPong = spawn(Behaviors.receiveMessage[Ping] { + case Ping1(sender) ⇒ + sender ! Pong1("hello-1") + Behaviors.same + case Ping2(sender) ⇒ + // doing something terribly wrong + sender ! Pong2("hello-2") + Behaviors.same }) val probe = TestProbe[Wrapped]() - val snitch = Behaviors.setup[Wrapped] { (ctx) ⇒ + val snitch = Behaviors.setup[Wrapped] { ctx ⇒ val replyTo1 = ctx.messageAdapter[Pong1](pong ⇒ Wrapped(qualifier = "1", pong)) pingPong ! Ping1(replyTo1) @@ -158,10 +154,9 @@ class MessageAdapterSpec extends ActorTestKit with TypedAkkaSpecWithShutdown { pingPong ! Ping2(replyTo1.asInstanceOf[ActorRef[Pong2]]) pingPong ! Ping1(replyTo1) - Behaviors.receive { - case (_, wrapped) ⇒ - probe.ref ! wrapped - Behaviors.same + Behaviors.receiveMessage { wrapped ⇒ + probe.ref ! wrapped + Behaviors.same } } @@ -179,14 +174,14 @@ class MessageAdapterSpec extends ActorTestKit with TypedAkkaSpecWithShutdown { case class Pong(greeting: String) case class Wrapped(count: Int, response: Pong) - val pingPong = spawn(Behaviors.receive[Ping] { (_, ping) ⇒ + val pingPong = spawn(Behaviors.receiveMessage[Ping] { ping ⇒ ping.sender ! Pong("hello") Behaviors.same }) val probe = TestProbe[Any]() - val snitch = Behaviors.setup[Wrapped] { (ctx) ⇒ + val snitch = Behaviors.setup[Wrapped] { ctx ⇒ var count = 0 val replyTo = ctx.messageAdapter[Pong] { pong ⇒ @@ -198,10 +193,9 @@ class MessageAdapterSpec extends ActorTestKit with TypedAkkaSpecWithShutdown { pingPong ! Ping(replyTo) } - Behaviors.receive[Wrapped] { - case (_, wrapped) ⇒ - probe.ref ! wrapped - Behaviors.same + Behaviors.receiveMessage[Wrapped] { wrapped ⇒ + probe.ref ! wrapped + Behaviors.same }.receiveSignal { case (_, PostStop) ⇒ probe.ref ! "stopped" @@ -219,9 +213,56 @@ class MessageAdapterSpec extends ActorTestKit with TypedAkkaSpecWithShutdown { probe.expectMessage(Wrapped(2, Pong("hello"))) // exception was thrown for 3 - // FIXME One thing to be aware of is that the supervision strategy of the Behavior is not - // used for exceptions from adapters. Should we instead catch, log, unhandled, and resume? - // It's kind of "before" the message arrives. + probe.expectMessage("stopped") + } + + "not catch exception thrown after adapter, when processing the message" in { + case class Ping(sender: ActorRef[Pong]) + case class Pong(greeting: String) + case class Wrapped(response: Pong) + + val pingPong = spawn(Behaviors.receiveMessage[Ping] { ping ⇒ + ping.sender ! Pong("hello") + Behaviors.same + }) + + val probe = TestProbe[Any]() + + val snitch = Behaviors.setup[Wrapped] { ctx ⇒ + + val replyTo = ctx.messageAdapter[Pong] { pong ⇒ + Wrapped(pong) + } + (1 to 5).foreach { _ ⇒ + pingPong ! Ping(replyTo) + } + + def behv(count: Int): Behavior[Wrapped] = Behaviors.receiveMessage[Wrapped] { wrapped ⇒ + probe.ref ! count + if (count == 3) { + throw new TestException("boom") + } + behv(count + 1) + }.receiveSignal { + case (_, PostStop) ⇒ + probe.ref ! "stopped" + Behaviors.same + } + + behv(count = 1) + } + + EventFilter.warning(pattern = ".*received dead letter.*", occurrences = 2).intercept { + // Not expecting "Exception thrown out of adapter. Stopping myself" + EventFilter[TestException](message = "boom", occurrences = 1).intercept { + spawn(snitch) + } + } + + probe.expectMessage(1) + probe.expectMessage(2) + probe.expectMessage(3) + // exception was thrown for 3 probe.expectMessage("stopped") } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala index 0dc7f23559..c422bc895e 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala @@ -7,10 +7,13 @@ package internal package adapter import scala.annotation.tailrec +import scala.util.Failure +import scala.util.Success +import scala.util.Try + import akka.{ actor ⇒ a } import akka.annotation.InternalApi import akka.util.OptionVal - import scala.util.control.NonFatal /** @@ -103,13 +106,15 @@ import scala.util.control.NonFatal handle(ctx.messageAdapters) } - private def withSafelyAdapted[U, V](adapt: () ⇒ U)(body: U ⇒ V): Unit = - try body(adapt()) - catch { - case NonFatal(ex) ⇒ + private def withSafelyAdapted[U, V](adapt: () ⇒ U)(body: U ⇒ V): Unit = { + Try(adapt()) match { + case Success(a) ⇒ + body(a) + case Failure(ex) ⇒ log.error(ex, "Exception thrown out of adapter. Stopping myself.") context.stop(self) } + } override def unhandled(msg: Any): Unit = msg match { case t @ Terminated(ref) ⇒ throw DeathPactException(ref)