Merge pull request #24961 from chbatey/issue-24915

Add classtag to tap/monitor for interception
This commit is contained in:
Patrik Nordwall 2018-04-23 17:34:09 +01:00 committed by GitHub
commit 3809fac7d7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 98 additions and 48 deletions

View file

@ -37,11 +37,11 @@ public class ActorCompile {
Behavior<MyMsg> actor2 = Behaviors.receive((ctx, msg) -> unhandled());
Behavior<MyMsg> actor4 = empty();
Behavior<MyMsg> actor5 = ignore();
Behavior<MyMsg> actor6 = tap((ctx, signal) -> {}, (ctx, msg) -> {}, actor5);
Behavior<MyMsg> actor6 = tap(MyMsg.class, (ctx, signal) -> {}, (ctx, msg) -> {}, actor5);
Behavior<MyMsgA> actor7 = actor6.narrow();
Behavior<MyMsg> actor8 = setup(ctx -> {
final ActorRef<MyMsg> self = ctx.getSelf();
return monitor(self, ignore());
return monitor(MyMsg.class, self, ignore());
});
Behavior<MyMsg> actor9 = widened(actor7, pf -> pf.match(MyMsgA.class, x -> x));
Behavior<MyMsg> actor10 = Behaviors.receive((ctx, msg) -> stopped(actor4), (ctx, signal) -> same());

View file

@ -9,6 +9,7 @@ import akka.actor.typed.scaladsl.Behaviors
import akka.testkit.typed.scaladsl.{ ActorTestKit, TestProbe }
import scala.concurrent.duration._
import scala.reflect.ClassTag
object ActorSpecMessages {
@ -62,10 +63,10 @@ abstract class ActorContextSpec extends ActorTestKit with TypedAkkaSpecWithShutd
import ActorSpecMessages._
def decoration[T]: Behavior[T] Behavior[T]
def decoration[T: ClassTag]: Behavior[T] Behavior[T]
implicit class BehaviorDecorator[T](behavior: Behavior[T]) {
def decorate: Behavior[T] = decoration(behavior)
implicit class BehaviorDecorator[T](behavior: Behavior[T])(implicit ev: ClassTag[T]) {
def decorate: Behavior[T] = decoration[T](ev)(behavior)
}
"An ActorContext" must {
@ -588,25 +589,25 @@ abstract class ActorContextSpec extends ActorTestKit with TypedAkkaSpecWithShutd
class NormalActorContextSpec extends ActorContextSpec {
override def decoration[T] = x x
override def decoration[T: ClassTag] = x x
}
class WidenActorContextSpec extends ActorContextSpec {
override def decoration[T] = b b.widen { case x x }
override def decoration[T: ClassTag] = b b.widen { case x x }
}
class DeferredActorContextSpec extends ActorContextSpec {
override def decoration[T] = b Behaviors.setup(_ b)
override def decoration[T: ClassTag] = b Behaviors.setup(_ b)
}
class NestedDeferredActorContextSpec extends ActorContextSpec {
override def decoration[T] = b Behaviors.setup(_ Behaviors.setup(_ b))
override def decoration[T: ClassTag] = b Behaviors.setup(_ Behaviors.setup(_ b))
}
class TapActorContextSpec extends ActorContextSpec {
override def decoration[T] = b Behaviors.tap((_, _) (), (_, _) (), b)
override def decoration[T: ClassTag]: Behavior[T] Behavior[T] = b Behaviors.tap[T](b)((_, _) (), (_, _) ())
}

View file

@ -608,6 +608,7 @@ class TapJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec with Reuse
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = TestInbox[Either[Signal, Command]]("tapListener")
(JBehaviors.tap(
classOf[Command],
pc((_, msg) inbox.ref ! Right(msg)),
ps((_, sig) inbox.ref ! Left(sig)),
super.behavior(monitor)._1), inbox)

View file

@ -85,7 +85,7 @@ import scala.reflect.ClassTag
override def toString = s"ReceiveMessage(${LineNumbers(onMessage)})"
}
def tap[T](
def tap[T: ClassTag](
onMessage: (SAC[T], T) _,
onSignal: (SAC[T], Signal) _,
behavior: Behavior[T]): Behavior[T] = {
@ -100,7 +100,7 @@ import scala.reflect.ClassTag
},
afterMessage = ConstantFun.scalaAnyThreeToThird,
afterSignal = ConstantFun.scalaAnyThreeToThird,
behavior)(ClassTag(classOf[Any]))
behavior)
}
/**
@ -118,7 +118,7 @@ import scala.reflect.ClassTag
* `afterMessage` is the message returned from `beforeMessage` (possibly
* different than the incoming message).
*/
def intercept[T, U <: Any: ClassTag](
def intercept[T, U: ClassTag](
beforeMessage: (SAC[U], U) T,
beforeSignal: (SAC[T], Signal) Boolean,
afterMessage: (SAC[T], T, Behavior[T]) Behavior[T],

View file

@ -172,13 +172,13 @@ object Behaviors {
* for logging or tracing what a certain Actor does.
*/
def tap[T](
clazz: Class[T],
onMessage: Procedure2[ActorContext[T], T],
onSignal: Procedure2[ActorContext[T], Signal],
behavior: Behavior[T]): Behavior[T] = {
BehaviorImpl.tap(
akka.actor.typed.scaladsl.Behaviors.tap[T](behavior)(
(ctx, msg) onMessage.apply(ctx.asJava, msg),
(ctx, sig) onSignal.apply(ctx.asJava, sig),
behavior)
(ctx, sig) onSignal.apply(ctx.asJava, sig))(ClassTag[T](clazz))
}
/**
@ -187,11 +187,8 @@ object Behaviors {
* wrapped behavior can evolve (i.e. return different behavior) without needing to be
* wrapped in a `monitor` call again.
*/
def monitor[T](monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] = {
BehaviorImpl.tap(
(ctx, msg) monitor ! msg,
ConstantFun.scalaAnyTwoToUnit,
behavior)
def monitor[T](clazz: Class[T], monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] = {
akka.actor.typed.scaladsl.Behaviors.monitor(monitor, behavior)(reflect.ClassTag(clazz))
}
/**

View file

@ -140,10 +140,21 @@ object Behaviors {
* some action upon each received message or signal. It is most commonly used
* for logging or tracing what a certain Actor does.
*/
def tap[T](
@deprecated("Use overloaded tap", "2.5.13")
def tap[T: ClassTag](
onMessage: (ActorContext[T], T) _,
onSignal: (ActorContext[T], Signal) _, // FIXME use partial function here also?
behavior: Behavior[T]): Behavior[T] =
tap(behavior)((ctx, msg) onMessage(ctx, msg), (ctx, signal) onSignal(ctx, signal))
/**
* This type of Behavior wraps another Behavior while allowing you to perform
* some action upon each received message or signal. It is most commonly used
* for logging or tracing what a certain Actor does.
*/
def tap[T: ClassTag](behavior: Behavior[T])(
onMessage: (ActorContext[T], T) Unit,
onSignal: (ActorContext[T], Signal) Unit): Behavior[T] =
BehaviorImpl.tap(onMessage, onSignal, behavior)
/**
@ -152,8 +163,8 @@ object Behaviors {
* wrapped behavior can evolve (i.e. return different behavior) without needing to be
* wrapped in a `monitor` call again.
*/
def monitor[T](monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] =
tap((_, msg) monitor ! msg, unitFunction, behavior)
def monitor[T: ClassTag](monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] =
tap(behavior)((_, msg) monitor ! msg, unitFunction)
/**
* Wrap the given behavior with the given [[SupervisorStrategy]] for

View file

@ -6,6 +6,7 @@ package akka.persistence.typed.javadsl;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.Signal;
import akka.actor.typed.SupervisorStrategy;
import akka.actor.typed.javadsl.Behaviors;
import akka.japi.Pair;
@ -313,8 +314,8 @@ public class PersistentActorTest extends JUnitSuite {
@Test
public void workWhenWrappedInOtherBehavior() {
Behavior<Command> behavior = Behaviors.supervise(counter("c6")).onFailure(
SupervisorStrategy.restartWithBackoff(Duration.ofSeconds(1),
Duration.ofSeconds(10), 0.1)
SupervisorStrategy.restartWithBackoff(Duration.ofSeconds(1),
Duration.ofSeconds(10), 0.1)
);
ActorRef<Command> c = testKit.spawn(behavior);
@ -351,5 +352,19 @@ public class PersistentActorTest extends JUnitSuite {
c.tell(new StopThenLog());
probe.expectTerminated(c, Duration.ofSeconds(1));
}
@Test
public void tapPersistentActor() {
TestProbe<Command> interceptProbe = testKit.createTestProbe();
TestProbe<Signal> signalProbe = testKit.createTestProbe();
ActorRef<Command> c = testKit.spawn(Behaviors.tap(Command.class,
(ctx, cmd) -> interceptProbe.ref().tell(cmd),
(ctx, signal) -> signalProbe.ref().tell(signal),
counter("tap1")));
c.tell(Increment.instance);
interceptProbe.expectMessage(Increment.instance);
signalProbe.expectNoMessage();
}
// FIXME test with by state command handler
}

View file

@ -4,6 +4,8 @@
package akka.persistence.typed.scaladsl
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, SupervisorStrategy, Terminated, TypedAkkaSpecWithShutdown }
import akka.persistence.snapshot.SnapshotStore
@ -182,10 +184,13 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
implicit val testSettings = TestKitSettings(system)
val pidCounter = new AtomicInteger(0)
private def nextPid(): String = s"c${pidCounter.incrementAndGet()}"
"A typed persistent actor" must {
"persist an event" in {
val c = spawn(counter("c1"))
val c = spawn(counter(nextPid))
val probe = TestProbe[State]
c ! Increment
@ -194,7 +199,8 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
}
"replay stored events" in {
val c = spawn(counter("c2"))
val pid = nextPid
val c = spawn(counter(pid))
val probe = TestProbe[State]
c ! Increment
@ -203,7 +209,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
c ! GetValue(probe.ref)
probe.expectMessage(10.seconds, State(3, Vector(0, 1, 2)))
val c2 = spawn(counter("c2"))
val c2 = spawn(counter(pid))
c2 ! GetValue(probe.ref)
probe.expectMessage(State(3, Vector(0, 1, 2)))
c2 ! Increment
@ -212,7 +218,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
}
"handle Terminated signal" in {
val c = spawn(counter("c3"))
val c = spawn(counter(nextPid))
val probe = TestProbe[State]
c ! Increment
c ! IncrementLater
@ -223,7 +229,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
}
"handle receive timeout" in {
val c = spawn(counter("c4"))
val c = spawn(counter(nextPid))
val probe = TestProbe[State]
c ! Increment
@ -242,7 +248,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
*/
"chainable side effects with events" in {
val loggingProbe = TestProbe[String]
val c = spawn(counter("c5", loggingProbe.ref))
val c = spawn(counter(nextPid, loggingProbe.ref))
val probe = TestProbe[State]
@ -256,7 +262,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
"persist then stop" in {
val loggingProbe = TestProbe[String]
val c = spawn(counter("c5a", loggingProbe.ref))
val c = spawn(counter(nextPid, loggingProbe.ref))
val watchProbe = watcher(c)
c ! IncrementThenLogThenStop
@ -266,7 +272,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
"persist(All) then stop" in {
val loggingProbe = TestProbe[String]
val c = spawn(counter("c5b", loggingProbe.ref))
val c = spawn(counter(nextPid, loggingProbe.ref))
val watchProbe = watcher(c)
c ! IncrementTwiceThenLogThenStop
@ -278,7 +284,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
/** Proves that side-effects are called when emitting an empty list of events */
"chainable side effects without events" in {
val loggingProbe = TestProbe[String]
val c = spawn(counter("c6", loggingProbe.ref))
val c = spawn(counter(nextPid, loggingProbe.ref))
val probe = TestProbe[State]
c ! EmptyEventsListAndThenLog
@ -290,7 +296,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
/** Proves that side-effects are called when explicitly calling Effect.none */
"chainable side effects when doing nothing (Effect.none)" in {
val loggingProbe = TestProbe[String]
val c = spawn(counter("c7", loggingProbe.ref))
val c = spawn(counter(nextPid, loggingProbe.ref))
val probe = TestProbe[State]
c ! DoNothingAndThenLog
@ -301,7 +307,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
"work when wrapped in other behavior" in {
val probe = TestProbe[State]
val behavior = Behaviors.supervise[Command](counter("c13"))
val behavior = Behaviors.supervise[Command](counter(nextPid))
.onFailure(SupervisorStrategy.restartWithBackoff(1.second, 10.seconds, 0.1))
val c = spawn(behavior)
c ! Increment
@ -311,7 +317,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
"stop after logging (no persisting)" in {
val loggingProbe = TestProbe[String]
val c: ActorRef[Command] = spawn(counter("c8", loggingProbe.ref))
val c: ActorRef[Command] = spawn(counter(nextPid, loggingProbe.ref))
val watchProbe = watcher(c)
c ! LogThenStop
loggingProbe.expectMessage(firstLogging)
@ -319,9 +325,10 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
}
"snapshot via predicate" in {
val pid = nextPid
val alwaysSnapshot: Behavior[Command] =
Behaviors.setup { context
counter("c9").snapshotWhen { (_, _, _) true }
Behaviors.setup { _
counter(pid).snapshotWhen { (_, _, _) true }
}
val c = spawn(alwaysSnapshot)
@ -335,7 +342,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
watchProbe.expectMessage("Terminated")
val probe = TestProbe[(State, Event)]()
val c2 = spawn(counterWithProbe("c9", probe.ref))
val c2 = spawn(counterWithProbe(pid, probe.ref))
// state should be rebuilt from snapshot, no events replayed
probe.expectNoMessage()
c2 ! Increment
@ -344,7 +351,8 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
}
"check all events for snapshot in PersistAll" in {
val snapshotAtTwo = counter("c11").snapshotWhen { (s, _, _) s.value == 2 }
val pid = nextPid
val snapshotAtTwo = counter(pid).snapshotWhen { (s, _, _) s.value == 2 }
val c: ActorRef[Command] = spawn(snapshotAtTwo)
val watchProbe = watcher(c)
val replyProbe = TestProbe[State]()
@ -356,7 +364,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
watchProbe.expectMessage("Terminated")
val probeC2 = TestProbe[(State, Event)]()
val c2 = spawn(counterWithProbe("c11", probeC2.ref))
val c2 = spawn(counterWithProbe(pid, probeC2.ref))
// middle event triggered all to be snapshot
probeC2.expectNoMessage()
c2 ! GetValue(replyProbe.ref)
@ -364,7 +372,8 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
}
"snapshot every N sequence nrs" in {
val c = spawn(counter("c10").snapshotEvery(2))
val pid = nextPid
val c = spawn(counter(pid).snapshotEvery(2))
val watchProbe = watcher(c)
val replyProbe = TestProbe[State]()
@ -376,7 +385,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
// no snapshot should have happened
val probeC2 = TestProbe[(State, Event)]()
val c2 = spawn(counterWithProbe("c10", probeC2.ref).snapshotEvery(2))
val c2 = spawn(counterWithProbe(pid, probeC2.ref).snapshotEvery(2))
probeC2.expectMessage[(State, Event)]((State(0, Vector()), Incremented(1)))
val watchProbeC2 = watcher(c2)
c2 ! Increment
@ -384,7 +393,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
watchProbeC2.expectMessage("Terminated")
val probeC3 = TestProbe[(State, Event)]()
val c3 = spawn(counterWithProbe("c10", probeC3.ref).snapshotEvery(2))
val c3 = spawn(counterWithProbe(pid, probeC3.ref).snapshotEvery(2))
// this time it should have been snapshotted so no events to replay
probeC3.expectNoMessage()
c3 ! GetValue(replyProbe.ref)
@ -392,7 +401,8 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
}
"snapshot every N sequence nrs when persisting multiple events" in {
val c = spawn(counter("c12").snapshotEvery(2))
val pid = nextPid
val c = spawn(counter(pid).snapshotEvery(2))
val watchProbe = watcher(c)
val replyProbe = TestProbe[State]()
@ -403,12 +413,27 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown
watchProbe.expectMessage("Terminated")
val probeC2 = TestProbe[(State, Event)]()
val c2 = spawn(counterWithProbe("c12", probeC2.ref).snapshotEvery(2))
val c2 = spawn(counterWithProbe(pid, probeC2.ref).snapshotEvery(2))
probeC2.expectNoMessage()
c2 ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(3, Vector(0, 1, 2)))
}
"wrap persistent behavior in tap" in {
val probe = TestProbe[String]
val wrapped: Behavior[Command] = Behaviors.tap(counter(nextPid))(
(_, _) probe.ref ! "msg received",
(_, _) ()
)
val c = spawn(wrapped)
c ! Increment
val replyProbe = TestProbe[State]()
c ! GetValue(replyProbe.ref)
replyProbe.expectMessage(State(1, Vector(0)))
probe.expectMessage("msg received")
}
def watcher(toWatch: ActorRef[_]): TestProbe[String] = {
val probe = TestProbe[String]()
val w = Behaviors.setup[Any] { (ctx)