adding ActorContext.delegate (#31474)

* adding ActorContext.forward that wrapping Behavior.interpret for forwarding message between Behaviors
This commit is contained in:
GreyPlane 2022-08-23 15:19:36 +08:00 committed by GitHub
parent 563f337d51
commit 672a3fd81e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 134 additions and 7 deletions

View file

@ -0,0 +1,94 @@
/*
* Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.scaladsl
import akka.actor.UnhandledMessage
import akka.actor.testkit.typed.TestKitSettings
import akka.actor.testkit.typed.scaladsl.{ FishingOutcomes, LogCapturing, ScalaTestWithActorTestKit, TestProbe }
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.{ ActorRef, Behavior }
import org.scalatest.wordspec.AnyWordSpecLike
object ActorContextDelegateSpec {
sealed trait PingPongCommand
case object Ping extends PingPongCommand
case object Pong extends PingPongCommand
case object UnPingable extends PingPongCommand
sealed trait BehaviorTag
case object PingTag extends BehaviorTag
case object PongTag extends BehaviorTag
sealed trait Event
final case class ResponseFrom(from: BehaviorTag, cmd: PingPongCommand) extends Event
final case class ForwardTo(to: BehaviorTag) extends Event
def ping(monitor: ActorRef[Event])(implicit context: ActorContext[PingPongCommand]): Behavior[PingPongCommand] =
Behaviors.receiveMessagePartial[PingPongCommand] {
case Ping =>
monitor ! ResponseFrom(PingTag, Ping)
Behaviors.same
case msg @ Pong =>
monitor ! ForwardTo(PongTag)
context.delegate(pong(monitor), msg)
}
def pong(monitor: ActorRef[Event])(implicit context: ActorContext[PingPongCommand]): Behavior[PingPongCommand] =
Behaviors.receiveMessage[PingPongCommand] {
case msg @ Ping =>
monitor ! ForwardTo(PingTag)
context.delegate(ping(monitor), msg)
case Pong =>
monitor ! ResponseFrom(PongTag, Pong)
Behaviors.same
case msg @ UnPingable =>
monitor ! ForwardTo(PingTag)
context.delegate(ping(monitor), msg)
}
}
class ActorContextDelegateSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing {
import ActorContextDelegateSpec._
implicit val testSettings: TestKitSettings = TestKitSettings(system)
"The Scala DSL ActorContext delegate" must {
"delegate message by given behavior and handle resulting Behavior.same properly" in {
val probe = TestProbe[Event]()
val behv = Behaviors.setup[PingPongCommand] { implicit context =>
ping(probe.ref)
}
val ref = spawn(behv)
ref ! Pong
probe.expectMessage(ForwardTo(PongTag))
probe.expectMessage(ResponseFrom(PongTag, Pong))
ref ! Pong
probe.expectMessage(ResponseFrom(PongTag, Pong))
ref ! Ping
probe.expectMessage(ForwardTo(PingTag))
probe.expectMessage(ResponseFrom(PingTag, Ping))
}
"publish unhandled message to eventStream as UnhandledMessage and switch to delegator behavior" in {
val deadLetters = TestProbe[UnhandledMessage]("probeDeadLetters")
system.eventStream ! EventStream.Subscribe[UnhandledMessage](deadLetters.ref)
val probe = TestProbe[Event]()
val behv = Behaviors.setup[PingPongCommand] { implicit context =>
pong(probe.ref)
}
val ref = spawn(behv)
ref ! UnPingable
probe.expectMessage(ForwardTo(PingTag))
deadLetters.fishForMessage(deadLetters.remainingOrDefault) {
case UnhandledMessage(UnPingable, _, _) => FishingOutcomes.complete
case _ => FishingOutcomes.fail("unexpected message")
}
ref ! Ping
probe.expectMessage(ResponseFrom(PingTag, Ping))
}
}
}

View file

@ -0,0 +1,3 @@
#akka31474 ActorContext is marked with @DoNotInherit
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.scaladsl.ActorContext.delegate")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.actor.typed.javadsl.ActorContext.delegate")

View file

@ -9,11 +9,10 @@ import java.time.Duration
import java.util.ArrayList
import java.util.Optional
import java.util.concurrent.CompletionStage
import scala.concurrent.{ ExecutionContextExecutor, Future }
import scala.reflect.ClassTag
import scala.util.Try
import scala.annotation.nowarn
import scala.annotation.{ nowarn, switch }
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import akka.actor.Address
@ -203,6 +202,18 @@ import scala.util.Success
override def spawnAnonymous[U](behavior: akka.actor.typed.Behavior[U]): akka.actor.typed.ActorRef[U] =
spawnAnonymous(behavior, Props.empty)
def delegate(delegator: Behavior[T], msg: T): Behavior[T] = {
val started = Behavior.start(delegator, this)
val interpreted = msg match {
case signal: Signal => Behavior.interpretSignal(started, this, signal)
case message => Behavior.interpretMessage(started, this, message)
}
(interpreted._tag: @switch) match {
case BehaviorTags.SameBehavior => started
case BehaviorTags.UnhandledBehavior => this.onUnhandled(msg); started
case _ => interpreted
}
}
// Scala API impl
override def ask[Req, Res](target: RecipientRef[Req], createRequest: ActorRef[Res] => Req)(
mapResponse: Try[Res] => T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit = {

View file

@ -5,13 +5,12 @@
package akka.actor.typed
package internal
import scala.reflect.ClassTag
import akka.actor.typed.{ TypedActorContext => AC }
import akka.actor.typed.scaladsl.{ ActorContext => SAC }
import akka.actor.typed.{ TypedActorContext => AC }
import akka.annotation.InternalApi
import akka.util.LineNumbers
import akka.util.OptionVal
import akka.util.{ LineNumbers, OptionVal }
import scala.reflect.ClassTag
/**
* INTERNAL API

View file

@ -146,6 +146,16 @@ trait ActorContext[T] extends TypedActorContext[T] with ClassicActorContextProvi
*/
def spawn[U](behavior: Behavior[U], name: String, props: Props): ActorRef[U]
/**
* Delegate message and signal's execution by given [[akka.actor.typed.Behavior]]
* using [[Behavior.interpretMessage]] or [[Behavior.interpretSignal]]
*
* note: if given [[akka.actor.typed.Behavior]] resulting [[Behaviors.same]] that will cause context switching to the given behavior
* and if result is [[Behaviors.unhandled]] that will trigger the [[akka.actor.typed.scaladsl.ActorContext.onUnhandled]]
* then switching to the given behavior.
*/
def delegate(delegator: Behavior[T], msg: T): Behavior[T]
/**
* Force the child Actor under the given name to terminate after it finishes
* processing its current message. Nothing happens if the ActorRef is a child that is already stopped.

View file

@ -129,6 +129,16 @@ trait ActorContext[T] extends TypedActorContext[T] with ClassicActorContextProvi
*/
def spawn[U](behavior: Behavior[U], name: String, props: Props = Props.empty): ActorRef[U]
/**
* Delegate message and signal's execution by given [[akka.actor.typed.Behavior]]
* using [[Behavior.interpretMessage]] or [[Behavior.interpretSignal]]
*
* note: if given [[akka.actor.typed.Behavior]] resulting [[Behaviors.same]] that will cause context switching to the given behavior
* and if result is [[Behaviors.unhandled]] that will trigger the [[akka.actor.typed.scaladsl.ActorContext.onUnhandled]]
* then switching to the given behavior.
*/
def delegate(delegator: Behavior[T], msg: T): Behavior[T]
/**
* Force the child Actor under the given name to terminate after it finishes
* processing its current message. Nothing happens if the ActorRef is a child that is already stopped.