Merge branch 'ticket194'

This commit is contained in:
Michael Kober 2010-09-13 18:42:17 +02:00
commit efd3287a0d
34 changed files with 1078 additions and 421 deletions

View file

@ -1344,17 +1344,18 @@ object RemoteActorSystemMessage {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[akka] case class RemoteActorRef private[akka] (
uuuid: String,
classOrServiceName: String,
val className: String,
val hostname: String,
val port: Int,
_timeout: Long,
loader: Option[ClassLoader])
loader: Option[ClassLoader],
val actorType: ActorType = ActorType.ScalaActor)
extends ActorRef with ScalaActorRef {
ensureRemotingEnabled
_uuid = uuuid
id = classOrServiceName
timeout = _timeout
start
@ -1362,7 +1363,7 @@ private[akka] case class RemoteActorRef private[akka] (
def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit =
RemoteClientModule.send[Any](
message, senderOption, None, remoteAddress.get, timeout, true, this, None, ActorType.ScalaActor)
message, senderOption, None, remoteAddress.get, timeout, true, this, None, actorType)
def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any,
@ -1370,7 +1371,7 @@ private[akka] case class RemoteActorRef private[akka] (
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
val future = RemoteClientModule.send[T](
message, senderOption, senderFuture, remoteAddress.get, timeout, false, this, None, ActorType.ScalaActor)
message, senderOption, senderFuture, remoteAddress.get, timeout, false, this, None, actorType)
if (future.isDefined) future.get
else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
}

View file

@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.{HashSet, HashMap}
import scala.reflect.BeanProperty
import se.scalablesolutions.akka.actor._
/**
* Atomic remote request/reply message id generator.
@ -76,8 +77,6 @@ object RemoteClient extends Logging {
private val remoteClients = new HashMap[String, RemoteClient]
private val remoteActors = new HashMap[RemoteServer.Address, HashSet[String]]
// FIXME: simplify overloaded methods when we have Scala 2.8
def actorFor(classNameOrServiceId: String, hostname: String, port: Int): ActorRef =
actorFor(classNameOrServiceId, classNameOrServiceId, 5000L, hostname, port, None)
@ -99,6 +98,27 @@ object RemoteClient extends Logging {
def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef =
RemoteActorRef(serviceId, className, hostname, port, timeout, None)
def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, hostname: String, port: Int) : T = {
typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, 5000L, hostname, port, None)
}
def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int) : T = {
typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, timeout, hostname, port, None)
}
def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader) : T = {
typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, timeout, hostname, port, Some(loader))
}
def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader) : T = {
typedActorFor(intfClass, serviceId, implClassName, timeout, hostname, port, Some(loader))
}
private[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]) : T = {
val actorRef = RemoteActorRef(serviceId, implClassName, hostname, port, timeout, loader, ActorType.TypedActor)
TypedActor.createProxyForRemoteActorRef(intfClass, actorRef)
}
private[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef =
RemoteActorRef(serviceId, className, hostname, port, timeout, Some(loader))

View file

@ -10,7 +10,7 @@ import java.util.concurrent.{ConcurrentHashMap, Executors}
import java.util.{Map => JMap}
import se.scalablesolutions.akka.actor.{
Actor, TypedActor, ActorRef, LocalActorRef, RemoteActorRef, IllegalActorStateException, RemoteActorSystemMessage}
Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.util._
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
@ -133,8 +133,8 @@ object RemoteServer {
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor)
}
private[akka] def registerTypedActor(address: InetSocketAddress, name: String, typedActor: AnyRef) = guard.withWriteGuard {
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(name, typedActor)
private[akka] def registerTypedActor(address: InetSocketAddress, uuid: String, typedActor: AnyRef) = guard.withWriteGuard {
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(uuid, typedActor)
}
private[akka] def getOrCreateServer(address: InetSocketAddress): RemoteServer = guard.withWriteGuard {
@ -271,12 +271,28 @@ class RemoteServer extends Logging with ListenerManagement {
}
}
// TODO: register typed actor in RemoteServer as well
/**
* Register typed actor by interface name.
*/
def registerTypedActor(intfClass: Class[_], typedActor: AnyRef) : Unit = registerTypedActor(intfClass.getName, typedActor)
/**
* Register Remote Actor by the Actor's 'uuid' field. It starts the Actor if it is not started already.
* Register remote typed actor by a specific id.
* @param id custom actor id
* @param typedActor typed actor to register
*/
def register(actorRef: ActorRef): Unit = register(actorRef.id,actorRef)
def registerTypedActor(id: String, typedActor: AnyRef): Unit = synchronized {
val typedActors = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors
if (!typedActors.contains(id)) {
log.debug("Registering server side remote actor [%s] with id [%s] on [%s:%d]", typedActor.getClass.getName, id, hostname, port)
typedActors.put(id, typedActor)
}
}
/**
* Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already.
*/
def register(actorRef: ActorRef): Unit = register(actorRef.id, actorRef)
/**
* Register Remote Actor by a specific 'id' passed as argument.
@ -321,16 +337,25 @@ class RemoteServer extends Logging with ListenerManagement {
}
}
/**
* Unregister Remote Typed Actor by specific 'id'.
* <p/>
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
def unregisterTypedActor(id: String):Unit = synchronized {
if (_isRunning) {
log.info("Unregistering server side remote typed actor with id [%s]", id)
val registeredTypedActors = typedActors()
registeredTypedActors.remove(id)
}
}
protected override def manageLifeCycleOfListeners = false
protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)
private[akka] def actors() : ConcurrentHashMap[String, ActorRef] = {
RemoteServer.actorsFor(address).actors
}
private[akka] def typedActors() : ConcurrentHashMap[String, AnyRef] = {
RemoteServer.actorsFor(address).typedActors
}
private[akka] def actors() = RemoteServer.actorsFor(address).actors
private[akka] def typedActors() = RemoteServer.actorsFor(address).typedActors
}
object RemoteServerSslContext {
@ -530,6 +555,32 @@ class RemoteServerHandler(
}
}
/**
* Find a registered actor by ID (default) or UUID.
* Actors are registered by id apart from registering during serialization see SerializationProtocol.
*/
private def findActorByIdOrUuid(id: String, uuid: String) : ActorRef = {
val registeredActors = server.actors()
var actorRefOrNull = registeredActors get id
if (actorRefOrNull eq null) {
actorRefOrNull = registeredActors get uuid
}
actorRefOrNull
}
/**
* Find a registered typed actor by ID (default) or UUID.
* Actors are registered by id apart from registering during serialization see SerializationProtocol.
*/
private def findTypedActorByIdOrUUid(id: String, uuid: String) : AnyRef = {
val registeredActors = server.typedActors()
var actorRefOrNull = registeredActors get id
if (actorRefOrNull eq null) {
actorRefOrNull = registeredActors get uuid
}
actorRefOrNull
}
/**
* Creates a new instance of the actor with name, uuid and timeout specified as arguments.
*
@ -538,12 +589,14 @@ class RemoteServerHandler(
* Does not start the actor.
*/
private def createActor(actorInfo: ActorInfoProtocol): ActorRef = {
val uuid = actorInfo.getUuid
val ids = actorInfo.getUuid.split(':')
val uuid = ids(0)
val id = ids(1)
val name = actorInfo.getTarget
val timeout = actorInfo.getTimeout
val registeredActors = server.actors()
val actorRefOrNull = registeredActors get uuid
val actorRefOrNull = findActorByIdOrUuid(id, uuid)
if (actorRefOrNull eq null) {
try {
@ -552,9 +605,10 @@ class RemoteServerHandler(
else Class.forName(name)
val actorRef = Actor.actorOf(clazz.newInstance.asInstanceOf[Actor])
actorRef.uuid = uuid
actorRef.id = id
actorRef.timeout = timeout
actorRef.remoteAddress = None
registeredActors.put(uuid, actorRef)
server.actors.put(id, actorRef) // register by id
actorRef
} catch {
case e =>
@ -566,9 +620,11 @@ class RemoteServerHandler(
}
private def createTypedActor(actorInfo: ActorInfoProtocol): AnyRef = {
val uuid = actorInfo.getUuid
val registeredTypedActors = server.typedActors()
val typedActorOrNull = registeredTypedActors get uuid
val ids = actorInfo.getUuid.split(':')
val uuid = ids(0)
val id = ids(1)
val typedActorOrNull = findTypedActorByIdOrUUid(id, uuid)
if (typedActorOrNull eq null) {
val typedActorInfo = actorInfo.getTypedActorInfo
@ -585,7 +641,7 @@ class RemoteServerHandler(
val newInstance = TypedActor.newInstance(
interfaceClass, targetClass.asInstanceOf[Class[_ <: TypedActor]], actorInfo.getTimeout).asInstanceOf[AnyRef]
registeredTypedActors.put(uuid, newInstance)
server.typedActors.put(id, newInstance) // register by id
newInstance
} catch {
case e =>

View file

@ -230,7 +230,7 @@ object RemoteActorSerialization {
}
RemoteActorRefProtocol.newBuilder
.setUuid(uuid)
.setUuid(uuid + ":" + id)
.setActorClassname(actorClass.getName)
.setHomeAddress(AddressProtocol.newBuilder.setHostname(host).setPort(port).build)
.setTimeout(timeout)
@ -248,7 +248,7 @@ object RemoteActorSerialization {
import actorRef._
val actorInfoBuilder = ActorInfoProtocol.newBuilder
.setUuid(uuid)
.setUuid(uuid + ":" + actorRef.id)
.setTarget(actorClassName)
.setTimeout(timeout)

View file

@ -2,6 +2,7 @@
<system id="akka">
<package name="se.scalablesolutions.akka.actor">
<aspect class="TypedActorAspect" />
<aspect class="ServerManagedTypedActorAspect" />
</package>
</system>
</aspectwerkz>

View file

@ -93,6 +93,7 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
actor.stop
}
@Test
def shouldSendOneWayAndReceiveReply = {
val actor = actorOf[SendOneWayAndReplyReceiverActor]
@ -103,7 +104,7 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
sender.actor.asInstanceOf[SendOneWayAndReplySenderActor].sendTo = actor
sender.start
sender.actor.asInstanceOf[SendOneWayAndReplySenderActor].sendOff
assert(SendOneWayAndReplySenderActor.latch.await(1, TimeUnit.SECONDS))
assert(SendOneWayAndReplySenderActor.latch.await(3, TimeUnit.SECONDS))
assert(sender.actor.asInstanceOf[SendOneWayAndReplySenderActor].state.isDefined === true)
assert("World" === sender.actor.asInstanceOf[SendOneWayAndReplySenderActor].state.get.asInstanceOf[String])
actor.stop
@ -134,6 +135,6 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
assert("Expected exception; to test fault-tolerance" === e.getMessage())
}
actor.stop
}
}
}

View file

@ -4,10 +4,7 @@
package se.scalablesolutions.akka.actor.remote
import org.scalatest.Spec
import org.scalatest.Assertions
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
@ -19,6 +16,7 @@ import se.scalablesolutions.akka.actor._
import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient}
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, BlockingQueue}
import org.scalatest.{BeforeAndAfterEach, Spec, Assertions, BeforeAndAfterAll}
object RemoteTypedActorSpec {
val HOSTNAME = "localhost"
@ -40,7 +38,7 @@ object RemoteTypedActorLog {
class RemoteTypedActorSpec extends
Spec with
ShouldMatchers with
BeforeAndAfterAll {
BeforeAndAfterEach with BeforeAndAfterAll {
import RemoteTypedActorLog._
import RemoteTypedActorSpec._
@ -82,6 +80,10 @@ class RemoteTypedActorSpec extends
ActorRegistry.shutdownAll
}
override def afterEach() {
server.typedActors.clear
}
describe("Remote Typed Actor ") {
it("should receive one-way message") {

View file

@ -79,6 +79,7 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
}
}
@Test
def shouldSendWithBang {
val actor = RemoteClient.actorFor(
@ -153,10 +154,29 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
server.register(actorOf[RemoteActorSpecActorUnidirectional])
val actor = RemoteClient.actorFor("se.scalablesolutions.akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional", HOSTNAME, PORT)
val numberOfActorsInRegistry = ActorRegistry.actors.length
val result = actor ! "OneWay"
actor ! "OneWay"
assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS))
assert(numberOfActorsInRegistry === ActorRegistry.actors.length)
actor.stop
}
@Test
def shouldUseServiceNameAsIdForRemoteActorRef {
server.register(actorOf[RemoteActorSpecActorUnidirectional])
server.register("my-service", actorOf[RemoteActorSpecActorUnidirectional])
val actor1 = RemoteClient.actorFor("se.scalablesolutions.akka.actor.remote.ServerInitiatedRemoteActorSpec$RemoteActorSpecActorUnidirectional", HOSTNAME, PORT)
val actor2 = RemoteClient.actorFor("my-service", HOSTNAME, PORT)
val actor3 = RemoteClient.actorFor("my-service", HOSTNAME, PORT)
actor1 ! "OneWay"
actor2 ! "OneWay"
actor3 ! "OneWay"
assert(actor1.uuid != actor2.uuid)
assert(actor1.uuid != actor3.uuid)
assert(actor1.id != actor2.id)
assert(actor2.id == actor3.id)
}
}

View file

@ -0,0 +1,112 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.actor.remote
import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import java.util.concurrent.TimeUnit
import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient}
import se.scalablesolutions.akka.actor._
import RemoteTypedActorLog._
object ServerInitiatedRemoteTypedActorSpec {
val HOSTNAME = "localhost"
val PORT = 9990
var server: RemoteServer = null
}
@RunWith(classOf[JUnitRunner])
class ServerInitiatedRemoteTypedActorSpec extends
Spec with
ShouldMatchers with
BeforeAndAfterAll {
import ServerInitiatedRemoteTypedActorSpec._
private val unit = TimeUnit.MILLISECONDS
override def beforeAll = {
server = new RemoteServer()
server.start(HOSTNAME, PORT)
val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
server.registerTypedActor("typed-actor-service", typedActor)
Thread.sleep(1000)
}
// make sure the servers shutdown cleanly after the test has finished
override def afterAll = {
try {
server.shutdown
RemoteClient.shutdownAll
Thread.sleep(1000)
} catch {
case e => ()
}
}
describe("Server managed remote typed Actor ") {
it("should receive one-way message") {
clearMessageLogs
val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT)
expect("oneway") {
actor.oneWay
oneWayLog.poll(5, TimeUnit.SECONDS)
}
}
it("should respond to request-reply message") {
clearMessageLogs
val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT)
expect("pong") {
actor.requestReply("ping")
}
}
it("should not recreate registered actors") {
val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT)
val numberOfActorsInRegistry = ActorRegistry.actors.length
expect("oneway") {
actor.oneWay
oneWayLog.poll(5, TimeUnit.SECONDS)
}
assert(numberOfActorsInRegistry === ActorRegistry.actors.length)
}
it("should support multiple variants to get the actor from client side") {
var actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT)
expect("oneway") {
actor.oneWay
oneWayLog.poll(5, TimeUnit.SECONDS)
}
actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", HOSTNAME, PORT)
expect("oneway") {
actor.oneWay
oneWayLog.poll(5, TimeUnit.SECONDS)
}
actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT, this.getClass().getClassLoader)
expect("oneway") {
actor.oneWay
oneWayLog.poll(5, TimeUnit.SECONDS)
}
}
it("should register and unregister typed actors") {
val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
server.registerTypedActor("my-test-service", typedActor)
assert(server.typedActors().get("my-test-service") != null)
server.unregisterTypedActor("my-test-service")
assert(server.typedActors().get("my-test-service") == null)
}
}
}

View file

@ -64,6 +64,14 @@
</xsd:restriction>
</xsd:simpleType>
<!-- management type for remote actors: client managed / server managed -->
<xsd:simpleType name="managed-by-type">
<xsd:restriction base="xsd:token">
<xsd:enumeration value="client"/>
<xsd:enumeration value="server"/>
</xsd:restriction>
</xsd:simpleType>
<!-- dispatcher type -->
<xsd:complexType name="dispatcher-type">
@ -105,6 +113,20 @@
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="managed-by" type="managed-by-type">
<xsd:annotation>
<xsd:documentation>
Management type for remote actors: client managed or server managed.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="service-name" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Custom service name for server managed actor.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<!-- typed actor -->
@ -133,7 +155,7 @@
<xsd:attribute name="timeout" type="xsd:long" use="required">
<xsd:annotation>
<xsd:documentation>
Theh default timeout for '!!' invocations.
The default timeout for '!!' invocations.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
@ -227,6 +249,41 @@
</xsd:choice>
</xsd:complexType>
<!-- actor-for -->
<!-- typed actor -->
<xsd:complexType name="actor-for-type">
<xsd:attribute name="id" type="xsd:ID"/>
<xsd:attribute name="host" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
Name of the remote host.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="port" type="xsd:integer" use="required">
<xsd:annotation>
<xsd:documentation>
Port of the remote host.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="service-name" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
Custom service name or class name for the server managed actor.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attribute name="interface" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Name of the interface the typed actor implements.
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
</xsd:complexType>
<!-- Supervisor strategy -->
<xsd:complexType name="strategy-type">
<xsd:sequence>
@ -292,4 +349,7 @@
<!-- CamelService -->
<xsd:element name="camel-service" type="camel-service-type"/>
<!-- ActorFor -->
<xsd:element name="actor-for" type="actor-for-type"/>
</xsd:schema>

View file

@ -0,0 +1,72 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.spring
import org.springframework.beans.factory.support.BeanDefinitionBuilder
import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser
import org.springframework.beans.factory.xml.ParserContext
import AkkaSpringConfigurationTags._
import org.w3c.dom.Element
/**
* Parser for custom namespace configuration.
* @author michaelkober
*/
class TypedActorBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorParser {
/*
* @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder)
*/
override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) {
val typedActorConf = parseActor(element)
typedActorConf.typed = TYPED_ACTOR_TAG
typedActorConf.setAsProperties(builder)
}
/*
* @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element)
*/
override def getBeanClass(element: Element): Class[_] = classOf[ActorFactoryBean]
}
/**
* Parser for custom namespace configuration.
* @author michaelkober
*/
class UntypedActorBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorParser {
/*
* @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder)
*/
override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) {
val untypedActorConf = parseActor(element)
untypedActorConf.typed = UNTYPED_ACTOR_TAG
untypedActorConf.setAsProperties(builder)
}
/*
* @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element)
*/
override def getBeanClass(element: Element): Class[_] = classOf[ActorFactoryBean]
}
/**
* Parser for custom namespace configuration.
* @author michaelkober
*/
class ActorForBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorForParser {
/*
* @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder)
*/
override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) {
val actorForConf = parseActorFor(element)
actorForConf.setAsProperties(builder)
}
/*
* @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element)
*/
override def getBeanClass(element: Element): Class[_] = classOf[ActorForFactoryBean]
}

View file

@ -4,22 +4,19 @@
package se.scalablesolutions.akka.spring
import java.beans.PropertyDescriptor
import java.lang.reflect.Method
import javax.annotation.PreDestroy
import javax.annotation.PostConstruct
import org.springframework.beans.{BeanUtils,BeansException,BeanWrapper,BeanWrapperImpl}
import org.springframework.beans.factory.BeanFactory
import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer}
//import org.springframework.beans.factory.BeanFactory
import org.springframework.beans.factory.config.AbstractFactoryBean
import org.springframework.context.{ApplicationContext,ApplicationContextAware}
import org.springframework.util.ReflectionUtils
//import org.springframework.util.ReflectionUtils
import org.springframework.util.StringUtils
import se.scalablesolutions.akka.actor.{ActorRef, AspectInitRegistry, TypedActorConfiguration, TypedActor,Actor}
import se.scalablesolutions.akka.dispatch.MessageDispatcher
import se.scalablesolutions.akka.util.{Logging, Duration}
import scala.reflect.BeanProperty
import java.net.InetSocketAddress
/**
* Exception to use when something goes wrong during bean creation.
@ -49,6 +46,8 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App
@BeanProperty var transactional: Boolean = false
@BeanProperty var host: String = ""
@BeanProperty var port: Int = _
@BeanProperty var serverManaged: Boolean = false
@BeanProperty var serviceName: String = ""
@BeanProperty var lifecycle: String = ""
@BeanProperty var dispatcher: DispatcherProperties = _
@BeanProperty var scope: String = VAL_SCOPE_SINGLETON
@ -94,7 +93,16 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App
if (implementation == null || implementation == "") throw new AkkaBeansException(
"The 'implementation' part of the 'akka:typed-actor' element in the Spring config file can't be null or empty string")
TypedActor.newInstance(interface.toClass, implementation.toClass, createConfig)
val typedActor: AnyRef = TypedActor.newInstance(interface.toClass, implementation.toClass, createConfig)
if (isRemote && serverManaged) {
val server = RemoteServer.getOrCreateServer(new InetSocketAddress(host, port))
if (serviceName.isEmpty) {
server.registerTypedActor(interface, typedActor)
} else {
server.registerTypedActor(serviceName, typedActor)
}
}
typedActor
}
/**
@ -111,7 +119,16 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App
actorRef.makeTransactionRequired
}
if (isRemote) {
actorRef.makeRemote(host, port)
if (serverManaged) {
val server = RemoteServer.getOrCreateServer(new InetSocketAddress(host, port))
if (serviceName.isEmpty) {
server.register(actorRef)
} else {
server.register(serviceName, actorRef)
}
} else {
actorRef.makeRemote(host, port)
}
}
if (hasDispatcher) {
if (dispatcher.dispatcherType != THREAD_BASED){
@ -159,7 +176,7 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App
private[akka] def createConfig: TypedActorConfiguration = {
val config = new TypedActorConfiguration().timeout(Duration(timeout, "millis"))
if (transactional) config.makeTransactionRequired
if (isRemote) config.makeRemote(host, port)
if (isRemote && !serverManaged) config.makeRemote(host, port)
if (hasDispatcher) {
if (dispatcher.dispatcherType != THREAD_BASED) {
config.dispatcher(dispatcherInstance())
@ -191,3 +208,39 @@ class ActorFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with App
}
}
}
/**
* Factory bean for remote client actor-for.
*
* @author michaelkober
*/
class ActorForFactoryBean extends AbstractFactoryBean[AnyRef] with Logging with ApplicationContextAware {
import StringReflect._
import AkkaSpringConfigurationTags._
@BeanProperty var interface: String = ""
@BeanProperty var host: String = ""
@BeanProperty var port: Int = _
@BeanProperty var serviceName: String = ""
//@BeanProperty var scope: String = VAL_SCOPE_SINGLETON
@BeanProperty var applicationContext: ApplicationContext = _
override def isSingleton = false
/*
* @see org.springframework.beans.factory.FactoryBean#getObjectType()
*/
def getObjectType: Class[AnyRef] = classOf[AnyRef]
/*
* @see org.springframework.beans.factory.config.AbstractFactoryBean#createInstance()
*/
def createInstance: AnyRef = {
if (interface.isEmpty) {
RemoteClient.actorFor(serviceName, host, port)
} else {
RemoteClient.typedActorFor(interface.toClass, serviceName, host, port)
}
}
}

View file

