Cover adaptation failures with supervision #28592

This commit is contained in:
Johan Andrén 2020-03-11 17:56:30 +01:00 committed by GitHub
parent 717d72ff8f
commit 8721b05a66
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 329 additions and 30 deletions

View file

@ -181,7 +181,7 @@ class AskSpec extends ScalaTestWithActorTestKit("""
val Question(replyRef2) = probe.expectMessageType[Question]
LoggingTestKit
.error("Exception thrown out of adapter. Stopping myself.")
.error("Unsupported number")
.expect {
replyRef2 ! 42L
}(system)

View file

@ -12,7 +12,6 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.actor.ActorInitializationException
import akka.actor.Dropped
import akka.actor.testkit.typed._
@ -28,6 +27,10 @@ import org.slf4j.event.Level
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.{ AnyWordSpec, AnyWordSpecLike }
import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success
object SupervisionSpec {
sealed trait Command
@ -1322,6 +1325,44 @@ class SupervisionSpec extends ScalaTestWithActorTestKit("""
probe.expectTerminated(wrong)
}
"apply supervision to adapter function" in {
val probe = createTestProbe[String]()
val ref = testKit.spawn(
Behaviors
.supervise(Behaviors.setup[String] { context =>
probe.ref ! "Starting"
Behaviors
.receiveMessage[String] {
case "future-boom" =>
implicit val ec = context.executionContext
// throw an exception from the adapt function
context.pipeToSelf(Future[String] {
throw TestException("thrown in adapter")
}) {
case Success(msg) => msg
case Failure(exception) => throw exception
}
Behaviors.same
case other =>
probe.ref ! other
Behaviors.same
}
.receiveSignal {
case (_, PreRestart) =>
probe.ref ! "PreRestart"
Behaviors.same
}
})
.onFailure[TestException](SupervisorStrategy.restart))
probe.expectMessage("Starting")
ref ! "future-boom"
probe.expectMessage("PreRestart")
probe.expectMessage("Starting")
ref ! "message"
probe.expectMessage("message")
}
}
val allStrategies = Seq(

View file

@ -0,0 +1,96 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.javadsl
import akka.Done
import akka.actor.testkit.typed.TestException
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.Behavior
import akka.actor.typed.MessageAdaptionFailure
import akka.actor.typed.PreRestart
import akka.actor.typed.Terminated
import org.scalatest.wordspec.AnyWordSpecLike
import scala.concurrent.Promise
object AdaptationFailureSpec {
def emptyAbstractBehavior: Behavior[Any] = Behaviors.setup(new EmptyAbstractBehavior(_))
class EmptyAbstractBehavior(ctx: ActorContext[Any]) extends AbstractBehavior[Any](ctx) {
protected def createReceive: Receive[Any] = newReceiveBuilder.build()
}
def abstractBehaviorHandlingOtherSignals: Behavior[Any] = Behaviors.setup(new AbstractBehaviorHandlingOtherSignals(_))
class AbstractBehaviorHandlingOtherSignals(ctx: ActorContext[Any]) extends AbstractBehavior[Any](ctx) {
protected def createReceive: Receive[Any] =
newReceiveBuilder.onSignal(classOf[PreRestart], (_: PreRestart) => Behaviors.same).build()
}
def abstractBehaviorHandlingMessageAdaptationFailure: Behavior[Any] =
Behaviors.setup(new AbstractBehaviorHandlingMessageAdaptationFailure(_))
class AbstractBehaviorHandlingMessageAdaptationFailure(ctx: ActorContext[Any]) extends AbstractBehavior[Any](ctx) {
protected def createReceive: Receive[Any] =
newReceiveBuilder.onSignal(classOf[MessageAdaptionFailure], (_: MessageAdaptionFailure) => Behaviors.same).build()
}
}
class AdaptationFailureSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing {
import AdaptationFailureSpec._
val crashingBehaviors: List[(String, Behavior[Any])] =
"AbstractBehavior" -> emptyAbstractBehavior ::
"AbstractBehavior handling other signals" -> abstractBehaviorHandlingOtherSignals ::
Nil
val nonCrashingBehaviors: List[(String, Behavior[Any])] =
"AbstractBehavior handling MessageAdaptationFailure" -> abstractBehaviorHandlingMessageAdaptationFailure ::
Nil
"Failure in an adapter" must {
crashingBehaviors.foreach {
case (name, behavior) =>
s"default to crash the actor or $name" in {
val probe = createTestProbe()
val ref = spawn(Behaviors.setup[Any] { ctx =>
val adapter = ctx.messageAdapter[Any](classOf[Any], _ => throw TestException("boom"))
adapter ! "go boom"
behavior
})
probe.expectTerminated(ref)
}
}
nonCrashingBehaviors.foreach {
case (name, behavior) =>
s"ignore the failure for $name" in {
val probe = createTestProbe[Any]()
val threw = Promise[Done]()
val ref = spawn(Behaviors.setup[Any] { ctx =>
val adapter = ctx.messageAdapter[Any](classOf[Any], { _ =>
threw.success(Done)
throw TestException("boom")
})
adapter ! "go boom"
behavior
})
spawn(Behaviors.setup[Any] { ctx =>
ctx.watch(ref)
Behaviors.receiveSignal {
case (_, Terminated(`ref`)) =>
probe.ref ! "actor-stopped"
Behaviors.same
}
})
probe.expectNoMessage()
}
}
}
}

View file

@ -0,0 +1,120 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.scaladsl
import akka.Done
import akka.actor.testkit.typed.TestException
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.Behavior
import akka.actor.typed.MessageAdaptionFailure
import akka.actor.typed.PreRestart
import akka.actor.typed.Signal
import akka.actor.typed.Terminated
import org.scalatest.wordspec.AnyWordSpecLike
import scala.concurrent.Promise
object AdaptationFailureSpec {
def emptyAbstractBehavior: Behavior[Any] = Behaviors.setup(new EmptyAbstractBehavior(_))
class EmptyAbstractBehavior(ctx: ActorContext[Any]) extends AbstractBehavior[Any](ctx) {
def onMessage(msg: Any): Behavior[Any] = this
}
def abstractBehaviorHandlingOtherSignals: Behavior[Any] = Behaviors.setup(new AbstractBehaviorHandlingOtherSignals(_))
class AbstractBehaviorHandlingOtherSignals(ctx: ActorContext[Any]) extends AbstractBehavior[Any](ctx) {
def onMessage(msg: Any): Behavior[Any] = this
override def onSignal: PartialFunction[Signal, Behavior[Any]] = {
case PreRestart => Behaviors.same
}
}
def abstractBehaviorHandlingMessageAdaptionFailure: Behavior[Any] =
Behaviors.setup(new AbstractBehaviorHandlingMessageAdaptionFailure(_))
class AbstractBehaviorHandlingMessageAdaptionFailure(ctx: ActorContext[Any]) extends AbstractBehavior[Any](ctx) {
def onMessage(msg: Any): Behavior[Any] = this
override def onSignal: PartialFunction[Signal, Behavior[Any]] = {
case MessageAdaptionFailure(_) => Behaviors.same
}
}
}
class AdaptationFailureSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing {
import AdaptationFailureSpec._
val crashingBehaviors: List[(String, Behavior[Any])] =
"receive" -> Behaviors.receive[Any]((_, _) => Behaviors.same) ::
"receiveMessage" -> Behaviors.receiveMessage[Any](_ => Behaviors.same) ::
"receivePartial" -> Behaviors.receivePartial[Any](PartialFunction.empty) ::
"receiveSignal" -> Behaviors.receiveSignal[Any](PartialFunction.empty) ::
"receiveSignal not catching adaption failure" ->
Behaviors.receiveSignal[Any] {
case (_, PreRestart) => Behaviors.same
} ::
"AbstractBehavior" -> emptyAbstractBehavior ::
"AbstractBehavior handling other signals" -> abstractBehaviorHandlingOtherSignals ::
Nil
val nonCrashingBehaviors: List[(String, Behavior[Any])] =
"empty" -> Behaviors.empty[Any] ::
"ignore" -> Behaviors.ignore[Any] ::
"receiveSignal catching adaption failure" ->
Behaviors.receiveSignal[Any] {
case (_, MessageAdaptionFailure(_)) => Behaviors.same
} ::
"AbstractBehavior handling MessageAdaptionFailure" -> abstractBehaviorHandlingMessageAdaptionFailure ::
Nil
"Failure in an adapter" must {
crashingBehaviors.foreach {
case (name, behavior) =>
s"default to crash the actor or $name" in {
val probe = createTestProbe()
val ref = spawn(Behaviors.setup[Any] { ctx =>
val adapter = ctx.messageAdapter[Any](_ => throw TestException("boom"))
adapter ! "go boom"
behavior
})
probe.expectTerminated(ref)
}
}
nonCrashingBehaviors.foreach {
case (name, behavior) =>
s"ignore the failure for $name" in {
val probe = createTestProbe[Any]()
val threw = Promise[Done]()
val ref = spawn(Behaviors.setup[Any] { ctx =>
val adapter = ctx.messageAdapter[Any] { _ =>
threw.success(Done)
throw TestException("boom")
}
adapter ! "go boom"
behavior
})
spawn(Behaviors.setup[Any] { ctx =>
ctx.watch(ref)
Behaviors.receiveSignal {
case (_, Terminated(`ref`)) =>
probe.ref ! "actor-stopped"
Behaviors.same
}
})
probe.expectNoMessage()
}
}
}
}

View file

@ -4,10 +4,6 @@
package akka.actor.typed
import scala.annotation.switch
import scala.annotation.tailrec
import scala.reflect.ClassTag
import akka.actor.InvalidMessageException
import akka.actor.typed.internal.BehaviorImpl
import akka.actor.typed.internal.BehaviorImpl.DeferredBehavior
@ -17,6 +13,10 @@ import akka.actor.typed.internal.InterceptorImpl
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import scala.annotation.switch
import scala.annotation.tailrec
import scala.reflect.ClassTag
/**
* The behavior of an actor defines how it reacts to the messages that it
* receives. The message may either be of the type that the Actor declares

View file

@ -107,3 +107,9 @@ final class ChildFailed(ref: ActorRef[Nothing], val cause: Throwable) extends Te
case _ => false
}
}
/**
* Signal passed to the actor when a message adapter has thrown an exception adapting an incoming message.
* Default signal handlers will re-throw the exception so that such failures are handled by supervision.
*/
final case class MessageAdaptionFailure(exception: Throwable) extends Signal

View file

@ -61,7 +61,8 @@ private[akka] object BehaviorTags {
def failed[T](cause: Throwable): Behavior[T] = new FailedBehavior(cause).asInstanceOf[Behavior[T]]
val unhandledSignal: PartialFunction[(TypedActorContext[Nothing], Signal), Behavior[Nothing]] = {
case (_, _) => UnhandledBehavior
case (_, MessageAdaptionFailure(ex)) => throw ex
case (_, _) => UnhandledBehavior
}
private object EmptyBehavior extends Behavior[Any](BehaviorTags.EmptyBehavior) {
@ -126,10 +127,11 @@ private[akka] object BehaviorTags {
BehaviorImpl.unhandledSignal.asInstanceOf[PartialFunction[(SAC[T], Signal), Behavior[T]]])
extends ExtensibleBehavior[T] {
override def receiveSignal(ctx: AC[T], msg: Signal): Behavior[T] =
override def receiveSignal(ctx: AC[T], msg: Signal): Behavior[T] = {
onSignal.applyOrElse(
(ctx.asScala, msg),
BehaviorImpl.unhandledSignal.asInstanceOf[PartialFunction[(SAC[T], Signal), Behavior[T]]])
}
override def receive(ctx: AC[T], msg: T) = onMessage(ctx.asScala, msg)
@ -149,10 +151,11 @@ private[akka] object BehaviorTags {
override def receive(ctx: AC[T], msg: T) = onMessage(msg)
override def receiveSignal(ctx: AC[T], msg: Signal): Behavior[T] =
override def receiveSignal(ctx: AC[T], msg: Signal): Behavior[T] = {
onSignal.applyOrElse(
(ctx.asScala, msg),
BehaviorImpl.unhandledSignal.asInstanceOf[PartialFunction[(SAC[T], Signal), Behavior[T]]])
}
override def toString = s"ReceiveMessage(${LineNumbers(onMessage)})"
}

View file

@ -8,21 +8,20 @@ package adapter
import java.lang.reflect.InvocationTargetException
import akka.actor.{ ActorInitializationException, ActorRefWithCell }
import akka.{ actor => classic }
import akka.actor.typed.internal.BehaviorImpl.DeferredBehavior
import akka.actor.typed.internal.BehaviorImpl.StoppedBehavior
import akka.actor.typed.internal.adapter.ActorAdapter.TypedActorFailedException
import akka.annotation.InternalApi
import scala.annotation.tailrec
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import scala.util.control.Exception.Catcher
import scala.annotation.switch
import akka.actor.typed.internal.TimerSchedulerImpl.TimerMsg
import akka.actor.typed.internal.adapter.ActorAdapter.TypedActorFailedException
import akka.actor.ActorInitializationException
import akka.actor.ActorRefWithCell
import akka.annotation.InternalApi
import akka.util.OptionVal
import akka.{ actor => classic }
import scala.annotation.switch
import scala.annotation.tailrec
import scala.util.control.Exception.Catcher
import scala.util.control.NonFatal
/**
* INTERNAL API
@ -181,15 +180,16 @@ import akka.util.OptionVal
}
private def withSafelyAdapted[U, V](adapt: () => U)(body: U => V): Unit = {
Try(adapt()) match {
case Success(null) =>
try {
val a = adapt()
if (a != null) body(a)
else
ctx.log.warn(
"Adapter function returned null which is not valid as an actor message, ignoring. This can happen for example when using pipeToSelf and returning null from the adapt function. Null value is ignored and not passed on to actor.")
case Success(a) =>
body(a)
case Failure(ex) =>
ctx.log.error(s"Exception thrown out of adapter. Stopping myself. ${ex.getMessage}", ex)
context.stop(self)
} catch {
case NonFatal(ex) =>
// pass it on through the signal handler chain giving supervision a chance to deal with it
handleSignal(MessageAdaptionFailure(ex))
}
}

View file

@ -4,6 +4,8 @@
package akka.actor.typed.javadsl
import akka.actor.typed.MessageAdaptionFailure
import scala.annotation.tailrec
import akka.japi.function.Creator
import akka.japi.function.{ Function => JFunction }
@ -24,9 +26,16 @@ final class ReceiveBuilder[T] private (
private var messageHandlers: List[ReceiveBuilder.Case[T, T]],
private var signalHandlers: List[ReceiveBuilder.Case[T, Signal]]) {
import ReceiveBuilder.Case
import ReceiveBuilder._
def build(): Receive[T] = new BuiltReceive[T](messageHandlers.reverse, signalHandlers.reverse)
def build(): Receive[T] = {
// signal handlers will often be empty so optimize for that
val builtSignalHandlers =
if (signalHandlers.isEmpty) defaultSignalHandlers[T]
else (adapterExceptionSignalHandler[T] :: signalHandlers).reverse
new BuiltReceive[T](messageHandlers.reverse, builtSignalHandlers)
}
/**
* Add a new case to the message handling.
@ -158,6 +167,26 @@ object ReceiveBuilder {
test: OptionVal[JPredicate[MT]],
handler: JFunction[MT, Behavior[BT]])
/** INTERNAL API */
@InternalApi
private val _adapterExceptionSignalHandler = Case[Any, MessageAdaptionFailure](
OptionVal.Some(classOf[MessageAdaptionFailure]),
OptionVal.None,
failure => throw failure.exception)
/** INTERNAL API */
@InternalApi
private def adapterExceptionSignalHandler[T]: Case[T, Signal] =
_adapterExceptionSignalHandler.asInstanceOf[Case[T, Signal]]
/** INTERNAL API */
@InternalApi
private val _defaultSignalHandlers = adapterExceptionSignalHandler :: Nil
/** INTERNAL API */
@InternalApi
private def defaultSignalHandlers[T] = _defaultSignalHandlers.asInstanceOf[List[Case[T, Signal]]]
}
/**

View file

@ -4,6 +4,7 @@
package akka.actor.typed.scaladsl
import akka.actor.typed.MessageAdaptionFailure
import akka.actor.typed.{ Behavior, ExtensibleBehavior, Signal, TypedActorContext }
/**
@ -85,6 +86,9 @@ abstract class AbstractBehavior[T](protected val context: ActorContext[T]) exten
@throws(classOf[Exception])
override final def receiveSignal(ctx: TypedActorContext[T], msg: Signal): Behavior[T] = {
checkRightContext(ctx)
onSignal.applyOrElse(msg, { case _ => Behaviors.unhandled }: PartialFunction[Signal, Behavior[T]])
onSignal.applyOrElse(msg, {
case MessageAdaptionFailure(ex) => throw ex
case _ => Behaviors.unhandled
}: PartialFunction[Signal, Behavior[T]])
}
}