closes #314 akka-spring to support active object lifecycle management

closes #315 akka-spring to support configuration of shutdown callback method
This commit is contained in:
Martin Krasser 2010-07-06 07:00:04 +02:00
parent 460dcfe516
commit fd9fbb1b05
12 changed files with 168 additions and 90 deletions

View file

@ -37,6 +37,7 @@ object Annotations {
final class ActiveObjectConfiguration {
private[akka] var _timeout: Long = Actor.TIMEOUT
private[akka] var _restartCallbacks: Option[RestartCallbacks] = None
private[akka] var _shutdownCallback: Option[ShutdownCallback] = None
private[akka] var _transactionRequired = false
private[akka] var _host: Option[InetSocketAddress] = None
private[akka] var _messageDispatcher: Option[MessageDispatcher] = None
@ -51,6 +52,11 @@ final class ActiveObjectConfiguration {
this
}
def shutdownCallback(down: String) : ActiveObjectConfiguration = {
_shutdownCallback = Some(new ShutdownCallback(down))
this
}
def makeTransactionRequired() : ActiveObjectConfiguration = {
_transactionRequired = true;
this
@ -153,25 +159,25 @@ object ActiveObject extends Logging {
private[actor] val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
def newInstance[T](target: Class[T], timeout: Long): T =
newInstance(target, actorOf(new Dispatcher(false, None)), None, timeout)
newInstance(target, actorOf(new Dispatcher(false)), None, timeout)
def newInstance[T](target: Class[T]): T =
newInstance(target, actorOf(new Dispatcher(false, None)), None, Actor.TIMEOUT)
newInstance(target, actorOf(new Dispatcher(false)), None, Actor.TIMEOUT)
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T =
newInstance(intf, target, actorOf(new Dispatcher(false, None)), None, timeout)
newInstance(intf, target, actorOf(new Dispatcher(false)), None, timeout)
def newInstance[T](intf: Class[T], target: AnyRef): T =
newInstance(intf, target, actorOf(new Dispatcher(false, None)), None, Actor.TIMEOUT)
newInstance(intf, target, actorOf(new Dispatcher(false)), None, Actor.TIMEOUT)
def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int): T =
newInstance(target, actorOf(new Dispatcher(false, None)), Some(new InetSocketAddress(hostname, port)), timeout)
newInstance(target, actorOf(new Dispatcher(false)), Some(new InetSocketAddress(hostname, port)), timeout)
def newRemoteInstance[T](target: Class[T], hostname: String, port: Int): T =
newInstance(target, actorOf(new Dispatcher(false, None)), Some(new InetSocketAddress(hostname, port)), Actor.TIMEOUT)
newInstance(target, actorOf(new Dispatcher(false)), Some(new InetSocketAddress(hostname, port)), Actor.TIMEOUT)
def newInstance[T](target: Class[T], config: ActiveObjectConfiguration): T = {
val actor = actorOf(new Dispatcher(config._transactionRequired, config._restartCallbacks))
val actor = actorOf(new Dispatcher(config._transactionRequired, config._restartCallbacks, config._shutdownCallback))
if (config._messageDispatcher.isDefined) {
actor.dispatcher = config._messageDispatcher.get
}
@ -179,7 +185,7 @@ object ActiveObject extends Logging {
}
def newInstance[T](intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration): T = {
val actor = actorOf(new Dispatcher(config._transactionRequired, config._restartCallbacks))
val actor = actorOf(new Dispatcher(config._transactionRequired, config._restartCallbacks, config._shutdownCallback))
if (config._messageDispatcher.isDefined) {
actor.dispatcher = config._messageDispatcher.get
}
@ -515,8 +521,6 @@ private[akka] sealed case class AspectInit(
def this(target: Class[_], actorRef: ActorRef, timeout: Long) = this(target, actorRef, None, timeout)
}
// FIXME: add @shutdown callback to ActiveObject in which we get the Aspect through 'Aspects.aspectOf(MyAspect.class, targetInstance)' and shuts down the Dispatcher actor
/**
* AspectWerkz Aspect that is turning POJOs into Active Object.
* Is deployed on a 'per-instance' basis.
@ -671,7 +675,7 @@ object Dispatcher {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[akka] class Dispatcher(transactionalRequired: Boolean,
var restartCallbacks: Option[RestartCallbacks],
var restartCallbacks: Option[RestartCallbacks] = None,
var shutdownCallback: Option[ShutdownCallback] = None) extends Actor {
import Dispatcher._
@ -805,12 +809,15 @@ private[akka] class Dispatcher(transactionalRequired: Boolean,
}
override def shutdown = {
AspectInitRegistry.unregister(target.get);
try {
if (zhutdown.isDefined) {
zhutdown.get.invoke(target.get, ZERO_ITEM_OBJECT_ARRAY: _*)
}
} catch { case e: InvocationTargetException => throw e.getCause }
} catch {
case e: InvocationTargetException => throw e.getCause
} finally {
AspectInitRegistry.unregister(target.get);
}
}
override def initTransactionalState = {

View file

@ -143,5 +143,13 @@ class ActiveObjectLifecycleSpec extends Spec with ShouldMatchers with BeforeAndA
case e: Exception => { /* test passed */ }
}
}
it("should shutdown non-supervised, non-initialized active object on ActiveObject.stop") {
val obj = ActiveObject.newInstance(classOf[SamplePojoAnnotated])
ActiveObject.stop(obj)
assert(!obj._pre)
assert(!obj._post)
assert(obj._down)
}
}
}

