From 78df3940392fce80324246aaa41580b1e489765d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Wed, 27 Mar 2019 16:45:20 +0100 Subject: [PATCH] Typed perf improvements, #25986 * Table switch for interpreter ~20% faster * Avoid instanceof on message in interpreter (but keep single method) ~10% faster * Use the behavior tags all over ~4% improvement * Make next a single tableswitch * Behavior.wrap isn't actually used anymore * Sidestep the untyped behavior stack --- .../scala/akka/actor/typed/Behavior.scala | 140 +++++++++--------- .../typed/internal/adapter/ActorAdapter.scala | 88 ++++++----- 2 files changed, 125 insertions(+), 103 deletions(-) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala index 82df786756..c6c9839147 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/Behavior.scala @@ -5,16 +5,36 @@ package akka.actor.typed import scala.annotation.tailrec - import akka.actor.InvalidMessageException import akka.actor.typed.internal.BehaviorImpl import akka.actor.typed.internal.WrappingBehavior import akka.actor.typed.internal.BehaviorImpl.OrElseBehavior - import akka.util.{ LineNumbers, OptionVal } import akka.annotation.{ ApiMayChange, DoNotInherit, InternalApi } import akka.actor.typed.scaladsl.{ ActorContext => SAC } +import scala.annotation.switch + +/** + * INTERNAL API + */ +@InternalApi +private[akka] object BehaviorTags { + + // optimization - by keeping an identifier for each concrete subtype of behavior + // without gaps we can do table switches instead of instance of checks when interpreting + // note that these must be compile time constants for it to work + final val ExtensibleBehavior = 1 + final val EmptyBehavior = 2 + final val IgnoreBehavior = 3 + final val UnhandledBehavior = 4 + final val DeferredBehavior = 5 + final val SameBehavior = 6 + final val FailedBehavior = 7 + final val StoppedBehavior = 8 + +} + /** * The behavior of an actor defines how it reacts to the messages that it * receives. The message may either be of the type that the Actor declares @@ -37,7 +57,7 @@ import akka.actor.typed.scaladsl.{ ActorContext => SAC } */ @ApiMayChange @DoNotInherit -abstract class Behavior[T] { behavior => +abstract class Behavior[T](private[akka] val _tag: Int) { behavior => /** * Narrow the type of this Behavior, which is always a safe operation. This @@ -77,7 +97,7 @@ abstract class Behavior[T] { behavior => * Note that behaviors that keep an inner behavior, and intercepts messages for it should not be implemented as * an extensible behavior but should instead use the [[BehaviorInterceptor]] */ -abstract class ExtensibleBehavior[T] extends Behavior[T] { +abstract class ExtensibleBehavior[T] extends Behavior[T](BehaviorTags.ExtensibleBehavior) { /** * Process an incoming message and return the next behavior. @@ -194,7 +214,7 @@ object Behavior { * INTERNAL API. */ @InternalApi - private[akka] object EmptyBehavior extends Behavior[Any] { + private[akka] object EmptyBehavior extends Behavior[Any](BehaviorTags.EmptyBehavior) { override def toString = "Empty" } @@ -202,7 +222,7 @@ object Behavior { * INTERNAL API. */ @InternalApi - private[akka] object IgnoreBehavior extends Behavior[Any] { + private[akka] object IgnoreBehavior extends Behavior[Any](BehaviorTags.IgnoreBehavior) { override def toString = "Ignore" } @@ -210,7 +230,7 @@ object Behavior { * INTERNAL API */ @InternalApi - private[akka] object UnhandledBehavior extends Behavior[Nothing] { + private[akka] object UnhandledBehavior extends Behavior[Nothing](BehaviorTags.UnhandledBehavior) { override def toString = "Unhandled" } @@ -233,7 +253,7 @@ object Behavior { * Not placed in internal.BehaviorImpl because Behavior is sealed. */ @InternalApi - private[akka] abstract class DeferredBehavior[T] extends Behavior[T] { + private[akka] abstract class DeferredBehavior[T] extends Behavior[T](BehaviorTags.DeferredBehavior) { def apply(ctx: TypedActorContext[T]): Behavior[T] } @@ -250,11 +270,11 @@ object Behavior { /** * INTERNAL API */ - private[akka] object SameBehavior extends Behavior[Nothing] { + private[akka] object SameBehavior extends Behavior[Nothing](BehaviorTags.SameBehavior) { override def toString = "Same" } - private[akka] class FailedBehavior(val cause: Throwable) extends Behavior[Nothing] { + private[akka] class FailedBehavior(val cause: Throwable) extends Behavior[Nothing](BehaviorTags.FailedBehavior) { override def toString: String = s"Failed($cause)" } @@ -268,7 +288,7 @@ object Behavior { * that PostStop can be sent to previous behavior from `finishTerminate`. */ private[akka] sealed class StoppedBehavior[T](val postStop: OptionVal[TypedActorContext[T] => Unit]) - extends Behavior[T] { + extends Behavior[T](BehaviorTags.StoppedBehavior) { def onPostStop(ctx: TypedActorContext[T]): Unit = { postStop match { @@ -293,30 +313,13 @@ object Behavior { */ @tailrec def canonicalize[T](behavior: Behavior[T], current: Behavior[T], ctx: TypedActorContext[T]): Behavior[T] = - behavior match { - case SameBehavior => current - case UnhandledBehavior => current - case deferred: DeferredBehavior[T] => canonicalize(deferred(ctx), deferred, ctx) - case other => other - } - - /** - * INTERNAL API - * - * Return special behaviors as is, start deferred, if behavior is "non-special" apply the wrap function `f` to get - * and return the result from that. Useful for cases where a [[Behavior]] implementation that is decorating another - * behavior has processed a message and needs to re-wrap the resulting behavior with itself. - */ - @InternalApi - @tailrec - private[akka] def wrap[T, U](currentBehavior: Behavior[_], nextBehavior: Behavior[T], ctx: TypedActorContext[T])( - f: Behavior[T] => Behavior[U]): Behavior[U] = - nextBehavior match { - case SameBehavior | `currentBehavior` => same - case UnhandledBehavior => unhandled - case stopped: StoppedBehavior[T] => stopped.unsafeCast[U] // won't receive more messages so cast is safe - case deferred: DeferredBehavior[T] => wrap(currentBehavior, start(deferred, ctx), ctx)(f) - case other => f(other) + (behavior._tag: @switch) match { + case BehaviorTags.SameBehavior => current + case BehaviorTags.UnhandledBehavior => current + case BehaviorTags.DeferredBehavior => + val deferred = behavior.asInstanceOf[DeferredBehavior[T]] + canonicalize(deferred(ctx), deferred, ctx) + case _ => behavior } /** @@ -364,20 +367,15 @@ object Behavior { * out with a `Stopped` behavior is allowed, though. */ def validateAsInitial[T](behavior: Behavior[T]): Behavior[T] = - behavior match { - case SameBehavior | UnhandledBehavior => - throw new IllegalArgumentException(s"cannot use $behavior as initial behavior") - case x => x - } + if (behavior._tag == BehaviorTags.SameBehavior || behavior._tag == BehaviorTags.UnhandledBehavior) + throw new IllegalArgumentException(s"cannot use $behavior as initial behavior") + else behavior /** * Returns true if the given behavior is not stopped. */ - def isAlive[T](behavior: Behavior[T]): Boolean = behavior match { - case _: StoppedBehavior[_] => false - case _: FailedBehavior => false - case _ => true - } + def isAlive[T](behavior: Behavior[T]): Boolean = + !(behavior._tag == BehaviorTags.StoppedBehavior || behavior._tag == BehaviorTags.FailedBehavior) /** * Returns true if the given behavior is the special `unhandled` marker. @@ -387,22 +385,19 @@ object Behavior { /** * Returns true if the given behavior is the special `Unhandled` marker. */ - def isDeferred[T](behavior: Behavior[T]): Boolean = behavior match { - case _: DeferredBehavior[T] => true - case _ => false - } + def isDeferred[T](behavior: Behavior[T]): Boolean = behavior._tag == BehaviorTags.DeferredBehavior /** * Execute the behavior with the given message */ def interpretMessage[T](behavior: Behavior[T], ctx: TypedActorContext[T], msg: T): Behavior[T] = - interpret(behavior, ctx, msg) + interpret(behavior, ctx, msg, isSignal = false) /** * Execute the behavior with the given signal */ def interpretSignal[T](behavior: Behavior[T], ctx: TypedActorContext[T], signal: Signal): Behavior[T] = { - val result = interpret(behavior, ctx, signal) + val result = interpret(behavior, ctx, signal, isSignal = true) // we need to throw here to allow supervision of deathpact exception signal match { case Terminated(ref) if result == UnhandledBehavior => throw DeathPactException(ref) @@ -410,24 +405,37 @@ object Behavior { } } - private def interpret[T](behavior: Behavior[T], ctx: TypedActorContext[T], msg: Any): Behavior[T] = { - behavior match { - case null => throw new InvalidMessageException("[null] is not an allowed behavior") - case SameBehavior | UnhandledBehavior => + private def interpret[T]( + behavior: Behavior[T], + ctx: TypedActorContext[T], + msg: Any, + // optimization to avoid an instanceof on the message + isSignal: Boolean): Behavior[T] = { + if (behavior eq null) + throw InvalidMessageException("[null] is not an allowed behavior") + + (behavior._tag: @switch) match { + case BehaviorTags.SameBehavior => throw new IllegalArgumentException(s"cannot execute with [$behavior] as behavior") - case d: DeferredBehavior[_] => - throw new IllegalArgumentException(s"deferred [$d] should not be passed to interpreter") - case IgnoreBehavior => Behavior.same[T] - case s: StoppedBehavior[T] => + case BehaviorTags.UnhandledBehavior => + throw new IllegalArgumentException(s"cannot execute with [$behavior] as behavior") + case BehaviorTags.DeferredBehavior => + throw new IllegalArgumentException(s"deferred [$behavior] should not be passed to interpreter") + case BehaviorTags.IgnoreBehavior => + Behavior.same[T] + case BehaviorTags.StoppedBehavior => + val s = behavior.asInstanceOf[StoppedBehavior[T]] if (msg == PostStop) s.onPostStop(ctx) s - case f: FailedBehavior => f - case EmptyBehavior => Behavior.unhandled[T] - case ext: ExtensibleBehavior[T] => - val possiblyDeferredResult = msg match { - case signal: Signal => ext.receiveSignal(ctx, signal) - case m => ext.receive(ctx, m.asInstanceOf[T]) - } + case BehaviorTags.FailedBehavior => + behavior + case BehaviorTags.EmptyBehavior => + Behavior.unhandled[T] + case BehaviorTags.ExtensibleBehavior => + val ext = behavior.asInstanceOf[ExtensibleBehavior[T]] + val possiblyDeferredResult = + if (isSignal) ext.receiveSignal(ctx, msg.asInstanceOf[Signal]) + else ext.receive(ctx, msg.asInstanceOf[T]) start(possiblyDeferredResult, ctx) } } diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala index 99f1fd6448..3da2372d2f 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorAdapter.scala @@ -21,6 +21,8 @@ import scala.util.Success import scala.util.Try import scala.util.control.Exception.Catcher +import scala.annotation.switch + /** * INTERNAL API */ @@ -33,6 +35,11 @@ import scala.util.control.Exception.Catcher * have logged it. */ final case class TypedActorFailedException(cause: Throwable) extends RuntimeException + + private val DummyReceive: untyped.Actor.Receive = { + case _ => throw new RuntimeException("receive should never be called on the typed ActorAdapter") + } + } /** @@ -44,7 +51,7 @@ import scala.util.control.Exception.Catcher import Behavior._ private var behavior: Behavior[T] = _initialBehavior - final def currentBehavior: Behavior[T] = behavior + def currentBehavior: Behavior[T] = behavior // context adapter construction must be lazy because so that it is not created before the system is ready // when the adapter is used for the user guardian (which avoids touching context until it is safe) @@ -60,30 +67,37 @@ import scala.util.control.Exception.Catcher */ private var failures: Map[untyped.ActorRef, Throwable] = Map.empty - def receive: Receive = running + def receive: Receive = ActorAdapter.DummyReceive - def running: Receive = { - case untyped.Terminated(ref) => - val msg = - if (failures contains ref) { - val ex = failures(ref) - failures -= ref - ChildFailed(ActorRefAdapter(ref), ex) - } else Terminated(ActorRefAdapter(ref)) - handleSignal(msg) - case untyped.ReceiveTimeout => - handleMessage(ctx.receiveTimeoutMsg) - case wrapped: AdaptMessage[Any, T] @unchecked => - withSafelyAdapted(() => wrapped.adapt()) { - case AdaptWithRegisteredMessageAdapter(msg) => - adaptAndHandle(msg) - case msg: T @unchecked => - handleMessage(msg) - } - case AdaptWithRegisteredMessageAdapter(msg) => - adaptAndHandle(msg) - case msg: T @unchecked => - handleMessage(msg) + override protected[akka] def aroundReceive(receive: Receive, msg: Any): Unit = { + // as we know we never become in "normal" typed actors, it is just the current behavior that + // changes, we can avoid some overhead with the partial function/behavior stack of untyped entirely + // we also know that the receive is total, so we can avoid the orElse part as well. + msg match { + case untyped.Terminated(ref) => + val msg = + if (failures contains ref) { + val ex = failures(ref) + failures -= ref + ChildFailed(ActorRefAdapter(ref), ex) + } else Terminated(ActorRefAdapter(ref)) + handleSignal(msg) + case untyped.ReceiveTimeout => + handleMessage(ctx.receiveTimeoutMsg) + case wrapped: AdaptMessage[Any, T] @unchecked => + withSafelyAdapted(() => wrapped.adapt()) { + case AdaptWithRegisteredMessageAdapter(msg) => + adaptAndHandle(msg) + case msg: T @unchecked => + handleMessage(msg) + } + case AdaptWithRegisteredMessageAdapter(msg) => + adaptAndHandle(msg) + case signal: Signal => + handleSignal(signal) + case msg: T @unchecked => + handleMessage(msg) + } } private def handleMessage(msg: T): Unit = { @@ -111,18 +125,19 @@ import scala.util.control.Exception.Catcher } private def next(b: Behavior[T], msg: Any): Unit = { - if (Behavior.isUnhandled(b)) unhandled(msg) - else { - b match { - case f: FailedBehavior => - // For the parent untyped supervisor to pick up the exception - throw TypedActorFailedException(f.cause) - case stopped: StoppedBehavior[T] => - behavior = new ComposedStoppingBehavior[T](behavior, stopped) - context.stop(self) - case _ => - behavior = Behavior.canonicalize(b, behavior, ctx) - } + (b._tag: @switch) match { + case BehaviorTags.UnhandledBehavior => + unhandled(msg) + case BehaviorTags.FailedBehavior => + val f = b.asInstanceOf[FailedBehavior] + // For the parent untyped supervisor to pick up the exception + throw TypedActorFailedException(f.cause) + case BehaviorTags.StoppedBehavior => + val stopped = b.asInstanceOf[StoppedBehavior[T]] + behavior = new ComposedStoppingBehavior[T](behavior, stopped) + context.stop(self) + case _ => + behavior = Behavior.canonicalize(b, behavior, ctx) } } @@ -193,7 +208,6 @@ import scala.util.control.Exception.Catcher override def preStart(): Unit = { if (isAlive(behavior)) { - context.become(running) behavior = validateAsInitial(Behavior.start(behavior, ctx)) } // either was stopped initially or became stopped on start