@ -6,6 +6,7 @@ package se.scalablesolutions.akka.spring
import org.springframework.util.xml.DomUtils
import org.w3c.dom.Element
import scala.collection.JavaConversions._
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.actor.IllegalActorStateException
@ -27,11 +28,17 @@ trait ActorParser extends BeanParser with DispatcherParser {
val objectProperties = new ActorProperties()
val remoteElement = DomUtils.getChildElementByTagName(element, REMOTE_TAG);
val dispatcherElement = DomUtils.getChildElementByTagName(element, DISPATCHER_TAG)
val propertyEntries = DomUtils.getChildElementsByTagName(element,PROPERTYENTRY_TAG)
val propertyEntries = DomUtils.getChildElementsByTagName(element, PROPERTYENTRY_TAG)
if (remoteElement != null) {
objectProperties.host = mandatory(remoteElement, HOST)
objectProperties.port = mandatory(remoteElement, PORT).toInt
objectProperties.serverManaged = (remoteElement.getAttribute(MANAGED_BY) != null) && (remoteElement.getAttribute(MANAGED_BY).equals(SERVER_MANAGED))
val serviceName = remoteElement.getAttribute(SERVICE_NAME)
if ((serviceName != null) && (!serviceName.isEmpty)) {
objectProperties.serviceName = serviceName
objectProperties.serverManaged = true
}
}
if (dispatcherElement != null) {
@ -43,7 +50,7 @@ trait ActorParser extends BeanParser with DispatcherParser {
val entry = new PropertyEntry
entry.name = element.getAttribute("name");
entry.value = element.getAttribute("value")
entry.ref = element.getAttribute("ref")
entry.ref = element.getAttribute("ref")
objectProperties.propertyEntries.add(entry)
}
@ -59,15 +66,13 @@ trait ActorParser extends BeanParser with DispatcherParser {
objectProperties.target = mandatory(element, IMPLEMENTATION)
objectProperties.transactional = if (element.getAttribute(TRANSACTIONAL).isEmpty) false else element.getAttribute(TRANSACTIONAL).toBoolean
if (!element.getAttribute(INTERFACE).isEmpty) {
if (element.hasAttribute(INTERFACE)) {
objectProperties.interface = element.getAttribute(INTERFACE)
}
if (!element.getAttribute(LIFECYCLE).isEmpty) {
if (element.hasAttribute(LIFECYCLE)) {
objectProperties.lifecycle = element.getAttribute(LIFECYCLE)
}
if (!element.getAttribute(SCOPE).isEmpty) {
if (element.hasAttribute(SCOPE)) {
objectProperties.scope = element.getAttribute(SCOPE)
}
@ -75,3 +80,158 @@ trait ActorParser extends BeanParser with DispatcherParser {
}
}
/**
* Parser trait for custom namespace configuration for RemoteClient actor-for.
* @author michaelkober
*/
trait ActorForParser extends BeanParser {
import AkkaSpringConfigurationTags._
/**
* Parses the given element and returns a ActorForProperties.
* @param element dom element to parse
* @return configuration for the typed actor
*/
def parseActorFor(element: Element): ActorForProperties = {
val objectProperties = new ActorForProperties()
objectProperties.host = mandatory(element, HOST)
objectProperties.port = mandatory(element, PORT).toInt
objectProperties.serviceName = mandatory(element, SERVICE_NAME)
if (element.hasAttribute(INTERFACE)) {
objectProperties.interface = element.getAttribute(INTERFACE)
}
objectProperties
}
}
/**
* Base trait with utility methods for bean parsing.
*/
trait BeanParser extends Logging {
/**
* Get a mandatory element attribute.
* @param element the element with the mandatory attribute
* @param attribute name of the mandatory attribute
*/
def mandatory(element: Element, attribute: String): String = {
if ((element.getAttribute(attribute) == null) || (element.getAttribute(attribute).isEmpty)) {
throw new IllegalArgumentException("Mandatory attribute missing: " + attribute)
} else {
element.getAttribute(attribute)
}
}
/**
* Get a mandatory child element.
* @param element the parent element
* @param childName name of the mandatory child element
*/
def mandatoryElement(element: Element, childName: String): Element = {
val childElement = DomUtils.getChildElementByTagName(element, childName);
if (childElement == null) {
throw new IllegalArgumentException("Mandatory element missing: '<akka:" + childName + ">'")
} else {
childElement
}
}
}
/**
* Parser trait for custom namespace for Akka dispatcher configuration.
* @author michaelkober
*/
trait DispatcherParser extends BeanParser {
import AkkaSpringConfigurationTags._
/**
* Parses the given element and returns a DispatcherProperties.
* @param element dom element to parse
* @return configuration for the dispatcher
*/
def parseDispatcher(element: Element): DispatcherProperties = {
val properties = new DispatcherProperties()
var dispatcherElement = element
if (hasRef(element)) {
val ref = element.getAttribute(REF)
dispatcherElement = element.getOwnerDocument.getElementById(ref)
if (dispatcherElement == null) {
throw new IllegalArgumentException("Referenced dispatcher not found: '" + ref + "'")
}
}
properties.dispatcherType = mandatory(dispatcherElement, TYPE)
if (properties.dispatcherType == THREAD_BASED) {
val allowedParentNodes = "akka:typed-actor" :: "akka:untyped-actor" :: "typed-actor" :: "untyped-actor" :: Nil
if (!allowedParentNodes.contains(dispatcherElement.getParentNode.getNodeName)) {
throw new IllegalArgumentException("Thread based dispatcher must be nested in 'typed-actor' or 'untyped-actor' element!")
}
}
if (properties.dispatcherType == HAWT) { // no name for HawtDispatcher
properties.name = dispatcherElement.getAttribute(NAME)
if (dispatcherElement.hasAttribute(AGGREGATE)) {
properties.aggregate = dispatcherElement.getAttribute(AGGREGATE).toBoolean
}
} else {
properties.name = mandatory(dispatcherElement, NAME)
}
val threadPoolElement = DomUtils.getChildElementByTagName(dispatcherElement, THREAD_POOL_TAG);
if (threadPoolElement != null) {
if (properties.dispatcherType == THREAD_BASED) {
throw new IllegalArgumentException("Element 'thread-pool' not allowed for this dispatcher type.")
}
val threadPoolProperties = parseThreadPool(threadPoolElement)
properties.threadPool = threadPoolProperties
}
properties
}
/**
* Parses the given element and returns a ThreadPoolProperties.
* @param element dom element to parse
* @return configuration for the thread pool
*/
def parseThreadPool(element: Element): ThreadPoolProperties = {
val properties = new ThreadPoolProperties()
properties.queue = element.getAttribute(QUEUE)
if (element.hasAttribute(CAPACITY)) {
properties.capacity = element.getAttribute(CAPACITY).toInt
}
if (element.hasAttribute(BOUND)) {
properties.bound = element.getAttribute(BOUND).toInt
}
if (element.hasAttribute(FAIRNESS)) {
properties.fairness = element.getAttribute(FAIRNESS).toBoolean
}
if (element.hasAttribute(CORE_POOL_SIZE)) {
properties.corePoolSize = element.getAttribute(CORE_POOL_SIZE).toInt
}
if (element.hasAttribute(MAX_POOL_SIZE)) {
properties.maxPoolSize = element.getAttribute(MAX_POOL_SIZE).toInt
}
if (element.hasAttribute(KEEP_ALIVE)) {
properties.keepAlive = element.getAttribute(KEEP_ALIVE).toLong
}
if (element.hasAttribute(REJECTION_POLICY)) {
properties.rejectionPolicy = element.getAttribute(REJECTION_POLICY)
}
if (element.hasAttribute(MAILBOX_CAPACITY)) {
properties.mailboxCapacity = element.getAttribute(MAILBOX_CAPACITY).toInt
}
properties
}
def hasRef(element: Element): Boolean = {
val ref = element.getAttribute(REF)
(ref != null) && !ref.isEmpty
}
}

View file

@ -8,7 +8,7 @@ import org.springframework.beans.factory.support.BeanDefinitionBuilder
import AkkaSpringConfigurationTags._
/**
* Data container for typed actor configuration data.
* Data container for actor configuration data.
* @author michaelkober
* @author Martin Krasser
*/
@ -20,6 +20,8 @@ class ActorProperties {
var transactional: Boolean = false
var host: String = ""
var port: Int = _
var serverManaged: Boolean = false
var serviceName: String = ""
var lifecycle: String = ""
var scope:String = VAL_SCOPE_SINGLETON
var dispatcher: DispatcherProperties = _
@ -34,6 +36,8 @@ class ActorProperties {
builder.addPropertyValue("typed", typed)
builder.addPropertyValue(HOST, host)
builder.addPropertyValue(PORT, port)
builder.addPropertyValue("serverManaged", serverManaged)
builder.addPropertyValue("serviceName", serviceName)
builder.addPropertyValue(TIMEOUT, timeout)
builder.addPropertyValue(IMPLEMENTATION, target)
builder.addPropertyValue(INTERFACE, interface)
@ -45,3 +49,26 @@ class ActorProperties {
}
}
/**
* Data container for actor configuration data.
* @author michaelkober
*/
class ActorForProperties {
var interface: String = ""
var host: String = ""
var port: Int = _
var serviceName: String = ""
/**
* Sets the properties to the given builder.
* @param builder bean definition builder
*/
def setAsProperties(builder: BeanDefinitionBuilder) {
builder.addPropertyValue(HOST, host)
builder.addPropertyValue(PORT, port)
builder.addPropertyValue("serviceName", serviceName)
builder.addPropertyValue(INTERFACE, interface)
}
}

View file

@ -12,10 +12,11 @@ import AkkaSpringConfigurationTags._
*/
class AkkaNamespaceHandler extends NamespaceHandlerSupport {
def init = {
registerBeanDefinitionParser(TYPED_ACTOR_TAG, new TypedActorBeanDefinitionParser());
registerBeanDefinitionParser(UNTYPED_ACTOR_TAG, new UntypedActorBeanDefinitionParser());
registerBeanDefinitionParser(SUPERVISION_TAG, new SupervisionBeanDefinitionParser());
registerBeanDefinitionParser(DISPATCHER_TAG, new DispatcherBeanDefinitionParser());
registerBeanDefinitionParser(CAMEL_SERVICE_TAG, new CamelServiceBeanDefinitionParser);
registerBeanDefinitionParser(TYPED_ACTOR_TAG, new TypedActorBeanDefinitionParser())
registerBeanDefinitionParser(UNTYPED_ACTOR_TAG, new UntypedActorBeanDefinitionParser())
registerBeanDefinitionParser(SUPERVISION_TAG, new SupervisionBeanDefinitionParser())
registerBeanDefinitionParser(DISPATCHER_TAG, new DispatcherBeanDefinitionParser())
registerBeanDefinitionParser(CAMEL_SERVICE_TAG, new CamelServiceBeanDefinitionParser)
registerBeanDefinitionParser(ACTOR_FOR_TAG, new ActorForBeanDefinitionParser());
}
}

View file

@ -19,6 +19,7 @@ object AkkaSpringConfigurationTags {
val DISPATCHER_TAG = "dispatcher"
val PROPERTYENTRY_TAG = "property"
val CAMEL_SERVICE_TAG = "camel-service"
val ACTOR_FOR_TAG = "actor-for"
// actor sub tags
val REMOTE_TAG = "remote"
@ -45,6 +46,8 @@ object AkkaSpringConfigurationTags {
val TRANSACTIONAL = "transactional"
val HOST = "host"
val PORT = "port"
val MANAGED_BY = "managed-by"
val SERVICE_NAME = "service-name"
val LIFECYCLE = "lifecycle"
val SCOPE = "scope"
@ -101,4 +104,8 @@ object AkkaSpringConfigurationTags {
val THREAD_BASED = "thread-based"
val HAWT = "hawt"
// managed by types
val SERVER_MANAGED = "server"
val CLIENT_MANAGED = "client"
}

View file

@ -1,42 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.spring
import se.scalablesolutions.akka.util.Logging
import org.w3c.dom.Element
import org.springframework.util.xml.DomUtils
/**
* Base trait with utility methods for bean parsing.
*/
trait BeanParser extends Logging {
/**
* Get a mandatory element attribute.
* @param element the element with the mandatory attribute
* @param attribute name of the mandatory attribute
*/
def mandatory(element: Element, attribute: String): String = {
if ((element.getAttribute(attribute) == null) || (element.getAttribute(attribute).isEmpty)) {
throw new IllegalArgumentException("Mandatory attribute missing: " + attribute)
} else {
element.getAttribute(attribute)
}
}
/**
* Get a mandatory child element.
* @param element the parent element
* @param childName name of the mandatory child element
*/
def mandatoryElement(element: Element, childName: String): Element = {
val childElement = DomUtils.getChildElementByTagName(element, childName);
if (childElement == null) {
throw new IllegalArgumentException("Mandatory element missing: '<akka:" + childName + ">'")
} else {
childElement
}
}
}

View file

@ -1,100 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.spring
import org.w3c.dom.Element
import org.springframework.util.xml.DomUtils
/**
* Parser trait for custom namespace for Akka dispatcher configuration.
* @author michaelkober
*/
trait DispatcherParser extends BeanParser {
import AkkaSpringConfigurationTags._
/**
* Parses the given element and returns a DispatcherProperties.
* @param element dom element to parse
* @return configuration for the dispatcher
*/
def parseDispatcher(element: Element): DispatcherProperties = {
val properties = new DispatcherProperties()
var dispatcherElement = element
if (hasRef(element)) {
val ref = element.getAttribute(REF)
dispatcherElement = element.getOwnerDocument.getElementById(ref)
if (dispatcherElement == null) {
throw new IllegalArgumentException("Referenced dispatcher not found: '" + ref + "'")
}
}
properties.dispatcherType = mandatory(dispatcherElement, TYPE)
if (properties.dispatcherType == THREAD_BASED) {
val allowedParentNodes = "akka:typed-actor" :: "akka:untyped-actor" :: "typed-actor" :: "untyped-actor" :: Nil
if (!allowedParentNodes.contains(dispatcherElement.getParentNode.getNodeName)) {
throw new IllegalArgumentException("Thread based dispatcher must be nested in 'typed-actor' or 'untyped-actor' element!")
}
}
if (properties.dispatcherType == HAWT) { // no name for HawtDispatcher
properties.name = dispatcherElement.getAttribute(NAME)
if (dispatcherElement.hasAttribute(AGGREGATE)) {
properties.aggregate = dispatcherElement.getAttribute(AGGREGATE).toBoolean
}
} else {
properties.name = mandatory(dispatcherElement, NAME)
}
val threadPoolElement = DomUtils.getChildElementByTagName(dispatcherElement, THREAD_POOL_TAG);
if (threadPoolElement != null) {
if (properties.dispatcherType == THREAD_BASED) {
throw new IllegalArgumentException("Element 'thread-pool' not allowed for this dispatcher type.")
}
val threadPoolProperties = parseThreadPool(threadPoolElement)
properties.threadPool = threadPoolProperties
}
properties
}
/**
* Parses the given element and returns a ThreadPoolProperties.
* @param element dom element to parse
* @return configuration for the thread pool
*/
def parseThreadPool(element: Element): ThreadPoolProperties = {
val properties = new ThreadPoolProperties()
properties.queue = element.getAttribute(QUEUE)
if (element.hasAttribute(CAPACITY)) {
properties.capacity = element.getAttribute(CAPACITY).toInt
}
if (element.hasAttribute(BOUND)) {
properties.bound = element.getAttribute(BOUND).toInt
}
if (element.hasAttribute(FAIRNESS)) {
properties.fairness = element.getAttribute(FAIRNESS).toBoolean
}
if (element.hasAttribute(CORE_POOL_SIZE)) {
properties.corePoolSize = element.getAttribute(CORE_POOL_SIZE).toInt
}
if (element.hasAttribute(MAX_POOL_SIZE)) {
properties.maxPoolSize = element.getAttribute(MAX_POOL_SIZE).toInt
}
if (element.hasAttribute(KEEP_ALIVE)) {
properties.keepAlive = element.getAttribute(KEEP_ALIVE).toLong
}
if (element.hasAttribute(REJECTION_POLICY)) {
properties.rejectionPolicy = element.getAttribute(REJECTION_POLICY)
}
if (element.hasAttribute(MAILBOX_CAPACITY)) {
properties.mailboxCapacity = element.getAttribute(MAILBOX_CAPACITY).toInt
}
properties
}
def hasRef(element: Element): Boolean = {
val ref = element.getAttribute(REF)
(ref != null) && !ref.isEmpty
}
}

View file

@ -18,3 +18,19 @@ class PropertyEntries {
entryList.append(entry)
}
}
/**
* Represents a property element
* @author <a href="johan.rask@jayway.com">Johan Rask</a>
*/
class PropertyEntry {
var name: String = _
var value: String = null
var ref: String = null
override def toString(): String = {
format("name = %s,value = %s, ref = %s", name, value, ref)
}
}

View file

@ -1,19 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.spring
/**
* Represents a property element
* @author <a href="johan.rask@jayway.com">Johan Rask</a>
*/
class PropertyEntry {
var name: String = _
var value: String = null
var ref: String = null
override def toString(): String = {
format("name = %s,value = %s, ref = %s", name, value, ref)
}
}

View file

@ -1,31 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.spring
import org.springframework.beans.factory.support.BeanDefinitionBuilder
import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser
import org.springframework.beans.factory.xml.ParserContext
import AkkaSpringConfigurationTags._
import org.w3c.dom.Element
/**
* Parser for custom namespace configuration.
* @author michaelkober
*/
class TypedActorBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorParser {
/*
* @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder)
*/
override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) {
val typedActorConf = parseActor(element)
typedActorConf.typed = TYPED_ACTOR_TAG
typedActorConf.setAsProperties(builder)
}
/*
* @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element)
*/
override def getBeanClass(element: Element): Class[_] = classOf[ActorFactoryBean]
}

View file

@ -1,31 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.spring
import org.springframework.beans.factory.support.BeanDefinitionBuilder
import org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser
import org.springframework.beans.factory.xml.ParserContext
import AkkaSpringConfigurationTags._
import org.w3c.dom.Element
/**
* Parser for custom namespace configuration.
* @author michaelkober
*/
class UntypedActorBeanDefinitionParser extends AbstractSingleBeanDefinitionParser with ActorParser {
/*
* @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#doParse(org.w3c.dom.Element, org.springframework.beans.factory.xml.ParserContext, org.springframework.beans.factory.support.BeanDefinitionBuilder)
*/
override def doParse(element: Element, parserContext: ParserContext, builder: BeanDefinitionBuilder) {
val untypedActorConf = parseActor(element)
untypedActorConf.typed = UNTYPED_ACTOR_TAG
untypedActorConf.setAsProperties(builder)
}
/*
* @see org.springframework.beans.factory.xml.AbstractSingleBeanDefinitionParser#getBeanClass(org.w3c.dom.Element)
*/
override def getBeanClass(element: Element): Class[_] = classOf[ActorFactoryBean]
}

View file

@ -8,14 +8,12 @@ package se.scalablesolutions.akka.spring.foo;
* To change this template use File | Settings | File Templates.
*/
public interface IMyPojo {
public void oneWay(String message);
public String getFoo();
public String getBar();
public void preRestart();
public void postRestart();
public String longRunning();
}

View file

@ -1,42 +1,34 @@
package se.scalablesolutions.akka.spring.foo;
import se.scalablesolutions.akka.actor.*;
import se.scalablesolutions.akka.actor.TypedActor;
public class MyPojo extends TypedActor implements IMyPojo{
import java.util.concurrent.CountDownLatch;
private String foo;
private String bar;
public class MyPojo extends TypedActor implements IMyPojo {
public static CountDownLatch latch = new CountDownLatch(1);
public static String lastOneWayMessage = null;
private String foo = "foo";
public MyPojo() {
this.foo = "foo";
this.bar = "bar";
}
public MyPojo() {
}
public String getFoo() {
return foo;
}
public String getFoo() {
return foo;
}
public void oneWay(String message) {
lastOneWayMessage = message;
latch.countDown();
}
public String getBar() {
return bar;
}
public void preRestart() {
System.out.println("pre restart");
}
public void postRestart() {
System.out.println("post restart");
}
public String longRunning() {
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
}
return "this took long";
public String longRunning() {
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
}
return "this took long";
}
}

View file

@ -6,6 +6,8 @@ import se.scalablesolutions.akka.actor.ActorRef;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import java.util.concurrent.CountDownLatch;
/**
* test class
@ -14,6 +16,9 @@ public class PingActor extends UntypedActor implements ApplicationContextAware {
private String stringFromVal;
private String stringFromRef;
public static String lastMessage = null;
public static CountDownLatch latch = new CountDownLatch(1);
private boolean gotApplicationContext = false;
@ -42,7 +47,6 @@ public class PingActor extends UntypedActor implements ApplicationContextAware {
stringFromRef = s;
}
private String longRunning() {
try {
Thread.sleep(6000);
@ -53,12 +57,12 @@ public class PingActor extends UntypedActor implements ApplicationContextAware {
public void onReceive(Object message) throws Exception {
if (message instanceof String) {
System.out.println("Ping received String message: " + message);
lastMessage = (String) message;
if (message.equals("longRunning")) {
System.out.println("### starting pong");
ActorRef pongActor = UntypedActor.actorOf(PongActor.class).start();
pongActor.sendRequestReply("longRunning", getContext());
}
latch.countDown();
} else {
throw new IllegalArgumentException("Unknown message: " + message);
}

View file

@ -0,0 +1,57 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:akka="http://www.akkasource.org/schema/akka"
xmlns:beans="http://www.springframework.org/schema/lang"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://www.akkasource.org/schema/akka
http://scalablesolutions.se/akka/akka-1.0-SNAPSHOT.xsd">
<akka:untyped-actor id="client-managed-remote-untyped-actor"
implementation="se.scalablesolutions.akka.spring.foo.PingActor">
<akka:remote host="localhost" port="9990" managed-by="client"/>
</akka:untyped-actor>
<akka:untyped-actor id="server-managed-remote-untyped-actor"
implementation="se.scalablesolutions.akka.spring.foo.PingActor">
<akka:remote host="localhost" port="9990" managed-by="server"/>
</akka:untyped-actor>
<akka:untyped-actor id="server-managed-remote-untyped-actor-custom-id"
implementation="se.scalablesolutions.akka.spring.foo.PingActor">
<akka:remote host="localhost" port="9990" service-name="ping-service"/>
</akka:untyped-actor>
<akka:typed-actor id="client-managed-remote-typed-actor"
interface="se.scalablesolutions.akka.spring.foo.IMyPojo"
implementation="se.scalablesolutions.akka.spring.foo.MyPojo"
timeout="2000">
<akka:remote host="localhost" port="9990" managed-by="client"/>
</akka:typed-actor>
<akka:typed-actor id="server-managed-remote-typed-actor"
interface="se.scalablesolutions.akka.spring.foo.IMyPojo"
implementation="se.scalablesolutions.akka.spring.foo.MyPojo"
timeout="2000">
<akka:remote host="localhost" port="9990" managed-by="server"/>
</akka:typed-actor>
<akka:typed-actor id="server-managed-remote-typed-actor-custom-id"
interface="se.scalablesolutions.akka.spring.foo.IMyPojo"
implementation="se.scalablesolutions.akka.spring.foo.MyPojo"
timeout="2000">
<akka:remote host="localhost" port="9990" service-name="mypojo-service"/>
</akka:typed-actor>
<akka:actor-for id="client-1" host="localhost" port="9990" service-name="ping-service"/>
<akka:actor-for id="typed-client-1"
interface="se.scalablesolutions.akka.spring.foo.IMyPojo"
host="localhost"
port="9990"
service-name="mypojo-service"/>
</beans>

View file

@ -37,7 +37,7 @@ http://scalablesolutions.se/akka/akka-1.0-SNAPSHOT.xsd">
implementation="se.scalablesolutions.akka.spring.foo.MyPojo"
timeout="2000"
transactional="true">
<akka:remote host="localhost" port="9999"/>
<akka:remote host="localhost" port="9990"/>
</akka:typed-actor>
<akka:typed-actor id="remote-service1"

View file

@ -24,7 +24,7 @@ http://scalablesolutions.se/akka/akka-1.0-SNAPSHOT.xsd">
<akka:untyped-actor id="remote-untyped-actor"
implementation="se.scalablesolutions.akka.spring.foo.PingActor"
timeout="2000">
<akka:remote host="localhost" port="9999"/>
<akka:remote host="localhost" port="9992"/>
</akka:untyped-actor>
<akka:untyped-actor id="untyped-actor-with-dispatcher"

View file

@ -19,7 +19,7 @@ import org.w3c.dom.Element
class TypedActorBeanDefinitionParserTest extends Spec with ShouldMatchers {
private class Parser extends ActorParser
describe("An TypedActorParser") {
describe("A TypedActorParser") {
val parser = new Parser()
it("should parse the typed actor configuration") {
val xml = <akka:typed-actor id="typed-actor1"
@ -66,6 +66,20 @@ class TypedActorBeanDefinitionParserTest extends Spec with ShouldMatchers {
assert(props != null)
assert(props.host === "com.some.host")
assert(props.port === 9999)
assert(!props.serverManaged)
}
it("should parse remote server managed TypedActors configuration") {
val xml = <akka:typed-actor id="remote typed-actor" implementation="se.scalablesolutions.akka.spring.foo.MyPojo"
timeout="1000">
<akka:remote host="com.some.host" port="9999" service-name="my-service"/>
</akka:typed-actor>
val props = parser.parseActor(dom(xml).getDocumentElement);
assert(props != null)
assert(props.host === "com.some.host")
assert(props.port === 9999)
assert(props.serviceName === "my-service")
assert(props.serverManaged)
}
}
}

View file

@ -4,10 +4,8 @@
package se.scalablesolutions.akka.spring
import foo.{IMyPojo, MyPojo}
import foo.{PingActor, IMyPojo, MyPojo}
import se.scalablesolutions.akka.dispatch.FutureTimeoutException
import se.scalablesolutions.akka.remote.RemoteNode
import org.scalatest.FeatureSpec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
@ -16,13 +14,52 @@ import org.springframework.beans.factory.xml.XmlBeanDefinitionReader
import org.springframework.context.ApplicationContext
import org.springframework.context.support.ClassPathXmlApplicationContext
import org.springframework.core.io.{ClassPathResource, Resource}
import org.scalatest.{BeforeAndAfterAll, FeatureSpec}
import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer, RemoteNode}
import java.util.concurrent.CountDownLatch
import se.scalablesolutions.akka.actor.{TypedActor, RemoteTypedActorOne, Actor}
import se.scalablesolutions.akka.actor.remote.RemoteTypedActorOneImpl
/**
* Tests for spring configuration of typed actors.
* @author michaelkober
*/
@RunWith(classOf[JUnitRunner])
class TypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers {
class TypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers with BeforeAndAfterAll {
var server1: RemoteServer = null
var server2: RemoteServer = null
override def beforeAll = {
val actor = Actor.actorOf[PingActor] // FIXME: remove this line when ticket 425 is fixed
server1 = new RemoteServer()
server1.start("localhost", 9990)
server2 = new RemoteServer()
server2.start("localhost", 9992)
val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
server1.registerTypedActor("typed-actor-service", typedActor)
}
// make sure the servers shutdown cleanly after the test has finished
override def afterAll = {
try {
server1.shutdown
server2.shutdown
RemoteClient.shutdownAll
Thread.sleep(1000)
} catch {
case e => ()
}
}
def getTypedActorFromContext(config: String, id: String) : IMyPojo = {
MyPojo.latch = new CountDownLatch(1)
val context = new ClassPathXmlApplicationContext(config)
val myPojo: IMyPojo = context.getBean(id).asInstanceOf[IMyPojo]
myPojo
}
feature("parse Spring application context") {
scenario("akka:typed-actor and akka:supervision and akka:dispatcher can be used as top level elements") {
@ -37,41 +74,79 @@ class TypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers {
}
scenario("get a typed actor") {
val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml")
val myPojo = context.getBean("simple-typed-actor").asInstanceOf[IMyPojo]
var msg = myPojo.getFoo()
msg += myPojo.getBar()
assert(msg === "foobar")
val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "simple-typed-actor")
assert(myPojo.getFoo() === "foo")
myPojo.oneWay("hello 1")
MyPojo.latch.await
assert(MyPojo.lastOneWayMessage === "hello 1")
}
scenario("FutureTimeoutException when timed out") {
val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml")
val myPojo = context.getBean("simple-typed-actor").asInstanceOf[IMyPojo]
val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "simple-typed-actor")
evaluating {myPojo.longRunning()} should produce[FutureTimeoutException]
}
scenario("typed-actor with timeout") {
val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml")
val myPojo = context.getBean("simple-typed-actor-long-timeout").asInstanceOf[IMyPojo]
val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "simple-typed-actor-long-timeout")
assert(myPojo.longRunning() === "this took long");
}
scenario("transactional typed-actor") {
val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml")
val myPojo = context.getBean("transactional-typed-actor").asInstanceOf[IMyPojo]
var msg = myPojo.getFoo()
msg += myPojo.getBar()
assert(msg === "foobar")
val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "transactional-typed-actor")
assert(myPojo.getFoo() === "foo")
myPojo.oneWay("hello 2")
MyPojo.latch.await
assert(MyPojo.lastOneWayMessage === "hello 2")
}
scenario("get a remote typed-actor") {
RemoteNode.start
Thread.sleep(1000)
val context = new ClassPathXmlApplicationContext("/typed-actor-config.xml")
val myPojo = context.getBean("remote-typed-actor").asInstanceOf[IMyPojo]
assert(myPojo.getFoo === "foo")
val myPojo = getTypedActorFromContext("/typed-actor-config.xml", "remote-typed-actor")
assert(myPojo.getFoo() === "foo")
myPojo.oneWay("hello 3")
MyPojo.latch.await
assert(MyPojo.lastOneWayMessage === "hello 3")
}
scenario("get a client-managed-remote-typed-actor") {
val myPojo = getTypedActorFromContext("/server-managed-config.xml", "client-managed-remote-typed-actor")
assert(myPojo.getFoo() === "foo")
myPojo.oneWay("hello client-managed-remote-typed-actor")
MyPojo.latch.await
assert(MyPojo.lastOneWayMessage === "hello client-managed-remote-typed-actor")
}
scenario("get a server-managed-remote-typed-actor") {
val serverPojo = getTypedActorFromContext("/server-managed-config.xml", "server-managed-remote-typed-actor")
//
val myPojoProxy = RemoteClient.typedActorFor(classOf[IMyPojo], classOf[IMyPojo].getName, 5000L, "localhost", 9990)
assert(myPojoProxy.getFoo() === "foo")
myPojoProxy.oneWay("hello server-managed-remote-typed-actor")
MyPojo.latch.await
assert(MyPojo.lastOneWayMessage === "hello server-managed-remote-typed-actor")
}
scenario("get a server-managed-remote-typed-actor-custom-id") {
val serverPojo = getTypedActorFromContext("/server-managed-config.xml", "server-managed-remote-typed-actor-custom-id")
//
val myPojoProxy = RemoteClient.typedActorFor(classOf[IMyPojo], "mypojo-service", 5000L, "localhost", 9990)
assert(myPojoProxy.getFoo() === "foo")
myPojoProxy.oneWay("hello server-managed-remote-typed-actor 2")
MyPojo.latch.await
assert(MyPojo.lastOneWayMessage === "hello server-managed-remote-typed-actor 2")
}
scenario("get a client proxy for server-managed-remote-typed-actor") {
MyPojo.latch = new CountDownLatch(1)
val context = new ClassPathXmlApplicationContext("/server-managed-config.xml")
val myPojo: IMyPojo = context.getBean("server-managed-remote-typed-actor-custom-id").asInstanceOf[IMyPojo]
// get client proxy from spring context
val myPojoProxy = context.getBean("typed-client-1").asInstanceOf[IMyPojo]
assert(myPojoProxy.getFoo() === "foo")
myPojoProxy.oneWay("hello")
MyPojo.latch.await
}
}
}

View file

@ -6,74 +6,146 @@ package se.scalablesolutions.akka.spring
import foo.PingActor
import se.scalablesolutions.akka.dispatch.ExecutorBasedEventDrivenWorkStealingDispatcher
import se.scalablesolutions.akka.remote.RemoteNode
import se.scalablesolutions.akka.actor.ActorRef
import org.scalatest.FeatureSpec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import org.springframework.context.ApplicationContext
import org.springframework.context.support.ClassPathXmlApplicationContext
import se.scalablesolutions.akka.remote.{RemoteClient, RemoteServer}
import org.scalatest.{BeforeAndAfterAll, FeatureSpec}
import java.util.concurrent.CountDownLatch
import se.scalablesolutions.akka.actor.{RemoteActorRef, ActorRegistry, Actor, ActorRef}
/**
* Tests for spring configuration of typed actors.
* @author michaelkober
*/
@RunWith(classOf[JUnitRunner])
class UntypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers {
class UntypedActorSpringFeatureTest extends FeatureSpec with ShouldMatchers with BeforeAndAfterAll {
var server1: RemoteServer = null
var server2: RemoteServer = null
override def beforeAll = {
val actor = Actor.actorOf[PingActor] // FIXME: remove this line when ticket 425 is fixed
server1 = new RemoteServer()
server1.start("localhost", 9990)
server2 = new RemoteServer()
server2.start("localhost", 9992)
}
// make sure the servers shutdown cleanly after the test has finished
override def afterAll = {
try {
server1.shutdown
server2.shutdown
RemoteClient.shutdownAll
Thread.sleep(1000)
} catch {
case e => ()
}
}
def getPingActorFromContext(config: String, id: String) : ActorRef = {
PingActor.latch = new CountDownLatch(1)
val context = new ClassPathXmlApplicationContext(config)
val pingActor = context.getBean(id).asInstanceOf[ActorRef]
assert(pingActor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
pingActor.start()
}
feature("parse Spring application context") {
scenario("get an untyped actor") {
val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml")
val myactor = context.getBean("simple-untyped-actor").asInstanceOf[ActorRef]
assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
myactor.start()
scenario("get a untyped actor") {
val myactor = getPingActorFromContext("/untyped-actor-config.xml", "simple-untyped-actor")
myactor.sendOneWay("Hello")
PingActor.latch.await
assert(PingActor.lastMessage === "Hello")
assert(myactor.isDefinedAt("some string message"))
}
scenario("untyped-actor with timeout") {
val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml")
val myactor = context.getBean("simple-untyped-actor-long-timeout").asInstanceOf[ActorRef]
assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
myactor.start()
myactor.sendOneWay("Hello")
val myactor = getPingActorFromContext("/untyped-actor-config.xml", "simple-untyped-actor-long-timeout")
assert(myactor.getTimeout() === 10000)
myactor.sendOneWay("Hello 2")
PingActor.latch.await
assert(PingActor.lastMessage === "Hello 2")
}
scenario("transactional untyped-actor") {
val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml")
val myactor = context.getBean("transactional-untyped-actor").asInstanceOf[ActorRef]
assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
myactor.start()
myactor.sendOneWay("Hello")
assert(myactor.isDefinedAt("some string message"))
val myactor = getPingActorFromContext("/untyped-actor-config.xml", "transactional-untyped-actor")
myactor.sendOneWay("Hello 3")
PingActor.latch.await
assert(PingActor.lastMessage === "Hello 3")
}
scenario("get a remote typed-actor") {
RemoteNode.start
Thread.sleep(1000)
val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml")
val myactor = context.getBean("remote-untyped-actor").asInstanceOf[ActorRef]
assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
myactor.start()
myactor.sendOneWay("Hello")
assert(myactor.isDefinedAt("some string message"))
val myactor = getPingActorFromContext("/untyped-actor-config.xml", "remote-untyped-actor")
myactor.sendOneWay("Hello 4")
assert(myactor.getRemoteAddress().isDefined)
assert(myactor.getRemoteAddress().get.getHostName() === "localhost")
assert(myactor.getRemoteAddress().get.getPort() === 9999)
assert(myactor.getRemoteAddress().get.getPort() === 9992)
PingActor.latch.await
assert(PingActor.lastMessage === "Hello 4")
}
scenario("untyped-actor with custom dispatcher") {
val context = new ClassPathXmlApplicationContext("/untyped-actor-config.xml")
val myactor = context.getBean("untyped-actor-with-dispatcher").asInstanceOf[ActorRef]
assert(myactor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
myactor.start()
myactor.sendOneWay("Hello")
val myactor = getPingActorFromContext("/untyped-actor-config.xml", "untyped-actor-with-dispatcher")
assert(myactor.getTimeout() === 1000)
assert(myactor.getDispatcher.isInstanceOf[ExecutorBasedEventDrivenWorkStealingDispatcher])
myactor.sendOneWay("Hello 5")
PingActor.latch.await
assert(PingActor.lastMessage === "Hello 5")
}
scenario("create client managed remote untyped-actor") {
val myactor = getPingActorFromContext("/server-managed-config.xml", "client-managed-remote-untyped-actor")
myactor.sendOneWay("Hello client managed remote untyped-actor")
PingActor.latch.await
assert(PingActor.lastMessage === "Hello client managed remote untyped-actor")
assert(myactor.getRemoteAddress().isDefined)
assert(myactor.getRemoteAddress().get.getHostName() === "localhost")
assert(myactor.getRemoteAddress().get.getPort() === 9990)
}
scenario("create server managed remote untyped-actor") {
val myactor = getPingActorFromContext("/server-managed-config.xml", "server-managed-remote-untyped-actor")
val nrOfActors = ActorRegistry.actors.length
val actorRef = RemoteClient.actorFor("se.scalablesolutions.akka.spring.foo.PingActor", "localhost", 9990)
actorRef.sendOneWay("Hello server managed remote untyped-actor")
PingActor.latch.await
assert(PingActor.lastMessage === "Hello server managed remote untyped-actor")
assert(ActorRegistry.actors.length === nrOfActors)
}
scenario("create server managed remote untyped-actor with custom service id") {
val myactor = getPingActorFromContext("/server-managed-config.xml", "server-managed-remote-untyped-actor-custom-id")
val nrOfActors = ActorRegistry.actors.length
val actorRef = RemoteClient.actorFor("ping-service", "localhost", 9990)
actorRef.sendOneWay("Hello server managed remote untyped-actor")
PingActor.latch.await
assert(PingActor.lastMessage === "Hello server managed remote untyped-actor")
assert(ActorRegistry.actors.length === nrOfActors)
}
scenario("get client actor for server managed remote untyped-actor") {
PingActor.latch = new CountDownLatch(1)
val context = new ClassPathXmlApplicationContext("/server-managed-config.xml")
val pingActor = context.getBean("server-managed-remote-untyped-actor-custom-id").asInstanceOf[ActorRef]
assert(pingActor.getActorClassName() === "se.scalablesolutions.akka.spring.foo.PingActor")
pingActor.start()
val nrOfActors = ActorRegistry.actors.length
// get client actor ref from spring context
val actorRef = context.getBean("client-1").asInstanceOf[ActorRef]
assert(actorRef.isInstanceOf[RemoteActorRef])
actorRef.sendOneWay("Hello")
PingActor.latch.await
assert(ActorRegistry.actors.length === nrOfActors)
}
}
}

View file

@ -16,9 +16,8 @@ import org.codehaus.aspectwerkz.proxy.Proxy
import org.codehaus.aspectwerkz.annotation.{Aspect, Around}
import java.net.InetSocketAddress
import java.lang.reflect.{InvocationTargetException, Method, Field}
import scala.reflect.BeanProperty
import java.lang.reflect.{Method, Field, InvocationHandler, Proxy => JProxy}
/**
* TypedActor is a type-safe actor made out of a POJO with interface.
@ -390,7 +389,8 @@ object TypedActor extends Logging {
typedActor.initialize(proxy)
if (config._messageDispatcher.isDefined) actorRef.dispatcher = config._messageDispatcher.get
if (config._threadBasedDispatcher.isDefined) actorRef.dispatcher = Dispatchers.newThreadBasedDispatcher(actorRef)
AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, None, config.timeout))
if (config._host.isDefined) actorRef.makeRemote(config._host.get)
AspectInitRegistry.register(proxy, AspectInit(intfClass, typedActor, actorRef, config._host, config.timeout))
actorRef.start
proxy.asInstanceOf[T]
}
@ -408,24 +408,47 @@ object TypedActor extends Logging {
proxy.asInstanceOf[T]
}
/*
// NOTE: currently not used - but keep it around
private[akka] def newInstance[T <: TypedActor](targetClass: Class[T],
remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
val proxy = {
val instance = Proxy.newInstance(targetClass, true, false)
if (instance.isInstanceOf[TypedActor]) instance.asInstanceOf[TypedActor]
else throw new IllegalActorStateException("Actor [" + targetClass.getName + "] is not a sub class of 'TypedActor'")
/**
* Create a proxy for a RemoteActorRef representing a server managed remote typed actor.
*
*/
private[akka] def createProxyForRemoteActorRef[T](intfClass: Class[T], actorRef: ActorRef): T = {
class MyInvocationHandler extends InvocationHandler {
def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = {
// do nothing, this is just a dummy
null
}
}
val context = injectTypedActorContext(proxy)
actorRef.actor.asInstanceOf[Dispatcher].initialize(targetClass, proxy, proxy, context)
actorRef.timeout = timeout
if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get)
AspectInitRegistry.register(proxy, AspectInit(targetClass, proxy, actorRef, remoteAddress, timeout))
actorRef.start
proxy.asInstanceOf[T]
val handler = new MyInvocationHandler()
val interfaces = Array(intfClass, classOf[ServerManagedTypedActor]).asInstanceOf[Array[java.lang.Class[_]]]
val jProxy = JProxy.newProxyInstance(intfClass.getClassLoader(), interfaces, handler)
val awProxy = Proxy.newInstance(interfaces, Array(jProxy, jProxy), true, false)
AspectInitRegistry.register(awProxy, AspectInit(intfClass, null, actorRef, None, 5000L))
awProxy.asInstanceOf[T]
}
*/
/*
// NOTE: currently not used - but keep it around
private[akka] def newInstance[T <: TypedActor](targetClass: Class[T],
remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
val proxy = {
val instance = Proxy.newInstance(targetClass, true, false)
if (instance.isInstanceOf[TypedActor]) instance.asInstanceOf[TypedActor]
else throw new IllegalActorStateException("Actor [" + targetClass.getName + "] is not a sub class of 'TypedActor'")
}
val context = injectTypedActorContext(proxy)
actorRef.actor.asInstanceOf[Dispatcher].initialize(targetClass, proxy, proxy, context)
actorRef.timeout = timeout
if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get)
AspectInitRegistry.register(proxy, AspectInit(targetClass, proxy, actorRef, remoteAddress, timeout))
actorRef.start
proxy.asInstanceOf[T]
}
*/
/**
* Stops the current Typed Actor.
@ -546,6 +569,30 @@ object TypedActor extends Logging {
private[akka] def isJoinPoint(message: Any): Boolean = message.isInstanceOf[JoinPoint]
}
/**
* AspectWerkz Aspect that is turning POJO into proxy to a server managed remote TypedActor.
* <p/>
* Is deployed on a 'perInstance' basis with the pointcut 'execution(* *.*(..))',
* e.g. all methods on the instance.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@Aspect("perInstance")
private[akka] sealed class ServerManagedTypedActorAspect extends ActorAspect {
@Around("execution(* *.*(..)) && this(se.scalablesolutions.akka.actor.ServerManagedTypedActor)")
def invoke(joinPoint: JoinPoint): AnyRef = {
if (!isInitialized) initialize(joinPoint)
remoteDispatch(joinPoint)
}
override def initialize(joinPoint: JoinPoint): Unit = {
super.initialize(joinPoint)
remoteAddress = actorRef.remoteAddress
}
}
/**
* AspectWerkz Aspect that is turning POJO into TypedActor.
* <p/>
@ -555,18 +602,9 @@ object TypedActor extends Logging {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@Aspect("perInstance")
private[akka] sealed class TypedActorAspect {
@volatile private var isInitialized = false
@volatile private var isStopped = false
private var interfaceClass: Class[_] = _
private var typedActor: TypedActor = _
private var actorRef: ActorRef = _
private var remoteAddress: Option[InetSocketAddress] = _
private var timeout: Long = _
private var uuid: String = _
@volatile private var instance: TypedActor = _
private[akka] sealed class TypedActorAspect extends ActorAspect {
@Around("execution(* *.*(..))")
@Around("execution(* *.*(..)) && !this(se.scalablesolutions.akka.actor.ServerManagedTypedActor)")
def invoke(joinPoint: JoinPoint): AnyRef = {
if (!isInitialized) initialize(joinPoint)
dispatch(joinPoint)
@ -576,12 +614,26 @@ private[akka] sealed class TypedActorAspect {
if (remoteAddress.isDefined) remoteDispatch(joinPoint)
else localDispatch(joinPoint)
}
}
private def localDispatch(joinPoint: JoinPoint): AnyRef = {
val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti]
val isOneWay = TypedActor.isOneWay(methodRtti)
/**
* Base class for TypedActorAspect and ServerManagedTypedActorAspect to reduce code duplication.
*/
private[akka] abstract class ActorAspect {
@volatile protected var isInitialized = false
@volatile protected var isStopped = false
protected var interfaceClass: Class[_] = _
protected var typedActor: TypedActor = _
protected var actorRef: ActorRef = _
protected var timeout: Long = _
protected var uuid: String = _
protected var remoteAddress: Option[InetSocketAddress] = _
protected def localDispatch(joinPoint: JoinPoint): AnyRef = {
val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti]
val isOneWay = TypedActor.isOneWay(methodRtti)
val senderActorRef = Some(SenderContextInfo.senderActorRef.value)
val senderProxy = Some(SenderContextInfo.senderProxy.value)
val senderProxy = Some(SenderContextInfo.senderProxy.value)
typedActor.context._sender = senderProxy
if (!actorRef.isRunning && !isStopped) {
@ -602,7 +654,7 @@ private[akka] sealed class TypedActorAspect {
}
}
private def remoteDispatch(joinPoint: JoinPoint): AnyRef = {
protected def remoteDispatch(joinPoint: JoinPoint): AnyRef = {
val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti]
val isOneWay = TypedActor.isOneWay(methodRtti)
@ -641,7 +693,7 @@ private[akka] sealed class TypedActorAspect {
(escapedArgs, isEscaped)
}
private def initialize(joinPoint: JoinPoint): Unit = {
protected def initialize(joinPoint: JoinPoint): Unit = {
val init = AspectInitRegistry.initFor(joinPoint.getThis)
interfaceClass = init.interfaceClass
typedActor = init.targetInstance
@ -653,6 +705,7 @@ private[akka] sealed class TypedActorAspect {
}
}
/**
* Internal helper class to help pass the contextual information between threads.
*
@ -704,5 +757,11 @@ private[akka] sealed case class AspectInit(
val timeout: Long) {
def this(interfaceClass: Class[_], targetInstance: TypedActor, actorRef: ActorRef, timeout: Long) =
this(interfaceClass, targetInstance, actorRef, None, timeout)
}
/**
* Marker interface for server manager typed actors.
*/
private[akka] sealed trait ServerManagedTypedActor extends TypedActor

View file

@ -122,7 +122,6 @@ private[akka] class TypedActorGuiceConfigurator extends TypedActorConfiguratorBa
remoteAddress.foreach { address =>
actorRef.makeRemote(remoteAddress.get)
RemoteServerModule.registerTypedActor(address, implementationClass.getName, proxy)
}
AspectInitRegistry.register(

View file

@ -2,6 +2,7 @@
<system id="akka">
<package name="se.scalablesolutions.akka.actor">
<aspect class="TypedActorAspect" />
<aspect class="ServerManagedTypedActorAspect" />
</package>
</system>
</aspectwerkz>