Merge branch 'master' of git@github.com:jboner/akka

This commit is contained in:
Jonas Bonér 2010-10-31 07:13:21 +01:00
commit 2095f50542
6 changed files with 159 additions and 89 deletions

View file

@ -999,25 +999,21 @@ class LocalActorRef private[akka] (
} }
} }
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = { protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) {
if (faultHandler.trapExit.exists(_.isAssignableFrom(reason.getClass))) {
faultHandler match { faultHandler match {
case AllForOneStrategy(_,maxNrOfRetries, withinTimeRange) => case AllForOneStrategy(trapExit,maxRetries, within) if trapExit.exists(_.isAssignableFrom(reason.getClass)) =>
restartLinkedActors(reason, maxNrOfRetries, withinTimeRange) restartLinkedActors(reason, maxRetries, within)
case OneForOneStrategy(_,maxNrOfRetries, withinTimeRange) => case OneForOneStrategy(trapExit,maxRetries, within) if trapExit.exists(_.isAssignableFrom(reason.getClass)) =>
dead.restart(reason, maxNrOfRetries, withinTimeRange) dead.restart(reason, maxRetries, within)
case NoFaultHandlingStrategy => case _ =>
notifySupervisorWithMessage(Exit(this, reason)) //This shouldn't happen notifySupervisorWithMessage(Exit(this, reason))
}
} else {
notifySupervisorWithMessage(Exit(this, reason)) // if 'trapExit' isn't triggered then pass the Exit on
} }
} }
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = { private def requestRestartPermission(maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Boolean = {
val isUnrestartable = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { //Immortal val denied = if (maxNrOfRetries.isEmpty && withinTimeRange.isEmpty) { //Immortal
false false
} else if (withinTimeRange.isEmpty) { // restrict number of restarts } else if (withinTimeRange.isEmpty) { // restrict number of restarts
maxNrOfRetriesCount += 1 //Increment number of retries maxNrOfRetriesCount += 1 //Increment number of retries
@ -1043,7 +1039,29 @@ class LocalActorRef private[akka] (
unrestartable unrestartable
} }
if (isUnrestartable) { denied == false //If we weren't denied, we have a go
}
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
def performRestart {
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
val failedActor = actorInstance.get
Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id)
failedActor.preRestart(reason)
val freshActor = newActor
setActorSelfFields(failedActor,null) //Only null out the references if we could instantiate the new actor
actorInstance.set(freshActor) //Assign it here so if preStart fails, we can null out the sef-refs next call
freshActor.preStart
failedActor match {
case p: Proxyable => p.swapProxiedActor(freshActor)
case _ =>
}
Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id)
freshActor.postRestart(reason)
}
def tooManyRestarts {
Actor.log.warning( Actor.log.warning(
"Maximum number of restarts [%s] within time range [%s] reached." + "Maximum number of restarts [%s] within time range [%s] reached." +
"\n\tWill *not* restart actor [%s] anymore." + "\n\tWill *not* restart actor [%s] anymore." +
@ -1060,30 +1078,48 @@ class LocalActorRef private[akka] (
} }
stop stop
} else { }
guard.withGuard {
@tailrec def attemptRestart() {
val success = if (requestRestartPermission(maxNrOfRetries,withinTimeRange)) {
guard.withGuard[Boolean] {
_status = ActorRefInternals.BEING_RESTARTED _status = ActorRefInternals.BEING_RESTARTED
lifeCycle match { lifeCycle match {
case Temporary => shutDownTemporaryActor(this) case Temporary =>
case _ => shutDownTemporaryActor(this)
val failedActor = actorInstance.get true
case _ => // either permanent or none where default is permanent
val success = try {
performRestart
true
} catch {
case e => false //An error or exception here should trigger a retry
}
// either permanent or none where default is permanent Actor.log.debug("Restart: %s for [%s].", success, id)
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
Actor.log.debug("Restarting linked actors for actor [%s].", id)
restartLinkedActors(reason, maxNrOfRetries, withinTimeRange)
Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id)
if (isProxyableDispatcher(failedActor))
restartProxyableDispatcher(failedActor, reason)
else
restartActor(failedActor, reason)
if (success) {
_status = ActorRefInternals.RUNNING _status = ActorRefInternals.RUNNING
dispatcher.resume(this) dispatcher.resume(this)
restartLinkedActors(reason,maxNrOfRetries,withinTimeRange)
}
success
} }
} }
} else {
tooManyRestarts
true //Done
} }
if (!success)
attemptRestart
else
() //Yay!
}
attemptRestart() //Tailrecursion
} }
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) = { protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) = {
@ -1109,25 +1145,6 @@ class LocalActorRef private[akka] (
// ========= PRIVATE FUNCTIONS ========= // ========= PRIVATE FUNCTIONS =========
private def isProxyableDispatcher(a: Actor): Boolean = a.isInstanceOf[Proxyable]
private def restartProxyableDispatcher(failedActor: Actor, reason: Throwable) = {
failedActor.preRestart(reason)
failedActor.postRestart(reason)
}
private def restartActor(failedActor: Actor, reason: Throwable) = {
failedActor.preRestart(reason)
setActorSelfFields(failedActor,null)
val freshActor = newActor
freshActor.preStart
actorInstance.set(freshActor)
if (failedActor.isInstanceOf[Proxyable])
failedActor.asInstanceOf[Proxyable].swapProxiedActor(freshActor)
Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id)
freshActor.postRestart(reason)
}
private[this] def newActor: Actor = { private[this] def newActor: Actor = {
Actor.actorRefInCreation.withValue(Some(this)) { Actor.actorRefInCreation.withValue(Some(this)) {
val actor = actorFactory() val actor = actorFactory()
@ -1196,7 +1213,7 @@ class LocalActorRef private[akka] (
} }
} }
private def shutDownTemporaryActor(temporaryActor: ActorRef) = { private def shutDownTemporaryActor(temporaryActor: ActorRef) {
Actor.log.info("Actor [%s] configured as TEMPORARY and will not be restarted.", temporaryActor.id) Actor.log.info("Actor [%s] configured as TEMPORARY and will not be restarted.", temporaryActor.id)
temporaryActor.stop temporaryActor.stop
linkedActors.remove(temporaryActor.uuid) // remove the temporary actor linkedActors.remove(temporaryActor.uuid) // remove the temporary actor
@ -1208,6 +1225,8 @@ class LocalActorRef private[akka] (
temporaryActor.id) temporaryActor.id)
notifySupervisorWithMessage(UnlinkAndStop(this)) notifySupervisorWithMessage(UnlinkAndStop(this))
} }
true
} }
private def handleExceptionInDispatch(reason: Throwable, message: Any, topLevelTransaction: Boolean) = { private def handleExceptionInDispatch(reason: Throwable, message: Any, topLevelTransaction: Boolean) = {

View file

@ -141,9 +141,9 @@ sealed class Supervisor(handler: FaultHandlingStrategy) {
_childActors.put(className, actorRef :: currentActors) _childActors.put(className, actorRef :: currentActors)
actorRef.lifeCycle = lifeCycle actorRef.lifeCycle = lifeCycle
supervisor.link(actorRef) supervisor.link(actorRef)
remoteAddress.foreach { address => if (remoteAddress.isDefined) {
RemoteServerModule.registerActor( val address = remoteAddress.get
new InetSocketAddress(address.hostname, address.port), actorRef.uuid, actorRef) RemoteServerModule.registerActor(new InetSocketAddress(address.hostname, address.port), actorRef)
} }
case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration
val childSupervisor = Supervisor(supervisorConfig) val childSupervisor = Supervisor(supervisorConfig)

View file

@ -114,7 +114,7 @@ object ReflectiveAccess extends Logging {
val PORT = Config.config.getInt("akka.remote.server.port", 2552) val PORT = Config.config.getInt("akka.remote.server.port", 2552)
type RemoteServerObject = { type RemoteServerObject = {
def registerActor(address: InetSocketAddress, uuid: Uuid, actor: ActorRef): Unit def registerActor(address: InetSocketAddress, actor: ActorRef): Unit
def registerTypedActor(address: InetSocketAddress, name: String, typedActor: AnyRef): Unit def registerTypedActor(address: InetSocketAddress, name: String, typedActor: AnyRef): Unit
} }
@ -128,9 +128,9 @@ object ReflectiveAccess extends Logging {
val remoteNodeObjectInstance: Option[RemoteNodeObject] = val remoteNodeObjectInstance: Option[RemoteNodeObject] =
getObjectFor("akka.remote.RemoteNode$") getObjectFor("akka.remote.RemoteNode$")
def registerActor(address: InetSocketAddress, uuid: Uuid, actorRef: ActorRef) = { def registerActor(address: InetSocketAddress, actorRef: ActorRef) = {
ensureRemotingEnabled ensureRemotingEnabled
remoteServerObjectInstance.get.registerActor(address, uuid, actorRef) remoteServerObjectInstance.get.registerActor(address, actorRef)
} }
def registerTypedActor(address: InetSocketAddress, implementationClassName: String, proxy: AnyRef) = { def registerTypedActor(address: InetSocketAddress, implementationClassName: String, proxy: AnyRef) = {

View file

@ -10,7 +10,8 @@ import Actor._
import org.scalatest.junit.JUnitSuite import org.scalatest.junit.JUnitSuite
import org.junit.Test import org.junit.Test
import java.util.concurrent.{TimeUnit, LinkedBlockingQueue} import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent. {CountDownLatch, TimeUnit, LinkedBlockingQueue}
object SupervisorSpec { object SupervisorSpec {
var messageLog = new LinkedBlockingQueue[String] var messageLog = new LinkedBlockingQueue[String]
@ -493,6 +494,40 @@ class SupervisorSpec extends JUnitSuite {
} }
} }
@Test def shouldAttemptRestartWhenExceptionDuringRestart {
val inits = new AtomicInteger(0)
val dyingActor = actorOf(new Actor {
self.lifeCycle = Permanent
log.debug("Creating dying actor, attempt: " + inits.incrementAndGet)
if (!(inits.get % 2 != 0))
throw new IllegalStateException("Don't wanna!")
def receive = {
case Ping => self.reply_?("pong")
case Die => throw new Exception("expected")
}
})
val supervisor =
Supervisor(
SupervisorConfig(
OneForOneStrategy(classOf[Exception] :: Nil,3,10000),
Supervise(dyingActor,Permanent) :: Nil))
intercept[Exception] {
dyingActor !! (Die, 5000)
}
expect("pong") {
(dyingActor !! (Ping, 5000)).getOrElse("nil")
}
expect(3) { inits.get }
supervisor.shutdown
}
// ============================================= // =============================================
// Create some supervisors with different configurations // Create some supervisors with different configurations

View file

@ -139,6 +139,20 @@ object RemoteServer {
private[akka] def unregister(hostname: String, port: Int) = guard.withWriteGuard { private[akka] def unregister(hostname: String, port: Int) = guard.withWriteGuard {
remoteServers.remove(Address(hostname, port)) remoteServers.remove(Address(hostname, port))
} }
/**
* Used in REflectiveAccess
*/
private[akka] def registerActor(address: InetSocketAddress, actorRef: ActorRef) {
serverFor(address) foreach { _.register(actorRef) }
}
/**
* Used in Reflective
*/
private[akka] def registerTypedActor(address: InetSocketAddress, implementationClassName: String, proxy: AnyRef) {
serverFor(address) foreach { _.registerTypedActor(implementationClassName,proxy)}
}
} }
/** /**

View file

@ -919,9 +919,11 @@ private[akka] object AspectInitRegistry extends ListenerManagement {
* Unregisters initialization and stops its ActorRef. * Unregisters initialization and stops its ActorRef.
*/ */
def unregister(proxy: AnyRef): AspectInit = { def unregister(proxy: AnyRef): AspectInit = {
val init = initializations.remove(proxy) val init = if (proxy ne null) initializations.remove(proxy) else null
if (init ne null) {
notifyListeners(AspectInitUnregistered(proxy, init)) notifyListeners(AspectInitUnregistered(proxy, init))
init.actorRef.stop init.actorRef.stop
}
init init
} }
} }