Potential fix for #628 and #629

This commit is contained in:
Viktor Klang 2011-01-25 16:23:50 +01:00
parent 98c41733bc
commit 1dfaebda15
3 changed files with 66 additions and 31 deletions

View file

@ -849,7 +849,10 @@ class LocalActorRef private[akka] (
dead.restart(reason, maxRetries, within)
case _ =>
notifySupervisorWithMessage(Exit(this, reason))
if(_supervisor.isDefined)
notifySupervisorWithMessage(Exit(this, reason))
else
dead.stop
}
}
@ -884,22 +887,27 @@ class LocalActorRef private[akka] (
}
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
def performRestart {
def performRestart {
Actor.log.slf4j.info("Restarting actor [{}] configured as PERMANENT.", id)
val failedActor = actorInstance.get
Actor.log.slf4j.debug("Invoking 'preRestart' for failed actor instance [{}].", id)
failedActor.preRestart(reason)
val freshActor = newActor
setActorSelfFields(failedActor,null) //Only null out the references if we could instantiate the new actor
actorInstance.set(freshActor) //Assign it here so if preStart fails, we can null out the sef-refs next call
freshActor.preStart
failedActor match {
case p: Proxyable => p.swapProxiedActor(freshActor)
case p: Proxyable =>
//p.swapProxiedActor(freshActor) //TODO: broken
Actor.log.slf4j.debug("Invoking 'preRestart' for failed actor instance [{}].", id)
failedActor.preRestart(reason)
Actor.log.slf4j.debug("Invoking 'postRestart' for failed actor instance [{}].", id)
failedActor.postRestart(reason)
case _ =>
Actor.log.slf4j.debug("Invoking 'preRestart' for failed actor instance [{}].", id)
failedActor.preRestart(reason)
val freshActor = newActor
setActorSelfFields(failedActor,null) //Only null out the references if we could instantiate the new actor
actorInstance.set(freshActor) //Assign it here so if preStart fails, we can null out the sef-refs next call
freshActor.preStart
Actor.log.slf4j.debug("Invoking 'postRestart' for new actor instance [{}].", id)
freshActor.postRestart(reason)
}
Actor.log.slf4j.debug("Invoking 'postRestart' for new actor instance [{}].", id)
freshActor.postRestart(reason)
}
def tooManyRestarts {
@ -935,7 +943,8 @@ class LocalActorRef private[akka] (
performRestart
true
} catch {
case e => false //An error or exception here should trigger a retry
case e => Actor.log.slf4j.debug("Unexpected exception during restart",e)
false //An error or exception here should trigger a retry
}
Actor.log.slf4j.debug("Restart: {} for [{}].", success, id)

View file

@ -17,6 +17,7 @@ import org.codehaus.aspectwerkz.proxy.Proxy
import org.codehaus.aspectwerkz.annotation.{Aspect, Around}
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicBoolean
import scala.reflect.BeanProperty
import java.lang.reflect.{Method, Field, InvocationHandler, Proxy => JProxy}
@ -807,7 +808,7 @@ private[akka] sealed class ServerManagedTypedActorAspect extends ActorAspect {
@Around("execution(* *.*(..)) && this(akka.actor.ServerManagedTypedActor)")
def invoke(joinPoint: JoinPoint): AnyRef = {
if (!isInitialized) initialize(joinPoint)
initialize(joinPoint)
remoteDispatch(joinPoint)
}
@ -830,7 +831,7 @@ private[akka] sealed class TypedActorAspect extends ActorAspect {
@Around("execution(* *.*(..)) && !this(akka.actor.ServerManagedTypedActor)")
def invoke(joinPoint: JoinPoint): AnyRef = {
if (!isInitialized) initialize(joinPoint)
initialize(joinPoint)
dispatch(joinPoint)
}
@ -844,8 +845,8 @@ private[akka] sealed class TypedActorAspect extends ActorAspect {
* Base class for TypedActorAspect and ServerManagedTypedActorAspect to reduce code duplication.
*/
private[akka] abstract class ActorAspect {
@volatile protected var isInitialized = false
@volatile protected var isStopped = false
protected val isInitialized = new AtomicBoolean(false)
protected val isStopped = new AtomicBoolean(false)
protected var interfaceClass: Class[_] = _
protected var typedActor: TypedActor = _
protected var actorRef: ActorRef = _
@ -861,11 +862,16 @@ private[akka] abstract class ActorAspect {
val isCoordinated = TypedActor.isCoordinated(methodRtti)
typedActor.context._sender = senderProxy
if (!actorRef.isRunning && !isStopped) {
isStopped = true
joinPoint.proceed
} else if (isOneWay && isCoordinated) {
if (!isStopped.get && !actorRef.isRunning) {
if (isStopped.compareAndSet(false,true)) {
val proxy = TypedActor.proxyFor(actorRef)
if (proxy ne null)
TypedActor.stop(proxy)
}
}
if (isOneWay && isCoordinated) {
val coordinatedOpt = Option(Coordination.coordinated.value)
val coordinated = coordinatedOpt.map( coord =>
if (Coordination.firstParty.value) { // already included in coordination
@ -880,7 +886,6 @@ private[akka] abstract class ActorAspect {
} else if (isCoordinated) {
throw new CoordinateException("Can't use @Coordinated annotation with non-void methods.")
} else if (isOneWay) {
actorRef.!(joinPoint)(senderActorRef)
null.asInstanceOf[AnyRef]
@ -942,15 +947,16 @@ private[akka] abstract class ActorAspect {
(escapedArgs, isEscaped)
}
protected def initialize(joinPoint: JoinPoint): Unit = {
val init = AspectInitRegistry.initFor(joinPoint.getThis)
interfaceClass = init.interfaceClass
typedActor = init.targetInstance
actorRef = init.actorRef
uuid = actorRef.uuid
remoteAddress = init.remoteAddress
timeout = init.timeout
isInitialized = true
protected def initialize(joinPoint: JoinPoint) {
if(isInitialized.compareAndSet(false, true)) {
val init = AspectInitRegistry.initFor(joinPoint.getThis)
interfaceClass = init.interfaceClass
typedActor = init.targetInstance
actorRef = init.actorRef
uuid = actorRef.uuid
remoteAddress = init.remoteAddress
timeout = init.timeout
}
}
}

View file

@ -93,5 +93,25 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
SimpleJavaPojoImpl._pre should be(true)
SimpleJavaPojoImpl._post should be(true)
}
it("should be stopped when supervision cannot handle the problem in") {
val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(),30000)
val conf = new TypedActorConfigurator().configure(OneForOneStrategy(Nil, 3, 500000), Array(actorSupervision)).inject.supervise
try {
val first = conf.getInstance(classOf[TypedActorFailer])
intercept[RuntimeException] {
first.fail
}
val second = conf.getInstance(classOf[TypedActorFailer])
first should be (second)
intercept[ActorInitializationException] {
second.fail
}
} finally {
conf.stop
}
}
}
}