Intercept subset of messages #25727
This commit is contained in:
parent
cc19367588
commit
463cdfe2a6
5 changed files with 233 additions and 91 deletions
|
|
@ -0,0 +1,86 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.actor.typed.javadsl;
|
||||
|
||||
import akka.actor.testkit.typed.javadsl.TestKitJunitResource;
|
||||
import akka.actor.testkit.typed.javadsl.TestProbe;
|
||||
import akka.actor.typed.*;
|
||||
import akka.testkit.AkkaSpec;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.scalatest.junit.JUnitSuite;
|
||||
|
||||
public class InterceptTest extends JUnitSuite {
|
||||
|
||||
@ClassRule
|
||||
public static final TestKitJunitResource testKit = new TestKitJunitResource(AkkaSpec.testConf());
|
||||
|
||||
@Test
|
||||
public void interceptMessage() {
|
||||
final TestProbe<String> interceptProbe = testKit.createTestProbe();
|
||||
BehaviorInterceptor<String, String> interceptor = new BehaviorInterceptor<String, String>() {
|
||||
@Override
|
||||
public Behavior<String> aroundReceive(TypedActorContext<String> ctx, String msg, ReceiveTarget<String> target) {
|
||||
interceptProbe.getRef().tell(msg);
|
||||
return target.apply(ctx, msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Behavior<String> aroundSignal(TypedActorContext<String> ctx, Signal signal, SignalTarget<String> target) {
|
||||
return target.apply(ctx, signal);
|
||||
}
|
||||
};
|
||||
|
||||
final TestProbe<String> probe = testKit.createTestProbe();
|
||||
ActorRef<String> ref = testKit.spawn(Behaviors.intercept(interceptor, Behaviors.receiveMessage((String msg) -> {
|
||||
probe.getRef().tell(msg);
|
||||
return Behaviors.same();
|
||||
})));
|
||||
ref.tell("Hello");
|
||||
|
||||
interceptProbe.expectMessage("Hello");
|
||||
probe.expectMessage("Hello");
|
||||
}
|
||||
|
||||
interface Message {}
|
||||
static class A implements Message {}
|
||||
static class B implements Message {}
|
||||
|
||||
@Test
|
||||
public void interceptMessagesSelectively() {
|
||||
final TestProbe<Message> interceptProbe = testKit.createTestProbe();
|
||||
BehaviorInterceptor<Message, Message> interceptor = new BehaviorInterceptor<Message, Message>() {
|
||||
|
||||
@Override
|
||||
public Class<? extends Message> interceptMessageType() {
|
||||
return B.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Behavior<Message> aroundReceive(TypedActorContext<Message> ctx, Message msg, ReceiveTarget<Message> target) {
|
||||
interceptProbe.getRef().tell(msg);
|
||||
return target.apply(ctx, msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Behavior<Message> aroundSignal(TypedActorContext<Message> ctx, Signal signal, SignalTarget<Message> target) {
|
||||
return target.apply(ctx, signal);
|
||||
}
|
||||
};
|
||||
|
||||
final TestProbe<Message> probe = testKit.createTestProbe();
|
||||
ActorRef<Message> ref = testKit.spawn(Behaviors.intercept(interceptor, Behaviors.receiveMessage((Message msg) -> {
|
||||
probe.getRef().tell(msg);
|
||||
return Behaviors.same();
|
||||
})));
|
||||
ref.tell(new A());
|
||||
ref.tell(new B());
|
||||
|
||||
probe.expectMessageClass(A.class);
|
||||
interceptProbe.expectMessageClass(B.class);
|
||||
probe.expectMessageClass(B.class);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -10,8 +10,8 @@ import akka.testkit.EventFilter
|
|||
import akka.actor.testkit.typed.scaladsl.TestProbe
|
||||
import akka.actor.typed.scaladsl.Behaviors
|
||||
import org.scalatest.WordSpecLike
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import akka.actor.ActorInitializationException
|
||||
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
|
||||
|
||||
|
|
@ -46,7 +46,7 @@ class InterceptSpec extends ScalaTestWithActorTestKit(
|
|||
// keeping the instance equality as "isSame" for these
|
||||
}
|
||||
|
||||
"Intercept" should {
|
||||
"Intercept" must {
|
||||
|
||||
"intercept messages" in {
|
||||
val probe = TestProbe[String]()
|
||||
|
|
@ -198,115 +198,149 @@ class InterceptSpec extends ScalaTestWithActorTestKit(
|
|||
innerBehaviorStarted.get should ===(false)
|
||||
}
|
||||
|
||||
}
|
||||
"intercept with nested setup" in {
|
||||
val probe = TestProbe[String]()
|
||||
val interceptor = snitchingInterceptor(probe.ref)
|
||||
|
||||
"intercept with nested setup" in {
|
||||
val probe = TestProbe[String]()
|
||||
val interceptor = snitchingInterceptor(probe.ref)
|
||||
|
||||
val ref: ActorRef[String] = spawn(Behaviors.intercept(interceptor)(
|
||||
Behaviors.setup { _ ⇒
|
||||
var count = 0
|
||||
Behaviors.receiveMessage[String] { m ⇒
|
||||
count += 1
|
||||
probe.ref ! s"actual behavior $m-$count"
|
||||
Behaviors.same
|
||||
}
|
||||
}
|
||||
))
|
||||
|
||||
ref ! "a"
|
||||
probe.expectMessage("before a")
|
||||
probe.expectMessage("actual behavior a-1")
|
||||
probe.expectMessage("after a")
|
||||
|
||||
ref ! "b"
|
||||
probe.expectMessage("before b")
|
||||
probe.expectMessage("actual behavior b-2")
|
||||
probe.expectMessage("after b")
|
||||
}
|
||||
|
||||
"intercept with recursivly setup" in {
|
||||
val probe = TestProbe[String]()
|
||||
val interceptor = snitchingInterceptor(probe.ref)
|
||||
|
||||
def next(count1: Int): Behavior[String] = {
|
||||
Behaviors.intercept(interceptor)(
|
||||
val ref: ActorRef[String] = spawn(Behaviors.intercept(interceptor)(
|
||||
Behaviors.setup { _ ⇒
|
||||
var count2 = 0
|
||||
var count = 0
|
||||
Behaviors.receiveMessage[String] { m ⇒
|
||||
count2 += 1
|
||||
probe.ref ! s"actual behavior $m-$count1-$count2"
|
||||
next(count1 + 1)
|
||||
count += 1
|
||||
probe.ref ! s"actual behavior $m-$count"
|
||||
Behaviors.same
|
||||
}
|
||||
}
|
||||
)
|
||||
))
|
||||
|
||||
ref ! "a"
|
||||
probe.expectMessage("before a")
|
||||
probe.expectMessage("actual behavior a-1")
|
||||
probe.expectMessage("after a")
|
||||
|
||||
ref ! "b"
|
||||
probe.expectMessage("before b")
|
||||
probe.expectMessage("actual behavior b-2")
|
||||
probe.expectMessage("after b")
|
||||
}
|
||||
|
||||
val ref: ActorRef[String] = spawn(next(1))
|
||||
"intercept with recursivly setup" in {
|
||||
val probe = TestProbe[String]()
|
||||
val interceptor = snitchingInterceptor(probe.ref)
|
||||
|
||||
ref ! "a"
|
||||
probe.expectMessage("before a")
|
||||
probe.expectMessage("actual behavior a-1-1")
|
||||
probe.expectMessage("after a")
|
||||
def next(count1: Int): Behavior[String] = {
|
||||
Behaviors.intercept(interceptor)(
|
||||
Behaviors.setup { _ ⇒
|
||||
var count2 = 0
|
||||
Behaviors.receiveMessage[String] { m ⇒
|
||||
count2 += 1
|
||||
probe.ref ! s"actual behavior $m-$count1-$count2"
|
||||
next(count1 + 1)
|
||||
}
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
ref ! "b"
|
||||
probe.expectMessage("before b")
|
||||
probe.expectMessage("actual behavior b-2-1")
|
||||
probe.expectMessage("after b")
|
||||
val ref: ActorRef[String] = spawn(next(1))
|
||||
|
||||
ref ! "c"
|
||||
probe.expectMessage("before c")
|
||||
probe.expectMessage("actual behavior c-3-1")
|
||||
probe.expectMessage("after c")
|
||||
}
|
||||
ref ! "a"
|
||||
probe.expectMessage("before a")
|
||||
probe.expectMessage("actual behavior a-1-1")
|
||||
probe.expectMessage("after a")
|
||||
|
||||
"not allow intercept setup(same)" in {
|
||||
val probe = TestProbe[String]()
|
||||
val interceptor = snitchingInterceptor(probe.ref)
|
||||
ref ! "b"
|
||||
probe.expectMessage("before b")
|
||||
probe.expectMessage("actual behavior b-2-1")
|
||||
probe.expectMessage("after b")
|
||||
|
||||
ref ! "c"
|
||||
probe.expectMessage("before c")
|
||||
probe.expectMessage("actual behavior c-3-1")
|
||||
probe.expectMessage("after c")
|
||||
}
|
||||
|
||||
"not allow intercept setup(same)" in {
|
||||
val probe = TestProbe[String]()
|
||||
val interceptor = snitchingInterceptor(probe.ref)
|
||||
|
||||
EventFilter[ActorInitializationException](occurrences = 1).intercept {
|
||||
val ref = spawn(Behaviors.intercept(interceptor)(
|
||||
Behaviors.setup[String] { _ ⇒ Behaviors.same[String] }))
|
||||
probe.expectTerminated(ref, probe.remainingOrDefault)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
"be useful for implementing PoisonPill" in {
|
||||
|
||||
def inner(count: Int): Behavior[Msg] = Behaviors.receiveMessage {
|
||||
case Msg(hello, replyTo) ⇒
|
||||
replyTo ! s"$hello-$count"
|
||||
inner(count + 1)
|
||||
}
|
||||
|
||||
val poisonInterceptor = new BehaviorInterceptor[Any, Msg] {
|
||||
override def aroundReceive(context: TypedActorContext[Any], message: Any, target: ReceiveTarget[Msg]): Behavior[Msg] =
|
||||
message match {
|
||||
case MyPoisonPill ⇒ Behaviors.stopped
|
||||
case m: Msg ⇒ target(context, m)
|
||||
case _ ⇒ Behaviors.unhandled
|
||||
}
|
||||
|
||||
override def aroundSignal(context: TypedActorContext[Any], signal: Signal, target: SignalTarget[Msg]): Behavior[Msg] =
|
||||
target.apply(context, signal)
|
||||
|
||||
}
|
||||
|
||||
val decorated: Behavior[Msg] =
|
||||
Behaviors.intercept(poisonInterceptor)(inner(0)).narrow
|
||||
|
||||
val ref = spawn(decorated)
|
||||
val probe = TestProbe[String]()
|
||||
ref ! Msg("hello", probe.ref)
|
||||
probe.expectMessage("hello-0")
|
||||
ref ! Msg("hello", probe.ref)
|
||||
probe.expectMessage("hello-1")
|
||||
|
||||
ref.unsafeUpcast[Any] ! MyPoisonPill
|
||||
|
||||
EventFilter[ActorInitializationException](occurrences = 1).intercept {
|
||||
val ref = spawn(Behaviors.intercept(interceptor)(
|
||||
Behaviors.setup[String] { _ ⇒ Behaviors.same[String] }))
|
||||
probe.expectTerminated(ref, probe.remainingOrDefault)
|
||||
}
|
||||
|
||||
}
|
||||
"be able to intercept a subset of the messages" in {
|
||||
trait Message
|
||||
class A extends Message
|
||||
class B extends Message
|
||||
|
||||
"be useful for implementing PoisonPill" in {
|
||||
val interceptProbe = TestProbe[Message]()
|
||||
|
||||
def inner(count: Int): Behavior[Msg] = Behaviors.receiveMessage {
|
||||
case Msg(hello, replyTo) ⇒
|
||||
replyTo ! s"$hello-$count"
|
||||
inner(count + 1)
|
||||
}
|
||||
val partialInterceptor: BehaviorInterceptor[Message, Message] = new BehaviorInterceptor[Message, Message] {
|
||||
|
||||
val poisonInterceptor = new BehaviorInterceptor[Any, Msg] {
|
||||
override def aroundReceive(context: TypedActorContext[Any], message: Any, target: ReceiveTarget[Msg]): Behavior[Msg] =
|
||||
message match {
|
||||
case MyPoisonPill ⇒ Behaviors.stopped
|
||||
case m: Msg ⇒ target(context, m)
|
||||
case _ ⇒ Behaviors.unhandled
|
||||
override def interceptMessageType = classOf[B]
|
||||
|
||||
override def aroundReceive(ctx: TypedActorContext[Message], msg: Message, target: ReceiveTarget[Message]): Behavior[Message] = {
|
||||
interceptProbe.ref ! msg
|
||||
target(ctx, msg)
|
||||
}
|
||||
|
||||
override def aroundSignal(context: TypedActorContext[Any], signal: Signal, target: SignalTarget[Msg]): Behavior[Msg] =
|
||||
target.apply(context, signal)
|
||||
override def aroundSignal(ctx: TypedActorContext[Message], signal: Signal, target: SignalTarget[Message]): Behavior[Message] =
|
||||
target(ctx, signal)
|
||||
}
|
||||
|
||||
val probe = TestProbe[Message]()
|
||||
val ref = spawn(Behaviors.intercept(partialInterceptor)(Behaviors.receiveMessage { msg ⇒
|
||||
probe.ref ! msg
|
||||
Behaviors.same
|
||||
}))
|
||||
|
||||
ref ! new A
|
||||
ref ! new B
|
||||
|
||||
probe.expectMessageType[A]
|
||||
interceptProbe.expectMessageType[B]
|
||||
probe.expectMessageType[B]
|
||||
}
|
||||
|
||||
val decorated: Behavior[Msg] =
|
||||
Behaviors.intercept(poisonInterceptor)(inner(0)).narrow
|
||||
|
||||
val ref = spawn(decorated)
|
||||
val probe = TestProbe[String]()
|
||||
ref ! Msg("hello", probe.ref)
|
||||
probe.expectMessage("hello-0")
|
||||
ref ! Msg("hello", probe.ref)
|
||||
probe.expectMessage("hello-1")
|
||||
|
||||
ref.unsafeUpcast[Any] ! MyPoisonPill
|
||||
|
||||
probe.expectTerminated(ref, probe.remainingOrDefault)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,17 @@ import akka.annotation.{ DoNotInherit, InternalApi }
|
|||
abstract class BehaviorInterceptor[O, I] {
|
||||
import BehaviorInterceptor._
|
||||
|
||||
/**
|
||||
* Allows for applying the interceptor only to certain message types. Useful if the official protocol and the actual
|
||||
* protocol of an actor causes problems, for example class cast exceptions for a message not of type `O` that
|
||||
* the actor still knows how to deal with. Note that this is only possible to use when `O` and `I` are the same type.
|
||||
*
|
||||
* @return A subtype of `O` that should be intercepted or `null` to intercept all `O`s.
|
||||
* Subtypes of `O` matching this are passed directly to the inner behavior without interception.
|
||||
*/
|
||||
// null for all to avoid having to deal with class tag/explicit class in the default case of no filter
|
||||
def interceptMessageType: Class[_ <: O] = null
|
||||
|
||||
/**
|
||||
* Override to intercept actor startup. To trigger startup of
|
||||
* the next behavior in the stack, call `target.start()`.
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ package akka.actor.typed.internal
|
|||
import akka.actor.typed
|
||||
import akka.actor.typed.Behavior.{ SameBehavior, UnhandledBehavior }
|
||||
import akka.actor.typed.internal.TimerSchedulerImpl.TimerMsg
|
||||
import akka.actor.typed.{ TypedActorContext, ActorRef, Behavior, BehaviorInterceptor, ExtensibleBehavior, PreRestart, Signal }
|
||||
import akka.actor.typed.{ ActorRef, Behavior, BehaviorInterceptor, ExtensibleBehavior, PreRestart, Signal, TypedActorContext }
|
||||
import akka.annotation.InternalApi
|
||||
import akka.util.LineNumbers
|
||||
|
||||
|
|
@ -67,8 +67,13 @@ private[akka] final class InterceptorImpl[O, I](val interceptor: BehaviorInterce
|
|||
new InterceptorImpl(interceptor, newNested)
|
||||
|
||||
override def receive(ctx: typed.TypedActorContext[O], msg: O): Behavior[O] = {
|
||||
val interceptedResult = interceptor.aroundReceive(ctx, msg, receiveTarget)
|
||||
deduplicate(interceptedResult, ctx)
|
||||
val interceptMessageType = interceptor.interceptMessageType
|
||||
val result =
|
||||
if (interceptMessageType == null || interceptMessageType.isAssignableFrom(msg.getClass))
|
||||
interceptor.aroundReceive(ctx, msg, receiveTarget)
|
||||
else
|
||||
receiveTarget.apply(ctx, msg.asInstanceOf[I])
|
||||
deduplicate(result, ctx)
|
||||
}
|
||||
|
||||
override def receiveSignal(ctx: typed.TypedActorContext[O], signal: Signal): Behavior[O] = {
|
||||
|
|
|
|||
|
|
@ -451,6 +451,12 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
|
|||
TestProbe<Object> interceptProbe = testKit.createTestProbe();
|
||||
TestProbe<Signal> signalProbe = testKit.createTestProbe();
|
||||
BehaviorInterceptor<Command, Command> tap = new BehaviorInterceptor<Command, Command>() {
|
||||
|
||||
@Override
|
||||
public Class<Command> interceptMessageType() {
|
||||
return Command.class;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Behavior<Command> aroundReceive(TypedActorContext<Command> ctx, Command msg, ReceiveTarget<Command> target) {
|
||||
interceptProbe.ref().tell(msg);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue