From 463cdfe2a6122d57d763b29041cafee091eac2dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Wed, 9 Jan 2019 20:15:24 +0100 Subject: [PATCH] Intercept subset of messages #25727 --- .../actor/typed/javadsl/InterceptTest.java | 86 +++++++ .../akka/actor/typed/InterceptSpec.scala | 210 ++++++++++-------- .../actor/typed/BehaviorInterceptor.scala | 11 + .../typed/internal/InterceptorImpl.scala | 11 +- .../javadsl/PersistentActorJavaDslTest.java | 6 + 5 files changed, 233 insertions(+), 91 deletions(-) create mode 100644 akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/InterceptTest.java diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/InterceptTest.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/InterceptTest.java new file mode 100644 index 0000000000..963c7459c4 --- /dev/null +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/InterceptTest.java @@ -0,0 +1,86 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.actor.typed.javadsl; + +import akka.actor.testkit.typed.javadsl.TestKitJunitResource; +import akka.actor.testkit.typed.javadsl.TestProbe; +import akka.actor.typed.*; +import akka.testkit.AkkaSpec; +import org.junit.ClassRule; +import org.junit.Test; +import org.scalatest.junit.JUnitSuite; + +public class InterceptTest extends JUnitSuite { + + @ClassRule + public static final TestKitJunitResource testKit = new TestKitJunitResource(AkkaSpec.testConf()); + + @Test + public void interceptMessage() { + final TestProbe interceptProbe = testKit.createTestProbe(); + BehaviorInterceptor interceptor = new BehaviorInterceptor() { + @Override + public Behavior aroundReceive(TypedActorContext ctx, String msg, ReceiveTarget target) { + interceptProbe.getRef().tell(msg); + return target.apply(ctx, msg); + } + + @Override + public Behavior aroundSignal(TypedActorContext ctx, Signal signal, SignalTarget target) { + return target.apply(ctx, signal); + } + }; + + final TestProbe probe = testKit.createTestProbe(); + ActorRef ref = testKit.spawn(Behaviors.intercept(interceptor, Behaviors.receiveMessage((String msg) -> { + probe.getRef().tell(msg); + return Behaviors.same(); + }))); + ref.tell("Hello"); + + interceptProbe.expectMessage("Hello"); + probe.expectMessage("Hello"); + } + + interface Message {} + static class A implements Message {} + static class B implements Message {} + + @Test + public void interceptMessagesSelectively() { + final TestProbe interceptProbe = testKit.createTestProbe(); + BehaviorInterceptor interceptor = new BehaviorInterceptor() { + + @Override + public Class interceptMessageType() { + return B.class; + } + + @Override + public Behavior aroundReceive(TypedActorContext ctx, Message msg, ReceiveTarget target) { + interceptProbe.getRef().tell(msg); + return target.apply(ctx, msg); + } + + @Override + public Behavior aroundSignal(TypedActorContext ctx, Signal signal, SignalTarget target) { + return target.apply(ctx, signal); + } + }; + + final TestProbe probe = testKit.createTestProbe(); + ActorRef ref = testKit.spawn(Behaviors.intercept(interceptor, Behaviors.receiveMessage((Message msg) -> { + probe.getRef().tell(msg); + return Behaviors.same(); + }))); + ref.tell(new A()); + ref.tell(new B()); + + probe.expectMessageClass(A.class); + interceptProbe.expectMessageClass(B.class); + probe.expectMessageClass(B.class); + } + +} diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/InterceptSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/InterceptSpec.scala index 68c7a5b305..0ff14a8220 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/InterceptSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/InterceptSpec.scala @@ -10,8 +10,8 @@ import akka.testkit.EventFilter import akka.actor.testkit.typed.scaladsl.TestProbe import akka.actor.typed.scaladsl.Behaviors import org.scalatest.WordSpecLike -import scala.concurrent.duration._ +import scala.concurrent.duration._ import akka.actor.ActorInitializationException import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit @@ -46,7 +46,7 @@ class InterceptSpec extends ScalaTestWithActorTestKit( // keeping the instance equality as "isSame" for these } - "Intercept" should { + "Intercept" must { "intercept messages" in { val probe = TestProbe[String]() @@ -198,115 +198,149 @@ class InterceptSpec extends ScalaTestWithActorTestKit( innerBehaviorStarted.get should ===(false) } - } + "intercept with nested setup" in { + val probe = TestProbe[String]() + val interceptor = snitchingInterceptor(probe.ref) - "intercept with nested setup" in { - val probe = TestProbe[String]() - val interceptor = snitchingInterceptor(probe.ref) - - val ref: ActorRef[String] = spawn(Behaviors.intercept(interceptor)( - Behaviors.setup { _ ⇒ - var count = 0 - Behaviors.receiveMessage[String] { m ⇒ - count += 1 - probe.ref ! s"actual behavior $m-$count" - Behaviors.same - } - } - )) - - ref ! "a" - probe.expectMessage("before a") - probe.expectMessage("actual behavior a-1") - probe.expectMessage("after a") - - ref ! "b" - probe.expectMessage("before b") - probe.expectMessage("actual behavior b-2") - probe.expectMessage("after b") - } - - "intercept with recursivly setup" in { - val probe = TestProbe[String]() - val interceptor = snitchingInterceptor(probe.ref) - - def next(count1: Int): Behavior[String] = { - Behaviors.intercept(interceptor)( + val ref: ActorRef[String] = spawn(Behaviors.intercept(interceptor)( Behaviors.setup { _ ⇒ - var count2 = 0 + var count = 0 Behaviors.receiveMessage[String] { m ⇒ - count2 += 1 - probe.ref ! s"actual behavior $m-$count1-$count2" - next(count1 + 1) + count += 1 + probe.ref ! s"actual behavior $m-$count" + Behaviors.same } } - ) + )) + + ref ! "a" + probe.expectMessage("before a") + probe.expectMessage("actual behavior a-1") + probe.expectMessage("after a") + + ref ! "b" + probe.expectMessage("before b") + probe.expectMessage("actual behavior b-2") + probe.expectMessage("after b") } - val ref: ActorRef[String] = spawn(next(1)) + "intercept with recursivly setup" in { + val probe = TestProbe[String]() + val interceptor = snitchingInterceptor(probe.ref) - ref ! "a" - probe.expectMessage("before a") - probe.expectMessage("actual behavior a-1-1") - probe.expectMessage("after a") + def next(count1: Int): Behavior[String] = { + Behaviors.intercept(interceptor)( + Behaviors.setup { _ ⇒ + var count2 = 0 + Behaviors.receiveMessage[String] { m ⇒ + count2 += 1 + probe.ref ! s"actual behavior $m-$count1-$count2" + next(count1 + 1) + } + } + ) + } - ref ! "b" - probe.expectMessage("before b") - probe.expectMessage("actual behavior b-2-1") - probe.expectMessage("after b") + val ref: ActorRef[String] = spawn(next(1)) - ref ! "c" - probe.expectMessage("before c") - probe.expectMessage("actual behavior c-3-1") - probe.expectMessage("after c") - } + ref ! "a" + probe.expectMessage("before a") + probe.expectMessage("actual behavior a-1-1") + probe.expectMessage("after a") - "not allow intercept setup(same)" in { - val probe = TestProbe[String]() - val interceptor = snitchingInterceptor(probe.ref) + ref ! "b" + probe.expectMessage("before b") + probe.expectMessage("actual behavior b-2-1") + probe.expectMessage("after b") + + ref ! "c" + probe.expectMessage("before c") + probe.expectMessage("actual behavior c-3-1") + probe.expectMessage("after c") + } + + "not allow intercept setup(same)" in { + val probe = TestProbe[String]() + val interceptor = snitchingInterceptor(probe.ref) + + EventFilter[ActorInitializationException](occurrences = 1).intercept { + val ref = spawn(Behaviors.intercept(interceptor)( + Behaviors.setup[String] { _ ⇒ Behaviors.same[String] })) + probe.expectTerminated(ref, probe.remainingOrDefault) + } + + } + + "be useful for implementing PoisonPill" in { + + def inner(count: Int): Behavior[Msg] = Behaviors.receiveMessage { + case Msg(hello, replyTo) ⇒ + replyTo ! s"$hello-$count" + inner(count + 1) + } + + val poisonInterceptor = new BehaviorInterceptor[Any, Msg] { + override def aroundReceive(context: TypedActorContext[Any], message: Any, target: ReceiveTarget[Msg]): Behavior[Msg] = + message match { + case MyPoisonPill ⇒ Behaviors.stopped + case m: Msg ⇒ target(context, m) + case _ ⇒ Behaviors.unhandled + } + + override def aroundSignal(context: TypedActorContext[Any], signal: Signal, target: SignalTarget[Msg]): Behavior[Msg] = + target.apply(context, signal) + + } + + val decorated: Behavior[Msg] = + Behaviors.intercept(poisonInterceptor)(inner(0)).narrow + + val ref = spawn(decorated) + val probe = TestProbe[String]() + ref ! Msg("hello", probe.ref) + probe.expectMessage("hello-0") + ref ! Msg("hello", probe.ref) + probe.expectMessage("hello-1") + + ref.unsafeUpcast[Any] ! MyPoisonPill - EventFilter[ActorInitializationException](occurrences = 1).intercept { - val ref = spawn(Behaviors.intercept(interceptor)( - Behaviors.setup[String] { _ ⇒ Behaviors.same[String] })) probe.expectTerminated(ref, probe.remainingOrDefault) } - } + "be able to intercept a subset of the messages" in { + trait Message + class A extends Message + class B extends Message - "be useful for implementing PoisonPill" in { + val interceptProbe = TestProbe[Message]() - def inner(count: Int): Behavior[Msg] = Behaviors.receiveMessage { - case Msg(hello, replyTo) ⇒ - replyTo ! s"$hello-$count" - inner(count + 1) - } + val partialInterceptor: BehaviorInterceptor[Message, Message] = new BehaviorInterceptor[Message, Message] { - val poisonInterceptor = new BehaviorInterceptor[Any, Msg] { - override def aroundReceive(context: TypedActorContext[Any], message: Any, target: ReceiveTarget[Msg]): Behavior[Msg] = - message match { - case MyPoisonPill ⇒ Behaviors.stopped - case m: Msg ⇒ target(context, m) - case _ ⇒ Behaviors.unhandled + override def interceptMessageType = classOf[B] + + override def aroundReceive(ctx: TypedActorContext[Message], msg: Message, target: ReceiveTarget[Message]): Behavior[Message] = { + interceptProbe.ref ! msg + target(ctx, msg) } - override def aroundSignal(context: TypedActorContext[Any], signal: Signal, target: SignalTarget[Msg]): Behavior[Msg] = - target.apply(context, signal) + override def aroundSignal(ctx: TypedActorContext[Message], signal: Signal, target: SignalTarget[Message]): Behavior[Message] = + target(ctx, signal) + } + val probe = TestProbe[Message]() + val ref = spawn(Behaviors.intercept(partialInterceptor)(Behaviors.receiveMessage { msg ⇒ + probe.ref ! msg + Behaviors.same + })) + + ref ! new A + ref ! new B + + probe.expectMessageType[A] + interceptProbe.expectMessageType[B] + probe.expectMessageType[B] } - val decorated: Behavior[Msg] = - Behaviors.intercept(poisonInterceptor)(inner(0)).narrow - - val ref = spawn(decorated) - val probe = TestProbe[String]() - ref ! Msg("hello", probe.ref) - probe.expectMessage("hello-0") - ref ! Msg("hello", probe.ref) - probe.expectMessage("hello-1") - - ref.unsafeUpcast[Any] ! MyPoisonPill - - probe.expectTerminated(ref, probe.remainingOrDefault) } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala index e04ede78a0..22e3758596 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/BehaviorInterceptor.scala @@ -17,6 +17,17 @@ import akka.annotation.{ DoNotInherit, InternalApi } abstract class BehaviorInterceptor[O, I] { import BehaviorInterceptor._ + /** + * Allows for applying the interceptor only to certain message types. Useful if the official protocol and the actual + * protocol of an actor causes problems, for example class cast exceptions for a message not of type `O` that + * the actor still knows how to deal with. Note that this is only possible to use when `O` and `I` are the same type. + * + * @return A subtype of `O` that should be intercepted or `null` to intercept all `O`s. + * Subtypes of `O` matching this are passed directly to the inner behavior without interception. + */ + // null for all to avoid having to deal with class tag/explicit class in the default case of no filter + def interceptMessageType: Class[_ <: O] = null + /** * Override to intercept actor startup. To trigger startup of * the next behavior in the stack, call `target.start()`. diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala index 2b20599317..bee82a469f 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/InterceptorImpl.scala @@ -7,7 +7,7 @@ package akka.actor.typed.internal import akka.actor.typed import akka.actor.typed.Behavior.{ SameBehavior, UnhandledBehavior } import akka.actor.typed.internal.TimerSchedulerImpl.TimerMsg -import akka.actor.typed.{ TypedActorContext, ActorRef, Behavior, BehaviorInterceptor, ExtensibleBehavior, PreRestart, Signal } +import akka.actor.typed.{ ActorRef, Behavior, BehaviorInterceptor, ExtensibleBehavior, PreRestart, Signal, TypedActorContext } import akka.annotation.InternalApi import akka.util.LineNumbers @@ -67,8 +67,13 @@ private[akka] final class InterceptorImpl[O, I](val interceptor: BehaviorInterce new InterceptorImpl(interceptor, newNested) override def receive(ctx: typed.TypedActorContext[O], msg: O): Behavior[O] = { - val interceptedResult = interceptor.aroundReceive(ctx, msg, receiveTarget) - deduplicate(interceptedResult, ctx) + val interceptMessageType = interceptor.interceptMessageType + val result = + if (interceptMessageType == null || interceptMessageType.isAssignableFrom(msg.getClass)) + interceptor.aroundReceive(ctx, msg, receiveTarget) + else + receiveTarget.apply(ctx, msg.asInstanceOf[I]) + deduplicate(result, ctx) } override def receiveSignal(ctx: typed.TypedActorContext[O], signal: Signal): Behavior[O] = { diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java index 196c097d48..09992b368b 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorJavaDslTest.java @@ -451,6 +451,12 @@ public class PersistentActorJavaDslTest extends JUnitSuite { TestProbe interceptProbe = testKit.createTestProbe(); TestProbe signalProbe = testKit.createTestProbe(); BehaviorInterceptor tap = new BehaviorInterceptor() { + + @Override + public Class interceptMessageType() { + return Command.class; + } + @Override public Behavior aroundReceive(TypedActorContext ctx, Command msg, ReceiveTarget target) { interceptProbe.ref().tell(msg);