From 672a3fd81ef310384ec817fa58ffa5b8f588ef96 Mon Sep 17 00:00:00 2001 From: GreyPlane <31082046+GreyPlane@users.noreply.github.com> Date: Tue, 23 Aug 2022 15:19:36 +0800 Subject: [PATCH] adding ActorContext.delegate (#31474) * adding ActorContext.forward that wrapping Behavior.interpret for forwarding message between Behaviors --- .../scaladsl/ActorContextDelegateSpec.scala | 94 +++++++++++++++++++ ...-actor-context-delegate.backwards.excludes | 3 + .../typed/internal/ActorContextImpl.scala | 15 ++- .../actor/typed/internal/BehaviorImpl.scala | 9 +- .../actor/typed/javadsl/ActorContext.scala | 10 ++ .../actor/typed/scaladsl/ActorContext.scala | 10 ++ 6 files changed, 134 insertions(+), 7 deletions(-) create mode 100644 akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ActorContextDelegateSpec.scala create mode 100644 akka-actor-typed/src/main/mima-filters/2.6.19.backwards.excludes/pr-31474-actor-context-delegate.backwards.excludes diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ActorContextDelegateSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ActorContextDelegateSpec.scala new file mode 100644 index 0000000000..fbd8e9c83b --- /dev/null +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/scaladsl/ActorContextDelegateSpec.scala @@ -0,0 +1,94 @@ +/* + * Copyright (C) 2022 Lightbend Inc. + */ + +package akka.actor.typed.scaladsl + +import akka.actor.UnhandledMessage +import akka.actor.testkit.typed.TestKitSettings +import akka.actor.testkit.typed.scaladsl.{ FishingOutcomes, LogCapturing, ScalaTestWithActorTestKit, TestProbe } +import akka.actor.typed.eventstream.EventStream +import akka.actor.typed.{ ActorRef, Behavior } +import org.scalatest.wordspec.AnyWordSpecLike + +object ActorContextDelegateSpec { + sealed trait PingPongCommand + case object Ping extends PingPongCommand + case object Pong extends PingPongCommand + case object UnPingable extends PingPongCommand + + sealed trait BehaviorTag + case object PingTag extends BehaviorTag + case object PongTag extends BehaviorTag + + sealed trait Event + final case class ResponseFrom(from: BehaviorTag, cmd: PingPongCommand) extends Event + final case class ForwardTo(to: BehaviorTag) extends Event + + def ping(monitor: ActorRef[Event])(implicit context: ActorContext[PingPongCommand]): Behavior[PingPongCommand] = + Behaviors.receiveMessagePartial[PingPongCommand] { + case Ping => + monitor ! ResponseFrom(PingTag, Ping) + Behaviors.same + case msg @ Pong => + monitor ! ForwardTo(PongTag) + context.delegate(pong(monitor), msg) + } + + def pong(monitor: ActorRef[Event])(implicit context: ActorContext[PingPongCommand]): Behavior[PingPongCommand] = + Behaviors.receiveMessage[PingPongCommand] { + case msg @ Ping => + monitor ! ForwardTo(PingTag) + context.delegate(ping(monitor), msg) + case Pong => + monitor ! ResponseFrom(PongTag, Pong) + Behaviors.same + case msg @ UnPingable => + monitor ! ForwardTo(PingTag) + context.delegate(ping(monitor), msg) + } +} + +class ActorContextDelegateSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing { + import ActorContextDelegateSpec._ + implicit val testSettings: TestKitSettings = TestKitSettings(system) + + "The Scala DSL ActorContext delegate" must { + "delegate message by given behavior and handle resulting Behavior.same properly" in { + val probe = TestProbe[Event]() + val behv = Behaviors.setup[PingPongCommand] { implicit context => + ping(probe.ref) + } + val ref = spawn(behv) + ref ! Pong + probe.expectMessage(ForwardTo(PongTag)) + probe.expectMessage(ResponseFrom(PongTag, Pong)) + ref ! Pong + probe.expectMessage(ResponseFrom(PongTag, Pong)) + ref ! Ping + probe.expectMessage(ForwardTo(PingTag)) + probe.expectMessage(ResponseFrom(PingTag, Ping)) + } + + "publish unhandled message to eventStream as UnhandledMessage and switch to delegator behavior" in { + val deadLetters = TestProbe[UnhandledMessage]("probeDeadLetters") + system.eventStream ! EventStream.Subscribe[UnhandledMessage](deadLetters.ref) + + val probe = TestProbe[Event]() + val behv = Behaviors.setup[PingPongCommand] { implicit context => + pong(probe.ref) + } + val ref = spawn(behv) + + ref ! UnPingable + probe.expectMessage(ForwardTo(PingTag)) + deadLetters.fishForMessage(deadLetters.remainingOrDefault) { + case UnhandledMessage(UnPingable, _, _) => FishingOutcomes.complete + case _ => FishingOutcomes.fail("unexpected message") + } + ref ! Ping + probe.expectMessage(ResponseFrom(PingTag, Ping)) + } + } + +} diff --git a/akka-actor-typed/src/main/mima-filters/2.6.19.backwards.excludes/pr-31474-actor-context-delegate.backwards.excludes b/akka-actor-typed/src/main/mima-filters/2.6.19.backwards.excludes/pr-31474-actor-context-delegate.backwards.excludes new file mode 100644 index 0000000000..79c2b4a676 --- /dev/null +++ b/akka-actor-typed/src/main/mima-filters/2.6.19.backwards.excludes/pr-31474-actor-context-delegate.backwards.excludes @@ -0,0 +1,3 @@ +#akka31474 ActorContext is marked with @DoNotInherit +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.scaladsl.ActorContext.delegate") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.javadsl.ActorContext.delegate") \ No newline at end of file diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala index 5c86f90bba..36f8c79a1e 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala @@ -9,11 +9,10 @@ import java.time.Duration import java.util.ArrayList import java.util.Optional import java.util.concurrent.CompletionStage - import scala.concurrent.{ ExecutionContextExecutor, Future } import scala.reflect.ClassTag import scala.util.Try -import scala.annotation.nowarn +import scala.annotation.{ nowarn, switch } import org.slf4j.Logger import org.slf4j.LoggerFactory import akka.actor.Address @@ -203,6 +202,18 @@ import scala.util.Success override def spawnAnonymous[U](behavior: akka.actor.typed.Behavior[U]): akka.actor.typed.ActorRef[U] = spawnAnonymous(behavior, Props.empty) + def delegate(delegator: Behavior[T], msg: T): Behavior[T] = { + val started = Behavior.start(delegator, this) + val interpreted = msg match { + case signal: Signal => Behavior.interpretSignal(started, this, signal) + case message => Behavior.interpretMessage(started, this, message) + } + (interpreted._tag: @switch) match { + case BehaviorTags.SameBehavior => started + case BehaviorTags.UnhandledBehavior => this.onUnhandled(msg); started + case _ => interpreted + } + } // Scala API impl override def ask[Req, Res](target: RecipientRef[Req], createRequest: ActorRef[Res] => Req)( mapResponse: Try[Res] => T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit = { diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala index 2e70b837ba..283d83a6a1 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala @@ -5,13 +5,12 @@ package akka.actor.typed package internal -import scala.reflect.ClassTag - -import akka.actor.typed.{ TypedActorContext => AC } import akka.actor.typed.scaladsl.{ ActorContext => SAC } +import akka.actor.typed.{ TypedActorContext => AC } import akka.annotation.InternalApi -import akka.util.LineNumbers -import akka.util.OptionVal +import akka.util.{ LineNumbers, OptionVal } + +import scala.reflect.ClassTag /** * INTERNAL API diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala index 1f0827a775..cd166cf0a7 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/javadsl/ActorContext.scala @@ -146,6 +146,16 @@ trait ActorContext[T] extends TypedActorContext[T] with ClassicActorContextProvi */ def spawn[U](behavior: Behavior[U], name: String, props: Props): ActorRef[U] + /** + * Delegate message and signal's execution by given [[akka.actor.typed.Behavior]] + * using [[Behavior.interpretMessage]] or [[Behavior.interpretSignal]] + * + * note: if given [[akka.actor.typed.Behavior]] resulting [[Behaviors.same]] that will cause context switching to the given behavior + * and if result is [[Behaviors.unhandled]] that will trigger the [[akka.actor.typed.scaladsl.ActorContext.onUnhandled]] + * then switching to the given behavior. + */ + def delegate(delegator: Behavior[T], msg: T): Behavior[T] + /** * Force the child Actor under the given name to terminate after it finishes * processing its current message. Nothing happens if the ActorRef is a child that is already stopped. diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala index b50967986d..ed7c50b1ca 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/scaladsl/ActorContext.scala @@ -129,6 +129,16 @@ trait ActorContext[T] extends TypedActorContext[T] with ClassicActorContextProvi */ def spawn[U](behavior: Behavior[U], name: String, props: Props = Props.empty): ActorRef[U] + /** + * Delegate message and signal's execution by given [[akka.actor.typed.Behavior]] + * using [[Behavior.interpretMessage]] or [[Behavior.interpretSignal]] + * + * note: if given [[akka.actor.typed.Behavior]] resulting [[Behaviors.same]] that will cause context switching to the given behavior + * and if result is [[Behaviors.unhandled]] that will trigger the [[akka.actor.typed.scaladsl.ActorContext.onUnhandled]] + * then switching to the given behavior. + */ + def delegate(delegator: Behavior[T], msg: T): Behavior[T] + /** * Force the child Actor under the given name to terminate after it finishes * processing its current message. Nothing happens if the ActorRef is a child that is already stopped.