#1896 - Sprinkling some magic sauce so that we can support recreating the exact same instance without ending up in a world of pain. A world of pain.
This commit is contained in:
parent
1614ae3b9f
commit
96d657f048
2 changed files with 62 additions and 12 deletions
|
|
@ -157,6 +157,55 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
|
|||
expectNoMsg(1 second)
|
||||
}
|
||||
|
||||
"restart properly when same instance is returned" in {
|
||||
val restarts = 3 //max number of restarts
|
||||
lazy val childInstance = new Actor {
|
||||
var preRestarts = 0
|
||||
var postRestarts = 0
|
||||
var preStarts = 0
|
||||
var postStops = 0
|
||||
override def preRestart(reason: Throwable, message: Option[Any]) { preRestarts += 1; testActor ! ("preRestart" + preRestarts) }
|
||||
override def postRestart(reason: Throwable) { postRestarts += 1; testActor ! ("postRestart" + postRestarts) }
|
||||
override def preStart() { preStarts += 1; testActor ! ("preStart" + preStarts) }
|
||||
override def postStop() { postStops += 1; testActor ! ("postStop" + postStops) }
|
||||
def receive = {
|
||||
case "crash" ⇒ testActor ! "crashed"; throw new RuntimeException("Expected")
|
||||
case "ping" ⇒ sender ! "pong"
|
||||
}
|
||||
}
|
||||
val master = system.actorOf(Props(new Actor {
|
||||
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = restarts)(List(classOf[Exception]))
|
||||
val child = context.actorOf(Props(childInstance))
|
||||
def receive = {
|
||||
case msg ⇒ child forward msg
|
||||
}
|
||||
}))
|
||||
|
||||
expectMsg("preStart1")
|
||||
|
||||
master ! "ping"
|
||||
expectMsg("pong")
|
||||
|
||||
filterEvents(EventFilter[RuntimeException]("Expected", occurrences = restarts + 1)) {
|
||||
(1 to restarts) foreach {
|
||||
i ⇒
|
||||
master ! "crash"
|
||||
expectMsg("crashed")
|
||||
|
||||
expectMsg("preRestart" + i)
|
||||
expectMsg("postRestart" + i)
|
||||
|
||||
master ! "ping"
|
||||
expectMsg("pong")
|
||||
}
|
||||
master ! "crash"
|
||||
expectMsg("crashed")
|
||||
expectMsg("postStop1")
|
||||
}
|
||||
|
||||
expectNoMsg(1 second)
|
||||
}
|
||||
|
||||
"not restart temporary actor" in {
|
||||
val (temporaryActor, _) = temporaryActorAllForOne
|
||||
|
||||
|
|
|
|||
|
|
@ -529,7 +529,7 @@ private[akka] class ActorCell(
|
|||
try {
|
||||
if (failedActor.context ne null) failedActor.preRestart(cause, if (c ne null) Some(c.message) else None)
|
||||
} finally {
|
||||
clearActorFields()
|
||||
clearActorFields(failedActor)
|
||||
}
|
||||
}
|
||||
childrenRefs match {
|
||||
|
|
@ -537,7 +537,7 @@ private[akka] class ActorCell(
|
|||
childrenRefs = ct.copy(reason = Recreation(cause))
|
||||
dispatcher suspend this
|
||||
case _ ⇒
|
||||
doRecreate(cause)
|
||||
doRecreate(cause, failedActor)
|
||||
}
|
||||
} catch {
|
||||
case NonFatal(e) ⇒ try {
|
||||
|
|
@ -696,18 +696,19 @@ private[akka] class ActorCell(
|
|||
system.eventStream.publish(Debug(self.path.toString, clazz(actor), "stopped"))
|
||||
} finally {
|
||||
if (a ne null) a.clearBehaviorStack()
|
||||
clearActorFields()
|
||||
clearActorFields(a)
|
||||
actor = null
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def doRecreate(cause: Throwable): Unit = try {
|
||||
private def doRecreate(cause: Throwable, failedActor: Actor): Unit = try {
|
||||
// after all killed children have terminated, recreate the rest, then go on to start the new instance
|
||||
actor.supervisorStrategy.handleSupervisorRestarted(cause, self, children)
|
||||
|
||||
val freshActor = newActor()
|
||||
actor = freshActor // this must happen before postRestart has a chance to fail
|
||||
if (freshActor eq failedActor) setActorFields(freshActor, this, self) // If the creator returns the same instance, we need to restore our nulled out fields.
|
||||
|
||||
freshActor.postRestart(cause)
|
||||
if (system.settings.DebugLifecycle) system.eventStream.publish(Debug(self.path.toString, clazz(freshActor), "restarted"))
|
||||
|
|
@ -719,6 +720,7 @@ private[akka] class ActorCell(
|
|||
// prevent any further messages to be processed until the actor has been restarted
|
||||
dispatcher.suspend(this)
|
||||
actor.supervisorStrategy.handleSupervisorFailing(self, children)
|
||||
clearActorFields(actor) // If this fails, we need to ensure that preRestart isn't called.
|
||||
} finally {
|
||||
parent.tell(Failed(ActorInitializationException(self, "exception during re-creation", e)), self)
|
||||
}
|
||||
|
|
@ -736,7 +738,7 @@ private[akka] class ActorCell(
|
|||
childrenRefs = n
|
||||
actor.supervisorStrategy.handleChildTerminated(this, child, children)
|
||||
if (!n.isInstanceOf[TerminatingChildrenContainer]) reason match {
|
||||
case Recreation(cause) ⇒ doRecreate(cause)
|
||||
case Recreation(cause) ⇒ doRecreate(cause, actor) // doRecreate since this is the continuation of "recreate"
|
||||
case Termination ⇒ doTerminate()
|
||||
case _ ⇒
|
||||
}
|
||||
|
|
@ -775,12 +777,12 @@ private[akka] class ActorCell(
|
|||
}
|
||||
}
|
||||
|
||||
final def clearActorFields(): Unit = {
|
||||
setActorFields(context = null, self = system.deadLetters)
|
||||
final def clearActorFields(actorInstance: Actor): Unit = {
|
||||
setActorFields(actorInstance, context = null, self = system.deadLetters)
|
||||
currentMessage = null
|
||||
}
|
||||
|
||||
final def setActorFields(context: ActorContext, self: ActorRef) {
|
||||
final def setActorFields(actorInstance: Actor, context: ActorContext, self: ActorRef) {
|
||||
@tailrec
|
||||
def lookupAndSetField(clazz: Class[_], actor: Actor, name: String, value: Any): Boolean = {
|
||||
val success = try {
|
||||
|
|
@ -799,10 +801,9 @@ private[akka] class ActorCell(
|
|||
lookupAndSetField(parent, actor, name, value)
|
||||
}
|
||||
}
|
||||
val a = actor
|
||||
if (a ne null) {
|
||||
lookupAndSetField(a.getClass, a, "context", context)
|
||||
lookupAndSetField(a.getClass, a, "self", self)
|
||||
if (actorInstance ne null) {
|
||||
lookupAndSetField(actorInstance.getClass, actorInstance, "context", context)
|
||||
lookupAndSetField(actorInstance.getClass, actorInstance, "self", self)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue