Adapted message failure supervision bugfix (#29847)

This commit is contained in:
Johan Andrén 2021-01-13 20:58:48 +01:00 committed by GitHub
parent feec7aa9b1
commit ba3af3ea46
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 51 additions and 18 deletions

View file

@ -10,27 +10,26 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
import scala.util.control.NoStackTrace
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import org.scalatest.wordspec.AnyWordSpecLike
import org.slf4j.event.Level
import akka.actor.ActorInitializationException
import akka.actor.Dropped
import akka.actor.testkit.typed._
import akka.actor.testkit.typed.scaladsl._
import akka.actor.testkit.typed.scaladsl.LoggingTestKit
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.SupervisorStrategy.Resume
import akka.actor.typed.scaladsl.AbstractBehavior
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.Behaviors._
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import org.scalatest.wordspec.AnyWordSpecLike
import org.slf4j.event.Level
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
import scala.util.control.NoStackTrace
object SupervisionSpec {
@ -1361,6 +1360,33 @@ class SupervisionSpec extends ScalaTestWithActorTestKit("""
probe.expectMessage("message")
}
"apply the right nested supervision to adapted message failure" in {
val signalProbe = createTestProbe[String]()
val behavior =
Behaviors
.receivePartial[String] {
case (ctx, "adapt-fail") =>
val adapter = ctx.messageAdapter[String](_ => "throw-test-exception")
adapter ! "throw-test-exception"
Behaviors.same
case (_, "throw-test-exception") =>
throw TestException("boom")
}
.receiveSignal {
case (_, signal @ (PreRestart | PostStop)) =>
signalProbe.ref ! signal.toString
Behaviors.same
}
// restart on all exceptions, stop on specific exception subtype
val ref = testKit.spawn(
supervise(supervise(behavior).onFailure[TestException](SupervisorStrategy.stop))
.onFailure[Exception](SupervisorStrategy.restart))
ref ! "adapt-fail"
signalProbe.expectMessage("PostStop")
signalProbe.expectTerminated(ref)
}
}
val allStrategies = Seq(

View file

@ -185,16 +185,23 @@ import akka.util.OptionVal
}
private def withSafelyAdapted[U, V](adapt: () => U)(body: U => V): Unit = {
try {
val a = adapt()
if (a != null) body(a)
else
ctx.log.warn(
"Adapter function returned null which is not valid as an actor message, ignoring. This can happen for example when using pipeToSelf and returning null from the adapt function. Null value is ignored and not passed on to actor.")
var failed = false
val adapted: U = try {
adapt()
} catch {
case NonFatal(ex) =>
// pass it on through the signal handler chain giving supervision a chance to deal with it
handleSignal(MessageAdaptionFailure(ex))
// Signal handler should actually throw so this is mostly to keep compiler happy (although a user could override
// the MessageAdaptionFailure handling to do something weird)
failed = true
null.asInstanceOf[U]
}
if (!failed) {
if (adapted != null) body(adapted)
else
ctx.log.warn(
"Adapter function returned null which is not valid as an actor message, ignoring. This can happen for example when using pipeToSelf and returning null from the adapt function. Null value is ignored and not passed on to actor.")
}
}