closing ticket 378
This commit is contained in:
parent
8886de10bb
commit
bfb612908b
25 changed files with 741 additions and 349 deletions
|
|
@ -271,6 +271,11 @@ class RemoteServer extends Logging with ListenerManagement {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Register typed actor by interface name.
|
||||
*/
|
||||
def registerTypedActor(intfClass: Class[_], typedActor: AnyRef) : Unit = registerTypedActor(intfClass.getName, typedActor)
|
||||
|
||||
/**
|
||||
* Register remote typed actor by a specific id.
|
||||
* @param id custom actor id
|
||||
|
|
|
|||
|
|
@ -66,6 +66,14 @@
|
|||
</xsd:restriction>
|
||||
</xsd:simpleType>
|
||||
|
||||
<!-- management type for remote actors: client managed / server managed -->
|
||||
<xsd:simpleType name="managed-by-type">
|
||||
<xsd:restriction base="xsd:token">
|
||||
<xsd:enumeration value="client"/>
|
||||
<xsd:enumeration value="server"/>
|
||||
</xsd:restriction>
|
||||
</xsd:simpleType>
|
||||
|
||||
|
||||
<!-- dispatcher type -->
|
||||
<xsd:complexType name="dispatcher-type">
|
||||
|
|
@ -107,6 +115,20 @@
|
|||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="managed-by" type="managed-by-type">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Management type for remote actors: client managed or server managed.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="service-name" type="xsd:string">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Custom service name for server managed actor.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
</xsd:complexType>
|
||||
|
||||
<!-- typed actor -->
|
||||
|
|
@ -135,7 +157,7 @@
|
|||
<xsd:attribute name="timeout" type="xsd:long" use="required">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Theh default timeout for '!!' invocations.
|
||||
The default timeout for '!!' invocations.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
|
|
@ -229,6 +251,41 @@
|
|||
</xsd:choice>
|
||||
</xsd:complexType>
|
||||
|
||||
<!-- actor-for -->
|
||||
<!-- typed actor -->
|
||||
<xsd:complexType name="actor-for-type">
|
||||
<xsd:attribute name="id" type="xsd:ID"/>
|
||||
<xsd:attribute name="host" type="xsd:string" use="required">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Name of the remote host.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="port" type="xsd:integer" use="required">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Port of the remote host.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="service-name" type="xsd:string" use="required">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Custom service name or class name for the server managed actor.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
<xsd:attribute name="interface" type="xsd:string">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
Name of the interface the typed actor implements.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:attribute>
|
||||
|
||||
</xsd:complexType>
|
||||
|
||||
<!-- Supervisor strategy -->
|
||||
<xsd:complexType name="strategy-type">
|
||||
<xsd:sequence>
|
||||
|
|
@ -294,4 +351,7 @@
|
|||
<!-- CamelService -->
|
||||
<xsd:element name="camel-service" type="camel-service-type"/>
|
||||
|
||||
<!-- ActorFor -->
|
||||
<xsd:element name="actor-for" type="actor-for-type"/>
|
||||
|
||||
</xsd:schema>
|
||||
|
|
|
|||
72
akka-spring/src/main/scala/ActorBeanDefinitionParser.scala
Normal file
72
akka-spring/src/main/scala/ActorBeanDefinitionParser.scala
Normal file
|
|
@ -0,0 +1,72 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
package se.scalablesolutions.akka.spring
|
||||
|
||||
import org.springframework.beans.factory.support.BeanDefinitionBuilder
|
||||
import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser
|
||||
import org.springframework.beans.factory.xml.ParserContext
|
||||
import AkkaSpringConfigurationTags._
|
||||
import org.w3c.dom.Element
|
||||
|
||||
|
||||
/**
|
||||
* Parser for custom namespace configuration.
|
||||
* @author michaelkober
|
||||
*/
|
||||
class TypedActorBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorParser {
|
||||
/*
|
||||
* @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder)
|
||||
*/
|
||||
override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) {
|
||||
val typedActorConf = parseActor(element)
|
||||
typedActorConf.typed = TYPED_ACTOR_TAG
|
||||
typedActorConf.setAsProperties(builder)
|
||||
}
|
||||
|
||||
/*
|
||||
* @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element)
|
||||
*/
|
||||
override def getBeanClass(element: Element): Class[_] = classOf[ActorFactoryBean]
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Parser for custom namespace configuration.
|
||||
* @author michaelkober
|
||||
*/
|
||||
class UntypedActorBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorParser {
|
||||
/*
|
||||
* @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder)
|
||||
*/
|
||||
override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) {
|
||||
val untypedActorConf = parseActor(element)
|
||||
untypedActorConf.typed = UNTYPED_ACTOR_TAG
|
||||
untypedActorConf.setAsProperties(builder)
|
||||
}
|
||||
|
||||
/*
|
||||
* @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element)
|
||||
*/
|
||||
override def getBeanClass(element: Element): Class[_] = classOf[ActorFactoryBean]
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Parser for custom namespace configuration.
|
||||
* @author michaelkober
|
||||
*/
|
||||
class ActorForBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorForParser {
|
||||
/*
|
||||
* @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder)
|
||||
*/
|
||||
override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) {
|
||||
val actorForConf = parseActorFor(element)
|
||||
actorForConf.setAsProperties(builder)
|
||||
}
|
||||
|
||||
/*
|
||||
* @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element)
|
||||
*/
|
||||
override def getBeanClass(element: Element): Class[_] = classOf[ActorForFactoryBean]
|
||||
}
|
||||
|
|
@ -4,22 +4,19 @@
|
|||
|
||||
package se.scalablesolutions.akka.spring
|
||||
|
||||
import java.beans.PropertyDescriptor
|
||||
import java.lang.reflect.Method
|
||||
import javax.annotation.PreDestroy
|
||||
import javax.annotation.PostConstruct
|
||||
|
||||
import org.springframework.beans.{BeanUtils,BeansException,BeanWrapper,BeanWrapperImpl}
|
||||
import org.springframework.beans.factory.BeanFactory
|
||||
import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer}
|
||||
//import org.springframework.beans.factory.BeanFactory
|
||||
import org.springframework.beans.factory.config.AbstractFactoryBean
|
||||
import org.springframework.context.{ApplicationContext,ApplicationContextAware}
|
||||
import org.springframework.util.ReflectionUtils
|
||||
//import org.springframework.util.ReflectionUtils
|
||||
import org.springframework.util.StringUtils
|
||||
|
||||
import se.scalablesolutions.akka.actor.{ActorRef, AspectInitRegistry, TypedActorConfiguration, TypedActor,Actor}
|
||||
import se.scalablesolutions.akka.dispatch.MessageDispatcher
|
||||
import se.scalablesolutions.akka.util.{Logging, Duration}
|
||||
import scala.reflect.BeanProperty
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
/**
|
||||
* Exception to use when something goes wrong during bean creation.
|
||||
|
|
@ -49,6 +46,8 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App
|
|||
@BeanProperty var transactional: Boolean = false
|
||||
@BeanProperty var host: String = ""
|
||||
@BeanProperty var port: Int = _
|
||||
@BeanProperty var serverManaged: Boolean = false
|
||||
@BeanProperty var serviceName: String = ""
|
||||
@BeanProperty var lifecycle: String = ""
|
||||
@BeanProperty var dispatcher: DispatcherProperties = _
|
||||
@BeanProperty var scope: String = VAL_SCOPE_SINGLETON
|
||||
|
|
@ -94,7 +93,16 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App
|
|||
if (implementation == null || implementation == "") throw new AkkaBeansException(
|
||||
"The 'implementation' part of the 'akka:typed-actor' element in the Spring config file can't be null or empty string")
|
||||
|
||||
TypedActor.newInstance(interface.toClass, implementation.toClass, createConfig)
|
||||
val typedActor: AnyRef = TypedActor.newInstance(interface.toClass, implementation.toClass, createConfig)
|
||||
if (isRemote && serverManaged) {
|
||||
val server = RemoteServer.getOrCreateServer(new InetSocketAddress(host, port))
|
||||
if (serviceName.isEmpty) {
|
||||
server.registerTypedActor(interface, typedActor)
|
||||
} else {
|
||||
server.registerTypedActor(serviceName, typedActor)
|
||||
}
|
||||
}
|
||||
typedActor
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -111,8 +119,17 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App
|
|||
actorRef.makeTransactionRequired
|
||||
}
|
||||
if (isRemote) {
|
||||
if (serverManaged) {
|
||||
val server = RemoteServer.getOrCreateServer(new InetSocketAddress(host, port))
|
||||
if (serviceName.isEmpty) {
|
||||
server.register(actorRef)
|
||||
} else {
|
||||
server.register(serviceName, actorRef)
|
||||
}
|
||||
} else {
|
||||
actorRef.makeRemote(host, port)
|
||||
}
|
||||
}
|
||||
if (hasDispatcher) {
|
||||
if (dispatcher.dispatcherType != THREAD_BASED){
|
||||
actorRef.setDispatcher(dispatcherInstance())
|
||||
|
|
@ -159,7 +176,7 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App
|
|||
private[akka] def createConfig: TypedActorConfiguration = {
|
||||
val config = new TypedActorConfiguration().timeout(Duration(timeout, "millis"))
|
||||
if (transactional) config.makeTransactionRequired
|
||||
if (isRemote) config.makeRemote(host, port)
|
||||
if (isRemote && !serverManaged) config.makeRemote(host, port)
|
||||
if (hasDispatcher) {
|
||||
if (dispatcher.dispatcherType != THREAD_BASED) {
|
||||
config.dispatcher(dispatcherInstance())
|
||||
|
|
@ -191,3 +208,39 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Factory bean for remote client actor-for.
|
||||
*
|
||||
* @author michaelkober
|
||||
*/
|
||||
class ActorForFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with ApplicationContextAware {
|
||||
import StringReflect._
|
||||
import AkkaSpringConfigurationTags._
|
||||
|
||||
@BeanProperty var interface: String = ""
|
||||
@BeanProperty var host: String = ""
|
||||
@BeanProperty var port: Int = _
|
||||
@BeanProperty var serviceName: String = ""
|
||||
//@BeanProperty var scope: String = VAL_SCOPE_SINGLETON
|
||||
@BeanProperty var applicationContext: ApplicationContext = _
|
||||
|
||||
override def isSingleton = false
|
||||
|
||||
/*
|
||||
* @see org.springframework.beans.factory.FactoryBean#getObjectType()
|
||||
*/
|
||||
def getObjectType: Class[AnyRef] = classOf[AnyRef]
|
||||
|
||||
/*
|
||||
* @see org.springframework.beans.factory.config.AbstractFactoryBean#createInstance()
|
||||
*/
|
||||
def createInstance: AnyRef = {
|
||||
if (interface.isEmpty) {
|
||||
RemoteClient.actorFor(serviceName, host, port)
|
||||
} else {
|
||||
RemoteClient.typedActorFor(interface.toClass, serviceName, host, port)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ package se.scalablesolutions.akka.spring
|
|||
import org.springframework.util.xml.DomUtils
|
||||
import org.w3c.dom.Element
|
||||
import scala.collection.JavaConversions._
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
|
||||
import se.scalablesolutions.akka.actor.IllegalActorStateException
|
||||
|
||||
|
|
@ -27,11 +28,17 @@ trait ActorParser extends BeanParser with DispatcherParser {
|
|||
val objectProperties = new ActorProperties()
|
||||
val remoteElement = DomUtils.getChildElementByTagName(element, REMOTE_TAG);
|
||||
val dispatcherElement = DomUtils.getChildElementByTagName(element, DISPATCHER_TAG)
|
||||
val propertyEntries = DomUtils.getChildElementsByTagName(element,PROPERTYENTRY_TAG)
|
||||
val propertyEntries = DomUtils.getChildElementsByTagName(element, PROPERTYENTRY_TAG)
|
||||
|
||||
if (remoteElement != null) {
|
||||
objectProperties.host = mandatory(remoteElement, HOST)
|
||||
objectProperties.port = mandatory(remoteElement, PORT).toInt
|
||||
objectProperties.serverManaged = (remoteElement.getAttribute(MANAGED_BY) != null) && (remoteElement.getAttribute(MANAGED_BY).equals(SERVER_MANAGED))
|
||||
val serviceName = remoteElement.getAttribute(SERVICE_NAME)
|
||||
if ((serviceName != null) && (!serviceName.isEmpty)) {
|
||||
objectProperties.serviceName = serviceName
|
||||
objectProperties.serverManaged = true
|
||||
}
|
||||
}
|
||||
|
||||
if (dispatcherElement != null) {
|
||||
|
|
@ -59,15 +66,13 @@ trait ActorParser extends BeanParser with DispatcherParser {
|
|||
objectProperties.target = mandatory(element, IMPLEMENTATION)
|
||||
objectProperties.transactional = if (element.getAttribute(TRANSACTIONAL).isEmpty) false else element.getAttribute(TRANSACTIONAL).toBoolean
|
||||
|
||||
if (!element.getAttribute(INTERFACE).isEmpty) {
|
||||
if (element.hasAttribute(INTERFACE)) {
|
||||
objectProperties.interface = element.getAttribute(INTERFACE)
|
||||
}
|
||||
|
||||
if (!element.getAttribute(LIFECYCLE).isEmpty) {
|
||||
if (element.hasAttribute(LIFECYCLE)) {
|
||||
objectProperties.lifecycle = element.getAttribute(LIFECYCLE)
|
||||
}
|
||||
|
||||
if (!element.getAttribute(SCOPE).isEmpty) {
|
||||
if (element.hasAttribute(SCOPE)) {
|
||||
objectProperties.scope = element.getAttribute(SCOPE)
|
||||
}
|
||||
|
||||
|
|
@ -75,3 +80,159 @@ trait ActorParser extends BeanParser with DispatcherParser {
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Parser trait for custom namespace configuration for RemoteClient actor-for.
|
||||
* @author michaelkober
|
||||
*/
|
||||
trait ActorForParser extends BeanParser {
|
||||
import AkkaSpringConfigurationTags._
|
||||
|
||||
/**
|
||||
* Parses the given element and returns a ActorForProperties.
|
||||
* @param element dom element to parse
|
||||
* @return configuration for the typed actor
|
||||
*/
|
||||
def parseActorFor(element: Element): ActorForProperties = {
|
||||
val objectProperties = new ActorForProperties()
|
||||
|
||||
objectProperties.host = mandatory(element, HOST)
|
||||
objectProperties.port = mandatory(element, PORT).toInt
|
||||
objectProperties.serviceName = mandatory(element, SERVICE_NAME)
|
||||
if (element.hasAttribute(INTERFACE)) {
|
||||
objectProperties.interface = element.getAttribute(INTERFACE)
|
||||
}
|
||||
objectProperties
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Base trait with utility methods for bean parsing.
|
||||
*/
|
||||
trait BeanParser extends Logging {
|
||||
|
||||
/**
|
||||
* Get a mandatory element attribute.
|
||||
* @param element the element with the mandatory attribute
|
||||
* @param attribute name of the mandatory attribute
|
||||
*/
|
||||
def mandatory(element: Element, attribute: String): String = {
|
||||
if ((element.getAttribute(attribute) == null) || (element.getAttribute(attribute).isEmpty)) {
|
||||
throw new IllegalArgumentException("Mandatory attribute missing: " + attribute)
|
||||
} else {
|
||||
element.getAttribute(attribute)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a mandatory child element.
|
||||
* @param element the parent element
|
||||
* @param childName name of the mandatory child element
|
||||
*/
|
||||
def mandatoryElement(element: Element, childName: String): Element = {
|
||||
val childElement = DomUtils.getChildElementByTagName(element, childName);
|
||||
if (childElement == null) {
|
||||
throw new IllegalArgumentException("Mandatory element missing: '<akka:" + childName + ">'")
|
||||
} else {
|
||||
childElement
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Parser trait for custom namespace for Akka dispatcher configuration.
|
||||
* @author michaelkober
|
||||
*/
|
||||
trait DispatcherParser extends BeanParser {
|
||||
import AkkaSpringConfigurationTags._
|
||||
|
||||
/**
|
||||
* Parses the given element and returns a DispatcherProperties.
|
||||
* @param element dom element to parse
|
||||
* @return configuration for the dispatcher
|
||||
*/
|
||||
def parseDispatcher(element: Element): DispatcherProperties = {
|
||||
val properties = new DispatcherProperties()
|
||||
var dispatcherElement = element
|
||||
if (hasRef(element)) {
|
||||
val ref = element.getAttribute(REF)
|
||||
dispatcherElement = element.getOwnerDocument.getElementById(ref)
|
||||
if (dispatcherElement == null) {
|
||||
throw new IllegalArgumentException("Referenced dispatcher not found: '" + ref + "'")
|
||||
}
|
||||
}
|
||||
|
||||
properties.dispatcherType = mandatory(dispatcherElement, TYPE)
|
||||
if (properties.dispatcherType == THREAD_BASED) {
|
||||
val allowedParentNodes = "akka:typed-actor" :: "akka:untyped-actor" :: "typed-actor" :: "untyped-actor" :: Nil
|
||||
if (!allowedParentNodes.contains(dispatcherElement.getParentNode.getNodeName)) {
|
||||
throw new IllegalArgumentException("Thread based dispatcher must be nested in 'typed-actor' or 'untyped-actor' element!")
|
||||
}
|
||||
}
|
||||
|
||||
if (properties.dispatcherType == HAWT) { // no name for HawtDispatcher
|
||||
properties.name = dispatcherElement.getAttribute(NAME)
|
||||
if (dispatcherElement.hasAttribute(AGGREGATE)) {
|
||||
properties.aggregate = dispatcherElement.getAttribute(AGGREGATE).toBoolean
|
||||
}
|
||||
} else {
|
||||
properties.name = mandatory(dispatcherElement, NAME)
|
||||
}
|
||||
|
||||
val threadPoolElement = DomUtils.getChildElementByTagName(dispatcherElement, THREAD_POOL_TAG);
|
||||
if (threadPoolElement != null) {
|
||||
if (properties.dispatcherType == REACTOR_BASED_SINGLE_THREAD_EVENT_DRIVEN ||
|
||||
properties.dispatcherType == THREAD_BASED) {
|
||||
throw new IllegalArgumentException("Element 'thread-pool' not allowed for this dispatcher type.")
|
||||
}
|
||||
val threadPoolProperties = parseThreadPool(threadPoolElement)
|
||||
properties.threadPool = threadPoolProperties
|
||||
}
|
||||
properties
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses the given element and returns a ThreadPoolProperties.
|
||||
* @param element dom element to parse
|
||||
* @return configuration for the thread pool
|
||||
*/
|
||||
def parseThreadPool(element: Element): ThreadPoolProperties = {
|
||||
val properties = new ThreadPoolProperties()
|
||||
properties.queue = element.getAttribute(QUEUE)
|
||||
if (element.hasAttribute(CAPACITY)) {
|
||||
properties.capacity = element.getAttribute(CAPACITY).toInt
|
||||
}
|
||||
if (element.hasAttribute(BOUND)) {
|
||||
properties.bound = element.getAttribute(BOUND).toInt
|
||||
}
|
||||
if (element.hasAttribute(FAIRNESS)) {
|
||||
properties.fairness = element.getAttribute(FAIRNESS).toBoolean
|
||||
}
|
||||
if (element.hasAttribute(CORE_POOL_SIZE)) {
|
||||
properties.corePoolSize = element.getAttribute(CORE_POOL_SIZE).toInt
|
||||
}
|
||||
if (element.hasAttribute(MAX_POOL_SIZE)) {
|
||||
properties.maxPoolSize = element.getAttribute(MAX_POOL_SIZE).toInt
|
||||
}
|
||||
if (element.hasAttribute(KEEP_ALIVE)) {
|
||||
properties.keepAlive = element.getAttribute(KEEP_ALIVE).toLong
|
||||
}
|
||||
if (element.hasAttribute(REJECTION_POLICY)) {
|
||||
properties.rejectionPolicy = element.getAttribute(REJECTION_POLICY)
|
||||
}
|
||||
if (element.hasAttribute(MAILBOX_CAPACITY)) {
|
||||
properties.mailboxCapacity = element.getAttribute(MAILBOX_CAPACITY).toInt
|
||||
}
|
||||
properties
|
||||
}
|
||||
|
||||
def hasRef(element: Element): Boolean = {
|
||||
val ref = element.getAttribute(REF)
|
||||
(ref != null) && !ref.isEmpty
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import org.springframework.beans.factory.support.BeanDefinitionBuilder
|
|||
import AkkaSpringConfigurationTags._
|
||||
|
||||
/**
|
||||
* Data container for typed actor configuration data.
|
||||
* Data container for actor configuration data.
|
||||
* @author michaelkober
|
||||
* @author Martin Krasser
|
||||
*/
|
||||
|
|
@ -20,6 +20,8 @@ class ActorProperties {
|
|||
var transactional: Boolean = false
|
||||
var host: String = ""
|
||||
var port: Int = _
|
||||
var serverManaged: Boolean = false
|
||||
var serviceName: String = ""
|
||||
var lifecycle: String = ""
|
||||
var scope:String = VAL_SCOPE_SINGLETON
|
||||
var dispatcher: DispatcherProperties = _
|
||||
|
|
@ -34,6 +36,8 @@ class ActorProperties {
|
|||
builder.addPropertyValue("typed", typed)
|
||||
builder.addPropertyValue(HOST, host)
|
||||
builder.addPropertyValue(PORT, port)
|
||||
builder.addPropertyValue("serverManaged", serverManaged)
|
||||
builder.addPropertyValue("serviceName", serviceName)
|
||||
builder.addPropertyValue(TIMEOUT, timeout)
|
||||
builder.addPropertyValue(IMPLEMENTATION, target)
|
||||
builder.addPropertyValue(INTERFACE, interface)
|
||||
|
|
@ -45,3 +49,26 @@ class ActorProperties {
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Data container for actor configuration data.
|
||||
* @author michaelkober
|
||||
*/
|
||||
class ActorForProperties {
|
||||
var interface: String = ""
|
||||
var host: String = ""
|
||||
var port: Int = _
|
||||
var serviceName: String = ""
|
||||
|
||||
/**
|
||||
* Sets the properties to the given builder.
|
||||
* @param builder bean definition builder
|
||||
*/
|
||||
def setAsProperties(builder: BeanDefinitionBuilder) {
|
||||
builder.addPropertyValue(HOST, host)
|
||||
builder.addPropertyValue(PORT, port)
|
||||
builder.addPropertyValue("serviceName", serviceName)
|
||||
builder.addPropertyValue(INTERFACE, interface)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,10 +12,11 @@ import AkkaSpringConfigurationTags._
|
|||
*/
|
||||
class AkkaNamespaceHandler extends NamespaceHandlerSupport {
|
||||
def init = {
|
||||
registerBeanDefinitionParser(TYPED_ACTOR_TAG, new TypedActorBeanDefinitionParser());
|
||||
registerBeanDefinitionParser(UNTYPED_ACTOR_TAG, new UntypedActorBeanDefinitionParser());
|
||||
registerBeanDefinitionParser(SUPERVISION_TAG, new SupervisionBeanDefinitionParser());
|
||||
registerBeanDefinitionParser(DISPATCHER_TAG, new DispatcherBeanDefinitionParser());
|
||||
registerBeanDefinitionParser(CAMEL_SERVICE_TAG, new CamelServiceBeanDefinitionParser);
|
||||
registerBeanDefinitionParser(TYPED_ACTOR_TAG, new TypedActorBeanDefinitionParser())
|
||||
registerBeanDefinitionParser(UNTYPED_ACTOR_TAG, new UntypedActorBeanDefinitionParser())
|
||||
registerBeanDefinitionParser(SUPERVISION_TAG, new SupervisionBeanDefinitionParser())
|
||||
registerBeanDefinitionParser(DISPATCHER_TAG, new DispatcherBeanDefinitionParser())
|
||||
registerBeanDefinitionParser(CAMEL_SERVICE_TAG, new CamelServiceBeanDefinitionParser)
|
||||
registerBeanDefinitionParser(ACTOR_FOR_TAG, new ActorForBeanDefinitionParser());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ object AkkaSpringConfigurationTags {
|
|||
val DISPATCHER_TAG = "dispatcher"
|
||||
val PROPERTYENTRY_TAG = "property"
|
||||
val CAMEL_SERVICE_TAG = "camel-service"
|
||||
val ACTOR_FOR_TAG = "actor-for"
|
||||
|
||||
// actor sub tags
|
||||
val REMOTE_TAG = "remote"
|
||||
|
|
@ -45,6 +46,8 @@ object AkkaSpringConfigurationTags {
|
|||
val TRANSACTIONAL = "transactional"
|
||||
val HOST = "host"
|
||||
val PORT = "port"
|
||||
val MANAGED_BY = "managed-by"
|
||||
val SERVICE_NAME = "service-name"
|
||||
val LIFECYCLE = "lifecycle"
|
||||
val SCOPE = "scope"
|
||||
|
||||
|
|
@ -103,4 +106,8 @@ object AkkaSpringConfigurationTags {
|
|||
val THREAD_BASED = "thread-based"
|
||||
val HAWT = "hawt"
|
||||
|
||||
// managed by types
|
||||
val SERVER_MANAGED = "server"
|
||||
val CLIENT_MANAGED = "client"
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,42 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
package se.scalablesolutions.akka.spring
|
||||
|
||||
import se.scalablesolutions.akka.util.Logging
|
||||
import org.w3c.dom.Element
|
||||
import org.springframework.util.xml.DomUtils
|
||||
|
||||
/**
|
||||
* Base trait with utility methods for bean parsing.
|
||||
*/
|
||||
trait BeanParser extends Logging {
|
||||
|
||||
/**
|
||||
* Get a mandatory element attribute.
|
||||
* @param element the element with the mandatory attribute
|
||||
* @param attribute name of the mandatory attribute
|
||||
*/
|
||||
def mandatory(element: Element, attribute: String): String = {
|
||||
if ((element.getAttribute(attribute) == null) || (element.getAttribute(attribute).isEmpty)) {
|
||||
throw new IllegalArgumentException("Mandatory attribute missing: " + attribute)
|
||||
} else {
|
||||
element.getAttribute(attribute)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a mandatory child element.
|
||||
* @param element the parent element
|
||||
* @param childName name of the mandatory child element
|
||||
*/
|
||||
def mandatoryElement(element: Element, childName: String): Element = {
|
||||
val childElement = DomUtils.getChildElementByTagName(element, childName);
|
||||
if (childElement == null) {
|
||||
throw new IllegalArgumentException("Mandatory element missing: '<akka:" + childName + ">'")
|
||||
} else {
|
||||
childElement
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,101 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
package se.scalablesolutions.akka.spring
|
||||
|
||||
import org.w3c.dom.Element
|
||||
import org.springframework.util.xml.DomUtils
|
||||
|
||||
/**
|
||||
* Parser trait for custom namespace for Akka dispatcher configuration.
|
||||
* @author michaelkober
|
||||
*/
|
||||
trait DispatcherParser extends BeanParser {
|
||||
import AkkaSpringConfigurationTags._
|
||||
|
||||
/**
|
||||
* Parses the given element and returns a DispatcherProperties.
|
||||
* @param element dom element to parse
|
||||
* @return configuration for the dispatcher
|
||||
*/
|
||||
def parseDispatcher(element: Element): DispatcherProperties = {
|
||||
val properties = new DispatcherProperties()
|
||||
var dispatcherElement = element
|
||||
if (hasRef(element)) {
|
||||
val ref = element.getAttribute(REF)
|
||||
dispatcherElement = element.getOwnerDocument.getElementById(ref)
|
||||
if (dispatcherElement == null) {
|
||||
throw new IllegalArgumentException("Referenced dispatcher not found: '" + ref + "'")
|
||||
}
|
||||
}
|
||||
|
||||
properties.dispatcherType = mandatory(dispatcherElement, TYPE)
|
||||
if (properties.dispatcherType == THREAD_BASED) {
|
||||
val allowedParentNodes = "akka:typed-actor" :: "akka:untyped-actor" :: "typed-actor" :: "untyped-actor" :: Nil
|
||||
if (!allowedParentNodes.contains(dispatcherElement.getParentNode.getNodeName)) {
|
||||
throw new IllegalArgumentException("Thread based dispatcher must be nested in 'typed-actor' or 'untyped-actor' element!")
|
||||
}
|
||||
}
|
||||
|
||||
if (properties.dispatcherType == HAWT) { // no name for HawtDispatcher
|
||||
properties.name = dispatcherElement.getAttribute(NAME)
|
||||
if (dispatcherElement.hasAttribute(AGGREGATE)) {
|
||||
properties.aggregate = dispatcherElement.getAttribute(AGGREGATE).toBoolean
|
||||
}
|
||||
} else {
|
||||
properties.name = mandatory(dispatcherElement, NAME)
|
||||
}
|
||||
|
||||
val threadPoolElement = DomUtils.getChildElementByTagName(dispatcherElement, THREAD_POOL_TAG);
|
||||
if (threadPoolElement != null) {
|
||||
if (properties.dispatcherType == REACTOR_BASED_SINGLE_THREAD_EVENT_DRIVEN ||
|
||||
properties.dispatcherType == THREAD_BASED) {
|
||||
throw new IllegalArgumentException("Element 'thread-pool' not allowed for this dispatcher type.")
|
||||
}
|
||||
val threadPoolProperties = parseThreadPool(threadPoolElement)
|
||||
properties.threadPool = threadPoolProperties
|
||||
}
|
||||
properties
|
||||
}
|
||||
|
||||
/**
|
||||
* Parses the given element and returns a ThreadPoolProperties.
|
||||
* @param element dom element to parse
|
||||
* @return configuration for the thread pool
|
||||
*/
|
||||
def parseThreadPool(element: Element): ThreadPoolProperties = {
|
||||
val properties = new ThreadPoolProperties()
|
||||
properties.queue = element.getAttribute(QUEUE)
|
||||
if (element.hasAttribute(CAPACITY)) {
|
||||
properties.capacity = element.getAttribute(CAPACITY).toInt
|
||||
}
|
||||
if (element.hasAttribute(BOUND)) {
|
||||
properties.bound = element.getAttribute(BOUND).toInt
|
||||
}
|
||||
if (element.hasAttribute(FAIRNESS)) {
|
||||
properties.fairness = element.getAttribute(FAIRNESS).toBoolean
|
||||
}
|
||||
if (element.hasAttribute(CORE_POOL_SIZE)) {
|
||||
properties.corePoolSize = element.getAttribute(CORE_POOL_SIZE).toInt
|
||||
}
|
||||
if (element.hasAttribute(MAX_POOL_SIZE)) {
|
||||
properties.maxPoolSize = element.getAttribute(MAX_POOL_SIZE).toInt
|
||||
}
|
||||
if (element.hasAttribute(KEEP_ALIVE)) {
|
||||
properties.keepAlive = element.getAttribute(KEEP_ALIVE).toLong
|
||||
}
|
||||
if (element.hasAttribute(REJECTION_POLICY)) {
|
||||
properties.rejectionPolicy = element.getAttribute(REJECTION_POLICY)
|
||||
}
|
||||
if (element.hasAttribute(MAILBOX_CAPACITY)) {
|
||||
properties.mailboxCapacity = element.getAttribute(MAILBOX_CAPACITY).toInt
|
||||
}
|
||||
properties
|
||||
}
|
||||
|
||||
def hasRef(element: Element): Boolean = {
|
||||
val ref = element.getAttribute(REF)
|
||||
(ref != null) && !ref.isEmpty
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -18,3 +18,19 @@ class PropertyEntries {
|
|||
entryList.append(entry)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents a property element
|
||||
* @author <a href="johan.rask@jayway.com">Johan Rask</a>
|
||||
*/
|
||||
class PropertyEntry {
|
||||
var name: String = _
|
||||
var value: String = null
|
||||
var ref: String = null
|
||||
|
||||
|
||||
override def toString(): String = {
|
||||
format("name = %s,value = %s, ref = %s", name, value, ref)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,19 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
package se.scalablesolutions.akka.spring
|
||||
|
||||
/**
|
||||
* Represents a property element
|
||||
* @author <a href="johan.rask@jayway.com">Johan Rask</a>
|
||||
*/
|
||||
class PropertyEntry {
|
||||
var name: String = _
|
||||
var value: String = null
|
||||
var ref: String = null
|
||||
|
||||
|
||||
override def toString(): String = {
|
||||
format("name = %s,value = %s, ref = %s", name, value, ref)
|
||||
}
|
||||
}
|
||||
|
|
@ -1,31 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
package se.scalablesolutions.akka.spring
|
||||
|
||||
import org.springframework.beans.factory.support.BeanDefinitionBuilder
|
||||
import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser
|
||||
import org.springframework.beans.factory.xml.ParserContext
|
||||
import AkkaSpringConfigurationTags._
|
||||
import org.w3c.dom.Element
|
||||
|
||||
|
||||
/**
|
||||
* Parser for custom namespace configuration.
|
||||
* @author michaelkober
|
||||
*/
|
||||
class TypedActorBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorParser {
|
||||
/*
|
||||
* @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder)
|
||||
*/
|
||||
override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) {
|
||||
val typedActorConf = parseActor(element)
|
||||
typedActorConf.typed = TYPED_ACTOR_TAG
|
||||
typedActorConf.setAsProperties(builder)
|
||||
}
|
||||
|
||||
/*
|
||||
* @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element)
|
||||
*/
|
||||
override def getBeanClass(element: Element): Class[_] = classOf[ActorFactoryBean]
|
||||
}
|
||||
|
|
@ -1,31 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
package se.scalablesolutions.akka.spring
|
||||
|
||||
import org.springframework.beans.factory.support.BeanDefinitionBuilder
|
||||
import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser
|
||||
import org.springframework.beans.factory.xml.ParserContext
|
||||
import AkkaSpringConfigurationTags._
|
||||
import org.w3c.dom.Element
|
||||
|
||||
|
||||
/**
|
||||
* Parser for custom namespace configuration.
|
||||
* @author michaelkober
|
||||
*/
|
||||
class UntypedActorBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorParser {
|
||||
/*
|
||||
* @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder)
|
||||
*/
|
||||
override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) {
|
||||
val untypedActorConf = parseActor(element)
|
||||
untypedActorConf.typed = UNTYPED_ACTOR_TAG
|
||||
untypedActorConf.setAsProperties(builder)
|
||||
}
|
||||
|
||||
/*
|
||||
* @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element)
|
||||
*/
|
||||
override def getBeanClass(element: Element): Class[_] = classOf[ActorFactoryBean]
|
||||
}
|
||||
|
|
@ -8,14 +8,12 @@ package se.scalablesolutions.akka.spring.foo;
|
|||
* To change this template use File | Settings | File Templates.
|
||||
*/
|
||||
public interface IMyPojo {
|
||||
public void oneWay(String message);
|
||||
|
||||
public String getFoo();
|
||||
|
||||
public String getBar();
|
||||
|
||||
public void preRestart();
|
||||
|
||||
public void postRestart();
|
||||
|
||||
public String longRunning();
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,34 +1,26 @@
|
|||
package se.scalablesolutions.akka.spring.foo;
|
||||
|
||||
import se.scalablesolutions.akka.actor.*;
|
||||
import se.scalablesolutions.akka.actor.TypedActor;
|
||||
|
||||
public class MyPojo extends TypedActor implements IMyPojo{
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
private String foo;
|
||||
private String bar;
|
||||
public class MyPojo extends TypedActor implements IMyPojo {
|
||||
|
||||
public static CountDownLatch latch = new CountDownLatch(1);
|
||||
public static String lastOneWayMessage = null;
|
||||
private String foo = "foo";
|
||||
|
||||
|
||||
public MyPojo() {
|
||||
this.foo = "foo";
|
||||
this.bar = "bar";
|
||||
}
|
||||
|
||||
|
||||
public String getFoo() {
|
||||
return foo;
|
||||
}
|
||||
|
||||
|
||||
public String getBar() {
|
||||
return bar;
|
||||
}
|
||||
|
||||
public void preRestart() {
|
||||
System.out.println("pre restart");
|
||||
}
|
||||
|
||||
public void postRestart() {
|
||||
System.out.println("post restart");
|
||||
public void oneWay(String message) {
|
||||
lastOneWayMessage = message;
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
public String longRunning() {
|
||||
|
|
|
|||
|
|
@ -6,6 +6,8 @@ import se.scalablesolutions.akka.actor.ActorRef;
|
|||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.ApplicationContextAware;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
|
||||
/**
|
||||
* test class
|
||||
|
|
@ -14,6 +16,9 @@ public class PingActor extends UntypedActor implements ApplicationContextAware {
|
|||
|
||||
private String stringFromVal;
|
||||
private String stringFromRef;
|
||||
public static String lastMessage = null;
|
||||
public static CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
|
||||
private boolean gotApplicationContext = false;
|
||||
|
||||
|
|
@ -42,7 +47,6 @@ public class PingActor extends UntypedActor implements ApplicationContextAware {
|
|||
stringFromRef = s;
|
||||
}
|
||||
|
||||
|
||||
private String longRunning() {
|
||||
try {
|
||||
Thread.sleep(6000);
|
||||
|
|
@ -53,12 +57,12 @@ public class PingActor extends UntypedActor implements ApplicationContextAware {
|
|||
|
||||
public void onReceive(Object message) throws Exception {
|
||||
if (message instanceof String) {
|
||||
System.out.println("Ping received String message: " + message);
|
||||
lastMessage = (String) message;
|
||||
if (message.equals("longRunning")) {
|
||||
System.out.println("### starting pong");
|
||||
ActorRef pongActor = UntypedActor.actorOf(PongActor.class).start();
|
||||
pongActor.sendRequestReply("longRunning", getContext());
|
||||
}
|
||||
latch.countDown();
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unknown message: " + message);
|
||||
}
|
||||
|
|
|
|||
57
akka-spring/src/test/resources/server-managed-config.xml
Normal file
57
akka-spring/src/test/resources/server-managed-config.xml
Normal file
|
|
@ -0,0 +1,57 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<beans xmlns="http://www.springframework.org/schema/beans"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xmlns:akka="http://www.akkasource.org/schema/akka"
|
||||
xmlns:beans="http://www.springframework.org/schema/lang"
|
||||
xsi:schemaLocation="
|
||||
http://www.springframework.org/schema/beans
|
||||
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
|
||||
http://www.akkasource.org/schema/akka
|
||||
http://scalablesolutions.se/akka/akka-1.0-SNAPSHOT.xsd">
|
||||
|
||||
|
||||
<akka:untyped-actor id="client-managed-remote-untyped-actor"
|
||||
implementation="se.scalablesolutions.akka.spring.foo.PingActor">
|
||||
<akka:remote host="localhost" port="9990" managed-by="client"/>
|
||||
</akka:untyped-actor>
|
||||
|
||||
|
||||
<akka:untyped-actor id="server-managed-remote-untyped-actor"
|
||||
implementation="se.scalablesolutions.akka.spring.foo.PingActor">
|
||||
<akka:remote host="localhost" port="9990" managed-by="server"/>
|
||||
</akka:untyped-actor>
|
||||
|
||||
<akka:untyped-actor id="server-managed-remote-untyped-actor-custom-id"
|
||||
implementation="se.scalablesolutions.akka.spring.foo.PingActor">
|
||||
<akka:remote host="localhost" port="9990" service-name="ping-service"/>
|
||||
</akka:untyped-actor>
|
||||
|
||||
<akka:typed-actor id="client-managed-remote-typed-actor"
|
||||
interface="se.scalablesolutions.akka.spring.foo.IMyPojo"
|
||||
implementation="se.scalablesolutions.akka.spring.foo.MyPojo"
|
||||
timeout="2000">
|
||||
<akka:remote host="localhost" port="9990" managed-by="client"/>
|
||||
</akka:typed-actor>
|
||||
|
||||
<akka:typed-actor id="server-managed-remote-typed-actor"
|
||||
interface="se.scalablesolutions.akka.spring.foo.IMyPojo"
|
||||
implementation="se.scalablesolutions.akka.spring.foo.MyPojo"
|
||||
timeout="2000">
|
||||
<akka:remote host="localhost" port="9990" managed-by="server"/>
|
||||
</akka:typed-actor>
|
||||
|
||||
<akka:typed-actor id="server-managed-remote-typed-actor-custom-id"
|
||||
interface="se.scalablesolutions.akka.spring.foo.IMyPojo"
|
||||
implementation="se.scalablesolutions.akka.spring.foo.MyPojo"
|
||||
timeout="2000">
|
||||
<akka:remote host="localhost" port="9990" service-name="mypojo-service"/>
|
||||
</akka:typed-actor>
|
||||
|
||||
<akka:actor-for id="client-1" host="localhost" port="9990" service-name="ping-service"/>
|
||||
<akka:actor-for id="typed-client-1"
|
||||
interface="se.scalablesolutions.akka.spring.foo.IMyPojo"
|
||||
host="localhost"
|
||||
port="9990"
|
||||
service-name="mypojo-service"/>
|
||||
|
||||
</beans>
|
||||
|
|
@ -37,7 +37,7 @@ http://scalablesolutions.se/akka/akka-1.0-SNAPSHOT.xsd">
|
|||
implementation="se.scalablesolutions.akka.spring.foo.MyPojo"
|
||||
timeout="2000"
|
||||
transactional="true">
|
||||
<akka:remote host="localhost" port="9999"/>
|
||||
<akka:remote host="localhost" port="9990"/>
|
||||
</akka:typed-actor>
|
||||
|
||||
<akka:typed-actor id="remote-service1"
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ http://scalablesolutions.se/akka/akka-1.0-SNAPSHOT.xsd">
|
|||
<akka:untyped-actor id="remote-untyped-actor"
|
||||
implementation="se.scalablesolutions.akka.spring.foo.PingActor"
|
||||
timeout="2000">
|
||||
<akka:remote host="localhost" port="9999"/>
|
||||
<akka:remote host="localhost" port="9992"/>
|
||||
</akka:untyped-actor>
|
||||
|
||||
<akka:untyped-actor id="untyped-actor-with-dispatcher"
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ import org.w3c.dom.Element
|
|||
class TypedActorBeanDefinitionParserTest extends Spec with ShouldMatchers {
|
||||
private class Parser extends ActorParser
|
||||
|
||||
describe("An TypedActorParser") {
|
||||
describe("A TypedActorParser") {
|
||||
val parser = new Parser()
|
||||
it("should parse the typed actor configuration") {
|
||||
val xml = <akka:typed-actor id="typed-actor1"
|
||||
|
|
@ -66,6 +66,20 @@ class TypedActorBeanDefinitionParserTest extends Spec with ShouldMatchers {
|
|||
assert(props != null)
|
||||
assert(props.host === "com.some.host")
|
||||
assert(props.port === 9999)
|
||||
assert(!props.serverManaged)
|
||||
}
|
||||
|
||||
it("should parse remote server managed TypedActors configuration") {
|
||||
val xml = <akka:typed-actor id="remote typed-actor" implementation="se.scalablesolutions.akka.spring.foo.MyPojo"
|
||||
timeout="1000">
|
||||
<akka:remote host="com.some.host" port="9999" service-name="my-service"/>
|
||||
</akka:typed-actor>
|
||||
val props = parser.parseActor(dom(xml).getDocumentElement);
|
||||
assert(props != null)
|
||||
assert(props.host === "com.some.host")
|
||||
assert(props.port === 9999)
|
||||
assert(props.serviceName === "my-service")
|
||||
assert(props.serverManaged)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,10 +4,8 @@
|
|||
package se.scalablesolutions.akka.spring
|
||||
|
||||
|
||||
import foo.{IMyPojo, MyPojo}
|
||||
import foo.{PingActor, IMyPojo, MyPojo}
|
||||
import se.scalablesolutions.akka.dispatch.FutureTimeoutException
|
||||
import se.scalablesolutions.akka.remote.RemoteNode
|
||||
import org.scalatest.FeatureSpec
|
||||
import org.scalatest.matchers.ShouldMatchers
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
|
|
@ -16,13 +14,52 @@ import org.springframework.beans.factory.xml.XmlBeanDefinitionReader
|
|||
import org.springframework.context.ApplicationContext
|
||||
import org.springframework.context.support.ClassPathXmlApplicationContext
|
||||
import org.springframework.core.io.{ClassPathResource, Resource}
|
||||
import org.scalatest.{BeforeAndAfterAll, FeatureSpec}
|
||||
import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer, RemoteNode}
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import se.scalablesolutions.akka.actor.{TypedActor, RemoteTypedActorOne, Actor}
|
||||
import se.scalablesolutions.akka.actor.remote.RemoteTypedActorOneImpl
|
||||
|
||||
/**
|
||||
* Tests for spring configuration of typed actors.
|
||||
* @author michaelkober
|
||||
*/
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class TypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers {
|
||||
class TypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers with BeforeAndAfterAll {
|
||||
|
||||
var server1: RemoteServer = null
|
||||
var server2: RemoteServer = null
|
||||
|
||||
override def beforeAll = {
|
||||
val actor = Actor.actorOf[PingActor] // FIXME: remove this line when ticket 425 is fixed
|
||||
server1 = new RemoteServer()
|
||||
server1.start("localhost", 9990)
|
||||
server2 = new RemoteServer()
|
||||
server2.start("localhost", 9992)
|
||||
|
||||
val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
|
||||
server1.registerTypedActor("typed-actor-service", typedActor)
|
||||
}
|
||||
|
||||
// make sure the servers shutdown cleanly after the test has finished
|
||||
override def afterAll = {
|
||||
try {
|
||||
server1.shutdown
|
||||
server2.shutdown
|
||||
RemoteClient.shutdownAll
|
||||
Thread.sleep(1000)
|
||||
} catch {
|
||||
case e => ()
|
||||
}
|
||||
}
|
||||
|
||||
def getTypedActorFromContext(config: String, id: String) : IMyPojo = {
|
||||
MyPojo.latch = new CountDownLatch(1)
|
||||
val context = new ClassPathXmlApplicationContext(config)
|
||||
val myPojo: IMyPojo = context.getBean(id).asInstanceOf[IMyPojo]
|
||||
myPojo
|
||||
}
|
||||
|
||||
feature("parse Spring application context") {
|
||||
|
||||
scenario("akka:typed-actor and akka:supervision and akka:dispatcher can be used as top level elements") {
|
||||
|
|
@ -37,41 +74,79 @@ class TypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers {
|
|||
}
|
||||
|
||||
scenario("get a typed actor") {
|
||||
val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml")
|
||||
val myPojo = context.getBean("simple-typed-actor").asInstanceOf[IMyPojo]
|
||||
var msg = myPojo.getFoo()
|
||||
msg += myPojo.getBar()
|
||||
assert(msg === "foobar")
|
||||
val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "simple-typed-actor")
|
||||
assert(myPojo.getFoo() === "foo")
|
||||
myPojo.oneWay("hello 1")
|
||||
MyPojo.latch.await
|
||||
assert(MyPojo.lastOneWayMessage === "hello 1")
|
||||
}
|
||||
|
||||
scenario("FutureTimeoutException when timed out") {
|
||||
val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml")
|
||||
val myPojo = context.getBean("simple-typed-actor").asInstanceOf[IMyPojo]
|
||||
val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "simple-typed-actor")
|
||||
evaluating {myPojo.longRunning()} should produce[FutureTimeoutException]
|
||||
|
||||
}
|
||||
|
||||
scenario("typed-actor with timeout") {
|
||||
val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml")
|
||||
val myPojo = context.getBean("simple-typed-actor-long-timeout").asInstanceOf[IMyPojo]
|
||||
val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "simple-typed-actor-long-timeout")
|
||||
assert(myPojo.longRunning() === "this took long");
|
||||
}
|
||||
|
||||
scenario("transactional typed-actor") {
|
||||
val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml")
|
||||
val myPojo = context.getBean("transactional-typed-actor").asInstanceOf[IMyPojo]
|
||||
var msg = myPojo.getFoo()
|
||||
msg += myPojo.getBar()
|
||||
assert(msg === "foobar")
|
||||
val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "transactional-typed-actor")
|
||||
assert(myPojo.getFoo() === "foo")
|
||||
myPojo.oneWay("hello 2")
|
||||
MyPojo.latch.await
|
||||
assert(MyPojo.lastOneWayMessage === "hello 2")
|
||||
}
|
||||
|
||||
scenario("get a remote typed-actor") {
|
||||
RemoteNode.start
|
||||
Thread.sleep(1000)
|
||||
val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml")
|
||||
val myPojo = context.getBean("remote-typed-actor").asInstanceOf[IMyPojo]
|
||||
assert(myPojo.getFoo === "foo")
|
||||
val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "remote-typed-actor")
|
||||
assert(myPojo.getFoo() === "foo")
|
||||
myPojo.oneWay("hello 3")
|
||||
MyPojo.latch.await
|
||||
assert(MyPojo.lastOneWayMessage === "hello 3")
|
||||
}
|
||||
|
||||
scenario("get a client-managed-remote-typed-actor") {
|
||||
val myPojo = getTypedActorFromContext("/server-managed-config.xml", "client-managed-remote-typed-actor")
|
||||
assert(myPojo.getFoo() === "foo")
|
||||
myPojo.oneWay("hello client-managed-remote-typed-actor")
|
||||
MyPojo.latch.await
|
||||
assert(MyPojo.lastOneWayMessage === "hello client-managed-remote-typed-actor")
|
||||
}
|
||||
|
||||
scenario("get a server-managed-remote-typed-actor") {
|
||||
val serverPojo = getTypedActorFromContext("/server-managed-config.xml", "server-managed-remote-typed-actor")
|
||||
//
|
||||
val myPojoProxy = RemoteClient.typedActorFor(classOf[IMyPojo], classOf[IMyPojo].getName, 5000L, "localhost", 9990)
|
||||
assert(myPojoProxy.getFoo() === "foo")
|
||||
myPojoProxy.oneWay("hello server-managed-remote-typed-actor")
|
||||
MyPojo.latch.await
|
||||
assert(MyPojo.lastOneWayMessage === "hello server-managed-remote-typed-actor")
|
||||
}
|
||||
|
||||
scenario("get a server-managed-remote-typed-actor-custom-id") {
|
||||
val serverPojo = getTypedActorFromContext("/server-managed-config.xml", "server-managed-remote-typed-actor-custom-id")
|
||||
//
|
||||
val myPojoProxy = RemoteClient.typedActorFor(classOf[IMyPojo], "mypojo-service", 5000L, "localhost", 9990)
|
||||
assert(myPojoProxy.getFoo() === "foo")
|
||||
myPojoProxy.oneWay("hello server-managed-remote-typed-actor 2")
|
||||
MyPojo.latch.await
|
||||
assert(MyPojo.lastOneWayMessage === "hello server-managed-remote-typed-actor 2")
|
||||
}
|
||||
|
||||
scenario("get a client proxy for server-managed-remote-typed-actor") {
|
||||
MyPojo.latch = new CountDownLatch(1)
|
||||
val context = new ClassPathXmlApplicationContext("/server-managed-config.xml")
|
||||
val myPojo: IMyPojo = context.getBean("server-managed-remote-typed-actor-custom-id").asInstanceOf[IMyPojo]
|
||||
// get client proxy from spring context
|
||||
val myPojoProxy = context.getBean("typed-client-1").asInstanceOf[IMyPojo]
|
||||
assert(myPojoProxy.getFoo() === "foo")
|
||||
myPojoProxy.oneWay("hello")
|
||||
MyPojo.latch.await
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,74 +6,146 @@ package se.scalablesolutions.akka.spring
|
|||
|
||||
import foo.PingActor
|
||||
import se.scalablesolutions.akka.dispatch.ExecutorBasedEventDrivenWorkStealingDispatcher
|
||||
import se.scalablesolutions.akka.remote.RemoteNode
|
||||
import se.scalablesolutions.akka.actor.ActorRef
|
||||
import org.scalatest.FeatureSpec
|
||||
import org.scalatest.matchers.ShouldMatchers
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
import org.springframework.context.ApplicationContext
|
||||
import org.springframework.context.support.ClassPathXmlApplicationContext
|
||||
import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer}
|
||||
import org.scalatest.{BeforeAndAfterAll, FeatureSpec}
|
||||
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import se.scalablesolutions.akka.actor.{RemoteActorRef, ActorRegistry, Actor, ActorRef}
|
||||
|
||||
/**
|
||||
* Tests for spring configuration of typed actors.
|
||||
* @author michaelkober
|
||||
*/
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class UntypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers {
|
||||
class UntypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers with BeforeAndAfterAll {
|
||||
|
||||
var server1: RemoteServer = null
|
||||
var server2: RemoteServer = null
|
||||
|
||||
|
||||
override def beforeAll = {
|
||||
val actor = Actor.actorOf[PingActor] // FIXME: remove this line when ticket 425 is fixed
|
||||
server1 = new RemoteServer()
|
||||
server1.start("localhost", 9990)
|
||||
server2 = new RemoteServer()
|
||||
server2.start("localhost", 9992)
|
||||
}
|
||||
|
||||
// make sure the servers shutdown cleanly after the test has finished
|
||||
override def afterAll = {
|
||||
try {
|
||||
server1.shutdown
|
||||
server2.shutdown
|
||||
RemoteClient.shutdownAll
|
||||
Thread.sleep(1000)
|
||||
} catch {
|
||||
case e => ()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def getPingActorFromContext(config: String, id: String) : ActorRef = {
|
||||
PingActor.latch = new CountDownLatch(1)
|
||||
val context = new ClassPathXmlApplicationContext(config)
|
||||
val pingActor = context.getBean(id).asInstanceOf[ActorRef]
|
||||
assert(pingActor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
|
||||
pingActor.start()
|
||||
}
|
||||
|
||||
|
||||
feature("parse Spring application context") {
|
||||
|
||||
scenario("get a untyped actor") {
|
||||
val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml")
|
||||
val myactor = context.getBean("simple-untyped-actor").asInstanceOf[ActorRef]
|
||||
assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
|
||||
myactor.start()
|
||||
val myactor = getPingActorFromContext("/untyped-actor-config.xml", "simple-untyped-actor")
|
||||
myactor.sendOneWay("Hello")
|
||||
PingActor.latch.await
|
||||
assert(PingActor.lastMessage === "Hello")
|
||||
assert(myactor.isDefinedAt("some string message"))
|
||||
}
|
||||
|
||||
scenario("untyped-actor with timeout") {
|
||||
val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml")
|
||||
val myactor = context.getBean("simple-untyped-actor-long-timeout").asInstanceOf[ActorRef]
|
||||
assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
|
||||
myactor.start()
|
||||
myactor.sendOneWay("Hello")
|
||||
val myactor = getPingActorFromContext("/untyped-actor-config.xml", "simple-untyped-actor-long-timeout")
|
||||
assert(myactor.getTimeout() === 10000)
|
||||
myactor.sendOneWay("Hello 2")
|
||||
PingActor.latch.await
|
||||
assert(PingActor.lastMessage === "Hello 2")
|
||||
}
|
||||
|
||||
scenario("transactional untyped-actor") {
|
||||
val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml")
|
||||
val myactor = context.getBean("transactional-untyped-actor").asInstanceOf[ActorRef]
|
||||
assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
|
||||
myactor.start()
|
||||
myactor.sendOneWay("Hello")
|
||||
assert(myactor.isDefinedAt("some string message"))
|
||||
val myactor = getPingActorFromContext("/untyped-actor-config.xml", "transactional-untyped-actor")
|
||||
myactor.sendOneWay("Hello 3")
|
||||
PingActor.latch.await
|
||||
assert(PingActor.lastMessage === "Hello 3")
|
||||
}
|
||||
|
||||
scenario("get a remote typed-actor") {
|
||||
RemoteNode.start
|
||||
Thread.sleep(1000)
|
||||
val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml")
|
||||
val myactor = context.getBean("remote-untyped-actor").asInstanceOf[ActorRef]
|
||||
assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
|
||||
myactor.start()
|
||||
myactor.sendOneWay("Hello")
|
||||
assert(myactor.isDefinedAt("some string message"))
|
||||
val myactor = getPingActorFromContext("/untyped-actor-config.xml", "remote-untyped-actor")
|
||||
myactor.sendOneWay("Hello 4")
|
||||
assert(myactor.getRemoteAddress().isDefined)
|
||||
assert(myactor.getRemoteAddress().get.getHostName() === "localhost")
|
||||
assert(myactor.getRemoteAddress().get.getPort() === 9999)
|
||||
assert(myactor.getRemoteAddress().get.getPort() === 9992)
|
||||
PingActor.latch.await
|
||||
assert(PingActor.lastMessage === "Hello 4")
|
||||
}
|
||||
|
||||
scenario("untyped-actor with custom dispatcher") {
|
||||
val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml")
|
||||
val myactor = context.getBean("untyped-actor-with-dispatcher").asInstanceOf[ActorRef]
|
||||
assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
|
||||
myactor.start()
|
||||
myactor.sendOneWay("Hello")
|
||||
val myactor = getPingActorFromContext("/untyped-actor-config.xml", "untyped-actor-with-dispatcher")
|
||||
assert(myactor.getTimeout() === 1000)
|
||||
assert(myactor.getDispatcher.isInstanceOf[ExecutorBasedEventDrivenWorkStealingDispatcher])
|
||||
myactor.sendOneWay("Hello 5")
|
||||
PingActor.latch.await
|
||||
assert(PingActor.lastMessage === "Hello 5")
|
||||
}
|
||||
|
||||
scenario("create client managed remote untyped-actor") {
|
||||
val myactor = getPingActorFromContext("/server-managed-config.xml", "client-managed-remote-untyped-actor")
|
||||
myactor.sendOneWay("Hello client managed remote untyped-actor")
|
||||
PingActor.latch.await
|
||||
assert(PingActor.lastMessage === "Hello client managed remote untyped-actor")
|
||||
assert(myactor.getRemoteAddress().isDefined)
|
||||
assert(myactor.getRemoteAddress().get.getHostName() === "localhost")
|
||||
assert(myactor.getRemoteAddress().get.getPort() === 9990)
|
||||
}
|
||||
|
||||
scenario("create server managed remote untyped-actor") {
|
||||
val myactor = getPingActorFromContext("/server-managed-config.xml", "server-managed-remote-untyped-actor")
|
||||
val nrOfActors = ActorRegistry.actors.length
|
||||
val actorRef = RemoteClient.actorFor("se.scalablesolutions.akka.spring.foo.PingActor", "localhost", 9990)
|
||||
actorRef.sendOneWay("Hello server managed remote untyped-actor")
|
||||
PingActor.latch.await
|
||||
assert(PingActor.lastMessage === "Hello server managed remote untyped-actor")
|
||||
assert(ActorRegistry.actors.length === nrOfActors)
|
||||
}
|
||||
|
||||
scenario("create server managed remote untyped-actor with custom service id") {
|
||||
val myactor = getPingActorFromContext("/server-managed-config.xml", "server-managed-remote-untyped-actor-custom-id")
|
||||
val nrOfActors = ActorRegistry.actors.length
|
||||
val actorRef = RemoteClient.actorFor("ping-service", "localhost", 9990)
|
||||
actorRef.sendOneWay("Hello server managed remote untyped-actor")
|
||||
PingActor.latch.await
|
||||
assert(PingActor.lastMessage === "Hello server managed remote untyped-actor")
|
||||
assert(ActorRegistry.actors.length === nrOfActors)
|
||||
}
|
||||
|
||||
scenario("get client actor for server managed remote untyped-actor") {
|
||||
PingActor.latch = new CountDownLatch(1)
|
||||
val context = new ClassPathXmlApplicationContext("/server-managed-config.xml")
|
||||
val pingActor = context.getBean("server-managed-remote-untyped-actor-custom-id").asInstanceOf[ActorRef]
|
||||
assert(pingActor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
|
||||
pingActor.start()
|
||||
val nrOfActors = ActorRegistry.actors.length
|
||||
// get client actor ref from spring context
|
||||
val actorRef = context.getBean("client-1").asInstanceOf[ActorRef]
|
||||
assert(actorRef.isInstanceOf[RemoteActorRef])
|
||||
actorRef.sendOneWay("Hello")
|
||||
PingActor.latch.await
|
||||
assert(ActorRegistry.actors.length === nrOfActors)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -389,7 +389,8 @@ object TypedActor extends Logging {
|
|||
typedActor.initialize(proxy)
|
||||
if (config._messageDispatcher.isDefined) actorRef.dispatcher = config._messageDispatcher.get
|
||||
if (config._threadBasedDispatcher.isDefined) actorRef.dispatcher = Dispatchers.newThreadBasedDispatcher(actorRef)
|
||||
AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, None, config.timeout))
|
||||
if (config._host.isDefined) actorRef.makeRemote(config._host.get)
|
||||
AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, config._host, config.timeout))
|
||||
actorRef.start
|
||||
proxy.asInstanceOf[T]
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@
|
|||
<system id="akka">
|
||||
<package name="se.scalablesolutions.akka.actor">
|
||||
<aspect class="TypedActorAspect" />
|
||||
<aspect class="ServerManagedTypedActorAspect" />
|
||||
</package>
|
||||
</system>
|
||||
</aspectwerkz>
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue