diff --git a/akka-actor-testkit-typed/src/test/java/jdocs/akka/actor/testkit/typed/javadsl/AsyncTestingExampleTest.java b/akka-actor-testkit-typed/src/test/java/jdocs/akka/actor/testkit/typed/javadsl/AsyncTestingExampleTest.java index e5994ef93d..56bebd2712 100644 --- a/akka-actor-testkit-typed/src/test/java/jdocs/akka/actor/testkit/typed/javadsl/AsyncTestingExampleTest.java +++ b/akka-actor-testkit-typed/src/test/java/jdocs/akka/actor/testkit/typed/javadsl/AsyncTestingExampleTest.java @@ -167,7 +167,7 @@ public class AsyncTestingExampleTest }); TestProbe probe = testKit.createTestProbe(); ActorRef mockedPublisher = - testKit.spawn(Behaviors.monitor(probe.ref(), mockedBehavior)); + testKit.spawn(Behaviors.monitor(Message.class, probe.ref(), mockedBehavior)); // test our component Producer producer = new Producer(testKit.scheduler(), mockedPublisher); diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java index be9e98ef83..344695360e 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java @@ -41,7 +41,7 @@ public class ActorCompile { Behavior actor6 = intercept( () -> - new BehaviorInterceptor() { + new BehaviorInterceptor(MyMsg.class) { @Override public Behavior aroundReceive( TypedActorContext context, MyMsg message, ReceiveTarget target) { @@ -60,9 +60,9 @@ public class ActorCompile { setup( context -> { final ActorRef self = context.getSelf(); - return monitor(self, ignore()); + return monitor(MyMsg.class, self, ignore()); }); - Behavior actor9 = widened(actor7, pf -> pf.match(MyMsgA.class, x -> x)); + Behavior actor9 = widened(MyMsgA.class, actor7, pf -> pf.match(MyMsgA.class, x -> x)); Behavior actor10 = Behaviors.receive((context, message) -> stopped(() -> {}), (context, signal) -> same()); diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/InterceptTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/InterceptTest.java index 5a60e48c32..5031871439 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/InterceptTest.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/InterceptTest.java @@ -21,7 +21,7 @@ public class InterceptTest extends JUnitSuite { public void interceptMessage() { final TestProbe interceptProbe = testKit.createTestProbe(); BehaviorInterceptor interceptor = - new BehaviorInterceptor() { + new BehaviorInterceptor(String.class) { @Override public Behavior aroundReceive( TypedActorContext ctx, String msg, ReceiveTarget target) { @@ -59,15 +59,10 @@ public class InterceptTest extends JUnitSuite { static class B implements Message {} @Test - public void interceptMessagesSelectively() { + public void interceptMessageSubclasses() { final TestProbe interceptProbe = testKit.createTestProbe(); BehaviorInterceptor interceptor = - new BehaviorInterceptor() { - - @Override - public Class interceptMessageType() { - return B.class; - } + new BehaviorInterceptor(Message.class) { @Override public Behavior aroundReceive( @@ -96,6 +91,7 @@ public class InterceptTest extends JUnitSuite { ref.tell(new A()); ref.tell(new B()); + interceptProbe.expectMessageClass(A.class); probe.expectMessageClass(A.class); interceptProbe.expectMessageClass(B.class); probe.expectMessageClass(B.class); diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala index 8fdc701048..d58bbc02e1 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala @@ -73,10 +73,10 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit(""" import ActorSpecMessages._ - def decoration[T]: Behavior[T] => Behavior[T] + def decoration[T: ClassTag]: Behavior[T] => Behavior[T] implicit class BehaviorDecorator[T](behavior: Behavior[T])(implicit ev: ClassTag[T]) { - def decorate: Behavior[T] = decoration[T](behavior) + def decorate: Behavior[T] = decoration[T](ev)(behavior) } "An ActorContext" must { @@ -680,33 +680,31 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit(""" class NormalActorContextSpec extends ActorContextSpec { - override def decoration[T] = x => x + override def decoration[T: ClassTag] = x => x } class WidenActorContextSpec extends ActorContextSpec { - override def decoration[T] = b => b.widen { case x => x } + override def decoration[T: ClassTag] = b => b.widen { case x => x } } class DeferredActorContextSpec extends ActorContextSpec { - override def decoration[T] = b => Behaviors.setup(_ => b) + override def decoration[T: ClassTag] = b => Behaviors.setup(_ => b) } class NestedDeferredActorContextSpec extends ActorContextSpec { - override def decoration[T] = b => Behaviors.setup(_ => Behaviors.setup(_ => b)) + override def decoration[T: ClassTag] = b => Behaviors.setup(_ => Behaviors.setup(_ => b)) } class InterceptActorContextSpec extends ActorContextSpec { import BehaviorInterceptor._ - def tap[T] = new BehaviorInterceptor[T, T] { + def tap[T: ClassTag] = new BehaviorInterceptor[T, T] { override def aroundReceive(context: TypedActorContext[T], message: T, target: ReceiveTarget[T]): Behavior[T] = target(context, message) - override def aroundSignal(context: TypedActorContext[T], signal: Signal, target: SignalTarget[T]): Behavior[T] = - target(context, signal) } - override def decoration[T]: Behavior[T] => Behavior[T] = b => Behaviors.intercept[T, T](() => tap)(b) + override def decoration[T: ClassTag]: Behavior[T] => Behavior[T] = b => Behaviors.intercept[T, T](() => tap)(b) } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala index 018f1794af..d083969ac4 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala @@ -589,7 +589,7 @@ class ImmutableJavaBehaviorSpec extends Messages with Become with Stoppable { class WidenedJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec with Reuse with Siphon { override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { val inbox = TestInbox[Command]("widenedListener") - JBehaviors.widened(super.behavior(monitor)._1, pf(_.`match`(classOf[Command], fi(x => { + JBehaviors.widened(classOf[Command], super.behavior(monitor)._1, pf(_.`match`(classOf[Command], fi(x => { inbox.ref ! x x })))) -> inbox diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/InterceptSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/InterceptSpec.scala index f99a57f01b..dea99b7b0d 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/InterceptSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/InterceptSpec.scala @@ -10,10 +10,12 @@ import akka.testkit.EventFilter import akka.actor.testkit.typed.scaladsl.TestProbe import akka.actor.typed.scaladsl.Behaviors import org.scalatest.WordSpecLike - import scala.concurrent.duration._ + import akka.actor.ActorInitializationException import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.internal.PoisonPill +import akka.actor.typed.internal.PoisonPillInterceptor object InterceptSpec { final case class Msg(hello: String, replyTo: ActorRef[String]) @@ -28,16 +30,48 @@ object InterceptSpec { target(context, message) } - override def aroundSignal( - context: TypedActorContext[String], - signal: Signal, - target: SignalTarget[String]): Behavior[String] = { - target(context, signal) - } - override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = other.isInstanceOf[SameTypeInterceptor] } + + // This is similar to how EventSourcedBehavior is implemented + object MultiProtocol { + final case class ExternalResponse(s: String) + final case class Command(s: String) + + sealed trait InternalProtocol + object InternalProtocol { + final case class WrappedCommand(c: Command) extends InternalProtocol + final case class WrappedExternalResponse(r: ExternalResponse) extends InternalProtocol + } + + private class ProtocolTransformer extends BehaviorInterceptor[Any, InternalProtocol] { + override def aroundReceive( + ctx: TypedActorContext[Any], + msg: Any, + target: BehaviorInterceptor.ReceiveTarget[InternalProtocol]): Behavior[InternalProtocol] = { + val wrapped = msg match { + case c: Command => InternalProtocol.WrappedCommand(c) + case r: ExternalResponse => InternalProtocol.WrappedExternalResponse(r) + } + target(ctx, wrapped) + } + + } + + def apply(probe: ActorRef[String]): Behavior[Command] = { + Behaviors + .intercept(() => new ProtocolTransformer)(Behaviors.receiveMessage[InternalProtocol] { + case InternalProtocol.WrappedCommand(cmd) => + probe ! cmd.s + Behaviors.same + case InternalProtocol.WrappedExternalResponse(rsp) => + probe ! rsp.s + Behaviors.same + }) + .narrow + } + } } class InterceptSpec extends ScalaTestWithActorTestKit(""" @@ -60,14 +94,6 @@ class InterceptSpec extends ScalaTestWithActorTestKit(""" probe ! ("after " + message) b } - - override def aroundSignal( - context: TypedActorContext[String], - signal: Signal, - target: SignalTarget[String]): Behavior[String] = { - target(context, signal) - } - // keeping the instance equality as "isSame" for these } @@ -187,12 +213,6 @@ class InterceptSpec extends ScalaTestWithActorTestKit(""" message: String, target: ReceiveTarget[String]): Behavior[String] = target(context, message) - - def aroundSignal( - context: TypedActorContext[String], - signal: Signal, - target: SignalTarget[String]): Behavior[String] = - target(context, signal) } val innerBehaviorStarted = new AtomicBoolean(false) @@ -276,7 +296,30 @@ class InterceptSpec extends ScalaTestWithActorTestKit(""" } - "be useful for implementing PoisonPill" in { + "be useful for implementing signal based PoisonPill" in { + + def inner(count: Int): Behavior[Msg] = Behaviors.receiveMessage { + case Msg(hello, replyTo) => + replyTo ! s"$hello-$count" + inner(count + 1) + } + + val decorated: Behavior[Msg] = + Behaviors.intercept(() => new PoisonPillInterceptor[Msg])(inner(0)) + + val ref = spawn(decorated) + val probe = TestProbe[String]() + ref ! Msg("hello", probe.ref) + probe.expectMessage("hello-0") + ref ! Msg("hello", probe.ref) + probe.expectMessage("hello-1") + + ref.unsafeUpcast[Any] ! PoisonPill + + probe.expectTerminated(ref, probe.remainingOrDefault) + } + + "be useful for implementing custom message based PoisonPill" in { def inner(count: Int): Behavior[Msg] = Behaviors.receiveMessage { case Msg(hello, replyTo) => @@ -295,12 +338,6 @@ class InterceptSpec extends ScalaTestWithActorTestKit(""" case _ => Behaviors.unhandled } - override def aroundSignal( - context: TypedActorContext[Any], - signal: Signal, - target: SignalTarget[Msg]): Behavior[Msg] = - target.apply(context, signal) - } val decorated: Behavior[Msg] = @@ -318,34 +355,28 @@ class InterceptSpec extends ScalaTestWithActorTestKit(""" probe.expectTerminated(ref, probe.remainingOrDefault) } - "be able to intercept a subset of the messages" in { + "be able to intercept message subclasses" in { trait Message class A extends Message class B extends Message val interceptProbe = TestProbe[Message]() - val partialInterceptor: BehaviorInterceptor[Message, Message] = new BehaviorInterceptor[Message, Message] { + val interceptor: BehaviorInterceptor[Message, Message] = + new BehaviorInterceptor[Message, Message] { - override def interceptMessageType = classOf[B] + override def aroundReceive( + ctx: TypedActorContext[Message], + msg: Message, + target: ReceiveTarget[Message]): Behavior[Message] = { + interceptProbe.ref ! msg + target(ctx, msg) + } - override def aroundReceive( - ctx: TypedActorContext[Message], - msg: Message, - target: ReceiveTarget[Message]): Behavior[Message] = { - interceptProbe.ref ! msg - target(ctx, msg) } - override def aroundSignal( - ctx: TypedActorContext[Message], - signal: Signal, - target: SignalTarget[Message]): Behavior[Message] = - target(ctx, signal) - } - val probe = TestProbe[Message]() - val ref = spawn(Behaviors.intercept(() => partialInterceptor)(Behaviors.receiveMessage { msg => + val ref = spawn(Behaviors.intercept(() => interceptor)(Behaviors.receiveMessage { msg => probe.ref ! msg Behaviors.same })) @@ -353,6 +384,7 @@ class InterceptSpec extends ScalaTestWithActorTestKit(""" ref ! new A ref ! new B + interceptProbe.expectMessageType[A] probe.expectMessageType[A] interceptProbe.expectMessageType[B] probe.expectMessageType[B] @@ -360,14 +392,8 @@ class InterceptSpec extends ScalaTestWithActorTestKit(""" "intercept PostStop" in { val probe = TestProbe[String]() - val postStopInterceptor = new BehaviorInterceptor[String, String] { - def aroundReceive( - ctx: TypedActorContext[String], - msg: String, - target: ReceiveTarget[String]): Behavior[String] = { - target(ctx, msg) - } - def aroundSignal( + val postStopInterceptor = new BehaviorSignalInterceptor[String] { + override def aroundSignal( ctx: TypedActorContext[String], signal: Signal, target: SignalTarget[String]): Behavior[String] = { @@ -420,4 +446,71 @@ class InterceptSpec extends ScalaTestWithActorTestKit(""" } } + "Protocol transformer interceptor" must { + import MultiProtocol._ + + "be possible to combine with another interceptor" in { + val probe = createTestProbe[String]() + + val toUpper = new BehaviorInterceptor[Command, Command] { + override def aroundReceive( + ctx: TypedActorContext[Command], + msg: Command, + target: BehaviorInterceptor.ReceiveTarget[Command]): Behavior[Command] = { + target(ctx, Command(msg.s.toUpperCase())) + } + + } + + val ref = spawn(Behaviors.intercept(() => toUpper)(MultiProtocol(probe.ref))) + + ref ! Command("a") + probe.expectMessage("A") + ref.unsafeUpcast ! ExternalResponse("b") + probe.expectMessage("b") // bypass toUpper interceptor + } + + "be possible to combine with widen" in { + val probe = createTestProbe[String]() + val ref = spawn(MultiProtocol(probe.ref).widen[String] { + case s => Command(s.toUpperCase()) + }) + + ref ! "a" + probe.expectMessage("A") + ref.unsafeUpcast ! ExternalResponse("b") + probe.expectMessage("b") // bypass widen interceptor + } + + "be possible to combine with MDC" in { + val probe = createTestProbe[String]() + val ref = spawn(Behaviors.setup[Command] { _ => + Behaviors.withMdc(staticMdc = Map("x" -> "y"), mdcForMessage = (msg: Command) => { + probe.ref ! s"mdc:${msg.s.toUpperCase()}" + Map("msg" -> msg.s.toUpperCase()) + }) { + MultiProtocol(probe.ref) + } + }) + + ref ! Command("a") + probe.expectMessage("mdc:A") + probe.expectMessage("a") + ref.unsafeUpcast ! ExternalResponse("b") + probe.expectMessage("b") // bypass mdc interceptor + + } + + "be possible to combine with PoisonPillInterceptor" in { + val probe = createTestProbe[String]() + val ref = + spawn(Behaviors.intercept(() => new PoisonPillInterceptor[MultiProtocol.Command])(MultiProtocol(probe.ref))) + + ref ! Command("a") + probe.expectMessage("a") + ref.unsafeUpcast ! PoisonPill + probe.expectTerminated(ref, probe.remainingOrDefault) + } + } + } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/LogMessagesSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/LogMessagesSpec.scala index 16a1d0e90c..375e71eeb7 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/LogMessagesSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/LogMessagesSpec.scala @@ -106,5 +106,15 @@ class LogMessagesSpec extends ScalaTestWithActorTestKit(""" } } + "log messages of different type" in { + val behavior: Behavior[String] = Behaviors.logMessages(Behaviors.ignore[String]) + + val ref = spawn(behavior) + + EventFilter.debug("received message 13", source = ref.path.toString, occurrences = 1).intercept { + ref.unsafeUpcast[Any] ! 13 + } + } + } } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala index 863e1c090e..136cc0078d 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/SupervisionSpec.scala @@ -1084,12 +1084,6 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(""" message: String, target: ReceiveTarget[String]): Behavior[String] = target(context, message) - - override def aroundSignal( - context: TypedActorContext[String], - signal: Signal, - target: SignalTarget[String]): Behavior[String] = - target(context, signal) } val behv = supervise[String](Behaviors.receiveMessage { @@ -1235,6 +1229,36 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(""" } } + "handle exceptions from different message type" in { + val probe = TestProbe[Event]("evt") + + val inner: Behavior[Command] = Behaviors + .receiveMessage[Any] { + case Ping(n) => + probe.ref ! Pong(n) + Behaviors.same + case _ => throw new Exc1 + } + .receiveSignal { + case (_, PreRestart) => + probe.ref ! ReceivedSignal(PreRestart) + Behaviors.same + } + .narrow + + val behv = Behaviors.supervise(inner).onFailure[Exc1](SupervisorStrategy.restart) + val ref = spawn(behv) + ref ! Ping(1) + probe.expectMessage(Pong(1)) + + EventFilter[Exc1](occurrences = 1).intercept { + ref.unsafeUpcast ! "boom" + probe.expectMessage(ReceivedSignal(PreRestart)) + } + ref ! Ping(2) + probe.expectMessage(Pong(2)) + } + } val allStrategies = Seq( diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StopSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StopSpec.scala index 5013c22db3..e3ef465798 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StopSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/StopSpec.scala @@ -74,13 +74,6 @@ class StopSpec extends ScalaTestWithActorTestKit with WordSpecLike { target: ReceiveTarget[AnyRef]): Behavior[AnyRef] = { target(context, message) } - - override def aroundSignal( - context: typed.TypedActorContext[AnyRef], - signal: Signal, - target: SignalTarget[AnyRef]): Behavior[AnyRef] = { - target(context, signal) - } })(Behaviors.stopped { () => probe.ref ! Done }) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala index 276c441fce..e89068c5aa 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala @@ -6,6 +6,7 @@ package akka.actor.typed import scala.annotation.switch import scala.annotation.tailrec +import scala.reflect.ClassTag import akka.actor.InvalidMessageException import akka.actor.typed.internal.BehaviorImpl @@ -117,7 +118,7 @@ abstract class ExtensibleBehavior[T] extends Behavior[T](BehaviorTags.Extensible object Behavior { - final implicit class BehaviorDecorators[T](val behavior: Behavior[T]) extends AnyVal { + final implicit class BehaviorDecorators[Inner](val behavior: Behavior[Inner]) extends AnyVal { /** * Widen the wrapped Behavior by placing a funnel in front of it: the supplied @@ -134,8 +135,12 @@ object Behavior { * } * }}} * + * The `ClassTag` for `Outer` ensures that only messages of this class or a subclass thereof will be + * intercepted. Other message types (e.g. a private protocol) will bypass + * the interceptor and be continue to the inner behavior untouched. + * */ - def widen[U](matcher: PartialFunction[U, T]): Behavior[U] = + def widen[Outer: ClassTag](matcher: PartialFunction[Outer, Inner]): Behavior[Outer] = BehaviorImpl.widened(behavior, matcher) } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala index 6c2defed72..c0ed84aab7 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala @@ -4,32 +4,46 @@ package akka.actor.typed +import scala.reflect.ClassTag + import akka.annotation.{ DoNotInherit, InternalApi } +import akka.util.BoxedType /** * A behavior interceptor allows for intercepting message and signal reception and perform arbitrary logic - - * transform, filter, send to a side channel etc. It is the core API for decoration of behaviors. Many built-in - * intercepting behaviors are provided through factories in the respective `Behaviors`. + * transform, filter, send to a side channel etc. It is the core API for decoration of behaviors. * - * If the interceptor does keep mutable state care must be taken to create the instance in a `setup` block - * so that a new instance is created per spawned actor rather than shared among actor instance. + * The `BehaviorInterceptor` API is considered a low level tool for building other features and + * shouldn't be used for "normal" application logic. Several built-in intercepting behaviors + * are provided through factories in the respective `Behaviors`. * - * @tparam O The outer message type – the type of messages the intercepting behavior will accept - * @tparam I The inner message type - the type of message the wrapped behavior accepts + * If the interceptor does keep mutable state care must be taken to create a new instance from + * the factory function of `Behaviors.intercept` so that a new instance is created per spawned + * actor rather than shared among actor instance. + * + * @param interceptMessageClass Ensures that the interceptor will only receive `O` message types. + * If the message is not of this class or a subclass thereof + * (e.g. a private protocol) will bypass the interceptor and be + * continue to the inner behavior untouched. + * + * @tparam Outer The outer message type – the type of messages the intercepting behavior will accept + * @tparam Inner The inner message type - the type of message the wrapped behavior accepts + * + * @see [[BehaviorSignalInterceptor]] */ -abstract class BehaviorInterceptor[O, I] { +abstract class BehaviorInterceptor[Outer, Inner](val interceptMessageClass: Class[Outer]) { import BehaviorInterceptor._ /** - * Allows for applying the interceptor only to certain message types. Useful if the official protocol and the actual - * protocol of an actor causes problems, for example class cast exceptions for a message not of type `O` that - * the actor still knows how to deal with. Note that this is only possible to use when `O` and `I` are the same type. - * - * @return A subtype of `O` that should be intercepted or `null` to intercept all `O`s. - * Subtypes of `O` matching this are passed directly to the inner behavior without interception. + * Scala API: The `ClassTag` for `Outer` ensures that only messages of this class or a subclass + * thereof will be intercepted. Other message types (e.g. a private protocol) will bypass the + * interceptor and be continue to the inner behavior untouched. */ - // null for all to avoid having to deal with class tag/explicit class in the default case of no filter - def interceptMessageType: Class[_ <: O] = null + def this()(implicit interceptMessageClassTag: ClassTag[Outer]) = + this({ + val runtimeClass = interceptMessageClassTag.runtimeClass + (if (runtimeClass eq null) runtimeClass else BoxedType(runtimeClass)).asInstanceOf[Class[Outer]] + }) /** * Override to intercept actor startup. To trigger startup of @@ -37,7 +51,7 @@ abstract class BehaviorInterceptor[O, I] { * @return The returned behavior will be the "started" behavior of the actor used to accept * the next message or signal. */ - def aroundStart(ctx: TypedActorContext[O], target: PreStartTarget[I]): Behavior[I] = + def aroundStart(ctx: TypedActorContext[Outer], target: PreStartTarget[Inner]): Behavior[Inner] = target.start(ctx) /** @@ -47,15 +61,18 @@ abstract class BehaviorInterceptor[O, I] { * * @return The behavior for next message or signal */ - def aroundReceive(ctx: TypedActorContext[O], msg: O, target: ReceiveTarget[I]): Behavior[I] + def aroundReceive(ctx: TypedActorContext[Outer], msg: Outer, target: ReceiveTarget[Inner]): Behavior[Inner] /** - * Intercept a signal sent to the running actor. Pass the signal on to the next behavior + * Override to intercept a signal sent to the running actor. Pass the signal on to the next behavior * in the stack by passing it to `target.apply`. * * @return The behavior for next message or signal + * + * @see [[BehaviorSignalInterceptor]] */ - def aroundSignal(ctx: TypedActorContext[O], signal: Signal, target: SignalTarget[I]): Behavior[I] + def aroundSignal(ctx: TypedActorContext[Outer], signal: Signal, target: SignalTarget[Inner]): Behavior[Inner] = + target(ctx, signal) /** * @return `true` if this behavior logically the same as another behavior interceptor and can therefore be eliminated @@ -110,3 +127,43 @@ object BehaviorInterceptor { } } + +/** + * A behavior interceptor allows for intercepting signals reception and perform arbitrary logic - + * transform, filter, send to a side channel etc. + * + * The `BehaviorSignalInterceptor` API is considered a low level tool for building other features and + * shouldn't be used for "normal" application logic. Several built-in intercepting behaviors + * are provided through factories in the respective `Behaviors`. + * + * If the interceptor does keep mutable state care must be taken to create a new instance from + * the factory function of `Behaviors.intercept` so that a new instance is created per spawned + * actor rather than shared among actor instance. + * + * @tparam Inner The inner message type - the type of message the wrapped behavior accepts + * + * @see [[BehaviorInterceptor]] + */ +abstract class BehaviorSignalInterceptor[Inner] extends BehaviorInterceptor[Inner, Inner](null) { + import BehaviorInterceptor._ + + /** + * Only signals and not messages are intercepted by `BehaviorSignalInterceptor`. + */ + final override def aroundReceive( + ctx: TypedActorContext[Inner], + msg: Inner, + target: ReceiveTarget[Inner]): Behavior[Inner] = { + // by using `null` as interceptMessageClass of `BehaviorInterceptor` no messages will pass here + throw new IllegalStateException(s"Unexpected message in ${getClass.getName}, it should only intercept signals.") + } + + /** + * Intercept a signal sent to the running actor. Pass the signal on to the next behavior + * in the stack by passing it to `target.apply`. + * + * @return The behavior for next message or signal + */ + override def aroundSignal(ctx: TypedActorContext[Inner], signal: Signal, target: SignalTarget[Inner]): Behavior[Inner] + +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala index ef6ee879ff..8fa0873813 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala @@ -5,6 +5,8 @@ package akka.actor.typed package internal +import scala.reflect.ClassTag + import akka.util.LineNumbers import akka.annotation.InternalApi import akka.actor.typed.{ TypedActorContext => AC } @@ -40,7 +42,7 @@ private[akka] object BehaviorTags { def as[U]: AC[U] = ctx.asInstanceOf[AC[U]] } - def widened[O, I](behavior: Behavior[I], matcher: PartialFunction[O, I]): Behavior[O] = + def widened[O: ClassTag, I](behavior: Behavior[I], matcher: PartialFunction[O, I]): Behavior[O] = intercept(() => WidenedInterceptor(matcher))(behavior) def same[T]: Behavior[T] = SameBehavior.unsafeCast[T] diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala index c0f116fced..8bb0934c97 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala @@ -4,6 +4,8 @@ package akka.actor.typed.internal +import scala.reflect.ClassTag + import akka.actor.typed import akka.actor.typed.scaladsl.Behaviors @@ -74,9 +76,10 @@ private[akka] final class InterceptorImpl[O, I]( new InterceptorImpl(interceptor, newNested) override def receive(ctx: typed.TypedActorContext[O], msg: O): Behavior[O] = { - val interceptMessageType = interceptor.interceptMessageType + // TODO performance optimization could maybe to avoid isAssignableFrom if interceptMessageClass is Class[Object]? + val interceptMessageClass = interceptor.interceptMessageClass val result = - if (interceptMessageType == null || interceptMessageType.isAssignableFrom(msg.getClass)) + if ((interceptMessageClass ne null) && interceptor.interceptMessageClass.isAssignableFrom(msg.getClass)) interceptor.aroundReceive(ctx, msg, receiveTarget) else receiveTarget.apply(ctx, msg.asInstanceOf[I]) @@ -115,7 +118,8 @@ private[akka] final class InterceptorImpl[O, I]( * INTERNAL API */ @InternalApi -private[akka] final case class MonitorInterceptor[T](actorRef: ActorRef[T]) extends BehaviorInterceptor[T, T] { +private[akka] final case class MonitorInterceptor[T: ClassTag](actorRef: ActorRef[T]) + extends BehaviorInterceptor[T, T] { import BehaviorInterceptor._ override def aroundReceive(ctx: TypedActorContext[T], msg: T, target: ReceiveTarget[T]): Behavior[T] = { @@ -123,10 +127,6 @@ private[akka] final case class MonitorInterceptor[T](actorRef: ActorRef[T]) exte target(ctx, msg) } - override def aroundSignal(ctx: TypedActorContext[T], signal: Signal, target: SignalTarget[T]): Behavior[T] = { - target(ctx, signal) - } - // only once to the same actor in the same behavior stack override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = other match { case MonitorInterceptor(`actorRef`) => true @@ -135,23 +135,32 @@ private[akka] final case class MonitorInterceptor[T](actorRef: ActorRef[T]) exte } +/** + * INTERNAL API + */ +@InternalApi private[akka] object LogMessagesInterceptor { + def apply[T](opts: LogOptions): BehaviorInterceptor[T, T] = { + new LogMessagesInterceptor(opts).asInstanceOf[BehaviorInterceptor[T, T]] + } +} + /** * Log all messages for this decorated ReceiveTarget[T] to logger before receiving it ourselves. * * INTERNAL API */ @InternalApi -private[akka] final case class LogMessagesInterceptor[T](opts: LogOptions) extends BehaviorInterceptor[T, T] { +private[akka] final class LogMessagesInterceptor(val opts: LogOptions) extends BehaviorInterceptor[Any, Any] { import BehaviorInterceptor._ - override def aroundReceive(ctx: TypedActorContext[T], msg: T, target: ReceiveTarget[T]): Behavior[T] = { + override def aroundReceive(ctx: TypedActorContext[Any], msg: Any, target: ReceiveTarget[Any]): Behavior[Any] = { if (opts.enabled) opts.logger.getOrElse(ctx.asScala.log).log(opts.level, "received message {}", msg) target(ctx, msg) } - override def aroundSignal(ctx: TypedActorContext[T], signal: Signal, target: SignalTarget[T]): Behavior[T] = { + override def aroundSignal(ctx: TypedActorContext[Any], signal: Signal, target: SignalTarget[Any]): Behavior[Any] = { if (opts.enabled) opts.logger.getOrElse(ctx.asScala.log).log(opts.level, "received signal {}", signal) target(ctx, signal) @@ -159,8 +168,8 @@ private[akka] final case class LogMessagesInterceptor[T](opts: LogOptions) exten // only once in the same behavior stack override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = other match { - case LogMessagesInterceptor(`opts`) => true - case _ => false + case a: LogMessagesInterceptor => a.opts == opts + case _ => false } } @@ -178,7 +187,7 @@ private[akka] object WidenedInterceptor { * INTERNAL API */ @InternalApi -private[akka] final case class WidenedInterceptor[O, I](matcher: PartialFunction[O, I]) +private[akka] final case class WidenedInterceptor[O: ClassTag, I](matcher: PartialFunction[O, I]) extends BehaviorInterceptor[O, I] { import BehaviorInterceptor._ import WidenedInterceptor._ @@ -202,8 +211,5 @@ private[akka] final case class WidenedInterceptor[O, I](matcher: PartialFunction } } - def aroundSignal(ctx: TypedActorContext[O], signal: Signal, target: SignalTarget[I]): Behavior[I] = - target(ctx, signal) - override def toString: String = s"Widen(${LineNumbers(matcher)})" } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/PoisonPill.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/PoisonPill.scala index a613d94dd8..17406e2f9d 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/PoisonPill.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/PoisonPill.scala @@ -7,6 +7,7 @@ package akka.actor.typed.internal import akka.actor.typed.TypedActorContext import akka.actor.typed.Behavior import akka.actor.typed.BehaviorInterceptor +import akka.actor.typed.BehaviorSignalInterceptor import akka.actor.typed.Signal import akka.annotation.InternalApi @@ -33,12 +34,7 @@ import akka.annotation.InternalApi * application protocol. Persistent actors handle `PoisonPill` and run side effects after persist * and process stashed messages before stopping. */ -@InternalApi private[akka] final class PoisonPillInterceptor[M] extends BehaviorInterceptor[M, M] { - override def aroundReceive( - ctx: TypedActorContext[M], - msg: M, - target: BehaviorInterceptor.ReceiveTarget[M]): Behavior[M] = - target(ctx, msg) +@InternalApi private[akka] final class PoisonPillInterceptor[M] extends BehaviorSignalInterceptor[M] { override def aroundSignal( ctx: TypedActorContext[M], diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala index 5ca5bc696f..872c5bee46 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Supervision.scala @@ -33,11 +33,13 @@ import akka.util.unused strategy match { case r: RestartOrBackoff => - Behaviors.intercept[T, T](() => new RestartSupervisor(initialBehavior, r))(initialBehavior) + Behaviors.intercept[Any, T](() => new RestartSupervisor(initialBehavior, r))(initialBehavior).narrow case r: Resume => - Behaviors.intercept[T, T](() => new ResumeSupervisor(r))(initialBehavior) + // stateless so safe to share + Behaviors.intercept[Any, T](() => new ResumeSupervisor(r))(initialBehavior).narrow case r: Stop => - Behaviors.intercept[T, T](() => new StopSupervisor(initialBehavior, r))(initialBehavior) + // stateless so safe to share + Behaviors.intercept[Any, T](() => new StopSupervisor(initialBehavior, r))(initialBehavior).narrow } } } @@ -46,9 +48,8 @@ import akka.util.unused * INTERNAL API */ @InternalApi -private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: SupervisorStrategy)( - implicit ev: ClassTag[Thr]) - extends BehaviorInterceptor[O, I] { +private abstract class AbstractSupervisor[I, Thr <: Throwable](strategy: SupervisorStrategy)(implicit ev: ClassTag[Thr]) + extends BehaviorInterceptor[Any, I] { private val throwableClass = implicitly[ClassTag[Thr]].runtimeClass @@ -57,18 +58,18 @@ private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: Supe override def isSame(other: BehaviorInterceptor[Any, Any]): Boolean = { other match { - case as: AbstractSupervisor[_, _, Thr] if throwableClass == as.throwableClass => true - case _ => false + case as: AbstractSupervisor[_, Thr] if throwableClass == as.throwableClass => true + case _ => false } } - override def aroundStart(ctx: TypedActorContext[O], target: PreStartTarget[I]): Behavior[I] = { + override def aroundStart(ctx: TypedActorContext[Any], target: PreStartTarget[I]): Behavior[I] = { try { target.start(ctx) } catch handleExceptionOnStart(ctx, target) } - def aroundSignal(ctx: TypedActorContext[O], signal: Signal, target: SignalTarget[I]): Behavior[I] = { + override def aroundSignal(ctx: TypedActorContext[Any], signal: Signal, target: SignalTarget[I]): Behavior[I] = { try { target(ctx, signal) } catch handleSignalException(ctx, target) @@ -92,9 +93,9 @@ private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: Supe ctx.asScala.system.toUntyped.eventStream.publish(Dropped(signalOrMessage, ctx.asScala.self)) } - protected def handleExceptionOnStart(ctx: TypedActorContext[O], target: PreStartTarget[I]): Catcher[Behavior[I]] - protected def handleSignalException(ctx: TypedActorContext[O], target: SignalTarget[I]): Catcher[Behavior[I]] - protected def handleReceiveException(ctx: TypedActorContext[O], target: ReceiveTarget[I]): Catcher[Behavior[I]] + protected def handleExceptionOnStart(ctx: TypedActorContext[Any], target: PreStartTarget[I]): Catcher[Behavior[I]] + protected def handleSignalException(ctx: TypedActorContext[Any], target: SignalTarget[I]): Catcher[Behavior[I]] + protected def handleReceiveException(ctx: TypedActorContext[Any], target: ReceiveTarget[I]): Catcher[Behavior[I]] override def toString: String = Logging.simpleName(getClass) } @@ -103,32 +104,32 @@ private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: Supe * For cases where O == I for BehaviorInterceptor. */ private abstract class SimpleSupervisor[T, Thr <: Throwable: ClassTag](ss: SupervisorStrategy) - extends AbstractSupervisor[T, T, Thr](ss) { + extends AbstractSupervisor[T, Thr](ss) { - override def aroundReceive(ctx: TypedActorContext[T], msg: T, target: ReceiveTarget[T]): Behavior[T] = { + override def aroundReceive(ctx: TypedActorContext[Any], msg: Any, target: ReceiveTarget[T]): Behavior[T] = { try { - target(ctx, msg) + target(ctx, msg.asInstanceOf[T]) } catch handleReceiveException(ctx, target) } - protected def handleException(@unused ctx: TypedActorContext[T]): Catcher[Behavior[T]] = { + protected def handleException(@unused ctx: TypedActorContext[Any]): Catcher[Behavior[T]] = { case NonFatal(t) if isInstanceOfTheThrowableClass(t) => BehaviorImpl.failed(t) } // convenience if target not required to handle exception - protected def handleExceptionOnStart(ctx: TypedActorContext[T], target: PreStartTarget[T]): Catcher[Behavior[T]] = + protected def handleExceptionOnStart(ctx: TypedActorContext[Any], target: PreStartTarget[T]): Catcher[Behavior[T]] = handleException(ctx) - protected def handleSignalException(ctx: TypedActorContext[T], target: SignalTarget[T]): Catcher[Behavior[T]] = + protected def handleSignalException(ctx: TypedActorContext[Any], target: SignalTarget[T]): Catcher[Behavior[T]] = handleException(ctx) - protected def handleReceiveException(ctx: TypedActorContext[T], target: ReceiveTarget[T]): Catcher[Behavior[T]] = + protected def handleReceiveException(ctx: TypedActorContext[Any], target: ReceiveTarget[T]): Catcher[Behavior[T]] = handleException(ctx) } private class StopSupervisor[T, Thr <: Throwable: ClassTag](@unused initial: Behavior[T], strategy: Stop) extends SimpleSupervisor[T, Thr](strategy) { - override def handleException(ctx: TypedActorContext[T]): Catcher[Behavior[T]] = { + override def handleException(ctx: TypedActorContext[Any]): Catcher[Behavior[T]] = { case NonFatal(t) if isInstanceOfTheThrowableClass(t) => log(ctx, t) BehaviorImpl.failed(t) @@ -136,7 +137,7 @@ private class StopSupervisor[T, Thr <: Throwable: ClassTag](@unused initial: Beh } private class ResumeSupervisor[T, Thr <: Throwable: ClassTag](ss: Resume) extends SimpleSupervisor[T, Thr](ss) { - override protected def handleException(ctx: TypedActorContext[T]): Catcher[Behavior[T]] = { + override protected def handleException(ctx: TypedActorContext[Any]): Catcher[Behavior[T]] = { case NonFatal(t) if isInstanceOfTheThrowableClass(t) => log(ctx, t) t match { @@ -166,13 +167,13 @@ private object RestartSupervisor { } } - final case class ScheduledRestart(owner: RestartSupervisor[_, _, _ <: Throwable]) extends DeadLetterSuppression - final case class ResetRestartCount(current: Int, owner: RestartSupervisor[_, _, _ <: Throwable]) + final case class ScheduledRestart(owner: RestartSupervisor[_, _ <: Throwable]) extends DeadLetterSuppression + final case class ResetRestartCount(current: Int, owner: RestartSupervisor[_, _ <: Throwable]) extends DeadLetterSuppression } -private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behavior[T], strategy: RestartOrBackoff) - extends AbstractSupervisor[O, T, Thr](strategy) { +private class RestartSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior[T], strategy: RestartOrBackoff) + extends AbstractSupervisor[T, Thr](strategy) { import RestartSupervisor._ private var restartingInProgress: OptionVal[(StashBuffer[Any], Set[ActorRef[Nothing]])] = OptionVal.None @@ -185,7 +186,7 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav case OptionVal.Some(d) => d.hasTimeLeft } - override def aroundSignal(ctx: TypedActorContext[O], signal: Signal, target: SignalTarget[T]): Behavior[T] = { + override def aroundSignal(ctx: TypedActorContext[Any], signal: Signal, target: SignalTarget[T]): Behavior[T] = { restartingInProgress match { case OptionVal.None => super.aroundSignal(ctx, signal, target) @@ -210,8 +211,8 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav } } - override def aroundReceive(ctx: TypedActorContext[O], msg: O, target: ReceiveTarget[T]): Behavior[T] = { - msg.asInstanceOf[Any] match { + override def aroundReceive(ctx: TypedActorContext[Any], msg: Any, target: ReceiveTarget[T]): Behavior[T] = { + msg match { case ScheduledRestart(owner) => if (owner eq this) { restartingInProgress match { @@ -259,7 +260,7 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav } override protected def handleExceptionOnStart( - ctx: TypedActorContext[O], + ctx: TypedActorContext[Any], @unused target: PreStartTarget[T]): Catcher[Behavior[T]] = { case NonFatal(t) if isInstanceOfTheThrowableClass(t) => ctx.asScala.cancelAllTimers() @@ -278,23 +279,23 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav } override protected def handleSignalException( - ctx: TypedActorContext[O], + ctx: TypedActorContext[Any], target: SignalTarget[T]): Catcher[Behavior[T]] = { handleException(ctx, signalRestart = { - case e: UnstashException[O] @unchecked => Behavior.interpretSignal(e.behavior, ctx, PreRestart) - case _ => target(ctx, PreRestart) + case e: UnstashException[Any] @unchecked => Behavior.interpretSignal(e.behavior, ctx, PreRestart) + case _ => target(ctx, PreRestart) }) } override protected def handleReceiveException( - ctx: TypedActorContext[O], + ctx: TypedActorContext[Any], target: ReceiveTarget[T]): Catcher[Behavior[T]] = { handleException(ctx, signalRestart = { - case e: UnstashException[O] @unchecked => Behavior.interpretSignal(e.behavior, ctx, PreRestart) - case _ => target.signalRestart(ctx) + case e: UnstashException[Any] @unchecked => Behavior.interpretSignal(e.behavior, ctx, PreRestart) + case _ => target.signalRestart(ctx) }) } - private def handleException(ctx: TypedActorContext[O], signalRestart: Throwable => Unit): Catcher[Behavior[T]] = { + private def handleException(ctx: TypedActorContext[Any], signalRestart: Throwable => Unit): Catcher[Behavior[T]] = { case NonFatal(t) if isInstanceOfTheThrowableClass(t) => ctx.asScala.cancelAllTimers() if (strategy.maxRestarts != -1 && restartCount >= strategy.maxRestarts && deadlineHasTimeLeft) { @@ -315,7 +316,7 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav } } - private def prepareRestart(ctx: TypedActorContext[O], reason: Throwable): Behavior[T] = { + private def prepareRestart(ctx: TypedActorContext[Any], reason: Throwable): Behavior[T] = { log(ctx, reason) val currentRestartCount = restartCount @@ -334,7 +335,7 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav val restartDelay = calculateDelay(currentRestartCount, backoff.minBackoff, backoff.maxBackoff, backoff.randomFactor) gotScheduledRestart = false - ctx.asScala.scheduleOnce(restartDelay, ctx.asScala.self.unsafeUpcast[Any], ScheduledRestart(this)) + ctx.asScala.scheduleOnce(restartDelay, ctx.asScala.self, ScheduledRestart(this)) Behaviors.empty case _: Restart => if (childrenToStop.isEmpty) @@ -344,17 +345,14 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav } } - private def restartCompleted(ctx: TypedActorContext[O]): Behavior[T] = { + private def restartCompleted(ctx: TypedActorContext[Any]): Behavior[T] = { // probably already done, but doesn't hurt to make sure they are canceled ctx.asScala.cancelAllTimers() strategy match { case backoff: Backoff => gotScheduledRestart = false - ctx.asScala.scheduleOnce( - backoff.resetBackoffAfter, - ctx.asScala.self.unsafeUpcast[Any], - ResetRestartCount(restartCount, this)) + ctx.asScala.scheduleOnce(backoff.resetBackoffAfter, ctx.asScala.self, ResetRestartCount(restartCount, this)) case _: Restart => } @@ -364,12 +362,12 @@ private class RestartSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav case OptionVal.None => newBehavior case OptionVal.Some((stashBuffer, _)) => restartingInProgress = OptionVal.None - stashBuffer.unstashAll(ctx.asScala.asInstanceOf[scaladsl.ActorContext[Any]], newBehavior.unsafeCast) + stashBuffer.unstashAll(ctx.asScala, newBehavior.unsafeCast) } nextBehavior.narrow } catch handleException(ctx, signalRestart = { - case e: UnstashException[O] @unchecked => Behavior.interpretSignal(e.behavior, ctx, PreRestart) - case _ => () + case e: UnstashException[Any] @unchecked => Behavior.interpretSignal(e.behavior, ctx, PreRestart) + case _ => () }) } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/WithMdcBehaviorInterceptor.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/WithMdcBehaviorInterceptor.scala index 0d6c39e8e8..35aa728f5e 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/WithMdcBehaviorInterceptor.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/WithMdcBehaviorInterceptor.scala @@ -7,8 +7,8 @@ package akka.actor.typed.internal import akka.actor.typed.internal.adapter.AbstractLogger import akka.actor.typed.{ Behavior, BehaviorInterceptor, Signal, TypedActorContext } import akka.annotation.InternalApi - import scala.collection.immutable.HashMap +import scala.reflect.ClassTag /** * INTERNAL API @@ -16,7 +16,7 @@ import scala.collection.immutable.HashMap @InternalApi private[akka] object WithMdcBehaviorInterceptor { val noMdcPerMessage = (_: Any) => Map.empty[String, Any] - def apply[T]( + def apply[T: ClassTag]( staticMdc: Map[String, Any], mdcForMessage: T => Map[String, Any], behavior: Behavior[T]): Behavior[T] = { @@ -31,7 +31,7 @@ import scala.collection.immutable.HashMap * * INTERNAL API */ -@InternalApi private[akka] final class WithMdcBehaviorInterceptor[T] private ( +@InternalApi private[akka] final class WithMdcBehaviorInterceptor[T: ClassTag] private ( staticMdc: Map[String, Any], mdcForMessage: T => Map[String, Any]) extends BehaviorInterceptor[T, T] { diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/GuardianStartupBehavior.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/GuardianStartupBehavior.scala index 56044cb2c0..cb294e9266 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/GuardianStartupBehavior.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/GuardianStartupBehavior.scala @@ -9,7 +9,6 @@ import akka.actor.typed.BehaviorInterceptor import akka.actor.typed.Signal import akka.actor.typed.TypedActorContext import akka.actor.typed.scaladsl.AbstractBehavior -import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.StashBuffer import akka.annotation.InternalApi @@ -34,21 +33,17 @@ private[akka] final class GuardianStartupBehavior[T](val guardianBehavior: Behav import GuardianStartupBehavior.Start - private val stash = StashBuffer[T](1000) + private val stash = StashBuffer[Any](1000) override def onMessage(msg: Any): Behavior[Any] = msg match { case Start => // ctx is not available initially so we cannot use it until here - Behaviors.setup( - ctx => - stash - .unstashAll( - ctx.asInstanceOf[ActorContext[T]], - Behaviors.intercept(() => new GuardianStopInterceptor[T])(guardianBehavior)) - .unsafeCast[Any]) + Behaviors.setup(ctx => + stash + .unstashAll(ctx, Behaviors.intercept(() => new GuardianStopInterceptor)(guardianBehavior.unsafeCast[Any]))) case other => - stash.stash(other.asInstanceOf[T]) + stash.stash(other) this } @@ -61,24 +56,24 @@ private[akka] final class GuardianStartupBehavior[T](val guardianBehavior: Behav * as part of that we must intercept when the guardian is stopped and call ActorSystem.terminate() * explicitly. */ -@InternalApi private[akka] final class GuardianStopInterceptor[T] extends BehaviorInterceptor[T, T] { +@InternalApi private[akka] final class GuardianStopInterceptor extends BehaviorInterceptor[Any, Any] { override def aroundReceive( - ctx: TypedActorContext[T], - msg: T, - target: BehaviorInterceptor.ReceiveTarget[T]): Behavior[T] = { + ctx: TypedActorContext[Any], + msg: Any, + target: BehaviorInterceptor.ReceiveTarget[Any]): Behavior[Any] = { val next = target(ctx, msg) interceptStopped(ctx, next) } override def aroundSignal( - ctx: TypedActorContext[T], + ctx: TypedActorContext[Any], signal: Signal, - target: BehaviorInterceptor.SignalTarget[T]): Behavior[T] = { + target: BehaviorInterceptor.SignalTarget[Any]): Behavior[Any] = { val next = target(ctx, signal) interceptStopped(ctx, next) } - private def interceptStopped(ctx: TypedActorContext[T], next: Behavior[T]): Behavior[T] = { + private def interceptStopped(ctx: TypedActorContext[Any], next: Behavior[Any]): Behavior[Any] = { if (Behavior.isAlive(next)) next else { diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala index c3dc2f8039..e3f6916faf 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala @@ -182,9 +182,15 @@ object Behaviors { * monitor [[akka.actor.typed.ActorRef]] before invoking the wrapped behavior. The * wrapped behavior can evolve (i.e. return different behavior) without needing to be * wrapped in a `monitor` call again. + * + * @param interceptMessageClass Ensures that the messages of this class or a subclass thereof will be + * sent to the `monitor`. Other message types (e.g. a private protocol) + * will bypass the interceptor and be continue to the inner behavior. + * @param monitor The messages will also be sent to this `ActorRef` + * @param behavior The inner behavior that is decorated */ - def monitor[T](monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] = - scaladsl.Behaviors.monitor(monitor, behavior) + def monitor[T](interceptMessageClass: Class[T], monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] = + scaladsl.Behaviors.monitor(monitor, behavior)(ClassTag(interceptMessageClass)) /** * Behavior decorator that logs all messages to the [[akka.actor.typed.Behavior]] using the provided @@ -269,6 +275,9 @@ object Behaviors { * }}} * * + * @param interceptMessageClass Ensures that only messages of this class or a subclass thereof will be + * intercepted. Other message types (e.g. a private protocol) will bypass + * the interceptor and be continue to the inner behavior untouched. * @param behavior * the behavior that will receive the selected messages * @param selector @@ -276,8 +285,11 @@ object Behaviors { * transformation * @return a behavior of the widened type */ - def widened[T, U](behavior: Behavior[T], selector: JFunction[PFBuilder[U, T], PFBuilder[U, T]]): Behavior[U] = - BehaviorImpl.widened(behavior, selector.apply(new PFBuilder).build()) + def widened[Outer, Inner]( + interceptMessageClass: Class[Outer], + behavior: Behavior[Outer], + selector: JFunction[PFBuilder[Inner, Outer], PFBuilder[Inner, Outer]]): Behavior[Inner] = + BehaviorImpl.widened(behavior, selector.apply(new PFBuilder).build())(ClassTag(interceptMessageClass)) /** * Support for scheduled `self` messages in an actor. @@ -292,6 +304,9 @@ object Behaviors { /** * Per message MDC (Mapped Diagnostic Context) logging. * + * @param interceptMessageClass Ensures that only messages of this class or a subclass thereof will be + * intercepted. Other message types (e.g. a private protocol) will bypass + * the interceptor and be continue to the inner behavior untouched. * @param mdcForMessage Is invoked before each message is handled, allowing to setup MDC, MDC is cleared after * each message processing by the inner behavior is done. * @param behavior The actual behavior handling the messages, the MDC is used for the log entries logged through @@ -300,21 +315,28 @@ object Behaviors { * See also [[akka.actor.typed.Logger.withMdc]] */ def withMdc[T]( + interceptMessageClass: Class[T], mdcForMessage: akka.japi.function.Function[T, java.util.Map[String, Any]], behavior: Behavior[T]): Behavior[T] = - withMdc(Collections.emptyMap[String, Any], mdcForMessage, behavior) + withMdc(interceptMessageClass, Collections.emptyMap[String, Any], mdcForMessage, behavior) /** * Static MDC (Mapped Diagnostic Context) * + * @param interceptMessageClass Ensures that only messages of this class or a subclass thereof will be + * intercepted. Other message types (e.g. a private protocol) will bypass + * the interceptor and be continue to the inner behavior untouched. * @param staticMdc This MDC is setup in the logging context for every message * @param behavior The actual behavior handling the messages, the MDC is used for the log entries logged through * `ActorContext.log` * * See also [[akka.actor.typed.Logger.withMdc]] */ - def withMdc[T](staticMdc: java.util.Map[String, Any], behavior: Behavior[T]): Behavior[T] = - withMdc(staticMdc, null, behavior) + def withMdc[T]( + interceptMessageClass: Class[T], + staticMdc: java.util.Map[String, Any], + behavior: Behavior[T]): Behavior[T] = + withMdc(interceptMessageClass, staticMdc, null, behavior) /** * Combination of static and per message MDC (Mapped Diagnostic Context). @@ -325,6 +347,9 @@ object Behaviors { * * * The `staticMdc` or `mdcForMessage` may be empty. * + * @param interceptMessageClass Ensures that only messages of this class or a subclass thereof will be + * intercepted. Other message types (e.g. a private protocol) will bypass + * the interceptor and be continue to the inner behavior untouched. * @param staticMdc A static MDC applied for each message * @param mdcForMessage Is invoked before each message is handled, allowing to setup MDC, MDC is cleared after * each message processing by the inner behavior is done. @@ -334,6 +359,7 @@ object Behaviors { * See also [[akka.actor.typed.Logger.withMdc]] */ def withMdc[T]( + interceptMessageClass: Class[T], staticMdc: java.util.Map[String, Any], mdcForMessage: akka.japi.function.Function[T, java.util.Map[String, Any]], behavior: Behavior[T]): Behavior[T] = { @@ -349,7 +375,7 @@ object Behaviors { asScalaMap(mdcForMessage.apply(message)) } - WithMdcBehaviorInterceptor[T](asScalaMap(staticMdc), mdcForMessageFun, behavior) + WithMdcBehaviorInterceptor[T](asScalaMap(staticMdc), mdcForMessageFun, behavior)(ClassTag(interceptMessageClass)) } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala index 3ab7e8b204..f86e53783b 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala @@ -156,8 +156,15 @@ object Behaviors { * monitor [[akka.actor.typed.ActorRef]] before invoking the wrapped behavior. The * wrapped behavior can evolve (i.e. return different behavior) without needing to be * wrapped in a `monitor` call again. + * + * The `ClassTag` for `T` ensures that the messages of this class or a subclass thereof will be + * sent to the `monitor`. Other message types (e.g. a private protocol) will bypass the interceptor + * and be continue to the inner behavior. + * + * @param monitor The messages will also be sent to this `ActorRef` + * @param behavior The inner behavior that is decorated */ - def monitor[T](monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] = + def monitor[T: ClassTag](monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] = BehaviorImpl.intercept(() => new MonitorInterceptor[T](monitor))(behavior) /** @@ -166,7 +173,7 @@ object Behaviors { * To include an MDC context then first wrap `logMessages` with `withMDC`. */ def logMessages[T](behavior: Behavior[T]): Behavior[T] = - BehaviorImpl.intercept(() => new LogMessagesInterceptor[T](LogOptions()))(behavior) + BehaviorImpl.intercept(() => LogMessagesInterceptor[T](LogOptions()))(behavior) /** * Behavior decorator that logs all messages to the [[akka.actor.typed.Behavior]] using the provided @@ -174,7 +181,7 @@ object Behaviors { * To include an MDC context then first wrap `logMessages` with `withMDC`. */ def logMessages[T](logOptions: LogOptions, behavior: Behavior[T]): Behavior[T] = - BehaviorImpl.intercept(() => new LogMessagesInterceptor[T](logOptions))(behavior) + BehaviorImpl.intercept(() => LogMessagesInterceptor[T](logOptions))(behavior) /** * Wrap the given behavior with the given [[SupervisorStrategy]] for @@ -227,6 +234,10 @@ object Behaviors { /** * Per message MDC (Mapped Diagnostic Context) logging. * + * The `ClassTag` for `T` ensures that only messages of this class or a subclass thereof will be + * intercepted. Other message types (e.g. a private protocol) will bypass the interceptor and be + * continue to the inner behavior untouched. + * * @param mdcForMessage Is invoked before each message is handled, allowing to setup MDC, MDC is cleared after * each message processing by the inner behavior is done. * @param behavior The actual behavior handling the messages, the MDC is used for the log entries logged through @@ -234,19 +245,23 @@ object Behaviors { * * See also [[akka.actor.typed.Logger.withMdc]] */ - def withMdc[T](mdcForMessage: T => Map[String, Any])(behavior: Behavior[T]): Behavior[T] = + def withMdc[T: ClassTag](mdcForMessage: T => Map[String, Any])(behavior: Behavior[T]): Behavior[T] = withMdc[T](Map.empty[String, Any], mdcForMessage)(behavior) /** * Static MDC (Mapped Diagnostic Context) * + * The `ClassTag` for `T` ensures that only messages of this class or a subclass thereof will be + * intercepted. Other message types (e.g. a private protocol) will bypass the interceptor and be + * continue to the inner behavior untouched. + * * @param staticMdc This MDC is setup in the logging context for every message * @param behavior The actual behavior handling the messages, the MDC is used for the log entries logged through * `ActorContext.log` * * See also [[akka.actor.typed.Logger.withMdc]] */ - def withMdc[T](staticMdc: Map[String, Any])(behavior: Behavior[T]): Behavior[T] = + def withMdc[T: ClassTag](staticMdc: Map[String, Any])(behavior: Behavior[T]): Behavior[T] = withMdc[T](staticMdc, (_: T) => Map.empty[String, Any])(behavior) /** @@ -258,6 +273,10 @@ object Behaviors { * * The `staticMdc` or `mdcForMessage` may be empty. * + * The `ClassTag` for `T` ensures that only messages of this class or a subclass thereof will be + * intercepted. Other message types (e.g. a private protocol) will bypass the interceptor and be + * continue to the inner behavior untouched. + * * @param staticMdc A static MDC applied for each message * @param mdcForMessage Is invoked before each message is handled, allowing to setup MDC, MDC is cleared after * each message processing by the inner behavior is done. @@ -266,7 +285,7 @@ object Behaviors { * * See also [[akka.actor.typed.Logger.withMdc]] */ - def withMdc[T](staticMdc: Map[String, Any], mdcForMessage: T => Map[String, Any])( + def withMdc[T: ClassTag](staticMdc: Map[String, Any], mdcForMessage: T => Map[String, Any])( behavior: Behavior[T]): Behavior[T] = WithMdcBehaviorInterceptor[T](staticMdc, mdcForMessage, behavior) diff --git a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md index 3193b2e745..4e70180d6d 100644 --- a/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md +++ b/akka-docs/src/main/paradox/project/migration-guide-2.5.x-2.6.x.md @@ -387,7 +387,9 @@ made before finalizing the APIs. Compared to Akka 2.5.x the source incompatible * `ActorContext` parameter removed in `javadsl.ReceiveBuilder` for the functional style in Java. Use `Behaviors.setup` to retrieve `ActorContext`, and use an enclosing class to hold initialization parameters and `ActorContext`. * Java @apidoc[akka.cluster.sharding.typed.javadsl.EntityRef] ask timeout now takes a `java.time.Duration` rather than a @apidoc[Timeout] - +* `BehaviorInterceptor`, `Behaviors.monitor`, `Behaviors.withMdc` and @scala[`widen`]@java[`Behaviors.widen`] takes + a @scala[`ClassTag` parameter (probably source compatible)]@java[`interceptMessageClass` parameter]. + `interceptMessageType` method in `BehaviorInterceptor` is replaced with this @scala[`ClassTag`]@java[`Class`] parameter. #### Akka Typed Stream API changes diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala index 722cda35d5..c7c0da860b 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala @@ -133,10 +133,10 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State]( val onStopInterceptor = new BehaviorInterceptor[Any, Any] { import BehaviorInterceptor._ - def aroundReceive(ctx: typed.TypedActorContext[Any], msg: Any, target: ReceiveTarget[Any]) + override def aroundReceive(ctx: typed.TypedActorContext[Any], msg: Any, target: ReceiveTarget[Any]) : Behavior[Any] = { target(ctx, msg) } - def aroundSignal(ctx: typed.TypedActorContext[Any], signal: Signal, target: SignalTarget[Any]) + override def aroundSignal(ctx: typed.TypedActorContext[Any], signal: Signal, target: SignalTarget[Any]) : Behavior[Any] = { if (signal == PostStop) { eventSourcedSetup.cancelRecoveryTimer() diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java index 9a9ab29368..8dc745807f 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java @@ -493,12 +493,7 @@ public class PersistentActorJavaDslTest extends JUnitSuite { TestProbe interceptProbe = testKit.createTestProbe(); TestProbe signalProbe = testKit.createTestProbe(); BehaviorInterceptor tap = - new BehaviorInterceptor() { - - @Override - public Class interceptMessageType() { - return Command.class; - } + new BehaviorInterceptor(Command.class) { @Override public Behavior aroundReceive( diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorInterceptorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorInterceptorSpec.scala new file mode 100644 index 0000000000..b3da47a87e --- /dev/null +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/EventSourcedBehaviorInterceptorSpec.scala @@ -0,0 +1,119 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package akka.persistence.typed.scaladsl + +import java.util.concurrent.atomic.AtomicInteger + +import akka.actor.testkit.typed.scaladsl._ +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.BehaviorInterceptor +import akka.actor.typed.TypedActorContext +import akka.actor.typed.scaladsl.Behaviors +import akka.persistence.typed.PersistenceId +import akka.testkit.EventFilter +import akka.testkit.TestEvent.Mute +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import org.scalatest.WordSpecLike + +object EventSourcedBehaviorInterceptorSpec { + + val journalId = "event-sourced-behavior-interceptor-spec" + + def config: Config = ConfigFactory.parseString(s""" + akka.loglevel = INFO + akka.loggers = [akka.testkit.TestEventListener] + akka.persistence.journal.plugin = "akka.persistence.journal.inmem" + """) + + def testBehavior(persistenceId: PersistenceId, probe: ActorRef[String]): Behavior[String] = + Behaviors.setup { _ => + EventSourcedBehavior[String, String, String]( + persistenceId, + emptyState = "", + commandHandler = (_, command) => + command match { + case _ => + Effect.persist(command).thenRun(newState => probe ! newState) + }, + eventHandler = (state, evt) => state + evt) + } + +} + +class EventSourcedBehaviorInterceptorSpec + extends ScalaTestWithActorTestKit(EventSourcedBehaviorTimersSpec.config) + with WordSpecLike { + + import EventSourcedBehaviorInterceptorSpec._ + + val pidCounter = new AtomicInteger(0) + private def nextPid(): PersistenceId = PersistenceId(s"c${pidCounter.incrementAndGet()})") + + import akka.actor.typed.scaladsl.adapter._ + // needed for the untyped event filter + private implicit val untypedSystem: akka.actor.ActorSystem = system.toUntyped + + untypedSystem.eventStream.publish(Mute(EventFilter.warning(start = "No default snapshot store", occurrences = 1))) + + "EventSourcedBehavior interceptor" must { + + "be possible to combine with another interceptor" in { + val probe = createTestProbe[String]() + val pid = nextPid() + + val toUpper = new BehaviorInterceptor[String, String] { + override def aroundReceive( + ctx: TypedActorContext[String], + msg: String, + target: BehaviorInterceptor.ReceiveTarget[String]): Behavior[String] = { + target(ctx, msg.toUpperCase()) + } + + } + + val ref = spawn(Behaviors.intercept(() => toUpper)(testBehavior(pid, probe.ref))) + + ref ! "a" + ref ! "bc" + probe.expectMessage("A") + probe.expectMessage("ABC") + } + + "be possible to combine with widen" in { + // EventSourcedBehaviorImpl should use a plain BehaviorInterceptor instead of widen + pending // FIXME #25887 + val probe = createTestProbe[String]() + val pid = nextPid() + val ref = spawn(testBehavior(pid, probe.ref).widen[String] { + case s => s.toUpperCase() + }) + + ref ! "a" + ref ! "bc" + probe.expectMessage("A") + probe.expectMessage("ABC") + } + + "be possible to combine with MDC" in { + val probe = createTestProbe[String]() + val pid = nextPid() + val ref = spawn(Behaviors.setup[String] { _ => + Behaviors + .withMdc(staticMdc = Map("pid" -> pid), mdcForMessage = (msg: String) => Map("msg" -> msg.toUpperCase())) { + testBehavior(pid, probe.ref) + } + }) + + ref ! "a" + ref ! "bc" + probe.expectMessage("a") + probe.expectMessage("abc") + + } + + } +}