diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/AskSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/AskSpec.scala index 70df741fb1..6f9d4785eb 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/AskSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/AskSpec.scala @@ -8,12 +8,16 @@ import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.Behaviors._ import akka.actor.typed.scaladsl.adapter._ +import akka.testkit.EventFilter import akka.testkit.typed.TestKit +import akka.testkit.typed.scaladsl.TestProbe import akka.util.Timeout +import com.typesafe.config.ConfigFactory import org.scalatest.concurrent.ScalaFutures import scala.concurrent.duration._ import scala.concurrent.{ ExecutionContext, TimeoutException } +import scala.util.Success object AskSpec { sealed trait Msg @@ -21,7 +25,9 @@ object AskSpec { final case class Stop(replyTo: ActorRef[Unit]) extends Msg } -class AskSpec extends TestKit("AskSpec") with TypedAkkaSpec with ScalaFutures { +class AskSpec + extends TestKit("AskSpec", ConfigFactory.parseString("akka.loggers = [ akka.testkit.TestEventListener ]")) + with TypedAkkaSpec with ScalaFutures { import AskSpec._ @@ -41,6 +47,8 @@ class AskSpec extends TestKit("AskSpec") with TypedAkkaSpec with ScalaFutures { "must fail the future if the actor is already terminated" in { val ref = spawn(behavior) (ref ? Stop).futureValue + val probe = TestProbe() + probe.expectTerminated(ref, probe.remainingOrDefault) val answer = ref ? Foo("bar") val result = answer.failed.futureValue result shouldBe a[TimeoutException] @@ -104,5 +112,46 @@ class AskSpec extends TestKit("AskSpec") with TypedAkkaSpec with ScalaFutures { akka.testkit.TestKit.shutdownActorSystem(untypedSystem) } } + + "fail asking actor if responder function throws" in { + case class Question(reply: ActorRef[Long]) + + val probe = TestProbe[AnyRef]("probe") + val behv = + Behaviors.immutable[String] { + case (ctx, "start-ask") ⇒ + ctx.ask[Question, Long](probe.ref)(Question(_)) { + case Success(42L) ⇒ + throw new RuntimeException("Unsupported number") + case _ ⇒ "test" + } + Behavior.same + case (ctx, "test") ⇒ + probe.ref ! "got-test" + Behavior.same + case (ctx, "get-state") ⇒ + probe.ref ! "running" + Behavior.same + } + + val ref = spawn(behv) + + ref ! "test" + probe.expectMessage("got-test") + + ref ! "start-ask" + val Question(replyRef) = probe.expectMessageType[Question] + replyRef ! 0L + probe.expectMessage("got-test") + + ref ! "start-ask" + val Question(replyRef2) = probe.expectMessageType[Question] + + EventFilter[RuntimeException](message = "Exception thrown out of adapter. Stopping myself.", occurrences = 1).intercept { + replyRef2 ! 42L + }(system.toUntyped) + + probe.expectTerminated(ref, probe.remainingOrDefault) + } } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/AskResponse.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/AskResponse.scala index 7daf93a3d5..7c33906a55 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/AskResponse.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/AskResponse.scala @@ -13,7 +13,6 @@ import scala.util.Try * Message wrapper used to allow ActorContext.ask to map the response inside the asking actor. */ @InternalApi -private[akka] final class AskResponse[U, T](result: Try[U], adapt: Try[U] ⇒ T) { - - def adapted: T = adapt(result) +private[akka] final class AskResponse[U, T](result: Try[U], adapter: Try[U] ⇒ T) { + def adapt(): T = adapter(result) } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InternalMessage.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InternalMessage.scala index f545b29984..f89c9419e6 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InternalMessage.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InternalMessage.scala @@ -16,8 +16,8 @@ import akka.annotation.InternalApi * function. Used by `ActorContext.spawnMessageAdapter` so that the function is * applied in the "parent" actor (for better thread safetey).. */ -@InternalApi private[akka] final case class AdaptMessage[U, T](msg: U, adapt: U ⇒ T) { - def adapted: T = adapt(msg) +@InternalApi private[akka] final case class AdaptMessage[U, T](msg: U, adapter: U ⇒ T) { + def adapt(): T = adapter(msg) } // FIXME move AskResponse in other PR diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala index 899cd4f0a5..5bd149f16a 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala @@ -10,10 +10,12 @@ import akka.{ actor ⇒ a } import akka.annotation.InternalApi import akka.util.OptionVal +import scala.util.control.NonFatal + /** * INTERNAL API */ -@InternalApi private[typed] class ActorAdapter[T](_initialBehavior: Behavior[T]) extends a.Actor { +@InternalApi private[typed] class ActorAdapter[T](_initialBehavior: Behavior[T]) extends a.Actor with a.ActorLogging { import Behavior._ import ActorRefAdapter.toUntyped @@ -40,9 +42,9 @@ import akka.util.OptionVal case a.ReceiveTimeout ⇒ next(Behavior.interpretMessage(behavior, ctx, ctx.receiveTimeoutMsg), ctx.receiveTimeoutMsg) case wrapped: AskResponse[Any, T] @unchecked ⇒ - handleMessage(wrapped.adapted) + withSafelyAdapted(() ⇒ wrapped.adapt())(handleMessage) case wrapped: AdaptMessage[Any, T] @unchecked ⇒ - wrapped.adapted match { + withSafelyAdapted(() ⇒ wrapped.adapt()) { case AdaptWithRegisteredMessageAdapter(msg) ⇒ adaptAndHandle(msg) case msg: T @unchecked ⇒ @@ -89,8 +91,7 @@ import akka.util.OptionVal unhandled(msg) case (clazz, f) :: tail ⇒ if (clazz.isAssignableFrom(msg.getClass)) { - val adaptedMsg = f(msg) - handleMessage(adaptedMsg) + withSafelyAdapted(() ⇒ f(msg))(handleMessage) } else handle(tail) // recursive } @@ -98,6 +99,14 @@ import akka.util.OptionVal handle(ctx.messageAdapters) } + private def withSafelyAdapted[U, V](adapt: () ⇒ U)(body: U ⇒ V): Unit = + try body(adapt()) + catch { + case NonFatal(ex) ⇒ + log.error(ex, "Exception thrown out of adapter. Stopping myself.") + context.stop(self) + } + override def unhandled(msg: Any): Unit = msg match { case Terminated(ref) ⇒ throw a.DeathPactException(toUntyped(ref)) case msg: Signal ⇒ // that's ok