Renaming Death to Failure and separating Failure from the user-lever lifecycle monitoring message Terminated
This commit is contained in:
parent
03706baf2b
commit
fbf9700170
7 changed files with 48 additions and 67 deletions
|
|
@ -34,7 +34,7 @@ class SupervisorHierarchySpec extends JUnitSuite {
|
|||
|
||||
val workerOne, workerTwo, workerThree = actorOf(Props(new CountDownActor(countDown)).withSupervisor(manager))
|
||||
|
||||
manager ! Death(workerOne, new FireWorkerException("Fire the worker!"), true)
|
||||
workerOne ! Crash
|
||||
|
||||
// manager + all workers should be restarted by only killing a worker
|
||||
// manager doesn't trap exits, so boss will restart manager
|
||||
|
|
|
|||
|
|
@ -125,8 +125,7 @@ object ActorModelSpec {
|
|||
}
|
||||
|
||||
protected[akka] abstract override def dispatch(invocation: MessageInvocation) {
|
||||
if (!invocation.message.isInstanceOf[LifeCycleMessage])
|
||||
getStats(invocation.receiver.ref).msgsReceived.incrementAndGet()
|
||||
getStats(invocation.receiver.ref).msgsReceived.incrementAndGet()
|
||||
super.dispatch(invocation)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -32,10 +32,7 @@ import java.util.{ Collection ⇒ JCollection }
|
|||
*/
|
||||
sealed trait AutoReceivedMessage extends Serializable
|
||||
|
||||
/**
|
||||
* Life-cycle messages for the Actors
|
||||
*/
|
||||
sealed trait LifeCycleMessage extends Serializable { self: AutoReceivedMessage ⇒ }
|
||||
trait PossiblyHarmful
|
||||
|
||||
case class HotSwap(code: ActorRef ⇒ Actor.Receive, discardOld: Boolean = true) extends AutoReceivedMessage {
|
||||
|
||||
|
|
@ -56,28 +53,31 @@ case class HotSwap(code: ActorRef ⇒ Actor.Receive, discardOld: Boolean = true)
|
|||
def this(code: akka.japi.Function[ActorRef, Procedure[Any]]) = this(code, true)
|
||||
}
|
||||
|
||||
case class Death(deceased: ActorRef, cause: Throwable, recoverable: Boolean) extends AutoReceivedMessage with LifeCycleMessage
|
||||
case class Crash(reason: Throwable) extends AutoReceivedMessage
|
||||
case class Failed(actor: ActorRef, cause: Throwable, recoverable: Boolean, timesRestarted: Int, restartTimeWindowStartMs: Long) extends AutoReceivedMessage with PossiblyHarmful
|
||||
|
||||
case object RevertHotSwap extends AutoReceivedMessage
|
||||
case class Crash(reason: Throwable) extends AutoReceivedMessage with PossiblyHarmful
|
||||
|
||||
case class Link(child: ActorRef) extends AutoReceivedMessage
|
||||
case object RevertHotSwap extends AutoReceivedMessage with PossiblyHarmful
|
||||
|
||||
case class Unlink(child: ActorRef) extends AutoReceivedMessage
|
||||
case class Link(child: ActorRef) extends AutoReceivedMessage with PossiblyHarmful
|
||||
|
||||
case class UnlinkAndStop(child: ActorRef) extends AutoReceivedMessage
|
||||
case class Unlink(child: ActorRef) extends AutoReceivedMessage with PossiblyHarmful
|
||||
|
||||
case object PoisonPill extends AutoReceivedMessage
|
||||
case class UnlinkAndStop(child: ActorRef) extends AutoReceivedMessage with PossiblyHarmful
|
||||
|
||||
case object Kill extends AutoReceivedMessage
|
||||
case object PoisonPill extends AutoReceivedMessage with PossiblyHarmful
|
||||
|
||||
case object ReceiveTimeout
|
||||
case object Kill extends AutoReceivedMessage with PossiblyHarmful
|
||||
|
||||
case object ReceiveTimeout extends PossiblyHarmful
|
||||
|
||||
case class MaximumNumberOfRestartsWithinTimeRangeReached(
|
||||
@BeanProperty victim: ActorRef,
|
||||
@BeanProperty maxNrOfRetries: Option[Int],
|
||||
@BeanProperty withinTimeRange: Option[Int],
|
||||
@BeanProperty lastExceptionCausingRestart: Throwable)
|
||||
@BeanProperty lastExceptionCausingRestart: Throwable) //FIXME should be removed and replaced with Terminated
|
||||
|
||||
case class Terminated(@BeanProperty actor: ActorRef, @BeanProperty cause: Throwable)
|
||||
|
||||
// Exceptions for Actors
|
||||
class ActorStartException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause) {
|
||||
|
|
@ -641,34 +641,22 @@ trait Actor {
|
|||
if (msg.isInstanceOf[AnyRef] && (msg.asInstanceOf[AnyRef] eq null))
|
||||
throw new InvalidMessageException("Message from [" + channel + "] to [" + self.toString + "] is null")
|
||||
|
||||
def autoReceiveMessage(msg: AutoReceivedMessage): Boolean = {
|
||||
def autoReceiveMessage(msg: AutoReceivedMessage) {
|
||||
if (debugAutoReceive) EventHandler.debug(this, "received AutoReceiveMessage " + msg)
|
||||
|
||||
/**
|
||||
* System priority messages that should be handled by the dispatcher
|
||||
*
|
||||
* Init
|
||||
* Death
|
||||
* Restart
|
||||
* Suspend
|
||||
* Resume
|
||||
* Terminate
|
||||
*/
|
||||
|
||||
msg match {
|
||||
case HotSwap(code, discardOld) ⇒ become(code(self), discardOld); false
|
||||
case RevertHotSwap ⇒ unbecome(); false
|
||||
case d: Death ⇒ context.handleDeath(d); false
|
||||
case Link(child) ⇒ self.link(child); false
|
||||
case Unlink(child) ⇒ self.unlink(child); false
|
||||
case UnlinkAndStop(child) ⇒ self.unlink(child); child.stop(); false
|
||||
case HotSwap(code, discardOld) ⇒ become(code(self), discardOld)
|
||||
case RevertHotSwap ⇒ unbecome()
|
||||
case f: Failed ⇒ context.handleFailure(f)
|
||||
case Link(child) ⇒ self.link(child)
|
||||
case Unlink(child) ⇒ self.unlink(child)
|
||||
case UnlinkAndStop(child) ⇒ self.unlink(child); child.stop()
|
||||
case Crash(reason) ⇒ throw reason
|
||||
case Kill ⇒ throw new ActorKilledException("Kill")
|
||||
case PoisonPill ⇒
|
||||
val ch = channel
|
||||
self.stop()
|
||||
ch.sendException(new ActorKilledException("PoisonPill"))
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -45,7 +45,7 @@ private[akka] trait ActorContext {
|
|||
|
||||
def dispatcher: MessageDispatcher
|
||||
|
||||
def handleDeath(death: Death)
|
||||
def handleFailure(fail: Failed)
|
||||
}
|
||||
|
||||
private[akka] object ActorCell {
|
||||
|
|
@ -74,28 +74,28 @@ private[akka] class ActorCell(
|
|||
@volatile
|
||||
var futureTimeout: Option[ScheduledFuture[AnyRef]] = None
|
||||
|
||||
@volatile
|
||||
@volatile //Should be a final field
|
||||
var _supervisor: Option[ActorRef] = None
|
||||
|
||||
@volatile
|
||||
@volatile //FIXME doesn't need to be volatile
|
||||
var maxNrOfRetriesCount: Int = 0
|
||||
|
||||
@volatile
|
||||
@volatile //FIXME doesn't need to be volatile
|
||||
var restartTimeWindowStartNanos: Long = 0L
|
||||
|
||||
@volatile
|
||||
lazy val _linkedActors = new ConcurrentHashMap[Uuid, ActorRef]
|
||||
|
||||
@volatile
|
||||
@volatile //FIXME doesn't need to be volatile
|
||||
var hotswap: Stack[PartialFunction[Any, Unit]] = _hotswap // TODO: currently settable from outside for compatibility
|
||||
|
||||
@volatile
|
||||
var receiveTimeout: Option[Long] = _receiveTimeout // TODO: currently settable from outside for compatibility
|
||||
|
||||
@volatile
|
||||
@volatile //FIXME volatile can be removed
|
||||
var currentMessage: MessageInvocation = null
|
||||
|
||||
val actor: AtomicReference[Actor] = new AtomicReference[Actor]()
|
||||
val actor: AtomicReference[Actor] = new AtomicReference[Actor]() //FIXME We can most probably make this just a regular reference to Actor
|
||||
|
||||
def ref: ActorRef with ScalaActorRef = self
|
||||
|
||||
|
|
@ -240,7 +240,7 @@ private[akka] class ActorCell(
|
|||
} catch {
|
||||
case e ⇒
|
||||
envelope.channel.sendException(e)
|
||||
if (supervisor.isDefined) supervisor.get ! Death(self, e, false) else throw e
|
||||
if (supervisor.isDefined) supervisor.get ! Failed(self, e, false, maxNrOfRetriesCount, restartTimeWindowStartNanos) else throw e
|
||||
}
|
||||
|
||||
def suspend(): Unit = dispatcher suspend this
|
||||
|
|
@ -268,10 +268,11 @@ private[akka] class ActorCell(
|
|||
|
||||
} finally {
|
||||
try {
|
||||
if (supervisor.isDefined) supervisor.get ! Death(self, new ActorKilledException("Stopped"), false)
|
||||
if (supervisor.isDefined)
|
||||
supervisor.get ! Failed(self, new ActorKilledException("Stopped"), false, maxNrOfRetriesCount, restartTimeWindowStartNanos) //Death(self, new ActorKilledException("Stopped"), false)
|
||||
} catch {
|
||||
case e: ActorInitializationException ⇒
|
||||
// TODO: remove when ! cannot throw anymore
|
||||
// TODO: remove when ! cannot throw anymore
|
||||
}
|
||||
currentMessage = null
|
||||
clearActorContext()
|
||||
|
|
@ -319,7 +320,7 @@ private[akka] class ActorCell(
|
|||
|
||||
channel.sendException(e)
|
||||
|
||||
if (supervisor.isDefined) supervisor.get ! Death(self, e, true) else dispatcher.resume(this)
|
||||
if (supervisor.isDefined) supervisor.get ! Failed(self, e, true, maxNrOfRetriesCount, restartTimeWindowStartNanos) else dispatcher.resume(this)
|
||||
|
||||
if (e.isInstanceOf[InterruptedException]) throw e //Re-throw InterruptedExceptions as expected
|
||||
} finally {
|
||||
|
|
@ -340,23 +341,23 @@ private[akka] class ActorCell(
|
|||
}
|
||||
}
|
||||
|
||||
def handleDeath(death: Death): Unit = {
|
||||
def handleFailure(fail: Failed): Unit = {
|
||||
props.faultHandler match {
|
||||
case AllForOnePermanentStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒
|
||||
restartLinkedActors(death.cause, maxRetries, within)
|
||||
case AllForOnePermanentStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(fail.cause.getClass)) ⇒
|
||||
restartLinkedActors(fail.cause, maxRetries, within)
|
||||
|
||||
case AllForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒
|
||||
restartLinkedActors(death.cause, None, None)
|
||||
case AllForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(fail.cause.getClass)) ⇒
|
||||
restartLinkedActors(fail.cause, None, None)
|
||||
|
||||
case OneForOnePermanentStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒
|
||||
death.deceased.restart(death.cause, maxRetries, within)
|
||||
case OneForOnePermanentStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(fail.cause.getClass)) ⇒
|
||||
fail.actor.restart(fail.cause, maxRetries, within)
|
||||
|
||||
case OneForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒
|
||||
death.deceased.stop()
|
||||
self ! MaximumNumberOfRestartsWithinTimeRangeReached(death.deceased, None, None, death.cause)
|
||||
case OneForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(fail.cause.getClass)) ⇒
|
||||
fail.actor.stop()
|
||||
self ! MaximumNumberOfRestartsWithinTimeRangeReached(fail.actor, None, None, fail.cause) //FIXME this should be removed, you should link to an actor to get Terminated messages
|
||||
|
||||
case _ ⇒
|
||||
if (_supervisor.isDefined) throw death.cause else death.deceased.stop() //Escalate problem if not handled here
|
||||
if (_supervisor.isDefined) throw fail.cause else fail.actor.stop() //Escalate problem if not handled here
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -113,12 +113,6 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha
|
|||
*/
|
||||
def resume(): Unit
|
||||
|
||||
/**
|
||||
* Shuts down the actor its dispatcher and message queue.
|
||||
* Alias for 'stop'.
|
||||
*/
|
||||
def exit(): Unit = stop()
|
||||
|
||||
/**
|
||||
* Shuts down the actor its dispatcher and message queue.
|
||||
*/
|
||||
|
|
@ -285,7 +279,7 @@ class LocalActorRef private[akka] (
|
|||
actorCell.postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, channel)
|
||||
}
|
||||
|
||||
protected[akka] def handleDeath(death: Death): Unit = actorCell.handleDeath(death)
|
||||
protected[akka] def handleFailure(fail: Failed): Unit = actorCell.handleFailure(fail)
|
||||
|
||||
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit =
|
||||
actorCell.restart(reason, maxNrOfRetries, withinTimeRange)
|
||||
|
|
|
|||
|
|
@ -274,7 +274,6 @@ object EventHandler extends ListenerManagement {
|
|||
def instanceName(instance: AnyRef): String = instance match {
|
||||
case null ⇒ "NULL"
|
||||
case a: ActorRef ⇒ a.address
|
||||
case null ⇒ "null instance"
|
||||
case _ ⇒ instance.getClass.getSimpleName
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -101,7 +101,7 @@ trait DefaultActorPool extends ActorPool { this: Actor ⇒
|
|||
tryReply(Stats(_delegates length))
|
||||
case MaximumNumberOfRestartsWithinTimeRangeReached(victim, _, _, _) ⇒
|
||||
_delegates = _delegates filterNot { _.uuid == victim.uuid }
|
||||
case Death(victim, _, _) ⇒
|
||||
case Terminated(victim, _) ⇒
|
||||
_delegates = _delegates filterNot { _.uuid == victim.uuid }
|
||||
case msg ⇒
|
||||
resizeIfAppropriate()
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue