diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index a71679df60..10a8b9c935 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -999,51 +999,69 @@ class LocalActorRef private[akka] ( } } - protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = { - if (faultHandler.trapExit.exists(_.isAssignableFrom(reason.getClass))) { - faultHandler match { - case AllForOneStrategy(_,maxNrOfRetries, withinTimeRange) => - restartLinkedActors(reason, maxNrOfRetries, withinTimeRange) + protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) { + faultHandler match { + case AllForOneStrategy(trapExit,maxRetries, within) if trapExit.exists(_.isAssignableFrom(reason.getClass)) => + restartLinkedActors(reason, maxRetries, within) - case OneForOneStrategy(_,maxNrOfRetries, withinTimeRange) => - dead.restart(reason, maxNrOfRetries, withinTimeRange) + case OneForOneStrategy(trapExit,maxRetries, within) if trapExit.exists(_.isAssignableFrom(reason.getClass)) => + dead.restart(reason, maxRetries, within) - case NoFaultHandlingStrategy => - notifySupervisorWithMessage(Exit(this, reason)) //This shouldn't happen - } - } else { - notifySupervisorWithMessage(Exit(this, reason)) // if 'trapExit' isn't triggered then pass the Exit on + case _ => + notifySupervisorWithMessage(Exit(this, reason)) } } - protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = { - val isUnrestartable = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { //Immortal - false - } else if (withinTimeRange.isEmpty) { // restrict number of restarts - maxNrOfRetriesCount += 1 //Increment number of retries - maxNrOfRetriesCount > maxNrOfRetries.get - } else { // cannot restart more than N within M timerange - maxNrOfRetriesCount += 1 //Increment number of retries - val windowStart = restartsWithinTimeRangeTimestamp - val now = System.currentTimeMillis - val retries = maxNrOfRetriesCount - //We are within the time window if it isn't the first restart, or if the window hasn't closed - val insideWindow = if (windowStart == 0) false - else (now - windowStart) <= withinTimeRange.get + private def requestRestartPermission(maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Boolean = { + val denied = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { //Immortal + false + } else if (withinTimeRange.isEmpty) { // restrict number of restarts + maxNrOfRetriesCount += 1 //Increment number of retries + maxNrOfRetriesCount > maxNrOfRetries.get + } else { // cannot restart more than N within M timerange + maxNrOfRetriesCount += 1 //Increment number of retries + val windowStart = restartsWithinTimeRangeTimestamp + val now = System.currentTimeMillis + val retries = maxNrOfRetriesCount + //We are within the time window if it isn't the first restart, or if the window hasn't closed + val insideWindow = if (windowStart == 0) false + else (now - windowStart) <= withinTimeRange.get - //The actor is dead if it dies X times within the window of restart - val unrestartable = insideWindow && retries > maxNrOfRetries.getOrElse(1) + //The actor is dead if it dies X times within the window of restart + val unrestartable = insideWindow && retries > maxNrOfRetries.getOrElse(1) - if (windowStart == 0 || !insideWindow) //(Re-)set the start of the window - restartsWithinTimeRangeTimestamp = now + if (windowStart == 0 || !insideWindow) //(Re-)set the start of the window + restartsWithinTimeRangeTimestamp = now - if (windowStart != 0 && !insideWindow) //Reset number of restarts if window has expired - maxNrOfRetriesCount = 1 + if (windowStart != 0 && !insideWindow) //Reset number of restarts if window has expired + maxNrOfRetriesCount = 1 - unrestartable - } + unrestartable + } - if (isUnrestartable) { + denied == false //If we weren't denied, we have a go + } + + protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) { + + def performRestart { + Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id) + val failedActor = actorInstance.get + Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id) + failedActor.preRestart(reason) + val freshActor = newActor + setActorSelfFields(failedActor,null) //Only null out the references if we could instantiate the new actor + actorInstance.set(freshActor) //Assign it here so if preStart fails, we can null out the sef-refs next call + freshActor.preStart + failedActor match { + case p: Proxyable => p.swapProxiedActor(freshActor) + case _ => + } + Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id) + freshActor.postRestart(reason) + } + + def tooManyRestarts { Actor.log.warning( "Maximum number of restarts [%s] within time range [%s] reached." + "\n\tWill *not* restart actor [%s] anymore." + @@ -1060,30 +1078,48 @@ class LocalActorRef private[akka] ( } stop - } else { - guard.withGuard { - _status = ActorRefInternals.BEING_RESTARTED - lifeCycle match { - case Temporary => shutDownTemporaryActor(this) - case _ => - val failedActor = actorInstance.get - - // either permanent or none where default is permanent - Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id) - Actor.log.debug("Restarting linked actors for actor [%s].", id) - restartLinkedActors(reason, maxNrOfRetries, withinTimeRange) - - Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id) - if (isProxyableDispatcher(failedActor)) - restartProxyableDispatcher(failedActor, reason) - else - restartActor(failedActor, reason) - - _status = ActorRefInternals.RUNNING - dispatcher.resume(this) - } - } } + + @tailrec def attemptRestart() { + val success = if (requestRestartPermission(maxNrOfRetries,withinTimeRange)) { + guard.withGuard[Boolean] { + _status = ActorRefInternals.BEING_RESTARTED + + lifeCycle match { + case Temporary => + shutDownTemporaryActor(this) + true + case _ => // either permanent or none where default is permanent + val success = try { + performRestart + true + } catch { + case e => false //An error or exception here should trigger a retry + } + + Actor.log.debug("Restart: %s for [%s].", success, id) + + if (success) { + _status = ActorRefInternals.RUNNING + dispatcher.resume(this) + restartLinkedActors(reason,maxNrOfRetries,withinTimeRange) + } + + success + } + } + } else { + tooManyRestarts + true //Done + } + + if (!success) + attemptRestart + else + () //Yay! + } + + attemptRestart() //Tailrecursion } protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) = { @@ -1109,25 +1145,6 @@ class LocalActorRef private[akka] ( // ========= PRIVATE FUNCTIONS ========= - private def isProxyableDispatcher(a: Actor): Boolean = a.isInstanceOf[Proxyable] - - private def restartProxyableDispatcher(failedActor: Actor, reason: Throwable) = { - failedActor.preRestart(reason) - failedActor.postRestart(reason) - } - - private def restartActor(failedActor: Actor, reason: Throwable) = { - failedActor.preRestart(reason) - setActorSelfFields(failedActor,null) - val freshActor = newActor - freshActor.preStart - actorInstance.set(freshActor) - if (failedActor.isInstanceOf[Proxyable]) - failedActor.asInstanceOf[Proxyable].swapProxiedActor(freshActor) - Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id) - freshActor.postRestart(reason) - } - private[this] def newActor: Actor = { Actor.actorRefInCreation.withValue(Some(this)) { val actor = actorFactory() @@ -1196,7 +1213,7 @@ class LocalActorRef private[akka] ( } } - private def shutDownTemporaryActor(temporaryActor: ActorRef) = { + private def shutDownTemporaryActor(temporaryActor: ActorRef) { Actor.log.info("Actor [%s] configured as TEMPORARY and will not be restarted.", temporaryActor.id) temporaryActor.stop linkedActors.remove(temporaryActor.uuid) // remove the temporary actor @@ -1208,6 +1225,8 @@ class LocalActorRef private[akka] ( temporaryActor.id) notifySupervisorWithMessage(UnlinkAndStop(this)) } + + true } private def handleExceptionInDispatch(reason: Throwable, message: Any, topLevelTransaction: Boolean) = { diff --git a/akka-actor/src/main/scala/actor/Supervisor.scala b/akka-actor/src/main/scala/actor/Supervisor.scala index 1f30f1a9c8..daa0bac6c9 100644 --- a/akka-actor/src/main/scala/actor/Supervisor.scala +++ b/akka-actor/src/main/scala/actor/Supervisor.scala @@ -141,9 +141,9 @@ sealed class Supervisor(handler: FaultHandlingStrategy) { _childActors.put(className, actorRef :: currentActors) actorRef.lifeCycle = lifeCycle supervisor.link(actorRef) - remoteAddress.foreach { address => - RemoteServerModule.registerActor( - new InetSocketAddress(address.hostname, address.port), actorRef.uuid, actorRef) + if (remoteAddress.isDefined) { + val address = remoteAddress.get + RemoteServerModule.registerActor(new InetSocketAddress(address.hostname, address.port), actorRef) } case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration val childSupervisor = Supervisor(supervisorConfig) diff --git a/akka-actor/src/main/scala/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/util/ReflectiveAccess.scala index 9f9c644dd3..51c895728e 100644 --- a/akka-actor/src/main/scala/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/util/ReflectiveAccess.scala @@ -114,7 +114,7 @@ object ReflectiveAccess extends Logging { val PORT = Config.config.getInt("akka.remote.server.port", 2552) type RemoteServerObject = { - def registerActor(address: InetSocketAddress, uuid: Uuid, actor: ActorRef): Unit + def registerActor(address: InetSocketAddress, actor: ActorRef): Unit def registerTypedActor(address: InetSocketAddress, name: String, typedActor: AnyRef): Unit } @@ -128,9 +128,9 @@ object ReflectiveAccess extends Logging { val remoteNodeObjectInstance: Option[RemoteNodeObject] = getObjectFor("akka.remote.RemoteNode$") - def registerActor(address: InetSocketAddress, uuid: Uuid, actorRef: ActorRef) = { + def registerActor(address: InetSocketAddress, actorRef: ActorRef) = { ensureRemotingEnabled - remoteServerObjectInstance.get.registerActor(address, uuid, actorRef) + remoteServerObjectInstance.get.registerActor(address, actorRef) } def registerTypedActor(address: InetSocketAddress, implementationClassName: String, proxy: AnyRef) = { diff --git a/akka-actor/src/test/scala/actor/supervisor/SupervisorSpec.scala b/akka-actor/src/test/scala/actor/supervisor/SupervisorSpec.scala index d01c064b3b..de06871323 100644 --- a/akka-actor/src/test/scala/actor/supervisor/SupervisorSpec.scala +++ b/akka-actor/src/test/scala/actor/supervisor/SupervisorSpec.scala @@ -10,7 +10,8 @@ import Actor._ import org.scalatest.junit.JUnitSuite import org.junit.Test -import java.util.concurrent.{TimeUnit, LinkedBlockingQueue} +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent. {CountDownLatch, TimeUnit, LinkedBlockingQueue} object SupervisorSpec { var messageLog = new LinkedBlockingQueue[String] @@ -493,6 +494,40 @@ class SupervisorSpec extends JUnitSuite { } } + @Test def shouldAttemptRestartWhenExceptionDuringRestart { + val inits = new AtomicInteger(0) + val dyingActor = actorOf(new Actor { + self.lifeCycle = Permanent + log.debug("Creating dying actor, attempt: " + inits.incrementAndGet) + + if (!(inits.get % 2 != 0)) + throw new IllegalStateException("Don't wanna!") + + + def receive = { + case Ping => self.reply_?("pong") + case Die => throw new Exception("expected") + } + }) + + val supervisor = + Supervisor( + SupervisorConfig( + OneForOneStrategy(classOf[Exception] :: Nil,3,10000), + Supervise(dyingActor,Permanent) :: Nil)) + + intercept[Exception] { + dyingActor !! (Die, 5000) + } + + expect("pong") { + (dyingActor !! (Ping, 5000)).getOrElse("nil") + } + + expect(3) { inits.get } + supervisor.shutdown + } + // ============================================= // Create some supervisors with different configurations diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index 3c9e990ed2..bc50ebb781 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -139,10 +139,24 @@ object RemoteServer { private[akka] def unregister(hostname: String, port: Int) = guard.withWriteGuard { remoteServers.remove(Address(hostname, port)) } + + /** + * Used in REflectiveAccess + */ + private[akka] def registerActor(address: InetSocketAddress, actorRef: ActorRef) { + serverFor(address) foreach { _.register(actorRef) } + } + + /** + * Used in Reflective + */ + private[akka] def registerTypedActor(address: InetSocketAddress, implementationClassName: String, proxy: AnyRef) { + serverFor(address) foreach { _.registerTypedActor(implementationClassName,proxy)} + } } /** - * Life-cycle events for RemoteServer. + * Life-cycle events for RemoteServer. */ sealed trait RemoteServerLifeCycleEvent case class RemoteServerStarted( diff --git a/akka-typed-actor/src/main/scala/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/actor/TypedActor.scala index 46ac719186..98d9bcdea3 100644 --- a/akka-typed-actor/src/main/scala/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/actor/TypedActor.scala @@ -919,9 +919,11 @@ private[akka] object AspectInitRegistry extends ListenerManagement { * Unregisters initialization and stops its ActorRef. */ def unregister(proxy: AnyRef): AspectInit = { - val init = initializations.remove(proxy) - notifyListeners(AspectInitUnregistered(proxy, init)) - init.actorRef.stop + val init = if (proxy ne null) initializations.remove(proxy) else null + if (init ne null) { + notifyListeners(AspectInitUnregistered(proxy, init)) + init.actorRef.stop + } init } }