From fbf9700170b7cd4e5282177eb4e9009d3886e9e3 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 21 Sep 2011 11:26:44 +0200 Subject: [PATCH] Renaming Death to Failure and separating Failure from the user-lever lifecycle monitoring message Terminated --- .../akka/actor/SupervisorHierarchySpec.scala | 2 +- .../akka/actor/dispatch/ActorModelSpec.scala | 3 +- .../src/main/scala/akka/actor/Actor.scala | 54 ++++++++----------- .../src/main/scala/akka/actor/ActorCell.scala | 45 ++++++++-------- .../src/main/scala/akka/actor/ActorRef.scala | 8 +-- .../main/scala/akka/event/EventHandler.scala | 1 - .../src/main/scala/akka/routing/Pool.scala | 2 +- 7 files changed, 48 insertions(+), 67 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index 3d276d400e..337bc81ba8 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -34,7 +34,7 @@ class SupervisorHierarchySpec extends JUnitSuite { val workerOne, workerTwo, workerThree = actorOf(Props(new CountDownActor(countDown)).withSupervisor(manager)) - manager ! Death(workerOne, new FireWorkerException("Fire the worker!"), true) + workerOne ! Crash // manager + all workers should be restarted by only killing a worker // manager doesn't trap exits, so boss will restart manager diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index ec458c4eed..0216f9825e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -125,8 +125,7 @@ object ActorModelSpec { } protected[akka] abstract override def dispatch(invocation: MessageInvocation) { - if (!invocation.message.isInstanceOf[LifeCycleMessage]) - getStats(invocation.receiver.ref).msgsReceived.incrementAndGet() + getStats(invocation.receiver.ref).msgsReceived.incrementAndGet() super.dispatch(invocation) } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 634c9126d8..2e0b92b2b3 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -32,10 +32,7 @@ import java.util.{ Collection ⇒ JCollection } */ sealed trait AutoReceivedMessage extends Serializable -/** - * Life-cycle messages for the Actors - */ -sealed trait LifeCycleMessage extends Serializable { self: AutoReceivedMessage ⇒ } +trait PossiblyHarmful case class HotSwap(code: ActorRef ⇒ Actor.Receive, discardOld: Boolean = true) extends AutoReceivedMessage { @@ -56,28 +53,31 @@ case class HotSwap(code: ActorRef ⇒ Actor.Receive, discardOld: Boolean = true) def this(code: akka.japi.Function[ActorRef, Procedure[Any]]) = this(code, true) } -case class Death(deceased: ActorRef, cause: Throwable, recoverable: Boolean) extends AutoReceivedMessage with LifeCycleMessage -case class Crash(reason: Throwable) extends AutoReceivedMessage +case class Failed(actor: ActorRef, cause: Throwable, recoverable: Boolean, timesRestarted: Int, restartTimeWindowStartMs: Long) extends AutoReceivedMessage with PossiblyHarmful -case object RevertHotSwap extends AutoReceivedMessage +case class Crash(reason: Throwable) extends AutoReceivedMessage with PossiblyHarmful -case class Link(child: ActorRef) extends AutoReceivedMessage +case object RevertHotSwap extends AutoReceivedMessage with PossiblyHarmful -case class Unlink(child: ActorRef) extends AutoReceivedMessage +case class Link(child: ActorRef) extends AutoReceivedMessage with PossiblyHarmful -case class UnlinkAndStop(child: ActorRef) extends AutoReceivedMessage +case class Unlink(child: ActorRef) extends AutoReceivedMessage with PossiblyHarmful -case object PoisonPill extends AutoReceivedMessage +case class UnlinkAndStop(child: ActorRef) extends AutoReceivedMessage with PossiblyHarmful -case object Kill extends AutoReceivedMessage +case object PoisonPill extends AutoReceivedMessage with PossiblyHarmful -case object ReceiveTimeout +case object Kill extends AutoReceivedMessage with PossiblyHarmful + +case object ReceiveTimeout extends PossiblyHarmful case class MaximumNumberOfRestartsWithinTimeRangeReached( @BeanProperty victim: ActorRef, @BeanProperty maxNrOfRetries: Option[Int], @BeanProperty withinTimeRange: Option[Int], - @BeanProperty lastExceptionCausingRestart: Throwable) + @BeanProperty lastExceptionCausingRestart: Throwable) //FIXME should be removed and replaced with Terminated + +case class Terminated(@BeanProperty actor: ActorRef, @BeanProperty cause: Throwable) // Exceptions for Actors class ActorStartException private[akka] (message: String, cause: Throwable = null) extends AkkaException(message, cause) { @@ -641,34 +641,22 @@ trait Actor { if (msg.isInstanceOf[AnyRef] && (msg.asInstanceOf[AnyRef] eq null)) throw new InvalidMessageException("Message from [" + channel + "] to [" + self.toString + "] is null") - def autoReceiveMessage(msg: AutoReceivedMessage): Boolean = { + def autoReceiveMessage(msg: AutoReceivedMessage) { if (debugAutoReceive) EventHandler.debug(this, "received AutoReceiveMessage " + msg) - /** - * System priority messages that should be handled by the dispatcher - * - * Init - * Death - * Restart - * Suspend - * Resume - * Terminate - */ - msg match { - case HotSwap(code, discardOld) ⇒ become(code(self), discardOld); false - case RevertHotSwap ⇒ unbecome(); false - case d: Death ⇒ context.handleDeath(d); false - case Link(child) ⇒ self.link(child); false - case Unlink(child) ⇒ self.unlink(child); false - case UnlinkAndStop(child) ⇒ self.unlink(child); child.stop(); false + case HotSwap(code, discardOld) ⇒ become(code(self), discardOld) + case RevertHotSwap ⇒ unbecome() + case f: Failed ⇒ context.handleFailure(f) + case Link(child) ⇒ self.link(child) + case Unlink(child) ⇒ self.unlink(child) + case UnlinkAndStop(child) ⇒ self.unlink(child); child.stop() case Crash(reason) ⇒ throw reason case Kill ⇒ throw new ActorKilledException("Kill") case PoisonPill ⇒ val ch = channel self.stop() ch.sendException(new ActorKilledException("PoisonPill")) - false } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 2721e5c154..2cd84f85f4 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -45,7 +45,7 @@ private[akka] trait ActorContext { def dispatcher: MessageDispatcher - def handleDeath(death: Death) + def handleFailure(fail: Failed) } private[akka] object ActorCell { @@ -74,28 +74,28 @@ private[akka] class ActorCell( @volatile var futureTimeout: Option[ScheduledFuture[AnyRef]] = None - @volatile + @volatile //Should be a final field var _supervisor: Option[ActorRef] = None - @volatile + @volatile //FIXME doesn't need to be volatile var maxNrOfRetriesCount: Int = 0 - @volatile + @volatile //FIXME doesn't need to be volatile var restartTimeWindowStartNanos: Long = 0L @volatile lazy val _linkedActors = new ConcurrentHashMap[Uuid, ActorRef] - @volatile + @volatile //FIXME doesn't need to be volatile var hotswap: Stack[PartialFunction[Any, Unit]] = _hotswap // TODO: currently settable from outside for compatibility @volatile var receiveTimeout: Option[Long] = _receiveTimeout // TODO: currently settable from outside for compatibility - @volatile + @volatile //FIXME volatile can be removed var currentMessage: MessageInvocation = null - val actor: AtomicReference[Actor] = new AtomicReference[Actor]() + val actor: AtomicReference[Actor] = new AtomicReference[Actor]() //FIXME We can most probably make this just a regular reference to Actor def ref: ActorRef with ScalaActorRef = self @@ -240,7 +240,7 @@ private[akka] class ActorCell( } catch { case e ⇒ envelope.channel.sendException(e) - if (supervisor.isDefined) supervisor.get ! Death(self, e, false) else throw e + if (supervisor.isDefined) supervisor.get ! Failed(self, e, false, maxNrOfRetriesCount, restartTimeWindowStartNanos) else throw e } def suspend(): Unit = dispatcher suspend this @@ -268,10 +268,11 @@ private[akka] class ActorCell( } finally { try { - if (supervisor.isDefined) supervisor.get ! Death(self, new ActorKilledException("Stopped"), false) + if (supervisor.isDefined) + supervisor.get ! Failed(self, new ActorKilledException("Stopped"), false, maxNrOfRetriesCount, restartTimeWindowStartNanos) //Death(self, new ActorKilledException("Stopped"), false) } catch { case e: ActorInitializationException ⇒ - // TODO: remove when ! cannot throw anymore + // TODO: remove when ! cannot throw anymore } currentMessage = null clearActorContext() @@ -319,7 +320,7 @@ private[akka] class ActorCell( channel.sendException(e) - if (supervisor.isDefined) supervisor.get ! Death(self, e, true) else dispatcher.resume(this) + if (supervisor.isDefined) supervisor.get ! Failed(self, e, true, maxNrOfRetriesCount, restartTimeWindowStartNanos) else dispatcher.resume(this) if (e.isInstanceOf[InterruptedException]) throw e //Re-throw InterruptedExceptions as expected } finally { @@ -340,23 +341,23 @@ private[akka] class ActorCell( } } - def handleDeath(death: Death): Unit = { + def handleFailure(fail: Failed): Unit = { props.faultHandler match { - case AllForOnePermanentStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒ - restartLinkedActors(death.cause, maxRetries, within) + case AllForOnePermanentStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(fail.cause.getClass)) ⇒ + restartLinkedActors(fail.cause, maxRetries, within) - case AllForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒ - restartLinkedActors(death.cause, None, None) + case AllForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(fail.cause.getClass)) ⇒ + restartLinkedActors(fail.cause, None, None) - case OneForOnePermanentStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒ - death.deceased.restart(death.cause, maxRetries, within) + case OneForOnePermanentStrategy(trapExit, maxRetries, within) if trapExit.exists(_.isAssignableFrom(fail.cause.getClass)) ⇒ + fail.actor.restart(fail.cause, maxRetries, within) - case OneForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(death.cause.getClass)) ⇒ - death.deceased.stop() - self ! MaximumNumberOfRestartsWithinTimeRangeReached(death.deceased, None, None, death.cause) + case OneForOneTemporaryStrategy(trapExit) if trapExit.exists(_.isAssignableFrom(fail.cause.getClass)) ⇒ + fail.actor.stop() + self ! MaximumNumberOfRestartsWithinTimeRangeReached(fail.actor, None, None, fail.cause) //FIXME this should be removed, you should link to an actor to get Terminated messages case _ ⇒ - if (_supervisor.isDefined) throw death.cause else death.deceased.stop() //Escalate problem if not handled here + if (_supervisor.isDefined) throw fail.cause else fail.actor.stop() //Escalate problem if not handled here } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index a5e761d86f..1c8ece435f 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -113,12 +113,6 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha */ def resume(): Unit - /** - * Shuts down the actor its dispatcher and message queue. - * Alias for 'stop'. - */ - def exit(): Unit = stop() - /** * Shuts down the actor its dispatcher and message queue. */ @@ -285,7 +279,7 @@ class LocalActorRef private[akka] ( actorCell.postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, channel) } - protected[akka] def handleDeath(death: Death): Unit = actorCell.handleDeath(death) + protected[akka] def handleFailure(fail: Failed): Unit = actorCell.handleFailure(fail) protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = actorCell.restart(reason, maxNrOfRetries, withinTimeRange) diff --git a/akka-actor/src/main/scala/akka/event/EventHandler.scala b/akka-actor/src/main/scala/akka/event/EventHandler.scala index 66d858f015..cf49fee14a 100644 --- a/akka-actor/src/main/scala/akka/event/EventHandler.scala +++ b/akka-actor/src/main/scala/akka/event/EventHandler.scala @@ -274,7 +274,6 @@ object EventHandler extends ListenerManagement { def instanceName(instance: AnyRef): String = instance match { case null ⇒ "NULL" case a: ActorRef ⇒ a.address - case null ⇒ "null instance" case _ ⇒ instance.getClass.getSimpleName } } diff --git a/akka-actor/src/main/scala/akka/routing/Pool.scala b/akka-actor/src/main/scala/akka/routing/Pool.scala index 9960210f48..ebfe64e946 100644 --- a/akka-actor/src/main/scala/akka/routing/Pool.scala +++ b/akka-actor/src/main/scala/akka/routing/Pool.scala @@ -101,7 +101,7 @@ trait DefaultActorPool extends ActorPool { this: Actor ⇒ tryReply(Stats(_delegates length)) case MaximumNumberOfRestartsWithinTimeRangeReached(victim, _, _, _) ⇒ _delegates = _delegates filterNot { _.uuid == victim.uuid } - case Death(victim, _, _) ⇒ + case Terminated(victim, _) ⇒ _delegates = _delegates filterNot { _.uuid == victim.uuid } case msg ⇒ resizeIfAppropriate()