Typed - cleanup canonic #22841
This commit is contained in:
parent
5c68f2f627
commit
e81f350b2f
3 changed files with 50 additions and 60 deletions
|
|
@ -222,6 +222,24 @@ object Behavior {
|
|||
case other ⇒ other
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*
|
||||
* Return special behaviors as is, undefer deferred, if behavior is "non-special" apply the wrap function `f` to get
|
||||
* and return the result from that. Useful for cases where a [[Behavior]] implementation that is decorating another
|
||||
* behavior has processed a message and needs to re-wrap the resulting behavior with itself.
|
||||
*/
|
||||
@InternalApi
|
||||
@tailrec
|
||||
private[akka] def wrap[T, U](currentBehavior: Behavior[_], nextBehavior: Behavior[T], ctx: ActorContext[T])(f: Behavior[T] ⇒ Behavior[U]): Behavior[U] =
|
||||
nextBehavior match {
|
||||
case SameBehavior | `currentBehavior` ⇒ same
|
||||
case UnhandledBehavior ⇒ unhandled
|
||||
case StoppedBehavior ⇒ stopped
|
||||
case deferred: DeferredBehavior[T] ⇒ wrap(currentBehavior, undefer(deferred, ctx), ctx)(f)
|
||||
case other ⇒ f(other)
|
||||
}
|
||||
|
||||
@tailrec
|
||||
def undefer[T](behavior: Behavior[T], ctx: ActorContext[T]): Behavior[T] = {
|
||||
behavior match {
|
||||
|
|
|
|||
|
|
@ -7,10 +7,9 @@ package internal
|
|||
import akka.util.LineNumbers
|
||||
import akka.annotation.InternalApi
|
||||
import akka.actor.typed.{ ActorContext ⇒ AC }
|
||||
import akka.actor.typed.scaladsl.{ ActorContext ⇒ TAC }
|
||||
import akka.actor.typed.scaladsl.{ ActorContext ⇒ SAC }
|
||||
|
||||
import scala.reflect.ClassTag
|
||||
import scala.annotation.tailrec
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -22,7 +21,7 @@ import scala.annotation.tailrec
|
|||
private def nullFun[T] = _nullFun.asInstanceOf[Any ⇒ T]
|
||||
|
||||
implicit class ContextAs[T](val ctx: AC[T]) extends AnyVal {
|
||||
def as[U] = ctx.asInstanceOf[AC[U]]
|
||||
def as[U]: AC[U] = ctx.asInstanceOf[AC[U]]
|
||||
}
|
||||
|
||||
def widened[T, U](behavior: Behavior[T], matcher: PartialFunction[U, T]): Behavior[U] = {
|
||||
|
|
@ -39,45 +38,36 @@ import scala.annotation.tailrec
|
|||
}
|
||||
|
||||
private final case class Widened[T, U](behavior: Behavior[T], matcher: PartialFunction[U, T]) extends ExtensibleBehavior[U] {
|
||||
@tailrec
|
||||
private def canonical(b: Behavior[T], ctx: AC[T]): Behavior[U] = {
|
||||
if (isUnhandled(b)) unhandled
|
||||
else if ((b eq SameBehavior) || (b eq this)) same
|
||||
else if (!Behavior.isAlive(b)) Behavior.stopped
|
||||
else {
|
||||
b match {
|
||||
case d: DeferredBehavior[T] ⇒ canonical(Behavior.undefer(d, ctx), ctx)
|
||||
case _ ⇒ Widened(b, matcher)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def widen(b: Behavior[T], ctx: AC[T]): Behavior[U] =
|
||||
Behavior.wrap(this, b, ctx)(b ⇒ Widened[T, U](b, matcher))
|
||||
|
||||
override def receiveSignal(ctx: AC[U], signal: Signal): Behavior[U] =
|
||||
canonical(Behavior.interpretSignal(behavior, ctx.as[T], signal), ctx.as[T])
|
||||
widen(Behavior.interpretSignal(behavior, ctx.as[T], signal), ctx.as[T])
|
||||
|
||||
override def receiveMessage(ctx: AC[U], msg: U): Behavior[U] =
|
||||
matcher.applyOrElse(msg, nullFun) match {
|
||||
case null ⇒ unhandled
|
||||
case transformed ⇒ canonical(Behavior.interpretMessage(behavior, ctx.as[T], transformed), ctx.as[T])
|
||||
case transformed ⇒ widen(Behavior.interpretMessage(behavior, ctx.as[T], transformed), ctx.as[T])
|
||||
}
|
||||
|
||||
override def toString: String = s"${behavior.toString}.widen(${LineNumbers(matcher)})"
|
||||
}
|
||||
|
||||
class ImmutableBehavior[T](
|
||||
val onMessage: (TAC[T], T) ⇒ Behavior[T],
|
||||
onSignal: PartialFunction[(TAC[T], Signal), Behavior[T]] = Behavior.unhandledSignal.asInstanceOf[PartialFunction[(TAC[T], Signal), Behavior[T]]])
|
||||
val onMessage: (SAC[T], T) ⇒ Behavior[T],
|
||||
onSignal: PartialFunction[(SAC[T], Signal), Behavior[T]] = Behavior.unhandledSignal.asInstanceOf[PartialFunction[(SAC[T], Signal), Behavior[T]]])
|
||||
extends ExtensibleBehavior[T] {
|
||||
|
||||
override def receiveSignal(ctx: AC[T], msg: Signal): Behavior[T] =
|
||||
onSignal.applyOrElse((ctx.asScala, msg), Behavior.unhandledSignal.asInstanceOf[PartialFunction[(TAC[T], Signal), Behavior[T]]])
|
||||
onSignal.applyOrElse((ctx.asScala, msg), Behavior.unhandledSignal.asInstanceOf[PartialFunction[(SAC[T], Signal), Behavior[T]]])
|
||||
override def receiveMessage(ctx: AC[T], msg: T) = onMessage(ctx.asScala, msg)
|
||||
override def toString = s"Immutable(${LineNumbers(onMessage)})"
|
||||
}
|
||||
|
||||
def tap[T](
|
||||
onMessage: (TAC[T], T) ⇒ _,
|
||||
onSignal: (TAC[T], Signal) ⇒ _,
|
||||
onMessage: (SAC[T], T) ⇒ _,
|
||||
onSignal: (SAC[T], Signal) ⇒ _,
|
||||
behavior: Behavior[T]): Behavior[T] = {
|
||||
intercept[T, T](
|
||||
beforeMessage = (ctx, msg) ⇒ {
|
||||
|
|
@ -109,12 +99,12 @@ import scala.annotation.tailrec
|
|||
* different than the incoming message).
|
||||
*/
|
||||
def intercept[T, U <: Any: ClassTag](
|
||||
beforeMessage: Function2[TAC[U], U, T],
|
||||
beforeSignal: Function2[TAC[T], Signal, Boolean],
|
||||
afterMessage: Function3[TAC[T], T, Behavior[T], Behavior[T]],
|
||||
afterSignal: Function3[TAC[T], Signal, Behavior[T], Behavior[T]],
|
||||
beforeMessage: (SAC[U], U) ⇒ T,
|
||||
beforeSignal: (SAC[T], Signal) ⇒ Boolean,
|
||||
afterMessage: (SAC[T], T, Behavior[T]) ⇒ Behavior[T],
|
||||
afterSignal: (SAC[T], Signal, Behavior[T]) ⇒ Behavior[T],
|
||||
behavior: Behavior[T],
|
||||
toStringPrefix: String = "Intercept"): Behavior[T] = {
|
||||
toStringPrefix: String = "Intercept"): Behavior[T] = {
|
||||
behavior match {
|
||||
case d: DeferredBehavior[T] ⇒
|
||||
DeferredBehavior[T] { ctx ⇒
|
||||
|
|
@ -128,24 +118,15 @@ import scala.annotation.tailrec
|
|||
}
|
||||
|
||||
private final case class Intercept[T, U <: Any: ClassTag](
|
||||
beforeOnMessage: Function2[TAC[U], U, T],
|
||||
beforeOnSignal: Function2[TAC[T], Signal, Boolean],
|
||||
afterMessage: Function3[TAC[T], T, Behavior[T], Behavior[T]],
|
||||
afterSignal: Function3[TAC[T], Signal, Behavior[T], Behavior[T]],
|
||||
beforeOnMessage: (SAC[U], U) ⇒ T,
|
||||
beforeOnSignal: (SAC[T], Signal) ⇒ Boolean,
|
||||
afterMessage: (SAC[T], T, Behavior[T]) ⇒ Behavior[T],
|
||||
afterSignal: (SAC[T], Signal, Behavior[T]) ⇒ Behavior[T],
|
||||
behavior: Behavior[T],
|
||||
toStringPrefix: String = "Intercept") extends ExtensibleBehavior[T] {
|
||||
toStringPrefix: String = "Intercept") extends ExtensibleBehavior[T] {
|
||||
|
||||
@tailrec
|
||||
private def canonical(b: Behavior[T], ctx: ActorContext[T]): Behavior[T] = {
|
||||
if (isUnhandled(b)) unhandled
|
||||
else if ((b eq SameBehavior) || (b eq this)) same
|
||||
else if (!Behavior.isAlive(b)) b
|
||||
else {
|
||||
b match {
|
||||
case d: DeferredBehavior[T] ⇒ canonical(Behavior.undefer(d, ctx), ctx)
|
||||
case _ ⇒ Intercept(beforeOnMessage, beforeOnSignal, afterMessage, afterSignal, b)
|
||||
}
|
||||
}
|
||||
private def intercept(nextBehavior: Behavior[T], ctx: ActorContext[T]): Behavior[T] = {
|
||||
Behavior.wrap(this, nextBehavior, ctx)(Intercept(beforeOnMessage, beforeOnSignal, afterMessage, afterSignal, _))
|
||||
}
|
||||
|
||||
override def receiveSignal(ctx: AC[T], signal: Signal): Behavior[T] = {
|
||||
|
|
@ -154,22 +135,22 @@ import scala.annotation.tailrec
|
|||
Behavior.interpretSignal(behavior, ctx, signal)
|
||||
else
|
||||
same
|
||||
canonical(afterSignal(ctx.asScala, signal, next), ctx)
|
||||
intercept(afterSignal(ctx.asScala, signal, next), ctx)
|
||||
}
|
||||
|
||||
override def receiveMessage(ctx: AC[T], msg: T): Behavior[T] = {
|
||||
msg match {
|
||||
case m: U ⇒
|
||||
val msg2 = beforeOnMessage(ctx.asScala.asInstanceOf[TAC[U]], m)
|
||||
val msg2 = beforeOnMessage(ctx.asScala.asInstanceOf[SAC[U]], m)
|
||||
val next: Behavior[T] =
|
||||
if (msg2 == null)
|
||||
same
|
||||
else
|
||||
Behavior.interpretMessage(behavior, ctx, msg2)
|
||||
canonical(afterMessage(ctx.asScala, msg2, next), ctx)
|
||||
intercept(afterMessage(ctx.asScala, msg2, next), ctx)
|
||||
case _ ⇒
|
||||
val next: Behavior[T] = Behavior.interpretMessage(behavior, ctx, msg)
|
||||
canonical(afterMessage(ctx.asScala, msg, next), ctx)
|
||||
intercept(afterMessage(ctx.asScala, msg, next), ctx)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -83,29 +83,20 @@ import akka.actor.typed.scaladsl.Behaviors
|
|||
wrap(Restarter.initialUndefer(ctx, initialBehavior), afterException = true)
|
||||
}
|
||||
|
||||
@tailrec
|
||||
protected final def canonical(b: Behavior[T], ctx: ActorContext[T], afterException: Boolean): Behavior[T] =
|
||||
if (Behavior.isUnhandled(b)) Behavior.unhandled
|
||||
else if ((b eq Behavior.SameBehavior) || (b eq behavior)) Behavior.same
|
||||
else if (!Behavior.isAlive(b)) b
|
||||
else {
|
||||
b match {
|
||||
case d: DeferredBehavior[T] ⇒ canonical(Behavior.undefer(d, ctx), ctx, afterException)
|
||||
case b ⇒ wrap(b, afterException)
|
||||
}
|
||||
}
|
||||
protected final def supervise(nextBehavior: Behavior[T], ctx: ActorContext[T]): Behavior[T] =
|
||||
Behavior.wrap[T, T](behavior, nextBehavior, ctx)(wrap(_, afterException = false))
|
||||
|
||||
override def receiveSignal(ctx: ActorContext[T], signal: Signal): Behavior[T] = {
|
||||
try {
|
||||
val b = Behavior.interpretSignal(behavior, ctx, signal)
|
||||
canonical(b, ctx, afterException = false)
|
||||
supervise(b, ctx)
|
||||
} catch handleException(ctx, behavior)
|
||||
}
|
||||
|
||||
override def receiveMessage(ctx: ActorContext[T], msg: T): Behavior[T] = {
|
||||
try {
|
||||
val b = Behavior.interpretMessage(behavior, ctx, msg)
|
||||
canonical(b, ctx, afterException = false)
|
||||
supervise(b, ctx)
|
||||
} catch handleException(ctx, behavior)
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue