diff --git a/akka-core/src/main/java/se/scalablesolutions/akka/annotation/shutdown.java b/akka-core/src/main/java/se/scalablesolutions/akka/annotation/shutdown.java new file mode 100644 index 0000000000..f806e7bca6 --- /dev/null +++ b/akka-core/src/main/java/se/scalablesolutions/akka/annotation/shutdown.java @@ -0,0 +1,14 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.actor.annotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface shutdown {} \ No newline at end of file diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index 0201391940..5fd5dad1c7 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -25,6 +25,7 @@ object Annotations { val transactionrequired = classOf[transactionrequired] val prerestart = classOf[prerestart] val postrestart = classOf[postrestart] + val shutdown = classOf[shutdown] val inittransactionalstate = classOf[inittransactionalstate] } @@ -358,7 +359,6 @@ object ActiveObject extends Logging { val proxy = Proxy.newInstance(target, true, false) val context = injectActiveObjectContext(proxy) actorRef.actor.asInstanceOf[Dispatcher].initialize(target, proxy, context) - ActorRegistry.unregister(actorRef) // do not store the dispatcher in the ActorRegistry since it will prevent GC actorRef.timeout = timeout if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get) AspectInitRegistry.register(proxy, AspectInit(target, actorRef, remoteAddress, timeout)) @@ -371,7 +371,6 @@ object ActiveObject extends Logging { val context = injectActiveObjectContext(target) val proxy = Proxy.newInstance(Array(intf), Array(target), true, false) actorRef.actor.asInstanceOf[Dispatcher].initialize(target.getClass, target, context) - ActorRegistry.unregister(actorRef) // do not store the dispatcher in the ActorRegistry since it will prevent GC actorRef.timeout = timeout if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get) AspectInitRegistry.register(proxy, AspectInit(intf, actorRef, remoteAddress, timeout)) @@ -379,6 +378,11 @@ object ActiveObject extends Logging { proxy.asInstanceOf[T] } + def stop(obj: AnyRef): Unit = { + val init = AspectInitRegistry.initFor(obj) + init.actorRef.stop + } + /** * Get the underlying dispatcher actor for the given active object. */ @@ -483,9 +487,7 @@ private[akka] object AspectInitRegistry extends ListenerManagement { private val initializations = new java.util.concurrent.ConcurrentHashMap[AnyRef, AspectInit] def initFor(target: AnyRef) = { - val init = initializations.get(target) - initializations.remove(target) - init + initializations.get(target) } def register(target: AnyRef, init: AspectInit) = { @@ -493,10 +495,17 @@ private[akka] object AspectInitRegistry extends ListenerManagement { foreachListener(_ ! AspectInitRegistered(target, init)) res } + + def unregister(target: AnyRef) = { + val res = initializations.remove(target) + foreachListener(_ ! AspectInitUnregistered(target, res)) + res + } } private[akka] sealed trait AspectInitRegistryEvent private[akka] case class AspectInitRegistered(proxy: AnyRef, init: AspectInit) extends AspectInitRegistryEvent +private[akka] case class AspectInitUnregistered(proxy: AnyRef, init: AspectInit) extends AspectInitRegistryEvent private[akka] sealed case class AspectInit( val target: Class[_], @@ -547,7 +556,10 @@ private[akka] sealed class ActiveObjectAspect { val isOneWay = isVoid(rtti) val sender = ActiveObjectContext.sender.value val senderFuture = ActiveObjectContext.senderFuture.value - if (isOneWay) { + + if (!actorRef.isRunning) { + joinPoint.proceed + } else if (isOneWay) { actorRef ! Invocation(joinPoint, true, true, sender, senderFuture) null.asInstanceOf[AnyRef] } else { @@ -656,10 +668,13 @@ object Dispatcher { * * @author Jonas Bonér */ -private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Option[RestartCallbacks]) extends Actor { +private[akka] class Dispatcher(transactionalRequired: Boolean, + var restartCallbacks: Option[RestartCallbacks], + var shutdownCallback: Option[ShutdownCallback] = None) extends Actor { import Dispatcher._ private[actor] var target: Option[AnyRef] = None + private var zhutdown: Option[Method] = None private var preRestart: Option[Method] = None private var postRestart: Option[Method] = None private var initTxState: Option[Method] = None @@ -681,7 +696,7 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Op val methods = targetInstance.getClass.getDeclaredMethods.toList // See if we have any config define restart callbacks - callbacks match { + restartCallbacks match { case None => {} case Some(RestartCallbacks(pre, post)) => preRestart = Some(try { @@ -695,10 +710,22 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Op "Could not find post restart method [" + post + "] \nin [" + targetClass.getName + "]. \nIt must have a zero argument definition.") }) } + // See if we have any config define a shutdown callback + shutdownCallback match { + case None => {} + case Some(ShutdownCallback(down)) => + zhutdown = Some(try { + targetInstance.getClass.getDeclaredMethod(down, ZERO_ITEM_CLASS_ARRAY: _*) + } catch { case e => throw new IllegalStateException( + "Could not find shutdown method [" + down + "] \nin [" + + targetClass.getName + "]. \nIt must have a zero argument definition.") }) + } // See if we have any annotation defined restart callbacks if (!preRestart.isDefined) preRestart = methods.find(m => m.isAnnotationPresent(Annotations.prerestart)) if (!postRestart.isDefined) postRestart = methods.find(m => m.isAnnotationPresent(Annotations.postrestart)) + // See if we have an annotation defined shutdown callback + if (!zhutdown.isDefined) zhutdown = methods.find(m => m.isAnnotationPresent(Annotations.shutdown)) if (preRestart.isDefined && preRestart.get.getParameterTypes.length != 0) throw new IllegalStateException( @@ -708,9 +735,14 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Op throw new IllegalStateException( "Method annotated with @postrestart or defined as a restart callback in \n[" + targetClass.getName + "] must have a zero argument definition") + if (zhutdown.isDefined && zhutdown.get.getParameterTypes.length != 0) + throw new IllegalStateException( + "Method annotated with @shutdown or defined as a shutdown callback in \n[" + + targetClass.getName + "] must have a zero argument definition") if (preRestart.isDefined) preRestart.get.setAccessible(true) if (postRestart.isDefined) postRestart.get.setAccessible(true) + if (zhutdown.isDefined) zhutdown.get.setAccessible(true) // see if we have a method annotated with @inittransactionalstate, if so invoke it initTxState = methods.find(m => m.isAnnotationPresent(Annotations.inittransactionalstate)) @@ -770,6 +802,15 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, var callbacks: Op } } + override def shutdown = { + AspectInitRegistry.unregister(target.get); + try { + if (zhutdown.isDefined) { + zhutdown.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*) + } + } catch { case e: InvocationTargetException => throw e.getCause } + } + override def initTransactionalState = { try { if (initTxState.isDefined && target.isDefined) initTxState.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*) diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index d07e18a314..bc72a2768e 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -831,10 +831,10 @@ sealed class LocalActorRef private[akka]( } val builder = LifeCycleProtocol.newBuilder lifeCycle match { - case Some(LifeCycle(scope, None)) => + case Some(LifeCycle(scope, None, _)) => setScope(builder, scope) Some(builder.build) - case Some(LifeCycle(scope, Some(callbacks))) => + case Some(LifeCycle(scope, Some(callbacks), _)) => setScope(builder, scope) builder.setPreRestart(callbacks.preRestart) builder.setPostRestart(callbacks.postRestart) @@ -1314,7 +1314,7 @@ sealed class LocalActorRef private[akka]( val failedActor = actorInstance.get failedActor.synchronized { lifeCycle.get match { - case LifeCycle(scope, _) => { + case LifeCycle(scope, _, _) => { scope match { case Permanent => Actor.log.info("Restarting actor [%s] configured as PERMANENT.", id) @@ -1343,7 +1343,7 @@ sealed class LocalActorRef private[akka]( linkedActorsAsList.foreach { actorRef => if (actorRef.lifeCycle.isEmpty) actorRef.lifeCycle = Some(LifeCycle(Permanent)) actorRef.lifeCycle.get match { - case LifeCycle(scope, _) => { + case LifeCycle(scope, _, _) => { scope match { case Permanent => actorRef.restart(reason) case Temporary => shutDownTemporaryActor(actorRef) diff --git a/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala b/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala index 8d3a089d26..15e9852181 100644 --- a/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala +++ b/akka-core/src/main/scala/config/ActiveObjectGuiceConfigurator.scala @@ -82,7 +82,9 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat private def newSubclassingProxy(component: Component): DependencyBinding = { val targetClass = component.target - val actorRef = Actor.actorOf(new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)) + val actorRef = Actor.actorOf(new Dispatcher(component.transactionRequired, + component.lifeCycle.restartCallbacks, + component.lifeCycle.shutdownCallback)) if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get val remoteAddress = if (component.remoteAddress.isDefined) @@ -99,7 +101,9 @@ private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurat val targetClass = component.intf.get val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry component.target.getConstructor(Array[Class[_]](): _*).setAccessible(true) - val actorRef = Actor.actorOf(new Dispatcher(component.transactionRequired, component.lifeCycle.callbacks)) + val actorRef = Actor.actorOf(new Dispatcher(component.transactionRequired, + component.lifeCycle.restartCallbacks, + component.lifeCycle.shutdownCallback)) if (component.dispatcher.isDefined) actorRef.dispatcher = component.dispatcher.get val remoteAddress = if (component.remoteAddress.isDefined) diff --git a/akka-core/src/main/scala/config/SupervisionConfig.scala b/akka-core/src/main/scala/config/SupervisionConfig.scala index dc04d9f38c..11ce0fe69e 100644 --- a/akka-core/src/main/scala/config/SupervisionConfig.scala +++ b/akka-core/src/main/scala/config/SupervisionConfig.scala @@ -43,13 +43,15 @@ object ScalaConfig { case object AllForOne extends FailOverScheme case object OneForOne extends FailOverScheme - case class LifeCycle(scope: Scope, callbacks: Option[RestartCallbacks]) extends ConfigElement - object LifeCycle { - def apply(scope: Scope) = new LifeCycle(scope, None) - } + case class LifeCycle(scope: Scope, + restartCallbacks: Option[RestartCallbacks] = None, + shutdownCallback: Option[ShutdownCallback] = None) extends ConfigElement case class RestartCallbacks(preRestart: String, postRestart: String) { if ((preRestart eq null) || (postRestart eq null)) throw new IllegalArgumentException("Restart callback methods can't be null") } + case class ShutdownCallback(shutdown: String) { + if (shutdown eq null) throw new IllegalArgumentException("Shutdown callback method can't be null") + } case object Permanent extends Scope case object Temporary extends Scope @@ -136,17 +138,25 @@ object JavaConfig { scheme.transform, maxNrOfRetries, withinTimeRange, trapExceptions.toList) } - class LifeCycle(@BeanProperty val scope: Scope, @BeanProperty val callbacks: RestartCallbacks) extends ConfigElement { - def this(scope: Scope) = this(scope, null) + class LifeCycle(@BeanProperty val scope: Scope, + @BeanProperty val restartCallbacks: RestartCallbacks, + @BeanProperty val shutdownCallback: ShutdownCallback) extends ConfigElement { + def this(scope: Scope) = this(scope, null, null) + def this(scope: Scope, restartCallbacks: RestartCallbacks) = this(scope, restartCallbacks, null) + def this(scope: Scope, shutdownCallback: ShutdownCallback) = this(scope, null, shutdownCallback) def transform = { - val callbackOption = if (callbacks eq null) None else Some(callbacks.transform) - se.scalablesolutions.akka.config.ScalaConfig.LifeCycle(scope.transform, callbackOption) + val restartCallbacksOption = if (restartCallbacks eq null) None else Some(restartCallbacks.transform) + val shutdownCallbackOption = if (shutdownCallback eq null) None else Some(shutdownCallback.transform) + se.scalablesolutions.akka.config.ScalaConfig.LifeCycle(scope.transform, restartCallbacksOption, shutdownCallbackOption) } } class RestartCallbacks(@BeanProperty val preRestart: String, @BeanProperty val postRestart: String) { def transform = se.scalablesolutions.akka.config.ScalaConfig.RestartCallbacks(preRestart, postRestart) } + class ShutdownCallback(@BeanProperty val shutdown: String) { + def transform = se.scalablesolutions.akka.config.ScalaConfig.ShutdownCallback(shutdown) + } abstract class Scope extends ConfigElement { def transform: se.scalablesolutions.akka.config.ScalaConfig.Scope