diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 6c8a92f325..83b78c8645 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -849,7 +849,10 @@ class LocalActorRef private[akka] ( dead.restart(reason, maxRetries, within) case _ => - notifySupervisorWithMessage(Exit(this, reason)) + if(_supervisor.isDefined) + notifySupervisorWithMessage(Exit(this, reason)) + else + dead.stop } } @@ -884,22 +887,27 @@ class LocalActorRef private[akka] ( } protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) { - - def performRestart { + def performRestart { Actor.log.slf4j.info("Restarting actor [{}] configured as PERMANENT.", id) val failedActor = actorInstance.get - Actor.log.slf4j.debug("Invoking 'preRestart' for failed actor instance [{}].", id) - failedActor.preRestart(reason) - val freshActor = newActor - setActorSelfFields(failedActor,null) //Only null out the references if we could instantiate the new actor - actorInstance.set(freshActor) //Assign it here so if preStart fails, we can null out the sef-refs next call - freshActor.preStart + failedActor match { - case p: Proxyable => p.swapProxiedActor(freshActor) + case p: Proxyable => + //p.swapProxiedActor(freshActor) //TODO: broken + Actor.log.slf4j.debug("Invoking 'preRestart' for failed actor instance [{}].", id) + failedActor.preRestart(reason) + Actor.log.slf4j.debug("Invoking 'postRestart' for failed actor instance [{}].", id) + failedActor.postRestart(reason) case _ => + Actor.log.slf4j.debug("Invoking 'preRestart' for failed actor instance [{}].", id) + failedActor.preRestart(reason) + val freshActor = newActor + setActorSelfFields(failedActor,null) //Only null out the references if we could instantiate the new actor + actorInstance.set(freshActor) //Assign it here so if preStart fails, we can null out the sef-refs next call + freshActor.preStart + Actor.log.slf4j.debug("Invoking 'postRestart' for new actor instance [{}].", id) + freshActor.postRestart(reason) } - Actor.log.slf4j.debug("Invoking 'postRestart' for new actor instance [{}].", id) - freshActor.postRestart(reason) } def tooManyRestarts { @@ -935,7 +943,8 @@ class LocalActorRef private[akka] ( performRestart true } catch { - case e => false //An error or exception here should trigger a retry + case e => Actor.log.slf4j.debug("Unexpected exception during restart",e) + false //An error or exception here should trigger a retry } Actor.log.slf4j.debug("Restart: {} for [{}].", success, id) diff --git a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala index 45962ed7fa..874342e5dc 100644 --- a/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/akka/actor/TypedActor.scala @@ -17,6 +17,7 @@ import org.codehaus.aspectwerkz.proxy.Proxy import org.codehaus.aspectwerkz.annotation.{Aspect, Around} import java.net.InetSocketAddress +import java.util.concurrent.atomic.AtomicBoolean import scala.reflect.BeanProperty import java.lang.reflect.{Method, Field, InvocationHandler, Proxy => JProxy} @@ -807,7 +808,7 @@ private[akka] sealed class ServerManagedTypedActorAspect extends ActorAspect { @Around("execution(* *.*(..)) && this(akka.actor.ServerManagedTypedActor)") def invoke(joinPoint: JoinPoint): AnyRef = { - if (!isInitialized) initialize(joinPoint) + initialize(joinPoint) remoteDispatch(joinPoint) } @@ -830,7 +831,7 @@ private[akka] sealed class TypedActorAspect extends ActorAspect { @Around("execution(* *.*(..)) && !this(akka.actor.ServerManagedTypedActor)") def invoke(joinPoint: JoinPoint): AnyRef = { - if (!isInitialized) initialize(joinPoint) + initialize(joinPoint) dispatch(joinPoint) } @@ -844,8 +845,8 @@ private[akka] sealed class TypedActorAspect extends ActorAspect { * Base class for TypedActorAspect and ServerManagedTypedActorAspect to reduce code duplication. */ private[akka] abstract class ActorAspect { - @volatile protected var isInitialized = false - @volatile protected var isStopped = false + protected val isInitialized = new AtomicBoolean(false) + protected val isStopped = new AtomicBoolean(false) protected var interfaceClass: Class[_] = _ protected var typedActor: TypedActor = _ protected var actorRef: ActorRef = _ @@ -861,11 +862,16 @@ private[akka] abstract class ActorAspect { val isCoordinated = TypedActor.isCoordinated(methodRtti) typedActor.context._sender = senderProxy - if (!actorRef.isRunning && !isStopped) { - isStopped = true - joinPoint.proceed - } else if (isOneWay && isCoordinated) { + if (!isStopped.get && !actorRef.isRunning) { + if (isStopped.compareAndSet(false,true)) { + val proxy = TypedActor.proxyFor(actorRef) + if (proxy ne null) + TypedActor.stop(proxy) + } + } + + if (isOneWay && isCoordinated) { val coordinatedOpt = Option(Coordination.coordinated.value) val coordinated = coordinatedOpt.map( coord => if (Coordination.firstParty.value) { // already included in coordination @@ -880,7 +886,6 @@ private[akka] abstract class ActorAspect { } else if (isCoordinated) { throw new CoordinateException("Can't use @Coordinated annotation with non-void methods.") - } else if (isOneWay) { actorRef.!(joinPoint)(senderActorRef) null.asInstanceOf[AnyRef] @@ -942,15 +947,16 @@ private[akka] abstract class ActorAspect { (escapedArgs, isEscaped) } - protected def initialize(joinPoint: JoinPoint): Unit = { - val init = AspectInitRegistry.initFor(joinPoint.getThis) - interfaceClass = init.interfaceClass - typedActor = init.targetInstance - actorRef = init.actorRef - uuid = actorRef.uuid - remoteAddress = init.remoteAddress - timeout = init.timeout - isInitialized = true + protected def initialize(joinPoint: JoinPoint) { + if(isInitialized.compareAndSet(false, true)) { + val init = AspectInitRegistry.initFor(joinPoint.getThis) + interfaceClass = init.interfaceClass + typedActor = init.targetInstance + actorRef = init.actorRef + uuid = actorRef.uuid + remoteAddress = init.remoteAddress + timeout = init.timeout + } } } diff --git a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala index f9b0726f92..7478f2566a 100644 --- a/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala +++ b/akka-typed-actor/src/test/scala/actor/typed-actor/TypedActorLifecycleSpec.scala @@ -93,5 +93,25 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft SimpleJavaPojoImpl._pre should be(true) SimpleJavaPojoImpl._post should be(true) } + + it("should be stopped when supervision cannot handle the problem in") { + val actorSupervision = new SuperviseTypedActor(classOf[TypedActorFailer],classOf[TypedActorFailerImpl],permanent(),30000) + val conf = new TypedActorConfigurator().configure(OneForOneStrategy(Nil, 3, 500000), Array(actorSupervision)).inject.supervise + try { + val first = conf.getInstance(classOf[TypedActorFailer]) + intercept[RuntimeException] { + first.fail + } + val second = conf.getInstance(classOf[TypedActorFailer]) + + first should be (second) + + intercept[ActorInitializationException] { + second.fail + } + } finally { + conf.stop + } + } } }