Move hotswap stack into Actor trait. See #1717
This commit is contained in:
parent
c1178c9b1a
commit
d12d56a1ae
6 changed files with 78 additions and 23 deletions
49
akka-actor/src/main/scala/Errtest.scala
Normal file
49
akka-actor/src/main/scala/Errtest.scala
Normal file
|
|
@ -0,0 +1,49 @@
|
|||
|
||||
import akka.actor._
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.event.LoggingReceive
|
||||
import scala.annotation.tailrec
|
||||
|
||||
object Errtest extends App {
|
||||
|
||||
val config = ConfigFactory.parseString("""
|
||||
akka.loglevel = DEBUG
|
||||
akka.actor.debug {
|
||||
receive = on
|
||||
lifecycle = on
|
||||
}
|
||||
""")
|
||||
|
||||
val sys = ActorSystem("ErrSys", config)
|
||||
val top = sys.actorOf(Props[Top], name = "top")
|
||||
|
||||
for (n ← 1 to 100) {
|
||||
top ! "run " + n
|
||||
Thread.sleep(1000)
|
||||
}
|
||||
}
|
||||
|
||||
class Top extends Actor {
|
||||
var c: ActorRef = _
|
||||
def receive = LoggingReceive {
|
||||
case x ⇒
|
||||
c = context.actorOf(Props[Child]);
|
||||
c ! "ok"
|
||||
}
|
||||
}
|
||||
|
||||
class Child extends Actor {
|
||||
|
||||
//throw new Error("Simulated ERR")
|
||||
blowUp(0)
|
||||
|
||||
//not @tailrec
|
||||
private final def blowUp(n: Long): Long = {
|
||||
blowUp(n + 1) + 1
|
||||
}
|
||||
|
||||
def receive = LoggingReceive {
|
||||
case x ⇒
|
||||
//context.system.shutdown();
|
||||
}
|
||||
}
|
||||
|
|
@ -7,6 +7,7 @@ package akka.actor
|
|||
import akka.AkkaException
|
||||
import scala.reflect.BeanProperty
|
||||
import scala.util.control.NoStackTrace
|
||||
import scala.collection.immutable.Stack
|
||||
import java.util.regex.Pattern
|
||||
|
||||
/**
|
||||
|
|
@ -112,6 +113,8 @@ object Actor {
|
|||
def isDefinedAt(x: Any) = false
|
||||
def apply(x: Any) = throw new UnsupportedOperationException("Empty behavior apply()")
|
||||
}
|
||||
|
||||
private final val emptyBehaviourStack: Stack[Actor.Receive] = Stack.empty
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -172,7 +175,7 @@ trait Actor {
|
|||
type Receive = Actor.Receive
|
||||
|
||||
/**
|
||||
* Stores the context for this actor, including self, sender, and hotswap.
|
||||
* Stores the context for this actor, including self, and sender.
|
||||
* It is implicit to support operations such as `forward`.
|
||||
*
|
||||
* [[akka.actor.ActorContext]] is the Scala API. `getContext` returns a
|
||||
|
|
@ -282,7 +285,6 @@ trait Actor {
|
|||
// =========================================
|
||||
|
||||
private[akka] final def apply(msg: Any) = {
|
||||
val behaviorStack = context.asInstanceOf[ActorCell].hotswap
|
||||
msg match {
|
||||
case msg if behaviorStack.nonEmpty && behaviorStack.head.isDefinedAt(msg) ⇒ behaviorStack.head.apply(msg)
|
||||
case msg if behaviorStack.isEmpty && processingBehavior.isDefinedAt(msg) ⇒ processingBehavior.apply(msg)
|
||||
|
|
@ -291,5 +293,18 @@ trait Actor {
|
|||
}
|
||||
|
||||
private[this] val processingBehavior = receive //ProcessingBehavior is the original behavior
|
||||
|
||||
private[akka] def pushBehavior(behavior: Receive): Unit = {
|
||||
behaviorStack = behaviorStack.push(behavior)
|
||||
}
|
||||
|
||||
private[akka] def popBehavior(): Unit = {
|
||||
val stack = behaviorStack
|
||||
if (stack.nonEmpty) behaviorStack = stack.pop
|
||||
}
|
||||
|
||||
private[akka] def clearBehaviorStack(): Unit = { behaviorStack = emptyBehaviourStack }
|
||||
|
||||
private var behaviorStack: Stack[PartialFunction[Any, Unit]] = emptyBehaviourStack
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -174,8 +174,7 @@ private[akka] class ActorCell(
|
|||
val self: InternalActorRef,
|
||||
val props: Props,
|
||||
@volatile var parent: InternalActorRef,
|
||||
/*no member*/ _receiveTimeout: Option[Duration],
|
||||
var hotswap: Stack[PartialFunction[Any, Unit]]) extends UntypedActorContext {
|
||||
/*no member*/ _receiveTimeout: Option[Duration]) extends UntypedActorContext {
|
||||
|
||||
import ActorCell._
|
||||
|
||||
|
|
@ -389,7 +388,6 @@ private[akka] class ActorCell(
|
|||
}
|
||||
}
|
||||
actor = freshActor // assign it here so if preStart fails, we can null out the sef-refs next call
|
||||
hotswap = Props.noHotSwap // Reset the behavior
|
||||
freshActor.postRestart(cause)
|
||||
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(freshActor), "restarted"))
|
||||
|
||||
|
|
@ -509,9 +507,9 @@ private[akka] class ActorCell(
|
|||
}
|
||||
}
|
||||
|
||||
def become(behavior: Actor.Receive, discardOld: Boolean = true) {
|
||||
def become(behavior: Actor.Receive, discardOld: Boolean = true): Unit = {
|
||||
if (discardOld) unbecome()
|
||||
hotswap = hotswap.push(behavior)
|
||||
actor.pushBehavior(behavior)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -527,10 +525,7 @@ private[akka] class ActorCell(
|
|||
become(newReceive, discardOld)
|
||||
}
|
||||
|
||||
def unbecome() {
|
||||
val h = hotswap
|
||||
if (h.nonEmpty) hotswap = h.pop
|
||||
}
|
||||
def unbecome(): Unit = actor.popBehavior()
|
||||
|
||||
def autoReceiveMessage(msg: Envelope) {
|
||||
if (system.settings.DebugAutoReceive)
|
||||
|
|
@ -547,9 +542,9 @@ private[akka] class ActorCell(
|
|||
}
|
||||
|
||||
private def doTerminate() {
|
||||
val a = actor
|
||||
try {
|
||||
try {
|
||||
val a = actor
|
||||
if (a ne null) a.postStop()
|
||||
} finally {
|
||||
dispatcher.detach(this)
|
||||
|
|
@ -563,7 +558,7 @@ private[akka] class ActorCell(
|
|||
} finally {
|
||||
currentMessage = null
|
||||
clearActorFields()
|
||||
hotswap = Props.noHotSwap
|
||||
if (a ne null) a.clearBehaviorStack()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -224,8 +224,7 @@ private[akka] class LocalActorRef private[akka] (
|
|||
_supervisor: InternalActorRef,
|
||||
val path: ActorPath,
|
||||
val systemService: Boolean = false,
|
||||
_receiveTimeout: Option[Duration] = None,
|
||||
_hotswap: Stack[PartialFunction[Any, Unit]] = Props.noHotSwap)
|
||||
_receiveTimeout: Option[Duration] = None)
|
||||
extends InternalActorRef with LocalRef {
|
||||
|
||||
/*
|
||||
|
|
@ -238,7 +237,7 @@ private[akka] class LocalActorRef private[akka] (
|
|||
* us to use purely factory methods for creating LocalActorRefs.
|
||||
*/
|
||||
@volatile
|
||||
private var actorCell = newActorCell(_system, this, _props, _supervisor, _receiveTimeout, _hotswap)
|
||||
private var actorCell = newActorCell(_system, this, _props, _supervisor, _receiveTimeout)
|
||||
actorCell.start()
|
||||
|
||||
protected def newActorCell(
|
||||
|
|
@ -246,9 +245,8 @@ private[akka] class LocalActorRef private[akka] (
|
|||
ref: InternalActorRef,
|
||||
props: Props,
|
||||
supervisor: InternalActorRef,
|
||||
receiveTimeout: Option[Duration],
|
||||
hotswap: Stack[PartialFunction[Any, Unit]]): ActorCell =
|
||||
new ActorCell(system, ref, props, supervisor, receiveTimeout, hotswap)
|
||||
receiveTimeout: Option[Duration]): ActorCell =
|
||||
new ActorCell(system, ref, props, supervisor, receiveTimeout)
|
||||
|
||||
protected def actorContext: ActorContext = actorCell
|
||||
|
||||
|
|
|
|||
|
|
@ -22,7 +22,6 @@ object Props {
|
|||
|
||||
final val defaultRoutedProps: RouterConfig = NoRouter
|
||||
|
||||
final val noHotSwap: Stack[Actor.Receive] = Stack.empty
|
||||
final val empty = new Props(() ⇒ new Actor { def receive = Actor.emptyBehavior })
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -41,9 +41,8 @@ class TestActorRef[T <: Actor](
|
|||
ref: InternalActorRef,
|
||||
props: Props,
|
||||
supervisor: InternalActorRef,
|
||||
receiveTimeout: Option[Duration],
|
||||
hotswap: Stack[PartialFunction[Any, Unit]]): ActorCell =
|
||||
new ActorCell(system, ref, props, supervisor, receiveTimeout, hotswap) {
|
||||
receiveTimeout: Option[Duration]): ActorCell =
|
||||
new ActorCell(system, ref, props, supervisor, receiveTimeout) {
|
||||
override def autoReceiveMessage(msg: Envelope) {
|
||||
msg.message match {
|
||||
case InternalGetActor ⇒ sender ! actor
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue