parent
cc738def48
commit
d74426a255
31 changed files with 136 additions and 134 deletions
|
|
@ -126,7 +126,7 @@ private[akka] final class FunctionRef[-T](
|
|||
/**
|
||||
* INTERNAL API
|
||||
*
|
||||
* An [[ActorContext]] for synchronous execution of a [[Behavior]] that
|
||||
* A [[TypedActorContext]] for synchronous execution of a [[Behavior]] that
|
||||
* provides only stubs for the effects an Actor can perform and replaces
|
||||
* created child Actors by a synchronous Inbox (see `Inbox.sync`).
|
||||
*/
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ package akka.actor.testkit.typed.scaladsl
|
|||
|
||||
import akka.actor.testkit.typed.internal.BehaviorTestKitImpl
|
||||
import akka.actor.testkit.typed.{ CapturedLogEvent, Effect }
|
||||
import akka.actor.typed.{ ActorRef, Behavior, Signal }
|
||||
import akka.actor.typed.{ ActorRef, Behavior, Signal, TypedActorContext }
|
||||
import akka.annotation.DoNotInherit
|
||||
|
||||
import java.util.concurrent.ThreadLocalRandom
|
||||
|
|
@ -38,7 +38,7 @@ object BehaviorTestKit {
|
|||
trait BehaviorTestKit[T] {
|
||||
|
||||
// FIXME it is weird that this is public but it is used in BehaviorSpec, could we avoid that?
|
||||
private[akka] def context: akka.actor.typed.ActorContext[T]
|
||||
private[akka] def context: TypedActorContext[T]
|
||||
|
||||
/**
|
||||
* Requests the oldest [[Effect]] or [[akka.actor.testkit.typed.Effect.NoEffects]] if no effects
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@
|
|||
package akka.actor.typed.javadsl;
|
||||
|
||||
import akka.actor.typed.*;
|
||||
import akka.actor.typed.ActorContext;
|
||||
import akka.actor.typed.TypedActorContext;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
|
|
@ -39,12 +39,12 @@ public class ActorCompile {
|
|||
Behavior<MyMsg> actor5 = ignore();
|
||||
Behavior<MyMsg> actor6 = intercept(new BehaviorInterceptor<MyMsg, MyMsg>() {
|
||||
@Override
|
||||
public Behavior<MyMsg> aroundReceive(ActorContext<MyMsg> context, MyMsg message, ReceiveTarget<MyMsg> target) {
|
||||
public Behavior<MyMsg> aroundReceive(TypedActorContext<MyMsg> context, MyMsg message, ReceiveTarget<MyMsg> target) {
|
||||
return target.apply(context, message);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Behavior<MyMsg> aroundSignal(ActorContext<MyMsg> context, Signal signal, SignalTarget<MyMsg> target) {
|
||||
public Behavior<MyMsg> aroundSignal(TypedActorContext<MyMsg> context, Signal signal, SignalTarget<MyMsg> target) {
|
||||
return target.apply(context, signal);
|
||||
}
|
||||
}, actor5);
|
||||
|
|
@ -84,12 +84,12 @@ public class ActorCompile {
|
|||
static class MyBehavior extends ExtensibleBehavior<MyMsg> {
|
||||
|
||||
@Override
|
||||
public Behavior<MyMsg> receiveSignal(ActorContext<MyMsg> context, Signal message) throws Exception {
|
||||
public Behavior<MyMsg> receiveSignal(TypedActorContext<MyMsg> context, Signal message) throws Exception {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Behavior<MyMsg> receive(ActorContext<MyMsg> context, MyMsg message) throws Exception {
|
||||
public Behavior<MyMsg> receive(TypedActorContext<MyMsg> context, MyMsg message) throws Exception {
|
||||
ActorRef<String> adapter = context.asJava().messageAdapter(String.class, s -> new MyMsgB(s.toUpperCase()));
|
||||
return this;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -636,9 +636,9 @@ class InterceptActorContextSpec extends ActorContextSpec {
|
|||
import BehaviorInterceptor._
|
||||
|
||||
def tap[T] = new BehaviorInterceptor[T, T] {
|
||||
override def aroundReceive(context: ActorContext[T], message: T, target: ReceiveTarget[T]): Behavior[T] =
|
||||
override def aroundReceive(context: TypedActorContext[T], message: T, target: ReceiveTarget[T]): Behavior[T] =
|
||||
target(context, message)
|
||||
override def aroundSignal(context: ActorContext[T], signal: Signal, target: SignalTarget[T]): Behavior[T] =
|
||||
override def aroundSignal(context: TypedActorContext[T], signal: Signal, target: SignalTarget[T]): Behavior[T] =
|
||||
target(context, signal)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -20,30 +20,30 @@ import org.scalatest.WordSpecLike
|
|||
|
||||
object BehaviorSpec {
|
||||
sealed trait Command {
|
||||
def expectedResponse(context: ActorContext[Command]): Seq[Event] = Nil
|
||||
def expectedResponse(context: TypedActorContext[Command]): Seq[Event] = Nil
|
||||
}
|
||||
case object GetSelf extends Command {
|
||||
override def expectedResponse(context: ActorContext[Command]): Seq[Event] = Self(context.asScala.self) :: Nil
|
||||
override def expectedResponse(context: TypedActorContext[Command]): Seq[Event] = Self(context.asScala.self) :: Nil
|
||||
}
|
||||
// Behavior under test must return Unhandled
|
||||
case object Miss extends Command {
|
||||
override def expectedResponse(context: ActorContext[Command]): Seq[Event] = Missed :: Nil
|
||||
override def expectedResponse(context: TypedActorContext[Command]): Seq[Event] = Missed :: Nil
|
||||
}
|
||||
// Behavior under test must return same
|
||||
case object Ignore extends Command {
|
||||
override def expectedResponse(context: ActorContext[Command]): Seq[Event] = Ignored :: Nil
|
||||
override def expectedResponse(context: TypedActorContext[Command]): Seq[Event] = Ignored :: Nil
|
||||
}
|
||||
case object Ping extends Command {
|
||||
override def expectedResponse(context: ActorContext[Command]): Seq[Event] = Pong :: Nil
|
||||
override def expectedResponse(context: TypedActorContext[Command]): Seq[Event] = Pong :: Nil
|
||||
}
|
||||
case object Swap extends Command {
|
||||
override def expectedResponse(context: ActorContext[Command]): Seq[Event] = Swapped :: Nil
|
||||
override def expectedResponse(context: TypedActorContext[Command]): Seq[Event] = Swapped :: Nil
|
||||
}
|
||||
case class GetState()(s: State) extends Command {
|
||||
override def expectedResponse(context: ActorContext[Command]): Seq[Event] = s :: Nil
|
||||
override def expectedResponse(context: TypedActorContext[Command]): Seq[Event] = s :: Nil
|
||||
}
|
||||
case class AuxPing(id: Int) extends Command {
|
||||
override def expectedResponse(context: ActorContext[Command]): Seq[Event] = Pong :: Nil
|
||||
override def expectedResponse(context: TypedActorContext[Command]): Seq[Event] = Pong :: Nil
|
||||
}
|
||||
case object Stop extends Command
|
||||
|
||||
|
|
@ -512,12 +512,12 @@ class InterceptScalaBehaviorSpec extends ImmutableWithSignalScalaBehaviorSpec wi
|
|||
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
|
||||
val inbox = TestInbox[Either[Signal, Command]]("tapListener")
|
||||
val tap = new BehaviorInterceptor[Command, Command] {
|
||||
override def aroundReceive(context: ActorContext[Command], message: Command, target: ReceiveTarget[Command]): Behavior[Command] = {
|
||||
override def aroundReceive(context: TypedActorContext[Command], message: Command, target: ReceiveTarget[Command]): Behavior[Command] = {
|
||||
inbox.ref ! Right(message)
|
||||
target(context, message)
|
||||
}
|
||||
|
||||
override def aroundSignal(context: ActorContext[Command], signal: Signal, target: SignalTarget[Command]): Behavior[Command] = {
|
||||
override def aroundSignal(context: TypedActorContext[Command], signal: Signal, target: SignalTarget[Command]): Behavior[Command] = {
|
||||
inbox.ref ! Left(signal)
|
||||
target(context, signal)
|
||||
}
|
||||
|
|
@ -625,12 +625,12 @@ class TapJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec with Reuse
|
|||
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
|
||||
val inbox = TestInbox[Either[Signal, Command]]("tapListener")
|
||||
val tap = new BehaviorInterceptor[Command, Command] {
|
||||
override def aroundReceive(context: ActorContext[Command], message: Command, target: ReceiveTarget[Command]): Behavior[Command] = {
|
||||
override def aroundReceive(context: TypedActorContext[Command], message: Command, target: ReceiveTarget[Command]): Behavior[Command] = {
|
||||
inbox.ref ! Right(message)
|
||||
target(context, message)
|
||||
}
|
||||
|
||||
override def aroundSignal(context: ActorContext[Command], signal: Signal, target: SignalTarget[Command]): Behavior[Command] = {
|
||||
override def aroundSignal(context: TypedActorContext[Command], signal: Signal, target: SignalTarget[Command]): Behavior[Command] = {
|
||||
inbox.ref ! Left(signal)
|
||||
target(context, signal)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,14 +32,14 @@ class InterceptSpec extends ScalaTestWithActorTestKit(
|
|||
implicit val untypedSystem = system.toUntyped
|
||||
|
||||
private def snitchingInterceptor(probe: ActorRef[String]) = new BehaviorInterceptor[String, String] {
|
||||
override def aroundReceive(context: ActorContext[String], message: String, target: ReceiveTarget[String]): Behavior[String] = {
|
||||
override def aroundReceive(context: TypedActorContext[String], message: String, target: ReceiveTarget[String]): Behavior[String] = {
|
||||
probe ! ("before " + message)
|
||||
val b = target(context, message)
|
||||
probe ! ("after " + message)
|
||||
b
|
||||
}
|
||||
|
||||
override def aroundSignal(context: ActorContext[String], signal: Signal, target: SignalTarget[String]): Behavior[String] = {
|
||||
override def aroundSignal(context: TypedActorContext[String], signal: Signal, target: SignalTarget[String]): Behavior[String] = {
|
||||
target(context, signal)
|
||||
}
|
||||
|
||||
|
|
@ -176,14 +176,14 @@ class InterceptSpec extends ScalaTestWithActorTestKit(
|
|||
"allow an interceptor to replace started behavior" in {
|
||||
val interceptor = new BehaviorInterceptor[String, String] {
|
||||
|
||||
override def aroundStart(context: ActorContext[String], target: PreStartTarget[String]): Behavior[String] = {
|
||||
override def aroundStart(context: TypedActorContext[String], target: PreStartTarget[String]): Behavior[String] = {
|
||||
Behaviors.stopped
|
||||
}
|
||||
|
||||
def aroundReceive(context: ActorContext[String], message: String, target: ReceiveTarget[String]): Behavior[String] =
|
||||
def aroundReceive(context: TypedActorContext[String], message: String, target: ReceiveTarget[String]): Behavior[String] =
|
||||
target(context, message)
|
||||
|
||||
def aroundSignal(context: ActorContext[String], signal: Signal, target: SignalTarget[String]): Behavior[String] =
|
||||
def aroundSignal(context: TypedActorContext[String], signal: Signal, target: SignalTarget[String]): Behavior[String] =
|
||||
target(context, signal)
|
||||
}
|
||||
|
||||
|
|
@ -282,14 +282,14 @@ class InterceptSpec extends ScalaTestWithActorTestKit(
|
|||
}
|
||||
|
||||
val poisonInterceptor = new BehaviorInterceptor[Any, Msg] {
|
||||
override def aroundReceive(context: ActorContext[Any], message: Any, target: ReceiveTarget[Msg]): Behavior[Msg] =
|
||||
override def aroundReceive(context: TypedActorContext[Any], message: Any, target: ReceiveTarget[Msg]): Behavior[Msg] =
|
||||
message match {
|
||||
case MyPoisonPill ⇒ Behaviors.stopped
|
||||
case m: Msg ⇒ target(context, m)
|
||||
case _ ⇒ Behaviors.unhandled
|
||||
}
|
||||
|
||||
override def aroundSignal(context: ActorContext[Any], signal: Signal, target: SignalTarget[Msg]): Behavior[Msg] =
|
||||
override def aroundSignal(context: TypedActorContext[Any], signal: Signal, target: SignalTarget[Msg]): Behavior[Msg] =
|
||||
target.apply(context, signal)
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -804,10 +804,10 @@ class SupervisionSpec extends ScalaTestWithActorTestKit(
|
|||
// irrelevant for test case but needed to use intercept in the pyramid of doom below
|
||||
val whateverInterceptor = new BehaviorInterceptor[String, String] {
|
||||
// identity intercept
|
||||
override def aroundReceive(context: ActorContext[String], message: String, target: ReceiveTarget[String]): Behavior[String] =
|
||||
override def aroundReceive(context: TypedActorContext[String], message: String, target: ReceiveTarget[String]): Behavior[String] =
|
||||
target(context, message)
|
||||
|
||||
override def aroundSignal(context: ActorContext[String], signal: Signal, target: SignalTarget[String]): Behavior[String] =
|
||||
override def aroundSignal(context: TypedActorContext[String], signal: Signal, target: SignalTarget[String]): Behavior[String] =
|
||||
target(context, signal)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -38,11 +38,11 @@ class StopSpec extends ScalaTestWithActorTestKit with WordSpecLike {
|
|||
val ref = spawn(Behaviors.setup[AnyRef] { _ ⇒
|
||||
Behaviors.intercept(
|
||||
new BehaviorInterceptor[AnyRef, AnyRef] {
|
||||
override def aroundReceive(context: typed.ActorContext[AnyRef], message: AnyRef, target: ReceiveTarget[AnyRef]): Behavior[AnyRef] = {
|
||||
override def aroundReceive(context: typed.TypedActorContext[AnyRef], message: AnyRef, target: ReceiveTarget[AnyRef]): Behavior[AnyRef] = {
|
||||
target(context, message)
|
||||
}
|
||||
|
||||
override def aroundSignal(context: typed.ActorContext[AnyRef], signal: Signal, target: SignalTarget[AnyRef]): Behavior[AnyRef] = {
|
||||
override def aroundSignal(context: typed.TypedActorContext[AnyRef], signal: Signal, target: SignalTarget[AnyRef]): Behavior[AnyRef] = {
|
||||
target(context, signal)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ abstract class ActorSystem[-T] extends ActorRef[T] with Extensions { this: Inter
|
|||
/**
|
||||
* A [[akka.actor.typed.Logger]] that can be used to emit log messages
|
||||
* without specifying a more detailed source. Typically it is desirable to
|
||||
* use the dedicated `Logger` available from each Actor’s [[ActorContext]]
|
||||
* use the dedicated `Logger` available from each Actor’s [[TypedActorContext]]
|
||||
* as that ties the log entries to the actor.
|
||||
*/
|
||||
def log: Logger
|
||||
|
|
|
|||
|
|
@ -90,7 +90,7 @@ abstract class ExtensibleBehavior[T] extends Behavior[T] {
|
|||
* the special objects with real Behaviors.
|
||||
*/
|
||||
@throws(classOf[Exception])
|
||||
def receive(ctx: ActorContext[T], msg: T): Behavior[T]
|
||||
def receive(ctx: TypedActorContext[T], msg: T): Behavior[T]
|
||||
|
||||
/**
|
||||
* Process an incoming [[Signal]] and return the next behavior. This means
|
||||
|
|
@ -108,7 +108,7 @@ abstract class ExtensibleBehavior[T] extends Behavior[T] {
|
|||
* the special objects with real Behaviors.
|
||||
*/
|
||||
@throws(classOf[Exception])
|
||||
def receiveSignal(ctx: ActorContext[T], msg: Signal): Behavior[T]
|
||||
def receiveSignal(ctx: TypedActorContext[T], msg: Signal): Behavior[T]
|
||||
}
|
||||
|
||||
object Behavior {
|
||||
|
|
@ -219,7 +219,7 @@ object Behavior {
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] val unhandledSignal: PartialFunction[(ActorContext[Nothing], Signal), Behavior[Nothing]] = {
|
||||
@InternalApi private[akka] val unhandledSignal: PartialFunction[(TypedActorContext[Nothing], Signal), Behavior[Nothing]] = {
|
||||
case (_, _) ⇒ UnhandledBehavior
|
||||
}
|
||||
|
||||
|
|
@ -229,14 +229,14 @@ object Behavior {
|
|||
*/
|
||||
@InternalApi
|
||||
private[akka] abstract class DeferredBehavior[T] extends Behavior[T] {
|
||||
def apply(ctx: ActorContext[T]): Behavior[T]
|
||||
def apply(ctx: TypedActorContext[T]): Behavior[T]
|
||||
}
|
||||
/** INTERNAL API */
|
||||
@InternalApi
|
||||
private[akka] object DeferredBehavior {
|
||||
def apply[T](factory: SAC[T] ⇒ Behavior[T]): Behavior[T] =
|
||||
new DeferredBehavior[T] {
|
||||
def apply(ctx: ActorContext[T]): Behavior[T] = factory(ctx.asScala)
|
||||
def apply(ctx: TypedActorContext[T]): Behavior[T] = factory(ctx.asScala)
|
||||
override def toString: String = s"Deferred(${LineNumbers(factory)})"
|
||||
}
|
||||
}
|
||||
|
|
@ -288,7 +288,7 @@ object Behavior {
|
|||
* message or signal.
|
||||
*/
|
||||
@tailrec
|
||||
def canonicalize[T](behavior: Behavior[T], current: Behavior[T], ctx: ActorContext[T]): Behavior[T] =
|
||||
def canonicalize[T](behavior: Behavior[T], current: Behavior[T], ctx: TypedActorContext[T]): Behavior[T] =
|
||||
behavior match {
|
||||
case SameBehavior ⇒ current
|
||||
case UnhandledBehavior ⇒ current
|
||||
|
|
@ -305,7 +305,7 @@ object Behavior {
|
|||
*/
|
||||
@InternalApi
|
||||
@tailrec
|
||||
private[akka] def wrap[T, U](currentBehavior: Behavior[_], nextBehavior: Behavior[T], ctx: ActorContext[T])(f: Behavior[T] ⇒ Behavior[U]): Behavior[U] =
|
||||
private[akka] def wrap[T, U](currentBehavior: Behavior[_], nextBehavior: Behavior[T], ctx: TypedActorContext[T])(f: Behavior[T] ⇒ Behavior[U]): Behavior[U] =
|
||||
nextBehavior match {
|
||||
case SameBehavior | `currentBehavior` ⇒ same
|
||||
case UnhandledBehavior ⇒ unhandled
|
||||
|
|
@ -318,13 +318,13 @@ object Behavior {
|
|||
* Starts deferred behavior and nested deferred behaviors until all deferred behaviors in the stack are started
|
||||
* and then the resulting behavior is returned.
|
||||
*/
|
||||
def start[T](behavior: Behavior[T], ctx: ActorContext[T]): Behavior[T] = {
|
||||
def start[T](behavior: Behavior[T], ctx: TypedActorContext[T]): Behavior[T] = {
|
||||
// TODO can this be made @tailrec?
|
||||
behavior match {
|
||||
case innerDeferred: DeferredBehavior[T] ⇒ start(innerDeferred(ctx), ctx)
|
||||
case wrapped: WrappingBehavior[T, Any] @unchecked ⇒
|
||||
// make sure that a deferred behavior wrapped inside some other behavior is also started
|
||||
val startedInner = start(wrapped.nestedBehavior, ctx.asInstanceOf[ActorContext[Any]])
|
||||
val startedInner = start(wrapped.nestedBehavior, ctx.asInstanceOf[TypedActorContext[Any]])
|
||||
if (startedInner eq wrapped.nestedBehavior) wrapped
|
||||
else wrapped.replaceNested(startedInner)
|
||||
case _ ⇒ behavior
|
||||
|
|
@ -390,13 +390,13 @@ object Behavior {
|
|||
/**
|
||||
* Execute the behavior with the given message
|
||||
*/
|
||||
def interpretMessage[T](behavior: Behavior[T], ctx: ActorContext[T], msg: T): Behavior[T] =
|
||||
def interpretMessage[T](behavior: Behavior[T], ctx: TypedActorContext[T], msg: T): Behavior[T] =
|
||||
interpret(behavior, ctx, msg)
|
||||
|
||||
/**
|
||||
* Execute the behavior with the given signal
|
||||
*/
|
||||
def interpretSignal[T](behavior: Behavior[T], ctx: ActorContext[T], signal: Signal): Behavior[T] = {
|
||||
def interpretSignal[T](behavior: Behavior[T], ctx: TypedActorContext[T], signal: Signal): Behavior[T] = {
|
||||
val result = interpret(behavior, ctx, signal)
|
||||
// we need to throw here to allow supervision of deathpact exception
|
||||
signal match {
|
||||
|
|
@ -405,7 +405,7 @@ object Behavior {
|
|||
}
|
||||
}
|
||||
|
||||
private def interpret[T](behavior: Behavior[T], ctx: ActorContext[T], msg: Any): Behavior[T] = {
|
||||
private def interpret[T](behavior: Behavior[T], ctx: TypedActorContext[T], msg: Any): Behavior[T] = {
|
||||
behavior match {
|
||||
case null ⇒ throw new InvalidMessageException("[null] is not an allowed behavior")
|
||||
case SameBehavior | UnhandledBehavior ⇒
|
||||
|
|
@ -430,7 +430,7 @@ object Behavior {
|
|||
* Execute the behavior with the given messages (or signals).
|
||||
* The returned [[Behavior]] from each processed message is used for the next message.
|
||||
*/
|
||||
@InternalApi private[akka] def interpretMessages[T](behavior: Behavior[T], ctx: ActorContext[T], messages: Iterator[T]): Behavior[T] = {
|
||||
@InternalApi private[akka] def interpretMessages[T](behavior: Behavior[T], ctx: TypedActorContext[T], messages: Iterator[T]): Behavior[T] = {
|
||||
@tailrec def interpretOne(b: Behavior[T]): Behavior[T] = {
|
||||
val b2 = Behavior.start(b, ctx)
|
||||
if (!Behavior.isAlive(b2) || !messages.hasNext) b2
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ abstract class BehaviorInterceptor[O, I] {
|
|||
* @return The returned behavior will be the "started" behavior of the actor used to accept
|
||||
* the next message or signal.
|
||||
*/
|
||||
def aroundStart(ctx: ActorContext[O], target: PreStartTarget[I]): Behavior[I] =
|
||||
def aroundStart(ctx: TypedActorContext[O], target: PreStartTarget[I]): Behavior[I] =
|
||||
target.start(ctx)
|
||||
|
||||
/**
|
||||
|
|
@ -33,7 +33,7 @@ abstract class BehaviorInterceptor[O, I] {
|
|||
*
|
||||
* @return The behavior for next message or signal
|
||||
*/
|
||||
def aroundReceive(ctx: ActorContext[O], msg: O, target: ReceiveTarget[I]): Behavior[I]
|
||||
def aroundReceive(ctx: TypedActorContext[O], msg: O, target: ReceiveTarget[I]): Behavior[I]
|
||||
|
||||
/**
|
||||
* Intercept a signal sent to the running actor. Pass the signal on to the next behavior
|
||||
|
|
@ -41,7 +41,7 @@ abstract class BehaviorInterceptor[O, I] {
|
|||
*
|
||||
* @return The behavior for next message or signal
|
||||
*/
|
||||
def aroundSignal(ctx: ActorContext[O], signal: Signal, target: SignalTarget[I]): Behavior[I]
|
||||
def aroundSignal(ctx: TypedActorContext[O], signal: Signal, target: SignalTarget[I]): Behavior[I]
|
||||
|
||||
/**
|
||||
* @return `true` if this behavior logically the same as another behavior interceptor and can therefore be eliminated
|
||||
|
|
@ -61,7 +61,7 @@ object BehaviorInterceptor {
|
|||
*/
|
||||
@DoNotInherit
|
||||
trait PreStartTarget[T] {
|
||||
def start(ctx: ActorContext[_]): Behavior[T]
|
||||
def start(ctx: TypedActorContext[_]): Behavior[T]
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -71,7 +71,7 @@ object BehaviorInterceptor {
|
|||
*/
|
||||
@DoNotInherit
|
||||
trait ReceiveTarget[T] {
|
||||
def apply(ctx: ActorContext[_], msg: T): Behavior[T]
|
||||
def apply(ctx: TypedActorContext[_], msg: T): Behavior[T]
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -82,7 +82,7 @@ object BehaviorInterceptor {
|
|||
* is taking place.
|
||||
*/
|
||||
@InternalApi
|
||||
private[akka] def signalRestart(ctx: ActorContext[_]): Unit
|
||||
private[akka] def signalRestart(ctx: TypedActorContext[_]): Unit
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -92,7 +92,7 @@ object BehaviorInterceptor {
|
|||
*/
|
||||
@DoNotInherit
|
||||
trait SignalTarget[T] {
|
||||
def apply(ctx: ActorContext[_], signal: Signal): Behavior[T]
|
||||
def apply(ctx: TypedActorContext[_], signal: Signal): Behavior[T]
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ import akka.annotation.ApiMayChange
|
|||
*/
|
||||
@DoNotInherit
|
||||
@ApiMayChange
|
||||
trait ActorContext[T] {
|
||||
trait TypedActorContext[T] {
|
||||
// this should be a pure interface, i.e. only abstract methods
|
||||
|
||||
/**
|
||||
|
|
@ -25,7 +25,7 @@ import akka.util.JavaDurationConverters._
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi private[akka] trait ActorContextImpl[T] extends ActorContext[T] with javadsl.ActorContext[T] with scaladsl.ActorContext[T] {
|
||||
@InternalApi private[akka] trait ActorContextImpl[T] extends TypedActorContext[T] with javadsl.ActorContext[T] with scaladsl.ActorContext[T] {
|
||||
|
||||
private var messageAdapterRef: OptionVal[ActorRef[Any]] = OptionVal.None
|
||||
private var _messageAdapters: List[(Class[_], Any ⇒ T)] = Nil
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ package internal
|
|||
|
||||
import akka.util.{ LineNumbers }
|
||||
import akka.annotation.InternalApi
|
||||
import akka.actor.typed.{ ActorContext ⇒ AC }
|
||||
import akka.actor.typed.{ TypedActorContext ⇒ AC }
|
||||
import akka.actor.typed.scaladsl.{ ActorContext ⇒ SAC }
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ package akka.actor.typed.internal
|
|||
import akka.actor.typed
|
||||
import akka.actor.typed.Behavior.{ SameBehavior, UnhandledBehavior }
|
||||
import akka.actor.typed.internal.TimerSchedulerImpl.TimerMsg
|
||||
import akka.actor.typed.{ ActorContext, ActorRef, Behavior, BehaviorInterceptor, ExtensibleBehavior, PreRestart, Signal }
|
||||
import akka.actor.typed.{ TypedActorContext, ActorRef, Behavior, BehaviorInterceptor, ExtensibleBehavior, PreRestart, Signal }
|
||||
import akka.annotation.InternalApi
|
||||
import akka.util.LineNumbers
|
||||
|
||||
|
|
@ -39,26 +39,26 @@ private[akka] final class InterceptorImpl[O, I](val interceptor: BehaviorInterce
|
|||
import BehaviorInterceptor._
|
||||
|
||||
private val preStartTarget: PreStartTarget[I] = new PreStartTarget[I] {
|
||||
override def start(ctx: ActorContext[_]): Behavior[I] = {
|
||||
Behavior.start[I](nestedBehavior, ctx.asInstanceOf[ActorContext[I]])
|
||||
override def start(ctx: TypedActorContext[_]): Behavior[I] = {
|
||||
Behavior.start[I](nestedBehavior, ctx.asInstanceOf[TypedActorContext[I]])
|
||||
}
|
||||
}
|
||||
|
||||
private val receiveTarget: ReceiveTarget[I] = new ReceiveTarget[I] {
|
||||
override def apply(ctx: ActorContext[_], msg: I): Behavior[I] =
|
||||
Behavior.interpretMessage(nestedBehavior, ctx.asInstanceOf[ActorContext[I]], msg)
|
||||
override def apply(ctx: TypedActorContext[_], msg: I): Behavior[I] =
|
||||
Behavior.interpretMessage(nestedBehavior, ctx.asInstanceOf[TypedActorContext[I]], msg)
|
||||
|
||||
override def signalRestart(ctx: ActorContext[_]): Unit =
|
||||
Behavior.interpretSignal(nestedBehavior, ctx.asInstanceOf[ActorContext[I]], PreRestart)
|
||||
override def signalRestart(ctx: TypedActorContext[_]): Unit =
|
||||
Behavior.interpretSignal(nestedBehavior, ctx.asInstanceOf[TypedActorContext[I]], PreRestart)
|
||||
}
|
||||
|
||||
private val signalTarget = new SignalTarget[I] {
|
||||
override def apply(ctx: ActorContext[_], signal: Signal): Behavior[I] =
|
||||
Behavior.interpretSignal(nestedBehavior, ctx.asInstanceOf[ActorContext[I]], signal)
|
||||
override def apply(ctx: TypedActorContext[_], signal: Signal): Behavior[I] =
|
||||
Behavior.interpretSignal(nestedBehavior, ctx.asInstanceOf[TypedActorContext[I]], signal)
|
||||
}
|
||||
|
||||
// invoked pre-start to start/de-duplicate the initial behavior stack
|
||||
def preStart(ctx: typed.ActorContext[O]): Behavior[O] = {
|
||||
def preStart(ctx: typed.TypedActorContext[O]): Behavior[O] = {
|
||||
val started = interceptor.aroundStart(ctx, preStartTarget)
|
||||
deduplicate(started, ctx)
|
||||
}
|
||||
|
|
@ -66,18 +66,18 @@ private[akka] final class InterceptorImpl[O, I](val interceptor: BehaviorInterce
|
|||
override def replaceNested(newNested: Behavior[I]): Behavior[O] =
|
||||
new InterceptorImpl(interceptor, newNested)
|
||||
|
||||
override def receive(ctx: typed.ActorContext[O], msg: O): Behavior[O] = {
|
||||
override def receive(ctx: typed.TypedActorContext[O], msg: O): Behavior[O] = {
|
||||
val interceptedResult = interceptor.aroundReceive(ctx, msg, receiveTarget)
|
||||
deduplicate(interceptedResult, ctx)
|
||||
}
|
||||
|
||||
override def receiveSignal(ctx: typed.ActorContext[O], signal: Signal): Behavior[O] = {
|
||||
override def receiveSignal(ctx: typed.TypedActorContext[O], signal: Signal): Behavior[O] = {
|
||||
val interceptedResult = interceptor.aroundSignal(ctx, signal, signalTarget)
|
||||
deduplicate(interceptedResult, ctx)
|
||||
}
|
||||
|
||||
private def deduplicate(interceptedResult: Behavior[I], ctx: ActorContext[O]): Behavior[O] = {
|
||||
val started = Behavior.start(interceptedResult, ctx.asInstanceOf[ActorContext[I]])
|
||||
private def deduplicate(interceptedResult: Behavior[I], ctx: TypedActorContext[O]): Behavior[O] = {
|
||||
val started = Behavior.start(interceptedResult, ctx.asInstanceOf[TypedActorContext[I]])
|
||||
if (started == UnhandledBehavior || started == SameBehavior || !Behavior.isAlive(started)) {
|
||||
started.unsafeCast[O]
|
||||
} else {
|
||||
|
|
@ -104,12 +104,12 @@ private[akka] final class InterceptorImpl[O, I](val interceptor: BehaviorInterce
|
|||
private[akka] final case class MonitorInterceptor[T](actorRef: ActorRef[T]) extends BehaviorInterceptor[T, T] {
|
||||
import BehaviorInterceptor._
|
||||
|
||||
override def aroundReceive(ctx: ActorContext[T], msg: T, target: ReceiveTarget[T]): Behavior[T] = {
|
||||
override def aroundReceive(ctx: TypedActorContext[T], msg: T, target: ReceiveTarget[T]): Behavior[T] = {
|
||||
actorRef ! msg
|
||||
target(ctx, msg)
|
||||
}
|
||||
|
||||
override def aroundSignal(ctx: ActorContext[T], signal: Signal, target: SignalTarget[T]): Behavior[T] = {
|
||||
override def aroundSignal(ctx: TypedActorContext[T], signal: Signal, target: SignalTarget[T]): Behavior[T] = {
|
||||
target(ctx, signal)
|
||||
}
|
||||
|
||||
|
|
@ -150,7 +150,7 @@ private[akka] final case class WidenedInterceptor[O, I](matcher: PartialFunction
|
|||
case _ ⇒ false
|
||||
}
|
||||
|
||||
def aroundReceive(ctx: ActorContext[O], msg: O, target: ReceiveTarget[I]): Behavior[I] = {
|
||||
def aroundReceive(ctx: TypedActorContext[O], msg: O, target: ReceiveTarget[I]): Behavior[I] = {
|
||||
// widen would wrap the TimerMessage, which would be wrong, see issue #25318
|
||||
msg match {
|
||||
case t: TimerMsg ⇒ throw new IllegalArgumentException(
|
||||
|
|
@ -164,7 +164,7 @@ private[akka] final case class WidenedInterceptor[O, I](matcher: PartialFunction
|
|||
}
|
||||
}
|
||||
|
||||
def aroundSignal(ctx: ActorContext[O], signal: Signal, target: SignalTarget[I]): Behavior[I] =
|
||||
def aroundSignal(ctx: TypedActorContext[O], signal: Signal, target: SignalTarget[I]): Behavior[I] =
|
||||
target(ctx, signal)
|
||||
|
||||
override def toString: String = s"Widen(${LineNumbers(matcher)})"
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
|
||||
package akka.actor.typed.internal
|
||||
|
||||
import akka.actor.typed.ActorContext
|
||||
import akka.actor.typed.TypedActorContext
|
||||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.BehaviorInterceptor
|
||||
import akka.actor.typed.Signal
|
||||
|
|
@ -31,10 +31,10 @@ import akka.annotation.InternalApi
|
|||
* and process stashed messages before stopping.
|
||||
*/
|
||||
@InternalApi private[akka] final class PoisonPillInterceptor[M] extends BehaviorInterceptor[M, M] {
|
||||
override def aroundReceive(ctx: ActorContext[M], msg: M, target: BehaviorInterceptor.ReceiveTarget[M]): Behavior[M] =
|
||||
override def aroundReceive(ctx: TypedActorContext[M], msg: M, target: BehaviorInterceptor.ReceiveTarget[M]): Behavior[M] =
|
||||
target(ctx, msg)
|
||||
|
||||
override def aroundSignal(ctx: ActorContext[M], signal: Signal, target: BehaviorInterceptor.SignalTarget[M]): Behavior[M] = {
|
||||
override def aroundSignal(ctx: TypedActorContext[M], signal: Signal, target: BehaviorInterceptor.SignalTarget[M]): Behavior[M] = {
|
||||
signal match {
|
||||
case p: PoisonPill ⇒
|
||||
val next = target(ctx, p)
|
||||
|
|
|
|||
|
|
@ -52,27 +52,27 @@ private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: Supe
|
|||
}
|
||||
}
|
||||
|
||||
override def aroundStart(ctx: ActorContext[O], target: PreStartTarget[I]): Behavior[I] = {
|
||||
override def aroundStart(ctx: TypedActorContext[O], target: PreStartTarget[I]): Behavior[I] = {
|
||||
try {
|
||||
target.start(ctx)
|
||||
} catch handleExceptionOnStart(ctx)
|
||||
}
|
||||
|
||||
def aroundSignal(ctx: ActorContext[O], signal: Signal, target: SignalTarget[I]): Behavior[I] = {
|
||||
def aroundSignal(ctx: TypedActorContext[O], signal: Signal, target: SignalTarget[I]): Behavior[I] = {
|
||||
try {
|
||||
target(ctx, signal)
|
||||
} catch handleSignalException(ctx, target)
|
||||
}
|
||||
|
||||
def log(ctx: ActorContext[_], t: Throwable): Unit = {
|
||||
def log(ctx: TypedActorContext[_], t: Throwable): Unit = {
|
||||
if (strategy.loggingEnabled) {
|
||||
ctx.asScala.log.error(t, "Supervisor {} saw failure: {}", this, t.getMessage)
|
||||
}
|
||||
}
|
||||
|
||||
protected def handleExceptionOnStart(ctx: ActorContext[O]): Catcher[Behavior[I]]
|
||||
protected def handleSignalException(ctx: ActorContext[O], target: SignalTarget[I]): Catcher[Behavior[I]]
|
||||
protected def handleReceiveException(ctx: ActorContext[O], target: ReceiveTarget[I]): Catcher[Behavior[I]]
|
||||
protected def handleExceptionOnStart(ctx: TypedActorContext[O]): Catcher[Behavior[I]]
|
||||
protected def handleSignalException(ctx: TypedActorContext[O], target: SignalTarget[I]): Catcher[Behavior[I]]
|
||||
protected def handleReceiveException(ctx: TypedActorContext[O], target: ReceiveTarget[I]): Catcher[Behavior[I]]
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -80,28 +80,28 @@ private abstract class AbstractSupervisor[O, I, Thr <: Throwable](strategy: Supe
|
|||
*/
|
||||
private abstract class SimpleSupervisor[T, Thr <: Throwable: ClassTag](ss: SupervisorStrategy) extends AbstractSupervisor[T, T, Thr](ss) {
|
||||
|
||||
override def aroundReceive(ctx: ActorContext[T], msg: T, target: ReceiveTarget[T]): Behavior[T] = {
|
||||
override def aroundReceive(ctx: TypedActorContext[T], msg: T, target: ReceiveTarget[T]): Behavior[T] = {
|
||||
try {
|
||||
target(ctx, msg)
|
||||
} catch handleReceiveException(ctx, target)
|
||||
}
|
||||
|
||||
protected def handleException(@unused ctx: ActorContext[T]): Catcher[Behavior[T]] = {
|
||||
protected def handleException(@unused ctx: TypedActorContext[T]): Catcher[Behavior[T]] = {
|
||||
case NonFatal(t: Thr) ⇒
|
||||
Behavior.failed(t)
|
||||
}
|
||||
|
||||
// convenience if target not required to handle exception
|
||||
protected def handleExceptionOnStart(ctx: ActorContext[T]): Catcher[Behavior[T]] =
|
||||
protected def handleExceptionOnStart(ctx: TypedActorContext[T]): Catcher[Behavior[T]] =
|
||||
handleException(ctx)
|
||||
protected def handleSignalException(ctx: ActorContext[T], target: SignalTarget[T]): Catcher[Behavior[T]] =
|
||||
protected def handleSignalException(ctx: TypedActorContext[T], target: SignalTarget[T]): Catcher[Behavior[T]] =
|
||||
handleException(ctx)
|
||||
protected def handleReceiveException(ctx: ActorContext[T], target: ReceiveTarget[T]): Catcher[Behavior[T]] =
|
||||
protected def handleReceiveException(ctx: TypedActorContext[T], target: ReceiveTarget[T]): Catcher[Behavior[T]] =
|
||||
handleException(ctx)
|
||||
}
|
||||
|
||||
private class StopSupervisor[T, Thr <: Throwable: ClassTag](strategy: Stop) extends SimpleSupervisor[T, Thr](strategy) {
|
||||
override def handleException(ctx: ActorContext[T]): Catcher[Behavior[T]] = {
|
||||
override def handleException(ctx: TypedActorContext[T]): Catcher[Behavior[T]] = {
|
||||
case NonFatal(t: Thr) ⇒
|
||||
log(ctx, t)
|
||||
Behavior.failed(t)
|
||||
|
|
@ -109,7 +109,7 @@ private class StopSupervisor[T, Thr <: Throwable: ClassTag](strategy: Stop) exte
|
|||
}
|
||||
|
||||
private class ResumeSupervisor[T, Thr <: Throwable: ClassTag](ss: Resume) extends SimpleSupervisor[T, Thr](ss) {
|
||||
override protected def handleException(ctx: ActorContext[T]): Catcher[Behavior[T]] = {
|
||||
override protected def handleException(ctx: TypedActorContext[T]): Catcher[Behavior[T]] = {
|
||||
case NonFatal(t: Thr) ⇒
|
||||
log(ctx, t)
|
||||
Behaviors.same
|
||||
|
|
@ -126,7 +126,7 @@ private class RestartSupervisor[T, Thr <: Throwable](initial: Behavior[T], strat
|
|||
case OptionVal.Some(d) ⇒ d.hasTimeLeft
|
||||
}
|
||||
|
||||
override def aroundStart(ctx: ActorContext[T], target: PreStartTarget[T]): Behavior[T] = {
|
||||
override def aroundStart(ctx: TypedActorContext[T], target: PreStartTarget[T]): Behavior[T] = {
|
||||
try {
|
||||
target.start(ctx)
|
||||
} catch {
|
||||
|
|
@ -150,7 +150,7 @@ private class RestartSupervisor[T, Thr <: Throwable](initial: Behavior[T], strat
|
|||
deadline = newDeadline
|
||||
}
|
||||
|
||||
private def handleException(ctx: ActorContext[T], signalRestart: () ⇒ Unit): Catcher[Behavior[T]] = {
|
||||
private def handleException(ctx: TypedActorContext[T], signalRestart: () ⇒ Unit): Catcher[Behavior[T]] = {
|
||||
case NonFatal(t: Thr) ⇒
|
||||
if (strategy.maxNrOfRetries != -1 && restarts >= strategy.maxNrOfRetries && deadlineHasTimeLeft) {
|
||||
throw t
|
||||
|
|
@ -166,10 +166,10 @@ private class RestartSupervisor[T, Thr <: Throwable](initial: Behavior[T], strat
|
|||
}
|
||||
}
|
||||
|
||||
override protected def handleSignalException(ctx: ActorContext[T], target: SignalTarget[T]): Catcher[Behavior[T]] = {
|
||||
override protected def handleSignalException(ctx: TypedActorContext[T], target: SignalTarget[T]): Catcher[Behavior[T]] = {
|
||||
handleException(ctx, () ⇒ target(ctx, PreRestart))
|
||||
}
|
||||
override protected def handleReceiveException(ctx: ActorContext[T], target: ReceiveTarget[T]): Catcher[Behavior[T]] = {
|
||||
override protected def handleReceiveException(ctx: TypedActorContext[T], target: ReceiveTarget[T]): Catcher[Behavior[T]] = {
|
||||
handleException(ctx, () ⇒ target.signalRestart(ctx))
|
||||
}
|
||||
}
|
||||
|
|
@ -181,7 +181,7 @@ private class BackoffSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav
|
|||
var blackhole = false
|
||||
var restartCount: Int = 0
|
||||
|
||||
override def aroundSignal(ctx: ActorContext[O], signal: Signal, target: SignalTarget[T]): Behavior[T] = {
|
||||
override def aroundSignal(ctx: TypedActorContext[O], signal: Signal, target: SignalTarget[T]): Behavior[T] = {
|
||||
if (blackhole) {
|
||||
import akka.actor.typed.scaladsl.adapter._
|
||||
ctx.asScala.system.toUntyped.eventStream.publish(Dropped(signal, ctx.asScala.self))
|
||||
|
|
@ -191,14 +191,14 @@ private class BackoffSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav
|
|||
}
|
||||
}
|
||||
|
||||
override def aroundReceive(ctx: ActorContext[O], msg: O, target: ReceiveTarget[T]): Behavior[T] = {
|
||||
override def aroundReceive(ctx: TypedActorContext[O], msg: O, target: ReceiveTarget[T]): Behavior[T] = {
|
||||
try {
|
||||
msg.asInstanceOf[Any] match {
|
||||
case ScheduledRestart ⇒
|
||||
blackhole = false
|
||||
ctx.asScala.scheduleOnce(b.resetBackoffAfter, ctx.asScala.self.unsafeUpcast[Any], ResetRestartCount(restartCount))
|
||||
try {
|
||||
Behavior.validateAsInitial(Behavior.start(initial, ctx.asInstanceOf[ActorContext[T]]))
|
||||
Behavior.validateAsInitial(Behavior.start(initial, ctx.asInstanceOf[TypedActorContext[T]]))
|
||||
} catch {
|
||||
case NonFatal(ex: Thr) if b.maxRestarts > 0 && restartCount >= b.maxRestarts ⇒
|
||||
log(ctx, ex)
|
||||
|
|
@ -222,12 +222,12 @@ private class BackoffSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav
|
|||
} catch handleReceiveException(ctx, target)
|
||||
}
|
||||
|
||||
protected def handleExceptionOnStart(ctx: ActorContext[O]): Catcher[Behavior[T]] = {
|
||||
protected def handleExceptionOnStart(ctx: TypedActorContext[O]): Catcher[Behavior[T]] = {
|
||||
case NonFatal(t: Thr) ⇒
|
||||
scheduleRestart(ctx, t)
|
||||
}
|
||||
|
||||
protected def handleReceiveException(ctx: ActorContext[O], target: ReceiveTarget[T]): Catcher[Behavior[T]] = {
|
||||
protected def handleReceiveException(ctx: TypedActorContext[O], target: ReceiveTarget[T]): Catcher[Behavior[T]] = {
|
||||
case NonFatal(t: Thr) ⇒
|
||||
try {
|
||||
target.signalRestart(ctx)
|
||||
|
|
@ -237,7 +237,7 @@ private class BackoffSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav
|
|||
scheduleRestart(ctx, t)
|
||||
}
|
||||
|
||||
protected def handleSignalException(ctx: ActorContext[O], target: SignalTarget[T]): Catcher[Behavior[T]] = {
|
||||
protected def handleSignalException(ctx: TypedActorContext[O], target: SignalTarget[T]): Catcher[Behavior[T]] = {
|
||||
case NonFatal(t: Thr) ⇒
|
||||
try {
|
||||
target(ctx, PreRestart)
|
||||
|
|
@ -247,7 +247,7 @@ private class BackoffSupervisor[O, T, Thr <: Throwable: ClassTag](initial: Behav
|
|||
scheduleRestart(ctx, t)
|
||||
}
|
||||
|
||||
private def scheduleRestart(ctx: ActorContext[O], reason: Throwable): Behavior[T] = {
|
||||
private def scheduleRestart(ctx: TypedActorContext[O], reason: Throwable): Behavior[T] = {
|
||||
log(ctx, reason)
|
||||
val restartDelay = calculateDelay(restartCount, b.minBackoff, b.maxBackoff, b.randomFactor)
|
||||
ctx.asScala.scheduleOnce(restartDelay, ctx.asScala.self.unsafeUpcast[Any], ScheduledRestart)
|
||||
|
|
|
|||
|
|
@ -156,7 +156,7 @@ private final class TimerInterceptor[T](timerSchedulerImpl: TimerSchedulerImpl[T
|
|||
import TimerSchedulerImpl._
|
||||
import BehaviorInterceptor._
|
||||
|
||||
override def aroundReceive(ctx: typed.ActorContext[T], msg: T, target: ReceiveTarget[T]): Behavior[T] = {
|
||||
override def aroundReceive(ctx: typed.TypedActorContext[T], msg: T, target: ReceiveTarget[T]): Behavior[T] = {
|
||||
val maybeIntercepted = msg match {
|
||||
case msg: TimerMsg ⇒ timerSchedulerImpl.interceptTimerMsg(ctx.asScala.log, msg)
|
||||
case msg ⇒ OptionVal.Some(msg)
|
||||
|
|
@ -168,7 +168,7 @@ private final class TimerInterceptor[T](timerSchedulerImpl: TimerSchedulerImpl[T
|
|||
}
|
||||
}
|
||||
|
||||
override def aroundSignal(ctx: typed.ActorContext[T], signal: Signal, target: SignalTarget[T]): Behavior[T] = {
|
||||
override def aroundSignal(ctx: typed.TypedActorContext[T], signal: Signal, target: SignalTarget[T]): Behavior[T] = {
|
||||
signal match {
|
||||
case PreRestart | PostStop ⇒ timerSchedulerImpl.cancelAll()
|
||||
case _ ⇒ // unhandled
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@
|
|||
package akka.actor.typed.internal
|
||||
|
||||
import akka.actor.typed.internal.adapter.AbstractLogger
|
||||
import akka.actor.typed.{ ActorContext, Behavior, BehaviorInterceptor, Signal }
|
||||
import akka.actor.typed.{ TypedActorContext, Behavior, BehaviorInterceptor, Signal }
|
||||
import akka.annotation.InternalApi
|
||||
|
||||
import scala.collection.immutable.HashMap
|
||||
|
|
@ -38,7 +38,7 @@ import scala.collection.immutable.HashMap
|
|||
|
||||
import BehaviorInterceptor._
|
||||
|
||||
override def aroundStart(ctx: ActorContext[T], target: PreStartTarget[T]): Behavior[T] = {
|
||||
override def aroundStart(ctx: TypedActorContext[T], target: PreStartTarget[T]): Behavior[T] = {
|
||||
// when declaring we expect the outermost to win
|
||||
// for example with
|
||||
// val behavior = ...
|
||||
|
|
@ -72,7 +72,7 @@ import scala.collection.immutable.HashMap
|
|||
case _ ⇒ false
|
||||
}
|
||||
|
||||
override def aroundReceive(ctx: ActorContext[T], msg: T, target: ReceiveTarget[T]): Behavior[T] = {
|
||||
override def aroundReceive(ctx: TypedActorContext[T], msg: T, target: ReceiveTarget[T]): Behavior[T] = {
|
||||
val mdc = merge(staticMdc, mdcForMessage(msg))
|
||||
ctx.asScala.log.asInstanceOf[AbstractLogger].mdc = mdc
|
||||
val next =
|
||||
|
|
@ -84,7 +84,7 @@ import scala.collection.immutable.HashMap
|
|||
next
|
||||
}
|
||||
|
||||
override def aroundSignal(ctx: ActorContext[T], signal: Signal, target: SignalTarget[T]): Behavior[T] = {
|
||||
override def aroundSignal(ctx: TypedActorContext[T], signal: Signal, target: SignalTarget[T]): Behavior[T] = {
|
||||
ctx.asScala.log.asInstanceOf[AbstractLogger].mdc = staticMdc
|
||||
try {
|
||||
target(ctx, signal)
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ import scala.concurrent.ExecutionContextExecutor
|
|||
import scala.concurrent.duration._
|
||||
|
||||
/**
|
||||
* INTERNAL API. Wrapping an [[akka.actor.ActorContext]] as an [[ActorContext]].
|
||||
* INTERNAL API. Wrapping an [[akka.actor.ActorContext]] as an [[TypedActorContext]].
|
||||
*/
|
||||
@InternalApi private[akka] final class ActorContextAdapter[T](val untyped: a.ActorContext) extends ActorContextImpl[T] {
|
||||
|
||||
|
|
@ -102,7 +102,7 @@ import scala.concurrent.duration._
|
|||
*/
|
||||
@InternalApi private[typed] object ActorContextAdapter {
|
||||
|
||||
private def toUntypedImp[U](ctx: ActorContext[_]): a.ActorContext =
|
||||
private def toUntypedImp[U](ctx: TypedActorContext[_]): a.ActorContext =
|
||||
ctx match {
|
||||
case adapter: ActorContextAdapter[_] ⇒ adapter.untyped
|
||||
case _ ⇒
|
||||
|
|
@ -110,11 +110,11 @@ import scala.concurrent.duration._
|
|||
s"($ctx of class ${ctx.getClass.getName})")
|
||||
}
|
||||
|
||||
def toUntyped2[U](ctx: ActorContext[_]): a.ActorContext = toUntypedImp(ctx)
|
||||
def toUntyped2[U](ctx: TypedActorContext[_]): a.ActorContext = toUntypedImp(ctx)
|
||||
|
||||
def toUntyped[U](ctx: scaladsl.ActorContext[_]): a.ActorContext =
|
||||
ctx match {
|
||||
case c: ActorContext[_] ⇒ toUntypedImp(c)
|
||||
case c: TypedActorContext[_] ⇒ toUntypedImp(c)
|
||||
case _ ⇒
|
||||
throw new UnsupportedOperationException("unknown ActorContext type " +
|
||||
s"($ctx of class ${ctx.getClass.getName})")
|
||||
|
|
@ -122,7 +122,7 @@ import scala.concurrent.duration._
|
|||
|
||||
def toUntyped[U](ctx: javadsl.ActorContext[_]): a.ActorContext =
|
||||
ctx match {
|
||||
case c: ActorContext[_] ⇒ toUntypedImp(c)
|
||||
case c: TypedActorContext[_] ⇒ toUntypedImp(c)
|
||||
case _ ⇒
|
||||
throw new UnsupportedOperationException("unknown ActorContext type " +
|
||||
s"($ctx of class ${ctx.getClass.getName})")
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
|
||||
package akka.actor.typed.javadsl
|
||||
|
||||
import akka.actor.typed.{ Behavior, ExtensibleBehavior, Signal }
|
||||
import akka.actor.typed.{ Behavior, ExtensibleBehavior, Signal, TypedActorContext }
|
||||
import akka.util.OptionVal
|
||||
|
||||
/**
|
||||
|
|
@ -33,11 +33,11 @@ abstract class AbstractBehavior[T] extends ExtensibleBehavior[T] {
|
|||
}
|
||||
|
||||
@throws(classOf[Exception])
|
||||
override final def receive(ctx: akka.actor.typed.ActorContext[T], msg: T): Behavior[T] =
|
||||
override final def receive(ctx: TypedActorContext[T], msg: T): Behavior[T] =
|
||||
receive.receive(ctx, msg)
|
||||
|
||||
@throws(classOf[Exception])
|
||||
override final def receiveSignal(ctx: akka.actor.typed.ActorContext[T], msg: Signal): Behavior[T] =
|
||||
override final def receiveSignal(ctx: TypedActorContext[T], msg: Signal): Behavior[T] =
|
||||
receive.receiveSignal(ctx, msg)
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ import scala.concurrent.ExecutionContextExecutor
|
|||
*/
|
||||
@DoNotInherit
|
||||
@ApiMayChange
|
||||
trait ActorContext[T] extends akka.actor.typed.ActorContext[T] {
|
||||
trait ActorContext[T] extends TypedActorContext[T] {
|
||||
// this must be a pure interface, i.e. only abstract methods
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -249,9 +249,9 @@ private class BuiltBehavior[T](
|
|||
private val signalHandlers: List[Case[T, Signal]]
|
||||
) extends ExtensibleBehavior[T] {
|
||||
|
||||
override def receive(ctx: typed.ActorContext[T], msg: T): Behavior[T] = receive[T](ctx.asJava, msg, messageHandlers)
|
||||
override def receive(ctx: typed.TypedActorContext[T], msg: T): Behavior[T] = receive[T](ctx.asJava, msg, messageHandlers)
|
||||
|
||||
override def receiveSignal(ctx: typed.ActorContext[T], msg: Signal): Behavior[T] = receive[Signal](ctx.asJava, msg, signalHandlers)
|
||||
override def receiveSignal(ctx: typed.TypedActorContext[T], msg: Signal): Behavior[T] = receive[Signal](ctx.asJava, msg, signalHandlers)
|
||||
|
||||
@tailrec
|
||||
private def receive[M](ctx: ActorContext[T], msg: M, handlers: List[Case[T, M]]): Behavior[T] =
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ package akka.actor.typed.javadsl
|
|||
import akka.actor.typed.Behavior
|
||||
import akka.actor.typed.ExtensibleBehavior
|
||||
import akka.actor.typed.Signal
|
||||
import akka.actor.typed.TypedActorContext
|
||||
import akka.annotation.DoNotInherit
|
||||
|
||||
/**
|
||||
|
|
@ -48,11 +49,11 @@ abstract class Receive[T] extends ExtensibleBehavior[T] {
|
|||
def receiveSignal(sig: Signal): Behavior[T]
|
||||
|
||||
@throws(classOf[Exception])
|
||||
override final def receive(ctx: akka.actor.typed.ActorContext[T], msg: T): Behavior[T] =
|
||||
override final def receive(ctx: TypedActorContext[T], msg: T): Behavior[T] =
|
||||
receiveMessage(msg)
|
||||
|
||||
@throws(classOf[Exception])
|
||||
override final def receiveSignal(ctx: akka.actor.typed.ActorContext[T], sig: Signal): Behavior[T] =
|
||||
override final def receiveSignal(ctx: TypedActorContext[T], sig: Signal): Behavior[T] =
|
||||
receiveSignal(sig)
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
|
||||
package akka.actor.typed.scaladsl
|
||||
|
||||
import akka.actor.typed.{ Behavior, ExtensibleBehavior, Signal }
|
||||
import akka.actor.typed.{ Behavior, ExtensibleBehavior, Signal, TypedActorContext }
|
||||
|
||||
/**
|
||||
* An actor `Behavior` can be implemented by extending this class and implement the
|
||||
|
|
@ -55,10 +55,10 @@ abstract class AbstractBehavior[T] extends ExtensibleBehavior[T] {
|
|||
def onSignal: PartialFunction[Signal, Behavior[T]] = PartialFunction.empty
|
||||
|
||||
@throws(classOf[Exception])
|
||||
override final def receive(ctx: akka.actor.typed.ActorContext[T], msg: T): Behavior[T] =
|
||||
override final def receive(ctx: TypedActorContext[T], msg: T): Behavior[T] =
|
||||
onMessage(msg)
|
||||
|
||||
@throws(classOf[Exception])
|
||||
override final def receiveSignal(ctx: akka.actor.typed.ActorContext[T], msg: Signal): Behavior[T] =
|
||||
override final def receiveSignal(ctx: TypedActorContext[T], msg: Signal): Behavior[T] =
|
||||
onSignal.applyOrElse(msg, { case _ ⇒ Behavior.unhandled }: PartialFunction[Signal, Behavior[T]])
|
||||
}
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ import akka.annotation.InternalApi
|
|||
*/
|
||||
@DoNotInherit
|
||||
@ApiMayChange
|
||||
trait ActorContext[T] extends akka.actor.typed.ActorContext[T] { this: akka.actor.typed.javadsl.ActorContext[T] ⇒
|
||||
trait ActorContext[T] extends TypedActorContext[T] { this: akka.actor.typed.javadsl.ActorContext[T] ⇒
|
||||
|
||||
/**
|
||||
* Get the `javadsl` of this `ActorContext`.
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ import akka.actor.ActorRefProvider
|
|||
import akka.actor.ExtendedActorSystem
|
||||
import akka.actor.InternalActorRef
|
||||
import akka.actor.Scheduler
|
||||
import akka.actor.typed.ActorContext
|
||||
import akka.actor.typed.TypedActorContext
|
||||
import akka.actor.typed.ActorRef
|
||||
import akka.actor.typed.ActorSystem
|
||||
import akka.actor.typed.Behavior
|
||||
|
|
@ -348,7 +348,7 @@ import akka.util.Timeout
|
|||
import akka.cluster.sharding.ShardRegion.{ Passivate ⇒ UntypedPassivate }
|
||||
|
||||
def behavior(stopMessage: Any): Behavior[scaladsl.ClusterSharding.ShardCommand] = {
|
||||
def sendUntypedPassivate(entity: ActorRef[_], ctx: ActorContext[_]): Unit = {
|
||||
def sendUntypedPassivate(entity: ActorRef[_], ctx: TypedActorContext[_]): Unit = {
|
||||
val pathToShard = entity.toUntyped.path.elements.take(4).mkString("/")
|
||||
ctx.asScala.system.toUntyped.actorSelection(pathToShard).tell(UntypedPassivate(stopMessage), entity.toUntyped)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -46,7 +46,8 @@ is a tool that mimics the old style of starting up actors.
|
|||
|
||||
### Spawning Children
|
||||
|
||||
Child actors are spawned with @unidoc[akka.actor.typed.ActorContext]'s `spawn`. In the example below, when the root actor
|
||||
Child actors are spawned with @scala[@unidoc[akka.actor.typed.scaladsl.ActorContext]]@java[@unidoc[akka.actor.typed.javadsl.ActorContext]]'s `spawn`.
|
||||
In the example below, when the root actor
|
||||
is started, it spawns a child actor described by the behavior `HelloWorld.greeter`. Additionally, when the root actor receives a
|
||||
`Start` message, it creates a child actor defined by the behavior `HelloWorldBot.bot`:
|
||||
|
||||
|
|
|
|||
|
|
@ -67,7 +67,7 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
|
|||
|
||||
import EventSourcedBehaviorImpl.WriterIdentity
|
||||
|
||||
override def apply(context: typed.ActorContext[Command]): Behavior[Command] = {
|
||||
override def apply(context: typed.TypedActorContext[Command]): Behavior[Command] = {
|
||||
Behaviors.supervise {
|
||||
Behaviors.setup[Command] { ctx ⇒
|
||||
val settings = EventSourcedSettings(ctx.system, journalPluginId.getOrElse(""), snapshotPluginId.getOrElse(""))
|
||||
|
|
@ -103,11 +103,11 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
|
|||
// not part of the protocol
|
||||
val onStopInterceptor = new BehaviorInterceptor[Any, Any] {
|
||||
import BehaviorInterceptor._
|
||||
def aroundReceive(ctx: typed.ActorContext[Any], msg: Any, target: ReceiveTarget[Any]): Behavior[Any] = {
|
||||
def aroundReceive(ctx: typed.TypedActorContext[Any], msg: Any, target: ReceiveTarget[Any]): Behavior[Any] = {
|
||||
target(ctx, msg)
|
||||
}
|
||||
|
||||
def aroundSignal(ctx: typed.ActorContext[Any], signal: Signal, target: SignalTarget[Any]): Behavior[Any] = {
|
||||
def aroundSignal(ctx: typed.TypedActorContext[Any], signal: Signal, target: SignalTarget[Any]): Behavior[Any] = {
|
||||
if (signal == PostStop) {
|
||||
eventsourcedSetup.cancelRecoveryTimer()
|
||||
clearStashBuffer()
|
||||
|
|
|
|||
|
|
@ -137,7 +137,7 @@ abstract class EventSourcedBehavior[Command, Event, State >: Null] private[akka]
|
|||
/**
|
||||
* INTERNAL API: DeferredBehavior init
|
||||
*/
|
||||
@InternalApi override def apply(context: typed.ActorContext[Command]): Behavior[Command] = {
|
||||
@InternalApi override def apply(context: typed.TypedActorContext[Command]): Behavior[Command] = {
|
||||
|
||||
val snapshotWhen: (State, Event, Long) ⇒ Boolean = { (state, event, seqNr) ⇒
|
||||
val n = snapshotEvery()
|
||||
|
|
|
|||
|
|
@ -452,13 +452,13 @@ public class PersistentActorJavaDslTest extends JUnitSuite {
|
|||
TestProbe<Signal> signalProbe = testKit.createTestProbe();
|
||||
BehaviorInterceptor<Command, Command> tap = new BehaviorInterceptor<Command, Command>() {
|
||||
@Override
|
||||
public Behavior<Command> aroundReceive(akka.actor.typed.ActorContext<Command> ctx, Command msg, ReceiveTarget<Command> target) {
|
||||
public Behavior<Command> aroundReceive(TypedActorContext<Command> ctx, Command msg, ReceiveTarget<Command> target) {
|
||||
interceptProbe.ref().tell(msg);
|
||||
return target.apply(ctx, msg);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Behavior<Command> aroundSignal(akka.actor.typed.ActorContext<Command> ctx, Signal signal, SignalTarget<Command> target) {
|
||||
public Behavior<Command> aroundSignal(TypedActorContext<Command> ctx, Signal signal, SignalTarget<Command> target) {
|
||||
signalProbe.ref().tell(signal);
|
||||
return target.apply(ctx, signal);
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue