diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/AskSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/AskSpec.scala index c0607d108c..6d5de9ec79 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/AskSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/AskSpec.scala @@ -181,7 +181,7 @@ class AskSpec extends ScalaTestWithActorTestKit(""" val Question(replyRef2) = probe.expectMessageType[Question] LoggingTestKit - .error("Exception thrown out of adapter. Stopping myself.") + .error("Unsupported number") .expect { replyRef2 ! 42L }(system) diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala index 833dfee28c..5f8c9c70e8 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala @@ -12,7 +12,6 @@ import java.util.concurrent.atomic.AtomicInteger import scala.concurrent.duration._ import scala.util.control.NoStackTrace - import akka.actor.ActorInitializationException import akka.actor.Dropped import akka.actor.testkit.typed._ @@ -28,6 +27,10 @@ import org.slf4j.event.Level import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.{ AnyWordSpec, AnyWordSpecLike } +import scala.concurrent.Future +import scala.util.Failure +import scala.util.Success + object SupervisionSpec { sealed trait Command @@ -1322,6 +1325,44 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(""" probe.expectTerminated(wrong) } + "apply supervision to adapter function" in { + val probe = createTestProbe[String]() + val ref = testKit.spawn( + Behaviors + .supervise(Behaviors.setup[String] { context => + probe.ref ! "Starting" + Behaviors + .receiveMessage[String] { + case "future-boom" => + implicit val ec = context.executionContext + // throw an exception from the adapt function + context.pipeToSelf(Future[String] { + throw TestException("thrown in adapter") + }) { + case Success(msg) => msg + case Failure(exception) => throw exception + } + Behaviors.same + case other => + probe.ref ! other + Behaviors.same + } + .receiveSignal { + case (_, PreRestart) => + probe.ref ! "PreRestart" + Behaviors.same + } + }) + .onFailure[TestException](SupervisorStrategy.restart)) + + probe.expectMessage("Starting") + ref ! "future-boom" + probe.expectMessage("PreRestart") + probe.expectMessage("Starting") + ref ! "message" + probe.expectMessage("message") + } + } val allStrategies = Seq( diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/javadsl/AdaptationFailureSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/javadsl/AdaptationFailureSpec.scala new file mode 100644 index 0000000000..0204232c27 --- /dev/null +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/javadsl/AdaptationFailureSpec.scala @@ -0,0 +1,96 @@ +/* + * Copyright (C) 2009-2020 Lightbend Inc. + */ + +package akka.actor.typed.javadsl + +import akka.Done +import akka.actor.testkit.typed.TestException +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.Behavior +import akka.actor.typed.MessageAdaptionFailure +import akka.actor.typed.PreRestart +import akka.actor.typed.Terminated +import org.scalatest.wordspec.AnyWordSpecLike + +import scala.concurrent.Promise + +object AdaptationFailureSpec { + def emptyAbstractBehavior: Behavior[Any] = Behaviors.setup(new EmptyAbstractBehavior(_)) + class EmptyAbstractBehavior(ctx: ActorContext[Any]) extends AbstractBehavior[Any](ctx) { + protected def createReceive: Receive[Any] = newReceiveBuilder.build() + } + + def abstractBehaviorHandlingOtherSignals: Behavior[Any] = Behaviors.setup(new AbstractBehaviorHandlingOtherSignals(_)) + class AbstractBehaviorHandlingOtherSignals(ctx: ActorContext[Any]) extends AbstractBehavior[Any](ctx) { + protected def createReceive: Receive[Any] = + newReceiveBuilder.onSignal(classOf[PreRestart], (_: PreRestart) => Behaviors.same).build() + } + + def abstractBehaviorHandlingMessageAdaptationFailure: Behavior[Any] = + Behaviors.setup(new AbstractBehaviorHandlingMessageAdaptationFailure(_)) + class AbstractBehaviorHandlingMessageAdaptationFailure(ctx: ActorContext[Any]) extends AbstractBehavior[Any](ctx) { + protected def createReceive: Receive[Any] = + newReceiveBuilder.onSignal(classOf[MessageAdaptionFailure], (_: MessageAdaptionFailure) => Behaviors.same).build() + } +} + +class AdaptationFailureSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing { + + import AdaptationFailureSpec._ + + val crashingBehaviors: List[(String, Behavior[Any])] = + "AbstractBehavior" -> emptyAbstractBehavior :: + "AbstractBehavior handling other signals" -> abstractBehaviorHandlingOtherSignals :: + Nil + + val nonCrashingBehaviors: List[(String, Behavior[Any])] = + "AbstractBehavior handling MessageAdaptationFailure" -> abstractBehaviorHandlingMessageAdaptationFailure :: + Nil + + "Failure in an adapter" must { + + crashingBehaviors.foreach { + case (name, behavior) => + s"default to crash the actor or $name" in { + val probe = createTestProbe() + val ref = spawn(Behaviors.setup[Any] { ctx => + val adapter = ctx.messageAdapter[Any](classOf[Any], _ => throw TestException("boom")) + adapter ! "go boom" + + behavior + }) + probe.expectTerminated(ref) + } + } + + nonCrashingBehaviors.foreach { + case (name, behavior) => + s"ignore the failure for $name" in { + val probe = createTestProbe[Any]() + val threw = Promise[Done]() + val ref = spawn(Behaviors.setup[Any] { ctx => + val adapter = ctx.messageAdapter[Any](classOf[Any], { _ => + threw.success(Done) + throw TestException("boom") + }) + adapter ! "go boom" + behavior + }) + spawn(Behaviors.setup[Any] { ctx => + ctx.watch(ref) + + Behaviors.receiveSignal { + case (_, Terminated(`ref`)) => + probe.ref ! "actor-stopped" + Behaviors.same + } + }) + + probe.expectNoMessage() + } + } + } + +} diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/AdaptationFailureSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/AdaptationFailureSpec.scala new file mode 100644 index 0000000000..36d3098ac0 --- /dev/null +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/AdaptationFailureSpec.scala @@ -0,0 +1,120 @@ +/* + * Copyright (C) 2009-2020 Lightbend Inc. + */ + +package akka.actor.typed.scaladsl + +import akka.Done +import akka.actor.testkit.typed.TestException +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.Behavior +import akka.actor.typed.MessageAdaptionFailure +import akka.actor.typed.PreRestart +import akka.actor.typed.Signal +import akka.actor.typed.Terminated +import org.scalatest.wordspec.AnyWordSpecLike + +import scala.concurrent.Promise + +object AdaptationFailureSpec { + + def emptyAbstractBehavior: Behavior[Any] = Behaviors.setup(new EmptyAbstractBehavior(_)) + class EmptyAbstractBehavior(ctx: ActorContext[Any]) extends AbstractBehavior[Any](ctx) { + def onMessage(msg: Any): Behavior[Any] = this + } + + def abstractBehaviorHandlingOtherSignals: Behavior[Any] = Behaviors.setup(new AbstractBehaviorHandlingOtherSignals(_)) + class AbstractBehaviorHandlingOtherSignals(ctx: ActorContext[Any]) extends AbstractBehavior[Any](ctx) { + + def onMessage(msg: Any): Behavior[Any] = this + + override def onSignal: PartialFunction[Signal, Behavior[Any]] = { + case PreRestart => Behaviors.same + } + } + + def abstractBehaviorHandlingMessageAdaptionFailure: Behavior[Any] = + Behaviors.setup(new AbstractBehaviorHandlingMessageAdaptionFailure(_)) + class AbstractBehaviorHandlingMessageAdaptionFailure(ctx: ActorContext[Any]) extends AbstractBehavior[Any](ctx) { + + def onMessage(msg: Any): Behavior[Any] = this + + override def onSignal: PartialFunction[Signal, Behavior[Any]] = { + case MessageAdaptionFailure(_) => Behaviors.same + } + } + +} +class AdaptationFailureSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing { + + import AdaptationFailureSpec._ + + val crashingBehaviors: List[(String, Behavior[Any])] = + "receive" -> Behaviors.receive[Any]((_, _) => Behaviors.same) :: + "receiveMessage" -> Behaviors.receiveMessage[Any](_ => Behaviors.same) :: + "receivePartial" -> Behaviors.receivePartial[Any](PartialFunction.empty) :: + "receiveSignal" -> Behaviors.receiveSignal[Any](PartialFunction.empty) :: + "receiveSignal not catching adaption failure" -> + Behaviors.receiveSignal[Any] { + case (_, PreRestart) => Behaviors.same + } :: + "AbstractBehavior" -> emptyAbstractBehavior :: + "AbstractBehavior handling other signals" -> abstractBehaviorHandlingOtherSignals :: + Nil + + val nonCrashingBehaviors: List[(String, Behavior[Any])] = + "empty" -> Behaviors.empty[Any] :: + "ignore" -> Behaviors.ignore[Any] :: + "receiveSignal catching adaption failure" -> + Behaviors.receiveSignal[Any] { + case (_, MessageAdaptionFailure(_)) => Behaviors.same + } :: + "AbstractBehavior handling MessageAdaptionFailure" -> abstractBehaviorHandlingMessageAdaptionFailure :: + Nil + + "Failure in an adapter" must { + + crashingBehaviors.foreach { + case (name, behavior) => + s"default to crash the actor or $name" in { + val probe = createTestProbe() + val ref = spawn(Behaviors.setup[Any] { ctx => + val adapter = ctx.messageAdapter[Any](_ => throw TestException("boom")) + adapter ! "go boom" + + behavior + }) + probe.expectTerminated(ref) + } + } + + nonCrashingBehaviors.foreach { + case (name, behavior) => + s"ignore the failure for $name" in { + val probe = createTestProbe[Any]() + val threw = Promise[Done]() + val ref = spawn(Behaviors.setup[Any] { ctx => + val adapter = ctx.messageAdapter[Any] { _ => + threw.success(Done) + throw TestException("boom") + } + adapter ! "go boom" + behavior + }) + spawn(Behaviors.setup[Any] { ctx => + ctx.watch(ref) + + Behaviors.receiveSignal { + case (_, Terminated(`ref`)) => + probe.ref ! "actor-stopped" + Behaviors.same + } + }) + + probe.expectNoMessage() + } + } + + } +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala index 30f8da4937..0754296668 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala @@ -4,10 +4,6 @@ package akka.actor.typed -import scala.annotation.switch -import scala.annotation.tailrec -import scala.reflect.ClassTag - import akka.actor.InvalidMessageException import akka.actor.typed.internal.BehaviorImpl import akka.actor.typed.internal.BehaviorImpl.DeferredBehavior @@ -17,6 +13,10 @@ import akka.actor.typed.internal.InterceptorImpl import akka.annotation.DoNotInherit import akka.annotation.InternalApi +import scala.annotation.switch +import scala.annotation.tailrec +import scala.reflect.ClassTag + /** * The behavior of an actor defines how it reacts to the messages that it * receives. The message may either be of the type that the Actor declares diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/MessageAndSignals.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/MessageAndSignals.scala index 344628a4ce..0129de741d 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/MessageAndSignals.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/MessageAndSignals.scala @@ -107,3 +107,9 @@ final class ChildFailed(ref: ActorRef[Nothing], val cause: Throwable) extends Te case _ => false } } + +/** + * Signal passed to the actor when a message adapter has thrown an exception adapting an incoming message. + * Default signal handlers will re-throw the exception so that such failures are handled by supervision. + */ +final case class MessageAdaptionFailure(exception: Throwable) extends Signal diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala index 66078c3e5e..d1f3788305 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala @@ -61,7 +61,8 @@ private[akka] object BehaviorTags { def failed[T](cause: Throwable): Behavior[T] = new FailedBehavior(cause).asInstanceOf[Behavior[T]] val unhandledSignal: PartialFunction[(TypedActorContext[Nothing], Signal), Behavior[Nothing]] = { - case (_, _) => UnhandledBehavior + case (_, MessageAdaptionFailure(ex)) => throw ex + case (_, _) => UnhandledBehavior } private object EmptyBehavior extends Behavior[Any](BehaviorTags.EmptyBehavior) { @@ -126,10 +127,11 @@ private[akka] object BehaviorTags { BehaviorImpl.unhandledSignal.asInstanceOf[PartialFunction[(SAC[T], Signal), Behavior[T]]]) extends ExtensibleBehavior[T] { - override def receiveSignal(ctx: AC[T], msg: Signal): Behavior[T] = + override def receiveSignal(ctx: AC[T], msg: Signal): Behavior[T] = { onSignal.applyOrElse( (ctx.asScala, msg), BehaviorImpl.unhandledSignal.asInstanceOf[PartialFunction[(SAC[T], Signal), Behavior[T]]]) + } override def receive(ctx: AC[T], msg: T) = onMessage(ctx.asScala, msg) @@ -149,10 +151,11 @@ private[akka] object BehaviorTags { override def receive(ctx: AC[T], msg: T) = onMessage(msg) - override def receiveSignal(ctx: AC[T], msg: Signal): Behavior[T] = + override def receiveSignal(ctx: AC[T], msg: Signal): Behavior[T] = { onSignal.applyOrElse( (ctx.asScala, msg), BehaviorImpl.unhandledSignal.asInstanceOf[PartialFunction[(SAC[T], Signal), Behavior[T]]]) + } override def toString = s"ReceiveMessage(${LineNumbers(onMessage)})" } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala index bc12717abb..ba7285606a 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala @@ -8,21 +8,20 @@ package adapter import java.lang.reflect.InvocationTargetException -import akka.actor.{ ActorInitializationException, ActorRefWithCell } -import akka.{ actor => classic } import akka.actor.typed.internal.BehaviorImpl.DeferredBehavior import akka.actor.typed.internal.BehaviorImpl.StoppedBehavior -import akka.actor.typed.internal.adapter.ActorAdapter.TypedActorFailedException -import akka.annotation.InternalApi - -import scala.annotation.tailrec -import scala.util.Failure -import scala.util.Success -import scala.util.Try -import scala.util.control.Exception.Catcher -import scala.annotation.switch import akka.actor.typed.internal.TimerSchedulerImpl.TimerMsg +import akka.actor.typed.internal.adapter.ActorAdapter.TypedActorFailedException +import akka.actor.ActorInitializationException +import akka.actor.ActorRefWithCell +import akka.annotation.InternalApi import akka.util.OptionVal +import akka.{ actor => classic } + +import scala.annotation.switch +import scala.annotation.tailrec +import scala.util.control.Exception.Catcher +import scala.util.control.NonFatal /** * INTERNAL API @@ -181,15 +180,16 @@ import akka.util.OptionVal } private def withSafelyAdapted[U, V](adapt: () => U)(body: U => V): Unit = { - Try(adapt()) match { - case Success(null) => + try { + val a = adapt() + if (a != null) body(a) + else ctx.log.warn( "Adapter function returned null which is not valid as an actor message, ignoring. This can happen for example when using pipeToSelf and returning null from the adapt function. Null value is ignored and not passed on to actor.") - case Success(a) => - body(a) - case Failure(ex) => - ctx.log.error(s"Exception thrown out of adapter. Stopping myself. ${ex.getMessage}", ex) - context.stop(self) + } catch { + case NonFatal(ex) => + // pass it on through the signal handler chain giving supervision a chance to deal with it + handleSignal(MessageAdaptionFailure(ex)) } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ReceiveBuilder.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ReceiveBuilder.scala index aa71b778fa..2c811a86cb 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ReceiveBuilder.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ReceiveBuilder.scala @@ -4,6 +4,8 @@ package akka.actor.typed.javadsl +import akka.actor.typed.MessageAdaptionFailure + import scala.annotation.tailrec import akka.japi.function.Creator import akka.japi.function.{ Function => JFunction } @@ -24,9 +26,16 @@ final class ReceiveBuilder[T] private ( private var messageHandlers: List[ReceiveBuilder.Case[T, T]], private var signalHandlers: List[ReceiveBuilder.Case[T, Signal]]) { - import ReceiveBuilder.Case + import ReceiveBuilder._ - def build(): Receive[T] = new BuiltReceive[T](messageHandlers.reverse, signalHandlers.reverse) + def build(): Receive[T] = { + // signal handlers will often be empty so optimize for that + val builtSignalHandlers = + if (signalHandlers.isEmpty) defaultSignalHandlers[T] + else (adapterExceptionSignalHandler[T] :: signalHandlers).reverse + + new BuiltReceive[T](messageHandlers.reverse, builtSignalHandlers) + } /** * Add a new case to the message handling. @@ -158,6 +167,26 @@ object ReceiveBuilder { test: OptionVal[JPredicate[MT]], handler: JFunction[MT, Behavior[BT]]) + /** INTERNAL API */ + @InternalApi + private val _adapterExceptionSignalHandler = Case[Any, MessageAdaptionFailure]( + OptionVal.Some(classOf[MessageAdaptionFailure]), + OptionVal.None, + failure => throw failure.exception) + + /** INTERNAL API */ + @InternalApi + private def adapterExceptionSignalHandler[T]: Case[T, Signal] = + _adapterExceptionSignalHandler.asInstanceOf[Case[T, Signal]] + + /** INTERNAL API */ + @InternalApi + private val _defaultSignalHandlers = adapterExceptionSignalHandler :: Nil + + /** INTERNAL API */ + @InternalApi + private def defaultSignalHandlers[T] = _defaultSignalHandlers.asInstanceOf[List[Case[T, Signal]]] + } /** diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AbstractBehavior.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AbstractBehavior.scala index 43b4a91fa9..c0a2c32d05 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AbstractBehavior.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/AbstractBehavior.scala @@ -4,6 +4,7 @@ package akka.actor.typed.scaladsl +import akka.actor.typed.MessageAdaptionFailure import akka.actor.typed.{ Behavior, ExtensibleBehavior, Signal, TypedActorContext } /** @@ -85,6 +86,9 @@ abstract class AbstractBehavior[T](protected val context: ActorContext[T]) exten @throws(classOf[Exception]) override final def receiveSignal(ctx: TypedActorContext[T], msg: Signal): Behavior[T] = { checkRightContext(ctx) - onSignal.applyOrElse(msg, { case _ => Behaviors.unhandled }: PartialFunction[Signal, Behavior[T]]) + onSignal.applyOrElse(msg, { + case MessageAdaptionFailure(ex) => throw ex + case _ => Behaviors.unhandled + }: PartialFunction[Signal, Behavior[T]]) } }