ClassTag in BehaviorInterceptor, #25887 (#27148)

* Always be explicit about what message types an interceptor can handle, to avoid
  ClassCastException if another message type is passing. That may happen when
  the inner behavior understands other messages than it says in it's declared
  behavior type by using narrow. EventSourcedBehaviorImpl is an example.
* Minimized failing tests
* Supervision interceptor is of type Any since failures of all messages must be
  handled
* Changed PoisonPillInterceptor to only intercept signals
* rename type params to Outer and Inner
* separate BehaviorSignalInterceptor
  * which only intercepts signals and messages bypass, e.g. PoisonPillInterceptor
  * also made aroundSignal optional to override in BehaviorInterceptor
* Add test for interceptors combined with EventSourcedBehavior
* ClassTag not needed for LogMessagesInterceptor
  * since it can handle Any
* test supervision of different message type
* clarify low level
* docs for interceptMessageClass param and ClassTag
* remove O type parameter in supervision
* remove extra setup for RestartSupervisor, already factory
* mention in migration guide
This commit is contained in:
Patrik Nordwall 2019-07-02 17:49:48 +02:00 committed by GitHub
parent a36ec1260d
commit 64fa2979ea
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 558 additions and 224 deletions

View file

@ -167,7 +167,7 @@ public class AsyncTestingExampleTest
});
TestProbe<Message> probe = testKit.createTestProbe();
ActorRef<Message> mockedPublisher =
testKit.spawn(Behaviors.monitor(probe.ref(), mockedBehavior));
testKit.spawn(Behaviors.monitor(Message.class, probe.ref(), mockedBehavior));
// test our component
Producer producer = new Producer(testKit.scheduler(), mockedPublisher);

View file

