re #296: Initial support for active object lifecycle management

This commit is contained in:
Martin Krasser 2010-07-01 15:36:27 +02:00
parent 21dc177b88
commit 1408dbe80b
5 changed files with 91 additions and 22 deletions

View file

@ -0,0 +1,14 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor.annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface shutdown {}

View file

@ -25,6 +25,7 @@ object Annotations {
val transactionrequired = classOf[transactionrequired]
val prerestart = classOf[prerestart]
val postrestart = classOf[postrestart]
val shutdown = classOf[shutdown]
val inittransactionalstate = classOf[inittransactionalstate]
}
@ -358,7 +359,6 @@ object ActiveObject extends Logging {
val proxy = Proxy.newInstance(target, true, false)
val context = injectActiveObjectContext(proxy)
actorRef.actor.asInstanceOf[Dispatcher].initialize(target, proxy, context)
ActorRegistry.unregister(actorRef) // do not store the dispatcher in the ActorRegistry since it will prevent GC
actorRef.timeout = timeout
if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get)
AspectInitRegistry.register(proxy, AspectInit(target, actorRef, remoteAddress, timeout))
@ -371,7 +371,6 @@ object ActiveObject extends Logging {
val context = injectActiveObjectContext(target)
val proxy = Proxy.newInstance(Array(intf), Array(target), true, false)
actorRef.actor.asInstanceOf[Dispatcher].initialize(target.getClass, target, context)
ActorRegistry.unregister(actorRef) // do not store the dispatcher in the ActorRegistry since it will prevent GC
actorRef.timeout = timeout
if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get)
AspectInitRegistry.register(proxy, AspectInit(intf, actorRef, remoteAddress, timeout))
@ -379,6 +378,11 @@ object ActiveObject extends Logging {
proxy.asInstanceOf[T]
}
def stop(obj: AnyRef): Unit = {
val init = AspectInitRegistry.initFor(obj)
init.actorRef.stop
}
/**
* Get the underlying dispatcher actor for the given active object.
*/
@ -483,9 +487,7 @@ private[akka] object AspectInitRegistry extends ListenerManagement {
private val initializations = new java.util.concurrent.ConcurrentHashMap[AnyRef, AspectInit]
def initFor(target: AnyRef) = {
val init = initializations.get(target)
initializations.remove(target)
init
initializations.get(target)
}
def register(target: AnyRef, init: AspectInit) = {
@ -493,10 +495,17 @@ private[akka] object AspectInitRegistry extends ListenerManagement {
foreachListener(_ ! AspectInitRegistered(target, init))
res
}
def unregister(target: AnyRef) = {
val res = initializations.remove(target)
foreachListener(_ ! AspectInitUnregistered(target, res))
res
}
}
private[akka] sealed trait AspectInitRegistryEvent
private[akka] case class AspectInitRegistered(proxy: AnyRef, init: AspectInit) extends AspectInitRegistryEvent
private[akka] case class AspectInitUnregistered(proxy: AnyRef, init: AspectInit) extends AspectInitRegistryEvent
private[akka] sealed case class AspectInit(
val target: Class[_],
@ -547,7 +556,10 @@ private[akka] sealed class ActiveObjectAspect {
val isOneWay = isVoid(rtti)
val sender = ActiveObjectContext.sender.value
val senderFuture = ActiveObjectContext.senderFuture.value
if (isOneWay) {
if (!actorRef.isRunning) {
joinPoint.proceed
} else if (isOneWay) {
actorRef ! Invocation(joinPoint, true, true, sender, senderFuture)
null.asInstanceOf[AnyRef]
} else {
@ -656,10 +668,13 @@ object Dispatcher {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Option[RestartCallbacks]) extends Actor {
private[akka] class Dispatcher(transactionalRequired: Boolean,
var restartCallbacks: Option[RestartCallbacks],
var shutdownCallback: Option[ShutdownCallback] = None) extends Actor {
import Dispatcher._
private[actor] var target: Option[AnyRef] = None
private var zhutdown: Option[Method] = None
private var preRestart: Option[Method] = None
private var postRestart: Option[Method] = None
private var initTxState: Option[Method] = None
@ -681,7 +696,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Op
val methods = targetInstance.getClass.getDeclaredMethods.toList
// See if we have any config define restart callbacks
callbacks match {
restartCallbacks match {
case None => {}
case Some(RestartCallbacks(pre, post)) =>
preRestart = Some(try {
@ -695,10 +710,22 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Op
"Could not find post restart method [" + post + "] \nin [" +
targetClass.getName + "]. \nIt must have a zero argument definition.") })
}
// See if we have any config define a shutdown callback
shutdownCallback match {
case None => {}
case Some(ShutdownCallback(down)) =>
zhutdown = Some(try {
targetInstance.getClass.getDeclaredMethod(down, ZERO_ITEM_CLASS_ARRAY: _*)
} catch { case e => throw new IllegalStateException(
"Could not find shutdown method [" + down + "] \nin [" +
targetClass.getName + "]. \nIt must have a zero argument definition.") })
}
// See if we have any annotation defined restart callbacks
if (!preRestart.isDefined) preRestart = methods.find(m => m.isAnnotationPresent(Annotations.prerestart))
if (!postRestart.isDefined) postRestart = methods.find(m => m.isAnnotationPresent(Annotations.postrestart))
// See if we have an annotation defined shutdown callback
if (!zhutdown.isDefined) zhutdown = methods.find(m => m.isAnnotationPresent(Annotations.shutdown))
if (preRestart.isDefined && preRestart.get.getParameterTypes.length != 0)
throw new IllegalStateException(
@ -708,9 +735,14 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Op
throw new IllegalStateException(
"Method annotated with @postrestart or defined as a restart callback in \n[" +
targetClass.getName + "] must have a zero argument definition")
if (zhutdown.isDefined && zhutdown.get.getParameterTypes.length != 0)
throw new IllegalStateException(
"Method annotated with @shutdown or defined as a shutdown callback in \n[" +
targetClass.getName + "] must have a zero argument definition")
if (preRestart.isDefined) preRestart.get.setAccessible(true)
if (postRestart.isDefined) postRestart.get.setAccessible(true)
if (zhutdown.isDefined) zhutdown.get.setAccessible(true)
// see if we have a method annotated with @inittransactionalstate, if so invoke it
initTxState = methods.find(m => m.isAnnotationPresent(Annotations.inittransactionalstate))
@ -770,6 +802,15 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Op
}
}
override def shutdown = {
AspectInitRegistry.unregister(target.get);
try {
if (zhutdown.isDefined) {
zhutdown.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)
}
} catch { case e: InvocationTargetException => throw e.getCause }
}
override def initTransactionalState = {
try {
if (initTxState.isDefined && target.isDefined) initTxState.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)

View file

@ -831,10 +831,10 @@ sealed class LocalActorRef private[akka](
}
val builder = LifeCycleProtocol.newBuilder
lifeCycle match {
case Some(LifeCycle(scope, None)) =>
case Some(LifeCycle(scope, None, _)) =>
setScope(builder, scope)
Some(builder.build)
case Some(LifeCycle(scope, Some(callbacks))) =>
case Some(LifeCycle(scope, Some(callbacks), _)) =>
setScope(builder, scope)
builder.setPreRestart(callbacks.preRestart)
builder.setPostRestart(callbacks.postRestart)
@ -1314,7 +1314,7 @@ sealed class LocalActorRef private[akka](
val failedActor = actorInstance.get
failedActor.synchronized {
lifeCycle.get match {
case LifeCycle(scope, _) => {
case LifeCycle(scope, _, _) => {
scope match {
case Permanent =>
Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id)
@ -1343,7 +1343,7 @@ sealed class LocalActorRef private[akka](
linkedActorsAsList.foreach { actorRef =>
if (actorRef.lifeCycle.isEmpty) actorRef.lifeCycle = Some(LifeCycle(Permanent))
actorRef.lifeCycle.get match {
case LifeCycle(scope, _) => {
case LifeCycle(scope, _, _) => {
scope match {
case Permanent => actorRef.restart(reason)
case Temporary => shutDownTemporaryActor(actorRef)

View file

@ -82,7 +82,9 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
private def newSubclassingProxy(component: Component): DependencyBinding = {
val targetClass = component.target
val actorRef = Actor.actorOf(new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks))
val actorRef = Actor.actorOf(new Dispatcher(component.transactionRequired,
component.lifeCycle.restartCallbacks,
component.lifeCycle.shutdownCallback))
if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get
val remoteAddress =
if (component.remoteAddress.isDefined)
@ -99,7 +101,9 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat
val targetClass = component.intf.get
val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry
component.target.getConstructor(Array[Class[_]](): _*).setAccessible(true)
val actorRef = Actor.actorOf(new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks))
val actorRef = Actor.actorOf(new Dispatcher(component.transactionRequired,
component.lifeCycle.restartCallbacks,
component.lifeCycle.shutdownCallback))
if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get
val remoteAddress =
if (component.remoteAddress.isDefined)

View file

@ -43,13 +43,15 @@ object ScalaConfig {
case object AllForOne extends FailOverScheme
case object OneForOne extends FailOverScheme
case class LifeCycle(scope: Scope, callbacks: Option[RestartCallbacks]) extends ConfigElement
object LifeCycle {
def apply(scope: Scope) = new LifeCycle(scope, None)
}
case class LifeCycle(scope: Scope,
restartCallbacks: Option[RestartCallbacks] = None,
shutdownCallback: Option[ShutdownCallback] = None) extends ConfigElement
case class RestartCallbacks(preRestart: String, postRestart: String) {
if ((preRestart eq null) || (postRestart eq null)) throw new IllegalArgumentException("Restart callback methods can't be null")
}
case class ShutdownCallback(shutdown: String) {
if (shutdown eq null) throw new IllegalArgumentException("Shutdown callback method can't be null")
}
case object Permanent extends Scope
case object Temporary extends Scope
@ -136,17 +138,25 @@ object JavaConfig {
scheme.transform, maxNrOfRetries, withinTimeRange, trapExceptions.toList)
}
class LifeCycle(@BeanProperty val scope: Scope, @BeanProperty val callbacks: RestartCallbacks) extends ConfigElement {
def this(scope: Scope) = this(scope, null)
class LifeCycle(@BeanProperty val scope: Scope,
@BeanProperty val restartCallbacks: RestartCallbacks,
@BeanProperty val shutdownCallback: ShutdownCallback) extends ConfigElement {
def this(scope: Scope) = this(scope, null, null)
def this(scope: Scope, restartCallbacks: RestartCallbacks) = this(scope, restartCallbacks, null)
def this(scope: Scope, shutdownCallback: ShutdownCallback) = this(scope, null, shutdownCallback)
def transform = {
val callbackOption = if (callbacks eq null) None else Some(callbacks.transform)
se.scalablesolutions.akka.config.ScalaConfig.LifeCycle(scope.transform, callbackOption)
val restartCallbacksOption = if (restartCallbacks eq null) None else Some(restartCallbacks.transform)
val shutdownCallbackOption = if (shutdownCallback eq null) None else Some(shutdownCallback.transform)
se.scalablesolutions.akka.config.ScalaConfig.LifeCycle(scope.transform, restartCallbacksOption, shutdownCallbackOption)
}
}
class RestartCallbacks(@BeanProperty val preRestart: String, @BeanProperty val postRestart: String) {
def transform = se.scalablesolutions.akka.config.ScalaConfig.RestartCallbacks(preRestart, postRestart)
}
class ShutdownCallback(@BeanProperty val shutdown: String) {
def transform = se.scalablesolutions.akka.config.ScalaConfig.ShutdownCallback(shutdown)
}
abstract class Scope extends ConfigElement {
def transform: se.scalablesolutions.akka.config.ScalaConfig.Scope