Merge pull request #24475 from jrudolph/jr/24456-adapter-exception-handling

#24456 Handle adaption exceptions explicitly
This commit is contained in:
Patrik Nordwall 2018-02-01 19:42:34 +01:00 committed by GitHub
commit 90b8774759
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 68 additions and 11 deletions

View file

@ -8,12 +8,16 @@ import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.Behaviors._
import akka.actor.typed.scaladsl.adapter._
import akka.testkit.EventFilter
import akka.testkit.typed.TestKit
import akka.testkit.typed.scaladsl.TestProbe
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.duration._
import scala.concurrent.{ ExecutionContext, TimeoutException }
import scala.util.Success
object AskSpec {
sealed trait Msg
@ -21,7 +25,9 @@ object AskSpec {
final case class Stop(replyTo: ActorRef[Unit]) extends Msg
}
class AskSpec extends TestKit("AskSpec") with TypedAkkaSpec with ScalaFutures {
class AskSpec
extends TestKit("AskSpec", ConfigFactory.parseString("akka.loggers = [ akka.testkit.TestEventListener ]"))
with TypedAkkaSpec with ScalaFutures {
import AskSpec._
@ -41,6 +47,8 @@ class AskSpec extends TestKit("AskSpec") with TypedAkkaSpec with ScalaFutures {
"must fail the future if the actor is already terminated" in {
val ref = spawn(behavior)
(ref ? Stop).futureValue
val probe = TestProbe()
probe.expectTerminated(ref, probe.remainingOrDefault)
val answer = ref ? Foo("bar")
val result = answer.failed.futureValue
result shouldBe a[TimeoutException]
@ -104,5 +112,46 @@ class AskSpec extends TestKit("AskSpec") with TypedAkkaSpec with ScalaFutures {
akka.testkit.TestKit.shutdownActorSystem(untypedSystem)
}
}
"fail asking actor if responder function throws" in {
case class Question(reply: ActorRef[Long])
val probe = TestProbe[AnyRef]("probe")
val behv =
Behaviors.immutable[String] {
case (ctx, "start-ask")
ctx.ask[Question, Long](probe.ref)(Question(_)) {
case Success(42L)
throw new RuntimeException("Unsupported number")
case _ "test"
}
Behavior.same
case (ctx, "test")
probe.ref ! "got-test"
Behavior.same
case (ctx, "get-state")
probe.ref ! "running"
Behavior.same
}
val ref = spawn(behv)
ref ! "test"
probe.expectMessage("got-test")
ref ! "start-ask"
val Question(replyRef) = probe.expectMessageType[Question]
replyRef ! 0L
probe.expectMessage("got-test")
ref ! "start-ask"
val Question(replyRef2) = probe.expectMessageType[Question]
EventFilter[RuntimeException](message = "Exception thrown out of adapter. Stopping myself.", occurrences = 1).intercept {
replyRef2 ! 42L
}(system.toUntyped)
probe.expectTerminated(ref, probe.remainingOrDefault)
}
}
}

View file

@ -13,7 +13,6 @@ import scala.util.Try
* Message wrapper used to allow ActorContext.ask to map the response inside the asking actor.
*/
@InternalApi
private[akka] final class AskResponse[U, T](result: Try[U], adapt: Try[U] T) {
def adapted: T = adapt(result)
private[akka] final class AskResponse[U, T](result: Try[U], adapter: Try[U] T) {
def adapt(): T = adapter(result)
}

View file

@ -16,8 +16,8 @@ import akka.annotation.InternalApi
* function. Used by `ActorContext.spawnMessageAdapter` so that the function is
* applied in the "parent" actor (for better thread safetey)..
*/
@InternalApi private[akka] final case class AdaptMessage[U, T](msg: U, adapt: U T) {
def adapted: T = adapt(msg)
@InternalApi private[akka] final case class AdaptMessage[U, T](msg: U, adapter: U T) {
def adapt(): T = adapter(msg)
}
// FIXME move AskResponse in other PR

View file

@ -10,10 +10,12 @@ import akka.{ actor ⇒ a }
import akka.annotation.InternalApi
import akka.util.OptionVal
import scala.util.control.NonFatal
/**
* INTERNAL API
*/
@InternalApi private[typed] class ActorAdapter[T](_initialBehavior: Behavior[T]) extends a.Actor {
@InternalApi private[typed] class ActorAdapter[T](_initialBehavior: Behavior[T]) extends a.Actor with a.ActorLogging {
import Behavior._
import ActorRefAdapter.toUntyped
@ -40,9 +42,9 @@ import akka.util.OptionVal
case a.ReceiveTimeout
next(Behavior.interpretMessage(behavior, ctx, ctx.receiveTimeoutMsg), ctx.receiveTimeoutMsg)
case wrapped: AskResponse[Any, T] @unchecked
handleMessage(wrapped.adapted)
withSafelyAdapted(() wrapped.adapt())(handleMessage)
case wrapped: AdaptMessage[Any, T] @unchecked
wrapped.adapted match {
withSafelyAdapted(() wrapped.adapt()) {
case AdaptWithRegisteredMessageAdapter(msg)
adaptAndHandle(msg)
case msg: T @unchecked
@ -89,8 +91,7 @@ import akka.util.OptionVal
unhandled(msg)
case (clazz, f) :: tail
if (clazz.isAssignableFrom(msg.getClass)) {
val adaptedMsg = f(msg)
handleMessage(adaptedMsg)
withSafelyAdapted(() f(msg))(handleMessage)
} else
handle(tail) // recursive
}
@ -98,6 +99,14 @@ import akka.util.OptionVal
handle(ctx.messageAdapters)
}
private def withSafelyAdapted[U, V](adapt: () U)(body: U V): Unit =
try body(adapt())
catch {
case NonFatal(ex)
log.error(ex, "Exception thrown out of adapter. Stopping myself.")
context.stop(self)
}
override def unhandled(msg: Any): Unit = msg match {
case Terminated(ref) throw a.DeathPactException(toUntyped(ref))
case msg: Signal // that's ok