Typed perf improvements, #25986
* Table switch for interpreter ~20% faster * Avoid instanceof on message in interpreter (but keep single method) ~10% faster * Use the behavior tags all over ~4% improvement * Make next a single tableswitch * Behavior.wrap isn't actually used anymore * Sidestep the untyped behavior stack
This commit is contained in:
parent
7b763c815e
commit
78df394039
2 changed files with 125 additions and 103 deletions
|
|
@ -5,16 +5,36 @@
|
|||
package akka.actor.typed
|
||||
|
||||
import scala.annotation.tailrec
|
||||
|
||||
import akka.actor.InvalidMessageException
|
||||
import akka.actor.typed.internal.BehaviorImpl
|
||||
import akka.actor.typed.internal.WrappingBehavior
|
||||
import akka.actor.typed.internal.BehaviorImpl.OrElseBehavior
|
||||
|
||||
import akka.util.{ LineNumbers, OptionVal }
|
||||
import akka.annotation.{ ApiMayChange, DoNotInherit, InternalApi }
|
||||
import akka.actor.typed.scaladsl.{ ActorContext => SAC }
|
||||
|
||||
import scala.annotation.switch
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi
|
||||
private[akka] object BehaviorTags {
|
||||
|
||||
// optimization - by keeping an identifier for each concrete subtype of behavior
|
||||
// without gaps we can do table switches instead of instance of checks when interpreting
|
||||
// note that these must be compile time constants for it to work
|
||||
final val ExtensibleBehavior = 1
|
||||
final val EmptyBehavior = 2
|
||||
final val IgnoreBehavior = 3
|
||||
final val UnhandledBehavior = 4
|
||||
final val DeferredBehavior = 5
|
||||
final val SameBehavior = 6
|
||||
final val FailedBehavior = 7
|
||||
final val StoppedBehavior = 8
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* The behavior of an actor defines how it reacts to the messages that it
|
||||
* receives. The message may either be of the type that the Actor declares
|
||||
|
|
@ -37,7 +57,7 @@ import akka.actor.typed.scaladsl.{ ActorContext => SAC }
|
|||
*/
|
||||
@ApiMayChange
|
||||
@DoNotInherit
|
||||
abstract class Behavior[T] { behavior =>
|
||||
abstract class Behavior[T](private[akka] val _tag: Int) { behavior =>
|
||||
|
||||
/**
|
||||
* Narrow the type of this Behavior, which is always a safe operation. This
|
||||
|
|
@ -77,7 +97,7 @@ abstract class Behavior[T] { behavior =>
|
|||
* Note that behaviors that keep an inner behavior, and intercepts messages for it should not be implemented as
|
||||
* an extensible behavior but should instead use the [[BehaviorInterceptor]]
|
||||
*/
|
||||
abstract class ExtensibleBehavior[T] extends Behavior[T] {
|
||||
abstract class ExtensibleBehavior[T] extends Behavior[T](BehaviorTags.ExtensibleBehavior) {
|
||||
|
||||
/**
|
||||
* Process an incoming message and return the next behavior.
|
||||
|
|
@ -194,7 +214,7 @@ object Behavior {
|
|||
* INTERNAL API.
|
||||
*/
|
||||
@InternalApi
|
||||
private[akka] object EmptyBehavior extends Behavior[Any] {
|
||||
private[akka] object EmptyBehavior extends Behavior[Any](BehaviorTags.EmptyBehavior) {
|
||||
override def toString = "Empty"
|
||||
}
|
||||
|
||||
|
|
@ -202,7 +222,7 @@ object Behavior {
|
|||
* INTERNAL API.
|
||||
*/
|
||||
@InternalApi
|
||||
private[akka] object IgnoreBehavior extends Behavior[Any] {
|
||||
private[akka] object IgnoreBehavior extends Behavior[Any](BehaviorTags.IgnoreBehavior) {
|
||||
override def toString = "Ignore"
|
||||
}
|
||||
|
||||
|
|
@ -210,7 +230,7 @@ object Behavior {
|
|||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi
|
||||
private[akka] object UnhandledBehavior extends Behavior[Nothing] {
|
||||
private[akka] object UnhandledBehavior extends Behavior[Nothing](BehaviorTags.UnhandledBehavior) {
|
||||
override def toString = "Unhandled"
|
||||
}
|
||||
|
||||
|
|
@ -233,7 +253,7 @@ object Behavior {
|
|||
* Not placed in internal.BehaviorImpl because Behavior is sealed.
|
||||
*/
|
||||
@InternalApi
|
||||
private[akka] abstract class DeferredBehavior[T] extends Behavior[T] {
|
||||
private[akka] abstract class DeferredBehavior[T] extends Behavior[T](BehaviorTags.DeferredBehavior) {
|
||||
def apply(ctx: TypedActorContext[T]): Behavior[T]
|
||||
}
|
||||
|
||||
|
|
@ -250,11 +270,11 @@ object Behavior {
|
|||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] object SameBehavior extends Behavior[Nothing] {
|
||||
private[akka] object SameBehavior extends Behavior[Nothing](BehaviorTags.SameBehavior) {
|
||||
override def toString = "Same"
|
||||
}
|
||||
|
||||
private[akka] class FailedBehavior(val cause: Throwable) extends Behavior[Nothing] {
|
||||
private[akka] class FailedBehavior(val cause: Throwable) extends Behavior[Nothing](BehaviorTags.FailedBehavior) {
|
||||
override def toString: String = s"Failed($cause)"
|
||||
}
|
||||
|
||||
|
|
@ -268,7 +288,7 @@ object Behavior {
|
|||
* that PostStop can be sent to previous behavior from `finishTerminate`.
|
||||
*/
|
||||
private[akka] sealed class StoppedBehavior[T](val postStop: OptionVal[TypedActorContext[T] => Unit])
|
||||
extends Behavior[T] {
|
||||
extends Behavior[T](BehaviorTags.StoppedBehavior) {
|
||||
|
||||
def onPostStop(ctx: TypedActorContext[T]): Unit = {
|
||||
postStop match {
|
||||
|
|
@ -293,30 +313,13 @@ object Behavior {
|
|||
*/
|
||||
@tailrec
|
||||
def canonicalize[T](behavior: Behavior[T], current: Behavior[T], ctx: TypedActorContext[T]): Behavior[T] =
|
||||
behavior match {
|
||||
case SameBehavior => current
|
||||
case UnhandledBehavior => current
|
||||
case deferred: DeferredBehavior[T] => canonicalize(deferred(ctx), deferred, ctx)
|
||||
case other => other
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*
|
||||
* Return special behaviors as is, start deferred, if behavior is "non-special" apply the wrap function `f` to get
|
||||
* and return the result from that. Useful for cases where a [[Behavior]] implementation that is decorating another
|
||||
* behavior has processed a message and needs to re-wrap the resulting behavior with itself.
|
||||
*/
|
||||
@InternalApi
|
||||
@tailrec
|
||||
private[akka] def wrap[T, U](currentBehavior: Behavior[_], nextBehavior: Behavior[T], ctx: TypedActorContext[T])(
|
||||
f: Behavior[T] => Behavior[U]): Behavior[U] =
|
||||
nextBehavior match {
|
||||
case SameBehavior | `currentBehavior` => same
|
||||
case UnhandledBehavior => unhandled
|
||||
case stopped: StoppedBehavior[T] => stopped.unsafeCast[U] // won't receive more messages so cast is safe
|
||||
case deferred: DeferredBehavior[T] => wrap(currentBehavior, start(deferred, ctx), ctx)(f)
|
||||
case other => f(other)
|
||||
(behavior._tag: @switch) match {
|
||||
case BehaviorTags.SameBehavior => current
|
||||
case BehaviorTags.UnhandledBehavior => current
|
||||
case BehaviorTags.DeferredBehavior =>
|
||||
val deferred = behavior.asInstanceOf[DeferredBehavior[T]]
|
||||
canonicalize(deferred(ctx), deferred, ctx)
|
||||
case _ => behavior
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -364,20 +367,15 @@ object Behavior {
|
|||
* out with a `Stopped` behavior is allowed, though.
|
||||
*/
|
||||
def validateAsInitial[T](behavior: Behavior[T]): Behavior[T] =
|
||||
behavior match {
|
||||
case SameBehavior | UnhandledBehavior =>
|
||||
throw new IllegalArgumentException(s"cannot use $behavior as initial behavior")
|
||||
case x => x
|
||||
}
|
||||
if (behavior._tag == BehaviorTags.SameBehavior || behavior._tag == BehaviorTags.UnhandledBehavior)
|
||||
throw new IllegalArgumentException(s"cannot use $behavior as initial behavior")
|
||||
else behavior
|
||||
|
||||
/**
|
||||
* Returns true if the given behavior is not stopped.
|
||||
*/
|
||||
def isAlive[T](behavior: Behavior[T]): Boolean = behavior match {
|
||||
case _: StoppedBehavior[_] => false
|
||||
case _: FailedBehavior => false
|
||||
case _ => true
|
||||
}
|
||||
def isAlive[T](behavior: Behavior[T]): Boolean =
|
||||
!(behavior._tag == BehaviorTags.StoppedBehavior || behavior._tag == BehaviorTags.FailedBehavior)
|
||||
|
||||
/**
|
||||
* Returns true if the given behavior is the special `unhandled` marker.
|
||||
|
|
@ -387,22 +385,19 @@ object Behavior {
|
|||
/**
|
||||
* Returns true if the given behavior is the special `Unhandled` marker.
|
||||
*/
|
||||
def isDeferred[T](behavior: Behavior[T]): Boolean = behavior match {
|
||||
case _: DeferredBehavior[T] => true
|
||||
case _ => false
|
||||
}
|
||||
def isDeferred[T](behavior: Behavior[T]): Boolean = behavior._tag == BehaviorTags.DeferredBehavior
|
||||
|
||||
/**
|
||||
* Execute the behavior with the given message
|
||||
*/
|
||||
def interpretMessage[T](behavior: Behavior[T], ctx: TypedActorContext[T], msg: T): Behavior[T] =
|
||||
interpret(behavior, ctx, msg)
|
||||
interpret(behavior, ctx, msg, isSignal = false)
|
||||
|
||||
/**
|
||||
* Execute the behavior with the given signal
|
||||
*/
|
||||
def interpretSignal[T](behavior: Behavior[T], ctx: TypedActorContext[T], signal: Signal): Behavior[T] = {
|
||||
val result = interpret(behavior, ctx, signal)
|
||||
val result = interpret(behavior, ctx, signal, isSignal = true)
|
||||
// we need to throw here to allow supervision of deathpact exception
|
||||
signal match {
|
||||
case Terminated(ref) if result == UnhandledBehavior => throw DeathPactException(ref)
|
||||
|
|
@ -410,24 +405,37 @@ object Behavior {
|
|||
}
|
||||
}
|
||||
|
||||
private def interpret[T](behavior: Behavior[T], ctx: TypedActorContext[T], msg: Any): Behavior[T] = {
|
||||
behavior match {
|
||||
case null => throw new InvalidMessageException("[null] is not an allowed behavior")
|
||||
case SameBehavior | UnhandledBehavior =>
|
||||
private def interpret[T](
|
||||
behavior: Behavior[T],
|
||||
ctx: TypedActorContext[T],
|
||||
msg: Any,
|
||||
// optimization to avoid an instanceof on the message
|
||||
isSignal: Boolean): Behavior[T] = {
|
||||
if (behavior eq null)
|
||||
throw InvalidMessageException("[null] is not an allowed behavior")
|
||||
|
||||
(behavior._tag: @switch) match {
|
||||
case BehaviorTags.SameBehavior =>
|
||||
throw new IllegalArgumentException(s"cannot execute with [$behavior] as behavior")
|
||||
case d: DeferredBehavior[_] =>
|
||||
throw new IllegalArgumentException(s"deferred [$d] should not be passed to interpreter")
|
||||
case IgnoreBehavior => Behavior.same[T]
|
||||
case s: StoppedBehavior[T] =>
|
||||
case BehaviorTags.UnhandledBehavior =>
|
||||
throw new IllegalArgumentException(s"cannot execute with [$behavior] as behavior")
|
||||
case BehaviorTags.DeferredBehavior =>
|
||||
throw new IllegalArgumentException(s"deferred [$behavior] should not be passed to interpreter")
|
||||
case BehaviorTags.IgnoreBehavior =>
|
||||
Behavior.same[T]
|
||||
case BehaviorTags.StoppedBehavior =>
|
||||
val s = behavior.asInstanceOf[StoppedBehavior[T]]
|
||||
if (msg == PostStop) s.onPostStop(ctx)
|
||||
s
|
||||
case f: FailedBehavior => f
|
||||
case EmptyBehavior => Behavior.unhandled[T]
|
||||
case ext: ExtensibleBehavior[T] =>
|
||||
val possiblyDeferredResult = msg match {
|
||||
case signal: Signal => ext.receiveSignal(ctx, signal)
|
||||
case m => ext.receive(ctx, m.asInstanceOf[T])
|
||||
}
|
||||
case BehaviorTags.FailedBehavior =>
|
||||
behavior
|
||||
case BehaviorTags.EmptyBehavior =>
|
||||
Behavior.unhandled[T]
|
||||
case BehaviorTags.ExtensibleBehavior =>
|
||||
val ext = behavior.asInstanceOf[ExtensibleBehavior[T]]
|
||||
val possiblyDeferredResult =
|
||||
if (isSignal) ext.receiveSignal(ctx, msg.asInstanceOf[Signal])
|
||||
else ext.receive(ctx, msg.asInstanceOf[T])
|
||||
start(possiblyDeferredResult, ctx)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,6 +21,8 @@ import scala.util.Success
|
|||
import scala.util.Try
|
||||
import scala.util.control.Exception.Catcher
|
||||
|
||||
import scala.annotation.switch
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
|
|
@ -33,6 +35,11 @@ import scala.util.control.Exception.Catcher
|
|||
* have logged it.
|
||||
*/
|
||||
final case class TypedActorFailedException(cause: Throwable) extends RuntimeException
|
||||
|
||||
private val DummyReceive: untyped.Actor.Receive = {
|
||||
case _ => throw new RuntimeException("receive should never be called on the typed ActorAdapter")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -44,7 +51,7 @@ import scala.util.control.Exception.Catcher
|
|||
import Behavior._
|
||||
|
||||
private var behavior: Behavior[T] = _initialBehavior
|
||||
final def currentBehavior: Behavior[T] = behavior
|
||||
def currentBehavior: Behavior[T] = behavior
|
||||
|
||||
// context adapter construction must be lazy because so that it is not created before the system is ready
|
||||
// when the adapter is used for the user guardian (which avoids touching context until it is safe)
|
||||
|
|
@ -60,30 +67,37 @@ import scala.util.control.Exception.Catcher
|
|||
*/
|
||||
private var failures: Map[untyped.ActorRef, Throwable] = Map.empty
|
||||
|
||||
def receive: Receive = running
|
||||
def receive: Receive = ActorAdapter.DummyReceive
|
||||
|
||||
def running: Receive = {
|
||||
case untyped.Terminated(ref) =>
|
||||
val msg =
|
||||
if (failures contains ref) {
|
||||
val ex = failures(ref)
|
||||
failures -= ref
|
||||
ChildFailed(ActorRefAdapter(ref), ex)
|
||||
} else Terminated(ActorRefAdapter(ref))
|
||||
handleSignal(msg)
|
||||
case untyped.ReceiveTimeout =>
|
||||
handleMessage(ctx.receiveTimeoutMsg)
|
||||
case wrapped: AdaptMessage[Any, T] @unchecked =>
|
||||
withSafelyAdapted(() => wrapped.adapt()) {
|
||||
case AdaptWithRegisteredMessageAdapter(msg) =>
|
||||
adaptAndHandle(msg)
|
||||
case msg: T @unchecked =>
|
||||
handleMessage(msg)
|
||||
}
|
||||
case AdaptWithRegisteredMessageAdapter(msg) =>
|
||||
adaptAndHandle(msg)
|
||||
case msg: T @unchecked =>
|
||||
handleMessage(msg)
|
||||
override protected[akka] def aroundReceive(receive: Receive, msg: Any): Unit = {
|
||||
// as we know we never become in "normal" typed actors, it is just the current behavior that
|
||||
// changes, we can avoid some overhead with the partial function/behavior stack of untyped entirely
|
||||
// we also know that the receive is total, so we can avoid the orElse part as well.
|
||||
msg match {
|
||||
case untyped.Terminated(ref) =>
|
||||
val msg =
|
||||
if (failures contains ref) {
|
||||
val ex = failures(ref)
|
||||
failures -= ref
|
||||
ChildFailed(ActorRefAdapter(ref), ex)
|
||||
} else Terminated(ActorRefAdapter(ref))
|
||||
handleSignal(msg)
|
||||
case untyped.ReceiveTimeout =>
|
||||
handleMessage(ctx.receiveTimeoutMsg)
|
||||
case wrapped: AdaptMessage[Any, T] @unchecked =>
|
||||
withSafelyAdapted(() => wrapped.adapt()) {
|
||||
case AdaptWithRegisteredMessageAdapter(msg) =>
|
||||
adaptAndHandle(msg)
|
||||
case msg: T @unchecked =>
|
||||
handleMessage(msg)
|
||||
}
|
||||
case AdaptWithRegisteredMessageAdapter(msg) =>
|
||||
adaptAndHandle(msg)
|
||||
case signal: Signal =>
|
||||
handleSignal(signal)
|
||||
case msg: T @unchecked =>
|
||||
handleMessage(msg)
|
||||
}
|
||||
}
|
||||
|
||||
private def handleMessage(msg: T): Unit = {
|
||||
|
|
@ -111,18 +125,19 @@ import scala.util.control.Exception.Catcher
|
|||
}
|
||||
|
||||
private def next(b: Behavior[T], msg: Any): Unit = {
|
||||
if (Behavior.isUnhandled(b)) unhandled(msg)
|
||||
else {
|
||||
b match {
|
||||
case f: FailedBehavior =>
|
||||
// For the parent untyped supervisor to pick up the exception
|
||||
throw TypedActorFailedException(f.cause)
|
||||
case stopped: StoppedBehavior[T] =>
|
||||
behavior = new ComposedStoppingBehavior[T](behavior, stopped)
|
||||
context.stop(self)
|
||||
case _ =>
|
||||
behavior = Behavior.canonicalize(b, behavior, ctx)
|
||||
}
|
||||
(b._tag: @switch) match {
|
||||
case BehaviorTags.UnhandledBehavior =>
|
||||
unhandled(msg)
|
||||
case BehaviorTags.FailedBehavior =>
|
||||
val f = b.asInstanceOf[FailedBehavior]
|
||||
// For the parent untyped supervisor to pick up the exception
|
||||
throw TypedActorFailedException(f.cause)
|
||||
case BehaviorTags.StoppedBehavior =>
|
||||
val stopped = b.asInstanceOf[StoppedBehavior[T]]
|
||||
behavior = new ComposedStoppingBehavior[T](behavior, stopped)
|
||||
context.stop(self)
|
||||
case _ =>
|
||||
behavior = Behavior.canonicalize(b, behavior, ctx)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -193,7 +208,6 @@ import scala.util.control.Exception.Catcher
|
|||
|
||||
override def preStart(): Unit = {
|
||||
if (isAlive(behavior)) {
|
||||
context.become(running)
|
||||
behavior = validateAsInitial(Behavior.start(behavior, ctx))
|
||||
}
|
||||
// either was stopped initially or became stopped on start
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue