diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala index ef8c2a2723..98e039a268 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala @@ -222,6 +222,24 @@ object Behavior { case other ⇒ other } + /** + * INTERNAL API + * + * Return special behaviors as is, undefer deferred, if behavior is "non-special" apply the wrap function `f` to get + * and return the result from that. Useful for cases where a [[Behavior]] implementation that is decorating another + * behavior has processed a message and needs to re-wrap the resulting behavior with itself. + */ + @InternalApi + @tailrec + private[akka] def wrap[T, U](currentBehavior: Behavior[_], nextBehavior: Behavior[T], ctx: ActorContext[T])(f: Behavior[T] ⇒ Behavior[U]): Behavior[U] = + nextBehavior match { + case SameBehavior | `currentBehavior` ⇒ same + case UnhandledBehavior ⇒ unhandled + case StoppedBehavior ⇒ stopped + case deferred: DeferredBehavior[T] ⇒ wrap(currentBehavior, undefer(deferred, ctx), ctx)(f) + case other ⇒ f(other) + } + @tailrec def undefer[T](behavior: Behavior[T], ctx: ActorContext[T]): Behavior[T] = { behavior match { diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala index 0a300a6d20..bd996bfafb 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/BehaviorImpl.scala @@ -7,10 +7,9 @@ package internal import akka.util.LineNumbers import akka.annotation.InternalApi import akka.actor.typed.{ ActorContext ⇒ AC } -import akka.actor.typed.scaladsl.{ ActorContext ⇒ TAC } +import akka.actor.typed.scaladsl.{ ActorContext ⇒ SAC } import scala.reflect.ClassTag -import scala.annotation.tailrec /** * INTERNAL API @@ -22,7 +21,7 @@ import scala.annotation.tailrec private def nullFun[T] = _nullFun.asInstanceOf[Any ⇒ T] implicit class ContextAs[T](val ctx: AC[T]) extends AnyVal { - def as[U] = ctx.asInstanceOf[AC[U]] + def as[U]: AC[U] = ctx.asInstanceOf[AC[U]] } def widened[T, U](behavior: Behavior[T], matcher: PartialFunction[U, T]): Behavior[U] = { @@ -39,45 +38,36 @@ import scala.annotation.tailrec } private final case class Widened[T, U](behavior: Behavior[T], matcher: PartialFunction[U, T]) extends ExtensibleBehavior[U] { - @tailrec - private def canonical(b: Behavior[T], ctx: AC[T]): Behavior[U] = { - if (isUnhandled(b)) unhandled - else if ((b eq SameBehavior) || (b eq this)) same - else if (!Behavior.isAlive(b)) Behavior.stopped - else { - b match { - case d: DeferredBehavior[T] ⇒ canonical(Behavior.undefer(d, ctx), ctx) - case _ ⇒ Widened(b, matcher) - } - } - } + + private def widen(b: Behavior[T], ctx: AC[T]): Behavior[U] = + Behavior.wrap(this, b, ctx)(b ⇒ Widened[T, U](b, matcher)) override def receiveSignal(ctx: AC[U], signal: Signal): Behavior[U] = - canonical(Behavior.interpretSignal(behavior, ctx.as[T], signal), ctx.as[T]) + widen(Behavior.interpretSignal(behavior, ctx.as[T], signal), ctx.as[T]) override def receiveMessage(ctx: AC[U], msg: U): Behavior[U] = matcher.applyOrElse(msg, nullFun) match { case null ⇒ unhandled - case transformed ⇒ canonical(Behavior.interpretMessage(behavior, ctx.as[T], transformed), ctx.as[T]) + case transformed ⇒ widen(Behavior.interpretMessage(behavior, ctx.as[T], transformed), ctx.as[T]) } override def toString: String = s"${behavior.toString}.widen(${LineNumbers(matcher)})" } class ImmutableBehavior[T]( - val onMessage: (TAC[T], T) ⇒ Behavior[T], - onSignal: PartialFunction[(TAC[T], Signal), Behavior[T]] = Behavior.unhandledSignal.asInstanceOf[PartialFunction[(TAC[T], Signal), Behavior[T]]]) + val onMessage: (SAC[T], T) ⇒ Behavior[T], + onSignal: PartialFunction[(SAC[T], Signal), Behavior[T]] = Behavior.unhandledSignal.asInstanceOf[PartialFunction[(SAC[T], Signal), Behavior[T]]]) extends ExtensibleBehavior[T] { override def receiveSignal(ctx: AC[T], msg: Signal): Behavior[T] = - onSignal.applyOrElse((ctx.asScala, msg), Behavior.unhandledSignal.asInstanceOf[PartialFunction[(TAC[T], Signal), Behavior[T]]]) + onSignal.applyOrElse((ctx.asScala, msg), Behavior.unhandledSignal.asInstanceOf[PartialFunction[(SAC[T], Signal), Behavior[T]]]) override def receiveMessage(ctx: AC[T], msg: T) = onMessage(ctx.asScala, msg) override def toString = s"Immutable(${LineNumbers(onMessage)})" } def tap[T]( - onMessage: (TAC[T], T) ⇒ _, - onSignal: (TAC[T], Signal) ⇒ _, + onMessage: (SAC[T], T) ⇒ _, + onSignal: (SAC[T], Signal) ⇒ _, behavior: Behavior[T]): Behavior[T] = { intercept[T, T]( beforeMessage = (ctx, msg) ⇒ { @@ -109,12 +99,12 @@ import scala.annotation.tailrec * different than the incoming message). */ def intercept[T, U <: Any: ClassTag]( - beforeMessage: Function2[TAC[U], U, T], - beforeSignal: Function2[TAC[T], Signal, Boolean], - afterMessage: Function3[TAC[T], T, Behavior[T], Behavior[T]], - afterSignal: Function3[TAC[T], Signal, Behavior[T], Behavior[T]], + beforeMessage: (SAC[U], U) ⇒ T, + beforeSignal: (SAC[T], Signal) ⇒ Boolean, + afterMessage: (SAC[T], T, Behavior[T]) ⇒ Behavior[T], + afterSignal: (SAC[T], Signal, Behavior[T]) ⇒ Behavior[T], behavior: Behavior[T], - toStringPrefix: String = "Intercept"): Behavior[T] = { + toStringPrefix: String = "Intercept"): Behavior[T] = { behavior match { case d: DeferredBehavior[T] ⇒ DeferredBehavior[T] { ctx ⇒ @@ -128,24 +118,15 @@ import scala.annotation.tailrec } private final case class Intercept[T, U <: Any: ClassTag]( - beforeOnMessage: Function2[TAC[U], U, T], - beforeOnSignal: Function2[TAC[T], Signal, Boolean], - afterMessage: Function3[TAC[T], T, Behavior[T], Behavior[T]], - afterSignal: Function3[TAC[T], Signal, Behavior[T], Behavior[T]], + beforeOnMessage: (SAC[U], U) ⇒ T, + beforeOnSignal: (SAC[T], Signal) ⇒ Boolean, + afterMessage: (SAC[T], T, Behavior[T]) ⇒ Behavior[T], + afterSignal: (SAC[T], Signal, Behavior[T]) ⇒ Behavior[T], behavior: Behavior[T], - toStringPrefix: String = "Intercept") extends ExtensibleBehavior[T] { + toStringPrefix: String = "Intercept") extends ExtensibleBehavior[T] { - @tailrec - private def canonical(b: Behavior[T], ctx: ActorContext[T]): Behavior[T] = { - if (isUnhandled(b)) unhandled - else if ((b eq SameBehavior) || (b eq this)) same - else if (!Behavior.isAlive(b)) b - else { - b match { - case d: DeferredBehavior[T] ⇒ canonical(Behavior.undefer(d, ctx), ctx) - case _ ⇒ Intercept(beforeOnMessage, beforeOnSignal, afterMessage, afterSignal, b) - } - } + private def intercept(nextBehavior: Behavior[T], ctx: ActorContext[T]): Behavior[T] = { + Behavior.wrap(this, nextBehavior, ctx)(Intercept(beforeOnMessage, beforeOnSignal, afterMessage, afterSignal, _)) } override def receiveSignal(ctx: AC[T], signal: Signal): Behavior[T] = { @@ -154,22 +135,22 @@ import scala.annotation.tailrec Behavior.interpretSignal(behavior, ctx, signal) else same - canonical(afterSignal(ctx.asScala, signal, next), ctx) + intercept(afterSignal(ctx.asScala, signal, next), ctx) } override def receiveMessage(ctx: AC[T], msg: T): Behavior[T] = { msg match { case m: U ⇒ - val msg2 = beforeOnMessage(ctx.asScala.asInstanceOf[TAC[U]], m) + val msg2 = beforeOnMessage(ctx.asScala.asInstanceOf[SAC[U]], m) val next: Behavior[T] = if (msg2 == null) same else Behavior.interpretMessage(behavior, ctx, msg2) - canonical(afterMessage(ctx.asScala, msg2, next), ctx) + intercept(afterMessage(ctx.asScala, msg2, next), ctx) case _ ⇒ val next: Behavior[T] = Behavior.interpretMessage(behavior, ctx, msg) - canonical(afterMessage(ctx.asScala, msg, next), ctx) + intercept(afterMessage(ctx.asScala, msg, next), ctx) } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala index e997a2983c..627de1b4f9 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/Restarter.scala @@ -83,29 +83,20 @@ import akka.actor.typed.scaladsl.Behaviors wrap(Restarter.initialUndefer(ctx, initialBehavior), afterException = true) } - @tailrec - protected final def canonical(b: Behavior[T], ctx: ActorContext[T], afterException: Boolean): Behavior[T] = - if (Behavior.isUnhandled(b)) Behavior.unhandled - else if ((b eq Behavior.SameBehavior) || (b eq behavior)) Behavior.same - else if (!Behavior.isAlive(b)) b - else { - b match { - case d: DeferredBehavior[T] ⇒ canonical(Behavior.undefer(d, ctx), ctx, afterException) - case b ⇒ wrap(b, afterException) - } - } + protected final def supervise(nextBehavior: Behavior[T], ctx: ActorContext[T]): Behavior[T] = + Behavior.wrap[T, T](behavior, nextBehavior, ctx)(wrap(_, afterException = false)) override def receiveSignal(ctx: ActorContext[T], signal: Signal): Behavior[T] = { try { val b = Behavior.interpretSignal(behavior, ctx, signal) - canonical(b, ctx, afterException = false) + supervise(b, ctx) } catch handleException(ctx, behavior) } override def receiveMessage(ctx: ActorContext[T], msg: T): Behavior[T] = { try { val b = Behavior.interpretMessage(behavior, ctx, msg) - canonical(b, ctx, afterException = false) + supervise(b, ctx) } catch handleException(ctx, behavior) }