diff --git a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java index 197408e0ba..e2af6b2be3 100644 --- a/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java +++ b/akka-actor-typed-tests/src/test/java/akka/actor/typed/javadsl/ActorCompile.java @@ -37,11 +37,11 @@ public class ActorCompile { Behavior actor2 = Behaviors.receive((ctx, msg) -> unhandled()); Behavior actor4 = empty(); Behavior actor5 = ignore(); - Behavior actor6 = tap((ctx, signal) -> {}, (ctx, msg) -> {}, actor5); + Behavior actor6 = tap(MyMsg.class, (ctx, signal) -> {}, (ctx, msg) -> {}, actor5); Behavior actor7 = actor6.narrow(); Behavior actor8 = setup(ctx -> { final ActorRef self = ctx.getSelf(); - return monitor(self, ignore()); + return monitor(MyMsg.class, self, ignore()); }); Behavior actor9 = widened(actor7, pf -> pf.match(MyMsgA.class, x -> x)); Behavior actor10 = Behaviors.receive((ctx, msg) -> stopped(actor4), (ctx, signal) -> same()); diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala index 358222d04f..032dcaded3 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ActorContextSpec.scala @@ -9,6 +9,7 @@ import akka.actor.typed.scaladsl.Behaviors import akka.testkit.typed.scaladsl.{ ActorTestKit, TestProbe } import scala.concurrent.duration._ +import scala.reflect.ClassTag object ActorSpecMessages { @@ -62,10 +63,10 @@ abstract class ActorContextSpec extends ActorTestKit with TypedAkkaSpecWithShutd import ActorSpecMessages._ - def decoration[T]: Behavior[T] ⇒ Behavior[T] + def decoration[T: ClassTag]: Behavior[T] ⇒ Behavior[T] - implicit class BehaviorDecorator[T](behavior: Behavior[T]) { - def decorate: Behavior[T] = decoration(behavior) + implicit class BehaviorDecorator[T](behavior: Behavior[T])(implicit ev: ClassTag[T]) { + def decorate: Behavior[T] = decoration[T](ev)(behavior) } "An ActorContext" must { @@ -588,25 +589,25 @@ abstract class ActorContextSpec extends ActorTestKit with TypedAkkaSpecWithShutd class NormalActorContextSpec extends ActorContextSpec { - override def decoration[T] = x ⇒ x + override def decoration[T: ClassTag] = x ⇒ x } class WidenActorContextSpec extends ActorContextSpec { - override def decoration[T] = b ⇒ b.widen { case x ⇒ x } + override def decoration[T: ClassTag] = b ⇒ b.widen { case x ⇒ x } } class DeferredActorContextSpec extends ActorContextSpec { - override def decoration[T] = b ⇒ Behaviors.setup(_ ⇒ b) + override def decoration[T: ClassTag] = b ⇒ Behaviors.setup(_ ⇒ b) } class NestedDeferredActorContextSpec extends ActorContextSpec { - override def decoration[T] = b ⇒ Behaviors.setup(_ ⇒ Behaviors.setup(_ ⇒ b)) + override def decoration[T: ClassTag] = b ⇒ Behaviors.setup(_ ⇒ Behaviors.setup(_ ⇒ b)) } class TapActorContextSpec extends ActorContextSpec { - override def decoration[T] = b ⇒ Behaviors.tap((_, _) ⇒ (), (_, _) ⇒ (), b) + override def decoration[T: ClassTag]: Behavior[T] ⇒ Behavior[T] = b ⇒ Behaviors.tap[T](b)((_, _) ⇒ (), (_, _) ⇒ ()) } diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala index 3e4f87368e..1bc0c8aa21 100644 --- a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/BehaviorSpec.scala @@ -608,6 +608,7 @@ class TapJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec with Reuse override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = { val inbox = TestInbox[Either[Signal, Command]]("tapListener") (JBehaviors.tap( + classOf[Command], pc((_, msg) ⇒ inbox.ref ! Right(msg)), ps((_, sig) ⇒ inbox.ref ! Left(sig)), super.behavior(monitor)._1), inbox) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala index fe6dd92e19..6a873f5c28 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala @@ -85,7 +85,7 @@ import scala.reflect.ClassTag override def toString = s"ReceiveMessage(${LineNumbers(onMessage)})" } - def tap[T]( + def tap[T: ClassTag]( onMessage: (SAC[T], T) ⇒ _, onSignal: (SAC[T], Signal) ⇒ _, behavior: Behavior[T]): Behavior[T] = { @@ -100,7 +100,7 @@ import scala.reflect.ClassTag }, afterMessage = ConstantFun.scalaAnyThreeToThird, afterSignal = ConstantFun.scalaAnyThreeToThird, - behavior)(ClassTag(classOf[Any])) + behavior) } /** @@ -118,7 +118,7 @@ import scala.reflect.ClassTag * `afterMessage` is the message returned from `beforeMessage` (possibly * different than the incoming message). */ - def intercept[T, U <: Any: ClassTag]( + def intercept[T, U: ClassTag]( beforeMessage: (SAC[U], U) ⇒ T, beforeSignal: (SAC[T], Signal) ⇒ Boolean, afterMessage: (SAC[T], T, Behavior[T]) ⇒ Behavior[T], diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala index 942a9020da..c2ffa1b2f4 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/Behaviors.scala @@ -172,13 +172,13 @@ object Behaviors { * for logging or tracing what a certain Actor does. */ def tap[T]( + clazz: Class[T], onMessage: Procedure2[ActorContext[T], T], onSignal: Procedure2[ActorContext[T], Signal], behavior: Behavior[T]): Behavior[T] = { - BehaviorImpl.tap( + akka.actor.typed.scaladsl.Behaviors.tap[T](behavior)( (ctx, msg) ⇒ onMessage.apply(ctx.asJava, msg), - (ctx, sig) ⇒ onSignal.apply(ctx.asJava, sig), - behavior) + (ctx, sig) ⇒ onSignal.apply(ctx.asJava, sig))(ClassTag[T](clazz)) } /** @@ -187,11 +187,8 @@ object Behaviors { * wrapped behavior can evolve (i.e. return different behavior) without needing to be * wrapped in a `monitor` call again. */ - def monitor[T](monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] = { - BehaviorImpl.tap( - (ctx, msg) ⇒ monitor ! msg, - ConstantFun.scalaAnyTwoToUnit, - behavior) + def monitor[T](clazz: Class[T], monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] = { + akka.actor.typed.scaladsl.Behaviors.monitor(monitor, behavior)(reflect.ClassTag(clazz)) } /** diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala index 1d37c68a3c..96f73d9e0e 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/Behaviors.scala @@ -140,10 +140,21 @@ object Behaviors { * some action upon each received message or signal. It is most commonly used * for logging or tracing what a certain Actor does. */ - def tap[T]( + @deprecated("Use overloaded tap", "2.5.13") + def tap[T: ClassTag]( onMessage: (ActorContext[T], T) ⇒ _, onSignal: (ActorContext[T], Signal) ⇒ _, // FIXME use partial function here also? behavior: Behavior[T]): Behavior[T] = + tap(behavior)((ctx, msg) ⇒ onMessage(ctx, msg), (ctx, signal) ⇒ onSignal(ctx, signal)) + + /** + * This type of Behavior wraps another Behavior while allowing you to perform + * some action upon each received message or signal. It is most commonly used + * for logging or tracing what a certain Actor does. + */ + def tap[T: ClassTag](behavior: Behavior[T])( + onMessage: (ActorContext[T], T) ⇒ Unit, + onSignal: (ActorContext[T], Signal) ⇒ Unit): Behavior[T] = BehaviorImpl.tap(onMessage, onSignal, behavior) /** @@ -152,8 +163,8 @@ object Behaviors { * wrapped behavior can evolve (i.e. return different behavior) without needing to be * wrapped in a `monitor` call again. */ - def monitor[T](monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] = - tap((_, msg) ⇒ monitor ! msg, unitFunction, behavior) + def monitor[T: ClassTag](monitor: ActorRef[T], behavior: Behavior[T]): Behavior[T] = + tap(behavior)((_, msg) ⇒ monitor ! msg, unitFunction) /** * Wrap the given behavior with the given [[SupervisorStrategy]] for diff --git a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorTest.java b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorTest.java index 039313fafb..a4e52a3f0e 100644 --- a/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorTest.java +++ b/akka-persistence-typed/src/test/java/akka/persistence/typed/javadsl/PersistentActorTest.java @@ -6,6 +6,7 @@ package akka.persistence.typed.javadsl; import akka.actor.typed.ActorRef; import akka.actor.typed.Behavior; +import akka.actor.typed.Signal; import akka.actor.typed.SupervisorStrategy; import akka.actor.typed.javadsl.Behaviors; import akka.japi.Pair; @@ -313,8 +314,8 @@ public class PersistentActorTest extends JUnitSuite { @Test public void workWhenWrappedInOtherBehavior() { Behavior behavior = Behaviors.supervise(counter("c6")).onFailure( - SupervisorStrategy.restartWithBackoff(Duration.ofSeconds(1), - Duration.ofSeconds(10), 0.1) + SupervisorStrategy.restartWithBackoff(Duration.ofSeconds(1), + Duration.ofSeconds(10), 0.1) ); ActorRef c = testKit.spawn(behavior); @@ -351,5 +352,19 @@ public class PersistentActorTest extends JUnitSuite { c.tell(new StopThenLog()); probe.expectTerminated(c, Duration.ofSeconds(1)); } + + @Test + public void tapPersistentActor() { + TestProbe interceptProbe = testKit.createTestProbe(); + TestProbe signalProbe = testKit.createTestProbe(); + ActorRef c = testKit.spawn(Behaviors.tap(Command.class, + (ctx, cmd) -> interceptProbe.ref().tell(cmd), + (ctx, signal) -> signalProbe.ref().tell(signal), + counter("tap1"))); + c.tell(Increment.instance); + interceptProbe.expectMessage(Increment.instance); + signalProbe.expectNoMessage(); + } + // FIXME test with by state command handler } diff --git a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala index c90150e8df..dc83c6da85 100644 --- a/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala +++ b/akka-persistence-typed/src/test/scala/akka/persistence/typed/scaladsl/PersistentBehaviorSpec.scala @@ -4,6 +4,8 @@ package akka.persistence.typed.scaladsl +import java.util.concurrent.atomic.AtomicInteger + import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.{ ActorRef, ActorSystem, Behavior, SupervisorStrategy, Terminated, TypedAkkaSpecWithShutdown } import akka.persistence.snapshot.SnapshotStore @@ -182,10 +184,13 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown implicit val testSettings = TestKitSettings(system) + val pidCounter = new AtomicInteger(0) + private def nextPid(): String = s"c${pidCounter.incrementAndGet()}" + "A typed persistent actor" must { "persist an event" in { - val c = spawn(counter("c1")) + val c = spawn(counter(nextPid)) val probe = TestProbe[State] c ! Increment @@ -194,7 +199,8 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown } "replay stored events" in { - val c = spawn(counter("c2")) + val pid = nextPid + val c = spawn(counter(pid)) val probe = TestProbe[State] c ! Increment @@ -203,7 +209,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown c ! GetValue(probe.ref) probe.expectMessage(10.seconds, State(3, Vector(0, 1, 2))) - val c2 = spawn(counter("c2")) + val c2 = spawn(counter(pid)) c2 ! GetValue(probe.ref) probe.expectMessage(State(3, Vector(0, 1, 2))) c2 ! Increment @@ -212,7 +218,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown } "handle Terminated signal" in { - val c = spawn(counter("c3")) + val c = spawn(counter(nextPid)) val probe = TestProbe[State] c ! Increment c ! IncrementLater @@ -223,7 +229,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown } "handle receive timeout" in { - val c = spawn(counter("c4")) + val c = spawn(counter(nextPid)) val probe = TestProbe[State] c ! Increment @@ -242,7 +248,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown */ "chainable side effects with events" in { val loggingProbe = TestProbe[String] - val c = spawn(counter("c5", loggingProbe.ref)) + val c = spawn(counter(nextPid, loggingProbe.ref)) val probe = TestProbe[State] @@ -256,7 +262,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown "persist then stop" in { val loggingProbe = TestProbe[String] - val c = spawn(counter("c5a", loggingProbe.ref)) + val c = spawn(counter(nextPid, loggingProbe.ref)) val watchProbe = watcher(c) c ! IncrementThenLogThenStop @@ -266,7 +272,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown "persist(All) then stop" in { val loggingProbe = TestProbe[String] - val c = spawn(counter("c5b", loggingProbe.ref)) + val c = spawn(counter(nextPid, loggingProbe.ref)) val watchProbe = watcher(c) c ! IncrementTwiceThenLogThenStop @@ -278,7 +284,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown /** Proves that side-effects are called when emitting an empty list of events */ "chainable side effects without events" in { val loggingProbe = TestProbe[String] - val c = spawn(counter("c6", loggingProbe.ref)) + val c = spawn(counter(nextPid, loggingProbe.ref)) val probe = TestProbe[State] c ! EmptyEventsListAndThenLog @@ -290,7 +296,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown /** Proves that side-effects are called when explicitly calling Effect.none */ "chainable side effects when doing nothing (Effect.none)" in { val loggingProbe = TestProbe[String] - val c = spawn(counter("c7", loggingProbe.ref)) + val c = spawn(counter(nextPid, loggingProbe.ref)) val probe = TestProbe[State] c ! DoNothingAndThenLog @@ -301,7 +307,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown "work when wrapped in other behavior" in { val probe = TestProbe[State] - val behavior = Behaviors.supervise[Command](counter("c13")) + val behavior = Behaviors.supervise[Command](counter(nextPid)) .onFailure(SupervisorStrategy.restartWithBackoff(1.second, 10.seconds, 0.1)) val c = spawn(behavior) c ! Increment @@ -311,7 +317,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown "stop after logging (no persisting)" in { val loggingProbe = TestProbe[String] - val c: ActorRef[Command] = spawn(counter("c8", loggingProbe.ref)) + val c: ActorRef[Command] = spawn(counter(nextPid, loggingProbe.ref)) val watchProbe = watcher(c) c ! LogThenStop loggingProbe.expectMessage(firstLogging) @@ -319,9 +325,10 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown } "snapshot via predicate" in { + val pid = nextPid val alwaysSnapshot: Behavior[Command] = - Behaviors.setup { context ⇒ - counter("c9").snapshotWhen { (_, _, _) ⇒ true } + Behaviors.setup { _ ⇒ + counter(pid).snapshotWhen { (_, _, _) ⇒ true } } val c = spawn(alwaysSnapshot) @@ -335,7 +342,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown watchProbe.expectMessage("Terminated") val probe = TestProbe[(State, Event)]() - val c2 = spawn(counterWithProbe("c9", probe.ref)) + val c2 = spawn(counterWithProbe(pid, probe.ref)) // state should be rebuilt from snapshot, no events replayed probe.expectNoMessage() c2 ! Increment @@ -344,7 +351,8 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown } "check all events for snapshot in PersistAll" in { - val snapshotAtTwo = counter("c11").snapshotWhen { (s, _, _) ⇒ s.value == 2 } + val pid = nextPid + val snapshotAtTwo = counter(pid).snapshotWhen { (s, _, _) ⇒ s.value == 2 } val c: ActorRef[Command] = spawn(snapshotAtTwo) val watchProbe = watcher(c) val replyProbe = TestProbe[State]() @@ -356,7 +364,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown watchProbe.expectMessage("Terminated") val probeC2 = TestProbe[(State, Event)]() - val c2 = spawn(counterWithProbe("c11", probeC2.ref)) + val c2 = spawn(counterWithProbe(pid, probeC2.ref)) // middle event triggered all to be snapshot probeC2.expectNoMessage() c2 ! GetValue(replyProbe.ref) @@ -364,7 +372,8 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown } "snapshot every N sequence nrs" in { - val c = spawn(counter("c10").snapshotEvery(2)) + val pid = nextPid + val c = spawn(counter(pid).snapshotEvery(2)) val watchProbe = watcher(c) val replyProbe = TestProbe[State]() @@ -376,7 +385,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown // no snapshot should have happened val probeC2 = TestProbe[(State, Event)]() - val c2 = spawn(counterWithProbe("c10", probeC2.ref).snapshotEvery(2)) + val c2 = spawn(counterWithProbe(pid, probeC2.ref).snapshotEvery(2)) probeC2.expectMessage[(State, Event)]((State(0, Vector()), Incremented(1))) val watchProbeC2 = watcher(c2) c2 ! Increment @@ -384,7 +393,7 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown watchProbeC2.expectMessage("Terminated") val probeC3 = TestProbe[(State, Event)]() - val c3 = spawn(counterWithProbe("c10", probeC3.ref).snapshotEvery(2)) + val c3 = spawn(counterWithProbe(pid, probeC3.ref).snapshotEvery(2)) // this time it should have been snapshotted so no events to replay probeC3.expectNoMessage() c3 ! GetValue(replyProbe.ref) @@ -392,7 +401,8 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown } "snapshot every N sequence nrs when persisting multiple events" in { - val c = spawn(counter("c12").snapshotEvery(2)) + val pid = nextPid + val c = spawn(counter(pid).snapshotEvery(2)) val watchProbe = watcher(c) val replyProbe = TestProbe[State]() @@ -403,12 +413,27 @@ class PersistentBehaviorSpec extends ActorTestKit with TypedAkkaSpecWithShutdown watchProbe.expectMessage("Terminated") val probeC2 = TestProbe[(State, Event)]() - val c2 = spawn(counterWithProbe("c12", probeC2.ref).snapshotEvery(2)) + val c2 = spawn(counterWithProbe(pid, probeC2.ref).snapshotEvery(2)) probeC2.expectNoMessage() c2 ! GetValue(replyProbe.ref) replyProbe.expectMessage(State(3, Vector(0, 1, 2))) } + "wrap persistent behavior in tap" in { + val probe = TestProbe[String] + val wrapped: Behavior[Command] = Behaviors.tap(counter(nextPid))( + (_, _) ⇒ probe.ref ! "msg received", + (_, _) ⇒ () + ) + val c = spawn(wrapped) + + c ! Increment + val replyProbe = TestProbe[State]() + c ! GetValue(replyProbe.ref) + replyProbe.expectMessage(State(1, Vector(0))) + probe.expectMessage("msg received") + } + def watcher(toWatch: ActorRef[_]): TestProbe[String] = { val probe = TestProbe[String]() val w = Behaviors.setup[Any] { (ctx) ⇒