message adapter catching too much (#25110)

This commit is contained in:
Patrik Nordwall 2018-05-22 11:14:53 +02:00 committed by Johan Andrén
parent 00f07c2429
commit 8f40dc7f03
2 changed files with 94 additions and 48 deletions

View file

@ -6,8 +6,10 @@ package akka.actor.typed.scaladsl
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.PostStop
import akka.actor.typed.Props
import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.TestException
import akka.actor.typed.TypedAkkaSpecWithShutdown
import akka.testkit.EventFilter
@ -51,7 +53,7 @@ class MessageAdapterSpec extends ActorTestKit with TypedAkkaSpecWithShutdown {
val probe = TestProbe[AnotherPong]()
val snitch = Behaviors.setup[AnotherPong] { (ctx)
val snitch = Behaviors.setup[AnotherPong] { ctx
val replyTo = ctx.messageAdapter[Response](_
AnotherPong(ctx.self.path.name, Thread.currentThread().getName))
@ -62,10 +64,9 @@ class MessageAdapterSpec extends ActorTestKit with TypedAkkaSpecWithShutdown {
AnotherPong(ctx.self.path.name, Thread.currentThread().getName))
pingPong ! Ping(replyTo2)
Behaviors.receive {
case (_, anotherPong: AnotherPong)
probe.ref ! anotherPong
Behaviors.same
Behaviors.receiveMessage { anotherPong
probe.ref ! anotherPong
Behaviors.same
}
}
@ -91,20 +92,18 @@ class MessageAdapterSpec extends ActorTestKit with TypedAkkaSpecWithShutdown {
case class Wrapped(qualifier: String, response: Response)
val pingPong = spawn(Behaviors.receive[Ping] { (_, msg)
msg match {
case Ping1(sender)
sender ! Pong1("hello-1")
Behaviors.same
case Ping2(sender)
sender ! Pong2("hello-2")
Behaviors.same
}
val pingPong = spawn(Behaviors.receiveMessage[Ping] {
case Ping1(sender)
sender ! Pong1("hello-1")
Behaviors.same
case Ping2(sender)
sender ! Pong2("hello-2")
Behaviors.same
})
val probe = TestProbe[Wrapped]()
val snitch = Behaviors.setup[Wrapped] { (ctx)
val snitch = Behaviors.setup[Wrapped] { ctx
ctx.messageAdapter[Response](pong Wrapped(qualifier = "wrong", pong)) // this is replaced
val replyTo1: ActorRef[Response] = ctx.messageAdapter(pong Wrapped(qualifier = "1", pong))
@ -112,10 +111,9 @@ class MessageAdapterSpec extends ActorTestKit with TypedAkkaSpecWithShutdown {
pingPong ! Ping1(replyTo1)
pingPong ! Ping2(replyTo2)
Behaviors.receive {
case (_, wrapped)
probe.ref ! wrapped
Behaviors.same
Behaviors.receiveMessage { wrapped
probe.ref ! wrapped
Behaviors.same
}
}
@ -135,21 +133,19 @@ class MessageAdapterSpec extends ActorTestKit with TypedAkkaSpecWithShutdown {
case class Wrapped(qualifier: String, response: Response)
val pingPong = spawn(Behaviors.receive[Ping] { (_, msg)
msg match {
case Ping1(sender)
sender ! Pong1("hello-1")
Behaviors.same
case Ping2(sender)
// doing something terribly wrong
sender ! Pong2("hello-2")
Behaviors.same
}
val pingPong = spawn(Behaviors.receiveMessage[Ping] {
case Ping1(sender)
sender ! Pong1("hello-1")
Behaviors.same
case Ping2(sender)
// doing something terribly wrong
sender ! Pong2("hello-2")
Behaviors.same
})
val probe = TestProbe[Wrapped]()
val snitch = Behaviors.setup[Wrapped] { (ctx)
val snitch = Behaviors.setup[Wrapped] { ctx
val replyTo1 = ctx.messageAdapter[Pong1](pong Wrapped(qualifier = "1", pong))
pingPong ! Ping1(replyTo1)
@ -158,10 +154,9 @@ class MessageAdapterSpec extends ActorTestKit with TypedAkkaSpecWithShutdown {
pingPong ! Ping2(replyTo1.asInstanceOf[ActorRef[Pong2]])
pingPong ! Ping1(replyTo1)
Behaviors.receive {
case (_, wrapped)
probe.ref ! wrapped
Behaviors.same
Behaviors.receiveMessage { wrapped
probe.ref ! wrapped
Behaviors.same
}
}
@ -179,14 +174,14 @@ class MessageAdapterSpec extends ActorTestKit with TypedAkkaSpecWithShutdown {
case class Pong(greeting: String)
case class Wrapped(count: Int, response: Pong)
val pingPong = spawn(Behaviors.receive[Ping] { (_, ping)
val pingPong = spawn(Behaviors.receiveMessage[Ping] { ping
ping.sender ! Pong("hello")
Behaviors.same
})
val probe = TestProbe[Any]()
val snitch = Behaviors.setup[Wrapped] { (ctx)
val snitch = Behaviors.setup[Wrapped] { ctx
var count = 0
val replyTo = ctx.messageAdapter[Pong] { pong
@ -198,10 +193,9 @@ class MessageAdapterSpec extends ActorTestKit with TypedAkkaSpecWithShutdown {
pingPong ! Ping(replyTo)
}
Behaviors.receive[Wrapped] {
case (_, wrapped)
probe.ref ! wrapped
Behaviors.same
Behaviors.receiveMessage[Wrapped] { wrapped
probe.ref ! wrapped
Behaviors.same
}.receiveSignal {
case (_, PostStop)
probe.ref ! "stopped"
@ -219,9 +213,56 @@ class MessageAdapterSpec extends ActorTestKit with TypedAkkaSpecWithShutdown {
probe.expectMessage(Wrapped(2, Pong("hello")))
// exception was thrown for 3
// FIXME One thing to be aware of is that the supervision strategy of the Behavior is not
// used for exceptions from adapters. Should we instead catch, log, unhandled, and resume?
// It's kind of "before" the message arrives.
probe.expectMessage("stopped")
}
"not catch exception thrown after adapter, when processing the message" in {
case class Ping(sender: ActorRef[Pong])
case class Pong(greeting: String)
case class Wrapped(response: Pong)
val pingPong = spawn(Behaviors.receiveMessage[Ping] { ping
ping.sender ! Pong("hello")
Behaviors.same
})
val probe = TestProbe[Any]()
val snitch = Behaviors.setup[Wrapped] { ctx
val replyTo = ctx.messageAdapter[Pong] { pong
Wrapped(pong)
}
(1 to 5).foreach { _
pingPong ! Ping(replyTo)
}
def behv(count: Int): Behavior[Wrapped] = Behaviors.receiveMessage[Wrapped] { wrapped
probe.ref ! count
if (count == 3) {
throw new TestException("boom")
}
behv(count + 1)
}.receiveSignal {
case (_, PostStop)
probe.ref ! "stopped"
Behaviors.same
}
behv(count = 1)
}
EventFilter.warning(pattern = ".*received dead letter.*", occurrences = 2).intercept {
// Not expecting "Exception thrown out of adapter. Stopping myself"
EventFilter[TestException](message = "boom", occurrences = 1).intercept {
spawn(snitch)
}
}
probe.expectMessage(1)
probe.expectMessage(2)
probe.expectMessage(3)
// exception was thrown for 3
probe.expectMessage("stopped")
}

View file

@ -7,10 +7,13 @@ package internal
package adapter
import scala.annotation.tailrec
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import akka.{ actor a }
import akka.annotation.InternalApi
import akka.util.OptionVal
import scala.util.control.NonFatal
/**
@ -103,13 +106,15 @@ import scala.util.control.NonFatal
handle(ctx.messageAdapters)
}
private def withSafelyAdapted[U, V](adapt: () U)(body: U V): Unit =
try body(adapt())
catch {
case NonFatal(ex)
private def withSafelyAdapted[U, V](adapt: () U)(body: U V): Unit = {
Try(adapt()) match {
case Success(a)
body(a)
case Failure(ex)
log.error(ex, "Exception thrown out of adapter. Stopping myself.")
context.stop(self)
}
}
override def unhandled(msg: Any): Unit = msg match {
case t @ Terminated(ref) throw DeathPactException(ref)