Implemented swapping TypedActor instance on restart

This commit is contained in:
Jonas Bonér 2010-07-28 00:58:52 +02:00
parent 0d51c6f1d4
commit df0eddfd66
5 changed files with 51 additions and 42 deletions

View file

@ -1022,20 +1022,14 @@ sealed class LocalActorRef private[akka](
guard.withGuard {
lifeCycle match {
case Some(LifeCycle(Temporary, _, _)) => shutDownTemporaryActor(this)
case _ =>
case _ =>
// either permanent or none where default is permanent
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
Actor.log.debug("Restarting linked actors for actor [%s].", id)
restartLinkedActors(reason, maxNrOfRetries, withinTimeRange)
Actor.log.debug("Invoking 'preRestart' for failed actor instance [%s].", id)
failedActor.preRestart(reason)
nullOutActorRefReferencesFor(failedActor)
val freshActor = newActor
freshActor.init
freshActor.initTransactionalState
actorInstance.set(freshActor)
Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id)
freshActor.postRestart(reason)
if (isTypedActorDispatcher(failedActor)) restartTypedActorDispatcher(failedActor, reason)
else restartActor(failedActor, reason)
_isBeingRestarted = false
}
}
@ -1072,6 +1066,24 @@ sealed class LocalActorRef private[akka](
// ========= PRIVATE FUNCTIONS =========
private def isTypedActorDispatcher(a: Actor): Boolean = a.isInstanceOf[Dispatcher]
private def restartTypedActorDispatcher(failedActor: Actor, reason: Throwable) = {
failedActor.preRestart(reason)
failedActor.postRestart(reason)
}
private def restartActor(failedActor: Actor, reason: Throwable) = {
failedActor.preRestart(reason)
nullOutActorRefReferencesFor(failedActor)
val freshActor = newActor
freshActor.init
freshActor.initTransactionalState
actorInstance.set(freshActor)
Actor.log.debug("Invoking 'postRestart' for new actor instance [%s].", id)
freshActor.postRestart(reason)
}
private def spawnButDoNotStart[T <: Actor: Manifest]: ActorRef = guard.withGuard {
val actorRef = Actor.actorOf(manifest[T].erasure.asInstanceOf[Class[T]].newInstance)
if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) actorRef.dispatcher = dispatcher
@ -1112,7 +1124,8 @@ sealed class LocalActorRef private[akka](
createNewTransactionSet
} else oldTxSet
Actor.log.ifTrace("Joining transaction set [" + currentTxSet +
"];\n\tactor " + toString + "\n\twith message [" + message + "]")
"];\n\tactor " + toString +
"\n\twith message [" + message + "]")
val mtx = ThreadLocalTransaction.getThreadLocalTransaction
if ((mtx eq null) || mtx.getStatus.isDead) currentTxSet.incParties
else currentTxSet.incParties(mtx, 1)
@ -1152,8 +1165,7 @@ sealed class LocalActorRef private[akka](
new TransactionSetAbortedException("Transaction set has been aborted by another participant"),
message, topLevelTransaction)
case e: InterruptedException => {} // received message while actor is shutting down, ignore
case e =>
handleExceptionInDispatch(e, message, topLevelTransaction)
case e => handleExceptionInDispatch(e, message, topLevelTransaction)
} finally {
clearTransaction
if (topLevelTransaction) clearTransactionSet

View file

@ -19,7 +19,7 @@ import org.codehaus.aspectwerkz.proxy.Proxy
import org.codehaus.aspectwerkz.annotation.{Aspect, Around}
import java.net.InetSocketAddress
import java.lang.reflect.{InvocationTargetException, Method}
import java.lang.reflect.{InvocationTargetException, Method, Field}
import scala.reflect.BeanProperty
@ -729,6 +729,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean) extends Actor {
private var context: Option[TypedActorContext] = None
private var targetClass: Class[_] = _
@volatile private[akka] var targetInstance: TypedActor = _
private var proxyDelegate: Field = _
private[actor] def initialize(
targetClass: Class[_], targetInstance: TypedActor, proxy: AnyRef, ctx: Option[TypedActorContext]) = {
@ -740,6 +741,12 @@ private[akka] class Dispatcher(transactionalRequired: Boolean) extends Actor {
this.targetInstance = targetInstance
this.context = ctx
proxyDelegate = {
val field = proxy.getClass.getDeclaredField("DELEGATE_0")
field.setAccessible(true)
field
}
if (self.lifeCycle.isEmpty) self.lifeCycle = Some(LifeCycle(Permanent))
}
@ -766,6 +773,10 @@ private[akka] class Dispatcher(transactionalRequired: Boolean) extends Actor {
override def preRestart(reason: Throwable) {
crashedActorTl.set(this)
targetInstance.preRestart(reason)
// rewrite target instance in Dispatcher and AspectWerkz Proxy
targetInstance = TypedActor.newTypedActor(targetClass)
proxyDelegate.set(proxy, targetInstance)
}
override def postRestart(reason: Throwable) {

View file

@ -3,7 +3,6 @@ package se.scalablesolutions.akka.actor;
import java.util.concurrent.CountDownLatch;
public interface SamplePojo {
public CountDownLatch newCountdownLatch(int count);
public String greet(String s);
public String fail();
}

View file

@ -6,7 +6,7 @@ import java.util.concurrent.CountDownLatch;
public class SamplePojoImpl extends TypedActor implements SamplePojo {
private CountDownLatch latch;
public static CountDownLatch latch = new CountDownLatch(1);
public static boolean _pre = false;
public static boolean _post = false;
@ -17,15 +17,6 @@ public class SamplePojoImpl extends TypedActor implements SamplePojo {
_down = false;
}
public SamplePojoImpl() {
latch = new CountDownLatch(1);
}
public CountDownLatch newCountdownLatch(int count) {
latch = new CountDownLatch(count);
return latch;
}
public String greet(String s) {
return "hello " + s;
}

View file

@ -10,40 +10,35 @@ import se.scalablesolutions.akka.actor.TypedActor._
import se.scalablesolutions.akka.config.{OneForOneStrategy, TypedActorConfigurator}
import se.scalablesolutions.akka.config.JavaConfig._
import java.util.concurrent.CountDownLatch
/**
* @author Martin Krasser
*/
@RunWith(classOf[JUnitRunner])
class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAfterAll {
// var conf1: TypedActorConfigurator = _
// var conf2: TypedActorConfigurator = _
var conf3: TypedActorConfigurator = _
var conf4: TypedActorConfigurator = _
var conf1: TypedActorConfigurator = _
var conf2: TypedActorConfigurator = _
override protected def beforeAll() = {
val strategy = new RestartStrategy(new AllForOne(), 3, 1000, Array(classOf[Exception]))
// val comp1 = new Component(classOf[SamplePojoAnnotated], classOf[SamplePojoAnnotatedImpl], new LifeCycle(new Permanent()), 1000)
// val comp2 = new Component(classOf[SamplePojoAnnotated], classOf[SamplePojoAnnotatedImpl], new LifeCycle(new Temporary()), 1000)
val comp3 = new Component(classOf[SamplePojo], classOf[SamplePojoImpl], new LifeCycle(new Permanent()), 1000)
val comp4 = new Component(classOf[SamplePojo], classOf[SamplePojoImpl], new LifeCycle(new Temporary()), 1000)
// conf1 = new TypedActorConfigurator().configure(strategy, Array(comp1)).supervise
// conf2 = new TypedActorConfigurator().configure(strategy, Array(comp2)).supervise
conf3 = new TypedActorConfigurator().configure(strategy, Array(comp3)).supervise
conf4 = new TypedActorConfigurator().configure(strategy, Array(comp4)).supervise
conf1 = new TypedActorConfigurator().configure(strategy, Array(comp3)).supervise
conf2 = new TypedActorConfigurator().configure(strategy, Array(comp4)).supervise
}
override protected def afterAll() = {
// conf1.stop
// conf2.stop
conf3.stop
conf4.stop
conf1.stop
conf2.stop
}
describe("TypedActor lifecycle management") {
it("should restart supervised, non-annotated typed actor on failure") {
SamplePojoImpl.reset
val obj = conf3.getInstance[SamplePojo](classOf[SamplePojo])
val cdl = obj.newCountdownLatch(2)
val obj = conf1.getInstance[SamplePojo](classOf[SamplePojo])
val cdl = new CountDownLatch(2)
SamplePojoImpl.latch = cdl
assert(AspectInitRegistry.initFor(obj) ne null)
try {
obj.fail
@ -61,8 +56,9 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
it("should shutdown supervised, non-annotated typed actor on failure") {
SamplePojoImpl.reset
val obj = conf4.getInstance[SamplePojo](classOf[SamplePojo])
val cdl = obj.newCountdownLatch(1)
val obj = conf2.getInstance[SamplePojo](classOf[SamplePojo])
val cdl = new CountDownLatch(1)
SamplePojoImpl.latch = cdl
assert(AspectInitRegistry.initFor(obj) ne null)
try {
obj.fail