From 64fa2979eae3f8b92fff74ec1aa2eb6822c3a720 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 2 Jul 2019 17:49:48 +0200 Subject: [PATCH] ClassTag in BehaviorInterceptor, #25887 (#27148) * Always be explicit about what message types an interceptor can handle, to avoid ClassCastException if another message type is passing. That may happen when the inner behavior understands other messages than it says in it's declared behavior type by using narrow. EventSourcedBehaviorImpl is an example. * Minimized failing tests * Supervision interceptor is of type Any since failures of all messages must be handled * Changed PoisonPillInterceptor to only intercept signals * rename type params to Outer and Inner * separate BehaviorSignalInterceptor * which only intercepts signals and messages bypass, e.g. PoisonPillInterceptor * also made aroundSignal optional to override in BehaviorInterceptor * Add test for interceptors combined with EventSourcedBehavior * ClassTag not needed for LogMessagesInterceptor * since it can handle Any * test supervision of different message type * clarify low level * docs for interceptMessageClass param and ClassTag * remove O type parameter in supervision * remove extra setup for RestartSupervisor, already factory * mention in migration guide --- .../javadsl/AsyncTestingExampleTest.java | 2 +- .../actor/typed/javadsl/ActorCompile.java | 6 +- .../actor/typed/javadsl/InterceptTest.java | 12 +- .../akka/actor/typed/ActorContextSpec.scala | 18 +- .../scala/akka/actor/typed/BehaviorSpec.scala | 2 +- .../akka/actor/typed/InterceptSpec.scala | 201 +++++++++++++----- .../akka/actor/typed/LogMessagesSpec.scala | 10 + .../akka/actor/typed/SupervisionSpec.scala | 36 +++- .../akka/actor/typed/scaladsl/StopSpec.scala | 7 - .../scala/akka/actor/typed/Behavior.scala | 9 +- .../actor/typed/BehaviorInterceptor.scala | 95 +++++++-- .../actor/typed/internal/BehaviorImpl.scala | 4 +- .../typed/internal/InterceptorImpl.scala | 38 ++-- .../actor/typed/internal/PoisonPill.scala | 8 +- .../actor/typed/internal/Supervision.scala | 92 ++++---- .../internal/WithMdcBehaviorInterceptor.scala | 6 +- .../adapter/GuardianStartupBehavior.scala | 29 ++- .../akka/actor/typed/javadsl/Behaviors.scala | 42 +++- .../akka/actor/typed/scaladsl/Behaviors.scala | 31 ++- .../project/migration-guide-2.5.x-2.6.x.md | 4 +- .../internal/EventSourcedBehaviorImpl.scala | 4 +- .../javadsl/PersistentActorJavaDslTest.java | 7 +- .../EventSourcedBehaviorInterceptorSpec.scala | 119 +++++++++++ 23 files changed, 558 insertions(+), 224 deletions(-) create mode 100644 akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorInterceptorSpec.scala diff --git a/akka-actor-testkit-typed/src/test/java/jdocs/akka/actor/testkit/typed/javadsl/AsyncTestingExampleTest.java b/akka-actor-testkit-typed/src/test/java/jdocs/akka/actor/testkit/typed/javadsl/AsyncTestingExampleTest.java index e5994ef93d..56bebd2712 100644 --- a/akka-actor-testkit-typed/src/test/java/jdocs/akka/actor/testkit/typed/javadsl/AsyncTestingExampleTest.java +++ b/akka-actor-testkit-typed/src/test/java/jdocs/akka/actor/testkit/typed/javadsl/AsyncTestingExampleTest.java @@ -167,7 +167,7 @@ public class AsyncTestingExampleTest }); TestProbe probe = testKit.createTestProbe(); ActorRef mockedPublisher = - testKit.spawn(Behaviors.monitor(probe.ref(), mockedBehavior)); + testKit.spawn(Behaviors.monitor(Message.class, probe.ref(), mockedBehavior)); // test our component Producer producer = new Producer(testKit.scheduler(), mockedPublisher); diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java index be9e98ef83..344695360e 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java @@ -41,7 +41,7 @@ public class ActorCompile { Behavior actor6 = intercept( () -> - new BehaviorInterceptor() { + new BehaviorInterceptor(MyMsg.class) { @Override public Behavior aroundReceive( TypedActorContext context, MyMsg message, ReceiveTarget target) { @@ -60,9 +60,9 @@ public class ActorCompile { setup( context -> { final ActorRef self = context.getSelf(); - return monitor(self, ignore()); + return monitor(MyMsg.class, self, ignore()); }); - Behavior actor9 = widened(actor7, pf -> pf.match(MyMsgA.class, x -> x)); + Behavior actor9 = widened(MyMsgA.class, actor7, pf -> pf.match(MyMsgA.class, x -> x)); Behavior actor10 = Behaviors.receive((context, message) -> stopped(() -> {}), (context, signal) -> same()); diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/InterceptTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/InterceptTest.java index 5a60e48c32..5031871439 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/InterceptTest.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/InterceptTest.java @@ -21,7 +21,7 @@ public class InterceptTest extends JUnitSuite { public void interceptMessage() { final TestProbe interceptProbe = testKit.createTestProbe(); BehaviorInterceptor interceptor = - new BehaviorInterceptor() { + new BehaviorInterceptor(String.class) { @Override public Behavior aroundReceive( TypedActorContext ctx, String msg, ReceiveTarget target) { @@ -59,15 +59,10 @@ public class InterceptTest extends JUnitSuite { static class B implements Message {} @Test - public void interceptMessagesSelectively() { + public void interceptMessageSubclasses() { final TestProbe interceptProbe = testKit.createTestProbe(); BehaviorInterceptor interceptor = - new BehaviorInterceptor() { - - @Override - public Class interceptMessageType() { - return B.class; - } + new BehaviorInterceptor(Message.class) { @Override public Behavior aroundReceive( @@ -96,6 +91,7 @@ public class InterceptTest extends JUnitSuite { ref.tell(new A()); ref.tell(new B()); + interceptProbe.expectMessageClass(A.class); probe.expectMessageClass(A.class); interceptProbe.expectMessageClass(B.class); probe.expectMessageClass(B.class); diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala index 8fdc701048..d58bbc02e1 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala @@ -73,10 +73,10 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit(""" import ActorSpecMessages._ - def decoration[T]: Behavior[T] => Behavior[T] + def decoration[T: ClassTag]: Behavior[T] => Behavior[T] implicit class BehaviorDecorator[T](behavior: Behavior[T])(implicit ev: ClassTag[T]) { - def decorate: Behavior[T] = decoration[T](behavior) + def decorate: Behavior[T] = decoration[T](ev)(behavior) } "An ActorContext" must { @@ -680,33 +680,31 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit(""" class NormalActorContextSpec extends ActorContextSpec { - override def decoration[T] = x => x + override def decoration[T: ClassTag] = x => x } class WidenActorContextSpec extends ActorContextSpec { - override def decoration[T] = b => b.widen { case x => x } + override def decoration[T: ClassTag] = b => b.widen { case x => x } } class DeferredActorContextSpec extends ActorContextSpec { - override def decoration[T] = b => Behaviors.setup(_ => b) + override def decoration[T: ClassTag] = b => Behaviors.setup(_ => b) } class NestedDeferredActorContextSpec extends ActorContextSpec { - override def decoration[T] = b => Behaviors.setup(_ => Behaviors.setup(_ => b)) + override def decoration[T: ClassTag] = b => Behaviors.setup(_ => Behaviors.setup(_ => b)) } class InterceptActorContextSpec extends ActorContextSpec { import BehaviorInterceptor._ - def tap[T] = new BehaviorInterceptor[T, T] { + def tap[T: ClassTag] = new BehaviorInterceptor[T, T] { override def aroundReceive(context: TypedActorContext[T], message: T, target: ReceiveTarget[T]): Behavior[T] = target(context, message) - override def aroundSignal(context: TypedActorContext[T], signal: Signal, target: SignalTarget[T]): Behavior[T] = - target(context, signal) } - override def decoration[T]: Behavior[T] => Behavior[T] = b => Behaviors.intercept[T, T](() => tap)(b) + override def decoration[T: ClassTag]: Behavior[T] => Behavior[T] = b => Behaviors.intercept[T, T](() => tap)(b) } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala index 018f1794af..d083969ac4 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala @@ -589,7 +589,7 @@ class ImmutableJavaBehaviorSpec extends Messages with Become with Stoppable { class WidenedJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec with Reuse with Siphon { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { val inbox = TestInbox[Command]("widenedListener") - JBehaviors.widened(super.behavior(monitor)._1, pf(_.`match`(classOf[Command], fi(x => { + JBehaviors.widened(classOf[Command], super.behavior(monitor)._1, pf(_.`match`(classOf[Command], fi(x => { inbox.ref ! x x })))) -> inbox diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/InterceptSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/InterceptSpec.scala index f99a57f01b..dea99b7b0d 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/InterceptSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/InterceptSpec.scala @@ -10,10 +10,12 @@ import akka.testkit.EventFilter import akka.actor.testkit.typed.scaladsl.TestProbe import akka.actor.typed.scaladsl.Behaviors import org.scalatest.WordSpecLike - import scala.concurrent.duration._ + import akka.actor.ActorInitializationException import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.internal.PoisonPill +import akka.actor.typed.internal.PoisonPillInterceptor object InterceptSpec { final case class Msg(hello: String, replyTo: ActorRef[String]) @@ -28,16 +30,48 @@ object InterceptSpec { target(context, message) } - override def aroundSignal( - context: TypedActorContext[String], - signal: Signal, - target: SignalTarget[String]): Behavior[String] = { - target(context, signal) - } - override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = other.isInstanceOf[SameTypeInterceptor] } + + // This is similar to how EventSourcedBehavior is implemented + object MultiProtocol { + final case class ExternalResponse(s: String) + final case class Command(s: String) + + sealed trait InternalProtocol + object InternalProtocol { + final case class WrappedCommand(c: Command) extends InternalProtocol + final case class WrappedExternalResponse(r: ExternalResponse) extends InternalProtocol + } + + private class ProtocolTransformer extends BehaviorInterceptor[Any, InternalProtocol] { + override def aroundReceive( + ctx: TypedActorContext[Any], + msg: Any, + target: BehaviorInterceptor.ReceiveTarget[InternalProtocol]): Behavior[InternalProtocol] = { + val wrapped = msg match { + case c: Command => InternalProtocol.WrappedCommand(c) + case r: ExternalResponse => InternalProtocol.WrappedExternalResponse(r) + } + target(ctx, wrapped) + } + + } + + def apply(probe: ActorRef[String]): Behavior[Command] = { + Behaviors + .intercept(() => new ProtocolTransformer)(Behaviors.receiveMessage[InternalProtocol] { + case InternalProtocol.WrappedCommand(cmd) => + probe ! cmd.s + Behaviors.same + case InternalProtocol.WrappedExternalResponse(rsp) => + probe ! rsp.s + Behaviors.same + }) + .narrow + } + } } class InterceptSpec extends ScalaTestWithActorTestKit(""" @@ -60,14 +94,6 @@ class InterceptSpec extends ScalaTestWithActorTestKit(""" probe ! ("after " + message) b } - - override def aroundSignal( - context: TypedActorContext[String], - signal: Signal, - target: SignalTarget[String]): Behavior[String] = { - target(context, signal) - } - // keeping the instance equality as "isSame" for these } @@ -187,12 +213,6 @@ class InterceptSpec extends ScalaTestWithActorTestKit(""" message: String, target: ReceiveTarget[String]): Behavior[String] = target(context, message) - - def aroundSignal( - context: TypedActorContext[String], - signal: Signal, - target: SignalTarget[String]): Behavior[String] = - target(context, signal) } val innerBehaviorStarted = new AtomicBoolean(false) @@ -276,7 +296,30 @@ class InterceptSpec extends ScalaTestWithActorTestKit(""" } - "be useful for implementing PoisonPill" in { + "be useful for implementing signal based PoisonPill" in { + + def inner(count: Int): Behavior[Msg] = Behaviors.receiveMessage { + case Msg(hello, replyTo) => + replyTo ! s"$hello-$count" + inner(count + 1) + } + + val decorated: Behavior[Msg] = + Behaviors.intercept(() => new PoisonPillInterceptor[Msg])(inner(0)) + + val ref = spawn(decorated) + val probe = TestProbe[String]() + ref ! Msg("hello", probe.ref) + probe.expectMessage("hello-0") + ref ! Msg("hello", probe.ref) + probe.expectMessage("hello-1") + + ref.unsafeUpcast[Any] ! PoisonPill + + probe.expectTerminated(ref, probe.remainingOrDefault) + } + + "be useful for implementing custom message based PoisonPill" in { def inner(count: Int): Behavior[Msg] = Behaviors.receiveMessage { case Msg(hello, replyTo) => @@ -295,12 +338,6 @@ class InterceptSpec extends ScalaTestWithActorTestKit(""" case _ => Behaviors.unhandled } - override def aroundSignal( - context: TypedActorContext[Any], - signal: Signal, - target: SignalTarget[Msg]): Behavior[Msg] = - target.apply(context, signal) - } val decorated: Behavior[Msg] = @@ -318,34 +355,28 @@ class InterceptSpec extends ScalaTestWithActorTestKit(""" probe.expectTerminated(ref, probe.remainingOrDefault) } - "be able to intercept a subset of the messages" in { + "be able to intercept message subclasses" in { trait Message class A extends Message class B extends Message val interceptProbe = TestProbe[Message]() - val partialInterceptor: BehaviorInterceptor[Message, Message] = new BehaviorInterceptor[Message, Message] { + val interceptor: BehaviorInterceptor[Message, Message] = + new BehaviorInterceptor[Message, Message] { - override def interceptMessageType = classOf[B] + override def aroundReceive( + ctx: TypedActorContext[Message], + msg: Message, + target: ReceiveTarget[Message]): Behavior[Message] = { + interceptProbe.ref ! msg + target(ctx, msg) + } - override def aroundReceive( - ctx: TypedActorContext[Message], - msg: Message, - target: ReceiveTarget[Message]): Behavior[Message] = { - interceptProbe.ref ! msg - target(ctx, msg) } - override def aroundSignal( - ctx: TypedActorContext[Message], - signal: Signal, - target: SignalTarget[Message]): Behavior[Message] = - target(ctx, signal) - } - val probe = TestProbe[Message]() - val ref = spawn(Behaviors.intercept(() => partialInterceptor)(Behaviors.receiveMessage { msg => + val ref = spawn(Behaviors.intercept(() => interceptor)(Behaviors.receiveMessage { msg => probe.ref ! msg Behaviors.same })) @@ -353,6 +384,7 @@ class InterceptSpec extends ScalaTestWithActorTestKit(""" ref ! new A ref ! new B + interceptProbe.expectMessageType[A] probe.expectMessageType[A] interceptProbe.expectMessageType[B] probe.expectMessageType[B] @@ -360,14 +392,8 @@ class InterceptSpec extends ScalaTestWithActorTestKit(""" "intercept PostStop" in { val probe = TestProbe[String]() - val postStopInterceptor = new BehaviorInterceptor[String, String] { - def aroundReceive( - ctx: TypedActorContext[String], - msg: String, - target: ReceiveTarget[String]): Behavior[String] = { - target(ctx, msg) - } - def aroundSignal( + val postStopInterceptor = new BehaviorSignalInterceptor[String] { + override def aroundSignal( ctx: TypedActorContext[String], signal: Signal, target: SignalTarget[String]): Behavior[String] = { @@ -420,4 +446,71 @@ class InterceptSpec extends ScalaTestWithActorTestKit(""" } } + "Protocol transformer interceptor" must { + import MultiProtocol._ + + "be possible to combine with another interceptor" in { + val probe = createTestProbe[String]() + + val toUpper = new BehaviorInterceptor[Command, Command] { + override def aroundReceive( + ctx: TypedActorContext[Command], + msg: Command, + target: BehaviorInterceptor.ReceiveTarget[Command]): Behavior[Command] = { + target(ctx, Command(msg.s.toUpperCase())) + } + + } + + val ref = spawn(Behaviors.intercept(() => toUpper)(MultiProtocol(probe.ref))) + + ref ! Command("a") + probe.expectMessage("A") + ref.unsafeUpcast ! ExternalResponse("b") + probe.expectMessage("b") // bypass toUpper interceptor + } + + "be possible to combine with widen" in { + val probe = createTestProbe[String]() + val ref = spawn(MultiProtocol(probe.ref).widen[String] { + case s => Command(s.toUpperCase()) + }) + + ref ! "a" + probe.expectMessage("A") + ref.unsafeUpcast ! ExternalResponse("b") + probe.expectMessage("b") // bypass widen interceptor + } + + "be possible to combine with MDC" in { + val probe = createTestProbe[String]() + val ref = spawn(Behaviors.setup[Command] { _ => + Behaviors.withMdc(staticMdc = Map("x" -> "y"), mdcForMessage = (msg: Command) => { + probe.ref ! s"mdc:${msg.s.toUpperCase()}" + Map("msg" -> msg.s.toUpperCase()) + }) { + MultiProtocol(probe.ref) + } + }) + + ref ! Command("a") + probe.expectMessage("mdc:A") + probe.expectMessage("a") + ref.unsafeUpcast ! ExternalResponse("b") + probe.expectMessage("b") // bypass mdc interceptor + + } + + "be possible to combine with PoisonPillInterceptor" in { + val probe = createTestProbe[String]() + val ref = + spawn(Behaviors.intercept(() => new PoisonPillInterceptor[MultiProtocol.Command])(MultiProtocol(probe.ref))) + + ref ! Command("a") + probe.expectMessage("a") + ref.unsafeUpcast ! PoisonPill + probe.expectTerminated(ref, probe.remainingOrDefault) + } + } + } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/LogMessagesSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/LogMessagesSpec.scala index 16a1d0e90c..375e71eeb7 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/LogMessagesSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/LogMessagesSpec.scala @@ -106,5 +106,15 @@ class LogMessagesSpec extends ScalaTestWithActorTestKit(""" } } + "log messages of different type" in { + val behavior: Behavior[String] = Behaviors.logMessages(Behaviors.ignore[String]) + + val ref = spawn(behavior) + + EventFilter.debug("received message 13", source = ref.path.toString, occurrences = 1).intercept { + ref.unsafeUpcast[Any] ! 13 + } + } + } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala index 863e1c090e..136cc0078d 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala @@ -1084,12 +1084,6 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(""" message: String, target: ReceiveTarget[String]): Behavior[String] = target(context, message) - - override def aroundSignal( - context: TypedActorContext[String], - signal: Signal, - target: SignalTarget[String]): Behavior[String] = - target(context, signal) } val behv = supervise[String](Behaviors.receiveMessage { @@ -1235,6 +1229,36 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(""" } } + "handle exceptions from different message type" in { + val probe = TestProbe[Event]("evt") + + val inner: Behavior[Command] = Behaviors + .receiveMessage[Any] { + case Ping(n) => + probe.ref ! Pong(n) + Behaviors.same + case _ => throw new Exc1 + } + .receiveSignal { + case (_, PreRestart) => + probe.ref ! ReceivedSignal(PreRestart) + Behaviors.same + } + .narrow + + val behv = Behaviors.supervise(inner).onFailure[Exc1](SupervisorStrategy.restart) + val ref = spawn(behv) + ref ! Ping(1) + probe.expectMessage(Pong(1)) + + EventFilter[Exc1](occurrences = 1).intercept { + ref.unsafeUpcast ! "boom" + probe.expectMessage(ReceivedSignal(PreRestart)) + } + ref ! Ping(2) + probe.expectMessage(Pong(2)) + } + } val allStrategies = Seq( diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StopSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StopSpec.scala index 5013c22db3..e3ef465798 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StopSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StopSpec.scala @@ -74,13 +74,6 @@ class StopSpec extends ScalaTestWithActorTestKit with WordSpecLike { target: ReceiveTarget[AnyRef]): Behavior[AnyRef] = { target(context, message) } - - override def aroundSignal( - context: typed.TypedActorContext[AnyRef], - signal: Signal, - target: SignalTarget[AnyRef]): Behavior[AnyRef] = { - target(context, signal) - } })(Behaviors.stopped { () => probe.ref ! Done }) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala index 276c441fce..e89068c5aa 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala @@ -6,6 +6,7 @@ package akka.actor.typed import scala.annotation.switch import scala.annotation.tailrec +import scala.reflect.ClassTag import akka.actor.InvalidMessageException import akka.actor.typed.internal.BehaviorImpl @@ -117,7 +118,7 @@ abstract class ExtensibleBehavior[T] extends Behavior[T](BehaviorTags.Extensible object Behavior { - final implicit class BehaviorDecorators[T](val behavior: Behavior[T]) extends AnyVal { + final implicit class BehaviorDecorators[Inner](val behavior: Behavior[Inner]) extends AnyVal { /** * Widen the wrapped Behavior by placing a funnel in front of it: the supplied @@ -134,8 +135,12 @@ object Behavior { * } * }}} * + * The `ClassTag` for `Outer` ensures that only messages of this class or a subclass thereof will be + * intercepted. Other message types (e.g. a private protocol) will bypass + * the interceptor and be continue to the inner behavior untouched. + * */ - def widen[U](matcher: PartialFunction[U, T]): Behavior[U] = + def widen[Outer: ClassTag](matcher: PartialFunction[Outer, Inner]): Behavior[Outer] = BehaviorImpl.widened(behavior, matcher) } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala index 6c2defed72..c0ed84aab7 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala @@ -4,32 +4,46 @@ package akka.actor.typed +import scala.reflect.ClassTag + import akka.annotation.{ DoNotInherit, InternalApi } +import akka.util.BoxedType /** * A behavior interceptor allows for intercepting message and signal reception and perform arbitrary logic - - * transform, filter, send to a side channel etc. It is the core API for decoration of behaviors. Many built-in - * intercepting behaviors are provided through factories in the respective `Behaviors`. + * transform, filter, send to a side channel etc. It is the core API for decoration of behaviors. * - * If the interceptor does keep mutable state care must be taken to create the instance in a `setup` block - * so that a new instance is created per spawned actor rather than shared among actor instance. + * The `BehaviorInterceptor` API is considered a low level tool for building other features and + * shouldn't be used for "normal" application logic. Several built-in intercepting behaviors + * are provided through factories in the respective `Behaviors`. * - * @tparam O The outer message type – the type of messages the intercepting behavior will accept - * @tparam I The inner message type - the type of message the wrapped behavior accepts + * If the interceptor does keep mutable state care must be taken to create a new instance from + * the factory function of `Behaviors.intercept` so that a new instance is created per spawned + * actor rather than shared among actor instance. + * + * @param interceptMessageClass Ensures that the interceptor will only receive `O` message types. + * If the message is not of this class or a subclass thereof + * (e.g. a private protocol) will bypass the interceptor and be + * continue to the inner behavior untouched. + * + * @tparam Outer The outer message type – the type of messages the intercepting behavior will accept + * @tparam Inner The inner message type - the type of message the wrapped behavior accepts + * + * @see [[BehaviorSignalInterceptor]] */ -abstract class BehaviorInterceptor[O, I] { +abstract class BehaviorInterceptor[Outer, Inner](val interceptMessageClass: Class[Outer]) { import BehaviorInterceptor._ /** - * Allows for applying the interceptor only to certain message types. Useful if the official protocol and the actual - * protocol of an actor causes problems, for example class cast exceptions for a message not of type `O` that - * the actor still knows how to deal with. Note that this is only possible to use when `O` and `I` are the same type. - * - * @return A subtype of `O` that should be intercepted or `null` to intercept all `O`s. - * Subtypes of `O` matching this are passed directly to the inner behavior without interception. + * Scala API: The `ClassTag` for `Outer` ensures that only messages of this class or a subclass + * thereof will be intercepted. Other message types (e.g. a private protocol) will bypass the + * interceptor and be continue to the inner behavior untouched. */ - // null for all to avoid having to deal with class tag/explicit class in the default case of no filter - def interceptMessageType: Class[_ <: O] = null + def this()(implicit interceptMessageClassTag: ClassTag[Outer]) = + this({ + val runtimeClass = interceptMessageClassTag.runtimeClass + (if (runtimeClass eq null) runtimeClass else BoxedType(runtimeClass)).asInstanceOf[Class[Outer]] + }) /** * Override to intercept actor startup. To trigger startup of @@ -37,7 +51,7 @@ abstract class BehaviorInterceptor[O, I] { * @return The returned behavior will be the "started" behavior of the actor used to accept * the next message or signal. */ - def aroundStart(ctx: TypedActorContext[O], target: PreStartTarget[I]): Behavior[I] = + def aroundStart(ctx: TypedActorContext[Outer], target: PreStartTarget[Inner]): Behavior[Inner] = target.start(ctx) /** @@ -47,15 +61,18 @@ abstract class BehaviorInterceptor[O, I] { * * @return The behavior for next message or signal */ - def aroundReceive(ctx: TypedActorContext[O], msg: O, target: ReceiveTarget[I]): Behavior[I] + def aroundReceive(ctx: TypedActorContext[Outer], msg: Outer, target: ReceiveTarget[Inner]): Behavior[Inner] /** - * Intercept a signal sent to the running actor. Pass the signal on to the next behavior + * Override to intercept a signal sent to the running actor. Pass the signal on to the next behavior * in the stack by passing it to `target.apply`. * * @return The behavior for next message or signal + * + * @see [[BehaviorSignalInterceptor]] */ - def aroundSignal(ctx: TypedActorContext[O], signal: Signal, target: SignalTarget[I]): Behavior[I] + def aroundSignal(ctx: TypedActorContext[Outer], signal: Signal, target: SignalTarget[Inner]): Behavior[Inner] = + target(ctx, signal) /** * @return `true` if this behavior logically the same as another behavior interceptor and can therefore be eliminated @@ -110,3 +127,43 @@ object BehaviorInterceptor { } } + +/** + * A behavior interceptor allows for intercepting signals reception and perform arbitrary logic - + * transform, filter, send to a side channel etc. + * + * The `BehaviorSignalInterceptor` API is considered a low level tool for building other features and + * shouldn't be used for "normal" application logic. Several built-in intercepting behaviors + * are provided through factories in the respective `Behaviors`. + * + * If the interceptor does keep mutable state care must be taken to create a new instance from + * the factory function of `Behaviors.intercept` so that a new instance is created per spawned + * actor rather than shared among actor instance. + * + * @tparam Inner The inner message type - the type of message the wrapped behavior accepts + * + * @see [[BehaviorInterceptor]] + */ +abstract class BehaviorSignalInterceptor[Inner] extends BehaviorInterceptor[Inner, Inner](null) { + import BehaviorInterceptor._ + + /** + * Only signals and not messages are intercepted by `BehaviorSignalInterceptor`. + */ + final override def aroundReceive( + ctx: TypedActorContext[Inner], + msg: Inner, + target: ReceiveTarget[Inner]): Behavior[Inner] = { + // by using `null` as interceptMessageClass of `BehaviorInterceptor` no messages will pass here + throw new IllegalStateException(s"Unexpected message in ${getClass.getName}, it should only intercept signals.") + } + + /** + * Intercept a signal sent to the running actor. Pass the signal on to the next behavior + * in the stack by passing it to `target.apply`. + * + * @return The behavior for next message or signal + */ + override def aroundSignal(ctx: TypedActorContext[Inner], signal: Signal, target: SignalTarget[Inner]): Behavior[Inner] + +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala index ef6ee879ff..8fa0873813 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala @@ -5,6 +5,8 @@ package akka.actor.typed package internal +import scala.reflect.ClassTag + import akka.util.LineNumbers import akka.annotation.InternalApi import akka.actor.typed.{ TypedActorContext => AC } @@ -40,7 +42,7 @@ private[akka] object BehaviorTags { def as[U]: AC[U] = ctx.asInstanceOf[AC[U]] } - def widened[O, I](behavior: Behavior[I], matcher: PartialFunction[O, I]): Behavior[O] = + def widened[O: ClassTag, I](behavior: Behavior[I], matcher: PartialFunction[O, I]): Behavior[O] = intercept(() => WidenedInterceptor(matcher))(behavior) def same[T]: Behavior[T] = SameBehavior.unsafeCast[T] diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala index c0f116fced..8bb0934c97 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala @@ -4,6 +4,8 @@ package akka.actor.typed.internal +import scala.reflect.ClassTag + import akka.actor.typed import akka.actor.typed.scaladsl.Behaviors @@ -74,9 +76,10 @@ private[akka] final class InterceptorImpl[O, I]( new InterceptorImpl(interceptor, newNested) override def receive(ctx: typed.TypedActorContext[O], msg: O): Behavior[O] = { - val interceptMessageType = interceptor.interceptMessageType + // TODO performance optimization could maybe to avoid isAssignableFrom if interceptMessageClass is Class[Object]? + val interceptMessageClass = interceptor.interceptMessageClass val result = - if (interceptMessageType == null || interceptMessageType.isAssignableFrom(msg.getClass)) + if ((interceptMessageClass ne null) && interceptor.interceptMessageClass.isAssignableFrom(msg.getClass)) interceptor.aroundReceive(ctx, msg, receiveTarget) else receiveTarget.apply(ctx, msg.asInstanceOf[I]) @@ -115,7 +118,8 @@ private[akka] final class InterceptorImpl[O, I]( * INTERNAL API */ @InternalApi -private[akka] final case class MonitorInterceptor[T](actorRef: ActorRef[T]) extends BehaviorInterceptor[T, T] { +private[akka] final case class MonitorInterceptor[T: ClassTag](actorRef: ActorRef[T]) + extends BehaviorInterceptor[T, T] { import BehaviorInterceptor._ override def aroundReceive(ctx: TypedActorContext[T], msg: T, target: ReceiveTarget[T]): Behavior[T] = { @@ -123,10 +127,6 @@ private[akka] final case class MonitorInterceptor[T](actorRef: ActorRef[T]) exte target(ctx, msg) } - override def aroundSignal(ctx: TypedActorContext[T], signal: Signal, target: SignalTarget[T]): Behavior[T] = { - target(ctx, signal) - } - // only once to the same actor in the same behavior stack override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = other match { case MonitorInterceptor(`actorRef`) => true @@ -135,23 +135,32 @@ private[akka] final case class MonitorInterceptor[T](actorRef: ActorRef[T]) exte } +/** + * INTERNAL API + */ +@InternalApi private[akka] object LogMessagesInterceptor { + def apply[T](opts: LogOptions): BehaviorInterceptor[T, T] = { + new LogMessagesInterceptor(opts).asInstanceOf[BehaviorInterceptor[T, T]] + } +} + /** * Log all messages for this decorated ReceiveTarget[T] to logger before receiving it ourselves. * * INTERNAL API */ @InternalApi -private[akka] final case class LogMessagesInterceptor[T](opts: LogOptions) extends BehaviorInterceptor[T, T] { +private[akka] final class LogMessagesInterceptor(val opts: LogOptions) extends BehaviorInterceptor[Any, Any] { import BehaviorInterceptor._ - override def aroundReceive(ctx: TypedActorContext[T], msg: T, target: ReceiveTarget[T]): Behavior[T] = { + override def aroundReceive(ctx: TypedActorContext[Any], msg: Any, target: ReceiveTarget[Any]): Behavior[Any] = { if (opts.enabled) opts.logger.getOrElse(ctx.asScala.log).log(opts.level, "received message {}", msg) target(ctx, msg) } - override def aroundSignal(ctx: TypedActorContext[T], signal: Signal, target: SignalTarget[T]): Behavior[T] = { + override def aroundSignal(ctx: TypedActorContext[Any], signal: Signal, target: SignalTarget[Any]): Behavior[Any] = { if (opts.enabled) opts.logger.getOrElse(ctx.asScala.log).log(opts.level, "received signal {}", signal) target(ctx, signal) @@ -159,8 +168,8 @@ private[akka] final case class LogMessagesInterceptor[T](opts: LogOptions) exten // only once in the same behavior stack override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = other match { - case LogMessagesInterceptor(`opts`) => true - case _ => false + case a: LogMessagesInterceptor => a.opts == opts + case _ => false } } @@ -178,7 +187,7 @@ private[akka] object WidenedInterceptor { * INTERNAL API */ @InternalApi -private[akka] final case class WidenedInterceptor[O, I](matcher: PartialFunction[O, I]) +private[akka] final case class WidenedInterceptor[O: ClassTag, I](matcher: PartialFunction[O, I]) extends BehaviorInterceptor[O, I] { import BehaviorInterceptor._ import WidenedInterceptor._ @@ -202,8 +211,5 @@ private[akka] final case class WidenedInterceptor[O, I](matcher: PartialFunction } } - def aroundSignal(ctx: TypedActorContext[O], signal: Signal, target: SignalTarget[I]): Behavior[I] = - target(ctx, signal) - override def toString: String = s"Widen(${LineNumbers(matcher)})" } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/PoisonPill.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/PoisonPill.scala index a613d94dd8..17406e2f9d 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/PoisonPill.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/PoisonPill.scala @@ -7,6 +7,7 @@ package akka.actor.typed.internal import akka.actor.typed.TypedActorContext import akka.actor.typed.Behavior import akka.actor.typed.BehaviorInterceptor +import akka.actor.typed.BehaviorSignalInterceptor import akka.actor.typed.Signal import akka.annotation.InternalApi @@ -33,12 +34,7 @@ import akka.annotation.InternalApi * application protocol. Persistent actors handle `PoisonPill` and run side effects after persist * and process stashed messages before stopping. */ -@InternalApi private[akka] final class PoisonPillInterceptor[M] extends BehaviorInterceptor[M, M] { - override def aroundReceive( - ctx: TypedActorContext[M], - msg: M, - target: BehaviorInterceptor.ReceiveTarget[M]): Behavior[M] = - target(ctx, msg) +@InternalApi private[akka] final class PoisonPillInterceptor[M] extends BehaviorSignalInterceptor[M] { override def aroundSignal( ctx: TypedActorContext[M], diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala index 5ca5bc696f..872c5bee46 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala @@ -33,11 +33,13 @@ import akka.util.unused strategy match { case r: RestartOrBackoff => - Behaviors.intercept[T, T](() => new RestartSupervisor(initialBehavior, r))(initialBehavior) + Behaviors.intercept[Any, T](() => new RestartSupervisor(initialBehavior, r))(initialBehavior).narrow case r: Resume => - Behaviors.intercept[T, T](() => new ResumeSupervisor(r))(initialBehavior) + // stateless so safe to share + Behaviors.intercept[Any, T](() => new ResumeSupervisor(r))(initialBehavior).narrow case r: Stop => - Behaviors.intercept[T, T](() => new StopSupervisor(initialBehavior, r))(initialBehavior) + // stateless so safe to share + Behaviors.intercept[Any, T](() => new StopSupervisor(initialBehavior, r))(initialBehavior).narrow } } } @@ -46,9 +48,8 @@ import akka.util.unused * INTERNAL API */ @InternalApi -private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: SupervisorStrategy)( - implicit ev: ClassTag[Thr]) - extends BehaviorInterceptor[O, I] { +private abstract class AbstractSupervisor[I, Thr <: Throwable](strategy: SupervisorStrategy)(implicit ev: ClassTag[Thr]) + extends BehaviorInterceptor[Any, I] { private val throwableClass = implicitly[ClassTag[Thr]].runtimeClass @@ -57,18 +58,18 @@ private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: Supe override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = { other match { - case as: AbstractSupervisor[_, _, Thr] if throwableClass == as.throwableClass => true - case _ => false + case as: AbstractSupervisor[_, Thr] if throwableClass == as.throwableClass => true + case _ => false } } - override def aroundStart(ctx: TypedActorContext[O], target: PreStartTarget[I]): Behavior[I] = { + override def aroundStart(ctx: TypedActorContext[Any], target: PreStartTarget[I]): Behavior[I] = { try { target.start(ctx) } catch handleExceptionOnStart(ctx, target) } - def aroundSignal(ctx: TypedActorContext[O], signal: Signal, target: SignalTarget[I]): Behavior[I] = { + override def aroundSignal(ctx: TypedActorContext[Any], signal: Signal, target: SignalTarget[I]): Behavior[I] = { try { target(ctx, signal) } catch handleSignalException(ctx, target) @@ -92,9 +93,9 @@ private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: Supe ctx.asScala.system.toUntyped.eventStream.publish(Dropped(signalOrMessage, ctx.asScala.self)) } - protected def handleExceptionOnStart(ctx: TypedActorContext[O], target: PreStartTarget[I]): Catcher[Behavior[I]] - protected def handleSignalException(ctx: TypedActorContext[O], target: SignalTarget[I]): Catcher[Behavior[I]] - protected def handleReceiveException(ctx: TypedActorContext[O], target: ReceiveTarget[I]): Catcher[Behavior[I]] + protected def handleExceptionOnStart(ctx: TypedActorContext[Any], target: PreStartTarget[I]): Catcher[Behavior[I]] + protected def handleSignalException(ctx: TypedActorContext[Any], target: SignalTarget[I]): Catcher[Behavior[I]] + protected def handleReceiveException(ctx: TypedActorContext[Any], target: ReceiveTarget[I]): Catcher[Behavior[I]] override def toString: String = Logging.simpleName(getClass) } @@ -103,32 +104,32 @@ private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: Supe * For cases where O == I for BehaviorInterceptor. */ private abstract class SimpleSupervisor[T, Thr <: Throwable: ClassTag](ss: SupervisorStrategy) - extends AbstractSupervisor[T, T, Thr](ss) { + extends AbstractSupervisor[T, Thr](ss) { - override def aroundReceive(ctx: TypedActorContext[T], msg: T, target: ReceiveTarget[T]): Behavior[T] = { + override def aroundReceive(ctx: TypedActorContext[Any], msg: Any, target: ReceiveTarget[T]): Behavior[T] = { try { - target(ctx, msg) + target(ctx, msg.asInstanceOf[T]) } catch handleReceiveException(ctx, target) } - protected def handleException(@unused ctx: TypedActorContext[T]): Catcher[Behavior[T]] = { + protected def handleException(@unused ctx: TypedActorContext[Any]): Catcher[Behavior[T]] = { case NonFatal(t) if isInstanceOfTheThrowableClass(t) => BehaviorImpl.failed(t) } // convenience if target not required to handle exception - protected def handleExceptionOnStart(ctx: TypedActorContext[T], target: PreStartTarget[T]): Catcher[Behavior[T]] = + protected def handleExceptionOnStart(ctx: TypedActorContext[Any], target: PreStartTarget[T]): Catcher[Behavior[T]] = handleException(ctx) - protected def handleSignalException(ctx: TypedActorContext[T], target: SignalTarget[T]): Catcher[Behavior[T]] = + protected def handleSignalException(ctx: TypedActorContext[Any], target: SignalTarget[T]): Catcher[Behavior[T]] = handleException(ctx) - protected def handleReceiveException(ctx: TypedActorContext[T], target: ReceiveTarget[T]): Catcher[Behavior[T]] = + protected def handleReceiveException(ctx: TypedActorContext[Any], target: ReceiveTarget[T]): Catcher[Behavior[T]] = handleException(ctx) } private class StopSupervisor[T, Thr <: Throwable: ClassTag](@unused initial: Behavior[T], strategy: Stop) extends SimpleSupervisor[T, Thr](strategy) { - override def handleException(ctx: TypedActorContext[T]): Catcher[Behavior[T]] = { + override def handleException(ctx: TypedActorContext[Any]): Catcher[Behavior[T]] = { case NonFatal(t) if isInstanceOfTheThrowableClass(t) => log(ctx, t) BehaviorImpl.failed(t) @@ -136,7 +137,7 @@ private class StopSupervisor[T, Thr <: Throwable: ClassTag](@unused initial: Beh } private class ResumeSupervisor[T, Thr <: Throwable: ClassTag](ss: Resume) extends SimpleSupervisor[T, Thr](ss) { - override protected def handleException(ctx: TypedActorContext[T]): Catcher[Behavior[T]] = { + override protected def handleException(ctx: TypedActorContext[Any]): Catcher[Behavior[T]] = { case NonFatal(t) if isInstanceOfTheThrowableClass(t) => log(ctx, t) t match { @@ -166,13 +167,13 @@ private object RestartSupervisor { } } - final case class ScheduledRestart(owner: RestartSupervisor[_, _, _ <: Throwable]) extends DeadLetterSuppression - final case class ResetRestartCount(current: Int, owner: RestartSupervisor[_, _, _ <: Throwable]) + final case class ScheduledRestart(owner: RestartSupervisor[_, _ <: Throwable]) extends DeadLetterSuppression + final case class ResetRestartCount(current: Int, owner: RestartSupervisor[_, _ <: Throwable]) extends DeadLetterSuppression } -private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behavior[T], strategy: RestartOrBackoff) - extends AbstractSupervisor[O, T, Thr](strategy) { +private class RestartSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], strategy: RestartOrBackoff) + extends AbstractSupervisor[T, Thr](strategy) { import RestartSupervisor._ private var restartingInProgress: OptionVal[(StashBuffer[Any], Set[ActorRef[Nothing]])] = OptionVal.None @@ -185,7 +186,7 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav case OptionVal.Some(d) => d.hasTimeLeft } - override def aroundSignal(ctx: TypedActorContext[O], signal: Signal, target: SignalTarget[T]): Behavior[T] = { + override def aroundSignal(ctx: TypedActorContext[Any], signal: Signal, target: SignalTarget[T]): Behavior[T] = { restartingInProgress match { case OptionVal.None => super.aroundSignal(ctx, signal, target) @@ -210,8 +211,8 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav } } - override def aroundReceive(ctx: TypedActorContext[O], msg: O, target: ReceiveTarget[T]): Behavior[T] = { - msg.asInstanceOf[Any] match { + override def aroundReceive(ctx: TypedActorContext[Any], msg: Any, target: ReceiveTarget[T]): Behavior[T] = { + msg match { case ScheduledRestart(owner) => if (owner eq this) { restartingInProgress match { @@ -259,7 +260,7 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav } override protected def handleExceptionOnStart( - ctx: TypedActorContext[O], + ctx: TypedActorContext[Any], @unused target: PreStartTarget[T]): Catcher[Behavior[T]] = { case NonFatal(t) if isInstanceOfTheThrowableClass(t) => ctx.asScala.cancelAllTimers() @@ -278,23 +279,23 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav } override protected def handleSignalException( - ctx: TypedActorContext[O], + ctx: TypedActorContext[Any], target: SignalTarget[T]): Catcher[Behavior[T]] = { handleException(ctx, signalRestart = { - case e: UnstashException[O] @unchecked => Behavior.interpretSignal(e.behavior, ctx, PreRestart) - case _ => target(ctx, PreRestart) + case e: UnstashException[Any] @unchecked => Behavior.interpretSignal(e.behavior, ctx, PreRestart) + case _ => target(ctx, PreRestart) }) } override protected def handleReceiveException( - ctx: TypedActorContext[O], + ctx: TypedActorContext[Any], target: ReceiveTarget[T]): Catcher[Behavior[T]] = { handleException(ctx, signalRestart = { - case e: UnstashException[O] @unchecked => Behavior.interpretSignal(e.behavior, ctx, PreRestart) - case _ => target.signalRestart(ctx) + case e: UnstashException[Any] @unchecked => Behavior.interpretSignal(e.behavior, ctx, PreRestart) + case _ => target.signalRestart(ctx) }) } - private def handleException(ctx: TypedActorContext[O], signalRestart: Throwable => Unit): Catcher[Behavior[T]] = { + private def handleException(ctx: TypedActorContext[Any], signalRestart: Throwable => Unit): Catcher[Behavior[T]] = { case NonFatal(t) if isInstanceOfTheThrowableClass(t) => ctx.asScala.cancelAllTimers() if (strategy.maxRestarts != -1 && restartCount >= strategy.maxRestarts && deadlineHasTimeLeft) { @@ -315,7 +316,7 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav } } - private def prepareRestart(ctx: TypedActorContext[O], reason: Throwable): Behavior[T] = { + private def prepareRestart(ctx: TypedActorContext[Any], reason: Throwable): Behavior[T] = { log(ctx, reason) val currentRestartCount = restartCount @@ -334,7 +335,7 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav val restartDelay = calculateDelay(currentRestartCount, backoff.minBackoff, backoff.maxBackoff, backoff.randomFactor) gotScheduledRestart = false - ctx.asScala.scheduleOnce(restartDelay, ctx.asScala.self.unsafeUpcast[Any], ScheduledRestart(this)) + ctx.asScala.scheduleOnce(restartDelay, ctx.asScala.self, ScheduledRestart(this)) Behaviors.empty case _: Restart => if (childrenToStop.isEmpty) @@ -344,17 +345,14 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav } } - private def restartCompleted(ctx: TypedActorContext[O]): Behavior[T] = { + private def restartCompleted(ctx: TypedActorContext[Any]): Behavior[T] = { // probably already done, but doesn't hurt to make sure they are canceled ctx.asScala.cancelAllTimers() strategy match { case backoff: Backoff => gotScheduledRestart = false - ctx.asScala.scheduleOnce( - backoff.resetBackoffAfter, - ctx.asScala.self.unsafeUpcast[Any], - ResetRestartCount(restartCount, this)) + ctx.asScala.scheduleOnce(backoff.resetBackoffAfter, ctx.asScala.self, ResetRestartCount(restartCount, this)) case _: Restart => } @@ -364,12 +362,12 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav case OptionVal.None => newBehavior case OptionVal.Some((stashBuffer, _)) => restartingInProgress = OptionVal.None - stashBuffer.unstashAll(ctx.asScala.asInstanceOf[scaladsl.ActorContext[Any]], newBehavior.unsafeCast) + stashBuffer.unstashAll(ctx.asScala, newBehavior.unsafeCast) } nextBehavior.narrow } catch handleException(ctx, signalRestart = { - case e: UnstashException[O] @unchecked => Behavior.interpretSignal(e.behavior, ctx, PreRestart) - case _ => () + case e: UnstashException[Any] @unchecked => Behavior.interpretSignal(e.behavior, ctx, PreRestart) + case _ => () }) } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/WithMdcBehaviorInterceptor.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/WithMdcBehaviorInterceptor.scala index 0d6c39e8e8..35aa728f5e 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/WithMdcBehaviorInterceptor.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/WithMdcBehaviorInterceptor.scala @@ -7,8 +7,8 @@ package akka.actor.typed.internal import akka.actor.typed.internal.adapter.AbstractLogger import akka.actor.typed.{ Behavior, BehaviorInterceptor, Signal, TypedActorContext } import akka.annotation.InternalApi - import scala.collection.immutable.HashMap +import scala.reflect.ClassTag /** * INTERNAL API @@ -16,7 +16,7 @@ import scala.collection.immutable.HashMap @InternalApi private[akka] object WithMdcBehaviorInterceptor { val noMdcPerMessage = (_: Any) => Map.empty[String, Any] - def apply[T]( + def apply[T: ClassTag]( staticMdc: Map[String, Any], mdcForMessage: T => Map[String, Any], behavior: Behavior[T]): Behavior[T] = { @@ -31,7 +31,7 @@ import scala.collection.immutable.HashMap * * INTERNAL API */ -@InternalApi private[akka] final class WithMdcBehaviorInterceptor[T] private ( +@InternalApi private[akka] final class WithMdcBehaviorInterceptor[T: ClassTag] private ( staticMdc: Map[String, Any], mdcForMessage: T => Map[String, Any]) extends BehaviorInterceptor[T, T] { diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/GuardianStartupBehavior.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/GuardianStartupBehavior.scala index 56044cb2c0..cb294e9266 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/GuardianStartupBehavior.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/GuardianStartupBehavior.scala @@ -9,7 +9,6 @@ import akka.actor.typed.BehaviorInterceptor import akka.actor.typed.Signal import akka.actor.typed.TypedActorContext import akka.actor.typed.scaladsl.AbstractBehavior -import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.StashBuffer import akka.annotation.InternalApi @@ -34,21 +33,17 @@ private[akka] final class GuardianStartupBehavior[T](val guardianBehavior: Behav import GuardianStartupBehavior.Start - private val stash = StashBuffer[T](1000) + private val stash = StashBuffer[Any](1000) override def onMessage(msg: Any): Behavior[Any] = msg match { case Start => // ctx is not available initially so we cannot use it until here - Behaviors.setup( - ctx => - stash - .unstashAll( - ctx.asInstanceOf[ActorContext[T]], - Behaviors.intercept(() => new GuardianStopInterceptor[T])(guardianBehavior)) - .unsafeCast[Any]) + Behaviors.setup(ctx => + stash + .unstashAll(ctx, Behaviors.intercept(() => new GuardianStopInterceptor)(guardianBehavior.unsafeCast[Any]))) case other => - stash.stash(other.asInstanceOf[T]) + stash.stash(other) this } @@ -61,24 +56,24 @@ private[akka] final class GuardianStartupBehavior[T](val guardianBehavior: Behav * as part of that we must intercept when the guardian is stopped and call ActorSystem.terminate() * explicitly. */ -@InternalApi private[akka] final class GuardianStopInterceptor[T] extends BehaviorInterceptor[T, T] { +@InternalApi private[akka] final class GuardianStopInterceptor extends BehaviorInterceptor[Any, Any] { override def aroundReceive( - ctx: TypedActorContext[T], - msg: T, - target: BehaviorInterceptor.ReceiveTarget[T]): Behavior[T] = { + ctx: TypedActorContext[Any], + msg: Any, + target: BehaviorInterceptor.ReceiveTarget[Any]): Behavior[Any] = { val next = target(ctx, msg) interceptStopped(ctx, next) } override def aroundSignal( - ctx: TypedActorContext[T], + ctx: TypedActorContext[Any], signal: Signal, - target: BehaviorInterceptor.SignalTarget[T]): Behavior[T] = { + target: BehaviorInterceptor.SignalTarget[Any]): Behavior[Any] = { val next = target(ctx, signal) interceptStopped(ctx, next) } - private def interceptStopped(ctx: TypedActorContext[T], next: Behavior[T]): Behavior[T] = { + private def interceptStopped(ctx: TypedActorContext[Any], next: Behavior[Any]): Behavior[Any] = { if (Behavior.isAlive(next)) next else { diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala index c3dc2f8039..e3f6916faf 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala @@ -182,9 +182,15 @@ object Behaviors { * monitor [[akka.actor.typed.ActorRef]] before invoking the wrapped behavior. The * wrapped behavior can evolve (i.e. return different behavior) without needing to be * wrapped in a `monitor` call again. + * + * @param interceptMessageClass Ensures that the messages of this class or a subclass thereof will be + * sent to the `monitor`. Other message types (e.g. a private protocol) + * will bypass the interceptor and be continue to the inner behavior. + * @param monitor The messages will also be sent to this `ActorRef` + * @param behavior The inner behavior that is decorated */ - def monitor[T](monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] = - scaladsl.Behaviors.monitor(monitor, behavior) + def monitor[T](interceptMessageClass: Class[T], monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] = + scaladsl.Behaviors.monitor(monitor, behavior)(ClassTag(interceptMessageClass)) /** * Behavior decorator that logs all messages to the [[akka.actor.typed.Behavior]] using the provided @@ -269,6 +275,9 @@ object Behaviors { * }}} * * + * @param interceptMessageClass Ensures that only messages of this class or a subclass thereof will be + * intercepted. Other message types (e.g. a private protocol) will bypass + * the interceptor and be continue to the inner behavior untouched. * @param behavior * the behavior that will receive the selected messages * @param selector @@ -276,8 +285,11 @@ object Behaviors { * transformation * @return a behavior of the widened type */ - def widened[T, U](behavior: Behavior[T], selector: JFunction[PFBuilder[U, T], PFBuilder[U, T]]): Behavior[U] = - BehaviorImpl.widened(behavior, selector.apply(new PFBuilder).build()) + def widened[Outer, Inner]( + interceptMessageClass: Class[Outer], + behavior: Behavior[Outer], + selector: JFunction[PFBuilder[Inner, Outer], PFBuilder[Inner, Outer]]): Behavior[Inner] = + BehaviorImpl.widened(behavior, selector.apply(new PFBuilder).build())(ClassTag(interceptMessageClass)) /** * Support for scheduled `self` messages in an actor. @@ -292,6 +304,9 @@ object Behaviors { /** * Per message MDC (Mapped Diagnostic Context) logging. * + * @param interceptMessageClass Ensures that only messages of this class or a subclass thereof will be + * intercepted. Other message types (e.g. a private protocol) will bypass + * the interceptor and be continue to the inner behavior untouched. * @param mdcForMessage Is invoked before each message is handled, allowing to setup MDC, MDC is cleared after * each message processing by the inner behavior is done. * @param behavior The actual behavior handling the messages, the MDC is used for the log entries logged through @@ -300,21 +315,28 @@ object Behaviors { * See also [[akka.actor.typed.Logger.withMdc]] */ def withMdc[T]( + interceptMessageClass: Class[T], mdcForMessage: akka.japi.function.Function[T, java.util.Map[String, Any]], behavior: Behavior[T]): Behavior[T] = - withMdc(Collections.emptyMap[String, Any], mdcForMessage, behavior) + withMdc(interceptMessageClass, Collections.emptyMap[String, Any], mdcForMessage, behavior) /** * Static MDC (Mapped Diagnostic Context) * + * @param interceptMessageClass Ensures that only messages of this class or a subclass thereof will be + * intercepted. Other message types (e.g. a private protocol) will bypass + * the interceptor and be continue to the inner behavior untouched. * @param staticMdc This MDC is setup in the logging context for every message * @param behavior The actual behavior handling the messages, the MDC is used for the log entries logged through * `ActorContext.log` * * See also [[akka.actor.typed.Logger.withMdc]] */ - def withMdc[T](staticMdc: java.util.Map[String, Any], behavior: Behavior[T]): Behavior[T] = - withMdc(staticMdc, null, behavior) + def withMdc[T]( + interceptMessageClass: Class[T], + staticMdc: java.util.Map[String, Any], + behavior: Behavior[T]): Behavior[T] = + withMdc(interceptMessageClass, staticMdc, null, behavior) /** * Combination of static and per message MDC (Mapped Diagnostic Context). @@ -325,6 +347,9 @@ object Behaviors { * * * The `staticMdc` or `mdcForMessage` may be empty. * + * @param interceptMessageClass Ensures that only messages of this class or a subclass thereof will be + * intercepted. Other message types (e.g. a private protocol) will bypass + * the interceptor and be continue to the inner behavior untouched. * @param staticMdc A static MDC applied for each message * @param mdcForMessage Is invoked before each message is handled, allowing to setup MDC, MDC is cleared after * each message processing by the inner behavior is done. @@ -334,6 +359,7 @@ object Behaviors { * See also [[akka.actor.typed.Logger.withMdc]] */ def withMdc[T]( + interceptMessageClass: Class[T], staticMdc: java.util.Map[String, Any], mdcForMessage: akka.japi.function.Function[T, java.util.Map[String, Any]], behavior: Behavior[T]): Behavior[T] = { @@ -349,7 +375,7 @@ object Behaviors { asScalaMap(mdcForMessage.apply(message)) } - WithMdcBehaviorInterceptor[T](asScalaMap(staticMdc), mdcForMessageFun, behavior) + WithMdcBehaviorInterceptor[T](asScalaMap(staticMdc), mdcForMessageFun, behavior)(ClassTag(interceptMessageClass)) } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala index 3ab7e8b204..f86e53783b 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala @@ -156,8 +156,15 @@ object Behaviors { * monitor [[akka.actor.typed.ActorRef]] before invoking the wrapped behavior. The * wrapped behavior can evolve (i.e. return different behavior) without needing to be * wrapped in a `monitor` call again. + * + * The `ClassTag` for `T` ensures that the messages of this class or a subclass thereof will be + * sent to the `monitor`. Other message types (e.g. a private protocol) will bypass the interceptor + * and be continue to the inner behavior. + * + * @param monitor The messages will also be sent to this `ActorRef` + * @param behavior The inner behavior that is decorated */ - def monitor[T](monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] = + def monitor[T: ClassTag](monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] = BehaviorImpl.intercept(() => new MonitorInterceptor[T](monitor))(behavior) /** @@ -166,7 +173,7 @@ object Behaviors { * To include an MDC context then first wrap `logMessages` with `withMDC`. */ def logMessages[T](behavior: Behavior[T]): Behavior[T] = - BehaviorImpl.intercept(() => new LogMessagesInterceptor[T](LogOptions()))(behavior) + BehaviorImpl.intercept(() => LogMessagesInterceptor[T](LogOptions()))(behavior) /** * Behavior decorator that logs all messages to the [[akka.actor.typed.Behavior]] using the provided @@ -174,7 +181,7 @@ object Behaviors { * To include an MDC context then first wrap `logMessages` with `withMDC`. */ def logMessages[T](logOptions: LogOptions, behavior: Behavior[T]): Behavior[T] = - BehaviorImpl.intercept(() => new LogMessagesInterceptor[T](logOptions))(behavior) + BehaviorImpl.intercept(() => LogMessagesInterceptor[T](logOptions))(behavior) /** * Wrap the given behavior with the given [[SupervisorStrategy]] for @@ -227,6 +234,10 @@ object Behaviors { /** * Per message MDC (Mapped Diagnostic Context) logging. * + * The `ClassTag` for `T` ensures that only messages of this class or a subclass thereof will be + * intercepted. Other message types (e.g. a private protocol) will bypass the interceptor and be + * continue to the inner behavior untouched. + * * @param mdcForMessage Is invoked before each message is handled, allowing to setup MDC, MDC is cleared after * each message processing by the inner behavior is done. * @param behavior The actual behavior handling the messages, the MDC is used for the log entries logged through @@ -234,19 +245,23 @@ object Behaviors { * * See also [[akka.actor.typed.Logger.withMdc]] */ - def withMdc[T](mdcForMessage: T => Map[String, Any])(behavior: Behavior[T]): Behavior[T] = + def withMdc[T: ClassTag](mdcForMessage: T => Map[String, Any])(behavior: Behavior[T]): Behavior[T] = withMdc[T](Map.empty[String, Any], mdcForMessage)(behavior) /** * Static MDC (Mapped Diagnostic Context) * + * The `ClassTag` for `T` ensures that only messages of this class or a subclass thereof will be + * intercepted. Other message types (e.g. a private protocol) will bypass the interceptor and be + * continue to the inner behavior untouched. + * * @param staticMdc This MDC is setup in the logging context for every message * @param behavior The actual behavior handling the messages, the MDC is used for the log entries logged through * `ActorContext.log` * * See also [[akka.actor.typed.Logger.withMdc]] */ - def withMdc[T](staticMdc: Map[String, Any])(behavior: Behavior[T]): Behavior[T] = + def withMdc[T: ClassTag](staticMdc: Map[String, Any])(behavior: Behavior[T]): Behavior[T] = withMdc[T](staticMdc, (_: T) => Map.empty[String, Any])(behavior) /** @@ -258,6 +273,10 @@ object Behaviors { * * The `staticMdc` or `mdcForMessage` may be empty. * + * The `ClassTag` for `T` ensures that only messages of this class or a subclass thereof will be + * intercepted. Other message types (e.g. a private protocol) will bypass the interceptor and be + * continue to the inner behavior untouched. + * * @param staticMdc A static MDC applied for each message * @param mdcForMessage Is invoked before each message is handled, allowing to setup MDC, MDC is cleared after * each message processing by the inner behavior is done. @@ -266,7 +285,7 @@ object Behaviors { * * See also [[akka.actor.typed.Logger.withMdc]] */ - def withMdc[T](staticMdc: Map[String, Any], mdcForMessage: T => Map[String, Any])( + def withMdc[T: ClassTag](staticMdc: Map[String, Any], mdcForMessage: T => Map[String, Any])( behavior: Behavior[T]): Behavior[T] = WithMdcBehaviorInterceptor[T](staticMdc, mdcForMessage, behavior) diff --git a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md index 3193b2e745..4e70180d6d 100644 --- a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md +++ b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md @@ -387,7 +387,9 @@ made before finalizing the APIs. Compared to Akka 2.5.x the source incompatible * `ActorContext` parameter removed in `javadsl.ReceiveBuilder` for the functional style in Java. Use `Behaviors.setup` to retrieve `ActorContext`, and use an enclosing class to hold initialization parameters and `ActorContext`. * Java @apidoc[akka.cluster.sharding.typed.javadsl.EntityRef] ask timeout now takes a `java.time.Duration` rather than a @apidoc[Timeout] - +* `BehaviorInterceptor`, `Behaviors.monitor`, `Behaviors.withMdc` and @scala[`widen`]@java[`Behaviors.widen`] takes + a @scala[`ClassTag` parameter (probably source compatible)]@java[`interceptMessageClass` parameter]. + `interceptMessageType` method in `BehaviorInterceptor` is replaced with this @scala[`ClassTag`]@java[`Class`] parameter. #### Akka Typed Stream API changes diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala index 722cda35d5..c7c0da860b 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -133,10 +133,10 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( val onStopInterceptor = new BehaviorInterceptor[Any, Any] { import BehaviorInterceptor._ - def aroundReceive(ctx: typed.TypedActorContext[Any], msg: Any, target: ReceiveTarget[Any]) + override def aroundReceive(ctx: typed.TypedActorContext[Any], msg: Any, target: ReceiveTarget[Any]) : Behavior[Any] = { target(ctx, msg) } - def aroundSignal(ctx: typed.TypedActorContext[Any], signal: Signal, target: SignalTarget[Any]) + override def aroundSignal(ctx: typed.TypedActorContext[Any], signal: Signal, target: SignalTarget[Any]) : Behavior[Any] = { if (signal == PostStop) { eventSourcedSetup.cancelRecoveryTimer() diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java index 9a9ab29368..8dc745807f 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java @@ -493,12 +493,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { TestProbe interceptProbe = testKit.createTestProbe(); TestProbe signalProbe = testKit.createTestProbe(); BehaviorInterceptor tap = - new BehaviorInterceptor() { - - @Override - public Class interceptMessageType() { - return Command.class; - } + new BehaviorInterceptor(Command.class) { @Override public Behavior aroundReceive( diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorInterceptorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorInterceptorSpec.scala new file mode 100644 index 0000000000..b3da47a87e --- /dev/null +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorInterceptorSpec.scala @@ -0,0 +1,119 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.persistence.typed.scaladsl + +import java.util.concurrent.atomic.AtomicInteger + +import akka.actor.testkit.typed.scaladsl._ +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.BehaviorInterceptor +import akka.actor.typed.TypedActorContext +import akka.actor.typed.scaladsl.Behaviors +import akka.persistence.typed.PersistenceId +import akka.testkit.EventFilter +import akka.testkit.TestEvent.Mute +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import org.scalatest.WordSpecLike + +object EventSourcedBehaviorInterceptorSpec { + + val journalId = "event-sourced-behavior-interceptor-spec" + + def config: Config = ConfigFactory.parseString(s""" + akka.loglevel = INFO + akka.loggers = [akka.testkit.TestEventListener] + akka.persistence.journal.plugin = "akka.persistence.journal.inmem" + """) + + def testBehavior(persistenceId: PersistenceId, probe: ActorRef[String]): Behavior[String] = + Behaviors.setup { _ => + EventSourcedBehavior[String, String, String]( + persistenceId, + emptyState = "", + commandHandler = (_, command) => + command match { + case _ => + Effect.persist(command).thenRun(newState => probe ! newState) + }, + eventHandler = (state, evt) => state + evt) + } + +} + +class EventSourcedBehaviorInterceptorSpec + extends ScalaTestWithActorTestKit(EventSourcedBehaviorTimersSpec.config) + with WordSpecLike { + + import EventSourcedBehaviorInterceptorSpec._ + + val pidCounter = new AtomicInteger(0) + private def nextPid(): PersistenceId = PersistenceId(s"c${pidCounter.incrementAndGet()})") + + import akka.actor.typed.scaladsl.adapter._ + // needed for the untyped event filter + private implicit val untypedSystem: akka.actor.ActorSystem = system.toUntyped + + untypedSystem.eventStream.publish(Mute(EventFilter.warning(start = "No default snapshot store", occurrences = 1))) + + "EventSourcedBehavior interceptor" must { + + "be possible to combine with another interceptor" in { + val probe = createTestProbe[String]() + val pid = nextPid() + + val toUpper = new BehaviorInterceptor[String, String] { + override def aroundReceive( + ctx: TypedActorContext[String], + msg: String, + target: BehaviorInterceptor.ReceiveTarget[String]): Behavior[String] = { + target(ctx, msg.toUpperCase()) + } + + } + + val ref = spawn(Behaviors.intercept(() => toUpper)(testBehavior(pid, probe.ref))) + + ref ! "a" + ref ! "bc" + probe.expectMessage("A") + probe.expectMessage("ABC") + } + + "be possible to combine with widen" in { + // EventSourcedBehaviorImpl should use a plain BehaviorInterceptor instead of widen + pending // FIXME #25887 + val probe = createTestProbe[String]() + val pid = nextPid() + val ref = spawn(testBehavior(pid, probe.ref).widen[String] { + case s => s.toUpperCase() + }) + + ref ! "a" + ref ! "bc" + probe.expectMessage("A") + probe.expectMessage("ABC") + } + + "be possible to combine with MDC" in { + val probe = createTestProbe[String]() + val pid = nextPid() + val ref = spawn(Behaviors.setup[String] { _ => + Behaviors + .withMdc(staticMdc = Map("pid" -> pid), mdcForMessage = (msg: String) => Map("msg" -> msg.toUpperCase())) { + testBehavior(pid, probe.ref) + } + }) + + ref ! "a" + ref ! "bc" + probe.expectMessage("a") + probe.expectMessage("abc") + + } + + } +}