diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 3d01ec7886..cb68590422 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -1022,20 +1022,14 @@ sealed class LocalActorRef private[akka]( guard.withGuard { lifeCycle match { case Some(LifeCycle(Temporary, _, _)) => shutDownTemporaryActor(this) - case _ => + case _ => // either permanent or none where default is permanent Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id) Actor.log.debug("Restarting linked actors for actor [%s].", id) restartLinkedActors(reason, maxNrOfRetries, withinTimeRange) Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id) - failedActor.preRestart(reason) - nullOutActorRefReferencesFor(failedActor) - val freshActor = newActor - freshActor.init - freshActor.initTransactionalState - actorInstance.set(freshActor) - Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id) - freshActor.postRestart(reason) + if (isTypedActorDispatcher(failedActor)) restartTypedActorDispatcher(failedActor, reason) + else restartActor(failedActor, reason) _isBeingRestarted = false } } @@ -1072,6 +1066,24 @@ sealed class LocalActorRef private[akka]( // ========= PRIVATE FUNCTIONS ========= + private def isTypedActorDispatcher(a: Actor): Boolean = a.isInstanceOf[Dispatcher] + + private def restartTypedActorDispatcher(failedActor: Actor, reason: Throwable) = { + failedActor.preRestart(reason) + failedActor.postRestart(reason) + } + + private def restartActor(failedActor: Actor, reason: Throwable) = { + failedActor.preRestart(reason) + nullOutActorRefReferencesFor(failedActor) + val freshActor = newActor + freshActor.init + freshActor.initTransactionalState + actorInstance.set(freshActor) + Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id) + freshActor.postRestart(reason) + } + private def spawnButDoNotStart[T <: Actor: Manifest]: ActorRef = guard.withGuard { val actorRef = Actor.actorOf(manifest[T].erasure.asInstanceOf[Class[T]].newInstance) if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) actorRef.dispatcher = dispatcher @@ -1112,7 +1124,8 @@ sealed class LocalActorRef private[akka]( createNewTransactionSet } else oldTxSet Actor.log.ifTrace("Joining transaction set [" + currentTxSet + - "];\n\tactor " + toString + "\n\twith message [" + message + "]") + "];\n\tactor " + toString + + "\n\twith message [" + message + "]") val mtx = ThreadLocalTransaction.getThreadLocalTransaction if ((mtx eq null) || mtx.getStatus.isDead) currentTxSet.incParties else currentTxSet.incParties(mtx, 1) @@ -1152,8 +1165,7 @@ sealed class LocalActorRef private[akka]( new TransactionSetAbortedException("Transaction set has been aborted by another participant"), message, topLevelTransaction) case e: InterruptedException => {} // received message while actor is shutting down, ignore - case e => - handleExceptionInDispatch(e, message, topLevelTransaction) + case e => handleExceptionInDispatch(e, message, topLevelTransaction) } finally { clearTransaction if (topLevelTransaction) clearTransactionSet diff --git a/akka-core/src/main/scala/actor/TypedActor.scala b/akka-core/src/main/scala/actor/TypedActor.scala index 971551620e..7d41775b60 100644 --- a/akka-core/src/main/scala/actor/TypedActor.scala +++ b/akka-core/src/main/scala/actor/TypedActor.scala @@ -19,7 +19,7 @@ import org.codehaus.aspectwerkz.proxy.Proxy import org.codehaus.aspectwerkz.annotation.{Aspect, Around} import java.net.InetSocketAddress -import java.lang.reflect.{InvocationTargetException, Method} +import java.lang.reflect.{InvocationTargetException, Method, Field} import scala.reflect.BeanProperty @@ -729,6 +729,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean) extends Actor { private var context: Option[TypedActorContext] = None private var targetClass: Class[_] = _ @volatile private[akka] var targetInstance: TypedActor = _ + private var proxyDelegate: Field = _ private[actor] def initialize( targetClass: Class[_], targetInstance: TypedActor, proxy: AnyRef, ctx: Option[TypedActorContext]) = { @@ -740,6 +741,12 @@ private[akka] class Dispatcher(transactionalRequired: Boolean) extends Actor { this.targetInstance = targetInstance this.context = ctx + proxyDelegate = { + val field = proxy.getClass.getDeclaredField("DELEGATE_0") + field.setAccessible(true) + field + } + if (self.lifeCycle.isEmpty) self.lifeCycle = Some(LifeCycle(Permanent)) } @@ -766,6 +773,10 @@ private[akka] class Dispatcher(transactionalRequired: Boolean) extends Actor { override def preRestart(reason: Throwable) { crashedActorTl.set(this) targetInstance.preRestart(reason) + + // rewrite target instance in Dispatcher and AspectWerkz Proxy + targetInstance = TypedActor.newTypedActor(targetClass) + proxyDelegate.set(proxy, targetInstance) } override def postRestart(reason: Throwable) { diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojo.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojo.java index 2e3da4f038..ae47276ba6 100644 --- a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojo.java +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojo.java @@ -3,7 +3,6 @@ package se.scalablesolutions.akka.actor; import java.util.concurrent.CountDownLatch; public interface SamplePojo { - public CountDownLatch newCountdownLatch(int count); public String greet(String s); public String fail(); } \ No newline at end of file diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojoImpl.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojoImpl.java index b46a9f8fa4..d57232b629 100644 --- a/akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojoImpl.java +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/SamplePojoImpl.java @@ -6,7 +6,7 @@ import java.util.concurrent.CountDownLatch; public class SamplePojoImpl extends TypedActor implements SamplePojo { - private CountDownLatch latch; + public static CountDownLatch latch = new CountDownLatch(1); public static boolean _pre = false; public static boolean _post = false; @@ -17,15 +17,6 @@ public class SamplePojoImpl extends TypedActor implements SamplePojo { _down = false; } - public SamplePojoImpl() { - latch = new CountDownLatch(1); - } - - public CountDownLatch newCountdownLatch(int count) { - latch = new CountDownLatch(count); - return latch; - } - public String greet(String s) { return "hello " + s; } diff --git a/akka-core/src/test/scala/TypedActorLifecycleSpec.scala b/akka-core/src/test/scala/TypedActorLifecycleSpec.scala index f17f50e9a5..02a5712410 100644 --- a/akka-core/src/test/scala/TypedActorLifecycleSpec.scala +++ b/akka-core/src/test/scala/TypedActorLifecycleSpec.scala @@ -10,40 +10,35 @@ import se.scalablesolutions.akka.actor.TypedActor._ import se.scalablesolutions.akka.config.{OneForOneStrategy, TypedActorConfigurator} import se.scalablesolutions.akka.config.JavaConfig._ +import java.util.concurrent.CountDownLatch + /** * @author Martin Krasser */ @RunWith(classOf[JUnitRunner]) class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAfterAll { -// var conf1: TypedActorConfigurator = _ -// var conf2: TypedActorConfigurator = _ - var conf3: TypedActorConfigurator = _ - var conf4: TypedActorConfigurator = _ + var conf1: TypedActorConfigurator = _ + var conf2: TypedActorConfigurator = _ override protected def beforeAll() = { val strategy = new RestartStrategy(new AllForOne(), 3, 1000, Array(classOf[Exception])) -// val comp1 = new Component(classOf[SamplePojoAnnotated], classOf[SamplePojoAnnotatedImpl], new LifeCycle(new Permanent()), 1000) -// val comp2 = new Component(classOf[SamplePojoAnnotated], classOf[SamplePojoAnnotatedImpl], new LifeCycle(new Temporary()), 1000) val comp3 = new Component(classOf[SamplePojo], classOf[SamplePojoImpl], new LifeCycle(new Permanent()), 1000) val comp4 = new Component(classOf[SamplePojo], classOf[SamplePojoImpl], new LifeCycle(new Temporary()), 1000) -// conf1 = new TypedActorConfigurator().configure(strategy, Array(comp1)).supervise -// conf2 = new TypedActorConfigurator().configure(strategy, Array(comp2)).supervise - conf3 = new TypedActorConfigurator().configure(strategy, Array(comp3)).supervise - conf4 = new TypedActorConfigurator().configure(strategy, Array(comp4)).supervise + conf1 = new TypedActorConfigurator().configure(strategy, Array(comp3)).supervise + conf2 = new TypedActorConfigurator().configure(strategy, Array(comp4)).supervise } override protected def afterAll() = { -// conf1.stop -// conf2.stop - conf3.stop - conf4.stop + conf1.stop + conf2.stop } describe("TypedActor lifecycle management") { it("should restart supervised, non-annotated typed actor on failure") { SamplePojoImpl.reset - val obj = conf3.getInstance[SamplePojo](classOf[SamplePojo]) - val cdl = obj.newCountdownLatch(2) + val obj = conf1.getInstance[SamplePojo](classOf[SamplePojo]) + val cdl = new CountDownLatch(2) + SamplePojoImpl.latch = cdl assert(AspectInitRegistry.initFor(obj) ne null) try { obj.fail @@ -61,8 +56,9 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft it("should shutdown supervised, non-annotated typed actor on failure") { SamplePojoImpl.reset - val obj = conf4.getInstance[SamplePojo](classOf[SamplePojo]) - val cdl = obj.newCountdownLatch(1) + val obj = conf2.getInstance[SamplePojo](classOf[SamplePojo]) + val cdl = new CountDownLatch(1) + SamplePojoImpl.latch = cdl assert(AspectInitRegistry.initFor(obj) ne null) try { obj.fail