From 785e2a26362ea81f2a6c7e39c1c81d3aaf5c5e65 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 4 Oct 2011 13:23:24 +0200 Subject: [PATCH] Moving the cause into Recreate --- .../scala/akka/actor/SupervisorSpec.scala | 2 +- .../src/main/scala/akka/actor/ActorCell.scala | 124 +++++++++--------- .../src/main/scala/akka/actor/ActorRef.scala | 8 +- .../scala/akka/dispatch/MessageHandling.scala | 2 +- .../akka/camel/component/ActorComponent.scala | 2 +- 5 files changed, 71 insertions(+), 67 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index fa518efe1e..fc29ad35c7 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -30,7 +30,7 @@ object SupervisorSpec { val PingMessage = "ping" val PongMessage = "pong" - val ExceptionMessage = "CRASHED" //"Expected exception; to test fault-tolerance" + val ExceptionMessage = "Expected exception; to test fault-tolerance" var messageLog = new LinkedBlockingQueue[String] diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 83d586b6af..f864c571cb 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -98,9 +98,9 @@ sealed abstract class FaultHandlingStrategy { linkedActors.foreach(_.child.suspend()) } - def handleSupervisorRestarted(supervisor: ActorRef, linkedActors: List[ChildRestartStats]): Unit = { + def handleSupervisorRestarted(cause: Throwable, supervisor: ActorRef, linkedActors: List[ChildRestartStats]): Unit = { if (linkedActors.nonEmpty) - linkedActors.foreach(_.child.restart()) + linkedActors.foreach(_.child.restart(cause)) } /** @@ -149,7 +149,7 @@ case class AllForOneStrategy(trapExit: List[Class[_ <: Throwable]], def processFailure(fail: Failed, linkedActors: List[ChildRestartStats]): Unit = { if (linkedActors.nonEmpty) { if (linkedActors.forall(_.requestRestartPermission(maxNrOfRetries, withinTimeRange))) - linkedActors.foreach(_.child.restart()) + linkedActors.foreach(_.child.restart(fail.cause)) else linkedActors.foreach(_.child.stop()) } @@ -189,7 +189,7 @@ case class OneForOneStrategy(trapExit: List[Class[_ <: Throwable]], linkedActors.find(_.child == fail.actor) match { case Some(stats) ⇒ if (stats.requestRestartPermission(maxNrOfRetries, withinTimeRange)) - fail.actor.restart() + fail.actor.restart(fail.cause) else fail.actor.stop() //TODO optimization to drop child here already? case None ⇒ EventHandler.warning(this, "Got Failure from non-child: " + fail) @@ -306,60 +306,32 @@ private[akka] class ActorCell( case msg ⇒ msg.channel } + //This method is in charge of setting up the contextStack and create a new instance of the Actor + protected def newActor(): Actor = { + val stackBefore = contextStack.get + contextStack.set(stackBefore.push(this)) + try { + val instance = props.creator() + + if (instance eq null) + throw new ActorInitializationException("Actor instance passed to actorOf can't be 'null'") + + instance + } finally { + val stackAfter = contextStack.get + if (stackAfter.nonEmpty) + contextStack.set(if (stackAfter.head eq null) stackAfter.pop.pop else stackAfter.pop) // pop null marker plus our context + } + } + def systemInvoke(envelope: SystemEnvelope) { - def create(recreation: Boolean): Unit = try { - //This method is in charge of setting up the contextStack and create a new instance of the Actor - def newActor(): Actor = { - val stackBefore = contextStack.get - contextStack.set(stackBefore.push(this)) - try { - val instance = props.creator() - - if (instance eq null) - throw new ActorInitializationException("Actor instance passed to actorOf can't be 'null'") - - instance - } finally { - val stackAfter = contextStack.get - if (stackAfter.nonEmpty) - contextStack.set(if (stackAfter.head eq null) stackAfter.pop.pop else stackAfter.pop) // pop null marker plus our context - } - } - - actor match { - case null if !recreation ⇒ - val created = newActor() //TODO !!!! Notify supervisor on failure to create! - actor = created - created.preStart() - checkReceiveTimeout - if (Actor.debugLifecycle) EventHandler.debug(created, "started") - - case failedActor if recreation ⇒ - val reason = new Exception("CRASHED") //FIXME TODO stash away the exception that caused the failure and reuse that? <------- !!!!!!!!!! RED RED RED - if (Actor.debugLifecycle) EventHandler.debug(failedActor, "restarting") - val freshActor = newActor() - if (failedActor ne null) { - val c = currentMessage //One read only plz - try { - failedActor.preRestart(reason, if (c ne null) Some(c.message) else None) - } finally { - clearActorContext() - currentMessage = null - actor = null - } - } - actor = freshActor // assign it here so if preStart fails, we can null out the sef-refs next call - freshActor.postRestart(reason) - if (Actor.debugLifecycle) EventHandler.debug(freshActor, "restarted") - - dispatcher.resume(this) //FIXME should this be moved down? - - //FIXME TODO How should we handle restarting of children? <----- !!!!!!!!!!!!! RED RED RED - props.faultHandler.handleSupervisorRestarted(self, _linkedActors) - - case _ ⇒ EventHandler.warning(this, "Anomaly in Actor creation, current Actor is '" + actor + "' and recreate = '" + recreation + "', this shouldn't be possible") - } + def create(): Unit = try { + val created = newActor() //TODO !!!! Notify supervisor on failure to create! + actor = created + created.preStart() + checkReceiveTimeout + if (Actor.debugLifecycle) EventHandler.debug(created, "started") } catch { case e ⇒ try { EventHandler.error(e, this, "error while creating actor") @@ -367,7 +339,39 @@ private[akka] class ActorCell( dispatcher.suspend(this) envelope.channel.sendException(e) } finally { - if (supervisor.isDefined) supervisor.get ! Failed(self, e) else throw e //FIXME TODO What should be done when there's a systemMessage failure in a non-supervised actor? + if (supervisor.isDefined) supervisor.get ! Failed(self, e) else self.stop() + } + } + + def recreate(cause: Throwable): Unit = try { + val failedActor = actor + if (Actor.debugLifecycle) EventHandler.debug(failedActor, "restarting") + val freshActor = newActor() + if (failedActor ne null) { + val c = currentMessage //One read only plz + try { + failedActor.preRestart(cause, if (c ne null) Some(c.message) else None) + } finally { + clearActorContext() + currentMessage = null + actor = null + } + } + actor = freshActor // assign it here so if preStart fails, we can null out the sef-refs next call + freshActor.postRestart(cause) + if (Actor.debugLifecycle) EventHandler.debug(freshActor, "restarted") + + dispatcher.resume(this) //FIXME should this be moved down? + + props.faultHandler.handleSupervisorRestarted(cause, self, _linkedActors) + } catch { + case e ⇒ try { + EventHandler.error(e, this, "error while creating actor") + // prevent any further messages to be processed until the actor has been restarted + dispatcher.suspend(this) + envelope.channel.sendException(e) + } finally { + if (supervisor.isDefined) supervisor.get ! Failed(self, e) else self.stop() } } @@ -414,8 +418,8 @@ private[akka] class ActorCell( val isClosed = mailbox.isClosed //Fence plus volatile read if (!isClosed) { envelope.message match { - case Create ⇒ create(recreation = false) - case Recreate ⇒ create(recreation = true) + case Create ⇒ create() + case Recreate(cause) ⇒ recreate(cause) case Link(subject) ⇒ akka.event.DumbMonitoring.link(self, subject) if (Actor.debugLifecycle) EventHandler.debug(actor, "now monitoring " + subject) @@ -488,7 +492,7 @@ private[akka] class ActorCell( def handleChildTerminated(child: ActorRef): Unit = _linkedActors = props.faultHandler.handleChildTerminated(child, _linkedActors) - def restart(): Unit = dispatcher.systemDispatch(SystemEnvelope(this, Recreate, NullChannel)) + def restart(cause: Throwable): Unit = dispatcher.systemDispatch(SystemEnvelope(this, Recreate(cause), NullChannel)) def checkReceiveTimeout() { cancelReceiveTimeout() diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 3622500f8d..4dad678d02 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -144,7 +144,7 @@ abstract class ActorRef extends ActorRefShared with UntypedChannel with ReplyCha timeout: Timeout, channel: UntypedChannel): Future[Any] - protected[akka] def restart(): Unit + protected[akka] def restart(cause: Throwable): Unit override def hashCode: Int = HashCode.hash(HashCode.SEED, address) @@ -264,7 +264,7 @@ class LocalActorRef private[akka] ( protected[akka] def handleFailure(fail: Failed): Unit = actorCell.handleFailure(fail) - protected[akka] def restart(): Unit = actorCell.restart() + protected[akka] def restart(cause: Throwable): Unit = actorCell.restart(cause) // ========= PRIVATE FUNCTIONS ========= @@ -348,7 +348,7 @@ private[akka] case class RemoteActorRef private[akka] ( def unlink(actorRef: ActorRef): ActorRef = unsupported - protected[akka] def restart(): Unit = unsupported + protected[akka] def restart(cause: Throwable): Unit = unsupported private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef") } @@ -444,7 +444,7 @@ trait UnsupportedActorRef extends ActorRef with ScalaActorRef { def resume(): Unit = unsupported - protected[akka] def restart(): Unit = unsupported + protected[akka] def restart(cause: Throwable): Unit = unsupported private def unsupported = throw new UnsupportedOperationException("Not supported for %s".format(getClass.getName)) } diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index abdc5dd791..41dd64847b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -26,7 +26,7 @@ final case class Envelope(val receiver: ActorCell, val message: Any, val channel sealed trait SystemMessage extends PossiblyHarmful case object Create extends SystemMessage -case object Recreate extends SystemMessage +case class Recreate(cause: Throwable) extends SystemMessage case object Suspend extends SystemMessage case object Resume extends SystemMessage case object Terminate extends SystemMessage diff --git a/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala b/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala index a87bd5fb76..b4d057e981 100644 --- a/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala +++ b/akka-camel/src/main/scala/akka/camel/component/ActorComponent.scala @@ -300,7 +300,7 @@ private[akka] class AsyncCallbackAdapter(exchange: Exchange, callback: AsyncCall def unlink(actorRef: ActorRef): ActorRef = unsupported protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(message: Any, timeout: Timeout, channel: UntypedChannel) = unsupported - protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = unsupported + def restart(reason: Throwable): Unit = unsupported private def unsupported = throw new UnsupportedOperationException("Not supported for %s" format classOf[AsyncCallbackAdapter].getName) }