diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala
index bf9b38ca1b..fa57bda71b 100644
--- a/akka-remote/src/main/scala/remote/RemoteServer.scala
+++ b/akka-remote/src/main/scala/remote/RemoteServer.scala
@@ -271,6 +271,11 @@ class RemoteServer extends Logging with ListenerManagement {
}
}
+ /**
+ * Register typed actor by interface name.
+ */
+ def registerTypedActor(intfClass: Class[_], typedActor: AnyRef) : Unit = registerTypedActor(intfClass.getName, typedActor)
+
/**
* Register remote typed actor by a specific id.
* @param id custom actor id
diff --git a/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-1.0-SNAPSHOT.xsd b/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-1.0-SNAPSHOT.xsd
index c3d7608bee..80b37c41f5 100644
--- a/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-1.0-SNAPSHOT.xsd
+++ b/akka-spring/src/main/resources/se/scalablesolutions/akka/spring/akka-1.0-SNAPSHOT.xsd
@@ -66,6 +66,14 @@
+
+
+
+
+
+
+
+
@@ -107,6 +115,20 @@
+
+
+
+ Management type for remote actors: client managed or server managed.
+
+
+
+
+
+
+ Custom service name for server managed actor.
+
+
+
@@ -135,7 +157,7 @@
- Theh default timeout for '!!' invocations.
+ The default timeout for '!!' invocations.
@@ -229,6 +251,41 @@
+
+
+
+
+
+
+
+ Name of the remote host.
+
+
+
+
+
+
+ Port of the remote host.
+
+
+
+
+
+
+ Custom service name or class name for the server managed actor.
+
+
+
+
+
+
+ Name of the interface the typed actor implements.
+
+
+
+
+
+
@@ -294,4 +351,7 @@
+
+
+
diff --git a/akka-spring/src/main/scala/ActorBeanDefinitionParser.scala b/akka-spring/src/main/scala/ActorBeanDefinitionParser.scala
new file mode 100644
index 0000000000..55aa82b8e4
--- /dev/null
+++ b/akka-spring/src/main/scala/ActorBeanDefinitionParser.scala
@@ -0,0 +1,72 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB
+ */
+package se.scalablesolutions.akka.spring
+
+import org.springframework.beans.factory.support.BeanDefinitionBuilder
+import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser
+import org.springframework.beans.factory.xml.ParserContext
+import AkkaSpringConfigurationTags._
+import org.w3c.dom.Element
+
+
+/**
+ * Parser for custom namespace configuration.
+ * @author michaelkober
+ */
+class TypedActorBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorParser {
+ /*
+ * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder)
+ */
+ override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) {
+ val typedActorConf = parseActor(element)
+ typedActorConf.typed = TYPED_ACTOR_TAG
+ typedActorConf.setAsProperties(builder)
+ }
+
+ /*
+ * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element)
+ */
+ override def getBeanClass(element: Element): Class[_] = classOf[ActorFactoryBean]
+}
+
+
+/**
+ * Parser for custom namespace configuration.
+ * @author michaelkober
+ */
+class UntypedActorBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorParser {
+ /*
+ * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder)
+ */
+ override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) {
+ val untypedActorConf = parseActor(element)
+ untypedActorConf.typed = UNTYPED_ACTOR_TAG
+ untypedActorConf.setAsProperties(builder)
+ }
+
+ /*
+ * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element)
+ */
+ override def getBeanClass(element: Element): Class[_] = classOf[ActorFactoryBean]
+}
+
+
+/**
+ * Parser for custom namespace configuration.
+ * @author michaelkober
+ */
+class ActorForBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorForParser {
+ /*
+ * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder)
+ */
+ override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) {
+ val actorForConf = parseActorFor(element)
+ actorForConf.setAsProperties(builder)
+ }
+
+ /*
+ * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element)
+ */
+ override def getBeanClass(element: Element): Class[_] = classOf[ActorForFactoryBean]
+}
diff --git a/akka-spring/src/main/scala/ActorFactoryBean.scala b/akka-spring/src/main/scala/ActorFactoryBean.scala
index 11d5274a70..c47efcdb78 100644
--- a/akka-spring/src/main/scala/ActorFactoryBean.scala
+++ b/akka-spring/src/main/scala/ActorFactoryBean.scala
@@ -4,22 +4,19 @@
package se.scalablesolutions.akka.spring
-import java.beans.PropertyDescriptor
-import java.lang.reflect.Method
-import javax.annotation.PreDestroy
-import javax.annotation.PostConstruct
-
import org.springframework.beans.{BeanUtils,BeansException,BeanWrapper,BeanWrapperImpl}
-import org.springframework.beans.factory.BeanFactory
+import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer}
+//import org.springframework.beans.factory.BeanFactory
import org.springframework.beans.factory.config.AbstractFactoryBean
import org.springframework.context.{ApplicationContext,ApplicationContextAware}
-import org.springframework.util.ReflectionUtils
+//import org.springframework.util.ReflectionUtils
import org.springframework.util.StringUtils
import se.scalablesolutions.akka.actor.{ActorRef, AspectInitRegistry, TypedActorConfiguration, TypedActor,Actor}
import se.scalablesolutions.akka.dispatch.MessageDispatcher
import se.scalablesolutions.akka.util.{Logging, Duration}
import scala.reflect.BeanProperty
+import java.net.InetSocketAddress
/**
* Exception to use when something goes wrong during bean creation.
@@ -49,6 +46,8 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App
@BeanProperty var transactional: Boolean = false
@BeanProperty var host: String = ""
@BeanProperty var port: Int = _
+ @BeanProperty var serverManaged: Boolean = false
+ @BeanProperty var serviceName: String = ""
@BeanProperty var lifecycle: String = ""
@BeanProperty var dispatcher: DispatcherProperties = _
@BeanProperty var scope: String = VAL_SCOPE_SINGLETON
@@ -94,7 +93,16 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App
if (implementation == null || implementation == "") throw new AkkaBeansException(
"The 'implementation' part of the 'akka:typed-actor' element in the Spring config file can't be null or empty string")
- TypedActor.newInstance(interface.toClass, implementation.toClass, createConfig)
+ val typedActor: AnyRef = TypedActor.newInstance(interface.toClass, implementation.toClass, createConfig)
+ if (isRemote && serverManaged) {
+ val server = RemoteServer.getOrCreateServer(new InetSocketAddress(host, port))
+ if (serviceName.isEmpty) {
+ server.registerTypedActor(interface, typedActor)
+ } else {
+ server.registerTypedActor(serviceName, typedActor)
+ }
+ }
+ typedActor
}
/**
@@ -111,7 +119,16 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App
actorRef.makeTransactionRequired
}
if (isRemote) {
- actorRef.makeRemote(host, port)
+ if (serverManaged) {
+ val server = RemoteServer.getOrCreateServer(new InetSocketAddress(host, port))
+ if (serviceName.isEmpty) {
+ server.register(actorRef)
+ } else {
+ server.register(serviceName, actorRef)
+ }
+ } else {
+ actorRef.makeRemote(host, port)
+ }
}
if (hasDispatcher) {
if (dispatcher.dispatcherType != THREAD_BASED){
@@ -159,7 +176,7 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App
private[akka] def createConfig: TypedActorConfiguration = {
val config = new TypedActorConfiguration().timeout(Duration(timeout, "millis"))
if (transactional) config.makeTransactionRequired
- if (isRemote) config.makeRemote(host, port)
+ if (isRemote && !serverManaged) config.makeRemote(host, port)
if (hasDispatcher) {
if (dispatcher.dispatcherType != THREAD_BASED) {
config.dispatcher(dispatcherInstance())
@@ -191,3 +208,39 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App
}
}
}
+
+/**
+ * Factory bean for remote client actor-for.
+ *
+ * @author michaelkober
+ */
+class ActorForFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with ApplicationContextAware {
+ import StringReflect._
+ import AkkaSpringConfigurationTags._
+
+ @BeanProperty var interface: String = ""
+ @BeanProperty var host: String = ""
+ @BeanProperty var port: Int = _
+ @BeanProperty var serviceName: String = ""
+ //@BeanProperty var scope: String = VAL_SCOPE_SINGLETON
+ @BeanProperty var applicationContext: ApplicationContext = _
+
+ override def isSingleton = false
+
+ /*
+ * @see org.springframework.beans.factory.FactoryBean#getObjectType()
+ */
+ def getObjectType: Class[AnyRef] = classOf[AnyRef]
+
+ /*
+ * @see org.springframework.beans.factory.config.AbstractFactoryBean#createInstance()
+ */
+ def createInstance: AnyRef = {
+ if (interface.isEmpty) {
+ RemoteClient.actorFor(serviceName, host, port)
+ } else {
+ RemoteClient.typedActorFor(interface.toClass, serviceName, host, port)
+ }
+ }
+}
+
diff --git a/akka-spring/src/main/scala/ActorParser.scala b/akka-spring/src/main/scala/ActorParser.scala
index 69073bd52f..8736b807d1 100644
--- a/akka-spring/src/main/scala/ActorParser.scala
+++ b/akka-spring/src/main/scala/ActorParser.scala
@@ -6,6 +6,7 @@ package se.scalablesolutions.akka.spring
import org.springframework.util.xml.DomUtils
import org.w3c.dom.Element
import scala.collection.JavaConversions._
+import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.actor.IllegalActorStateException
@@ -27,11 +28,17 @@ trait ActorParser extends BeanParser with DispatcherParser {
val objectProperties = new ActorProperties()
val remoteElement = DomUtils.getChildElementByTagName(element, REMOTE_TAG);
val dispatcherElement = DomUtils.getChildElementByTagName(element, DISPATCHER_TAG)
- val propertyEntries = DomUtils.getChildElementsByTagName(element,PROPERTYENTRY_TAG)
+ val propertyEntries = DomUtils.getChildElementsByTagName(element, PROPERTYENTRY_TAG)
if (remoteElement != null) {
objectProperties.host = mandatory(remoteElement, HOST)
objectProperties.port = mandatory(remoteElement, PORT).toInt
+ objectProperties.serverManaged = (remoteElement.getAttribute(MANAGED_BY) != null) && (remoteElement.getAttribute(MANAGED_BY).equals(SERVER_MANAGED))
+ val serviceName = remoteElement.getAttribute(SERVICE_NAME)
+ if ((serviceName != null) && (!serviceName.isEmpty)) {
+ objectProperties.serviceName = serviceName
+ objectProperties.serverManaged = true
+ }
}
if (dispatcherElement != null) {
@@ -43,7 +50,7 @@ trait ActorParser extends BeanParser with DispatcherParser {
val entry = new PropertyEntry
entry.name = element.getAttribute("name");
entry.value = element.getAttribute("value")
- entry.ref = element.getAttribute("ref")
+ entry.ref = element.getAttribute("ref")
objectProperties.propertyEntries.add(entry)
}
@@ -59,15 +66,13 @@ trait ActorParser extends BeanParser with DispatcherParser {
objectProperties.target = mandatory(element, IMPLEMENTATION)
objectProperties.transactional = if (element.getAttribute(TRANSACTIONAL).isEmpty) false else element.getAttribute(TRANSACTIONAL).toBoolean
- if (!element.getAttribute(INTERFACE).isEmpty) {
+ if (element.hasAttribute(INTERFACE)) {
objectProperties.interface = element.getAttribute(INTERFACE)
}
-
- if (!element.getAttribute(LIFECYCLE).isEmpty) {
+ if (element.hasAttribute(LIFECYCLE)) {
objectProperties.lifecycle = element.getAttribute(LIFECYCLE)
}
-
- if (!element.getAttribute(SCOPE).isEmpty) {
+ if (element.hasAttribute(SCOPE)) {
objectProperties.scope = element.getAttribute(SCOPE)
}
@@ -75,3 +80,159 @@ trait ActorParser extends BeanParser with DispatcherParser {
}
}
+
+/**
+ * Parser trait for custom namespace configuration for RemoteClient actor-for.
+ * @author michaelkober
+ */
+trait ActorForParser extends BeanParser {
+ import AkkaSpringConfigurationTags._
+
+ /**
+ * Parses the given element and returns a ActorForProperties.
+ * @param element dom element to parse
+ * @return configuration for the typed actor
+ */
+ def parseActorFor(element: Element): ActorForProperties = {
+ val objectProperties = new ActorForProperties()
+
+ objectProperties.host = mandatory(element, HOST)
+ objectProperties.port = mandatory(element, PORT).toInt
+ objectProperties.serviceName = mandatory(element, SERVICE_NAME)
+ if (element.hasAttribute(INTERFACE)) {
+ objectProperties.interface = element.getAttribute(INTERFACE)
+ }
+ objectProperties
+ }
+
+}
+
+/**
+ * Base trait with utility methods for bean parsing.
+ */
+trait BeanParser extends Logging {
+
+ /**
+ * Get a mandatory element attribute.
+ * @param element the element with the mandatory attribute
+ * @param attribute name of the mandatory attribute
+ */
+ def mandatory(element: Element, attribute: String): String = {
+ if ((element.getAttribute(attribute) == null) || (element.getAttribute(attribute).isEmpty)) {
+ throw new IllegalArgumentException("Mandatory attribute missing: " + attribute)
+ } else {
+ element.getAttribute(attribute)
+ }
+ }
+
+ /**
+ * Get a mandatory child element.
+ * @param element the parent element
+ * @param childName name of the mandatory child element
+ */
+ def mandatoryElement(element: Element, childName: String): Element = {
+ val childElement = DomUtils.getChildElementByTagName(element, childName);
+ if (childElement == null) {
+ throw new IllegalArgumentException("Mandatory element missing: ''")
+ } else {
+ childElement
+ }
+ }
+
+}
+
+
+/**
+ * Parser trait for custom namespace for Akka dispatcher configuration.
+ * @author michaelkober
+ */
+trait DispatcherParser extends BeanParser {
+ import AkkaSpringConfigurationTags._
+
+ /**
+ * Parses the given element and returns a DispatcherProperties.
+ * @param element dom element to parse
+ * @return configuration for the dispatcher
+ */
+ def parseDispatcher(element: Element): DispatcherProperties = {
+ val properties = new DispatcherProperties()
+ var dispatcherElement = element
+ if (hasRef(element)) {
+ val ref = element.getAttribute(REF)
+ dispatcherElement = element.getOwnerDocument.getElementById(ref)
+ if (dispatcherElement == null) {
+ throw new IllegalArgumentException("Referenced dispatcher not found: '" + ref + "'")
+ }
+ }
+
+ properties.dispatcherType = mandatory(dispatcherElement, TYPE)
+ if (properties.dispatcherType == THREAD_BASED) {
+ val allowedParentNodes = "akka:typed-actor" :: "akka:untyped-actor" :: "typed-actor" :: "untyped-actor" :: Nil
+ if (!allowedParentNodes.contains(dispatcherElement.getParentNode.getNodeName)) {
+ throw new IllegalArgumentException("Thread based dispatcher must be nested in 'typed-actor' or 'untyped-actor' element!")
+ }
+ }
+
+ if (properties.dispatcherType == HAWT) { // no name for HawtDispatcher
+ properties.name = dispatcherElement.getAttribute(NAME)
+ if (dispatcherElement.hasAttribute(AGGREGATE)) {
+ properties.aggregate = dispatcherElement.getAttribute(AGGREGATE).toBoolean
+ }
+ } else {
+ properties.name = mandatory(dispatcherElement, NAME)
+ }
+
+ val threadPoolElement = DomUtils.getChildElementByTagName(dispatcherElement, THREAD_POOL_TAG);
+ if (threadPoolElement != null) {
+ if (properties.dispatcherType == REACTOR_BASED_SINGLE_THREAD_EVENT_DRIVEN ||
+ properties.dispatcherType == THREAD_BASED) {
+ throw new IllegalArgumentException("Element 'thread-pool' not allowed for this dispatcher type.")
+ }
+ val threadPoolProperties = parseThreadPool(threadPoolElement)
+ properties.threadPool = threadPoolProperties
+ }
+ properties
+ }
+
+ /**
+ * Parses the given element and returns a ThreadPoolProperties.
+ * @param element dom element to parse
+ * @return configuration for the thread pool
+ */
+ def parseThreadPool(element: Element): ThreadPoolProperties = {
+ val properties = new ThreadPoolProperties()
+ properties.queue = element.getAttribute(QUEUE)
+ if (element.hasAttribute(CAPACITY)) {
+ properties.capacity = element.getAttribute(CAPACITY).toInt
+ }
+ if (element.hasAttribute(BOUND)) {
+ properties.bound = element.getAttribute(BOUND).toInt
+ }
+ if (element.hasAttribute(FAIRNESS)) {
+ properties.fairness = element.getAttribute(FAIRNESS).toBoolean
+ }
+ if (element.hasAttribute(CORE_POOL_SIZE)) {
+ properties.corePoolSize = element.getAttribute(CORE_POOL_SIZE).toInt
+ }
+ if (element.hasAttribute(MAX_POOL_SIZE)) {
+ properties.maxPoolSize = element.getAttribute(MAX_POOL_SIZE).toInt
+ }
+ if (element.hasAttribute(KEEP_ALIVE)) {
+ properties.keepAlive = element.getAttribute(KEEP_ALIVE).toLong
+ }
+ if (element.hasAttribute(REJECTION_POLICY)) {
+ properties.rejectionPolicy = element.getAttribute(REJECTION_POLICY)
+ }
+ if (element.hasAttribute(MAILBOX_CAPACITY)) {
+ properties.mailboxCapacity = element.getAttribute(MAILBOX_CAPACITY).toInt
+ }
+ properties
+ }
+
+ def hasRef(element: Element): Boolean = {
+ val ref = element.getAttribute(REF)
+ (ref != null) && !ref.isEmpty
+ }
+
+}
+
diff --git a/akka-spring/src/main/scala/ActorProperties.scala b/akka-spring/src/main/scala/ActorProperties.scala
index 15c7e61fe0..0f86942935 100644
--- a/akka-spring/src/main/scala/ActorProperties.scala
+++ b/akka-spring/src/main/scala/ActorProperties.scala
@@ -8,7 +8,7 @@ import org.springframework.beans.factory.support.BeanDefinitionBuilder
import AkkaSpringConfigurationTags._
/**
- * Data container for typed actor configuration data.
+ * Data container for actor configuration data.
* @author michaelkober
* @author Martin Krasser
*/
@@ -20,6 +20,8 @@ class ActorProperties {
var transactional: Boolean = false
var host: String = ""
var port: Int = _
+ var serverManaged: Boolean = false
+ var serviceName: String = ""
var lifecycle: String = ""
var scope:String = VAL_SCOPE_SINGLETON
var dispatcher: DispatcherProperties = _
@@ -34,6 +36,8 @@ class ActorProperties {
builder.addPropertyValue("typed", typed)
builder.addPropertyValue(HOST, host)
builder.addPropertyValue(PORT, port)
+ builder.addPropertyValue("serverManaged", serverManaged)
+ builder.addPropertyValue("serviceName", serviceName)
builder.addPropertyValue(TIMEOUT, timeout)
builder.addPropertyValue(IMPLEMENTATION, target)
builder.addPropertyValue(INTERFACE, interface)
@@ -45,3 +49,26 @@ class ActorProperties {
}
}
+
+/**
+ * Data container for actor configuration data.
+ * @author michaelkober
+ */
+class ActorForProperties {
+ var interface: String = ""
+ var host: String = ""
+ var port: Int = _
+ var serviceName: String = ""
+
+ /**
+ * Sets the properties to the given builder.
+ * @param builder bean definition builder
+ */
+ def setAsProperties(builder: BeanDefinitionBuilder) {
+ builder.addPropertyValue(HOST, host)
+ builder.addPropertyValue(PORT, port)
+ builder.addPropertyValue("serviceName", serviceName)
+ builder.addPropertyValue(INTERFACE, interface)
+ }
+
+}
diff --git a/akka-spring/src/main/scala/AkkaNamespaceHandler.scala b/akka-spring/src/main/scala/AkkaNamespaceHandler.scala
index a478b7b262..b1c58baa20 100644
--- a/akka-spring/src/main/scala/AkkaNamespaceHandler.scala
+++ b/akka-spring/src/main/scala/AkkaNamespaceHandler.scala
@@ -12,10 +12,11 @@ import AkkaSpringConfigurationTags._
*/
class AkkaNamespaceHandler extends NamespaceHandlerSupport {
def init = {
- registerBeanDefinitionParser(TYPED_ACTOR_TAG, new TypedActorBeanDefinitionParser());
- registerBeanDefinitionParser(UNTYPED_ACTOR_TAG, new UntypedActorBeanDefinitionParser());
- registerBeanDefinitionParser(SUPERVISION_TAG, new SupervisionBeanDefinitionParser());
- registerBeanDefinitionParser(DISPATCHER_TAG, new DispatcherBeanDefinitionParser());
- registerBeanDefinitionParser(CAMEL_SERVICE_TAG, new CamelServiceBeanDefinitionParser);
+ registerBeanDefinitionParser(TYPED_ACTOR_TAG, new TypedActorBeanDefinitionParser())
+ registerBeanDefinitionParser(UNTYPED_ACTOR_TAG, new UntypedActorBeanDefinitionParser())
+ registerBeanDefinitionParser(SUPERVISION_TAG, new SupervisionBeanDefinitionParser())
+ registerBeanDefinitionParser(DISPATCHER_TAG, new DispatcherBeanDefinitionParser())
+ registerBeanDefinitionParser(CAMEL_SERVICE_TAG, new CamelServiceBeanDefinitionParser)
+ registerBeanDefinitionParser(ACTOR_FOR_TAG, new ActorForBeanDefinitionParser());
}
}
diff --git a/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala b/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala
index 2743d772da..2d9807a806 100644
--- a/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala
+++ b/akka-spring/src/main/scala/AkkaSpringConfigurationTags.scala
@@ -19,6 +19,7 @@ object AkkaSpringConfigurationTags {
val DISPATCHER_TAG = "dispatcher"
val PROPERTYENTRY_TAG = "property"
val CAMEL_SERVICE_TAG = "camel-service"
+ val ACTOR_FOR_TAG = "actor-for"
// actor sub tags
val REMOTE_TAG = "remote"
@@ -45,6 +46,8 @@ object AkkaSpringConfigurationTags {
val TRANSACTIONAL = "transactional"
val HOST = "host"
val PORT = "port"
+ val MANAGED_BY = "managed-by"
+ val SERVICE_NAME = "service-name"
val LIFECYCLE = "lifecycle"
val SCOPE = "scope"
@@ -103,4 +106,8 @@ object AkkaSpringConfigurationTags {
val THREAD_BASED = "thread-based"
val HAWT = "hawt"
+ // managed by types
+ val SERVER_MANAGED = "server"
+ val CLIENT_MANAGED = "client"
+
}
diff --git a/akka-spring/src/main/scala/BeanParser.scala b/akka-spring/src/main/scala/BeanParser.scala
deleted file mode 100644
index 1bbba9f09f..0000000000
--- a/akka-spring/src/main/scala/BeanParser.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Copyright (C) 2009-2010 Scalable Solutions AB
- */
-package se.scalablesolutions.akka.spring
-
-import se.scalablesolutions.akka.util.Logging
-import org.w3c.dom.Element
-import org.springframework.util.xml.DomUtils
-
-/**
- * Base trait with utility methods for bean parsing.
- */
-trait BeanParser extends Logging {
-
- /**
- * Get a mandatory element attribute.
- * @param element the element with the mandatory attribute
- * @param attribute name of the mandatory attribute
- */
- def mandatory(element: Element, attribute: String): String = {
- if ((element.getAttribute(attribute) == null) || (element.getAttribute(attribute).isEmpty)) {
- throw new IllegalArgumentException("Mandatory attribute missing: " + attribute)
- } else {
- element.getAttribute(attribute)
- }
- }
-
- /**
- * Get a mandatory child element.
- * @param element the parent element
- * @param childName name of the mandatory child element
- */
- def mandatoryElement(element: Element, childName: String): Element = {
- val childElement = DomUtils.getChildElementByTagName(element, childName);
- if (childElement == null) {
- throw new IllegalArgumentException("Mandatory element missing: ''")
- } else {
- childElement
- }
- }
-
-}
diff --git a/akka-spring/src/main/scala/DispatcherParser.scala b/akka-spring/src/main/scala/DispatcherParser.scala
deleted file mode 100644
index e9f10e1328..0000000000
--- a/akka-spring/src/main/scala/DispatcherParser.scala
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Copyright (C) 2009-2010 Scalable Solutions AB
- */
-package se.scalablesolutions.akka.spring
-
-import org.w3c.dom.Element
-import org.springframework.util.xml.DomUtils
-
-/**
- * Parser trait for custom namespace for Akka dispatcher configuration.
- * @author michaelkober
- */
-trait DispatcherParser extends BeanParser {
- import AkkaSpringConfigurationTags._
-
- /**
- * Parses the given element and returns a DispatcherProperties.
- * @param element dom element to parse
- * @return configuration for the dispatcher
- */
- def parseDispatcher(element: Element): DispatcherProperties = {
- val properties = new DispatcherProperties()
- var dispatcherElement = element
- if (hasRef(element)) {
- val ref = element.getAttribute(REF)
- dispatcherElement = element.getOwnerDocument.getElementById(ref)
- if (dispatcherElement == null) {
- throw new IllegalArgumentException("Referenced dispatcher not found: '" + ref + "'")
- }
- }
-
- properties.dispatcherType = mandatory(dispatcherElement, TYPE)
- if (properties.dispatcherType == THREAD_BASED) {
- val allowedParentNodes = "akka:typed-actor" :: "akka:untyped-actor" :: "typed-actor" :: "untyped-actor" :: Nil
- if (!allowedParentNodes.contains(dispatcherElement.getParentNode.getNodeName)) {
- throw new IllegalArgumentException("Thread based dispatcher must be nested in 'typed-actor' or 'untyped-actor' element!")
- }
- }
-
- if (properties.dispatcherType == HAWT) { // no name for HawtDispatcher
- properties.name = dispatcherElement.getAttribute(NAME)
- if (dispatcherElement.hasAttribute(AGGREGATE)) {
- properties.aggregate = dispatcherElement.getAttribute(AGGREGATE).toBoolean
- }
- } else {
- properties.name = mandatory(dispatcherElement, NAME)
- }
-
- val threadPoolElement = DomUtils.getChildElementByTagName(dispatcherElement, THREAD_POOL_TAG);
- if (threadPoolElement != null) {
- if (properties.dispatcherType == REACTOR_BASED_SINGLE_THREAD_EVENT_DRIVEN ||
- properties.dispatcherType == THREAD_BASED) {
- throw new IllegalArgumentException("Element 'thread-pool' not allowed for this dispatcher type.")
- }
- val threadPoolProperties = parseThreadPool(threadPoolElement)
- properties.threadPool = threadPoolProperties
- }
- properties
- }
-
- /**
- * Parses the given element and returns a ThreadPoolProperties.
- * @param element dom element to parse
- * @return configuration for the thread pool
- */
- def parseThreadPool(element: Element): ThreadPoolProperties = {
- val properties = new ThreadPoolProperties()
- properties.queue = element.getAttribute(QUEUE)
- if (element.hasAttribute(CAPACITY)) {
- properties.capacity = element.getAttribute(CAPACITY).toInt
- }
- if (element.hasAttribute(BOUND)) {
- properties.bound = element.getAttribute(BOUND).toInt
- }
- if (element.hasAttribute(FAIRNESS)) {
- properties.fairness = element.getAttribute(FAIRNESS).toBoolean
- }
- if (element.hasAttribute(CORE_POOL_SIZE)) {
- properties.corePoolSize = element.getAttribute(CORE_POOL_SIZE).toInt
- }
- if (element.hasAttribute(MAX_POOL_SIZE)) {
- properties.maxPoolSize = element.getAttribute(MAX_POOL_SIZE).toInt
- }
- if (element.hasAttribute(KEEP_ALIVE)) {
- properties.keepAlive = element.getAttribute(KEEP_ALIVE).toLong
- }
- if (element.hasAttribute(REJECTION_POLICY)) {
- properties.rejectionPolicy = element.getAttribute(REJECTION_POLICY)
- }
- if (element.hasAttribute(MAILBOX_CAPACITY)) {
- properties.mailboxCapacity = element.getAttribute(MAILBOX_CAPACITY).toInt
- }
- properties
- }
-
- def hasRef(element: Element): Boolean = {
- val ref = element.getAttribute(REF)
- (ref != null) && !ref.isEmpty
- }
-
-}
diff --git a/akka-spring/src/main/scala/PropertyEntries.scala b/akka-spring/src/main/scala/PropertyEntries.scala
index bf1898a805..9a7dc098de 100644
--- a/akka-spring/src/main/scala/PropertyEntries.scala
+++ b/akka-spring/src/main/scala/PropertyEntries.scala
@@ -18,3 +18,19 @@ class PropertyEntries {
entryList.append(entry)
}
}
+
+/**
+ * Represents a property element
+ * @author Johan Rask
+ */
+class PropertyEntry {
+ var name: String = _
+ var value: String = null
+ var ref: String = null
+
+
+ override def toString(): String = {
+ format("name = %s,value = %s, ref = %s", name, value, ref)
+ }
+}
+
diff --git a/akka-spring/src/main/scala/PropertyEntry.scala b/akka-spring/src/main/scala/PropertyEntry.scala
deleted file mode 100644
index 9fe6357fc0..0000000000
--- a/akka-spring/src/main/scala/PropertyEntry.scala
+++ /dev/null
@@ -1,19 +0,0 @@
-/**
- * Copyright (C) 2009-2010 Scalable Solutions AB
- */
-package se.scalablesolutions.akka.spring
-
-/**
- * Represents a property element
- * @author Johan Rask
- */
-class PropertyEntry {
- var name: String = _
- var value: String = null
- var ref: String = null
-
-
- override def toString(): String = {
- format("name = %s,value = %s, ref = %s", name, value, ref)
- }
-}
diff --git a/akka-spring/src/main/scala/TypedActorBeanDefinitionParser.scala b/akka-spring/src/main/scala/TypedActorBeanDefinitionParser.scala
deleted file mode 100644
index e8e0cef7d4..0000000000
--- a/akka-spring/src/main/scala/TypedActorBeanDefinitionParser.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Copyright (C) 2009-2010 Scalable Solutions AB
- */
-package se.scalablesolutions.akka.spring
-
-import org.springframework.beans.factory.support.BeanDefinitionBuilder
-import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser
-import org.springframework.beans.factory.xml.ParserContext
-import AkkaSpringConfigurationTags._
-import org.w3c.dom.Element
-
-
-/**
- * Parser for custom namespace configuration.
- * @author michaelkober
- */
-class TypedActorBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorParser {
- /*
- * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder)
- */
- override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) {
- val typedActorConf = parseActor(element)
- typedActorConf.typed = TYPED_ACTOR_TAG
- typedActorConf.setAsProperties(builder)
- }
-
- /*
- * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element)
- */
- override def getBeanClass(element: Element): Class[_] = classOf[ActorFactoryBean]
-}
diff --git a/akka-spring/src/main/scala/UntypedActorBeanDefinitionParser.scala b/akka-spring/src/main/scala/UntypedActorBeanDefinitionParser.scala
deleted file mode 100644
index 752e18559f..0000000000
--- a/akka-spring/src/main/scala/UntypedActorBeanDefinitionParser.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Copyright (C) 2009-2010 Scalable Solutions AB
- */
-package se.scalablesolutions.akka.spring
-
-import org.springframework.beans.factory.support.BeanDefinitionBuilder
-import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser
-import org.springframework.beans.factory.xml.ParserContext
-import AkkaSpringConfigurationTags._
-import org.w3c.dom.Element
-
-
-/**
- * Parser for custom namespace configuration.
- * @author michaelkober
- */
-class UntypedActorBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorParser {
- /*
- * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder)
- */
- override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) {
- val untypedActorConf = parseActor(element)
- untypedActorConf.typed = UNTYPED_ACTOR_TAG
- untypedActorConf.setAsProperties(builder)
- }
-
- /*
- * @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element)
- */
- override def getBeanClass(element: Element): Class[_] = classOf[ActorFactoryBean]
-}
diff --git a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/IMyPojo.java b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/IMyPojo.java
index f2c5e24884..5a2a272e6c 100644
--- a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/IMyPojo.java
+++ b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/IMyPojo.java
@@ -8,14 +8,12 @@ package se.scalablesolutions.akka.spring.foo;
* To change this template use File | Settings | File Templates.
*/
public interface IMyPojo {
+ public void oneWay(String message);
+
public String getFoo();
- public String getBar();
-
- public void preRestart();
-
- public void postRestart();
-
public String longRunning();
+
+
}
diff --git a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/MyPojo.java b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/MyPojo.java
index fe3e9ba767..8f610eef63 100644
--- a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/MyPojo.java
+++ b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/MyPojo.java
@@ -1,42 +1,34 @@
package se.scalablesolutions.akka.spring.foo;
-import se.scalablesolutions.akka.actor.*;
+import se.scalablesolutions.akka.actor.TypedActor;
-public class MyPojo extends TypedActor implements IMyPojo{
+import java.util.concurrent.CountDownLatch;
- private String foo;
- private String bar;
+public class MyPojo extends TypedActor implements IMyPojo {
+
+ public static CountDownLatch latch = new CountDownLatch(1);
+ public static String lastOneWayMessage = null;
+ private String foo = "foo";
- public MyPojo() {
- this.foo = "foo";
- this.bar = "bar";
- }
+ public MyPojo() {
+ }
+ public String getFoo() {
+ return foo;
+ }
- public String getFoo() {
- return foo;
- }
+ public void oneWay(String message) {
+ lastOneWayMessage = message;
+ latch.countDown();
+ }
-
- public String getBar() {
- return bar;
- }
-
- public void preRestart() {
- System.out.println("pre restart");
- }
-
- public void postRestart() {
- System.out.println("post restart");
- }
-
- public String longRunning() {
- try {
- Thread.sleep(6000);
- } catch (InterruptedException e) {
- }
- return "this took long";
+ public String longRunning() {
+ try {
+ Thread.sleep(6000);
+ } catch (InterruptedException e) {
}
+ return "this took long";
+ }
}
diff --git a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/PingActor.java b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/PingActor.java
index e447b26a28..3063a1b529 100644
--- a/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/PingActor.java
+++ b/akka-spring/src/test/java/se/scalablesolutions/akka/spring/foo/PingActor.java
@@ -6,6 +6,8 @@ import se.scalablesolutions.akka.actor.ActorRef;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
+import java.util.concurrent.CountDownLatch;
+
/**
* test class
@@ -14,6 +16,9 @@ public class PingActor extends UntypedActor implements ApplicationContextAware {
private String stringFromVal;
private String stringFromRef;
+ public static String lastMessage = null;
+ public static CountDownLatch latch = new CountDownLatch(1);
+
private boolean gotApplicationContext = false;
@@ -42,7 +47,6 @@ public class PingActor extends UntypedActor implements ApplicationContextAware {
stringFromRef = s;
}
-
private String longRunning() {
try {
Thread.sleep(6000);
@@ -53,12 +57,12 @@ public class PingActor extends UntypedActor implements ApplicationContextAware {
public void onReceive(Object message) throws Exception {
if (message instanceof String) {
- System.out.println("Ping received String message: " + message);
+ lastMessage = (String) message;
if (message.equals("longRunning")) {
- System.out.println("### starting pong");
ActorRef pongActor = UntypedActor.actorOf(PongActor.class).start();
pongActor.sendRequestReply("longRunning", getContext());
}
+ latch.countDown();
} else {
throw new IllegalArgumentException("Unknown message: " + message);
}
diff --git a/akka-spring/src/test/resources/server-managed-config.xml b/akka-spring/src/test/resources/server-managed-config.xml
new file mode 100644
index 0000000000..128b16c8b6
--- /dev/null
+++ b/akka-spring/src/test/resources/server-managed-config.xml
@@ -0,0 +1,57 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/akka-spring/src/test/resources/typed-actor-config.xml b/akka-spring/src/test/resources/typed-actor-config.xml
index faca749469..989884e4fa 100644
--- a/akka-spring/src/test/resources/typed-actor-config.xml
+++ b/akka-spring/src/test/resources/typed-actor-config.xml
@@ -37,7 +37,7 @@ http://scalablesolutions.se/akka/akka-1.0-SNAPSHOT.xsd">
implementation="se.scalablesolutions.akka.spring.foo.MyPojo"
timeout="2000"
transactional="true">
-
+
-
+
+
+
+ val props = parser.parseActor(dom(xml).getDocumentElement);
+ assert(props != null)
+ assert(props.host === "com.some.host")
+ assert(props.port === 9999)
+ assert(props.serviceName === "my-service")
+ assert(props.serverManaged)
}
}
}
diff --git a/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala b/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala
index 8767b2e75a..3cdcd17cb0 100644
--- a/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala
+++ b/akka-spring/src/test/scala/TypedActorSpringFeatureTest.scala
@@ -4,10 +4,8 @@
package se.scalablesolutions.akka.spring
-import foo.{IMyPojo, MyPojo}
+import foo.{PingActor, IMyPojo, MyPojo}
import se.scalablesolutions.akka.dispatch.FutureTimeoutException
-import se.scalablesolutions.akka.remote.RemoteNode
-import org.scalatest.FeatureSpec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
@@ -16,13 +14,52 @@ import org.springframework.beans.factory.xml.XmlBeanDefinitionReader
import org.springframework.context.ApplicationContext
import org.springframework.context.support.ClassPathXmlApplicationContext
import org.springframework.core.io.{ClassPathResource, Resource}
+import org.scalatest.{BeforeAndAfterAll, FeatureSpec}
+import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer, RemoteNode}
+import java.util.concurrent.CountDownLatch
+import se.scalablesolutions.akka.actor.{TypedActor, RemoteTypedActorOne, Actor}
+import se.scalablesolutions.akka.actor.remote.RemoteTypedActorOneImpl
/**
* Tests for spring configuration of typed actors.
* @author michaelkober
*/
@RunWith(classOf[JUnitRunner])
-class TypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers {
+class TypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers with BeforeAndAfterAll {
+
+ var server1: RemoteServer = null
+ var server2: RemoteServer = null
+
+ override def beforeAll = {
+ val actor = Actor.actorOf[PingActor] // FIXME: remove this line when ticket 425 is fixed
+ server1 = new RemoteServer()
+ server1.start("localhost", 9990)
+ server2 = new RemoteServer()
+ server2.start("localhost", 9992)
+
+ val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
+ server1.registerTypedActor("typed-actor-service", typedActor)
+ }
+
+ // make sure the servers shutdown cleanly after the test has finished
+ override def afterAll = {
+ try {
+ server1.shutdown
+ server2.shutdown
+ RemoteClient.shutdownAll
+ Thread.sleep(1000)
+ } catch {
+ case e => ()
+ }
+ }
+
+ def getTypedActorFromContext(config: String, id: String) : IMyPojo = {
+ MyPojo.latch = new CountDownLatch(1)
+ val context = new ClassPathXmlApplicationContext(config)
+ val myPojo: IMyPojo = context.getBean(id).asInstanceOf[IMyPojo]
+ myPojo
+ }
+
feature("parse Spring application context") {
scenario("akka:typed-actor and akka:supervision and akka:dispatcher can be used as top level elements") {
@@ -37,41 +74,79 @@ class TypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers {
}
scenario("get a typed actor") {
- val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml")
- val myPojo = context.getBean("simple-typed-actor").asInstanceOf[IMyPojo]
- var msg = myPojo.getFoo()
- msg += myPojo.getBar()
- assert(msg === "foobar")
+ val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "simple-typed-actor")
+ assert(myPojo.getFoo() === "foo")
+ myPojo.oneWay("hello 1")
+ MyPojo.latch.await
+ assert(MyPojo.lastOneWayMessage === "hello 1")
}
scenario("FutureTimeoutException when timed out") {
- val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml")
- val myPojo = context.getBean("simple-typed-actor").asInstanceOf[IMyPojo]
+ val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "simple-typed-actor")
evaluating {myPojo.longRunning()} should produce[FutureTimeoutException]
-
}
scenario("typed-actor with timeout") {
- val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml")
- val myPojo = context.getBean("simple-typed-actor-long-timeout").asInstanceOf[IMyPojo]
+ val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "simple-typed-actor-long-timeout")
assert(myPojo.longRunning() === "this took long");
}
scenario("transactional typed-actor") {
- val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml")
- val myPojo = context.getBean("transactional-typed-actor").asInstanceOf[IMyPojo]
- var msg = myPojo.getFoo()
- msg += myPojo.getBar()
- assert(msg === "foobar")
+ val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "transactional-typed-actor")
+ assert(myPojo.getFoo() === "foo")
+ myPojo.oneWay("hello 2")
+ MyPojo.latch.await
+ assert(MyPojo.lastOneWayMessage === "hello 2")
}
scenario("get a remote typed-actor") {
- RemoteNode.start
- Thread.sleep(1000)
- val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml")
- val myPojo = context.getBean("remote-typed-actor").asInstanceOf[IMyPojo]
- assert(myPojo.getFoo === "foo")
+ val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "remote-typed-actor")
+ assert(myPojo.getFoo() === "foo")
+ myPojo.oneWay("hello 3")
+ MyPojo.latch.await
+ assert(MyPojo.lastOneWayMessage === "hello 3")
}
+
+ scenario("get a client-managed-remote-typed-actor") {
+ val myPojo = getTypedActorFromContext("/server-managed-config.xml", "client-managed-remote-typed-actor")
+ assert(myPojo.getFoo() === "foo")
+ myPojo.oneWay("hello client-managed-remote-typed-actor")
+ MyPojo.latch.await
+ assert(MyPojo.lastOneWayMessage === "hello client-managed-remote-typed-actor")
+ }
+
+ scenario("get a server-managed-remote-typed-actor") {
+ val serverPojo = getTypedActorFromContext("/server-managed-config.xml", "server-managed-remote-typed-actor")
+ //
+ val myPojoProxy = RemoteClient.typedActorFor(classOf[IMyPojo], classOf[IMyPojo].getName, 5000L, "localhost", 9990)
+ assert(myPojoProxy.getFoo() === "foo")
+ myPojoProxy.oneWay("hello server-managed-remote-typed-actor")
+ MyPojo.latch.await
+ assert(MyPojo.lastOneWayMessage === "hello server-managed-remote-typed-actor")
+ }
+
+ scenario("get a server-managed-remote-typed-actor-custom-id") {
+ val serverPojo = getTypedActorFromContext("/server-managed-config.xml", "server-managed-remote-typed-actor-custom-id")
+ //
+ val myPojoProxy = RemoteClient.typedActorFor(classOf[IMyPojo], "mypojo-service", 5000L, "localhost", 9990)
+ assert(myPojoProxy.getFoo() === "foo")
+ myPojoProxy.oneWay("hello server-managed-remote-typed-actor 2")
+ MyPojo.latch.await
+ assert(MyPojo.lastOneWayMessage === "hello server-managed-remote-typed-actor 2")
+ }
+
+ scenario("get a client proxy for server-managed-remote-typed-actor") {
+ MyPojo.latch = new CountDownLatch(1)
+ val context = new ClassPathXmlApplicationContext("/server-managed-config.xml")
+ val myPojo: IMyPojo = context.getBean("server-managed-remote-typed-actor-custom-id").asInstanceOf[IMyPojo]
+ // get client proxy from spring context
+ val myPojoProxy = context.getBean("typed-client-1").asInstanceOf[IMyPojo]
+ assert(myPojoProxy.getFoo() === "foo")
+ myPojoProxy.oneWay("hello")
+ MyPojo.latch.await
+ }
+
+
}
}
diff --git a/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala b/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala
index cf7d8d9805..0397d30bf0 100644
--- a/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala
+++ b/akka-spring/src/test/scala/UntypedActorSpringFeatureTest.scala
@@ -6,74 +6,146 @@ package se.scalablesolutions.akka.spring
import foo.PingActor
import se.scalablesolutions.akka.dispatch.ExecutorBasedEventDrivenWorkStealingDispatcher
-import se.scalablesolutions.akka.remote.RemoteNode
-import se.scalablesolutions.akka.actor.ActorRef
-import org.scalatest.FeatureSpec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
-import org.springframework.context.ApplicationContext
import org.springframework.context.support.ClassPathXmlApplicationContext
+import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer}
+import org.scalatest.{BeforeAndAfterAll, FeatureSpec}
+import java.util.concurrent.CountDownLatch
+import se.scalablesolutions.akka.actor.{RemoteActorRef, ActorRegistry, Actor, ActorRef}
/**
* Tests for spring configuration of typed actors.
* @author michaelkober
*/
@RunWith(classOf[JUnitRunner])
-class UntypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers {
+class UntypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers with BeforeAndAfterAll {
+
+ var server1: RemoteServer = null
+ var server2: RemoteServer = null
+
+
+ override def beforeAll = {
+ val actor = Actor.actorOf[PingActor] // FIXME: remove this line when ticket 425 is fixed
+ server1 = new RemoteServer()
+ server1.start("localhost", 9990)
+ server2 = new RemoteServer()
+ server2.start("localhost", 9992)
+ }
+
+ // make sure the servers shutdown cleanly after the test has finished
+ override def afterAll = {
+ try {
+ server1.shutdown
+ server2.shutdown
+ RemoteClient.shutdownAll
+ Thread.sleep(1000)
+ } catch {
+ case e => ()
+ }
+ }
+
+
+ def getPingActorFromContext(config: String, id: String) : ActorRef = {
+ PingActor.latch = new CountDownLatch(1)
+ val context = new ClassPathXmlApplicationContext(config)
+ val pingActor = context.getBean(id).asInstanceOf[ActorRef]
+ assert(pingActor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
+ pingActor.start()
+ }
+
+
feature("parse Spring application context") {
scenario("get a untyped actor") {
- val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml")
- val myactor = context.getBean("simple-untyped-actor").asInstanceOf[ActorRef]
- assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
- myactor.start()
+ val myactor = getPingActorFromContext("/untyped-actor-config.xml", "simple-untyped-actor")
myactor.sendOneWay("Hello")
+ PingActor.latch.await
+ assert(PingActor.lastMessage === "Hello")
assert(myactor.isDefinedAt("some string message"))
}
scenario("untyped-actor with timeout") {
- val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml")
- val myactor = context.getBean("simple-untyped-actor-long-timeout").asInstanceOf[ActorRef]
- assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
- myactor.start()
- myactor.sendOneWay("Hello")
+ val myactor = getPingActorFromContext("/untyped-actor-config.xml", "simple-untyped-actor-long-timeout")
assert(myactor.getTimeout() === 10000)
+ myactor.sendOneWay("Hello 2")
+ PingActor.latch.await
+ assert(PingActor.lastMessage === "Hello 2")
}
scenario("transactional untyped-actor") {
- val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml")
- val myactor = context.getBean("transactional-untyped-actor").asInstanceOf[ActorRef]
- assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
- myactor.start()
- myactor.sendOneWay("Hello")
- assert(myactor.isDefinedAt("some string message"))
+ val myactor = getPingActorFromContext("/untyped-actor-config.xml", "transactional-untyped-actor")
+ myactor.sendOneWay("Hello 3")
+ PingActor.latch.await
+ assert(PingActor.lastMessage === "Hello 3")
}
scenario("get a remote typed-actor") {
- RemoteNode.start
- Thread.sleep(1000)
- val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml")
- val myactor = context.getBean("remote-untyped-actor").asInstanceOf[ActorRef]
- assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
- myactor.start()
- myactor.sendOneWay("Hello")
- assert(myactor.isDefinedAt("some string message"))
+ val myactor = getPingActorFromContext("/untyped-actor-config.xml", "remote-untyped-actor")
+ myactor.sendOneWay("Hello 4")
assert(myactor.getRemoteAddress().isDefined)
assert(myactor.getRemoteAddress().get.getHostName() === "localhost")
- assert(myactor.getRemoteAddress().get.getPort() === 9999)
+ assert(myactor.getRemoteAddress().get.getPort() === 9992)
+ PingActor.latch.await
+ assert(PingActor.lastMessage === "Hello 4")
}
scenario("untyped-actor with custom dispatcher") {
- val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml")
- val myactor = context.getBean("untyped-actor-with-dispatcher").asInstanceOf[ActorRef]
- assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
- myactor.start()
- myactor.sendOneWay("Hello")
+ val myactor = getPingActorFromContext("/untyped-actor-config.xml", "untyped-actor-with-dispatcher")
assert(myactor.getTimeout() === 1000)
assert(myactor.getDispatcher.isInstanceOf[ExecutorBasedEventDrivenWorkStealingDispatcher])
+ myactor.sendOneWay("Hello 5")
+ PingActor.latch.await
+ assert(PingActor.lastMessage === "Hello 5")
}
+
+ scenario("create client managed remote untyped-actor") {
+ val myactor = getPingActorFromContext("/server-managed-config.xml", "client-managed-remote-untyped-actor")
+ myactor.sendOneWay("Hello client managed remote untyped-actor")
+ PingActor.latch.await
+ assert(PingActor.lastMessage === "Hello client managed remote untyped-actor")
+ assert(myactor.getRemoteAddress().isDefined)
+ assert(myactor.getRemoteAddress().get.getHostName() === "localhost")
+ assert(myactor.getRemoteAddress().get.getPort() === 9990)
+ }
+
+ scenario("create server managed remote untyped-actor") {
+ val myactor = getPingActorFromContext("/server-managed-config.xml", "server-managed-remote-untyped-actor")
+ val nrOfActors = ActorRegistry.actors.length
+ val actorRef = RemoteClient.actorFor("se.scalablesolutions.akka.spring.foo.PingActor", "localhost", 9990)
+ actorRef.sendOneWay("Hello server managed remote untyped-actor")
+ PingActor.latch.await
+ assert(PingActor.lastMessage === "Hello server managed remote untyped-actor")
+ assert(ActorRegistry.actors.length === nrOfActors)
+ }
+
+ scenario("create server managed remote untyped-actor with custom service id") {
+ val myactor = getPingActorFromContext("/server-managed-config.xml", "server-managed-remote-untyped-actor-custom-id")
+ val nrOfActors = ActorRegistry.actors.length
+ val actorRef = RemoteClient.actorFor("ping-service", "localhost", 9990)
+ actorRef.sendOneWay("Hello server managed remote untyped-actor")
+ PingActor.latch.await
+ assert(PingActor.lastMessage === "Hello server managed remote untyped-actor")
+ assert(ActorRegistry.actors.length === nrOfActors)
+ }
+
+ scenario("get client actor for server managed remote untyped-actor") {
+ PingActor.latch = new CountDownLatch(1)
+ val context = new ClassPathXmlApplicationContext("/server-managed-config.xml")
+ val pingActor = context.getBean("server-managed-remote-untyped-actor-custom-id").asInstanceOf[ActorRef]
+ assert(pingActor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
+ pingActor.start()
+ val nrOfActors = ActorRegistry.actors.length
+ // get client actor ref from spring context
+ val actorRef = context.getBean("client-1").asInstanceOf[ActorRef]
+ assert(actorRef.isInstanceOf[RemoteActorRef])
+ actorRef.sendOneWay("Hello")
+ PingActor.latch.await
+ assert(ActorRegistry.actors.length === nrOfActors)
+ }
+
}
}
diff --git a/akka-typed-actor/src/main/scala/actor/TypedActor.scala b/akka-typed-actor/src/main/scala/actor/TypedActor.scala
index 385c1831a4..7d393070ec 100644
--- a/akka-typed-actor/src/main/scala/actor/TypedActor.scala
+++ b/akka-typed-actor/src/main/scala/actor/TypedActor.scala
@@ -389,7 +389,8 @@ object TypedActor extends Logging {
typedActor.initialize(proxy)
if (config._messageDispatcher.isDefined) actorRef.dispatcher = config._messageDispatcher.get
if (config._threadBasedDispatcher.isDefined) actorRef.dispatcher = Dispatchers.newThreadBasedDispatcher(actorRef)
- AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, None, config.timeout))
+ if (config._host.isDefined) actorRef.makeRemote(config._host.get)
+ AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, config._host, config.timeout))
actorRef.start
proxy.asInstanceOf[T]
}
diff --git a/akka-typed-actor/src/test/resources/META-INF/aop.xml b/akka-typed-actor/src/test/resources/META-INF/aop.xml
index bdc167ca54..be133a51b8 100644
--- a/akka-typed-actor/src/test/resources/META-INF/aop.xml
+++ b/akka-typed-actor/src/test/resources/META-INF/aop.xml
@@ -2,6 +2,7 @@
+