@ -41,7 +41,7 @@ public class ActorCompile {
Behavior<MyMsg> actor6 =
intercept(
() ->
new BehaviorInterceptor<MyMsg, MyMsg>() {
new BehaviorInterceptor<MyMsg, MyMsg>(MyMsg.class) {
@Override
public Behavior<MyMsg> aroundReceive(
TypedActorContext<MyMsg> context, MyMsg message, ReceiveTarget<MyMsg> target) {
@ -60,9 +60,9 @@ public class ActorCompile {
setup(
context -> {
final ActorRef<MyMsg> self = context.getSelf();
return monitor(self, ignore());
return monitor(MyMsg.class, self, ignore());
});
Behavior<MyMsg> actor9 = widened(actor7, pf -> pf.match(MyMsgA.class, x -> x));
Behavior<MyMsg> actor9 = widened(MyMsgA.class, actor7, pf -> pf.match(MyMsgA.class, x -> x));
Behavior<MyMsg> actor10 =
Behaviors.receive((context, message) -> stopped(() -> {}), (context, signal) -> same());

View file

@ -21,7 +21,7 @@ public class InterceptTest extends JUnitSuite {
public void interceptMessage() {
final TestProbe<String> interceptProbe = testKit.createTestProbe();
BehaviorInterceptor<String, String> interceptor =
new BehaviorInterceptor<String, String>() {
new BehaviorInterceptor<String, String>(String.class) {
@Override
public Behavior<String> aroundReceive(
TypedActorContext<String> ctx, String msg, ReceiveTarget<String> target) {
@ -59,15 +59,10 @@ public class InterceptTest extends JUnitSuite {
static class B implements Message {}
@Test
public void interceptMessagesSelectively() {
public void interceptMessageSubclasses() {
final TestProbe<Message> interceptProbe = testKit.createTestProbe();
BehaviorInterceptor<Message, Message> interceptor =
new BehaviorInterceptor<Message, Message>() {
@Override
public Class<? extends Message> interceptMessageType() {
return B.class;
}
new BehaviorInterceptor<Message, Message>(Message.class) {
@Override
public Behavior<Message> aroundReceive(
@ -96,6 +91,7 @@ public class InterceptTest extends JUnitSuite {
ref.tell(new A());
ref.tell(new B());
interceptProbe.expectMessageClass(A.class);
probe.expectMessageClass(A.class);
interceptProbe.expectMessageClass(B.class);
probe.expectMessageClass(B.class);

View file

@ -73,10 +73,10 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit("""
import ActorSpecMessages._
def decoration[T]: Behavior[T] => Behavior[T]
def decoration[T: ClassTag]: Behavior[T] => Behavior[T]
implicit class BehaviorDecorator[T](behavior: Behavior[T])(implicit ev: ClassTag[T]) {
def decorate: Behavior[T] = decoration[T](behavior)
def decorate: Behavior[T] = decoration[T](ev)(behavior)
}
"An ActorContext" must {
@ -680,33 +680,31 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit("""
class NormalActorContextSpec extends ActorContextSpec {
override def decoration[T] = x => x
override def decoration[T: ClassTag] = x => x
}
class WidenActorContextSpec extends ActorContextSpec {
override def decoration[T] = b => b.widen { case x => x }
override def decoration[T: ClassTag] = b => b.widen { case x => x }
}
class DeferredActorContextSpec extends ActorContextSpec {
override def decoration[T] = b => Behaviors.setup(_ => b)
override def decoration[T: ClassTag] = b => Behaviors.setup(_ => b)
}
class NestedDeferredActorContextSpec extends ActorContextSpec {
override def decoration[T] = b => Behaviors.setup(_ => Behaviors.setup(_ => b))
override def decoration[T: ClassTag] = b => Behaviors.setup(_ => Behaviors.setup(_ => b))
}
class InterceptActorContextSpec extends ActorContextSpec {
import BehaviorInterceptor._
def tap[T] = new BehaviorInterceptor[T, T] {
def tap[T: ClassTag] = new BehaviorInterceptor[T, T] {
override def aroundReceive(context: TypedActorContext[T], message: T, target: ReceiveTarget[T]): Behavior[T] =
target(context, message)
override def aroundSignal(context: TypedActorContext[T], signal: Signal, target: SignalTarget[T]): Behavior[T] =
target(context, signal)
}
override def decoration[T]: Behavior[T] => Behavior[T] = b => Behaviors.intercept[T, T](() => tap)(b)
override def decoration[T: ClassTag]: Behavior[T] => Behavior[T] = b => Behaviors.intercept[T, T](() => tap)(b)
}

View file

@ -589,7 +589,7 @@ class ImmutableJavaBehaviorSpec extends Messages with Become with Stoppable {
class WidenedJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec with Reuse with Siphon {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = TestInbox[Command]("widenedListener")
JBehaviors.widened(super.behavior(monitor)._1, pf(_.`match`(classOf[Command], fi(x => {
JBehaviors.widened(classOf[Command], super.behavior(monitor)._1, pf(_.`match`(classOf[Command], fi(x => {
inbox.ref ! x
x
})))) -> inbox

View file

@ -10,10 +10,12 @@ import akka.testkit.EventFilter
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.scaladsl.Behaviors
import org.scalatest.WordSpecLike
import scala.concurrent.duration._
import akka.actor.ActorInitializationException
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.internal.PoisonPill
import akka.actor.typed.internal.PoisonPillInterceptor
object InterceptSpec {
final case class Msg(hello: String, replyTo: ActorRef[String])
@ -28,16 +30,48 @@ object InterceptSpec {
target(context, message)
}
override def aroundSignal(
context: TypedActorContext[String],
signal: Signal,
target: SignalTarget[String]): Behavior[String] = {
target(context, signal)
}
override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean =
other.isInstanceOf[SameTypeInterceptor]
}
// This is similar to how EventSourcedBehavior is implemented
object MultiProtocol {
final case class ExternalResponse(s: String)
final case class Command(s: String)
sealed trait InternalProtocol
object InternalProtocol {
final case class WrappedCommand(c: Command) extends InternalProtocol
final case class WrappedExternalResponse(r: ExternalResponse) extends InternalProtocol
}
private class ProtocolTransformer extends BehaviorInterceptor[Any, InternalProtocol] {
override def aroundReceive(
ctx: TypedActorContext[Any],
msg: Any,
target: BehaviorInterceptor.ReceiveTarget[InternalProtocol]): Behavior[InternalProtocol] = {
val wrapped = msg match {
case c: Command => InternalProtocol.WrappedCommand(c)
case r: ExternalResponse => InternalProtocol.WrappedExternalResponse(r)
}
target(ctx, wrapped)
}
}
def apply(probe: ActorRef[String]): Behavior[Command] = {
Behaviors
.intercept(() => new ProtocolTransformer)(Behaviors.receiveMessage[InternalProtocol] {
case InternalProtocol.WrappedCommand(cmd) =>
probe ! cmd.s
Behaviors.same
case InternalProtocol.WrappedExternalResponse(rsp) =>
probe ! rsp.s
Behaviors.same
})
.narrow
}
}
}
class InterceptSpec extends ScalaTestWithActorTestKit("""
@ -60,14 +94,6 @@ class InterceptSpec extends ScalaTestWithActorTestKit("""
probe ! ("after " + message)
b
}
override def aroundSignal(
context: TypedActorContext[String],
signal: Signal,
target: SignalTarget[String]): Behavior[String] = {
target(context, signal)
}
// keeping the instance equality as "isSame" for these
}
@ -187,12 +213,6 @@ class InterceptSpec extends ScalaTestWithActorTestKit("""
message: String,
target: ReceiveTarget[String]): Behavior[String] =
target(context, message)
def aroundSignal(
context: TypedActorContext[String],
signal: Signal,
target: SignalTarget[String]): Behavior[String] =
target(context, signal)
}
val innerBehaviorStarted = new AtomicBoolean(false)
@ -276,7 +296,30 @@ class InterceptSpec extends ScalaTestWithActorTestKit("""
}
"be useful for implementing PoisonPill" in {
"be useful for implementing signal based PoisonPill" in {
def inner(count: Int): Behavior[Msg] = Behaviors.receiveMessage {
case Msg(hello, replyTo) =>
replyTo ! s"$hello-$count"
inner(count + 1)
}
val decorated: Behavior[Msg] =
Behaviors.intercept(() => new PoisonPillInterceptor[Msg])(inner(0))
val ref = spawn(decorated)
val probe = TestProbe[String]()
ref ! Msg("hello", probe.ref)
probe.expectMessage("hello-0")
ref ! Msg("hello", probe.ref)
probe.expectMessage("hello-1")
ref.unsafeUpcast[Any] ! PoisonPill
probe.expectTerminated(ref, probe.remainingOrDefault)
}
"be useful for implementing custom message based PoisonPill" in {
def inner(count: Int): Behavior[Msg] = Behaviors.receiveMessage {
case Msg(hello, replyTo) =>
@ -295,12 +338,6 @@ class InterceptSpec extends ScalaTestWithActorTestKit("""
case _ => Behaviors.unhandled
}
override def aroundSignal(
context: TypedActorContext[Any],
signal: Signal,
target: SignalTarget[Msg]): Behavior[Msg] =
target.apply(context, signal)
}
val decorated: Behavior[Msg] =
@ -318,34 +355,28 @@ class InterceptSpec extends ScalaTestWithActorTestKit("""
probe.expectTerminated(ref, probe.remainingOrDefault)
}
"be able to intercept a subset of the messages" in {
"be able to intercept message subclasses" in {
trait Message
class A extends Message
class B extends Message
val interceptProbe = TestProbe[Message]()
val partialInterceptor: BehaviorInterceptor[Message, Message] = new BehaviorInterceptor[Message, Message] {
val interceptor: BehaviorInterceptor[Message, Message] =
new BehaviorInterceptor[Message, Message] {
override def interceptMessageType = classOf[B]
override def aroundReceive(
ctx: TypedActorContext[Message],
msg: Message,
target: ReceiveTarget[Message]): Behavior[Message] = {
interceptProbe.ref ! msg
target(ctx, msg)
}
override def aroundReceive(
ctx: TypedActorContext[Message],
msg: Message,
target: ReceiveTarget[Message]): Behavior[Message] = {
interceptProbe.ref ! msg
target(ctx, msg)
}
override def aroundSignal(
ctx: TypedActorContext[Message],
signal: Signal,
target: SignalTarget[Message]): Behavior[Message] =
target(ctx, signal)
}
val probe = TestProbe[Message]()
val ref = spawn(Behaviors.intercept(() => partialInterceptor)(Behaviors.receiveMessage { msg =>
val ref = spawn(Behaviors.intercept(() => interceptor)(Behaviors.receiveMessage { msg =>
probe.ref ! msg
Behaviors.same
}))
@ -353,6 +384,7 @@ class InterceptSpec extends ScalaTestWithActorTestKit("""
ref ! new A
ref ! new B
interceptProbe.expectMessageType[A]
probe.expectMessageType[A]
interceptProbe.expectMessageType[B]
probe.expectMessageType[B]
@ -360,14 +392,8 @@ class InterceptSpec extends ScalaTestWithActorTestKit("""
"intercept PostStop" in {
val probe = TestProbe[String]()
val postStopInterceptor = new BehaviorInterceptor[String, String] {
def aroundReceive(
ctx: TypedActorContext[String],
msg: String,
target: ReceiveTarget[String]): Behavior[String] = {
target(ctx, msg)
}
def aroundSignal(
val postStopInterceptor = new BehaviorSignalInterceptor[String] {
override def aroundSignal(
ctx: TypedActorContext[String],
signal: Signal,
target: SignalTarget[String]): Behavior[String] = {
@ -420,4 +446,71 @@ class InterceptSpec extends ScalaTestWithActorTestKit("""
}
}
"Protocol transformer interceptor" must {
import MultiProtocol._
"be possible to combine with another interceptor" in {
val probe = createTestProbe[String]()
val toUpper = new BehaviorInterceptor[Command, Command] {
override def aroundReceive(
ctx: TypedActorContext[Command],
msg: Command,
target: BehaviorInterceptor.ReceiveTarget[Command]): Behavior[Command] = {
target(ctx, Command(msg.s.toUpperCase()))
}
}
val ref = spawn(Behaviors.intercept(() => toUpper)(MultiProtocol(probe.ref)))
ref ! Command("a")
probe.expectMessage("A")
ref.unsafeUpcast ! ExternalResponse("b")
probe.expectMessage("b") // bypass toUpper interceptor
}
"be possible to combine with widen" in {
val probe = createTestProbe[String]()
val ref = spawn(MultiProtocol(probe.ref).widen[String] {
case s => Command(s.toUpperCase())
})
ref ! "a"
probe.expectMessage("A")
ref.unsafeUpcast ! ExternalResponse("b")
probe.expectMessage("b") // bypass widen interceptor
}
"be possible to combine with MDC" in {
val probe = createTestProbe[String]()
val ref = spawn(Behaviors.setup[Command] { _ =>
Behaviors.withMdc(staticMdc = Map("x" -> "y"), mdcForMessage = (msg: Command) => {
probe.ref ! s"mdc:${msg.s.toUpperCase()}"
Map("msg" -> msg.s.toUpperCase())
}) {
MultiProtocol(probe.ref)
}
})
ref ! Command("a")
probe.expectMessage("mdc:A")
probe.expectMessage("a")
ref.unsafeUpcast ! ExternalResponse("b")
probe.expectMessage("b") // bypass mdc interceptor
}
"be possible to combine with PoisonPillInterceptor" in {
val probe = createTestProbe[String]()
val ref =
spawn(Behaviors.intercept(() => new PoisonPillInterceptor[MultiProtocol.Command])(MultiProtocol(probe.ref)))
ref ! Command("a")
probe.expectMessage("a")
ref.unsafeUpcast ! PoisonPill
probe.expectTerminated(ref, probe.remainingOrDefault)
}
}
}

View file

@ -106,5 +106,15 @@ class LogMessagesSpec extends ScalaTestWithActorTestKit("""
}
}
"log messages of different type" in {
val behavior: Behavior[String] = Behaviors.logMessages(Behaviors.ignore[String])
val ref = spawn(behavior)
EventFilter.debug("received message 13", source = ref.path.toString, occurrences = 1).intercept {
ref.unsafeUpcast[Any] ! 13
}
}
}
}

View file

@ -1084,12 +1084,6 @@ class SupervisionSpec extends ScalaTestWithActorTestKit("""
message: String,
target: ReceiveTarget[String]): Behavior[String] =
target(context, message)
override def aroundSignal(
context: TypedActorContext[String],
signal: Signal,
target: SignalTarget[String]): Behavior[String] =
target(context, signal)
}
val behv = supervise[String](Behaviors.receiveMessage {
@ -1235,6 +1229,36 @@ class SupervisionSpec extends ScalaTestWithActorTestKit("""
}
}
"handle exceptions from different message type" in {
val probe = TestProbe[Event]("evt")
val inner: Behavior[Command] = Behaviors
.receiveMessage[Any] {
case Ping(n) =>
probe.ref ! Pong(n)
Behaviors.same
case _ => throw new Exc1
}
.receiveSignal {
case (_, PreRestart) =>
probe.ref ! ReceivedSignal(PreRestart)
Behaviors.same
}
.narrow
val behv = Behaviors.supervise(inner).onFailure[Exc1](SupervisorStrategy.restart)
val ref = spawn(behv)
ref ! Ping(1)
probe.expectMessage(Pong(1))
EventFilter[Exc1](occurrences = 1).intercept {
ref.unsafeUpcast ! "boom"
probe.expectMessage(ReceivedSignal(PreRestart))
}
ref ! Ping(2)
probe.expectMessage(Pong(2))
}
}
val allStrategies = Seq(

View file

@ -74,13 +74,6 @@ class StopSpec extends ScalaTestWithActorTestKit with WordSpecLike {
target: ReceiveTarget[AnyRef]): Behavior[AnyRef] = {
target(context, message)
}
override def aroundSignal(
context: typed.TypedActorContext[AnyRef],
signal: Signal,
target: SignalTarget[AnyRef]): Behavior[AnyRef] = {
target(context, signal)
}
})(Behaviors.stopped { () =>
probe.ref ! Done
})

View file

@ -6,6 +6,7 @@ package akka.actor.typed
import scala.annotation.switch
import scala.annotation.tailrec
import scala.reflect.ClassTag
import akka.actor.InvalidMessageException
import akka.actor.typed.internal.BehaviorImpl
@ -117,7 +118,7 @@ abstract class ExtensibleBehavior[T] extends Behavior[T](BehaviorTags.Extensible
object Behavior {
final implicit class BehaviorDecorators[T](val behavior: Behavior[T]) extends AnyVal {
final implicit class BehaviorDecorators[Inner](val behavior: Behavior[Inner]) extends AnyVal {
/**
* Widen the wrapped Behavior by placing a funnel in front of it: the supplied
@ -134,8 +135,12 @@ object Behavior {
* }
* }}}
*
* The `ClassTag` for `Outer` ensures that only messages of this class or a subclass thereof will be
* intercepted. Other message types (e.g. a private protocol) will bypass
* the interceptor and be continue to the inner behavior untouched.
*
*/
def widen[U](matcher: PartialFunction[U, T]): Behavior[U] =
def widen[Outer: ClassTag](matcher: PartialFunction[Outer, Inner]): Behavior[Outer] =
BehaviorImpl.widened(behavior, matcher)
}

View file

@ -4,32 +4,46 @@
package akka.actor.typed
import scala.reflect.ClassTag
import akka.annotation.{ DoNotInherit, InternalApi }
import akka.util.BoxedType
/**
* A behavior interceptor allows for intercepting message and signal reception and perform arbitrary logic -
* transform, filter, send to a side channel etc. It is the core API for decoration of behaviors. Many built-in
* intercepting behaviors are provided through factories in the respective `Behaviors`.
* transform, filter, send to a side channel etc. It is the core API for decoration of behaviors.
*
* If the interceptor does keep mutable state care must be taken to create the instance in a `setup` block
* so that a new instance is created per spawned actor rather than shared among actor instance.
* The `BehaviorInterceptor` API is considered a low level tool for building other features and
* shouldn't be used for "normal" application logic. Several built-in intercepting behaviors
* are provided through factories in the respective `Behaviors`.
*
* @tparam O The outer message type the type of messages the intercepting behavior will accept
* @tparam I The inner message type - the type of message the wrapped behavior accepts
* If the interceptor does keep mutable state care must be taken to create a new instance from
* the factory function of `Behaviors.intercept` so that a new instance is created per spawned
* actor rather than shared among actor instance.
*
* @param interceptMessageClass Ensures that the interceptor will only receive `O` message types.
* If the message is not of this class or a subclass thereof
* (e.g. a private protocol) will bypass the interceptor and be
* continue to the inner behavior untouched.
*
* @tparam Outer The outer message type the type of messages the intercepting behavior will accept
* @tparam Inner The inner message type - the type of message the wrapped behavior accepts
*
* @see [[BehaviorSignalInterceptor]]
*/
abstract class BehaviorInterceptor[O, I] {
abstract class BehaviorInterceptor[Outer, Inner](val interceptMessageClass: Class[Outer]) {
import BehaviorInterceptor._
/**
* Allows for applying the interceptor only to certain message types. Useful if the official protocol and the actual
* protocol of an actor causes problems, for example class cast exceptions for a message not of type `O` that
* the actor still knows how to deal with. Note that this is only possible to use when `O` and `I` are the same type.
*
* @return A subtype of `O` that should be intercepted or `null` to intercept all `O`s.
* Subtypes of `O` matching this are passed directly to the inner behavior without interception.
* Scala API: The `ClassTag` for `Outer` ensures that only messages of this class or a subclass
* thereof will be intercepted. Other message types (e.g. a private protocol) will bypass the
* interceptor and be continue to the inner behavior untouched.
*/
// null for all to avoid having to deal with class tag/explicit class in the default case of no filter
def interceptMessageType: Class[_ <: O] = null
def this()(implicit interceptMessageClassTag: ClassTag[Outer]) =
this({
val runtimeClass = interceptMessageClassTag.runtimeClass
(if (runtimeClass eq null) runtimeClass else BoxedType(runtimeClass)).asInstanceOf[Class[Outer]]
})
/**
* Override to intercept actor startup. To trigger startup of
@ -37,7 +51,7 @@ abstract class BehaviorInterceptor[O, I] {
* @return The returned behavior will be the "started" behavior of the actor used to accept
* the next message or signal.
*/
def aroundStart(ctx: TypedActorContext[O], target: PreStartTarget[I]): Behavior[I] =
def aroundStart(ctx: TypedActorContext[Outer], target: PreStartTarget[Inner]): Behavior[Inner] =
target.start(ctx)
/**
@ -47,15 +61,18 @@ abstract class BehaviorInterceptor[O, I] {
*
* @return The behavior for next message or signal
*/
def aroundReceive(ctx: TypedActorContext[O], msg: O, target: ReceiveTarget[I]): Behavior[I]
def aroundReceive(ctx: TypedActorContext[Outer], msg: Outer, target: ReceiveTarget[Inner]): Behavior[Inner]
/**
* Intercept a signal sent to the running actor. Pass the signal on to the next behavior
* Override to intercept a signal sent to the running actor. Pass the signal on to the next behavior
* in the stack by passing it to `target.apply`.
*
* @return The behavior for next message or signal
*
* @see [[BehaviorSignalInterceptor]]
*/
def aroundSignal(ctx: TypedActorContext[O], signal: Signal, target: SignalTarget[I]): Behavior[I]
def aroundSignal(ctx: TypedActorContext[Outer], signal: Signal, target: SignalTarget[Inner]): Behavior[Inner] =
target(ctx, signal)
/**
* @return `true` if this behavior logically the same as another behavior interceptor and can therefore be eliminated
@ -110,3 +127,43 @@ object BehaviorInterceptor {
}
}
/**
* A behavior interceptor allows for intercepting signals reception and perform arbitrary logic -
* transform, filter, send to a side channel etc.
*
* The `BehaviorSignalInterceptor` API is considered a low level tool for building other features and
* shouldn't be used for "normal" application logic. Several built-in intercepting behaviors
* are provided through factories in the respective `Behaviors`.
*
* If the interceptor does keep mutable state care must be taken to create a new instance from
* the factory function of `Behaviors.intercept` so that a new instance is created per spawned
* actor rather than shared among actor instance.
*
* @tparam Inner The inner message type - the type of message the wrapped behavior accepts
*
* @see [[BehaviorInterceptor]]
*/
abstract class BehaviorSignalInterceptor[Inner] extends BehaviorInterceptor[Inner, Inner](null) {
import BehaviorInterceptor._
/**
* Only signals and not messages are intercepted by `BehaviorSignalInterceptor`.
*/
final override def aroundReceive(
ctx: TypedActorContext[Inner],
msg: Inner,
target: ReceiveTarget[Inner]): Behavior[Inner] = {
// by using `null` as interceptMessageClass of `BehaviorInterceptor` no messages will pass here
throw new IllegalStateException(s"Unexpected message in ${getClass.getName}, it should only intercept signals.")
}
/**
* Intercept a signal sent to the running actor. Pass the signal on to the next behavior
* in the stack by passing it to `target.apply`.
*
* @return The behavior for next message or signal
*/
override def aroundSignal(ctx: TypedActorContext[Inner], signal: Signal, target: SignalTarget[Inner]): Behavior[Inner]
}

View file

@ -5,6 +5,8 @@
package akka.actor.typed
package internal
import scala.reflect.ClassTag
import akka.util.LineNumbers
import akka.annotation.InternalApi
import akka.actor.typed.{ TypedActorContext => AC }
@ -40,7 +42,7 @@ private[akka] object BehaviorTags {
def as[U]: AC[U] = ctx.asInstanceOf[AC[U]]
}
def widened[O, I](behavior: Behavior[I], matcher: PartialFunction[O, I]): Behavior[O] =
def widened[O: ClassTag, I](behavior: Behavior[I], matcher: PartialFunction[O, I]): Behavior[O] =
intercept(() => WidenedInterceptor(matcher))(behavior)
def same[T]: Behavior[T] = SameBehavior.unsafeCast[T]

View file

@ -4,6 +4,8 @@
package akka.actor.typed.internal
import scala.reflect.ClassTag
import akka.actor.typed
import akka.actor.typed.scaladsl.Behaviors
@ -74,9 +76,10 @@ private[akka] final class InterceptorImpl[O, I](
new InterceptorImpl(interceptor, newNested)
override def receive(ctx: typed.TypedActorContext[O], msg: O): Behavior[O] = {
val interceptMessageType = interceptor.interceptMessageType
// TODO performance optimization could maybe to avoid isAssignableFrom if interceptMessageClass is Class[Object]?
val interceptMessageClass = interceptor.interceptMessageClass
val result =
if (interceptMessageType == null || interceptMessageType.isAssignableFrom(msg.getClass))
if ((interceptMessageClass ne null) && interceptor.interceptMessageClass.isAssignableFrom(msg.getClass))
interceptor.aroundReceive(ctx, msg, receiveTarget)
else
receiveTarget.apply(ctx, msg.asInstanceOf[I])
@ -115,7 +118,8 @@ private[akka] final class InterceptorImpl[O, I](
* INTERNAL API
*/
@InternalApi
private[akka] final case class MonitorInterceptor[T](actorRef: ActorRef[T]) extends BehaviorInterceptor[T, T] {
private[akka] final case class MonitorInterceptor[T: ClassTag](actorRef: ActorRef[T])
extends BehaviorInterceptor[T, T] {
import BehaviorInterceptor._
override def aroundReceive(ctx: TypedActorContext[T], msg: T, target: ReceiveTarget[T]): Behavior[T] = {
@ -123,10 +127,6 @@ private[akka] final case class MonitorInterceptor[T](actorRef: ActorRef[T]) exte
target(ctx, msg)
}
override def aroundSignal(ctx: TypedActorContext[T], signal: Signal, target: SignalTarget[T]): Behavior[T] = {
target(ctx, signal)
}
// only once to the same actor in the same behavior stack
override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = other match {
case MonitorInterceptor(`actorRef`) => true
@ -135,23 +135,32 @@ private[akka] final case class MonitorInterceptor[T](actorRef: ActorRef[T]) exte
}
/**
* INTERNAL API
*/
@InternalApi private[akka] object LogMessagesInterceptor {
def apply[T](opts: LogOptions): BehaviorInterceptor[T, T] = {
new LogMessagesInterceptor(opts).asInstanceOf[BehaviorInterceptor[T, T]]
}
}
/**
* Log all messages for this decorated ReceiveTarget[T] to logger before receiving it ourselves.
*
* INTERNAL API
*/
@InternalApi
private[akka] final case class LogMessagesInterceptor[T](opts: LogOptions) extends BehaviorInterceptor[T, T] {
private[akka] final class LogMessagesInterceptor(val opts: LogOptions) extends BehaviorInterceptor[Any, Any] {
import BehaviorInterceptor._
override def aroundReceive(ctx: TypedActorContext[T], msg: T, target: ReceiveTarget[T]): Behavior[T] = {
override def aroundReceive(ctx: TypedActorContext[Any], msg: Any, target: ReceiveTarget[Any]): Behavior[Any] = {
if (opts.enabled)
opts.logger.getOrElse(ctx.asScala.log).log(opts.level, "received message {}", msg)
target(ctx, msg)
}
override def aroundSignal(ctx: TypedActorContext[T], signal: Signal, target: SignalTarget[T]): Behavior[T] = {
override def aroundSignal(ctx: TypedActorContext[Any], signal: Signal, target: SignalTarget[Any]): Behavior[Any] = {
if (opts.enabled)
opts.logger.getOrElse(ctx.asScala.log).log(opts.level, "received signal {}", signal)
target(ctx, signal)
@ -159,8 +168,8 @@ private[akka] final case class LogMessagesInterceptor[T](opts: LogOptions) exten
// only once in the same behavior stack
override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = other match {
case LogMessagesInterceptor(`opts`) => true
case _ => false
case a: LogMessagesInterceptor => a.opts == opts
case _ => false
}
}
@ -178,7 +187,7 @@ private[akka] object WidenedInterceptor {
* INTERNAL API
*/
@InternalApi
private[akka] final case class WidenedInterceptor[O, I](matcher: PartialFunction[O, I])
private[akka] final case class WidenedInterceptor[O: ClassTag, I](matcher: PartialFunction[O, I])
extends BehaviorInterceptor[O, I] {
import BehaviorInterceptor._
import WidenedInterceptor._
@ -202,8 +211,5 @@ private[akka] final case class WidenedInterceptor[O, I](matcher: PartialFunction
}
}
def aroundSignal(ctx: TypedActorContext[O], signal: Signal, target: SignalTarget[I]): Behavior[I] =
target(ctx, signal)
override def toString: String = s"Widen(${LineNumbers(matcher)})"
}

View file

@ -7,6 +7,7 @@ package akka.actor.typed.internal
import akka.actor.typed.TypedActorContext
import akka.actor.typed.Behavior
import akka.actor.typed.BehaviorInterceptor
import akka.actor.typed.BehaviorSignalInterceptor
import akka.actor.typed.Signal
import akka.annotation.InternalApi
@ -33,12 +34,7 @@ import akka.annotation.InternalApi
* application protocol. Persistent actors handle `PoisonPill` and run side effects after persist
* and process stashed messages before stopping.
*/
@InternalApi private[akka] final class PoisonPillInterceptor[M] extends BehaviorInterceptor[M, M] {
override def aroundReceive(
ctx: TypedActorContext[M],
msg: M,
target: BehaviorInterceptor.ReceiveTarget[M]): Behavior[M] =
target(ctx, msg)
@InternalApi private[akka] final class PoisonPillInterceptor[M] extends BehaviorSignalInterceptor[M] {
override def aroundSignal(
ctx: TypedActorContext[M],

View file

@ -33,11 +33,13 @@ import akka.util.unused
strategy match {
case r: RestartOrBackoff =>
Behaviors.intercept[T, T](() => new RestartSupervisor(initialBehavior, r))(initialBehavior)
Behaviors.intercept[Any, T](() => new RestartSupervisor(initialBehavior, r))(initialBehavior).narrow
case r: Resume =>
Behaviors.intercept[T, T](() => new ResumeSupervisor(r))(initialBehavior)
// stateless so safe to share
Behaviors.intercept[Any, T](() => new ResumeSupervisor(r))(initialBehavior).narrow
case r: Stop =>
Behaviors.intercept[T, T](() => new StopSupervisor(initialBehavior, r))(initialBehavior)
// stateless so safe to share
Behaviors.intercept[Any, T](() => new StopSupervisor(initialBehavior, r))(initialBehavior).narrow
}
}
}
@ -46,9 +48,8 @@ import akka.util.unused
* INTERNAL API
*/
@InternalApi
private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: SupervisorStrategy)(
implicit ev: ClassTag[Thr])
extends BehaviorInterceptor[O, I] {
private abstract class AbstractSupervisor[I, Thr <: Throwable](strategy: SupervisorStrategy)(implicit ev: ClassTag[Thr])
extends BehaviorInterceptor[Any, I] {
private val throwableClass = implicitly[ClassTag[Thr]].runtimeClass
@ -57,18 +58,18 @@ private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: Supe
override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = {
other match {
case as: AbstractSupervisor[_, _, Thr] if throwableClass == as.throwableClass => true
case _ => false
case as: AbstractSupervisor[_, Thr] if throwableClass == as.throwableClass => true
case _ => false
}
}
override def aroundStart(ctx: TypedActorContext[O], target: PreStartTarget[I]): Behavior[I] = {
override def aroundStart(ctx: TypedActorContext[Any], target: PreStartTarget[I]): Behavior[I] = {
try {
target.start(ctx)
} catch handleExceptionOnStart(ctx, target)
}
def aroundSignal(ctx: TypedActorContext[O], signal: Signal, target: SignalTarget[I]): Behavior[I] = {
override def aroundSignal(ctx: TypedActorContext[Any], signal: Signal, target: SignalTarget[I]): Behavior[I] = {
try {
target(ctx, signal)
} catch handleSignalException(ctx, target)
@ -92,9 +93,9 @@ private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: Supe
ctx.asScala.system.toUntyped.eventStream.publish(Dropped(signalOrMessage, ctx.asScala.self))
}
protected def handleExceptionOnStart(ctx: TypedActorContext[O], target: PreStartTarget[I]): Catcher[Behavior[I]]
protected def handleSignalException(ctx: TypedActorContext[O], target: SignalTarget[I]): Catcher[Behavior[I]]
protected def handleReceiveException(ctx: TypedActorContext[O], target: ReceiveTarget[I]): Catcher[Behavior[I]]
protected def handleExceptionOnStart(ctx: TypedActorContext[Any], target: PreStartTarget[I]): Catcher[Behavior[I]]
protected def handleSignalException(ctx: TypedActorContext[Any], target: SignalTarget[I]): Catcher[Behavior[I]]
protected def handleReceiveException(ctx: TypedActorContext[Any], target: ReceiveTarget[I]): Catcher[Behavior[I]]
override def toString: String = Logging.simpleName(getClass)
}
@ -103,32 +104,32 @@ private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: Supe
* For cases where O == I for BehaviorInterceptor.
*/
private abstract class SimpleSupervisor[T, Thr <: Throwable: ClassTag](ss: SupervisorStrategy)
extends AbstractSupervisor[T, T, Thr](ss) {
extends AbstractSupervisor[T, Thr](ss) {
override def aroundReceive(ctx: TypedActorContext[T], msg: T, target: ReceiveTarget[T]): Behavior[T] = {
override def aroundReceive(ctx: TypedActorContext[Any], msg: Any, target: ReceiveTarget[T]): Behavior[T] = {
try {
target(ctx, msg)
target(ctx, msg.asInstanceOf[T])
} catch handleReceiveException(ctx, target)
}
protected def handleException(@unused ctx: TypedActorContext[T]): Catcher[Behavior[T]] = {
protected def handleException(@unused ctx: TypedActorContext[Any]): Catcher[Behavior[T]] = {
case NonFatal(t) if isInstanceOfTheThrowableClass(t) =>
BehaviorImpl.failed(t)
}
// convenience if target not required to handle exception
protected def handleExceptionOnStart(ctx: TypedActorContext[T], target: PreStartTarget[T]): Catcher[Behavior[T]] =
protected def handleExceptionOnStart(ctx: TypedActorContext[Any], target: PreStartTarget[T]): Catcher[Behavior[T]] =
handleException(ctx)
protected def handleSignalException(ctx: TypedActorContext[T], target: SignalTarget[T]): Catcher[Behavior[T]] =
protected def handleSignalException(ctx: TypedActorContext[Any], target: SignalTarget[T]): Catcher[Behavior[T]] =
handleException(ctx)
protected def handleReceiveException(ctx: TypedActorContext[T], target: ReceiveTarget[T]): Catcher[Behavior[T]] =
protected def handleReceiveException(ctx: TypedActorContext[Any], target: ReceiveTarget[T]): Catcher[Behavior[T]] =
handleException(ctx)
}
private class StopSupervisor[T, Thr <: Throwable: ClassTag](@unused initial: Behavior[T], strategy: Stop)
extends SimpleSupervisor[T, Thr](strategy) {
override def handleException(ctx: TypedActorContext[T]): Catcher[Behavior[T]] = {
override def handleException(ctx: TypedActorContext[Any]): Catcher[Behavior[T]] = {
case NonFatal(t) if isInstanceOfTheThrowableClass(t) =>
log(ctx, t)
BehaviorImpl.failed(t)
@ -136,7 +137,7 @@ private class StopSupervisor[T, Thr <: Throwable: ClassTag](@unused initial: Beh
}
private class ResumeSupervisor[T, Thr <: Throwable: ClassTag](ss: Resume) extends SimpleSupervisor[T, Thr](ss) {
override protected def handleException(ctx: TypedActorContext[T]): Catcher[Behavior[T]] = {
override protected def handleException(ctx: TypedActorContext[Any]): Catcher[Behavior[T]] = {
case NonFatal(t) if isInstanceOfTheThrowableClass(t) =>
log(ctx, t)
t match {
@ -166,13 +167,13 @@ private object RestartSupervisor {
}
}
final case class ScheduledRestart(owner: RestartSupervisor[_, _, _ <: Throwable]) extends DeadLetterSuppression
final case class ResetRestartCount(current: Int, owner: RestartSupervisor[_, _, _ <: Throwable])
final case class ScheduledRestart(owner: RestartSupervisor[_, _ <: Throwable]) extends DeadLetterSuppression
final case class ResetRestartCount(current: Int, owner: RestartSupervisor[_, _ <: Throwable])
extends DeadLetterSuppression
}
private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behavior[T], strategy: RestartOrBackoff)
extends AbstractSupervisor[O, T, Thr](strategy) {
private class RestartSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], strategy: RestartOrBackoff)
extends AbstractSupervisor[T, Thr](strategy) {
import RestartSupervisor._
private var restartingInProgress: OptionVal[(StashBuffer[Any], Set[ActorRef[Nothing]])] = OptionVal.None
@ -185,7 +186,7 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav
case OptionVal.Some(d) => d.hasTimeLeft
}
override def aroundSignal(ctx: TypedActorContext[O], signal: Signal, target: SignalTarget[T]): Behavior[T] = {
override def aroundSignal(ctx: TypedActorContext[Any], signal: Signal, target: SignalTarget[T]): Behavior[T] = {
restartingInProgress match {
case OptionVal.None =>
super.aroundSignal(ctx, signal, target)
@ -210,8 +211,8 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav
}
}
override def aroundReceive(ctx: TypedActorContext[O], msg: O, target: ReceiveTarget[T]): Behavior[T] = {
msg.asInstanceOf[Any] match {
override def aroundReceive(ctx: TypedActorContext[Any], msg: Any, target: ReceiveTarget[T]): Behavior[T] = {
msg match {
case ScheduledRestart(owner) =>
if (owner eq this) {
restartingInProgress match {
@ -259,7 +260,7 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav
}
override protected def handleExceptionOnStart(
ctx: TypedActorContext[O],
ctx: TypedActorContext[Any],
@unused target: PreStartTarget[T]): Catcher[Behavior[T]] = {
case NonFatal(t) if isInstanceOfTheThrowableClass(t) =>
ctx.asScala.cancelAllTimers()
@ -278,23 +279,23 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav
}
override protected def handleSignalException(
ctx: TypedActorContext[O],
ctx: TypedActorContext[Any],
target: SignalTarget[T]): Catcher[Behavior[T]] = {
handleException(ctx, signalRestart = {
case e: UnstashException[O] @unchecked => Behavior.interpretSignal(e.behavior, ctx, PreRestart)
case _ => target(ctx, PreRestart)
case e: UnstashException[Any] @unchecked => Behavior.interpretSignal(e.behavior, ctx, PreRestart)
case _ => target(ctx, PreRestart)
})
}
override protected def handleReceiveException(
ctx: TypedActorContext[O],
ctx: TypedActorContext[Any],
target: ReceiveTarget[T]): Catcher[Behavior[T]] = {
handleException(ctx, signalRestart = {
case e: UnstashException[O] @unchecked => Behavior.interpretSignal(e.behavior, ctx, PreRestart)
case _ => target.signalRestart(ctx)
case e: UnstashException[Any] @unchecked => Behavior.interpretSignal(e.behavior, ctx, PreRestart)
case _ => target.signalRestart(ctx)
})
}
private def handleException(ctx: TypedActorContext[O], signalRestart: Throwable => Unit): Catcher[Behavior[T]] = {
private def handleException(ctx: TypedActorContext[Any], signalRestart: Throwable => Unit): Catcher[Behavior[T]] = {
case NonFatal(t) if isInstanceOfTheThrowableClass(t) =>
ctx.asScala.cancelAllTimers()
if (strategy.maxRestarts != -1 && restartCount >= strategy.maxRestarts && deadlineHasTimeLeft) {
@ -315,7 +316,7 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav
}
}
private def prepareRestart(ctx: TypedActorContext[O], reason: Throwable): Behavior[T] = {
private def prepareRestart(ctx: TypedActorContext[Any], reason: Throwable): Behavior[T] = {
log(ctx, reason)
val currentRestartCount = restartCount
@ -334,7 +335,7 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav
val restartDelay =
calculateDelay(currentRestartCount, backoff.minBackoff, backoff.maxBackoff, backoff.randomFactor)
gotScheduledRestart = false
ctx.asScala.scheduleOnce(restartDelay, ctx.asScala.self.unsafeUpcast[Any], ScheduledRestart(this))
ctx.asScala.scheduleOnce(restartDelay, ctx.asScala.self, ScheduledRestart(this))
Behaviors.empty
case _: Restart =>
if (childrenToStop.isEmpty)
@ -344,17 +345,14 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav
}
}
private def restartCompleted(ctx: TypedActorContext[O]): Behavior[T] = {
private def restartCompleted(ctx: TypedActorContext[Any]): Behavior[T] = {
// probably already done, but doesn't hurt to make sure they are canceled
ctx.asScala.cancelAllTimers()
strategy match {
case backoff: Backoff =>
gotScheduledRestart = false
ctx.asScala.scheduleOnce(
backoff.resetBackoffAfter,
ctx.asScala.self.unsafeUpcast[Any],
ResetRestartCount(restartCount, this))
ctx.asScala.scheduleOnce(backoff.resetBackoffAfter, ctx.asScala.self, ResetRestartCount(restartCount, this))
case _: Restart =>
}
@ -364,12 +362,12 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav
case OptionVal.None => newBehavior
case OptionVal.Some((stashBuffer, _)) =>
restartingInProgress = OptionVal.None
stashBuffer.unstashAll(ctx.asScala.asInstanceOf[scaladsl.ActorContext[Any]], newBehavior.unsafeCast)
stashBuffer.unstashAll(ctx.asScala, newBehavior.unsafeCast)
}
nextBehavior.narrow
} catch handleException(ctx, signalRestart = {
case e: UnstashException[O] @unchecked => Behavior.interpretSignal(e.behavior, ctx, PreRestart)
case _ => ()
case e: UnstashException[Any] @unchecked => Behavior.interpretSignal(e.behavior, ctx, PreRestart)
case _ => ()
})
}

View file

@ -7,8 +7,8 @@ package akka.actor.typed.internal
import akka.actor.typed.internal.adapter.AbstractLogger
import akka.actor.typed.{ Behavior, BehaviorInterceptor, Signal, TypedActorContext }
import akka.annotation.InternalApi
import scala.collection.immutable.HashMap
import scala.reflect.ClassTag
/**
* INTERNAL API
@ -16,7 +16,7 @@ import scala.collection.immutable.HashMap
@InternalApi private[akka] object WithMdcBehaviorInterceptor {
val noMdcPerMessage = (_: Any) => Map.empty[String, Any]
def apply[T](
def apply[T: ClassTag](
staticMdc: Map[String, Any],
mdcForMessage: T => Map[String, Any],
behavior: Behavior[T]): Behavior[T] = {
@ -31,7 +31,7 @@ import scala.collection.immutable.HashMap
*
* INTERNAL API
*/
@InternalApi private[akka] final class WithMdcBehaviorInterceptor[T] private (
@InternalApi private[akka] final class WithMdcBehaviorInterceptor[T: ClassTag] private (
staticMdc: Map[String, Any],
mdcForMessage: T => Map[String, Any])
extends BehaviorInterceptor[T, T] {

View file

@ -9,7 +9,6 @@ import akka.actor.typed.BehaviorInterceptor
import akka.actor.typed.Signal
import akka.actor.typed.TypedActorContext
import akka.actor.typed.scaladsl.AbstractBehavior
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.StashBuffer
import akka.annotation.InternalApi
@ -34,21 +33,17 @@ private[akka] final class GuardianStartupBehavior[T](val guardianBehavior: Behav
import GuardianStartupBehavior.Start
private val stash = StashBuffer[T](1000)
private val stash = StashBuffer[Any](1000)
override def onMessage(msg: Any): Behavior[Any] =
msg match {
case Start =>
// ctx is not available initially so we cannot use it until here
Behaviors.setup(
ctx =>
stash
.unstashAll(
ctx.asInstanceOf[ActorContext[T]],
Behaviors.intercept(() => new GuardianStopInterceptor[T])(guardianBehavior))
.unsafeCast[Any])
Behaviors.setup(ctx =>
stash
.unstashAll(ctx, Behaviors.intercept(() => new GuardianStopInterceptor)(guardianBehavior.unsafeCast[Any])))
case other =>
stash.stash(other.asInstanceOf[T])
stash.stash(other)
this
}
@ -61,24 +56,24 @@ private[akka] final class GuardianStartupBehavior[T](val guardianBehavior: Behav
* as part of that we must intercept when the guardian is stopped and call ActorSystem.terminate()
* explicitly.
*/
@InternalApi private[akka] final class GuardianStopInterceptor[T] extends BehaviorInterceptor[T, T] {
@InternalApi private[akka] final class GuardianStopInterceptor extends BehaviorInterceptor[Any, Any] {
override def aroundReceive(
ctx: TypedActorContext[T],
msg: T,
target: BehaviorInterceptor.ReceiveTarget[T]): Behavior[T] = {
ctx: TypedActorContext[Any],
msg: Any,
target: BehaviorInterceptor.ReceiveTarget[Any]): Behavior[Any] = {
val next = target(ctx, msg)
interceptStopped(ctx, next)
}
override def aroundSignal(
ctx: TypedActorContext[T],
ctx: TypedActorContext[Any],
signal: Signal,
target: BehaviorInterceptor.SignalTarget[T]): Behavior[T] = {
target: BehaviorInterceptor.SignalTarget[Any]): Behavior[Any] = {
val next = target(ctx, signal)
interceptStopped(ctx, next)
}
private def interceptStopped(ctx: TypedActorContext[T], next: Behavior[T]): Behavior[T] = {
private def interceptStopped(ctx: TypedActorContext[Any], next: Behavior[Any]): Behavior[Any] = {
if (Behavior.isAlive(next))
next
else {

View file

@ -182,9 +182,15 @@ object Behaviors {
* monitor [[akka.actor.typed.ActorRef]] before invoking the wrapped behavior. The
* wrapped behavior can evolve (i.e. return different behavior) without needing to be
* wrapped in a `monitor` call again.
*
* @param interceptMessageClass Ensures that the messages of this class or a subclass thereof will be
* sent to the `monitor`. Other message types (e.g. a private protocol)
* will bypass the interceptor and be continue to the inner behavior.
* @param monitor The messages will also be sent to this `ActorRef`
* @param behavior The inner behavior that is decorated
*/
def monitor[T](monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] =
scaladsl.Behaviors.monitor(monitor, behavior)
def monitor[T](interceptMessageClass: Class[T], monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] =
scaladsl.Behaviors.monitor(monitor, behavior)(ClassTag(interceptMessageClass))
/**
* Behavior decorator that logs all messages to the [[akka.actor.typed.Behavior]] using the provided
@ -269,6 +275,9 @@ object Behaviors {
* }}}
*
*
* @param interceptMessageClass Ensures that only messages of this class or a subclass thereof will be
* intercepted. Other message types (e.g. a private protocol) will bypass
* the interceptor and be continue to the inner behavior untouched.
* @param behavior
* the behavior that will receive the selected messages
* @param selector
@ -276,8 +285,11 @@ object Behaviors {
* transformation
* @return a behavior of the widened type
*/
def widened[T, U](behavior: Behavior[T], selector: JFunction[PFBuilder[U, T], PFBuilder[U, T]]): Behavior[U] =
BehaviorImpl.widened(behavior, selector.apply(new PFBuilder).build())
def widened[Outer, Inner](
interceptMessageClass: Class[Outer],
behavior: Behavior[Outer],
selector: JFunction[PFBuilder[Inner, Outer], PFBuilder[Inner, Outer]]): Behavior[Inner] =
BehaviorImpl.widened(behavior, selector.apply(new PFBuilder).build())(ClassTag(interceptMessageClass))
/**
* Support for scheduled `self` messages in an actor.
@ -292,6 +304,9 @@ object Behaviors {
/**
* Per message MDC (Mapped Diagnostic Context) logging.
*
* @param interceptMessageClass Ensures that only messages of this class or a subclass thereof will be
* intercepted. Other message types (e.g. a private protocol) will bypass
* the interceptor and be continue to the inner behavior untouched.
* @param mdcForMessage Is invoked before each message is handled, allowing to setup MDC, MDC is cleared after
* each message processing by the inner behavior is done.
* @param behavior The actual behavior handling the messages, the MDC is used for the log entries logged through
@ -300,21 +315,28 @@ object Behaviors {
* See also [[akka.actor.typed.Logger.withMdc]]
*/
def withMdc[T](
interceptMessageClass: Class[T],
mdcForMessage: akka.japi.function.Function[T, java.util.Map[String, Any]],
behavior: Behavior[T]): Behavior[T] =
withMdc(Collections.emptyMap[String, Any], mdcForMessage, behavior)
withMdc(interceptMessageClass, Collections.emptyMap[String, Any], mdcForMessage, behavior)
/**
* Static MDC (Mapped Diagnostic Context)
*
* @param interceptMessageClass Ensures that only messages of this class or a subclass thereof will be
* intercepted. Other message types (e.g. a private protocol) will bypass
* the interceptor and be continue to the inner behavior untouched.
* @param staticMdc This MDC is setup in the logging context for every message
* @param behavior The actual behavior handling the messages, the MDC is used for the log entries logged through
* `ActorContext.log`
*
* See also [[akka.actor.typed.Logger.withMdc]]
*/
def withMdc[T](staticMdc: java.util.Map[String, Any], behavior: Behavior[T]): Behavior[T] =
withMdc(staticMdc, null, behavior)
def withMdc[T](
interceptMessageClass: Class[T],
staticMdc: java.util.Map[String, Any],
behavior: Behavior[T]): Behavior[T] =
withMdc(interceptMessageClass, staticMdc, null, behavior)
/**
* Combination of static and per message MDC (Mapped Diagnostic Context).
@ -325,6 +347,9 @@ object Behaviors {
*
* * The `staticMdc` or `mdcForMessage` may be empty.
*
* @param interceptMessageClass Ensures that only messages of this class or a subclass thereof will be
* intercepted. Other message types (e.g. a private protocol) will bypass
* the interceptor and be continue to the inner behavior untouched.
* @param staticMdc A static MDC applied for each message
* @param mdcForMessage Is invoked before each message is handled, allowing to setup MDC, MDC is cleared after
* each message processing by the inner behavior is done.
@ -334,6 +359,7 @@ object Behaviors {
* See also [[akka.actor.typed.Logger.withMdc]]
*/
def withMdc[T](
interceptMessageClass: Class[T],
staticMdc: java.util.Map[String, Any],
mdcForMessage: akka.japi.function.Function[T, java.util.Map[String, Any]],
behavior: Behavior[T]): Behavior[T] = {
@ -349,7 +375,7 @@ object Behaviors {
asScalaMap(mdcForMessage.apply(message))
}
WithMdcBehaviorInterceptor[T](asScalaMap(staticMdc), mdcForMessageFun, behavior)
WithMdcBehaviorInterceptor[T](asScalaMap(staticMdc), mdcForMessageFun, behavior)(ClassTag(interceptMessageClass))
}
}

View file

@ -156,8 +156,15 @@ object Behaviors {
* monitor [[akka.actor.typed.ActorRef]] before invoking the wrapped behavior. The
* wrapped behavior can evolve (i.e. return different behavior) without needing to be
* wrapped in a `monitor` call again.
*
* The `ClassTag` for `T` ensures that the messages of this class or a subclass thereof will be
* sent to the `monitor`. Other message types (e.g. a private protocol) will bypass the interceptor
* and be continue to the inner behavior.
*
* @param monitor The messages will also be sent to this `ActorRef`
* @param behavior The inner behavior that is decorated
*/
def monitor[T](monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] =
def monitor[T: ClassTag](monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] =
BehaviorImpl.intercept(() => new MonitorInterceptor[T](monitor))(behavior)
/**
@ -166,7 +173,7 @@ object Behaviors {
* To include an MDC context then first wrap `logMessages` with `withMDC`.
*/
def logMessages[T](behavior: Behavior[T]): Behavior[T] =
BehaviorImpl.intercept(() => new LogMessagesInterceptor[T](LogOptions()))(behavior)
BehaviorImpl.intercept(() => LogMessagesInterceptor[T](LogOptions()))(behavior)
/**
* Behavior decorator that logs all messages to the [[akka.actor.typed.Behavior]] using the provided
@ -174,7 +181,7 @@ object Behaviors {
* To include an MDC context then first wrap `logMessages` with `withMDC`.
*/
def logMessages[T](logOptions: LogOptions, behavior: Behavior[T]): Behavior[T] =
BehaviorImpl.intercept(() => new LogMessagesInterceptor[T](logOptions))(behavior)
BehaviorImpl.intercept(() => LogMessagesInterceptor[T](logOptions))(behavior)
/**
* Wrap the given behavior with the given [[SupervisorStrategy]] for
@ -227,6 +234,10 @@ object Behaviors {
/**
* Per message MDC (Mapped Diagnostic Context) logging.
*
* The `ClassTag` for `T` ensures that only messages of this class or a subclass thereof will be
* intercepted. Other message types (e.g. a private protocol) will bypass the interceptor and be
* continue to the inner behavior untouched.
*
* @param mdcForMessage Is invoked before each message is handled, allowing to setup MDC, MDC is cleared after
* each message processing by the inner behavior is done.
* @param behavior The actual behavior handling the messages, the MDC is used for the log entries logged through
@ -234,19 +245,23 @@ object Behaviors {
*
* See also [[akka.actor.typed.Logger.withMdc]]
*/
def withMdc[T](mdcForMessage: T => Map[String, Any])(behavior: Behavior[T]): Behavior[T] =
def withMdc[T: ClassTag](mdcForMessage: T => Map[String, Any])(behavior: Behavior[T]): Behavior[T] =
withMdc[T](Map.empty[String, Any], mdcForMessage)(behavior)
/**
* Static MDC (Mapped Diagnostic Context)
*
* The `ClassTag` for `T` ensures that only messages of this class or a subclass thereof will be
* intercepted. Other message types (e.g. a private protocol) will bypass the interceptor and be
* continue to the inner behavior untouched.
*
* @param staticMdc This MDC is setup in the logging context for every message
* @param behavior The actual behavior handling the messages, the MDC is used for the log entries logged through
* `ActorContext.log`
*
* See also [[akka.actor.typed.Logger.withMdc]]
*/
def withMdc[T](staticMdc: Map[String, Any])(behavior: Behavior[T]): Behavior[T] =
def withMdc[T: ClassTag](staticMdc: Map[String, Any])(behavior: Behavior[T]): Behavior[T] =
withMdc[T](staticMdc, (_: T) => Map.empty[String, Any])(behavior)
/**
@ -258,6 +273,10 @@ object Behaviors {
*
* The `staticMdc` or `mdcForMessage` may be empty.
*
* The `ClassTag` for `T` ensures that only messages of this class or a subclass thereof will be
* intercepted. Other message types (e.g. a private protocol) will bypass the interceptor and be
* continue to the inner behavior untouched.
*
* @param staticMdc A static MDC applied for each message
* @param mdcForMessage Is invoked before each message is handled, allowing to setup MDC, MDC is cleared after
* each message processing by the inner behavior is done.
@ -266,7 +285,7 @@ object Behaviors {
*
* See also [[akka.actor.typed.Logger.withMdc]]
*/
def withMdc[T](staticMdc: Map[String, Any], mdcForMessage: T => Map[String, Any])(
def withMdc[T: ClassTag](staticMdc: Map[String, Any], mdcForMessage: T => Map[String, Any])(
behavior: Behavior[T]): Behavior[T] =
WithMdcBehaviorInterceptor[T](staticMdc, mdcForMessage, behavior)

View file

@ -387,7 +387,9 @@ made before finalizing the APIs. Compared to Akka 2.5.x the source incompatible
* `ActorContext` parameter removed in `javadsl.ReceiveBuilder` for the functional style in Java. Use `Behaviors.setup`
to retrieve `ActorContext`, and use an enclosing class to hold initialization parameters and `ActorContext`.
* Java @apidoc[akka.cluster.sharding.typed.javadsl.EntityRef] ask timeout now takes a `java.time.Duration` rather than a @apidoc[Timeout]
* `BehaviorInterceptor`, `Behaviors.monitor`, `Behaviors.withMdc` and @scala[`widen`]@java[`Behaviors.widen`] takes
a @scala[`ClassTag` parameter (probably source compatible)]@java[`interceptMessageClass` parameter].
`interceptMessageType` method in `BehaviorInterceptor` is replaced with this @scala[`ClassTag`]@java[`Class`] parameter.
#### Akka Typed Stream API changes

View file

@ -133,10 +133,10 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
val onStopInterceptor = new BehaviorInterceptor[Any, Any] {
import BehaviorInterceptor._
def aroundReceive(ctx: typed.TypedActorContext[Any], msg: Any, target: ReceiveTarget[Any])
override def aroundReceive(ctx: typed.TypedActorContext[Any], msg: Any, target: ReceiveTarget[Any])
: Behavior[Any] = { target(ctx, msg) }
def aroundSignal(ctx: typed.TypedActorContext[Any], signal: Signal, target: SignalTarget[Any])
override def aroundSignal(ctx: typed.TypedActorContext[Any], signal: Signal, target: SignalTarget[Any])
: Behavior[Any] = {
if (signal == PostStop) {
eventSourcedSetup.cancelRecoveryTimer()

View file

@ -493,12 +493,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
TestProbe<Object> interceptProbe = testKit.createTestProbe();
TestProbe<Signal> signalProbe = testKit.createTestProbe();
BehaviorInterceptor<Command, Command> tap =
new BehaviorInterceptor<Command, Command>() {
@Override
public Class<? extends Command> interceptMessageType() {
return Command.class;
}
new BehaviorInterceptor<Command, Command>(Command.class) {
@Override
public Behavior<Command> aroundReceive(

View file

@ -0,0 +1,119 @@
/*
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.persistence.typed.scaladsl
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.testkit.typed.scaladsl._
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.BehaviorInterceptor
import akka.actor.typed.TypedActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.persistence.typed.PersistenceId
import akka.testkit.EventFilter
import akka.testkit.TestEvent.Mute
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
import org.scalatest.WordSpecLike
object EventSourcedBehaviorInterceptorSpec {
val journalId = "event-sourced-behavior-interceptor-spec"
def config: Config = ConfigFactory.parseString(s"""
akka.loglevel = INFO
akka.loggers = [akka.testkit.TestEventListener]
akka.persistence.journal.plugin = "akka.persistence.journal.inmem"
""")
def testBehavior(persistenceId: PersistenceId, probe: ActorRef[String]): Behavior[String] =
Behaviors.setup { _ =>
EventSourcedBehavior[String, String, String](
persistenceId,
emptyState = "",
commandHandler = (_, command) =>
command match {
case _ =>
Effect.persist(command).thenRun(newState => probe ! newState)
},
eventHandler = (state, evt) => state + evt)
}
}
class EventSourcedBehaviorInterceptorSpec
extends ScalaTestWithActorTestKit(EventSourcedBehaviorTimersSpec.config)
with WordSpecLike {
import EventSourcedBehaviorInterceptorSpec._
val pidCounter = new AtomicInteger(0)
private def nextPid(): PersistenceId = PersistenceId(s"c${pidCounter.incrementAndGet()})")
import akka.actor.typed.scaladsl.adapter._
// needed for the untyped event filter
private implicit val untypedSystem: akka.actor.ActorSystem = system.toUntyped
untypedSystem.eventStream.publish(Mute(EventFilter.warning(start = "No default snapshot store", occurrences = 1)))
"EventSourcedBehavior interceptor" must {
"be possible to combine with another interceptor" in {
val probe = createTestProbe[String]()
val pid = nextPid()
val toUpper = new BehaviorInterceptor[String, String] {
override def aroundReceive(
ctx: TypedActorContext[String],
msg: String,
target: BehaviorInterceptor.ReceiveTarget[String]): Behavior[String] = {
target(ctx, msg.toUpperCase())
}
}
val ref = spawn(Behaviors.intercept(() => toUpper)(testBehavior(pid, probe.ref)))
ref ! "a"
ref ! "bc"
probe.expectMessage("A")
probe.expectMessage("ABC")
}
"be possible to combine with widen" in {
// EventSourcedBehaviorImpl should use a plain BehaviorInterceptor instead of widen
pending // FIXME #25887
val probe = createTestProbe[String]()
val pid = nextPid()
val ref = spawn(testBehavior(pid, probe.ref).widen[String] {
case s => s.toUpperCase()
})
ref ! "a"
ref ! "bc"
probe.expectMessage("A")
probe.expectMessage("ABC")
}
"be possible to combine with MDC" in {
val probe = createTestProbe[String]()
val pid = nextPid()
val ref = spawn(Behaviors.setup[String] { _ =>
Behaviors
.withMdc(staticMdc = Map("pid" -> pid), mdcForMessage = (msg: String) => Map("msg" -> msg.toUpperCase())) {
testBehavior(pid, probe.ref)
}
})
ref ! "a"
ref ! "bc"
probe.expectMessage("a")
probe.expectMessage("abc")
}
}
}