From bfb612908b092ee81c34bc3bfc6612744d2ceb22 Mon Sep 17 00:00:00 2001 From: Michael Kober Date: Thu, 9 Sep 2010 10:42:03 +0200 Subject: [PATCH] closing ticket 378 --- .../src/main/scala/remote/RemoteServer.scala | 5 + .../akka/spring/akka-1.0-SNAPSHOT.xsd | 62 ++++++- .../scala/ActorBeanDefinitionParser.scala | 72 +++++++ .../src/main/scala/ActorFactoryBean.scala | 73 +++++++- akka-spring/src/main/scala/ActorParser.scala | 175 +++++++++++++++++- .../src/main/scala/ActorProperties.scala | 29 ++- .../src/main/scala/AkkaNamespaceHandler.scala | 11 +- .../scala/AkkaSpringConfigurationTags.scala | 7 + akka-spring/src/main/scala/BeanParser.scala | 42 ----- .../src/main/scala/DispatcherParser.scala | 101 ---------- .../src/main/scala/PropertyEntries.scala | 16 ++ .../src/main/scala/PropertyEntry.scala | 19 -- .../TypedActorBeanDefinitionParser.scala | 31 ---- .../UntypedActorBeanDefinitionParser.scala | 31 ---- .../akka/spring/foo/IMyPojo.java | 10 +- .../akka/spring/foo/MyPojo.java | 52 +++--- .../akka/spring/foo/PingActor.java | 10 +- .../test/resources/server-managed-config.xml | 57 ++++++ .../src/test/resources/typed-actor-config.xml | 2 +- .../test/resources/untyped-actor-config.xml | 2 +- .../TypedActorBeanDefinitionParserTest.scala | 16 +- .../scala/TypedActorSpringFeatureTest.scala | 123 +++++++++--- .../scala/UntypedActorSpringFeatureTest.scala | 140 ++++++++++---- .../src/main/scala/actor/TypedActor.scala | 3 +- .../src/test/resources/META-INF/aop.xml | 1 + 25 files changed, 741 insertions(+), 349 deletions(-) create mode 100644 akka-spring/src/main/scala/ActorBeanDefinitionParser.scala delete mode 100644 akka-spring/src/main/scala/BeanParser.scala delete mode 100644 akka-spring/src/main/scala/DispatcherParser.scala delete mode 100644 akka-spring/src/main/scala/PropertyEntry.scala delete mode 100644 akka-spring/src/main/scala/TypedActorBeanDefinitionParser.scala delete mode 100644 akka-spring/src/main/scala/UntypedActorBeanDefinitionParser.scala create mode 100644 akka-spring/src/test/resources/server-managed-config.xml diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index bf9b38ca1b..fa57bda71b 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -271,6 +271,11 @@ class RemoteServer extends Logging with ListenerManagement { } } + /** + * Register typed actor by interface name. + */ + def registerTypedActor(intfClass: Class[_], typedActor: AnyRef) : Unit = registerTypedActor(intfClass.getName, typedActor) + /** * Register remote typed actor by a specific id. * @param id custom actor id diff --git a/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-1.0-SNAPSHOT.xsd b/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-1.0-SNAPSHOT.xsd index c3d7608bee..80b37c41f5 100644 --- a/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-1.0-SNAPSHOT.xsd +++ b/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-1.0-SNAPSHOT.xsd @@ -66,6 +66,14 @@ + + + + + + + + @@ -107,6 +115,20 @@ + + + + Management type for remote actors: client managed or server managed. + + + + + + + Custom service name for server managed actor. + + + @@ -135,7 +157,7 @@ - Theh default timeout for '!!' invocations. + The default timeout for '!!' invocations. @@ -229,6 +251,41 @@ + + + + + + + + Name of the remote host. + + + + + + + Port of the remote host. + + + + + + + Custom service name or class name for the server managed actor. + + + + + + + Name of the interface the typed actor implements. + + + + + + @@ -294,4 +351,7 @@ + + + diff --git a/akka-spring/src/main/scala/ActorBeanDefinitionParser.scala b/akka-spring/src/main/scala/ActorBeanDefinitionParser.scala new file mode 100644 index 0000000000..55aa82b8e4 --- /dev/null +++ b/akka-spring/src/main/scala/ActorBeanDefinitionParser.scala @@ -0,0 +1,72 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +package se.scalablesolutions.akka.spring + +import org.springframework.beans.factory.support.BeanDefinitionBuilder +import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser +import org.springframework.beans.factory.xml.ParserContext +import AkkaSpringConfigurationTags._ +import org.w3c.dom.Element + + +/** + * Parser for custom namespace configuration. + * @author michaelkober + */ +class TypedActorBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorParser { + /* + * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder) + */ + override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) { + val typedActorConf = parseActor(element) + typedActorConf.typed = TYPED_ACTOR_TAG + typedActorConf.setAsProperties(builder) + } + + /* + * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element) + */ + override def getBeanClass(element: Element): Class[_] = classOf[ActorFactoryBean] +} + + +/** + * Parser for custom namespace configuration. + * @author michaelkober + */ +class UntypedActorBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorParser { + /* + * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder) + */ + override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) { + val untypedActorConf = parseActor(element) + untypedActorConf.typed = UNTYPED_ACTOR_TAG + untypedActorConf.setAsProperties(builder) + } + + /* + * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element) + */ + override def getBeanClass(element: Element): Class[_] = classOf[ActorFactoryBean] +} + + +/** + * Parser for custom namespace configuration. + * @author michaelkober + */ +class ActorForBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorForParser { + /* + * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder) + */ + override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) { + val actorForConf = parseActorFor(element) + actorForConf.setAsProperties(builder) + } + + /* + * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element) + */ + override def getBeanClass(element: Element): Class[_] = classOf[ActorForFactoryBean] +} diff --git a/akka-spring/src/main/scala/ActorFactoryBean.scala b/akka-spring/src/main/scala/ActorFactoryBean.scala index 11d5274a70..c47efcdb78 100644 --- a/akka-spring/src/main/scala/ActorFactoryBean.scala +++ b/akka-spring/src/main/scala/ActorFactoryBean.scala @@ -4,22 +4,19 @@ package se.scalablesolutions.akka.spring -import java.beans.PropertyDescriptor -import java.lang.reflect.Method -import javax.annotation.PreDestroy -import javax.annotation.PostConstruct - import org.springframework.beans.{BeanUtils,BeansException,BeanWrapper,BeanWrapperImpl} -import org.springframework.beans.factory.BeanFactory +import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer} +//import org.springframework.beans.factory.BeanFactory import org.springframework.beans.factory.config.AbstractFactoryBean import org.springframework.context.{ApplicationContext,ApplicationContextAware} -import org.springframework.util.ReflectionUtils +//import org.springframework.util.ReflectionUtils import org.springframework.util.StringUtils import se.scalablesolutions.akka.actor.{ActorRef, AspectInitRegistry, TypedActorConfiguration, TypedActor,Actor} import se.scalablesolutions.akka.dispatch.MessageDispatcher import se.scalablesolutions.akka.util.{Logging, Duration} import scala.reflect.BeanProperty +import java.net.InetSocketAddress /** * Exception to use when something goes wrong during bean creation. @@ -49,6 +46,8 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App @BeanProperty var transactional: Boolean = false @BeanProperty var host: String = "" @BeanProperty var port: Int = _ + @BeanProperty var serverManaged: Boolean = false + @BeanProperty var serviceName: String = "" @BeanProperty var lifecycle: String = "" @BeanProperty var dispatcher: DispatcherProperties = _ @BeanProperty var scope: String = VAL_SCOPE_SINGLETON @@ -94,7 +93,16 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App if (implementation == null || implementation == "") throw new AkkaBeansException( "The 'implementation' part of the 'akka:typed-actor' element in the Spring config file can't be null or empty string") - TypedActor.newInstance(interface.toClass, implementation.toClass, createConfig) + val typedActor: AnyRef = TypedActor.newInstance(interface.toClass, implementation.toClass, createConfig) + if (isRemote && serverManaged) { + val server = RemoteServer.getOrCreateServer(new InetSocketAddress(host, port)) + if (serviceName.isEmpty) { + server.registerTypedActor(interface, typedActor) + } else { + server.registerTypedActor(serviceName, typedActor) + } + } + typedActor } /** @@ -111,7 +119,16 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App actorRef.makeTransactionRequired } if (isRemote) { - actorRef.makeRemote(host, port) + if (serverManaged) { + val server = RemoteServer.getOrCreateServer(new InetSocketAddress(host, port)) + if (serviceName.isEmpty) { + server.register(actorRef) + } else { + server.register(serviceName, actorRef) + } + } else { + actorRef.makeRemote(host, port) + } } if (hasDispatcher) { if (dispatcher.dispatcherType != THREAD_BASED){ @@ -159,7 +176,7 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App private[akka] def createConfig: TypedActorConfiguration = { val config = new TypedActorConfiguration().timeout(Duration(timeout, "millis")) if (transactional) config.makeTransactionRequired - if (isRemote) config.makeRemote(host, port) + if (isRemote && !serverManaged) config.makeRemote(host, port) if (hasDispatcher) { if (dispatcher.dispatcherType != THREAD_BASED) { config.dispatcher(dispatcherInstance()) @@ -191,3 +208,39 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App } } } + +/** + * Factory bean for remote client actor-for. + * + * @author michaelkober + */ +class ActorForFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with ApplicationContextAware { + import StringReflect._ + import AkkaSpringConfigurationTags._ + + @BeanProperty var interface: String = "" + @BeanProperty var host: String = "" + @BeanProperty var port: Int = _ + @BeanProperty var serviceName: String = "" + //@BeanProperty var scope: String = VAL_SCOPE_SINGLETON + @BeanProperty var applicationContext: ApplicationContext = _ + + override def isSingleton = false + + /* + * @see org.springframework.beans.factory.FactoryBean#getObjectType() + */ + def getObjectType: Class[AnyRef] = classOf[AnyRef] + + /* + * @see org.springframework.beans.factory.config.AbstractFactoryBean#createInstance() + */ + def createInstance: AnyRef = { + if (interface.isEmpty) { + RemoteClient.actorFor(serviceName, host, port) + } else { + RemoteClient.typedActorFor(interface.toClass, serviceName, host, port) + } + } +} + diff --git a/akka-spring/src/main/scala/ActorParser.scala b/akka-spring/src/main/scala/ActorParser.scala index 69073bd52f..8736b807d1 100644 --- a/akka-spring/src/main/scala/ActorParser.scala +++ b/akka-spring/src/main/scala/ActorParser.scala @@ -6,6 +6,7 @@ package se.scalablesolutions.akka.spring import org.springframework.util.xml.DomUtils import org.w3c.dom.Element import scala.collection.JavaConversions._ +import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.actor.IllegalActorStateException @@ -27,11 +28,17 @@ trait ActorParser extends BeanParser with DispatcherParser { val objectProperties = new ActorProperties() val remoteElement = DomUtils.getChildElementByTagName(element, REMOTE_TAG); val dispatcherElement = DomUtils.getChildElementByTagName(element, DISPATCHER_TAG) - val propertyEntries = DomUtils.getChildElementsByTagName(element,PROPERTYENTRY_TAG) + val propertyEntries = DomUtils.getChildElementsByTagName(element, PROPERTYENTRY_TAG) if (remoteElement != null) { objectProperties.host = mandatory(remoteElement, HOST) objectProperties.port = mandatory(remoteElement, PORT).toInt + objectProperties.serverManaged = (remoteElement.getAttribute(MANAGED_BY) != null) && (remoteElement.getAttribute(MANAGED_BY).equals(SERVER_MANAGED)) + val serviceName = remoteElement.getAttribute(SERVICE_NAME) + if ((serviceName != null) && (!serviceName.isEmpty)) { + objectProperties.serviceName = serviceName + objectProperties.serverManaged = true + } } if (dispatcherElement != null) { @@ -43,7 +50,7 @@ trait ActorParser extends BeanParser with DispatcherParser { val entry = new PropertyEntry entry.name = element.getAttribute("name"); entry.value = element.getAttribute("value") - entry.ref = element.getAttribute("ref") + entry.ref = element.getAttribute("ref") objectProperties.propertyEntries.add(entry) } @@ -59,15 +66,13 @@ trait ActorParser extends BeanParser with DispatcherParser { objectProperties.target = mandatory(element, IMPLEMENTATION) objectProperties.transactional = if (element.getAttribute(TRANSACTIONAL).isEmpty) false else element.getAttribute(TRANSACTIONAL).toBoolean - if (!element.getAttribute(INTERFACE).isEmpty) { + if (element.hasAttribute(INTERFACE)) { objectProperties.interface = element.getAttribute(INTERFACE) } - - if (!element.getAttribute(LIFECYCLE).isEmpty) { + if (element.hasAttribute(LIFECYCLE)) { objectProperties.lifecycle = element.getAttribute(LIFECYCLE) } - - if (!element.getAttribute(SCOPE).isEmpty) { + if (element.hasAttribute(SCOPE)) { objectProperties.scope = element.getAttribute(SCOPE) } @@ -75,3 +80,159 @@ trait ActorParser extends BeanParser with DispatcherParser { } } + +/** + * Parser trait for custom namespace configuration for RemoteClient actor-for. + * @author michaelkober + */ +trait ActorForParser extends BeanParser { + import AkkaSpringConfigurationTags._ + + /** + * Parses the given element and returns a ActorForProperties. + * @param element dom element to parse + * @return configuration for the typed actor + */ + def parseActorFor(element: Element): ActorForProperties = { + val objectProperties = new ActorForProperties() + + objectProperties.host = mandatory(element, HOST) + objectProperties.port = mandatory(element, PORT).toInt + objectProperties.serviceName = mandatory(element, SERVICE_NAME) + if (element.hasAttribute(INTERFACE)) { + objectProperties.interface = element.getAttribute(INTERFACE) + } + objectProperties + } + +} + +/** + * Base trait with utility methods for bean parsing. + */ +trait BeanParser extends Logging { + + /** + * Get a mandatory element attribute. + * @param element the element with the mandatory attribute + * @param attribute name of the mandatory attribute + */ + def mandatory(element: Element, attribute: String): String = { + if ((element.getAttribute(attribute) == null) || (element.getAttribute(attribute).isEmpty)) { + throw new IllegalArgumentException("Mandatory attribute missing: " + attribute) + } else { + element.getAttribute(attribute) + } + } + + /** + * Get a mandatory child element. + * @param element the parent element + * @param childName name of the mandatory child element + */ + def mandatoryElement(element: Element, childName: String): Element = { + val childElement = DomUtils.getChildElementByTagName(element, childName); + if (childElement == null) { + throw new IllegalArgumentException("Mandatory element missing: ''") + } else { + childElement + } + } + +} + + +/** + * Parser trait for custom namespace for Akka dispatcher configuration. + * @author michaelkober + */ +trait DispatcherParser extends BeanParser { + import AkkaSpringConfigurationTags._ + + /** + * Parses the given element and returns a DispatcherProperties. + * @param element dom element to parse + * @return configuration for the dispatcher + */ + def parseDispatcher(element: Element): DispatcherProperties = { + val properties = new DispatcherProperties() + var dispatcherElement = element + if (hasRef(element)) { + val ref = element.getAttribute(REF) + dispatcherElement = element.getOwnerDocument.getElementById(ref) + if (dispatcherElement == null) { + throw new IllegalArgumentException("Referenced dispatcher not found: '" + ref + "'") + } + } + + properties.dispatcherType = mandatory(dispatcherElement, TYPE) + if (properties.dispatcherType == THREAD_BASED) { + val allowedParentNodes = "akka:typed-actor" :: "akka:untyped-actor" :: "typed-actor" :: "untyped-actor" :: Nil + if (!allowedParentNodes.contains(dispatcherElement.getParentNode.getNodeName)) { + throw new IllegalArgumentException("Thread based dispatcher must be nested in 'typed-actor' or 'untyped-actor' element!") + } + } + + if (properties.dispatcherType == HAWT) { // no name for HawtDispatcher + properties.name = dispatcherElement.getAttribute(NAME) + if (dispatcherElement.hasAttribute(AGGREGATE)) { + properties.aggregate = dispatcherElement.getAttribute(AGGREGATE).toBoolean + } + } else { + properties.name = mandatory(dispatcherElement, NAME) + } + + val threadPoolElement = DomUtils.getChildElementByTagName(dispatcherElement, THREAD_POOL_TAG); + if (threadPoolElement != null) { + if (properties.dispatcherType == REACTOR_BASED_SINGLE_THREAD_EVENT_DRIVEN || + properties.dispatcherType == THREAD_BASED) { + throw new IllegalArgumentException("Element 'thread-pool' not allowed for this dispatcher type.") + } + val threadPoolProperties = parseThreadPool(threadPoolElement) + properties.threadPool = threadPoolProperties + } + properties + } + + /** + * Parses the given element and returns a ThreadPoolProperties. + * @param element dom element to parse + * @return configuration for the thread pool + */ + def parseThreadPool(element: Element): ThreadPoolProperties = { + val properties = new ThreadPoolProperties() + properties.queue = element.getAttribute(QUEUE) + if (element.hasAttribute(CAPACITY)) { + properties.capacity = element.getAttribute(CAPACITY).toInt + } + if (element.hasAttribute(BOUND)) { + properties.bound = element.getAttribute(BOUND).toInt + } + if (element.hasAttribute(FAIRNESS)) { + properties.fairness = element.getAttribute(FAIRNESS).toBoolean + } + if (element.hasAttribute(CORE_POOL_SIZE)) { + properties.corePoolSize = element.getAttribute(CORE_POOL_SIZE).toInt + } + if (element.hasAttribute(MAX_POOL_SIZE)) { + properties.maxPoolSize = element.getAttribute(MAX_POOL_SIZE).toInt + } + if (element.hasAttribute(KEEP_ALIVE)) { + properties.keepAlive = element.getAttribute(KEEP_ALIVE).toLong + } + if (element.hasAttribute(REJECTION_POLICY)) { + properties.rejectionPolicy = element.getAttribute(REJECTION_POLICY) + } + if (element.hasAttribute(MAILBOX_CAPACITY)) { + properties.mailboxCapacity = element.getAttribute(MAILBOX_CAPACITY).toInt + } + properties + } + + def hasRef(element: Element): Boolean = { + val ref = element.getAttribute(REF) + (ref != null) && !ref.isEmpty + } + +} + diff --git a/akka-spring/src/main/scala/ActorProperties.scala b/akka-spring/src/main/scala/ActorProperties.scala index 15c7e61fe0..0f86942935 100644 --- a/akka-spring/src/main/scala/ActorProperties.scala +++ b/akka-spring/src/main/scala/ActorProperties.scala @@ -8,7 +8,7 @@ import org.springframework.beans.factory.support.BeanDefinitionBuilder import AkkaSpringConfigurationTags._ /** - * Data container for typed actor configuration data. + * Data container for actor configuration data. * @author michaelkober * @author Martin Krasser */ @@ -20,6 +20,8 @@ class ActorProperties { var transactional: Boolean = false var host: String = "" var port: Int = _ + var serverManaged: Boolean = false + var serviceName: String = "" var lifecycle: String = "" var scope:String = VAL_SCOPE_SINGLETON var dispatcher: DispatcherProperties = _ @@ -34,6 +36,8 @@ class ActorProperties { builder.addPropertyValue("typed", typed) builder.addPropertyValue(HOST, host) builder.addPropertyValue(PORT, port) + builder.addPropertyValue("serverManaged", serverManaged) + builder.addPropertyValue("serviceName", serviceName) builder.addPropertyValue(TIMEOUT, timeout) builder.addPropertyValue(IMPLEMENTATION, target) builder.addPropertyValue(INTERFACE, interface) @@ -45,3 +49,26 @@ class ActorProperties { } } + +/** + * Data container for actor configuration data. + * @author michaelkober + */ +class ActorForProperties { + var interface: String = "" + var host: String = "" + var port: Int = _ + var serviceName: String = "" + + /** + * Sets the properties to the given builder. + * @param builder bean definition builder + */ + def setAsProperties(builder: BeanDefinitionBuilder) { + builder.addPropertyValue(HOST, host) + builder.addPropertyValue(PORT, port) + builder.addPropertyValue("serviceName", serviceName) + builder.addPropertyValue(INTERFACE, interface) + } + +} diff --git a/akka-spring/src/main/scala/AkkaNamespaceHandler.scala b/akka-spring/src/main/scala/AkkaNamespaceHandler.scala index a478b7b262..b1c58baa20 100644 --- a/akka-spring/src/main/scala/AkkaNamespaceHandler.scala +++ b/akka-spring/src/main/scala/AkkaNamespaceHandler.scala @@ -12,10 +12,11 @@ import AkkaSpringConfigurationTags._ */ class AkkaNamespaceHandler extends NamespaceHandlerSupport { def init = { - registerBeanDefinitionParser(TYPED_ACTOR_TAG, new TypedActorBeanDefinitionParser()); - registerBeanDefinitionParser(UNTYPED_ACTOR_TAG, new UntypedActorBeanDefinitionParser()); - registerBeanDefinitionParser(SUPERVISION_TAG, new SupervisionBeanDefinitionParser()); - registerBeanDefinitionParser(DISPATCHER_TAG, new DispatcherBeanDefinitionParser()); - registerBeanDefinitionParser(CAMEL_SERVICE_TAG, new CamelServiceBeanDefinitionParser); + registerBeanDefinitionParser(TYPED_ACTOR_TAG, new TypedActorBeanDefinitionParser()) + registerBeanDefinitionParser(UNTYPED_ACTOR_TAG, new UntypedActorBeanDefinitionParser()) + registerBeanDefinitionParser(SUPERVISION_TAG, new SupervisionBeanDefinitionParser()) + registerBeanDefinitionParser(DISPATCHER_TAG, new DispatcherBeanDefinitionParser()) + registerBeanDefinitionParser(CAMEL_SERVICE_TAG, new CamelServiceBeanDefinitionParser) + registerBeanDefinitionParser(ACTOR_FOR_TAG, new ActorForBeanDefinitionParser()); } } diff --git a/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala b/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala index 2743d772da..2d9807a806 100644 --- a/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala +++ b/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala @@ -19,6 +19,7 @@ object AkkaSpringConfigurationTags { val DISPATCHER_TAG = "dispatcher" val PROPERTYENTRY_TAG = "property" val CAMEL_SERVICE_TAG = "camel-service" + val ACTOR_FOR_TAG = "actor-for" // actor sub tags val REMOTE_TAG = "remote" @@ -45,6 +46,8 @@ object AkkaSpringConfigurationTags { val TRANSACTIONAL = "transactional" val HOST = "host" val PORT = "port" + val MANAGED_BY = "managed-by" + val SERVICE_NAME = "service-name" val LIFECYCLE = "lifecycle" val SCOPE = "scope" @@ -103,4 +106,8 @@ object AkkaSpringConfigurationTags { val THREAD_BASED = "thread-based" val HAWT = "hawt" + // managed by types + val SERVER_MANAGED = "server" + val CLIENT_MANAGED = "client" + } diff --git a/akka-spring/src/main/scala/BeanParser.scala b/akka-spring/src/main/scala/BeanParser.scala deleted file mode 100644 index 1bbba9f09f..0000000000 --- a/akka-spring/src/main/scala/BeanParser.scala +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ -package se.scalablesolutions.akka.spring - -import se.scalablesolutions.akka.util.Logging -import org.w3c.dom.Element -import org.springframework.util.xml.DomUtils - -/** - * Base trait with utility methods for bean parsing. - */ -trait BeanParser extends Logging { - - /** - * Get a mandatory element attribute. - * @param element the element with the mandatory attribute - * @param attribute name of the mandatory attribute - */ - def mandatory(element: Element, attribute: String): String = { - if ((element.getAttribute(attribute) == null) || (element.getAttribute(attribute).isEmpty)) { - throw new IllegalArgumentException("Mandatory attribute missing: " + attribute) - } else { - element.getAttribute(attribute) - } - } - - /** - * Get a mandatory child element. - * @param element the parent element - * @param childName name of the mandatory child element - */ - def mandatoryElement(element: Element, childName: String): Element = { - val childElement = DomUtils.getChildElementByTagName(element, childName); - if (childElement == null) { - throw new IllegalArgumentException("Mandatory element missing: ''") - } else { - childElement - } - } - -} diff --git a/akka-spring/src/main/scala/DispatcherParser.scala b/akka-spring/src/main/scala/DispatcherParser.scala deleted file mode 100644 index e9f10e1328..0000000000 --- a/akka-spring/src/main/scala/DispatcherParser.scala +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ -package se.scalablesolutions.akka.spring - -import org.w3c.dom.Element -import org.springframework.util.xml.DomUtils - -/** - * Parser trait for custom namespace for Akka dispatcher configuration. - * @author michaelkober - */ -trait DispatcherParser extends BeanParser { - import AkkaSpringConfigurationTags._ - - /** - * Parses the given element and returns a DispatcherProperties. - * @param element dom element to parse - * @return configuration for the dispatcher - */ - def parseDispatcher(element: Element): DispatcherProperties = { - val properties = new DispatcherProperties() - var dispatcherElement = element - if (hasRef(element)) { - val ref = element.getAttribute(REF) - dispatcherElement = element.getOwnerDocument.getElementById(ref) - if (dispatcherElement == null) { - throw new IllegalArgumentException("Referenced dispatcher not found: '" + ref + "'") - } - } - - properties.dispatcherType = mandatory(dispatcherElement, TYPE) - if (properties.dispatcherType == THREAD_BASED) { - val allowedParentNodes = "akka:typed-actor" :: "akka:untyped-actor" :: "typed-actor" :: "untyped-actor" :: Nil - if (!allowedParentNodes.contains(dispatcherElement.getParentNode.getNodeName)) { - throw new IllegalArgumentException("Thread based dispatcher must be nested in 'typed-actor' or 'untyped-actor' element!") - } - } - - if (properties.dispatcherType == HAWT) { // no name for HawtDispatcher - properties.name = dispatcherElement.getAttribute(NAME) - if (dispatcherElement.hasAttribute(AGGREGATE)) { - properties.aggregate = dispatcherElement.getAttribute(AGGREGATE).toBoolean - } - } else { - properties.name = mandatory(dispatcherElement, NAME) - } - - val threadPoolElement = DomUtils.getChildElementByTagName(dispatcherElement, THREAD_POOL_TAG); - if (threadPoolElement != null) { - if (properties.dispatcherType == REACTOR_BASED_SINGLE_THREAD_EVENT_DRIVEN || - properties.dispatcherType == THREAD_BASED) { - throw new IllegalArgumentException("Element 'thread-pool' not allowed for this dispatcher type.") - } - val threadPoolProperties = parseThreadPool(threadPoolElement) - properties.threadPool = threadPoolProperties - } - properties - } - - /** - * Parses the given element and returns a ThreadPoolProperties. - * @param element dom element to parse - * @return configuration for the thread pool - */ - def parseThreadPool(element: Element): ThreadPoolProperties = { - val properties = new ThreadPoolProperties() - properties.queue = element.getAttribute(QUEUE) - if (element.hasAttribute(CAPACITY)) { - properties.capacity = element.getAttribute(CAPACITY).toInt - } - if (element.hasAttribute(BOUND)) { - properties.bound = element.getAttribute(BOUND).toInt - } - if (element.hasAttribute(FAIRNESS)) { - properties.fairness = element.getAttribute(FAIRNESS).toBoolean - } - if (element.hasAttribute(CORE_POOL_SIZE)) { - properties.corePoolSize = element.getAttribute(CORE_POOL_SIZE).toInt - } - if (element.hasAttribute(MAX_POOL_SIZE)) { - properties.maxPoolSize = element.getAttribute(MAX_POOL_SIZE).toInt - } - if (element.hasAttribute(KEEP_ALIVE)) { - properties.keepAlive = element.getAttribute(KEEP_ALIVE).toLong - } - if (element.hasAttribute(REJECTION_POLICY)) { - properties.rejectionPolicy = element.getAttribute(REJECTION_POLICY) - } - if (element.hasAttribute(MAILBOX_CAPACITY)) { - properties.mailboxCapacity = element.getAttribute(MAILBOX_CAPACITY).toInt - } - properties - } - - def hasRef(element: Element): Boolean = { - val ref = element.getAttribute(REF) - (ref != null) && !ref.isEmpty - } - -} diff --git a/akka-spring/src/main/scala/PropertyEntries.scala b/akka-spring/src/main/scala/PropertyEntries.scala index bf1898a805..9a7dc098de 100644 --- a/akka-spring/src/main/scala/PropertyEntries.scala +++ b/akka-spring/src/main/scala/PropertyEntries.scala @@ -18,3 +18,19 @@ class PropertyEntries { entryList.append(entry) } } + +/** + * Represents a property element + * @author Johan Rask + */ +class PropertyEntry { + var name: String = _ + var value: String = null + var ref: String = null + + + override def toString(): String = { + format("name = %s,value = %s, ref = %s", name, value, ref) + } +} + diff --git a/akka-spring/src/main/scala/PropertyEntry.scala b/akka-spring/src/main/scala/PropertyEntry.scala deleted file mode 100644 index 9fe6357fc0..0000000000 --- a/akka-spring/src/main/scala/PropertyEntry.scala +++ /dev/null @@ -1,19 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ -package se.scalablesolutions.akka.spring - -/** - * Represents a property element - * @author Johan Rask - */ -class PropertyEntry { - var name: String = _ - var value: String = null - var ref: String = null - - - override def toString(): String = { - format("name = %s,value = %s, ref = %s", name, value, ref) - } -} diff --git a/akka-spring/src/main/scala/TypedActorBeanDefinitionParser.scala b/akka-spring/src/main/scala/TypedActorBeanDefinitionParser.scala deleted file mode 100644 index e8e0cef7d4..0000000000 --- a/akka-spring/src/main/scala/TypedActorBeanDefinitionParser.scala +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ -package se.scalablesolutions.akka.spring - -import org.springframework.beans.factory.support.BeanDefinitionBuilder -import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser -import org.springframework.beans.factory.xml.ParserContext -import AkkaSpringConfigurationTags._ -import org.w3c.dom.Element - - -/** - * Parser for custom namespace configuration. - * @author michaelkober - */ -class TypedActorBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorParser { - /* - * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder) - */ - override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) { - val typedActorConf = parseActor(element) - typedActorConf.typed = TYPED_ACTOR_TAG - typedActorConf.setAsProperties(builder) - } - - /* - * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element) - */ - override def getBeanClass(element: Element): Class[_] = classOf[ActorFactoryBean] -} diff --git a/akka-spring/src/main/scala/UntypedActorBeanDefinitionParser.scala b/akka-spring/src/main/scala/UntypedActorBeanDefinitionParser.scala deleted file mode 100644 index 752e18559f..0000000000 --- a/akka-spring/src/main/scala/UntypedActorBeanDefinitionParser.scala +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ -package se.scalablesolutions.akka.spring - -import org.springframework.beans.factory.support.BeanDefinitionBuilder -import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser -import org.springframework.beans.factory.xml.ParserContext -import AkkaSpringConfigurationTags._ -import org.w3c.dom.Element - - -/** - * Parser for custom namespace configuration. - * @author michaelkober - */ -class UntypedActorBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorParser { - /* - * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder) - */ - override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) { - val untypedActorConf = parseActor(element) - untypedActorConf.typed = UNTYPED_ACTOR_TAG - untypedActorConf.setAsProperties(builder) - } - - /* - * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element) - */ - override def getBeanClass(element: Element): Class[_] = classOf[ActorFactoryBean] -} diff --git a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/IMyPojo.java b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/IMyPojo.java index f2c5e24884..5a2a272e6c 100644 --- a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/IMyPojo.java +++ b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/IMyPojo.java @@ -8,14 +8,12 @@ package se.scalablesolutions.akka.spring.foo; * To change this template use File | Settings | File Templates. */ public interface IMyPojo { + public void oneWay(String message); + public String getFoo(); - public String getBar(); - - public void preRestart(); - - public void postRestart(); - public String longRunning(); + + } diff --git a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/MyPojo.java b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/MyPojo.java index fe3e9ba767..8f610eef63 100644 --- a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/MyPojo.java +++ b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/MyPojo.java @@ -1,42 +1,34 @@ package se.scalablesolutions.akka.spring.foo; -import se.scalablesolutions.akka.actor.*; +import se.scalablesolutions.akka.actor.TypedActor; -public class MyPojo extends TypedActor implements IMyPojo{ +import java.util.concurrent.CountDownLatch; - private String foo; - private String bar; +public class MyPojo extends TypedActor implements IMyPojo { + + public static CountDownLatch latch = new CountDownLatch(1); + public static String lastOneWayMessage = null; + private String foo = "foo"; - public MyPojo() { - this.foo = "foo"; - this.bar = "bar"; - } + public MyPojo() { + } + public String getFoo() { + return foo; + } - public String getFoo() { - return foo; - } + public void oneWay(String message) { + lastOneWayMessage = message; + latch.countDown(); + } - - public String getBar() { - return bar; - } - - public void preRestart() { - System.out.println("pre restart"); - } - - public void postRestart() { - System.out.println("post restart"); - } - - public String longRunning() { - try { - Thread.sleep(6000); - } catch (InterruptedException e) { - } - return "this took long"; + public String longRunning() { + try { + Thread.sleep(6000); + } catch (InterruptedException e) { } + return "this took long"; + } } diff --git a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/PingActor.java b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/PingActor.java index e447b26a28..3063a1b529 100644 --- a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/PingActor.java +++ b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/PingActor.java @@ -6,6 +6,8 @@ import se.scalablesolutions.akka.actor.ActorRef; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; +import java.util.concurrent.CountDownLatch; + /** * test class @@ -14,6 +16,9 @@ public class PingActor extends UntypedActor implements ApplicationContextAware { private String stringFromVal; private String stringFromRef; + public static String lastMessage = null; + public static CountDownLatch latch = new CountDownLatch(1); + private boolean gotApplicationContext = false; @@ -42,7 +47,6 @@ public class PingActor extends UntypedActor implements ApplicationContextAware { stringFromRef = s; } - private String longRunning() { try { Thread.sleep(6000); @@ -53,12 +57,12 @@ public class PingActor extends UntypedActor implements ApplicationContextAware { public void onReceive(Object message) throws Exception { if (message instanceof String) { - System.out.println("Ping received String message: " + message); + lastMessage = (String) message; if (message.equals("longRunning")) { - System.out.println("### starting pong"); ActorRef pongActor = UntypedActor.actorOf(PongActor.class).start(); pongActor.sendRequestReply("longRunning", getContext()); } + latch.countDown(); } else { throw new IllegalArgumentException("Unknown message: " + message); } diff --git a/akka-spring/src/test/resources/server-managed-config.xml b/akka-spring/src/test/resources/server-managed-config.xml new file mode 100644 index 0000000000..128b16c8b6 --- /dev/null +++ b/akka-spring/src/test/resources/server-managed-config.xml @@ -0,0 +1,57 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/akka-spring/src/test/resources/typed-actor-config.xml b/akka-spring/src/test/resources/typed-actor-config.xml index faca749469..989884e4fa 100644 --- a/akka-spring/src/test/resources/typed-actor-config.xml +++ b/akka-spring/src/test/resources/typed-actor-config.xml @@ -37,7 +37,7 @@ http://scalablesolutions.se/akka/akka-1.0-SNAPSHOT.xsd"> implementation="se.scalablesolutions.akka.spring.foo.MyPojo" timeout="2000" transactional="true"> - + - + + + + val props = parser.parseActor(dom(xml).getDocumentElement); + assert(props != null) + assert(props.host === "com.some.host") + assert(props.port === 9999) + assert(props.serviceName === "my-service") + assert(props.serverManaged) } } } diff --git a/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala b/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala index 8767b2e75a..3cdcd17cb0 100644 --- a/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala +++ b/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala @@ -4,10 +4,8 @@ package se.scalablesolutions.akka.spring -import foo.{IMyPojo, MyPojo} +import foo.{PingActor, IMyPojo, MyPojo} import se.scalablesolutions.akka.dispatch.FutureTimeoutException -import se.scalablesolutions.akka.remote.RemoteNode -import org.scalatest.FeatureSpec import org.scalatest.matchers.ShouldMatchers import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith @@ -16,13 +14,52 @@ import org.springframework.beans.factory.xml.XmlBeanDefinitionReader import org.springframework.context.ApplicationContext import org.springframework.context.support.ClassPathXmlApplicationContext import org.springframework.core.io.{ClassPathResource, Resource} +import org.scalatest.{BeforeAndAfterAll, FeatureSpec} +import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer, RemoteNode} +import java.util.concurrent.CountDownLatch +import se.scalablesolutions.akka.actor.{TypedActor, RemoteTypedActorOne, Actor} +import se.scalablesolutions.akka.actor.remote.RemoteTypedActorOneImpl /** * Tests for spring configuration of typed actors. * @author michaelkober */ @RunWith(classOf[JUnitRunner]) -class TypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers { +class TypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers with BeforeAndAfterAll { + + var server1: RemoteServer = null + var server2: RemoteServer = null + + override def beforeAll = { + val actor = Actor.actorOf[PingActor] // FIXME: remove this line when ticket 425 is fixed + server1 = new RemoteServer() + server1.start("localhost", 9990) + server2 = new RemoteServer() + server2.start("localhost", 9992) + + val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000) + server1.registerTypedActor("typed-actor-service", typedActor) + } + + // make sure the servers shutdown cleanly after the test has finished + override def afterAll = { + try { + server1.shutdown + server2.shutdown + RemoteClient.shutdownAll + Thread.sleep(1000) + } catch { + case e => () + } + } + + def getTypedActorFromContext(config: String, id: String) : IMyPojo = { + MyPojo.latch = new CountDownLatch(1) + val context = new ClassPathXmlApplicationContext(config) + val myPojo: IMyPojo = context.getBean(id).asInstanceOf[IMyPojo] + myPojo + } + feature("parse Spring application context") { scenario("akka:typed-actor and akka:supervision and akka:dispatcher can be used as top level elements") { @@ -37,41 +74,79 @@ class TypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers { } scenario("get a typed actor") { - val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml") - val myPojo = context.getBean("simple-typed-actor").asInstanceOf[IMyPojo] - var msg = myPojo.getFoo() - msg += myPojo.getBar() - assert(msg === "foobar") + val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "simple-typed-actor") + assert(myPojo.getFoo() === "foo") + myPojo.oneWay("hello 1") + MyPojo.latch.await + assert(MyPojo.lastOneWayMessage === "hello 1") } scenario("FutureTimeoutException when timed out") { - val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml") - val myPojo = context.getBean("simple-typed-actor").asInstanceOf[IMyPojo] + val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "simple-typed-actor") evaluating {myPojo.longRunning()} should produce[FutureTimeoutException] - } scenario("typed-actor with timeout") { - val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml") - val myPojo = context.getBean("simple-typed-actor-long-timeout").asInstanceOf[IMyPojo] + val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "simple-typed-actor-long-timeout") assert(myPojo.longRunning() === "this took long"); } scenario("transactional typed-actor") { - val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml") - val myPojo = context.getBean("transactional-typed-actor").asInstanceOf[IMyPojo] - var msg = myPojo.getFoo() - msg += myPojo.getBar() - assert(msg === "foobar") + val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "transactional-typed-actor") + assert(myPojo.getFoo() === "foo") + myPojo.oneWay("hello 2") + MyPojo.latch.await + assert(MyPojo.lastOneWayMessage === "hello 2") } scenario("get a remote typed-actor") { - RemoteNode.start - Thread.sleep(1000) - val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml") - val myPojo = context.getBean("remote-typed-actor").asInstanceOf[IMyPojo] - assert(myPojo.getFoo === "foo") + val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "remote-typed-actor") + assert(myPojo.getFoo() === "foo") + myPojo.oneWay("hello 3") + MyPojo.latch.await + assert(MyPojo.lastOneWayMessage === "hello 3") } + + scenario("get a client-managed-remote-typed-actor") { + val myPojo = getTypedActorFromContext("/server-managed-config.xml", "client-managed-remote-typed-actor") + assert(myPojo.getFoo() === "foo") + myPojo.oneWay("hello client-managed-remote-typed-actor") + MyPojo.latch.await + assert(MyPojo.lastOneWayMessage === "hello client-managed-remote-typed-actor") + } + + scenario("get a server-managed-remote-typed-actor") { + val serverPojo = getTypedActorFromContext("/server-managed-config.xml", "server-managed-remote-typed-actor") + // + val myPojoProxy = RemoteClient.typedActorFor(classOf[IMyPojo], classOf[IMyPojo].getName, 5000L, "localhost", 9990) + assert(myPojoProxy.getFoo() === "foo") + myPojoProxy.oneWay("hello server-managed-remote-typed-actor") + MyPojo.latch.await + assert(MyPojo.lastOneWayMessage === "hello server-managed-remote-typed-actor") + } + + scenario("get a server-managed-remote-typed-actor-custom-id") { + val serverPojo = getTypedActorFromContext("/server-managed-config.xml", "server-managed-remote-typed-actor-custom-id") + // + val myPojoProxy = RemoteClient.typedActorFor(classOf[IMyPojo], "mypojo-service", 5000L, "localhost", 9990) + assert(myPojoProxy.getFoo() === "foo") + myPojoProxy.oneWay("hello server-managed-remote-typed-actor 2") + MyPojo.latch.await + assert(MyPojo.lastOneWayMessage === "hello server-managed-remote-typed-actor 2") + } + + scenario("get a client proxy for server-managed-remote-typed-actor") { + MyPojo.latch = new CountDownLatch(1) + val context = new ClassPathXmlApplicationContext("/server-managed-config.xml") + val myPojo: IMyPojo = context.getBean("server-managed-remote-typed-actor-custom-id").asInstanceOf[IMyPojo] + // get client proxy from spring context + val myPojoProxy = context.getBean("typed-client-1").asInstanceOf[IMyPojo] + assert(myPojoProxy.getFoo() === "foo") + myPojoProxy.oneWay("hello") + MyPojo.latch.await + } + + } } diff --git a/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala b/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala index cf7d8d9805..0397d30bf0 100644 --- a/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala +++ b/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala @@ -6,74 +6,146 @@ package se.scalablesolutions.akka.spring import foo.PingActor import se.scalablesolutions.akka.dispatch.ExecutorBasedEventDrivenWorkStealingDispatcher -import se.scalablesolutions.akka.remote.RemoteNode -import se.scalablesolutions.akka.actor.ActorRef -import org.scalatest.FeatureSpec import org.scalatest.matchers.ShouldMatchers import org.scalatest.junit.JUnitRunner import org.junit.runner.RunWith -import org.springframework.context.ApplicationContext import org.springframework.context.support.ClassPathXmlApplicationContext +import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer} +import org.scalatest.{BeforeAndAfterAll, FeatureSpec} +import java.util.concurrent.CountDownLatch +import se.scalablesolutions.akka.actor.{RemoteActorRef, ActorRegistry, Actor, ActorRef} /** * Tests for spring configuration of typed actors. * @author michaelkober */ @RunWith(classOf[JUnitRunner]) -class UntypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers { +class UntypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers with BeforeAndAfterAll { + + var server1: RemoteServer = null + var server2: RemoteServer = null + + + override def beforeAll = { + val actor = Actor.actorOf[PingActor] // FIXME: remove this line when ticket 425 is fixed + server1 = new RemoteServer() + server1.start("localhost", 9990) + server2 = new RemoteServer() + server2.start("localhost", 9992) + } + + // make sure the servers shutdown cleanly after the test has finished + override def afterAll = { + try { + server1.shutdown + server2.shutdown + RemoteClient.shutdownAll + Thread.sleep(1000) + } catch { + case e => () + } + } + + + def getPingActorFromContext(config: String, id: String) : ActorRef = { + PingActor.latch = new CountDownLatch(1) + val context = new ClassPathXmlApplicationContext(config) + val pingActor = context.getBean(id).asInstanceOf[ActorRef] + assert(pingActor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor") + pingActor.start() + } + + feature("parse Spring application context") { scenario("get a untyped actor") { - val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml") - val myactor = context.getBean("simple-untyped-actor").asInstanceOf[ActorRef] - assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor") - myactor.start() + val myactor = getPingActorFromContext("/untyped-actor-config.xml", "simple-untyped-actor") myactor.sendOneWay("Hello") + PingActor.latch.await + assert(PingActor.lastMessage === "Hello") assert(myactor.isDefinedAt("some string message")) } scenario("untyped-actor with timeout") { - val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml") - val myactor = context.getBean("simple-untyped-actor-long-timeout").asInstanceOf[ActorRef] - assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor") - myactor.start() - myactor.sendOneWay("Hello") + val myactor = getPingActorFromContext("/untyped-actor-config.xml", "simple-untyped-actor-long-timeout") assert(myactor.getTimeout() === 10000) + myactor.sendOneWay("Hello 2") + PingActor.latch.await + assert(PingActor.lastMessage === "Hello 2") } scenario("transactional untyped-actor") { - val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml") - val myactor = context.getBean("transactional-untyped-actor").asInstanceOf[ActorRef] - assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor") - myactor.start() - myactor.sendOneWay("Hello") - assert(myactor.isDefinedAt("some string message")) + val myactor = getPingActorFromContext("/untyped-actor-config.xml", "transactional-untyped-actor") + myactor.sendOneWay("Hello 3") + PingActor.latch.await + assert(PingActor.lastMessage === "Hello 3") } scenario("get a remote typed-actor") { - RemoteNode.start - Thread.sleep(1000) - val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml") - val myactor = context.getBean("remote-untyped-actor").asInstanceOf[ActorRef] - assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor") - myactor.start() - myactor.sendOneWay("Hello") - assert(myactor.isDefinedAt("some string message")) + val myactor = getPingActorFromContext("/untyped-actor-config.xml", "remote-untyped-actor") + myactor.sendOneWay("Hello 4") assert(myactor.getRemoteAddress().isDefined) assert(myactor.getRemoteAddress().get.getHostName() === "localhost") - assert(myactor.getRemoteAddress().get.getPort() === 9999) + assert(myactor.getRemoteAddress().get.getPort() === 9992) + PingActor.latch.await + assert(PingActor.lastMessage === "Hello 4") } scenario("untyped-actor with custom dispatcher") { - val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml") - val myactor = context.getBean("untyped-actor-with-dispatcher").asInstanceOf[ActorRef] - assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor") - myactor.start() - myactor.sendOneWay("Hello") + val myactor = getPingActorFromContext("/untyped-actor-config.xml", "untyped-actor-with-dispatcher") assert(myactor.getTimeout() === 1000) assert(myactor.getDispatcher.isInstanceOf[ExecutorBasedEventDrivenWorkStealingDispatcher]) + myactor.sendOneWay("Hello 5") + PingActor.latch.await + assert(PingActor.lastMessage === "Hello 5") } + + scenario("create client managed remote untyped-actor") { + val myactor = getPingActorFromContext("/server-managed-config.xml", "client-managed-remote-untyped-actor") + myactor.sendOneWay("Hello client managed remote untyped-actor") + PingActor.latch.await + assert(PingActor.lastMessage === "Hello client managed remote untyped-actor") + assert(myactor.getRemoteAddress().isDefined) + assert(myactor.getRemoteAddress().get.getHostName() === "localhost") + assert(myactor.getRemoteAddress().get.getPort() === 9990) + } + + scenario("create server managed remote untyped-actor") { + val myactor = getPingActorFromContext("/server-managed-config.xml", "server-managed-remote-untyped-actor") + val nrOfActors = ActorRegistry.actors.length + val actorRef = RemoteClient.actorFor("se.scalablesolutions.akka.spring.foo.PingActor", "localhost", 9990) + actorRef.sendOneWay("Hello server managed remote untyped-actor") + PingActor.latch.await + assert(PingActor.lastMessage === "Hello server managed remote untyped-actor") + assert(ActorRegistry.actors.length === nrOfActors) + } + + scenario("create server managed remote untyped-actor with custom service id") { + val myactor = getPingActorFromContext("/server-managed-config.xml", "server-managed-remote-untyped-actor-custom-id") + val nrOfActors = ActorRegistry.actors.length + val actorRef = RemoteClient.actorFor("ping-service", "localhost", 9990) + actorRef.sendOneWay("Hello server managed remote untyped-actor") + PingActor.latch.await + assert(PingActor.lastMessage === "Hello server managed remote untyped-actor") + assert(ActorRegistry.actors.length === nrOfActors) + } + + scenario("get client actor for server managed remote untyped-actor") { + PingActor.latch = new CountDownLatch(1) + val context = new ClassPathXmlApplicationContext("/server-managed-config.xml") + val pingActor = context.getBean("server-managed-remote-untyped-actor-custom-id").asInstanceOf[ActorRef] + assert(pingActor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor") + pingActor.start() + val nrOfActors = ActorRegistry.actors.length + // get client actor ref from spring context + val actorRef = context.getBean("client-1").asInstanceOf[ActorRef] + assert(actorRef.isInstanceOf[RemoteActorRef]) + actorRef.sendOneWay("Hello") + PingActor.latch.await + assert(ActorRegistry.actors.length === nrOfActors) + } + } } diff --git a/akka-typed-actor/src/main/scala/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/actor/TypedActor.scala index 385c1831a4..7d393070ec 100644 --- a/akka-typed-actor/src/main/scala/actor/TypedActor.scala +++ b/akka-typed-actor/src/main/scala/actor/TypedActor.scala @@ -389,7 +389,8 @@ object TypedActor extends Logging { typedActor.initialize(proxy) if (config._messageDispatcher.isDefined) actorRef.dispatcher = config._messageDispatcher.get if (config._threadBasedDispatcher.isDefined) actorRef.dispatcher = Dispatchers.newThreadBasedDispatcher(actorRef) - AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, None, config.timeout)) + if (config._host.isDefined) actorRef.makeRemote(config._host.get) + AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, config._host, config.timeout)) actorRef.start proxy.asInstanceOf[T] } diff --git a/akka-typed-actor/src/test/resources/META-INF/aop.xml b/akka-typed-actor/src/test/resources/META-INF/aop.xml index bdc167ca54..be133a51b8 100644 --- a/akka-typed-actor/src/test/resources/META-INF/aop.xml +++ b/akka-typed-actor/src/test/resources/META-INF/aop.xml @@ -2,6 +2,7 @@ +