View file

@ -105,7 +105,7 @@
</xsd:attribute>
</xsd:complexType>
<!-- callbacks -->
<!-- restart callbacks -->
<xsd:complexType name="restart-callbacks-type">
<xsd:attribute name="pre" type="xsd:string">
<xsd:annotation>
@ -123,11 +123,23 @@
</xsd:attribute>
</xsd:complexType>
<!-- shutdown callbacks -->
<xsd:complexType name="shutdown-callback-type">
<xsd:attribute name="method" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Shutdown callback method that is called during shut down.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<!-- active object -->
<xsd:complexType name="active-object-type">
<xsd:sequence>
<xsd:element name="remote" type="remote-type" minOccurs="0" maxOccurs="1"/>
<xsd:element name="restart-callbacks" type="restart-callbacks-type" minOccurs="0" maxOccurs="1"/>
<xsd:element name="shutdown-callback" type="shutdown-callback-type" minOccurs="0" maxOccurs="1"/>
<xsd:element name="dispatcher" type="dispatcher-type" minOccurs="0" maxOccurs="1"/>
<xsd:element ref="dispatcher" minOccurs="0" maxOccurs="1"/>
<xsd:element ref="beans:property" minOccurs="0" maxOccurs="unbounded"/>

View file

@ -5,18 +5,20 @@
package se.scalablesolutions.akka.spring
import java.beans.PropertyDescriptor
import java.lang.reflect.Method
import reflect.BeanProperty
import org.springframework.beans.BeanWrapperImpl
import org.springframework.beans.BeanWrapper
import org.springframework.beans.BeanUtils
import org.springframework.util.ReflectionUtils
import org.springframework.util.StringUtils
import org.springframework.beans.factory.BeanFactory
import org.springframework.beans.factory.config.AbstractFactoryBean
import se.scalablesolutions.akka.actor.ActiveObject
import reflect.BeanProperty
import se.scalablesolutions.akka.config.ScalaConfig.RestartCallbacks
import org.springframework.util.ReflectionUtils
import org.springframework.util.StringUtils
import se.scalablesolutions.akka.actor.{ActiveObjectConfiguration, ActiveObject}
import se.scalablesolutions.akka.config.ScalaConfig.{ShutdownCallback, RestartCallbacks}
import se.scalablesolutions.akka.dispatch.MessageDispatcher
import se.scalablesolutions.akka.util.Logging
@ -25,6 +27,7 @@ import se.scalablesolutions.akka.util.Logging
*
* @author michaelkober
* @author <a href="johan.rask@jayway.com">Johan Rask</a>
* @author Martin Krasser
*/
class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging {
import StringReflect._
@ -36,6 +39,7 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging {
@BeanProperty var transactional: Boolean = false
@BeanProperty var pre: String = ""
@BeanProperty var post: String = ""
@BeanProperty var shutdown: String = ""
@BeanProperty var host: String = ""
@BeanProperty var port: Int = _
@BeanProperty var lifecycle: String = ""
@ -67,14 +71,19 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging {
if (hasInterface) argumentList += "i"
if (hasDispatcher) argumentList += "d"
setProperties(
create(argumentList))
}
setProperties(create(argumentList))
}
/**
* This method manages <property/> element by injecting either
* values (<property value="value"/>) and bean references (<property ref="beanId"/>)
* Stop the active object if it is a singleton.
*/
override def destroy = {
if(scope.equals(VAL_SCOPE_SINGLETON)) {
ActiveObject.stop(getObject)
}
super.destroy
}
private def setProperties(ref:AnyRef) : AnyRef = {
log.debug("Processing properties and dependencies for target class %s",target)
val beanWrapper = new BeanWrapperImpl(ref);
@ -97,60 +106,45 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging {
ref
}
// TODO: check if this works in 2.8 (type inferred to Nothing instead of AnyRef here)
//
// private[akka] def create(argList : String) : AnyRef = argList match {
// case "r" => ActiveObject.newRemoteInstance(target.toClass, timeout, transactional, host, port, callbacks)
// case "ri" => ActiveObject.newRemoteInstance(interface.toClass, target.toClass, timeout, transactional, host, port, callbacks)
// case "rd" => ActiveObject.newRemoteInstance(target.toClass, timeout, transactional, dispatcherInstance, host, port, callbacks)
// case "rid" => ActiveObject.newRemoteInstance(interface.toClass, target.toClass, timeout, transactional, dispatcherInstance, host, port, callbacks)
// case "i" => ActiveObject.newInstance(interface.toClass, target.toClass, timeout, transactional, callbacks)
// case "id" => ActiveObject.newInstance(interface.toClass, target.toClass, timeout, transactional, dispatcherInstance, callbacks)
// case "d" => ActiveObject.newInstance(target.toClass, timeout, transactional, dispatcherInstance, callbacks)
// case _ => ActiveObject.newInstance(target.toClass, timeout, transactional, callbacks)
// }
private[akka] def create(argList : String) : AnyRef = {
if (argList == "r") {
ActiveObject.newRemoteInstance(target.toClass, timeout, transactional, host, port, callbacks)
ActiveObject.newInstance(target.toClass, createConfig.makeRemote(host, port))
} else if (argList == "ri" ) {
ActiveObject.newRemoteInstance(interface.toClass, aNewInstance(target.toClass), timeout, transactional, host, port, callbacks)
ActiveObject.newInstance(interface.toClass, aNewInstance(target.toClass), createConfig.makeRemote(host, port))
} else if (argList == "rd") {
ActiveObject.newRemoteInstance(target.toClass, timeout, transactional, dispatcherInstance, host, port, callbacks)
ActiveObject.newInstance(target.toClass, createConfig.makeRemote(host, port).dispatcher(dispatcherInstance))
} else if (argList == "rid") {
ActiveObject.newRemoteInstance(interface.toClass, aNewInstance(target.toClass), timeout, transactional, dispatcherInstance, host, port, callbacks)
ActiveObject.newInstance(interface.toClass, aNewInstance(target.toClass), createConfig.makeRemote(host, port).dispatcher(dispatcherInstance))
} else if (argList == "i") {
ActiveObject.newInstance(interface.toClass, aNewInstance(target.toClass), timeout, transactional, callbacks)
ActiveObject.newInstance(interface.toClass, aNewInstance(target.toClass), createConfig)
} else if (argList == "id") {
ActiveObject.newInstance(interface.toClass, aNewInstance(target.toClass), timeout, transactional, dispatcherInstance, callbacks)
ActiveObject.newInstance(interface.toClass, aNewInstance(target.toClass), createConfig.dispatcher(dispatcherInstance))
} else if (argList == "d") {
ActiveObject.newInstance(target.toClass, timeout, transactional, dispatcherInstance, callbacks)
ActiveObject.newInstance(target.toClass, createConfig.dispatcher(dispatcherInstance))
} else {
ActiveObject.newInstance(target.toClass, timeout, transactional, callbacks)
ActiveObject.newInstance(target.toClass, createConfig)
}
}
def aNewInstance[T <: AnyRef](clazz: Class[T]) : T = {
private[akka] def createConfig: ActiveObjectConfiguration = {
val config = new ActiveObjectConfiguration().timeout(timeout)
if (hasRestartCallbacks) config.restartCallbacks(pre, post)
if (hasShutdownCallback) config.shutdownCallback(shutdown)
if (transactional) config.makeTransactionRequired
config
}
private[akka] def aNewInstance[T <: AnyRef](clazz: Class[T]) : T = {
clazz.newInstance().asInstanceOf[T]
}
/**
* create Option[RestartCallback]
*/
private def callbacks: Option[RestartCallbacks] = {
if (hasCallbacks) {
val callbacks = new RestartCallbacks(pre, post)
Some(callbacks)
} else {
None
}
}
private[akka] def isRemote = (host != null) && (!host.isEmpty)
private[akka] def hasInterface = (interface != null) && (!interface.isEmpty)
private[akka] def hasCallbacks = ((pre != null) && !pre.isEmpty) || ((post != null) && !post.isEmpty)
private[akka] def hasRestartCallbacks = ((pre != null) && !pre.isEmpty) || ((post != null) && !post.isEmpty)
private[akka] def hasShutdownCallback = ((shutdown != null) && !shutdown.isEmpty)
private[akka] def hasDispatcher = (dispatcher != null) && (dispatcher.dispatcherType != null) && (!dispatcher.dispatcherType.isEmpty)

View file

@ -13,6 +13,7 @@ import se.scalablesolutions.akka.actor.IllegalActorStateException
* Parser trait for custom namespace configuration for active-object.
* @author michaelkober
* @author <a href="johan.rask@jayway.com">Johan Rask</a>
* @author Martin Krasser
*/
trait ActiveObjectParser extends BeanParser with DispatcherParser {
import AkkaSpringConfigurationTags._
@ -25,7 +26,8 @@ trait ActiveObjectParser extends BeanParser with DispatcherParser {
def parseActiveObject(element: Element): ActiveObjectProperties = {
val objectProperties = new ActiveObjectProperties()
val remoteElement = DomUtils.getChildElementByTagName(element, REMOTE_TAG);
val callbacksElement = DomUtils.getChildElementByTagName(element, RESTART_CALLBACKS_TAG);
val restartCallbacksElement = DomUtils.getChildElementByTagName(element, RESTART_CALLBACKS_TAG);
val shutdownCallbackElement = DomUtils.getChildElementByTagName(element, SHUTDOWN_CALLBACK_TAG);
val dispatcherElement = DomUtils.getChildElementByTagName(element, DISPATCHER_TAG)
val propertyEntries = DomUtils.getChildElementsByTagName(element,PROPERTYENTRY_TAG)
@ -34,14 +36,18 @@ trait ActiveObjectParser extends BeanParser with DispatcherParser {
objectProperties.port = mandatory(remoteElement, PORT).toInt
}
if (callbacksElement != null) {
objectProperties.preRestart = callbacksElement.getAttribute(PRE_RESTART)
objectProperties.postRestart = callbacksElement.getAttribute(POST_RESTART)
if (restartCallbacksElement != null) {
objectProperties.preRestart = restartCallbacksElement.getAttribute(PRE_RESTART)
objectProperties.postRestart = restartCallbacksElement.getAttribute(POST_RESTART)
if ((objectProperties.preRestart.isEmpty) && (objectProperties.preRestart.isEmpty)) {
throw new IllegalActorStateException("At least one of pre or post must be defined.")
}
}
if (shutdownCallbackElement != null) {
objectProperties.shutdown = shutdownCallbackElement.getAttribute("method")
}
if (dispatcherElement != null) {
val dispatcherProperties = parseDispatcher(dispatcherElement)
objectProperties.dispatcher = dispatcherProperties

View file

@ -10,6 +10,7 @@ import AkkaSpringConfigurationTags._
/**
* Data container for active object configuration data.
* @author michaelkober
* @author Martin Krasser
*/
class ActiveObjectProperties {
var target: String = ""
@ -18,10 +19,11 @@ class ActiveObjectProperties {
var transactional: Boolean = false
var preRestart: String = ""
var postRestart: String = ""
var shutdown: String = ""
var host: String = ""
var port: Int = _
var lifecycle: String = ""
var scope:String = ""
var scope:String = VAL_SCOPE_SINGLETON
var dispatcher: DispatcherProperties = _
var propertyEntries = new PropertyEntries()
@ -35,6 +37,7 @@ class ActiveObjectProperties {
builder.addPropertyValue(PORT, port)
builder.addPropertyValue(PRE_RESTART, preRestart)
builder.addPropertyValue(POST_RESTART, postRestart)
builder.addPropertyValue(SHUTDOWN, shutdown)
builder.addPropertyValue(TIMEOUT, timeout)
builder.addPropertyValue(TARGET, target)
builder.addPropertyValue(INTERFACE, interface)

View file

@ -6,6 +6,7 @@ package se.scalablesolutions.akka.spring
/**
* XML configuration tags.
* @author michaelkober
* @author Martin Krasser
*/
object AkkaSpringConfigurationTags {
@ -20,6 +21,7 @@ object AkkaSpringConfigurationTags {
// active-object sub tags
val RESTART_CALLBACKS_TAG = "restart-callbacks"
val SHUTDOWN_CALLBACK_TAG = "shutdown-callback"
val REMOTE_TAG = "remote"
// superivision sub tags
@ -45,6 +47,7 @@ object AkkaSpringConfigurationTags {
val PORT = "port"
val PRE_RESTART = "pre"
val POST_RESTART = "post"
val SHUTDOWN = "shutdown"
val LIFECYCLE = "lifecycle"
val SCOPE = "scope"

View file

@ -1,9 +1,22 @@
package se.scalablesolutions.akka.spring;
import se.scalablesolutions.akka.actor.annotation.shutdown;
public class SampleBean {
public boolean down;
public SampleBean() {
down = false;
}
public String foo(String s) {
return "hello " + s;
}
@shutdown
public void shutdown() {
down = true;
}
}

View file

@ -7,6 +7,9 @@
http://www.akkasource.org/schema/akka
http://scalablesolutions.se/akka/akka-0.10.xsd">
<akka:active-object id="bean-singleton" target="se.scalablesolutions.akka.spring.SampleBean" timeout="1000"/>
<akka:active-object id="bean-prototype" target="se.scalablesolutions.akka.spring.SampleBean" timeout="1000" scope="prototype"/>
<akka:active-object id="bean"
target="org.springframework.core.io.ResourceEditor"
transactional="true"
@ -18,4 +21,4 @@
<bean id="string" class="java.lang.String">
<constructor-arg value="someString"/>
</bean>
</beans>
</beans>

View file

@ -54,8 +54,8 @@ class ActiveObjectBeanDefinitionParserTest extends Spec with ShouldMatchers {
</akka:active-object>
val props = parser.parseActiveObject(dom(xml).getDocumentElement);
assert(props != null)
assert(props.dispatcher.dispatcherType == "thread-based")
}
assert(props.dispatcher.dispatcherType === "thread-based")
}
it("should parse remote ActiveObjects configuration") {
val xml = <akka:active-object id="remote active-object" target="se.scalablesolutions.akka.spring.foo.MyPojo"
@ -64,8 +64,8 @@ class ActiveObjectBeanDefinitionParserTest extends Spec with ShouldMatchers {
</akka:active-object>
val props = parser.parseActiveObject(dom(xml).getDocumentElement);
assert(props != null)
assert(props.host == "com.some.host")
assert(props.port == 9999)
assert(props.host === "com.some.host")
assert(props.port === 9999)
}
}
}

View file

@ -70,5 +70,21 @@ class ActiveObjectFactoryBeanTest extends Spec with ShouldMatchers {
val target:ResourceEditor = ctx.getBean("bean").asInstanceOf[ResourceEditor]
assert(target.getSource === "someString")
}
it("should stop the created active object when scope is singleton and the context is closed") {
var ctx = new ClassPathXmlApplicationContext("appContext.xml");
val target = ctx.getBean("bean-singleton").asInstanceOf[SampleBean]
assert(!target.down)
ctx.close
assert(target.down)
}
it("should not stop the created active object when scope is prototype and the context is closed") {
var ctx = new ClassPathXmlApplicationContext("appContext.xml");
val target = ctx.getBean("bean-prototype").asInstanceOf[SampleBean]
assert(!target.down)
ctx.close
assert(!target.down)
}
}
}

View file

@ -49,11 +49,21 @@ class SupervisionBeanDefinitionParserTest extends Spec with ShouldMatchers {
parser.parseSupervisor(createSupervisorElement, builder);
val supervised = builder.getBeanDefinition.getPropertyValues.getPropertyValue("supervised").getValue.asInstanceOf[List[ActiveObjectProperties]]
assert(supervised != null)
expect(3) { supervised.length }
expect(4) { supervised.length }
val iterator = supervised.iterator
expect("foo.bar.Foo") { iterator.next.target }
expect("foo.bar.Bar") { iterator.next.target }
expect("foo.bar.MyPojo") { iterator.next.target }
val prop1 = iterator.next
val prop2 = iterator.next
val prop3 = iterator.next
val prop4 = iterator.next
expect("foo.bar.Foo") { prop1.target }
expect("foo.bar.Bar") { prop2.target }
expect("foo.bar.MyPojo") { prop3.target }
expect("foo.bar.MyPojo") { prop4.target }
expect("preRestart") { prop3.preRestart }
expect("postRestart") { prop3.postRestart }
expect("shutdown") { prop4.shutdown }
expect("permanent") { prop1.lifecycle }
expect("temporary") { prop4.lifecycle }
}
it("should throw IllegalArgumentException on missing mandatory attributes") {
@ -87,6 +97,9 @@ class SupervisionBeanDefinitionParserTest extends Spec with ShouldMatchers {
<akka:active-object target="foo.bar.MyPojo" lifecycle="temporary" timeout="1000">
<akka:restart-callbacks pre="preRestart" post="postRestart"/>
</akka:active-object>
<akka:active-object target="foo.bar.MyPojo" lifecycle="temporary" timeout="1000">
<akka:shutdown-callback method="shutdown"/>
</akka:active-object>
</akka:active-objects>
</akka:supervision>
dom(xml).getDocumentElement