Moving the cause into Recreate
This commit is contained in:
parent
5321d02f64
commit
785e2a2636
5 changed files with 71 additions and 67 deletions
|
|
@ -30,7 +30,7 @@ object SupervisorSpec {
|
|||
|
||||
val PingMessage = "ping"
|
||||
val PongMessage = "pong"
|
||||
val ExceptionMessage = "CRASHED" //"Expected exception; to test fault-tolerance"
|
||||
val ExceptionMessage = "Expected exception; to test fault-tolerance"
|
||||
|
||||
var messageLog = new LinkedBlockingQueue[String]
|
||||
|
||||
|
|
|
|||
|
|
@ -98,9 +98,9 @@ sealed abstract class FaultHandlingStrategy {
|
|||
linkedActors.foreach(_.child.suspend())
|
||||
}
|
||||
|
||||
def handleSupervisorRestarted(supervisor: ActorRef, linkedActors: List[ChildRestartStats]): Unit = {
|
||||
def handleSupervisorRestarted(cause: Throwable, supervisor: ActorRef, linkedActors: List[ChildRestartStats]): Unit = {
|
||||
if (linkedActors.nonEmpty)
|
||||
linkedActors.foreach(_.child.restart())
|
||||
linkedActors.foreach(_.child.restart(cause))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -149,7 +149,7 @@ case class AllForOneStrategy(trapExit: List[Class[_ <: Throwable]],
|
|||
def processFailure(fail: Failed, linkedActors: List[ChildRestartStats]): Unit = {
|
||||
if (linkedActors.nonEmpty) {
|
||||
if (linkedActors.forall(_.requestRestartPermission(maxNrOfRetries, withinTimeRange)))
|
||||
linkedActors.foreach(_.child.restart())
|
||||
linkedActors.foreach(_.child.restart(fail.cause))
|
||||
else
|
||||
linkedActors.foreach(_.child.stop())
|
||||
}
|
||||
|
|
@ -189,7 +189,7 @@ case class OneForOneStrategy(trapExit: List[Class[_ <: Throwable]],
|
|||
linkedActors.find(_.child == fail.actor) match {
|
||||
case Some(stats) ⇒
|
||||
if (stats.requestRestartPermission(maxNrOfRetries, withinTimeRange))
|
||||
fail.actor.restart()
|
||||
fail.actor.restart(fail.cause)
|
||||
else
|
||||
fail.actor.stop() //TODO optimization to drop child here already?
|
||||
case None ⇒ EventHandler.warning(this, "Got Failure from non-child: " + fail)
|
||||
|
|
@ -306,60 +306,32 @@ private[akka] class ActorCell(
|
|||
case msg ⇒ msg.channel
|
||||
}
|
||||
|
||||
//This method is in charge of setting up the contextStack and create a new instance of the Actor
|
||||
protected def newActor(): Actor = {
|
||||
val stackBefore = contextStack.get
|
||||
contextStack.set(stackBefore.push(this))
|
||||
try {
|
||||
val instance = props.creator()
|
||||
|
||||
if (instance eq null)
|
||||
throw new ActorInitializationException("Actor instance passed to actorOf can't be 'null'")
|
||||
|
||||
instance
|
||||
} finally {
|
||||
val stackAfter = contextStack.get
|
||||
if (stackAfter.nonEmpty)
|
||||
contextStack.set(if (stackAfter.head eq null) stackAfter.pop.pop else stackAfter.pop) // pop null marker plus our context
|
||||
}
|
||||
}
|
||||
|
||||
def systemInvoke(envelope: SystemEnvelope) {
|
||||
def create(recreation: Boolean): Unit = try {
|
||||
|
||||
//This method is in charge of setting up the contextStack and create a new instance of the Actor
|
||||
def newActor(): Actor = {
|
||||
val stackBefore = contextStack.get
|
||||
contextStack.set(stackBefore.push(this))
|
||||
try {
|
||||
val instance = props.creator()
|
||||
|
||||
if (instance eq null)
|
||||
throw new ActorInitializationException("Actor instance passed to actorOf can't be 'null'")
|
||||
|
||||
instance
|
||||
} finally {
|
||||
val stackAfter = contextStack.get
|
||||
if (stackAfter.nonEmpty)
|
||||
contextStack.set(if (stackAfter.head eq null) stackAfter.pop.pop else stackAfter.pop) // pop null marker plus our context
|
||||
}
|
||||
}
|
||||
|
||||
actor match {
|
||||
case null if !recreation ⇒
|
||||
val created = newActor() //TODO !!!! Notify supervisor on failure to create!
|
||||
actor = created
|
||||
created.preStart()
|
||||
checkReceiveTimeout
|
||||
if (Actor.debugLifecycle) EventHandler.debug(created, "started")
|
||||
|
||||
case failedActor if recreation ⇒
|
||||
val reason = new Exception("CRASHED") //FIXME TODO stash away the exception that caused the failure and reuse that? <------- !!!!!!!!!! RED RED RED
|
||||
if (Actor.debugLifecycle) EventHandler.debug(failedActor, "restarting")
|
||||
val freshActor = newActor()
|
||||
if (failedActor ne null) {
|
||||
val c = currentMessage //One read only plz
|
||||
try {
|
||||
failedActor.preRestart(reason, if (c ne null) Some(c.message) else None)
|
||||
} finally {
|
||||
clearActorContext()
|
||||
currentMessage = null
|
||||
actor = null
|
||||
}
|
||||
}
|
||||
actor = freshActor // assign it here so if preStart fails, we can null out the sef-refs next call
|
||||
freshActor.postRestart(reason)
|
||||
if (Actor.debugLifecycle) EventHandler.debug(freshActor, "restarted")
|
||||
|
||||
dispatcher.resume(this) //FIXME should this be moved down?
|
||||
|
||||
//FIXME TODO How should we handle restarting of children? <----- !!!!!!!!!!!!! RED RED RED
|
||||
props.faultHandler.handleSupervisorRestarted(self, _linkedActors)
|
||||
|
||||
case _ ⇒ EventHandler.warning(this, "Anomaly in Actor creation, current Actor is '" + actor + "' and recreate = '" + recreation + "', this shouldn't be possible")
|
||||
}
|
||||
def create(): Unit = try {
|
||||
val created = newActor() //TODO !!!! Notify supervisor on failure to create!
|
||||
actor = created
|
||||
created.preStart()
|
||||
checkReceiveTimeout
|
||||
if (Actor.debugLifecycle) EventHandler.debug(created, "started")
|
||||
} catch {
|
||||
case e ⇒ try {
|
||||
EventHandler.error(e, this, "error while creating actor")
|
||||
|
|
@ -367,7 +339,39 @@ private[akka] class ActorCell(
|
|||
dispatcher.suspend(this)
|
||||
envelope.channel.sendException(e)
|
||||
} finally {
|
||||
if (supervisor.isDefined) supervisor.get ! Failed(self, e) else throw e //FIXME TODO What should be done when there's a systemMessage failure in a non-supervised actor?
|
||||
if (supervisor.isDefined) supervisor.get ! Failed(self, e) else self.stop()
|
||||
}
|
||||
}
|
||||
|
||||
def recreate(cause: Throwable): Unit = try {
|
||||
val failedActor = actor
|
||||
if (Actor.debugLifecycle) EventHandler.debug(failedActor, "restarting")
|
||||
val freshActor = newActor()
|
||||
if (failedActor ne null) {
|
||||
val c = currentMessage //One read only plz
|
||||
try {
|
||||
failedActor.preRestart(cause, if (c ne null) Some(c.message) else None)
|
||||
} finally {
|
||||
clearActorContext()
|
||||
currentMessage = null
|
||||
actor = null
|
||||
}
|
||||
}
|
||||
actor = freshActor // assign it here so if preStart fails, we can null out the sef-refs next call
|
||||
freshActor.postRestart(cause)
|
||||
if (Actor.debugLifecycle) EventHandler.debug(freshActor, "restarted")
|
||||
|
||||
dispatcher.resume(this) //FIXME should this be moved down?
|
||||
|
||||
props.faultHandler.handleSupervisorRestarted(cause, self, _linkedActors)
|
||||
} catch {
|
||||
case e ⇒ try {
|
||||
EventHandler.error(e, this, "error while creating actor")
|
||||
// prevent any further messages to be processed until the actor has been restarted
|
||||
dispatcher.suspend(this)
|
||||
envelope.channel.sendException(e)
|
||||
} finally {
|
||||
if (supervisor.isDefined) supervisor.get ! Failed(self, e) else self.stop()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -414,8 +418,8 @@ private[akka] class ActorCell(
|
|||
val isClosed = mailbox.isClosed //Fence plus volatile read
|
||||
if (!isClosed) {
|
||||
envelope.message match {
|
||||
case Create ⇒ create(recreation = false)
|
||||
case Recreate ⇒ create(recreation = true)
|
||||
case Create ⇒ create()
|
||||
case Recreate(cause) ⇒ recreate(cause)
|
||||
case Link(subject) ⇒
|
||||
akka.event.DumbMonitoring.link(self, subject)
|
||||
if (Actor.debugLifecycle) EventHandler.debug(actor, "now monitoring " + subject)
|
||||
|
|
@ -488,7 +492,7 @@ private[akka] class ActorCell(
|
|||
|
||||
def handleChildTerminated(child: ActorRef): Unit = _linkedActors = props.faultHandler.handleChildTerminated(child, _linkedActors)
|
||||
|
||||
def restart(): Unit = dispatcher.systemDispatch(SystemEnvelope(this, Recreate, NullChannel))
|
||||
def restart(cause: Throwable): Unit = dispatcher.systemDispatch(SystemEnvelope(this, Recreate(cause), NullChannel))
|
||||
|
||||
def checkReceiveTimeout() {
|
||||
cancelReceiveTimeout()
|
||||
|
|
|
|||
|
|
@ -144,7 +144,7 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha
|
|||
timeout: Timeout,
|
||||
channel: UntypedChannel): Future[Any]
|
||||
|
||||
protected[akka] def restart(): Unit
|
||||
protected[akka] def restart(cause: Throwable): Unit
|
||||
|
||||
override def hashCode: Int = HashCode.hash(HashCode.SEED, address)
|
||||
|
||||
|
|
@ -264,7 +264,7 @@ class LocalActorRef private[akka] (
|
|||
|
||||
protected[akka] def handleFailure(fail: Failed): Unit = actorCell.handleFailure(fail)
|
||||
|
||||
protected[akka] def restart(): Unit = actorCell.restart()
|
||||
protected[akka] def restart(cause: Throwable): Unit = actorCell.restart(cause)
|
||||
|
||||
// ========= PRIVATE FUNCTIONS =========
|
||||
|
||||
|
|
@ -348,7 +348,7 @@ private[akka] case class RemoteActorRef private[akka] (
|
|||
|
||||
def unlink(actorRef: ActorRef): ActorRef = unsupported
|
||||
|
||||
protected[akka] def restart(): Unit = unsupported
|
||||
protected[akka] def restart(cause: Throwable): Unit = unsupported
|
||||
|
||||
private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef")
|
||||
}
|
||||
|
|
@ -444,7 +444,7 @@ trait UnsupportedActorRef extends ActorRef with ScalaActorRef {
|
|||
|
||||
def resume(): Unit = unsupported
|
||||
|
||||
protected[akka] def restart(): Unit = unsupported
|
||||
protected[akka] def restart(cause: Throwable): Unit = unsupported
|
||||
|
||||
private def unsupported = throw new UnsupportedOperationException("Not supported for %s".format(getClass.getName))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ final case class Envelope(val receiver: ActorCell, val message: Any, val channel
|
|||
|
||||
sealed trait SystemMessage extends PossiblyHarmful
|
||||
case object Create extends SystemMessage
|
||||
case object Recreate extends SystemMessage
|
||||
case class Recreate(cause: Throwable) extends SystemMessage
|
||||
case object Suspend extends SystemMessage
|
||||
case object Resume extends SystemMessage
|
||||
case object Terminate extends SystemMessage
|
||||
|
|
|
|||
|
|
@ -300,7 +300,7 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall
|
|||
def unlink(actorRef: ActorRef): ActorRef = unsupported
|
||||
|
||||
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(message: Any, timeout: Timeout, channel: UntypedChannel) = unsupported
|
||||
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported
|
||||
def restart(reason: Throwable): Unit = unsupported
|
||||
|
||||
private def unsupported = throw new UnsupportedOperationException("Not supported for %s" format classOf[AsyncCallbackAdapter].getName)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue